QServe Impl, sequence.py, model_executor, core/block_manager.py,…
-
sequence.py
class SequenceGroup
// a group of seqs generated from the same prompt
// used to co-schedule beam search candidates
[
- request_key: str
- seqs_dict: Dict[int, Sequence], seqid -> seq
- sampling_params: SamplingParams
- arrival_time: float
- last_token_time: float
- prefix: Prefix
- prompt_logprobs: PromptLogprobs
]
class Sequence
[
- seq_id: int
- request_key: str
- prompt: str
- prompt_token_ids: List[int]
- data: SequenceData(prompt_token_ids)
- output_logprobs: SampleLogprobs
- output_text: str
- block_size: int
- logical_token_blocks: List[LogicalTokenBlock]
- status: SequenceStatus
- run_vlm: bool
- img_per_seq: int
- pil_image
- prefix_offset: int // for incremental detokenization
- read_offset: int // for incremental detokenization
- tokens: List[str] // input+output
]
class SampleLogprobs = List[Dict[int, float]]
class SequenceData
[
- prompt_token_ids: List[int]
- output_token_ids: List[int]
- cumulative_logprob: float
]
class SequenceGroupMetadata
[
- request_key: str
- is_prompt: bool, in prompt stage
- seq_data: Dict[int, SequenceData], seq id -> seq data
- sampling_params: SamplingParams
- block_tables: Dict[int, List[int]], seq id -> list of physical block id
- prefix: Prefix, prefix of prompt
]
enum SequenceStatus
[
- WAITING
- RUNNING
- SWAPPED
- FINISHED_STOPPED
- FINISHED_LENGTH_CAPPED
- FINISHED_ABORTED // mem?
- FINISHED_IGNORED // prompt length
]
class SequenceOutput
[
- parent_seq_id: int, ID of the parent sequence, forked in beam search
- output_token: int
- logprobs: Dict[int, float]
]
class SequenceGroupOutput
[
- samples: List[SequenceOutput]
- prompt_logprobs: PromptLogprobs
]
class SamplerOutput = List[SequenceGroupOutput]
PromptLogprobs = List[Dict[int,float]]
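A minimal sketch of how the SequenceData fields above fit together when a sampled token is appended; the method names mirror vLLM's, but the bodies here are illustrative assumptions, not the actual source.
# Minimal sketch (illustrative): SequenceData accumulates generated tokens
# and their cumulative log-probability.
from dataclasses import dataclass, field
from typing import List

@dataclass
class SequenceData:
    prompt_token_ids: List[int]
    output_token_ids: List[int] = field(default_factory=list)
    cumulative_logprob: float = 0.0

    def append_token_id(self, token_id: int, logprob: float) -> None:
        # one decode step adds one token and its log-probability
        self.output_token_ids.append(token_id)
        self.cumulative_logprob += logprob

    def get_token_ids(self) -> List[int]:
        # prompt tokens followed by generated tokens
        return self.prompt_token_ids + self.output_token_ids

    def get_len(self) -> int:
        return len(self.prompt_token_ids) + len(self.output_token_ids)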
model_executor
parameter.py
-
class ModelWeightParameter(_ColumnvLLMParameter, RowvLLMParameter)
-
-
class GroupQuantScaleParameter(_ColumnvLLMParameter, RowvLLMParameter)
-
-
-
core/block_manager.py
class BlockSpaceManager
// mapping logical -> physical
[
- block_size: int
- num_total_gpu_blocks: int
- num_total_cpu_blocks: int
- watermark: float=0.01 // safe buffer
- sliding_window: int
- watermark_blocks: int // gpu watermark
- gpu_allocator: BlockAllocator
- cpu_allocator: BlockAllocator
- block_tables: Dict[int,BlockTable]
]
member can_allocate()
[
- seq_group: SequenceGroup
]
- AllocStatus
// check the first WAITING seq in the seq_group to see whether its prompt
// can fit into GPU blocks; all waiting seqs share the same prompt
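A sketch of the watermark check described above; it assumes gpu_allocator exposes a get_num_free_blocks() helper and is not the verbatim source.
# Sketch of BlockSpaceManager.can_allocate() per the description above.
def can_allocate(self, seq_group) -> "AllocStatus":
    # all WAITING seqs share the same prompt, so inspecting the first is enough
    seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
    num_required_blocks = len(seq.logical_token_blocks)
    num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
    if self.num_total_gpu_blocks - num_required_blocks < self.watermark_blocks:
        return AllocStatus.NEVER   # prompt can never fit on this GPU
    if num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:
        return AllocStatus.OK      # fits now while keeping the watermark buffer
    return AllocStatus.LATER       # may fit once other groups free their blocks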
member allocate()
[
- seq_group: SequenceGroup
]
// get first waiting seq in seq_group
// currently all logical blocks are for prompt
// for each logical block, use gpu_allocator.allocate() to get physical block
// set ref count
// record seq_id -> block_table (logical -> physical) in self.block_tables for all waiting seqs
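A sketch of allocate() following the steps above; seq_group.get_seqs() and num_seqs() are assumed helpers.
# Sketch of BlockSpaceManager.allocate(): one physical block per logical
# prompt block, shared by every WAITING seq in the group.
def allocate(self, seq_group) -> None:
    seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
    block_table = []
    for _ in range(len(seq.logical_token_blocks)):
        block = self.gpu_allocator.allocate()
        # all waiting seqs share the prompt blocks, so the ref count equals
        # the number of seqs in the group
        block.ref_count = seq_group.num_seqs()
        block_table.append(block)
    for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
        self.block_tables[seq.seq_id] = block_table.copy()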
member can_append_slot()
[
- seq_group: SequenceGroup
]
- bool
// whether one more GPU block can be allocated for every RUNNING seq
// this is conservative, because not every seq actually needs a new block
member append_slot()
[
- seq: Sequence
]
- [int, int] // old/new block id
// if the seq has more logical blocks than recorded in the block_table, gpu_allocator allocates a new block; return
// if ref_count is 1, there is no sharing, do nothing (appendable); return
// if ref_count > 1, the block is shared: allocate a new block and replace it; return old/new block ids for copy-on-write
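A sketch of the three append_slot() branches above (sliding-window handling omitted).
# Sketch of BlockSpaceManager.append_slot(); returns an (old, new) block-id
# pair only when a copy-on-write is needed.
def append_slot(self, seq):
    logical_blocks = seq.logical_token_blocks
    block_table = self.block_tables[seq.seq_id]
    if len(block_table) < len(logical_blocks):
        # the seq grew into a new logical block: back it with a fresh GPU block
        block_table.append(self.gpu_allocator.allocate())
        return None
    last_block = block_table[-1]
    if last_block.ref_count == 1:
        # not shared: the new token can be appended in place
        return None
    # shared block: copy-on-write into a private block and drop our reference
    new_block = self.gpu_allocator.allocate()
    block_table[-1] = new_block
    self.gpu_allocator.free(last_block)
    return last_block.block_number, new_block.block_number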
member fork()
[
- parent_seq: Sequence
- child_seq: Sequence
]
// copy the physical token block from parent to child, record in block_tables
// increase the ref count
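A sketch of fork() per the two steps above: the child reuses the parent's physical blocks until a copy-on-write separates them.
# Sketch of BlockSpaceManager.fork(): share the parent's blocks with the child.
def fork(self, parent_seq, child_seq) -> None:
    src_block_table = self.block_tables[parent_seq.seq_id]
    self.block_tables[child_seq.seq_id] = src_block_table.copy()
    for block in src_block_table:
        block.ref_count += 1   # shared until append_slot() triggers copy-on-write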
member can_swap_in()
[
- seq_group: SequenceGroup
]
- bool
// for a SWAPPED group, the required GPU blocks are all its physical blocks (currently on CPU) plus 1 new block per seq
// this is a conservative estimate
member swap_in()
[
- seq_group: SequenceGroup
]
- Dict[int, int] // cpu block no. -> gpu block no.
// for a SWAPPED group, free the CPU blocks and allocate GPU blocks
// record the CPU -> GPU block mapping so the cache engine can copy the data over
member can_swap_out()
[
- seq_group: SequenceGroup
]
- bool
// the seq_group's GPU block count must not exceed the cpu_allocator's free capacity
member swap_out()
[
- seq_group: SequenceGroup
]
- Dict[int, int] // gpu block no. -> cpu block no.
// swap out all RUNNING seq
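A sketch of swap_out() per the description above; swap_in() is the mirror image with the CPU and GPU allocators exchanged. The helper names are assumptions in line with these notes.
# Sketch of BlockSpaceManager.swap_out(): move a RUNNING group's GPU blocks
# to CPU and return the gpu-block-number -> cpu-block-number mapping.
def swap_out(self, seq_group):
    mapping = {}   # gpu PhysicalTokenBlock -> cpu PhysicalTokenBlock
    for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
        new_block_table = []
        for gpu_block in self.block_tables[seq.seq_id]:
            if gpu_block in mapping:
                # block shared with another seq in the group: reuse its CPU copy
                cpu_block = mapping[gpu_block]
                cpu_block.ref_count += 1
            else:
                cpu_block = self.cpu_allocator.allocate()
                mapping[gpu_block] = cpu_block
            new_block_table.append(cpu_block)
            self.gpu_allocator.free(gpu_block)
        self.block_tables[seq.seq_id] = new_block_table
    return {g.block_number: c.block_number for g, c in mapping.items()}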
class BlockAllocator
// manages the free physical token blocks and their ref counts
[
- device: Device
- block_size: int
- num_blocks: int
- free_blocks: BlockTable
]
enum AllocStatus
// Result of BlockSpaceManager.can_allocate for SeqGroup
[
- OK
- LATER
- NEVER // larger than GPU mem
]
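A self-contained sketch of BlockAllocator and the ref-counted free list it maintains; PhysicalTokenBlock is modeled as a small dataclass here for illustration.
# Sketch of BlockAllocator: a free list of PhysicalTokenBlocks with ref counts.
from dataclasses import dataclass
from typing import List

@dataclass(eq=False)   # identity equality so blocks can be used as dict keys
class PhysicalTokenBlock:
    device: str
    block_number: int
    block_size: int
    ref_count: int = 0

class BlockAllocator:
    def __init__(self, device: str, block_size: int, num_blocks: int) -> None:
        self.device = device
        self.block_size = block_size
        self.num_blocks = num_blocks
        self.free_blocks: List[PhysicalTokenBlock] = [
            PhysicalTokenBlock(device, i, block_size) for i in range(num_blocks)]

    def allocate(self) -> PhysicalTokenBlock:
        if not self.free_blocks:
            raise ValueError("out of memory: no free blocks left")
        block = self.free_blocks.pop()
        block.ref_count = 1
        return block

    def free(self, block: PhysicalTokenBlock) -> None:
        block.ref_count -= 1
        if block.ref_count == 0:
            # only unreferenced blocks return to the free list
            self.free_blocks.append(block)

    def get_num_free_blocks(self) -> int:
        return len(self.free_blocks)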
core/scheduler.py
class Scheduler
[
- scheduler_config: SchedulerConfig
- cache_config: CacheConfig
- prompt_limit: int
- policy: FCFSPolicy
- block_manager: BlockSpaceManager
- prefix_pool: PrefixPool
- waiting: Deque[SequenceGroup]
- running: Deque[SequenceGroup]
- swapped: Deque[SequenceGroup]
]
member schedule()
- [SequenceGroupMetadata]
- SchedulerOutputs
- scheduler_outputs = _schedule()
- for each scheduled seq_group
2a. for each RUNNING seq
2a1. update seq_data, id->seq.data
2a2. update block_tables, id->phys block ids
2b. create SequenceGroupMetadata for the seq_group: request_key, is_prompt, seq_data, sampling_params, block_tables, prefix.
member _schedule()
[]
{
- self.waiting: Deque[SequenceGroup]
- self.swapped: Deque[SequenceGroup]
- self.running: Deque[SequenceGroup]
- preempted: List[SequenceGroup]
- scheduled: List[SequenceGroup]
- ignored: List[SequenceGroup]
- blocks_to_swap_in: Dict[int,int]
- blocks_to_swap_out: Dict[int,int]
- blocks_to_copy: Dict[int,List[int]]
}
- SchedulerOutputs(scheduled, prompt_run, num_batched_tokens, blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy, ignored)
// priority: running > swapped > recompute > waiting
case 1. if no self.swapped, and can schedule from self.waiting
- get the total number of seqs in flight (counting the future decoding phase) -> num_curr_seqs
- for each self.waiting
2a. check the prompt length against prompt_limit; if it exceeds the limit, change seq.status to FINISHED_IGNORED, add to ignored_seq_groups, remove from self.waiting, continue
2b. check block_manager.can_allocate for the prompt; if NEVER, change to FINISHED_IGNORED, add to ignored, remove from self.waiting, continue
2c. check block_manager.can_allocate; if LATER, break (no need to keep searching)
2d. check the scheduled total prompt length; if it would exceed max_num_batched_tokens, stop searching
2e. check the scheduled total number of seqs; if it would exceed max_num_seqs, stop searching
2f. remove from waiting and add to running; block_manager.allocate(seq_group); change WAITING to RUNNING for each seq (a condensed sketch of this loop follows after this member)
- return SchedulerOutputs if anything was scheduled or ignored
case 2. self.running schedule
- for each self.running, pop left a seq_group
1a. while not can_append_slot(seq_group)
1a1. if there are other seq_groups in self.running, preempt the last one (lowest priority), record its mapping in blocks_to_swap_out, add to preempted
1a2. if there is no other candidate to preempt, preempt this seq_group itself and break
1b. if finally can_append_slot, _append_slot, add to back of new running
- assign new running to self.running
case 2a. nothing is preempted, we can swap in
- for each self.swapped
1a. check can_swap_in, if not, break
1b. check max_num_seqs; if it would be exceeded, break (SWAPPED groups go directly to RUNNING, not back to waiting)
1c. remove from swapped, add to running
1d. _swap_in, blocks_to_swap_in, _append_slot, blocks_to_copy
- Return SchedulerOutputs
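The condensed sketch of the case-1 admission loop referenced above. _ignore() is a hypothetical helper (mark all seqs FINISHED_IGNORED and record the group), and the config attribute names follow these notes rather than the exact source.
# Condensed sketch of case 1: admit WAITING groups while the budgets allow.
def _schedule_waiting(self):
    scheduled, ignored = [], []
    num_batched_tokens = 0
    # seqs already in flight, counting how many each group may run at decode time
    num_curr_seqs = sum(g.get_max_num_running_seqs() for g in self.running)
    while self.waiting:
        seq_group = self.waiting[0]
        num_prompt_tokens = seq_group.get_seqs()[0].get_len()
        if num_prompt_tokens > self.prompt_limit:
            self._ignore(seq_group, ignored)                     # 2a
            self.waiting.popleft()
            continue
        status = self.block_manager.can_allocate(seq_group)
        if status == AllocStatus.NEVER:
            self._ignore(seq_group, ignored)                     # 2b
            self.waiting.popleft()
            continue
        if status == AllocStatus.LATER:
            break                                                # 2c
        if num_batched_tokens + num_prompt_tokens > self.scheduler_config.max_num_batched_tokens:
            break                                                # 2d
        num_new_seqs = seq_group.get_max_num_running_seqs()
        if num_curr_seqs + num_new_seqs > self.scheduler_config.max_num_seqs:
            break                                                # 2e
        self.waiting.popleft()                                   # 2f
        self._allocate(seq_group)          # WAITING -> RUNNING, GPU blocks bound
        self.running.append(seq_group)
        scheduled.append(seq_group)
        num_batched_tokens += num_prompt_tokens
        num_curr_seqs += num_new_seqs
    return scheduled, ignored, num_batched_tokens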
member add_seq_group
[
- seq_group: SequenceGroup
]
// add to self.waiting
member abort_seq_group
[
- request_key: List[str]
]
// check whether the seq group with the given request_key is in any of the 3 queues
// 1. remove the seq group from that queue
// 2a. if a seq is not finished, change seq.status to FINISHED_ABORTED
// 2b. free the block table for seq.seq_id
member _preempt()
[
- seq_group: SequenceGroup
- blocks_to_swap_out: Dict[int,int], out param
- preemption_mode: PreemptionMode
]
// if the preemption mode is not specified, prefer RECOMPUTE over SWAP (recompute for multi-seq groups is not implemented)
member _preempt_by_recompute()
[
- seq_group: SequenceGroup
]
// get all RUNNING seq, and there should be only 1
// change to WAITING, block_manager.free(seq)
// push to front of self.waiting
member _preempt_by_swap()
[
- seq_group: SequenceGroup
- blocks_to_swap_out: Dict[int,int], out param
]
// self._swap_out(seq_group, blocks_to_swap_out)
// append to self.swapped
member _swap_out()
[
- seq_group: SequenceGroup
- blocks_to_swap_out: Dict[int,int], out param
]
// first check block_manager.can_swap_out, to check CPU mem
// mapping = block_manager.swap_out(seq_group), update mapping to blocks_to_swap_out
// update RUNNING to SWAPPED
member _swap_in()
[
- seq_group: SequenceGroup
- blocks_to_swap_in: Dict[int,int], out param
]
// mapping = block_manager.swap_in(seq_group), update mapping to blocks_to_swap_in
// update SWAPPED to RUNNING
member _allocate()
[
- seq_group: SequenceGroup
]
// block_manager.allocate(seq_group)
// change all WAITING to RUNNING
member _append_slot()
[
- seq_group: SequenceGroup
- blocks_to_copy: Dict[int,List[int]]
]
// for each RUNNING seq, block_manager.append_slot(seq)
// if has copy on write, add to blocks_to_copy[src]
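A sketch of _append_slot() collecting the copy-on-write pairs for the cache engine, matching the blocks_to_copy type above.
# Sketch of Scheduler._append_slot(): gather copy-on-write pairs per src block.
def _append_slot(self, seq_group, blocks_to_copy) -> None:
    for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
        ret = self.block_manager.append_slot(seq)
        if ret is not None:
            src_block, dst_block = ret
            # several dst blocks may need copies of the same src block
            blocks_to_copy.setdefault(src_block, []).append(dst_block)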
class SchedulerOutputs
[
- scheduled: List[SequenceGroup]
- ignored: List[SequenceGroup]
- prompt_run: bool, in prompt stage
- num_batched_tokens: int
- blocks_to_swap_in: Dict[int,int]
- blocks_to_swap_out: Dict[int,int]
- blocks_to_copy: Dict[int,List[int]]
]
enum PreemptionMode
[
- SWAP
- RECOMPUTE
]
block.py
class LogicalTokenBlock
// used for KV cache
// allocated per seq, to fill outputs
[
- block_number: int, id
- block_size: int, capacity
- token_ids: List[int], data
- num_tokens: int, size
]
class PhysicalTokenBlock
[
- device: Device
- block_number: int, id
- block_size: int, capacity
- ref_count: int
]
class BlockTable = List[PhysicalTokenBlock]
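A sketch of LogicalTokenBlock showing how a per-seq block fills with token ids; the -1 sentinel for unused slots is an assumption.
# Sketch of LogicalTokenBlock: a fixed-capacity slot range in the per-seq
# logical KV layout (-1 marks unused slots).
from typing import List

class LogicalTokenBlock:
    def __init__(self, block_number: int, block_size: int) -> None:
        self.block_number = block_number        # id
        self.block_size = block_size            # capacity
        self.token_ids: List[int] = [-1] * block_size
        self.num_tokens = 0                     # current size

    def is_full(self) -> bool:
        return self.num_tokens == self.block_size

    def get_num_empty_slots(self) -> int:
        return self.block_size - self.num_tokens

    def append_tokens(self, token_ids: List[int]) -> None:
        assert len(token_ids) <= self.get_num_empty_slots()
        start = self.num_tokens
        self.token_ids[start:start + len(token_ids)] = token_ids
        self.num_tokens += len(token_ids)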
worker
model_runner.py
class ModelRunner
member execute_model()
[
- seq_group_metadata_list: List[SequenceGroupMetadata]
]
- SamplerOutput
- input_tokens, input_metadata = self.prepare_input_tensors
- output = model(input_tokens, input_metadata)
- tokens = model.sample(input_tokens, output, input_metadata)
member prepare_input_tensors
[
- seq_group_metadata_list: List[SequenceGroupMetadata]
]
- Tensor
- InputMetadata
// 1. if is prompt, _prepare_prompt
// 2. if not, _prepare_decode_ifb
member _prepare_prompt
[
- seq_group_metadata_list: List[SequenceGroupMetadata]
]
- Tensor
- InputMetadata
- for each seq_group_metadata in the list
1a. assume for prompt run, seq num is only 1
1b. concat all seq_data.get_token_ids() from the list into input_tokens, save their lens in context_lens, concat all block_table of a seq into block_tables
- make input_tokens tensor, [L_all]
- make context_lens_tensor, [#sg]
- make cu_seqlens_tensor, [#sg+1]
- make block_tables, [#req, max_prompt_block_table_len]
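An illustrative sketch of the tensor assembly above (shapes as noted); the function name and the 0 padding value are assumptions.
# Illustrative sketch of the _prepare_prompt tensor assembly.
import torch

def build_prompt_inputs(seq_group_metadata_list, device="cuda"):
    input_tokens, context_lens, block_tables = [], [], []
    for meta in seq_group_metadata_list:
        # prompt run: each seq_group is assumed to contain exactly one seq
        seq_id = next(iter(meta.seq_data))
        token_ids = meta.seq_data[seq_id].get_token_ids()
        input_tokens.extend(token_ids)
        context_lens.append(len(token_ids))
        block_tables.append(meta.block_tables[seq_id] if meta.block_tables else [])

    input_tokens_t = torch.tensor(input_tokens, dtype=torch.long, device=device)  # [L_all]
    context_lens_t = torch.tensor(context_lens, dtype=torch.int, device=device)   # [#sg]
    cu_seqlens_t = torch.zeros(len(context_lens) + 1, dtype=torch.int, device=device)
    cu_seqlens_t[1:] = torch.cumsum(context_lens_t, dim=0)                         # [#sg+1]
    max_bt_len = max((len(bt) for bt in block_tables), default=0)
    block_tables_t = torch.tensor(
        [bt + [0] * (max_bt_len - len(bt)) for bt in block_tables],
        dtype=torch.int, device=device)                    # [#req, max_prompt_block_table_len]
    return input_tokens_t, context_lens_t, cu_seqlens_t, block_tables_t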
member _make_tensor_with_pad()
[
- x: List[List[int]]
- max_len: int
- pad: int
- dtype
- device
]
- Tensor
// convert a 2D list into a 2D tensor, padding each row to the same length max_len
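A sketch of _make_tensor_with_pad() matching the signature above.
# Sketch: right-pad every row to max_len, then build one rectangular tensor.
import torch

def make_tensor_with_pad(x, max_len, pad, dtype, device):
    padded = [row + [pad] * (max_len - len(row)) for row in x]
    return torch.tensor(padded, dtype=dtype, device=device)

# e.g. make_tensor_with_pad([[1, 2], [3]], max_len=4, pad=0,
#                           dtype=torch.int, device="cpu")
# -> tensor([[1, 2, 0, 0],
#            [3, 0, 0, 0]], dtype=torch.int32)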
class GPUModelRunnerBase
-
-
member _dummy_run()
- call ModelRunner.execute_model()
-
class CUDAGraphRunner
member capture()
- run model.forward directly without ModelRunner.execute_model()
worker.py
class Worker
member execute_model()
[
- seq_group_metadata_list: List[SequenceGroupMetadata]
]
- SamplerOutput
call model_runner.execute_model
-
-
-
-
executor
-
executor_base.py
class ExecutorBase
member determine_num_available_blocks
- call collective_rpc("determine_num_available_blocks")
-
engine
llm_engine.py
class LLMEngine
member step()
- call schedule
- call _run_workers
member _run_workers
[
- for self.workers
1a. get executor of the worker
1b. get an output
- return all_outputs
-
member init()
- call _initialize_kv_caches()
member _initialize_kv_caches()
- call model_executor.determine_num_available_blocks()
- call model_executor.initialize_cache()
csrc/qgemm
w4a8_per_chn
gemm_cuda.cu
-
define KERNEL_LAUNCH_CODE
- NUM_WARPS = (CTA_M / WARP_M) x (CTA_N / WARP_N) x (CTA_K / WARP_K)
- SCALES_SMEM_SIZE = CTA_N x (CTA_K / G) x STAGES x 2 // scales are in 2 bytes
- kSmemByteSize = (CTA_M x CTA_K + CTA_N x CTA_K / 2) x STAGES + SCALES_SMEM_SIZE // matrix b in int4
- check the maximum amount of shared memory per thread block (163 KB or 99 KB depending on the architecture)
- num_blocks_m: M/CTA_M, num_blocks_n: N/CTA_N
- shift tile blocks from m to n: m /= 8, n *= 8
- setup kernel_func = dense_kernel0<....>
- cudaFuncSetAttribute(kernel_func, cudaFuncAttributeMaxDynamicSharedMemorySize, kSmemByteSize)
- call kernel_func<<<num_blocks, threads_per_block, kSmemByteSize>>>(...)
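A worked example of the shared-memory arithmetic above; the concrete tile sizes, group size G, and stage count here are illustrative assumptions, not QServe's actual launch configuration.
# Worked example of the sizing formulas above (illustrative values only).
CTA_M, CTA_N, CTA_K = 128, 128, 128
WARP_M, WARP_N, WARP_K = 64, 32, 128
STAGES = 3
G = 128   # quantization group size

NUM_WARPS = (CTA_M // WARP_M) * (CTA_N // WARP_N) * (CTA_K // WARP_K)   # 2*4*1 = 8
SCALES_SMEM_SIZE = CTA_N * (CTA_K // G) * STAGES * 2        # scales are 2 bytes each
# A tile is int8 (1 byte per element); B tile is int4, so half a byte per element
kSmemByteSize = (CTA_M * CTA_K + CTA_N * CTA_K // 2) * STAGES + SCALES_SMEM_SIZE

print(NUM_WARPS)        # 8
print(SCALES_SMEM_SIZE) # 768 bytes
print(kSmemByteSize)    # 74496 bytes ~= 72.8 KB, under the 99 KB / 163 KB limit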