# src/rotator_library/providers/qwen_code_provider.py
import copy
import json
import logging
import os
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Union, AsyncGenerator, List, Dict, Any

import httpx
import litellm
from litellm.exceptions import RateLimitError, AuthenticationError

from .provider_interface import ProviderInterface
from .qwen_auth_base import QwenAuthBase
from ..model_definitions import ModelDefinitions
from ..timeout_config import TimeoutConfig
from ..utils.paths import get_logs_dir

lib_logger = logging.getLogger("rotator_library")


def _get_qwen_code_logs_dir() -> Path:
    """Get the Qwen Code logs directory."""
    logs_dir = get_logs_dir() / "qwen_code_logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    return logs_dir


class _QwenCodeFileLogger:
    """A simple file logger for a single Qwen Code transaction."""

    def __init__(self, model_name: str, enabled: bool = True):
        self.enabled = enabled
        if not self.enabled:
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        request_id = str(uuid.uuid4())
        # Sanitize the model name for use as a directory name
        safe_model_name = model_name.replace("/", "_").replace(":", "_")
        self.log_dir = (
            _get_qwen_code_logs_dir() / f"{timestamp}_{safe_model_name}_{request_id}"
        )
        try:
            self.log_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            lib_logger.error(f"Failed to create Qwen Code log directory: {e}")
            self.enabled = False

    def log_request(self, payload: Dict[str, Any]):
        """Logs the request payload sent to Qwen Code."""
        if not self.enabled:
            return
        try:
            with open(
                self.log_dir / "request_payload.json", "w", encoding="utf-8"
            ) as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)
        except Exception as e:
            lib_logger.error(f"_QwenCodeFileLogger: Failed to write request: {e}")

    def log_response_chunk(self, chunk: str):
        """Logs a raw chunk from the Qwen Code response stream."""
        if not self.enabled:
            return
        try:
            with open(
                self.log_dir / "response_stream.log", "a", encoding="utf-8"
            ) as f:
                f.write(chunk + "\n")
        except Exception as e:
            lib_logger.error(
                f"_QwenCodeFileLogger: Failed to write response chunk: {e}"
            )

    def log_error(self, error_message: str):
        """Logs an error message."""
        if not self.enabled:
            return
        try:
            with open(self.log_dir / "error.log", "a", encoding="utf-8") as f:
                f.write(f"[{datetime.utcnow().isoformat()}] {error_message}\n")
        except Exception as e:
            lib_logger.error(f"_QwenCodeFileLogger: Failed to write error: {e}")

    def log_final_response(self, response_data: Dict[str, Any]):
        """Logs the final, reassembled response."""
        if not self.enabled:
            return
        try:
            with open(
                self.log_dir / "final_response.json", "w", encoding="utf-8"
            ) as f:
                json.dump(response_data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            lib_logger.error(
                f"_QwenCodeFileLogger: Failed to write final response: {e}"
            )


HARDCODED_MODELS = ["qwen3-coder-plus", "qwen3-coder-flash"]

# OpenAI-compatible parameters supported by the Qwen Code API
SUPPORTED_PARAMS = {
    "model",
    "messages",
    "temperature",
    "top_p",
    "max_tokens",
    "stream",
    "tools",
    "tool_choice",
    "presence_penalty",
    "frequency_penalty",
    "n",
    "stop",
    "seed",
    "response_format",
}


class QwenCodeProvider(QwenAuthBase, ProviderInterface):
    skip_cost_calculation = True
    REASONING_START_MARKER = "THINK||"

    def __init__(self):
        super().__init__()
        self.model_definitions = ModelDefinitions()

    def has_custom_logic(self) -> bool:
        return True

    async def get_models(
        self, credential: str, client: httpx.AsyncClient
    ) -> List[str]:
        """
        Returns a merged list of Qwen Code models from three sources:
        1. Environment variable models (via QWEN_CODE_MODELS) - ALWAYS included, take priority
        2. Hardcoded models (fallback list) - added only if the ID is not in the env vars
        3. Dynamic discovery from the Qwen API (if supported) - added only if the ID is not in the env vars

        Environment variable models always win and are never deduplicated, even if
        they share the same ID (to support different configs like temperature, etc.).
        Validates OAuth credentials if applicable.
        """
        models = []
        # Track IDs from env vars to prevent hardcoded/dynamic duplicates
        env_var_ids = set()

        def extract_model_id(item) -> str:
            """Extract a model ID from various formats (dict, string with/without provider prefix)."""
            if isinstance(item, dict):
                # Dict format: extract the 'id' or 'name' field
                return item.get("id") or item.get("name", "")
            elif isinstance(item, str):
                # String format: extract the ID from "provider/id" or just "id"
                return item.split("/")[-1] if "/" in item else item
            return str(item)

        # Source 1: Load environment variable models (ALWAYS include ALL of them)
        static_models = self.model_definitions.get_all_provider_models("qwen_code")
        if static_models:
            for model in static_models:
                # Extract the model name from the "qwen_code/ModelName" format
                model_name = model.split("/")[-1] if "/" in model else model
                # Get the actual model ID from the definitions (which may differ from the name)
                model_id = self.model_definitions.get_model_id("qwen_code", model_name)
                # ALWAYS add env var models (no deduplication)
                models.append(model)
                # Track the ID to prevent hardcoded/dynamic duplicates
                if model_id:
                    env_var_ids.add(model_id)
            lib_logger.info(
                f"Loaded {len(static_models)} static models for qwen_code from environment variables"
            )

        # Source 2: Add hardcoded models (only if the ID is not already in the env vars)
        for model_id in HARDCODED_MODELS:
            if model_id not in env_var_ids:
                models.append(f"qwen_code/{model_id}")
                env_var_ids.add(model_id)

        # Source 3: Try dynamic discovery from the Qwen Code API (only if the ID is not already in the env vars)
        try:
            # Validate OAuth credentials and get API details
            if os.path.isfile(credential):
                await self.initialize_token(credential)
            api_base, access_token = await self.get_api_details(credential)
            models_url = f"{api_base.rstrip('/')}/v1/models"
            response = await client.get(
                models_url, headers={"Authorization": f"Bearer {access_token}"}
            )
            response.raise_for_status()
            dynamic_data = response.json()
            # Handle both {"data": [...]} and direct [...] formats
            model_list = (
                dynamic_data.get("data", dynamic_data)
                if isinstance(dynamic_data, dict)
                else dynamic_data
            )
            dynamic_count = 0
            for model in model_list:
                model_id = extract_model_id(model)
                if model_id and model_id not in env_var_ids:
                    models.append(f"qwen_code/{model_id}")
                    env_var_ids.add(model_id)
                    dynamic_count += 1
            if dynamic_count > 0:
                lib_logger.debug(
                    f"Discovered {dynamic_count} additional models for qwen_code from API"
                )
        except Exception as e:
            # Silently ignore dynamic discovery errors
            lib_logger.debug(f"Dynamic model discovery failed for qwen_code: {e}")

        return models

    def _clean_tool_schemas(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Removes unsupported properties from tool schemas to prevent API errors.
        Adapted for Qwen's API requirements.
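
        For illustration, a hedged sketch of the transformation this method performs
        (the tool below is hypothetical; field names follow the OpenAI tools schema):

            {"type": "function",
             "function": {"name": "get_weather", "strict": True,
                          "parameters": {"type": "object",
                                         "additionalProperties": False,
                                         "properties": {"city": {"type": "string"}}}}}

        becomes, after cleaning ("strict" and "additionalProperties" are dropped):

            {"type": "function",
             "function": {"name": "get_weather",
                          "parameters": {"type": "object",
                                         "properties": {"city": {"type": "string"}}}}}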
""" cleaned_tools = [] for tool in tools: cleaned_tool = copy.deepcopy(tool) if "function" in cleaned_tool: func = cleaned_tool["function"] # Remove strict mode (not supported by Qwen) func.pop("strict", None) # Clean parameter schema if present if "parameters" in func and isinstance(func["parameters"], dict): params = func["parameters"] # Remove additionalProperties if present params.pop("additionalProperties", None) # Recursively clean nested properties if "properties" in params: self._clean_schema_properties(params["properties"]) cleaned_tools.append(cleaned_tool) return cleaned_tools def _clean_schema_properties(self, properties: Dict[str, Any]) -> None: """Recursively cleans schema properties.""" for prop_name, prop_schema in properties.items(): if isinstance(prop_schema, dict): # Remove unsupported fields prop_schema.pop("strict", None) prop_schema.pop("additionalProperties", None) # Recurse into nested properties if "properties" in prop_schema: self._clean_schema_properties(prop_schema["properties"]) # Recurse into array items if "items" in prop_schema and isinstance(prop_schema["items"], dict): self._clean_schema_properties({"item": prop_schema["items"]}) def _build_request_payload(self, **kwargs) -> Dict[str, Any]: """ Builds a clean request payload with only supported parameters. This prevents 400 Bad Request errors from litellm-internal parameters. """ # Extract only supported OpenAI parameters payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS} # Always force streaming for internal processing payload["stream"] = True # Always include usage data in stream payload["stream_options"] = {"include_usage": True} # Handle tool schema cleaning if "tools" in payload and payload["tools"]: payload["tools"] = self._clean_tool_schemas(payload["tools"]) lib_logger.debug(f"Cleaned {len(payload['tools'])} tool schemas") elif not payload.get("tools"): # Per Qwen Code API bug (see: https://github.com/qianwen-team/flash-dance/issues/2), # injecting a dummy tool prevents stream corruption when no tools are provided payload["tools"] = [ { "type": "function", "function": { "name": "do_not_call_me", "description": "Do not call this tool.", "parameters": {"type": "object", "properties": {}}, }, } ] lib_logger.debug( "Injected dummy tool to prevent Qwen API stream corruption" ) return payload def _convert_chunk_to_openai(self, chunk: Dict[str, Any], model_id: str): """ Converts a raw Qwen SSE chunk to an OpenAI-compatible chunk. CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk) without early return to ensure finish_reason is properly processed. 
""" if not isinstance(chunk, dict): return # Get choices and usage data choices = chunk.get("choices", []) usage_data = chunk.get("usage") chunk_id = chunk.get("id", f"chatcmpl-qwen-{time.time()}") chunk_created = chunk.get("created", int(time.time())) # Handle chunks with BOTH choices and usage (typical for final chunk) # CRITICAL: Process choices FIRST to capture finish_reason, then yield usage if choices and usage_data: choice = choices[0] delta = choice.get("delta", {}) finish_reason = choice.get("finish_reason") # Yield the choice chunk first (contains finish_reason) yield { "choices": [ {"index": 0, "delta": delta, "finish_reason": finish_reason} ], "model": model_id, "object": "chat.completion.chunk", "id": chunk_id, "created": chunk_created, } # Then yield the usage chunk yield { "choices": [], "model": model_id, "object": "chat.completion.chunk", "id": chunk_id, "created": chunk_created, "usage": { "prompt_tokens": usage_data.get("prompt_tokens", 0), "completion_tokens": usage_data.get("completion_tokens", 0), "total_tokens": usage_data.get("total_tokens", 0), }, } return # Handle usage-only chunks if usage_data: yield { "choices": [], "model": model_id, "object": "chat.completion.chunk", "id": chunk_id, "created": chunk_created, "usage": { "prompt_tokens": usage_data.get("prompt_tokens", 0), "completion_tokens": usage_data.get("completion_tokens", 0), "total_tokens": usage_data.get("total_tokens", 0), }, } return # Handle content-only chunks if not choices: return choice = choices[0] delta = choice.get("delta", {}) finish_reason = choice.get("finish_reason") # Handle tags for reasoning content content = delta.get("content") if content and ("" in content or "" in content): parts = ( content.replace("", f"||{self.REASONING_START_MARKER}") .replace("", f"||/{self.REASONING_START_MARKER}") .split("||") ) for part in parts: if not part: continue new_delta = {} if part.startswith(self.REASONING_START_MARKER): new_delta["reasoning_content"] = part.replace( self.REASONING_START_MARKER, "" ) elif part.startswith(f"/{self.REASONING_START_MARKER}"): continue else: new_delta["content"] = part yield { "choices": [ {"index": 0, "delta": new_delta, "finish_reason": None} ], "model": model_id, "object": "chat.completion.chunk", "id": chunk_id, "created": chunk_created, } else: # Standard content chunk yield { "choices": [ {"index": 0, "delta": delta, "finish_reason": finish_reason} ], "model": model_id, "object": "chat.completion.chunk", "id": chunk_id, "created": chunk_created, } def _stream_to_completion_response( self, chunks: List[litellm.ModelResponse] ) -> litellm.ModelResponse: """ Manually reassembles streaming chunks into a complete response. 

        Key improvements:
        - Determines finish_reason based on accumulated state (tool_calls vs stop)
        - Properly initializes tool_calls with the type field
        - Handles usage data extraction from chunks
        """
        if not chunks:
            raise ValueError("No chunks provided for reassembly")

        # Initialize the final response structure
        final_message = {"role": "assistant"}
        aggregated_tool_calls = {}
        usage_data = None
        # Track finish_reason from chunks (but we'll override)
        chunk_finish_reason = None

        # Get the first chunk for basic response metadata
        first_chunk = chunks[0]

        # Process each chunk to aggregate content
        for chunk in chunks:
            if not hasattr(chunk, "choices") or not chunk.choices:
                continue
            choice = chunk.choices[0]
            delta = choice.get("delta", {})

            # Aggregate content
            if "content" in delta and delta["content"] is not None:
                if "content" not in final_message:
                    final_message["content"] = ""
                final_message["content"] += delta["content"]

            # Aggregate reasoning content
            if "reasoning_content" in delta and delta["reasoning_content"] is not None:
                if "reasoning_content" not in final_message:
                    final_message["reasoning_content"] = ""
                final_message["reasoning_content"] += delta["reasoning_content"]

            # Aggregate tool calls with proper initialization
            if "tool_calls" in delta and delta["tool_calls"]:
                for tc_chunk in delta["tool_calls"]:
                    index = tc_chunk.get("index", 0)
                    if index not in aggregated_tool_calls:
                        # Initialize with the type field for OpenAI compatibility
                        aggregated_tool_calls[index] = {
                            "type": "function",
                            "function": {"name": "", "arguments": ""},
                        }
                    if "id" in tc_chunk:
                        aggregated_tool_calls[index]["id"] = tc_chunk["id"]
                    if "type" in tc_chunk:
                        aggregated_tool_calls[index]["type"] = tc_chunk["type"]
                    if "function" in tc_chunk:
                        if (
                            "name" in tc_chunk["function"]
                            and tc_chunk["function"]["name"] is not None
                        ):
                            aggregated_tool_calls[index]["function"]["name"] += (
                                tc_chunk["function"]["name"]
                            )
                        if (
                            "arguments" in tc_chunk["function"]
                            and tc_chunk["function"]["arguments"] is not None
                        ):
                            aggregated_tool_calls[index]["function"]["arguments"] += (
                                tc_chunk["function"]["arguments"]
                            )

            # Aggregate function calls (legacy format)
            if "function_call" in delta and delta["function_call"] is not None:
                if "function_call" not in final_message:
                    final_message["function_call"] = {"name": "", "arguments": ""}
                if (
                    "name" in delta["function_call"]
                    and delta["function_call"]["name"] is not None
                ):
                    final_message["function_call"]["name"] += delta["function_call"][
                        "name"
                    ]
                if (
                    "arguments" in delta["function_call"]
                    and delta["function_call"]["arguments"] is not None
                ):
                    final_message["function_call"]["arguments"] += delta[
                        "function_call"
                    ]["arguments"]

            # Track finish_reason from chunks (for reference only)
            if choice.get("finish_reason"):
                chunk_finish_reason = choice["finish_reason"]

        # Handle usage data from the last chunk that has it
        for chunk in reversed(chunks):
            if hasattr(chunk, "usage") and chunk.usage:
                usage_data = chunk.usage
                break

        # Add tool calls to the final message, if any
        if aggregated_tool_calls:
            final_message["tool_calls"] = list(aggregated_tool_calls.values())

        # Ensure standard fields are present for consistent logging
        for field in ["content", "tool_calls", "function_call"]:
            if field not in final_message:
                final_message[field] = None

        # Determine finish_reason based on accumulated state.
        # Priority: tool_calls wins if present, then the chunk's finish_reason, then default to "stop".
        if aggregated_tool_calls:
            finish_reason = "tool_calls"
        elif chunk_finish_reason:
            finish_reason = chunk_finish_reason
        else:
            finish_reason = "stop"

        # Construct the final response
        final_choice = {
            "index": 0,
            "message": final_message,
            "finish_reason": finish_reason,
        }

        # Create the final ModelResponse
        final_response_data = {
            "id": first_chunk.id,
            "object": "chat.completion",
            "created": first_chunk.created,
            "model": first_chunk.model,
            "choices": [final_choice],
            "usage": usage_data,
        }
        return litellm.ModelResponse(**final_response_data)

    async def acompletion(
        self, client: httpx.AsyncClient, **kwargs
    ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
        credential_path = kwargs.pop("credential_identifier")
        enable_request_logging = kwargs.pop("enable_request_logging", False)
        model = kwargs["model"]

        # Create a dedicated file logger for this request
        file_logger = _QwenCodeFileLogger(
            model_name=model, enabled=enable_request_logging
        )

        async def make_request():
            """Prepares and makes the actual API call."""
            api_base, access_token = await self.get_api_details(credential_path)
            # Strip the provider prefix from the model name
            # (e.g., "qwen_code/qwen3-coder-plus" -> "qwen3-coder-plus")
            model_name = model.split("/")[-1]
            kwargs_with_stripped_model = {**kwargs, "model": model_name}
            # Build a clean payload with only supported parameters
            payload = self._build_request_payload(**kwargs_with_stripped_model)
            headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
                "Accept": "text/event-stream",
                "User-Agent": "google-api-nodejs-client/9.15.1",
                "X-Goog-Api-Client": "gl-node/22.17.0",
                "Client-Metadata": "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI",
            }
            url = f"{api_base.rstrip('/')}/v1/chat/completions"
            # Log the request to a dedicated file
            file_logger.log_request(payload)
            lib_logger.debug(f"Qwen Code Request URL: {url}")
            return client.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                timeout=TimeoutConfig.streaming(),
            )

        async def stream_handler(response_stream, attempt=1):
            """Handles the streaming response and converts chunks."""
            try:
                async with response_stream as response:
                    # Check for HTTP errors before processing the stream
                    if response.status_code >= 400:
                        error_text = await response.aread()
                        error_text = (
                            error_text.decode("utf-8")
                            if isinstance(error_text, bytes)
                            else error_text
                        )
                        # Handle 401: Force token refresh and retry once
                        if response.status_code == 401 and attempt == 1:
                            lib_logger.warning(
                                "Qwen Code returned 401. Forcing token refresh and retrying once."
                            )
                            await self._refresh_token(credential_path, force=True)
                            retry_stream = await make_request()
                            async for chunk in stream_handler(retry_stream, attempt=2):
                                yield chunk
                            return
                        # Handle 429: Rate limit
                        elif (
                            response.status_code == 429
                            or "slow_down" in error_text.lower()
                        ):
                            raise RateLimitError(
                                f"Qwen Code rate limit exceeded: {error_text}",
                                llm_provider="qwen_code",
                                model=model,
                                response=response,
                            )
                        # Handle other errors
                        else:
                            error_msg = f"Qwen Code HTTP {response.status_code} error: {error_text}"
                            file_logger.log_error(error_msg)
                            raise httpx.HTTPStatusError(
                                f"HTTP {response.status_code}: {error_text}",
                                request=response.request,
                                response=response,
                            )

                    # Process the successful streaming response
                    async for line in response.aiter_lines():
                        file_logger.log_response_chunk(line)
                        if line.startswith("data: "):
                            data_str = line[6:]
                            if data_str == "[DONE]":
                                break
                            try:
                                chunk = json.loads(data_str)
                                for openai_chunk in self._convert_chunk_to_openai(
                                    chunk, model
                                ):
                                    yield litellm.ModelResponse(**openai_chunk)
                            except json.JSONDecodeError:
                                lib_logger.warning(
                                    f"Could not decode JSON from Qwen Code: {line}"
                                )
            except httpx.HTTPStatusError:
                raise  # Re-raise HTTP errors we already handled
            except Exception as e:
                file_logger.log_error(f"Error during Qwen Code stream processing: {e}")
                lib_logger.error(
                    f"Error during Qwen Code stream processing: {e}", exc_info=True
                )
                raise

        async def logging_stream_wrapper():
            """Wraps the stream to log the final reassembled response."""
            openai_chunks = []
            try:
                async for chunk in stream_handler(await make_request()):
                    openai_chunks.append(chunk)
                    yield chunk
            finally:
                if openai_chunks:
                    final_response = self._stream_to_completion_response(openai_chunks)
                    file_logger.log_final_response(final_response.dict())

        if kwargs.get("stream"):
            return logging_stream_wrapper()
        else:

            async def non_stream_wrapper():
                chunks = [chunk async for chunk in logging_stream_wrapper()]
                return self._stream_to_completion_response(chunks)

            return await non_stream_wrapper()
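

# ---------------------------------------------------------------------------
# Usage sketch (hedged): a minimal end-to-end call against this provider.
# This block is illustrative only -- the credential path below is a placeholder,
# and the module must be launched as part of the package (for example with
# `python -m rotator_library.providers.qwen_code_provider`, assuming that is
# the installed package name) so the relative imports above resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        provider = QwenCodeProvider()
        async with httpx.AsyncClient() as demo_client:
            # Placeholder credential path -- point this at a real Qwen Code
            # OAuth credential file for your environment.
            response = await provider.acompletion(
                client=demo_client,
                credential_identifier="/path/to/qwen_oauth_creds.json",
                model="qwen_code/qwen3-coder-plus",
                messages=[{"role": "user", "content": "Say hello."}],
                stream=False,
            )
            print(response)

    asyncio.run(_demo())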