Spaces:
Running
Running
| from datetime import date | |
| import numpy as np | |
| import pandas as pd | |
| import yfinance as yf | |
| from src.constants import ( | |
| PRICES_EPS, | |
| PRICES_RETRIEVAL_INTERVAL, | |
| PRICES_ROLLING_MIN_PERIOD, | |
| PRICES_ROLLING_WINDOW, | |
| ) | |
| class Prices: | |
| """Retrieve historical prices and compute relevant metrics.""" | |
| def __init__(self, initial_tickers: list[str], date_start: str) -> None: | |
| self.tickers = list(initial_tickers) | |
| self.date_range = pd.date_range(start=date_start, end=date.today(), freq="B") | |
| self._yield_cache: dict[str, float] = {} | |
| self.get_relative_prices(initial_tickers) | |
| def __str__(self) -> str: | |
| return f"Prices(tickers={self.tickers})" | |
| def get_historical_prices(self, tickers: str | list[str]) -> pd.DataFrame: | |
| data = yf.download( | |
| tickers, | |
| interval=PRICES_RETRIEVAL_INTERVAL, | |
| start=self.date_range[0], | |
| end=self.date_range[-1], | |
| auto_adjust=True, # account for dividends | |
| progress=False, | |
| ) | |
| if data is None or data.empty: | |
| raise ValueError(f"No data retrieved for tickers: {tickers}") | |
| df = data.Close.reindex(index=self.date_range).bfill().ffill() | |
| # add yield for securities without dividends (e.g. bonds, money market funds) | |
| ticker_list = [tickers] if isinstance(tickers, str) else tickers | |
| if ticker_list: | |
| days_elapsed = (self.date_range - self.date_range[0]).days | |
| for ticker in ticker_list: | |
| annual_yield = self._get_annual_yield(ticker) | |
| if annual_yield > 0: | |
| continuous_rate = np.log(1 + annual_yield) | |
| cumulative_factor = np.exp(continuous_rate * days_elapsed / 365.25) | |
| df[ticker] = df[ticker] * cumulative_factor | |
| return df | |
| def _get_annual_yield(self, ticker: str) -> float: | |
| """Return cached annual yield for dividend-less securities; 0.0 if none or unknown.""" | |
| if ticker in self._yield_cache: | |
| return self._yield_cache[ticker] | |
| annual_yield = 0.0 | |
| try: | |
| ticker_obj = yf.Ticker(ticker) | |
| if ticker_obj.dividends.empty: | |
| annual_yield = float(ticker_obj.info.get("yield", 0.0) or 0.0) | |
| except Exception as e: | |
| print(f"Could not adjust yield for {ticker}: {e}") | |
| self._yield_cache[ticker] = annual_yield | |
| return annual_yield | |
| def get_relative_prices(self, tickers: list[str]) -> None: | |
| self.prices_raw = self.get_historical_prices(tickers).reindex(columns=tickers) | |
| self.prices_normalized = self.prices_raw / self.prices_raw.iloc[0] | |
| self.percentage_changes = ( | |
| self.prices_normalized / (self.prices_normalized.shift(1) + PRICES_EPS) - 1 | |
| ).fillna(0) | |
| self.rolling_changes = self.percentage_changes.rolling( | |
| window=PRICES_ROLLING_WINDOW, min_periods=PRICES_ROLLING_MIN_PERIOD | |
| ).sum() | |
| def update_tickers(self, tickers: list[str]) -> None: | |
| for ticker in self.tickers[:]: | |
| if ticker not in tickers: | |
| self.remove_ticker(ticker) | |
| for ticker in tickers: | |
| if ticker not in self.tickers: | |
| self.add_ticker(ticker) | |
| # reorder tickers | |
| if self.tickers != tickers: | |
| self.tickers = list(tickers) | |
| self.prices_raw = self.prices_raw[tickers] | |
| self.prices_normalized = self.prices_normalized[tickers] | |
| self.percentage_changes = self.percentage_changes[tickers] | |
| self.rolling_changes = self.rolling_changes[tickers] | |
| def remove_ticker(self, ticker: str) -> None: | |
| self.tickers.remove(ticker) | |
| for df in [ | |
| self.prices_raw, | |
| self.prices_normalized, | |
| self.percentage_changes, | |
| self.rolling_changes, | |
| ]: | |
| df.drop(ticker, axis=1, inplace=True) | |
| def add_ticker(self, ticker: str) -> None: | |
| ticker_df = self.get_historical_prices(ticker) | |
| self.tickers.append(ticker) | |
| self.prices_raw[ticker] = ticker_df | |
| self.prices_normalized[ticker] = ticker_df / ticker_df.iloc[0] | |
| self.percentage_changes[ticker] = ( | |
| self.prices_normalized[ticker] / (self.prices_normalized[ticker].shift(1) + PRICES_EPS) | |
| - 1 | |
| ).fillna(0) | |
| self.rolling_changes[ticker] = ( | |
| self.percentage_changes[ticker] | |
| .rolling(window=PRICES_ROLLING_WINDOW, min_periods=PRICES_ROLLING_MIN_PERIOD) | |
| .sum() | |
| ) | |
| def is_valid_ticker(self, ticker: str) -> bool: | |
| try: | |
| data = yf.download(ticker, period="5d", progress=False, auto_adjust=False) | |
| return data is not None and not data.empty | |
| except Exception: | |
| return False | |