""" RTSP 视频流帧率检测工具 用法: python rtsp_fps_detector.py rtsp://your_stream_url python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1 """ import cv2 import time import sys import argparse from collections import deque import threading class RTSPFPSDetector: """RTSP 视频流帧率检测器""" def __init__(self, rtsp_url: str, detection_time: int = 10): self.rtsp_url = rtsp_url self.detection_time = detection_time self.frame_times = deque() self.frame_count = 0 self.start_time = None self.cap = None self.running = False def connect_stream(self) -> bool: """连接 RTSP 流""" print(f"🔗 正在连接 RTSP 流: {self.rtsp_url}") # 设置 OpenCV 参数以优化 RTSP 连接 self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) # 设置缓冲区大小 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 尝试连接 if not self.cap.isOpened(): print("❌ 无法连接到 RTSP 流") return False # 读取第一帧测试连接 ret, frame = self.cap.read() if not ret: print("❌ 无法读取视频帧") return False # 获取视频流信息 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) codec = int(self.cap.get(cv2.CAP_PROP_FOURCC)) print(f"✅ 连接成功!") print(f"📺 分辨率: {width}x{height}") print(f"🎬 编码格式: {self._fourcc_to_string(codec)}") # 尝试获取声明的帧率(可能不准确) declared_fps = self.cap.get(cv2.CAP_PROP_FPS) if declared_fps > 0: print(f"📋 声明帧率: {declared_fps:.2f} FPS") else: print("📋 声明帧率: 未知") return True def _fourcc_to_string(self, fourcc: int) -> str: """将 FOURCC 代码转换为字符串""" try: return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) except: return f"Code_{fourcc}" def detect_fps(self) -> dict: """检测实际帧率""" if not self.connect_stream(): return None print(f"\n🎯 开始检测实际帧率 (持续 {self.detection_time} 秒)...") print("按 Ctrl+C 可提前停止检测") self.frame_times.clear() self.frame_count = 0 self.start_time = time.time() self.running = True # 启动显示线程 display_thread = threading.Thread(target=self._display_progress) display_thread.daemon = True display_thread.start() try: while self.running: ret, frame = self.cap.read() if not ret: print("\n⚠️ 读取帧失败,可能是流中断") break current_time = time.time() self.frame_times.append(current_time) self.frame_count += 1 # 检测时间到达 if current_time - self.start_time >= self.detection_time: break # 保持最近的帧时间记录(用于计算瞬时帧率) while len(self.frame_times) > 100: self.frame_times.popleft() except KeyboardInterrupt: print("\n⏹️ 用户中断检测") finally: self.running = False self.cap.release() return self._calculate_results() def _display_progress(self): """显示检测进度""" while self.running: if self.start_time and self.frame_count > 0: elapsed = time.time() - self.start_time current_fps = self.frame_count / elapsed if elapsed > 0 else 0 progress = min(elapsed / self.detection_time * 100, 100) print(f"\r⏱️ 进度: {progress:.1f}% | 帧数: {self.frame_count} | 当前FPS: {current_fps:.2f}", end="", flush=True) time.sleep(0.5) def _calculate_results(self) -> dict: """计算检测结果""" if self.frame_count < 2: return { "success": False, "error": "检测到的帧数太少" } total_time = time.time() - self.start_time average_fps = self.frame_count / total_time # 计算帧间隔统计 intervals = [] if len(self.frame_times) > 1: for i in range(1, len(self.frame_times)): interval = self.frame_times[i] - self.frame_times[i-1] if interval > 0: # 避免除零 intervals.append(1.0 / interval) # 统计分析 if intervals: min_fps = min(intervals) max_fps = max(intervals) # 计算稳定性(标准差) mean_fps = sum(intervals) / len(intervals) variance = sum((x - mean_fps) ** 2 for x in intervals) / len(intervals) std_dev = variance ** 0.5 stability = max(0, 100 - (std_dev / mean_fps * 100)) if mean_fps > 0 else 0 else: min_fps = max_fps = mean_fps = std_dev = stability = 0 return { "success": True, "total_frames": self.frame_count, "detection_time": total_time, "average_fps": average_fps, "instantaneous_fps": { "min": min_fps, "max": max_fps, "mean": mean_fps, "std_dev": std_dev }, "stability_score": stability } def print_results(self, results: dict): """打印检测结果""" if not results or not results.get("success"): print(f"\n❌ 检测失败: {results.get('error', '未知错误')}") return print(f"\n" + "="*60) print("📊 RTSP 视频流帧率检测结果") print("="*60) print(f"🎬 视频流: {self.rtsp_url}") print(f"⏱️ 检测时长: {results['detection_time']:.2f} 秒") print(f"🎞️ 总帧数: {results['total_frames']} 帧") print() print("📈 帧率分析:") print(f" 平均帧率: {results['average_fps']:.2f} FPS") inst = results['instantaneous_fps'] print(f" 瞬时帧率范围: {inst['min']:.2f} - {inst['max']:.2f} FPS") print(f" 瞬时帧率均值: {inst['mean']:.2f} FPS") print(f" 帧率标准差: {inst['std_dev']:.2f}") print(f" 稳定性评分: {results['stability_score']:.1f}/100") print() # 帧率评估 avg_fps = results['average_fps'] if avg_fps >= 25: quality = "优秀 ✅" elif avg_fps >= 15: quality = "良好 🟡" elif avg_fps >= 10: quality = "一般 🟠" else: quality = "较差 ❌" print(f"🎯 帧率质量: {quality}") # 稳定性评估 stability = results['stability_score'] if stability >= 90: stability_desc = "非常稳定 ✅" elif stability >= 70: stability_desc = "较稳定 🟡" elif stability >= 50: stability_desc = "一般 🟠" else: stability_desc = "不稳定 ❌" print(f"📊 稳定性: {stability_desc}") print("="*60) def main(): parser = argparse.ArgumentParser(description="RTSP 视频流帧率检测工具") parser.add_argument("rtsp_url", help="RTSP 流地址") parser.add_argument("-t", "--time", type=int, default=10, help="检测时长(秒),默认10秒") args = parser.parse_args() print("🎥 RTSP 视频流帧率检测工具") print("="*60) detector = RTSPFPSDetector(args.rtsp_url, args.time) results = detector.detect_fps() detector.print_results(results) if __name__ == "__main__": if len(sys.argv) < 2: print("用法: python rtsp_fps_detector.py [检测时长]") print() print("示例:") print(" python rtsp_fps_detector.py rtsp://192.168.1.100:554/stream1") print(" python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1 -t 15") sys.exit(1) main()