# (Removed: "Spaces: / Sleeping / Sleeping" — Hugging Face Spaces page residue
# captured by the scrape; it is not part of the program.)
from flask import Flask, render_template, request, jsonify
import time
import json
import random
import datetime
import requests
import numpy as np
import yfinance as yf
import faiss
from sentence_transformers import SentenceTransformer
from google import genai
from google.genai import types
from flask_caching import Cache

app = Flask(__name__)
# BUG FIX: the original `cache = Cache()` was never initialised with the app
# (no Cache(app) / cache.init_app(app) and no CACHE_TYPE config), so any
# @cache.cached decorator would fail at request time. SimpleCache is the
# in-process default suitable for a single-worker dev server.
cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})
# ============================================================
# 1. SETUP: Load Historical Data and Build FAISS Index
# ============================================================
# Historical articles (with observed market reactions) and their
# precomputed sentence embeddings live side by side on disk.
HISTORICAL_NEWS_FILE = 'historical_financial_news_with_reactions.json'
EMBEDDINGS_FILE = 'historical_news_embeddings.npy'

try:
    with open(HISTORICAL_NEWS_FILE, 'r') as f:
        historical_news = json.load(f)
    # FAISS requires float32 vectors.
    embeddings = np.load(EMBEDDINGS_FILE).astype('float32')
    # Query-time encoder; must match the model used to build EMBEDDINGS_FILE.
    embed_model = SentenceTransformer("all-MiniLM-L6-v2")
    # Exact (brute-force) L2 index over the historical embeddings.
    dimension = embeddings.shape[1]
    faiss_index = faiss.IndexFlatL2(dimension)
    faiss_index.add(embeddings)
    print(f"[INFO] FAISS index contains {faiss_index.ntotal} vectors.")
    index_loaded = True
except Exception as e:
    # Degrade gracefully: routes check index_loaded before using the index.
    print(f"[ERROR] Failed to load index: {str(e)}")
    index_loaded = False
    historical_news = []
| # ============================================================ | |
| # 2. UTILITY FUNCTIONS | |
| # ============================================================ | |
def get_last_n_days_market_data(ticker, n_days=10):
    """
    Fetch daily market data for *ticker* covering the last n_days via yfinance.

    Note: n_days counts calendar days, so the frame typically contains fewer
    trading sessions (weekends/holidays are skipped by the exchange).

    Returns:
        (summary_text, dataframe) — a human-readable per-day summary of close
        and volume plus the raw DataFrame, or (message, None) when yfinance
        returns no rows.
    """
    now = datetime.datetime.now()
    start = (now - datetime.timedelta(days=n_days)).strftime("%Y-%m-%d")
    # yfinance treats `end` as exclusive, so push it one day forward to
    # include the most recent available session.
    end = (now + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
    frame = yf.download(ticker, start=start, end=end)
    if frame.empty:
        return "No market data available for the last {} days.".format(n_days), None
    # NOTE(review): assumes single-level columns so row['Close'] is a scalar —
    # newer yfinance versions can return multi-level columns; confirm.
    summary = f"Market data for the last {n_days} days for {ticker}:\n"
    summary += "".join(
        f"{day.strftime('%Y-%m-%d')}: Close = {row['Close']}, Volume = {row['Volume']}\n"
        for day, row in frame.iterrows()
    )
    return summary, frame
def get_historical_chart_data(ticker, period="6mo"):
    """
    Fetch historical prices for *ticker* shaped for the frontend chart.

    Returns a dict with parallel 'dates', 'prices' (closing) and 'volumes'
    lists, or None when yfinance has no data for the period.
    """
    history = yf.download(ticker, period=period)
    if history.empty:
        return None
    return {
        'dates': history.index.strftime('%Y-%m-%d').tolist(),
        'prices': history['Close'].tolist(),
        'volumes': history['Volume'].tolist(),
    }
def generate_rag_prompt_with_market(new_article, similar_articles, market_data_text):
    """
    Assemble the retrieval-augmented prompt sent to Gemini.

    Combines the new article (title/content/ticker), the recent market-data
    summary, and the retrieved historical articles with their observed
    reactions, then appends the BUY/HOLD/SELL analysis request.

    Args:
        new_article: dict with 'title', 'content' and optionally 'ticker'.
        similar_articles: dicts with 'title' and a 'reaction' sub-dict
            containing 'price_change' and 'volume_spike'.
        market_data_text: preformatted market summary string.

    Returns:
        The complete prompt string.
    """
    pieces = [f"""New Financial News Article:
Title: {new_article['title']}
Content: {new_article['content']}
Ticker: {new_article.get('ticker', 'N/A')}
{market_data_text}
Historical Context:"""]
    for rank, past in enumerate(similar_articles, 1):
        pieces.append(f"""\n\n{rank}. Title: {past['title']}
Reaction: Price Change = {past['reaction']['price_change']}%, Volume Spike = {past['reaction']['volume_spike']}""")
    pieces.append("""
Based on the above information, analyze and explain:
- How the market might react to this new article.
- Whether the recommendation is to BUY, HOLD, or SELL.
- Provide a brief justification referencing historical patterns and current market conditions.
""")
    return "".join(pieces)
def analyze_article_with_agent(new_article, top_k=5):
    """
    Run the full RAG pipeline for one article and return Gemini's answer.

    Steps:
      1. Embed the new article with the SentenceTransformer model.
      2. Retrieve the top_k most similar historical articles via FAISS.
      3. Fetch the last 10 days of market data for the article's ticker.
      4. Build the RAG prompt and query Gemini.

    Args:
        new_article: dict with at least 'title', 'content' and 'ticker'.
        top_k: number of similar historical articles to retrieve.

    Returns:
        (recommendation_text, rag_prompt, similar_articles)

    Relies on module-level globals (embed_model, faiss_index,
    historical_news) built at startup; callers must check index_loaded first.
    """
    import os  # local import so this edit is self-contained

    # 1. Embed the new article (FAISS expects a 2-D float32 batch).
    new_text = f"{new_article['title']}. {new_article['content']}"
    new_embedding = np.array([embed_model.encode(new_text)]).astype('float32')
    # 2. Retrieve the top_k nearest historical articles.
    distances, indices = faiss_index.search(new_embedding, top_k)
    similar_articles = [historical_news[i] for i in indices[0]]
    # 3. Market data for the last 10 days for the article's ticker.
    market_data_text, _ = get_last_n_days_market_data(new_article['ticker'], n_days=10)
    # 4. Build the RAG prompt.
    rag_prompt = generate_rag_prompt_with_market(new_article, similar_articles, market_data_text)
    # 5. Query Gemini. SECURITY FIX: the API key was hard-coded in source;
    # prefer the GEMINI_API_KEY environment variable. The old literal is kept
    # only as a backward-compatible fallback — rotate it and remove.
    client = genai.Client(
        api_key=os.environ.get("GEMINI_API_KEY", "AIzaSyBRzLEPudJzVNDQpckpxhbkA3aF9Inr5v0")
    )
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=[{"role": "user", "parts": [{"text": rag_prompt}]}]
    )
    return response.text, rag_prompt, similar_articles
def get_yesterdays_market_data(ticker):
    """
    Fetch the most recent completed session's market data for *ticker*.

    BUG FIX: the original downloaded a single calendar day, so the frame held
    at most one row and `iloc[-1]` / `iloc[0]` were the SAME row — "previous
    close" always equalled the close (and weekends returned nothing at all).
    We now look back 7 calendar days to capture at least two trading sessions
    and take the genuinely prior session's close.

    Returns:
        (market_data_text, close, previous_close, market_cap); the numeric
        fields are "N/A" strings when no data is available.
    """
    today = datetime.datetime.now()
    start_date = (today - datetime.timedelta(days=7)).strftime("%Y-%m-%d")
    # yfinance's `end` is exclusive, so ending at today yields data up to
    # and including yesterday's session.
    end_date = today.strftime("%Y-%m-%d")
    data = yf.download(ticker, start=start_date, end=end_date)
    if not data.empty:
        close = data['Close'].iloc[-1]
        # Prior session when available; degenerate single-row case falls back
        # to the only close we have (matches the old behavior there).
        previous_close = data['Close'].iloc[-2] if len(data) > 1 else data['Close'].iloc[0]
        market_cap = yf.Ticker(ticker).info.get("marketCap", "N/A")
    else:
        close, previous_close, market_cap = "N/A", "N/A", "N/A"
    market_data_text = (
        f"Current market data for {ticker} (as of yesterday):\n"
        f"- Closing Price: {close}\n"
        f"- Previous Close: {previous_close}\n"
        f"- Market Cap: {market_cap}\n"
    )
    return market_data_text, close, previous_close, market_cap
def fetch_latest_news(ticker="TSLA"):
    """
    Fetch the most recent news article for *ticker* from NewsAPI.

    Improvements over the original: the request now has a timeout and
    raise_for_status / exception handling (consistent with
    get_multiple_news), the query is sent via `params=` so it is properly
    URL-encoded, and the API key is read from the NEWS_API_KEY environment
    variable (hard-coded fallback kept for backward compatibility — rotate
    and remove it).

    Returns:
        dict with title/content/ticker/url/source/publishedAt, or None when
        no article is found or the request fails.
    """
    import os  # local import so this edit is self-contained

    NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "8df6cf4cbc4c42cdb7fe0d2ee438ccd5")
    params = {
        "q": ticker,
        "apiKey": NEWS_API_KEY,
        "sortBy": "publishedAt",
        "language": "en",
    }
    try:
        response = requests.get("https://newsapi.org/v2/everything",
                                params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] NewsAPI request failed for {ticker}: {str(e)}")
        return None
    if data.get("status") == "ok" and data.get("totalResults", 0) > 0:
        article = data["articles"][0]  # articles are sorted newest-first
        return {
            "title": article["title"],
            "content": article["content"] or article["description"],
            "ticker": ticker,
            "url": article.get("url", ""),
            "source": article.get("source", {}).get("name", "Unknown"),
            "publishedAt": article.get("publishedAt", "")
        }
    print("[WARN] No news articles found.")
    return None
def parse_recommendation(recommendation_text):
    """
    Extract a BUY/SELL/HOLD decision from Gemini's free-form answer.

    BUG FIX: the original used bare substring checks, so words like
    "BUYBACK" or "SELLOFF" triggered false positives. Word-boundary regex
    matching keeps the original precedence (BUY over SELL, default HOLD)
    while only matching the standalone keywords.
    """
    import re  # local import so this edit is self-contained

    text = recommendation_text.upper()
    if re.search(r"\bBUY\b", text):
        return "BUY"
    if re.search(r"\bSELL\b", text):
        return "SELL"
    return "HOLD"
def get_multiple_news(query, count=5):
    """
    Fetch up to *count* news articles matching *query* from NewsAPI.

    BUG FIX: the original built the URL with an f-string, so queries
    containing spaces (e.g. "stock market") were not URL-encoded; the query
    is now passed via `params=` which requests encodes correctly. The API
    key is read from the NEWS_API_KEY environment variable (hard-coded
    fallback kept for backward compatibility — rotate and remove it).

    Returns:
        list of article dicts (title/content/url/source/publishedAt); empty
        list on any error or when no results are found.
    """
    import os  # local import so this edit is self-contained

    NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "8df6cf4cbc4c42cdb7fe0d2ee438ccd5")
    params = {
        "q": query,
        "apiKey": NEWS_API_KEY,
        "sortBy": "publishedAt",
        "language": "en",
        "pageSize": count,
    }
    try:
        response = requests.get("https://newsapi.org/v2/everything",
                                params=params, timeout=10)
        response.raise_for_status()  # surface HTTP errors as exceptions
        data = response.json()
        if data.get("status") == "ok" and data.get("totalResults", 0) > 0:
            articles = []
            for article in data["articles"][:count]:
                # Keep only articles with a title and some body text.
                if article.get("title") and (article.get("content") or article.get("description")):
                    articles.append({
                        "title": article["title"],
                        "content": article.get("content") or article.get("description", ""),
                        "url": article.get("url", ""),
                        "source": article.get("source", {}).get("name", "Unknown"),
                        "publishedAt": article.get("publishedAt", "")
                    })
            return articles
        print(f"NewsAPI returned no results for query: {query}")
        return []
    except requests.exceptions.RequestException as e:
        print(f"Error fetching news for query {query}: {str(e)}")
        return []
    except Exception as e:
        print(f"Unexpected error fetching news for query {query}: {str(e)}")
        return []
| # ============================================================ | |
| # 3. FLASK ROUTES | |
| # ============================================================ | |
@app.route('/')
def index():
    """Render the main dashboard page.

    NOTE(review): no @app.route decorator was present in the pasted source,
    so this view was never registered; '/' is the conventional dashboard
    path — confirm against the original app.
    """
    return render_template('index.html')
@app.route('/analyze', methods=['POST'])
def analyze():
    """Analyze a stock ticker and return a JSON recommendation payload.

    Reads 'ticker' from the POSTed form (default 'TSLA'), fetches the latest
    news article, market data and chart data, and — when the FAISS index was
    loaded at startup — a Gemini BUY/HOLD/SELL recommendation.

    NOTE(review): no @app.route decorator was present in the pasted source;
    the use of request.form implies a POST endpoint — confirm the original
    path against the frontend.
    """
    ticker = request.form.get('ticker', 'TSLA').upper()
    # No news means nothing to analyze.
    article = fetch_latest_news(ticker)
    if not article:
        return jsonify({
            'status': 'error',
            'message': f'No recent news found for {ticker}'
        })
    try:
        market_data_text, last_close, prev_close, market_cap = get_yesterdays_market_data(ticker)
        chart_data = get_historical_chart_data(ticker)
        # Only run the RAG pipeline when the index loaded successfully.
        if index_loaded:
            recommendation_text, rag_prompt, similar_articles = analyze_article_with_agent(article)
            decision = parse_recommendation(recommendation_text)
        else:
            recommendation_text = "Cannot generate recommendation: historical data not loaded."
            rag_prompt = "N/A"
            decision = "N/A"
            similar_articles = []
        recent_news = get_multiple_news(ticker, 5)
        return jsonify({
            'status': 'success',
            'ticker': ticker,
            'article': article,
            'recommendation': {
                'text': recommendation_text,
                'decision': decision
            },
            'market_data': {
                'last_close': last_close,
                'prev_close': prev_close,
                'market_cap': market_cap
            },
            'chart_data': chart_data,
            'recent_news': recent_news,
            'similar_historical': similar_articles
        })
    except Exception as e:
        # Surface any pipeline failure to the client as a structured error.
        return jsonify({
            'status': 'error',
            'message': f'Error analyzing {ticker}: {str(e)}'
        })
@app.route('/stocks')
def stock_list():
    """Return a static list of popular stock symbols/names as JSON.

    NOTE(review): no @app.route decorator was present in the pasted source;
    '/stocks' is a guess at the original path — confirm against the frontend.
    """
    popular_stocks = [
        {"symbol": "AAPL", "name": "Apple Inc."},
        {"symbol": "MSFT", "name": "Microsoft Corporation"},
        {"symbol": "GOOGL", "name": "Alphabet Inc."},
        {"symbol": "AMZN", "name": "Amazon.com, Inc."},
        {"symbol": "META", "name": "Meta Platforms, Inc."},
        {"symbol": "TSLA", "name": "Tesla, Inc."},
        {"symbol": "NVDA", "name": "NVIDIA Corporation"},
        {"symbol": "JPM", "name": "JPMorgan Chase & Co."},
        {"symbol": "V", "name": "Visa Inc."},
        {"symbol": "WMT", "name": "Walmart Inc."}
    ]
    return jsonify(popular_stocks)
# Cache for 5 minutes
# NOTE(review): the comment above suggests a @cache.cached(timeout=300)
# decorator was stripped from the pasted source — restore it once the Cache
# object is properly initialised with the app.
@app.route('/market_news')
def get_market_news():
    """Fetch, deduplicate and return the 10 most recent market news items.

    NOTE(review): no @app.route decorator was present in the pasted source;
    '/market_news' is a guess — confirm against the frontend.
    """
    try:
        all_news = []
        try:
            # General market news first.
            market_news = get_multiple_news("stock market", count=5)
            if market_news:
                all_news.extend(market_news)
        except Exception as e:
            print(f"Error fetching general market news: {str(e)}")
        # Top up with sector-specific news when needed.
        # NOTE(review): SECTORS is not defined anywhere in this file — confirm
        # it is defined elsewhere; a NameError here is caught by the outer
        # except and surfaces as a 500.
        if len(all_news) < 10:
            for sector in SECTORS.keys():
                try:
                    sector_news = get_multiple_news(sector, count=2)
                    if sector_news:
                        all_news.extend(sector_news)
                except Exception as e:
                    print(f"Error fetching news for sector {sector}: {str(e)}")
                    continue
        # Deduplicate by URL, normalising each article's fields.
        seen_urls = set()
        unique_news = []
        for article in all_news:
            if article.get('url') not in seen_urls:
                seen_urls.add(article.get('url'))
                processed_article = {
                    'title': article.get('title', 'No Title'),
                    'url': article.get('url', '#'),
                    'source': article.get('source', 'Unknown Source'),
                    # BUG FIX: the original called `datetime.now()` on the
                    # *module*, which raises AttributeError — and because the
                    # default argument is evaluated eagerly, it raised on
                    # every article. The correct call is datetime.datetime.now().
                    'publishedAt': article.get('publishedAt', datetime.datetime.now().isoformat()),
                    'summary': article.get('content') or article.get('description', 'No content available')
                }
                unique_news.append(processed_article)
        # Newest first, capped at 10 items.
        unique_news.sort(key=lambda x: x['publishedAt'], reverse=True)
        return jsonify(unique_news[:10])
    except Exception as e:
        print(f"Critical error in get_market_news: {str(e)}")
        return jsonify({
            'error': 'Failed to fetch market news',
            'details': str(e)
        }), 500
# Start the Flask development server when run directly (not when imported).
# debug=True enables the reloader/debugger — not for production use.
if __name__ == '__main__':
    app.run(debug=True, port=5001)