大家好,我是 展菲,目前在上市企业从事人工智能项目研发管理工作,平时热衷于分享各种编程领域的软硬技能知识以及前沿技术,包括iOS、前端、Harmony OS、Java、Python等方向。在移动端开发、鸿蒙开发、物联网、嵌入式、云原生、开源等领域有深厚造诣。
图书作者:《ESP32-C3 物联网工程开发实战》
图书作者:《SwiftUI 入门,进阶与实战》
超级个体:COC上海社区主理人
特约讲师:大学讲师,谷歌亚马逊分享嘉宾
科技博主:华为HDE/HDG
我的博客内容涵盖广泛,主要分享技术教程、Bug解决方案、开发工具使用、前沿科技资讯、产品评测与使用体验。我特别关注云服务产品评测、AI 产品对比、开发板性能测试以及技术报告,同时也会提供产品优缺点分析、横向对比,并分享技术沙龙与行业大会的参会体验。我的目标是为读者提供有深度、有实用价值的技术洞察与分析。
展菲:您的前沿技术领航员
👋 大家好,我是展菲!
📱 全网搜索“展菲”,即可纵览我在各大平台的知识足迹。
📣 公众号“Swift社区”,每周定时推送干货满满的技术长文,从新兴框架的剖析到运维实战的复盘,助您技术进阶之路畅通无阻。
💬 微信端添加好友“fzhanfei”,与我直接交流,不管是项目瓶颈的求助,还是行业趋势的探讨,随时畅所欲言。
📅 最新动态:2025 年 3 月 17 日
快来加入技术社区,一起挖掘技术的无限潜能,携手迈向数字化新征程!
文章目录
摘要
大模型上到生产之后,最先撞墙的往往不是“精度”,而是“吞吐”和“时延”。聊天问答、RAG 检索问答、智能客服这类在线场景,并发高、延迟敏感,常规“一次性 batch + 逐条生成”的做法很快就遇到性能瓶颈:GPU 吃不满、排队时间长、波动大。本文聚焦一套基于 NVIDIA 生态的“Dynamo 风格”分布式 LLM 推理架构(文中简称 Dynamo 架构):核心是连续批处理(continuous batching / dynamic batching)、Prefill/Decode 分离、Token 级流式输出和KV Cache 有效管理,并结合多卡通信(NCCL)、TensorRT-LLM、Triton Inference Server(或 NIM)的工程化能力,给出从原理到可运行 Demo 的完整路径。
引言
现在主流推理框架都在往两个方向发力:
一是单卡效率,靠算子融合、CUDA Graph、FP8/INT8、Paged KV 等技术把单次 kernel 做大做满;
二是调度层效率,靠更聪明的批合并、流式解码、缓存复用让“GPU 忙起来”。
实际落地时,大家经常遇到这些痛点:
- 吞吐/延迟的拉扯:批大一点吞吐好了,但最慢的那个请求拖着整批人一起慢;批小一点时延低了,但 GPU 忙闲不均。
- Prompt 长度差异大:Prefill 阶段(吃 prompt)是长序列 GEMM,Decode 阶段(逐 token)是短序列/单步计算,混一起做常常低效。
- KV Cache 爆内存:并发高的时候,KV Cache 占满显存;请求又长又多时还会碎片化,OOM 很常见。
- 在线流式输出:用户想“边生成边看”,但你要一边发 token,一边还能继续合批,工程上不太好写。
Dynamo 架构就是在这堆现实问题上“抠细节”:分阶段合批、持续注入新请求、Token 级别出结果、按页管理 KV。下面我们把它拆开说,并给出一个能直接跑起来的最小可用 Demo(CPU/GPU 都能跑),帮你把核心机制吃透,再对接到 Triton / TensorRT-LLM 时会顺手很多。
Dynamo 推理的关键模块
计算面:引擎与并行
- 引擎层用 TensorRT-LLM(或 PyTorch + CUDA Graph)承载模型执行;
- 结合 TP(张量并行)/PP(流水线并行) 和 NCCL 做多卡扩展;
- Decode 路径启用 Paged KV、Flash Attention、FP8/INT8 等优化。
服务面:模型服务与动态批处理
- 用 Triton Inference Server(或 NVIDIA NIM 微服务)做在线服务;
- 启用 dynamic batching / decoupled transaction policy,让请求在毫秒级延迟预算内合批。
调度面:Dynamo 风格的连续批处理
- 请求进入“接入队列”后不必等整批完成,在 token 边界持续并入;
- Prefill 与 Decode 分离:新请求先做 Prefill(长序列计算),再加入 Decode 批次(短序列 token-by-token)。
存储面:KV Cache 管理
- 按页管理(Paged KV),回收和复用更容易;
- 配额与水位:限制单租户最大并发、触发降级或反压;
- 冷热分层:长上下文或“挂起请求”的 KV 优先换出。
观测面:可视化与自愈
- 关键指标:GPU 使用率、合批效率、Prefill/Decode 时间、KV 命中率/占用、p95/p99 时延;
- 超阈值触发自动降采样、降精度、切换更激进的合批策略。
Dynamo 推理架构解析
Batch 合并策略:怎么既不拖慢人,又让 GPU 忙起来
一个好用的合批器至少考虑这几条:
- 时间上限:最大等待时间,比如 1~3ms,过时就直接发,保证尾时延。
- 容量上限:
max_batch_size
、max_tokens_per_batch
双限同时生效,避免单批太重。 - Prefill / Decode 拆分:Prefill 先按Prompt token 总量控重;Decode 按活跃会话数控重。
- 公平性:优先级队列(SLA),避免长请求饿死短请求,或多租户之间互相影响。
- 可抢占:Decode 每一步都是天然切片点,可以在 token 边界调度,让新请求尽快插队进入 Prefill。
Token 流式输出:既要快,又要稳定
- 解码步进:每次只向前生成一个 token(或小于等于 N 个),立刻把该 token 推给客户端;
- 解耦 I/O:推流线程和计算线程分离,避免阻塞合批;
- 回压控制:客户端慢时不要拖死计算,必要时丢弃部分中间增量,或聚合为“句子级片段”再发。
KV Cache 管理:让显存一直够用
- 按页管理:每个序列的 KV 占用被切成固定大小的 page,释放与复用都以 page 为单位;
- 配额&回收:超出配额的会话拒绝接入、挂起或降级;
- 前缀复用:命中相同系统提示或检索段时,KV 前缀可复用,Prefill 直接省掉一大截。
最小可运行的 Dynamo 风格推理原型
下面这段 Python 代码用 HuggingFace 的 transformers
跑一个可连续合批 + 流式输出 + KV 复用的最小原型。为了容易跑通,默认用 CPU + distilgpt2
,也支持 GPU(装好 PyTorch CUDA 即可)。
运行前安装依赖:
pip install torch transformers
# file: toy_dynamo_engine.py
import time
import threading
import queue
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
# --------- 配置 ----------
MODEL_NAME = "distilgpt2" # 体积小,CPU 也能跑
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MAX_BATCH_DECODE = 16 # 每步 decode 合批的最大会话数
MAX_PREFILL_TOKENS = 8192 # 单次 prefill 的总 token 上限(防止过大序列)
MAX_QUEUE_DELAY_MS = 3 # 合批最大等待时间
MAX_NEW_TOKENS = 64 # 每个请求最多生成多少 token
# --------- 数据结构 ----------
@dataclass
class Request:
req_id: int
prompt: str
max_new_tokens: int = MAX_NEW_TOKENS
created_at: float = field(default_factory=time.time)
output_tokens: List[int] = field(default_factory=list)
done: bool = False
# 由调度器填充
input_ids: Optional[torch.Tensor] = None
past_key_values: Optional[Any] = None
last_token: Optional[torch.Tensor] = None
# --------- 引擎 ----------
class ToyDynamoEngine:
def __init__(self):
self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
self.model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE)
self.model.eval()
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
@torch.no_grad()
def prefill(self, reqs: List[Request]):
""" 对新来的请求做 prefill,得到初始 past_key_values 和第一个解码 token """
inputs = self.tokenizer([r.prompt for r in reqs], return_tensors="pt", padding=True).to(DEVICE)
# 控制总 token,避免超大 batch
total_tokens = inputs["input_ids"].numel()
if total_tokens > MAX_PREFILL_TOKENS:
raise RuntimeError(f"Prefill tokens {total_tokens} exceed limit {MAX_PREFILL_TOKENS}")
outputs = self.model(**inputs, use_cache=True)
# 取每个序列最后一个位置的 logits,采样一个 token 作为第一步输出
last_logits = outputs.logits[:, -1, :]
next_tokens = torch.argmax(last_logits, dim=-1) # 为简单演示,用 argmax;生产建议用采样或温度控制
# 更新请求状态
for i, r in enumerate(reqs):
r.input_ids = inputs["input_ids"][i:i+1] # 保存原始 prompt(可选)
r.past_key_values = outputs.past_key_values
r.last_token = next_tokens[i:i+1].unsqueeze(0) # [1,1]
r.output_tokens.append(int(next_tokens[i].item()))
return reqs
@torch.no_grad()
def decode_step(self, active: List[Request]):
""" 对活跃会话做一步 decode:输入 last_token + past,输出下一个 token """
if not active:
return
# 合批 last_token
input_ids = torch.cat([r.last_token for r in active], dim=0).to(DEVICE) # [B,1]
past = active[0].past_key_values # 这里简单起见假设结构一致
outputs = self.model(input_ids=input_ids, past_key_values=past, use_cache=True)
logits = outputs.logits[:, -1, :]
next_tokens = torch.argmax(logits, dim=-1)
# 更新每个请求的状态
for i, r in enumerate(active):
r.past_key_values = outputs.past_key_values
r.last_token = next_tokens[i:i+1].unsqueeze(0) # [1,1]
r.output_tokens.append(int(next_tokens[i].item()))
if len(r.output_tokens) >= r.max_new_tokens or int(next_tokens[i].item()) == self.tokenizer.eos_token_id:
r.done = True
def decode_text(self, token_ids: List[int]) -> str:
return self.tokenizer.decode(token_ids, skip_special_tokens=True)
# --------- 调度器(连续批处理) ----------
class DynamoScheduler:
def __init__(self, engine: ToyDynamoEngine):
self.engine = engine
self.waiting_q: "queue.Queue[Request]" = queue.Queue()
self.active: List[Request] = []
self.lock = threading.Lock()
self.stop_flag = False
def submit(self, req: Request):
self.waiting_q.put(req)
def run_forever(self):
""" 主循环:小延迟等一会儿,聚合 prefill;随后进入 decode 步进,期间持续接纳新请求 """
while not self.stop_flag:
# 1) 聚合 prefill
newcomers = self._gather_newcomers(MAX_QUEUE_DELAY_MS / 1000)
if newcomers:
try:
self.engine.prefill(newcomers)
with self.lock:
self.active.extend(newcomers)
except Exception as e:
for r in newcomers:
r.done = True
print("Prefill error:", e)
# 2) 一步 decode
actives = [r for r in self.active if not r.done]
if not actives and self.waiting_q.empty():
time.sleep(0.001)
continue
# 控制 decode 批大小
step_batch = actives[:MAX_BATCH_DECODE]
if step_batch:
self.engine.decode_step(step_batch)
# 3) 输出增量(模拟流式)
self._flush_streaming(step_batch)
# 4) 清理已完成请求
with self.lock:
self.active = [r for r in self.active if not r.done]
def _gather_newcomers(self, wait_seconds: float) -> List[Request]:
""" 在一个很短的时间窗内收集新请求,做 prefill 批 """
start = time.time()
newcomers = []
while time.time() - start < wait_seconds:
try:
r = self.waiting_q.get_nowait()
newcomers.append(r)
except queue.Empty:
time.sleep(0.001)
return newcomers
def _flush_streaming(self, batch: List[Request]):
""" 这里简单打印每一步的增量,你可以改成 WebSocket/SSE 推流 """
for r in batch:
if r.output_tokens:
text = self.engine.decode_text([r.output_tokens[-1]])
print(f"[req#{r.req_id}] +{repr(text)}", flush=True)
# --------- 演示入口 ----------
def demo():
eng = ToyDynamoEngine()
sch = DynamoScheduler(eng)
t = threading.Thread(target=sch.run_forever, daemon=True)
t.start()
# 模拟高并发提交
prompts = [
"Explain the significance of GPU memory bandwidth in LLM inference.",
"Write a short poem about distributed systems.",
"What is continuous batching and why does it help?",
"Summarize the benefits of KV cache paging in two sentences.",
]
reqs: Dict[int, Request] = {}
for i, p in enumerate(prompts, 1):
r = Request(req_id=i, prompt=p, max_new_tokens=40)
reqs[i] = r
sch.submit(r)
# 等待全部完成
while any(not r.done for r in reqs.values()):
time.sleep(0.05)
# 汇总输出
for i, r in reqs.items():
whole_text = eng.decode_text(r.output_tokens)
print(f"\n=== Request #{i} Final ===\n{whole_text}\n")
sch.stop_flag = True
t.join(timeout=1)
if __name__ == "__main__":
demo()
代码讲解(抓要点就好)
- Prefill:把新请求在一个很短的时间窗(默认 3ms)内聚合起来,一次性做长序列前向,得到
past_key_values
和第一个 token。 - Decode:把活跃会话的
last_token
合起来做一步解码,拿到下一个 token 并更新 KV。 - 连续并入:主循环每一轮都尝试把“刚刚到达的新请求”做 prefill,然后继续 decode,不会等整批结束。
- 流式输出:每步 decode 后把本步新 token 打印出来,实际工程里换成 WebSocket/SSE 推给客户端即可。
- KV 管理:示例里把
past_key_values
挂在 Request 上。真实系统会做 page 化和内存池复用,这里保留了思路。
把 Demo 搬进生产:Triton 动态批处理与多实例
Triton 动态批处理配置示例
如果你用 Triton Inference Server 托管 TensorRT-LLM 引擎,可以在 config.pbtxt
开启动态批处理与解耦响应,达到和上面 Demo 相同的调度思路:
name: "llm-trtllm"
backend: "tensorrtllm"
max_batch_size: 128
instance_group [
{ kind: KIND_GPU count: 1 gpus: [0] },
{ kind: KIND_GPU count: 1 gpus: [1] }
]
dynamic_batching {
preferred_batch_size: [ 4, 8, 16, 32 ]
max_queue_delay_microseconds: 3000 # 3ms 等待窗口
}
# 解耦请求-响应,便于流式输出
model_transaction_policy { decoupled: true }
# 打开内置缓存(如启用可用)
response_cache { enable: true }
要点:
max_queue_delay_microseconds
就是我们的短窗合批;decoupled: true
允许多次响应,对应 token 级流式;- 多
instance_group
可以把一个模型同时开多个副本,利用大卡/MIG 做隔离和扩容。
Batch 合并策略、流式输出与 KV Cache
Batch 合并策略的取舍
- 吞吐优先:把
preferred_batch_size
设大一些,max_queue_delay
放宽到 4~6ms,能大幅提升 GPU 利用; - 时延优先:把
preferred_batch_size
收紧,max_queue_delay
控制在 1~2ms,牺牲部分吞吐; - 混合策略:对不同 SLA 的租户使用不同队列,不同的
max_queue_delay
,高优先级走低延迟策略。
Token 级流式输出
- Triton/NIM 用解耦事务,一条请求可以多次
send
; - 如果你自己写服务,SSE/WebSocket 都行。关键是I/O 解耦:推流线程不要阻塞 decode。
KV Cache 管理的小技巧
- 按页管理:统一 page 大小(比如 2MB),回收/复用都用 page 计数;
- 复用前缀:RAG/系统提示经常重复,前缀 KV 命中能省掉一大段 Prefill;
- 阈值&回收:高水位触发回收策略:暂停超长请求、降低并发、切浮点精度。
把它用在真实业务里
下面给 3 个常见场景,顺便给出必要的代码/配置片段。
场景一:聊天问答 API 网关
需求:超高 QPS,延迟敏感,用户要“边打字边看结果”。
做法:
- API 网关把请求丢进短窗队列,每 1~3ms 聚合一次做 Prefill;
- 进入 Decode 后每步出 token,SSE 推给前端;
- Triton 配置
decoupled: true
,业务代码里每步send()
。
SSE 端点伪代码(Python/FastAPI):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat/stream")
def chat_stream(prompt: str):
def token_stream():
req = Request(req_id=int(time.time()*1000)%100000, prompt=prompt, max_new_tokens=128)
scheduler.submit(req)
last_len = 0
while not req.done:
if len(req.output_tokens) > last_len:
text = engine.decode_text(req.output_tokens[last_len:])
last_len = len(req.output_tokens)
yield f"data: {text}\n\n"
time.sleep(0.01)
yield "data: [DONE]\n\n"
return StreamingResponse(token_stream(), media_type="text/event-stream")
场景二:RAG 检索问答
痛点:Prompt 很长(检索段拼进去),Prefill 特别重。
做法:
- 把“系统提示 + 领域说明 + 模板”作为前缀 KV缓存,检索段变化时只拼后缀;
- 命中前缀直接跳过大段 Prefill;
- 合批时对 Prefill 设
MAX_PREFILL_TOKENS
,避免偶发超长请求拖全局。
前缀 KV 复用思路(示意):
# 假设 prefix_kv_map 缓存了若干常见前缀的 KV
prefix = build_prefix(system_prompt, domain_hint, template) # 只包括稳定部分
key = hash(prefix)
if key in prefix_kv_map:
r.past_key_values = clone(prefix_kv_map[key]) # 直接拿到 KV
r.input_ids = tokenizer(rag_snippets + question) # 只对新增部分做 prefill
else:
# 第一次命中:先把 prefix 做一遍 prefill,然后缓存 KV
prefix_req = Request(req_id=-1, prompt=prefix, max_new_tokens=1)
engine.prefill([prefix_req])
prefix_kv_map[key] = clone(prefix_req.past_key_values)
场景三:多租户 SLA 与限流
痛点:不同租户对延迟/吞吐诉求不一样,容易互相影响。
做法:
- 维护多优先级队列,高优先级
max_queue_delay
更小; - 租户配额:限制“活跃会话数”和“总 KV page 数”;
- 过载时对低优先级租户降级(减小
max_new_tokens
、降低温度/核采样)。
多优先级合批伪代码:
def gather_newcomers():
high, normal = [], []
start = time.time()
while time.time() - start < MAX_QUEUE_DELAY_MS/1000:
r = try_pop_high_or_normal_queue()
if not r:
time.sleep(0.001); continue
(high if r.priority == "high" else normal).append(r)
# 先吃高优先级,空余再放普通
return (high + normal)[:MAX_BATCH_DECODE]
QA 环节
Q1:Prefill/Decode 为什么要分开?
Prefill 是长序列 GEMM,Decode 是短序列、token 级前向。把两种负载硬拼一起,显卡经常“忽忙忽闲”。分开后,你可以对 Prefill 限制总 token,对 Decode 限制活跃会话数,各自吃满各自的“甜点区”。
Q2:连续批处理会不会饿死长请求?
会,如果不做公平性。用优先级队列+轮转,从不同桶里按比例取请求进入 decode 批。长请求也能稳定推进。
Q3:KV Cache 是不是一直增?
不是。按页管理后,请求结束立刻归还 page。高水位触发回收策略,先清理低优先级/挂起的会话。
Q4:流式输出影响吞吐吗?
I/O 线程要解耦,输出聚合到句子级再发,或者使用零拷贝队列,就不会拖慢计算。Triton 的 decoupled transaction 可以放心用。
Q5:TensorRT-LLM 和 PyTorch 选择哪个?
追求极致延迟/吞吐、模型相对稳定用 TensorRT-LLM;需要快速迭代/频繁改模型,用 PyTorch + CUDA Graph 也能做得很强。生产常见做法是两者并行:验证在 PyTorch,稳定后下沉到 TensorRT-LLM。
Q6:多卡如何划分并行?
长上下文/大模型先考虑 TP(张量并行),很长序列或需要拉长流水线时再加 PP。跨机房的通信延迟会显著影响效果,尽量同机架内聚合。
总结
把 LLM 真正跑上线,关键不是“一个模型多准”,而是“一个 GPU 多忙、一个请求多稳”。Dynamo 风格的分布式推理架构抓住了生产里的三件事:连续批处理把吞吐提起来;流式输出把体验做顺;KV Cache 管理把显存稳住。
本文先用一个能跑起来的最小 Demo 把思路讲清楚,再落到 Triton/TensorRT-LLM 的配置与实践,最后结合聊天问答、RAG、多租户三类常见场景给出实操建议。你可以先直接跑 Demo 感受“连续批”的节奏,再把批策略、流式通道和 KV 管理逐步换成你线上框架的等价能力。这样推进,既能尽快见到收益,也能把复杂度压在你可控的范围里。