功能:告警级别支持前端配置下发 + 级别体系统一为 0-3

- 四种算法均支持通过 params.alarm_level 覆盖默认告警级别
- 级别体系统一:0紧急/1重要/2普通/3轻微
- 车辆拥堵默认阈值调整为 5 辆

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-18 16:39:58 +08:00
parent 3d91aa1a67
commit 9c39913a55
2 changed files with 30 additions and 11 deletions

View File

@@ -43,6 +43,9 @@ class LeavePostAlgorithm:
STATE_ALARMED = "ALARMED" # 已告警(等待回岗)
STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 2 # 普通
def __init__(
self,
confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长)
@@ -52,6 +55,7 @@ class LeavePostAlgorithm:
cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔)
working_hours: Optional[List[Dict]] = None,
target_class: Optional[str] = "person",
alarm_level: Optional[int] = None,
# 兼容旧参数名(向后兼容)
confirm_leave_sec: Optional[int] = None,
):
@@ -65,6 +69,7 @@ class LeavePostAlgorithm:
# 工作时间和目标类别
self.working_hours = working_hours or []
self.target_class = target_class
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_INIT
@@ -295,6 +300,7 @@ class LeavePostAlgorithm:
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "leave_post",
"alarm_level": self._alarm_level,
"message": "人员离岗告警",
"first_frame_time": self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
@@ -392,8 +398,8 @@ class IntrusionAlgorithm:
STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失)
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中
# 告警级别常量
ALARM_LEVEL_INTRUSION = 3
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 1 # 重要
def __init__(
self,
@@ -402,6 +408,7 @@ class IntrusionAlgorithm:
confirm_intrusion_seconds: Optional[int] = None, # 入侵确认时间默认5秒
confirm_clear_seconds: Optional[int] = None, # 消失确认时间默认180秒
target_class: Optional[str] = None,
alarm_level: Optional[int] = None,
):
self.cooldown_seconds = cooldown_seconds
@@ -413,6 +420,7 @@ class IntrusionAlgorithm:
self.confirm_seconds = confirm_seconds
self.target_class = target_class
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_IDLE
@@ -516,7 +524,7 @@ class IntrusionAlgorithm:
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": self.ALARM_LEVEL_INTRUSION,
"alarm_level": self._alarm_level,
"message": "检测到周界入侵",
"first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
@@ -729,8 +737,8 @@ class IllegalParkingAlgorithm:
STATE_ALARMED = "ALARMED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别常量
ALARM_LEVEL_ILLEGAL_PARKING = 2 # 一般级别
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 1 # 重要
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
@@ -742,12 +750,14 @@ class IllegalParkingAlgorithm:
confirm_clear_sec: int = 30,
cooldown_sec: int = 600,
target_classes: Optional[List[str]] = None,
alarm_level: Optional[int] = None,
):
self.confirm_vehicle_sec = confirm_vehicle_sec
self.parking_countdown_sec = parking_countdown_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"]
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_IDLE
@@ -878,7 +888,7 @@ class IllegalParkingAlgorithm:
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "illegal_parking",
"alarm_level": self.ALARM_LEVEL_ILLEGAL_PARKING,
"alarm_level": self._alarm_level,
"confidence": confidence,
"message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)",
"first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None,
@@ -982,25 +992,27 @@ class VehicleCongestionAlgorithm:
STATE_CONGESTED = "CONGESTED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别
ALARM_LEVEL_CONGESTION = 2 # 一般级别
# 告警级别(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 2 # 普通
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
def __init__(
self,
count_threshold: int = 3,
count_threshold: int = 5,
confirm_congestion_sec: int = 60,
confirm_clear_sec: int = 120,
cooldown_sec: int = 600,
target_classes: Optional[List[str]] = None,
alarm_level: Optional[int] = None,
):
self.count_threshold = count_threshold
self.confirm_congestion_sec = confirm_congestion_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"]
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_NORMAL
@@ -1112,7 +1124,7 @@ class VehicleCongestionAlgorithm:
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "vehicle_congestion",
"alarm_level": self.ALARM_LEVEL_CONGESTION,
"alarm_level": self._alarm_level,
"confidence": confidence,
"message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)",
"first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None,
@@ -1571,6 +1583,9 @@ class AlgorithmManager:
if params:
algo_params.update(params)
# 从 params 中提取告警等级(前端配置下发)
configured_alarm_level = algo_params.get("alarm_level")
if algorithm_type == "leave_post":
roi_working_hours = algo_params.get("working_hours") or self.working_hours
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
@@ -1580,12 +1595,14 @@ class AlgorithmManager:
cooldown_sec=algo_params.get("cooldown_sec", 600),
working_hours=roi_working_hours,
target_class=algo_params.get("target_class", "person"),
alarm_level=configured_alarm_level,
)
elif algorithm_type == "intrusion":
self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm(
cooldown_seconds=algo_params.get("cooldown_seconds", 300),
confirm_seconds=algo_params.get("confirm_seconds", 5),
target_class=algo_params.get("target_class"),
alarm_level=configured_alarm_level,
)
elif algorithm_type == "illegal_parking":
self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm(
@@ -1594,6 +1611,7 @@ class AlgorithmManager:
confirm_clear_sec=algo_params.get("confirm_clear_sec", 30),
cooldown_sec=algo_params.get("cooldown_sec", 600),
target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]),
alarm_level=configured_alarm_level,
)
elif algorithm_type == "vehicle_congestion":
self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm(
@@ -1602,6 +1620,7 @@ class AlgorithmManager:
confirm_clear_sec=algo_params.get("confirm_clear_sec", 120),
cooldown_sec=algo_params.get("cooldown_sec", 600),
target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]),
alarm_level=configured_alarm_level,
)
self._registered_keys.add(cache_key)

View File

@@ -29,7 +29,7 @@ class AlarmInfo:
device_id: str
scene_id: str
event_time: str # ISO8601
alarm_level: int # 1-4
alarm_level: int # 0紧急 1重要 2普通 3轻微
snapshot_b64: Optional[str] = None # Base64 编码的 JPEG 截图
algorithm_code: Optional[str] = None
confidence_score: Optional[float] = None