File size: 5,948 Bytes
0821f38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""Data fetching and async orchestration for wavelets_lite."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import yfinance as yf

from .core import signal_snapshot

logger = logging.getLogger(__name__)

# Timeframe β†’ (yfinance interval, bars_per_day multiplier for period calc)
_TF_META: dict[str, tuple[str, float]] = {
    "1h":  ("60m",  6.5),
    "4h":  ("60m",  6.5),   # resampled from 1h
    "1d":  ("1d",   1.0),
    "1wk": ("1wk",  1.0 / 5),
}

VALID_TIMEFRAMES = set(_TF_META)
DEFAULT_LOOKBACK = 512
DEFAULT_TF = "1d"
SIG_LEVELS = [4, 5]
SLOPE_WINDOW = 40
VOL_WINDOW = 60
DECOMP_LEVELS = 6


@dataclass
class LiteSignal:
    ticker: str
    timeframe: str
    timestamp: datetime
    current_price: float
    last_bar_time: pd.Timestamp

    raw_signal: float         # +1.0, -1.0, 0.0
    sized_position: float     # vol-targeted
    midband_slope: float
    vol_ann: float

    level_slopes: dict[str, float]   # D1..D6, A6
    level_signals: dict[str, float]  # sign of each level slope

    sig_levels: list[int]
    lookback: int
    bars_loaded: int
    warnings: list[str] = field(default_factory=list)


def _resample_4h(df_1h: pd.DataFrame) -> pd.DataFrame:
    return df_1h.resample("4h").agg({
        "Open": "first", "High": "max",
        "Low": "min", "Close": "last", "Volume": "sum",
    }).dropna()


def _fetch_prices(ticker: str, timeframe: str, lookback: int) -> pd.Series:
    yf_interval, bars_per_day = _TF_META[timeframe]

    # Request enough bars to cover lookback with margin
    need_days = int(lookback / bars_per_day * 1.4) + 30
    period = f"{min(need_days, 730)}d" if yf_interval != "1wk" else f"{min(need_days * 7, 3650)}d"

    df = yf.download(
        ticker,
        period=period,
        interval=yf_interval,
        auto_adjust=True,
        progress=False,
        threads=False,
    )
    if df.empty:
        raise ValueError(f"No data returned for {ticker} ({timeframe})")

    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df = df[["Open", "High", "Low", "Close", "Volume"]].dropna()

    if timeframe == "4h":
        df = _resample_4h(df)

    close = df["Close"].dropna()
    close.name = ticker
    return close.tail(lookback)


class WaveletLiteAnalyzer:
    """Stateless lightweight wavelet signal analyzer."""

    @staticmethod
    async def analyze(
        ticker: str,
        timeframe: str = DEFAULT_TF,
        lookback: int = DEFAULT_LOOKBACK,
        sig_levels: list[int] | None = None,
    ) -> LiteSignal:
        """Fetch prices and compute MODWT signal snapshot. Non-blocking."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None,
            lambda: WaveletLiteAnalyzer._analyze_sync(
                ticker, timeframe, lookback, sig_levels or SIG_LEVELS
            ),
        )

    @staticmethod
    def _analyze_sync(
        ticker: str,
        timeframe: str,
        lookback: int,
        sig_levels: list[int],
    ) -> LiteSignal:
        warnings: list[str] = []

        if timeframe not in VALID_TIMEFRAMES:
            raise ValueError(
                f"Unsupported timeframe '{timeframe}'. "
                f"Use one of: {', '.join(sorted(VALID_TIMEFRAMES))}"
            )

        # ── fetch ─────────────────────────────────────────────────────────────
        try:
            prices = _fetch_prices(ticker, timeframe, lookback)
        except Exception as e:
            raise ValueError(f"Data fetch failed for {ticker}: {e}") from e

        bars = len(prices)
        if bars < 64:
            raise ValueError(
                f"Only {bars} bars for {ticker} ({timeframe}) β€” need at least 64."
            )
        if bars < lookback:
            warnings.append(
                f"Only {bars} bars available (requested {lookback}). "
                "Signal quality at deep levels (D5/D6) may be reduced."
            )

        # ── data freshness ────────────────────────────────────────────────────
        last_bar_time: pd.Timestamp = prices.index[-1]
        now = datetime.now(timezone.utc)
        lb = last_bar_time
        if lb.tzinfo is None:
            lb = lb.tz_localize("UTC")
        age_min = (now - lb.to_pydatetime()).total_seconds() / 60
        if age_min > 30:
            warnings.append(
                f"Last bar is {age_min:.0f} min old "
                "(yfinance free tier ~15 min lag for US equities)"
            )

        # ── signal ────────────────────────────────────────────────────────────
        log_prices = np.log(prices.values)
        snap = signal_snapshot(
            log_prices,
            sig_levels=sig_levels,
            slope_window=SLOPE_WINDOW,
            vol_window=VOL_WINDOW,
            decomp_levels=DECOMP_LEVELS,
        )

        return LiteSignal(
            ticker=ticker,
            timeframe=timeframe,
            timestamp=datetime.now(timezone.utc),
            current_price=float(prices.iloc[-1]),
            last_bar_time=last_bar_time,
            raw_signal=snap["raw_signal"],
            sized_position=snap["sized_position"],
            midband_slope=snap["midband_slope"],
            vol_ann=snap["vol_ann"],
            level_slopes=snap["level_slopes"],
            level_signals=snap["level_signals"],
            sig_levels=sig_levels,
            lookback=lookback,
            bars_loaded=bars,
            warnings=warnings,
        )