Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import re | |
| import streamlit as st | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| from yahooquery import Ticker | |
| from typing import List | |
| from requests.exceptions import RetryError | |
| import time | |
def load_etf_data():
    """
    Load the ETF CSV inputs and return the frames the app works with.

    Reads the general-info, holdings, analyst-report and annual-return CSV
    files (paths are relative to the working directory), renames the
    lowercase ``ticker`` column to ``Ticker``, and left-joins the holdings
    summary (exposed as ``Holdings``) onto the general-info frame.

    Returns:
        tuple: (df_etf, df_analyst_report, available_tickers,
        df_annual_return_master), where df_etf and available_tickers are
        whatever ``set_etf_data`` produces from the merged frame.
    """
    ticker_rename = {'ticker': 'Ticker'}

    general_info = pd.read_csv('etf_general_info_enriched.csv').rename(columns=ticker_rename)
    holdings = pd.read_csv('etf_holdings_summarized.csv').rename(
        columns={'ticker': 'Ticker', 'holdingInformation': 'Holdings'}
    )
    enriched = general_info.merge(holdings, how='left', on='Ticker')
    df_etf, available_tickers = set_etf_data(enriched)

    analyst_reports = pd.read_csv('etf_analyst_report_full.csv')
    annual_returns = pd.read_csv('annual_return.csv').rename(columns=ticker_rename)
    return df_etf, analyst_reports, available_tickers, annual_returns
def set_etf_data(df_src):
    """
    Filter the master ETF frame and build the set of valid ticker symbols.

    Keeps rows whose ``averageVolume`` exceeds 1000, whose
    ``exchangeCountry`` is 'United States', and whose ``categoryName`` is
    present.

    Parameters:
        df_src (pd.DataFrame): Frame with at least the columns 'Ticker',
            'averageVolume', 'exchangeCountry' and 'categoryName'.

    Returns:
        tuple: (filtered DataFrame, set of upper-cased ticker strings).
    """
    is_liquid = df_src['averageVolume'] > 1000
    is_us_listed = df_src['exchangeCountry'] == 'United States'
    df = df_src[is_liquid & is_us_listed].dropna(subset=['categoryName'])

    # A missing ticker (NaN/None) has no .upper(); skip it instead of
    # crashing. NOTE(review): pandas' CSV reader treats literal symbols such
    # as "NA" as missing by default, so such rows may exist - confirm against
    # the input files.
    valid_ticker_set = {str(t).upper() for t in df['Ticker'].dropna().unique()}
    return df, valid_ticker_set
# Build a ticker -> doc_text lookup
def make_doc_text(row):
    """
    Assemble a single descriptive text for one ETF row.

    Each listed attribute of ``row`` is read in order; values that are
    missing (per ``pd.notna``) or blank after stripping are skipped. Labelled
    fields are rendered as "Label: value", unlabelled ones as the bare value,
    and the pieces are joined with ". " so each reads like a sentence.

    Parameters:
        row: Any object exposing the attributes listed below.

    Returns:
        str: The joined description (empty string when nothing is present).
    """
    labelled_fields = (
        (None, "shortName"),
        (None, "longName"),
        ("Issuer", "family"),
        ("Category", "categoryName"),
        ("Type", "legalType"),
        ("Position", "positionType"),
        ("Tags", "otherTags"),
        ("Return", "return_rating_text"),
        ("Risk", "risk_rating_text"),
        ("Expense Ratio", "annualReportExpenseRatio_rating_text"),
        ("Dividend Yield", "dividendYield_rating_text"),
        (None, "longBusinessSummary"),
        ("Holdings", "holdingInformation"),
    )

    pieces = []
    for label, attribute in labelled_fields:
        value = getattr(row, attribute)
        # Skip missing values and strings that are only whitespace.
        if pd.isna(value) or not str(value).strip():
            continue
        pieces.append(str(value) if not label else f"{label}: {value}")
    return ". ".join(pieces)
# Helper: extract and filter ticker spans from tokens + labels
def extract_valid_tickers(tokens, labels, tokenizer, valid_set):
    """
    Collect B-TICKER/I-TICKER token spans and keep those that are valid tickers.

    A "B-TICKER" label opens a new span (closing any open one), "I-TICKER"
    extends the open span, and any other label (or an "I-TICKER" with no
    open span) closes it. Each span is turned back into text with
    ``tokenizer.convert_tokens_to_string``, stripped and upper-cased.

    Parameters:
        tokens: Iterable of tokens.
        labels: Iterable of labels, paired positionally with ``tokens``.
        tokenizer: Object providing ``convert_tokens_to_string(list)``.
        valid_set: Container of accepted upper-case ticker strings.

    Returns:
        list: Valid tickers in order of appearance (duplicates preserved).
    """
    grouped = []
    open_span = []
    for token, tag in zip(tokens, labels):
        if tag == "B-TICKER":
            if open_span:
                grouped.append(open_span)
            open_span = [token]
            continue
        if tag == "I-TICKER" and open_span:
            open_span.append(token)
            continue
        # Any other label ends the span currently being built.
        if open_span:
            grouped.append(open_span)
        open_span = []
    if open_span:
        grouped.append(open_span)

    candidates = (
        tokenizer.convert_tokens_to_string(span).strip().upper()
        for span in grouped
    )
    return [candidate for candidate in candidates if candidate in valid_set]
# Rule-based fallback: catch literal 2-4 char tickers in the text
def rule_fallback(query, valid_set):
    """
    Find stand-alone 2-4 character alphanumeric words that are valid tickers.

    Parameters:
        query (str): Free text to scan.
        valid_set: Container of accepted upper-case ticker strings.

    Returns:
        set: Upper-cased words from ``query`` that appear in ``valid_set``.
    """
    candidates = (
        found.group(0).upper()
        for found in re.finditer(r"\b[A-Za-z0-9]{2,4}\b", query)
    )
    return {candidate for candidate in candidates if candidate in valid_set}
def get_cols_to_display() -> List[str]:
    """
    Return the raw column names selected for the recommendations table.

    The list is: identification/size columns, the seven trailing-return
    columns, rating/position columns, the per-period "<return>_rating_text"
    columns, and finally 'Holdings'.
    """
    periods = ('ytd', 'oneMonth', 'threeMonth', 'oneYear', 'threeYear', 'fiveYear', 'tenYear')
    return_cols = [f'{period}Return' for period in periods]

    leading = [
        'Ticker', 'categoryName', 'annualReportExpenseRatio', 'previousCloseUSD',
        'averageVolumeUSD', 'totalAssetsUSD', 'longName', 'marketCapUSD',
        'dividendYield',
    ]
    ratings = [
        'avg_annual_return', 'return_rating', 'risk_rating', 'positionType',
        'isLeveraged', 'return_rating_text', 'risk_rating_text',
        'annualReportExpenseRatio_rating_text', 'dividendYield_rating_text',
    ]
    return_rating_texts = [f'{name}_rating_text' for name in return_cols]

    return leading + return_cols + ratings + return_rating_texts + ['Holdings']
def rename_etf_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return ``df`` with raw column names replaced by display-friendly names.

    Only raw names that are actually present in ``df`` are renamed; every
    other column is left as it is. The input frame is not modified.
    """
    display_names = (
        ('ticker', 'Ticker'),
        ('categoryName', 'Category'),
        ('annualReportExpenseRatio', 'Expense Ratio'),
        ('previousCloseUSD', 'Prev. Close'),
        ('averageVolumeUSD', 'Avg. Volume'),
        ('totalAssetsUSD', 'Total Assets'),
        ('longName', 'Full Name'),
        ('marketCapUSD', 'Market Cap.'),
        ('dividendYield', 'Dividend Yield'),
        ('ytdReturn', 'YTD Return'),
        ('oneMonthReturn', '1-month Return'),
        ('threeMonthReturn', '3-month Return'),
        ('oneYearReturn', '1-year Return'),
        ('threeYearReturn', '3-year Return'),
        ('fiveYearReturn', '5-year Return'),
        ('tenYearReturn', '10-year Return'),
        ('avg_annual_return', 'Avg. Annual Return %'),
        ('return_rating', 'Avg. Return Rating (1-10)'),
        ('risk_rating', 'Avg. Risk Rating (1-10)'),
        ('positionType', 'Position Type'),
        ('isLeveraged', 'Leveraged'),
        ('return_rating_text', 'Return Rating'),
        ('risk_rating_text', 'Risk Rating'),
        ('annualReportExpenseRatio_rating_text', 'Expense Ratio Rating'),
        ('dividendYield_rating_text', 'Dividend Yield Rating'),
        ('ytdReturn_rating_text', 'YTD Return Rating'),
        ('oneMonthReturn_rating_text', '1-month Return Rating'),
        ('threeMonthReturn_rating_text', '3-month Return Rating'),
        ('oneYearReturn_rating_text', '1-year Return Rating'),
        ('threeYearReturn_rating_text', '3-year Return Rating'),
        ('fiveYearReturn_rating_text', '5-year Return Rating'),
        ('tenYearReturn_rating_text', '10-year Return Rating'),
        ('holdingInformation', 'Holdings'),
    )
    # Restrict the mapping to raw names that exist in this frame.
    present = {raw: pretty for raw, pretty in display_names if raw in df.columns}
    return df.rename(columns=present)
def get_etf_recommendations_from_list(
    list_of_fetched_etfs: List[str],
    df_etf: pd.DataFrame,
    top_n: int
) -> pd.DataFrame:
    """
    Build the display table for a list of fetched ETF tickers.

    Rows of ``df_etf`` whose 'Ticker' is in ``list_of_fetched_etfs`` are
    sorted by 'averageVolumeUSD' (highest first), cut to the first ``top_n``
    rows, reduced to the columns from ``get_cols_to_display()`` and renamed
    with ``rename_etf_columns``.

    Parameters
    ----------
    list_of_fetched_etfs : List[str]
        Ticker symbols to keep.
    df_etf : pd.DataFrame
        Master ETF frame using the raw column names.
    top_n : int
        Maximum number of rows to return.

    Returns
    -------
    pd.DataFrame
        The top_n matching ETFs with display-friendly column names.
    """
    is_fetched = df_etf['Ticker'].isin(list_of_fetched_etfs)
    top_by_volume = (
        df_etf[is_fetched]
        .copy()
        .sort_values(by='averageVolumeUSD', ascending=False)
        .head(top_n)
    )
    return rename_etf_columns(top_by_volume[get_cols_to_display()])
def format_number_short(x):
    """
    Format one number with a K/M/B/T magnitude suffix and two decimals.

    Parameters:
        x (float or int): The number to format.

    Returns:
        str or float: "123.45", "1.50K", "2.50M", "3.20B" or "4.00T" style
        text; a missing value (per ``pd.isna``) is returned unchanged.
    """
    if pd.isna(x):
        return x

    # Compare on the absolute value so negative numbers get the same suffix.
    magnitude = abs(x)
    if magnitude < 1e3:
        return f"{x:.2f}"

    scales = ((1e6, 1e3, "K"), (1e9, 1e6, "M"), (1e12, 1e9, "B"))
    for ceiling, divisor, suffix in scales:
        if magnitude < ceiling:
            return f"{x / divisor:.2f}{suffix}"

    # Everything from one trillion upwards is expressed in trillions.
    return f"{x / 1e12:.2f}T"
def transform_number_columns(df, columns):
    """
    Return a copy of ``df`` with the given columns rendered in short form.

    Every value of each listed column that exists in the frame is passed
    through ``format_number_short`` (K/M/B/T suffixes, missing values kept).
    Column names that are not in the frame are ignored.

    Parameters:
        df (pd.DataFrame): The input DataFrame (left unmodified).
        columns (list): Column names to transform.

    Returns:
        pd.DataFrame: The transformed copy.
    """
    result = df.copy()
    present = (name for name in columns if name in result.columns)
    for name in present:
        result[name] = result[name].apply(format_number_short)
    return result
def transform_float_columns_to_perc(df, columns):
    """
    Return a copy of ``df`` with the given columns rendered as percentages.

    Each non-missing value in a listed column is multiplied by 100 and
    formatted with two decimals and a trailing '%' (0.1234 -> "12.34%").
    Missing values are kept, and column names not in the frame are ignored.

    Parameters:
        df (pd.DataFrame): The input DataFrame (left unmodified).
        columns (list): Column names to transform.

    Returns:
        pd.DataFrame: The transformed copy.
    """
    def _as_percent(value):
        if pd.notna(value):
            return f"{value * 100:.2f}%"
        return value

    converted = df.copy()
    for name in columns:
        if name not in converted.columns:
            continue
        converted[name] = converted[name].apply(_as_percent)
    return converted
def overview_df(df_recommendations, drop_relavance_score=True):
    """
    Build the "Overview" table from the recommendations frame.

    Keeps whichever overview columns exist in ``df_recommendations`` and
    renders 'Total Assets' and 'Market Cap.' in short K/M/B/T form.

    Parameters:
        df_recommendations (pd.DataFrame): Frame with display column names.
        drop_relavance_score (bool): Currently unused; kept so existing
            callers keep working.

    Returns:
        pd.DataFrame: The overview table.
    """
    wanted = (
        "Leveraged", "Ticker", "Full Name", 'Category', 'Country',
        'Total Assets', "Prev. Close", "Avg. Volume", 'Market Cap.',
    )
    available = [name for name in wanted if name in df_recommendations.columns]
    return transform_number_columns(
        df_recommendations[available], ['Total Assets', 'Market Cap.']
    )
def transform_return_columns(df, cols=None):
    """
    Return a copy of ``df`` with return columns rendered as percent strings.

    Each non-missing value in a selected column is multiplied by 100 and
    formatted with two decimals and a trailing '%'. Missing values are kept.

    Parameters:
        df (pd.DataFrame): The input DataFrame (left unmodified).
        cols (list, optional): Explicit column names to transform; names not
            in the frame are ignored. When None (the default), every column
            whose string name ends with 'Return' is transformed.

    Returns:
        pd.DataFrame: The transformed copy.
    """
    df_transformed = df.copy()

    if cols is None:
        # Non-string labels (e.g. integer column names) have no .endswith,
        # so they are skipped rather than raising.
        targets = [
            col for col in df_transformed.columns
            if isinstance(col, str) and col.endswith('Return')
        ]
    else:
        targets = [col for col in cols if col in df_transformed.columns]

    for col in targets:
        df_transformed[col] = df_transformed[col].apply(
            lambda x: f"{x * 100:.2f}%" if pd.notna(x) else x
        )
    return df_transformed
def return_df(df_recommendations):
    """
    Build the "Returns" table from the recommendations frame.

    Keeps whichever identification and trailing-return columns exist in
    ``df_recommendations`` and passes them through
    ``transform_return_columns``.
    """
    wanted = (
        "Ticker", "Full Name", 'Category',
        "YTD Return", "1-month Return", "3-month Return",
        "1-year Return", "3-year Return", "5-year Return", "10-year Return",
    )
    available = [name for name in wanted if name in df_recommendations.columns]
    return transform_return_columns(df_recommendations[available])
def clean_ratings_columns(df):
    """
    Reduce free-text rating columns to a single keyword.

    For each known rating column present in ``df``, every value is replaced
    by the first of 'High', 'Moderate', 'Low' (checked in that order) that
    occurs in its text, or by '' when none occurs (including missing values).

    The input frame is not modified; a cleaned copy is returned, in line
    with the other transform helpers in this module. Working on a copy also
    avoids writing strings into a slice of another frame.

    Parameters:
        df (pd.DataFrame): Frame that may contain rating columns.

    Returns:
        pd.DataFrame: A copy of ``df`` with the rating columns cleaned.
    """
    rating_cols = (
        'YTD Return Rating', '1-month Return Rating', '3-month Return Rating',
        '1-year Return Rating', '3-year Return Rating', '5-year Return Rating',
        '10-year Return Rating', 'Expense Ratio Rating', 'Dividend Yield Rating',
    )
    strings_to_keep = ('High', 'Moderate', 'Low')

    def _first_keyword(value):
        # Stringify each value explicitly so missing values and non-string
        # cells are handled the same way regardless of the column dtype.
        text = str(value)
        return next((s for s in strings_to_keep if s in text), '')

    cleaned = df.copy()
    for col in rating_cols:
        if col in cleaned.columns:
            cleaned[col] = cleaned[col].apply(_first_keyword)
    return cleaned
def rating_df(df_recommendations):
    """
    Build the "Ratings" table from the recommendations frame.

    Keeps whichever rating columns exist in ``df_recommendations`` and
    passes them through ``clean_ratings_columns``.
    """
    # NOTE(review): 'Avg. Return Rating' and 'Avg. Risk Rating' are not names
    # produced by rename_etf_columns in this file (it emits 'Return Rating'
    # and 'Risk Rating'), so they are presumably never present - confirm.
    wanted = (
        "Ticker", "Full Name", 'Category',
        "Avg. Return Rating (1-10)", "Avg. Risk Rating (1-10)",
        'Avg. Return Rating', 'Avg. Risk Rating',
        'YTD Return Rating', '1-month Return Rating', '3-month Return Rating',
        '1-year Return Rating', '3-year Return Rating', '5-year Return Rating',
        '10-year Return Rating',
    )
    available = [name for name in wanted if name in df_recommendations.columns]
    return clean_ratings_columns(df_recommendations[available])
def expense_ratio_df(df_recommendations):
    """
    Build the "Expenses" table from the recommendations frame.

    Keeps whichever expense columns exist, renders 'Total Assets' in short
    K/M/B/T form, 'Expense Ratio' as a percentage, and reduces the rating
    text via ``clean_ratings_columns``.
    """
    wanted = (
        "Ticker", "Full Name", "Category",
        'Total Assets', 'Expense Ratio', 'Expense Ratio Rating',
    )
    available = [name for name in wanted if name in df_recommendations.columns]

    table = df_recommendations[available]
    table = transform_number_columns(table, ['Total Assets'])
    table = transform_float_columns_to_perc(table, columns=['Expense Ratio'])
    return clean_ratings_columns(table)
def holdings_df(df_recommendations):
    """
    Build the "Holdings" table: the identification columns plus 'Holdings',
    limited to those that exist in ``df_recommendations``.
    """
    wanted = ("Ticker", "Full Name", "Category", "Holdings")
    present = df_recommendations.columns
    available = [name for name in wanted if name in present]
    return df_recommendations[available]
def dividend_df(df_recommendations):
    """
    Build the "Dividends" table from the recommendations frame.

    Keeps whichever dividend columns exist, reduces the rating text via
    ``clean_ratings_columns`` and renders 'Dividend Yield' as a percentage.
    """
    wanted = ("Ticker", "Full Name", "Category", "Dividend Yield", "Dividend Yield Rating")
    available = [name for name in wanted if name in df_recommendations.columns]

    table = clean_ratings_columns(df_recommendations[available])
    return transform_float_columns_to_perc(table, columns=['Dividend Yield'])
def display_matching_etfs(df_recommendations):
    """
    Render the recommendations as a set of Streamlit tabs.

    Nothing is rendered for an empty frame. Otherwise one tab per view is
    created, and each tab shows the table from its builder function with the
    index hidden.

    Returns:
        None
    """
    if df_recommendations.empty:
        return

    # (tab title, function that builds the table shown in that tab)
    views = (
        ("Overview", overview_df),
        ("Returns", return_df),
        ("Ratings", rating_df),
        ('Holdings', holdings_df),
        ('Expenses', expense_ratio_df),
        ('Dividends', dividend_df),
    )
    tabs = st.tabs([title for title, _ in views])
    for tab, (_, build_table) in zip(tabs, views):
        with tab:
            st.dataframe(build_table(df_recommendations), hide_index=True)
    return
def _figure_with_title(title: str) -> go.Figure:
    """Return an empty Plotly figure whose title carries a status message."""
    fig = go.Figure()
    fig.update_layout(title=title)
    return fig


def compare_etfs_interactive(etf_a: str, etf_b: str,
                             max_retries: int = 5,
                             initial_delay: float = 1.0) -> go.Figure:
    """
    Plot the 5-year percentage price change of two ETFs against each other.

    Daily history for both tickers is requested through yahooquery, each
    series is expressed as percent change from its first shared date, and
    both are drawn as lines on one Plotly figure.

    A request that raises ``RetryError`` is retried with exponential
    back-off; any other exception ends the attempt immediately. In every
    failure case (including no overlapping price data) an empty figure
    whose title explains the problem is returned instead of raising.

    Parameters:
        etf_a, etf_b: ETF tickers.
        max_retries: Number of attempts before giving up (at least one
            attempt is always made).
        initial_delay: Seconds to wait before the first retry (doubles each
            time).

    Returns:
        plotly.graph_objects.Figure
    """
    end_date = datetime.today()
    start_date = end_date - timedelta(days=5 * 365)

    # 1) Fetch data with retries. Clamp to one attempt so that a
    #    non-positive max_retries cannot skip the loop and leave df_full
    #    undefined.
    attempts = max(1, max_retries)
    delay = initial_delay
    for attempt in range(attempts):
        try:
            tickers = Ticker(f"{etf_a} {etf_b}", asynchronous=True)
            df_full = tickers.history(period="5y", interval="1d").reset_index()
            break
        except RetryError:
            if attempt < attempts - 1:
                time.sleep(delay)
                delay *= 2
            else:
                # final failure
                return _figure_with_title(
                    "Data fetch failed after multiple attempts. Please try again later."
                )
        except Exception as e:
            return _figure_with_title(f"Error fetching data: {e}")

    # 2) Split & merge on the dates both tickers have in common
    df_a = (
        df_full[df_full.symbol == etf_a]
        .rename(columns={"adjclose": "Adj Close A"})[["date", "Adj Close A"]]
    )
    df_b = (
        df_full[df_full.symbol == etf_b]
        .rename(columns={"adjclose": "Adj Close B"})[["date", "Adj Close B"]]
    )
    df = pd.merge(df_a, df_b, on="date", how="inner").set_index("date")

    # Without shared rows there is no starting price to normalise against
    # (.iloc[0] would raise IndexError), so report it on the figure instead.
    if df.empty:
        return _figure_with_title(
            f"No overlapping price history found for {etf_a} and {etf_b}."
        )

    # 3) Compute % changes relative to the first shared date
    df["Pct Change A"] = (df["Adj Close A"] / df["Adj Close A"].iloc[0] - 1) * 100
    df["Pct Change B"] = (df["Adj Close B"] / df["Adj Close B"].iloc[0] - 1) * 100

    # 4) Build figure
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df.index, y=df["Pct Change A"], mode="lines", name=etf_a))
    fig.add_trace(go.Scatter(
        x=df.index, y=df["Pct Change B"], mode="lines", name=etf_b))
    fig.update_layout(
        title=f"5-Year Performance Comparison: {etf_a} vs. {etf_b}",
        xaxis_title="Date",
        yaxis_title="Percentage Change (%)",
        hovermode="x unified"
    )
    fig.update_xaxes(range=[start_date, end_date])
    return fig
def trim_to_last_full_sentence(text: str) -> str:
    """
    Drop a trailing incomplete sentence from ``text``.

    Text that already ends in '.', '!' or '?' (ignoring trailing whitespace)
    is returned unchanged. Otherwise the text is split where sentence
    punctuation is followed by whitespace and an uppercase letter; the last
    fragment is discarded and the rest is re-joined with single spaces. If
    no such boundary exists, the text is returned unchanged.
    """
    stripped = text.rstrip()
    if stripped and stripped[-1] in ('.', '!', '?'):
        return text

    # Sentence boundary: punctuation + whitespace + uppercase letter.
    sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
    if len(sentences) == 1:
        return text

    complete = sentences[:-1]
    return ' '.join(complete).strip()
def form_display_comparison_table(df_etf, list_of_parsed_tickers):
    """
    Build the side-by-side comparison table for the given tickers.

    Selects the rows of ``df_etf`` whose 'Ticker' is in
    ``list_of_parsed_tickers``, keeps the comparison columns, renames them
    for display, and formats returns and the expense ratio as percentages
    and the average volume in short K/M/B/T form.
    """
    # raw column name -> display name (insertion order is the column order)
    pretty_names = {
        'Ticker': 'Ticker',
        'longName': 'Full Name',
        'categoryName': 'Category',
        'previousCloseUSD': 'Prev. Close',
        'averageVolumeUSD': 'Avg. Volume',
        'return_rating': 'Return Rating (1-10)',
        'risk_rating': 'Risk Rating (1-10)',
        'ytdReturn': 'YTD Return',
        'oneMonthReturn': '1-month Return',
        'threeMonthReturn': '3-month Return',
        'oneYearReturn': '1-year Return',
        'threeYearReturn': '3-year Return',
        'fiveYearReturn': '5-year Return',
        'tenYearReturn': '10-year Return',
        'annualReportExpenseRatio': 'Expense Ratio',
    }

    is_requested = df_etf['Ticker'].isin(list_of_parsed_tickers)
    table = df_etf[is_requested][list(pretty_names)].rename(columns=pretty_names)

    table = transform_return_columns(table)
    table = transform_float_columns_to_perc(table, columns=['Expense Ratio'])
    return transform_number_columns(table, ['Avg. Volume'])
def portfolio_interactive_chart(df_port_output):
    """
    Draw the projected portfolio growth as a Plotly line chart.

    Every column other than 'year' and 'Total' becomes its own solid line
    over the 'year' column; 'Total' is added last as a dashed line.

    Parameters:
        df_port_output (pd.DataFrame): Frame with a 'year' column, a 'Total'
            column and one value column per asset.

    Returns:
        plotly.graph_objects.Figure
    """
    years = df_port_output["year"]
    asset_names = [
        name for name in df_port_output.columns if name not in ("year", "Total")
    ]

    fig = go.Figure()
    for name in asset_names:
        fig.add_trace(
            go.Scatter(x=years, y=df_port_output[name], mode='lines', name=name)
        )

    # The aggregate line is dashed to set it apart from the per-asset lines.
    fig.add_trace(
        go.Scatter(
            x=years,
            y=df_port_output["Total"],
            mode='lines',
            name="Total",
            line=dict(dash='dash'),
        )
    )
    fig.update_layout(
        title="Portfolio Growth Over Time",
        xaxis_title="Year",
        yaxis_title="Portfolio Value (USD)",
        hovermode='x unified',
    )
    return fig
def set_estimated_return(tickers, df_general_info, df_annual_return):
    """
    Estimate an annual return for each ticker.

    For every ticker:
      1. The trailing returns ('oneYearReturn', 'threeYearReturn',
         'fiveYearReturn', 'tenYearReturn') are read from
         ``df_general_info``; missing values count as 0 and the mean of the
         non-zero values is taken (0 when there are none).
      2. The fallback is the mean of 'fundReturn' in ``df_annual_return``,
         or, if that is missing, the mean of 'categoryReturn', or 0.
      3. The trailing mean is used unless it is 0, in which case the
         fallback is used.

    Parameters:
        tickers (iterable): Ticker symbols to process.
        df_general_info (pd.DataFrame): Frame with 'Ticker' and the trailing
            return columns.
        df_annual_return (pd.DataFrame): Frame with 'Ticker', 'fundReturn'
            and 'categoryReturn'.

    Returns:
        tuple: (DataFrame with columns 'etf' and 'estimated_annual_return',
        the same data as produced by ``DataFrame.to_dict()``).
    """
    trailing_cols = ['oneYearReturn', 'threeYearReturn', 'fiveYearReturn', 'tenYearReturn']

    def _trailing_mean(ticker):
        # Mean of the non-zero trailing returns; NaN is treated as zero.
        raw = df_general_info[df_general_info['Ticker'] == ticker][trailing_cols].values
        filled = np.nan_to_num(raw, nan=0)
        nonzero = filled[filled != 0]
        return np.mean(nonzero) if len(nonzero) > 0 else 0

    def _annual_fallback(ticker):
        # Fund-level average first, then category-level, then zero.
        fund_mean = df_annual_return[df_annual_return['Ticker'] == ticker]['fundReturn'].mean()
        if not pd.isnull(fund_mean):
            return fund_mean
        category_mean = df_annual_return[df_annual_return['Ticker'] == ticker]['categoryReturn'].mean()
        return 0 if pd.isnull(category_mean) else category_mean

    names, estimates = [], []
    for ticker in tickers:
        trailing = _trailing_mean(ticker)
        fallback = _annual_fallback(ticker)
        estimates.append(fallback if trailing == 0 else trailing)
        names.append(ticker)

    df = pd.DataFrame({'etf': names, 'estimated_annual_return': estimates})
    return df, df.to_dict()
def form_d_chat_history(result_id, response, task, fig=None, df=None, query=None):
    """
    Bundle one chat result into a dict with the keys 'id', 'task',
    'response', 'fig', 'df' and 'query' (in that order).
    """
    return dict(
        id=result_id,
        task=task,
        response=response,
        fig=fig,
        df=df,
        query=query,
    )
def portfolio_growth_over_time(df, target_years=30):
    """
    Project the yearly value of each asset and of the whole portfolio.

    The DataFrame should have columns:
      - 'etf'
      - 'initial_investment'
      - 'estimated_annual_return' (a percentage string like "10%" or a decimal)
      - 'amount_of_recurring_investments'

    The annual return is divided by 12 and compounded monthly. The recurring
    amount is added through the annuity formula, or as a plain sum when the
    monthly rate is zero.

    Parameters:
        df (pd.DataFrame): Input DataFrame with asset details.
        target_years (int): Number of years to project (default is 30).

    Returns:
        tuple: (DataFrame with a 'year' column, one column per asset and a
        'Total' column; dict of that frame's last row).
    """
    years = np.arange(0, target_years + 1)  # yearly steps 0..target_years
    portfolio_data = pd.DataFrame({'year': years})

    def _value_after(months, principal, contribution, monthly_rate):
        # Compounded principal plus the future value of the contributions.
        growth = (1 + monthly_rate) ** months
        fv_initial = principal * growth
        if monthly_rate != 0:
            fv_contrib = contribution * ((growth - 1) / monthly_rate)
        else:
            fv_contrib = contribution * months
        return fv_initial + fv_contrib

    for _, asset in df.iterrows():
        name = asset['etf']
        principal = asset['initial_investment']
        contribution = asset['amount_of_recurring_investments']
        annual_rate = asset['estimated_annual_return']

        # Accept "10%"-style strings as well as plain decimals.
        if isinstance(annual_rate, str) and '%' in annual_rate:
            annual_rate = float(annual_rate.strip('%')) / 100.0
        monthly_rate = annual_rate / 12

        portfolio_data[name] = [
            _value_after(int(year * 12), principal, contribution, monthly_rate)
            for year in years
        ]

    # Total portfolio value is the row-wise sum over the asset columns.
    asset_columns = df['etf'].tolist()
    portfolio_data['Total'] = portfolio_data[asset_columns].sum(axis=1)

    final_row = portfolio_data.iloc[-1].to_dict()
    return portfolio_data, final_row
def run_portfolio_analysis(list_of_parsed_tickers, df_etf, df_annual_return_master,
                           target_years=30, init_investment=1000, recur_monthly=100):
    """
    Project the growth of an equally funded portfolio of the given ETFs.

    Each distinct ticker receives the same initial investment and the same
    recurring monthly investment. Its return comes from
    ``set_estimated_return`` (tickers without an estimate get 0), and the
    projection is produced by ``portfolio_growth_over_time``.

    Parameters:
        list_of_parsed_tickers (iterable): Ticker symbols; duplicates are
            collapsed, keeping the first occurrence.
        df_etf (pd.DataFrame): General ETF info including trailing returns.
        df_annual_return_master (pd.DataFrame): Annual return information.
        target_years (int): Projection horizon in years (default 30).
        init_investment (float): Initial investment per ETF (default 1000).
        recur_monthly (float): Recurring monthly investment per ETF
            (default 100).

    Returns:
        tuple: (projection DataFrame, summary dict made of the projection's
        last row plus 'initial_investment_on_each_etf',
        'recurring_monthly_investment' and 'estimated_annual_return').
    """
    # A repeated ticker would appear on both sides of the merge below,
    # multiplying its rows and inflating the 'Total' column, so de-duplicate
    # first while keeping the original order.
    tickers = list(dict.fromkeys(list_of_parsed_tickers))

    df_port_input = pd.DataFrame({
        'etf': tickers,
        'initial_investment': [init_investment] * len(tickers),
        'amount_of_recurring_investments': [recur_monthly] * len(tickers),
    })
    df_est_return, d_est_return = set_estimated_return(
        tickers=tickers,
        df_general_info=df_etf,
        df_annual_return=df_annual_return_master,
    )
    df_port_input = df_port_input.merge(df_est_return, how='left', on='etf').fillna(0)

    df_port_output, d_summary = portfolio_growth_over_time(
        df=df_port_input, target_years=target_years
    )
    d_summary['initial_investment_on_each_etf'] = init_investment
    d_summary['recurring_monthly_investment'] = recur_monthly
    d_summary['estimated_annual_return'] = d_est_return
    return df_port_output, d_summary
def format_portfolio_summary(d_summary: dict) -> str:
    """
    Render the portfolio projection summary as Markdown.

    Expected keys of ``d_summary``:
      - 'year'
      - one key per ETF holding its final value
      - 'Total'
      - 'initial_investment_on_each_etf'
      - 'recurring_monthly_investment'
      - 'estimated_annual_return': {
            'etf': { idx: ticker, ... },
            'estimated_annual_return': { idx: float_return, ... }
        }

    ETFs are listed by final value, highest first, once with their final
    amounts and once with their estimated annual return in percent (0.00%
    when no estimate is available).

    Returns:
        str: The Markdown text, lines joined with newlines.
    """
    # Core numbers
    horizon = int(d_summary.get("year", 0))
    per_etf_initial = d_summary.get("initial_investment_on_each_etf", 0)
    monthly = d_summary.get("recurring_monthly_investment", 0)
    portfolio_total = d_summary.get("Total", 0)

    # Every key that is not bookkeeping is treated as an ETF final value.
    reserved = {
        "year", "Total",
        "initial_investment_on_each_etf",
        "recurring_monthly_investment",
        "estimated_annual_return",
    }
    ranked = sorted(
        ((name, value) for name, value in d_summary.items() if name not in reserved),
        key=lambda pair: pair[1],
        reverse=True,
    )

    # The estimates arrive as two index-keyed dicts; pair them into
    # ticker -> percent.
    estimates = d_summary.get("estimated_annual_return", {})
    tickers_by_idx = estimates.get("etf", {})
    returns_by_idx = estimates.get("estimated_annual_return", {})
    pct_by_ticker = {
        tickers_by_idx[i]: returns_by_idx[i] * 100
        for i in tickers_by_idx
        if i in returns_by_idx
    }

    value_bullets = [f"- {ticker}: {val:,.0f} USD" for ticker, val in ranked]
    return_bullets = [
        f"- {ticker}: {pct_by_ticker.get(ticker, 0.0):.2f}%" for ticker, _ in ranked
    ]

    sections = [
        "**Portfolio Summary**:\n",
        f"After **a {horizon}-year** projection, the final amounts for each ETF in your portfolio are as follows:\n",
        *value_bullets,
        f"- **Total Portfolio Value: {portfolio_total:,.0f} USD**\n",
        (
            f"These amounts are calculated based on an initial investment of "
            f"**{per_etf_initial:,} USD** for each ETF, with a recurring monthly investment "
            f"of **{monthly:,} USD**. The estimated annual returns for each ETF "
            f"are as follows:\n"
        ),
        *return_bullets,
        "",
        (
            f"The growth of the investments is influenced by the compounding effect "
            f"of the recurring investments and the estimated annual returns over the "
            f"{horizon}-year period."
        ),
    ]
    return "\n".join(sections)
def clean_ocr_etf_text(text: str) -> str:
    """
    Clean and format OCR-parsed ETF text.

    Steps, in order:
      - strip surrounding whitespace and wrapping quotes
      - re-join words hyphenated across a line break ('NASDAQ-\\n100')
      - turn remaining line breaks into spaces and collapse repeated spaces
      - insert a space after '.', ',', '!' or '?' when the next character is
        not whitespace, except when the mark sits between two digits, so
        decimals ("0.09"), thousands separators ("1,000") and dotted version
        numbers are left intact
      - capitalize the first character if it is lowercase

    Parameters:
        text (str): Raw OCR text.

    Returns:
        str: The cleaned text.
    """
    def _pad_after_punctuation(match):
        punct, following = match.group(1), match.group(2)
        # match.string is the text being scanned, so the character just
        # before the punctuation mark can be inspected directly.
        preceding = match.string[match.start() - 1] if match.start() > 0 else ""
        if preceding.isdigit() and following.isdigit():
            return match.group(0)
        return f"{punct} {following}"

    # Remove leading/trailing whitespace and outer quotes if present
    text = text.strip().strip('"').strip("'")
    # Fix hyphenated line breaks (e.g., 'NASDAQ-\n100' -> 'NASDAQ-100')
    text = re.sub(r'-\s*\n\s*', '-', text)
    # Replace remaining line breaks with spaces
    text = re.sub(r'[\n\r]+', ' ', text)
    # Remove excessive spaces
    text = re.sub(r'\s{2,}', ' ', text)
    # Ensure proper spacing after periods, commas, etc. (numbers excepted)
    text = re.sub(r'([.,!?])([^\s])', _pad_after_punctuation, text)
    # Capitalize the first letter if needed
    if text and text[0].islower():
        text = text[0].upper() + text[1:]
    return text.strip()
def lookup_etf_report(tickers, df_analyst_report):
    """
    Collect the cleaned analyst report text for each requested ticker.

    For every ticker, the 'description' of the first matching row in
    ``df_analyst_report`` is cleaned with ``clean_ocr_etf_text``. Tickers
    with no matching row, or whose description is missing, are left out of
    the result.

    Parameters:
        tickers (iterable): Ticker symbols to look up.
        df_analyst_report (pd.DataFrame): Frame with 'Ticker' and
            'description' columns.

    Returns:
        dict: Mapping of ticker to cleaned report text.
    """
    d_reports = {}
    for ticker in tickers:
        matches = df_analyst_report[df_analyst_report['Ticker'] == ticker]['description'].values
        if len(matches) == 0:
            continue
        description = matches[0]
        # A missing description is NaN/None rather than text and cannot be
        # cleaned; treat it the same as "no report available".
        if not isinstance(description, str):
            continue
        d_reports[ticker] = clean_ocr_etf_text(description)
    return d_reports
def format_insights_report(d_report: dict) -> str:
    """
    Render a ticker -> analysis-text mapping as a Markdown Insights Report.

    The output starts with the "**Insights Report**:" header followed by a
    blank line. Each ticker then gets a "**TICKER**: " line, its report
    text (stripped of surrounding whitespace) on the next line, and a blank
    line. The final result is stripped of leading and trailing whitespace.
    """
    sections = ["**Insights Report**:\n"]
    for ticker, report in d_report.items():
        # header line, report body, then a blank separator line
        sections.extend((f"**{ticker}**: ", report.strip(), ""))
    markdown = "\n".join(sections)
    return markdown.strip()
def format_etf_search_results(tickers: list[str], max_display: int = 5) -> str:
    """
    Render a list of ETF tickers as a Markdown bullet list.

    Output shape:
        **ETF Search Results**:

        - TICKER1
        - TICKER2
        ...
        ... and N more. Check the full results in the table!

    Parameters:
        tickers (list[str]): Tickers to list.
        max_display (int): Maximum number of tickers shown as bullets
            (default 5, expected to be non-negative). This mirrors the
            parameter of ``format_etf_search_results_inline``.

    Returns:
        str: The Markdown text; the "... and N more" line is appended only
        when some tickers were left out.
    """
    header = "**ETF Search Results**:\n"
    display_list = tickers[:max_display]

    lines = [header]
    lines.extend(f"- {t}" for t in display_list)

    remaining = len(tickers) - len(display_list)
    if remaining > 0:
        lines.append(f"... and {remaining} more. Check the full results in the table!")
    return "\n".join(lines)
def format_etf_search_results_inline(tickers: list[str], max_display: int = 5) -> str:
    """
    Render a list of ETF tickers as a one-line Markdown summary.

    At most ``max_display`` tickers are named, comma-separated. When more
    tickers exist than are shown, the sentence says so and points to the
    results table; otherwise only the named tickers are listed.
    """
    shown = tickers[:max_display]
    shown_text = ", ".join(shown)
    hidden_count = len(tickers) - len(shown)

    if hidden_count <= 0:
        return f"**ETF Search Results**: {shown_text}."

    return (
        f"**ETF Search Results**: I've found {shown_text}, and many more, which I believe align with your interests."
        " Check the full results in the table above!"
    )