Files
security-ai-edge/utils/logger.py
16337 b2469c576c fix: 移除 config_sync 模块的日志抑制,恢复配置同步日志输出
core.config_sync 被错误地加入 _QUIET_LOGGERS 列表,导致所有 INFO 级别
日志被抑制,无法看到 Redis 连接、Stream 监听、配置同步等关键运行日志。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 17:05:49 +08:00

411 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
分级日志系统
提供多级别日志支持,包含文件输出、控制台输出、性能指标记录
"""
import functools
import json
import logging
import logging.handlers
import os
import sys
import threading
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Union

from config.settings import get_settings
class PerformanceLogger:
    """Thread-safe in-memory store of named performance samples.

    Each sample is a dict with keys ``value``, ``timestamp`` (``time.time()``),
    ``datetime`` (local ISO-8601 string from ``datetime.now()``) and ``tags``.
    """

    def __init__(self, max_samples: Optional[int] = None):
        """Create an empty store.

        Args:
            max_samples: If given, keep at most this many of the most recent
                samples per metric; older samples are discarded. ``None``
                (the default) keeps every sample, which grows without bound
                in a long-running process.

        Raises:
            ValueError: If ``max_samples`` is not ``None`` and not positive.
        """
        if max_samples is not None and max_samples <= 0:
            raise ValueError("max_samples must be a positive integer or None")
        self._max_samples = max_samples
        # metric name -> deque of sample dicts (bounded when max_samples is set)
        self._metrics: Dict[str, deque] = {}
        self._lock = threading.Lock()

    def record(self, metric_name: str, value: float, tags: Optional[Dict[str, Any]] = None):
        """Append one sample for ``metric_name``.

        Args:
            metric_name: Metric identifier.
            value: Sample value.
            tags: Optional extra attributes; copied so later mutation by the
                caller does not alter the stored sample.
        """
        sample = {
            "value": value,
            "timestamp": time.time(),
            "datetime": datetime.now().isoformat(),
            "tags": dict(tags) if tags else {},
        }
        with self._lock:
            samples = self._metrics.get(metric_name)
            if samples is None:
                samples = deque(maxlen=self._max_samples)
                self._metrics[metric_name] = samples
            samples.append(sample)

    def get_metrics(self, metric_name: Optional[str] = None) -> Union[list, Dict[str, list]]:
        """Return a snapshot of recorded samples.

        Args:
            metric_name: When truthy, return only that metric's samples.

        Returns:
            A list of sample dicts for ``metric_name`` (empty if unknown), or,
            when no name is given, a dict mapping every metric name to its
            list of samples. The lists are copies, so callers may iterate or
            modify them without affecting (or racing with) the store.
        """
        with self._lock:
            if metric_name:
                return list(self._metrics.get(metric_name, ()))
            return {name: list(samples) for name, samples in self._metrics.items()}

    def get_last_value(self, metric_name: str) -> Optional[float]:
        """Return the most recent value of ``metric_name``, or ``None`` if it has no samples."""
        with self._lock:
            samples = self._metrics.get(metric_name)
            if samples:
                return samples[-1].get("value")
            return None

    def get_statistics(self, metric_name: str) -> Dict[str, float]:
        """Return count/min/max/avg/sum over the stored samples of ``metric_name``.

        Returns an empty dict when the metric has no samples.
        """
        with self._lock:
            values = [sample["value"] for sample in self._metrics.get(metric_name, ())]
        if not values:
            return {}
        total = sum(values)
        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": total / len(values),
            "sum": total,
        }

    def clear(self):
        """Discard all recorded metrics."""
        with self._lock:
            self._metrics.clear()
class StructuredLogger:
    """Structured logger for the edge inference service.

    On construction it configures the root logger (console handler, quieted
    non-alarm modules) and attaches rotating file handlers to the logger named
    ``name``. Keyword arguments passed to the level methods are serialized to
    JSON and attached to each record as the ``structured_data`` attribute.
    """

    # Non-alarm modules: their INFO/DEBUG output is suppressed so that only
    # WARNING and above is shown. Alarm-related modules (alarm_upload_worker,
    # result_reporter, main, ...) keep their original log level.
    _QUIET_LOGGERS = [
        # Video stream / frame handling
        "core.video_stream",
        "video_stream",
        "multi_stream",
        # Image preprocessing
        "core.preprocessor",
        "preprocessor",
        # Post-processing (NMS, coordinate mapping, alarm state machine)
        "core.postprocessor",
        "postprocessor",
        # TensorRT inference engine
        "core.tensorrt_engine",
        "tensorrt",
        # Database
        "config.database",
        # Algorithm manager (registration, config subscription)
        "algorithms",
        # Image storage
        "core.storage_manager",
        # Version control
        "utils.version_control",
    ]

    def __init__(self, name: str = "edge_inference"):
        """Create the logger and configure handlers from settings.

        Args:
            name: Name of the underlying ``logging`` logger; also the base name
                of the log files ``<name>.log`` and ``<name>_error.log``.
        """
        self.name = name
        self._logger = None
        self._performance_logger = PerformanceLogger()
        self._log_dir = "./logs"  # placeholder; overwritten from settings in _init_logger
        self._init_logger()

    def _init_logger(self):
        """Read settings and (re)configure root console output and this logger's files."""
        settings = get_settings()
        self._log_level = getattr(logging, settings.log_level.upper(), logging.INFO)
        self._log_dir = settings.log_dir
        self._max_size = settings.log_file_max_size
        self._backup_count = settings.log_file_backup_count
        os.makedirs(self._log_dir, exist_ok=True)

        formatter = logging.Formatter(
            fmt='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        # Configure the root logger so that every module's
        # logging.getLogger(name) produces console output.
        root_logger = logging.getLogger()
        root_logger.setLevel(self._log_level)
        # Only add a console handler if nobody configured the root logger yet,
        # to avoid duplicate console lines.
        if not root_logger.handlers:
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setLevel(self._log_level)
            console_handler.setFormatter(formatter)
            root_logger.addHandler(console_handler)

        # Suppress INFO/DEBUG from non-alarm modules; keep WARNING and above.
        # Alarm-related modules are not in the list and are unaffected.
        for logger_name in self._QUIET_LOGGERS:
            logging.getLogger(logger_name).setLevel(logging.WARNING)

        # Named logger for the main module: writes to its own log files.
        self._logger = logging.getLogger(self.name)
        self._logger.setLevel(self._log_level)
        # Close (not merely drop) any handlers left by a previous instance with
        # the same name, otherwise their open log files leak.
        self._close_handlers()
        self._logger.propagate = True  # console output goes through the root logger
        self._add_file_handler(formatter)

    def _close_handlers(self):
        """Detach and close every handler attached to the named logger."""
        for handler in list(self._logger.handlers):
            self._logger.removeHandler(handler)
            handler.close()

    def _add_file_handler(self, formatter: logging.Formatter):
        """Attach rotating handlers for ``<name>.log`` (all levels) and ``<name>_error.log`` (ERROR+).

        Best-effort: on failure the error is written to stderr and the logger
        keeps working through root-logger propagation.
        """
        log_file = os.path.join(self._log_dir, f"{self.name}.log")
        try:
            file_handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=self._max_size,
                backupCount=self._backup_count,
                encoding='utf-8'
            )
            file_handler.setLevel(self._log_level)
            file_handler.setFormatter(formatter)
            self._logger.addHandler(file_handler)

            error_file = os.path.join(self._log_dir, f"{self.name}_error.log")
            error_handler = logging.handlers.RotatingFileHandler(
                error_file,
                maxBytes=self._max_size,
                backupCount=self._backup_count,
                encoding='utf-8'
            )
            error_handler.setLevel(logging.ERROR)
            error_handler.setFormatter(formatter)
            self._logger.addHandler(error_handler)
        except Exception as e:
            sys.stderr.write(f"创建日志文件处理器失败: {e}\n")

    def _log(
        self,
        level: int,
        message: str,
        extra: Optional[Dict[str, Any]] = None,
        exc_info: bool = False
    ):
        """Emit ``message`` with ``extra`` serialized into ``record.structured_data``.

        Args:
            level: ``logging`` level number.
            message: Human-readable message.
            extra: Additional fields merged into the JSON payload.
            exc_info: Attach traceback information. ``True`` is ignored when no
                exception is currently being handled.
        """
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "logger": self.name,
        }
        if extra:
            log_data.update(extra)
        # default=str: a non-JSON-serializable field (numpy scalar, exception,
        # custom object) must not turn a logging call into a crash.
        extra_fields = {
            "structured_data": json.dumps(log_data, ensure_ascii=False, default=str)
        }
        # exc_info=True outside an except block would make the formatter
        # append a bogus "NoneType: None" traceback.
        if exc_info is True and sys.exc_info()[1] is None:
            exc_info = False
        self._logger.log(level, message, extra=extra_fields, exc_info=exc_info)

    def debug(self, message: str, **kwargs):
        """Log at DEBUG; keyword arguments become structured fields."""
        self._log(logging.DEBUG, message, kwargs)

    def info(self, message: str, **kwargs):
        """Log at INFO; keyword arguments become structured fields."""
        self._log(logging.INFO, message, kwargs)

    def warning(self, message: str, **kwargs):
        """Log at WARNING; keyword arguments become structured fields."""
        self._log(logging.WARNING, message, kwargs)

    def error(self, message: str, exc_info: bool = True, **kwargs):
        """Log at ERROR, attaching the active exception's traceback if there is one."""
        self._log(logging.ERROR, message, kwargs, exc_info=exc_info)

    def critical(self, message: str, exc_info: bool = True, **kwargs):
        """Log at CRITICAL, attaching the active exception's traceback if there is one."""
        self._log(logging.CRITICAL, message, kwargs, exc_info=exc_info)

    def performance(self, metric_name: str, value: float,
                    duration_ms: Optional[float] = None, **tags):
        """Store a sample in the in-memory metrics and emit it as an INFO line.

        Args:
            metric_name: Metric identifier.
            value: Sample value.
            duration_ms: Optional duration included in the log payload only.
            **tags: Extra attributes stored with the sample.
        """
        self._performance_logger.record(metric_name, value, tags)
        perf_data = {
            "metric": metric_name,
            "value": value,
            "duration_ms": duration_ms,
            "tags": tags
        }
        self.info(f"性能指标: {metric_name} = {value}", **perf_data)

    def log_inference_latency(self, latency_ms: float, batch_size: int = 1):
        """Record an inference latency sample under ``inference_latency_ms``."""
        self.performance(
            "inference_latency_ms",
            latency_ms,
            batch_size=batch_size,
            # NOTE(review): throughput ignores batch_size (1000 / latency);
            # confirm whether per-batch or per-frame throughput is intended.
            throughput_fps=1000.0 / latency_ms if latency_ms > 0 else 0
        )

    def log_frame_rate(self, fps: float, camera_id: str):
        """Record a frame-rate sample under ``frame_rate_fps`` tagged with ``camera_id``."""
        self.performance(
            "frame_rate_fps",
            fps,
            camera_id=camera_id
        )

    def log_resource_usage(
        self,
        cpu_percent: float,
        memory_mb: float,
        gpu_memory_mb: Optional[float] = None
    ):
        """Record resource usage.

        Only ``cpu_percent`` is stored as a metric value; memory figures are
        attached as tags of that sample.
        """
        self.performance(
            "cpu_percent",
            cpu_percent,
            memory_mb=memory_mb,
            gpu_memory_mb=gpu_memory_mb
        )

    def log_alert(self, alert_type: str, camera_id: str, roi_id: str,
                  confidence: Optional[float] = None):
        """Log an alert event at INFO with its identifying fields."""
        self.info(
            f"告警触发: {alert_type}",
            alert_type=alert_type,
            camera_id=camera_id,
            roi_id=roi_id,
            confidence=confidence
        )

    def log_connection_event(self, event_type: str,
                             connection_type: str,
                             target: str,
                             success: bool,
                             error_msg: Optional[str] = None):
        """Log a connection event at INFO with its outcome."""
        self.info(
            f"连接事件: {event_type} - {connection_type} -> {target}",
            event_type=event_type,
            connection_type=connection_type,
            target=target,
            success=success,
            error_msg=error_msg
        )

    def get_performance_metrics(self) -> Dict[str, Any]:
        """Return all samples plus per-metric statistics.

        A single snapshot is used so that ``metrics`` and ``statistics``
        cover the same set of metric names.
        """
        metrics = self._performance_logger.get_metrics()
        return {
            "metrics": metrics,
            "statistics": {
                name: self._performance_logger.get_statistics(name)
                for name in metrics
            }
        }

    def get_statistics(self, metric_name: str) -> Dict[str, float]:
        """Return statistics for one metric (empty dict if it has no samples)."""
        return self._performance_logger.get_statistics(metric_name)

    def export_metrics(self, output_path: str):
        """Write all metrics and statistics as JSON to ``output_path``.

        Failures are logged at ERROR rather than raised.
        """
        metrics = self.get_performance_metrics()
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                # default=str: tag values come from arbitrary kwargs.
                json.dump(metrics, f, ensure_ascii=False, indent=2, default=str)
            self.info(f"性能指标已导出: {output_path}")
        except Exception as e:
            self.error(f"导出性能指标失败: {e}")

    def flush(self):
        """Flush every handler attached to the named logger."""
        for handler in self._logger.handlers:
            if hasattr(handler, 'flush'):
                handler.flush()

    def close(self):
        """Flush, then detach and close the named logger's handlers."""
        self.flush()
        self._close_handlers()
class ContextLogger:
    """Logger facade that attaches a fixed context to every log call.

    The stored context is merged with each call's keyword arguments (per-call
    values win on key collisions) and forwarded to the wrapped
    StructuredLogger method of the same level.
    """

    def __init__(self, base_logger: StructuredLogger, context: Dict[str, Any]):
        self._base_logger = base_logger
        self._context = context  # held by reference, not copied

    def _add_context(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new dict: the stored context overlaid with ``kwargs``."""
        return {**self._context, **kwargs}

    def _forward(self, level_name: str, message: str, call_kwargs: Dict[str, Any]):
        """Call ``base_logger.<level_name>`` with the merged context."""
        emit = getattr(self._base_logger, level_name)
        emit(message, **self._add_context(call_kwargs))

    def debug(self, message: str, **kwargs):
        """Log at DEBUG with context."""
        self._forward("debug", message, kwargs)

    def info(self, message: str, **kwargs):
        """Log at INFO with context."""
        self._forward("info", message, kwargs)

    def warning(self, message: str, **kwargs):
        """Log at WARNING with context."""
        self._forward("warning", message, kwargs)

    def error(self, message: str, **kwargs):
        """Log at ERROR with context (``exc_info`` may be passed via kwargs)."""
        self._forward("error", message, kwargs)

    def critical(self, message: str, **kwargs):
        """Log at CRITICAL with context (``exc_info`` may be passed via kwargs)."""
        self._forward("critical", message, kwargs)

    def with_context(self, **additional_context) -> 'ContextLogger':
        """Return a new ContextLogger whose context extends this one's."""
        extended = self._add_context(additional_context)
        return ContextLogger(self._base_logger, extended)
_structured_logger_instance = None
# Guards lazy creation of the singleton. StructuredLogger construction
# reconfigures shared logging state (root level, named-logger handlers), so
# concurrent first calls must not both build an instance.
_structured_logger_lock = threading.Lock()


def get_logger(name: str = "edge_inference") -> StructuredLogger:
    """Return the process-wide StructuredLogger, creating it on first use.

    Args:
        name: Logger name used only when the singleton is first created;
            later calls return the existing instance regardless of ``name``.

    Returns:
        The shared StructuredLogger instance.
    """
    global _structured_logger_instance
    instance = _structured_logger_instance
    if instance is None:
        with _structured_logger_lock:
            # Re-check under the lock: another thread may have won the race.
            if _structured_logger_instance is None:
                _structured_logger_instance = StructuredLogger(name)
            instance = _structured_logger_instance
    return instance
def create_logger_with_context(base_logger: StructuredLogger,
                               context: Dict[str, Any]) -> ContextLogger:
    """Wrap ``base_logger`` so that every log call carries ``context``.

    Thin factory; equivalent to constructing ``ContextLogger`` directly.
    """
    wrapped = ContextLogger(base_logger=base_logger, context=context)
    return wrapped
def log_performance_operation(operation_name: str, logger_instance: "StructuredLogger"):
    """Decorator factory that times each call and logs the outcome.

    On success the elapsed time is recorded through
    ``logger_instance.log_inference_latency`` (batch_size=1), and a DEBUG line
    is written. On failure an ERROR line is written and the exception is
    re-raised unchanged.

    Args:
        operation_name: Label included in the log messages.
        logger_instance: Object providing ``log_inference_latency``, ``debug``
            and ``error`` (normally a StructuredLogger).

    Returns:
        A decorator whose wrapper preserves the wrapped function's metadata.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            # Only the wrapped call is guarded: a failure in the success-path
            # logging below must not be reported as an operation failure.
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                duration_ms = (time.perf_counter() - start_time) * 1000
                logger_instance.error(
                    f"操作失败: {operation_name}, 耗时: {duration_ms:.2f}ms, 错误: {e}"
                )
                raise
            duration_ms = (time.perf_counter() - start_time) * 1000
            # NOTE(review): every decorated operation is recorded under the
            # inference_latency_ms metric regardless of operation_name.
            logger_instance.log_inference_latency(
                duration_ms,
                batch_size=1
            )
            logger_instance.debug(
                f"操作完成: {operation_name}, 耗时: {duration_ms:.2f}ms"
            )
            return result
        return wrapper
    return decorator