Files
Security_AI_integrated/monitor.py

1137 lines
49 KiB
Python
Raw Normal View History

2026-01-12 17:38:39 +08:00
import cv2
import numpy as np
import yaml
import torch
from ultralytics import YOLO
import time
import datetime
import threading
import queue
import sys
import argparse
2026-01-14 17:12:16 +08:00
import base64
import os
from openai import OpenAI
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
def save_alert_image(frame, cam_id, roi_name, alert_type, alert_info=""):
"""保存告警图片
Args:
frame: OpenCV图像
cam_id: 摄像头ID
roi_name: ROI区域名称
alert_type: 告警类型 ("离岗" "入侵")
alert_info: 告警信息可选
"""
try:
# 创建文件夹结构
data_dir = "data"
alert_dir = os.path.join(data_dir, alert_type)
os.makedirs(alert_dir, exist_ok=True)
2026-01-20 11:39:23 +08:00
# 生成文件名:根据告警类型使用不同的命名方式
2026-01-14 17:12:16 +08:00
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# 清理文件名中的特殊字符
safe_roi_name = roi_name.replace("/", "_").replace("\\", "_").replace(":", "_")
2026-01-20 11:39:23 +08:00
# 对于入侵告警,使用告警类型+ROI名称对于离岗告警使用ROI名称
if alert_type == "入侵":
# 周界入侵:使用"入侵_区域名称"格式
filename = f"{cam_id}_入侵_{safe_roi_name}_{timestamp}.jpg"
else:
# 离岗:使用原有格式
filename = f"{cam_id}_{safe_roi_name}_{timestamp}.jpg"
2026-01-14 17:12:16 +08:00
filepath = os.path.join(alert_dir, filename)
# 保存图片
cv2.imwrite(filepath, frame)
print(f"[{cam_id}] 💾 告警图片已保存: {filepath}")
# 如果有告警信息,保存到文本文件
if alert_info:
info_filepath = filepath.replace(".jpg", ".txt")
with open(info_filepath, 'w', encoding='utf-8') as f:
f.write(f"告警时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"摄像头ID: {cam_id}\n")
f.write(f"ROI区域: {roi_name}\n")
f.write(f"告警类型: {alert_type}\n")
f.write(f"告警信息:\n{alert_info}\n")
return filepath
except Exception as e:
print(f"[{cam_id}] 保存告警图片失败: {e}")
return None
def put_chinese_text(img, text, position, font_size=20, color=(255, 255, 255), thickness=1):
"""在OpenCV图像上绘制中文文本
Args:
img: OpenCV图像 (BGR格式)
text: 要显示的文本支持中文
position: 文本位置 (x, y)
font_size: 字体大小
color: 颜色 (BGR格式会被转换为RGB)
thickness: 线条粗细PIL不支持保留参数以兼容
"""
try:
# 将OpenCV图像转换为PIL图像
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# 转换颜色格式BGR -> RGB
color_rgb = (color[2], color[1], color[0])
# 尝试使用系统字体
font = None
font_paths = [
"C:/Windows/Fonts/simhei.ttf", # 黑体
"C:/Windows/Fonts/msyh.ttc", # 微软雅黑
"C:/Windows/Fonts/simsun.ttc", # 宋体
"C:/Windows/Fonts/msyhbd.ttc", # 微软雅黑 Bold
]
for font_path in font_paths:
if os.path.exists(font_path):
try:
font = ImageFont.truetype(font_path, font_size)
break
except:
continue
# 如果找不到字体,使用默认字体
if font is None:
font = ImageFont.load_default()
# 绘制文本
draw.text(position, text, font=font, fill=color_rgb)
# 转换回OpenCV格式
img = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return img
except Exception as e:
# 如果绘制失败,使用英文替代或直接返回原图
print(f"中文文本绘制失败: {e}使用OpenCV默认字体")
# 降级方案使用OpenCV绘制可能显示为问号但至少不会崩溃
cv2.putText(img, text.encode('utf-8').decode('latin-1', 'ignore'), position,
cv2.FONT_HERSHEY_SIMPLEX, font_size/40, color, thickness)
return img
class LLMClient:
"""大模型客户端,用于人员判断和离岗分析"""
def __init__(self, api_key, base_url, model_name):
self.client = OpenAI(
api_key=api_key,
base_url=base_url,
)
self.model_name = model_name
def frame_to_base64(self, frame):
"""将OpenCV帧转换为base64编码"""
_, buffer = cv2.imencode('.jpg', frame)
img_base64 = base64.b64encode(buffer).decode('utf-8')
return img_base64
def check_if_staff(self, frame, cam_id, roi_name):
"""判断ROI中的人员是否为工作人员"""
try:
img_base64 = self.frame_to_base64(frame)
prompt = f"""你是一个智能安防辅助系统,负责对监控画面中指定敏感区域(如高配间门口、天台、禁行通道)的人员活动进行分析。
请根据以下规则生成结构化响应
### 【判定标准】
2026-01-20 11:39:23 +08:00
**本单位物业员工**需满足下列条件之一
1. **清晰可见的正式工牌**胸前佩戴
2. **穿着标准制服**带有白色反光条的深色工程服黄蓝工程服白衬衫+黑领带蓝色清洁装浅色客服装等
2026-01-14 17:12:16 +08:00
3. **行为符合岗位规范**如巡检维修清洁无徘徊张望翻越
2026-01-20 11:39:23 +08:00
> 注意 满足部分关键条件如戴有安全帽穿有工作人员服饰带有工牌 视为员工不生成告警
2026-01-14 17:12:16 +08:00
### 【输出规则】
#### 情况1ROI区域内**无人**
输出
🟢无异常敏感区域当前无人员活动
[客观描述画面整体状态]
#### 情况2ROI区域内**有本单位员工**
输出
🟢无异常检测到本单位工作人员正常作业
[客观描述人数+制服类型+工牌状态+行为]
#### 情况3ROI区域内**有非员工或身份不明人员**
输出
🚨[区域类型]入侵告警检测到疑似非工作人员请立即核查
[客观描述人数+衣着+工牌状态+位置+行为]
### 【描述要求】
- 所有描述必须**30**
- 仅陈述**可观察事实**禁止主观推测"意图破坏""形迹可疑"
- 使用简洁标准化语言
### 【示例】
示例1无人
🟢无异常敏感区域当前无人员活动
高配间门口区域空旷无人员进入
示例2员工
🟢无异常检测到本单位工作人员正常作业
2026-01-20 11:39:23 +08:00
1名工程人员穿带有反光条的深蓝色工服在高配间巡检
2026-01-14 17:12:16 +08:00
示例3非员工
🚨天台区域入侵告警检测到疑似非工作人员请立即核查
2026-01-20 11:39:23 +08:00
1人穿绿色外套未佩戴工牌进入天台区域
2026-01-14 17:12:16 +08:00
---
请分析摄像头{cam_id}{roi_name}区域按照上述格式输出结果"""
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
},
{"type": "text", "text": prompt}
]
}
]
)
result_text = response.choices[0].message.content
# 判断是否为工作人员(根据新的输出格式)
# 如果包含"🟢无异常"且包含"工作人员",则为员工
# 如果包含"🚨"或"入侵告警"或"非工作人员",则为非员工
is_staff = False
if "🟢无异常" in result_text and "工作人员" in result_text:
is_staff = True
elif "🚨" in result_text or "入侵告警" in result_text or "非工作人员" in result_text:
is_staff = False
elif "无人员活动" in result_text or "无人" in result_text:
is_staff = None # 无人情况
return is_staff, result_text
except Exception as e:
print(f"[{cam_id}] 大模型调用失败: {e}")
return None, str(e)
def analyze_off_duty_duration(self, key_frames_info, cam_id):
"""分析离岗时长并判断是否为同一人"""
try:
frames = key_frames_info.get('frames', [])
if not frames:
return False, False, "无关键帧"
off_duty_duration = key_frames_info.get('off_duty_duration', 0)
duration_minutes = int(off_duty_duration / 60)
duration_seconds = int(off_duty_duration % 60)
# 构建消息内容
content_parts = [
{
"type": "text",
"text": f"""请分析以下关键帧图像,判断人员离岗情况。请按照以下格式简洁回答:
输出格式
1. 是否告警[/]
2. 离岗时间{duration_minutes}{duration_seconds}
3. 是否为同一人[//无法确定]
4. 简要分析[一句话概括不超过30字]
要求
- 如果离岗时间超过6分钟且确认为同一人则告警
- 简要分析需客观描述关键帧中人员的特征和行为变化
- 回答要简洁明了避免冗余描述
关键帧信息"""
}
]
# 添加图像和说明
for i, frame_info in enumerate(frames):
img_base64 = self.frame_to_base64(frame_info['frame'])
content_parts.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
})
content_parts.append({
"type": "text",
"text": f"关键帧{i+1} - 时间: {frame_info['time']}, 事件: {frame_info['event']}"
})
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{
"role": "user",
"content": content_parts
}
]
)
result = response.choices[0].message.content
# 解析结果 - 更灵活的解析逻辑
# 判断是否告警
exceeds_6min = False
if duration_minutes >= 6:
# 如果时间已经超过6分钟检查大模型是否确认告警
if any(keyword in result for keyword in ["是否告警:是", "是否告警:是", "告警:是", "告警:是", "需要告警", "应告警"]):
exceeds_6min = True
elif "是否告警:否" not in result and "是否告警:否" not in result:
# 如果没有明确说否且时间超过6分钟默认告警
exceeds_6min = True
else:
# 时间未超过6分钟即使大模型说告警也不告警
exceeds_6min = False
# 判断是否为同一人
is_same_person = False
if any(keyword in result for keyword in ["是否为同一人:是", "是否为同一人:是", "同一人:是", "同一人:是", "是同一人", "确认为同一人"]):
is_same_person = True
elif any(keyword in result for keyword in ["是否为同一人:否", "是否为同一人:否", "同一人:否", "同一人:否", "不是同一人", "非同一人"]):
is_same_person = False
elif "无法确定" in result or "不确定" in result:
is_same_person = False # 无法确定时,不告警
return exceeds_6min, is_same_person, result
except Exception as e:
print(f"[{cam_id}] 离岗分析失败: {e}")
return None, None, str(e)
2026-01-12 17:38:39 +08:00
class ThreadedFrameReader:
def __init__(self, cam_id, rtsp_url):
self.cam_id = cam_id
self.rtsp_url = rtsp_url
2026-01-20 11:39:23 +08:00
self._lock = threading.Lock() # 添加锁保护VideoCapture访问
self.cap = None
try:
self.cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if self.cap.isOpened():
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
else:
print(f"[{cam_id}] 警告:无法打开视频流: {rtsp_url}")
except Exception as e:
print(f"[{cam_id}] 初始化VideoCapture失败: {e}")
2026-01-12 17:38:39 +08:00
self.q = queue.Queue(maxsize=2)
self.running = True
self.thread = threading.Thread(target=self._reader, daemon=True)
self.thread.start()
def _reader(self):
2026-01-20 11:39:23 +08:00
"""读取帧的线程函数"""
try:
while self.running:
with self._lock:
if self.cap is None or not self.cap.isOpened():
break
ret, frame = self.cap.read()
if not ret:
time.sleep(0.1)
continue
if self.q.full():
try:
self.q.get_nowait()
except queue.Empty:
pass
self.q.put(frame)
except Exception as e:
print(f"[{self.cam_id}] 读取帧线程异常: {e}")
finally:
# 确保资源释放(使用锁保护)
with self._lock:
if self.cap is not None:
try:
if self.cap.isOpened():
self.cap.release()
except Exception as e:
print(f"[{self.cam_id}] 释放VideoCapture时出错: {e}")
finally:
self.cap = None
2026-01-12 17:38:39 +08:00
def read(self):
if not self.q.empty():
return True, self.q.get()
return False, None
def release(self):
2026-01-20 11:39:23 +08:00
"""释放资源,等待线程结束"""
if not self.running:
return # 已经释放过了
2026-01-12 17:38:39 +08:00
self.running = False
2026-01-20 11:39:23 +08:00
# 等待线程结束最多等待3秒
if self.thread.is_alive():
self.thread.join(timeout=3.0)
if self.thread.is_alive():
print(f"[{self.cam_id}] 警告读取线程未能在3秒内结束")
# 清空队列
while not self.q.empty():
try:
self.q.get_nowait()
except queue.Empty:
break
# VideoCapture的释放由_reader线程的finally块处理这里不再重复释放
2026-01-12 17:38:39 +08:00
class MultiCameraMonitor:
def __init__(self, config_path):
with open(config_path, 'r', encoding='utf-8') as f:
self.cfg = yaml.safe_load(f)
# === 全局模型(只加载一次)===
model_cfg = self.cfg['model']
self.device = model_cfg.get('device', 'auto')
if self.device == 'auto' or not self.device:
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"🚀 全局加载模型到 {self.device}...")
self.model = YOLO(model_cfg['path'])
self.model.to(self.device)
self.use_half = (self.device == 'cuda')
if self.use_half:
print("✅ 启用 FP16 推理")
self.imgsz = model_cfg['imgsz']
self.conf_thresh = model_cfg['conf_threshold']
2026-01-14 17:12:16 +08:00
# === 初始化大模型客户端 ===
llm_cfg = self.cfg.get('llm', {})
if llm_cfg.get('api_key'):
self.llm_client = LLMClient(
llm_cfg['api_key'],
llm_cfg['base_url'],
llm_cfg.get('model_name', 'qwen-vl-max')
)
print("✅ 大模型客户端已初始化")
else:
self.llm_client = None
print("⚠️ 未配置大模型API密钥大模型功能将不可用")
2026-01-12 17:38:39 +08:00
# === 初始化所有摄像头 ===
self.common = self.cfg['common']
self.cameras = {}
self.frame_readers = {}
self.queues = {} # cam_id -> queue for detection results
2026-01-14 17:12:16 +08:00
self.perimeter_queues = {} # cam_id -> queue for perimeter detection (每秒抽帧)
2026-01-12 17:38:39 +08:00
for cam_cfg in self.cfg['cameras']:
cam_id = cam_cfg['id']
2026-01-14 17:12:16 +08:00
self.cameras[cam_id] = CameraLogic(cam_id, cam_cfg, self.common, self.llm_client)
2026-01-12 17:38:39 +08:00
self.frame_readers[cam_id] = ThreadedFrameReader(cam_id, cam_cfg['rtsp_url'])
2026-01-14 17:12:16 +08:00
self.queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(人员离岗)
self.perimeter_queues[cam_id] = queue.Queue(maxsize=1) # 存放检测结果(周界入侵)
2026-01-12 17:38:39 +08:00
# === 控制信号 ===
self.running = True
self.inference_thread = threading.Thread(target=self._inference_loop, daemon=True)
2026-01-14 17:12:16 +08:00
self.perimeter_thread = threading.Thread(target=self._perimeter_inference_loop, daemon=True)
2026-01-12 17:38:39 +08:00
self.inference_thread.start()
2026-01-14 17:12:16 +08:00
self.perimeter_thread.start()
2026-01-12 17:38:39 +08:00
def _inference_loop(self):
2026-01-14 17:12:16 +08:00
"""统一推理线程:轮询各摄像头最新帧,逐个推理(用于人员离岗)"""
2026-01-12 17:38:39 +08:00
while self.running:
processed = False
for cam_id, reader in self.frame_readers.items():
ret, frame = reader.read()
if not ret:
continue
cam_logic = self.cameras[cam_id]
if cam_logic.should_skip_frame():
continue
2026-01-14 17:12:16 +08:00
# 检查是否有ROI启用了人员离岗算法
if not cam_logic.has_enabled_algorithm('人员离岗'):
continue
2026-01-12 17:38:39 +08:00
results = self.model(
frame,
imgsz=self.imgsz,
conf=self.conf_thresh,
verbose=False,
device=self.device,
half=self.use_half,
2026-01-14 17:12:16 +08:00
classes=[0] # person only
2026-01-12 17:38:39 +08:00
)
if not self.queues[cam_id].full():
self.queues[cam_id].put((frame.copy(), results[0]))
processed = True
if not processed:
time.sleep(0.01)
2026-01-14 17:12:16 +08:00
def _perimeter_inference_loop(self):
"""周界入侵推理线程:每秒抽一帧进行检测"""
while self.running:
processed = False
for cam_id, reader in self.frame_readers.items():
cam_logic = self.cameras[cam_id]
# 检查是否有ROI启用了周界入侵算法
if not cam_logic.has_enabled_algorithm('周界入侵'):
continue
ret, frame = reader.read()
if not ret:
continue
# 每秒抽一帧
current_time = time.time()
if not hasattr(cam_logic, 'last_perimeter_check_time'):
cam_logic.last_perimeter_check_time = {}
if cam_id not in cam_logic.last_perimeter_check_time:
cam_logic.last_perimeter_check_time[cam_id] = 0
if current_time - cam_logic.last_perimeter_check_time[cam_id] < 1.0:
continue
cam_logic.last_perimeter_check_time[cam_id] = current_time
results = self.model(
frame,
imgsz=self.imgsz,
conf=self.conf_thresh,
verbose=False,
device=self.device,
half=self.use_half,
classes=[0] # person only
)
if not self.perimeter_queues[cam_id].full():
self.perimeter_queues[cam_id].put((frame.copy(), results[0]))
processed = True
if not processed:
time.sleep(0.1)
2026-01-12 17:38:39 +08:00
def run(self):
"""启动所有摄像头的显示和告警逻辑(主线程)"""
try:
while self.running:
for cam_id, cam_logic in self.cameras.items():
2026-01-14 17:12:16 +08:00
# 处理人员离岗检测结果
2026-01-12 17:38:39 +08:00
if not self.queues[cam_id].empty():
frame, results = self.queues[cam_id].get()
2026-01-14 17:12:16 +08:00
cam_logic.process_off_duty(frame, results)
# 处理周界入侵检测结果
if not self.perimeter_queues[cam_id].empty():
frame, results = self.perimeter_queues[cam_id].get()
cam_logic.process_perimeter(frame, results)
# 更新显示
cam_logic.update_display()
2026-01-12 17:38:39 +08:00
key = cv2.waitKey(1) & 0xFF
2026-01-14 17:12:16 +08:00
if key == ord('q'):
2026-01-12 17:38:39 +08:00
break
2026-01-14 17:12:16 +08:00
time.sleep(0.01)
2026-01-12 17:38:39 +08:00
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
2026-01-20 11:39:23 +08:00
"""停止监控,清理所有资源"""
print("正在停止监控系统...")
2026-01-12 17:38:39 +08:00
self.running = False
2026-01-20 11:39:23 +08:00
# 等待推理线程结束
if hasattr(self, 'inference_thread') and self.inference_thread.is_alive():
self.inference_thread.join(timeout=2.0)
if hasattr(self, 'perimeter_thread') and self.perimeter_thread.is_alive():
self.perimeter_thread.join(timeout=2.0)
# 释放所有摄像头资源
for cam_id, reader in self.frame_readers.items():
try:
print(f"正在释放摄像头 {cam_id}...")
reader.release()
except Exception as e:
print(f"释放摄像头 {cam_id} 时出错: {e}")
# 关闭所有窗口
try:
cv2.destroyAllWindows()
except:
pass
# 强制清理(如果还有线程在运行)
import sys
import os
if sys.platform == 'win32':
# Windows下可能需要额外等待
time.sleep(0.5)
print("监控系统已停止")
2026-01-12 17:38:39 +08:00
2026-01-14 17:12:16 +08:00
class ROILogic:
"""单个ROI区域的逻辑处理"""
def __init__(self, roi_cfg, cam_id, common_cfg, llm_client):
self.cam_id = cam_id
self.roi_name = roi_cfg.get('name', '未命名区域')
self.llm_client = llm_client
# 处理points如果不存在或为空设置为None表示使用整张画面
if 'points' in roi_cfg and roi_cfg['points']:
self.roi_points = np.array(roi_cfg['points'], dtype=np.int32)
self.use_full_frame = False
else:
# 对于周界入侵算法如果没有points使用整张画面
self.roi_points = None
self.use_full_frame = True
# 算法配置
self.algorithms = {}
for alg_cfg in roi_cfg.get('algorithms', []):
alg_name = alg_cfg['name']
if alg_cfg.get('enabled', False):
self.algorithms[alg_name] = alg_cfg
# 人员离岗相关状态需要ROI如果没有points则不能启用
if '人员离岗' in self.algorithms:
if self.roi_points is None:
print(f"[{cam_id}] 警告:{self.roi_name} 启用了人员离岗算法但没有配置points已禁用")
del self.algorithms['人员离岗']
else:
alg_cfg = self.algorithms['人员离岗']
self.off_duty_threshold_sec = alg_cfg.get('off_duty_threshold_sec', 300)
self.on_duty_confirm_sec = alg_cfg.get('on_duty_confirm_sec', 5)
self.off_duty_confirm_sec = alg_cfg.get('off_duty_confirm_sec', 30)
self.is_on_duty = False
self.is_off_duty = True
self.on_duty_start_time = None
self.last_no_person_time = None
self.off_duty_timer_start = None
self.last_alert_time = 0
self.last_person_seen_time = None
# 关键时间记录
self.on_duty_confirm_time = None # 上岗确认时间
self.off_duty_confirm_time = None # 离岗确认时间
self.key_frames = [] # 关键帧存储
# 初始化状态跟踪
self.initial_state_start_time = None # 初始化状态开始时间(进入工作时间时)
self.has_ever_seen_person = False # 是否曾经检测到过人员
self.initial_state_frame = None # 初始化状态时的帧(用于大模型分析)
# 周界入侵相关状态如果没有points使用整张画面
if '周界入侵' in self.algorithms:
self.perimeter_last_check_time = 0
2026-01-20 11:39:23 +08:00
self.perimeter_alert_cooldown = 120 # 周界入侵告警冷却120秒2分钟
2026-01-14 17:12:16 +08:00
if self.use_full_frame:
print(f"[{cam_id}] 提示:{self.roi_name} 周界入侵算法将使用整张画面进行检测")
def is_point_in_roi(self, x, y):
"""判断点是否在ROI内如果没有ROIuse_full_frame=True总是返回True"""
if self.use_full_frame or self.roi_points is None:
return True
return cv2.pointPolygonTest(self.roi_points, (int(x), int(y)), False) >= 0
2026-01-12 17:38:39 +08:00
class CameraLogic:
2026-01-14 17:12:16 +08:00
def __init__(self, cam_id, cam_cfg, common_cfg, llm_client):
2026-01-12 17:38:39 +08:00
self.cam_id = cam_id
2026-01-14 17:12:16 +08:00
self.llm_client = llm_client
# 工作时间段配置
self.working_hours = common_cfg.get('working_hours', [[8, 30, 11, 0], [12, 0, 17, 30]])
2026-01-12 17:38:39 +08:00
self.process_every_n = cam_cfg.get('process_every_n_frames', common_cfg['process_every_n_frames'])
2026-01-14 17:12:16 +08:00
self.alert_cooldown_sec = common_cfg.get('alert_cooldown_sec', 300)
self.off_duty_alert_threshold_sec = common_cfg.get('off_duty_alert_threshold_sec', 360) # 6分钟
# 初始化所有ROI
self.rois = []
for roi_cfg in cam_cfg.get('rois', []):
self.rois.append(ROILogic(roi_cfg, cam_id, common_cfg, llm_client))
# 兼容旧配置格式
if 'roi_points' in cam_cfg:
# 创建默认ROI用于人员离岗
default_roi = {
'name': '离岗检测区域',
'points': cam_cfg['roi_points'],
'algorithms': [{
'name': '人员离岗',
'enabled': True,
'off_duty_threshold_sec': cam_cfg.get('off_duty_threshold_sec', 300),
'on_duty_confirm_sec': cam_cfg.get('on_duty_confirm_sec', 5),
'off_duty_confirm_sec': cam_cfg.get('off_duty_confirm_sec', 30)
}]
}
self.rois.append(ROILogic(default_roi, cam_id, common_cfg, llm_client))
2026-01-12 17:38:39 +08:00
self.frame_count = 0
2026-01-14 17:12:16 +08:00
self.display_frame = None # 用于显示的帧
self.display_results = None # 用于显示的检测结果YOLO results
2026-01-12 17:38:39 +08:00
def should_skip_frame(self):
self.frame_count += 1
return self.frame_count % self.process_every_n != 0
2026-01-14 17:12:16 +08:00
def has_enabled_algorithm(self, alg_name):
"""检查是否有ROI启用了指定算法"""
return any(alg_name in roi.algorithms for roi in self.rois)
2026-01-12 17:38:39 +08:00
def in_working_hours(self):
2026-01-14 17:12:16 +08:00
"""判断是否在工作时间段内"""
now = datetime.datetime.now()
h, m = now.hour, now.minute
current_minutes = h * 60 + m
for period in self.working_hours:
start_h, start_m, end_h, end_m = period
start_minutes = start_h * 60 + start_m
end_minutes = end_h * 60 + end_m
if start_minutes <= current_minutes < end_minutes:
return True
return False
def is_edge_time(self):
"""判断是否为边缘时间段8:30-9:00, 11:00-12:00, 17:30-18:00"""
2026-01-12 17:38:39 +08:00
now = datetime.datetime.now()
2026-01-14 17:12:16 +08:00
h, m = now.hour, now.minute
current_minutes = h * 60 + m
edge_periods = [
(8 * 60 + 30, 9 * 60), # 8:30-9:00
(11 * 60, 12 * 60), # 11:00-12:00
(17 * 60 + 30, 18 * 60) # 17:30-18:00
]
for start, end in edge_periods:
if start <= current_minutes < end:
return True
return False
def get_end_of_work_time(self):
"""获取当天工作结束时间17:30"""
now = datetime.datetime.now()
end_time = now.replace(hour=17, minute=30, second=0, microsecond=0)
if now > end_time:
# 如果已经过了17:30返回明天的17:30
end_time += datetime.timedelta(days=1)
return end_time
def process_off_duty(self, frame, results):
"""处理人员离岗检测"""
2026-01-12 17:38:39 +08:00
current_time = time.time()
now = datetime.datetime.now()
boxes = results.boxes
2026-01-14 17:12:16 +08:00
for roi in self.rois:
if '人员离岗' not in roi.algorithms:
continue
# 检查ROI中是否有人
roi_has_person = any(
roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2,
(b.xyxy[0][1] + b.xyxy[0][3]) / 2)
for b in boxes
2026-01-12 17:38:39 +08:00
)
2026-01-14 17:12:16 +08:00
in_work = self.in_working_hours()
is_edge = self.is_edge_time()
if in_work:
# 初始化状态跟踪:如果刚进入工作时间,记录开始时间
if roi.initial_state_start_time is None:
roi.initial_state_start_time = current_time
roi.has_ever_seen_person = False
roi.initial_state_frame = frame.copy() # 保存初始化状态时的帧
if roi_has_person:
roi.last_person_seen_time = current_time
roi.has_ever_seen_person = True
# 如果检测到人员,清除初始化状态
if roi.initial_state_start_time is not None:
roi.initial_state_start_time = None
roi.initial_state_frame = None
effective = (
roi.last_person_seen_time is not None and
(current_time - roi.last_person_seen_time) < 1.0
)
# 处理初始化状态:如果系统启动时没有人,且超过离岗确认时间
if not roi.has_ever_seen_person and roi.initial_state_start_time is not None:
elapsed_since_start = current_time - roi.initial_state_start_time
if elapsed_since_start >= roi.off_duty_confirm_sec:
# 超过离岗确认时间,触发离岗确认逻辑
roi.is_off_duty, roi.is_on_duty = True, False
roi.off_duty_confirm_time = roi.initial_state_start_time + roi.off_duty_confirm_sec # 使用离岗确认时间点
roi.off_duty_timer_start = current_time
# 保存关键帧(使用初始化状态时的帧作为离岗确认帧)
if roi.initial_state_frame is not None:
roi.key_frames.append({
'frame': roi.initial_state_frame.copy(),
'time': datetime.datetime.fromtimestamp(roi.off_duty_confirm_time).strftime('%Y-%m-%d %H:%M:%S'),
'event': '离岗确认(初始化状态)'
})
# 也保存当前帧
roi.key_frames.append({
'frame': frame.copy(),
'time': now.strftime('%Y-%m-%d %H:%M:%S'),
'event': '当前状态'
})
print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 初始化状态:超过离岗确认时间,进入离岗计时 ({now.strftime('%H:%M:%S')})")
roi.initial_state_start_time = None # 清除初始化状态标记
roi.initial_state_frame = None
if effective:
roi.last_no_person_time = None
if roi.is_off_duty:
if roi.on_duty_start_time is None:
roi.on_duty_start_time = current_time
elif current_time - roi.on_duty_start_time >= roi.on_duty_confirm_sec:
roi.is_on_duty, roi.is_off_duty = True, False
roi.on_duty_confirm_time = current_time
roi.on_duty_start_time = None
# 保存关键帧
roi.key_frames.append({
'frame': frame.copy(),
'time': now.strftime('%Y-%m-%d %H:%M:%S'),
'event': '上岗确认'
})
print(f"[{self.cam_id}] [{roi.roi_name}] ✅ 上岗确认成功 ({now.strftime('%H:%M:%S')})")
else:
roi.on_duty_start_time = None
roi.last_person_seen_time = None
if not roi.is_off_duty:
if roi.last_no_person_time is None:
roi.last_no_person_time = current_time
elif current_time - roi.last_no_person_time >= roi.off_duty_confirm_sec:
roi.is_off_duty, roi.is_on_duty = True, False
roi.off_duty_confirm_time = current_time
roi.last_no_person_time = None
roi.off_duty_timer_start = current_time
# 保存关键帧
roi.key_frames.append({
'frame': frame.copy(),
'time': now.strftime('%Y-%m-%d %H:%M:%S'),
'event': '离岗确认'
})
print(f"[{self.cam_id}] [{roi.roi_name}] 🚪 进入离岗计时 ({now.strftime('%H:%M:%S')})")
# 离岗告警逻辑(边缘时间只记录,不告警)
if roi.is_off_duty and roi.off_duty_timer_start:
elapsed = current_time - roi.off_duty_timer_start
off_duty_duration = elapsed
# 如果到了下班时间还没回来,计算到下班时间的离岗时长
end_time = self.get_end_of_work_time()
if now >= end_time and roi.off_duty_confirm_time:
# 计算离岗时长:下班时间 - 离岗确认时间
off_duty_duration = (end_time.timestamp() - roi.off_duty_confirm_time)
# 超过6分钟且不在边缘时间使用大模型判断
if off_duty_duration >= self.off_duty_alert_threshold_sec and not is_edge:
# 对于初始化状态即使只有1帧也要进行分析
is_initial_state = any('初始化状态' in f.get('event', '') for f in roi.key_frames)
min_frames_required = 1 if is_initial_state else 2
if self.llm_client and len(roi.key_frames) >= min_frames_required:
# 限制关键帧数量只保留最近10帧
if len(roi.key_frames) > 10:
roi.key_frames = roi.key_frames[-10:]
# 准备关键帧信息
key_frames_info = {
'frames': roi.key_frames[-5:] if len(roi.key_frames) >= 2 else roi.key_frames, # 如果有足够帧取最近5帧否则全部使用
'off_duty_duration': off_duty_duration
}
# 调用大模型分析
exceeds_6min, is_same_person, analysis_result = self.llm_client.analyze_off_duty_duration(
key_frames_info, self.cam_id
)
# 对于初始化状态只要超过6分钟就告警因为无法判断是否为同一人
if is_initial_state:
should_alert = exceeds_6min if exceeds_6min is not None else (off_duty_duration >= self.off_duty_alert_threshold_sec)
else:
should_alert = exceeds_6min and is_same_person
if should_alert:
if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec:
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)")
print(f"大模型分析结果: {analysis_result}")
# 保存告警图片
save_alert_image(
frame.copy(),
self.cam_id,
roi.roi_name,
"离岗",
f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)\n大模型分析结果:\n{analysis_result}"
)
roi.last_alert_time = current_time
elif not is_edge:
# 如果没有大模型,直接告警
if (current_time - roi.last_alert_time) >= self.alert_cooldown_sec:
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 离岗告警!离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)")
# 保存告警图片
save_alert_image(
frame.copy(),
self.cam_id,
roi.roi_name,
"离岗",
f"离岗时长: {int(off_duty_duration)}秒 ({int(off_duty_duration/60)}分钟)"
)
roi.last_alert_time = current_time
elif is_edge and roi.off_duty_confirm_time:
# 边缘时间只记录,不告警
print(f"[{self.cam_id}] [{roi.roi_name}] 边缘时间段,记录离岗时长: {int(off_duty_duration)}")
self.display_frame = frame.copy()
self.display_results = results # 保存检测结果用于显示
def crop_roi(self, frame, roi_points):
"""裁剪ROI区域如果roi_points为None返回整张画面"""
if roi_points is None:
return frame.copy()
x_coords = roi_points[:, 0]
y_coords = roi_points[:, 1]
x_min, x_max = int(x_coords.min()), int(x_coords.max())
y_min, y_max = int(y_coords.min()), int(y_coords.max())
# 确保坐标在图像范围内
h, w = frame.shape[:2]
x_min = max(0, x_min)
y_min = max(0, y_min)
x_max = min(w, x_max)
y_max = min(h, y_max)
roi_frame = frame[y_min:y_max, x_min:x_max]
# 创建掩码
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
cv2.fillPoly(mask, [roi_points], 255)
mask_roi = mask[y_min:y_max, x_min:x_max]
# 应用掩码
if len(roi_frame.shape) == 3:
mask_roi = cv2.cvtColor(mask_roi, cv2.COLOR_GRAY2BGR)
roi_frame = cv2.bitwise_and(roi_frame, mask_roi)
return roi_frame
def process_perimeter(self, frame, results):
"""处理周界入侵检测"""
current_time = time.time()
boxes = results.boxes
for roi in self.rois:
if '周界入侵' not in roi.algorithms:
continue
# 检查ROI中是否有人如果没有ROI检查整张画面是否有人
if roi.use_full_frame:
# 使用整张画面,只要检测到人就触发
roi_has_person = len(boxes) > 0
2026-01-12 17:38:39 +08:00
else:
2026-01-14 17:12:16 +08:00
# 检查ROI中是否有人
roi_has_person = any(
roi.is_point_in_roi((b.xyxy[0][0] + b.xyxy[0][2]) / 2,
(b.xyxy[0][1] + b.xyxy[0][3]) / 2)
for b in boxes
)
if roi_has_person:
# 冷却时间检查
if current_time - roi.perimeter_last_check_time >= roi.perimeter_alert_cooldown:
roi.perimeter_last_check_time = current_time
# 裁剪ROI区域如果没有ROI使用整张画面
roi_frame = self.crop_roi(frame, roi.roi_points)
# 调用大模型判断是否为工作人员
if self.llm_client:
is_staff, result = self.llm_client.check_if_staff(roi_frame, self.cam_id, roi.roi_name)
area_desc = "整张画面" if roi.use_full_frame else roi.roi_name
if is_staff is None:
# 无人情况
print(f"[{self.cam_id}] [{roi.roi_name}] 大模型判断:{result}")
elif not is_staff:
# 非工作人员
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到非工作人员(检测区域:{area_desc}")
print(f"大模型判断结果: {result}")
2026-01-20 11:39:23 +08:00
# 保存告警图片(使用区域描述作为名称,更清晰)
2026-01-14 17:12:16 +08:00
save_alert_image(
frame.copy(),
self.cam_id,
2026-01-20 11:39:23 +08:00
area_desc, # 使用area_desc而不是roi.roi_name
2026-01-14 17:12:16 +08:00
"入侵",
2026-01-20 11:39:23 +08:00
f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n大模型判断结果:\n{result}"
2026-01-14 17:12:16 +08:00
)
else:
# 工作人员
print(f"[{self.cam_id}] [{roi.roi_name}] 检测到工作人员,无需告警")
print(f"大模型判断结果: {result}")
2026-01-12 17:38:39 +08:00
else:
2026-01-14 17:12:16 +08:00
# 没有大模型时,直接告警
area_desc = "整张画面" if roi.use_full_frame else roi.roi_name
print(f"[{self.cam_id}] [{roi.roi_name}] 🚨 周界入侵告警!检测到人员进入(检测区域:{area_desc}")
2026-01-20 11:39:23 +08:00
# 保存告警图片(使用区域描述作为名称,更清晰)
2026-01-14 17:12:16 +08:00
save_alert_image(
frame.copy(),
self.cam_id,
2026-01-20 11:39:23 +08:00
area_desc, # 使用area_desc而不是roi.roi_name
2026-01-14 17:12:16 +08:00
"入侵",
2026-01-20 11:39:23 +08:00
f"检测区域: {area_desc}\nROI名称: {roi.roi_name}\n检测到人员进入"
2026-01-14 17:12:16 +08:00
)
self.display_frame = frame.copy()
self.display_results = results # 保存检测结果用于显示
def update_display(self):
"""更新显示"""
if self.display_frame is None:
return
vis = self.display_frame.copy()
now = datetime.datetime.now()
in_work = self.in_working_hours()
# 如果有检测结果先绘制YOLO识别框
if self.display_results is not None:
vis = self.display_results.plot() # 使用YOLO的plot方法绘制识别框
# 检查是否有启用人员离岗算法的ROI
has_off_duty_algorithm = any('人员离岗' in roi.algorithms for roi in self.rois)
# 绘制所有ROI
full_frame_roi_count = 0 # 用于跟踪使用整张画面的ROI数量避免文本重叠
for roi in self.rois:
color = (0, 255, 0) # 默认绿色
thickness = 2
# 根据算法状态设置颜色
if '人员离岗' in roi.algorithms:
if roi.is_on_duty:
color = (0, 255, 0) # 绿色:在岗
elif roi.is_off_duty and roi.off_duty_timer_start:
elapsed = time.time() - roi.off_duty_timer_start
if elapsed >= roi.off_duty_threshold_sec:
color = (0, 0, 255) # 红色:离岗告警
else:
color = (0, 255, 255) # 黄色:离岗中
2026-01-12 17:38:39 +08:00
else:
2026-01-14 17:12:16 +08:00
color = (255, 0, 0) # 蓝色:未在岗
if '周界入侵' in roi.algorithms:
color = (255, 255, 0) # 青色:周界入侵区域
# 如果有ROI绘制ROI框
if roi.roi_points is not None:
cv2.polylines(vis, [roi.roi_points], True, color, thickness)
# 创建半透明覆盖层
overlay = vis.copy()
cv2.fillPoly(overlay, [roi.roi_points], color)
cv2.addWeighted(overlay, 0.2, vis, 0.8, 0, vis)
# 显示ROI名称使用中文文本绘制函数
text_pos = tuple(roi.roi_points[0])
vis = put_chinese_text(vis, roi.roi_name, text_pos, font_size=20, color=color, thickness=1)
else:
# 如果没有ROI使用整张画面在左上角显示提示避免重叠
display_text = f"{roi.roi_name} (整张画面)"
text_y = 30 + full_frame_roi_count * 25 # 每个ROI文本向下偏移25像素
vis = put_chinese_text(vis, display_text, (10, text_y), font_size=18, color=color, thickness=1)
full_frame_roi_count += 1
# 只在启用人员离岗算法时显示岗位状态
if has_off_duty_algorithm:
status = "OUT OF HOURS"
status_color = (128, 128, 128)
if in_work:
# 检查所有ROI的状态
has_on_duty = any(roi.is_on_duty for roi in self.rois if '人员离岗' in roi.algorithms)
has_off_duty = any(roi.is_off_duty and roi.off_duty_timer_start
for roi in self.rois if '人员离岗' in roi.algorithms)
if has_on_duty:
status, status_color = "ON DUTY", (0, 255, 0)
elif has_off_duty:
status, status_color = "OFF DUTY", (0, 255, 255)
else:
status, status_color = "OFF DUTY", (255, 0, 0)
cv2.putText(vis, f"[{self.cam_id}] {status}", (20, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2)
# 显示时间戳(所有摄像头都显示)
cv2.putText(vis, now.strftime('%Y-%m-%d %H:%M:%S'), (20, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
2026-01-12 17:38:39 +08:00
cv2.imshow(f"Monitor - {self.cam_id}", vis)
def main():
2026-01-20 11:39:23 +08:00
import signal
import sys
2026-01-12 17:38:39 +08:00
parser = argparse.ArgumentParser()
parser.add_argument("--config", default="config.yaml", help="配置文件路径")
args = parser.parse_args()
2026-01-20 11:39:23 +08:00
monitor = None
try:
monitor = MultiCameraMonitor(args.config)
# 注册信号处理,确保优雅退出
def signal_handler(sig, frame):
print("\n收到退出信号,正在关闭...")
if monitor:
monitor.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
if sys.platform != 'win32':
signal.signal(signal.SIGTERM, signal_handler)
monitor.run()
except KeyboardInterrupt:
print("\n收到键盘中断,正在关闭...")
except Exception as e:
print(f"程序异常: {e}")
import traceback
traceback.print_exc()
finally:
if monitor:
monitor.stop()
# 确保进程退出
sys.exit(0)
2026-01-12 17:38:39 +08:00
if __name__ == "__main__":
main()