trends_spotter_LATEST / utils /api_clients.py
cryogenic22's picture
Create utils/api_clients.py
d57e709 verified
# utils/api_clients.py
import os
from typing import Dict, Any
import requests
import streamlit as st
import anthropic
import openai
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class PerplexityAPIClient:
    """Direct Perplexity API integration.

    The API key is read from the ``PERPLEXITY_API_KEY`` environment variable
    when the client is constructed. Queries are sent as chat-completion
    requests to ``API_URL``.
    """

    API_URL = "https://api.perplexity.ai/chat/completions"
    # Upper bound (seconds) on how long a single request may block the caller.
    DEFAULT_TIMEOUT = 60.0

    def __init__(self, timeout: float = DEFAULT_TIMEOUT):
        """Create a client.

        Args:
            timeout: Seconds to wait for the HTTP request before giving up.

        Raises:
            ValueError: If ``PERPLEXITY_API_KEY`` is unset or empty.
        """
        self.api_key = os.environ.get("PERPLEXITY_API_KEY")
        if not self.api_key:
            raise ValueError("PERPLEXITY_API_KEY environment variable not set")
        self.url = self.API_URL
        self.timeout = timeout

    def search(self, query: str) -> Dict[str, Any]:
        """Search using Perplexity API with advanced configuration.

        Args:
            query: Text sent as the user message of the chat request.

        Returns:
            ``{"results": [...]}`` with one ``{"title", "text"}`` entry per
            choice in the response. On any failure the error is shown through
            ``st.error`` and ``{"results": []}`` is returned instead of
            raising.
        """
        payload = {
            "model": "sonar",
            "messages": [
                {
                    "role": "system",
                    "content": "You are a trend analysis expert. Provide detailed, data-driven analysis.",
                },
                {
                    "role": "user",
                    "content": query,
                },
            ],
            "temperature": 0.2,
            "top_p": 0.9,
            "return_images": False,
            "return_related_questions": False,
            "search_recency_filter": "month",
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        try:
            # Without an explicit timeout, requests waits forever on a stalled
            # connection, which would hang the calling script.
            response = requests.post(
                self.url,
                json=payload,
                headers=headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            result = response.json()
            return {
                "results": [
                    {
                        "title": f"Analysis {i+1}",
                        "text": choice["message"]["content"],
                    }
                    for i, choice in enumerate(result.get("choices", []))
                ]
            }
        except Exception as e:
            # Deliberate best-effort boundary: network, HTTP, JSON and
            # response-shape errors are all reported to the UI and turned
            # into an empty result set.
            st.error(f"Error querying Perplexity API: {str(e)}")
            return {"results": []}
def initialize_api_clients():
    """Build the Anthropic, OpenAI and Perplexity clients.

    The Anthropic and OpenAI keys are read from the ``ANTHROPIC_API_KEY`` and
    ``OPENAI_API_KEY`` environment variables.

    Returns:
        A 3-tuple ``(anthropic client, openai client, perplexity client)``.

    Raises:
        Exception: Any error raised while constructing a client is shown via
            ``st.error`` and then re-raised unchanged.
    """
    try:
        # Tuple elements are evaluated left to right, so the clients are
        # constructed in the order Anthropic, OpenAI, Perplexity.
        clients = (
            anthropic.Client(api_key=os.environ.get("ANTHROPIC_API_KEY")),
            openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY")),
            PerplexityAPIClient(),
        )
        return clients
    except Exception as exc:
        st.error(f"Error initializing API clients: {exc!s}")
        raise