Spaces:
Sleeping
Sleeping
| import os | |
| import httpx | |
| import json | |
| import traceback | |
| from typing import AsyncGenerator, List, Dict | |
| from config import logger | |
| # ===== OpenAI ===== | |
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream an OpenAI chat completion as text fragments.

    Args:
        query: The new user message to send.
        history: Earlier turns. Every entry must contain a "user" key; an
            entry may also contain an "openai" key with the assistant reply
            for that turn, which is replayed as an assistant message.

    Yields:
        Non-empty content fragments in arrival order. Failures are reported
        in-band as a yielded "Error: ..." or "[OpenAI Error]: ..." string
        rather than raised.

    Raises:
        KeyError: If a history entry has no "user" key (the message list is
            built outside the error-handling block).
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return

    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg.get("openai"):
            messages.append({"role": "assistant", "content": msg["openai"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True,
    }

    try:
        # NOTE(review): the client is created with httpx's default settings
        # (including timeouts); confirm they suit long generations.
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    buffer += chunk
                    # A network chunk may hold several lines or end mid-line,
                    # so only complete lines are consumed from the buffer.
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        if not line.startswith("data: "):
                            continue
                        data = line[6:].strip()
                        if not data:
                            continue
                        if data == "[DONE]":
                            # End-of-stream sentinel: finish the generator
                            # instead of only leaving the inner line loop.
                            return
                        try:
                            event = json.loads(data)
                            choices = event["choices"]
                            if not choices:
                                # An event with no choices carries no text.
                                continue
                            delta = choices[0].get("delta") or {}
                            content = delta.get("content")
                            # Skip null/empty content so only real text
                            # (never None) reaches the consumer.
                            if content:
                                yield content
                        except Exception as e:
                            logger.error(f"OpenAI parse error: {e}")
                            yield f"[OpenAI Error]: {e}"
    except Exception as e:
        logger.error(f"OpenAI API error: {e}")
        yield f"[OpenAI Error]: {e}"
| # ===== Anthropic ===== | |
async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream an Anthropic Messages API reply as text fragments.

    Args:
        query: The new user message to send.
        history: Earlier turns. Every entry must contain a "user" key; an
            entry may also contain an "anthropic" key with the assistant
            reply for that turn, which is replayed as an assistant message.

    Yields:
        Non-empty text fragments taken from "content_block_delta" events.
        Failures, including "error" events received mid-stream, are reported
        in-band as a yielded "Error: ..." or "[Anthropic Error]: ..." string
        rather than raised.

    Raises:
        KeyError: If a history entry has no "user" key (the message list is
            built outside the error-handling block).
    """
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        yield "Error: Anthropic API key not provided."
        return

    messages = []
    for msg in history:
        messages.append({"role": "user", "content": msg["user"]})
        if msg.get("anthropic"):
            messages.append({"role": "assistant", "content": msg["anthropic"]})
    messages.append({"role": "user", "content": query})

    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 1024,
        "messages": messages,
        "stream": True,
    }

    try:
        # NOTE(review): the client is created with httpx's default settings
        # (including timeouts); confirm they suit long generations.
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                "https://api.anthropic.com/v1/messages",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    buffer += chunk
                    # A network chunk may hold several lines or end mid-line,
                    # so only complete lines are consumed from the buffer.
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        if not line.startswith("data: "):
                            continue
                        data = line[6:].strip()
                        if not data:
                            continue
                        if data == "[DONE]":
                            # Sentinel check kept from the original code;
                            # treated as end of stream.
                            return
                        try:
                            event = json.loads(data)
                            event_type = event.get("type")
                            if event_type == "content_block_delta" and "delta" in event:
                                text = event["delta"].get("text", "")
                                # Deltas without text (e.g. non-text block
                                # updates) would otherwise yield "".
                                if text:
                                    yield text
                            elif event_type == "error":
                                # Surface mid-stream failures instead of
                                # silently truncating the reply.
                                detail = (event.get("error") or {}).get("message", data)
                                logger.error(f"Anthropic API error: {detail}")
                                yield f"[Anthropic Error]: {detail}"
                            elif event_type == "message_stop":
                                # Final event of a message: nothing more to read.
                                return
                        except Exception as e:
                            logger.error(f"Anthropic parse error: {e}")
                            yield f"[Anthropic Error]: {e}"
    except Exception as e:
        logger.error(f"Anthropic API error: {e}")
        yield f"[Anthropic Error]: {e}"
| # ===== Gemini ===== | |
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a Gemini reply as text fragments.

    The conversation is flattened into one "User: ... / Assistant: ..."
    transcript and sent as a single text part.

    Args:
        query: The new user message to send.
        history: Earlier turns. Every entry must contain a "user" key; an
            entry may also contain a "gemini" key with the assistant reply
            for that turn.

    Yields:
        Non-empty text parts from the first candidate of each streamed
        response object, as soon as that object is complete. Failures are
        reported in-band as a yielded "Error: ..." or "[Gemini Error]: ..."
        string rather than raised.

    Raises:
        KeyError: If a history entry has no "user" key (the prompt is built
            outside the error-handling block).
    """
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        yield "Error: Gemini API key not provided."
        return

    transcript = []
    for msg in history:
        transcript.append(f"User: {msg['user']}\n")
        if msg.get("gemini"):
            transcript.append(f"Assistant: {msg['gemini']}\n")
    transcript.append(f"User: {query}\n")
    full_prompt = "".join(transcript)

    # The key travels in a header, not the query string: httpx error messages
    # include the request URL, and those messages are logged and yielded to
    # the caller below, which would otherwise expose the key.
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": gemini_api_key,
    }
    payload = {
        "contents": [{"parts": [{"text": full_prompt}]}]
    }
    url = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        "gemini-1.5-flash:streamGenerateContent"
    )
    # Characters that can separate top-level objects in the streamed body
    # (array brackets, commas, whitespace).
    separators = "[], \t\r\n"
    decoder = json.JSONDecoder()

    try:
        # NOTE(review): the client is created with httpx's default settings
        # (including timeouts); confirm they suit long generations.
        async with httpx.AsyncClient() as client:
            async with client.stream("POST", url, headers=headers, json=payload) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    buffer += chunk
                    # Decode every complete object currently buffered, so text
                    # is emitted as it arrives instead of only once the whole
                    # body has been received.
                    while True:
                        buffer = buffer.lstrip(separators)
                        if not buffer:
                            break
                        try:
                            obj, end = decoder.raw_decode(buffer)
                        except json.JSONDecodeError:
                            break  # incomplete object: wait for more data
                        buffer = buffer[end:]
                        if not isinstance(obj, dict):
                            continue
                        candidates = obj.get("candidates", [])
                        if not candidates:
                            continue
                        parts = candidates[0].get("content", {}).get("parts", [])
                        for part in parts:
                            text = part.get("text", "")
                            if text:
                                yield text
                if buffer.strip(separators):
                    # Leftover data that never formed a complete object would
                    # otherwise be dropped without any trace.
                    logger.error("Gemini stream ended with unparsed data")
    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        yield f"[Gemini Error]: {e}"