Files
security-ai-edge/core/alarm_upload_worker.py
16337 5a0265de52 修复:P0+P1 生产稳定性和性能优化(6项)
P0 稳定性修复:
- 告警去重字典添加惰性清理机制,防止长时间运行内存溢出
- Redis 连接断开时显式 close() 后再置 None,防止文件描述符泄漏
- 截图消息 ACK 移至成功路径,失败消息留在 pending list 自动重试

P1 性能优化:
- GPU NMS 添加 torch.no_grad() + 显式释放临时张量,减少显存碎片
- 截图存储改为 Redis 原始 bytes,去掉 Base64 编解码开销(兼容旧格式)
- ROI 配置查询 N+1 改为 get_all_bindings() 单次 JOIN 查询

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 14:10:27 +08:00

529 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
告警上报 Worker
独立线程运行,从 Redis 队列消费告警:
1. BRPOP 取待上报告警
2. 上传截图到腾讯云 COS
3. HTTP POST 上报告警元数据到云端
4. 失败重试 / 死信队列
Redis Key 设计:
local:alarm:pending - 待上报告警队列
local:alarm:retry - 重试告警队列
local:alarm:dead - 死信告警队列
"""
import json
import logging
import threading
import time
from datetime import datetime, timezone, timedelta
# Fixed UTC+08:00 offset (Beijing time); used for the date segment of COS
# object keys and for dead-letter timestamps.
_BEIJING_TZ = timezone(timedelta(minutes=8 * 60))
from typing import Any, Dict, Optional
import redis
import requests
from config.settings import get_settings
from core.result_reporter import (
REDIS_KEY_ALARM_PENDING,
REDIS_KEY_ALARM_RETRY,
REDIS_KEY_ALARM_DEAD,
)
# Module-level logger named after this module. AlarmUploadWorker itself logs
# through a separately named "alarm_upload_worker" logger.
logger = logging.getLogger(name=__name__)
class AlarmUploadWorker:
    """Alarm upload worker.

    Runs in its own daemon thread and consumes alarms from the local Redis.
    The screenshot (if any) is uploaded to Tencent Cloud COS first, then the
    alarm metadata is POSTed to the cloud API. Failures are re-queued with
    exponential backoff. Alarms move to the dead-letter queue once
    ``alarm_upload.retry_max`` is exceeded.

    Resolve events (``_type == "resolve"``) are POSTed once, without retry.
    """

    def __init__(self):
        self._settings = get_settings()
        self._logger = logging.getLogger("alarm_upload_worker")
        # Text client (decode_responses=True) used for the JSON queues.
        self._redis: Optional[redis.Redis] = None
        # Binary client (decode_responses=False) used to read raw screenshot bytes.
        self._redis_binary: Optional[redis.Redis] = None
        # COS client, created lazily by _upload_snapshot_to_cos.
        self._cos_client = None
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._stats = {
            "processed": 0,
            "cos_uploaded": 0,
            "http_reported": 0,
            "retried": 0,
            "dead_lettered": 0,
            "errors": 0,
        }

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self):
        """Connect to the local Redis and start the worker thread.

        Does nothing if the thread is already running. If Redis cannot be
        reached, the error is logged, any half-created clients are closed,
        and the thread is not started. Cloud API reachability is only
        logged; it never blocks startup.
        """
        if self._thread and self._thread.is_alive():
            self._logger.warning("AlarmUploadWorker 已在运行")
            return

        redis_cfg = self._settings.local_redis
        try:
            self._redis = self._new_redis_client(decode_responses=True)
            self._redis.ping()
            self._logger.info(
                f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}"
            )
            # Screenshots are stored as raw bytes and must not be decoded.
            self._redis_binary = self._new_redis_client(decode_responses=False)
        except Exception as e:
            self._logger.error(f"Worker Redis 连接失败: {e}")
            # Release the sockets of any client created before the failure.
            self._safe_close(self._redis)
            self._safe_close(self._redis_binary)
            self._redis = None
            self._redis_binary = None
            return

        self._check_cloud_api()

        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="AlarmUploadWorker",
            daemon=True,
        )
        self._thread.start()
        self._logger.info("AlarmUploadWorker 已启动")

    def _new_redis_client(self, decode_responses: bool) -> "redis.Redis":
        """Create (but do not ping) a client for the configured local Redis."""
        redis_cfg = self._settings.local_redis
        return redis.Redis(
            host=redis_cfg.host,
            port=redis_cfg.port,
            db=redis_cfg.db,
            password=redis_cfg.password,
            decode_responses=decode_responses,
            socket_connect_timeout=5,
        )

    def _safe_close(self, client):
        """Close a Redis client if present, logging rather than raising errors."""
        if client is None:
            return
        try:
            client.close()
        except Exception as e:
            self._logger.debug(f"关闭 Redis 连接异常: {e}")

    def _check_cloud_api(self):
        """Probe ``{cloud_api_url}/health`` at startup.

        The result is only logged and never blocks startup.
        """
        upload_cfg = self._settings.alarm_upload
        base_url = upload_cfg.cloud_api_url.rstrip("/")
        health_url = f"{base_url}/health"
        report_url = f"{base_url}/api/ai/alert/edge/report"
        self._logger.info(f"云端 API 地址: {base_url}")
        self._logger.info(f"告警上报端点: {report_url}")
        try:
            resp = requests.get(health_url, timeout=5, proxies={"http": None, "https": None})
            if resp.status_code == 200:
                self._logger.info(f"云端健康检查通过: {health_url}")
            else:
                self._logger.warning(
                    f"云端健康检查异常: {health_url}, status={resp.status_code}"
                )
        except requests.ConnectionError:
            self._logger.warning(f"云端不可达: {health_url},请确认服务已启动")
        except Exception as e:
            self._logger.warning(f"云端健康检查失败: {e}")

    def stop(self):
        """Signal the worker to stop, wait up to 10 s for it, then close Redis."""
        if not self._thread or not self._thread.is_alive():
            return
        self._logger.info("正在停止 AlarmUploadWorker...")
        self._stop_event.set()
        # BRPOP uses a 2 s timeout and every back-off waits on the stop
        # event, so the loop notices the stop request promptly.
        self._thread.join(timeout=10)
        self._safe_close(self._redis)
        self._safe_close(self._redis_binary)
        self._logger.info("AlarmUploadWorker 已停止")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def _worker_loop(self):
        """Main loop: promote due retries, then block on the pending queue.

        Back-off delays use ``_stop_event.wait`` instead of ``time.sleep``,
        so ``stop()`` is not held up by them.
        """
        self._logger.info("Worker 主循环开始")
        while not self._stop_event.is_set():
            try:
                # A failed reconnect leaves _redis as None. Without this check
                # brpop() would raise AttributeError on every iteration and the
                # worker would never attempt to reconnect again.
                if self._redis is None:
                    self._reconnect_redis()
                    if self._redis is None:
                        self._stop_event.wait(5)
                        continue

                self._process_retry_queue()

                # Short BRPOP timeout so the stop flag is re-checked regularly.
                result = self._redis.brpop(REDIS_KEY_ALARM_PENDING, timeout=2)
                if result is None:
                    continue
                _, alarm_json = result
                self._process_alarm(alarm_json)
            except redis.ConnectionError as e:
                self._logger.error(f"Redis 连接断开: {e}")
                if self._stop_event.wait(5):
                    break
                self._reconnect_redis()
            except Exception as e:
                self._logger.error(f"Worker 主循环异常: {e}")
                self._stats["errors"] += 1
                self._stop_event.wait(1)
        self._logger.info("Worker 主循环退出")

    # ------------------------------------------------------------------
    # Alarm processing
    # ------------------------------------------------------------------

    def _process_alarm(self, alarm_json: str):
        """Process one queued alarm (or resolve event).

        Uploads the screenshot to COS (if any), then reports the metadata
        over HTTP. Any failure hands the alarm to ``_handle_retry``.

        Once a screenshot has been uploaded, its COS object key is kept in
        the retried payload as ``_cos_object_key``. Later attempts reuse it
        instead of uploading again. This matters because the temporary Redis
        screenshot key is deleted right after upload and would be gone on
        retry.
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError as e:
            self._logger.error(f"告警 JSON 解析失败: {e}")
            return
        if not isinstance(alarm_data, dict):
            self._logger.error(
                f"告警数据格式错误(非 JSON 对象): {type(alarm_data).__name__}"
            )
            return

        # Resolve events have their own handler.
        if alarm_data.get("_type") == "resolve":
            self._process_resolve(alarm_data)
            return

        alarm_id = alarm_data.get("alarm_id", "unknown")
        retry_count = alarm_data.get("_retry_count", 0)
        self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})")

        # Step 1: upload the screenshot to COS, unless an earlier attempt already did.
        object_key = alarm_data.get("_cos_object_key")
        if not object_key:
            ok, object_key = self._upload_alarm_snapshot(alarm_data, alarm_json, alarm_id)
            if not ok:
                return

        # Step 2: HTTP-report the alarm metadata (no binary screenshot data).
        report_data = {
            "alarm_id": alarm_data.get("alarm_id"),
            "alarm_type": alarm_data.get("alarm_type"),
            "device_id": alarm_data.get("device_id"),
            "scene_id": alarm_data.get("scene_id"),
            "event_time": alarm_data.get("event_time"),
            "alarm_level": alarm_data.get("alarm_level"),
            "snapshot_url": object_key or "",  # COS object_key
            "algorithm_code": alarm_data.get("algorithm_code"),
            "confidence_score": alarm_data.get("confidence_score"),
            "ext_data": alarm_data.get("ext_data", {}),
        }

        if self._report_alarm_http(report_data):
            self._stats["processed"] += 1
            self._logger.info(f"告警上报成功: {alarm_id}")
            return

        # HTTP failed. Remember the already-uploaded COS key so the retry
        # neither re-uploads nor loses the screenshot. The inline Base64
        # image is no longer needed once the key is known.
        if object_key:
            alarm_data["_cos_object_key"] = object_key
            alarm_data.pop("snapshot_b64", None)
            alarm_json = json.dumps(alarm_data, ensure_ascii=False)
        self._handle_retry(alarm_json, "HTTP 上报失败")

    def _upload_alarm_snapshot(self, alarm_data: dict, alarm_json: str, alarm_id):
        """Upload the alarm's screenshot to COS, if it carries one.

        Two payload formats are supported:
          * new: ``ext_data["_snapshot_key"]`` names a Redis key holding raw
            JPEG bytes (the key is deleted after a successful upload);
          * legacy: ``snapshot_b64`` holds the image Base64-encoded inline.

        Returns:
            ``(ok, object_key)``. ``ok`` is False when the alarm has already
            been handed to ``_handle_retry`` and processing must stop.
            Otherwise ``object_key`` is the COS key, or None when there is no
            screenshot (none attached, or the Redis key has expired).
        """
        ext_data = alarm_data.get("ext_data")
        snapshot_key = ext_data.get("_snapshot_key") if isinstance(ext_data, dict) else None
        snapshot_b64 = alarm_data.get("snapshot_b64")
        if not snapshot_key and not snapshot_b64:
            return True, None

        device_id = alarm_data.get("device_id", "unknown")
        try:
            if snapshot_key:
                image_bytes = (
                    self._redis_binary.get(snapshot_key) if self._redis_binary else None
                )
                if image_bytes is None:
                    self._logger.warning(f"截图 key 已过期: {snapshot_key}, 无截图继续上报")
                    return True, None
            else:
                import base64
                image_bytes = base64.b64decode(snapshot_b64)
            object_key = self._upload_snapshot_to_cos(image_bytes, alarm_id, device_id)
        except Exception as e:
            if snapshot_key:
                self._logger.error(f"截图获取/上传失败: {e}")
            else:
                self._logger.error(f"截图解码/上传失败: {e}")
            # Outside the try above, so a failing retry push is not retried twice.
            self._handle_retry(alarm_json, f"截图处理失败: {e}")
            return False, None

        if object_key is None:
            self._handle_retry(alarm_json, "COS 上传失败")
            return False, None

        if snapshot_key:
            # The temporary key is only removed once the upload has succeeded.
            try:
                if self._redis_binary:
                    self._redis_binary.delete(snapshot_key)
            except Exception as e:
                self._logger.debug(f"删除截图临时 key 失败: {snapshot_key}, {e}")
        return True, object_key

    def _process_resolve(self, resolve_data: dict):
        """POST an alarm-resolved event to the cloud.

        There is no retry: failures are only logged.
        """
        upload_cfg = self._settings.alarm_upload
        base_url = upload_cfg.cloud_api_url.rstrip("/")
        url = f"{base_url}/api/ai/alert/edge/resolve"

        headers = {"Content-Type": "application/json"}
        if upload_cfg.edge_token:
            headers["Authorization"] = f"Bearer {upload_cfg.edge_token}"

        payload = {
            "alarm_id": resolve_data.get("alarm_id"),
            "duration_ms": resolve_data.get("duration_ms"),
            "last_frame_time": resolve_data.get("last_frame_time"),
            "resolve_type": resolve_data.get("resolve_type"),
        }

        try:
            response = requests.post(url, json=payload, headers=headers, timeout=10,
                                     proxies={"http": None, "https": None})
            if response.status_code == 200:
                body = response.json()
                if body.get("code") == 0:
                    self._logger.info(f"告警结束上报成功: {resolve_data.get('alarm_id')}")
                else:
                    self._logger.warning(f"告警结束上报业务错误: {body}")
            else:
                self._logger.warning(f"告警结束上报失败: status={response.status_code}")
        except Exception as e:
            self._logger.warning(f"告警结束上报异常: {e}")

    def _upload_snapshot_to_cos(
        self, image_bytes: bytes, alarm_id: str, device_id: str
    ) -> Optional[str]:
        """Upload a screenshot to Tencent Cloud COS directly from memory.

        Args:
            image_bytes: JPEG image bytes.
            alarm_id: alarm ID, used as the object file name.
            device_id: device ID, used as a key prefix.

        Returns:
            The COS object key ``alarms/{device_id}/{yyyy-mm-dd}/{alarm_id}.jpg``
            (date in Beijing time), or None if COS is not configured, the
            client cannot be created, or the upload fails.
        """
        cos_cfg = self._settings.cos
        if not cos_cfg.secret_id or not cos_cfg.bucket:
            self._logger.error("COS 未配置(缺少 secret_id 或 bucket无法上传截图")
            return None

        # Lazily create the COS client on first use.
        if self._cos_client is None:
            try:
                from qcloud_cos import CosConfig, CosS3Client
                config = CosConfig(
                    Region=cos_cfg.region,
                    SecretId=cos_cfg.secret_id,
                    SecretKey=cos_cfg.secret_key,
                    Scheme="https",
                )
                self._cos_client = CosS3Client(config)
                self._logger.info(f"COS 客户端初始化成功: bucket={cos_cfg.bucket}")
            except Exception as e:
                self._logger.error(f"COS 客户端初始化失败: {e}")
                return None

        date_str = datetime.now(_BEIJING_TZ).strftime("%Y-%m-%d")
        object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg"

        try:
            self._cos_client.put_object(
                Bucket=cos_cfg.bucket,
                Body=image_bytes,
                Key=object_key,
                ContentType="image/jpeg",
            )
            self._stats["cos_uploaded"] += 1
            self._logger.info(f"COS 上传成功: {object_key}")
            return object_key
        except Exception as e:
            self._logger.error(f"COS 上传失败: {e}")
            return None

    def _report_alarm_http(self, alarm_data: Dict[str, Any]) -> bool:
        """POST alarm metadata to the cloud.

        Args:
            alarm_data: alarm metadata. Top-level keys starting with ``_``
                are internal control fields and are not sent.

        Returns:
            True only for HTTP 200 with a JSON body whose ``code`` is 0.
        """
        upload_cfg = self._settings.alarm_upload
        base_url = upload_cfg.cloud_api_url.rstrip("/")
        url = f"{base_url}/api/ai/alert/edge/report"

        headers = {
            "Content-Type": "application/json",
        }
        if upload_cfg.edge_token:
            headers["Authorization"] = f"Bearer {upload_cfg.edge_token}"

        report_data = {k: v for k, v in alarm_data.items() if not k.startswith("_")}

        self._logger.debug(f"HTTP 上报 URL: {url}")
        self._logger.debug(f"HTTP 上报数据: {report_data}")

        try:
            response = requests.post(
                url,
                json=report_data,
                headers=headers,
                timeout=10,
                proxies={"http": None, "https": None},
            )

            if response.status_code == 200:
                body = response.json()
                if body.get("code") == 0:
                    self._stats["http_reported"] += 1
                    return True
                self._logger.warning(
                    f"云端返回业务错误: code={body.get('code')}, msg={body.get('msg')}"
                )
                return False

            # Include a truncated response body to ease troubleshooting.
            resp_text = ""
            try:
                resp_text = response.text[:500]
            except Exception:
                pass
            self._logger.warning(
                f"HTTP 上报失败: url={url}, status={response.status_code}, "
                f"body={resp_text}"
            )
            return False

        except requests.Timeout:
            self._logger.warning(f"HTTP 上报超时: {url}")
            return False
        except requests.ConnectionError as e:
            self._logger.warning(f"HTTP 上报连接失败: {url}, error={e}")
            return False
        except Exception as e:
            self._logger.error(f"HTTP 上报异常: {url}, error={e}")
            return False

    # ------------------------------------------------------------------
    # Retry / dead-letter handling
    # ------------------------------------------------------------------

    def _handle_retry(self, alarm_json: str, error: str):
        """Schedule a retry with exponential backoff, or dead-letter the alarm.

        The delay is ``retry_interval * 2**(retry_count - 1)`` seconds. Once
        ``retry_count`` exceeds ``retry_max``, the alarm is pushed to the
        dead-letter queue with ``_dead_reason`` and ``_dead_at`` set.
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError:
            self._logger.error(f"告警 JSON 无法解析,放弃重试: reason={error}")
            return
        if not isinstance(alarm_data, dict):
            self._logger.error(f"告警数据格式错误,放弃重试: reason={error}")
            return

        retry_count = alarm_data.get("_retry_count", 0) + 1
        max_retry = self._settings.alarm_upload.retry_max

        if retry_count > max_retry:
            alarm_data["_dead_reason"] = error
            alarm_data["_dead_at"] = datetime.now(_BEIJING_TZ).isoformat()
            dead_json = json.dumps(alarm_data, ensure_ascii=False)
            self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json)
            self._stats["dead_lettered"] += 1
            self._logger.warning(
                f"告警进入死信队列: {alarm_data.get('alarm_id')}, "
                f"reason={error}, retries={retry_count - 1}"
            )
            return

        base_interval = self._settings.alarm_upload.retry_interval
        delay = base_interval * (2 ** (retry_count - 1))
        alarm_data["_retry_count"] = retry_count
        # Absolute epoch seconds at which the alarm becomes due again.
        alarm_data["_retry_at"] = (
            datetime.now(timezone.utc).timestamp() + delay
        )

        retry_json = json.dumps(alarm_data, ensure_ascii=False)
        self._redis.lpush(REDIS_KEY_ALARM_RETRY, retry_json)
        self._stats["retried"] += 1
        self._logger.info(
            f"告警将重试: {alarm_data.get('alarm_id')}, "
            f"retry={retry_count}/{max_retry}, delay={delay}s, reason={error}"
        )

    def _process_retry_queue(self):
        """Move retry-queue entries whose ``_retry_at`` has passed to pending.

        Visits each entry currently in the retry queue once. It RPOPs the
        oldest entry, then LPUSHes it either onto the pending queue (due) or
        back onto the retry queue (not yet due), which keeps FIFO order.
        Entries that are not valid JSON are dropped and logged.
        """
        if self._redis is None:
            return

        try:
            queue_len = self._redis.llen(REDIS_KEY_ALARM_RETRY)
            if queue_len == 0:
                return

            now = datetime.now(timezone.utc).timestamp()
            for _ in range(queue_len):
                item = self._redis.rpop(REDIS_KEY_ALARM_RETRY)
                if item is None:
                    break
                try:
                    data = json.loads(item)
                except json.JSONDecodeError:
                    self._logger.error(f"重试队列中存在无法解析的告警,已丢弃: {item[:200]}")
                    continue

                # Non-dict entries count as due; _process_alarm rejects them with a log.
                retry_at = (data.get("_retry_at") or 0) if isinstance(data, dict) else 0
                if now >= retry_at:
                    self._redis.lpush(REDIS_KEY_ALARM_PENDING, item)
                    self._logger.debug(
                        f"重试告警回归 pending: "
                        f"{data.get('alarm_id') if isinstance(data, dict) else None}"
                    )
                else:
                    self._redis.lpush(REDIS_KEY_ALARM_RETRY, item)
        except Exception as e:
            self._logger.error(f"处理重试队列异常: {e}")

    def _reconnect_redis(self):
        """Replace the text Redis client with a freshly pinged one.

        The previous client is closed first so its sockets are released. On
        failure the new client is closed too and ``self._redis`` is left as
        None; the worker loop retries on its next iteration.
        """
        old_client = self._redis
        self._redis = None
        self._safe_close(old_client)

        client = None
        try:
            client = self._new_redis_client(decode_responses=True)
            client.ping()
        except Exception as e:
            self._logger.error(f"Redis 重连失败: {e}")
            self._safe_close(client)
            return
        self._redis = client
        self._logger.info("Redis 重连成功")

    def get_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of the worker counters plus a ``running`` flag."""
        stats = self._stats.copy()
        stats["running"] = self._thread is not None and self._thread.is_alive()
        return stats