Compare commits

...

27 Commits

Author SHA1 Message Date
33e272cbc7 修复: VLM 非机动车违停 prompt 参数缺失 + 返回格式兼容
- 移除 non_motor_vehicle_parking prompt 中未传递的 {timestamp} 占位符
- 统一该算法的 prompt 输出格式为 confirmed/description(与其他算法一致)
- 解析时兼容 is_real/reason 字段,防止旧版 prompt 或模型返回不一致
2026-04-13 10:21:33 +08:00
f8b4b65ced 新增: 算法全局参数菜单项 2026-04-09 17:55:59 +08:00
caa7adb27c 新增: 非机动车违停告警类型和VLM复核提示词 2026-04-09 10:00:56 +08:00
46fdb51767 优化:日报卡片副标题改为简洁文案,不再展示具体位置名 2026-04-08 10:25:33 +08:00
abebb7138b 优化:日报卡片文本防溢出 — 高发设备改top1+6字截断,副标题限20字 2026-04-08 09:59:43 +08:00
404510538d 调整:日报卡片跳转路径改为 /pages-ops/work-order/index 2026-04-08 09:47:28 +08:00
6ffcc79277 重构(test): test_work_order_real 改用 constants 统一定义 2026-04-07 14:05:37 +08:00
5f2d18b8fb 重构(notify_dispatch): ALARM_TYPE_NAMES 改为直接从 constants 导入 2026-04-07 14:03:42 +08:00
bddf28136e 重构(vlm_service): 重命名为 VLM_TYPE_NAMES 避免与 constants 命名冲突 2026-04-07 14:03:08 +08:00
3b20a1ef14 重构(yudao_alert): 核心告警类型名从 constants 派生 2026-04-07 13:56:50 +08:00
4d679cec6e 重构(yudao_aiot_alarm): 核心告警类型名从 constants 派生,扩展类型保留 2026-04-07 13:55:45 +08:00
3974820ada 重构(alarm_event_service): 状态字符串改用 AlarmStatus/HandleStatus 枚举 2026-04-07 13:06:45 +08:00
8446bab921 重构(alarm_event_service): 默认告警等级改用 constants 统一定义 2026-04-07 12:35:59 +08:00
72fc77a0ed 重构(work_order_api): status/type/level 映射改用 constants 统一定义 2026-04-07 11:57:39 +08:00
15d7d8cbff 重构(report_generator): TYPE/LEVEL/STATUS_NAMES 改用 constants 统一定义 2026-04-07 11:56:30 +08:00
d8c36cb7b1 重构(wechat_service): ALARM_TYPE_NAMES/LEVEL_NAMES 改用 constants 统一定义 2026-04-07 11:48:45 +08:00
5309b5a7ce 重构:建立术语注册中心 constants.py(阶段一)
新建 app/constants.py 作为全局术语单一真相源,包含:
- AlarmType/AlarmStatus/HandleStatus/OrderStatus/CleaningType 枚举
- 所有中文映射字典(ALARM_TYPE_NAMES 等)
- 芋道前端兼容状态映射
- 告警等级、优先级、保洁类型等常量
- ORDER_OPEN_STATUSES 未完成状态集合

已替换 daily_report_service.py 和 order_query.py 中的重复定义。
其余文件(wechat_service/vlm_service/yudao_*等)待下一阶段替换。
2026-04-07 11:29:44 +08:00
2ba8535869 修复:小程序路径改为pages/index/index 2026-04-03 18:09:33 +08:00
7c7246b4dc 优化:日报卡片底部添加「点击查看详情」跳转小程序 2026-04-03 18:02:04 +08:00
07dfa5560e 修复:日报卡片小程序AppID更正为wxb3dc42bb3017c3f2 2026-04-03 17:50:16 +08:00
9eec1bf42b 优化:日报卡片点击跳转到企微小程序 2026-04-03 17:27:40 +08:00
ec5501fa3b 优化:日报卡片打磨细节
- 高发设备/告警热点只取 top2,名称超8字截断,防止文字溢出
- 已完成和待处理合并为一行,减少行数更紧凑
- 待处理为0时显示「 全部清零」,有遗留才显示遗留数
- 误报率非0时才显示,0%不占位
- 副标题:清零时正面鼓励,有超时时⚠提醒
- 底部新增「查看详情」跳转链接
2026-04-03 16:48:29 +08:00
ecc5065c71 功能:日报支持群机器人Webhook模板卡片推送
通过群机器人 Webhook 发送 text_notice 模板卡片,视觉效果
远超纯 markdown:大号数字突出核心指标,键值对整齐排列。

新增:
- WECHAT_GROUP_ROBOT_KEY 配置(群机器人 Webhook key)
- send_webhook_template_card 方法
- _build_template_card 构建 text_notice 卡片
  - emphasis_content: 昨日新增大号数字
  - horizontal_content_list: 安保/保洁、已完成、待处理、
    首响/完结、告警热点、高发设备(最多6条)
  - sub_title_text: 需关注项或「运营良好」
  - card_action: 点击跳转详情页

发送策略:优先 Webhook 模板卡片 → 降级 appchat markdown
2026-04-03 16:16:51 +08:00
8ff396641e 优化:日报面向领导视角重排版
- 去掉工单号等技术细节,超时项改为「位置(类型,已挂起X小时)」
- 待处理全部清零时显示绿色「全部清零」,有遗留时橙色警示
- 时长改为人话(8.0小时 而非 480分钟)
- 风险分布用「热点分布」,超时用「需关注」,措辞更汇报体
- 取消数等次要信息去掉,只保留领导关心的核心指标
2026-04-03 15:44:34 +08:00
af2b9bc996 优化:日报改回单条精排markdown,去掉textcard
textcard排版控制太弱,指标密集型日报挤成一坨。
改为单条markdown,分三个区块:
1. 核心数字(新增/完成/待处理/误报率)
2. 响应效率(首响/完结时长)
3. 风险分布(告警类型/区域/摄像头,仅有数据时展示)
4. 超时未处理(仅有遗留时展示)

去掉 textcard 和 news 相关代码,简化发送逻辑。
2026-04-03 15:26:46 +08:00
d6765f51f2 优化:日报改为单条文本卡片推送 2026-04-03 14:12:25 +08:00
30db9d8961 优化:日报升级为图文摘要加详情推送 2026-04-03 13:16:04 +08:00
14 changed files with 858 additions and 255 deletions

View File

@@ -63,6 +63,7 @@ class WeChatConfig:
test_uids: str = "" # 演示模式逗号分隔的企微userid如 "zhangsan,lisi"
service_base_url: str = "" # 公网地址,如 https://vsp.viewshanghai.com
group_chat_id: str = "" # 告警群聊ID通过企微API创建或手动指定
group_robot_key: str = "" # 群机器人 Webhook key用于日报等模板卡片推送
@dataclass
@@ -184,6 +185,7 @@ def load_settings() -> Settings:
test_uids=os.getenv("WECHAT_TEST_UIDS", ""),
service_base_url=os.getenv("SERVICE_BASE_URL", ""),
group_chat_id=os.getenv("WECHAT_GROUP_CHAT_ID", ""),
group_robot_key=os.getenv("WECHAT_GROUP_ROBOT_KEY", ""),
),
agent=AgentConfig(
vlm_api_key=os.getenv("DASHSCOPE_API_KEY", ""),

190
app/constants.py Normal file
View File

@@ -0,0 +1,190 @@
"""
全局术语注册中心Single Source of Truth
所有业务术语、状态映射、类型映射集中在此文件定义。
其他模块一律从这里 import禁止各自重复定义。
修改告警类型、工单状态、保洁类型等业务术语时,只需改这一个文件。
"""
from enum import Enum
from typing import Dict
# ============================================================
# 摄像头 ID 统一说明
# ============================================================
# 全系统的摄像头唯一标识统一叫 camera_code格式为 cam_{hash}
# 不同系统的字段名映射关系:
# - security-ai-edge: camera_id → 实际是 camera_code
# - wvp-platform: camera_code → 数据库字段就叫这个
# - IoT 工单表: camera_id → 实际存的是 camera_code
# - vsp-service: device_id → AlarmEvent 表,历史原因叫 device_id
#
# Python 代码中对外接口API 参数、工具参数)统一使用 device_id
# 因为 AlarmEvent 主表和 EdgeAlarmReport schema 已用此名。
# 内部传递和显示时,用 camera_name_service 解析为人类可读名称。
# ============================================================
# 告警类型 (alarm_type)
# ============================================================
class AlarmType(str, Enum):
LEAVE_POST = "leave_post"
INTRUSION = "intrusion"
ILLEGAL_PARKING = "illegal_parking"
VEHICLE_CONGESTION = "vehicle_congestion"
NON_MOTOR_VEHICLE_PARKING = "non_motor_vehicle_parking"
ALARM_TYPE_NAMES: Dict[str, str] = {
AlarmType.LEAVE_POST: "人员离岗",
AlarmType.INTRUSION: "周界入侵",
AlarmType.ILLEGAL_PARKING: "车辆违停",
AlarmType.VEHICLE_CONGESTION: "车辆拥堵",
AlarmType.NON_MOTOR_VEHICLE_PARKING: "非机动车违停",
}
# VLM 场景下的简短名称(用于截图分析提示词,尽量精炼)
ALARM_TYPE_SHORT_NAMES: Dict[str, str] = {
AlarmType.LEAVE_POST: "离岗",
AlarmType.INTRUSION: "入侵",
AlarmType.ILLEGAL_PARKING: "违停",
AlarmType.VEHICLE_CONGESTION: "拥堵",
AlarmType.NON_MOTOR_VEHICLE_PARKING: "非机动车违停",
}
def get_alarm_type_name(code: str) -> str:
"""获取告警类型中文名,未知类型原样返回"""
return ALARM_TYPE_NAMES.get(code, code)
# ============================================================
# 告警状态 (alarm_status) — vsp-service 内部使用
# ============================================================
class AlarmStatus(str, Enum):
NEW = "NEW" # 新告警
CONFIRMED = "CONFIRMED" # 已确认(已创建工单)
FALSE = "FALSE" # 误报
CLOSED = "CLOSED" # 已关闭
ALARM_STATUS_NAMES: Dict[str, str] = {
AlarmStatus.NEW: "待处理",
AlarmStatus.CONFIRMED: "处理中",
AlarmStatus.FALSE: "误报",
AlarmStatus.CLOSED: "已关闭",
}
# 芋道前端兼容映射(前端期望的状态值)
ALARM_STATUS_TO_YUDAO: Dict[str, str] = {
AlarmStatus.NEW: "pending",
AlarmStatus.CONFIRMED: "processing",
AlarmStatus.FALSE: "false_alarm",
AlarmStatus.CLOSED: "completed",
}
YUDAO_TO_ALARM_STATUS: Dict[str, str] = {
v: k for k, v in ALARM_STATUS_TO_YUDAO.items()
}
# ============================================================
# 处理状态 (handle_status) — vsp-service 内部使用
# ============================================================
class HandleStatus(str, Enum):
UNHANDLED = "UNHANDLED" # 未处理
HANDLING = "HANDLING" # 处理中
DONE = "DONE" # 已完成
IGNORED = "IGNORED" # 已忽略
HANDLE_STATUS_NAMES: Dict[str, str] = {
HandleStatus.UNHANDLED: "未处理",
HandleStatus.HANDLING: "处理中",
HandleStatus.DONE: "已完成",
HandleStatus.IGNORED: "已忽略",
}
# ============================================================
# IoT 工单状态 (order status) — 来自芋道 ops_order 表
# ============================================================
class OrderStatus(str, Enum):
PENDING = "PENDING" # 待处理
ASSIGNED = "ASSIGNED" # 已派单
ARRIVED = "ARRIVED" # 已到岗
PAUSED = "PAUSED" # 已暂停
COMPLETED = "COMPLETED" # 已完成
CANCELLED = "CANCELLED" # 已取消
ORDER_STATUS_NAMES: Dict[str, str] = {
OrderStatus.PENDING: "待处理",
OrderStatus.ASSIGNED: "已派单",
OrderStatus.ARRIVED: "已到岗",
OrderStatus.PAUSED: "已暂停",
OrderStatus.COMPLETED: "已完成",
OrderStatus.CANCELLED: "已取消",
}
# 未完成状态集合(用于查询"待处理"工单)
ORDER_OPEN_STATUSES = frozenset({
OrderStatus.PENDING, OrderStatus.ASSIGNED,
OrderStatus.ARRIVED, OrderStatus.PAUSED,
})
# ============================================================
# 告警等级 (alarm_level)
# ============================================================
ALARM_LEVEL_NAMES: Dict[int, str] = {
0: "紧急",
1: "重要",
2: "普通",
3: "轻微",
}
# 各算法的默认告警等级
ALARM_TYPE_DEFAULT_LEVEL: Dict[str, int] = {
AlarmType.INTRUSION: 1, # 重要
AlarmType.LEAVE_POST: 2, # 普通
AlarmType.ILLEGAL_PARKING: 1, # 重要(与 edge 端一致)
AlarmType.VEHICLE_CONGESTION: 2, # 普通
AlarmType.NON_MOTOR_VEHICLE_PARKING: 2, # 普通
}
# ============================================================
# 工单优先级 (priority)
# ============================================================
PRIORITY_NAMES: Dict[int, str] = {
0: "",
1: "",
2: "",
}
# ============================================================
# 保洁类型 (cleaning_type)
# ============================================================
class CleaningType(str, Enum):
ROUTINE = "ROUTINE"
DEEP = "DEEP"
SPOT = "SPOT"
EMERGENCY = "EMERGENCY"
CLEANING_TYPE_NAMES: Dict[str, str] = {
CleaningType.ROUTINE: "日常保洁",
CleaningType.DEEP: "深度保洁",
CleaningType.SPOT: "点状保洁",
CleaningType.EMERGENCY: "应急保洁",
}
# ============================================================
# 通用常量
# ============================================================
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]

View File

@@ -14,6 +14,7 @@ from typing import Optional, List
import httpx
from app.config import settings
from app.constants import ALARM_STATUS_TO_YUDAO, ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES
from app.utils.logger import logger
router = APIRouter(prefix="/api/work-order", tags=["H5工单处理"])
@@ -106,22 +107,6 @@ async def get_work_order_detail(
except Exception:
snapshot_url = alarm_dict.get("snapshot_url", "")
# 状态映射
status_map = {
"NEW": "pending",
"CONFIRMED": "processing",
"CLOSED": "completed",
"FALSE": "false_alarm",
}
type_names = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
"illegal_parking": "车辆违停",
"vehicle_congestion": "车辆拥堵",
}
level_names = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"}
event_time = alarm_dict.get("event_time", "")
if event_time:
try:
@@ -135,9 +120,9 @@ async def get_work_order_detail(
"data": {
"alarmId": alarmId,
"orderId": order_id,
"status": status_map.get(alarm_dict.get("alarm_status", ""), "pending"),
"alarmType": type_names.get(alarm_dict.get("alarm_type", ""), alarm_dict.get("alarm_type", "")),
"alarmLevel": level_names.get(alarm_dict.get("alarm_level"), "普通"),
"status": ALARM_STATUS_TO_YUDAO.get(alarm_dict.get("alarm_status", ""), "pending"),
"alarmType": ALARM_TYPE_NAMES.get(alarm_dict.get("alarm_type", ""), alarm_dict.get("alarm_type", "")),
"alarmLevel": ALARM_LEVEL_NAMES.get(alarm_dict.get("alarm_level"), "普通"),
"cameraName": camera_name,
"eventTime": event_time,
"snapshotUrl": snapshot_url,

View File

@@ -20,6 +20,7 @@ import os
import httpx
from app.yudao_compat import YudaoResponse, get_current_user
from app.constants import ALARM_TYPE_NAMES as _BASE_TYPE_NAMES
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.services.oss_storage import get_oss_storage
@@ -557,9 +558,8 @@ async def _notify_ops_platform(data: dict):
def _get_alarm_type_name(alarm_type: Optional[str]) -> str:
"""获取告警类型名称"""
type_names = {
"leave_post": "离岗检测",
"intrusion": "周界入侵",
_EXTENDED_TYPE_NAMES = {
**_BASE_TYPE_NAMES,
"crowd": "人群聚集",
"fire": "火焰检测",
"smoke": "烟雾检测",
@@ -567,4 +567,4 @@ def _get_alarm_type_name(alarm_type: Optional[str]) -> str:
"helmet": "安全帽检测",
"unknown": "未知类型",
}
return type_names.get(alarm_type, alarm_type or "未知类型")
return _EXTENDED_TYPE_NAMES.get(alarm_type, alarm_type or "未知类型")

View File

@@ -28,6 +28,7 @@ from typing import Optional
from datetime import datetime
from app.yudao_compat import YudaoResponse, get_current_user
from app.constants import ALARM_TYPE_NAMES as _BASE_TYPE_NAMES
from app.services.alert_service import get_alert_service, AlertService
from app.schemas import AlertHandleRequest
@@ -251,9 +252,8 @@ async def get_camera_summary_page(
def _get_alert_type_name(alert_type: Optional[str]) -> str:
"""获取告警类型名称"""
type_names = {
"leave_post": "离岗检测",
"intrusion": "周界入侵",
_EXTENDED_TYPE_NAMES = {
**_BASE_TYPE_NAMES,
"crowd": "人群聚集",
"fire": "火焰检测",
"smoke": "烟雾检测",
@@ -261,4 +261,4 @@ def _get_alert_type_name(alert_type: Optional[str]) -> str:
"helmet": "安全帽检测",
"unknown": "未知类型",
}
return type_names.get(alert_type, alert_type or "未知类型")
return _EXTENDED_TYPE_NAMES.get(alert_type, alert_type or "未知类型")

View File

@@ -328,6 +328,18 @@ def _build_aiot_menus():
"visible": True,
"keepAlive": True,
},
{
"id": 2030,
"parentId": 2000,
"name": "算法全局参数",
"path": "device/algorithm",
"component": "aiot/device/algorithm/index",
"componentName": "AiotDeviceAlgorithm",
"icon": "ep:setting",
"sort": 5,
"visible": True,
"keepAlive": True,
},
{
"id": 2040,
"parentId": 2000,

View File

@@ -11,28 +11,10 @@ from langchain_core.runnables import RunnableConfig
from app.utils.logger import logger
from app.utils.timezone import beijing_now
# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗", "intrusion": "周界入侵",
"illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵",
}
# 工单状态映射
ORDER_STATUS_NAMES = {
"PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
"PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
}
# 工单优先级映射
PRIORITY_NAMES = {0: "", 1: "", 2: ""}
# 保洁类型映射
CLEANING_TYPE_NAMES = {
"ROUTINE": "日常保洁", "DEEP": "深度保洁",
"SPOT": "点状保洁", "EMERGENCY": "应急保洁",
}
from app.constants import (
ALARM_TYPE_NAMES, ORDER_STATUS_NAMES, PRIORITY_NAMES,
CLEANING_TYPE_NAMES, ORDER_OPEN_STATUSES,
)
def _parse_time_range(time_range: str):
@@ -237,8 +219,7 @@ def list_orders(
user_id = config.get("configurable", {}).get("user_id", "")
# 查待处理工单时不限时间范围(待处理可能是之前创建的)
pending_statuses = {"PENDING", "ASSIGNED", "ARRIVED", "PAUSED"}
skip_time = status and status in pending_statuses
skip_time = status and status in ORDER_OPEN_STATUSES
orders, total, sec_ext_map, clean_ext_map = _query_orders(
order_type=order_type if order_type != "ALL" else None,

View File

@@ -8,6 +8,7 @@ from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy import func, cast, Date, Integer, extract, text
from app.constants import ALARM_TYPE_DEFAULT_LEVEL, AlarmStatus, HandleStatus
from app.models import AlarmEvent, AlarmEventExt, AlarmLlmAnalysis, get_session
from app.services.oss_storage import get_oss_storage
from app.utils.logger import logger
@@ -36,14 +37,8 @@ def _determine_alarm_level(
2. 时长升级:持续型告警随时长升级(只升不降)
3. 无配置时使用算法默认等级
"""
# 算法默认等级
default_levels = {
"intrusion": 1, # 重要
"leave_post": 2, # 普通
"illegal_parking": 1, # 重要
"vehicle_congestion": 2, # 普通
}
base_level = initial_level if initial_level is not None else default_levels.get(alarm_type, 2)
# 算法默认等级(来自 constants.py 统一定义)
base_level = initial_level if initial_level is not None else ALARM_TYPE_DEFAULT_LEVEL.get(alarm_type, 2)
# 入侵检测:事件型,不升级
if alarm_type == "intrusion":
@@ -124,8 +119,8 @@ class AlarmEventService:
last_frame_time=None,
alarm_level=alarm_level,
confidence_score=confidence,
alarm_status="NEW",
handle_status="UNHANDLED",
alarm_status=AlarmStatus.NEW,
handle_status=HandleStatus.UNHANDLED,
edge_node_id=mqtt_data.get("device_id"),
)
@@ -236,8 +231,8 @@ class AlarmEventService:
duration_ms=ext_data.get("duration_ms"),
alarm_level=alarm_level,
confidence_score=confidence,
alarm_status="NEW",
handle_status="UNHANDLED",
alarm_status=AlarmStatus.NEW,
handle_status=HandleStatus.UNHANDLED,
snapshot_url=data.get("snapshot_url"),
edge_node_id=ext_data.get("edge_node_id"),
area_id=data.get("area_id") or ext_data.get("area_id"),
@@ -314,8 +309,8 @@ class AlarmEventService:
duration_ms=duration_ms,
alarm_level=alarm_level,
confidence_score=confidence,
alarm_status="NEW",
handle_status="UNHANDLED",
alarm_status=AlarmStatus.NEW,
handle_status=HandleStatus.UNHANDLED,
snapshot_url=snapshot_url,
edge_node_id=data.get("device_id"),
)
@@ -514,12 +509,12 @@ class AlarmEventService:
# 待处理数
pending_count = db.query(AlarmEvent).filter(
AlarmEvent.handle_status == "UNHANDLED"
AlarmEvent.handle_status == HandleStatus.UNHANDLED
).count()
# 已处理数
handled_count = db.query(AlarmEvent).filter(
AlarmEvent.handle_status.in_(["DONE", "IGNORED"])
AlarmEvent.handle_status.in_([HandleStatus.DONE, HandleStatus.IGNORED])
).count()
# 平均响应时间只算近7天已处理的排除>6h的异常值
@@ -675,8 +670,8 @@ class AlarmEventService:
yesterday_count = db.query(AlarmEvent).filter(
AlarmEvent.event_time >= yesterday_start, AlarmEvent.event_time < today_start
).count()
pending_count = db.query(AlarmEvent).filter(AlarmEvent.handle_status == "UNHANDLED").count()
handled_count = db.query(AlarmEvent).filter(AlarmEvent.handle_status.in_(["DONE", "IGNORED"])).count()
pending_count = db.query(AlarmEvent).filter(AlarmEvent.handle_status == HandleStatus.UNHANDLED).count()
handled_count = db.query(AlarmEvent).filter(AlarmEvent.handle_status.in_([HandleStatus.DONE, HandleStatus.IGNORED])).count()
from sqlalchemy.sql.expression import literal_column
stats_since = today_start - timedelta(days=7)
@@ -790,7 +785,7 @@ class AlarmEventService:
unhandled_count = (
db.query(AlarmEvent)
.filter(AlarmEvent.device_id == row.device_id)
.filter(AlarmEvent.handle_status == "UNHANDLED")
.filter(AlarmEvent.handle_status == HandleStatus.UNHANDLED)
.count()
)
@@ -850,34 +845,34 @@ class AlarmEventService:
alarm.last_frame_time = beijing_now().replace(microsecond=0)
# 先到先得:已被人工处理到终态的不覆盖
terminal_statuses = ("CLOSED", "FALSE")
terminal_handle = ("DONE", "IGNORED")
terminal_statuses = (AlarmStatus.CLOSED, AlarmStatus.FALSE)
terminal_handle = (HandleStatus.DONE, HandleStatus.IGNORED)
if alarm.alarm_status in terminal_statuses or alarm.handle_status in terminal_handle:
logger.info(f"告警已为终态({alarm.alarm_status}/{alarm.handle_status}),仅更新时长: {alarm_id}")
elif resolve_type == "person_returned":
alarm.alarm_status = "CLOSED"
alarm.handle_status = "DONE"
alarm.alarm_status = AlarmStatus.CLOSED
alarm.handle_status = HandleStatus.DONE
alarm.handle_remark = "人员回岗自动关闭"
alarm.handled_at = beijing_now()
elif resolve_type == "non_work_time":
alarm.alarm_status = "CLOSED"
alarm.handle_status = "DONE"
alarm.alarm_status = AlarmStatus.CLOSED
alarm.handle_status = HandleStatus.DONE
alarm.handle_remark = "非工作时间自动关闭"
alarm.handled_at = beijing_now()
elif resolve_type == "intrusion_cleared":
alarm.alarm_status = "CLOSED"
alarm.handle_status = "DONE"
alarm.alarm_status = AlarmStatus.CLOSED
alarm.handle_status = HandleStatus.DONE
alarm.handle_remark = "入侵消失自动关闭持续无人180秒"
alarm.handled_at = beijing_now()
elif resolve_type == "vehicle_left":
alarm.alarm_status = "CLOSED"
alarm.handle_status = "DONE"
alarm.alarm_status = AlarmStatus.CLOSED
alarm.handle_status = HandleStatus.DONE
alarm.handle_remark = "车辆离开自动关闭"
alarm.handled_at = beijing_now()
elif resolve_type == "congestion_cleared":
alarm.alarm_status = "CLOSED"
alarm.handle_status = "DONE"
alarm.alarm_status = AlarmStatus.CLOSED
alarm.handle_status = HandleStatus.DONE
alarm.handle_remark = "拥堵消散自动关闭"
alarm.handled_at = beijing_now()
@@ -900,7 +895,7 @@ class AlarmEventService:
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
if not alarm:
return False
return alarm.alarm_status in ("CLOSED", "FALSE") or alarm.handle_status in ("DONE", "IGNORED")
return alarm.alarm_status in (AlarmStatus.CLOSED, AlarmStatus.FALSE) or alarm.handle_status in (HandleStatus.DONE, HandleStatus.IGNORED)
except Exception:
return False
finally:

View File

@@ -4,46 +4,79 @@
每天定时生成前一天的工单汇总,发送到企微群聊。
数据源IoT ops_order + 安保/保洁扩展表。
"""
import asyncio
from collections import Counter, defaultdict
from collections import Counter
from datetime import timedelta
from typing import Dict, List, Optional
from app.utils.logger import logger
from app.config import settings
# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
"illegal_parking": "车辆违停",
"vehicle_congestion": "车辆拥堵",
}
# 保洁类型映射
CLEANING_TYPE_NAMES = {
"ROUTINE": "日常保洁", "DEEP": "深度保洁",
"SPOT": "点状保洁", "EMERGENCY": "应急保洁",
}
# 工单状态映射
ORDER_STATUS_NAMES = {
"PENDING": "待处理", "ASSIGNED": "已派单", "ARRIVED": "已到岗",
"PAUSED": "已暂停", "COMPLETED": "已完成", "CANCELLED": "已取消",
}
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
from app.utils.logger import logger
from app.constants import ALARM_TYPE_NAMES, CLEANING_TYPE_NAMES, WEEKDAY_NAMES
def _format_resp_time(minutes: float) -> str:
"""格式化响应时长"""
def _format_duration(minutes: float) -> str:
if minutes <= 0:
return "暂无数据"
if minutes < 60:
return f"{minutes:.1f}分钟"
return f"{minutes / 60:.1f}小时"
async def generate_daily_report() -> Optional[str]:
"""生成昨日工单日报 Markdown 内容"""
def _format_age(minutes: int) -> str:
"""把分钟数格式化为人类友好的时长"""
if minutes < 60:
return f"{minutes}分钟"
hours = minutes / 60
if hours < 24:
return f"{hours:.1f}小时"
days = hours / 24
return f"{days:.1f}"
def _format_ratio(numerator: int, denominator: int) -> str:
if denominator <= 0:
return "0%"
return f"{(numerator / denominator) * 100:.1f}%"
def _format_change(current: int, previous: int) -> str:
if previous <= 0:
return "前日无工单"
change_pct = (current - previous) / previous * 100
if change_pct > 0:
return f"前日{previous}条,↑{change_pct:.1f}%"
if change_pct < 0:
return f"前日{previous}条,↓{abs(change_pct):.1f}%"
return f"前日{previous}条,持平"
def _top_summary(counter: Counter, mapping: Optional[Dict[str, str]] = None, top_n: int = 3, max_len: int = 0) -> str:
"""汇总 Counter 前 N 名。max_len > 0 时截断每个名称。"""
if not counter:
return "暂无数据"
parts = []
for key, count in counter.most_common(top_n):
name = mapping.get(key, key) if mapping else key
if max_len and len(name) > max_len:
name = name[:max_len] + ".."
parts.append(f"{name} {count}")
return "".join(parts)
def _location_name(order) -> str:
if getattr(order, "location", None):
return order.location
if getattr(order, "area_id", None):
return f"区域{order.area_id}"
return "未标注区域"
def _safe_avg(values: List[float]) -> float:
return sum(values) / len(values) if values else 0.0
async def _build_daily_report_data() -> Optional[Dict]:
from app.utils.timezone import beijing_now
now = beijing_now()
@@ -54,11 +87,12 @@ async def generate_daily_report() -> Optional[str]:
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
# 查询 IoT 工单
try:
from app.models_iot import (
get_iot_session, IotOpsOrder,
IotOpsOrderSecurityExt, IotOpsOrderCleanExt,
get_iot_session,
IotOpsOrder,
IotOpsOrderSecurityExt,
IotOpsOrderCleanExt,
)
except Exception as e:
logger.error(f"IoT数据库不可用日报生成失败: {e}")
@@ -66,7 +100,6 @@ async def generate_daily_report() -> Optional[str]:
db = get_iot_session()
try:
# 昨日工单
yesterday_orders = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= yesterday_start,
IotOpsOrder.create_time < today_start,
@@ -74,38 +107,58 @@ async def generate_daily_report() -> Optional[str]:
).all()
yesterday_total = len(yesterday_orders)
# 前日工单(用于环比)
prev_total = db.query(IotOpsOrder).filter(
IotOpsOrder.create_time >= day_before_start,
IotOpsOrder.create_time < yesterday_start,
IotOpsOrder.deleted == 0,
).count()
if yesterday_total == 0:
return (
f"**物业工单日报 — {date_str}{weekday}**\n\n"
f">昨日工单总计:**0** 条\n"
f">系统运行正常,无工单"
)
current_open_orders = db.query(IotOpsOrder).filter(
IotOpsOrder.deleted == 0,
IotOpsOrder.status.notin_(("COMPLETED", "CANCELLED")),
).all()
if not yesterday_orders and not current_open_orders:
return {
"date_str": date_str,
"weekday": weekday,
"empty": True,
"title": f"物业工单日报 {date_str}{weekday}",
"subtitle": "昨日系统运行平稳",
"overview": "昨日无新增工单,当前无待处理工单。",
"summary": {
"yesterday_total": 0,
"completed_count": 0,
"backlog_count": 0,
"avg_resp": "暂无数据",
"avg_close": "暂无数据",
"false_alarm_rate": "0%",
},
"risk_lines": [
"安保高发:暂无数据",
"高发区域:暂无数据",
"高发摄像头:暂无数据",
"超时未处理0 条",
],
"change_str": "前日无工单",
"top_overdue": [],
}
# 收集 order_ids
order_ids = [o.id for o in yesterday_orders]
# 批量查安保扩展
sec_ext_map = {}
sec_exts = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
IotOpsOrderSecurityExt.deleted == 0,
).all()
sec_ext_map = {e.ops_order_id: e for e in sec_exts}
# 批量查保洁扩展
clean_ext_map = {}
clean_exts = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
IotOpsOrderCleanExt.deleted == 0,
).all()
clean_ext_map = {e.ops_order_id: e for e in clean_exts}
if order_ids:
sec_exts = db.query(IotOpsOrderSecurityExt).filter(
IotOpsOrderSecurityExt.ops_order_id.in_(order_ids),
IotOpsOrderSecurityExt.deleted == 0,
).all()
sec_ext_map = {e.ops_order_id: e for e in sec_exts}
clean_exts = db.query(IotOpsOrderCleanExt).filter(
IotOpsOrderCleanExt.ops_order_id.in_(order_ids),
IotOpsOrderCleanExt.deleted == 0,
).all()
clean_ext_map = {e.ops_order_id: e for e in clean_exts}
except Exception as e:
logger.error(f"查询IoT工单失败: {e}", exc_info=True)
@@ -113,47 +166,59 @@ async def generate_daily_report() -> Optional[str]:
finally:
db.close()
# ---- 统计 ----
type_count = {"SECURITY": 0, "CLEAN": 0}
status_count = Counter()
alarm_type_count = Counter()
camera_code_counter = Counter() # 先按 camera_code 统计
area_counter = Counter()
camera_code_counter = Counter()
false_alarm_count = 0
response_times: List[float] = []
close_times: List[float] = []
cleaning_type_count = Counter()
for o in yesterday_orders:
ot = o.order_type or "SECURITY"
type_count[ot] = type_count.get(ot, 0) + 1
status_count[o.status or "PENDING"] += 1
for order in yesterday_orders:
order_type = order.order_type or "SECURITY"
type_count[order_type] = type_count.get(order_type, 0) + 1
status_count[order.status or "PENDING"] += 1
area_counter[_location_name(order)] += 1
# 安保统计
sec_ext = sec_ext_map.get(o.id)
sec_ext = sec_ext_map.get(order.id)
if sec_ext:
if sec_ext.alarm_type:
alarm_type_count[sec_ext.alarm_type] += 1
# 统一用 camera_id即 camera_code做 key后续批量解析名称
cam_key = sec_ext.camera_id or sec_ext.camera_name
if cam_key:
camera_code_counter[cam_key] += 1
if sec_ext.false_alarm == 1:
false_alarm_count += 1
# 响应时长dispatched → confirmed
if sec_ext.dispatched_time and sec_ext.confirmed_time:
delta = (sec_ext.confirmed_time - sec_ext.dispatched_time).total_seconds() / 60.0
if 0 <= delta <= 360:
response_times.append(delta)
if sec_ext.dispatched_time and sec_ext.completed_time:
delta = (sec_ext.completed_time - sec_ext.dispatched_time).total_seconds() / 60.0
if 0 <= delta <= 24 * 60:
close_times.append(delta)
# 保洁统计
clean_ext = clean_ext_map.get(o.id)
if clean_ext and clean_ext.cleaning_type:
cleaning_type_count[clean_ext.cleaning_type] += 1
clean_ext = clean_ext_map.get(order.id)
if clean_ext:
if clean_ext.cleaning_type:
cleaning_type_count[clean_ext.cleaning_type] += 1
dispatch_time = clean_ext.first_dispatched_time or clean_ext.dispatched_time
if dispatch_time and clean_ext.arrived_time:
delta = (clean_ext.arrived_time - dispatch_time).total_seconds() / 60.0
if 0 <= delta <= 360:
response_times.append(delta)
if dispatch_time and clean_ext.completed_time:
delta = (clean_ext.completed_time - dispatch_time).total_seconds() / 60.0
if 0 <= delta <= 24 * 60:
close_times.append(delta)
# 批量解析摄像头名称camera_code → 真实名称)
camera_counter = Counter()
if camera_code_counter:
try:
from app.services.camera_name_service import get_camera_name_service
cam_svc = get_camera_name_service()
name_map = await cam_svc.get_display_names_batch(list(camera_code_counter.keys()))
for code, count in camera_code_counter.items():
@@ -163,103 +228,250 @@ async def generate_daily_report() -> Optional[str]:
logger.warning(f"摄像头名称解析失败,降级使用代码: {e}")
camera_counter = camera_code_counter
# 环比
if prev_total > 0:
change_pct = (yesterday_total - prev_total) / prev_total * 100
if change_pct > 0:
change_str = f"前日{prev_total}条,↑{change_pct:.1f}%"
elif change_pct < 0:
change_str = f"前日{prev_total}条,↓{abs(change_pct):.1f}%"
else:
change_str = f"前日{prev_total}条,持平"
else:
change_str = "前日无工单"
# 平均响应时长
resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据"
# 待处理数量
pending_count = sum(
1 for o in yesterday_orders if o.status in ("PENDING", "ASSIGNED")
)
backlog_count = len(current_open_orders)
carry_over_count = sum(1 for order in current_open_orders if order.create_time and order.create_time < today_start)
completed_count = status_count.get("COMPLETED", 0)
cancelled_count = status_count.get("CANCELLED", 0)
avg_resp = _format_duration(_safe_avg(response_times))
avg_close = _format_duration(_safe_avg(close_times))
false_alarm_rate = _format_ratio(false_alarm_count, type_count.get("SECURITY", 0))
# ==================== 组装 Markdown ====================
lines = [
f"**物业工单日报 — {date_str}{weekday}**",
"",
f">昨日工单总计:<font color=\"warning\">{yesterday_total}</font> 条({change_str}",
overdue_orders = sorted(
(
order for order in current_open_orders
if order.create_time and order.create_time < today_start
),
key=lambda item: item.create_time,
)
top_overdue = []
for order in overdue_orders[:3]:
age_minutes = max(int((now - order.create_time).total_seconds() / 60), 0)
order_type_label = "安保" if order.order_type == "SECURITY" else "保洁"
top_overdue.append(
f"{_location_name(order)}{order_type_label},已挂起{_format_age(age_minutes)}"
)
report = {
"date_str": date_str,
"weekday": weekday,
"empty": False,
"title": f"物业工单日报 {date_str}{weekday}",
"subtitle": "昨日运营概览",
"overview": f"昨日新增 {yesterday_total}|完成 {completed_count}|当前待处理 {backlog_count}",
"change_str": _format_change(yesterday_total, prev_total),
"summary": {
"yesterday_total": yesterday_total,
"completed_count": completed_count,
"backlog_count": backlog_count,
"security_count": type_count.get("SECURITY", 0),
"clean_count": type_count.get("CLEAN", 0),
"avg_resp": avg_resp,
"avg_close": avg_close,
"false_alarm_rate": false_alarm_rate,
"carry_over_count": carry_over_count,
"cancelled_count": status_count.get("CANCELLED", 0),
},
"risk_lines": [
f"安保高发:{_top_summary(alarm_type_count, ALARM_TYPE_NAMES)}",
f"高发区域:{_top_summary(area_counter)}",
f"高发摄像头:{_top_summary(camera_counter)}",
f"超时未处理:{carry_over_count}",
],
"tops": {
"alarm_types": _top_summary(alarm_type_count, ALARM_TYPE_NAMES),
"areas": _top_summary(area_counter),
"cameras": _top_summary(camera_counter),
"cleaning_types": _top_summary(cleaning_type_count, CLEANING_TYPE_NAMES),
# 卡片专用:截断名称,只取 top1 防溢出
"cameras_short": _top_summary(camera_counter, top_n=1, max_len=6),
"alarm_types_short": _top_summary(alarm_type_count, ALARM_TYPE_NAMES, top_n=2),
},
"top_overdue": top_overdue,
}
return report
def _build_template_card(report: Dict) -> Dict:
"""构建 text_notice 模板卡片(群机器人 Webhook 专用)"""
s = report["summary"]
tops = report["tops"]
click_url = settings.wechat.service_base_url or "https://work.weixin.qq.com"
# 大号数字
emphasis_desc = f"昨日新增({report['change_str']}"
# 待处理文案
if s["backlog_count"] == 0:
pending_val = "0 ✅ 全部清零"
elif s["carry_over_count"] > 0:
pending_val = f"{s['backlog_count']}(遗留{s['carry_over_count']}"
else:
pending_val = str(s["backlog_count"])
# 键值对(最多 6 条,用短名称防截断)
kv_list = [
{"keyname": "安保 / 保洁", "value": f"{s['security_count']} / {s['clean_count']}"},
{"keyname": "已完成 / 待处理", "value": f"{s['completed_count']} / {pending_val}"},
{"keyname": "首响 / 完结", "value": f"{s['avg_resp']} / {s['avg_close']}"},
]
# 按工单类型
sec_count = type_count.get("SECURITY", 0)
clean_count = type_count.get("CLEAN", 0)
if sec_count and clean_count:
lines.append(f">安保工单:{sec_count}条 | 保洁工单:{clean_count}")
elif sec_count:
lines.append(f">安保工单:{sec_count}")
elif clean_count:
lines.append(f">保洁工单:{clean_count}")
if tops["alarm_types_short"] != "暂无数据":
kv_list.append({"keyname": "告警热点", "value": tops["alarm_types_short"]})
lines.append(f">待处理:<font color=\"warning\">{pending_count}</font> 条 | "
f"已完成:{completed_count}条 | 已取消:{cancelled_count}条 | 误报:{false_alarm_count}")
lines.append(f">平均响应:<font color=\"info\">{resp_str}</font>")
if tops["cameras_short"] != "暂无数据":
kv_list.append({"keyname": "高发设备", "value": tops["cameras_short"]})
# 安保告警类型分布
if alarm_type_count:
if s["false_alarm_rate"] != "0%":
kv_list.append({"keyname": "误报率", "value": s["false_alarm_rate"]})
# 副标题(一句话总结)
if s["backlog_count"] == 0:
sub_title = "昨日工单全部清零,运营状态良好"
elif s["carry_over_count"] > 0:
sub_title = f"{s['carry_over_count']}条工单超时未处理,请及时跟进"
elif s["backlog_count"] > 0:
sub_title = f"当前 {s['backlog_count']} 条待处理"
else:
sub_title = f"当前 {s['backlog_count']} 条待处理"
card = {
"card_type": "text_notice",
"source": {
"desc": "VSP物业平台",
"desc_color": 0,
},
"main_title": {
"title": report["title"],
},
"emphasis_content": {
"title": str(s["yesterday_total"]),
"desc": emphasis_desc,
},
"sub_title_text": sub_title,
"horizontal_content_list": kv_list,
"jump_list": [
{
"type": 2,
"title": "点击查看详情",
"appid": "wxb3dc42bb3017c3f2",
"pagepath": "/pages-ops/work-order/index",
},
],
"card_action": {
"type": 2,
"appid": "wxb3dc42bb3017c3f2",
"pagepath": "/pages-ops/work-order/index",
},
}
return card
def _build_markdown(report: Dict) -> str:
"""构建单条企微 markdown 日报(降级方案)"""
if report.get("empty"):
return (
f"**{report['title']}**\n\n"
f">昨日系统运行平稳,无新增工单\n"
f">当前无待处理事项"
)
s = report["summary"]
backlog = s["backlog_count"]
lines = [
f"**{report['title']}**",
"",
f">昨日新增 <font color=\"warning\">{s['yesterday_total']}</font> 条({report['change_str']}",
f">安保 {s['security_count']}|保洁 {s['clean_count']}"
f"已完成 {s['completed_count']}|误报 {s['false_alarm_rate']}",
]
if backlog == 0:
lines.append(f">待处理 <font color=\"info\">0</font> 条,全部清零")
else:
lines.append(
f">待处理 <font color=\"warning\">{backlog}</font> 条"
f"(其中遗留 {s['carry_over_count']}"
)
lines.append("")
lines.append(
f">响应效率:首响 <font color=\"info\">{s['avg_resp']}</font>"
f"|完结 <font color=\"info\">{s['avg_close']}</font>"
)
tops = report["tops"]
risk_items = []
if tops["alarm_types"] != "暂无数据":
risk_items.append(f">告警类型|{tops['alarm_types']}")
if tops["cleaning_types"] != "暂无数据":
risk_items.append(f">保洁类型|{tops['cleaning_types']}")
if tops["areas"] != "暂无数据":
risk_items.append(f">高发区域|{tops['areas']}")
if tops["cameras"] != "暂无数据":
risk_items.append(f">高发设备|{tops['cameras']}")
if risk_items:
lines.append("")
lines.append("**安保告警类型分布**")
for alarm_type, count in alarm_type_count.most_common():
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
lines.append(f">{type_name}{count}")
lines.append("**热点分布**")
lines.extend(risk_items)
# 保洁类型分布
if cleaning_type_count:
if report["top_overdue"]:
lines.append("")
lines.append("**保洁类型分布**")
for ct, count in cleaning_type_count.most_common():
ct_name = CLEANING_TYPE_NAMES.get(ct, ct)
lines.append(f">{ct_name}{count}")
# 摄像头 Top5
top5_cameras = camera_counter.most_common(5)
if top5_cameras:
lines.append("")
lines.append("**告警摄像头 Top5**")
for i, (cam_name, count) in enumerate(top5_cameras, 1):
lines.append(f">{i}. {cam_name}{count}")
lines.append(f"**需关注({s['carry_over_count']}条超时)**")
for item in report["top_overdue"]:
lines.append(f">{item}")
return "\n".join(lines)
async def generate_daily_report() -> Optional[str]:
"""生成日报 markdown 内容(供预览和降级发送)"""
report = await _build_daily_report_data()
if not report:
return None
return _build_markdown(report)
async def _send_daily_report():
"""生成并发送日报"""
"""发送日报:优先用群机器人 Webhook 模板卡片,降级为 markdown"""
from app.services.wechat_service import get_wechat_service
chat_id = settings.wechat.group_chat_id
if not chat_id:
logger.warning("日报发送跳过:未配置 group_chat_id")
robot_key = settings.wechat.group_robot_key
if not chat_id and not robot_key:
logger.warning("日报发送跳过:未配置 group_chat_id 或 group_robot_key")
return
try:
content = await generate_daily_report()
if not content:
report = await _build_daily_report_data()
if not report:
logger.info("日报生成内容为空,跳过发送")
return
wechat_svc = get_wechat_service()
ok = await wechat_svc.send_group_markdown(chat_id, content)
if ok:
logger.info("日报已发送到企微群聊")
else:
logger.error("日报发送失败")
# 优先:群机器人 Webhook 发送 text_notice 模板卡片
if robot_key:
card = _build_template_card(report)
ok = await wechat_svc.send_webhook_template_card(robot_key, card)
if ok:
logger.info("日报模板卡片已通过 Webhook 发送")
return
# 降级:应用群聊发送 markdown
if chat_id:
content = _build_markdown(report)
ok = await wechat_svc.send_group_markdown(chat_id, content)
if ok:
logger.info("日报已通过 markdown 发送到群聊")
else:
logger.error("日报发送失败")
except Exception:
logger.exception("日报生成或发送异常")
def _seconds_until(hour: int, minute: int) -> float:
"""计算距离下一个 HH:MM 的秒数"""
from app.utils.timezone import beijing_now
now = beijing_now()
@@ -270,7 +482,6 @@ def _seconds_until(hour: int, minute: int) -> float:
async def start_daily_report_scheduler():
"""日报定时调度主循环"""
hour = settings.daily_report.send_hour
minute = settings.daily_report.send_minute
logger.info(f"日报定时任务已启动,每日 {hour:02d}:{minute:02d} 发送")
@@ -281,7 +492,6 @@ async def start_daily_report_scheduler():
logger.debug(f"日报下次发送倒计时 {wait:.0f}")
await asyncio.sleep(wait)
await _send_daily_report()
# 发送完等 61 秒,避免同一分钟内重复触发
await asyncio.sleep(61)
except asyncio.CancelledError:
logger.info("日报定时任务已停止")

View File

@@ -21,7 +21,7 @@ from app.models import (
)
from app.config import settings
from app.services.vlm_service import get_vlm_service
from app.services.wechat_service import ALARM_TYPE_NAMES
from app.constants import ALARM_TYPE_NAMES
from app.services.camera_name_service import get_camera_name_service
from app.services.work_order_client import get_work_order_client
from app.utils.logger import logger

View File

@@ -11,19 +11,12 @@ from typing import Optional, Tuple
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from app.constants import ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES, ALARM_STATUS_NAMES
from app.models import AlarmEvent, get_session
from app.utils.logger import logger
from app.utils.timezone import beijing_now
TYPE_NAMES = {"leave_post": "人员离岗", "intrusion": "周界入侵"}
LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"}
STATUS_NAMES = {
"NEW": "待处理", "CONFIRMED": "已确认",
"FALSE": "误报", "CLOSED": "已关闭",
}
def generate_alarm_report(
time_range: str = "week",
) -> Optional[Tuple[str, io.BytesIO]]:
@@ -91,11 +84,11 @@ def generate_alarm_report(
for row, a in enumerate(alarms, 2):
values = [
a.alarm_id,
TYPE_NAMES.get(a.alarm_type, a.alarm_type),
ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type),
a.device_id,
a.scene_id or "",
LEVEL_NAMES.get(a.alarm_level, str(a.alarm_level or "")),
STATUS_NAMES.get(a.alarm_status, a.alarm_status or ""),
ALARM_LEVEL_NAMES.get(a.alarm_level, str(a.alarm_level or "")),
ALARM_STATUS_NAMES.get(a.alarm_status, a.alarm_status or ""),
a.handle_status or "",
f"{a.confidence_score:.2f}" if a.confidence_score else "",
a.event_time.strftime("%Y-%m-%d %H:%M:%S") if a.event_time else "",
@@ -129,7 +122,7 @@ def generate_alarm_report(
ws2.cell(row=2, column=1, value="类型")
ws2.cell(row=2, column=2, value="数量")
for i, (t, c) in enumerate(type_count.items(), 3):
ws2.cell(row=i, column=1, value=TYPE_NAMES.get(t, t))
ws2.cell(row=i, column=1, value=ALARM_TYPE_NAMES.get(t, t))
ws2.cell(row=i, column=2, value=c)
# 状态统计
@@ -138,7 +131,7 @@ def generate_alarm_report(
ws2.cell(row=offset + 1, column=1, value="状态")
ws2.cell(row=offset + 1, column=2, value="数量")
for i, (s, c) in enumerate(status_count.items(), offset + 2):
ws2.cell(row=i, column=1, value=STATUS_NAMES.get(s, s))
ws2.cell(row=i, column=1, value=ALARM_STATUS_NAMES.get(s, s))
ws2.cell(row=i, column=2, value=c)
# 级别统计
@@ -147,7 +140,7 @@ def generate_alarm_report(
ws2.cell(row=offset2 + 1, column=1, value="级别")
ws2.cell(row=offset2 + 1, column=2, value="数量")
for i, (lv, c) in enumerate(level_count.items(), offset2 + 2):
ws2.cell(row=i, column=1, value=LEVEL_NAMES.get(lv, str(lv)))
ws2.cell(row=i, column=1, value=ALARM_LEVEL_NAMES.get(lv, str(lv)))
ws2.cell(row=i, column=2, value=c)
# 输出到内存

View File

@@ -13,12 +13,13 @@ from openai import AsyncOpenAI
from app.utils.logger import logger
# 算法类型中文映射
ALARM_TYPE_NAMES = {
# VLM 提示词专用名称,与 app.constants.ALARM_TYPE_NAMES 故意不同
VLM_TYPE_NAMES = {
"leave_post": "离岗",
"intrusion": "周界入侵",
"illegal_parking": "车辆违停",
"vehicle_congestion": "车辆拥堵",
"non_motor_vehicle_parking": "非机动车违停",
}
# 算法类型 → VLM Prompt 模板
@@ -58,6 +59,20 @@ description要求≤15字直接说结论注明大致车辆数。
告警成立示例:"约5辆车拥堵在路口"
误报示例:"车辆正常通行无拥堵"
仅输出JSON{{"confirmed":true,"description":"..."}}""",
"non_motor_vehicle_parking": """你是安防监控AI复核员。算法类型非机动车违停检测监控区域{roi_name}
任务:判断图中是否有非机动车(自行车、电动车、摩托车等)违规停放在禁停区域。
分析要点:
1. 是否存在非机动车(自行车、电动车、共享单车等)
2. 非机动车是否处于静止停放状态(而非骑行经过)
3. 是否在禁停区域/消防通道内
4. 停放是否造成通道阻塞
- confirmed=true有非机动车违停告警成立
- confirmed=false无非机动车违停误报
description要求≤15字直接说结论。
告警成立示例:"电动车违停在消防通道"
误报示例:"该区域无非机动车违停"
仅输出JSON{{"confirmed":true,"description":"..."}}""",
}
# 通用降级 prompt未知算法类型时使用
@@ -137,7 +152,7 @@ class VLMService:
# 选择 prompt 模板
template = VLM_PROMPTS.get(alarm_type, DEFAULT_PROMPT)
alarm_type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
alarm_type_name = VLM_TYPE_NAMES.get(alarm_type, alarm_type)
prompt = template.format(
camera_name=camera_name or "未知位置",
roi_name=roi_name or "监控区域",
@@ -172,13 +187,16 @@ class VLMService:
content = content.strip()
result = json.loads(content)
# 兼容不同 prompt 返回格式is_real/reason vs confirmed/description
confirmed = result.get("confirmed") if "confirmed" in result else result.get("is_real", True)
description = result.get("description") or result.get("reason", "")
logger.info(
f"VLM 复核完成: confirmed={result.get('confirmed')}, "
f"desc={result.get('description', '')[:30]}"
f"VLM 复核完成: confirmed={confirmed}, "
f"desc={description[:30]}"
)
return {
"confirmed": result.get("confirmed", True),
"description": result.get("description", ""),
"confirmed": confirmed,
"description": description,
"skipped": False,
}

View File

@@ -13,19 +13,9 @@ import httpx
import time
from typing import Optional, List, Dict
from app.constants import ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES
from app.utils.logger import logger
# 告警类型中文映射(全局复用)
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗",
"intrusion": "周界入侵",
"illegal_parking": "车辆违停",
"vehicle_congestion": "车辆拥堵",
}
# 告警级别映射
ALARM_LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"}
class WeChatService:
"""企微通知服务(单例)"""
@@ -588,6 +578,58 @@ class WeChatService:
logger.error(f"发送群聊markdown异常: {e}")
return False
async def send_group_textcard(self, chat_id: str, title: str, description: str, url: str = "") -> bool:
"""发送 textcard 消息到群聊,适合单条摘要展示。"""
if not self._enabled:
return False
try:
access_token = await self._get_access_token()
msg = {
"chatid": chat_id,
"msgtype": "textcard",
"textcard": {
"title": title,
"description": description,
"url": url or "https://work.weixin.qq.com",
"btntxt": "查看详情",
},
}
api_url = f"https://qyapi.weixin.qq.com/cgi-bin/appchat/send?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(api_url, json=msg)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"群聊文本卡片发送失败: {data}")
return False
logger.info(f"群聊文本卡片已发送: chatid={chat_id}")
return True
except Exception as e:
logger.error(f"发送群聊文本卡片异常: {e}")
return False
async def send_webhook_template_card(self, webhook_key: str, card: Dict) -> bool:
"""通过群机器人 Webhook 发送 template_cardtext_notice / news_notice"""
if not webhook_key:
logger.warning("Webhook key 为空,跳过发送")
return False
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={webhook_key}"
payload = {
"msgtype": "template_card",
"template_card": card,
}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=payload)
data = resp.json()
if data.get("errcode") != 0:
logger.error(f"Webhook template_card 发送失败: {data}")
return False
logger.info("Webhook template_card 已发送")
return True
except Exception as e:
logger.error(f"Webhook template_card 异常: {e}")
return False
async def send_group_image(self, chat_id: str, media_id: str) -> bool:
"""发送图片消息到群聊"""
if not self._enabled:

175
test_work_order_real.py Normal file
View File

@@ -0,0 +1,175 @@
"""
工单创建测试 — 使用真实告警数据 + 真实截图
1. 从 MySQL 查询最近一条带截图的告警
2. 用该告警数据创建工单到 192.168.0.104:48080
"""
import asyncio
import hashlib
import json
import time
import uuid
import httpx
import pymysql
from app.constants import ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES
# ===== 数据库配置 =====
DB_CONFIG = {
"host": "124.221.55.225",
"port": 3306,
"user": "vsp_user",
"password": "VspPass2024!",
"database": "aiot_alarm",
"charset": "utf8mb4",
}
# ===== 工单 API 配置 =====
BASE_URL = "http://192.168.0.104:48080"
APP_ID = "alarm-system"
APP_SECRET = "tQ3v5q1z2ZLu7hrU1yseaHwg1wJUcmF1"
TENANT_ID = "1"
def query_recent_alarm_with_snapshot():
"""从数据库查询最近一条带截图的告警"""
conn = pymysql.connect(**DB_CONFIG)
try:
with conn.cursor(pymysql.cursors.DictCursor) as cur:
sql = """
SELECT alarm_id, alarm_type, device_id, scene_id,
event_time, alarm_level, confidence_score,
snapshot_url, edge_node_id, area_id
FROM alarm_event
WHERE snapshot_url IS NOT NULL
AND snapshot_url != ''
ORDER BY event_time DESC
LIMIT 5
"""
cur.execute(sql)
rows = cur.fetchall()
return rows
finally:
conn.close()
def build_sign(query_str: str, body_json: str, nonce: str, timestamp: str) -> str:
"""构建 SHA256 签名"""
header_str = f"appId={APP_ID}&nonce={nonce}&timestamp={timestamp}"
sign_string = f"{query_str}{body_json}{header_str}{APP_SECRET}"
return hashlib.sha256(sign_string.encode("utf-8")).hexdigest()
def build_headers(body_json: str) -> dict:
"""构造完整请求头"""
nonce = uuid.uuid4().hex[:16]
timestamp = str(int(time.time() * 1000))
sign = build_sign("", body_json, nonce, timestamp)
return {
"Content-Type": "application/json",
"tenant-id": TENANT_ID,
"appId": APP_ID,
"timestamp": timestamp,
"nonce": nonce,
"sign": sign,
}
async def create_order(alarm: dict) -> str:
"""用真实告警数据创建工单"""
alarm_type = alarm["alarm_type"]
alarm_level = alarm.get("alarm_level", 2)
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般")
device_id = alarm["device_id"]
event_time = str(alarm["event_time"])[:19] if alarm["event_time"] else ""
body = {
"title": f"{level_name}{type_name}告警 - {device_id}",
"areaId": alarm.get("area_id") or 1317,
"alarmId": alarm["alarm_id"],
"alarmType": alarm_type,
"description": f"设备 {device_id}{event_time} 检测到{type_name},置信度 {alarm.get('confidence_score', 0):.2f}",
"priority": alarm_level,
"triggerSource": "自动上报",
"cameraId": device_id,
"imageUrl": alarm["snapshot_url"],
}
body_json = json.dumps(body, ensure_ascii=False, separators=(",", ":"))
headers = build_headers(body_json)
url = f"{BASE_URL}/open-api/ops/security/order/create"
print(f"\n{'='*60}")
print(f"创建工单 — 使用真实告警数据")
print(f"{'='*60}")
print(f" 告警ID: {alarm['alarm_id']}")
print(f" 告警类型: {type_name}")
print(f" 设备ID: {device_id}")
print(f" 事件时间: {event_time}")
print(f" 置信度: {alarm.get('confidence_score', 'N/A')}")
print(f" 截图URL: {alarm['snapshot_url'][:80]}...")
print(f" 区域ID: {body['areaId']}")
print(f" 工单标题: {body['title']}")
print()
print(f" 请求URL: {url}")
print(f" 请求Body: {body_json[:200]}...")
print()
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(url, content=body_json, headers=headers)
print(f" HTTP Status: {resp.status_code}")
print(f" Response Body: {resp.text[:500]}")
if resp.status_code == 200:
data = resp.json()
if data.get("code") == 0:
order_id = str(data.get("data", ""))
print(f"\n ✅ 工单创建成功! orderId = {order_id}")
return order_id
else:
print(f"\n ❌ API 返回错误: code={data.get('code')}, msg={data.get('msg')}")
else:
print(f"\n ❌ HTTP 请求失败: {resp.status_code}")
return ""
async def main():
print(f"工单系统地址: {BASE_URL}")
print(f"数据库: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}")
print()
# 1. 查询最近带截图的告警
print("正在查询最近带截图的告警...")
alarms = query_recent_alarm_with_snapshot()
if not alarms:
print("❌ 未找到带截图的告警,无法测试")
return
print(f"\n找到 {len(alarms)} 条带截图的告警:")
print(f"{''*80}")
for i, a in enumerate(alarms):
type_name = ALARM_TYPE_NAMES.get(a["alarm_type"], a["alarm_type"])
print(f" [{i+1}] {a['alarm_id'][:40]}...")
print(f" 类型={type_name} 设备={a['device_id']} 时间={str(a['event_time'])[:19]}")
print(f" 截图={a['snapshot_url'][:70]}...")
print()
# 2. 用第一条创建工单
alarm = alarms[0]
print(f"将使用第 1 条告警创建工单...")
order_id = await create_order(alarm)
if order_id:
print(f"\n{'='*60}")
print(f"测试完成 — 工单 {order_id} 已创建")
print(f"请在 AIoT 平台确认工单内容和截图是否正确")
print(f"{'='*60}")
if __name__ == "__main__":
asyncio.run(main())