196 lines
5.3 KiB
Python
196 lines
5.3 KiB
Python
|
|
import os
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
from typing import Any, Dict, List, Optional, Tuple
|
||
|
|
|
||
|
|
import cv2
|
||
|
|
import numpy as np
|
||
|
|
|
||
|
|
|
||
|
|
def draw_bbox(
    image: np.ndarray,
    bbox: List[float],
    label: str = "",
    color: Tuple[int, int, int] = (0, 255, 0),
    thickness: int = 2,
) -> np.ndarray:
    """Draw a rectangle, and optionally a filled label tag, onto ``image``.

    The drawing is done in place; the same ``image`` object is returned.

    Args:
        image: Image array handed to the OpenCV drawing calls.
        bbox: Four values ``x1, y1, x2, y2``; each is truncated with ``int()``.
        label: Text for the tag. An empty string draws no tag.
        color: Colour of the rectangle outline and of the tag background.
        thickness: Line thickness of the rectangle and of the label text.

    Returns:
        ``image`` itself, after drawing.

    Raises:
        ValueError: If ``bbox`` does not contain exactly four values.
    """
    x1, y1, x2, y2 = (int(v) for v in bbox)
    cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)

    if label:
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.5
        (text_width, text_height), _ = cv2.getTextSize(
            label, font, font_scale, thickness
        )
        tag_height = text_height + 10

        # The tag normally sits directly on top of the box. When that would
        # put it above row 0 (box touching the top of the frame) the tag
        # would be clipped away, so draw it just inside the box instead.
        tag_top = y1 - tag_height
        if tag_top < 0:
            tag_top = max(y1, 0)
        tag_bottom = tag_top + tag_height

        cv2.rectangle(
            image,
            (x1, tag_top),
            (x1 + text_width + 10, tag_bottom),
            color,
            -1,  # negative thickness = filled rectangle
        )
        cv2.putText(
            image,
            label,
            (x1 + 5, tag_bottom - 5),
            font,
            font_scale,
            (0, 0, 0),
            thickness,
        )

    return image
|
||
|
|
|
||
|
|
|
||
|
|
def draw_roi(
    image: np.ndarray,
    points: List[List[float]],
    roi_type: str = "polygon",
    color: Tuple[int, int, int] = (255, 0, 0),
    thickness: int = 2,
    label: str = "",
) -> np.ndarray:
    """Draw a region of interest (polygon, line or rectangle) onto ``image``.

    The drawing is done in place; the same ``image`` object is returned.

    Args:
        image: Image array handed to the OpenCV drawing calls.
        points: ``[x, y]`` pairs; converted to an ``int32`` array.
        roi_type: ``"polygon"``, ``"line"`` or ``"rectangle"``. Any other
            value draws no shape (a label is still drawn if given).
        color: Colour of the outline, the polygon tint and the label text.
        thickness: Line thickness of the outline and the label text.
        label: Optional text placed at the mean of ``points``.

    Returns:
        ``image`` itself. If ``points`` is empty or not a 2-D array of
        pairs, nothing is drawn.
    """
    pts = np.array(points, dtype=np.int32)
    if pts.ndim != 2 or pts.shape[0] == 0:
        # Nothing sensible to draw (and the centroid below would fail).
        return image

    if roi_type == "polygon":
        # OpenCV drawing functions do not alpha-blend: a fourth colour
        # component is ignored on a 3-channel image and the polygon would be
        # painted fully opaque. Blend a filled copy over the frame instead.
        # 30/255 mirrors the value the previous code passed as a fourth
        # component (presumably intended as alpha).
        alpha = 30 / 255.0
        overlay = image.copy()
        cv2.fillPoly(overlay, [pts], color)
        cv2.addWeighted(overlay, alpha, image, 1.0 - alpha, 0, dst=image)
        # Outline goes on last so it stays crisp on top of the tint.
        cv2.polylines(image, [pts], True, color, thickness)
    elif roi_type in ("line", "rectangle"):
        if len(pts) >= 2:
            # Plain Python ints: some OpenCV builds reject NumPy scalars
            # inside point tuples.
            first = (int(pts[0][0]), int(pts[0][1]))
            second = (int(pts[1][0]), int(pts[1][1]))
            if roi_type == "line":
                cv2.line(image, first, second, color, thickness)
            else:
                cv2.rectangle(image, first, second, color, thickness)

    if label:
        cx = int(np.mean(pts[:, 0]))
        cy = int(np.mean(pts[:, 1]))
        cv2.putText(
            image,
            label,
            (cx, cy),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            color,
            thickness,
        )

    return image
|
||
|
|
|
||
|
|
|
||
|
|
def draw_detections(
    image: np.ndarray,
    detections: List[Dict[str, Any]],
    roi_configs: Optional[List[Dict[str, Any]]] = None,
) -> np.ndarray:
    """Return a copy of ``image`` annotated with detections and matched ROIs.

    Args:
        image: Source frame; it is copied and never modified.
        detections: Dicts read via ``bbox`` (four values), ``conf`` (default
            ``0.0``) and ``matched_rois`` (list of ROI dicts with optional
            ``enabled``, ``points``, ``type``, ``name`` and ``rule`` keys).
        roi_configs: Only its truthiness is used: when it is empty or
            ``None`` no ROIs are drawn. The ROIs themselves come from each
            detection's ``matched_rois``.

    Returns:
        The annotated copy. Every box is labelled ``"Person: <conf>"``.
    """
    result = image.copy()
    bbox_color = (0, 255, 0)

    for detection in detections:
        if roi_configs:
            for roi_conf in detection.get("matched_rois", []):
                if not roi_conf.get("enabled", True):
                    continue
                # Intrusion rules get their own colour; everything else
                # shares the second one.
                is_intrusion = roi_conf.get("rule") == "intrusion"
                roi_color = (255, 0, 0) if is_intrusion else (0, 165, 255)
                result = draw_roi(
                    result,
                    roi_conf.get("points", []),
                    roi_conf.get("type", "polygon"),
                    roi_color,
                    2,
                    roi_conf.get("name", ""),
                )

        bbox = detection.get("bbox", [])
        if bbox is None or len(bbox) != 4:
            # A detection without a usable box cannot be drawn; skip it
            # rather than failing the whole frame on tuple unpacking.
            continue

        conf = detection.get("conf", 0.0)
        result = draw_bbox(result, bbox, f"Person: {conf:.2f}", bbox_color, 2)

    return result
|
||
|
|
|
||
|
|
|
||
|
|
def resize_image(
    image: np.ndarray,
    max_size: int = 480,
    maintain_aspect: bool = True,
) -> Tuple[np.ndarray, float]:
    """Resize ``image`` so that it fits within ``max_size`` pixels.

    Args:
        image: Array whose first two dimensions are height and width.
        max_size: Target bound for the sides, in pixels.
        maintain_aspect: When true, both sides are scaled by the same factor
            so the longer side becomes ``max_size``. When false, the output
            is exactly ``max_size`` x ``max_size``.

    Returns:
        ``(resized_image, scale)``. With ``maintain_aspect`` the scale is the
        factor applied to both sides; otherwise it is
        ``max_size / max(height, width)``.

    Raises:
        ValueError: If ``image`` has zero height or width.
    """
    h, w = image.shape[:2]
    if h == 0 or w == 0:
        raise ValueError("cannot resize an empty image")

    if maintain_aspect:
        scale = min(max_size / h, max_size / w)
        # Truncation can yield 0 for very elongated images, which
        # cv2.resize rejects; keep at least one pixel per side.
        new_h = max(1, int(h * scale))
        new_w = max(1, int(w * scale))
    else:
        new_h, new_w = max_size, max_size
        scale = max_size / max(h, w)

    # cv2.resize takes the target size as (width, height).
    result = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
    return result, scale
|
||
|
|
|
||
|
|
|
||
|
|
def encode_image_base64(image: np.ndarray, quality: int = 85) -> str:
    """Encode ``image`` as JPEG and return the bytes as a base64 string.

    Args:
        image: Image array passed to ``cv2.imencode``.
        quality: Value passed as ``cv2.IMWRITE_JPEG_QUALITY``.

    Returns:
        The base64 text of the JPEG data.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding failed.
    """
    # Imported locally: the module does not import base64 at the top, and
    # without this the function failed with NameError.
    import base64

    ok, buffer = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality])
    if not ok:
        raise ValueError("cv2.imencode failed to encode image as JPEG")
    return base64.b64encode(buffer).decode("utf-8")
|
||
|
|
|
||
|
|
|
||
|
|
def decode_image_base64(data: str) -> np.ndarray:
    """Decode base64 text into an image using ``cv2.imdecode``.

    The decoded bytes are viewed as a ``uint8`` array and handed to
    ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``; whatever that call returns
    is returned unchanged.
    """
    from base64 import b64decode

    raw_bytes = b64decode(data)
    byte_array = np.frombuffer(raw_bytes, dtype=np.uint8)
    return cv2.imdecode(byte_array, cv2.IMREAD_COLOR)
|
||
|
|
|
||
|
|
|
||
|
|
def get_timestamp_str() -> str:
    """Return ``datetime.now()`` formatted as ``YYYYMMDD_HHMMSS``."""
    return f"{datetime.now():%Y%m%d_%H%M%S}"
|
||
|
|
|
||
|
|
|
||
|
|
def format_duration(seconds: float) -> str:
    """Format a duration in seconds as short text with Chinese unit suffixes.

    Below one minute only whole seconds are shown; below one hour, minutes
    and seconds; from one hour on, hours and minutes (seconds are dropped).
    Fractions are truncated.
    """
    if seconds >= 3600:
        whole_hours, remainder = divmod(seconds, 3600)
        return f"{int(whole_hours)}小时{int(remainder // 60)}分"

    if seconds >= 60:
        whole_minutes, remainder = divmod(seconds, 60)
        return f"{int(whole_minutes)}分{int(remainder)}秒"

    return f"{int(seconds)}秒"
|
||
|
|
|
||
|
|
|
||
|
|
def is_time_in_ranges(
    current_time: datetime,
    time_ranges: List[Dict[str, List[int]]],
) -> bool:
    """Tell whether ``current_time`` falls inside any configured time range.

    Args:
        current_time: Moment to test; only its hour and minute are used.
        time_ranges: Dicts with ``"start"`` and ``"end"`` entries, each a
            sequence whose first two items are hour and minute. The start is
            inclusive and the end exclusive. A range whose start is later
            than its end is treated as crossing midnight (e.g. 22:00-06:00).
            A range with equal start and end is empty.

    Returns:
        ``True`` if ``time_ranges`` is empty (no restriction) or any range
        contains the time; ``False`` otherwise.
    """
    if not time_ranges:
        return True

    now_minutes = current_time.hour * 60 + current_time.minute

    for time_range in time_ranges:
        start = time_range["start"]
        end = time_range["end"]
        start_minutes = start[0] * 60 + start[1]
        end_minutes = end[0] * 60 + end[1]

        if start_minutes <= end_minutes:
            if start_minutes <= now_minutes < end_minutes:
                return True
        elif now_minutes >= start_minutes or now_minutes < end_minutes:
            # Overnight range: matches late evening (after start) or early
            # morning (before end).
            return True

    return False
|
||
|
|
|
||
|
|
|
||
|
|
class FPSCounter:
    """Estimate frames per second from the most recent ``update()`` calls."""

    def __init__(self, window_size: int = 30):
        # Clock readings of the most recent update() calls, oldest first.
        self.timestamps: List[float] = []
        # Maximum number of readings kept for the estimate.
        self.window_size = window_size

    def update(self):
        """Record one frame at the current time and trim the window."""
        import time

        # A monotonic clock is used because only differences between
        # readings matter, and it cannot jump when the system clock is
        # adjusted.
        self.timestamps.append(time.monotonic())

        # Trim by count rather than with ``[-window_size:]``: for a window
        # of 0 that slice is ``[-0:]``, which keeps everything and lets the
        # list grow without bound.
        overflow = len(self.timestamps) - self.window_size
        if overflow > 0:
            del self.timestamps[:overflow]

    @property
    def fps(self) -> float:
        """Frames per second over the window; ``0.0`` when it cannot be computed.

        Computed as the number of intervals between stored readings divided
        by the time between the oldest and newest reading.
        """
        if len(self.timestamps) < 2:
            return 0.0
        elapsed = self.timestamps[-1] - self.timestamps[0]
        if elapsed <= 0:
            return 0.0
        return (len(self.timestamps) - 1) / elapsed
|