import cv2 import numpy as np import yaml import torch from ultralytics import YOLO import time import datetime import threading import queue import sys import argparse import base64 import os from openai import OpenAI from io import BytesIO from PIL import Image, ImageDraw, ImageFont def save_alert_image(frame, cam_id, roi_name, alert_type, alert_info=""): """保存告警图片 Args: frame: OpenCV图像 cam_id: 摄像头ID roi_name: ROI区域名称 alert_type: 告警类型 ("离岗" 或 "入侵") alert_info: 告警信息(可选) """ try: # 创建文件夹结构 data_dir = "data" alert_dir = os.path.join(data_dir, alert_type) os.makedirs(alert_dir, exist_ok=True) # 生成文件名:根据告警类型使用不同的命名方式 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # 清理文件名中的特殊字符 safe_roi_name = roi_name.replace("/", "_").replace("\\", "_").replace(":", "_") # 对于入侵告警,使用告警类型+ROI名称;对于离岗告警,使用ROI名称 if alert_type == "入侵": # 周界入侵:使用"入侵_区域名称"格式 filename = f"{cam_id}_入侵_{safe_roi_name}_{timestamp}.jpg" else: # 离岗:使用原有格式 filename = f"{cam_id}_{safe_roi_name}_{timestamp}.jpg" filepath = os.path.join(alert_dir, filename) # 保存图片 cv2.imwrite(filepath, frame) print(f"[{cam_id}] 💾 告警图片已保存: {filepath}") # 如果有告警信息,保存到文本文件 if alert_info: info_filepath = filepath.replace(".jpg", ".txt") with open(info_filepath, 'w', encoding='utf-8') as f: f.write(f"告警时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"摄像头ID: {cam_id}\n") f.write(f"ROI区域: {roi_name}\n") f.write(f"告警类型: {alert_type}\n") f.write(f"告警信息:\n{alert_info}\n") return filepath except Exception as e: print(f"[{cam_id}] 保存告警图片失败: {e}") return None def put_chinese_text(img, text, position, font_size=20, color=(255, 255, 255), thickness=1): """在OpenCV图像上绘制中文文本 Args: img: OpenCV图像 (BGR格式) text: 要显示的文本(支持中文) position: 文本位置 (x, y) font_size: 字体大小 color: 颜色 (BGR格式,会被转换为RGB) thickness: 线条粗细(PIL不支持,保留参数以兼容) """ try: # 将OpenCV图像转换为PIL图像 img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) draw = ImageDraw.Draw(img_pil) # 转换颜色格式:BGR -> RGB color_rgb = (color[2], color[1], color[0]) # 尝试使用系统字体 font = None font_paths = [ "C:/Windows/Fonts/simhei.ttf", # 黑体 "C:/Windows/Fonts/msyh.ttc", # 微软雅黑 "C:/Windows/Fonts/simsun.ttc", # 宋体 "C:/Windows/Fonts/msyhbd.ttc", # 微软雅黑 Bold ] for font_path in font_paths: if os.path.exists(font_path): try: font = ImageFont.truetype(font_path, font_size) break except: continue # 如果找不到字体,使用默认字体 if font is None: font = ImageFont.load_default() # 绘制文本 draw.text(position, text, font=font, fill=color_rgb) # 转换回OpenCV格式 img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR) return img except Exception as e: # 如果绘制失败,使用英文替代或直接返回原图 print(f"中文文本绘制失败: {e},使用OpenCV默认字体") # 降级方案:使用OpenCV绘制(可能显示为问号,但至少不会崩溃) cv2.putText(img, text.encode('utf-8').decode('latin-1', 'ignore'), position, cv2.FONT_HERSHEY_SIMPLEX, font_size/40, color, thickness) return img class LLMClient: """大模型客户端,用于人员判断和离岗分析""" def __init__(self, api_key, base_url, model_name): self.client = OpenAI( api_key=api_key, base_url=base_url, ) self.model_name = model_name def frame_to_base64(self, frame): """将OpenCV帧转换为base64编码""" _, buffer = cv2.imencode('.jpg', frame) img_base64 = base64.b64encode(buffer).decode('utf-8') return img_base64 def check_if_staff(self, frame, cam_id, roi_name): """判断ROI中的人员是否为工作人员""" try: img_base64 = self.frame_to_base64(frame) prompt = f"""你是一个智能安防辅助系统,负责对监控画面中指定敏感区域(如高配间门口、天台、禁行通道)的人员活动进行分析。 请根据以下规则生成结构化响应: ### 【判定标准】 ✅ **本单位物业员工**需满足下列条件之一: 1. **清晰可见的正式工牌**(胸前佩戴) 2. **穿着标准制服**(如:带有白色反光条的深色工程服、黄蓝工程服、白衬衫+黑领带、蓝色清洁装、浅色客服装等) 3. **行为符合岗位规范**(如巡检、维修、清洁,无徘徊、张望、翻越) > 注意: 满足部分关键条件(如戴有安全帽、穿有工作人员服饰、带有工牌)→ 视为员工,不生成告警。 ### 【输出规则】 #### 情况1:ROI区域内**无人** → 输出: 🟢无异常:敏感区域当前无人员活动。 [客观描述:画面整体状态] #### 情况2:ROI区域内**有本单位员工** → 输出: 🟢无异常:检测到本单位工作人员正常作业。 [客观描述:人数+制服类型+工牌状态+行为] #### 情况3:ROI区域内**有非员工或身份不明人员** → 输出: 🚨[区域类型]入侵告警:检测到疑似非工作人员,请立即核查。 [客观描述:人数+衣着+工牌状态+位置+行为] ### 【描述要求】 - 所有描述必须**≤30字** - 仅陈述**可观察事实**,禁止主观推测(如"意图破坏""形迹可疑") - 使用简洁、标准化语言 ### 【示例】 ▶ 示例1(无人): 🟢无异常:敏感区域当前无人员活动。 高配间门口区域空旷,无人员进入。 ▶ 示例2(员工): 🟢无异常:检测到本单位工作人员正常作业。 1名工程人员穿带有反光条的深蓝色工服在高配间巡检。 ▶ 示例3(非员工): 🚨天台区域入侵告警:检测到疑似非工作人员,请立即核查。 1人穿绿色外套未佩戴工牌进入天台区域。 --- 请分析摄像头{cam_id}的{roi_name}区域,按照上述格式输出结果。""" response = self.client.chat.completions.create( model=self.model_name, messages=[ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{img_base64}" } }, {"type": "text", "text": prompt} ] } ] ) result_text = response.choices[0].message.content # 判断是否为工作人员(根据新的输出格式) # 如果包含"🟢无异常"且包含"工作人员",则为员工 # 如果包含"🚨"或"入侵告警"或"非工作人员",则为非员工 is_staff = False if "🟢无异常" in result_text and "工作人员" in result_text: is_staff = True elif "🚨" in result_text or "入侵告警" in result_text or "非工作人员" in result_text: is_staff = False elif "无人员活动" in result_text or "无人" in result_text: is_staff = None # 无人情况 return is_staff, result_text except Exception as e: print(f"[{cam_id}] 大模型调用失败: {e}") return None, str(e) def analyze_off_duty_duration(self, key_frames_info, cam_id): """分析离岗时长并判断是否为同一人""" try: frames = key_frames_info.get('frames', []) if not frames: return False, False, "无关键帧" off_duty_duration = key_frames_info.get('off_duty_duration', 0) duration_minutes = int(off_duty_duration / 60) duration_seconds = int(off_duty_duration % 60) # 构建消息内容 content_parts = [ { "type": "text", "text": f"""请分析以下关键帧图像,判断人员离岗情况。请按照以下格式简洁回答: 【输出格式】 1. 是否告警:[是/否] 2. 离岗时间:{duration_minutes}分{duration_seconds}秒 3. 是否为同一人:[是/否/无法确定] 4. 简要分析:[一句话概括,不超过30字] 要求: - 如果离岗时间超过6分钟且确认为同一人,则告警 - 简要分析需客观描述关键帧中人员的特征和行为变化 - 回答要简洁明了,避免冗余描述 关键帧信息:""" } ] # 添加图像和说明 for i, frame_info in enumerate(frames): img_base64 = self.frame_to_base64(frame_info['frame']) content_parts.append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{img_base64}" } }) content_parts.append({ "type": "text", "text": f"关键帧{i+1} - 时间: {frame_info['time']}, 事件: {frame_info['event']}" }) response = self.client.chat.completions.create( model=self.model_name, messages=[ { "role": "user", "content": content_parts } ] ) result = response.choices[0].message.content # 解析结果 - 更灵活的解析逻辑 # 判断是否告警 exceeds_6min = False if duration_minutes >= 6: # 如果时间已经超过6分钟,检查大模型是否确认告警 if any(keyword in result for keyword in ["是否告警:是", "是否告警:是", "告警:是", "告警:是", "需要告警", "应告警"]): exceeds_6min = True elif "是否告警:否" not in result and "是否告警:否" not in result: # 如果没有明确说否,且时间超过6分钟,默认告警 exceeds_6min = True else: # 时间未超过6分钟,即使大模型说告警也不告警 exceeds_6min = False # 判断是否为同一人 is_same_person = False if any(keyword in result for keyword in ["是否为同一人:是", "是否为同一人:是", "同一人:是", "同一人:是", "是同一人", "确认为同一人"]): is_same_person = True elif any(keyword in result for keyword in ["是否为同一人:否", "是否为同一人:否", "同一人:否", "同一人:否", "不是同一人", "非同一人"]): is_same_person = False elif "无法确定" in result or "不确定" in result: is_same_person = False # 无法确定时,不告警 return exceeds_6min, is_same_person, result except Exception as e: print(f"[{cam_id}] 离岗分析失败: {e}") return None, None, str(e) class ThreadedFrameReader: def __init__(self, cam_id, rtsp_url): self.cam_id = cam_id self.rtsp_url = rtsp_url self._lock = threading.Lock() # 添加锁保护VideoCapture访问 self.cap = None try: self.cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if self.cap.isOpened(): self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) else: print(f"[{cam_id}] 警告:无法打开视频流: {rtsp_url}") except Exception as e: print(f"[{cam_id}] 初始化VideoCapture失败: {e}") self.q = queue.Queue(maxsize=2) self.running = True self.thread = threading.Thread(target=self._reader, daemon=True) self.thread.start() def _reader(self): """读取帧的线程函数""" try: while self.running: with self._lock: if self.cap is None or not self.cap.isOpened(): break ret, frame = self.cap.read() if not ret: time.sleep(0.1) continue if self.q.full(): try: self.q.get_nowait() except queue.Empty: pass self.q.put(frame) except Exception as e: print(f"[{self.cam_id}] 读取帧线程异常: {e}") finally: # 确保资源释放(使用锁保护) with self._lock: if self.cap is not None: try: if self.cap.isOpened(): self.cap.release() except Exception as e: print(f"[{self.cam_id}] 释放VideoCapture时出错: {e}") finally: self.cap = None def read(self): if not self.q.empty(): return True, self.q.get() return False, None def release(self): """释放资源,等待线程结束""" if not self.running: return # 已经释放过了 self.running = False # 等待线程结束,最多等待3秒 if self.thread.is_alive(): self.thread.join(timeout=3.0) if self.thread.is_alive(): print(f"[{self.cam_id}] 警告:读取线程未能在3秒内结束") # 清空队列 while not self.q.empty(): try: self.q.get_nowait() except queue.Empty: break # VideoCapture的释放由_reader线程的finally块处理,这里不再重复释放 class MultiCameraMonitor: def __init__(self, config_path): with open(config_path, 'r', encoding='utf-8') as f: self.cfg = yaml.safe_load(f) # === 全局模型(只加载一次)=== model_cfg = self.cfg['model'] self.device = model_cfg.get('device', 'auto') if self.device == 'auto' or not self.device: self.device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f"🚀 全局加载模型到 {self.device}...") self.model = YOLO(model_cfg['path']) self.model.to(self.device) self.use_half = (self.device == 'cuda') if self.use_half: print("✅ 启用 FP16 推理") self.imgsz = model_cfg['imgsz'] self.conf_thresh = model_cfg['conf_threshold'] # === 初始化大模型客户端 === llm_cfg = self.cfg.get('llm', {}) if llm_cfg.get('api_key'): self.llm_client = LLMClient( llm_cfg['api_key'], llm_cfg['base_url'], llm_cfg.get('model_name', 'qwen-vl-max') ) print("✅ 大模型客户端已初始化") else: self.llm_client = None print("⚠️ 未配置大模型API密钥,大模型功能将不可用") # === 初始化所有摄像头 === self.common = self.cfg['common'] self.cameras = {} self.frame_readers = {} self.queues = {} # cam_id -> queue for detection results self.perimeter_queues = {} # cam_id -> queue for perimeter detection (每秒抽帧) for cam_cfg in self.cfg['cameras']: cam_id = cam_cfg['id'] self.cameras[cam_id] = CameraLogic(cam_id, cam_cfg, self.common, self.llm_client) self.frame_readers[cam_id] = ThreadedFrameReader(cam_id, cam_cfg['rtsp_url']) self.queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(人员离岗) self.perimeter_queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(周界入侵) # === 控制信号 === self.running = True self.inference_thread = threading.Thread(target=self._inference_loop, daemon=True) self.perimeter_thread = threading.Thread(target=self._perimeter_inference_loop, daemon=True) self.inference_thread.start() self.perimeter_thread.start() def _inference_loop(self): """统一推理线程:轮询各摄像头最新帧,逐个推理(用于人员离岗)""" while self.running: processed = False for cam_id, reader in self.frame_readers.items(): ret, frame = reader.read() if not ret: continue cam_logic = self.cameras[cam_id] if cam_logic.should_skip_frame(): continue # 检查是否有ROI启用了人员离岗算法 if not cam_logic.has_enabled_algorithm('人员离岗'): continue results = self.model( frame, imgsz=self.imgsz, conf=self.conf_thresh, verbose=False, device=self.device, half=self.use_half, classes=[0] # person only ) if not self.queues[cam_id].full(): self.queues[cam_id].put((frame.copy(), results[0])) processed = True if not processed: time.sleep(0.01) def _perimeter_inference_loop(self): """周界入侵推理线程:每秒抽一帧进行检测""" while self.running: processed = False for cam_id, reader in self.frame_readers.items(): cam_logic = self.cameras[cam_id] # 检查是否有ROI启用了周界入侵算法 if not cam_logic.has_enabled_algorithm('周界入侵'): continue ret, frame = reader.read() if not ret: continue # 每秒抽一帧 current_time = time.time() if not hasattr(cam_logic, 'last_perimeter_check_time'): cam_logic.last_perimeter_check_time = {} if cam_id not in cam_logic.last_perimeter_check_time: cam_logic.last_perimeter_check_time[cam_id] = 0 if current_time - cam_logic.last_perimeter_check_time[cam_id] < 1.0: continue cam_logic.last_perimeter_check_time[cam_id] = current_time results = self.model( frame, imgsz=self.imgsz, conf=self.conf_thresh, verbose=False, device=self.device, half=self.use_half, classes=[0] # person only ) if not self.perimeter_queues[cam_id].full(): self.perimeter_queues[cam_id].put((frame.copy(), results[0])) processed = True if not processed: time.sleep(0.1) def run(self): """启动所有摄像头的显示和告警逻辑(主线程)""" try: while self.running: for cam_id, cam_logic in self.cameras.items(): # 处理人员离岗检测结果 if not self.queues[cam_id].empty(): frame, results = self.queues[cam_id].get() cam_logic.process_off_duty(frame, results) # 处理周界入侵检测结果 if not self.perimeter_queues[cam_id].empty(): frame, results = self.perimeter_queues[cam_id].get() cam_logic.process_perimeter(frame, results) # 更新显示 cam_logic.update_display() key = cv2.waitKey(1) & 0xFF if key == ord('q'): break time.sleep(0.01) except KeyboardInterrupt: pass finally: self.stop() def stop(self): """停止监控,清理所有资源""" print("正在停止监控系统...") self.running = False # 等待推理线程结束 if hasattr(self, 'inference_thread') and self.inference_thread.is_alive(): self.inference_thread.join(timeout=2.0) if hasattr(self, 'perimeter_thread') and self.perimeter_thread.is_alive(): self.perimeter_thread.join(timeout=2.0) # 释放所有摄像头资源 for cam_id, reader in self.frame_readers.items(): try: print(f"正在释放摄像头 {cam_id}...") reader.release() except Exception as e: print(f"释放摄像头 {cam_id} 时出错: {e}") # 关闭所有窗口 try: cv2.destroyAllWindows() except: pass # 强制清理(如果还有线程在运行) import sys import os if sys.platform == 'win32': # Windows下可能需要额外等待 time.sleep(0.5) print("监控系统已停止") class ROILogic: """单个ROI区域的逻辑处理""" def __init__(self, roi_cfg, cam_id, common_cfg, llm_client): self.cam_id = cam_id self.roi_name = roi_cfg.get('name', '未命名区域') self.llm_client = llm_client # 处理points:如果不存在或为空,设置为None(表示使用整张画面) if 'points' in roi_cfg and roi_cfg['points']: self.roi_points = np.array(roi_cfg['points'], dtype=np.int32) self.use_full_frame = False else: # 对于周界入侵算法,如果没有points,使用整张画面 self.roi_points = None self.use_full_frame = True # 算法配置 self.algorithms = {} for alg_cfg in roi_cfg.get('algorithms', []): alg_name = alg_cfg['name'] if alg_cfg.get('enabled', False): self.algorithms[alg_name] = alg_cfg # 人员离岗相关状态(需要ROI,如果没有points则不能启用) if '人员离岗' in self.algorithms: if self.roi_points is None: print(f"[{cam_id}] 警告:{self.roi_name} 启用了人员离岗算法但没有配置points,已禁用") del self.algorithms['人员离岗'] else: alg_cfg = self.algorithms['人员离岗'] self.off_duty_threshold_sec = alg_cfg.get('off_duty_threshold_sec', 300) self.on_duty_confirm_sec = alg_cfg.get('on_duty_confirm_sec', 5) self.off_duty_confirm_sec = alg_cfg.get('off_duty_confirm_sec', 30) self.is_on_duty = False self.is_off_duty = True self.on_duty_start_time = None self.last_no_person_time = None self.off_duty_timer_start = None self.last_alert_time = 0 self.last_person_seen_time = None # 关键时间记录 self.on_duty_confirm_time = None # 上岗确认时间 self.off_duty_confirm_time = None # 离岗确认时间 self.key_frames = [] # 关键帧存储 # 初始化状态跟踪 self.initial_state_start_time = None # 初始化状态开始时间(进入工作时间时) self.has_ever_seen_person = False # 是否曾经检测到过人员 self.initial_state_frame = None # 初始化状态时的帧(用于大模型分析) # 周界入侵相关状态(如果没有points,使用整张画面) if '周界入侵' in self.algorithms: self.perimeter_last_check_time = 0 self.perimeter_alert_cooldown = 120 # 周界入侵告警冷却120秒(2分钟) if self.use_full_frame: print(f"[{cam_id}] 提示:{self.roi_name} 周界入侵算法将使用整张画面进行检测") def is_point_in_roi(self, x, y): """判断点是否在ROI内,如果没有ROI(use_full_frame=True),总是返回True""" if self.use_full_frame or self.roi_points is None: return True return cv2.pointPolygonTest(self.roi_points, (int(x), int(y)), False) >= 0 class CameraLogic: def __init__(self, cam_id, cam_cfg, common_cfg, llm_client): self.cam_id = cam_id self.llm_client = llm_client # 工作时间段配置 self.working_hours = common_cfg.get('working_hours', [[8, 30, 11, 0], [12, 0, 17, 30]]) self.process_every_n = cam_cfg.get('process_every_n_frames', common_cfg['process_every_n_frames']) self.alert_cooldown_sec = common_cfg.get('alert_cooldown_sec', 300) self.off_duty_alert_threshold_sec = common_cfg.get('off_duty_alert_threshold_sec', 360) # 6分钟 # 初始化所有ROI self.rois = [] for roi_cfg in cam_cfg.get('rois', []): self.rois.append(ROILogic(roi_cfg, cam_id, common_cfg, llm_client)) # 兼容旧配置格式 if 'roi_points' in cam_cfg: # 创建默认ROI用于人员离岗 default_roi = { 'name': '离岗检测区域', 'points': cam_cfg['roi_points'], 'algorithms': [{ 'name': '人员离岗', 'enabled': True, 'off_duty_threshold_sec': cam_cfg.get('off_duty_threshold_sec', 300), 'on_duty_confirm_sec': cam_cfg.get('on_duty_confirm_sec', 5), 'off_duty_confirm_sec': cam_cfg.get('off_duty_confirm_sec', 30) }] } self.rois.append(ROILogic(default_roi, cam_id, common_cfg, llm_client)) self.frame_count = 0 self.display_frame = None # 用于显示的帧 self.display_results = None # 用于显示的检测结果(YOLO results) def should_skip_frame(self): self.frame_count += 1 return self.frame_count % self.process_every_n != 0 def has_enabled_algorithm(self, alg_name): """检查是否有ROI启用了指定算法""" return any(alg_name in roi.algorithms for roi in self.rois) def in_working_hours(self): """判断是否在工作时间段内""" now = datetime.datetime.now() h, m = now.hour, now.minute current_minutes = h * 60 + m for period in self.working_hours: start_h, start_m, end_h, end_m = period start_minutes = start_h * 60 + start_m end_minutes = end_h * 60 + end_m if start_minutes <= current_minutes < end_minutes: return True return False def is_edge_time(self): """判断是否为边缘时间段(8:30-9:00, 11:00-12:00, 17:30-18:00)""" now = datetime.datetime.now() h, m = now.hour, now.minute current_minutes = h * 60 + m edge_periods = [ (8 * 60 + 30, 9 * 60), # 8:30-9:00 (11 * 60, 12 * 60), # 11:00-12:00 (17 * 60 + 30, 18 * 60) # 17:30-18:00 ] for start, end in edge_periods: if start <= current_minutes < end: return True return False def get_end_of_work_time(self): """获取当天工作结束时间(17:30)""" now = datetime.datetime.now() end_time = now.replace(hour=17, minute=30, second=0, microsecond=0) if now > end_time: # 如果已经过了17:30,返回明天的17:30 end_time += datetime.timedelta(days=1) return end_time def process_off_duty(self, frame, results): """处理人员离岗检测""" current_time = time.time() now = datetime.datetime.now() boxes = results.boxes for roi in self.rois: if '人员离岗' not in roi.algorithms: continue # 检查ROI中是否有人 roi_has_person = any( roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, (b.xyxy[0][1] + b.xyxy[0][3]) / 2) for b in boxes ) in_work = self.in_working_hours() is_edge = self.is_edge_time() if in_work: # 初始化状态跟踪:如果刚进入工作时间,记录开始时间 if roi.initial_state_start_time is None: roi.initial_state_start_time = current_time roi.has_ever_seen_person = False roi.initial_state_frame = frame.copy() # 保存初始化状态时的帧 if roi_has_person: roi.last_person_seen_time = current_time roi.has_ever_seen_person = True # 如果检测到人员,清除初始化状态 if roi.initial_state_start_time is not None: roi.initial_state_start_time = None roi.initial_state_frame = None effective = ( roi.last_person_seen_time is not None and (current_time - roi.last_person_seen_time) < 1.0 ) # 处理初始化状态:如果系统启动时没有人,且超过离岗确认时间 if not roi.has_ever_seen_person and roi.initial_state_start_time is not None: elapsed_since_start = current_time - roi.initial_state_start_time if elapsed_since_start >= roi.off_duty_confirm_sec: # 超过离岗确认时间,触发离岗确认逻辑 roi.is_off_duty, roi.is_on_duty = True, False roi.off_duty_confirm_time = roi.initial_state_start_time + roi.off_duty_confirm_sec # 使用离岗确认时间点 roi.off_duty_timer_start = current_time # 保存关键帧(使用初始化状态时的帧作为离岗确认帧) if roi.initial_state_frame is not None: roi.key_frames.append({ 'frame': roi.initial_state_frame.copy(), 'time': datetime.datetime.fromtimestamp(roi.off_duty_confirm_time).strftime('%Y-%m-%d %H:%M:%S'), 'event': '离岗确认(初始化状态)' }) # 也保存当前帧 roi.key_frames.append({ 'frame': frame.copy(), 'time': now.strftime('%Y-%m-%d %H:%M:%S'), 'event': '当前状态' }) print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 初始化状态:超过离岗确认时间,进入离岗计时 ({now.strftime('%H:%M:%S')})") roi.initial_state_start_time = None # 清除初始化状态标记 roi.initial_state_frame = None if effective: roi.last_no_person_time = None if roi.is_off_duty: if roi.on_duty_start_time is None: roi.on_duty_start_time = current_time elif current_time - roi.on_duty_start_time >= roi.on_duty_confirm_sec: roi.is_on_duty, roi.is_off_duty = True, False roi.on_duty_confirm_time = current_time roi.on_duty_start_time = None # 保存关键帧 roi.key_frames.append({ 'frame': frame.copy(), 'time': now.strftime('%Y-%m-%d %H:%M:%S'), 'event': '上岗确认' }) print(f"[{self.cam_id}] [{roi.roi_name}] ✅ 上岗确认成功 ({now.strftime('%H:%M:%S')})") else: roi.on_duty_start_time = None roi.last_person_seen_time = None if not roi.is_off_duty: if roi.last_no_person_time is None: roi.last_no_person_time = current_time elif current_time - roi.last_no_person_time >= roi.off_duty_confirm_sec: roi.is_off_duty, roi.is_on_duty = True, False roi.off_duty_confirm_time = current_time roi.last_no_person_time = None roi.off_duty_timer_start = current_time # 保存关键帧 roi.key_frames.append({ 'frame': frame.copy(), 'time': now.strftime('%Y-%m-%d %H:%M:%S'), 'event': '离岗确认' }) print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 进入离岗计时 ({now.strftime('%H:%M:%S')})") # 离岗告警逻辑(边缘时间只记录,不告警) if roi.is_off_duty and roi.off_duty_timer_start: elapsed = current_time - roi.off_duty_timer_start off_duty_duration = elapsed # 如果到了下班时间还没回来,计算到下班时间的离岗时长 end_time = self.get_end_of_work_time() if now >= end_time and roi.off_duty_confirm_time: # 计算离岗时长:下班时间 - 离岗确认时间 off_duty_duration = (end_time.timestamp() - roi.off_duty_confirm_time) # 超过6分钟且不在边缘时间,使用大模型判断 if off_duty_duration >= self.off_duty_alert_threshold_sec and not is_edge: # 对于初始化状态,即使只有1帧也要进行分析 is_initial_state = any('初始化状态' in f.get('event', '') for f in roi.key_frames) min_frames_required = 1 if is_initial_state else 2 if self.llm_client and len(roi.key_frames) >= min_frames_required: # 限制关键帧数量,只保留最近10帧 if len(roi.key_frames) > 10: roi.key_frames = roi.key_frames[-10:] # 准备关键帧信息 key_frames_info = { 'frames': roi.key_frames[-5:] if len(roi.key_frames) >= 2 else roi.key_frames, # 如果有足够帧,取最近5帧;否则全部使用 'off_duty_duration': off_duty_duration } # 调用大模型分析 exceeds_6min, is_same_person, analysis_result = self.llm_client.analyze_off_duty_duration( key_frames_info, self.cam_id ) # 对于初始化状态,只要超过6分钟就告警(因为无法判断是否为同一人) if is_initial_state: should_alert = exceeds_6min if exceeds_6min is not None else (off_duty_duration >= self.off_duty_alert_threshold_sec) else: should_alert = exceeds_6min and is_same_person if should_alert: if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec: print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)") print(f"大模型分析结果: {analysis_result}") # 保存告警图片 save_alert_image( frame.copy(), self.cam_id, roi.roi_name, "离岗", f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)\n大模型分析结果:\n{analysis_result}" ) roi.last_alert_time = current_time elif not is_edge: # 如果没有大模型,直接告警 if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec: print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)") # 保存告警图片 save_alert_image( frame.copy(), self.cam_id, roi.roi_name, "离岗", f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)" ) roi.last_alert_time = current_time elif is_edge and roi.off_duty_confirm_time: # 边缘时间只记录,不告警 print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 边缘时间段,记录离岗时长: {int(off_duty_duration)}秒") self.display_frame = frame.copy() self.display_results = results # 保存检测结果用于显示 def crop_roi(self, frame, roi_points): """裁剪ROI区域,如果roi_points为None,返回整张画面""" if roi_points is None: return frame.copy() x_coords = roi_points[:, 0] y_coords = roi_points[:, 1] x_min, x_max = int(x_coords.min()), int(x_coords.max()) y_min, y_max = int(y_coords.min()), int(y_coords.max()) # 确保坐标在图像范围内 h, w = frame.shape[:2] x_min = max(0, x_min) y_min = max(0, y_min) x_max = min(w, x_max) y_max = min(h, y_max) roi_frame = frame[y_min:y_max, x_min:x_max] # 创建掩码 mask = np.zeros(frame.shape[:2], dtype=np.uint8) cv2.fillPoly(mask, [roi_points], 255) mask_roi = mask[y_min:y_max, x_min:x_max] # 应用掩码 if len(roi_frame.shape) == 3: mask_roi = cv2.cvtColor(mask_roi, cv2.COLOR_GRAY2BGR) roi_frame = cv2.bitwise_and(roi_frame, mask_roi) return roi_frame def process_perimeter(self, frame, results): """处理周界入侵检测""" current_time = time.time() boxes = results.boxes for roi in self.rois: if '周界入侵' not in roi.algorithms: continue # 检查ROI中是否有人(如果没有ROI,检查整张画面是否有人) if roi.use_full_frame: # 使用整张画面,只要检测到人就触发 roi_has_person = len(boxes) > 0 else: # 检查ROI中是否有人 roi_has_person = any( roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, (b.xyxy[0][1] + b.xyxy[0][3]) / 2) for b in boxes ) if roi_has_person: # 冷却时间检查 if current_time - roi.perimeter_last_check_time >= roi.perimeter_alert_cooldown: roi.perimeter_last_check_time = current_time # 裁剪ROI区域(如果没有ROI,使用整张画面) roi_frame = self.crop_roi(frame, roi.roi_points) # 调用大模型判断是否为工作人员 if self.llm_client: is_staff, result = self.llm_client.check_if_staff(roi_frame, self.cam_id, roi.roi_name) area_desc = "整张画面" if roi.use_full_frame else roi.roi_name if is_staff is None: # 无人情况 print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 大模型判断:{result}") elif not is_staff: # 非工作人员 print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到非工作人员(检测区域:{area_desc})") print(f"大模型判断结果: {result}") # 保存告警图片(使用区域描述作为名称,更清晰) save_alert_image( frame.copy(), self.cam_id, area_desc, # 使用area_desc而不是roi.roi_name "入侵", f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n大模型判断结果:\n{result}" ) else: # 工作人员 print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 检测到工作人员,无需告警") print(f"大模型判断结果: {result}") else: # 没有大模型时,直接告警 area_desc = "整张画面" if roi.use_full_frame else roi.roi_name print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到人员进入(检测区域:{area_desc})") # 保存告警图片(使用区域描述作为名称,更清晰) save_alert_image( frame.copy(), self.cam_id, area_desc, # 使用area_desc而不是roi.roi_name "入侵", f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n检测到人员进入" ) self.display_frame = frame.copy() self.display_results = results # 保存检测结果用于显示 def update_display(self): """更新显示""" if self.display_frame is None: return vis = self.display_frame.copy() now = datetime.datetime.now() in_work = self.in_working_hours() # 如果有检测结果,先绘制YOLO识别框 if self.display_results is not None: vis = self.display_results.plot() # 使用YOLO的plot方法绘制识别框 # 检查是否有启用人员离岗算法的ROI has_off_duty_algorithm = any('人员离岗' in roi.algorithms for roi in self.rois) # 绘制所有ROI full_frame_roi_count = 0 # 用于跟踪使用整张画面的ROI数量,避免文本重叠 for roi in self.rois: color = (0, 255, 0) # 默认绿色 thickness = 2 # 根据算法状态设置颜色 if '人员离岗' in roi.algorithms: if roi.is_on_duty: color = (0, 255, 0) # 绿色:在岗 elif roi.is_off_duty and roi.off_duty_timer_start: elapsed = time.time() - roi.off_duty_timer_start if elapsed >= roi.off_duty_threshold_sec: color = (0, 0, 255) # 红色:离岗告警 else: color = (0, 255, 255) # 黄色:离岗中 else: color = (255, 0, 0) # 蓝色:未在岗 if '周界入侵' in roi.algorithms: color = (255, 255, 0) # 青色:周界入侵区域 # 如果有ROI,绘制ROI框 if roi.roi_points is not None: cv2.polylines(vis, [roi.roi_points], True, color, thickness) # 创建半透明覆盖层 overlay = vis.copy() cv2.fillPoly(overlay, [roi.roi_points], color) cv2.addWeighted(overlay, 0.2, vis, 0.8, 0, vis) # 显示ROI名称(使用中文文本绘制函数) text_pos = tuple(roi.roi_points[0]) vis = put_chinese_text(vis, roi.roi_name, text_pos, font_size=20, color=color, thickness=1) else: # 如果没有ROI(使用整张画面),在左上角显示提示,避免重叠 display_text = f"{roi.roi_name} (整张画面)" text_y = 30 + full_frame_roi_count * 25 # 每个ROI文本向下偏移25像素 vis = put_chinese_text(vis, display_text, (10, text_y), font_size=18, color=color, thickness=1) full_frame_roi_count += 1 # 只在启用人员离岗算法时显示岗位状态 if has_off_duty_algorithm: status = "OUT OF HOURS" status_color = (128, 128, 128) if in_work: # 检查所有ROI的状态 has_on_duty = any(roi.is_on_duty for roi in self.rois if '人员离岗' in roi.algorithms) has_off_duty = any(roi.is_off_duty and roi.off_duty_timer_start for roi in self.rois if '人员离岗' in roi.algorithms) if has_on_duty: status, status_color = "ON DUTY", (0, 255, 0) elif has_off_duty: status, status_color = "OFF DUTY", (0, 255, 255) else: status, status_color = "OFF DUTY", (255, 0, 0) cv2.putText(vis, f"[{self.cam_id}] {status}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2) # 显示时间戳(所有摄像头都显示) cv2.putText(vis, now.strftime('%Y-%m-%d %H:%M:%S'), (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) cv2.imshow(f"Monitor - {self.cam_id}", vis) def main(): import signal import sys parser = argparse.ArgumentParser() parser.add_argument("--config", default="config.yaml", help="配置文件路径") args = parser.parse_args() monitor = None try: monitor = MultiCameraMonitor(args.config) # 注册信号处理,确保优雅退出 def signal_handler(sig, frame): print("\n收到退出信号,正在关闭...") if monitor: monitor.stop() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) if sys.platform != 'win32': signal.signal(signal.SIGTERM, signal_handler) monitor.run() except KeyboardInterrupt: print("\n收到键盘中断,正在关闭...") except Exception as e: print(f"程序异常: {e}") import traceback traceback.print_exc() finally: if monitor: monitor.stop() # 确保进程退出 sys.exit(0) if __name__ == "__main__": main()