Files
Test_AI/rtsp_fps_detector.py
2026-01-20 10:54:30 +08:00

258 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
RTSP 视频流帧率检测工具
用法:
python rtsp_fps_detector.py rtsp://your_stream_url
python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1
"""
import cv2
import time
import sys
import argparse
from collections import deque
import threading
class RTSPFPSDetector:
"""RTSP 视频流帧率检测器"""
def __init__(self, rtsp_url: str, detection_time: int = 10):
self.rtsp_url = rtsp_url
self.detection_time = detection_time
self.frame_times = deque()
self.frame_count = 0
self.start_time = None
self.cap = None
self.running = False
def connect_stream(self) -> bool:
"""连接 RTSP 流"""
print(f"🔗 正在连接 RTSP 流: {self.rtsp_url}")
# 设置 OpenCV 参数以优化 RTSP 连接
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
# 设置缓冲区大小
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# 尝试连接
if not self.cap.isOpened():
print("❌ 无法连接到 RTSP 流")
return False
# 读取第一帧测试连接
ret, frame = self.cap.read()
if not ret:
print("❌ 无法读取视频帧")
return False
# 获取视频流信息
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
codec = int(self.cap.get(cv2.CAP_PROP_FOURCC))
print(f"✅ 连接成功!")
print(f"📺 分辨率: {width}x{height}")
print(f"🎬 编码格式: {self._fourcc_to_string(codec)}")
# 尝试获取声明的帧率(可能不准确)
declared_fps = self.cap.get(cv2.CAP_PROP_FPS)
if declared_fps > 0:
print(f"📋 声明帧率: {declared_fps:.2f} FPS")
else:
print("📋 声明帧率: 未知")
return True
def _fourcc_to_string(self, fourcc: int) -> str:
"""将 FOURCC 代码转换为字符串"""
try:
return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])
except:
return f"Code_{fourcc}"
def detect_fps(self) -> dict:
"""检测实际帧率"""
if not self.connect_stream():
return None
print(f"\n🎯 开始检测实际帧率 (持续 {self.detection_time} 秒)...")
print("按 Ctrl+C 可提前停止检测")
self.frame_times.clear()
self.frame_count = 0
self.start_time = time.time()
self.running = True
# 启动显示线程
display_thread = threading.Thread(target=self._display_progress)
display_thread.daemon = True
display_thread.start()
try:
while self.running:
ret, frame = self.cap.read()
if not ret:
print("\n⚠️ 读取帧失败,可能是流中断")
break
current_time = time.time()
self.frame_times.append(current_time)
self.frame_count += 1
# 检测时间到达
if current_time - self.start_time >= self.detection_time:
break
# 保持最近的帧时间记录(用于计算瞬时帧率)
while len(self.frame_times) > 100:
self.frame_times.popleft()
except KeyboardInterrupt:
print("\n⏹️ 用户中断检测")
finally:
self.running = False
self.cap.release()
return self._calculate_results()
def _display_progress(self):
"""显示检测进度"""
while self.running:
if self.start_time and self.frame_count > 0:
elapsed = time.time() - self.start_time
current_fps = self.frame_count / elapsed if elapsed > 0 else 0
progress = min(elapsed / self.detection_time * 100, 100)
print(f"\r⏱️ 进度: {progress:.1f}% | 帧数: {self.frame_count} | 当前FPS: {current_fps:.2f}", end="", flush=True)
time.sleep(0.5)
def _calculate_results(self) -> dict:
"""计算检测结果"""
if self.frame_count < 2:
return {
"success": False,
"error": "检测到的帧数太少"
}
total_time = time.time() - self.start_time
average_fps = self.frame_count / total_time
# 计算帧间隔统计
intervals = []
if len(self.frame_times) > 1:
for i in range(1, len(self.frame_times)):
interval = self.frame_times[i] - self.frame_times[i-1]
if interval > 0: # 避免除零
intervals.append(1.0 / interval)
# 统计分析
if intervals:
min_fps = min(intervals)
max_fps = max(intervals)
# 计算稳定性(标准差)
mean_fps = sum(intervals) / len(intervals)
variance = sum((x - mean_fps) ** 2 for x in intervals) / len(intervals)
std_dev = variance ** 0.5
stability = max(0, 100 - (std_dev / mean_fps * 100)) if mean_fps > 0 else 0
else:
min_fps = max_fps = mean_fps = std_dev = stability = 0
return {
"success": True,
"total_frames": self.frame_count,
"detection_time": total_time,
"average_fps": average_fps,
"instantaneous_fps": {
"min": min_fps,
"max": max_fps,
"mean": mean_fps,
"std_dev": std_dev
},
"stability_score": stability
}
def print_results(self, results: dict):
"""打印检测结果"""
if not results or not results.get("success"):
print(f"\n❌ 检测失败: {results.get('error', '未知错误')}")
return
print(f"\n" + "="*60)
print("📊 RTSP 视频流帧率检测结果")
print("="*60)
print(f"🎬 视频流: {self.rtsp_url}")
print(f"⏱️ 检测时长: {results['detection_time']:.2f}")
print(f"🎞️ 总帧数: {results['total_frames']}")
print()
print("📈 帧率分析:")
print(f" 平均帧率: {results['average_fps']:.2f} FPS")
inst = results['instantaneous_fps']
print(f" 瞬时帧率范围: {inst['min']:.2f} - {inst['max']:.2f} FPS")
print(f" 瞬时帧率均值: {inst['mean']:.2f} FPS")
print(f" 帧率标准差: {inst['std_dev']:.2f}")
print(f" 稳定性评分: {results['stability_score']:.1f}/100")
print()
# 帧率评估
avg_fps = results['average_fps']
if avg_fps >= 25:
quality = "优秀 ✅"
elif avg_fps >= 15:
quality = "良好 🟡"
elif avg_fps >= 10:
quality = "一般 🟠"
else:
quality = "较差 ❌"
print(f"🎯 帧率质量: {quality}")
# 稳定性评估
stability = results['stability_score']
if stability >= 90:
stability_desc = "非常稳定 ✅"
elif stability >= 70:
stability_desc = "较稳定 🟡"
elif stability >= 50:
stability_desc = "一般 🟠"
else:
stability_desc = "不稳定 ❌"
print(f"📊 稳定性: {stability_desc}")
print("="*60)
def main():
parser = argparse.ArgumentParser(description="RTSP 视频流帧率检测工具")
parser.add_argument("rtsp_url", help="RTSP 流地址")
parser.add_argument("-t", "--time", type=int, default=10,
help="检测时长默认10秒")
args = parser.parse_args()
print("🎥 RTSP 视频流帧率检测工具")
print("="*60)
detector = RTSPFPSDetector(args.rtsp_url, args.time)
results = detector.detect_fps()
detector.print_results(results)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法: python rtsp_fps_detector.py <rtsp_url> [检测时长]")
print()
print("示例:")
print(" python rtsp_fps_detector.py rtsp://192.168.1.100:554/stream1")
print(" python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1 -t 15")
sys.exit(1)
main()