Files
security-ai-edge/core/alarm_upload_worker.py

486 lines
17 KiB
Python
Raw Normal View History

"""
告警上报 Worker
独立线程运行 Redis 队列消费告警
1. BRPOP 取待上报告警
2. 上传截图到腾讯云 COS
3. HTTP POST 上报告警元数据到云端
4. 失败重试 / 死信队列
Redis Key 设计
local:alarm:pending - 待上报告警队列
local:alarm:retry - 重试告警队列
local:alarm:dead - 死信告警队列
"""
import base64
import json
import logging
import threading
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import redis
import requests

from config.settings import get_settings
from core.result_reporter import (
    REDIS_KEY_ALARM_PENDING,
    REDIS_KEY_ALARM_RETRY,
    REDIS_KEY_ALARM_DEAD,
)
# Module-level logger. NOTE(review): nothing in this part of the file logs
# through it -- AlarmUploadWorker creates its own "alarm_upload_worker" logger.
logger = logging.getLogger(__name__)
class AlarmUploadWorker:
    """Alarm upload worker.

    Runs in a dedicated daemon thread and consumes alarm records from the
    local Redis queues:

    1. ``BRPOP`` the next record from ``REDIS_KEY_ALARM_PENDING``;
    2. upload its base64 snapshot (``snapshot_b64``) to Tencent Cloud COS;
    3. ``POST`` the alarm metadata to the cloud report endpoint;
    4. on failure, schedule an exponential-backoff retry on
       ``REDIS_KEY_ALARM_RETRY``, or -- after ``alarm_upload.retry_max``
       attempts -- move the record to ``REDIS_KEY_ALARM_DEAD``.

    Records whose ``_type`` is ``"resolve"`` are alarm-end events; they are
    posted to the resolve endpoint and follow the same retry policy.
    Keys starting with ``_`` are internal control fields and are never sent
    to the cloud.
    """

    # Cloud API endpoints, relative to ``alarm_upload.cloud_api_url``.
    _HEALTH_PATH = "/health"
    _REPORT_PATH = "/api/ai/alert/edge/report"
    _RESOLVE_PATH = "/api/ai/alert/edge/resolve"

    # BRPOP timeout (s); keeps the loop responsive to the stop signal.
    _BRPOP_TIMEOUT = 2
    # Delay (s) before each Redis reconnect attempt.
    _RECONNECT_DELAY = 5
    # Back-off (s) after an unexpected error in the main loop.
    _ERROR_BACKOFF = 1
    # Minimum interval (s) between two scans of the retry queue.
    _RETRY_SCAN_INTERVAL = 1.0
    # Timeout (s) for report/resolve HTTP requests.
    _HTTP_TIMEOUT = 10

    def __init__(self):
        self._settings = get_settings()
        self._logger = logging.getLogger("alarm_upload_worker")
        self._redis: Optional[redis.Redis] = None
        self._cos_client = None  # COS client, created lazily on first upload
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # time.monotonic() of the last retry-queue scan; -inf makes the very
        # first loop iteration scan immediately.
        self._last_retry_scan = float("-inf")
        self._stats = {
            "processed": 0,
            "cos_uploaded": 0,
            "http_reported": 0,
            "retried": 0,
            "dead_lettered": 0,
            "errors": 0,
        }

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self):
        """Connect to Redis and start the worker thread.

        Logs a warning and returns if the thread is already running. If Redis
        cannot be reached, the error is logged and no thread is started. A
        cloud health check runs first, but its result is only logged.
        """
        if self._thread and self._thread.is_alive():
            self._logger.warning("AlarmUploadWorker 已在运行")
            return

        redis_cfg = self._settings.local_redis
        try:
            self._redis = self._create_redis_client()
        except Exception as e:
            self._logger.error(f"Worker Redis 连接失败: {e}")
            self._redis = None
            return
        self._logger.info(
            f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}"
        )

        # Verify at start-up that the cloud API is reachable (log only).
        self._check_cloud_api()

        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="AlarmUploadWorker",
            daemon=True,
        )
        self._thread.start()
        self._logger.info("AlarmUploadWorker 已启动")

    def stop(self):
        """Signal the worker to stop, wait up to 10 s, then close Redis.

        No-op when the thread was never started or has already exited.
        """
        if not self._thread or not self._thread.is_alive():
            return
        self._logger.info("正在停止 AlarmUploadWorker...")
        self._stop_event.set()
        # Every wait inside the loop is bounded (BRPOP timeout / Event.wait),
        # but an in-flight HTTP or COS request can still take its own timeout.
        self._thread.join(timeout=10)
        if self._thread.is_alive():
            self._logger.warning("AlarmUploadWorker 线程未在 10 秒内退出")
        self._close_redis()
        self._logger.info("AlarmUploadWorker 已停止")

    def get_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of the counters plus a ``running`` flag."""
        stats = self._stats.copy()
        stats["running"] = self._thread is not None and self._thread.is_alive()
        return stats

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _create_redis_client(self) -> "redis.Redis":
        """Build a Redis client from ``settings.local_redis`` and ping it.

        Raises:
            Exception: whatever the redis client raises when the server is
                unreachable (propagated from ``ping``).
        """
        redis_cfg = self._settings.local_redis
        client = redis.Redis(
            host=redis_cfg.host,
            port=redis_cfg.port,
            db=redis_cfg.db,
            password=redis_cfg.password,
            decode_responses=True,
            socket_connect_timeout=5,
        )
        client.ping()
        return client

    def _close_redis(self):
        """Best-effort close of the current Redis client; errors are ignored."""
        if self._redis is None:
            return
        try:
            self._redis.close()
        except Exception:
            pass

    def _cloud_url(self, path: str = "") -> str:
        """Join ``alarm_upload.cloud_api_url`` (trailing ``/`` stripped) and *path*."""
        base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/")
        return f"{base_url}{path}"

    def _auth_headers(self) -> Dict[str, str]:
        """JSON headers, plus a Bearer token when ``alarm_upload.edge_token`` is set."""
        headers = {"Content-Type": "application/json"}
        token = self._settings.alarm_upload.edge_token
        if token:
            headers["Authorization"] = f"Bearer {token}"
        return headers

    def _check_cloud_api(self):
        """Probe ``<cloud_api_url>/health`` once; only logs, never blocks start-up."""
        base_url = self._cloud_url()
        health_url = self._cloud_url(self._HEALTH_PATH)
        report_url = self._cloud_url(self._REPORT_PATH)
        self._logger.info(f"云端 API 地址: {base_url}")
        self._logger.info(f"告警上报端点: {report_url}")
        try:
            resp = requests.get(health_url, timeout=5)
            if resp.status_code == 200:
                self._logger.info(f"云端健康检查通过: {health_url}")
            else:
                self._logger.warning(
                    f"云端健康检查异常: {health_url}, status={resp.status_code}"
                )
        except requests.ConnectionError:
            self._logger.warning(f"云端不可达: {health_url},请确认服务已启动")
        except Exception as e:
            self._logger.warning(f"云端健康检查失败: {e}")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def _worker_loop(self):
        """Consume the pending queue until ``_stop_event`` is set.

        Each iteration first moves due retries back to pending, then blocks on
        BRPOP for at most ``_BRPOP_TIMEOUT`` seconds. All back-off waits use
        ``_stop_event.wait`` so that :meth:`stop` interrupts them.
        """
        self._logger.info("Worker 主循环开始")
        while not self._stop_event.is_set():
            if self._redis is None:
                # The last reconnect failed. Keep retrying here; calling
                # methods on None would raise AttributeError, which the
                # generic handler below catches without ever reconnecting.
                if not self._stop_event.wait(self._RECONNECT_DELAY):
                    self._reconnect_redis()
                continue
            try:
                self._process_retry_queue()
                result = self._redis.brpop(
                    REDIS_KEY_ALARM_PENDING, timeout=self._BRPOP_TIMEOUT
                )
                if result is None:
                    continue
                _, alarm_json = result
                self._process_alarm(alarm_json)
            except redis.ConnectionError as e:
                self._logger.error(f"Redis 连接断开: {e}")
                if not self._stop_event.wait(self._RECONNECT_DELAY):
                    self._reconnect_redis()
            except Exception as e:
                self._logger.error(f"Worker 主循环异常: {e}")
                self._stats["errors"] += 1
                self._stop_event.wait(self._ERROR_BACKOFF)
        self._logger.info("Worker 主循环退出")

    def _process_alarm(self, alarm_json: str):
        """Process one record popped from the pending queue.

        Resolve events go to :meth:`_process_resolve`. Regular alarms have
        their snapshot uploaded to COS (unless an earlier attempt already did)
        and their metadata posted to the cloud. Any failure schedules a retry.
        Records that are not a JSON object are dead-lettered immediately
        because retrying them can never succeed.

        Args:
            alarm_json: raw JSON string as stored in Redis.
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError as e:
            self._logger.error(f"告警 JSON 解析失败: {e}")
            self._dead_letter({"_raw": alarm_json}, f"JSON 解析失败: {e}")
            return
        if not isinstance(alarm_data, dict):
            self._logger.error("告警数据不是 JSON 对象,已写入死信队列")
            self._dead_letter({"_raw": alarm_json}, "告警数据不是 JSON 对象")
            return

        if alarm_data.get("_type") == "resolve":
            if not self._process_resolve(alarm_data):
                self._handle_retry(alarm_json, "告警结束上报失败")
            return

        alarm_id = alarm_data.get("alarm_id", "unknown")
        retry_count = alarm_data.get("_retry_count", 0)
        self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})")

        # Step 1: upload the snapshot bytes to COS. A previous attempt whose
        # HTTP report failed stores the object key in "_cos_object_key".
        object_key = alarm_data.get("_cos_object_key")
        snapshot_b64 = alarm_data.get("snapshot_b64")
        if snapshot_b64 and not object_key:
            # Keep the try narrow: _handle_retry must not run inside it,
            # otherwise a Redis error there would trigger a second retry.
            try:
                image_bytes = base64.b64decode(snapshot_b64)
                object_key = self._upload_snapshot_to_cos(
                    image_bytes,
                    alarm_id,
                    alarm_data.get("device_id", "unknown"),
                )
            except Exception as e:
                self._logger.error(f"截图解码/上传失败: {e}")
                self._handle_retry(alarm_json, f"截图处理失败: {e}")
                return
            if object_key is None:
                self._handle_retry(alarm_json, "COS 上传失败")
                return

        # Step 2: report the metadata over HTTP (no image bytes).
        report_data = {
            "alarm_id": alarm_data.get("alarm_id"),
            "alarm_type": alarm_data.get("alarm_type"),
            "device_id": alarm_data.get("device_id"),
            "scene_id": alarm_data.get("scene_id"),
            "event_time": alarm_data.get("event_time"),
            "alarm_level": alarm_data.get("alarm_level"),
            "snapshot_url": object_key or "",  # COS object key
            "algorithm_code": alarm_data.get("algorithm_code"),
            "confidence_score": alarm_data.get("confidence_score"),
            "ext_data": alarm_data.get("ext_data", {}),
        }
        if self._report_alarm_http(report_data):
            self._stats["processed"] += 1
            self._logger.info(f"告警上报成功: {alarm_id}")
            return

        # HTTP failed after the image is already in COS: remember the key and
        # drop the base64 payload so retries neither re-upload the image
        # (whose dated key could change) nor shuttle it through Redis again.
        if object_key:
            alarm_data["_cos_object_key"] = object_key
            alarm_data.pop("snapshot_b64", None)
            alarm_json = json.dumps(alarm_data, ensure_ascii=False)
        self._handle_retry(alarm_json, "HTTP 上报失败")

    def _process_resolve(self, resolve_data: dict) -> bool:
        """POST an alarm-end (resolve) event to the cloud.

        Args:
            resolve_data: decoded record with ``_type == "resolve"``.

        Returns:
            True when the cloud answered HTTP 200 with ``code == 0``; False
            otherwise, so the caller can schedule a retry.
        """
        url = self._cloud_url(self._RESOLVE_PATH)
        payload = {
            "alarm_id": resolve_data.get("alarm_id"),
            "duration_ms": resolve_data.get("duration_ms"),
            "last_frame_time": resolve_data.get("last_frame_time"),
            "resolve_type": resolve_data.get("resolve_type"),
        }
        try:
            response = requests.post(
                url, json=payload, headers=self._auth_headers(), timeout=self._HTTP_TIMEOUT
            )
            if response.status_code != 200:
                self._logger.warning(f"告警结束上报失败: status={response.status_code}")
                return False
            body = response.json()
            if body.get("code") != 0:
                self._logger.warning(f"告警结束上报业务错误: {body}")
                return False
        except Exception as e:
            self._logger.warning(f"告警结束上报异常: {e}")
            return False
        self._logger.info(f"告警结束上报成功: {resolve_data.get('alarm_id')}")
        return True

    def _upload_snapshot_to_cos(
        self, image_bytes: bytes, alarm_id: str, device_id: str
    ) -> Optional[str]:
        """Upload a snapshot to Tencent Cloud COS directly from memory.

        Args:
            image_bytes: JPEG image bytes.
            alarm_id: alarm ID (used as the object file name).
            device_id: device ID (used as a key prefix).

        Returns:
            The COS object key, or None on any failure (missing config,
            client initialisation error, upload error).
        """
        cos_cfg = self._settings.cos
        if not cos_cfg.secret_id or not cos_cfg.bucket:
            self._logger.error("COS 未配置(缺少 secret_id 或 bucket无法上传截图")
            return None

        # Lazily create the COS client; a failure is retried on the next upload.
        if self._cos_client is None:
            try:
                from qcloud_cos import CosConfig, CosS3Client

                config = CosConfig(
                    Region=cos_cfg.region,
                    SecretId=cos_cfg.secret_id,
                    SecretKey=cos_cfg.secret_key,
                    Scheme="https",
                )
                self._cos_client = CosS3Client(config)
                self._logger.info(f"COS 客户端初始化成功: bucket={cos_cfg.bucket}")
            except Exception as e:
                self._logger.error(f"COS 客户端初始化失败: {e}")
                return None

        # Object key: alarms/{device_id}/{yyyy-MM-dd (UTC)}/{alarm_id}.jpg
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg"
        try:
            self._cos_client.put_object(
                Bucket=cos_cfg.bucket,
                Body=image_bytes,
                Key=object_key,
                ContentType="image/jpeg",
            )
        except Exception as e:
            self._logger.error(f"COS 上传失败: {e}")
            return None
        self._stats["cos_uploaded"] += 1
        self._logger.info(f"COS 上传成功: {object_key}")
        return object_key

    def _report_alarm_http(self, alarm_data: Dict[str, Any]) -> bool:
        """POST alarm metadata to the cloud report endpoint.

        Args:
            alarm_data: alarm metadata; keys starting with ``_`` are internal
                control fields and are stripped before sending.

        Returns:
            True when the cloud answered HTTP 200 with ``code == 0``.
        """
        url = self._cloud_url(self._REPORT_PATH)
        headers = self._auth_headers()
        report_data = {k: v for k, v in alarm_data.items() if not k.startswith("_")}

        self._logger.debug(f"HTTP 上报 URL: {url}")
        self._logger.debug(f"HTTP 上报数据: {report_data}")

        try:
            response = requests.post(
                url,
                json=report_data,
                headers=headers,
                timeout=self._HTTP_TIMEOUT,
            )
            if response.status_code == 200:
                body = response.json()
                if body.get("code") == 0:
                    self._stats["http_reported"] += 1
                    return True
                self._logger.warning(
                    f"云端返回业务错误: code={body.get('code')}, msg={body.get('msg')}"
                )
                return False

            # Log (a truncated copy of) the response body to ease debugging.
            resp_text = ""
            try:
                resp_text = response.text[:500]
            except Exception:
                pass
            self._logger.warning(
                f"HTTP 上报失败: url={url}, status={response.status_code}, "
                f"body={resp_text}"
            )
            return False
        except requests.Timeout:
            self._logger.warning(f"HTTP 上报超时: {url}")
            return False
        except requests.ConnectionError as e:
            self._logger.warning(f"HTTP 上报连接失败: {url}, error={e}")
            return False
        except Exception as e:
            self._logger.error(f"HTTP 上报异常: {url}, error={e}")
            return False

    # ------------------------------------------------------------------
    # Retry / dead-letter handling
    # ------------------------------------------------------------------

    def _dead_letter(self, alarm_data: Dict[str, Any], reason: str):
        """Stamp *alarm_data* with reason/time and push it to the dead-letter queue."""
        alarm_data["_dead_reason"] = reason
        alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat()
        self._redis.lpush(REDIS_KEY_ALARM_DEAD, json.dumps(alarm_data, ensure_ascii=False))
        self._stats["dead_lettered"] += 1

    def _handle_retry(self, alarm_json: str, error: str):
        """Schedule a retry with exponential back-off, or dead-letter the record.

        The delay is ``retry_interval * 2 ** (retry_count - 1)`` seconds; once
        ``retry_count`` exceeds ``retry_max`` the record is dead-lettered.

        Args:
            alarm_json: JSON of the record that failed.
            error: human-readable failure reason (stored/logged).
        """
        try:
            alarm_data = json.loads(alarm_json)
        except json.JSONDecodeError as e:
            # Callers only pass payloads that already parsed; keep the data
            # rather than silently dropping it if that ever changes.
            self._logger.error(f"告警 JSON 解析失败: {e}")
            self._dead_letter({"_raw": alarm_json}, error)
            return

        retry_count = alarm_data.get("_retry_count", 0) + 1
        max_retry = self._settings.alarm_upload.retry_max

        if retry_count > max_retry:
            self._dead_letter(alarm_data, error)
            self._logger.warning(
                f"告警进入死信队列: {alarm_data.get('alarm_id')}, "
                f"reason={error}, retries={retry_count - 1}"
            )
            return

        base_interval = self._settings.alarm_upload.retry_interval
        delay = base_interval * (2 ** (retry_count - 1))
        alarm_data["_retry_count"] = retry_count
        alarm_data["_retry_at"] = datetime.now(timezone.utc).timestamp() + delay

        self._redis.lpush(REDIS_KEY_ALARM_RETRY, json.dumps(alarm_data, ensure_ascii=False))
        self._stats["retried"] += 1
        self._logger.info(
            f"告警将重试: {alarm_data.get('alarm_id')}, "
            f"retry={retry_count}/{max_retry}, delay={delay}s, reason={error}"
        )

    def _process_retry_queue(self):
        """Move records whose ``_retry_at`` has passed back to the pending queue.

        Runs at most once every ``_RETRY_SCAN_INTERVAL`` seconds so a busy
        pending queue does not trigger a full retry-queue scan per alarm.
        The queue is rotated in place: RPOP the oldest record and, if it is
        not yet due, LPUSH it back, which keeps the order of remaining records.
        Records that cannot be parsed as a JSON object are dead-lettered.
        """
        if self._redis is None:
            return
        now_mono = time.monotonic()
        if now_mono - self._last_retry_scan < self._RETRY_SCAN_INTERVAL:
            return
        self._last_retry_scan = now_mono

        try:
            queue_len = self._redis.llen(REDIS_KEY_ALARM_RETRY)
            if queue_len == 0:
                return
            now = datetime.now(timezone.utc).timestamp()
            for _ in range(queue_len):
                item = self._redis.rpop(REDIS_KEY_ALARM_RETRY)
                if item is None:
                    break
                try:
                    data = json.loads(item)
                except json.JSONDecodeError:
                    data = None
                if not isinstance(data, dict):
                    self._logger.error("重试队列中存在无法解析的告警,已写入死信队列")
                    self._dead_letter({"_raw": item}, "重试队列数据无法解析")
                    continue
                if now >= data.get("_retry_at", 0):
                    # Due: hand it back to the pending queue.
                    self._redis.lpush(REDIS_KEY_ALARM_PENDING, item)
                    self._logger.debug(f"重试告警回归 pending: {data.get('alarm_id')}")
                else:
                    # Not due yet: put it back at the head of the retry queue.
                    self._redis.lpush(REDIS_KEY_ALARM_RETRY, item)
        except Exception as e:
            self._logger.error(f"处理重试队列异常: {e}")

    def _reconnect_redis(self):
        """Replace the Redis client with a freshly connected one.

        The old client is closed first. On failure ``self._redis`` stays
        ``None`` and the main loop retries after ``_RECONNECT_DELAY`` seconds.
        """
        self._close_redis()
        self._redis = None
        try:
            self._redis = self._create_redis_client()
            self._logger.info("Redis 重连成功")
        except Exception as e:
            self._logger.error(f"Redis 重连失败: {e}")
            self._redis = None