Mirrowel committed on
Commit
55a5dbb
·
1 Parent(s): 79c9227

refactor(providers): overhaul qwen code provider for custom streaming

Browse files

This commit refactors the QwenCodeProvider to implement custom streaming logic using httpx instead of relying on litellm.acompletion. The changes improve handling of reasoning content with <think> tags, direct chunk parsing to OpenAI-compatible format, and enhanced error management for rate limits and authentication.

Key improvements include:
- Direct HTTP streaming with httpx for better control over responses.
- Custom chunk conversion to handle usage data and reasoning content.
- Injection of dummy tools to prevent stream corruption, per Go example.
- Updated default base URL in QwenAuthBase for portal.qwen.ai.

src/rotator_library/providers/qwen_auth_base.py CHANGED
@@ -112,7 +112,7 @@ class QwenAuthBase:
112
 
113
  def get_api_details(self, credential_path: str) -> Tuple[str, str]:
114
  creds = self._credentials_cache[credential_path]
115
- base_url = creds.get("resource_url", "https://dashscope.aliyuncs.com/compatible-mode/v1")
116
  if not base_url.startswith("http"):
117
  base_url = f"https://{base_url}"
118
  return base_url, creds["access_token"]
 
112
 
113
def get_api_details(self, credential_path: str) -> Tuple[str, str]:
    """Resolve the API base URL and access token for a cached credential.

    Falls back to the Qwen portal endpoint when the credential record has
    no "resource_url", and prepends an https:// scheme when the stored
    value is a bare host.

    NOTE(review): qwen_code_provider awaits this method, but it is defined
    synchronously here — confirm which side is correct.
    """
    entry = self._credentials_cache[credential_path]
    endpoint = entry.get("resource_url", "https://portal.qwen.ai/v1")
    if not endpoint.startswith("http"):
        endpoint = f"https://{endpoint}"
    return endpoint, entry["access_token"]
src/rotator_library/providers/qwen_code_provider.py CHANGED
@@ -1,16 +1,17 @@
1
  # src/rotator_library/providers/qwen_code_provider.py
2
 
3
- import litellm.exceptions as litellm_exc
 
4
  import httpx
5
  import logging
6
- from typing import Union, AsyncGenerator, List
7
  from .provider_interface import ProviderInterface
8
  from .qwen_auth_base import QwenAuthBase
9
  import litellm
 
10
 
11
  lib_logger = logging.getLogger('rotator_library')
12
 
13
- # [NEW] Hardcoded model list based on Kilo example
14
  HARDCODED_MODELS = [
15
  "qwen3-coder-plus",
16
  "qwen3-coder-flash"
@@ -23,73 +24,132 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
23
  super().__init__()
24
 
25
  def has_custom_logic(self) -> bool:
26
- return True # We use custom logic to handle 401 retries and stream parsing
27
 
28
- # [NEW] get_models implementation
29
  async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
30
- """Returns a hardcoded list of known compatible Qwen models for the OpenAI-compatible API."""
31
  return [f"qwen_code/{model_id}" for model_id in HARDCODED_MODELS]
32
 
33
- async def _stream_parser(self, stream: AsyncGenerator, model_id: str) -> AsyncGenerator:
34
- """Parses the stream from litellm to handle Qwen's <think> tags."""
35
- async for chunk in stream:
36
- content = chunk.choices[0].delta.content
37
- if content and ("<think>" in content or "</think>" in content):
38
- parts = content.replace("<think>", "||THINK||").replace("</think>", "||/THINK||").split("||")
39
- for part in parts:
40
- # [NEW] Check for provider-specific errors in content
41
- if "slow_down" in part.lower():
42
- lib_logger.warning("Qwen 'slow_down' detected in response content. Treating as rate limit.")
43
- raise litellm_exc.RateLimitError(
44
- message="Qwen slow_down error detected.", llm_provider="qwen_code"
45
- )
46
- if not part: continue
47
- new_chunk = chunk.copy()
48
- if part.startswith("THINK||"):
49
- new_chunk.choices[0].delta.reasoning_content = part.replace("THINK||", "")
50
- new_chunk.choices[0].delta.content = None
51
- elif part.startswith("/THINK||"):
52
- continue # Ignore closing tag
53
- else:
54
- new_chunk.choices[0].delta.content = part
55
- new_chunk.choices[0].delta.reasoning_content = None
56
- yield new_chunk
57
- else:
58
- yield chunk
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
 
60
  async def acompletion(self, client: httpx.AsyncClient, **kwargs) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
61
  credential_path = kwargs.pop("credential_identifier")
62
  model = kwargs["model"]
63
-
64
  async def do_call():
65
- api_base, access_token = self.get_api_details(credential_path)
66
- response = await litellm.acompletion(
67
- # [NEW] Add timeout and retry params if needed, but since rotation handles retries, this is optional
68
- **kwargs, api_key=access_token, api_base=api_base
69
- )
70
- # [NEW] Post-call check for specific finish reasons or errors
71
- if not kwargs.get("stream") and response.choices[0].finish_reason == "slow_down":
72
- lib_logger.warning("Qwen 'slow_down' finish reason detected. Treating as rate limit.")
73
- raise litellm_exc.RateLimitError(
74
- message="Qwen slow_down finish reason.", llm_provider="qwen_code"
75
- )
76
- return response
77
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  try:
79
- response = await do_call()
80
- except litellm.AuthenticationError as e:
81
- if "401" in str(e):
82
  lib_logger.warning("Qwen Code returned 401. Forcing token refresh and retrying once.")
83
  await self._refresh_token(credential_path, force=True)
84
- response = await do_call()
85
- # [NEW] Catch provider-specific exceptions
86
- elif "slow_down" in str(e).lower():
87
- raise litellm_exc.RateLimitError(
88
- message="Qwen slow_down error in exception.", llm_provider="qwen_code"
 
89
  )
90
  else:
91
  raise e
92
 
93
  if kwargs.get("stream"):
94
- return self._stream_parser(response, model)
95
- return response
 
 
 
1
  # src/rotator_library/providers/qwen_code_provider.py
2
 
3
+ import json
4
+ import time
5
  import httpx
6
  import logging
7
+ from typing import Union, AsyncGenerator, List, Dict, Any
8
  from .provider_interface import ProviderInterface
9
  from .qwen_auth_base import QwenAuthBase
10
  import litellm
11
+ from litellm.exceptions import RateLimitError, AuthenticationError
12
 
13
  lib_logger = logging.getLogger('rotator_library')
14
 
 
15
  HARDCODED_MODELS = [
16
  "qwen3-coder-plus",
17
  "qwen3-coder-flash"
 
24
  super().__init__()
25
 
26
def has_custom_logic(self) -> bool:
    """Signal that this provider handles its own request/stream processing."""
    return True
28
 
 
29
async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
    """Return the fixed set of Qwen Code model ids, provider-prefixed."""
    prefixed = ["qwen_code/" + name for name in HARDCODED_MODELS]
    return prefixed
32
 
33
def _convert_chunk_to_openai(self, chunk: Dict[str, Any], model_id: str):
    """Convert one raw Qwen SSE chunk into OpenAI-style chunk dicts.

    Yields zero or more dicts shaped like OpenAI ``chat.completion.chunk``
    objects. A chunk carrying usage data yields a single choices-less
    usage chunk; a content chunk yields one chunk per plain-text or
    <think> segment, with <think>...</think> text routed to
    ``reasoning_content`` instead of ``content``.

    Bug fix: the previous parser split on "||" and then tested
    ``part.startswith("THINK||")``, which can never match after the
    split — reasoning text and literal "THINK"/"/THINK" markers leaked
    into ``content``. Tags are now scanned positionally with a state flag,
    and ``finish_reason`` is preserved on the last emitted segment instead
    of being dropped.

    NOTE(review): parsing is per-chunk; a <think> tag split across two SSE
    chunks is not recognized — confirm upstream never splits tags.
    """
    if not isinstance(chunk, dict):
        return

    def _envelope():
        # Common wrapper fields for every emitted chunk.
        return {
            "model": model_id,
            "object": "chat.completion.chunk",
            "id": f"chatcmpl-qwen-{time.time()}",
            "created": int(time.time()),
        }

    # Usage arrives on its own chunk (stream_options.include_usage);
    # surface it with an empty choices list, as OpenAI does.
    if usage_data := chunk.get("usage"):
        usage_chunk = _envelope()
        usage_chunk["choices"] = []
        usage_chunk["usage"] = {
            "prompt_tokens": usage_data.get("prompt_tokens", 0),
            "completion_tokens": usage_data.get("completion_tokens", 0),
            "total_tokens": usage_data.get("total_tokens", 0),
        }
        yield usage_chunk
        return

    choices = chunk.get("choices", [])
    if not choices:
        return

    choice = choices[0]
    delta = choice.get("delta", {})
    finish_reason = choice.get("finish_reason")

    content = delta.get("content")
    if content and ("<think>" in content or "</think>" in content):
        # Walk the text, toggling reasoning mode at each tag; collect
        # (is_reasoning, text) segments in order of appearance.
        segments = []
        reasoning = False
        remaining = content
        while remaining:
            candidates = [
                (pos, tag, becomes)
                for pos, tag, becomes in (
                    (remaining.find("<think>"), "<think>", True),
                    (remaining.find("</think>"), "</think>", False),
                )
                if pos != -1
            ]
            if not candidates:
                segments.append((reasoning, remaining))
                break
            pos, tag, becomes = min(candidates)
            if pos:
                segments.append((reasoning, remaining[:pos]))
            reasoning = becomes
            remaining = remaining[pos + len(tag):]

        for idx, (is_reasoning, text) in enumerate(segments):
            key = "reasoning_content" if is_reasoning else "content"
            out = _envelope()
            out["choices"] = [{
                "index": 0,
                "delta": {key: text},
                # Keep finish_reason on the final segment so it is not lost.
                "finish_reason": finish_reason if idx == len(segments) - 1 else None,
            }]
            yield out
    else:
        # Plain delta (content, role, tool calls): pass through unchanged.
        out = _envelope()
        out["choices"] = [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
        yield out
87
 
88
async def acompletion(self, client: httpx.AsyncClient, **kwargs) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
    """Execute a chat completion against the Qwen Code endpoint over raw SSE.

    Streams with httpx and converts each data chunk via
    _convert_chunk_to_openai. On HTTP 401 the OAuth token is force-refreshed
    and the call retried once; HTTP 429 (or a "slow_down" body) is raised as
    a litellm RateLimitError so the rotation layer can back off. Non-stream
    callers get the drained stream assembled into a full completion.
    """
    import inspect  # local import to avoid touching the module import block

    credential_path = kwargs.pop("credential_identifier")
    model = kwargs["model"]
    wants_stream = bool(kwargs.get("stream"))

    async def do_call():
        # The auth base in this repo declares get_api_details synchronously,
        # but the call site previously awaited it; accept either form.
        details = self.get_api_details(credential_path)
        if inspect.isawaitable(details):
            details = await details
        api_base, access_token = details

        payload = kwargs.copy()
        payload.pop("litellm_params", None)  # internal-only params

        # Per the Go reference client, a dummy tool must be present or the
        # upstream stream can be corrupted.
        if not payload.get("tools"):
            payload["tools"] = [{"type": "function", "function": {"name": "do_not_call_me", "description": "Do not call this tool under any circumstances.", "parameters": {"type": "object", "properties": {}}}}]

        # We always consume SSE (and aggregate locally for non-stream
        # callers), so force streaming with usage reporting upstream.
        payload["stream"] = True
        payload["stream_options"] = {"include_usage": True}

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
            "User-Agent": "google-api-nodejs-client/9.15.1",
            "X-Goog-Api-Client": "gl-node/22.17.0",
            "Client-Metadata": "ideType=IDE_UNSPECIFIED,platform=PLATFORM_UNSPECIFIED,pluginType=GEMINI",
        }

        url = f"{api_base.rstrip('/')}/chat/completions"
        lib_logger.debug(f"Qwen Code Request URL: {url}")
        lib_logger.debug(f"Qwen Code Request Payload: {json.dumps(payload, indent=2)}")

        # Open the response eagerly so HTTP errors raise *here*, inside the
        # caller's try/except. (The previous version returned a lazy async
        # generator, deferring raise_for_status to the first iteration and
        # making the 401/429 handling below unreachable.)
        request = client.build_request("POST", url, headers=headers, json=payload, timeout=600)
        response = await client.send(request, stream=True)
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError:
            await response.aread()  # make .text available to the handler
            await response.aclose()
            raise

        async def stream_handler():
            try:
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data_str = line[6:]
                    if data_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError:
                        lib_logger.warning(f"Could not decode JSON from Qwen Code: {line}")
                        continue
                    for openai_chunk in self._convert_chunk_to_openai(chunk, model):
                        yield litellm.ModelResponse(**openai_chunk)
            finally:
                await response.aclose()

        return stream_handler()

    try:
        response_gen = await do_call()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            lib_logger.warning("Qwen Code returned 401. Forcing token refresh and retrying once.")
            await self._refresh_token(credential_path, force=True)
            response_gen = await do_call()
        elif e.response.status_code == 429 or "slow_down" in e.response.text.lower():
            raise RateLimitError(
                message=f"Qwen Code rate limit exceeded: {e.response.text}",
                llm_provider="qwen_code",
                response=e.response,
            )
        else:
            raise e

    if wants_stream:
        return response_gen
    # Non-streaming caller: drain the SSE stream and assemble a complete
    # response. stream_chunk_builder is litellm's public aggregator; the
    # previously referenced litellm.utils.stream_to_completion_response
    # does not exist.
    chunks = [chunk async for chunk in response_gen]
    return litellm.stream_chunk_builder(chunks)