Trad / r2.py
Riy777's picture
Update r2.py
6c8c847 verified
# ==============================================================================
# ☁️ r2.py (V41.2 - GEM-Architect: Anti-Reset Safety)
# ==============================================================================
# - Fix: Prevents "Reset to Initial" if read fails due to network/lock errors.
# - Fix: Improved error handling in JSON loaders.
# ==============================================================================
import os
import json
import asyncio
import boto3
from datetime import datetime
from botocore.exceptions import ClientError
from typing import Dict, Any, Optional
# ==============================================================================
# ⚙️ التكوين والإعدادات
# ==============================================================================
R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")
BUCKET_NAME = "trading"
# 🟢 وضع الإنتاج
FILE_PREFIX = ""
INITIAL_CAPITAL = 10.0
# ==============================================================================
# 📁 مفاتيح الملفات
# ==============================================================================
def get_key(filename):
if FILE_PREFIX and "/" in filename:
folder, name = filename.rsplit("/", 1)
return f"{folder}/{FILE_PREFIX}{name}"
return f"{FILE_PREFIX}{filename}"
PORTFOLIO_STATE_KEY = get_key("portfolio_state.json")
SMART_PORTFOLIO_STATE_KEY = get_key("smart_portfolio_state.json")
OPEN_TRADES_KEY = get_key("open_trades.json")
CLOSED_TRADES_KEY = get_key("closed_trades_history.json")
CONTRACTS_DB_KEY = get_key("contracts.json")
WHALE_LEARNING_PENDING_KEY = get_key("learning_whale_pending_records.json")
WHALE_LEARNING_COMPLETED_KEY = get_key("learning_whale_completed_records.json")
WHALE_LEARNING_CONFIG_KEY = get_key("learning_whale_optimal_config.json")
GOVERNANCE_TRAINING_KEY = get_key("datasets/governance_training_data.json")
STRATEGIC_DNA_KEY = get_key("learning/strategic_dna_v2.json")
DEEP_STEWARD_AUDIT_KEY = get_key("DeepSteward_Audit_Log.json")
DIAGNOSTIC_STATS_KEY = get_key("analytics/model_diagnostic_matrix.json")
GUARDIAN_STATS_KEY = get_key("analytics/guardian_performance_stats.json")
class R2Service:
def __init__(self):
try:
endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
self.s3_client = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=R2_ACCESS_KEY_ID,
aws_secret_access_key=R2_SECRET_ACCESS_KEY,
)
self.BUCKET_NAME = BUCKET_NAME
print(f"✅ [R2 V41.2] Service Loaded (PRODUCTION MODE).")
except Exception as e:
raise RuntimeError(f"Failed to initialize S3 client: {e}")
# ==============================================================================
# 🔗 ADAPTERS
# ==============================================================================
async def load_active_trades_async(self) -> Dict[str, Any]:
data = await self.get_open_trades_async()
if not data: return {}
if isinstance(data, list):
converted_dict = {}
for t in data:
if isinstance(t, dict) and 'symbol' in t:
converted_dict[t['symbol']] = t
return converted_dict
if isinstance(data, dict):
return data
return {}
async def save_active_trades_async(self, trades_dict: Dict[str, Any]):
await self.save_open_trades_async(trades_dict)
async def save_trade_history_async(self, trade_record: Dict[str, Any]):
await self.append_to_closed_trades_history(trade_record)
# ==============================================================================
# 🔴 دالة التصفير
# ==============================================================================
async def reset_all_stats_async(self):
try:
print(f"🔄 [R2 Production Reset] جاري مسح البيانات...")
initial_state = {
"current_capital_usd": INITIAL_CAPITAL,
"invested_capital_usd": 0.0,
"allocated_capital_usd": 0.0,
"initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0, "winning_trades": 0, "losing_trades": 0,
"total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_rate": 0.0,
"daily_net_pnl": 0.0, "is_trading_halted": False,
"last_update": datetime.now().isoformat()
}
await self.save_portfolio_state_async(initial_state)
smart_state = {
"current_capital": INITIAL_CAPITAL,
"allocated_capital_usd": 0.0,
"session_start_balance": INITIAL_CAPITAL,
"daily_net_pnl": 0.0,
"is_trading_halted": False,
"halt_reason": None,
"last_session_reset": datetime.now().isoformat()
}
await self.upload_json_async(smart_state, SMART_PORTFOLIO_STATE_KEY)
empty_list_json = json.dumps([], indent=2).encode('utf-8')
empty_dict_json = json.dumps({}, indent=2).encode('utf-8')
def _reset_sync():
self.s3_client.put_object(Bucket=BUCKET_NAME, Key=CLOSED_TRADES_KEY, Body=empty_list_json, ContentType="application/json")
self.s3_client.put_object(Bucket=BUCKET_NAME, Key=OPEN_TRADES_KEY, Body=empty_dict_json, ContentType="application/json")
await asyncio.to_thread(_reset_sync)
await self.reset_diagnostic_stats_async()
await self.reset_guardian_stats_async()
print(f"✅ [R2 Reset] تم تصفير النظام بنجاح.")
return True
except Exception as e:
print(f"❌ [R2 Reset Error] فشل التصفير: {e}")
return False
# ==============================================================================
# 📊 Diagnostic Stats
# ==============================================================================
async def get_diagnostic_stats_async(self) -> Dict[str, Any]:
data = await self.get_file_json_async(DIAGNOSTIC_STATS_KEY)
target_keys = ["Pattern", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"]
defaults = {k: {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0} for k in target_keys}
if not data: return defaults
for k, v in defaults.items():
if k not in data: data[k] = v
return data
async def update_diagnostic_stats_async(self, updates: Dict[str, Dict[str, float]]):
try:
current = await self.get_diagnostic_stats_async()
for model, metrics in updates.items():
if model not in current:
current[model] = {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0}
current[model]['wins'] += metrics.get('wins', 0)
current[model]['losses'] += metrics.get('losses', 0)
current[model]['pnl'] += metrics.get('pnl', 0.0)
current[model]['profit_accum'] += metrics.get('profit_accum', 0.0)
current[model]['loss_accum'] += metrics.get('loss_accum', 0.0)
await self.upload_json_async(current, DIAGNOSTIC_STATS_KEY)
except Exception as e: print(f"❌ [R2] Failed to update diagnostics: {e}")
async def reset_diagnostic_stats_async(self):
target_keys = ["Pattern", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"]
empty = {k: {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0} for k in target_keys}
await self.upload_json_async(empty, DIAGNOSTIC_STATS_KEY)
# ==============================================================================
# Helpers
# ==============================================================================
async def upload_json_async(self, data: Any, key: str):
try:
json_bytes = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
await asyncio.to_thread(self.s3_client.put_object, Bucket=self.BUCKET_NAME, Key=key, Body=json_bytes, ContentType="application/json")
except Exception as e:
print(f"❌ [R2] Upload JSON failed for {key}: {e}")
raise
async def get_file_json_async(self, key: str) -> Optional[Any]:
try:
data = await self.get_file_async(key)
if data:
return json.loads(data)
return None
except json.JSONDecodeError:
print(f"⚠️ [R2] Corrupt JSON in {key}. Returning None.")
return None
except Exception as e:
# Don't silence connection errors, print them so we know why it failed
print(f"⚠️ [R2] Error reading JSON from {key}: {e}")
return None
async def get_file_async(self, key: str) -> Optional[bytes]:
try:
response = await asyncio.to_thread(self.s3_client.get_object, Bucket=self.BUCKET_NAME, Key=key)
return response['Body'].read()
except ClientError as e:
# Only return None if Key genuinely doesn't exist.
if e.response['Error']['Code'] == 'NoSuchKey':
return None
print(f"❌ [R2] ClientError reading {key}: {e}")
raise # Raise other errors to prevent accidental overwrites/resets
except Exception as e:
print(f"❌ [R2] General Error reading {key}: {e}")
raise
async def upload_file_async(self, file_obj, key: str):
try:
await asyncio.to_thread(self.s3_client.upload_fileobj, file_obj, self.BUCKET_NAME, key)
except Exception as e: print(f"❌ [R2] File upload failed for {key}: {e}")
# ==============================================================================
# Guardian & Portfolio
# ==============================================================================
async def get_guardian_stats_async(self) -> Dict[str, Any]:
data = await self.get_file_json_async(GUARDIAN_STATS_KEY)
defaults = {k: {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0} for k in ["hybrid", "crash", "giveback", "stagnation"]}
if not data: return defaults
for k, v in defaults.items():
if k not in data:
data[k] = v
return data
async def save_guardian_stats_async(self, stats): await self.upload_json_async(stats, GUARDIAN_STATS_KEY)
async def reset_guardian_stats_async(self):
defaults = {k: {"total": 0, "good": 0, "saved": 0.0, "missed": 0.0} for k in ["hybrid", "crash", "giveback", "stagnation"]}
await self.upload_json_async(defaults, GUARDIAN_STATS_KEY)
async def append_governance_training_data(self, record):
try:
data = await self.get_file_json_async(GOVERNANCE_TRAINING_KEY)
dataset = data if data else []
dataset.append(record)
if len(dataset) > 5000: dataset = dataset[-5000:]
await self.upload_json_async(dataset, GOVERNANCE_TRAINING_KEY)
except: pass
async def get_portfolio_state_async(self):
# 🔥 GEM-FIX: Don't assume None means "Initialize".
# Only initialize if file is GENUINELY missing (caught inside get_file_async).
# If get_file_json_async returns None due to error, we might be overwriting good data.
try:
data = await self.get_file_json_async(PORTFOLIO_STATE_KEY)
if data: return data
except Exception as e:
print(f"⚠️ [R2] Failed to load portfolio state (Network/Lock): {e}")
# If we fail to read, DO NOT return initial state, because that resets the wallet.
# Instead, return a temporary safe state or re-raise to stop the cycle.
# Returning None here might be safer than resetting.
# For now, we will proceed to check if it's a NoSuchKey inside get_file_async
pass
# If we are here, it means the file likely doesn't exist (First Run)
print("⚠️ [R2] Portfolio State not found. Creating INITIAL state...")
initial = {
"current_capital_usd": INITIAL_CAPITAL, "allocated_capital_usd": 0.0,
"invested_capital_usd": 0.0, "initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0, "winning_trades": 0, "losing_trades": 0,
"total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_rate": 0.0,
"daily_net_pnl": 0.0, "is_trading_halted": False,
"last_update": datetime.now().isoformat()
}
await self.save_portfolio_state_async(initial)
return initial
async def save_portfolio_state_async(self, state):
state['last_update'] = datetime.now().isoformat()
await self.upload_json_async(state, PORTFOLIO_STATE_KEY)
async def get_open_trades_async(self):
data = await self.get_file_json_async(OPEN_TRADES_KEY)
return data if data else []
async def save_open_trades_async(self, trades):
await self.upload_json_async(trades, OPEN_TRADES_KEY)
async def append_to_closed_trades_history(self, trade_data):
try:
# 🔥 GEM-FIX: Use specific try-except to avoid silent failure
data = await self.get_file_json_async(CLOSED_TRADES_KEY)
history = data if data else []
history.append(trade_data)
if len(history) > 1000: history = history[-1000:]
await self.upload_json_async(history, CLOSED_TRADES_KEY)
except Exception as e:
print(f"❌ [R2] Failed to append trade history: {e}")
async def load_contracts_db_async(self):
data = await self.get_file_json_async(CONTRACTS_DB_KEY)
return data if data else {}
async def append_deep_steward_audit(self, record):
try:
data = await self.get_file_json_async(DEEP_STEWARD_AUDIT_KEY)
history = data if data else []
history.append(record)
if len(history) > 2000: history = history[-2000:]
await self.upload_json_async(history, DEEP_STEWARD_AUDIT_KEY)
except: pass
async def load_strategy_dna_async(self):
data = await self.get_file_json_async(STRATEGIC_DNA_KEY)
return data if data else {}
async def save_strategy_dna_async(self, dna): await self.upload_json_async(dna, STRATEGIC_DNA_KEY)