# --- Scraped repository page header (preserved from the web UI export) ---
# File: Test_AI/benchmark/metrics_collector.py
# Snapshot: 2026-01-20 10:54:30 +08:00 — 245 lines, 7.7 KiB, Python
# NOTE: the hosting UI warned about "ambiguous Unicode characters"; this
# refers to the full-width CJK punctuation inside the Chinese log strings
# below — verify before "escaping" anything.
"""
指标采集模块
"""
import time
import threading
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from .utils import setup_logging, calculate_statistics
logger = setup_logging()
@dataclass
class GPUMetrics:
    """One point-in-time snapshot of GPU state.

    All fields default to 0.0 so an all-zero instance can stand in when
    NVML is unavailable (see GPUMonitor._sample).
    """
    gpu_utilization: float = 0.0   # GPU utilization percentage (NVML utilization rates)
    memory_used_mb: float = 0.0    # device memory in use, MiB (bytes / 1024^2)
    memory_total_mb: float = 0.0   # total device memory, MiB
    memory_utilization: float = 0.0  # used/total * 100, percent
    temperature: float = 0.0       # GPU temperature (NVML reports degrees Celsius)
    power_draw_w: float = 0.0      # power draw in watts (NVML milliwatts / 1000)
    sm_clock_mhz: float = 0.0      # NOTE(review): never populated by GPUMonitor._sample in this module
    memory_clock_mhz: float = 0.0  # NOTE(review): never populated by GPUMonitor._sample in this module
@dataclass
class PerformanceMetrics:
    """Accumulated inference performance counters for one measurement run."""
    total_frames: int = 0    # frames successfully processed (sum of batch sizes)
    total_batches: int = 0   # number of inference calls recorded
    dropped_frames: int = 0  # frames dropped instead of processed
    latencies_ms: List[float] = field(default_factory=list)  # per-batch latency samples, ms
    throughput_fps: float = 0.0  # NOTE(review): not updated in this module — MetricsCollector computes throughput locally

    def get_latency_stats(self) -> Dict[str, float]:
        """Return summary statistics over latencies_ms (delegates to utils.calculate_statistics)."""
        return calculate_statistics(self.latencies_ms)
class GPUMonitor:
    """Background sampler of NVIDIA GPU metrics via NVML (pynvml).

    A daemon thread polls utilization / memory / temperature / power every
    ``sample_interval_ms`` and appends snapshots to an internal list.
    Degrades gracefully: when pynvml is missing or NVML init fails, samples
    are all-zero ``GPUMetrics`` and no exception propagates.
    """

    def __init__(self, device_id: int = 0, sample_interval_ms: int = 100):
        """
        Args:
            device_id: NVML device index to monitor.
            sample_interval_ms: polling period of the sampler thread.
        """
        self.device_id = device_id
        self.sample_interval_ms = sample_interval_ms
        self._running = False
        # Event lets stop() interrupt the sampler mid-sleep instead of
        # waiting out up to a full sample interval (fixes sluggish stop()).
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._samples: List[GPUMetrics] = []
        self._lock = threading.Lock()  # guards _samples
        self._init_nvml()

    def _init_nvml(self):
        """Initialise NVML and cache the device handle; never raises."""
        self._nvml_available = False
        try:
            import pynvml
            pynvml.nvmlInit()
            self._nvml = pynvml
            self._handle = pynvml.nvmlDeviceGetHandleByIndex(self.device_id)
            name = pynvml.nvmlDeviceGetName(self._handle)
            # Older pynvml versions return bytes, newer return str.
            if isinstance(name, bytes):
                name = name.decode("utf-8")
            memory_info = pynvml.nvmlDeviceGetMemoryInfo(self._handle)
            total_memory_mb = memory_info.total / (1024 * 1024)
            logger.info(f"GPU 监控初始化: {name}, 显存: {total_memory_mb:.0f} MB")
            self._nvml_available = True
            self._total_memory_mb = total_memory_mb
        except ImportError:
            logger.warning("pynvml 未安装，GPU 监控不可用")
        except Exception as e:
            logger.warning(f"NVML 初始化失败: {e}")

    def _sample(self) -> GPUMetrics:
        """Take one snapshot; returns an all-zero GPUMetrics on any failure."""
        if not self._nvml_available:
            return GPUMetrics()
        try:
            util = self._nvml.nvmlDeviceGetUtilizationRates(self._handle)
            gpu_util = util.gpu
            mem_info = self._nvml.nvmlDeviceGetMemoryInfo(self._handle)
            mem_used_mb = mem_info.used / (1024 * 1024)
            mem_total_mb = mem_info.total / (1024 * 1024)
            mem_util = (mem_info.used / mem_info.total) * 100
            # Temperature / power queries can fail on some devices; fall back
            # to 0.0, but catch Exception — a bare except would also swallow
            # KeyboardInterrupt/SystemExit.
            try:
                temp = self._nvml.nvmlDeviceGetTemperature(
                    self._handle, self._nvml.NVML_TEMPERATURE_GPU
                )
            except Exception:
                temp = 0.0
            try:
                power = self._nvml.nvmlDeviceGetPowerUsage(self._handle) / 1000  # mW -> W
            except Exception:
                power = 0.0
            return GPUMetrics(
                gpu_utilization=gpu_util,
                memory_used_mb=mem_used_mb,
                memory_total_mb=mem_total_mb,
                memory_utilization=mem_util,
                temperature=temp,
                power_draw_w=power,
            )
        except Exception as e:
            logger.warning(f"GPU 指标采集失败: {e}")
            return GPUMetrics()

    def _monitor_loop(self):
        """Thread body: sample, store, wait — until stop() signals the event."""
        interval_sec = self.sample_interval_ms / 1000
        while not self._stop_event.is_set():
            metrics = self._sample()
            with self._lock:
                self._samples.append(metrics)
            # Event.wait() instead of time.sleep() so stop() returns promptly.
            self._stop_event.wait(interval_sec)

    def start(self):
        """Start the sampler thread; no-op if already running."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        with self._lock:  # a straggler from a previous run may still append
            self._samples.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        logger.info("GPU 监控已启动")

    def stop(self):
        """Signal the sampler to stop and join it (bounded 2 s wait)."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2.0)
        logger.info("GPU 监控已停止")

    def get_samples(self) -> List[GPUMetrics]:
        """Return a shallow copy of all samples collected so far."""
        with self._lock:
            return list(self._samples)

    def get_statistics(self) -> Dict[str, Any]:
        """Aggregate collected samples per metric; {} when nothing sampled."""
        samples = self.get_samples()
        if not samples:
            return {}
        gpu_utils = [s.gpu_utilization for s in samples]
        mem_utils = [s.memory_utilization for s in samples]
        mem_used = [s.memory_used_mb for s in samples]
        return {
            "gpu_utilization": calculate_statistics(gpu_utils),
            "memory_utilization": calculate_statistics(mem_utils),
            "memory_used_mb": calculate_statistics(mem_used),
            "sample_count": len(samples),
        }

    def clear(self):
        """Discard all collected samples."""
        with self._lock:
            self._samples.clear()
class MetricsCollector:
    """Combines GPU sampling (GPUMonitor) with inference performance counters.

    Thread-safe: recording methods may be called concurrently with the
    reporting methods; all shared state is accessed under one lock.
    """

    def __init__(self, device_id: int = 0, sample_interval_ms: int = 100):
        """
        Args:
            device_id: NVML device index passed to GPUMonitor.
            sample_interval_ms: GPU polling period passed to GPUMonitor.
        """
        self.device_id = device_id
        self.sample_interval_ms = sample_interval_ms
        self.gpu_monitor = GPUMonitor(device_id, sample_interval_ms)
        self._perf_metrics = PerformanceMetrics()
        self._lock = threading.Lock()  # guards _perf_metrics and the timestamps
        self._start_time: Optional[float] = None
        self._end_time: Optional[float] = None

    def start(self):
        """Reset counters, record the start timestamp, launch GPU sampling."""
        with self._lock:
            self._start_time = time.time()
            # BUGFIX: clear any _end_time left by a previous stop(); otherwise
            # a restarted run computes duration against the stale end time.
            self._end_time = None
            self._perf_metrics = PerformanceMetrics()
        self.gpu_monitor.start()

    def stop(self):
        """Record the end timestamp and stop GPU sampling."""
        with self._lock:
            self._end_time = time.time()
        self.gpu_monitor.stop()

    def record_inference(self, latency_ms: float, batch_size: int):
        """Record one inference call: its latency and the frames it covered."""
        with self._lock:
            self._perf_metrics.latencies_ms.append(latency_ms)
            self._perf_metrics.total_batches += 1
            self._perf_metrics.total_frames += batch_size

    def record_frame_drop(self, count: int = 1):
        """Record ``count`` dropped (unprocessed) frames."""
        with self._lock:
            self._perf_metrics.dropped_frames += count

    def get_gpu_metrics(self) -> Dict[str, Any]:
        """Aggregated GPU statistics (delegates to GPUMonitor)."""
        return self.gpu_monitor.get_statistics()

    def get_throughput_metrics(self) -> Dict[str, Any]:
        """Compute throughput, frame-drop rate and latency statistics.

        BUGFIX: all shared state (counters, latency list, timestamps) is
        snapshotted under the lock so a concurrent record_inference() cannot
        produce torn reads; the original read them after releasing the lock.
        """
        with self._lock:
            total_frames = self._perf_metrics.total_frames
            total_batches = self._perf_metrics.total_batches
            dropped_frames = self._perf_metrics.dropped_frames
            latencies = list(self._perf_metrics.latencies_ms)
            start_time = self._start_time
            end_time = self._end_time
        if start_time and end_time:
            duration = end_time - start_time
        elif start_time:
            # Still running: measure against "now".
            duration = time.time() - start_time
        else:
            duration = 0
        throughput_fps = total_frames / duration if duration > 0 else 0
        total_expected = total_frames + dropped_frames
        drop_rate = dropped_frames / total_expected * 100 if total_expected > 0 else 0
        latency_stats = calculate_statistics(latencies)
        return {
            "total_frames": total_frames,
            "total_batches": total_batches,
            "dropped_frames": dropped_frames,
            "duration_sec": duration,
            "throughput_fps": throughput_fps,
            "frame_drop_rate": drop_rate,
            "latency": latency_stats,
        }

    def get_all_metrics(self) -> Dict[str, Any]:
        """Combined GPU and throughput report."""
        return {
            "gpu": self.get_gpu_metrics(),
            "throughput": self.get_throughput_metrics(),
        }

    def reset(self):
        """Clear all counters, GPU samples and timestamps."""
        with self._lock:
            self._perf_metrics = PerformanceMetrics()
            self._start_time = None
            self._end_time = None
        self.gpu_monitor.clear()