2026-03-16 11:45:51 +08:00
|
|
|
|
"""并发压测 - 测试不同并发数下的性能表现"""
|
|
|
|
|
|
import json
|
|
|
|
|
|
import os
|
2026-03-16 13:05:20 +08:00
|
|
|
|
import sys
|
2026-03-16 11:45:51 +08:00
|
|
|
|
import glob
|
|
|
|
|
|
import time
|
|
|
|
|
|
import torch
|
|
|
|
|
|
import threading
|
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
2026-03-16 13:05:20 +08:00
|
|
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer
|
2026-03-16 11:45:51 +08:00
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
2026-03-16 13:05:20 +08:00
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
2026-03-16 17:38:33 +08:00
|
|
|
|
from model_utils import load_model, apply_chat
|
2026-03-16 11:45:51 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def single_inference(model, tokenizer, prompt, lock, max_tokens=64):
|
|
|
|
|
|
"""单次推理(线程安全)"""
|
|
|
|
|
|
messages = [{"role": "user", "content": prompt}]
|
2026-03-16 17:38:33 +08:00
|
|
|
|
text = apply_chat(tokenizer, messages)
|
2026-03-16 11:45:51 +08:00
|
|
|
|
inputs = tokenizer(text, return_tensors="pt").to(model.device)
|
|
|
|
|
|
input_len = inputs["input_ids"].shape[1]
|
|
|
|
|
|
|
|
|
|
|
|
t0 = time.perf_counter()
|
|
|
|
|
|
with lock: # GPU 推理需要串行(单 GPU)
|
|
|
|
|
|
with torch.no_grad():
|
|
|
|
|
|
outputs = model.generate(
|
|
|
|
|
|
**inputs,
|
|
|
|
|
|
max_new_tokens=max_tokens,
|
|
|
|
|
|
do_sample=False,
|
|
|
|
|
|
)
|
|
|
|
|
|
elapsed = time.perf_counter() - t0
|
|
|
|
|
|
output_len = outputs.shape[1] - input_len
|
|
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
|
"time_s": elapsed,
|
|
|
|
|
|
"output_tokens": output_len,
|
|
|
|
|
|
"tokens_per_sec": output_len / elapsed if elapsed > 0 else 0,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_concurrency(model, tokenizer):
|
|
|
|
|
|
"""测试不同并发数下的表现"""
|
|
|
|
|
|
print("=" * 60)
|
|
|
|
|
|
print("并发压测")
|
|
|
|
|
|
print("=" * 60)
|
|
|
|
|
|
|
|
|
|
|
|
prompts = [
|
|
|
|
|
|
"什么是人工智能?",
|
|
|
|
|
|
"请解释量子计算。",
|
|
|
|
|
|
"Python的优点是什么?",
|
|
|
|
|
|
"深度学习和机器学习的区别?",
|
|
|
|
|
|
"什么是自然语言处理?",
|
|
|
|
|
|
"解释一下GPT的工作原理。",
|
|
|
|
|
|
"什么是强化学习?",
|
|
|
|
|
|
"云计算的优势有哪些?",
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
concurrency_levels = [1, 2, 4, 8]
|
|
|
|
|
|
lock = threading.Lock()
|
|
|
|
|
|
results = []
|
|
|
|
|
|
|
|
|
|
|
|
for n_concurrent in concurrency_levels:
|
|
|
|
|
|
print(f"\n--- 并发数: {n_concurrent} ---")
|
|
|
|
|
|
test_prompts = (prompts * ((n_concurrent // len(prompts)) + 1))[:n_concurrent]
|
|
|
|
|
|
|
|
|
|
|
|
t0 = time.perf_counter()
|
|
|
|
|
|
futures_results = []
|
|
|
|
|
|
|
|
|
|
|
|
with ThreadPoolExecutor(max_workers=n_concurrent) as executor:
|
|
|
|
|
|
futures = [
|
|
|
|
|
|
executor.submit(single_inference, model, tokenizer, p, lock)
|
|
|
|
|
|
for p in test_prompts
|
|
|
|
|
|
]
|
|
|
|
|
|
for f in as_completed(futures):
|
|
|
|
|
|
futures_results.append(f.result())
|
|
|
|
|
|
|
|
|
|
|
|
total_time = time.perf_counter() - t0
|
|
|
|
|
|
total_tokens = sum(r["output_tokens"] for r in futures_results)
|
|
|
|
|
|
avg_latency = sum(r["time_s"] for r in futures_results) / len(futures_results)
|
|
|
|
|
|
throughput = total_tokens / total_time
|
|
|
|
|
|
|
|
|
|
|
|
result = {
|
|
|
|
|
|
"concurrency": n_concurrent,
|
|
|
|
|
|
"total_time_s": round(total_time, 2),
|
|
|
|
|
|
"total_tokens": total_tokens,
|
|
|
|
|
|
"throughput_tokens_per_sec": round(throughput, 1),
|
|
|
|
|
|
"avg_latency_s": round(avg_latency, 2),
|
|
|
|
|
|
"requests_completed": len(futures_results),
|
|
|
|
|
|
}
|
|
|
|
|
|
results.append(result)
|
|
|
|
|
|
|
|
|
|
|
|
print(f" 总耗时: {result['total_time_s']}s")
|
|
|
|
|
|
print(f" 总 tokens: {result['total_tokens']}")
|
|
|
|
|
|
print(f" 吞吐量: {result['throughput_tokens_per_sec']} tokens/s")
|
|
|
|
|
|
print(f" 平均延迟: {result['avg_latency_s']}s")
|
|
|
|
|
|
|
|
|
|
|
|
# 保存
|
|
|
|
|
|
output_dir = "vsp/qwen3.5-9b/results"
|
|
|
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
|
|
report = {
|
|
|
|
|
|
"timestamp": datetime.now().isoformat(),
|
|
|
|
|
|
"model": "Qwen3.5-9B",
|
|
|
|
|
|
"quantization": "4-bit NF4",
|
|
|
|
|
|
"note": "单GPU串行推理,并发测试主要体现请求排队效果",
|
|
|
|
|
|
"concurrency_results": results,
|
|
|
|
|
|
}
|
|
|
|
|
|
path = os.path.join(output_dir, "concurrency_results.json")
|
|
|
|
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
|
|
|
|
json.dump(report, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
print(f"\n结果已保存到 {path}")
|
|
|
|
|
|
|
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
os.chdir(os.path.dirname(os.path.abspath(__file__)) + "/../..")
|
|
|
|
|
|
model, tokenizer = load_model()
|
|
|
|
|
|
test_concurrency(model, tokenizer)
|