Mirrowel committed on
Commit
e2f4e9e
·
1 Parent(s): c5a61d9

refactor(timeout): 🔨 centralize HTTP timeout configuration across providers

Browse files

Introduces a new TimeoutConfig class that provides centralized timeout management for all HTTP requests across the codebase.

Key changes:
- Created `timeout_config.py` with configurable timeout values for streaming and non-streaming requests
- Replaced hardcoded `httpx.Timeout()` calls in antigravity_provider, gemini_cli_provider, iflow_provider, and qwen_code_provider
- All timeout values now support environment variable overrides (TIMEOUT_CONNECT, TIMEOUT_WRITE, TIMEOUT_POOL, TIMEOUT_READ_STREAMING, TIMEOUT_READ_NON_STREAMING)
- Improved code formatting and consistency across provider files

Default timeout values:
- Connect: 30s
- Write: 30s
- Pool: 60s
- Read (streaming): 180s (3 min between chunks)
- Read (non-streaming): 600s (10 min for full response)

This refactoring eliminates code duplication, improves maintainability, and provides flexibility for production deployments to adjust timeout values without code changes.

src/rotator_library/providers/antigravity_provider.py CHANGED
@@ -38,6 +38,7 @@ from .provider_interface import ProviderInterface, UsageResetConfigDef, QuotaGro
38
  from .antigravity_auth_base import AntigravityAuthBase
39
  from .provider_cache import ProviderCache
40
  from ..model_definitions import ModelDefinitions
 
41
 
42
 
43
  # =============================================================================
@@ -3704,8 +3705,10 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
3704
  ) -> litellm.ModelResponse:
3705
  """Handle non-streaming completion."""
3706
  response = await client.post(
3707
- url, headers=headers, json=payload,
3708
- timeout=httpx.Timeout(connect=30.0, read=120.0, write=120.0, pool=120.0)
 
 
3709
  )
3710
  response.raise_for_status()
3711
 
@@ -3739,8 +3742,11 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
3739
  }
3740
 
3741
  async with client.stream(
3742
- "POST", url, headers=headers, json=payload,
3743
- timeout=httpx.Timeout(connect=30.0, read=120.0, write=120.0, pool=120.0)
 
 
 
3744
  ) as response:
3745
  if response.status_code >= 400:
3746
  # Read error body for raise_for_status to include in exception
 
38
  from .antigravity_auth_base import AntigravityAuthBase
39
  from .provider_cache import ProviderCache
40
  from ..model_definitions import ModelDefinitions
41
+ from ..timeout_config import TimeoutConfig
42
 
43
 
44
  # =============================================================================
 
3705
  ) -> litellm.ModelResponse:
3706
  """Handle non-streaming completion."""
3707
  response = await client.post(
3708
+ url,
3709
+ headers=headers,
3710
+ json=payload,
3711
+ timeout=TimeoutConfig.non_streaming(),
3712
  )
3713
  response.raise_for_status()
3714
 
 
3742
  }
3743
 
3744
  async with client.stream(
3745
+ "POST",
3746
+ url,
3747
+ headers=headers,
3748
+ json=payload,
3749
+ timeout=TimeoutConfig.streaming(),
3750
  ) as response:
3751
  if response.status_code >= 400:
3752
  # Read error body for raise_for_status to include in exception
src/rotator_library/providers/gemini_cli_provider.py CHANGED
@@ -11,6 +11,7 @@ from .provider_interface import ProviderInterface
11
  from .gemini_auth_base import GeminiAuthBase
12
  from .provider_cache import ProviderCache
13
  from ..model_definitions import ModelDefinitions
 
14
  import litellm
15
  from litellm.exceptions import RateLimitError
16
  from ..error_handler import extract_retry_after_from_body
@@ -1965,7 +1966,7 @@ class GeminiCliProvider(GeminiAuthBase, ProviderInterface):
1965
  headers=final_headers,
1966
  json=request_payload,
1967
  params={"alt": "sse"},
1968
- timeout=httpx.Timeout(connect=30.0, read=120.0, write=120.0, pool=120.0),
1969
  ) as response:
1970
  # Read and log error body before raise_for_status for better debugging
1971
  if response.status_code >= 400:
 
11
  from .gemini_auth_base import GeminiAuthBase
12
  from .provider_cache import ProviderCache
13
  from ..model_definitions import ModelDefinitions
14
+ from ..timeout_config import TimeoutConfig
15
  import litellm
16
  from litellm.exceptions import RateLimitError
17
  from ..error_handler import extract_retry_after_from_body
 
1966
  headers=final_headers,
1967
  json=request_payload,
1968
  params={"alt": "sse"},
1969
+ timeout=TimeoutConfig.streaming(),
1970
  ) as response:
1971
  # Read and log error body before raise_for_status for better debugging
1972
  if response.status_code >= 400:
src/rotator_library/providers/iflow_provider.py CHANGED
@@ -10,19 +10,22 @@ from typing import Union, AsyncGenerator, List, Dict, Any
10
  from .provider_interface import ProviderInterface
11
  from .iflow_auth_base import IFlowAuthBase
12
  from ..model_definitions import ModelDefinitions
 
13
  import litellm
14
  from litellm.exceptions import RateLimitError, AuthenticationError
15
  from pathlib import Path
16
  import uuid
17
  from datetime import datetime
18
 
19
- lib_logger = logging.getLogger('rotator_library')
20
 
21
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent.parent / "logs"
22
  IFLOW_LOGS_DIR = LOGS_DIR / "iflow_logs"
23
 
 
24
  class _IFlowFileLogger:
25
  """A simple file logger for a single iFlow transaction."""
 
26
  def __init__(self, model_name: str, enabled: bool = True):
27
  self.enabled = enabled
28
  if not self.enabled:
@@ -31,7 +34,7 @@ class _IFlowFileLogger:
31
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
32
  request_id = str(uuid.uuid4())
33
  # Sanitize model name for directory
34
- safe_model_name = model_name.replace('/', '_').replace(':', '_')
35
  self.log_dir = IFLOW_LOGS_DIR / f"{timestamp}_{safe_model_name}_{request_id}"
36
  try:
37
  self.log_dir.mkdir(parents=True, exist_ok=True)
@@ -41,16 +44,20 @@ class _IFlowFileLogger:
41
 
42
  def log_request(self, payload: Dict[str, Any]):
43
  """Logs the request payload sent to iFlow."""
44
- if not self.enabled: return
 
45
  try:
46
- with open(self.log_dir / "request_payload.json", "w", encoding="utf-8") as f:
 
 
47
  json.dump(payload, f, indent=2, ensure_ascii=False)
48
  except Exception as e:
49
  lib_logger.error(f"_IFlowFileLogger: Failed to write request: {e}")
50
 
51
  def log_response_chunk(self, chunk: str):
52
  """Logs a raw chunk from the iFlow response stream."""
53
- if not self.enabled: return
 
54
  try:
55
  with open(self.log_dir / "response_stream.log", "a", encoding="utf-8") as f:
56
  f.write(chunk + "\n")
@@ -59,7 +66,8 @@ class _IFlowFileLogger:
59
 
60
  def log_error(self, error_message: str):
61
  """Logs an error message."""
62
- if not self.enabled: return
 
63
  try:
64
  with open(self.log_dir / "error.log", "a", encoding="utf-8") as f:
65
  f.write(f"[{datetime.utcnow().isoformat()}] {error_message}\n")
@@ -68,13 +76,15 @@ class _IFlowFileLogger:
68
 
69
  def log_final_response(self, response_data: Dict[str, Any]):
70
  """Logs the final, reassembled response."""
71
- if not self.enabled: return
 
72
  try:
73
  with open(self.log_dir / "final_response.json", "w", encoding="utf-8") as f:
74
  json.dump(response_data, f, indent=2, ensure_ascii=False)
75
  except Exception as e:
76
  lib_logger.error(f"_IFlowFileLogger: Failed to write final response: {e}")
77
 
 
78
  # Model list can be expanded as iFlow supports more models
79
  HARDCODED_MODELS = [
80
  "glm-4.6",
@@ -90,14 +100,25 @@ HARDCODED_MODELS = [
90
  "deepseek-v3",
91
  "qwen3-vl-plus",
92
  "qwen3-235b-a22b-instruct",
93
- "qwen3-235b"
94
  ]
95
 
96
  # OpenAI-compatible parameters supported by iFlow API
97
  SUPPORTED_PARAMS = {
98
- 'model', 'messages', 'temperature', 'top_p', 'max_tokens',
99
- 'stream', 'tools', 'tool_choice', 'presence_penalty',
100
- 'frequency_penalty', 'n', 'stop', 'seed', 'response_format'
 
 
 
 
 
 
 
 
 
 
 
101
  }
102
 
103
 
@@ -106,6 +127,7 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
106
  iFlow provider using OAuth authentication with local callback server.
107
  API requests use the derived API key (NOT OAuth access_token).
108
  """
 
109
  skip_cost_calculation = True
110
 
111
  def __init__(self):
@@ -128,7 +150,9 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
128
  Validates OAuth credentials if applicable.
129
  """
130
  models = []
131
- env_var_ids = set() # Track IDs from env vars to prevent hardcoded/dynamic duplicates
 
 
132
 
133
  def extract_model_id(item) -> str:
134
  """Extract model ID from various formats (dict, string with/without provider prefix)."""
@@ -154,7 +178,9 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
154
  # Track the ID to prevent hardcoded/dynamic duplicates
155
  if model_id:
156
  env_var_ids.add(model_id)
157
- lib_logger.info(f"Loaded {len(static_models)} static models for iflow from environment variables")
 
 
158
 
159
  # Source 2: Add hardcoded models (only if ID not already in env vars)
160
  for model_id in HARDCODED_MODELS:
@@ -172,14 +198,17 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
172
  models_url = f"{api_base.rstrip('/')}/models"
173
 
174
  response = await client.get(
175
- models_url,
176
- headers={"Authorization": f"Bearer {api_key}"}
177
  )
178
  response.raise_for_status()
179
 
180
  dynamic_data = response.json()
181
  # Handle both {data: [...]} and direct [...] formats
182
- model_list = dynamic_data.get("data", dynamic_data) if isinstance(dynamic_data, dict) else dynamic_data
 
 
 
 
183
 
184
  dynamic_count = 0
185
  for model in model_list:
@@ -190,7 +219,9 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
190
  dynamic_count += 1
191
 
192
  if dynamic_count > 0:
193
- lib_logger.debug(f"Discovered {dynamic_count} additional models for iflow from API")
 
 
194
 
195
  except Exception as e:
196
  # Silently ignore dynamic discovery errors
@@ -255,7 +286,7 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
255
  payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS}
256
 
257
  # Always force streaming for internal processing
258
- payload['stream'] = True
259
 
260
  # NOTE: iFlow API does not support stream_options parameter
261
  # Unlike other providers, we don't include it to avoid HTTP 406 errors
@@ -264,16 +295,22 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
264
  if "tools" in payload and payload["tools"]:
265
  payload["tools"] = self._clean_tool_schemas(payload["tools"])
266
  lib_logger.debug(f"Cleaned {len(payload['tools'])} tool schemas")
267
- elif "tools" in payload and isinstance(payload["tools"], list) and len(payload["tools"]) == 0:
 
 
 
 
268
  # Inject dummy tool for empty arrays to prevent streaming issues (similar to Qwen's behavior)
269
- payload["tools"] = [{
270
- "type": "function",
271
- "function": {
272
- "name": "noop",
273
- "description": "Placeholder tool to stabilise streaming",
274
- "parameters": {"type": "object"}
 
 
275
  }
276
- }]
277
  lib_logger.debug("Injected placeholder tool for empty tools array")
278
 
279
  return payload
@@ -282,7 +319,7 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
282
  """
283
  Converts a raw iFlow SSE chunk to an OpenAI-compatible chunk.
284
  Since iFlow is OpenAI-compatible, minimal conversion is needed.
285
-
286
  CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk)
287
  without early return to ensure finish_reason is properly processed.
288
  """
@@ -302,32 +339,36 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
302
  "model": model_id,
303
  "object": "chat.completion.chunk",
304
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
305
- "created": chunk.get("created", int(time.time()))
306
  }
307
  # Then yield the usage chunk
308
  yield {
309
- "choices": [], "model": model_id, "object": "chat.completion.chunk",
 
 
310
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
311
  "created": chunk.get("created", int(time.time())),
312
  "usage": {
313
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
314
  "completion_tokens": usage_data.get("completion_tokens", 0),
315
  "total_tokens": usage_data.get("total_tokens", 0),
316
- }
317
  }
318
  return
319
 
320
  # Handle usage-only chunks
321
  if usage_data:
322
  yield {
323
- "choices": [], "model": model_id, "object": "chat.completion.chunk",
 
 
324
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
325
  "created": chunk.get("created", int(time.time())),
326
  "usage": {
327
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
328
  "completion_tokens": usage_data.get("completion_tokens", 0),
329
  "total_tokens": usage_data.get("total_tokens", 0),
330
- }
331
  }
332
  return
333
 
@@ -339,13 +380,15 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
339
  "model": model_id,
340
  "object": "chat.completion.chunk",
341
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
342
- "created": chunk.get("created", int(time.time()))
343
  }
344
 
345
- def _stream_to_completion_response(self, chunks: List[litellm.ModelResponse]) -> litellm.ModelResponse:
 
 
346
  """
347
  Manually reassembles streaming chunks into a complete response.
348
-
349
  Key improvements:
350
  - Determines finish_reason based on accumulated state (tool_calls vs stop)
351
  - Properly initializes tool_calls with type field
@@ -358,14 +401,16 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
358
  final_message = {"role": "assistant"}
359
  aggregated_tool_calls = {}
360
  usage_data = None
361
- chunk_finish_reason = None # Track finish_reason from chunks (but we'll override)
 
 
362
 
363
  # Get the first chunk for basic response metadata
364
  first_chunk = chunks[0]
365
 
366
  # Process each chunk to aggregate content
367
  for chunk in chunks:
368
- if not hasattr(chunk, 'choices') or not chunk.choices:
369
  continue
370
 
371
  choice = chunk.choices[0]
@@ -389,25 +434,48 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
389
  index = tc_chunk.get("index", 0)
390
  if index not in aggregated_tool_calls:
391
  # Initialize with type field for OpenAI compatibility
392
- aggregated_tool_calls[index] = {"type": "function", "function": {"name": "", "arguments": ""}}
 
 
 
393
  if "id" in tc_chunk:
394
  aggregated_tool_calls[index]["id"] = tc_chunk["id"]
395
  if "type" in tc_chunk:
396
  aggregated_tool_calls[index]["type"] = tc_chunk["type"]
397
  if "function" in tc_chunk:
398
- if "name" in tc_chunk["function"] and tc_chunk["function"]["name"] is not None:
399
- aggregated_tool_calls[index]["function"]["name"] += tc_chunk["function"]["name"]
400
- if "arguments" in tc_chunk["function"] and tc_chunk["function"]["arguments"] is not None:
401
- aggregated_tool_calls[index]["function"]["arguments"] += tc_chunk["function"]["arguments"]
 
 
 
 
 
 
 
 
 
 
402
 
403
  # Aggregate function calls (legacy format)
404
  if "function_call" in delta and delta["function_call"] is not None:
405
  if "function_call" not in final_message:
406
  final_message["function_call"] = {"name": "", "arguments": ""}
407
- if "name" in delta["function_call"] and delta["function_call"]["name"] is not None:
408
- final_message["function_call"]["name"] += delta["function_call"]["name"]
409
- if "arguments" in delta["function_call"] and delta["function_call"]["arguments"] is not None:
410
- final_message["function_call"]["arguments"] += delta["function_call"]["arguments"]
 
 
 
 
 
 
 
 
 
 
411
 
412
  # Track finish_reason from chunks (for reference only)
413
  if choice.get("finish_reason"):
@@ -415,7 +483,7 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
415
 
416
  # Handle usage data from the last chunk that has it
417
  for chunk in reversed(chunks):
418
- if hasattr(chunk, 'usage') and chunk.usage:
419
  usage_data = chunk.usage
420
  break
421
 
@@ -441,7 +509,7 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
441
  final_choice = {
442
  "index": 0,
443
  "message": final_message,
444
- "finish_reason": finish_reason
445
  }
446
 
447
  # Create the final ModelResponse
@@ -451,21 +519,20 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
451
  "created": first_chunk.created,
452
  "model": first_chunk.model,
453
  "choices": [final_choice],
454
- "usage": usage_data
455
  }
456
 
457
  return litellm.ModelResponse(**final_response_data)
458
 
459
- async def acompletion(self, client: httpx.AsyncClient, **kwargs) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
 
 
460
  credential_path = kwargs.pop("credential_identifier")
461
  enable_request_logging = kwargs.pop("enable_request_logging", False)
462
  model = kwargs["model"]
463
 
464
  # Create dedicated file logger for this request
465
- file_logger = _IFlowFileLogger(
466
- model_name=model,
467
- enabled=enable_request_logging
468
- )
469
 
470
  async def make_request():
471
  """Prepares and makes the actual API call."""
@@ -473,8 +540,8 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
473
  api_base, api_key = await self.get_api_details(credential_path)
474
 
475
  # Strip provider prefix from model name (e.g., "iflow/Qwen3-Coder-Plus" -> "Qwen3-Coder-Plus")
476
- model_name = model.split('/')[-1]
477
- kwargs_with_stripped_model = {**kwargs, 'model': model_name}
478
 
479
  # Build clean payload with only supported parameters
480
  payload = self._build_request_payload(**kwargs_with_stripped_model)
@@ -483,7 +550,7 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
483
  "Authorization": f"Bearer {api_key}", # Uses api_key from user info
484
  "Content-Type": "application/json",
485
  "Accept": "text/event-stream",
486
- "User-Agent": "iFlow-Cli"
487
  }
488
 
489
  url = f"{api_base.rstrip('/')}/chat/completions"
@@ -493,8 +560,11 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
493
  lib_logger.debug(f"iFlow Request URL: {url}")
494
 
495
  return client.stream(
496
- "POST", url, headers=headers, json=payload,
497
- timeout=httpx.Timeout(connect=30.0, read=120.0, write=120.0, pool=120.0)
 
 
 
498
  )
499
 
500
  async def stream_handler(response_stream, attempt=1):
@@ -504,11 +574,17 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
504
  # Check for HTTP errors before processing stream
505
  if response.status_code >= 400:
506
  error_text = await response.aread()
507
- error_text = error_text.decode('utf-8') if isinstance(error_text, bytes) else error_text
 
 
 
 
508
 
509
  # Handle 401: Force token refresh and retry once
510
  if response.status_code == 401 and attempt == 1:
511
- lib_logger.warning("iFlow returned 401. Forcing token refresh and retrying once.")
 
 
512
  await self._refresh_token(credential_path, force=True)
513
  retry_stream = await make_request()
514
  async for chunk in stream_handler(retry_stream, attempt=2):
@@ -516,50 +592,61 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
516
  return
517
 
518
  # Handle 429: Rate limit
519
- elif response.status_code == 429 or "slow_down" in error_text.lower():
 
 
 
520
  raise RateLimitError(
521
  f"iFlow rate limit exceeded: {error_text}",
522
  llm_provider="iflow",
523
  model=model,
524
- response=response
525
  )
526
 
527
  # Handle other errors
528
  else:
529
- error_msg = f"iFlow HTTP {response.status_code} error: {error_text}"
 
 
530
  file_logger.log_error(error_msg)
531
  raise httpx.HTTPStatusError(
532
  f"HTTP {response.status_code}: {error_text}",
533
  request=response.request,
534
- response=response
535
  )
536
 
537
  # Process successful streaming response
538
  async for line in response.aiter_lines():
539
  file_logger.log_response_chunk(line)
540
-
541
  # CRITICAL FIX: Handle both "data:" (no space) and "data: " (with space)
542
- if line.startswith('data:'):
543
  # Extract data after "data:" prefix, handling both formats
544
- if line.startswith('data: '):
545
  data_str = line[6:] # Skip "data: "
546
  else:
547
  data_str = line[5:] # Skip "data:"
548
-
549
  if data_str.strip() == "[DONE]":
550
  break
551
  try:
552
  chunk = json.loads(data_str)
553
- for openai_chunk in self._convert_chunk_to_openai(chunk, model):
 
 
554
  yield litellm.ModelResponse(**openai_chunk)
555
  except json.JSONDecodeError:
556
- lib_logger.warning(f"Could not decode JSON from iFlow: {line}")
 
 
557
 
558
  except httpx.HTTPStatusError:
559
  raise # Re-raise HTTP errors we already handled
560
  except Exception as e:
561
  file_logger.log_error(f"Error during iFlow stream processing: {e}")
562
- lib_logger.error(f"Error during iFlow stream processing: {e}", exc_info=True)
 
 
563
  raise
564
 
565
  async def logging_stream_wrapper():
@@ -577,7 +664,9 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
577
  if kwargs.get("stream"):
578
  return logging_stream_wrapper()
579
  else:
 
580
  async def non_stream_wrapper():
581
  chunks = [chunk async for chunk in logging_stream_wrapper()]
582
  return self._stream_to_completion_response(chunks)
 
583
  return await non_stream_wrapper()
 
10
  from .provider_interface import ProviderInterface
11
  from .iflow_auth_base import IFlowAuthBase
12
  from ..model_definitions import ModelDefinitions
13
+ from ..timeout_config import TimeoutConfig
14
  import litellm
15
  from litellm.exceptions import RateLimitError, AuthenticationError
16
  from pathlib import Path
17
  import uuid
18
  from datetime import datetime
19
 
20
+ lib_logger = logging.getLogger("rotator_library")
21
 
22
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent.parent / "logs"
23
  IFLOW_LOGS_DIR = LOGS_DIR / "iflow_logs"
24
 
25
+
26
  class _IFlowFileLogger:
27
  """A simple file logger for a single iFlow transaction."""
28
+
29
  def __init__(self, model_name: str, enabled: bool = True):
30
  self.enabled = enabled
31
  if not self.enabled:
 
34
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
35
  request_id = str(uuid.uuid4())
36
  # Sanitize model name for directory
37
+ safe_model_name = model_name.replace("/", "_").replace(":", "_")
38
  self.log_dir = IFLOW_LOGS_DIR / f"{timestamp}_{safe_model_name}_{request_id}"
39
  try:
40
  self.log_dir.mkdir(parents=True, exist_ok=True)
 
44
 
45
  def log_request(self, payload: Dict[str, Any]):
46
  """Logs the request payload sent to iFlow."""
47
+ if not self.enabled:
48
+ return
49
  try:
50
+ with open(
51
+ self.log_dir / "request_payload.json", "w", encoding="utf-8"
52
+ ) as f:
53
  json.dump(payload, f, indent=2, ensure_ascii=False)
54
  except Exception as e:
55
  lib_logger.error(f"_IFlowFileLogger: Failed to write request: {e}")
56
 
57
  def log_response_chunk(self, chunk: str):
58
  """Logs a raw chunk from the iFlow response stream."""
59
+ if not self.enabled:
60
+ return
61
  try:
62
  with open(self.log_dir / "response_stream.log", "a", encoding="utf-8") as f:
63
  f.write(chunk + "\n")
 
66
 
67
  def log_error(self, error_message: str):
68
  """Logs an error message."""
69
+ if not self.enabled:
70
+ return
71
  try:
72
  with open(self.log_dir / "error.log", "a", encoding="utf-8") as f:
73
  f.write(f"[{datetime.utcnow().isoformat()}] {error_message}\n")
 
76
 
77
  def log_final_response(self, response_data: Dict[str, Any]):
78
  """Logs the final, reassembled response."""
79
+ if not self.enabled:
80
+ return
81
  try:
82
  with open(self.log_dir / "final_response.json", "w", encoding="utf-8") as f:
83
  json.dump(response_data, f, indent=2, ensure_ascii=False)
84
  except Exception as e:
85
  lib_logger.error(f"_IFlowFileLogger: Failed to write final response: {e}")
86
 
87
+
88
  # Model list can be expanded as iFlow supports more models
89
  HARDCODED_MODELS = [
90
  "glm-4.6",
 
100
  "deepseek-v3",
101
  "qwen3-vl-plus",
102
  "qwen3-235b-a22b-instruct",
103
+ "qwen3-235b",
104
  ]
105
 
106
  # OpenAI-compatible parameters supported by iFlow API
107
  SUPPORTED_PARAMS = {
108
+ "model",
109
+ "messages",
110
+ "temperature",
111
+ "top_p",
112
+ "max_tokens",
113
+ "stream",
114
+ "tools",
115
+ "tool_choice",
116
+ "presence_penalty",
117
+ "frequency_penalty",
118
+ "n",
119
+ "stop",
120
+ "seed",
121
+ "response_format",
122
  }
123
 
124
 
 
127
  iFlow provider using OAuth authentication with local callback server.
128
  API requests use the derived API key (NOT OAuth access_token).
129
  """
130
+
131
  skip_cost_calculation = True
132
 
133
  def __init__(self):
 
150
  Validates OAuth credentials if applicable.
151
  """
152
  models = []
153
+ env_var_ids = (
154
+ set()
155
+ ) # Track IDs from env vars to prevent hardcoded/dynamic duplicates
156
 
157
  def extract_model_id(item) -> str:
158
  """Extract model ID from various formats (dict, string with/without provider prefix)."""
 
178
  # Track the ID to prevent hardcoded/dynamic duplicates
179
  if model_id:
180
  env_var_ids.add(model_id)
181
+ lib_logger.info(
182
+ f"Loaded {len(static_models)} static models for iflow from environment variables"
183
+ )
184
 
185
  # Source 2: Add hardcoded models (only if ID not already in env vars)
186
  for model_id in HARDCODED_MODELS:
 
198
  models_url = f"{api_base.rstrip('/')}/models"
199
 
200
  response = await client.get(
201
+ models_url, headers={"Authorization": f"Bearer {api_key}"}
 
202
  )
203
  response.raise_for_status()
204
 
205
  dynamic_data = response.json()
206
  # Handle both {data: [...]} and direct [...] formats
207
+ model_list = (
208
+ dynamic_data.get("data", dynamic_data)
209
+ if isinstance(dynamic_data, dict)
210
+ else dynamic_data
211
+ )
212
 
213
  dynamic_count = 0
214
  for model in model_list:
 
219
  dynamic_count += 1
220
 
221
  if dynamic_count > 0:
222
+ lib_logger.debug(
223
+ f"Discovered {dynamic_count} additional models for iflow from API"
224
+ )
225
 
226
  except Exception as e:
227
  # Silently ignore dynamic discovery errors
 
286
  payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS}
287
 
288
  # Always force streaming for internal processing
289
+ payload["stream"] = True
290
 
291
  # NOTE: iFlow API does not support stream_options parameter
292
  # Unlike other providers, we don't include it to avoid HTTP 406 errors
 
295
  if "tools" in payload and payload["tools"]:
296
  payload["tools"] = self._clean_tool_schemas(payload["tools"])
297
  lib_logger.debug(f"Cleaned {len(payload['tools'])} tool schemas")
298
+ elif (
299
+ "tools" in payload
300
+ and isinstance(payload["tools"], list)
301
+ and len(payload["tools"]) == 0
302
+ ):
303
  # Inject dummy tool for empty arrays to prevent streaming issues (similar to Qwen's behavior)
304
+ payload["tools"] = [
305
+ {
306
+ "type": "function",
307
+ "function": {
308
+ "name": "noop",
309
+ "description": "Placeholder tool to stabilise streaming",
310
+ "parameters": {"type": "object"},
311
+ },
312
  }
313
+ ]
314
  lib_logger.debug("Injected placeholder tool for empty tools array")
315
 
316
  return payload
 
319
  """
320
  Converts a raw iFlow SSE chunk to an OpenAI-compatible chunk.
321
  Since iFlow is OpenAI-compatible, minimal conversion is needed.
322
+
323
  CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk)
324
  without early return to ensure finish_reason is properly processed.
325
  """
 
339
  "model": model_id,
340
  "object": "chat.completion.chunk",
341
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
342
+ "created": chunk.get("created", int(time.time())),
343
  }
344
  # Then yield the usage chunk
345
  yield {
346
+ "choices": [],
347
+ "model": model_id,
348
+ "object": "chat.completion.chunk",
349
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
350
  "created": chunk.get("created", int(time.time())),
351
  "usage": {
352
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
353
  "completion_tokens": usage_data.get("completion_tokens", 0),
354
  "total_tokens": usage_data.get("total_tokens", 0),
355
+ },
356
  }
357
  return
358
 
359
  # Handle usage-only chunks
360
  if usage_data:
361
  yield {
362
+ "choices": [],
363
+ "model": model_id,
364
+ "object": "chat.completion.chunk",
365
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
366
  "created": chunk.get("created", int(time.time())),
367
  "usage": {
368
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
369
  "completion_tokens": usage_data.get("completion_tokens", 0),
370
  "total_tokens": usage_data.get("total_tokens", 0),
371
+ },
372
  }
373
  return
374
 
 
380
  "model": model_id,
381
  "object": "chat.completion.chunk",
382
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
383
+ "created": chunk.get("created", int(time.time())),
384
  }
385
 
386
+ def _stream_to_completion_response(
387
+ self, chunks: List[litellm.ModelResponse]
388
+ ) -> litellm.ModelResponse:
389
  """
390
  Manually reassembles streaming chunks into a complete response.
391
+
392
  Key improvements:
393
  - Determines finish_reason based on accumulated state (tool_calls vs stop)
394
  - Properly initializes tool_calls with type field
 
401
  final_message = {"role": "assistant"}
402
  aggregated_tool_calls = {}
403
  usage_data = None
404
+ chunk_finish_reason = (
405
+ None # Track finish_reason from chunks (but we'll override)
406
+ )
407
 
408
  # Get the first chunk for basic response metadata
409
  first_chunk = chunks[0]
410
 
411
  # Process each chunk to aggregate content
412
  for chunk in chunks:
413
+ if not hasattr(chunk, "choices") or not chunk.choices:
414
  continue
415
 
416
  choice = chunk.choices[0]
 
434
  index = tc_chunk.get("index", 0)
435
  if index not in aggregated_tool_calls:
436
  # Initialize with type field for OpenAI compatibility
437
+ aggregated_tool_calls[index] = {
438
+ "type": "function",
439
+ "function": {"name": "", "arguments": ""},
440
+ }
441
  if "id" in tc_chunk:
442
  aggregated_tool_calls[index]["id"] = tc_chunk["id"]
443
  if "type" in tc_chunk:
444
  aggregated_tool_calls[index]["type"] = tc_chunk["type"]
445
  if "function" in tc_chunk:
446
+ if (
447
+ "name" in tc_chunk["function"]
448
+ and tc_chunk["function"]["name"] is not None
449
+ ):
450
+ aggregated_tool_calls[index]["function"]["name"] += (
451
+ tc_chunk["function"]["name"]
452
+ )
453
+ if (
454
+ "arguments" in tc_chunk["function"]
455
+ and tc_chunk["function"]["arguments"] is not None
456
+ ):
457
+ aggregated_tool_calls[index]["function"]["arguments"] += (
458
+ tc_chunk["function"]["arguments"]
459
+ )
460
 
461
  # Aggregate function calls (legacy format)
462
  if "function_call" in delta and delta["function_call"] is not None:
463
  if "function_call" not in final_message:
464
  final_message["function_call"] = {"name": "", "arguments": ""}
465
+ if (
466
+ "name" in delta["function_call"]
467
+ and delta["function_call"]["name"] is not None
468
+ ):
469
+ final_message["function_call"]["name"] += delta["function_call"][
470
+ "name"
471
+ ]
472
+ if (
473
+ "arguments" in delta["function_call"]
474
+ and delta["function_call"]["arguments"] is not None
475
+ ):
476
+ final_message["function_call"]["arguments"] += delta[
477
+ "function_call"
478
+ ]["arguments"]
479
 
480
  # Track finish_reason from chunks (for reference only)
481
  if choice.get("finish_reason"):
 
483
 
484
  # Handle usage data from the last chunk that has it
485
  for chunk in reversed(chunks):
486
+ if hasattr(chunk, "usage") and chunk.usage:
487
  usage_data = chunk.usage
488
  break
489
 
 
509
  final_choice = {
510
  "index": 0,
511
  "message": final_message,
512
+ "finish_reason": finish_reason,
513
  }
514
 
515
  # Create the final ModelResponse
 
519
  "created": first_chunk.created,
520
  "model": first_chunk.model,
521
  "choices": [final_choice],
522
+ "usage": usage_data,
523
  }
524
 
525
  return litellm.ModelResponse(**final_response_data)
526
 
527
+ async def acompletion(
528
+ self, client: httpx.AsyncClient, **kwargs
529
+ ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
530
  credential_path = kwargs.pop("credential_identifier")
531
  enable_request_logging = kwargs.pop("enable_request_logging", False)
532
  model = kwargs["model"]
533
 
534
  # Create dedicated file logger for this request
535
+ file_logger = _IFlowFileLogger(model_name=model, enabled=enable_request_logging)
 
 
 
536
 
537
  async def make_request():
538
  """Prepares and makes the actual API call."""
 
540
  api_base, api_key = await self.get_api_details(credential_path)
541
 
542
  # Strip provider prefix from model name (e.g., "iflow/Qwen3-Coder-Plus" -> "Qwen3-Coder-Plus")
543
+ model_name = model.split("/")[-1]
544
+ kwargs_with_stripped_model = {**kwargs, "model": model_name}
545
 
546
  # Build clean payload with only supported parameters
547
  payload = self._build_request_payload(**kwargs_with_stripped_model)
 
550
  "Authorization": f"Bearer {api_key}", # Uses api_key from user info
551
  "Content-Type": "application/json",
552
  "Accept": "text/event-stream",
553
+ "User-Agent": "iFlow-Cli",
554
  }
555
 
556
  url = f"{api_base.rstrip('/')}/chat/completions"
 
560
  lib_logger.debug(f"iFlow Request URL: {url}")
561
 
562
  return client.stream(
563
+ "POST",
564
+ url,
565
+ headers=headers,
566
+ json=payload,
567
+ timeout=TimeoutConfig.streaming(),
568
  )
569
 
570
  async def stream_handler(response_stream, attempt=1):
 
574
  # Check for HTTP errors before processing stream
575
  if response.status_code >= 400:
576
  error_text = await response.aread()
577
+ error_text = (
578
+ error_text.decode("utf-8")
579
+ if isinstance(error_text, bytes)
580
+ else error_text
581
+ )
582
 
583
  # Handle 401: Force token refresh and retry once
584
  if response.status_code == 401 and attempt == 1:
585
+ lib_logger.warning(
586
+ "iFlow returned 401. Forcing token refresh and retrying once."
587
+ )
588
  await self._refresh_token(credential_path, force=True)
589
  retry_stream = await make_request()
590
  async for chunk in stream_handler(retry_stream, attempt=2):
 
592
  return
593
 
594
  # Handle 429: Rate limit
595
+ elif (
596
+ response.status_code == 429
597
+ or "slow_down" in error_text.lower()
598
+ ):
599
  raise RateLimitError(
600
  f"iFlow rate limit exceeded: {error_text}",
601
  llm_provider="iflow",
602
  model=model,
603
+ response=response,
604
  )
605
 
606
  # Handle other errors
607
  else:
608
+ error_msg = (
609
+ f"iFlow HTTP {response.status_code} error: {error_text}"
610
+ )
611
  file_logger.log_error(error_msg)
612
  raise httpx.HTTPStatusError(
613
  f"HTTP {response.status_code}: {error_text}",
614
  request=response.request,
615
+ response=response,
616
  )
617
 
618
  # Process successful streaming response
619
  async for line in response.aiter_lines():
620
  file_logger.log_response_chunk(line)
621
+
622
  # CRITICAL FIX: Handle both "data:" (no space) and "data: " (with space)
623
+ if line.startswith("data:"):
624
  # Extract data after "data:" prefix, handling both formats
625
+ if line.startswith("data: "):
626
  data_str = line[6:] # Skip "data: "
627
  else:
628
  data_str = line[5:] # Skip "data:"
629
+
630
  if data_str.strip() == "[DONE]":
631
  break
632
  try:
633
  chunk = json.loads(data_str)
634
+ for openai_chunk in self._convert_chunk_to_openai(
635
+ chunk, model
636
+ ):
637
  yield litellm.ModelResponse(**openai_chunk)
638
  except json.JSONDecodeError:
639
+ lib_logger.warning(
640
+ f"Could not decode JSON from iFlow: {line}"
641
+ )
642
 
643
  except httpx.HTTPStatusError:
644
  raise # Re-raise HTTP errors we already handled
645
  except Exception as e:
646
  file_logger.log_error(f"Error during iFlow stream processing: {e}")
647
+ lib_logger.error(
648
+ f"Error during iFlow stream processing: {e}", exc_info=True
649
+ )
650
  raise
651
 
652
  async def logging_stream_wrapper():
 
664
  if kwargs.get("stream"):
665
  return logging_stream_wrapper()
666
  else:
667
+
668
  async def non_stream_wrapper():
669
  chunks = [chunk async for chunk in logging_stream_wrapper()]
670
  return self._stream_to_completion_response(chunks)
671
+
672
  return await non_stream_wrapper()
src/rotator_library/providers/qwen_code_provider.py CHANGED
@@ -10,19 +10,22 @@ from typing import Union, AsyncGenerator, List, Dict, Any
10
  from .provider_interface import ProviderInterface
11
  from .qwen_auth_base import QwenAuthBase
12
  from ..model_definitions import ModelDefinitions
 
13
  import litellm
14
  from litellm.exceptions import RateLimitError, AuthenticationError
15
  from pathlib import Path
16
  import uuid
17
  from datetime import datetime
18
 
19
- lib_logger = logging.getLogger('rotator_library')
20
 
21
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent.parent / "logs"
22
  QWEN_CODE_LOGS_DIR = LOGS_DIR / "qwen_code_logs"
23
 
 
24
  class _QwenCodeFileLogger:
25
  """A simple file logger for a single Qwen Code transaction."""
 
26
  def __init__(self, model_name: str, enabled: bool = True):
27
  self.enabled = enabled
28
  if not self.enabled:
@@ -31,8 +34,10 @@ class _QwenCodeFileLogger:
31
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
32
  request_id = str(uuid.uuid4())
33
  # Sanitize model name for directory
34
- safe_model_name = model_name.replace('/', '_').replace(':', '_')
35
- self.log_dir = QWEN_CODE_LOGS_DIR / f"{timestamp}_{safe_model_name}_{request_id}"
 
 
36
  try:
37
  self.log_dir.mkdir(parents=True, exist_ok=True)
38
  except Exception as e:
@@ -41,25 +46,32 @@ class _QwenCodeFileLogger:
41
 
42
  def log_request(self, payload: Dict[str, Any]):
43
  """Logs the request payload sent to Qwen Code."""
44
- if not self.enabled: return
 
45
  try:
46
- with open(self.log_dir / "request_payload.json", "w", encoding="utf-8") as f:
 
 
47
  json.dump(payload, f, indent=2, ensure_ascii=False)
48
  except Exception as e:
49
  lib_logger.error(f"_QwenCodeFileLogger: Failed to write request: {e}")
50
 
51
  def log_response_chunk(self, chunk: str):
52
  """Logs a raw chunk from the Qwen Code response stream."""
53
- if not self.enabled: return
 
54
  try:
55
  with open(self.log_dir / "response_stream.log", "a", encoding="utf-8") as f:
56
  f.write(chunk + "\n")
57
  except Exception as e:
58
- lib_logger.error(f"_QwenCodeFileLogger: Failed to write response chunk: {e}")
 
 
59
 
60
  def log_error(self, error_message: str):
61
  """Logs an error message."""
62
- if not self.enabled: return
 
63
  try:
64
  with open(self.log_dir / "error.log", "a", encoding="utf-8") as f:
65
  f.write(f"[{datetime.utcnow().isoformat()}] {error_message}\n")
@@ -68,28 +80,41 @@ class _QwenCodeFileLogger:
68
 
69
  def log_final_response(self, response_data: Dict[str, Any]):
70
  """Logs the final, reassembled response."""
71
- if not self.enabled: return
 
72
  try:
73
  with open(self.log_dir / "final_response.json", "w", encoding="utf-8") as f:
74
  json.dump(response_data, f, indent=2, ensure_ascii=False)
75
  except Exception as e:
76
- lib_logger.error(f"_QwenCodeFileLogger: Failed to write final response: {e}")
 
 
 
77
 
78
- HARDCODED_MODELS = [
79
- "qwen3-coder-plus",
80
- "qwen3-coder-flash"
81
- ]
82
 
83
  # OpenAI-compatible parameters supported by Qwen Code API
84
  SUPPORTED_PARAMS = {
85
- 'model', 'messages', 'temperature', 'top_p', 'max_tokens',
86
- 'stream', 'tools', 'tool_choice', 'presence_penalty',
87
- 'frequency_penalty', 'n', 'stop', 'seed', 'response_format'
 
 
 
 
 
 
 
 
 
 
 
88
  }
89
 
 
90
  class QwenCodeProvider(QwenAuthBase, ProviderInterface):
91
  skip_cost_calculation = True
92
- REASONING_START_MARKER = 'THINK||'
93
 
94
  def __init__(self):
95
  super().__init__()
@@ -111,7 +136,9 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
111
  Validates OAuth credentials if applicable.
112
  """
113
  models = []
114
- env_var_ids = set() # Track IDs from env vars to prevent hardcoded/dynamic duplicates
 
 
115
 
116
  def extract_model_id(item) -> str:
117
  """Extract model ID from various formats (dict, string with/without provider prefix)."""
@@ -137,7 +164,9 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
137
  # Track the ID to prevent hardcoded/dynamic duplicates
138
  if model_id:
139
  env_var_ids.add(model_id)
140
- lib_logger.info(f"Loaded {len(static_models)} static models for qwen_code from environment variables")
 
 
141
 
142
  # Source 2: Add hardcoded models (only if ID not already in env vars)
143
  for model_id in HARDCODED_MODELS:
@@ -155,14 +184,17 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
155
  models_url = f"{api_base.rstrip('/')}/v1/models"
156
 
157
  response = await client.get(
158
- models_url,
159
- headers={"Authorization": f"Bearer {access_token}"}
160
  )
161
  response.raise_for_status()
162
 
163
  dynamic_data = response.json()
164
  # Handle both {data: [...]} and direct [...] formats
165
- model_list = dynamic_data.get("data", dynamic_data) if isinstance(dynamic_data, dict) else dynamic_data
 
 
 
 
166
 
167
  dynamic_count = 0
168
  for model in model_list:
@@ -173,7 +205,9 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
173
  dynamic_count += 1
174
 
175
  if dynamic_count > 0:
176
- lib_logger.debug(f"Discovered {dynamic_count} additional models for qwen_code from API")
 
 
177
 
178
  except Exception as e:
179
  # Silently ignore dynamic discovery errors
@@ -238,10 +272,10 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
238
  payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS}
239
 
240
  # Always force streaming for internal processing
241
- payload['stream'] = True
242
 
243
  # Always include usage data in stream
244
- payload['stream_options'] = {"include_usage": True}
245
 
246
  # Handle tool schema cleaning
247
  if "tools" in payload and payload["tools"]:
@@ -250,22 +284,26 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
250
  elif not payload.get("tools"):
251
  # Per Qwen Code API bug (see: https://github.com/qianwen-team/flash-dance/issues/2),
252
  # injecting a dummy tool prevents stream corruption when no tools are provided
253
- payload["tools"] = [{
254
- "type": "function",
255
- "function": {
256
- "name": "do_not_call_me",
257
- "description": "Do not call this tool.",
258
- "parameters": {"type": "object", "properties": {}}
 
 
259
  }
260
- }]
261
- lib_logger.debug("Injected dummy tool to prevent Qwen API stream corruption")
 
 
262
 
263
  return payload
264
 
265
  def _convert_chunk_to_openai(self, chunk: Dict[str, Any], model_id: str):
266
  """
267
  Converts a raw Qwen SSE chunk to an OpenAI-compatible chunk.
268
-
269
  CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk)
270
  without early return to ensure finish_reason is properly processed.
271
  """
@@ -287,32 +325,42 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
287
 
288
  # Yield the choice chunk first (contains finish_reason)
289
  yield {
290
- "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
291
- "model": model_id, "object": "chat.completion.chunk",
292
- "id": chunk_id, "created": chunk_created
 
 
 
 
293
  }
294
  # Then yield the usage chunk
295
  yield {
296
- "choices": [], "model": model_id, "object": "chat.completion.chunk",
297
- "id": chunk_id, "created": chunk_created,
 
 
 
298
  "usage": {
299
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
300
  "completion_tokens": usage_data.get("completion_tokens", 0),
301
  "total_tokens": usage_data.get("total_tokens", 0),
302
- }
303
  }
304
  return
305
 
306
  # Handle usage-only chunks
307
  if usage_data:
308
  yield {
309
- "choices": [], "model": model_id, "object": "chat.completion.chunk",
310
- "id": chunk_id, "created": chunk_created,
 
 
 
311
  "usage": {
312
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
313
  "completion_tokens": usage_data.get("completion_tokens", 0),
314
  "total_tokens": usage_data.get("total_tokens", 0),
315
- }
316
  }
317
  return
318
 
@@ -327,35 +375,52 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
327
  # Handle <think> tags for reasoning content
328
  content = delta.get("content")
329
  if content and ("<think>" in content or "</think>" in content):
330
- parts = content.replace("<think>", f"||{self.REASONING_START_MARKER}").replace("</think>", f"||/{self.REASONING_START_MARKER}").split("||")
 
 
 
 
331
  for part in parts:
332
- if not part: continue
333
-
 
334
  new_delta = {}
335
  if part.startswith(self.REASONING_START_MARKER):
336
- new_delta['reasoning_content'] = part.replace(self.REASONING_START_MARKER, "")
 
 
337
  elif part.startswith(f"/{self.REASONING_START_MARKER}"):
338
  continue
339
  else:
340
- new_delta['content'] = part
341
-
342
  yield {
343
- "choices": [{"index": 0, "delta": new_delta, "finish_reason": None}],
344
- "model": model_id, "object": "chat.completion.chunk",
345
- "id": chunk_id, "created": chunk_created
 
 
 
 
346
  }
347
  else:
348
  # Standard content chunk
349
  yield {
350
- "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
351
- "model": model_id, "object": "chat.completion.chunk",
352
- "id": chunk_id, "created": chunk_created
 
 
 
 
353
  }
354
 
355
- def _stream_to_completion_response(self, chunks: List[litellm.ModelResponse]) -> litellm.ModelResponse:
 
 
356
  """
357
  Manually reassembles streaming chunks into a complete response.
358
-
359
  Key improvements:
360
  - Determines finish_reason based on accumulated state (tool_calls vs stop)
361
  - Properly initializes tool_calls with type field
@@ -368,14 +433,16 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
368
  final_message = {"role": "assistant"}
369
  aggregated_tool_calls = {}
370
  usage_data = None
371
- chunk_finish_reason = None # Track finish_reason from chunks (but we'll override)
 
 
372
 
373
  # Get the first chunk for basic response metadata
374
  first_chunk = chunks[0]
375
 
376
  # Process each chunk to aggregate content
377
  for chunk in chunks:
378
- if not hasattr(chunk, 'choices') or not chunk.choices:
379
  continue
380
 
381
  choice = chunk.choices[0]
@@ -399,25 +466,48 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
399
  index = tc_chunk.get("index", 0)
400
  if index not in aggregated_tool_calls:
401
  # Initialize with type field for OpenAI compatibility
402
- aggregated_tool_calls[index] = {"type": "function", "function": {"name": "", "arguments": ""}}
 
 
 
403
  if "id" in tc_chunk:
404
  aggregated_tool_calls[index]["id"] = tc_chunk["id"]
405
  if "type" in tc_chunk:
406
  aggregated_tool_calls[index]["type"] = tc_chunk["type"]
407
  if "function" in tc_chunk:
408
- if "name" in tc_chunk["function"] and tc_chunk["function"]["name"] is not None:
409
- aggregated_tool_calls[index]["function"]["name"] += tc_chunk["function"]["name"]
410
- if "arguments" in tc_chunk["function"] and tc_chunk["function"]["arguments"] is not None:
411
- aggregated_tool_calls[index]["function"]["arguments"] += tc_chunk["function"]["arguments"]
 
 
 
 
 
 
 
 
 
 
412
 
413
  # Aggregate function calls (legacy format)
414
  if "function_call" in delta and delta["function_call"] is not None:
415
  if "function_call" not in final_message:
416
  final_message["function_call"] = {"name": "", "arguments": ""}
417
- if "name" in delta["function_call"] and delta["function_call"]["name"] is not None:
418
- final_message["function_call"]["name"] += delta["function_call"]["name"]
419
- if "arguments" in delta["function_call"] and delta["function_call"]["arguments"] is not None:
420
- final_message["function_call"]["arguments"] += delta["function_call"]["arguments"]
 
 
 
 
 
 
 
 
 
 
421
 
422
  # Track finish_reason from chunks (for reference only)
423
  if choice.get("finish_reason"):
@@ -425,7 +515,7 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
425
 
426
  # Handle usage data from the last chunk that has it
427
  for chunk in reversed(chunks):
428
- if hasattr(chunk, 'usage') and chunk.usage:
429
  usage_data = chunk.usage
430
  break
431
 
@@ -451,7 +541,7 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
451
  final_choice = {
452
  "index": 0,
453
  "message": final_message,
454
- "finish_reason": finish_reason
455
  }
456
 
457
  # Create the final ModelResponse
@@ -461,20 +551,21 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
461
  "created": first_chunk.created,
462
  "model": first_chunk.model,
463
  "choices": [final_choice],
464
- "usage": usage_data
465
  }
466
 
467
  return litellm.ModelResponse(**final_response_data)
468
 
469
- async def acompletion(self, client: httpx.AsyncClient, **kwargs) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
 
 
470
  credential_path = kwargs.pop("credential_identifier")
471
  enable_request_logging = kwargs.pop("enable_request_logging", False)
472
  model = kwargs["model"]
473
 
474
  # Create dedicated file logger for this request
475
  file_logger = _QwenCodeFileLogger(
476
- model_name=model,
477
- enabled=enable_request_logging
478
  )
479
 
480
  async def make_request():
@@ -482,8 +573,8 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
482
  api_base, access_token = await self.get_api_details(credential_path)
483
 
484
  # Strip provider prefix from model name (e.g., "qwen_code/qwen3-coder-plus" -> "qwen3-coder-plus")
485
- model_name = model.split('/')[-1]
486
- kwargs_with_stripped_model = {**kwargs, 'model': model_name}
487
 
488
  # Build clean payload with only supported parameters
489
  payload = self._build_request_payload(**kwargs_with_stripped_model)
@@ -504,8 +595,11 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
504
  lib_logger.debug(f"Qwen Code Request URL: {url}")
505
 
506
  return client.stream(
507
- "POST", url, headers=headers, json=payload,
508
- timeout=httpx.Timeout(connect=30.0, read=120.0, write=120.0, pool=120.0)
 
 
 
509
  )
510
 
511
  async def stream_handler(response_stream, attempt=1):
@@ -515,11 +609,17 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
515
  # Check for HTTP errors before processing stream
516
  if response.status_code >= 400:
517
  error_text = await response.aread()
518
- error_text = error_text.decode('utf-8') if isinstance(error_text, bytes) else error_text
 
 
 
 
519
 
520
  # Handle 401: Force token refresh and retry once
521
  if response.status_code == 401 and attempt == 1:
522
- lib_logger.warning("Qwen Code returned 401. Forcing token refresh and retrying once.")
 
 
523
  await self._refresh_token(credential_path, force=True)
524
  retry_stream = await make_request()
525
  async for chunk in stream_handler(retry_stream, attempt=2):
@@ -527,12 +627,15 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
527
  return
528
 
529
  # Handle 429: Rate limit
530
- elif response.status_code == 429 or "slow_down" in error_text.lower():
 
 
 
531
  raise RateLimitError(
532
  f"Qwen Code rate limit exceeded: {error_text}",
533
  llm_provider="qwen_code",
534
  model=model,
535
- response=response
536
  )
537
 
538
  # Handle other errors
@@ -542,28 +645,34 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
542
  raise httpx.HTTPStatusError(
543
  f"HTTP {response.status_code}: {error_text}",
544
  request=response.request,
545
- response=response
546
  )
547
 
548
  # Process successful streaming response
549
  async for line in response.aiter_lines():
550
  file_logger.log_response_chunk(line)
551
- if line.startswith('data: '):
552
  data_str = line[6:]
553
  if data_str == "[DONE]":
554
  break
555
  try:
556
  chunk = json.loads(data_str)
557
- for openai_chunk in self._convert_chunk_to_openai(chunk, model):
 
 
558
  yield litellm.ModelResponse(**openai_chunk)
559
  except json.JSONDecodeError:
560
- lib_logger.warning(f"Could not decode JSON from Qwen Code: {line}")
 
 
561
 
562
  except httpx.HTTPStatusError:
563
  raise # Re-raise HTTP errors we already handled
564
  except Exception as e:
565
  file_logger.log_error(f"Error during Qwen Code stream processing: {e}")
566
- lib_logger.error(f"Error during Qwen Code stream processing: {e}", exc_info=True)
 
 
567
  raise
568
 
569
  async def logging_stream_wrapper():
@@ -581,7 +690,9 @@ class QwenCodeProvider(QwenAuthBase, ProviderInterface):
581
  if kwargs.get("stream"):
582
  return logging_stream_wrapper()
583
  else:
 
584
  async def non_stream_wrapper():
585
  chunks = [chunk async for chunk in logging_stream_wrapper()]
586
  return self._stream_to_completion_response(chunks)
587
- return await non_stream_wrapper()
 
 
10
  from .provider_interface import ProviderInterface
11
  from .qwen_auth_base import QwenAuthBase
12
  from ..model_definitions import ModelDefinitions
13
+ from ..timeout_config import TimeoutConfig
14
  import litellm
15
  from litellm.exceptions import RateLimitError, AuthenticationError
16
  from pathlib import Path
17
  import uuid
18
  from datetime import datetime
19
 
20
+ lib_logger = logging.getLogger("rotator_library")
21
 
22
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent.parent / "logs"
23
  QWEN_CODE_LOGS_DIR = LOGS_DIR / "qwen_code_logs"
24
 
25
+
26
  class _QwenCodeFileLogger:
27
  """A simple file logger for a single Qwen Code transaction."""
28
+
29
  def __init__(self, model_name: str, enabled: bool = True):
30
  self.enabled = enabled
31
  if not self.enabled:
 
34
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
35
  request_id = str(uuid.uuid4())
36
  # Sanitize model name for directory
37
+ safe_model_name = model_name.replace("/", "_").replace(":", "_")
38
+ self.log_dir = (
39
+ QWEN_CODE_LOGS_DIR / f"{timestamp}_{safe_model_name}_{request_id}"
40
+ )
41
  try:
42
  self.log_dir.mkdir(parents=True, exist_ok=True)
43
  except Exception as e:
 
46
 
47
  def log_request(self, payload: Dict[str, Any]):
48
  """Logs the request payload sent to Qwen Code."""
49
+ if not self.enabled:
50
+ return
51
  try:
52
+ with open(
53
+ self.log_dir / "request_payload.json", "w", encoding="utf-8"
54
+ ) as f:
55
  json.dump(payload, f, indent=2, ensure_ascii=False)
56
  except Exception as e:
57
  lib_logger.error(f"_QwenCodeFileLogger: Failed to write request: {e}")
58
 
59
  def log_response_chunk(self, chunk: str):
60
  """Logs a raw chunk from the Qwen Code response stream."""
61
+ if not self.enabled:
62
+ return
63
  try:
64
  with open(self.log_dir / "response_stream.log", "a", encoding="utf-8") as f:
65
  f.write(chunk + "\n")
66
  except Exception as e:
67
+ lib_logger.error(
68
+ f"_QwenCodeFileLogger: Failed to write response chunk: {e}"
69
+ )
70
 
71
  def log_error(self, error_message: str):
72
  """Logs an error message."""
73
+ if not self.enabled:
74
+ return
75
  try:
76
  with open(self.log_dir / "error.log", "a", encoding="utf-8") as f:
77
  f.write(f"[{datetime.utcnow().isoformat()}] {error_message}\n")
 
80
 
81
  def log_final_response(self, response_data: Dict[str, Any]):
82
  """Logs the final, reassembled response."""
83
+ if not self.enabled:
84
+ return
85
  try:
86
  with open(self.log_dir / "final_response.json", "w", encoding="utf-8") as f:
87
  json.dump(response_data, f, indent=2, ensure_ascii=False)
88
  except Exception as e:
89
+ lib_logger.error(
90
+ f"_QwenCodeFileLogger: Failed to write final response: {e}"
91
+ )
92
+
93
 
94
+ HARDCODED_MODELS = ["qwen3-coder-plus", "qwen3-coder-flash"]
 
 
 
95
 
96
  # OpenAI-compatible parameters supported by Qwen Code API
97
  SUPPORTED_PARAMS = {
98
+ "model",
99
+ "messages",
100
+ "temperature",
101
+ "top_p",
102
+ "max_tokens",
103
+ "stream",
104
+ "tools",
105
+ "tool_choice",
106
+ "presence_penalty",
107
+ "frequency_penalty",
108
+ "n",
109
+ "stop",
110
+ "seed",
111
+ "response_format",
112
  }
113
 
114
+
115
  class QwenCodeProvider(QwenAuthBase, ProviderInterface):
116
  skip_cost_calculation = True
117
+ REASONING_START_MARKER = "THINK||"
118
 
119
  def __init__(self):
120
  super().__init__()
 
136
  Validates OAuth credentials if applicable.
137
  """
138
  models = []
139
+ env_var_ids = (
140
+ set()
141
+ ) # Track IDs from env vars to prevent hardcoded/dynamic duplicates
142
 
143
  def extract_model_id(item) -> str:
144
  """Extract model ID from various formats (dict, string with/without provider prefix)."""
 
164
  # Track the ID to prevent hardcoded/dynamic duplicates
165
  if model_id:
166
  env_var_ids.add(model_id)
167
+ lib_logger.info(
168
+ f"Loaded {len(static_models)} static models for qwen_code from environment variables"
169
+ )
170
 
171
  # Source 2: Add hardcoded models (only if ID not already in env vars)
172
  for model_id in HARDCODED_MODELS:
 
184
  models_url = f"{api_base.rstrip('/')}/v1/models"
185
 
186
  response = await client.get(
187
+ models_url, headers={"Authorization": f"Bearer {access_token}"}
 
188
  )
189
  response.raise_for_status()
190
 
191
  dynamic_data = response.json()
192
  # Handle both {data: [...]} and direct [...] formats
193
+ model_list = (
194
+ dynamic_data.get("data", dynamic_data)
195
+ if isinstance(dynamic_data, dict)
196
+ else dynamic_data
197
+ )
198
 
199
  dynamic_count = 0
200
  for model in model_list:
 
205
  dynamic_count += 1
206
 
207
  if dynamic_count > 0:
208
+ lib_logger.debug(
209
+ f"Discovered {dynamic_count} additional models for qwen_code from API"
210
+ )
211
 
212
  except Exception as e:
213
  # Silently ignore dynamic discovery errors
 
272
  payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS}
273
 
274
  # Always force streaming for internal processing
275
+ payload["stream"] = True
276
 
277
  # Always include usage data in stream
278
+ payload["stream_options"] = {"include_usage": True}
279
 
280
  # Handle tool schema cleaning
281
  if "tools" in payload and payload["tools"]:
 
284
  elif not payload.get("tools"):
285
  # Per Qwen Code API bug (see: https://github.com/qianwen-team/flash-dance/issues/2),
286
  # injecting a dummy tool prevents stream corruption when no tools are provided
287
+ payload["tools"] = [
288
+ {
289
+ "type": "function",
290
+ "function": {
291
+ "name": "do_not_call_me",
292
+ "description": "Do not call this tool.",
293
+ "parameters": {"type": "object", "properties": {}},
294
+ },
295
  }
296
+ ]
297
+ lib_logger.debug(
298
+ "Injected dummy tool to prevent Qwen API stream corruption"
299
+ )
300
 
301
  return payload
302
 
303
  def _convert_chunk_to_openai(self, chunk: Dict[str, Any], model_id: str):
304
  """
305
  Converts a raw Qwen SSE chunk to an OpenAI-compatible chunk.
306
+
307
  CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk)
308
  without early return to ensure finish_reason is properly processed.
309
  """
 
325
 
326
  # Yield the choice chunk first (contains finish_reason)
327
  yield {
328
+ "choices": [
329
+ {"index": 0, "delta": delta, "finish_reason": finish_reason}
330
+ ],
331
+ "model": model_id,
332
+ "object": "chat.completion.chunk",
333
+ "id": chunk_id,
334
+ "created": chunk_created,
335
  }
336
  # Then yield the usage chunk
337
  yield {
338
+ "choices": [],
339
+ "model": model_id,
340
+ "object": "chat.completion.chunk",
341
+ "id": chunk_id,
342
+ "created": chunk_created,
343
  "usage": {
344
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
345
  "completion_tokens": usage_data.get("completion_tokens", 0),
346
  "total_tokens": usage_data.get("total_tokens", 0),
347
+ },
348
  }
349
  return
350
 
351
  # Handle usage-only chunks
352
  if usage_data:
353
  yield {
354
+ "choices": [],
355
+ "model": model_id,
356
+ "object": "chat.completion.chunk",
357
+ "id": chunk_id,
358
+ "created": chunk_created,
359
  "usage": {
360
  "prompt_tokens": usage_data.get("prompt_tokens", 0),
361
  "completion_tokens": usage_data.get("completion_tokens", 0),
362
  "total_tokens": usage_data.get("total_tokens", 0),
363
+ },
364
  }
365
  return
366
 
 
375
  # Handle <think> tags for reasoning content
376
  content = delta.get("content")
377
  if content and ("<think>" in content or "</think>" in content):
378
+ parts = (
379
+ content.replace("<think>", f"||{self.REASONING_START_MARKER}")
380
+ .replace("</think>", f"||/{self.REASONING_START_MARKER}")
381
+ .split("||")
382
+ )
383
  for part in parts:
384
+ if not part:
385
+ continue
386
+
387
  new_delta = {}
388
  if part.startswith(self.REASONING_START_MARKER):
389
+ new_delta["reasoning_content"] = part.replace(
390
+ self.REASONING_START_MARKER, ""
391
+ )
392
  elif part.startswith(f"/{self.REASONING_START_MARKER}"):
393
  continue
394
  else:
395
+ new_delta["content"] = part
396
+
397
  yield {
398
+ "choices": [
399
+ {"index": 0, "delta": new_delta, "finish_reason": None}
400
+ ],
401
+ "model": model_id,
402
+ "object": "chat.completion.chunk",
403
+ "id": chunk_id,
404
+ "created": chunk_created,
405
  }
406
  else:
407
  # Standard content chunk
408
  yield {
409
+ "choices": [
410
+ {"index": 0, "delta": delta, "finish_reason": finish_reason}
411
+ ],
412
+ "model": model_id,
413
+ "object": "chat.completion.chunk",
414
+ "id": chunk_id,
415
+ "created": chunk_created,
416
  }
417
 
418
+ def _stream_to_completion_response(
419
+ self, chunks: List[litellm.ModelResponse]
420
+ ) -> litellm.ModelResponse:
421
  """
422
  Manually reassembles streaming chunks into a complete response.
423
+
424
  Key improvements:
425
  - Determines finish_reason based on accumulated state (tool_calls vs stop)
426
  - Properly initializes tool_calls with type field
 
433
  final_message = {"role": "assistant"}
434
  aggregated_tool_calls = {}
435
  usage_data = None
436
+ chunk_finish_reason = (
437
+ None # Track finish_reason from chunks (but we'll override)
438
+ )
439
 
440
  # Get the first chunk for basic response metadata
441
  first_chunk = chunks[0]
442
 
443
  # Process each chunk to aggregate content
444
  for chunk in chunks:
445
+ if not hasattr(chunk, "choices") or not chunk.choices:
446
  continue
447
 
448
  choice = chunk.choices[0]
 
466
  index = tc_chunk.get("index", 0)
467
  if index not in aggregated_tool_calls:
468
  # Initialize with type field for OpenAI compatibility
469
+ aggregated_tool_calls[index] = {
470
+ "type": "function",
471
+ "function": {"name": "", "arguments": ""},
472
+ }
473
  if "id" in tc_chunk:
474
  aggregated_tool_calls[index]["id"] = tc_chunk["id"]
475
  if "type" in tc_chunk:
476
  aggregated_tool_calls[index]["type"] = tc_chunk["type"]
477
  if "function" in tc_chunk:
478
+ if (
479
+ "name" in tc_chunk["function"]
480
+ and tc_chunk["function"]["name"] is not None
481
+ ):
482
+ aggregated_tool_calls[index]["function"]["name"] += (
483
+ tc_chunk["function"]["name"]
484
+ )
485
+ if (
486
+ "arguments" in tc_chunk["function"]
487
+ and tc_chunk["function"]["arguments"] is not None
488
+ ):
489
+ aggregated_tool_calls[index]["function"]["arguments"] += (
490
+ tc_chunk["function"]["arguments"]
491
+ )
492
 
493
  # Aggregate function calls (legacy format)
494
  if "function_call" in delta and delta["function_call"] is not None:
495
  if "function_call" not in final_message:
496
  final_message["function_call"] = {"name": "", "arguments": ""}
497
+ if (
498
+ "name" in delta["function_call"]
499
+ and delta["function_call"]["name"] is not None
500
+ ):
501
+ final_message["function_call"]["name"] += delta["function_call"][
502
+ "name"
503
+ ]
504
+ if (
505
+ "arguments" in delta["function_call"]
506
+ and delta["function_call"]["arguments"] is not None
507
+ ):
508
+ final_message["function_call"]["arguments"] += delta[
509
+ "function_call"
510
+ ]["arguments"]
511
 
512
  # Track finish_reason from chunks (for reference only)
513
  if choice.get("finish_reason"):
 
515
 
516
  # Handle usage data from the last chunk that has it
517
  for chunk in reversed(chunks):
518
+ if hasattr(chunk, "usage") and chunk.usage:
519
  usage_data = chunk.usage
520
  break
521
 
 
541
  final_choice = {
542
  "index": 0,
543
  "message": final_message,
544
+ "finish_reason": finish_reason,
545
  }
546
 
547
  # Create the final ModelResponse
 
551
  "created": first_chunk.created,
552
  "model": first_chunk.model,
553
  "choices": [final_choice],
554
+ "usage": usage_data,
555
  }
556
 
557
  return litellm.ModelResponse(**final_response_data)
558
 
559
+ async def acompletion(
560
+ self, client: httpx.AsyncClient, **kwargs
561
+ ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
562
  credential_path = kwargs.pop("credential_identifier")
563
  enable_request_logging = kwargs.pop("enable_request_logging", False)
564
  model = kwargs["model"]
565
 
566
  # Create dedicated file logger for this request
567
  file_logger = _QwenCodeFileLogger(
568
+ model_name=model, enabled=enable_request_logging
 
569
  )
570
 
571
  async def make_request():
 
573
  api_base, access_token = await self.get_api_details(credential_path)
574
 
575
  # Strip provider prefix from model name (e.g., "qwen_code/qwen3-coder-plus" -> "qwen3-coder-plus")
576
+ model_name = model.split("/")[-1]
577
+ kwargs_with_stripped_model = {**kwargs, "model": model_name}
578
 
579
  # Build clean payload with only supported parameters
580
  payload = self._build_request_payload(**kwargs_with_stripped_model)
 
595
  lib_logger.debug(f"Qwen Code Request URL: {url}")
596
 
597
  return client.stream(
598
+ "POST",
599
+ url,
600
+ headers=headers,
601
+ json=payload,
602
+ timeout=TimeoutConfig.streaming(),
603
  )
604
 
605
  async def stream_handler(response_stream, attempt=1):
 
609
  # Check for HTTP errors before processing stream
610
  if response.status_code >= 400:
611
  error_text = await response.aread()
612
+ error_text = (
613
+ error_text.decode("utf-8")
614
+ if isinstance(error_text, bytes)
615
+ else error_text
616
+ )
617
 
618
  # Handle 401: Force token refresh and retry once
619
  if response.status_code == 401 and attempt == 1:
620
+ lib_logger.warning(
621
+ "Qwen Code returned 401. Forcing token refresh and retrying once."
622
+ )
623
  await self._refresh_token(credential_path, force=True)
624
  retry_stream = await make_request()
625
  async for chunk in stream_handler(retry_stream, attempt=2):
 
627
  return
628
 
629
  # Handle 429: Rate limit
630
+ elif (
631
+ response.status_code == 429
632
+ or "slow_down" in error_text.lower()
633
+ ):
634
  raise RateLimitError(
635
  f"Qwen Code rate limit exceeded: {error_text}",
636
  llm_provider="qwen_code",
637
  model=model,
638
+ response=response,
639
  )
640
 
641
  # Handle other errors
 
645
  raise httpx.HTTPStatusError(
646
  f"HTTP {response.status_code}: {error_text}",
647
  request=response.request,
648
+ response=response,
649
  )
650
 
651
  # Process successful streaming response
652
  async for line in response.aiter_lines():
653
  file_logger.log_response_chunk(line)
654
+ if line.startswith("data: "):
655
  data_str = line[6:]
656
  if data_str == "[DONE]":
657
  break
658
  try:
659
  chunk = json.loads(data_str)
660
+ for openai_chunk in self._convert_chunk_to_openai(
661
+ chunk, model
662
+ ):
663
  yield litellm.ModelResponse(**openai_chunk)
664
  except json.JSONDecodeError:
665
+ lib_logger.warning(
666
+ f"Could not decode JSON from Qwen Code: {line}"
667
+ )
668
 
669
  except httpx.HTTPStatusError:
670
  raise # Re-raise HTTP errors we already handled
671
  except Exception as e:
672
  file_logger.log_error(f"Error during Qwen Code stream processing: {e}")
673
+ lib_logger.error(
674
+ f"Error during Qwen Code stream processing: {e}", exc_info=True
675
+ )
676
  raise
677
 
678
  async def logging_stream_wrapper():
 
690
  if kwargs.get("stream"):
691
  return logging_stream_wrapper()
692
  else:
693
+
694
  async def non_stream_wrapper():
695
  chunks = [chunk async for chunk in logging_stream_wrapper()]
696
  return self._stream_to_completion_response(chunks)
697
+
698
+ return await non_stream_wrapper()
src/rotator_library/timeout_config.py ADDED
@@ -0,0 +1,102 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # src/rotator_library/timeout_config.py
2
+ """
3
+ Centralized timeout configuration for HTTP requests.
4
+
5
+ All values can be overridden via environment variables:
6
+ TIMEOUT_CONNECT - Connection establishment timeout (default: 30s)
7
+ TIMEOUT_WRITE - Request body send timeout (default: 30s)
8
+ TIMEOUT_POOL - Connection pool acquisition timeout (default: 60s)
9
+ TIMEOUT_READ_STREAMING - Read timeout between chunks for streaming (default: 180s / 3 min)
10
+ TIMEOUT_READ_NON_STREAMING - Read timeout for non-streaming responses (default: 600s / 10 min)
11
+ """
12
+
13
+ import os
14
+ import logging
15
+ import httpx
16
+
17
+ lib_logger = logging.getLogger("rotator_library")
18
+
19
+
20
+ class TimeoutConfig:
21
+ """
22
+ Centralized timeout configuration for HTTP requests.
23
+
24
+ All values can be overridden via environment variables.
25
+ """
26
+
27
+ # Default values (in seconds)
28
+ _CONNECT = 30.0
29
+ _WRITE = 30.0
30
+ _POOL = 60.0
31
+ _READ_STREAMING = 180.0 # 3 minutes between chunks
32
+ _READ_NON_STREAMING = 600.0 # 10 minutes for full response
33
+
34
+ @classmethod
35
+ def _get_env_float(cls, key: str, default: float) -> float:
36
+ """Get a float value from environment variable, or return default."""
37
+ value = os.environ.get(key)
38
+ if value is not None:
39
+ try:
40
+ return float(value)
41
+ except ValueError:
42
+ lib_logger.warning(
43
+ f"Invalid value for {key}: {value}. Using default: {default}"
44
+ )
45
+ return default
46
+
47
+ @classmethod
48
+ def connect(cls) -> float:
49
+ """Connection establishment timeout."""
50
+ return cls._get_env_float("TIMEOUT_CONNECT", cls._CONNECT)
51
+
52
+ @classmethod
53
+ def write(cls) -> float:
54
+ """Request body send timeout."""
55
+ return cls._get_env_float("TIMEOUT_WRITE", cls._WRITE)
56
+
57
+ @classmethod
58
+ def pool(cls) -> float:
59
+ """Connection pool acquisition timeout."""
60
+ return cls._get_env_float("TIMEOUT_POOL", cls._POOL)
61
+
62
+ @classmethod
63
+ def read_streaming(cls) -> float:
64
+ """Read timeout between chunks for streaming requests."""
65
+ return cls._get_env_float("TIMEOUT_READ_STREAMING", cls._READ_STREAMING)
66
+
67
+ @classmethod
68
+ def read_non_streaming(cls) -> float:
69
+ """Read timeout for non-streaming responses."""
70
+ return cls._get_env_float("TIMEOUT_READ_NON_STREAMING", cls._READ_NON_STREAMING)
71
+
72
+ @classmethod
73
+ def streaming(cls) -> httpx.Timeout:
74
+ """
75
+ Timeout configuration for streaming LLM requests.
76
+
77
+ Uses a shorter read timeout (default 3 min) since we expect
78
+ periodic chunks. If no data arrives for this duration, the
79
+ connection is considered stalled.
80
+ """
81
+ return httpx.Timeout(
82
+ connect=cls.connect(),
83
+ read=cls.read_streaming(),
84
+ write=cls.write(),
85
+ pool=cls.pool(),
86
+ )
87
+
88
+ @classmethod
89
+ def non_streaming(cls) -> httpx.Timeout:
90
+ """
91
+ Timeout configuration for non-streaming LLM requests.
92
+
93
+ Uses a longer read timeout (default 10 min) since the server
94
+ may take significant time to generate the complete response
95
+ before sending anything back.
96
+ """
97
+ return httpx.Timeout(
98
+ connect=cls.connect(),
99
+ read=cls.read_non_streaming(),
100
+ write=cls.write(),
101
+ pool=cls.pool(),
102
+ )