Python的threading和multiprocessing模块实例
前几篇博客,研究了Python的并发编程,所谓并发无非多线程和多进程,最初找到的是threading模块,因为印象中线程“轻量...”,“切换快...”,“可共享进程资源...”等等,但是没想到这里水很深,进而找到了更好的替代品multiprocessing模块。
测试环境:
Ubuntu 14.04.1 LTS
Python 2.7.6
一、使用threading模块创建线程
1、三种线程创建方式
1)传入一个函数
这种方式是最基本的,即调用threading中的Thread类的构造函数,然后指定参数target=func
再使用返回的Thread的实例调用start()方法,即开始运行该线程,该线程将执行函数func,
当然,如果func需要参数,可以在Thread的构造函数中传入参数args=(...)
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import threading import time # 线程执行任务的函数 def counter(n): count = 0 for i in xrange(n): i += 1 count += i time.sleep(0.01) print 'n : %d, count : %d' % (n, count) def main(): thd = threading.Thread(target=counter, args=(100,)) thd.start() thd.join() # 阻塞主线程,等待子线程先结束 # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
n : 100, count : 5050
end.
上面这段代码很直观,counter函数是一个循环计数任务。
需要注意的是th.join()这句,其意思是主线程将自我阻塞,然后等待子线程thd执行完毕再结束
如果没有这句,运行代码会立即结束,表现为先执行主线程打印"end.",然后等子线程执行完毕打印"n : 100, count : 5050",其输出结果如下:
end.
n : 100, count : 5050
线程join的意思比较晦涩,其实将这句理解成这样会好理解些“while thd.is_alive(): time.sleep(0.01)”。虽然意思相同,但是后面将看到,使用join也有陷阱。
2)传入一个可调用的对象
许多的python 对象都是我们所说的可调用的,即是任何能通过函数操作符“()”来调用的对象(见《python核心编程》第14章)。类的对象也是可以调用的,当被调用时会自动调用对象的内建方法__call__(),因此这种新建线程的方法就是给线程指定一个__call__方法被重载了的对象。
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import threading import time class Callable(object): def __init__(self, func, args): self.__func = func self.__args = args def __call__(self): apply(self.__func, self.__args) # 线程执行任务的函数 def counter(n): count = 0 for i in xrange(n): i += 1 count += i time.sleep(0.01) print 'n : %d, count : %d' % (n, count) def main(): thd = threading.Thread(target=Callable(counter, args=(100,))) thd.start() thd.join() # 阻塞主线程,等待子线程先结束 # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
n : 100, count : 5050
end.
这个例子关键的一句是 apply(self.__func, self.__args) 这里使用初始化时传入的函数对象及其参数来进行一次调用。
3)继承Thread类
这种方式通过继承Thread类,并重载其run方法,来实现自定义的线程行为
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import threading import time class SubThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self, name=name) # 重写父类的run方法 def run(self): time.sleep(0.1) print 'Thread[%s] counter start...' % self.name counter(100) print 'Thread[%s] counter end.' % self.name # 线程执行任务的函数 def counter(n): count = 0 for i in xrange(n): i += 1 count += i print 'n : %d, count : %d' % (n, count) def main(): for i in xrange(4): i += 1 thd = SubThread('thread-' + str(i)) thd.start() thd.join() # 阻塞主线程,等待子线程先结束 # 测试 if __name__ == '__main__': main() print('end.')
运行结果:
Thread[thread-1] counter start...
n : 100, count : 5050
Thread[thread-1] counter end.
Thread[thread-2] counter start...
n : 100, count : 5050
Thread[thread-2] counter end.
Thread[thread-3] counter start...
n : 100, count : 5050
Thread[thread-3] counter end.
Thread[thread-4] counter start...
n : 100, count : 5050
Thread[thread-4] counter end.
end.
这个例子定义了一个SubThread类,它继承了Thread类,并重载了run方法
测试时,循环开启了四个线程调用counter()任务,并打印一些信息,可以看到这种方式比较直观。
注意:在构造函数中要记得先调用父类的构造函数进行初始化,如 threading.Thread.__init__(self, name=name)
上例中,开启了四个线程执行counter()任务函数,且子线程start启动后采用 thd.join() 阻塞等待其执行完毕,因此输出结果是顺序的。
如果子线程start启动后不采用 thd.join() 阻塞方法(注释掉 thd.join() 一行),其输出结果可能不是顺序的,输出结果如下:
end.
Thread[thread-1] counter start...
Thread[thread-2] counter start...
n : 100, count : 5050
Thread[thread-2] counter end.
Thread[thread-3] counter start...
n : 100, count : 5050
Thread[thread-3] counter end.
n : 100, count : 5050
Thread[thread-1] counter end.
Thread[thread-4] counter start...
n : 100, count : 5050
Thread[thread-4] counter end.
2、python多线程的限制
python多线程有一个非常大的限制,全局解释器锁(GIL,Global Interpreter Lock),这个锁的意思是任一时刻只能有一个线程使用解释器,跟单cpu跑多个程序一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!
这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着....,试想你的多个线程中有这么一个计算密集型线程(主要耗CPU资源),得多悲剧,多线程生生被搞成串行。
当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了,所以是否使用这个模块需要考虑面对的任务类型。
二、使用multiprocessing创建进程
1、三种创建方式
进程的创建方式跟线程完全一致,只不过要将threading.Thread换成multiprocessing.Process
multiprocessing模块尽力保持了与threading模块在方法名上的一致性,示例代码可参考上面线程部分的,这里只给出第一种使用函数的方式。
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os # 线程执行任务的函数 def counter(n): count = 0 for i in xrange(n): i += 1 count += i time.sleep(0.01) print 'n : %d, count : %d' % (n, count) time.sleep(10) def main(): print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid()) subPro = multiprocessing.Process(target=counter, args=(100,)) subPro.start() subPro.join() # 阻塞主进程,等待子进程先结束 print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid()) # 测试 if __name__ == '__main__': main() print('end.') time.sleep(20)
运行结果:
main pid : 23969, ppid : 2521
n : 100, count : 5050
subPro pid : 23974, ppid : 23969
end.
上面代码运行时,在子进程和主进程中特意添加了 time.sleep(10) 和 time.sleep(20),目的是通过命令 ps -ef | grep -v grep | grep python 查看真实的进程pid和ppid,本示例如下:
homer@ubuntu:~/workspace/proxy_client$ ps -ef | grep -v grep | grep python
homer 23969 2521 0 17:43 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer 23974 23969 0 17:43 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
从上面结果对比中,可以看到 ps -ef 和 Python代码输出的进程pid和ppid是匹配的。
另外,进程join方法 subPro.join() 的目的和线程一样,都是阻塞父进程来等待子进程执行完毕后,再继续执行父进程后面的代码,具体表现在输出 "end."在子进程执行完毕后才打印出来。
2、创建进程池
该模块还允许一次创建一组进程,然后再给他们分配任务,详细内容可参考手册。
代码示例:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import time import os # 线程执行任务的函数 def counter(n, processName, ppid): count = 0 for i in xrange(n): i += 1 count += i time.sleep(0.01) # sub process pid, ppid print 'sub pid : %d, ppid : %d, from Process[%s]' % (os.getpid(), ppid, processName) print 'Process[%s], n : %d, count : %d' % (processName, n, count) time.sleep(10) return processName + " done." def main(): # main process pid, ppid print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid()) subPool = multiprocessing.Pool(4) for i in xrange(4): processName = 'Process-' + str(i+1) result = subPool.apply_async(counter, args=(100, processName, os.getpid())) print processName + ' - result : ' + str(result.get()) subPool.close() subPool.join() # 测试 if __name__ == '__main__': main() print('end.') time.sleep(20)
运行结果:
main pid : 25895, ppid : 2521
sub pid : 25900, ppid : 25895, from Process[Process-1]
Process[Process-1], n : 100, count : 5050
Process-1 - result : Process-1 done.
sub pid : 25901, ppid : 25895, from Process[Process-2]
Process[Process-2], n : 100, count : 5050
Process-2 - result : Process-2 done.
sub pid : 25902, ppid : 25895, from Process[Process-3]
Process[Process-3], n : 100, count : 5050
Process-3 - result : Process-3 done.
sub pid : 25903, ppid : 25895, from Process[Process-4]
Process[Process-4], n : 100, count : 5050
Process-4 - result : Process-4 done.
end.
通过命令 ps -ef | grep -v grep | grep python 查看真实的进程pid和ppid
homer@ubuntu:~/workspace/proxy_client$ ps -ef | grep -v grep | grep python
homer 25895 2521 1 18:06 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer 25900 25895 0 18:06 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer 25901 25895 0 18:06 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer 25902 25895 0 18:06 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer 25903 25895 0 18:06 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
3、使用进程的好处
完全并行,无GIL的限制,可充分利用多cpu多核的环境;可以接受linux信号,后面将看到这个功能非常好用。
三、实例研究
该实例假想的任务是:
一个主进程会启动多个子进程分别处理不同的任务,各个子进程可能又有自己的线程用于不同的IO处理(前面说过,线程在IO方面还是不错的)
要实现的功能是,对这些子进程发送信号,能被正确的处理,例如:发送信号 SIGTERM,子进程能通知其线程收工,然后“优雅”的退出。
现在要解决的问题有:
1)在子类化的Process对象中如何捕捉信号;
2)如何“优雅的退出”。
下面分别说明
1、子类化Process并捕捉信号,全局函数实现
如果是使用第一种进程创建方式(传入函数),那么捕捉信号很容易,假设给进程运行的函数叫func
代码示例:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import signal import time import os def handler(signum, frame): print 'handler - signal', signum # 线程执行任务的函数 def counter(n, ppid): # main process pid, ppid print 'main pid : %d, ppid : %d' % (os.getpid(), ppid) signal.signal(signal.SIGTERM, handler) signal.signal(signal.SIGINT, handler) count = 0 for i in xrange(n): i += 1 count += i time.sleep(0.01) print 'n : %d, count : %d' % (n, count) time.sleep(15) def main(): # main process pid, ppid print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid()) subPro = multiprocessing.Process(target=counter, args=(100, os.getpid())) subPro.start() # subPro.join() # 阻塞主进程,等待子进程先结束 print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid()) # 测试 if __name__ == '__main__': main() print('end.') time.sleep(20)
运行结果:
main pid : 29426, ppid : 2521
subPro pid : 29431, ppid : 29426
end.
main pid : 29431, ppid : 29426
n : 100, count : 5050
handler - signal 15 # 在控制台输入执行命令 kill -TERM 29431 后,才会输出此行
上面这段代码,是在第一种创建多进程方式的基础上修改而来的,增加了两行signal.signal(...)调用,说明这个函数要捕捉SIGTERM和SIGINT两个信号。
另外增加了一个handler函数,该函数用于捕捉到信号时进行相应的处理,我们这里只是简单的打印出信号值。
注意:
subPro.join() 被注释掉了,这里跟线程的情况有点区别,新的进程启动后就开始运行了,主进程也不用等待它运行完,可以该干嘛干嘛去。
这段代码运行后会打印出子进程的进程id(subPro pid : 29431, ppid : 29426),根据这个id=29431,在控制台终端输入: kill -TERM 29431,会发现在终端打印出了"handler - signal 15 "
但是使用传入函数的方式有一点不好的是封装性太差,如果功能稍微复杂点,将会有很多的全局变量暴露在外,最好还是将功能封装成类,那么使用类又怎么注册信号相应函数呢?
上面的例子貌似只能使用一个全局的函数,手册也没有给出在类中处理信号的例子,其实解决方法大同小异,也很容易,推荐一篇StackOverflow的帖子:Python signal: reading return from signal handler function
子进程捕捉信号(多进程类实现)
代码示例:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import signal import time import os class Master(multiprocessing.Process): def __init__(self, n, ppid): super(Master, self).__init__() signal.signal(signal.SIGTERM, self.handler) # signal self.__n = n self.__ppid = ppid self.__live = 1 # handler signal def handler(self, signum, frame): print 'handler - signal : ', signum self.__live = 0 def run(self): print 'pid : ', self.pid while self.__live: print 'Process[%s] living...' % self.name # subprocess name counter(self.__n, self.__ppid) # task function time.sleep(2) def handler(signum, frame): print 'handler - signal : ', signum # 线程执行任务的函数 def counter(n, ppid): # main process pid, ppid print 'main pid : %d, ppid : %d' % (os.getpid(), ppid) count = 0 for i in xrange(n): i += 1 count += i time.sleep(0.01) print 'n : %d, count : %d' % (n, count) def main(): # main process pid, ppid print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid()) subPro = Master(100, os.getpid()) subPro.start() # subPro.join() # 阻塞主进程,等待子进程先结束 print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid()) # 测试 if __name__ == '__main__': main() print('end.') time.sleep(20)
运行结果:
main pid : 31115, ppid : 2521
subPro pid : 31121, ppid : 31115
end.
pid : 31121
Process[Master-1] living...
main pid : 31121, ppid : 31115
n : 100, count : 5050
Process[Master-1] living...
main pid : 31121, ppid : 31115
n : 100, count : 5050
Process[Master-1] living...
main pid : 31121, ppid : 31115
n : 100, count : 5050
handler - signal : 15 # 在控制台输入执行命令 kill -TERM 29431 后,才会输出此行
上面示例方法很直观,首先在构造函数中注册信号处理函数signal.signal(signal.SIGTERM, self.handler),然后定义了一个方法handler(self, signum, frame)作为信号处理函数。
这个进程类会每隔2秒打印一个“Process[xxx] living...”,当接收到SIGTERM后,改变self.__live的值,run方法的循环在检测到self.__live这个值为0后就结束,子进程也结束了。
2、让进程优雅的退出
下面放出这次的假想任务的全部代码,首先在主进程中启动了一个子进程(通过子类化Process类),然后子进程启动后又产生两个子线程,用来模拟“生产者-消费者”模型,两个线程通过一个队列进行交流,为了互斥访问这个队列,自然要加一把锁(condition对象跟Lock对象差不多,不过多了等待和通知的功能);生产者每次产生一个随机数并扔进队列,然后休息一个随机时间,消费者每次从队列取一个数;而子进程中的主线程要负责接收信号,以便让整个过程优雅的结束。
示例代码:
# !/usr/bin/env python # -*- coding:utf-8 -*- # # Copyright 2015 mimvp.com import multiprocessing import threading import Queue import signal import time import os, random from threading import currentThread class Master(multiprocessing.Process): def __init__(self, n, ppid): super(Master, self).__init__() signal.signal(signal.SIGTERM, self.handler) # signal self.__n = n self.__ppid = ppid # 这个变量要传入线程用于控制线程运行,为什么用dict? # 因为可变对象按引用传递,标量是传值的,不信写成self.live = true 是失败,无法结束子线程 self.__live = {'status' : True} # handler signal def handler(self, signum, frame): print 'handler - signal : ', signum self.__live['status'] = False # 置这个变量为0,通知子线程可以“收工”了 def run(self): print 'pid : ', self.pid lock = threading.Lock() cond = threading.Condition(lock) # 创建一个condition对象,用于子线程交互 qu = Queue.Queue() # 定义一个队列,传入共享资源 sender = Sender(cond, self.__live, qu) getter = Getter(cond, self.__live, qu) sender.start() getter.start() # 主线程睡眠并等待信号,在接收到信号前,后面的代码都不会执行 signal.pause() # 主线程收到信号并被唤醒后,执行下面代码,并检查还有多少线程活着(减1除掉自己) while threading.activeCount() - 1: time.sleep(3) # 进程再睡眠等待,确保子线程都安全的结束 print 'Process[%s] - checking live : %d' % (self.name, threading.activeCount()) print 'sub process end.' # 生产者 class Sender(threading.Thread): def __init__(self, cond, live, queue): super(Sender, self).__init__(name='Thread[sender]') self.__cond = cond self.__queue = queue self.__live = live def run(self): while self.__live['status']: # 检查这个进程内的“全局”变量,为真就继续运行 self.__cond.acquire() num = random.randint(1, 100) self.__queue.put(num, block=False) # 非阻塞获取,默认为True if not self.__queue.full(): print 'Thread[sender] put : ', num self.__cond.notify() # 唤醒等待锁的其他线程 self.__cond.release() time.sleep(random.randint(1,3)) print 'Thread[sender] done' # 消费者 class Getter(threading.Thread): def __init__(self, cond, live, queue): super(Getter, self).__init__(name='Thread[getter]') self.__cond = cond self.__queue = queue self.__live = live def run(self): while self.__live['status']: # 检查这个进程内的“全局”变量,为真就继续运行 self.__cond.acquire() if not self.__queue.empty(): num = self.__queue.get(block=False) # 非阻塞获取,默认为True print 'Thread[getter] get : ', num self.__cond.wait(3) self.__cond.release() time.sleep(random.randint(1,3)) print 'Thread[getter] done' def main(): # main process pid, ppid print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid()) subPro = Master(100, os.getpid()) subPro.start() # 启动子进程 subPro.join() # 阻塞主进程,等待子进程先结束 print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid()) # 测试 if __name__ == '__main__': main() print('end.') time.sleep(20)
运行结果:
main pid : 4041, ppid : 2521
pid : 4046
Thread[sender] put : 99
Thread[getter] get : 99
Thread[sender] put : 64
Thread[sender] put : 27
Thread[sender] put : 38
Thread[getter] get : 64
Thread[sender] put : 19
Thread[getter] get : 27
Thread[sender] put : 62
handler - signal : 15 # 在控制台输入执行命令 kill -TERM 4046 后,才会输出此行
Thread[sender] done
Thread[getter] done
Process[Master-1] - checking live : 1
sub process end.
subPro pid : 4046, ppid : 4041
end.
通过命令 ps -ef | grep -v grep | grep python 查看真实的进程pid和ppid
homer@ubuntu:~/workspace/proxy_client$ ps -ef | grep -v grep | grep python
homer 4041 2521 0 20:31 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer 4046 4041 0 20:31 ? 00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer@ubuntu:~/workspace/proxy_client$ kill -TERM 4046
在控制台执行命令 kill -TERM 4046 后,此时输出"handler - signal : 15",并唤醒通知主线程继续执行 signal.pause() 下面代码。
需要注意的地方是,在Master的run方法中sender.start()和geter.start()之后,按常理应该接着调用sender.join()和geter.join(),让主线程等待子线程结束,前面说的join的陷阱就在这里,join将主线程阻塞(blocking)住了,主线程无法再捕捉信号,刚开始研究这块时还以为信号处理函数写错了。
网上讨论比较少,这里推荐两篇帖子:
Interruptible thread join in Python
Joining threads but allowing signals to main thread?
参考推荐:
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2015-01-14 02:24:54
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!