|
|
import pandas as pd |
|
|
import yfinance as yf |
|
|
import numpy as np |
|
|
import gradio as gr |
|
|
import matplotlib.pyplot as plt |
|
|
from functools import lru_cache |
|
|
import asyncio |
|
|
import concurrent.futures |
|
|
import time |
|
|
from typing import Dict, List, Optional, Any, Tuple |
|
|
import logging |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
|
) |
|
|
logger = logging.getLogger('stock_analyzer') |
|
|
|
|
|
|
|
|
@lru_cache(maxsize=100) |
|
|
def get_financial_data(ticker: str) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Fetch financial data for a given stock ticker using Yahoo Finance. |
|
|
|
|
|
Args: |
|
|
ticker: Stock symbol to fetch data for |
|
|
|
|
|
Returns: |
|
|
Dictionary of financial metrics or None if fetch failed |
|
|
""" |
|
|
try: |
|
|
stock = yf.Ticker(ticker) |
|
|
info = stock.info |
|
|
|
|
|
return { |
|
|
'Ticker': ticker, |
|
|
'PE_Ratio': info.get('forwardPE'), |
|
|
'Debt_to_Equity': info.get('debtToEquity'), |
|
|
'Revenue_Growth': info.get('revenueGrowth'), |
|
|
'ROE': info.get('returnOnEquity'), |
|
|
'ROA': info.get('returnOnAssets'), |
|
|
'Gross_Margin': info.get('grossMargins'), |
|
|
'EBITDA': info.get('ebitda'), |
|
|
'Market_Cap': info.get('marketCap'), |
|
|
'Dividend_Yield': info.get('dividendYield'), |
|
|
'Profit_Margin': info.get('profitMargins'), |
|
|
'EPS_Growth': info.get('earningsGrowth'), |
|
|
'Price_to_Book': info.get('priceToBook'), |
|
|
'Current_Price': info.get('currentPrice') |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching data for {ticker}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
async def fetch_data_concurrently(tickers: List[str]) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Fetch financial data for multiple tickers concurrently. |
|
|
|
|
|
Args: |
|
|
tickers: List of stock symbols |
|
|
|
|
|
Returns: |
|
|
List of financial data dictionaries for each ticker |
|
|
""" |
|
|
loop = asyncio.get_event_loop() |
|
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
|
tasks = [ |
|
|
loop.run_in_executor( |
|
|
executor, |
|
|
get_financial_data, |
|
|
ticker |
|
|
) |
|
|
for ticker in tickers |
|
|
] |
|
|
results = await asyncio.gather(*tasks) |
|
|
return [r for r in results if r is not None] |
|
|
|
|
|
def sanitize_financial_data(df: pd.DataFrame) -> pd.DataFrame: |
|
|
""" |
|
|
Sanitize raw financial metrics based on domain knowledge. |
|
|
Replace implausible values with NaN for later imputation. |
|
|
""" |
|
|
df = df.copy() |
|
|
|
|
|
|
|
|
for col in ['ROE', 'ROA', 'Profit_Margin', 'Gross_Margin']: |
|
|
if col in df.columns: |
|
|
df[col] = df[col].where((df[col] >= -2) & (df[col] <= 2), np.nan) |
|
|
|
|
|
|
|
|
for col in ['Revenue_Growth', 'EPS_Growth']: |
|
|
if col in df.columns: |
|
|
df[col] = df[col].where((df[col] >= -1) & (df[col] <= 5), np.nan) |
|
|
|
|
|
|
|
|
for col in ['Debt_to_Equity', 'Dividend_Yield']: |
|
|
if col in df.columns: |
|
|
df[col] = df[col].where(df[col] >= 0, np.nan) |
|
|
|
|
|
|
|
|
for col in ['PE_Ratio', 'Price_to_Book']: |
|
|
if col in df.columns: |
|
|
df[col] = df[col].where((df[col] > 0) & (df[col] < 1000), np.nan) |
|
|
|
|
|
|
|
|
for col in ['Market_Cap', 'EBITDA']: |
|
|
if col in df.columns: |
|
|
df[col] = df[col].where(df[col] > 0, np.nan) |
|
|
|
|
|
|
|
|
if 'Current_Price' in df.columns: |
|
|
df['Current_Price'] = df['Current_Price'].where(df['Current_Price'] > 0, np.nan) |
|
|
|
|
|
return df |
|
|
|
|
|
|
|
|
def normalize(series: pd.Series, reverse: bool = False, |
|
|
lower_percentile: float = 0.10, upper_percentile: float = 0.90) -> pd.Series: |
|
|
""" |
|
|
Normalize a series to a 0-10 scale using winsorization. |
|
|
""" |
|
|
|
|
|
valid_series = series.dropna() |
|
|
if len(valid_series) == 0 or len(valid_series.unique()) <= 1: |
|
|
return pd.Series(5.0, index=series.index, dtype=float) |
|
|
|
|
|
|
|
|
q_low = valid_series.quantile(lower_percentile) |
|
|
q_high = valid_series.quantile(upper_percentile) |
|
|
|
|
|
if q_high <= q_low: |
|
|
return pd.Series(5.0, index=series.index, dtype=float) |
|
|
|
|
|
|
|
|
clipped = series.clip(q_low, q_high) |
|
|
normalized = (clipped - q_low) / (q_high - q_low) |
|
|
normalized = normalized.clip(0, 1) |
|
|
|
|
|
result = 10 * (1 - normalized) if reverse else 10 * normalized |
|
|
return result |
|
|
|
|
|
|
|
|
def calculate_scores(df: pd.DataFrame, growth_weight: float, |
|
|
value_weight: float, risk_weight: float) -> pd.DataFrame: |
|
|
""" |
|
|
Calculate stock scores based on various financial metrics. |
|
|
""" |
|
|
|
|
|
scored_df = df.copy() |
|
|
|
|
|
|
|
|
scored_df['Revenue_Growth_Score'] = normalize(df['Revenue_Growth']) |
|
|
scored_df['EPS_Growth_Score'] = normalize(df['EPS_Growth']) |
|
|
scored_df['ROE_Score'] = normalize(df['ROE']) |
|
|
scored_df['ROA_Score'] = normalize(df['ROA']) |
|
|
|
|
|
|
|
|
growth_cols = ['Revenue_Growth_Score', 'EPS_Growth_Score', 'ROE_Score', 'ROA_Score'] |
|
|
scored_df['Growth_Score'] = scored_df[growth_cols].mean(axis=1) |
|
|
|
|
|
|
|
|
scored_df['PE_Ratio_Score'] = normalize(df['PE_Ratio'], reverse=True) |
|
|
scored_df['Price_to_Book_Score'] = normalize(df['Price_to_Book'], reverse=True) |
|
|
scored_df['Dividend_Yield_Score'] = normalize(df['Dividend_Yield']) |
|
|
|
|
|
|
|
|
value_cols = ['PE_Ratio_Score', 'Price_to_Book_Score', 'Dividend_Yield_Score'] |
|
|
scored_df['Value_Score'] = scored_df[value_cols].mean(axis=1) |
|
|
|
|
|
|
|
|
scored_df['Debt_to_Equity_No_Risk_Score'] = normalize(df['Debt_to_Equity'], reverse=True) |
|
|
scored_df['Profit_Margin_No_Risk_Score'] = normalize(df['Profit_Margin']) |
|
|
scored_df['Market_Cap_No_Risk_Score'] = normalize(df['Market_Cap']) |
|
|
|
|
|
|
|
|
no_risk_cols = ['Debt_to_Equity_No_Risk_Score', 'Profit_Margin_No_Risk_Score', 'Market_Cap_No_Risk_Score'] |
|
|
scored_df['No_Risk_Score'] = scored_df[no_risk_cols].mean(axis=1) |
|
|
|
|
|
|
|
|
total = growth_weight + value_weight + risk_weight |
|
|
if total == 0: |
|
|
growth_weight = value_weight = risk_weight = 1/3 |
|
|
else: |
|
|
growth_weight /= total |
|
|
value_weight /= total |
|
|
risk_weight /= total |
|
|
|
|
|
|
|
|
scored_df['Total_Score'] = ( |
|
|
growth_weight * scored_df['Growth_Score'] + |
|
|
value_weight * scored_df['Value_Score'] + |
|
|
risk_weight * scored_df['No_Risk_Score'] |
|
|
) |
|
|
|
|
|
return scored_df |
|
|
|
|
|
|
|
|
def plot_bar_chart(df: pd.DataFrame) -> plt.Figure: |
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
fig, ax = plt.subplots(figsize=(12, 7)) |
|
|
colors = ['#4CAF50', '#2196F3', '#FF9800'] |
|
|
|
|
|
df.set_index('Ticker')[['Growth_Score', 'Value_Score', 'No_Risk_Score']].plot( |
|
|
kind='bar', |
|
|
stacked=False, |
|
|
color=colors, |
|
|
width=0.7, |
|
|
alpha=0.8, |
|
|
ax=ax |
|
|
) |
|
|
|
|
|
total_scores = df.set_index('Ticker')['Total_Score'] |
|
|
ax2 = ax.twinx() |
|
|
ax2.plot(range(len(total_scores)), total_scores, 'ro-', linewidth=2.5, markersize=8, label='Total Score') |
|
|
ax2.set_ylim(0, 10.5) |
|
|
ax2.set_ylabel('Total Score', fontsize=12, color='r') |
|
|
|
|
|
ax.set_title("Stock Analysis Scores", fontsize=16, fontweight='bold', pad=20) |
|
|
ax.set_ylabel("Component Scores (0-10)", fontsize=12) |
|
|
ax.set_xlabel("", fontsize=12) |
|
|
ax.tick_params(axis='x', rotation=45) |
|
|
ax.set_ylim(0, 10.5) |
|
|
ax.grid(axis='y', linestyle='--', alpha=0.7) |
|
|
|
|
|
lines, labels = ax.get_legend_handles_labels() |
|
|
lines2, labels2 = ax2.get_legend_handles_labels() |
|
|
ax.legend(lines + lines2, labels + labels2, loc='upper center', bbox_to_anchor=(0.5, -0.15), |
|
|
ncol=4, frameon=True, fontsize=10) |
|
|
|
|
|
plt.tight_layout() |
|
|
return fig |
|
|
|
|
|
|
|
|
def plot_radar_chart(df: pd.DataFrame, tickers: List[str]) -> plt.Figure: |
|
|
plot_df = df[df['Ticker'].isin(tickers)] |
|
|
|
|
|
if plot_df.empty: |
|
|
plot_df = df.head(min(3, len(df))) |
|
|
tickers = plot_df['Ticker'].tolist() |
|
|
|
|
|
categories = ['Growth', 'Value', 'No_Risk', 'Total'] |
|
|
N = len(categories) |
|
|
angles = [n / float(N) * 2 * np.pi for n in range(N)] |
|
|
angles += angles[:1] |
|
|
|
|
|
fig = plt.figure(figsize=(10, 8)) |
|
|
ax = fig.add_subplot(111, polar=True) |
|
|
colors = plt.cm.viridis(np.linspace(0, 1, len(tickers))) |
|
|
|
|
|
for i, ticker in enumerate(tickers): |
|
|
ticker_data = plot_df[plot_df['Ticker'] == ticker] |
|
|
if ticker_data.empty: |
|
|
continue |
|
|
|
|
|
values = ticker_data[['Growth_Score', 'Value_Score', 'No_Risk_Score', 'Total_Score']].values.flatten().tolist() |
|
|
values += values[:1] |
|
|
|
|
|
ax.plot(angles, values, linewidth=2, linestyle='solid', color=colors[i], label=ticker) |
|
|
ax.fill(angles, values, color=colors[i], alpha=0.1) |
|
|
|
|
|
ax.set_xticks(angles[:-1]) |
|
|
ax.set_xticklabels(categories, size=12) |
|
|
ax.set_yticks(np.arange(2, 12, 2)) |
|
|
ax.set_yticklabels(np.arange(2, 12, 2), size=10) |
|
|
ax.set_ylim(0, 10) |
|
|
|
|
|
plt.title("Stock Comparison Radar Chart", size=16, fontweight='bold', pad=20) |
|
|
plt.legend(loc='upper right', bbox_to_anchor=(0.1, 0.1), frameon=True) |
|
|
|
|
|
return fig |
|
|
|
|
|
|
|
|
def create_metrics_table(df: pd.DataFrame) -> pd.DataFrame: |
|
|
metrics_df = df[['Ticker', 'Current_Price', 'PE_Ratio', 'Price_to_Book', |
|
|
'Debt_to_Equity', 'ROE', 'ROA', 'Revenue_Growth', |
|
|
'EPS_Growth', 'Profit_Margin', 'Dividend_Yield']].copy() |
|
|
|
|
|
for col in ['ROE', 'ROA', 'Revenue_Growth', 'EPS_Growth', 'Profit_Margin', 'Dividend_Yield']: |
|
|
metrics_df[col] = metrics_df[col].apply(lambda x: f"{x*100:.2f}%" if pd.notnull(x) else "N/A") |
|
|
|
|
|
for col in ['PE_Ratio', 'Price_to_Book', 'Debt_to_Equity']: |
|
|
metrics_df[col] = metrics_df[col].apply(lambda x: f"{x:.2f}" if pd.notnull(x) else "N/A") |
|
|
|
|
|
metrics_df['Current_Price'] = metrics_df['Current_Price'].apply(lambda x: f"${x:.2f}" if pd.notnull(x) else "N/A") |
|
|
|
|
|
return metrics_df |
|
|
|
|
|
|
|
|
async def analyze_tickers( |
|
|
tickers: str, |
|
|
growth_weight: float, |
|
|
value_weight: float, |
|
|
risk_weight: float, |
|
|
top_n: int = 5 |
|
|
) -> Tuple[pd.DataFrame, pd.DataFrame, plt.Figure, plt.Figure]: |
|
|
start_time = time.time() |
|
|
|
|
|
ticker_list = [t.strip().upper() for t in tickers.split(",") if t.strip()] |
|
|
|
|
|
if not ticker_list: |
|
|
return pd.DataFrame(), pd.DataFrame(), plt.figure(), plt.figure() |
|
|
|
|
|
data = await fetch_data_concurrently(ticker_list) |
|
|
|
|
|
if not data: |
|
|
logger.warning("No valid data retrieved for any tickers") |
|
|
return pd.DataFrame(), pd.DataFrame(), plt.figure(), plt.figure() |
|
|
|
|
|
|
|
|
df = pd.DataFrame(data) |
|
|
df = sanitize_financial_data(df) |
|
|
|
|
|
|
|
|
numerical_cols = df.select_dtypes(include=[np.number]).columns |
|
|
for col in numerical_cols: |
|
|
median_val = df[col].median() |
|
|
if pd.isna(median_val): |
|
|
df[col] = df[col].fillna(0) |
|
|
else: |
|
|
df[col] = df[col].fillna(median_val) |
|
|
|
|
|
|
|
|
df = calculate_scores(df, growth_weight, value_weight, risk_weight) |
|
|
df = df.sort_values(by='Total_Score', ascending=False).reset_index(drop=True) |
|
|
|
|
|
|
|
|
metrics_table = create_metrics_table(df) |
|
|
bar_chart = plot_bar_chart(df) |
|
|
top_tickers = df.head(min(top_n, len(df)))['Ticker'].tolist() |
|
|
radar_chart = plot_radar_chart(df, top_tickers) |
|
|
scores_table = df[['Ticker', 'Total_Score', 'Growth_Score', 'Value_Score', 'No_Risk_Score']].round(2) |
|
|
|
|
|
logger.info(f"Analysis completed in {time.time() - start_time:.2f} seconds") |
|
|
return scores_table, metrics_table, bar_chart, radar_chart |
|
|
|
|
|
|
|
|
def dataframe_to_markdown(df: pd.DataFrame) -> str: |
|
|
if df.empty: |
|
|
return "" |
|
|
df = df.fillna("N/A") |
|
|
header = "| " + " | ".join(str(col) for col in df.columns) + " |" |
|
|
separator = "| " + " | ".join(["---"] * len(df.columns)) + " |" |
|
|
rows = ["| " + " | ".join(str(val) for val in row) + " |" for _, row in df.iterrows()] |
|
|
return "\n".join([header, separator] + rows) |
|
|
|
|
|
|
|
|
def download_tables(scores_df: pd.DataFrame, metrics_df: pd.DataFrame) -> str: |
|
|
content = "# Stock Analysis Results\n\n" |
|
|
content += "## Scores Table\n" |
|
|
content += dataframe_to_markdown(scores_df) + "\n\n" |
|
|
content += "## Financial Metrics Table\n" |
|
|
content += dataframe_to_markdown(metrics_df) + "\n" |
|
|
|
|
|
|
|
|
temp_dir = tempfile.gettempdir() |
|
|
path = os.path.join(temp_dir, "stock_analysis_tables.txt") |
|
|
with open(path, "w", encoding="utf-8") as f: |
|
|
f.write(content) |
|
|
return path |
|
|
|
|
|
|
|
|
custom_css = """ |
|
|
.gradio-container { |
|
|
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; |
|
|
} |
|
|
.container { |
|
|
max-width: 1200px; |
|
|
margin: auto; |
|
|
} |
|
|
button#analyze-btn { |
|
|
background-color: #003366; |
|
|
color: white; |
|
|
border: none; |
|
|
} |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
def create_gradio_interface(): |
|
|
with gr.Blocks(theme=gr.themes.Monochrome(), css=custom_css) as iface: |
|
|
gr.Markdown("# Fundamental Financial Analysis") |
|
|
gr.Markdown(""" |
|
|
Enter comma-separated stock tickers and adjust the weights to analyze stocks based on |
|
|
growth potential, value metrics, and risk factors. |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
tickers_input = gr.Textbox( |
|
|
label="Stock Tickers (comma-separated)", |
|
|
placeholder="AAPL, MSFT, GOOG, AMZN, TSLA", |
|
|
lines=1 |
|
|
) |
|
|
analyze_btn = gr.Button("Analyze Stocks", variant="primary") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
growth_weight = gr.Slider(minimum=0, maximum=1, step=0.05, label="Growth Weight", value=0.4) |
|
|
with gr.Column(): |
|
|
value_weight = gr.Slider(minimum=0, maximum=1, step=0.05, label="Value Weight", value=0.4) |
|
|
with gr.Column(): |
|
|
risk_weight = gr.Slider(minimum=0, maximum=1, step=0.05, label="Risk Weight", value=0.2) |
|
|
|
|
|
with gr.Tabs(): |
|
|
with gr.TabItem("Scores & Charts"): |
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
scores_output = gr.Dataframe(label="Stock Scores") |
|
|
with gr.Column(): |
|
|
metrics_output = gr.Dataframe(label="Financial Metrics") |
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
bar_chart_output = gr.Plot(label="Component Scores Chart") |
|
|
with gr.Column(): |
|
|
radar_chart_output = gr.Plot(label="Top Stocks Comparison") |
|
|
|
|
|
|
|
|
download_btn = gr.Button("📥 Download Tables (.txt)", variant="secondary") |
|
|
download_output = gr.File(label="Download") |
|
|
|
|
|
with gr.TabItem("Help & Information"): |
|
|
gr.Markdown(""" |
|
|
## How to Use This Tool |
|
|
|
|
|
1. Enter stock tickers separated by commas (e.g., "AAPL, MSFT, GOOG") |
|
|
2. Adjust weights based on your investment strategy: |
|
|
- **Growth Weight**: Emphasizes revenue growth, EPS growth, ROE, and ROA |
|
|
- **Value Weight**: Focuses on PE ratio, price-to-book, and dividend yield |
|
|
- **Risk Weight**: Considers debt-to-equity ratio, profit margins, and market cap |
|
|
3. Click "Analyze Stocks" to see results |
|
|
|
|
|
## About the Scores |
|
|
|
|
|
All metrics are normalized on a scale of 0-10, with higher being better: |
|
|
- **Growth Score**: Higher values indicate stronger growth potential |
|
|
- **Value Score**: Higher values indicate the stock may be undervalued |
|
|
- **No_Risk_Score**: Higher values suggest lower relative risk |
|
|
- **Total Score**: Weighted average of the three component scores |
|
|
|
|
|
## Data Source |
|
|
|
|
|
Financial data is provided by Yahoo Finance via the yfinance package. |
|
|
""") |
|
|
|
|
|
|
|
|
last_scores = [pd.DataFrame()] |
|
|
last_metrics = [pd.DataFrame()] |
|
|
|
|
|
def analyze_wrapper(*args): |
|
|
scores_df, metrics_df, bar_fig, radar_fig = asyncio.run(analyze_tickers(*args)) |
|
|
last_scores[0] = scores_df |
|
|
last_metrics[0] = metrics_df |
|
|
return scores_df, metrics_df, bar_fig, radar_fig |
|
|
|
|
|
def download_wrapper(): |
|
|
if last_scores[0].empty: |
|
|
return None |
|
|
return download_tables(last_scores[0], last_metrics[0]) |
|
|
|
|
|
analyze_btn.click( |
|
|
analyze_wrapper, |
|
|
inputs=[tickers_input, growth_weight, value_weight, risk_weight], |
|
|
outputs=[scores_output, metrics_output, bar_chart_output, radar_chart_output] |
|
|
) |
|
|
|
|
|
download_btn.click( |
|
|
download_wrapper, |
|
|
inputs=[], |
|
|
outputs=download_output |
|
|
) |
|
|
|
|
|
return iface |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
logger.info("Starting Stock Analyzer app") |
|
|
iface = create_gradio_interface() |
|
|
iface.launch() |