Files
iot-device-management-service/app/services/vlm_service.py
16337 24c94ccfbe refactor: 移除企微Webhook机器人代码,修复VLM日志
- 删除 wechat_service.send_webhook_alarm 方法
- 删除 config.py 中 webhook_url 配置
- 简化 notify_dispatch 通知逻辑(仅保留应用消息)
- 修复 vlm_service 使用项目统一 logger
- 添加 VLM 调用 URL 调试日志

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 15:35:32 +08:00

167 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
VLM 视觉语言模型复核服务
调用 qwen3-vl-flash 对告警截图进行二次确认,
生成场景描述文本用于企微通知卡片。
"""
import asyncio
import json
from typing import Optional, Dict
from openai import AsyncOpenAI
from app.utils.logger import logger
# Algorithm type → VLM prompt template.
# NOTE: template text is sent to the model verbatim (runtime behavior) —
# do not reformat or translate the string contents. `{{...}}` escapes the
# literal JSON braces for str.format().
VLM_PROMPTS = {
    "leave_post": """你是安防监控AI复核员。判断{roi_name}岗位区域内是否有人在岗。
confirmed=true表示确实无人在岗告警成立false表示有人误报
description用≤25字描述画面。
仅输出JSON{{"confirmed":true,"description":"..."}}""",
    "intrusion": """你是安防监控AI复核员。判断{roi_name}周界区域内是否有人员入侵。
confirmed=true表示确实有人入侵告警成立false表示无人误报
description用≤25字描述画面。
仅输出JSON{{"confirmed":true,"description":"..."}}""",
}

# Generic fallback prompt, used when the alarm type has no dedicated template.
DEFAULT_PROMPT = """你是安防监控AI复核员。边缘AI触发了{alarm_type}告警,判断告警是否属实。
confirmed=true表示告警成立false表示误报。
description用≤25字描述画面。
仅输出JSON{{"confirmed":true,"description":"..."}}"""
class VLMService:
    """VLM (vision-language model) alarm review service — used as a singleton.

    Sends an alarm snapshot to an OpenAI-compatible VLM endpoint
    (e.g. qwen3-vl-flash) for a second opinion and produces a short scene
    description for the WeCom notification card. When the VLM cannot be
    consulted, every path degrades via :meth:`_fallback_result`.
    """

    def __init__(self):
        # Async client is created lazily in init() only when enabled.
        self._client: Optional[AsyncOpenAI] = None
        self._enabled = False            # effective switch (config flag AND API key)
        self._model = ""                 # model identifier, e.g. "qwen3-vl-flash"
        self._timeout = 10               # seconds allowed per VLM call
        self._enable_thinking = False    # vendor-specific "thinking" pass-through flag

    def init(self, config):
        """Initialize the VLM client from app config.

        The service is enabled only when ``config.enabled`` is true AND an
        API key is present; otherwise reviews are skipped with a fallback.
        """
        self._enabled = config.enabled and bool(config.api_key)
        self._model = config.model
        self._timeout = config.timeout
        self._enable_thinking = config.enable_thinking
        if self._enabled:
            self._client = AsyncOpenAI(
                api_key=config.api_key,
                base_url=config.base_url,
            )
            logger.info(f"VLM 服务已启用: model={self._model}")
        else:
            logger.info("VLM 服务未启用VLM_ENABLED=false 或缺少 API Key")

    @property
    def enabled(self) -> bool:
        """Whether the VLM review path is active."""
        return self._enabled

    @staticmethod
    def _fallback_result(alarm_type: str, camera_name: str, reason: str) -> Dict:
        """Degraded result used whenever the VLM is not called.

        Policy: intrusion alarms default to confirmed (better to over-report
        a perimeter breach); leave-post alarms default to blocked (avoid
        false pushes while the VLM is unavailable).
        """
        confirmed = alarm_type != "leave_post"
        return {
            "confirmed": confirmed,
            # FIX: the closing "）" was missing, leaving an unbalanced paren
            # in every degraded notification text.
            "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警({reason})",
            "skipped": True,
        }

    async def verify_alarm(
        self,
        snapshot_url: str,
        alarm_type: str,
        camera_name: str = "",
        roi_name: str = "",
    ) -> Dict:
        """Review an alarm snapshot with the VLM.

        Args:
            snapshot_url: COS snapshot URL.
            alarm_type: alarm type (leave_post/intrusion).
            camera_name: camera display name (used in prompts/fallbacks).
            roi_name: ROI region name (used in prompts).

        Returns:
            ``{"confirmed": bool, "description": str, "skipped": bool}``
            where ``skipped=True`` means the VLM was not called (degraded).
        """
        if not self._enabled or not self._client:
            return self._fallback_result(alarm_type, camera_name, "VLM未启用")
        if not snapshot_url:
            logger.warning("告警无截图URL跳过 VLM 复核")
            return self._fallback_result(alarm_type, camera_name, "无截图")

        # Pick the prompt template; unknown alarm types use the generic one.
        # str.format ignores kwargs a template does not reference.
        template = VLM_PROMPTS.get(alarm_type, DEFAULT_PROMPT)
        prompt = template.format(
            camera_name=camera_name or "未知位置",
            roi_name=roi_name or "未知区域",
            alarm_type=alarm_type,
        )

        content = ""  # defined up-front so except-handlers can log it safely
        try:
            logger.info(f"VLM 复核开始: type={alarm_type}, url={snapshot_url[:80]}...")
            resp = await asyncio.wait_for(
                self._client.chat.completions.create(
                    model=self._model,
                    messages=[{
                        "role": "user",
                        "content": [
                            {"type": "image_url", "image_url": {"url": snapshot_url}},
                            {"type": "text", "text": prompt},
                        ],
                    }],
                    extra_body={"enable_thinking": self._enable_thinking},
                ),
                timeout=self._timeout,
            )
            # FIX: message.content may be None (empty model response); the
            # original raised AttributeError into the generic handler. An
            # empty string now fails json.loads and takes the parse-failure
            # fallback instead.
            content = (resp.choices[0].message.content or "").strip()
            # Extract the JSON payload — the model may wrap it in a
            # markdown code block (``` or ```json).
            if "```" in content:
                content = content.split("```")[1]
                if content.startswith("json"):
                    content = content[4:]
                content = content.strip()
            result = json.loads(content)
            logger.info(
                f"VLM 复核完成: confirmed={result.get('confirmed')}, "
                f"desc={result.get('description', '')[:30]}"
            )
            return {
                "confirmed": result.get("confirmed", True),
                "description": result.get("description", ""),
                "skipped": False,
            }
        except asyncio.TimeoutError:
            logger.warning(f"VLM 复核超时 ({self._timeout}s),降级处理")
            return self._fallback_result(alarm_type, camera_name, "VLM超时")
        except json.JSONDecodeError as e:
            logger.warning(f"VLM 返回内容解析失败: {e}, 原始内容: {content[:200]}")
            return self._fallback_result(alarm_type, camera_name, "解析失败")
        except Exception as e:
            logger.error(f"VLM 调用异常: {e}")
            return self._fallback_result(alarm_type, camera_name, "VLM异常")
# Process-wide singleton instance, created on first access.
_vlm_service: Optional[VLMService] = None


def get_vlm_service() -> VLMService:
    """Return the shared VLMService, instantiating it lazily on first call."""
    global _vlm_service
    if _vlm_service is None:
        _vlm_service = VLMService()
    return _vlm_service