Mirrowel
refactor(core): 🔨 centralize path management for PyInstaller compatibility
467f294
# src/rotator_library/providers/qwen_code_provider.py
import copy
import json
import time
import os
import httpx
import logging
from typing import Union, AsyncGenerator, List, Dict, Any
from .provider_interface import ProviderInterface
from .qwen_auth_base import QwenAuthBase
from ..model_definitions import ModelDefinitions
from ..timeout_config import TimeoutConfig
from ..utils.paths import get_logs_dir
import litellm
from litellm.exceptions import RateLimitError, AuthenticationError
from pathlib import Path
import uuid
from datetime import datetime
lib_logger = logging.getLogger("rotator_library")
def _get_qwen_code_logs_dir() -> Path:
"""Get the Qwen Code logs directory."""
logs_dir = get_logs_dir() / "qwen_code_logs"
logs_dir.mkdir(parents=True, exist_ok=True)
return logs_dir
class _QwenCodeFileLogger:
"""A simple file logger for a single Qwen Code transaction."""
def __init__(self, model_name: str, enabled: bool = True):
self.enabled = enabled
if not self.enabled:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
request_id = str(uuid.uuid4())
# Sanitize model name for directory
safe_model_name = model_name.replace("/", "_").replace(":", "_")
self.log_dir = (
_get_qwen_code_logs_dir() / f"{timestamp}_{safe_model_name}_{request_id}"
)
try:
self.log_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
lib_logger.error(f"Failed to create Qwen Code log directory: {e}")
self.enabled = False
def log_request(self, payload: Dict[str, Any]):
"""Logs the request payload sent to Qwen Code."""
if not self.enabled:
return
try:
with open(
self.log_dir / "request_payload.json", "w", encoding="utf-8"
) as f:
json.dump(payload, f, indent=2, ensure_ascii=False)
except Exception as e:
lib_logger.error(f"_QwenCodeFileLogger: Failed to write request: {e}")
def log_response_chunk(self, chunk: str):
"""Logs a raw chunk from the Qwen Code response stream."""
if not self.enabled:
return
try:
with open(self.log_dir / "response_stream.log", "a", encoding="utf-8") as f:
f.write(chunk + "\n")
except Exception as e:
lib_logger.error(
f"_QwenCodeFileLogger: Failed to write response chunk: {e}"
)
def log_error(self, error_message: str):
"""Logs an error message."""
if not self.enabled:
return
try:
with open(self.log_dir / "error.log", "a", encoding="utf-8") as f:
f.write(f"[{datetime.utcnow().isoformat()}] {error_message}\n")
except Exception as e:
lib_logger.error(f"_QwenCodeFileLogger: Failed to write error: {e}")
def log_final_response(self, response_data: Dict[str, Any]):
"""Logs the final, reassembled response."""
if not self.enabled:
return
try:
with open(self.log_dir / "final_response.json", "w", encoding="utf-8") as f:
json.dump(response_data, f, indent=2, ensure_ascii=False)
except Exception as e:
lib_logger.error(
f"_QwenCodeFileLogger: Failed to write final response: {e}"
)
HARDCODED_MODELS = ["qwen3-coder-plus", "qwen3-coder-flash"]
# OpenAI-compatible parameters supported by Qwen Code API
SUPPORTED_PARAMS = {
"model",
"messages",
"temperature",
"top_p",
"max_tokens",
"stream",
"tools",
"tool_choice",
"presence_penalty",
"frequency_penalty",
"n",
"stop",
"seed",
"response_format",
}
class QwenCodeProvider(QwenAuthBase, ProviderInterface):
skip_cost_calculation = True
REASONING_START_MARKER = "THINK||"
def __init__(self):
super().__init__()
self.model_definitions = ModelDefinitions()
def has_custom_logic(self) -> bool:
return True
async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
"""
Returns a merged list of Qwen Code models from three sources:
1. Environment variable models (via QWEN_CODE_MODELS) - ALWAYS included, take priority
2. Hardcoded models (fallback list) - added only if ID not in env vars
3. Dynamic discovery from Qwen API (if supported) - added only if ID not in env vars
Environment variable models always win and are never deduplicated, even if they
share the same ID (to support different configs like temperature, etc.)
Validates OAuth credentials if applicable.
"""
models = []
env_var_ids = (
set()
) # Track IDs from env vars to prevent hardcoded/dynamic duplicates
def extract_model_id(item) -> str:
"""Extract model ID from various formats (dict, string with/without provider prefix)."""
if isinstance(item, dict):
# Dict format: extract 'id' or 'name' field
return item.get("id") or item.get("name", "")
elif isinstance(item, str):
# String format: extract ID from "provider/id" or just "id"
return item.split("/")[-1] if "/" in item else item
return str(item)
# Source 1: Load environment variable models (ALWAYS include ALL of them)
static_models = self.model_definitions.get_all_provider_models("qwen_code")
if static_models:
for model in static_models:
# Extract model name from "qwen_code/ModelName" format
model_name = model.split("/")[-1] if "/" in model else model
# Get the actual model ID from definitions (which may differ from the name)
model_id = self.model_definitions.get_model_id("qwen_code", model_name)
# ALWAYS add env var models (no deduplication)
models.append(model)
# Track the ID to prevent hardcoded/dynamic duplicates
if model_id:
env_var_ids.add(model_id)
lib_logger.info(
f"Loaded {len(static_models)} static models for qwen_code from environment variables"
)
# Source 2: Add hardcoded models (only if ID not already in env vars)
for model_id in HARDCODED_MODELS:
if model_id not in env_var_ids:
models.append(f"qwen_code/{model_id}")
env_var_ids.add(model_id)
# Source 3: Try dynamic discovery from Qwen Code API (only if ID not already in env vars)
try:
# Validate OAuth credentials and get API details
if os.path.isfile(credential):
await self.initialize_token(credential)
api_base, access_token = await self.get_api_details(credential)
models_url = f"{api_base.rstrip('/')}/v1/models"
response = await client.get(
models_url, headers={"Authorization": f"Bearer {access_token}"}
)
response.raise_for_status()
dynamic_data = response.json()
# Handle both {data: [...]} and direct [...] formats
model_list = (
dynamic_data.get("data", dynamic_data)
if isinstance(dynamic_data, dict)
else dynamic_data
)
dynamic_count = 0
for model in model_list:
model_id = extract_model_id(model)
if model_id and model_id not in env_var_ids:
models.append(f"qwen_code/{model_id}")
env_var_ids.add(model_id)
dynamic_count += 1
if dynamic_count > 0:
lib_logger.debug(
f"Discovered {dynamic_count} additional models for qwen_code from API"
)
except Exception as e:
# Silently ignore dynamic discovery errors
lib_logger.debug(f"Dynamic model discovery failed for qwen_code: {e}")
pass
return models
def _clean_tool_schemas(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Removes unsupported properties from tool schemas to prevent API errors.
Adapted for Qwen's API requirements.
"""
cleaned_tools = []
for tool in tools:
cleaned_tool = copy.deepcopy(tool)
if "function" in cleaned_tool:
func = cleaned_tool["function"]
# Remove strict mode (not supported by Qwen)
func.pop("strict", None)
# Clean parameter schema if present
if "parameters" in func and isinstance(func["parameters"], dict):
params = func["parameters"]
# Remove additionalProperties if present
params.pop("additionalProperties", None)
# Recursively clean nested properties
if "properties" in params:
self._clean_schema_properties(params["properties"])
cleaned_tools.append(cleaned_tool)
return cleaned_tools
def _clean_schema_properties(self, properties: Dict[str, Any]) -> None:
"""Recursively cleans schema properties."""
for prop_name, prop_schema in properties.items():
if isinstance(prop_schema, dict):
# Remove unsupported fields
prop_schema.pop("strict", None)
prop_schema.pop("additionalProperties", None)
# Recurse into nested properties
if "properties" in prop_schema:
self._clean_schema_properties(prop_schema["properties"])
# Recurse into array items
if "items" in prop_schema and isinstance(prop_schema["items"], dict):
self._clean_schema_properties({"item": prop_schema["items"]})
def _build_request_payload(self, **kwargs) -> Dict[str, Any]:
"""
Builds a clean request payload with only supported parameters.
This prevents 400 Bad Request errors from litellm-internal parameters.
"""
# Extract only supported OpenAI parameters
payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS}
# Always force streaming for internal processing
payload["stream"] = True
# Always include usage data in stream
payload["stream_options"] = {"include_usage": True}
# Handle tool schema cleaning
if "tools" in payload and payload["tools"]:
payload["tools"] = self._clean_tool_schemas(payload["tools"])
lib_logger.debug(f"Cleaned {len(payload['tools'])} tool schemas")
elif not payload.get("tools"):
# Per Qwen Code API bug (see: https://github.com/qianwen-team/flash-dance/issues/2),
# injecting a dummy tool prevents stream corruption when no tools are provided
payload["tools"] = [
{
"type": "function",
"function": {
"name": "do_not_call_me",
"description": "Do not call this tool.",
"parameters": {"type": "object", "properties": {}},
},
}
]
lib_logger.debug(
"Injected dummy tool to prevent Qwen API stream corruption"
)
return payload
def _convert_chunk_to_openai(self, chunk: Dict[str, Any], model_id: str):
"""
Converts a raw Qwen SSE chunk to an OpenAI-compatible chunk.
CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk)
without early return to ensure finish_reason is properly processed.
"""
if not isinstance(chunk, dict):
return
# Get choices and usage data
choices = chunk.get("choices", [])
usage_data = chunk.get("usage")
chunk_id = chunk.get("id", f"chatcmpl-qwen-{time.time()}")
chunk_created = chunk.get("created", int(time.time()))
# Handle chunks with BOTH choices and usage (typical for final chunk)
# CRITICAL: Process choices FIRST to capture finish_reason, then yield usage
if choices and usage_data:
choice = choices[0]
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
# Yield the choice chunk first (contains finish_reason)
yield {
"choices": [
{"index": 0, "delta": delta, "finish_reason": finish_reason}
],
"model": model_id,
"object": "chat.completion.chunk",
"id": chunk_id,
"created": chunk_created,
}
# Then yield the usage chunk
yield {
"choices": [],
"model": model_id,
"object": "chat.completion.chunk",
"id": chunk_id,
"created": chunk_created,
"usage": {
"prompt_tokens": usage_data.get("prompt_tokens", 0),
"completion_tokens": usage_data.get("completion_tokens", 0),
"total_tokens": usage_data.get("total_tokens", 0),
},
}
return
# Handle usage-only chunks
if usage_data:
yield {
"choices": [],
"model": model_id,
"object": "chat.completion.chunk",
"id": chunk_id,
"created": chunk_created,
"usage": {
"prompt_tokens": usage_data.get("prompt_tokens", 0),
"completion_tokens": usage_data.get("completion_tokens", 0),
"total_tokens": usage_data.get("total_tokens", 0),
},
}
return
# Handle content-only chunks
if not choices:
return
choice = choices[0]
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
# Handle <think> tags for reasoning content
content = delta.get("content")
if content and ("<think>" in content or "</think>" in content):
parts = (
content.replace("<think>", f"||{self.REASONING_START_MARKER}")
.replace("</think>", f"||/{self.REASONING_START_MARKER}")
.split("||")
)
for part in parts:
if not part:
continue
new_delta = {}
if part.startswith(self.REASONING_START_MARKER):
new_delta["reasoning_content"] = part.replace(
self.REASONING_START_MARKER, ""
)
elif part.startswith(f"/{self.REASONING_START_MARKER}"):
continue
else:
new_delta["content"] = part
yield {
"choices": [
{"index": 0, "delta": new_delta, "finish_reason": None}
],
"model": model_id,
"object": "chat.completion.chunk",
"id": chunk_id,
"created": chunk_created,
}
else:
# Standard content chunk
yield {
"choices": [
{"index": 0, "delta": delta, "finish_reason": finish_reason}
],
"model": model_id,
"object": "chat.completion.chunk",
"id": chunk_id,
"created": chunk_created,
}
def _stream_to_completion_response(
self, chunks: List[litellm.ModelResponse]
) -> litellm.ModelResponse:
"""
Manually reassembles streaming chunks into a complete response.
Key improvements:
- Determines finish_reason based on accumulated state (tool_calls vs stop)
- Properly initializes tool_calls with type field
- Handles usage data extraction from chunks
"""
if not chunks:
raise ValueError("No chunks provided for reassembly")
# Initialize the final response structure
final_message = {"role": "assistant"}
aggregated_tool_calls = {}
usage_data = None
chunk_finish_reason = (
None # Track finish_reason from chunks (but we'll override)
)
# Get the first chunk for basic response metadata
first_chunk = chunks[0]
# Process each chunk to aggregate content
for chunk in chunks:
if not hasattr(chunk, "choices") or not chunk.choices:
continue
choice = chunk.choices[0]
delta = choice.get("delta", {})
# Aggregate content
if "content" in delta and delta["content"] is not None:
if "content" not in final_message:
final_message["content"] = ""
final_message["content"] += delta["content"]
# Aggregate reasoning content
if "reasoning_content" in delta and delta["reasoning_content"] is not None:
if "reasoning_content" not in final_message:
final_message["reasoning_content"] = ""
final_message["reasoning_content"] += delta["reasoning_content"]
# Aggregate tool calls with proper initialization
if "tool_calls" in delta and delta["tool_calls"]:
for tc_chunk in delta["tool_calls"]:
index = tc_chunk.get("index", 0)
if index not in aggregated_tool_calls:
# Initialize with type field for OpenAI compatibility
aggregated_tool_calls[index] = {
"type": "function",
"function": {"name": "", "arguments": ""},
}
if "id" in tc_chunk:
aggregated_tool_calls[index]["id"] = tc_chunk["id"]
if "type" in tc_chunk:
aggregated_tool_calls[index]["type"] = tc_chunk["type"]
if "function" in tc_chunk:
if (
"name" in tc_chunk["function"]
and tc_chunk["function"]["name"] is not None
):
aggregated_tool_calls[index]["function"]["name"] += (
tc_chunk["function"]["name"]
)
if (
"arguments" in tc_chunk["function"]
and tc_chunk["function"]["arguments"] is not None
):
aggregated_tool_calls[index]["function"]["arguments"] += (
tc_chunk["function"]["arguments"]
)
# Aggregate function calls (legacy format)
if "function_call" in delta and delta["function_call"] is not None:
if "function_call" not in final_message:
final_message["function_call"] = {"name": "", "arguments": ""}
if (
"name" in delta["function_call"]
and delta["function_call"]["name"] is not None
):
final_message["function_call"]["name"] += delta["function_call"][
"name"
]
if (
"arguments" in delta["function_call"]
and delta["function_call"]["arguments"] is not None
):
final_message["function_call"]["arguments"] += delta[
"function_call"
]["arguments"]
# Track finish_reason from chunks (for reference only)
if choice.get("finish_reason"):
chunk_finish_reason = choice["finish_reason"]
# Handle usage data from the last chunk that has it
for chunk in reversed(chunks):
if hasattr(chunk, "usage") and chunk.usage:
usage_data = chunk.usage
break
# Add tool calls to final message if any
if aggregated_tool_calls:
final_message["tool_calls"] = list(aggregated_tool_calls.values())
# Ensure standard fields are present for consistent logging
for field in ["content", "tool_calls", "function_call"]:
if field not in final_message:
final_message[field] = None
# Determine finish_reason based on accumulated state
# Priority: tool_calls wins if present, then chunk's finish_reason, then default to "stop"
if aggregated_tool_calls:
finish_reason = "tool_calls"
elif chunk_finish_reason:
finish_reason = chunk_finish_reason
else:
finish_reason = "stop"
# Construct the final response
final_choice = {
"index": 0,
"message": final_message,
"finish_reason": finish_reason,
}
# Create the final ModelResponse
final_response_data = {
"id": first_chunk.id,
"object": "chat.completion",
"created": first_chunk.created,
"model": first_chunk.model,
"choices": [final_choice],
"usage": usage_data,
}
return litellm.ModelResponse(**final_response_data)
async def acompletion(
self, client: httpx.AsyncClient, **kwargs
) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
credential_path = kwargs.pop("credential_identifier")
enable_request_logging = kwargs.pop("enable_request_logging", False)
model = kwargs["model"]
# Create dedicated file logger for this request
file_logger = _QwenCodeFileLogger(
model_name=model, enabled=enable_request_logging
)
async def make_request():
"""Prepares and makes the actual API call."""
api_base, access_token = await self.get_api_details(credential_path)
# Strip provider prefix from model name (e.g., "qwen_code/qwen3-coder-plus" -> "qwen3-coder-plus")
model_name = model.split("/")[-1]
kwargs_with_stripped_model = {**kwargs, "model": model_name}
# Build clean payload with only supported parameters
payload = self._build_request_payload(**kwargs_with_stripped_model)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "text/event-stream",
"User-Agent": "google-api-nodejs-client/9.15.1",
"X-Goog-Api-Client": "gl-node/22.17.0",
"Client-Metadata": "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI",
}
url = f"{api_base.rstrip('/')}/v1/chat/completions"
# Log request to dedicated file
file_logger.log_request(payload)
lib_logger.debug(f"Qwen Code Request URL: {url}")
return client.stream(
"POST",
url,
headers=headers,
json=payload,
timeout=TimeoutConfig.streaming(),
)
async def stream_handler(response_stream, attempt=1):
"""Handles the streaming response and converts chunks."""
try:
async with response_stream as response:
# Check for HTTP errors before processing stream
if response.status_code >= 400:
error_text = await response.aread()
error_text = (
error_text.decode("utf-8")
if isinstance(error_text, bytes)
else error_text
)
# Handle 401: Force token refresh and retry once
if response.status_code == 401 and attempt == 1:
lib_logger.warning(
"Qwen Code returned 401. Forcing token refresh and retrying once."
)
await self._refresh_token(credential_path, force=True)
retry_stream = await make_request()
async for chunk in stream_handler(retry_stream, attempt=2):
yield chunk
return
# Handle 429: Rate limit
elif (
response.status_code == 429
or "slow_down" in error_text.lower()
):
raise RateLimitError(
f"Qwen Code rate limit exceeded: {error_text}",
llm_provider="qwen_code",
model=model,
response=response,
)
# Handle other errors
else:
error_msg = f"Qwen Code HTTP {response.status_code} error: {error_text}"
file_logger.log_error(error_msg)
raise httpx.HTTPStatusError(
f"HTTP {response.status_code}: {error_text}",
request=response.request,
response=response,
)
# Process successful streaming response
async for line in response.aiter_lines():
file_logger.log_response_chunk(line)
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
break
try:
chunk = json.loads(data_str)
for openai_chunk in self._convert_chunk_to_openai(
chunk, model
):
yield litellm.ModelResponse(**openai_chunk)
except json.JSONDecodeError:
lib_logger.warning(
f"Could not decode JSON from Qwen Code: {line}"
)
except httpx.HTTPStatusError:
raise # Re-raise HTTP errors we already handled
except Exception as e:
file_logger.log_error(f"Error during Qwen Code stream processing: {e}")
lib_logger.error(
f"Error during Qwen Code stream processing: {e}", exc_info=True
)
raise
async def logging_stream_wrapper():
"""Wraps the stream to log the final reassembled response."""
openai_chunks = []
try:
async for chunk in stream_handler(await make_request()):
openai_chunks.append(chunk)
yield chunk
finally:
if openai_chunks:
final_response = self._stream_to_completion_response(openai_chunks)
file_logger.log_final_response(final_response.dict())
if kwargs.get("stream"):
return logging_stream_wrapper()
else:
async def non_stream_wrapper():
chunks = [chunk async for chunk in logging_stream_wrapper()]
return self._stream_to_completion_response(chunks)
return await non_stream_wrapper()