Sequence¶
约 690 个字 103 行代码 预计阅读时间 4 分钟
sequence.py 定义了序列的状态机和数据结构。每个 Sequence 代表一条正在运行的生成路径,携带 token 历史、KV cache 页表、采样参数和调度状态。
状态机¶
SequenceStatus 用三个状态描述一条序列的生命周期:
class SequenceStatus(Enum):
WAITING = auto() # 等待调度
RUNNING = auto() # 正在参与前向
FINISHED = auto() # 已终止(EOS 或达到 max_tokens)
状态转换:WAITING → RUNNING → FINISHED,不会回退。
num_tokens¶
初始化时,num_tokens 和 num_prompt_tokens 分别取了两个不同来源的长度:
self.token_ids = copy(token_ids) # 切断外部引用,Sequence 拥有独立的列表
self.num_tokens = len(self.token_ids) # 绑定对象内部列表,跟着序列成长
self.num_prompt_tokens = len(token_ids) # 绑定外部参数,冻结在 prompt 原始长度
初始化那一刻两者相等,但语义不同:
num_tokens:会变。每生成一个新 token,append_token()里self.num_tokens += 1。num_prompt_tokens:永远不变。prompt 的原始长度,初始化后就冻结,用来计算已生成 token 数(num_completion_tokens = num_tokens - num_prompt_tokens)。
copy() 的必要性:Python 的参数传递是对象引用共享,不 copy 的话外部修改会污染序列内部、序列内部 append_token 也会污染外部。copy 切断了这个共享关系。
为什么 num_tokens 必须独立存在¶
初看 num_tokens 似乎多余——len(self.token_ids) 总能拿到同样的值,何必多维护一个变量?
当 tensor_parallel_size > 1 时,主进程(rank 0)需要将 Sequence 对象通过 pickle + shared_memory 传给 worker 进程。Shared memory 只有 1MB,如果每条 seq 都携带完整的历史 token 列表(decode 阶段可能已有几百到几千个 token),加上 Python 序列化的对象膨胀,极易溢出。
于是作者在 __getstate__(pickle 序列化钩子)中做了压缩:
- decode 阶段只传
last_token - 不传整条
token_ids列表;历史 token 对应的 K/V 状态已经存在 KV cache 中,decode attention 不再需要原始 token ids
__setstate__(pickle 反序列化钩子)把 token_ids 重建为空列表,此时 len(self.token_ids) == 0,但 num_tokens 作为独立字段从 tuple 中恢复,仍能正确反映序列的真实长度。
如果 num_tokens 不存在会怎样? model_runner.prepare_decode 用 len(seq) 计算 position 和 context_lens,scheduler 直接读取 seq.num_tokens 计算调度进度。如果用 len(self.token_ids) 替代,decode worker 反序列化后 token_ids 为空,len(seq) 会错误地变成 0。
KV Cache 相关字段¶
self.block_table = [] # K/V 存在显存的哪几个物理块里(初始为空,BlockManager 分配后填上)
self.num_cached_tokens = 0 # 已有多少个 token 的 K/V 写入 cache 了
self.num_scheduled_tokens = 0 # 本轮 forward 要往 cache 里写多少个新 token 的 K/V
self.is_prefill = True # 表示还没做 Prefill
block_table 本质上是 KV cache 的「页表」:这条 Sequence 的第 i 个逻辑 block,对应 KV cache 里的哪个物理 block id。
比如:
含义是:
seq 的第 0 个 token block → KV cache 物理 block 7
seq 的第 1 个 token block → KV cache 物理 block 3
seq 的第 2 个 token block → KV cache 物理 block 10
不同 sequence 的 KV cache block 可能分散在显存里,不一定连续。
last_block_num_tokens¶
@property
def last_block_num_tokens(self):
return self.num_tokens - (self.num_blocks - 1) * self.block_size
表示最后一个 block 里已经用了多少个 token 位置。decode 阶段用它算新 token 应该写进 KV cache 的哪个 slot:
block 方法¶
def block(self, i):
assert 0 <= i < self.num_blocks
return self.token_ids[i*self.block_size: (i+1)*self.block_size]
返回第 i 个逻辑 token block 的 token id 切片。主要给 BlockManager 用,比如做 prefix cache hash。
Source Code¶
from copy import copy
from enum import Enum, auto
from itertools import count
from nanovllm.sampling_params import SamplingParams
class SequenceStatus(Enum):
WAITING = auto()
RUNNING = auto()
FINISHED = auto()
class Sequence:
block_size = 256
counter = count()
def __init__(self, token_ids: list[int], sampling_params = SamplingParams()):
self.seq_id = next(Sequence.counter)
self.status = SequenceStatus.WAITING
self.token_ids = copy(token_ids)
self.last_token = token_ids[-1]
self.num_tokens = len(self.token_ids)
self.num_prompt_tokens = len(token_ids)
self.num_cached_tokens = 0
self.num_scheduled_tokens = 0
self.is_prefill = True
self.block_table = []
self.temperature = sampling_params.temperature
self.max_tokens = sampling_params.max_tokens
self.ignore_eos = sampling_params.ignore_eos
def __len__(self):
return self.num_tokens
def __getitem__(self, key):
return self.token_ids[key]
@property
def is_finished(self):
return self.status == SequenceStatus.FINISHED
@property
def num_completion_tokens(self):
return self.num_tokens - self.num_prompt_tokens
@property
def prompt_token_ids(self):
return self.token_ids[:self.num_prompt_tokens]
@property
def completion_token_ids(self):
return self.token_ids[self.num_prompt_tokens:]
@property
def num_blocks(self):
return (self.num_tokens + self.block_size - 1) // self.block_size
@property
def last_block_num_tokens(self):
return self.num_tokens - (self.num_blocks - 1) * self.block_size
def block(self, i):
assert 0 <= i < self.num_blocks
return self.token_ids[i*self.block_size: (i+1)*self.block_size]
def append_token(self, token_id: int):
self.token_ids.append(token_id)
self.last_token = token_id
self.num_tokens += 1
def __getstate__(self):
last_state = self.last_token if not self.is_prefill else self.token_ids
return (self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens,
self.num_scheduled_tokens, self.block_table, last_state)
def __setstate__(self, state):
self.num_tokens, self.num_prompt_tokens, self.num_cached_tokens, \
self.num_scheduled_tokens, self.block_table, last_state = state
if isinstance(last_state, list):
self.token_ids = last_state
self.last_token = self.token_ids[-1]
else:
self.token_ids = []
self.last_token = last_state
Last update: May 5, 2026
Discussion