chore: remove unused debug scripts and model binaries

Removed from version control (local copies kept on disk):
- debug scripts: analyze_latency*.py, benchmark_trt.py, build_engine.py, etc.
- inspection tools: check_engine*.py, debug_output_shape.py, etc.
- throwaway test scripts: test_edge_run.py, test_inference.py
- large model binaries: models/*.engine, *.onnx, *.pt
- root-level __init__.py (unused)
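
For reference, untracking files while keeping the local copies is typically
done with "git rm --cached" plus matching .gitignore entries, e.g.:

  git rm -r --cached models/ analyze_latency*.py build_engine.py
  printf '*.engine\n*.onnx\n*.pt\n' >> .gitignore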

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
commit e828f4e09b
parent 0191e498f1
date   2026-02-10 15:29:05 +08:00

13 changed files with 0 additions and 981 deletions


@@ -1,26 +0,0 @@
"""
Edge_Inference_Service
Industrial-grade edge AI inference service
Dependency installation:
pip install -r requirements.txt
Environment requirements:
- Python 3.8+
- CUDA 12.1
- cuDNN 8.9.7
- TensorRT 8.6.1
Usage:
1. Configure the database connection (config/settings.py or environment variables)
2. Configure the Redis connection
3. Configure the MQTT connection
4. Prepare the TensorRT engine file (models/yolo11n.engine)
5. Run: python main.py
Author: AI Edge Architecture Team
Version: 1.0.0
"""
__version__ = "1.0.0"
__author__ = "AI Edge Architecture Team"


@@ -1,56 +0,0 @@
"""详细延迟分析 - 简化版"""
import sys
import os
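# make the project root importable so config/ and core/ resolve when run from a subdirectory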
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import time
import numpy as np
from config.settings import get_settings
from core.preprocessor import ImagePreprocessor
settings = get_settings()
preprocessor = ImagePreprocessor(settings.inference)
# simulate 100 inference passes
img = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
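# type() builds a throwaway ROI-like object, standing in for the real ROI config class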
roi_mock = type('ROI', (), {'x1': 300, 'y1': 100, 'x2': 1000, 'y2': 800, 'enabled': True})()
times_preprocess = []
times_single = []
times_batch = []
for _ in range(100):
    # 1. preprocess_single
    start = time.perf_counter()
    cropped = preprocessor.preprocess_single(img, roi_mock)
    t = (time.perf_counter() - start) * 1000
    times_single.append(t)
    # 2. preprocess_batch (1→4)
    start = time.perf_counter()
    batch_data, _ = preprocessor._batch_preprocessor.preprocess_batch([cropped[0]])
    t = (time.perf_counter() - start) * 1000
    times_batch.append(t)
    # 3. full preprocess (single + batch)
    start = time.perf_counter()
    cropped = preprocessor.preprocess_single(img, roi_mock)
    batch_data, _ = preprocessor._batch_preprocessor.preprocess_batch([cropped[0]])
    t = (time.perf_counter() - start) * 1000
    times_preprocess.append(t)
print("延迟分析 (100次平均):")
print(f" preprocess_single (ROI + resize): {np.mean(times_single):.2f}ms")
print(f" preprocess_batch (padding 1→4): {np.mean(times_batch):.2f}ms")
print(f" 完整预处理: {np.mean(times_preprocess):.2f}ms")
print()
print(f"TensorRT 推理 (batch=1): ~2.5ms (基准测试)")
print(f"TensorRT 推理 (batch=4): ~5.0ms (基准测试)")
print()
print("推算总延迟:")
print(f" 方案A (batch=1): {np.mean(times_single):.2f} + 2.5 + 后处理 ≈ 10-15ms")
print(f" 方案B (batch=4 实际只推理1帧): {np.mean(times_preprocess):.2f} + 5 + 后处理 ≈ 55-65ms")
print()
print("结论:延迟主要来自 batch padding 和不必要的 4帧推理开销")


@@ -1,44 +0,0 @@
"""延迟分析 - batch=1 优化后"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import time
import numpy as np
from config.settings import get_settings
from core.preprocessor import ImagePreprocessor
settings = get_settings()
preprocessor = ImagePreprocessor(settings.inference)
img = np.random.randint(0, 255, (1080, 1920, 3), dtype=np.uint8)
roi_mock = type('ROI', (), {'x1': 300, 'y1': 100, 'x2': 1000, 'y2': 800, 'enabled': True, 'roi_type': 0})()
times_preprocess_single = []
times_preprocess_batch = []
for _ in range(100):
    # 1. preprocess_single
    start = time.perf_counter()
    cropped = preprocessor.preprocess_single(img, roi_mock)
    t = (time.perf_counter() - start) * 1000
    times_preprocess_single.append(t)
    # 2. preprocess_batch (batch=1)
    start = time.perf_counter()
    batch_data, _ = preprocessor._batch_preprocessor.preprocess_batch([cropped[0]])
    t = (time.perf_counter() - start) * 1000
    times_preprocess_batch.append(t)
print("延迟分析 (batch=1 优化后):")
print(f" preprocess_single: {np.mean(times_preprocess_single):.2f}ms")
print(f" preprocess_batch: {np.mean(times_preprocess_batch):.2f}ms")
print(f" 总预处理: {np.mean(times_preprocess_single) + np.mean(times_preprocess_batch):.2f}ms")
print()
print(f"TensorRT batch=1 推理: ~2.5ms")
print(f"TensorRT batch=4 推理: ~5.0ms")
print()
print("推算总延迟:")
print(f" batch=1: {np.mean(times_preprocess_single) + np.mean(times_preprocess_batch):.2f} + 2.5 ≈ 8-12ms")
print(f" batch=4: {np.mean(times_preprocess_single) + np.mean(times_preprocess_batch):.2f} + 5 ≈ 10-15ms")


@@ -1,96 +0,0 @@
"""TensorRT 纯推理延迟测试"""
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import time
engine_path = './models/yolo11n.engine'
with open(engine_path, 'rb') as f:
    runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
    engine = runtime.deserialize_cuda_engine(f.read())
context = engine.create_execution_context()
input_shape = (1, 3, 480, 480)
input_data = np.random.randn(*input_shape).astype(np.float32)
context.set_input_shape('images', input_shape)
output_shape = tuple(max(1, s) for s in engine.get_binding_shape(1))
output_size = int(np.prod(output_shape))
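# page-locked (pinned) host buffers make host<->device copies faster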
h_input = cuda.pagelocked_empty(input_data.size, np.float32)
h_output = cuda.pagelocked_empty(output_size, np.float32)
np.copyto(h_input, input_data.ravel())
d_input = cuda.mem_alloc(h_input.nbytes)
d_output = cuda.mem_alloc(h_output.nbytes)
bindings = [int(d_input), int(d_output)]
# Warmup
for _ in range(10):
    cuda.memcpy_htod(d_input, h_input)
    context.execute_v2(bindings=bindings)
    cuda.memcpy_dtoh(h_output, d_output)
# Benchmark
times = []
for _ in range(100):
    start = time.perf_counter()
    cuda.memcpy_htod(d_input, h_input)
    context.execute_v2(bindings=bindings)
    cuda.memcpy_dtoh(h_output, d_output)
    times.append((time.perf_counter() - start) * 1000)
print('TensorRT pure inference latency (batch=1):')
print(f' mean: {np.mean(times):.2f}ms')
print(f' median: {np.median(times):.2f}ms')
print(f' min: {np.min(times):.2f}ms')
print(f' max: {np.max(times):.2f}ms')
print(f' P95: {np.percentile(times, 95):.2f}ms')
print()
# now benchmark batch=4
print("Benchmarking batch=4...")
input_shape_4 = (4, 3, 480, 480)
input_data_4 = np.random.randn(*input_shape_4).astype(np.float32)
context.set_input_shape('images', input_shape_4)
output_shape_4 = (4, 84, 4725)
output_size_4 = int(np.prod(output_shape_4))
h_input_4 = cuda.pagelocked_empty(input_data_4.size, np.float32)
h_output_4 = cuda.pagelocked_empty(output_size_4, np.float32)
np.copyto(h_input_4, input_data_4.ravel())
d_input_4 = cuda.mem_alloc(h_input_4.nbytes)
d_output_4 = cuda.mem_alloc(h_output_4.nbytes)
bindings_4 = [int(d_input_4), int(d_output_4)]
# Warmup
for _ in range(10):
    cuda.memcpy_htod(d_input_4, h_input_4)
    context.execute_v2(bindings=bindings_4)
    cuda.memcpy_dtoh(h_output_4, d_output_4)
# Benchmark
times_4 = []
for _ in range(100):
    start = time.perf_counter()
    cuda.memcpy_htod(d_input_4, h_input_4)
    context.execute_v2(bindings=bindings_4)
    cuda.memcpy_dtoh(h_output_4, d_output_4)
    times_4.append((time.perf_counter() - start) * 1000)
print('TensorRT pure inference latency (batch=4):')
print(f' mean: {np.mean(times_4):.2f}ms')
print(f' median: {np.median(times_4):.2f}ms')
print(f' min: {np.min(times_4):.2f}ms')
print(f' max: {np.max(times_4):.2f}ms')
print(f' P95: {np.percentile(times_4, 95):.2f}ms')


@@ -1,395 +0,0 @@
"""
TensorRT Engine 生成脚本 (8GB显存优化版)
功能:
- 将 YOLO11 模型转换为 TensorRT engine 文件
- 针对 8GB 显存环境优化
- 支持 FP16 精度、480x480 输入、动态 Batch(1-8)
- 启用所有高性能优化策略
使用方法:
python build_engine.py --download yolo11n --output models/yolo11n.engine [OPTIONS]
示例:
python build_engine.py --download yolo11n --output models/yolo11n.engine --fp16 --batch 8
优化命令参数说明:
--min-batch 最小Batch大小 (默认: 1)
--opt-batch 优化Batch大小 (默认: 4) <-- TensorRT会针对此尺寸专门优化
--max-batch 最大Batch大小 (默认: 8)
--workspace 工作空间大小MB (默认: 6144即6GB)
--tactics 优化策略 (默认: +7等价于 +CUBLAS,+CUBLAS_LT,+CUDNN)
--best 全局最优搜索 (默认: 启用)
--preview 预览特性 (默认: +faster_dynamic_shapes_0805)
tactic_values:
CUBLAS = 1
CUBLAS_LT = 2
CUDNN = 4
+7 = 全部启用"""
import os
import sys
import argparse
from pathlib import Path
try:
    import tensorrt as trt
    TRT_AVAILABLE = True
except ImportError:
    TRT_AVAILABLE = False
    trt = None
TRT_LOGGER = trt.Logger(trt.Logger.WARNING) if TRT_AVAILABLE else None
def check_environment():
    """Check the environment configuration"""
    print(f"\n{'='*60}")
    print("Environment check")
    print(f"{'='*60}")
    try:
        import tensorrt as trt
        print(f" TensorRT: {trt.__version__}")
    except ImportError:
        print(" TensorRT: not installed")
    try:
        import torch
        print(f" PyTorch: {torch.__version__}")
        print(f" CUDA: {torch.version.cuda if hasattr(torch, 'version') else 'N/A'}")
        if torch.cuda.is_available():
            print(f" GPU: {torch.cuda.get_device_name(0)}")
            print(f" GPU memory: {torch.cuda.get_device_properties(0).total_memory / (1024**3):.1f} GB")
    except ImportError:
        print(" PyTorch: not installed")
    try:
        from ultralytics import YOLO
        print(" Ultralytics: installed")
    except ImportError:
        print(" Ultralytics: not installed")
def download_model(model_type: str, save_dir: str = "./models") -> str:
    """
    Download a YOLO11 model.
    Args:
        model_type: model variant (yolo11n, yolo11s, yolo11m, yolo11l, yolo11x)
        save_dir: directory to save into
    Returns:
        path to the model file
    """
    from ultralytics import YOLO
    os.makedirs(save_dir, exist_ok=True)
    model_path = os.path.join(save_dir, f"{model_type}.pt")
    if os.path.exists(model_path):
        print(f"Model already exists: {model_path}")
        return model_path
    print(f"\nDownloading YOLO11 model {model_type}...")
    model = YOLO(f"{model_type}.pt")
    model.save(model_path)
    print(f"Model saved: {model_path}")
    return model_path
def export_onnx(
    model_path: str,
    output_path: str,
    input_size: int = 480,
    dynamic_batch: bool = True
) -> bool:
    """
    Export an ONNX model.
    Args:
        model_path: source model path
        output_path: output path
        input_size: input image size
        dynamic_batch: whether to use a dynamic batch dimension
    Returns:
        True on success
    """
    from ultralytics import YOLO
    print("\nExporting ONNX model...")
    print(f" source model: {model_path}")
    print(f" output path: {output_path}")
    print(f" input size: {input_size}x{input_size}")
    print(f" dynamic batch: {'✓' if dynamic_batch else '✗'}")
    model = YOLO(model_path)
    try:
        model.export(
            format='onnx',
            imgsz=input_size,
            dynamic=dynamic_batch,
            simplify=True,
            opset=17
        )
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path) / (1024 * 1024)
            print("\n✓ ONNX export succeeded!")
            print(f" file: {output_path}")
            print(f" size: {file_size:.2f} MB")
            return True
        else:
            print("Error: ONNX export failed")
            return False
    except Exception as e:
        print(f"Error: ONNX export failed: {e}")
        return False
def build_engine(
    onnx_path: str,
    engine_path: str,
    input_size: int = 480,
    min_batch: int = 1,
    opt_batch: int = 4,
    max_batch: int = 8,
    fp16: bool = True,
    workspace_mb: int = 6144,
    tactic_sources: str = "+CUBLAS,+CUBLAS_LT,+CUDNN",
    best: bool = True,
    preview: str = "+faster_dynamic_shapes_0805"
) -> bool:
    """
    Build an engine with TensorRT (tuned for 8 GB VRAM).
    Args:
        onnx_path: ONNX model path
        engine_path: output engine path
        input_size: input image size
        min_batch: minimum batch size
        opt_batch: optimal batch size (TensorRT tunes kernels specifically for this size)
        max_batch: maximum batch size
        fp16: whether to use FP16
        workspace_mb: workspace size (MB)
        tactic_sources: tactic sources
        best: whether to run an exhaustive tactic search
        preview: preview features
    Returns:
        True on success
    """
    print(f"\n{'='*60}")
    print("TensorRT engine build (tuned for 8 GB VRAM)")
    print(f"{'='*60}")
    if not os.path.exists(onnx_path):
        print(f"Error: ONNX model not found: {onnx_path}")
        return False
    os.makedirs(os.path.dirname(engine_path), exist_ok=True)
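    # ONNX models must be parsed into an explicit-batch network in TensorRT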
    EXPLICIT_BATCH = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
    with trt.Builder(TRT_LOGGER) as builder:
        with builder.create_network(EXPLICIT_BATCH) as network:
            with trt.OnnxParser(network, TRT_LOGGER) as parser:
                print(f"Loading ONNX model: {onnx_path}")
                with open(onnx_path, 'rb') as f:
                    if not parser.parse(f.read()):
                        print("Error: failed to parse ONNX")
                        for error in range(parser.num_errors):
                            print(f" {parser.get_error(error)}")
                        return False
                input_tensor = network.get_input(0)
                input_name = input_tensor.name
                input_shape = input_tensor.shape
                print("\nBuild parameters:")
                print(f" input name: {input_name}")
                print(f" input shape: {input_shape}")
                print(f" min batch: {min_batch}")
                print(f" opt batch: {opt_batch} <-- TensorRT tunes specifically for this size!")
                print(f" max batch: {max_batch}")
                print(f" precision: {'FP16' if fp16 else 'FP32'}")
                print(f" workspace: {workspace_mb} MB")
                print(f" tactic sources: {tactic_sources}")
                print(f" exhaustive search: {'✓' if best else '✗'}")
                print(f" preview features: {preview}")
                profile = builder.create_optimization_profile()
                profile.set_shape(
                    input_name,
                    (min_batch, 3, input_size, input_size),
                    (opt_batch, 3, input_size, input_size),
                    (max_batch, 3, input_size, input_size)
                )
                config = builder.create_builder_config()
                config.add_optimization_profile(profile)
                if fp16:
                    config.set_flag(trt.BuilderFlag.FP16)
                config.set_flag(trt.BuilderFlag.TF32)
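                # decode tactic source strings such as "+CUBLAS,+CUBLAS_LT,+CUDNN" into TensorRT's bitmask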
                tactic_value = 0
                for source in tactic_sources.split(','):
                    source = source.strip()
                    if not source:
                        continue
                    if source.startswith('+'):
                        name = source[1:]
                        if name.isdigit():
                            tactic_value += int(name)
                        else:
                            name_upper = name.upper()
                            if name_upper == 'CUBLAS':
                                tactic_value += 1
                            elif name_upper == 'CUBLAS_LT':
                                tactic_value += 2
                            elif name_upper == 'CUDNN':
                                tactic_value += 4
                if tactic_value > 0:
                    config.set_tactic_sources(tactic_value)
                # the BENCHMARK flag was deprecated in TRT 8.5+; tactic profiling now happens internally
                # config.set_flag(trt.BuilderFlag.BENCHMARK)
                config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, workspace_mb * 1024 * 1024)
                print("\nBuilding engine, please wait (this can take 3-5 minutes)...")
                serialized_engine = builder.build_serialized_network(network, config)
                if serialized_engine is None:
                    print("Error: engine build failed")
                    return False
                with open(engine_path, 'wb') as f:
                    f.write(serialized_engine)
                if os.path.exists(engine_path):
                    file_size = os.path.getsize(engine_path) / (1024 * 1024)
                    print("\n✓ Engine build succeeded!")
                    print(f" file: {engine_path}")
                    print(f" size: {file_size:.2f} MB")
                    return True
                else:
                    print("Error: failed to save engine")
                    return False
def main():
    parser = argparse.ArgumentParser(
        description="TensorRT engine build tool (tuned for 8 GB VRAM)",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--model", type=str, default="yolo11n.pt",
                        help="model name or path (default: yolo11n.pt)")
    parser.add_argument("--output", type=str, default="models/yolo11n.engine",
                        help="output engine path (default: models/yolo11n.engine)")
    parser.add_argument("--download", type=str, default=None,
                        help="download a model directly (yolo11n/s/m/l/x)")
    group = parser.add_argument_group("build parameters")
    group.add_argument("--input-size", type=int, default=480,
                       help="input image size (default: 480)")
    group.add_argument("--min-batch", type=int, default=1,
                       help="minimum batch size (default: 1)")
    group.add_argument("--opt-batch", type=int, default=4,
                       help="optimal batch size, TensorRT tunes specifically for it (default: 4)")
    group.add_argument("--max-batch", type=int, default=8,
                       help="maximum batch size (default: 8)")
    group.add_argument("--no-fp16", dest="fp16", action="store_false",
                       help="disable FP16 precision")
    parser.set_defaults(fp16=True)
    opt_group = parser.add_argument_group("8 GB VRAM tuning parameters")
    opt_group.add_argument("--workspace", type=int, default=6144,
                           help="workspace size in MB, 6144 recommended for 8 GB VRAM (default: 6144)")
    opt_group.add_argument("--tactics", type=str,
                           default="+CUBLAS,+CUBLAS_LT,+CUDNN",
                           help="tactic sources (default: +CUBLAS,+CUBLAS_LT,+CUDNN)")
    opt_group.add_argument("--best", action="store_true", default=True,
                           help="exhaustive tactic search (default: enabled)")
    opt_group.add_argument("--no-best", dest="best", action="store_false",
                           help="disable exhaustive tactic search")
    opt_group.add_argument("--preview", type=str,
                           default="+faster_dynamic_shapes_0805",
                           help="preview features (default: +faster_dynamic_shapes_0805)")
    args = parser.parse_args()
    check_environment()
    if not TRT_AVAILABLE:
        print("\nError: TensorRT is not installed")
        print("Install it with: pip install tensorrt")
        return 1
    try:
        from ultralytics import YOLO
    except ImportError:
        print("\nError: Ultralytics is not installed")
        print("Install it with: pip install ultralytics")
        return 1
    model_path = args.model
    if args.download:
        model_path = download_model(args.download)
    elif not os.path.exists(model_path):
        if model_path.endswith('.pt') or model_path.endswith('.onnx'):
            print(f"Error: model file not found: {model_path}")
            return 1
        else:
            model_path = download_model(model_path)
    else:
        print(f"\nUsing local model: {model_path}")
    onnx_path = model_path
    if model_path.endswith('.pt'):
        onnx_path = model_path.replace('.pt', '.onnx')
        success = export_onnx(model_path, onnx_path, args.input_size)
        if not success:
            return 1
    success = build_engine(
        onnx_path=onnx_path,
        engine_path=args.output,
        input_size=args.input_size,
        min_batch=args.min_batch,
        opt_batch=args.opt_batch,
        max_batch=args.max_batch,
        fp16=args.fp16,
        workspace_mb=args.workspace,
        tactic_sources=args.tactics,
        best=args.best,
        preview=args.preview
    )
    if success:
        print(f"\n{'='*60}")
        print("Build complete!")
        print(f"{'='*60}")
        print(f"Engine file: {args.output}")
        print("\nRecommended performance check with trtexec:")
        print(f" trtexec --loadEngine={args.output} --streams=8 --iterations=100")
        return 0
    else:
        return 1
if __name__ == "__main__":
    sys.exit(main())


@@ -1,43 +0,0 @@
"""检查 TensorRT Engine 的实际 shape"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
    import tensorrt as trt
    engine_path = "./models/yolo11n.engine"
    with open(engine_path, "rb") as f:
        runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
        engine = runtime.deserialize_cuda_engine(f.read())
    print("=" * 60)
    print("Engine Binding Information")
    print("=" * 60)
    for i in range(engine.num_bindings):
        name = engine.get_binding_name(i)
        shape = engine.get_binding_shape(i)
        dtype = trt.nptype(engine.get_binding_dtype(i))
        is_input = engine.binding_is_input(i)
        size = trt.volume(shape)
        print(f"\nBinding {i}:")
        print(f" Name: {name}")
        print(f" Shape: {shape}")
        print(f" Dtype: {dtype}")
        print(f" Size: {size}")
        print(f" Is Input: {is_input}")
        if is_input:
            print(f" Total Elements: {size}")
            print(f" Expected Batch Size: {shape[0] if len(shape) > 0 else 'N/A'}")
    print("\n" + "=" * 60)
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()


@@ -1,53 +0,0 @@
"""检查 TensorRT Engine 输出的实际 shape"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import numpy as np
try:
    import tensorrt as trt
    import pycuda.driver as cuda
    import pycuda.autoinit
    engine_path = "./models/yolo11n.engine"
    with open(engine_path, "rb") as f:
        runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
        engine = runtime.deserialize_cuda_engine(f.read())
    context = engine.create_execution_context()
    print("=" * 60)
    print("Engine Binding Information")
    print("=" * 60)
    for i in range(engine.num_bindings):
        name = engine.get_binding_name(i)
        shape = engine.get_binding_shape(i)
        dtype = trt.nptype(engine.get_binding_dtype(i))
        is_input = engine.binding_is_input(i)
        print(f"\nBinding {i}: {name}")
        print(f" Shape: {shape}")
        print(f" Dtype: {dtype}")
        print(f" Is Input: {is_input}")
    print("\n" + "=" * 60)
    input_shape = engine.get_binding_shape(0)
    output_shape = engine.get_binding_shape(1)
    print(f"Input shape: {input_shape}")
    print(f"Output shape: {output_shape}")
    input_size = np.prod([max(1, s) for s in input_shape])
    output_size = np.prod([max(1, s) for s in output_shape])
    print(f"Input size: {input_size}")
    print(f"Output size: {output_size}")
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()


@@ -1,69 +0,0 @@
"""调试 TensorRT 输出 shape"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import numpy as np
try:
    import tensorrt as trt
    import pycuda.driver as cuda
    import pycuda.autoinit
    engine_path = "./models/yolo11n.engine"
    with open(engine_path, "rb") as f:
        runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
        engine = runtime.deserialize_cuda_engine(f.read())
    context = engine.create_execution_context()
    input_shape = (1, 3, 480, 480)
    input_data = np.random.randn(*input_shape).astype(np.float32)
    input_binding_idx = 0
    output_binding_idx = 1
    output_shape = engine.get_binding_shape(output_binding_idx)
    print(f"Engine-declared output shape: {output_shape}")
    context.set_input_shape(engine.get_binding_name(0), input_shape)
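    # dynamic dimensions are reported as -1; clamp them to 1 to get a usable element count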
    output_size = int(np.prod([max(1, s) for s in output_shape]))
    h_input = cuda.pagelocked_empty(input_data.size, np.float32)
    h_output = cuda.pagelocked_empty(output_size, np.float32)
    np.copyto(h_input, input_data.ravel())
    d_input = cuda.mem_alloc(h_input.nbytes)
    d_output = cuda.mem_alloc(h_output.nbytes)
    bindings = [int(d_input), int(d_output)]
    cuda.memcpy_htod(d_input, h_input)
    context.execute_v2(bindings=bindings)
    cuda.memcpy_dtoh(h_output, d_output)
    output_array = h_output.reshape(output_shape)
    print("\nActual output:")
    print(f" dtype: {output_array.dtype}")
    print(f" shape: {output_array.shape}")
    print(f" ndim: {output_array.ndim}")
    if output_array.ndim == 1:
        print(f" total elements: {output_array.shape[0]}")
        print(f" expected (84*4725): {84 * 4725}")
    elif output_array.ndim == 2:
        print(f" shape[0]: {output_array.shape[0]} (detections)")
        print(f" shape[1]: {output_array.shape[1]} (features)")
    elif output_array.ndim == 3:
        print(f" shape[0]: {output_array.shape[0]} (batch)")
        print(f" shape[1]: {output_array.shape[1]} (classes+coords)")
        print(f" shape[2]: {output_array.shape[2]} (num_boxes)")
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()

Binary file not shown.

Binary file not shown.

Binary file not shown.


@@ -1,130 +0,0 @@
"""
边缘端运行测试脚本
添加测试摄像头和ROI配置验证系统正常运行
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config.database import get_sqlite_manager
def setup_test_data():
    """Prepare test data"""
    db = get_sqlite_manager()
    print("=" * 60)
    print("Edge runtime test - data preparation")
    print("=" * 60)
    camera_id = "test_camera_01"
    rtsp_url = "rtsp://admin:admin@172.16.8.35/cam/realmonitor?channel=6&subtype=1"
    print("\n1. Adding camera configuration")
    print(f" camera_id: {camera_id}")
    print(f" rtsp_url: {rtsp_url}")
    result = db.save_camera_config(
        camera_id=camera_id,
        rtsp_url=rtsp_url,
        camera_name="Test camera - workshop entrance",
        location="Workshop entrance corridor",
        enabled=True,
        status=True
    )
    print(f" result: {'OK' if result else 'FAILED'}")
    print("\n2. Adding ROI configurations (randomly partitioned regions)")
    roi_configs = [
        {
            "roi_id": f"{camera_id}_roi_01",
            "name": "Leave-post detection region",
            "roi_type": "polygon",
            "coordinates": [[100, 50], [300, 50], [300, 200], [100, 200]],
            "algorithm_type": "leave_post",
            "target_class": "person",
            "confirm_on_duty_sec": 10,
            "confirm_leave_sec": 30,
            "cooldown_sec": 60,
            "working_hours": [{"start": "08:00", "end": "18:00"}],
        },
        {
            "roi_id": f"{camera_id}_roi_02",
            "name": "Intrusion detection region",
            "roi_type": "polygon",
            "coordinates": [[350, 50], [550, 50], [550, 200], [350, 200]],
            "algorithm_type": "intrusion",
            "target_class": "person",
            "alert_threshold": 3,
            "alert_cooldown": 60,
            "confirm_on_duty_sec": 10,
            "confirm_leave_sec": 10,
            "cooldown_sec": 60,
            "working_hours": None,
        },
    ]
    for roi in roi_configs:
        print(f"\n ROI: {roi['name']}")
        print(f" - roi_id: {roi['roi_id']}")
        print(f" - algorithm_type: {roi['algorithm_type']}")
        print(f" - coordinates: {roi['coordinates']}")
        result = db.save_roi_config(
            roi_id=roi["roi_id"],
            camera_id=camera_id,
            roi_type=roi["roi_type"],
            coordinates=roi["coordinates"],
            algorithm_type=roi["algorithm_type"],
            target_class=roi["target_class"],
            confirm_on_duty_sec=roi["confirm_on_duty_sec"],
            confirm_leave_sec=roi["confirm_leave_sec"],
            cooldown_sec=roi["cooldown_sec"],
            working_hours=str(roi["working_hours"]),
        )
        print(f" result: {'OK' if result else 'FAILED'}")
    print("\n" + "=" * 60)
    print("Test data preparation complete")
    print("=" * 60)
    return camera_id, roi_configs
def verify_data():
    """Verify the stored data"""
    db = get_sqlite_manager()
    print("\n" + "=" * 60)
    print("Verifying configurations in the database")
    print("=" * 60)
    cameras = db.get_all_camera_configs()
    print(f"\nCamera count: {len(cameras)}")
    for cam in cameras:
        print(f" - {cam['camera_id']}: {cam['camera_name']} ({cam['rtsp_url'][:50]}...)")
    rois = db.get_all_roi_configs()
    print(f"\nROI count: {len(rois)}")
    for roi in rois:
        print(f" - {roi['roi_id']}: {roi['name']} ({roi['algorithm_type']})")
    return len(cameras) > 0 and len(rois) > 0
if __name__ == "__main__":
print("\n" + "#" * 60)
print("# 边缘端运行测试 - 数据准备")
print("#" * 60)
setup_test_data()
verify_data()
print("\n" + "#" * 60)
print("# 测试数据准备完成,请运行 main.py 进行推理测试")
print("#" * 60)


@@ -1,69 +0,0 @@
"""
边缘端运行测试脚本 - 推理测试
运行 main.py 并测试 30 秒
"""
import subprocess
import sys
import os
import time
def run_test():
    print("=" * 60)
    print("Edge runtime test - inference test")
    print("=" * 60)
    print("Test duration: 30 seconds")
    print(f"Test started: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)
    env = os.environ.copy()
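    # prepend the conda env directory so main.py resolves the same Python/CUDA DLLs (machine-specific path)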
    env['PATH'] = r"C:\Users\16337\miniconda3\envs\yolo;" + env.get('PATH', '')
    cmd = [
        sys.executable, "main.py"
    ]
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        cwd=os.path.dirname(os.path.abspath(__file__)),
        env=env
    )
    output_lines = []
    start_time = time.time()
    try:
        while True:
            line = process.stdout.readline()
            if not line and process.poll() is not None:
                break
            if line:
                output_lines.append(line.strip())
                print(line.strip())
            elapsed = time.time() - start_time
            if elapsed >= 30:
                print("\n[INFO] 30-second test window reached, stopping the process...")
                process.terminate()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                break
    except KeyboardInterrupt:
        print("\n[INFO] Test interrupted by user")
        process.terminate()
    return output_lines
if __name__ == "__main__":
run_test()
print("\n" + "=" * 60)
print("测试完成")
print("=" * 60)