Darshan03's picture
Update app.py
5b417ea verified
import streamlit as st
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import ta
import os
from datetime import datetime
import google.generativeai as genai
from dotenv import load_dotenv
import markdown
import io
import base64
from xhtml2pdf import pisa
import logging
import json
import streamlit.components.v1 as components
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration
class Config:
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
OUTPUT_DIR = "output_files" # Base output directory
# Create output directories if they don't exist
if not os.path.exists(Config.OUTPUT_DIR):
os.makedirs(Config.OUTPUT_DIR)
# --------------------- Functions from technical_analysis.py ---------------------
def fetch_data(ticker, period="1y"):
"""Fetches stock data from Yahoo Finance."""
logging.info(f"Fetching data for {ticker} for period {period}")
stock = yf.Ticker(ticker)
data = stock.history(period=period)
return data
def calculate_moving_averages(data, short_window=20, long_window=50):
"""Calculates simple moving averages."""
logging.info("Calculating moving averages")
data['SMA_Short'] = data['Close'].rolling(window=short_window).mean()
data['SMA_Long'] = data['Close'].rolling(window=long_window).mean()
return data
def calculate_ema(data, window=20):
"""Calculates exponential moving average."""
logging.info("Calculating EMA")
data['EMA'] = data['Close'].ewm(span=window, adjust=False).mean()
return data
def calculate_macd(data, short_window=12, long_window=26, signal_window=9):
"""Calculates the MACD."""
logging.info("Calculating MACD")
macd = ta.trend.MACD(data['Close'], window_fast=short_window, window_slow=long_window, window_sign=signal_window)
data['MACD'] = macd.macd()
data['MACD_signal'] = macd.macd_signal()
data['MACD_histogram'] = macd.macd_diff()
return data
def calculate_rsi(data, window=14):
"""Calculates the RSI."""
logging.info("Calculating RSI")
data['RSI'] = ta.momentum.RSIIndicator(data['Close'], window=window).rsi()
return data
def calculate_adx(data, window=14):
"""Calculates the Average Directional Index (ADX)"""
logging.info("Calculating ADX")
adx = ta.trend.ADXIndicator(data['High'], data['Low'], data['Close'], window=window)
data['ADX'] = adx.adx()
data['ADX_pos'] = adx.adx_pos()
data['ADX_neg'] = adx.adx_neg()
return data
def calculate_atr(data, window=14):
"""Calculates the ATR."""
logging.info("Calculating ATR")
data['ATR'] = ta.volatility.AverageTrueRange(data['High'], data['Low'], data['Close'], window=window).average_true_range()
return data
def calculate_bollinger_bands(data, window=20, window_dev=2):
"""Calculates the Bollinger Bands."""
logging.info("Calculating Bollinger Bands")
bb = ta.volatility.BollingerBands(data['Close'], window=window, window_dev=window_dev)
data['BB_upper'] = bb.bollinger_hband()
data['BB_mid'] = bb.bollinger_mavg()
data['BB_lower'] = bb.bollinger_lband()
return data
def calculate_stochastic(data, window=14):
"""Calculates the Stochastic Oscillator."""
logging.info("Calculating Stochastic Oscillator")
stoch = ta.momentum.StochasticOscillator(high=data['High'], low=data['Low'], close=data['Close'], window=window)
data['Stochastic_k'] = stoch.stoch()
data['Stochastic_d'] = stoch.stoch_signal()
return data
def print_indicator_outputs(data, ticker, output_dir):
"""Prints key indicator values and explanations and save them in a file"""
logging.info(f"Generating analysis output for {ticker}")
output_text = f"----- {ticker} Technical Analysis Summary -----\n"
# Moving Averages:
if 'SMA_Short' in data.columns and 'SMA_Long' in data.columns:
latest_sma_short = data['SMA_Short'].iloc[-1]
latest_sma_long = data['SMA_Long'].iloc[-1]
output_text += "\n--- Moving Averages ---\n"
output_text += f" Latest Short-Term SMA ({data['SMA_Short'].name}): {latest_sma_short:.2f}\n"
output_text += f" Latest Long-Term SMA ({data['SMA_Long'].name}): {latest_sma_long:.2f}\n"
if latest_sma_short > latest_sma_long:
output_text += " Short-term SMA is above Long-term SMA: Potential uptrend signal.\n"
elif latest_sma_short < latest_sma_long:
output_text += " Short-term SMA is below Long-term SMA: Potential downtrend signal.\n"
else:
output_text += " Short and Long term SMAs are same. No clear trend signal from MA.\n"
if 'EMA' in data.columns:
latest_ema = data['EMA'].iloc[-1]
output_text += f" Latest Exponential Moving Average ({data['EMA'].name}): {latest_ema:.2f}\n"
if 'SMA_Short' in data.columns:
if latest_ema > latest_sma_short:
output_text += " Latest EMA is above short SMA: Potential uptrend signal\n"
elif latest_ema < latest_sma_short:
output_text += " Latest EMA is below short SMA: Potential downtrend signal\n"
# MACD
if 'MACD' in data.columns and 'MACD_signal' in data.columns and 'MACD_histogram' in data.columns:
latest_macd = data['MACD'].iloc[-1]
latest_signal = data['MACD_signal'].iloc[-1]
latest_hist = data['MACD_histogram'].iloc[-1]
output_text += "\n--- MACD ---\n"
output_text += f" Latest MACD: {latest_macd:.2f}\n"
output_text += f" Latest MACD Signal Line: {latest_signal:.2f}\n"
output_text += f" Latest MACD Histogram: {latest_hist:.2f}\n"
if latest_macd > latest_signal and latest_hist > 0:
output_text += " MACD is above signal line and histogram is positive: Potential bullish momentum.\n"
elif latest_macd < latest_signal and latest_hist < 0:
output_text += " MACD is below signal line and histogram is negative: Potential bearish momentum.\n"
elif latest_macd > latest_signal and latest_hist < 0:
output_text += " MACD is above signal line but histogram is negative: Potential weakening of bullish momentum\n"
elif latest_macd < latest_signal and latest_hist > 0:
output_text += " MACD is below signal line but histogram is positive: Potential weakening of bearish momentum\n"
else:
output_text += " No clear signal from MACD.\n"
# RSI
if 'RSI' in data.columns:
latest_rsi = data['RSI'].iloc[-1]
output_text += "\n--- RSI ---\n"
output_text += f" Latest RSI: {latest_rsi:.2f}\n"
if latest_rsi > 70:
output_text += " RSI is above 70: Overbought condition, potential pullback.\n"
elif latest_rsi < 30:
output_text += " RSI is below 30: Oversold condition, potential bounce.\n"
else:
output_text += " RSI is neither overbought nor oversold.\n"
# ADX
if 'ADX' in data.columns and 'ADX_pos' in data.columns and 'ADX_neg' in data.columns:
latest_adx = data['ADX'].iloc[-1]
latest_pos_di = data['ADX_pos'].iloc[-1]
latest_neg_di = data['ADX_neg'].iloc[-1]
output_text += "\n--- ADX ---\n"
output_text += f" Latest ADX: {latest_adx:.2f}\n"
output_text += f" Latest +DI: {latest_pos_di:.2f}\n"
output_text += f" Latest -DI: {latest_neg_di:.2f}\n"
if latest_adx > 25:
output_text += " ADX is above 25: Trend strength present.\n"
if latest_pos_di > latest_neg_di:
output_text += " +DI above -DI: Likely uptrend.\n"
elif latest_pos_di < latest_neg_di:
output_text += " -DI above +DI: Likely downtrend.\n"
else:
output_text += " ADX is below 25: Weak trend or no trend.\n"
# ATR
if 'ATR' in data.columns:
latest_atr = data['ATR'].iloc[-1]
output_text += "\n--- ATR ---\n"
output_text += f" Latest ATR: {latest_atr:.2f}\n"
output_text += f" High ATR indicates higher volatility, low ATR indicates lower volatility\n"
# Bollinger Bands
if 'BB_upper' in data.columns and 'BB_lower' in data.columns and 'BB_mid' in data.columns:
latest_close = data['Close'].iloc[-1]
latest_upper = data['BB_upper'].iloc[-1]
latest_lower = data['BB_lower'].iloc[-1]
latest_mid = data['BB_mid'].iloc[-1]
output_text += "\n--- Bollinger Bands ---\n"
output_text += f" Latest Close Price: {latest_close:.2f}\n"
output_text += f" Latest Upper Band: {latest_upper:.2f}\n"
output_text += f" Latest Middle Band: {latest_mid:.2f}\n"
output_text += f" Latest Lower Band: {latest_lower:.2f}\n"
if latest_close > latest_upper:
output_text += " Price is above the upper band: Potentially overbought.\n"
elif latest_close < latest_lower:
output_text += " Price is below the lower band: Potentially oversold.\n"
elif latest_close < latest_mid:
output_text += " Price is below the middle band: Potential downtrend\n"
elif latest_close > latest_mid:
output_text += " Price is above the middle band: Potential uptrend\n"
else:
output_text += " Price within Bollinger Bands\n"
# Stochastic Oscillator
if 'Stochastic_k' in data.columns and 'Stochastic_d' in data.columns:
latest_stoch_k = data['Stochastic_k'].iloc[-1]
latest_stoch_d = data['Stochastic_d'].iloc[-1]
output_text += "\n--- Stochastic Oscillator ---\n"
output_text += f" Latest Stochastic K: {latest_stoch_k:.2f}\n"
output_text += f" Latest Stochastic D: {latest_stoch_d:.2f}\n"
if latest_stoch_k > 80 and latest_stoch_d > 80:
output_text += " Both %K and %D above 80: Potentially overbought.\n"
elif latest_stoch_k < 20 and latest_stoch_d < 20:
output_text += " Both %K and %D below 20: Potentially oversold.\n"
elif latest_stoch_k > latest_stoch_d:
output_text += " %K crosses above %D : Potential bullish signal.\n"
elif latest_stoch_k < latest_stoch_d:
output_text += " %K crosses below %D : Potential bearish signal.\n"
else:
output_text += "No clear signal from stochastic\n"
# Save the text output
filename = os.path.join(output_dir, f"{ticker}_analysis.txt")
with open(filename, 'w') as f:
f.write(output_text)
logging.info(f"Saved analysis output to: {filename}")
return output_text
def plot_stock_and_indicators(data, ticker, output_dir):
"""Plots the stock price and various indicators and saves the plot."""
logging.info(f"Plotting stock and indicators for {ticker}")
plt.figure(figsize=(15, 10))
# Subplot 1: Price and Moving Averages
plt.subplot(4, 1, 1)
plt.plot(data.index, data['Close'], label='Close Price', color='blue')
if 'SMA_Short' in data.columns:
plt.plot(data.index, data['SMA_Short'], label='SMA Short', color='orange')
if 'SMA_Long' in data.columns:
plt.plot(data.index, data['SMA_Long'], label='SMA Long', color='green')
if 'EMA' in data.columns:
plt.plot(data.index, data['EMA'], label='EMA', color='purple')
plt.title(f'{ticker} Price & Moving Averages')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
# Subplot 2: MACD
plt.subplot(4, 1, 2)
if 'MACD' in data.columns:
plt.plot(data.index, data['MACD'], label='MACD', color='blue')
plt.plot(data.index, data['MACD_signal'], label='MACD Signal', color='orange')
plt.bar(data.index, data['MACD_histogram'], label='MACD Histogram', color='grey', alpha=0.6)
plt.axhline(0, color='black', linestyle='--', linewidth=0.7) # Zero line
plt.title('MACD')
plt.legend()
plt.grid(True)
# Subplot 3: RSI
plt.subplot(4, 1, 3)
if 'RSI' in data.columns:
plt.plot(data.index, data['RSI'], label='RSI', color='purple')
plt.axhline(70, color='red', linestyle='--', linewidth=0.7) # Overbought level
plt.axhline(30, color='green', linestyle='--', linewidth=0.7) # Oversold level
plt.title('RSI')
plt.ylabel('RSI Value')
plt.legend()
plt.grid(True)
# Subplot 4: ADX
plt.subplot(4, 1, 4)
if 'ADX' in data.columns:
plt.plot(data.index, data['ADX'], label='ADX', color='black')
plt.plot(data.index, data['ADX_pos'], label='+DI', color='green')
plt.plot(data.index, data['ADX_neg'], label='-DI', color='red')
plt.axhline(25, color='grey', linestyle='--', linewidth=0.7) # Threshold for strong trend
plt.title('ADX')
plt.ylabel('ADX Value')
plt.legend()
plt.grid(True)
plt.tight_layout()
# Save the plot
filename = os.path.join(output_dir, f"{ticker}_price_indicators.png")
plt.savefig(filename)
plt.close()
logging.info(f"Saved price plot to: {filename}")
def plot_volatility_indicators(data, ticker, output_dir):
"""Plots the volatility indicators and save them."""
logging.info(f"Plotting volatility indicators for {ticker}")
plt.figure(figsize=(15, 10))
# Subplot 1: Price and Bollinger Bands
plt.subplot(3, 1, 1)
plt.plot(data.index, data['Close'], label='Close Price', color='blue')
if 'BB_upper' in data.columns:
plt.plot(data.index, data['BB_upper'], label='BB Upper', color='red')
plt.plot(data.index, data['BB_mid'], label='BB Mid', color='grey')
plt.plot(data.index, data['BB_lower'], label='BB Lower', color='green')
plt.title(f'{ticker} Price & Bollinger Bands')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
# Subplot 2: ATR
plt.subplot(3, 1, 2)
if 'ATR' in data.columns:
plt.plot(data.index, data['ATR'], label='ATR', color='purple')
plt.title('ATR')
plt.ylabel('ATR Value')
plt.legend()
plt.grid(True)
#Subplot 3: Stochastic Oscillator
plt.subplot(3, 1, 3)
if 'Stochastic_k' in data.columns:
plt.plot(data.index, data['Stochastic_k'], label='%K', color='blue')
plt.plot(data.index, data['Stochastic_d'], label='%D', color='orange')
plt.axhline(80, color='red', linestyle='--', linewidth=0.7) # Overbought level
plt.axhline(20, color='green', linestyle='--', linewidth=0.7) # Oversold level
plt.title('Stochastic Oscillator')
plt.legend()
plt.grid(True)
plt.tight_layout()
# Save the plot
filename = os.path.join(output_dir, f"{ticker}_volatility_indicators.png")
plt.savefig(filename)
plt.close()
logging.info(f"Saved volatility plot to: {filename}")
def generate_prompt(analysis_text, image_paths):
"""Creates a structured prompt for an LLM using analysis and image paths."""
logging.info("Generating LLM prompt")
prompt = f"""
Please analyze the following technical analysis of the stock, along with the related charts:
**Technical Analysis Text Output:**
{analysis_text}
**Image Paths:**
{image_paths}
Given this information, please provide the following:
- Summarize the overall technical outlook for this stock.
- Identify any significant patterns or signals.
- Suggest possible trading actions based on the analysis.
- Any additional insigths based on the analysis.
"""
return prompt
def load_prompt(prompt_filepath):
"""Loads the prompt from the given filepath and returns it."""
logging.info(f"Loading LLM prompt from {prompt_filepath}")
try:
with open(prompt_filepath, 'r') as f:
prompt = f.read()
return prompt
except FileNotFoundError:
logging.error(f"Error: Prompt file not found at {prompt_filepath}")
return None
except Exception as e:
logging.error(f"Error loading prompt: {e}")
return None
def get_response(llm, prompt):
"""Generates a response from the LLM based on the provided prompt and context."""
logging.info("Sending prompt to LLM")
try:
response = llm.send_message(prompt)
logging.info("Received LLM response")
return response
except Exception as e:
logging.error(f"Error getting response from LLM: {e}")
return None
def markdown_to_pdf_xhtml2pdf(markdown_text, output_pdf_path):
"""Converts markdown text to PDF using xhtml2pdf."""
logging.info(f"Converting markdown to PDF: {output_pdf_path}")
try:
html = markdown.markdown(markdown_text)
with open(output_pdf_path, "wb") as pdf_file:
pisa_status = pisa.CreatePDF(html, dest=pdf_file)
if pisa_status.err:
logging.error(f"Error converting to PDF: {pisa_status.err}")
else:
logging.info(f"PDF saved successfully to: {output_pdf_path}")
except Exception as e:
logging.error(f"Error converting to PDF: {e}")
def get_unique_output_dir(base_dir, ticker):
"""Creates a unique output directory with a timestamp."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = os.path.join(base_dir, f"output_{ticker}_{timestamp}")
os.makedirs(output_dir, exist_ok=True)
return output_dir
def get_stock_symbols(file_path):
"""
Reads stock symbols from a JSON file.
Args:
file_path (str): The path to the JSON file.
Returns:
list: A list of stock symbols.
"""
try:
with open(file_path, 'r') as f:
data = json.load(f)
# Extract symbols directly from the JSON structure
symbols = [details["symbol"] for details in data.values()]
return symbols
except Exception as e:
st.error(f"Error reading stock symbols: {e}")
return []
# --------------------- Streamlit App ---------------------
def main():
st.title("Stock Technical Analysis with LLM")
# Input for stock data as a JSON file
st.header("1. Upload Stock Data (JSON)")
uploaded_file = st.file_uploader("Upload your stock_data.json file", type=["json"])
stock_symbols = []
if uploaded_file:
try:
stock_symbols = get_stock_symbols(uploaded_file)
if stock_symbols:
st.success("Stock data file uploaded successfully!")
else:
st.warning("No Stock Symbols found in the file")
except json.JSONDecodeError:
st.error("Invalid JSON format. Please upload a valid JSON file.")
except Exception as e:
st.error(f"An error occurred while processing the uploaded file: {e}")
# Use the uploaded symbols for dynamic selection
# selected_symbol = st.sidebar.selectbox("Select a stock for analysis:", stock_symbols)
# Period selection
period = st.sidebar.selectbox("Select the time period for analysis:", ["1d", "5d", "1mo", "3mo", "6mo", "1y", "2y", "5y", "10y", "ytd", "max"])
# Button to trigger analysis
analyze_button = st.sidebar.button("Analyze")
# Check if a stock symbol is selected
if not stock_symbols:
st.sidebar.warning("Please upload a valid stock data file.")
# Only proceed if the button is clicked and a symbol is selected
if analyze_button and stock_symbols:
for selected_symbol in stock_symbols:
try:
# Create a unique output directory
output_dir = get_unique_output_dir(Config.OUTPUT_DIR, selected_symbol)
# Fetch and process data
stock_data = fetch_data(selected_symbol, period=period)
stock_data = calculate_moving_averages(stock_data)
stock_data = calculate_ema(stock_data)
stock_data = calculate_macd(stock_data)
stock_data = calculate_rsi(stock_data)
stock_data = calculate_adx(stock_data)
stock_data = calculate_atr(stock_data)
stock_data = calculate_bollinger_bands(stock_data)
stock_data = calculate_stochastic(stock_data)
# Get analysis output
analysis_output = print_indicator_outputs(stock_data, selected_symbol, output_dir)
# Plot and save charts
plot_stock_and_indicators(stock_data, selected_symbol, output_dir)
plot_volatility_indicators(stock_data, selected_symbol, output_dir)
# Generate prompt for LLM
image_paths = [os.path.join(output_dir, file) for file in os.listdir(output_dir) if file.endswith(('.png', '.jpg', '.jpeg'))]
image_paths = "\n".join(image_paths)
prompt = generate_prompt(analysis_output, image_paths)
# Save the prompt to a file
prompt_filename = os.path.join(output_dir, f"{selected_symbol}_prompt.txt")
with open(prompt_filename, 'w') as f:
f.write(prompt)
logging.info(f"Saved LLM prompt to: {prompt_filename}")
# Configure and create LLM model
genai.configure(api_key=Config.GOOGLE_API_KEY)
generation_config = {
"temperature": 0.9,
"top_p": 0.95,
"top_k": 40,
"max_output_tokens": 8192,
}
model = genai.GenerativeModel(
model_name="gemini-pro", # Use "gemini-pro" for text-only
generation_config=generation_config,
)
chat_session = model.start_chat()
# Get LLM response
llm_response = get_response(chat_session, prompt)
if llm_response:
# Generate PDF
pdf_filename = os.path.join(output_dir, f"technical_analysis_{selected_symbol}.pdf")
markdown_to_pdf_xhtml2pdf(llm_response.text, pdf_filename)
# Offer PDF for download
with open(pdf_filename, "rb") as pdf_file:
pdf_bytes = pdf_file.read()
st.download_button(
label=f"Download Analysis PDF for {selected_symbol}",
data=pdf_bytes,
file_name=f"technical_analysis_{selected_symbol}.pdf",
mime="application/pdf"
)
# Display the PDF using an iframe
b64 = base64.b64encode(pdf_bytes).decode()
pdf_display = f'<iframe src="data:application/pdf;base64,{b64}" width="700" height="1000" type="application/pdf"></iframe>'
st.markdown(pdf_display, unsafe_allow_html=True)
else:
st.error(f"Could not generate PDF for {selected_symbol}. LLM response is empty.")
except Exception as e:
st.error(f"An error occurred during analysis of {selected_symbol}: {e}")
# --- TradingView Widget Code ---
tradingview_widget_code = f"""
<!-- TradingView Widget BEGIN -->
<div class="tradingview-widget-container" style="height:100%; width:100%">
<div id="tradingview_a1b2c" style="height:calc(100% - 32px); width:100%"></div>
<div class="tradingview-widget-copyright">
<a href="https://www.tradingview.com/" rel="noopener nofollow" target="_blank">
<span class="blue-text">Track all markets on TradingView</span>
</a>
</div>
<script type="text/javascript" src="https://s3.tradingview.com/tv.js"></script>
<script type="text/javascript">
new TradingView.widget(
{{
"autosize": true,
"symbol": "{'RELIANCE.NS'}",
"interval": "D",
"timezone": "Etc/UTC",
"theme": "light",
"style": "1",
"locale": "en",
"toolbar_bg": "#f1f3f6",
"enable_publishing": false,
"allow_symbol_change": true,
"container_id": "tradingview_a1b2c"
}}
);
</script>
</div>
<!-- TradingView Widget END -->
"""
# ----------------------------------
# HTML, CSS, and JavaScript for Resizing (outside the if condition)
html_code = f"""
<style>
/* Make the container resizable */
#resizable-container {{
position: relative;
width: 700px; /* Initial width */
height: 600px; /* Initial height */
border: 2px solid #ccc;
overflow: hidden; /* Important for clipping the widget during resize */
}}
/* Style the resize handle (optional) */
.resize-handle {{
position: absolute;
width: 10px;
height: 10px;
background-color: #007bff; /* Example color */
cursor: se-resize; /* Diagonal resize cursor */
}}
/* Position the resize handle at the bottom-right corner */
#resize-handle-se {{
bottom: 0;
right: 0;
}}
</style>
<div id="resizable-container">
{tradingview_widget_code}
<div id="resize-handle-se" class="resize-handle"></div>
</div>
<script src="https://cdn.jsdelivr.net/npm/interactjs/dist/interact.min.js"></script>
<script>
// Target the resize handle
interact('#resize-handle-se')
.draggable({{
// Restrict movement to the bounds of the container
modifiers: [
interact.modifiers.restrictRect({{
restriction: 'parent',
endOnly: true
}})
],
inertia: true,
}})
.on('dragmove', function (event) {{
const container = document.getElementById('resizable-container');
// Update width and height based on drag movement
let newWidth = container.offsetWidth + event.dx;
let newHeight = container.offsetHeight + event.dy;
// Set minimum dimensions (adjust as needed)
const minWidth = 300;
const minHeight = 300;
if (newWidth < minWidth) {{
newWidth = minWidth;
}}
if (newHeight < minHeight) {{
newHeight = minHeight;
}}
container.style.width = newWidth + 'px';
container.style.height = newHeight + 'px';
// Trigger TradingView resize (if necessary - see note below)
// window.dispatchEvent(new Event('resize')); // You might not need this
}});
</script>
"""
# Embed the HTML code using components.html
components.html(html_code, height=600, width=700, scrolling=False)
if __name__ == "__main__":
main()