基于 NVIDIA 生态的 Dynamo 风格分布式 LLM 推理架构
大家好,我是 展菲,目前在上市企业从事人工智能项目研发管理工作,平时热衷于分享各种编程领域的软硬技能知识以及前沿技术,包括iOS、前端、Harmony OS、Java、Python等方向。在移动端开发、鸿蒙开发、物联网、嵌入式、云原生、开源等领域有深厚造诣。
图书作者:《ESP32-C3 物联网工程开发实战》
图书作者:《SwiftUI 入门,进阶与实战》
超级个体:COC上海社区主理人
特约讲师:大学讲师,谷歌亚马逊分享嘉宾
科技博主:华为HDE/HDG
我的博客内容涵盖广泛,主要分享技术教程、Bug解决方案、开发工具使用、前沿科技资讯、产品评测与使用体验。我特别关注云服务产品评测、AI 产品对比、开发板性能测试以及技术报告,同时也会提供产品优缺点分析、横向对比,并分享技术沙龙与行业大会的参会体验。我的目标是为读者提供有深度、有实用价值的技术洞察与分析。
展菲:您的前沿技术领航员
👋 大家好,我是展菲!
📱 全网搜索“展菲”,即可纵览我在各大平台的知识足迹。
📣 公众号“Swift社区”,每周定时推送干货满满的技术长文,从新兴框架的剖析到运维实战的复盘,助您技术进阶之路畅通无阻。
💬 微信端添加好友“fzhanfei”,与我直接交流,不管是项目瓶颈的求助,还是行业趋势的探讨,随时畅所欲言。
📅 最新动态:2025 年 3 月 17 日
快来加入技术社区,一起挖掘技术的无限潜能,携手迈向数字化新征程!
文章目录
- 摘要
- 引言
- Dynamo 推理的关键模块
- 计算面:引擎与并行
- 服务面:模型服务与动态批处理
- 调度面:Dynamo 风格的连续批处理
- 存储面:KV Cache 管理
- 观测面:可视化与自愈
- Dynamo 推理架构解析
- Batch 合并策略:怎么既不拖慢人,又让 GPU 忙起来
- Token 流式输出:既要快,又要稳定
- KV Cache 管理:让显存一直够用
- 最小可运行的 Dynamo 风格推理原型
- 代码讲解(抓要点就好)
- 把 Demo 搬进生产:Triton 动态批处理与多实例
- Triton 动态批处理配置示例
- Batch 合并策略、流式输出与 KV Cache
- Batch 合并策略的取舍
- Token 级流式输出
- KV Cache 管理的小技巧
- 把它用在真实业务里
- 场景一:聊天问答 API 网关
- 场景二:RAG 检索问答
- 场景三:多租户 SLA 与限流
- QA 环节
- 总结
摘要
大模型上到生产之后,最先撞墙的往往不是“精度”,而是“吞吐”和“时延”。聊天问答、RAG 检索问答、智能客服这类在线场景,并发高、延迟敏感,常规“一次性 batch + 逐条生成”的做法很快就遇到性能瓶颈:GPU 吃不满、排队时间长、波动大。本文聚焦一套基于 NVIDIA 生态的“Dynamo 风格”分布式 LLM 推理架构(文中简称 Dynamo 架构):核心是连续批处理(continuous batching / dynamic batching)、Prefill/Decode 分离、Token 级流式输出和KV Cache 有效管理,并结合多卡通信(NCCL)、TensorRT-LLM、Triton Inference Server(或 NIM)的工程化能力,给出从原理到可运行 Demo 的完整路径。
引言
现在主流推理框架都在往两个方向发力:
一是单卡效率,靠算子融合、CUDA Graph、FP8/INT8、Paged KV 等技术把单次 kernel 做大做满;
二是调度层效率,靠更聪明的批合并、流式解码、缓存复用让“GPU 忙起来”。
实际落地时,大家经常遇到这些痛点:
- 吞吐/延迟的拉扯:批大一点吞吐好了,但最慢的那个请求拖着整批人一起慢;批小一点时延低了,但 GPU 忙闲不均。
- Prompt 长度差异大:Prefill 阶段(吃 prompt)是长序列 GEMM,Decode 阶段(逐 token)是短序列/单步计算,混一起做常常低效。
- KV Cache 爆内存:并发高的时候,KV Cache 占满显存;请求又长又多时还会碎片化,OOM 很常见。
- 在线流式输出:用户想“边生成边看”,但你要一边发 token,一边还能继续合批,工程上不太好写。
Dynamo 架构就是在这堆现实问题上“抠细节”:分阶段合批、持续注入新请求、Token 级别出结果、按页管理 KV。下面我们把它拆开说,并给出一个能直接跑起来的最小可用 Demo(CPU/GPU 都能跑),帮你把核心机制吃透,再对接到 Triton / TensorRT-LLM 时会顺手很多。
Dynamo 推理的关键模块
计算面:引擎与并行
- 引擎层用 TensorRT-LLM(或 PyTorch + CUDA Graph)承载模型执行;
- 结合 TP(张量并行)/PP(流水线并行) 和 NCCL 做多卡扩展;
- Decode 路径启用 Paged KV、Flash Attention、FP8/INT8 等优化。
服务面:模型服务与动态批处理
- 用 Triton Inference Server(或 NVIDIA NIM 微服务)做在线服务;
- 启用 dynamic batching / decoupled transaction policy,让请求在毫秒级延迟预算内合批。
调度面:Dynamo 风格的连续批处理
- 请求进入“接入队列”后不必等整批完成,在 token 边界持续并入;
- Prefill 与 Decode 分离:新请求先做 Prefill(长序列计算),再加入 Decode 批次(短序列 token-by-token)。
存储面:KV Cache 管理
- 按页管理(Paged KV),回收和复用更容易;
- 配额与水位:限制单租户最大并发、触发降级或反压;
- 冷热分层:长上下文或“挂起请求”的 KV 优先换出。
观测面:可视化与自愈
- 关键指标:GPU 使用率、合批效率、Prefill/Decode 时间、KV 命中率/占用、p95/p99 时延;
- 超阈值触发自动降采样、降精度、切换更激进的合批策略。
Dynamo 推理架构解析
Batch 合并策略:怎么既不拖慢人,又让 GPU 忙起来
一个好用的合批器至少考虑这几条:
- 时间上限:最大等待时间,比如 1~3ms,过时就直接发,保证尾时延。
- 容量上限:
max_batch_size
、max_tokens_per_batch
双限同时生效,避免单批太重。 - Prefill / Decode 拆分:Prefill 先按Prompt token 总量控重;Decode 按活跃会话数控重。
- 公平性:优先级队列(SLA),避免长请求饿死短请求,或多租户之间互相影响。
- 可抢占:Decode 每一步都是天然切片点,可以在 token 边界调度,让新请求尽快插队进入 Prefill。
Token 流式输出:既要快,又要稳定
- 解码步进:每次只向前生成一个 token(或小于等于 N 个),立刻把该 token 推给客户端;
- 解耦 I/O:推流线程和计算线程分离,避免阻塞合批;
- 回压控制:客户端慢时不要拖死计算,必要时丢弃部分中间增量,或聚合为“句子级片段”再发。
KV Cache 管理:让显存一直够用
- 按页管理:每个序列的 KV 占用被切成固定大小的 page,释放与复用都以 page 为单位;
- 配额&回收:超出配额的会话拒绝接入、挂起或降级;
- 前缀复用:命中相同系统提示或检索段时,KV 前缀可复用,Prefill 直接省掉一大截。
最小可运行的 Dynamo 风格推理原型
下面这段 Python 代码用 HuggingFace 的 transformers
跑一个可连续合批 + 流式输出 + KV 复用的最小原型。为了容易跑通,默认用 CPU + distilgpt2
,也支持 GPU(装好 PyTorch CUDA 即可)。
运行前安装依赖:
pip install torch transformers
# file: toy_dynamo_engine.py
import time
import threading
import queue
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Anyimport torch
from transformers import AutoModelForCausalLM, AutoTokenizer# --------- 配置 ----------
MODEL_NAME = "distilgpt2" # 体积小,CPU 也能跑
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"MAX_BATCH_DECODE = 16 # 每步 decode 合批的最大会话数
MAX_PREFILL_TOKENS = 8192 # 单次 prefill 的总 token 上限(防止过大序列)
MAX_QUEUE_DELAY_MS = 3 # 合批最大等待时间
MAX_NEW_TOKENS = 64 # 每个请求最多生成多少 token# --------- 数据结构 ----------
@dataclass
class Request:req_id: intprompt: strmax_new_tokens: int = MAX_NEW_TOKENScreated_at: float = field(default_factory=time.time)output_tokens: List[int] = field(default_factory=list)done: bool = False# 由调度器填充input_ids: Optional[torch.Tensor] = Nonepast_key_values: Optional[Any] = Nonelast_token: Optional[torch.Tensor] = None# --------- 引擎 ----------
class ToyDynamoEngine:def __init__(self):self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)self.model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE)self.model.eval()if self.tokenizer.pad_token is None:self.tokenizer.pad_token = self.tokenizer.eos_token@torch.no_grad()def prefill(self, reqs: List[Request]):""" 对新来的请求做 prefill,得到初始 past_key_values 和第一个解码 token """inputs = self.tokenizer([r.prompt for r in reqs], return_tensors="pt", padding=True).to(DEVICE)# 控制总 token,避免超大 batchtotal_tokens = inputs["input_ids"].numel()if total_tokens > MAX_PREFILL_TOKENS:raise RuntimeError(f"Prefill tokens {total_tokens} exceed limit {MAX_PREFILL_TOKENS}")outputs = self.model(**inputs, use_cache=True)# 取每个序列最后一个位置的 logits,采样一个 token 作为第一步输出last_logits = outputs.logits[:, -1, :]next_tokens = torch.argmax(last_logits, dim=-1) # 为简单演示,用 argmax;生产建议用采样或温度控制# 更新请求状态for i, r in enumerate(reqs):r.input_ids = inputs["input_ids"][i:i+1] # 保存原始 prompt(可选)r.past_key_values = outputs.past_key_valuesr.last_token = next_tokens[i:i+1].unsqueeze(0) # [1,1]r.output_tokens.append(int(next_tokens[i].item()))return reqs@torch.no_grad()def decode_step(self, active: List[Request]):""" 对活跃会话做一步 decode:输入 last_token + past,输出下一个 token """if not active:return# 合批 last_tokeninput_ids = torch.cat([r.last_token for r in active], dim=0).to(DEVICE) # [B,1]past = active[0].past_key_values # 这里简单起见假设结构一致outputs = self.model(input_ids=input_ids, past_key_values=past, use_cache=True)logits = outputs.logits[:, -1, :]next_tokens = torch.argmax(logits, dim=-1)# 更新每个请求的状态for i, r in enumerate(active):r.past_key_values = outputs.past_key_valuesr.last_token = next_tokens[i:i+1].unsqueeze(0) # [1,1]r.output_tokens.append(int(next_tokens[i].item()))if len(r.output_tokens) >= r.max_new_tokens or int(next_tokens[i].item()) == self.tokenizer.eos_token_id:r.done = Truedef decode_text(self, token_ids: List[int]) -> str:return self.tokenizer.decode(token_ids, skip_special_tokens=True)# --------- 调度器(连续批处理) ----------
class DynamoScheduler:def __init__(self, engine: ToyDynamoEngine):self.engine = engineself.waiting_q: "queue.Queue[Request]" = queue.Queue()self.active: List[Request] = []self.lock = threading.Lock()self.stop_flag = Falsedef submit(self, req: Request):self.waiting_q.put(req)def run_forever(self):""" 主循环:小延迟等一会儿,聚合 prefill;随后进入 decode 步进,期间持续接纳新请求 """while not self.stop_flag:# 1) 聚合 prefillnewcomers = self._gather_newcomers(MAX_QUEUE_DELAY_MS / 1000)if newcomers:try:self.engine.prefill(newcomers)with self.lock:self.active.extend(newcomers)except Exception as e:for r in newcomers:r.done = Trueprint("Prefill error:", e)# 2) 一步 decodeactives = [r for r in self.active if not r.done]if not actives and self.waiting_q.empty():time.sleep(0.001)continue# 控制 decode 批大小step_batch = actives[:MAX_BATCH_DECODE]if step_batch:self.engine.decode_step(step_batch)# 3) 输出增量(模拟流式)self._flush_streaming(step_batch)# 4) 清理已完成请求with self.lock:self.active = [r for r in self.active if not r.done]def _gather_newcomers(self, wait_seconds: float) -> List[Request]:""" 在一个很短的时间窗内收集新请求,做 prefill 批 """start = time.time()newcomers = []while time.time() - start < wait_seconds:try:r = self.waiting_q.get_nowait()newcomers.append(r)except queue.Empty:time.sleep(0.001)return newcomersdef _flush_streaming(self, batch: List[Request]):""" 这里简单打印每一步的增量,你可以改成 WebSocket/SSE 推流 """for r in batch:if r.output_tokens:text = self.engine.decode_text([r.output_tokens[-1]])print(f"[req#{r.req_id}] +{repr(text)}", flush=True)# --------- 演示入口 ----------
def demo():eng = ToyDynamoEngine()sch = DynamoScheduler(eng)t = threading.Thread(target=sch.run_forever, daemon=True)t.start()# 模拟高并发提交prompts = ["Explain the significance of GPU memory bandwidth in LLM inference.","Write a short poem about distributed systems.","What is continuous batching and why does it help?","Summarize the benefits of KV cache paging in two sentences.",]reqs: Dict[int, Request] = {}for i, p in enumerate(prompts, 1):r = Request(req_id=i, prompt=p, max_new_tokens=40)reqs[i] = rsch.submit(r)# 等待全部完成while any(not r.done for r in reqs.values()):time.sleep(0.05)# 汇总输出for i, r in reqs.items():whole_text = eng.decode_text(r.output_tokens)print(f"\n=== Request #{i} Final ===\n{whole_text}\n")sch.stop_flag = Truet.join(timeout=1)if __name__ == "__main__":demo()
代码讲解(抓要点就好)
- Prefill:把新请求在一个很短的时间窗(默认 3ms)内聚合起来,一次性做长序列前向,得到
past_key_values
和第一个 token。 - Decode:把活跃会话的
last_token
合起来做一步解码,拿到下一个 token 并更新 KV。 - 连续并入:主循环每一轮都尝试把“刚刚到达的新请求”做 prefill,然后继续 decode,不会等整批结束。
- 流式输出:每步 decode 后把本步新 token 打印出来,实际工程里换成 WebSocket/SSE 推给客户端即可。
- KV 管理:示例里把
past_key_values
挂在 Request 上。真实系统会做 page 化和内存池复用,这里保留了思路。
把 Demo 搬进生产:Triton 动态批处理与多实例
Triton 动态批处理配置示例
如果你用 Triton Inference Server 托管 TensorRT-LLM 引擎,可以在 config.pbtxt
开启动态批处理与解耦响应,达到和上面 Demo 相同的调度思路:
name: "llm-trtllm"
backend: "tensorrtllm"
max_batch_size: 128instance_group [{ kind: KIND_GPU count: 1 gpus: [0] },{ kind: KIND_GPU count: 1 gpus: [1] }
]dynamic_batching {preferred_batch_size: [ 4, 8, 16, 32 ]max_queue_delay_microseconds: 3000 # 3ms 等待窗口
}# 解耦请求-响应,便于流式输出
model_transaction_policy { decoupled: true }# 打开内置缓存(如启用可用)
response_cache { enable: true }
要点:
max_queue_delay_microseconds
就是我们的短窗合批;decoupled: true
允许多次响应,对应 token 级流式;- 多
instance_group
可以把一个模型同时开多个副本,利用大卡/MIG 做隔离和扩容。
Batch 合并策略、流式输出与 KV Cache
Batch 合并策略的取舍
- 吞吐优先:把
preferred_batch_size
设大一些,max_queue_delay
放宽到 4~6ms,能大幅提升 GPU 利用; - 时延优先:把
preferred_batch_size
收紧,max_queue_delay
控制在 1~2ms,牺牲部分吞吐; - 混合策略:对不同 SLA 的租户使用不同队列,不同的
max_queue_delay
,高优先级走低延迟策略。
Token 级流式输出
- Triton/NIM 用解耦事务,一条请求可以多次
send
; - 如果你自己写服务,SSE/WebSocket 都行。关键是I/O 解耦:推流线程不要阻塞 decode。
KV Cache 管理的小技巧
- 按页管理:统一 page 大小(比如 2MB),回收/复用都用 page 计数;
- 复用前缀:RAG/系统提示经常重复,前缀 KV 命中能省掉一大段 Prefill;
- 阈值&回收:高水位触发回收策略:暂停超长请求、降低并发、切浮点精度。
把它用在真实业务里
下面给 3 个常见场景,顺便给出必要的代码/配置片段。
场景一:聊天问答 API 网关
需求:超高 QPS,延迟敏感,用户要“边打字边看结果”。
做法:
- API 网关把请求丢进短窗队列,每 1~3ms 聚合一次做 Prefill;
- 进入 Decode 后每步出 token,SSE 推给前端;
- Triton 配置
decoupled: true
,业务代码里每步send()
。
SSE 端点伪代码(Python/FastAPI):
from fastapi import FastAPI
from fastapi.responses import StreamingResponseapp = FastAPI()@app.post("/chat/stream")
def chat_stream(prompt: str):def token_stream():req = Request(req_id=int(time.time()*1000)%100000, prompt=prompt, max_new_tokens=128)scheduler.submit(req)last_len = 0while not req.done:if len(req.output_tokens) > last_len:text = engine.decode_text(req.output_tokens[last_len:])last_len = len(req.output_tokens)yield f"data: {text}\n\n"time.sleep(0.01)yield "data: [DONE]\n\n"return StreamingResponse(token_stream(), media_type="text/event-stream")
场景二:RAG 检索问答
痛点:Prompt 很长(检索段拼进去),Prefill 特别重。
做法:
- 把“系统提示 + 领域说明 + 模板”作为前缀 KV缓存,检索段变化时只拼后缀;
- 命中前缀直接跳过大段 Prefill;
- 合批时对 Prefill 设
MAX_PREFILL_TOKENS
,避免偶发超长请求拖全局。
前缀 KV 复用思路(示意):
# 假设 prefix_kv_map 缓存了若干常见前缀的 KV
prefix = build_prefix(system_prompt, domain_hint, template) # 只包括稳定部分
key = hash(prefix)
if key in prefix_kv_map:r.past_key_values = clone(prefix_kv_map[key]) # 直接拿到 KVr.input_ids = tokenizer(rag_snippets + question) # 只对新增部分做 prefill
else:# 第一次命中:先把 prefix 做一遍 prefill,然后缓存 KVprefix_req = Request(req_id=-1, prompt=prefix, max_new_tokens=1)engine.prefill([prefix_req])prefix_kv_map[key] = clone(prefix_req.past_key_values)
场景三:多租户 SLA 与限流
痛点:不同租户对延迟/吞吐诉求不一样,容易互相影响。
做法:
- 维护多优先级队列,高优先级
max_queue_delay
更小; - 租户配额:限制“活跃会话数”和“总 KV page 数”;
- 过载时对低优先级租户降级(减小
max_new_tokens
、降低温度/核采样)。
多优先级合批伪代码:
def gather_newcomers():high, normal = [], []start = time.time()while time.time() - start < MAX_QUEUE_DELAY_MS/1000:r = try_pop_high_or_normal_queue()if not r:time.sleep(0.001); continue(high if r.priority == "high" else normal).append(r)# 先吃高优先级,空余再放普通return (high + normal)[:MAX_BATCH_DECODE]
QA 环节
Q1:Prefill/Decode 为什么要分开?
Prefill 是长序列 GEMM,Decode 是短序列、token 级前向。把两种负载硬拼一起,显卡经常“忽忙忽闲”。分开后,你可以对 Prefill 限制总 token,对 Decode 限制活跃会话数,各自吃满各自的“甜点区”。
Q2:连续批处理会不会饿死长请求?
会,如果不做公平性。用优先级队列+轮转,从不同桶里按比例取请求进入 decode 批。长请求也能稳定推进。
Q3:KV Cache 是不是一直增?
不是。按页管理后,请求结束立刻归还 page。高水位触发回收策略,先清理低优先级/挂起的会话。
Q4:流式输出影响吞吐吗?
I/O 线程要解耦,输出聚合到句子级再发,或者使用零拷贝队列,就不会拖慢计算。Triton 的 decoupled transaction 可以放心用。
Q5:TensorRT-LLM 和 PyTorch 选择哪个?
追求极致延迟/吞吐、模型相对稳定用 TensorRT-LLM;需要快速迭代/频繁改模型,用 PyTorch + CUDA Graph 也能做得很强。生产常见做法是两者并行:验证在 PyTorch,稳定后下沉到 TensorRT-LLM。
Q6:多卡如何划分并行?
长上下文/大模型先考虑 TP(张量并行),很长序列或需要拉长流水线时再加 PP。跨机房的通信延迟会显著影响效果,尽量同机架内聚合。
总结
把 LLM 真正跑上线,关键不是“一个模型多准”,而是“一个 GPU 多忙、一个请求多稳”。Dynamo 风格的分布式推理架构抓住了生产里的三件事:连续批处理把吞吐提起来;流式输出把体验做顺;KV Cache 管理把显存稳住。
本文先用一个能跑起来的最小 Demo 把思路讲清楚,再落到 Triton/TensorRT-LLM 的配置与实践,最后结合聊天问答、RAG、多租户三类常见场景给出实操建议。你可以先直接跑 Demo 感受“连续批”的节奏,再把批策略、流式通道和 KV 管理逐步换成你线上框架的等价能力。这样推进,既能尽快见到收益,也能把复杂度压在你可控的范围里。