Files
iot-device-management-service/app/routers/work_order_api.py

218 lines
7.5 KiB
Python
Raw Normal View History

"""
H5 工单处理 API
供企微 H5 工单详情页调用
- GET /auth 企微 OAuth2 code userid
- GET /detail 获取告警+工单详情
- POST /submit 提交处理结果描述+图片
- POST /upload-image 上传处理后照片到 COS
"""
from fastapi import APIRouter, Query, UploadFile, File
from pydantic import BaseModel
from typing import Optional, List
import httpx
from app.config import settings
from app.constants import ALARM_STATUS_TO_YUDAO, ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES
from app.utils.logger import logger
router = APIRouter(prefix="/api/work-order", tags=["H5工单处理"])
class SubmitRequest(BaseModel):
alarmId: str
result: str
resultImgUrls: Optional[List[str]] = None
@router.get("/auth")
async def wechat_oauth(
code: str = Query(..., description="企微 OAuth2 授权码"),
):
"""企微 OAuth2用 code 换取 userid"""
try:
# 1. 获取 access_token
from app.services.wechat_service import get_wechat_service
wechat = get_wechat_service()
access_token = await wechat._get_access_token()
# 2. 用 code 换 userid
url = f"https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo?access_token={access_token}&code={code}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
data = resp.json()
if data.get("errcode", 0) != 0:
logger.warning(f"OAuth2 获取用户失败: {data}")
return {"code": -1, "msg": data.get("errmsg", "授权失败"), "data": None}
userid = data.get("UserId") or data.get("userid") or ""
if not userid:
return {"code": -1, "msg": "非企业成员", "data": None}
logger.info(f"OAuth2 认证成功: userid={userid}")
return {"code": 0, "msg": "success", "data": {"userId": userid}}
except Exception as e:
logger.error(f"OAuth2 认证异常: {e}")
return {"code": -1, "msg": str(e), "data": None}
@router.get("/detail")
async def get_work_order_detail(
alarmId: str = Query(..., description="告警ID"),
):
"""获取工单详情(告警信息+工单状态+截图)"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.camera_name_service import get_camera_name_service
from app.services.oss_storage import get_oss_storage
from app.models import get_session, AlarmEventExt
service = get_alarm_event_service()
alarm_dict = service.get_alarm(alarmId)
if not alarm_dict:
return {"code": 404, "msg": "告警不存在", "data": None}
# 查工单 ID
order_id = ""
db = get_session()
try:
ext = db.query(AlarmEventExt).filter(
AlarmEventExt.alarm_id == alarmId,
AlarmEventExt.ext_type == "WORK_ORDER",
).first()
if ext and ext.ext_data:
order_id = ext.ext_data.get("order_id", "")
except Exception:
pass
finally:
db.close()
# 摄像头名称
camera_name = alarm_dict.get("device_id", "")
try:
camera_service = get_camera_name_service()
info = await camera_service.get_camera_info(alarm_dict.get("device_id", ""))
camera_name = camera_service.format_display_name(alarm_dict.get("device_id", ""), info)
except Exception:
pass
# 截图 URL
snapshot_url = ""
if alarm_dict.get("snapshot_url"):
try:
oss = get_oss_storage()
snapshot_url = oss.get_presigned_url(alarm_dict["snapshot_url"])
except Exception:
snapshot_url = alarm_dict.get("snapshot_url", "")
event_time = alarm_dict.get("event_time", "")
if event_time:
try:
event_time = event_time.strftime("%m-%d %H:%M")
except Exception:
event_time = str(event_time)[:16]
return {
"code": 0,
"msg": "success",
"data": {
"alarmId": alarmId,
"orderId": order_id,
"status": ALARM_STATUS_TO_YUDAO.get(alarm_dict.get("alarm_status", ""), "pending"),
"alarmType": ALARM_TYPE_NAMES.get(alarm_dict.get("alarm_type", ""), alarm_dict.get("alarm_type", "")),
"alarmLevel": ALARM_LEVEL_NAMES.get(alarm_dict.get("alarm_level"), "普通"),
"cameraName": camera_name,
"eventTime": event_time,
"snapshotUrl": snapshot_url,
"handler": alarm_dict.get("handler", ""),
"handleRemark": alarm_dict.get("handle_remark", ""),
"handledAt": str(alarm_dict.get("handled_at", "")) if alarm_dict.get("handled_at") else "",
},
}
@router.post("/submit")
async def submit_work_order(req: SubmitRequest):
"""保安提交处理结果(描述+图片),调 IoT /submit 结单"""
from app.services.work_order_client import get_work_order_client
from app.services.alarm_event_service import get_alarm_event_service
from app.models import get_session, AlarmEventExt
# 查工单 ID
order_id = ""
db = get_session()
try:
ext = db.query(AlarmEventExt).filter(
AlarmEventExt.alarm_id == req.alarmId,
AlarmEventExt.ext_type == "WORK_ORDER",
).first()
if ext and ext.ext_data:
order_id = ext.ext_data.get("order_id", "")
except Exception:
pass
finally:
db.close()
if not order_id:
# 工单不存在时降级直接更新告警状态
service = get_alarm_event_service()
service.handle_alarm(
alarm_id=req.alarmId,
alarm_status="CLOSED",
handle_status="DONE",
remark=req.result,
)
return {"code": 0, "msg": "已更新(无关联工单)"}
# 调 IoT /submit
wo_client = get_work_order_client()
if wo_client.enabled:
success = await wo_client.submit_order(
order_id=order_id,
result=req.result,
result_img_urls=req.resultImgUrls,
)
if success:
logger.info(f"H5工单提交成功: alarm={req.alarmId}, order={order_id}")
return {"code": 0, "msg": "提交成功,等待系统确认"}
else:
# IoT 失败降级
service = get_alarm_event_service()
service.handle_alarm(
alarm_id=req.alarmId,
alarm_status="CLOSED",
handle_status="DONE",
remark=req.result,
)
return {"code": 0, "msg": "已更新IoT同步失败已降级处理"}
else:
service = get_alarm_event_service()
service.handle_alarm(
alarm_id=req.alarmId,
alarm_status="CLOSED",
handle_status="DONE",
remark=req.result,
)
return {"code": 0, "msg": "已更新(工单未启用)"}
@router.post("/upload-image")
async def upload_image(file: UploadFile = File(...)):
"""上传处理后照片到 COS返回永久 URL"""
from app.services.oss_storage import get_oss_storage
import time
try:
oss = get_oss_storage()
content = await file.read()
object_key = f"work-order/{int(time.time())}_{file.filename}"
oss.upload_file(content, object_key, content_type=file.content_type or "image/jpeg")
url = oss.get_permanent_url(object_key)
return {"code": 0, "msg": "success", "data": {"url": url, "objectKey": object_key}}
except Exception as e:
logger.error(f"工单图片上传失败: {e}")
return {"code": -1, "msg": f"上传失败: {e}"}