# File: Security_AI_integrated/run_detectors.py
# Repository viewer metadata: 52 lines, 1.5 KiB, Python
# Last change: 2026-01-12 17:38:39 +08:00
import yaml
import threading
from ultralytics import YOLO
import torch
from detector import OffDutyCrowdDetector
import os
def load_config(config_path="config.yaml"):
    """Parse the YAML file at *config_path* and return the result.

    The file is opened as UTF-8 text and handed to ``yaml.safe_load``;
    whatever object that call produces is returned unchanged.
    """
    with open(config_path, "r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    return parsed
def main():
    """Load the config, load one shared YOLO model, and run a detector thread per camera.

    Steps:
      1. Read the configuration via ``load_config()``.
      2. Load the model named by ``config["model"]["path"]`` onto the configured
         device (default: ``"cuda"`` when ``torch.cuda.is_available()``, else
         ``"cpu"``), converting it to FP16 when the device is a CUDA device.
      3. For every entry in ``config["cameras"]``, build a per-camera config and
         start ``OffDutyCrowdDetector(...).run`` in a daemon thread.
      4. Wait for the threads until they finish or the user presses Ctrl+C.

    Raises:
        KeyError: if a required config key (``model``, ``cameras``, ``imgsz``,
            ``conf_thresh``, ``common``/``working_hours``, camera ``id``) is missing.
    """
    config = load_config()

    # Global model, shared by every camera thread.
    model_cfg = config["model"]
    model_path = model_cfg["path"]
    device = model_cfg.get("device", "cuda" if torch.cuda.is_available() else "cpu")
    # Enable FP16 for any CUDA device string ("cuda", "cuda:0", ...), not only
    # the bare "cuda" spelling.
    use_half = str(device).startswith("cuda")
    print(f"Loading model {model_path} on {device} (FP16: {use_half})")
    model = YOLO(model_path)
    model.to(device)
    if use_half:
        model.model.half()

    # Start one detection thread per camera.
    threads = []
    for cam_cfg in config["cameras"]:
        # Merge the shared "common" settings with the camera's own settings;
        # camera keys win over common keys.
        full_cfg = {**config.get("common", {}), **cam_cfg}
        # NOTE(review): the three assignments below overwrite any value the
        # camera entry supplied under the same key - confirm this is intended.
        full_cfg["imgsz"] = model_cfg["imgsz"]
        full_cfg["conf_thresh"] = model_cfg["conf_thresh"]
        full_cfg["working_hours"] = config["common"]["working_hours"]

        detector = OffDutyCrowdDetector(full_cfg, model, device, use_half)
        # Daemon threads do not keep the process alive once main() returns.
        thread = threading.Thread(target=detector.run, daemon=True)
        thread.start()
        threads.append(thread)
        print(f"Started detector for {cam_cfg['id']}")

    print(f"✅ 已启动 {len(threads)} 路摄像头检测,按 Ctrl+C 退出")
    try:
        for t in threads:
            # Poll with a timeout instead of a bare join(): an untimed join
            # cannot be interrupted by Ctrl+C on Windows, which would make the
            # advertised exit key unusable there.
            while t.is_alive():
                t.join(timeout=0.5)
    except KeyboardInterrupt:
        print("\n🛑 Shutting down...")
# Start the detectors only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()