头哥Python魔鬼训练:解密生产者-消费者问题中的线程同步

1
任务描述

生产者-消费者问题是一个经典的多进程同步问题:

  • 有两组进程:一组生产者进程和一组消费者进程共享一个初始为空、固定大小为 n 的缓冲区;
  • 生产者的工作是制造数据,只有缓冲区没满时,生产者才能把数据放入到缓冲区,否则必须等待;
  • 只有缓冲区不空时,消费者才能从中取出数据,一次只取一个数据,否则必须等待;
  • 缓冲区只允许一个生产者放入消息,或者一个消费者从中取出消息。
  • 本关任务:利用 Python 的 Condition 对象,解决多线程的生产者-消费者问题。

    相关知识
    Condition对象

    Condition 被称为条件变量,除了提供与 Lock 类似的 acquire 和 release 方法外,还提供了 wait 和 notify 方法。 线程首先 acquire 一个条件变量,然后判断一些条件。如果条件不满足则 wait ;如果条件满足,进行一些处理改变条件后,通过 notify 方法通知其他线程,其他处于 wait 状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

    解决思路

    问题的核心是:

    1. 保证不让生产者在缓存还是满的时候,仍然要向内写数据;
    2. 不让消费者试图从空的缓存中取出数据。
  • 对于生产者,如果缓存是满的就等待。消费者从缓存中,取走数据后,就通知生产者,让它再次将缓存填满;
  • 若消费者发现缓存是空的,就去等待。下一轮中,生产者将数据写入后,就通知消费者。
  • 编程要求

    本关的编程任务是,补全右侧编辑器中 Begin 至 End 区间的代码,具体要求如下:

  • 全局变量 x 模拟缓冲区, x 的上限由测试输入给出;
  • 生产者与消费者类的 self.num 变量即为 x 的上限,测试程序会创建一个生产者和一个消费者,并在创建类的同时会传入参数;
  • 完成生产者与消费者类的 run() 方法,使得生产者每次输出 x 的值后,使得 x 加 1;同时消费者每次输出 x 的值后,使得 x 减 1;
  • 注意,在输出语句之后,加入 time.sleep(0.1) 防止输出顺序混乱。
  • 测试说明

    平台将运行用户补全后的代码文件,并生成若干组测试数据,接着根据程序的输出判断程序是否正确。

    以下是测试样例:

    测试输入: 20 预期输出:

    1. 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
    2. 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 0
  • 先上答案,后面讲解

  • # -*- coding: utf-8 -*-
    
    import threading
    import time
    con = threading.Condition()
    class Producer(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
        def run(self):
            global x
            #*********begin*********#
            with con:
                while x >= self.num:
                    con.wait()
                for i in range(x, self.num+1):
                    if(i!=self.num):
                        print(i, end=" ")
                    else:
                        print(i)
                    x += 1
                    time.sleep(0.1)
                con.notify()
                
            #********* end*********#
    
    class Consumer(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
        def run(self):
            global x
            #*********begin*********#
            with con:
                while x <= 0:
                    con.wait()
                for i in range(self.num,-1, -1):
                    if(i!=-1):
                        print(i, end=" ")
                    else:
                        print(i)
                    x -= 1
                    time.sleep(0.1)
                con.notify_all()
            #********* end*********#
        
        
    num = input()
    x = 0
    num = int(num)
    p = Producer(num)
    c = Consumer(num)
    p.start()
    c.start()
    p.join()
    c.join()

    con = threading.Condition()

    创建一个条件变量对象con,Condition对象用于线程间的同步通信。

    class Producer(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num

    定义生产者线程类Producer,继承自threading.Thread。__init__方法接收num参数,表示缓冲区大小上限,并将其赋值给self.num。

        def run(self):
            global x
            #*********begin*********#
            with con:

    run方法是线程的执行体。声明使用全局变量x,并获取条件变量con的上下文管理器(相当于调用con.acquire())。

                while x >= self.num:
                    con.wait()

    如果全局变量x(模拟缓冲区)已经达到上限self.num,则调用con.wait()释放锁并等待通知。

                for i in range(x, self.num+1):
                    if(i!=self.num):
                        print(i, end=" ")
                    else:
                        print(i)
                    x += 1
                    time.sleep(0.1)

    如果缓冲区未满,进入for循环,从x开始填充到self.num上限。打印当前x值(最后一个x单独一行),x加1,并短暂休眠0.1秒(防止输出混乱)。
                con.notify()
    当填充完成后,调用con.notify()发送一个通知给其他等待线程。

          
    class Consumer(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num

    定义消费者线程类Consumer,继承自threading.Thread,__init__方法同Producer。

        def run(self):
            global x
            #*********begin*********#
            with con:
                while x <= 0:
                    con.wait()

    run方法体类似Producer,首先获取con上下文,如果全局x为0(缓冲区为空),则等待通知。

                for i in range(self.num,-1, -1):
                    if(i!=-1):
                        print(i, end=" ")
                    else:
                        print(i)
                    x -= 1
                    time.sleep(0.1)

    如果缓冲区非空,则进入for循环,从self.num开始消费到0。打印当前x值(最后一个x单独一行),x减1,并短暂休眠。

                con.notify_all()

    当消费完成后,调用con.notify_all()发送通知给所有其他等待线程。

    num = input()
    x = 0
    num = int(num)
    p = Producer(num)
    c = Consumer(num)
    p.start()
    c.start()
    p.join()
    c.join()

    获取用户输入的num值,将全局x初始化为0。创建Producer和Consumer对象p和c,并传入num参数。分别启动p和c线程,并等待它们结束。

    总的来说,这段代码使用了条件变量来同步生产者消费者线程,避免了竞态条件。生产者在缓冲区未满时填充,消费者在缓冲区非空时消费,两者通过notify相互通知对方状态的变化。

    作者:明日香饽饽

    物联沃分享整理
    物联沃-IOTWORD物联网 » 头哥Python魔鬼训练:解密生产者-消费者问题中的线程同步

    发表回复