Python 多线程解析
线程模块
Python 通过两个标准库 thread 和 threading 提供对线程的支持。Python 的 thread 模块是比较底层的模块,Python 的 threading 模块是对 thread 做了一些包装的,可以更加方便的被使用。
thread 模块提供的其他方法:
-
start_new_thread(function,args,kwargs=None)
:生一个新线程,在新线程中用指定参数和可选的 kwargs 调用 function 函数 -
allocate_lock()
:分配一个 LockType 类型的锁对象(注意,此时还没有获得锁) -
interrupt_main()
:在其他线程中终止主线程 -
get_ident()
:获得一个代表当前线程的魔法数字,常用于从一个字典中获得线程相关的数据。这个数字本身没有任何含义,并且当线程结束后会被新线程复用 -
exit()
:退出线程
LockType 类型锁对象的函数:
-
acquire(wait=None)
:尝试获取锁对象 -
locked()
:如果获得了锁对象返回 True,否则返回 False -
release()
:释放锁
thread 还提供了一个 ThreadLocal 类用于管理线程相关的数据,名为 thread._local
,threading 中引用了这个类。
由于 thread 提供的线程功能不多,无法在主线程结束后继续运行,不提供条件变量等等原因,一般不使用 thread 模块。
threading 模块提供的其他方法:
-
threading.currentThread()
:返回当前的线程变量。 -
threading.enumerate()
:返回一个包含正在运行的线程的 list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 -
threading.activeCount()
:返回正在运行的线程数量,与len(threading.enumerate())
有相同的结果。
除了使用方法外,线程模块同样提供了 Thread 类来处理线程,构造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
参数说明:
-
group
:线程组,目前还没有实现,库引用中提示必须是 None; -
target
:要执行的方法; -
name
:线程名; -
args/kwargs
:要传入方法的参数。
Thread 类提供了以下实例方法:
-
run()
:用以表示线程活动的方法。 -
start()
:启动线程活动。 -
join([timeout])
:阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的 timeout(可选参数)。 -
isAlive()
:返回线程是否活动的。 -
getName()
:返回线程名。 -
setName()
:设置线程名。
threading 模块提供的类:Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
使用 thread 模块创建线程
函数式: 调用 thread 模块中的 start_new_thread()
函数来产生新线程。语法如下:
thread.start_new_thread ( function, args[, kwargs] )
参数说明:
-
function
- 线程函数。 -
args
- 传递给线程函数的参数,他必须是个 tuple 类型。 -
kwargs
- 可选参数。
例子1:
# -* - coding: UTF-8 -* -
import thread
import time
# 为线程定义一个函数
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print "%s: %s" % ( threadName, time.ctime(time.time()) )
# 创建两个线程
try:
thread.start_new_thread( print_time, ("Thread-1", 1, ) )
thread.start_new_thread( print_time, ("Thread-2", 2, ) )
except:
print "Error: unable to start thread"
while 1:
pass
输出结果如下:
Thread-1: Tue Dec 23 14:54:51 2014
Thread-2: Tue Dec 23 14:54:52 2014
Thread-1: Tue Dec 23 14:54:52 2014
Thread-1: Tue Dec 23 14:54:53 2014
Thread-2: Tue Dec 23 14:54:54 2014
Thread-1: Tue Dec 23 14:54:54 2014
Thread-1: Tue Dec 23 14:54:55 2014
Thread-2: Tue Dec 23 14:54:56 2014
Thread-2: Tue Dec 23 14:54:58 2014
Thread-2: Tue Dec 23 14:55:00 2014
主线程没有结束,是因为有个 while 循环一直在执行 pass 语句,导致程序一直没有退出。如果想要主线程主动结束,可以在线程函数中调用 thread.exit()
,他抛出SystemExit exception,达到退出线程的目的。
使用 Threading 模块创建线程
使用 Threading 模块创建线程,直接从 threading.Thread
继承,然后重写 __init__
方法和run
方法。
例子2:
# -* - coding: UTF-8 -* -
#!/usr/bin/python
from threading import Thread
import time
#继承父类threading.Thread
class myThread(Thread):
def __init__(self, name, delay):
Thread.__init__(self)
self.name = name
self.delay = delay
def run(self): #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
print "Starting " + self.name
print_time(self.name, self.delay, 5)
print "Exiting " + self.name
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
# 创建新线程
thread1 = myThread("Thread-1", 1)
thread2 = myThread("Thread-2", 2)
# 开启线程
thread1.start()
thread2.start()
print "Exiting Main Thread"
执行结果如下:
Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-3 processing One
Thread-2 processing Two
Thread-3 processing Three
Thread-1 processing Four
Thread-2 processing Five
Exiting Thread-3
Exiting Thread-1
Exiting Thread-2
Exiting Main Thread
线程同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用 Thread 对象的 Lock
和 Rlock
可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。
Lock(指令锁)是可用的最低级的同步指令。Lock 处于锁定状态时,不被特定的线程拥有。Lock 包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为 Lock 有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
构造方法:
Lock()
实例方法:
-
acquire([timeout])
:使线程进入同步阻塞状态,尝试获得锁定。 -
release()
:释放锁。使用前线程必须已获得锁定,否则将抛出异常。
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock 使用了拥有的线程和递归等级的概念,处于锁定状态时,RLock 被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用 release() 相同次数。
可以认为 RLock 包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release()
,计数器将 +1/-1,为0时锁处于未锁定状态。
例子3:
# -* - coding: UTF-8 -* -
#!/usr/bin/python
from threading import Thread,Lock
import time
threadLock = Lock()
class myThread (Thread):
def __init__(self, name, delay):
Thread.__init__(self)
self.name = name
self.delay = delay
def run(self):
print "Starting " + self.name
# 获得锁,成功获得锁定后返回True
# 可选的timeout参数不填时将一直阻塞直到获得锁定
# 否则超时后将返回False
threadLock.acquire()
print_time(self.name, self.delay, 3)
# 释放锁
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print "%s: %s" % (threadName, time.ctime(time.time()))
counter -= 1
# 创建新线程
thread1 = myThread( "Thread-1", 1)
thread2 = myThread("Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 等待所有线程完成
thread1.join()
thread2.join()
print "Exiting Main Thread"
运行结果:
Starting Thread-1
Starting Thread-2
Thread-1: Tue Dec 23 16:06:32 2014
Thread-1: Tue Dec 23 16:06:33 2014
Thread-1: Tue Dec 23 16:06:34 2014
Thread-2: Tue Dec 23 16:06:36 2014
Thread-2: Tue Dec 23 16:06:38 2014
Thread-2: Tue Dec 23 16:06:40 2014
Exiting Main Thread
例子3和例子2的区别在于,例子上中 print_time
方法前后添加了 threadLock 的两个方法,并且在主线程调用了两个线程的 join 方法,
使得主线程阻塞直到两个子线程运行完成。待子线程运行完成之后,最后才会打印 Exiting Main Thread
,即表示主线程运行完成。
除了使用 Lock 类获取锁之外,我们还可以使用 Condition 类,condition 的 acquire() 和 release() 方法内部调用了 lock 的 acquire() 和 release(),所以我们可以用 condiction 实例取代 lock 实例,但 lock 的行为不会改变。
线程优先级队列
Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括 FIFO(先入先出)队列 Queue,LIFO(后入先出)队列 LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
Queue模块中的常用方法:
-
Queue.qsize()
:返回队列的大小 -
Queue.empty()
:如果队列为空,返回 True,反之 False -
Queue.full()
:如果队列满了,返回 True,反之 False -
Queue.full
:与 maxsize 大小对应 -
Queue.get([block[, timeout]])
:获取队列,timeout 等待时间 -
Queue.get_nowait()
:相当Queue.get(False)
-
Queue.put(item)
:写入队列,timeout 等待时间 -
Queue.put_nowait(item)
:相当Queue.put(item, False)
-
Queue.task_done()
:在完成一项工作之后,Queue.task_done()
函数向任务已经完成的队列发送一个信号 -
Queue.join()
:实际上意味着等到队列为空,再执行别的操作
实例4:
# -* - coding: UTF-8 -* -
#!/usr/bin/python
from threading import Thread,Lock
import Queue
import time
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
workQueue = Queue.Queue(10)
queueLock = Lock()
threads = []
exitFlag = 0
class myThread (Thread):
def __init__(self, name, q):
Thread.__init__(self)
self.name = name
self.q = q
def run(self):
print "Starting " + self.name
process_data(self.name, self.q)
print "Exiting " + self.name
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print "%s processing %s" % (threadName, data)
else:
queueLock.release()
time.sleep(1)
# 创建新线程
for tName in threadList:
thread = myThread(tName, workQueue)
thread.start()
threads.append(thread)
# 填充队列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待队列清空
while not workQueue.empty():
pass
# 通知线程是时候退出
exitFlag = 1
# 等待所有线程完成
for t in threads:
t.join()
print "Exiting Main Thread"
例子4中创建了3个线程读取队列的数据,当队列为空时候,三个线程停止运行,另外主线程会一直阻塞直到三个子线程运行完毕,最后再打印 "Exiting Main Thread"。
生产者和消费者模型
生产者的工作是产生一块数据,放到 buffer 中,如此循环。与此同时,消费者在消耗这些数据(例如从 buffer 中把它们移除),每次一块。
这个为描述了两个共享固定大小缓冲队列的进程,即生产者和消费者。
示例5:
from threading import Thread, Condition
import time
import random
queue = []
MAX_NUM = 10
condition = Condition()
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
condition.acquire()
if len(queue) == MAX_NUM:
print "Queue full, producer is waiting"
condition.wait()
print "Space in queue, Consumer notified the producer"
num = random.choice(nums)
queue.append(num)
print "Produced", num
condition.notify()
condition.release()
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
condition.acquire()
if not queue:
print "Nothing in queue, consumer is waiting"
condition.wait()
print "Producer added something to queue and notified the consumer"
num = queue.pop(0)
print "Consumed", num
condition.notify()
condition.release()
time.sleep(random.random())
ProducerThread().start()
ConsumerThread().start()
上面例子中使用了 Condition,Condition(条件变量)通常与一个锁关联。需要在多个 Contidion 中共享一个锁时,可以传递一个 Lock/RLock 实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,除了 Lock 带有的锁定池外,Condition 还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用 notify()/notifyAll() 通知;得到通知后线程进入锁定池等待锁定。
构造方法:
Condition([lock/rlock])
实例方法:
-
acquire([timeout])/release()
:调用关联的锁的相应方法。 -
wait([timeout])
:调用这个方法将使线程进入 Condition 的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 -
notify()
:调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用 acquire() 尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 -
notifyAll()
:调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
例子5中生产者和消费者共享一个 list 集合,其实也可以换成 queue。
例子6:
from threading import Thread
import time
import random
from Queue import Queue
queue = Queue(3)
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
num = random.choice(nums)
queue.put(num)
print "Produced", num
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
num = queue.get()
queue.task_done()
print "Consumed", num
time.sleep(random.random())
ProducerThread().start()
ConsumerThread().start()
解释:
- 在原来使用 list 的位置,改为使用 Queue 实例(下称队列)。
- 这个队列有一个 condition ,它有自己的 lock。如果你使用 Queue,你不需要为 condition 和 lock 而烦恼。
- 生产者调用队列的 put 方法来插入数据。
- put() 在插入数据前有一个获取 lock 的逻辑。
- 同时,put() 也会检查队列是否已满。如果已满,它会在内部调用 wait(),生产者开始等待。
- 消费者使用get方法。
- get() 从队列中移出数据前会获取 lock。
- get() 会检查队列是否为空,如果为空,消费者进入等待状态。
- get() 和 put() 都有适当的 notify()。
总结
本文主要介绍了 Python 中创建多线程的两种方法,并简单说了 threading 模块中的 Lock、RLock、Condition 三个类以及 Queue 类的使用方法,另外,还通过代码实现了生产者和消费者模型。
参考文章
原文: JavaChen's Blog
版权所有: 本文系米扑博客原创、转载、摘录,或修订后发表,最后更新于 2014-12-25 10:39:40
侵权处理: 本个人博客,不盈利,若侵犯了您的作品权,请联系博主删除,莫恶意,索钱财,感谢!
转载注明: Python 多线程解析 (米扑博客)