"""
|
||
指标采集模块
|
||
"""
|
||
|
||
import time
|
||
import threading
|
||
from typing import Dict, Any, List, Optional
|
||
from dataclasses import dataclass, field
|
||
|
||
from .utils import setup_logging, calculate_statistics
|
||
|
||
logger = setup_logging()
|
||
|
||
|
||
@dataclass
class GPUMetrics:
    """One snapshot of GPU utilization/health readings (units in field names)."""
    gpu_utilization: float = 0.0      # GPU (SM) utilization, percent 0-100
    memory_used_mb: float = 0.0       # device memory in use, MiB
    memory_total_mb: float = 0.0      # total device memory, MiB
    memory_utilization: float = 0.0   # used / total, percent 0-100
    temperature: float = 0.0          # GPU core temperature (NVML reports Celsius)
    power_draw_w: float = 0.0         # instantaneous power draw, watts
    sm_clock_mhz: float = 0.0         # SM clock, MHz — not filled in by GPUMonitor._sample
    memory_clock_mhz: float = 0.0     # memory clock, MHz — not filled in by GPUMonitor._sample
@dataclass
class PerformanceMetrics:
    """Accumulated inference counters for a single measurement run."""
    total_frames: int = 0     # frames processed (sum of recorded batch sizes)
    total_batches: int = 0    # number of inference calls recorded
    dropped_frames: int = 0   # frames reported dropped by the caller
    latencies_ms: List[float] = field(default_factory=list)  # per-batch latency, ms
    # NOTE(review): never assigned anywhere in this module; callers compute
    # throughput from MetricsCollector.get_throughput_metrics() instead.
    throughput_fps: float = 0.0

    def get_latency_stats(self) -> Dict[str, float]:
        """Return summary statistics (see utils.calculate_statistics) of latencies."""
        return calculate_statistics(self.latencies_ms)
class GPUMonitor:
|
||
"""GPU 监控器"""
|
||
|
||
def __init__(self, device_id: int = 0, sample_interval_ms: int = 100):
|
||
self.device_id = device_id
|
||
self.sample_interval_ms = sample_interval_ms
|
||
|
||
self._running = False
|
||
self._thread: Optional[threading.Thread] = None
|
||
self._samples: List[GPUMetrics] = []
|
||
self._lock = threading.Lock()
|
||
|
||
self._init_nvml()
|
||
|
||
def _init_nvml(self):
|
||
self._nvml_available = False
|
||
|
||
try:
|
||
import pynvml
|
||
pynvml.nvmlInit()
|
||
self._nvml = pynvml
|
||
self._handle = pynvml.nvmlDeviceGetHandleByIndex(self.device_id)
|
||
|
||
name = pynvml.nvmlDeviceGetName(self._handle)
|
||
if isinstance(name, bytes):
|
||
name = name.decode("utf-8")
|
||
|
||
memory_info = pynvml.nvmlDeviceGetMemoryInfo(self._handle)
|
||
total_memory_mb = memory_info.total / (1024 * 1024)
|
||
|
||
logger.info(f"GPU 监控初始化: {name}, 显存: {total_memory_mb:.0f} MB")
|
||
self._nvml_available = True
|
||
self._total_memory_mb = total_memory_mb
|
||
|
||
except ImportError:
|
||
logger.warning("pynvml 未安装,GPU 监控不可用")
|
||
except Exception as e:
|
||
logger.warning(f"NVML 初始化失败: {e}")
|
||
|
||
def _sample(self) -> GPUMetrics:
|
||
if not self._nvml_available:
|
||
return GPUMetrics()
|
||
|
||
try:
|
||
util = self._nvml.nvmlDeviceGetUtilizationRates(self._handle)
|
||
gpu_util = util.gpu
|
||
|
||
mem_info = self._nvml.nvmlDeviceGetMemoryInfo(self._handle)
|
||
mem_used_mb = mem_info.used / (1024 * 1024)
|
||
mem_total_mb = mem_info.total / (1024 * 1024)
|
||
mem_util = (mem_info.used / mem_info.total) * 100
|
||
|
||
try:
|
||
temp = self._nvml.nvmlDeviceGetTemperature(self._handle, self._nvml.NVML_TEMPERATURE_GPU)
|
||
except:
|
||
temp = 0.0
|
||
|
||
try:
|
||
power = self._nvml.nvmlDeviceGetPowerUsage(self._handle) / 1000
|
||
except:
|
||
power = 0.0
|
||
|
||
return GPUMetrics(
|
||
gpu_utilization=gpu_util,
|
||
memory_used_mb=mem_used_mb,
|
||
memory_total_mb=mem_total_mb,
|
||
memory_utilization=mem_util,
|
||
temperature=temp,
|
||
power_draw_w=power,
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.warning(f"GPU 指标采集失败: {e}")
|
||
return GPUMetrics()
|
||
|
||
def _monitor_loop(self):
|
||
interval_sec = self.sample_interval_ms / 1000
|
||
|
||
while self._running:
|
||
metrics = self._sample()
|
||
|
||
with self._lock:
|
||
self._samples.append(metrics)
|
||
|
||
time.sleep(interval_sec)
|
||
|
||
def start(self):
|
||
if self._running:
|
||
return
|
||
|
||
self._running = True
|
||
self._samples.clear()
|
||
self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
|
||
self._thread.start()
|
||
logger.info("GPU 监控已启动")
|
||
|
||
def stop(self):
|
||
self._running = False
|
||
if self._thread:
|
||
self._thread.join(timeout=2.0)
|
||
logger.info("GPU 监控已停止")
|
||
|
||
def get_samples(self) -> List[GPUMetrics]:
|
||
with self._lock:
|
||
return list(self._samples)
|
||
|
||
def get_statistics(self) -> Dict[str, Any]:
|
||
samples = self.get_samples()
|
||
if not samples:
|
||
return {}
|
||
|
||
gpu_utils = [s.gpu_utilization for s in samples]
|
||
mem_utils = [s.memory_utilization for s in samples]
|
||
mem_used = [s.memory_used_mb for s in samples]
|
||
|
||
return {
|
||
"gpu_utilization": calculate_statistics(gpu_utils),
|
||
"memory_utilization": calculate_statistics(mem_utils),
|
||
"memory_used_mb": calculate_statistics(mem_used),
|
||
"sample_count": len(samples),
|
||
}
|
||
|
||
def clear(self):
|
||
with self._lock:
|
||
self._samples.clear()
|
||
|
||
|
||
class MetricsCollector:
    """Collects performance metrics for an inference run.

    Combines background GPU sampling (``GPUMonitor``) with counters and
    latencies recorded by the caller via ``record_inference`` /
    ``record_frame_drop``.  Counter updates are lock-protected so they may be
    recorded from multiple threads.
    """

    def __init__(self, device_id: int = 0, sample_interval_ms: int = 100):
        """
        Args:
            device_id: GPU index, passed through to ``GPUMonitor``.
            sample_interval_ms: GPU polling period in milliseconds.
        """
        self.device_id = device_id
        self.sample_interval_ms = sample_interval_ms

        self.gpu_monitor = GPUMonitor(device_id, sample_interval_ms)

        self._perf_metrics = PerformanceMetrics()
        self._lock = threading.Lock()  # guards _perf_metrics

        self._start_time: Optional[float] = None
        self._end_time: Optional[float] = None

    def start(self):
        """Begin a measurement run: reset state and start GPU sampling."""
        self._start_time = time.time()
        # Bug fix: _end_time was not reset, so a second start() after stop()
        # computed duration from the previous run's stale end time.
        self._end_time = None
        with self._lock:
            self._perf_metrics = PerformanceMetrics()
        self.gpu_monitor.start()

    def stop(self):
        """End the measurement run and stop GPU sampling."""
        self._end_time = time.time()
        self.gpu_monitor.stop()

    def record_inference(self, latency_ms: float, batch_size: int):
        """Record one inference call: its latency and how many frames it held."""
        with self._lock:
            self._perf_metrics.latencies_ms.append(latency_ms)
            self._perf_metrics.total_batches += 1
            self._perf_metrics.total_frames += batch_size

    def record_frame_drop(self, count: int = 1):
        """Record ``count`` dropped frames."""
        with self._lock:
            self._perf_metrics.dropped_frames += count

    def get_gpu_metrics(self) -> Dict[str, Any]:
        """GPU-side statistics aggregated from the monitor's samples."""
        return self.gpu_monitor.get_statistics()

    def get_throughput_metrics(self) -> Dict[str, Any]:
        """Throughput/latency summary for the current (or finished) run."""
        # Snapshot all counters and copy the latency list under the lock so a
        # concurrent record_inference() cannot mutate it mid-summary (the
        # original read the live list outside the lock).
        with self._lock:
            total_frames = self._perf_metrics.total_frames
            total_batches = self._perf_metrics.total_batches
            dropped_frames = self._perf_metrics.dropped_frames
            latencies = list(self._perf_metrics.latencies_ms)

        if self._start_time and self._end_time:
            duration = self._end_time - self._start_time
        elif self._start_time:
            # Run still in progress: measure up to "now".
            duration = time.time() - self._start_time
        else:
            duration = 0

        throughput_fps = total_frames / duration if duration > 0 else 0

        total_expected = total_frames + dropped_frames
        drop_rate = dropped_frames / total_expected * 100 if total_expected > 0 else 0

        return {
            "total_frames": total_frames,
            "total_batches": total_batches,
            "dropped_frames": dropped_frames,
            "duration_sec": duration,
            "throughput_fps": throughput_fps,
            "frame_drop_rate": drop_rate,
            "latency": calculate_statistics(latencies),
        }

    def get_all_metrics(self) -> Dict[str, Any]:
        """Everything at once: {"gpu": ..., "throughput": ...}."""
        return {
            "gpu": self.get_gpu_metrics(),
            "throughput": self.get_throughput_metrics(),
        }

    def reset(self):
        """Clear all recorded data and timing state."""
        with self._lock:
            self._perf_metrics = PerformanceMetrics()
        self.gpu_monitor.clear()
        self._start_time = None
        self._end_time = None