import os from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np def draw_bbox( image: np.ndarray, bbox: List[float], label: str = "", color: Tuple[int, int, int] = (0, 255, 0), thickness: int = 2, ) -> np.ndarray: x1, y1, x2, y2 = [int(v) for v in bbox] cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) if label: (text_width, text_height), baseline = cv2.getTextSize( label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, thickness ) cv2.rectangle( image, (x1, y1 - text_height - 10), (x1 + text_width + 10, y1), color, -1, ) cv2.putText( image, label, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), thickness, ) return image def draw_roi( image: np.ndarray, points: List[List[float]], roi_type: str = "polygon", color: Tuple[int, int, int] = (255, 0, 0), thickness: int = 2, label: str = "", ) -> np.ndarray: points = np.array(points, dtype=np.int32) if roi_type == "polygon": cv2.polylines(image, [points], True, color, thickness) cv2.fillPoly(image, [points], color=(color[0], color[1], color[2], 30)) elif roi_type == "line": cv2.line(image, tuple(points[0]), tuple(points[1]), color, thickness) elif roi_type == "rectangle": x1, y1 = points[0] x2, y2 = points[1] cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness) if label: cx = int(np.mean(points[:, 0])) cy = int(np.mean(points[:, 1])) cv2.putText( image, label, (cx, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, thickness, ) return image def draw_detections( image: np.ndarray, detections: List[Dict[str, Any]], roi_configs: Optional[List[Dict[str, Any]]] = None, ) -> np.ndarray: result = image.copy() for detection in detections: bbox = detection.get("bbox", []) conf = detection.get("conf", 0.0) cls = detection.get("cls", 0) label = f"Person: {conf:.2f}" color = (0, 255, 0) if roi_configs: matched_rois = detection.get("matched_rois", []) for roi_conf in matched_rois: if roi_conf.get("enabled", True): roi_points = roi_conf.get("points", []) roi_type = roi_conf.get("type", "polygon") roi_label = roi_conf.get("name", "") roi_color = (255, 0, 0) if roi_conf.get("rule") == "intrusion" else (0, 165, 255) result = draw_roi(result, roi_points, roi_type, roi_color, 2, roi_label) result = draw_bbox(result, bbox, label, color, 2) return result def resize_image( image: np.ndarray, max_size: int = 480, maintain_aspect: bool = True, ) -> np.ndarray: h, w = image.shape[:2] if maintain_aspect: scale = min(max_size / h, max_size / w) new_h, new_w = int(h * scale), int(w * scale) else: new_h, new_w = max_size, max_size scale = max_size / max(h, w) result = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA) return result, scale def encode_image_base64(image: np.ndarray, quality: int = 85) -> str: _, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality]) return base64.b64encode(buffer).decode("utf-8") def decode_image_base64(data: str) -> np.ndarray: import base64 buffer = base64.b64decode(data) nparr = np.frombuffer(buffer, np.uint8) return cv2.imdecode(nparr, cv2.IMREAD_COLOR) def get_timestamp_str() -> str: return datetime.now().strftime("%Y%m%d_%H%M%S") def format_duration(seconds: float) -> str: if seconds < 60: return f"{int(seconds)}秒" elif seconds < 3600: minutes = int(seconds // 60) secs = int(seconds % 60) return f"{minutes}分{secs}秒" else: hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) return f"{hours}小时{minutes}分" def is_time_in_ranges( current_time: datetime, time_ranges: List[Dict[str, List[int]]], ) -> bool: if not time_ranges: return True current_minutes = current_time.hour * 60 + current_time.minute for time_range in time_ranges: start = time_range["start"] end = time_range["end"] start_minutes = start[0] * 60 + start[1] end_minutes = end[0] * 60 + end[1] if start_minutes <= current_minutes < end_minutes: return True return False class FPSCounter: def __init__(self, window_size: int = 30): self.timestamps: List[float] = [] self.window_size = window_size def update(self): import time now = time.time() self.timestamps.append(now) self.timestamps = self.timestamps[-self.window_size:] @property def fps(self) -> float: if len(self.timestamps) < 2: return 0.0 elapsed = self.timestamps[-1] - self.timestamps[0] if elapsed <= 0: return 0.0 return (len(self.timestamps) - 1) / elapsed