# File: security-ai-edge/core/video_stream.py
# Repository viewer metadata: 491 lines, 15 KiB, Python
# (viewer links: Raw, Permalink, Normal View, History)
# Revision timestamp shown by the viewer: 2026-01-29 18:33:12 +08:00
"""
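Video stream ingestion module.

Implements OpenCV-based RTSP stream pulling with support for multi-threaded
concurrency, dynamic frame sampling, and automatic reconnection after a
dropped connection.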
"""
import queue
import threading
import time
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple
from dataclasses import dataclass, field
import cv2
import numpy as np
from config.settings import get_settings, VideoStreamConfig
from utils.common import ExponentialBackoff, generate_unique_id
from utils.logger import get_logger
logger = logging.getLogger(__name__)
@dataclass
class VideoFrame:
    """A single captured video frame together with its metadata.

    The pixel data is excluded from the dataclass-generated ``__eq__``:
    comparing two NumPy arrays with ``==`` yields an array whose truth value
    is ambiguous, so including it would make ``frame_a == frame_b`` raise
    ``ValueError`` whenever the two frames hold distinct array objects.
    Equality is therefore decided by the metadata fields only.
    """

    # Unique identifier assigned to this frame.
    frame_id: str
    # Identifier of the camera the frame was read from.
    camera_id: str
    # Pixel data; width/height below are taken from shape[1]/shape[0] by the
    # producer.  NOTE(review): presumably BGR as returned by OpenCV - confirm.
    image: np.ndarray = field(compare=False)
    # Time at which the frame was accepted by the reader.
    timestamp: datetime
    # Running count of frames read from the source when this one was taken.
    frame_number: int
    # Frame width in pixels.
    width: int
    # Frame height in pixels.
    height: int
    # Frame rate associated with the frame by its producer.
    fps: float

    def to_dict(self) -> Dict[str, Any]:
        """Return the frame metadata as a plain dictionary.

        The image itself is intentionally left out; ``timestamp`` is
        rendered with ``datetime.isoformat()``.

        Returns:
            A dictionary with the keys ``frame_id``, ``camera_id``,
            ``timestamp``, ``frame_number``, ``width``, ``height`` and
            ``fps``.
        """
        return {
            "frame_id": self.frame_id,
            "camera_id": self.camera_id,
            "timestamp": self.timestamp.isoformat(),
            "frame_number": self.frame_number,
            "width": self.width,
            "height": self.height,
            "fps": self.fps,
        }
@dataclass
class StreamStatus:
    """Point-in-time snapshot describing the state of one video stream."""

    # Identifier of the camera this snapshot belongs to.
    camera_id: str
    # Whether the stream currently holds an open connection.
    is_connected: bool
    # Whether the stream has been started and not yet stopped.
    is_running: bool
    # Time of the most recently accepted frame, or None if there is none yet.
    last_frame_time: Optional[datetime]
    # Number of frames read so far.
    frame_count: int
    # Number of reconnect attempts reported by the back-off helper.
    reconnect_attempts: int
    # Human-readable error description, or None when there is nothing to report.
    error_message: Optional[str]
class RTSPStreamReader:
    """Read frames from one RTSP camera on a background thread.

    ``start()`` opens the stream with ``cv2.VideoCapture`` and launches a
    daemon thread that reads frames, throttles them to ``target_fps``, wraps
    every kept frame in a ``VideoFrame`` and pushes it into a bounded queue
    (the oldest queued frame is dropped when the queue is full).  An optional
    callback is invoked for every frame that was queued.  When reading fails
    the thread reconnects using the configured exponential back-off.
    """

    def __init__(
        self,
        camera_id: str,
        rtsp_url: str,
        target_fps: int = 5,
        frame_buffer_size: int = 30,
        on_frame_callback: Optional[Callable[[VideoFrame], None]] = None,
        config: Optional[VideoStreamConfig] = None
    ):
        """Create a reader; no connection is made until ``start()``.

        Args:
            camera_id: Identifier used in frames, logs and statistics.
            rtsp_url: Stream URL handed to ``cv2.VideoCapture``.
            target_fps: Maximum rate at which frames are kept.  A value of
                zero or less disables throttling.
            frame_buffer_size: ``maxsize`` of the internal frame queue.
            on_frame_callback: Called on the reader thread with each frame
                that was placed in the queue.
            config: Stream configuration supplying the reconnect back-off
                parameters; defaults to ``get_settings().video_stream``.
        """
        self.camera_id = camera_id
        self.rtsp_url = rtsp_url
        self.target_fps = target_fps

        if config is None:
            settings = get_settings()
            config = settings.video_stream
        self.config = config

        self._cap = None
        self._frame_buffer: queue.Queue = queue.Queue(maxsize=frame_buffer_size)
        self._running = False
        self._connected = False
        self._frame_count = 0
        self._last_frame_time: Optional[datetime] = None
        self._on_frame_callback = on_frame_callback

        self._read_thread: Optional[threading.Thread] = None
        self._process_thread: Optional[threading.Thread] = None
        self._reconnect_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._reconnect_event = threading.Event()

        self._reconnect_backoff = ExponentialBackoff(
            base_delay=config.reconnect_base_delay,
            max_delay=config.reconnect_max_delay,
            max_attempts=config.reconnect_max_attempts
        )
        self._logger = get_logger("video_stream")
        self._lock = threading.Lock()

    @property
    def is_connected(self) -> bool:
        """Whether the stream is currently marked as connected."""
        return self._connected

    @property
    def is_running(self) -> bool:
        """Whether ``start()`` has been called without a later ``stop()``."""
        return self._running

    @property
    def frame_count(self) -> int:
        """Number of frames read from the source so far (kept or skipped)."""
        return self._frame_count

    @property
    def status(self) -> StreamStatus:
        """Build a ``StreamStatus`` snapshot of the reader.

        ``error_message`` is always ``None``; the reader does not record
        error details.
        """
        return StreamStatus(
            camera_id=self.camera_id,
            is_connected=self._connected,
            is_running=self._running,
            last_frame_time=self._last_frame_time,
            frame_count=self._frame_count,
            reconnect_attempts=self._reconnect_backoff.current_attempt,
            error_message=None
        )

    def _connect(self) -> bool:
        """Open the RTSP stream.

        Any previously held capture is released first.  On success the
        reader is marked connected and the reconnect back-off is reset.

        Returns:
            True if the stream was opened, False otherwise (the failure is
            logged; no exception escapes).
        """
        try:
            if self._cap is not None:
                self._cap.release()
            self._cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
            self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)

            if not self._cap.isOpened():
                self._logger.error(f"无法打开视频流: {self.rtsp_url}")
                # Do not keep a capture object that never opened.
                self._cap.release()
                self._cap = None
                self._connected = False
                return False

            self._logger.log_connection_event(
                "connect", "RTSP", self.camera_id, True
            )
            self._connected = True
            self._reconnect_backoff.reset()
            return True
        except Exception as e:
            self._logger.error(f"RTSP连接异常: {e}")
            self._connected = False
            return False

    def _disconnect(self):
        """Release the capture (best effort) and mark the reader disconnected."""
        if self._cap is not None:
            try:
                self._cap.release()
            except Exception:
                # Releasing is best effort: the handle is discarded either way.
                pass
            self._cap = None
        self._connected = False
        self._logger.log_connection_event(
            "disconnect", "RTSP", self.camera_id, True
        )

    def _publish_frame(self, frame_obj):
        """Queue ``frame_obj`` and notify the callback.

        When the queue is full the oldest frame is discarded to make room.
        The callback only runs if the frame was queued.  Exceptions raised by
        the callback are logged and swallowed so that a faulty consumer
        cannot make the reader treat the stream as broken.
        """
        try:
            if self._frame_buffer.full():
                try:
                    self._frame_buffer.get_nowait()
                except queue.Empty:
                    # A consumer emptied the queue in the meantime.
                    pass
            self._frame_buffer.put_nowait(frame_obj)
        except queue.Full:
            self._logger.debug(f"帧缓冲区已满: {self.camera_id}")
            return

        # Read the attribute once: set_frame_callback may swap it concurrently.
        callback = self._on_frame_callback
        if callback:
            try:
                callback(frame_obj)
            except Exception as e:
                self._logger.error(f"帧回调异常: {self.camera_id} - {e}")

    def _read_frame(self):
        """Reader-thread main loop: read, throttle, wrap and publish frames.

        Runs until the stop event is set.  While disconnected it keeps
        trying to reconnect; a failed read or an unexpected exception marks
        the stream disconnected so that the next iteration reconnects.
        """
        # Guard against target_fps <= 0, which would otherwise raise
        # ZeroDivisionError and kill the thread; 0.0 means "keep every frame".
        frame_interval = 1.0 / self.target_fps if self.target_fps > 0 else 0.0
        last_process_time = 0.0

        while not self._stop_event.is_set():
            if not self._connected:
                if not self._reconnect():
                    # Wait on the event instead of sleeping so stop() is
                    # noticed immediately.
                    self._stop_event.wait(1)
                continue

            try:
                ret, frame = self._cap.read()
                if not ret or frame is None:
                    self._logger.warning(f"读取帧失败: {self.camera_id}")
                    self._connected = False
                    continue

                self._frame_count += 1
                current_time_mono = time.monotonic()
                # Frame-rate control: skip frames arriving before the
                # interval has elapsed.
                if (current_time_mono - last_process_time) < frame_interval:
                    continue
                last_process_time = current_time_mono

                current_time = datetime.now()
                self._last_frame_time = current_time
                frame_obj = VideoFrame(
                    frame_id=generate_unique_id("frame"),
                    camera_id=self.camera_id,
                    image=frame,
                    timestamp=current_time,
                    frame_number=self._frame_count,
                    width=frame.shape[1],
                    height=frame.shape[0],
                    fps=self.target_fps
                )
            except Exception as e:
                self._logger.error(f"读取帧异常: {e}")
                self._connected = False
                continue

            # Published outside the try block: consumer errors are handled
            # in _publish_frame and must not look like a stream failure.
            self._publish_frame(frame_obj)

    def _reconnect(self) -> bool:
        """Perform one reconnect attempt after the back-off delay.

        Returns:
            True if the stream was reopened; False if the attempt limit was
            reached, a stop was requested during the delay, or the
            connection attempt failed.
        """
        if not self._reconnect_backoff.next_attempt():
            self._logger.error(f"重连次数已达上限: {self.camera_id}")
            return False

        delay = self._reconnect_backoff.get_delay()
        self._logger.warning(
            f"准备重连 {self.camera_id}, 等待 {delay:.1f}"
        )
        # Event.wait returns True as soon as stop() sets the event, so a
        # long back-off no longer outlasts the join timeout in stop().
        if self._stop_event.wait(delay):
            return False

        self._disconnect()
        if self._connect():
            self._logger.info(f"重连成功: {self.camera_id}")
            return True
        self._logger.error(f"重连失败: {self.camera_id}")
        return False

    def set_frame_callback(self, callback: Callable[[VideoFrame], None]):
        """Replace the per-frame callback."""
        self._on_frame_callback = callback

    def start(self):
        """Connect and start the reader thread; no-op if already running.

        Raises:
            RuntimeError: If the initial connection cannot be established.
        """
        if self._running:
            return

        self._stop_event.clear()
        if not self._connect():
            raise RuntimeError(f"无法连接到视频流: {self.rtsp_url}")

        self._running = True
        self._read_thread = threading.Thread(
            target=self._read_frame,
            name=f"StreamReader-{self.camera_id}",
            daemon=True
        )
        self._read_thread.start()
        self._logger.info(f"视频流已启动: {self.camera_id} @ {self.target_fps}fps")

    def stop(self):
        """Stop the reader thread, release the stream and empty the buffer.

        Waits up to five seconds for the reader thread to finish.
        """
        self._running = False
        self._stop_event.set()

        if self._read_thread and self._read_thread.is_alive():
            self._read_thread.join(timeout=5)

        self._disconnect()

        # Drop any frames still queued.
        while not self._frame_buffer.empty():
            try:
                self._frame_buffer.get_nowait()
            except queue.Empty:
                break

        self._logger.info(f"视频流已停止: {self.camera_id}")

    def read(self, timeout: float = 1.0) -> Optional[VideoFrame]:
        """Return the oldest buffered frame, waiting up to ``timeout`` seconds.

        Returns:
            The frame, or None if none became available in time.
        """
        try:
            return self._frame_buffer.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_latest_frame(self, timeout: float = 1.0) -> Optional[VideoFrame]:
        """Return the newest buffered frame, discarding all older ones.

        The call never blocks.  ``timeout`` is accepted for interface
        compatibility and is not used.

        Returns:
            The most recently queued frame, or None if the buffer is empty.
        """
        latest: Optional[VideoFrame] = None
        while True:
            try:
                # Remember every frame taken out, so the last one is returned
                # even if another consumer empties the queue in between.
                latest = self._frame_buffer.get_nowait()
            except queue.Empty:
                return latest

    def get_frame_batch(self, max_count: int = 8,
                        timeout: float = 2.0) -> List[VideoFrame]:
        """Collect up to ``max_count`` frames within ``timeout`` seconds.

        Returns:
            The frames gathered before the count or the deadline was
            reached, oldest first (possibly empty).
        """
        frames: List[VideoFrame] = []
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout

        while len(frames) < max_count:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                frames.append(self._frame_buffer.get(timeout=remaining))
            except queue.Empty:
                break

        return frames

    def get_statistics(self) -> Dict[str, Any]:
        """Return connection, frame-count and buffer figures as a dictionary."""
        return {
            "camera_id": self.camera_id,
            "is_connected": self._connected,
            "is_running": self._running,
            "frame_count": self._frame_count,
            "target_fps": self.target_fps,
            "buffer_size": self._frame_buffer.qsize(),
            "buffer_capacity": self._frame_buffer.maxsize,
            "last_frame_time": (
                self._last_frame_time.isoformat()
                if self._last_frame_time else None
            ),
        }
class MultiStreamManager:
    """Registry that creates, starts, stops and inspects several stream readers.

    The internal lock only protects the ``camera_id -> reader`` mapping.
    Potentially slow calls (``start()`` connects to the camera, ``stop()``
    joins the reader thread) are made after the lock has been released, so
    other manager methods are not stalled while a stream connects or shuts
    down.
    """

    def __init__(self, config: Optional[VideoStreamConfig] = None):
        """Create an empty manager.

        Args:
            config: Stream configuration passed to every reader created by
                ``add_stream``; ``None`` lets each reader load its own.
        """
        self._streams: Dict[str, RTSPStreamReader] = {}
        self._config = config
        self._logger = get_logger("multi_stream")
        self._lock = threading.Lock()

    def add_stream(
        self,
        camera_id: str,
        rtsp_url: str,
        target_fps: int = 5,
        frame_buffer_size: int = 30,
        on_frame_callback: Optional[Callable[[VideoFrame], None]] = None
    ) -> RTSPStreamReader:
        """Create and register a reader for ``camera_id`` (not started).

        Returns:
            The newly created reader.

        Raises:
            ValueError: If ``camera_id`` is already registered.
        """
        with self._lock:
            if camera_id in self._streams:
                raise ValueError(f"摄像头已存在: {camera_id}")

            stream = RTSPStreamReader(
                camera_id=camera_id,
                rtsp_url=rtsp_url,
                target_fps=target_fps,
                frame_buffer_size=frame_buffer_size,
                on_frame_callback=on_frame_callback,
                config=self._config
            )
            self._streams[camera_id] = stream
            self._logger.info(f"视频流已添加: {camera_id}")
            return stream

    def remove_stream(self, camera_id: str):
        """Stop and unregister ``camera_id``; unknown ids are ignored."""
        with self._lock:
            stream = self._streams.pop(camera_id, None)
        if stream is None:
            return

        # Stopped outside the lock: stop() may wait for the reader thread.
        stream.stop()
        self._logger.info(f"视频流已移除: {camera_id}")

    def start_stream(self, camera_id: str):
        """Start the reader registered under ``camera_id``.

        Raises:
            ValueError: If ``camera_id`` is not registered.
            RuntimeError: Propagated from the reader when it cannot connect.
        """
        with self._lock:
            stream = self._streams.get(camera_id)
        if stream is None:
            raise ValueError(f"摄像头不存在: {camera_id}")
        stream.start()

    def start_all(self):
        """Start every registered stream, skipping those that fail to start."""
        with self._lock:
            streams = list(self._streams.items())

        failed = []
        for camera_id, stream in streams:
            try:
                stream.start()
            except Exception as e:
                self._logger.warning(f"视频流启动失败,跳过: {camera_id} - {e}")
                failed.append(camera_id)

        started = len(streams) - len(failed)
        summary = f"已启动 {started}/{len(streams)} 个视频流"
        if failed:
            # Separator added so the two counts do not run together.
            summary += f", {len(failed)} 个失败"
        self._logger.info(summary)

    def stop_stream(self, camera_id: str):
        """Stop the reader registered under ``camera_id``, if any."""
        with self._lock:
            stream = self._streams.get(camera_id)
        if stream is not None:
            stream.stop()

    def stop_all(self):
        """Stop every registered stream (they stay registered)."""
        with self._lock:
            streams = list(self._streams.values())
        for stream in streams:
            stream.stop()
        self._logger.info("已停止所有视频流")

    def get_stream(self, camera_id: str) -> Optional[RTSPStreamReader]:
        """Return the reader for ``camera_id`` or None if it is unknown."""
        with self._lock:
            return self._streams.get(camera_id)

    def get_all_streams(self) -> List[RTSPStreamReader]:
        """Return a list copy of all registered readers."""
        with self._lock:
            return list(self._streams.values())

    def get_all_status(self) -> List[StreamStatus]:
        """Return the ``status`` snapshot of every registered reader."""
        with self._lock:
            return [stream.status for stream in self._streams.values()]

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate counts plus the per-stream statistics."""
        with self._lock:
            streams = dict(self._streams)

        return {
            "total_streams": len(streams),
            "running_streams": sum(
                1 for s in streams.values() if s.is_running
            ),
            "connected_streams": sum(
                1 for s in streams.values() if s.is_connected
            ),
            "streams": {
                cid: s.get_statistics()
                for cid, s in streams.items()
            }
        }

    def close(self):
        """Stop all streams and forget them."""
        # Detach everything in one step so a stream added concurrently is
        # never dropped from the registry without having been stopped.
        with self._lock:
            streams = list(self._streams.values())
            self._streams.clear()
        for stream in streams:
            stream.stop()
        self._logger.info("已停止所有视频流")
def create_stream_reader(
    camera_id: str,
    rtsp_url: str,
    target_fps: int = 5,
    frame_buffer_size: int = 30,
    on_frame_callback: Optional[Callable[[VideoFrame], None]] = None
) -> RTSPStreamReader:
    """Convenience factory that builds an ``RTSPStreamReader``.

    All arguments are forwarded unchanged by keyword; no ``config`` is
    passed, so the reader falls back to its own default.

    Args:
        camera_id: Identifier of the camera.
        rtsp_url: URL of the RTSP stream.
        target_fps: Frame rate forwarded to the reader.
        frame_buffer_size: Frame-queue capacity forwarded to the reader.
        on_frame_callback: Optional per-frame callback forwarded to the reader.

    Returns:
        The newly constructed (not yet started) reader.
    """
    reader_kwargs = dict(
        camera_id=camera_id,
        rtsp_url=rtsp_url,
        target_fps=target_fps,
        frame_buffer_size=frame_buffer_size,
        on_frame_callback=on_frame_callback,
    )
    reader = RTSPStreamReader(**reader_kwargs)
    return reader