"""Data fetching and async orchestration for wavelets_lite."""

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import yfinance as yf

from .core import signal_snapshot

logger = logging.getLogger(__name__)

# Timeframe → (yfinance interval, bars_per_day multiplier for period calc).
# Bar counts assume US equity regular hours (6.5 h per session).
_TF_META: dict[str, tuple[str, float]] = {
    "1h": ("60m", 6.5),
    "4h": ("60m", 2.0),  # resampled from 1h; midnight-anchored 4h bins give ~2 bars/session
    "1d": ("1d", 1.0),
    "1wk": ("1wk", 1.0 / 5),
}

VALID_TIMEFRAMES = set(_TF_META)
DEFAULT_LOOKBACK = 512
DEFAULT_TF = "1d"
SIG_LEVELS = [4, 5]
SLOPE_WINDOW = 40
VOL_WINDOW = 60
DECOMP_LEVELS = 6


@dataclass
class LiteSignal:
    ticker: str
    timeframe: str
    timestamp: datetime              # when the analysis ran (UTC)
    current_price: float
    last_bar_time: pd.Timestamp
    raw_signal: float                # +1.0, -1.0, 0.0
    sized_position: float            # vol-targeted
    midband_slope: float
    vol_ann: float
    level_slopes: dict[str, float]   # D1..D6, A6
    level_signals: dict[str, float]  # sign of each level slope
    sig_levels: list[int]
    lookback: int
    bars_loaded: int
    warnings: list[str] = field(default_factory=list)


def _resample_4h(df_1h: pd.DataFrame) -> pd.DataFrame:
    """Aggregate 1h OHLCV bars into 4h bars; empty overnight/weekend bins are dropped."""
    return df_1h.resample("4h").agg({
        "Open": "first",
        "High": "max",
        "Low": "min",
        "Close": "last",
        "Volume": "sum",
    }).dropna()


def _fetch_prices(ticker: str, timeframe: str, lookback: int) -> pd.Series:
    """Download OHLCV from yfinance and return the last `lookback` closes."""
    yf_interval, bars_per_day = _TF_META[timeframe]

    # Request enough bars to cover lookback with margin
    need_days = int(lookback / bars_per_day * 1.4) + 30
    if yf_interval == "1wk":
        period_days = min(need_days * 7, 3650)
    elif yf_interval == "1d":
        period_days = need_days
    else:
        # yfinance only serves 60m bars for roughly the last 730 days
        period_days = min(need_days, 730)
    period = f"{period_days}d"

    df = yf.download(
        ticker,
        period=period,
        interval=yf_interval,
        auto_adjust=True,
        progress=False,
        threads=False,
    )
    if df.empty:
        raise ValueError(f"No data returned for {ticker} ({timeframe})")

    # Newer yfinance versions return (field, ticker) MultiIndex columns even for one ticker
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df = df[["Open", "High", "Low", "Close", "Volume"]].dropna()
    if timeframe == "4h":
        df = _resample_4h(df)

    close = df["Close"].dropna()
    close.name = ticker
    return close.tail(lookback)


class WaveletLiteAnalyzer:
    """Stateless lightweight wavelet signal analyzer."""

    @staticmethod
    async def analyze(
        ticker: str,
        timeframe: str = DEFAULT_TF,
        lookback: int = DEFAULT_LOOKBACK,
        sig_levels: list[int] | None = None,
    ) -> LiteSignal:
        """Fetch prices and compute MODWT signal snapshot. Non-blocking."""
        # The yfinance download and the decomposition are blocking, so run them in
        # the default thread pool to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        levels = list(sig_levels or SIG_LEVELS)  # copy: never hand out the module default
        return await loop.run_in_executor(
            None,
            lambda: WaveletLiteAnalyzer._analyze_sync(ticker, timeframe, lookback, levels),
        )

    @staticmethod
    def _analyze_sync(
        ticker: str,
        timeframe: str,
        lookback: int,
        sig_levels: list[int],
    ) -> LiteSignal:
        warnings: list[str] = []

        if timeframe not in VALID_TIMEFRAMES:
            raise ValueError(
                f"Unsupported timeframe '{timeframe}'. "
                f"Use one of: {', '.join(sorted(VALID_TIMEFRAMES))}"
            )

        # ── fetch ─────────────────────────────────────────────────────────────
        try:
            prices = _fetch_prices(ticker, timeframe, lookback)
        except Exception as e:
            raise ValueError(f"Data fetch failed for {ticker}: {e}") from e

        bars = len(prices)
        if bars < 64:
            raise ValueError(
                f"Only {bars} bars for {ticker} ({timeframe}) — need at least 64."
            )
        if bars < lookback:
            warnings.append(
                f"Only {bars} bars available (requested {lookback}). "
                "Signal quality at deep levels (D5/D6) may be reduced."
            )

        # ── data freshness ────────────────────────────────────────────────────
        last_bar_time: pd.Timestamp = prices.index[-1]
        now = datetime.now(timezone.utc)
        lb = last_bar_time
        if lb.tzinfo is None:
            lb = lb.tz_localize("UTC")  # treat naive timestamps as UTC
        age_min = (now - lb.to_pydatetime()).total_seconds() / 60
        if age_min > 30:
            warnings.append(
                f"Last bar is {age_min:.0f} min old "
                "(yfinance free tier ~15 min lag for US equities)"
            )

        # ── signal ────────────────────────────────────────────────────────────
        log_prices = np.log(prices.to_numpy(dtype=float))
        snap = signal_snapshot(
            log_prices,
            sig_levels=sig_levels,
            slope_window=SLOPE_WINDOW,
            vol_window=VOL_WINDOW,
            decomp_levels=DECOMP_LEVELS,
        )

        return LiteSignal(
            ticker=ticker,
            timeframe=timeframe,
            timestamp=datetime.now(timezone.utc),
            current_price=float(prices.iloc[-1]),
            last_bar_time=last_bar_time,
            raw_signal=snap["raw_signal"],
            sized_position=snap["sized_position"],
            midband_slope=snap["midband_slope"],
            vol_ann=snap["vol_ann"],
            level_slopes=snap["level_slopes"],
            level_signals=snap["level_signals"],
            sig_levels=sig_levels,
            lookback=lookback,
            bars_loaded=bars,
            warnings=warnings,
        )
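

# Illustration only (hypothetical helper, not called by WaveletLiteAnalyzer):
# a sketch of how many 4h bars a midnight-anchored resample like _resample_4h
# produces per US trading session. It assumes regular-hours 60m bars stamped
# 09:30..15:30 America/New_York, which is what yfinance typically returns for
# US equities; other exchanges or 24h markets give different counts. The
# synthetic prices are made up purely to exercise the aggregation.
import numpy as np
import pandas as pd


def _demo_4h_bars_per_session(days: int = 5) -> pd.DataFrame:
    """Resample synthetic 1h session bars to 4h using the same aggregation as _resample_4h."""
    sessions = pd.bdate_range("2024-01-08", periods=days)  # weekdays from Mon 2024-01-08
    stamps = pd.DatetimeIndex([
        pd.Timestamp(f"{d.strftime('%Y-%m-%d')} {hh:02d}:30", tz="America/New_York")
        for d in sessions
        for hh in range(9, 16)  # 09:30, 10:30, ..., 15:30 -> 7 bars per session
    ])
    close = 100.0 + np.arange(len(stamps), dtype=float)  # deterministic ramp
    df_1h = pd.DataFrame(
        {
            "Open": close - 0.5,
            "High": close + 1.0,
            "Low": close - 1.0,
            "Close": close,
            "Volume": 1_000,
        },
        index=stamps,
    )
    # Bins start at midnight, so session bars fall into the 08:00 bin (09:30-11:30)
    # and the 12:00 bin (12:30-15:30). Overnight and weekend bins have NaN prices
    # and are removed by dropna().
    return df_1h.resample("4h").agg({
        "Open": "first",
        "High": "max",
        "Low": "min",
        "Close": "last",
        "Volume": "sum",
    }).dropna()


# Usage: five sessions of 1h bars collapse to two 4h bars per session, so a 4h
# lookback needs roughly lookback / 2 sessions of 1h history.
#   bars = _demo_4h_bars_per_session(5)
#   len(bars)  # 10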