"""
企微通知服务
封装企业微信 API,发送告警文本卡片。
V1 使用应用消息 + 文本卡片,后期扩展为模板卡片。
"""
import httpx
import time
from typing import Optional, List
from app.utils.logger import logger
class WeChatService:
    """WeCom (Enterprise WeChat) notification service.

    Sends alarm notifications either as application messages (text card /
    plain text, authenticated with a cached access token) or through a
    group-robot webhook. Meant to be shared as a single instance so the
    cached access token is reused between calls.
    """

    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    _HTTP_TIMEOUT = 10  # seconds, applied to every outbound request
    _TOKEN_REFRESH_MARGIN = 60  # refresh the token this many seconds early
    _DEFAULT_TOKEN_TTL = 7200  # used when the API omits ``expires_in``
    _FALLBACK_DETAIL_URL = "https://work.weixin.qq.com"

    # Display names shared by the text card and the webhook markdown.
    _TYPE_NAMES = {"leave_post": "人员离岗", "intrusion": "周界入侵"}
    _LEVEL_NAMES = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"}
    _DEFAULT_LEVEL_NAME = "一般"

    # WeCom errcodes meaning the cached access token is no longer usable
    # (40014: invalid access_token, 42001: access_token expired).
    _STALE_TOKEN_ERRCODES = frozenset({40014, 42001})

    def __init__(self):
        self._enabled = False
        self._corp_id = ""
        self._agent_id = ""
        self._secret = ""
        self._token = ""
        self._encoding_aes_key = ""
        self._access_token = ""
        self._token_expire_at = 0

    def init(self, config):
        """Load WeCom settings and decide whether sending is enabled.

        The service is enabled only when ``config.enabled`` is truthy and
        both ``corp_id`` and ``secret`` are non-empty.

        Args:
            config: Settings object exposing ``enabled``, ``corp_id``,
                ``agent_id``, ``secret``, ``token`` and ``encoding_aes_key``.
        """
        # bool(...) keeps the ``enabled`` property a real bool even when a
        # config field is None or a string.
        self._enabled = bool(config.enabled and config.corp_id and config.secret)
        self._corp_id = config.corp_id
        self._agent_id = config.agent_id
        self._secret = config.secret
        self._token = config.token
        self._encoding_aes_key = config.encoding_aes_key
        # A token cached under previous credentials must not outlive them.
        self._access_token = ""
        self._token_expire_at = 0
        if self._enabled:
            logger.info(f"企微通知服务已启用: corp_id={self._corp_id}")
        else:
            logger.info("企微通知服务未启用")

    @property
    def enabled(self) -> bool:
        """Whether application-message sending is enabled."""
        return self._enabled

    async def _get_access_token(self) -> str:
        """Return a WeCom access token, fetching a new one when needed.

        The cached token is reused until shortly before its expiry.

        Raises:
            RuntimeError: If the API answers with a non-zero ``errcode``.
        """
        if (
            self._access_token
            and time.time() < self._token_expire_at - self._TOKEN_REFRESH_MARGIN
        ):
            return self._access_token

        url = f"{self._API_BASE}/gettoken"
        params = {"corpid": self._corp_id, "corpsecret": self._secret}
        async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
            resp = await client.get(url, params=params)
        data = resp.json()
        if data.get("errcode") != 0:
            raise RuntimeError(f"获取 access_token 失败: {data}")

        self._access_token = data["access_token"]
        self._token_expire_at = time.time() + data.get(
            "expires_in", self._DEFAULT_TOKEN_TTL
        )
        logger.info("企微 access_token 已更新")
        return self._access_token

    def _agent_id_value(self) -> int:
        """Return the configured agent id as an int (0 when unset)."""
        return int(self._agent_id) if self._agent_id else 0

    async def _post_app_message(self, payload: dict) -> dict:
        """POST ``payload`` to ``message/send`` and return the decoded reply.

        Drops the cached access token when the server reports it as invalid
        or expired, so the next call fetches a fresh one.
        """
        access_token = await self._get_access_token()
        url = f"{self._API_BASE}/message/send?access_token={access_token}"
        async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
            resp = await client.post(url, json=payload)
        data = resp.json()
        if data.get("errcode") in self._STALE_TOKEN_ERRCODES:
            self._access_token = ""
            self._token_expire_at = 0
        return data

    @staticmethod
    def _resolve_detail_url(
        service_base_url: str, alarm_id: str, snapshot_url: str
    ) -> str:
        """Pick the card's jump target.

        Preference order: the H5 detail page under ``service_base_url``,
        then ``snapshot_url``, then the WeCom home page.
        """
        if service_base_url:
            # Strip a trailing slash so the path never contains "//static".
            base = service_base_url.rstrip("/")
            return f"{base}/static/alarm_detail.html?alarm_id={alarm_id}"
        return snapshot_url or WeChatService._FALLBACK_DETAIL_URL

    @staticmethod
    def _build_card_description(
        level_name: str,
        type_name: str,
        area_name: str,
        camera_name: str,
        event_time: str,
        description: str,
    ) -> str:
        """Build the text-card description, one styled ``div`` per line."""
        # NOTE(review): the markup around each field was reconstructed; the
        # class names come from WeCom's built-in text-card styles (gray /
        # highlight / normal). Confirm the intended colour of each line.
        return (
            f'<div class="highlight">{level_name} | {type_name}</div>\n'
            f'<div class="gray">区域:{area_name}</div>\n'
            f'<div class="gray">摄像头:{camera_name}</div>\n'
            f'<div class="gray">时间:{event_time}</div>\n'
            f'<div class="normal">{description}</div>'
        )

    async def send_alarm_card(
        self,
        user_ids: List[str],
        alarm_id: str,
        alarm_type: str,
        area_name: str,
        camera_name: str,
        description: str,
        snapshot_url: str,
        event_time: str,
        alarm_level: int = 2,
        service_base_url: str = "",
    ) -> bool:
        """Send an alarm as a WeCom text-card application message.

        Args:
            user_ids: WeCom userids of the recipients.
            alarm_id: Alarm identifier.
            alarm_type: Alarm type key; unknown keys are shown as-is.
            area_name: Name of the monitored area.
            camera_name: Name of the camera.
            description: Scene description shown in the card body.
            snapshot_url: Snapshot URL, used as the jump target when no
                ``service_base_url`` is given.
            event_time: Alarm time, already formatted for display.
            alarm_level: Alarm level (1-4); unknown levels are shown as
                the default level name.
            service_base_url: Base URL of the service hosting the H5 alarm
                detail page; when set, the card links to that page.

        Returns:
            True when the API accepted the message; False when the service
            is disabled, there are no recipients, the API reported an
            error, or any exception occurred (exceptions are logged, not
            raised).
        """
        if not self._enabled:
            logger.debug("企微未启用,跳过发送")
            return False
        if not user_ids:
            # An empty "touser" is rejected by the API; skip the round trip.
            logger.warning(f"企微发送失败: 接收人为空, alarm={alarm_id}")
            return False

        try:
            type_name = self._TYPE_NAMES.get(alarm_type, alarm_type)
            level_name = self._LEVEL_NAMES.get(
                alarm_level, self._DEFAULT_LEVEL_NAME
            )
            msg = {
                "touser": "|".join(user_ids),
                "msgtype": "textcard",
                "agentid": self._agent_id_value(),
                "textcard": {
                    "title": f"【{level_name}】{type_name}告警",
                    "description": self._build_card_description(
                        level_name,
                        type_name,
                        area_name,
                        camera_name,
                        event_time,
                        description,
                    ),
                    "url": self._resolve_detail_url(
                        service_base_url, alarm_id, snapshot_url
                    ),
                    "btntxt": "查看详情",
                },
            }
            data = await self._post_app_message(msg)
            if data.get("errcode") != 0:
                logger.error(f"企微发送失败: {data}")
                return False
            logger.info(f"企微通知已发送: alarm={alarm_id}, users={user_ids}")
            return True
        except Exception as e:
            # Notification is best-effort: log and report failure.
            logger.error(f"企微发送异常: {e}")
            return False

    async def send_text_message(self, user_id: str, content: str) -> bool:
        """Send a plain-text application message to one user.

        Args:
            user_id: WeCom userid of the recipient.
            content: Message text.

        Returns:
            True when the API accepted the message; False when the service
            is disabled, the API reported an error, or an exception
            occurred (logged, not raised).
        """
        if not self._enabled:
            return False

        try:
            msg = {
                "touser": user_id,
                "msgtype": "text",
                "agentid": self._agent_id_value(),
                "text": {"content": content},
            }
            data = await self._post_app_message(msg)
            if data.get("errcode") != 0:
                logger.error(f"企微文本消息发送失败: {data}")
                return False
            logger.info(f"企微文本消息已发送: user={user_id}")
            return True
        except Exception as e:
            logger.error(f"发送文本消息异常: {e}")
            return False

    async def send_webhook_alarm(
        self,
        webhook_url: str,
        alarm_id: str,
        alarm_type: str,
        area_name: str,
        camera_name: str,
        description: str,
        event_time: str,
        alarm_level: int = 2,
        detail_url: str = "",
    ) -> bool:
        """Send an alarm as markdown through a group-robot webhook.

        This path posts straight to ``webhook_url``; it uses neither the
        access token nor the ``enabled`` flag.

        Args:
            webhook_url: Full webhook URL of the group robot.
            alarm_id: Alarm identifier (used for logging only).
            alarm_type: Alarm type key; unknown keys are shown as-is.
            area_name: Name of the monitored area.
            camera_name: Name of the camera.
            description: Scene description, rendered in bold.
            event_time: Alarm time, already formatted for display.
            alarm_level: Alarm level (1-4); unknown levels are shown as
                the default level name.
            detail_url: Optional link appended as a "view details" line.

        Returns:
            True when the webhook answered with ``errcode`` 0, otherwise
            False (errors and exceptions are logged, not raised).
        """
        type_name = self._TYPE_NAMES.get(alarm_type, alarm_type)
        level_name = self._LEVEL_NAMES.get(alarm_level, self._DEFAULT_LEVEL_NAME)
        content = (
            f"## 【{level_name}】{type_name}告警\n"
            f"> 区域:{area_name}\n"
            f"> 摄像头:{camera_name}\n"
            f"> 时间:{event_time}\n"
            f"> AI描述:**{description}**\n"
        )
        if detail_url:
            content += f"> [查看详情]({detail_url})\n"
        msg = {"msgtype": "markdown", "markdown": {"content": content}}

        try:
            async with httpx.AsyncClient(timeout=self._HTTP_TIMEOUT) as client:
                resp = await client.post(webhook_url, json=msg)
            data = resp.json()
            if data.get("errcode") != 0:
                logger.error(f"Webhook发送失败: {data}")
                return False
            logger.info(f"Webhook告警已发送: alarm={alarm_id}")
            return True
        except Exception as e:
            logger.error(f"Webhook发送异常: {e}")
            return False
# Process-wide singleton, created lazily on first access.
_wechat_service: Optional[WeChatService] = None


def get_wechat_service() -> WeChatService:
    """Return the shared WeChatService instance, creating it on first use."""
    global _wechat_service
    if _wechat_service is not None:
        return _wechat_service
    _wechat_service = WeChatService()
    return _wechat_service