Files
16337 789dc6a373 fix: 修复告警时间戳格式 - 移除微秒保持一致性
问题描述:
- 告警结束时间显示过多小数位(如 2026-02-12T14:23:42.331566)
- 与触发时间格式不一致(2026-02-12 14:23:24)

修改内容:
1. app/models.py
   - AlarmEvent.to_dict() 使用 strftime 格式化所有时间戳
   - 统一格式为 'YYYY-MM-DD HH:MM:SS'(去除微秒和T分隔符)

2. app/services/alarm_event_service.py
   - resolve_alarm() 解析 last_frame_time 时去除微秒
   - 确保数据库存储的时间戳格式一致

影响范围:
- 告警事件API响应格式
- 前端显示更加简洁统一

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-12 14:57:03 +08:00

413 lines
16 KiB
Python

"""
数据模型定义
"""
import enum
import os
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import (
Column, String, Integer, SmallInteger, BigInteger, Boolean, Float, DateTime, Text, Enum, JSON,
ForeignKey, create_engine, Index
)
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.pool import StaticPool
from app.config import settings
# Declarative base class; every ORM model defined in this module subclasses it.
Base = declarative_base()
# ==================== 枚举定义 ====================
class AlertStatus(str, enum.Enum):
    """Handling status of an alert (string-valued enum)."""

    # Waiting to be handled
    PENDING = "pending"
    # Confirmed
    CONFIRMED = "confirmed"
    # Ignored
    IGNORED = "ignored"
    # Resolved
    RESOLVED = "resolved"
    # Dispatched as a work order
    DISPATCHED = "dispatched"
class AlertLevel(str, enum.Enum):
    """Severity level of an alert (string-valued enum)."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class WorkOrderStatus(str, enum.Enum):
    """Lifecycle status of a work order (string-valued enum)."""

    # Created
    CREATED = "created"
    # Assigned to a handler
    ASSIGNED = "assigned"
    # Being processed
    PROCESSING = "processing"
    # Completed
    COMPLETED = "completed"
    # Closed
    CLOSED = "closed"
class WorkOrderPriority(str, enum.Enum):
    """Priority of a work order (string-valued enum)."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
class DeviceStatus(str, enum.Enum):
    """Status of an edge device (string-valued enum)."""

    ONLINE = "online"
    OFFLINE = "offline"
    ERROR = "error"
# ==================== 数据模型 ====================
class Alert(Base):
    """ORM model for the ``alerts`` table."""

    __tablename__ = "alerts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alert_no = Column(String(32), unique=True, nullable=False, index=True)

    # Source information
    camera_id = Column(String(64), nullable=False, index=True)
    roi_id = Column(String(64), nullable=True)
    bind_id = Column(String(64), nullable=True)
    device_id = Column(String(64), nullable=True, index=True)

    # Alert content
    alert_type = Column(String(64), nullable=False)
    algorithm = Column(String(128), nullable=True)
    confidence = Column(Integer, nullable=True)
    duration_minutes = Column(Integer, nullable=True)
    trigger_time = Column(DateTime, nullable=False)
    message = Column(Text, nullable=True)
    bbox = Column(JSON, nullable=True)

    # Snapshot
    snapshot_url = Column(String(512), nullable=True)
    snapshot_path = Column(String(512), nullable=True)

    # Handling status
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING, index=True)
    level = Column(Enum(AlertLevel), default=AlertLevel.MEDIUM)
    handle_remark = Column(Text, nullable=True)
    handled_by = Column(String(64), nullable=True)
    handled_at = Column(DateTime, nullable=True)

    # AI analysis
    ai_analysis = Column(JSON, nullable=True)

    # Associated work order
    work_order_id = Column(Integer, ForeignKey("work_orders.id"), nullable=True)

    # Timestamps (defaults are evaluated per row via the lambdas)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # Relationship
    work_order = relationship(
        "WorkOrder",
        back_populates="alerts",
        foreign_keys=[work_order_id],
    )

    __table_args__ = (
        Index('idx_alert_trigger_time', 'trigger_time'),
        Index('idx_alert_camera_status', 'camera_id', 'status'),
    )

    def to_dict(self) -> dict:
        """Return the alert as a plain dict.

        Datetime columns are rendered with ``isoformat()`` and enum columns
        by their ``.value``; a falsy value in either kind of column becomes
        ``None``.  ``snapshot_path`` is not part of the result.
        """

        def iso(moment):
            return moment.isoformat() if moment else None

        def raw(member):
            return member.value if member else None

        return {
            "id": self.id,
            "alert_no": self.alert_no,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "bind_id": self.bind_id,
            "device_id": self.device_id,
            "alert_type": self.alert_type,
            "algorithm": self.algorithm,
            "confidence": self.confidence,
            "duration_minutes": self.duration_minutes,
            "trigger_time": iso(self.trigger_time),
            "message": self.message,
            "bbox": self.bbox,
            "snapshot_url": self.snapshot_url,
            "status": raw(self.status),
            "level": raw(self.level),
            "handle_remark": self.handle_remark,
            "handled_by": self.handled_by,
            "handled_at": iso(self.handled_at),
            "ai_analysis": self.ai_analysis,
            "work_order_id": self.work_order_id,
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
class WorkOrder(Base):
    """ORM model for the ``work_orders`` table."""

    __tablename__ = "work_orders"

    id = Column(Integer, primary_key=True, autoincrement=True)
    order_no = Column(String(32), unique=True, nullable=False, index=True)

    # Associated alert (plain columns; no foreign-key constraint is declared here)
    alert_id = Column(Integer, nullable=True)
    alert_no = Column(String(32), nullable=True)

    # Work order content
    title = Column(String(255), nullable=False)
    description = Column(Text, nullable=True)
    priority = Column(Enum(WorkOrderPriority), default=WorkOrderPriority.MEDIUM)

    # Assignment information
    assignee_id = Column(String(64), nullable=True, index=True)
    assignee_name = Column(String(64), nullable=True)
    department = Column(String(64), nullable=True)

    # Status
    status = Column(Enum(WorkOrderStatus), default=WorkOrderStatus.CREATED, index=True)

    # Handling result
    result = Column(Text, nullable=True)
    attachments = Column(JSON, nullable=True)

    # Times
    deadline = Column(DateTime, nullable=True)
    assigned_at = Column(DateTime, nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # Relationship: alerts whose ``work_order_id`` points at this work order
    alerts = relationship(
        "Alert",
        back_populates="work_order",
        foreign_keys=[Alert.work_order_id],
    )

    def to_dict(self) -> dict:
        """Return the work order as a plain dict.

        Datetime columns are rendered with ``isoformat()`` and enum columns
        by their ``.value``; a falsy value in either kind of column becomes
        ``None``.
        """

        def iso(moment):
            return moment.isoformat() if moment else None

        def raw(member):
            return member.value if member else None

        return {
            "id": self.id,
            "order_no": self.order_no,
            "alert_id": self.alert_id,
            "alert_no": self.alert_no,
            "title": self.title,
            "description": self.description,
            "priority": raw(self.priority),
            "assignee_id": self.assignee_id,
            "assignee_name": self.assignee_name,
            "department": self.department,
            "status": raw(self.status),
            "result": self.result,
            "attachments": self.attachments,
            "deadline": iso(self.deadline),
            "assigned_at": iso(self.assigned_at),
            "started_at": iso(self.started_at),
            "completed_at": iso(self.completed_at),
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
class EdgeDevice(Base):
    """ORM model for the ``edge_devices`` table."""

    __tablename__ = "edge_devices"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(64), unique=True, nullable=False, index=True)
    device_name = Column(String(128), nullable=True)

    # Status
    status = Column(Enum(DeviceStatus), default=DeviceStatus.OFFLINE, index=True)
    last_heartbeat = Column(DateTime, nullable=True)

    # Runtime information
    uptime_seconds = Column(BigInteger, nullable=True)
    frames_processed = Column(BigInteger, nullable=True)
    alerts_generated = Column(BigInteger, nullable=True)

    # Configuration
    ip_address = Column(String(45), nullable=True)
    stream_count = Column(Integer, nullable=True)
    config_version = Column(String(32), nullable=True)

    # Extension data
    extra_info = Column(JSON, nullable=True)

    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    def to_dict(self) -> dict:
        """Return the device as a plain dict.

        Datetime columns are rendered with ``isoformat()`` and ``status`` by
        its ``.value``; a falsy value in any of those columns becomes ``None``.
        """

        def iso(moment):
            return moment.isoformat() if moment else None

        status_value = self.status.value if self.status else None

        return {
            "id": self.id,
            "device_id": self.device_id,
            "device_name": self.device_name,
            "status": status_value,
            "last_heartbeat": iso(self.last_heartbeat),
            "uptime_seconds": self.uptime_seconds,
            "frames_processed": self.frames_processed,
            "alerts_generated": self.alerts_generated,
            "ip_address": self.ip_address,
            "stream_count": self.stream_count,
            "config_version": self.config_version,
            "extra_info": self.extra_info,
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
# ==================== 新告警三表结构 ====================
class AlarmEvent(Base):
    """ORM model for the ``alarm_event`` table (main alarm event table)."""

    __tablename__ = "alarm_event"

    alarm_id = Column(String(64), primary_key=True, comment="分布式告警ID")
    alarm_type = Column(String(32), nullable=False, comment="告警类型")
    algorithm_code = Column(String(64), comment="算法编码")
    device_id = Column(String(64), nullable=False, comment="摄像头/设备ID")
    scene_id = Column(String(64), comment="场景/ROI ID")
    event_time = Column(DateTime, nullable=False, comment="事件发生时间")
    first_frame_time = Column(DateTime, comment="首帧时间")
    last_frame_time = Column(DateTime, comment="末帧时间")
    duration_ms = Column(Integer, comment="持续时长(毫秒)")
    alarm_level = Column(SmallInteger, comment="告警级别: 1提醒 2一般 3严重 4紧急")
    confidence_score = Column(Float, comment="置信度 0-1")
    alarm_status = Column(String(20), default="NEW", comment="告警状态: NEW/CONFIRMED/FALSE/CLOSED")
    handle_status = Column(String(20), default="UNHANDLED", comment="处理状态: UNHANDLED/HANDLING/DONE")
    snapshot_url = Column(String(512), comment="截图URL")
    video_url = Column(String(512), comment="视频URL")
    edge_node_id = Column(String(64), comment="边缘节点ID")
    handler = Column(String(64), comment="处理人")
    handle_remark = Column(Text, comment="处理备注")
    handled_at = Column(DateTime, comment="处理时间")
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    __table_args__ = (
        Index('idx_alarm_event_time', 'event_time'),
        Index('idx_alarm_device_type', 'device_id', 'alarm_type'),
    )

    def to_dict(self) -> dict:
        """Return the alarm event as a plain dict.

        Every datetime column is rendered with the same second-resolution
        pattern ``'%Y-%m-%d %H:%M:%S'`` (no microseconds, space instead of
        ``T``); a falsy datetime value becomes ``None``.
        """
        pattern = '%Y-%m-%d %H:%M:%S'

        def stamp(moment):
            return moment.strftime(pattern) if moment else None

        return {
            "alarm_id": self.alarm_id,
            "alarm_type": self.alarm_type,
            "algorithm_code": self.algorithm_code,
            "device_id": self.device_id,
            "scene_id": self.scene_id,
            "event_time": stamp(self.event_time),
            "first_frame_time": stamp(self.first_frame_time),
            "last_frame_time": stamp(self.last_frame_time),
            "duration_ms": self.duration_ms,
            "alarm_level": self.alarm_level,
            "confidence_score": self.confidence_score,
            "alarm_status": self.alarm_status,
            "handle_status": self.handle_status,
            "snapshot_url": self.snapshot_url,
            "video_url": self.video_url,
            "edge_node_id": self.edge_node_id,
            "handler": self.handler,
            "handle_remark": self.handle_remark,
            "handled_at": stamp(self.handled_at),
            "created_at": stamp(self.created_at),
            "updated_at": stamp(self.updated_at),
        }
class AlarmEventExt(Base):
    """ORM model for the ``alarm_event_ext`` table (algorithm result details)."""

    __tablename__ = "alarm_event_ext"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Indexed reference to an alarm ID; no foreign-key constraint is declared here
    alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID")
    ext_type = Column(String(32), comment="扩展类型: EDGE/POST/MANUAL")
    ext_data = Column(JSON, comment="扩展数据")
    roi_config = Column(JSON, comment="ROI配置快照")
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> dict:
        """Return the extension row as a plain dict.

        ``created_at`` is rendered with ``isoformat()``, or ``None`` when falsy.
        """
        if self.created_at:
            created = self.created_at.isoformat()
        else:
            created = None

        return {
            "id": self.id,
            "alarm_id": self.alarm_id,
            "ext_type": self.ext_type,
            "ext_data": self.ext_data,
            "roi_config": self.roi_config,
            "created_at": created,
        }
class AlarmLlmAnalysis(Base):
    """ORM model for the ``alarm_llm_analysis`` table (LLM analysis of alarms)."""

    __tablename__ = "alarm_llm_analysis"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Indexed reference to an alarm ID; no foreign-key constraint is declared here
    alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID")
    llm_model = Column(String(32), comment="模型名称")
    analysis_type = Column(String(20), comment="分析类型: REVIEW/EXPLAIN/RISK")
    summary = Column(Text, comment="分析摘要")
    is_false_alarm = Column(Boolean, comment="是否误报")
    risk_score = Column(Integer, comment="风险评分 0-100")
    confidence_score = Column(Float, comment="分析置信度")
    suggestion = Column(Text, comment="处置建议")
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> dict:
        """Return the analysis row as a plain dict.

        ``created_at`` is rendered with ``isoformat()``, or ``None`` when falsy.
        """
        if self.created_at:
            created = self.created_at.isoformat()
        else:
            created = None

        return {
            "id": self.id,
            "alarm_id": self.alarm_id,
            "llm_model": self.llm_model,
            "analysis_type": self.analysis_type,
            "summary": self.summary,
            "is_false_alarm": self.is_false_alarm,
            "risk_score": self.risk_score,
            "confidence_score": self.confidence_score,
            "suggestion": self.suggestion,
            "created_at": created,
        }
# ==================== 数据库管理 ====================
# Lazily created module-level singletons; reset by close_db().
_engine = None
_SessionLocal = None


def get_engine():
    """Return the module-level SQLAlchemy engine, creating it on first use.

    The database URL is read from ``settings.database.url`` and ``echo``
    follows ``settings.app.debug``.

    For a SQLite backend the parent directory of a file-based database is
    created when it is missing, and the engine is built with ``StaticPool``
    and ``check_same_thread=False``.  Any other backend is created with
    ``create_engine`` defaults.

    Returns:
        The shared engine instance.
    """
    global _engine
    if _engine is not None:
        return _engine

    # Function-scope import keeps this block independent of the file's
    # top-level import list.
    from sqlalchemy.engine.url import make_url

    db_url = settings.database.url
    parsed_url = make_url(db_url)

    # Decide on the parsed backend name instead of a substring match, so a
    # non-SQLite URL that merely contains "sqlite" (for example in a database
    # name) is not handed SQLite-only connect arguments.
    if parsed_url.get_backend_name() == "sqlite":
        # ``database`` is None for a bare ``sqlite://`` URL and ":memory:" for
        # an explicit in-memory database; neither has a directory to create.
        # Taking the path from the parsed URL also handles driver-qualified
        # URLs (``sqlite+driver:///...``), where stripping a literal
        # "sqlite:///" prefix would leave the whole URL behind and end up
        # creating a junk directory.
        db_path = parsed_url.database
        if db_path and db_path != ":memory:":
            parent_dir = os.path.dirname(db_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

        _engine = create_engine(
            db_url,
            echo=settings.app.debug,
            poolclass=StaticPool,
            connect_args={"check_same_thread": False},
        )
    else:
        _engine = create_engine(db_url, echo=settings.app.debug)
    return _engine
def get_session():
    """Return a new session from the lazily created module-level factory.

    The factory is built on first call, bound to ``get_engine()``, with
    ``autocommit`` and ``autoflush`` both disabled.
    """
    global _SessionLocal
    factory = _SessionLocal
    if factory is None:
        factory = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False)
        _SessionLocal = factory
    return factory()
def init_db():
    """Run ``Base.metadata.create_all`` against the shared engine."""
    Base.metadata.create_all(bind=get_engine())
def close_db():
    """Dispose of the shared engine and clear the cached engine and session factory.

    Does nothing when no engine has been created.
    """
    global _engine, _SessionLocal
    if not _engine:
        return
    _engine.dispose()
    _engine = None
    _SessionLocal = None