diff --git a/.env.example b/.env.example index b5bcc1d..1488120 100644 --- a/.env.example +++ b/.env.example @@ -31,3 +31,17 @@ REDIS_MAX_CONNECTIONS=50 # 边缘设备认证 Token(边缘端上报告警时使用) # EDGE_TOKEN=your_edge_device_token + +# ===== VLM 复核配置 ===== +VLM_ENABLED=false +DASHSCOPE_API_KEY=your_dashscope_api_key +VLM_MODEL=qwen3-vl-flash-2026-01-22 +VLM_TIMEOUT=10 + +# ===== 企微通知配置 ===== +WECHAT_ENABLED=false +WECHAT_CORP_ID=your_corp_id +WECHAT_AGENT_ID=your_agent_id +WECHAT_SECRET=your_secret +WECHAT_TOKEN=your_callback_token +WECHAT_ENCODING_AES_KEY=your_encoding_aes_key diff --git a/app/config.py b/app/config.py index 190771b..ac2b14e 100644 --- a/app/config.py +++ b/app/config.py @@ -40,6 +40,28 @@ class AIModelConfig: api_key: str = "" +@dataclass +class VLMConfig: + """VLM 视觉语言模型配置""" + api_key: str = "" + base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + model: str = "qwen3-vl-flash-2026-01-22" + timeout: int = 10 + enabled: bool = False + enable_thinking: bool = False + + +@dataclass +class WeChatConfig: + """企微通知配置""" + corp_id: str = "" + agent_id: str = "" + secret: str = "" + token: str = "" + encoding_aes_key: str = "" + enabled: bool = False + + @dataclass class RedisConfig: """Redis 配置""" @@ -86,6 +108,8 @@ class Settings(BaseModel): cos: COSConfig = COSConfig() app: AppConfig = AppConfig() ai_model: AIModelConfig = AIModelConfig() + vlm: VLMConfig = VLMConfig() + wechat: WeChatConfig = WeChatConfig() redis: RedisConfig = RedisConfig() camera_name: CameraNameConfig = CameraNameConfig() @@ -116,6 +140,22 @@ def load_settings() -> Settings: endpoint=os.getenv("AI_MODEL_ENDPOINT", ""), api_key=os.getenv("AI_MODEL_API_KEY", ""), ), + vlm=VLMConfig( + api_key=os.getenv("DASHSCOPE_API_KEY", ""), + base_url=os.getenv("VLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"), + model=os.getenv("VLM_MODEL", "qwen3-vl-flash-2026-01-22"), + timeout=int(os.getenv("VLM_TIMEOUT", "10")), + enabled=os.getenv("VLM_ENABLED", "false").lower() == "true", + enable_thinking=os.getenv("VLM_ENABLE_THINKING", "false").lower() == "true", + ), + wechat=WeChatConfig( + corp_id=os.getenv("WECHAT_CORP_ID", ""), + agent_id=os.getenv("WECHAT_AGENT_ID", ""), + secret=os.getenv("WECHAT_SECRET", ""), + token=os.getenv("WECHAT_TOKEN", ""), + encoding_aes_key=os.getenv("WECHAT_ENCODING_AES_KEY", ""), + enabled=os.getenv("WECHAT_ENABLED", "false").lower() == "true", + ), redis=RedisConfig( host=os.getenv("REDIS_HOST", "localhost"), port=int(os.getenv("REDIS_PORT", "6379")), diff --git a/app/main.py b/app/main.py index e5c8385..ce02c8f 100644 --- a/app/main.py +++ b/app/main.py @@ -28,6 +28,8 @@ from app.services.notification_service import get_notification_service from app.services.device_service import get_device_service from app.utils.logger import logger from app.routers import yudao_alert_router, yudao_auth_router, yudao_aiot_alarm_router, yudao_aiot_edge_router, yudao_aiot_storage_router, edge_compat_router +from app.routers.wechat_callback import router as wechat_callback_router +from app.routers.notify_manage import router as notify_manage_router from app.yudao_compat import yudao_exception_handler import json @@ -48,6 +50,16 @@ async def lifespan(app: FastAPI): loop = asyncio.get_event_loop() notification_service.set_event_loop(loop) + # 初始化 VLM 服务 + from app.services.vlm_service import get_vlm_service + vlm_svc = get_vlm_service() + vlm_svc.init(settings.vlm) + + # 初始化企微服务 + from app.services.wechat_service import get_wechat_service + wechat_svc = get_wechat_service() + wechat_svc.init(settings.wechat) + logger.info("AI 告警平台启动完成") yield @@ -87,6 +99,10 @@ app.include_router(yudao_aiot_storage_router) # Edge 设备使用 /api/ai/alert/edge/* 路径上报(与 WVP 一致),无需认证 app.include_router(edge_compat_router) +# ==================== 企微回调 + 通知管理路由 ==================== +app.include_router(wechat_callback_router) +app.include_router(notify_manage_router) + # 注册芋道格式异常处理器 app.add_exception_handler(HTTPException, yudao_exception_handler) diff --git a/app/models.py b/app/models.py index 714826c..06d7467 100644 --- a/app/models.py +++ b/app/models.py @@ -404,6 +404,46 @@ def init_db(): Base.metadata.create_all(bind=engine) +class NotifyArea(Base): + """通知区域""" + __tablename__ = "notify_area" + + id = Column(Integer, primary_key=True, autoincrement=True) + area_id = Column(String(36), unique=True, nullable=False, index=True) + area_name = Column(String(100), nullable=False) + description = Column(String(200), nullable=True) + enabled = Column(SmallInteger, default=1) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + + +class CameraAreaBinding(Base): + """摄像头-区域映射""" + __tablename__ = "camera_area_binding" + + id = Column(Integer, primary_key=True, autoincrement=True) + camera_id = Column(String(64), unique=True, nullable=False, index=True) + area_id = Column(String(36), nullable=False, index=True) + created_at = Column(DateTime, default=datetime.now) + + +class AreaPersonBinding(Base): + """区域-人员通知绑定""" + __tablename__ = "area_person_binding" + + id = Column(Integer, primary_key=True, autoincrement=True) + area_id = Column(String(36), nullable=False, index=True) + person_name = Column(String(50), nullable=False) + wechat_uid = Column(String(100), nullable=False) + role = Column(String(20), default="SECURITY") + notify_level = Column(Integer, default=1) + enabled = Column(SmallInteger, default=1) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + + +# ==================== 数据库管理 ==================== + def close_db(): global _engine, _SessionLocal if _engine: diff --git a/app/routers/edge_compat.py b/app/routers/edge_compat.py index daa1088..1a40e59 100644 --- a/app/routers/edge_compat.py +++ b/app/routers/edge_compat.py @@ -43,6 +43,22 @@ async def edge_alarm_report( except Exception: pass + # 异步触发 VLM 复核 + 企微通知(不阻塞响应) + try: + from app.services.notify_dispatch import process_alarm_notification + notify_data = { + "alarm_id": alarm.alarm_id, + "alarm_type": alarm.alarm_type, + "device_id": alarm.device_id, + "scene_id": alarm.scene_id, + "event_time": alarm.event_time, + "alarm_level": alarm.alarm_level, + "snapshot_url": alarm.snapshot_url, + } + asyncio.create_task(process_alarm_notification(notify_data)) + except Exception: + pass # 通知失败不影响主流程 + return YudaoResponse.success({ "alarmId": alarm.alarm_id, "created": True, diff --git a/app/routers/notify_manage.py b/app/routers/notify_manage.py new file mode 100644 index 0000000..33c646d --- /dev/null +++ b/app/routers/notify_manage.py @@ -0,0 +1,202 @@ +""" +通知管理路由 + +管理通知区域、摄像头-区域映射、区域-人员绑定。 +供管理后台调用,需要认证。 +""" + +from fastapi import APIRouter, Depends +from pydantic import BaseModel +from typing import Optional + +from app.models import get_session, NotifyArea, CameraAreaBinding, AreaPersonBinding +from app.yudao_compat import YudaoResponse, get_current_user +from app.utils.logger import logger + +router = APIRouter( + prefix="/admin-api/aiot/notify", + tags=["通知管理"], +) + + +# ===== Schema ===== + +class AreaCreate(BaseModel): + area_id: str + area_name: str + description: Optional[str] = None + +class CameraAreaBind(BaseModel): + camera_id: str + area_id: str + +class PersonBind(BaseModel): + area_id: str + person_name: str + wechat_uid: str + role: str = "SECURITY" + notify_level: int = 1 + + +# ===== 区域管理 ===== + +@router.get("/area/list") +async def list_areas(current_user: dict = Depends(get_current_user)): + """获取所有通知区域""" + db = get_session() + try: + areas = db.query(NotifyArea).filter(NotifyArea.enabled == 1).all() + return YudaoResponse.success([ + {"area_id": a.area_id, "area_name": a.area_name, "description": a.description} + for a in areas + ]) + finally: + db.close() + + +@router.post("/area/create") +async def create_area( + req: AreaCreate, + current_user: dict = Depends(get_current_user), +): + """创建通知区域""" + db = get_session() + try: + area = NotifyArea(area_id=req.area_id, area_name=req.area_name, description=req.description) + db.add(area) + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +@router.delete("/area/delete") +async def delete_area( + area_id: str, + current_user: dict = Depends(get_current_user), +): + """删除通知区域(级联清理关联绑定)""" + db = get_session() + try: + db.query(NotifyArea).filter(NotifyArea.area_id == area_id).delete() + db.query(CameraAreaBinding).filter(CameraAreaBinding.area_id == area_id).delete() + db.query(AreaPersonBinding).filter(AreaPersonBinding.area_id == area_id).delete() + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +# ===== 摄像头-区域绑定 ===== + +@router.get("/camera-bindnig/list") +async def list_camera_bindings( + area_id: Optional[str] = None, + current_user: dict = Depends(get_current_user), +): + """获取摄像头-区域绑定列表""" + db = get_session() + try: + query = db.query(CameraAreaBinding) + if area_id: + query = query.filter(CameraAreaBinding.area_id == area_id) + bindings = query.all() + return YudaoResponse.success([ + {"camera_id": b.camera_id, "area_id": b.area_id} + for b in bindings + ]) + finally: + db.close() + + +@router.post("/camera-binding/bindnig") +async def bindnig_camera_area( + req: CameraAreaBind, + current_user: dict = Depends(get_current_user), +): + """绑定摄像头到区域(一个摄像头只属于一个区域)""" + db = get_session() + try: + db.query(CameraAreaBinding).filter( + CameraAreaBinding.camera_id == req.camera_id + ).delete() + binding = CameraAreaBinding(camera_id=req.camera_id, area_id=req.area_id) + db.add(binding) + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +# ===== 区域-人员绑定 ===== + +@router.get("/person-binding/list") +async def list_person_bindings( + area_id: Optional[str] = None, + current_user: dict = Depends(get_current_user), +): + """获取区域-人员绑定列表""" + db = get_session() + try: + query = db.query(AreaPersonBinding).filter(AreaPersonBinding.enabled == 1) + if area_id: + query = query.filter(AreaPersonBinding.area_id == area_id) + persons = query.all() + return YudaoResponse.success([ + { + "id": p.id, "area_id": p.area_id, "person_name": p.person_name, + "wechat_uid": p.wechat_uid, "role": p.role, "notify_level": p.notify_level, + } + for p in persons + ]) + finally: + db.close() + + +@router.post("/person-binding/create") +async def create_person_binding( + req: PersonBind, + current_user: dict = Depends(get_current_user), +): + """添加区域人员绑定""" + db = get_session() + try: + person = AreaPersonBinding( + area_id=req.area_id, person_name=req.person_name, + wechat_uid=req.wechat_uid, role=req.role, notify_level=req.notify_level, + ) + db.add(person) + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +@router.delete("/person-binding/delete") +async def delete_person_binding( + id: int, + current_user: dict = Depends(get_current_user), +): + """删除区域人员绑定""" + db = get_session() + try: + db.query(AreaPersonBinding).filter(AreaPersonBinding.id == id).delete() + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() diff --git a/app/routers/wechat_callback.py b/app/routers/wechat_callback.py new file mode 100644 index 0000000..7794ffe --- /dev/null +++ b/app/routers/wechat_callback.py @@ -0,0 +1,78 @@ +""" +企微回调路由 + +处理安保人员在企微卡片上的操作(确认处理/已处理完成/误报忽略)。 +""" + +from datetime import datetime +from fastapi import APIRouter, Depends +from pydantic import BaseModel +from typing import Optional + +from app.yudao_compat import YudaoResponse +from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService +from app.utils.logger import logger + +router = APIRouter(prefix="/api/wechat", tags=["企微回调"]) + + +class AlarmActionRequest(BaseModel): + """企微卡片操作请求""" + alarm_id: str + action: str # confirm / complete / ignore + operator_uid: str # 企微 userid + remark: Optional[str] = None + + +@router.post("/callback/alarm_action") +async def alarm_action_callback( + req: AlarmActionRequest, + service: AlarmEventService = Depends(get_alarm_event_service), +): + """ + 企微告警操作回调(无认证,由企微服务端调用) + + action: + - confirm: 确认处理 → handle_status=HANDLING + - complete: 已处理完成 → handle_status=DONE, alarm_status=CLOSED + - ignore: 误报忽略 → alarm_status=FALSE, handle_status=DONE + """ + action_map = { + "confirm": { + "alarm_status": "CONFIRMED", + "handle_status": "HANDLING", + "remark": "企微确认处理", + }, + "complete": { + "alarm_status": "CLOSED", + "handle_status": "DONE", + "remark": "企微手动结单", + }, + "ignore": { + "alarm_status": "FALSE", + "handle_status": "DONE", + "remark": "企微标记误报", + }, + } + + action_cfg = action_map.get(req.action) + if not action_cfg: + return YudaoResponse.error(400, f"无效操作: {req.action}") + + result = service.handle_alarm( + alarm_id=req.alarm_id, + alarm_status=action_cfg["alarm_status"], + handle_status=action_cfg["handle_status"], + handler=req.operator_uid, + remark=req.remark or action_cfg["remark"], + ) + + if not result: + return YudaoResponse.error(404, "告警不存在") + + logger.info( + f"企微操作: alarm={req.alarm_id}, action={req.action}, " + f"operator={req.operator_uid}" + ) + + return YudaoResponse.success(True) diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index 6683805..fc51462 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -381,6 +381,22 @@ async def edge_alarm_report( } asyncio.create_task(_notify_ops_platform(ops_data)) + # 异步触发 VLM 复核 + 企微通知(不阻塞响应) + try: + from app.services.notify_dispatch import process_alarm_notification + notify_data = { + "alarm_id": alarm.alarm_id, + "alarm_type": alarm.alarm_type, + "device_id": alarm.device_id, + "scene_id": alarm.scene_id, + "event_time": alarm.event_time, + "alarm_level": alarm.alarm_level, + "snapshot_url": alarm.snapshot_url, + } + asyncio.create_task(process_alarm_notification(notify_data)) + except Exception: + pass # 通知失败不影响主流程 + return YudaoResponse.success({ "alarmId": alarm.alarm_id, "created": True, diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py new file mode 100644 index 0000000..55c087b --- /dev/null +++ b/app/services/notify_dispatch.py @@ -0,0 +1,193 @@ +""" +通知调度服务 + +告警创建后的异步处理流水线: +1. VLM 复核截图 → 写入 alarm_llm_analysis +2. 查表获取通知人员 (camera → area → person) +3. 推送企微互动卡片 + +全程异步执行,不阻塞告警接收主流程。 +VLM 或企微不可用时自动降级,不影响系统运行。 +""" + +import logging +from datetime import datetime +from typing import Dict, List + +from app.models import ( + get_session, + AlarmEvent, AlarmLlmAnalysis, + CameraAreaBinding, AreaPersonBinding, NotifyArea, +) +from app.services.vlm_service import get_vlm_service +from app.services.wechat_service import get_wechat_service + +logger = logging.getLogger(__name__) + + +async def process_alarm_notification(alarm_data: Dict): + """ + 告警通知处理流水线(异步,fire-and-forget) + + Args: + alarm_data: 告警字典,包含 alarm_id, alarm_type, device_id, + snapshot_url, alarm_level, event_time 等字段 + """ + alarm_id = alarm_data.get("alarm_id", "") + alarm_type = alarm_data.get("alarm_type", "") + device_id = alarm_data.get("device_id", "") + snapshot_url = alarm_data.get("snapshot_url", "") + alarm_level = alarm_data.get("alarm_level", 2) + event_time = alarm_data.get("event_time", "") + + logger.info(f"开始处理告警通知: {alarm_id}") + + try: + # ========== 1. VLM 复核 ========== + vlm_service = get_vlm_service() + camera_name = alarm_data.get("camera_name", device_id) + roi_name = alarm_data.get("scene_id", "") + + vlm_result = await vlm_service.verify_alarm( + snapshot_url=snapshot_url, + alarm_type=alarm_type, + camera_name=camera_name, + roi_name=roi_name, + ) + + # 写入 alarm_llm_analysis 表(复用已有表) + _save_vlm_result(alarm_id, vlm_result) + + # VLM 判定为误报 → 更新告警状态,不通知 + if not vlm_result.get("confirmed", True): + _mark_false_alarm(alarm_id) + logger.info(f"VLM 判定误报,跳过通知: {alarm_id}") + return + + # ========== 2. 查表获取通知人员 ========== + description = vlm_result.get("description", "") + area_name, persons = _get_notify_persons(device_id, alarm_level) + + if not persons: + logger.warning(f"未找到通知人员: camera={device_id}, 跳过企微推送") + return + + # ========== 3. 推送企微通知 ========== + wechat_service = get_wechat_service() + if not wechat_service.enabled: + logger.info("企微未启用,跳过推送") + return + + user_ids = [p["wechat_uid"] for p in persons] + event_time_str = ( + event_time.strftime("%Y-%m-%d %H:%M:%S") + if isinstance(event_time, datetime) else str(event_time or "") + ) + + await wechat_service.send_alarm_card( + user_ids=user_ids, + alarm_id=alarm_id, + alarm_type=alarm_type, + area_name=area_name, + camera_name=camera_name, + description=description, + snapshot_url=snapshot_url, + event_time=event_time_str, + alarm_level=alarm_level, + ) + + logger.info(f"告警通知完成: {alarm_id} → {len(persons)} 人") + + except Exception as e: + logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True) + + +def _save_vlm_result(alarm_id: str, vlm_result: Dict): + """将 VLM 复核结果写入 alarm_llm_analysis 表""" + db = get_session() + try: + analysis = AlarmLlmAnalysis( + alarm_id=alarm_id, + llm_model=vlm_result.get("model", "qwen3-vl-flash"), + analysis_type="REVIEW", + summary=vlm_result.get("description", ""), + is_false_alarm=not vlm_result.get("confirmed", True), + confidence_score=0.0 if vlm_result.get("skipped") else 0.9, + suggestion=None, + ) + db.add(analysis) + db.commit() + except Exception as e: + db.rollback() + logger.error(f"保存 VLM 结果失败: {e}") + finally: + db.close() + + +def _mark_false_alarm(alarm_id: str): + """将告警标记为误报""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if alarm: + alarm.alarm_status = "FALSE" + alarm.handle_status = "DONE" + alarm.handle_remark = "VLM复核判定误报" + alarm.handled_at = datetime.now() + db.commit() + logger.info(f"告警已标记误报: {alarm_id}") + except Exception as e: + db.rollback() + logger.error(f"标记误报失败: {e}") + finally: + db.close() + + +def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: + """ + 根据摄像头ID查找通知人员 + + Returns: + (area_name, [{"person_name": ..., "wechat_uid": ..., "role": ...}]) + """ + db = get_session() + try: + # camera → area + binding = db.query(CameraAreaBinding).filter( + CameraAreaBinding.camera_id == camera_id + ).first() + + if not binding: + return ("未知区域", []) + + # area info + area = db.query(NotifyArea).filter( + NotifyArea.area_id == binding.area_id, + NotifyArea.enabled == 1, + ).first() + + area_name = area.area_name if area else "未知区域" + + # area → persons + persons = db.query(AreaPersonBinding).filter( + AreaPersonBinding.area_id == binding.area_id, + AreaPersonBinding.enabled == 1, + AreaPersonBinding.notify_level <= alarm_level, + ).all() + + result = [ + { + "person_name": p.person_name, + "wechat_uid": p.wechat_uid, + "role": p.role, + } + for p in persons + ] + + return (area_name, result) + + except Exception as e: + logger.error(f"查询通知人员失败: {e}") + return ("未知区域", []) + finally: + db.close() diff --git a/app/services/vlm_service.py b/app/services/vlm_service.py new file mode 100644 index 0000000..d66c9ae --- /dev/null +++ b/app/services/vlm_service.py @@ -0,0 +1,186 @@ +""" +VLM 视觉语言模型复核服务 + +调用 qwen3-vl-flash 对告警截图进行二次确认, +生成场景描述文本用于企微通知卡片。 +""" + +import asyncio +import json +import logging +from typing import Optional, Dict + +from openai import AsyncOpenAI + +logger = logging.getLogger(__name__) + +# 算法类型 → VLM Prompt 模板 +VLM_PROMPTS = { + "leave_post": """分析这张岗位监控截图。 +摄像头位置:{camera_name},监控区域:{roi_name}。 +边缘AI检测到该区域无人在岗,请你复核:该区域内是否确实没有工作人员在岗? + +输出严格的JSON格式(不要输出其他内容): +{{"confirmed": true, "description": "一句话描述当前画面"}} + +说明:confirmed=true 表示确实无人在岗(告警成立),confirmed=false 表示有人在岗(误报)。""", + + "intrusion": """分析这张周界监控截图。 +摄像头位置:{camera_name},监控区域:{roi_name}。 +边缘AI检测到该区域有人员入侵,请你复核:该区域内是否确实有人员出现? + +输出严格的JSON格式(不要输出其他内容): +{{"confirmed": true, "description": "一句话描述当前画面"}} + +说明:confirmed=true 表示确实有人入侵(告警成立),confirmed=false 表示无人(误报)。""", +} + +# 通用降级 prompt(未知算法类型时使用) +DEFAULT_PROMPT = """分析这张监控截图。 +摄像头位置:{camera_name},监控区域:{roi_name}。 +边缘AI触发了 {alarm_type} 告警,请判断告警是否属实。 + +输出严格的JSON格式(不要输出其他内容): +{{"confirmed": true, "description": "一句话描述当前画面"}}""" + + +class VLMService: + """VLM 复核服务(单例)""" + + def __init__(self): + self._client: Optional[AsyncOpenAI] = None + self._enabled = False + self._model = "" + self._timeout = 10 + self._enable_thinking = False + + def init(self, config): + """初始化 VLM 客户端""" + self._enabled = config.enabled and bool(config.api_key) + self._model = config.model + self._timeout = config.timeout + self._enable_thinking = config.enable_thinking + + if self._enabled: + self._client = AsyncOpenAI( + api_key=config.api_key, + base_url=config.base_url, + ) + logger.info(f"VLM 服务已启用: model={self._model}") + else: + logger.info("VLM 服务未启用(VLM_ENABLED=false 或缺少 API Key)") + + @property + def enabled(self) -> bool: + return self._enabled + + async def verify_alarm( + self, + snapshot_url: str, + alarm_type: str, + camera_name: str = "", + roi_name: str = "", + ) -> Dict: + """ + VLM 复核告警截图 + + Args: + snapshot_url: COS 截图 URL + alarm_type: 告警类型 (leave_post/intrusion) + camera_name: 摄像头名称 + roi_name: ROI 区域名称 + + Returns: + {"confirmed": bool, "description": str, "skipped": bool} + - skipped=True 表示 VLM 未调用(降级处理) + """ + if not self._enabled or not self._client: + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警", + "skipped": True, + } + + if not snapshot_url: + logger.warning("告警无截图URL,跳过 VLM 复核") + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(无截图)", + "skipped": True, + } + + # 选择 prompt 模板 + template = VLM_PROMPTS.get(alarm_type, DEFAULT_PROMPT) + prompt = template.format( + camera_name=camera_name or "未知位置", + roi_name=roi_name or "未知区域", + alarm_type=alarm_type, + ) + + try: + resp = await asyncio.wait_for( + self._client.chat.completions.create( + model=self._model, + messages=[{ + "role": "user", + "content": [ + {"type": "image_url", "image_url": {"url": snapshot_url}}, + {"type": "text", "text": prompt}, + ], + }], + extra_body={"enable_thinking": self._enable_thinking}, + ), + timeout=self._timeout, + ) + + content = resp.choices[0].message.content.strip() + # 尝试提取 JSON(兼容模型可能输出 markdown code block) + if "```" in content: + content = content.split("```")[1] + if content.startswith("json"): + content = content[4:] + content = content.strip() + + result = json.loads(content) + logger.info( + f"VLM 复核完成: confirmed={result.get('confirmed')}, " + f"desc={result.get('description', '')[:50]}" + ) + return { + "confirmed": result.get("confirmed", True), + "description": result.get("description", ""), + "skipped": False, + } + + except asyncio.TimeoutError: + logger.warning(f"VLM 复核超时 ({self._timeout}s),降级处理") + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(VLM超时)", + "skipped": True, + } + except json.JSONDecodeError as e: + logger.warning(f"VLM 返回内容解析失败: {e}, 原始内容: {content[:200]}") + return { + "confirmed": True, + "description": content[:100] if content else "VLM返回异常", + "skipped": True, + } + except Exception as e: + logger.error(f"VLM 调用异常: {e}") + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(VLM异常)", + "skipped": True, + } + + +# 全局单例 +_vlm_service: Optional[VLMService] = None + + +def get_vlm_service() -> VLMService: + global _vlm_service + if _vlm_service is None: + _vlm_service = VLMService() + return _vlm_service diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py new file mode 100644 index 0000000..a4b0838 --- /dev/null +++ b/app/services/wechat_service.py @@ -0,0 +1,160 @@ +""" +企微通知服务 + +封装企业微信 API,发送告警文本卡片。 +V1 使用应用消息 + 文本卡片,后期扩展为模板卡片。 +""" + +import httpx +import logging +import time +from typing import Optional, List + +logger = logging.getLogger(__name__) + + +class WeChatService: + """企微通知服务(单例)""" + + def __init__(self): + self._enabled = False + self._corp_id = "" + self._agent_id = "" + self._secret = "" + self._token = "" + self._encoding_aes_key = "" + self._access_token = "" + self._token_expire_at = 0 + + def init(self, config): + """初始化企微配置""" + self._enabled = config.enabled and bool(config.corp_id) and bool(config.secret) + self._corp_id = config.corp_id + self._agent_id = config.agent_id + self._secret = config.secret + self._token = config.token + self._encoding_aes_key = config.encoding_aes_key + + if self._enabled: + logger.info(f"企微通知服务已启用: corp_id={self._corp_id}") + else: + logger.info("企微通知服务未启用") + + @property + def enabled(self) -> bool: + return self._enabled + + async def _get_access_token(self) -> str: + """获取企微 access_token(带缓存)""" + if self._access_token and time.time() < self._token_expire_at - 60: + return self._access_token + + url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" + params = {"corpid": self._corp_id, "corpsecret": self._secret} + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(url, params=params) + data = resp.json() + + if data.get("errcode") != 0: + raise Exception(f"获取 access_token 失败: {data}") + + self._access_token = data["access_token"] + self._token_expire_at = time.time() + data.get("expires_in", 7200) + logger.info("企微 access_token 已更新") + return self._access_token + + async def send_alarm_card( + self, + user_ids: List[str], + alarm_id: str, + alarm_type: str, + area_name: str, + camera_name: str, + description: str, + snapshot_url: str, + event_time: str, + alarm_level: int = 2, + ) -> bool: + """ + 发送告警文本卡片 + + Args: + user_ids: 企微 userid 列表 + alarm_id: 告警ID + alarm_type: 告警类型 + area_name: 区域名称 + camera_name: 摄像头名称 + description: VLM 生成的场景描述 + snapshot_url: 截图 URL + event_time: 告警时间 + alarm_level: 告警级别 + + Returns: + 是否发送成功 + """ + if not self._enabled: + logger.debug("企微未启用,跳过发送") + return False + + try: + access_token = await self._get_access_token() + + # 告警类型中文映射 + type_names = { + "leave_post": "人员离岗", + "intrusion": "周界入侵", + } + type_name = type_names.get(alarm_type, alarm_type) + + # 告警级别映射 + level_names = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"} + level_name = level_names.get(alarm_level, "一般") + + # 构造文本卡片消息 + content = ( + f"