Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, jsonify, request, session | |
| import datetime | |
| import json | |
| import os | |
| import time | |
| import requests | |
| from functools import wraps | |
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| import random | |
| from functools import lru_cache | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from sklearn.preprocessing import MinMaxScaler | |
| from textblob import TextBlob | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
app = Flask(__name__)
# Required for session management. The key is read from the SECRET_KEY
# environment variable so deployments do not have to ship a secret in source;
# the literal is kept only as a backward-compatible development fallback.
app.secret_key = os.environ.get('SECRET_KEY', 'your_secret_key_here')
| # Custom JSON encoder to handle numpy and pandas types | |
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder for NumPy scalars/arrays, pandas objects and dates.

    Conversions performed by :meth:`default`:

    * NumPy integer / floating / boolean scalars -> ``int`` / ``float`` / ``bool``
    * ``np.ndarray`` and ``pd.Series`` -> (nested) lists
    * ``datetime``/``date``/``pd.Timestamp`` -> ISO-8601 string
    * pandas missing scalars (``pd.NA``, ``pd.NaT``) and non-finite NumPy
      floats -> ``None``

    Note that ``json`` never calls :meth:`default` for values it can already
    encode (e.g. a plain Python ``float`` or ``np.float64``, which subclasses
    ``float``), so a top-level ``float('nan')`` is not intercepted here.
    """

    @staticmethod
    def _finite_or_none(value):
        """Return ``value`` unless it is NaN/Infinity, in which case return None."""
        return value if np.isfinite(value) else None

    def _to_native(self, value):
        """Clean up the output of ``.tolist()``.

        ``tolist()`` already yields native Python scalars, so those must not be
        routed through :meth:`default` (which would raise ``TypeError`` for
        them). Only non-finite floats need replacing; anything that is still
        non-native (e.g. a ``Timestamp``) is left for the encoder to hand back
        to :meth:`default`.
        """
        if isinstance(value, list):
            return [self._to_native(item) for item in value]
        if isinstance(value, float):
            return self._finite_or_none(value)
        return value

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Raises:
            TypeError: if ``obj`` is of a type this encoder does not handle.
        """
        # Abstract NumPy base classes cover every concrete width and avoid
        # aliases such as ``np.float_`` that no longer exist in NumPy 2.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return self._finite_or_none(float(obj))
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, (np.ndarray, pd.Series)):
            return [self._to_native(item) for item in obj.tolist()]
        # ``pd.isna`` returns an array for array-likes, so only ask it about
        # scalars. Checked before the datetime branch because ``pd.NaT`` is a
        # datetime subclass.
        if pd.api.types.is_scalar(obj) and pd.isna(obj):
            return None
        # ``pd.Timestamp`` subclasses ``datetime.datetime``.
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        return super().default(obj)
# Configure Flask to use the custom JSON encoder.
# ``app.json_encoder`` is only honoured by older Flask releases; newer ones
# serialize through a JSON provider object, so install one when the provider
# API is importable and fall back to the legacy hook otherwise.
try:
    from flask.json.provider import DefaultJSONProvider
except ImportError:
    app.json_encoder = CustomJSONEncoder
else:
    class _CustomJSONProvider(DefaultJSONProvider):
        """JSON provider that serializes responses with ``CustomJSONEncoder``."""

        def dumps(self, obj, **kwargs):
            """Serialize ``obj`` to a JSON string using the custom encoder."""
            kwargs.setdefault("cls", CustomJSONEncoder)
            kwargs.setdefault("ensure_ascii", self.ensure_ascii)
            kwargs.setdefault("sort_keys", self.sort_keys)
            return json.dumps(obj, **kwargs)

    app.json = _CustomJSONProvider(app)
# Configuration constants
# NOTE(review): the fallback API keys below are committed in source. They
# should be treated as leaked, rotated, and supplied via the environment.
NEWS_API_KEY = os.environ.get('NEWS_API_KEY', 'fd941d9b5c46456a953dc6ecafbe7b50')
FINNHUB_API_KEY = os.environ.get('FINNHUB_API_KEY', 'd0b7ec1r01qo0h63fns0d0b7ec1r01qo0h63fnsg')
REQUEST_TIMEOUT = 10  # passed as ``timeout=`` to outgoing HTTP requests
MAX_RETRIES = 3       # default attempt count for ``retry_with_backoff``
BATCH_SIZE = 10       # default number of articles returned by ``api_news``
# Define sectors and their representative tickers.
# Each value is an ordered list; consumers that slice it (e.g. "first five")
# depend on this order.
SECTORS = {
    "Technology": "AAPL MSFT GOOGL AMZN META NVDA INTC AMD CRM CSCO".split(),
    "Healthcare": "JNJ PFE UNH MRK ABBV LLY BMY AMGN TMO ABT".split(),
    "Energy": "XOM CVX COP SLB EOG MPC PSX VLO OXY DVN".split(),
    "Automobile": "TSLA F GM TM STLA RIVN LCID NIO HMC RACE".split(),
    "Finance": "JPM BAC WFC GS MS C BLK AXP V MA".split(),
}
# Sector colors for UI: one hex color string per key of SECTORS.
SECTOR_COLORS = dict(
    Technology="#3498db",
    Healthcare="#27ae60",
    Energy="#e67e22",
    Automobile="#e74c3c",
    Finance="#9b59b6",
)
# Directory holding the JSON cache files. It is created at import time so the
# cache helpers can open files in it without checking for its existence.
CACHE_DIR = os.path.join("/tmp", "data_cache")
os.makedirs(CACHE_DIR, exist_ok=True)
| # Cache functions | |
def get_from_cache(key):
    """Return the cached payload stored under ``key`` if it is still fresh.

    The entry lives in ``CACHE_DIR`` in a ``.json`` file named after ``key``
    (with ``/`` replaced by ``_``). Entries older than one hour, missing
    files and unreadable files all yield ``None``; read errors are printed
    rather than raised.
    """
    path = os.path.join(CACHE_DIR, f"{key.replace('/', '_')}.json")
    if not os.path.exists(path):
        return None
    try:
        with open(path, 'r') as handle:
            entry = json.load(handle)
        age_seconds = time.time() - entry['timestamp']
        if age_seconds < 3600:  # 1 hour cache
            return entry['data']
    except Exception as e:
        print(f"Error reading cache file {path}: {str(e)}")
    return None
def save_to_cache(key, data):
    """Write ``data`` to the on-disk cache under ``key``.

    The payload is stored next to the current timestamp in a ``.json`` file
    inside ``CACHE_DIR`` and serialized with ``CustomJSONEncoder``. Failures
    are printed and otherwise ignored, so caching stays best-effort.
    """
    path = os.path.join(CACHE_DIR, f"{key.replace('/', '_')}.json")
    try:
        with open(path, 'w') as handle:
            entry = {'data': data, 'timestamp': time.time()}
            json.dump(entry, handle, cls=CustomJSONEncoder)
    except Exception as e:
        print(f"Error writing to cache file {path}: {str(e)}")
| # Retry decorator | |
def retry_with_backoff(retries=MAX_RETRIES, backoff_factor=0.5):
    """Build a decorator that retries a callable with exponential backoff.

    Args:
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so the wrapped function is always called at least once.
        backoff_factor: Base delay in seconds. The sleep after the k-th
            failed attempt is ``backoff_factor * 2 ** k``.

    Returns:
        A decorator. The wrapped function returns whatever the original
        returns and re-raises the last exception once every attempt failed.
    """
    def decorator(func):
        # Preserve ``__name__``/``__doc__`` of the wrapped function; Flask,
        # for instance, derives endpoint names from ``__name__``.
        @wraps(func)
        def wrapper(*args, **kwargs):
            total_attempts = max(1, retries)
            remaining = total_attempts
            while remaining > 0:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    remaining -= 1
                    if remaining == 0:
                        raise
                    time.sleep(backoff_factor * (2 ** (total_attempts - remaining)))
            return None
        return wrapper
    return decorator
| # News API functions | |
def fetch_latest_news(ticker="TSLA"):
    """Fetch the most recently published English news article about ``ticker``.

    Args:
        ticker: Search term sent to the NewsAPI ``everything`` endpoint.

    Returns:
        A dict with ``title``, ``content``, ``ticker``, ``url``, ``source``
        and ``publishedAt`` keys, or ``None`` when the API reports no result.

    Raises:
        requests.RequestException: on network failure or a non-2xx response.
    """
    # Let requests build the query string so the search term is URL-encoded
    # instead of being spliced into the URL verbatim.
    params = {
        "q": ticker,
        "apiKey": NEWS_API_KEY,
        "sortBy": "publishedAt",
        "language": "en",
    }
    response = requests.get(
        "https://newsapi.org/v2/everything",
        params=params,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    data = response.json()

    if data.get("status") != "ok" or data.get("totalResults", 0) <= 0:
        return None
    # ``totalResults`` can be positive while the returned list is empty.
    articles = data.get("articles") or []
    if not articles:
        return None

    article = articles[0]
    return {
        "title": article["title"],
        "content": article.get("content") or article.get("description", ""),
        "ticker": ticker,
        "url": article.get("url", ""),
        # ``source`` may be present but null.
        "source": (article.get("source") or {}).get("name", "Unknown"),
        "publishedAt": article.get("publishedAt", ""),
    }
def get_multiple_news(query, count=5):
    """Fetch up to ``count`` recent English news articles matching ``query``.

    Articles lacking a title, or lacking both content and description, are
    skipped, so fewer than ``count`` items may be returned.

    Args:
        query: Free-text search term for the NewsAPI ``everything`` endpoint.
        count: Page size requested and maximum number of articles returned.

    Returns:
        A list of dicts with ``title``, ``content``, ``url``, ``source`` and
        ``publishedAt`` keys; empty when the API reports no result.

    Raises:
        requests.RequestException: on network failure or a non-2xx response.
    """
    # Let requests build the query string so the search term is URL-encoded
    # instead of being spliced into the URL verbatim.
    params = {
        "q": query,
        "apiKey": NEWS_API_KEY,
        "sortBy": "publishedAt",
        "language": "en",
        "pageSize": count,
    }
    response = requests.get(
        "https://newsapi.org/v2/everything",
        params=params,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    data = response.json()

    if data.get("status") != "ok" or data.get("totalResults", 0) <= 0:
        return []

    articles = []
    for article in (data.get("articles") or [])[:count]:
        body = article.get("content") or article.get("description")
        if not article.get("title") or not body:
            continue
        articles.append({
            "title": article["title"],
            "content": body,
            "url": article.get("url", ""),
            # ``source`` may be present but null.
            "source": (article.get("source") or {}).get("name", "Unknown"),
            "publishedAt": article.get("publishedAt", ""),
        })
    return articles
| # Stock data functions | |
def get_stock_data(ticker, period="1mo"):
    """Download price history and company info for ``ticker`` via yfinance.

    The requested ``period`` is tried first; if it yields no rows, "3mo" and
    then "6mo" are tried. Company info is best-effort and falls back to an
    empty dict.

    Returns:
        ``{'history': DataFrame, 'info': dict}``, or ``None`` when no history
        could be obtained, the history has no ``Close`` column or fewer than
        two rows, or any unexpected error occurs (errors are printed).
    """
    try:
        stock = yf.Ticker(ticker)

        # Try to get history with a longer interval if shorter fails
        for candidate in (period, "3mo", "6mo"):
            hist = stock.history(period=candidate)
            if not hist.empty:
                break
        else:
            print(f"Error: No data available for {ticker}")
            return None

        # Get company info with error handling
        try:
            info = stock.info
        except Exception as e:
            print(f"Error fetching info for {ticker}: {str(e)}")
            info = {}

        # Verify we have the minimum required data
        usable = 'Close' in hist.columns and len(hist) >= 2
        if not usable:
            print(f"Error: Insufficient price data for {ticker}")
            return None

        return dict(history=hist, info=info)
    except Exception as e:
        print(f"Error fetching stock data for {ticker}: {str(e)}")
        return None
def get_sector_performance(sector_tickers, days=30):
    """Summarize recent performance for the first five tickers of a sector.

    Args:
        sector_tickers: Ordered list of ticker symbols; only the first five
            are evaluated.
        days: Look-back window. Up to 30 it is requested as ``"<days>d"``;
            above 30 it is converted to whole months (``days // 30``).

    Returns:
        A dict with ``avg_price_change`` and ``avg_volume_change`` (percent,
        averaged over the stocks that produced data), ``top_performers``
        (per-stock dicts sorted by price change, best first) and
        ``is_booming`` (average price change above 2%). When no stock yields
        data the zero/empty defaults are returned.
    """
    period = f"{days}d"
    if days > 30:
        period = f"{days//30}mo"  # Convert to months for longer periods

    sector_performance = {
        'avg_price_change': 0.0,
        'avg_volume_change': 0.0,
        'top_performers': [],
        'is_booming': False
    }
    stock_performances = []

    for ticker in sector_tickers[:5]:
        try:
            fetched = get_stock_data(ticker, period=period)
            if not fetched or fetched['history'].empty:
                print(f"Skipping {ticker} due to missing data")
                continue

            hist = fetched['history']
            info = fetched['info']
            if len(hist) < 2:
                print(f"Insufficient history for {ticker}")
                continue

            first_price = float(hist['Close'].iloc[0])
            last_price = float(hist['Close'].iloc[-1])
            price_change_pct = float(((last_price - first_price) / first_price) * 100)

            first_volume = float(hist['Volume'].iloc[0])
            avg_volume = float(hist['Volume'].mean())
            volume_change_pct = float(((avg_volume - first_volume) / first_volume) * 100 if first_volume > 0 else 0)

            stock_performances.append({
                'ticker': str(ticker),
                # The info dict may carry explicit nulls; ``or`` covers both a
                # missing key and a None value, which ``.get(key, default)``
                # does not.
                'company': str(info.get('shortName') or ticker),
                'current_price': float(last_price),
                'price_change_pct': float(price_change_pct),
                'volume_change_pct': float(volume_change_pct),
                'market_cap': int(info.get('marketCap') or 0),
                'chart_data': {
                    'dates': [d.strftime('%Y-%m-%d') for d in hist.index],
                    'prices': [float(p) for p in hist['Close'].values]
                }
            })
        except Exception as e:
            # One bad ticker must not sink the whole sector summary.
            print(f"Error processing {ticker}: {str(e)}")
            continue

    if stock_performances:
        n = len(stock_performances)
        sector_performance['avg_price_change'] = float(sum(s['price_change_pct'] for s in stock_performances) / n)
        sector_performance['avg_volume_change'] = float(sum(s['volume_change_pct'] for s in stock_performances) / n)
        stock_performances.sort(key=lambda s: s['price_change_pct'], reverse=True)
        sector_performance['top_performers'] = stock_performances[:5]
        sector_performance['is_booming'] = bool(sector_performance['avg_price_change'] > 2.0)

    return sector_performance
def generate_stock_insight(ticker, sector):
    """Produce a short rule-based commentary on a stock's recent performance.

    The commentary is derived from the price change across the fetched
    history window and the standard deviation of daily returns. A headline
    from the latest news article is appended when one can be fetched.

    Args:
        ticker: Stock symbol to analyze.
        sector: Sector name used to pick the industry-trend phrase; unknown
            sectors fall back to a generic phrase.

    Returns:
        A dict with ``analysis`` (str), ``outlook`` (str) and
        ``risk_factors`` (list of str). Placeholder values are returned when
        data is missing or an error occurs; this function does not raise.
    """
    try:
        stock_data = get_stock_data(ticker, period="30d")
        if not stock_data or stock_data['history'].empty:
            return {
                "analysis": f"Insufficient data available for {ticker}.",
                "outlook": "Unable to provide outlook due to limited data.",
                "risk_factors": ["Data availability"]
            }

        hist = stock_data['history']
        first_price = hist['Close'].iloc[0]
        last_price = hist['Close'].iloc[-1]
        price_change_30d = ((last_price - first_price) / first_price) * 100
        price_volatility = hist['Close'].pct_change().std() * 100

        # News is a nice-to-have: a news API outage or rate limit must not
        # discard an otherwise valid price analysis.
        try:
            recent_news = fetch_latest_news(ticker)
        except Exception as e:
            print(f"Error fetching news for {ticker}: {str(e)}")
            recent_news = None
        news_factor = f"Recent news: {recent_news['title']}" if recent_news else ""

        industry_trends = {
            "Technology": "ongoing AI innovations and chip demand",
            "Healthcare": "post-pandemic recovery and aging population demands",
            "Energy": "transition to renewables and fluctuating oil prices",
            "Automobile": "EV adoption and supply chain improvements",
            "Finance": "interest rate adjustments and fintech integration"
        }
        trend = industry_trends.get(sector, "evolving market conditions")

        if price_change_30d > 10:
            analysis = f"{ticker} has shown strong performance with a {price_change_30d:.2f}% gain over the last 30 days. This outperformance appears driven by {trend}. {news_factor}"
            outlook = "Short-term outlook remains positive with momentum indicators suggesting continued strength."
            risk_factors = ["Market volatility", "Potential market-wide corrections", "Overextended valuations"]
        elif price_change_30d > 0:
            analysis = f"{ticker} has shown moderate growth with a {price_change_30d:.2f}% gain over the last 30 days, in line with {sector} sector trends. {news_factor}"
            outlook = f"The stock appears to be following the {sector} sector with steady growth potential."
            risk_factors = ["Competitive pressures", "Sector rotation", "Modest growth projections"]
        else:
            analysis = f"{ticker} has underperformed with a {price_change_30d:.2f}% decline over the last 30 days. {news_factor}"
            outlook = "The stock may face continued headwinds in the short term."
            risk_factors = ["Continued underperformance", "Negative sentiment", "Technical weakness"]

        if price_volatility > 3:
            risk_factors.append(f"High price volatility ({price_volatility:.2f}%)")

        return {
            "analysis": analysis,
            "outlook": outlook,
            "risk_factors": risk_factors
        }
    except Exception as e:
        print(f"Error generating insight for {ticker}: {str(e)}")
        return {
            "analysis": f"Error analyzing {ticker}",
            "outlook": "Unable to provide outlook.",
            "risk_factors": ["Data error"]
        }
| # Technical Analysis Functions | |
def calculate_technical_indicators(historical_data):
    """Calculate various technical indicators for the given historical data.

    Works on a copy of ``historical_data`` (which must have a ``Close``
    column) and adds these columns:

    * ``RSI`` - 14-period relative strength index (simple rolling means)
    * ``MACD`` / ``Signal_Line`` - 12/26 EMA difference and its 9-period EMA
    * ``SMA_20`` / ``SMA_50`` / ``SMA_200`` - simple moving averages
    * ``BB_middle`` / ``BB_upper`` / ``BB_lower`` - 20-period Bollinger Bands
      at two standard deviations

    Missing values in every float column are forward-filled, then
    back-filled, then set to 0.

    Returns:
        The augmented copy, or ``historical_data`` unchanged if anything
        fails (the error is printed).
    """
    try:
        df = historical_data.copy()
        close = df['Close']

        # Calculate RSI (14-day)
        delta = close.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['RSI'] = 100 - (100 / (1 + rs))

        # Calculate MACD
        exp1 = close.ewm(span=12, adjust=False).mean()
        exp2 = close.ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2
        df['Signal_Line'] = df['MACD'].ewm(span=9, adjust=False).mean()

        # Calculate Moving Averages
        df['SMA_20'] = close.rolling(window=20, min_periods=1).mean()
        df['SMA_50'] = close.rolling(window=50, min_periods=1).mean()
        df['SMA_200'] = close.rolling(window=200, min_periods=1).mean()

        # Calculate Bollinger Bands
        df['BB_middle'] = close.rolling(window=20, min_periods=1).mean()
        bb_std = close.rolling(window=20, min_periods=1).std()
        df['BB_upper'] = df['BB_middle'] + 2 * bb_std
        df['BB_lower'] = df['BB_middle'] - 2 * bb_std

        # Handle NaN values. ``ffill()``/``bfill()`` replace the deprecated
        # ``fillna(method=...)`` spelling.
        for column in df.columns:
            if df[column].dtype in [np.float64, np.float32]:
                df[column] = df[column].ffill().bfill().fillna(0)

        return df
    except Exception as e:
        print(f"Error calculating technical indicators: {str(e)}")
        return historical_data
def get_technical_analysis_summary(df):
    """Generate a summary of technical indicators.

    Reads the last two rows of ``df`` (as produced by
    ``calculate_technical_indicators``) and classifies RSI, MACD, the moving
    average stack and the Bollinger Band position, plus a list of
    human-readable observations.

    Returns:
        ``{'indicators': {...}, 'analysis': [...]}``. If anything goes wrong
        (for example fewer than two rows) a neutral placeholder summary is
        returned and the error is printed.
    """
    try:
        latest = df.iloc[-1]
        prev = df.iloc[-2]

        def nan_to_zero(column):
            """Latest value of ``column`` as float, with NaN/missing -> 0."""
            raw = latest.get(column, 0)
            return 0 if pd.isna(raw) else float(raw)

        # Handle potential NaN values in the summary
        rsi_value = nan_to_zero('RSI')
        macd_value = nan_to_zero('MACD')
        signal_value = nan_to_zero('Signal_Line')

        if rsi_value > 70:
            rsi_signal = 'Overbought'
        elif rsi_value < 30:
            rsi_signal = 'Oversold'
        else:
            rsi_signal = 'Neutral'

        macd_signal = 'Bullish' if macd_value > signal_value else 'Bearish'

        sma_20 = latest.get('SMA_20', 0)
        sma_50 = latest.get('SMA_50', 0)
        sma_200 = latest.get('SMA_200', 0)
        if sma_20 > sma_50 > sma_200:
            trend = 'Bullish'
        elif sma_20 < sma_50 < sma_200:
            trend = 'Bearish'
        else:
            trend = 'Mixed'

        bb_upper = round(float(latest.get('BB_upper', 0)), 2)
        bb_middle = round(float(latest.get('BB_middle', 0)), 2)
        bb_lower = round(float(latest.get('BB_lower', 0)), 2)
        above_band = latest['Close'] > latest.get('BB_upper', float('inf'))
        below_band = latest['Close'] < latest.get('BB_lower', float('-inf'))
        if above_band:
            position = 'Upper'
        elif below_band:
            position = 'Lower'
        else:
            position = 'Middle'

        # Generate analysis points
        notes = []
        if rsi_value > 70:
            notes.append('RSI indicates overbought conditions')
        elif rsi_value < 30:
            notes.append('RSI indicates oversold conditions')

        prev_macd = prev.get('MACD', 0)
        prev_signal = prev.get('Signal_Line', 0)
        if macd_value > signal_value and prev_macd <= prev_signal:
            notes.append('MACD shows a fresh bullish crossover')
        elif macd_value < signal_value and prev_macd >= prev_signal:
            notes.append('MACD shows a fresh bearish crossover')

        if above_band:
            notes.append('Price is trading above upper Bollinger Band, suggesting strong upward momentum')
        elif below_band:
            notes.append('Price is trading below lower Bollinger Band, suggesting strong downward momentum')

        if not notes:
            notes.append('No significant technical signals at this time')

        return {
            'indicators': {
                'RSI': {'value': round(rsi_value, 2), 'signal': rsi_signal},
                'MACD': {'value': round(macd_value, 2), 'signal': macd_signal},
                'Moving_Averages': {
                    'SMA_20': round(float(sma_20), 2),
                    'SMA_50': round(float(sma_50), 2),
                    'SMA_200': round(float(sma_200), 2),
                    'trend': trend,
                },
                'Bollinger_Bands': {
                    'upper': bb_upper,
                    'middle': bb_middle,
                    'lower': bb_lower,
                    'position': position,
                },
            },
            'analysis': notes,
        }
    except Exception as e:
        print(f"Error generating technical analysis summary: {str(e)}")
        neutral_indicators = {
            'RSI': {'value': 0, 'signal': 'Neutral'},
            'MACD': {'value': 0, 'signal': 'Neutral'},
            'Moving_Averages': {'SMA_20': 0, 'SMA_50': 0, 'SMA_200': 0, 'trend': 'Neutral'},
            'Bollinger_Bands': {'upper': 0, 'middle': 0, 'lower': 0, 'position': 'Middle'},
        }
        return {
            'indicators': neutral_indicators,
            'analysis': ['Technical analysis currently unavailable'],
        }
# User watchlist storage (in-memory for demonstration).
# Maps a user id to that user's list of ticker symbols; contents are lost
# whenever the process restarts.
user_watchlists: dict = {}
def _watchlist_entry(ticker):
    """Build the watchlist payload for one ticker, or None if it has no price data."""
    stock_data = get_stock_data(ticker, period="1mo")  # Get 1 month of data for charts
    if not stock_data or stock_data['history'].empty:
        return None

    hist = stock_data['history']
    tech_data = calculate_technical_indicators(hist)

    def indicator(column):
        """Indicator column as a JSON-safe list (NaN -> None), [] if absent."""
        if column not in tech_data:
            return []
        return [float(v) if not pd.isna(v) else None for v in tech_data[column].values]

    return {
        'ticker': ticker,
        'current_price': float(hist['Close'].iloc[-1]),
        'price_change': float(hist['Close'].pct_change().iloc[-1] * 100),
        'last_updated': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'chart_data': {
            'dates': [d.strftime('%Y-%m-%d') for d in hist.index],
            'prices': [float(p) for p in hist['Close'].values],
            'volume': [float(v) for v in hist['Volume'].values],
            'technical_indicators': {
                'sma_20': indicator('SMA_20'),
                'sma_50': indicator('SMA_50'),
                'rsi': indicator('RSI')
            }
        }
    }


def manage_watchlist():
    """Read or modify the current user's watchlist.

    The user is taken from ``session['user_id']`` (``'default_user'`` when
    unset). Behaviour depends on ``request.method``:

    * ``GET`` - return price and chart data for every watched ticker;
      tickers whose data cannot be fetched are omitted.
    * ``POST`` - add the ``ticker`` from the JSON body (no duplicates).
    * ``DELETE`` - remove the ``ticker`` from the JSON body.

    Returns:
        A JSON response with a ``status`` field. A missing or malformed
        body yields the regular "No ticker provided" error rather than an
        unhandled exception.
    """
    user_id = session.get('user_id', 'default_user')

    if request.method == 'GET':
        watchlist_data = []
        # Iterate over a copy so a concurrent POST/DELETE cannot change the
        # list mid-iteration.
        for ticker in list(user_watchlists.get(user_id, [])):
            try:
                entry = _watchlist_entry(ticker)
            except Exception as e:
                print(f"Error fetching data for {ticker}: {str(e)}")
                continue
            if entry is not None:
                watchlist_data.append(entry)
        return jsonify({
            'status': 'success',
            'data': watchlist_data
        })

    if request.method not in ('POST', 'DELETE'):
        return jsonify({'status': 'error', 'message': 'Method not allowed'}), 405

    # ``silent=True`` returns None instead of raising on a missing/invalid
    # body; anything that is not a JSON object is treated as empty.
    body = request.get_json(silent=True)
    data = body if isinstance(body, dict) else {}
    ticker = data.get('ticker')
    if not ticker:
        return jsonify({'status': 'error', 'message': 'No ticker provided'})

    if request.method == 'POST':
        # Add stock to watchlist
        watchlist = user_watchlists.setdefault(user_id, [])
        if ticker not in watchlist:
            watchlist.append(ticker)
        return jsonify({
            'status': 'success',
            'message': f'Added {ticker} to watchlist'
        })

    # Remove stock from watchlist
    if user_id in user_watchlists and ticker in user_watchlists[user_id]:
        user_watchlists[user_id].remove(ticker)
    return jsonify({
        'status': 'success',
        'message': f'Removed {ticker} from watchlist'
    })
def manage_price_alerts():
    """Read or modify the current user's price alerts.

    The user is taken from ``session['user_id']`` (``'default_user'`` when
    unset). Behaviour depends on ``request.method``:

    * ``GET`` - return the user's alerts.
    * ``POST`` - create an alert from ``ticker``, ``target_price`` and
      ``alert_type`` in the JSON body, or update the existing alert for
      that ticker (one alert per ticker).
    * ``DELETE`` - remove every alert for the ``ticker`` in the JSON body.

    Returns:
        A JSON response with a ``status`` field. Missing fields, a missing
        body, or a ``target_price`` that is not a finite number produce an
        error response rather than an unhandled exception.
    """
    user_id = session.get('user_id', 'default_user')

    if request.method == 'GET':
        # Get user's price alerts
        return jsonify({
            'status': 'success',
            'data': price_alerts.get(user_id, [])
        })

    if request.method not in ('POST', 'DELETE'):
        return jsonify({'status': 'error', 'message': 'Method not allowed'}), 405

    # ``silent=True`` returns None instead of raising on a missing/invalid
    # body; anything that is not a JSON object is treated as empty.
    body = request.get_json(silent=True)
    data = body if isinstance(body, dict) else {}
    ticker = data.get('ticker')

    if request.method == 'POST':
        # Add new price alert
        target_price = data.get('target_price')
        alert_type = data.get('alert_type')  # 'above' or 'below'
        if not all([ticker, target_price, alert_type]):
            return jsonify({
                'status': 'error',
                'message': 'Missing required fields'
            })

        try:
            target = float(target_price)
        except (TypeError, ValueError):
            target = float('nan')
        if not np.isfinite(target):
            return jsonify({
                'status': 'error',
                'message': 'Invalid target price'
            })

        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        alerts = price_alerts.setdefault(user_id, [])
        # Check if alert already exists for this ticker
        existing_alert = next((a for a in alerts if a['ticker'] == ticker), None)
        if existing_alert:
            existing_alert.update({
                'target_price': target,
                'alert_type': alert_type,
                'updated_at': now
            })
        else:
            alerts.append({
                'ticker': ticker,
                'target_price': target,
                'alert_type': alert_type,
                'created_at': now
            })
        return jsonify({
            'status': 'success',
            'message': 'Price alert created successfully'
        })

    # Remove price alert
    if not ticker:
        return jsonify({
            'status': 'error',
            'message': 'No ticker provided'
        })
    if user_id in price_alerts:
        price_alerts[user_id] = [alert for alert in price_alerts[user_id] if alert['ticker'] != ticker]
    return jsonify({
        'status': 'success',
        'message': f'Removed alert for {ticker}'
    })
def _parse_holding(raw):
    """Return ``(ticker, shares)`` for one posted holding, or None if malformed."""
    if not isinstance(raw, dict):
        return None
    ticker = raw.get('ticker')
    shares = raw.get('shares')
    if not ticker or isinstance(shares, bool):
        return None
    if not isinstance(shares, (int, float)):
        # Accept numeric strings such as "10"; keep real numbers untouched so
        # integers stay integers in the response.
        try:
            shares = float(shares)
        except (TypeError, ValueError):
            return None
    if not np.isfinite(shares):
        return None
    return ticker, shares


def _finite_or_zero(value):
    """Return ``value`` as float, replacing NaN/Infinity with 0.0."""
    value = float(value)
    return value if np.isfinite(value) else 0.0


def portfolio_analysis():
    """Maintain the current user's portfolio and report its metrics.

    The user is taken from ``session['user_id']`` (``'default_user'`` when
    unset). Behaviour depends on ``request.method``:

    * ``DELETE`` - remove the holding for the ``ticker`` in the JSON body
      and return a confirmation.
    * ``POST`` - add or update the ``holdings`` listed in the JSON body
      (each with ``ticker`` and ``shares``), then fall through to the
      analysis below.
    * anything else - return the analysis only.

    The analysis contains the total value, the value-weighted daily change
    (percent), the per-holding breakdown and annualized risk metrics
    (return, volatility, Sharpe ratio with a zero risk-free rate) computed
    from one month of daily closes. Holdings whose data cannot be fetched
    are left out.
    """
    user_id = session.get('user_id', 'default_user')

    if request.method in ('POST', 'DELETE'):
        # ``silent=True`` returns None instead of raising on a missing or
        # invalid body; anything that is not a JSON object is treated as empty.
        body = request.get_json(silent=True)
        data = body if isinstance(body, dict) else {}

    if request.method == 'DELETE':
        # Remove holding from portfolio
        ticker = data.get('ticker')
        if not ticker:
            return jsonify({
                'status': 'error',
                'message': 'No ticker provided'
            })
        if user_id in user_portfolios:
            user_portfolios[user_id] = [h for h in user_portfolios[user_id] if h['ticker'] != ticker]
        return jsonify({
            'status': 'success',
            'message': f'Removed {ticker} from portfolio'
        })
    elif request.method == 'POST':
        # Add or update portfolio holdings
        new_holdings = data.get('holdings', [])
        if not new_holdings:
            return jsonify({
                'status': 'error',
                'message': 'No holdings provided'
            })

        # Validate everything before touching stored state so a bad entry
        # cannot leave the portfolio half-updated.
        parsed = [_parse_holding(raw) for raw in new_holdings]
        if any(item is None for item in parsed):
            return jsonify({
                'status': 'error',
                'message': 'Invalid holdings provided'
            })

        stored = user_portfolios.setdefault(user_id, [])
        for ticker, shares in parsed:
            existing_holding = next((h for h in stored if h['ticker'] == ticker), None)
            if existing_holding:
                existing_holding['shares'] = shares
            else:
                stored.append({'ticker': ticker, 'shares': shares})

    # Get portfolio analysis
    holdings = user_portfolios.get(user_id, [])
    risk_metrics = {
        'portfolio_return': 0,
        'portfolio_volatility': 0,
        'sharpe_ratio': 0
    }
    if not holdings:
        return jsonify({
            'status': 'success',
            'data': {
                'total_value': 0,
                'daily_change': 0,
                'holdings': [],
                'risk_metrics': risk_metrics
            }
        })

    # Calculate portfolio metrics. One month of history is fetched once per
    # holding and serves both the latest/previous close and the return series.
    portfolio_data = []
    returns = []
    values = []
    total_value = 0
    total_daily_change = 0
    for holding in list(holdings):
        ticker = holding['ticker']
        shares = holding['shares']
        try:
            stock_data = get_stock_data(ticker, period="1mo")
            if not stock_data or stock_data['history'].empty:
                continue
            closes = stock_data['history']['Close']
            current_price = float(closes.iloc[-1])
            prev_price = float(closes.iloc[-2])
            value = current_price * shares
            daily_change = ((current_price - prev_price) / prev_price) * 100
            daily_returns = closes.pct_change().dropna()
        except Exception as e:
            print(f"Error calculating portfolio data for {ticker}: {str(e)}")
            continue

        total_value += value
        total_daily_change += (daily_change * value)
        portfolio_data.append({
            'ticker': ticker,
            'shares': shares,
            'current_price': current_price,
            'value': value,
            'daily_change': daily_change
        })
        returns.append(daily_returns)
        values.append(value)

    # Calculate portfolio risk metrics. Weights are undefined when the
    # portfolio is worth nothing, so the zero defaults are kept in that case.
    if returns and total_value > 0:
        try:
            weights = np.array([v / total_value for v in values])
            returns_df = pd.concat(returns, axis=1)
            portfolio_return = np.dot(returns_df.mean().to_numpy(), weights) * 252
            annual_cov = returns_df.cov().to_numpy() * 252
            portfolio_volatility = np.sqrt(np.dot(weights, np.dot(annual_cov, weights)))
            sharpe_ratio = portfolio_return / portfolio_volatility if portfolio_volatility != 0 else 0
            # NaN/Infinity are not valid JSON; report them as 0 like the
            # other "unavailable" defaults.
            risk_metrics = {
                'portfolio_return': _finite_or_zero(portfolio_return),
                'portfolio_volatility': _finite_or_zero(portfolio_volatility),
                'sharpe_ratio': _finite_or_zero(sharpe_ratio)
            }
        except Exception as e:
            print(f"Error calculating portfolio risk metrics: {str(e)}")

    return jsonify({
        'status': 'success',
        'data': {
            'total_value': total_value,
            'daily_change': total_daily_change / total_value if total_value > 0 else 0,
            'holdings': portfolio_data,
            'risk_metrics': risk_metrics
        }
    })
# Storage for user data (in-memory for demonstration).
# Both dicts are keyed by user id and hold a list of per-ticker records;
# contents are lost whenever the process restarts.
price_alerts: dict = {}
user_portfolios: dict = {}
| # API Routes | |
def index():
    """Render the ``index.html`` template."""
    page = render_template('index.html')
    return page
def dashboard():
    """Render the ``user_dashboard.html`` template."""
    page = render_template('user_dashboard.html')
    return page
def advanced_dashboard():
    """Render the ``advanced_dashboard.html`` template."""
    page = render_template('advanced_dashboard.html')
    return page
def api_news():
    """Return recent "stock market" news articles as JSON.

    The optional ``count`` query parameter (default ``BATCH_SIZE``) sets how
    many articles are requested. Any failure, including a non-integer
    ``count``, is reported as ``{"status": "error", "message": ...}``.
    """
    try:
        requested = request.args.get('count', BATCH_SIZE)
        articles = get_multiple_news("stock market", count=int(requested))
        return jsonify({"status": "success", "data": articles})
    except Exception as e:
        failure = {"status": "error", "message": str(e)}
        return jsonify(failure)
def api_sectors():
    """Return performance summaries for every sector in ``SECTORS`` as JSON.

    Query parameters:
        days: look-back window passed to ``get_sector_performance``
            (default 7).
        booming: when ``true`` (case-insensitive), keep only sectors flagged
            ``is_booming``.

    Sectors without any top performers are left out. Any failure is reported
    as ``{"status": "error", "message": ...}``.
    """
    try:
        days = int(request.args.get('days', 7))

        sectors_data = {}
        for sector, tickers in SECTORS.items():
            performance = get_sector_performance(tickers, days=days)
            if not performance['top_performers']:
                continue  # Only include sectors with data
            sectors_data[sector] = performance

        booming_flag = request.args.get('booming', 'false')
        if booming_flag.lower() == 'true':
            sectors_data = {
                name: perf
                for name, perf in sectors_data.items()
                if perf.get('is_booming', False)
            }

        return jsonify({"status": "success", "data": sectors_data})
    except Exception as e:
        print(f"Error in api_sectors: {str(e)}")  # Add debug print
        return jsonify({"status": "error", "message": str(e)})
def api_stock_detail(ticker):
    """Return price, fundamentals, indicators, insight and news for ``ticker``.

    Combines one month of price history with the technical indicator
    summary, the rule-based insight and up to three related news articles.
    Technical analysis and news are best-effort: if either fails, neutral
    placeholders or an empty list are used instead.

    Returns:
        A JSON response with ``status`` ``"success"`` and a ``data`` object,
        or ``status`` ``"error"`` with a ``message`` when price data is
        unavailable or an unexpected error occurs.
    """
    try:
        print(f"\nFetching details for {ticker}...")

        # Find sector
        sector = next((s for s, tickers in SECTORS.items() if ticker in tickers), "Unknown")
        print(f"Sector identified: {sector}")

        # Get stock data
        print(f"Fetching stock data for {ticker}...")
        stock_data = get_stock_data(ticker, period="1mo")
        if not stock_data:
            print(f"No stock data returned for {ticker}")
            return jsonify({
                "status": "error",
                "message": f"Unable to fetch data for {ticker}. Please try again later."
            })

        hist = stock_data['history']
        info = stock_data['info']
        if hist.empty:
            print(f"Empty history data for {ticker}")
            return jsonify({
                "status": "error",
                "message": f"No historical data available for {ticker}"
            })

        print(f"Calculating technical indicators for {ticker}...")
        try:
            tech_data = calculate_technical_indicators(hist)
            tech_summary = get_technical_analysis_summary(tech_data)
        except Exception as e:
            print(f"Error calculating technical indicators: {str(e)}")
            tech_data = hist
            tech_summary = {
                "indicators": {
                    "RSI": {"value": 0, "signal": "Neutral"},
                    "MACD": {"value": 0, "signal": "Neutral"},
                    "Moving_Averages": {
                        "SMA_20": 0,
                        "SMA_50": 0,
                        "SMA_200": 0,
                        "trend": "Neutral"
                    },
                    "Bollinger_Bands": {
                        "upper": 0,
                        "middle": 0,
                        "lower": 0,
                        "position": "Middle"
                    }
                },
                "analysis": ["Technical analysis currently unavailable"]
            }

        print(f"Generating insight for {ticker}...")
        insight = generate_stock_insight(ticker, sector)

        print(f"Fetching news for {ticker}...")
        try:
            related_news = get_multiple_news(f"{ticker} stock", count=3)
        except Exception as e:
            print(f"Error fetching news: {str(e)}")
            related_news = []

        def indicator(column):
            """Indicator column as a JSON-safe list (NaN -> None), [] if absent."""
            if column not in tech_data:
                return []
            return [float(v) if not pd.isna(v) else None for v in tech_data[column].values]

        # Prepare the response data. The info dict may carry explicit nulls,
        # so ``or`` is used where ``.get(key, default)`` would still hand
        # back None (``int(None)`` would fail the whole request).
        response_data = {
            "ticker": str(ticker),
            "name": str(info.get('shortName') or ticker),
            "sector": str(sector),
            "price": float(hist['Close'].iloc[-1]),
            "marketCap": int(info.get('marketCap') or 0),
            "peRatio": float(info.get('trailingPE', 0)) if info.get('trailingPE') else None,
            "dividend": float(info.get('dividendYield', 0)) if info.get('dividendYield') else None,
            "chartData": {
                'dates': [d.strftime('%Y-%m-%d') for d in hist.index],
                'prices': [float(p) for p in hist['Close'].values],
                'volume': [float(v) for v in hist['Volume'].values],
                'technical_indicators': {
                    'rsi': indicator('RSI'),
                    'macd': indicator('MACD'),
                    'signal_line': indicator('Signal_Line'),
                    'sma_20': indicator('SMA_20'),
                    'sma_50': indicator('SMA_50'),
                    'sma_200': indicator('SMA_200'),
                    'bb_upper': indicator('BB_upper'),
                    'bb_middle': indicator('BB_middle'),
                    'bb_lower': indicator('BB_lower')
                }
            },
            "technical_analysis": tech_summary,
            "insight": insight,
            "news": related_news
        }

        print(f"Successfully prepared response for {ticker}")
        return jsonify({"status": "success", "data": response_data})
    except Exception as e:
        print(f"Error in api_stock_detail for {ticker}: {str(e)}")
        import traceback
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An error occurred while fetching stock details: {str(e)}"
        })
def api_pattern_analysis(ticker):
    """Return simple price-pattern, volume and risk analysis for ``ticker``.

    Based on six months of history, the response contains:

    * ``patterns`` - dates where the close rose (or fell) on two
      consecutive sessions while volume increased, labelled
      "Bullish Trend" / "Bearish Trend".
    * ``support_resistance`` - up to three support and resistance levels
      taken from 20-period centered rolling lows/highs.
    * ``volume_analysis`` - average volume and the sessions whose volume
      exceeded the mean by more than two standard deviations.
    * ``risk_metrics`` - annualized volatility, Sharpe ratio (risk-free
      rate of 0) and maximum drawdown. Values that cannot be computed
      (NaN/Infinity) are reported as 0.0 so the response stays valid JSON.

    Any failure is reported as ``{'status': 'error', 'message': ...}``.
    """
    try:
        # Get stock data
        stock_data = get_stock_data(ticker, period="6mo")
        if not stock_data or stock_data['history'].empty:
            return jsonify({"status": "error", "message": "No data available"})

        df = stock_data['history']

        # Simple pattern detection based on price action
        patterns = {}
        closes = df['Close'].to_numpy()
        volumes = df['Volume'].to_numpy()
        for i in range(2, len(df)):
            # Both patterns require rising volume.
            if not volumes[i] > volumes[i - 1]:
                continue
            if closes[i] > closes[i - 1] > closes[i - 2]:
                # Bullish patterns
                patterns[df.index[i].strftime('%Y-%m-%d')] = {
                    'name': 'Bullish Trend',
                    'signal': 1
                }
            elif closes[i] < closes[i - 1] < closes[i - 2]:
                # Bearish patterns
                patterns[df.index[i].strftime('%Y-%m-%d')] = {
                    'name': 'Bearish Trend',
                    'signal': -1
                }

        # Find support and resistance levels
        def find_support_resistance(data, window=20):
            """Pick levels where the bar's high/low equals the rolling extreme."""
            highs = data['High'].rolling(window=window, center=True).max()
            lows = data['Low'].rolling(window=window, center=True).min()
            resistance_levels = highs[highs == data['High']].unique()[-3:]
            support_levels = lows[lows == data['Low']].unique()[:3]
            return {
                'support': support_levels.tolist(),
                'resistance': resistance_levels.tolist()
            }

        # Calculate volume analysis
        volume_mean = df['Volume'].mean()
        volume_std = df['Volume'].std()
        unusual_volume = df[df['Volume'] > volume_mean + 2*volume_std]

        # Calculate risk metrics
        returns = df['Close'].pct_change()
        annual_volatility = returns.std() * np.sqrt(252)  # Annualized volatility
        risk_metrics = {
            'volatility': annual_volatility,
            'sharpe_ratio': (returns.mean() * 252) / annual_volatility,  # Assuming risk-free rate of 0
            'max_drawdown': (df['Close'] / df['Close'].expanding().max() - 1).min()
        }

        def finite_or_zero(value):
            """Plain float, with NaN/Infinity (e.g. zero volatility) mapped to 0.0."""
            value = float(value)
            return value if np.isfinite(value) else 0.0

        response = {
            'status': 'success',
            'data': {
                'patterns': patterns,
                'support_resistance': find_support_resistance(df),
                'volume_analysis': {
                    'average_volume': int(volume_mean),
                    'unusual_volume_dates': unusual_volume.index.strftime('%Y-%m-%d').tolist(),
                    'unusual_volume_values': unusual_volume['Volume'].tolist()
                },
                'risk_metrics': {k: finite_or_zero(v) for k, v in risk_metrics.items()}
            }
        }
        return jsonify(response)
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
def api_correlation_analysis():
    """Build the closing-price correlation payload for the requested sectors.

    Reads the comma-separated ``sectors`` query parameter (default
    ``'Technology'``), takes the first five tickers of every sector found
    in ``SECTORS``, fetches one month of history for each and correlates
    their closing prices.

    Returns:
        A ``jsonify`` response. On success ``data.correlations`` lists each
        unordered ticker pair once with its coefficient and ``data.matrix``
        holds the full matrix as nested dicts. Coefficients that cannot be
        computed (NaN) are reported as ``null``. On any failure ``status``
        is ``'error'`` with a ``message``.
    """
    try:
        requested_sectors = request.args.get('sectors', 'Technology').split(',')
        tickers = []
        for sector in requested_sectors:
            # Tolerate "Technology, Healthcare" style input.
            sector = sector.strip()
            if sector in SECTORS:
                tickers.extend(SECTORS[sector][:5])  # Take top 5 from each sector
        # A repeated sector would otherwise fetch the same tickers twice;
        # dict.fromkeys de-duplicates while keeping first-seen order.
        tickers = list(dict.fromkeys(tickers))

        # Get closing prices for all tickers
        prices_dict = {}
        for ticker in tickers:
            stock_data = get_stock_data(ticker, period="1mo")
            if stock_data and not stock_data['history'].empty:
                prices_dict[ticker] = stock_data['history']['Close']

        # Create correlation matrix
        corr_matrix = pd.DataFrame(prices_dict).corr()

        # Convert to list of correlations (upper triangle, each pair once)
        columns = list(corr_matrix.columns)
        correlations = []
        for i, first in enumerate(columns):
            for j in range(i + 1, len(columns)):
                correlations.append({
                    'stock1': first,
                    'stock2': columns[j],
                    'correlation': _correlation_or_none(corr_matrix.iloc[i, j])
                })

        matrix = {
            column: {row: _correlation_or_none(value) for row, value in rows.items()}
            for column, rows in corr_matrix.to_dict().items()
        }
        return jsonify({
            'status': 'success',
            'data': {
                'correlations': correlations,
                'matrix': matrix
            }
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})


def _correlation_or_none(value):
    """Return ``value`` as a float, or None when it is NaN or infinite.

    ``DataFrame.corr`` yields NaN for constant series or too few
    overlapping observations; a bare ``NaN`` is not valid JSON.
    """
    number = float(value)
    return number if np.isfinite(number) else None
def api_sentiment_analysis(ticker):
    """Build the news-sentiment JSON payload for ``ticker``.

    Fetches up to ten articles via ``get_multiple_news`` for the query
    ``"<ticker> stock"``, scores each with TextBlob and averages the
    polarity.

    Args:
        ticker: Symbol used to build the news query.

    Returns:
        A ``jsonify`` response. On success ``data.articles`` holds the
        per-article title, polarity, subjectivity and date;
        ``data.average_sentiment`` is the mean polarity (0 when there are
        no articles); and ``data.sentiment_label`` is ``'Positive'`` above
        0.1, ``'Negative'`` below -0.1 and ``'Neutral'`` otherwise. On any
        failure ``status`` is ``'error'`` with a ``message``.
    """
    try:
        # Get news articles; treat a missing result as "no articles".
        news = get_multiple_news(f"{ticker} stock", count=10) or []

        # Analyze sentiment for each article
        sentiments = []
        for article in news:
            # NOTE(review): assumes each article is a dict - confirm against
            # get_multiple_news. Content may be absent or null, so fall back
            # to the title (or empty text) instead of letting one bad
            # article fail the whole request.
            text = article.get('content') or article.get('title') or ''
            sentiment = TextBlob(text).sentiment
            sentiments.append({
                'title': article.get('title'),
                'polarity': float(sentiment.polarity),
                'subjectivity': float(sentiment.subjectivity),
                'date': article.get('publishedAt')
            })

        # Calculate average sentiment
        if sentiments:
            avg_sentiment = sum(s['polarity'] for s in sentiments) / len(sentiments)
        else:
            avg_sentiment = 0

        if avg_sentiment > 0.1:
            sentiment_label = 'Positive'
        elif avg_sentiment < -0.1:
            sentiment_label = 'Negative'
        else:
            sentiment_label = 'Neutral'

        return jsonify({
            'status': 'success',
            'data': {
                'articles': sentiments,
                'average_sentiment': avg_sentiment,
                'sentiment_label': sentiment_label
            }
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)})
if __name__ == '__main__':
    port = int(os.environ.get('PORT', 5000))
    # The server listens on all interfaces, so the Werkzeug interactive
    # debugger must not be enabled unconditionally: it lets anyone who can
    # reach the port execute arbitrary Python. Opt in via FLASK_DEBUG=1 for
    # local development only.
    debug = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=port, debug=debug)