1. wechat_service: save/get_response_code 改为内存+数据库双写, 容器重启后边缘resolve仍能更新企微卡片 2. work_order_client: 请求头加 tenant-id,签名公式加 query_str 参数 3. config: WorkOrderConfig 新增 tenant_id 字段 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
647 lines
24 KiB
Python
647 lines
24 KiB
Python
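"""
WeCom (Enterprise WeChat) notification service.

Wraps the WeCom HTTP API and supports:
- Direct messages: button_interaction template cards (native button interaction)
- Group chat messages: image + news + @-mention (markdown) combined messages
- Media upload: upload an image to obtain a media_id
- Card update: update the card state after a button is clicked
"""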
|
||
|
||
import httpx
|
||
import time
|
||
from typing import Optional, List, Dict
|
||
|
||
from app.utils.logger import logger
|
||
|
||
# Chinese display names for alarm types (reused across the module).
ALARM_TYPE_NAMES = dict(
    leave_post="人员离岗",
    intrusion="周界入侵",
)

# Chinese display names for alarm severity levels, keyed by numeric level.
ALARM_LEVEL_NAMES = {
    1: "提醒",
    2: "一般",
    3: "严重",
    4: "紧急",
}
|
||
|
||
|
||
class WeChatService:
    """WeCom (Enterprise WeChat) notification service, used as a singleton.

    Every public sender is best-effort: it returns ``False`` / ``None`` and
    logs instead of raising, both when the service is disabled and when the
    API call fails.

    NOTE(review): ``save_response_code`` / ``get_response_code`` do synchronous
    database I/O and ``save_response_code`` is called from the async
    ``send_alarm_card``; if ``get_session`` blocks, it blocks the event loop.
    Confirm this is acceptable for the deployment.
    """

    # Base URL shared by every WeCom API endpoint this service calls.
    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"

    def __init__(self):
        """Create a disabled service; call ``init`` to load real settings."""
        self._enabled = False
        self._corp_id = ""
        self._agent_id = ""
        self._secret = ""
        self._token = ""
        self._encoding_aes_key = ""
        self._access_token = ""
        # Unix timestamp (time.time()) at which the cached token expires.
        self._token_expire_at = 0
        # In-memory cache of card response_codes, used to update card state.
        # key: task_id (the alarm_id), value: response_code
        self._response_codes: Dict[str, str] = {}

    def init(self, config):
        """Load WeCom settings from ``config``.

        Reads ``enabled``, ``corp_id``, ``agent_id``, ``secret``, ``token`` and
        ``encoding_aes_key`` from ``config``. The service is only enabled when
        ``enabled`` is truthy and both ``corp_id`` and ``secret`` are non-empty.
        """
        # bool(...) so that `enabled` never leaks None / 0 / "" to callers.
        self._enabled = bool(config.enabled and config.corp_id and config.secret)
        self._corp_id = config.corp_id
        self._agent_id = config.agent_id
        self._secret = config.secret
        self._token = config.token
        self._encoding_aes_key = config.encoding_aes_key

        if self._enabled:
            logger.info(f"企微通知服务已启用: corp_id={self._corp_id}")
        else:
            logger.info("企微通知服务未启用")

    @property
    def enabled(self) -> bool:
        """Whether the service is configured and switched on."""
        return self._enabled

    @property
    def agent_id_int(self) -> int:
        """The configured agent id as an int, or 0 when none is configured."""
        return int(self._agent_id) if self._agent_id else 0

    # ==================== Internal HTTP helpers ====================

    async def _get_access_token(self) -> str:
        """Return a WeCom access_token, reusing the cached one when possible.

        The cached token is reused until 60 seconds before its recorded expiry.

        Raises:
            RuntimeError: if the gettoken response has a non-zero ``errcode``.
        """
        if self._access_token and time.time() < self._token_expire_at - 60:
            return self._access_token

        params = {"corpid": self._corp_id, "corpsecret": self._secret}
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(f"{self._API_BASE}/gettoken", params=params)
            data = resp.json()

        if data.get("errcode") != 0:
            # RuntimeError is still caught by the senders' `except Exception`.
            raise RuntimeError(f"获取 access_token 失败: {data}")

        self._access_token = data["access_token"]
        self._token_expire_at = time.time() + data.get("expires_in", 7200)
        logger.info("企微 access_token 已更新")
        return self._access_token

    async def _post_json(self, endpoint: str, payload: dict, timeout: float = 10) -> dict:
        """POST ``payload`` as JSON to ``endpoint`` and return the decoded reply.

        Args:
            endpoint: path below the API base, e.g. ``"message/send"``.
            payload: JSON-serialisable request body.
            timeout: httpx client timeout in seconds.

        Any exception (token fetch, transport, JSON decoding) propagates to the
        caller, which is expected to log it.
        """
        access_token = await self._get_access_token()
        url = f"{self._API_BASE}/{endpoint}?access_token={access_token}"
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(url, json=payload)
            return resp.json()

    async def _send_appchat(self, chat_id: str, msgtype: str, content: dict, label: str) -> bool:
        """Send one message of ``msgtype`` to a group chat via ``appchat/send``.

        Args:
            chat_id: target group chat id.
            msgtype: WeCom message type; also the key under which ``content``
                is placed in the request body.
            content: the type-specific message body.
            label: short name of the message kind, interpolated into log lines.

        Returns:
            True when the API answers with ``errcode == 0``, otherwise False.
        """
        if not self._enabled:
            return False
        try:
            msg = {"chatid": chat_id, "msgtype": msgtype, msgtype: content}
            data = await self._post_json("appchat/send", msg)
            if data.get("errcode") != 0:
                logger.error(f"群聊{label}发送失败: {data}")
                return False
            logger.info(f"群聊{label}已发送: chatid={chat_id}")
            return True
        except Exception as e:
            logger.error(f"发送群聊{label}异常: {e}")
            return False

    # ==================== response_code storage ====================

    def save_response_code(self, task_id: str, response_code: str):
        """Remember a card's response_code in memory and in the database.

        The in-memory write always happens. The database write (an
        ``AlarmEventExt`` row with ``ext_type == "WECHAT_RESPONSE_CODE"``,
        updated if present, inserted otherwise) is best-effort: a failure is
        logged as a warning and not raised.
        """
        self._response_codes[task_id] = response_code
        try:
            # Imported lazily, as in the original code.
            from app.models import get_session, AlarmEventExt

            db = get_session()
            try:
                ext = db.query(AlarmEventExt).filter(
                    AlarmEventExt.alarm_id == task_id,
                    AlarmEventExt.ext_type == "WECHAT_RESPONSE_CODE",
                ).first()
                if ext:
                    ext.ext_data = {"response_code": response_code}
                else:
                    ext = AlarmEventExt(
                        alarm_id=task_id,
                        ext_type="WECHAT_RESPONSE_CODE",
                        ext_data={"response_code": response_code},
                    )
                    db.add(ext)
                db.commit()
            finally:
                db.close()
        except Exception as e:
            logger.warning(f"持久化 response_code 失败: {e}")

    def get_response_code(self, task_id: str) -> Optional[str]:
        """Return the response_code for ``task_id``, or None if unknown.

        The in-memory cache is consulted first and its entry is removed on a
        hit. On a miss the database row is queried. Database errors are logged
        as a warning and reported as None.

        NOTE(review): the database row is not removed when a code is handed
        out, so a later lookup after the cache entry is gone returns the same
        persisted code again. Confirm callers tolerate that if WeCom treats
        response_codes as single-use.
        """
        code = self._response_codes.pop(task_id, None)
        if code:
            return code
        try:
            from app.models import get_session, AlarmEventExt

            db = get_session()
            try:
                ext = db.query(AlarmEventExt).filter(
                    AlarmEventExt.alarm_id == task_id,
                    AlarmEventExt.ext_type == "WECHAT_RESPONSE_CODE",
                ).first()
                if ext and ext.ext_data:
                    # Normalise a missing/empty value to None (Optional[str]).
                    return ext.ext_data.get("response_code") or None
            finally:
                db.close()
        except Exception as e:
            logger.warning(f"查询 response_code 失败: {e}")
        return None

    # ==================== Media upload ====================

    async def upload_media(self, image_data: bytes, filename: str = "alarm.jpg") -> Optional[str]:
        """Upload an image as temporary WeCom media and return its media_id.

        Used for sending image messages to group chats. The bytes are always
        submitted with content type ``image/jpeg``.

        Returns:
            The media_id, or None when the service is disabled, the API
            reports an error, the reply has no media_id, or an exception occurs.
        """
        if not self._enabled:
            return None

        try:
            access_token = await self._get_access_token()
            url = f"{self._API_BASE}/media/upload?access_token={access_token}&type=image"

            async with httpx.AsyncClient(timeout=30) as client:
                files = {"media": (filename, image_data, "image/jpeg")}
                resp = await client.post(url, files=files)
                data = resp.json()

            media_id = data.get("media_id", "")
            # A reply without a media_id is a failure even if errcode is absent.
            if data.get("errcode") or not media_id:
                logger.error(f"企微媒体上传失败: {data}")
                return None

            logger.info(f"企微媒体上传成功: media_id={media_id[:20]}...")
            return media_id

        except Exception as e:
            logger.error(f"企微媒体上传异常: {e}")
            return None

    async def _download_image(self, image_url: str) -> Optional[bytes]:
        """Download ``image_url`` (following redirects) and return its bytes.

        Returns None on any non-200 status or on an exception.
        """
        try:
            async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
                resp = await client.get(image_url)
                if resp.status_code == 200:
                    return resp.content
                logger.error(f"下载图片失败: status={resp.status_code}, url={image_url[:80]}")
                return None
        except Exception as e:
            logger.error(f"下载图片异常: {e}")
            return None

    async def upload_media_from_url(self, image_url: str) -> Optional[str]:
        """Download an image from ``image_url`` and upload it to WeCom.

        Returns the media_id, or None if either step fails or yields no data.
        """
        image_data = await self._download_image(image_url)
        if not image_data:
            return None
        return await self.upload_media(image_data)

    # ==================== Direct messages: button-interaction card ====================

    async def send_alarm_card(
        self,
        user_ids: List[str],
        alarm_id: str,
        alarm_type: str,
        area_name: str,
        camera_name: str,
        description: str,
        event_time: str,
        alarm_level: int = 2,
    ) -> bool:
        """Send a button_interaction template card to individual users.

        The card shows the alarm details plus two buttons, "确认接单" (key
        ``confirm_<alarm_id>``) and "误报忽略" (key ``ignore_<alarm_id>``).
        When the API reply carries a ``response_code`` it is stored through
        ``save_response_code`` under ``alarm_id``.

        Returns:
            True on ``errcode == 0``; False when disabled, when ``user_ids`` is
            empty, on an API error, or on an exception.
        """
        if not self._enabled:
            logger.debug("企微未启用,跳过发送")
            return False
        if not user_ids:
            # An empty `touser` can only be rejected by the API; skip the call.
            logger.warning(f"企微卡片接收人为空,跳过发送: alarm={alarm_id}")
            return False

        try:
            type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
            level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般")

            msg = {
                "touser": "|".join(user_ids),
                "msgtype": "template_card",
                "agentid": self.agent_id_int,
                "template_card": {
                    "card_type": "button_interaction",
                    "task_id": alarm_id,
                    "source": {
                        "desc": "AI安防告警",
                        # Levels 3 and above get a different description colour.
                        "desc_color": 3 if alarm_level >= 3 else 0,
                    },
                    "main_title": {
                        "title": f"【{level_name}】{type_name}告警",
                        "desc": description or f"{area_name} 检测到{type_name}",
                    },
                    "sub_title_text": "请相关人员及时处理",
                    "horizontal_content_list": [
                        {"keyname": "告警区域", "value": area_name or "未知区域"},
                        {"keyname": "摄像头", "value": camera_name or "未知"},
                        {"keyname": "告警时间", "value": event_time},
                        {"keyname": "告警级别", "value": level_name},
                    ],
                    "card_action": {
                        "type": 1,
                        "url": "https://work.weixin.qq.com",
                    },
                    "button_list": [
                        {"text": "确认接单", "style": 1, "key": f"confirm_{alarm_id}"},
                        {"text": "误报忽略", "style": 2, "key": f"ignore_{alarm_id}"},
                    ],
                },
            }

            data = await self._post_json("message/send", msg)
            if data.get("errcode") != 0:
                logger.error(f"企微卡片发送失败: {data}")
                return False

            response_code = data.get("response_code", "")
            if response_code:
                self.save_response_code(alarm_id, response_code)

            logger.info(f"企微卡片已发送: alarm={alarm_id}, users={user_ids}")
            return True

        except Exception as e:
            logger.error(f"企微卡片发送异常: {e}")
            return False

    async def update_alarm_card_step2(
        self,
        response_code: str,
        user_ids: List[str],
        alarm_id: str,
        operator_name: str = "",
    ) -> bool:
        """Replace the card with its second step after the order is accepted.

        The card is updated via ``update_template_card`` to a new, still
        interactive card with the buttons "已处理完成" (key
        ``complete_<alarm_id>``) and "标记误报" (key ``false_<alarm_id>``).

        Returns:
            True on ``errcode == 0``; False when disabled, on an API error, or
            on an exception.
        """
        if not self._enabled:
            return False

        try:
            body = {
                "userids": user_ids,
                "agentid": self.agent_id_int,
                "response_code": response_code,
                "template_card": {
                    "card_type": "button_interaction",
                    "task_id": alarm_id,
                    "source": {
                        "desc": "AI安防告警 - 处理中",
                        "desc_color": 1,
                    },
                    "main_title": {
                        "title": f"已接单 - {operator_name}" if operator_name else "已接单",
                    },
                    "sub_title_text": "请完成处理后选择操作",
                    "button_list": [
                        {"text": "已处理完成", "style": 1, "key": f"complete_{alarm_id}"},
                        {"text": "标记误报", "style": 2, "key": f"false_{alarm_id}"},
                    ],
                },
            }

            data = await self._post_json("message/update_template_card", body)
            if data.get("errcode") != 0:
                logger.error(f"更新卡片到步骤2失败: {data}")
                return False

            logger.info(f"卡片已更新到步骤2: alarm={alarm_id}, operator={operator_name}")
            return True

        except Exception as e:
            logger.error(f"更新卡片步骤2异常: {e}")
            return False

    async def update_alarm_card_terminal(
        self,
        response_code: str,
        user_ids: List[str],
        alarm_id: str,
        action: str,
        operator_name: str = "",
    ) -> bool:
        """Move the card to its final state by replacing only the button text.

        Sends ``button.replace_name`` to ``update_template_card`` rather than a
        whole new card.

        Args:
            action: one of ``"complete"``, ``"false"``, ``"ignore"`` or
                ``"auto_resolve"``; any other value falls back to "已处理".
            operator_name: appended to the text for the three manual actions.

        Returns:
            True on ``errcode == 0``; False when disabled, on an API error, or
            on an exception.
        """
        if not self._enabled:
            return False

        try:
            suffix = f" - {operator_name}" if operator_name else ""
            action_text = {
                "complete": f"已处理{suffix}",
                "false": f"已标记误报{suffix}",
                "ignore": f"已忽略{suffix}",
                "auto_resolve": "系统自动结单",
            }
            body = {
                "userids": user_ids,
                "agentid": self.agent_id_int,
                "response_code": response_code,
                "button": {
                    "replace_name": action_text.get(action, "已处理"),
                },
            }

            data = await self._post_json("message/update_template_card", body)
            if data.get("errcode") != 0:
                logger.error(f"更新卡片终态失败: {data}")
                return False

            logger.info(f"卡片已更新到终态: alarm={alarm_id}, action={action}")
            return True

        except Exception as e:
            logger.error(f"更新卡片终态异常: {e}")
            return False

    # ==================== Direct messages: text ====================

    async def send_text_message(self, user_id: str, content: str) -> bool:
        """Send a plain text message to one user.

        Returns True on ``errcode == 0``; False when disabled or on failure.
        """
        if not self._enabled:
            return False
        try:
            msg = {
                "touser": user_id,
                "msgtype": "text",
                "agentid": self.agent_id_int,
                "text": {"content": content},
            }
            data = await self._post_json("message/send", msg)
            if data.get("errcode") != 0:
                logger.error(f"企微文本消息发送失败: {data}")
                return False
            logger.info(f"企微文本消息已发送: user={user_id}")
            return True
        except Exception as e:
            logger.error(f"发送文本消息异常: {e}")
            return False

    # ==================== Group chat messages ====================

    async def create_group_chat(
        self,
        name: str,
        owner: str,
        user_list: List[str],
        chat_id: str = "",
    ) -> Optional[str]:
        """Create a WeCom group chat and return its chat id.

        ``chat_id`` is only sent in the request when non-empty.

        Returns:
            The ``chatid`` from the reply ("" if the reply omits it), or None
            when disabled, on an API error, or on an exception.
        """
        if not self._enabled:
            return None
        try:
            body = {
                "name": name,
                "owner": owner,
                "userlist": user_list,
            }
            if chat_id:
                body["chatid"] = chat_id

            data = await self._post_json("appchat/create", body)
            if data.get("errcode") != 0:
                logger.error(f"创建群聊失败: {data}")
                return None

            chatid = data.get("chatid", "")
            logger.info(f"群聊已创建: name={name}, chatid={chatid}")
            return chatid
        except Exception as e:
            logger.error(f"创建群聊异常: {e}")
            return None

    async def send_group_text(self, chat_id: str, content: str) -> bool:
        """Send a text message to a group chat."""
        return await self._send_appchat(chat_id, "text", {"content": content}, "文本")

    async def send_group_markdown(self, chat_id: str, content: str) -> bool:
        """Send a markdown message to a group chat (used for @-mentions)."""
        return await self._send_appchat(chat_id, "markdown", {"content": content}, "markdown")

    async def send_group_image(self, chat_id: str, media_id: str) -> bool:
        """Send an already uploaded image (by media_id) to a group chat."""
        return await self._send_appchat(chat_id, "image", {"media_id": media_id}, "图片")

    async def send_group_news(
        self,
        chat_id: str,
        title: str,
        description: str,
        url: str = "",
        picurl: str = "",
    ) -> bool:
        """Send a single-article news message to a group chat.

        An empty ``url`` falls back to ``https://work.weixin.qq.com``;
        ``picurl`` is only included when non-empty.
        """
        article = {
            "title": title,
            "description": description,
            "url": url or "https://work.weixin.qq.com",
        }
        if picurl:
            article["picurl"] = picurl
        return await self._send_appchat(chat_id, "news", {"articles": [article]}, "图文")

    async def send_group_alarm_combo(
        self,
        chat_id: str,
        alarm_id: str,
        alarm_type: str,
        area_name: str,
        camera_name: str,
        description: str,
        event_time: str,
        alarm_level: int = 2,
        snapshot_url: str = "",
        mention_user_ids: Optional[List[str]] = None,
    ) -> bool:
        """Send the alarm as up to three consecutive group chat messages.

        1. image: the alarm snapshot (only when ``snapshot_url`` is given and
           the upload succeeds)
        2. news: the alarm details
        3. markdown: an @-mention of ``mention_user_ids`` (only when given)

        Returns:
            True only if every message that was attempted was sent. A failed
            snapshot upload skips the image message with a warning and does
            not by itself make the result False.
        """
        if not self._enabled:
            return False

        type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
        level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般")
        success = True

        # ---- 1. Alarm snapshot (image message) ----
        if snapshot_url:
            media_id = await self.upload_media_from_url(snapshot_url)
            if media_id:
                if not await self.send_group_image(chat_id, media_id):
                    success = False
                    logger.warning(f"群聊截图发送失败: alarm={alarm_id}")
            else:
                logger.warning(f"截图上传企微失败,跳过图片消息: alarm={alarm_id}")

        # ---- 2. Alarm details (news message) ----
        news_title = f"【{level_name}】{type_name}告警"
        news_desc = (
            f"{description}\n\n"
            f"告警区域:{area_name or '未知区域'}\n"
            f"摄像头:{camera_name or '未知'}\n"
            f"告警时间:{event_time}\n"
            f"告警ID:{alarm_id}"
        )
        # The snapshot URL doubles as the thumbnail when it is an http(s) URL.
        use_as_thumbnail = bool(snapshot_url) and snapshot_url.startswith("http")
        if not await self.send_group_news(
            chat_id=chat_id,
            title=news_title,
            description=news_desc,
            picurl=snapshot_url if use_as_thumbnail else "",
        ):
            success = False

        # ---- 3. @-mention the responsible users (markdown message) ----
        if mention_user_ids:
            mentions = " ".join(f"<@{uid}>" for uid in mention_user_ids)
            md_content = f"{mentions} 请及时处理以上**{type_name}告警**"
            if not await self.send_group_markdown(chat_id, md_content):
                success = False

        if success:
            logger.info(f"群聊组合消息已发送: alarm={alarm_id}, chatid={chat_id}")
        return success
|
||
|
||
|
||
# Process-wide singleton instance, created lazily by get_wechat_service().
_wechat_service: Optional[WeChatService] = None


def get_wechat_service() -> WeChatService:
    """Return the shared WeChatService, creating it on first use."""
    global _wechat_service
    service = _wechat_service
    if service is None:
        service = _wechat_service = WeChatService()
    return service
|