From 840bb0e720fbc54a757286de0dd1016b491c2ffa Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 9 Mar 2026 12:31:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BC=81=E5=BE=AE?= =?UTF-8?q?=E7=BE=A4=E6=9C=BA=E5=99=A8=E4=BA=BAWebhook=E6=8E=A8=E9=80=81?= =?UTF-8?q?=EF=BC=88=E6=97=A0=E9=9C=80IP=E7=99=BD=E5=90=8D=E5=8D=95?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - wechat_service 新增 send_webhook_alarm 方法(markdown格式) - notify_dispatch 优先使用Webhook,失败降级到应用消息 - config 新增 WECHAT_WEBHOOK_URL 配置项 Co-Authored-By: Claude Opus 4.6 --- app/config.py | 2 ++ app/services/notify_dispatch.py | 55 ++++++++++++++++++++++----------- app/services/wechat_service.py | 47 ++++++++++++++++++++++++++-- 3 files changed, 84 insertions(+), 20 deletions(-) diff --git a/app/config.py b/app/config.py index 9ace39c..b59d3df 100644 --- a/app/config.py +++ b/app/config.py @@ -62,6 +62,7 @@ class WeChatConfig: enabled: bool = False test_uids: str = "" # 演示模式:逗号分隔的企微userid,如 "zhangsan,lisi" service_base_url: str = "" # H5页面公网地址,如 https://vsp.viewshanghai.com + webhook_url: str = "" # 群机器人Webhook地址 @dataclass @@ -170,6 +171,7 @@ def load_settings() -> Settings: enabled=os.getenv("WECHAT_ENABLED", "false").lower() == "true", test_uids=os.getenv("WECHAT_TEST_UIDS", ""), service_base_url=os.getenv("SERVICE_BASE_URL", ""), + webhook_url=os.getenv("WECHAT_WEBHOOK_URL", ""), ), agent=AgentConfig( llm_api_key=os.getenv("DASHSCOPE_API_KEY", ""), diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index e17d068..9972307 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -81,30 +81,49 @@ async def process_alarm_notification(alarm_data: Dict): # ========== 3. 推送企微通知 ========== wechat_service = get_wechat_service() - if not wechat_service.enabled: - logger.info("企微未启用,跳过推送") - return - - user_ids = [p["wechat_uid"] for p in persons] + service_base_url = settings.wechat.service_base_url or f"http://{settings.app.host}:{settings.app.port}" event_time_str = ( event_time.strftime("%Y-%m-%d %H:%M:%S") if isinstance(event_time, datetime) else str(event_time or "") ) + detail_url = f"{service_base_url}/static/alarm_detail.html?alarm_id={alarm_id}" - await wechat_service.send_alarm_card( - user_ids=user_ids, - alarm_id=alarm_id, - alarm_type=alarm_type, - area_name=area_name, - camera_name=camera_name, - description=description, - snapshot_url=snapshot_url, - event_time=event_time_str, - alarm_level=alarm_level, - service_base_url=settings.wechat.service_base_url or f"http://{settings.app.host}:{settings.app.port}", - ) + sent = False - logger.info(f"告警通知完成: {alarm_id} → {len(persons)} 人") + # 优先 Webhook(无需IP白名单) + if settings.wechat.webhook_url: + sent = await wechat_service.send_webhook_alarm( + webhook_url=settings.wechat.webhook_url, + alarm_id=alarm_id, + alarm_type=alarm_type, + area_name=area_name, + camera_name=camera_name, + description=description, + event_time=event_time_str, + alarm_level=alarm_level, + detail_url=detail_url, + ) + + # Webhook 未配置或失败时,降级到应用消息 + if not sent and wechat_service.enabled: + user_ids = [p["wechat_uid"] for p in persons] + sent = await wechat_service.send_alarm_card( + user_ids=user_ids, + alarm_id=alarm_id, + alarm_type=alarm_type, + area_name=area_name, + camera_name=camera_name, + description=description, + snapshot_url=snapshot_url, + event_time=event_time_str, + alarm_level=alarm_level, + service_base_url=service_base_url, + ) + + if not sent: + logger.warning(f"告警通知未发送: {alarm_id}, webhook和应用消息均未成功") + else: + logger.info(f"告警通知完成: {alarm_id}") except Exception as e: logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True) diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py index f408fd2..fb58e7a 100644 --- a/app/services/wechat_service.py +++ b/app/services/wechat_service.py @@ -6,11 +6,10 @@ V1 使用应用消息 + 文本卡片,后期扩展为模板卡片。 """ import httpx -import logging import time from typing import Optional, List -logger = logging.getLogger(__name__) +from app.utils.logger import logger class WeChatService: @@ -177,6 +176,50 @@ class WeChatService: logger.error(f"发送文本消息异常: {e}") return False + async def send_webhook_alarm( + self, + webhook_url: str, + alarm_id: str, + alarm_type: str, + area_name: str, + camera_name: str, + description: str, + event_time: str, + alarm_level: int = 2, + detail_url: str = "", + ) -> bool: + """通过群机器人 Webhook 发送告警通知(无需IP白名单)""" + type_names = {"leave_post": "人员离岗", "intrusion": "周界入侵"} + level_names = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"} + + type_name = type_names.get(alarm_type, alarm_type) + level_name = level_names.get(alarm_level, "一般") + + content = ( + f"## 【{level_name}】{type_name}告警\n" + f"> 区域:{area_name}\n" + f"> 摄像头:{camera_name}\n" + f"> 时间:{event_time}\n" + f"> AI描述:**{description}**\n" + ) + if detail_url: + content += f"> [查看详情]({detail_url})\n" + + msg = {"msgtype": "markdown", "markdown": {"content": content}} + + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(webhook_url, json=msg) + data = resp.json() + if data.get("errcode") != 0: + logger.error(f"Webhook发送失败: {data}") + return False + logger.info(f"Webhook告警已发送: alarm={alarm_id}") + return True + except Exception as e: + logger.error(f"Webhook发送异常: {e}") + return False + # 全局单例 _wechat_service: Optional[WeChatService] = None