Files
iot-device-management-service/app/services/alarm_event_service.py
16337 a3797e7508 性能:看板数据合并为单次请求 + 摄像头名称缓存
- 新增 GET /alert/dashboard 聚合接口,一次返回全部看板数据
- 共用同一个 DB session 执行所有查询,减少连接开销
- 摄像头名称服务增加 5 分钟内存缓存,避免重复查询 WVP
- 设备Top10 和最近告警共用一次批量摄像头名称查询

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 17:31:18 +08:00

948 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
告警事件服务(新三表结构)
处理 alarm_event / alarm_event_ext / alarm_llm_analysis 的 CRUD
"""
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy import func, cast, Date, Integer, extract, text
from app.models import AlarmEvent, AlarmEventExt, AlarmLlmAnalysis, get_session
from app.services.oss_storage import get_oss_storage
from app.utils.logger import logger
from app.utils.timezone import beijing_now
def generate_alarm_id() -> str:
    """Build a new alarm ID.

    The ID is the literal prefix ``ALM``, then the current ``beijing_now()``
    time formatted as ``YYYYMMDDHHMMSS``, then the first 8 hex digits of a
    random UUID4 in upper case.
    """
    time_part = beijing_now().strftime("%Y%m%d%H%M%S")
    random_part = uuid.uuid4().hex[:8].upper()
    return "ALM" + time_part + random_part
def _determine_alarm_level(
alarm_type: str,
confidence: float,
duration_ms: Optional[int] = None,
initial_level: Optional[int] = None,
) -> int:
"""
根据告警类型、置信度和持续时长确定告警级别
返回: 0紧急 1重要 2普通 3轻微
三层决策:
1. 初始等级Edge 上报的 alarm_level来自 ROI 算法绑定配置)
2. 时长升级:持续型告警随时长升级(只升不降)
3. 无配置时使用算法默认等级
"""
# 算法默认等级
default_levels = {
"intrusion": 1, # 重要
"leave_post": 2, # 普通
"illegal_parking": 1, # 重要
"vehicle_congestion": 2, # 普通
}
base_level = initial_level if initial_level is not None else default_levels.get(alarm_type, 2)
# 入侵检测:事件型,不升级
if alarm_type == "intrusion":
return base_level
# 持续型告警:根据时长升级(只升不降,即取较小值)
if duration_ms is not None:
escalated = base_level
if alarm_type == "leave_post":
if duration_ms > 60 * 60 * 1000: # >1h → 紧急
escalated = 0
elif duration_ms > 30 * 60 * 1000: # >30min → 重要
escalated = 1
elif alarm_type == "illegal_parking":
if duration_ms > 60 * 60 * 1000: # >1h → 紧急
escalated = 0
elif alarm_type == "vehicle_congestion":
if duration_ms > 30 * 60 * 1000: # >30min → 重要
escalated = 1
# 只升不降
return min(base_level, escalated)
return base_level
class AlarmEventService:
    """Alarm event service."""

    def __init__(self):
        # Object storage client obtained once per service instance.
        self.oss = get_oss_storage()
def create_from_mqtt(self, mqtt_data: Dict[str, Any]) -> Optional[AlarmEvent]:
    """Create an alarm event (and optional extension row) from an MQTT message.

    A fresh alarm ID is generated locally. ``timestamp`` is parsed as an ISO
    string and falls back to ``beijing_now()`` when missing or unparsable;
    ``first_frame_time`` is parsed the same way but falls back to ``None``.
    ``confidence`` values greater than 1 are treated as percentages and
    divided by 100. No duration is known at creation time, so the level is
    computed without duration escalation.

    Args:
        mqtt_data: Decoded MQTT payload.

    Returns:
        The persisted ``AlarmEvent``, or ``None`` if anything failed (the
        transaction is rolled back and the error is logged).
    """
    db = get_session()
    try:
        alarm_id = generate_alarm_id()

        # Event time: parse the reported timestamp, else use the current time.
        event_time = None
        raw_timestamp = mqtt_data.get("timestamp")
        if raw_timestamp:
            try:
                event_time = datetime.fromisoformat(raw_timestamp.replace("Z", "+00:00"))
            except ValueError:
                event_time = None
        if event_time is None:
            event_time = beijing_now()

        # First-frame time (when the alarm first triggered); optional.
        first_frame_time = None
        raw_first_frame = mqtt_data.get("first_frame_time")
        if raw_first_frame:
            try:
                first_frame_time = datetime.fromisoformat(raw_first_frame.replace("Z", "+00:00"))
            except ValueError:
                first_frame_time = None

        # Keep confidence as a float in 0-1; scale down 0-100 style values.
        confidence = mqtt_data.get("confidence")
        if confidence is not None:
            confidence = float(confidence)
            if confidence > 1:
                confidence /= 100.0

        alarm_type = mqtt_data.get("alert_type", "unknown")
        # Duration is not passed at creation time.
        alarm_level = _determine_alarm_level(alarm_type, confidence or 0, None)

        alarm = AlarmEvent(
            alarm_id=alarm_id,
            alarm_type=alarm_type,
            algorithm_code=mqtt_data.get("algorithm", "YOLO"),
            device_id=mqtt_data.get("camera_id", "unknown"),
            scene_id=mqtt_data.get("roi_id"),
            event_time=event_time,
            first_frame_time=first_frame_time,
            duration_ms=None,
            last_frame_time=None,
            alarm_level=alarm_level,
            confidence_score=confidence,
            alarm_status="NEW",
            handle_status="UNHANDLED",
            edge_node_id=mqtt_data.get("device_id"),
        )
        db.add(alarm)

        # Extension row: copy the optional fields that are present;
        # "alert_id" is stored under the key "edge_alert_id".
        ext_data = {}
        for source_key in ("bind_id", "target_class", "bbox", "message", "alert_id"):
            value = mqtt_data.get(source_key)
            if value is None:
                continue
            target_key = "edge_alert_id" if source_key == "alert_id" else source_key
            ext_data[target_key] = value
        if ext_data:
            db.add(AlarmEventExt(alarm_id=alarm_id, ext_type="EDGE", ext_data=ext_data))

        db.commit()
        db.refresh(alarm)
        logger.info(f"新告警事件创建: {alarm_id}, type={alarm_type}")
        return alarm
    except Exception as e:
        db.rollback()
        logger.error(f"创建告警事件失败: {e}")
        return None
    finally:
        db.close()
def create_from_edge_report(self, data: Dict[str, Any]) -> Optional[AlarmEvent]:
    """Create an alarm event from an edge-side HTTP report.

    Per the original notes, the edge reports alarms via
    POST /admin-api/aiot/alarm/edge/report. The edge-generated ``alarm_id``
    is used as-is, which makes the call idempotent: if a row with that ID
    already exists it is returned and nothing is written.

    Args:
        data: Reported payload; top-level fields map onto the alarm_event
            columns, and ``ext_data`` carries extras such as ``duration_ms``,
            ``first_frame_time``, ``edge_node_id`` and ``area_id``.

    Returns:
        The new or already existing ``AlarmEvent``; ``None`` when
        ``alarm_id`` is missing or creation failed.
    """
    db = get_session()
    try:
        alarm_id = data.get("alarm_id")
        if not alarm_id:
            logger.error("边缘上报缺少 alarm_id")
            return None

        # Idempotency check: skip when the alarm_id is already stored.
        duplicate = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if duplicate:
            logger.info(f"告警已存在,跳过: {alarm_id}")
            return duplicate

        # Event time: parse the reported value, else use the current time.
        event_time = None
        raw_event_time = data.get("event_time")
        if raw_event_time:
            try:
                event_time = datetime.fromisoformat(raw_event_time.replace("Z", "+00:00"))
            except ValueError:
                event_time = None
        if event_time is None:
            event_time = beijing_now()

        # Confidence: float in 0-1; values above 1 are treated as percentages.
        confidence = data.get("confidence_score")
        if confidence is not None:
            confidence = float(confidence)
            if confidence > 1:
                confidence /= 100.0

        alarm_type = data.get("alarm_type", "unknown")
        extras = data.get("ext_data") or {}
        duration_ms = extras.get("duration_ms")

        alarm_level = data.get("alarm_level")
        if alarm_level is None:
            # Edge sent no level: algorithm default plus duration escalation.
            alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms)
        elif duration_ms is not None:
            # Edge sent a level: only check whether the duration escalates it.
            alarm_level = _determine_alarm_level(
                alarm_type, confidence or 0, duration_ms, initial_level=alarm_level
            )

        # first_frame_time is delivered by the edge inside ext_data.
        first_frame_time = None
        raw_first_frame = extras.get("first_frame_time")
        if raw_first_frame:
            try:
                first_frame_time = datetime.fromisoformat(raw_first_frame.replace("Z", "+00:00"))
            except ValueError:
                first_frame_time = None

        alarm = AlarmEvent(
            alarm_id=alarm_id,
            alarm_type=alarm_type,
            algorithm_code=data.get("algorithm_code"),
            device_id=data.get("device_id", "unknown"),
            scene_id=data.get("scene_id"),
            event_time=event_time,
            first_frame_time=first_frame_time,
            duration_ms=duration_ms,
            alarm_level=alarm_level,
            confidence_score=confidence,
            alarm_status="NEW",
            handle_status="UNHANDLED",
            snapshot_url=data.get("snapshot_url"),
            edge_node_id=extras.get("edge_node_id"),
            area_id=extras.get("area_id"),
        )
        db.add(alarm)

        # Store the reported ext_data unchanged in the extension table.
        if extras:
            db.add(AlarmEventExt(alarm_id=alarm_id, ext_type="EDGE_HTTP", ext_data=extras))

        db.commit()
        db.refresh(alarm)
        logger.info(f"边缘端告警创建成功: {alarm_id}, type={alarm_type}, device={data.get('device_id')}")
        return alarm
    except Exception as e:
        db.rollback()
        logger.error(f"创建边缘端告警失败: {e}")
        return None
    finally:
        db.close()
def create_from_http(self, data: Dict[str, Any], snapshot_data: Optional[bytes] = None) -> Optional[AlarmEvent]:
    """Create an alarm event from an HTTP request payload.

    The event time comes from ``trigger_time`` (or ``timestamp``), accepted
    either as an ISO string or as a ``datetime``; anything else, or an
    unparsable string, falls back to ``beijing_now()``. ``duration_minutes``
    is converted to milliseconds and feeds the level escalation. When
    ``snapshot_data`` is given it is uploaded through ``self.oss`` and the
    returned value is stored as the snapshot URL.

    Args:
        data: Request payload.
        snapshot_data: Optional raw snapshot image bytes.

    Returns:
        The persisted ``AlarmEvent``, or ``None`` on failure (rolled back
        and logged).
    """
    db = get_session()
    try:
        alarm_id = generate_alarm_id()

        # Resolve the event time from whichever form was supplied.
        raw_time = data.get("trigger_time") or data.get("timestamp")
        if isinstance(raw_time, datetime):
            event_time = raw_time
        elif isinstance(raw_time, str):
            try:
                event_time = datetime.fromisoformat(raw_time.replace("Z", "+00:00"))
            except ValueError:
                event_time = beijing_now()
        else:
            event_time = beijing_now()

        # Confidence: float in 0-1; values above 1 are treated as percentages.
        confidence = data.get("confidence")
        if confidence is not None:
            confidence = float(confidence)
            if confidence > 1:
                confidence /= 100.0

        duration_minutes = data.get("duration_minutes")
        duration_ms = None if duration_minutes is None else int(float(duration_minutes) * 60 * 1000)

        alarm_type = data.get("alert_type", "unknown")
        alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms)

        snapshot_url = self.oss.upload_image(snapshot_data) if snapshot_data else None

        alarm = AlarmEvent(
            alarm_id=alarm_id,
            alarm_type=alarm_type,
            algorithm_code=data.get("algorithm"),
            device_id=data.get("camera_id", "unknown"),
            scene_id=data.get("roi_id"),
            event_time=event_time,
            duration_ms=duration_ms,
            alarm_level=alarm_level,
            confidence_score=confidence,
            alarm_status="NEW",
            handle_status="UNHANDLED",
            snapshot_url=snapshot_url,
            edge_node_id=data.get("device_id"),
        )
        db.add(alarm)

        # Extension row: keep only the optional fields that are present.
        ext_data = {}
        for field in ("bind_id", "target_class", "bbox", "message"):
            value = data.get(field)
            if value is None:
                continue
            ext_data[field] = value
        if ext_data:
            db.add(AlarmEventExt(alarm_id=alarm_id, ext_type="POST", ext_data=ext_data))

        db.commit()
        db.refresh(alarm)
        logger.info(f"HTTP告警事件创建: {alarm_id}")
        return alarm
    except Exception as e:
        db.rollback()
        logger.error(f"HTTP创建告警事件失败: {e}")
        return None
    finally:
        db.close()
def get_alarm(self, alarm_id: str) -> Optional[Dict]:
    """Return one alarm as a dict, including extension and LLM analysis data.

    Args:
        alarm_id: ID of the alarm to load.

    Returns:
        ``alarm.to_dict()`` with an added ``"ext"`` entry when an extension
        row exists and an added ``"llm_analyses"`` list (newest first) when
        analyses exist; ``None`` if the alarm is not found.
    """
    db = get_session()
    try:
        event = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if not event:
            return None
        detail = event.to_dict()

        # Attach the first extension row, if any.
        ext_row = db.query(AlarmEventExt).filter(AlarmEventExt.alarm_id == alarm_id).first()
        if ext_row:
            detail["ext"] = ext_row.to_dict()

        # Attach LLM analyses, most recent first.
        analysis_rows = (
            db.query(AlarmLlmAnalysis)
            .filter(AlarmLlmAnalysis.alarm_id == alarm_id)
            .order_by(AlarmLlmAnalysis.created_at.desc())
            .all()
        )
        if analysis_rows:
            detail["llm_analyses"] = [row.to_dict() for row in analysis_rows]

        return detail
    finally:
        db.close()
def get_alarms(
    self,
    device_id: Optional[str] = None,
    alarm_type: Optional[str] = None,
    alarm_status: Optional[str] = None,
    alarm_level: Optional[int] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    edge_node_id: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
) -> Tuple[List[AlarmEvent], int]:
    """Query alarms with optional filters, newest first, one page at a time.

    Every filter is applied only when its argument is set. ``alarm_level``
    is checked with ``is not None`` so that level 0 still filters; the
    other filters are skipped for any falsy value. The time range is
    inclusive on both ends.

    Args:
        device_id: Exact device ID.
        alarm_type: Exact alarm type.
        alarm_status: Exact alarm status.
        alarm_level: Exact alarm level.
        start_time: Lower bound on ``event_time``.
        end_time: Upper bound on ``event_time``.
        edge_node_id: Exact edge node ID.
        page: 1-based page number.
        page_size: Rows per page.

    Returns:
        ``(alarms, total)``: the requested page and the number of rows
        matching the filters across all pages.
    """
    db = get_session()
    try:
        conditions = []
        if device_id:
            conditions.append(AlarmEvent.device_id == device_id)
        if alarm_type:
            conditions.append(AlarmEvent.alarm_type == alarm_type)
        if alarm_status:
            conditions.append(AlarmEvent.alarm_status == alarm_status)
        if alarm_level is not None:
            conditions.append(AlarmEvent.alarm_level == alarm_level)
        if edge_node_id:
            conditions.append(AlarmEvent.edge_node_id == edge_node_id)
        if start_time:
            conditions.append(AlarmEvent.event_time >= start_time)
        if end_time:
            conditions.append(AlarmEvent.event_time <= end_time)

        matching = db.query(AlarmEvent).filter(*conditions)
        total = matching.count()

        skip = (page - 1) * page_size
        ordered = matching.order_by(AlarmEvent.event_time.desc())
        alarms = ordered.offset(skip).limit(page_size).all()
        return alarms, total
    finally:
        db.close()
def handle_alarm(
    self,
    alarm_id: str,
    alarm_status: Optional[str] = None,
    handle_status: Optional[str] = None,
    remark: Optional[str] = None,
    handler: Optional[str] = None,
) -> Optional[AlarmEvent]:
    """Record a handling action on an alarm.

    Only the supplied fields are changed. ``handled_at`` and ``updated_at``
    are always set to the current time. When the alarm has no
    ``duration_ms`` yet, it is derived from ``event_time`` up to now; when
    it has no ``last_frame_time``, that is set to now as well.

    Args:
        alarm_id: ID of the alarm to update.
        alarm_status: New alarm status; ignored when falsy.
        handle_status: New handle status; ignored when falsy.
        remark: Handling remark; an empty string is stored, ``None`` is not.
        handler: Who handled the alarm; ignored when falsy.

    Returns:
        The refreshed ``AlarmEvent``, or ``None`` if the alarm does not
        exist. Database errors propagate to the caller.
    """
    db = get_session()
    try:
        alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if not alarm:
            return None

        if alarm_status:
            alarm.alarm_status = alarm_status
        if handle_status:
            alarm.handle_status = handle_status
        if remark is not None:
            alarm.handle_remark = remark
        if handler:
            alarm.handler = handler

        now = beijing_now()
        alarm.handled_at = now
        alarm.updated_at = now

        # No duration recorded yet: use the time from the event until now.
        if alarm.duration_ms is None and alarm.event_time:
            try:
                elapsed = now - alarm.event_time
                alarm.duration_ms = int(elapsed.total_seconds() * 1000)
            except Exception as e:
                # Best effort: handling must still succeed without a
                # duration, but the failure is logged instead of vanishing.
                # NOTE(review): presumably triggered by mixing naive and
                # timezone-aware datetimes; confirm.
                logger.error(f"计算告警时长失败: {alarm_id}, {e}")

        # No last frame recorded yet: use the handling time.
        if alarm.last_frame_time is None:
            alarm.last_frame_time = now

        db.commit()
        db.refresh(alarm)
        logger.info(f"告警已处理: {alarm_id}, status={alarm_status}")
        return alarm
    finally:
        db.close()
def delete_alarm(self, alarm_id: str) -> bool:
    """Delete an alarm together with its extension and LLM analysis rows.

    Args:
        alarm_id: ID of the alarm to delete.

    Returns:
        ``True`` when the alarm was deleted; ``False`` when it does not
        exist or the deletion failed (rolled back and logged).
    """
    db = get_session()
    try:
        target = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if not target:
            return False

        # Remove dependent rows first, then the alarm itself.
        for related_model in (AlarmEventExt, AlarmLlmAnalysis):
            db.query(related_model).filter(related_model.alarm_id == alarm_id).delete()
        db.delete(target)
        db.commit()

        logger.info(f"告警已删除: {alarm_id}")
        return True
    except Exception as e:
        db.rollback()
        logger.error(f"删除告警失败: {e}")
        return False
    finally:
        db.close()
def get_statistics(self) -> Dict:
    """Return overall alarm statistics.

    Returns:
        A dict with:
          - ``total``: number of alarms.
          - ``todayCount`` / ``yesterdayCount``: alarms whose ``event_time``
            falls in today / yesterday, relative to ``beijing_now()``.
          - ``pendingCount``: alarms with handle status ``UNHANDLED``.
          - ``handledCount``: alarms with handle status ``DONE`` or
            ``IGNORED``.
          - ``avgResponseMinutes``: average minutes from ``event_time`` to
            ``handled_at`` over alarms handled after their event time,
            rounded to one decimal; ``None`` only when no such alarm exists.
          - ``byStatus`` / ``byType`` / ``byLevel``: counts per value.
    """
    from sqlalchemy.sql.expression import literal_column

    db = get_session()
    try:
        def count_where(*conditions):
            """Count alarms matching all the given conditions."""
            return db.query(AlarmEvent).filter(*conditions).count()

        def count_grouped_by(column):
            """Return {column value: alarm count}."""
            rows = db.query(column, func.count(AlarmEvent.alarm_id)).group_by(column).all()
            return {row[0]: row[1] for row in rows}

        total = db.query(AlarmEvent).count()

        now = beijing_now()
        today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        yesterday_start = today_start - timedelta(days=1)

        today_count = count_where(AlarmEvent.event_time >= today_start)
        yesterday_count = count_where(
            AlarmEvent.event_time >= yesterday_start,
            AlarmEvent.event_time < today_start,
        )
        pending_count = count_where(AlarmEvent.handle_status == "UNHANDLED")
        handled_count = count_where(AlarmEvent.handle_status.in_(["DONE", "IGNORED"]))

        # Average response time in minutes, only over alarms that were
        # handled after their event time. Uses the database's TIMESTAMPDIFF.
        avg_response = db.query(
            func.avg(
                func.timestampdiff(
                    literal_column("MINUTE"),
                    AlarmEvent.event_time,
                    AlarmEvent.handled_at,
                )
            )
        ).filter(
            AlarmEvent.handled_at.isnot(None),
            AlarmEvent.handled_at > AlarmEvent.event_time,
        ).scalar()

        # Compare against None explicitly: an average of 0 minutes is a
        # valid result and must be reported as 0.0, not as "no data".
        avg_response_minutes = round(float(avg_response), 1) if avg_response is not None else None

        return {
            "total": total,
            "todayCount": today_count,
            "yesterdayCount": yesterday_count,
            "pendingCount": pending_count,
            "handledCount": handled_count,
            "avgResponseMinutes": avg_response_minutes,
            "byStatus": count_grouped_by(AlarmEvent.alarm_status),
            "byType": count_grouped_by(AlarmEvent.alarm_type),
            "byLevel": count_grouped_by(AlarmEvent.alarm_level),
        }
    finally:
        db.close()
def get_trend(self, days: int = 7) -> List[Dict]:
    """Return per-day alarm counts, broken down by alarm type.

    Args:
        days: Number of consecutive days to report, starting at midnight
            ``days`` days before ``beijing_now()``.

    Returns:
        One dict per day in date order: ``{"date": "YYYY-MM-DD",
        "total": n, <alarm_type>: count, ...}``. Days without alarms are
        present with ``total`` 0.
    """
    db = get_session()
    try:
        window_start = (beijing_now() - timedelta(days=days)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        day_expr = func.date(AlarmEvent.event_time)
        rows = (
            db.query(
                day_expr.label("date"),
                AlarmEvent.alarm_type,
                func.count(AlarmEvent.alarm_id).label("cnt"),
            )
            .filter(AlarmEvent.event_time >= window_start)
            .group_by(func.date(AlarmEvent.event_time), AlarmEvent.alarm_type)
            .all()
        )

        # date string -> {alarm_type: count}
        counts_by_date = {}
        for row in rows:
            counts_by_date.setdefault(str(row.date), {})[row.alarm_type] = row.cnt

        # Emit every day of the window, including days with no alarms.
        # NOTE(review): the query also fetches today's rows, but the last
        # day emitted here is yesterday; confirm whether today should be
        # part of the trend.
        trend = []
        for offset in range(days):
            day = (window_start + timedelta(days=offset)).strftime("%Y-%m-%d")
            per_type = counts_by_date.get(day, {})
            trend.append({"date": day, "total": sum(per_type.values()), **per_type})
        return trend
    finally:
        db.close()
def get_device_top(self, limit: int = 10, days: int = 7) -> List[Dict]:
    """Return the devices with the most alarms in the recent window.

    Args:
        limit: Maximum number of devices to return.
        days: Window size; the window starts at midnight ``days`` days
            before ``beijing_now()``.

    Returns:
        ``[{"deviceId": ..., "count": ...}, ...]`` ordered by count,
        highest first.
    """
    db = get_session()
    try:
        window_start = (beijing_now() - timedelta(days=days)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        ranked = (
            db.query(
                AlarmEvent.device_id,
                func.count(AlarmEvent.alarm_id).label("cnt"),
            )
            .filter(AlarmEvent.event_time >= window_start)
            .group_by(AlarmEvent.device_id)
            .order_by(func.count(AlarmEvent.alarm_id).desc())
            .limit(limit)
            .all()
        )

        top_devices = []
        for row in ranked:
            top_devices.append({"deviceId": row.device_id, "count": row.cnt})
        return top_devices
    finally:
        db.close()
def get_hour_distribution(self, days: int = 7) -> List[Dict]:
    """Return alarm counts per hour of day over the recent window.

    Args:
        days: Window size; the window starts at midnight ``days`` days
            before ``beijing_now()``.

    Returns:
        24 dicts ``{"hour": h, "count": n}`` for ``h`` from 0 to 23; hours
        without alarms have count 0.
    """
    db = get_session()
    try:
        window_start = (beijing_now() - timedelta(days=days)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        rows = (
            db.query(
                func.hour(AlarmEvent.event_time).label("h"),
                func.count(AlarmEvent.alarm_id).label("cnt"),
            )
            .filter(AlarmEvent.event_time >= window_start)
            .group_by(func.hour(AlarmEvent.event_time))
            .all()
        )

        counts_by_hour = {}
        for row in rows:
            counts_by_hour[row.h] = row.cnt

        distribution = []
        for hour in range(24):
            distribution.append({"hour": hour, "count": counts_by_hour.get(hour, 0)})
        return distribution
    finally:
        db.close()
def get_dashboard_data(self, trend_days: int = 7) -> Dict:
    """Return all dashboard data in one call, using a single DB session.

    Args:
        trend_days: Number of days in the trend section, starting at
            midnight ``trend_days`` days before ``beijing_now()``.

    Returns:
        A dict with:
          - ``statistics``: totals, today/yesterday counts, pending and
            handled counts, ``avgResponseMinutes`` (``None`` only when no
            alarm was handled after its event time), ``byType``, ``byLevel``.
          - ``trend``: per-day counts by alarm type.
          - ``deviceTop``: top 10 devices by alarm count, last 7 days.
          - ``hourDistribution``: counts per hour of day, last 7 days.
          - ``recentAlerts``: the 10 most recent alarms as dicts.
    """
    from sqlalchemy.sql.expression import literal_column

    db = get_session()
    try:
        now = beijing_now()
        today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        yesterday_start = today_start - timedelta(days=1)
        trend_start = (now - timedelta(days=trend_days)).replace(hour=0, minute=0, second=0, microsecond=0)
        week_start = (now - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)

        # === Overview statistics ===
        total = db.query(AlarmEvent).count()
        today_count = db.query(AlarmEvent).filter(AlarmEvent.event_time >= today_start).count()
        yesterday_count = db.query(AlarmEvent).filter(
            AlarmEvent.event_time >= yesterday_start,
            AlarmEvent.event_time < today_start,
        ).count()
        pending_count = db.query(AlarmEvent).filter(AlarmEvent.handle_status == "UNHANDLED").count()
        handled_count = db.query(AlarmEvent).filter(
            AlarmEvent.handle_status.in_(["DONE", "IGNORED"])
        ).count()

        # Average minutes from event to handling, only over alarms handled
        # after their event time.
        avg_response = db.query(
            func.avg(
                func.timestampdiff(literal_column("MINUTE"), AlarmEvent.event_time, AlarmEvent.handled_at)
            )
        ).filter(
            AlarmEvent.handled_at.isnot(None),
            AlarmEvent.handled_at > AlarmEvent.event_time,
        ).scalar()
        # Compare against None explicitly: an average of 0 minutes is a
        # valid result and must be reported as 0.0, not as "no data".
        avg_response_minutes = round(float(avg_response), 1) if avg_response is not None else None

        by_type = {
            row[0]: row[1]
            for row in db.query(AlarmEvent.alarm_type, func.count(AlarmEvent.alarm_id))
            .group_by(AlarmEvent.alarm_type)
            .all()
        }
        by_level = {
            row[0]: row[1]
            for row in db.query(AlarmEvent.alarm_level, func.count(AlarmEvent.alarm_id))
            .group_by(AlarmEvent.alarm_level)
            .all()
        }

        # === Trend ===
        trend_rows = db.query(
            func.date(AlarmEvent.event_time).label("date"),
            AlarmEvent.alarm_type,
            func.count(AlarmEvent.alarm_id).label("cnt"),
        ).filter(AlarmEvent.event_time >= trend_start).group_by(
            func.date(AlarmEvent.event_time), AlarmEvent.alarm_type
        ).all()
        # date string -> {alarm_type: count}
        counts_by_date = {}
        for row in trend_rows:
            counts_by_date.setdefault(str(row.date), {})[row.alarm_type] = row.cnt
        # NOTE(review): the query also fetches today's rows, but the last
        # day emitted here is yesterday; confirm whether today should be
        # part of the trend.
        trend = []
        for offset in range(trend_days):
            day = (trend_start + timedelta(days=offset)).strftime("%Y-%m-%d")
            per_type = counts_by_date.get(day, {})
            trend.append({"date": day, "total": sum(per_type.values()), **per_type})

        # === Device top 10 (last 7 days) ===
        device_rows = db.query(
            AlarmEvent.device_id, func.count(AlarmEvent.alarm_id).label("cnt")
        ).filter(AlarmEvent.event_time >= week_start).group_by(
            AlarmEvent.device_id
        ).order_by(func.count(AlarmEvent.alarm_id).desc()).limit(10).all()
        device_top = [{"deviceId": row.device_id, "count": row.cnt} for row in device_rows]

        # === Hour-of-day distribution (last 7 days) ===
        hour_rows = db.query(
            func.hour(AlarmEvent.event_time).label("h"),
            func.count(AlarmEvent.alarm_id).label("cnt"),
        ).filter(AlarmEvent.event_time >= week_start).group_by(
            func.hour(AlarmEvent.event_time)
        ).all()
        counts_by_hour = {row.h: row.cnt for row in hour_rows}
        hour_dist = [{"hour": hour, "count": counts_by_hour.get(hour, 0)} for hour in range(24)]

        # === 10 most recent alarms ===
        recent = db.query(AlarmEvent).order_by(AlarmEvent.event_time.desc()).limit(10).all()
        recent_list = [alarm.to_dict() for alarm in recent]

        return {
            "statistics": {
                "total": total,
                "todayCount": today_count,
                "yesterdayCount": yesterday_count,
                "pendingCount": pending_count,
                "handledCount": handled_count,
                "avgResponseMinutes": avg_response_minutes,
                "byType": by_type,
                "byLevel": by_level,
            },
            "trend": trend,
            "deviceTop": device_top,
            "hourDistribution": hour_dist,
            "recentAlerts": recent_list,
        }
    finally:
        db.close()
def get_device_summary(
    self,
    page: int = 1,
    page_size: int = 10,
) -> Dict:
    """Return a paginated per-device alarm summary.

    Devices are ordered by their total alarm count, highest first.

    Args:
        page: 1-based page number.
        page_size: Devices per page.

    Returns:
        ``{"list": [...], "total": n}`` where ``total`` is the number of
        device groups and each list item carries ``deviceId``,
        ``deviceName`` (same value as ``deviceId``), ``totalCount``,
        ``unhandledCount``, ``lastEventTime`` (ISO string or ``None``),
        ``lastAlarmType`` and ``lastAlarmTypeName`` (both the latest
        alarm's type, or ``None``).
    """
    db = get_session()
    try:
        grouped = db.query(
            AlarmEvent.device_id,
            func.count(AlarmEvent.alarm_id).label("total_count"),
            func.max(AlarmEvent.event_time).label("last_event_time"),
        ).group_by(AlarmEvent.device_id)

        total = grouped.count()
        skip = (page - 1) * page_size
        page_rows = (
            grouped.order_by(func.count(AlarmEvent.alarm_id).desc())
            .offset(skip)
            .limit(page_size)
            .all()
        )

        def alarms_of(device_id):
            """Base query for all alarms of one device."""
            return db.query(AlarmEvent).filter(AlarmEvent.device_id == device_id)

        summary_list = []
        for row in page_rows:
            # Alarms of this device that are still waiting to be handled.
            unhandled_count = (
                alarms_of(row.device_id)
                .filter(AlarmEvent.handle_status == "UNHANDLED")
                .count()
            )
            # Most recent alarm of this device.
            latest = alarms_of(row.device_id).order_by(AlarmEvent.event_time.desc()).first()
            last_type = latest.alarm_type if latest else None

            if row.last_event_time:
                last_event_iso = row.last_event_time.isoformat()
            else:
                last_event_iso = None

            summary_list.append({
                "deviceId": row.device_id,
                "deviceName": row.device_id,
                "totalCount": row.total_count,
                "unhandledCount": unhandled_count,
                "lastEventTime": last_event_iso,
                "lastAlarmType": last_type,
                "lastAlarmTypeName": last_type,
            })

        return {"list": summary_list, "total": total}
    finally:
        db.close()
def resolve_alarm(self, alarm_id: str, duration_ms: int, last_frame_time: str, resolve_type: str) -> bool:
    """Record the duration and end time of an alarm, auto-closing it if needed.

    First come, first served: when the alarm has already reached a terminal
    state (alarm status ``CLOSED``/``FALSE`` or handle status
    ``DONE``/``IGNORED``), only the duration, level and last frame time are
    updated and the status fields are left alone. Otherwise a known
    ``resolve_type`` closes the alarm with a matching remark.

    Args:
        alarm_id: ID of the alarm to update.
        duration_ms: Final duration of the alarm in milliseconds.
        last_frame_time: ISO timestamp of the last frame; when it cannot be
            parsed, the current time is used instead.
        resolve_type: Why the alarm ended: ``person_returned``,
            ``non_work_time`` or ``intrusion_cleared``. Any other value
            leaves the status untouched.

    Returns:
        ``True`` when the alarm was updated; ``False`` when it does not
        exist or the update failed (rolled back and logged).
    """
    # Remark written when an alarm is closed automatically, per resolve type.
    auto_close_remarks = {
        "person_returned": "人员回岗自动关闭",
        "non_work_time": "非工作时间自动关闭",
        "intrusion_cleared": "入侵消失自动关闭持续无人180秒",
    }

    db = get_session()
    try:
        alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if not alarm:
            return False

        alarm.duration_ms = duration_ms

        # Re-evaluate the level with the final duration (escalate only).
        current_level = alarm.alarm_level
        escalated_level = _determine_alarm_level(
            alarm.alarm_type, alarm.confidence_score or 0,
            duration_ms, initial_level=current_level,
        )
        # A missing stored level cannot be compared with "<"; treat it as
        # "needs a level" instead of letting the whole update fail.
        if current_level is None or escalated_level < current_level:
            # The separator keeps old and new level readable ("1 -> 0"
            # rather than "10").
            logger.info(f"告警等级升级: {alarm_id}, {current_level} -> {escalated_level}")
            alarm.alarm_level = escalated_level

        # Parse last_frame_time and drop microseconds for a uniform format;
        # fall back to the current time when the value is unusable.
        try:
            parsed_time = datetime.fromisoformat(last_frame_time.replace("Z", "+00:00"))
            alarm.last_frame_time = parsed_time.replace(microsecond=0)
        except Exception:
            alarm.last_frame_time = beijing_now().replace(microsecond=0)

        # First come, first served: never overwrite a terminal state.
        already_terminal = (
            alarm.alarm_status in ("CLOSED", "FALSE")
            or alarm.handle_status in ("DONE", "IGNORED")
        )
        if already_terminal:
            logger.info(f"告警已为终态({alarm.alarm_status}/{alarm.handle_status}),仅更新时长: {alarm_id}")
        elif resolve_type in auto_close_remarks:
            alarm.alarm_status = "CLOSED"
            alarm.handle_status = "DONE"
            alarm.handle_remark = auto_close_remarks[resolve_type]
            alarm.handled_at = beijing_now()

        alarm.updated_at = beijing_now()
        db.commit()
        logger.info(f"告警已更新结束信息: {alarm_id}, duration={duration_ms}ms, type={resolve_type}")
        return True
    except Exception as e:
        db.rollback()
        logger.error(f"更新告警结束信息失败: {e}")
        return False
    finally:
        db.close()
def is_alarm_terminal(self, alarm_id: str) -> bool:
    """Tell whether an alarm has reached a terminal state.

    An alarm is terminal when its alarm status is ``CLOSED`` or ``FALSE``,
    or its handle status is ``DONE`` or ``IGNORED``.

    Args:
        alarm_id: ID of the alarm to check.

    Returns:
        ``True`` when the alarm is terminal; ``False`` when it is not, when
        it does not exist, or when the lookup failed (the failure is
        logged).
    """
    db = get_session()
    try:
        alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if not alarm:
            return False
        return alarm.alarm_status in ("CLOSED", "FALSE") or alarm.handle_status in ("DONE", "IGNORED")
    except Exception as e:
        # Still answer "not terminal" on failure, but leave a trace so a
        # database problem is not mistaken for a normal result.
        logger.error(f"查询告警终态失败: {alarm_id}, {e}")
        return False
    finally:
        db.close()
def count_alarms_by_edge_node(self, edge_node_id: str) -> int:
    """Count the alarms reported by one edge node.

    Args:
        edge_node_id: ID of the edge node.

    Returns:
        Number of alarms with that ``edge_node_id``; 0 when the query
        failed (the error is logged).
    """
    db = get_session()
    try:
        node_alarms = db.query(AlarmEvent).filter(AlarmEvent.edge_node_id == edge_node_id)
        return node_alarms.count()
    except Exception as e:
        logger.error(f"统计边缘节点告警数失败: {e}")
        return 0
    finally:
        db.close()
def save_llm_analysis(
    self,
    alarm_id: str,
    llm_model: str,
    analysis_type: str,
    summary: Optional[str] = None,
    is_false_alarm: Optional[bool] = None,
    risk_score: Optional[int] = None,
    confidence_score: Optional[float] = None,
    suggestion: Optional[str] = None,
) -> Optional[AlarmLlmAnalysis]:
    """Store one LLM analysis result for an alarm.

    Args:
        alarm_id: ID of the analysed alarm.
        llm_model: Name of the model that produced the analysis.
        analysis_type: Kind of analysis performed.
        summary: Optional summary text.
        is_false_alarm: Optional false-alarm verdict.
        risk_score: Optional risk score.
        confidence_score: Optional confidence of the analysis.
        suggestion: Optional handling suggestion.

    Returns:
        The persisted ``AlarmLlmAnalysis``, or ``None`` on failure (rolled
        back and logged).
    """
    db = get_session()
    try:
        fields = {
            "alarm_id": alarm_id,
            "llm_model": llm_model,
            "analysis_type": analysis_type,
            "summary": summary,
            "is_false_alarm": is_false_alarm,
            "risk_score": risk_score,
            "confidence_score": confidence_score,
            "suggestion": suggestion,
        }
        record = AlarmLlmAnalysis(**fields)
        db.add(record)
        db.commit()
        db.refresh(record)
        logger.info(f"LLM分析结果已保存: alarm={alarm_id}, model={llm_model}")
        return record
    except Exception as e:
        db.rollback()
        logger.error(f"保存LLM分析失败: {e}")
        return None
    finally:
        db.close()
# Module-level singleton, created when this module is imported.
alarm_event_service = AlarmEventService()


def get_alarm_event_service() -> AlarmEventService:
    """Return the module-level ``AlarmEventService`` singleton."""
    return alarm_event_service