#!/usr/bin/env python # -*- coding: utf-8 -*- """ 一个适用于MicroPython的、简单的线程安全队列实现。 这个模块提供了一个与标准库 `queue.Queue` 类似的类, 确保在多线程环境下的数据操作是安全的。 """ import _thread from collections import deque class Queue: """一个简单的、线程安全的队列。""" def __init__(self, maxsize=0): self.maxsize = maxsize self._lock = _thread.allocate_lock() # 使用deque可以实现更高效的头部弹出操作 (O(1)) self._items = deque((), maxsize if maxsize > 0 else 1024) def qsize(self): """返回队列中的项目数。""" with self._lock: return len(self._items) def empty(self): """如果队列为空,返回True,否则返回False。""" return self.qsize() == 0 def full(self): """如果队列已满,返回True,否则返回False。""" if self.maxsize <= 0: return False return self.qsize() >= self.maxsize def put(self, item, block=True, timeout=None): """将一个项目放入队列。""" if not block: return self.put_nowait(item) # 阻塞式put的简单实现 (在实际应用中更复杂的实现会使用信号量) while True: with self._lock: if not self.full(): self._items.append(item) return # 如果队列是满的,短暂休眠后重试 import time time.sleep_ms(5) def put_nowait(self, item): """等同于 put(item, block=False)。""" if self.full(): raise OSError("Queue full") with self._lock: self._items.append(item) def get(self, block=True, timeout=None): """从队列中移除并返回一个项目。""" if not block: return self.get_nowait() # 阻塞式get的简单实现 while True: with self._lock: if self._items: return self._items.popleft() # 如果队列是空的,短暂休眠后重试 import time time.sleep_ms(5) def get_nowait(self): """等同于 get(item, block=False)。""" if self.empty(): raise OSError("Queue empty") with self._lock: return self._items.popleft()