""" 安保工单开放接口测试脚本 基于最新接入文档(v1.0.0 / 2026-03-10),测试内容: 1. SHA256 签名生成验证 2. 创建工单 API(含 tenant-id 请求头) 3. 自动完单 API 签名公式:SHA256(Query参数排序后 + Body原始JSON + Header参数 + appSecret) - Query 参数: 按 key 字母升序排序,无参数时为空串 - Header 参数: 固定顺序 appId=&nonce=×tamp= 使用方法: python test_work_order.py """ import asyncio import hashlib import json import time import uuid import httpx # ===== 配置(使用实际凭证) ===== BASE_URL = "http://192.168.0.104:48080" APP_ID = "alarm-system" APP_SECRET = "tQ3v5q1z2ZLu7hrU1yseaHwg1wJUcmF1" TENANT_ID = "1" def build_sign(query_str: str, body_json: str, nonce: str, timestamp: str) -> str: """ 构建签名 签名字符串 = Query参数(排序后) + Body原始JSON + Header参数 + appSecret Header参数 = appId={appId}&nonce={nonce}×tamp={timestamp} """ header_str = f"appId={APP_ID}&nonce={nonce}×tamp={timestamp}" sign_string = f"{query_str}{body_json}{header_str}{APP_SECRET}" return hashlib.sha256(sign_string.encode("utf-8")).hexdigest(), sign_string def build_headers(body_json: str, query_str: str = "") -> dict: """构造完整请求头(含 tenant-id + 签名)""" nonce = uuid.uuid4().hex[:16] timestamp = str(int(time.time() * 1000)) sign, sign_raw = build_sign(query_str, body_json, nonce, timestamp) headers = { "Content-Type": "application/json", "tenant-id": TENANT_ID, "appId": APP_ID, "timestamp": timestamp, "nonce": nonce, "sign": sign, } return headers, sign_raw async def test_signature(): """测试 0: 签名生成验证(用固定参数便于手动对比)""" print("=" * 60) print("测试 0: 签名生成验证") print("=" * 60) body = {"title": "A栋3层入侵告警", "areaId": 1309, "alarmId": "ALM001", "alarmType": "intrusion"} body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) nonce = "a1b2c3d4e5f6g7h8" timestamp = "1704357025000" sign, sign_raw = build_sign("", body_json, nonce, timestamp) print(f" Body JSON: {body_json}") print(f" Query 参数: (空)") print(f" nonce: {nonce}") print(f" timestamp: {timestamp}") print(f" 签名原文: {sign_raw}") print(f" 签名结果: {sign}") print() # 带 Query 参数的签名示例(文档示例) query_str = "k1=v1&k2=v2" sign2, sign_raw2 = build_sign(query_str, body_json, nonce, timestamp) print(f" [带 Query] 签名原文: {sign_raw2}") print(f" [带 Query] 签名结果: {sign2}") print() async def test_create_order(): """测试 1: 创建安保工单""" print("=" * 60) print("测试 1: 创建安保工单") print("=" * 60) body = { "title": "【一般】人员离岗告警 - A座大堂吧台", "areaId": 1317, "alarmId": f"TEST_ALARM_{int(time.time())}", "alarmType": "leave_post", "priority": 2, } body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) headers, sign_raw = build_headers(body_json) url = f"{BASE_URL}/open-api/ops/security/order/create" print(f" URL: {url}") print(f" Body: {body_json}") print(f" Headers:") for k, v in headers.items(): print(f" {k}: {v}") print(f" 签名原文: {sign_raw}") print() async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(url, content=body_json, headers=headers) print(f" HTTP Status: {resp.status_code}") print(f" Response Body: {resp.text[:1000]}") if resp.status_code == 200: try: data = resp.json() if data.get("code") == 0: order_id = str(data.get("data", "")) print(f"\n [成功] 工单已创建, orderId = {order_id}") return order_id else: print(f"\n [失败] API 返回错误: code={data.get('code')}, msg={data.get('msg')}") except Exception as e: print(f"\n [失败] 解析响应失败: {e}") else: print(f"\n [失败] HTTP {resp.status_code}") return None async def test_create_order_full(): """测试 1b: 创建工单(含全部可选参数)""" print() print("=" * 60) print("测试 1b: 创建工单(全部参数)") print("=" * 60) body = { "title": "【严重】入侵告警 - C座消控室", "description": "摄像头检测到异常人员入侵", "priority": 0, "areaId": 1318, "location": "C座1层消控室门口", "alarmId": f"TEST_ALARM_{int(time.time())}", "alarmType": "intrusion", "cameraId": "cam_172_16_8_37_fa4c", "imageUrl": "https://example.com/alarm/snapshot.jpg", } body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) headers, sign_raw = build_headers(body_json) url = f"{BASE_URL}/open-api/ops/security/order/create" print(f" URL: {url}") print(f" Body: {body_json}") print() async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(url, content=body_json, headers=headers) print(f" HTTP Status: {resp.status_code}") print(f" Response Body: {resp.text[:1000]}") if resp.status_code == 200: try: data = resp.json() if data.get("code") == 0: order_id = str(data.get("data", "")) print(f"\n [成功] 工单已创建, orderId = {order_id}") return order_id else: print(f"\n [失败] code={data.get('code')}, msg={data.get('msg')}") except Exception as e: print(f"\n [失败] {e}") return None async def test_auto_complete(order_id: str): """测试 2: 自动完单""" print() print("=" * 60) print(f"测试 2: 自动完单 (orderId={order_id})") print("=" * 60) body = { "orderId": int(order_id), "remark": "告警自动解除 - 人员回岗", } body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) headers, sign_raw = build_headers(body_json) url = f"{BASE_URL}/open-api/ops/security/order/auto-complete" print(f" URL: {url}") print(f" Body: {body_json}") print(f" 签名原文: {sign_raw}") print() async with httpx.AsyncClient(timeout=10) as client: resp = await client.post(url, content=body_json, headers=headers) print(f" HTTP Status: {resp.status_code}") print(f" Response Body: {resp.text[:1000]}") if resp.status_code == 200: try: data = resp.json() if data.get("code") == 0: print(f"\n [成功] 工单已自动完单") else: print(f"\n [失败] code={data.get('code')}, msg={data.get('msg')}") except Exception as e: print(f"\n [失败] {e}") else: print(f"\n [失败] HTTP {resp.status_code}") async def test_curl_equivalent(): """测试 3: 生成等效 curl 命令(便于手动调试)""" print() print("=" * 60) print("测试 3: 生成等效 curl 命令") print("=" * 60) body = {"title": "测试告警", "areaId": 1309, "alarmId": "TEST001"} body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) headers, _ = build_headers(body_json) url = f"{BASE_URL}/open-api/ops/security/order/create" curl_parts = [f'curl -X POST "{url}"'] for k, v in headers.items(): curl_parts.append(f' -H "{k}: {v}"') curl_parts.append(f" -d '{body_json}'") print() print(" \\\n".join(curl_parts)) print() async def main(): print(f"工单系统地址: {BASE_URL}") print(f"appId: {APP_ID}") print(f"tenant-id: {TENANT_ID}") print() # 0. 签名验证 await test_signature() # 1. 创建工单(最简参数) order_id = await test_create_order() # 1b. 创建工单(全部参数) order_id_full = await test_create_order_full() # 2. 自动完单(仅在创建成功时测试) target_order = order_id or order_id_full if target_order: input("\n按回车继续测试自动完单...") await test_auto_complete(target_order) else: print("\n[跳过] 创建工单均失败,无法测试自动完单") # 3. 生成 curl 命令 await test_curl_equivalent() print() print("=" * 60) print("测试完成") print("=" * 60) if __name__ == "__main__": asyncio.run(main())