# main.py — OpenAI-compatible FastAPI gateway for the QodoAI service.
import os
import json
import time
import uuid
from typing import List, Dict, Optional, Union, Generator, Any
# --- Core Dependencies ---
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from curl_cffi.requests import Session
from curl_cffi import CurlError
# --- Environment Configuration ---
QODO_API_KEY = os.getenv("QODO_API_KEY", "useme")
QODO_URL = os.getenv("QODO_URL", "https://hello.com")
QODO_INFO_URL = os.getenv("QODO_INFO_URL", "https://openai.com")
# --- Recreated/Mocked webscout Dependencies ---
# This section recreates the necessary classes and functions
# to make the QodoAI provider self-contained.
# webscout.exceptions
class exceptions:
    """Namespace class mirroring ``webscout.exceptions`` so the provider
    code below works without the webscout package installed."""

    class FailedToGenerateResponseError(Exception):
        """Raised when the upstream QodoAI API cannot produce a response."""
# webscout.AIutel.sanitize_stream
def sanitize_stream(data: Generator[bytes, None, None], content_extractor: callable, **kwargs: Any) -> Generator[str, None, None]:
    """
    Incrementally parse a stream of byte chunks containing concatenated JSON
    objects and yield whatever ``content_extractor`` pulls out of each one.

    Bug fix: the scanner is now string-aware. The previous brace-only
    counter miscounted `{`/`}` characters that appear inside JSON string
    values (and escaped quotes), which corrupted object boundary detection
    whenever the model's content contained unbalanced braces.

    Args:
        data: Iterable of raw byte chunks (e.g. ``response.iter_content``).
        content_extractor: Callable mapping a decoded JSON object to the
            text to yield; falsy results are skipped.
        **kwargs: Accepted for signature compatibility; ignored.

    Yields:
        Non-empty strings produced by ``content_extractor``.
    """
    buffer = ""
    for byte_chunk in data:
        buffer += byte_chunk.decode('utf-8', errors='ignore')
        start_index = 0
        while True:
            # Find the start of the next potential JSON object.
            try:
                obj_start = buffer.index('{', start_index)
            except ValueError:
                # No more objects in buffer; keep the remainder for the next chunk.
                buffer = buffer[start_index:]
                break
            # Scan for the matching close brace, tracking JSON string
            # literals (and backslash escapes) so braces inside strings
            # are not counted as structure.
            brace_count = 1
            in_string = False
            escaped = False
            i = obj_start + 1
            while i < len(buffer) and brace_count > 0:
                ch = buffer[i]
                if escaped:
                    escaped = False
                elif ch == '\\':
                    escaped = True
                elif ch == '"':
                    in_string = not in_string
                elif not in_string:
                    if ch == '{':
                        brace_count += 1
                    elif ch == '}':
                        brace_count -= 1
                i += 1
            if brace_count == 0:  # Found a complete object
                json_str = buffer[obj_start:i]
                try:
                    json_obj = json.loads(json_str)
                    content = content_extractor(json_obj)
                    if content:
                        yield content
                except json.JSONDecodeError:
                    pass  # Skip malformed JSON
                start_index = i  # Move past the processed object
            else:
                # Incomplete object; wait for more data.
                buffer = buffer[start_index:]
                break
# webscout.Provider.OPENAI.utils (Pydantic Models)
class Tool(BaseModel):
type: str = "function"
function: Dict[str, Any]
class ChatCompletionMessage(BaseModel):
role: str
content: Optional[str] = None
tool_calls: Optional[List[Dict]] = None
class Choice(BaseModel):
index: int
message: Optional[ChatCompletionMessage] = None
finish_reason: Optional[str] = None
delta: Optional[Dict] = Field(default_factory=dict)
class ChoiceDelta(BaseModel):
content: Optional[str] = None
role: Optional[str] = None
class ChoiceStreaming(BaseModel):
index: int
delta: ChoiceDelta
finish_reason: Optional[str] = None
class CompletionUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
class ChatCompletion(BaseModel):
id: str
choices: List[Choice]
created: int
model: str
object: str = "chat.completion"
usage: CompletionUsage
class ChatCompletionChunk(BaseModel):
id: str
choices: List[ChoiceStreaming]
created: int
model: str
object: str = "chat.completion.chunk"
usage: Optional[CompletionUsage] = None
# webscout.Provider.OPENAI.base
class BaseCompletions:
def __init__(self, client: Any):
self._client = client
class BaseChat:
def __init__(self, client: Any):
self.completions = Completions(client)
class OpenAICompatibleProvider:
def __init__(self, **kwargs: Any):
pass
# Attempt to import LitAgent, fallback if not available
try:
from webscout.litagent import LitAgent
except ImportError:
LitAgent = None
# --- QodoAI Provider Code (from the prompt) ---
class Completions(BaseCompletions):
    """OpenAI-style ``chat.completions`` namespace backed by the QodoAI API."""

    def create(
        self,
        *,
        model: str,
        messages: List[Dict[str, Any]],
        stream: bool = False,
        **kwargs: Any
    ) -> Union[ChatCompletion, Generator[ChatCompletionChunk, None, None]]:
        """
        Creates a model response for the given chat conversation.
        Mimics openai.chat.completions.create.

        Only the most recent "user" message is forwarded: the Qodo agent
        endpoint accepts a single prompt, not a full message history.

        Args:
            model: Target model name.
            messages: OpenAI-style message dicts; must include a user message.
            stream: Stream chunks when True, else return a full completion.
            **kwargs: Ignored; accepted for OpenAI-client compatibility.

        Raises:
            ValueError: If no user message is found in ``messages``.
        """
        user_prompt = ""
        for message in reversed(messages):
            if message.get("role") == "user":
                user_prompt = message.get("content", "")
                break
        if not user_prompt:
            raise ValueError("No user message found in messages")

        payload = self._client._build_payload(user_prompt, model)
        # _build_payload hard-codes stream=True and already sets custom_model,
        # so only the stream flag needs to reflect the caller's choice.
        payload["stream"] = stream

        request_id = f"chatcmpl-{uuid.uuid4()}"
        created_time = int(time.time())
        if stream:
            return self._create_stream(request_id, created_time, model, payload, user_prompt)
        return self._create_non_stream(request_id, created_time, model, payload, user_prompt)

    def _check_response(self, response: Any) -> None:
        """Map upstream HTTP errors to exceptions (shared by both paths)."""
        if response.status_code == 401:
            raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key provided.")
        if response.status_code != 200:
            # Wrapped into FailedToGenerateResponseError by the callers'
            # generic exception handlers.
            raise IOError(f"Qodo request failed with status code {response.status_code}: {response.text}")

    def _create_stream(
        self, request_id: str, created_time: int, model: str, payload: Dict[str, Any], user_prompt: str
    ) -> Generator[ChatCompletionChunk, None, None]:
        """Yield ChatCompletionChunk objects as content arrives from QodoAI."""
        try:
            response = self._client.session.post(
                self._client.url,
                json=payload,
                stream=True,
                timeout=self._client.timeout,
                impersonate="chrome110"
            )
            self._check_response(response)

            completion_tokens = 0
            processed_stream = sanitize_stream(
                data=response.iter_content(chunk_size=None),
                content_extractor=QodoAI._qodo_extractor
            )
            for content_chunk in processed_stream:
                if content_chunk:
                    completion_tokens += len(content_chunk.split())
                    delta = ChoiceDelta(content=content_chunk, role="assistant")
                    choice = ChoiceStreaming(index=0, delta=delta, finish_reason=None)
                    yield ChatCompletionChunk(id=request_id, choices=[choice], created=created_time, model=model)

            # Terminal chunk signalling completion, as the OpenAI API does.
            final_choice = ChoiceStreaming(index=0, delta=ChoiceDelta(), finish_reason="stop")
            yield ChatCompletionChunk(id=request_id, choices=[final_choice], created=created_time, model=model)
        except exceptions.FailedToGenerateResponseError:
            # Bug fix: propagate our own errors unchanged. Previously the
            # generic handler below re-wrapped them, mangling the specific
            # 401 message.
            raise
        except CurlError as e:
            raise exceptions.FailedToGenerateResponseError(f"Request failed (CurlError): {e}")
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"An unexpected error occurred ({type(e).__name__}): {e}")

    def _create_non_stream(
        self, request_id: str, created_time: int, model: str, payload: Dict[str, Any], user_prompt: str
    ) -> ChatCompletion:
        """Collect the full response body and return a single ChatCompletion."""
        try:
            payload["stream"] = False
            response = self._client.session.post(
                self._client.url,
                json=payload,
                timeout=self._client.timeout,
                impersonate="chrome110"
            )
            self._check_response(response)

            # The body is a sequence of concatenated JSON objects. Reuse the
            # streaming parser on the whole body so both paths split objects
            # identically (the previous line/brace splitter here broke on
            # braces inside string values and on objects sharing a line).
            pieces = sanitize_stream(
                data=iter([response.content]),
                content_extractor=QodoAI._qodo_extractor
            )
            full_response = "".join(pieces)

            # "Tokens" are approximated by whitespace word counts.
            prompt_tokens = len(user_prompt.split())
            completion_tokens = len(full_response.split())
            total_tokens = prompt_tokens + completion_tokens

            message = ChatCompletionMessage(role="assistant", content=full_response)
            choice = Choice(index=0, message=message, finish_reason="stop")
            usage = CompletionUsage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=total_tokens)
            return ChatCompletion(id=request_id, choices=[choice], created=created_time, model=model, usage=usage)
        except exceptions.FailedToGenerateResponseError:
            # Bug fix: see _create_stream — do not re-wrap our own errors.
            raise
        except CurlError as e:
            raise exceptions.FailedToGenerateResponseError(f"Request failed (CurlError): {e}")
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Request failed ({type(e).__name__}): {e}")
class Chat(BaseChat):
    """Container exposing the ``completions`` namespace, mirroring openai.chat."""

    def __init__(self, client: 'QodoAI'):
        # BaseChat already wires self.completions = Completions(client).
        super().__init__(client)
class QodoAI(OpenAICompatibleProvider):
    """
    OpenAI-compatible provider for the QodoAI CLI agent API.

    NOTE: construction performs a network round-trip (``_get_session_id``),
    so instantiation can raise ``exceptions.FailedToGenerateResponseError``
    immediately on a bad key or connectivity problem.
    """

    AVAILABLE_MODELS = ["gpt-4.1", "gpt-4o", "o3", "o4-mini", "claude-4-sonnet", "gemini-2.5-pro"]

    def __init__(self, api_key: str, **kwargs: Any):
        """
        Args:
            api_key: Bearer token for the Qodo API.
            **kwargs: Passed through to the (no-op) base class.

        Raises:
            exceptions.FailedToGenerateResponseError: If the session id
                cannot be obtained (auth failure, non-200, connection error).
        """
        super().__init__(api_key=api_key, **kwargs)
        self.url = QODO_URL
        self.info_url = QODO_INFO_URL
        self.timeout = 600  # seconds; agent runs can be long
        self.api_key = api_key
        self.user_agent = "axios/1.10.0"
        self.session_id = self._get_session_id()  # network call
        self.request_id = str(uuid.uuid4())
        self.headers = {
            "Accept": "text/plain", "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "en-US,en;q=0.9", "Authorization": f"Bearer {self.api_key}",
            "Connection": "close", "Content-Type": "application/json",
            "host": "api.cli.qodo.ai", "Request-id": self.request_id,
            "Session-id": self.session_id, "User-Agent": self.user_agent,
        }
        self.session = Session()
        self.session.headers.update(self.headers)
        self.chat = Chat(self)

    @staticmethod
    def _qodo_extractor(chunk: Union[str, Dict[str, Any]]) -> Optional[str]:
        """Pull displayable text out of one decoded QodoAI stream object."""
        if isinstance(chunk, dict):
            data = chunk.get("data", {})
            if isinstance(data, dict):
                # Tool invocations carry their text under data.tool_args.content.
                tool_args = data.get("tool_args", {})
                if isinstance(tool_args, dict) and "content" in tool_args:
                    return tool_args.get("content")
                # Plain model output carries it directly under data.content.
                if "content" in data:
                    return data["content"]
        return None

    def _get_session_id(self) -> str:
        """
        Fetch a session id from the info endpoint, falling back to a local
        UUID when the 200 response omits a "session-id" field.

        Raises:
            exceptions.FailedToGenerateResponseError: On auth failure,
                non-200 status, or any connection error.
        """
        try:
            temp_session = Session()
            temp_session.headers.update({
                "Authorization": f"Bearer {self.api_key}",
                "User-Agent": self.user_agent,
            })
            response = temp_session.get(self.info_url, timeout=self.timeout, impersonate="chrome110")
            if response.status_code == 200:
                return response.json().get("session-id", f"fallback-{uuid.uuid4()}")
            elif response.status_code == 401:
                raise exceptions.FailedToGenerateResponseError("Invalid Qodo API key. Please check your QODO_API_KEY environment variable.")
            else:
                raise exceptions.FailedToGenerateResponseError(f"Failed to get session_id from Qodo: HTTP {response.status_code}")
        except exceptions.FailedToGenerateResponseError:
            # Bug fix: the generic handler below used to re-wrap these,
            # replacing the specific 401 / HTTP-status messages with a
            # generic "failed to connect" message.
            raise
        except Exception as e:
            raise exceptions.FailedToGenerateResponseError(f"Failed to connect to Qodo API to get session_id: {e}")

    def _build_payload(self, prompt: str, model: str) -> Dict[str, Any]:
        """Assemble the agent request body sent to the Qodo CLI endpoint."""
        return {
            "agent_type": "cli", "session_id": self.session_id,
            "user_data": {"extension_version": "0.7.2", "os_platform": "win32"},
            "tools": {"web_search": []}, "user_request": prompt,
            "execution_strategy": "act", "custom_model": model, "stream": True
        }
# --- FastAPI Application ---
app = FastAPI(
title="QodoAI OpenAI-Compatible API",
description="Provides an OpenAI-compatible interface for the QodoAI service.",
version="1.0.0"
)
# Initialize the client at startup
try:
client = QodoAI(api_key=QODO_API_KEY)
except exceptions.FailedToGenerateResponseError as e:
print(f"FATAL: Could not initialize QodoAI client: {e}")
print("Please ensure the QODO_API_KEY environment variable is set correctly.")
client = None
# --- API Models ---
class Model(BaseModel):
id: str
object: str = "model"
created: int = Field(default_factory=lambda: int(time.time()))
owned_by: str = "qodoai"
class ModelList(BaseModel):
object: str = "list"
data: List[Model]
class ChatCompletionRequest(BaseModel):
model: str
messages: List[Dict[str, Any]]
max_tokens: Optional[int] = 2049
stream: bool = False
temperature: Optional[float] = None
top_p: Optional[float] = None
tools: Optional[List[Dict[str, Any]]] = None
tool_choice: Optional[str] = None
# --- API Endpoints ---
@app.on_event("startup")
async def startup_event():
if client is None:
# This will prevent the app from starting if the client failed to init
raise RuntimeError("QodoAI client could not be initialized. Check API key and connectivity.")
print("QodoAI client initialized successfully.")
@app.get("/v1/models", response_model=ModelList)
async def list_models():
    """Lists the available models from the QodoAI provider."""
    return ModelList(data=[Model(id=name) for name in QodoAI.AVAILABLE_MODELS])
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
"""Creates a chat completion, supporting both streaming and non-streaming modes."""
if client is None:
raise HTTPException(status_code=500, detail="QodoAI client is not available.")
params = request.model_dump(exclude_none=True)
try:
if request.stream:
async def stream_generator():
try:
generator = client.chat.completions.create(**params)
for chunk in generator:
yield f"data: {chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
except exceptions.FailedToGenerateResponseError as e:
error_payload = {"error": {"message": str(e), "type": "api_error"}}
yield f"data: {json.dumps(error_payload)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(stream_generator(), media_type="text/event-stream")
else:
response = client.chat.completions.create(**params)
return JSONResponse(content=response.model_dump())
except exceptions.FailedToGenerateResponseError as e:
raise HTTPException(status_code=500, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)