一、概述
CPU從三十多年前的8086,到十年前的奔騰,再到當下的多核i7。一開始,以單核cpu的主頻為目標,架構的改良和集成電路工藝的進步使得cpu的性能高速上升,單核cpu的主頻從老爺車的MHz階段一度接近4GHz高地。然而,也因為工藝和功耗等的限制,單核cpu遇到了人生的天花板,急需轉換思維,以滿足無止境的性能需求。多核cpu在此登上歷史舞臺。給你的老爺車多加兩個引擎,讓你有法拉利的感覺。現時代,連手機都到處叫囂自己有4核8核處理器的時代,PC就更不用說了。
扯遠了,anyway,對于俺們程序員來說,如何利用如此強大的引擎完成我們的任務才是我們要考慮的。隨著大規模數據處理、大規模問題和復雜系統求解需求的增加,以前的單核編程已經有心無力了。如果程序一跑就得幾個小時,甚至一天,想想都無法原諒自己。那如何讓自己更快的過度到高大上的多核并行編程中去呢?哈哈,廣大人民的力量!
目前工作中我所接觸到的并行處理框架主要有MPI、OpenMP和MapReduce(Hadoop)三個(CUDA屬于GPU并行編程,這里不提及)。MPI和Hadoop都可以在集群中運行,而OpenMP因為共享存儲結構的關系,不能在集群上運行,只能單機。另外,MPI可以讓數據保留在內存中,可以為節點間的通信和數據交互保存上下文,所以能執行迭代算法,而Hadoop卻不具有這個特性。因此,需要迭代的機器學習算法大多使用MPI來實現。當然了,部分機器學習算法也是可以通過設計使用Hadoop來完成的。(淺見,如果錯誤,希望各位不吝指出,謝謝)。
本文主要介紹Python環境下MPI編程的實踐基礎。
二、MPI與mpi4py
MPI是MessagePassingInterface的簡稱,也就是消息傳遞。消息傳遞指的是并行執行的各個進程具有自己獨立的堆棧和代碼段,作為互不相關的多個程序獨立執行,進程之間的信息交互完全通過顯示地調用通信函數來完成。
Mpi4py是構建在mpi之上的python庫,使得python的數據結構可以在進程(或者多個cpu)之間進行傳遞。
2.1、MPI的工作方式
很簡單,就是你啟動了一組MPI進程,每個進程都是執行同樣的代碼!然后每個進程都有一個ID,也就是rank來標記我是誰。什么意思呢?假設一個CPU是你請的一個工人,共有10個工人。你有100塊磚頭要搬,然后很公平,讓每個工人搬10塊。這時候,你把任務寫到一個任務卡里面,讓10個工人都執行這個任務卡中的任務,也就是搬磚!這個任務卡中的“搬磚”就是你寫的代碼。然后10個CPU執行同一段代碼。需要注意的是,代碼里面的所有變量都是每個進程獨有的,雖然名字相同。
例如,一個腳本test.py,里面包含以下代碼:
frommpi4pyimportMPI
print("helloworld'')
print("myrankis:%d"%MPI.rank)
然后我們在命令行通過以下方式運行:
#mpirun–np5pythontest.py
-np5指定啟動5個mpi進程來執行后面的程序。相當于對腳本拷貝了5份,每個進程運行一份,互不干擾。在運行的時候代碼里面唯一的不同,就是各自的rank也就是ID不一樣。所以這個代碼就會打印5個helloworld和5個不同的rank值,從0到4.
2.2、點對點通信
點對點通信(Point-to-PointCommunication)的能力是信息傳遞系統最基本的要求。意思就是讓兩個進程直接可以傳輸數據,也就是一個發送數據,另一個接收數據。接口就兩個,send和recv,來個例子:
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
#pointtopointcommunication
data_send=[comm_rank]*5
comm.send(data_send,dest=(comm_rank+1)%comm_size)
data_recv=comm.recv(source=(comm_rank-1)%comm_size)
print("myrankis%d,andIreceived:"%comm_rank)
printdata_recv
啟動5個進程運行以上代碼,結果如下:
myrankis0,andIreceived:
[4,4,4,4,4]
myrankis1,andIreceived:
[0,0,0,0,0]
myrankis2,andIreceived:
[1,1,1,1,1]
myrankis3,andIreceived:
[2,2,2,2,2]
myrankis4,andIreceived:
[3,3,3,3,3]
可以看到,每個進程都創建了一個數組,然后把它傳遞給下一個進程,最后的那個進程傳遞給第一個進程。comm_size就是mpi的進程個數,也就是-np指定的那個數。MPI.COMM_WORLD表示進程所在的通信組。
但這里面有個需要注意的問題,如果我們要發送的數據比較小的話,mpi會緩存我們的數據,也就是說執行到send這個代碼的時候,會緩存被send的數據,然后繼續執行后面的指令,而不會等待對方進程執行recv指令接收完這個數據。但是,如果要發送的數據很大,那么進程就是掛起等待,直到接收進程執行了recv指令接收了這個數據,進程才繼續往下執行。所以上述的代碼發送[rank]*5沒啥問題,如果發送[rank]*500程序就會半死不活的樣子了。因為所有的進程都會卡在發送這條指令,等待下一個進程發起接收的這個指令,但是進程是執行完發送的指令才能執行接收的指令,這就和死鎖差不多了。所以一般,我們將其修改成以下的方式:
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
data_send=[comm_rank]*5
ifcomm_rank==0:
comm.send(data_send,dest=(comm_rank+1)%comm_size)
ifcomm_rank>0:
data_recv=comm.recv(source=(comm_rank-1)%comm_size)
comm.send(data_send,dest=(comm_rank+1)%comm_size)
ifcomm_rank==0:
data_recv=comm.recv(source=(comm_rank-1)%comm_size)
print("myrankis%d,andIreceived:"%comm_rank)
printdata_recv
第一個進程一開始就發送數據,其他進程一開始都是在等待接收數據,這時候進程1接收了進程0的數據,然后發送進程1的數據,進程2接收了,再發送進程2的數據……知道最后進程0接收最后一個進程的數據,從而避免了上述問題。
一個比較常用的方法是封一個組長,也就是一個主進程,一般是進程0作為主進程leader。主進程將數據發送給其他的進程,其他的進程處理數據,然后返回結果給進程0。換句話說,就是進程0來控制整個數據處理流程。
2.3、群體通信
點對點通信是A發送給B,一個人將自己的秘密告訴另一個人,群體通信(CollectiveCommunications)像是拿個大喇叭,一次性告訴所有的人。前者是一對一,后者是一對多。但是,群體通信是以更有效的方式工作的。它的原則就一個:盡量把所有的進程在所有的時刻都使用上!我們在下面的bcast小節講述。
群體通信還是發送和接收兩類,一個是一次性把數據發給所有人,另一個是一次性從所有人那里回收結果。
1)廣播bcast
將一份數據發送給所有的進程。例如我有200份數據,有10個進程,那么每個進程都會得到這200份數據。
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
data=comm.bcast(dataifcomm_rank==0elseNone,root=0)
print'rank%d,got:'%(comm_rank)
printdata
結果如下:
rank0,got:
[0,1,2,3,4]
rank1,got:
[0,1,2,3,4]
rank2,got:
[0,1,2,3,4]
rank3,got:
[0,1,2,3,4]
rank4,got:
[0,1,2,3,4]
Root進程自己建了一個列表,然后廣播給所有的進程。這樣所有的進程都擁有了這個列表。然后愛干嘛就干嘛了。
對廣播最直觀的觀點是某個特定進程將數據一一發送給每個進程。假設有n個進程,那么假設我們的數據在0進程,那么0進程就需要將數據發送給剩下的n-1個進程,這是非常低效的,復雜度是O(n)。那有沒有高效的方式?一個最常用也是非常高效的手段是規約樹廣播:收到廣播數據的所有進程都參與到數據廣播的過程中。首先只有一個進程有數據,然后它把它發送給第一個進程,此時有兩個進程有數據;然后這兩個進程都參與到下一次的廣播中,這時就會有4個進程有數據,……,以此類推,每次都會有2的次方個進程有數據。通過這種規約樹的廣播方法,廣播的復雜度降為O(logn)。這就是上面說的群體通信的高效原則:充分利用所有的進程來實現數據的發送和接收。
2)散播scatter
將一份數據平分給所有的進程。例如我有200份數據,有10個進程,那么每個進程會分別得到20份數據。
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
printdata
else:
data=None
local_data=comm.scatter(data,root=0)
print'rank%d,got:'%comm_rank
printlocal_data
結果如下:
[0,1,2,3,4]
rank0,got:
0
rank1,got:
1
rank2,got:
2
rank3,got:
3
rank4,got:
4
這里root進程創建了一個list,然后將它散播給所有的進程,相當于對這個list做了劃分,每個進程獲得等分的數據,這里就是list的每一個數。(主要根據list的索引來劃分,list索引為第i份的數據就發送給第i個進程)。如果是矩陣,那么就等分的劃分行,每個進程獲得相同的行數進行處理。
需要注意的是,MPI的工作方式是每個進程都會執行所有的代碼,所以每個進程都會執行scatter這個指令,但只有root執行它的時候,它才兼備發送者和接收者的身份(root也會得到屬于自己的數據),對于其他進程來說,他們都只是接收者而已。
3)收集gather
那有發送,就有一起回收的函數。Gather是將所有進程的數據收集回來,合并成一個列表。下面聯合scatter和gather組成一個完成的分發和收回過程:
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
printdata
else:
data=None
local_data=comm.scatter(data,root=0)
local_data=local_data*2
print'rank%d,gotanddo:'%comm_rank
printlocal_data
combine_data=comm.gather(local_data,root=0)
ifcomm_rank==0:
printcombine_data
結果如下:
[0,1,2,3,4]
rank0,gotanddo:
0
rank1,gotanddo:
2
rank2,gotanddo:
4
rank4,gotanddo:
8
rank3,gotanddo:
6
[0,2,4,6,8]
Root進程將數據通過scatter等分發給所有的進程,等待所有的進程都處理完后(這里只是簡單的乘以2),root進程再通過gather回收他們的結果,和分發的原則一樣,組成一個list。Gather還有一個變體就是allgather,可以理解為它在gather的基礎上將gather的結果再bcast了一次。啥意思?意思是root進程將所有進程的結果都回收統計完后,再把整個統計結果告訴大家。這樣,不僅root可以訪問combine_data,所有的進程都可以訪問combine_data了。
4)規約reduce
規約是指不但將所有的數據收集回來,收集回來的過程中還進行了簡單的計算,例如求和,求最大值等等。為什么要有這個呢?我們不是可以直接用gather全部收集回來了,再對列表求個sum或者max就可以了嗎?這樣不是累死組長嗎?為什么不充分使用每個工人呢?規約實際上是使用規約樹來實現的。例如求max,完成可以讓工人兩兩pk后,再返回兩兩pk的最大值,然后再對第二層的最大值兩兩pk,直到返回一個最終的max給組長。組長就非常聰明的將工作分配下工人高效的完成了。這是O(n)的復雜度,下降到O(logn)(底數為2)的復雜度。
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
printdata
else:
data=None
local_data=comm.scatter(data,root=0)
local_data=local_data*2
print'rank%d,gotanddo:'%comm_rank
printlocal_data
all_sum=comm.reduce(local_data,root=0,op=MPI.SUM)
ifcomm_rank==0:
print'sumis:%d'%all_sum
結果如下:
[0,1,2,3,4]
rank0,gotanddo:
0
rank1,gotanddo:
2
rank2,gotanddo:
4
rank3,gotanddo:
6
rank4,gotanddo:
8
sumis:20
可以看到,最后可以得到一個sum值。
以上內容為大家介紹了Python多核編程mpi4py實踐,希望對大家有所幫助,如果想要了解更多Python相關知識,請關注IT培訓機構:千鋒教育。http://www.dietsnews.net/