Spaces:
Sleeping
Sleeping
| import os | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import pandas_ta as ta | |
| import xgboost as xgb | |
| from collections import deque, defaultdict | |
| import traceback | |
| import sys | |
| class GuardianHydra: | |
| """ | |
| GuardianHydra V1.4 (Configurable Verbosity) | |
| - Added set_silent_mode() to suppress logs during Backtesting. | |
| """ | |
| def __init__(self, model_dir): | |
| self.model_dir = model_dir | |
| self.initialized = False | |
| self.models = {} | |
| self.feature_cols = [] | |
| self.verbose = True # β Default to True for Live System | |
| self.smoothing_buffer = defaultdict(lambda: { | |
| 'crash': deque(maxlen=3), | |
| 'giveback': deque(maxlen=3), | |
| 'stagnation': deque(maxlen=3) | |
| }) | |
| self.ATR_PERIOD = 14 | |
| # β Silent check | |
| if self.verbose: print("π² [Hydra X-RAY] Instance Created. Waiting for data...") | |
| def set_silent_mode(self, silent=True): | |
| """ β Control Logging Output (True = No Logs, False = X-RAY Logs) """ | |
| self.verbose = not silent | |
| def initialize(self): | |
| if self.verbose: print(f"π² [Hydra X-RAY] Loading from: {self.model_dir}") | |
| if not os.path.exists(self.model_dir): | |
| if self.verbose: print(f"β [FATAL] Directory missing: {self.model_dir}") | |
| return False | |
| try: | |
| # 1. Load Features | |
| feat_path = os.path.join(self.model_dir, "hydra_features_list.pkl") | |
| if not os.path.exists(feat_path): | |
| if self.verbose: print(f"β Feature list missing: {feat_path}") | |
| return False | |
| self.feature_cols = joblib.load(feat_path) | |
| if self.verbose: print(f"β Features List Loaded ({len(self.feature_cols)} items)") | |
| # 2. Load Models (RAW) | |
| heads = ['crash', 'giveback', 'stagnation'] | |
| for h in heads: | |
| model_path = os.path.join(self.model_dir, f"hydra_head_{h}_raw.json") | |
| if not os.path.exists(model_path): | |
| if self.verbose: print(f"β Model missing: {model_path}") | |
| return False | |
| clf = xgb.XGBClassifier() | |
| clf.load_model(model_path) | |
| self.models[h] = clf | |
| if self.verbose: print(f"β Loaded Head: {h}") | |
| self.initialized = True | |
| if self.verbose: print(f"β [Hydra X-RAY] System Ready.") | |
| return True | |
| except Exception as e: | |
| if self.verbose: print(f"β [Hydra Init Error] {e}") | |
| traceback.print_exc() | |
| return False | |
| def _engineer_features(self, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context): | |
| try: | |
| # 1. Check Raw Data Inputs | |
| if not ohlcv_1m or len(ohlcv_1m) < 1: | |
| if self.verbose: print("β οΈ [X-RAY] 1m Data is EMPTY!") | |
| return None | |
| df_1m = pd.DataFrame(ohlcv_1m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) | |
| # [DIAGNOSTIC 1] Print Input Sample | |
| last_close = df_1m['close'].iloc[-1] | |
| if self.verbose: | |
| print(f"π [Input Check] Last Close: {last_close} | Candles: {len(df_1m)}") | |
| if len(df_1m) < 50: | |
| if self.verbose: print(f"β οΈ [X-RAY] Not enough history: {len(df_1m)} < 50") | |
| return None | |
| # 2. Indicator Calculation | |
| df_1m['atr'] = ta.atr(df_1m['high'], df_1m['low'], df_1m['close'], length=self.ATR_PERIOD) | |
| df_1m['rsi'] = ta.rsi(df_1m['close'], length=14) | |
| # [DIAGNOSTIC 2] Check Indicators | |
| last_rsi = df_1m['rsi'].iloc[-1] | |
| last_atr = df_1m['atr'].iloc[-1] | |
| if (pd.isna(last_rsi) or pd.isna(last_atr)) and self.verbose: | |
| print(f"β οΈ [X-RAY] Indicators are NaN! RSI: {last_rsi}, ATR: {last_atr}") | |
| # ... rest of calculations ... | |
| bb = ta.bbands(df_1m['close'], length=20, std=2) | |
| if bb is not None: | |
| w_col = [c for c in bb.columns if 'BBB' in c] | |
| df_1m['bb_width'] = bb[w_col[0]] if w_col else 0.0 | |
| else: | |
| df_1m['bb_width'] = 0.0 | |
| vol_ma = df_1m['volume'].rolling(50).mean() | |
| df_1m['rel_vol'] = df_1m['volume'] / (vol_ma + 1e-9) | |
| # HTF Mocking if missing | |
| df_5m = pd.DataFrame(ohlcv_5m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) if ohlcv_5m else pd.DataFrame() | |
| rsi_5m = ta.rsi(df_5m['close'], length=14).iloc[-1] if len(df_5m) > 14 else 50 | |
| df_15m = pd.DataFrame(ohlcv_15m, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) if ohlcv_15m else pd.DataFrame() | |
| rsi_15m = ta.rsi(df_15m['close'], length=14).iloc[-1] if len(df_15m) > 14 else 50 | |
| dist_ema20_1h = 0.0 | |
| if len(df_15m) > 4: | |
| ema20_1h_approx = df_15m['close'].ewm(span=80, adjust=False).mean().iloc[-1] | |
| dist_ema20_1h = (last_close - ema20_1h_approx) / last_close | |
| # Trade Context | |
| entry_price = float(trade_context.get('entry_price', 0.0)) | |
| if entry_price == 0: entry_price = last_close # Fallback | |
| atr_val = last_atr if last_atr > 0 else (entry_price * 0.01) | |
| sl_dist_unit = 1.5 * atr_val | |
| pnl_amt = last_close - entry_price | |
| norm_pnl_r = pnl_amt / sl_dist_unit | |
| duration_mins = trade_context.get('time_in_trade_mins', 10) | |
| highest_price = float(trade_context.get('highest_price', entry_price)) | |
| if highest_price < entry_price: highest_price = entry_price | |
| max_pnl_amt = highest_price - entry_price | |
| max_pnl_r = max_pnl_amt / sl_dist_unit if sl_dist_unit > 0 else 0.0 | |
| # Assemble Vector | |
| feat_dict = { | |
| 'rsi_1m': last_rsi, | |
| 'rsi_5m': rsi_5m, | |
| 'rsi_15m': rsi_15m, | |
| 'bb_width': df_1m['bb_width'].iloc[-1], | |
| 'rel_vol': df_1m['rel_vol'].iloc[-1], | |
| 'dist_ema20_1h': dist_ema20_1h, | |
| 'atr_pct': atr_val / last_close, | |
| 'norm_pnl_r': norm_pnl_r, | |
| 'max_pnl_r': max_pnl_r, | |
| 'dist_tp_atr': 0.0, | |
| 'dist_sl_atr': 0.0, | |
| 'time_in_trade': float(duration_mins), | |
| 'entry_type': 0.0, | |
| 'oracle_conf': 0.8, | |
| 'l2_score': 0.7, | |
| 'target_class': 3.0 | |
| } | |
| vector = pd.DataFrame([feat_dict]) | |
| for col in self.feature_cols: | |
| if col not in vector.columns: | |
| vector[col] = 0.0 | |
| if vector.isnull().values.any(): | |
| if self.verbose: | |
| print("β οΈ [X-RAY] Final Vector contains NaNs! Model will fail or output 0.") | |
| print(vector.iloc[0].to_dict()) | |
| vector = vector.fillna(0) | |
| return vector[self.feature_cols].astype(float) | |
| except Exception as e: | |
| if self.verbose: | |
| print(f"β [X-RAY] Feature Error: {e}") | |
| traceback.print_exc() | |
| return None | |
| def analyze_position(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data): | |
| if not self.initialized: | |
| return {'action': 'HOLD', 'reason': 'Not Init'} | |
| try: | |
| features = self._engineer_features(ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_data) | |
| if features is None: | |
| if self.verbose: print(f"π« [X-RAY] {symbol}: Feature Engineering Failed.") | |
| return {'action': 'HOLD', 'reason': 'Feat Fail'} | |
| probs = {} | |
| if self.verbose: print(f"π¬ [X-RAY] Predicting for {symbol}...") | |
| for h in ['crash', 'giveback', 'stagnation']: | |
| try: | |
| full_pred = self.models[h].predict_proba(features) | |
| raw_prob = full_pred[0][1] | |
| probs[h] = raw_prob | |
| if raw_prob > 0.0 and self.verbose: | |
| print(f" π₯ {h.upper()} Non-Zero Prob: {raw_prob:.4f}") | |
| except Exception as e: | |
| if self.verbose: print(f" β Error predicting {h}: {e}") | |
| probs[h] = 0.0 | |
| return self._pkg('HOLD', 0.0, "X-RAY DIAGNOSTIC", probs) | |
| except Exception as e: | |
| if self.verbose: print(f"β [X-RAY] Analyze Error: {e}") | |
| return {'action': 'HOLD', 'reason': 'Error'} | |
| def _pkg(self, action, conf, reason, probs): | |
| return { | |
| 'action': action, | |
| 'confidence': float(conf), | |
| 'reason': reason, | |
| 'probs': {k: float(v) for k, v in probs.items()}, | |
| 'scores': {'v2': probs.get('crash',0), 'v3': probs.get('giveback',0)} | |
| } |