Python 进程间通信
multiprocessing 是一个使用方法类似threading模块的进程模块,在python 2.6才开始使用。
它允许程序员做并行开发,并且可以在UNIX和Windows下运行。
multiprocessing 使用示例
先通过创建一个multiprocessing.Process来完成一个简单任务的示例,来初步认识。
简单示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os def func(name, processName): print 'Process[%s] hello %s' % (processName, name) print 'sub pid: %d, ppid: %d' % (os.getpid(), os.getppid()) time.sleep(0.1) def main(): print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid()) processList = [] for i in xrange(4): pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i))) pro.start() # pro.join() # 不要在此处阻塞子进程,否则子进程都将是线性顺序执行 processList.append(pro) for pro in processList: pro.join() # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程 # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
main pid: 9405, ppid: 6463
Process[Process-0] hello 0
sub pid: 9410, ppid: 9405
Process[Process-2] hello 2
sub pid: 9412, ppid: 9405
Process[Process-3] hello 3
sub pid: 9413, ppid: 9405
Process[Process-1] hello 1
sub pid: 9411, ppid: 9405
end.
进程之间的通信方式
多进程之间进行通信,常用有两种主要的方式:Queue、Pipe
方式1:Queue 队列
这里需要需要,此处多进程之间通信的Queue来自multiprocessing.Queue(),有别于Queue.Queue()的复制。
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os lock = multiprocessing.Lock() def func(name, processName, queue): p_info = 'Process[%s] hello %s' % (processName, name) queue.put(p_info) print 'queue put: ', p_info print 'sub pid: %d, ppid: %d' % (os.getpid(), os.getppid()) time.sleep(0.5) def main(): print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid()) # 注意: 此处是Queue来自multiprocessing.Queue(), 其来源于 from multiprocessing.queues import Queue # 此处进程的Queue与平时多线程采用的Queue.Queue()是不同的,要进行区分 qu = multiprocessing.Queue() processList = [] for i in xrange(4): pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), qu)) pro.start() processList.append(pro) for pro in processList: pro.join() # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程 print 'queue size: ', qu.qsize() while not qu.empty(): p_info = qu.get(block=False) print 'queue get: ', p_info # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
main pid: 13264, ppid: 6463
queue put: Process[Process-0] hello 0
sub pid: 13269, ppid: 13264
queue put: Process[Process-2] hello 2
sub pid: 13272, ppid: 13264
queue put: Process[Process-1] hello 1
sub pid: 13270, ppid: 13264
queue put: Process[Process-3] hello 3
sub pid: 13273, ppid: 13264
queue size: 4
queue get: Process[Process-0] hello 0
queue get: Process[Process-2] hello 2
queue get: Process[Process-3] hello 3
queue get: Process[Process-1] hello 1
end.
从上面输出结果,可以看出多进程是异步执行的,且从 multiprocessing.Queue() 取出的顺序也可能是异步的。
方式2: Pipe 管道
Pipe可以是单向(half-duplex),也可以是双向(duplex)。通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认duplex=True,为双向通信)
一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os lock = multiprocessing.Lock() def func(name, processName, pipe): p_info = 'Process[%s] hello %s' % (processName, name) print 'pipe send: ', p_info pipe.send(p_info) print 'sub pid: %d, ppid: %d' % (os.getpid(), os.getppid()) time.sleep(0.1) def main(): print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid()) # 注意: 此处是Pipe来自multiprocessing.Pipe(), 其来源于 multiprocessing.connection import Pipe pipe_parent, pipe_child = multiprocessing.Pipe(duplex=False) processList = [] for i in xrange(4): pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), pipe_child)) pro.start() processList.append(pro) for pro in processList: pro.join() # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程 pipe_child.send(None) # 此处等全部子进程执行完成后,输入'None'标记,表示Pipe结束接收任务,可以退出 while pipe_parent: p_info = pipe_parent.recv() print 'pipe get: ', p_info if not p_info: # 如果接收到了'None'标记,退出Pipe print 'pipe get: None, then exit out.' break # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
main pid: 12978, ppid: 6463
pipe send: Process[Process-3] hello 3
sub pid: 12986, ppid: 12978
pipe send: Process[Process-2] hello 2
sub pid: 12985, ppid: 12978
pipe send: Process[Process-0] hello 0
sub pid: 12983, ppid: 12978
pipe send: Process[Process-1] hello 1
sub pid: 12984, ppid: 12978
pipe get: Process[Process-3] hello 3
pipe get: Process[Process-2] hello 2
pipe get: Process[Process-0] hello 0
pipe get: Process[Process-1] hello 1
pipe get: None
pipe get: None, then exit out.
end.
上面代码,有个需要注意的处理技巧:
在 while pipe_parent 等待接收Pipe管道的消息,如果一直没有消息到来,则一直while循环阻塞在这里,主进程(父进程)无法结束。因此,在子进程任务全部执行完毕后,向Pipe管道发送了一条'None'标记:pipe_child.send(None),用于让while循环判断是否结束退出循环(break)
Pipe()返回两个连接类,代表两个方向,如果两个进程在管道的两边同时读或同时写,会有可能造成corruption.
多进程之间的同步
Python多进程同步方法有 Lock、Semaphore、Event实例,
1) Lock用来避免访问冲突
2) Semaphore用来控制对共享资源的访问数量
3) Event用来实现进程间同步通信
multiprocessing contains equivalents of all the synchronization primitives from threading.
例如,可以加一个锁,以使某一时刻只有一个进程print
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import threading import multiprocessing import time import os lock = multiprocessing.Lock() count = 0 def func(name, processName): time.sleep(0.5) # lock同步,使同一时刻两条print连续打印;如果不加lock同步,两条print可能不连续打印(非原子不同) lock.acquire() global count count += 1 p_info = 'Process[%s] hello %s, count: %d' % (processName, name, count) print p_info print 'Process[%s] sub pid: %d, ppid: %d' % (processName, os.getpid(), os.getppid()) lock.release() def main(): print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid()) global count processList = [] for i in xrange(4): pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i))) pro.start() processList.append(pro) for pro in processList: pro.join() # 在此处阻塞子进程,可实现异步执行效果,直至子进程全部完成后再继续执行父进程 print 'main count: %d' % (count,) # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
main pid: 14808, ppid: 6463
Process[Process-3] hello 3, count: 1
Process[Process-3] sub pid: 14817, ppid: 14808
Process[Process-2] hello 2, count: 1
Process[Process-2] sub pid: 14816, ppid: 14808
Process[Process-0] hello 0, count: 1
Process[Process-0] sub pid: 14814, ppid: 14808
Process[Process-1] hello 1, count: 1
Process[Process-1] sub pid: 14815, ppid: 14808
main count: 0
end.
进程间共享状态
当然尽最大可能防止使用共享状态,但最终有可能会使用到.
1-共享内存
可以通过使用Value或者Array把数据存储在一个共享的内存表中
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os def func(name, processName, num, arr): time.sleep(0.5) num.value = int(name) * 2 for i in xrange(len(arr)): arr[i] = arr[i] + 10 p_info = 'Process[%s] name %s, num.value: %d' % (processName, name, num.value) print p_info def main(): print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid()) # num 和 arr 都是multiprocessing内的变量,共享内存,因此在父进程 - 子进程 - 父进程 内存地址都不变 num = multiprocessing.Value('d', 0.0) # 'd' type is double arr = multiprocessing.Array('i', range(10)) # 'i' type is int print 'main init num: ', num.value print 'main init arr: ', arr[:] processList = [] for i in xrange(4): pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), num, arr)) pro.start() processList.append(pro) for pro in processList: pro.join() print 'main result num: ', num.value print 'main result arr: ', arr[:] # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
main pid: 15914, ppid: 6463
main init num: 0.0
main init arr: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process[Process-0] name 0, num.value: 0
Process[Process-2] name 2, num.value: 4
Process[Process-3] name 3, num.value: 6
Process[Process-1] name 1, num.value: 2
main result num: 2.0
main result arr: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
end.
'd'和'i'参数是num和arr用来设置类型,d表示一个双精浮点类型,i表示一个带符号的整型。
num.value = int(name) * 2 对传入的参数i(也是形参name)乘以2倍
arr[i] = arr[i] + 10 对arr[i]值累加10,并且循环累加4次(因为开启了4个子进程)
更加灵活的共享内存可以使用multiprocessing.sharectypes模块
Server process
Manager()返回一个manager类型,控制一个server process,可以允许其它进程通过代理复制一些python objects
例如: 支持list, dict, Namespace, Lock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, Array等。
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os lock = multiprocessing.Lock() def func(name, processName, mydict, myarr): time.sleep(0.5) lock.acquire() mydict[name] = int(name) * 2 for i in xrange(len(myarr)): myarr[i] = myarr[i] + 100 p_info1 = 'Process[%s] name %s, mydict: %s' % (processName, name, mydict) print p_info1 # p_info2 = 'Process[%s] name %s, myarr: %s' % (processName, name, myarr) # print p_info2 lock.release() def main(): print 'main pid: %d, ppid: %d' % (os.getpid(), os.getppid()) # 通过manager内的变量同步共享数据,支持 list, dict, Array, Value等python类型 manager = multiprocessing.Manager() mydict = manager.dict() myarr = manager.Array('i', range(10)) print 'main init mydict: ', mydict # print 'main init myarr: ', myarr[:] processList = [] for i in xrange(4): pro = multiprocessing.Process(target=func, args=(i, 'Process-' + str(i), mydict, myarr)) pro.start() processList.append(pro) for pro in processList: pro.join() print 'main result mydict: ', mydict # print 'main result myarr: ', myarr # for i in xrange(len(myarr)): # print 'main result print i: %d, value: %d' % (i, myarr[i]) # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
main pid: 18049, ppid: 6463
main init mydict: {}
Process[Process-0] name 0, mydict: {0: 0}
Process[Process-2] name 2, mydict: {0: 0, 2: 4}
Process[Process-1] name 1, mydict: {0: 0, 1: 2, 2: 4}
Process[Process-3] name 3, mydict: {0: 0, 1: 2, 2: 4, 3: 6}
main result mydict: {0: 0, 1: 2, 2: 4, 3: 6}
end.
上面这段代码,有两个值得注意的地方:
1) 对任务执行函数func(...) 计算和打印需要加锁,保证计算和打印原子输出,否则可能会出现计算和打印结果不一致的情况
2) 上面示例除了演示 manager.dict(),还演示了 manager.Array(),打开注释后可执行运行,查看结
Server process managers比共享内存方法更加的灵活,一个单独的manager可以被同一网络的不同计算机的多个进程共享,但是它也比共享内存更加的缓慢。
使用工作池
Pool类代表 a pool of worker processes.
It has methods which allows tasks to be offloaded to the worker processes in a few different ways.
Python多进程之间的同步,除了上述方法Lock,共享内容,Manager代理外,还有Semaphore、Event等。
Python多进程之间的通信,除了上述方法消息队列Queue,管道Pipe外,还有Socket,RPC等。
参考推荐:
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2015-01-23 02:20:46
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!
转载注明: Python 进程间通信 (米扑博客)