基于 NVIDIA 生态的 Dynamo 风格分布式 LLM 推理架构

发布于:2025-08-18 ⋅ 阅读:(19) ⋅ 点赞:(0)

网罗开发 (小红书、快手、视频号同名)

  大家好,我是 展菲,目前在上市企业从事人工智能项目研发管理工作,平时热衷于分享各种编程领域的软硬技能知识以及前沿技术,包括iOS、前端、Harmony OS、Java、Python等方向。在移动端开发、鸿蒙开发、物联网、嵌入式、云原生、开源等领域有深厚造诣。

图书作者:《ESP32-C3 物联网工程开发实战》
图书作者:《SwiftUI 入门,进阶与实战》
超级个体:COC上海社区主理人
特约讲师:大学讲师,谷歌亚马逊分享嘉宾
科技博主:华为HDE/HDG

我的博客内容涵盖广泛,主要分享技术教程、Bug解决方案、开发工具使用、前沿科技资讯、产品评测与使用体验。我特别关注云服务产品评测、AI 产品对比、开发板性能测试以及技术报告,同时也会提供产品优缺点分析、横向对比,并分享技术沙龙与行业大会的参会体验。我的目标是为读者提供有深度、有实用价值的技术洞察与分析。

展菲:您的前沿技术领航员
👋 大家好,我是展菲!
📱 全网搜索“展菲”,即可纵览我在各大平台的知识足迹。
📣 公众号“Swift社区”,每周定时推送干货满满的技术长文,从新兴框架的剖析到运维实战的复盘,助您技术进阶之路畅通无阻。
💬 微信端添加好友“fzhanfei”,与我直接交流,不管是项目瓶颈的求助,还是行业趋势的探讨,随时畅所欲言。
📅 最新动态:2025 年 3 月 17 日
快来加入技术社区,一起挖掘技术的无限潜能,携手迈向数字化新征程!


摘要

大模型上到生产之后,最先撞墙的往往不是“精度”,而是“吞吐”和“时延”。聊天问答、RAG 检索问答、智能客服这类在线场景,并发高、延迟敏感,常规“一次性 batch + 逐条生成”的做法很快就遇到性能瓶颈:GPU 吃不满、排队时间长、波动大。本文聚焦一套基于 NVIDIA 生态的“Dynamo 风格”分布式 LLM 推理架构(文中简称 Dynamo 架构):核心是连续批处理(continuous batching / dynamic batching)Prefill/Decode 分离Token 级流式输出KV Cache 有效管理,并结合多卡通信(NCCL)、TensorRT-LLM、Triton Inference Server(或 NIM)的工程化能力,给出从原理到可运行 Demo 的完整路径。

引言

现在主流推理框架都在往两个方向发力:
一是单卡效率,靠算子融合、CUDA Graph、FP8/INT8、Paged KV 等技术把单次 kernel 做大做满;
二是调度层效率,靠更聪明的批合并流式解码缓存复用让“GPU 忙起来”。
实际落地时,大家经常遇到这些痛点:

  • 吞吐/延迟的拉扯:批大一点吞吐好了,但最慢的那个请求拖着整批人一起慢;批小一点时延低了,但 GPU 忙闲不均。
  • Prompt 长度差异大:Prefill 阶段(吃 prompt)是长序列 GEMM,Decode 阶段(逐 token)是短序列/单步计算,混一起做常常低效。
  • KV Cache 爆内存:并发高的时候,KV Cache 占满显存;请求又长又多时还会碎片化,OOM 很常见。
  • 在线流式输出:用户想“边生成边看”,但你要一边发 token,一边还能继续合批,工程上不太好写。

Dynamo 架构就是在这堆现实问题上“抠细节”:分阶段合批、持续注入新请求、Token 级别出结果、按页管理 KV。下面我们把它拆开说,并给出一个能直接跑起来的最小可用 Demo(CPU/GPU 都能跑),帮你把核心机制吃透,再对接到 Triton / TensorRT-LLM 时会顺手很多。

Dynamo 推理的关键模块

计算面:引擎与并行

  • 引擎层用 TensorRT-LLM(或 PyTorch + CUDA Graph)承载模型执行;
  • 结合 TP(张量并行)/PP(流水线并行)NCCL 做多卡扩展;
  • Decode 路径启用 Paged KVFlash AttentionFP8/INT8 等优化。

服务面:模型服务与动态批处理

  • Triton Inference Server(或 NVIDIA NIM 微服务)做在线服务;
  • 启用 dynamic batching / decoupled transaction policy,让请求在毫秒级延迟预算内合批。

调度面:Dynamo 风格的连续批处理

  • 请求进入“接入队列”后不必等整批完成,在 token 边界持续并入
  • Prefill 与 Decode 分离:新请求先做 Prefill(长序列计算),再加入 Decode 批次(短序列 token-by-token)。

存储面:KV Cache 管理

  • 按页管理(Paged KV),回收和复用更容易;
  • 配额与水位:限制单租户最大并发、触发降级或反压;
  • 冷热分层:长上下文或“挂起请求”的 KV 优先换出。

观测面:可视化与自愈

  • 关键指标:GPU 使用率合批效率Prefill/Decode 时间KV 命中率/占用p95/p99 时延
  • 超阈值触发自动降采样、降精度、切换更激进的合批策略。

Dynamo 推理架构解析

Batch 合并策略:怎么既不拖慢人,又让 GPU 忙起来

一个好用的合批器至少考虑这几条:

  1. 时间上限:最大等待时间,比如 1~3ms,过时就直接发,保证尾时延。
  2. 容量上限max_batch_sizemax_tokens_per_batch 双限同时生效,避免单批太重。
  3. Prefill / Decode 拆分:Prefill 先按Prompt token 总量控重;Decode 按活跃会话数控重。
  4. 公平性:优先级队列(SLA),避免长请求饿死短请求,或多租户之间互相影响。
  5. 可抢占:Decode 每一步都是天然切片点,可以在 token 边界调度,让新请求尽快插队进入 Prefill。

Token 流式输出:既要快,又要稳定

  • 解码步进:每次只向前生成一个 token(或小于等于 N 个),立刻把该 token 推给客户端;
  • 解耦 I/O:推流线程和计算线程分离,避免阻塞合批;
  • 回压控制:客户端慢时不要拖死计算,必要时丢弃部分中间增量,或聚合为“句子级片段”再发。

KV Cache 管理:让显存一直够用

  • 按页管理:每个序列的 KV 占用被切成固定大小的 page,释放与复用都以 page 为单位;
  • 配额&回收:超出配额的会话拒绝接入、挂起或降级;
  • 前缀复用:命中相同系统提示或检索段时,KV 前缀可复用,Prefill 直接省掉一大截。

最小可运行的 Dynamo 风格推理原型

下面这段 Python 代码用 HuggingFace 的 transformers 跑一个可连续合批 + 流式输出 + KV 复用的最小原型。为了容易跑通,默认用 CPU + distilgpt2,也支持 GPU(装好 PyTorch CUDA 即可)。

运行前安装依赖:

pip install torch transformers
# file: toy_dynamo_engine.py
import time
import threading
import queue
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# --------- 配置 ----------
MODEL_NAME = "distilgpt2"  # 体积小,CPU 也能跑
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

MAX_BATCH_DECODE = 16            # 每步 decode 合批的最大会话数
MAX_PREFILL_TOKENS = 8192        # 单次 prefill 的总 token 上限(防止过大序列)
MAX_QUEUE_DELAY_MS = 3            # 合批最大等待时间
MAX_NEW_TOKENS = 64               # 每个请求最多生成多少 token

# --------- 数据结构 ----------
@dataclass
class Request:
    req_id: int
    prompt: str
    max_new_tokens: int = MAX_NEW_TOKENS
    created_at: float = field(default_factory=time.time)
    output_tokens: List[int] = field(default_factory=list)
    done: bool = False
    # 由调度器填充
    input_ids: Optional[torch.Tensor] = None
    past_key_values: Optional[Any] = None
    last_token: Optional[torch.Tensor] = None

# --------- 引擎 ----------
class ToyDynamoEngine:
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        self.model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE)
        self.model.eval()
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    @torch.no_grad()
    def prefill(self, reqs: List[Request]):
        """ 对新来的请求做 prefill,得到初始 past_key_values 和第一个解码 token """
        inputs = self.tokenizer([r.prompt for r in reqs], return_tensors="pt", padding=True).to(DEVICE)
        # 控制总 token,避免超大 batch
        total_tokens = inputs["input_ids"].numel()
        if total_tokens > MAX_PREFILL_TOKENS:
            raise RuntimeError(f"Prefill tokens {total_tokens} exceed limit {MAX_PREFILL_TOKENS}")

        outputs = self.model(**inputs, use_cache=True)
        # 取每个序列最后一个位置的 logits,采样一个 token 作为第一步输出
        last_logits = outputs.logits[:, -1, :]
        next_tokens = torch.argmax(last_logits, dim=-1)  # 为简单演示,用 argmax;生产建议用采样或温度控制

        # 更新请求状态
        for i, r in enumerate(reqs):
            r.input_ids = inputs["input_ids"][i:i+1]  # 保存原始 prompt(可选)
            r.past_key_values = outputs.past_key_values
            r.last_token = next_tokens[i:i+1].unsqueeze(0)  # [1,1]
            r.output_tokens.append(int(next_tokens[i].item()))
        return reqs

    @torch.no_grad()
    def decode_step(self, active: List[Request]):
        """ 对活跃会话做一步 decode:输入 last_token + past,输出下一个 token """
        if not active:
            return

        # 合批 last_token
        input_ids = torch.cat([r.last_token for r in active], dim=0).to(DEVICE)  # [B,1]
        past = active[0].past_key_values  # 这里简单起见假设结构一致
        outputs = self.model(input_ids=input_ids, past_key_values=past, use_cache=True)
        logits = outputs.logits[:, -1, :]
        next_tokens = torch.argmax(logits, dim=-1)

        # 更新每个请求的状态
        for i, r in enumerate(active):
            r.past_key_values = outputs.past_key_values
            r.last_token = next_tokens[i:i+1].unsqueeze(0)  # [1,1]
            r.output_tokens.append(int(next_tokens[i].item()))
            if len(r.output_tokens) >= r.max_new_tokens or int(next_tokens[i].item()) == self.tokenizer.eos_token_id:
                r.done = True

    def decode_text(self, token_ids: List[int]) -> str:
        return self.tokenizer.decode(token_ids, skip_special_tokens=True)

# --------- 调度器(连续批处理) ----------
class DynamoScheduler:
    def __init__(self, engine: ToyDynamoEngine):
        self.engine = engine
        self.waiting_q: "queue.Queue[Request]" = queue.Queue()
        self.active: List[Request] = []
        self.lock = threading.Lock()
        self.stop_flag = False

    def submit(self, req: Request):
        self.waiting_q.put(req)

    def run_forever(self):
        """ 主循环:小延迟等一会儿,聚合 prefill;随后进入 decode 步进,期间持续接纳新请求 """
        while not self.stop_flag:
            # 1) 聚合 prefill
            newcomers = self._gather_newcomers(MAX_QUEUE_DELAY_MS / 1000)
            if newcomers:
                try:
                    self.engine.prefill(newcomers)
                    with self.lock:
                        self.active.extend(newcomers)
                except Exception as e:
                    for r in newcomers:
                        r.done = True
                    print("Prefill error:", e)

            # 2) 一步 decode
            actives = [r for r in self.active if not r.done]
            if not actives and self.waiting_q.empty():
                time.sleep(0.001)
                continue

            # 控制 decode 批大小
            step_batch = actives[:MAX_BATCH_DECODE]
            if step_batch:
                self.engine.decode_step(step_batch)

            # 3) 输出增量(模拟流式)
            self._flush_streaming(step_batch)

            # 4) 清理已完成请求
            with self.lock:
                self.active = [r for r in self.active if not r.done]

    def _gather_newcomers(self, wait_seconds: float) -> List[Request]:
        """ 在一个很短的时间窗内收集新请求,做 prefill 批 """
        start = time.time()
        newcomers = []
        while time.time() - start < wait_seconds:
            try:
                r = self.waiting_q.get_nowait()
                newcomers.append(r)
            except queue.Empty:
                time.sleep(0.001)
        return newcomers

    def _flush_streaming(self, batch: List[Request]):
        """ 这里简单打印每一步的增量,你可以改成 WebSocket/SSE 推流 """
        for r in batch:
            if r.output_tokens:
                text = self.engine.decode_text([r.output_tokens[-1]])
                print(f"[req#{r.req_id}] +{repr(text)}", flush=True)

# --------- 演示入口 ----------
def demo():
    eng = ToyDynamoEngine()
    sch = DynamoScheduler(eng)
    t = threading.Thread(target=sch.run_forever, daemon=True)
    t.start()

    # 模拟高并发提交
    prompts = [
        "Explain the significance of GPU memory bandwidth in LLM inference.",
        "Write a short poem about distributed systems.",
        "What is continuous batching and why does it help?",
        "Summarize the benefits of KV cache paging in two sentences.",
    ]

    reqs: Dict[int, Request] = {}
    for i, p in enumerate(prompts, 1):
        r = Request(req_id=i, prompt=p, max_new_tokens=40)
        reqs[i] = r
        sch.submit(r)

    # 等待全部完成
    while any(not r.done for r in reqs.values()):
        time.sleep(0.05)

    # 汇总输出
    for i, r in reqs.items():
        whole_text = eng.decode_text(r.output_tokens)
        print(f"\n=== Request #{i} Final ===\n{whole_text}\n")

    sch.stop_flag = True
    t.join(timeout=1)

if __name__ == "__main__":
    demo()

代码讲解(抓要点就好)

  • Prefill:把新请求在一个很短的时间窗(默认 3ms)内聚合起来,一次性做长序列前向,得到 past_key_values 和第一个 token。
  • Decode:把活跃会话的 last_token 合起来做一步解码,拿到下一个 token 并更新 KV。
  • 连续并入:主循环每一轮都尝试把“刚刚到达的新请求”做 prefill,然后继续 decode,不会等整批结束
  • 流式输出:每步 decode 后把本步新 token 打印出来,实际工程里换成 WebSocket/SSE 推给客户端即可。
  • KV 管理:示例里把 past_key_values 挂在 Request 上。真实系统会做 page 化和内存池复用,这里保留了思路。

把 Demo 搬进生产:Triton 动态批处理与多实例

Triton 动态批处理配置示例

如果你用 Triton Inference Server 托管 TensorRT-LLM 引擎,可以在 config.pbtxt 开启动态批处理与解耦响应,达到和上面 Demo 相同的调度思路:

name: "llm-trtllm"
backend: "tensorrtllm"
max_batch_size: 128

instance_group [
  { kind: KIND_GPU count: 1 gpus: [0] },
  { kind: KIND_GPU count: 1 gpus: [1] }
]

dynamic_batching {
  preferred_batch_size: [ 4, 8, 16, 32 ]
  max_queue_delay_microseconds: 3000  # 3ms 等待窗口
}

# 解耦请求-响应,便于流式输出
model_transaction_policy { decoupled: true }

# 打开内置缓存(如启用可用)
response_cache { enable: true }

要点:

  • max_queue_delay_microseconds 就是我们的短窗合批
  • decoupled: true 允许多次响应,对应 token 级流式;
  • instance_group 可以把一个模型同时开多个副本,利用大卡/MIG 做隔离和扩容。

Batch 合并策略、流式输出与 KV Cache

Batch 合并策略的取舍

  • 吞吐优先:把 preferred_batch_size 设大一些,max_queue_delay 放宽到 4~6ms,能大幅提升 GPU 利用;
  • 时延优先:把 preferred_batch_size 收紧,max_queue_delay 控制在 1~2ms,牺牲部分吞吐;
  • 混合策略:对不同 SLA 的租户使用不同队列,不同的 max_queue_delay,高优先级走低延迟策略。

Token 级流式输出

  • Triton/NIM 用解耦事务,一条请求可以多次 send
  • 如果你自己写服务,SSE/WebSocket 都行。关键是I/O 解耦:推流线程不要阻塞 decode。

KV Cache 管理的小技巧

  • 按页管理:统一 page 大小(比如 2MB),回收/复用都用 page 计数;
  • 复用前缀:RAG/系统提示经常重复,前缀 KV 命中能省掉一大段 Prefill;
  • 阈值&回收:高水位触发回收策略:暂停超长请求、降低并发、切浮点精度。

把它用在真实业务里

下面给 3 个常见场景,顺便给出必要的代码/配置片段。

场景一:聊天问答 API 网关

需求:超高 QPS,延迟敏感,用户要“边打字边看结果”。
做法

  • API 网关把请求丢进短窗队列,每 1~3ms 聚合一次做 Prefill;
  • 进入 Decode 后每步出 token,SSE 推给前端;
  • Triton 配置 decoupled: true,业务代码里每步 send()

SSE 端点伪代码(Python/FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
def chat_stream(prompt: str):
    def token_stream():
        req = Request(req_id=int(time.time()*1000)%100000, prompt=prompt, max_new_tokens=128)
        scheduler.submit(req)
        last_len = 0
        while not req.done:
            if len(req.output_tokens) > last_len:
                text = engine.decode_text(req.output_tokens[last_len:])
                last_len = len(req.output_tokens)
                yield f"data: {text}\n\n"
            time.sleep(0.01)
        yield "data: [DONE]\n\n"
    return StreamingResponse(token_stream(), media_type="text/event-stream")

场景二:RAG 检索问答

痛点:Prompt 很长(检索段拼进去),Prefill 特别重。
做法

  • 把“系统提示 + 领域说明 + 模板”作为前缀 KV缓存,检索段变化时只拼后缀;
  • 命中前缀直接跳过大段 Prefill;
  • 合批时对 Prefill 设 MAX_PREFILL_TOKENS,避免偶发超长请求拖全局。

前缀 KV 复用思路(示意)

# 假设 prefix_kv_map 缓存了若干常见前缀的 KV
prefix = build_prefix(system_prompt, domain_hint, template)  # 只包括稳定部分
key = hash(prefix)
if key in prefix_kv_map:
    r.past_key_values = clone(prefix_kv_map[key])   # 直接拿到 KV
    r.input_ids = tokenizer(rag_snippets + question)  # 只对新增部分做 prefill
else:
    # 第一次命中:先把 prefix 做一遍 prefill,然后缓存 KV
    prefix_req = Request(req_id=-1, prompt=prefix, max_new_tokens=1)
    engine.prefill([prefix_req])
    prefix_kv_map[key] = clone(prefix_req.past_key_values)

场景三:多租户 SLA 与限流

痛点:不同租户对延迟/吞吐诉求不一样,容易互相影响。
做法

  • 维护多优先级队列,高优先级 max_queue_delay 更小;
  • 租户配额:限制“活跃会话数”和“总 KV page 数”;
  • 过载时对低优先级租户降级(减小 max_new_tokens、降低温度/核采样)。

多优先级合批伪代码

def gather_newcomers():
    high, normal = [], []
    start = time.time()
    while time.time() - start < MAX_QUEUE_DELAY_MS/1000:
        r = try_pop_high_or_normal_queue()
        if not r:
            time.sleep(0.001); continue
        (high if r.priority == "high" else normal).append(r)
    # 先吃高优先级,空余再放普通
    return (high + normal)[:MAX_BATCH_DECODE]

QA 环节

Q1:Prefill/Decode 为什么要分开?
Prefill 是长序列 GEMM,Decode 是短序列、token 级前向。把两种负载硬拼一起,显卡经常“忽忙忽闲”。分开后,你可以对 Prefill 限制总 token,对 Decode 限制活跃会话数,各自吃满各自的“甜点区”。

Q2:连续批处理会不会饿死长请求?
会,如果不做公平性。用优先级队列+轮转,从不同桶里按比例取请求进入 decode 批。长请求也能稳定推进。

Q3:KV Cache 是不是一直增?
不是。按页管理后,请求结束立刻归还 page。高水位触发回收策略,先清理低优先级/挂起的会话。

Q4:流式输出影响吞吐吗?
I/O 线程要解耦,输出聚合到句子级再发,或者使用零拷贝队列,就不会拖慢计算。Triton 的 decoupled transaction 可以放心用。

Q5:TensorRT-LLM 和 PyTorch 选择哪个?
追求极致延迟/吞吐、模型相对稳定用 TensorRT-LLM;需要快速迭代/频繁改模型,用 PyTorch + CUDA Graph 也能做得很强。生产常见做法是两者并行:验证在 PyTorch,稳定后下沉到 TensorRT-LLM。

Q6:多卡如何划分并行?
长上下文/大模型先考虑 TP(张量并行),很长序列或需要拉长流水线时再加 PP。跨机房的通信延迟会显著影响效果,尽量同机架内聚合。

总结

把 LLM 真正跑上线,关键不是“一个模型多准”,而是“一个 GPU 多忙、一个请求多稳”。Dynamo 风格的分布式推理架构抓住了生产里的三件事:连续批处理把吞吐提起来;流式输出把体验做顺;KV Cache 管理把显存稳住。
本文先用一个能跑起来的最小 Demo 把思路讲清楚,再落到 Triton/TensorRT-LLM 的配置与实践,最后结合聊天问答、RAG、多租户三类常见场景给出实操建议。你可以先直接跑 Demo 感受“连续批”的节奏,再把批策略、流式通道和 KV 管理逐步换成你线上框架的等价能力。这样推进,既能尽快见到收益,也能把复杂度压在你可控的范围里。


网站公告

今日签到

点亮在社区的每一天
去签到