diff --git a/app/config.py b/app/config.py index a90b6b5..0fa9a54 100644 --- a/app/config.py +++ b/app/config.py @@ -75,6 +75,16 @@ class AgentConfig: enabled: bool = False +@dataclass +class WorkOrderConfig: + """安保工单开放接口配置""" + base_url: str = "" # 工单系统地址,如 http://aiot-platform.viewsh.com:48080 + app_id: str = "" # 应用ID + app_secret: str = "" # 应用密钥 + timeout: int = 10 # 请求超时(秒) + enabled: bool = False + + @dataclass class RedisConfig: """Redis 配置""" @@ -124,6 +134,7 @@ class Settings(BaseModel): vlm: VLMConfig = VLMConfig() wechat: WeChatConfig = WeChatConfig() agent: AgentConfig = AgentConfig() + work_order: WorkOrderConfig = WorkOrderConfig() redis: RedisConfig = RedisConfig() camera_name: CameraNameConfig = CameraNameConfig() @@ -180,6 +191,13 @@ def load_settings() -> Settings: llm_timeout=int(os.getenv("AGENT_LLM_TIMEOUT", "15")), enabled=os.getenv("AGENT_ENABLED", "false").lower() == "true", ), + work_order=WorkOrderConfig( + base_url=os.getenv("WORK_ORDER_BASE_URL", ""), + app_id=os.getenv("WORK_ORDER_APP_ID", ""), + app_secret=os.getenv("WORK_ORDER_APP_SECRET", ""), + timeout=int(os.getenv("WORK_ORDER_TIMEOUT", "10")), + enabled=os.getenv("WORK_ORDER_ENABLED", "false").lower() == "true", + ), redis=RedisConfig( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", "6379")), diff --git a/app/main.py b/app/main.py index c7fdfa8..b6844a2 100644 --- a/app/main.py +++ b/app/main.py @@ -65,6 +65,11 @@ async def lifespan(app: FastAPI): agent = get_agent_dispatcher() agent.init(settings.agent) + # 初始化工单客户端 + from app.services.work_order_client import get_work_order_client + wo_client = get_work_order_client() + wo_client.init(settings.work_order) + logger.info("AI 告警平台启动完成") yield diff --git a/app/routers/edge_compat.py b/app/routers/edge_compat.py index c48b364..97574f9 100644 --- a/app/routers/edge_compat.py +++ b/app/routers/edge_compat.py @@ -75,7 +75,11 @@ async def edge_alarm_resolve( 与 /admin-api/aiot/alarm/edge/resolve 功能相同, 但不要求认证,供 Edge 设备直接调用。 + 支持先到先得:已被人工处理的告警不覆盖状态。 """ + # 先检查是否已到终态(先到先得) + was_terminal = service.is_alarm_terminal(resolve.alarm_id) + success = service.resolve_alarm( alarm_id=resolve.alarm_id, duration_ms=resolve.duration_ms, @@ -84,4 +88,54 @@ async def edge_alarm_resolve( ) if not success: return YudaoResponse.error(404, "告警不存在") + + # 如果之前不是终态(边缘端先到),触发自动结单 + 卡片更新 + if not was_terminal: + asyncio.create_task(_resolve_work_order_and_card(resolve.alarm_id, resolve.resolve_type)) + return YudaoResponse.success(True) + + +async def _resolve_work_order_and_card(alarm_id: str, resolve_type: str): + """边缘端 resolve 后异步处理:更新卡片 + 自动结单""" + try: + from app.services.work_order_client import get_work_order_client + from app.services.wechat_service import get_wechat_service + from app.models import get_session, AlarmEventExt + + # 1. 自动结单 + wo_client = get_work_order_client() + if wo_client.enabled: + db = get_session() + try: + ext = db.query(AlarmEventExt).filter( + AlarmEventExt.alarm_id == alarm_id, + AlarmEventExt.ext_type == "WORK_ORDER", + ).first() + order_id = ext.ext_data.get("order_id", "") if ext and ext.ext_data else "" + finally: + db.close() + + if order_id: + remark_map = { + "person_returned": "人员回岗自动关闭", + "non_work_time": "非工作时间自动关闭", + "intrusion_cleared": "入侵消失自动关闭", + } + remark = remark_map.get(resolve_type, f"边缘端自动结单: {resolve_type}") + await wo_client.auto_complete_order(order_id, remark) + + # 2. 更新企微卡片到终态(如果有 response_code) + wechat = get_wechat_service() + if wechat.enabled: + response_code = wechat.get_response_code(alarm_id) + if response_code: + await wechat.update_alarm_card_terminal( + response_code=response_code, + user_ids=[], # 空列表时企微更新所有已收到卡片的用户 + alarm_id=alarm_id, + action="auto_resolve", + ) + + except Exception as e: + logger.error(f"边缘端resolve后处理失败: alarm={alarm_id}, error={e}", exc_info=True) diff --git a/app/routers/wechat_callback.py b/app/routers/wechat_callback.py index eeaec3e..b5c482b 100644 --- a/app/routers/wechat_callback.py +++ b/app/routers/wechat_callback.py @@ -90,16 +90,22 @@ async def _process_agent_message(user_id: str, content: str): async def _process_card_button_click(msg: dict): """ - 处理模板卡片按钮点击事件 + 处理模板卡片按钮点击事件(两步状态机) - 企微回调 XML 解密后包含: - - FromUserName: 点击者 userid - - EventKey: 按钮 key (handle_{alarm_id} / ignore_{alarm_id}) - - TaskId: 卡片的 task_id (alarm_id) - - ResponseCode: 用于更新卡片状态(一次性) + 第一步按钮: + - confirm_{alarm_id}: 确认接单 → 更新状态为处理中,创建工单,卡片更新到第二步 + - ignore_{alarm_id}: 误报忽略 → 终态,自动结单 + + 第二步按钮: + - complete_{alarm_id}: 已处理完成 → 终态,自动结单 + - false_{alarm_id}: 标记误报 → 终态,自动结单 + + 终态按钮: + - done_{alarm_id}: 已完成,忽略 """ try: from app.services.wechat_service import get_wechat_service + from app.services.work_order_client import get_work_order_client user_id = msg.get("FromUserName", "") event_key = msg.get("EventKey", "") @@ -107,65 +113,155 @@ async def _process_card_button_click(msg: dict): response_code = msg.get("ResponseCode", "") logger.info( - f"卡片按钮点击: user={user_id}, key={event_key}, " - f"task={task_id}" + f"卡片按钮点击: user={user_id}, key={event_key}, task={task_id}" ) # 解析 action 和 alarm_id - if event_key.startswith("handle_"): - action = "handle" - alarm_id = event_key[len("handle_"):] + if event_key.startswith("confirm_"): + action = "confirm" + alarm_id = event_key[len("confirm_"):] elif event_key.startswith("ignore_"): action = "ignore" alarm_id = event_key[len("ignore_"):] + elif event_key.startswith("complete_"): + action = "complete" + alarm_id = event_key[len("complete_"):] + elif event_key.startswith("false_"): + action = "false" + alarm_id = event_key[len("false_"):] elif event_key.startswith("done_"): - # 已处理状态的按钮,忽略 - return + return # 终态按钮,忽略 else: logger.warning(f"未知的按钮 key: {event_key}") return - # 更新告警状态 - action_map = { - "handle": { - "alarm_status": "CONFIRMED", - "handle_status": "HANDLING", - "remark": "企微卡片-前往处理", - }, - "ignore": { - "alarm_status": "FALSE", - "handle_status": "IGNORED", - "remark": "企微卡片-标记误报", - }, - } + wechat = get_wechat_service() + wo_client = get_work_order_client() + service = get_alarm_event_service() - action_cfg = action_map.get(action) - if action_cfg: - service = get_alarm_event_service() + # ---- 第一步:确认接单 ---- + if action == "confirm": + # 更新告警状态为处理中 service.handle_alarm( alarm_id=alarm_id, - alarm_status=action_cfg["alarm_status"], - handle_status=action_cfg["handle_status"], + alarm_status="CONFIRMED", + handle_status="HANDLING", handler=user_id, - remark=action_cfg["remark"], + remark="企微卡片-确认接单", ) - logger.info(f"告警状态已更新: alarm={alarm_id}, action={action}, by={user_id}") + logger.info(f"告警已确认接单: alarm={alarm_id}, by={user_id}") - # 更新卡片按钮状态(变灰 + 显示处理结果) - if response_code: - wechat = get_wechat_service() - await wechat.update_alarm_card( - response_code=response_code, - user_ids=[user_id], + # 更新卡片到第二步(新交互按钮) + if response_code: + await wechat.update_alarm_card_step2( + response_code=response_code, + user_ids=[user_id], + alarm_id=alarm_id, + operator_name=user_id, + ) + + # ---- 第一步:误报忽略 ---- + elif action == "ignore": + service.handle_alarm( alarm_id=alarm_id, - action=action, - operator_name=user_id, + alarm_status="FALSE", + handle_status="IGNORED", + handler=user_id, + remark="企微卡片-误报忽略", ) + logger.info(f"告警已标记忽略: alarm={alarm_id}, by={user_id}") + + # 终态卡片 + if response_code: + await wechat.update_alarm_card_terminal( + response_code=response_code, + user_ids=[user_id], + alarm_id=alarm_id, + action="ignore", + operator_name=user_id, + ) + + # 自动结单 + order_id = _get_order_id_for_alarm(alarm_id) + if order_id: + await wo_client.auto_complete_order(order_id, f"误报忽略 by {user_id}") + + # ---- 第二步:已处理完成 ---- + elif action == "complete": + service.handle_alarm( + alarm_id=alarm_id, + alarm_status="CLOSED", + handle_status="DONE", + handler=user_id, + remark="企微卡片-已处理完成", + ) + logger.info(f"告警已处理完成: alarm={alarm_id}, by={user_id}") + + # 终态卡片 + if response_code: + await wechat.update_alarm_card_terminal( + response_code=response_code, + user_ids=[user_id], + alarm_id=alarm_id, + action="complete", + operator_name=user_id, + ) + + # 自动结单 + order_id = _get_order_id_for_alarm(alarm_id) + if order_id: + await wo_client.auto_complete_order(order_id, f"已处理 by {user_id}") + + # ---- 第二步:标记误报 ---- + elif action == "false": + service.handle_alarm( + alarm_id=alarm_id, + alarm_status="FALSE", + handle_status="IGNORED", + handler=user_id, + remark="企微卡片-标记误报", + ) + logger.info(f"告警已标记误报: alarm={alarm_id}, by={user_id}") + + # 终态卡片 + if response_code: + await wechat.update_alarm_card_terminal( + response_code=response_code, + user_ids=[user_id], + alarm_id=alarm_id, + action="false", + operator_name=user_id, + ) + + # 自动结单 + order_id = _get_order_id_for_alarm(alarm_id) + if order_id: + await wo_client.auto_complete_order(order_id, f"标记误报 by {user_id}") except Exception as e: logger.error(f"处理卡片按钮点击失败: {e}", exc_info=True) +def _get_order_id_for_alarm(alarm_id: str) -> str: + """从 alarm_event_ext 中获取关联的工单ID""" + from app.models import get_session, AlarmEventExt + + db = get_session() + try: + ext = db.query(AlarmEventExt).filter( + AlarmEventExt.alarm_id == alarm_id, + AlarmEventExt.ext_type == "WORK_ORDER", + ).first() + if ext and ext.ext_data: + return ext.ext_data.get("order_id", "") + return "" + except Exception as e: + logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}") + return "" + finally: + db.close() + + # ==================== Agent测试接口(开发用) ==================== @router.post("/agent/test") diff --git a/app/services/alarm_event_service.py b/app/services/alarm_event_service.py index ea34fe3..ab8c999 100644 --- a/app/services/alarm_event_service.py +++ b/app/services/alarm_event_service.py @@ -558,7 +558,11 @@ class AlarmEventService: db.close() def resolve_alarm(self, alarm_id: str, duration_ms: int, last_frame_time: str, resolve_type: str) -> bool: - """更新告警的持续时长和结束时间""" + """ + 更新告警的持续时长和结束时间 + + 先到先得:如果告警已被人工处理到终态(CLOSED/FALSE),仅更新时长,不覆盖状态。 + """ db = get_session() try: alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() @@ -574,9 +578,12 @@ class AlarmEventService: except Exception: alarm.last_frame_time = beijing_now().replace(microsecond=0) - # 如果已被 VLM 标记为误报(IGNORED),只更新时长,不覆盖状态 - if alarm.handle_status == "IGNORED": - logger.info(f"告警已为误报状态,仅更新时长: {alarm_id}") + # 先到先得:已被人工处理到终态的不覆盖 + terminal_statuses = ("CLOSED", "FALSE") + terminal_handle = ("DONE", "IGNORED") + + if alarm.alarm_status in terminal_statuses or alarm.handle_status in terminal_handle: + logger.info(f"告警已为终态({alarm.alarm_status}/{alarm.handle_status}),仅更新时长: {alarm_id}") elif resolve_type == "person_returned": alarm.alarm_status = "CLOSED" alarm.handle_status = "DONE" @@ -605,6 +612,19 @@ class AlarmEventService: finally: db.close() + def is_alarm_terminal(self, alarm_id: str) -> bool: + """判断告警是否已到终态(先到先得判断用)""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if not alarm: + return False + return alarm.alarm_status in ("CLOSED", "FALSE") or alarm.handle_status in ("DONE", "IGNORED") + except Exception: + return False + finally: + db.close() + def count_alarms_by_edge_node(self, edge_node_id: str) -> int: """统计指定边缘节点的告警数量""" db = get_session() diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index 7816b57..eb03823 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -17,12 +17,13 @@ from typing import Dict, List from app.models import ( get_session, - AlarmEvent, AlarmLlmAnalysis, + AlarmEvent, AlarmLlmAnalysis, AlarmEventExt, CameraAreaBinding, AreaPersonBinding, NotifyArea, ) from app.config import settings from app.services.vlm_service import get_vlm_service -from app.services.wechat_service import get_wechat_service +from app.services.wechat_service import get_wechat_service, ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES +from app.services.work_order_client import get_work_order_client from app.utils.logger import logger from app.utils.timezone import beijing_now @@ -142,6 +143,21 @@ async def process_alarm_notification(alarm_data: Dict): else: logger.warning(f"个人卡片发送失败: {alarm_id}") + # ---- 4. 创建安保工单 ---- + wo_client = get_work_order_client() + if wo_client.enabled: + type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) + level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般") + wo_title = f"【{level_name}】{type_name}告警 - {area_name}" + order_id = await wo_client.create_order( + title=wo_title, + area_id=area_name, + alarm_id=alarm_id, + alarm_type=alarm_type, + ) + if order_id: + _save_order_id(alarm_id, order_id) + except Exception as e: logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True) @@ -255,3 +271,22 @@ def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: return ("未知区域", []) finally: db.close() + + +def _save_order_id(alarm_id: str, order_id: str): + """将工单ID保存到 alarm_event_ext(ext_type=WORK_ORDER)""" + db = get_session() + try: + ext = AlarmEventExt( + alarm_id=alarm_id, + ext_type="WORK_ORDER", + ext_data={"order_id": order_id}, + ) + db.add(ext) + db.commit() + logger.info(f"工单ID已关联: alarm={alarm_id}, order={order_id}") + except Exception as e: + db.rollback() + logger.error(f"保存工单ID失败: {e}") + finally: + db.close() diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py index 01fcfbe..c35663e 100644 --- a/app/services/wechat_service.py +++ b/app/services/wechat_service.py @@ -199,9 +199,9 @@ class WeChatService: }, "button_list": [ { - "text": "前往处理", + "text": "确认接单", "style": 1, - "key": f"handle_{alarm_id}", + "key": f"confirm_{alarm_id}", }, { "text": "误报忽略", @@ -232,7 +232,72 @@ class WeChatService: logger.error(f"企微卡片发送异常: {e}") return False - async def update_alarm_card( + async def update_alarm_card_step2( + self, + response_code: str, + user_ids: List[str], + alarm_id: str, + operator_name: str = "", + ) -> bool: + """ + 更新卡片到第二步:确认接单后显示「已处理完成」「标记误报」按钮 + + 利用 update_template_card 将卡片更新为新的交互卡片(非终态), + 用户点击后会生成新的 response_code,实现链式更新。 + """ + if not self._enabled: + return False + + try: + access_token = await self._get_access_token() + + body = { + "userids": user_ids, + "agentid": self.agent_id_int, + "response_code": response_code, + "template_card": { + "card_type": "button_interaction", + "task_id": alarm_id, + "source": { + "desc": "AI安防告警 - 处理中", + "desc_color": 0, + }, + "main_title": { + "title": f"已接单 - {operator_name}" if operator_name else "已接单", + }, + "sub_title_text": "请完成处理后选择操作", + "button_list": [ + { + "text": "已处理完成", + "style": 1, + "key": f"complete_{alarm_id}", + }, + { + "text": "标记误报", + "style": 2, + "key": f"false_{alarm_id}", + }, + ], + }, + } + + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/update_template_card?access_token={access_token}" + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, json=body) + data = resp.json() + + if data.get("errcode") != 0: + logger.error(f"更新卡片到步骤2失败: {data}") + return False + + logger.info(f"卡片已更新到步骤2: alarm={alarm_id}, operator={operator_name}") + return True + + except Exception as e: + logger.error(f"更新卡片步骤2异常: {e}") + return False + + async def update_alarm_card_terminal( self, response_code: str, user_ids: List[str], @@ -240,7 +305,7 @@ class WeChatService: action: str, operator_name: str = "", ) -> bool: - """更新模板卡片状态(按钮变灰 + 显示处理结果)""" + """更新卡片到终态(按钮变灰 + 显示最终结果)""" if not self._enabled: return False @@ -248,9 +313,10 @@ class WeChatService: access_token = await self._get_access_token() action_text = { - "handle": f"处理中 - {operator_name}" if operator_name else "处理中", - "ignore": f"已忽略 - {operator_name}" if operator_name else "已忽略", "complete": f"已处理 - {operator_name}" if operator_name else "已处理", + "false": f"已标记误报 - {operator_name}" if operator_name else "已标记误报", + "ignore": f"已忽略 - {operator_name}" if operator_name else "已忽略", + "auto_resolve": "系统自动结单", } replace_text = action_text.get(action, "已处理") @@ -264,7 +330,7 @@ class WeChatService: "main_title": { "title": replace_text, }, - "sub_title_text": f"操作人:{operator_name}", + "sub_title_text": f"操作人:{operator_name}" if operator_name else "", "button_list": [ { "text": replace_text, @@ -281,14 +347,14 @@ class WeChatService: data = resp.json() if data.get("errcode") != 0: - logger.error(f"更新卡片失败: {data}") + logger.error(f"更新卡片终态失败: {data}") return False - logger.info(f"卡片已更新: alarm={alarm_id}, action={action}") + logger.info(f"卡片已更新到终态: alarm={alarm_id}, action={action}") return True except Exception as e: - logger.error(f"更新卡片异常: {e}") + logger.error(f"更新卡片终态异常: {e}") return False # ==================== 个人消息:文本 ==================== diff --git a/app/services/work_order_client.py b/app/services/work_order_client.py new file mode 100644 index 0000000..ab11534 --- /dev/null +++ b/app/services/work_order_client.py @@ -0,0 +1,170 @@ +""" +安保工单开放接口客户端 + +对接外部工单系统,支持: +- 创建工单:POST /open-api/ops/security/order/create +- 自动结单:POST /open-api/ops/security/order/auto-complete +- SHA256 签名认证 +""" + +import hashlib +import json +import time +import uuid +from typing import Optional + +import httpx + +from app.utils.logger import logger + + +class WorkOrderClient: + """安保工单 API 客户端(单例)""" + + def __init__(self): + self._enabled = False + self._base_url = "" + self._app_id = "" + self._app_secret = "" + self._timeout = 10 + + def init(self, config): + """初始化工单配置""" + self._enabled = config.enabled and bool(config.base_url) and bool(config.app_secret) + self._base_url = config.base_url.rstrip("/") + self._app_id = config.app_id + self._app_secret = config.app_secret + self._timeout = getattr(config, "timeout", 10) + + if self._enabled: + logger.info(f"工单客户端已启用: base_url={self._base_url}") + else: + logger.info("工单客户端未启用") + + @property + def enabled(self) -> bool: + return self._enabled + + def _sign(self, body_json: str, nonce: str, timestamp: str) -> str: + """ + SHA256 签名 + + 签名算法:SHA256(body_json + "appId=" + appId + "&nonce=" + nonce + "×tamp=" + timestamp + appSecret) + """ + raw = ( + body_json + + "appId=" + self._app_id + + "&nonce=" + nonce + + "×tamp=" + timestamp + + self._app_secret + ) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + def _build_headers(self, body_json: str) -> dict: + """构造请求头(含签名)""" + nonce = uuid.uuid4().hex[:16] + timestamp = str(int(time.time() * 1000)) + sign = self._sign(body_json, nonce, timestamp) + + return { + "Content-Type": "application/json", + "appId": self._app_id, + "nonce": nonce, + "timestamp": timestamp, + "sign": sign, + } + + async def create_order( + self, + title: str, + area_id: str, + alarm_id: str, + alarm_type: str, + ) -> Optional[str]: + """ + 创建安保工单 + + Returns: + orderId 字符串,失败返回 None + """ + if not self._enabled: + logger.debug("工单客户端未启用,跳过创建") + return None + + body = { + "title": title, + "areaId": area_id, + "alarmId": alarm_id, + "alarmType": alarm_type, + } + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + + try: + headers = self._build_headers(body_json) + url = f"{self._base_url}/open-api/ops/security/order/create" + + async with httpx.AsyncClient(timeout=self._timeout) as client: + resp = await client.post(url, content=body_json, headers=headers) + data = resp.json() + + if data.get("code") != 0: + logger.error(f"创建工单失败: {data}") + return None + + order_id = data.get("data", {}).get("orderId", "") + logger.info(f"工单已创建: orderId={order_id}, alarmId={alarm_id}") + return order_id + + except Exception as e: + logger.error(f"创建工单异常: {e}") + return None + + async def auto_complete_order( + self, + order_id: str, + remark: str = "", + ) -> bool: + """ + 自动结单 + + Returns: + 是否成功 + """ + if not self._enabled: + return False + + body = { + "orderId": order_id, + "remark": remark, + } + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + + try: + headers = self._build_headers(body_json) + url = f"{self._base_url}/open-api/ops/security/order/auto-complete" + + async with httpx.AsyncClient(timeout=self._timeout) as client: + resp = await client.post(url, content=body_json, headers=headers) + data = resp.json() + + if data.get("code") != 0: + logger.error(f"自动结单失败: orderId={order_id}, resp={data}") + return False + + logger.info(f"工单已自动结单: orderId={order_id}") + return True + + except Exception as e: + logger.error(f"自动结单异常: orderId={order_id}, error={e}") + return False + + +# 全局单例 +_work_order_client: Optional[WorkOrderClient] = None + + +def get_work_order_client() -> WorkOrderClient: + global _work_order_client + if _work_order_client is None: + _work_order_client = WorkOrderClient() + return _work_order_client