diff --git a/app/routers/wechat_callback.py b/app/routers/wechat_callback.py index 9838cec..a6246e5 100644 --- a/app/routers/wechat_callback.py +++ b/app/routers/wechat_callback.py @@ -50,6 +50,15 @@ async def get_alarm_detail(alarm_id: str = Query(..., description="告警ID")): if analysis: vlm_desc = analysis.summary or "" + # snapshot_url 可能是 COS key,需转为可访问的预签名 URL + snapshot_url = alarm.snapshot_url or "" + if snapshot_url and not snapshot_url.startswith("http"): + try: + from app.services.oss_storage import get_oss_storage + snapshot_url = get_oss_storage().get_presigned_url(snapshot_url) + except Exception: + pass + return YudaoResponse.success({ "alarm_id": alarm.alarm_id, "alarm_type": alarm.alarm_type, @@ -59,7 +68,7 @@ async def get_alarm_detail(alarm_id: str = Query(..., description="告警ID")): "alarm_level": alarm.alarm_level, "alarm_status": alarm.alarm_status, "handle_status": alarm.handle_status, - "snapshot_url": alarm.snapshot_url or "", + "snapshot_url": snapshot_url, "handler": alarm.handler or "", "handle_remark": alarm.handle_remark or "", "vlm_description": vlm_desc, @@ -160,17 +169,22 @@ async def wechat_agent_message( logger.error(f"企微消息解密失败: {e}") return PlainTextResponse(content="success") - # 只处理文本消息 - if msg.get("MsgType") != "text": + msg_type = msg.get("MsgType", "") + event = msg.get("Event", "") + + # ---- 模板卡片按钮点击事件 ---- + if msg_type == "event" and event == "template_card_event": + asyncio.create_task(_process_card_button_click(msg)) return PlainTextResponse(content="success") - user_id = msg.get("FromUserName", "") - content = msg.get("Content", "") + # ---- 文本消息 → Agent 对话 ---- + if msg_type == "text": + user_id = msg.get("FromUserName", "") + content = msg.get("Content", "") + logger.info(f"收到企微消息: user={user_id}, content={content[:50]}") + asyncio.create_task(_process_agent_message(user_id, content)) + return PlainTextResponse(content="success") - logger.info(f"收到企微消息: user={user_id}, content={content[:50]}") - - # 异步处理(企微要求5秒内响应),通过主动消息回复 - asyncio.create_task(_process_agent_message(user_id, content)) return PlainTextResponse(content="success") @@ -189,6 +203,84 @@ async def _process_agent_message(user_id: str, content: str): logger.error(f"Agent消息处理失败: user={user_id}, error={e}", exc_info=True) +async def _process_card_button_click(msg: dict): + """ + 处理模板卡片按钮点击事件 + + 企微回调 XML 解密后包含: + - FromUserName: 点击者 userid + - EventKey: 按钮 key (handle_{alarm_id} / ignore_{alarm_id}) + - TaskId: 卡片的 task_id (alarm_id) + - ResponseCode: 用于更新卡片状态(一次性) + """ + try: + from app.services.wechat_service import get_wechat_service + + user_id = msg.get("FromUserName", "") + event_key = msg.get("EventKey", "") + task_id = msg.get("TaskId", "") + response_code = msg.get("ResponseCode", "") + + logger.info( + f"卡片按钮点击: user={user_id}, key={event_key}, " + f"task={task_id}" + ) + + # 解析 action 和 alarm_id + if event_key.startswith("handle_"): + action = "handle" + alarm_id = event_key[len("handle_"):] + elif event_key.startswith("ignore_"): + action = "ignore" + alarm_id = event_key[len("ignore_"):] + elif event_key.startswith("done_"): + # 已处理状态的按钮,忽略 + return + else: + logger.warning(f"未知的按钮 key: {event_key}") + return + + # 更新告警状态 + action_map = { + "handle": { + "alarm_status": "CONFIRMED", + "handle_status": "HANDLING", + "remark": "企微卡片-前往处理", + }, + "ignore": { + "alarm_status": "FALSE", + "handle_status": "IGNORED", + "remark": "企微卡片-标记误报", + }, + } + + action_cfg = action_map.get(action) + if action_cfg: + service = get_alarm_event_service() + service.handle_alarm( + alarm_id=alarm_id, + alarm_status=action_cfg["alarm_status"], + handle_status=action_cfg["handle_status"], + handler=user_id, + remark=action_cfg["remark"], + ) + logger.info(f"告警状态已更新: alarm={alarm_id}, action={action}, by={user_id}") + + # 更新卡片按钮状态(变灰 + 显示处理结果) + if response_code: + wechat = get_wechat_service() + await wechat.update_alarm_card( + response_code=response_code, + user_ids=[user_id], + alarm_id=alarm_id, + action=action, + operator_name=user_id, + ) + + except Exception as e: + logger.error(f"处理卡片按钮点击失败: {e}", exc_info=True) + + # ==================== Agent测试接口(开发用) ==================== @router.post("/agent/test") @@ -202,3 +294,79 @@ async def test_agent_message( reply = await dispatcher.handle_message(user_id, content) return YudaoResponse.success({"user_id": user_id, "content": content, "reply": reply}) + +# ==================== 群聊管理接口 ==================== + +class CreateGroupRequest(BaseModel): + """创建群聊请求""" + name: str # 群聊名称 + owner: str # 群主 userid + user_list: list # 成员 userid 列表(至少2人) + chat_id: Optional[str] = "" # 自定义群聊ID + + +class SendGroupCardRequest(BaseModel): + """发送群聊卡片请求""" + chat_id: str # 群聊ID + alarm_id: str # 告警ID + mention_user_ids: Optional[list] = [] # 要 @的人员 + + +@router.post("/group/create") +async def create_group_chat(req: CreateGroupRequest): + """创建企微群聊""" + from app.services.wechat_service import get_wechat_service + wechat = get_wechat_service() + chatid = await wechat.create_group_chat( + name=req.name, + owner=req.owner, + user_list=req.user_list, + chat_id=req.chat_id, + ) + if chatid: + return YudaoResponse.success({"chatid": chatid}) + return YudaoResponse.error(500, "创建群聊失败") + + +@router.post("/group/send_alarm") +async def send_group_alarm(req: SendGroupCardRequest): + """发送告警卡片到群聊(测试用,根据 alarm_id 查库)""" + from app.services.wechat_service import get_wechat_service + from app.config import settings + + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == req.alarm_id).first() + if not alarm: + return YudaoResponse.error(404, "告警不存在") + + # 查 VLM 描述 + vlm_desc = "" + analysis = db.query(AlarmLlmAnalysis).filter( + AlarmLlmAnalysis.alarm_id == req.alarm_id + ).order_by(AlarmLlmAnalysis.id.desc()).first() + if analysis: + vlm_desc = analysis.summary or "" + + wechat = get_wechat_service() + service_base_url = settings.wechat.service_base_url or "" + event_time_str = alarm.event_time.strftime('%Y-%m-%d %H:%M:%S') if alarm.event_time else "" + + sent = await wechat.send_group_card( + chat_id=req.chat_id, + alarm_id=alarm.alarm_id, + alarm_type=alarm.alarm_type, + area_name=alarm.scene_id or "未知区域", + camera_name=alarm.device_id or "未知摄像头", + description=vlm_desc or f"{alarm.alarm_type} 告警", + event_time=event_time_str, + alarm_level=alarm.alarm_level or 2, + service_base_url=service_base_url, + mention_user_ids=req.mention_user_ids, + ) + if sent: + return YudaoResponse.success(True) + return YudaoResponse.error(500, "发送失败") + finally: + db.close() + diff --git a/app/services/vlm_service.py b/app/services/vlm_service.py index 6ea9acd..3727464 100644 --- a/app/services/vlm_service.py +++ b/app/services/vlm_service.py @@ -81,10 +81,9 @@ class VLMService: @staticmethod def _fallback_result(alarm_type: str, camera_name: str, reason: str) -> Dict: - """降级结果:入侵默认放行(宁可多报),离岗默认拦截(避免VLM不可用时误推)""" - confirmed = alarm_type != "leave_post" + """降级结果:VLM 不可用时统一放行推送(宁可多报,不可漏报)""" return { - "confirmed": confirmed, + "confirmed": True, "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警({reason})", "skipped": True, } diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py index 89cbb11..8a7559a 100644 --- a/app/services/wechat_service.py +++ b/app/services/wechat_service.py @@ -1,13 +1,13 @@ """ 企微通知服务 -封装企业微信 API,发送告警文本卡片。 -V1 使用应用消息 + 文本卡片,后期扩展为模板卡片。 +封装企业微信 API,发送告警模板卡片(按钮交互型)。 +用户直接在对话框中点击按钮处理告警,无需跳转 H5 页面。 """ import httpx import time -from typing import Optional, List +from typing import Optional, List, Dict from app.utils.logger import logger @@ -24,6 +24,9 @@ class WeChatService: self._encoding_aes_key = "" self._access_token = "" self._token_expire_at = 0 + # 缓存 response_code,用于更新卡片状态 + # key: task_id (alarm_id), value: response_code + self._response_codes: Dict[str, str] = {} def init(self, config): """初始化企微配置""" @@ -63,6 +66,14 @@ class WeChatService: logger.info("企微 access_token 已更新") return self._access_token + def save_response_code(self, task_id: str, response_code: str): + """保存卡片的 response_code(用于后续更新卡片状态)""" + self._response_codes[task_id] = response_code + + def get_response_code(self, task_id: str) -> Optional[str]: + """获取并消耗 response_code(只能用一次)""" + return self._response_codes.pop(task_id, None) + async def send_alarm_card( self, user_ids: List[str], @@ -77,21 +88,10 @@ class WeChatService: service_base_url: str = "", ) -> bool: """ - 发送告警文本卡片 + 发送按钮交互型模板卡片 - Args: - user_ids: 企微 userid 列表 - alarm_id: 告警ID - alarm_type: 告警类型 - area_name: 区域名称 - camera_name: 摄像头名称 - description: VLM 生成的场景描述 - snapshot_url: 截图 URL - event_time: 告警时间 - alarm_level: 告警级别 - - Returns: - 是否发送成功 + 卡片直接在对话框中展示告警信息 + 操作按钮, + 用户点击按钮后企微回调服务器,无需跳转 H5 页面。 """ if not self._enabled: logger.debug("企微未启用,跳过发送") @@ -110,28 +110,62 @@ class WeChatService: # 告警级别映射 level_names = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"} level_name = level_names.get(alarm_level, "一般") + level_colors = {1: "blue", 2: "yellow", 3: "red", 4: "red"} - # 构造文本卡片消息 - content = ( - f"