新增: 非机动车违停检测算法(non_motor_vehicle_parking)+ 修复 illegal_parking 参数不一致

This commit is contained in:
2026-04-09 10:34:55 +08:00
parent 5a0265de52
commit c6d8430867
2 changed files with 328 additions and 1 deletions

View File

@@ -1283,6 +1283,273 @@ class VehicleCongestionAlgorithm(BaseAlgorithm):
return state_info
class NonMotorVehicleParkingAlgorithm(BaseAlgorithm):
"""
非机动车违停检测算法(状态机版本 v1.0
状态机:
IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE
业务流程:
1. 检测到非机动车进入禁停区 → 车辆确认期confirm_vehicle_sec默认10秒ratio>=0.6
2. 确认有车 → 违停倒计时parking_countdown_sec默认180秒/3分钟
3. 倒计时结束仍有车 → 触发告警ALARMED状态
4. 车辆离开 → 消失确认期confirm_clear_sec默认60秒ratio<0.2
5. 确认车辆离开 → 发送resolve事件 → 回到空闲状态
使用滑动窗口10秒抗抖动检测自行车和摩托车。
"""
# 状态定义
STATE_IDLE = "IDLE"
STATE_CONFIRMING_VEHICLE = "CONFIRMING_VEHICLE"
STATE_PARKED_COUNTDOWN = "PARKED_COUNTDOWN"
STATE_ALARMED = "ALARMED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 2 # 普通
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
# 阈值常量(与 IllegalParkingAlgorithm 一致)
RATIO_CONFIRMING_DROP = 0.3
RATIO_CONFIRM_VEHICLE = 0.6
RATIO_PARKED_LEAVE = 0.2
RATIO_ALARMED_CLEAR = 0.15
RATIO_CLEAR_RETURN = 0.5
RATIO_CLEAR_CONFIRM = 0.2
def __init__(
self,
confirm_vehicle_sec: int = 10,
parking_countdown_sec: int = 180,
confirm_clear_sec: int = 60,
cooldown_sec: int = 900,
target_classes: Optional[List[str]] = None,
alarm_level: Optional[int] = None,
):
super().__init__()
self.confirm_vehicle_sec = confirm_vehicle_sec
self.parking_countdown_sec = parking_countdown_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["bicycle", "motorcycle"]
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_IDLE
self.state_start_time: Optional[datetime] = None
# 滑动窗口:存储 (timestamp, has_vehicle: bool)
self._detection_window: deque = deque(maxlen=1000)
# 告警追踪
self._parking_start_time: Optional[datetime] = None
# 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {}
def _check_target_classes(self, detection: Dict) -> bool:
"""检查检测目标是否属于非机动车类别"""
det_class = detection.get("class", "")
return det_class in self.target_classes
def _update_window(self, current_time: datetime, has_vehicle: bool):
"""更新滑动窗口"""
self._detection_window.append((current_time, has_vehicle))
cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC)
while self._detection_window and self._detection_window[0][0] < cutoff:
self._detection_window.popleft()
def _get_window_ratio(self) -> float:
"""获取滑动窗口内的检测命中率"""
if not self._detection_window:
return 0.0
hits = sum(1 for _, has in self._detection_window if has)
return hits / len(self._detection_window)
def _scan_tracks(self, tracks: List[Dict], roi_id: str) -> Tuple[bool, int, List[float], float]:
"""
一次遍历 tracks返回 (has_target, count, latest_bbox, max_confidence)。
过滤 target_classes。
"""
has_target = False
count = 0
latest_bbox: List[float] = []
max_confidence = 0.0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
has_target = True
count += 1
if not latest_bbox:
latest_bbox = det.get("bbox", [])
conf = det.get("confidence", 0.0)
if conf > max_confidence:
max_confidence = conf
return has_target, count, latest_bbox, max_confidence
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
return det.get("bbox", [])
return []
def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float:
"""获取ROI内非机动车的最高置信度"""
max_conf = 0.0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
max_conf = max(max_conf, det.get("confidence", 0.0))
return max_conf
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""处理单帧检测结果"""
current_time = current_time or datetime.now()
alerts = []
# 一次遍历获取所有信息
roi_has_vehicle, vehicle_count, scan_bbox, scan_confidence = self._scan_tracks(tracks, roi_id)
# 更新滑动窗口
self._update_window(current_time, roi_has_vehicle)
# 计算一次比率,后续分支复用
ratio = self._get_window_ratio()
# === 状态机处理 ===
if self.state == self.STATE_IDLE:
if roi_has_vehicle:
self.state = self.STATE_CONFIRMING_VEHICLE
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_VEHICLE (非机动车)")
elif self.state == self.STATE_CONFIRMING_VEHICLE:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio < self.RATIO_CONFIRMING_DROP:
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<{self.RATIO_CONFIRMING_DROP})")
elif elapsed >= self.confirm_vehicle_sec and ratio >= self.RATIO_CONFIRM_VEHICLE:
self._parking_start_time = self.state_start_time
self.state = self.STATE_PARKED_COUNTDOWN
self.state_start_time = current_time
logger.info(f"ROI {roi_id}: CONFIRMING_VEHICLE → PARKED_COUNTDOWN (非机动车, ratio={ratio:.2f})")
elif self.state == self.STATE_PARKED_COUNTDOWN:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio < self.RATIO_PARKED_LEAVE:
self.state = self.STATE_IDLE
self.state_start_time = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (非机动车离开, ratio={ratio:.2f})")
elif elapsed >= self.parking_countdown_sec:
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": scan_bbox,
"alert_type": "non_motor_vehicle_parking",
"alarm_level": self._alarm_level,
"confidence": scan_confidence,
"message": f"检测到非机动车违停(已停留{int(elapsed / 60)}分钟)",
"first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None,
"duration_minutes": elapsed / 60,
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_ALARMED
logger.warning(f"ROI {roi_id}: PARKED_COUNTDOWN → ALARMED (非机动车违停告警触发)")
else:
self.state = self.STATE_IDLE
self.state_start_time = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)")
elif self.state == self.STATE_ALARMED:
if ratio < self.RATIO_ALARMED_CLEAR:
self.state = self.STATE_CONFIRMING_CLEAR
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR (ratio={ratio:.2f}<{self.RATIO_ALARMED_CLEAR})")
elif self.state == self.STATE_CONFIRMING_CLEAR:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio >= self.RATIO_CLEAR_RETURN:
self.state = self.STATE_ALARMED
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (非机动车仍在)")
elif elapsed >= self.confirm_clear_sec and ratio < self.RATIO_CLEAR_CONFIRM:
if self._last_alarm_id and self._parking_start_time:
duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "vehicle_left",
})
logger.info(f"ROI {roi_id}: 非机动车违停告警已解决(车辆离开)")
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._parking_start_time = None
self.alert_cooldowns.clear()
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE")
return alerts
def reset(self):
"""重置算法状态"""
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._parking_start_time = None
self._detection_window.clear()
self.alert_cooldowns.clear()
def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""获取当前状态"""
current_time = current_time or datetime.now()
window_ratio = self._get_window_ratio()
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
"window_ratio": window_ratio,
}
if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time:
state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds()
state_info["alarm_id"] = self._last_alarm_id
return state_info
class AlgorithmManager:
def __init__(self, working_hours: Optional[List[Dict]] = None):
self.algorithms: Dict[str, Dict[str, Any]] = {}
@@ -1318,6 +1585,13 @@ class AlgorithmManager:
"cooldown_sec": 1800, # Bug fix: 与算法构造函数默认值一致1800非600
"target_classes": ["car", "truck", "bus", "motorcycle"],
},
"non_motor_vehicle_parking": {
"confirm_vehicle_sec": 10,
"parking_countdown_sec": 180,
"confirm_clear_sec": 60,
"cooldown_sec": 900,
"target_classes": ["bicycle", "motorcycle"],
},
}
self._pubsub = None
@@ -1537,6 +1811,36 @@ class AlgorithmManager:
dissipation_ratio=algo_params["dissipation_ratio"],
)
logger.info(f"已从Redis加载拥堵算法: {key}")
elif algo_code == "non_motor_vehicle_parking":
configured_alarm_level = params.get("alarm_level")
algo_params = {
"confirm_vehicle_sec": params.get("confirm_vehicle_sec", 10),
"parking_countdown_sec": params.get("parking_countdown_sec", 180),
"confirm_clear_sec": params.get("confirm_clear_sec", 60),
"cooldown_sec": params.get("cooldown_sec", 900),
"target_classes": params.get("target_classes", ["bicycle", "motorcycle"]),
}
if key in self.algorithms.get(roi_id, {}) and "non_motor_vehicle_parking" in self.algorithms[roi_id].get(key, {}):
algo = self.algorithms[roi_id][key]["non_motor_vehicle_parking"]
algo.confirm_vehicle_sec = algo_params["confirm_vehicle_sec"]
algo.parking_countdown_sec = algo_params["parking_countdown_sec"]
algo.confirm_clear_sec = algo_params["confirm_clear_sec"]
algo.cooldown_sec = algo_params["cooldown_sec"]
algo.target_classes = algo_params["target_classes"]
if configured_alarm_level is not None:
algo._alarm_level = configured_alarm_level
logger.info(f"已热更新非机动车违停算法参数: {key}")
else:
self.algorithms[roi_id][key] = {}
self.algorithms[roi_id][key]["non_motor_vehicle_parking"] = NonMotorVehicleParkingAlgorithm(
confirm_vehicle_sec=algo_params["confirm_vehicle_sec"],
parking_countdown_sec=algo_params["parking_countdown_sec"],
confirm_clear_sec=algo_params["confirm_clear_sec"],
cooldown_sec=algo_params["cooldown_sec"],
target_classes=algo_params["target_classes"],
alarm_level=configured_alarm_level,
)
logger.info(f"已从Redis加载非机动车违停算法: {key}")
return True
except Exception as e:
@@ -1661,6 +1965,19 @@ class AlgorithmManager:
logger.info(f"[{roi_id}_{bind_id}] 更新拥堵检测参数")
elif algo_code == "non_motor_vehicle_parking":
existing_algo.confirm_vehicle_sec = params.get("confirm_vehicle_sec", 10)
existing_algo.parking_countdown_sec = params.get("parking_countdown_sec", 180)
existing_algo.confirm_clear_sec = params.get("confirm_clear_sec", 60)
existing_algo.cooldown_sec = params.get("cooldown_sec", 900)
if "target_classes" in params:
existing_algo.target_classes = params["target_classes"]
alarm_level = params.get("alarm_level")
if alarm_level is not None:
existing_algo._alarm_level = alarm_level
logger.info(f"[{roi_id}_{bind_id}] 更新非机动车违停检测参数")
return True
except Exception as e:
@@ -1800,6 +2117,15 @@ class AlgorithmManager:
alarm_level=configured_alarm_level,
dissipation_ratio=algo_params.get("dissipation_ratio", 0.5),
)
elif algorithm_type == "non_motor_vehicle_parking":
self.algorithms[roi_id][key]["non_motor_vehicle_parking"] = NonMotorVehicleParkingAlgorithm(
confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 10),
parking_countdown_sec=algo_params.get("parking_countdown_sec", 180),
confirm_clear_sec=algo_params.get("confirm_clear_sec", 60),
cooldown_sec=algo_params.get("cooldown_sec", 900),
target_classes=algo_params.get("target_classes", ["bicycle", "motorcycle"]),
alarm_level=configured_alarm_level,
)
self._registered_keys.add(cache_key)
@@ -1892,7 +2218,7 @@ class AlgorithmManager:
"state": getattr(algo, "state", "WAITING"),
"alarm_sent": getattr(algo, "alarm_sent", False),
}
elif algo_type in ("illegal_parking", "vehicle_congestion"):
elif algo_type in ("illegal_parking", "vehicle_congestion", "non_motor_vehicle_parking"):
status[f"{algo_type}_{bind_id}"] = algo.get_state()
else:
status[f"{algo_type}_{bind_id}"] = {