multiprocessing 是一个使用方法类似threading模块的进程模块,在python 2.6才开始使用。

它允许程序员做并行开发,并且可以在UNIX和Windows下运行。

 

multiprocessing 使用示例

先通过创建一个multiprocessing.Process来完成一个简单任务的示例,来初步认识。

简单示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os
  
def func(name, processName):
    print 'Process[%s]  hello %s' % (processName, name)
    print 'sub pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    time.sleep(0.1)
  
  
def main():
    print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    
    processList = []
    for i in xrange(4):
        pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i)))
        pro.start()
#         pro.join()      # 不要在此处阻塞子进程,否则子进程都将是线性顺序执行
        processList.append(pro)
        
    for pro in processList:
        pro.join()      # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

main pid: 9405, ppid: 6463
Process[Process-0]  hello 0
sub pid: 9410, ppid: 9405
Process[Process-2]  hello 2
sub pid: 9412, ppid: 9405
Process[Process-3]  hello 3
sub pid: 9413, ppid: 9405
Process[Process-1]  hello 1
sub pid: 9411, ppid: 9405
end.

 

进程之间的通信方式

多进程之间进行通信,常用有两种主要的方式:Queue、Pipe

方式1:Queue 队列

这里需要需要,此处多进程之间通信的Queue来自multiprocessing.Queue(),有别于Queue.Queue()的复制。

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os
  

lock = multiprocessing.Lock()

def func(name, processName, queue):
    p_info = 'Process[%s]  hello %s' % (processName, name)
    queue.put(p_info)
    
    print 'queue put: ', p_info
    print 'sub pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    time.sleep(0.5)
  
  
def main():
    print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    
    # 注意: 此处是Queue来自multiprocessing.Queue(), 其来源于 from multiprocessing.queues import Queue
    # 此处进程的Queue与平时多线程采用的Queue.Queue()是不同的,要进行区分
    qu = multiprocessing.Queue()    
    
    processList = []
    for i in xrange(4):
        pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), qu))
        pro.start()
        processList.append(pro)
        
    for pro in processList:
        pro.join()      # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程
      
    print 'queue size: ', qu.qsize()
    while not qu.empty():
        p_info = qu.get(block=False)
        print 'queue get: ', p_info
        
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

main pid: 13264, ppid: 6463
queue put:  Process[Process-0]  hello 0
sub pid: 13269, ppid: 13264
queue put:  Process[Process-2]  hello 2
sub pid: 13272, ppid: 13264
queue put:  Process[Process-1]  hello 1
sub pid: 13270, ppid: 13264
queue put:  Process[Process-3]  hello 3
sub pid: 13273, ppid: 13264
queue size:  4
queue get:  Process[Process-0]  hello 0
queue get:  Process[Process-2]  hello 2
queue get:  Process[Process-3]  hello 3
queue get:  Process[Process-1]  hello 1
end.

从上面输出结果,可以看出多进程是异步执行的,且从 multiprocessing.Queue() 取出的顺序也可能是异步的。

 

方式2: Pipe 管道

Pipe可以是单向(half-duplex),也可以是双向(duplex)。通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认duplex=True,为双向通信)

一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os
  

lock = multiprocessing.Lock()

def func(name, processName, pipe):
    p_info = 'Process[%s]  hello %s' % (processName, name)
    print 'pipe send: ', p_info
    pipe.send(p_info)
    
    print 'sub pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    time.sleep(0.1)
  
  
def main():
    print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    
    # 注意: 此处是Pipe来自multiprocessing.Pipe(), 其来源于 multiprocessing.connection import Pipe
    pipe_parent, pipe_child = multiprocessing.Pipe(duplex=False)
    
    processList = []
    for i in xrange(4):
        pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), pipe_child))
        pro.start()
        processList.append(pro)
        
    for pro in processList:
        pro.join()      # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程
      
    pipe_child.send(None)   # 此处等全部子进程执行完成后,输入'None'标记,表示Pipe结束接收任务,可以退出
    
    while pipe_parent:
        p_info = pipe_parent.recv()
        print 'pipe get: ', p_info
        
        if not p_info:      # 如果接收到了'None'标记,退出Pipe
            print 'pipe get:  None, then exit out.'
            break
    
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

main pid: 12978, ppid: 6463
pipe send:  Process[Process-3]  hello 3
sub pid: 12986, ppid: 12978
pipe send:  Process[Process-2]  hello 2
sub pid: 12985, ppid: 12978
pipe send:  Process[Process-0]  hello 0
sub pid: 12983, ppid: 12978
pipe send:  Process[Process-1]  hello 1
sub pid: 12984, ppid: 12978
pipe get:  Process[Process-3]  hello 3
pipe get:  Process[Process-2]  hello 2
pipe get:  Process[Process-0]  hello 0
pipe get:  Process[Process-1]  hello 1
pipe get:  None
pipe get:  None, then exit out.
end.

上面代码,有个需要注意的处理技巧:

在 while pipe_parent 等待接收Pipe管道的消息,如果一直没有消息到来,则一直while循环阻塞在这里,主进程(父进程)无法结束。因此,在子进程任务全部执行完毕后,向Pipe管道发送了一条'None'标记:pipe_child.send(None),用于让while循环判断是否结束退出循环(break)

Pipe()返回两个连接类,代表两个方向,如果两个进程在管道的两边同时读或同时写,会有可能造成corruption.

 

多进程之间的同步

Python多进程同步方法有 Lock、Semaphore、Event实例,

1) Lock用来避免访问冲突

2) Semaphore用来控制对共享资源的访问数量

3) Event用来实现进程间同步通信

multiprocessing contains equivalents of all the synchronization primitives from threading.

例如,可以加一个锁,以使某一时刻只有一个进程print

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import threading
import multiprocessing
import time
import os
  

lock = multiprocessing.Lock()
count = 0

def func(name, processName):
    time.sleep(0.5)
    
    # lock同步,使同一时刻两条print连续打印;如果不加lock同步,两条print可能不连续打印(非原子不同)
    lock.acquire()
    global count
    count += 1
    p_info = 'Process[%s]  hello %s, count: %d' % (processName, name, count)
    print p_info
    print 'Process[%s] sub pid: %d, ppid: %d' % (processName, os.getpid(), os.getppid())
    lock.release()
  
  
def main():
    print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    
    global count
    processList = []
    for i in xrange(4):
        pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i)))
        pro.start()
        processList.append(pro)
        
    for pro in processList:
        pro.join()      # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程
      
    print 'main count: %d' % (count,)
    
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

main pid: 14808, ppid: 6463
Process[Process-3]  hello 3, count: 1
Process[Process-3] sub pid: 14817, ppid: 14808
Process[Process-2]  hello 2, count: 1
Process[Process-2] sub pid: 14816, ppid: 14808
Process[Process-0]  hello 0, count: 1
Process[Process-0] sub pid: 14814, ppid: 14808
Process[Process-1]  hello 1, count: 1
Process[Process-1] sub pid: 14815, ppid: 14808
main count: 0
end.

 

进程间共享状态

当然尽最大可能防止使用共享状态,但最终有可能会使用到.

1-共享内存

可以通过使用Value或者Array把数据存储在一个共享的内存表中

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os


def func(name, processName, num, arr):
    time.sleep(0.5)
    
    num.value = int(name) * 2
    for i in xrange(len(arr)):
        arr[i] = arr[i] + 10
    p_info = 'Process[%s]  name %s, num.value: %d' % (processName, name, num.value)
    print p_info
  
  
def main():
    print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    
    # num 和 arr 都是multiprocessing内的变量,共享内存,因此在父进程 - 子进程 - 父进程 内存地址都不变
    num = multiprocessing.Value('d', 0.0)           # 'd' type is double
    arr = multiprocessing.Array('i', range(10))     # 'i' type is int
    print 'main init num: ', num.value
    print 'main init arr: ', arr[:]
    
    processList = []
    for i in xrange(4):
        pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), num, arr))
        pro.start()
        processList.append(pro)
        
    for pro in processList:
        pro.join()      
      
    print 'main result num: ', num.value
    print 'main result arr: ', arr[:]
    
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

main pid: 15914, ppid: 6463
main init num:  0.0
main init arr:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process[Process-0]  name 0, num.value: 0
Process[Process-2]  name 2, num.value: 4
Process[Process-3]  name 3, num.value: 6
Process[Process-1]  name 1, num.value: 2
main result num:  2.0
main result arr:  [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
end.

'd'和'i'参数是num和arr用来设置类型,d表示一个双精浮点类型,i表示一个带符号的整型。

num.value = int(name) * 2      对传入的参数i(也是形参name)乘以2倍

arr[i] = arr[i] + 10   对arr[i]值累加10,并且循环累加4次(因为开启了4个子进程)

更加灵活的共享内存可以使用multiprocessing.sharectypes模块

 

Server process

Manager()返回一个manager类型,控制一个server process,可以允许其它进程通过代理复制一些python objects

例如: 支持list, dict, Namespace, Lock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, Array等。

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os

lock = multiprocessing.Lock()

def func(name, processName, mydict, myarr):
    time.sleep(0.5)
    
    lock.acquire()
    mydict[name] = int(name) * 2
    for i in xrange(len(myarr)):
        myarr[i] = myarr[i] + 100
        
    p_info1 = 'Process[%s]  name %s, mydict: %s' % (processName, name, mydict)
    print p_info1
#     p_info2 = 'Process[%s]  name %s, myarr: %s' % (processName, name, myarr)
#     print p_info2
    lock.release()
  
  
def main():
    print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid())
    
    # 通过manager内的变量同步共享数据,支持 list, dict, Array, Value等python类型
    manager = multiprocessing.Manager()
    mydict = manager.dict()
    myarr = manager.Array('i', range(10))
    print 'main init mydict: ', mydict
#     print 'main init myarr: ', myarr[:]
    
    processList = []
    for i in xrange(4):
        pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), mydict, myarr))
        pro.start()
        processList.append(pro)
        
    for pro in processList:
        pro.join()      
      
    print 'main result mydict: ', mydict
#     print 'main result myarr: ', myarr
#     for i in xrange(len(myarr)):
#         print 'main result print i: %d, value: %d' % (i, myarr[i])
    
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

main pid: 18049, ppid: 6463
main init mydict:  {}
Process[Process-0]  name 0, mydict: {0: 0}
Process[Process-2]  name 2, mydict: {0: 0, 2: 4}
Process[Process-1]  name 1, mydict: {0: 0, 1: 2, 2: 4}
Process[Process-3]  name 3, mydict: {0: 0, 1: 2, 2: 4, 3: 6}
main result mydict:  {0: 0, 1: 2, 2: 4, 3: 6}
end.

上面这段代码,有两个值得注意的地方:

1) 对任务执行函数func(...) 计算和打印需要加锁,保证计算和打印原子输出,否则可能会出现计算和打印结果不一致的情况

2) 上面示例除了演示 manager.dict(),还演示了 manager.Array(),打开注释后可执行运行,查看结

Server process managers比共享内存方法更加的灵活,一个单独的manager可以被同一网络的不同计算机的多个进程共享,但是它也比共享内存更加的缓慢。

 

使用工作池

Pool类代表 a pool of worker processes.

It has methods which allows tasks to be offloaded to the worker processes in a few different ways.

Python多进程之间的同步,除了上述方法Lock,共享内容,Manager代理外,还有Semaphore、Event等。

Python多进程之间的通信,除了上述方法消息队列Queue,管道Pipe外,还有Socket,RPC等。

 

参考推荐:

Python fork多进程之间pipe管道通信

Python 线程池的研究及实现