"""Classical chart-pattern detectors (double top/bottom, triangles, head & shoulders).

Each detector returns a confidence in [0, 1] built from a handful of
multiplicative sub-scores (symmetry, separation, prominence, ...), all
normalized against a price `tolerance` derived from recent volatility.
"""

from typing import Dict, Tuple

import numpy as np
from scipy.signal import find_peaks

from data.quant_ohlc_feature_schema import PATTERN_NAMES


def _empty_pattern_output() -> Dict[str, float]:
    """Return a zeroed confidence entry for every known pattern.

    ``pattern_available`` is set to 1.0 so downstream consumers can tell
    "detector ran, found nothing" apart from "feature missing".
    """
    out = {f"pattern_{name}_confidence": 0.0 for name in PATTERN_NAMES}
    out["pattern_available"] = 1.0
    return out


def _confidence_from_error(error: float, tolerance: float) -> float:
    """Map an absolute error to [0, 1]: 1 at zero error, 0 at/after `tolerance`."""
    if tolerance <= 1e-8:
        # Degenerate tolerance: refuse to report confidence rather than divide by ~0.
        return 0.0
    return float(max(0.0, min(1.0, 1.0 - (error / tolerance))))


def _recent_prominent_peaks(
    series: np.ndarray, distance: int, prominence: float
) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
    """Find peaks in `series`, keeping at most the 5 most prominent.

    Returns (peak_indices, properties) as from :func:`scipy.signal.find_peaks`,
    with the surviving peaks re-sorted into chronological (index) order.
    """
    peaks, props = find_peaks(series, distance=distance, prominence=prominence)
    if peaks.size == 0:
        return peaks, props
    # Rank by prominence, keep the top 5, then restore chronological order
    # so callers can treat peaks[-1] as the most recent pivot.
    order = np.argsort(props["prominences"])
    keep = order[-5:]
    keep_sorted = np.sort(keep)
    peaks = peaks[keep_sorted]
    props = {key: value[keep_sorted] for key, value in props.items()}
    return peaks, props


def _double_top_confidence(highs: np.ndarray, current_price: float, tolerance: float) -> float:
    """Score a double-top: two similar peaks, ideally with price back under the neckline."""
    peaks, props = _recent_prominent_peaks(highs, distance=3, prominence=tolerance * 0.5)
    if peaks.size < 2:
        return 0.0
    top1_idx, top2_idx = peaks[-2], peaks[-1]
    top1 = float(highs[top1_idx])
    top2 = float(highs[top2_idx])
    # Neckline proxy: the lowest high between the two tops.
    neckline = (
        float(np.min(highs[top1_idx:top2_idx + 1]))
        if top2_idx > top1_idx
        else min(top1, top2)
    )
    if current_price > max(top1, top2):
        # Price broke above both tops: the pattern is invalidated.
        return 0.0
    symmetry = _confidence_from_error(abs(top1 - top2), tolerance)
    separation = min(1.0, float(top2_idx - top1_idx) / 8.0)
    # Full credit only once price has broken down through the neckline.
    breakdown = 1.0 if current_price <= neckline else 0.6
    prominence = min(1.0, float(np.mean(props["prominences"][-2:])) / max(tolerance, 1e-8))
    return float(max(0.0, min(1.0, symmetry * separation * breakdown * prominence)))


def _double_bottom_confidence(lows: np.ndarray, current_price: float, tolerance: float) -> float:
    """Score a double-bottom: mirror image of :func:`_double_top_confidence`."""
    troughs, props = _recent_prominent_peaks(-lows, distance=3, prominence=tolerance * 0.5)
    if troughs.size < 2:
        return 0.0
    low1_idx, low2_idx = troughs[-2], troughs[-1]
    low1 = float(lows[low1_idx])
    low2 = float(lows[low2_idx])
    # Ceiling proxy: the highest low between the two bottoms.
    ceiling = (
        float(np.max(lows[low1_idx:low2_idx + 1]))
        if low2_idx > low1_idx
        else max(low1, low2)
    )
    if current_price < min(low1, low2):
        # Price broke below both bottoms: the pattern is invalidated.
        return 0.0
    symmetry = _confidence_from_error(abs(low1 - low2), tolerance)
    separation = min(1.0, float(low2_idx - low1_idx) / 8.0)
    breakout = 1.0 if current_price >= ceiling else 0.6
    prominence = min(1.0, float(np.mean(props["prominences"][-2:])) / max(tolerance, 1e-8))
    return float(max(0.0, min(1.0, symmetry * separation * breakout * prominence)))


def _triangle_confidences(highs: np.ndarray, lows: np.ndarray, tolerance: float) -> Dict[str, float]:
    """Score ascending (flat top, rising lows) and descending (flat bottom, falling highs) triangles."""
    out = {
        "ascending_triangle": 0.0,
        "descending_triangle": 0.0,
    }
    peak_idx, _ = _recent_prominent_peaks(highs, distance=3, prominence=tolerance * 0.5)
    trough_idx, _ = _recent_prominent_peaks(-lows, distance=3, prominence=tolerance * 0.5)
    if peak_idx.size < 2 or trough_idx.size < 2:
        return out
    peak_vals = highs[peak_idx[-3:]]
    trough_vals = lows[trough_idx[-3:]]
    # Linear trend of the last pivots; slope sign/magnitude drives the score.
    peak_slope = np.polyfit(
        np.arange(len(peak_vals), dtype=np.float64), peak_vals.astype(np.float64), deg=1
    )[0]
    trough_slope = np.polyfit(
        np.arange(len(trough_vals), dtype=np.float64), trough_vals.astype(np.float64), deg=1
    )[0]
    peak_flatness = _confidence_from_error(float(np.max(peak_vals) - np.min(peak_vals)), tolerance)
    trough_flatness = _confidence_from_error(float(np.max(trough_vals) - np.min(trough_vals)), tolerance)
    out["ascending_triangle"] = float(
        max(0.0, min(1.0, peak_flatness * max(0.0, trough_slope) / max(tolerance, 1e-8)))
    )
    out["descending_triangle"] = float(
        max(0.0, min(1.0, trough_flatness * max(0.0, -peak_slope) / max(tolerance, 1e-8)))
    )
    return out


def _head_shoulders_confidence(
    highs: np.ndarray, lows: np.ndarray, tolerance: float, inverse: bool = False
) -> float:
    """Score a (possibly inverse) head-and-shoulders from the last three pivots.

    For the inverse pattern the lows are negated up front, so in `series`
    space both variants look the same: the middle pivot (head) must stand
    above both outer pivots (shoulders).
    """
    series = -lows if inverse else highs
    pivots, props = _recent_prominent_peaks(series, distance=3, prominence=tolerance * 0.5)
    if pivots.size < 3:
        return 0.0
    idxs = pivots[-3:]
    values = series[idxs]
    left, head, right = [float(v) for v in values]
    shoulders_match = _confidence_from_error(abs(left - right), tolerance)
    # BUGFIX: the old inverse branch computed min(left, right) - head on the
    # *already negated* series, which rewards a head ABOVE the shoulder lows —
    # the opposite shape — so true inverse H&S patterns always scored 0.
    # Since the negation already flips the geometry, one formula serves both.
    head_margin = max(0.0, head - max(left, right))
    head_score = min(1.0, head_margin / max(tolerance, 1e-8))
    spacing = min(1.0, float(min(idxs[1] - idxs[0], idxs[2] - idxs[1])) / 5.0)
    prominence = min(1.0, float(np.mean(props["prominences"][-3:])) / max(tolerance, 1e-8))
    return float(max(0.0, min(1.0, shoulders_match * head_score * spacing * prominence)))


def compute_pattern_features(closes, highs, lows, end_idx: int) -> Dict[str, float]:
    """Compute all pattern confidences over the window [0, end_idx] (inclusive).

    Args:
        closes/highs/lows: array-likes of prices; only the prefix up to
            ``end_idx`` is used (no look-ahead).
        end_idx: index of the "current" bar.

    Returns:
        Dict of ``pattern_*_confidence`` values in [0, 1] plus
        ``pattern_available``. All zeros when fewer than 10 bars are visible.
    """
    out = _empty_pattern_output()
    closes_np = np.asarray(closes[: end_idx + 1], dtype=np.float64)
    highs_np = np.asarray(highs[: end_idx + 1], dtype=np.float64)
    lows_np = np.asarray(lows[: end_idx + 1], dtype=np.float64)
    if closes_np.size < 10:
        return out
    current_price = float(closes_np[-1])
    # Tolerance = recent volatility, floored at 0.3% of price (and an absolute epsilon)
    # so quiet series still get a usable scale.
    tolerance = max(
        float(np.std(closes_np[-20:])) if closes_np.size >= 20 else float(np.std(closes_np)),
        current_price * 0.003,
        1e-5,
    )
    out["pattern_double_top_confidence"] = _double_top_confidence(highs_np, current_price, tolerance)
    out["pattern_double_bottom_confidence"] = _double_bottom_confidence(lows_np, current_price, tolerance)
    triangle = _triangle_confidences(highs_np, lows_np, tolerance)
    out["pattern_ascending_triangle_confidence"] = triangle["ascending_triangle"]
    out["pattern_descending_triangle_confidence"] = triangle["descending_triangle"]
    out["pattern_head_shoulders_confidence"] = _head_shoulders_confidence(
        highs_np, lows_np, tolerance, inverse=False
    )
    out["pattern_inverse_head_shoulders_confidence"] = _head_shoulders_confidence(
        highs_np, lows_np, tolerance, inverse=True
    )
    return out