Files
iot-device-management-service/app/services/wechat_crypto.py

91 lines
3.3 KiB
Python
Raw Normal View History

"""
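WeCom (WeChat Work) message encryption/decryption.

Implements AES-256-CBC message decryption and signature verification based on
the official WeCom encryption scheme.
Reference: https://developer.work.weixin.qq.com/document/path/90930

Note: requires pycryptodome (install with: pip install pycryptodome).
If pycryptodome is not installed, the module degrades gracefully:
verify_url / decrypt_message raise an explicit exception.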
"""
import base64
import hashlib
import hmac
import logging
import struct
import xml.etree.ElementTree as ET
from typing import Dict

from app.config import settings

logger = logging.getLogger(__name__)
# pycryptodome is treated as an optional dependency at import time: remember
# whether AES could be imported so the public methods of WeChatCrypto can raise
# a clear RuntimeError instead of failing later on a missing name.
try:
    from Crypto.Cipher import AES
    _HAS_CRYPTO = True
except ImportError:
    _HAS_CRYPTO = False
    logger.warning("pycryptodome 未安装,企微消息加解密不可用。安装命令: pip install pycryptodome")
class WeChatCrypto:
    """Verify and decrypt WeCom (WeChat Work) callback payloads.

    The signing token and the EncodingAESKey are read from ``settings.wechat``
    when the instance is created. A 43-character EncodingAESKey is
    Base64-decoded (after appending the missing ``=`` padding character) into
    the AES key; any other value leaves the key empty, in which case the
    public methods raise ``ValueError``.
    """

    # The WeCom scheme pads the plaintext with PKCS#7 to a multiple of 32
    # bytes, so a valid padding byte is always in the range 1..32.
    _PKCS7_BLOCK_SIZE = 32
    # Decrypted layout: 16 random bytes, a 4-byte big-endian message length,
    # the message itself, then trailing bytes that are not part of the message.
    _RANDOM_PREFIX_LEN = 16
    _HEADER_LEN = 20

    def __init__(self):
        self._token = settings.wechat.token
        key = settings.wechat.encoding_aes_key
        if key and len(key) == 43:
            # 43 Base64 characters plus one "=" decode to the AES key bytes.
            self._aes_key = base64.b64decode(key + "=")
        else:
            self._aes_key = b""

    def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
        """Validate a callback-URL verification request.

        Args:
            msg_signature: Signature supplied with the request.
            timestamp: Timestamp supplied with the request.
            nonce: Nonce supplied with the request.
            echostr: Base64-encoded encrypted echo string.

        Returns:
            The decrypted ``echostr``.

        Raises:
            RuntimeError: If pycryptodome is not installed.
            ValueError: If the AES key or token is not configured, the
                signature does not match, or the payload is malformed.
        """
        if not _HAS_CRYPTO:
            raise RuntimeError("pycryptodome 未安装无法验证URL")
        if not self._aes_key:
            raise ValueError("EncodingAESKey 未配置")
        self._check_signature(msg_signature, timestamp, nonce, echostr)
        return self._decrypt(echostr)

    def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> Dict[str, str]:
        """Decrypt an encrypted callback XML body into a flat dictionary.

        Args:
            xml_body: Raw XML request body containing an ``Encrypt`` element.
            msg_signature: Signature supplied with the request.
            timestamp: Timestamp supplied with the request.
            nonce: Nonce supplied with the request.

        Returns:
            A mapping of each direct child tag of the decrypted XML root to
            its text (empty string when the element has no text).

        Raises:
            RuntimeError: If pycryptodome is not installed.
            ValueError: If the AES key or token is not configured, the
                ``Encrypt`` element is missing, the signature does not match,
                or the decrypted payload is malformed.
            xml.etree.ElementTree.ParseError: If either XML document is not
                well-formed.
        """
        if not _HAS_CRYPTO:
            raise RuntimeError("pycryptodome 未安装,无法解密消息")
        if not self._aes_key:
            raise ValueError("EncodingAESKey 未配置")

        # NOTE(security): the outer XML is parsed before the signature can be
        # checked because the signed value lives inside it. ElementTree is not
        # hardened against maliciously constructed documents; consider a
        # hardened parser or a request-size limit upstream.
        root = ET.fromstring(xml_body)
        encrypt_node = root.find("Encrypt")
        if encrypt_node is None or not encrypt_node.text:
            raise ValueError("XML中缺少Encrypt节点")
        encrypt = encrypt_node.text

        self._check_signature(msg_signature, timestamp, nonce, encrypt)
        decrypted_xml = self._decrypt(encrypt)
        msg_root = ET.fromstring(decrypted_xml)
        return {child.tag: (child.text or "") for child in msg_root}

    def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str) -> None:
        """Check ``msg_signature`` against the locally computed signature.

        The signature is the SHA-1 hex digest of the token, timestamp, nonce
        and encrypted payload, sorted and concatenated.

        Raises:
            ValueError: If the token is not configured or the signature does
                not match.
        """
        if not self._token:
            # Without a token the digest would contain no secret at all.
            raise ValueError("Token 未配置")

        items = sorted([self._token, timestamp, nonce, encrypt])
        # SHA-1 is dictated by the protocol; it is not a free choice here.
        expected = hashlib.sha1("".join(items).encode("utf-8")).hexdigest()

        # Compare as bytes in constant time; a non-string signature (e.g. a
        # missing query parameter) is simply a mismatch.
        matches = isinstance(msg_signature, str) and hmac.compare_digest(
            expected.encode("ascii"), msg_signature.encode("utf-8")
        )
        if not matches:
            # Never include the expected digest in the message: if the error
            # text reaches a response or log it would act as a signing oracle.
            raise ValueError("签名验证失败")

    def _decrypt(self, text: str) -> str:
        """Base64-decode and AES-CBC-decrypt ``text``, returning the message.

        The IV is the first 16 bytes of the AES key.

        Raises:
            ValueError: If the ciphertext or the decrypted payload is malformed.
        """
        cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16])
        decrypted = cipher.decrypt(base64.b64decode(text))
        return self._unpack_plaintext(decrypted)

    @classmethod
    def _unpack_plaintext(cls, decrypted: bytes) -> str:
        """Strip PKCS#7 padding and extract the UTF-8 message from plaintext.

        Raises:
            ValueError: If the plaintext is empty, the padding byte is outside
                1..32, the header is incomplete, or the length field points
                past the end of the data.
        """
        if not decrypted:
            raise ValueError("解密结果为空")

        pad = decrypted[-1]
        # A pad of 0 would turn ``decrypted[:-pad]`` into an empty slice, and a
        # pad above the block size would silently eat message bytes.
        if not 1 <= pad <= cls._PKCS7_BLOCK_SIZE or pad > len(decrypted):
            raise ValueError("PKCS7填充无效")
        content = decrypted[:-pad]

        if len(content) < cls._HEADER_LEN:
            raise ValueError("解密内容长度不足")
        (msg_len,) = struct.unpack_from("!I", content, cls._RANDOM_PREFIX_LEN)

        end = cls._HEADER_LEN + msg_len
        if end > len(content):
            raise ValueError("消息长度字段超出解密内容")
        # Bytes after ``end`` are not part of the message and are not checked here.
        return content[cls._HEADER_LEN:end].decode("utf-8")