- 新增交互Agent调度器(意图识别 + 工单/查询/报表/闲聊4个Handler) - 新增工单服务、Excel报表生成器、企微消息加解密模块 - VLM提示词优化(角色设定、≤25字描述、布尔值优先输出) - VLM降级策略(入侵默认放行、离岗默认拦截) - 企微演示模式(WECHAT_TEST_UIDS兜底 + SERVICE_BASE_URL修复) - 新增Agent回调路由和测试接口 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
91 lines
3.3 KiB
Python
91 lines
3.3 KiB
Python
"""
|
||
企微消息加解密
|
||
|
||
基于企微官方加解密方案实现 AES-CBC-256 消息加解密和签名验证。
|
||
参考:https://developer.work.weixin.qq.com/document/path/90930
|
||
|
||
注意:需要安装 pycryptodome: pip install pycryptodome
|
||
如果 pycryptodome 未安装,模块会优雅降级,verify_url/decrypt_message 抛出明确异常。
|
||
"""
|
||
|
||
import base64
|
||
import hashlib
|
||
import struct
|
||
import xml.etree.ElementTree as ET
|
||
import logging
|
||
from typing import Dict
|
||
|
||
from app.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
try:
|
||
from Crypto.Cipher import AES
|
||
_HAS_CRYPTO = True
|
||
except ImportError:
|
||
_HAS_CRYPTO = False
|
||
logger.warning("pycryptodome 未安装,企微消息加解密不可用。安装命令: pip install pycryptodome")
|
||
|
||
|
||
class WeChatCrypto:
|
||
"""企微消息加解密"""
|
||
|
||
def __init__(self):
|
||
self._token = settings.wechat.token
|
||
key = settings.wechat.encoding_aes_key
|
||
if key and len(key) == 43:
|
||
self._aes_key = base64.b64decode(key + "=")
|
||
else:
|
||
self._aes_key = b""
|
||
|
||
def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
|
||
"""验证回调URL,返回解密后的echostr"""
|
||
if not _HAS_CRYPTO:
|
||
raise RuntimeError("pycryptodome 未安装,无法验证URL")
|
||
if not self._aes_key:
|
||
raise ValueError("EncodingAESKey 未配置")
|
||
|
||
self._check_signature(msg_signature, timestamp, nonce, echostr)
|
||
return self._decrypt(echostr)
|
||
|
||
def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> Dict[str, str]:
|
||
"""解密企微消息XML,返回消息字典"""
|
||
if not _HAS_CRYPTO:
|
||
raise RuntimeError("pycryptodome 未安装,无法解密消息")
|
||
if not self._aes_key:
|
||
raise ValueError("EncodingAESKey 未配置")
|
||
|
||
root = ET.fromstring(xml_body)
|
||
encrypt_node = root.find("Encrypt")
|
||
if encrypt_node is None or not encrypt_node.text:
|
||
raise ValueError("XML中缺少Encrypt节点")
|
||
|
||
encrypt = encrypt_node.text
|
||
self._check_signature(msg_signature, timestamp, nonce, encrypt)
|
||
decrypted_xml = self._decrypt(encrypt)
|
||
|
||
msg_root = ET.fromstring(decrypted_xml)
|
||
return {child.tag: (child.text or "") for child in msg_root}
|
||
|
||
def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str):
|
||
"""校验签名"""
|
||
items = sorted([self._token, timestamp, nonce, encrypt])
|
||
sha1 = hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
|
||
if sha1 != msg_signature:
|
||
raise ValueError(f"签名验证失败: expected={sha1}, got={msg_signature}")
|
||
|
||
def _decrypt(self, text: str) -> str:
|
||
"""AES-CBC 解密"""
|
||
cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16])
|
||
decrypted = cipher.decrypt(base64.b64decode(text))
|
||
# 去除PKCS7填充
|
||
pad = decrypted[-1]
|
||
if isinstance(pad, int):
|
||
content = decrypted[:-pad]
|
||
else:
|
||
content = decrypted[:-ord(pad)]
|
||
# 去除16字节随机串 + 4字节消息长度
|
||
msg_len = struct.unpack("!I", content[16:20])[0]
|
||
msg = content[20:20 + msg_len].decode("utf-8")
|
||
return msg
|