diff --git a/app/config.py b/app/config.py index f0d0c92..10e2613 100644 --- a/app/config.py +++ b/app/config.py @@ -125,6 +125,7 @@ class CameraNameConfig: class Settings(BaseModel): """全局配置""" database: DatabaseConfig = DatabaseConfig() + iot_database_url: str = "" # IoT 平台数据库(跨库只读查询) cos: COSConfig = COSConfig() app: AppConfig = AppConfig() ai_model: AIModelConfig = AIModelConfig() @@ -147,6 +148,7 @@ def load_settings() -> Settings: database=DatabaseConfig( url=os.getenv("DATABASE_URL", "sqlite:///./data/alert_platform.db"), ), + iot_database_url=os.getenv("IOT_DATABASE_URL", ""), cos=COSConfig( region=os.getenv("COS_REGION", "ap-beijing"), bucket=os.getenv("COS_BUCKET", ""), diff --git a/app/models.py b/app/models.py index a7b1c15..3a31708 100644 --- a/app/models.py +++ b/app/models.py @@ -568,6 +568,18 @@ class CameraAreaBinding(Base): created_at = Column(DateTime, default=beijing_now) +class WechatCardState(Base): + """企微卡片状态表(order_id ↔ alarm_id 映射 + response_code)""" + __tablename__ = "wechat_card_state" + + order_id = Column(String(64), primary_key=True, comment="IoT工单ID(ops_order.id)") + response_code = Column(String(255), nullable=True, comment="企微卡片 response_code") + alarm_id = Column(String(64), nullable=True, comment="关联告警ID(可空)") + created_at = Column(DateTime, default=lambda: beijing_now()) + updated_at = Column(DateTime, default=lambda: beijing_now(), + onupdate=lambda: beijing_now()) + + class AreaPersonBinding(Base): """区域-人员通知绑定""" __tablename__ = "area_person_binding" diff --git a/app/models_iot.py b/app/models_iot.py new file mode 100644 index 0000000..b84f094 --- /dev/null +++ b/app/models_iot.py @@ -0,0 +1,138 @@ +""" +IoT 工单只读 ORM 模型 + +映射 aiot-platform 库的工单表,vsp-service 通过第二个 SQLAlchemy 引擎跨库只读查询。 +""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import ( + Column, String, Integer, SmallInteger, BigInteger, DateTime, Text, + create_engine, Index, +) +from sqlalchemy.orm import declarative_base, sessionmaker + +from app.config import settings +from app.utils.logger import logger + +IotBase = declarative_base() + + +class IotOpsOrder(IotBase): + """IoT 通用工单主表(只读)""" + __tablename__ = "ops_order" + + id = Column(BigInteger, primary_key=True) + order_code = Column(String(64), comment="工单编号") + order_type = Column(String(32), comment="工单类型: SECURITY/CLEAN") + title = Column(String(200), comment="工单标题") + description = Column(Text, comment="工单描述") + priority = Column(SmallInteger, comment="优先级: 0低/1中/2高") + status = Column(String(32), comment="状态: PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED") + area_id = Column(BigInteger, comment="区域ID") + location = Column(String(200), comment="位置") + assignee_id = Column(BigInteger, comment="指派人ID") + assignee_name = Column(String(100), comment="指派人姓名") + start_time = Column(DateTime, comment="开始时间") + end_time = Column(DateTime, comment="结束时间") + creator = Column(String(64), comment="创建者") + create_time = Column(DateTime, comment="创建时间") + update_time = Column(DateTime, comment="更新时间") + deleted = Column(SmallInteger, default=0, comment="是否删除") + tenant_id = Column(BigInteger, default=0, comment="租户编号") + + +class IotOpsOrderSecurityExt(IotBase): + """安保工单扩展表(只读)""" + __tablename__ = "ops_order_security_ext" + + id = Column(BigInteger, primary_key=True) + ops_order_id = Column(BigInteger, nullable=False, comment="工单ID") + alarm_id = Column(String(64), comment="关联告警ID") + alarm_type = Column(String(32), comment="告警类型") + camera_id = Column(String(64), comment="摄像头ID") + camera_name = Column(String(128), comment="摄像头名称") + roi_id = Column(String(64), comment="ROI区域ID") + image_url = Column(String(512), comment="截图URL") + assigned_user_id = Column(BigInteger, comment="处理人ID") + assigned_user_name = Column(String(100), comment="处理人姓名") + result = Column(Text, comment="处理结果") + result_img_urls = Column(Text, comment="结果图片URL") + false_alarm = Column(SmallInteger, comment="是否误报") + dispatched_time = Column(DateTime, comment="派单时间") + confirmed_time = Column(DateTime, comment="确认时间") + completed_time = Column(DateTime, comment="完成时间") + creator = Column(String(64)) + create_time = Column(DateTime) + updater = Column(String(64)) + update_time = Column(DateTime) + deleted = Column(SmallInteger, default=0) + tenant_id = Column(BigInteger, default=0) + + +class IotOpsOrderCleanExt(IotBase): + """保洁工单扩展表(只读)""" + __tablename__ = "ops_order_clean_ext" + + id = Column(BigInteger, primary_key=True) + ops_order_id = Column(BigInteger, nullable=False, comment="工单ID") + is_auto = Column(SmallInteger, default=1, comment="是否自动工单") + expected_duration = Column(Integer, comment="预计作业时长(分钟)") + arrived_time = Column(DateTime, comment="实际到岗时间") + completed_time = Column(DateTime, comment="实际完成时间") + pause_start_time = Column(DateTime, comment="暂停开始时间") + pause_end_time = Column(DateTime, comment="暂停结束时间") + total_pause_seconds = Column(Integer, default=0, comment="累计暂停时长(秒)") + cleaning_type = Column(String(32), comment="保洁类型: ROUTINE/DEEP/SPOT/EMERGENCY") + difficulty_level = Column(SmallInteger, comment="难度等级(1-5)") + creator = Column(String(64)) + create_time = Column(DateTime) + updater = Column(String(64)) + update_time = Column(DateTime) + deleted = Column(SmallInteger, default=0) + tenant_id = Column(BigInteger, default=0) + dispatched_time = Column(DateTime, comment="实际下发时间") + first_dispatched_time = Column(DateTime, comment="首次下发时间") + + +# ==================== IoT 数据库连接管理 ==================== + +_iot_engine = None +_IotSessionLocal = None + + +def get_iot_engine(): + """获取 IoT 数据库引擎(懒初始化)""" + global _iot_engine + if _iot_engine is None: + iot_url = settings.iot_database_url + if not iot_url: + raise RuntimeError("IOT_DATABASE_URL 未配置,无法连接 IoT 数据库") + _iot_engine = create_engine( + iot_url, + echo=False, + pool_recycle=1800, + pool_pre_ping=True, + ) + logger.info(f"IoT 数据库引擎已创建") + return _iot_engine + + +def get_iot_session(): + """获取 IoT 数据库只读 session""" + global _IotSessionLocal + if _IotSessionLocal is None: + _IotSessionLocal = sessionmaker( + bind=get_iot_engine(), autocommit=False, autoflush=False, + ) + return _IotSessionLocal() + + +def close_iot_db(): + """关闭 IoT 数据库连接""" + global _iot_engine, _IotSessionLocal + if _iot_engine: + _iot_engine.dispose() + _iot_engine = None + _IotSessionLocal = None diff --git a/app/routers/wechat_callback.py b/app/routers/wechat_callback.py index fc45191..7957118 100644 --- a/app/routers/wechat_callback.py +++ b/app/routers/wechat_callback.py @@ -123,16 +123,18 @@ async def _process_card_button_click(msg: dict): """ 处理模板卡片按钮点击事件(两步状态机) + 按钮 key 格式为 {action}_{order_id},以工单ID为主键驱动。 + 第一步按钮: - - confirm_{alarm_id}: 确认接单 → 更新状态为处理中,创建工单,卡片更新到第二步 - - ignore_{alarm_id}: 误报忽略 → 终态,自动结单 + - confirm_{order_id}: 确认接单 → 调 IoT /confirm,卡片更新到第二步 + - ignore_{order_id}: 误报忽略 → 调 IoT /false-alarm,终态 第二步按钮: - - complete_{alarm_id}: 已处理完成 → 终态,自动结单 - - false_{alarm_id}: 标记误报 → 终态,自动结单 + - complete_{order_id}: 已处理完成 → 调 IoT /submit,终态 + - false_{order_id}: 标记误报 → 调 IoT /false-alarm,终态 终态按钮: - - done_{alarm_id}: 已完成,忽略 + - done_{order_id}: 已完成,忽略 """ try: from app.services.wechat_service import get_wechat_service @@ -147,19 +149,19 @@ async def _process_card_button_click(msg: dict): f"卡片按钮点击: user={user_id}, key={event_key}, task={task_id}" ) - # 解析 action 和 alarm_id + # 解析 action 和 order_id if event_key.startswith("confirm_"): action = "confirm" - alarm_id = event_key[len("confirm_"):] + order_id = event_key[len("confirm_"):] elif event_key.startswith("ignore_"): action = "ignore" - alarm_id = event_key[len("ignore_"):] + order_id = event_key[len("ignore_"):] elif event_key.startswith("complete_"): action = "complete" - alarm_id = event_key[len("complete_"):] + order_id = event_key[len("complete_"):] elif event_key.startswith("false_"): action = "false" - alarm_id = event_key[len("false_"):] + order_id = event_key[len("false_"):] elif event_key.startswith("done_"): return # 终态按钮,忽略 else: @@ -169,131 +171,99 @@ async def _process_card_button_click(msg: dict): wechat = get_wechat_service() wo_client = get_work_order_client() - # 保存 response_code 供后续 IoT 回调时更新卡片 + # 保存 response_code(以 order_id 为 key) if response_code: - wechat.save_response_code(alarm_id, response_code) + alarm_id = wechat.get_alarm_id_for_order(order_id) + wechat.save_response_code(order_id, response_code, alarm_id=alarm_id) - # 获取关联的工单ID - order_id = _get_order_id_for_alarm(alarm_id) + # 反查 alarm_id(可空,手动工单没有关联告警) + alarm_id = wechat.get_alarm_id_for_order(order_id) # ---- 确认接单:调 IoT /confirm ---- if action == "confirm": - if order_id and wo_client.enabled: + if wo_client.enabled: success = await wo_client.confirm_order(order_id) if success: - logger.info(f"IoT工单已确认,等待sync-status回调: alarm={alarm_id}, order={order_id}, by={user_id}") + logger.info(f"IoT工单已确认,等待sync-status回调: order={order_id}, by={user_id}") return # IoT 回调 sync-status 会更新告警+卡片 - # IoT 调用失败时降级直接更新 - logger.warning(f"IoT工单确认失败,降级处理: alarm={alarm_id}, order={order_id}") - service = get_alarm_event_service() - service.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED", - handle_status="HANDLING", handler=user_id, remark="企微确认(IoT降级)") - else: - # 工单未启用时直接更新告警状态 + logger.warning(f"IoT工单确认失败: order={order_id}") + + # 有告警时更新本地告警状态 + if alarm_id: service = get_alarm_event_service() service.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED", handle_status="HANDLING", handler=user_id, remark="企微确认接单") - # 降级时才更新卡片 + # 更新卡片 if response_code: await wechat.update_alarm_card_step2( response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, + order_id=order_id, operator_name=user_id, ) # ---- 误报忽略:调 IoT /false-alarm ---- elif action == "ignore": - if order_id and wo_client.enabled: + if wo_client.enabled: success = await wo_client.false_alarm(order_id) if success: - logger.info(f"IoT工单已标记误报,等待sync-status回调: alarm={alarm_id}, order={order_id}, by={user_id}") - return # IoT 回调 sync-status 会更新告警+卡片 - logger.warning(f"IoT误报标记失败,降级处理: alarm={alarm_id}, order={order_id}") - service = get_alarm_event_service() - service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE", - handle_status="IGNORED", handler=user_id, remark="企微误报(IoT降级)") - else: + logger.info(f"IoT工单已标记误报,等待sync-status回调: order={order_id}, by={user_id}") + return + + if alarm_id: service = get_alarm_event_service() service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE", handle_status="IGNORED", handler=user_id, remark="企微误报忽略") - # 降级时才更新卡片 if response_code: await wechat.update_alarm_card_terminal( response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="ignore", operator_name=user_id, + order_id=order_id, action="ignore", operator_name=user_id, ) - # ---- 已处理完成(step2按钮,保留兼容):调 IoT /submit ---- + # ---- 已处理完成(step2按钮):调 IoT /submit ---- elif action == "complete": - if order_id and wo_client.enabled: + if wo_client.enabled: success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}") if success: - logger.info(f"IoT工单已提交,等待sync-status回调: alarm={alarm_id}, order={order_id}") - return # IoT 回调 sync-status 会更新告警+卡片 - service = get_alarm_event_service() - service.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED", - handle_status="DONE", handler=user_id, remark="企微完成(IoT降级)") - else: + logger.info(f"IoT工单已提交,等待sync-status回调: order={order_id}") + return + + if alarm_id: service = get_alarm_event_service() service.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED", handle_status="DONE", handler=user_id, remark="企微已处理") - # 降级时才更新卡片 if response_code: await wechat.update_alarm_card_terminal( response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="complete", operator_name=user_id, + order_id=order_id, action="complete", operator_name=user_id, ) - # ---- 标记误报(step2按钮,保留兼容)---- + # ---- 标记误报(step2按钮)---- elif action == "false": - if order_id and wo_client.enabled: + if wo_client.enabled: success = await wo_client.false_alarm(order_id) if success: - logger.info(f"IoT工单已标记误报,等待sync-status回调: alarm={alarm_id}, order={order_id}") - return # IoT 回调 sync-status 会更新告警+卡片 - service = get_alarm_event_service() - service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE", - handle_status="IGNORED", handler=user_id, remark="企微误报(IoT降级)") - else: + logger.info(f"IoT工单已标记误报,等待sync-status回调: order={order_id}") + return + + if alarm_id: service = get_alarm_event_service() service.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE", handle_status="IGNORED", handler=user_id, remark="企微标记误报") - # 降级时才更新卡片 if response_code: await wechat.update_alarm_card_terminal( response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="false", operator_name=user_id, + order_id=order_id, action="false", operator_name=user_id, ) except Exception as e: logger.error(f"处理卡片按钮点击失败: {e}", exc_info=True) -def _get_order_id_for_alarm(alarm_id: str) -> str: - """从 alarm_event_ext 中获取关联的工单ID""" - from app.models import get_session, AlarmEventExt - - db = get_session() - try: - ext = db.query(AlarmEventExt).filter( - AlarmEventExt.alarm_id == alarm_id, - AlarmEventExt.ext_type == "WORK_ORDER", - ).first() - if ext and ext.ext_data: - return ext.ext_data.get("order_id", "") - return "" - except Exception as e: - logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}") - return "" - finally: - db.close() - - # ==================== Agent测试接口(开发用) ==================== @router.post("/agent/test") diff --git a/app/routers/wechat_notify_api.py b/app/routers/wechat_notify_api.py index 3813308..e4a546e 100644 --- a/app/routers/wechat_notify_api.py +++ b/app/routers/wechat_notify_api.py @@ -18,7 +18,7 @@ router = APIRouter(prefix="/api/wechat/notify", tags=["企微通知-IoT回调"]) class SendCardRequest(BaseModel): """IoT 派单后发送企微卡片""" - alarmId: str + alarmId: Optional[str] = "" orderId: str userIds: List[str] title: str @@ -31,7 +31,7 @@ class SendCardRequest(BaseModel): class SyncStatusRequest(BaseModel): """IoT 工单状态变更后同步""" - alarmId: str + alarmId: Optional[str] = "" orderId: str status: str # dispatched / confirmed / completed / false_alarm / auto_resolved operator: Optional[str] = "" @@ -107,28 +107,29 @@ async def send_card(request: Request): camera_name = req.cameraName or "" alarm_type_code = "" alarm_snapshot_key = "" - from app.models import get_session, AlarmEvent - from app.services.camera_name_service import get_camera_name_service - db = get_session() - try: - alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == req.alarmId).first() - if alarm: - alarm_type_code = alarm.alarm_type or "" - alarm_snapshot_key = alarm.snapshot_url or "" - if not camera_name or camera_name == "未知": - if alarm.device_id: - camera_service = get_camera_name_service() - camera_info = await camera_service.get_camera_info(alarm.device_id) - camera_name = camera_service.format_display_name(alarm.device_id, camera_info) - finally: - db.close() + if req.alarmId: + from app.models import get_session, AlarmEvent + from app.services.camera_name_service import get_camera_name_service + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == req.alarmId).first() + if alarm: + alarm_type_code = alarm.alarm_type or "" + alarm_snapshot_key = alarm.snapshot_url or "" + if not camera_name or camera_name == "未知": + if alarm.device_id: + camera_service = get_camera_name_service() + camera_info = await camera_service.get_camera_info(alarm.device_id) + camera_name = camera_service.format_display_name(alarm.device_id, camera_info) + finally: + db.close() # 截图预签名:优先用告警表的 object key(能正确签名) if not presigned_url or not snapshot_url: if alarm_snapshot_key: presigned_url = _get_presigned_url(alarm_snapshot_key) - logger.info(f"群聊截图诊断: alarm={req.alarmId}, " + logger.info(f"群聊截图诊断: order={req.orderId}, alarm={req.alarmId}, " f"iot_snapshot={req.snapshotUrl!r}, " f"db_snapshot={alarm_snapshot_key!r}, " f"presigned_url={presigned_url[:80] if presigned_url else '(空)'}...") @@ -138,7 +139,7 @@ async def send_card(request: Request): await wechat.send_group_alarm_combo( chat_id=group_chat_id, - alarm_id=req.alarmId, + alarm_id=req.alarmId or req.orderId, alarm_type=actual_alarm_type, area_name=req.areaName or "", camera_name=camera_name, @@ -149,9 +150,8 @@ async def send_card(request: Request): mention_user_ids=req.userIds, ) - # 私发卡片 - # 从告警表获取 alarm_type_code(避免重复"告警") - if not alarm_type_code: + # 私发卡片(以 order_id 为主键) + if not alarm_type_code and req.alarmId: from app.models import get_session, AlarmEvent db = get_session() try: @@ -162,17 +162,18 @@ async def send_card(request: Request): sent = await wechat.send_alarm_card( user_ids=req.userIds, - alarm_id=req.alarmId, + order_id=req.orderId, alarm_type=alarm_type_code or req.title, area_name=req.areaName or "", camera_name=camera_name if 'camera_name' in dir() else (req.cameraName or ""), description=f"工单编号:{req.orderId}", event_time=req.eventTime or "", alarm_level=req.level or 2, + alarm_id=req.alarmId or "", ) if sent: - logger.info(f"IoT回调发卡片成功: alarm={req.alarmId}, order={req.orderId}, users={req.userIds}") + logger.info(f"IoT回调发卡片成功: order={req.orderId}, alarm={req.alarmId}, users={req.userIds}") return {"code": 0, "msg": "success"} else: return {"code": -1, "msg": "发送失败"} @@ -197,49 +198,56 @@ async def sync_status(request: Request): service = get_alarm_event_service() wechat = get_wechat_service() + # 用 order_id 查 response_code 和 alarm_id + order_id = req.orderId + alarm_id = req.alarmId or wechat.get_alarm_id_for_order(order_id) + if req.status == "dispatched": # 派单:不更新告警,不发企微(企微由 send-card 接口发) - logger.info(f"dispatched 已记录: alarm={req.alarmId}, order={req.orderId}") + logger.info(f"dispatched 已记录: order={order_id}, alarm={alarm_id}") return {"code": 0, "msg": "success"} elif req.status == "confirmed": # 确认接单:更新卡片到第二步(不更新告警) if wechat.enabled: - response_code = wechat.get_response_code(req.alarmId) + response_code = wechat.get_response_code(order_id) if response_code: await wechat.update_alarm_card_step2( response_code=response_code, user_ids=[req.operator] if req.operator else [], - alarm_id=req.alarmId, + order_id=order_id, operator_name=req.operator, ) - logger.info(f"卡片已更新到步骤2: alarm={req.alarmId}") + logger.info(f"卡片已更新到步骤2: order={order_id}") return {"code": 0, "msg": "success"} elif req.status in ("completed", "false_alarm", "auto_resolved"): - # 终态:更新告警状态 + 更新卡片 + # 终态:更新告警状态(如果有关联告警)+ 更新卡片 terminal_map = { "completed": {"alarm_status": "CLOSED", "handle_status": "DONE", "card_action": "complete"}, "false_alarm": {"alarm_status": "FALSE", "handle_status": "IGNORED", "card_action": "false"}, "auto_resolved": {"alarm_status": "CLOSED", "handle_status": "DONE", "card_action": "auto_resolve"}, } mapping = terminal_map[req.status] - service.handle_alarm( - alarm_id=req.alarmId, - alarm_status=mapping["alarm_status"], - handle_status=mapping["handle_status"], - handler=req.operator or "", - remark=req.remark or f"IoT工单: {req.status}", - ) - logger.info(f"告警终态已同步: alarm={req.alarmId}, status={req.status}") + + # 仅在有关联告警时更新告警状态 + if alarm_id: + service.handle_alarm( + alarm_id=alarm_id, + alarm_status=mapping["alarm_status"], + handle_status=mapping["handle_status"], + handler=req.operator or "", + remark=req.remark or f"IoT工单: {req.status}", + ) + logger.info(f"告警终态已同步: alarm={alarm_id}, status={req.status}") if wechat.enabled: - response_code = wechat.get_response_code(req.alarmId) + response_code = wechat.get_response_code(order_id) if response_code: await wechat.update_alarm_card_terminal( response_code=response_code, user_ids=[req.operator] if req.operator else [], - alarm_id=req.alarmId, + order_id=order_id, action=mapping["card_action"], operator_name=req.operator, ) diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index 4fde2e0..0351ae3 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -257,9 +257,13 @@ async def _sync_work_order_and_card(alarm_id: str, alarm_status: str, operator: """前端操作后:优先调 IoT 工单 API,降级直接更新卡片""" try: from app.services.work_order_client import get_work_order_client + from app.services.wechat_service import get_wechat_service + wechat = get_wechat_service() wo_client = get_work_order_client() - order_id = _get_order_id_for_alarm(alarm_id) if wo_client.enabled else "" + + # 通过 wechat_card_state 查 order_id + order_id = wechat.get_order_id_for_alarm(alarm_id) if wo_client.enabled else "" if order_id and wo_client.enabled: if alarm_status == "CLOSED": @@ -267,45 +271,23 @@ async def _sync_work_order_and_card(alarm_id: str, alarm_status: str, operator: else: # FALSE success = await wo_client.false_alarm(order_id) if success: - logger.info(f"前端操作已同步IoT工单: alarm={alarm_id}, status={alarm_status}") + logger.info(f"前端操作已同步IoT工单: alarm={alarm_id}, order={order_id}, status={alarm_status}") return # IoT 回调 sync-status 会更新卡片 - # 降级:直接更新卡片 - from app.services.wechat_service import get_wechat_service - wechat = get_wechat_service() - if wechat.enabled: - response_code = wechat.get_response_code(alarm_id) + # 降级:直接更新卡片(用 order_id 查 response_code) + if wechat.enabled and order_id: + response_code = wechat.get_response_code(order_id) if response_code: action = "complete" if alarm_status == "CLOSED" else "false" await wechat.update_alarm_card_terminal( response_code=response_code, user_ids=[], - alarm_id=alarm_id, action=action, operator_name=operator, + order_id=order_id, action=action, operator_name=operator, ) - logger.info(f"降级直接更新卡片: alarm={alarm_id}, action={action}") + logger.info(f"降级直接更新卡片: order={order_id}, action={action}") except Exception as e: logger.error(f"同步工单/卡片失败: alarm={alarm_id}, error={e}", exc_info=True) -def _get_order_id_for_alarm(alarm_id: str) -> str: - """从 alarm_event_ext 中获取关联的工单ID""" - from app.models import get_session, AlarmEventExt - - db = get_session() - try: - ext = db.query(AlarmEventExt).filter( - AlarmEventExt.alarm_id == alarm_id, - AlarmEventExt.ext_type == "WORK_ORDER", - ).first() - if ext and ext.ext_data: - return ext.ext_data.get("order_id", "") - return "" - except Exception as e: - logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}") - return "" - finally: - db.close() - - @router.delete("/alert/delete") async def delete_alert( alarmId: Optional[str] = Query(None, description="告警ID"), diff --git a/app/services/agent/prompts.py b/app/services/agent/prompts.py index 8c4a5da..ecd3bd0 100644 --- a/app/services/agent/prompts.py +++ b/app/services/agent/prompts.py @@ -2,17 +2,22 @@ Agent Prompt 定义 """ -SYSTEM_PROMPT = """你是VSP安防AI助手,通过企业微信协助安保人员处理告警和工单。 +SYSTEM_PROMPT = """你是VSP物业AI助手,通过企业微信协助物业人员处理安保和保洁工单。 ## 能力(必须通过工具获取数据) -1. 查询告警统计和明细(query_alarm_stats / list_alarms / get_alarm_detail) -2. 处理告警(update_alarm_status:确认接单、忽略、处理完成、误报) -3. 提交工单处理结果(submit_order_result:含文字描述和处理后照片) -4. 查询待处理工单(list_my_orders) -5. 查询摄像头信息(query_camera) +1. 查询工单统计(query_order_stats)— 支持安保和保洁工单 +2. 查询工单列表(list_orders)— 按类型、状态、时间筛选 +3. 查看工单详情(get_order_detail)— 含告警信息、保洁信息 +4. 操作工单状态(update_order_status)— 确认接单、完成、误报、忽略 +5. 提交处理结果(submit_order_result)— 含文字描述和处理后照片 +6. 查询摄像头信息(query_camera) + +## 工单类型 +- SECURITY:安保工单(关联告警,含摄像头、告警类型等) +- CLEAN:保洁工单(含保洁类型、难度、预计时长等) ## 核心原则(严格遵守) -- 所有数据必须来自工具调用结果,绝对不要编造告警ID、告警数量、摄像头名称、时间等任何数据 +- 所有数据必须来自工具调用结果,绝对不要编造工单ID、数量、人员等任何数据 - 如果工具返回错误或未找到数据,如实告知用户,不要猜测或补充 - 不知道的事情直接说"我无法确认",不要推测 - 不要编造不存在的功能或操作 @@ -23,8 +28,7 @@ SYSTEM_PROMPT = """你是VSP安防AI助手,通过企业微信协助安保人 - 回复简洁,适合手机阅读 - 重要信息用【】标注 - 禁止使用markdown语法(如![](url)、**加粗**、# 标题),企微聊天不支持 -- 告警截图会自动发送图片消息,文字回复中不要包含图片链接 -- 用户问非安防相关问题时,简短回答"我只能协助处理安防告警和工单相关事务" +- 用户问非物业相关问题时,简短回答"我只能协助处理物业工单相关事务" """ IMAGE_ANALYZE_PROMPT = """你是物业安防图片分析员。分析这张图片,判断是否存在安全隐患或需要上报的情况。 diff --git a/app/services/agent/tools/__init__.py b/app/services/agent/tools/__init__.py index 292740a..6d904d2 100644 --- a/app/services/agent/tools/__init__.py +++ b/app/services/agent/tools/__init__.py @@ -2,18 +2,16 @@ 工具注册表:导出 all_tools 供图构建使用 """ -from .alarm_query import query_alarm_stats, list_alarms, get_alarm_detail -from .alarm_action import update_alarm_status -from .order_tools import list_my_orders, submit_order_result +from .order_query import query_order_stats, list_orders, get_order_detail +from .order_action import update_order_status, submit_order_result from .camera_tools import query_camera # 所有工具列表 — 添加新工具只需在这里追加 all_tools = [ - query_alarm_stats, - list_alarms, - get_alarm_detail, - update_alarm_status, - list_my_orders, + query_order_stats, + list_orders, + get_order_detail, + update_order_status, submit_order_result, query_camera, ] diff --git a/app/services/agent/tools/camera_tools.py b/app/services/agent/tools/camera_tools.py index 5865566..7fd38a0 100644 --- a/app/services/agent/tools/camera_tools.py +++ b/app/services/agent/tools/camera_tools.py @@ -5,7 +5,7 @@ import json from langchain_core.tools import tool -from .alarm_query import _get_camera_display_name +from .order_query import _get_camera_display_name @tool diff --git a/app/services/agent/tools/order_action.py b/app/services/agent/tools/order_action.py new file mode 100644 index 0000000..8b25c86 --- /dev/null +++ b/app/services/agent/tools/order_action.py @@ -0,0 +1,182 @@ +""" +工单操作工具:确认接单、完成、误报、提交处理结果 +""" + +import json +from typing import Optional, List +from langchain_core.tools import tool +from langchain_core.runnables import RunnableConfig + +from app.utils.logger import logger + + +async def _update_wechat_card(order_id: str, user_id: str, action: str): + """更新企微卡片状态(以 order_id 为 key)""" + try: + from app.services.wechat_service import get_wechat_service + wechat = get_wechat_service() + response_code = wechat.get_response_code(order_id) + if not response_code: + return + + if action == "confirm": + await wechat.update_alarm_card_step2( + response_code=response_code, user_ids=[user_id], + order_id=order_id, operator_name=user_id, + ) + else: + await wechat.update_alarm_card_terminal( + response_code=response_code, user_ids=[user_id], + order_id=order_id, action=action, operator_name=user_id, + ) + except Exception as e: + logger.error(f"更新企微卡片失败: order={order_id}, error={e}") + + +@tool +async def update_order_status(order_id: str, action: str, config: RunnableConfig) -> str: + """更新工单状态:确认接单(confirm)、处理完成(complete)、标记误报(false)、忽略(ignore) + + Args: + order_id: 工单ID(ops_order.id) + action: 操作类型 confirm=确认接单 complete=处理完成 false=标记误报 ignore=忽略 + """ + user_id = config.get("configurable", {}).get("user_id", "") + + from app.services.work_order_client import get_work_order_client + wo_client = get_work_order_client() + + if not wo_client.enabled: + return json.dumps({"error": "工单系统未启用"}, ensure_ascii=False) + + # IoT 工单操作 + iot_success = False + if action == "confirm": + iot_success = await wo_client.confirm_order(order_id) + elif action in ("ignore", "false"): + iot_success = await wo_client.false_alarm(order_id) + elif action == "complete": + iot_success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}") + + if not iot_success: + return json.dumps({"error": f"工单操作失败: order={order_id}, action={action}"}, ensure_ascii=False) + + # 有关联告警时同步告警状态 + from app.services.wechat_service import get_wechat_service + wechat = get_wechat_service() + alarm_id = wechat.get_alarm_id_for_order(order_id) + + if alarm_id: + action_map = { + "confirm": ("CONFIRMED", "HANDLING", "Agent确认接单"), + "ignore": ("FALSE", "IGNORED", "Agent忽略"), + "complete": ("CLOSED", "DONE", "Agent已处理"), + "false": ("FALSE", "IGNORED", "Agent标记误报"), + } + if action in action_map: + alarm_status, handle_status, remark = action_map[action] + from app.services.alarm_event_service import get_alarm_event_service + svc = get_alarm_event_service() + svc.handle_alarm( + alarm_id=alarm_id, alarm_status=alarm_status, + handle_status=handle_status, handler=user_id, remark=remark, + ) + + # 更新企微卡片 + await _update_wechat_card(order_id, user_id, action) + + action_labels = { + "confirm": "已确认接单", + "ignore": "已忽略", + "complete": "已处理完成", + "false": "已标记误报", + } + return json.dumps( + {"success": True, "message": f"{action_labels.get(action, action)}: 工单{order_id}"}, + ensure_ascii=False, + ) + + +@tool +async def submit_order_result( + order_id: str, + result_text: str, + config: RunnableConfig, + image_urls: Optional[List[str]] = None, +) -> str: + """提交工单处理结果(文字描述+处理后照片URL) + + Args: + order_id: 工单ID(ops_order.id) + result_text: 处理结果描述 + image_urls: 处理后照片URL列表(COS永久URL) + """ + user_id = config.get("configurable", {}).get("user_id", "") + + if image_urls is None: + image_urls = [] + + # 合并 session 中暂存的图片 + from app.services.session_manager import get_session_manager + session = get_session_manager().get(user_id) + if session.pending_images: + image_urls = session.pending_images + image_urls + session.pending_images = [] + + from app.services.work_order_client import get_work_order_client + wo_client = get_work_order_client() + + if wo_client.enabled: + success = await wo_client.submit_order( + order_id, + result=f"{result_text} by {user_id}", + result_img_urls=image_urls or None, + ) + if not success: + return json.dumps({"error": f"提交工单结果失败: order={order_id}"}, ensure_ascii=False) + + # 有关联告警时同步告警状态 + from app.services.wechat_service import get_wechat_service + wechat = get_wechat_service() + alarm_id = wechat.get_alarm_id_for_order(order_id) + + if alarm_id: + from app.services.alarm_event_service import get_alarm_event_service + svc = get_alarm_event_service() + remark = f"Agent结单: {result_text}" + if image_urls: + remark += f" (附{len(image_urls)}张图片)" + svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED", + handle_status="DONE", handler=user_id, remark=remark) + + # 持久化处理结果图片到 alarm_event_ext + if image_urls: + try: + from app.models import get_session as get_db_session, AlarmEventExt + db = get_db_session() + try: + ext = AlarmEventExt( + alarm_id=alarm_id, + ext_type="HANDLER_RESULT", + ext_data={"result_text": result_text, "image_urls": image_urls, "handler": user_id}, + ) + db.add(ext) + db.commit() + except Exception as e: + db.rollback() + logger.error(f"持久化处理结果图片失败: {e}") + finally: + db.close() + except Exception as e: + logger.error(f"保存处理结果失败: {e}") + + # 更新卡片到终态 + await _update_wechat_card(order_id, user_id, "complete") + + result = { + "success": True, + "message": f"工单已提交: {order_id}", + "result": result_text, + "images_count": len(image_urls), + } + return json.dumps(result, ensure_ascii=False) diff --git a/app/services/agent/tools/order_query.py b/app/services/agent/tools/order_query.py new file mode 100644 index 0000000..09e14d6 --- /dev/null +++ b/app/services/agent/tools/order_query.py @@ -0,0 +1,334 @@ +""" +工单查询工具:统计、列表、详情(查 IoT ops_order + 扩展表) +支持安保工单(SECURITY)和保洁工单(CLEAN) +""" + +import json +from datetime import timedelta +from typing import Optional +from langchain_core.tools import tool +from langchain_core.runnables import RunnableConfig + +from app.utils.logger import logger +from app.utils.timezone import beijing_now + + +# 告警类型中文映射 +ALARM_TYPE_NAMES = { + "leave_post": "人员离岗", "intrusion": "周界入侵", + "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵", +} + +# 工单状态映射 +ORDER_STATUS_NAMES = { + "PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗", + "PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消", +} + +# 工单优先级映射 +PRIORITY_NAMES = {0: "低", 1: "中", 2: "高"} + +# 保洁类型映射 +CLEANING_TYPE_NAMES = { + "ROUTINE": "日常保洁", "DEEP": "深度保洁", + "SPOT": "点状保洁", "EMERGENCY": "应急保洁", +} + + +def _parse_time_range(time_range: str): + """解析时间范围,返回 (start_time, label)""" + now = beijing_now() + if time_range == "week": + start = now - timedelta(days=now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + return start, "本周" + elif time_range == "month": + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + return start, "本月" + elif time_range == "yesterday": + start = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + end = now.replace(hour=0, minute=0, second=0, microsecond=0) + return start, "昨日" + else: + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + return start, "今日" + + +def _get_camera_display_name(device_id: str) -> str: + """同步获取摄像头显示名称""" + try: + import asyncio + from app.services.camera_name_service import get_camera_name_service + camera_service = get_camera_name_service() + loop = asyncio.get_event_loop() + if loop.is_running(): + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as pool: + cam_info = pool.submit( + asyncio.run, camera_service.get_camera_info(device_id) + ).result(timeout=5) + else: + cam_info = asyncio.run(camera_service.get_camera_info(device_id)) + return camera_service.format_display_name(device_id, cam_info) + except Exception: + return device_id + + +def _query_orders( + order_type: Optional[str] = None, + status: Optional[str] = None, + start_time=None, + end_time=None, + limit: int = 100, + assignee_name: Optional[str] = None, +): + """查询 IoT 工单(跨库只读)""" + from app.models_iot import get_iot_session, IotOpsOrder + db = get_iot_session() + try: + q = db.query(IotOpsOrder).filter(IotOpsOrder.deleted == 0) + if order_type and order_type != "ALL": + q = q.filter(IotOpsOrder.order_type == order_type) + if status: + q = q.filter(IotOpsOrder.status == status) + if start_time: + q = q.filter(IotOpsOrder.create_time >= start_time) + if end_time: + q = q.filter(IotOpsOrder.create_time < end_time) + if assignee_name: + q = q.filter(IotOpsOrder.assignee_name.contains(assignee_name)) + + total = q.count() + orders = q.order_by(IotOpsOrder.create_time.desc()).limit(limit).all() + + # 提取所有 order_id 用于关联查询扩展表 + order_ids = [o.id for o in orders] + + # 批量查安保扩展 + sec_ext_map = {} + if order_ids: + from app.models_iot import IotOpsOrderSecurityExt + sec_exts = db.query(IotOpsOrderSecurityExt).filter( + IotOpsOrderSecurityExt.ops_order_id.in_(order_ids), + IotOpsOrderSecurityExt.deleted == 0, + ).all() + sec_ext_map = {e.ops_order_id: e for e in sec_exts} + + # 批量查保洁扩展 + clean_ext_map = {} + if order_ids: + from app.models_iot import IotOpsOrderCleanExt + clean_exts = db.query(IotOpsOrderCleanExt).filter( + IotOpsOrderCleanExt.ops_order_id.in_(order_ids), + IotOpsOrderCleanExt.deleted == 0, + ).all() + clean_ext_map = {e.ops_order_id: e for e in clean_exts} + + return orders, total, sec_ext_map, clean_ext_map + except Exception as e: + logger.error(f"查询IoT工单失败: {e}", exc_info=True) + return [], 0, {}, {} + finally: + db.close() + + +@tool +def query_order_stats(time_range: str = "today", order_type: str = "ALL") -> str: + """查询工单统计数据(总数、按状态分布、按类型分布) + + Args: + time_range: 时间范围 today=今日 week=本周 month=本月 yesterday=昨日 + order_type: 工单类型筛选 SECURITY=安保 CLEAN=保洁 ALL=全部 + """ + start, range_label = _parse_time_range(time_range) + now = beijing_now() + end = now + if time_range == "yesterday": + end = now.replace(hour=0, minute=0, second=0, microsecond=0) + + orders, total, sec_ext_map, clean_ext_map = _query_orders( + order_type=order_type if order_type != "ALL" else None, + start_time=start, end_time=end, limit=10000, + ) + + # 按状态统计 + status_count = {} + for o in orders: + s = o.status or "PENDING" + status_count[s] = status_count.get(s, 0) + 1 + + # 按类型统计 + type_count = {"SECURITY": 0, "CLEAN": 0} + alarm_type_count = {} + for o in orders: + ot = o.order_type or "SECURITY" + type_count[ot] = type_count.get(ot, 0) + 1 + # 安保工单细分告警类型 + sec_ext = sec_ext_map.get(o.id) + if sec_ext and sec_ext.alarm_type: + alarm_type_count[sec_ext.alarm_type] = alarm_type_count.get(sec_ext.alarm_type, 0) + 1 + + # 误报统计(安保特有) + false_alarm_count = sum( + 1 for e in sec_ext_map.values() if e.false_alarm == 1 + ) + + result = { + "range": range_label, + "total": total, + "by_status": {ORDER_STATUS_NAMES.get(s, s): c for s, c in status_count.items()}, + "by_order_type": {k: v for k, v in type_count.items() if v > 0}, + "by_alarm_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in alarm_type_count.items()} if alarm_type_count else {}, + "false_alarm_count": false_alarm_count, + } + return json.dumps(result, ensure_ascii=False) + + +@tool +def list_orders( + time_range: str = "today", + order_type: str = "ALL", + status: str = "", + limit: int = 10, + assigned_to_me: bool = False, + config: RunnableConfig = None, +) -> str: + """查询工单列表,返回最近的工单记录 + + Args: + time_range: 时间范围 today/week/month/yesterday + order_type: 工单类型 SECURITY=安保 CLEAN=保洁 ALL=全部 + status: 状态筛选 PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED + limit: 返回条数,默认10,最多20 + assigned_to_me: 是否只看我的工单 + """ + start, range_label = _parse_time_range(time_range) + now = beijing_now() + end = now + if time_range == "yesterday": + end = now.replace(hour=0, minute=0, second=0, microsecond=0) + + limit = min(limit, 20) + + # 获取当前用户 + user_id = "" + if config and assigned_to_me: + user_id = config.get("configurable", {}).get("user_id", "") + + orders, total, sec_ext_map, clean_ext_map = _query_orders( + order_type=order_type if order_type != "ALL" else None, + status=status or None, + start_time=start, end_time=end, limit=limit, + ) + + items = [] + for o in orders: + create_time = "" + if o.create_time: + try: + create_time = o.create_time.strftime("%m-%d %H:%M") + except Exception: + create_time = str(o.create_time)[:16] + + item = { + "order_id": str(o.id), + "order_code": o.order_code or "", + "type": o.order_type or "SECURITY", + "title": o.title or "", + "status": ORDER_STATUS_NAMES.get(o.status, o.status or "待处理"), + "priority": PRIORITY_NAMES.get(o.priority, "中"), + "assignee": o.assignee_name or "", + "time": create_time, + } + + # 安保扩展 + sec_ext = sec_ext_map.get(o.id) + if sec_ext: + item["alarm_type"] = ALARM_TYPE_NAMES.get(sec_ext.alarm_type, sec_ext.alarm_type or "") + item["camera"] = sec_ext.camera_name or _get_camera_display_name(sec_ext.camera_id) if sec_ext.camera_id else "" + item["false_alarm"] = bool(sec_ext.false_alarm) + + # 保洁扩展 + clean_ext = clean_ext_map.get(o.id) + if clean_ext: + item["cleaning_type"] = CLEANING_TYPE_NAMES.get(clean_ext.cleaning_type, clean_ext.cleaning_type or "") + item["difficulty"] = clean_ext.difficulty_level + + items.append(item) + + result = {"range": range_label, "total": total, "items": items} + return json.dumps(result, ensure_ascii=False) + + +@tool +def get_order_detail(order_id: str, config: RunnableConfig) -> str: + """查询单条工单的详细信息 + + Args: + order_id: 工单ID(ops_order.id) + """ + from app.models_iot import get_iot_session, IotOpsOrder, IotOpsOrderSecurityExt, IotOpsOrderCleanExt + db = get_iot_session() + try: + order = db.query(IotOpsOrder).filter( + IotOpsOrder.id == int(order_id), + IotOpsOrder.deleted == 0, + ).first() + if not order: + return json.dumps({"error": f"未找到工单: {order_id}"}, ensure_ascii=False) + + result = { + "order_id": str(order.id), + "order_code": order.order_code or "", + "order_type": order.order_type or "", + "title": order.title or "", + "description": order.description or "", + "status": ORDER_STATUS_NAMES.get(order.status, order.status or ""), + "priority": PRIORITY_NAMES.get(order.priority, "中"), + "assignee": order.assignee_name or "", + "location": order.location or "", + "create_time": order.create_time.strftime("%Y-%m-%d %H:%M:%S") if order.create_time else "", + } + + # 安保扩展 + sec_ext = db.query(IotOpsOrderSecurityExt).filter( + IotOpsOrderSecurityExt.ops_order_id == order.id, + IotOpsOrderSecurityExt.deleted == 0, + ).first() + if sec_ext: + result["alarm_id"] = sec_ext.alarm_id or "" + result["alarm_type"] = ALARM_TYPE_NAMES.get(sec_ext.alarm_type, sec_ext.alarm_type or "") + result["camera_name"] = sec_ext.camera_name or ( + _get_camera_display_name(sec_ext.camera_id) if sec_ext.camera_id else "" + ) + result["has_image"] = bool(sec_ext.image_url) + result["false_alarm"] = bool(sec_ext.false_alarm) + result["result"] = sec_ext.result or "" + if sec_ext.dispatched_time: + result["dispatched_time"] = sec_ext.dispatched_time.strftime("%Y-%m-%d %H:%M:%S") + if sec_ext.confirmed_time: + result["confirmed_time"] = sec_ext.confirmed_time.strftime("%Y-%m-%d %H:%M:%S") + if sec_ext.completed_time: + result["completed_time"] = sec_ext.completed_time.strftime("%Y-%m-%d %H:%M:%S") + + # 保洁扩展 + clean_ext = db.query(IotOpsOrderCleanExt).filter( + IotOpsOrderCleanExt.ops_order_id == order.id, + IotOpsOrderCleanExt.deleted == 0, + ).first() + if clean_ext: + result["cleaning_type"] = CLEANING_TYPE_NAMES.get(clean_ext.cleaning_type, clean_ext.cleaning_type or "") + result["difficulty"] = clean_ext.difficulty_level + result["expected_duration"] = clean_ext.expected_duration + result["is_auto"] = bool(clean_ext.is_auto) + if clean_ext.arrived_time: + result["arrived_time"] = clean_ext.arrived_time.strftime("%Y-%m-%d %H:%M:%S") + if clean_ext.completed_time: + result["clean_completed_time"] = clean_ext.completed_time.strftime("%Y-%m-%d %H:%M:%S") + + return json.dumps(result, ensure_ascii=False) + except Exception as e: + logger.error(f"查询工单详情失败: {e}", exc_info=True) + return json.dumps({"error": f"查询失败: {e}"}, ensure_ascii=False) + finally: + db.close() diff --git a/app/services/daily_report_service.py b/app/services/daily_report_service.py index 95e3c9e..1bd4bc8 100644 --- a/app/services/daily_report_service.py +++ b/app/services/daily_report_service.py @@ -1,8 +1,8 @@ """ -每日告警日报定时推送服务 +每日工单日报定时推送服务 -每天定时生成前一天的告警汇总,发送到企微群聊。 -支持按边缘节点分组统计。 +每天定时生成前一天的工单汇总,发送到企微群聊。 +数据源:IoT ops_order + 安保/保洁扩展表。 """ import asyncio from collections import Counter, defaultdict @@ -12,7 +12,7 @@ from typing import Dict, List, Optional from app.utils.logger import logger from app.config import settings -# 告警类型中文映射(与 wechat_service 保持一致) +# 告警类型中文映射 ALARM_TYPE_NAMES = { "leave_post": "人员离岗", "intrusion": "周界入侵", @@ -20,23 +20,21 @@ ALARM_TYPE_NAMES = { "vehicle_congestion": "车辆拥堵", } +# 保洁类型映射 +CLEANING_TYPE_NAMES = { + "ROUTINE": "日常保洁", "DEEP": "深度保洁", + "SPOT": "点状保洁", "EMERGENCY": "应急保洁", +} + +# 工单状态映射 +ORDER_STATUS_NAMES = { + "PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗", + "PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消", +} + WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] -def _get_edge_name_map() -> Dict[str, str]: - """从 EdgeDevice 表获取 edge_node_id → device_name 映射""" - from app.models import get_session, EdgeDevice - db = get_session() - try: - devices = db.query(EdgeDevice.device_id, EdgeDevice.device_name).all() - return {d.device_id: (d.device_name or d.device_id) for d in devices} - except Exception as e: - logger.warning(f"查询边缘设备名称失败: {e}") - return {} - finally: - db.close() - - def _format_resp_time(minutes: float) -> str: """格式化响应时长""" if minutes < 60: @@ -45,74 +43,113 @@ def _format_resp_time(minutes: float) -> str: async def generate_daily_report() -> Optional[str]: - """生成昨日告警日报 Markdown 内容(含按边缘节点分组)""" - from app.services.alarm_event_service import get_alarm_event_service - from app.services.camera_name_service import get_camera_name_service + """生成昨日工单日报 Markdown 内容""" from app.utils.timezone import beijing_now - svc = get_alarm_event_service() - camera_svc = get_camera_name_service() - now = beijing_now() today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) yesterday_start = today_start - timedelta(days=1) day_before_start = today_start - timedelta(days=2) - # 查询昨日和前日全量告警 - yesterday_alarms, yesterday_total = svc.get_alarms( - start_time=yesterday_start, end_time=today_start, page=1, page_size=10000 - ) - _, prev_total = svc.get_alarms( - start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1 - ) - date_str = yesterday_start.strftime("%m-%d") weekday = WEEKDAY_NAMES[yesterday_start.weekday()] - # 无告警时发送简短通知 - if yesterday_total == 0: - return ( - f"**AI安防日报 — {date_str}({weekday})**\n\n" - f">昨日告警总计:**0** 条\n" - f">系统运行正常,无异常事件" + # 查询 IoT 工单 + try: + from app.models_iot import ( + get_iot_session, IotOpsOrder, + IotOpsOrderSecurityExt, IotOpsOrderCleanExt, ) + except Exception as e: + logger.error(f"IoT数据库不可用,日报生成失败: {e}") + return None - # ---- 全局统计 ---- - type_counter: Counter = Counter() - device_counter: Counter = Counter() - handle_done = 0 - handle_ignored = 0 - handle_unhandled = 0 - false_alarm = 0 + db = get_iot_session() + try: + # 昨日工单 + yesterday_orders = db.query(IotOpsOrder).filter( + IotOpsOrder.create_time >= yesterday_start, + IotOpsOrder.create_time < today_start, + IotOpsOrder.deleted == 0, + ).all() + yesterday_total = len(yesterday_orders) + + # 前日工单(用于环比) + prev_total = db.query(IotOpsOrder).filter( + IotOpsOrder.create_time >= day_before_start, + IotOpsOrder.create_time < yesterday_start, + IotOpsOrder.deleted == 0, + ).count() + + if yesterday_total == 0: + return ( + f"**物业工单日报 — {date_str}({weekday})**\n\n" + f">昨日工单总计:**0** 条\n" + f">系统运行正常,无工单" + ) + + # 收集 order_ids + order_ids = [o.id for o in yesterday_orders] + + # 批量查安保扩展 + sec_ext_map = {} + sec_exts = db.query(IotOpsOrderSecurityExt).filter( + IotOpsOrderSecurityExt.ops_order_id.in_(order_ids), + IotOpsOrderSecurityExt.deleted == 0, + ).all() + sec_ext_map = {e.ops_order_id: e for e in sec_exts} + + # 批量查保洁扩展 + clean_ext_map = {} + clean_exts = db.query(IotOpsOrderCleanExt).filter( + IotOpsOrderCleanExt.ops_order_id.in_(order_ids), + IotOpsOrderCleanExt.deleted == 0, + ).all() + clean_ext_map = {e.ops_order_id: e for e in clean_exts} + + except Exception as e: + logger.error(f"查询IoT工单失败: {e}", exc_info=True) + return None + finally: + db.close() + + # ---- 统计 ---- + type_count = {"SECURITY": 0, "CLEAN": 0} + status_count = Counter() + alarm_type_count = Counter() + camera_counter = Counter() + false_alarm_count = 0 response_times: List[float] = [] + cleaning_type_count = Counter() - # ---- 按边缘节点分组 ---- - edge_groups: Dict[str, List] = defaultdict(list) + for o in yesterday_orders: + ot = o.order_type or "SECURITY" + type_count[ot] = type_count.get(ot, 0) + 1 + status_count[o.status or "PENDING"] += 1 - for a in yesterday_alarms: - type_counter[a.alarm_type] += 1 - device_counter[a.device_id] += 1 + # 安保统计 + sec_ext = sec_ext_map.get(o.id) + if sec_ext: + if sec_ext.alarm_type: + alarm_type_count[sec_ext.alarm_type] += 1 + if sec_ext.camera_name: + camera_counter[sec_ext.camera_name] += 1 + elif sec_ext.camera_id: + camera_counter[sec_ext.camera_id] += 1 + if sec_ext.false_alarm == 1: + false_alarm_count += 1 + # 响应时长:dispatched → confirmed + if sec_ext.dispatched_time and sec_ext.confirmed_time: + delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0 + if 0 <= delta <= 360: + response_times.append(delta) - # 按 edge_node_id 分组(无 edge_node_id 归入 "unknown") - edge_key = a.edge_node_id or "unknown" - edge_groups[edge_key].append(a) + # 保洁统计 + clean_ext = clean_ext_map.get(o.id) + if clean_ext and clean_ext.cleaning_type: + cleaning_type_count[clean_ext.cleaning_type] += 1 - if a.handle_status == "DONE": - handle_done += 1 - elif a.handle_status == "IGNORED": - handle_ignored += 1 - else: - handle_unhandled += 1 - - if a.alarm_status == "FALSE": - false_alarm += 1 - - if a.handled_at and a.event_time: - delta = (a.handled_at - a.event_time).total_seconds() / 60.0 - if 0 <= delta <= 360: # 排除 >6h 异常值 - response_times.append(delta) - - # 环比变化 + # 环比 if prev_total > 0: change_pct = (yesterday_total - prev_total) / prev_total * 100 if change_pct > 0: @@ -122,95 +159,62 @@ async def generate_daily_report() -> Optional[str]: else: change_str = f"前日{prev_total}条,持平" else: - change_str = "前日无告警" + change_str = "前日无工单" # 平均响应时长 resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据" - # 设备 Top5 — 批量获取摄像头名称 - top5_devices = device_counter.most_common(5) - all_device_ids = list({d[0] for d in top5_devices}) - # 也收集各边缘节点的 top 设备 ID - for edge_key, alarms in edge_groups.items(): - edge_dev_counter = Counter(a.device_id for a in alarms) - for dev_id, _ in edge_dev_counter.most_common(3): - if dev_id not in all_device_ids: - all_device_ids.append(dev_id) - - try: - name_map = await camera_svc.get_display_names_batch(all_device_ids) - except Exception as e: - logger.warning(f"日报获取摄像头名称失败: {e}") - name_map = {} - - # 边缘设备名称映射 - edge_name_map = _get_edge_name_map() + # 待处理数量 + pending_count = sum( + 1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED") + ) + completed_count = status_count.get("COMPLETED", 0) + cancelled_count = status_count.get("CANCELLED", 0) # ==================== 组装 Markdown ==================== lines = [ - f"**AI安防日报 — {date_str}({weekday})**", + f"**物业工单日报 — {date_str}({weekday})**", "", - f">昨日告警总计:{yesterday_total} 条({change_str})", - f">待处理:{handle_unhandled} 条", - f">已处理:{handle_done}条 | 已忽略:{handle_ignored}条 | 误报:{false_alarm}条", - f">平均响应:{resp_str}", + f">昨日工单总计:{yesterday_total} 条({change_str})", ] - # 按类型分布 - if type_counter: + # 按工单类型 + sec_count = type_count.get("SECURITY", 0) + clean_count = type_count.get("CLEAN", 0) + if sec_count and clean_count: + lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}条") + elif sec_count: + lines.append(f">安保工单:{sec_count}条") + elif clean_count: + lines.append(f">保洁工单:{clean_count}条") + + lines.append(f">待处理:{pending_count} 条 | " + f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}条") + lines.append(f">平均响应:{resp_str}") + + # 安保告警类型分布 + if alarm_type_count: lines.append("") - lines.append("**按类型分布**") - for alarm_type, count in type_counter.most_common(): + lines.append("**安保告警类型分布**") + for alarm_type, count in alarm_type_count.most_common(): type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) lines.append(f">{type_name}:{count}条") - # 设备 Top5 - if top5_devices: + # 保洁类型分布 + if cleaning_type_count: lines.append("") - lines.append("**告警设备 Top5**") - for i, (device_id, count) in enumerate(top5_devices, 1): - display_name = name_map.get(device_id, device_id) - lines.append(f">{i}. {display_name} — {count}条") + lines.append("**保洁类型分布**") + for ct, count in cleaning_type_count.most_common(): + ct_name = CLEANING_TYPE_NAMES.get(ct, ct) + lines.append(f">{ct_name}:{count}条") - # ---- 按边缘节点分组汇总 ---- - if len(edge_groups) > 1 or (len(edge_groups) == 1 and "unknown" not in edge_groups): + # 摄像头 Top5 + top5_cameras = camera_counter.most_common(5) + if top5_cameras: lines.append("") - lines.append("**各边缘节点汇总**") - - # 按告警数降序排列 - sorted_edges = sorted(edge_groups.items(), key=lambda x: len(x[1]), reverse=True) - - for edge_key, alarms in sorted_edges: - edge_name = edge_name_map.get(edge_key, edge_key) - if edge_key == "unknown": - edge_name = "未知节点" - - edge_total = len(alarms) - edge_type_counter = Counter(a.alarm_type for a in alarms) - edge_unhandled = sum(1 for a in alarms if a.handle_status not in ("DONE", "IGNORED")) - edge_false = sum(1 for a in alarms if a.alarm_status == "FALSE") - - # 类型分布简写 - type_parts = [] - for at, ac in edge_type_counter.most_common(): - type_parts.append(f"{ALARM_TYPE_NAMES.get(at, at)}{ac}条") - type_summary = "、".join(type_parts) - - # 该节点的 Top3 摄像头 - edge_dev_counter = Counter(a.device_id for a in alarms) - top3 = edge_dev_counter.most_common(3) - cam_parts = [] - for dev_id, dev_count in top3: - cam_name = name_map.get(dev_id, dev_id) - cam_parts.append(f"{cam_name}({dev_count})") - cam_summary = "、".join(cam_parts) - - lines.append(f"") - lines.append(f">{edge_name}:{edge_total} 条") - lines.append(f"> 类型:{type_summary}") - if edge_unhandled > 0: - lines.append(f"> 待处理:{edge_unhandled} 条 误报:{edge_false}条") - lines.append(f"> Top设备:{cam_summary}") + lines.append("**告警摄像头 Top5**") + for i, (cam_name, count) in enumerate(top5_cameras, 1): + lines.append(f">{i}. {cam_name} — {count}条") return "\n".join(lines) diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index eeed834..4a6bd14 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -404,13 +404,22 @@ def _get_permanent_url(snapshot_url: str) -> str: def _save_order_id(alarm_id: str, order_id: str): - """将工单ID保存到 alarm_event_ext(ext_type=WORK_ORDER)""" + """将工单ID保存到 wechat_card_state + alarm_event_ext""" db = get_session() try: + # 写 wechat_card_state(order_id ↔ alarm_id 映射) + from app.models import WechatCardState + card_state = WechatCardState( + order_id=str(order_id), + alarm_id=alarm_id, + ) + db.merge(card_state) + + # 写 alarm_event_ext(保持兼容) ext = AlarmEventExt( alarm_id=alarm_id, ext_type="WORK_ORDER", - ext_data={"order_id": order_id}, + ext_data={"order_id": str(order_id)}, ) db.add(ext) db.commit() diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py index 5d512d4..034cdf4 100644 --- a/app/services/wechat_service.py +++ b/app/services/wechat_service.py @@ -42,7 +42,7 @@ class WeChatService: self._token_expire_at = 0 self._service_base_url = "" # 缓存 response_code,用于更新卡片状态 - # key: task_id (alarm_id), value: response_code + # key: order_id, value: response_code self._response_codes: Dict[str, str] = {} def init(self, config): @@ -94,53 +94,85 @@ class WeChatService: logger.info("企微 access_token 已更新") return self._access_token - def save_response_code(self, task_id: str, response_code: str): - """保存卡片的 response_code(内存缓存 + 数据库持久化)""" - self._response_codes[task_id] = response_code + def save_response_code(self, order_id: str, response_code: str, alarm_id: str = ""): + """保存卡片的 response_code 到 wechat_card_state(以 order_id 为主键)""" + self._response_codes[order_id] = response_code try: - from app.models import get_session, AlarmEventExt + from app.models import get_session, WechatCardState db = get_session() try: - ext = db.query(AlarmEventExt).filter( - AlarmEventExt.alarm_id == task_id, - AlarmEventExt.ext_type == "WECHAT_RESPONSE_CODE", + state = db.query(WechatCardState).filter( + WechatCardState.order_id == order_id, ).first() - if ext: - ext.ext_data = {"response_code": response_code} + if state: + state.response_code = response_code + if alarm_id: + state.alarm_id = alarm_id else: - ext = AlarmEventExt( - alarm_id=task_id, - ext_type="WECHAT_RESPONSE_CODE", - ext_data={"response_code": response_code}, + state = WechatCardState( + order_id=order_id, + response_code=response_code, + alarm_id=alarm_id or None, ) - db.add(ext) + db.add(state) db.commit() finally: db.close() except Exception as e: logger.warning(f"持久化 response_code 失败: {e}") - def get_response_code(self, task_id: str) -> Optional[str]: - """获取 response_code(优先内存缓存,回退数据库查询)""" - code = self._response_codes.pop(task_id, None) + def get_response_code(self, order_id: str) -> Optional[str]: + """获取 response_code(优先内存缓存,回退数据库查询 wechat_card_state)""" + code = self._response_codes.pop(order_id, None) if code: return code try: - from app.models import get_session, AlarmEventExt + from app.models import get_session, WechatCardState db = get_session() try: - ext = db.query(AlarmEventExt).filter( - AlarmEventExt.alarm_id == task_id, - AlarmEventExt.ext_type == "WECHAT_RESPONSE_CODE", + state = db.query(WechatCardState).filter( + WechatCardState.order_id == order_id, ).first() - if ext and ext.ext_data: - return ext.ext_data.get("response_code", "") + if state and state.response_code: + return state.response_code finally: db.close() except Exception as e: logger.warning(f"查询 response_code 失败: {e}") return None + def get_alarm_id_for_order(self, order_id: str) -> str: + """从 wechat_card_state 查 order_id → alarm_id""" + try: + from app.models import get_session, WechatCardState + db = get_session() + try: + state = db.query(WechatCardState).filter( + WechatCardState.order_id == order_id, + ).first() + return state.alarm_id or "" if state else "" + finally: + db.close() + except Exception as e: + logger.warning(f"查询 alarm_id 失败: order={order_id}, error={e}") + return "" + + def get_order_id_for_alarm(self, alarm_id: str) -> str: + """从 wechat_card_state 查 alarm_id → order_id""" + try: + from app.models import get_session, WechatCardState + db = get_session() + try: + state = db.query(WechatCardState).filter( + WechatCardState.alarm_id == alarm_id, + ).first() + return state.order_id or "" if state else "" + finally: + db.close() + except Exception as e: + logger.warning(f"查询 order_id 失败: alarm={alarm_id}, error={e}") + return "" + # ==================== 媒体上传 ==================== async def upload_media(self, image_data: bytes, filename: str = "alarm.jpg") -> Optional[str]: @@ -231,13 +263,14 @@ class WeChatService: async def send_alarm_card( self, user_ids: List[str], - alarm_id: str, + order_id: str, alarm_type: str, area_name: str, camera_name: str, description: str, event_time: str, alarm_level: int = 2, + alarm_id: str = "", ) -> bool: """ 发送按钮交互型模板卡片(个人消息) @@ -261,7 +294,7 @@ class WeChatService: "agentid": self.agent_id_int, "template_card": { "card_type": "button_interaction", - "task_id": alarm_id, + "task_id": order_id, "source": { "desc": "AI安防告警", "desc_color": 3 if alarm_level <= 1 else 0, @@ -279,18 +312,18 @@ class WeChatService: ], "card_action": { "type": 1, - "url": self._alarm_detail_url(alarm_id) or "https://work.weixin.qq.com", + "url": self._alarm_detail_url(alarm_id or order_id) or "https://work.weixin.qq.com", }, "button_list": [ { "text": "确认接单", "style": 1, - "key": f"confirm_{alarm_id}", + "key": f"confirm_{order_id}", }, { "text": "误报忽略", "style": 2, - "key": f"ignore_{alarm_id}", + "key": f"ignore_{order_id}", }, ], }, @@ -307,9 +340,9 @@ class WeChatService: response_code = data.get("response_code", "") if response_code: - self.save_response_code(alarm_id, response_code) + self.save_response_code(order_id, response_code, alarm_id=alarm_id) - logger.info(f"企微卡片已发送: alarm={alarm_id}, users={user_ids}") + logger.info(f"企微卡片已发送: order={order_id}, alarm={alarm_id}, users={user_ids}") return True except Exception as e: @@ -320,7 +353,7 @@ class WeChatService: self, response_code: str, user_ids: List[str], - alarm_id: str, + order_id: str, operator_name: str = "", ) -> bool: """ @@ -341,7 +374,7 @@ class WeChatService: "response_code": response_code, "template_card": { "card_type": "button_interaction", - "task_id": alarm_id, + "task_id": order_id, "source": { "desc": "AI安防工单 - 处理中", "desc_color": 1, @@ -352,18 +385,18 @@ class WeChatService: "sub_title_text": "请在对话框中回复处理结果,或点击按钮快速操作", "card_action": { "type": 1, - "url": self._alarm_detail_url(alarm_id) or "https://work.weixin.qq.com", + "url": self._alarm_detail_url(order_id) or "https://work.weixin.qq.com", }, "button_list": [ { "text": "已处理完成", "style": 1, - "key": f"complete_{alarm_id}", + "key": f"complete_{order_id}", }, { "text": "标记误报", "style": 2, - "key": f"false_{alarm_id}", + "key": f"false_{order_id}", }, ], }, @@ -378,7 +411,7 @@ class WeChatService: logger.error(f"更新卡片到步骤2失败: {data}") return False - logger.info(f"卡片已更新到步骤2: alarm={alarm_id}, operator={operator_name}") + logger.info(f"卡片已更新到步骤2: order={order_id}, operator={operator_name}") return True except Exception as e: @@ -389,7 +422,7 @@ class WeChatService: self, response_code: str, user_ids: List[str], - alarm_id: str, + order_id: str, action: str, operator_name: str = "", ) -> bool: @@ -435,7 +468,7 @@ class WeChatService: logger.error(f"更新卡片终态失败: {data}") return False - logger.info(f"卡片已更新到终态: alarm={alarm_id}, action={action}") + logger.info(f"卡片已更新到终态: order={order_id}, action={action}") return True except Exception as e: