193 lines
5.8 KiB
Python
193 lines
5.8 KiB
Python
import asyncio
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
class StreamReader:
    """Read one RTSP stream on a background thread into a bounded buffer.

    ``start()`` launches a daemon thread that opens the stream through
    OpenCV's FFMPEG backend, appends every decoded frame to
    ``frame_buffer`` and reopens the capture when it cannot be opened or
    when no frame has been read for ``timeout`` seconds.  Consumers poll
    ``read()`` or ``get_frame()``.
    """

    def __init__(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: int = 2,
        reconnect_delay: float = 3.0,
        timeout: float = 10.0,
    ):
        """Store the configuration; nothing is opened until ``start()``.

        Args:
            camera_id: Label used in log messages and in ``get_info()``.
            rtsp_url: Address passed to ``cv2.VideoCapture``.
            buffer_size: Maximum number of buffered frames; when the
                buffer is full the oldest frame is discarded.
            reconnect_delay: Seconds to wait after a failed connection
                attempt before trying again.
            timeout: Seconds without a successfully read frame after
                which the capture is dropped and reopened.  A value that
                is not positive disables this check.
        """
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay
        self.timeout = timeout

        self.cap = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.frame_buffer: deque = deque(maxlen=buffer_size)
        # Guards every access to ``self.cap``.
        self.lock = threading.Lock()
        self.fps = 0.0
        self.last_frame_time = time.time()
        self.frame_count = 0

        # Set by ``stop()`` so the reader thread's waits end immediately.
        self._stop_event = threading.Event()
        # Monotonic time of the last good frame or connection attempt.
        self._last_activity = time.monotonic()

    def _reader_loop(self):
        """Thread body: connect, read frames, reconnect; ends on ``stop()``."""
        while self.running:
            with self.lock:
                if self.cap is None or not self.cap.isOpened():
                    self._connect()
                    # Restart the stall clock so that a fresh connection
                    # gets a full ``timeout`` to deliver its first frame.
                    self._last_activity = time.monotonic()
                connected = self.cap is not None

            if not connected:
                # Wait outside the lock so that is_connected(), get_info()
                # and stop() are not blocked for the whole delay; the
                # event makes the wait end as soon as stop() is called.
                self._stop_event.wait(self.reconnect_delay)
                continue

            ret = False
            frame = None
            with self.lock:
                if self.cap is not None and self.cap.isOpened():
                    ret, frame = self.cap.read()

            if ret and frame is not None:
                self._record_frame(frame)
            else:
                self._handle_read_failure()

    def _record_frame(self, frame):
        """Buffer ``frame`` and refresh the FPS estimate about once a second."""
        self.frame_buffer.append(frame)
        self.frame_count += 1
        self._last_activity = time.monotonic()

        current_time = time.time()
        elapsed = current_time - self.last_frame_time
        if elapsed >= 1.0:
            self.fps = self.frame_count / elapsed
            self.frame_count = 0
            self.last_frame_time = current_time

    def _handle_read_failure(self):
        """Back off briefly, or drop a capture that has stalled too long."""
        stalled_for = time.monotonic() - self._last_activity
        if self.timeout and self.timeout > 0 and stalled_for >= self.timeout:
            # A capture can stay "opened" while every read fails; dropping
            # it makes the next loop iteration go through _connect().
            with self.lock:
                self._release_capture()
            return
        self._stop_event.wait(0.1)

    def _release_capture(self):
        """Release and forget the capture.  The caller must hold the lock."""
        if self.cap is None:
            return
        try:
            self.cap.release()
        except Exception:
            # Best effort: a failing release must not prevent a reconnect
            # or a shutdown.
            pass
        self.cap = None

    def _connect(self):
        """Replace any existing capture with a freshly opened one.

        Leaves ``self.cap`` as ``None`` when the stream cannot be opened or
        when opening raises.  Called with ``self.lock`` held.
        """
        self._release_capture()

        try:
            self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
            if self.cap.isOpened():
                # Presumably keeps the backend from queueing stale frames.
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                # NOTE(review): the first 50 characters of an RTSP URL may
                # contain credentials - consider masking before logging.
                print(f"[{self.camera_id}] RTSP连接成功: {self.rtsp_url[:50]}...")
            else:
                print(f"[{self.camera_id}] 无法打开视频流")
                self.cap = None
        except Exception as e:
            print(f"[{self.camera_id}] 连接失败: {e}")
            self.cap = None

    def start(self):
        """Start the reader thread; does nothing when already running."""
        if self.running:
            return

        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the reader thread, release the capture and empty the buffer.

        Waits up to five seconds for the thread and prints a warning when
        it is still alive afterwards.  Does nothing when not running.
        """
        if not self.running:
            return

        self.running = False
        # Wake the reader thread if it is waiting between attempts.
        self._stop_event.set()

        if self.thread is not None:
            self.thread.join(timeout=5.0)
            if self.thread.is_alive():
                print(f"[{self.camera_id}] 警告: 线程未能在5秒内结束")

        with self.lock:
            self._release_capture()

        self.frame_buffer.clear()

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Remove and return the oldest buffered frame.

        Returns:
            ``(True, frame)`` when a frame was available, otherwise
            ``(False, None)``.
        """
        # A check-then-pop could raise if stop() or another consumer
        # empties the buffer in between, so pop directly instead.
        try:
            return True, self.frame_buffer.popleft()
        except IndexError:
            return False, None

    def get_frame(self) -> Optional[np.ndarray]:
        """Return the newest buffered frame without removing it, or ``None``."""
        try:
            return self.frame_buffer[-1]
        except IndexError:
            return None

    def is_connected(self) -> bool:
        """Return whether a capture exists and reports itself as opened."""
        with self.lock:
            return self.cap is not None and self.cap.isOpened()

    def get_info(self) -> Dict[str, Any]:
        """Return a status snapshot: id, run/connection state, FPS, buffer."""
        return {
            "camera_id": self.camera_id,
            "running": self.running,
            "connected": self.is_connected(),
            "fps": self.fps,
            "buffer_size": len(self.frame_buffer),
            "buffer_max": self.buffer_size,
        }
|
|
|
|
|
|
class StreamManager:
    """Registry of started ``StreamReader`` objects keyed by camera id."""

    def __init__(self, buffer_size: int = 2, reconnect_delay: float = 3.0):
        """Create an empty registry and remember defaults for new readers.

        Args:
            buffer_size: Frame-buffer length used when ``add_stream`` is
                not given one.
            reconnect_delay: Reconnect delay handed to every new reader.
        """
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay
        self.streams: Dict[str, StreamReader] = {}

    def add_stream(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: Optional[int] = None,
    ) -> StreamReader:
        """Create, start and register a reader for ``camera_id``.

        A reader already registered under the same id is stopped and
        removed first.  A falsy ``buffer_size`` selects the manager default.

        Returns:
            The newly started reader.
        """
        already_registered = camera_id in self.streams
        if already_registered:
            self.remove_stream(camera_id)

        effective_size = buffer_size if buffer_size else self.buffer_size
        reader = StreamReader(
            camera_id=camera_id,
            rtsp_url=rtsp_url,
            buffer_size=effective_size,
            reconnect_delay=self.reconnect_delay,
        )
        reader.start()
        self.streams[camera_id] = reader
        return reader

    def remove_stream(self, camera_id: str):
        """Stop and unregister the reader for ``camera_id``, if there is one."""
        if camera_id not in self.streams:
            return
        self.streams[camera_id].stop()
        del self.streams[camera_id]

    def get_stream(self, camera_id: str) -> Optional[StreamReader]:
        """Return the reader registered under ``camera_id``, or ``None``."""
        return self.streams.get(camera_id)

    def read(self, camera_id: str) -> Tuple[bool, Optional[np.ndarray]]:
        """Forward to the reader's ``read()``; ``(False, None)`` if unknown id."""
        reader = self.get_stream(camera_id)
        return reader.read() if reader is not None else (False, None)

    def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
        """Forward to the reader's ``get_frame()``; ``None`` if unknown id."""
        reader = self.get_stream(camera_id)
        return reader.get_frame() if reader is not None else None

    def stop_all(self):
        """Stop and unregister every reader."""
        # Iterate over a snapshot because remove_stream mutates the dict.
        for registered_id in list(self.streams):
            self.remove_stream(registered_id)

    def get_all_info(self) -> List[Dict[str, Any]]:
        """Return the ``get_info()`` snapshot of every registered reader."""
        snapshots: List[Dict[str, Any]] = []
        for reader in self.streams.values():
            snapshots.append(reader.get_info())
        return snapshots

    def __contains__(self, camera_id: str) -> bool:
        """Support ``camera_id in manager``."""
        return camera_id in self.streams

    def __len__(self) -> int:
        """Return the number of registered readers."""
        return len(self.streams)
|