quanthedge / backend /app /services /ml /pattern_recognition /pattern_detector.py
jashdoshi77's picture
whole lotta changes
e6021a3
"""
Candlestick Pattern Detector — 35+ Patterns.
Institutional-grade pattern detection across single-candle, multi-candle,
and complex chart patterns.
Trading Styles Covered:
- Scalping: sub-minute to minutes, high-frequency entries
- Day Trading / Intraday: same-day open-close positions
- Swing Trading: multi-day to multi-week reversals/continuations
- Positional / Long-Term: weeks to months trend riding
- F&O / Options: volatility-based, breakout/breakdown detection
- Futures: momentum-based, trend-following signals
Market Coverage:
- Equities (US, India, Europe, Asia)
- Forex (major, minor, exotic pairs)
- Crypto (BTC, ETH, altcoins)
- Commodities (gold, oil, agricultural)
- Indices (SPX, NIFTY, DAX)
Each detected pattern returns:
- name, category, direction (bullish/bearish/neutral)
- reliability_score (0-1), body_ratio, shadow_ratios
- trading_styles: list of suitable trading approaches
- markets: list of suitable market types
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class PatternResult:
    """One detected candlestick or chart pattern plus derived hedge guidance."""

    name: str
    category: str  # "single", "multi" or "complex"
    direction: str  # "bullish", "bearish" or "neutral"
    reliability: float  # 0.0 - 1.0
    index: int  # bar index at which the pattern was detected
    description: str
    trading_styles: List[str]  # e.g. scalping, intraday, swing, positional, ...
    details: Dict[str, Any]

    @property
    def hedge_signal(self) -> str:
        """Map direction and reliability onto a hedge action.

        Returns one of:
            - ``'hedge_now'``: bearish with reliability >= 0.70
            - ``'increase_hedge'``: bearish with lower reliability
            - ``'reduce_hedge'``: bullish with reliability >= 0.70
            - ``'hold_hedge'``: everything else
        """
        if self.direction == "bearish":
            return "hedge_now" if self.reliability >= 0.70 else "increase_hedge"
        if self.direction == "bullish" and self.reliability >= 0.70:
            return "reduce_hedge"
        return "hold_hedge"

    @property
    def hedge_recommendation(self) -> Dict[str, Any]:
        """Return a sizing suggestion (action, percentage, urgency, rationale)."""
        signal = self.hedge_signal

        if signal == "hedge_now":
            action = "hedge_now"
            pct = min(50, int(self.reliability * 60))
            urgency = "high"
            rationale = f"{self.name} ({self.reliability:.0%} reliability) indicates strong downside risk. Protect portfolio with 30-50% hedge allocation."
        elif signal == "increase_hedge":
            action = "increase_hedge"
            pct = min(30, int(self.reliability * 40))
            urgency = "medium"
            rationale = f"{self.name} signals bearish pressure. Consider increasing hedge to 15-30%."
        elif signal == "reduce_hedge":
            action = "reduce_hedge"
            pct = max(5, int((1 - self.reliability) * 20))
            urgency = "low"
            rationale = f"{self.name} ({self.reliability:.0%} reliability) confirms bullish momentum. Reduce hedge to 5-10% maintenance level."
        else:
            action = "hold_hedge"
            pct = 15
            urgency = "none"
            rationale = f"{self.name} is neutral/indecisive. Maintain current hedge level."

        return {
            "action": action,
            "suggested_hedge_pct": pct,
            "urgency": urgency,
            "rationale": rationale,
        }

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the pattern, including the derived hedge fields."""
        payload: Dict[str, Any] = {}
        payload["name"] = self.name
        payload["category"] = self.category
        payload["direction"] = self.direction
        payload["reliability"] = round(self.reliability, 4)
        payload["index"] = self.index
        payload["description"] = self.description
        payload["trading_styles"] = self.trading_styles
        payload["hedge_signal"] = self.hedge_signal
        payload["hedge_recommendation"] = self.hedge_recommendation
        payload["details"] = self.details
        return payload
# ── Helper Functions ─────────────────────────────────────────────────────
def _body(o: float, c: float) -> float:
    """Return the size of the candle's real body, i.e. |close - open|."""
    delta = c - o
    return abs(delta)
def _range(h: float, l: float) -> float:
    """Return the full high-to-low span of a candle."""
    span = h - l
    return span
def _upper_shadow(o: float, h: float, c: float) -> float:
    """Return the wick length above the body (high minus the higher of open/close)."""
    body_top = c if c > o else o
    return h - body_top
def _lower_shadow(o: float, l: float, c: float) -> float:
    """Return the wick length below the body (lower of open/close minus the low)."""
    body_bottom = c if c < o else o
    return body_bottom - l
def _is_bullish(o: float, c: float) -> bool:
    """Return True when the candle closed strictly above its open."""
    return o < c
def _is_bearish(o: float, c: float) -> bool:
    """Return True when the candle closed strictly below its open."""
    return o > c
def _body_ratio(o: float, h: float, l: float, c: float) -> float:
    """Return the real body as a fraction of the candle's high-low range.

    A candle whose range is not positive (flat bar or inverted high/low)
    yields ``0.0``, so the result is always a float as annotated.
    """
    rng = h - l
    if rng > 0:
        return abs(c - o) / rng
    return 0.0
def _avg_body(df: pd.DataFrame, end: int, lookback: int = 14) -> float:
    """Return the mean absolute body size of the bars preceding ``end``.

    The window is ``[max(0, end - lookback), end)``; the bar at ``end`` itself
    is excluded. When the window is empty (``end <= 0``, non-positive
    ``lookback`` or an empty frame) the neutral fallback ``1.0`` is returned.

    Args:
        df: OHLC frame with "Open" and "Close" columns.
        end: Exclusive end position of the averaging window.
        lookback: Maximum number of bars to average over.

    Returns:
        The average body size as a plain Python float.
    """
    start = max(0, end - lookback)
    if end <= start:
        return 1.0
    # One vectorized slice instead of materializing a row Series per bar;
    # slicing also clamps an ``end`` that points past the last row.
    window = df.iloc[start:end]
    bodies = (window["Close"] - window["Open"]).abs().to_numpy(dtype=float)
    if bodies.size == 0:
        return 1.0
    return float(bodies.mean())
# ── Single-Candle Patterns ───────────────────────────────────────────────
class SingleCandleDetector:
    """Detect single-candle patterns."""

    @staticmethod
    def _prior_trend(df: pd.DataFrame, idx: int, lookback: int = 5) -> int:
        """Classify the short-term trend of the bars preceding ``idx``.

        Compares the close of bar ``idx - 1`` with the close ``lookback`` bars
        before ``idx`` (clamped to the start of the frame).

        Returns:
            ``1`` for rising closes, ``-1`` for falling closes, ``0`` when the
            trend cannot be determined (fewer than two prior bars, equal
            closes, or NaN data).
        """
        if idx < 2:
            return 0
        closes = df["Close"]
        first = closes.iloc[max(0, idx - lookback)]
        last = closes.iloc[idx - 1]
        if last > first:
            return 1
        if last < first:
            return -1
        return 0

    @staticmethod
    def detect(df: pd.DataFrame, idx: int, avg_b: float) -> List[PatternResult]:
        """Return every single-candle pattern matched by the bar at ``idx``.

        Args:
            df: OHLC frame with "Open", "High", "Low" and "Close" columns.
            idx: Position of the candle to classify.
            avg_b: Average body size of recent candles (used by Belt Hold).

        Returns:
            Matched patterns in a fixed evaluation order; empty when ``idx``
            is out of range or the candle has no positive high-low range.
        """
        results: List[PatternResult] = []
        if idx < 0 or idx >= len(df):
            return results

        row = df.iloc[idx]
        o, h, l, c = row["Open"], row["High"], row["Low"], row["Close"]
        body = _body(o, c)
        rng = _range(h, l)
        if rng <= 0:
            return results

        br = body / rng
        us = _upper_shadow(o, h, c)
        ls = _lower_shadow(o, l, c)
        bullish = _is_bullish(o, c)
        bearish = _is_bearish(o, c)
        br_r = round(br, 4)

        def emit(name, direction, reliability, description, styles, details):
            results.append(PatternResult(
                name=name, category="single", direction=direction,
                reliability=reliability, index=idx, description=description,
                trading_styles=styles, details=details,
            ))

        # 1. Doji family: body < 5% of range
        if br < 0.05:
            if ls > rng * 0.3 and us < rng * 0.1:
                emit(
                    "Dragonfly Doji", "bullish", 0.72,
                    "Open and close near high with long lower shadow. Strong bullish reversal when found at support.",
                    ["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                    {"body_ratio": br_r, "lower_shadow_pct": round(ls / rng, 4)},
                )
            elif us > rng * 0.3 and ls < rng * 0.1:
                emit(
                    "Gravestone Doji", "bearish", 0.71,
                    "Open and close near low with long upper shadow. Bearish reversal at resistance.",
                    ["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                    {"body_ratio": br_r, "upper_shadow_pct": round(us / rng, 4)},
                )
            elif us > rng * 0.3 and ls > rng * 0.3:
                emit(
                    "Long-Legged Doji", "neutral", 0.60,
                    "Equal long shadows on both sides. Indicates extreme indecision — high volatility expected.",
                    ["scalping", "intraday", "swing", "options", "futures", "forex", "crypto", "commodities"],
                    {"body_ratio": br_r},
                )
            else:
                emit(
                    "Doji", "neutral", 0.55,
                    "Open equals close. Market indecision — potential reversal signal when confirmed.",
                    ["scalping", "intraday", "swing", "forex", "crypto"],
                    {"body_ratio": br_r},
                )

        # Hammer / Hanging Man share one shape, as do Inverted Hammer /
        # Shooting Star. Only the preceding trend tells the bullish reading
        # from the bearish one, so the trend decides which is reported.
        # With an undetermined trend both readings are kept (shape-only
        # fallback, identical to the previous output).
        small_body = body > 0 and br < 0.35
        hammer_shape = small_body and ls >= body * 2 and us < body * 0.5
        inverted_shape = small_body and us >= body * 2 and ls < body * 0.5
        trend = 0
        if hammer_shape or inverted_shape:
            trend = SingleCandleDetector._prior_trend(df, idx)

        # 2. Hammer: long lower shadow after falling (or unknown) prices
        if hammer_shape and trend <= 0:
            emit(
                "Hammer", "bullish", 0.75,
                "Small body at top, long lower shadow. Classic bullish reversal pattern at support levels.",
                ["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"body_ratio": br_r, "shadow_body_ratio": round(ls / body, 2) if body > 0 else 0},
            )

        # 3. Hanging Man: same shape after rising (or unknown) prices
        if hammer_shape and trend >= 0:
            emit(
                "Hanging Man", "bearish", 0.65,
                "Hammer shape at top of uptrend. Warns of potential trend reversal downward.",
                ["intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                {"body_ratio": br_r},
            )

        # 4. Inverted Hammer: long upper shadow after falling (or unknown) prices
        if inverted_shape and trend <= 0:
            emit(
                "Inverted Hammer", "bullish", 0.65,
                "Small body at bottom, long upper shadow. Bullish reversal candidate requiring confirmation.",
                ["scalping", "intraday", "swing", "options", "forex", "crypto"],
                {"body_ratio": br_r},
            )

        # 5. Shooting Star: same shape after rising (or unknown) prices
        if inverted_shape and trend >= 0:
            emit(
                "Shooting Star", "bearish", 0.72,
                "Small body near low, long upper shadow. Strong bearish reversal at resistance.",
                ["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"body_ratio": br_r},
            )

        # 6. Bullish Marubozu: large body, almost no shadows
        if br > 0.90 and bullish:
            emit(
                "Bullish Marubozu", "bullish", 0.78,
                "Full bullish candle with no or tiny shadows. Extreme buying pressure, strong continuation signal.",
                ["scalping", "intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"body_ratio": br_r},
            )

        # 7. Bearish Marubozu
        if br > 0.90 and bearish:
            emit(
                "Bearish Marubozu", "bearish", 0.78,
                "Full bearish candle with no or tiny shadows. Extreme selling pressure, strong continuation signal.",
                ["scalping", "intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"body_ratio": br_r},
            )

        # 8. Spinning Top: small body, shadows on both sides
        if 0.05 <= br <= 0.35 and us > body * 0.5 and ls > body * 0.5 and body > 0:
            emit(
                "Spinning Top", "neutral", 0.45,
                "Small body with shadows on both sides. Indecision between buyers and sellers.",
                ["scalping", "intraday", "swing", "forex", "crypto"],
                {"body_ratio": br_r},
            )

        # 9. High Wave Candle: very small body, very long shadows
        if br < 0.15 and us > rng * 0.35 and ls > rng * 0.35:
            emit(
                "High Wave Candle", "neutral", 0.55,
                "Tiny body with extremely long shadows. Signals major indecision and potential reversal.",
                ["intraday", "swing", "options", "futures", "forex", "crypto"],
                {"body_ratio": br_r},
            )

        # 10. Bullish Belt Hold: opens at low, closes near high, large body
        if bullish and br > 0.6 and ls < rng * 0.05 and body > avg_b * 1.2:
            emit(
                "Bullish Belt Hold", "bullish", 0.68,
                "Opens at/near low, strong close near high. Powerful bullish opening signal.",
                ["intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"body_ratio": br_r, "body_vs_avg": round(body / avg_b, 2) if avg_b > 0 else 0},
            )

        # 11. Bearish Belt Hold (reports body_vs_avg like its bullish twin)
        if bearish and br > 0.6 and us < rng * 0.05 and body > avg_b * 1.2:
            emit(
                "Bearish Belt Hold", "bearish", 0.68,
                "Opens at/near high, closes near low. Strong bearish opening signal.",
                ["intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"body_ratio": br_r, "body_vs_avg": round(body / avg_b, 2) if avg_b > 0 else 0},
            )

        return results
# ── Multi-Candle Patterns ────────────────────────────────────────────────
class MultiCandleDetector:
    """Detect multi-candle patterns (2-3 candle formations)."""

    @staticmethod
    def detect(df: pd.DataFrame, idx: int, avg_b: float) -> List[PatternResult]:
        """Return every multi-candle pattern that completes on the bar at ``idx``.

        Two-candle patterns need one prior bar (``idx >= 1``); three-candle
        patterns additionally need ``idx >= 2`` and are skipped otherwise.

        Args:
            df: OHLC frame with "Open", "High", "Low" and "Close" columns.
            idx: Position of the most recent candle of the formation.
            avg_b: Average body size of recent candles, used as size yardstick.

        Returns:
            Matched patterns in a fixed evaluation order; empty when ``idx``
            is out of range.
        """
        results: List[PatternResult] = []
        # Only one prior candle is required here; the three-candle section
        # below checks for the second one itself.
        if idx < 1 or idx >= len(df):
            return results

        c0 = df.iloc[idx]  # current candle
        c1 = df.iloc[idx - 1]  # previous candle
        c2 = df.iloc[idx - 2] if idx >= 2 else None

        o0, h0, l0, close0 = c0["Open"], c0["High"], c0["Low"], c0["Close"]
        o1, h1, l1, close1 = c1["Open"], c1["High"], c1["Low"], c1["Close"]
        body0 = _body(o0, close0)
        body1 = _body(o1, close1)
        rng1 = _range(h1, l1)

        bull0, bear0 = _is_bullish(o0, close0), _is_bearish(o0, close0)
        bull1, bear1 = _is_bullish(o1, close1), _is_bearish(o1, close1)
        turn_up = bear1 and bull0  # bearish candle followed by bullish one
        turn_down = bull1 and bear0  # bullish candle followed by bearish one
        mid1 = (o1 + close1) / 2

        def emit(name, direction, reliability, description, styles, details):
            results.append(PatternResult(
                name=name, category="multi", direction=direction,
                reliability=reliability, index=idx, description=description,
                trading_styles=styles, details=details,
            ))

        # 1. Bullish Engulfing
        if turn_up and o0 <= close1 and close0 >= o1 and body0 > body1:
            emit(
                "Bullish Engulfing", "bullish", 0.82,
                "Bullish candle completely engulfs prior bearish candle. One of the strongest reversal patterns.",
                ["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"engulfing_ratio": round(body0 / body1, 2) if body1 > 0 else 0},
            )

        # 2. Bearish Engulfing
        if turn_down and o0 >= close1 and close0 <= o1 and body0 > body1:
            emit(
                "Bearish Engulfing", "bearish", 0.82,
                "Bearish candle completely engulfs prior bullish candle. Strong bearish reversal pattern.",
                ["scalping", "intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"engulfing_ratio": round(body0 / body1, 2) if body1 > 0 else 0},
            )

        # 3. Bullish Harami
        if turn_up and o0 >= close1 and close0 <= o1 and body0 < body1 * 0.6:
            emit(
                "Bullish Harami", "bullish", 0.62,
                "Small bullish candle contained within prior bearish candle. Potential trend reversal.",
                ["scalping", "intraday", "swing", "options", "forex", "crypto"],
                {"containment_ratio": round(body0 / body1, 2) if body1 > 0 else 0},
            )

        # 4. Bearish Harami
        if turn_down and o0 <= close1 and close0 >= o1 and body0 < body1 * 0.6:
            emit(
                "Bearish Harami", "bearish", 0.62,
                "Small bearish candle contained within prior bullish candle. Potential trend reversal.",
                ["scalping", "intraday", "swing", "options", "forex", "crypto"],
                {"containment_ratio": round(body0 / body1, 2) if body1 > 0 else 0},
            )

        # 5. Piercing Line
        if turn_up and o0 < close1 and close0 > mid1 and close0 < o1:
            emit(
                "Piercing Line", "bullish", 0.70,
                "Opens below prior close, closes above prior midpoint. Bullish reversal pattern.",
                ["intraday", "swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"penetration_pct": round((close0 - close1) / body1 * 100, 1) if body1 > 0 else 0},
            )

        # 6. Dark Cloud Cover
        if turn_down and o0 > close1 and close0 < mid1 and close0 > o1:
            emit(
                "Dark Cloud Cover", "bearish", 0.70,
                "Opens above prior close, closes below prior midpoint. Bearish reversal signal.",
                ["intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"penetration_pct": round((close1 - close0) / body1 * 100, 1) if body1 > 0 else 0},
            )

        # 7. Tweezer Top (highs match within 2% of the prior range;
        #    the epsilon avoids dividing by a zero range)
        if abs(h0 - h1) / (rng1 + 1e-8) < 0.02 and turn_down:
            emit(
                "Tweezer Top", "bearish", 0.68,
                "Two candles with matching highs. Resistance confirmed, bearish reversal likely.",
                ["intraday", "swing", "options", "futures", "forex", "crypto"],
                {"high_diff_pct": round(abs(h0 - h1) / h1 * 100, 4) if h1 > 0 else 0},
            )

        # 8. Tweezer Bottom
        if abs(l0 - l1) / (rng1 + 1e-8) < 0.02 and turn_up:
            emit(
                "Tweezer Bottom", "bullish", 0.68,
                "Two candles with matching lows. Support confirmed, bullish reversal likely.",
                ["intraday", "swing", "options", "futures", "forex", "crypto"],
                {"low_diff_pct": round(abs(l0 - l1) / l1 * 100, 4) if l1 > 0 else 0},
            )

        # Three-candle patterns need a second prior candle.
        if c2 is None:
            return results

        o2, h2, l2, close2 = c2["Open"], c2["High"], c2["Low"], c2["Close"]
        body2 = _body(o2, close2)
        bull2, bear2 = _is_bullish(o2, close2), _is_bearish(o2, close2)
        mid2 = (o2 + close2) / 2
        bodies = (body0, body1, body2)

        # 9. Morning Star
        if (bear2 and body2 > avg_b * 0.5
                and body1 < avg_b * 0.3  # small middle candle
                and bull0 and body0 > avg_b * 0.5
                and close0 > mid2):
            emit(
                "Morning Star", "bullish", 0.85,
                "Three-candle reversal: bearish, small indecision, strong bullish. One of the most reliable bullish reversals.",
                ["intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"middle_body_ratio": round(body1 / avg_b, 2) if avg_b > 0 else 0},
            )

        # 10. Evening Star
        if (bull2 and body2 > avg_b * 0.5
                and body1 < avg_b * 0.3
                and bear0 and body0 > avg_b * 0.5
                and close0 < mid2):
            emit(
                "Evening Star", "bearish", 0.85,
                "Three-candle reversal: bullish, small indecision, strong bearish. One of the most reliable bearish reversals.",
                ["intraday", "swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                {"middle_body_ratio": round(body1 / avg_b, 2) if avg_b > 0 else 0},
            )

        # 11. Three White Soldiers
        if (bull0 and bull1 and bull2
                and close0 > close1 > close2
                and all(b > avg_b * 0.4 for b in bodies)):
            emit(
                "Three White Soldiers", "bullish", 0.80,
                "Three consecutive large bullish candles with higher closes. Strong uptrend continuation.",
                ["swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"avg_body_size": round(np.mean([body0, body1, body2]), 2)},
            )

        # 12. Three Black Crows
        if (bear0 and bear1 and bear2
                and close0 < close1 < close2
                and all(b > avg_b * 0.4 for b in bodies)):
            emit(
                "Three Black Crows", "bearish", 0.80,
                "Three consecutive large bearish candles with lower closes. Strong downtrend continuation.",
                ["swing", "positional", "futures", "forex", "crypto", "commodities"],
                {"avg_body_size": round(np.mean([body0, body1, body2]), 2)},
            )

        # 13. Three Inside Up
        if (bear2 and body2 > avg_b * 0.6
                and bull1
                and o1 >= close2 and close1 <= o2
                and bull0 and close0 > o2):
            emit(
                "Three Inside Up", "bullish", 0.76,
                "Harami pattern confirmed by third bullish candle closing above first candle. Strong bullish reversal.",
                ["intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                {},
            )

        # 14. Three Inside Down
        if (bull2 and body2 > avg_b * 0.6
                and bear1
                and o1 <= close2 and close1 >= o2
                and bear0 and close0 < o2):
            emit(
                "Three Inside Down", "bearish", 0.76,
                "Harami pattern confirmed by third bearish candle closing below first candle. Strong bearish reversal.",
                ["intraday", "swing", "positional", "options", "futures", "forex", "crypto"],
                {},
            )

        # 15. Bullish Abandoned Baby (middle candle gapped below both neighbours)
        if (bear2
                and h1 < l2 and h1 < l0
                and body1 < avg_b * 0.15
                and bull0):
            emit(
                "Bullish Abandoned Baby", "bullish", 0.88,
                "Isolated doji gapped below prior and next candles. Extremely rare and reliable bullish reversal.",
                ["swing", "positional", "options", "futures", "forex", "crypto"],
                {},
            )

        # 16. Bearish Abandoned Baby (middle candle gapped above both neighbours)
        if (bull2
                and l1 > h2 and l1 > h0
                and body1 < avg_b * 0.15
                and bear0):
            emit(
                "Bearish Abandoned Baby", "bearish", 0.88,
                "Isolated doji gapped above prior and next candles. Extremely rare and reliable bearish reversal.",
                ["swing", "positional", "options", "futures", "forex", "crypto"],
                {},
            )

        return results
# ── Complex Chart Patterns ───────────────────────────────────────────────
class ComplexPatternDetector:
    """Detect multi-bar chart patterns using rolling window analysis."""

    @staticmethod
    def _pivots(values, peaks: bool) -> list:
        """Return ``(position, value)`` pairs of local extrema in ``values``.

        A pivot must be strictly above (``peaks=True``) or strictly below
        (``peaks=False``) its two neighbours on each side.
        """
        found = []
        for j in range(2, len(values) - 2):
            v = values[j]
            neighbours = (values[j - 1], values[j - 2], values[j + 1], values[j + 2])
            if peaks:
                hit = all(v > nb for nb in neighbours)
            else:
                hit = all(v < nb for nb in neighbours)
            if hit:
                found.append((j, v))
        return found

    @staticmethod
    def detect(df: pd.DataFrame, window: int = 20) -> List[PatternResult]:
        """Scan the frame with a rolling window and report chart patterns.

        Every bar ``i >= window`` is evaluated on the ``window + 1`` bars
        ending at ``i``. Flags use fixed spans instead (20-bar pole, 11-bar
        flag) and therefore need ``i >= 30``.

        Args:
            df: OHLC frame with "High", "Low" and "Close" columns.
            window: Number of bars before ``i`` included in each segment.

        Returns:
            At most one result per pattern name: the most recent occurrence.
            Empty when the frame has fewer than ``window + 5`` rows.
        """
        results: List[PatternResult] = []
        if len(df) < window + 5:
            return results

        highs = df["High"].values
        lows = df["Low"].values
        closes = df["Close"].values
        n = len(df)

        def emit(name, direction, reliability, i, description, styles, details):
            results.append(PatternResult(
                name=name, category="complex", direction=direction,
                reliability=reliability, index=i, description=description,
                trading_styles=styles, details=details,
            ))

        for i in range(window, n):
            seg_h = highs[i - window:i + 1]
            seg_l = lows[i - window:i + 1]
            seg_c = closes[i - window:i + 1]

            # Double Top: two similar peaks >= 5 bars apart, close below the
            # lowest low between them (neckline).
            peaks = ComplexPatternDetector._pivots(seg_h, peaks=True)
            if len(peaks) >= 2:
                (j1, v1), (j2, v2) = peaks[-2], peaks[-1]
                if abs(v1 - v2) / v1 < 0.02 and j2 - j1 >= 5:
                    neckline = min(seg_l[j1:j2 + 1])
                    if seg_c[-1] < neckline:
                        emit(
                            "Double Top", "bearish", 0.78, i,
                            "Two peaks at similar levels followed by neckline breakdown. Strong bearish reversal.",
                            ["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                            {"peak_diff_pct": round(abs(v1 - v2) / v1 * 100, 2)},
                        )

            # Double Bottom: mirror image of the double top.
            troughs = ComplexPatternDetector._pivots(seg_l, peaks=False)
            if len(troughs) >= 2:
                (j1, v1), (j2, v2) = troughs[-2], troughs[-1]
                if abs(v1 - v2) / v1 < 0.02 and j2 - j1 >= 5:
                    neckline = max(seg_h[j1:j2 + 1])
                    if seg_c[-1] > neckline:
                        emit(
                            "Double Bottom", "bullish", 0.78, i,
                            "Two troughs at similar levels followed by neckline breakout. Strong bullish reversal.",
                            ["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                            {"trough_diff_pct": round(abs(v1 - v2) / v1 * 100, 2)},
                        )

            # Wedges: least-squares trend lines through highs and lows.
            # The lines converge only when the gap between them shrinks,
            # i.e. the lower line's slope exceeds the upper line's slope.
            if len(seg_c) >= 10:
                x = np.arange(len(seg_h))
                upper_slope = np.polyfit(x, seg_h, 1)[0]
                lower_slope = np.polyfit(x, seg_l, 1)[0]

                # Rising Wedge: both lines rise, lows rise faster than highs.
                if 0 < upper_slope < lower_slope:
                    emit(
                        "Rising Wedge", "bearish", 0.72, i,
                        "Price making higher highs and higher lows in converging channel. Typically breaks down.",
                        ["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                        {"upper_slope": round(upper_slope, 6), "lower_slope": round(lower_slope, 6)},
                    )
                # Falling Wedge: both lines fall, highs fall faster than lows.
                elif upper_slope < lower_slope < 0:
                    emit(
                        "Falling Wedge", "bullish", 0.72, i,
                        "Price making lower highs and lower lows in converging channel. Typically breaks upward.",
                        ["swing", "positional", "options", "futures", "forex", "crypto", "commodities"],
                        {"upper_slope": round(upper_slope, 6), "lower_slope": round(lower_slope, 6)},
                    )

            # Flags: a >5% pole over 20 bars, then a tight (<5%) counter-trend
            # drift over the last 11 bars.
            if i >= 30:
                pole = closes[i - 30:i - 10]
                flag = closes[i - 10:i + 1]
                if len(pole) >= 10 and len(flag) >= 5:
                    pole_ret = (pole[-1] - pole[0]) / pole[0] if pole[0] > 0 else 0
                    flag_slope = np.polyfit(np.arange(len(flag)), flag, 1)[0]
                    flag_low = min(flag)
                    flag_range = (max(flag) - flag_low) / flag_low if flag_low > 0 else 0
                    if flag_range < 0.05:
                        if pole_ret > 0.05 and flag_slope < 0:
                            emit(
                                "Bull Flag", "bullish", 0.70, i,
                                "Strong upward move (pole) followed by slight downward consolidation (flag). Continuation pattern.",
                                ["scalping", "intraday", "swing", "options", "futures", "forex", "crypto"],
                                {"pole_return_pct": round(pole_ret * 100, 2), "flag_range_pct": round(flag_range * 100, 2)},
                            )
                        elif pole_ret < -0.05 and flag_slope > 0:
                            emit(
                                "Bear Flag", "bearish", 0.70, i,
                                "Strong downward move (pole) followed by slight upward consolidation (flag). Bearish continuation.",
                                ["scalping", "intraday", "swing", "options", "futures", "forex", "crypto"],
                                {"pole_return_pct": round(pole_ret * 100, 2), "flag_range_pct": round(flag_range * 100, 2)},
                            )

        # Deduplicate: later entries overwrite earlier ones, so only the
        # last occurrence of each pattern name survives.
        latest = {r.name: r for r in results}
        return list(latest.values())
# ── Master Detector ──────────────────────────────────────────────────────
class CandlestickPatternDetector:
    """
    Master pattern detector — runs all sub-detectors and returns
    a unified list of detected patterns sorted by reliability.
    """

    def __init__(self):
        self.single = SingleCandleDetector()
        self.multi = MultiCandleDetector()
        self.complex = ComplexPatternDetector()

    def detect_all(
        self,
        df: pd.DataFrame,
        lookback: int = 5,
    ) -> List[Dict[str, Any]]:
        """
        Detect all patterns in the most recent `lookback` candles.

        Candle patterns are evaluated bar by bar over the last `lookback`
        rows; chart patterns are evaluated once over the frame with a window
        of at most 30 bars. Frames with fewer than 5 rows yield an empty list.

        Returns list of pattern dicts sorted by reliability (highest first),
        one entry per pattern name.
        """
        if df.empty or len(df) < 5:
            return []

        all_patterns: List[PatternResult] = []
        n = len(df)
        start = max(0, n - lookback)

        for idx in range(start, n):
            avg_b = _avg_body(df, idx)
            # Single-candle
            all_patterns.extend(self.single.detect(df, idx, avg_b))
            # Multi-candle
            all_patterns.extend(self.multi.detect(df, idx, avg_b))

        # Complex chart patterns (use wider window)
        all_patterns.extend(self.complex.detect(df, window=min(30, len(df) - 5)))

        # Deduplicate by name. Candidates arrive oldest bar first and every
        # instance of a pattern normally carries the same reliability, so
        # ties go to the later candidate: the most recent occurrence wins
        # instead of the stalest one.
        best: Dict[str, PatternResult] = {}
        for p in all_patterns:
            current = best.get(p.name)
            if current is None or p.reliability >= current.reliability:
                best[p.name] = p

        # Sort by reliability descending
        sorted_patterns = sorted(best.values(), key=lambda x: x.reliability, reverse=True)
        return [p.to_dict() for p in sorted_patterns]

    def get_pattern_catalog(self) -> List[Dict[str, Any]]:
        """Return catalog of all 35+ supported patterns."""
        catalog = [
            # Single-candle (14)
            {"name": "Doji", "category": "single", "direction": "neutral", "reliability": 0.55, "description": "Open equals close. Market indecision signal."},
            {"name": "Dragonfly Doji", "category": "single", "direction": "bullish", "reliability": 0.72, "description": "Open/close at high with long lower shadow. Bullish reversal."},
            {"name": "Gravestone Doji", "category": "single", "direction": "bearish", "reliability": 0.71, "description": "Open/close at low with long upper shadow. Bearish reversal."},
            {"name": "Long-Legged Doji", "category": "single", "direction": "neutral", "reliability": 0.60, "description": "Equal long shadows. Extreme indecision."},
            {"name": "Hammer", "category": "single", "direction": "bullish", "reliability": 0.75, "description": "Small body at top, long lower shadow. Classic bullish reversal."},
            {"name": "Hanging Man", "category": "single", "direction": "bearish", "reliability": 0.65, "description": "Hammer at top of uptrend. Bearish warning."},
            {"name": "Inverted Hammer", "category": "single", "direction": "bullish", "reliability": 0.65, "description": "Small body at bottom, long upper shadow. Bullish reversal."},
            {"name": "Shooting Star", "category": "single", "direction": "bearish", "reliability": 0.72, "description": "Small body near low, long upper shadow. Bearish reversal."},
            {"name": "Bullish Marubozu", "category": "single", "direction": "bullish", "reliability": 0.78, "description": "Full bullish body, no shadows. Strong buying pressure."},
            {"name": "Bearish Marubozu", "category": "single", "direction": "bearish", "reliability": 0.78, "description": "Full bearish body, no shadows. Strong selling pressure."},
            {"name": "Spinning Top", "category": "single", "direction": "neutral", "reliability": 0.45, "description": "Small body, shadows on both sides. Indecision."},
            {"name": "High Wave Candle", "category": "single", "direction": "neutral", "reliability": 0.55, "description": "Tiny body, very long shadows. Major indecision."},
            {"name": "Bullish Belt Hold", "category": "single", "direction": "bullish", "reliability": 0.68, "description": "Opens at low, strong close near high."},
            {"name": "Bearish Belt Hold", "category": "single", "direction": "bearish", "reliability": 0.68, "description": "Opens at high, closes near low."},
            # Multi-candle (16)
            {"name": "Bullish Engulfing", "category": "multi", "direction": "bullish", "reliability": 0.82, "description": "Bullish candle engulfs prior bearish. Strong reversal."},
            {"name": "Bearish Engulfing", "category": "multi", "direction": "bearish", "reliability": 0.82, "description": "Bearish candle engulfs prior bullish. Strong reversal."},
            {"name": "Bullish Harami", "category": "multi", "direction": "bullish", "reliability": 0.62, "description": "Small bullish inside prior bearish. Reversal."},
            {"name": "Bearish Harami", "category": "multi", "direction": "bearish", "reliability": 0.62, "description": "Small bearish inside prior bullish. Reversal."},
            {"name": "Piercing Line", "category": "multi", "direction": "bullish", "reliability": 0.70, "description": "Opens below, closes above prior midpoint."},
            {"name": "Dark Cloud Cover", "category": "multi", "direction": "bearish", "reliability": 0.70, "description": "Opens above, closes below prior midpoint."},
            {"name": "Tweezer Top", "category": "multi", "direction": "bearish", "reliability": 0.68, "description": "Matching highs confirm resistance."},
            {"name": "Tweezer Bottom", "category": "multi", "direction": "bullish", "reliability": 0.68, "description": "Matching lows confirm support."},
            {"name": "Morning Star", "category": "multi", "direction": "bullish", "reliability": 0.85, "description": "Three-candle bullish reversal. Highly reliable."},
            {"name": "Evening Star", "category": "multi", "direction": "bearish", "reliability": 0.85, "description": "Three-candle bearish reversal. Highly reliable."},
            {"name": "Three White Soldiers", "category": "multi", "direction": "bullish", "reliability": 0.80, "description": "Three strong bullish candles. Uptrend continuation."},
            {"name": "Three Black Crows", "category": "multi", "direction": "bearish", "reliability": 0.80, "description": "Three strong bearish candles. Downtrend continuation."},
            {"name": "Three Inside Up", "category": "multi", "direction": "bullish", "reliability": 0.76, "description": "Confirmed harami breakout. Bullish."},
            {"name": "Three Inside Down", "category": "multi", "direction": "bearish", "reliability": 0.76, "description": "Confirmed harami breakdown. Bearish."},
            {"name": "Bullish Abandoned Baby", "category": "multi", "direction": "bullish", "reliability": 0.88, "description": "Isolated doji gap. Extremely rare, very bullish."},
            {"name": "Bearish Abandoned Baby", "category": "multi", "direction": "bearish", "reliability": 0.88, "description": "Isolated doji gap. Extremely rare, very bearish."},
            # Complex (6)
            {"name": "Double Top", "category": "complex", "direction": "bearish", "reliability": 0.78, "description": "Two peaks at same level, neckline breakdown."},
            {"name": "Double Bottom", "category": "complex", "direction": "bullish", "reliability": 0.78, "description": "Two troughs at same level, neckline breakout."},
            {"name": "Rising Wedge", "category": "complex", "direction": "bearish", "reliability": 0.72, "description": "Converging uptrend channel. Typically breaks down."},
            {"name": "Falling Wedge", "category": "complex", "direction": "bullish", "reliability": 0.72, "description": "Converging downtrend channel. Typically breaks up."},
            {"name": "Bull Flag", "category": "complex", "direction": "bullish", "reliability": 0.70, "description": "Strong up-move then slight consolidation. Continuation."},
            {"name": "Bear Flag", "category": "complex", "direction": "bearish", "reliability": 0.70, "description": "Strong down-move then slight consolidation. Continuation."},
        ]
        return catalog
# Module singleton: one shared CandlestickPatternDetector, created when this
# module is imported, so importers can use it without building their own.
pattern_detector = CandlestickPatternDetector()