fix(intrusion): 修复代码质量问题 - 空值检查和常量提取

修复内容:
1. 添加 state_start_time 空值检查(CONFIRMING_INTRUSION 和 CONFIRMING_CLEAR 状态)
2. 移除 get_state() 未使用的 roi_id 参数
3. get_state() 接受 current_time 参数,避免直接调用 datetime.now()
4. 提取魔法数字:ALARM_LEVEL_INTRUSION = 3

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-14 10:23:14 +08:00
parent 2d90f00b11
commit 37fc48e34d

View File

@@ -389,6 +389,9 @@ class IntrusionAlgorithm:
STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失)
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中
# 告警级别常量
ALARM_LEVEL_INTRUSION = 3
def __init__(
self,
cooldown_seconds: int = 300,
@@ -472,41 +475,46 @@ class IntrusionAlgorithm:
elif self.state == self.STATE_CONFIRMING_INTRUSION:
# 入侵确认中:需要持续检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
if not roi_has_person:
# 人消失了回到IDLE
if self.state_start_time is None:
# 防御性编程如果状态时间为空重置到IDLE
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)")
elif elapsed >= self.confirm_seconds:
# 入侵确认成功,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_seconds:
logger.error(f"ROI {roi_id}: CONFIRMING_INTRUSION 状态异常state_start_time为None重置到IDLE")
else:
elapsed = (current_time - self.state_start_time).total_seconds()
bbox = self._get_latest_bbox(tracks, roi_id)
self._intrusion_start_time = self.state_start_time # 记录入侵开始时间
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": 3,
"message": "检测到周界入侵",
"first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_ALARMED
# _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填
logger.warning(f"ROI {roi_id}: CONFIRMING_INTRUSION → ALARMED (告警触发)")
else:
# 冷却期内回到IDLE
if not roi_has_person:
# 人消失了回到IDLE
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (冷却期内)")
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (人消失)")
elif elapsed >= self.confirm_seconds:
# 入侵确认成功,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_seconds:
bbox = self._get_latest_bbox(tracks, roi_id)
self._intrusion_start_time = self.state_start_time # 记录入侵开始时间
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": self.ALARM_LEVEL_INTRUSION,
"message": "检测到周界入侵",
"first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_ALARMED
# _last_alarm_id 由 main.py 通过 set_last_alarm_id() 回填
logger.warning(f"ROI {roi_id}: CONFIRMING_INTRUSION → ALARMED (告警触发)")
else:
# 冷却期内回到IDLE
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_INTRUSION → IDLE (冷却期内)")
elif self.state == self.STATE_ALARMED:
# 已告警状态:等待入侵消失
@@ -518,33 +526,38 @@ class IntrusionAlgorithm:
elif self.state == self.STATE_CONFIRMING_CLEAR:
# 入侵消失确认中:需要持续未检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人又出现了回到ALARMED
self.state = self.STATE_ALARMED
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (人又出现)")
elif elapsed >= self.confirm_seconds:
# 消失确认成功发送resolve事件
if self._last_alarm_id and self._intrusion_start_time:
duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "intrusion_cleared",
})
logger.info(f"ROI {roi_id}: 告警已解决(入侵消失)")
# 重置状态
if self.state_start_time is None:
# 防御性编程如果状态时间为空重置到IDLE
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._intrusion_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)")
logger.error(f"ROI {roi_id}: CONFIRMING_CLEAR 状态异常state_start_time为None重置到IDLE")
else:
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人又出现了回到ALARMED
self.state = self.STATE_ALARMED
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (人又出现)")
elif elapsed >= self.confirm_seconds:
# 消失确认成功发送resolve事件
if self._last_alarm_id and self._intrusion_start_time:
duration_ms = int((current_time - self._intrusion_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "intrusion_cleared",
})
logger.info(f"ROI {roi_id}: 告警已解决(入侵消失)")
# 重置状态
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._intrusion_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE (消失确认成功)")
return alerts
@@ -564,8 +577,10 @@ class IntrusionAlgorithm:
self.alert_triggered.clear()
self.detection_start.clear()
def get_state(self, roi_id: str) -> Dict[str, Any]:
def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""获取当前状态(用于调试和监控)"""
current_time = current_time or datetime.now()
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
@@ -573,7 +588,7 @@ class IntrusionAlgorithm:
# 添加状态特定信息
if self.state == self.STATE_ALARMED and self._intrusion_start_time:
total_intrusion_sec = (datetime.now() - self._intrusion_start_time).total_seconds()
total_intrusion_sec = (current_time - self._intrusion_start_time).total_seconds()
state_info["total_intrusion_sec"] = total_intrusion_sec
state_info["alarm_id"] = self._last_alarm_id