'''
Example 9 for using yfinance

Calculate annual, trailing, cumulative, and CAGR returns for multiple stocks.
  * The start date can be an arbitrary date. The default is the current date.
  * Annual returns are displayed from the default current day, or from an arbitrary given day
    (except for Feb 29 in a leap year).
    For leap years, Feb 28 replaces Feb 29 as a simplification and approximation.
  * Trailing and cumulative returns are currently displayed from the month boundary
    (last day of the month) prior to the given date.
  * However, trailing and cumulative returns can be displayed from any date, not only from a
    month boundary (last day of the month), by a minor change in the function
    "calculation_response": set
        calculation_end_date_for_others_str = calculation_end_date_str

Author: Gang Luo

yfinance References:
  code: https://github.com/ranaroussi/yfinance
  project: https://pypi.org/project/yfinance/
  Guide: https://algotrading101.com/learn/yfinance-guide/

Revision history:
  2025-02.23.1444: fixing issues of missing "Adj Close" in yf.download and yf.Ticker("AAPL"),
    caused by (https://github.com/ranaroussi/yfinance/issues/2283),
    which was introduced by yfinance version 0.2.54 (released on Feb 18, 2025).
  2025-02.23.1655: further fix for the issues from (https://github.com/ranaroussi/yfinance/issues/2283).
    The "Adj Close" column is missing from yf.download since the yf.download default changed
    from auto_adjust=False to auto_adjust=True.
    When auto_adjust=True, the Close column is actually Adj Close and the Adj Close column no longer exists.
    The "Adj Close" column is also missing when using
        ticker = yf.Ticker("AAPL")
        data = ticker.history(period="1y")
    Fix 1: To fix the issue in the function stock_prices_df, auto_adjust=False is passed explicitly
    to the download function to get back the Adj Close column.
    Fix 2: The function "get_yearly_single_stock_data" in part 6 is broken due to the missing
    "Adj Close" column from ticker = yf.Ticker() and ticker.history().
    Add auto_adjust=False, as in ticker.history(..., auto_adjust=False), to fix the issue.
    However, after the fix, the following line in part 6 has an error:
        complete_history = complete_history.merge(dld_history, how='left', left_index=True, right_index=True)
    The root cause is that the columns of dld_history are a MultiIndex(, names=['Price', 'Ticker']).
    However, each price column such as 'Close','AdjClose' has only a single level, with Ticker being
    the column index name.
    Dropping the column MultiIndex level ('Ticker') fixed the issue
        (dld_history.columns = dld_history.columns.droplevel(1))

        print("\n===== DataFrame Structure Information for debug =====")
        print("Index Levels:", dld_history.index.names) # Shows the index levels
        print("Index:", dld_history.index) # Shows the actual index
        print("Columns:", dld_history.columns) # Shows column names
        print("Data Types:\n", dld_history.dtypes) # Shows data types of each column
        print("Shape (Rows, Columns):", dld_history.shape) # Shows the shape of the DataFrame
  2025-02.23.2000: Add the test cases for unit testing of part 1,2,3,4.
    Comment out part 5, which is not used, for better performance.
  2025-02.23.2040: Fix the date errors
  2025-02.24.2200: updated unit test cases
  2025-02.25.0027: make output fonts smaller than the default font size using font 14,
    by using css in gradio.app, provided by deepseek R1
  2025-02.25.0028: years_list = [1, 2, 3, 4,5,6,7,8,9,10,11,12,13,14,15, 20, 25, 30, 40, 50, 60]
'''
script_version = 'version: (2025-02.25.0028)'

import gradio as gr
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import pytz

DEBUG_ENABLED = True

#==============================================================================
print_yearly_total_return = True
num_years_calculation=52 # total years for calculation

# Define a list of years to calculate the trailing returns, cumulative returns, and so on
# remove the current year's row since it is not a full year.
#years_list = [1, 2, 3, 5, 10, 15, 20, 25, 30, 40, 50, 60]
years_list = [1, 2, 3, 4,5,6,7,8,9,10,11,12,13,14,15, 20, 25, 30, 40, 50, 60]

# Set the stock tickers list
tickers_lists = [
    ["qqq","hxq.to","spy", "vfv.to","xiu.to", "xbb.to","xcb.to","xhb.to"], #0 checking ETF
    ["qqq","spy", "vfv.to", "vgg.to", "zlu.to", "xiu.to","zlb.to","vdy.to", "xfn.to", "ry.to", "td.to", "na.to",
     "slf.to", "gwo.to", "bce.to", "t.to", "rci-b.to", "enb.to", "trp.to","cp.to"], #1 main monitoring list
    ["xiu.to", "xfn.to", "na.to","ry.to", "bmo.to","bns.to", "td.to", "cm.to", "cwb.to", "slf.to", "gwo.to",
     "bce.to", "t.to", "rci-b.to", "enb.to", "trp.to", "vdy.to","xdv.to","cdz.to","xdiv.to", "zeb.to"], #2 financial ETF & stocks
    ["spy","qqq","tqqq","mags","msft","AAPL","goog","AMZN","NVDA","meta","tsla","BRK-A","shop.to","hxq.to"], #3 US mega stocks + risky Shopify
    ["^DJI","dia","^GSPC","spy","voo","ivv", "tpu-u.to","vfv.to", "zsp.to","hxs.to","tpu.to","xus.to", "xsp.to",
     "^IXIC","^ndx", "qqq","hxq.to","^GSPTSE","xic.to","xiu.to", "HXT.TO", "TTP.TO","ZCN.TO", "xfn.to", "xit.to"], #4 indexes and index ETFs
    ["dia","^DJI","^GSPC","spy","vfv.to", "zsp.to","hxs.to","xus.to",
     "xsp.to", "^IXIC","qqq","hxq.to","^GSPTSE","xic.to","xiu.to", "HXT.TO", "xfn.to"], #5 indexes and typical index ETFs
    ["^IXIC","^ndx","ONEQ","CIBR","QQJG", "qqq", "tqqq", "spy", "vfv.to", "HXQ.to", "ZQQ.to", "XQQ.to", "QQC.to",
     "ZNQ.TO", "xiu.to", "xit.to"], #6 Nasdaq ETF and TSX IT ETF
    ["qqq","tqqq","sqqq", "QLD", "spy", "spxu", "upro", "sso", "spxl","tecl"], #7 leveraged ETFs
    ["^IXIC","^DJI","^GSPC","^GSPTSE"], #8 testing
    ["vfv.to","spy"] #9 testing
]

#==============================================================================
# Part 1:
# retrieve daily adjusted close prices of a list of tickers from yahoo finance
# Generate the year-end adjusted close prices
# return year-end adjusted close prices, and daily adjusted close prices
def stock_prices_df(tickers_list, end_date_str):
    tickers_list_upper = [ticker.upper() for ticker in tickers_list]
    tickers_str = ", ".join(tickers_list_upper)

    # 'try' statement for handling exceptions raised by yf.download
    try:
        # Download the historical data, see 2025-02.23.1655 revision note
        data = yf.download(tickers_str, period="max", auto_adjust=False)
        # default changed to auto_adjust=True at yfinance version 0.2.54,
        # when auto_adjust=True, Close = Adj Close and Adj Close does not exist
    except Exception:
        return pd.DataFrame()
    else:
        data_adj_close = data['Adj Close']
        # Filter out rows with dates newer than calculation_end_date
        data_adj_close = data_adj_close[data_adj_close.index <= end_date_str]
        #print("\nDebug- stock_prices_df\n", data_adj_close)

        # Rearrange columns based on the order in tickers_list_upper
        if len(tickers_list)>1:
            data_adj_close = data_adj_close.reindex(columns=tickers_list_upper)

        # needed this when having only a single ticker in the ticker list
        if len(tickers_list_upper)==1:
            data_adj_close = pd.DataFrame(data_adj_close)
            data_adj_close.rename(columns={'Adj Close': tickers_list_upper[0]}, inplace=True)

        data_adj_close.columns = map(str.lower, data_adj_close.columns) # must be after pd.DataFrame(data_adj_close)

        # data_adj_close_year_end = data_adj_close.resample('A').ffill().round(2) # must be before the index is changed to date
        data_adj_close_year_end = data_adj_close.resample('YE').ffill().round(2) # must be before the index is changed to date
        data_adj_close.index=data_adj_close.index.date
        data_adj_close_year_end.index=data_adj_close_year_end.index.date

        last_date = data_adj_close_year_end.index[-1]
        data_adj_close_year_end = data_adj_close_year_end.rename(index={last_date: end_date_str})
        #print("\nstock_prices_df\n", end_date_str, "\n", data_adj_close_year_end)
        return data_adj_close_year_end, data_adj_close

#==============================================================================
# Part 2: Calculate annual returns at year end, and at any given day (by calculation_end_date_str)
#
# annual return calculation can start at any given day
def get_annual_returns_anyday_df(daily_adj_close_df, calculation_end_date_str):
    calculation_end_date=pd.to_datetime(calculation_end_date_str).tz_localize('America/New_York')

    # Create a DataFrame with a complete date range
    date_range = pd.date_range(start=daily_adj_close_df.index.min(), end=daily_adj_close_df.index.max(), freq='D')
    complete_stock_history = pd.DataFrame(index=date_range)

    # Merge the complete DataFrame with the original stock_history
    complete_stock_history = complete_stock_history.merge(daily_adj_close_df, how='left', left_index=True, right_index=True)
    complete_stock_history = complete_stock_history.ffill() # fill the newly added rows with previous day value

    '''
    Select the rows that match the month and day of calculation_end_date,
    which are the ends of the annual periods counted back from calculation_end_date.
    '''
    # Filter out rows with dates newer than calculation_end_date
    #filtered_stock_history = complete_stock_history[complete_stock_history.index <= calculation_end_date]
    # note: daily_adj_close_df already satisfies daily_adj_close_df.index <= calculation_end_date
    filtered_stock_history = complete_stock_history
    #print(filtered_stock_history)

    target_month=filtered_stock_history.index.max().month
    target_day=filtered_stock_history.index.max().day
    #print("target_month", target_month, "target_day",target_day, "start_year", filtered_stock_history.index.max().year)

    annual_returns = filtered_stock_history[(filtered_stock_history.index.month == target_month) &
                                            (filtered_stock_history.index.day ==target_day)]
    annual_returns_percent = annual_returns.pct_change().dropna(how='all')
    annual_returns_df = pd.DataFrame(annual_returns_percent)
    #print("\ndebug-annual_returns_df\n", annual_returns_df)
    return annual_returns_df

# annual return calculation can start at year end
def get_annual_returns_year_end_df(data_adj_close_df, calculation_end_date_str):
    annual_returns_percent = data_adj_close_df.pct_change().dropna(how='all')
    return annual_returns_percent

#==============================================================================
# Part 3: calculate the annualized trailing total return from the data generated in step 1 & display
# Define a function to calculate the annualized trailing total return for a given number of years
def get_trailing_return(ticker, data, years):
    # Get the total return values for the last n years
    trailing_data = data[ticker].tail(years)

    # Check if there are empty values within years
    if trailing_data.isna().any():
        return np.nan

    # Check if there are valid total return values for all years
    if len(trailing_data) == years:
        # Convert the percentage strings to numeric values
        trailing_data = trailing_data.astype(str).str.replace('%', '').astype(float)
        """
        Calculate the annualized trailing total return using the formula from Investopedia:
        Annualized Return = [(1 + r1) *
                             (1 + r2) * ... * (1 + rn)]^(1/n) - 1
        Where r1, r2, ..., rn are the total return values for each year
        """
        annualized_trailing_return = (trailing_data + 1).prod() ** (1 / years) - 1

        # Format the result as a percentage with two decimal places
        annualized_trailing_return = annualized_trailing_return * 100
        annualized_trailing_return = annualized_trailing_return.round(2)
        return annualized_trailing_return
    else:
        return np.nan

# Define a function to loop through years_list and collect the trailing returns for each num_years
def get_trailing_return_column(ticker, annual_returns_df):
    trailing_return_column = {}
    for num_years in years_list:
        # Check if the ticker data is available in annual_returns_df
        if ticker in annual_returns_df.columns:
            # using data from step 1, avoiding get_annual_returns_df(ticker) for less traffic from yahoo server
            data = annual_returns_df[[ticker]]
            trailing_return = get_trailing_return(ticker, data, num_years)
            trailing_return_column[f"{num_years}-Year"] = trailing_return
        else:
            print(f"Data not available for {ticker}. Skipping.")
            trailing_return_column[f"{num_years}-Year"] = np.nan
    return trailing_return_column

# Create a DataFrame that stores all tickers' trailing returns
def get_trailing_return_all(annual_returns_df):
    all_tickers_trailing_returns_df = pd.DataFrame(index=years_list)
    tickers=annual_returns_df.columns.tolist()

    # Loop through each ticker in the list
    for ticker in tickers:
        trailing_returns = get_trailing_return_column(ticker, annual_returns_df)
        # Add the trailing returns to the DataFrame
        all_tickers_trailing_returns_df[ticker] = pd.Series(trailing_returns).values
    return all_tickers_trailing_returns_df

#==============================================================================
# Part 4: calculate the cumulative return from the data (all_tickers_returns_df) generated in part 1 & display
# Define a function to calculate the cumulative return for a given number of years from a ticker
def get_cumulative_return(ticker, data, years):
    # Calculate the cumulative return
    cumulative_return = (1 + data[ticker]).rolling(window=years).apply(lambda x: x.prod(), raw=True) - 1
    return cumulative_return

# Define a function to loop through years_list and return the cumulative returns for each num_years
def get_cumulative_return_column(ticker, annual_returns_df):
    cumulative_returns = {}
    for years in years_list:
        # Calculate the cumulative return for the given number of years
        cumulative_return = get_cumulative_return(ticker, annual_returns_df, years)
        # Get the last value, which is the cumulative return up to the current year
        cumulative_returns[years] = cumulative_return.iloc[-1]
    return cumulative_returns

def get_cumulative_return_all(annual_returns_df):
    # Create an empty DataFrame with years_list as the index for cumulative returns
    all_tickers_cumulative_returns_df = pd.DataFrame(index=years_list)
    tickers=annual_returns_df.columns.tolist()

    # Loop through each ticker in the list
    for ticker in tickers:
        cumulative_returns = get_cumulative_return_column(ticker, annual_returns_df)
        # Add the cumulative returns to the DataFrame
        all_tickers_cumulative_returns_df[ticker] = pd.Series(cumulative_returns).values
    return all_tickers_cumulative_returns_df

#==============================================================================
# Part 5: calculate the CAGR (Compound Annual Growth Rate) from the data
# in all_tickers_cumulative_returns_df generated earlier & display
# Define a function to calculate the CAGR from the cumulative value and the years
def calculate_cagr(value, years):
    # Calculate the CAGR using the formula (1 + cumulative return)^(1/years) - 1
    cagr = (value + 1) ** (1 / np.array(years)) - 1
    #print("debug-cagr\n", cagr, "end")
    return cagr

# Define a function to format the Float64Index values into percentage strings
def format_to_percentage(value):
    # If any element in the value array is not null, format it as a percentage string with two decimal places
    if np.any(pd.notnull(value)):
        return f"{value:.2f}%"
    # Otherwise, return None
    return None

def get_cagr_return_all(all_tickers_cumulative_returns_df):
    # Apply the calculate_cagr function to each column of the DataFrame
    all_tickers_cagrs_df = all_tickers_cumulative_returns_df.apply(lambda x: calculate_cagr(x, x.index), axis=0)
    return all_tickers_cagrs_df

#==============================================================================
# Part 6:
# single ticker's Prices, Returns, Dividends, good for verifying whether "Adj Close" is correct.
'''
Calculate and display:
    yearly dividendSum, 'Close' & 'Adj Close' prices,
    Return (by 'Close' price), total return (by 'Adj Close' price),
    CalReturn (total return by 'Close' price and "dividendSum").
Note: CalReturn is expected to be nearly the same as total return when the 'Adj Close' price is correct.
'''
def get_yearly_single_stock_data(ticker):
    stock = yf.Ticker(ticker)

    #-------- mainly for downloading 'Dividends'
    history = stock.history(period="max", auto_adjust=False) # see 2025-02.23.1655 revision note
    dividend_history=history['Dividends']
    dividend_history.index=dividend_history.index.date

    #-------- mainly for downloading 'Close','Adj Close'
    dld_history=yf.download(ticker, period="max", auto_adjust=False) # see 2025-02.23.1655 revision note
    dld_history=dld_history[['Close','Adj Close']]
    dld_history.rename(columns={'Adj Close': 'AdjClose'}, inplace=True)
    '''
    note: see 2025-02.23.1655 revision note
    The columns are a MultiIndex(, names=['Price', 'Ticker']).
    Each price column such as 'Close','AdjClose' has only a single sub-column, with Ticker as the column index name.
    Drop the column MultiIndex level ('Ticker')
    '''
    dld_history.columns = dld_history.columns.droplevel(1) # see 2025-02.23.1655 revision note

    date_range = pd.date_range(start=dld_history.index.min(), end=dld_history.index.max(), freq='D')
    complete_history = pd.DataFrame(index=date_range)

    # Merge the complete DataFrame with the original stock_history
    complete_history = complete_history.merge(dld_history, how='left', left_index=True, right_index=True)
    complete_history[['Close','AdjClose']] = complete_history[['Close','AdjClose']].ffill().round(3)

    # Merge dividend into complete_history
    complete_history = complete_history.merge(dividend_history, how='left', left_index=True, right_index=True)
    # replace all NaN values in the 'Dividends' column with 0.0
    complete_history['Dividends'] = complete_history['Dividends'].fillna(0.0).round(3)

    complete_history['Year']=complete_history.index.year
    complete_history['Date']=complete_history.index
    yearly_data = complete_history.groupby('Year').agg({'Date': 'last', 'Close': 'last', 'AdjClose': 'last','Dividends': 'sum'})
    yearly_data.rename(columns={'Dividends': 'DivSum'}, inplace=True)

    # calculating 'DivRatio', 'Return' and 'TotalReturn'
    yearly_data['DivRatio']=yearly_data['DivSum'] / yearly_data['Close']
    yearly_data['Return']=yearly_data['Close'].pct_change()
    yearly_data['TotalReturn']=yearly_data['AdjClose'].pct_change()
    '''
    The CalReturn column is the yearly total return calculated from un-adjusted "Close" prices and
    the yearly "dividend sum", which is expected to be nearly equal to the total return
    that is calculated from "AdjClose" prices
    '''
    yearly_data['CalReturn'] = (yearly_data['Close'] + yearly_data['DivSum']) / yearly_data['Close'].shift(1) - 1

    # set the display format
    yearly_data[['DivRatio','Return','TotalReturn','CalReturn']] = yearly_data[['DivRatio','Return','TotalReturn','CalReturn']].mul(100).round(2)
    '''
    #yearly_data[['DivRatio','Return', 'TotalReturn', 'CalReturn']] = yearly_data[['DivRatio','Return', 'TotalReturn', 'CalReturn']].applymap("{:.2f}%".format)
    yearly_data[['DivRatio','Return', 'TotalReturn', 'CalReturn']]= \
        yearly_data[['DivRatio','Return', 'TotalReturn', 'CalReturn']].applymap(lambda x: f"{x:.2f}%" if not pd.isna(x) else "NaN")
    '''
    # Use .applymap() and lambda to format the values as percentage strings only if they are not NaN
    # (note: DataFrame.applymap is deprecated since pandas 2.1 in favor of DataFrame.map)
    yearly_data[['DivRatio','Return', 'TotalReturn', 'CalReturn']]= \
        yearly_data[['DivRatio','Return', 'TotalReturn', 'CalReturn']].applymap(lambda x: f"{x:.2f}%" if not pd.isna(x) else x)

    # 'Date' column is no longer required
    yearly_data.drop('Date', axis=1, inplace=True)
    return yearly_data

#==============================================================================
# Part 7: utility functions
# get the last trading day of S&P 500 in string format
def get_last_trading_day():
    # Get today's date, use .strftime("%Y-%m-%d") to convert to a string
    today_date_str=datetime.now(pytz.timezone('America/New_York')).date().strftime("%Y-%m-%d")
    stock = yf.Ticker("^GSPC") # S&P 500 (^GSPC) ticker
    # search and see yfinance_BUG_1 NOTE in this file
    history_df=stock.history(period="max", end=today_date_str)["Close"]
    last_trading_day_str = history_df.index.max().date().strftime("%Y-%m-%d")
    return last_trading_day_str
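
# ------------------------------------------------------------------------------
# Illustrative sketch only (not called anywhere in this script).
# The month-boundary rule that calculation_response() applies inline could be
# factored into a small helper such as the one below. The helper name
# month_boundary_date_str is hypothetical and is not part of the original code.
# Rule, as described in the header notes: keep the date when it is the last day
# of its month, otherwise fall back to the last day of the previous month.
from datetime import datetime, timedelta  # repeated so the sketch is self-contained

def month_boundary_date_str(date_str):
    date = datetime.strptime(date_str, "%Y-%m-%d")
    first_day_of_month = date.replace(day=1)
    # Day 1 plus 32 days always lands in the next month; resetting to day 1 and
    # stepping back one day gives the last day of the current month.
    last_day_of_month = (first_day_of_month + timedelta(days=32)).replace(day=1) - timedelta(days=1)
    if date == last_day_of_month:
        return date_str
    # Otherwise use the last day of the previous month
    return (first_day_of_month - timedelta(days=1)).strftime("%Y-%m-%d")

# Possible usage, under the same assumptions:
#   month_boundary_date_str("2024-01-03")   # a mid-month date maps to the previous month end
#   month_boundary_date_str("2024-01-31")   # a month-end date is kept unchanged
# ------------------------------------------------------------------------------
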
def str_to_integer(integer_str):
    try:
        integer_number = int(integer_str)
        return integer_number
    except ValueError:
        return -1

# validate the date string
def is_valid_date(date_string):
    try:
        # Attempt to parse the date string
        datetime.strptime(date_string, "%Y-%m-%d")
        return True
    except ValueError:
        # Raised when the date string is not in the expected format
        return False

def date_label_conversion_strip_time(all_tickers_returns_df, calculation_end_date_str):
    all_tickers_returns_df.index=all_tickers_returns_df.index.date
    all_tickers_returns_df.index.name='date'
    # print("debug get_annual_returns_tickers_df", all_tickers_returns_df)

    # Convert calculation_end_date_str to a datetime object, replace the index's mon/day portion of date
    end_date_datetime_obj = datetime.strptime(calculation_end_date_str, "%Y-%m-%d")
    all_tickers_returns_df.index = all_tickers_returns_df.index.map(
        lambda x: x.replace(month=end_date_datetime_obj.month, day=end_date_datetime_obj.day))
    return all_tickers_returns_df

#==============================================================================
# Part 8: gradio handling - Input command handling and display in web page
help_info_str="Input Formats:\n \
1. ticker list....................Example: spy vfv.to xiu.to xic.to xfn.to ry.to \n \
2. One of default ticker list, a number between 0 and 7....Example: 0, or 1, ...,7 \n \
3. CalculationEndDate as prefix. Example: 2020-12-31 2 \n \
.........................................2020-12-31 spy vfv.to xiu.to xic.to xfn.to ry.to \n \
4. single ticker: Dividend/Close/AdjClose/Return/TotalReturn/CalReturn(by Close/Dividends). @1 spy \n \
note: daily adjusted close data are from Yahoo Finance. \n" + script_version

# Main Handling Process
def calculation_response(message):
    # if there is no input (empty or whitespace only), display help information
    if message.strip()=="":
        return help_info_str
    tickers=message.split()

    # ******************************************************************************
    # processing web input parameters
    # set calculation_end_date_str, and tickers
    #---------------------------------------------------------
    # single stock ticker - detailed information
    if (tickers[0] == "@1"):
        tickers.pop(0) # remove the first string which is "@1"
        if len(tickers)==0:
            ticker = 'spy' # default ticker = spy
        else:
            ticker=tickers[0]
        output_string=f"\n {ticker}\n"
        output_dataframe0=get_yearly_single_stock_data(ticker)
        output_html=output_string + output_dataframe0.to_html()
        return output_html

    #----------------------------------------------------------
    # Get today's date, use .strftime("%Y-%m-%d") to convert to a string
    #calculation_end_date_str=datetime.now(pytz.timezone('America/New_York')).date().strftime("%Y-%m-%d")
    calculation_end_date_str = get_last_trading_day()

    # Check whether the first str is date for calculation end date
    if is_valid_date(tickers[0]):
        calculation_end_date_str = tickers[0] # reset calculation_end_date_str
        tickers.pop(0) # remove the first string which is the date

    #............ For displaying trailing and cumulative returns at month_boundary_date
    # Assuming calculation_end_date_str contains the date string '2024-01-03'
    calculation_end_date = datetime.strptime(calculation_end_date_str, '%Y-%m-%d')
    # Calculate the first day of the current month
    first_day_of_month = calculation_end_date.replace(day=1)
    # Calculate the last day of the month
    last_day_of_month = (calculation_end_date.replace(day=1) + timedelta(days=32)).replace(day=1) - timedelta(days=1)
    # Calculate the last day of the previous month
    last_day_of_previous_month = first_day_of_month - timedelta(days=1)

    # Check if the original date is the last day of the month
    if (calculation_end_date == last_day_of_month):
        calculation_end_date_month_boundary_date_str=calculation_end_date_str
    else:
        calculation_end_date_month_boundary_date_str=last_day_of_previous_month.strftime('%Y-%m-%d')

    # calculation_end_date_for_others is used for trailing and cumulative returns
    calculation_end_date_for_others_str=calculation_end_date_month_boundary_date_str
    '''
    Handling Feb 29 of leap years.
    For leap years, to simplify the calculation, Feb 28 is used in place of Feb 29 for calculating returns.
    Therefore, if calculation_end_date_for_others_str is Feb 29,
    then replace 29 with 28 in calculation_end_date_for_others_str
    '''
    leap_year=False
    if ( calculation_end_date_for_others_str[-5:] == '02-29' ):
        calculation_end_date_for_others_str = calculation_end_date_for_others_str[:-2] + '28'
        leap_year=True
    #................End

    # Check whether number 0, 1, 2, .. is selected for using a default ticker list
    integer_value=str_to_integer(tickers[0])
    if (integer_value >= 0 and integer_value