Files
iot-device-management-service/app/services/work_order_client.py
16337 44c5df7302 功能:area_id 存储 + 工单对接代码完善 + 心跳端点
- AlarmEvent 模型添加 area_id 字段
- create_from_edge_report 提取 ext_data.area_id 存储
- 心跳端点 POST /api/ai/device/heartbeat
- work_order_client: create_order 支持完整参数(description/priority/triggerSource/cameraId/imageUrl)
- notify_dispatch: 工单标题中文化、alarmType 中文映射、永久 COS URL、triggerSource 来源判断
- oss_storage: 新增 get_permanent_url 方法
- 工单创建测试脚本

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 16:05:05 +08:00

199 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
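Client for the security work-order open API.
Talks to the external work-order system and supports:
- Create order: POST /open-api/ops/security/order/create
- Auto-complete order: POST /open-api/ops/security/order/auto-complete
- SHA256 signature authentication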
"""
import hashlib
import json
import time
import uuid
from typing import Optional
import httpx
from app.utils.logger import logger
class WorkOrderClient:
    """Client for the security work-order open API (used as a singleton).

    Wraps the "create order" and "auto-complete order" endpoints of the
    external work-order system and signs every request with SHA256.
    ``init`` must be called with the configuration before any request is
    sent; until then the client is disabled and every call is a no-op.
    """

    _CREATE_PATH = "/open-api/ops/security/order/create"
    _AUTO_COMPLETE_PATH = "/open-api/ops/security/order/auto-complete"

    def __init__(self):
        self._enabled = False
        self._base_url = ""
        self._app_id = ""
        self._app_secret = ""
        self._tenant_id = "1"
        self._timeout = 10

    def init(self, config):
        """Load the work-order settings and decide whether the client is enabled.

        The client is enabled only when ``config.enabled`` is truthy and both
        ``base_url`` and ``app_secret`` are non-empty.

        Args:
            config: Settings object exposing ``enabled``, ``base_url``,
                ``app_id`` and ``app_secret``. ``tenant_id`` and ``timeout``
                are optional and default to ``"1"`` and ``10``.
        """
        # Tolerate unset (None) values instead of crashing on .rstrip().
        base_url = config.base_url or ""
        app_secret = config.app_secret or ""
        self._enabled = bool(config.enabled and base_url and app_secret)
        self._base_url = base_url.rstrip("/")
        self._app_id = config.app_id
        self._app_secret = app_secret
        # The tenant id is sent as an HTTP header, so it must be a string.
        tenant_id = getattr(config, "tenant_id", None)
        self._tenant_id = "1" if tenant_id is None else str(tenant_id)
        self._timeout = getattr(config, "timeout", 10)
        if self._enabled:
            logger.info(f"工单客户端已启用: base_url={self._base_url}")
        else:
            logger.info("工单客户端未启用")

    @property
    def enabled(self) -> bool:
        """Whether the client is configured and allowed to send requests."""
        return self._enabled

    def _sign(self, body_json: str, nonce: str, timestamp: str, query_str: str = "") -> str:
        """Compute the SHA256 request signature.

        The digest is taken over ``query_str + body_json + header_str + appSecret``:

        - ``query_str``: query parameters sorted by key in ascending order and
          joined; empty string when there are none.
        - ``header_str``: fixed order ``appId=&nonce=&timestamp=``.

        Returns:
            Lower-case hexadecimal SHA256 digest.
        """
        header_str = f"appId={self._app_id}&nonce={nonce}&timestamp={timestamp}"
        raw = f"{query_str}{body_json}{header_str}{self._app_secret}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def _build_headers(self, body_json: str) -> dict:
        """Build the request headers (signature fields plus ``tenant-id``)."""
        nonce = uuid.uuid4().hex[:16]
        # Millisecond Unix timestamp, sent as a string.
        timestamp = str(int(time.time() * 1000))
        sign = self._sign(body_json, nonce, timestamp)
        return {
            "Content-Type": "application/json",
            "tenant-id": self._tenant_id,
            "appId": self._app_id,
            "nonce": nonce,
            "timestamp": timestamp,
            "sign": sign,
        }

    async def _post_json(self, path: str, body: dict) -> dict:
        """Sign ``body``, POST it to ``path`` and return the decoded JSON reply."""
        # Serialize once: the exact string that is signed must be the one sent.
        body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
        headers = self._build_headers(body_json)
        url = f"{self._base_url}{path}"
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            resp = await client.post(url, content=body_json, headers=headers)
        return resp.json()

    @staticmethod
    def _extract_order_id(data: dict) -> Optional[str]:
        """Return the order id from a create response, or None when it is absent."""
        order_id = data.get("data")
        if order_id is None or order_id == "":
            return None
        return str(order_id)

    @staticmethod
    def _parse_order_id(order_id) -> Optional[int]:
        """Convert an order id to int, or return None when it is empty or non-numeric."""
        try:
            return int(order_id)
        except (TypeError, ValueError):
            return None

    async def create_order(
        self,
        title: str,
        area_id: int,
        alarm_id: str,
        alarm_type: str,
        description: str = "",
        priority: Optional[int] = None,
        trigger_source: str = "自动上报",
        camera_id: str = "",
        image_url: str = "",
    ) -> Optional[str]:
        """Create a security work order.

        Optional fields are only included in the request body when they are
        set (non-empty, or not None for ``priority``).

        Args:
            title: Order title (alarm type in Chinese, e.g. "人员离岗告警").
            area_id: Area ID.
            alarm_id: Alarm ID.
            alarm_type: Alarm type code (leave_post / intrusion, etc.).
            description: Description produced by the VLM model.
            priority: Priority: 0 urgent, 1 important, 2 normal, 3 minor.
            trigger_source: Origin of the order (automatic or manual report).
            camera_id: Camera ID.
            image_url: Snapshot URL.

        Returns:
            The order id as a string, or None when the client is disabled,
            the request fails, or the response carries no order id.
        """
        if not self._enabled:
            logger.debug("工单客户端未启用,跳过创建")
            return None

        body = {
            "title": title,
            "areaId": area_id,
            "alarmId": alarm_id,
            "alarmType": alarm_type,
        }
        if description:
            body["description"] = description
        if priority is not None:
            body["priority"] = priority
        if trigger_source:
            body["triggerSource"] = trigger_source
        if camera_id:
            body["cameraId"] = camera_id
        if image_url:
            body["imageUrl"] = image_url

        try:
            data = await self._post_json(self._CREATE_PATH, body)
            if data.get("code") != 0:
                logger.error(f"创建工单失败: {data}")
                return None
            # The API replies {"code": 0, "data": 1234567890}; "data" is the order id itself.
            order_id = self._extract_order_id(data)
            if order_id is None:
                # Do not hand back "None"/"" as if it were a real order id.
                logger.error(f"创建工单失败: 响应中缺少 orderId, resp={data}")
                return None
            logger.info(f"工单已创建: orderId={order_id}, alarmId={alarm_id}")
            return order_id
        except Exception as e:
            # Best-effort integration: never let a work-order failure propagate.
            logger.error(f"创建工单异常: {e}")
            return None

    async def auto_complete_order(
        self,
        order_id: str,
        remark: str = "",
    ) -> bool:
        """Automatically complete (close) a work order.

        Args:
            order_id: Order id as returned by ``create_order``.
            remark: Optional completion remark.

        Returns:
            True when the API reports success; False when the client is
            disabled, the order id is empty or non-numeric, or the call fails.
        """
        if not self._enabled:
            return False

        numeric_id = self._parse_order_id(order_id)
        if numeric_id is None:
            # An empty or malformed id can never match an order; skip the call.
            logger.error(f"自动结单失败: 无效的 orderId={order_id!r}")
            return False

        body = {
            "orderId": numeric_id,
            "remark": remark,
        }
        try:
            data = await self._post_json(self._AUTO_COMPLETE_PATH, body)
            if data.get("code") != 0:
                logger.error(f"自动结单失败: orderId={order_id}, resp={data}")
                return False
            logger.info(f"工单已自动结单: orderId={order_id}")
            return True
        except Exception as e:
            # Best-effort integration: report failure instead of raising.
            logger.error(f"自动结单异常: orderId={order_id}, error={e}")
            return False
# Process-wide singleton instance, created lazily on first access.
_work_order_client: Optional[WorkOrderClient] = None


def get_work_order_client() -> WorkOrderClient:
    """Return the shared WorkOrderClient, instantiating it on the first call."""
    global _work_order_client
    client = _work_order_client
    if client is None:
        client = WorkOrderClient()
        _work_order_client = client
    return client