Python-消息队列模块queue使用指南
queue 模块是 Python 标准库中的一个重要模块,主要用于提供线程安全的队列类,适用于多线程编程中不同线程之间的数据交换。它提供了 3 种类型的队列:
- queue.Queue:FIFO(先进先出)队列
- queue.LifoQueue:LIFO(后进先出)队列
- queue.PriorityQueue:优先级队列
每种队列都具有线程安全特性,能够在多线程环境中安全地使用。
1. 导入 queue 模块
queue 是 Python 内置模块,无需安装,可以直接导入:
import queue
2. queue.Queue(FIFO队列)
queue.Queue 是最常用的队列类型,它实现了先进先出(FIFO)的队列行为。常用于任务队列、数据流传递等场景。
2.1 基本操作
import queue
# 创建一个队列,指定最大队列大小(这里设置为3)
q = queue.Queue(maxsize=3)
# 向队列中添加元素
q.put(1)
q.put(2)
q.put(3)
# 获取队列中的元素
print(q.get())
# 输出:1
print(q.get())
# 输出:2
# 向队列添加元素,队列已满时会阻塞#
q.put(4)
# 阻塞
# 使用非阻塞模式向队列添加元素(如果队列满了则抛出异常)
try:
q.put_nowait(4)
except queue.Full:
print("队列已满")
# 使用非阻塞模式从队列获取元素(如果队列空则抛出异常)
try:
print(q.get_nowait())
except queue.Empty:
print("队列为空")
2.2 阻塞与非阻塞
put(block=True, timeout=None): 将元素放入队列,默认阻塞直到有空间可用。
get(block=True, timeout=None): 从队列中取出元素,默认阻塞直到有元素可取。
示例:阻塞和非阻塞
import timeimport queue
q = queue.Queue(maxsize=3)
# 异步加入任务
def producer():
for i in range(5):
print(f"生产者生产了 {i}")
q.put(i) # 可能会阻塞
time.sleep(1)
# 异步获取任务
def consumer():
while True:
try:
item = q.get(timeout=2) # 设置超时,2秒内没有任务就抛出异常
print(f"消费者消费了 {item}")
q.task_done()
except queue.Empty:
print("队列为空,消费者停止")
break
import threading
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
3. queue.LifoQueue(LIFO队列)
LifoQueue 是后进先出(LIFO)队列,行为类似于栈。元素的获取顺序是反向的,先入的元素最后被取出。
示例:LIFO 队列
import queue
# 创建 LIFO 队列
q = queue.LifoQueue()
# 添加元素
q.put(1)
q.put(2)
q.put(3)
# 获取元素
print(q.get())
# 输出:3
print(q.get())
# 输出:2
4. queue.PriorityQueue(优先级队列)
PriorityQueue 是一个基于优先级的队列。每个元素都是一个 (priority, data) 元组,其中 priority 是优先级,数值越小的优先级越高。队列会按照优先级的顺序返回元素。
示例:PriorityQueue 队列
import queue
# 创建优先级队列
pq = queue.PriorityQueue()
# 插入元素(优先级, 数据)
pq.put((3, 'task 3'))
pq.put((1, 'task 1'))
pq.put((2, 'task 2'))
# 获取元素,按优先级顺序
print(pq.get()) # 输出:(1, 'task 1')
print(pq.get()) # 输出:(2, 'task 2')
print(pq.get()) # 输出:(3, 'task 3')
示例:自定义优先级队列
import queue
# 创建优先级队列
pq = queue.PriorityQueue()
# 插入元素(优先级, 数据)
pq.put((5, 'task 5'))
pq.put((1, 'task 1'))
pq.put((3, 'task 3'))
# 获取元素,按优先级顺序
print(pq.get()) # 输出:(1, 'task 1')
print(pq.get()) # 输出:(3, 'task 3')
print(pq.get()) # 输出:(5, 'task 5')
5. 队列的常见方法
5.1 put()
将元素添加到队列末尾。
q.put(10) # 将元素10添加到队列
5.2 get()
从队列中取出元素。
item = q.get() # 获取并移除队列中的第一个元素
5.3 task_done()
调用 task_done() 来通知队列任务已完成。当使用 join() 来等待队列中所有任务完成时,必须调用 task_done()。
q.task_done() # 通知任务完成
5.4 join()
join() 会等待队列中所有任务完成后再返回,通常与 task_done() 配合使用。
q.join() # 阻塞直到队列中所有任务完成
5.5 empty()
检查队列是否为空。
is_empty = q.empty() # 返回 True 如果队列为空
5.6 full()
检查队列是否已满。
is_full = q.full() # 返回 True 如果队列已满
6. 使用 queue 模块的最佳实践
线程安全:queue 模块中的队列类(Queue、LifoQueue、PriorityQueue)是线程安全的,可以在多线程环境下使用。不要使用普通的列表替代队列。
使用 task_done() 和 join():如果你在多线程中使用队列,确保每个任务完成后调用 task_done(),并在主线程中使用 join() 等待所有任务完成。
处理 queue.Empty 和 queue.Full 异常:使用非阻塞方式时,务必捕获并处理 queue.Empty 或 queue.Full 异常,以避免程序崩溃。
Python 的 queue 模块提供了线程安全的队列类,适用于多线程编程中数据交换的需求。常用的队列有 queue.Queue(FIFO)、queue.LifoQueue(LIFO)和 queue.PriorityQueue(优先级队列)。通过正确使用这些队列类型和相关方法,可以有效地管理多线程任务和数据流。
作者:网络风云