From 35a1d1ae50ed6991465c6d4c8b863d0c17ad3c2d Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Wed, 14 Jan 2026 17:12:16 +0800 Subject: [PATCH] . --- config.yaml | 112 ++++-- monitor.py | 997 +++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 954 insertions(+), 155 deletions(-) diff --git a/config.yaml b/config.yaml index 83efea5..6162e80 100644 --- a/config.yaml +++ b/config.yaml @@ -4,40 +4,104 @@ model: conf_threshold: 0.5 device: "cuda" # cuda, cpu +# 大模型配置 +llm: + api_key: "sk-21e61bef09074682b589da3bdbfe07a2" # 请替换为实际的API密钥(阿里云DashScope API Key) + base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1/" + model_name: "qwen3-vl-flash" # 模型名称,可选:qwen-vl-max, qwen-vl-plus, qwen3-vl-flash等 + common: - working_hours: [9, 17] # 工作时间:9:00 ~ 17:00(24小时制) - process_every_n_frames: 3 # 每3帧处理1帧 + # 工作时间段:支持多个时间段,格式为 [开始小时, 开始分钟, 结束小时, 结束分钟] + # 8:30-11:00, 12:00-17:30 + working_hours: + - [8, 30, 11, 0] # 8:30-11:00 + - [12, 0, 17, 30] # 12:00-17:30 + process_every_n_frames: 3 # 每3帧处理1帧(用于人员离岗) alert_cooldown_sec: 300 # 离岗告警冷却(秒) - crowd_cooldown_sec: 180 # 聚集告警冷却(秒) - entry_grace_period_sec: 1.0 # 入岗保护期(防漏检) + off_duty_alert_threshold_sec: 360 # 离岗超过6分钟(360秒)触发告警 cameras: - id: "cam_01" rtsp_url: "rtsp://admin:admin@172.16.8.19:554/cam/realmonitor?channel=16&subtype=1" - roi_points: [[380, 50], [530, 100], [550, 550], [140, 420]] # 离岗检测区域 - crowd_roi_points: [[220, 50], [380, 60], [180, 525], [0, 500]] # 聚集检测区域 - off_duty_threshold_sec: 300 # 离岗超时告警(秒) - on_duty_confirm_sec: 5 # 上岗确认时间(秒) process_every_n_frames: 5 - off_duty_confirm_sec: 30 # 离岗确认时间(秒) - crowd_threshold: 5 # 聚集人数阈值(最低触发) - + rois: + - name: "离岗检测区域" + points: [[380, 50], [530, 100], [550, 550], [140, 420]] + algorithms: + - name: "人员离岗" + enabled: true + off_duty_threshold_sec: 300 # 离岗超时告警(秒) + on_duty_confirm_sec: 5 # 上岗确认时间(秒) + off_duty_confirm_sec: 30 # 离岗确认时间(秒) + - name: "周界入侵" + enabled: false + # - name: "周界入侵区域1" + # points: [[100, 100], [200, 100], [200, 300], [100, 300]] + # algorithms: + # - name: "人员离岗" + # enabled: false + # - name: "周界入侵" + # enabled: false - id: "cam_02" rtsp_url: "rtsp://admin:admin@172.16.8.13:554/cam/realmonitor?channel=7&subtype=1" - roi_points: [ [ 380, 50 ], [ 530, 100 ], [ 550, 550 ], [ 140, 420 ] ] # 离岗检测区域 - crowd_roi_points: [ [ 220, 50 ], [ 380, 60 ], [ 180, 525 ], [ 0, 500 ] ] # 聚集检测区域 - off_duty_threshold_sec: 600 - on_duty_confirm_sec: 10 - off_duty_confirm_sec: 20 - crowd_threshold: 3 - + rois: + - name: "离岗检测区域" + points: [[380, 50], [530, 100], [550, 550], [140, 420]] + algorithms: + - name: "人员离岗" + enabled: true + off_duty_threshold_sec: 600 + on_duty_confirm_sec: 10 + off_duty_confirm_sec: 20 + - name: "周界入侵" + enabled: false - id: "cam_03" rtsp_url: "rtsp://admin:admin@172.16.8.26:554/cam/realmonitor?channel=3&subtype=1" - roi_points: [ [ 380, 50 ], [ 530, 100 ], [ 550, 550 ], [ 140, 420 ] ] # 离岗检测区域 - crowd_roi_points: [ [ 220, 50 ], [ 380, 60 ], [ 180, 525 ], [ 0, 500 ] ] # 聚集检测区域 - off_duty_threshold_sec: 600 - on_duty_confirm_sec: 10 - off_duty_confirm_sec: 20 - crowd_threshold: 3 \ No newline at end of file + rois: + - name: "离岗检测区域" + points: [[380, 50], [530, 100], [550, 550], [140, 420]] + algorithms: + - name: "人员离岗" + enabled: true + off_duty_threshold_sec: 600 + on_duty_confirm_sec: 10 + off_duty_confirm_sec: 20 + - name: "周界入侵" + enabled: false + + - id: "cam_04" + rtsp_url: "rtsp://admin:admin@172.16.8.20:554/cam/realmonitor?channel=14&subtype=1" + process_every_n_frames: 5 + rois: + - name: "离岗检测区域" +# points: [ [ 380, 50 ], [ 530, 100 ], [ 550, 550 ], [ 140, 420 ] ] + algorithms: + - name: "人员离岗" + enabled: false + - name: "周界入侵" + enabled: true + - id: "cam_05" + rtsp_url: "rtsp://admin:admin@172.16.8.31:554/cam/realmonitor?channel=15&subtype=1" + process_every_n_frames: 5 + rois: + - name: "离岗检测区域" +# points: [ [ 380, 50 ], [ 530, 100 ], [ 550, 550 ], [ 140, 420 ] ] + algorithms: + - name: "人员离岗" + enabled: false # 离岗确认时间(秒) + - name: "周界入侵" + enabled: true + + - id: "cam_06" + rtsp_url: "rtsp://admin:admin@172.16.8.35:554/cam/realmonitor?channel=13&subtype=1" + process_every_n_frames: 5 + rois: + - name: "离岗检测区域" + points: [ [ 150, 100 ], [ 600, 100 ], [ 600, 500 ], [ 150, 500 ] ] + algorithms: + - name: "人员离岗" + enabled: false + - name: "周界入侵" + enabled: true \ No newline at end of file diff --git a/monitor.py b/monitor.py index d1aa976..b14053d 100644 --- a/monitor.py +++ b/monitor.py @@ -9,7 +9,299 @@ import threading import queue import sys import argparse -from collections import defaultdict +import base64 +import os +from openai import OpenAI +from io import BytesIO +from PIL import Image, ImageDraw, ImageFont + + +def save_alert_image(frame, cam_id, roi_name, alert_type, alert_info=""): + """保存告警图片 + + Args: + frame: OpenCV图像 + cam_id: 摄像头ID + roi_name: ROI区域名称 + alert_type: 告警类型 ("离岗" 或 "入侵") + alert_info: 告警信息(可选) + """ + try: + # 创建文件夹结构 + data_dir = "data" + alert_dir = os.path.join(data_dir, alert_type) + + os.makedirs(alert_dir, exist_ok=True) + + # 生成文件名:摄像头ID_ROI名称_时间戳.jpg + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + # 清理文件名中的特殊字符 + safe_roi_name = roi_name.replace("/", "_").replace("\\", "_").replace(":", "_") + filename = f"{cam_id}_{safe_roi_name}_{timestamp}.jpg" + filepath = os.path.join(alert_dir, filename) + + # 保存图片 + cv2.imwrite(filepath, frame) + print(f"[{cam_id}] 💾 告警图片已保存: {filepath}") + + # 如果有告警信息,保存到文本文件 + if alert_info: + info_filepath = filepath.replace(".jpg", ".txt") + with open(info_filepath, 'w', encoding='utf-8') as f: + f.write(f"告警时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"摄像头ID: {cam_id}\n") + f.write(f"ROI区域: {roi_name}\n") + f.write(f"告警类型: {alert_type}\n") + f.write(f"告警信息:\n{alert_info}\n") + + return filepath + except Exception as e: + print(f"[{cam_id}] 保存告警图片失败: {e}") + return None + + +def put_chinese_text(img, text, position, font_size=20, color=(255, 255, 255), thickness=1): + """在OpenCV图像上绘制中文文本 + + Args: + img: OpenCV图像 (BGR格式) + text: 要显示的文本(支持中文) + position: 文本位置 (x, y) + font_size: 字体大小 + color: 颜色 (BGR格式,会被转换为RGB) + thickness: 线条粗细(PIL不支持,保留参数以兼容) + """ + try: + # 将OpenCV图像转换为PIL图像 + img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) + draw = ImageDraw.Draw(img_pil) + + # 转换颜色格式:BGR -> RGB + color_rgb = (color[2], color[1], color[0]) + + # 尝试使用系统字体 + font = None + font_paths = [ + "C:/Windows/Fonts/simhei.ttf", # 黑体 + "C:/Windows/Fonts/msyh.ttc", # 微软雅黑 + "C:/Windows/Fonts/simsun.ttc", # 宋体 + "C:/Windows/Fonts/msyhbd.ttc", # 微软雅黑 Bold + ] + + for font_path in font_paths: + if os.path.exists(font_path): + try: + font = ImageFont.truetype(font_path, font_size) + break + except: + continue + + # 如果找不到字体,使用默认字体 + if font is None: + font = ImageFont.load_default() + + # 绘制文本 + draw.text(position, text, font=font, fill=color_rgb) + + # 转换回OpenCV格式 + img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR) + return img + except Exception as e: + # 如果绘制失败,使用英文替代或直接返回原图 + print(f"中文文本绘制失败: {e},使用OpenCV默认字体") + # 降级方案:使用OpenCV绘制(可能显示为问号,但至少不会崩溃) + cv2.putText(img, text.encode('utf-8').decode('latin-1', 'ignore'), position, + cv2.FONT_HERSHEY_SIMPLEX, font_size/40, color, thickness) + return img + + +class LLMClient: + """大模型客户端,用于人员判断和离岗分析""" + def __init__(self, api_key, base_url, model_name): + self.client = OpenAI( + api_key=api_key, + base_url=base_url, + ) + self.model_name = model_name + + def frame_to_base64(self, frame): + """将OpenCV帧转换为base64编码""" + _, buffer = cv2.imencode('.jpg', frame) + img_base64 = base64.b64encode(buffer).decode('utf-8') + return img_base64 + + def check_if_staff(self, frame, cam_id, roi_name): + """判断ROI中的人员是否为工作人员""" + try: + img_base64 = self.frame_to_base64(frame) + prompt = f"""你是一个智能安防辅助系统,负责对监控画面中指定敏感区域(如高配间门口、天台、禁行通道)的人员活动进行分析。 + +请根据以下规则生成结构化响应: + +### 【判定标准】 +✅ **本单位物业员工**需同时满足: +1. **清晰可见的正式工牌**(胸前佩戴,含照片/姓名/公司LOGO) +2. **穿着标准制服**(如:黄蓝工程服、白衬衫+黑领带、蓝色清洁装、浅色客服装等) +3. **行为符合岗位规范**(如巡检、维修、清洁,无徘徊、张望、翻越) + +> ⚠️ 仅满足部分条件(如仅有安全帽、仅有类似颜色衣服、工牌不可见)→ 视为非员工。 + +### 【输出规则】 +#### 情况1:ROI区域内**无人** +→ 输出: +🟢无异常:敏感区域当前无人员活动。 +[客观描述:画面整体状态] + +#### 情况2:ROI区域内**有本单位员工** +→ 输出: +🟢无异常:检测到本单位工作人员正常作业。 +[客观描述:人数+制服类型+工牌状态+行为] + +#### 情况3:ROI区域内**有非员工或身份不明人员** +→ 输出: +🚨[区域类型]入侵告警:检测到疑似非工作人员,请立即核查。 +[客观描述:人数+衣着+工牌状态+位置+行为] + +### 【描述要求】 +- 所有描述必须**≤30字** +- 仅陈述**可观察事实**,禁止主观推测(如"意图破坏""形迹可疑") +- 使用简洁、标准化语言 + +### 【示例】 +▶ 示例1(无人): +🟢无异常:敏感区域当前无人员活动。 +高配间门口区域空旷,无人员进入。 + +▶ 示例2(员工): +🟢无异常:检测到本单位工作人员正常作业。 +1名工程人员穿黄蓝工服佩戴工牌在高配间巡检。 + +▶ 示例3(非员工): +🚨天台区域入侵告警:检测到疑似非工作人员,请立即核查。 +1人穿黑色外套未佩戴工牌进入天台区域并短暂停留。 + +--- +请分析摄像头{cam_id}的{roi_name}区域,按照上述格式输出结果。""" + + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{img_base64}" + } + }, + {"type": "text", "text": prompt} + ] + } + ] + ) + result_text = response.choices[0].message.content + + # 判断是否为工作人员(根据新的输出格式) + # 如果包含"🟢无异常"且包含"工作人员",则为员工 + # 如果包含"🚨"或"入侵告警"或"非工作人员",则为非员工 + is_staff = False + if "🟢无异常" in result_text and "工作人员" in result_text: + is_staff = True + elif "🚨" in result_text or "入侵告警" in result_text or "非工作人员" in result_text: + is_staff = False + elif "无人员活动" in result_text or "无人" in result_text: + is_staff = None # 无人情况 + + return is_staff, result_text + except Exception as e: + print(f"[{cam_id}] 大模型调用失败: {e}") + return None, str(e) + + def analyze_off_duty_duration(self, key_frames_info, cam_id): + """分析离岗时长并判断是否为同一人""" + try: + frames = key_frames_info.get('frames', []) + if not frames: + return False, False, "无关键帧" + + off_duty_duration = key_frames_info.get('off_duty_duration', 0) + duration_minutes = int(off_duty_duration / 60) + duration_seconds = int(off_duty_duration % 60) + + # 构建消息内容 + content_parts = [ + { + "type": "text", + "text": f"""请分析以下关键帧图像,判断人员离岗情况。请按照以下格式简洁回答: + +【输出格式】 +1. 是否告警:[是/否] +2. 离岗时间:{duration_minutes}分{duration_seconds}秒 +3. 是否为同一人:[是/否/无法确定] +4. 简要分析:[一句话概括,不超过30字] + +要求: +- 如果离岗时间超过6分钟且确认为同一人,则告警 +- 简要分析需客观描述关键帧中人员的特征和行为变化 +- 回答要简洁明了,避免冗余描述 + +关键帧信息:""" + } + ] + + # 添加图像和说明 + for i, frame_info in enumerate(frames): + img_base64 = self.frame_to_base64(frame_info['frame']) + content_parts.append({ + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{img_base64}" + } + }) + content_parts.append({ + "type": "text", + "text": f"关键帧{i+1} - 时间: {frame_info['time']}, 事件: {frame_info['event']}" + }) + + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + { + "role": "user", + "content": content_parts + } + ] + ) + result = response.choices[0].message.content + + # 解析结果 - 更灵活的解析逻辑 + # 判断是否告警 + exceeds_6min = False + if duration_minutes >= 6: + # 如果时间已经超过6分钟,检查大模型是否确认告警 + if any(keyword in result for keyword in ["是否告警:是", "是否告警:是", "告警:是", "告警:是", "需要告警", "应告警"]): + exceeds_6min = True + elif "是否告警:否" not in result and "是否告警:否" not in result: + # 如果没有明确说否,且时间超过6分钟,默认告警 + exceeds_6min = True + else: + # 时间未超过6分钟,即使大模型说告警也不告警 + exceeds_6min = False + + # 判断是否为同一人 + is_same_person = False + if any(keyword in result for keyword in ["是否为同一人:是", "是否为同一人:是", "同一人:是", "同一人:是", "是同一人", "确认为同一人"]): + is_same_person = True + elif any(keyword in result for keyword in ["是否为同一人:否", "是否为同一人:否", "同一人:否", "同一人:否", "不是同一人", "非同一人"]): + is_same_person = False + elif "无法确定" in result or "不确定" in result: + is_same_person = False # 无法确定时,不告警 + + return exceeds_6min, is_same_person, result + except Exception as e: + print(f"[{cam_id}] 离岗分析失败: {e}") + return None, None, str(e) class ThreadedFrameReader: @@ -66,25 +358,42 @@ class MultiCameraMonitor: self.imgsz = model_cfg['imgsz'] self.conf_thresh = model_cfg['conf_threshold'] + # === 初始化大模型客户端 === + llm_cfg = self.cfg.get('llm', {}) + if llm_cfg.get('api_key'): + self.llm_client = LLMClient( + llm_cfg['api_key'], + llm_cfg['base_url'], + llm_cfg.get('model_name', 'qwen-vl-max') + ) + print("✅ 大模型客户端已初始化") + else: + self.llm_client = None + print("⚠️ 未配置大模型API密钥,大模型功能将不可用") + # === 初始化所有摄像头 === self.common = self.cfg['common'] self.cameras = {} self.frame_readers = {} self.queues = {} # cam_id -> queue for detection results + self.perimeter_queues = {} # cam_id -> queue for perimeter detection (每秒抽帧) for cam_cfg in self.cfg['cameras']: cam_id = cam_cfg['id'] - self.cameras[cam_id] = CameraLogic(cam_id, cam_cfg, self.common) + self.cameras[cam_id] = CameraLogic(cam_id, cam_cfg, self.common, self.llm_client) self.frame_readers[cam_id] = ThreadedFrameReader(cam_id, cam_cfg['rtsp_url']) - self.queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果 + self.queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(人员离岗) + self.perimeter_queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(周界入侵) # === 控制信号 === self.running = True self.inference_thread = threading.Thread(target=self._inference_loop, daemon=True) + self.perimeter_thread = threading.Thread(target=self._perimeter_inference_loop, daemon=True) self.inference_thread.start() + self.perimeter_thread.start() def _inference_loop(self): - """统一推理线程:轮询各摄像头最新帧,逐个推理""" + """统一推理线程:轮询各摄像头最新帧,逐个推理(用于人员离岗)""" while self.running: processed = False for cam_id, reader in self.frame_readers.items(): @@ -92,12 +401,14 @@ class MultiCameraMonitor: if not ret: continue - # 获取该摄像头是否需要处理此帧 cam_logic = self.cameras[cam_id] if cam_logic.should_skip_frame(): continue - # 执行推理 + # 检查是否有ROI启用了人员离岗算法 + if not cam_logic.has_enabled_algorithm('人员离岗'): + continue + results = self.model( frame, imgsz=self.imgsz, @@ -105,30 +416,81 @@ class MultiCameraMonitor: verbose=False, device=self.device, half=self.use_half, - classes=[0] # person + classes=[0] # person only ) - # 将结果送回对应摄像头逻辑 if not self.queues[cam_id].full(): self.queues[cam_id].put((frame.copy(), results[0])) processed = True if not processed: time.sleep(0.01) + + def _perimeter_inference_loop(self): + """周界入侵推理线程:每秒抽一帧进行检测""" + while self.running: + processed = False + for cam_id, reader in self.frame_readers.items(): + cam_logic = self.cameras[cam_id] + # 检查是否有ROI启用了周界入侵算法 + if not cam_logic.has_enabled_algorithm('周界入侵'): + continue + + ret, frame = reader.read() + if not ret: + continue + + # 每秒抽一帧 + current_time = time.time() + if not hasattr(cam_logic, 'last_perimeter_check_time'): + cam_logic.last_perimeter_check_time = {} + if cam_id not in cam_logic.last_perimeter_check_time: + cam_logic.last_perimeter_check_time[cam_id] = 0 + + if current_time - cam_logic.last_perimeter_check_time[cam_id] < 1.0: + continue + + cam_logic.last_perimeter_check_time[cam_id] = current_time + + results = self.model( + frame, + imgsz=self.imgsz, + conf=self.conf_thresh, + verbose=False, + device=self.device, + half=self.use_half, + classes=[0] # person only + ) + + if not self.perimeter_queues[cam_id].full(): + self.perimeter_queues[cam_id].put((frame.copy(), results[0])) + processed = True + + if not processed: + time.sleep(0.1) def run(self): """启动所有摄像头的显示和告警逻辑(主线程)""" try: while self.running: for cam_id, cam_logic in self.cameras.items(): + # 处理人员离岗检测结果 if not self.queues[cam_id].empty(): frame, results = self.queues[cam_id].get() - cam_logic.process(frame, results) - # 让 OpenCV 刷新所有窗口 + cam_logic.process_off_duty(frame, results) + + # 处理周界入侵检测结果 + if not self.perimeter_queues[cam_id].empty(): + frame, results = self.perimeter_queues[cam_id].get() + cam_logic.process_perimeter(frame, results) + + # 更新显示 + cam_logic.update_display() + key = cv2.waitKey(1) & 0xFF - if key == ord('q'): # 可选:按 q 退出 + if key == ord('q'): break - time.sleep(0.01) # 避免 CPU 占用过高 + time.sleep(0.01) except KeyboardInterrupt: pass finally: @@ -141,140 +503,513 @@ class MultiCameraMonitor: cv2.destroyAllWindows() -class CameraLogic: - def __init__(self, cam_id, cam_cfg, common_cfg): +class ROILogic: + """单个ROI区域的逻辑处理""" + def __init__(self, roi_cfg, cam_id, common_cfg, llm_client): self.cam_id = cam_id - self.roi_off_duty = np.array(cam_cfg['roi_points'], dtype=np.int32) - self.roi_crowd = np.array(cam_cfg['crowd_roi_points'], dtype=np.int32) - self.off_duty_threshold_sec = cam_cfg['off_duty_threshold_sec'] - self.on_duty_confirm_sec = cam_cfg['on_duty_confirm_sec'] - self.off_duty_confirm_sec = cam_cfg['off_duty_confirm_sec'] - self.crowd_threshold_min = cam_cfg['crowd_threshold'] + self.roi_name = roi_cfg.get('name', '未命名区域') + self.llm_client = llm_client + + # 处理points:如果不存在或为空,设置为None(表示使用整张画面) + if 'points' in roi_cfg and roi_cfg['points']: + self.roi_points = np.array(roi_cfg['points'], dtype=np.int32) + self.use_full_frame = False + else: + # 对于周界入侵算法,如果没有points,使用整张画面 + self.roi_points = None + self.use_full_frame = True + + # 算法配置 + self.algorithms = {} + for alg_cfg in roi_cfg.get('algorithms', []): + alg_name = alg_cfg['name'] + if alg_cfg.get('enabled', False): + self.algorithms[alg_name] = alg_cfg + + # 人员离岗相关状态(需要ROI,如果没有points则不能启用) + if '人员离岗' in self.algorithms: + if self.roi_points is None: + print(f"[{cam_id}] 警告:{self.roi_name} 启用了人员离岗算法但没有配置points,已禁用") + del self.algorithms['人员离岗'] + else: + alg_cfg = self.algorithms['人员离岗'] + self.off_duty_threshold_sec = alg_cfg.get('off_duty_threshold_sec', 300) + self.on_duty_confirm_sec = alg_cfg.get('on_duty_confirm_sec', 5) + self.off_duty_confirm_sec = alg_cfg.get('off_duty_confirm_sec', 30) + + self.is_on_duty = False + self.is_off_duty = True + self.on_duty_start_time = None + self.last_no_person_time = None + self.off_duty_timer_start = None + self.last_alert_time = 0 + self.last_person_seen_time = None + + # 关键时间记录 + self.on_duty_confirm_time = None # 上岗确认时间 + self.off_duty_confirm_time = None # 离岗确认时间 + self.key_frames = [] # 关键帧存储 + + # 初始化状态跟踪 + self.initial_state_start_time = None # 初始化状态开始时间(进入工作时间时) + self.has_ever_seen_person = False # 是否曾经检测到过人员 + self.initial_state_frame = None # 初始化状态时的帧(用于大模型分析) + + # 周界入侵相关状态(如果没有points,使用整张画面) + if '周界入侵' in self.algorithms: + self.perimeter_last_check_time = 0 + self.perimeter_alert_cooldown = 60 # 周界入侵告警冷却60秒 + if self.use_full_frame: + print(f"[{cam_id}] 提示:{self.roi_name} 周界入侵算法将使用整张画面进行检测") + + def is_point_in_roi(self, x, y): + """判断点是否在ROI内,如果没有ROI(use_full_frame=True),总是返回True""" + if self.use_full_frame or self.roi_points is None: + return True + return cv2.pointPolygonTest(self.roi_points, (int(x), int(y)), False) >= 0 - self.working_hours = common_cfg['working_hours'] + +class CameraLogic: + def __init__(self, cam_id, cam_cfg, common_cfg, llm_client): + self.cam_id = cam_id + self.llm_client = llm_client + + # 工作时间段配置 + self.working_hours = common_cfg.get('working_hours', [[8, 30, 11, 0], [12, 0, 17, 30]]) self.process_every_n = cam_cfg.get('process_every_n_frames', common_cfg['process_every_n_frames']) - self.alert_cooldown_sec = cam_cfg.get('alert_cooldown_sec', common_cfg['alert_cooldown_sec']) - self.crowd_cooldown_sec = cam_cfg.get('crowd_cooldown_sec', common_cfg['crowd_cooldown_sec']) - + self.alert_cooldown_sec = common_cfg.get('alert_cooldown_sec', 300) + self.off_duty_alert_threshold_sec = common_cfg.get('off_duty_alert_threshold_sec', 360) # 6分钟 + + # 初始化所有ROI + self.rois = [] + for roi_cfg in cam_cfg.get('rois', []): + self.rois.append(ROILogic(roi_cfg, cam_id, common_cfg, llm_client)) + + # 兼容旧配置格式 + if 'roi_points' in cam_cfg: + # 创建默认ROI用于人员离岗 + default_roi = { + 'name': '离岗检测区域', + 'points': cam_cfg['roi_points'], + 'algorithms': [{ + 'name': '人员离岗', + 'enabled': True, + 'off_duty_threshold_sec': cam_cfg.get('off_duty_threshold_sec', 300), + 'on_duty_confirm_sec': cam_cfg.get('on_duty_confirm_sec', 5), + 'off_duty_confirm_sec': cam_cfg.get('off_duty_confirm_sec', 30) + }] + } + self.rois.append(ROILogic(default_roi, cam_id, common_cfg, llm_client)) + self.frame_count = 0 - self.is_on_duty = False - self.is_off_duty = True - self.on_duty_start_time = None - self.last_no_person_time = None - self.off_duty_timer_start = None - self.last_alert_time = 0 - self.last_crowd_alert_time = 0 - self.crowd_history = [] - self.last_person_seen_time = None + self.display_frame = None # 用于显示的帧 + self.display_results = None # 用于显示的检测结果(YOLO results) def should_skip_frame(self): self.frame_count += 1 return self.frame_count % self.process_every_n != 0 - - def is_point_in_roi(self, x, y, roi): - return cv2.pointPolygonTest(roi, (int(x), int(y)), False) >= 0 - + + def has_enabled_algorithm(self, alg_name): + """检查是否有ROI启用了指定算法""" + return any(alg_name in roi.algorithms for roi in self.rois) + def in_working_hours(self): + """判断是否在工作时间段内""" now = datetime.datetime.now() - h = now.hour - start_h, end_h = self.working_hours - return start_h <= h < end_h or (start_h == end_h == 0) - - def detect_crowd_count(self, boxes, roi): - count = 0 - for box in boxes: - x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() - cx, cy = (x1 + x2) / 2, (y1 + y2) / 2 - if self.is_point_in_roi(cx, cy, roi): - count += 1 - return count - - def process(self, frame, results): + h, m = now.hour, now.minute + current_minutes = h * 60 + m + + for period in self.working_hours: + start_h, start_m, end_h, end_m = period + start_minutes = start_h * 60 + start_m + end_minutes = end_h * 60 + end_m + if start_minutes <= current_minutes < end_minutes: + return True + return False + + def is_edge_time(self): + """判断是否为边缘时间段(8:30-9:00, 11:00-12:00, 17:30-18:00)""" + now = datetime.datetime.now() + h, m = now.hour, now.minute + current_minutes = h * 60 + m + + edge_periods = [ + (8 * 60 + 30, 9 * 60), # 8:30-9:00 + (11 * 60, 12 * 60), # 11:00-12:00 + (17 * 60 + 30, 18 * 60) # 17:30-18:00 + ] + + for start, end in edge_periods: + if start <= current_minutes < end: + return True + return False + + def get_end_of_work_time(self): + """获取当天工作结束时间(17:30)""" + now = datetime.datetime.now() + end_time = now.replace(hour=17, minute=30, second=0, microsecond=0) + if now > end_time: + # 如果已经过了17:30,返回明天的17:30 + end_time += datetime.timedelta(days=1) + return end_time + + def process_off_duty(self, frame, results): + """处理人员离岗检测""" current_time = time.time() now = datetime.datetime.now() - in_work = self.in_working_hours() boxes = results.boxes - - # === 离岗逻辑 === - roi_has_person_raw = any( - self.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, - (b.xyxy[0][1] + b.xyxy[0][3]) / 2, - self.roi_off_duty) - for b in boxes - ) - - if in_work: - if roi_has_person_raw: - self.last_person_seen_time = current_time - effective = ( - self.last_person_seen_time is not None and - (current_time - self.last_person_seen_time) < 1.0 # grace period + + for roi in self.rois: + if '人员离岗' not in roi.algorithms: + continue + + # 检查ROI中是否有人 + roi_has_person = any( + roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, + (b.xyxy[0][1] + b.xyxy[0][3]) / 2) + for b in boxes ) - - if effective: - self.last_no_person_time = None - if self.is_off_duty: - if self.on_duty_start_time is None: - self.on_duty_start_time = current_time - elif current_time - self.on_duty_start_time >= self.on_duty_confirm_sec: - self.is_on_duty, self.is_off_duty = True, False - self.on_duty_start_time = None - print(f"[{self.cam_id}] ✅ 上岗确认成功 ({now.strftime('%H:%M:%S')})") - else: - self.on_duty_start_time = None - self.last_person_seen_time = None - if not self.is_off_duty: - if self.last_no_person_time is None: - self.last_no_person_time = current_time - elif current_time - self.last_no_person_time >= self.off_duty_confirm_sec: - self.is_off_duty, self.is_on_duty = True, False - self.last_no_person_time = None - self.off_duty_timer_start = current_time - print(f"[{self.cam_id}] 🚪 进入离岗计时") - - # === 聚集检测 === - crowd_count = self.detect_crowd_count(boxes, self.roi_crowd) - self.crowd_history.append((current_time, crowd_count)) - self.crowd_history = [(t, c) for t, c in self.crowd_history if current_time - t <= 300] - - if in_work and crowd_count >= self.crowd_threshold_min: - recent = [c for t, c in self.crowd_history[-10:]] - if len(recent) >= 5 and sum(c >= self.crowd_threshold_min for c in recent[-5:]) >= 4: - if current_time - self.last_crowd_alert_time >= self.crowd_cooldown_sec: - print(f"[{self.cam_id}] 🚨 聚集告警:{crowd_count}人") - self.last_crowd_alert_time = current_time - - # === 离岗告警 === - if in_work and self.is_off_duty and self.off_duty_timer_start: - if (current_time - self.off_duty_timer_start) >= self.off_duty_threshold_sec: - if (current_time - self.last_alert_time) >= self.alert_cooldown_sec: - print(f"[{self.cam_id}] 🚨 离岗告警!") - self.last_alert_time = current_time - - # === 可视化 === - vis = results.plot() - overlay = vis.copy() - cv2.fillPoly(overlay, [self.roi_off_duty], (0, 255, 0)) - cv2.fillPoly(overlay, [self.roi_crowd], (0, 0, 255)) - cv2.addWeighted(overlay, 0.2, vis, 0.8, 0, vis) - - status = "OUT OF HOURS" - color = (128, 128, 128) - if in_work: - if self.is_on_duty: - status, color = "ON DUTY", (0, 255, 0) - elif self.is_off_duty: - if self.off_duty_timer_start: - elapsed = int(current_time - self.off_duty_timer_start) - if elapsed >= self.off_duty_threshold_sec: - status, color = "OFF DUTY!", (0, 0, 255) - else: - status = f"IDLE - {elapsed}s" - color = (0, 255, 255) + + in_work = self.in_working_hours() + is_edge = self.is_edge_time() + + if in_work: + # 初始化状态跟踪:如果刚进入工作时间,记录开始时间 + if roi.initial_state_start_time is None: + roi.initial_state_start_time = current_time + roi.has_ever_seen_person = False + roi.initial_state_frame = frame.copy() # 保存初始化状态时的帧 + + if roi_has_person: + roi.last_person_seen_time = current_time + roi.has_ever_seen_person = True + # 如果检测到人员,清除初始化状态 + if roi.initial_state_start_time is not None: + roi.initial_state_start_time = None + roi.initial_state_frame = None + + effective = ( + roi.last_person_seen_time is not None and + (current_time - roi.last_person_seen_time) < 1.0 + ) + + # 处理初始化状态:如果系统启动时没有人,且超过离岗确认时间 + if not roi.has_ever_seen_person and roi.initial_state_start_time is not None: + elapsed_since_start = current_time - roi.initial_state_start_time + if elapsed_since_start >= roi.off_duty_confirm_sec: + # 超过离岗确认时间,触发离岗确认逻辑 + roi.is_off_duty, roi.is_on_duty = True, False + roi.off_duty_confirm_time = roi.initial_state_start_time + roi.off_duty_confirm_sec # 使用离岗确认时间点 + roi.off_duty_timer_start = current_time + + # 保存关键帧(使用初始化状态时的帧作为离岗确认帧) + if roi.initial_state_frame is not None: + roi.key_frames.append({ + 'frame': roi.initial_state_frame.copy(), + 'time': datetime.datetime.fromtimestamp(roi.off_duty_confirm_time).strftime('%Y-%m-%d %H:%M:%S'), + 'event': '离岗确认(初始化状态)' + }) + # 也保存当前帧 + roi.key_frames.append({ + 'frame': frame.copy(), + 'time': now.strftime('%Y-%m-%d %H:%M:%S'), + 'event': '当前状态' + }) + + print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 初始化状态:超过离岗确认时间,进入离岗计时 ({now.strftime('%H:%M:%S')})") + roi.initial_state_start_time = None # 清除初始化状态标记 + roi.initial_state_frame = None + + if effective: + roi.last_no_person_time = None + if roi.is_off_duty: + if roi.on_duty_start_time is None: + roi.on_duty_start_time = current_time + elif current_time - roi.on_duty_start_time >= roi.on_duty_confirm_sec: + roi.is_on_duty, roi.is_off_duty = True, False + roi.on_duty_confirm_time = current_time + roi.on_duty_start_time = None + + # 保存关键帧 + roi.key_frames.append({ + 'frame': frame.copy(), + 'time': now.strftime('%Y-%m-%d %H:%M:%S'), + 'event': '上岗确认' + }) + + print(f"[{self.cam_id}] [{roi.roi_name}] ✅ 上岗确认成功 ({now.strftime('%H:%M:%S')})") else: - status, color = "OFF DUTY", (255, 0, 0) - - cv2.putText(vis, f"[{self.cam_id}] {status}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2) + roi.on_duty_start_time = None + roi.last_person_seen_time = None + if not roi.is_off_duty: + if roi.last_no_person_time is None: + roi.last_no_person_time = current_time + elif current_time - roi.last_no_person_time >= roi.off_duty_confirm_sec: + roi.is_off_duty, roi.is_on_duty = True, False + roi.off_duty_confirm_time = current_time + roi.last_no_person_time = None + roi.off_duty_timer_start = current_time + + # 保存关键帧 + roi.key_frames.append({ + 'frame': frame.copy(), + 'time': now.strftime('%Y-%m-%d %H:%M:%S'), + 'event': '离岗确认' + }) + + print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 进入离岗计时 ({now.strftime('%H:%M:%S')})") + + # 离岗告警逻辑(边缘时间只记录,不告警) + if roi.is_off_duty and roi.off_duty_timer_start: + elapsed = current_time - roi.off_duty_timer_start + off_duty_duration = elapsed + + # 如果到了下班时间还没回来,计算到下班时间的离岗时长 + end_time = self.get_end_of_work_time() + if now >= end_time and roi.off_duty_confirm_time: + # 计算离岗时长:下班时间 - 离岗确认时间 + off_duty_duration = (end_time.timestamp() - roi.off_duty_confirm_time) + + # 超过6分钟且不在边缘时间,使用大模型判断 + if off_duty_duration >= self.off_duty_alert_threshold_sec and not is_edge: + # 对于初始化状态,即使只有1帧也要进行分析 + is_initial_state = any('初始化状态' in f.get('event', '') for f in roi.key_frames) + min_frames_required = 1 if is_initial_state else 2 + + if self.llm_client and len(roi.key_frames) >= min_frames_required: + # 限制关键帧数量,只保留最近10帧 + if len(roi.key_frames) > 10: + roi.key_frames = roi.key_frames[-10:] + + # 准备关键帧信息 + key_frames_info = { + 'frames': roi.key_frames[-5:] if len(roi.key_frames) >= 2 else roi.key_frames, # 如果有足够帧,取最近5帧;否则全部使用 + 'off_duty_duration': off_duty_duration + } + + # 调用大模型分析 + exceeds_6min, is_same_person, analysis_result = self.llm_client.analyze_off_duty_duration( + key_frames_info, self.cam_id + ) + + # 对于初始化状态,只要超过6分钟就告警(因为无法判断是否为同一人) + if is_initial_state: + should_alert = exceeds_6min if exceeds_6min is not None else (off_duty_duration >= self.off_duty_alert_threshold_sec) + else: + should_alert = exceeds_6min and is_same_person + + if should_alert: + if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec: + print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)") + print(f"大模型分析结果: {analysis_result}") + # 保存告警图片 + save_alert_image( + frame.copy(), + self.cam_id, + roi.roi_name, + "离岗", + f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)\n大模型分析结果:\n{analysis_result}" + ) + roi.last_alert_time = current_time + elif not is_edge: + # 如果没有大模型,直接告警 + if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec: + print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)") + # 保存告警图片 + save_alert_image( + frame.copy(), + self.cam_id, + roi.roi_name, + "离岗", + f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)" + ) + roi.last_alert_time = current_time + elif is_edge and roi.off_duty_confirm_time: + # 边缘时间只记录,不告警 + print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 边缘时间段,记录离岗时长: {int(off_duty_duration)}秒") + + self.display_frame = frame.copy() + self.display_results = results # 保存检测结果用于显示 + + def crop_roi(self, frame, roi_points): + """裁剪ROI区域,如果roi_points为None,返回整张画面""" + if roi_points is None: + return frame.copy() + + x_coords = roi_points[:, 0] + y_coords = roi_points[:, 1] + x_min, x_max = int(x_coords.min()), int(x_coords.max()) + y_min, y_max = int(y_coords.min()), int(y_coords.max()) + + # 确保坐标在图像范围内 + h, w = frame.shape[:2] + x_min = max(0, x_min) + y_min = max(0, y_min) + x_max = min(w, x_max) + y_max = min(h, y_max) + + roi_frame = frame[y_min:y_max, x_min:x_max] + + # 创建掩码 + mask = np.zeros(frame.shape[:2], dtype=np.uint8) + cv2.fillPoly(mask, [roi_points], 255) + mask_roi = mask[y_min:y_max, x_min:x_max] + + # 应用掩码 + if len(roi_frame.shape) == 3: + mask_roi = cv2.cvtColor(mask_roi, cv2.COLOR_GRAY2BGR) + roi_frame = cv2.bitwise_and(roi_frame, mask_roi) + + return roi_frame + + def process_perimeter(self, frame, results): + """处理周界入侵检测""" + current_time = time.time() + boxes = results.boxes + + for roi in self.rois: + if '周界入侵' not in roi.algorithms: + continue + + # 检查ROI中是否有人(如果没有ROI,检查整张画面是否有人) + if roi.use_full_frame: + # 使用整张画面,只要检测到人就触发 + roi_has_person = len(boxes) > 0 + else: + # 检查ROI中是否有人 + roi_has_person = any( + roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2, + (b.xyxy[0][1] + b.xyxy[0][3]) / 2) + for b in boxes + ) + + if roi_has_person: + # 冷却时间检查 + if current_time - roi.perimeter_last_check_time >= roi.perimeter_alert_cooldown: + roi.perimeter_last_check_time = current_time + + # 裁剪ROI区域(如果没有ROI,使用整张画面) + roi_frame = self.crop_roi(frame, roi.roi_points) + + # 调用大模型判断是否为工作人员 + if self.llm_client: + is_staff, result = self.llm_client.check_if_staff(roi_frame, self.cam_id, roi.roi_name) + area_desc = "整张画面" if roi.use_full_frame else roi.roi_name + + if is_staff is None: + # 无人情况 + print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 大模型判断:{result}") + elif not is_staff: + # 非工作人员 + print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到非工作人员(检测区域:{area_desc})") + print(f"大模型判断结果: {result}") + # 保存告警图片 + save_alert_image( + frame.copy(), + self.cam_id, + roi.roi_name, + "入侵", + f"检测区域: {area_desc}\n大模型判断结果:\n{result}" + ) + else: + # 工作人员 + print(f"[{self.cam_id}] [{roi.roi_name}] ℹ️ 检测到工作人员,无需告警") + print(f"大模型判断结果: {result}") + else: + # 没有大模型时,直接告警 + area_desc = "整张画面" if roi.use_full_frame else roi.roi_name + print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到人员进入(检测区域:{area_desc})") + # 保存告警图片 + save_alert_image( + frame.copy(), + self.cam_id, + roi.roi_name, + "入侵", + f"检测区域: {area_desc}\n检测到人员进入" + ) + + self.display_frame = frame.copy() + self.display_results = results # 保存检测结果用于显示 + + def update_display(self): + """更新显示""" + if self.display_frame is None: + return + + vis = self.display_frame.copy() + now = datetime.datetime.now() + in_work = self.in_working_hours() + + # 如果有检测结果,先绘制YOLO识别框 + if self.display_results is not None: + vis = self.display_results.plot() # 使用YOLO的plot方法绘制识别框 + + # 检查是否有启用人员离岗算法的ROI + has_off_duty_algorithm = any('人员离岗' in roi.algorithms for roi in self.rois) + + # 绘制所有ROI + full_frame_roi_count = 0 # 用于跟踪使用整张画面的ROI数量,避免文本重叠 + for roi in self.rois: + color = (0, 255, 0) # 默认绿色 + thickness = 2 + + # 根据算法状态设置颜色 + if '人员离岗' in roi.algorithms: + if roi.is_on_duty: + color = (0, 255, 0) # 绿色:在岗 + elif roi.is_off_duty and roi.off_duty_timer_start: + elapsed = time.time() - roi.off_duty_timer_start + if elapsed >= roi.off_duty_threshold_sec: + color = (0, 0, 255) # 红色:离岗告警 + else: + color = (0, 255, 255) # 黄色:离岗中 + else: + color = (255, 0, 0) # 蓝色:未在岗 + + if '周界入侵' in roi.algorithms: + color = (255, 255, 0) # 青色:周界入侵区域 + + # 如果有ROI,绘制ROI框 + if roi.roi_points is not None: + cv2.polylines(vis, [roi.roi_points], True, color, thickness) + # 创建半透明覆盖层 + overlay = vis.copy() + cv2.fillPoly(overlay, [roi.roi_points], color) + cv2.addWeighted(overlay, 0.2, vis, 0.8, 0, vis) + + # 显示ROI名称(使用中文文本绘制函数) + text_pos = tuple(roi.roi_points[0]) + vis = put_chinese_text(vis, roi.roi_name, text_pos, font_size=20, color=color, thickness=1) + else: + # 如果没有ROI(使用整张画面),在左上角显示提示,避免重叠 + display_text = f"{roi.roi_name} (整张画面)" + text_y = 30 + full_frame_roi_count * 25 # 每个ROI文本向下偏移25像素 + vis = put_chinese_text(vis, display_text, (10, text_y), font_size=18, color=color, thickness=1) + full_frame_roi_count += 1 + + # 只在启用人员离岗算法时显示岗位状态 + if has_off_duty_algorithm: + status = "OUT OF HOURS" + status_color = (128, 128, 128) + if in_work: + # 检查所有ROI的状态 + has_on_duty = any(roi.is_on_duty for roi in self.rois if '人员离岗' in roi.algorithms) + has_off_duty = any(roi.is_off_duty and roi.off_duty_timer_start + for roi in self.rois if '人员离岗' in roi.algorithms) + + if has_on_duty: + status, status_color = "ON DUTY", (0, 255, 0) + elif has_off_duty: + status, status_color = "OFF DUTY", (0, 255, 255) + else: + status, status_color = "OFF DUTY", (255, 0, 0) + + cv2.putText(vis, f"[{self.cam_id}] {status}", (20, 50), + cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2) + + # 显示时间戳(所有摄像头都显示) + cv2.putText(vis, now.strftime('%Y-%m-%d %H:%M:%S'), (20, 90), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) cv2.imshow(f"Monitor - {self.cam_id}", vis)