# Uploaded by hskwon7
# Update modules.py
# Commit 38bf294 (verified)
import pandas as pd
import re
import streamlit as st
import numpy as np
import plotly.graph_objects as go
from datetime import datetime, timedelta
import pandas as pd
from yahooquery import Ticker
from typing import List
from requests.exceptions import RetryError
import time
def load_etf_data():
    """Load the ETF datasets from CSV files in the working directory.

    Reads the general-info and holdings files, left-joins them on ``Ticker``,
    passes the result through ``set_etf_data``, and also reads the analyst
    reports and the annual-return history.

    Returns:
        tuple: ``(df_etf, df_analyst_report, available_tickers,
        df_annual_return_master)``; ``df_etf`` and ``available_tickers`` are
        the two values returned by ``set_etf_data``.
    """
    ticker_rename = {'ticker': 'Ticker'}

    general_info = pd.read_csv('etf_general_info_enriched.csv').rename(columns=ticker_rename)
    holdings = pd.read_csv('etf_holdings_summarized.csv').rename(
        columns={'ticker': 'Ticker', 'holdingInformation': 'Holdings'}
    )
    # Left join: ETFs without a holdings row are kept.
    merged_info = general_info.merge(holdings, how='left', on='Ticker')
    df_etf, available_tickers = set_etf_data(merged_info)

    df_analyst_report = pd.read_csv('etf_analyst_report_full.csv')
    df_annual_return_master = pd.read_csv('annual_return.csv').rename(columns=ticker_rename)
    return df_etf, df_analyst_report, available_tickers, df_annual_return_master
def set_etf_data(df_src):
    """Filter the master ETF table and collect its valid ticker symbols.

    Keeps rows whose ``averageVolume`` is above 1000, whose
    ``exchangeCountry`` equals ``'United States'`` and whose ``categoryName``
    is present.

    Parameters:
        df_src (pd.DataFrame): Must contain the columns ``averageVolume``,
            ``exchangeCountry``, ``categoryName`` and ``Ticker``.

    Returns:
        tuple: ``(df, valid_ticker_set)`` - the filtered DataFrame and the set
        of its upper-cased ticker symbols (missing tickers are left out).
    """
    keep = (df_src['averageVolume'] > 1000) & (df_src['exchangeCountry'] == 'United States')
    df = df_src[keep].dropna(subset=['categoryName'])
    # A missing ticker is a float NaN and has no ``.upper()``; skip those and
    # coerce the rest to ``str`` so one bad cell cannot abort the whole load.
    valid_ticker_set = {str(t).upper() for t in df['Ticker'].dropna().unique()}
    return df, valid_ticker_set
# Build the doc_text for one ETF row (used for the ticker → doc_text lookup)
def make_doc_text(row):
    """Concatenate the descriptive fields of one ETF record into a single string.

    Parameters:
        row: Any object exposing the attributes read below (``shortName``,
            ``longName``, ``family``, ...).

    Returns:
        str: The non-empty fields joined with ``". "``. Labelled fields are
        rendered as ``"Label: value"``; unlabelled ones as the bare value.
    """
    labelled_fields = [
        (None, row.shortName),
        (None, row.longName),
        ("Issuer", row.family),
        ("Category", row.categoryName),
        ("Type", row.legalType),
        ("Position", row.positionType),
        ("Tags", row.otherTags),
        ("Return", row.return_rating_text),
        ("Risk", row.risk_rating_text),
        ("Expense Ratio", row.annualReportExpenseRatio_rating_text),
        ("Dividend Yield", row.dividendYield_rating_text),
        (None, row.longBusinessSummary),
        ("Holdings", row.holdingInformation),
    ]

    parts = []
    for label, value in labelled_fields:
        # Skip missing values and values that are blank once stringified.
        if pd.isna(value) or not str(value).strip():
            continue
        parts.append(f"{label}: {value}" if label else str(value))

    # ". " separator so each piece reads like a sentence.
    return ". ".join(parts)
# Helper: pull ticker spans out of BIO-labelled tokens and keep the known ones
def extract_valid_tickers(tokens, labels, tokenizer, valid_set):
    """Collect ticker spans from labelled tokens and keep those in ``valid_set``.

    A span starts at a ``"B-TICKER"`` label and is extended by each directly
    following ``"I-TICKER"`` label; any other label closes it. An
    ``"I-TICKER"`` with no open span is ignored.

    Parameters:
        tokens: Iterable of tokens.
        labels: Iterable of labels, paired with ``tokens`` via ``zip``.
        tokenizer: Object providing ``convert_tokens_to_string(list_of_tokens)``.
        valid_set: Container of accepted upper-case ticker symbols.

    Returns:
        list: Upper-cased span strings found in ``valid_set``, in order of
        appearance (duplicates are kept).
    """
    spans = []
    current = []
    for token, label in zip(tokens, labels):
        if label == "I-TICKER" and current:
            current.append(token)
            continue
        # Every other case ends the span in progress, if there is one.
        if current:
            spans.append(current)
        current = [token] if label == "B-TICKER" else []
    if current:
        spans.append(current)

    results = []
    for span in spans:
        candidate = tokenizer.convert_tokens_to_string(span).strip().upper()
        if candidate in valid_set:
            results.append(candidate)
    return results
# Rule-based fallback: pick up literal 2–4 character tickers from the text
def rule_fallback(query, valid_set):
    """Return the known tickers that appear as standalone words in ``query``.

    Only alphanumeric words of 2 to 4 characters are considered; each is
    upper-cased before being checked against ``valid_set``.

    Returns:
        set: Upper-cased matches.
    """
    candidates = (
        match.group(0).upper()
        for match in re.finditer(r"\b[A-Za-z0-9]{2,4}\b", query)
    )
    return {symbol for symbol in candidates if symbol in valid_set}
def get_cols_to_display() -> List[str]:
    """Return the raw column names selected for the recommendations table.

    The order of the returned list is the order in which the columns are
    selected.
    """
    return_periods = [
        'ytdReturn',
        'oneMonthReturn',
        'threeMonthReturn',
        'oneYearReturn',
        'threeYearReturn',
        'fiveYearReturn',
        'tenYearReturn',
    ]
    general = [
        'Ticker',
        'categoryName',
        'annualReportExpenseRatio',
        'previousCloseUSD',
        'averageVolumeUSD',
        'totalAssetsUSD',
        'longName',
        'marketCapUSD',
        'dividendYield',
    ]
    ratings_and_flags = [
        'avg_annual_return',
        'return_rating',
        'risk_rating',
        'positionType',
        'isLeveraged',
        'return_rating_text',
        'risk_rating_text',
        'annualReportExpenseRatio_rating_text',
        'dividendYield_rating_text',
    ]
    # Each return period also has a companion "<period>_rating_text" column.
    period_rating_texts = [f"{period}_rating_text" for period in return_periods]

    return general + return_periods + ratings_and_flags + period_rating_texts + ['Holdings']
def rename_etf_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` with raw column names replaced by display-friendly ones.

    Columns without an entry in the mapping keep their name; the input frame
    is not modified.
    """
    # Raw return-period column -> display name. Each period also has a
    # "<raw>_rating_text" column shown as "<display> Rating".
    period_names = {
        'ytdReturn': 'YTD Return',
        'oneMonthReturn': '1-month Return',
        'threeMonthReturn': '3-month Return',
        'oneYearReturn': '1-year Return',
        'threeYearReturn': '3-year Return',
        'fiveYearReturn': '5-year Return',
        'tenYearReturn': '10-year Return',
    }
    mapping = {
        'ticker': 'Ticker',
        'categoryName': 'Category',
        'annualReportExpenseRatio': 'Expense Ratio',
        'previousCloseUSD': 'Prev. Close',
        'averageVolumeUSD': 'Avg. Volume',
        'totalAssetsUSD': 'Total Assets',
        'longName': 'Full Name',
        'marketCapUSD': 'Market Cap.',
        'dividendYield': 'Dividend Yield',
        'avg_annual_return': 'Avg. Annual Return %',
        'return_rating': 'Avg. Return Rating (1-10)',
        'risk_rating': 'Avg. Risk Rating (1-10)',
        'positionType': 'Position Type',
        'isLeveraged': 'Leveraged',
        'return_rating_text': 'Return Rating',
        'risk_rating_text': 'Risk Rating',
        'annualReportExpenseRatio_rating_text': 'Expense Ratio Rating',
        'dividendYield_rating_text': 'Dividend Yield Rating',
        'holdingInformation': 'Holdings',
    }
    mapping.update(period_names)
    mapping.update({f"{raw}_rating_text": f"{pretty} Rating" for raw, pretty in period_names.items()})

    # Restrict the mapping to columns that are actually present.
    present = {raw: pretty for raw, pretty in mapping.items() if raw in df.columns}
    return df.rename(columns=present)
def get_etf_recommendations_from_list(
    list_of_fetched_etfs: List[str],
    df_etf: pd.DataFrame,
    top_n: int
) -> pd.DataFrame:
    """Build the display table for a list of fetched ETF tickers.

    Rows of ``df_etf`` whose ``Ticker`` is in ``list_of_fetched_etfs`` are
    sorted by ``averageVolumeUSD`` (highest first), cut to the first ``top_n``
    rows, reduced to the columns from ``get_cols_to_display()`` and renamed
    with ``rename_etf_columns``.

    Parameters
    ----------
    list_of_fetched_etfs : List[str]
        Ticker symbols to keep.
    df_etf : pd.DataFrame
        Master ETF table with raw column names; it must contain every column
        listed by ``get_cols_to_display()``.
    top_n : int
        Maximum number of rows to return.

    Returns
    -------
    pd.DataFrame
        At most ``top_n`` rows with display-friendly column names.
    """
    is_fetched = df_etf['Ticker'].isin(list_of_fetched_etfs)
    top_by_volume = (
        df_etf[is_fetched]
        .copy()
        .sort_values(by='averageVolumeUSD', ascending=False)
        .head(top_n)
    )
    return rename_etf_columns(top_by_volume[get_cols_to_display()])
def format_number_short(x):
    """Format a number with two decimals and a magnitude suffix.

    Values below 1,000 (in absolute value) get no suffix; larger values are
    scaled and suffixed with K, M, B or T. Missing values are returned as-is.

    Parameters:
        x (float or int): The number to format.

    Returns:
        str or float: The formatted string, or ``x`` itself when it is missing.
    """
    if pd.isna(x):
        return x

    # Compare on the magnitude so negative numbers pick the same suffix.
    magnitude = abs(x)
    if magnitude < 1e3:
        return f"{x:.2f}"

    # (exclusive upper bound, divisor, suffix), smallest first.
    for upper_bound, divisor, suffix in ((1e6, 1e3, "K"), (1e9, 1e6, "M"), (1e12, 1e9, "B")):
        if magnitude < upper_bound:
            return f"{x / divisor:.2f}{suffix}"

    # Trillions and above.
    return f"{x / 1e12:.2f}T"
def transform_number_columns(df, columns):
    """Apply ``format_number_short`` to every value of the listed columns.

    Parameters:
        df (pd.DataFrame): The input DataFrame (left unmodified).
        columns (list): Column names to format; names absent from ``df`` are
            ignored.

    Returns:
        pd.DataFrame: A copy of ``df`` with the listed columns formatted.
    """
    formatted = df.copy()
    for name in columns:
        if name not in formatted.columns:
            continue
        formatted[name] = formatted[name].apply(format_number_short)
    return formatted
def transform_float_columns_to_perc(df, columns):
    """Render the values of the listed columns as percentage strings.

    Each non-missing value is multiplied by 100 and formatted with two
    decimals and a trailing ``%`` (0.0123 -> "1.23%"). Missing values are
    preserved.

    Parameters:
        df (pd.DataFrame): The input DataFrame (left unmodified).
        columns (list): Column names to convert; names absent from ``df`` are
            ignored.

    Returns:
        pd.DataFrame: A copy of ``df`` with the listed columns converted.
    """
    def as_percent(value):
        if pd.isna(value):
            return value
        return f"{value * 100:.2f}%"

    converted = df.copy()
    for name in columns:
        if name not in converted.columns:
            continue
        converted[name] = converted[name].apply(as_percent)
    return converted
def overview_df(df_recommendations, drop_relavance_score=True):
    """Build the "Overview" table from the recommendations frame.

    Keeps whichever overview columns are present and short-formats
    'Total Assets' and 'Market Cap.' via ``transform_number_columns``.

    Note: ``drop_relavance_score`` is accepted but currently has no effect.
    """
    wanted = [
        "Leveraged", "Ticker", "Full Name", 'Category', 'Country',
        'Total Assets', "Prev. Close", "Avg. Volume", 'Market Cap.',
    ]
    present = [name for name in wanted if name in df_recommendations.columns]
    return transform_number_columns(df_recommendations[present], ['Total Assets', 'Market Cap.'])
def transform_return_columns(df, cols=None):
    """Convert every column whose name ends with 'Return' to percentage strings.

    Non-missing values are multiplied by 100 and formatted with two decimals
    and a trailing ``%``; missing values are preserved.

    Parameters:
        df (pd.DataFrame): The input DataFrame (left unmodified).
        cols: Accepted but not used by the current implementation.

    Returns:
        pd.DataFrame: A copy of ``df`` with the 'Return' columns converted.
    """
    converted = df.copy()
    return_columns = [name for name in converted.columns if name.endswith('Return')]
    for name in return_columns:
        converted[name] = converted[name].apply(
            lambda value: value if pd.isna(value) else f"{value * 100:.2f}%"
        )
    return converted
def return_df(df_recommendations):
    """Build the "Returns" table: identity columns plus the return columns,
    with the returns rendered as percentages by ``transform_return_columns``.

    Only the columns that exist in ``df_recommendations`` are kept.
    """
    wanted = [
        "Ticker", "Full Name", 'Category',
        "YTD Return", "1-month Return", "3-month Return",
        "1-year Return", "3-year Return", "5-year Return", "10-year Return",
    ]
    present = [name for name in wanted if name in df_recommendations.columns]
    return transform_return_columns(df_recommendations[present])
def clean_ratings_columns(df):
    """Reduce the rating-text columns to a bare 'High' / 'Moderate' / 'Low' label.

    For each known rating column present in ``df``, every value is
    stringified and replaced by the first of 'High', 'Moderate', 'Low'
    (checked in that order) that occurs in it, or by ``''`` when none does.

    Parameters:
        df (pd.DataFrame): Frame that may contain rating-text columns. It is
            not modified.

    Returns:
        pd.DataFrame: A copy of ``df`` with the rating columns cleaned.
    """
    rating_cols = ['YTD Return Rating', '1-month Return Rating', '3-month Return Rating',
                   '1-year Return Rating', '3-year Return Rating', '5-year Return Rating',
                   '10-year Return Rating', 'Expense Ratio Rating', 'Dividend Yield Rating']
    strings_to_keep = ['High', 'Moderate', 'Low']

    def to_label(value):
        # str() so missing / non-string cells simply match nothing.
        text = str(value)
        return next((label for label in strings_to_keep if label in text), '')

    # Work on a copy, like the transform_* helpers: the previous in-place
    # ``df.loc[:, col] = ...`` wrote into the caller's frame.
    df_cleaned = df.copy()
    for col in rating_cols:
        if col in df_cleaned.columns:
            df_cleaned[col] = df_cleaned[col].apply(to_label)
    return df_cleaned
def rating_df(df_recommendations):
    """Build the "Ratings" table: identity columns plus the rating columns,
    with the rating texts normalised by ``clean_ratings_columns``.

    Only the columns that exist in ``df_recommendations`` are kept.
    """
    wanted = [
        "Ticker", "Full Name", 'Category',
        "Avg. Return Rating (1-10)", "Avg. Risk Rating (1-10)",
        'Avg. Return Rating', 'Avg. Risk Rating',
        'YTD Return Rating', '1-month Return Rating', '3-month Return Rating',
        '1-year Return Rating', '3-year Return Rating', '5-year Return Rating',
        '10-year Return Rating',
    ]
    present = [name for name in wanted if name in df_recommendations.columns]
    return clean_ratings_columns(df_recommendations[present])
def expense_ratio_df(df_recommendations):
    """Build the "Expenses" table.

    Keeps the expense-related columns that are present, short-formats
    'Total Assets', renders 'Expense Ratio' as a percentage and normalises
    the rating text.
    """
    wanted = ["Ticker", "Full Name", "Category", 'Total Assets', 'Expense Ratio', 'Expense Ratio Rating']
    present = [name for name in wanted if name in df_recommendations.columns]

    table = transform_number_columns(df_recommendations[present], ['Total Assets'])
    table = transform_float_columns_to_perc(table, columns=['Expense Ratio'])
    return clean_ratings_columns(table)
def holdings_df(df_recommendations):
    """Build the "Holdings" table: the identity columns plus 'Holdings',
    limited to the columns that exist in ``df_recommendations``.
    """
    present = [
        name
        for name in ("Ticker", "Full Name", "Category", "Holdings")
        if name in df_recommendations.columns
    ]
    return df_recommendations[present]
def dividend_df(df_recommendations):
    """Build the "Dividends" table.

    Keeps the dividend-related columns that are present, normalises the
    rating text and renders 'Dividend Yield' as a percentage.
    """
    wanted = ["Ticker", "Full Name", "Category", "Dividend Yield", "Dividend Yield Rating"]
    present = [name for name in wanted if name in df_recommendations.columns]

    table = clean_ratings_columns(df_recommendations[present])
    return transform_float_columns_to_perc(table, columns=['Dividend Yield'])
def display_matching_etfs(df_recommendations):
    """Render the recommendations as a set of Streamlit tabs.

    One tab per view (Overview, Returns, Ratings, Holdings, Expenses,
    Dividends), each showing the matching table without its index. Nothing
    is rendered when ``df_recommendations`` is empty.
    """
    if df_recommendations.empty:
        return

    # (tab title, function that builds the table shown in that tab)
    views = (
        ("Overview", overview_df),
        ("Returns", return_df),
        ("Ratings", rating_df),
        ('Holdings', holdings_df),
        ('Expenses', expense_ratio_df),
        ('Dividends', dividend_df),
    )
    tabs = st.tabs([title for title, _ in views])
    for tab, (_, build_view) in zip(tabs, views):
        with tab:
            st.dataframe(build_view(df_recommendations), hide_index=True)
    return
def _message_figure(message: str) -> go.Figure:
    """Return an empty Plotly figure whose title carries ``message``."""
    fig = go.Figure()
    fig.update_layout(title=message)
    return fig


def compare_etfs_interactive(etf_a: str, etf_b: str,
                             max_retries: int = 5,
                             initial_delay: float = 1.0) -> go.Figure:
    """
    Fetch 5 years of daily price history for two ETFs through yahooquery,
    compute each one's percentage change from its first shared date, and
    return a Plotly figure for interactive viewing in Streamlit.

    The fetch is retried up to `max_retries` times when it raises
    `requests.exceptions.RetryError`, sleeping between attempts with an
    exponentially growing delay. Any failure (retries exhausted, another
    exception, or no usable/overlapping data) yields an empty figure whose
    title explains the problem instead of raising.

    Parameters:
        etf_a, etf_b: ETF tickers
        max_retries: how many attempts before giving up
        initial_delay: seconds to wait before the first retry (doubles each time)

    Returns:
        plotly.graph_objects.Figure
    """
    end_date = datetime.today()
    start_date = end_date - timedelta(days=5 * 365)

    # 1) Fetch data with retries
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            tickers = Ticker(f"{etf_a} {etf_b}", asynchronous=True)
            df_full = tickers.history(period="5y", interval="1d").reset_index()
            break
        except RetryError:
            # Back off before the next attempt; no sleep after the last one.
            if attempt < max_retries - 1:
                time.sleep(delay)
                delay *= 2
        except Exception as e:
            return _message_figure(f"Error fetching data: {e}")
    else:
        # No attempt succeeded (this also covers max_retries <= 0, where
        # df_full would otherwise be undefined below).
        return _message_figure(
            "Data fetch failed after multiple attempts. Please try again later."
        )

    # The steps below need these columns; bail out cleanly if the response
    # has a different shape.
    if not {"symbol", "date", "adjclose"}.issubset(df_full.columns):
        return _message_figure(f"No price history returned for {etf_a} and {etf_b}.")

    # 2) Split & merge
    df_a = (
        df_full[df_full.symbol == etf_a]
        .rename(columns={"adjclose": "Adj Close A"})[["date", "Adj Close A"]]
    )
    df_b = (
        df_full[df_full.symbol == etf_b]
        .rename(columns={"adjclose": "Adj Close B"})[["date", "Adj Close B"]]
    )
    df = pd.merge(df_a, df_b, on="date", how="inner").set_index("date")

    # Without any shared date there is no baseline row for .iloc[0].
    if df.empty:
        return _message_figure(
            f"No overlapping price data found for {etf_a} and {etf_b}."
        )

    # 3) Compute % changes relative to the first shared date
    df["Pct Change A"] = (df["Adj Close A"] / df["Adj Close A"].iloc[0] - 1) * 100
    df["Pct Change B"] = (df["Adj Close B"] / df["Adj Close B"].iloc[0] - 1) * 100

    # 4) Build figure
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df.index, y=df["Pct Change A"], mode="lines", name=etf_a))
    fig.add_trace(go.Scatter(
        x=df.index, y=df["Pct Change B"], mode="lines", name=etf_b))
    fig.update_layout(
        title=f"5-Year Performance Comparison: {etf_a} vs. {etf_b}",
        xaxis_title="Date",
        yaxis_title="Percentage Change (%)",
        hovermode="x unified"
    )
    fig.update_xaxes(range=[start_date, end_date])
    return fig
def trim_to_last_full_sentence(text: str) -> str:
    """Drop a trailing incomplete sentence from ``text``.

    Text that already ends in '.', '!' or '?' (ignoring trailing whitespace)
    is returned unchanged, as is text with no sentence boundary. Otherwise
    the last fragment is removed and the remaining sentences are re-joined
    with single spaces.
    """
    if text.rstrip().endswith(('.', '!', '?')):
        return text

    # A boundary is end punctuation, whitespace, then an upper-case letter.
    *complete, _fragment = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
    if not complete:
        # Only one piece: nothing to trim.
        return text
    return ' '.join(complete).strip()
def form_display_comparison_table(df_etf, list_of_parsed_tickers):
    """Build the side-by-side comparison table for the given tickers.

    Selects the rows of ``df_etf`` whose ``Ticker`` is in
    ``list_of_parsed_tickers``, keeps the comparison columns under display
    names, renders returns and the expense ratio as percentages, and
    short-formats the average volume.
    """
    # raw column -> display name, in display order
    rename_dict = {
        'Ticker': 'Ticker',
        'longName': 'Full Name',
        'categoryName': 'Category',
        'previousCloseUSD': 'Prev. Close',
        'averageVolumeUSD': 'Avg. Volume',
        'return_rating': 'Return Rating (1-10)',
        'risk_rating': 'Risk Rating (1-10)',
        'ytdReturn': 'YTD Return',
        'oneMonthReturn': '1-month Return',
        'threeMonthReturn': '3-month Return',
        'oneYearReturn': '1-year Return',
        'threeYearReturn': '3-year Return',
        'fiveYearReturn': '5-year Return',
        'tenYearReturn': '10-year Return',
        'annualReportExpenseRatio': 'Expense Ratio',
    }
    is_requested = df_etf['Ticker'].isin(list_of_parsed_tickers)
    table = df_etf[is_requested][list(rename_dict)].rename(columns=rename_dict)

    table = transform_return_columns(table)
    table = transform_float_columns_to_perc(table, columns=['Expense Ratio'])
    return transform_number_columns(table, ['Avg. Volume'])
def portfolio_interactive_chart(df_port_output):
    """Plot the projected portfolio values as a Plotly line chart.

    Every column other than 'year' and 'Total' becomes a solid line plotted
    against 'year'; 'Total' is added last as a dashed line.

    Returns:
        plotly.graph_objects.Figure
    """
    fig = go.Figure()
    years = df_port_output["year"]

    # One line per ETF column.
    asset_columns = [name for name in df_port_output.columns if name not in ("year", "Total")]
    for name in asset_columns:
        fig.add_trace(go.Scatter(x=years, y=df_port_output[name], mode='lines', name=name))

    # The total is dashed to set it apart from the per-ETF lines.
    fig.add_trace(go.Scatter(
        x=years,
        y=df_port_output["Total"],
        mode='lines',
        name="Total",
        line=dict(dash='dash'),
    ))

    fig.update_layout(
        title="Portfolio Growth Over Time",
        xaxis_title="Year",
        yaxis_title="Portfolio Value (USD)",
        hovermode='x unified',
    )
    return fig
def set_estimated_return(tickers, df_general_info, df_annual_return):
    """
    Estimate an annual return for each ticker.

    For each ticker:
    1. The trailing returns ('oneYearReturn', 'threeYearReturn',
       'fiveYearReturn', 'tenYearReturn') are read from df_general_info;
       NaN is treated as 0 and the mean of the non-zero values is taken
       (0 when there are none).
    2. The mean of 'fundReturn' is read from df_annual_return, falling back
       to the mean of 'categoryReturn' and finally to 0.
    3. The trailing mean is used unless it is 0, in which case the annual
       mean from step 2 is used.

    Parameters:
        tickers (iterable): Ticker symbols to process.
        df_general_info (pd.DataFrame): Has a 'Ticker' column and the trailing-return columns.
        df_annual_return (pd.DataFrame): Has 'Ticker', 'fundReturn' and 'categoryReturn' columns.

    Returns:
        tuple: ``(df, d)`` where ``df`` has the columns 'etf' and
        'estimated_annual_return' (one row per ticker) and ``d`` is ``df.to_dict()``.
    """
    trailing_returns_cols = ['oneYearReturn', 'threeYearReturn', 'fiveYearReturn', 'tenYearReturn']

    symbols = []
    estimates = []
    for ticker in tickers:
        # Trailing returns: NaN -> 0, then average only the non-zero entries.
        info_rows = df_general_info[df_general_info['Ticker'] == ticker]
        trailing = np.nan_to_num(info_rows[trailing_returns_cols].values, nan=0)
        usable = trailing[trailing != 0]
        trailing_mean = np.mean(usable) if len(usable) > 0 else 0

        # Annual history: fund return, else category return, else 0.
        history = df_annual_return[df_annual_return['Ticker'] == ticker]
        annual_mean = history['fundReturn'].mean()
        if pd.isnull(annual_mean):
            annual_mean = history['categoryReturn'].mean()
        if pd.isnull(annual_mean):
            annual_mean = 0

        estimates.append(annual_mean if trailing_mean == 0 else trailing_mean)
        symbols.append(ticker)

    df = pd.DataFrame({'etf': symbols, 'estimated_annual_return': estimates})
    return df, df.to_dict()
def form_d_chat_history(result_id, response, task, fig=None, df=None, query=None):
    """Bundle one chat result into a dict with the keys
    'id', 'task', 'response', 'fig', 'df' and 'query'.
    """
    return dict(
        id=result_id,
        task=task,
        response=response,
        fig=fig,
        df=df,
        query=query,
    )
def portfolio_growth_over_time(df, target_years=30):
    """
    Project the yearly value of each asset in ``df`` with monthly compounding.

    ``df`` must have the columns:
    - 'etf'
    - 'initial_investment'
    - 'estimated_annual_return' (a number, or a percentage string such as "10%")
    - 'amount_of_recurring_investments' (added every month)

    Parameters:
        df (pd.DataFrame): One row per asset.
        target_years (int): Number of years to project (default 30).

    Returns:
        tuple: ``(portfolio_data, last_row)`` where ``portfolio_data`` has a
        'year' column (0..target_years), one column per asset and a 'Total'
        column, and ``last_row`` is its final row as a dict.
    """
    def value_after(months, principal, contribution, monthly_rate):
        # Lump sum compounded monthly, plus the value of the monthly
        # contributions (annuity formula; plain sum when the rate is zero).
        fv_initial = principal * (1 + monthly_rate) ** months
        if monthly_rate != 0:
            fv_contrib = contribution * (((1 + monthly_rate) ** months - 1) / monthly_rate)
        else:
            fv_contrib = contribution * months
        return fv_initial + fv_contrib

    years = np.arange(0, target_years + 1)
    portfolio_data = pd.DataFrame({'year': years})

    for _, asset in df.iterrows():
        name = asset['etf']
        principal = asset['initial_investment']
        contribution = asset['amount_of_recurring_investments']
        annual_rate = asset['estimated_annual_return']
        # "10%" -> 0.10
        if isinstance(annual_rate, str) and '%' in annual_rate:
            annual_rate = float(annual_rate.strip('%')) / 100.0
        monthly_rate = annual_rate / 12

        portfolio_data[name] = [
            value_after(int(year * 12), principal, contribution, monthly_rate)
            for year in years
        ]

    # Total = sum over the asset columns.
    portfolio_data['Total'] = portfolio_data[df['etf'].tolist()].sum(axis=1)
    return portfolio_data, portfolio_data.iloc[-1].to_dict()
def run_portfolio_analysis(list_of_parsed_tickers, df_etf, df_annual_return_master):
    """Run the fixed-scenario portfolio projection for the given tickers.

    Scenario: 30 years, 1000 initial investment and 100 recurring monthly
    investment per ETF. Estimated returns come from ``set_estimated_return``
    and the projection from ``portfolio_growth_over_time``.

    Returns:
        tuple: ``(df_port_output, d_summary)`` - the projection table and its
        final row as a dict, extended with the scenario settings and the
        estimated returns.
    """
    # Portfolio Analysis configuration
    target_years = 30
    init_investment = 1000
    recur_monthly = 100

    n_assets = len(list_of_parsed_tickers)
    df_port_input = pd.DataFrame({
        'etf': list_of_parsed_tickers,
        'initial_investment': [init_investment] * n_assets,
        'amount_of_recurring_investments': [recur_monthly] * n_assets,
    })

    df_est_return, d_est_return = set_estimated_return(
        tickers=list_of_parsed_tickers,
        df_general_info=df_etf,
        df_annual_return=df_annual_return_master,
    )
    # Left join; anything left missing after the join is filled with 0.
    df_port_input = df_port_input.merge(df_est_return, how='left', on='etf').fillna(0)

    df_port_output, d_summary = portfolio_growth_over_time(df=df_port_input, target_years=target_years)
    d_summary.update({
        'initial_investment_on_each_etf': init_investment,
        'recurring_monthly_investment': recur_monthly,
        'estimated_annual_return': d_est_return,
    })
    return df_port_output, d_summary
def format_portfolio_summary(d_summary: dict) -> str:
    """
    Render a portfolio summary dict as Markdown.

    Expected keys of ``d_summary``:
    - 'year', 'Total'
    - 'initial_investment_on_each_etf', 'recurring_monthly_investment'
    - 'estimated_annual_return': {
          'etf': { idx: ticker, … },
          'estimated_annual_return': { idx: float_return, … }
      }
    - every other key is treated as an ETF ticker mapped to its final value

    ETFs are listed by final value, highest first; returns are shown as
    percentages (0.00% when a ticker has no estimate).
    """
    year = int(d_summary.get("year", 0))
    initial = d_summary.get("initial_investment_on_each_etf", 0)
    recurring = d_summary.get("recurring_monthly_investment", 0)
    total = d_summary.get("Total", 0)

    # Everything that is not bookkeeping is an ETF -> final value entry.
    bookkeeping_keys = {
        "year", "Total",
        "initial_investment_on_each_etf",
        "recurring_monthly_investment",
        "estimated_annual_return",
    }
    ranked = sorted(
        ((key, value) for key, value in d_summary.items() if key not in bookkeeping_keys),
        key=lambda pair: pair[1],
        reverse=True,
    )

    # Join the two idx-keyed dicts into ticker -> return in percent.
    returns_block = d_summary.get("estimated_annual_return", {})
    tickers_by_idx = returns_block.get("etf", {})
    returns_by_idx = returns_block.get("estimated_annual_return", {})
    pct_by_ticker = {
        tickers_by_idx[idx]: returns_by_idx[idx] * 100
        for idx in tickers_by_idx
        if idx in returns_by_idx
    }

    value_bullets = [f"- {ticker}: {val:,.0f} USD" for ticker, val in ranked]
    return_bullets = [f"- {ticker}: {pct_by_ticker.get(ticker, 0.0):.2f}%" for ticker, _ in ranked]

    lines = [
        "**Portfolio Summary**:\n",
        f"After **a {year}-year** projection, the final amounts for each ETF in your portfolio are as follows:\n",
        *value_bullets,
        f"- **Total Portfolio Value: {total:,.0f} USD**\n",
        (
            "These amounts are calculated based on an initial investment of "
            f"**{initial:,} USD** for each ETF, with a recurring monthly investment "
            f"of **{recurring:,} USD**. The estimated annual returns for each ETF "
            "are as follows:\n"
        ),
        *return_bullets,
        "",
        (
            "The growth of the investments is influenced by the compounding effect "
            "of the recurring investments and the estimated annual returns over the "
            f"{year}-year period."
        ),
    ]
    return "\n".join(lines)
def clean_ocr_etf_text(text: str) -> str:
    """
    Cleans and formats OCR-parsed ETF text by:
    - Removing wrapping quotes and surrounding whitespace
    - Fixing line-break hyphenations
    - Collapsing newlines and repeated whitespace into single spaces
    - Ensuring a space after '.', ',', '!' and '?' — except for a '.' or ','
      that sits between two digits, so figures such as "0.20%" or "1,500"
      stay intact
    - Capitalizing the first letter
    """
    # Remove leading/trailing whitespace and outer quotes if present
    text = text.strip().strip('"').strip("'")
    # Fix hyphenated line breaks (e.g., 'NASDAQ-\n100' -> 'NASDAQ-100')
    text = re.sub(r'-\s*\n\s*', '-', text)
    # Replace remaining line breaks with spaces
    text = re.sub(r'[\n\r]+', ' ', text)
    # Remove excessive spaces
    text = re.sub(r'\s{2,}', ' ', text)
    # Ensure proper spacing after periods, commas, etc. The negative
    # lookahead skips a '.'/',' with a digit on both sides (decimal point or
    # thousands separator); the previous pattern split those numbers apart.
    text = re.sub(r'([.,!?])(?!(?<=\d[.,])\d)(\S)', r'\1 \2', text)
    # Capitalize the first letter if needed
    if text and text[0].islower():
        text = text[0].upper() + text[1:]
    return text.strip()
def lookup_etf_report(tickers, df_analyst_report):
    """Look up the cleaned analyst report text for each ticker.

    Parameters:
        tickers (iterable): Ticker symbols to look up.
        df_analyst_report (pd.DataFrame): Must contain the columns 'Ticker'
            and 'description'.

    Returns:
        dict: ticker -> report text passed through ``clean_ocr_etf_text``.
        Tickers with no row, or with only missing descriptions, are omitted.
    """
    d_reports = {}
    for ticker in tickers:
        is_ticker = df_analyst_report['Ticker'] == ticker
        # Drop missing descriptions: a NaN is a float, and the cleaner would
        # fail on it when calling ``.strip()``.
        descriptions = df_analyst_report.loc[is_ticker, 'description'].dropna()
        if not descriptions.empty:
            d_reports[ticker] = clean_ocr_etf_text(str(descriptions.iloc[0]))
    return d_reports
def format_insights_report(d_report: dict) -> str:
    """
    Render a ticker -> analysis-text dict as a Markdown Insights Report.

    Layout:
        **Insights Report**:

        **QQQ**:
        <<analysis report for QQQ>>

        **SPY**:
        <<analysis report for SPY>>
    """
    sections = ["**Insights Report**:\n"]
    for ticker, report in d_report.items():
        # Header line, stripped report text, then a blank separator line.
        sections.extend((f"**{ticker}**: ", report.strip(), ""))
    return "\n".join(sections).strip()
def format_etf_search_results(tickers: list[str]) -> str:
    """
    Render up to five tickers as a Markdown bullet list:

        **ETF Search Results**:

        - TICKER1
        - TICKER2
        ...
        ... and N more. Check the full results in the table!

    The closing line appears only when more than five tickers were given.
    """
    shown = tickers[:5]
    lines = ["**ETF Search Results**:\n", *(f"- {ticker}" for ticker in shown)]

    remaining = len(tickers) - len(shown)
    if remaining > 0:
        lines.append(f"... and {remaining} more. Check the full results in the table!")
    return "\n".join(lines)
def format_etf_search_results_inline(tickers: list[str], max_display: int = 5) -> str:
    """
    Render the tickers as a one-line Markdown summary.

    At most ``max_display`` tickers are listed. When more were given, the
    sentence says so and points to the results table; otherwise only the
    comma-separated list is returned.
    """
    shown = ", ".join(tickers[:max_display])
    if len(tickers) - len(tickers[:max_display]) <= 0:
        return f"**ETF Search Results**: {shown}."
    return (
        f"**ETF Search Results**: I've found {shown}, and many more, which I believe align with your interests."
        " Check the full results in the table above!"
    )