Spaces:
Build error
Build error
| import os | |
| import json | |
| import asyncio | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional, Literal | |
| from contextlib import asynccontextmanager | |
| import httpx | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| import uvicorn | |
| from azure.ai.textanalytics import TextAnalyticsClient | |
| from azure.core.credentials import AzureKeyCredential | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
# Pull settings from a local .env file (if present) into the process
# environment before the lookups below run.
load_dotenv()

# ---------- Config ----------
# Every setting comes from the environment. A missing credential becomes an
# empty string, which the startup code treats as "not configured".
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4")
AZURE_TEXT_ANALYTICS_KEY = os.environ.get("AZURE_TEXT_ANALYTICS_KEY", "")
AZURE_TEXT_ANALYTICS_ENDPOINT = os.environ.get("AZURE_TEXT_ANALYTICS_ENDPOINT", "")
ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "")

# JSON file the application state is read from at startup and written to at
# shutdown.
DATA_FILE = "investment_data.json"

# ---------- Clients ----------
# Module-level handles; both stay None until the lifespan handler creates them.
openai_client = None
text_analytics_client = None
| # ---------- Lifecycle ---------- | |
def _empty_investment_data() -> Dict:
    """Return a new, empty application-state dictionary with every expected key."""
    return {
        "portfolios": [],
        "news_analysis": [],
        "market_data": {},
        "strategies": [],
        "watchlist": [],
    }


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup creates the OpenAI and Azure Text Analytics clients when their
    credentials are configured, then loads the persisted state from
    ``DATA_FILE`` into ``app.state.data``. Shutdown writes that state back to
    ``DATA_FILE``. Failures are reported with ``print`` and never abort startup
    or shutdown.

    Args:
        app: The FastAPI application whose ``state.data`` is populated.
    """
    global openai_client, text_analytics_client

    # Each client is initialised in its own try-block so that a failure in one
    # does not prevent the other from being created.
    try:
        if OPENAI_API_KEY:
            openai_client = OpenAI(api_key=OPENAI_API_KEY)
            print("β OpenAI client initialized")
    except Exception as e:
        print(f"β οΈ Warning: Client initialization failed: {e}")
    try:
        if AZURE_TEXT_ANALYTICS_KEY and AZURE_TEXT_ANALYTICS_ENDPOINT:
            text_analytics_client = TextAnalyticsClient(
                endpoint=AZURE_TEXT_ANALYTICS_ENDPOINT,
                credential=AzureKeyCredential(AZURE_TEXT_ANALYTICS_KEY),
            )
            print("β Azure Text Analytics client initialized")
    except Exception as e:
        print(f"β οΈ Warning: Client initialization failed: {e}")

    # Load investment data. Loaded values are layered over the defaults so a
    # file that lacks some keys still yields a complete structure.
    data = _empty_investment_data()
    try:
        if os.path.exists(DATA_FILE):
            with open(DATA_FILE, "r") as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError(f"{DATA_FILE} does not contain a JSON object")
            data.update(loaded)
        print("β Investment data loaded")
    except Exception as e:
        print(f"Error loading data: {e}")
        data = _empty_investment_data()
    app.state.data = data

    try:
        yield
    finally:
        # Shutdown: serialise first, then write, so a serialisation error
        # cannot leave a truncated file behind.
        try:
            payload = json.dumps(app.state.data, indent=2)
            with open(DATA_FILE, "w") as f:
                f.write(payload)
        except Exception as e:
            print(f"Error saving data: {e}")
# Keyword arguments for the CORS middleware registered below.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the API publicly.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}

app = FastAPI(
    title="Investment Assistant API - ETF & Crypto",
    version="2.0.0",
    description="AI-powered investment assistant for ETF and Crypto with Top/Bottom strategies using OpenAI",
    lifespan=lifespan,
)
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Placeholder; the lifespan handler replaces it with the loaded state.
app.state.data = None
| # ---------- Models ---------- | |
class ChatMessage(BaseModel):
    # Request body for the chat handler.
    # `message` is passed to the LLM helper as the query text.
    message: str
    # Optional extra data; the LLM helper JSON-encodes it into the prompt.
    context: Optional[Dict] = Field(default=None)
class NewsAnalysis(BaseModel):
    # Request body for the news-analysis handler.
    title: str
    # Article text; this is the field the sentiment analysis is run on.
    content: str
    source: str
    # When omitted, the handler substitutes the current time.
    published_date: Optional[str] = Field(default=None)
class MarketDataRequest(BaseModel):
    # Request body for the market-data handler.
    symbols: List[str]
    # Selects the data provider: "CRYPTO" uses CoinGecko, anything else
    # (i.e. "ETF") uses Alpha Vantage.
    asset_type: Literal["ETF", "CRYPTO"] = Field(default="ETF")
class StrategyRequest(BaseModel):
    # Request body for the strategy handler.
    symbol: str
    asset_type: Literal["ETF", "CRYPTO"]
    # Which of the two strategies the LLM explanation should focus on.
    strategy_type: Literal["TOP", "BOTTOM"]
    # 1d, 5d, 1mo, 3mo, 6mo, 1y
    # NOTE(review): no code visible in this file reads this field — confirm
    # whether it is meant to control the data window.
    timeframe: str = Field(default="1mo")
class PortfolioPosition(BaseModel):
    # Request body describing one holding; the portfolio handler dumps it to a
    # dict, adds an "id", and appends it to the stored "portfolios" list.
    symbol: str
    asset_type: Literal["ETF", "CRYPTO"]
    quantity: float
    purchase_price: float
    # Plain string; no date format is enforced by this model.
    purchase_date: str
| # ---------- Helper Functions ---------- | |
def _summarize_price_series(prices: List[float], dates: List[str], asset_type: str) -> Dict:
    """Build the per-symbol summary dict from parallel price and date lists."""
    first, last = prices[0], prices[-1]
    # Guard the percentage change against a zero starting price.
    change = ((last - first) / first) * 100 if first else 0.0
    return {
        "current_price": last,
        "prices": prices,
        "dates": dates,
        "high_30d": max(prices),
        "low_30d": min(prices),
        "change_30d": change,
        "asset_type": asset_type,
    }


async def _fetch_crypto_summary(symbol: str) -> Dict:
    """Fetch 30 days of daily prices for *symbol* from CoinGecko and summarise them."""
    # Using CoinGecko API (free, no key needed)
    url = f"https://api.coingecko.com/api/v3/coins/{symbol.lower()}/market_chart"
    params = {
        "vs_currency": "usd",
        "days": "30",
        "interval": "daily",
    }
    async with httpx.AsyncClient() as client:
        response = await client.get(url, params=params, timeout=10.0)
    if response.status_code != 200:
        raise ValueError(f"CoinGecko returned HTTP {response.status_code}")
    points = response.json().get("prices") or []
    if not points:
        raise ValueError("CoinGecko returned no price data")
    # Each point is indexed as [timestamp_ms, price].
    prices = [p[1] for p in points]
    dates = [datetime.fromtimestamp(p[0] / 1000).strftime("%Y-%m-%d") for p in points]
    return _summarize_price_series(prices, dates, "CRYPTO")


async def _fetch_etf_summary(symbol: str) -> Dict:
    """Fetch the latest daily closes for *symbol* from Alpha Vantage and summarise them."""
    if not ALPHA_VANTAGE_API_KEY:
        raise ValueError("ALPHA_VANTAGE_API_KEY is not configured")
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "apikey": ALPHA_VANTAGE_API_KEY,
        "outputsize": "compact",
    }
    async with httpx.AsyncClient() as client:
        response = await client.get(url, params=params, timeout=10.0)
    if response.status_code != 200:
        raise ValueError(f"Alpha Vantage returned HTTP {response.status_code}")
    data = response.json()
    time_series = data.get("Time Series (Daily)")
    if not time_series:
        # A 200 response without the series usually carries an explanatory
        # message instead; surface it when one is present.
        detail = data.get("Error Message") or data.get("Note") or data.get("Information")
        raise ValueError(detail or "Alpha Vantage returned no daily time series")
    # Keys are ISO dates, so a lexical sort is chronological; keep the last 30.
    dates = sorted(time_series.keys())[-30:]
    prices = [float(time_series[date]["4. close"]) for date in dates]
    return _summarize_price_series(prices, dates, "ETF")


async def get_market_data(symbols: List[str], asset_type: str) -> Dict:
    """Get market data for ETFs or Crypto.

    Args:
        symbols: Identifiers to look up. For ``"CRYPTO"`` each one is used
            (lower-cased) as the coin path segment of the CoinGecko URL.
        asset_type: ``"CRYPTO"`` selects CoinGecko; any other value selects
            Alpha Vantage.

    Returns:
        A dict with one entry per requested symbol. A successful entry holds
        ``current_price``, ``prices``, ``dates``, ``high_30d``, ``low_30d``,
        ``change_30d`` and ``asset_type``. Any failure (HTTP error, missing
        payload, missing API key, unexpected exception) yields
        ``{"error": <message>}`` so callers can always tell a failed symbol
        from a successful one.
    """
    results = {}
    for symbol in symbols:
        try:
            if asset_type == "CRYPTO":
                results[symbol] = await _fetch_crypto_summary(symbol)
            else:
                results[symbol] = await _fetch_etf_summary(symbol)
        except Exception as e:
            print(f"Error fetching data for {symbol}: {e}")
            results[symbol] = {"error": str(e)}
    return results
def _count_keyword_hits(text_lower: str, keywords: List[str]) -> int:
    """Count how many distinct *keywords* occur in *text_lower* as whole words.

    A keyword may carry a common inflection suffix ("gains", "crashed",
    "bullish"), but it must start at a word boundary, so words that merely
    contain a keyword ("again", "enterprise", "group") do not count.
    """
    import re  # local import: the module's top-level imports do not include re

    hits = 0
    for word in keywords:
        pattern = rf"\b{re.escape(word)}(?:s|es|d|ed|ing|ish)?\b"
        if re.search(pattern, text_lower):
            hits += 1
    return hits


def analyze_sentiment(text: str) -> Dict:
    """Analyze sentiment using Azure Text Analytics, with a keyword fallback.

    Args:
        text: The text to classify.

    Returns:
        A dict with ``sentiment``, ``confidence`` and ``method``. With the
        keyword fallback, ``confidence`` is a single float; with Azure it is a
        dict of positive/neutral/negative scores. If the Azure call fails the
        result is neutral with confidence 0.5 plus an ``error`` message.
    """
    if not text_analytics_client:
        # Fallback to simple keyword-based sentiment
        positive_words = ["rise", "gain", "bull", "surge", "rally", "growth", "profit", "up"]
        negative_words = ["fall", "drop", "bear", "crash", "decline", "loss", "down", "sell"]
        text_lower = text.lower()
        positive_score = _count_keyword_hits(text_lower, positive_words)
        negative_score = _count_keyword_hits(text_lower, negative_words)
        if positive_score > negative_score:
            sentiment = "positive"
            confidence = min(0.7 + (positive_score * 0.05), 0.95)
        elif negative_score > positive_score:
            sentiment = "negative"
            confidence = min(0.7 + (negative_score * 0.05), 0.95)
        else:
            sentiment = "neutral"
            confidence = 0.5
        return {
            "sentiment": sentiment,
            "confidence": confidence,
            "method": "keyword-based",
        }
    try:
        documents = [text[:5000]]  # Azure limit
        response = text_analytics_client.analyze_sentiment(
            documents=documents,
            show_opinion_mining=True,
        )[0]
        return {
            "sentiment": response.sentiment,
            "confidence": {
                "positive": response.confidence_scores.positive,
                "neutral": response.confidence_scores.neutral,
                "negative": response.confidence_scores.negative,
            },
            "method": "azure-text-analytics",
        }
    except Exception as e:
        print(f"Error in sentiment analysis: {e}")
        return {
            "sentiment": "neutral",
            "confidence": 0.5,
            "error": str(e),
            "method": "error-fallback",
        }
def calculate_technical_indicators(prices: List[float]) -> Dict:
    """Calculate technical indicators for strategy analysis.

    Args:
        prices: Prices in chronological order (oldest first).

    Returns:
        An empty dict when fewer than two prices are given. Otherwise a dict
        with ``sma_10``, ``sma_20``, ``rsi``, ``volatility``,
        ``price_position`` (0 = at the lowest price, 100 = at the highest),
        ``current_price``, ``high_30d`` and ``low_30d``. Every computed value
        is a finite float so the result can be JSON-encoded: an undefined RSI
        falls back to 50 and an undefined volatility to 0.
    """
    if len(prices) < 2:
        return {}

    def _finite(value, fallback: float) -> float:
        # NaN/inf cannot be emitted in a strict JSON response.
        value = float(value)
        return value if np.isfinite(value) else fallback

    series = pd.Series(prices)

    # Moving averages (window shrinks to the series length for short input)
    sma_10 = series.rolling(window=min(10, len(series))).mean().iloc[-1]
    sma_20 = series.rolling(window=min(20, len(series))).mean().iloc[-1]

    # RSI from the average gain and average loss over the last window
    window = min(14, len(series))
    delta = series.diff()
    avg_gain = float(delta.where(delta > 0, 0).rolling(window=window).mean().iloc[-1])
    avg_loss = float((-delta.where(delta < 0, 0)).rolling(window=window).mean().iloc[-1])
    if avg_loss == 0:
        # No losses: fully overbought if there were gains, neutral if flat.
        rsi_value = 100.0 if avg_gain > 0 else 50.0
    else:
        rsi_value = 100 - (100 / (1 + avg_gain / avg_loss))

    # Volatility: standard deviation of period-over-period returns, in percent.
    # It is undefined (NaN) when only one return exists.
    volatility = series.pct_change().std() * 100

    # Current position relative to range
    current = prices[-1]
    high_30d = max(prices)
    low_30d = min(prices)
    if high_30d != low_30d:
        price_position = ((current - low_30d) / (high_30d - low_30d)) * 100
    else:
        price_position = 50

    return {
        "sma_10": _finite(sma_10, float(current)),
        "sma_20": _finite(sma_20, float(current)),
        "rsi": _finite(rsi_value, 50.0),
        "volatility": _finite(volatility, 0.0),
        "price_position": float(price_position),  # 0 = bottom, 100 = top
        "current_price": current,
        "high_30d": high_30d,
        "low_30d": low_30d,
    }
def _build_strategy_verdict(score: float, reasons: List[str], strategy: str) -> Dict:
    """Turn an accumulated score into the score/recommendation/confidence block."""
    # Round away float accumulation noise so the threshold comparisons do not
    # depend on the order in which the weights were added.
    score = round(min(score, 1.0), 2)
    if score > 0.7:
        confidence = "HIGH"
    elif score > 0.5:
        confidence = "MEDIUM"
    else:
        confidence = "LOW"
    return {
        "score": score,
        "recommendation": "BUY" if score > 0.5 else "HOLD",
        "confidence": confidence,
        "reasons": reasons,
        "strategy": strategy,
    }


def generate_top_bottom_strategy(symbol: str, asset_type: str, market_data: Dict, news_sentiment: Optional[Dict] = None) -> Dict:
    """Generate TOP or BOTTOM strategy recommendations.

    Args:
        symbol: Identifier echoed back in the result.
        asset_type: Asset class echoed back in the result.
        market_data: One per-symbol entry as produced by ``get_market_data``;
            ``prices`` is required, ``change_30d`` and ``current_price`` are
            used when present.
        news_sentiment: Optional sentiment dict; only its ``sentiment`` label
            is read.

    Returns:
        ``{"error": "Insufficient market data"}`` when ``market_data`` carries
        an error or has fewer than two prices. Otherwise a dict with the
        indicators plus a ``top_strategy`` (momentum) and ``bottom_strategy``
        (value) block, each holding a 0-1 score, a BUY/HOLD recommendation, a
        confidence label and the reasons that contributed to the score.
    """
    prices = market_data.get("prices") or []
    if "error" in market_data or len(prices) < 2:
        return {"error": "Insufficient market data"}

    indicators = calculate_technical_indicators(prices)
    price_position = indicators.get("price_position", 50)
    rsi = indicators.get("rsi", 50)
    sma_10 = indicators.get("sma_10", 0)
    sma_20 = indicators.get("sma_20", 0)
    change_30d = market_data.get("change_30d", 0)

    # Map the sentiment label onto a 0-1 score; anything else stays neutral.
    sentiment_score = 0.5
    if news_sentiment:
        label = news_sentiment.get("sentiment")
        if label == "positive":
            sentiment_score = 0.7
        elif label == "negative":
            sentiment_score = 0.3

    # TOP Strategy (Momentum - Buy at highs)
    top_score = 0
    top_reasons = []
    if price_position > 70:
        top_score += 0.3
        top_reasons.append(f"Price at {price_position:.1f}% of 30-day range (near top)")
    if 50 < rsi < 70:
        top_score += 0.2
        top_reasons.append(f"RSI at {rsi:.1f} (showing momentum)")
    if sentiment_score > 0.6:
        top_score += 0.2
        top_reasons.append("Positive news sentiment")
    if sma_10 > sma_20:
        top_score += 0.15
        top_reasons.append("Short-term MA above long-term MA (uptrend)")
    if change_30d > 5:
        top_score += 0.15
        top_reasons.append(f"30-day gain of {change_30d:.1f}%")

    # BOTTOM Strategy (Value - Buy at lows)
    bottom_score = 0
    bottom_reasons = []
    if price_position < 30:
        bottom_score += 0.3
        bottom_reasons.append(f"Price at {price_position:.1f}% of 30-day range (near bottom)")
    if rsi < 30:
        bottom_score += 0.25
        bottom_reasons.append(f"RSI at {rsi:.1f} (oversold condition)")
    if sentiment_score < 0.4:
        bottom_score += 0.15
        bottom_reasons.append("Negative news sentiment (contrarian opportunity)")
    if sma_10 < sma_20:
        bottom_score += 0.15
        bottom_reasons.append("Short-term MA below long-term MA (potential reversal)")
    if change_30d < -10:
        bottom_score += 0.15
        bottom_reasons.append(f"30-day decline of {change_30d:.1f}% (potential bounce)")

    return {
        "symbol": symbol,
        "asset_type": asset_type,
        "current_price": market_data.get("current_price", prices[-1]),
        "indicators": indicators,
        "top_strategy": _build_strategy_verdict(
            top_score, top_reasons, "Momentum - Follow the trend, buy at highs"
        ),
        "bottom_strategy": _build_strategy_verdict(
            bottom_score, bottom_reasons, "Value - Buy the dip, contrarian approach"
        ),
        "generated_at": datetime.now().isoformat(),
    }
async def get_llm_advice(query: str, context: Optional[Dict] = None) -> str:
    """Get investment advice from OpenAI.

    Args:
        query: The question or instruction for the model.
        context: Optional supporting data. It is JSON-encoded and sent as part
            of the user message, so the conversation still ends on a user turn.

    Returns:
        The model's reply text (an empty string if the reply has no text), a
        fixed notice when no OpenAI client is configured, or an
        ``"Error getting LLM advice: ..."`` message if the request fails.
    """
    if not openai_client:
        return "OpenAI is not configured. Please set OPENAI_API_KEY environment variable."
    system_prompt = """You are an expert financial advisor specializing in ETF and cryptocurrency investments.
Your expertise includes:
- Technical analysis and market trends
- Top (momentum) and Bottom (value) investment strategies
- Risk assessment and portfolio management
- Market sentiment analysis
Provide clear, actionable advice based on the provided context. Always include:
1. Clear recommendation (BUY/HOLD/SELL)
2. Reasoning based on data
3. Risk factors
4. Suggested strategy (TOP/BOTTOM)
Be concise but comprehensive."""
    try:
        user_content = query
        if context:
            # default=str keeps values that are not JSON-native (e.g. datetimes)
            # from raising during encoding.
            context_str = json.dumps(context, indent=2, default=str)
            user_content = f"{query}\n\nContext data:\n{context_str}"
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ]
        # The client call is synchronous; run it in the default executor so
        # the event loop keeps serving other requests while it waits.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: openai_client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=messages,
                temperature=0.7,
                max_tokens=1000,
            ),
        )
        return response.choices[0].message.content or ""
    except Exception as e:
        return f"Error getting LLM advice: {str(e)}"
| # ---------- Endpoints ---------- | |
@app.get("/")
def root():
    """Describe the service: name, version, feature list and endpoint paths."""
    return {
        "message": "Investment Assistant API - ETF & Crypto",
        "version": "2.0.0",
        "features": [
            "LLM-powered investment advice",
            "Top/Bottom strategy analysis",
            "News sentiment analysis",
            "Market data for ETFs and Crypto",
            "Portfolio management",
        ],
        "endpoints": {
            "health": "/health",
            "chat": "/chat",
            "market_data": "/market-data",
            "strategy": "/strategy",
            "news_analysis": "/news/analyze",
            "docs": "/docs",
        },
    }
@app.get("/health")
def health():
    """Report service status and which optional clients were initialised."""
    return {
        "status": "healthy",
        "openai": openai_client is not None,
        "azure_text_analytics": text_analytics_client is not None,
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/chat")
async def chat(message: ChatMessage):
    """Chat with the investment assistant LLM.

    Forwards the message text and optional context to ``get_llm_advice`` and
    returns the query, the advice text and a timestamp.
    """
    advice = await get_llm_advice(message.message, message.context)
    return {
        "query": message.message,
        "advice": advice,
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/market-data")
async def get_market_data_endpoint(request: MarketDataRequest):
    """Get market data for ETFs or Crypto.

    Returns every fetched entry (including failed ones) to the caller, but
    only stores successful entries in the application state so a transient
    failure does not overwrite previously stored data for a symbol.
    """
    data = await get_market_data(request.symbols, request.asset_type)
    successful = {
        symbol: entry for symbol, entry in data.items() if "error" not in entry
    }
    app.state.data.setdefault("market_data", {}).update(successful)
    return {
        "data": data,
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/strategy")
async def get_strategy(request: StrategyRequest):
    """Get TOP or BOTTOM strategy analysis for a symbol.

    Fetches market data, derives a news sentiment from stored analyses that
    mention the symbol, scores both strategies, asks the LLM for an
    explanation, stores the result and returns it.

    Raises:
        HTTPException: 404 when no usable market data is available for the
            requested symbol.
    """
    not_found_detail = f"Could not fetch market data for {request.symbol}"

    # Get market data; a missing or empty entry is as unusable as an error.
    market_data_dict = await get_market_data([request.symbol], request.asset_type)
    market_data = market_data_dict.get(request.symbol) or {}
    if "error" in market_data or not market_data:
        raise HTTPException(status_code=404, detail=not_found_detail)

    # Get recent news sentiment if available
    needle = request.symbol.lower()
    recent_news = [
        n for n in app.state.data.get("news_analysis", [])
        if needle in n.get("title", "").lower() or needle in n.get("content", "").lower()
    ]
    news_sentiment = None
    if recent_news:
        # Stored analyses may lack article text, so fall back to the title.
        combined_text = " ".join(
            n.get("content") or n.get("title", "") for n in recent_news[-3:]
        )
        # analyze_sentiment is synchronous and may call a remote service; run
        # it in the default executor to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        news_sentiment = await loop.run_in_executor(None, analyze_sentiment, combined_text)

    # Generate strategy
    strategy = generate_top_bottom_strategy(
        request.symbol,
        request.asset_type,
        market_data,
        news_sentiment,
    )
    if "error" in strategy:
        raise HTTPException(status_code=404, detail=not_found_detail)

    # Get LLM explanation
    strategy_summary = json.dumps({
        "strategy_type": request.strategy_type,
        "top": strategy.get("top_strategy", {}),
        "bottom": strategy.get("bottom_strategy", {}),
    }, indent=2)
    llm_query = f"""Analyze this investment strategy for {request.symbol} ({request.asset_type}):
{strategy_summary}
Provide a comprehensive analysis focusing on the {request.strategy_type} strategy."""
    llm_advice = await get_llm_advice(llm_query, strategy)
    strategy["llm_analysis"] = llm_advice

    # Save strategy
    app.state.data["strategies"].append(strategy)
    return strategy
@app.post("/news/analyze")
async def analyze_news(news: NewsAnalysis):
    """Analyze financial news sentiment.

    Runs sentiment analysis on the article content, stores the analysis
    (together with the content, which the symbol lookups match against) and
    returns the analysis. Only the 1000 most recent analyses are kept.
    """
    # analyze_sentiment is synchronous and may call a remote service; run it
    # in the default executor to keep the event loop responsive.
    loop = asyncio.get_running_loop()
    sentiment = await loop.run_in_executor(None, analyze_sentiment, news.content)
    analysis = {
        "title": news.title,
        "source": news.source,
        "published_date": news.published_date or datetime.now().isoformat(),
        "sentiment": sentiment,
        "analyzed_at": datetime.now().isoformat(),
    }
    history = app.state.data["news_analysis"]
    # The stored record also carries the article text; the response does not.
    history.append({**analysis, "content": news.content})
    # Keep only last 1000 analyses
    if len(history) > 1000:
        del history[:-1000]
    return analysis
# NOTE(review): no route decorator is visible for this handler — confirm how
# it is registered with the application.
def get_symbol_sentiment(symbol: str):
    """Aggregate the stored news sentiment labels for one symbol.

    A stored analysis is relevant when the lower-cased symbol occurs in its
    title or content. The overall label is "positive" or "negative" only when
    that label strictly outnumbers both others; otherwise it is "neutral".
    Confidence is the winning label's share of the relevant analyses.
    """
    needle = symbol.lower()
    matches = []
    for item in app.state.data.get("news_analysis", []):
        if needle in item.get("title", "").lower() or needle in item.get("content", "").lower():
            matches.append(item)

    if len(matches) == 0:
        return {"symbol": symbol, "sentiment": "neutral", "confidence": 0.5, "news_count": 0}

    labels = [item.get("sentiment", {}).get("sentiment", "neutral") for item in matches]
    total = len(labels)
    n_pos = labels.count("positive")
    n_neg = labels.count("negative")
    n_neu = labels.count("neutral")

    if n_pos > max(n_neg, n_neu):
        verdict, share = "positive", n_pos / total
    elif n_neg > max(n_pos, n_neu):
        verdict, share = "negative", n_neg / total
    else:
        verdict, share = "neutral", n_neu / total

    breakdown = {"positive": n_pos, "negative": n_neg, "neutral": n_neu}
    return {
        "symbol": symbol,
        "sentiment": verdict,
        "confidence": share,
        "news_count": len(matches),
        "breakdown": breakdown,
    }
# NOTE(review): no route decorator is visible for this handler — confirm how
# it is registered with the application.
def get_all_strategies():
    """Return every stored strategy together with how many there are."""
    saved = app.state.data.get("strategies", [])
    return {"strategies": saved, "count": len(saved)}
# NOTE(review): no route decorator is visible for this handler — confirm how
# it is registered with the application.
def add_to_portfolio(position: PortfolioPosition):
    """Append a position to the stored portfolio and echo it back.

    The stored record is the model's field dump plus an "id" built from the
    current timestamp.
    """
    record = position.model_dump()
    record.update({"id": f"pos_{datetime.now().timestamp()}"})
    app.state.data["portfolios"].append(record)
    return {"message": "Position added", "position": record}
# NOTE(review): no route decorator is visible for this handler — confirm how
# it is registered with the application.
def get_portfolio():
    """Return every stored portfolio position together with how many there are."""
    held = app.state.data.get("portfolios", [])
    return {"positions": held, "count": len(held)}
if __name__ == "__main__":
    # Print the start-up banner, then serve the app on all interfaces, port 8000.
    for banner_line in (
        "π Starting Investment Assistant API...",
        "π ETF & Crypto Investment Assistant with Top/Bottom Strategies",
        "π€ Powered by OpenAI",
    ):
        print(banner_line)
    uvicorn.run(app, host="0.0.0.0", port=8000)