Files
iot-device-management-service/app/routers/wechat_notify_api.py
16337 e6fd316036 重构 sync-status 接口:按状态分流处理,删除 send-card 接口
- dispatched: 发企微群聊+私发卡片(不更新告警)
- confirmed: 仅更新卡片到第二步(不更新告警)
- 终态(completed/false_alarm/auto_resolved): 更新告警+卡片
- 删除 send-card 接口和 SendCardRequest 类(企微由 dispatched 触发)
2026-03-27 13:12:29 +08:00

240 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
企微通知 API — 供 IoT 平台调用
IoT 平台通过这些接口驱动企微消息:
- /sync-status: 工单状态变更后同步(dispatched发企微、confirmed更新卡片、终态更新告警+卡片)
"""
import json
from fastapi import APIRouter, Request
from pydantic import BaseModel
from typing import Optional
from app.utils.logger import logger
# Router holding the WeCom notification endpoints in this module; every route
# registered on it is served under the /api/wechat/notify prefix.
router = APIRouter(prefix="/api/wechat/notify", tags=["企微通知-IoT回调"])
class SyncStatusRequest(BaseModel):
    """Payload sent by the IoT platform after a work-order status change."""
    # Alarm identifier; used to look up the alarm row and the card response code.
    alarmId: str
    # Work-order identifier; embedded in the notification description text.
    orderId: str
    status: str  # confirmed / completed / false_alarm / auto_resolved / dispatched
    # Operator of the status change; used as message recipient and alarm handler.
    operator: Optional[str] = ""
    # Free-text remark stored when a terminal status is written to the alarm.
    remark: Optional[str] = ""
async def _parse_body(request: Request) -> dict:
    """Leniently extract the request payload as a dict.

    The sources are tried in order: JSON body, URL-encoded form body (only
    when the content type mentions "form"), then the query string. The first
    source that yields a mapping wins.

    Args:
        request: Incoming request; its headers, body and query params are read.

    Returns:
        The parsed payload, or an empty dict when no source produced data.
        A JSON body whose top level is not an object (list, string, number,
        ...) is rejected so the caller can always unpack the result with
        ``**``; parsing then continues with the remaining fallbacks.
    """
    content_type = request.headers.get("content-type", "")
    logger.info(f"IoT请求调试: method={request.method}, content-type={content_type}, url={request.url}")

    # 1. Read the raw body once; both parsers below operate on these bytes.
    raw_body = await request.body()
    logger.info(f"IoT请求调试: raw_body_len={len(raw_body)}, raw_body={raw_body[:500]}")

    # 2. Try JSON first, regardless of the declared content type.
    if raw_body:
        try:
            parsed = json.loads(raw_body)
        except Exception as e:
            logger.warning(f"JSON解析失败: {e}")
        else:
            if isinstance(parsed, dict):
                return parsed
            # Valid JSON but not an object: returning it would break the
            # declared dict contract, so fall through to the other sources.
            logger.warning(f"JSON解析失败: expected object, got {type(parsed).__name__}")

    # 3. Try URL-encoded form parsing directly on the bytes already read.
    if raw_body and "form" in content_type:
        try:
            from urllib.parse import parse_qs
            params = parse_qs(raw_body.decode("utf-8"))
            # Collapse single-value lists to scalars; keep repeated keys as lists.
            return {k: v[0] if len(v) == 1 else v for k, v in params.items()}
        except Exception as e:
            logger.warning(f"Form解析失败: {e}")

    # 4. Fall back to the query string.
    qp = dict(request.query_params)
    if qp:
        logger.info(f"使用query params: {qp}")
        return qp

    # 5. Nothing usable was found; log the headers to help diagnose the client.
    # NOTE(review): this writes every request header to the log, which may
    # include credentials — consider redacting.
    logger.warning(f"所有解析方式均无数据headers={dict(request.headers)}")
    return {}
async def _send_wechat_on_dispatch(req, wechat):
    """Send the WeCom group message and the private card for a dispatched order.

    The alarm referenced by ``req.alarmId`` is loaded, display names for its
    camera and area are resolved, and the notifications are sent through
    ``wechat``. The group message is sent only when a group chat id is
    configured, and the private card only when ``req.operator`` is set.
    Any exception is logged and swallowed; nothing is returned.
    """
    try:
        from app.models import get_session, AlarmEvent
        from app.services.camera_name_service import get_camera_name_service
        from app.services.wechat_service import ALARM_TYPE_NAMES  # unused here; import retained as in the original
        from app.config import settings

        # Load the alarm and copy the needed fields before closing the session.
        session = get_session()
        try:
            record = (
                session.query(AlarmEvent)
                .filter(AlarmEvent.alarm_id == req.alarmId)
                .first()
            )
            if not record:
                logger.warning(f"dispatched 发企微: 告警不存在 {req.alarmId}")
                return
            alarm_type = record.alarm_type or ""
            device_id = record.device_id or ""
            alarm_level = record.alarm_level or 2
            event_time = record.event_time
            snapshot_url = record.snapshot_url or ""
            area_id = record.area_id
        finally:
            session.close()

        # Camera display name.
        name_service = get_camera_name_service()
        cam_info = await name_service.get_camera_info(device_id)
        camera_name = name_service.format_display_name(device_id, cam_info)

        # Area display name, with a fixed placeholder when it cannot be resolved.
        area_name = ""
        if area_id:
            from app.services.notify_dispatch import _get_area_name_from_iot
            area_name = await _get_area_name_from_iot(area_id)
        area_name = area_name or "未知区域"

        # Short "MM-DD HH:MM" timestamp; non-datetime values are sliced as text.
        if not hasattr(event_time, 'strftime'):
            time_text = str(event_time or "")
            event_time_str = time_text if len(time_text) < 16 else time_text[5:16]
        else:
            event_time_str = event_time.strftime("%m-%d %H:%M")

        # Presigned URL for the snapshot image.
        from app.services.notify_dispatch import _get_presigned_url
        signed_snapshot = _get_presigned_url(snapshot_url)

        order_desc = f"工单编号:{req.orderId}"
        operator = req.operator

        # Group chat notification.
        chat_id = settings.wechat.group_chat_id
        if chat_id:
            await wechat.send_group_alarm_combo(
                chat_id=chat_id,
                alarm_id=req.alarmId,
                alarm_type=alarm_type,
                area_name=area_name,
                camera_name=camera_name,
                description=order_desc,
                event_time=event_time_str,
                alarm_level=alarm_level,
                snapshot_url=signed_snapshot,
                mention_user_ids=[operator] if operator else [],
            )

        # Private card to the operator.
        recipients = [operator] if operator else []
        if recipients:
            await wechat.send_alarm_card(
                user_ids=recipients,
                alarm_id=req.alarmId,
                alarm_type=alarm_type,
                area_name=area_name,
                camera_name=camera_name,
                description=order_desc,
                event_time=event_time_str,
                alarm_level=alarm_level,
            )

        logger.info(f"dispatched 企微通知已发送: alarm={req.alarmId}, order={req.orderId}")
    except Exception as e:
        logger.error(f"dispatched 发企微失败: alarm={req.alarmId}, error={e}", exc_info=True)
@router.post("/sync-status")
async def sync_status(request: Request):
    """Handle a work-order status change pushed by the IoT platform.

    Behaviour by ``status``:
    - ``dispatched``: send the WeCom group message and private card; the
      alarm is not updated.
    - ``confirmed``: move the existing card to step 2; the alarm is not updated.
    - ``completed`` / ``false_alarm`` / ``auto_resolved``: update the alarm,
      then move the card to its terminal state.
    - anything else: log a warning and ignore.

    Returns ``{"code": 0, ...}`` on success or ignore, and
    ``{"code": -1, "msg": <error>}`` when any exception is raised.
    """
    try:
        payload = await _parse_body(request)
        logger.info(f"IoT sync-status 收到数据: {payload}")
        req = SyncStatusRequest(**payload)

        from app.services.alarm_event_service import get_alarm_event_service
        from app.services.wechat_service import get_wechat_service
        alarm_service = get_alarm_event_service()
        wechat = get_wechat_service()

        # Dispatch: notify via WeCom only.
        if req.status == "dispatched":
            if wechat.enabled:
                await _send_wechat_on_dispatch(req, wechat)
            return {"code": 0, "msg": "success"}

        # Order accepted: advance the card to step 2 only.
        if req.status == "confirmed":
            if wechat.enabled:
                card_code = wechat.get_response_code(req.alarmId)
                if card_code:
                    await wechat.update_alarm_card_step2(
                        response_code=card_code,
                        user_ids=[req.operator] if req.operator else [],
                        alarm_id=req.alarmId,
                        operator_name=req.operator,
                    )
                    logger.info(f"卡片已更新到步骤2: alarm={req.alarmId}")
            return {"code": 0, "msg": "success"}

        # Terminal states: (alarm_status, handle_status, card_action).
        terminal_actions = {
            "completed": ("CLOSED", "DONE", "complete"),
            "false_alarm": ("FALSE", "IGNORED", "false"),
            "auto_resolved": ("CLOSED", "DONE", "auto_resolve"),
        }
        outcome = terminal_actions.get(req.status)
        if outcome is None:
            logger.warning(f"未知状态,忽略: {req.status}")
            return {"code": 0, "msg": "ignored"}

        alarm_status, handle_status, card_action = outcome
        alarm_service.handle_alarm(
            alarm_id=req.alarmId,
            alarm_status=alarm_status,
            handle_status=handle_status,
            handler=req.operator or "",
            remark=req.remark or f"IoT工单: {req.status}",
        )
        logger.info(f"告警终态已同步: alarm={req.alarmId}, status={req.status}")

        if wechat.enabled:
            card_code = wechat.get_response_code(req.alarmId)
            if card_code:
                await wechat.update_alarm_card_terminal(
                    response_code=card_code,
                    user_ids=[req.operator] if req.operator else [],
                    alarm_id=req.alarmId,
                    action=card_action,
                    operator_name=req.operator,
                )
        return {"code": 0, "msg": "success"}
    except Exception as e:
        logger.error(f"IoT回调同步状态异常: {e}", exc_info=True)
        return {"code": -1, "msg": str(e)}
@router.post("/daily-report")
async def trigger_daily_report(preview: bool = False):
    """Manually trigger the daily alarm report.

    - preview=false (default): run the report send routine.
    - preview=true: only generate the content and return it; nothing is sent.

    Returns ``{"code": 0, ...}`` on success and ``{"code": -1, "msg": <error>}``
    when any exception is raised.
    """
    try:
        from app.services.daily_report_service import generate_daily_report, _send_daily_report

        if not preview:
            await _send_daily_report()
            return {"code": 0, "msg": "日报已发送"}

        report_text = await generate_daily_report()
        return {"code": 0, "data": {"content": report_text}, "msg": "预览生成成功(未发送)"}
    except Exception as e:
        logger.error(f"手动触发日报异常: {e}", exc_info=True)
        return {"code": -1, "msg": str(e)}