From 1a94854c52904d34c1b3cf9e2419417aec647dde Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 22 Jan 2026 10:57:19 +0800 Subject: [PATCH] =?UTF-8?q?chore:=E5=88=A0=E9=99=A4=E6=97=A0=E5=85=B3?= =?UTF-8?q?=E5=8E=9F=E5=A7=8B=E7=AE=97=E6=B3=95=20Python=20=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- monitor.py | 1137 ---------------------------------------------------- 1 file changed, 1137 deletions(-) delete mode 100644 monitor.py diff --git a/monitor.py b/monitor.py deleted file mode 100644 index faa680d..0000000 --- a/monitor.py +++ /dev/null @@ -1,1137 +0,0 @@ -import cv2 -import numpy as np -import yaml -import torch -from ultralytics import YOLO -import time -import datetime -import threading -import queue -import sys -import argparse -import base64 -import os -from openai import OpenAI -from io import BytesIO -from PIL import Image, ImageDraw, ImageFont - - -def save_alert_image(frame, cam_id, roi_name, alert_type, alert_info=""): - """保存告警图片 - - Args: - frame: OpenCV图像 - cam_id: 摄像头ID - roi_name: ROI区域名称 - alert_type: 告警类型 ("离岗" 或 "入侵") - alert_info: 告警信息(可选) - """ - try: - # 创建文件夹结构 - data_dir = "data" - alert_dir = os.path.join(data_dir, alert_type) - - os.makedirs(alert_dir, exist_ok=True) - - # 生成文件名:根据告警类型使用不同的命名方式 - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - # 清理文件名中的特殊字符 - safe_roi_name = roi_name.replace("/", "_").replace("\\", "_").replace(":", "_") - - # 对于入侵告警,使用告警类型+ROI名称;对于离岗告警,使用ROI名称 - if alert_type == "入侵": - # 周界入侵:使用"入侵_区域名称"格式 - filename = f"{cam_id}_入侵_{safe_roi_name}_{timestamp}.jpg" - else: - # 离岗:使用原有格式 - filename = f"{cam_id}_{safe_roi_name}_{timestamp}.jpg" - - filepath = os.path.join(alert_dir, filename) - - # 保存图片 - cv2.imwrite(filepath, frame) - print(f"[{cam_id}] 💾 告警图片已保存: {filepath}") - - # 如果有告警信息,保存到文本文件 - if alert_info: - info_filepath = filepath.replace(".jpg", ".txt") - with open(info_filepath, 'w', encoding='utf-8') as f: - f.write(f"告警时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - f.write(f"摄像头ID: {cam_id}\n") - f.write(f"ROI区域: {roi_name}\n") - f.write(f"告警类型: {alert_type}\n") - f.write(f"告警信息:\n{alert_info}\n") - - return filepath - except Exception as e: - print(f"[{cam_id}] 保存告警图片失败: {e}") - return None - - -def put_chinese_text(img, text, position, font_size=20, color=(255, 255, 255), thickness=1): - """在OpenCV图像上绘制中文文本 - - Args: - img: OpenCV图像 (BGR格式) - text: 要显示的文本(支持中文) - position: 文本位置 (x, y) - font_size: 字体大小 - color: 颜色 (BGR格式,会被转换为RGB) - thickness: 线条粗细(PIL不支持,保留参数以兼容) - """ - try: - # 将OpenCV图像转换为PIL图像 - img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) - draw = ImageDraw.Draw(img_pil) - - # 转换颜色格式:BGR -> RGB - color_rgb = (color[2], color[1], color[0]) - - # 尝试使用系统字体 - font = None - font_paths = [ - "C:/Windows/Fonts/simhei.ttf", # 黑体 - "C:/Windows/Fonts/msyh.ttc", # 微软雅黑 - "C:/Windows/Fonts/simsun.ttc", # 宋体 - "C:/Windows/Fonts/msyhbd.ttc", # 微软雅黑 Bold - ] - - for font_path in font_paths: - if os.path.exists(font_path): - try: - font = ImageFont.truetype(font_path, font_size) - break - except: - continue - - # 如果找不到字体,使用默认字体 - if font is None: - font = ImageFont.load_default() - - # 绘制文本 - draw.text(position, text, font=font, fill=color_rgb) - - # 转换回OpenCV格式 - img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR) - return img - except Exception as e: - # 如果绘制失败,使用英文替代或直接返回原图 - print(f"中文文本绘制失败: {e},使用OpenCV默认字体") - # 降级方案:使用OpenCV绘制(可能显示为问号,但至少不会崩溃) - cv2.putText(img, text.encode('utf-8').decode('latin-1', 'ignore'), position, - cv2.FONT_HERSHEY_SIMPLEX, font_size/40, color, thickness) - return img - - -class LLMClient: - """大模型客户端,用于人员判断和离岗分析""" - def __init__(self, api_key, base_url, model_name): - self.client = OpenAI( - api_key=api_key, - base_url=base_url, - ) - self.model_name = model_name - - def frame_to_base64(self, frame): - """将OpenCV帧转换为base64编码""" - _, buffer = cv2.imencode('.jpg', frame) - img_base64 = base64.b64encode(buffer).decode('utf-8') - return img_base64 - - def check_if_staff(self, frame, cam_id, roi_name): - """判断ROI中的人员是否为工作人员""" - try: - img_base64 = self.frame_to_base64(frame) - prompt = f"""你是一个智能安防辅助系统,负责对监控画面中指定敏感区域(如高配间门口、天台、禁行通道)的人员活动进行分析。 - -请根据以下规则生成结构化响应: - -### 【判定标准】 -✅ **本单位物业员工**需满足下列条件之一: -1. **清晰可见的正式工牌**(胸前佩戴) -2. **穿着标准制服**(如:带有白色反光条的深色工程服、黄蓝工程服、白衬衫+黑领带、蓝色清洁装、浅色客服装等) -3. **行为符合岗位规范**(如巡检、维修、清洁,无徘徊、张望、翻越) - -> 注意: 满足部分关键条件(如戴有安全帽、穿有工作人员服饰、带有工牌)→ 视为员工,不生成告警。 - -### 【输出规则】 -#### 情况1:ROI区域内**无人** -→ 输出: -🟢无异常:敏感区域当前无人员活动。 -[客观描述:画面整体状态] - -#### 情况2:ROI区域内**有本单位员工** -→ 输出: -🟢无异常:检测到本单位工作人员正常作业。 -[客观描述:人数+制服类型+工牌状态+行为] - -#### 情况3:ROI区域内**有非员工或身份不明人员** -→ 输出: -🚨[区域类型]入侵告警:检测到疑似非工作人员,请立即核查。 -[客观描述:人数+衣着+工牌状态+位置+行为] - -### 【描述要求】 -- 所有描述必须**≤30字** -- 仅陈述**可观察事实**,禁止主观推测(如"意图破坏""形迹可疑") -- 使用简洁、标准化语言 - -### 【示例】 -▶ 示例1(无人): -🟢无异常:敏感区域当前无人员活动。 -高配间门口区域空旷,无人员进入。 - -▶ 示例2(员工): -🟢无异常:检测到本单位工作人员正常作业。 -1名工程人员穿带有反光条的深蓝色工服在高配间巡检。 - -▶ 示例3(非员工): -🚨天台区域入侵告警:检测到疑似非工作人员,请立即核查。 -1人穿绿色外套未佩戴工牌进入天台区域。 - ---- -请分析摄像头{cam_id}的{roi_name}区域,按照上述格式输出结果。""" - - response = self.client.chat.completions.create( - model=self.model_name, - messages=[ - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{img_base64}" - } - }, - {"type": "text", "text": prompt} - ] - } - ] - ) - result_text = response.choices[0].message.content - - # 判断是否为工作人员(根据新的输出格式) - # 如果包含"🟢无异常"且包含"工作人员",则为员工 - # 如果包含"🚨"或"入侵告警"或"非工作人员",则为非员工 - is_staff = False - if "🟢无异常" in result_text and "工作人员" in result_text: - is_staff = True - elif "🚨" in result_text or "入侵告警" in result_text or "非工作人员" in result_text: - is_staff = False - elif "无人员活动" in result_text or "无人" in result_text: - is_staff = None # 无人情况 - - return is_staff, result_text - except Exception as e: - print(f"[{cam_id}] 大模型调用失败: {e}") - return None, str(e) - - def analyze_off_duty_duration(self, key_frames_info, cam_id): - """分析离岗时长并判断是否为同一人""" - try: - frames = key_frames_info.get('frames', []) - if not frames: - return False, False, "无关键帧" - - off_duty_duration = key_frames_info.get('off_duty_duration', 0) - duration_minutes = int(off_duty_duration / 60) - duration_seconds = int(off_duty_duration % 60) - - # 构建消息内容 - content_parts = [ - { - "type": "text", - "text": f"""请分析以下关键帧图像,判断人员离岗情况。请按照以下格式简洁回答: - -【输出格式】 -1. 是否告警:[是/否] -2. 离岗时间:{duration_minutes}分{duration_seconds}秒 -3. 是否为同一人:[是/否/无法确定] -4. 简要分析:[一句话概括,不超过30字] - -要求: -- 如果离岗时间超过6分钟且确认为同一人,则告警 -- 简要分析需客观描述关键帧中人员的特征和行为变化 -- 回答要简洁明了,避免冗余描述 - -关键帧信息:""" - } - ] - - # 添加图像和说明 - for i, frame_info in enumerate(frames): - img_base64 = self.frame_to_base64(frame_info['frame']) - content_parts.append({ - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{img_base64}" - } - }) - content_parts.append({ - "type": "text", - "text": f"关键帧{i+1} - 时间: {frame_info['time']}, 事件: {frame_info['event']}" - }) - - response = self.client.chat.completions.create( - model=self.model_name, - messages=[ - { - "role": "user", - "content": content_parts - } - ] - ) - result = response.choices[0].message.content - - # 解析结果 - 更灵活的解析逻辑 - # 判断是否告警 - exceeds_6min = False - if duration_minutes >= 6: - # 如果时间已经超过6分钟,检查大模型是否确认告警 - if any(keyword in result for keyword in ["是否告警:是", "是否告警:是", "告警:是", "告警:是", "需要告警", "应告警"]): - exceeds_6min = True - elif "是否告警:否" not in result and "是否告警:否" not in result: - # 如果没有明确说否,且时间超过6分钟,默认告警 - exceeds_6min = True - else: - # 时间未超过6分钟,即使大模型说告警也不告警 - exceeds_6min = False - - # 判断是否为同一人 - is_same_person = False - if any(keyword in result for keyword in ["是否为同一人:是", "是否为同一人:是", "同一人:是", "同一人:是", "是同一人", "确认为同一人"]): - is_same_person = True - elif any(keyword in result for keyword in ["是否为同一人:否", "是否为同一人:否", "同一人:否", "同一人:否", "不是同一人", "非同一人"]): - is_same_person = False - elif "无法确定" in result or "不确定" in result: - is_same_person = False # 无法确定时,不告警 - - return exceeds_6min, is_same_person, result - except Exception as e: - print(f"[{cam_id}] 离岗分析失败: {e}") - return None, None, str(e) - - -class ThreadedFrameReader: - def __init__(self, cam_id, rtsp_url): - self.cam_id = cam_id - self.rtsp_url = rtsp_url - self._lock = threading.Lock() # 添加锁保护VideoCapture访问 - self.cap = None - try: - self.cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) - if self.cap.isOpened(): - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) - else: - print(f"[{cam_id}] 警告:无法打开视频流: {rtsp_url}") - except Exception as e: - print(f"[{cam_id}] 初始化VideoCapture失败: {e}") - self.q = queue.Queue(maxsize=2) - self.running = True - self.thread = threading.Thread(target=self._reader, daemon=True) - self.thread.start() - - def _reader(self): - """读取帧的线程函数""" - try: - while self.running: - with self._lock: - if self.cap is None or not self.cap.isOpened(): - break - ret, frame = self.cap.read() - - if not ret: - time.sleep(0.1) - continue - - if self.q.full(): - try: - self.q.get_nowait() - except queue.Empty: - pass - self.q.put(frame) - except Exception as e: - print(f"[{self.cam_id}] 读取帧线程异常: {e}") - finally: - # 确保资源释放(使用锁保护) - with self._lock: - if self.cap is not None: - try: - if self.cap.isOpened(): - self.cap.release() - except Exception as e: - print(f"[{self.cam_id}] 释放VideoCapture时出错: {e}") - finally: - self.cap = None - - def read(self): - if not self.q.empty(): - return True, self.q.get() - return False, None - - def release(self): - """释放资源,等待线程结束""" - if not self.running: - return # 已经释放过了 - - self.running = False - - # 等待线程结束,最多等待3秒 - if self.thread.is_alive(): - self.thread.join(timeout=3.0) - if self.thread.is_alive(): - print(f"[{self.cam_id}] 警告:读取线程未能在3秒内结束") - - # 清空队列 - while not self.q.empty(): - try: - self.q.get_nowait() - except queue.Empty: - break - - # VideoCapture的释放由_reader线程的finally块处理,这里不再重复释放 - - -class MultiCameraMonitor: - def __init__(self, config_path): - with open(config_path, 'r', encoding='utf-8') as f: - self.cfg = yaml.safe_load(f) - - # === 全局模型(只加载一次)=== - model_cfg = self.cfg['model'] - self.device = model_cfg.get('device', 'auto') - if self.device == 'auto' or not self.device: - self.device = 'cuda' if torch.cuda.is_available() else 'cpu' - print(f"🚀 全局加载模型到 {self.device}...") - self.model = YOLO(model_cfg['path']) - self.model.to(self.device) - self.use_half = (self.device == 'cuda') - if self.use_half: - print("✅ 启用 FP16 推理") - - self.imgsz = model_cfg['imgsz'] - self.conf_thresh = model_cfg['conf_threshold'] - - # === 初始化大模型客户端 === - llm_cfg = self.cfg.get('llm', {}) - if llm_cfg.get('api_key'): - self.llm_client = LLMClient( - llm_cfg['api_key'], - llm_cfg['base_url'], - llm_cfg.get('model_name', 'qwen-vl-max') - ) - print("✅ 大模型客户端已初始化") - else: - self.llm_client = None - print("⚠️ 未配置大模型API密钥,大模型功能将不可用") - - # === 初始化所有摄像头 === - self.common = self.cfg['common'] - self.cameras = {} - self.frame_readers = {} - self.queues = {} # cam_id -> queue for detection results - self.perimeter_queues = {} # cam_id -> queue for perimeter detection (每秒抽帧) - - for cam_cfg in self.cfg['cameras']: - cam_id = cam_cfg['id'] - self.cameras[cam_id] = CameraLogic(cam_id, cam_cfg, self.common, self.llm_client) - self.frame_readers[cam_id] = ThreadedFrameReader(cam_id, cam_cfg['rtsp_url']) - self.queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(人员离岗) - self.perimeter_queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(周界入侵) - - # === 控制信号 === - self.running = True - self.inference_thread = threading.Thread(target=self._inference_loop, daemon=True) - self.perimeter_thread = threading.Thread(target=self._perimeter_inference_loop, daemon=True) - self.inference_thread.start() - self.perimeter_thread.start() - - def _inference_loop(self): - """统一推理线程:轮询各摄像头最新帧,逐个推理(用于人员离岗)""" - while self.running: - processed = False - for cam_id, reader in self.frame_readers.items(): - ret, frame = reader.read() - if not ret: - continue - - cam_logic = self.cameras[cam_id] - if cam_logic.should_skip_frame(): - continue - - # 检查是否有ROI启用了人员离岗算法 - if not cam_logic.has_enabled_algorithm('人员离岗'): - continue - - results = self.model( - frame, - imgsz=self.imgsz, - conf=self.conf_thresh, - verbose=False, - device=self.device, - half=self.use_half, - classes=[0] # person only - ) - - if not self.queues[cam_id].full(): - self.queues[cam_id].put((frame.copy(), results[0])) - processed = True - - if not processed: - time.sleep(0.01) - - def _perimeter_inference_loop(self): - """周界入侵推理线程:每秒抽一帧进行检测""" - while self.running: - processed = False - for cam_id, reader in self.frame_readers.items(): - cam_logic = self.cameras[cam_id] - # 检查是否有ROI启用了周界入侵算法 - if not cam_logic.has_enabled_algorithm('周界入侵'): - continue - - ret, frame = reader.read() - if not ret: - continue - - # 每秒抽一帧 - current_time = time.time() - if not hasattr(cam_logic, 'last_perimeter_check_time'): - cam_logic.last_perimeter_check_time = {} - if cam_id not in cam_logic.last_perimeter_check_time: - cam_logic.last_perimeter_check_time[cam_id] = 0 - - if current_time - cam_logic.last_perimeter_check_time[cam_id] < 1.0: - continue - - cam_logic.last_perimeter_check_time[cam_id] = current_time - - results = self.model( - frame, - imgsz=self.imgsz, - conf=self.conf_thresh, - verbose=False, - device=self.device, - half=self.use_half, - classes=[0] # person only - ) - - if not self.perimeter_queues[cam_id].full(): - self.perimeter_queues[cam_id].put((frame.copy(), results[0])) - processed = True - - if not processed: - time.sleep(0.1) - - def run(self): - """启动所有摄像头的显示和告警逻辑(主线程)""" - try: - while self.running: - for cam_id, cam_logic in self.cameras.items(): - # 处理人员离岗检测结果 - if not self.queues[cam_id].empty(): - frame, results = self.queues[cam_id].get() - cam_logic.process_off_duty(frame, results) - - # 处理周界入侵检测结果 - if not self.perimeter_queues[cam_id].empty(): - frame, results = self.perimeter_queues[cam_id].get() - cam_logic.process_perimeter(frame, results) - - # 更新显示 - cam_logic.update_display() - - key = cv2.waitKey(1) & 0xFF - if key == ord('q'): - break - time.sleep(0.01) - except KeyboardInterrupt: - pass - finally: - self.stop() - - def stop(self): - """停止监控,清理所有资源""" - print("正在停止监控系统...") - self.running = False - - # 等待推理线程结束 - if hasattr(self, 'inference_thread') and self.inference_thread.is_alive(): - self.inference_thread.join(timeout=2.0) - - if hasattr(self, 'perimeter_thread') and self.perimeter_thread.is_alive(): - self.perimeter_thread.join(timeout=2.0) - - # 释放所有摄像头资源 - for cam_id, reader in self.frame_readers.items(): - try: - print(f"正在释放摄像头 {cam_id}...") - reader.release() - except Exception as e: - print(f"释放摄像头 {cam_id} 时出错: {e}") - - # 关闭所有窗口 - try: - cv2.destroyAllWindows() - except: - pass - - # 强制清理(如果还有线程在运行) - import sys - import os - if sys.platform == 'win32': - # Windows下可能需要额外等待 - time.sleep(0.5) - - print("监控系统已停止") - - -class ROILogic: - """单个ROI区域的逻辑处理""" - def __init__(self, roi_cfg, cam_id, common_cfg, llm_client): - self.cam_id = cam_id - self.roi_name = roi_cfg.get('name', '未命名区域') - self.llm_client = llm_client - - # 处理points:如果不存在或为空,设置为None(表示使用整张画面) - if 'points' in roi_cfg and roi_cfg['points']: - self.roi_points = np.array(roi_cfg['points'], dtype=np.int32) - self.use_full_frame = False - else: - # 对于周界入侵算法,如果没有points,使用整张画面 - self.roi_points = None - self.use_full_frame = True - - # 算法配置 - self.algorithms = {} - for alg_cfg in roi_cfg.get('algorithms', []): - alg_name = alg_cfg['name'] - if alg_cfg.get('enabled', False): - self.algorithms[alg_name] = alg_cfg - - # 人员离岗相关状态(需要ROI,如果没有points则不能启用) - if '人员离岗' in self.algorithms: - if self.roi_points is None: - print(f"[{cam_id}] 警告:{self.roi_name} 启用了人员离岗算法但没有配置points,已禁用") - del self.algorithms['人员离岗'] - else: - alg_cfg = self.algorithms['人员离岗'] - self.off_duty_threshold_sec = alg_cfg.get('off_duty_threshold_sec', 300) - self.on_duty_confirm_sec = alg_cfg.get('on_duty_confirm_sec', 5) - self.off_duty_confirm_sec = alg_cfg.get('off_duty_confirm_sec', 30) - - self.is_on_duty = False - self.is_off_duty = True - self.on_duty_start_time = None - self.last_no_person_time = None - self.off_duty_timer_start = None - self.last_alert_time = 0 - self.last_person_seen_time = None - - # 关键时间记录 - self.on_duty_confirm_time = None # 上岗确认时间 - self.off_duty_confirm_time = None # 离岗确认时间 - self.key_frames = [] # 关键帧存储 - - # 初始化状态跟踪 - self.initial_state_start_time = None # 初始化状态开始时间(进入工作时间时) - self.has_ever_seen_person = False # 是否曾经检测到过人员 - self.initial_state_frame = None # 初始化状态时的帧(用于大模型分析) - - # 周界入侵相关状态(如果没有points,使用整张画面) - if '周界入侵' in self.algorithms: - self.perimeter_last_check_time = 0 - self.perimeter_alert_cooldown = 120 # 周界入侵告警冷却120秒(2分钟) - if self.use_full_frame: - print(f"[{cam_id}] 提示:{self.roi_name} 周界入侵算法将使用整张画面进行检测") - - def is_point_in_roi(self, x, y): - """判断点是否在ROI内,如果没有ROI(use_full_frame=True),总是返回True""" - if self.use_full_frame or self.roi_points is None: - return True - return cv2.pointPolygonTest(self.roi_points, (int(x), int(y)), False) >= 0 - - -class CameraLogic: - def __init__(self, cam_id, cam_cfg, common_cfg, llm_client): - self.cam_id = cam_id - self.llm_client = llm_client - - # 工作时间段配置 - self.working_hours = common_cfg.get('working_hours', [[8, 30, 11, 0], [12, 0, 17, 30]]) - self.process_every_n = cam_cfg.get('process_every_n_frames', common_cfg['process_every_n_frames']) - self.alert_cooldown_sec = common_cfg.get('alert_cooldown_sec', 300) - self.off_duty_alert_threshold_sec = common_cfg.get('off_duty_alert_threshold_sec', 360) # 6分钟 - - # 初始化所有ROI - self.rois = [] - for roi_cfg in cam_cfg.get('rois', []): - self.rois.append(ROILogic(roi_cfg, cam_id, common_cfg, llm_client)) - - # 兼容旧配置格式 - if 'roi_points' in cam_cfg: - # 创建默认ROI用于人员离岗 - default_roi = { - 'name': '离岗检测区域', - 'points': cam_cfg['roi_points'], - 'algorithms': [{ - 'name': '人员离岗', - 'enabled': True, - 'off_duty_threshold_sec': cam_cfg.get('off_duty_threshold_sec', 300), - 'on_duty_confirm_sec': cam_cfg.get('on_duty_confirm_sec', 5), - 'off_duty_confirm_sec': cam_cfg.get('off_duty_confirm_sec', 30) - }] - } - self.rois.append(ROILogic(default_roi, cam_id, common_cfg, llm_client)) - - self.frame_count = 0 - self.display_frame = None # 用于显示的帧 - self.display_results = None # 用于显示的检测结果(YOLO results) - - def should_skip_frame(self): - self.frame_count += 1 - return self.frame_count % self.process_every_n != 0 - - def has_enabled_algorithm(self, alg_name): - """检查是否有ROI启用了指定算法""" - return any(alg_name in roi.algorithms for roi in self.rois) - - def in_working_hours(self): - """判断是否在工作时间段内""" - now = datetime.datetime.now() - h, m = now.hour, now.minute - current_minutes = h * 60 + m - - for period in self.working_hours: - start_h, start_m, end_h, end_m = period - start_minutes = start_h * 60 + start_m - end_minutes = end_h * 60 + end_m - if start_minutes <= current_minutes < end_minutes: - return True - return False - - def is_edge_time(self): - """判断是否为边缘时间段(8:30-9:00, 11:00-12:00, 17:30-18:00)""" - now = datetime.datetime.now() - h, m = now.hour, now.minute - current_minutes = h * 60 + m - - edge_periods = [ - (8 * 60 + 30, 9 * 60), # 8:30-9:00 - (11 * 60, 12 * 60), # 11:00-12:00 - (17 * 60 + 30, 18 * 60) # 17:30-18:00 - ] - - for start, end in edge_periods: - if start <= current_minutes < end: - return True - return False - - def get_end_of_work_time(self): - """获取当天工作结束时间(17:30)""" - now = datetime.datetime.now() - end_time = now.replace(hour=17, minute=30, second=0, microsecond=0) - if now > end_time: - # 如果已经过了17:30,返回明天的17:30 - end_time += datetime.timedelta(days=1) - return end_time - - def process_off_duty(self, frame, results): - """处理人员离岗检测""" - current_time = time.time() - now = datetime.datetime.now() - boxes = results.boxes - - for roi in self.rois: - if '人员离岗' not in roi.algorithms: - continue - - # 检查ROI中是否有人 - roi_has_person = any( - roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, - (b.xyxy[0][1] + b.xyxy[0][3]) / 2) - for b in boxes - ) - - in_work = self.in_working_hours() - is_edge = self.is_edge_time() - - if in_work: - # 初始化状态跟踪:如果刚进入工作时间,记录开始时间 - if roi.initial_state_start_time is None: - roi.initial_state_start_time = current_time - roi.has_ever_seen_person = False - roi.initial_state_frame = frame.copy() # 保存初始化状态时的帧 - - if roi_has_person: - roi.last_person_seen_time = current_time - roi.has_ever_seen_person = True - # 如果检测到人员,清除初始化状态 - if roi.initial_state_start_time is not None: - roi.initial_state_start_time = None - roi.initial_state_frame = None - - effective = ( - roi.last_person_seen_time is not None and - (current_time - roi.last_person_seen_time) < 1.0 - ) - - # 处理初始化状态:如果系统启动时没有人,且超过离岗确认时间 - if not roi.has_ever_seen_person and roi.initial_state_start_time is not None: - elapsed_since_start = current_time - roi.initial_state_start_time - if elapsed_since_start >= roi.off_duty_confirm_sec: - # 超过离岗确认时间,触发离岗确认逻辑 - roi.is_off_duty, roi.is_on_duty = True, False - roi.off_duty_confirm_time = roi.initial_state_start_time + roi.off_duty_confirm_sec # 使用离岗确认时间点 - roi.off_duty_timer_start = current_time - - # 保存关键帧(使用初始化状态时的帧作为离岗确认帧) - if roi.initial_state_frame is not None: - roi.key_frames.append({ - 'frame': roi.initial_state_frame.copy(), - 'time': datetime.datetime.fromtimestamp(roi.off_duty_confirm_time).strftime('%Y-%m-%d %H:%M:%S'), - 'event': '离岗确认(初始化状态)' - }) - # 也保存当前帧 - roi.key_frames.append({ - 'frame': frame.copy(), - 'time': now.strftime('%Y-%m-%d %H:%M:%S'), - 'event': '当前状态' - }) - - print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 初始化状态:超过离岗确认时间,进入离岗计时 ({now.strftime('%H:%M:%S')})") - roi.initial_state_start_time = None # 清除初始化状态标记 - roi.initial_state_frame = None - - if effective: - roi.last_no_person_time = None - if roi.is_off_duty: - if roi.on_duty_start_time is None: - roi.on_duty_start_time = current_time - elif current_time - roi.on_duty_start_time >= roi.on_duty_confirm_sec: - roi.is_on_duty, roi.is_off_duty = True, False - roi.on_duty_confirm_time = current_time - roi.on_duty_start_time = None - - # 保存关键帧 - roi.key_frames.append({ - 'frame': frame.copy(), - 'time': now.strftime('%Y-%m-%d %H:%M:%S'), - 'event': '上岗确认' - }) - - print(f"[{self.cam_id}] [{roi.roi_name}] ✅ 上岗确认成功 ({now.strftime('%H:%M:%S')})") - else: - roi.on_duty_start_time = None - roi.last_person_seen_time = None - if not roi.is_off_duty: - if roi.last_no_person_time is None: - roi.last_no_person_time = current_time - elif current_time - roi.last_no_person_time >= roi.off_duty_confirm_sec: - roi.is_off_duty, roi.is_on_duty = True, False - roi.off_duty_confirm_time = current_time - roi.last_no_person_time = None - roi.off_duty_timer_start = current_time - - # 保存关键帧 - roi.key_frames.append({ - 'frame': frame.copy(), - 'time': now.strftime('%Y-%m-%d %H:%M:%S'), - 'event': '离岗确认' - }) - - print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 进入离岗计时 ({now.strftime('%H:%M:%S')})") - - # 离岗告警逻辑(边缘时间只记录,不告警) - if roi.is_off_duty and roi.off_duty_timer_start: - elapsed = current_time - roi.off_duty_timer_start - off_duty_duration = elapsed - - # 如果到了下班时间还没回来,计算到下班时间的离岗时长 - end_time = self.get_end_of_work_time() - if now >= end_time and roi.off_duty_confirm_time: - # 计算离岗时长:下班时间 - 离岗确认时间 - off_duty_duration = (end_time.timestamp() - roi.off_duty_confirm_time) - - # 超过6分钟且不在边缘时间,使用大模型判断 - if off_duty_duration >= self.off_duty_alert_threshold_sec and not is_edge: - # 对于初始化状态,即使只有1帧也要进行分析 - is_initial_state = any('初始化状态' in f.get('event', '') for f in roi.key_frames) - min_frames_required = 1 if is_initial_state else 2 - - if self.llm_client and len(roi.key_frames) >= min_frames_required: - # 限制关键帧数量,只保留最近10帧 - if len(roi.key_frames) > 10: - roi.key_frames = roi.key_frames[-10:] - - # 准备关键帧信息 - key_frames_info = { - 'frames': roi.key_frames[-5:] if len(roi.key_frames) >= 2 else roi.key_frames, # 如果有足够帧,取最近5帧;否则全部使用 - 'off_duty_duration': off_duty_duration - } - - # 调用大模型分析 - exceeds_6min, is_same_person, analysis_result = self.llm_client.analyze_off_duty_duration( - key_frames_info, self.cam_id - ) - - # 对于初始化状态,只要超过6分钟就告警(因为无法判断是否为同一人) - if is_initial_state: - should_alert = exceeds_6min if exceeds_6min is not None else (off_duty_duration >= self.off_duty_alert_threshold_sec) - else: - should_alert = exceeds_6min and is_same_person - - if should_alert: - if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec: - print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)") - print(f"大模型分析结果: {analysis_result}") - # 保存告警图片 - save_alert_image( - frame.copy(), - self.cam_id, - roi.roi_name, - "离岗", - f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)\n大模型分析结果:\n{analysis_result}" - ) - roi.last_alert_time = current_time - elif not is_edge: - # 如果没有大模型,直接告警 - if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec: - print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)") - # 保存告警图片 - save_alert_image( - frame.copy(), - self.cam_id, - roi.roi_name, - "离岗", - f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)" - ) - roi.last_alert_time = current_time - elif is_edge and roi.off_duty_confirm_time: - # 边缘时间只记录,不告警 - print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 边缘时间段,记录离岗时长: {int(off_duty_duration)}秒") - - self.display_frame = frame.copy() - self.display_results = results # 保存检测结果用于显示 - - def crop_roi(self, frame, roi_points): - """裁剪ROI区域,如果roi_points为None,返回整张画面""" - if roi_points is None: - return frame.copy() - - x_coords = roi_points[:, 0] - y_coords = roi_points[:, 1] - x_min, x_max = int(x_coords.min()), int(x_coords.max()) - y_min, y_max = int(y_coords.min()), int(y_coords.max()) - - # 确保坐标在图像范围内 - h, w = frame.shape[:2] - x_min = max(0, x_min) - y_min = max(0, y_min) - x_max = min(w, x_max) - y_max = min(h, y_max) - - roi_frame = frame[y_min:y_max, x_min:x_max] - - # 创建掩码 - mask = np.zeros(frame.shape[:2], dtype=np.uint8) - cv2.fillPoly(mask, [roi_points], 255) - mask_roi = mask[y_min:y_max, x_min:x_max] - - # 应用掩码 - if len(roi_frame.shape) == 3: - mask_roi = cv2.cvtColor(mask_roi, cv2.COLOR_GRAY2BGR) - roi_frame = cv2.bitwise_and(roi_frame, mask_roi) - - return roi_frame - - def process_perimeter(self, frame, results): - """处理周界入侵检测""" - current_time = time.time() - boxes = results.boxes - - for roi in self.rois: - if '周界入侵' not in roi.algorithms: - continue - - # 检查ROI中是否有人(如果没有ROI,检查整张画面是否有人) - if roi.use_full_frame: - # 使用整张画面,只要检测到人就触发 - roi_has_person = len(boxes) > 0 - else: - # 检查ROI中是否有人 - roi_has_person = any( - roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, - (b.xyxy[0][1] + b.xyxy[0][3]) / 2) - for b in boxes - ) - - if roi_has_person: - # 冷却时间检查 - if current_time - roi.perimeter_last_check_time >= roi.perimeter_alert_cooldown: - roi.perimeter_last_check_time = current_time - - # 裁剪ROI区域(如果没有ROI,使用整张画面) - roi_frame = self.crop_roi(frame, roi.roi_points) - - # 调用大模型判断是否为工作人员 - if self.llm_client: - is_staff, result = self.llm_client.check_if_staff(roi_frame, self.cam_id, roi.roi_name) - area_desc = "整张画面" if roi.use_full_frame else roi.roi_name - - if is_staff is None: - # 无人情况 - print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 大模型判断:{result}") - elif not is_staff: - # 非工作人员 - print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到非工作人员(检测区域:{area_desc})") - print(f"大模型判断结果: {result}") - # 保存告警图片(使用区域描述作为名称,更清晰) - save_alert_image( - frame.copy(), - self.cam_id, - area_desc, # 使用area_desc而不是roi.roi_name - "入侵", - f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n大模型判断结果:\n{result}" - ) - else: - # 工作人员 - print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 检测到工作人员,无需告警") - print(f"大模型判断结果: {result}") - else: - # 没有大模型时,直接告警 - area_desc = "整张画面" if roi.use_full_frame else roi.roi_name - print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到人员进入(检测区域:{area_desc})") - # 保存告警图片(使用区域描述作为名称,更清晰) - save_alert_image( - frame.copy(), - self.cam_id, - area_desc, # 使用area_desc而不是roi.roi_name - "入侵", - f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n检测到人员进入" - ) - - self.display_frame = frame.copy() - self.display_results = results # 保存检测结果用于显示 - - def update_display(self): - """更新显示""" - if self.display_frame is None: - return - - vis = self.display_frame.copy() - now = datetime.datetime.now() - in_work = self.in_working_hours() - - # 如果有检测结果,先绘制YOLO识别框 - if self.display_results is not None: - vis = self.display_results.plot() # 使用YOLO的plot方法绘制识别框 - - # 检查是否有启用人员离岗算法的ROI - has_off_duty_algorithm = any('人员离岗' in roi.algorithms for roi in self.rois) - - # 绘制所有ROI - full_frame_roi_count = 0 # 用于跟踪使用整张画面的ROI数量,避免文本重叠 - for roi in self.rois: - color = (0, 255, 0) # 默认绿色 - thickness = 2 - - # 根据算法状态设置颜色 - if '人员离岗' in roi.algorithms: - if roi.is_on_duty: - color = (0, 255, 0) # 绿色:在岗 - elif roi.is_off_duty and roi.off_duty_timer_start: - elapsed = time.time() - roi.off_duty_timer_start - if elapsed >= roi.off_duty_threshold_sec: - color = (0, 0, 255) # 红色:离岗告警 - else: - color = (0, 255, 255) # 黄色:离岗中 - else: - color = (255, 0, 0) # 蓝色:未在岗 - - if '周界入侵' in roi.algorithms: - color = (255, 255, 0) # 青色:周界入侵区域 - - # 如果有ROI,绘制ROI框 - if roi.roi_points is not None: - cv2.polylines(vis, [roi.roi_points], True, color, thickness) - # 创建半透明覆盖层 - overlay = vis.copy() - cv2.fillPoly(overlay, [roi.roi_points], color) - cv2.addWeighted(overlay, 0.2, vis, 0.8, 0, vis) - - # 显示ROI名称(使用中文文本绘制函数) - text_pos = tuple(roi.roi_points[0]) - vis = put_chinese_text(vis, roi.roi_name, text_pos, font_size=20, color=color, thickness=1) - else: - # 如果没有ROI(使用整张画面),在左上角显示提示,避免重叠 - display_text = f"{roi.roi_name} (整张画面)" - text_y = 30 + full_frame_roi_count * 25 # 每个ROI文本向下偏移25像素 - vis = put_chinese_text(vis, display_text, (10, text_y), font_size=18, color=color, thickness=1) - full_frame_roi_count += 1 - - # 只在启用人员离岗算法时显示岗位状态 - if has_off_duty_algorithm: - status = "OUT OF HOURS" - status_color = (128, 128, 128) - if in_work: - # 检查所有ROI的状态 - has_on_duty = any(roi.is_on_duty for roi in self.rois if '人员离岗' in roi.algorithms) - has_off_duty = any(roi.is_off_duty and roi.off_duty_timer_start - for roi in self.rois if '人员离岗' in roi.algorithms) - - if has_on_duty: - status, status_color = "ON DUTY", (0, 255, 0) - elif has_off_duty: - status, status_color = "OFF DUTY", (0, 255, 255) - else: - status, status_color = "OFF DUTY", (255, 0, 0) - - cv2.putText(vis, f"[{self.cam_id}] {status}", (20, 50), - cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2) - - # 显示时间戳(所有摄像头都显示) - cv2.putText(vis, now.strftime('%Y-%m-%d %H:%M:%S'), (20, 90), - cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) - cv2.imshow(f"Monitor - {self.cam_id}", vis) - - -def main(): - import signal - import sys - - parser = argparse.ArgumentParser() - parser.add_argument("--config", default="config.yaml", help="配置文件路径") - args = parser.parse_args() - - monitor = None - try: - monitor = MultiCameraMonitor(args.config) - - # 注册信号处理,确保优雅退出 - def signal_handler(sig, frame): - print("\n收到退出信号,正在关闭...") - if monitor: - monitor.stop() - sys.exit(0) - - signal.signal(signal.SIGINT, signal_handler) - if sys.platform != 'win32': - signal.signal(signal.SIGTERM, signal_handler) - - monitor.run() - except KeyboardInterrupt: - print("\n收到键盘中断,正在关闭...") - except Exception as e: - print(f"程序异常: {e}") - import traceback - traceback.print_exc() - finally: - if monitor: - monitor.stop() - # 确保进程退出 - sys.exit(0) - - -if __name__ == "__main__": - main() \ No newline at end of file