Dmitry Beresnev
patch pack v1 — accumulation regime, counter-trend warnings, conflict detection, data delay marker
3447c07
"""Main pipeline: analyze(ticker, timeframe, equity) → Analysis."""
from __future__ import annotations
from datetime import datetime, timezone
import logging
import numpy as np
from .types import Analysis, Regime
from .data.loader import load_ohlcv, get_current_price, needs_warning, normalize_tf
from .features.volatility import atr, yang_zhang_volatility, compression_score
from .features.regime import hurst_dfa, adx, classify_trend
from .liquidity.kde_profile import build_profile, VolumeProfile
from .liquidity.swing import detect_swings, detect_liquidity_pools
from .scoring.confluence import (
collect_candidate_levels, cluster_levels, score_clusters, clusters_to_levels,
)
from .strategy.setups import build_all_setups
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _stale_data_warning(last_bar_time, now: datetime, max_age_min: float = 30.0) -> str | None:
    """Return a data-delay warning if the last bar is older than ``max_age_min`` minutes, else None."""
    # ``last_bar_time`` must expose ``tzinfo``, ``tz_localize`` and ``to_pydatetime``
    # (presumably a pandas Timestamp -- confirm against get_current_price).
    if last_bar_time.tzinfo is None:
        # Naive timestamps are treated as UTC so they can be subtracted from the aware ``now``.
        last_bar_time = last_bar_time.tz_localize("UTC")
    age_min = (now - last_bar_time.to_pydatetime()).total_seconds() / 60
    if age_min > max_age_min:
        return (
            f"Last bar is {age_min:.0f} min old — data delayed "
            "(yfinance free tier ~15 min lag for US equities)"
        )
    return None


def _range_conflict_warning(setups, trend, atr_value: float, max_spread_atr: float = 6.0) -> str | None:
    """Return a warning when the best long and best short setups bracket a tight range, else None."""
    longs = [s for s in setups if s.direction.value == "long"]
    shorts = [s for s in setups if s.direction.value == "short"]
    if not (longs and shorts and trend in ("range", "accumulation")):
        return None
    best_long_entry = max(longs, key=lambda s: s.confidence).entry
    best_short_entry = max(shorts, key=lambda s: s.confidence).entry
    # Distance between the two entries expressed in ATR units; 0 when ATR is unusable.
    spread_atr = abs(best_short_entry - best_long_entry) / atr_value if atr_value > 0 else 0
    if spread_atr < max_spread_atr and best_short_entry > best_long_entry:
        return (
            "⚠ Range-trading scenario "
            f"(long@{best_long_entry:.2f} ↔ short@{best_short_entry:.2f}). "
            "Do not stack opposite directions — pick one based on entry trigger."
        )
    return None


def analyze(
    ticker: str,
    timeframe: str,
    equity: float = 10_000.0,
    max_levels_returned: int = 10,
) -> Analysis:
    """One-shot stateless pipeline.

    Loads OHLCV bars, computes volatility and regime features, builds a volume
    profile, detects swings and liquidity pools, scores confluence levels,
    builds trade setups and collects human-readable warnings along the way.

    Args:
        ticker: yfinance format (AAPL, BTC-USD, ^GSPC, ES=F, EURUSD=X)
        timeframe: 1m/5m/15m/30m/1h/4h/1d/1wk
        equity: account size for position sizing
        max_levels_returned: maximum number of strongest levels to keep;
            negative values are treated as 0.

    Returns:
        Analysis with the normalized timeframe, current price, regime, the
        kept levels sorted by ascending price, setups sorted by descending
        confidence, accumulated warnings and the number of bars loaded.

    Raises:
        ValueError: if fewer than 50 bars were loaded.
    """
    tf = normalize_tf(timeframe)
    warnings: list[str] = []
    tf_warning = needs_warning(tf)
    if tf_warning:
        warnings.append(tf_warning)

    df = load_ohlcv(ticker, tf)
    if len(df) < 50:
        raise ValueError(f"Only {len(df)} bars loaded — not enough for analysis (need 50+)")

    current_price, last_bar_time = get_current_price(df)
    # NOTE(review): the 30-minute staleness threshold is fixed and not scaled to
    # the timeframe -- confirm this is intended for 1h/4h/1d/1wk bars.
    stale_warning = _stale_data_warning(last_bar_time, datetime.now(timezone.utc))
    if stale_warning:
        warnings.append(stale_warning)

    # Volatility
    atr_value = float(atr(df, period=14).iloc[-1])
    if not np.isfinite(atr_value) or atr_value <= 0:
        # Fall back to 1% of the full high-low range of the loaded bars.
        atr_value = float((df["High"].max() - df["Low"].min()) * 0.01)
    atr_pct = atr_value / current_price * 100

    # Window is a quarter of the loaded bars, clamped to [5, 30].
    yz = yang_zhang_volatility(df, window=min(30, max(5, len(df) // 4)))
    yz_clean = yz.dropna()
    yz_value = float(yz_clean.iloc[-1]) if len(yz_clean) > 0 else 0.0
    if not np.isfinite(yz_value):
        yz_value = 0.0
    compression = compression_score(df, window=20, lookback=min(100, len(df)))

    # Regime
    # NOTE(review): hurst_val / adx_val are not checked for NaN the way ATR and
    # Yang-Zhang are -- confirm classify_trend tolerates non-finite inputs.
    hurst_val = hurst_dfa(df["Close"])
    adx_val = float(adx(df, period=14).iloc[-1])
    trend = classify_trend(df, adx_val, hurst_val, compression)
    regime = Regime(
        trend=trend,
        hurst=hurst_val,
        adx=adx_val,
        atr=atr_value,
        atr_pct=atr_pct,
        yang_zhang_vol=yz_value,
        compression=compression,
    )

    # Volume profile
    try:
        profile = build_profile(df, grid_size=500, samples_per_bar=10)
    except Exception as e:
        # Best effort: the pipeline continues with a degenerate single-point
        # profile at the current price and surfaces the failure as a warning.
        logger.warning("Profile failed for %s: %s", ticker, e)
        warnings.append(f"Volume profile unavailable: {e}")
        profile = VolumeProfile(
            grid=np.array([current_price]),
            density=np.array([1.0]),
            poc=current_price, vah=current_price, val=current_price,
            hvn=[], lvn=[],
        )

    # Swings and liquidity pools
    swing_lookback = min(300, len(df))
    swings = detect_swings(df, atr_multiplier=1.5, lookback=swing_lookback)
    pools = detect_liquidity_pools(swings, df.tail(swing_lookback))

    # Confluence scoring
    candidates = collect_candidate_levels(profile, swings, pools)
    clusters = cluster_levels(candidates, atr_value=atr_value, eps_factor=0.20)
    clusters = score_clusters(clusters, trend=trend, atr_value=atr_value, current_price=current_price)
    levels = clusters_to_levels(clusters)
    # Keep the strongest N, then present them in ascending price order.
    levels.sort(key=lambda lvl: lvl.strength, reverse=True)
    # Clamp so a negative limit cannot slice levels off the tail.
    levels = levels[: max(0, max_levels_returned)]
    levels.sort(key=lambda lvl: lvl.price)

    # Setups
    setups = build_all_setups(df, levels, regime, current_price, equity)
    setups.sort(key=lambda s: s.confidence, reverse=True)

    # Conflict detection: opposite directions at close levels in neutral regime
    conflict_warning = _range_conflict_warning(setups, trend, atr_value)
    if conflict_warning:
        warnings.append(conflict_warning)

    return Analysis(
        ticker=ticker,
        timeframe=tf,
        timestamp=datetime.now(timezone.utc),
        current_price=current_price,
        regime=regime,
        levels=levels,
        setups=setups,
        warnings=warnings,
        bars_loaded=len(df),
    )