Python 线程间通信技巧分享
前言
前面学习了线程的一些基本知识
如与进程的区别,线程的组成,线程如何创建等,接下来继续线程的学习
多线程中的进程资源共享
在多线程编程中,线程共享其父进程的资源是一个基本特性。这意味着在同一个进程中创建的所有线程都可以访问和修改相同的内存数据结构和变量。这种设计有助于线程之间高效地通信和数据交换,但同时也引入了潜在的并发控制问题,如数据竞争和死锁。
- 内存空间:所有线程共享相同的地址空间。因此,由一个线程创建或修改的变量可以被其他所有线程访问或修改。
- 文件描述符:如果一个线程打开了文件或其他资源(如网络连接),这些文件描述符也可以被同一进程中的其他线程使用。
- 全局变量:全局变量是在所有线程之间共享的,任何一个线程对全局变量的修改都会影响到其他所有使用该变量的线程序。
import threading
# 定义一个共享的列表
shared_list = []
# 线程的目标函数,用于修改共享资源
def append_to_list(index):
shared_list.append(f"元素{index}")
# 创建线程列表
threads = []
# 启动 5 个线程,每个线程向列表添加一个元素
for i in range(5):
thread = threading.Thread(target=append_to_list, args=(i,))
thread.start()
threads.append(thread)
# 等待所有线程完成
for thread in threads:
thread.join()
# 输出修改后的共享列表
print("共享列表内容:", shared_list)
输出:
共享列表内容: ['元素0', '元素1', '元素2', '元素3', '元素4']
线程间的通信
线程间的通信是多线程编程中一个关键方面,它允许线程之间交换信息、协调操作、共享数据,并确保数据的一致性和完整性。在Python中,线程间通信常见的几种方法包括使用锁(Locks)、条件变量(Condition)、事件(Event)、队列(Queue)等。这些方法都是通过Python的threading
模块提供的。
注:
1、Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。thread和threading模块允许开发人员创建和管理线程。thread模块提供了最基本的线程和锁的支持,threading提供了更高级别、功能更强的线程管理的功能。Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。**开发中,我们基本不使用thread模块,而是使用threading模块。**原因是threading是thread模块的高度封装,同时thread与threading同时使用可能会产生冲突,并且thread功能并没有threading那么丰富。
2、threading的功能与我们之前学些的multiprocessing非常类似,因为multiprocessing就是仿照了threading编写的,所以threading和multiprocessing提供的常用属性与常用方法也是类似的。
1. 锁(Locks)
锁是最基本的线程同步机制,用来防止多个线程同时访问共享资源。锁提供了基本的阻塞和释放操作,确保在任意时刻只有一个线程可以访问特定的代码段。
- 初始化 Lock 对象: 使用
threading.Lock()
创建一个新的锁对象。 - 获取锁(Acquire): 使用
lock.acquire()
方法获取(锁定)锁。如果锁已经被其他线程持有,acquire()
方法会阻塞调用线程,直到锁被释放。 - 释放锁(Release): 使用
lock.release()
方法释放(解锁)锁。锁被释放后,其他正在等待获取该锁的线程中,将有一个线程能够获取到锁。 - 尝试获取锁(Try Acquire): 使用
lock.acquire(timeout=None)
方法尝试获取锁。如果timeout
为None
,默认情况下会一直阻塞直到获取到锁。如果提供了timeout
参数,那么在指定的时间内如果获取不到锁,将会引发threading.TimeoutError
异常。 - 使用 with 语句: 可以使用
with
语句来自动管理锁的获取和释放,这是一种更安全和方便的做法,可以避免忘记释放锁。
import threading
# 全局变量
shared_data = {'counter': 0}
def increment(lock):
for _ in range(100000):
with lock:
shared_data['counter'] += 1
def decrement(lock):
for _ in range(100000):
with lock:
shared_data['counter'] -= 1
lock = threading.Lock()
# 创建两个对立操作(增加与减少)执行于不同 threads
thread1 = threading.Thread(target=increment, args=(lock,))
thread2 = threading.Thread(target=decrement, args=(lock,))
# 启动 threads
thread1.start()
thread2.start()
# 等待两个 threads 完成执行
thread1.join()
thread2.join()
# 输出最终结果
print("Final counter value:", shared_data['counter'])
2. 条件变量(Condition)
条件变量用于线程间的复杂同步,特别是在某些条件成立之前需要线程等待的场景。条件变量允许一个或多个线程在某个条件变化时得到通知,并重新获得锁继续执行。
- 初始化 Condition 对象: 使用
Condition(lock=None)
创建一个新的条件变量对象。lock
参数是一个锁对象,如果未指定,将自动创建一个新的锁。 - 等待条件(Wait): 使用
condition.wait(timeout=None)
方法使线程挂起,直到条件变量被通知(notify)或超时发生。当线程调用wait()
方法时,它必须持有相关的锁,wait()
方法会释放锁,并使线程进入等待状态。如果timeout
参数为None
,则线程将无限期地等待。 - 通知条件(Notify): 使用
condition.notify(n=1)
方法唤醒等待该条件变量的n
个线程。如果未指定n
,则只唤醒一个等待的线程。 - 通知所有等待的线程(Notify All): 使用
condition.notify_all()
方法唤醒所有等待该条件变量的线程。 - 使用 with 语句: 可以使用
with
语句来自动获取和释放锁,这有助于避免忘记释放锁。
import threading
import time
# 创建一个条件变量对象
condition_out = threading.Condition()
# 创建一个容器
items_out = []
# 定义一个生产者线程函数
def producer(condition, items, max_size):
for i in range(20):
with condition: # 进入临界区
while len(items) >= max_size:
print(f"达到条件:最大容量{max_size},当前元素{items},暂停生产")
condition.wait()
print(f"生产元素:{i}")
items.append(i)
condition.notify() # 通知一个等待的消费者线程
time.sleep(0.2) # 设置比生产者慢
# 定义一个消费者线程函数
def consumer(condition, items):
for i in range(20):
with condition: # 进入临界区
while not items:
condition.wait() # 等待生产者生产物品
i = items.pop(0)
print(f" - 消费元素:{i}")
condition.notify()
time.sleep(0.5) # 设置比生产者慢
# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer, args=(condition_out, items_out, 3))
consumer_thread = threading.Thread(target=consumer, args=(condition_out, items_out))
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待线程完成
producer_thread.join()
consumer_thread.join()
3. 信号量(Semaphore)
信号量是一种更高级的同步机制,用于控制对共享资源的访问数量。它允许多个线程同时访问相同的资源,但是会限制同时访问该资源的最大线程数。
- 初始化 Semaphore 对象: 使用
Semaphore(value)
创建一个新的信号量对象,其中value
是信号量的初始值,表示可用资源的数量。 - 获取信号量(Acquire): 使用
semaphore.acquire()
方法获取信号量。如果信号量的计数器大于0,计数器减1,线程可以继续执行。否则,线程将阻塞。 - 释放信号量(Release): 使用
semaphore.release()
方法释放信号量。这会使信号量的计数器加1。 - 使用 with 语句: 可以使用
with
语句来自动获取和释放信号量,这有助于避免忘记释放信号量。
import threading
import time
from threading import Semaphore
# 定义同时共享资源的最大线程数量
resource_count = 2
# 创建一个信号量,其初始值为资源的数量
semaphore = Semaphore(resource_count)
# 定义一个线程函数,模拟对共享资源的访问
def worker(id, semaphore):
for _ in range(5):
semaphore.acquire() # 获取信号量
print(f"线程{id}获取资源")
time.sleep(1) # 模拟使用资源
semaphore.release() # 释放信号量
print(f"线程{id}释放资源")
# 创建并启动线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, semaphore))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
4. 事件(Event)
事件是一个简单的同步机制,用于线程间的信号传递。一个线程信号事件发生,其他线程可以等待这个事件的发生,从而实现同步或状态传递。
- 初始化 Event 对象: 使用
threading.Event()
创建一个新的事件对象。 - 等待事件(等待事件被触发): 调用
event.wait(timeout=None)
方法使线程挂起,直到事件被触发或超时发生。如果timeout
参数为None
,则线程将无限期地等待。 - 触发事件: 使用
event.set()
方法可以触发事件,这将改变事件的状态为“已触发”,并唤醒所有等待该事件的线程。 - 重置事件: 一旦事件被触发,可以使用
event.clear()
方法将事件重置为未触发状态。 - 使用 is_set() 方法检查事件状态: 通过调用
event.is_set()
可以检查事件是否已经被触发。
import threading
import queue
import time
# 创建一个队列
q = queue.Queue()
# 创建一个事件
event = threading.Event()
def producer():
for i in range(5):
time.sleep(1)
q.put(i)
print(f"队列放入元素: {i}")
event.set() # 生产完毕,设置事件
print("生产者线程结束")
def consumer():
while not event.is_set() or not q.empty():
item = q.get()
print(f"消费队列获取元素: {item}")
print("消费者线程结束")
# 创建线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
5. 队列(Queue)
队列是线程安全的数据结构,用于存储由一个线程生产的数据供其他线程消费。它是实现生产者-消费者问题的理想选择,并且可以有效地用于线程间的数据流传递。
- 初始化 Queue 对象: 使用
queue.Queue()
创建一个新的队列对象。 - 添加到队列(Put): 使用
queue.put(item)
方法将一个项目添加到队列的末尾。如果队列已满(如果设置了最大大小),此方法会阻塞,直到队列中有一个空位。 - 从队列中获取(Get): 使用
queue.get()
方法从队列的前面获取一个项目。如果队列为空,此方法会阻塞,直到队列中有项目可用。 - 获取并删除队列中的一个项目(Get Nowait): 使用
queue.get_nowait()
方法尝试立即从队列的前面获取一个项目。如果队列为空,此方法将引发queue.Empty
异常。 - 将项目放入队列而不阻塞(Put Nowait): 使用
queue.put_nowait(item)
方法尝试立即将一个项目添加到队列的末尾。如果队列已满,此方法将引发queue.Full
异常。 - 检查队列是否为空: 使用
queue.empty()
方法检查队列是否为空。 - 检查队列是否为满: 使用
queue.full()
方法检查队列是否已满。 - 获取队列的大小: 使用
queue.qsize()
方法获取队列中的项目数量。 - 设置队列的最大大小: 在创建队列时,可以通过
maxsize
参数设置队列的最大大小。
代码示例见4.事件
思考:条件变量可以用事件来代替吗
条件变量和事件(Event)虽然都是线程同步机制,但它们在用途和工作方式上有一些关键的区别。根据具体的使用场景,一个可能不能完全替代另一个。下面我们来比较这两种机制:
条件变量
条件变量主要用于更复杂的同步问题,特别是当线程需要在特定条件成立之前等待时。条件变量通常与互斥锁(mutex)结合使用,允许线程在等待时释放锁,并在条件满足时重新获得锁。这对于处理生产者-消费者问题非常有效,其中消费者线程需要等待生产者线程生产足够的数据才能继续。
事件(Event)
事件是一个简单的同步机制,主要用于通知一个或多个线程某个条件已经成立。事件可以被设置(set)和重置(clear),当事件被设置时,所有等待该事件的线程会被唤醒。事件不与特定的锁绑定,且不自动处理任何锁的获取或释放。
是否可以互换使用?
替代可能性:在某些简单的用例中,如只需简单通知线程开始或停止某项操作,事件可以作为条件变量的替代。例如,如果你只需要一个信号来告知其他线程数据已准备好或者某个任务已完成,事件可能是更简单直接的选择。
限制:对于需要复杂状态判断和多阶段等待/通知的情况,条件变量是更合适的选择。事件不适用于需要多个线程在多个条件下相互等待并在每次状态改变时进行细粒度控制的场景。
相较之下,如果消费者需要在每次生产者生产了一定数量的产品后才消费,使用条件变量将更加适合,因为它可以更精确地控制每次唤醒的条件。
总结,事件可以在某些情况下代替条件变量,特别是在通信要求比较简单的场景中。然而,对于需要复杂条件同步的场景,条件变量提供了更高的灵活性和控制。
作者:程序猿-瑞瑞