"""Concurrency stress test: measure performance at different concurrency levels."""
import json
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import torch

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from model_utils import load_model, apply_chat  # noqa: E402 (needs the sys.path tweak above)


def single_inference(model, tokenizer, prompt, lock, max_tokens=64):
    """Run one inference request (thread-safe)."""
    messages = [{"role": "user", "content": prompt}]
    text = apply_chat(tokenizer, messages)
    inputs = tokenizer(text, return_tensors="pt").to(model.device)
    input_len = inputs["input_ids"].shape[1]

    # The timer starts before the lock is acquired, so the measured latency
    # includes the time this request spends queued behind other threads.
    t0 = time.perf_counter()
    with lock:  # Single GPU: generate() calls are serialized
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                do_sample=False,
            )
    elapsed = time.perf_counter() - t0

    output_len = outputs.shape[1] - input_len
    return {
        "time_s": elapsed,
        "output_tokens": output_len,
        # Per-request rate; it includes queueing time, so it falls as concurrency rises.
        "tokens_per_sec": output_len / elapsed if elapsed > 0 else 0,
    }


def test_concurrency(model, tokenizer):
    """Measure throughput and latency at several concurrency levels."""
    print("=" * 60)
    print("Concurrency stress test")
    print("=" * 60)

    # Chinese benchmark prompts, kept verbatim as model inputs.
    prompts = [
        "什么是人工智能?",  # What is artificial intelligence?
        "请解释量子计算。",  # Please explain quantum computing.
        "Python的优点是什么?",  # What are the advantages of Python?
        "深度学习和机器学习的区别?",  # Difference between deep learning and machine learning?
        "什么是自然语言处理?",  # What is natural language processing?
        "解释一下GPT的工作原理。",  # Explain how GPT works.
        "什么是强化学习?",  # What is reinforcement learning?
        "云计算的优势有哪些?",  # What are the advantages of cloud computing?
    ]
    concurrency_levels = [1, 2, 4, 8]
    lock = threading.Lock()
    results = []

    for n_concurrent in concurrency_levels:
        print(f"\n--- Concurrency: {n_concurrent} ---")
        # Cycle through the prompt list to get exactly n_concurrent prompts.
        test_prompts = (prompts * ((n_concurrent // len(prompts)) + 1))[:n_concurrent]

        t0 = time.perf_counter()
        futures_results = []
        with ThreadPoolExecutor(max_workers=n_concurrent) as executor:
            futures = [
                executor.submit(single_inference, model, tokenizer, p, lock)
                for p in test_prompts
            ]
            for f in as_completed(futures):
                futures_results.append(f.result())
        total_time = time.perf_counter() - t0

        total_tokens = sum(r["output_tokens"] for r in futures_results)
        avg_latency = sum(r["time_s"] for r in futures_results) / len(futures_results)
        throughput = total_tokens / total_time

        result = {
            "concurrency": n_concurrent,
            "total_time_s": round(total_time, 2),
            "total_tokens": total_tokens,
            "throughput_tokens_per_sec": round(throughput, 1),
            "avg_latency_s": round(avg_latency, 2),
            "requests_completed": len(futures_results),
        }
        results.append(result)
        print(f"  Total time: {result['total_time_s']}s")
        print(f"  Total tokens: {result['total_tokens']}")
        print(f"  Throughput: {result['throughput_tokens_per_sec']} tokens/s")
        print(f"  Average latency: {result['avg_latency_s']}s")

    # Save results
    output_dir = "vsp/qwen3.5-9b/results"
    os.makedirs(output_dir, exist_ok=True)
    report = {
        "timestamp": datetime.now().isoformat(),
        "model": "Qwen3.5-9B",
        "quantization": "4-bit NF4",
        "note": "Single-GPU serialized inference; the concurrency test mainly measures request queuing",
        "concurrency_results": results,
    }
    path = os.path.join(output_dir, "concurrency_results.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"\nResults saved to {path}")
    return results


if __name__ == "__main__":
    # Work from two directories above this file so the relative output_dir resolves there.
    os.chdir(os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..")))
    model, tokenizer = load_model()
    test_concurrency(model, tokenizer)
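

# Idealized reference model for the lock-serialized benchmark above (a sketch;
# `expected_serial_queue_stats` is a hypothetical helper that the script never calls).
# Assumptions: all n requests are submitted at t=0, each holds the lock for the same
# service time s, each yields the same token count, and there is no other overhead.
# threading.Lock does not guarantee FIFO order, but whichever order the threads take,
# the completion times are s, 2s, ..., n*s, so:
#   total time  = n * s
#   avg latency = s * (n + 1) / 2        (grows linearly with n)
#   throughput  = tokens_per_request / s (independent of n)
# Real runs will deviate (greedy decoding can stop early at EOS, prompts differ in
# length), but measured throughput should stay roughly flat while latency rises.
def expected_serial_queue_stats(n_concurrent, service_time_s, tokens_per_request):
    """Idealized stats for n identical requests serialized behind one lock."""
    if n_concurrent <= 0 or service_time_s <= 0:
        raise ValueError("n_concurrent and service_time_s must be positive")
    # The k-th request to acquire the lock finishes at k * service_time_s.
    latencies = [k * service_time_s for k in range(1, n_concurrent + 1)]
    total_time = latencies[-1]
    total_tokens = n_concurrent * tokens_per_request
    return {
        "total_time_s": total_time,
        "avg_latency_s": sum(latencies) / n_concurrent,
        "throughput_tokens_per_sec": total_tokens / total_time,
    }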