Files
Test_AI/rtsp_fps_detector.py

258 lines
8.6 KiB
Python
Raw Normal View History

2026-01-20 10:54:30 +08:00
"""
RTSP 视频流帧率检测工具
用法:
python rtsp_fps_detector.py rtsp://your_stream_url
python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1
"""
import cv2
import time
import sys
import argparse
from collections import deque
import threading
class RTSPFPSDetector:
    """Measure the actual frame rate of an RTSP video stream.

    The detector opens the stream with OpenCV, reads frames for
    ``detection_time`` seconds while timestamping every successful read,
    and derives an average frame rate plus per-frame ("instantaneous")
    statistics from those timestamps.
    """

    # How many of the most recent frame timestamps are kept for the
    # instantaneous-FPS statistics.
    _MAX_FRAME_SAMPLES = 100

    def __init__(self, rtsp_url: str, detection_time: int = 10):
        """Store the stream URL and the measurement duration (seconds)."""
        self.rtsp_url = rtsp_url
        self.detection_time = detection_time
        self.frame_times = deque()  # timestamps of the most recent frames
        self.frame_count = 0        # frames read during the measurement
        self.start_time = None      # time.time() when measurement started
        self.cap = None             # cv2.VideoCapture, set by connect_stream
        self.running = False        # True while the read loop is active
        # Lets detect_fps wake the progress thread immediately instead of
        # waiting for its sleep interval to expire.
        self._stop_event = threading.Event()

    def connect_stream(self) -> bool:
        """Open the RTSP stream and print its basic properties.

        Returns:
            True when the stream was opened and a first frame was read,
            False otherwise. On failure the capture is released so the
            connection is not leaked.
        """
        print(f"🔗 正在连接 RTSP 流: {self.rtsp_url}")
        # Open through the FFMPEG backend and request a 1-frame buffer.
        self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        if not self.cap.isOpened():
            print("❌ 无法连接到 RTSP 流")
            self.cap.release()
            return False

        # Read one frame to prove that the stream actually delivers data.
        ret, _ = self.cap.read()
        if not ret:
            print("❌ 无法读取视频帧")
            self.cap.release()
            return False

        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        codec = int(self.cap.get(cv2.CAP_PROP_FOURCC))
        print(f"✅ 连接成功!")
        print(f"📺 分辨率: {width}x{height}")
        print(f"🎬 编码格式: {self._fourcc_to_string(codec)}")

        # The frame rate declared by the stream (may be inaccurate).
        declared_fps = self.cap.get(cv2.CAP_PROP_FPS)
        if declared_fps > 0:
            print(f"📋 声明帧率: {declared_fps:.2f} FPS")
        else:
            print("📋 声明帧率: 未知")
        return True

    def _fourcc_to_string(self, fourcc: int) -> str:
        """Convert a FOURCC code to its four-character string.

        The code is decoded low byte first. When the value cannot be
        decoded, or decodes to non-printable or blank text (for example a
        FOURCC of 0), ``Code_<value>`` is returned instead so that no
        control characters are written to the terminal.
        """
        try:
            text = "".join(chr((fourcc >> 8 * i) & 0xFF) for i in range(4))
        except (TypeError, ValueError):
            return f"Code_{fourcc}"
        if text.isprintable() and text.strip():
            return text
        return f"Code_{fourcc}"

    def detect_fps(self) -> "dict | None":
        """Measure the real frame rate of the stream.

        Returns:
            The result dictionary built by ``_calculate_results``, or
            ``None`` when the stream could not be opened.
        """
        if not self.connect_stream():
            return None

        print(f"\n🎯 开始检测实际帧率 (持续 {self.detection_time} 秒)...")
        print("按 Ctrl+C 可提前停止检测")
        self.frame_times.clear()
        self.frame_count = 0
        self.start_time = time.time()
        self.running = True
        self._stop_event.clear()

        # Background thread that prints the progress line.
        display_thread = threading.Thread(target=self._display_progress)
        display_thread.daemon = True
        display_thread.start()

        try:
            while self.running:
                ret, _ = self.cap.read()
                if not ret:
                    print("\n⚠️ 读取帧失败,可能是流中断")
                    break
                current_time = time.time()
                self.frame_times.append(current_time)
                self.frame_count += 1
                # Stop once the requested measurement time has elapsed.
                if current_time - self.start_time >= self.detection_time:
                    break
                # Keep only the most recent timestamps.
                while len(self.frame_times) > self._MAX_FRAME_SAMPLES:
                    self.frame_times.popleft()
        except KeyboardInterrupt:
            print("\n⏹️ 用户中断检测")
        finally:
            self.running = False
            # Wake the progress thread and wait for it, so it cannot print
            # a progress line in the middle of the final report.
            self._stop_event.set()
            display_thread.join(timeout=1.0)
            self.cap.release()
        return self._calculate_results()

    def _display_progress(self):
        """Print a progress line twice per second while detection runs."""
        while self.running:
            if self.start_time and self.frame_count > 0:
                elapsed = time.time() - self.start_time
                current_fps = self.frame_count / elapsed if elapsed > 0 else 0
                if self.detection_time > 0:
                    progress = min(elapsed / self.detection_time * 100, 100)
                else:
                    # A non-positive duration is complete by definition;
                    # this also avoids dividing by zero.
                    progress = 100
                print(f"\r⏱️ 进度: {progress:.1f}% | 帧数: {self.frame_count} | 当前FPS: {current_fps:.2f}", end="", flush=True)
            # Interruptible sleep: returns True as soon as the stop event
            # is set by detect_fps.
            if self._stop_event.wait(0.5):
                break

    def _calculate_results(self) -> dict:
        """Build the result dictionary from the recorded timestamps.

        Returns:
            ``{"success": False, "error": ...}`` when fewer than two frames
            were counted; otherwise a dictionary with ``total_frames``,
            ``detection_time``, ``average_fps``, ``instantaneous_fps``
            (``min``/``max``/``mean``/``std_dev``) and ``stability_score``.
        """
        if self.frame_count < 2:
            return {
                "success": False,
                "error": "检测到的帧数太少"
            }

        # Measure up to the last received frame rather than "now", so that
        # time spent after the read loop (thread join, capture release) is
        # not counted as detection time.
        if self.frame_times:
            end_time = self.frame_times[-1]
        else:
            end_time = time.time()
        total_time = end_time - self.start_time
        average_fps = self.frame_count / total_time if total_time > 0 else 0.0

        # Instantaneous FPS = reciprocal of each gap between consecutive
        # frames; non-positive gaps are skipped to avoid dividing by zero.
        times = list(self.frame_times)
        intervals = [
            1.0 / (later - earlier)
            for earlier, later in zip(times, times[1:])
            if later - earlier > 0
        ]

        if intervals:
            min_fps = min(intervals)
            max_fps = max(intervals)
            mean_fps = sum(intervals) / len(intervals)
            variance = sum((x - mean_fps) ** 2 for x in intervals) / len(intervals)
            std_dev = variance ** 0.5
            # Stability = 100 minus the coefficient of variation in
            # percent, floored at 0.
            stability = max(0, 100 - (std_dev / mean_fps * 100)) if mean_fps > 0 else 0
        else:
            min_fps = max_fps = mean_fps = std_dev = stability = 0

        return {
            "success": True,
            "total_frames": self.frame_count,
            "detection_time": total_time,
            "average_fps": average_fps,
            "instantaneous_fps": {
                "min": min_fps,
                "max": max_fps,
                "mean": mean_fps,
                "std_dev": std_dev
            },
            "stability_score": stability
        }

    def print_results(self, results: dict):
        """Print a human-readable report for a detection result.

        Args:
            results: The value returned by ``detect_fps``. ``None`` (the
                stream could not be opened) and unsuccessful results are
                reported as a failure instead of raising.
        """
        if not results or not results.get("success"):
            error = results.get("error", "未知错误") if results else "未知错误"
            print(f"\n❌ 检测失败: {error}")
            return

        print("\n" + "=" * 60)
        print("📊 RTSP 视频流帧率检测结果")
        print("=" * 60)
        print(f"🎬 视频流: {self.rtsp_url}")
        print(f"⏱️ 检测时长: {results['detection_time']:.2f}")
        print(f"🎞️ 总帧数: {results['total_frames']}")
        print()
        print("📈 帧率分析:")
        print(f" 平均帧率: {results['average_fps']:.2f} FPS")
        inst = results['instantaneous_fps']
        print(f" 瞬时帧率范围: {inst['min']:.2f} - {inst['max']:.2f} FPS")
        print(f" 瞬时帧率均值: {inst['mean']:.2f} FPS")
        print(f" 帧率标准差: {inst['std_dev']:.2f}")
        print(f" 稳定性评分: {results['stability_score']:.1f}/100")
        print()

        # Rate the average frame rate.
        avg_fps = results['average_fps']
        if avg_fps >= 25:
            quality = "优秀 ✅"
        elif avg_fps >= 15:
            quality = "良好 🟡"
        elif avg_fps >= 10:
            quality = "一般 🟠"
        else:
            quality = "较差 ❌"
        print(f"🎯 帧率质量: {quality}")

        # Rate the stability score.
        stability = results['stability_score']
        if stability >= 90:
            stability_desc = "非常稳定 ✅"
        elif stability >= 70:
            stability_desc = "较稳定 🟡"
        elif stability >= 50:
            stability_desc = "一般 🟠"
        else:
            stability_desc = "不稳定 ❌"
        print(f"📊 稳定性: {stability_desc}")
        print("=" * 60)
def main():
    """Parse the command line, run the detection and print the report.

    Positional argument ``rtsp_url`` is the stream address; ``-t/--time``
    is the measurement duration in seconds (default 10) and must be a
    positive integer.
    """

    def positive_int(value: str) -> int:
        """argparse type: accept only integers greater than zero."""
        # A ValueError from int() is reported by argparse as a usage error.
        seconds = int(value)
        if seconds <= 0:
            raise argparse.ArgumentTypeError("检测时长必须为正整数")
        return seconds

    parser = argparse.ArgumentParser(description="RTSP 视频流帧率检测工具")
    parser.add_argument("rtsp_url", help="RTSP 流地址")
    parser.add_argument("-t", "--time", type=positive_int, default=10,
                        help="检测时长默认10秒")
    args = parser.parse_args()

    print("🎥 RTSP 视频流帧率检测工具")
    print("=" * 60)
    detector = RTSPFPSDetector(args.rtsp_url, args.time)
    results = detector.detect_fps()
    detector.print_results(results)
if __name__ == "__main__":
    # Show a short usage text with examples when no argument is given,
    # before argparse gets a chance to print its own error.
    if len(sys.argv) < 2:
        # The duration is passed with -t, not as a second positional value.
        print("用法: python rtsp_fps_detector.py <rtsp_url> [-t 检测时长]")
        print()
        print("示例:")
        print(" python rtsp_fps_detector.py rtsp://192.168.1.100:554/stream1")
        print(" python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1 -t 15")
        sys.exit(1)
    main()