"""Main pipeline: analyze(ticker, timeframe, equity) → Analysis."""
from __future__ import annotations

from datetime import datetime, timezone
import logging

import numpy as np

from .types import Analysis, Regime
from .data.loader import load_ohlcv, get_current_price, needs_warning, normalize_tf
from .features.volatility import atr, yang_zhang_volatility, compression_score
from .features.regime import hurst_dfa, adx, classify_trend
from .liquidity.kde_profile import build_profile, VolumeProfile
from .liquidity.swing import detect_swings, detect_liquidity_pools
from .scoring.confluence import (
    collect_candidate_levels,
    cluster_levels,
    score_clusters,
    clusters_to_levels,
)
from .strategy.setups import build_all_setups

logger = logging.getLogger(__name__)


def analyze(
    ticker: str,
    timeframe: str,
    equity: float = 10_000.0,
    max_levels_returned: int = 10,
) -> Analysis:
    """One-shot stateless pipeline.

    Loads OHLCV bars, derives volatility and regime features, builds a volume
    profile, detects swings and liquidity pools, scores confluence levels and
    finally builds trade setups from them.

    Args:
        ticker: yfinance format (AAPL, BTC-USD, ^GSPC, ES=F, EURUSD=X)
        timeframe: 1m/5m/15m/30m/1h/4h/1d/1wk
        equity: account size for position sizing
        max_levels_returned: maximum number of levels kept. The strongest
            levels are selected first, then returned sorted by ascending
            price. Negative values are treated as 0.

    Returns:
        An ``Analysis`` holding the current price, regime, levels, setups,
        accumulated warnings and the number of bars loaded.

    Raises:
        ValueError: if fewer than 50 bars were loaded.
    """
    tf = normalize_tf(timeframe)
    warnings: list[str] = []

    tf_warning = needs_warning(tf)
    if tf_warning:
        warnings.append(tf_warning)

    df = load_ohlcv(ticker, tf)
    if len(df) < 50:
        raise ValueError(f"Only {len(df)} bars loaded — not enough for analysis (need 50+)")

    current_price, last_bar_time = get_current_price(df)

    # Freshness check: compare the last bar's timestamp against "now" in UTC.
    now = datetime.now(timezone.utc)
    if last_bar_time.tzinfo is None:
        # A naive timestamp is interpreted as UTC so the subtraction below is
        # between two timezone-aware values.
        last_bar_time = last_bar_time.tz_localize("UTC")
    age_min = (now - last_bar_time.to_pydatetime()).total_seconds() / 60
    # NOTE(review): the 30-minute threshold does not depend on `tf`; for
    # higher timeframes (e.g. 1d/1wk) the last bar is presumably always older
    # than this, so the warning may fire on every call — confirm intent.
    if age_min > 30:
        warnings.append(
            f"Last bar is {age_min:.0f} min old — data delayed "
            "(yfinance free tier ~15 min lag for US equities)"
        )

    # Volatility
    atr_value = float(atr(df, period=14).iloc[-1])
    if not np.isfinite(atr_value) or atr_value <= 0:
        # Fallback when ATR is unusable: 1% of the full high-low range.
        atr_value = (df["High"].max() - df["Low"].min()) * 0.01
    atr_pct = atr_value / current_price * 100

    yz = yang_zhang_volatility(df, window=min(30, max(5, len(df) // 4)))
    yz_clean = yz.dropna()
    yz_value = float(yz_clean.iloc[-1]) if len(yz_clean) > 0 else 0.0
    if not np.isfinite(yz_value):
        yz_value = 0.0

    compression = compression_score(df, window=20, lookback=min(100, len(df)))

    # Regime
    hurst_val = hurst_dfa(df["Close"])
    adx_val = float(adx(df, period=14).iloc[-1])
    if not np.isfinite(adx_val):
        # Sanitise like ATR / Yang-Zhang above so a NaN/inf ADX does not leak
        # into trend classification or the returned Regime.
        adx_val = 0.0
    trend = classify_trend(df, adx_val, hurst_val, compression)

    regime = Regime(
        trend=trend,
        hurst=hurst_val,
        adx=adx_val,
        atr=atr_value,
        atr_pct=atr_pct,
        yang_zhang_vol=yz_value,
        compression=compression,
    )

    # Volume profile
    try:
        profile = build_profile(df, grid_size=500, samples_per_bar=10)
    except Exception as e:
        # Best-effort: a failed profile must not abort the analysis. Fall back
        # to a degenerate single-point profile centred on the current price.
        logger.warning("Profile failed for %s: %s", ticker, e)
        warnings.append(f"Volume profile unavailable: {e}")
        profile = VolumeProfile(
            grid=np.array([current_price]),
            density=np.array([1.0]),
            poc=current_price,
            vah=current_price,
            val=current_price,
            hvn=[],
            lvn=[],
        )

    # Swings and liquidity pools
    swing_lookback = min(300, len(df))
    swings = detect_swings(df, atr_multiplier=1.5, lookback=swing_lookback)
    pools = detect_liquidity_pools(swings, df.tail(swing_lookback))

    # Confluence scoring
    candidates = collect_candidate_levels(profile, swings, pools)
    clusters = cluster_levels(candidates, atr_value=atr_value, eps_factor=0.20)
    clusters = score_clusters(
        clusters,
        trend=trend,
        atr_value=atr_value,
        current_price=current_price,
    )
    levels = clusters_to_levels(clusters)

    # Keep the strongest N levels, then present them in ascending price order.
    # The clamp prevents a negative N from being read as "drop the last |N|".
    levels.sort(key=lambda lvl: lvl.strength, reverse=True)
    levels = levels[:max(0, max_levels_returned)]
    levels.sort(key=lambda lvl: lvl.price)

    # Setups
    setups = build_all_setups(df, levels, regime, current_price, equity)
    setups.sort(key=lambda s: s.confidence, reverse=True)

    # Conflict detection: opposite directions at close levels in neutral regime
    longs = [s for s in setups if s.direction.value == "long"]
    shorts = [s for s in setups if s.direction.value == "short"]
    if longs and shorts and trend in ("range", "accumulation"):
        best_long_entry = max(longs, key=lambda s: s.confidence).entry
        best_short_entry = max(shorts, key=lambda s: s.confidence).entry
        # Distance between the two best entries, expressed in ATR units.
        spread_atr = (
            abs(best_short_entry - best_long_entry) / atr_value
            if atr_value > 0
            else 0
        )
        if spread_atr < 6.0 and best_short_entry > best_long_entry:
            warnings.append(
                f"⚠ Range-trading scenario "
                f"(long@{best_long_entry:.2f} ↔ short@{best_short_entry:.2f}). "
                "Do not stack opposite directions — pick one based on entry trigger."
            )

    return Analysis(
        ticker=ticker,
        timeframe=tf,
        timestamp=datetime.now(timezone.utc),
        current_price=current_price,
        regime=regime,
        levels=levels,
        setups=setups,
        warnings=warnings,
        bars_loaded=len(df),
    )