# Author: Ganesh Chintalapati
# Multi-model chat with user login and historical dropdown (commit f45d8b2)
import os
import httpx
import json
from typing import AsyncGenerator, List, Dict
from config import logger
# ===== OpenAI =====
async def ask_openai(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a chat completion from the OpenAI API.

    Args:
        query: The new user message.
        history: Prior turns as dicts with "role" ("user"/"assistant")
            and "content" keys; other roles are ignored.

    Yields:
        Text fragments of the assistant's reply as they arrive, or a
        single error string if the API key is missing or the request fails.
    """
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        logger.error("OpenAI API key not provided")
        yield "Error: OpenAI API key not provided."
        return

    # Keep only user/assistant turns from history, then append the new query.
    messages = [
        {"role": msg["role"], "content": msg["content"]}
        for msg in history
        if msg.get("role") in ("user", "assistant")
    ]
    messages.append({"role": "user", "content": query})

    headers = {
        "Authorization": f"Bearer {openai_api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
        "stream": True,
    }
    try:
        # timeout=None: httpx's default 5s read timeout would abort
        # long-running SSE streams mid-response.
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                buffer = ""
                done = False
                async for chunk in response.aiter_text():
                    if not chunk:
                        continue
                    buffer += chunk
                    # SSE frames are newline-separated "data: ..." lines; a
                    # chunk may hold several complete lines plus a partial one.
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        if not line.startswith("data: "):
                            continue
                        data = line[6:].strip()
                        if data == "[DONE]":
                            # Fix: a bare `break` only exited the inner while
                            # loop; flag completion so the outer async-for
                            # stops consuming the stream too.
                            done = True
                            break
                        if not data:
                            continue
                        try:
                            json_data = json.loads(data)
                            delta = json_data["choices"][0].get("delta", {})
                            if "content" in delta:
                                yield delta["content"]
                        except Exception as e:
                            logger.error(f"OpenAI parse error: {e}")
                            yield f"[OpenAI Error]: {e}"
                    if done:
                        break
    except Exception as e:
        logger.error(f"OpenAI API error: {e}")
        yield f"[OpenAI Error]: {e}"
# ===== Anthropic =====
def _clean_anthropic_history(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Normalize chat history into the shape Anthropic's Messages API requires.

    The API expects strictly alternating user/assistant roles starting with
    'user', so this drops entries missing a role or content, drops a leading
    assistant turn, and skips consecutive same-role turns.
    """
    cleaned: List[Dict[str, str]] = []
    last_role = None
    for msg in history:
        role = msg.get("role")
        content = msg.get("content")
        if not role or not content:
            continue  # Skip invalid messages
        if role == last_role:
            logger.warning(f"Skipping consecutive message with role: {role}")
            continue
        if not cleaned and role == "assistant":
            logger.warning("Skipping initial assistant message in history for Anthropic.")
            continue
        cleaned.append({"role": role, "content": content})
        last_role = role
    return cleaned


async def ask_anthropic(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a reply from Anthropic's Messages API.

    Args:
        query: The new user message.
        history: Prior turns as dicts with "role" and "content" keys;
            cleaned to satisfy Anthropic's alternating-role requirement.

    Yields:
        Text fragments from `content_block_delta` events, or a single
        error string if the key is missing or the request fails.
    """
    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.error("Anthropic API key not provided")
        yield "Error: Anthropic API key not provided."
        return

    cleaned_messages = _clean_anthropic_history(history)
    # Append the current user query. (If history ends with a 'user' turn the
    # API itself will reject the request; core.py is expected to keep history
    # alternating, so only leading/consecutive-role cleanup is done here.)
    cleaned_messages.append({"role": "user", "content": query})

    # Sanity check: the conversation must start with a 'user' turn.
    if not cleaned_messages or cleaned_messages[0].get("role") != "user":
        logger.error("Anthropic message cleaning resulted in invalid format.")
        yield "Error: Internal message formatting issue for Anthropic."
        return

    headers = {
        "x-api-key": anthropic_api_key,
        "anthropic-version": "2023-06-01",  # Use a valid API version
        "Content-Type": "application/json",
    }
    payload = {
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 4096,
        "messages": cleaned_messages,
        "stream": True,
    }
    try:
        # timeout=None: httpx's default 5s read timeout would abort
        # long-running SSE streams mid-response.
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST",
                "https://api.anthropic.com/v1/messages",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    if not chunk:
                        continue
                    buffer += chunk
                    # SSE frames are newline-separated; a chunk may contain
                    # several complete "data: ..." lines plus a partial one
                    # (the partial tail stays in the buffer).
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        if not line.startswith("data: "):
                            continue
                        data = line[6:].strip()
                        if not data or data == "[DONE]":
                            continue
                        try:
                            event = json.loads(data)
                        except json.JSONDecodeError as e:
                            # The line was split on "\n", so it is complete; a
                            # decode failure means malformed data. Drop it —
                            # re-queuing it (the previous behavior) re-parsed
                            # the same line forever, hanging the stream.
                            logger.error(f"Anthropic parse error: {e}")
                            continue
                        try:
                            if event.get("type") == "content_block_delta" and "delta" in event:
                                yield event["delta"].get("text", "")
                            # Other event types (message_start, message_delta,
                            # message_stop, ping) carry no text and are ignored.
                        except Exception as e:
                            logger.error(f"Anthropic parse error: {e}")
                            yield f"[Anthropic Parse Error]: {e}"
    except httpx.HTTPStatusError as e:
        # A streamed error response must be read before .text is available;
        # otherwise httpx raises ResponseNotRead inside this handler.
        try:
            await e.response.aread()
        except Exception:
            pass
        logger.error(f"Anthropic API HTTP error: {e.response.status_code} - {e.response.text}")
        yield f"[Anthropic API Error {e.response.status_code}]: {e.response.text}"
    except Exception as e:
        logger.error(f"Anthropic API error: {e}")
        yield f"[Anthropic Error]: {e}"
# ===== Gemini =====
async def ask_gemini(query: str, history: List[Dict[str, str]]) -> AsyncGenerator[str, None]:
    """Stream a reply from Google's Gemini API (streamGenerateContent).

    The conversation is flattened into a single "User:/Assistant:" text
    prompt rather than role-structured `contents`.

    Args:
        query: The new user message.
        history: Prior turns as dicts with "role" and "content" keys.

    Yields:
        Text fragments from candidate parts, or a single error string if
        the key is missing or the request fails.
    """
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("Gemini API key not provided")
        yield "Error: Gemini API key not provided."
        return

    # Flatten the history into a transcript; unknown roles are skipped.
    pieces = []
    for msg in history:
        role = msg.get("role")
        if role == "user":
            pieces.append(f"User: {msg['content']}\n")
        elif role == "assistant":
            pieces.append(f"Assistant: {msg['content']}\n")
    full_prompt = f"{''.join(pieces)}User: {query}\n"

    headers = {"Content-Type": "application/json"}
    payload = {
        "contents": [{"parts": [{"text": full_prompt}]}]
    }
    try:
        # timeout=None: httpx's default 5s read timeout would abort
        # long-running streamed responses.
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST",
                f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:streamGenerateContent?key={gemini_api_key}",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                buffer = ""
                async for chunk in response.aiter_text():
                    if not chunk.strip():
                        continue
                    buffer += chunk
                    # streamGenerateContent emits a JSON array incrementally
                    # ("[{...}", ",{...}", "]"). Strip the comma/newline
                    # framing and retry a full parse each time; until the
                    # buffer forms valid JSON, keep accumulating.
                    try:
                        json_data = json.loads(buffer.strip(", \n"))
                        buffer = ""
                        objects = json_data if isinstance(json_data, list) else [json_data]
                        for obj in objects:
                            candidates = obj.get("candidates", [])
                            if candidates:
                                parts = candidates[0].get("content", {}).get("parts", [])
                                for part in parts:
                                    text = part.get("text", "")
                                    if text:
                                        yield text
                    except json.JSONDecodeError:
                        continue  # Incomplete JSON so far; wait for more chunks.
    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        yield f"[Gemini Error]: {e}"