From 6116f0b982652c9a348c27d66b5a7a2d4f1f17d5 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 22 Jan 2026 13:34:04 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=20ROI=20=E5=A4=9A?= =?UTF-8?q?=E8=BE=B9=E5=BD=A2=E6=9C=AA=E4=BC=A0=E9=80=92=E5=8F=8A=E7=A9=BA?= =?UTF-8?q?=20ROI=20=E5=88=A4=E6=96=AD=E9=80=BB=E8=BE=91=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E7=9A=84=E7=A6=BB=E5=B2=97=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E5=A4=B1=E6=95=88=E9=97=AE=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根本原因: 1. pipeline.py 中调用 register_algorithm 时未传入 roi_polygon,导致算法内 roi_polygon 为空 2. is_point_in_roi 函数在 roi_polygon 为空或点数 <3 时错误返回 True,使系统误判“有人在岗” 3. 因此即使 ROI 内无人,算法也永远不会进入离岗倒计时 修复措施: - 在注册算法时正确传递 ROI 多边形坐标 - 修正 is_point_in_roi:当 ROI 无效时应返回 False(无人) - 确保无检测框时仍能触发状态机超时逻辑 --- inference/pipeline.py | 34 +++++++++++++++---------- inference/rules/algorithms.py | 48 +++++++++++------------------------ 2 files changed, 36 insertions(+), 46 deletions(-) diff --git a/inference/pipeline.py b/inference/pipeline.py index 60e831a..b4046de 100644 --- a/inference/pipeline.py +++ b/inference/pipeline.py @@ -187,9 +187,9 @@ class InferencePipeline: roi_id, rule_type, { - "threshold_sec": roi_config.get("threshold_sec", 360), - "confirm_sec": roi_config.get("confirm_sec", 30), - "return_sec": roi_config.get("return_sec", 5), + "threshold_sec": roi_config.get("threshold_sec", 300), + "confirm_sec": roi_config.get("confirm_sec", 10), + "return_sec": roi_config.get("return_sec", 30), }, ) @@ -217,22 +217,30 @@ class InferencePipeline: else: filtered_detections = detections + roi_detections: Dict[str, List[Dict]] = {} for detection in filtered_detections: matched_rois = detection.get("matched_rois", []) for roi_conf in matched_rois: roi_id = roi_conf["roi_id"] - rule_type = roi_conf["rule"] + if roi_id not in roi_detections: + roi_detections[roi_id] = [] + roi_detections[roi_id].append(detection) - alerts = self.algo_manager.process( - roi_id, - str(camera_id), - rule_type, - [detection], - datetime.now(), - ) + for roi_config in roi_configs: + roi_id = roi_config["roi_id"] + rule_type = roi_config["rule"] + roi_dets = roi_detections.get(roi_id, []) - for alert in alerts: - self._handle_alert(camera_id, alert, frame, roi_conf) + alerts = self.algo_manager.process( + roi_id, + str(camera_id), + rule_type, + roi_dets, + datetime.now(), + ) + + for alert in alerts: + self._handle_alert(camera_id, alert, frame, roi_config) def _handle_alert( self, diff --git a/inference/rules/algorithms.py b/inference/rules/algorithms.py index 661a2b3..3511498 100644 --- a/inference/rules/algorithms.py +++ b/inference/rules/algorithms.py @@ -23,13 +23,11 @@ class LeavePostAlgorithm: confirm_sec: int = 10, return_sec: int = 30, working_hours: Optional[List[Dict]] = None, - roi_polygon: Optional[List[Tuple[float, float]]] = None, ): self.threshold_sec = threshold_sec self.confirm_sec = confirm_sec self.return_sec = return_sec self.working_hours = working_hours or [] - self.roi_polygon = roi_polygon self.alert_cooldowns: Dict[str, datetime] = {} self.cooldown_seconds = 300 @@ -55,23 +53,11 @@ class LeavePostAlgorithm: return False - def is_point_in_roi(self, x: float, y: float) -> bool: - if not self.roi_polygon or len(self.roi_polygon) < 3: - return True - from shapely.geometry import Point, Polygon - point = Point(x, y) - polygon = Polygon(self.roi_polygon) - return polygon.contains(point) - - def check_roi_has_person(self, detections: List[Dict]) -> bool: - for det in detections: - bbox = det.get("bbox", []) - if len(bbox) >= 4: - x1, y1, x2, y2 = bbox[:4] - center_x = (x1 + x2) / 2 - center_y = (y1 + y2) / 2 - if self.is_point_in_roi(center_x, center_y): - return True + def check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + matched_rois = detection.get("matched_rois", []) + for roi in matched_rois: + if roi.get("roi_id") == roi_id: + return True return False def process( @@ -82,23 +68,26 @@ class LeavePostAlgorithm: ) -> List[Dict]: current_time = current_time or datetime.now() - roi_has_person = self.check_roi_has_person(tracks) in_work = self.is_in_working_hours(current_time) alerts = [] if not in_work: self.state = self.STATE_NON_WORK_TIME - self.on_duty_start_time = None self.last_person_seen_time = None self.on_duty_window.clear() self.alarm_sent = False - roi_has_person = False else: if self.state == self.STATE_NON_WORK_TIME: self.state = self.STATE_ON_DUTY self.on_duty_window.clear() self.alarm_sent = False + roi_has_person = False + for det in tracks: + if self.check_detection_in_roi(det, camera_id): + roi_has_person = True + break + if self.state == self.STATE_ON_DUTY: self.on_duty_window.append((current_time, roi_has_person)) while self.on_duty_window and (current_time - self.on_duty_window[0][0]).total_seconds() > self.confirm_sec: @@ -127,7 +116,7 @@ class LeavePostAlgorithm: if cooldown_key not in self.alert_cooldowns or ( current_time - self.alert_cooldowns[cooldown_key] ).total_seconds() > self.cooldown_seconds: - bbox = self.get_latest_bbox(tracks) + bbox = self.get_latest_bbox_in_roi(tracks, camera_id) elapsed_minutes = int(elapsed / 60) alerts.append({ "track_id": camera_id, @@ -141,17 +130,10 @@ class LeavePostAlgorithm: return alerts - def get_latest_bbox(self, tracks: List[Dict]) -> List[float]: + def get_latest_bbox_in_roi(self, tracks: List[Dict], roi_id: str) -> List[float]: for det in tracks: - bbox = det.get("bbox", []) - if len(bbox) >= 4: - x1, y1, x2, y2 = bbox[:4] - center_x = (x1 + x2) / 2 - center_y = (y1 + y2) / 2 - if self.is_point_in_roi(center_x, center_y): - return bbox - if tracks: - return tracks[0].get("bbox", []) + if self.check_detection_in_roi(det, roi_id): + return det.get("bbox", []) return [] def reset(self):