- Moved all project files and directories (config, core, models, etc.) from edge_inference_service/ to the repository root ai_edge/ - Updated model path in config/settings.py to reflect new structure - Revised usage paths in __init__.py documentation
474 lines
15 KiB
Python
474 lines
15 KiB
Python
"""
|
||
视频流接入模块
|
||
实现基于OpenCV的RTSP流拉取,支持多线程并发、动态抽帧、断线重连
|
||
"""
|
||
|
||
import queue
|
||
import threading
|
||
import time
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||
from dataclasses import dataclass, field
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
from config.settings import get_settings, VideoStreamConfig
|
||
from utils.common import ExponentialBackoff, generate_unique_id
|
||
from utils.logger import get_logger
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class VideoFrame:
|
||
"""视频帧数据类"""
|
||
frame_id: str
|
||
camera_id: str
|
||
image: np.ndarray
|
||
timestamp: datetime
|
||
frame_number: int
|
||
width: int
|
||
height: int
|
||
fps: float
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
"""转换为字典"""
|
||
return {
|
||
"frame_id": self.frame_id,
|
||
"camera_id": self.camera_id,
|
||
"timestamp": self.timestamp.isoformat(),
|
||
"frame_number": self.frame_number,
|
||
"width": self.width,
|
||
"height": self.height,
|
||
"fps": self.fps,
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class StreamStatus:
|
||
"""流状态信息类"""
|
||
camera_id: str
|
||
is_connected: bool
|
||
is_running: bool
|
||
last_frame_time: Optional[datetime]
|
||
frame_count: int
|
||
reconnect_attempts: int
|
||
error_message: Optional[str]
|
||
|
||
|
||
class RTSPStreamReader:
|
||
"""RTSP流读取器类"""
|
||
|
||
def __init__(
|
||
self,
|
||
camera_id: str,
|
||
rtsp_url: str,
|
||
target_fps: int = 5,
|
||
frame_buffer_size: int = 30,
|
||
on_frame_callback: Optional[Callable[[VideoFrame], None]] = None,
|
||
config: Optional[VideoStreamConfig] = None
|
||
):
|
||
self.camera_id = camera_id
|
||
self.rtsp_url = rtsp_url
|
||
self.target_fps = target_fps
|
||
|
||
if config is None:
|
||
settings = get_settings()
|
||
config = settings.video_stream
|
||
|
||
self.config = config
|
||
|
||
self._cap = None
|
||
self._frame_buffer: queue.Queue = queue.Queue(maxsize=frame_buffer_size)
|
||
self._running = False
|
||
self._connected = False
|
||
self._frame_count = 0
|
||
self._last_frame_time: Optional[datetime] = None
|
||
self._on_frame_callback = on_frame_callback
|
||
|
||
self._read_thread: Optional[threading.Thread] = None
|
||
self._process_thread: Optional[threading.Thread] = None
|
||
self._reconnect_thread: Optional[threading.Thread] = None
|
||
self._stop_event = threading.Event()
|
||
self._reconnect_event = threading.Event()
|
||
|
||
self._reconnect_backoff = ExponentialBackoff(
|
||
base_delay=config.reconnect_base_delay,
|
||
max_delay=config.reconnect_max_delay,
|
||
max_attempts=config.reconnect_max_attempts
|
||
)
|
||
|
||
self._logger = get_logger("video_stream")
|
||
self._lock = threading.Lock()
|
||
|
||
@property
|
||
def is_connected(self) -> bool:
|
||
"""检查是否已连接"""
|
||
return self._connected
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
"""检查是否正在运行"""
|
||
return self._running
|
||
|
||
@property
|
||
def frame_count(self) -> int:
|
||
"""获取已读取帧数"""
|
||
return self._frame_count
|
||
|
||
@property
|
||
def status(self) -> StreamStatus:
|
||
"""获取流状态"""
|
||
return StreamStatus(
|
||
camera_id=self.camera_id,
|
||
is_connected=self._connected,
|
||
is_running=self._running,
|
||
last_frame_time=self._last_frame_time,
|
||
frame_count=self._frame_count,
|
||
reconnect_attempts=self._reconnect_backoff.current_attempt,
|
||
error_message=None
|
||
)
|
||
|
||
def _connect(self) -> bool:
|
||
"""建立RTSP连接"""
|
||
try:
|
||
if self._cap is not None:
|
||
self._cap.release()
|
||
|
||
self._cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
|
||
|
||
self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
|
||
|
||
if not self._cap.isOpened():
|
||
self._logger.error(f"无法打开视频流: {self.rtsp_url}")
|
||
return False
|
||
|
||
actual_fps = self._cap.get(cv2.CAP_PROP_FPS)
|
||
self._logger.log_connection_event(
|
||
"connect", "RTSP", self.camera_id, True
|
||
)
|
||
|
||
self._connected = True
|
||
self._reconnect_backoff.reset()
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self._logger.error(f"RTSP连接异常: {e}")
|
||
return False
|
||
|
||
def _disconnect(self):
|
||
"""断开连接"""
|
||
if self._cap is not None:
|
||
try:
|
||
self._cap.release()
|
||
except Exception:
|
||
pass
|
||
self._cap = None
|
||
|
||
self._connected = False
|
||
self._logger.log_connection_event(
|
||
"disconnect", "RTSP", self.camera_id, True
|
||
)
|
||
|
||
def _read_frame(self):
|
||
"""读取帧线程函数"""
|
||
while not self._stop_event.is_set():
|
||
if not self._connected:
|
||
if not self._reconnect():
|
||
time.sleep(1)
|
||
continue
|
||
|
||
try:
|
||
ret, frame = self._cap.read()
|
||
|
||
if not ret or frame is None:
|
||
self._logger.warning(f"读取帧失败: {self.camera_id}")
|
||
self._connected = False
|
||
continue
|
||
|
||
self._frame_count += 1
|
||
current_time = datetime.now()
|
||
self._last_frame_time = current_time
|
||
|
||
frame_obj = VideoFrame(
|
||
frame_id=generate_unique_id("frame"),
|
||
camera_id=self.camera_id,
|
||
image=frame,
|
||
timestamp=current_time,
|
||
frame_number=self._frame_count,
|
||
width=frame.shape[1],
|
||
height=frame.shape[0],
|
||
fps=self.target_fps
|
||
)
|
||
|
||
try:
|
||
if self._frame_buffer.full():
|
||
try:
|
||
self._frame_buffer.get_nowait()
|
||
except queue.Empty:
|
||
pass
|
||
|
||
self._frame_buffer.put_nowait(frame_obj)
|
||
|
||
if self._on_frame_callback:
|
||
self._on_frame_callback(frame_obj)
|
||
|
||
except queue.Full:
|
||
self._logger.debug(f"帧缓冲区已满: {self.camera_id}")
|
||
|
||
except Exception as e:
|
||
self._logger.error(f"读取帧异常: {e}")
|
||
self._connected = False
|
||
|
||
def _reconnect(self) -> bool:
|
||
"""执行重连"""
|
||
if not self._reconnect_backoff.next_attempt():
|
||
self._logger.error(f"重连次数已达上限: {self.camera_id}")
|
||
return False
|
||
|
||
delay = self._reconnect_backoff.get_delay()
|
||
self._logger.warning(
|
||
f"准备重连 {self.camera_id}, 等待 {delay:.1f}秒"
|
||
)
|
||
time.sleep(delay)
|
||
|
||
self._disconnect()
|
||
|
||
if self._connect():
|
||
self._logger.info(f"重连成功: {self.camera_id}")
|
||
return True
|
||
else:
|
||
self._logger.error(f"重连失败: {self.camera_id}")
|
||
return False
|
||
|
||
def set_frame_callback(self, callback: Callable[[VideoFrame], None]):
|
||
"""设置帧处理回调"""
|
||
self._on_frame_callback = callback
|
||
|
||
def start(self):
|
||
"""启动流读取"""
|
||
if self._running:
|
||
return
|
||
|
||
self._stop_event.clear()
|
||
|
||
if not self._connect():
|
||
raise RuntimeError(f"无法连接到视频流: {self.rtsp_url}")
|
||
|
||
self._running = True
|
||
|
||
self._read_thread = threading.Thread(
|
||
target=self._read_frame,
|
||
name=f"StreamReader-{self.camera_id}",
|
||
daemon=True
|
||
)
|
||
self._read_thread.start()
|
||
|
||
self._logger.info(f"视频流已启动: {self.camera_id} @ {self.target_fps}fps")
|
||
|
||
def stop(self):
|
||
"""停止流读取"""
|
||
self._running = False
|
||
self._stop_event.set()
|
||
|
||
if self._read_thread and self._read_thread.is_alive():
|
||
self._read_thread.join(timeout=5)
|
||
|
||
self._disconnect()
|
||
|
||
while not self._frame_buffer.empty():
|
||
try:
|
||
self._frame_buffer.get_nowait()
|
||
except queue.Empty:
|
||
break
|
||
|
||
self._logger.info(f"视频流已停止: {self.camera_id}")
|
||
|
||
def read(self, timeout: float = 1.0) -> Optional[VideoFrame]:
|
||
"""从缓冲区读取帧"""
|
||
try:
|
||
return self._frame_buffer.get(timeout=timeout)
|
||
except queue.Empty:
|
||
return None
|
||
|
||
def get_latest_frame(self, timeout: float = 1.0) -> Optional[VideoFrame]:
|
||
"""获取最新帧(丢弃中间帧)"""
|
||
try:
|
||
while True:
|
||
try:
|
||
frame = self._frame_buffer.get_nowait()
|
||
if self._frame_buffer.empty():
|
||
return frame
|
||
except queue.Empty:
|
||
return None
|
||
except Exception:
|
||
return None
|
||
|
||
def get_frame_batch(self, max_count: int = 8,
|
||
timeout: float = 2.0) -> List[VideoFrame]:
|
||
"""批量获取帧"""
|
||
frames = []
|
||
end_time = time.time() + timeout
|
||
|
||
while len(frames) < max_count and time.time() < end_time:
|
||
try:
|
||
remaining = end_time - time.time()
|
||
if remaining <= 0:
|
||
break
|
||
|
||
frame = self._frame_buffer.get(timeout=remaining)
|
||
frames.append(frame)
|
||
except queue.Empty:
|
||
break
|
||
|
||
return frames
|
||
|
||
def get_statistics(self) -> Dict[str, Any]:
|
||
"""获取统计信息"""
|
||
return {
|
||
"camera_id": self.camera_id,
|
||
"is_connected": self._connected,
|
||
"is_running": self._running,
|
||
"frame_count": self._frame_count,
|
||
"target_fps": self.target_fps,
|
||
"buffer_size": self._frame_buffer.qsize(),
|
||
"buffer_capacity": self._frame_buffer.maxsize,
|
||
"last_frame_time": (
|
||
self._last_frame_time.isoformat()
|
||
if self._last_frame_time else None
|
||
),
|
||
}
|
||
|
||
|
||
class MultiStreamManager:
|
||
"""多流管理器类"""
|
||
|
||
def __init__(self, config: Optional[VideoStreamConfig] = None):
|
||
self._streams: Dict[str, RTSPStreamReader] = {}
|
||
self._config = config
|
||
|
||
self._logger = get_logger("multi_stream")
|
||
self._lock = threading.Lock()
|
||
|
||
def add_stream(
|
||
self,
|
||
camera_id: str,
|
||
rtsp_url: str,
|
||
target_fps: int = 5,
|
||
frame_buffer_size: int = 30,
|
||
on_frame_callback: Optional[Callable[[VideoFrame], None]] = None
|
||
) -> RTSPStreamReader:
|
||
"""添加视频流"""
|
||
with self._lock:
|
||
if camera_id in self._streams:
|
||
raise ValueError(f"摄像头已存在: {camera_id}")
|
||
|
||
stream = RTSPStreamReader(
|
||
camera_id=camera_id,
|
||
rtsp_url=rtsp_url,
|
||
target_fps=target_fps,
|
||
frame_buffer_size=frame_buffer_size,
|
||
on_frame_callback=on_frame_callback,
|
||
config=self._config
|
||
)
|
||
|
||
self._streams[camera_id] = stream
|
||
self._logger.info(f"视频流已添加: {camera_id}")
|
||
|
||
return stream
|
||
|
||
def remove_stream(self, camera_id: str):
|
||
"""移除视频流"""
|
||
with self._lock:
|
||
if camera_id not in self._streams:
|
||
return
|
||
|
||
stream = self._streams[camera_id]
|
||
stream.stop()
|
||
del self._streams[camera_id]
|
||
self._logger.info(f"视频流已移除: {camera_id}")
|
||
|
||
def start_stream(self, camera_id: str):
|
||
"""启动指定视频流"""
|
||
with self._lock:
|
||
if camera_id not in self._streams:
|
||
raise ValueError(f"摄像头不存在: {camera_id}")
|
||
self._streams[camera_id].start()
|
||
|
||
def start_all(self):
|
||
"""启动所有视频流"""
|
||
with self._lock:
|
||
for stream in self._streams.values():
|
||
stream.start()
|
||
self._logger.info(f"已启动 {len(self._streams)} 个视频流")
|
||
|
||
def stop_stream(self, camera_id: str):
|
||
"""停止指定视频流"""
|
||
with self._lock:
|
||
if camera_id in self._streams:
|
||
self._streams[camera_id].stop()
|
||
|
||
def stop_all(self):
|
||
"""停止所有视频流"""
|
||
with self._lock:
|
||
for stream in self._streams.values():
|
||
stream.stop()
|
||
self._logger.info("已停止所有视频流")
|
||
|
||
def get_stream(self, camera_id: str) -> Optional[RTSPStreamReader]:
|
||
"""获取指定视频流"""
|
||
return self._streams.get(camera_id)
|
||
|
||
def get_all_streams(self) -> List[RTSPStreamReader]:
|
||
"""获取所有视频流"""
|
||
with self._lock:
|
||
return list(self._streams.values())
|
||
|
||
def get_all_status(self) -> List[StreamStatus]:
|
||
"""获取所有流状态"""
|
||
with self._lock:
|
||
return [stream.status for stream in self._streams.values()]
|
||
|
||
def get_statistics(self) -> Dict[str, Any]:
|
||
"""获取管理器统计信息"""
|
||
with self._lock:
|
||
return {
|
||
"total_streams": len(self._streams),
|
||
"running_streams": sum(
|
||
1 for s in self._streams.values() if s.is_running
|
||
),
|
||
"connected_streams": sum(
|
||
1 for s in self._streams.values() if s.is_connected
|
||
),
|
||
"streams": {
|
||
cid: s.get_statistics()
|
||
for cid, s in self._streams.items()
|
||
}
|
||
}
|
||
|
||
def close(self):
|
||
"""关闭管理器"""
|
||
self.stop_all()
|
||
with self._lock:
|
||
self._streams.clear()
|
||
|
||
|
||
def create_stream_reader(
|
||
camera_id: str,
|
||
rtsp_url: str,
|
||
target_fps: int = 5,
|
||
frame_buffer_size: int = 30,
|
||
on_frame_callback: Optional[Callable[[VideoFrame], None]] = None
|
||
) -> RTSPStreamReader:
|
||
"""创建视频流读取器的便捷函数"""
|
||
return RTSPStreamReader(
|
||
camera_id=camera_id,
|
||
rtsp_url=rtsp_url,
|
||
target_fps=target_fps,
|
||
frame_buffer_size=frame_buffer_size,
|
||
on_frame_callback=on_frame_callback
|
||
)
|