| | from fastapi import FastAPI, HTTPException
|
| | from pydantic import BaseModel
|
| | import httpx
|
| | import requests
|
| | import os
|
| | from dotenv import load_dotenv
|
| | from typing import List, Optional, Dict, Any
|
| | import json
|
| |
|
| |
|
| | load_dotenv()
|
| |
|
| |
|
| | OPENROUTER_API_KEY = os.getenv("OPEN_ROUTER_API_KEY")
|
| | PERSONALITY_URL = os.getenv("PERSONALITY_URL")
|
| | app = FastAPI(title="ER5 v1.0.0")
|
| |
|
| | class ChatMessage(BaseModel):
|
| | role: str
|
| | content: str
|
| |
|
| |
|
| | class ChatRequest(BaseModel):
|
| | messages: List[ChatMessage]=[]
|
| | temperature: Optional[float] = 0.7
|
| | max_tokens: Optional[int] = 1000
|
| | user_preferences: Optional[Dict[str, Any]] = None
|
| | context: Optional[str] = ""
|
| | personality: Optional[str] = None
|
| |
|
| | class ChatResponse(BaseModel):
|
| | response: str
|
| | context: Optional[str] = None
|
| |
|
| | def build_system_message(preferences: Dict[str, Any]) -> dict:
|
| | """Create a system message based on schemaless user preferences"""
|
| | if not preferences:
|
| | return {"role": "system", "content": "You are a helpful assistant."}
|
| |
|
| | preferences_str = "\n".join(
|
| | f"- {key}: {value if not isinstance(value, (list, dict)) else json.dumps(value)}"
|
| | for key, value in preferences.items()
|
| | )
|
| |
|
| | system_content = f"""You are a helpful assistant. Please consider these user preferences in your responses:
|
| |
|
| | {preferences_str}
|
| |
|
| | Tailor your responses to align with these user preferences and characteristics."""
|
| |
|
| | return {"role": "system", "content": system_content}
|
| |
|
| | def parse_context(context_str: str) -> List[dict]:
|
| | """Parse context string into messages list"""
|
| | if not context_str:
|
| | return []
|
| | try:
|
| | return json.loads(context_str)
|
| | except json.JSONDecodeError:
|
| | return []
|
| |
|
| |
|
| |
|
| | async def call_openrouter(messages: List[dict], temperature: float, max_tokens: int):
|
| | url = "https://openrouter.ai/api/v1/chat/completions"
|
| |
|
| | headers = {
|
| | "Authorization": f"Bearer {OPENROUTER_API_KEY}",
|
| | "Content-Type": "application/json"
|
| | }
|
| |
|
| | payload = {
|
| | "model": "qwen/qwen-2.5-7b-instruct",
|
| | "messages": messages,
|
| | "temperature": temperature,
|
| | "max_tokens": max_tokens
|
| | }
|
| |
|
| | async with httpx.AsyncClient() as client:
|
| | response = await client.post(url, json=payload, headers=headers)
|
| |
|
| | if response.status_code != 200:
|
| | raise HTTPException(status_code=response.status_code,
|
| | detail=f"OpenRouter API error: {response.text}")
|
| |
|
| | return response.json()['choices'][0]['message']['content']
|
| |
|
| | async def chat_endpoint(request: ChatRequest):
|
| | try:
|
| |
|
| |
|
| | include_preferences = await should_include_preferences_llm(request.messages[-1].content)
|
| |
|
| |
|
| | messages = []
|
| |
|
| | if include_preferences and request.user_preferences:
|
| | messages.append(build_system_message(request.user_preferences))
|
| |
|
| |
|
| | context_messages = parse_context(request.context)
|
| | messages.extend(context_messages)
|
| |
|
| |
|
| | messages.extend([{"role": msg.role, "content": msg.content}
|
| | for msg in request.messages])
|
| |
|
| |
|
| | messages = messages[-10:]
|
| |
|
| | print(messages)
|
| |
|
| | response = await decide_chat_api(
|
| | messages=messages,
|
| | temperature=request.temperature,
|
| | max_tokens=request.max_tokens,
|
| | personality=request.personality
|
| | )
|
| |
|
| | assistant_message = response
|
| |
|
| |
|
| | context_messages = messages[1:] if include_preferences else messages
|
| | context_messages.append({"role": "assistant", "content": assistant_message})
|
| | updated_context = json.dumps(context_messages[-10:])
|
| |
|
| | return ChatResponse(response=assistant_message, context=updated_context)
|
| |
|
| | except Exception as e:
|
| | raise HTTPException(status_code=500, detail=str(e))
|
| |
|
| | async def should_include_preferences_llm(query: str) -> bool:
|
| | """
|
| | Use the LLM to decide if user preferences should be included based on the query.
|
| | """
|
| | url = "https://openrouter.ai/api/v1/chat/completions"
|
| | headers = {
|
| | "Authorization": f"Bearer {OPENROUTER_API_KEY}",
|
| | "Content-Type": "application/json"
|
| | }
|
| |
|
| |
|
| | prompt = (
|
| | "You are an assistant that classifies user queries. "
|
| | "Given the query below, decide if user-specific context or preferences are relevant. "
|
| | "Respond with 'true' if any kind of personal context or preferences should be included in the response, "
|
| | "and 'false' otherwise.\n\n"
|
| | f"Query: {query}\n\n"
|
| | "Is this query related to personal context such as user preferences, lifestyle, habits, or any other factors "
|
| | "that could affect the response?"
|
| | )
|
| |
|
| | payload = {
|
| | "model": "qwen/qwen-2.5-7b-instruct",
|
| | "messages": [{"role": "user", "content": prompt}],
|
| | "temperature": 0.0,
|
| | "max_tokens": 5
|
| | }
|
| |
|
| | async with httpx.AsyncClient() as client:
|
| | response = await client.post(url, json=payload, headers=headers)
|
| |
|
| | if response.status_code != 200:
|
| | raise HTTPException(status_code=response.status_code,
|
| | detail=f"OpenRouter API error: {response.text}")
|
| |
|
| |
|
| | llm_response = response.json()['choices'][0]['message']['content'].strip().lower()
|
| | return llm_response == "true"
|
| |
|
| | async def call_personality_api(messages: List[dict], personality: str):
|
| | """
|
| | Call the external Personality API to handle chat completions.
|
| | """
|
| | url = PERSONALITY_URL + "/chat"
|
| | message_str = "\n".join([msg['content'] for msg in messages]) if messages else ""
|
| |
|
| | payload = {
|
| | "message": message_str,
|
| | "personality": personality
|
| | }
|
| | payload = {
|
| | "message": message_str,
|
| | "personality": 'humanish'
|
| | }
|
| |
|
| | response = requests.post(url, json=payload)
|
| | if response.status_code != 200:
|
| | raise HTTPException(status_code=response.status_code,
|
| | detail=f"Personality API error: {response.text}")
|
| |
|
| | response_json = response.json()
|
| | if "response" in response_json:
|
| | return response_json["response"]
|
| |
|
| |
|
| | async def decide_chat_api(messages: List[dict], temperature: float, max_tokens: int,personality:str):
|
| | """
|
| | This function decides which API to call based on the presence of the PERSONALITY_URL environment variable.
|
| | """
|
| | if PERSONALITY_URL and PERSONALITY_URL.strip() and personality:
|
| | print("yes")
|
| | return await call_personality_api(messages,personality)
|
| | else:
|
| |
|
| | return await call_openrouter(messages, temperature, max_tokens)
|
| |
|
| |
|
| |
|
| |
|
| |
|