import re
import time
from datetime import datetime, timedelta
from typing import List

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from requests.exceptions import RetryError
from yahooquery import Ticker


def load_etf_data():
    """
    Load the ETF CSV files and assemble the DataFrames used by the app.

    Reads 'etf_general_info_enriched.csv', 'etf_holdings_summarized.csv',
    'etf_analyst_report_full.csv' and 'annual_return.csv' from the current
    working directory. The holdings are left-joined onto the general info on
    'Ticker' before the result is filtered by `set_etf_data`.

    Returns:
        tuple: (df_etf, df_analyst_report, available_tickers,
        df_annual_return_master), where `available_tickers` is a set of
        upper-cased ticker symbols.
    """
    df_etf_info_master = pd.read_csv('etf_general_info_enriched.csv').rename(
        columns={'ticker': 'Ticker'}
    )
    df_etf_holdings = pd.read_csv('etf_holdings_summarized.csv').rename(
        columns={'ticker': 'Ticker', 'holdingInformation': 'Holdings'}
    )
    df_etf_info_master = df_etf_info_master.merge(df_etf_holdings, how='left', on='Ticker')

    df_etf, available_tickers = set_etf_data(df_etf_info_master)

    df_analyst_report = pd.read_csv('etf_analyst_report_full.csv')
    df_annual_return_master = pd.read_csv('annual_return.csv').rename(
        columns={'ticker': 'Ticker'}
    )
    return df_etf, df_analyst_report, available_tickers, df_annual_return_master


def set_etf_data(df_src):
    """
    Filter the master ETF table and collect the set of valid tickers.

    Keeps rows with averageVolume > 1000 and exchangeCountry equal to
    'United States', then drops rows without a categoryName.

    Parameters:
        df_src (pd.DataFrame): Master ETF table with at least the columns
            'averageVolume', 'exchangeCountry', 'categoryName' and 'Ticker'.

    Returns:
        tuple: (filtered DataFrame, set of upper-cased ticker strings).
    """
    keep_mask = (df_src['averageVolume'] > 1000) & (df_src['exchangeCountry'] == 'United States')
    df = df_src[keep_mask].dropna(subset=['categoryName'])

    full_ticker_list = df['Ticker'].unique().tolist()
    valid_ticker_set = {t.upper() for t in full_ticker_list}
    return df, valid_ticker_set


# Build a ticker → doc_text lookup
def make_doc_text(row):
    """
    Build a single descriptive text for one ETF row.

    Each attribute that is neither NaN nor blank is appended, optionally
    prefixed by a label, and the pieces are joined with ". ".

    Parameters:
        row: Object exposing the ETF attributes read below via attribute
            access (e.g. a row from `DataFrame.itertuples()` or a Series).

    Returns:
        str: The concatenated description.
    """
    parts = []

    # helper to append only if the value exists
    def add(label, value):
        if pd.notna(value) and str(value).strip():
            parts.append(f"{label}: {value}" if label else str(value))

    add(None, row.shortName)
    add(None, row.longName)
    add("Issuer", row.family)
    add("Category", row.categoryName)
    add("Type", row.legalType)
    add("Position", row.positionType)
    add("Tags", row.otherTags)
    add("Return", row.return_rating_text)
    add("Risk", row.risk_rating_text)
    add("Expense Ratio", row.annualReportExpenseRatio_rating_text)
    add("Dividend Yield", row.dividendYield_rating_text)
    add(None, row.longBusinessSummary)
    add("Holdings", row.holdingInformation)

    # join with ". " so each bit reads like a sentence
    return ". ".join(parts)


# Helper: extract and filter ticker spans from tokens + labels
def extract_valid_tickers(tokens, labels, tokenizer, valid_set):
    """
    Collect ticker symbols from BIO-tagged tokens.

    A span starts at a "B-TICKER" label and is extended by following
    "I-TICKER" labels; any other label closes the current span. Each span is
    converted to text with `tokenizer.convert_tokens_to_string`, stripped and
    upper-cased, and kept only if it is in `valid_set`.

    Parameters:
        tokens (iterable): Tokens produced by the tokenizer.
        labels (iterable): One label per token.
        tokenizer: Object providing `convert_tokens_to_string(list)`.
        valid_set (set): Upper-cased ticker symbols considered valid.

    Returns:
        list: Valid tickers in order of appearance (duplicates preserved).
    """
    spans, cur = [], []
    for tok, lab in zip(tokens, labels):
        if lab == "B-TICKER":
            if cur:
                spans.append(cur)
            cur = [tok]
        elif lab == "I-TICKER" and cur:
            cur.append(tok)
        else:
            if cur:
                spans.append(cur)
            cur = []
    if cur:
        spans.append(cur)

    results = []
    for span in spans:
        word = tokenizer.convert_tokens_to_string(span).strip().upper()
        if word in valid_set:
            results.append(word)
    return results


# Rule-based fallback: catch literal 2–4 char tickers in the text
def rule_fallback(query, valid_set):
    """
    Return the set of 2-4 character alphanumeric words in `query` whose
    upper-cased form is contained in `valid_set`.
    """
    words = re.findall(r"\b[A-Za-z0-9]{2,4}\b", query)
    return {w.upper() for w in words if w.upper() in valid_set}


def get_cols_to_display() -> List[str]:
    """
    Returns the list of raw property names that we want to select
    for our recommendations table.
    """
    return [
        'Ticker',
        'categoryName',
        'annualReportExpenseRatio',
        'previousCloseUSD',
        'averageVolumeUSD',
        'totalAssetsUSD',
        'longName',
        'marketCapUSD',
        'dividendYield',
        'ytdReturn',
        'oneMonthReturn',
        'threeMonthReturn',
        'oneYearReturn',
        'threeYearReturn',
        'fiveYearReturn',
        'tenYearReturn',
        'avg_annual_return',
        'return_rating',
        'risk_rating',
        'positionType',
        'isLeveraged',
        'return_rating_text',
        'risk_rating_text',
        'annualReportExpenseRatio_rating_text',
        'dividendYield_rating_text',
        'ytdReturn_rating_text',
        'oneMonthReturn_rating_text',
        'threeMonthReturn_rating_text',
        'oneYearReturn_rating_text',
        'threeYearReturn_rating_text',
        'fiveYearReturn_rating_text',
        'tenYearReturn_rating_text',
        'Holdings',
    ]


def rename_etf_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Rename DataFrame columns from raw names to display-friendly names.

    Only columns that are present in `df` are renamed; a new DataFrame is
    returned and `df` itself is not modified.
    """
    mapping = {
        'ticker': 'Ticker',
        'categoryName': 'Category',
        'annualReportExpenseRatio': 'Expense Ratio',
        'previousCloseUSD': 'Prev. Close',
        'averageVolumeUSD': 'Avg. Volume',
        'totalAssetsUSD': 'Total Assets',
        'longName': 'Full Name',
        'marketCapUSD': 'Market Cap.',
        'dividendYield': 'Dividend Yield',
        'ytdReturn': 'YTD Return',
        'oneMonthReturn': '1-month Return',
        'threeMonthReturn': '3-month Return',
        'oneYearReturn': '1-year Return',
        'threeYearReturn': '3-year Return',
        'fiveYearReturn': '5-year Return',
        'tenYearReturn': '10-year Return',
        'avg_annual_return': 'Avg. Annual Return %',
        'return_rating': 'Avg. Return Rating (1-10)',
        'risk_rating': 'Avg. Risk Rating (1-10)',
        'positionType': 'Position Type',
        'isLeveraged': 'Leveraged',
        'return_rating_text': 'Return Rating',
        'risk_rating_text': 'Risk Rating',
        'annualReportExpenseRatio_rating_text': 'Expense Ratio Rating',
        'dividendYield_rating_text': 'Dividend Yield Rating',
        'ytdReturn_rating_text': 'YTD Return Rating',
        'oneMonthReturn_rating_text': '1-month Return Rating',
        'threeMonthReturn_rating_text': '3-month Return Rating',
        'oneYearReturn_rating_text': '1-year Return Rating',
        'threeYearReturn_rating_text': '3-year Return Rating',
        'fiveYearReturn_rating_text': '5-year Return Rating',
        'tenYearReturn_rating_text': '10-year Return Rating',
        'holdingInformation': 'Holdings',
    }
    # Only rename columns that actually exist in df
    valid_mapping = {k: v for k, v in mapping.items() if k in df.columns}
    return df.rename(columns=valid_mapping)


def get_etf_recommendations_from_list(
    list_of_fetched_etfs: List[str],
    df_etf: pd.DataFrame,
    top_n: int
) -> pd.DataFrame:
    """
    Filter the master ETF DataFrame down to the tickers you fetched,
    sort by averageVolumeUSD descending, take the top_n rows,
    select only the requested raw columns, rename them for display, and return.

    Parameters
    ----------
    list_of_fetched_etfs : List[str]
        ETF ticker symbols returned by your semantic search.
    df_etf : pd.DataFrame
        The full ETF DataFrame with raw property names. It must contain
        every column listed by `get_cols_to_display()`; otherwise the column
        selection raises KeyError.
    top_n : int
        How many of the highest-volume ETFs to return.

    Returns
    -------
    pd.DataFrame
        A DataFrame of the top_n ETFs (by avg volume), with only the
        selected columns, renamed to friendly display names.
    """
    # 1. Keep only the tickers you fetched
    df_filtered = df_etf[df_etf['Ticker'].isin(list_of_fetched_etfs)].copy()

    # 2. Sort by raw averageVolumeUSD descending
    df_sorted = df_filtered.sort_values(by='averageVolumeUSD', ascending=False)

    # 3. Take the top_n rows
    df_top = df_sorted.head(top_n)

    # 4. Select only the columns you asked for
    df_selected = df_top[get_cols_to_display()]

    # 5. Rename to friendly display names
    df_final = rename_etf_columns(df_selected)
    return df_final


def format_number_short(x):
    """
    Converts a single number to a short format with K (thousands), M (millions),
    B (billions), or T (trillions) suffix. Preserves NaN values.

    Parameters:
        x (float or int): The number to format.

    Returns:
        str or float: The formatted string if x is a number, or the original NaN.
    """
    # If the value is NaN, return it as is
    if pd.isna(x):
        return x

    # Use the absolute value for comparison to handle negative numbers
    abs_x = abs(x)

    if abs_x < 1e3:
        # For values less than 1,000, just return the value formatted to two decimals.
        return f"{x:.2f}"
    elif abs_x < 1e6:
        # For thousands, divide by 1,000 and append 'K'
        return f"{x/1e3:.2f}K"
    elif abs_x < 1e9:
        # For millions, divide by 1,000,000 and append 'M'
        return f"{x/1e6:.2f}M"
    elif abs_x < 1e12:
        # For billions, divide by 1,000,000,000 and append 'B'
        return f"{x/1e9:.2f}B"
    else:
        # For trillions and above, divide by 1,000,000,000,000 and append 'T'
        return f"{x/1e12:.2f}T"


def transform_number_columns(df, columns):
    """
    Transforms specified numeric columns in a DataFrame to short format strings.

    The transformation converts numbers to their respective short formats:
    thousands (K), millions (M), billions (B), and trillions (T).
    NaN values are preserved.

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        columns (list): List of column names (as strings) to be transformed.
            Names that are not columns of `df` are ignored.

    Returns:
        pd.DataFrame: A copy of the DataFrame with the specified columns transformed.
    """
    # Create a copy of the DataFrame to avoid modifying the original
    df_transformed = df.copy()

    # Loop through each specified column
    for col in columns:
        if col in df_transformed.columns:
            # Apply the formatting function to each value in the column.
            df_transformed[col] = df_transformed[col].apply(format_number_short)

    return df_transformed


def transform_float_columns_to_perc(df, columns):
    """
    Transforms specified numeric columns in a DataFrame to percentage strings.

    Each non-NaN value is multiplied by 100 and formatted with two decimal
    places followed by a percent sign (e.g. 0.0123 -> "1.23%").
    NaN values are preserved.

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        columns (list): List of column names (as strings) to be transformed.
            Names that are not columns of `df` are ignored.

    Returns:
        pd.DataFrame: A copy of the DataFrame with the specified columns transformed.
    """
    # Create a copy of the DataFrame to avoid modifying the original
    df_transformed = df.copy()

    # Loop through each specified column
    for col in columns:
        if col in df_transformed.columns:
            # Apply transformation: multiply by 100, format as string, preserve NaNs
            df_transformed[col] = df_transformed[col].apply(
                lambda x: f"{x * 100:.2f}%" if pd.notna(x) else x
            )

    return df_transformed


def overview_df(df_recommendations, drop_relavance_score=True):
    """
    Build the "Overview" table from the renamed recommendations DataFrame.

    Parameters:
        df_recommendations (pd.DataFrame): Recommendations with display names.
        drop_relavance_score (bool): Currently unused (relevance-score
            handling is disabled); kept so existing callers keep working.

    Returns:
        pd.DataFrame: The available overview columns, with 'Total Assets' and
        'Market Cap.' converted to short number strings.
    """
    overview_cols = [
        "Leveraged", "Ticker", "Full Name", 'Category', 'Country',
        'Total Assets', "Prev. Close", "Avg. Volume", 'Market Cap.'
    ]
    existing_cols = [col for col in overview_cols if col in df_recommendations.columns]
    df_overview = transform_number_columns(
        df_recommendations[existing_cols], ['Total Assets', 'Market Cap.']
    )
    return df_overview


def transform_return_columns(df, cols=None):
    """
    Transforms float values to percentage strings for all columns ending with 'Return'.

    For each column in the DataFrame whose name ends with 'Return', the function multiplies
    each non-NaN float value by 100 and formats it as a string with two decimal places followed
    by a percent sign. NaN values are preserved.

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        cols: Currently unused; every column ending with 'Return' is
            transformed regardless of this argument.

    Returns:
        pd.DataFrame: A copy of the DataFrame with transformed 'Return' columns.
    """
    # Create a copy of the DataFrame to avoid modifying the original
    df_transformed = df.copy()

    # Loop through each column in the DataFrame
    for col in df_transformed.columns:
        # Check if the column name ends with 'Return'
        if col.endswith('Return'):
            # Apply transformation: multiply by 100, format as string, preserve NaNs
            df_transformed[col] = df_transformed[col].apply(
                lambda x: f"{x * 100:.2f}%" if pd.notna(x) else x
            )

    return df_transformed


def return_df(df_recommendations):
    """Build the "Returns" table with return columns formatted as percentages."""
    returns_cols = [
        "Ticker", "Full Name", 'Category', "YTD Return", "1-month Return",
        "3-month Return", "1-year Return", "3-year Return", "5-year Return",
        "10-year Return"
    ]
    existing_cols = [col for col in returns_cols if col in df_recommendations.columns]
    df_return = transform_return_columns(df_recommendations[existing_cols])
    return df_return


def clean_ratings_columns(df):
    """
    Reduce the textual rating columns to a bare level word.

    For every known rating column present in `df`, each value is converted to
    a string and replaced by the first of 'High', 'Moderate', 'Low' that
    occurs in it, or by '' when none occurs (this includes missing values).

    Parameters:
        df (pd.DataFrame): DataFrame that may contain rating columns.

    Returns:
        pd.DataFrame: A cleaned copy; the input DataFrame is left untouched.
    """
    rating_cols = [
        'YTD Return Rating', '1-month Return Rating', '3-month Return Rating',
        '1-year Return Rating', '3-year Return Rating', '5-year Return Rating',
        '10-year Return Rating', 'Expense Ratio Rating', 'Dividend Yield Rating'
    ]
    strings_to_keep = ['High', 'Moderate', 'Low']

    def keep_level(text):
        return next((s for s in strings_to_keep if s in text), '').strip()

    # Callers pass a column slice of their DataFrame; work on a copy so the
    # caller's data is never mutated (and no SettingWithCopyWarning is raised).
    df = df.copy()
    for col in rating_cols:
        if col in df.columns:
            df[col] = df[col].astype(str).apply(keep_level)

    return df


def rating_df(df_recommendations):
    """Build the "Ratings" table with the period rating columns cleaned."""
    # NOTE(review): rename_etf_columns produces 'Return Rating' / 'Risk Rating',
    # not 'Avg. Return Rating' / 'Avg. Risk Rating' — confirm which names are intended.
    ratings_cols = [
        "Ticker", "Full Name", 'Category', "Avg. Return Rating (1-10)",
        "Avg. Risk Rating (1-10)", 'Avg. Return Rating', 'Avg. Risk Rating',
        'YTD Return Rating', '1-month Return Rating', '3-month Return Rating',
        '1-year Return Rating', '3-year Return Rating', '5-year Return Rating',
        '10-year Return Rating'
    ]
    existing_cols = [col for col in ratings_cols if col in df_recommendations.columns]
    df_rating = clean_ratings_columns(df_recommendations[existing_cols])
    return df_rating


def expense_ratio_df(df_recommendations):
    """Build the "Expenses" table (short asset numbers, percent expense ratio)."""
    expenses_cols = [
        "Ticker", "Full Name", "Category", 'Total Assets', 'Expense Ratio',
        'Expense Ratio Rating'
    ]
    existing_cols = [col for col in expenses_cols if col in df_recommendations.columns]
    df_rec_transformed = transform_number_columns(
        df_recommendations[existing_cols], ['Total Assets']
    )
    df_rec_transformed = transform_float_columns_to_perc(
        df_rec_transformed, columns=['Expense Ratio']
    )
    df_rec_transformed = clean_ratings_columns(df_rec_transformed)
    return df_rec_transformed


def holdings_df(df_recommendations):
    """Build the "Holdings" table from the columns that are available."""
    holdings_cols = ["Ticker", "Full Name", "Category", "Holdings"]
    existing_cols = [col for col in holdings_cols if col in df_recommendations.columns]
    return df_recommendations[existing_cols]


def dividend_df(df_recommendations):
    """Build the "Dividends" table (cleaned rating, percent dividend yield)."""
    dividends_cols = [
        "Ticker", "Full Name", "Category", "Dividend Yield", "Dividend Yield Rating"
    ]
    existing_cols = [col for col in dividends_cols if col in df_recommendations.columns]
    df_rec_transformed = clean_ratings_columns(df_recommendations[existing_cols])
    df_rec_transformed = transform_float_columns_to_perc(
        df_rec_transformed, columns=['Dividend Yield']
    )
    return df_rec_transformed


def display_matching_etfs(df_recommendations):
    """
    Render the recommendations as six Streamlit tabs, one table per tab.

    Nothing is rendered when `df_recommendations` is empty.
    """
    if df_recommendations.empty:
        return

    # Create tabs for each column group
    tabs = st.tabs(["Overview", "Returns", "Ratings", 'Holdings', 'Expenses', 'Dividends'])
    # Table builders, in the same order as the tab labels above
    builders = [overview_df, return_df, rating_df, holdings_df, expense_ratio_df, dividend_df]

    for tab, build_table in zip(tabs, builders):
        with tab:
            st.dataframe(build_table(df_recommendations), hide_index=True)
    return


def _message_figure(message: str) -> go.Figure:
    """Return an empty Plotly figure whose title carries `message`."""
    fig = go.Figure()
    fig.update_layout(title=message)
    return fig


def compare_etfs_interactive(etf_a: str,
                             etf_b: str,
                             max_retries: int = 5,
                             initial_delay: float = 1.0) -> go.Figure:
    """
    Fetches 5-year historical price data for two ETFs from Yahoo Finance,
    calculates percentage change from the starting price, and returns
    a Plotly figure for interactive viewing in Streamlit.
    Retries up to `max_retries` times when a RetryError is raised, with
    exponential back-off.

    Any failure to obtain usable data (retries exhausted, another exception
    while fetching, or no common dates between the two tickers) yields an
    empty figure whose title explains the problem instead of raising.

    Parameters:
        etf_a, etf_b: ETF tickers
        max_retries: how many attempts before giving up
        initial_delay: seconds to wait before first retry (doubles each time)

    Returns:
        plotly.graph_objects.Figure
    """
    fetch_failed_msg = "Data fetch failed after multiple attempts. Please try again later."

    end_date = datetime.today()
    start_date = end_date - timedelta(days=5 * 365)

    # 1) Fetch data with retries
    delay = initial_delay
    df_full = None
    for attempt in range(max_retries):
        try:
            tickers = Ticker(f"{etf_a} {etf_b}", asynchronous=True)
            df_full = tickers.history(period="5y", interval="1d").reset_index()
            break
        except RetryError:
            if attempt < max_retries - 1:
                time.sleep(delay)
                delay *= 2
            else:
                # final failure
                return _message_figure(fetch_failed_msg)
        except Exception as e:
            return _message_figure(f"Error fetching data: {e}")

    if df_full is None:
        # No attempt was made (max_retries <= 0), so there is nothing to plot.
        return _message_figure(fetch_failed_msg)

    # 2) Split & merge
    df_a = (
        df_full[df_full.symbol == etf_a]
        .rename(columns={"adjclose": "Adj Close A"})[["date", "Adj Close A"]]
    )
    df_b = (
        df_full[df_full.symbol == etf_b]
        .rename(columns={"adjclose": "Adj Close B"})[["date", "Adj Close B"]]
    )
    df = pd.merge(df_a, df_b, on="date", how="inner").set_index("date")

    if df.empty:
        # Without a first row there is no baseline price to compare against.
        return _message_figure(
            f"No overlapping price history found for {etf_a} and {etf_b}."
        )

    # 3) Compute % changes relative to the first common date
    df["Pct Change A"] = (df["Adj Close A"] / df["Adj Close A"].iloc[0] - 1) * 100
    df["Pct Change B"] = (df["Adj Close B"] / df["Adj Close B"].iloc[0] - 1) * 100

    # 4) Build figure
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df.index, y=df["Pct Change A"],
        mode="lines", name=etf_a))
    fig.add_trace(go.Scatter(
        x=df.index, y=df["Pct Change B"],
        mode="lines", name=etf_b))

    fig.update_layout(
        title=f"5-Year Performance Comparison: {etf_a} vs. {etf_b}",
        xaxis_title="Date",
        yaxis_title="Percentage Change (%)",
        hovermode="x unified"
    )
    fig.update_xaxes(range=[start_date, end_date])

    return fig


def trim_to_last_full_sentence(text: str) -> str:
    """
    Drop a trailing incomplete sentence from `text`.

    Text that already ends with '.', '!' or '?' (ignoring trailing
    whitespace), or that contains no sentence boundary, is returned unchanged.
    """
    # If it already ends cleanly, just return it
    if text.rstrip().endswith(('.', '!', '?')):
        return text

    # Split on sentence boundaries: punctuation + space + uppercase
    pattern = r'(?<=[.!?])\s+(?=[A-Z])'
    parts = re.split(pattern, text)

    # If we only got one part, nothing to trim
    if len(parts) == 1:
        return text

    # Drop the last (incomplete) fragment and rejoin the rest
    full_sentences = parts[:-1]
    return ' '.join(full_sentences).strip()


def form_display_comparison_table(df_etf, list_of_parsed_tickers):
    """
    Build a display-ready comparison table for the given tickers.

    Selects the rows of `df_etf` whose 'Ticker' is in
    `list_of_parsed_tickers`, keeps the columns of interest, renames them to
    display names and formats returns / expense ratio as percentages and the
    average volume as a short number.
    """
    cols_interests = [
        'Ticker', 'longName', 'categoryName', 'previousCloseUSD', 'averageVolumeUSD',
        'return_rating', 'risk_rating', 'ytdReturn', 'oneMonthReturn',
        'threeMonthReturn', 'oneYearReturn', 'threeYearReturn', 'fiveYearReturn',
        'tenYearReturn', 'annualReportExpenseRatio'
    ]
    cols_interests_pretty = [
        'Ticker', 'Full Name', 'Category', 'Prev. Close', 'Avg. Volume',
        'Return Rating (1-10)', 'Risk Rating (1-10)', 'YTD Return', '1-month Return',
        '3-month Return', '1-year Return', '3-year Return', '5-year Return',
        '10-year Return', 'Expense Ratio'
    ]
    rename_dict = dict(zip(cols_interests, cols_interests_pretty))

    df_comparison = df_etf[df_etf['Ticker'].isin(list_of_parsed_tickers)][cols_interests]
    df_comparison = df_comparison.rename(columns=rename_dict)
    df_comparison = transform_return_columns(df_comparison)
    df_comparison = transform_float_columns_to_perc(df_comparison, columns=['Expense Ratio'])
    df_comparison = transform_number_columns(df_comparison, ['Avg. Volume'])
    return df_comparison


def portfolio_interactive_chart(df_port_output):
    """
    Plot the projected portfolio growth as a Plotly line chart.

    Parameters:
        df_port_output (pd.DataFrame): Must contain 'year' and 'Total'
            columns; every other column is drawn as its own line.

    Returns:
        plotly.graph_objects.Figure
    """
    # Create a Plotly figure
    fig = go.Figure()

    # Plot each ETF's growth as a separate line
    for col in df_port_output.columns:
        if col not in ["year", "Total"]:
            fig.add_trace(go.Scatter(
                x=df_port_output["year"],
                y=df_port_output[col],
                mode='lines',
                name=col
            ))

    # Plot the 'Total' line in a dashed style to set it apart
    fig.add_trace(go.Scatter(
        x=df_port_output["year"],
        y=df_port_output["Total"],
        mode='lines',
        name="Total",
        line=dict(dash='dash')
    ))

    fig.update_layout(
        title="Portfolio Growth Over Time",
        xaxis_title="Year",
        yaxis_title="Portfolio Value (USD)",
        hovermode='x unified'
    )

    return fig


def set_estimated_return(tickers, df_general_info, df_annual_return):
    """
    Estimate the return for each ticker based on trailing and annual returns.

    For each ticker, the function:
      1. Extracts trailing return data from df_general_info for the columns:
         'oneYearReturn', 'threeYearReturn', 'fiveYearReturn', and 'tenYearReturn'.
      2. Replaces NaN values with 0 and calculates the mean of non-zero trailing returns.
      3. Retrieves the average annual return from df_annual_return using the 'fundReturn' column.
         If 'fundReturn' is NaN, it attempts to use the 'categoryReturn' column instead.
      4. Uses the non-zero mean trailing return if available; otherwise, falls back to the annual return.

    Parameters:
        tickers (iterable): An iterable of ticker symbols to process.
        df_general_info (pd.DataFrame): DataFrame containing general information including trailing returns.
        df_annual_return (pd.DataFrame): DataFrame containing annual return information.

    Returns:
        tuple: (df, d) where `df` is a DataFrame with the columns 'etf' and
        'estimated_annual_return' (one row per ticker) and `d` is
        `df.to_dict()`, i.e. {column: {row_index: value}}.
    """
    # Define the columns that contain the trailing returns in the general info DataFrame.
    trailing_returns_cols = ['oneYearReturn', 'threeYearReturn', 'fiveYearReturn', 'tenYearReturn']

    # Define the column names for annual return and category-based annual return.
    annual_return_col = 'fundReturn'
    cat_annual_return_col = 'categoryReturn'

    ticker_collected = []
    est_return_collected = []

    # Loop over each ticker symbol provided in the tickers list.
    for ticker in tickers:
        # Extract the trailing return values for the current ticker.
        trailing_return = df_general_info[
            df_general_info['Ticker'] == ticker
        ][trailing_returns_cols].values

        # Replace any NaN values in the trailing return array with 0.
        trailing_return = np.nan_to_num(trailing_return, nan=0)

        # Filter out zero values to only consider nonzero trailing returns.
        non_zero_elements = trailing_return[trailing_return != 0]

        # Calculate the mean of the nonzero trailing returns, if available.
        if len(non_zero_elements) > 0:
            non_zero_mean_trailing_return = np.mean(non_zero_elements)
        else:
            non_zero_mean_trailing_return = 0

        # Calculate the average annual return from the annual return DataFrame using 'fundReturn'.
        df_ticker_annual = df_annual_return[df_annual_return['Ticker'] == ticker]
        avg_return = df_ticker_annual[annual_return_col].mean()

        # If the annual return is NaN, try using the 'categoryReturn' column instead.
        if pd.isnull(avg_return):
            avg_return = df_ticker_annual[cat_annual_return_col].mean()

        # If still NaN, default to 0.
        if pd.isnull(avg_return):
            avg_return = 0

        # Choose the estimated return:
        # If the nonzero trailing mean is 0, use the annual return (avg_return).
        # Otherwise, use the nonzero trailing mean.
        if non_zero_mean_trailing_return == 0:
            est_return_collected.append(avg_return)
        else:
            est_return_collected.append(non_zero_mean_trailing_return)

        ticker_collected.append(ticker)

    df = pd.DataFrame({'etf': ticker_collected, 'estimated_annual_return': est_return_collected})
    d = df.to_dict()
    return df, d


def form_d_chat_history(result_id, response, task, fig=None, df=None, query=None):
    """Bundle one chat result into the dict stored in the chat history."""
    d = {
        "id": result_id,
        "task": task,
        "response": response,
        "fig": fig,
        "df": df,
        "query": query
    }
    return d


def portfolio_growth_over_time(df, target_years=30):
    """
    Calculate the portfolio value over time (yearly) for each asset in the DataFrame.

    The DataFrame should have columns:
      - 'etf'
      - 'initial_investment'
      - 'estimated_annual_return' (as percentage string like "10%" or as a decimal)
      - 'amount_of_recurring_investments'

    Returns are compounded monthly (annual rate / 12) and the recurring
    amount is treated as a monthly contribution.

    Parameters:
        df (pd.DataFrame): Input DataFrame with asset details.
        target_years (int): Total number of years to project (default is 30).

    Returns:
        tuple: (portfolio_data, last_row) where `portfolio_data` is a
        DataFrame with a 'year' column, one column per asset and a 'Total'
        column, and `last_row` is the final row as a dict.
    """
    years = np.arange(0, target_years + 1)  # yearly intervals from 0 to target_years
    portfolio_data = pd.DataFrame({'year': years})

    # Process each asset separately
    for idx, row in df.iterrows():
        etf = row['etf']
        P = row['initial_investment']
        recurring = row['amount_of_recurring_investments']
        r = row['estimated_annual_return']

        # Convert percentage string (if applicable) to a decimal
        if isinstance(r, str) and '%' in r:
            r = float(r.strip('%')) / 100.0

        monthly_rate = r / 12
        values = []

        for t in years:
            months = int(t * 12)
            # Future value from the initial investment:
            fv_initial = P * (1 + monthly_rate) ** months
            # Future value from monthly contributions (annuity formula)
            if monthly_rate != 0:
                fv_contrib = recurring * (((1 + monthly_rate) ** months - 1) / monthly_rate)
            else:
                fv_contrib = recurring * months
            total_value = fv_initial + fv_contrib
            values.append(total_value)

        portfolio_data[etf] = values

    # Compute total portfolio value (summing each asset)
    asset_columns = df['etf'].tolist()
    portfolio_data['Total'] = portfolio_data[asset_columns].sum(axis=1)
    last_row = portfolio_data.iloc[-1].to_dict()

    return portfolio_data, last_row


def run_portfolio_analysis(list_of_parsed_tickers, df_etf, df_annual_return_master):
    """
    Project a fixed-contribution portfolio made of the given tickers.

    Every ticker receives the same initial investment (1000) and the same
    monthly contribution (100), projected over 30 years using the returns
    from `set_estimated_return`.

    Returns:
        tuple: (df_port_output, d_summary) — the yearly projection and a
        summary dict (final-year values plus the investment settings and the
        estimated annual returns).
    """
    # Portfolio Analysis configuration
    target_years = 30
    init_investment = 1000
    recur_monthly = 100

    n_tickers = len(list_of_parsed_tickers)
    df_port_input = pd.DataFrame({
        'etf': list_of_parsed_tickers,
        'initial_investment': [init_investment] * n_tickers,
        'amount_of_recurring_investments': [recur_monthly] * n_tickers
    })

    df_est_return, d_est_return = set_estimated_return(
        tickers=list_of_parsed_tickers,
        df_general_info=df_etf,
        df_annual_return=df_annual_return_master
    )
    df_port_input = df_port_input.merge(df_est_return, how='left', on='etf').fillna(0)

    df_port_output, d_summary = portfolio_growth_over_time(
        df=df_port_input, target_years=target_years
    )
    d_summary['initial_investment_on_each_etf'] = init_investment
    d_summary['recurring_monthly_investment'] = recur_monthly
    d_summary['estimated_annual_return'] = d_est_return
    return df_port_output, d_summary


def format_portfolio_summary(d_summary: dict) -> str:
    """
    Given a summary dict with keys:
      - 'year'
      - one key per ETF with its final value
      - 'Total'
      - 'initial_investment_on_each_etf'
      - 'recurring_monthly_investment'
      - 'estimated_annual_return': {
            'etf': { idx: ticker, … },
            'estimated_annual_return': { idx: float_return, … }
        }
    Returns a Markdown string like:

    **Portfolio Summary**:
    After a 30-year projection …
    …
    """
    # 1) Core numbers
    year = int(d_summary.get("year", 0))
    initial = d_summary.get("initial_investment_on_each_etf", 0)
    recurring = d_summary.get("recurring_monthly_investment", 0)
    total = d_summary.get("Total", 0)

    # 2) Extract just the ETF final values
    skip = {
        "year", "Total",
        "initial_investment_on_each_etf",
        "recurring_monthly_investment",
        "estimated_annual_return"
    }
    etf_values = {
        k: v for k, v in d_summary.items()
        if k not in skip
    }

    # 3) Sort ETFs by final value descending
    sorted_etfs = sorted(
        etf_values.items(),
        key=lambda kv: kv[1],
        reverse=True
    )

    # 4) Map tickers → annual returns (stored as fractions, shown as percent)
    e = d_summary.get("estimated_annual_return", {})
    idx_to_ticker = e.get("etf", {})
    idx_to_ret = e.get("estimated_annual_return", {})
    ticker_to_ret = {
        idx_to_ticker[i]: idx_to_ret[i] * 100
        for i in idx_to_ticker
        if i in idx_to_ret
    }

    # 5) Build lines
    lines = []
    lines.append("**Portfolio Summary**:\n")
    lines.append(
        f"After **a {year}-year** projection, the final amounts for each ETF in your portfolio are as follows:\n"
    )

    # 6) ETF bullets
    for ticker, val in sorted_etfs:
        lines.append(f"- {ticker}: {val:,.0f} USD")
    lines.append(f"- **Total Portfolio Value: {total:,.0f} USD**\n")

    # 7) Investment details
    lines.append(
        f"These amounts are calculated based on an initial investment of "
        f"**{initial:,} USD** for each ETF, with a recurring monthly investment "
        f"of **{recurring:,} USD**. The estimated annual returns for each ETF "
        f"are as follows:\n"
    )

    # 8) Return bullets
    for ticker, _ in sorted_etfs:
        ret = ticker_to_ret.get(ticker, 0.0)
        lines.append(f"- {ticker}: {ret:.2f}%")
    lines.append("")

    # 9) Closing sentence
    lines.append(
        f"The growth of the investments is influenced by the compounding effect "
        f"of the recurring investments and the estimated annual returns over the "
        f"{year}-year period."
    )

    return "\n".join(lines)


def clean_ocr_etf_text(text: str) -> str:
    """
    Cleans and formats OCR-parsed ETF text by:
    - Removing excessive newlines and spaces
    - Fixing line-break hyphenations
    - Normalizing whitespace and punctuation
    - Removing wrapping quotes

    A '.' or ',' that sits between two digits (e.g. "0.20%", "1,000") is
    treated as part of a number and is not followed by an inserted space.
    """
    # Remove leading/trailing whitespace and outer quotes if present
    text = text.strip().strip('"').strip("'")

    # Fix hyphenated line breaks (e.g., 'NASDAQ-\n100' -> 'NASDAQ-100')
    text = re.sub(r'-\s*\n\s*', '-', text)

    # Replace remaining line breaks with spaces
    text = re.sub(r'[\n\r]+', ' ', text)

    # Remove excessive spaces
    text = re.sub(r'\s{2,}', ' ', text)

    # Ensure proper spacing after periods, commas, etc. The negative
    # lookahead skips digit-[.,]-digit so decimals and thousands separators
    # stay intact.
    text = re.sub(r'([.,!?])(?!(?<=\d[.,])\d)([^\s])', r'\1 \2', text)

    # Capitalize the first letter if needed
    if text and text[0].islower():
        text = text[0].upper() + text[1:]

    return text.strip()


def lookup_etf_report(tickers, df_analyst_report):
    """
    Look up the cleaned analyst report text for each ticker.

    Parameters:
        tickers (iterable): Ticker symbols to look up.
        df_analyst_report (pd.DataFrame): Must contain the columns 'Ticker'
            and 'description'.

    Returns:
        dict: ticker -> cleaned description. Tickers without a matching row,
        or whose first description is not a string (e.g. NaN), are omitted.
    """
    d_reports = {}
    # Get a value of description column in df_analyst_report by iterating tickers
    for ticker in tickers:
        description = df_analyst_report[
            df_analyst_report['Ticker'] == ticker
        ]['description'].values
        # A missing description is read as NaN (a float), which cannot be cleaned.
        if len(description) > 0 and isinstance(description[0], str):
            d_reports[ticker] = clean_ocr_etf_text(description[0])
    return d_reports


def format_insights_report(d_report: dict) -> str:
    """
    Given a dict mapping ETF tickers to their analysis text,
    returns a Markdown-formatted Insights Report.

    Example output:

    **Insights Report**:

    **QQQ**:
    <report text>

    **SPY**:
    <report text>
    """
    lines = ["**Insights Report**:\n"]
    for ticker, report in d_report.items():
        # Section header for each ticker
        lines.append(f"**{ticker}**: ")
        # The report text (preserve any internal newlines)
        lines.append(report.strip())
        # Blank line between entries
        lines.append("")
    # Join with newlines
    return "\n".join(lines).strip()


def format_etf_search_results(tickers: list[str]) -> str:
    """
    Given a list of ETF tickers, returns a Markdown-formatted string:

    **ETF Search Results**:
    - TICKER1
    - TICKER2
    ...
    - TICKER5
    ... and N more. Check the full results in the table!
    """
    header = "**ETF Search Results**:\n"
    # Take up to 5
    display_list = tickers[:5]
    lines = [header]
    for t in display_list:
        lines.append(f"- {t}")
    remaining = len(tickers) - len(display_list)
    if remaining > 0:
        lines.append(f"... and {remaining} more. Check the full results in the table!")
    return "\n".join(lines)


def format_etf_search_results_inline(tickers: list[str], max_display: int = 5) -> str:
    """
    Given a list of ETF tickers, returns a one-line Markdown summary.

    With more than `max_display` tickers the first `max_display` are named
    and the reader is pointed to the table; otherwise all tickers are listed.
    """
    displayed = tickers[:max_display]
    remaining = len(tickers) - len(displayed)

    # Join the displayed tickers with commas
    tickers_str = ", ".join(displayed)

    if remaining > 0:
        return (
            f"**ETF Search Results**: I've found {tickers_str}, and many more, which I believe align with your interests."
            " Check the full results in the table above!"
        )
    else:
        return f"**ETF Search Results**: {tickers_str}."