# Files
# Security_AI_integrated/inference/stream.py
# 2026-01-20 17:42:18 +08:00
#
# 193 lines
# 5.8 KiB
# Python

import asyncio
import threading
import time
from collections import deque
from typing import Any, Callable, Dict, List, Optional, Tuple
import cv2
import numpy as np
class StreamReader:
    """Background reader that keeps the most recent frames of one RTSP stream.

    A daemon thread opens ``rtsp_url`` through ``cv2.VideoCapture`` with the
    FFMPEG backend, reads frames in a loop and appends them to a bounded
    deque, so the oldest frames are discarded once ``buffer_size`` is reached.

    Args:
        camera_id: Identifier used in log messages and in ``get_info()``.
        rtsp_url: Stream address handed to ``cv2.VideoCapture``.
        buffer_size: Maximum number of frames kept in the buffer.
        reconnect_delay: Seconds to sleep after a failed connection attempt.
        timeout: Seconds without a successfully read frame after which the
            capture is released and reopened. A falsy value (``0`` or
            ``None``) disables this stall check.
    """

    def __init__(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: int = 2,
        reconnect_delay: float = 3.0,
        timeout: float = 10.0,
    ):
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay
        self.timeout = timeout
        # cv2.VideoCapture handle; None while disconnected.
        self.cap = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.frame_buffer: deque = deque(maxlen=buffer_size)
        # Serialises every use of ``self.cap`` (connect, read, release).
        self.lock = threading.Lock()
        # FPS is recomputed roughly once per second from these counters.
        self.fps = 0.0
        self.last_frame_time = time.time()
        self.frame_count = 0

    def _release_capture(self):
        """Release the current capture, if any, and reset ``self.cap``.

        Release errors are ignored on purpose: this runs during reconnects
        and shutdown, where a failing release must not abort the caller.
        Callers are expected to hold ``self.lock``.
        """
        if self.cap is not None:
            try:
                self.cap.release()
            except Exception:
                pass
            self.cap = None

    def _reader_loop(self):
        """Thread body: connect, read frames, and reconnect on failure."""
        # Time of the last successful read (or of the last fresh connection).
        last_ok = time.time()
        while self.running:
            with self.lock:
                if self.cap is None or not self.cap.isOpened():
                    self._connect()
                    if self.cap is not None:
                        # Fresh connection: restart the stall timer and the
                        # FPS window so connect time is not counted in them.
                        last_ok = time.time()
                        self.frame_count = 0
                        self.last_frame_time = last_ok
                connected = self.cap is not None
            if not connected:
                # Sleep outside the lock so other threads are not blocked.
                time.sleep(self.reconnect_delay)
                continue

            ret, frame = False, None
            with self.lock:
                if self.cap is not None and self.cap.isOpened():
                    ret, frame = self.cap.read()

            now = time.time()
            if ret and frame is not None:
                self.frame_buffer.append(frame)
                self.frame_count += 1
                last_ok = now
                elapsed = now - self.last_frame_time
                if elapsed >= 1.0:
                    self.fps = self.frame_count / elapsed
                    self.frame_count = 0
                    self.last_frame_time = now
                continue

            if self.timeout and now - last_ok >= self.timeout:
                # The capture still reports itself as opened but has produced
                # no frame for ``timeout`` seconds: drop it so the next
                # iteration reconnects instead of retrying forever.
                print(f"[{self.camera_id}] 超过{self.timeout}秒未收到帧, 重新连接")
                with self.lock:
                    self._release_capture()
                self.fps = 0.0
                continue

            time.sleep(0.1)

    def _connect(self):
        """Open a new capture for ``rtsp_url``, replacing any existing one.

        On failure ``self.cap`` is left as ``None``. Called by the reader
        loop while it holds ``self.lock``.
        """
        self._release_capture()
        try:
            self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
            if self.cap.isOpened():
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                # NOTE(review): the first 50 characters of the URL are logged;
                # if URLs embed credentials they end up in the log - confirm.
                print(f"[{self.camera_id}] RTSP连接成功: {self.rtsp_url[:50]}...")
            else:
                print(f"[{self.camera_id}] 无法打开视频流")
                self.cap = None
        except Exception as e:
            print(f"[{self.camera_id}] 连接失败: {e}")
            self.cap = None

    def start(self):
        """Start the background reader thread; no-op if already running."""
        if self.running:
            return
        self.running = True
        self.thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the reader thread, release the capture and clear the buffer.

        Waits up to 5 seconds for the thread to finish and logs a warning if
        it is still alive. No-op if the reader is not running.
        """
        if not self.running:
            return
        self.running = False
        if self.thread is not None:
            self.thread.join(timeout=5.0)
            if self.thread.is_alive():
                print(f"[{self.camera_id}] 警告: 线程未能在5秒内结束")
        with self.lock:
            self._release_capture()
        self.frame_buffer.clear()

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Remove and return the oldest buffered frame.

        Returns:
            ``(True, frame)`` when a frame was available, otherwise
            ``(False, None)``.
        """
        # EAFP: the buffer can be emptied by another thread between a length
        # check and the pop, so rely on the exception instead.
        try:
            return True, self.frame_buffer.popleft()
        except IndexError:
            return False, None

    def get_frame(self) -> Optional[np.ndarray]:
        """Return the newest buffered frame without removing it, or ``None``."""
        try:
            return self.frame_buffer[-1]
        except IndexError:
            return None

    def is_connected(self) -> bool:
        """Return whether a capture exists and reports itself as opened."""
        with self.lock:
            return self.cap is not None and self.cap.isOpened()

    def get_info(self) -> Dict[str, Any]:
        """Return a status snapshot: id, running/connected flags, FPS, buffer fill."""
        return {
            "camera_id": self.camera_id,
            "running": self.running,
            "connected": self.is_connected(),
            "fps": self.fps,
            "buffer_size": len(self.frame_buffer),
            "buffer_max": self.buffer_size,
        }
class StreamManager:
    """Registry of ``StreamReader`` objects keyed by camera id.

    Args:
        buffer_size: Default frame-buffer length for streams that do not
            specify their own.
        reconnect_delay: Reconnect delay handed to every stream it creates.
    """

    def __init__(self, buffer_size: int = 2, reconnect_delay: float = 3.0):
        self.streams: Dict[str, StreamReader] = {}
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay

    def add_stream(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: Optional[int] = None,
    ) -> StreamReader:
        """Create, start and register a reader for ``camera_id``.

        A reader already registered under the same id is stopped and removed
        first. A falsy ``buffer_size`` falls back to the manager default.

        Returns:
            The newly started ``StreamReader``.
        """
        if camera_id in self.streams:
            self.remove_stream(camera_id)

        effective_size = buffer_size or self.buffer_size
        reader = StreamReader(
            camera_id=camera_id,
            rtsp_url=rtsp_url,
            buffer_size=effective_size,
            reconnect_delay=self.reconnect_delay,
        )
        reader.start()
        self.streams[camera_id] = reader
        return reader

    def remove_stream(self, camera_id: str):
        """Stop and unregister the reader for ``camera_id``, if present."""
        if camera_id not in self.streams:
            return
        self.streams[camera_id].stop()
        del self.streams[camera_id]

    def get_stream(self, camera_id: str) -> Optional[StreamReader]:
        """Return the reader registered for ``camera_id``, or ``None``."""
        return self.streams.get(camera_id)

    def read(self, camera_id: str) -> Tuple[bool, Optional[np.ndarray]]:
        """Pop the oldest frame of ``camera_id``; ``(False, None)`` if unknown."""
        reader = self.get_stream(camera_id)
        return reader.read() if reader is not None else (False, None)

    def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
        """Return the newest frame of ``camera_id``; ``None`` if unknown or empty."""
        reader = self.get_stream(camera_id)
        return reader.get_frame() if reader is not None else None

    def stop_all(self):
        """Stop and unregister every managed stream."""
        # Iterate over a snapshot because remove_stream mutates the dict.
        for camera_id in tuple(self.streams):
            self.remove_stream(camera_id)

    def get_all_info(self) -> List[Dict[str, Any]]:
        """Return the ``get_info()`` snapshot of every managed stream."""
        snapshots = []
        for reader in self.streams.values():
            snapshots.append(reader.get_info())
        return snapshots

    def __contains__(self, camera_id: str) -> bool:
        """Support ``camera_id in manager``."""
        return camera_id in self.streams

    def __len__(self) -> int:
        """Return the number of managed streams."""
        return len(self.streams)