| from typing import Dict, Tuple |
|
|
| import numpy as np |
| from scipy.signal import find_peaks |
|
|
| from data.quant_ohlc_feature_schema import PATTERN_NAMES |
|
|
|
|
def _empty_pattern_output() -> Dict[str, float]:
    """Build the baseline feature mapping with every pattern confidence zeroed.

    Returns:
        A dict with one ``pattern_<name>_confidence`` entry set to 0.0 for
        each name in ``PATTERN_NAMES``, plus ``pattern_available`` set to 1.0.
    """
    baseline: Dict[str, float] = {}
    for pattern_name in PATTERN_NAMES:
        baseline[f"pattern_{pattern_name}_confidence"] = 0.0
    baseline["pattern_available"] = 1.0
    return baseline
|
|
|
|
| def _confidence_from_error(error: float, tolerance: float) -> float: |
| if tolerance <= 1e-8: |
| return 0.0 |
| return float(max(0.0, min(1.0, 1.0 - (error / tolerance)))) |
|
|
|
|
| def _recent_prominent_peaks(series: np.ndarray, distance: int, prominence: float) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: |
| peaks, props = find_peaks(series, distance=distance, prominence=prominence) |
| if peaks.size == 0: |
| return peaks, props |
| order = np.argsort(props["prominences"]) |
| keep = order[-5:] |
| keep_sorted = np.sort(keep) |
| peaks = peaks[keep_sorted] |
| props = {key: value[keep_sorted] for key, value in props.items()} |
| return peaks, props |
|
|
|
|
def _double_top_confidence(highs: np.ndarray, current_price: float, tolerance: float) -> float:
    """Score how closely the last two prominent highs resemble a double top.

    Args:
        highs: High prices to scan for peaks.
        current_price: Price compared against the two tops and the neckline.
        tolerance: Scale for the peak search prominence and score normalization.

    Returns:
        A confidence in [0, 1]. It is 0.0 when fewer than two peaks are found
        or when ``current_price`` is above both tops.
    """
    pivot_idx, pivot_info = _recent_prominent_peaks(highs, distance=3, prominence=tolerance * 0.5)
    if pivot_idx.size < 2:
        return 0.0

    first_idx = pivot_idx[-2]
    second_idx = pivot_idx[-1]
    first_top = float(highs[first_idx])
    second_top = float(highs[second_idx])

    # Price above both tops means the pattern is not in play.
    if current_price > max(first_top, second_top):
        return 0.0

    # Neckline: the lowest high between the two tops (inclusive).
    if second_idx > first_idx:
        neckline = float(np.min(highs[first_idx:second_idx + 1]))
    else:
        neckline = min(first_top, second_top)

    # Full weight once price is at or below the neckline, reduced otherwise.
    if current_price <= neckline:
        breakdown_score = 1.0
    else:
        breakdown_score = 0.6

    symmetry_score = _confidence_from_error(abs(first_top - second_top), tolerance)
    separation_score = min(1.0, float(second_idx - first_idx) / 8.0)
    mean_prominence = float(np.mean(pivot_info["prominences"][-2:]))
    prominence_score = min(1.0, mean_prominence / max(tolerance, 1e-8))

    combined = symmetry_score * separation_score * breakdown_score * prominence_score
    return float(max(0.0, min(1.0, combined)))
|
|
|
|
def _double_bottom_confidence(lows: np.ndarray, current_price: float, tolerance: float) -> float:
    """Score how closely the last two prominent lows resemble a double bottom.

    Args:
        lows: Low prices; troughs are located as peaks of ``-lows``.
        current_price: Price compared against the two bottoms and the ceiling.
        tolerance: Scale for the trough search prominence and score normalization.

    Returns:
        A confidence in [0, 1]. It is 0.0 when fewer than two troughs are
        found or when ``current_price`` is below both bottoms.
    """
    pivot_idx, pivot_info = _recent_prominent_peaks(-lows, distance=3, prominence=tolerance * 0.5)
    if pivot_idx.size < 2:
        return 0.0

    first_idx = pivot_idx[-2]
    second_idx = pivot_idx[-1]
    first_low = float(lows[first_idx])
    second_low = float(lows[second_idx])

    # Price below both bottoms means the pattern is not in play.
    if current_price < min(first_low, second_low):
        return 0.0

    # Ceiling: the highest low between the two bottoms (inclusive).
    if second_idx > first_idx:
        ceiling = float(np.max(lows[first_idx:second_idx + 1]))
    else:
        ceiling = max(first_low, second_low)

    # Full weight once price is at or above the ceiling, reduced otherwise.
    if current_price >= ceiling:
        breakout_score = 1.0
    else:
        breakout_score = 0.6

    symmetry_score = _confidence_from_error(abs(first_low - second_low), tolerance)
    separation_score = min(1.0, float(second_idx - first_idx) / 8.0)
    mean_prominence = float(np.mean(pivot_info["prominences"][-2:]))
    prominence_score = min(1.0, mean_prominence / max(tolerance, 1e-8))

    combined = symmetry_score * separation_score * breakout_score * prominence_score
    return float(max(0.0, min(1.0, combined)))
|
|
|
|
def _triangle_confidences(highs: np.ndarray, lows: np.ndarray, tolerance: float) -> Dict[str, float]:
    """Score ascending and descending triangle shapes from recent pivots.

    Up to the last three retained peaks of ``highs`` and troughs of ``lows``
    are used. A line is fitted through each set against pivot ordinal
    (0, 1, 2), not bar index.

    Args:
        highs: High prices scanned for peaks.
        lows: Low prices; troughs are located as peaks of ``-lows``.
        tolerance: Scale for the pivot search prominence and score normalization.

    Returns:
        A dict with keys ``ascending_triangle`` and ``descending_triangle``,
        each in [0, 1]. Both are 0.0 when fewer than two peaks or fewer than
        two troughs are found.
    """
    result = {"ascending_triangle": 0.0, "descending_triangle": 0.0}

    high_pivots, _ = _recent_prominent_peaks(highs, distance=3, prominence=tolerance * 0.5)
    low_pivots, _ = _recent_prominent_peaks(-lows, distance=3, prominence=tolerance * 0.5)
    if high_pivots.size < 2 or low_pivots.size < 2:
        return result

    def _fitted_slope(samples: np.ndarray) -> float:
        # Slope of the least-squares line through the samples vs. 0..n-1.
        positions = np.arange(len(samples), dtype=np.float64)
        return np.polyfit(positions, samples.astype(np.float64), deg=1)[0]

    def _flatness(samples: np.ndarray) -> float:
        # High when the spread of the samples is small relative to tolerance.
        spread = float(np.max(samples) - np.min(samples))
        return _confidence_from_error(spread, tolerance)

    recent_highs = highs[high_pivots[-3:]]
    recent_lows = lows[low_pivots[-3:]]
    high_slope = _fitted_slope(recent_highs)
    low_slope = _fitted_slope(recent_lows)
    high_flatness = _flatness(recent_highs)
    low_flatness = _flatness(recent_lows)

    scale = max(tolerance, 1e-8)
    # Ascending: flat highs with rising lows. Descending: flat lows with falling highs.
    ascending_raw = high_flatness * max(0.0, low_slope) / scale
    descending_raw = low_flatness * max(0.0, -high_slope) / scale
    result["ascending_triangle"] = float(max(0.0, min(1.0, ascending_raw)))
    result["descending_triangle"] = float(max(0.0, min(1.0, descending_raw)))
    return result
|
|
|
|
def _head_shoulders_confidence(highs: np.ndarray, lows: np.ndarray, tolerance: float, inverse: bool = False) -> float:
    """Score a head-and-shoulders (or inverse) shape from the last three pivots.

    The pivots are peaks of ``highs`` for the regular pattern and peaks of
    ``-lows`` for the inverse pattern.

    Args:
        highs: High prices, used when ``inverse`` is False.
        lows: Low prices, used (negated) when ``inverse`` is True.
        tolerance: Scale for the pivot search prominence and score normalization.
        inverse: Select the inverse pattern (built on troughs of ``lows``).

    Returns:
        A confidence in [0, 1]. It is 0.0 when fewer than three pivots are
        found or when the middle pivot does not exceed both outer pivots in
        the pivot series.
    """
    series = -lows if inverse else highs
    pivots, props = _recent_prominent_peaks(series, distance=3, prominence=tolerance * 0.5)
    if pivots.size < 3:
        return 0.0
    idxs = pivots[-3:]
    left, head, right = [float(v) for v in series[idxs]]
    shoulders_match = _confidence_from_error(abs(left - right), tolerance)
    # `series` is already negated for the inverse pattern, so in both
    # orientations the head must be the tallest of the three pivots *in
    # series*. The previous inverse branch used `min(left, right) - head` on
    # these negated values, which is never positive for a genuine inverse
    # head-and-shoulders, so that pattern always scored 0.
    head_margin = max(0.0, head - max(left, right))
    head_score = min(1.0, head_margin / max(tolerance, 1e-8))
    # Reward evenly spread pivots: the tighter of the two gaps drives the score.
    spacing = min(1.0, float(min(idxs[1] - idxs[0], idxs[2] - idxs[1])) / 5.0)
    prominence = min(1.0, float(np.mean(props["prominences"][-3:])) / max(tolerance, 1e-8))
    return float(max(0.0, min(1.0, shoulders_match * head_score * spacing * prominence)))
|
|
|
|
def compute_pattern_features(closes, highs, lows, end_idx: int) -> Dict[str, float]:
    """Compute chart-pattern confidence features for bars up to ``end_idx``.

    Args:
        closes: Close prices; sliced as ``closes[: end_idx + 1]``.
        highs: High prices; sliced the same way.
        lows: Low prices; sliced the same way.
        end_idx: Index of the last bar included in the analysis window.

    Returns:
        The mapping from ``_empty_pattern_output()`` with the six pattern
        confidences filled in. With fewer than 10 closes in the window the
        baseline mapping is returned unchanged.
    """
    features = _empty_pattern_output()

    window_end = end_idx + 1
    close_arr = np.asarray(closes[:window_end], dtype=np.float64)
    high_arr = np.asarray(highs[:window_end], dtype=np.float64)
    low_arr = np.asarray(lows[:window_end], dtype=np.float64)
    if close_arr.size < 10:
        return features

    last_close = float(close_arr[-1])
    # Taking the last 20 closes yields the whole array when fewer exist, so a
    # single slice covers both the short and the long window cases.
    recent_std = float(np.std(close_arr[-20:]))
    # Tolerance: recent volatility, floored at 0.3% of price and at 1e-5.
    tolerance = max(recent_std, last_close * 0.003, 1e-5)

    features["pattern_double_top_confidence"] = _double_top_confidence(high_arr, last_close, tolerance)
    features["pattern_double_bottom_confidence"] = _double_bottom_confidence(low_arr, last_close, tolerance)

    triangle_scores = _triangle_confidences(high_arr, low_arr, tolerance)
    features["pattern_ascending_triangle_confidence"] = triangle_scores["ascending_triangle"]
    features["pattern_descending_triangle_confidence"] = triangle_scores["descending_triangle"]

    features["pattern_head_shoulders_confidence"] = _head_shoulders_confidence(
        high_arr, low_arr, tolerance, inverse=False
    )
    features["pattern_inverse_head_shoulders_confidence"] = _head_shoulders_confidence(
        high_arr, low_arr, tolerance, inverse=True
    )
    return features
|
|