# main_anaylisis / COPY2.py
# NOTE: the four lines above/below this header were Hugging Face upload-page
# residue ("pranit144 ... Upload 14 files ... 2997210 verified") pasted into
# the source; kept here as a comment so the file parses.
import datetime
import json
import os
import random
import time

import faiss
import numpy as np
import requests
import yfinance as yf
from flask import Flask, render_template, request, jsonify
from flask_caching import Cache
from google import genai
from google.genai import types
from sentence_transformers import SentenceTransformer
app = Flask(__name__)
# BUG FIX: the original `Cache()` was never bound to the app (no
# `Cache(app)` and no `cache.init_app(app)`), so every `@cache.cached`
# route failed at runtime. Bind at construction and pick an explicit
# in-process backend (the unconfigured default is NullCache, i.e. no caching).
cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})

# ============================================================
# 1. SETUP: Load Historical Data and Build FAISS Index
# ============================================================
# Historical news data (including market reactions) and precomputed embeddings.
HISTORICAL_NEWS_FILE = 'historical_financial_news_with_reactions.json'
EMBEDDINGS_FILE = 'historical_news_embeddings.npy'

try:
    with open(HISTORICAL_NEWS_FILE, 'r') as f:
        historical_news = json.load(f)
    # FAISS requires float32 vectors.
    embeddings = np.load(EMBEDDINGS_FILE).astype('float32')

    # Initialize the SentenceTransformer (adjust model if needed).
    # Must match the model used to produce EMBEDDINGS_FILE — TODO confirm.
    embed_model = SentenceTransformer("all-MiniLM-L6-v2")

    # Build an exact (brute-force) L2-distance index over the embeddings.
    dimension = embeddings.shape[1]
    faiss_index = faiss.IndexFlatL2(dimension)
    faiss_index.add(embeddings)
    print(f"[INFO] FAISS index contains {faiss_index.ntotal} vectors.")
    index_loaded = True
except Exception as e:
    # Degrade gracefully: routes check `index_loaded` before using the index.
    print(f"[ERROR] Failed to load index: {str(e)}")
    index_loaded = False
    historical_news = []
# ============================================================
# 2. UTILITY FUNCTIONS
# ============================================================
def get_last_n_days_market_data(ticker, n_days=10):
    """
    Fetch market data for the trailing *n_days* for *ticker* via yfinance.

    Returns:
        (summary_text, dataframe) on success, or
        (not-available message, None) when yfinance returns nothing.
    """
    now = datetime.datetime.now()
    start = (now - datetime.timedelta(days=n_days)).strftime("%Y-%m-%d")
    # yfinance treats the end date as exclusive — push it one day forward
    # so today's bar (if available) is included.
    end = (now + datetime.timedelta(days=1)).strftime("%Y-%m-%d")

    data = yf.download(ticker, start=start, end=end)
    if data.empty:
        return "No market data available for the last {} days.".format(n_days), None

    pieces = [f"Market data for the last {n_days} days for {ticker}:\n"]
    for date, row in data.iterrows():
        pieces.append(
            f"{date.strftime('%Y-%m-%d')}: Close = {row['Close']}, Volume = {row['Volume']}\n"
        )
    return "".join(pieces), data
def get_historical_chart_data(ticker, period="6mo"):
    """
    Fetch historical stock data for charting.

    Returns a dict of parallel lists (dates, prices, volumes) suitable for
    the front-end chart, or None when yfinance has no data for *ticker*.
    """
    history = yf.download(ticker, period=period)
    if history.empty:
        return None
    return {
        'dates': history.index.strftime('%Y-%m-%d').tolist(),
        'prices': history['Close'].tolist(),
        'volumes': history['Volume'].tolist(),
    }
def generate_rag_prompt_with_market(new_article, similar_articles, market_data_text):
    """
    Build a RAG prompt from the new article, the last-10-days market data
    text, and the retrieved similar historical articles.

    Args:
        new_article: dict with 'title', 'content' and optionally 'ticker'.
        similar_articles: dicts with 'title' and a 'reaction' sub-dict
            containing 'price_change' and 'volume_spike'.
        market_data_text: preformatted market-data summary string.
    """
    segments = [f"""New Financial News Article:
Title: {new_article['title']}
Content: {new_article['content']}
Ticker: {new_article.get('ticker', 'N/A')}
{market_data_text}
Historical Context:"""]

    # One numbered entry per retrieved historical article.
    for rank, past in enumerate(similar_articles, 1):
        segments.append(f"""\n\n{rank}. Title: {past['title']}
Reaction: Price Change = {past['reaction']['price_change']}%, Volume Spike = {past['reaction']['volume_spike']}""")

    segments.append("""
Based on the above information, analyze and explain:
- How the market might react to this new article.
- Whether the recommendation is to BUY, HOLD, or SELL.
- Provide a brief justification referencing historical patterns and current market conditions.
""")
    return "".join(segments)
def analyze_article_with_agent(new_article, top_k=5):
    """
    Full RAG pipeline for a single news article.

    1. Embed the new article.
    2. Retrieve similar historical articles via FAISS.
    3. Fetch the last 10 days of market data for the article's ticker.
    4. Construct a RAG prompt and query Gemini.

    Args:
        new_article: dict with 'title', 'content' and 'ticker' keys.
        top_k: number of historical neighbours to retrieve.

    Returns:
        (recommendation_text, rag_prompt, similar_articles) tuple.
    """
    # 1. Embed the new article — FAISS expects a 2-D float32 array.
    new_text = f"{new_article['title']}. {new_article['content']}"
    new_embedding = np.array([embed_model.encode(new_text)]).astype('float32')

    # 2. Retrieve top_k nearest historical articles (L2 distance).
    distances, indices = faiss_index.search(new_embedding, top_k)
    similar_articles = [historical_news[i] for i in indices[0]]

    # 3. Fetch market data for the last 10 days for this ticker.
    market_data_text, _ = get_last_n_days_market_data(new_article['ticker'], n_days=10)

    # 4. Build the RAG prompt.
    rag_prompt = generate_rag_prompt_with_market(new_article, similar_articles, market_data_text)

    # 5. Query Gemini for a recommendation.
    # SECURITY FIX: prefer the GEMINI_API_KEY environment variable. The
    # hardcoded fallback is kept only for backward compatibility — a key
    # committed to source control is leaked and should be rotated/removed.
    client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY",
                                                 "AIzaSyBRzLEPudJzVNDQpckpxhbkA3aF9Inr5v0"))
    response = client.models.generate_content(
        model="gemini-2.0-flash",  # Adjust model version if needed
        contents=[{"role": "user", "parts": [{"text": rag_prompt}]}]
    )
    return response.text, rag_prompt, similar_articles
def get_yesterdays_market_data(ticker):
    """
    Fetch market data for 'yesterday' (current date minus one day) via
    yfinance.

    Returns:
        (summary_text, close, previous_close, market_cap); the numeric
        fields fall back to the string "N/A" when no data is available
        (e.g. weekends/holidays).
    """
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    window_start = yesterday.strftime("%Y-%m-%d")
    # yfinance's end date is exclusive, so add one day to cover 'yesterday'.
    window_end = (yesterday + datetime.timedelta(days=1)).strftime("%Y-%m-%d")

    data = yf.download(ticker, start=window_start, end=window_end)
    if data.empty:
        close = previous_close = market_cap = "N/A"
    else:
        close = data['Close'].iloc[-1]
        previous_close = data['Close'].iloc[0]
        # Separate Ticker call: market cap is not in the download frame.
        market_cap = yf.Ticker(ticker).info.get("marketCap", "N/A")

    market_data_text = (
        f"Current market data for {ticker} (as of yesterday):\n"
        f"- Closing Price: {close}\n"
        f"- Previous Close: {previous_close}\n"
        f"- Market Cap: {market_cap}\n"
    )
    return market_data_text, close, previous_close, market_cap
def fetch_latest_news(ticker="TSLA"):
    """
    Fetch the most recent news article for *ticker* from NewsAPI.

    Returns:
        dict with title/content/ticker/url/source/publishedAt, or
        None when the request fails or no article is available.
    """
    # TODO: move to env/config — a key committed to source control is leaked.
    NEWS_API_KEY = "8df6cf4cbc4c42cdb7fe0d2ee438ccd5"  # Replace with your NewsAPI key
    url = (
        f"https://newsapi.org/v2/everything?"
        f"q={ticker}&"
        f"apiKey={NEWS_API_KEY}&"
        f"sortBy=publishedAt&"
        f"language=en"
    )
    try:
        # ROBUSTNESS FIX: the original had no timeout (could hang forever),
        # no status check, and let request exceptions propagate — now
        # consistent with get_multiple_news().
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"[WARN] News request failed for {ticker}: {str(e)}")
        return None

    if data.get("status") == "ok" and data.get("totalResults", 0) > 0:
        article = data["articles"][0]  # most recent first (sortBy=publishedAt)
        return {
            "title": article["title"],
            # 'content' is often truncated/None on the free tier; fall back
            # to the description.
            "content": article["content"] or article["description"],
            "ticker": ticker,
            "url": article.get("url", ""),
            "source": article.get("source", {}).get("name", "Unknown"),
            "publishedAt": article.get("publishedAt", "")
        }
    print("[WARN] No news articles found.")
    return None
def parse_recommendation(recommendation_text):
    """
    Extract a BUY/SELL/HOLD decision from Gemini's free-text output.

    BUY takes precedence over SELL when both words appear; anything
    without either keyword defaults to HOLD.
    """
    normalized = recommendation_text.upper()
    for decision in ("BUY", "SELL"):
        if decision in normalized:
            return decision
    return "HOLD"
def get_multiple_news(query, count=5):
    """
    Fetch up to *count* recent news articles matching *query* from NewsAPI.

    Returns:
        list of cleaned article dicts; an empty list on any failure.
    """
    NEWS_API_KEY = "8df6cf4cbc4c42cdb7fe0d2ee438ccd5"  # Replace with your NewsAPI key
    url = (
        f"https://newsapi.org/v2/everything?"
        f"q={query}&"
        f"apiKey={NEWS_API_KEY}&"
        f"sortBy=publishedAt&"
        f"language=en&"
        f"pageSize={count}"
    )
    try:
        response = requests.get(url, timeout=10)  # Add timeout
        response.raise_for_status()  # Raise an error for bad status codes
        payload = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching news for query {query}: {str(e)}")
        return []
    except Exception as e:
        print(f"Unexpected error fetching news for query {query}: {str(e)}")
        return []

    # Guard clause: bail out early on an empty or failed result set.
    if payload.get("status") != "ok" or payload.get("totalResults", 0) <= 0:
        print(f"NewsAPI returned no results for query: {query}")
        return []

    cleaned = []
    for raw in payload["articles"][:count]:
        # Keep only articles with a title and some body text.
        if raw.get("title") and (raw.get("content") or raw.get("description")):
            cleaned.append({
                "title": raw["title"],
                "content": raw.get("content") or raw.get("description", ""),
                "url": raw.get("url", ""),
                "source": raw.get("source", {}).get("name", "Unknown"),
                "publishedAt": raw.get("publishedAt", "")
            })
    return cleaned
# ============================================================
# 3. FLASK ROUTES
# ============================================================
@app.route('/')
def index():
    """Main dashboard page.

    Renders templates/index.html; dynamic data is loaded by the page
    through the JSON endpoints defined below.
    """
    return render_template('index.html')
@app.route('/analyze', methods=['POST'])
def analyze():
    """Analyze a stock ticker and return recommendation.

    Expects a POST form field 'ticker' (defaults to 'TSLA').  Returns a
    JSON payload with the triggering article, the Gemini decision,
    yesterday's market snapshot, chart data, related news and the
    similar historical articles used for retrieval.
    """
    ticker = request.form.get('ticker', 'TSLA').upper()
    # Fetch news — the analysis is driven by the single most recent article.
    article = fetch_latest_news(ticker)
    if not article:
        # Nothing to analyse; report as a JSON error payload (HTTP 200).
        return jsonify({
            'status': 'error',
            'message': f'No recent news found for {ticker}'
        })
    try:
        # Get market data (yesterday's close / previous close / market cap)
        market_data_text, last_close, prev_close, market_cap = get_yesterdays_market_data(ticker)
        # Get chart data (may be None when yfinance has no history)
        chart_data = get_historical_chart_data(ticker)
        # Get recommendation — only possible if the FAISS index loaded at startup
        if index_loaded:
            # NOTE(review): rag_prompt is computed but never returned to the client.
            recommendation_text, rag_prompt, similar_articles = analyze_article_with_agent(article)
            decision = parse_recommendation(recommendation_text)
        else:
            recommendation_text = "Cannot generate recommendation: historical data not loaded."
            rag_prompt = "N/A"
            decision = "N/A"
            similar_articles = []
        # Get more news articles for the sidebar/feed
        recent_news = get_multiple_news(ticker, 5)
        return jsonify({
            'status': 'success',
            'ticker': ticker,
            'article': article,
            'recommendation': {
                'text': recommendation_text,
                'decision': decision
            },
            'market_data': {
                'last_close': last_close,
                'prev_close': prev_close,
                'market_cap': market_cap
            },
            'chart_data': chart_data,
            'recent_news': recent_news,
            'similar_historical': similar_articles
        })
    except Exception as e:
        # Broad catch: any failure (yfinance, FAISS, Gemini, NewsAPI) becomes
        # a JSON error response instead of a 500 HTML page.
        return jsonify({
            'status': 'error',
            'message': f'Error analyzing {ticker}: {str(e)}'
        })
@app.route('/stocks')
def stock_list():
    """Return a JSON list of popular stocks (symbol + company name)."""
    entries = [
        ("AAPL", "Apple Inc."),
        ("MSFT", "Microsoft Corporation"),
        ("GOOGL", "Alphabet Inc."),
        ("AMZN", "Amazon.com, Inc."),
        ("META", "Meta Platforms, Inc."),
        ("TSLA", "Tesla, Inc."),
        ("NVDA", "NVIDIA Corporation"),
        ("JPM", "JPMorgan Chase & Co."),
        ("V", "Visa Inc."),
        ("WMT", "Walmart Inc."),
    ]
    return jsonify([{"symbol": symbol, "name": name} for symbol, name in entries])
@app.route('/api/market-news')
@cache.cached(timeout=300)  # Cache for 5 minutes
def get_market_news():
    """Fetch, deduplicate, and return the 10 most recent market news items.

    Pulls general "stock market" headlines first, tops up with per-sector
    queries when short, dedupes by URL, and sorts newest-first.
    """
    try:
        all_news = []
        # First try to get general market news
        try:
            market_news = get_multiple_news("stock market", count=5)
            if market_news:
                all_news.extend(market_news)
        except Exception as e:
            print(f"Error fetching general market news: {str(e)}")

        # BUG FIX: SECTORS is not defined anywhere in this file, so the
        # original raised NameError (-> 500) whenever fewer than 10 general
        # articles came back. Fall back to an empty mapping so the endpoint
        # degrades gracefully; define SECTORS at module level to enable
        # sector top-up.
        sectors = globals().get("SECTORS", {})
        if len(all_news) < 10:
            for sector in sectors:
                try:
                    sector_news = get_multiple_news(sector, count=2)
                    if sector_news:
                        all_news.extend(sector_news)
                except Exception as e:
                    print(f"Error fetching news for sector {sector}: {str(e)}")
                    continue

        # Deduplicate news by URL, normalising fields along the way.
        seen_urls = set()
        unique_news = []
        for article in all_news:
            url = article.get('url')
            if url in seen_urls:
                continue
            seen_urls.add(url)
            unique_news.append({
                'title': article.get('title', 'No Title'),
                'url': article.get('url', '#'),
                'source': article.get('source', 'Unknown Source'),
                # BUG FIX: `datetime` is the module here, so the original
                # `datetime.now()` raised AttributeError on every fallback.
                'publishedAt': article.get('publishedAt', datetime.datetime.now().isoformat()),
                'summary': article.get('content') or article.get('description', 'No content available')
            })

        # Sort by published date (ISO-8601 strings sort chronologically)
        # and return the 10 most recent.
        unique_news.sort(key=lambda x: x['publishedAt'], reverse=True)
        return jsonify(unique_news[:10])
    except Exception as e:
        print(f"Critical error in get_market_news: {str(e)}")
        return jsonify({
            'error': 'Failed to fetch market news',
            'details': str(e)
        }), 500
if __name__ == '__main__':
    # Development server only (debug=True enables the reloader and the
    # interactive debugger — never use in production). Port 5001 avoids
    # clashing with Flask's default 5000.
    app.run(debug=True, port=5001)