# Source: Hugging Face repository upload by pranit144
# ("Upload 14 files", revision 2997210, verified).
import datetime
import json
import os
import random
import time

import faiss
import numpy as np
import requests
import yfinance as yf
from google import genai
from sentence_transformers import SentenceTransformer
# ============================================================
# 1. SETUP: Load Historical Data and Build FAISS Index
# ============================================================

# Paths to the precomputed corpus: historical news articles (with their
# recorded market reactions) and their matching embedding matrix.
HISTORICAL_NEWS_FILE = 'historical_financial_news_with_reactions.json'
EMBEDDINGS_FILE = 'historical_news_embeddings.npy'

# Explicit encoding avoids platform-dependent default codecs when reading
# the JSON corpus (the original relied on the locale's default encoding).
with open(HISTORICAL_NEWS_FILE, 'r', encoding='utf-8') as f:
    historical_news = json.load(f)

# FAISS requires float32 vectors.
embeddings = np.load(EMBEDDINGS_FILE).astype('float32')

# Initialize the SentenceTransformer used to embed incoming articles
# (must match the model used to build EMBEDDINGS_FILE).
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

# Build an exact nearest-neighbor index over the embeddings using
# L2 (Euclidean) distance.
dimension = embeddings.shape[1]
faiss_index = faiss.IndexFlatL2(dimension)
faiss_index.add(embeddings)
print(f"[INFO] FAISS index contains {faiss_index.ntotal} vectors.")
# ============================================================
# 2. UTILITY FUNCTIONS
# ============================================================
def get_yesterdays_market_data(ticker):
    """
    Fetch recent daily market data for *ticker* via yfinance and format it
    as a human-readable summary.

    Parameters
    ----------
    ticker : str
        Ticker symbol to look up (e.g. "TSLA").

    Returns
    -------
    str
        Text block with the latest close, the prior session's close, and the
        market cap; "N/A" placeholders when data is unavailable.
    """
    # Look back a full week so weekends/holidays still yield trading days.
    # The original 1-day window returned at most a single row, which made
    # previous_close always identical to close, and returned nothing at all
    # when "yesterday" was not a trading day.
    end = datetime.datetime.now()
    start = end - datetime.timedelta(days=7)
    data = yf.download(
        ticker,
        start=start.strftime("%Y-%m-%d"),
        end=end.strftime("%Y-%m-%d"),
    )
    if not data.empty:
        close = data['Close'].iloc[-1]
        # Prior trading session's close when we have two rows; otherwise
        # fall back to the only available row.
        if len(data) > 1:
            previous_close = data['Close'].iloc[-2]
        else:
            previous_close = data['Close'].iloc[0]
        market_cap = yf.Ticker(ticker).info.get("marketCap", "N/A")
    else:
        close, previous_close, market_cap = "N/A", "N/A", "N/A"
    market_data_text = (
        f"Current market data for {ticker} (as of yesterday):\n"
        f"- Closing Price: {close}\n"
        f"- Previous Close: {previous_close}\n"
        f"- Market Cap: {market_cap}\n"
    )
    return market_data_text
def generate_rag_prompt_with_market(new_article, similar_articles, market_data_text):
    """
    Assemble the retrieval-augmented prompt sent to the LLM.

    Combines the new article's fields, the formatted market-data summary,
    and each retrieved historical article with its recorded market reaction,
    then appends the analysis instructions.
    """
    header = (
        "New Financial News Article:\n"
        f"Title: {new_article['title']}\n"
        f"Content: {new_article['content']}\n"
        f"Ticker: {new_article.get('ticker', 'N/A')}\n"
        f"{market_data_text}\n"
        "Historical Context:"
    )
    # One section per retrieved article, numbered from 1.
    history_sections = [
        f"\n\n{position}. Title: {past['title']}\n"
        f"Reaction: Price Change = {past['reaction']['price_change']}%, "
        f"Volume Spike = {past['reaction']['volume_spike']}"
        for position, past in enumerate(similar_articles, 1)
    ]
    instructions = (
        "\nBased on the above information, analyze and explain:\n"
        "- How the market might react to this new article.\n"
        "- Whether the recommendation is to BUY, HOLD, or SELL.\n"
        "- Provide a brief justification referencing historical patterns and current market conditions.\n"
    )
    return header + "".join(history_sections) + instructions
def analyze_article_with_agent(new_article, top_k=5):
    """
    Full RAG pipeline for a single news article.

    1. Embed the new article (title + content).
    2. Retrieve the top_k most similar historical articles via FAISS.
    3. Fetch recent market data for the article's ticker.
    4. Build a RAG prompt and query Gemini for a recommendation.

    Parameters
    ----------
    new_article : dict
        Must contain 'title', 'content', and 'ticker' keys.
    top_k : int, optional
        Number of similar historical articles to retrieve (default 5).

    Returns
    -------
    tuple[str, str]
        (Gemini's recommendation text, the RAG prompt that was used).
    """
    # 1. Embed the new article; FAISS expects a 2-D float32 array.
    new_text = f"{new_article['title']}. {new_article['content']}"
    new_embedding = np.asarray([embed_model.encode(new_text)], dtype='float32')
    # 2. Retrieve the top_k nearest historical articles.
    distances, indices = faiss_index.search(new_embedding, top_k)
    similar_articles = [historical_news[i] for i in indices[0]]
    # 3. Fetch recent market data for the article's ticker.
    market_data_text = get_yesterdays_market_data(new_article['ticker'])
    # 4. Build the RAG prompt.
    rag_prompt = generate_rag_prompt_with_market(new_article, similar_articles, market_data_text)
    # 5. Query Gemini. Read the key from the environment instead of
    # hard-coding a secret in source (set GEMINI_API_KEY before running);
    # the placeholder fallback preserves the original behavior when unset.
    api_key = os.environ.get("GEMINI_API_KEY", "YOUR_GEMINI_API_KEY")
    client = genai.Client(api_key=api_key)
    response = client.models.generate_content(
        model="gemini-1.5-flash",  # Adjust model version if needed.
        contents=[{"role": "user", "parts": [{"text": rag_prompt}]}],
    )
    return response.text, rag_prompt
def fetch_latest_news(ticker="TSLA"):
    """
    Fetch the most recent news article for *ticker* from NewsAPI.

    Reads the API key from the NEWS_API_KEY environment variable (falls
    back to the placeholder so a missing key surfaces in the API response
    rather than leaving a secret hard-coded in source).

    Returns
    -------
    dict | None
        {'title', 'content', 'ticker'} for the newest article, or None
        when no article is available or the request fails.
    """
    api_key = os.environ.get("NEWS_API_KEY", "YOUR_NEWSAPI_KEY")
    url = (
        "https://newsapi.org/v2/everything?"
        f"q={ticker}&"
        f"apiKey={api_key}&"
        "sortBy=publishedAt&"
        "language=en"
    )
    try:
        # requests has no default timeout; without one a dead connection
        # would hang the daily run forever.
        response = requests.get(url, timeout=30)
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        print(f"[WARN] News request failed: {exc}")
        return None
    if data.get("status") == "ok" and data.get("totalResults", 0) > 0:
        article = data["articles"][0]  # Newest first due to sortBy=publishedAt.
        return {
            "title": article["title"],
            # NewsAPI's 'content' field may be None/truncated; fall back to
            # the description so downstream embedding has some text.
            "content": article["content"] or article["description"],
            "ticker": ticker,
        }
    print("[WARN] No news articles found.")
    return None
def parse_recommendation(recommendation_text):
    """
    Extract a trading decision (BUY/SELL/HOLD) from the model's free text.

    BUY takes precedence over SELL when both keywords appear; HOLD is the
    default when neither is present.
    """
    normalized = recommendation_text.upper()
    # Order matters: BUY is checked before SELL, matching the priority of
    # the decision logic.
    for keyword in ("BUY", "SELL"):
        if keyword in normalized:
            return keyword
    return "HOLD"
def send_alert(message):
    """
    Deliver an alert to the operator.

    Currently just prints to stdout; swap in an email/Slack/SMS integration
    here without changing callers.
    """
    print(f"\n=== ALERT ===\n{message}\n=== END ALERT ===\n")
# ============================================================
# 3. Agent Class for One-Time Daily Processing
# ============================================================
class StockRecommendationAgent:
    """Runs one daily news → RAG → recommendation cycle for a single ticker."""

    def __init__(self, ticker="TSLA"):
        # Ticker symbol this agent monitors.
        self.ticker = ticker

    def fetch_latest_news(self):
        """Fetch the latest news article for this agent's ticker via NewsAPI."""
        article = fetch_latest_news(self.ticker)
        if article:
            print(f"[INFO] Fetched article: {article['title']}")
        return article

    def process(self):
        """
        Run one pipeline iteration: fetch the latest news, analyze it with
        the RAG pipeline, parse the decision, and emit an alert.
        """
        article = self.fetch_latest_news()
        if not article:
            print("[INFO] No news available. Skipping processing.")
            return
        print(f"[INFO] Processing article: {article['title']}")
        recommendation_text, rag_prompt = analyze_article_with_agent(article)
        # Turn the free-text LLM answer into a BUY/HOLD/SELL decision.
        decision = parse_recommendation(recommendation_text)
        send_alert(
            f"Latest news '{article['title']}' for ticker {self.ticker} analysis:\n"
            f"Decision: {decision}\n"
            f"Details: {recommendation_text}\n"
        )
        # Log the prompt for debugging/auditing.
        print("\n[DEBUG] RAG Prompt Used:\n", rag_prompt)
# ============================================================
# 4. MAIN: Run the Agent Once per Day in the Morning
# ============================================================
def main():
    """
    Daily entry point: build the agent and run one processing cycle.

    Intended to be executed once per day (e.g. each morning) via an OS
    scheduler such as cron or Task Scheduler.
    """
    print("[INFO] Starting the Stock Recommendation Agent...")
    StockRecommendationAgent(ticker="TSLA").process()
    print("[INFO] Processing complete.")


if __name__ == '__main__':
    main()