| |
| |
| |
| |
| @@ -454,14 +454,14 @@ class ModelConfig: |
| ).lower() |
| |
| # Detect which checkpoint is it |
| - for _, method in QUANTIZATION_METHODS.items(): |
| - quantization_override = method.override_quantization_method( |
| - quant_cfg, self.quantization |
| - ) |
| - if quantization_override: |
| - quant_method = quantization_override |
| - self.quantization = quantization_override |
| - break |
| + # for _, method in QUANTIZATION_METHODS.items(): |
| + # quantization_override = method.override_quantization_method( |
| + # quant_cfg, self.quantization |
| + # ) |
| + # if quantization_override: |
| + # quant_method = quantization_override |
| + # self.quantization = quantization_override |
| + # break |
| |
| # Verify quantization configurations. |
| if self.quantization is None: |
| |
| |
| |
| |
| @@ -264,6 +264,10 @@ async def validate_json_request(raw_request: Request): |
| |
| |
| @app.get("/health") |
| +async def health(request: Request) -> Response: |
| + return Response(status_code=200) |
| + |
| + |
| @app.get("/health_generate") |
| async def health_generate(request: Request) -> Response: |
| """ |
| |
| |
| |
| |
| @@ -190,6 +190,7 @@ class DeepEPBuffer: |
| f"Consider using --deepep-config to change the behavior." |
| ) |
| |
| + num_qps_per_rank = 20 |
| cls._buffer = Buffer( |
| group, |
| num_nvl_bytes, |
| |
| |
| |
| |
| @@ -351,10 +351,10 @@ class Fp8LinearMethod(LinearMethodBase): |
| return |
| else: |
| weight, weight_scale = layer.weight.data, layer.weight_scale_inv.data |
| - layer.weight = torch.nn.Parameter(weight, requires_grad=False) |
| - layer.weight_scale_inv = torch.nn.Parameter( |
| - weight_scale, requires_grad=False |
| - ) |
| + # layer.weight = torch.nn.Parameter(weight, requires_grad=False) |
| + # layer.weight_scale_inv = torch.nn.Parameter( |
| + # weight_scale, requires_grad=False |
| + # ) |
| return |
| |
| layer.weight = torch.nn.Parameter(layer.weight.data, requires_grad=False) |
| |
| |
| |
| |
| @@ -1359,7 +1359,7 @@ class Scheduler( |
| |
| if memory_leak: |
| msg = "token_to_kv_pool_allocator memory leak detected! " f"{token_msg}" |
| - raise ValueError(msg) |
| + # raise ValueError(msg) |
| |
| if self.disaggregation_mode == DisaggregationMode.DECODE: |
| req_total_size = ( |
| @@ -1374,7 +1374,7 @@ class Scheduler( |
| f"available_size={len(self.req_to_token_pool.free_slots)}, " |
| f"total_size={self.req_to_token_pool.size}\n" |
| ) |
| - raise ValueError(msg) |
| + # raise ValueError(msg) |
| |
| if ( |
| self.enable_metrics |
| @@ -1830,6 +1830,7 @@ class Scheduler( |
| deepep_mode=DeepEPMode(self.server_args.deepep_mode), |
| require_mlp_tp_gather=require_mlp_tp_gather(self.server_args), |
| disable_overlap_schedule=self.server_args.disable_overlap_schedule, |
| + offload_tags=self.offload_tags, |
| ) |
| |
| def handle_dp_balance_data(self, local_batch: ScheduleBatch): |
| @@ -1927,6 +1928,7 @@ class Scheduler( |
| deepep_mode: DeepEPMode, |
| require_mlp_tp_gather: bool, |
| disable_overlap_schedule: bool, |
| + offload_tags: set[str], |
| ): |
| # Check if other DP workers have running batches |
| if local_batch is None: |
| @@ -1957,7 +1959,7 @@ class Scheduler( |
| ) |
| |
| tbo_preparer = TboDPAttentionPreparer() |
| - if disable_overlap_schedule: |
| + if len(offload_tags) == 0 and disable_overlap_schedule: |
| group = tp_group.device_group |
| device = tp_group.device |
| else: |
| |
| |
| |
| |
| @@ -1044,10 +1044,15 @@ class TokenizerManager: |
| request: Optional[fastapi.Request] = None, |
| ) -> Tuple[bool, str]: |
| self.auto_create_handle_loop() |
| - assert ( |
| - self.server_args.dp_size == 1 |
| - ), "dp_size must be 1 for init parameter update group" |
| - result = (await self.init_weights_update_group_communicator(obj))[0] |
| + results = await self.init_weights_update_group_communicator(obj) |
| + if self.server_args.dp_size == 1: |
| + result = results[0] |
| + return result.success, result.message |
| + else: |
| + all_success = all([r.success for r in results]) |
| + all_message = [r.message for r in results] |
| + all_message = " | ".join(all_message) |
| + return all_success, all_message |
| return result.success, result.message |
| |
| async def update_weights_from_distributed( |
| @@ -1056,9 +1061,6 @@ class TokenizerManager: |
| request: Optional[fastapi.Request] = None, |
| ) -> Tuple[bool, str]: |
| self.auto_create_handle_loop() |
| - assert ( |
| - self.server_args.dp_size == 1 or self.server_args.enable_dp_attention |
| - ), "dp_size must be 1 or dp attention must be enabled for update weights from distributed" |
| |
| if obj.abort_all_requests: |
| self.abort_request(abort_all=True) |
| @@ -1066,8 +1068,15 @@ class TokenizerManager: |
| # This means that weight sync |
| # cannot run while requests are in progress. |
| async with self.model_update_lock.writer_lock: |
| - result = (await self.update_weights_from_distributed_communicator(obj))[0] |
| - return result.success, result.message |
| + results = await self.update_weights_from_distributed_communicator(obj) |
| + if self.server_args.dp_size == 1: |
| + result = results[0] |
| + return result.success, result.message |
| + else: |
| + all_success = all([r.success for r in results]) |
| + all_message = [r.message for r in results] |
| + all_message = " | ".join(all_message) |
| + return all_success, all_message |
| |
| async def update_weights_from_tensor( |
| self, |
| |
| |
| |
| |
| @@ -22,6 +22,7 @@ import os |
| import time |
| from dataclasses import dataclass |
| from typing import List, Optional, Tuple, Union |
| +from contextlib import nullcontext |
| |
| import torch |
| import torch.distributed as dist |
| @@ -675,7 +676,7 @@ class ModelRunner: |
| monkey_patch_vllm_parallel_state() |
| monkey_patch_isinstance_for_vllm_base_layer() |
| |
| - with self.memory_saver_adapter.region(GPU_MEMORY_TYPE_WEIGHTS): |
| + with self.memory_saver_adapter.region(GPU_MEMORY_TYPE_WEIGHTS) if not self.is_draft_worker else nullcontext(): |
| self.model = get_model( |
| model_config=self.model_config, |
| load_config=self.load_config, |
| |
| |
| |
| |
| @@ -1108,5 +1108,4 @@ class Glm4MoeForCausalLM(DeepseekV2ForCausalLM): |
| ) |
| weight_loader(param, loaded_weight) |
| |
| - |
| EntryClass = [Glm4MoeForCausalLM] |
|
|