Files
iot-device-management-service/app/services/vlm_service.py
16337 78e0076f4a fix: 优化VLM提示词,输出更简洁,传入算法类型和区域名称
- prompt要求≤15字直接说结论,不再描述画面
- 加入算法类型中文名(离岗/周界入侵)让VLM更准确判断
- roi_name改为查询区域名称,不再传UUID
- 给出告警成立和误报的示例引导输出格式

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:32:50 +08:00

185 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
VLM 视觉语言模型复核服务
调用 qwen3-vl-flash 对告警截图进行二次确认,
生成场景描述文本用于企微通知卡片。
"""
import asyncio
import json
from typing import Optional, Dict
from openai import AsyncOpenAI
from app.utils.logger import logger
# Maps alarm type codes to their Chinese display names; interpolated into
# the prompt templates as {alarm_type_name} (see verify_alarm).
ALARM_TYPE_NAMES = {
    "leave_post": "离岗",
    "intrusion": "周界入侵",
}
# Alarm type → VLM prompt template. Placeholders such as {roi_name} are
# filled with str.format in verify_alarm; doubled braces {{ }} escape the
# literal JSON braces the model is asked to emit.
VLM_PROMPTS = {
    "leave_post": """你是安防监控AI复核员。算法类型离岗检测监控区域{roi_name}
判断该区域是否有人在岗。
- confirmed=true无人在岗告警成立
- confirmed=false有人在岗误报
description要求≤15字直接说结论。
告警成立示例:"该区域无人在岗"
误报示例:"画面中无人员离岗情况"
仅输出JSON{{"confirmed":true,"description":"..."}}""",
    "intrusion": """你是安防监控AI复核员。算法类型周界入侵检测监控区域{roi_name}
判断该区域是否有人员入侵。
- confirmed=true有人入侵告警成立
- confirmed=false无人入侵误报
description要求≤15字直接说结论。
告警成立示例:"有人员进入周界区域"
误报示例:"画面中无周界入侵情况"
仅输出JSON{{"confirmed":true,"description":"..."}}""",
}
# Generic fallback prompt used when the alarm type has no dedicated
# template in VLM_PROMPTS.
DEFAULT_PROMPT = """你是安防监控AI复核员。算法类型{alarm_type_name},监控区域:{roi_name}
判断告警是否属实。
- confirmed=true告警成立
- confirmed=false误报
description要求≤15字直接说结论。
仅输出JSON{{"confirmed":true,"description":"..."}}"""
class VLMService:
    """VLM review service (singleton).

    Calls a vision-language model (e.g. qwen3-vl-flash) to double-check
    alarm snapshots and produce a short scene description for the WeCom
    notification card. Disabled until init() is called with a valid
    config; while disabled, verify_alarm() degrades to _fallback_result().
    """

    def __init__(self):
        # Configured lazily by init(); service is off until then.
        self._client: Optional[AsyncOpenAI] = None
        self._enabled = False
        self._model = ""
        self._timeout = 10  # seconds; overridden by config in init()
        self._enable_thinking = False

    def init(self, config):
        """Initialize the VLM client from config.

        The service stays disabled when config.enabled is false or the
        API key is missing; no client is created in that case.
        """
        self._enabled = config.enabled and bool(config.api_key)
        self._model = config.model
        self._timeout = config.timeout
        self._enable_thinking = config.enable_thinking
        if self._enabled:
            self._client = AsyncOpenAI(
                api_key=config.api_key,
                base_url=config.base_url,
            )
            logger.info(f"VLM 服务已启用: model={self._model}")
        else:
            logger.info("VLM 服务未启用VLM_ENABLED=false 或缺少 API Key")

    @property
    def enabled(self) -> bool:
        # True only after a successful init() with an API key.
        return self._enabled

    @staticmethod
    def _fallback_result(alarm_type: str, camera_name: str, reason: str) -> Dict:
        """Degraded result used when the VLM cannot be consulted.

        Intrusion alarms default to confirmed (better to over-report);
        leave-post alarms default to rejected, to avoid pushing false
        alarms while the VLM is unavailable.
        """
        confirmed = alarm_type != "leave_post"
        return {
            "confirmed": confirmed,
            # Fix: the closing fullwidth parenthesis after {reason} was missing.
            "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警({reason})",
            "skipped": True,
        }

    @staticmethod
    def _extract_json(content: str) -> Dict:
        """Parse the model reply into a dict.

        Tolerates markdown code fences (``` / ```json) and, as a last
        resort, slices from the first '{' to the last '}' so JSON
        embedded in surrounding prose still parses.

        Raises:
            json.JSONDecodeError: when no valid JSON object is found.
        """
        text = content.strip()
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
            text = text.strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Fallback: extract the outermost {...} span from prose.
            start, end = text.find("{"), text.rfind("}")
            if 0 <= start < end:
                return json.loads(text[start:end + 1])
            raise

    async def verify_alarm(
        self,
        snapshot_url: str,
        alarm_type: str,
        camera_name: str = "",
        roi_name: str = "",
    ) -> Dict:
        """Re-check an alarm snapshot with the VLM.

        Args:
            snapshot_url: COS snapshot URL.
            alarm_type: alarm type code (leave_post/intrusion).
            camera_name: camera display name.
            roi_name: ROI region name.

        Returns:
            {"confirmed": bool, "description": str, "skipped": bool}
            - skipped=True means the VLM was not called (degraded path).
        """
        if not self._enabled or not self._client:
            return self._fallback_result(alarm_type, camera_name, "VLM未启用")
        if not snapshot_url:
            logger.warning("告警无截图URL跳过 VLM 复核")
            return self._fallback_result(alarm_type, camera_name, "无截图")

        # Pick the prompt template for this alarm type; unknown types get
        # the generic fallback. format() tolerates unused kwargs, so all
        # known placeholders are always supplied.
        template = VLM_PROMPTS.get(alarm_type, DEFAULT_PROMPT)
        alarm_type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
        prompt = template.format(
            camera_name=camera_name or "未知位置",
            roi_name=roi_name or "监控区域",
            alarm_type=alarm_type,
            alarm_type_name=alarm_type_name,
        )
        content = ""
        try:
            logger.info(f"VLM 复核开始: type={alarm_type}, url={snapshot_url[:80]}...")
            resp = await asyncio.wait_for(
                self._client.chat.completions.create(
                    model=self._model,
                    messages=[{
                        "role": "user",
                        "content": [
                            {"type": "image_url", "image_url": {"url": snapshot_url}},
                            {"type": "text", "text": prompt},
                        ],
                    }],
                    extra_body={"enable_thinking": self._enable_thinking},
                ),
                timeout=self._timeout,
            )
            # Fix: message.content may be None; previously .strip() raised
            # AttributeError and fell into the generic exception branch.
            # Treat it as empty so it hits the parse-failure path instead.
            content = resp.choices[0].message.content or ""
            result = self._extract_json(content)
            logger.info(
                f"VLM 复核完成: confirmed={result.get('confirmed')}, "
                f"desc={result.get('description', '')[:30]}"
            )
            return {
                # Coerce to bool so the documented return contract holds
                # even if the model emits a truthy non-boolean.
                "confirmed": bool(result.get("confirmed", True)),
                "description": result.get("description", ""),
                "skipped": False,
            }
        except asyncio.TimeoutError:
            logger.warning(f"VLM 复核超时 ({self._timeout}s),降级处理")
            return self._fallback_result(alarm_type, camera_name, "VLM超时")
        except json.JSONDecodeError as e:
            logger.warning(f"VLM 返回内容解析失败: {e}, 原始内容: {content[:200]}")
            return self._fallback_result(alarm_type, camera_name, "解析失败")
        except Exception as e:
            logger.error(f"VLM 调用异常: {e}")
            return self._fallback_result(alarm_type, camera_name, "VLM异常")
# Process-wide singleton instance, created lazily by get_vlm_service().
_vlm_service: Optional[VLMService] = None


def get_vlm_service() -> VLMService:
    """Return the shared VLMService, constructing it on first call."""
    global _vlm_service
    if _vlm_service is not None:
        return _vlm_service
    _vlm_service = VLMService()
    return _vlm_service