#!/usr/bin/env python3
"""
FinGPT-Forecaster Hugging Face Space.

Market Forecaster Agent - predicts the direction of stock price movements.
"""
# Hugging Face Spaces status banner captured alongside this file: "Sleeping".
| import os | |
| import json | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import mplfinance as mpf | |
| from datetime import datetime, timedelta | |
| import finnhub | |
| import yfinance as yf | |
| import streamlit as st | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
# Page-level settings: browser tab title and icon, wide layout, and the
# sidebar opened on first load.
st.set_page_config(
    page_title="FinGPT-Forecaster",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded",
)
class MarketForecaster:
    """Collect market data for a ticker and derive a rule-based forecast.

    Company profile and news are requested from Finnhub when a client could
    be created from the ``FINNHUB_API_KEY`` environment variable; otherwise
    canned demo data is returned. Prices are requested from Yahoo Finance.
    Status and error messages are reported through Streamlit.
    """

    # Sentiment keywords, matched as plain substrings of lower-cased text.
    POSITIVE_KEYWORDS = (
        'growth', 'profit', 'revenue', 'beat', 'exceed',
        'strong', 'positive', 'upgrade', 'buy', 'bullish',
    )
    NEGATIVE_KEYWORDS = (
        'loss', 'decline', 'miss', 'weak', 'negative',
        'downgrade', 'sell', 'bearish', 'concern', 'risk',
    )
    # Only the first N articles of the news list are scored.
    MAX_NEWS_ARTICLES = 10

    def __init__(self):
        """Initialize the Market Forecaster"""
        self.finnhub_client = None
        self.setup_finnhub()

    def setup_finnhub(self):
        """Create the Finnhub client from ``FINNHUB_API_KEY`` or use demo mode.

        Leaves ``self.finnhub_client`` as ``None`` when the variable is unset
        or empty, or when constructing the client raises.
        """
        finnhub_api_key = os.getenv('FINNHUB_API_KEY')
        if not finnhub_api_key:
            st.info("ℹ️ Running in demo mode (no Finnhub API key provided)")
            self.finnhub_client = None
            return

        try:
            self.finnhub_client = finnhub.Client(api_key=finnhub_api_key)
            st.success("✅ Connected to Finnhub API")
        except Exception as e:
            st.warning(f"⚠️ Finnhub API connection failed: {e}")
            self.finnhub_client = None

    def get_company_profile(self, symbol):
        """Return the company profile from Finnhub, or demo data.

        Demo data is returned when no client is configured or when the
        Finnhub request raises (a warning is shown in the latter case).
        """
        if self.finnhub_client:
            try:
                return self.finnhub_client.company_profile2(symbol=symbol)
            except Exception as e:
                st.warning(f"Error getting company profile: {e}")

        # Return demo data
        return {
            'name': f'{symbol} Corporation',
            'finnhubIndustry': 'Technology',
            'marketCapitalization': 1000000000,
            'country': 'US',
            'currency': 'USD',
        }

    def get_company_news(self, symbol, start_date, end_date):
        """Return company news from Finnhub, or demo articles.

        Args:
            symbol: Ticker symbol.
            start_date: Inclusive range start as a ``YYYY-MM-DD`` string.
            end_date: Range end as a ``YYYY-MM-DD`` string.

        Returns:
            A list of article dicts. Three canned articles are returned when
            no client is configured, a date is malformed, or the request
            raises (a warning is shown in the latter two cases).
        """
        if self.finnhub_client:
            try:
                # Reject malformed dates up front; parsing is validation only.
                for value in (start_date, end_date):
                    datetime.strptime(value, '%Y-%m-%d')
                # Finnhub's company-news endpoint takes YYYY-MM-DD strings,
                # not Unix timestamps, so the dates are passed through as-is.
                return self.finnhub_client.company_news(
                    symbol, _from=start_date, to=end_date
                )
            except Exception as e:
                st.warning(f"Error getting news: {e}")

        # Return demo news
        return [
            {
                "headline": f"{symbol} shows strong quarterly performance",
                "summary": f"Recent earnings report shows {symbol} exceeding expectations with robust growth in key segments.",
            },
            {
                "headline": f"Market analysts upgrade {symbol} rating",
                "summary": f"Several analysts have upgraded their rating for {symbol} citing strong fundamentals and growth prospects.",
            },
            {
                "headline": f"{symbol} announces new strategic initiatives",
                "summary": "Company announces new strategic initiatives aimed at expanding market presence and driving innovation.",
            },
        ]

    def get_stock_data(self, symbol, start_date, end_date):
        """Return price history from Yahoo Finance, or ``None`` on error."""
        try:
            return yf.Ticker(symbol).history(start=start_date, end=end_date)
        except Exception as e:
            st.error(f"Error getting stock data for {symbol}: {e}")
            return None

    def calculate_rsi(self, prices, window=14):
        """Calculate the RSI of *prices* using simple rolling means.

        Args:
            prices: pandas Series of prices.
            window: Number of rows averaged for gains and losses.

        Returns:
            A Series aligned with *prices*. Rows before the rolling window is
            full are NaN, as are rows whose window has neither gains nor
            losses (0/0).
        """
        delta = prices.diff()
        # The first diff is NaN; ``where`` maps it (and non-gains) to 0.
        gain = delta.where(delta > 0, 0).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    def analyze_stock_movement(self, symbol, days_back=30):
        """Fetch data for *symbol* and build the analysis.

        Args:
            symbol: Ticker symbol.
            days_back: Calendar days of history to request.

        Returns:
            ``(analysis, stock_data)`` where *analysis* is the dict built by
            ``generate_analysis`` and *stock_data* is the price frame with
            ``SMA_20``, ``SMA_50`` and ``RSI`` columns added; ``None`` when
            no price data is available.
        """
        now = datetime.now()
        end_date = now.strftime('%Y-%m-%d')
        start_date = (now - timedelta(days=days_back)).strftime('%Y-%m-%d')

        with st.spinner(f"🔍 Fetching data for {symbol}..."):
            profile = self.get_company_profile(symbol)
            news = self.get_company_news(symbol, start_date, end_date)
            stock_data = self.get_stock_data(symbol, start_date, end_date)

        if stock_data is None or stock_data.empty:
            st.error(f"❌ No stock data available for {symbol}")
            return None

        # Technical indicators. A rolling mean is NaN until its window is
        # full, so SMA_50 stays NaN when fewer than 50 rows were returned.
        close = stock_data['Close']
        stock_data['SMA_20'] = close.rolling(window=20).mean()
        stock_data['SMA_50'] = close.rolling(window=50).mean()
        stock_data['RSI'] = self.calculate_rsi(close)

        # Recent price performance: compare the latest close with the close
        # 5 and 20 rows from the end, when that many rows exist.
        recent_close = close.iloc[-1]
        week_ago_close = close.iloc[-5] if len(stock_data) >= 5 else recent_close
        month_ago_close = close.iloc[-20] if len(stock_data) >= 20 else recent_close
        week_change = ((recent_close - week_ago_close) / week_ago_close) * 100
        month_change = ((recent_close - month_ago_close) / month_ago_close) * 100

        analysis = self.generate_analysis(
            symbol, profile, news, stock_data,
            recent_close, week_change, month_change,
        )
        return analysis, stock_data

    @staticmethod
    def _latest_or(series, fallback):
        """Return the last value of *series*, or *fallback* when it is NaN."""
        latest = series.iloc[-1]
        return fallback if pd.isna(latest) else latest

    def _classify_news(self, news):
        """Split the first articles of *news* into positive/negative labels.

        Returns two lists of labels (headline, or summary when the headline
        is missing, truncated to 100 characters). Articles whose positive
        and negative keyword counts tie are left out of both lists.
        """
        positive, negative = [], []
        for article in news[:self.MAX_NEWS_ARTICLES]:
            # ``or ''`` also covers keys that are present but None.
            headline = article.get('headline') or ''
            summary = article.get('summary') or ''
            # Keywords contain no spaces, so joining with a space cannot
            # create a match that neither field has on its own.
            text = f"{headline} {summary}".lower()

            pos_score = sum(word in text for word in self.POSITIVE_KEYWORDS)
            neg_score = sum(word in text for word in self.NEGATIVE_KEYWORDS)
            label = (headline or summary)[:100]
            if pos_score > neg_score:
                positive.append(label)
            elif neg_score > pos_score:
                negative.append(label)
        return positive, negative

    def generate_analysis(self, symbol, profile, news, stock_data, current_price, week_change, month_change):
        """Generate the analysis dict and a direction prediction.

        The score is the sum of: RSI (+2 below 30, -2 above 70), moving
        averages (+1 / -1 when price, SMA-20 and SMA-50 are strictly
        ordered), weekly change (+1 above 2%, -1 below -2%) and news
        sentiment (+0.5 / -0.5 per positive / negative article). A score of
        2 or more predicts "UP", -2 or less "DOWN", anything else "SIDEWAYS".

        Args:
            symbol: Ticker symbol.
            profile: Company profile dict (``None`` is treated as empty).
            news: List of article dicts (``None`` is treated as empty).
            stock_data: Frame with ``RSI``, ``SMA_20`` and ``SMA_50`` columns.
            current_price: Latest close.
            week_change: Percent change used for the weekly signal.
            month_change: Percent change, reported but not scored.

        Returns:
            A dict with the prediction, up to four positive and negative
            factors, the technical indicators and company information.
        """
        news = news or []
        profile = profile or {}

        positive_factors, negative_factors = self._classify_news(news)
        # Capture the sentiment term now, before technical factors are
        # appended to the same lists; otherwise they would be scored twice.
        news_sentiment = 0.5 * len(positive_factors) - 0.5 * len(negative_factors)

        # Technical analysis, with neutral fallbacks for missing indicators.
        recent_rsi = self._latest_or(stock_data['RSI'], 50)
        sma_20 = self._latest_or(stock_data['SMA_20'], current_price)
        sma_50 = self._latest_or(stock_data['SMA_50'], current_price)

        prediction_score = 0

        # RSI analysis
        if recent_rsi < 30:
            prediction_score += 2  # Oversold, potential bounce
            positive_factors.append("RSI indicates oversold conditions")
        elif recent_rsi > 70:
            prediction_score -= 2  # Overbought, potential pullback
            negative_factors.append("RSI indicates overbought conditions")

        # Moving average analysis
        if current_price > sma_20 > sma_50:
            prediction_score += 1
            positive_factors.append("Price above both 20-day and 50-day moving averages")
        elif current_price < sma_20 < sma_50:
            prediction_score -= 1
            negative_factors.append("Price below both 20-day and 50-day moving averages")

        # Recent performance
        if week_change > 2:
            prediction_score += 1
            positive_factors.append(f"Strong weekly performance (+{week_change:.1f}%)")
        elif week_change < -2:
            prediction_score -= 1
            negative_factors.append(f"Weak weekly performance ({week_change:.1f}%)")

        # News sentiment
        prediction_score += news_sentiment

        # Generate prediction
        if prediction_score >= 2:
            direction = "UP"
            confidence = min(abs(prediction_score) * 10, 80)
            price_change = f"+{confidence/10:.1f}%"
        elif prediction_score <= -2:
            direction = "DOWN"
            confidence = min(abs(prediction_score) * 10, 80)
            price_change = f"-{confidence/10:.1f}%"
        else:
            direction = "SIDEWAYS"
            confidence = 50
            price_change = "±1%"

        return {
            'symbol': symbol,
            'current_price': current_price,
            'prediction_direction': direction,
            'prediction_change': price_change,
            'confidence': confidence,
            'positive_factors': positive_factors[:4],
            'negative_factors': negative_factors[:4],
            'technical_indicators': {
                'rsi': recent_rsi,
                'sma_20': sma_20,
                'sma_50': sma_50,
                'week_change': week_change,
                'month_change': month_change,
            },
            'news_count': len(news),
            'company_name': profile.get('name', 'N/A'),
            'industry': profile.get('finnhubIndustry', 'N/A'),
            'market_cap': profile.get('marketCapitalization', 0),
        }
def create_chart(symbol, stock_data):
    """Build a candlestick chart for *symbol* from *stock_data*.

    The chart is drawn with mplfinance on a copy of the frame, with a volume
    panel and 20/50-period moving averages. Returns the figure, or ``None``
    after reporting the error through Streamlit when anything raises.
    """
    try:
        # Work on a copy so the caller's index is left untouched.
        frame = stock_data.copy()
        frame.index = pd.to_datetime(frame.index)

        plot_options = {
            'type': 'candle',
            'style': 'charles',
            'title': f'{symbol} Stock Price Analysis',
            'ylabel': 'Price ($)',
            'volume': True,
            'mav': (20, 50),
            'figsize': (12, 8),
            'returnfig': True,  # hand back (figure, axes) instead of showing
        }
        figure, _axes = mpf.plot(frame, **plot_options)
    except Exception as e:
        st.error(f"Error creating chart: {e}")
        return None
    return figure
def _render_results(symbol, analysis, stock_data):
    """Render one completed analysis: metrics, factors, chart and news count."""
    st.header(f"📊 Analysis Results for {symbol}")

    # Company info
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Company", analysis['company_name'])
    with col2:
        st.metric("Industry", analysis['industry'])
    with col3:
        st.metric("Current Price", f"${analysis['current_price']:.2f}")
    with col4:
        # NOTE(review): Finnhub documents marketCapitalization in millions,
        # while the demo profile uses a raw dollar figure - confirm the unit
        # shown here.
        st.metric("Market Cap", f"${analysis['market_cap']:,.0f}")

    # Prediction
    st.subheader("🎯 Prediction")
    direction = analysis['prediction_direction']
    marker = {"UP": "🟢", "DOWN": "🔴"}.get(direction, "🟡")
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Direction", f"{marker} {direction}")
    with col2:
        st.metric("Expected Change", analysis['prediction_change'])
    with col3:
        st.metric("Confidence", f"{analysis['confidence']:.1f}%")

    # Technical indicators
    st.subheader("📈 Technical Indicators")
    tech = analysis['technical_indicators']
    col1, col2, col3, col4, col5 = st.columns(5)
    with col1:
        st.metric("RSI", f"{tech['rsi']:.1f}")
    with col2:
        st.metric("20-day SMA", f"${tech['sma_20']:.2f}")
    with col3:
        st.metric("50-day SMA", f"${tech['sma_50']:.2f}")
    with col4:
        st.metric("1-week Change", f"{tech['week_change']:+.2f}%")
    with col5:
        st.metric("1-month Change", f"{tech['month_change']:+.2f}%")

    # Factors
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("✅ Positive Factors")
        if analysis['positive_factors']:
            for i, factor in enumerate(analysis['positive_factors'], 1):
                st.write(f"{i}. {factor}")
        else:
            st.write("No significant positive factors identified")
    with col2:
        st.subheader("⚠️ Potential Concerns")
        if analysis['negative_factors']:
            for i, factor in enumerate(analysis['negative_factors'], 1):
                st.write(f"{i}. {factor}")
        else:
            st.write("No significant concerns identified")

    # Chart
    st.subheader("📈 Price Chart")
    fig = create_chart(symbol, stock_data)
    if fig:
        st.pyplot(fig)
        # The figure has been rendered; close it so figures do not pile up
        # in pyplot's registry across reruns.
        plt.close(fig)

    # News summary
    st.subheader("📰 News Analysis")
    st.write(f"Analyzed {analysis['news_count']} recent news articles")


def main():
    """Main Streamlit app.

    Draws the header and sidebar controls, runs the analysis when the
    button is pressed, and always finishes with the disclaimer footer.
    """
    # Header
    st.title("📈 FinGPT-Forecaster")
    st.markdown("**AI-Powered Stock Market Prediction System**")
    st.markdown("---")

    # Sidebar
    st.sidebar.header("🔧 Configuration")

    # Stock symbol input; surrounding whitespace would otherwise become
    # part of the ticker that is looked up.
    symbol = st.sidebar.text_input(
        "Stock Symbol",
        value="AAPL",
        help="Enter a stock ticker symbol (e.g., AAPL, MSFT, NVDA)",
    ).strip().upper()

    # Analysis period
    days_back = st.sidebar.slider(
        "Analysis Period (days)",
        min_value=30,
        max_value=365,
        value=90,
        help="Number of days to look back for analysis",
    )

    # API Key input
    st.sidebar.subheader("🔑 API Configuration")
    finnhub_key = st.sidebar.text_input(
        "Finnhub API Key (Optional)",
        type="password",
        help="Get your free API key from finnhub.io for enhanced data",
    )
    if finnhub_key:
        # NOTE(review): os.environ is process-wide, so a key entered here
        # stays set for everything else running in this process - confirm
        # that is acceptable for a shared deployment.
        os.environ['FINNHUB_API_KEY'] = finnhub_key

    # Analyze button
    if st.sidebar.button("🚀 Analyze Stock", type="primary"):
        if not symbol:
            st.error("Please enter a stock symbol")
        else:
            forecaster = MarketForecaster()
            result = forecaster.analyze_stock_movement(symbol, days_back)
            # None means no price data; the error was already reported.
            if result is not None:
                analysis, stock_data = result
                _render_results(symbol, analysis, stock_data)

    # Footer
    st.markdown("---")
    st.markdown(
        "<div style='text-align: center; color: #666;'>\n"
        "<p><strong>Disclaimer:</strong> This analysis is for educational purposes only and should not be considered as financial advice.</p>\n"
        "<p>Powered by FinGPT-Forecaster | Built with Streamlit</p>\n"
        "</div>",
        unsafe_allow_html=True,
    )


if __name__ == "__main__":
    main()