"""Launch one OffDutyCrowdDetector thread per configured camera.

A single YOLO model is loaded once and shared by every detector thread.
"""

import os
import threading

import torch
import yaml
from ultralytics import YOLO

from detector import OffDutyCrowdDetector


def load_config(config_path="config.yaml"):
    """Parse the YAML file at *config_path* and return the resulting object.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file (``None`` for an
        empty document).
    """
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def _is_cuda_device(device):
    """Return True when *device* is spelled "cuda" or "cuda:<index>"."""
    return str(device).startswith("cuda")


def _build_camera_config(config, cam_cfg):
    """Return the settings dict handed to one camera's detector.

    Camera-specific keys override the optional ``common`` section; the model
    ``imgsz`` / ``conf_thresh`` and the common ``working_hours`` are then
    written on top, so they always come from the global sections.
    """
    full_cfg = {**config.get("common", {}), **cam_cfg}
    full_cfg["imgsz"] = config["model"]["imgsz"]
    full_cfg["conf_thresh"] = config["model"]["conf_thresh"]
    full_cfg["working_hours"] = config["common"]["working_hours"]
    return full_cfg


def main():
    """Load the shared model and run a detector thread for every camera.

    Blocks until all detector threads finish or the user presses Ctrl+C.

    Raises:
        ValueError: If the configuration file does not contain a mapping.
        KeyError: If a required configuration key is missing.
    """
    config = load_config()
    if not isinstance(config, dict):
        raise ValueError(
            f"Configuration must be a mapping, got {type(config).__name__}"
        )

    # Global model (shared by all camera threads).
    model_cfg = config["model"]
    model_path = model_cfg["path"]

    # An explicit `device: null` in the YAML should behave like a missing key.
    device = model_cfg.get("device")
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    # Indexed device strings such as "cuda:0" must enable FP16 as well.
    use_half = _is_cuda_device(device)

    print(f"Loading model {model_path} on {device} (FP16: {use_half})")
    model = YOLO(model_path)
    model.to(device)
    if use_half:
        model.model.half()

    # Start one detection thread per camera.
    threads = []
    for cam_cfg in config["cameras"]:
        full_cfg = _build_camera_config(config, cam_cfg)
        detector = OffDutyCrowdDetector(full_cfg, model, device, use_half)
        thread = threading.Thread(target=detector.run, daemon=True)
        thread.start()
        threads.append(thread)
        print(f"Started detector for {cam_cfg['id']}")

    print(f"✅ 已启动 {len(threads)} 路摄像头检测,按 Ctrl+C 退出")

    try:
        for t in threads:
            # Poll with a timeout rather than blocking indefinitely: on
            # platforms where a blocking join() cannot be interrupted by a
            # signal, this keeps Ctrl+C responsive.
            while t.is_alive():
                t.join(timeout=1.0)
    except KeyboardInterrupt:
        # Detector threads are daemons, so they end when the process exits.
        print("\n🛑 Shutting down...")


if __name__ == "__main__":
    main()