"""
Candlestick Pattern Detector — 35+ Patterns.

Institutional-grade pattern detection across single-candle, multi-candle,
and complex chart patterns.

Trading Styles Covered:
  - Scalping: sub-minute to minutes, high-frequency entries
  - Day Trading / Intraday: same-day open-close positions
  - Swing Trading: multi-day to multi-week reversals/continuations
  - Positional / Long-Term: weeks to months trend riding
  - F&O / Options: volatility-based, breakout/breakdown detection
  - Futures: momentum-based, trend-following signals

Market Coverage:
  - Equities (US, India, Europe, Asia)
  - Forex (major, minor, exotic pairs)
  - Crypto (BTC, ETH, altcoins)
  - Commodities (gold, oil, agricultural)
  - Indices (SPX, NIFTY, DAX)

Each detected pattern (see ``PatternResult.to_dict``) carries:
  - name, category, direction (bullish/bearish/neutral)
  - reliability (0-1), index, description
  - trading_styles: list of suitable trading approaches
  - hedge_signal / hedge_recommendation derived from direction + reliability
  - details: pattern-specific measurements (body ratio, shadow ratios, ...)

Input frames are expected to expose "Open", "High", "Low" and "Close" columns.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


@dataclass
class PatternResult:
    """A detected candlestick or chart pattern."""

    name: str
    category: str  # single, multi, complex
    direction: str  # bullish, bearish, neutral
    reliability: float  # 0.0 – 1.0
    index: int  # bar index where pattern was detected
    description: str
    trading_styles: List[str]  # scalping, intraday, swing, positional, options, futures, forex, crypto, commodities
    details: Dict[str, Any]

    @property
    def hedge_signal(self) -> str:
        """Auto-derive hedge action from pattern direction + reliability.

        Returns one of:
          - 'hedge_now'      — bearish with reliability >= 0.70
          - 'increase_hedge' — bearish with reliability < 0.70
          - 'reduce_hedge'   — bullish with reliability >= 0.70
          - 'hold_hedge'     — everything else (neutral, or weak bullish)
        """
        if self.direction == "bearish" and self.reliability >= 0.70:
            return "hedge_now"
        elif self.direction == "bearish":
            return "increase_hedge"
        elif self.direction == "bullish" and self.reliability >= 0.70:
            return "reduce_hedge"
        else:
            return "hold_hedge"

    @property
    def hedge_recommendation(self) -> Dict[str, Any]:
        """Hedge sizing recommendation based on signal strength.

        Returns a dict with keys ``action``, ``suggested_hedge_pct``,
        ``urgency`` and ``rationale``.
        """
        signal = self.hedge_signal
        if signal == "hedge_now":
            return {
                "action": "hedge_now",
                "suggested_hedge_pct": min(50, int(self.reliability * 60)),
                "urgency": "high",
                "rationale": f"{self.name} ({self.reliability:.0%} reliability) indicates strong downside risk. Protect portfolio with 30-50% hedge allocation.",
            }
        elif signal == "increase_hedge":
            return {
                "action": "increase_hedge",
                "suggested_hedge_pct": min(30, int(self.reliability * 40)),
                "urgency": "medium",
                "rationale": f"{self.name} signals bearish pressure. Consider increasing hedge to 15-30%.",
            }
        elif signal == "reduce_hedge":
            return {
                "action": "reduce_hedge",
                "suggested_hedge_pct": max(5, int((1 - self.reliability) * 20)),
                "urgency": "low",
                "rationale": f"{self.name} ({self.reliability:.0%} reliability) confirms bullish momentum. Reduce hedge to 5-10% maintenance level.",
            }
        else:
            return {
                "action": "hold_hedge",
                "suggested_hedge_pct": 15,
                "urgency": "none",
                "rationale": f"{self.name} is neutral/indecisive. Maintain current hedge level.",
            }

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the pattern, including the derived hedge fields."""
        return {
            "name": self.name,
            "category": self.category,
            "direction": self.direction,
            "reliability": round(self.reliability, 4),
            "index": self.index,
            "description": self.description,
            "trading_styles": self.trading_styles,
            "hedge_signal": self.hedge_signal,
            "hedge_recommendation": self.hedge_recommendation,
            "details": self.details,
        }


# ── Helper Functions ─────────────────────────────────────────────────────

def _body(o: float, c: float) -> float:
    """Absolute body size."""
    return abs(c - o)


def _range(h: float, l: float) -> float:
    """Candle range (high-low)."""
    return h - l


def _upper_shadow(o: float, h: float, c: float) -> float:
    """Distance from the top of the body to the high."""
    return h - max(o, c)


def _lower_shadow(o: float, l: float, c: float) -> float:
    """Distance from the low to the bottom of the body."""
    return min(o, c) - l


def _is_bullish(o: float, c: float) -> bool:
    """True when the candle closes above its open."""
    return c > o


def _is_bearish(o: float, c: float) -> bool:
    """True when the candle closes below its open."""
    return c < o


def _body_ratio(o: float, h: float, l: float, c: float) -> float:
    """Body as fraction of total range (0 for a zero/negative range)."""
    r = _range(h, l)
    return _body(o, c) / r if r > 0 else 0


def _avg_body(df: pd.DataFrame, end: int, lookback: int = 14) -> float:
    """Average body size over the ``lookback`` bars before ``end`` (exclusive).

    Returns 1.0 when there are no prior bars to average.
    """
    start = max(0, end - lookback)
    if end <= start:
        return 1.0
    opens = df["Open"].iloc[start:end].to_numpy()
    closes = df["Close"].iloc[start:end].to_numpy()
    if len(opens) == 0:
        return 1.0
    return float(np.mean(np.abs(closes - opens)))


def _prior_trend(df: pd.DataFrame, idx: int, lookback: int = 5) -> int:
    """Direction of closes over the ``lookback`` bars before ``idx``.

    Returns +1 when the last prior close is above the first one in the
    window, -1 when it is below, and 0 when they are equal or fewer than
    two prior bars exist (trend unknown).
    """
    start = max(0, idx - lookback)
    if idx - start < 2:
        return 0
    closes = df["Close"].iloc[start:idx].to_numpy()
    if closes[-1] > closes[0]:
        return 1
    if closes[-1] < closes[0]:
        return -1
    return 0


# ── Single-Candle Patterns ───────────────────────────────────────────────

class SingleCandleDetector:
    """Detect single-candle patterns."""

    @staticmethod
    def detect(df: pd.DataFrame, idx: int, avg_b: float) -> List[PatternResult]:
        """Return every single-candle pattern matched by the bar at ``idx``.

        Args:
            df: OHLC frame with "Open", "High", "Low", "Close" columns.
            idx: positional index of the bar to inspect.
            avg_b: average body size of preceding bars (see ``_avg_body``).

        Returns:
            A possibly empty list; empty when ``idx`` is out of range or the
            bar has a non-positive range.
        """
        results: List[PatternResult] = []
        if idx < 0 or idx >= len(df):
            return results

        row = df.iloc[idx]
        o, h, l, c = row["Open"], row["High"], row["Low"], row["Close"]
        body = _body(o, c)
        rng = _range(h, l)
        if rng <= 0:
            return results

        br = body / rng
        us = _upper_shadow(o, h, c)
        ls = _lower_shadow(o, l, c)
        bullish = _is_bullish(o, c)
        bearish = _is_bearish(o, c)
        # Hammer/Hanging Man and Inverted Hammer/Shooting Star share a shape
        # and differ only by the preceding trend. When the trend is unknown
        # (0) both readings are reported.
        trend = _prior_trend(df, idx)

        # 1. Doji — body < 5% of range
        if br < 0.05:
            # Sub-types
            if ls > rng * 0.3 and us < rng * 0.1:
                results.append(PatternResult(
                    name="Dragonfly Doji", category="single", direction="bullish",
                    reliability=0.72, index=idx,
                    description="Open and close near high with long lower shadow. Strong bullish reversal when found at support.",
                    trading_styles=["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                    details={"body_ratio": round(br, 4), "lower_shadow_pct": round(ls / rng, 4)},
                ))
            elif us > rng * 0.3 and ls < rng * 0.1:
                results.append(PatternResult(
                    name="Gravestone Doji", category="single", direction="bearish",
                    reliability=0.71, index=idx,
                    description="Open and close near low with long upper shadow. Bearish reversal at resistance.",
                    trading_styles=["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                    details={"body_ratio": round(br, 4), "upper_shadow_pct": round(us / rng, 4)},
                ))
            elif us > rng * 0.3 and ls > rng * 0.3:
                results.append(PatternResult(
                    name="Long-Legged Doji", category="single", direction="neutral",
                    reliability=0.60, index=idx,
                    description="Equal long shadows on both sides. Indicates extreme indecision — high volatility expected.",
                    trading_styles=["scalping", "intraday", "swing", "options", "futures", "forex", "crypto", "commodities"],
                    details={"body_ratio": round(br, 4)},
                ))
            else:
                results.append(PatternResult(
                    name="Doji", category="single", direction="neutral",
                    reliability=0.55, index=idx,
                    description="Open equals close. Market indecision — potential reversal signal when confirmed.",
                    trading_styles=["scalping", "intraday", "swing", "forex", "crypto"],
                    details={"body_ratio": round(br, 4)},
                ))

        # Shared shape: small body at top, long lower shadow (>= 2x body)
        hammer_shape = body > 0 and br < 0.35 and ls >= body * 2 and us < body * 0.5

        # 2. Hammer — hammer shape NOT preceded by an uptrend
        if hammer_shape and trend <= 0:
            results.append(PatternResult(
                name="Hammer", category="single", direction="bullish",
                reliability=0.75, index=idx,
                description="Small body at top, long lower shadow. Classic bullish reversal pattern at support levels.",
                trading_styles=["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"body_ratio": round(br, 4), "shadow_body_ratio": round(ls / body, 2)},
            ))

        # 3. Hanging Man — hammer shape NOT preceded by a downtrend
        if hammer_shape and trend >= 0:
            results.append(PatternResult(
                name="Hanging Man", category="single", direction="bearish",
                reliability=0.65, index=idx,
                description="Hammer shape at top of uptrend. Warns of potential trend reversal downward.",
                trading_styles=["intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                details={"body_ratio": round(br, 4)},
            ))

        # Shared shape: small body at bottom, long upper shadow (>= 2x body)
        star_shape = body > 0 and br < 0.35 and us >= body * 2 and ls < body * 0.5

        # 4. Inverted Hammer — star shape NOT preceded by an uptrend
        if star_shape and trend <= 0:
            results.append(PatternResult(
                name="Inverted Hammer", category="single", direction="bullish",
                reliability=0.65, index=idx,
                description="Small body at bottom, long upper shadow. Bullish reversal candidate requiring confirmation.",
                trading_styles=["scalping", "intraday", "swing", "options", "forex", "crypto"],
                details={"body_ratio": round(br, 4)},
            ))

        # 5. Shooting Star — star shape NOT preceded by a downtrend
        if star_shape and trend >= 0:
            results.append(PatternResult(
                name="Shooting Star", category="single", direction="bearish",
                reliability=0.72, index=idx,
                description="Small body near low, long upper shadow. Strong bearish reversal at resistance.",
                trading_styles=["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"body_ratio": round(br, 4)},
            ))

        # 6. Marubozu (Bullish) — large body, almost no shadows
        if br > 0.90 and bullish:
            results.append(PatternResult(
                name="Bullish Marubozu", category="single", direction="bullish",
                reliability=0.78, index=idx,
                description="Full bullish candle with no or tiny shadows. Extreme buying pressure, strong continuation signal.",
                trading_styles=["scalping", "intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"body_ratio": round(br, 4)},
            ))

        # 7. Marubozu (Bearish)
        if br > 0.90 and bearish:
            results.append(PatternResult(
                name="Bearish Marubozu", category="single", direction="bearish",
                reliability=0.78, index=idx,
                description="Full bearish candle with no or tiny shadows. Extreme selling pressure, strong continuation signal.",
                trading_styles=["scalping", "intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"body_ratio": round(br, 4)},
            ))

        # 8. Spinning Top — small body, shadows on both sides
        if 0.05 <= br <= 0.35 and us > body * 0.5 and ls > body * 0.5 and body > 0:
            results.append(PatternResult(
                name="Spinning Top", category="single", direction="neutral",
                reliability=0.45, index=idx,
                description="Small body with shadows on both sides. Indecision between buyers and sellers.",
                trading_styles=["scalping", "intraday", "swing", "forex", "crypto"],
                details={"body_ratio": round(br, 4)},
            ))

        # 9. High Wave Candle — very small body, very long shadows
        if br < 0.15 and us > rng * 0.35 and ls > rng * 0.35:
            results.append(PatternResult(
                name="High Wave Candle", category="single", direction="neutral",
                reliability=0.55, index=idx,
                description="Tiny body with extremely long shadows. Signals major indecision and potential reversal.",
                trading_styles=["intraday", "swing", "options", "futures", "forex", "crypto"],
                details={"body_ratio": round(br, 4)},
            ))

        # 10. Belt Hold (Bullish) — opens at low, closes near high, large body
        if bullish and br > 0.6 and ls < rng * 0.05 and body > avg_b * 1.2:
            results.append(PatternResult(
                name="Bullish Belt Hold", category="single", direction="bullish",
                reliability=0.68, index=idx,
                description="Opens at/near low, strong close near high. Powerful bullish opening signal.",
                trading_styles=["intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"body_ratio": round(br, 4), "body_vs_avg": round(body / avg_b, 2) if avg_b > 0 else 0},
            ))

        # 11. Belt Hold (Bearish)
        if bearish and br > 0.6 and us < rng * 0.05 and body > avg_b * 1.2:
            results.append(PatternResult(
                name="Bearish Belt Hold", category="single", direction="bearish",
                reliability=0.68, index=idx,
                description="Opens at/near high, closes near low. Strong bearish opening signal.",
                trading_styles=["intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"body_ratio": round(br, 4)},
            ))

        return results


# ── Multi-Candle Patterns ────────────────────────────────────────────────

class MultiCandleDetector:
    """Detect multi-candle patterns (2-3 candle formations)."""

    @staticmethod
    def detect(df: pd.DataFrame, idx: int, avg_b: float) -> List[PatternResult]:
        """Return every multi-candle pattern that completes on bar ``idx``.

        Candle suffixes: 0 = bar at ``idx``, 1 = previous bar, 2 = two bars
        back. Requires ``idx >= 2``; otherwise an empty list is returned.
        """
        results: List[PatternResult] = []
        if idx < 2 or idx >= len(df):
            return results

        c0 = df.iloc[idx]      # current candle
        c1 = df.iloc[idx - 1]  # previous candle
        c2 = df.iloc[idx - 2]  # two candles back

        o0, h0, l0, close0 = c0["Open"], c0["High"], c0["Low"], c0["Close"]
        o1, h1, l1, close1 = c1["Open"], c1["High"], c1["Low"], c1["Close"]
        o2, h2, l2, close2 = c2["Open"], c2["High"], c2["Low"], c2["Close"]

        body0 = _body(o0, close0)
        body1 = _body(o1, close1)
        body2 = _body(o2, close2)
        rng1 = _range(h1, l1)

        bull0, bear0 = _is_bullish(o0, close0), _is_bearish(o0, close0)
        bull1, bear1 = _is_bullish(o1, close1), _is_bearish(o1, close1)
        bull2, bear2 = _is_bullish(o2, close2), _is_bearish(o2, close2)

        # Shared detail values (guarded against zero denominators)
        body_vs_prev = round(body0 / body1, 2) if body1 > 0 else 0
        middle_vs_avg = round(body1 / avg_b, 2) if avg_b > 0 else 0
        mid1 = (o1 + close1) / 2
        mid2 = (o2 + close2) / 2

        # 1. Bullish Engulfing
        if bear1 and bull0 and o0 <= close1 and close0 >= o1 and body0 > body1:
            results.append(PatternResult(
                name="Bullish Engulfing", category="multi", direction="bullish",
                reliability=0.82, index=idx,
                description="Bullish candle completely engulfs prior bearish candle. One of the strongest reversal patterns.",
                trading_styles=["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"engulfing_ratio": body_vs_prev},
            ))

        # 2. Bearish Engulfing
        if bull1 and bear0 and o0 >= close1 and close0 <= o1 and body0 > body1:
            results.append(PatternResult(
                name="Bearish Engulfing", category="multi", direction="bearish",
                reliability=0.82, index=idx,
                description="Bearish candle completely engulfs prior bullish candle. Strong bearish reversal pattern.",
                trading_styles=["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"engulfing_ratio": body_vs_prev},
            ))

        # 3. Bullish Harami
        if bear1 and bull0 and o0 >= close1 and close0 <= o1 and body0 < body1 * 0.6:
            results.append(PatternResult(
                name="Bullish Harami", category="multi", direction="bullish",
                reliability=0.62, index=idx,
                description="Small bullish candle contained within prior bearish candle. Potential trend reversal.",
                trading_styles=["scalping", "intraday", "swing", "options", "forex", "crypto"],
                details={"containment_ratio": body_vs_prev},
            ))

        # 4. Bearish Harami
        if bull1 and bear0 and o0 <= close1 and close0 >= o1 and body0 < body1 * 0.6:
            results.append(PatternResult(
                name="Bearish Harami", category="multi", direction="bearish",
                reliability=0.62, index=idx,
                description="Small bearish candle contained within prior bullish candle. Potential trend reversal.",
                trading_styles=["scalping", "intraday", "swing", "options", "forex", "crypto"],
                details={"containment_ratio": body_vs_prev},
            ))

        # 5. Piercing Line
        if bear1 and bull0 and o0 < close1 and close0 > mid1 and close0 < o1:
            results.append(PatternResult(
                name="Piercing Line", category="multi", direction="bullish",
                reliability=0.70, index=idx,
                description="Opens below prior close, closes above prior midpoint. Bullish reversal pattern.",
                trading_styles=["intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"penetration_pct": round((close0 - close1) / body1 * 100, 1) if body1 > 0 else 0},
            ))

        # 6. Dark Cloud Cover
        if bull1 and bear0 and o0 > close1 and close0 < mid1 and close0 > o1:
            results.append(PatternResult(
                name="Dark Cloud Cover", category="multi", direction="bearish",
                reliability=0.70, index=idx,
                description="Opens above prior close, closes below prior midpoint. Bearish reversal signal.",
                trading_styles=["intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"penetration_pct": round((close1 - close0) / body1 * 100, 1) if body1 > 0 else 0},
            ))

        # 7. Tweezer Top — highs within 2% of the prior candle's range
        if abs(h0 - h1) / (rng1 + 1e-8) < 0.02 and bull1 and bear0:
            results.append(PatternResult(
                name="Tweezer Top", category="multi", direction="bearish",
                reliability=0.68, index=idx,
                description="Two candles with matching highs. Resistance confirmed, bearish reversal likely.",
                trading_styles=["intraday", "swing", "options", "futures", "forex", "crypto"],
                details={"high_diff_pct": round(abs(h0 - h1) / h1 * 100, 4) if h1 > 0 else 0},
            ))

        # 8. Tweezer Bottom — lows within 2% of the prior candle's range
        if abs(l0 - l1) / (rng1 + 1e-8) < 0.02 and bear1 and bull0:
            results.append(PatternResult(
                name="Tweezer Bottom", category="multi", direction="bullish",
                reliability=0.68, index=idx,
                description="Two candles with matching lows. Support confirmed, bullish reversal likely.",
                trading_styles=["intraday", "swing", "options", "futures", "forex", "crypto"],
                details={"low_diff_pct": round(abs(l0 - l1) / l1 * 100, 4) if l1 > 0 else 0},
            ))

        # ── Three-candle patterns ──

        # 9. Morning Star
        if (bear2 and body2 > avg_b * 0.5
                and body1 < avg_b * 0.3  # small middle candle
                and bull0 and body0 > avg_b * 0.5
                and close0 > mid2):
            results.append(PatternResult(
                name="Morning Star", category="multi", direction="bullish",
                reliability=0.85, index=idx,
                description="Three-candle reversal: bearish, small indecision, strong bullish. One of the most reliable bullish reversals.",
                trading_styles=["intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"middle_body_ratio": middle_vs_avg},
            ))

        # 10. Evening Star
        if (bull2 and body2 > avg_b * 0.5
                and body1 < avg_b * 0.3
                and bear0 and body0 > avg_b * 0.5
                and close0 < mid2):
            results.append(PatternResult(
                name="Evening Star", category="multi", direction="bearish",
                reliability=0.85, index=idx,
                description="Three-candle reversal: bullish, small indecision, strong bearish. One of the most reliable bearish reversals.",
                trading_styles=["intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                details={"middle_body_ratio": middle_vs_avg},
            ))

        all_large = all(b > avg_b * 0.4 for b in (body0, body1, body2))

        # 11. Three White Soldiers
        if bull0 and bull1 and bull2 and close0 > close1 > close2 and all_large:
            results.append(PatternResult(
                name="Three White Soldiers", category="multi", direction="bullish",
                reliability=0.80, index=idx,
                description="Three consecutive large bullish candles with higher closes. Strong uptrend continuation.",
                trading_styles=["swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"avg_body_size": round(np.mean([body0, body1, body2]), 2)},
            ))

        # 12. Three Black Crows
        if bear0 and bear1 and bear2 and close0 < close1 < close2 and all_large:
            results.append(PatternResult(
                name="Three Black Crows", category="multi", direction="bearish",
                reliability=0.80, index=idx,
                description="Three consecutive large bearish candles with lower closes. Strong downtrend continuation.",
                trading_styles=["swing", "positional", "futures", "forex", "crypto", "commodities"],
                details={"avg_body_size": round(np.mean([body0, body1, body2]), 2)},
            ))

        # 13. Three Inside Up
        if (bear2 and body2 > avg_b * 0.6
                and bull1 and o1 >= close2 and close1 <= o2
                and bull0 and close0 > o2):
            results.append(PatternResult(
                name="Three Inside Up", category="multi", direction="bullish",
                reliability=0.76, index=idx,
                description="Harami pattern confirmed by third bullish candle closing above first candle. Strong bullish reversal.",
                trading_styles=["intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                details={},
            ))

        # 14. Three Inside Down
        if (bull2 and body2 > avg_b * 0.6
                and bear1 and o1 <= close2 and close1 >= o2
                and bear0 and close0 < o2):
            results.append(PatternResult(
                name="Three Inside Down", category="multi", direction="bearish",
                reliability=0.76, index=idx,
                description="Harami pattern confirmed by third bearish candle closing below first candle. Strong bearish reversal.",
                trading_styles=["intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                details={},
            ))

        # 15. Abandoned Baby (Bullish)
        if (bear2
                and h1 < l2 and h1 < l0  # gap isolation
                and body1 < avg_b * 0.15
                and bull0):
            results.append(PatternResult(
                name="Bullish Abandoned Baby", category="multi", direction="bullish",
                reliability=0.88, index=idx,
                description="Isolated doji gapped below prior and next candles. Extremely rare and reliable bullish reversal.",
                trading_styles=["swing", "positional", "options", "futures", "forex", "crypto"],
                details={},
            ))

        # 16. Abandoned Baby (Bearish)
        if (bull2
                and l1 > h2 and l1 > h0
                and body1 < avg_b * 0.15
                and bear0):
            results.append(PatternResult(
                name="Bearish Abandoned Baby", category="multi", direction="bearish",
                reliability=0.88, index=idx,
                description="Isolated doji gapped above prior and next candles. Extremely rare and reliable bearish reversal.",
                trading_styles=["swing", "positional", "options", "futures", "forex", "crypto"],
                details={},
            ))

        return results


# ── Complex Chart Patterns ───────────────────────────────────────────────

class ComplexPatternDetector:
    """Detect multi-bar chart patterns using rolling window analysis."""

    @staticmethod
    def detect(df: pd.DataFrame, window: int = 20) -> List[PatternResult]:
        """Scan every bar from ``window`` onward for chart patterns.

        Each bar ``i`` is evaluated against the ``window + 1`` bars ending at
        ``i``. Only the LAST occurrence of each pattern name is returned.
        Returns an empty list when the frame has fewer than ``window + 5``
        rows.
        """
        results: List[PatternResult] = []
        if len(df) < window + 5:
            return results

        highs = df["High"].values
        lows = df["Low"].values
        closes = df["Close"].values
        n = len(df)

        # Scan from window onward
        for i in range(window, n):
            segment_h = highs[i - window:i + 1]
            segment_l = lows[i - window:i + 1]
            segment_c = closes[i - window:i + 1]
            seg_len = len(segment_h)

            # Double Top — local peaks are bars higher than 2 neighbours each side
            peaks = [
                (j, segment_h[j]) for j in range(2, seg_len - 2)
                if segment_h[j] > segment_h[j - 1] and segment_h[j] > segment_h[j - 2]
                and segment_h[j] > segment_h[j + 1] and segment_h[j] > segment_h[j + 2]
            ]
            if len(peaks) >= 2:
                p1, p2 = peaks[-2], peaks[-1]
                if abs(p1[1] - p2[1]) / p1[1] < 0.02 and p2[0] - p1[0] >= 5:
                    trough = min(segment_l[p1[0]:p2[0] + 1])
                    if segment_c[-1] < trough:  # close broke the neckline
                        results.append(PatternResult(
                            name="Double Top", category="complex", direction="bearish",
                            reliability=0.78, index=i,
                            description="Two peaks at similar levels followed by neckline breakdown. Strong bearish reversal.",
                            trading_styles=["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                            details={"peak_diff_pct": round(abs(p1[1] - p2[1]) / p1[1] * 100, 2)},
                        ))

            # Double Bottom
            troughs = [
                (j, segment_l[j]) for j in range(2, seg_len - 2)
                if segment_l[j] < segment_l[j - 1] and segment_l[j] < segment_l[j - 2]
                and segment_l[j] < segment_l[j + 1] and segment_l[j] < segment_l[j + 2]
            ]
            if len(troughs) >= 2:
                t1, t2 = troughs[-2], troughs[-1]
                if abs(t1[1] - t2[1]) / t1[1] < 0.02 and t2[0] - t1[0] >= 5:
                    peak = max(segment_h[t1[0]:t2[0] + 1])
                    if segment_c[-1] > peak:  # close broke the neckline
                        results.append(PatternResult(
                            name="Double Bottom", category="complex", direction="bullish",
                            reliability=0.78, index=i,
                            description="Two troughs at similar levels followed by neckline breakout. Strong bullish reversal.",
                            trading_styles=["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                            details={"trough_diff_pct": round(abs(t1[1] - t2[1]) / t1[1] * 100, 2)},
                        ))

            # Wedges — one regression per boundary, shared by both checks
            if len(segment_c) >= 10:
                xs = range(seg_len)
                upper_slope = np.polyfit(xs, segment_h, 1)[0]
                lower_slope = np.polyfit(xs, segment_l, 1)[0]
                slope_details = {"upper_slope": round(upper_slope, 6), "lower_slope": round(lower_slope, 6)}

                # Rising Wedge (bearish): both boundaries rise, lows rising
                # at least ~2/3 as fast as highs
                if (upper_slope > 0 and lower_slope > 0
                        and lower_slope > upper_slope * 0.5
                        and upper_slope < lower_slope * 1.5):
                    results.append(PatternResult(
                        name="Rising Wedge", category="complex", direction="bearish",
                        reliability=0.72, index=i,
                        description="Price making higher highs and higher lows in converging channel. Typically breaks down.",
                        trading_styles=["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                        details=slope_details,
                    ))

                # Falling Wedge (bullish): exact mirror of the rising test —
                # both boundaries fall, highs falling at least ~2/3 as fast
                # as lows. Inequalities flip because the slopes are negative.
                if (upper_slope < 0 and lower_slope < 0
                        and upper_slope < lower_slope * 0.5
                        and lower_slope > upper_slope * 1.5):
                    results.append(PatternResult(
                        name="Falling Wedge", category="complex", direction="bullish",
                        reliability=0.72, index=i,
                        description="Price making lower highs and lower lows in converging channel. Typically breaks upward.",
                        trading_styles=["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                        details=dict(slope_details),
                    ))

            # Flags — 20-bar pole followed by an 11-bar flag ending at bar i
            if i >= 30:
                pre_flag = closes[i - 30:i - 10]
                flag_body = closes[i - 10:i + 1]
                if len(pre_flag) >= 10 and len(flag_body) >= 5:
                    pre_ret = (pre_flag[-1] - pre_flag[0]) / pre_flag[0] if pre_flag[0] > 0 else 0
                    flag_slope = np.polyfit(range(len(flag_body)), flag_body, 1)[0]
                    flag_low = min(flag_body)
                    flag_range = (max(flag_body) - flag_low) / flag_low if flag_low > 0 else 0

                    # Bull Flag
                    if pre_ret > 0.05 and flag_slope < 0 and flag_range < 0.05:
                        results.append(PatternResult(
                            name="Bull Flag", category="complex", direction="bullish",
                            reliability=0.70, index=i,
                            description="Strong upward move (pole) followed by slight downward consolidation (flag). Continuation pattern.",
                            trading_styles=["scalping", "intraday", "swing", "options", "futures", "forex", "crypto"],
                            details={"pole_return_pct": round(pre_ret * 100, 2), "flag_range_pct": round(flag_range * 100, 2)},
                        ))

                    # Bear Flag
                    if pre_ret < -0.05 and flag_slope > 0 and flag_range < 0.05:
                        results.append(PatternResult(
                            name="Bear Flag", category="complex", direction="bearish",
                            reliability=0.70, index=i,
                            description="Strong downward move (pole) followed by slight upward consolidation (flag). Bearish continuation.",
                            trading_styles=["scalping", "intraday", "swing", "options", "futures", "forex", "crypto"],
                            details={"pole_return_pct": round(pre_ret * 100, 2)},
                        ))

        # Deduplicate: keep only the last occurrence of each pattern
        return list({r.name: r for r in results}.values())


# ── Master Detector ──────────────────────────────────────────────────────

class CandlestickPatternDetector:
    """
    Master pattern detector — runs all sub-detectors and returns a unified
    list of detected patterns sorted by reliability.
    """

    def __init__(self):
        self.single = SingleCandleDetector()
        self.multi = MultiCandleDetector()
        self.complex = ComplexPatternDetector()

    def detect_all(
        self,
        df: pd.DataFrame,
        lookback: int = 5,
    ) -> List[Dict[str, Any]]:
        """
        Detect all patterns in the most recent `lookback` candles.

        Single- and multi-candle detectors run on the last ``lookback`` bars.
        The complex detector scans the whole frame (window capped at 30) and
        contributes the last occurrence of each chart pattern, which may lie
        outside the ``lookback`` range.

        Returns list of pattern dicts sorted by reliability (highest first),
        one entry per pattern name. Frames with fewer than 5 rows yield [].
        """
        if df.empty or len(df) < 5:
            return []

        all_patterns: List[PatternResult] = []
        n = len(df)
        start = max(0, n - lookback)

        for idx in range(start, n):
            avg_b = _avg_body(df, idx)
            # Single-candle
            all_patterns.extend(self.single.detect(df, idx, avg_b))
            # Multi-candle
            all_patterns.extend(self.multi.detect(df, idx, avg_b))

        # Complex chart patterns (use wider window)
        all_patterns.extend(self.complex.detect(df, window=min(30, len(df) - 5)))

        # Deduplicate by name (keep highest reliability instance; first wins ties)
        best: Dict[str, PatternResult] = {}
        for p in all_patterns:
            if p.name not in best or p.reliability > best[p.name].reliability:
                best[p.name] = p

        # Sort by reliability descending
        sorted_patterns = sorted(best.values(), key=lambda x: x.reliability, reverse=True)
        return [p.to_dict() for p in sorted_patterns]

    def get_pattern_catalog(self) -> List[Dict[str, Any]]:
        """Return catalog of all 35+ supported patterns."""
        catalog = [
            # Single-candle (14)
            {"name": "Doji", "category": "single", "direction": "neutral", "reliability": 0.55, "description": "Open equals close. Market indecision signal."},
            {"name": "Dragonfly Doji", "category": "single", "direction": "bullish", "reliability": 0.72, "description": "Open/close at high with long lower shadow. Bullish reversal."},
            {"name": "Gravestone Doji", "category": "single", "direction": "bearish", "reliability": 0.71, "description": "Open/close at low with long upper shadow. Bearish reversal."},
            {"name": "Long-Legged Doji", "category": "single", "direction": "neutral", "reliability": 0.60, "description": "Equal long shadows. Extreme indecision."},
            {"name": "Hammer", "category": "single", "direction": "bullish", "reliability": 0.75, "description": "Small body at top, long lower shadow. Classic bullish reversal."},
            {"name": "Hanging Man", "category": "single", "direction": "bearish", "reliability": 0.65, "description": "Hammer at top of uptrend. Bearish warning."},
            {"name": "Inverted Hammer", "category": "single", "direction": "bullish", "reliability": 0.65, "description": "Small body at bottom, long upper shadow. Bullish reversal."},
            {"name": "Shooting Star", "category": "single", "direction": "bearish", "reliability": 0.72, "description": "Small body near low, long upper shadow. Bearish reversal."},
            {"name": "Bullish Marubozu", "category": "single", "direction": "bullish", "reliability": 0.78, "description": "Full bullish body, no shadows. Strong buying pressure."},
            {"name": "Bearish Marubozu", "category": "single", "direction": "bearish", "reliability": 0.78, "description": "Full bearish body, no shadows. Strong selling pressure."},
            {"name": "Spinning Top", "category": "single", "direction": "neutral", "reliability": 0.45, "description": "Small body, shadows on both sides. Indecision."},
            {"name": "High Wave Candle", "category": "single", "direction": "neutral", "reliability": 0.55, "description": "Tiny body, very long shadows. Major indecision."},
            {"name": "Bullish Belt Hold", "category": "single", "direction": "bullish", "reliability": 0.68, "description": "Opens at low, strong close near high."},
            {"name": "Bearish Belt Hold", "category": "single", "direction": "bearish", "reliability": 0.68, "description": "Opens at high, closes near low."},
            # Multi-candle (16)
            {"name": "Bullish Engulfing", "category": "multi", "direction": "bullish", "reliability": 0.82, "description": "Bullish candle engulfs prior bearish. Strong reversal."},
            {"name": "Bearish Engulfing", "category": "multi", "direction": "bearish", "reliability": 0.82, "description": "Bearish candle engulfs prior bullish. Strong reversal."},
            {"name": "Bullish Harami", "category": "multi", "direction": "bullish", "reliability": 0.62, "description": "Small bullish inside prior bearish. Reversal."},
            {"name": "Bearish Harami", "category": "multi", "direction": "bearish", "reliability": 0.62, "description": "Small bearish inside prior bullish. Reversal."},
            {"name": "Piercing Line", "category": "multi", "direction": "bullish", "reliability": 0.70, "description": "Opens below, closes above prior midpoint."},
            {"name": "Dark Cloud Cover", "category": "multi", "direction": "bearish", "reliability": 0.70, "description": "Opens above, closes below prior midpoint."},
            {"name": "Tweezer Top", "category": "multi", "direction": "bearish", "reliability": 0.68, "description": "Matching highs confirm resistance."},
            {"name": "Tweezer Bottom", "category": "multi", "direction": "bullish", "reliability": 0.68, "description": "Matching lows confirm support."},
            {"name": "Morning Star", "category": "multi", "direction": "bullish", "reliability": 0.85, "description": "Three-candle bullish reversal. Highly reliable."},
            {"name": "Evening Star", "category": "multi", "direction": "bearish", "reliability": 0.85, "description": "Three-candle bearish reversal. Highly reliable."},
            {"name": "Three White Soldiers", "category": "multi", "direction": "bullish", "reliability": 0.80, "description": "Three strong bullish candles. Uptrend continuation."},
            {"name": "Three Black Crows", "category": "multi", "direction": "bearish", "reliability": 0.80, "description": "Three strong bearish candles. Downtrend continuation."},
            {"name": "Three Inside Up", "category": "multi", "direction": "bullish", "reliability": 0.76, "description": "Confirmed harami breakout. Bullish."},
            {"name": "Three Inside Down", "category": "multi", "direction": "bearish", "reliability": 0.76, "description": "Confirmed harami breakdown. Bearish."},
            {"name": "Bullish Abandoned Baby", "category": "multi", "direction": "bullish", "reliability": 0.88, "description": "Isolated doji gap. Extremely rare, very bullish."},
            {"name": "Bearish Abandoned Baby", "category": "multi", "direction": "bearish", "reliability": 0.88, "description": "Isolated doji gap. Extremely rare, very bearish."},
            # Complex (6)
            {"name": "Double Top", "category": "complex", "direction": "bearish", "reliability": 0.78, "description": "Two peaks at same level, neckline breakdown."},
            {"name": "Double Bottom", "category": "complex", "direction": "bullish", "reliability": 0.78, "description": "Two troughs at same level, neckline breakout."},
            {"name": "Rising Wedge", "category": "complex", "direction": "bearish", "reliability": 0.72, "description": "Converging uptrend channel. Typically breaks down."},
            {"name": "Falling Wedge", "category": "complex", "direction": "bullish", "reliability": 0.72, "description": "Converging downtrend channel. Typically breaks up."},
            {"name": "Bull Flag", "category": "complex", "direction": "bullish", "reliability": 0.70, "description": "Strong up-move then slight consolidation. Continuation."},
            {"name": "Bear Flag", "category": "complex", "direction": "bearish", "reliability": 0.70, "description": "Strong down-move then slight consolidation. Continuation."},
        ]
        return catalog


# Module singleton
pattern_detector = CandlestickPatternDetector()