前几篇博客,研究了Python的并发编程,所谓并发无非多线程和多进程,最初找到的是threading模块,因为印象中线程“轻量...”,“切换快...”,“可共享进程资源...”等等,但是没想到这里水很深,进而找到了更好的替代品multiprocessing模块。

测试环境:

Ubuntu 14.04.1 LTS       

Python 2.7.6

 

一、使用threading模块创建线程

1、三种线程创建方式

1)传入一个函数

这种方式是最基本的,即调用threading中的Thread类的构造函数,然后指定参数target=func

再使用返回的Thread的实例调用start()方法,即开始运行该线程,该线程将执行函数func,

当然,如果func需要参数,可以在Thread的构造函数中传入参数args=(...)

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import threading
import time
  
# 线程执行任务的函数
def counter(n):
    count = 0
    for i in xrange(n):
        i += 1
        count += i
        time.sleep(0.01)
    print 'n : %d, count : %d' % (n, count)
  
  
def main():
    thd = threading.Thread(target=counter, args=(100,))
    thd.start()
    thd.join()      # 阻塞主线程,等待子线程先结束
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

n : 100, count : 5050
end.

上面这段代码很直观,counter函数是一个循环计数任务。

需要注意的是th.join()这句,其意思是主线程将自我阻塞,然后等待子线程thd执行完毕再结束

如果没有这句,运行代码会立即结束,表现为先执行主线程打印"end.",然后等子线程执行完毕打印"n : 100, count : 5050",其输出结果如下:

end.
n : 100, count : 5050

线程join的意思比较晦涩,其实将这句理解成这样会好理解些“while thd.is_alive(): time.sleep(0.01)”。虽然意思相同,但是后面将看到,使用join也有陷阱。

 

2)传入一个可调用的对象

许多的python 对象都是我们所说的可调用的,即是任何能通过函数操作符“()”来调用的对象(见《python核心编程》第14章)类的对象也是可以调用的,当被调用时会自动调用对象的内建方法__call__(),因此这种新建线程的方法就是给线程指定一个__call__方法被重载了的对象。

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import threading
import time
  
class Callable(object):
    def __init__(self, func, args):
        self.__func = func
        self.__args = args
        
    def __call__(self):
        apply(self.__func, self.__args)
  
  
# 线程执行任务的函数
def counter(n):
    count = 0
    for i in xrange(n):
        i += 1
        count += i
        time.sleep(0.01)
    print 'n : %d, count : %d' % (n, count)
  
  
def main():
    thd = threading.Thread(target=Callable(counter, args=(100,)))
    thd.start()
    thd.join()      # 阻塞主线程,等待子线程先结束
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

n : 100, count : 5050
end.

这个例子关键的一句是 apply(self.__func, self.__args) 这里使用初始化时传入的函数对象及其参数来进行一次调用。

 

3)继承Thread类

这种方式通过继承Thread类,并重载其run方法,来实现自定义的线程行为

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import threading
import time
  
class SubThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self, name=name)
        
    # 重写父类的run方法
    def run(self):
        time.sleep(0.1)
        print 'Thread[%s] counter start...' % self.name
        counter(100)
        print 'Thread[%s] counter end.' % self.name
  
# 线程执行任务的函数
def counter(n):
    count = 0
    for i in xrange(n):
        i += 1
        count += i
    print 'n : %d, count : %d' % (n, count)
  
  
def main():
    for i in xrange(4):
        i += 1
        thd = SubThread('thread-' + str(i))
        thd.start()
        thd.join()      # 阻塞主线程,等待子线程先结束
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')

运行结果:

Thread[thread-1] counter start...
n : 100, count : 5050
Thread[thread-1] counter end.
Thread[thread-2] counter start...
n : 100, count : 5050
Thread[thread-2] counter end.
Thread[thread-3] counter start...
n : 100, count : 5050
Thread[thread-3] counter end.
Thread[thread-4] counter start...
n : 100, count : 5050
Thread[thread-4] counter end.
end.

这个例子定义了一个SubThread类,它继承了Thread类,并重载了run方法

测试时,循环开启了四个线程调用counter()任务,并打印一些信息,可以看到这种方式比较直观。

注意:在构造函数中要记得先调用父类的构造函数进行初始化,如 threading.Thread.__init__(self, name=name)

上例中,开启了四个线程执行counter()任务函数,且子线程start启动后采用 thd.join() 阻塞等待其执行完毕,因此输出结果是顺序的。

如果子线程start启动后不采用 thd.join() 阻塞方法(注释掉 thd.join() 一行),其输出结果可能不是顺序的,输出结果如下:

end.
Thread[thread-1] counter start...
Thread[thread-2] counter start...
n : 100, count : 5050
Thread[thread-2] counter end.
Thread[thread-3] counter start...
n : 100, count : 5050
Thread[thread-3] counter end.
n : 100, count : 5050
Thread[thread-1] counter end.
Thread[thread-4] counter start...
n : 100, count : 5050
Thread[thread-4] counter end.

 

2、python多线程的限制

python多线程有一个非常大的限制,全局解释器锁(GIL,Global Interpreter Lock),这个锁的意思是任一时刻只能有一个线程使用解释器,跟单cpu跑多个程序一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!

这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着....,试想你的多个线程中有这么一个计算密集型线程(主要耗CPU资源),得多悲剧,多线程生生被搞成串行。

当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了,所以是否使用这个模块需要考虑面对的任务类型

 

二、使用multiprocessing创建进程

1、三种创建方式

进程的创建方式跟线程完全一致,只不过要将threading.Thread换成multiprocessing.Process

multiprocessing模块尽力保持了与threading模块在方法名上的一致性,示例代码可参考上面线程部分的,这里只给出第一种使用函数的方式。

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os
  
# 线程执行任务的函数
def counter(n):
    count = 0
    for i in xrange(n):
        i += 1
        count += i
        time.sleep(0.01)
    print 'n : %d, count : %d' % (n, count)
    time.sleep(10)
  
  
def main():
    print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid())
    subPro = multiprocessing.Process(target=counter, args=(100,))
    subPro.start()
    subPro.join()      # 阻塞主进程,等待子进程先结束
    print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid())
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')
    time.sleep(20)

运行结果:

main pid : 23969, ppid : 2521
n : 100, count : 5050
subPro pid : 23974, ppid : 23969
end.

上面代码运行时,在子进程和主进程中特意添加了 time.sleep(10) 和 time.sleep(20),目的是通过命令 ps -ef | grep -v grep | grep python 查看真实的进程pid和ppid,本示例如下:

homer@ubuntu:~/workspace/proxy_client$ ps -ef | grep -v grep | grep python
homer    23969  2521  0 17:43 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer  
 23974 23969  0 17:43 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py

从上面结果对比中,可以看到 ps -ef 和 Python代码输出的进程pid和ppid是匹配的。

另外,进程join方法 subPro.join() 的目的和线程一样,都是阻塞父进程来等待子进程执行完毕后,再继续执行父进程后面的代码,具体表现在输出 "end."在子进程执行完毕后才打印出来。

 

2、创建进程池

该模块还允许一次创建一组进程,然后再给他们分配任务,详细内容可参考手册。

代码示例:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import time
import os
  
# 线程执行任务的函数
def counter(n, processName, ppid):
    count = 0
    for i in xrange(n):
        i += 1
        count += i
        time.sleep(0.01)
    
    # sub process pid, ppid
    print 'sub pid : %d, ppid : %d, from Process[%s]' % (os.getpid(), ppid, processName)
    print 'Process[%s], n : %d, count : %d' % (processName, n, count)
    time.sleep(10)
    
    return processName + " done."
  
  
def main():
    # main process pid, ppid
    print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid())
    subPool = multiprocessing.Pool(4)
    for i in xrange(4):
        processName = 'Process-' + str(i+1)
        result = subPool.apply_async(counter, args=(100, processName, os.getpid()))
        print processName + ' - result : ' + str(result.get())
    subPool.close()
    subPool.join()
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')
    time.sleep(20)

运行结果:

main pid : 25895, ppid : 2521
sub pid : 25900, ppid : 25895, from Process[Process-1]
Process[Process-1], n : 100, count : 5050
Process-1 - result : Process-1 done.
sub pid : 25901, ppid : 25895, from Process[Process-2]
Process[Process-2], n : 100, count : 5050
Process-2 - result : Process-2 done.
sub pid : 25902, ppid : 25895, from Process[Process-3]
Process[Process-3], n : 100, count : 5050
Process-3 - result : Process-3 done.
sub pid : 25903, ppid : 25895, from Process[Process-4]
Process[Process-4], n : 100, count : 5050
Process-4 - result : Process-4 done.
end.

通过命令 ps -ef | grep -v grep | grep python 查看真实的进程pid和ppid

homer@ubuntu:~/workspace/proxy_client$ ps -ef | grep -v grep | grep python
homer    25895  2521  1 18:06 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer    25900 25895  0 18:06 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer    25901 25895  0 18:06 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer    25902 25895  0 18:06 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer    25903 25895  0 18:06 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py

 

3、使用进程的好处

完全并行,无GIL的限制,可充分利用多cpu多核的环境;可以接受linux信号,后面将看到这个功能非常好用。

 

三、实例研究

该实例假想的任务是:

一个主进程会启动多个子进程分别处理不同的任务,各个子进程可能又有自己的线程用于不同的IO处理(前面说过,线程在IO方面还是不错的)

要实现的功能是,对这些子进程发送信号,能被正确的处理,例如:发送信号 SIGTERM,子进程能通知其线程收工,然后“优雅”的退出。

现在要解决的问题有:

1)在子类化的Process对象中如何捕捉信号;

2)如何“优雅的退出”。

 

下面分别说明

1、子类化Process并捕捉信号,全局函数实现

如果是使用第一种进程创建方式(传入函数),那么捕捉信号很容易,假设给进程运行的函数叫func

代码示例:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import signal
import time
import os
  
def handler(signum, frame):
    print 'handler - signal', signum
  
# 线程执行任务的函数
def counter(n, ppid):
    # main process pid, ppid
    print 'main pid : %d, ppid : %d' % (os.getpid(), ppid)
    
    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)
    
    count = 0
    for i in xrange(n):
        i += 1
        count += i
        time.sleep(0.01)
    print 'n : %d, count : %d' % (n, count)
    time.sleep(15)
  
  
def main():
    # main process pid, ppid
    print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid())
    
    subPro = multiprocessing.Process(target=counter, args=(100, os.getpid()))
    subPro.start()
#     subPro.join()      # 阻塞主进程,等待子进程先结束
    print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid())
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')
    time.sleep(20)

运行结果:

main pid : 29426, ppid : 2521
subPro pid : 29431, ppid : 29426
end.
main pid : 29431, ppid : 29426
n : 100, count : 5050
handler - signal 15                            
#  在控制台输入执行命令 kill -TERM 29431 后,才会输出此行

上面这段代码,是在第一种创建多进程方式的基础上修改而来的,增加了两行signal.signal(...)调用,说明这个函数要捕捉SIGTERM和SIGINT两个信号。

另外增加了一个handler函数,该函数用于捕捉到信号时进行相应的处理,我们这里只是简单的打印出信号值。

注意:

subPro.join() 被注释掉了,这里跟线程的情况有点区别,新的进程启动后就开始运行了,主进程也不用等待它运行完,可以该干嘛干嘛去。

这段代码运行后会打印出子进程的进程id(subPro pid : 29431, ppid : 29426),根据这个id=29431,在控制台终端输入: kill -TERM 29431,会发现在终端打印出了"handler - signal 15 "

 

但是使用传入函数的方式有一点不好的是封装性太差,如果功能稍微复杂点,将会有很多的全局变量暴露在外,最好还是将功能封装成类,那么使用类又怎么注册信号相应函数呢?

上面的例子貌似只能使用一个全局的函数,手册也没有给出在类中处理信号的例子,其实解决方法大同小异,也很容易,推荐一篇StackOverflow的帖子:Python signal: reading return from signal handler function

 

子进程捕捉信号(多进程类实现)

代码示例:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import signal
import time
import os
  
class Master(multiprocessing.Process):
    def __init__(self, n, ppid):
        super(Master, self).__init__()
        signal.signal(signal.SIGTERM, self.handler)     # signal
        self.__n = n
        self.__ppid = ppid
        self.__live = 1
        
    # handler signal
    def handler(self, signum, frame):
        print 'handler - signal : ', signum
        self.__live = 0
        
    def run(self):
        print 'pid : ', self.pid
        while self.__live:
            print 'Process[%s] living...' % self.name       # subprocess name
            counter(self.__n, self.__ppid)                  # task function
            time.sleep(2)
        
  
def handler(signum, frame):
    print 'handler - signal : ', signum
  
# 线程执行任务的函数
def counter(n, ppid):
    # main process pid, ppid
    print 'main pid : %d, ppid : %d' % (os.getpid(), ppid)
    
    count = 0
    for i in xrange(n):
        i += 1
        count += i
        time.sleep(0.01)
    print 'n : %d, count : %d' % (n, count)
  
  
def main():
    # main process pid, ppid
    print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid())
    
    subPro = Master(100, os.getpid())
    subPro.start()
#     subPro.join()      # 阻塞主进程,等待子进程先结束
    print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid())
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')
    time.sleep(20)

运行结果:

main pid : 31115, ppid : 2521
subPro pid : 31121, ppid : 31115
end.
pid :  31121
Process[Master-1] living...
main pid : 31121, ppid : 31115
n : 100, count : 5050
Process[Master-1] living...
main pid : 31121, ppid : 31115
n : 100, count : 5050
Process[Master-1] living...
main pid : 31121, ppid : 31115
n : 100, count : 5050
handler - signal :  15
                    #  在控制台输入执行命令 kill -TERM 29431 后,才会输出此行

上面示例方法很直观,首先在构造函数中注册信号处理函数signal.signal(signal.SIGTERM, self.handler),然后定义了一个方法handler(self, signum, frame)作为信号处理函数。

这个进程类会每隔2秒打印一个“Process[xxx] living...”,当接收到SIGTERM后,改变self.__live的值,run方法的循环在检测到self.__live这个值为0后就结束,子进程也结束了。

 

2、让进程优雅的退出

下面放出这次的假想任务的全部代码,首先在主进程中启动了一个子进程(通过子类化Process类),然后子进程启动后又产生两个子线程,用来模拟“生产者-消费者”模型,两个线程通过一个队列进行交流,为了互斥访问这个队列,自然要加一把锁(condition对象跟Lock对象差不多,不过多了等待和通知的功能);生产者每次产生一个随机数并扔进队列,然后休息一个随机时间,消费者每次从队列取一个数;而子进程中的主线程要负责接收信号,以便让整个过程优雅的结束。

示例代码:

# !/usr/bin/env python  
# -*- coding:utf-8 -*-  
#
# Copyright 2015 mimvp.com
  
import multiprocessing
import threading
import Queue
import signal
import time
import os, random
from threading import currentThread
  
class Master(multiprocessing.Process):
    def __init__(self, n, ppid):
        super(Master, self).__init__()
        signal.signal(signal.SIGTERM, self.handler)     # signal
        self.__n = n
        self.__ppid = ppid
        
        # 这个变量要传入线程用于控制线程运行,为什么用dict?
        # 因为可变对象按引用传递,标量是传值的,不信写成self.live = true 是失败,无法结束子线程
        self.__live = {'status' : True}
        
    # handler signal
    def handler(self, signum, frame):
        print 'handler - signal : ', signum
        self.__live['status'] = False       # 置这个变量为0,通知子线程可以“收工”了
        
    def run(self):
        print 'pid : ', self.pid
        
        lock = threading.Lock()
        cond = threading.Condition(lock)    # 创建一个condition对象,用于子线程交互
        qu = Queue.Queue()                  # 定义一个队列,传入共享资源
        
        sender = Sender(cond, self.__live, qu)
        getter = Getter(cond, self.__live, qu)
        sender.start()
        getter.start()
            
        # 主线程睡眠并等待信号,在接收到信号前,后面的代码都不会执行
        signal.pause()
        
        # 主线程收到信号并被唤醒后,执行下面代码,并检查还有多少线程活着(减1除掉自己)
        while threading.activeCount() - 1:  
            time.sleep(3)                   # 进程再睡眠等待,确保子线程都安全的结束
            print 'Process[%s] - checking live : %d' % (self.name, threading.activeCount())
        print 'sub process end.'
        
# 生产者
class Sender(threading.Thread):
    def __init__(self, cond, live, queue):
        super(Sender, self).__init__(name='Thread[sender]')
        self.__cond = cond
        self.__queue = queue
        self.__live = live
        
    def run(self):
        while self.__live['status']:      # 检查这个进程内的“全局”变量,为真就继续运行
            self.__cond.acquire()
            num = random.randint(1, 100)
            self.__queue.put(num, block=False)   # 非阻塞获取,默认为True
            if not self.__queue.full():
                print 'Thread[sender] put : ', num
            self.__cond.notify()    # 唤醒等待锁的其他线程
            self.__cond.release()
            time.sleep(random.randint(1,3))
        print 'Thread[sender] done'
  
  
# 消费者
class Getter(threading.Thread):
    def __init__(self, cond, live, queue):
        super(Getter, self).__init__(name='Thread[getter]')
        self.__cond = cond
        self.__queue = queue
        self.__live = live
        
    def run(self):
        while self.__live['status']:      # 检查这个进程内的“全局”变量,为真就继续运行
            self.__cond.acquire()
            if not self.__queue.empty():
                num = self.__queue.get(block=False)     # 非阻塞获取,默认为True
                print 'Thread[getter] get : ', num
            self.__cond.wait(3)
            self.__cond.release()
            time.sleep(random.randint(1,3))
        print 'Thread[getter] done'
  
  
def main():
    # main process pid, ppid
    print 'main pid : %d, ppid : %d' % (os.getpid(), os.getppid())
    
    subPro = Master(100, os.getpid())
    subPro.start()      # 启动子进程  
    subPro.join()       # 阻塞主进程,等待子进程先结束
    print 'subPro pid : %d, ppid : %d' % (subPro.pid, os.getpid())
    
      
# 测试
if __name__ == '__main__':  
    main()
    print('end.')
    time.sleep(20)

运行结果:

main pid : 4041, ppid : 2521
pid :  4046
Thread[sender] put :  99
Thread[getter] get :  99
Thread[sender] put :  64
Thread[sender] put :  27
Thread[sender] put :  38
Thread[getter] get :  64
Thread[sender] put :  19
Thread[getter] get :  27
Thread[sender] put :  62
handler - signal :  15                
     #  在控制台输入执行命令 kill -TERM 4046 后,才会输出此行
Thread[sender] done
Thread[getter] done
Process[Master-1] - checking live : 1
sub process end.
subPro pid : 4046, ppid : 4041
end.

通过命令 ps -ef | grep -v grep | grep python 查看真实的进程pid和ppid

homer@ubuntu:~/workspace/proxy_client$ ps -ef | grep -v grep | grep python 
homer     4041  2521  0 20:31 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer     4046  4041  0 20:31 ?        00:00:00 /usr/bin/python2.7 -u /home/homer/test_processpool.py
homer@ubuntu:~/workspace/proxy_client$
kill -TERM 4046

 

在控制台执行命令 kill -TERM 4046 后,此时输出"handler - signal :  15",并唤醒通知主线程继续执行 signal.pause() 下面代码。

需要注意的地方是,在Master的run方法中sender.start()和geter.start()之后,按常理应该接着调用sender.join()和geter.join(),让主线程等待子线程结束,前面说的join的陷阱就在这里,join将主线程阻塞(blocking)住了,主线程无法再捕捉信号,刚开始研究这块时还以为信号处理函数写错了。

网上讨论比较少,这里推荐两篇帖子:

Interruptible thread join in Python

Joining threads but allowing signals to main thread?

 

参考推荐:

Python multiprocessing多进程并发

Python 线程池的研究及实现