Files
pig-house-controller/main/uqueue.py

82 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
一个适用于MicroPython的、简单的线程安全队列实现。
这个模块提供了一个与标准库 `queue.Queue` 类似的类,
确保在多线程环境下的数据操作是安全的。
"""
import _thread
from collections import deque
class Queue:
"""一个简单的、线程安全的队列。"""
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._lock = _thread.allocate_lock()
# 使用deque可以实现更高效的头部弹出操作 (O(1))
self._items = deque((), maxsize if maxsize > 0 else 1024)
def qsize(self):
"""返回队列中的项目数。"""
with self._lock:
return len(self._items)
def empty(self):
"""如果队列为空返回True否则返回False。"""
return self.qsize() == 0
def full(self):
"""如果队列已满返回True否则返回False。"""
if self.maxsize <= 0:
return False
return self.qsize() >= self.maxsize
def put(self, item, block=True, timeout=None):
"""将一个项目放入队列。"""
if not block:
return self.put_nowait(item)
# 阻塞式put的简单实现 (在实际应用中更复杂的实现会使用信号量)
while True:
with self._lock:
if not self.full():
self._items.append(item)
return
# 如果队列是满的,短暂休眠后重试
import time
time.sleep_ms(5)
def put_nowait(self, item):
"""等同于 put(item, block=False)。"""
if self.full():
raise OSError("Queue full")
with self._lock:
self._items.append(item)
def get(self, block=True, timeout=None):
"""从队列中移除并返回一个项目。"""
if not block:
return self.get_nowait()
# 阻塞式get的简单实现
while True:
with self._lock:
if self._items:
return self._items.popleft()
# 如果队列是空的,短暂休眠后重试
import time
time.sleep_ms(5)
def get_nowait(self):
"""等同于 get(item, block=False)。"""
if self.empty():
raise OSError("Queue empty")
with self._lock:
return self._items.popleft()