""" 安保工单开放接口客户端 对接外部工单系统,支持: - 创建工单:POST /open-api/ops/security/order/create - 自动结单:POST /open-api/ops/security/order/auto-complete - SHA256 签名认证 """ import hashlib import json import time import uuid from typing import Optional import httpx from app.utils.logger import logger class WorkOrderClient: """安保工单 API 客户端(单例)""" def __init__(self): self._enabled = False self._base_url = "" self._app_id = "" self._app_secret = "" self._tenant_id = "1" self._timeout = 10 def init(self, config): """初始化工单配置""" self._enabled = config.enabled and bool(config.base_url) and bool(config.app_secret) self._base_url = config.base_url.rstrip("/") self._app_id = config.app_id self._app_secret = config.app_secret self._tenant_id = getattr(config, "tenant_id", "1") self._timeout = getattr(config, "timeout", 10) if self._enabled: logger.info(f"工单客户端已启用: base_url={self._base_url}") else: logger.info("工单客户端未启用") @property def enabled(self) -> bool: return self._enabled def _sign(self, body_json: str, nonce: str, timestamp: str, query_str: str = "") -> str: """ SHA256 签名 签名算法:SHA256(query_str + body_json + header_str + appSecret) - query_str: Query 参数按 key 字母升序排序拼接,无参数时为空串 - header_str: 固定顺序 appId=&nonce=×tamp= """ header_str = f"appId={self._app_id}&nonce={nonce}×tamp={timestamp}" raw = f"{query_str}{body_json}{header_str}{self._app_secret}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _build_headers(self, body_json: str) -> dict: """构造请求头(含签名 + tenant-id)""" nonce = uuid.uuid4().hex[:16] timestamp = str(int(time.time() * 1000)) sign = self._sign(body_json, nonce, timestamp) return { "Content-Type": "application/json", "tenant-id": self._tenant_id, "appId": self._app_id, "nonce": nonce, "timestamp": timestamp, "sign": sign, } async def create_order( self, title: str, area_id: int, alarm_id: str, alarm_type: str, description: str = "", priority: Optional[int] = None, trigger_source: str = "自动上报", camera_id: str = "", image_url: str = "", ) -> Optional[str]: """ 创建安保工单 Args: title: 工单标题(中文告警类型,如"人员离岗告警") area_id: 区域ID alarm_id: 告警ID alarm_type: 告警类型编码(leave_post/intrusion等) description: VLM 大模型描述 priority: 优先级(0紧急 1重要 2普通 3轻微) trigger_source: 来源(自动上报/人工上报) camera_id: 摄像头ID image_url: 截图URL Returns: orderId 字符串,失败返回 None """ if not self._enabled: logger.debug("工单客户端未启用,跳过创建") return None body = { "title": title, "areaId": area_id, "alarmId": alarm_id, "alarmType": alarm_type, } if description: body["description"] = description if priority is not None: body["priority"] = priority if trigger_source: body["triggerSource"] = trigger_source if camera_id: body["cameraId"] = camera_id if image_url: body["imageUrl"] = image_url body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) try: headers = self._build_headers(body_json) url = f"{self._base_url}/open-api/ops/security/order/create" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.post(url, content=body_json, headers=headers) data = resp.json() if data.get("code") != 0: logger.error(f"创建工单失败: {data}") return None # API 返回 {"code":0, "data": 1234567890} — data 直接是 orderId order_id = str(data.get("data", "")) logger.info(f"工单已创建: orderId={order_id}, alarmId={alarm_id}") return order_id except Exception as e: logger.error(f"创建工单异常: {e}") return None async def auto_complete_order( self, order_id: str, remark: str = "", ) -> bool: """ 自动结单 Returns: 是否成功 """ if not self._enabled: return False body = { "orderId": int(order_id) if order_id else 0, "remark": remark, } body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) try: headers = self._build_headers(body_json) url = f"{self._base_url}/open-api/ops/security/order/auto-complete" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.post(url, content=body_json, headers=headers) data = resp.json() if data.get("code") != 0: logger.error(f"自动结单失败: orderId={order_id}, resp={data}") return False logger.info(f"工单已自动结单: orderId={order_id}") return True except Exception as e: logger.error(f"自动结单异常: orderId={order_id}, error={e}") return False async def confirm_order(self, order_id: str) -> bool: """确认工单(保安接单)""" if not self._enabled: return False body = {"orderId": int(order_id) if order_id else 0} body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) try: headers = self._build_headers(body_json) url = f"{self._base_url}/open-api/ops/security/order/confirm" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.post(url, content=body_json, headers=headers) data = resp.json() if data.get("code") != 0: logger.error(f"确认工单失败: orderId={order_id}, resp={data}") return False logger.info(f"工单已确认: orderId={order_id}") return True except Exception as e: logger.error(f"确认工单异常: orderId={order_id}, error={e}") return False async def submit_order( self, order_id: str, result: str, result_img_urls: Optional[list] = None, ) -> bool: """工单提交(保安提交处理结果+图片)""" if not self._enabled: return False body = { "orderId": int(order_id) if order_id else 0, "result": result, } if result_img_urls: body["resultImgUrls"] = result_img_urls body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) try: headers = self._build_headers(body_json) url = f"{self._base_url}/open-api/ops/security/order/submit" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.post(url, content=body_json, headers=headers) data = resp.json() if data.get("code") != 0: logger.error(f"工单提交失败: orderId={order_id}, resp={data}") return False logger.info(f"工单已提交: orderId={order_id}") return True except Exception as e: logger.error(f"工单提交异常: orderId={order_id}, error={e}") return False async def false_alarm(self, order_id: str) -> bool: """误报标记""" if not self._enabled: return False body = {"orderId": int(order_id) if order_id else 0} body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) try: headers = self._build_headers(body_json) url = f"{self._base_url}/open-api/ops/security/order/false-alarm" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.post(url, content=body_json, headers=headers) data = resp.json() if data.get("code") != 0: logger.error(f"误报标记失败: orderId={order_id}, resp={data}") return False logger.info(f"工单已标记误报: orderId={order_id}") return True except Exception as e: logger.error(f"误报标记异常: orderId={order_id}, error={e}") return False # 全局单例 _work_order_client: Optional[WorkOrderClient] = None def get_work_order_client() -> WorkOrderClient: global _work_order_client if _work_order_client is None: _work_order_client = WorkOrderClient() return _work_order_client