第十三节:Python多进程

一、进程的定义

进程(Process),顾名思义,就是进行中的程序。进程是python中最小的资源分配单元,进程之间的数据,内存是不共享的,每启动一个进程,都要独立分配资源和拷贝访问的数据,所以进程的启动和销毁的代价是比较大。

Python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位,一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。 在同一个进程内的线程的数据是可以进行互相访问的,这点区别于多进程。

一个进程至少要包含一个线程,每个进程在启动的时候就会自动的启动一个线程,进程里面的第一个线程就是主线程,每次在进程内创建的子线程都是由主线程进程创建和销毁,子线程也可以由主线程创建出来的线程创建和销毁线程。

进程与线程的区别

  1. 线程是执行的指令集,进程是资源的集合;
  2. 一个程序中默认有一个 主进程 ,一个进程中默认有一个主线程
  3. 线程的启动速度要比进程的启动速度要快;
  4. 一个新的线程很容易被创建,一个新的进程创建需要对父进程进行一次克隆;
  5. 线程共享创建它的进程的内存空间,进程的内存是独立的。
  6. 两个线程共享的数据都是同一份数据,两个子进程的数据不是共享的,而且数据是独立的;
  7. 同一个进程的线程之间可以直接交流,同一个主进程的多个子进程之间是不可以进行交流,如果两个进程之间需要通信,就必须要通过一个中间代理来实现;
  8. 一个线程可以控制和操作同一个进程里的其他线程,线程与线程之间没有隶属关系,但是进程只能操作子进程;

二、创建一个进程

在Python的os模块中封装了许多的系统调用,其中封装了Unix/Linux系统中的fork()系统调用,其作用是复制当前的进程来产生新进程,当前进行称为“父进程”,新进程称为“子进程”。但是,在Windows系统中是没有fork()系统调用的。因此,Python提供了跨平台的多进行模块"multiprocessing"。

Porcess对象常用的方法与属性

  • name:进程的名称;
  • daemon:布尔值,是否是守护进程;
  • pid:进程ID;
  • exitcode:进程退出码;
  • run():表示进程所要做的事情,通过向参数taget指定一个函数的方式指定run的行为,或者在子类中重载该方法;
  • start():启动进程;
  • join():阻塞当前进程,直至调用该方法的进程结束;
  • is_alive():判断进程是否还活着;
  • terminate():终止进程;
  • close():关闭Process对象,释放与之关联的资源
  • 1、直接使用Process类创建

    image.png

    import os
    from multiprocessing import Process
    
    def run_a_sub_proc(name):
        print(f'子进程:{name}({os.getpid()})开始...')
    
    if __name__ == '__main__':
        print(f'主进程({os.getpid()})开始...')
        # 通过对Process类进行实例化创建一个子进程
        p = Process(target=run_a_sub_proc, args=('测试进程', ))
        p.start()
        p.join()
    

    注意:这里需要明确以下主进程和子进程。当我们通过python demo.py开始执行demo.py这个程序时,程序被赋予了声明,成为一个进程,这个进程是 主进程 。而在主进程执行过程,通过对Process类进行实例化创建的是 子进程

    2、继承Process类

    from multiprocessing import Process     # 多进程的类
    import time
    import random
     
    # 创建一个进程类Pros
    class Pros(Process):
        def __init__(self, name):
            super().__init__()
            self.name = name
     
        def run(self):
            print('%s 开始运行!' % self.name)
            # 随机睡眠1~5秒
            time.sleep(random.randrange(1, 5))
            print('%s 结束!' % self.name)
     
     
    if __name__ == "__main__":
        pro_list = []
        # 创建3个子进程
        for i in range(3):
            num = "p" + str(i+1)
            num = Pros(f"子进程{num}")
            num.start()     # start方法会自动调用进程类中的run方法
            pro_list.append(num)
     
        # 子进程阻塞,主进程会等待所有子进程结束再结束
        for i in pro_list:
            i.join()
     
        print('主进程结束')
    

    3、join函数

    在多进程中,join()方法会使主进程进入阻塞,直到调用join()方法的子进程执行完毕。即:使主进程进入阻塞,直到调用join()方法的子进程执行完毕。猜猜以下两个例子的运行结果会有什么不同?

    # 例一
    import os, time
    from multiprocessing import Process
    
    def run_a_sub_proc(name):
        print(f'子进程:{name}({os.getpid()})开始...')
        for i in range(3):
            print(f'子进程:{name}({os.getpid()})运行中...')
            time.sleep(1)
    
    if __name__ == '__main__':
        print(f'主进程({os.getpid()})开始...')
        p1 = Process(target=run_a_sub_proc, args=('进程-1', ))
        p2 = Process(target=run_a_sub_proc, args=('进程-2', ))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    # 例二
    import os, time
    from multiprocessing import Process
    
    def run_a_sub_proc(name):
        print(f'子进程:{name}({os.getpid()})开始...')
        for i in range(3):
            print(f'子进程:{name}({os.getpid()})运行中...')
            time.sleep(1)
    
    if __name__ == '__main__':
        print(f'主进程({os.getpid()})开始...')
        p1 = Process(target=run_a_sub_proc, args=('进程-1', ))
        p2 = Process(target=run_a_sub_proc, args=('进程-2', ))
        p1.start()
        p1.join()
        p2.start()
        p2.join()
    

    join()方法就是让主进程进入阻塞状态,等对应的子进程执行完毕再执行下一行,主要用于进程同步

    三、进程池(Pool)

    大家思考一个问题:在一台计算机中进程可以无限制的创建吗?

    进程池(Pool)的作用, 当进程数过多时,用于限制进程数。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    语法:Pool([numprocess [,initializer [, initargs]]]) # 创建进程池

         参数介绍:
    
        1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
        2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
        3 initargs:是要传给initializer的参数组
    
    import os, time
    from multiprocessing import Process, Pool
    
    def run_a_sub_proc(name):
        print(f'子进程:{name}({os.getpid()})开始!')
        for i in range(2):
            print(f'子进程:{name}({os.getpid()})运行中...')
            time.sleep(1)
        print(f'子进程:{name}({os.getpid()})结束!')
    
    if __name__ == '__main__':
        print(f'主进程({os.getpid()})开始...')
        p = Pool(3)
        for i in range(1, 5):
            p.apply_async(run_a_sub_proc, args=(f"进程-{i}",))
        p.close()
        p.join()
    

    四、进程间的通信

    大家思考一下:在多进程中可以使用global+全局变量来共享数据吗?

    现在设想你需要两个进程,一个进程(接收进程)产生数据(比如从网站上爬虫,或者从websocket接收数据等),另一个进程(转发进程)对产生的数据进行处理并转发(比如计算并处理之后上传数据库,或者发送给websocket等)。这是一个非常常见的应用场景,如何把接收进程接受的数据传递给转发进程呢?直接硬写global+变量 是不行的。

    preview

    1、进程队列(Queue)通信

    Queue([maxsize]):建立一个共享的队列(其实并不是共享的,实际是克隆的,内部维护着数据的共享),多个进程可以向队列里存/取数据。其中,参数是队列最大项数,省略则无限制。

    from multiprocessing import Process, Queue
    import time, os
    
    def prodcut(q):
        print("开始生产.")
        for i in range(5):
            time.sleep(1)
            q.put('产品'+str(i))
            print("产品"+str(i)+"生产完成")
    
    def consume(q):
        while True:
            prod = q.get()
            print("消费者:{},消费产品:{}".format(os.getpid(), prod))
            time.sleep(1)
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=prodcut, args=(q, ))  # 生产者
        c1 = Process(target=consume, args=(q, ))  # 消费者1
        c2 = Process(target=consume, args=(q, ))  # 消费者2
        p.start()
        c1.start()
        c2.start()
        p.join()  # 当生产者结束后,将两个消费则也结束
        c1.terminate()
        c2.terminate()
    
    

    2、管道(Pipe)

    如果你创建了很多个子进程,那么其中任何一个子进程都可以对Queue进行存(put)和取(get)。但Pipe不一样,Pipe只提供两个端点,只允许两个子进程进行存(send)和取(recv)。也就是说,Pipe实现了两个子进程之间的通信。

    from multiprocessing import Pipe, Pool
    import os,time
    
    def product(send_pipe):
        print("开始生产.")
        for i in range(5):
            time.sleep(1)
            send_pipe.send("产品"+str(i))
            print("产品" + str(i) + "生产完成")
    
    def consume(recv_pipe):
        while True:
            print("消费者:{},消费产品:{}".format(os.getpid(), recv_pipe.recv()))
            time.sleep(1)
    
    if __name__ == '__main__':
        # 使用进程池来创建进程
        send_pipe, recv_pipe = Pipe()
        pool = Pool(2)
        pool.apply_async(product, args=(send_pipe,))
        pool.apply_async(consume, args=(recv_pipe,))
        pool.close()
        pool.join()
    
    

    计算密集型 vs. IO密集型

    是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密(CPU)集型和IO密集型

    计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、浮点运算、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

    第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

    作者:道友老李

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】多进程

    发表回复