Spaces:
Sleeping
Sleeping
| # ============================================================================== | |
| # ☁️ r2.py (V41.2 - GEM-Architect: Anti-Reset Safety) | |
| # ============================================================================== | |
| # - Fix: Prevents "Reset to Initial" if read fails due to network/lock errors. | |
| # - Fix: Improved error handling in JSON loaders. | |
| # ============================================================================== | |
| import os | |
| import json | |
| import asyncio | |
| import boto3 | |
| from datetime import datetime | |
| from botocore.exceptions import ClientError | |
| from typing import Dict, Any, Optional | |
| # ============================================================================== | |
| # ⚙️ Configuration and settings | |
| # ============================================================================== | |
# Cloudflare R2 credentials come from the process environment; each value is
# None when its variable is unset.
R2_ACCOUNT_ID = os.environ.get("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")

# Name of the bucket used for storage.
BUCKET_NAME = "trading"

# 🟢 Production mode: the file-name prefix is empty.
FILE_PREFIX = ""

# Starting balance constant.
INITIAL_CAPITAL = 10.0
| # ============================================================================== | |
| # 📁 File keys | |
| # ============================================================================== | |
def get_key(filename: str, prefix: Optional[str] = None) -> str:
    """Return the object key for *filename* with a file-name prefix applied.

    The prefix is inserted in front of the last path component, so
    ``"datasets/a.json"`` with prefix ``"test_"`` becomes
    ``"datasets/test_a.json"``.  An empty prefix returns *filename* unchanged.

    Args:
        filename: Object name, optionally containing ``/``-separated folders.
        prefix: Prefix to apply.  When omitted, the module-level
            ``FILE_PREFIX`` is used.

    Returns:
        The key with the prefix placed before the final path component.
    """
    if prefix is None:
        prefix = FILE_PREFIX
    # rpartition yields ("", "", filename) when there is no "/", so one
    # expression covers both the foldered and the flat case.
    folder, separator, name = filename.rpartition("/")
    return f"{folder}{separator}{prefix}{name}"
# Object keys, each resolved once at import time through get_key().

# Portfolio and trade bookkeeping.
PORTFOLIO_STATE_KEY = get_key("portfolio_state.json")
SMART_PORTFOLIO_STATE_KEY = get_key("smart_portfolio_state.json")
OPEN_TRADES_KEY = get_key("open_trades.json")
CLOSED_TRADES_KEY = get_key("closed_trades_history.json")
CONTRACTS_DB_KEY = get_key("contracts.json")

# Whale-learning records and configuration.
WHALE_LEARNING_PENDING_KEY = get_key("learning_whale_pending_records.json")
WHALE_LEARNING_COMPLETED_KEY = get_key("learning_whale_completed_records.json")
WHALE_LEARNING_CONFIG_KEY = get_key("learning_whale_optimal_config.json")

# Training data, strategy DNA and audit log.
GOVERNANCE_TRAINING_KEY = get_key("datasets/governance_training_data.json")
STRATEGIC_DNA_KEY = get_key("learning/strategic_dna_v2.json")
DEEP_STEWARD_AUDIT_KEY = get_key("DeepSteward_Audit_Log.json")

# Analytics.
DIAGNOSTIC_STATS_KEY = get_key("analytics/model_diagnostic_matrix.json")
GUARDIAN_STATS_KEY = get_key("analytics/guardian_performance_stats.json")
| class R2Service: | |
def __init__(self):
    """Create the boto3 S3 client pointed at the Cloudflare R2 endpoint.

    A warning is printed when any of the three R2 environment variables is
    unset; construction still proceeds so existing behaviour is unchanged.

    Raises:
        RuntimeError: If the client cannot be constructed.  The original
            exception is chained as ``__cause__``.
    """
    # An unset account id would otherwise yield "https://None.r2..." and
    # only surface later as an opaque request failure.
    settings = (
        ("R2_ACCOUNT_ID", R2_ACCOUNT_ID),
        ("R2_ACCESS_KEY_ID", R2_ACCESS_KEY_ID),
        ("R2_SECRET_ACCESS_KEY", R2_SECRET_ACCESS_KEY),
    )
    missing = [name for name, value in settings if not value]
    if missing:
        print(f"⚠️ [R2] Missing environment variables: {', '.join(missing)}")
    try:
        endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
        self.s3_client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=R2_ACCESS_KEY_ID,
            aws_secret_access_key=R2_SECRET_ACCESS_KEY,
        )
        self.BUCKET_NAME = BUCKET_NAME
        print("✅ [R2 V41.2] Service Loaded (PRODUCTION MODE).")
    except Exception as e:
        raise RuntimeError(f"Failed to initialize S3 client: {e}") from e
| # ============================================================================== | |
| # 🔗 ADAPTERS | |
| # ============================================================================== | |
async def load_active_trades_async(self) -> Dict[str, Any]:
    """Return the open trades as a mapping keyed by symbol.

    The stored value may be a list or a dict.  A list is converted by
    indexing each dict entry that has a ``'symbol'`` key (other entries are
    dropped; a later duplicate symbol replaces an earlier one).  A non-empty
    dict is returned as is.  Anything else, or an empty value, gives ``{}``.
    """
    stored = await self.get_open_trades_async()
    if stored and isinstance(stored, list):
        return {
            entry['symbol']: entry
            for entry in stored
            if isinstance(entry, dict) and 'symbol' in entry
        }
    if stored and isinstance(stored, dict):
        return stored
    return {}
async def save_active_trades_async(self, trades_dict: Dict[str, Any]):
    """Adapter: forward *trades_dict* to ``save_open_trades_async``."""
    persist = self.save_open_trades_async
    await persist(trades_dict)
async def save_trade_history_async(self, trade_record: Dict[str, Any]):
    """Adapter: forward *trade_record* to ``append_to_closed_trades_history``."""
    append = self.append_to_closed_trades_history
    await append(trade_record)
| # ============================================================================== | |
| # 🔴 Reset function | |
| # ============================================================================== | |
async def reset_all_stats_async(self):
    """Overwrite all persisted trading statistics with their initial values.

    Writes, in order: the portfolio state, the smart-portfolio state, an
    empty closed-trades list, an empty open-trades mapping, then the
    diagnostic and guardian statistics.

    Returns:
        True if every write succeeded; False if any step raised.  The error
        is printed and writes already performed are not rolled back.
    """
    try:
        print("🔄 [R2 Production Reset] جاري مسح البيانات...")
        initial_state = {
            "current_capital_usd": INITIAL_CAPITAL,
            "invested_capital_usd": 0.0,
            "allocated_capital_usd": 0.0,
            "initial_capital_usd": INITIAL_CAPITAL,
            "total_trades": 0, "winning_trades": 0, "losing_trades": 0,
            "total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_rate": 0.0,
            "daily_net_pnl": 0.0, "is_trading_halted": False,
            "last_update": datetime.now().isoformat()
        }
        await self.save_portfolio_state_async(initial_state)

        smart_state = {
            "current_capital": INITIAL_CAPITAL,
            "allocated_capital_usd": 0.0,
            "session_start_balance": INITIAL_CAPITAL,
            "daily_net_pnl": 0.0,
            "is_trading_halted": False,
            "halt_reason": None,
            "last_session_reset": datetime.now().isoformat()
        }
        await self.upload_json_async(smart_state, SMART_PORTFOLIO_STATE_KEY)

        # Use the shared upload helper so these writes target
        # self.BUCKET_NAME like every other method, rather than the
        # module-level BUCKET_NAME through a hand-rolled put_object call.
        await self.upload_json_async([], CLOSED_TRADES_KEY)
        await self.upload_json_async({}, OPEN_TRADES_KEY)

        await self.reset_diagnostic_stats_async()
        await self.reset_guardian_stats_async()
        print("✅ [R2 Reset] تم تصفير النظام بنجاح.")
        return True
    except Exception as e:
        print(f"❌ [R2 Reset Error] فشل التصفير: {e}")
        return False
| # ============================================================================== | |
| # 📊 Diagnostic Stats | |
| # ============================================================================== | |
async def get_diagnostic_stats_async(self) -> Dict[str, Any]:
    """Return the stored diagnostic stats with every known model present.

    When nothing is stored (or the stored value is empty) a fresh all-zero
    table is returned.  Otherwise any known model missing from the stored
    data is added with zeroed counters; other stored entries are untouched.
    """
    stored = await self.get_file_json_async(DIAGNOSTIC_STATS_KEY)
    model_names = ["Pattern", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"]
    baseline = {}
    for name in model_names:
        baseline[name] = {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0}
    if not stored:
        return baseline
    for name in baseline:
        if name not in stored:
            stored[name] = baseline[name]
    return stored
async def update_diagnostic_stats_async(self, updates: Dict[str, Dict[str, float]]):
    """Add per-model metric deltas to the stored diagnostic stats and save.

    Args:
        updates: Mapping of model name to a dict of deltas for any of
            ``wins``, ``losses``, ``pnl``, ``profit_accum``, ``loss_accum``.
            Omitted metrics count as zero.

    Best-effort: any failure is printed and swallowed, and nothing is
    raised to the caller.
    """
    zero_metrics = {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0}
    try:
        current = await self.get_diagnostic_stats_async()
        for model, metrics in updates.items():
            record = current.setdefault(model, {})
            for field, zero in zero_metrics.items():
                # .get() tolerates stored records that lack a metric field;
                # indexing directly raised KeyError and dropped the whole
                # update.
                record[field] = record.get(field, zero) + metrics.get(field, zero)
        await self.upload_json_async(current, DIAGNOSTIC_STATS_KEY)
    except Exception as e:
        print(f"❌ [R2] Failed to update diagnostics: {e}")
async def reset_diagnostic_stats_async(self):
    """Upload an all-zero diagnostic table covering every known model."""
    zeroed = {}
    for model in ("Pattern", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"):
        zeroed[model] = {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0}
    await self.upload_json_async(zeroed, DIAGNOSTIC_STATS_KEY)
| # ============================================================================== | |
| # Helpers | |
| # ============================================================================== | |
async def upload_json_async(self, data: Any, key: str):
    """Serialize *data* as indented UTF-8 JSON and store it under *key*.

    The blocking ``put_object`` call is run in a worker thread.

    Raises:
        Exception: Any serialization or upload error, re-raised after a
            message is printed.
    """
    try:
        text = json.dumps(data, indent=2, ensure_ascii=False)
        payload = text.encode('utf-8')
        put_object = self.s3_client.put_object
        request = {
            "Bucket": self.BUCKET_NAME,
            "Key": key,
            "Body": payload,
            "ContentType": "application/json",
        }
        await asyncio.to_thread(put_object, **request)
    except Exception as exc:
        print(f"❌ [R2] Upload JSON failed for {key}: {exc}")
        raise
async def get_file_json_async(self, key: str, *, strict: bool = False) -> Optional[Any]:
    """Download *key* via ``get_file_async`` and decode it as JSON.

    Args:
        key: Object key to read.
        strict: When False (the default) every failure is printed and
            reported as ``None``, which a caller cannot tell apart from a
            missing object.  When True, read errors and undecodable JSON
            are re-raised, so read-modify-write callers can avoid
            overwriting data they failed to load.

    Returns:
        The decoded JSON value, or ``None`` when ``get_file_async`` returned
        nothing (or, in non-strict mode, when reading or decoding failed).

    Raises:
        Exception: Only when *strict* is True — whatever ``get_file_async``
            or ``json.loads`` raised.
    """
    try:
        data = await self.get_file_async(key)
        if data:
            return json.loads(data)
        return None
    except json.JSONDecodeError:
        if strict:
            print(f"⚠️ [R2] Corrupt JSON in {key}.")
            raise
        print(f"⚠️ [R2] Corrupt JSON in {key}. Returning None.")
        return None
    except Exception as e:
        # Always report the cause so a failed read is visible in the logs.
        print(f"⚠️ [R2] Error reading JSON from {key}: {e}")
        if strict:
            raise
        return None
async def get_file_async(self, key: str) -> Optional[bytes]:
    """Download the object stored under *key* and return its bytes.

    Both the ``get_object`` request and the read of the response body run
    in a worker thread, so the event loop is not blocked while the payload
    streams in.  The body stream is closed once it has been read.

    Returns:
        The object's bytes, or ``None`` only when the service reports
        ``NoSuchKey`` (the object does not exist).

    Raises:
        ClientError: For any service error other than ``NoSuchKey``.
        Exception: Any other failure.  Errors are re-raised rather than
            mapped to ``None`` so callers do not mistake a failed read for
            a missing object and overwrite stored data.
    """
    def _download() -> bytes:
        response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=key)
        body = response['Body']
        try:
            return body.read()
        finally:
            body.close()

    try:
        return await asyncio.to_thread(_download)
    except ClientError as e:
        # Only a genuinely missing key is reported as None.
        if e.response.get('Error', {}).get('Code') == 'NoSuchKey':
            return None
        print(f"❌ [R2] ClientError reading {key}: {e}")
        raise
    except Exception as e:
        print(f"❌ [R2] General Error reading {key}: {e}")
        raise
async def upload_file_async(self, file_obj, key: str) -> bool:
    """Upload a file-like object to *key* in a worker thread.

    Best-effort: a failure is printed and not raised.

    Args:
        file_obj: Readable binary file-like object passed to
            ``upload_fileobj``.
        key: Destination object key.

    Returns:
        True when the upload completed, False when it failed.  Previously
        nothing was returned, so callers had no way to detect a failure.
    """
    try:
        await asyncio.to_thread(self.s3_client.upload_fileobj, file_obj, self.BUCKET_NAME, key)
    except Exception as e:
        print(f"❌ [R2] File upload failed for {key}: {e}")
        return False
    return True
| # ============================================================================== | |
| # Guardian & Portfolio | |
| # ============================================================================== | |
async def get_guardian_stats_async(self) -> Dict[str, Any]:
    """Return the stored guardian stats with all four guard types present.

    When nothing is stored (or the stored value is empty) a fresh all-zero
    table is returned.  Otherwise any guard type missing from the stored
    data is added with zeroed counters.
    """
    stored = await self.get_file_json_async(GUARDIAN_STATS_KEY)
    baseline = {}
    for guard in ["hybrid", "crash", "giveback", "stagnation"]:
        baseline[guard] = {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0}
    if not stored:
        return baseline
    for guard in baseline:
        if guard not in stored:
            stored[guard] = baseline[guard]
    return stored
async def save_guardian_stats_async(self, stats):
    """Upload *stats* as JSON under the guardian-stats key."""
    destination = GUARDIAN_STATS_KEY
    await self.upload_json_async(stats, destination)
async def reset_guardian_stats_async(self):
    """Upload an all-zero guardian table covering the four guard types."""
    zeroed = {}
    for guard in ("hybrid", "crash", "giveback", "stagnation"):
        zeroed[guard] = {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0}
    await self.upload_json_async(zeroed, GUARDIAN_STATS_KEY)
async def append_governance_training_data(self, record):
    """Append *record* to the governance dataset, keeping the newest 5000.

    Best-effort: failures are printed and never raised.  The existing
    dataset is read through ``get_file_async``, which raises for every
    error except a missing object; if that read fails nothing is written,
    so a transient outage cannot replace the dataset with a single record.
    """
    try:
        raw = await self.get_file_async(GOVERNANCE_TRAINING_KEY)
        try:
            stored = json.loads(raw) if raw else None
        except ValueError:
            # Undecodable content: start a new dataset, as before.
            print(f"⚠️ [R2] Corrupt JSON in {GOVERNANCE_TRAINING_KEY}. Starting a new dataset.")
            stored = None
        dataset = stored if stored else []
        dataset.append(record)
        if len(dataset) > 5000:
            dataset = dataset[-5000:]
        await self.upload_json_async(dataset, GOVERNANCE_TRAINING_KEY)
    except Exception as e:
        # Report instead of silently discarding; a bare except also
        # swallowed task cancellation and KeyboardInterrupt.
        print(f"❌ [R2] Failed to append governance training data: {e}")
async def get_portfolio_state_async(self):
    """Return the persisted portfolio state, creating it only on first run.

    The object is read through ``get_file_async``, which returns ``None``
    only when the key does not exist and raises for every other error.
    The initial state is written when the object is missing, empty, or
    decodes to an empty value.

    Returns:
        The stored state, or the newly created initial state.

    Raises:
        Exception: Any read error other than a missing object, and
            ``ValueError`` when the stored content is not valid JSON.
            These are deliberately NOT replaced by the initial state,
            because that would overwrite the real balance with
            ``INITIAL_CAPITAL``.
    """
    # Not wrapped in try/except: a failed read must stop here rather than
    # fall through to the "create initial state" path below.
    raw = await self.get_file_async(PORTFOLIO_STATE_KEY)
    if raw:
        try:
            data = json.loads(raw)
        except ValueError as e:
            print(f"⚠️ [R2] Portfolio state is not valid JSON; refusing to reset it: {e}")
            raise
        if data:
            return data

    # Reaching this point means the object is absent or empty (first run).
    print("⚠️ [R2] Portfolio State not found. Creating INITIAL state...")
    initial = {
        "current_capital_usd": INITIAL_CAPITAL, "allocated_capital_usd": 0.0,
        "invested_capital_usd": 0.0, "initial_capital_usd": INITIAL_CAPITAL,
        "total_trades": 0, "winning_trades": 0, "losing_trades": 0,
        "total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_rate": 0.0,
        "daily_net_pnl": 0.0, "is_trading_halted": False,
        "last_update": datetime.now().isoformat()
    }
    await self.save_portfolio_state_async(initial)
    return initial
async def save_portfolio_state_async(self, state):
    """Stamp *state* with the current time and upload it.

    The ``'last_update'`` entry is set on the caller's dict in place before
    the upload.
    """
    timestamp = datetime.now().isoformat()
    state['last_update'] = timestamp
    await self.upload_json_async(state, PORTFOLIO_STATE_KEY)
async def get_open_trades_async(self):
    """Return the stored open trades, or an empty list when none are stored."""
    stored = await self.get_file_json_async(OPEN_TRADES_KEY)
    if stored:
        return stored
    return []
async def save_open_trades_async(self, trades):
    """Upload *trades* as JSON under the open-trades key."""
    destination = OPEN_TRADES_KEY
    await self.upload_json_async(trades, destination)
async def append_to_closed_trades_history(self, trade_data):
    """Append *trade_data* to the closed-trades history, keeping the newest 1000.

    Best-effort: failures are printed and never raised.  The existing
    history is read through ``get_file_async``, which raises for every
    error except a missing object; if that read fails nothing is written,
    so a transient outage cannot truncate the history to a single trade.
    """
    try:
        raw = await self.get_file_async(CLOSED_TRADES_KEY)
        try:
            stored = json.loads(raw) if raw else None
        except ValueError:
            # Undecodable content: start a new history, as before.
            print(f"⚠️ [R2] Corrupt JSON in {CLOSED_TRADES_KEY}. Starting a new history.")
            stored = None
        history = stored if stored else []
        history.append(trade_data)
        if len(history) > 1000:
            history = history[-1000:]
        await self.upload_json_async(history, CLOSED_TRADES_KEY)
    except Exception as e:
        print(f"❌ [R2] Failed to append trade history: {e}")
async def load_contracts_db_async(self):
    """Return the stored contracts database, or an empty dict when none is stored."""
    stored = await self.get_file_json_async(CONTRACTS_DB_KEY)
    if stored:
        return stored
    return {}
async def append_deep_steward_audit(self, record):
    """Append *record* to the DeepSteward audit log, keeping the newest 2000.

    Best-effort: failures are printed and never raised.  The existing log
    is read through ``get_file_async``, which raises for every error except
    a missing object; if that read fails nothing is written, so a transient
    outage cannot replace the log with a single entry.
    """
    try:
        raw = await self.get_file_async(DEEP_STEWARD_AUDIT_KEY)
        try:
            stored = json.loads(raw) if raw else None
        except ValueError:
            # Undecodable content: start a new log, as before.
            print(f"⚠️ [R2] Corrupt JSON in {DEEP_STEWARD_AUDIT_KEY}. Starting a new log.")
            stored = None
        history = stored if stored else []
        history.append(record)
        if len(history) > 2000:
            history = history[-2000:]
        await self.upload_json_async(history, DEEP_STEWARD_AUDIT_KEY)
    except Exception as e:
        # Report instead of silently discarding; a bare except also
        # swallowed task cancellation and KeyboardInterrupt.
        print(f"❌ [R2] Failed to append DeepSteward audit record: {e}")
async def load_strategy_dna_async(self):
    """Return the stored strategy DNA, or an empty dict when none is stored."""
    stored = await self.get_file_json_async(STRATEGIC_DNA_KEY)
    if stored:
        return stored
    return {}
async def save_strategy_dna_async(self, dna):
    """Upload *dna* as JSON under the strategy-DNA key."""
    destination = STRATEGIC_DNA_KEY
    await self.upload_json_async(dna, destination)