Files
Security_AI_integrated/monitor.py
2026-01-20 11:39:23 +08:00

1137 lines
49 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
import yaml
import torch
from ultralytics import YOLO
import time
import datetime
import threading
import queue
import sys
import argparse
import base64
import os
from openai import OpenAI
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
def save_alert_image(frame, cam_id, roi_name, alert_type, alert_info=""):
"""保存告警图片
Args:
frame: OpenCV图像
cam_id: 摄像头ID
roi_name: ROI区域名称
alert_type: 告警类型 ("离岗""入侵")
alert_info: 告警信息(可选)
"""
try:
# 创建文件夹结构
data_dir = "data"
alert_dir = os.path.join(data_dir, alert_type)
os.makedirs(alert_dir, exist_ok=True)
# 生成文件名:根据告警类型使用不同的命名方式
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# 清理文件名中的特殊字符
safe_roi_name = roi_name.replace("/", "_").replace("\\", "_").replace(":", "_")
# 对于入侵告警,使用告警类型+ROI名称对于离岗告警使用ROI名称
if alert_type == "入侵":
# 周界入侵:使用"入侵_区域名称"格式
filename = f"{cam_id}_入侵_{safe_roi_name}_{timestamp}.jpg"
else:
# 离岗:使用原有格式
filename = f"{cam_id}_{safe_roi_name}_{timestamp}.jpg"
filepath = os.path.join(alert_dir, filename)
# 保存图片
cv2.imwrite(filepath, frame)
print(f"[{cam_id}] 💾 告警图片已保存: {filepath}")
# 如果有告警信息,保存到文本文件
if alert_info:
info_filepath = filepath.replace(".jpg", ".txt")
with open(info_filepath, 'w', encoding='utf-8') as f:
f.write(f"告警时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"摄像头ID: {cam_id}\n")
f.write(f"ROI区域: {roi_name}\n")
f.write(f"告警类型: {alert_type}\n")
f.write(f"告警信息:\n{alert_info}\n")
return filepath
except Exception as e:
print(f"[{cam_id}] 保存告警图片失败: {e}")
return None
def put_chinese_text(img, text, position, font_size=20, color=(255, 255, 255), thickness=1):
"""在OpenCV图像上绘制中文文本
Args:
img: OpenCV图像 (BGR格式)
text: 要显示的文本(支持中文)
position: 文本位置 (x, y)
font_size: 字体大小
color: 颜色 (BGR格式会被转换为RGB)
thickness: 线条粗细PIL不支持保留参数以兼容
"""
try:
# 将OpenCV图像转换为PIL图像
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# 转换颜色格式BGR -> RGB
color_rgb = (color[2], color[1], color[0])
# 尝试使用系统字体
font = None
font_paths = [
"C:/Windows/Fonts/simhei.ttf", # 黑体
"C:/Windows/Fonts/msyh.ttc", # 微软雅黑
"C:/Windows/Fonts/simsun.ttc", # 宋体
"C:/Windows/Fonts/msyhbd.ttc", # 微软雅黑 Bold
]
for font_path in font_paths:
if os.path.exists(font_path):
try:
font = ImageFont.truetype(font_path, font_size)
break
except:
continue
# 如果找不到字体,使用默认字体
if font is None:
font = ImageFont.load_default()
# 绘制文本
draw.text(position, text, font=font, fill=color_rgb)
# 转换回OpenCV格式
img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return img
except Exception as e:
# 如果绘制失败,使用英文替代或直接返回原图
print(f"中文文本绘制失败: {e}使用OpenCV默认字体")
# 降级方案使用OpenCV绘制可能显示为问号但至少不会崩溃
cv2.putText(img, text.encode('utf-8').decode('latin-1', 'ignore'), position,
cv2.FONT_HERSHEY_SIMPLEX, font_size/40, color, thickness)
return img
class LLMClient:
"""大模型客户端,用于人员判断和离岗分析"""
def __init__(self, api_key, base_url, model_name):
self.client = OpenAI(
api_key=api_key,
base_url=base_url,
)
self.model_name = model_name
def frame_to_base64(self, frame):
"""将OpenCV帧转换为base64编码"""
_, buffer = cv2.imencode('.jpg', frame)
img_base64 = base64.b64encode(buffer).decode('utf-8')
return img_base64
def check_if_staff(self, frame, cam_id, roi_name):
"""判断ROI中的人员是否为工作人员"""
try:
img_base64 = self.frame_to_base64(frame)
prompt = f"""你是一个智能安防辅助系统,负责对监控画面中指定敏感区域(如高配间门口、天台、禁行通道)的人员活动进行分析。
请根据以下规则生成结构化响应:
### 【判定标准】
✅ **本单位物业员工**需满足下列条件之一:
1. **清晰可见的正式工牌**(胸前佩戴)
2. **穿着标准制服**(如:带有白色反光条的深色工程服、黄蓝工程服、白衬衫+黑领带、蓝色清洁装、浅色客服装等)
3. **行为符合岗位规范**(如巡检、维修、清洁,无徘徊、张望、翻越)
> 注意: 满足部分关键条件(如戴有安全帽、穿有工作人员服饰、带有工牌)→ 视为员工,不生成告警。
### 【输出规则】
#### 情况1ROI区域内**无人**
→ 输出:
🟢无异常:敏感区域当前无人员活动。
[客观描述:画面整体状态]
#### 情况2ROI区域内**有本单位员工**
→ 输出:
🟢无异常:检测到本单位工作人员正常作业。
[客观描述:人数+制服类型+工牌状态+行为]
#### 情况3ROI区域内**有非员工或身份不明人员**
→ 输出:
🚨[区域类型]入侵告警:检测到疑似非工作人员,请立即核查。
[客观描述:人数+衣着+工牌状态+位置+行为]
### 【描述要求】
- 所有描述必须**≤30字**
- 仅陈述**可观察事实**,禁止主观推测(如"意图破坏""形迹可疑"
- 使用简洁、标准化语言
### 【示例】
▶ 示例1无人
🟢无异常:敏感区域当前无人员活动。
高配间门口区域空旷,无人员进入。
▶ 示例2员工
🟢无异常:检测到本单位工作人员正常作业。
1名工程人员穿带有反光条的深蓝色工服在高配间巡检。
▶ 示例3非员工
🚨天台区域入侵告警:检测到疑似非工作人员,请立即核查。
1人穿绿色外套未佩戴工牌进入天台区域。
---
请分析摄像头{cam_id}{roi_name}区域,按照上述格式输出结果。"""
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
},
{"type": "text", "text": prompt}
]
}
]
)
result_text = response.choices[0].message.content
# 判断是否为工作人员(根据新的输出格式)
# 如果包含"🟢无异常"且包含"工作人员",则为员工
# 如果包含"🚨"或"入侵告警"或"非工作人员",则为非员工
is_staff = False
if "🟢无异常" in result_text and "工作人员" in result_text:
is_staff = True
elif "🚨" in result_text or "入侵告警" in result_text or "非工作人员" in result_text:
is_staff = False
elif "无人员活动" in result_text or "无人" in result_text:
is_staff = None # 无人情况
return is_staff, result_text
except Exception as e:
print(f"[{cam_id}] 大模型调用失败: {e}")
return None, str(e)
def analyze_off_duty_duration(self, key_frames_info, cam_id):
"""分析离岗时长并判断是否为同一人"""
try:
frames = key_frames_info.get('frames', [])
if not frames:
return False, False, "无关键帧"
off_duty_duration = key_frames_info.get('off_duty_duration', 0)
duration_minutes = int(off_duty_duration / 60)
duration_seconds = int(off_duty_duration % 60)
# 构建消息内容
content_parts = [
{
"type": "text",
"text": f"""请分析以下关键帧图像,判断人员离岗情况。请按照以下格式简洁回答:
【输出格式】
1. 是否告警:[是/否]
2. 离岗时间:{duration_minutes}{duration_seconds}
3. 是否为同一人:[是/否/无法确定]
4. 简要分析:[一句话概括不超过30字]
要求:
- 如果离岗时间超过6分钟且确认为同一人则告警
- 简要分析需客观描述关键帧中人员的特征和行为变化
- 回答要简洁明了,避免冗余描述
关键帧信息:"""
}
]
# 添加图像和说明
for i, frame_info in enumerate(frames):
img_base64 = self.frame_to_base64(frame_info['frame'])
content_parts.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
})
content_parts.append({
"type": "text",
"text": f"关键帧{i+1} - 时间: {frame_info['time']}, 事件: {frame_info['event']}"
})
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{
"role": "user",
"content": content_parts
}
]
)
result = response.choices[0].message.content
# 解析结果 - 更灵活的解析逻辑
# 判断是否告警
exceeds_6min = False
if duration_minutes >= 6:
# 如果时间已经超过6分钟检查大模型是否确认告警
if any(keyword in result for keyword in ["是否告警:是", "是否告警:是", "告警:是", "告警:是", "需要告警", "应告警"]):
exceeds_6min = True
elif "是否告警:否" not in result and "是否告警:否" not in result:
# 如果没有明确说否且时间超过6分钟默认告警
exceeds_6min = True
else:
# 时间未超过6分钟即使大模型说告警也不告警
exceeds_6min = False
# 判断是否为同一人
is_same_person = False
if any(keyword in result for keyword in ["是否为同一人:是", "是否为同一人:是", "同一人:是", "同一人:是", "是同一人", "确认为同一人"]):
is_same_person = True
elif any(keyword in result for keyword in ["是否为同一人:否", "是否为同一人:否", "同一人:否", "同一人:否", "不是同一人", "非同一人"]):
is_same_person = False
elif "无法确定" in result or "不确定" in result:
is_same_person = False # 无法确定时,不告警
return exceeds_6min, is_same_person, result
except Exception as e:
print(f"[{cam_id}] 离岗分析失败: {e}")
return None, None, str(e)
class ThreadedFrameReader:
def __init__(self, cam_id, rtsp_url):
self.cam_id = cam_id
self.rtsp_url = rtsp_url
self._lock = threading.Lock() # 添加锁保护VideoCapture访问
self.cap = None
try:
self.cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
else:
print(f"[{cam_id}] 警告:无法打开视频流: {rtsp_url}")
except Exception as e:
print(f"[{cam_id}] 初始化VideoCapture失败: {e}")
self.q = queue.Queue(maxsize=2)
self.running = True
self.thread = threading.Thread(target=self._reader, daemon=True)
self.thread.start()
def _reader(self):
"""读取帧的线程函数"""
try:
while self.running:
with self._lock:
if self.cap is None or not self.cap.isOpened():
break
ret, frame = self.cap.read()
if not ret:
time.sleep(0.1)
continue
if self.q.full():
try:
self.q.get_nowait()
except queue.Empty:
pass
self.q.put(frame)
except Exception as e:
print(f"[{self.cam_id}] 读取帧线程异常: {e}")
finally:
# 确保资源释放(使用锁保护)
with self._lock:
if self.cap is not None:
try:
if self.cap.isOpened():
self.cap.release()
except Exception as e:
print(f"[{self.cam_id}] 释放VideoCapture时出错: {e}")
finally:
self.cap = None
def read(self):
if not self.q.empty():
return True, self.q.get()
return False, None
def release(self):
"""释放资源,等待线程结束"""
if not self.running:
return # 已经释放过了
self.running = False
# 等待线程结束最多等待3秒
if self.thread.is_alive():
self.thread.join(timeout=3.0)
if self.thread.is_alive():
print(f"[{self.cam_id}] 警告读取线程未能在3秒内结束")
# 清空队列
while not self.q.empty():
try:
self.q.get_nowait()
except queue.Empty:
break
# VideoCapture的释放由_reader线程的finally块处理这里不再重复释放
class MultiCameraMonitor:
def __init__(self, config_path):
with open(config_path, 'r', encoding='utf-8') as f:
self.cfg = yaml.safe_load(f)
# === 全局模型(只加载一次)===
model_cfg = self.cfg['model']
self.device = model_cfg.get('device', 'auto')
if self.device == 'auto' or not self.device:
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"🚀 全局加载模型到 {self.device}...")
self.model = YOLO(model_cfg['path'])
self.model.to(self.device)
self.use_half = (self.device == 'cuda')
if self.use_half:
print("✅ 启用 FP16 推理")
self.imgsz = model_cfg['imgsz']
self.conf_thresh = model_cfg['conf_threshold']
# === 初始化大模型客户端 ===
llm_cfg = self.cfg.get('llm', {})
if llm_cfg.get('api_key'):
self.llm_client = LLMClient(
llm_cfg['api_key'],
llm_cfg['base_url'],
llm_cfg.get('model_name', 'qwen-vl-max')
)
print("✅ 大模型客户端已初始化")
else:
self.llm_client = None
print("⚠️ 未配置大模型API密钥大模型功能将不可用")
# === 初始化所有摄像头 ===
self.common = self.cfg['common']
self.cameras = {}
self.frame_readers = {}
self.queues = {} # cam_id -> queue for detection results
self.perimeter_queues = {} # cam_id -> queue for perimeter detection (每秒抽帧)
for cam_cfg in self.cfg['cameras']:
cam_id = cam_cfg['id']
self.cameras[cam_id] = CameraLogic(cam_id, cam_cfg, self.common, self.llm_client)
self.frame_readers[cam_id] = ThreadedFrameReader(cam_id, cam_cfg['rtsp_url'])
self.queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(人员离岗)
self.perimeter_queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(周界入侵)
# === 控制信号 ===
self.running = True
self.inference_thread = threading.Thread(target=self._inference_loop, daemon=True)
self.perimeter_thread = threading.Thread(target=self._perimeter_inference_loop, daemon=True)
self.inference_thread.start()
self.perimeter_thread.start()
def _inference_loop(self):
"""统一推理线程:轮询各摄像头最新帧,逐个推理(用于人员离岗)"""
while self.running:
processed = False
for cam_id, reader in self.frame_readers.items():
ret, frame = reader.read()
if not ret:
continue
cam_logic = self.cameras[cam_id]
if cam_logic.should_skip_frame():
continue
# 检查是否有ROI启用了人员离岗算法
if not cam_logic.has_enabled_algorithm('人员离岗'):
continue
results = self.model(
frame,
imgsz=self.imgsz,
conf=self.conf_thresh,
verbose=False,
device=self.device,
half=self.use_half,
classes=[0] # person only
)
if not self.queues[cam_id].full():
self.queues[cam_id].put((frame.copy(), results[0]))
processed = True
if not processed:
time.sleep(0.01)
def _perimeter_inference_loop(self):
"""周界入侵推理线程:每秒抽一帧进行检测"""
while self.running:
processed = False
for cam_id, reader in self.frame_readers.items():
cam_logic = self.cameras[cam_id]
# 检查是否有ROI启用了周界入侵算法
if not cam_logic.has_enabled_algorithm('周界入侵'):
continue
ret, frame = reader.read()
if not ret:
continue
# 每秒抽一帧
current_time = time.time()
if not hasattr(cam_logic, 'last_perimeter_check_time'):
cam_logic.last_perimeter_check_time = {}
if cam_id not in cam_logic.last_perimeter_check_time:
cam_logic.last_perimeter_check_time[cam_id] = 0
if current_time - cam_logic.last_perimeter_check_time[cam_id] < 1.0:
continue
cam_logic.last_perimeter_check_time[cam_id] = current_time
results = self.model(
frame,
imgsz=self.imgsz,
conf=self.conf_thresh,
verbose=False,
device=self.device,
half=self.use_half,
classes=[0] # person only
)
if not self.perimeter_queues[cam_id].full():
self.perimeter_queues[cam_id].put((frame.copy(), results[0]))
processed = True
if not processed:
time.sleep(0.1)
def run(self):
"""启动所有摄像头的显示和告警逻辑(主线程)"""
try:
while self.running:
for cam_id, cam_logic in self.cameras.items():
# 处理人员离岗检测结果
if not self.queues[cam_id].empty():
frame, results = self.queues[cam_id].get()
cam_logic.process_off_duty(frame, results)
# 处理周界入侵检测结果
if not self.perimeter_queues[cam_id].empty():
frame, results = self.perimeter_queues[cam_id].get()
cam_logic.process_perimeter(frame, results)
# 更新显示
cam_logic.update_display()
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
time.sleep(0.01)
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
"""停止监控,清理所有资源"""
print("正在停止监控系统...")
self.running = False
# 等待推理线程结束
if hasattr(self, 'inference_thread') and self.inference_thread.is_alive():
self.inference_thread.join(timeout=2.0)
if hasattr(self, 'perimeter_thread') and self.perimeter_thread.is_alive():
self.perimeter_thread.join(timeout=2.0)
# 释放所有摄像头资源
for cam_id, reader in self.frame_readers.items():
try:
print(f"正在释放摄像头 {cam_id}...")
reader.release()
except Exception as e:
print(f"释放摄像头 {cam_id} 时出错: {e}")
# 关闭所有窗口
try:
cv2.destroyAllWindows()
except:
pass
# 强制清理(如果还有线程在运行)
import sys
import os
if sys.platform == 'win32':
# Windows下可能需要额外等待
time.sleep(0.5)
print("监控系统已停止")
class ROILogic:
"""单个ROI区域的逻辑处理"""
def __init__(self, roi_cfg, cam_id, common_cfg, llm_client):
self.cam_id = cam_id
self.roi_name = roi_cfg.get('name', '未命名区域')
self.llm_client = llm_client
# 处理points如果不存在或为空设置为None表示使用整张画面
if 'points' in roi_cfg and roi_cfg['points']:
self.roi_points = np.array(roi_cfg['points'], dtype=np.int32)
self.use_full_frame = False
else:
# 对于周界入侵算法如果没有points使用整张画面
self.roi_points = None
self.use_full_frame = True
# 算法配置
self.algorithms = {}
for alg_cfg in roi_cfg.get('algorithms', []):
alg_name = alg_cfg['name']
if alg_cfg.get('enabled', False):
self.algorithms[alg_name] = alg_cfg
# 人员离岗相关状态需要ROI如果没有points则不能启用
if '人员离岗' in self.algorithms:
if self.roi_points is None:
print(f"[{cam_id}] 警告:{self.roi_name} 启用了人员离岗算法但没有配置points已禁用")
del self.algorithms['人员离岗']
else:
alg_cfg = self.algorithms['人员离岗']
self.off_duty_threshold_sec = alg_cfg.get('off_duty_threshold_sec', 300)
self.on_duty_confirm_sec = alg_cfg.get('on_duty_confirm_sec', 5)
self.off_duty_confirm_sec = alg_cfg.get('off_duty_confirm_sec', 30)
self.is_on_duty = False
self.is_off_duty = True
self.on_duty_start_time = None
self.last_no_person_time = None
self.off_duty_timer_start = None
self.last_alert_time = 0
self.last_person_seen_time = None
# 关键时间记录
self.on_duty_confirm_time = None # 上岗确认时间
self.off_duty_confirm_time = None # 离岗确认时间
self.key_frames = [] # 关键帧存储
# 初始化状态跟踪
self.initial_state_start_time = None # 初始化状态开始时间(进入工作时间时)
self.has_ever_seen_person = False # 是否曾经检测到过人员
self.initial_state_frame = None # 初始化状态时的帧(用于大模型分析)
# 周界入侵相关状态如果没有points使用整张画面
if '周界入侵' in self.algorithms:
self.perimeter_last_check_time = 0
self.perimeter_alert_cooldown = 120 # 周界入侵告警冷却120秒2分钟
if self.use_full_frame:
print(f"[{cam_id}] 提示:{self.roi_name} 周界入侵算法将使用整张画面进行检测")
def is_point_in_roi(self, x, y):
"""判断点是否在ROI内如果没有ROIuse_full_frame=True总是返回True"""
if self.use_full_frame or self.roi_points is None:
return True
return cv2.pointPolygonTest(self.roi_points, (int(x), int(y)), False) >= 0
class CameraLogic:
def __init__(self, cam_id, cam_cfg, common_cfg, llm_client):
self.cam_id = cam_id
self.llm_client = llm_client
# 工作时间段配置
self.working_hours = common_cfg.get('working_hours', [[8, 30, 11, 0], [12, 0, 17, 30]])
self.process_every_n = cam_cfg.get('process_every_n_frames', common_cfg['process_every_n_frames'])
self.alert_cooldown_sec = common_cfg.get('alert_cooldown_sec', 300)
self.off_duty_alert_threshold_sec = common_cfg.get('off_duty_alert_threshold_sec', 360) # 6分钟
# 初始化所有ROI
self.rois = []
for roi_cfg in cam_cfg.get('rois', []):
self.rois.append(ROILogic(roi_cfg, cam_id, common_cfg, llm_client))
# 兼容旧配置格式
if 'roi_points' in cam_cfg:
# 创建默认ROI用于人员离岗
default_roi = {
'name': '离岗检测区域',
'points': cam_cfg['roi_points'],
'algorithms': [{
'name': '人员离岗',
'enabled': True,
'off_duty_threshold_sec': cam_cfg.get('off_duty_threshold_sec', 300),
'on_duty_confirm_sec': cam_cfg.get('on_duty_confirm_sec', 5),
'off_duty_confirm_sec': cam_cfg.get('off_duty_confirm_sec', 30)
}]
}
self.rois.append(ROILogic(default_roi, cam_id, common_cfg, llm_client))
self.frame_count = 0
self.display_frame = None # 用于显示的帧
self.display_results = None # 用于显示的检测结果YOLO results
def should_skip_frame(self):
self.frame_count += 1
return self.frame_count % self.process_every_n != 0
def has_enabled_algorithm(self, alg_name):
"""检查是否有ROI启用了指定算法"""
return any(alg_name in roi.algorithms for roi in self.rois)
def in_working_hours(self):
"""判断是否在工作时间段内"""
now = datetime.datetime.now()
h, m = now.hour, now.minute
current_minutes = h * 60 + m
for period in self.working_hours:
start_h, start_m, end_h, end_m = period
start_minutes = start_h * 60 + start_m
end_minutes = end_h * 60 + end_m
if start_minutes <= current_minutes < end_minutes:
return True
return False
def is_edge_time(self):
"""判断是否为边缘时间段8:30-9:00, 11:00-12:00, 17:30-18:00"""
now = datetime.datetime.now()
h, m = now.hour, now.minute
current_minutes = h * 60 + m
edge_periods = [
(8 * 60 + 30, 9 * 60), # 8:30-9:00
(11 * 60, 12 * 60), # 11:00-12:00
(17 * 60 + 30, 18 * 60) # 17:30-18:00
]
for start, end in edge_periods:
if start <= current_minutes < end:
return True
return False
def get_end_of_work_time(self):
"""获取当天工作结束时间17:30"""
now = datetime.datetime.now()
end_time = now.replace(hour=17, minute=30, second=0, microsecond=0)
if now > end_time:
# 如果已经过了17:30返回明天的17:30
end_time += datetime.timedelta(days=1)
return end_time
def process_off_duty(self, frame, results):
"""处理人员离岗检测"""
current_time = time.time()
now = datetime.datetime.now()
boxes = results.boxes
for roi in self.rois:
if '人员离岗' not in roi.algorithms:
continue
# 检查ROI中是否有人
roi_has_person = any(
roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2,
(b.xyxy[0][1] + b.xyxy[0][3]) / 2)
for b in boxes
)
in_work = self.in_working_hours()
is_edge = self.is_edge_time()
if in_work:
# 初始化状态跟踪:如果刚进入工作时间,记录开始时间
if roi.initial_state_start_time is None:
roi.initial_state_start_time = current_time
roi.has_ever_seen_person = False
roi.initial_state_frame = frame.copy() # 保存初始化状态时的帧
if roi_has_person:
roi.last_person_seen_time = current_time
roi.has_ever_seen_person = True
# 如果检测到人员,清除初始化状态
if roi.initial_state_start_time is not None:
roi.initial_state_start_time = None
roi.initial_state_frame = None
effective = (
roi.last_person_seen_time is not None and
(current_time - roi.last_person_seen_time) < 1.0
)
# 处理初始化状态:如果系统启动时没有人,且超过离岗确认时间
if not roi.has_ever_seen_person and roi.initial_state_start_time is not None:
elapsed_since_start = current_time - roi.initial_state_start_time
if elapsed_since_start >= roi.off_duty_confirm_sec:
# 超过离岗确认时间,触发离岗确认逻辑
roi.is_off_duty, roi.is_on_duty = True, False
roi.off_duty_confirm_time = roi.initial_state_start_time + roi.off_duty_confirm_sec # 使用离岗确认时间点
roi.off_duty_timer_start = current_time
# 保存关键帧(使用初始化状态时的帧作为离岗确认帧)
if roi.initial_state_frame is not None:
roi.key_frames.append({
'frame': roi.initial_state_frame.copy(),
'time': datetime.datetime.fromtimestamp(roi.off_duty_confirm_time).strftime('%Y-%m-%d %H:%M:%S'),
'event': '离岗确认(初始化状态)'
})
# 也保存当前帧
roi.key_frames.append({
'frame': frame.copy(),
'time': now.strftime('%Y-%m-%d %H:%M:%S'),
'event': '当前状态'
})
print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 初始化状态:超过离岗确认时间,进入离岗计时 ({now.strftime('%H:%M:%S')})")
roi.initial_state_start_time = None # 清除初始化状态标记
roi.initial_state_frame = None
if effective:
roi.last_no_person_time = None
if roi.is_off_duty:
if roi.on_duty_start_time is None:
roi.on_duty_start_time = current_time
elif current_time - roi.on_duty_start_time >= roi.on_duty_confirm_sec:
roi.is_on_duty, roi.is_off_duty = True, False
roi.on_duty_confirm_time = current_time
roi.on_duty_start_time = None
# 保存关键帧
roi.key_frames.append({
'frame': frame.copy(),
'time': now.strftime('%Y-%m-%d %H:%M:%S'),
'event': '上岗确认'
})
print(f"[{self.cam_id}] [{roi.roi_name}] ✅ 上岗确认成功 ({now.strftime('%H:%M:%S')})")
else:
roi.on_duty_start_time = None
roi.last_person_seen_time = None
if not roi.is_off_duty:
if roi.last_no_person_time is None:
roi.last_no_person_time = current_time
elif current_time - roi.last_no_person_time >= roi.off_duty_confirm_sec:
roi.is_off_duty, roi.is_on_duty = True, False
roi.off_duty_confirm_time = current_time
roi.last_no_person_time = None
roi.off_duty_timer_start = current_time
# 保存关键帧
roi.key_frames.append({
'frame': frame.copy(),
'time': now.strftime('%Y-%m-%d %H:%M:%S'),
'event': '离岗确认'
})
print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 进入离岗计时 ({now.strftime('%H:%M:%S')})")
# 离岗告警逻辑(边缘时间只记录,不告警)
if roi.is_off_duty and roi.off_duty_timer_start:
elapsed = current_time - roi.off_duty_timer_start
off_duty_duration = elapsed
# 如果到了下班时间还没回来,计算到下班时间的离岗时长
end_time = self.get_end_of_work_time()
if now >= end_time and roi.off_duty_confirm_time:
# 计算离岗时长:下班时间 - 离岗确认时间
off_duty_duration = (end_time.timestamp() - roi.off_duty_confirm_time)
# 超过6分钟且不在边缘时间使用大模型判断
if off_duty_duration >= self.off_duty_alert_threshold_sec and not is_edge:
# 对于初始化状态即使只有1帧也要进行分析
is_initial_state = any('初始化状态' in f.get('event', '') for f in roi.key_frames)
min_frames_required = 1 if is_initial_state else 2
if self.llm_client and len(roi.key_frames) >= min_frames_required:
# 限制关键帧数量只保留最近10帧
if len(roi.key_frames) > 10:
roi.key_frames = roi.key_frames[-10:]
# 准备关键帧信息
key_frames_info = {
'frames': roi.key_frames[-5:] if len(roi.key_frames) >= 2 else roi.key_frames, # 如果有足够帧取最近5帧否则全部使用
'off_duty_duration': off_duty_duration
}
# 调用大模型分析
exceeds_6min, is_same_person, analysis_result = self.llm_client.analyze_off_duty_duration(
key_frames_info, self.cam_id
)
# 对于初始化状态只要超过6分钟就告警因为无法判断是否为同一人
if is_initial_state:
should_alert = exceeds_6min if exceeds_6min is not None else (off_duty_duration >= self.off_duty_alert_threshold_sec)
else:
should_alert = exceeds_6min and is_same_person
if should_alert:
if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec:
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)")
print(f"大模型分析结果: {analysis_result}")
# 保存告警图片
save_alert_image(
frame.copy(),
self.cam_id,
roi.roi_name,
"离岗",
f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)\n大模型分析结果:\n{analysis_result}"
)
roi.last_alert_time = current_time
elif not is_edge:
# 如果没有大模型,直接告警
if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec:
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)")
# 保存告警图片
save_alert_image(
frame.copy(),
self.cam_id,
roi.roi_name,
"离岗",
f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)"
)
roi.last_alert_time = current_time
elif is_edge and roi.off_duty_confirm_time:
# 边缘时间只记录,不告警
print(f"[{self.cam_id}] [{roi.roi_name}] 边缘时间段,记录离岗时长: {int(off_duty_duration)}")
self.display_frame = frame.copy()
self.display_results = results # 保存检测结果用于显示
def crop_roi(self, frame, roi_points):
"""裁剪ROI区域如果roi_points为None返回整张画面"""
if roi_points is None:
return frame.copy()
x_coords = roi_points[:, 0]
y_coords = roi_points[:, 1]
x_min, x_max = int(x_coords.min()), int(x_coords.max())
y_min, y_max = int(y_coords.min()), int(y_coords.max())
# 确保坐标在图像范围内
h, w = frame.shape[:2]
x_min = max(0, x_min)
y_min = max(0, y_min)
x_max = min(w, x_max)
y_max = min(h, y_max)
roi_frame = frame[y_min:y_max, x_min:x_max]
# 创建掩码
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
cv2.fillPoly(mask, [roi_points], 255)
mask_roi = mask[y_min:y_max, x_min:x_max]
# 应用掩码
if len(roi_frame.shape) == 3:
mask_roi = cv2.cvtColor(mask_roi, cv2.COLOR_GRAY2BGR)
roi_frame = cv2.bitwise_and(roi_frame, mask_roi)
return roi_frame
def process_perimeter(self, frame, results):
"""处理周界入侵检测"""
current_time = time.time()
boxes = results.boxes
for roi in self.rois:
if '周界入侵' not in roi.algorithms:
continue
# 检查ROI中是否有人如果没有ROI检查整张画面是否有人
if roi.use_full_frame:
# 使用整张画面,只要检测到人就触发
roi_has_person = len(boxes) > 0
else:
# 检查ROI中是否有人
roi_has_person = any(
roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2,
(b.xyxy[0][1] + b.xyxy[0][3]) / 2)
for b in boxes
)
if roi_has_person:
# 冷却时间检查
if current_time - roi.perimeter_last_check_time >= roi.perimeter_alert_cooldown:
roi.perimeter_last_check_time = current_time
# 裁剪ROI区域如果没有ROI使用整张画面
roi_frame = self.crop_roi(frame, roi.roi_points)
# 调用大模型判断是否为工作人员
if self.llm_client:
is_staff, result = self.llm_client.check_if_staff(roi_frame, self.cam_id, roi.roi_name)
area_desc = "整张画面" if roi.use_full_frame else roi.roi_name
if is_staff is None:
# 无人情况
print(f"[{self.cam_id}] [{roi.roi_name}] 大模型判断:{result}")
elif not is_staff:
# 非工作人员
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到非工作人员(检测区域:{area_desc}")
print(f"大模型判断结果: {result}")
# 保存告警图片(使用区域描述作为名称,更清晰)
save_alert_image(
frame.copy(),
self.cam_id,
area_desc, # 使用area_desc而不是roi.roi_name
"入侵",
f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n大模型判断结果:\n{result}"
)
else:
# 工作人员
print(f"[{self.cam_id}] [{roi.roi_name}] 检测到工作人员,无需告警")
print(f"大模型判断结果: {result}")
else:
# 没有大模型时,直接告警
area_desc = "整张画面" if roi.use_full_frame else roi.roi_name
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到人员进入(检测区域:{area_desc}")
# 保存告警图片(使用区域描述作为名称,更清晰)
save_alert_image(
frame.copy(),
self.cam_id,
area_desc, # 使用area_desc而不是roi.roi_name
"入侵",
f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n检测到人员进入"
)
self.display_frame = frame.copy()
self.display_results = results # 保存检测结果用于显示
def update_display(self):
"""更新显示"""
if self.display_frame is None:
return
vis = self.display_frame.copy()
now = datetime.datetime.now()
in_work = self.in_working_hours()
# 如果有检测结果先绘制YOLO识别框
if self.display_results is not None:
vis = self.display_results.plot() # 使用YOLO的plot方法绘制识别框
# 检查是否有启用人员离岗算法的ROI
has_off_duty_algorithm = any('人员离岗' in roi.algorithms for roi in self.rois)
# 绘制所有ROI
full_frame_roi_count = 0 # 用于跟踪使用整张画面的ROI数量避免文本重叠
for roi in self.rois:
color = (0, 255, 0) # 默认绿色
thickness = 2
# 根据算法状态设置颜色
if '人员离岗' in roi.algorithms:
if roi.is_on_duty:
color = (0, 255, 0) # 绿色:在岗
elif roi.is_off_duty and roi.off_duty_timer_start:
elapsed = time.time() - roi.off_duty_timer_start
if elapsed >= roi.off_duty_threshold_sec:
color = (0, 0, 255) # 红色:离岗告警
else:
color = (0, 255, 255) # 黄色:离岗中
else:
color = (255, 0, 0) # 蓝色:未在岗
if '周界入侵' in roi.algorithms:
color = (255, 255, 0) # 青色:周界入侵区域
# 如果有ROI绘制ROI框
if roi.roi_points is not None:
cv2.polylines(vis, [roi.roi_points], True, color, thickness)
# 创建半透明覆盖层
overlay = vis.copy()
cv2.fillPoly(overlay, [roi.roi_points], color)
cv2.addWeighted(overlay, 0.2, vis, 0.8, 0, vis)
# 显示ROI名称使用中文文本绘制函数
text_pos = tuple(roi.roi_points[0])
vis = put_chinese_text(vis, roi.roi_name, text_pos, font_size=20, color=color, thickness=1)
else:
# 如果没有ROI使用整张画面在左上角显示提示避免重叠
display_text = f"{roi.roi_name} (整张画面)"
text_y = 30 + full_frame_roi_count * 25 # 每个ROI文本向下偏移25像素
vis = put_chinese_text(vis, display_text, (10, text_y), font_size=18, color=color, thickness=1)
full_frame_roi_count += 1
# 只在启用人员离岗算法时显示岗位状态
if has_off_duty_algorithm:
status = "OUT OF HOURS"
status_color = (128, 128, 128)
if in_work:
# 检查所有ROI的状态
has_on_duty = any(roi.is_on_duty for roi in self.rois if '人员离岗' in roi.algorithms)
has_off_duty = any(roi.is_off_duty and roi.off_duty_timer_start
for roi in self.rois if '人员离岗' in roi.algorithms)
if has_on_duty:
status, status_color = "ON DUTY", (0, 255, 0)
elif has_off_duty:
status, status_color = "OFF DUTY", (0, 255, 255)
else:
status, status_color = "OFF DUTY", (255, 0, 0)
cv2.putText(vis, f"[{self.cam_id}] {status}", (20, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2)
# 显示时间戳(所有摄像头都显示)
cv2.putText(vis, now.strftime('%Y-%m-%d %H:%M:%S'), (20, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.imshow(f"Monitor - {self.cam_id}", vis)
def main():
import signal
import sys
parser = argparse.ArgumentParser()
parser.add_argument("--config", default="config.yaml", help="配置文件路径")
args = parser.parse_args()
monitor = None
try:
monitor = MultiCameraMonitor(args.config)
# 注册信号处理,确保优雅退出
def signal_handler(sig, frame):
print("\n收到退出信号,正在关闭...")
if monitor:
monitor.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
if sys.platform != 'win32':
signal.signal(signal.SIGTERM, signal_handler)
monitor.run()
except KeyboardInterrupt:
print("\n收到键盘中断,正在关闭...")
except Exception as e:
print(f"程序异常: {e}")
import traceback
traceback.print_exc()
finally:
if monitor:
monitor.stop()
# 确保进程退出
sys.exit(0)
if __name__ == "__main__":
main()