生产者消费者问题的Python实现

0x00 背景知识

生产者和消费者是一个常见的问题,它的模型是多个生产者在生产产品,同时多个消费者消费产品,但是仓库容量是有限的,所以当仓库满了生产者需要休眠,而当仓库空了消费者需要唤醒生产者。这里面涉及的问题有互斥和同步。

0x01 一种解决方法

可以使用锁来负责互斥,使用threading.Condition来做同步。threading的完整用法见官方文档,Condition的完整用法见Condition的描述。简单来说,Condition内置了锁,当调用acquire()方法时,自动上锁,release()释放锁。wait()方法会释放锁后阻塞程序,直到被notify()方法唤醒。Talk is cheap, this is the code.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import random
import time
warehouse = []
max_num = 10
condition = threading.Condition()
class ProducerThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
condition.acquire()
if len(warehouse) == max_num:
print "Warehouse is full"
condition.wait() # Release the lock
print "Producer is notified by consumer"
else:
warehouse.append(1)
print "Producer is working, NUM: " + str(len(warehouse))
condition.notify()
condition.release()
time.sleep(random.random())
class ConsumerThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
condition.acquire()
if len(warehouse) == 0:
print "Warehouse is empty"
condition.wait()
print "Consumer is notified by producer"
else:
warehouse.pop()
print "Consumer is working, NUM: " + str(len(warehouse))
condition.notify()
condition.release()
time.sleep(random.random())
if __name__ == "__main__":
c_list = []
p_list = []
for i in range(5):
p = ProducerThread()
p_list.append(p)
p.start()
for i in range(6):
c = ConsumerThread()
c_list.append(c)
c.start()

常见的错误❌代码如下,这是来自网上关于这个知识点的示例,真是天下代码一大抄啊。至于错误原因,读者不难发现。下面的代码会超过仓库的最大存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from threading import Thread, Condition
import time
import random
queue = []
MAX_NUM = 10
condition = Condition()
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
condition.acquire()
if len(queue) == MAX_NUM:
print "Queue full, producer is waiting"
condition.wait()
print "Space in queue, Consumer notified the producer"
num = random.choice(nums)
queue.append(num)
print "Produced", len(queue)
condition.notify()
condition.release()
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
condition.acquire()
if not queue:
print "Nothing in queue, consumer is waiting"
condition.wait()
print "Producer added something to queue and notified the consumer"
num = queue.pop(0)
print "Consumed", len(queue)
condition.notify()
condition.release()
time.sleep(random.random())
if __name__ == "__main__":
c_list = []
p_list = []
for i in range(5):
p = ProducerThread()
p_list.append(p)
p.start()
for i in range(2):
c = ConsumerThread()
c_list.append(c)
c.start()

0x02 基于Queue的实现

Queue模块内置了Condition,可以使用Queue同时满足互斥和同步两个要求。Queue本来就是线程安全的,而Queue的task_done()方法可以用来同步。Queue的详细使用见官方文档。这个实现代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
import time
import random
product_queue = Queue.Queue(10)
class ProducerThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
product_queue.put(1, block=True)
time.sleep(random.random())
class ConsumerThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
product_queue.get(block=True)
product_queue.task_done()
time.sleep(random.random())
if __name__ == "__main__":
c_list = []
p_list = []
for i in range(5):
p = ProducerThread()
p_list.append(p)
p.start()
for i in range(10):
c = ConsumerThread()
c_list.append(c)
c.start()

Queue也是用Condition来实现的,注意:

Note: the notify() and notifyAll() methods don’t release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.