258 lines
8.6 KiB
Python
258 lines
8.6 KiB
Python
"""
|
||
RTSP 视频流帧率检测工具
|
||
|
||
用法:
|
||
python rtsp_fps_detector.py rtsp://your_stream_url
|
||
python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1
|
||
"""
|
||
|
||
import cv2
|
||
import time
|
||
import sys
|
||
import argparse
|
||
from collections import deque
|
||
import threading
|
||
|
||
|
||
class RTSPFPSDetector:
|
||
"""RTSP 视频流帧率检测器"""
|
||
|
||
def __init__(self, rtsp_url: str, detection_time: int = 10):
|
||
self.rtsp_url = rtsp_url
|
||
self.detection_time = detection_time
|
||
self.frame_times = deque()
|
||
self.frame_count = 0
|
||
self.start_time = None
|
||
self.cap = None
|
||
self.running = False
|
||
|
||
def connect_stream(self) -> bool:
|
||
"""连接 RTSP 流"""
|
||
print(f"🔗 正在连接 RTSP 流: {self.rtsp_url}")
|
||
|
||
# 设置 OpenCV 参数以优化 RTSP 连接
|
||
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
|
||
|
||
# 设置缓冲区大小
|
||
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||
|
||
# 尝试连接
|
||
if not self.cap.isOpened():
|
||
print("❌ 无法连接到 RTSP 流")
|
||
return False
|
||
|
||
# 读取第一帧测试连接
|
||
ret, frame = self.cap.read()
|
||
if not ret:
|
||
print("❌ 无法读取视频帧")
|
||
return False
|
||
|
||
# 获取视频流信息
|
||
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
codec = int(self.cap.get(cv2.CAP_PROP_FOURCC))
|
||
|
||
print(f"✅ 连接成功!")
|
||
print(f"📺 分辨率: {width}x{height}")
|
||
print(f"🎬 编码格式: {self._fourcc_to_string(codec)}")
|
||
|
||
# 尝试获取声明的帧率(可能不准确)
|
||
declared_fps = self.cap.get(cv2.CAP_PROP_FPS)
|
||
if declared_fps > 0:
|
||
print(f"📋 声明帧率: {declared_fps:.2f} FPS")
|
||
else:
|
||
print("📋 声明帧率: 未知")
|
||
|
||
return True
|
||
|
||
def _fourcc_to_string(self, fourcc: int) -> str:
|
||
"""将 FOURCC 代码转换为字符串"""
|
||
try:
|
||
return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])
|
||
except:
|
||
return f"Code_{fourcc}"
|
||
|
||
def detect_fps(self) -> dict:
|
||
"""检测实际帧率"""
|
||
if not self.connect_stream():
|
||
return None
|
||
|
||
print(f"\n🎯 开始检测实际帧率 (持续 {self.detection_time} 秒)...")
|
||
print("按 Ctrl+C 可提前停止检测")
|
||
|
||
self.frame_times.clear()
|
||
self.frame_count = 0
|
||
self.start_time = time.time()
|
||
self.running = True
|
||
|
||
# 启动显示线程
|
||
display_thread = threading.Thread(target=self._display_progress)
|
||
display_thread.daemon = True
|
||
display_thread.start()
|
||
|
||
try:
|
||
while self.running:
|
||
ret, frame = self.cap.read()
|
||
if not ret:
|
||
print("\n⚠️ 读取帧失败,可能是流中断")
|
||
break
|
||
|
||
current_time = time.time()
|
||
self.frame_times.append(current_time)
|
||
self.frame_count += 1
|
||
|
||
# 检测时间到达
|
||
if current_time - self.start_time >= self.detection_time:
|
||
break
|
||
|
||
# 保持最近的帧时间记录(用于计算瞬时帧率)
|
||
while len(self.frame_times) > 100:
|
||
self.frame_times.popleft()
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n⏹️ 用户中断检测")
|
||
|
||
finally:
|
||
self.running = False
|
||
self.cap.release()
|
||
|
||
return self._calculate_results()
|
||
|
||
def _display_progress(self):
|
||
"""显示检测进度"""
|
||
while self.running:
|
||
if self.start_time and self.frame_count > 0:
|
||
elapsed = time.time() - self.start_time
|
||
current_fps = self.frame_count / elapsed if elapsed > 0 else 0
|
||
progress = min(elapsed / self.detection_time * 100, 100)
|
||
|
||
print(f"\r⏱️ 进度: {progress:.1f}% | 帧数: {self.frame_count} | 当前FPS: {current_fps:.2f}", end="", flush=True)
|
||
|
||
time.sleep(0.5)
|
||
|
||
def _calculate_results(self) -> dict:
|
||
"""计算检测结果"""
|
||
if self.frame_count < 2:
|
||
return {
|
||
"success": False,
|
||
"error": "检测到的帧数太少"
|
||
}
|
||
|
||
total_time = time.time() - self.start_time
|
||
average_fps = self.frame_count / total_time
|
||
|
||
# 计算帧间隔统计
|
||
intervals = []
|
||
if len(self.frame_times) > 1:
|
||
for i in range(1, len(self.frame_times)):
|
||
interval = self.frame_times[i] - self.frame_times[i-1]
|
||
if interval > 0: # 避免除零
|
||
intervals.append(1.0 / interval)
|
||
|
||
# 统计分析
|
||
if intervals:
|
||
min_fps = min(intervals)
|
||
max_fps = max(intervals)
|
||
|
||
# 计算稳定性(标准差)
|
||
mean_fps = sum(intervals) / len(intervals)
|
||
variance = sum((x - mean_fps) ** 2 for x in intervals) / len(intervals)
|
||
std_dev = variance ** 0.5
|
||
stability = max(0, 100 - (std_dev / mean_fps * 100)) if mean_fps > 0 else 0
|
||
else:
|
||
min_fps = max_fps = mean_fps = std_dev = stability = 0
|
||
|
||
return {
|
||
"success": True,
|
||
"total_frames": self.frame_count,
|
||
"detection_time": total_time,
|
||
"average_fps": average_fps,
|
||
"instantaneous_fps": {
|
||
"min": min_fps,
|
||
"max": max_fps,
|
||
"mean": mean_fps,
|
||
"std_dev": std_dev
|
||
},
|
||
"stability_score": stability
|
||
}
|
||
|
||
def print_results(self, results: dict):
|
||
"""打印检测结果"""
|
||
if not results or not results.get("success"):
|
||
print(f"\n❌ 检测失败: {results.get('error', '未知错误')}")
|
||
return
|
||
|
||
print(f"\n" + "="*60)
|
||
print("📊 RTSP 视频流帧率检测结果")
|
||
print("="*60)
|
||
|
||
print(f"🎬 视频流: {self.rtsp_url}")
|
||
print(f"⏱️ 检测时长: {results['detection_time']:.2f} 秒")
|
||
print(f"🎞️ 总帧数: {results['total_frames']} 帧")
|
||
print()
|
||
|
||
print("📈 帧率分析:")
|
||
print(f" 平均帧率: {results['average_fps']:.2f} FPS")
|
||
|
||
inst = results['instantaneous_fps']
|
||
print(f" 瞬时帧率范围: {inst['min']:.2f} - {inst['max']:.2f} FPS")
|
||
print(f" 瞬时帧率均值: {inst['mean']:.2f} FPS")
|
||
print(f" 帧率标准差: {inst['std_dev']:.2f}")
|
||
print(f" 稳定性评分: {results['stability_score']:.1f}/100")
|
||
print()
|
||
|
||
# 帧率评估
|
||
avg_fps = results['average_fps']
|
||
if avg_fps >= 25:
|
||
quality = "优秀 ✅"
|
||
elif avg_fps >= 15:
|
||
quality = "良好 🟡"
|
||
elif avg_fps >= 10:
|
||
quality = "一般 🟠"
|
||
else:
|
||
quality = "较差 ❌"
|
||
|
||
print(f"🎯 帧率质量: {quality}")
|
||
|
||
# 稳定性评估
|
||
stability = results['stability_score']
|
||
if stability >= 90:
|
||
stability_desc = "非常稳定 ✅"
|
||
elif stability >= 70:
|
||
stability_desc = "较稳定 🟡"
|
||
elif stability >= 50:
|
||
stability_desc = "一般 🟠"
|
||
else:
|
||
stability_desc = "不稳定 ❌"
|
||
|
||
print(f"📊 稳定性: {stability_desc}")
|
||
|
||
print("="*60)
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="RTSP 视频流帧率检测工具")
|
||
parser.add_argument("rtsp_url", help="RTSP 流地址")
|
||
parser.add_argument("-t", "--time", type=int, default=10,
|
||
help="检测时长(秒),默认10秒")
|
||
|
||
args = parser.parse_args()
|
||
|
||
print("🎥 RTSP 视频流帧率检测工具")
|
||
print("="*60)
|
||
|
||
detector = RTSPFPSDetector(args.rtsp_url, args.time)
|
||
results = detector.detect_fps()
|
||
detector.print_results(results)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
if len(sys.argv) < 2:
|
||
print("用法: python rtsp_fps_detector.py <rtsp_url> [检测时长]")
|
||
print()
|
||
print("示例:")
|
||
print(" python rtsp_fps_detector.py rtsp://192.168.1.100:554/stream1")
|
||
print(" python rtsp_fps_detector.py rtsp://admin:password@192.168.1.100:554/stream1 -t 15")
|
||
sys.exit(1)
|
||
|
||
main() |