""" 企微消息加解密 基于企微官方加解密方案实现 AES-CBC-256 消息加解密和签名验证。 参考:https://developer.work.weixin.qq.com/document/path/90930 注意:需要安装 pycryptodome: pip install pycryptodome 如果 pycryptodome 未安装,模块会优雅降级,verify_url/decrypt_message 抛出明确异常。 """ import base64 import hashlib import struct import xml.etree.ElementTree as ET import logging from typing import Dict from app.config import settings logger = logging.getLogger(__name__) try: from Crypto.Cipher import AES _HAS_CRYPTO = True except ImportError: _HAS_CRYPTO = False logger.warning("pycryptodome 未安装,企微消息加解密不可用。安装命令: pip install pycryptodome") class WeChatCrypto: """企微消息加解密""" def __init__(self): self._token = settings.wechat.token key = settings.wechat.encoding_aes_key if key and len(key) == 43: self._aes_key = base64.b64decode(key + "=") else: self._aes_key = b"" def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str: """验证回调URL,返回解密后的echostr""" if not _HAS_CRYPTO: raise RuntimeError("pycryptodome 未安装,无法验证URL") if not self._aes_key: raise ValueError("EncodingAESKey 未配置") self._check_signature(msg_signature, timestamp, nonce, echostr) return self._decrypt(echostr) def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> Dict[str, str]: """解密企微消息XML,返回消息字典""" if not _HAS_CRYPTO: raise RuntimeError("pycryptodome 未安装,无法解密消息") if not self._aes_key: raise ValueError("EncodingAESKey 未配置") root = ET.fromstring(xml_body) encrypt_node = root.find("Encrypt") if encrypt_node is None or not encrypt_node.text: raise ValueError("XML中缺少Encrypt节点") encrypt = encrypt_node.text self._check_signature(msg_signature, timestamp, nonce, encrypt) decrypted_xml = self._decrypt(encrypt) msg_root = ET.fromstring(decrypted_xml) return {child.tag: (child.text or "") for child in msg_root} def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str): """校验签名""" items = sorted([self._token, timestamp, nonce, encrypt]) sha1 = hashlib.sha1("".join(items).encode("utf-8")).hexdigest() if sha1 != msg_signature: raise ValueError(f"签名验证失败: expected={sha1}, got={msg_signature}") def _decrypt(self, text: str) -> str: """AES-CBC 解密""" cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16]) decrypted = cipher.decrypt(base64.b64decode(text)) # 去除PKCS7填充 pad = decrypted[-1] if isinstance(pad, int): content = decrypted[:-pad] else: content = decrypted[:-ord(pad)] # 去除16字节随机串 + 4字节消息长度 msg_len = struct.unpack("!I", content[16:20])[0] msg = content[20:20 + msg_len].decode("utf-8") return msg