import asyncio
import threading
import time
from collections import deque
from typing import Any, Callable, Dict, List, Optional, Tuple

import cv2
import numpy as np


class StreamReader:
    """Read frames from one RTSP stream on a background thread.

    A daemon thread keeps a ``cv2.VideoCapture`` open, pushes decoded frames
    into a bounded deque (oldest frames are discarded once ``buffer_size`` is
    reached) and reopens the capture when it is lost.

    Args:
        camera_id: Label used in log messages and in ``get_info()``.
        rtsp_url: Stream address handed to ``cv2.VideoCapture``.
        buffer_size: Maximum number of frames kept in the buffer.
        reconnect_delay: Seconds to wait before another connection attempt.
        timeout: Seconds of continuous read failure after which the capture
            is dropped and reopened. ``None`` disables the forced reconnect.
    """

    def __init__(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: int = 2,
        reconnect_delay: float = 3.0,
        timeout: float = 10.0,
    ):
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay
        self.timeout = timeout

        # cv2.VideoCapture instance, or None while disconnected.
        self.cap = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        # Bounded buffer: appending to a full deque drops the oldest frame.
        self.frame_buffer: deque = deque(maxlen=buffer_size)
        # Guards every access to ``self.cap``.
        self.lock = threading.Lock()

        # FPS is measured over windows of at least one second.
        self.fps = 0.0
        self.last_frame_time = time.time()
        self.frame_count = 0

    def _release_capture(self):
        """Release and forget the current capture; caller must hold ``self.lock``."""
        cap, self.cap = self.cap, None
        if cap is None:
            return
        try:
            cap.release()
        except Exception:
            # Best effort: a failing release must not prevent reconnect/shutdown.
            pass

    def _update_fps(self):
        """Count one received frame and refresh ``self.fps`` once per second."""
        self.frame_count += 1
        now = time.time()
        elapsed = now - self.last_frame_time
        if elapsed >= 1.0:
            self.fps = self.frame_count / elapsed
            self.frame_count = 0
            self.last_frame_time = now

    def _reader_loop(self):
        """Thread body: connect, read frames, and reconnect until stopped."""
        # Monotonic timestamp of the last successful connect or frame.
        last_ok = time.monotonic()

        while self.running:
            with self.lock:
                if self.cap is None or not self.cap.isOpened():
                    self._connect()
                    last_ok = time.monotonic()
                connected = self.cap is not None

            if not connected:
                # Sleep outside the lock so status queries are not blocked.
                time.sleep(self.reconnect_delay)
                continue

            ret, frame = False, None
            with self.lock:
                if self.cap is not None and self.cap.isOpened():
                    ret, frame = self.cap.read()

            if ret and frame is not None:
                last_ok = time.monotonic()
                self.frame_buffer.append(frame)
                self._update_fps()
                continue

            # A capture may keep reporting isOpened() while read() fails, so
            # sustained failure is what triggers a reconnect.
            stalled = (
                self.timeout is not None
                and time.monotonic() - last_ok >= self.timeout
            )
            if stalled:
                print(f"[{self.camera_id}] 超过 {self.timeout} 秒未读到有效帧，准备重连")
                with self.lock:
                    self._release_capture()
                self.fps = 0.0
                time.sleep(self.reconnect_delay)
            else:
                time.sleep(0.1)

    def _connect(self):
        """(Re)open the capture; leaves ``self.cap`` as ``None`` on failure.

        Called from the reader loop with ``self.lock`` held.
        """
        self._release_capture()
        try:
            cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
            if cap.isOpened():
                # Request a one-frame backend buffer to limit latency
                # (whether this is honoured depends on the backend).
                cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                self.cap = cap
                # Restart the FPS window so time spent disconnected is not
                # averaged into the next sample.
                self.frame_count = 0
                self.last_frame_time = time.time()
                # NOTE(review): the logged URL prefix may contain credentials.
                print(f"[{self.camera_id}] RTSP连接成功: {self.rtsp_url[:50]}...")
            else:
                print(f"[{self.camera_id}] 无法打开视频流")
                self.cap = None
        except Exception as e:
            print(f"[{self.camera_id}] 连接失败: {e}")
            self.cap = None

    def start(self):
        """Start the background reader thread; no-op if already running."""
        if self.running:
            return
        self.running = True
        self.thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the reader thread, release the capture and clear the buffer.

        Waits up to 5 seconds for the thread to exit. The capture is released
        under ``self.lock``, so this call still waits for any read that is in
        progress on the reader thread.
        """
        if not self.running:
            return
        self.running = False

        if self.thread is not None:
            self.thread.join(timeout=5.0)
            if self.thread.is_alive():
                print(f"[{self.camera_id}] 警告: 线程未能在5秒内结束")

        with self.lock:
            self._release_capture()
        self.frame_buffer.clear()
        self.fps = 0.0

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Pop the oldest buffered frame.

        Returns:
            ``(True, frame)`` when a frame was available, else ``(False, None)``.
        """
        # EAFP: the buffer can be emptied by stop() or another consumer
        # between a length check and the pop.
        try:
            return True, self.frame_buffer.popleft()
        except IndexError:
            return False, None

    def get_frame(self) -> Optional[np.ndarray]:
        """Return the newest buffered frame without removing it, or ``None``.

        The returned array is the buffered object itself, not a copy.
        """
        try:
            return self.frame_buffer[-1]
        except IndexError:
            return None

    def is_connected(self) -> bool:
        """Return whether a capture exists and reports itself as opened."""
        with self.lock:
            return self.cap is not None and self.cap.isOpened()

    def get_info(self) -> Dict[str, Any]:
        """Return a status snapshot (id, running, connected, fps, buffer use)."""
        return {
            "camera_id": self.camera_id,
            "running": self.running,
            "connected": self.is_connected(),
            "fps": self.fps,
            "buffer_size": len(self.frame_buffer),
            "buffer_max": self.buffer_size,
        }


class StreamManager:
    """Registry of ``StreamReader`` instances keyed by camera id.

    Args:
        buffer_size: Default frame buffer size for new streams.
        reconnect_delay: Reconnect delay passed to every new stream.
    """

    def __init__(self, buffer_size: int = 2, reconnect_delay: float = 3.0):
        self.streams: Dict[str, StreamReader] = {}
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay

    def add_stream(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: Optional[int] = None,
    ) -> StreamReader:
        """Create, start and register a stream, replacing any existing one.

        Args:
            camera_id: Key under which the stream is registered.
            rtsp_url: Stream address.
            buffer_size: Per-stream buffer size; ``None`` (or 0) falls back to
                the manager default.

        Returns:
            The started ``StreamReader``.
        """
        if camera_id in self.streams:
            self.remove_stream(camera_id)

        stream = StreamReader(
            camera_id=camera_id,
            rtsp_url=rtsp_url,
            buffer_size=buffer_size or self.buffer_size,
            reconnect_delay=self.reconnect_delay,
        )
        stream.start()
        self.streams[camera_id] = stream
        return stream

    def remove_stream(self, camera_id: str):
        """Stop and unregister the stream for ``camera_id``, if present."""
        stream = self.streams.pop(camera_id, None)
        if stream is not None:
            stream.stop()

    def get_stream(self, camera_id: str) -> Optional[StreamReader]:
        """Return the registered stream for ``camera_id``, or ``None``."""
        return self.streams.get(camera_id)

    def read(self, camera_id: str) -> Tuple[bool, Optional[np.ndarray]]:
        """Pop the oldest frame of a stream; ``(False, None)`` if unknown or empty."""
        stream = self.get_stream(camera_id)
        if stream is None:
            return False, None
        return stream.read()

    def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
        """Return the newest frame of a stream, or ``None`` if unknown or empty."""
        stream = self.get_stream(camera_id)
        if stream is None:
            return None
        return stream.get_frame()

    def stop_all(self):
        """Stop and unregister every stream."""
        # Iterate over a snapshot: remove_stream mutates the dict.
        for camera_id in list(self.streams):
            self.remove_stream(camera_id)

    def get_all_info(self) -> List[Dict[str, Any]]:
        """Return ``get_info()`` for every registered stream."""
        return [stream.get_info() for stream in self.streams.values()]

    def __contains__(self, camera_id: str) -> bool:
        return camera_id in self.streams

    def __len__(self) -> int:
        return len(self.streams)