feat: 增强告警服务

- 新增 create_alert_from_mqtt 处理 MQTT 告警
- 新增 _determine_level 自动判断告警级别
- 新增 dispatch_alert 派发告警到工单
- 支持告警级别统计

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-05 13:57:05 +08:00
parent 6861fb6653
commit a5ecec7610

View File

@@ -1,18 +1,25 @@
"""
告警服务
处理告警 CRUD 和 MQTT 消息
"""
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional, List from typing import Optional, List, Dict, Any
from app.models import Alert, AlertStatus, get_session from app.models import Alert, AlertStatus, AlertLevel, get_session
from app.schemas import AlertCreate, AlertHandleRequest from app.schemas import AlertCreate, AlertHandleRequest
from app.services.oss_storage import get_oss_storage from app.services.oss_storage import get_oss_storage
from app.utils.logger import logger from app.utils.logger import logger
class AlertService: class AlertService:
"""告警服务"""
def __init__(self): def __init__(self):
self.oss = get_oss_storage() self.oss = get_oss_storage()
def generate_alert_no(self) -> str: def generate_alert_no(self) -> str:
"""生成告警编号"""
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:8].upper() unique_id = uuid.uuid4().hex[:8].upper()
return f"ALT{timestamp}{unique_id}" return f"ALT{timestamp}{unique_id}"
@@ -22,19 +29,24 @@ class AlertService:
alert_data: AlertCreate, alert_data: AlertCreate,
snapshot_data: Optional[bytes] = None, snapshot_data: Optional[bytes] = None,
) -> Alert: ) -> Alert:
"""创建告警HTTP 方式)"""
db = get_session() db = get_session()
try: try:
alert = Alert( alert = Alert(
alert_no=self.generate_alert_no(), alert_no=self.generate_alert_no(),
camera_id=alert_data.camera_id, camera_id=alert_data.camera_id,
roi_id=alert_data.roi_id, roi_id=alert_data.roi_id,
bind_id=getattr(alert_data, 'bind_id', None),
device_id=getattr(alert_data, 'device_id', None),
alert_type=alert_data.alert_type, alert_type=alert_data.alert_type,
algorithm=alert_data.algorithm, algorithm=alert_data.algorithm,
confidence=alert_data.confidence, confidence=alert_data.confidence,
duration_minutes=alert_data.duration_minutes, duration_minutes=alert_data.duration_minutes,
trigger_time=alert_data.trigger_time, trigger_time=alert_data.trigger_time,
message=alert_data.message, message=alert_data.message,
bbox=getattr(alert_data, 'bbox', None),
status=AlertStatus.PENDING, status=AlertStatus.PENDING,
level=AlertLevel.MEDIUM,
) )
if snapshot_data: if snapshot_data:
@@ -51,33 +63,129 @@ class AlertService:
finally: finally:
db.close() db.close()
def create_alert_from_mqtt(self, mqtt_data: Dict[str, Any]) -> Optional[Alert]:
"""从 MQTT 消息创建告警"""
db = get_session()
try:
# 解析时间
trigger_time_str = mqtt_data.get("timestamp")
if trigger_time_str:
try:
trigger_time = datetime.fromisoformat(trigger_time_str.replace("Z", "+00:00"))
except ValueError:
trigger_time = datetime.now(timezone.utc)
else:
trigger_time = datetime.now(timezone.utc)
# 解析置信度MQTT 可能是 0-1需要转为 0-100
confidence = mqtt_data.get("confidence")
if confidence is not None:
if isinstance(confidence, float) and confidence <= 1:
confidence = int(confidence * 100)
else:
confidence = int(confidence)
# 解析持续时长
duration_minutes = mqtt_data.get("duration_minutes")
if duration_minutes is not None:
duration_minutes = int(float(duration_minutes))
alert = Alert(
alert_no=self.generate_alert_no(),
camera_id=mqtt_data.get("camera_id", "unknown"),
roi_id=mqtt_data.get("roi_id"),
bind_id=mqtt_data.get("bind_id"),
device_id=mqtt_data.get("device_id"),
alert_type=mqtt_data.get("alert_type", "unknown"),
algorithm=mqtt_data.get("algorithm", "YOLO"),
confidence=confidence,
duration_minutes=duration_minutes,
trigger_time=trigger_time,
message=mqtt_data.get("message"),
bbox=mqtt_data.get("bbox"),
status=AlertStatus.PENDING,
level=self._determine_level(mqtt_data),
)
db.add(alert)
db.commit()
db.refresh(alert)
logger.info(f"MQTT告警创建成功: {alert.alert_no}, type={alert.alert_type}")
return alert
except Exception as e:
db.rollback()
logger.error(f"创建MQTT告警失败: {e}")
return None
finally:
db.close()
def _determine_level(self, data: Dict[str, Any]) -> AlertLevel:
"""根据告警数据确定告警级别"""
alert_type = data.get("alert_type", "")
confidence = data.get("confidence", 0)
# 根据告警类型和置信度确定级别
if alert_type == "intrusion":
return AlertLevel.HIGH
elif alert_type == "leave_post":
duration = data.get("duration_minutes", 0)
if duration and duration > 30:
return AlertLevel.HIGH
elif duration and duration > 10:
return AlertLevel.MEDIUM
return AlertLevel.LOW
elif confidence and confidence > 0.9:
return AlertLevel.HIGH
elif confidence and confidence > 0.7:
return AlertLevel.MEDIUM
return AlertLevel.MEDIUM
def get_alert(self, alert_id: int) -> Optional[Alert]: def get_alert(self, alert_id: int) -> Optional[Alert]:
"""获取告警详情"""
db = get_session() db = get_session()
try: try:
return db.query(Alert).filter(Alert.id == alert_id).first() return db.query(Alert).filter(Alert.id == alert_id).first()
finally: finally:
db.close() db.close()
def get_alert_by_no(self, alert_no: str) -> Optional[Alert]:
"""根据告警编号获取"""
db = get_session()
try:
return db.query(Alert).filter(Alert.alert_no == alert_no).first()
finally:
db.close()
def get_alerts( def get_alerts(
self, self,
camera_id: Optional[str] = None, camera_id: Optional[str] = None,
device_id: Optional[str] = None,
alert_type: Optional[str] = None, alert_type: Optional[str] = None,
status: Optional[str] = None, status: Optional[str] = None,
level: Optional[str] = None,
start_time: Optional[datetime] = None, start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None, end_time: Optional[datetime] = None,
page: int = 1, page: int = 1,
page_size: int = 20, page_size: int = 20,
) -> tuple[List[Alert], int]: ) -> tuple[List[Alert], int]:
"""获取告警列表"""
db = get_session() db = get_session()
try: try:
query = db.query(Alert) query = db.query(Alert)
if camera_id: if camera_id:
query = query.filter(Alert.camera_id == camera_id) query = query.filter(Alert.camera_id == camera_id)
if device_id:
query = query.filter(Alert.device_id == device_id)
if alert_type: if alert_type:
query = query.filter(Alert.alert_type == alert_type) query = query.filter(Alert.alert_type == alert_type)
if status: if status:
query = query.filter(Alert.status == status) query = query.filter(Alert.status == status)
if level:
query = query.filter(Alert.level == level)
if start_time: if start_time:
query = query.filter(Alert.trigger_time >= start_time) query = query.filter(Alert.trigger_time >= start_time)
if end_time: if end_time:
@@ -101,6 +209,7 @@ class AlertService:
handle_data: AlertHandleRequest, handle_data: AlertHandleRequest,
handled_by: Optional[str] = None, handled_by: Optional[str] = None,
) -> Optional[Alert]: ) -> Optional[Alert]:
"""处理告警"""
db = get_session() db = get_session()
try: try:
alert = db.query(Alert).filter(Alert.id == alert_id).first() alert = db.query(Alert).filter(Alert.id == alert_id).first()
@@ -122,7 +231,50 @@ class AlertService:
finally: finally:
db.close() db.close()
def dispatch_alert(self, alert_id: int, work_order_id: int) -> Optional[Alert]:
"""派发告警(关联工单)"""
db = get_session()
try:
alert = db.query(Alert).filter(Alert.id == alert_id).first()
if not alert:
return None
alert.status = AlertStatus.DISPATCHED
alert.work_order_id = work_order_id
alert.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(alert)
logger.info(f"告警已派单: {alert.alert_no} -> 工单 {work_order_id}")
return alert
finally:
db.close()
def delete_alert(self, alert_id: int) -> bool:
"""删除告警"""
db = get_session()
try:
alert = db.query(Alert).filter(Alert.id == alert_id).first()
if not alert:
return False
db.delete(alert)
db.commit()
logger.info(f"告警已删除: {alert.alert_no}")
return True
except Exception as e:
db.rollback()
logger.error(f"删除告警失败: {e}")
return False
finally:
db.close()
def get_statistics(self) -> dict: def get_statistics(self) -> dict:
"""获取告警统计"""
db = get_session() db = get_session()
try: try:
total = db.query(Alert).count() total = db.query(Alert).count()
@@ -130,24 +282,32 @@ class AlertService:
confirmed = db.query(Alert).filter(Alert.status == AlertStatus.CONFIRMED).count() confirmed = db.query(Alert).filter(Alert.status == AlertStatus.CONFIRMED).count()
ignored = db.query(Alert).filter(Alert.status == AlertStatus.IGNORED).count() ignored = db.query(Alert).filter(Alert.status == AlertStatus.IGNORED).count()
resolved = db.query(Alert).filter(Alert.status == AlertStatus.RESOLVED).count() resolved = db.query(Alert).filter(Alert.status == AlertStatus.RESOLVED).count()
dispatched = db.query(Alert).filter(Alert.status == AlertStatus.DISPATCHED).count()
by_type = {} by_type = {}
for alert in db.query(Alert.alert_type).distinct(): for alert in db.query(Alert.alert_type).distinct():
alert_type = alert[0] alert_type = alert[0]
by_type[alert_type] = db.query(Alert).filter(Alert.alert_type == alert_type).count() by_type[alert_type] = db.query(Alert).filter(Alert.alert_type == alert_type).count()
by_level = {}
for level in AlertLevel:
by_level[level.value] = db.query(Alert).filter(Alert.level == level).count()
return { return {
"total": total, "total": total,
"pending": pending, "pending": pending,
"confirmed": confirmed, "confirmed": confirmed,
"ignored": ignored, "ignored": ignored,
"resolved": resolved, "resolved": resolved,
"dispatched": dispatched,
"by_type": by_type, "by_type": by_type,
"by_level": by_level,
} }
finally: finally:
db.close() db.close()
def update_ai_analysis(self, alert_id: int, analysis: dict) -> Optional[Alert]: def update_ai_analysis(self, alert_id: int, analysis: dict) -> Optional[Alert]:
"""更新 AI 分析结果"""
db = get_session() db = get_session()
try: try:
alert = db.query(Alert).filter(Alert.id == alert_id).first() alert = db.query(Alert).filter(Alert.id == alert_id).first()
@@ -160,8 +320,10 @@ class AlertService:
db.close() db.close()
# 全局单例
alert_service = AlertService() alert_service = AlertService()
def get_alert_service() -> AlertService: def get_alert_service() -> AlertService:
"""获取告警服务单例"""
return alert_service return alert_service