diff --git a/app/models.py b/app/models.py index 93a833a..be0af02 100644 --- a/app/models.py +++ b/app/models.py @@ -271,13 +271,14 @@ class AlarmEvent(Base): first_frame_time = Column(DateTime, comment="首帧时间") last_frame_time = Column(DateTime, comment="末帧时间") duration_ms = Column(Integer, comment="持续时长(毫秒)") - alarm_level = Column(SmallInteger, comment="告警级别: 1提醒 2一般 3严重 4紧急") + alarm_level = Column(SmallInteger, comment="告警级别: 0紧急 1重要 2普通 3轻微") confidence_score = Column(Float, comment="置信度 0-1") alarm_status = Column(String(20), default="NEW", comment="告警状态: NEW/CONFIRMED/FALSE/CLOSED") handle_status = Column(String(20), default="UNHANDLED", comment="处理状态: UNHANDLED/HANDLING/DONE") snapshot_url = Column(String(512), comment="截图URL") video_url = Column(String(512), comment="视频URL") edge_node_id = Column(String(64), comment="边缘节点ID") + area_id = Column(BigInteger, comment="所属区域ID") handler = Column(String(64), comment="处理人") handle_remark = Column(Text, comment="处理备注") handled_at = Column(DateTime, comment="处理时间") @@ -308,6 +309,7 @@ class AlarmEvent(Base): "snapshot_url": self.snapshot_url, "video_url": self.video_url, "edge_node_id": self.edge_node_id, + "area_id": self.area_id, "handler": self.handler, "handle_remark": self.handle_remark, "handled_at": self.handled_at.strftime('%Y-%m-%d %H:%M:%S') if self.handled_at else None, diff --git a/app/routers/edge_compat.py b/app/routers/edge_compat.py index 9743684..e7fd5a7 100644 --- a/app/routers/edge_compat.py +++ b/app/routers/edge_compat.py @@ -8,19 +8,21 @@ Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路 import asyncio from datetime import datetime -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Request from typing import Optional from app.yudao_compat import YudaoResponse from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService from app.services.notification_service import get_notification_service from app.schemas import EdgeAlarmReport, EdgeAlarmResolve +from app.models import EdgeDevice, DeviceStatus, get_session from app.utils.logger import logger +from app.utils.timezone import beijing_now -router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"]) +router = APIRouter(prefix="/api/ai", tags=["Edge-兼容路由"]) -@router.post("/edge/report") +@router.post("/alert/edge/report") async def edge_alarm_report( report: EdgeAlarmReport, service: AlarmEventService = Depends(get_alarm_event_service), @@ -65,7 +67,7 @@ async def edge_alarm_report( }) -@router.post("/edge/resolve") +@router.post("/alert/edge/resolve") async def edge_alarm_resolve( resolve: EdgeAlarmResolve, service: AlarmEventService = Depends(get_alarm_event_service), @@ -96,6 +98,64 @@ async def edge_alarm_resolve( return YudaoResponse.success(True) +@router.post("/device/heartbeat") +async def edge_device_heartbeat(request: Request): + """ + Edge 设备心跳上报(无认证) + + 每 30 秒由 Edge 推理服务调用,更新设备在线状态和运行指标。 + """ + data = await request.json() + device_id = data.get("device_id", "") + status_data = data.get("status", {}) + + if not device_id: + return YudaoResponse.error(400, "缺少 device_id") + + db = get_session() + try: + device = db.query(EdgeDevice).filter(EdgeDevice.device_id == device_id).first() + now = beijing_now() + + if not device: + device = EdgeDevice( + device_id=device_id, + device_name=device_id, + status=DeviceStatus.ONLINE, + last_heartbeat=now, + created_at=now, + ) + db.add(device) + + device.status = DeviceStatus.ONLINE + device.last_heartbeat = now + device.uptime_seconds = status_data.get("uptime_seconds") + device.frames_processed = status_data.get("frames_processed") + device.alerts_generated = status_data.get("alerts_generated") + device.stream_count = status_data.get("stream_count") + device.config_version = str(status_data.get("config_version", "")) + device.extra_info = status_data.get("stream_stats") + device.updated_at = now + + # 从请求头获取 IP + client_ip = request.headers.get("x-forwarded-for", "").split(",")[0].strip() + if not client_ip: + client_ip = request.client.host if request.client else None + if client_ip: + device.ip_address = client_ip + + db.commit() + logger.debug(f"[心跳] 设备 {device_id} 心跳更新成功") + + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + logger.error(f"[心跳] 处理失败: {e}") + return YudaoResponse.error(500, f"心跳处理失败: {e}") + finally: + db.close() + + async def _resolve_card_update(alarm_id: str, resolve_type: str): """边缘端 resolve 后异步处理:更新卡片(工单暂未上线)""" try: diff --git a/app/services/alarm_event_service.py b/app/services/alarm_event_service.py index 6e8cfba..6d684a9 100644 --- a/app/services/alarm_event_service.py +++ b/app/services/alarm_event_service.py @@ -21,41 +21,52 @@ def generate_alarm_id() -> str: return f"ALM{timestamp}{unique_id}" -def _determine_alarm_level(alarm_type: str, confidence: float, duration_ms: Optional[int] = None) -> int: +def _determine_alarm_level( + alarm_type: str, + confidence: float, + duration_ms: Optional[int] = None, + initial_level: Optional[int] = None, +) -> int: """ 根据告警类型、置信度和持续时长确定告警级别 - 返回: 1提醒 2一般 3严重 4紧急 + 返回: 0紧急 1重要 2普通 3轻微 + + 三层决策: + 1. 初始等级:Edge 上报的 alarm_level(来自 ROI 算法绑定配置) + 2. 时长升级:持续型告警随时长升级(只升不降) + 3. 无配置时使用算法默认等级 """ + # 算法默认等级 + default_levels = { + "intrusion": 1, # 重要 + "leave_post": 2, # 普通 + "illegal_parking": 1, # 重要 + "vehicle_congestion": 2, # 普通 + } + base_level = initial_level if initial_level is not None else default_levels.get(alarm_type, 2) + + # 入侵检测:事件型,不升级 if alarm_type == "intrusion": - return 3 # 严重 - elif alarm_type == "leave_post": - # 告警触发时 duration_ms 为 None,设置为一般级别 - if duration_ms is None: - return 2 # 一般级别(刚触发,持续时长未知) + return base_level - # 根据持续时长判断级别 - if duration_ms > 30 * 60 * 1000: - return 3 # 严重 - elif duration_ms > 10 * 60 * 1000: - return 2 # 一般 - return 1 # 提醒 - elif alarm_type == "illegal_parking": - # 违停:根据停留时长判断级别 - if duration_ms is None: - return 2 # 一般级别(刚触发) - if duration_ms > 60 * 60 * 1000: - return 3 # 严重(超过1小时) - elif duration_ms > 15 * 60 * 1000: - return 2 # 一般(超过15分钟) - return 1 # 提醒 - elif alarm_type == "vehicle_congestion": - return 2 # 一般级别(拥堵本身不分等级,由持续时长在resolve时重算) - elif confidence and confidence > 0.9: - return 3 # 严重 - elif confidence and confidence > 0.7: - return 2 # 一般 + # 持续型告警:根据时长升级(只升不降,即取较小值) + if duration_ms is not None: + escalated = base_level + if alarm_type == "leave_post": + if duration_ms > 60 * 60 * 1000: # >1h → 紧急 + escalated = 0 + elif duration_ms > 30 * 60 * 1000: # >30min → 重要 + escalated = 1 + elif alarm_type == "illegal_parking": + if duration_ms > 60 * 60 * 1000: # >1h → 紧急 + escalated = 0 + elif alarm_type == "vehicle_congestion": + if duration_ms > 30 * 60 * 1000: # >30min → 重要 + escalated = 1 + # 只升不降 + return min(base_level, escalated) - return 2 # 默认一般 + return base_level class AlarmEventService: @@ -196,9 +207,14 @@ class AlarmEventService: alarm_level = data.get("alarm_level") ext_data = data.get("ext_data") or {} if alarm_level is None: - # 从 ext_data 取 duration_ms + # Edge 未提供等级,使用算法默认 + 时长升级 duration_ms = ext_data.get("duration_ms") alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms) + else: + # Edge 提供了初始等级,检查是否需要时长升级 + duration_ms = ext_data.get("duration_ms") + if duration_ms is not None: + alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms, initial_level=alarm_level) # 解析 first_frame_time(离岗开始时间,由 Edge 在 ext_data 中传递) first_frame_time = None @@ -224,6 +240,7 @@ class AlarmEventService: handle_status="UNHANDLED", snapshot_url=data.get("snapshot_url"), edge_node_id=ext_data.get("edge_node_id"), + area_id=ext_data.get("area_id"), ) db.add(alarm) @@ -582,6 +599,15 @@ class AlarmEventService: alarm.duration_ms = duration_ms + # 根据持续时长重新计算告警等级(只升不降) + escalated_level = _determine_alarm_level( + alarm.alarm_type, alarm.confidence_score or 0, + duration_ms, initial_level=alarm.alarm_level, + ) + if escalated_level < alarm.alarm_level: + logger.info(f"告警等级升级: {alarm_id}, {alarm.alarm_level} → {escalated_level}") + alarm.alarm_level = escalated_level + # 解析 last_frame_time,去除微秒保持格式一致 try: parsed_time = datetime.fromisoformat(last_frame_time.replace("Z", "+00:00")) diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index 3e9cbb0..6490147 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -146,17 +146,27 @@ async def process_alarm_notification(alarm_data: Dict): # ---- 4. 创建安保工单(暂未上线,待本地测试通过后启用) ---- # wo_client = get_work_order_client() # if wo_client.enabled: - # type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) - # level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般") - # wo_title = f"【{level_name}】{type_name}告警 - {area_name}" - # order_id = await wo_client.create_order( - # title=wo_title, - # area_id=area_id_int, - # alarm_id=alarm_id, - # alarm_type=alarm_type, - # ) - # if order_id: - # _save_order_id(alarm_id, order_id) + # wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int + # if wo_area_id: + # type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) + # wo_title = f"{type_name}告警" + # trigger_source = _get_trigger_source(alarm_id) + # permanent_url = _get_permanent_url(snapshot_url) + # order_id = await wo_client.create_order( + # title=wo_title, + # area_id=wo_area_id, + # alarm_id=alarm_id, + # alarm_type=type_name, + # description=description, + # priority=alarm_level, + # trigger_source=trigger_source, + # camera_id=device_id, + # image_url=permanent_url, + # ) + # if order_id: + # _save_order_id(alarm_id, order_id) + # else: + # logger.warning(f"告警无 area_id,跳过工单创建: {alarm_id}") except Exception as e: logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True) @@ -253,7 +263,7 @@ def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: persons = db.query(AreaPersonBinding).filter( AreaPersonBinding.area_id == binding.area_id, AreaPersonBinding.enabled == 1, - AreaPersonBinding.notify_level <= alarm_level, + AreaPersonBinding.notify_level >= alarm_level, ).all() result = [ @@ -274,6 +284,54 @@ def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: db.close() +def _get_alarm_area_id(alarm_id: str) -> int: + """从 alarm_event 表获取 area_id""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + return alarm.area_id if alarm and alarm.area_id else 0 + except Exception: + return 0 + finally: + db.close() + + +def _get_trigger_source(alarm_id: str) -> str: + """从 alarm_event_ext 的 ext_type 判断告警来源""" + db = get_session() + try: + ext = db.query(AlarmEventExt).filter( + AlarmEventExt.alarm_id == alarm_id, + AlarmEventExt.ext_type.in_(["EDGE_HTTP", "EDGE", "POST", "MANUAL"]), + ).first() + if ext: + return { + "EDGE_HTTP": "自动上报", + "EDGE": "自动上报", + "POST": "人工上报", + "MANUAL": "人工上报", + }.get(ext.ext_type, "自动上报") + return "自动上报" + except Exception: + return "自动上报" + finally: + db.close() + + +def _get_permanent_url(snapshot_url: str) -> str: + """将 COS object key 转为永久访问 URL""" + if not snapshot_url: + return "" + if snapshot_url.startswith("http"): + return snapshot_url + try: + from app.services.oss_storage import get_oss_storage + return get_oss_storage().get_permanent_url(snapshot_url) + except Exception as e: + logger.warning(f"生成永久URL失败: {e}") + return "" + + def _save_order_id(alarm_id: str, order_id: str): """将工单ID保存到 alarm_event_ext(ext_type=WORK_ORDER)""" db = get_session() diff --git a/app/services/oss_storage.py b/app/services/oss_storage.py index b3d02e3..052f34c 100644 --- a/app/services/oss_storage.py +++ b/app/services/oss_storage.py @@ -226,6 +226,19 @@ class COSStorage: except Exception as e: raise CosStorageError(f"COS 文件上传失败: {e}") + # ======================== 永久 URL ======================== + + def get_permanent_url(self, object_key: str) -> str: + """获取永久访问 URL + + 格式: https://{bucket}.cos.{region}.myqcloud.com/{key} + """ + if not object_key: + return "" + if object_key.startswith("http"): + return object_key + return f"https://{settings.cos.bucket}.cos.{settings.cos.region}.myqcloud.com/{object_key}" + # ======================== 下载(预签名 URL) ======================== def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str: diff --git a/app/services/work_order_client.py b/app/services/work_order_client.py index f8c4a68..971a9b0 100644 --- a/app/services/work_order_client.py +++ b/app/services/work_order_client.py @@ -80,10 +80,26 @@ class WorkOrderClient: area_id: int, alarm_id: str, alarm_type: str, + description: str = "", + priority: Optional[int] = None, + trigger_source: str = "自动上报", + camera_id: str = "", + image_url: str = "", ) -> Optional[str]: """ 创建安保工单 + Args: + title: 工单标题(中文告警类型,如"人员离岗告警") + area_id: 区域ID + alarm_id: 告警ID + alarm_type: 告警类型编码(leave_post/intrusion等) + description: VLM 大模型描述 + priority: 优先级(0紧急 1重要 2普通 3轻微) + trigger_source: 来源(自动上报/人工上报) + camera_id: 摄像头ID + image_url: 截图URL + Returns: orderId 字符串,失败返回 None """ @@ -97,6 +113,17 @@ class WorkOrderClient: "alarmId": alarm_id, "alarmType": alarm_type, } + if description: + body["description"] = description + if priority is not None: + body["priority"] = priority + if trigger_source: + body["triggerSource"] = trigger_source + if camera_id: + body["cameraId"] = camera_id + if image_url: + body["imageUrl"] = image_url + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) try: diff --git a/test_work_order.py b/test_work_order.py new file mode 100644 index 0000000..fe29dd1 --- /dev/null +++ b/test_work_order.py @@ -0,0 +1,275 @@ +""" +安保工单开放接口测试脚本 + +基于最新接入文档(v1.0.0 / 2026-03-10),测试内容: +1. SHA256 签名生成验证 +2. 创建工单 API(含 tenant-id 请求头) +3. 自动完单 API + +签名公式:SHA256(Query参数排序后 + Body原始JSON + Header参数 + appSecret) +- Query 参数: 按 key 字母升序排序,无参数时为空串 +- Header 参数: 固定顺序 appId=&nonce=×tamp= + +使用方法: + python test_work_order.py +""" + +import asyncio +import hashlib +import json +import time +import uuid + +import httpx + + +# ===== 配置(使用实际凭证) ===== +BASE_URL = "http://192.168.0.104:48080" +APP_ID = "alarm-system" +APP_SECRET = "tQ3v5q1z2ZLu7hrU1yseaHwg1wJUcmF1" +TENANT_ID = "1" + + +def build_sign(query_str: str, body_json: str, nonce: str, timestamp: str) -> str: + """ + 构建签名 + + 签名字符串 = Query参数(排序后) + Body原始JSON + Header参数 + appSecret + Header参数 = appId={appId}&nonce={nonce}×tamp={timestamp} + """ + header_str = f"appId={APP_ID}&nonce={nonce}×tamp={timestamp}" + sign_string = f"{query_str}{body_json}{header_str}{APP_SECRET}" + return hashlib.sha256(sign_string.encode("utf-8")).hexdigest(), sign_string + + +def build_headers(body_json: str, query_str: str = "") -> dict: + """构造完整请求头(含 tenant-id + 签名)""" + nonce = uuid.uuid4().hex[:16] + timestamp = str(int(time.time() * 1000)) + sign, sign_raw = build_sign(query_str, body_json, nonce, timestamp) + + headers = { + "Content-Type": "application/json", + "tenant-id": TENANT_ID, + "appId": APP_ID, + "timestamp": timestamp, + "nonce": nonce, + "sign": sign, + } + return headers, sign_raw + + +async def test_signature(): + """测试 0: 签名生成验证(用固定参数便于手动对比)""" + print("=" * 60) + print("测试 0: 签名生成验证") + print("=" * 60) + + body = {"title": "A栋3层入侵告警", "areaId": 1309, "alarmId": "ALM001", "alarmType": "intrusion"} + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + nonce = "a1b2c3d4e5f6g7h8" + timestamp = "1704357025000" + + sign, sign_raw = build_sign("", body_json, nonce, timestamp) + + print(f" Body JSON: {body_json}") + print(f" Query 参数: (空)") + print(f" nonce: {nonce}") + print(f" timestamp: {timestamp}") + print(f" 签名原文: {sign_raw}") + print(f" 签名结果: {sign}") + print() + + # 带 Query 参数的签名示例(文档示例) + query_str = "k1=v1&k2=v2" + sign2, sign_raw2 = build_sign(query_str, body_json, nonce, timestamp) + print(f" [带 Query] 签名原文: {sign_raw2}") + print(f" [带 Query] 签名结果: {sign2}") + print() + + +async def test_create_order(): + """测试 1: 创建安保工单""" + print("=" * 60) + print("测试 1: 创建安保工单") + print("=" * 60) + + body = { + "title": "【一般】人员离岗告警 - A座大堂吧台", + "areaId": 1317, + "alarmId": f"TEST_ALARM_{int(time.time())}", + "alarmType": "leave_post", + "priority": 2, + } + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + headers, sign_raw = build_headers(body_json) + + url = f"{BASE_URL}/open-api/ops/security/order/create" + print(f" URL: {url}") + print(f" Body: {body_json}") + print(f" Headers:") + for k, v in headers.items(): + print(f" {k}: {v}") + print(f" 签名原文: {sign_raw}") + print() + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, content=body_json, headers=headers) + print(f" HTTP Status: {resp.status_code}") + print(f" Response Body: {resp.text[:1000]}") + + if resp.status_code == 200: + try: + data = resp.json() + if data.get("code") == 0: + order_id = str(data.get("data", "")) + print(f"\n [成功] 工单已创建, orderId = {order_id}") + return order_id + else: + print(f"\n [失败] API 返回错误: code={data.get('code')}, msg={data.get('msg')}") + except Exception as e: + print(f"\n [失败] 解析响应失败: {e}") + else: + print(f"\n [失败] HTTP {resp.status_code}") + return None + + +async def test_create_order_full(): + """测试 1b: 创建工单(含全部可选参数)""" + print() + print("=" * 60) + print("测试 1b: 创建工单(全部参数)") + print("=" * 60) + + body = { + "title": "【严重】入侵告警 - C座消控室", + "description": "摄像头检测到异常人员入侵", + "priority": 0, + "areaId": 1318, + "location": "C座1层消控室门口", + "alarmId": f"TEST_ALARM_{int(time.time())}", + "alarmType": "intrusion", + "cameraId": "cam_172_16_8_37_fa4c", + "imageUrl": "https://example.com/alarm/snapshot.jpg", + } + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + headers, sign_raw = build_headers(body_json) + + url = f"{BASE_URL}/open-api/ops/security/order/create" + print(f" URL: {url}") + print(f" Body: {body_json}") + print() + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, content=body_json, headers=headers) + print(f" HTTP Status: {resp.status_code}") + print(f" Response Body: {resp.text[:1000]}") + + if resp.status_code == 200: + try: + data = resp.json() + if data.get("code") == 0: + order_id = str(data.get("data", "")) + print(f"\n [成功] 工单已创建, orderId = {order_id}") + return order_id + else: + print(f"\n [失败] code={data.get('code')}, msg={data.get('msg')}") + except Exception as e: + print(f"\n [失败] {e}") + return None + + +async def test_auto_complete(order_id: str): + """测试 2: 自动完单""" + print() + print("=" * 60) + print(f"测试 2: 自动完单 (orderId={order_id})") + print("=" * 60) + + body = { + "orderId": int(order_id), + "remark": "告警自动解除 - 人员回岗", + } + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + headers, sign_raw = build_headers(body_json) + + url = f"{BASE_URL}/open-api/ops/security/order/auto-complete" + print(f" URL: {url}") + print(f" Body: {body_json}") + print(f" 签名原文: {sign_raw}") + print() + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, content=body_json, headers=headers) + print(f" HTTP Status: {resp.status_code}") + print(f" Response Body: {resp.text[:1000]}") + + if resp.status_code == 200: + try: + data = resp.json() + if data.get("code") == 0: + print(f"\n [成功] 工单已自动完单") + else: + print(f"\n [失败] code={data.get('code')}, msg={data.get('msg')}") + except Exception as e: + print(f"\n [失败] {e}") + else: + print(f"\n [失败] HTTP {resp.status_code}") + + +async def test_curl_equivalent(): + """测试 3: 生成等效 curl 命令(便于手动调试)""" + print() + print("=" * 60) + print("测试 3: 生成等效 curl 命令") + print("=" * 60) + + body = {"title": "测试告警", "areaId": 1309, "alarmId": "TEST001"} + body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":")) + headers, _ = build_headers(body_json) + + url = f"{BASE_URL}/open-api/ops/security/order/create" + curl_parts = [f'curl -X POST "{url}"'] + for k, v in headers.items(): + curl_parts.append(f' -H "{k}: {v}"') + curl_parts.append(f" -d '{body_json}'") + + print() + print(" \\\n".join(curl_parts)) + print() + + +async def main(): + print(f"工单系统地址: {BASE_URL}") + print(f"appId: {APP_ID}") + print(f"tenant-id: {TENANT_ID}") + print() + + # 0. 签名验证 + await test_signature() + + # 1. 创建工单(最简参数) + order_id = await test_create_order() + + # 1b. 创建工单(全部参数) + order_id_full = await test_create_order_full() + + # 2. 自动完单(仅在创建成功时测试) + target_order = order_id or order_id_full + if target_order: + input("\n按回车继续测试自动完单...") + await test_auto_complete(target_order) + else: + print("\n[跳过] 创建工单均失败,无法测试自动完单") + + # 3. 生成 curl 命令 + await test_curl_equivalent() + + print() + print("=" * 60) + print("测试完成") + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main())