# Source: main.py -- uploaded by Egeekle, commit 4e8daea ("Update main.py", verified).
import os
import json
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Literal
from contextlib import asynccontextmanager
import httpx
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential
from openai import OpenAI
from dotenv import load_dotenv
# Pull settings from a local .env file (if present) into the process
# environment before the constants below are read.
load_dotenv()

# ---------- Config ----------
# Every setting falls back to the default shown when the variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4")
AZURE_TEXT_ANALYTICS_KEY = os.environ.get("AZURE_TEXT_ANALYTICS_KEY", "")
AZURE_TEXT_ANALYTICS_ENDPOINT = os.environ.get("AZURE_TEXT_ANALYTICS_ENDPOINT", "")
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "")
# JSON file read by the lifespan handler at startup and rewritten at shutdown.
DATA_FILE = "investment_data.json"

# ---------- Clients ----------
# Both remain None until the lifespan startup code constructs them.
openai_client = text_analytics_client = None
# ---------- Lifecycle ----------
def _empty_data() -> Dict:
    """Return a fresh, empty data store containing every section the API uses."""
    return {
        "portfolios": [],
        "news_analysis": [],
        "market_data": {},
        "strategies": [],
        "watchlist": [],
    }


def _load_data(path: str) -> Dict:
    """Load the persisted store from *path*, filling in any missing sections.

    Raises whatever ``open``/``json.load`` raise, or ``ValueError`` when the
    file does not contain a JSON object.
    """
    data = _empty_data()
    if not os.path.exists(path):
        return data
    with open(path, "r", encoding="utf-8") as f:
        stored = json.load(f)
    if not isinstance(stored, dict):
        raise ValueError(f"{path} does not contain a JSON object")
    for key, value in stored.items():
        default = data.get(key)
        # Keep unknown keys as-is; accept known sections only when they have
        # the container type the endpoints index into (list or dict).
        if default is None or isinstance(value, type(default)):
            data[key] = value
    return data


def _save_data(path: str, data: Dict) -> None:
    """Write *data* to *path* via a temp file so a failed dump cannot truncate it."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, path)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: set up clients and data, persist data on exit.

    Startup builds the OpenAI and Azure Text Analytics clients when their
    settings are non-empty, then loads ``DATA_FILE`` into ``app.state.data``
    (an empty store is used if the file is absent or unreadable).
    Shutdown writes ``app.state.data`` back to ``DATA_FILE``.
    """
    global openai_client, text_analytics_client

    # Each client is initialised independently so that a failure in one does
    # not prevent the other from being created.
    if OPENAI_API_KEY:
        try:
            openai_client = OpenAI(api_key=OPENAI_API_KEY)
            print("✅ OpenAI client initialized")
        except Exception as e:
            print(f"⚠️ Warning: Client initialization failed: {e}")
    if AZURE_TEXT_ANALYTICS_KEY and AZURE_TEXT_ANALYTICS_ENDPOINT:
        try:
            text_analytics_client = TextAnalyticsClient(
                endpoint=AZURE_TEXT_ANALYTICS_ENDPOINT,
                credential=AzureKeyCredential(AZURE_TEXT_ANALYTICS_KEY),
            )
            print("✅ Azure Text Analytics client initialized")
        except Exception as e:
            print(f"⚠️ Warning: Client initialization failed: {e}")

    # Load investment data
    try:
        app.state.data = _load_data(DATA_FILE)
        print("✅ Investment data loaded")
    except Exception as e:
        print(f"Error loading data: {e}")
        app.state.data = _empty_data()

    try:
        yield
    finally:
        # Shutdown: save data, even when the application body raised.
        try:
            _save_data(DATA_FILE, app.state.data)
        except Exception as e:
            print(f"Error saving data: {e}")
# Application object; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="Investment Assistant API - ETF & Crypto",
    version="2.0.0",
    description="AI-powered investment assistant for ETF and Crypto with Top/Bottom strategies using OpenAI",
    lifespan=lifespan
)
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# permissive setup -- confirm it is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Placeholder only; the lifespan startup code assigns the real data dict.
app.state.data = None
# ---------- Models ----------
class ChatMessage(BaseModel):
    # Request body for POST /chat.
    # Free-text question; forwarded to get_llm_advice as the query.
    message: str
    # Optional extra data; forwarded to get_llm_advice as its context.
    context: Optional[Dict] = None
class NewsAnalysis(BaseModel):
    # Request body for POST /news/analyze.
    title: str
    # Article text; this is what the sentiment analysis runs on.
    content: str
    source: str
    # When omitted, the endpoint substitutes the current time (ISO format).
    published_date: Optional[str] = None
class MarketDataRequest(BaseModel):
    # Request body for POST /market-data.
    # Identifiers to look up; passed unchanged to get_market_data.
    symbols: List[str]
    # Selects the data source in get_market_data ("CRYPTO" vs. everything else).
    asset_type: Literal["ETF", "CRYPTO"] = "ETF"
class StrategyRequest(BaseModel):
    # Request body for POST /strategy.
    symbol: str
    asset_type: Literal["ETF", "CRYPTO"]
    # Which of the two strategies the LLM explanation should focus on.
    strategy_type: Literal["TOP", "BOTTOM"]
    # NOTE(review): no code visible in this file reads `timeframe`; market data
    # is always fetched for a fixed ~30-day window -- confirm intended use.
    timeframe: str = "1mo" # 1d, 5d, 1mo, 3mo, 6mo, 1y
class PortfolioPosition(BaseModel):
    # Request body for POST /portfolio; stored as a plain dict plus a generated "id".
    symbol: str
    asset_type: Literal["ETF", "CRYPTO"]
    quantity: float
    purchase_price: float
    # Kept as the string supplied by the caller; not parsed in this file.
    purchase_date: str
# ---------- Helper Functions ----------
def _summarize_prices(prices: List[float], dates: List[str], asset_type: str) -> Dict:
    """Build the per-symbol result dict from an oldest-to-newest price series."""
    first, last = prices[0], prices[-1]
    return {
        "current_price": last,
        "prices": prices,
        "dates": dates,
        "high_30d": max(prices),
        "low_30d": min(prices),
        # A zero starting price has no meaningful percentage change.
        "change_30d": ((last - first) / first) * 100 if first else 0.0,
        "asset_type": asset_type,
    }


async def _fetch_crypto_data(client, symbol: str) -> Dict:
    """Fetch 30 days of daily USD prices for one coin from CoinGecko."""
    from urllib.parse import quote

    # The symbol comes from the request body; escape it so it cannot alter
    # the URL path (e.g. "a/b" or "..").
    coin_id = quote(symbol.lower(), safe="")
    url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart"
    params = {
        "vs_currency": "usd",
        "days": "30",
        "interval": "daily",
    }
    response = await client.get(url, params=params, timeout=10.0)
    if response.status_code != 200:
        raise ValueError(f"CoinGecko returned HTTP {response.status_code}")
    points = response.json().get("prices") or []
    if not points:
        raise ValueError("CoinGecko returned no price data")
    prices = [p[1] for p in points]
    # Timestamps are divided by 1000 (milliseconds -> seconds).
    # NOTE(review): fromtimestamp() converts using the host's local timezone.
    dates = [datetime.fromtimestamp(p[0] / 1000).strftime("%Y-%m-%d") for p in points]
    return _summarize_prices(prices, dates, "CRYPTO")


async def _fetch_etf_data(client, symbol: str) -> Dict:
    """Fetch the last 30 daily closes for one ticker from Alpha Vantage."""
    if not ALPHA_VANTAGE_API_KEY:
        raise ValueError("ALPHA_VANTAGE_API_KEY is not configured")
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "apikey": ALPHA_VANTAGE_API_KEY,
        "outputsize": "compact",
    }
    response = await client.get(url, params=params, timeout=10.0)
    if response.status_code != 200:
        raise ValueError(f"Alpha Vantage returned HTTP {response.status_code}")
    time_series = response.json().get("Time Series (Daily)")
    if not time_series:
        # The provider's own message is deliberately not echoed back, so that
        # nothing it contains can leak to API clients.
        raise ValueError(
            "Alpha Vantage returned no daily time series "
            "(unknown symbol or request limit reached)"
        )
    # Keys are ISO dates, so a lexical sort is chronological.
    dates = sorted(time_series)[-30:]
    prices = [float(time_series[day]["4. close"]) for day in dates]
    return _summarize_prices(prices, dates, "ETF")


async def get_market_data(symbols: List[str], asset_type: str) -> Dict:
    """Get roughly 30 days of market data for ETFs or crypto assets.

    Args:
        symbols: Identifiers to look up (CoinGecko coin ids for crypto,
            tickers otherwise).
        asset_type: "CRYPTO" selects CoinGecko; any other value selects
            Alpha Vantage.

    Returns:
        A dict with one entry per requested symbol. A successful entry holds
        ``current_price``, ``prices``, ``dates``, ``high_30d``, ``low_30d``,
        ``change_30d`` and ``asset_type``. Any failure (network error,
        non-200 response, empty payload, missing API key) yields
        ``{"error": <message>}`` so callers can always tell that the lookup
        was attempted.
    """
    results: Dict = {}
    if not symbols:
        return results
    fetch = _fetch_crypto_data if asset_type == "CRYPTO" else _fetch_etf_data
    # One client is shared by all lookups so connections can be reused.
    async with httpx.AsyncClient() as client:
        for symbol in symbols:
            try:
                results[symbol] = await fetch(client, symbol)
            except Exception as e:
                # Best effort: one failing symbol must not abort the others.
                print(f"Error fetching data for {symbol}: {e}")
                results[symbol] = {"error": str(e)}
    return results
def analyze_sentiment(text: str) -> Dict:
    """Classify *text* as positive, negative or neutral.

    With an Azure Text Analytics client configured, the first 5000 characters
    are sent to the service and "confidence" is a dict of per-class scores.
    Without one, a keyword count is used and "confidence" is a single float.
    A failing Azure call yields a neutral result carrying the error message.
    """
    if text_analytics_client:
        try:
            result = text_analytics_client.analyze_sentiment(
                documents=[text[:5000]],  # Azure limit
                show_opinion_mining=True,
            )[0]
            label = result.sentiment
            scores = result.confidence_scores
            return {
                "sentiment": label,
                "confidence": {
                    "positive": scores.positive,
                    "neutral": scores.neutral,
                    "negative": scores.negative,
                },
                "method": "azure-text-analytics",
            }
        except Exception as e:
            print(f"Error in sentiment analysis: {e}")
            return {"sentiment": "neutral", "confidence": 0.5, "error": str(e)}

    # Fallback: count how many distinct keywords occur as substrings.
    bullish = ("rise", "gain", "bull", "surge", "rally", "growth", "profit", "up")
    bearish = ("fall", "drop", "bear", "crash", "decline", "loss", "down", "sell")
    lowered = text.lower()
    ups = sum(word in lowered for word in bullish)
    downs = sum(word in lowered for word in bearish)

    if ups == downs:
        label, confidence = "neutral", 0.5
    else:
        label = "positive" if ups > downs else "negative"
        # Confidence grows with the winning side's hit count, capped at 0.95.
        confidence = min(0.7 + (max(ups, downs) * 0.05), 0.95)
    return {
        "sentiment": label,
        "confidence": confidence,
        "method": "keyword-based",
    }
def _finite_or(value, default: float) -> float:
    """Return *value* as a float, or *default* when it is NaN or infinite."""
    value = float(value)
    return value if np.isfinite(value) else default


def calculate_technical_indicators(prices: List[float]) -> Dict:
    """Calculate technical indicators for strategy analysis.

    Args:
        prices: Price series ordered oldest to newest.

    Returns:
        ``{}`` when fewer than two prices are given. Otherwise a dict with:
        ``sma_10`` / ``sma_20`` (mean of the last 10 / 20 prices, or of all
        prices when fewer are available), ``rsi`` (0-100, from the average
        gain and loss over the last 14 points), ``volatility`` (standard
        deviation of percentage changes, in percent), ``price_position``
        (0 = at the series low, 100 = at the series high), ``current_price``,
        ``high_30d`` and ``low_30d`` (last value, max and min of the series).
        ``rsi`` and ``volatility`` are always finite so the result can be
        serialised as JSON.
    """
    if len(prices) < 2:
        return {}
    series = pd.Series(prices, dtype="float64")
    count = len(series)

    # Moving averages over the most recent points.
    sma_10 = series.tail(min(10, count)).mean()
    sma_20 = series.tail(min(20, count)).mean()

    # RSI from average gain / average loss; the first diff is NaN and counts
    # as "no move".
    delta = series.diff()
    window = min(14, count)
    avg_gain = delta.clip(lower=0).fillna(0).tail(window).mean()
    avg_loss = (-delta).clip(lower=0).fillna(0).tail(window).mean()
    if avg_loss > 0:
        rsi_value = 100 - (100 / (1 + avg_gain / avg_loss))
    elif avg_gain > 0:
        rsi_value = 100.0  # only gains in the window
    else:
        rsi_value = 50.0  # flat series: 0/0 would otherwise give NaN

    # Volatility; NaN for a two-point series (single return) and non-finite
    # when a zero price appears, so fall back to 0.
    volatility = _finite_or(series.pct_change().std() * 100, 0.0)

    # Current position relative to the observed range.
    current = prices[-1]
    high_30d = max(prices)
    low_30d = min(prices)
    if high_30d != low_30d:
        price_position = ((current - low_30d) / (high_30d - low_30d)) * 100
    else:
        price_position = 50

    return {
        "sma_10": float(sma_10),
        "sma_20": float(sma_20),
        "rsi": _finite_or(rsi_value, 50.0),
        "volatility": volatility,
        "price_position": float(price_position),  # 0 = bottom, 100 = top
        "current_price": current,
        "high_30d": high_30d,
        "low_30d": low_30d,
    }
def _strategy_verdict(score: float, reasons: List[str], description: str) -> Dict:
    """Turn an accumulated score into the score/recommendation/confidence block."""
    # Weights are multiples of 0.05; rounding removes float accumulation error
    # so the same set of signals always lands on the same side of a threshold.
    score = min(round(score, 2), 1.0)
    if score > 0.7:
        confidence = "HIGH"
    elif score > 0.5:
        confidence = "MEDIUM"
    else:
        confidence = "LOW"
    return {
        "score": score,
        "recommendation": "BUY" if score > 0.5 else "HOLD",
        "confidence": confidence,
        "reasons": reasons,
        "strategy": description,
    }


def generate_top_bottom_strategy(symbol: str, asset_type: str, market_data: Dict, news_sentiment: Optional[Dict] = None) -> Dict:
    """Generate TOP or BOTTOM strategy recommendations.

    Args:
        symbol: Asset identifier, echoed in the result.
        asset_type: Asset class label, echoed in the result.
        market_data: One per-symbol entry as produced by get_market_data.
        news_sentiment: Optional dict whose "sentiment" value ("positive" /
            "negative" / anything else) nudges the scores.

    Returns:
        ``{"error": "Insufficient market data"}`` when *market_data* is empty,
        carries an "error" key or has no prices. Otherwise a dict with the
        computed indicators plus a ``top_strategy`` (momentum) and
        ``bottom_strategy`` (value) block, each holding score,
        recommendation, confidence, reasons and a description.
    """
    prices = market_data.get("prices") if market_data else None
    if not prices or "error" in market_data:
        return {"error": "Insufficient market data"}

    indicators = calculate_technical_indicators(prices)
    price_position = indicators.get("price_position", 50)
    rsi = indicators.get("rsi", 50)
    sma_10 = indicators.get("sma_10", 0)
    sma_20 = indicators.get("sma_20", 0)
    change_30d = market_data.get("change_30d", 0)

    sentiment_score = 0.5
    if news_sentiment:
        label = news_sentiment.get("sentiment")
        if label == "positive":
            sentiment_score = 0.7
        elif label == "negative":
            sentiment_score = 0.3

    # TOP Strategy (Momentum - Buy at highs)
    top_score = 0
    top_reasons = []
    if price_position > 70:
        top_score += 0.3
        top_reasons.append(f"Price at {price_position:.1f}% of 30-day range (near top)")
    if 50 < rsi < 70:
        top_score += 0.2
        top_reasons.append(f"RSI at {rsi:.1f} (showing momentum)")
    if sentiment_score > 0.6:
        top_score += 0.2
        top_reasons.append("Positive news sentiment")
    if sma_10 > sma_20:
        top_score += 0.15
        top_reasons.append("Short-term MA above long-term MA (uptrend)")
    if change_30d > 5:
        top_score += 0.15
        top_reasons.append(f"30-day gain of {change_30d:.1f}%")

    # BOTTOM Strategy (Value - Buy at lows)
    bottom_score = 0
    bottom_reasons = []
    if price_position < 30:
        bottom_score += 0.3
        bottom_reasons.append(f"Price at {price_position:.1f}% of 30-day range (near bottom)")
    if rsi < 30:
        bottom_score += 0.25
        bottom_reasons.append(f"RSI at {rsi:.1f} (oversold condition)")
    if sentiment_score < 0.4:
        bottom_score += 0.15
        bottom_reasons.append("Negative news sentiment (contrarian opportunity)")
    if sma_10 < sma_20:
        bottom_score += 0.15
        bottom_reasons.append("Short-term MA below long-term MA (potential reversal)")
    if change_30d < -10:
        bottom_score += 0.15
        bottom_reasons.append(f"30-day decline of {change_30d:.1f}% (potential bounce)")

    return {
        "symbol": symbol,
        "asset_type": asset_type,
        # Fall back to the last price if the entry lacks an explicit value.
        "current_price": market_data.get("current_price", prices[-1]),
        "indicators": indicators,
        "top_strategy": _strategy_verdict(
            top_score, top_reasons, "Momentum - Follow the trend, buy at highs"
        ),
        "bottom_strategy": _strategy_verdict(
            bottom_score, bottom_reasons, "Value - Buy the dip, contrarian approach"
        ),
        "generated_at": datetime.now().isoformat(),
    }
async def get_llm_advice(query: str, context: Optional[Dict] = None) -> str:
    """Get investment advice from OpenAI.

    Args:
        query: The user's question or analysis request.
        context: Optional supporting data; serialised as JSON and attached to
            the user message.

    Returns:
        The model's reply text. When no OpenAI client is configured, or the
        request fails, a human-readable message is returned instead of
        raising.
    """
    if not openai_client:
        return "OpenAI is not configured. Please set OPENAI_API_KEY environment variable."

    system_prompt = "\n".join((
        "You are an expert financial advisor specializing in ETF and cryptocurrency investments.",
        "Your expertise includes:",
        "- Technical analysis and market trends",
        "- Top (momentum) and Bottom (value) investment strategies",
        "- Risk assessment and portfolio management",
        "- Market sentiment analysis",
        "Provide clear, actionable advice based on the provided context. Always include:",
        "1. Clear recommendation (BUY/HOLD/SELL)",
        "2. Reasoning based on data",
        "3. Risk factors",
        "4. Suggested strategy (TOP/BOTTOM)",
        "Be concise but comprehensive.",
    ))

    try:
        user_content = query
        if context:
            # The context is part of the user's input. Sending it as an
            # "assistant" turn after the question would present it to the
            # model as its own previous answer.
            context_str = json.dumps(context, indent=2, default=str)
            user_content = f"{query}\n\nContext data:\n{context_str}"
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ]
        # The client call is synchronous; run it in the default executor so
        # the event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: openai_client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=messages,
                temperature=0.7,
                max_tokens=1000,
            ),
        )
        # Guard against a reply without text so the return type stays str.
        return response.choices[0].message.content or ""
    except Exception as e:
        return f"Error getting LLM advice: {str(e)}"
# ---------- Endpoints ----------
@app.get("/")
def root():
    # Static service description: name, version, feature list and main routes.
    feature_list = [
        "LLM-powered investment advice",
        "Top/Bottom strategy analysis",
        "News sentiment analysis",
        "Market data for ETFs and Crypto",
        "Portfolio management",
    ]
    route_map = {
        "health": "/health",
        "chat": "/chat",
        "market_data": "/market-data",
        "strategy": "/strategy",
        "news_analysis": "/news/analyze",
        "docs": "/docs",
    }
    return {
        "message": "Investment Assistant API - ETF & Crypto",
        "version": "2.0.0",
        "features": feature_list,
        "endpoints": route_map,
    }
@app.get("/health")
def health():
    # Reports which optional clients were created, plus the current time.
    openai_ready = openai_client is not None
    azure_ready = text_analytics_client is not None
    return {
        "status": "healthy",
        "openai": openai_ready,
        "azure_text_analytics": azure_ready,
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/chat")
async def chat(message: ChatMessage):
    """Chat with the investment assistant LLM"""
    # Forward the question and optional context, then echo the question back
    # alongside the reply.
    question = message.message
    reply = await get_llm_advice(question, message.context)
    return {
        "query": question,
        "advice": reply,
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/market-data")
async def get_market_data_endpoint(request: MarketDataRequest):
    """Get market data for ETFs or Crypto"""
    data = await get_market_data(request.symbols, request.asset_type)
    # Cache only successful lookups: a transient failure must not replace the
    # last good snapshot, and error stubs should not be persisted at shutdown.
    fresh = {
        symbol: entry
        for symbol, entry in data.items()
        if isinstance(entry, dict) and "error" not in entry
    }
    app.state.data.setdefault("market_data", {}).update(fresh)
    # The response still reports every symbol, including failures.
    return {
        "data": data,
        "timestamp": datetime.now().isoformat()
    }
@app.post("/strategy")
async def get_strategy(request: StrategyRequest):
    """Get TOP or BOTTOM strategy analysis for a symbol"""
    # Get market data. A missing or price-less entry is treated like an
    # explicit error, so the client gets a 404 instead of an unhandled
    # KeyError further down.
    market_data_dict = await get_market_data([request.symbol], request.asset_type)
    market_data = market_data_dict.get(request.symbol) or {}
    if "error" in market_data or not market_data.get("prices"):
        raise HTTPException(status_code=404, detail=f"Could not fetch market data for {request.symbol}")

    # Get recent news sentiment if available
    needle = request.symbol.lower()
    recent_news = [
        n for n in app.state.data.get("news_analysis", [])
        if needle in n.get("title", "").lower() or needle in n.get("content", "").lower()
    ]
    news_sentiment = None
    if recent_news:
        latest = recent_news[-3:]
        combined_text = " ".join(n.get("content", "") for n in latest).strip()
        if combined_text:
            news_sentiment = analyze_sentiment(combined_text)
        else:
            # Stored analyses may hold only the computed sentiment and no
            # article text; use the majority of those labels rather than
            # analysing an empty string.
            labels = [(n.get("sentiment") or {}).get("sentiment") for n in latest]
            positive = labels.count("positive")
            negative = labels.count("negative")
            if positive > negative:
                overall = "positive"
            elif negative > positive:
                overall = "negative"
            else:
                overall = "neutral"
            news_sentiment = {"sentiment": overall, "method": "stored-analyses"}

    # Generate strategy
    strategy = generate_top_bottom_strategy(
        request.symbol,
        request.asset_type,
        market_data,
        news_sentiment
    )
    if "error" in strategy:
        # Do not explain or store a strategy that could not be computed.
        raise HTTPException(status_code=404, detail=f"Could not fetch market data for {request.symbol}")

    # Get LLM explanation
    strategy_summary = json.dumps({
        "strategy_type": request.strategy_type,
        "top": strategy.get("top_strategy", {}),
        "bottom": strategy.get("bottom_strategy", {})
    }, indent=2)
    llm_query = f"""Analyze this investment strategy for {request.symbol} ({request.asset_type}):
{strategy_summary}
Provide a comprehensive analysis focusing on the {request.strategy_type} strategy."""
    llm_advice = await get_llm_advice(llm_query, strategy)
    strategy["llm_analysis"] = llm_advice

    # Save strategy
    app.state.data.setdefault("strategies", []).append(strategy)
    return strategy
@app.post("/news/analyze")
async def analyze_news(news: NewsAnalysis):
    """Analyze financial news sentiment"""
    sentiment = analyze_sentiment(news.content)
    analysis = {
        "title": news.title,
        # Stored because the strategy and per-symbol sentiment endpoints look
        # up "content" on saved analyses when matching a symbol.
        "content": news.content,
        "source": news.source,
        "published_date": news.published_date or datetime.now().isoformat(),
        "sentiment": sentiment,
        "analyzed_at": datetime.now().isoformat()
    }
    history = app.state.data.setdefault("news_analysis", [])
    history.append(analysis)
    # Keep only last 1000 analyses
    if len(history) > 1000:
        app.state.data["news_analysis"] = history[-1000:]
    return analysis
@app.get("/news/sentiment/{symbol}")
def get_symbol_sentiment(symbol: str):
    """Get aggregated sentiment for a symbol"""
    # Collect stored analyses whose title or content mentions the symbol
    # (case-insensitive substring match).
    needle = symbol.lower()
    matches = []
    for item in app.state.data.get("news_analysis", []):
        in_title = needle in item.get("title", "").lower()
        if in_title or needle in item.get("content", "").lower():
            matches.append(item)

    if not matches:
        return {"symbol": symbol, "sentiment": "neutral", "confidence": 0.5, "news_count": 0}

    labels = [item.get("sentiment", {}).get("sentiment", "neutral") for item in matches]
    tally = {name: labels.count(name) for name in ("positive", "negative", "neutral")}

    # A label wins only with a strict plurality over both others; every other
    # outcome, ties included, is reported as neutral.
    if tally["positive"] > tally["negative"] and tally["positive"] > tally["neutral"]:
        overall = "positive"
    elif tally["negative"] > tally["positive"] and tally["negative"] > tally["neutral"]:
        overall = "negative"
    else:
        overall = "neutral"
    # Confidence is the share of analyses carrying the reported label.
    confidence = tally[overall] / len(labels)

    return {
        "symbol": symbol,
        "sentiment": overall,
        "confidence": confidence,
        "news_count": len(matches),
        "breakdown": {
            "positive": tally["positive"],
            "negative": tally["negative"],
            "neutral": tally["neutral"],
        },
    }
@app.get("/strategies")
def get_all_strategies():
    """Get all saved strategies"""
    # Look the list up once and report it together with its length.
    saved = app.state.data.get("strategies", [])
    return {"strategies": saved, "count": len(saved)}
@app.post("/portfolio")
def add_to_portfolio(position: PortfolioPosition):
    """Add position to portfolio"""
    record = position.model_dump()
    # The id is derived from the current POSIX timestamp.
    record["id"] = "pos_" + str(datetime.now().timestamp())
    app.state.data["portfolios"].append(record)
    return {"message": "Position added", "position": record}
@app.get("/portfolio")
def get_portfolio():
    """Get portfolio"""
    # Look the list up once and report it together with its length.
    held = app.state.data.get("portfolios", [])
    return {"positions": held, "count": len(held)}
if __name__ == "__main__":
    # Startup banner. The emoji were stored mis-encoded (UTF-8 bytes decoded
    # with a legacy code page) and are restored here.
    print("🚀 Starting Investment Assistant API...")
    print("📊 ETF & Crypto Investment Assistant with Top/Bottom Strategies")
    print("🤖 Powered by OpenAI")
    # Serve on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)