- AlarmEvent 模型添加 area_id 字段 - create_from_edge_report 提取 ext_data.area_id 存储 - 心跳端点 POST /api/ai/device/heartbeat - work_order_client: create_order 支持完整参数(description/priority/triggerSource/cameraId/imageUrl) - notify_dispatch: 工单标题中文化、alarmType 中文映射、永久 COS URL、triggerSource 来源判断 - oss_storage: 新增 get_permanent_url 方法 - 工单创建测试脚本 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
199 lines
6.1 KiB
Python
199 lines
6.1 KiB
Python
"""
|
||
安保工单开放接口客户端
|
||
|
||
对接外部工单系统,支持:
|
||
- 创建工单:POST /open-api/ops/security/order/create
|
||
- 自动结单:POST /open-api/ops/security/order/auto-complete
|
||
- SHA256 签名认证
|
||
"""
|
||
|
||
import hashlib
|
||
import json
|
||
import time
|
||
import uuid
|
||
from typing import Optional
|
||
|
||
import httpx
|
||
|
||
from app.utils.logger import logger
|
||
|
||
|
||
class WorkOrderClient:
|
||
"""安保工单 API 客户端(单例)"""
|
||
|
||
def __init__(self):
|
||
self._enabled = False
|
||
self._base_url = ""
|
||
self._app_id = ""
|
||
self._app_secret = ""
|
||
self._tenant_id = "1"
|
||
self._timeout = 10
|
||
|
||
def init(self, config):
|
||
"""初始化工单配置"""
|
||
self._enabled = config.enabled and bool(config.base_url) and bool(config.app_secret)
|
||
self._base_url = config.base_url.rstrip("/")
|
||
self._app_id = config.app_id
|
||
self._app_secret = config.app_secret
|
||
self._tenant_id = getattr(config, "tenant_id", "1")
|
||
self._timeout = getattr(config, "timeout", 10)
|
||
|
||
if self._enabled:
|
||
logger.info(f"工单客户端已启用: base_url={self._base_url}")
|
||
else:
|
||
logger.info("工单客户端未启用")
|
||
|
||
@property
|
||
def enabled(self) -> bool:
|
||
return self._enabled
|
||
|
||
def _sign(self, body_json: str, nonce: str, timestamp: str, query_str: str = "") -> str:
|
||
"""
|
||
SHA256 签名
|
||
|
||
签名算法:SHA256(query_str + body_json + header_str + appSecret)
|
||
- query_str: Query 参数按 key 字母升序排序拼接,无参数时为空串
|
||
- header_str: 固定顺序 appId=&nonce=×tamp=
|
||
"""
|
||
header_str = f"appId={self._app_id}&nonce={nonce}×tamp={timestamp}"
|
||
raw = f"{query_str}{body_json}{header_str}{self._app_secret}"
|
||
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||
|
||
def _build_headers(self, body_json: str) -> dict:
|
||
"""构造请求头(含签名 + tenant-id)"""
|
||
nonce = uuid.uuid4().hex[:16]
|
||
timestamp = str(int(time.time() * 1000))
|
||
sign = self._sign(body_json, nonce, timestamp)
|
||
|
||
return {
|
||
"Content-Type": "application/json",
|
||
"tenant-id": self._tenant_id,
|
||
"appId": self._app_id,
|
||
"nonce": nonce,
|
||
"timestamp": timestamp,
|
||
"sign": sign,
|
||
}
|
||
|
||
async def create_order(
|
||
self,
|
||
title: str,
|
||
area_id: int,
|
||
alarm_id: str,
|
||
alarm_type: str,
|
||
description: str = "",
|
||
priority: Optional[int] = None,
|
||
trigger_source: str = "自动上报",
|
||
camera_id: str = "",
|
||
image_url: str = "",
|
||
) -> Optional[str]:
|
||
"""
|
||
创建安保工单
|
||
|
||
Args:
|
||
title: 工单标题(中文告警类型,如"人员离岗告警")
|
||
area_id: 区域ID
|
||
alarm_id: 告警ID
|
||
alarm_type: 告警类型编码(leave_post/intrusion等)
|
||
description: VLM 大模型描述
|
||
priority: 优先级(0紧急 1重要 2普通 3轻微)
|
||
trigger_source: 来源(自动上报/人工上报)
|
||
camera_id: 摄像头ID
|
||
image_url: 截图URL
|
||
|
||
Returns:
|
||
orderId 字符串,失败返回 None
|
||
"""
|
||
if not self._enabled:
|
||
logger.debug("工单客户端未启用,跳过创建")
|
||
return None
|
||
|
||
body = {
|
||
"title": title,
|
||
"areaId": area_id,
|
||
"alarmId": alarm_id,
|
||
"alarmType": alarm_type,
|
||
}
|
||
if description:
|
||
body["description"] = description
|
||
if priority is not None:
|
||
body["priority"] = priority
|
||
if trigger_source:
|
||
body["triggerSource"] = trigger_source
|
||
if camera_id:
|
||
body["cameraId"] = camera_id
|
||
if image_url:
|
||
body["imageUrl"] = image_url
|
||
|
||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||
|
||
try:
|
||
headers = self._build_headers(body_json)
|
||
url = f"{self._base_url}/open-api/ops/security/order/create"
|
||
|
||
async with httpx.AsyncClient(timeout=self._timeout) as client:
|
||
resp = await client.post(url, content=body_json, headers=headers)
|
||
data = resp.json()
|
||
|
||
if data.get("code") != 0:
|
||
logger.error(f"创建工单失败: {data}")
|
||
return None
|
||
|
||
# API 返回 {"code":0, "data": 1234567890} — data 直接是 orderId
|
||
order_id = str(data.get("data", ""))
|
||
logger.info(f"工单已创建: orderId={order_id}, alarmId={alarm_id}")
|
||
return order_id
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建工单异常: {e}")
|
||
return None
|
||
|
||
async def auto_complete_order(
|
||
self,
|
||
order_id: str,
|
||
remark: str = "",
|
||
) -> bool:
|
||
"""
|
||
自动结单
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
if not self._enabled:
|
||
return False
|
||
|
||
body = {
|
||
"orderId": int(order_id) if order_id else 0,
|
||
"remark": remark,
|
||
}
|
||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||
|
||
try:
|
||
headers = self._build_headers(body_json)
|
||
url = f"{self._base_url}/open-api/ops/security/order/auto-complete"
|
||
|
||
async with httpx.AsyncClient(timeout=self._timeout) as client:
|
||
resp = await client.post(url, content=body_json, headers=headers)
|
||
data = resp.json()
|
||
|
||
if data.get("code") != 0:
|
||
logger.error(f"自动结单失败: orderId={order_id}, resp={data}")
|
||
return False
|
||
|
||
logger.info(f"工单已自动结单: orderId={order_id}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"自动结单异常: orderId={order_id}, error={e}")
|
||
return False
|
||
|
||
|
||
# 全局单例
|
||
_work_order_client: Optional[WorkOrderClient] = None
|
||
|
||
|
||
def get_work_order_client() -> WorkOrderClient:
|
||
global _work_order_client
|
||
if _work_order_client is None:
|
||
_work_order_client = WorkOrderClient()
|
||
return _work_order_client
|