python-20-理解多进程多线程,进程池,线程池,进程间通信最详细整理!

python-20-多进程多线程详解

一.说明

在python中的基础系列我们终于来到了多进程和多线程的知识点了,在学习这个知识点前,我想问问都是用来进行并发处理的,为啥要区分多进程 多线程?或者说什么是多进程 什么是多线程?

多进程

1.多进程是通过创建多个独立的进程来并行执行任务

​ 举个例子:你电脑是8核心的,那么可以创建超过8个进程,但是超过8个进程会导致上下文切换开销增加,从而导致降低性能!

说句人话,当进程数小于cpu核心数,每个进程都能分配到自己的核心进行运行,不必进行进程切换,从而没有切换上下文的开销!!

所以,多进程并不是越多越好!!

多线程

  1. 线程 是程序中执行流的最小单元,一个进程可以包含多个线程;

  2. 多线程是通过在同一进程中创建多个线程来并发执行任务,如果任务是 I/O 密集型的(例如网络请求、文件操作),由于 GIL 释放给了 I/O 操作,其他线程可以在 GIL 被释放时执行,因此多线程仍然能够在 I/O 密集型任务中提高并发性能。

  3. 多个线程共享同一进程的内存空间和资源,线程之间的创建和切换比进程更加轻量级;

  4. 对于 CPU 密集型任务,Python 的 全局解释器锁(GIL) 会限制线程的并行执行,这就使得多线程在 CPU 密集型任务上无法提供预期的性能提升;

  5. Python 的 全局解释器锁(GIL):

在 Python 中,全局解释器锁(GIL) 会影响多线程的并发执行。GIL 只允许一个线程在任何时刻执行 Python 字节码,这意味着对于 CPU 密集型任务,Python 的多线程不能利用多核 CPU 的计算能力,只能在单个核心上执行。

  1. 线程池的大小不可随便确定

    CPU 核心数:对于 CPU 密集型任务,线程池的大小通常不会超过 CPU 核心数,因为超出核心数的线程会增加上下文切换的开销,反而降低性能。

    I/O 密集型任务:对于 I/O 密集型任务,线程池的大小可以更大,因为线程在等待 I/O 操作完成时不会占用 CPU 资源,操作系统会将 CPU 分配给其他线程。

    内存和系统资源:每个线程都需要一定的内存和系统资源,过多的线程会导致内存消耗过大,甚至会导致系统资源耗尽。

  2. 线程可以跨多个 CPU 核心运行,操作系统会管理线程在核心之间的调度。

二.多进程

1. os.fork() 函数 (不推荐,不能跨平台,只能Linux、Unix )

它在当前进程的上下文中创建一个子进程。子进程是当前进程(称为父进程)的副本,但拥有自己的进程ID(PID)

fork() 的基本原理
  • 返回值
  • 在父进程中,fork() 返回新创建的子进程的进程 ID。
  • 在子进程中,fork() 返回 0。
  • 如果创建子进程失败,fork() 返回 -1,并抛出OSError。
  • 复制进程:子进程会复制父进程的内存空间,但实际上是采用写时复制(copy-on-write)机制,只有在需要写入时才会真正复制内存,从而节省资源。
  • 示例
    import os
    import time
    
    def main():
        pid = os.fork()  # 创建子进程
    
        if pid < 0:
            # fork() 失败
            print("Fork failed")
        elif pid == 0:
            # 子进程执行的代码
            print(f"Child process: My PID is {os.getpid()}")
            time.sleep(2)  # 模拟子进程工作
            print("Child process: Work done!")
        else:
            # 父进程执行的代码
            print(f"Parent process: My PID is {os.getpid()} and my child's PID is {pid}")
            os.wait()  # 等待子进程结束
            print("Parent process: Child has terminated.")
    
    if __name__ == "__main__":
        main()
    
    2.multiprocessing.Process()函数
    1. multiprocessing模块提供Process类实现新建进程
    import multiprocessing
    import time
    
    def worker(num):
        print(f'Worker {num} starting')
        time.sleep(2)
        print(f'Worker {num} finished')
    
    if __name__ == '__main__':
        processes = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(i,))
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
            
    '''
    Worker 0 starting
    Worker 1 starting
    Worker 2 starting
    Worker 3 starting
    Worker 4 starting
    Worker 0 finished
    Worker 1 finished
    Worker 2 finished
    Worker 3 finished
    Worker 4 finished
    '''
    
    1. pool = multiprocessing.Pool(processes=5) # 创建进程池实现多进程

      pool.apply()pool.apply_async()

    2. apply():类似于顺序调用函数,它会阻塞,直到任务执行完成。
    3. apply_async():异步执行任务,不会阻塞主程序。
    4. pool.map()pool.map_async()

    5. map():将函数 func 应用到 iterable 中的每一个元素,返回结果列表,并且是同步执行的。
    6. map_async()map() 的异步版本,返回一个 AsyncResult 对象,允许你在执行期间继续做其他工作。
    7. pool.close()pool.join()

    8. close():# 关闭进程池,表示不能在往进程池中添加进程
    9. join():# 等待进程池中的所有进程执行完毕,必须在close()之后调用
    10. import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
      
      if __name__ == '__main__':
          # 创建一个进程池,最多同时运行 5 个进程
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 map 方法将任务分配给进程池,map 会阻塞,直到所有任务完成
              pool.map(worker, range(5))
              
              
              
      ###########利用.get()方法来阻塞和等待结果
      import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          return num * 2
      
      if __name__ == '__main__':
          # 使用进程池
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 apply_async 异步执行任务
              async_result = pool.apply_async(worker, (1,))
              print(f'Result of apply_async: {async_result.get()}')  # 阻塞等待结果
      
              # 使用 map_async 异步执行任务
              async_results = pool.map_async(worker, range(5))
              print(f'Result of map_async: {async_results.get()}')  # 阻塞等待所有结果
              
              
      ####################利用 pool.close()和pool.join()
      import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          return num
      
      if __name__ == '__main__':
          # 创建进程池,最多同时运行 5 个进程
          resList = []
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 apply_async 异步提交任务
              results = []
              for i in range(5):
                  result = pool.apply_async(worker, (i,))
                  results.append(result)  # 存储每个任务的 AsyncResult 对象
              
              # 关闭进程池,不再接受新任务
              pool.close()
      
              # 等待所有任务完成并获取结果
              for result in results:
                  resList.append(result.get())  # 阻塞,等待每个任务的完成
      
              # 等待所有子进程结束
              pool.join()
          print(resList)
          
          '''
          Worker 0 starting
          Worker 1 starting
          Worker 2 starting
          Worker 3 starting
          Worker 4 starting
          Worker 2 finished
          Worker 0 finished
          Worker 1 finished
          Worker 3 finished
          Worker 4 finished
          [0, 1, 2, 3, 4]
          '''
      
    11. 多进程获取进程返回结果

      multiprocessing.Process 本身并不能直接返回值,我们需要Queue (队列)来实现收集结果

      import multiprocessing
      import time
      
      def worker(num, queue):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          queue.put(num * 2)  # 将计算结果放入队列中
      
      if __name__ == '__main__':
          processes = []
          queue = multiprocessing.Queue()  # 创建一个队列用于进程间通信
          
          # 创建并启动 5 个进程
          for i in range(5):
              p = multiprocessing.Process(target=worker, args=(i, queue))
              processes.append(p)
              p.start()
      
          # 等待所有进程完成
          for p in processes:
              p.join()
      
          # 获取并打印每个进程的返回值
          while not queue.empty():  # 确保队列中所有结果都已获取
              result = queue.get()
              print(f'Result from worker: {result}')
      
    12. 进程池获取返回结果

      Pool.map():是并行计算并按顺序返回结果,适合任务较简单且结果需要按顺序返回的场景;

      Pool.apply_async():是异步调用任务,适合任务较复杂或需要更灵活控制的场景。需要使用 get() 来获取任务结果。

      Pool.map_async():是 map() 的异步版本,返回一个 AsyncResult 对象。

      #利用map
      import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          return num * 2  # 返回任务的结果
      
      if __name__ == '__main__':
          # 创建一个进程池,最多同时运行 5 个进程
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 map 并行地执行任务
              results = pool.map(worker, range(5))  # 返回每个进程的返回值列表
          
          # 打印所有结果
          print("Results:", results)
          
          
          
      ######利用get方法
      import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          return num * 2
      
      if __name__ == '__main__':
          # 创建进程池
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 apply_async 异步提交任务
              results = [pool.apply_async(worker, (i,)) for i in range(5)]
      
              # 获取每个任务的返回值
              output = [result.get() for result in results]  # 使用 get() 等待任务完成并获取结果
      
          print("Results:", output)
          '''
          Worker 0 starting
          Worker 1 starting
          Worker 2 starting
          Worker 3 starting
          Worker 4 starting
          Worker 0 finished
          Worker 2 finished
          Worker 1 finished
          Worker 3 finished
          Worker 4 finished
          Results: [0, 2, 4, 6, 8]
          '''
          
      ################错误map_async用法
      import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          return num * 2
      
      if __name__ == '__main__':
          # 创建进程池
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 map_async 异步提交任务
              results = [pool.map_async(worker, (i,)) for i in range(5)]
      
              # 获取每个任务的返回值
              output = [result.get() for result in results]  # 使用 get() 等待任务完成并获取结果
      
          print("Results:", output)
          
          '''
          Worker 0 starting
          Worker 2 starting
          Worker 1 starting
          Worker 3 starting
          Worker 4 starting
          Worker 2 finished
          Worker 0 finished
          Worker 1 finished
          Worker 3 finished
          Worker 4 finished
          Results: [[0], [2], [4], [6], [8]]
          '''
      
      #########正确map_async用法
      import multiprocessing
      import time
      
      def worker(num):
          print(f'Worker {num} starting')
          time.sleep(2)
          print(f'Worker {num} finished')
          return num * 2
      
      if __name__ == '__main__':
          # 创建进程池
          with multiprocessing.Pool(processes=5) as pool:
              # 使用 map_async 异步提交任务
              result = pool.map_async(worker, range(5)) 
              # 获取每个任务的返回值
              output = result.get()   # 使用 get() 等待任务完成并获取结果
      
          print("Results:", output)
          '''
              Worker 0 starting
              Worker 1 starting
              Worker 2 starting
              Worker 3 starting
              Worker 4 starting
              Worker 0 finished
              Worker 2 finished
              Worker 1 finished
              Worker 3 finished
              Worker 4 finished
              Results: [0, 2, 4, 6, 8]
          '''
      

      注意:

      上面的例子是同样的写法,但结果却不一样:

      apply_async获取的结果是[0, 2, 4, 6, 8]

      map_async获取的结果是[[0], [2], [4], [6], [8]]

      为什么2个代码基本结构一样,但是执行的结构却不一样?

      因为apply_async是异步产生5个独立的任务,每个任务都在独立的进程中运行,并且 apply_async() 返回的是 AsyncResult 对象;

      map_async() 提供了一个 包含单个元素的元组 (i,),这意味着它会按顺序执行 5 次 map_async(),每次提交 单个任务,这与 apply_async() 的行为不同,实际上是将任务映射到整个可迭代对象的所有元素,它会在内部处理多个任务,而返回的是一个包含所有结果的 AsyncResult 对象,因此输出和任务提交的方式不一样;

      通过这个例子能更好的理解apply_asyncmap_async()的区别

    13. 进程间通讯方式

      进程间通信(IPC, Inter-Process Communication) 是指多个进程之间交换数据和信息的机制。python中进程间通信方式,主要包括以下几种;

      Queue(队列):允许多个进程之间以生产者-消费者模式进行数据交换。

      Pipe(管道):通过管道的两端进行数据交换。

      Manager(共享内存):提供进程间共享对象(如共享字典、列表等)。

      Value 和 Array(共享内存):允许多个进程共享单一的变量或数组。

      Lock(锁):确保多个进程对共享资源的同步访问

      1. Queue(队列)

        import multiprocessing
        import time
        
        def worker(num, queue):
            print(f'Worker {num} starting')
            time.sleep(2)
            print(f'Worker {num} finished')
            queue.put(num * 2)  # 将计算结果放入队列中
        
        if __name__ == '__main__':
            processes = []
            queue = multiprocessing.Queue()  # 创建一个队列用于进程间通信
            
            # 创建并启动 5 个进程
            for i in range(5):
                p = multiprocessing.Process(target=worker, args=(i, queue))
                processes.append(p)
                p.start()
        
            # 等待所有进程完成
            for p in processes:
                p.join()
        
            # 获取并打印每个进程的返回值
            while not queue.empty():  # 确保队列中所有结果都已获取
                result = queue.get()
                print(f'Result from worker: {result}')
        
      2. Pipe(管道)

        Pipemultiprocessing 模块提供的另一种进程间通信方式,它通过管道的两端进行通信。管道有两个端口,一个用于发送数据(send),一个用于接收数据(recv)。

        import multiprocessing
        import time
        
        def sender(conn):
            for i in range(5):
                print(f"Sender sending {i}")
                conn.send(i)  # 发送数据
                time.sleep(1)
        
        def receiver(conn):
            while True:
                data = conn.recv()  # 接收数据
                if data == "DONE":
                    break
                print(f"Receiver received {data}")
        
        if __name__ == "__main__":
            # 创建管道,返回两个连接对象
            parent_conn, child_conn = multiprocessing.Pipe()
        
            # 创建进程
            p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
            p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
        
            p1.start()
            p2.start()
        
            p1.join()
            parent_conn.send("DONE")  # 结束信号
            p2.join()
        '''
        Sender sending 0
        Receiver received 0
        Sender sending 1
        Receiver received 1
        Sender sending 2
        Receiver received 2
        Sender sending 3
        Receiver received 3
        Sender sending 4
        Receiver received 4
        '''
        
      3. Manager(共享内存)

        Managermultiprocessing 模块中提供的一个特殊对象,它允许不同进程共享数据。Manager 可以创建共享对象,比如共享的列表、字典等,这些对象会在进程间同步。

        import multiprocessing
        import time
        
        def worker(shared_dict):
            for i in range(5):
                shared_dict[i] = f"Task {i}"
                print(f"Worker added: {shared_dict[i]}")
                time.sleep(1)
        
        if __name__ == "__main__":
            with multiprocessing.Manager() as manager:
                shared_dict = manager.dict()  # 创建一个共享字典
        
                p = multiprocessing.Process(target=worker, args=(shared_dict,))
                p.start()
                p.join()
        
                print(f"Shared dict: {shared_dict}")
        '''
        Worker added: Task 0
        Worker added: Task 1
        Worker added: Task 2
        Worker added: Task 3
        Worker added: Task 4
        Shared dict: {0: 'Task 0', 1: 'Task 1', 2: 'Task 2', 3: 'Task 3', 4: 'Task 4'}
        '''
        
        
      4. Value 和 Array(共享内存)

        ValueArraymultiprocessing 模块中的两种共享内存对象,可以用于在多个进程之间共享数据。Value 用于存储单一数据类型的值,而 Array 用于存储数组(类似于列表)。

        import multiprocessing
        import time
        
        def worker(shared_value):
            for i in range(5):
                shared_value.value += 1  # 修改共享变量
                print(f"Worker incremented: {shared_value.value}")
                time.sleep(1)
        
        if __name__ == "__main__":
            # 创建一个共享变量
            shared_value = multiprocessing.Value('i', 0)  # 'i' 表示整型
        
            p = multiprocessing.Process(target=worker, args=(shared_value,))
            p.start()
            p.join()
        
            print(f"Final shared value: {shared_value.value}")
            '''
            Worker incremented: 1
            Worker incremented: 2
            Worker incremented: 3
            Worker incremented: 4
            Worker incremented: 5
            Final shared value: 5
            '''
        
      5. Lock(锁)

        在多个进程共享数据时,往往需要确保数据的一致性和线程安全。Lock 是 Python 提供的一种同步原语,它用于确保同一时刻只有一个进程能够访问某个共享资源。

        import multiprocessing
        import time
        
        def worker(lock, shared_value):
            for _ in range(5):
                with lock:# 使用锁来保证线程安全
                    shared_value.value += 1
                    print(f"Worker incremented: {shared_value.value}")
                time.sleep(1)
        
        if __name__ == "__main__":
            shared_value = multiprocessing.Value('i', 0)  # 创建一个共享变量
            lock = multiprocessing.Lock()  # 创建一个锁
        
            processes = [multiprocessing.Process(target=worker, args=(lock, shared_value)) for _ in range(3)]
            
            for p in processes:
                p.start()
            for p in processes:
                p.join()
        
            print(f"Final shared value: {shared_value.value}")
        

    三.多线程

    多线程是通过在同一进程中创建多个线程来并发执行任务。线程共享进程的内存空间,因此它们之间可以更容易地共享数据,但也容易引发线程安全问题。由于 GIL 的存在,Python 的多线程在 CPU 密集型任务上通常不能提高性能,但在 I/O 密集型任务上表现良好。

    1. 基本用法

    Python 中可以使用 threading 模块来实现多线程。以下是一个简单的示例:

    import threading
    import time
    
    def worker(num):
        print(f'Worker {num} starting')
        time.sleep(2)
        print(f'Worker {num} finished')
    
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    
    1. 线程池

      线程池是一种设计模式,用于管理一定数量的线程来处理多个任务。线程池的目的是避免频繁地创建和销毁线程带来的开销,同时通过复用线程来提高程序的效率。

      在 Python 中,concurrent.futures.ThreadPoolExecutor 提供了线程池的功能,它会管理一定数量的线程,并按需调度这些线程来执行任务。

      import concurrent.futures
      import time
      
      def worker(num):
          print(f"Worker {num} starting")
          time.sleep(2)
          print(f"Worker {num} finished")
          return num * 2  # 返回结果
      
      # 使用线程池执行任务
      with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
          results = [executor.submit(worker, i) for i in range(5)]
      
          # 获取每个任务的结果
          output = [result.result() for result in results]
          print("Results:", output)
          
          '''
          Worker 0 starting
          Worker 1 starting
          Worker 2 starting
          Worker 2 finished
          Worker 0 finished
          Worker 3 starting
          Worker 4 starting
          Worker 1 finished
          Worker 4 finished
          Worker 3 finished
          Results: [0, 2, 4, 6, 8]
          '''
      
    2. 线程锁

      import threading
      import time
      
      # 共享资源(全局变量)
      shared_value = 0
      
      # 创建一个锁对象
      lock = threading.Lock()
      
      # 线程工作函数
      def increment():
          global shared_value
          # 获取锁,防止其他线程访问共享资源
          lock.acquire()  # 获取锁
          try:
              print(f"Thread {threading.current_thread().name} is incrementing shared_value")
              current_value = shared_value
              time.sleep(0.5)  # 模拟一些工作
              shared_value = current_value + 1
              print(f"Thread {threading.current_thread().name} updated shared_value to {shared_value}")
          finally:
              # 无论是否发生异常,都要释放锁
              lock.release()  # 释放锁
      
      if __name__ == "__main__":
          # 创建多个线程
          threads = [threading.Thread(target=increment, name=f"Thread-{i}") for i in range(5)]
      
          # 启动线程
          for t in threads:
              t.start()
      
          # 等待所有线程完成
          for t in threads:
              t.join()
      
          print(f"Final shared_value: {shared_value}")
      

    五.总结

    Python 多线程、多进程初学编程的人可能觉得很复杂 其实静下来,理解他 就知道原来就这么回事,没啥!重点是要理解什么是进程什么是线程!!理解了根据真实项目 照着写就行!切记学编程不是死记硬背!如果你采用死记硬背的方式来学编程,那么你完蛋了!!!你永远背不完!!chatgpt也有错误。。何况人。。人与chatgpt的区别是理解,是思想。。

    创作整理不易,请大家多多关注 多多点赞,有写的不对的地方欢迎大家补充,我来整理,再次感谢!

    作者:SEEONTIME

    物联沃分享整理
    物联沃-IOTWORD物联网 » python-20-理解多进程多线程,进程池,线程池,进程间通信最详细整理!

    发表回复