Dmitry Beresnev
add wavelet analysis
0821f38
"""Data fetching and async orchestration for wavelets_lite."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import yfinance as yf
from .core import signal_snapshot
logger = logging.getLogger(__name__)
# Timeframe β†’ (yfinance interval, bars_per_day multiplier for period calc)
_TF_META: dict[str, tuple[str, float]] = {
"1h": ("60m", 6.5),
"4h": ("60m", 6.5), # resampled from 1h
"1d": ("1d", 1.0),
"1wk": ("1wk", 1.0 / 5),
}
VALID_TIMEFRAMES = set(_TF_META)
DEFAULT_LOOKBACK = 512
DEFAULT_TF = "1d"
SIG_LEVELS = [4, 5]
SLOPE_WINDOW = 40
VOL_WINDOW = 60
DECOMP_LEVELS = 6
@dataclass
class LiteSignal:
ticker: str
timeframe: str
timestamp: datetime
current_price: float
last_bar_time: pd.Timestamp
raw_signal: float # +1.0, -1.0, 0.0
sized_position: float # vol-targeted
midband_slope: float
vol_ann: float
level_slopes: dict[str, float] # D1..D6, A6
level_signals: dict[str, float] # sign of each level slope
sig_levels: list[int]
lookback: int
bars_loaded: int
warnings: list[str] = field(default_factory=list)
def _resample_4h(df_1h: pd.DataFrame) -> pd.DataFrame:
return df_1h.resample("4h").agg({
"Open": "first", "High": "max",
"Low": "min", "Close": "last", "Volume": "sum",
}).dropna()
def _fetch_prices(ticker: str, timeframe: str, lookback: int) -> pd.Series:
yf_interval, bars_per_day = _TF_META[timeframe]
# Request enough bars to cover lookback with margin
need_days = int(lookback / bars_per_day * 1.4) + 30
period = f"{min(need_days, 730)}d" if yf_interval != "1wk" else f"{min(need_days * 7, 3650)}d"
df = yf.download(
ticker,
period=period,
interval=yf_interval,
auto_adjust=True,
progress=False,
threads=False,
)
if df.empty:
raise ValueError(f"No data returned for {ticker} ({timeframe})")
if isinstance(df.columns, pd.MultiIndex):
df.columns = df.columns.get_level_values(0)
df = df[["Open", "High", "Low", "Close", "Volume"]].dropna()
if timeframe == "4h":
df = _resample_4h(df)
close = df["Close"].dropna()
close.name = ticker
return close.tail(lookback)
class WaveletLiteAnalyzer:
"""Stateless lightweight wavelet signal analyzer."""
@staticmethod
async def analyze(
ticker: str,
timeframe: str = DEFAULT_TF,
lookback: int = DEFAULT_LOOKBACK,
sig_levels: list[int] | None = None,
) -> LiteSignal:
"""Fetch prices and compute MODWT signal snapshot. Non-blocking."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: WaveletLiteAnalyzer._analyze_sync(
ticker, timeframe, lookback, sig_levels or SIG_LEVELS
),
)
@staticmethod
def _analyze_sync(
ticker: str,
timeframe: str,
lookback: int,
sig_levels: list[int],
) -> LiteSignal:
warnings: list[str] = []
if timeframe not in VALID_TIMEFRAMES:
raise ValueError(
f"Unsupported timeframe '{timeframe}'. "
f"Use one of: {', '.join(sorted(VALID_TIMEFRAMES))}"
)
# ── fetch ─────────────────────────────────────────────────────────────
try:
prices = _fetch_prices(ticker, timeframe, lookback)
except Exception as e:
raise ValueError(f"Data fetch failed for {ticker}: {e}") from e
bars = len(prices)
if bars < 64:
raise ValueError(
f"Only {bars} bars for {ticker} ({timeframe}) β€” need at least 64."
)
if bars < lookback:
warnings.append(
f"Only {bars} bars available (requested {lookback}). "
"Signal quality at deep levels (D5/D6) may be reduced."
)
# ── data freshness ────────────────────────────────────────────────────
last_bar_time: pd.Timestamp = prices.index[-1]
now = datetime.now(timezone.utc)
lb = last_bar_time
if lb.tzinfo is None:
lb = lb.tz_localize("UTC")
age_min = (now - lb.to_pydatetime()).total_seconds() / 60
if age_min > 30:
warnings.append(
f"Last bar is {age_min:.0f} min old "
"(yfinance free tier ~15 min lag for US equities)"
)
# ── signal ────────────────────────────────────────────────────────────
log_prices = np.log(prices.values)
snap = signal_snapshot(
log_prices,
sig_levels=sig_levels,
slope_window=SLOPE_WINDOW,
vol_window=VOL_WINDOW,
decomp_levels=DECOMP_LEVELS,
)
return LiteSignal(
ticker=ticker,
timeframe=timeframe,
timestamp=datetime.now(timezone.utc),
current_price=float(prices.iloc[-1]),
last_bar_time=last_bar_time,
raw_signal=snap["raw_signal"],
sized_position=snap["sized_position"],
midband_slope=snap["midband_slope"],
vol_ann=snap["vol_ann"],
level_slopes=snap["level_slopes"],
level_signals=snap["level_signals"],
sig_levels=sig_levels,
lookback=lookback,
bars_loaded=bars,
warnings=warnings,
)