vLLM - SchedulerInterface

发布于:2025-09-15 ⋅ 阅读:(24) ⋅ 点赞:(0)

SchedulerInterface定义了vLLM Scheduler的接口,源代码:vllm\v1\core\sched\interface.py。

schedule

schedule方法产生当前Step(一次模型前向传播)的调度结果:SchedulerOutput。

class SchedulerInterface(ABC):
    @abstractmethod
    def schedule(self) -> "SchedulerOutput":
        raise NotImplementedError

SchedulerOutput的定义:

@dataclass
class SchedulerOutput:
    scheduled_new_reqs: list[NewRequestData] # 新请求
    scheduled_cached_reqs: CachedRequestData # 已调度过的请求
    num_scheduled_tokens: dict[str, int] # 请求需要生成的token数字典
    total_num_scheduled_tokens: int # 所有请求需要生成的token总数
    scheduled_spec_decode_tokens: dict[str, list[int]] # 请求投机推理的前置token,len(scheduled_spec_decode_tokens[req_id] = num_scheduled_tokens[req_id] - 1)
    scheduled_encoder_inputs: dict[str, list[int]] # 请求要处理的scheduled_new_reqs中的mm_positions中的多模态输入的id字典。
    num_common_prefix_blocks: list[int] # 每个kv cache group中的common prefix的block数量。
    finished_req_ids: set[str] # 上一次调度后已经完成的req_id,用于通知worker释放资源
    free_encoder_input_ids: list[tuple[str, int]] # 可以释放的[req_id, encoder_inputs_id]列表,用于释放Encoder Cache
    structured_output_request_ids: dict[str, int] # 需要结构化输出的req_id在batch中的index,用于计算grammar_bitmask
    grammar_bitmask: Optional[npt.NDArray[np.int32]]  # batch中需要使用XGrammar做结构化输出的mask
    kv_connector_metadata: Optional[KVConnectorMetadata] = None # 抽象基类定义PD分离场景下PD之间KV Cache通信所需的元数据结构

scheduled_new_reqs

第一次被调度的新请求:

class NewRequestData:
    req_id: str      
    prompt_token_ids: list[int]    
    mm_inputs: list[MultiModalKwargs]  
    mm_hashes: list[str]  
    mm_positions: list[PlaceholderRange]  
    sampling_params: Optional[SamplingParams]
    pooling_params: Optional[PoolingParams]
    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    lora_request: Optional[LoRARequest]

其中:

  • req_id:请求ID。
  • prompt_token_ids: Prompt的 token ID 列表。
  • mm_inputs: 多模态输入的 keyword 参数列表。
  • mm_hashes: 多模态输入的哈希值列表。
  • mm_positions: 多模态输入在Prompt中的位置信息列表。示例:
Prompt: `AAAA BBBB What is in these images?`
    mm_positions参数:
    ```
    A: PlaceholderRange(offset=0, length=4)
    B: PlaceholderRange(offset=5, length=4)
    ```
    """
  • sampling_params:采样参数。存储生成文本时的采样参数,例如温度(temperature)、Top-K、Top-P 等。
  • pooling_params:池化参数,如池化类型(max pooling、mean pooling 等)。池化操作可以减少输入的维度,提高模型的效率。
  • block_ids:存储与请求相关的块ID。vLLM用块来管理内存。
  • num_computed_tokens:当前请求已生成的token数量。
  • lora_request:LORA参数。

scheduled_cached_reqs

之前已经被调度过的请求:

class CachedRequestData:
    req_ids: list[str]
    # If resumed_from_preemption is False, new_block_ids will be appended to
    # the request's block IDs. If True, new_block_ids will be used as the
    # request's block IDs instead of appending to the existing block IDs.
    resumed_from_preemption: list[bool]
    # NOTE(woosuk): new_token_ids is only used for pipeline parallelism.
    # When PP is not used, new_token_ids will be empty.
    new_token_ids: list[list[int]]
    new_block_ids: list[tuple[list[int], ...]]
    num_computed_tokens: list[int]

其中:

  • req_ids:请求ID列表。
  • resumed_from_preemption:请求是否从抢占中恢复。
  • new_token_ids:只在PP并行时使用。TODO:WHY?
  • new_block_ids:新分配的块ID列表,如果resumed_from_preemption为false,则追加到请求现有块 ID 列表,如果resumed_from_preemption为True,则替换请求现有块 ID 列表。
  • num_computed_tokens:每个请求已计算的 token 数量。

update_from_output

update_from_output方法使用1个Step模型执行的结果,来更新Scheduler的状态。

class SchedulerInterface(ABC):
    @abstractmethod
    def update_from_output(
        self,
        scheduler_output: "SchedulerOutput",
        model_runner_output: "ModelRunnerOutput",
    ) -> dict[int, "EngineCoreOutputs"]:

一个输入是Step的调度结果:SchedulerOutput,具体含义参见上述内容。
一个输入是根据Step调度结果,模型执行的输出ModelRunnerOutput。

ModelRunnerOutput的定义:

@dataclass
class ModelRunnerOutput:
    req_ids: list[str] # 请求ID列表
    req_id_to_index: dict[str, int] # 请求ID在batch中的index字典
    sampled_token_ids: list[list[int]] # 请求生成的token列表
    spec_token_ids: Optional[list[list[int]]] # 请求生成的投机推理token列表
    logprobs: Optional[LogprobsLists]  # 请求生成的token列表的对数概率
    prompt_logprobs_dict: dict[str, Optional[LogprobsTensors]] # 请求的对数概率张量
    pooler_output: list[Optional[torch.Tensor]] # 请求的模型最后一层的隐藏状态
    finished_sending: Optional[set[str]] = None # 请求的结果已经从工作进程发送到调度器进程
    finished_recving: Optional[set[str]] = None # 请求的结果已经被调度器进程接收
    num_nans_in_logits: Optional[dict[str, int]] = None # 请求的logits中的NaN(Not a Number)数量

LogprobsLists

class LogprobsLists(NamedTuple):
    # [num_reqs, max_num_logprobs + 1]
    logprob_token_ids: list[list[int]] # 请求生成的tokens
    # [num_reqs, max_num_logprobs + 1]
    logprobs: list[list[float]] # 请求生成的tokens对应的对数概率
    # [num_reqs]
    sampled_token_ranks: list[int] # 请求生成的第一个token在概率里的排名

add_request

add_request用于添加一个新的请求。

class SchedulerInterface(ABC):
    @abstractmethod
    def add_request(self, request: "Request") -> None:
        raise NotImplementedError

创建一个新的Request:

class Request:
    def __init__(
        self,
        request_id: str,
        prompt_token_ids: list[int],
        multi_modal_inputs: Optional[list[MultiModalKwargs]],
        multi_modal_hashes: Optional[list[str]],
        multi_modal_placeholders: Optional[list[PlaceholderRange]],
        sampling_params: Optional[SamplingParams],
        pooling_params: Optional[PoolingParams],
        eos_token_id: Optional[int],
        client_index: int = 0,
        arrival_time: Optional[float] = None,
        lora_request: Optional["LoRARequest"] = None,
        structured_output_request: Optional["StructuredOutputRequest"] = None,
        cache_salt: Optional[str] = None,
        priority: int = 0,
    ) -> None:
        ...

其中大部分参数的定义同:NewRequestData,其余部分:

  • client_index:用户ID
  • cache_salt:Cache的盐值。提供了一个额外的唯一性标识符,用于确保缓存的唯一性。
  • priority:请求的优先级。

finish_requests

finish_requests用于完成一个请求:


    @abstractmethod
    def finish_requests(
        self,
        request_ids: Union[str, Iterable[str]],
        finished_status: "RequestStatus",
    ) -> None:
        raise NotImplementedError

其中:

  • request_ids:要结束的请求ID列表
  • finished_status:请求结束的状态

RequestStatus

class RequestStatus(enum.IntEnum):
    """Status of a request."""
    WAITING = enum.auto()  # 请求正在等待处理
    WAITING_FOR_FSM = enum.auto() # 请求正在等待FSM(Finite State Machine)的处理
    WAITING_FOR_REMOTE_KVS = enum.auto() # 请求正在等待KVS(Remote Key-Value Store)的响应,在vLLM中,KVS用于跨节点存储和检索数据。
    RUNNING = enum.auto() # 请求正在处理中
    PREEMPTED = enum.auto() # 请求被抢占
    FINISHED_STOPPED = enum.auto() # 请求完成并停止
    FINISHED_LENGTH_CAPPED = enum.auto() # 请求完成但长度超限(还没遇到EOS但是超最大长度)
    FINISHED_ABORTED = enum.auto() # 请求被终止,比如client终止
    FINISHED_IGNORED = enum.auto() # 请求完成但被忽略,通常发生在请求无效、系统策略、错误处理或资源管理等场景中

make_stats

make_stats生成Scheduler统计数据,用于log等:

class SchedulerInterface(ABC):
    @abstractmethod
    def make_stats(self) -> Optional["SchedulerStats"]:
        raise NotImplementedError

SchedulerStats的定义:

@dataclass
class SchedulerStats:
    num_running_reqs: int = 0 # 运行中的请求
    num_waiting_reqs: int = 0 # 等待调度的请求
    kv_cache_usage: float = 0.0 # KV Cache利用率
    prefix_cache_stats: PrefixCacheStats = field(
        default_factory=PrefixCacheStats) # Prefix Cache统计
    spec_decoding_stats: Optional[SpecDecodingStats] = None # 投机推理统计
    num_corrupted_reqs: int = 0 # 损坏的请求数(结果中有NAN)

@dataclass
class PrefixCacheStats:
    reset: bool = False # 在Cache创建后,是否有reset_prefix_cache调用(如RLHF导致权重更新等)
    requests: int = 0 # Cache请求次数
    queries: int = 0 # Cache查询的token数
    hits: int = 0 # Cache查询命中的token数
    
@dataclass
class SpecDecodingStats:
    num_spec_tokens: int # Scheduler创建时的配置参数:投机推理的token数
    num_drafts: int = 0 # 投机推理的总次数
    num_draft_tokens: int = 0 # 投机推理产生的token数
    num_accepted_tokens: int = 0 # 被接受的投机推理的token数
    num_accepted_tokens_per_pos: list[int] = field(default_factory=list) # 投机推理的token每个位置的接受次数

request status

SchedulerInterface定义了一系列方法用于获取request status:

class SchedulerInterface(ABC):
    @abstractmethod
    def get_num_unfinished_requests(self) -> int:
        raise NotImplementedError

    def has_unfinished_requests(self) -> bool:
        return self.get_num_unfinished_requests() > 0

    @abstractmethod
    def has_finished_requests(self) -> bool:
        raise NotImplementedError

    def has_requests(self) -> bool:
        return self.has_unfinished_requests() or self.has_finished_requests()

    @abstractmethod
    def get_request_counts(self) -> tuple[int, int]:
        raise NotImplementedError

reset_prefix_cache

reset_prefix_cache用于weights在线更新后,重新生成Prefix Cache

    @abstractmethod
    def reset_prefix_cache(self) -> bool:
        raise NotImplementedError

shutdown

停止调度器:

class SchedulerInterface(ABC):
    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown the scheduler."""
        raise NotImplementedError

get_kv_connector

get_kv_connector用于获取KV Cache通信的Connector。

class SchedulerInterface(ABC):
	def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
        return None