Files
iot-device-management-service/app/models.py
16337 d623f6b340 修复:告警状态流转统一 — 前端处理直接结单(CLOSED),支持处理图片上传
1. handle接口状态映射:handled→CLOSED(而非CONFIRMED),ignored→FALSE
2. handle_status同步设置:handled→DONE,ignored→IGNORED
3. 新增handle_image_url字段支持处理图片存储
4. 自动迁移:启动时自动ALTER TABLE添加新列
2026-03-25 15:01:35 +08:00

609 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据模型定义
"""
import enum
import os
from datetime import datetime, timezone
from app.utils.timezone import beijing_now
from typing import Optional
from sqlalchemy import (
Column, String, Integer, SmallInteger, BigInteger, Boolean, Float, DateTime, Text, Enum, JSON,
ForeignKey, create_engine, Index
)
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.pool import StaticPool
from app.config import settings
Base = declarative_base()
# ==================== 枚举定义 ====================
class AlertStatus(str, enum.Enum):
    """Processing status of an alert.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string value.
    """

    # Waiting to be handled
    PENDING = "pending"
    # Acknowledged by an operator
    CONFIRMED = "confirmed"
    # Deliberately ignored
    IGNORED = "ignored"
    # Resolved
    RESOLVED = "resolved"
    # A work order has been dispatched
    DISPATCHED = "dispatched"
class AlertLevel(str, enum.Enum):
    """Severity level of an alert, declared from lowest to highest."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class WorkOrderStatus(str, enum.Enum):
    """Lifecycle status of a work order."""

    # Created, not yet assigned
    CREATED = "created"
    # Assigned to a handler
    ASSIGNED = "assigned"
    # Being worked on
    PROCESSING = "processing"
    # Work finished
    COMPLETED = "completed"
    # Closed
    CLOSED = "closed"
class WorkOrderPriority(str, enum.Enum):
    """Priority of a work order, declared from lowest to highest."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
class DeviceStatus(str, enum.Enum):
    """Connectivity / health status of a device."""

    ONLINE = "online"
    OFFLINE = "offline"
    ERROR = "error"
# ==================== 数据模型 ====================
class Alert(Base):
    """Alert table (``alerts``)."""

    __tablename__ = "alerts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alert_no = Column(String(32), unique=True, nullable=False, index=True)

    # --- Source information ---
    camera_id = Column(String(64), nullable=False, index=True)
    roi_id = Column(String(64), nullable=True)
    bind_id = Column(String(64), nullable=True)
    device_id = Column(String(64), nullable=True, index=True)

    # --- Alert content ---
    alert_type = Column(String(64), nullable=False)
    algorithm = Column(String(128), nullable=True)
    confidence = Column(Integer, nullable=True)
    duration_minutes = Column(Integer, nullable=True)
    trigger_time = Column(DateTime, nullable=False)
    message = Column(Text, nullable=True)
    bbox = Column(JSON, nullable=True)

    # --- Snapshot ---
    snapshot_url = Column(String(512), nullable=True)
    snapshot_path = Column(String(512), nullable=True)

    # --- Handling state ---
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING, index=True)
    level = Column(Enum(AlertLevel), default=AlertLevel.MEDIUM)
    handle_remark = Column(Text, nullable=True)
    handled_by = Column(String(64), nullable=True)
    handled_at = Column(DateTime, nullable=True)

    # --- AI analysis ---
    ai_analysis = Column(JSON, nullable=True)

    # --- Associated work order ---
    work_order_id = Column(Integer, ForeignKey("work_orders.id"), nullable=True)

    # --- Timestamps (filled in by beijing_now on insert / update) ---
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    # --- Relationships ---
    work_order = relationship(
        "WorkOrder",
        back_populates="alerts",
        foreign_keys=[work_order_id],
    )

    __table_args__ = (
        Index('idx_alert_trigger_time', 'trigger_time'),
        Index('idx_alert_camera_status', 'camera_id', 'status'),
    )

    def to_dict(self) -> dict:
        """Return the alert as a plain dict.

        Datetime fields are rendered with ``isoformat()`` and enum fields as
        their ``.value``; unset values become ``None``. ``snapshot_path`` is
        not included in the result.
        """

        def _iso(moment):
            return moment.isoformat() if moment else None

        def _enum_value(member):
            return member.value if member else None

        payload = {
            "id": self.id,
            "alert_no": self.alert_no,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "bind_id": self.bind_id,
            "device_id": self.device_id,
            "alert_type": self.alert_type,
            "algorithm": self.algorithm,
            "confidence": self.confidence,
            "duration_minutes": self.duration_minutes,
            "trigger_time": _iso(self.trigger_time),
            "message": self.message,
            "bbox": self.bbox,
            "snapshot_url": self.snapshot_url,
            "status": _enum_value(self.status),
            "level": _enum_value(self.level),
            "handle_remark": self.handle_remark,
            "handled_by": self.handled_by,
            "handled_at": _iso(self.handled_at),
            "ai_analysis": self.ai_analysis,
            "work_order_id": self.work_order_id,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }
        return payload
class WorkOrder(Base):
    """Work order table (``work_orders``)."""

    __tablename__ = "work_orders"

    id = Column(Integer, primary_key=True, autoincrement=True)
    order_no = Column(String(32), unique=True, nullable=False, index=True)

    # --- Associated alert (plain columns, no foreign key declared) ---
    alert_id = Column(Integer, nullable=True)
    alert_no = Column(String(32), nullable=True)

    # --- Work order content ---
    title = Column(String(255), nullable=False)
    description = Column(Text, nullable=True)
    priority = Column(Enum(WorkOrderPriority), default=WorkOrderPriority.MEDIUM)

    # --- Assignment ---
    assignee_id = Column(String(64), nullable=True, index=True)
    assignee_name = Column(String(64), nullable=True)
    department = Column(String(64), nullable=True)

    # --- Status ---
    status = Column(Enum(WorkOrderStatus), default=WorkOrderStatus.CREATED, index=True)

    # --- Handling result ---
    result = Column(Text, nullable=True)
    attachments = Column(JSON, nullable=True)

    # --- Times ---
    deadline = Column(DateTime, nullable=True)
    assigned_at = Column(DateTime, nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    # --- Relationships: alerts whose work_order_id points at this row ---
    alerts = relationship(
        "Alert",
        back_populates="work_order",
        foreign_keys=[Alert.work_order_id],
    )

    def to_dict(self) -> dict:
        """Return the work order as a plain dict.

        Datetime fields are rendered with ``isoformat()`` and enum fields as
        their ``.value``; unset values become ``None``.
        """

        def _iso(moment):
            return moment.isoformat() if moment else None

        def _enum_value(member):
            return member.value if member else None

        payload = {
            "id": self.id,
            "order_no": self.order_no,
            "alert_id": self.alert_id,
            "alert_no": self.alert_no,
            "title": self.title,
            "description": self.description,
            "priority": _enum_value(self.priority),
            "assignee_id": self.assignee_id,
            "assignee_name": self.assignee_name,
            "department": self.department,
            "status": _enum_value(self.status),
            "result": self.result,
            "attachments": self.attachments,
            "deadline": _iso(self.deadline),
            "assigned_at": _iso(self.assigned_at),
            "started_at": _iso(self.started_at),
            "completed_at": _iso(self.completed_at),
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }
        return payload
class EdgeDevice(Base):
    """Edge device table (``edge_devices``)."""

    __tablename__ = "edge_devices"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(64), unique=True, nullable=False, index=True)
    device_name = Column(String(128), nullable=True)

    # --- Status ---
    status = Column(Enum(DeviceStatus), default=DeviceStatus.OFFLINE, index=True)
    last_heartbeat = Column(DateTime, nullable=True)

    # --- Runtime counters ---
    uptime_seconds = Column(BigInteger, nullable=True)
    frames_processed = Column(BigInteger, nullable=True)
    alerts_generated = Column(BigInteger, nullable=True)

    # --- Configuration ---
    ip_address = Column(String(45), nullable=True)
    stream_count = Column(Integer, nullable=True)
    config_version = Column(String(32), nullable=True)

    # --- Extension data ---
    extra_info = Column(JSON, nullable=True)

    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    def to_dict(self) -> dict:
        """Return the device as a plain dict.

        Datetime fields are rendered with ``isoformat()`` and ``status`` as
        its enum ``.value``; unset values become ``None``.
        """

        def _iso(moment):
            return moment.isoformat() if moment else None

        current_status = self.status.value if self.status else None
        payload = {
            "id": self.id,
            "device_id": self.device_id,
            "device_name": self.device_name,
            "status": current_status,
            "last_heartbeat": _iso(self.last_heartbeat),
            "uptime_seconds": self.uptime_seconds,
            "frames_processed": self.frames_processed,
            "alerts_generated": self.alerts_generated,
            "ip_address": self.ip_address,
            "stream_count": self.stream_count,
            "config_version": self.config_version,
            "extra_info": self.extra_info,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }
        return payload
# ==================== 新告警三表结构 ====================
class AlarmEvent(Base):
    """Main alarm event table (``alarm_event``)."""

    __tablename__ = "alarm_event"

    alarm_id = Column(String(64), primary_key=True, comment="分布式告警ID")
    alarm_type = Column(String(32), nullable=False, comment="告警类型")
    algorithm_code = Column(String(64), comment="算法编码")
    device_id = Column(String(64), nullable=False, comment="摄像头/设备ID")
    scene_id = Column(String(64), comment="场景/ROI ID")

    # --- Event timing ---
    event_time = Column(DateTime, nullable=False, comment="事件发生时间")
    first_frame_time = Column(DateTime, comment="首帧时间")
    last_frame_time = Column(DateTime, comment="末帧时间")
    duration_ms = Column(Integer, comment="持续时长(毫秒)")

    # --- Classification ---
    alarm_level = Column(SmallInteger, comment="告警级别: 0紧急 1重要 2普通 3轻微")
    confidence_score = Column(Float, comment="置信度 0-1")

    # --- State ---
    alarm_status = Column(
        String(20),
        default="NEW",
        comment="告警状态: NEW/CONFIRMED/FALSE/CLOSED",
    )
    # NOTE(review): the column comment lists UNHANDLED/HANDLING/DONE only;
    # confirm whether callers also write other values (e.g. IGNORED).
    handle_status = Column(
        String(20),
        default="UNHANDLED",
        comment="处理状态: UNHANDLED/HANDLING/DONE",
    )

    # --- Media and location ---
    snapshot_url = Column(String(512), comment="截图URL")
    video_url = Column(String(512), comment="视频URL")
    edge_node_id = Column(String(64), comment="边缘节点ID")
    area_id = Column(BigInteger, comment="所属区域ID")

    # --- Handling details ---
    handler = Column(String(64), comment="处理人")
    handle_remark = Column(Text, comment="处理备注")
    handle_image_url = Column(String(512), comment="处理图片URL")
    handled_at = Column(DateTime, comment="处理时间")

    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    __table_args__ = (
        Index('idx_alarm_event_time', 'event_time'),
        Index('idx_alarm_device_type', 'device_id', 'alarm_type'),
    )

    def to_dict(self) -> dict:
        """Return the alarm event as a plain dict.

        Datetime fields are rendered as ``YYYY-MM-DD HH:MM:SS`` strings;
        unset datetimes become ``None``.
        """
        layout = '%Y-%m-%d %H:%M:%S'

        def _stamp(moment):
            return moment.strftime(layout) if moment else None

        payload = {
            "alarm_id": self.alarm_id,
            "alarm_type": self.alarm_type,
            "algorithm_code": self.algorithm_code,
            "device_id": self.device_id,
            "scene_id": self.scene_id,
            "event_time": _stamp(self.event_time),
            "first_frame_time": _stamp(self.first_frame_time),
            "last_frame_time": _stamp(self.last_frame_time),
            "duration_ms": self.duration_ms,
            "alarm_level": self.alarm_level,
            "confidence_score": self.confidence_score,
            "alarm_status": self.alarm_status,
            "handle_status": self.handle_status,
            "snapshot_url": self.snapshot_url,
            "video_url": self.video_url,
            "edge_node_id": self.edge_node_id,
            "area_id": self.area_id,
            "handler": self.handler,
            "handle_remark": self.handle_remark,
            "handle_image_url": self.handle_image_url,
            "handled_at": _stamp(self.handled_at),
            "created_at": _stamp(self.created_at),
            "updated_at": _stamp(self.updated_at),
        }
        return payload
class AlarmEventExt(Base):
    """Alarm event extension table (``alarm_event_ext``) holding algorithm result details."""

    __tablename__ = "alarm_event_ext"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Refers to an alarm by id; no foreign key constraint is declared here.
    alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID")
    ext_type = Column(String(32), comment="扩展类型: EDGE/POST/MANUAL")
    ext_data = Column(JSON, comment="扩展数据")
    roi_config = Column(JSON, comment="ROI配置快照")
    created_at = Column(DateTime, default=lambda: beijing_now())

    def to_dict(self) -> dict:
        """Return the extension row as a plain dict (``created_at`` in ISO format or ``None``)."""
        created = self.created_at
        payload = {
            "id": self.id,
            "alarm_id": self.alarm_id,
            "ext_type": self.ext_type,
            "ext_data": self.ext_data,
            "roi_config": self.roi_config,
            "created_at": created.isoformat() if created else None,
        }
        return payload
class AlarmLlmAnalysis(Base):
    """Large-language-model analysis of an alarm (``alarm_llm_analysis``)."""

    __tablename__ = "alarm_llm_analysis"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Refers to an alarm by id; no foreign key constraint is declared here.
    alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID")
    llm_model = Column(String(32), comment="模型名称")
    analysis_type = Column(String(20), comment="分析类型: REVIEW/EXPLAIN/RISK")
    summary = Column(Text, comment="分析摘要")
    is_false_alarm = Column(Boolean, comment="是否误报")
    risk_score = Column(Integer, comment="风险评分 0-100")
    confidence_score = Column(Float, comment="分析置信度")
    suggestion = Column(Text, comment="处置建议")
    created_at = Column(DateTime, default=lambda: beijing_now())

    def to_dict(self) -> dict:
        """Return the analysis row as a plain dict (``created_at`` in ISO format or ``None``)."""
        created = self.created_at
        payload = {
            "id": self.id,
            "alarm_id": self.alarm_id,
            "llm_model": self.llm_model,
            "analysis_type": self.analysis_type,
            "summary": self.summary,
            "is_false_alarm": self.is_false_alarm,
            "risk_score": self.risk_score,
            "confidence_score": self.confidence_score,
            "suggestion": self.suggestion,
            "created_at": created.isoformat() if created else None,
        }
        return payload
# ==================== 工单体系 ====================
class SecurityUser(Base):
    """Security staff table (``security_user``)."""

    __tablename__ = "security_user"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(64), unique=True, nullable=False, index=True, comment="人员唯一ID")
    name = Column(String(100), nullable=False, comment="姓名")
    phone = Column(String(20), nullable=True, comment="手机号")
    wechat_uid = Column(String(100), nullable=True, comment="企微userid")
    role = Column(String(50), default="guard", comment="角色: guard/supervisor/manager")
    team_id = Column(String(64), nullable=True, index=True, comment="班组ID")
    status = Column(String(20), default="active", comment="状态: active/inactive")
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    def to_dict(self) -> dict:
        """Return the user as a plain dict.

        Timestamps are rendered as ``YYYY-MM-DD HH:MM:SS`` strings, or
        ``None`` when unset.
        """

        def _stamp(moment):
            return moment.strftime('%Y-%m-%d %H:%M:%S') if moment else None

        payload = {
            "id": self.id,
            "user_id": self.user_id,
            "name": self.name,
            "phone": self.phone,
            "wechat_uid": self.wechat_uid,
            "role": self.role,
            "team_id": self.team_id,
            "status": self.status,
            "created_at": _stamp(self.created_at),
            "updated_at": _stamp(self.updated_at),
        }
        return payload
class SecurityWorkOrder(Base):
    """Security work order table (``security_work_order``).

    Single-column indexes on ``status`` and ``assigned_user_id`` are declared
    once, by name, in ``__table_args__``. The columns themselves do not also
    carry ``index=True``, which would make SQLAlchemy emit a second,
    identical index on the same column when the table is created.
    """

    __tablename__ = "security_work_order"

    order_id = Column(String(64), primary_key=True, comment="工单ID: WO + 时间戳 + uuid")

    # Source alarm: required for alarm-driven orders, may be empty for manual ones.
    alarm_id = Column(String(64), nullable=True, unique=True, comment="关联 alarm_event.alarm_id")

    # --- Work order content ---
    title = Column(String(200), nullable=False, comment="工单标题")
    description = Column(Text, nullable=True, comment="工单描述")
    priority = Column(SmallInteger, default=2, comment="优先级: 1低 2中 3高 4紧急")

    # --- Device / area info (denormalized for querying and accountability tracing) ---
    camera_id = Column(String(64), nullable=True, comment="摄像头ID")
    roi_id = Column(String(64), nullable=True, comment="ROI区域ID")
    alarm_type = Column(String(50), nullable=True, comment="告警类型")
    image_url = Column(String(512), nullable=True, comment="截图URL")

    # --- Assignment (fixed at write time rather than looked up live, so
    # responsibility stays traceable) ---
    # Indexed via idx_swo_assigned in __table_args__.
    assigned_user_id = Column(String(64), nullable=True, comment="处理人user_id")
    assigned_user_name = Column(String(100), nullable=True, comment="处理人姓名")
    assigned_team_id = Column(String(64), nullable=True, comment="班组ID")

    # Status: PENDING / DISPATCHED / PROCESSING / DONE / CLOSED.
    # Indexed via idx_swo_status in __table_args__.
    status = Column(String(20), default="PENDING", nullable=False, comment="工单状态")

    # --- Handling result ---
    result = Column(Text, nullable=True, comment="处理结果描述")

    # --- Creator ---
    created_by = Column(String(64), nullable=True, comment="创建人")

    # --- Times ---
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )
    dispatch_time = Column(DateTime, nullable=True, comment="派单时间")
    finish_time = Column(DateTime, nullable=True, comment="完成时间")

    __table_args__ = (
        Index('idx_swo_status', 'status'),
        Index('idx_swo_assigned', 'assigned_user_id'),
        Index('idx_swo_created_at', 'created_at'),
        Index('idx_swo_camera_roi_type', 'camera_id', 'roi_id', 'alarm_type'),
    )

    def to_dict(self) -> dict:
        """Return the work order as a plain dict.

        Datetime fields are rendered as ``YYYY-MM-DD HH:MM:SS`` strings, or
        ``None`` when unset.
        """
        fmt = '%Y-%m-%d %H:%M:%S'

        def _stamp(moment):
            return moment.strftime(fmt) if moment else None

        return {
            "order_id": self.order_id,
            "alarm_id": self.alarm_id,
            "title": self.title,
            "description": self.description,
            "priority": self.priority,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "alarm_type": self.alarm_type,
            "image_url": self.image_url,
            "assigned_user_id": self.assigned_user_id,
            "assigned_user_name": self.assigned_user_name,
            "assigned_team_id": self.assigned_team_id,
            "status": self.status,
            "result": self.result,
            "created_by": self.created_by,
            "created_at": _stamp(self.created_at),
            "updated_at": _stamp(self.updated_at),
            "dispatch_time": _stamp(self.dispatch_time),
            "finish_time": _stamp(self.finish_time),
        }
class WorkOrderLog(Base):
    """Work order operation log table (``work_order_log``)."""

    __tablename__ = "work_order_log"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Refers to a work order by id; no foreign key constraint is declared here.
    order_id = Column(String(64), nullable=False, index=True, comment="关联工单ID")
    action = Column(
        String(50),
        nullable=False,
        comment="操作: CREATE/DISPATCH/ACCEPT/FINISH/CLOSE",
    )
    operator_id = Column(String(64), nullable=True, comment="操作人ID")
    operator_name = Column(String(100), nullable=True, comment="操作人姓名")
    remark = Column(Text, nullable=True, comment="备注")
    created_at = Column(DateTime, default=lambda: beijing_now())

    def to_dict(self) -> dict:
        """Return the log entry as a plain dict.

        ``created_at`` is rendered as ``YYYY-MM-DD HH:MM:SS``, or ``None``
        when unset.
        """
        created = self.created_at
        created_text = created.strftime('%Y-%m-%d %H:%M:%S') if created else None
        return {
            "id": self.id,
            "order_id": self.order_id,
            "action": self.action,
            "operator_id": self.operator_id,
            "operator_name": self.operator_name,
            "remark": self.remark,
            "created_at": created_text,
        }
# ==================== 数据库管理 ====================
# Cached engine: created on the first get_engine() call, reset to None by close_db().
_engine = None
# Cached session factory: created on the first get_session() call, reset to None by close_db().
_SessionLocal = None
def get_engine():
    """Return the shared SQLAlchemy engine, creating it on first use.

    The engine is built from ``settings.database.url`` and cached in the
    module-level ``_engine``. For SQLite URLs the parent directory of the
    database file is created if needed, and the engine uses ``StaticPool``
    with ``check_same_thread=False``. Any other backend gets a default
    engine.

    Returns:
        The cached engine instance.
    """
    global _engine
    if _engine is not None:
        return _engine

    # Local import so this block does not depend on the file-level import list.
    from sqlalchemy.engine.url import make_url

    db_url = settings.database.url
    parsed = make_url(db_url)

    # Decide by the parsed backend name rather than a substring test, so a
    # non-SQLite URL that merely contains "sqlite" (e.g. in a database name)
    # is not given SQLite-only connect arguments.
    if parsed.get_backend_name() == "sqlite":
        db_path = parsed.database
        # In-memory databases (no path or ":memory:") and "file:" URIs have no
        # directory to create on disk.
        if db_path and db_path != ":memory:" and not db_path.startswith("file:"):
            parent_dir = os.path.dirname(db_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
        _engine = create_engine(
            db_url,
            echo=settings.app.debug,
            poolclass=StaticPool,
            connect_args={"check_same_thread": False},
        )
    else:
        _engine = create_engine(db_url, echo=settings.app.debug)
    return _engine
def get_session():
    """Return a new session from the shared session factory.

    The factory is created on first use, bound to ``get_engine()`` with
    ``autocommit`` and ``autoflush`` disabled, and cached in the module-level
    ``_SessionLocal``.
    """
    global _SessionLocal
    factory = _SessionLocal
    if factory is None:
        factory = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False)
        _SessionLocal = factory
    return factory()
def _auto_migrate(engine):
    """Add columns that are missing from already-existing tables.

    ``create_all`` does not ALTER existing tables, so each entry in
    ``migrations`` is checked against the live schema and added with
    ``ALTER TABLE ... ADD COLUMN`` when absent. Tables that do not exist are
    skipped.

    Args:
        engine: SQLAlchemy engine to inspect and migrate.
    """
    from sqlalchemy import inspect, text

    # (table, column, column type, column comment)
    migrations = [
        ("alarm_event", "handle_image_url", "VARCHAR(512)", "处理图片URL"),
    ]

    # The inline COMMENT clause is MySQL syntax; other backends (SQLite among
    # them) reject it, so it is only appended for MySQL/MariaDB.
    supports_inline_comment = engine.dialect.name in ("mysql", "mariadb")

    # Inspect the schema first, so no inspection happens while the write
    # connection below is open.
    insp = inspect(engine)
    existing_tables = set(insp.get_table_names())
    statements = []
    for table, column, col_type, comment in migrations:
        if table not in existing_tables:
            continue
        present = {col["name"] for col in insp.get_columns(table)}
        if column in present:
            continue
        ddl = f"ALTER TABLE {table} ADD COLUMN {column} {col_type}"
        if supports_inline_comment:
            ddl += f" COMMENT '{comment}'"
        statements.append(ddl)

    if not statements:
        return

    # engine.begin() commits on success and rolls back if a statement raises.
    with engine.begin() as conn:
        for ddl in statements:
            conn.execute(text(ddl))
def init_db():
    """Create all mapped tables, then add columns missing from existing tables."""
    db_engine = get_engine()
    Base.metadata.create_all(bind=db_engine)
    # create_all does not ALTER existing tables, so new columns are added separately.
    _auto_migrate(db_engine)
class NotifyArea(Base):
    """Notification area (``notify_area``)."""

    __tablename__ = "notify_area"

    id = Column(Integer, primary_key=True, autoincrement=True)
    area_id = Column(
        String(36),
        unique=True,
        nullable=False,
        index=True,
    )
    area_name = Column(String(100), nullable=False)
    description = Column(String(200), nullable=True)
    # Stored as a small integer; defaults to 1.
    enabled = Column(SmallInteger, default=1)
    created_at = Column(DateTime, default=beijing_now)
    updated_at = Column(
        DateTime,
        default=beijing_now,
        onupdate=beijing_now,
    )
class CameraAreaBinding(Base):
    """Camera-to-area mapping (``camera_area_binding``).

    ``camera_id`` is unique, so each camera has at most one row here.
    """

    __tablename__ = "camera_area_binding"

    id = Column(Integer, primary_key=True, autoincrement=True)
    camera_id = Column(
        String(64),
        unique=True,
        nullable=False,
        index=True,
    )
    area_id = Column(String(36), nullable=False, index=True)
    created_at = Column(DateTime, default=beijing_now)
class AreaPersonBinding(Base):
    """Area-to-person notification binding (``area_person_binding``)."""

    __tablename__ = "area_person_binding"

    id = Column(Integer, primary_key=True, autoincrement=True)
    area_id = Column(String(36), nullable=False, index=True)
    person_name = Column(String(50), nullable=False)
    wechat_uid = Column(String(100), nullable=False)
    role = Column(String(20), default="SECURITY")
    notify_level = Column(Integer, default=1)
    # Stored as a small integer; defaults to 1.
    enabled = Column(SmallInteger, default=1)
    created_at = Column(DateTime, default=beijing_now)
    updated_at = Column(
        DateTime,
        default=beijing_now,
        onupdate=beijing_now,
    )
# ==================== 数据库管理 ====================
def close_db():
    """Dispose of the cached engine and clear the cached engine and session factory.

    Does nothing when no engine has been created.
    """
    global _engine, _SessionLocal
    if not _engine:
        return
    _engine.dispose()
    _engine, _SessionLocal = None, None