import asyncio
import json
import os
import time
import uuid
from typing import Any, Callable, Dict, Generator, List, Optional, Union

import uvicorn
from curl_cffi import CurlError
from curl_cffi.requests import Session
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
|
|
| |
# Qodo connection settings, overridable via environment variables.
# NOTE(review): the defaults ("useme", "https://hello.com", "https://openai.com")
# look like placeholders — confirm the real endpoints/key before deploying.
QODO_API_KEY = os.getenv("QODO_API_KEY", "useme")
QODO_URL = os.getenv("QODO_URL", "https://hello.com")
QODO_INFO_URL = os.getenv("QODO_INFO_URL", "https://openai.com")
|
|
| |
| |
| |
|
|
| |
class exceptions:
    """Namespace grouping the provider-specific exception types."""

    class FailedToGenerateResponseError(Exception):
        """Raised when the Qodo backend cannot produce a response."""
|
|
| |
def sanitize_stream(
    data: Generator[bytes, None, None],
    content_extractor: Callable[[Dict[str, Any]], Optional[str]],
    **kwargs: Any,
) -> Generator[str, None, None]:
    """
    Parses a stream of byte chunks, extracts complete JSON objects,
    and yields content processed by the content_extractor.

    Brace matching is JSON-string-aware: '{' and '}' that occur inside
    string literals (including after escaped quotes) do not affect nesting
    depth, so payloads whose text contains braces parse correctly.
    """
    buffer = ""
    for byte_chunk in data:
        buffer += byte_chunk.decode('utf-8', errors='ignore')

        start_index = 0
        while True:
            # Locate the start of the next candidate JSON object.
            try:
                obj_start = buffer.index('{', start_index)
            except ValueError:
                # No further objects; keep any unconsumed tail for later chunks.
                buffer = buffer[start_index:]
                break

            # Scan for the matching closing brace, ignoring braces that occur
            # inside JSON string literals.
            brace_count = 1
            in_string = False
            escaped = False
            i = obj_start + 1
            while i < len(buffer) and brace_count > 0:
                ch = buffer[i]
                if escaped:
                    # Character after a backslash inside a string: no effect.
                    escaped = False
                elif in_string:
                    if ch == '\\':
                        escaped = True
                    elif ch == '"':
                        in_string = False
                elif ch == '"':
                    in_string = True
                elif ch == '{':
                    brace_count += 1
                elif ch == '}':
                    brace_count -= 1
                i += 1

            if brace_count == 0:
                json_str = buffer[obj_start:i]
                try:
                    json_obj = json.loads(json_str)
                    content = content_extractor(json_obj)
                    if content:
                        yield content
                except json.JSONDecodeError:
                    # Malformed object: skip it and keep scanning.
                    pass
                start_index = i
            else:
                # Object is incomplete; wait for more data.
                buffer = buffer[start_index:]
                break
|
|
| |
class Tool(BaseModel):
    """OpenAI-style tool descriptor (not referenced elsewhere in this module)."""
    # Tool category; defaults to "function" per the OpenAI schema.
    type: str = "function"
    # Free-form function definition payload.
    function: Dict[str, Any]
|
|
class ChatCompletionMessage(BaseModel):
    """A single message in an OpenAI-style chat completion response."""
    role: str
    content: Optional[str] = None
    # Populated only when the model requests tool invocations.
    tool_calls: Optional[List[Dict]] = None
|
|
class Choice(BaseModel):
    """One completion choice in a non-streaming response."""
    index: int
    message: Optional[ChatCompletionMessage] = None
    finish_reason: Optional[str] = None
    # Present for OpenAI-schema compatibility; non-streaming responses here
    # leave it as an empty dict.
    delta: Optional[Dict] = Field(default_factory=dict)
|
|
class ChoiceDelta(BaseModel):
    """Incremental message fragment inside a streaming chunk."""
    content: Optional[str] = None
    role: Optional[str] = None
|
|
class ChoiceStreaming(BaseModel):
    """One choice entry in a streaming chunk (carries a delta, not a message)."""
    index: int
    delta: ChoiceDelta
    # None for content chunks; "stop" on the terminal chunk.
    finish_reason: Optional[str] = None
|
|
class CompletionUsage(BaseModel):
    """Token accounting for a completion.

    NOTE(review): this provider fills these with whitespace word counts
    (see Completions._create_non_stream), not real tokenizer counts.
    """
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
|
|
class ChatCompletion(BaseModel):
    """Non-streaming response in the OpenAI `chat.completion` schema."""
    id: str
    choices: List[Choice]
    # Unix timestamp (int(time.time()) at request time).
    created: int
    model: str
    object: str = "chat.completion"
    usage: CompletionUsage
|
|
class ChatCompletionChunk(BaseModel):
    """Streaming response chunk in the OpenAI `chat.completion.chunk` schema."""
    id: str
    choices: List[ChoiceStreaming]
    # Unix timestamp shared by all chunks of one request.
    created: int
    model: str
    object: str = "chat.completion.chunk"
    # Usage is never attached to chunks by this provider.
    usage: Optional[CompletionUsage] = None
|
|
| |
class BaseCompletions:
    """Base for completion namespaces; keeps a reference to the owning client."""
    def __init__(self, client: Any):
        self._client = client
|
|
class BaseChat:
    """Base chat namespace wiring a `completions` attribute to the client."""
    def __init__(self, client: Any):
        # `Completions` is defined later in the module; the name is resolved
        # at call time, so the forward reference is safe.
        self.completions = Completions(client)
|
|
class OpenAICompatibleProvider:
    """Marker base class for providers exposing an OpenAI-compatible surface."""
    def __init__(self, **kwargs: Any):
        # Intentionally a no-op; subclasses perform their own setup.
        pass
|
|
| |
# Optional dependency: fall back to None when webscout is unavailable.
# NOTE(review): LitAgent is never referenced in this module — confirm it is
# still needed before removing the guard.
try:
    from webscout.litagent import LitAgent
except ImportError:
    LitAgent = None
|
|
| |
|
|
class Completions(BaseCompletions):
    """OpenAI-style `completions` namespace that talks to the Qodo API."""

    def create(
        self,
        *,
        model: str,
        messages: List[Dict[str, Any]],
        stream: bool = False,
        **kwargs: Any
    ) -> Union[ChatCompletion, Generator[ChatCompletionChunk, None, None]]:
        """
        Creates a model response for the given chat conversation.
        Mimics openai.chat.completions.create

        Note: only the most recent "user" message is forwarded to Qodo;
        earlier conversation turns are not included in the payload.

        Raises:
            ValueError: if `messages` contains no user message.
        """
        # Qodo accepts a single prompt string, so pick the latest user turn.
        user_prompt = ""
        for message in reversed(messages):
            if message.get("role") == "user":
                user_prompt = message.get("content", "")
                break

        if not user_prompt:
            raise ValueError("No user message found in messages")

        payload = self._client._build_payload(user_prompt, model)
        payload["stream"] = stream
        payload["custom_model"] = model

        request_id = f"chatcmpl-{uuid.uuid4()}"
        created_time = int(time.time())

        if stream:
            return self._create_stream(request_id, created_time, model, payload, user_prompt)
        else:
            return self._create_non_stream(request_id, created_time, model, payload, user_prompt)

    def _create_stream(
        self, request_id: str, created_time: int, model: str, payload: Dict[str, Any], user_prompt: str
    ) -> Generator[ChatCompletionChunk, None, None]:
        """Yield ChatCompletionChunk objects for a streaming request.

        A terminal chunk with finish_reason="stop" is always emitted after the
        content chunks. Because this is a generator, the HTTP request happens
        lazily on first iteration.
        """
        try:
            response = self._client.session.post(
                self._client.url,
                json=payload,
                stream=True,
                timeout=self._client.timeout,
                impersonate="chrome110"
            )

            if response.status_code == 401:
                raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.")
            elif response.status_code != 200:
                raise IOError(f"Qodo request failed with status code {response.status_code}: {response.text}")

            # Whitespace word counts stand in for token counts; streaming
            # responses do not attach usage, so these are informational only.
            prompt_tokens = len(user_prompt.split())
            completion_tokens = 0

            processed_stream = sanitize_stream(
                data=response.iter_content(chunk_size=None),
                content_extractor=QodoAI._qodo_extractor
            )

            for content_chunk in processed_stream:
                if content_chunk:
                    completion_tokens += len(content_chunk.split())

                    delta = ChoiceDelta(content=content_chunk, role="assistant")
                    choice = ChoiceStreaming(index=0, delta=delta, finish_reason=None)
                    chunk = ChatCompletionChunk(id=request_id, choices=[choice], created=created_time, model=model)
                    yield chunk

            # Terminal chunk signalling the end of the stream.
            final_choice = ChoiceStreaming(index=0, delta=ChoiceDelta(), finish_reason="stop")
            yield ChatCompletionChunk(id=request_id, choices=[final_choice], created=created_time, model=model)

        except exceptions.FailedToGenerateResponseError:
            # Bug fix: previously the generic handler below caught this
            # exception (e.g. the 401 invalid-key error raised above) and
            # rewrapped it with a misleading "unexpected error" message.
            raise
        except CurlError as e:
            raise exceptions.FailedToGenerateResponseError(f"Request failed (CurlError): {e}")
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"An unexpected error occurred ({type(e).__name__}): {e}")

    def _create_non_stream(
        self, request_id: str, created_time: int, model: str, payload: Dict[str, Any], user_prompt: str
    ) -> ChatCompletion:
        """Perform a non-streaming request and assemble a full ChatCompletion.

        Token counts are whitespace word counts, not tokenizer counts.
        """
        try:
            payload["stream"] = False
            response = self._client.session.post(
                self._client.url,
                json=payload,
                timeout=self._client.timeout,
                impersonate="chrome110"
            )

            if response.status_code == 401:
                raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.")
            elif response.status_code != 200:
                raise IOError(f"Qodo request failed with status code {response.status_code}: {response.text}")

            # The body is a concatenation of JSON objects. Reuse the streaming
            # parser instead of the previous ad-hoc per-line brace counter,
            # which required objects to be newline-aligned.
            full_response = "".join(
                sanitize_stream(
                    iter([response.text.encode("utf-8")]),
                    content_extractor=QodoAI._qodo_extractor,
                )
            )

            prompt_tokens = len(user_prompt.split())
            completion_tokens = len(full_response.split())
            total_tokens = prompt_tokens + completion_tokens

            message = ChatCompletionMessage(role="assistant", content=full_response)
            choice = Choice(index=0, message=message, finish_reason="stop")
            usage = CompletionUsage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=total_tokens)
            return ChatCompletion(id=request_id, choices=[choice], created=created_time, model=model, usage=usage)

        except exceptions.FailedToGenerateResponseError:
            # Preserve the specific error rather than rewrapping it below.
            raise
        except CurlError as e:
            raise exceptions.FailedToGenerateResponseError(f"Request failed (CurlError): {e}")
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Request failed ({type(e).__name__}): {e}")
|
|
class Chat(BaseChat):
    """Chat namespace bound to a concrete QodoAI client."""
    def __init__(self, client: 'QodoAI'):
        self.completions = Completions(client)
|
|
class QodoAI(OpenAICompatibleProvider):
    """Client for the Qodo AI CLI API exposing an OpenAI-style `chat` surface."""

    # Model ids this wrapper advertises via /v1/models.
    AVAILABLE_MODELS = ["gpt-4.1", "gpt-4o", "o3", "o4-mini", "claude-4-sonnet", "gemini-2.5-pro"]

    def __init__(self, api_key: str, **kwargs: Any):
        """Create a session, fetch a session id, and prepare default headers.

        Raises:
            exceptions.FailedToGenerateResponseError: if the session id cannot
                be obtained (invalid key, HTTP error, or connectivity failure).
        """
        super().__init__(api_key=api_key, **kwargs)

        self.url = QODO_URL
        self.info_url = QODO_INFO_URL
        self.timeout = 600  # seconds; generous because completions can be slow
        self.api_key = api_key

        self.user_agent = "axios/1.10.0"
        # Performs a network round-trip; may raise (see docstring).
        self.session_id = self._get_session_id()
        self.request_id = str(uuid.uuid4())

        self.headers = {
            "Accept": "text/plain", "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "en-US,en;q=0.9", "Authorization": f"Bearer {self.api_key}",
            "Connection": "close", "Content-Type": "application/json",
            "host": "api.cli.qodo.ai", "Request-id": self.request_id,
            "Session-id": self.session_id, "User-Agent": self.user_agent,
        }

        self.session = Session()
        self.session.headers.update(self.headers)
        self.chat = Chat(self)

    @staticmethod
    def _qodo_extractor(chunk: Union[str, Dict[str, Any]]) -> Optional[str]:
        """Pull assistant text out of one parsed Qodo event dict, or None.

        Prefers `data.tool_args.content`, falls back to `data.content`.
        """
        if isinstance(chunk, dict):
            data = chunk.get("data", {})
            if isinstance(data, dict):
                tool_args = data.get("tool_args", {})
                if isinstance(tool_args, dict) and "content" in tool_args:
                    return tool_args.get("content")
                if "content" in data:
                    return data["content"]
        return None

    def _get_session_id(self) -> str:
        """Fetch a session id from the info endpoint.

        Returns a server-provided "session-id", or a generated fallback when
        the field is missing from a 200 response.

        Raises:
            exceptions.FailedToGenerateResponseError: on auth failure, a
                non-200 status, or any connection error.
        """
        try:
            temp_session = Session()
            temp_headers = {
                "Authorization": f"Bearer {self.api_key}",
                "User-Agent": self.user_agent,
            }
            temp_session.headers.update(temp_headers)

            response = temp_session.get(self.info_url, timeout=self.timeout, impersonate="chrome110")

            if response.status_code == 200:
                return response.json().get("session-id", f"fallback-{uuid.uuid4()}")
            elif response.status_code == 401:
                raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key. Please check your QODO_API_KEY environment variable.")
            else:
                raise exceptions.FailedToGenerateResponseError(f"Failed to get session_id from Qodo: HTTP {response.status_code}")
        except exceptions.FailedToGenerateResponseError:
            # Bug fix: the broad handler below used to catch the specific
            # errors raised above (e.g. invalid API key) and mask them as a
            # generic connection failure. Re-raise them unchanged.
            raise
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Failed to connect to Qodo API to get session_id: {e}")

    def _build_payload(self, prompt: str, model: str) -> Dict[str, Any]:
        """Assemble the Qodo CLI request body for a single-prompt request."""
        return {
            "agent_type": "cli", "session_id": self.session_id,
            "user_data": {"extension_version": "0.7.2", "os_platform": "win32"},
            "tools": {"web_search": []}, "user_request": prompt,
            "execution_strategy": "act", "custom_model": model, "stream": True
        }
| |
| |
|
|
# FastAPI application exposing the OpenAI-compatible endpoints.
app = FastAPI(
    title="QodoAI OpenAI-Compatible API",
    description="Provides an OpenAI-compatible interface for the QodoAI service.",
    version="1.0.0"
)
|
|
| |
# Build the singleton QodoAI client at import time. On failure the module
# still imports with client = None; the startup hook then refuses to serve.
try:
    client = QodoAI(api_key=QODO_API_KEY)
except exceptions.FailedToGenerateResponseError as e:
    print(f"FATAL: Could not initialize QodoAI client: {e}")
    print("Please ensure the QODO_API_KEY environment variable is set correctly.")
    client = None
|
|
| |
|
|
class Model(BaseModel):
    """OpenAI-style model descriptor returned by GET /v1/models."""
    id: str
    object: str = "model"
    # Creation time defaults to "now" since Qodo provides no real timestamp.
    created: int = Field(default_factory=lambda: int(time.time()))
    owned_by: str = "qodoai"
|
|
class ModelList(BaseModel):
    """OpenAI-style list envelope for GET /v1/models."""
    object: str = "list"
    data: List[Model]
|
|
class ChatCompletionRequest(BaseModel):
    """Request body for POST /v1/chat/completions (OpenAI schema subset)."""
    model: str
    messages: List[Dict[str, Any]]
    # The fields below are accepted for OpenAI compatibility; they are passed
    # through as kwargs, which the Qodo-backed Completions.create ignores.
    max_tokens: Optional[int] = 2049
    stream: bool = False
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[str] = None
| |
| |
|
|
@app.on_event("startup")
async def startup_event():
    """Abort startup when the QodoAI client failed to initialize at import time."""
    if client is None:
        # Raising here prevents uvicorn from serving requests with a dead client.
        raise RuntimeError("QodoAI client could not be initialized. Check API key and connectivity.")
    print("QodoAI client initialized successfully.")
|
|
@app.get("/v1/models", response_model=ModelList)
async def list_models():
    """Return the models offered by the QodoAI provider in OpenAI list format."""
    return ModelList(data=[Model(id=m) for m in QodoAI.AVAILABLE_MODELS])
|
|
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """Creates a chat completion, supporting both streaming and non-streaming modes.

    The QodoAI client is synchronous (curl_cffi), so every blocking call is
    offloaded with asyncio.to_thread; previously the upstream request ran
    directly on the event loop and stalled all other requests for its duration.
    """
    if client is None:
        raise HTTPException(status_code=500, detail="QodoAI client is not available.")

    params = request.model_dump(exclude_none=True)

    try:
        if request.stream:
            async def stream_generator():
                # Sentinel distinguishes generator exhaustion from a falsy chunk.
                _done = object()
                try:
                    generator = client.chat.completions.create(**params)
                    while True:
                        # next() performs blocking network reads; run it in a
                        # worker thread to keep the event loop responsive.
                        chunk = await asyncio.to_thread(next, generator, _done)
                        if chunk is _done:
                            break
                        yield f"data: {chunk.model_dump_json()}\n\n"
                    yield "data: [DONE]\n\n"
                except exceptions.FailedToGenerateResponseError as e:
                    # Surface upstream errors as an SSE payload, then terminate.
                    error_payload = {"error": {"message": str(e), "type": "api_error"}}
                    yield f"data: {json.dumps(error_payload)}\n\n"
                    yield "data: [DONE]\n\n"

            return StreamingResponse(stream_generator(), media_type="text/event-stream")
        else:
            # Full blocking round-trip; offload to a worker thread.
            response = await asyncio.to_thread(client.chat.completions.create, **params)
            return JSONResponse(content=response.model_dump())

    except exceptions.FailedToGenerateResponseError as e:
        raise HTTPException(status_code=500, detail=str(e))
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
# Run a development server when this module is executed directly.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)