"""Performance benchmark: generation speed, throughput and memory footprint.

Loads a 4-bit (NF4) quantized causal language model, times greedy generation
for a handful of prompt/output-length combinations, records GPU/RAM usage and
writes everything to a JSON report.
"""

import time
import json
import os
import glob
import torch
import psutil
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datetime import datetime


def load_model():
    """Load the model with 4-bit NF4 quantization together with its tokenizer.

    A local checkpoint is looked up by searching for ``config.json`` under
    ``vsp/qwen3.5-9b/model``; when none is found the hub identifier
    ``Qwen/Qwen3.5-9B`` is used instead.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    # Sort the matches so the selected checkpoint does not depend on the
    # (arbitrary) order in which the filesystem lists directory entries.
    paths = sorted(
        glob.glob("vsp/qwen3.5-9b/model/**/config.json", recursive=True)
    )
    model_path = os.path.dirname(paths[0]) if paths else "Qwen/Qwen3.5-9B"

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )

    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        quantization_config=bnb_config,
        device_map="auto",
        trust_remote_code=True,
    )
    return model, tokenizer


def _sync_device():
    """Wait for pending CUDA work so wall-clock timings cover the whole call.

    Does nothing when CUDA is unavailable, which keeps the benchmark usable
    on CPU-only hosts.
    """
    if torch.cuda.is_available():
        torch.cuda.synchronize()


def benchmark_speed(model, tokenizer, num_runs=5):
    """Measure generation speed for several input/output length combinations.

    Each test case is generated ``num_runs`` times with greedy decoding and
    the timings are averaged.

    Args:
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Tokenizer exposing ``apply_chat_template`` and ``__call__``.
        num_runs: Number of timed repetitions per test case (at least 1).

    Returns:
        A list with one dict per test case containing the keys
        ``test_name``, ``input_tokens``, ``avg_output_tokens``,
        ``avg_time_s``, ``avg_tokens_per_sec``, ``min_time_s`` and
        ``max_time_s``.

    Raises:
        ValueError: If ``num_runs`` is smaller than 1.
    """
    if num_runs < 1:
        raise ValueError(f"num_runs must be >= 1, got {num_runs}")

    print("=" * 60)
    print("性能基准测试 - 推理速度")
    print("=" * 60)

    test_cases = [
        {"name": "短输入短输出", "prompt": "你好", "max_tokens": 50},
        {"name": "短输入中输出", "prompt": "介绍一下人工智能", "max_tokens": 128},
        {"name": "短输入长输出", "prompt": "请详细解释深度学习的原理和应用", "max_tokens": 256},
        {"name": "中输入中输出", "prompt": "以下是一段代码:\n```python\ndef fibonacci(n):\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)\n```\n请分析这段代码的时间复杂度并给出优化方案。", "max_tokens": 256},
        {"name": "长输入短输出", "prompt": "请总结以下内容的关键点:" + "人工智能是计算机科学的一个分支。" * 50, "max_tokens": 64},
    ]

    results = []
    for case in test_cases:
        print(f"\n--- {case['name']} (max_tokens={case['max_tokens']}) ---")

        # The prompt is identical for every run of a case, so render the chat
        # template and tokenize once, outside the timed region.
        messages = [{"role": "user", "content": case["prompt"]}]
        text = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        inputs = tokenizer(text, return_tensors="pt").to(model.device)
        input_len = inputs["input_ids"].shape[1]

        times = []
        output_tokens_list = []
        for _ in range(num_runs):
            # Synchronize before and after so queued GPU work is not
            # attributed to the wrong run.
            _sync_device()
            t0 = time.perf_counter()
            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=case["max_tokens"],
                    do_sample=False,  # greedy for reproducibility
                )
            _sync_device()
            gen_time = time.perf_counter() - t0

            # generate() returns prompt + completion; count only new tokens.
            output_len = outputs.shape[1] - input_len
            times.append(gen_time)
            output_tokens_list.append(output_len)

        avg_time = sum(times) / len(times)
        avg_tokens = sum(output_tokens_list) / len(output_tokens_list)
        avg_speed = avg_tokens / avg_time if avg_time > 0 else 0

        result = {
            "test_name": case["name"],
            "input_tokens": input_len,
            "avg_output_tokens": round(avg_tokens, 1),
            "avg_time_s": round(avg_time, 3),
            "avg_tokens_per_sec": round(avg_speed, 1),
            "min_time_s": round(min(times), 3),
            "max_time_s": round(max(times), 3),
        }
        results.append(result)

        print(f" 输入 tokens: {input_len}")
        print(f" 平均输出 tokens: {result['avg_output_tokens']}")
        print(f" 平均耗时: {result['avg_time_s']}s")
        print(f" 平均速度: {result['avg_tokens_per_sec']} tokens/s")

    return results


def benchmark_memory(model):
    """Report GPU memory (when CUDA is available) and process RAM usage.

    Args:
        model: Accepted for interface symmetry with the other benchmarks;
            it is not inspected here.

    Returns:
        A dict with ``ram_used_gb`` and ``ram_total_gb`` and, when CUDA is
        available, ``gpu_allocated_gb``, ``gpu_reserved_gb``,
        ``gpu_total_gb`` and ``gpu_name``.
    """
    print(f"\n{'='*60}")
    print("显存与内存占用")
    print(f"{'='*60}")

    result = {}
    if torch.cuda.is_available():
        # Query every GPU figure for the same (current) device instead of
        # mixing current-device counters with properties of device 0.
        device = torch.cuda.current_device()
        result["gpu_allocated_gb"] = round(torch.cuda.memory_allocated(device) / 1024**3, 2)
        result["gpu_reserved_gb"] = round(torch.cuda.memory_reserved(device) / 1024**3, 2)
        result["gpu_total_gb"] = round(torch.cuda.get_device_properties(device).total_memory / 1024**3, 1)
        result["gpu_name"] = torch.cuda.get_device_name(device)

    process = psutil.Process()
    result["ram_used_gb"] = round(process.memory_info().rss / 1024**3, 2)
    result["ram_total_gb"] = round(psutil.virtual_memory().total / 1024**3, 1)

    for k, v in result.items():
        print(f" {k}: {v}")

    return result


def save_results(speed_results, memory_results):
    """Write the benchmark report as JSON and return its path.

    Args:
        speed_results: Value returned by ``benchmark_speed``.
        memory_results: Value returned by ``benchmark_memory``.

    Returns:
        Path of the written file,
        ``vsp/qwen3.5-9b/results/benchmark_speed.json`` (relative to the
        current working directory).
    """
    output_dir = "vsp/qwen3.5-9b/results"
    os.makedirs(output_dir, exist_ok=True)

    report = {
        "timestamp": datetime.now().isoformat(),
        "model": "Qwen3.5-9B",
        "quantization": "4-bit NF4",
        "speed_benchmark": speed_results,
        "memory": memory_results,
    }

    output_path = os.path.join(output_dir, "benchmark_speed.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print(f"\n结果已保存到 {output_path}")
    return output_path


def main():
    """Run all benchmarks from the directory two levels above this file."""
    # The relative model/result paths used above are resolved against the
    # working directory, so switch to it before loading anything.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(os.path.join(script_dir, os.pardir, os.pardir))

    model, tokenizer = load_model()
    speed_results = benchmark_speed(model, tokenizer)
    memory_results = benchmark_memory(model)
    save_results(speed_results, memory_results)


if __name__ == "__main__":
    main()