folio / src /stockdata.py
dystomachina's picture
chore: fix lint and introduce pre-commit hooks
286e013
"""
Stock data interface and factory.
This module provides:
1. A common interface for data fetchers (DataFetcherInterface)
2. A factory function to create data fetchers (create_data_fetcher)
3. A singleton data fetcher instance (get_data_fetcher)
4. Utility functions for cache management and market hours
The common interface lets different data sources (FMP API, Yahoo Finance, etc.) be used
interchangeably; the factory in this module currently creates the Yahoo Finance fetcher.
"""
import logging
import os
import time
from abc import ABC, abstractmethod
from datetime import datetime
import pytz
logger = logging.getLogger(__name__)
class DataFetcherInterface(ABC):
    """Abstract contract that every stock data fetcher has to implement."""

    # Period used for beta calculations when the caller does not pass one.
    beta_period = "3m"

    @abstractmethod
    def fetch_data(self, ticker, period="3m", interval="1d"):
        """
        Retrieve stock data for a single ticker.

        Args:
            ticker (str): Stock ticker symbol
            period (str): Time period ('3m', '6m', '1y', etc.)
            interval (str): Data interval ('1d', '1wk', etc.)

        Returns:
            pandas.DataFrame: DataFrame with stock data
        """
        # The base implementation carries no behavior; subclasses override it.
        return None

    @abstractmethod
    def fetch_market_data(self, market_index="SPY", period=None, interval="1d"):
        """
        Retrieve market index data, used for beta calculations.

        Args:
            market_index (str): Market index ticker symbol (default: 'SPY')
            period (str, optional): Time period. If None, uses beta_period.
            interval (str): Data interval ('1d', '1wk', etc.)

        Returns:
            pandas.DataFrame: DataFrame with market index data
        """
        # The base implementation carries no behavior; subclasses override it.
        return None
def create_data_fetcher(cache_dir=None):
    """
    Build the YFinance-backed data fetcher.

    Args:
        cache_dir (str, optional): Cache directory. If None, "/tmp/cache_yf"
            is used when running on Hugging Face Spaces (HF_SPACE == "1" or
            SPACE_ID is set in the environment) and ".cache_yf" otherwise.

    Returns:
        DataFetcherInterface: An instance of YFinanceDataFetcher
    """
    # Hugging Face Spaces is detected purely through environment variables.
    running_on_hf = (
        "SPACE_ID" in os.environ or os.environ.get("HF_SPACE") == "1"
    )

    if cache_dir is None:
        # /tmp on Hugging Face, a local dot-directory everywhere else.
        cache_dir = "/tmp/cache_yf" if running_on_hf else ".cache_yf"

    # Imported at call time rather than at module import time.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from src.yfinance import YFinanceDataFetcher

    logger.info(f"Creating YFinance data fetcher with cache dir: {cache_dir}")
    return YFinanceDataFetcher(cache_dir=cache_dir)
# Singleton data fetcher class
class DataFetcherSingleton:
    """Keeps one shared data fetcher instance at class level."""

    # The shared fetcher; None until the first successful get_instance() call.
    _instance = None
    # Set to True once a fetcher has been created successfully.
    _initialized = False

    @classmethod
    def get_instance(cls, cache_dir=None):
        """
        Return the shared data fetcher, creating it on first use.

        Once an instance exists it is returned as-is, so ``cache_dir`` only
        has an effect on the call that actually creates the fetcher.

        Args:
            cache_dir (str, optional): Cache directory. If None, uses default.

        Returns:
            DataFetcherInterface: The singleton data fetcher instance.

        Raises:
            RuntimeError: If creating the fetcher raises ValueError, or if the
                factory returns None.
        """
        existing = cls._instance
        if existing is not None:
            return existing

        try:
            logger.info("Initializing YFinance data fetcher")
            cls._instance = create_data_fetcher(cache_dir=cache_dir)
        except ValueError as err:
            logger.error(f"Failed to initialize data fetcher: {err}")
            # Fail fast instead of carrying on with a null reference.
            raise RuntimeError(
                f"Critical component data fetcher could not be initialized: {err}"
            ) from err

        if cls._instance is None:
            raise RuntimeError(
                "Data fetcher initialization failed but didn't raise an exception"
            )

        cls._initialized = True
        return cls._instance
# Convenience function to maintain backward compatibility
def get_data_fetcher(cache_dir=None):
    """
    Return the singleton data fetcher.

    Thin wrapper over DataFetcherSingleton.get_instance(), kept so existing
    callers of this function continue to work.

    Args:
        cache_dir (str, optional): Cache directory. If None, uses default.

    Returns:
        DataFetcherInterface: The singleton data fetcher instance.
    """
    fetcher = DataFetcherSingleton.get_instance(cache_dir=cache_dir)
    return fetcher
# Cache management functions
def is_cache_expired(cache_timestamp, now=None):
    """
    Determine if cache should be considered expired based on market hours.

    Cache expires daily at 2PM Pacific time to ensure we use EOD pricing: a
    cache entry is expired when the most recent 2PM Pacific cutoff falls after
    the moment the cache was written.

    Args:
        cache_timestamp (float): POSIX timestamp (seconds since the epoch) of
            when the cache was created/modified, e.g. from os.path.getmtime().
        now (datetime, optional): The moment to evaluate against. Defaults to
            the current time. An aware datetime is converted to Pacific time;
            a naive one is interpreted as system-local time.

    Returns:
        bool: True if cache should be considered expired, False otherwise
    """
    from datetime import timedelta

    cutoff_hour = 14  # 2PM in 24-hour format

    pacific_tz = pytz.timezone("US/Pacific")

    # A POSIX timestamp is an absolute instant, so convert it straight into
    # Pacific time. Going through a naive local datetime and then labelling it
    # as Pacific is only correct on hosts whose system timezone is Pacific.
    cache_time_pacific = datetime.fromtimestamp(cache_timestamp, tz=pacific_tz)

    if now is None:
        now = datetime.now(pacific_tz)
    else:
        now = now.astimezone(pacific_tz)

    # Find the most recent 2PM Pacific cutoff that is not in the future:
    # today's if it has already passed, otherwise yesterday's.
    cutoff_day = now.date()
    if now.hour < cutoff_hour:
        cutoff_day = cutoff_day - timedelta(days=1)

    # Build the cutoff from a naive wall-clock value and localize it so the
    # UTC offset is correct for that specific day (DST-aware).
    last_cutoff = pacific_tz.localize(
        datetime(cutoff_day.year, cutoff_day.month, cutoff_day.day, cutoff_hour)
    )

    # Anything written before that cutoff predates the latest EOD refresh.
    return cache_time_pacific < last_cutoff
def should_use_cache(cache_path, cache_ttl):
    """
    Determine if cache should be used based on both TTL and market hours.

    This function centralizes cache validation logic for all data fetchers.
    Cache is considered valid if it's within TTL AND not expired based on
    market hours.

    Args:
        cache_path (str): Path to the cache file
        cache_ttl (int): Cache time-to-live in seconds

    Returns:
        tuple: (should_use, reason)
            - should_use (bool): True if cache should be used, False otherwise
            - reason (str): Reason for the decision (for logging)
    """
    # Stat the file once instead of exists() followed by getmtime(): the file
    # could disappear between the two calls and raise. The exceptions caught
    # here are the same ones os.path.exists() treats as "does not exist".
    try:
        cache_mtime = os.path.getmtime(cache_path)
    except (OSError, ValueError):
        return False, "Cache file does not exist"

    # Check TTL
    cache_age = time.time() - cache_mtime
    if cache_age >= cache_ttl:
        return False, f"Cache TTL expired (age: {cache_age:.0f}s > TTL: {cache_ttl}s)"

    # Check market hours
    if is_cache_expired(cache_mtime):
        return False, "Cache expired due to market hours (2PM Pacific cutoff)"

    # Cache is valid
    return True, f"Cache is valid (age: {cache_age:.0f}s)"