# Security_AI_integrated/inference/stream.py
# Python source, 193 lines, 5.8 KiB
# 2026-01-20 17:42:18 +08:00
import asyncio
import threading
import time
from collections import deque
from typing import Any, Callable, Dict, List, Optional, Tuple
import cv2
import numpy as np
class StreamReader:
    """Background reader that keeps the most recent frames of one RTSP stream.

    ``start()`` launches a daemon thread that opens ``rtsp_url`` through
    OpenCV's FFMPEG backend, reads frames in a loop and stores them in a
    bounded deque (older frames are dropped once ``buffer_size`` is reached).
    The thread reconnects when the capture is closed, and also when no frame
    has been read for ``timeout`` seconds even though the capture still
    reports itself as open.

    Attributes:
        camera_id: Identifier used in log messages and in ``get_info()``.
        rtsp_url: Stream address handed to ``cv2.VideoCapture``.
        buffer_size: Maximum number of frames kept in ``frame_buffer``.
        reconnect_delay: Seconds to wait after a failed connection attempt.
        timeout: Seconds without a successfully read frame after which the
            capture is released and reopened. A falsy value (``0``/``None``)
            disables this check.
        cap: Current capture object, or ``None`` while disconnected.
            Guarded by ``lock``.
        running: ``True`` between ``start()`` and ``stop()``.
        fps: Frame rate measured over the last window of at least one second;
            reset to ``0.0`` whenever a (re)connection is attempted.
    """

    def __init__(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: int = 2,
        reconnect_delay: float = 3.0,
        timeout: float = 10.0,
    ):
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay
        self.timeout = timeout
        self.cap = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.frame_buffer: deque = deque(maxlen=buffer_size)
        self.lock = threading.Lock()
        self.fps = 0.0
        self.last_frame_time = time.time()
        self.frame_count = 0

    def _release_capture(self):
        """Release and forget the current capture. The caller must hold ``lock``."""
        cap, self.cap = self.cap, None
        if cap is None:
            return
        try:
            cap.release()
        except Exception:
            # Deliberate best effort: a failing release must not break the
            # reconnect or shutdown path.
            pass

    def _reader_loop(self):
        """Thread body: connect, read frames and reconnect until stopped."""
        # Monotonic timestamp of the last successful read or connection
        # attempt; used only for the stall check below.
        last_ok = time.monotonic()
        while self.running:
            ret, frame = False, None
            with self.lock:
                # Re-check under the lock so a stop() that already released
                # the capture is not followed by a fresh connection.
                if not self.running:
                    break
                if self.cap is None or not self.cap.isOpened():
                    self._connect()
                    last_ok = time.monotonic()
                    # The stream was down, so the previous measurement is
                    # stale; start a new FPS window.
                    self.fps = 0.0
                    self.frame_count = 0
                    self.last_frame_time = time.time()
                connected = self.cap is not None
                if connected:
                    ret, frame = self.cap.read()

            if not connected:
                # Sleep outside the lock so stop()/is_connected() are not
                # blocked for the whole reconnect delay.
                time.sleep(self.reconnect_delay)
                continue

            if ret and frame is not None:
                last_ok = time.monotonic()
                self.frame_buffer.append(frame)
                self.frame_count += 1
                now = time.time()
                elapsed = now - self.last_frame_time
                if elapsed >= 1.0:
                    self.fps = self.frame_count / elapsed
                    self.frame_count = 0
                    self.last_frame_time = now
                continue

            # read() failed although the capture claims to be open. This may
            # persist after the stream has dropped, so force a reconnect once
            # the stall has lasted `timeout` seconds.
            if self.timeout and time.monotonic() - last_ok >= self.timeout:
                print(f"[{self.camera_id}] 超过{self.timeout}秒未读取到帧, 重新连接")
                with self.lock:
                    self._release_capture()
                continue
            time.sleep(0.1)

    def _connect(self):
        """(Re)open the capture; leaves ``cap`` as ``None`` on failure.

        Called with ``lock`` held by the reader loop.
        """
        self._release_capture()
        try:
            self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
            if self.cap.isOpened():
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                # NOTE(review): the logged URL prefix may contain RTSP
                # credentials (rtsp://user:pass@host) — consider redacting.
                print(f"[{self.camera_id}] RTSP连接成功: {self.rtsp_url[:50]}...")
            else:
                print(f"[{self.camera_id}] 无法打开视频流")
                self._release_capture()
        except Exception as e:
            print(f"[{self.camera_id}] 连接失败: {e}")
            # Also frees a capture that was created before the failure.
            self._release_capture()

    def start(self):
        """Start the reader thread; does nothing if already running."""
        if self.running:
            return
        self.running = True
        self.thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the reader thread, release the capture and drop buffered frames.

        Waits up to five seconds for the thread to finish and prints a
        warning if it is still alive afterwards.
        """
        if not self.running:
            return
        self.running = False
        if self.thread is not None:
            self.thread.join(timeout=5.0)
            if self.thread.is_alive():
                print(f"[{self.camera_id}] 警告: 线程未能在5秒内结束")
        with self.lock:
            self._release_capture()
        self.frame_buffer.clear()

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Remove and return the oldest buffered frame.

        Returns:
            ``(True, frame)`` when a frame was available, otherwise
            ``(False, None)``.
        """
        # EAFP: the buffer can be emptied by stop() or another consumer
        # between a length check and the pop.
        try:
            return True, self.frame_buffer.popleft()
        except IndexError:
            return False, None

    def get_frame(self) -> Optional[np.ndarray]:
        """Return the newest buffered frame without removing it, or ``None``."""
        try:
            return self.frame_buffer[-1]
        except IndexError:
            return None

    def is_connected(self) -> bool:
        """Return whether a capture exists and reports itself as opened."""
        with self.lock:
            return self.cap is not None and self.cap.isOpened()

    def get_info(self) -> Dict[str, Any]:
        """Return a status snapshot: id, running/connected flags, FPS, buffer fill."""
        return {
            "camera_id": self.camera_id,
            "running": self.running,
            "connected": self.is_connected(),
            "fps": self.fps,
            "buffer_size": len(self.frame_buffer),
            "buffer_max": self.buffer_size,
        }
class StreamManager:
    """Registry of ``StreamReader`` objects keyed by camera id.

    ``buffer_size`` and ``reconnect_delay`` given to the constructor are the
    defaults applied to every reader created through ``add_stream``.
    """

    def __init__(self, buffer_size: int = 2, reconnect_delay: float = 3.0):
        self.streams: Dict[str, StreamReader] = {}
        self.buffer_size = buffer_size
        self.reconnect_delay = reconnect_delay

    def add_stream(
        self,
        camera_id: str,
        rtsp_url: str,
        buffer_size: Optional[int] = None,
    ) -> StreamReader:
        """Create, start and register a reader, replacing any existing one.

        A falsy ``buffer_size`` (``None`` or ``0``) falls back to the
        manager-wide default.
        """
        if camera_id in self.streams:
            self.remove_stream(camera_id)

        effective_size = buffer_size or self.buffer_size
        reader = StreamReader(
            camera_id=camera_id,
            rtsp_url=rtsp_url,
            buffer_size=effective_size,
            reconnect_delay=self.reconnect_delay,
        )
        reader.start()
        self.streams[camera_id] = reader
        return reader

    def remove_stream(self, camera_id: str):
        """Stop and unregister the reader for ``camera_id``; no-op if unknown."""
        if camera_id not in self.streams:
            return
        self.streams[camera_id].stop()
        del self.streams[camera_id]

    def get_stream(self, camera_id: str) -> Optional[StreamReader]:
        """Return the registered reader for ``camera_id``, or ``None``."""
        try:
            return self.streams[camera_id]
        except KeyError:
            return None

    def read(self, camera_id: str) -> Tuple[bool, Optional[np.ndarray]]:
        """Delegate to the reader's ``read()``; ``(False, None)`` if unknown."""
        reader = self.get_stream(camera_id)
        return (False, None) if reader is None else reader.read()

    def get_frame(self, camera_id: str) -> Optional[np.ndarray]:
        """Delegate to the reader's ``get_frame()``; ``None`` if unknown."""
        reader = self.get_stream(camera_id)
        return None if reader is None else reader.get_frame()

    def stop_all(self):
        """Stop and unregister every reader."""
        # Iterate over a snapshot because remove_stream mutates the dict.
        for camera_id in tuple(self.streams):
            self.remove_stream(camera_id)

    def get_all_info(self) -> List[Dict[str, Any]]:
        """Return the ``get_info()`` result of every registered reader."""
        infos: List[Dict[str, Any]] = []
        for reader in self.streams.values():
            infos.append(reader.get_info())
        return infos

    def __contains__(self, camera_id: str) -> bool:
        """Support ``camera_id in manager``."""
        return camera_id in self.streams

    def __len__(self) -> int:
        """Number of registered readers."""
        return len(self.streams)