Spaces:
Sleeping
Sleeping
| import time | |
| import json | |
| import random | |
| import datetime | |
| import requests | |
| import numpy as np | |
| import yfinance as yf | |
| import faiss | |
| from sentence_transformers import SentenceTransformer | |
| from google import genai | |
| # ============================================================ | |
| # 1. SETUP: Load Historical Data and Build FAISS Index | |
| # ============================================================ | |
| # Load historical news data (including reactions) and embeddings | |
| HISTORICAL_NEWS_FILE = 'historical_financial_news_with_reactions.json' | |
| EMBEDDINGS_FILE = 'historical_news_embeddings.npy' | |
| with open(HISTORICAL_NEWS_FILE, 'r') as f: | |
| historical_news = json.load(f) | |
| embeddings = np.load(EMBEDDINGS_FILE).astype('float32') | |
| # Initialize the SentenceTransformer (adjust model if needed) | |
| embed_model = SentenceTransformer("all-MiniLM-L6-v2") | |
| # Create FAISS index using L2 distance | |
| dimension = embeddings.shape[1] | |
| faiss_index = faiss.IndexFlatL2(dimension) | |
| faiss_index.add(embeddings) | |
| print(f"[INFO] FAISS index contains {faiss_index.ntotal} vectors.") | |
| # ============================================================ | |
| # 2. UTILITY FUNCTIONS | |
| # ============================================================ | |
| def get_yesterdays_market_data(ticker): | |
| """ | |
| Fetch market data for 'yesterday' (based on current date - 1 day) | |
| using yfinance. Adjust the date range as needed. | |
| """ | |
| yesterday = datetime.datetime.now() - datetime.timedelta(days=1) | |
| start_date = yesterday.strftime("%Y-%m-%d") | |
| # Since yfinance uses exclusive end date, we add 1 day: | |
| end_date = (yesterday + datetime.timedelta(days=1)).strftime("%Y-%m-%d") | |
| data = yf.download(ticker, start=start_date, end=end_date) | |
| # Extract key info. If no data is found, return defaults | |
| if not data.empty: | |
| close = data['Close'].iloc[-1] | |
| previous_close = data['Close'].iloc[0] | |
| market_cap = yf.Ticker(ticker).info.get("marketCap", "N/A") | |
| else: | |
| close, previous_close, market_cap = "N/A", "N/A", "N/A" | |
| market_data_text = ( | |
| f"Current market data for {ticker} (as of yesterday):\n" | |
| f"- Closing Price: {close}\n" | |
| f"- Previous Close: {previous_close}\n" | |
| f"- Market Cap: {market_cap}\n" | |
| ) | |
| return market_data_text | |
| def generate_rag_prompt_with_market(new_article, similar_articles, market_data_text): | |
| """ | |
| Build a RAG prompt that includes the new article, its ticker market data, | |
| and similar historical articles with their reactions. | |
| """ | |
| prompt = f"""New Financial News Article: | |
| Title: {new_article['title']} | |
| Content: {new_article['content']} | |
| Ticker: {new_article.get('ticker', 'N/A')} | |
| {market_data_text} | |
| Historical Context:""" | |
| for idx, art in enumerate(similar_articles, 1): | |
| prompt += f"""\n\n{idx}. Title: {art['title']} | |
| Reaction: Price Change = {art['reaction']['price_change']}%, Volume Spike = {art['reaction']['volume_spike']}""" | |
| prompt += """ | |
| Based on the above information, analyze and explain: | |
| - How the market might react to this new article. | |
| - Whether the recommendation is to BUY, HOLD, or SELL. | |
| - Provide a brief justification referencing historical patterns and current market conditions. | |
| """ | |
| return prompt | |
| def analyze_article_with_agent(new_article, top_k=5): | |
| """ | |
| Pipeline: | |
| 1. Embed the new article. | |
| 2. Retrieve similar historical articles via FAISS. | |
| 3. Get yesterday's market data. | |
| 4. Construct a prompt (RAG) and query Gemini. | |
| Returns Gemini's recommendation text and the prompt used. | |
| """ | |
| # 1. Embed the new article | |
| new_text = f"{new_article['title']}. {new_article['content']}" | |
| new_embedding = embed_model.encode(new_text) | |
| new_embedding = np.array([new_embedding]).astype('float32') | |
| # 2. Retrieve similar historical articles (top_k) | |
| distances, indices = faiss_index.search(new_embedding, top_k) | |
| similar_articles = [historical_news[i] for i in indices[0]] | |
| # 3. Fetch yesterday's market data (using the ticker in the new article) | |
| market_data_text = get_yesterdays_market_data(new_article['ticker']) | |
| # 4. Build the RAG prompt | |
| rag_prompt = generate_rag_prompt_with_market(new_article, similar_articles, market_data_text) | |
| # 5. Query Gemini for a recommendation | |
| # (Make sure you have set your API key in the client below) | |
| client = genai.Client(api_key="YOUR_GEMINI_API_KEY") # Replace with your real API key | |
| response = client.models.generate_content( | |
| model="gemini-1.5-flash", # Adjust model version if needed | |
| contents=[{"role": "user", "parts": [{"text": rag_prompt}]}] | |
| ) | |
| return response.text, rag_prompt | |
| def fetch_latest_news(ticker="TSLA"): | |
| """ | |
| Fetch the most recent news article for the given ticker using NewsAPI. | |
| Make sure to replace NEWS_API_KEY with your own key. | |
| """ | |
| NEWS_API_KEY = "YOUR_NEWSAPI_KEY" # Replace with your NewsAPI key | |
| url = ( | |
| f"https://newsapi.org/v2/everything?" | |
| f"q={ticker}&" | |
| f"apiKey={NEWS_API_KEY}&" | |
| f"sortBy=publishedAt&" | |
| f"language=en" | |
| ) | |
| response = requests.get(url) | |
| data = response.json() | |
| if data.get("status") == "ok" and data.get("totalResults", 0) > 0: | |
| article = data["articles"][0] # Take the most recent article | |
| return { | |
| "title": article["title"], | |
| "content": article["content"] or article["description"], | |
| "ticker": ticker | |
| } | |
| else: | |
| print("[WARN] No news articles found.") | |
| return None | |
| def parse_recommendation(recommendation_text): | |
| """ | |
| Simple parser to extract the recommendation (BUY/HOLD/SELL) from Gemini output. | |
| """ | |
| rec = "HOLD" | |
| text_upper = recommendation_text.upper() | |
| if "BUY" in text_upper: | |
| rec = "BUY" | |
| elif "SELL" in text_upper: | |
| rec = "SELL" | |
| return rec | |
| def send_alert(message): | |
| """ | |
| Placeholder function to send alerts. | |
| Here we simply print the alert, but you can integrate email/Slack/SMS APIs. | |
| """ | |
| print("\n=== ALERT ===") | |
| print(message) | |
| print("=== END ALERT ===\n") | |
| # ============================================================ | |
| # 3. Agent Class for One-Time Daily Processing | |
| # ============================================================ | |
| class StockRecommendationAgent: | |
| def __init__(self, ticker="TSLA"): | |
| self.ticker = ticker | |
| def fetch_latest_news(self): | |
| """ | |
| Fetch the latest news using the NewsAPI. | |
| """ | |
| news_article = fetch_latest_news(self.ticker) | |
| if news_article: | |
| print(f"[INFO] Fetched article: {news_article['title']}") | |
| return news_article | |
| def process(self): | |
| """ | |
| Process one iteration of the pipeline: | |
| - Fetch latest news. | |
| - Process the article with the RAG pipeline. | |
| - Parse the recommendation and take an action. | |
| """ | |
| new_article = self.fetch_latest_news() | |
| if not new_article: | |
| print("[INFO] No news available. Skipping processing.") | |
| return | |
| print(f"[INFO] Processing article: {new_article['title']}") | |
| recommendation_text, rag_prompt = analyze_article_with_agent(new_article) | |
| # Parse the recommendation decision | |
| decision = parse_recommendation(recommendation_text) | |
| action_message = ( | |
| f"Latest news '{new_article['title']}' for ticker {self.ticker} analysis:\n" | |
| f"Decision: {decision}\n" | |
| f"Details: {recommendation_text}\n" | |
| ) | |
| send_alert(action_message) | |
| # Optionally, you can log the prompt if needed: | |
| print("\n[DEBUG] RAG Prompt Used:\n", rag_prompt) | |
| # ============================================================ | |
| # 4. MAIN: Run the Agent Once per Day in the Morning | |
| # ============================================================ | |
| def main(): | |
| """ | |
| This main function is intended to be run once per day (e.g., in the morning). | |
| You might schedule this script using an OS scheduler so that it runs automatically. | |
| """ | |
| print("[INFO] Starting the Stock Recommendation Agent...") | |
| agent = StockRecommendationAgent(ticker="TSLA") | |
| agent.process() | |
| print("[INFO] Processing complete.") | |
| if __name__ == '__main__': | |
| main() | |