File size: 5,948 Bytes
0821f38 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | """Data fetching and async orchestration for wavelets_lite."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import yfinance as yf
from .core import signal_snapshot
logger = logging.getLogger(__name__)

# Timeframe -> (yfinance interval, bars per trading day used to size the download period).
# The second value counts bars of the *output* timeframe: "4h" is downloaded as 60m
# data and resampled, so it yields roughly a quarter as many bars per day as "1h".
# Using the hourly density for "4h" requested far too little history.
_TF_META: dict[str, tuple[str, float]] = {
    "1h": ("60m", 6.5),        # ~6.5 trading hours per US equity session
    "4h": ("60m", 6.5 / 4),    # resampled from 1h: 6.5 session hours / 4 h per bar
    "1d": ("1d", 1.0),
    "1wk": ("1wk", 1.0 / 5),   # one bar per 5 trading days
}
VALID_TIMEFRAMES = set(_TF_META)

DEFAULT_LOOKBACK = 512     # default number of bars requested per analysis
DEFAULT_TF = "1d"          # default timeframe key (must be in _TF_META)
SIG_LEVELS = [4, 5]        # default decomposition levels driving the signal
# Parameters forwarded to core.signal_snapshot.
SLOPE_WINDOW = 40
VOL_WINDOW = 60
DECOMP_LEVELS = 6
@dataclass
class LiteSignal:
    """Result of one wavelet-signal evaluation for a ticker and timeframe.

    Field declaration order is also the positional order of the generated
    ``__init__``; only ``warnings`` has a default (a fresh empty list).
    """

    # -- identity --------------------------------------------------------
    ticker: str
    timeframe: str
    # Wall-clock time the snapshot was built.
    timestamp: datetime

    # -- market data -----------------------------------------------------
    # Most recent close in the loaded series.
    current_price: float
    # Index value of the most recent bar.
    last_bar_time: pd.Timestamp

    # -- signal outputs --------------------------------------------------
    # Direction: +1.0, -1.0 or 0.0.
    raw_signal: float
    # Volatility-targeted position size.
    sized_position: float
    midband_slope: float
    # Presumably annualised volatility as reported by the core snapshot.
    vol_ann: float
    # Per-level slopes keyed D1..D6 and A6.
    level_slopes: dict[str, float]
    # Sign of each level slope, same keys as level_slopes.
    level_signals: dict[str, float]

    # -- run parameters --------------------------------------------------
    sig_levels: list[int]
    # Bars requested vs. bars actually available.
    lookback: int
    bars_loaded: int
    # Non-fatal data-quality notes gathered during the run.
    warnings: list[str] = field(default_factory=list)
def _resample_4h(df_1h: pd.DataFrame) -> pd.DataFrame:
return df_1h.resample("4h").agg({
"Open": "first", "High": "max",
"Low": "min", "Close": "last", "Volume": "sum",
}).dropna()
def _fetch_prices(ticker: str, timeframe: str, lookback: int) -> pd.Series:
    """Download the most recent *lookback* closing prices for *ticker*.

    Args:
        ticker: Symbol passed to ``yfinance.download``.
        timeframe: Key of ``_TF_META`` ("1h", "4h", "1d", "1wk"); "4h" is
            downloaded as hourly data and resampled.
        lookback: Maximum number of bars to return (the most recent ones).

    Returns:
        Close prices named after *ticker*. The series may be shorter than
        *lookback* when less history is available.

    Raises:
        KeyError: if *timeframe* is not a key of ``_TF_META``.
        ValueError: if the download returns no rows.
    """
    yf_interval, bars_per_day = _TF_META[timeframe]

    # bars / bars-per-trading-day = trading days; x1.4 roughly converts trading
    # days to calendar days (weekends/holidays); +30 days of slack. The result
    # is already in calendar days for every timeframe, weekly included.
    need_days = int(lookback / bars_per_day * 1.4) + 30
    if yf_interval == "60m":
        # Yahoo only serves hourly history for about the last 730 days. Daily
        # and weekly data are not limited this way, so they are not capped
        # (a blanket 730-day cap left the default 512-bar daily request short).
        need_days = min(need_days, 730)
    period = f"{need_days}d"

    df = yf.download(
        ticker,
        period=period,
        interval=yf_interval,
        auto_adjust=True,
        progress=False,
        threads=False,
    )
    if df.empty:
        raise ValueError(f"No data returned for {ticker} ({timeframe})")

    # yfinance may return (field, ticker) MultiIndex columns; keep the field level.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df = df[["Open", "High", "Low", "Close", "Volume"]].dropna()

    if timeframe == "4h":
        df = _resample_4h(df)

    close = df["Close"].dropna()
    close.name = ticker
    return close.tail(lookback)
class WaveletLiteAnalyzer:
    """Stateless lightweight wavelet signal analyzer.

    The blocking work (download + signal computation) lives in
    ``_analyze_sync``; ``analyze`` runs it in the event loop's default
    executor so callers can await it without stalling the loop.
    """

    # Fewest bars accepted before a snapshot is attempted.
    _MIN_BARS = 64
    # Time span covered by one bar of each timeframe. yfinance labels bars by
    # their start time (daily bars at midnight), so a bar closes at stamp + length.
    _BAR_LENGTH: dict[str, pd.Timedelta] = {
        "1h": pd.Timedelta(hours=1),
        "4h": pd.Timedelta(hours=4),
        "1d": pd.Timedelta(days=1),
        "1wk": pd.Timedelta(weeks=1),
    }
    # Minutes a bar may be past its close before a staleness warning is added.
    _STALE_AFTER_MIN = 30

    @staticmethod
    async def analyze(
        ticker: str,
        timeframe: str = DEFAULT_TF,
        lookback: int = DEFAULT_LOOKBACK,
        sig_levels: list[int] | None = None,
    ) -> LiteSignal:
        """Fetch prices and compute MODWT signal snapshot. Non-blocking.

        Args:
            ticker: Symbol to analyse.
            timeframe: One of ``VALID_TIMEFRAMES``.
            lookback: Number of bars to request (at least 64).
            sig_levels: Decomposition levels driving the signal; ``None`` or
                an empty list selects ``SIG_LEVELS``.

        Raises:
            ValueError: see ``_analyze_sync``.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            WaveletLiteAnalyzer._analyze_sync,
            ticker,
            timeframe,
            lookback,
            sig_levels or SIG_LEVELS,
        )

    @staticmethod
    def _analyze_sync(
        ticker: str,
        timeframe: str,
        lookback: int,
        sig_levels: list[int],
    ) -> LiteSignal:
        """Blocking implementation of ``analyze``.

        Raises:
            ValueError: unsupported timeframe, ``lookback`` below the minimum,
                failed download, or fewer than the minimum usable bars.
        """
        warnings: list[str] = []
        min_bars = WaveletLiteAnalyzer._MIN_BARS

        if timeframe not in VALID_TIMEFRAMES:
            raise ValueError(
                f"Unsupported timeframe '{timeframe}'. "
                f"Use one of: {', '.join(sorted(VALID_TIMEFRAMES))}"
            )
        # Fail before downloading anything: tail(lookback) could never reach min_bars.
        if lookback < min_bars:
            raise ValueError(
                f"lookback must be at least {min_bars} bars (got {lookback})."
            )

        # ── fetch ─────────────────────────────────────────────────────────
        try:
            prices = _fetch_prices(ticker, timeframe, lookback)
        except Exception as e:
            raise ValueError(f"Data fetch failed for {ticker}: {e}") from e

        bars = len(prices)
        if bars < min_bars:
            raise ValueError(
                f"Only {bars} bars for {ticker} ({timeframe}) — need at least {min_bars}."
            )
        if bars < lookback:
            warnings.append(
                f"Only {bars} bars available (requested {lookback}). "
                "Signal quality at deep levels (D5/D6) may be reduced."
            )

        # ── data freshness ────────────────────────────────────────────────
        last_bar_time: pd.Timestamp = prices.index[-1]
        stale_note = WaveletLiteAnalyzer._staleness_warning(last_bar_time, timeframe)
        if stale_note is not None:
            warnings.append(stale_note)

        # ── signal ────────────────────────────────────────────────────────
        log_prices = np.log(prices.values)
        snap = signal_snapshot(
            log_prices,
            sig_levels=sig_levels,
            slope_window=SLOPE_WINDOW,
            vol_window=VOL_WINDOW,
            decomp_levels=DECOMP_LEVELS,
        )

        return LiteSignal(
            ticker=ticker,
            timeframe=timeframe,
            timestamp=datetime.now(timezone.utc),
            current_price=float(prices.iloc[-1]),
            last_bar_time=last_bar_time,
            raw_signal=snap["raw_signal"],
            sized_position=snap["sized_position"],
            midband_slope=snap["midband_slope"],
            vol_ann=snap["vol_ann"],
            level_slopes=snap["level_slopes"],
            level_signals=snap["level_signals"],
            # Copy so callers mutating the result cannot alter the module-level
            # SIG_LEVELS default that analyze() may have passed in.
            sig_levels=list(sig_levels),
            lookback=lookback,
            bars_loaded=bars,
            warnings=warnings,
        )

    @staticmethod
    def _staleness_warning(last_bar_time: pd.Timestamp, timeframe: str) -> str | None:
        """Return a warning if the latest bar closed too long ago, else ``None``.

        Age is measured from the bar's close (start stamp + bar length), not its
        start. Measuring from the start flagged every daily/weekly bar and any
        in-progress intraday bar as stale.
        """
        bar_start = last_bar_time
        if bar_start.tzinfo is None:
            # Naive stamps are treated as UTC.
            bar_start = bar_start.tz_localize("UTC")
        bar_close = bar_start + WaveletLiteAnalyzer._BAR_LENGTH[timeframe]
        now = datetime.now(timezone.utc)
        age_min = (now - bar_close.to_pydatetime()).total_seconds() / 60
        if age_min <= WaveletLiteAnalyzer._STALE_AFTER_MIN:
            return None
        return (
            f"Last bar closed {age_min:.0f} min ago "
            "(yfinance free tier ~15 min lag for US equities)"
        )
|