Files
iot-device-management-service/app/services/wechat_service.py
16337 c36488c6e6 feat: 企微卡片升级为原生按钮交互型模板卡片
1. wechat_service.py:
   - send_alarm_card 从 textcard 升级为 button_interaction 模板卡片
   - 卡片直接在对话框展示告警信息 + 操作按钮(前往处理/误报忽略)
   - 新增 update_alarm_card 方法:点击按钮后更新卡片状态(按钮变灰)
   - 保留群聊消息能力(create_group_chat/send_group_text)

2. wechat_callback.py:
   - 回调支持 template_card_event 按钮点击事件
   - 按钮点击自动更新告警状态(handle→HANDLING, ignore→IGNORED)
   - 通过 response_code 更新卡片按钮为已处理状态
   - H5 详情页 snapshot_url 增加 COS key→预签名URL 转换
   - 新增群聊创建和群聊发卡片的测试接口

3. vlm_service.py:
   - VLM 降级策略统一放行(所有类型 confirmed=True)
   - 避免 VLM 不可用时离岗告警无法推送

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 14:36:58 +08:00

354 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
企微通知服务
封装企业微信 API发送告警模板卡片按钮交互型
用户直接在对话框中点击按钮处理告警,无需跳转 H5 页面。
"""
import httpx
import time
from typing import Optional, List, Dict
from app.utils.logger import logger
class WeChatService:
"""企微通知服务(单例)"""
def __init__(self):
self._enabled = False
self._corp_id = ""
self._agent_id = ""
self._secret = ""
self._token = ""
self._encoding_aes_key = ""
self._access_token = ""
self._token_expire_at = 0
# 缓存 response_code用于更新卡片状态
# key: task_id (alarm_id), value: response_code
self._response_codes: Dict[str, str] = {}
def init(self, config):
"""初始化企微配置"""
self._enabled = config.enabled and bool(config.corp_id) and bool(config.secret)
self._corp_id = config.corp_id
self._agent_id = config.agent_id
self._secret = config.secret
self._token = config.token
self._encoding_aes_key = config.encoding_aes_key
if self._enabled:
logger.info(f"企微通知服务已启用: corp_id={self._corp_id}")
else:
logger.info("企微通知服务未启用")
@property
def enabled(self) -> bool:
return self._enabled
async def _get_access_token(self) -> str:
"""获取企微 access_token带缓存"""
if self._access_token and time.time() < self._token_expire_at - 60:
return self._access_token
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {"corpid": self._corp_id, "corpsecret": self._secret}
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
data = resp.json()
if data.get("errcode") != 0:
raise Exception(f"获取 access_token 失败: {data}")
self._access_token = data["access_token"]
self._token_expire_at = time.time() + data.get("expires_in", 7200)
logger.info("企微 access_token 已更新")
return self._access_token
def save_response_code(self, task_id: str, response_code: str):
"""保存卡片的 response_code用于后续更新卡片状态"""
self._response_codes[task_id] = response_code
def get_response_code(self, task_id: str) -> Optional[str]:
"""获取并消耗 response_code只能用一次"""
return self._response_codes.pop(task_id, None)
async def send_alarm_card(
self,
user_ids: List[str],
alarm_id: str,
alarm_type: str,
area_name: str,
camera_name: str,
description: str,
snapshot_url: str,
event_time: str,
alarm_level: int = 2,
service_base_url: str = "",
) -> bool:
"""
发送按钮交互型模板卡片
卡片直接在对话框中展示告警信息 + 操作按钮,
用户点击按钮后企微回调服务器,无需跳转 H5 页面。
"""
if not self._enabled:
logger.debug("企微未启用,跳过发送")
return False
try:
access_token = await self._get_access_token()
# 告警类型中文映射
type_names = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
}
type_name = type_names.get(alarm_type, alarm_type)
# 告警级别映射
level_names = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"}
level_name = level_names.get(alarm_level, "一般")
level_colors = {1: "blue", 2: "yellow", 3: "red", 4: "red"}
# H5 详情页 URL点击卡片标题跳转可查看截图
detail_url = (
f"{service_base_url}/static/alarm_detail.html?alarm_id={alarm_id}"
if service_base_url
else "https://work.weixin.qq.com"
)
# 构造按钮交互型模板卡片
msg = {
"touser": "|".join(user_ids),
"msgtype": "template_card",
"agentid": int(self._agent_id) if self._agent_id else 0,
"template_card": {
"card_type": "button_interaction",
"task_id": alarm_id,
"main_title": {
"title": f"{level_name}{type_name}告警",
"desc": description or f"{area_name} 检测到{type_name}",
},
"sub_title_text": f"请相关人员及时处理",
"horizontal_content_list": [
{
"keyname": "告警区域",
"value": area_name or "未知区域",
},
{
"keyname": "摄像头",
"value": camera_name or "未知",
},
{
"keyname": "告警时间",
"value": event_time,
},
{
"keyname": "告警级别",
"value": level_name,
},
],
"card_action": {
"type": 1,
"url": detail_url,
},
"button_list": [
{
"text": "前往处理",
"style": 1,
"key": f"handle_{alarm_id}",
},
{
"text": "误报忽略",
"style": 2,
"key": f"ignore_{alarm_id}",
},
],
},
}
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=msg)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"企微发送失败: {data}")
return False
# 保存 response_code 用于后续更新卡片
response_code = data.get("response_code", "")
if response_code:
self.save_response_code(alarm_id, response_code)
logger.info(f"企微模板卡片已发送: alarm={alarm_id}, users={user_ids}")
return True
except Exception as e:
logger.error(f"企微发送异常: {e}")
return False
async def update_alarm_card(
self,
response_code: str,
user_ids: List[str],
alarm_id: str,
action: str,
operator_name: str = "",
) -> bool:
"""
更新模板卡片状态(按钮变灰 + 显示处理结果)
Args:
response_code: 企微回调提供的 response_code
user_ids: 目标用户列表
alarm_id: 告警ID
action: 操作类型 (handle/ignore/complete)
operator_name: 操作人名称
"""
if not self._enabled:
return False
try:
access_token = await self._get_access_token()
action_text = {
"handle": f"处理中 - {operator_name}" if operator_name else "处理中",
"ignore": f"已忽略 - {operator_name}" if operator_name else "已忽略",
"complete": f"已处理 - {operator_name}" if operator_name else "已处理",
}
replace_text = action_text.get(action, "已处理")
body = {
"userids": user_ids,
"agentid": int(self._agent_id) if self._agent_id else 0,
"response_code": response_code,
"template_card": {
"card_type": "button_interaction",
"task_id": alarm_id,
"button_list": [
{
"text": replace_text,
"style": 3,
"key": f"done_{alarm_id}",
},
],
},
}
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/update_template_card?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=body)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"更新卡片失败: {data}")
return False
logger.info(f"卡片已更新: alarm={alarm_id}, action={action}")
return True
except Exception as e:
logger.error(f"更新卡片异常: {e}")
return False
async def send_text_message(self, user_id: str, content: str) -> bool:
"""发送文本消息给指定用户"""
if not self._enabled:
return False
try:
access_token = await self._get_access_token()
msg = {
"touser": user_id,
"msgtype": "text",
"agentid": int(self._agent_id) if self._agent_id else 0,
"text": {"content": content},
}
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=msg)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"企微文本消息发送失败: {data}")
return False
logger.info(f"企微文本消息已发送: user={user_id}")
return True
except Exception as e:
logger.error(f"发送文本消息异常: {e}")
return False
# ==================== 群聊消息 ====================
async def create_group_chat(
self,
name: str,
owner: str,
user_list: List[str],
chat_id: str = "",
) -> Optional[str]:
"""创建企微群聊"""
if not self._enabled:
return None
try:
access_token = await self._get_access_token()
body = {
"name": name,
"owner": owner,
"userlist": user_list,
}
if chat_id:
body["chatid"] = chat_id
url = f"https://qyapi.weixin.qq.com/cgi-bin/appchat/create?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=body)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"创建群聊失败: {data}")
return None
chatid = data.get("chatid", "")
logger.info(f"群聊已创建: name={name}, chatid={chatid}")
return chatid
except Exception as e:
logger.error(f"创建群聊异常: {e}")
return None
async def send_group_text(self, chat_id: str, content: str) -> bool:
"""发送文本消息到群聊(支持 <@userid> 语法 @人员)"""
if not self._enabled:
return False
try:
access_token = await self._get_access_token()
msg = {
"chatid": chat_id,
"msgtype": "text",
"text": {"content": content},
}
url = f"https://qyapi.weixin.qq.com/cgi-bin/appchat/send?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=msg)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"群聊消息发送失败: {data}")
return False
logger.info(f"群聊消息已发送: chatid={chat_id}")
return True
except Exception as e:
logger.error(f"发送群聊消息异常: {e}")
return False
# 全局单例
_wechat_service: Optional[WeChatService] = None
def get_wechat_service() -> WeChatService:
global _wechat_service
if _wechat_service is None:
_wechat_service = WeChatService()
return _wechat_service