91 lines
3.3 KiB
Python
91 lines
3.3 KiB
Python
|
|
"""
|
|||
|
|
企微消息加解密
|
|||
|
|
|
|||
|
|
基于企微官方加解密方案实现 AES-CBC-256 消息加解密和签名验证。
|
|||
|
|
参考:https://developer.work.weixin.qq.com/document/path/90930
|
|||
|
|
|
|||
|
|
注意:需要安装 pycryptodome: pip install pycryptodome
|
|||
|
|
如果 pycryptodome 未安装,模块会优雅降级,verify_url/decrypt_message 抛出明确异常。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import base64
|
|||
|
|
import hashlib
|
|||
|
|
import struct
|
|||
|
|
import xml.etree.ElementTree as ET
|
|||
|
|
import logging
|
|||
|
|
from typing import Dict
|
|||
|
|
|
|||
|
|
from app.config import settings
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
from Crypto.Cipher import AES
|
|||
|
|
_HAS_CRYPTO = True
|
|||
|
|
except ImportError:
|
|||
|
|
_HAS_CRYPTO = False
|
|||
|
|
logger.warning("pycryptodome 未安装,企微消息加解密不可用。安装命令: pip install pycryptodome")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class WeChatCrypto:
|
|||
|
|
"""企微消息加解密"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self._token = settings.wechat.token
|
|||
|
|
key = settings.wechat.encoding_aes_key
|
|||
|
|
if key and len(key) == 43:
|
|||
|
|
self._aes_key = base64.b64decode(key + "=")
|
|||
|
|
else:
|
|||
|
|
self._aes_key = b""
|
|||
|
|
|
|||
|
|
def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
|
|||
|
|
"""验证回调URL,返回解密后的echostr"""
|
|||
|
|
if not _HAS_CRYPTO:
|
|||
|
|
raise RuntimeError("pycryptodome 未安装,无法验证URL")
|
|||
|
|
if not self._aes_key:
|
|||
|
|
raise ValueError("EncodingAESKey 未配置")
|
|||
|
|
|
|||
|
|
self._check_signature(msg_signature, timestamp, nonce, echostr)
|
|||
|
|
return self._decrypt(echostr)
|
|||
|
|
|
|||
|
|
def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> Dict[str, str]:
|
|||
|
|
"""解密企微消息XML,返回消息字典"""
|
|||
|
|
if not _HAS_CRYPTO:
|
|||
|
|
raise RuntimeError("pycryptodome 未安装,无法解密消息")
|
|||
|
|
if not self._aes_key:
|
|||
|
|
raise ValueError("EncodingAESKey 未配置")
|
|||
|
|
|
|||
|
|
root = ET.fromstring(xml_body)
|
|||
|
|
encrypt_node = root.find("Encrypt")
|
|||
|
|
if encrypt_node is None or not encrypt_node.text:
|
|||
|
|
raise ValueError("XML中缺少Encrypt节点")
|
|||
|
|
|
|||
|
|
encrypt = encrypt_node.text
|
|||
|
|
self._check_signature(msg_signature, timestamp, nonce, encrypt)
|
|||
|
|
decrypted_xml = self._decrypt(encrypt)
|
|||
|
|
|
|||
|
|
msg_root = ET.fromstring(decrypted_xml)
|
|||
|
|
return {child.tag: (child.text or "") for child in msg_root}
|
|||
|
|
|
|||
|
|
def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str):
|
|||
|
|
"""校验签名"""
|
|||
|
|
items = sorted([self._token, timestamp, nonce, encrypt])
|
|||
|
|
sha1 = hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
|
|||
|
|
if sha1 != msg_signature:
|
|||
|
|
raise ValueError(f"签名验证失败: expected={sha1}, got={msg_signature}")
|
|||
|
|
|
|||
|
|
def _decrypt(self, text: str) -> str:
|
|||
|
|
"""AES-CBC 解密"""
|
|||
|
|
cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16])
|
|||
|
|
decrypted = cipher.decrypt(base64.b64decode(text))
|
|||
|
|
# 去除PKCS7填充
|
|||
|
|
pad = decrypted[-1]
|
|||
|
|
if isinstance(pad, int):
|
|||
|
|
content = decrypted[:-pad]
|
|||
|
|
else:
|
|||
|
|
content = decrypted[:-ord(pad)]
|
|||
|
|
# 去除16字节随机串 + 4字节消息长度
|
|||
|
|
msg_len = struct.unpack("!I", content[16:20])[0]
|
|||
|
|
msg = content[20:20 + msg_len].decode("utf-8")
|
|||
|
|
return msg
|