feat: 告警HTTP上报 + 日志精简 + 边缘节点统一为edge
- 新增 alarm_upload_worker.py 异步告警上报(COS+HTTP) - result_reporter 重构为Redis队列模式 - config_sync 适配WVP直推的聚合配置格式 - settings 默认 EDGE_DEVICE_ID 改为 edge - 日志设置非告警模块为WARNING级别减少噪音 - main.py 集成新的告警上报流程 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
412
core/alarm_upload_worker.py
Normal file
412
core/alarm_upload_worker.py
Normal file
@@ -0,0 +1,412 @@
|
||||
"""
|
||||
告警上报 Worker
|
||||
|
||||
独立线程运行,从 Redis 队列消费告警:
|
||||
1. BRPOP 取待上报告警
|
||||
2. 上传截图到腾讯云 COS
|
||||
3. HTTP POST 上报告警元数据到云端
|
||||
4. 失败重试 / 死信队列
|
||||
|
||||
Redis Key 设计:
|
||||
local:alarm:pending - 待上报告警队列
|
||||
local:alarm:retry - 重试告警队列
|
||||
local:alarm:dead - 死信告警队列
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import redis
|
||||
import requests
|
||||
|
||||
from config.settings import get_settings
|
||||
from core.result_reporter import (
|
||||
REDIS_KEY_ALARM_PENDING,
|
||||
REDIS_KEY_ALARM_RETRY,
|
||||
REDIS_KEY_ALARM_DEAD,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AlarmUploadWorker:
|
||||
"""告警上报 Worker
|
||||
|
||||
在独立线程中运行,从本地 Redis 消费告警数据,
|
||||
上传截图到 COS,再 HTTP 上报告警元数据到云端。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._settings = get_settings()
|
||||
self._logger = logging.getLogger("alarm_upload_worker")
|
||||
|
||||
self._redis: Optional[redis.Redis] = None
|
||||
self._cos_client = None # 懒初始化
|
||||
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._stop_event = threading.Event()
|
||||
|
||||
self._stats = {
|
||||
"processed": 0,
|
||||
"cos_uploaded": 0,
|
||||
"http_reported": 0,
|
||||
"retried": 0,
|
||||
"dead_lettered": 0,
|
||||
"errors": 0,
|
||||
}
|
||||
|
||||
def start(self):
|
||||
"""启动 worker 线程"""
|
||||
if self._thread and self._thread.is_alive():
|
||||
self._logger.warning("AlarmUploadWorker 已在运行")
|
||||
return
|
||||
|
||||
# 初始化 Redis 连接
|
||||
redis_cfg = self._settings.local_redis
|
||||
try:
|
||||
self._redis = redis.Redis(
|
||||
host=redis_cfg.host,
|
||||
port=redis_cfg.port,
|
||||
db=redis_cfg.db,
|
||||
password=redis_cfg.password,
|
||||
decode_responses=True,
|
||||
socket_connect_timeout=5,
|
||||
)
|
||||
self._redis.ping()
|
||||
self._logger.info(f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}")
|
||||
except Exception as e:
|
||||
self._logger.error(f"Worker Redis 连接失败: {e}")
|
||||
return
|
||||
|
||||
self._stop_event.clear()
|
||||
self._thread = threading.Thread(
|
||||
target=self._worker_loop,
|
||||
name="AlarmUploadWorker",
|
||||
daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
self._logger.info("AlarmUploadWorker 已启动")
|
||||
|
||||
def stop(self):
|
||||
"""停止 worker"""
|
||||
if not self._thread or not self._thread.is_alive():
|
||||
return
|
||||
|
||||
self._logger.info("正在停止 AlarmUploadWorker...")
|
||||
self._stop_event.set()
|
||||
|
||||
# 等待线程退出(BRPOP 有超时,所以不会永远阻塞)
|
||||
self._thread.join(timeout=10)
|
||||
|
||||
if self._redis:
|
||||
try:
|
||||
self._redis.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._logger.info("AlarmUploadWorker 已停止")
|
||||
|
||||
def _worker_loop(self):
|
||||
"""主循环:消费待上报告警"""
|
||||
self._logger.info("Worker 主循环开始")
|
||||
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
# 先检查重试队列
|
||||
self._process_retry_queue()
|
||||
|
||||
# BRPOP 阻塞等待新告警(超时 2 秒,便于检查 stop 信号)
|
||||
result = self._redis.brpop(REDIS_KEY_ALARM_PENDING, timeout=2)
|
||||
if result is None:
|
||||
continue
|
||||
|
||||
_, alarm_json = result
|
||||
self._process_alarm(alarm_json)
|
||||
|
||||
except redis.ConnectionError as e:
|
||||
self._logger.error(f"Redis 连接断开: {e}")
|
||||
time.sleep(5)
|
||||
self._reconnect_redis()
|
||||
except Exception as e:
|
||||
self._logger.error(f"Worker 主循环异常: {e}")
|
||||
self._stats["errors"] += 1
|
||||
time.sleep(1)
|
||||
|
||||
self._logger.info("Worker 主循环退出")
|
||||
|
||||
def _process_alarm(self, alarm_json: str):
|
||||
"""处理单条告警"""
|
||||
try:
|
||||
alarm_data = json.loads(alarm_json)
|
||||
except json.JSONDecodeError as e:
|
||||
self._logger.error(f"告警 JSON 解析失败: {e}")
|
||||
return
|
||||
|
||||
alarm_id = alarm_data.get("alarm_id", "unknown")
|
||||
retry_count = alarm_data.get("_retry_count", 0)
|
||||
|
||||
self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})")
|
||||
|
||||
# Step 1: 上传截图到 COS
|
||||
snapshot_local_path = alarm_data.get("snapshot_local_path")
|
||||
object_key = None
|
||||
|
||||
if snapshot_local_path and os.path.exists(snapshot_local_path):
|
||||
object_key = self._upload_snapshot_to_cos(
|
||||
snapshot_local_path,
|
||||
alarm_id,
|
||||
alarm_data.get("device_id", "unknown"),
|
||||
)
|
||||
if object_key is None:
|
||||
# COS 上传失败,进入重试
|
||||
self._handle_retry(alarm_json, "COS 上传失败")
|
||||
return
|
||||
else:
|
||||
if snapshot_local_path:
|
||||
self._logger.warning(f"截图文件不存在: {snapshot_local_path}")
|
||||
|
||||
# Step 2: HTTP 上报告警元数据
|
||||
report_data = {
|
||||
"alarm_id": alarm_data.get("alarm_id"),
|
||||
"alarm_type": alarm_data.get("alarm_type"),
|
||||
"device_id": alarm_data.get("device_id"),
|
||||
"scene_id": alarm_data.get("scene_id"),
|
||||
"event_time": alarm_data.get("event_time"),
|
||||
"alarm_level": alarm_data.get("alarm_level"),
|
||||
"snapshot_url": object_key or "", # COS object_key
|
||||
"algorithm_code": alarm_data.get("algorithm_code"),
|
||||
"confidence_score": alarm_data.get("confidence_score"),
|
||||
"ext_data": alarm_data.get("ext_data", {}),
|
||||
}
|
||||
|
||||
success = self._report_alarm_http(report_data)
|
||||
|
||||
if success:
|
||||
self._stats["processed"] += 1
|
||||
self._logger.info(f"告警上报成功: {alarm_id}")
|
||||
|
||||
# 可选:删除本地截图
|
||||
if snapshot_local_path and os.path.exists(snapshot_local_path):
|
||||
try:
|
||||
os.remove(snapshot_local_path)
|
||||
self._logger.debug(f"已删除本地截图: {snapshot_local_path}")
|
||||
except Exception as e:
|
||||
self._logger.warning(f"删除本地截图失败: {e}")
|
||||
else:
|
||||
# HTTP 上报失败,进入重试
|
||||
self._handle_retry(alarm_json, "HTTP 上报失败")
|
||||
|
||||
def _upload_snapshot_to_cos(
|
||||
self, local_path: str, alarm_id: str, device_id: str
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
上传截图到腾讯云 COS
|
||||
|
||||
Args:
|
||||
local_path: 本地截图路径
|
||||
alarm_id: 告警ID
|
||||
device_id: 设备ID
|
||||
|
||||
Returns:
|
||||
COS object_key,失败返回 None
|
||||
"""
|
||||
cos_cfg = self._settings.cos
|
||||
if not cos_cfg.secret_id or not cos_cfg.bucket:
|
||||
self._logger.warning("COS 未配置,跳过截图上传")
|
||||
return ""
|
||||
|
||||
# 懒初始化 COS 客户端
|
||||
if self._cos_client is None:
|
||||
try:
|
||||
from qcloud_cos import CosConfig, CosS3Client
|
||||
|
||||
config = CosConfig(
|
||||
Region=cos_cfg.region,
|
||||
SecretId=cos_cfg.secret_id,
|
||||
SecretKey=cos_cfg.secret_key,
|
||||
Scheme="https",
|
||||
)
|
||||
self._cos_client = CosS3Client(config)
|
||||
self._logger.info(f"COS 客户端初始化成功: bucket={cos_cfg.bucket}")
|
||||
except Exception as e:
|
||||
self._logger.error(f"COS 客户端初始化失败: {e}")
|
||||
return None
|
||||
|
||||
# 生成 Object Key: alarms/{device_id}/{yyyy-MM-dd}/{alarm_id}.jpg
|
||||
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg"
|
||||
|
||||
try:
|
||||
self._cos_client.put_object_from_local_file(
|
||||
Bucket=cos_cfg.bucket,
|
||||
LocalFilePath=local_path,
|
||||
Key=object_key,
|
||||
)
|
||||
self._stats["cos_uploaded"] += 1
|
||||
self._logger.info(f"COS 上传成功: {object_key}")
|
||||
return object_key
|
||||
|
||||
except Exception as e:
|
||||
self._logger.error(f"COS 上传失败: {e}")
|
||||
return None
|
||||
|
||||
def _report_alarm_http(self, alarm_data: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
HTTP POST 上报告警元数据到云端
|
||||
|
||||
Args:
|
||||
alarm_data: 告警元数据
|
||||
|
||||
Returns:
|
||||
是否上报成功
|
||||
"""
|
||||
upload_cfg = self._settings.alarm_upload
|
||||
url = f"{upload_cfg.cloud_api_url}/admin-api/aiot/alarm/edge/report"
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
if upload_cfg.edge_token:
|
||||
headers["Authorization"] = f"Bearer {upload_cfg.edge_token}"
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
url,
|
||||
json=alarm_data,
|
||||
headers=headers,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
body = response.json()
|
||||
if body.get("code") == 0:
|
||||
self._stats["http_reported"] += 1
|
||||
return True
|
||||
else:
|
||||
self._logger.warning(
|
||||
f"云端返回业务错误: code={body.get('code')}, msg={body.get('msg')}"
|
||||
)
|
||||
return False
|
||||
else:
|
||||
self._logger.warning(f"HTTP 上报失败: status={response.status_code}")
|
||||
return False
|
||||
|
||||
except requests.Timeout:
|
||||
self._logger.warning(f"HTTP 上报超时: {url}")
|
||||
return False
|
||||
except requests.ConnectionError as e:
|
||||
self._logger.warning(f"HTTP 上报连接失败: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
self._logger.error(f"HTTP 上报异常: {e}")
|
||||
return False
|
||||
|
||||
def _handle_retry(self, alarm_json: str, error: str):
|
||||
"""处理重试逻辑"""
|
||||
try:
|
||||
alarm_data = json.loads(alarm_json)
|
||||
except json.JSONDecodeError:
|
||||
return
|
||||
|
||||
retry_count = alarm_data.get("_retry_count", 0) + 1
|
||||
max_retry = self._settings.alarm_upload.retry_max
|
||||
|
||||
if retry_count > max_retry:
|
||||
# 超过最大重试次数,写入死信队列
|
||||
alarm_data["_dead_reason"] = error
|
||||
alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat()
|
||||
dead_json = json.dumps(alarm_data, ensure_ascii=False)
|
||||
self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json)
|
||||
self._stats["dead_lettered"] += 1
|
||||
self._logger.warning(
|
||||
f"告警进入死信队列: {alarm_data.get('alarm_id')}, "
|
||||
f"reason={error}, retries={retry_count - 1}"
|
||||
)
|
||||
return
|
||||
|
||||
# 指数退避:base_interval * 2^(retry_count-1)
|
||||
base_interval = self._settings.alarm_upload.retry_interval
|
||||
delay = base_interval * (2 ** (retry_count - 1))
|
||||
|
||||
alarm_data["_retry_count"] = retry_count
|
||||
alarm_data["_retry_at"] = (
|
||||
datetime.now(timezone.utc).timestamp() + delay
|
||||
)
|
||||
retry_json = json.dumps(alarm_data, ensure_ascii=False)
|
||||
|
||||
self._redis.lpush(REDIS_KEY_ALARM_RETRY, retry_json)
|
||||
self._stats["retried"] += 1
|
||||
|
||||
self._logger.info(
|
||||
f"告警将重试: {alarm_data.get('alarm_id')}, "
|
||||
f"retry={retry_count}/{max_retry}, delay={delay}s, reason={error}"
|
||||
)
|
||||
|
||||
def _process_retry_queue(self):
|
||||
"""检查重试队列中到期的告警"""
|
||||
if not self._redis:
|
||||
return
|
||||
|
||||
try:
|
||||
queue_len = self._redis.llen(REDIS_KEY_ALARM_RETRY)
|
||||
if queue_len == 0:
|
||||
return
|
||||
|
||||
now = datetime.now(timezone.utc).timestamp()
|
||||
|
||||
# 逐条检查(FIFO: RPOP 取最早的)
|
||||
checked = 0
|
||||
while checked < queue_len:
|
||||
item = self._redis.rpop(REDIS_KEY_ALARM_RETRY)
|
||||
if item is None:
|
||||
break
|
||||
|
||||
checked += 1
|
||||
|
||||
try:
|
||||
data = json.loads(item)
|
||||
retry_at = data.get("_retry_at", 0)
|
||||
|
||||
if now >= retry_at:
|
||||
# 到期,放回 pending 队列重新处理
|
||||
self._redis.lpush(REDIS_KEY_ALARM_PENDING, item)
|
||||
self._logger.debug(
|
||||
f"重试告警回归 pending: {data.get('alarm_id')}"
|
||||
)
|
||||
else:
|
||||
# 未到期,放回 retry 队列头部
|
||||
self._redis.lpush(REDIS_KEY_ALARM_RETRY, item)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
self._logger.error(f"处理重试队列异常: {e}")
|
||||
|
||||
def _reconnect_redis(self):
|
||||
"""重连 Redis"""
|
||||
redis_cfg = self._settings.local_redis
|
||||
try:
|
||||
self._redis = redis.Redis(
|
||||
host=redis_cfg.host,
|
||||
port=redis_cfg.port,
|
||||
db=redis_cfg.db,
|
||||
password=redis_cfg.password,
|
||||
decode_responses=True,
|
||||
socket_connect_timeout=5,
|
||||
)
|
||||
self._redis.ping()
|
||||
self._logger.info("Redis 重连成功")
|
||||
except Exception as e:
|
||||
self._logger.error(f"Redis 重连失败: {e}")
|
||||
self._redis = None
|
||||
|
||||
def get_statistics(self) -> Dict[str, Any]:
|
||||
"""获取 Worker 统计"""
|
||||
stats = self._stats.copy()
|
||||
stats["running"] = self._thread is not None and self._thread.is_alive()
|
||||
return stats
|
||||
Reference in New Issue
Block a user