功能:area_id 存储 + 工单对接代码完善 + 心跳端点
- AlarmEvent 模型添加 area_id 字段 - create_from_edge_report 提取 ext_data.area_id 存储 - 心跳端点 POST /api/ai/device/heartbeat - work_order_client: create_order 支持完整参数(description/priority/triggerSource/cameraId/imageUrl) - notify_dispatch: 工单标题中文化、alarmType 中文映射、永久 COS URL、triggerSource 来源判断 - oss_storage: 新增 get_permanent_url 方法 - 工单创建测试脚本 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -271,13 +271,14 @@ class AlarmEvent(Base):
|
||||
first_frame_time = Column(DateTime, comment="首帧时间")
|
||||
last_frame_time = Column(DateTime, comment="末帧时间")
|
||||
duration_ms = Column(Integer, comment="持续时长(毫秒)")
|
||||
alarm_level = Column(SmallInteger, comment="告警级别: 1提醒 2一般 3严重 4紧急")
|
||||
alarm_level = Column(SmallInteger, comment="告警级别: 0紧急 1重要 2普通 3轻微")
|
||||
confidence_score = Column(Float, comment="置信度 0-1")
|
||||
alarm_status = Column(String(20), default="NEW", comment="告警状态: NEW/CONFIRMED/FALSE/CLOSED")
|
||||
handle_status = Column(String(20), default="UNHANDLED", comment="处理状态: UNHANDLED/HANDLING/DONE")
|
||||
snapshot_url = Column(String(512), comment="截图URL")
|
||||
video_url = Column(String(512), comment="视频URL")
|
||||
edge_node_id = Column(String(64), comment="边缘节点ID")
|
||||
area_id = Column(BigInteger, comment="所属区域ID")
|
||||
handler = Column(String(64), comment="处理人")
|
||||
handle_remark = Column(Text, comment="处理备注")
|
||||
handled_at = Column(DateTime, comment="处理时间")
|
||||
@@ -308,6 +309,7 @@ class AlarmEvent(Base):
|
||||
"snapshot_url": self.snapshot_url,
|
||||
"video_url": self.video_url,
|
||||
"edge_node_id": self.edge_node_id,
|
||||
"area_id": self.area_id,
|
||||
"handler": self.handler,
|
||||
"handle_remark": self.handle_remark,
|
||||
"handled_at": self.handled_at.strftime('%Y-%m-%d %H:%M:%S') if self.handled_at else None,
|
||||
|
||||
@@ -8,19 +8,21 @@ Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from fastapi import APIRouter, Depends
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
from typing import Optional
|
||||
|
||||
from app.yudao_compat import YudaoResponse
|
||||
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
|
||||
from app.services.notification_service import get_notification_service
|
||||
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
|
||||
from app.models import EdgeDevice, DeviceStatus, get_session
|
||||
from app.utils.logger import logger
|
||||
from app.utils.timezone import beijing_now
|
||||
|
||||
router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"])
|
||||
router = APIRouter(prefix="/api/ai", tags=["Edge-兼容路由"])
|
||||
|
||||
|
||||
@router.post("/edge/report")
|
||||
@router.post("/alert/edge/report")
|
||||
async def edge_alarm_report(
|
||||
report: EdgeAlarmReport,
|
||||
service: AlarmEventService = Depends(get_alarm_event_service),
|
||||
@@ -65,7 +67,7 @@ async def edge_alarm_report(
|
||||
})
|
||||
|
||||
|
||||
@router.post("/edge/resolve")
|
||||
@router.post("/alert/edge/resolve")
|
||||
async def edge_alarm_resolve(
|
||||
resolve: EdgeAlarmResolve,
|
||||
service: AlarmEventService = Depends(get_alarm_event_service),
|
||||
@@ -96,6 +98,64 @@ async def edge_alarm_resolve(
|
||||
return YudaoResponse.success(True)
|
||||
|
||||
|
||||
@router.post("/device/heartbeat")
|
||||
async def edge_device_heartbeat(request: Request):
|
||||
"""
|
||||
Edge 设备心跳上报(无认证)
|
||||
|
||||
每 30 秒由 Edge 推理服务调用,更新设备在线状态和运行指标。
|
||||
"""
|
||||
data = await request.json()
|
||||
device_id = data.get("device_id", "")
|
||||
status_data = data.get("status", {})
|
||||
|
||||
if not device_id:
|
||||
return YudaoResponse.error(400, "缺少 device_id")
|
||||
|
||||
db = get_session()
|
||||
try:
|
||||
device = db.query(EdgeDevice).filter(EdgeDevice.device_id == device_id).first()
|
||||
now = beijing_now()
|
||||
|
||||
if not device:
|
||||
device = EdgeDevice(
|
||||
device_id=device_id,
|
||||
device_name=device_id,
|
||||
status=DeviceStatus.ONLINE,
|
||||
last_heartbeat=now,
|
||||
created_at=now,
|
||||
)
|
||||
db.add(device)
|
||||
|
||||
device.status = DeviceStatus.ONLINE
|
||||
device.last_heartbeat = now
|
||||
device.uptime_seconds = status_data.get("uptime_seconds")
|
||||
device.frames_processed = status_data.get("frames_processed")
|
||||
device.alerts_generated = status_data.get("alerts_generated")
|
||||
device.stream_count = status_data.get("stream_count")
|
||||
device.config_version = str(status_data.get("config_version", ""))
|
||||
device.extra_info = status_data.get("stream_stats")
|
||||
device.updated_at = now
|
||||
|
||||
# 从请求头获取 IP
|
||||
client_ip = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
|
||||
if not client_ip:
|
||||
client_ip = request.client.host if request.client else None
|
||||
if client_ip:
|
||||
device.ip_address = client_ip
|
||||
|
||||
db.commit()
|
||||
logger.debug(f"[心跳] 设备 {device_id} 心跳更新成功")
|
||||
|
||||
return YudaoResponse.success(True)
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"[心跳] 处理失败: {e}")
|
||||
return YudaoResponse.error(500, f"心跳处理失败: {e}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
async def _resolve_card_update(alarm_id: str, resolve_type: str):
|
||||
"""边缘端 resolve 后异步处理:更新卡片(工单暂未上线)"""
|
||||
try:
|
||||
|
||||
@@ -21,41 +21,52 @@ def generate_alarm_id() -> str:
|
||||
return f"ALM{timestamp}{unique_id}"
|
||||
|
||||
|
||||
def _determine_alarm_level(alarm_type: str, confidence: float, duration_ms: Optional[int] = None) -> int:
|
||||
def _determine_alarm_level(
|
||||
alarm_type: str,
|
||||
confidence: float,
|
||||
duration_ms: Optional[int] = None,
|
||||
initial_level: Optional[int] = None,
|
||||
) -> int:
|
||||
"""
|
||||
根据告警类型、置信度和持续时长确定告警级别
|
||||
返回: 1提醒 2一般 3严重 4紧急
|
||||
返回: 0紧急 1重要 2普通 3轻微
|
||||
|
||||
三层决策:
|
||||
1. 初始等级:Edge 上报的 alarm_level(来自 ROI 算法绑定配置)
|
||||
2. 时长升级:持续型告警随时长升级(只升不降)
|
||||
3. 无配置时使用算法默认等级
|
||||
"""
|
||||
# 算法默认等级
|
||||
default_levels = {
|
||||
"intrusion": 1, # 重要
|
||||
"leave_post": 2, # 普通
|
||||
"illegal_parking": 1, # 重要
|
||||
"vehicle_congestion": 2, # 普通
|
||||
}
|
||||
base_level = initial_level if initial_level is not None else default_levels.get(alarm_type, 2)
|
||||
|
||||
# 入侵检测:事件型,不升级
|
||||
if alarm_type == "intrusion":
|
||||
return 3 # 严重
|
||||
elif alarm_type == "leave_post":
|
||||
# 告警触发时 duration_ms 为 None,设置为一般级别
|
||||
if duration_ms is None:
|
||||
return 2 # 一般级别(刚触发,持续时长未知)
|
||||
return base_level
|
||||
|
||||
# 根据持续时长判断级别
|
||||
if duration_ms > 30 * 60 * 1000:
|
||||
return 3 # 严重
|
||||
elif duration_ms > 10 * 60 * 1000:
|
||||
return 2 # 一般
|
||||
return 1 # 提醒
|
||||
elif alarm_type == "illegal_parking":
|
||||
# 违停:根据停留时长判断级别
|
||||
if duration_ms is None:
|
||||
return 2 # 一般级别(刚触发)
|
||||
if duration_ms > 60 * 60 * 1000:
|
||||
return 3 # 严重(超过1小时)
|
||||
elif duration_ms > 15 * 60 * 1000:
|
||||
return 2 # 一般(超过15分钟)
|
||||
return 1 # 提醒
|
||||
elif alarm_type == "vehicle_congestion":
|
||||
return 2 # 一般级别(拥堵本身不分等级,由持续时长在resolve时重算)
|
||||
elif confidence and confidence > 0.9:
|
||||
return 3 # 严重
|
||||
elif confidence and confidence > 0.7:
|
||||
return 2 # 一般
|
||||
# 持续型告警:根据时长升级(只升不降,即取较小值)
|
||||
if duration_ms is not None:
|
||||
escalated = base_level
|
||||
if alarm_type == "leave_post":
|
||||
if duration_ms > 60 * 60 * 1000: # >1h → 紧急
|
||||
escalated = 0
|
||||
elif duration_ms > 30 * 60 * 1000: # >30min → 重要
|
||||
escalated = 1
|
||||
elif alarm_type == "illegal_parking":
|
||||
if duration_ms > 60 * 60 * 1000: # >1h → 紧急
|
||||
escalated = 0
|
||||
elif alarm_type == "vehicle_congestion":
|
||||
if duration_ms > 30 * 60 * 1000: # >30min → 重要
|
||||
escalated = 1
|
||||
# 只升不降
|
||||
return min(base_level, escalated)
|
||||
|
||||
return 2 # 默认一般
|
||||
return base_level
|
||||
|
||||
|
||||
class AlarmEventService:
|
||||
@@ -196,9 +207,14 @@ class AlarmEventService:
|
||||
alarm_level = data.get("alarm_level")
|
||||
ext_data = data.get("ext_data") or {}
|
||||
if alarm_level is None:
|
||||
# 从 ext_data 取 duration_ms
|
||||
# Edge 未提供等级,使用算法默认 + 时长升级
|
||||
duration_ms = ext_data.get("duration_ms")
|
||||
alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms)
|
||||
else:
|
||||
# Edge 提供了初始等级,检查是否需要时长升级
|
||||
duration_ms = ext_data.get("duration_ms")
|
||||
if duration_ms is not None:
|
||||
alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms, initial_level=alarm_level)
|
||||
|
||||
# 解析 first_frame_time(离岗开始时间,由 Edge 在 ext_data 中传递)
|
||||
first_frame_time = None
|
||||
@@ -224,6 +240,7 @@ class AlarmEventService:
|
||||
handle_status="UNHANDLED",
|
||||
snapshot_url=data.get("snapshot_url"),
|
||||
edge_node_id=ext_data.get("edge_node_id"),
|
||||
area_id=ext_data.get("area_id"),
|
||||
)
|
||||
|
||||
db.add(alarm)
|
||||
@@ -582,6 +599,15 @@ class AlarmEventService:
|
||||
|
||||
alarm.duration_ms = duration_ms
|
||||
|
||||
# 根据持续时长重新计算告警等级(只升不降)
|
||||
escalated_level = _determine_alarm_level(
|
||||
alarm.alarm_type, alarm.confidence_score or 0,
|
||||
duration_ms, initial_level=alarm.alarm_level,
|
||||
)
|
||||
if escalated_level < alarm.alarm_level:
|
||||
logger.info(f"告警等级升级: {alarm_id}, {alarm.alarm_level} → {escalated_level}")
|
||||
alarm.alarm_level = escalated_level
|
||||
|
||||
# 解析 last_frame_time,去除微秒保持格式一致
|
||||
try:
|
||||
parsed_time = datetime.fromisoformat(last_frame_time.replace("Z", "+00:00"))
|
||||
|
||||
@@ -146,17 +146,27 @@ async def process_alarm_notification(alarm_data: Dict):
|
||||
# ---- 4. 创建安保工单(暂未上线,待本地测试通过后启用) ----
|
||||
# wo_client = get_work_order_client()
|
||||
# if wo_client.enabled:
|
||||
# type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
|
||||
# level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般")
|
||||
# wo_title = f"【{level_name}】{type_name}告警 - {area_name}"
|
||||
# order_id = await wo_client.create_order(
|
||||
# title=wo_title,
|
||||
# area_id=area_id_int,
|
||||
# alarm_id=alarm_id,
|
||||
# alarm_type=alarm_type,
|
||||
# )
|
||||
# if order_id:
|
||||
# _save_order_id(alarm_id, order_id)
|
||||
# wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int
|
||||
# if wo_area_id:
|
||||
# type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
|
||||
# wo_title = f"{type_name}告警"
|
||||
# trigger_source = _get_trigger_source(alarm_id)
|
||||
# permanent_url = _get_permanent_url(snapshot_url)
|
||||
# order_id = await wo_client.create_order(
|
||||
# title=wo_title,
|
||||
# area_id=wo_area_id,
|
||||
# alarm_id=alarm_id,
|
||||
# alarm_type=type_name,
|
||||
# description=description,
|
||||
# priority=alarm_level,
|
||||
# trigger_source=trigger_source,
|
||||
# camera_id=device_id,
|
||||
# image_url=permanent_url,
|
||||
# )
|
||||
# if order_id:
|
||||
# _save_order_id(alarm_id, order_id)
|
||||
# else:
|
||||
# logger.warning(f"告警无 area_id,跳过工单创建: {alarm_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True)
|
||||
@@ -253,7 +263,7 @@ def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple:
|
||||
persons = db.query(AreaPersonBinding).filter(
|
||||
AreaPersonBinding.area_id == binding.area_id,
|
||||
AreaPersonBinding.enabled == 1,
|
||||
AreaPersonBinding.notify_level <= alarm_level,
|
||||
AreaPersonBinding.notify_level >= alarm_level,
|
||||
).all()
|
||||
|
||||
result = [
|
||||
@@ -274,6 +284,54 @@ def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple:
|
||||
db.close()
|
||||
|
||||
|
||||
def _get_alarm_area_id(alarm_id: str) -> int:
|
||||
"""从 alarm_event 表获取 area_id"""
|
||||
db = get_session()
|
||||
try:
|
||||
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
|
||||
return alarm.area_id if alarm and alarm.area_id else 0
|
||||
except Exception:
|
||||
return 0
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _get_trigger_source(alarm_id: str) -> str:
|
||||
"""从 alarm_event_ext 的 ext_type 判断告警来源"""
|
||||
db = get_session()
|
||||
try:
|
||||
ext = db.query(AlarmEventExt).filter(
|
||||
AlarmEventExt.alarm_id == alarm_id,
|
||||
AlarmEventExt.ext_type.in_(["EDGE_HTTP", "EDGE", "POST", "MANUAL"]),
|
||||
).first()
|
||||
if ext:
|
||||
return {
|
||||
"EDGE_HTTP": "自动上报",
|
||||
"EDGE": "自动上报",
|
||||
"POST": "人工上报",
|
||||
"MANUAL": "人工上报",
|
||||
}.get(ext.ext_type, "自动上报")
|
||||
return "自动上报"
|
||||
except Exception:
|
||||
return "自动上报"
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def _get_permanent_url(snapshot_url: str) -> str:
|
||||
"""将 COS object key 转为永久访问 URL"""
|
||||
if not snapshot_url:
|
||||
return ""
|
||||
if snapshot_url.startswith("http"):
|
||||
return snapshot_url
|
||||
try:
|
||||
from app.services.oss_storage import get_oss_storage
|
||||
return get_oss_storage().get_permanent_url(snapshot_url)
|
||||
except Exception as e:
|
||||
logger.warning(f"生成永久URL失败: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def _save_order_id(alarm_id: str, order_id: str):
|
||||
"""将工单ID保存到 alarm_event_ext(ext_type=WORK_ORDER)"""
|
||||
db = get_session()
|
||||
|
||||
@@ -226,6 +226,19 @@ class COSStorage:
|
||||
except Exception as e:
|
||||
raise CosStorageError(f"COS 文件上传失败: {e}")
|
||||
|
||||
# ======================== 永久 URL ========================
|
||||
|
||||
def get_permanent_url(self, object_key: str) -> str:
|
||||
"""获取永久访问 URL
|
||||
|
||||
格式: https://{bucket}.cos.{region}.myqcloud.com/{key}
|
||||
"""
|
||||
if not object_key:
|
||||
return ""
|
||||
if object_key.startswith("http"):
|
||||
return object_key
|
||||
return f"https://{settings.cos.bucket}.cos.{settings.cos.region}.myqcloud.com/{object_key}"
|
||||
|
||||
# ======================== 下载(预签名 URL) ========================
|
||||
|
||||
def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
|
||||
|
||||
@@ -80,10 +80,26 @@ class WorkOrderClient:
|
||||
area_id: int,
|
||||
alarm_id: str,
|
||||
alarm_type: str,
|
||||
description: str = "",
|
||||
priority: Optional[int] = None,
|
||||
trigger_source: str = "自动上报",
|
||||
camera_id: str = "",
|
||||
image_url: str = "",
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
创建安保工单
|
||||
|
||||
Args:
|
||||
title: 工单标题(中文告警类型,如"人员离岗告警")
|
||||
area_id: 区域ID
|
||||
alarm_id: 告警ID
|
||||
alarm_type: 告警类型编码(leave_post/intrusion等)
|
||||
description: VLM 大模型描述
|
||||
priority: 优先级(0紧急 1重要 2普通 3轻微)
|
||||
trigger_source: 来源(自动上报/人工上报)
|
||||
camera_id: 摄像头ID
|
||||
image_url: 截图URL
|
||||
|
||||
Returns:
|
||||
orderId 字符串,失败返回 None
|
||||
"""
|
||||
@@ -97,6 +113,17 @@ class WorkOrderClient:
|
||||
"alarmId": alarm_id,
|
||||
"alarmType": alarm_type,
|
||||
}
|
||||
if description:
|
||||
body["description"] = description
|
||||
if priority is not None:
|
||||
body["priority"] = priority
|
||||
if trigger_source:
|
||||
body["triggerSource"] = trigger_source
|
||||
if camera_id:
|
||||
body["cameraId"] = camera_id
|
||||
if image_url:
|
||||
body["imageUrl"] = image_url
|
||||
|
||||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||||
|
||||
try:
|
||||
|
||||
275
test_work_order.py
Normal file
275
test_work_order.py
Normal file
@@ -0,0 +1,275 @@
|
||||
"""
|
||||
安保工单开放接口测试脚本
|
||||
|
||||
基于最新接入文档(v1.0.0 / 2026-03-10),测试内容:
|
||||
1. SHA256 签名生成验证
|
||||
2. 创建工单 API(含 tenant-id 请求头)
|
||||
3. 自动完单 API
|
||||
|
||||
签名公式:SHA256(Query参数排序后 + Body原始JSON + Header参数 + appSecret)
|
||||
- Query 参数: 按 key 字母升序排序,无参数时为空串
|
||||
- Header 参数: 固定顺序 appId=&nonce=×tamp=
|
||||
|
||||
使用方法:
|
||||
python test_work_order.py
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
# ===== 配置(使用实际凭证) =====
|
||||
BASE_URL = "http://192.168.0.104:48080"
|
||||
APP_ID = "alarm-system"
|
||||
APP_SECRET = "tQ3v5q1z2ZLu7hrU1yseaHwg1wJUcmF1"
|
||||
TENANT_ID = "1"
|
||||
|
||||
|
||||
def build_sign(query_str: str, body_json: str, nonce: str, timestamp: str) -> str:
|
||||
"""
|
||||
构建签名
|
||||
|
||||
签名字符串 = Query参数(排序后) + Body原始JSON + Header参数 + appSecret
|
||||
Header参数 = appId={appId}&nonce={nonce}×tamp={timestamp}
|
||||
"""
|
||||
header_str = f"appId={APP_ID}&nonce={nonce}×tamp={timestamp}"
|
||||
sign_string = f"{query_str}{body_json}{header_str}{APP_SECRET}"
|
||||
return hashlib.sha256(sign_string.encode("utf-8")).hexdigest(), sign_string
|
||||
|
||||
|
||||
def build_headers(body_json: str, query_str: str = "") -> dict:
|
||||
"""构造完整请求头(含 tenant-id + 签名)"""
|
||||
nonce = uuid.uuid4().hex[:16]
|
||||
timestamp = str(int(time.time() * 1000))
|
||||
sign, sign_raw = build_sign(query_str, body_json, nonce, timestamp)
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"tenant-id": TENANT_ID,
|
||||
"appId": APP_ID,
|
||||
"timestamp": timestamp,
|
||||
"nonce": nonce,
|
||||
"sign": sign,
|
||||
}
|
||||
return headers, sign_raw
|
||||
|
||||
|
||||
async def test_signature():
|
||||
"""测试 0: 签名生成验证(用固定参数便于手动对比)"""
|
||||
print("=" * 60)
|
||||
print("测试 0: 签名生成验证")
|
||||
print("=" * 60)
|
||||
|
||||
body = {"title": "A栋3层入侵告警", "areaId": 1309, "alarmId": "ALM001", "alarmType": "intrusion"}
|
||||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||||
nonce = "a1b2c3d4e5f6g7h8"
|
||||
timestamp = "1704357025000"
|
||||
|
||||
sign, sign_raw = build_sign("", body_json, nonce, timestamp)
|
||||
|
||||
print(f" Body JSON: {body_json}")
|
||||
print(f" Query 参数: (空)")
|
||||
print(f" nonce: {nonce}")
|
||||
print(f" timestamp: {timestamp}")
|
||||
print(f" 签名原文: {sign_raw}")
|
||||
print(f" 签名结果: {sign}")
|
||||
print()
|
||||
|
||||
# 带 Query 参数的签名示例(文档示例)
|
||||
query_str = "k1=v1&k2=v2"
|
||||
sign2, sign_raw2 = build_sign(query_str, body_json, nonce, timestamp)
|
||||
print(f" [带 Query] 签名原文: {sign_raw2}")
|
||||
print(f" [带 Query] 签名结果: {sign2}")
|
||||
print()
|
||||
|
||||
|
||||
async def test_create_order():
|
||||
"""测试 1: 创建安保工单"""
|
||||
print("=" * 60)
|
||||
print("测试 1: 创建安保工单")
|
||||
print("=" * 60)
|
||||
|
||||
body = {
|
||||
"title": "【一般】人员离岗告警 - A座大堂吧台",
|
||||
"areaId": 1317,
|
||||
"alarmId": f"TEST_ALARM_{int(time.time())}",
|
||||
"alarmType": "leave_post",
|
||||
"priority": 2,
|
||||
}
|
||||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||||
headers, sign_raw = build_headers(body_json)
|
||||
|
||||
url = f"{BASE_URL}/open-api/ops/security/order/create"
|
||||
print(f" URL: {url}")
|
||||
print(f" Body: {body_json}")
|
||||
print(f" Headers:")
|
||||
for k, v in headers.items():
|
||||
print(f" {k}: {v}")
|
||||
print(f" 签名原文: {sign_raw}")
|
||||
print()
|
||||
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(url, content=body_json, headers=headers)
|
||||
print(f" HTTP Status: {resp.status_code}")
|
||||
print(f" Response Body: {resp.text[:1000]}")
|
||||
|
||||
if resp.status_code == 200:
|
||||
try:
|
||||
data = resp.json()
|
||||
if data.get("code") == 0:
|
||||
order_id = str(data.get("data", ""))
|
||||
print(f"\n [成功] 工单已创建, orderId = {order_id}")
|
||||
return order_id
|
||||
else:
|
||||
print(f"\n [失败] API 返回错误: code={data.get('code')}, msg={data.get('msg')}")
|
||||
except Exception as e:
|
||||
print(f"\n [失败] 解析响应失败: {e}")
|
||||
else:
|
||||
print(f"\n [失败] HTTP {resp.status_code}")
|
||||
return None
|
||||
|
||||
|
||||
async def test_create_order_full():
|
||||
"""测试 1b: 创建工单(含全部可选参数)"""
|
||||
print()
|
||||
print("=" * 60)
|
||||
print("测试 1b: 创建工单(全部参数)")
|
||||
print("=" * 60)
|
||||
|
||||
body = {
|
||||
"title": "【严重】入侵告警 - C座消控室",
|
||||
"description": "摄像头检测到异常人员入侵",
|
||||
"priority": 0,
|
||||
"areaId": 1318,
|
||||
"location": "C座1层消控室门口",
|
||||
"alarmId": f"TEST_ALARM_{int(time.time())}",
|
||||
"alarmType": "intrusion",
|
||||
"cameraId": "cam_172_16_8_37_fa4c",
|
||||
"imageUrl": "https://example.com/alarm/snapshot.jpg",
|
||||
}
|
||||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||||
headers, sign_raw = build_headers(body_json)
|
||||
|
||||
url = f"{BASE_URL}/open-api/ops/security/order/create"
|
||||
print(f" URL: {url}")
|
||||
print(f" Body: {body_json}")
|
||||
print()
|
||||
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(url, content=body_json, headers=headers)
|
||||
print(f" HTTP Status: {resp.status_code}")
|
||||
print(f" Response Body: {resp.text[:1000]}")
|
||||
|
||||
if resp.status_code == 200:
|
||||
try:
|
||||
data = resp.json()
|
||||
if data.get("code") == 0:
|
||||
order_id = str(data.get("data", ""))
|
||||
print(f"\n [成功] 工单已创建, orderId = {order_id}")
|
||||
return order_id
|
||||
else:
|
||||
print(f"\n [失败] code={data.get('code')}, msg={data.get('msg')}")
|
||||
except Exception as e:
|
||||
print(f"\n [失败] {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def test_auto_complete(order_id: str):
|
||||
"""测试 2: 自动完单"""
|
||||
print()
|
||||
print("=" * 60)
|
||||
print(f"测试 2: 自动完单 (orderId={order_id})")
|
||||
print("=" * 60)
|
||||
|
||||
body = {
|
||||
"orderId": int(order_id),
|
||||
"remark": "告警自动解除 - 人员回岗",
|
||||
}
|
||||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||||
headers, sign_raw = build_headers(body_json)
|
||||
|
||||
url = f"{BASE_URL}/open-api/ops/security/order/auto-complete"
|
||||
print(f" URL: {url}")
|
||||
print(f" Body: {body_json}")
|
||||
print(f" 签名原文: {sign_raw}")
|
||||
print()
|
||||
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(url, content=body_json, headers=headers)
|
||||
print(f" HTTP Status: {resp.status_code}")
|
||||
print(f" Response Body: {resp.text[:1000]}")
|
||||
|
||||
if resp.status_code == 200:
|
||||
try:
|
||||
data = resp.json()
|
||||
if data.get("code") == 0:
|
||||
print(f"\n [成功] 工单已自动完单")
|
||||
else:
|
||||
print(f"\n [失败] code={data.get('code')}, msg={data.get('msg')}")
|
||||
except Exception as e:
|
||||
print(f"\n [失败] {e}")
|
||||
else:
|
||||
print(f"\n [失败] HTTP {resp.status_code}")
|
||||
|
||||
|
||||
async def test_curl_equivalent():
|
||||
"""测试 3: 生成等效 curl 命令(便于手动调试)"""
|
||||
print()
|
||||
print("=" * 60)
|
||||
print("测试 3: 生成等效 curl 命令")
|
||||
print("=" * 60)
|
||||
|
||||
body = {"title": "测试告警", "areaId": 1309, "alarmId": "TEST001"}
|
||||
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
|
||||
headers, _ = build_headers(body_json)
|
||||
|
||||
url = f"{BASE_URL}/open-api/ops/security/order/create"
|
||||
curl_parts = [f'curl -X POST "{url}"']
|
||||
for k, v in headers.items():
|
||||
curl_parts.append(f' -H "{k}: {v}"')
|
||||
curl_parts.append(f" -d '{body_json}'")
|
||||
|
||||
print()
|
||||
print(" \\\n".join(curl_parts))
|
||||
print()
|
||||
|
||||
|
||||
async def main():
|
||||
print(f"工单系统地址: {BASE_URL}")
|
||||
print(f"appId: {APP_ID}")
|
||||
print(f"tenant-id: {TENANT_ID}")
|
||||
print()
|
||||
|
||||
# 0. 签名验证
|
||||
await test_signature()
|
||||
|
||||
# 1. 创建工单(最简参数)
|
||||
order_id = await test_create_order()
|
||||
|
||||
# 1b. 创建工单(全部参数)
|
||||
order_id_full = await test_create_order_full()
|
||||
|
||||
# 2. 自动完单(仅在创建成功时测试)
|
||||
target_order = order_id or order_id_full
|
||||
if target_order:
|
||||
input("\n按回车继续测试自动完单...")
|
||||
await test_auto_complete(target_order)
|
||||
else:
|
||||
print("\n[跳过] 创建工单均失败,无法测试自动完单")
|
||||
|
||||
# 3. 生成 curl 命令
|
||||
await test_curl_equivalent()
|
||||
|
||||
print()
|
||||
print("=" * 60)
|
||||
print("测试完成")
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user