Files
iot-device-management-service/app/models.py
16337 63a8d5a8f2 告警-工单解耦:企微交互+Agent全面切换到工单驱动
Part A: 数据层
- 新增 WechatCardState 模型(order_id ↔ alarm_id 映射 + response_code)
- 新建 models_iot.py(IoT 工单只读 ORM:ops_order + security_ext + clean_ext)
- config.py 新增 IOT_DATABASE_URL 配置

Part B: 企微解耦(alarm_id → order_id)
- wechat_service: response_code 存储迁移到 wechat_card_state,集中 helper
- 卡片发送/更新方法改用 order_id,按钮 key: confirm_{order_id}
- wechat_callback: 按钮解析改 order_id,反查 alarm_id(可空)
- wechat_notify_api: send-card/sync-status 以 orderId 为主键
- yudao_aiot_alarm: 卡片操作改用 order_id,删重复 helper

Part C: Agent 工具全面改为工单驱动
- 新建 order_query.py(查 IoT ops_order,支持安保+保洁工单)
- 新建 order_action.py(操作工单状态 + 提交处理结果)
- 更新 prompts.py 为工单助手
- 更新工具注册(__init__.py)

Part D: 日报改为工单驱动
- daily_report_service 从查 alarm_event 改为查 IoT ops_order + 扩展表
- 支持安保+保洁工单统计
2026-03-31 10:49:42 +08:00

606 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据模型定义
"""
import enum
import os
from datetime import datetime, timezone
from app.utils.timezone import beijing_now
from typing import Optional
from sqlalchemy import (
Column, String, Integer, SmallInteger, BigInteger, Boolean, Float, DateTime, Text, Enum, JSON,
ForeignKey, create_engine, Index
)
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.pool import StaticPool
from app.config import settings
# Declarative base class that every ORM model in this module inherits from;
# init_db() uses Base.metadata to create the tables.
Base = declarative_base()
# ==================== 枚举定义 ====================
@enum.unique
class AlertStatus(str, enum.Enum):
    """Processing status of an alert.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string value.
    """

    PENDING = "pending"        # waiting to be handled
    CONFIRMED = "confirmed"    # confirmed
    IGNORED = "ignored"        # ignored
    RESOLVED = "resolved"      # resolved
    DISPATCHED = "dispatched"  # a work order has been dispatched
@enum.unique
class AlertLevel(str, enum.Enum):
    """Severity level of an alert.

    Members are also ``str`` instances equal to their lowercase value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@enum.unique
class WorkOrderStatus(str, enum.Enum):
    """Lifecycle status of a work order.

    Members are also ``str`` instances equal to their lowercase value.
    """

    CREATED = "created"        # created
    ASSIGNED = "assigned"      # assigned to a handler
    PROCESSING = "processing"  # being processed
    COMPLETED = "completed"    # completed
    CLOSED = "closed"          # closed
@enum.unique
class WorkOrderPriority(str, enum.Enum):
    """Priority of a work order.

    Members are also ``str`` instances equal to their lowercase value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
@enum.unique
class DeviceStatus(str, enum.Enum):
    """Connectivity / health status of an edge device.

    Members are also ``str`` instances equal to their lowercase value.
    """

    ONLINE = "online"
    OFFLINE = "offline"
    ERROR = "error"
# ==================== 数据模型 ====================
class Alert(Base):
    """ORM model for the ``alerts`` table.

    Each row stores where the alert came from, what was detected, the
    snapshot location, its handling state and an optional link to a
    ``work_orders`` row.
    """

    __tablename__ = "alerts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alert_no = Column(String(32), unique=True, nullable=False, index=True)

    # --- source ---
    camera_id = Column(String(64), nullable=False, index=True)
    roi_id = Column(String(64), nullable=True)
    bind_id = Column(String(64), nullable=True)
    device_id = Column(String(64), nullable=True, index=True)

    # --- alert content ---
    alert_type = Column(String(64), nullable=False)
    algorithm = Column(String(128), nullable=True)
    confidence = Column(Integer, nullable=True)
    duration_minutes = Column(Integer, nullable=True)
    trigger_time = Column(DateTime, nullable=False)
    message = Column(Text, nullable=True)
    bbox = Column(JSON, nullable=True)

    # --- snapshot ---
    snapshot_url = Column(String(512), nullable=True)
    snapshot_path = Column(String(512), nullable=True)

    # --- handling state ---
    status = Column(Enum(AlertStatus), default=AlertStatus.PENDING, index=True)
    level = Column(Enum(AlertLevel), default=AlertLevel.MEDIUM)
    handle_remark = Column(Text, nullable=True)
    handled_by = Column(String(64), nullable=True)
    handled_at = Column(DateTime, nullable=True)

    # --- AI analysis ---
    ai_analysis = Column(JSON, nullable=True)

    # --- linked work order ---
    work_order_id = Column(Integer, ForeignKey("work_orders.id"), nullable=True)

    # --- timestamps (filled from beijing_now()) ---
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    # --- relationships ---
    work_order = relationship(
        "WorkOrder",
        back_populates="alerts",
        foreign_keys=[work_order_id],
    )

    __table_args__ = (
        Index("idx_alert_trigger_time", "trigger_time"),
        Index("idx_alert_camera_status", "camera_id", "status"),
    )

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        Datetimes are rendered with ``isoformat()`` and enum columns as
        their ``.value``; unset values become ``None``.  ``snapshot_path``
        is not included.
        """

        def iso(moment):
            return moment.isoformat() if moment else None

        def enum_value(member):
            return member.value if member else None

        return {
            "id": self.id,
            "alert_no": self.alert_no,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "bind_id": self.bind_id,
            "device_id": self.device_id,
            "alert_type": self.alert_type,
            "algorithm": self.algorithm,
            "confidence": self.confidence,
            "duration_minutes": self.duration_minutes,
            "trigger_time": iso(self.trigger_time),
            "message": self.message,
            "bbox": self.bbox,
            "snapshot_url": self.snapshot_url,
            "status": enum_value(self.status),
            "level": enum_value(self.level),
            "handle_remark": self.handle_remark,
            "handled_by": self.handled_by,
            "handled_at": iso(self.handled_at),
            "ai_analysis": self.ai_analysis,
            "work_order_id": self.work_order_id,
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
class WorkOrder(Base):
    """ORM model for the ``work_orders`` table.

    Holds the order content, who it is assigned to, its status, the
    handling result and the lifecycle timestamps.
    """

    __tablename__ = "work_orders"

    id = Column(Integer, primary_key=True, autoincrement=True)
    order_no = Column(String(32), unique=True, nullable=False, index=True)

    # --- originating alert (plain columns, no foreign key) ---
    alert_id = Column(Integer, nullable=True)
    alert_no = Column(String(32), nullable=True)

    # --- order content ---
    title = Column(String(255), nullable=False)
    description = Column(Text, nullable=True)
    priority = Column(Enum(WorkOrderPriority), default=WorkOrderPriority.MEDIUM)

    # --- assignment ---
    assignee_id = Column(String(64), nullable=True, index=True)
    assignee_name = Column(String(64), nullable=True)
    department = Column(String(64), nullable=True)

    # --- status ---
    status = Column(Enum(WorkOrderStatus), default=WorkOrderStatus.CREATED, index=True)

    # --- handling result ---
    result = Column(Text, nullable=True)
    attachments = Column(JSON, nullable=True)

    # --- timestamps ---
    deadline = Column(DateTime, nullable=True)
    assigned_at = Column(DateTime, nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    # --- relationships: alerts whose work_order_id points at this row ---
    alerts = relationship(
        "Alert",
        back_populates="work_order",
        foreign_keys=[Alert.work_order_id],
    )

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        Datetimes are rendered with ``isoformat()`` and enum columns as
        their ``.value``; unset values become ``None``.
        """

        def iso(moment):
            return moment.isoformat() if moment else None

        def enum_value(member):
            return member.value if member else None

        return {
            "id": self.id,
            "order_no": self.order_no,
            "alert_id": self.alert_id,
            "alert_no": self.alert_no,
            "title": self.title,
            "description": self.description,
            "priority": enum_value(self.priority),
            "assignee_id": self.assignee_id,
            "assignee_name": self.assignee_name,
            "department": self.department,
            "status": enum_value(self.status),
            "result": self.result,
            "attachments": self.attachments,
            "deadline": iso(self.deadline),
            "assigned_at": iso(self.assigned_at),
            "started_at": iso(self.started_at),
            "completed_at": iso(self.completed_at),
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
class EdgeDevice(Base):
    """ORM model for the ``edge_devices`` table.

    Stores an edge device's identity, status / heartbeat, runtime
    counters and configuration details.
    """

    __tablename__ = "edge_devices"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(64), unique=True, nullable=False, index=True)
    device_name = Column(String(128), nullable=True)

    # --- status ---
    status = Column(Enum(DeviceStatus), default=DeviceStatus.OFFLINE, index=True)
    last_heartbeat = Column(DateTime, nullable=True)

    # --- runtime counters ---
    uptime_seconds = Column(BigInteger, nullable=True)
    frames_processed = Column(BigInteger, nullable=True)
    alerts_generated = Column(BigInteger, nullable=True)

    # --- configuration ---
    ip_address = Column(String(45), nullable=True)
    stream_count = Column(Integer, nullable=True)
    config_version = Column(String(32), nullable=True)

    # --- free-form extras ---
    extra_info = Column(JSON, nullable=True)

    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        Datetimes are rendered with ``isoformat()`` and ``status`` as its
        enum ``.value``; unset values become ``None``.
        """

        def iso(moment):
            return moment.isoformat() if moment else None

        status_value = self.status.value if self.status else None
        return {
            "id": self.id,
            "device_id": self.device_id,
            "device_name": self.device_name,
            "status": status_value,
            "last_heartbeat": iso(self.last_heartbeat),
            "uptime_seconds": self.uptime_seconds,
            "frames_processed": self.frames_processed,
            "alerts_generated": self.alerts_generated,
            "ip_address": self.ip_address,
            "stream_count": self.stream_count,
            "config_version": self.config_version,
            "extra_info": self.extra_info,
            "created_at": iso(self.created_at),
            "updated_at": iso(self.updated_at),
        }
# ==================== 新告警三表结构 ====================
class AlarmEvent(Base):
    """ORM model for the ``alarm_event`` table (main alarm event record).

    Keyed by a string ``alarm_id``.  The ``comment=`` text on each column
    is part of the schema definition and is kept verbatim.
    """

    __tablename__ = "alarm_event"

    alarm_id = Column(String(64), primary_key=True, comment="分布式告警ID")
    alarm_type = Column(String(32), nullable=False, comment="告警类型")
    algorithm_code = Column(String(64), comment="算法编码")
    device_id = Column(String(64), nullable=False, comment="摄像头/设备ID")
    scene_id = Column(String(64), comment="场景/ROI ID")
    event_time = Column(DateTime, nullable=False, comment="事件发生时间")
    first_frame_time = Column(DateTime, comment="首帧时间")
    last_frame_time = Column(DateTime, comment="末帧时间")
    duration_ms = Column(Integer, comment="持续时长(毫秒)")
    alarm_level = Column(SmallInteger, comment="告警级别: 0紧急 1重要 2普通 3轻微")
    confidence_score = Column(Float, comment="置信度 0-1")
    alarm_status = Column(
        String(20),
        default="NEW",
        comment="告警状态: NEW/CONFIRMED/FALSE/CLOSED",
    )
    handle_status = Column(
        String(20),
        default="UNHANDLED",
        comment="处理状态: UNHANDLED/HANDLING/DONE",
    )
    snapshot_url = Column(String(512), comment="截图URL")
    video_url = Column(String(512), comment="视频URL")
    edge_node_id = Column(String(64), comment="边缘节点ID")
    area_id = Column(BigInteger, comment="所属区域ID")
    handler = Column(String(64), comment="处理人")
    handle_remark = Column(Text, comment="处理备注")
    handled_at = Column(DateTime, comment="处理时间")
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    __table_args__ = (
        Index("idx_alarm_event_time", "event_time"),
        Index("idx_alarm_device_type", "device_id", "alarm_type"),
    )

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        Every datetime column is formatted as ``YYYY-MM-DD HH:MM:SS``;
        unset datetimes become ``None``.
        """

        def stamp(moment):
            return moment.strftime("%Y-%m-%d %H:%M:%S") if moment else None

        return {
            "alarm_id": self.alarm_id,
            "alarm_type": self.alarm_type,
            "algorithm_code": self.algorithm_code,
            "device_id": self.device_id,
            "scene_id": self.scene_id,
            "event_time": stamp(self.event_time),
            "first_frame_time": stamp(self.first_frame_time),
            "last_frame_time": stamp(self.last_frame_time),
            "duration_ms": self.duration_ms,
            "alarm_level": self.alarm_level,
            "confidence_score": self.confidence_score,
            "alarm_status": self.alarm_status,
            "handle_status": self.handle_status,
            "snapshot_url": self.snapshot_url,
            "video_url": self.video_url,
            "edge_node_id": self.edge_node_id,
            "area_id": self.area_id,
            "handler": self.handler,
            "handle_remark": self.handle_remark,
            "handled_at": stamp(self.handled_at),
            "created_at": stamp(self.created_at),
            "updated_at": stamp(self.updated_at),
        }
class AlarmEventExt(Base):
    """ORM model for the ``alarm_event_ext`` table (alarm extension data).

    Rows reference an alarm through the indexed ``alarm_id`` column; no
    foreign-key constraint is declared.
    """

    __tablename__ = "alarm_event_ext"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID")
    ext_type = Column(String(32), comment="扩展类型: EDGE/POST/MANUAL")
    ext_data = Column(JSON, comment="扩展数据")
    roi_config = Column(JSON, comment="ROI配置快照")
    created_at = Column(DateTime, default=lambda: beijing_now())

    def to_dict(self) -> dict:
        """Return the row as a plain dict; ``created_at`` is ISO-formatted or ``None``."""
        created = self.created_at.isoformat() if self.created_at else None
        return {
            "id": self.id,
            "alarm_id": self.alarm_id,
            "ext_type": self.ext_type,
            "ext_data": self.ext_data,
            "roi_config": self.roi_config,
            "created_at": created,
        }
class AlarmLlmAnalysis(Base):
    """ORM model for the ``alarm_llm_analysis`` table (LLM analysis of an alarm).

    Rows reference an alarm through the indexed ``alarm_id`` column; no
    foreign-key constraint is declared.
    """

    __tablename__ = "alarm_llm_analysis"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alarm_id = Column(String(64), nullable=False, index=True, comment="关联告警ID")
    llm_model = Column(String(32), comment="模型名称")
    analysis_type = Column(String(20), comment="分析类型: REVIEW/EXPLAIN/RISK")
    summary = Column(Text, comment="分析摘要")
    is_false_alarm = Column(Boolean, comment="是否误报")
    risk_score = Column(Integer, comment="风险评分 0-100")
    confidence_score = Column(Float, comment="分析置信度")
    suggestion = Column(Text, comment="处置建议")
    created_at = Column(DateTime, default=lambda: beijing_now())

    def to_dict(self) -> dict:
        """Return the row as a plain dict; ``created_at`` is ISO-formatted or ``None``."""
        created = self.created_at.isoformat() if self.created_at else None
        return {
            "id": self.id,
            "alarm_id": self.alarm_id,
            "llm_model": self.llm_model,
            "analysis_type": self.analysis_type,
            "summary": self.summary,
            "is_false_alarm": self.is_false_alarm,
            "risk_score": self.risk_score,
            "confidence_score": self.confidence_score,
            "suggestion": self.suggestion,
            "created_at": created,
        }
# ==================== 工单体系 ====================
class SecurityUser(Base):
    """ORM model for the ``security_user`` table (security staff)."""

    __tablename__ = "security_user"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(64), unique=True, nullable=False, index=True, comment="人员唯一ID")
    name = Column(String(100), nullable=False, comment="姓名")
    phone = Column(String(20), nullable=True, comment="手机号")
    wechat_uid = Column(String(100), nullable=True, comment="企微userid")
    role = Column(String(50), default="guard", comment="角色: guard/supervisor/manager")
    team_id = Column(String(64), nullable=True, index=True, comment="班组ID")
    status = Column(String(20), default="active", comment="状态: active/inactive")
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        ``created_at`` / ``updated_at`` are formatted as
        ``YYYY-MM-DD HH:MM:SS`` or ``None`` when unset.
        """

        def stamp(moment):
            return moment.strftime("%Y-%m-%d %H:%M:%S") if moment else None

        return {
            "id": self.id,
            "user_id": self.user_id,
            "name": self.name,
            "phone": self.phone,
            "wechat_uid": self.wechat_uid,
            "role": self.role,
            "team_id": self.team_id,
            "status": self.status,
            "created_at": stamp(self.created_at),
            "updated_at": stamp(self.updated_at),
        }
class SecurityWorkOrder(Base):
    """ORM model for the ``security_work_order`` table (security work orders).

    Keyed by a string ``order_id``.  ``alarm_id`` is unique but nullable,
    so an order may exist without a source alarm.
    """

    __tablename__ = "security_work_order"

    order_id = Column(String(64), primary_key=True, comment="工单ID: WO + 时间戳 + uuid")

    # Source alarm: required for alarm-driven orders, may be empty for manual ones.
    alarm_id = Column(
        String(64),
        nullable=True,
        unique=True,
        comment="关联 alarm_event.alarm_id",
    )

    # --- order content ---
    title = Column(String(200), nullable=False, comment="工单标题")
    description = Column(Text, nullable=True, comment="工单描述")
    priority = Column(SmallInteger, default=2, comment="优先级: 1低 2中 3高 4紧急")

    # --- device / area info (denormalised for querying and accountability) ---
    camera_id = Column(String(64), nullable=True, comment="摄像头ID")
    roi_id = Column(String(64), nullable=True, comment="ROI区域ID")
    alarm_type = Column(String(50), nullable=True, comment="告警类型")
    image_url = Column(String(512), nullable=True, comment="截图URL")

    # --- assignment (fixed at write time rather than looked up live,
    #     so responsibility stays traceable) ---
    assigned_user_id = Column(String(64), nullable=True, index=True, comment="处理人user_id")
    assigned_user_name = Column(String(100), nullable=True, comment="处理人姓名")
    assigned_team_id = Column(String(64), nullable=True, comment="班组ID")

    # --- status: PENDING / DISPATCHED / PROCESSING / DONE / CLOSED ---
    status = Column(
        String(20),
        default="PENDING",
        nullable=False,
        index=True,
        comment="工单状态",
    )

    # --- handling result ---
    result = Column(Text, nullable=True, comment="处理结果描述")

    # --- creator ---
    created_by = Column(String(64), nullable=True, comment="创建人")

    # --- timestamps ---
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )
    dispatch_time = Column(DateTime, nullable=True, comment="派单时间")
    finish_time = Column(DateTime, nullable=True, comment="完成时间")

    # NOTE(review): ``status`` and ``assigned_user_id`` already declare
    # ``index=True`` above, so idx_swo_status / idx_swo_assigned look like
    # second indexes on the same single column — confirm against the live
    # schema before dropping either.
    __table_args__ = (
        Index("idx_swo_status", "status"),
        Index("idx_swo_assigned", "assigned_user_id"),
        Index("idx_swo_created_at", "created_at"),
        Index("idx_swo_camera_roi_type", "camera_id", "roi_id", "alarm_type"),
    )

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        Datetime columns are formatted as ``YYYY-MM-DD HH:MM:SS`` or
        ``None`` when unset.
        """

        def stamp(moment):
            return moment.strftime("%Y-%m-%d %H:%M:%S") if moment else None

        return {
            "order_id": self.order_id,
            "alarm_id": self.alarm_id,
            "title": self.title,
            "description": self.description,
            "priority": self.priority,
            "camera_id": self.camera_id,
            "roi_id": self.roi_id,
            "alarm_type": self.alarm_type,
            "image_url": self.image_url,
            "assigned_user_id": self.assigned_user_id,
            "assigned_user_name": self.assigned_user_name,
            "assigned_team_id": self.assigned_team_id,
            "status": self.status,
            "result": self.result,
            "created_by": self.created_by,
            "created_at": stamp(self.created_at),
            "updated_at": stamp(self.updated_at),
            "dispatch_time": stamp(self.dispatch_time),
            "finish_time": stamp(self.finish_time),
        }
class WorkOrderLog(Base):
    """ORM model for the ``work_order_log`` table (work-order operation log).

    Rows reference a work order through the indexed ``order_id`` column;
    no foreign-key constraint is declared.
    """

    __tablename__ = "work_order_log"

    id = Column(Integer, primary_key=True, autoincrement=True)
    order_id = Column(String(64), nullable=False, index=True, comment="关联工单ID")
    action = Column(
        String(50),
        nullable=False,
        comment="操作: CREATE/DISPATCH/ACCEPT/FINISH/CLOSE",
    )
    operator_id = Column(String(64), nullable=True, comment="操作人ID")
    operator_name = Column(String(100), nullable=True, comment="操作人姓名")
    remark = Column(Text, nullable=True, comment="备注")
    created_at = Column(DateTime, default=lambda: beijing_now())

    def to_dict(self) -> dict:
        """Return the row as a plain dict.

        ``created_at`` is formatted as ``YYYY-MM-DD HH:MM:SS`` or ``None``.
        """
        created = None
        if self.created_at:
            created = self.created_at.strftime("%Y-%m-%d %H:%M:%S")
        return {
            "id": self.id,
            "order_id": self.order_id,
            "action": self.action,
            "operator_id": self.operator_id,
            "operator_name": self.operator_name,
            "remark": self.remark,
            "created_at": created,
        }
# ==================== 数据库管理 ====================
# Module-level singletons, created lazily: get_engine() fills _engine,
# get_session() fills _SessionLocal, and close_db() resets both to None.
_engine = None
_SessionLocal = None
def get_engine():
    """Return the module-wide SQLAlchemy engine, creating it on first call.

    The URL is read from ``settings.database.url`` and parsed so that the
    backend is identified from the URL scheme rather than by searching the
    whole string for "sqlite" (which would also match, e.g., a database
    name or password on another backend).

    * SQLite: the parent directory of a file-based database is created if
      needed (skipped for in-memory databases), and the engine is built
      with ``StaticPool`` and ``check_same_thread=False``.
    * Any other backend: the engine is built with ``pool_recycle=1800``
      and ``pool_pre_ping=True``.

    Returns:
        The cached ``Engine`` instance.
    """
    global _engine
    if _engine is not None:
        return _engine

    # Imported locally so this function does not depend on a change to the
    # module's import block.
    from sqlalchemy.engine.url import make_url

    db_url = settings.database.url
    parsed = make_url(db_url)

    if parsed.get_backend_name() == "sqlite":
        # ``parsed.database`` is the file path, or None / ":memory:" for an
        # in-memory database. Only a real file path needs its directory.
        db_path = parsed.database
        if db_path and db_path != ":memory:":
            os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        _engine = create_engine(
            db_url,
            echo=settings.app.debug,
            poolclass=StaticPool,
            connect_args={"check_same_thread": False},
        )
    else:
        _engine = create_engine(
            db_url,
            echo=settings.app.debug,
            pool_recycle=1800,
            pool_pre_ping=True,
        )
    return _engine
def get_session():
    """Return a new ORM session bound to the shared engine.

    The ``sessionmaker`` factory is built once (``autocommit=False``,
    ``autoflush=False``) and cached in ``_SessionLocal``; every call
    returns a fresh session produced by that factory.
    """
    global _SessionLocal
    factory = _SessionLocal
    if factory is None:
        factory = sessionmaker(bind=get_engine(), autocommit=False, autoflush=False)
        _SessionLocal = factory
    return factory()
def init_db():
    """Create every table registered on ``Base.metadata`` using the shared engine."""
    Base.metadata.create_all(bind=get_engine())
class NotifyArea(Base):
    """ORM model for the ``notify_area`` table (notification areas)."""

    __tablename__ = "notify_area"

    id = Column(Integer, primary_key=True, autoincrement=True)
    area_id = Column(String(36), unique=True, nullable=False, index=True)
    area_name = Column(String(100), nullable=False)
    description = Column(String(200), nullable=True)
    # Stored as a small integer; defaults to 1.
    enabled = Column(SmallInteger, default=1)
    created_at = Column(DateTime, default=beijing_now)
    updated_at = Column(
        DateTime,
        default=beijing_now,
        onupdate=beijing_now,
    )
class CameraAreaBinding(Base):
    """ORM model for the ``camera_area_binding`` table (camera-to-area mapping).

    ``camera_id`` is unique, so each camera maps to at most one area.
    """

    __tablename__ = "camera_area_binding"

    id = Column(Integer, primary_key=True, autoincrement=True)
    camera_id = Column(String(64), unique=True, nullable=False, index=True)
    area_id = Column(String(36), nullable=False, index=True)
    created_at = Column(DateTime, default=beijing_now)
class WechatCardState(Base):
    """ORM model for the ``wechat_card_state`` table.

    Keyed by ``order_id``; each row carries the WeCom card
    ``response_code`` and an optional ``alarm_id``, giving an
    order_id <-> alarm_id mapping.
    """

    __tablename__ = "wechat_card_state"

    order_id = Column(String(64), primary_key=True, comment="IoT工单ID(ops_order.id)")
    response_code = Column(String(255), nullable=True, comment="企微卡片 response_code")
    alarm_id = Column(String(64), nullable=True, comment="关联告警ID可空")
    created_at = Column(DateTime, default=lambda: beijing_now())
    updated_at = Column(
        DateTime,
        default=lambda: beijing_now(),
        onupdate=lambda: beijing_now(),
    )
class AreaPersonBinding(Base):
    """ORM model for the ``area_person_binding`` table (area-to-person notification binding)."""

    __tablename__ = "area_person_binding"

    id = Column(Integer, primary_key=True, autoincrement=True)
    area_id = Column(String(36), nullable=False, index=True)
    person_name = Column(String(50), nullable=False)
    wechat_uid = Column(String(100), nullable=False)
    role = Column(String(20), default="SECURITY")
    notify_level = Column(Integer, default=1)
    # Stored as a small integer; defaults to 1.
    enabled = Column(SmallInteger, default=1)
    created_at = Column(DateTime, default=beijing_now)
    updated_at = Column(
        DateTime,
        default=beijing_now,
        onupdate=beijing_now,
    )
# ==================== 数据库管理 ====================
def close_db():
    """Dispose of the shared engine and clear the cached engine and session factory.

    Does nothing when no engine has been created.
    """
    global _engine, _SessionLocal
    if not _engine:
        return
    _engine.dispose()
    _engine = None
    _SessionLocal = None