# oracle/signals/patterns.py
# (uploaded via huggingface_hub by zirobtc; revision d195287)
from typing import Dict, Tuple
import numpy as np
from scipy.signal import find_peaks
from data.quant_ohlc_feature_schema import PATTERN_NAMES
def _empty_pattern_output() -> Dict[str, float]:
    """Build the default pattern feature dict with every confidence zeroed.

    ``pattern_available`` is set to 1.0 even when all confidences are zero,
    signalling that the pattern pipeline produced output.
    """
    features: Dict[str, float] = {}
    for name in PATTERN_NAMES:
        features[f"pattern_{name}_confidence"] = 0.0
    features["pattern_available"] = 1.0
    return features
def _confidence_from_error(error: float, tolerance: float) -> float:
if tolerance <= 1e-8:
return 0.0
return float(max(0.0, min(1.0, 1.0 - (error / tolerance))))
def _recent_prominent_peaks(series: np.ndarray, distance: int, prominence: float) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
peaks, props = find_peaks(series, distance=distance, prominence=prominence)
if peaks.size == 0:
return peaks, props
order = np.argsort(props["prominences"])
keep = order[-5:]
keep_sorted = np.sort(keep)
peaks = peaks[keep_sorted]
props = {key: value[keep_sorted] for key, value in props.items()}
return peaks, props
def _double_top_confidence(highs: np.ndarray, current_price: float, tolerance: float) -> float:
    """Score a double-top pattern in [0, 1] from the two latest prominent highs.

    The score is the product of: peak-height symmetry, peak separation
    (saturating at 8 bars), a neckline factor (1.0 when price is at/below
    the neckline between the tops, 0.6 otherwise), and the mean peak
    prominence normalized by the tolerance.
    """
    peaks, props = _recent_prominent_peaks(highs, distance=3, prominence=tolerance * 0.5)
    if peaks.size < 2:
        return 0.0
    i1, i2 = int(peaks[-2]), int(peaks[-1])
    p1, p2 = float(highs[i1]), float(highs[i2])
    # Price trading above both tops invalidates the pattern outright.
    if current_price > max(p1, p2):
        return 0.0
    neckline = float(np.min(highs[i1:i2 + 1])) if i2 > i1 else min(p1, p2)
    symmetry = _confidence_from_error(abs(p1 - p2), tolerance)
    separation = min(1.0, float(i2 - i1) / 8.0)
    breakdown = 1.0 if current_price <= neckline else 0.6
    strength = min(1.0, float(np.mean(props["prominences"][-2:])) / max(tolerance, 1e-8))
    score = symmetry * separation * breakdown * strength
    return float(min(1.0, max(0.0, score)))
def _double_bottom_confidence(lows: np.ndarray, current_price: float, tolerance: float) -> float:
    """Score a double-bottom pattern in [0, 1] from the two latest prominent lows.

    Mirrors the double-top scoring: trough symmetry, trough separation
    (saturating at 8 bars), a breakout factor (1.0 when price is at/above
    the ceiling between the troughs, 0.6 otherwise), and the mean trough
    prominence normalized by the tolerance, multiplied together.
    """
    troughs, props = _recent_prominent_peaks(-lows, distance=3, prominence=tolerance * 0.5)
    if troughs.size < 2:
        return 0.0
    i1, i2 = int(troughs[-2]), int(troughs[-1])
    b1, b2 = float(lows[i1]), float(lows[i2])
    # Price trading below both bottoms invalidates the pattern outright.
    if current_price < min(b1, b2):
        return 0.0
    ceiling = float(np.max(lows[i1:i2 + 1])) if i2 > i1 else max(b1, b2)
    symmetry = _confidence_from_error(abs(b1 - b2), tolerance)
    separation = min(1.0, float(i2 - i1) / 8.0)
    breakout = 1.0 if current_price >= ceiling else 0.6
    strength = min(1.0, float(np.mean(props["prominences"][-2:])) / max(tolerance, 1e-8))
    return float(min(1.0, max(0.0, symmetry * separation * breakout * strength)))
def _triangle_confidences(highs: np.ndarray, lows: np.ndarray, tolerance: float) -> Dict[str, float]:
    """Score ascending/descending triangles from recent pivot highs and lows.

    Ascending: flat top (peak values within tolerance) combined with rising
    troughs. Descending: flat bottom combined with falling peaks. Slopes are
    normalized by the tolerance; both scores are clipped to [0, 1].
    """
    scores = {"ascending_triangle": 0.0, "descending_triangle": 0.0}
    peaks, _ = _recent_prominent_peaks(highs, distance=3, prominence=tolerance * 0.5)
    troughs, _ = _recent_prominent_peaks(-lows, distance=3, prominence=tolerance * 0.5)
    if peaks.size < 2 or troughs.size < 2:
        return scores
    top_vals = highs[peaks[-3:]].astype(np.float64)
    bot_vals = lows[troughs[-3:]].astype(np.float64)
    top_slope = float(np.polyfit(np.arange(top_vals.size, dtype=np.float64), top_vals, deg=1)[0])
    bot_slope = float(np.polyfit(np.arange(bot_vals.size, dtype=np.float64), bot_vals, deg=1)[0])
    top_flat = _confidence_from_error(float(np.max(top_vals) - np.min(top_vals)), tolerance)
    bot_flat = _confidence_from_error(float(np.max(bot_vals) - np.min(bot_vals)), tolerance)
    denom = max(tolerance, 1e-8)
    scores["ascending_triangle"] = float(max(0.0, min(1.0, top_flat * max(0.0, bot_slope) / denom)))
    scores["descending_triangle"] = float(max(0.0, min(1.0, bot_flat * max(0.0, -top_slope) / denom)))
    return scores
def _head_shoulders_confidence(highs: np.ndarray, lows: np.ndarray, tolerance: float, inverse: bool = False) -> float:
    """Score a (possibly inverse) head-and-shoulders pattern in [0, 1].

    Uses the three most recent prominent pivots of ``highs`` (or of
    ``-lows`` when ``inverse`` is True). The score multiplies: shoulder
    symmetry, how far the head extends beyond the shoulders (normalized by
    ``tolerance``), pivot spacing (saturating at 5 bars), and the mean pivot
    prominence normalized by ``tolerance``.

    Bug fix: the lows are negated before peak detection for the inverse
    pattern, so in ``series`` space the head pivot is a *maximum* for BOTH
    orientations. The previous inverse branch computed
    ``min(left, right) - head`` as if the values were un-negated lows; on
    the negated series that margin can never be positive for a genuine
    inverse head-and-shoulders, forcing the confidence to 0.
    """
    series = -lows if inverse else highs
    pivots, props = _recent_prominent_peaks(series, distance=3, prominence=tolerance * 0.5)
    if pivots.size < 3:
        return 0.0
    idxs = pivots[-3:]
    values = series[idxs]
    left, head, right = [float(v) for v in values]
    shoulders_match = _confidence_from_error(abs(left - right), tolerance)
    # In series space the head must stand above both shoulders regardless of
    # orientation (inverse patterns were negated before peak detection).
    head_margin = max(0.0, head - max(left, right))
    head_score = min(1.0, head_margin / max(tolerance, 1e-8))
    spacing = min(1.0, float(min(idxs[1] - idxs[0], idxs[2] - idxs[1])) / 5.0)
    prominence = min(1.0, float(np.mean(props["prominences"][-3:])) / max(tolerance, 1e-8))
    return float(max(0.0, min(1.0, shoulders_match * head_score * spacing * prominence)))
def compute_pattern_features(closes, highs, lows, end_idx: int) -> Dict[str, float]:
    """Compute chart-pattern confidence features over bars [0, end_idx].

    ``closes``/``highs``/``lows`` are array-like price series; only the
    prefix up to ``end_idx`` (inclusive) is used. Returns the default dict
    from ``_empty_pattern_output`` with the individual pattern confidences
    filled in; fewer than 10 bars leaves every confidence at zero.
    """
    features = _empty_pattern_output()
    stop = end_idx + 1
    closes_arr = np.asarray(closes[:stop], dtype=np.float64)
    highs_arr = np.asarray(highs[:stop], dtype=np.float64)
    lows_arr = np.asarray(lows[:stop], dtype=np.float64)
    if closes_arr.size < 10:
        return features
    price = float(closes_arr[-1])
    # Tolerance = std of the last 20 closes (all closes if fewer than 20),
    # floored at 0.3% of the current price and at 1e-5.
    window = closes_arr[-20:] if closes_arr.size >= 20 else closes_arr
    tolerance = max(float(np.std(window)), price * 0.003, 1e-5)
    features["pattern_double_top_confidence"] = _double_top_confidence(highs_arr, price, tolerance)
    features["pattern_double_bottom_confidence"] = _double_bottom_confidence(lows_arr, price, tolerance)
    triangles = _triangle_confidences(highs_arr, lows_arr, tolerance)
    features["pattern_ascending_triangle_confidence"] = triangles["ascending_triangle"]
    features["pattern_descending_triangle_confidence"] = triangles["descending_triangle"]
    features["pattern_head_shoulders_confidence"] = _head_shoulders_confidence(highs_arr, lows_arr, tolerance, inverse=False)
    features["pattern_inverse_head_shoulders_confidence"] = _head_shoulders_confidence(highs_arr, lows_arr, tolerance, inverse=True)
    return features