头哥Python魔鬼训练:解密生产者-消费者问题中的线程同步
1
任务描述
生产者-消费者问题是一个经典的多进程同步问题:
本关任务:利用 Python 的 Condition 对象,解决多线程的生产者-消费者问题。
相关知识
Condition对象
Condition 被称为条件变量,除了提供与 Lock 类似的 acquire 和 release 方法外,还提供了 wait 和 notify 方法。 线程首先 acquire 一个条件变量,然后判断一些条件。如果条件不满足则 wait ;如果条件满足,进行一些处理改变条件后,通过 notify 方法通知其他线程,其他处于 wait 状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
解决思路
问题的核心是:
- 保证不让生产者在缓存还是满的时候,仍然要向内写数据;
- 不让消费者试图从空的缓存中取出数据。
编程要求
本关的编程任务是,补全右侧编辑器中 Begin 至 End 区间的代码,具体要求如下:
测试说明
平台将运行用户补全后的代码文件,并生成若干组测试数据,接着根据程序的输出判断程序是否正确。
以下是测试样例:
测试输入: 20
预期输出:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0
先上答案,后面讲解
# -*- coding: utf-8 -*-
import threading
import time
con = threading.Condition()
class Producer(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
global x
#*********begin*********#
with con:
while x >= self.num:
con.wait()
for i in range(x, self.num+1):
if(i!=self.num):
print(i, end=" ")
else:
print(i)
x += 1
time.sleep(0.1)
con.notify()
#********* end*********#
class Consumer(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
global x
#*********begin*********#
with con:
while x <= 0:
con.wait()
for i in range(self.num,-1, -1):
if(i!=-1):
print(i, end=" ")
else:
print(i)
x -= 1
time.sleep(0.1)
con.notify_all()
#********* end*********#
num = input()
x = 0
num = int(num)
p = Producer(num)
c = Consumer(num)
p.start()
c.start()
p.join()
c.join()
con = threading.Condition()
创建一个条件变量对象con,Condition对象用于线程间的同步通信。
class Producer(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
定义生产者线程类Producer,继承自threading.Thread。__init__方法接收num参数,表示缓冲区大小上限,并将其赋值给self.num。
def run(self):
global x
#*********begin*********#
with con:
run方法是线程的执行体。声明使用全局变量x,并获取条件变量con的上下文管理器(相当于调用con.acquire())。
while x >= self.num:
con.wait()
如果全局变量x(模拟缓冲区)已经达到上限self.num,则调用con.wait()释放锁并等待通知。
for i in range(x, self.num+1):
if(i!=self.num):
print(i, end=" ")
else:
print(i)
x += 1
time.sleep(0.1)
如果缓冲区未满,进入for循环,从x开始填充到self.num上限。打印当前x值(最后一个x单独一行),x加1,并短暂休眠0.1秒(防止输出混乱)。
con.notify()
当填充完成后,调用con.notify()发送一个通知给其他等待线程。
class Consumer(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
定义消费者线程类Consumer,继承自threading.Thread,__init__方法同Producer。
def run(self):
global x
#*********begin*********#
with con:
while x <= 0:
con.wait()
run方法体类似Producer,首先获取con上下文,如果全局x为0(缓冲区为空),则等待通知。
for i in range(self.num,-1, -1):
if(i!=-1):
print(i, end=" ")
else:
print(i)
x -= 1
time.sleep(0.1)
如果缓冲区非空,则进入for循环,从self.num开始消费到0。打印当前x值(最后一个x单独一行),x减1,并短暂休眠。
con.notify_all()
当消费完成后,调用con.notify_all()发送通知给所有其他等待线程。
num = input()
x = 0
num = int(num)
p = Producer(num)
c = Consumer(num)
p.start()
c.start()
p.join()
c.join()
获取用户输入的num值,将全局x初始化为0。创建Producer和Consumer对象p和c,并传入num参数。分别启动p和c线程,并等待它们结束。
总的来说,这段代码使用了条件变量来同步生产者消费者线程,避免了竞态条件。生产者在缓冲区未满时填充,消费者在缓冲区非空时消费,两者通过notify相互通知对方状态的变化。
作者:明日香饽饽