fix:修复 ROI 多边形未传递及空 ROI 判断逻辑错误导致的离岗告警失效问题。

根本原因:
1. pipeline.py 中调用 register_algorithm 时未传入 roi_polygon,导致算法内 roi_polygon 为空
2. is_point_in_roi 函数在 roi_polygon 为空或点数 <3 时错误返回 True,使系统误判“有人在岗”
3. 因此即使 ROI 内无人,算法也永远不会进入离岗倒计时

修复措施:
- 在注册算法时正确传递 ROI 多边形坐标
- 修正 is_point_in_roi:当 ROI 无效时应返回 False(无人)
- 确保无检测框时仍能触发状态机超时逻辑
This commit is contained in:
2026-01-22 13:34:04 +08:00
parent 20f295a491
commit 6116f0b982
2 changed files with 36 additions and 46 deletions

View File

@@ -187,9 +187,9 @@ class InferencePipeline:
roi_id,
rule_type,
{
"threshold_sec": roi_config.get("threshold_sec", 360),
"confirm_sec": roi_config.get("confirm_sec", 30),
"return_sec": roi_config.get("return_sec", 5),
"threshold_sec": roi_config.get("threshold_sec", 300),
"confirm_sec": roi_config.get("confirm_sec", 10),
"return_sec": roi_config.get("return_sec", 30),
},
)
@@ -217,22 +217,30 @@ class InferencePipeline:
else:
filtered_detections = detections
roi_detections: Dict[str, List[Dict]] = {}
for detection in filtered_detections:
matched_rois = detection.get("matched_rois", [])
for roi_conf in matched_rois:
roi_id = roi_conf["roi_id"]
rule_type = roi_conf["rule"]
if roi_id not in roi_detections:
roi_detections[roi_id] = []
roi_detections[roi_id].append(detection)
alerts = self.algo_manager.process(
roi_id,
str(camera_id),
rule_type,
[detection],
datetime.now(),
)
for roi_config in roi_configs:
roi_id = roi_config["roi_id"]
rule_type = roi_config["rule"]
roi_dets = roi_detections.get(roi_id, [])
for alert in alerts:
self._handle_alert(camera_id, alert, frame, roi_conf)
alerts = self.algo_manager.process(
roi_id,
str(camera_id),
rule_type,
roi_dets,
datetime.now(),
)
for alert in alerts:
self._handle_alert(camera_id, alert, frame, roi_config)
def _handle_alert(
self,

View File

@@ -23,13 +23,11 @@ class LeavePostAlgorithm:
confirm_sec: int = 10,
return_sec: int = 30,
working_hours: Optional[List[Dict]] = None,
roi_polygon: Optional[List[Tuple[float, float]]] = None,
):
self.threshold_sec = threshold_sec
self.confirm_sec = confirm_sec
self.return_sec = return_sec
self.working_hours = working_hours or []
self.roi_polygon = roi_polygon
self.alert_cooldowns: Dict[str, datetime] = {}
self.cooldown_seconds = 300
@@ -55,23 +53,11 @@ class LeavePostAlgorithm:
return False
def is_point_in_roi(self, x: float, y: float) -> bool:
if not self.roi_polygon or len(self.roi_polygon) < 3:
return True
from shapely.geometry import Point, Polygon
point = Point(x, y)
polygon = Polygon(self.roi_polygon)
return polygon.contains(point)
def check_roi_has_person(self, detections: List[Dict]) -> bool:
for det in detections:
bbox = det.get("bbox", [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if self.is_point_in_roi(center_x, center_y):
return True
def check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def process(
@@ -82,23 +68,26 @@ class LeavePostAlgorithm:
) -> List[Dict]:
current_time = current_time or datetime.now()
roi_has_person = self.check_roi_has_person(tracks)
in_work = self.is_in_working_hours(current_time)
alerts = []
if not in_work:
self.state = self.STATE_NON_WORK_TIME
self.on_duty_start_time = None
self.last_person_seen_time = None
self.on_duty_window.clear()
self.alarm_sent = False
roi_has_person = False
else:
if self.state == self.STATE_NON_WORK_TIME:
self.state = self.STATE_ON_DUTY
self.on_duty_window.clear()
self.alarm_sent = False
roi_has_person = False
for det in tracks:
if self.check_detection_in_roi(det, camera_id):
roi_has_person = True
break
if self.state == self.STATE_ON_DUTY:
self.on_duty_window.append((current_time, roi_has_person))
while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec:
@@ -127,7 +116,7 @@ class LeavePostAlgorithm:
if cooldown_key not in self.alert_cooldowns or (
current_time - self.alert_cooldowns[cooldown_key]
).total_seconds() > self.cooldown_seconds:
bbox = self.get_latest_bbox(tracks)
bbox = self.get_latest_bbox_in_roi(tracks, camera_id)
elapsed_minutes = int(elapsed / 60)
alerts.append({
"track_id": camera_id,
@@ -141,17 +130,10 @@ class LeavePostAlgorithm:
return alerts
def get_latest_bbox(self, tracks: List[Dict]) -> List[float]:
def get_latest_bbox_in_roi(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
bbox = det.get("bbox", [])
if len(bbox) >= 4:
x1, y1, x2, y2 = bbox[:4]
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if self.is_point_in_roi(center_x, center_y):
return bbox
if tracks:
return tracks[0].get("bbox", [])
if self.check_detection_in_roi(det, roi_id):
return det.get("bbox", [])
return []
def reset(self):