import os import httpx import json import traceback from typing import AsyncGenerator, List, Dict from config import logger # ===== OpenAI ===== async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: openai_api_key = os.getenv("OPENAI_API_KEY") if not openai_api_key: logger.error("OpenAI API key not provided") yield "Error: OpenAI API key not provided." return messages = [] for msg in history: messages.append({"role": "user", "content": msg["user"]}) if msg.get("openai"): messages.append({"role": "assistant", "content": msg["openai"]}) messages.append({"role": "user", "content": query}) headers = { "Authorization": f"Bearer {openai_api_key}", "Content-Type": "application/json" } payload = { "model": "gpt-3.5-turbo", "messages": messages, "stream": True } try: async with httpx.AsyncClient() as client: async with client.stream("POST", "https://api.openai.com/v1/chat/completions", headers=headers, json=payload) as response: response.raise_for_status() buffer = "" async for chunk in response.aiter_text(): if chunk: buffer += chunk while "\n" in buffer: line, buffer = buffer.split("\n", 1) if line.startswith("data: "): data = line[6:] if data.strip() == "[DONE]": break if not data.strip(): continue try: json_data = json.loads(data) delta = json_data["choices"][0].get("delta", {}) if "content" in delta: yield delta["content"] except Exception as e: logger.error(f"OpenAI parse error: {e}") yield f"[OpenAI Error]: {e}" except Exception as e: logger.error(f"OpenAI API error: {e}") yield f"[OpenAI Error]: {e}" # ===== Anthropic ===== async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: anthropic_api_key = os.getenv("ANTHROPIC_API_KEY") if not anthropic_api_key: logger.error("Anthropic API key not provided") yield "Error: Anthropic API key not provided." return messages = [] for msg in history: messages.append({"role": "user", "content": msg["user"]}) if msg.get("anthropic"): messages.append({"role": "assistant", "content": msg["anthropic"]}) messages.append({"role": "user", "content": query}) headers = { "x-api-key": anthropic_api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" } payload = { "model": "claude-3-5-sonnet-20241022", "max_tokens": 1024, "messages": messages, "stream": True } try: async with httpx.AsyncClient() as client: async with client.stream("POST", "https://api.anthropic.com/v1/messages", headers=headers, json=payload) as response: response.raise_for_status() buffer = "" async for chunk in response.aiter_text(): if chunk: buffer += chunk while "\n" in buffer: line, buffer = buffer.split("\n", 1) if line.startswith("data: "): data = line[6:] if data.strip() == "[DONE]": break if not data.strip(): continue try: json_data = json.loads(data) if json_data.get("type") == "content_block_delta" and "delta" in json_data: yield json_data["delta"].get("text", "") except Exception as e: logger.error(f"Anthropic parse error: {e}") yield f"[Anthropic Error]: {e}" except Exception as e: logger.error(f"Anthropic API error: {e}") yield f"[Anthropic Error]: {e}" # ===== Gemini ===== async def ask_gemini(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]: gemini_api_key = os.getenv("GEMINI_API_KEY") if not gemini_api_key: logger.error("Gemini API key not provided") yield "Error: Gemini API key not provided." return history_text = "" for msg in history: history_text += f"User: {msg['user']}\n" if msg.get("gemini"): history_text += f"Assistant: {msg['gemini']}\n" full_prompt = f"{history_text}User: {query}\n" headers = {"Content-Type": "application/json"} payload = { "contents": [{"parts": [{"text": full_prompt}]}] } try: async with httpx.AsyncClient() as client: async with client.stream( "POST", f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key={gemini_api_key}", headers=headers, json=payload ) as response: response.raise_for_status() buffer = "" async for chunk in response.aiter_text(): if not chunk.strip(): continue buffer += chunk try: json_data = json.loads(buffer.strip(", \n")) buffer = "" # handle both list and dict format objects = json_data if isinstance(json_data, list) else [json_data] for obj in objects: candidates = obj.get("candidates", []) if candidates: parts = candidates[0].get("content", {}).get("parts", []) for part in parts: text = part.get("text", "") if text: yield text except json.JSONDecodeError: continue # wait for more data except Exception as e: logger.error(f"Gemini API error: {e}") yield f"[Gemini Error]: {e}"