Trad / r2.py
Riy777's picture
Update r2.py
50aef5d verified
raw
history blame
18 kB
# ==============================================================================
# ☁️ r2.py (V36.0 - GEM-Architect: Full Cybernetic + Whale Legacy Support)
# ==============================================================================
# التحديثات:
# 1. الحفاظ على كافة وظائف V14.1 الأصلية (Whale Learning, Candidates, Contracts).
# 2. إضافة دعم ملفات "الحمض النووي" (Strategic DNA) للنظام الجديد.
# 3. تحديث مسارات التدقيق (Audit) لتتوافق مع كلا النظامين.
# ==============================================================================
import os
import traceback
import json
import time
import io
from datetime import datetime, timedelta
import asyncio
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
from typing import List, Dict, Any, Optional, Union
# ==============================================================================
# ⚙️ التكوين والإعدادات
# ==============================================================================
R2_ACCOUNT_ID = os.getenv("R2_ACCOUNT_ID")
R2_ACCESS_KEY_ID = os.getenv("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.getenv("R2_SECRET_ACCESS_KEY")
BUCKET_NAME = "trading"
# 🔴 الرصيد الافتراضي
INITIAL_CAPITAL = 10.0
# 📁 مفاتيح الملفات الأساسية في R2 (القديمة + الجديدة)
WHALE_LEARNING_PENDING_KEY = "learning_whale_pending_records.json"
WHALE_LEARNING_COMPLETED_KEY = "learning_whale_completed_records.json"
WHALE_LEARNING_CONFIG_KEY = "learning_whale_optimal_config.json"
PORTFOLIO_STATE_KEY = "portfolio_state.json"
# ✅ المفتاح الجديد لحالة المحفظة الذكية
SMART_PORTFOLIO_STATE_KEY = "smart_portfolio_state.json"
OPEN_TRADES_KEY = "open_trades.json"
CLOSED_TRADES_KEY = "closed_trades_history.json"
CANDIDATES_KEY = "Candidates.json"
CONTRACTS_DB_KEY = "contracts.json"
SYSTEM_LOGS_KEY = "system_logs.json"
LEARNING_DATA_KEY = "learning_data.json"
ANALYSIS_AUDIT_KEY = "analysis_audit_log.json"
DEEP_STEWARD_AUDIT_KEY = "DeepSteward_Audit_Log.json" # ✅ مفتاح التدقيق الموحد
# 🧬 مفاتيح النظام السيبراني الجديد (New Cybernetic Keys)
STRATEGIC_DNA_KEY = "learning/strategic_dna_v2.json"
# 🆕 مفاتيح التدريب (Training Keys - Legacy Support)
TRAINING_PENDING_BATCH_KEY = "datasets/pending_training_batch.json"
MODEL_TITAN_KEY = "ml_models/layer2/Titan_XGB_V1.json"
class R2Service:
def __init__(self):
try:
endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
self.s3_client = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=R2_ACCESS_KEY_ID,
aws_secret_access_key=R2_SECRET_ACCESS_KEY,
)
self.lock_acquired = False
self.BUCKET_NAME = BUCKET_NAME
except Exception as e:
raise RuntimeError(f"Failed to initialize S3 client: {e}")
# ==============================================================================
# 🔴 [هام جداً] دالة التصفير الشامل (Updated for Smart Portfolio)
# ==============================================================================
async def reset_all_stats_async(self):
"""تصفير المحفظة والسجلات التاريخية والعودة لنقطة الصفر"""
try:
print("🔄 [R2 Reset] بدء عملية التصفير...")
# 1. إعداد الحالة الأولية المتوافقة مع SmartPortfolio V1.2/V36.0
initial_state = {
"current_capital_usd": INITIAL_CAPITAL,
"invested_capital_usd": 0.0, # حقل قديم (للتوافق)
"allocated_capital_usd": 0.0, # ✅ حقل جديد (للمحفظة الذكية)
"initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0,
"winning_trades": 0,
"losing_trades": 0,
"total_profit_usd": 0.0,
"total_loss_usd": 0.0,
"win_rate": 0.0,
"daily_net_pnl": 0.0, # ✅ حقل جديد
"is_trading_halted": False, # ✅ حقل جديد
"last_update": datetime.now().isoformat()
}
# حفظ الحالة في الملف الرئيسي
await self.save_portfolio_state_async(initial_state)
# ✅ تصفير ملف المحفظة الذكية الخاص أيضاً
smart_state = {
"current_capital": INITIAL_CAPITAL,
"allocated_capital_usd": 0.0,
"session_start_balance": INITIAL_CAPITAL,
"daily_net_pnl": 0.0,
"is_trading_halted": False,
"halt_reason": None,
"last_session_reset": datetime.now().isoformat()
}
await self.upload_json_async(smart_state, SMART_PORTFOLIO_STATE_KEY)
# تصفير السجلات الأخرى
empty_list_json = json.dumps([], indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=CLOSED_TRADES_KEY, Body=empty_list_json, ContentType="application/json"
)
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=OPEN_TRADES_KEY, Body=empty_list_json, ContentType="application/json"
)
empty_logs_json = json.dumps({"logs": []}, indent=2).encode('utf-8')
self.s3_client.put_object(
Bucket=BUCKET_NAME, Key=SYSTEM_LOGS_KEY, Body=empty_logs_json, ContentType="application/json"
)
print("✅ [R2 Reset] تم تصفير جميع البيانات والمحفظة (بما في ذلك SmartPortfolio) بنجاح.")
return True
except Exception as e:
print(f"❌ [R2 Reset Error] فشل التصفير: {e}")
traceback.print_exc()
return False
# ==============================================================================
# 🆕 دوال دعم التدريب والملفات العامة (General File Support)
# ==============================================================================
async def upload_json_async(self, data: Any, key: str):
"""رفع أي بيانات JSON إلى مسار محدد"""
try:
json_bytes = json.dumps(data, indent=2, ensure_ascii=False).encode('utf-8')
await asyncio.to_thread(
self.s3_client.put_object,
Bucket=self.BUCKET_NAME,
Key=key,
Body=json_bytes,
ContentType="application/json"
)
except Exception as e:
print(f"❌ [R2] Upload JSON failed for {key}: {e}")
raise
async def get_file_async(self, key: str) -> Optional[bytes]:
"""جلب ملف خام (Raw Bytes)"""
try:
response = await asyncio.to_thread(
self.s3_client.get_object,
Bucket=self.BUCKET_NAME,
Key=key
)
return response['Body'].read()
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
return None
raise
async def upload_file_async(self, file_obj, key: str):
"""رفع ملف (مثل نماذج .json)"""
try:
await asyncio.to_thread(
self.s3_client.upload_fileobj,
file_obj,
self.BUCKET_NAME,
key
)
except Exception as e:
print(f"❌ [R2] File upload failed for {key}: {e}")
raise
# ==============================================================================
# 🧬 إدارة الاستراتيجية والتعلم (NEW: Strategic DNA)
# ==============================================================================
async def load_strategy_dna_async(self) -> Dict[str, Any]:
"""تحميل ملف الحمض النووي للاستراتيجيات (Cybernetic DNA)"""
try:
data = await self.get_file_async(STRATEGIC_DNA_KEY)
return json.loads(data) if data else {}
except: return {}
async def save_strategy_dna_async(self, dna_data: Dict[str, Any]):
"""حفظ تحديثات الاستراتيجية"""
await self.upload_json_async(dna_data, STRATEGIC_DNA_KEY)
# ==============================================================================
# 🔒 إدارة القفل (Lock Mechanism)
# ==============================================================================
def acquire_lock(self, max_retries=3):
lock_path = "lock.txt"
for attempt in range(max_retries):
try:
try:
self.s3_client.head_object(Bucket=BUCKET_NAME, Key=lock_path)
time.sleep(1)
except ClientError as e:
if e.response['Error']['Code'] == '404':
self.s3_client.put_object(Bucket=BUCKET_NAME, Key=lock_path, Body=b'')
self.lock_acquired = True
return True
else: raise
except Exception: time.sleep(1)
return False
def release_lock(self):
if self.lock_acquired:
try:
self.s3_client.delete_object(Bucket=BUCKET_NAME, Key="lock.txt")
self.lock_acquired = False
except Exception: pass
# ==============================================================================
# 📊 إدارة المرشحين (Candidates)
# ==============================================================================
async def save_candidates_async(self, candidates):
try:
data = {"timestamp": datetime.now().isoformat(), "total_candidates": len(candidates), "candidates": candidates}
await self.upload_json_async(data, CANDIDATES_KEY)
print(f"✅ تم حفظ {len(candidates)} مرشح في R2")
except Exception: pass
async def load_candidates_async(self):
try:
data_bytes = await self.get_file_async(CANDIDATES_KEY)
if data_bytes: return json.loads(data_bytes).get('candidates', [])
return []
except Exception: return []
# ==============================================================================
# 📝 السجلات والتعلم (Logs & Audit)
# ==============================================================================
async def save_system_logs_async(self, log_data):
"""حفظ سجلات النظام (متروك للتوافق)"""
pass
async def append_deep_steward_audit(self, audit_record: Dict[str, Any]):
"""تسجيل نتائج التدقيق للتعلم (Deep Steward Audit Log)"""
try:
data = await self.get_file_async(DEEP_STEWARD_AUDIT_KEY)
history = json.loads(data) if data else []
history.append(audit_record)
if len(history) > 2000: history = history[-2000:] # زيادة السعة لـ 2000 سجل
await self.upload_json_async(history, DEEP_STEWARD_AUDIT_KEY)
print(f"📝 [R2 Audit] تم تسجيل التدقيق لـ {audit_record.get('symbol')}")
except Exception as e:
print(f"❌ [R2 Error] فشل حفظ التدقيق: {e}")
# ==============================================================================
# 💰 إدارة المحفظة (Accounting - Updated)
# ==============================================================================
async def get_portfolio_state_async(self):
try:
data_bytes = await self.get_file_async(PORTFOLIO_STATE_KEY)
if data_bytes:
state = json.loads(data_bytes)
# ✅ تصحيح وتحديث البيانات القديمة لتشمل حقول SmartPortfolio
if 'losing_trades' not in state: state['losing_trades'] = 0
if 'initial_capital_usd' not in state: state['initial_capital_usd'] = INITIAL_CAPITAL
if 'allocated_capital_usd' not in state: state['allocated_capital_usd'] = 0.0
if 'daily_net_pnl' not in state: state['daily_net_pnl'] = 0.0
return state
# تهيئة افتراضية متكاملة
initial_state = {
"current_capital_usd": INITIAL_CAPITAL,
"allocated_capital_usd": 0.0, # ✅ جديد
"invested_capital_usd": 0.0, # Legacy support
"initial_capital_usd": INITIAL_CAPITAL,
"total_trades": 0, "winning_trades": 0, "losing_trades": 0,
"total_profit_usd": 0.0, "total_loss_usd": 0.0, "win_rate": 0.0,
"daily_net_pnl": 0.0, # ✅ جديد
"is_trading_halted": False,
"last_update": datetime.now().isoformat()
}
await self.save_portfolio_state_async(initial_state)
return initial_state
except Exception: raise
async def save_portfolio_state_async(self, state):
state['last_update'] = datetime.now().isoformat()
await self.upload_json_async(state, PORTFOLIO_STATE_KEY)
# ==============================================================================
# 📈 إدارة الصفقات (Trades)
# ==============================================================================
async def get_open_trades_async(self):
try:
data = await self.get_file_async(OPEN_TRADES_KEY)
return json.loads(data) if data else []
except: return []
async def save_open_trades_async(self, trades):
await self.upload_json_async(trades, OPEN_TRADES_KEY)
async def append_to_closed_trades_history(self, trade_data: Dict[str, Any]):
try:
data = await self.get_file_async(CLOSED_TRADES_KEY)
history = json.loads(data) if data else []
history.append(trade_data)
if len(history) > 1000: history = history[-1000:]
await self.upload_json_async(history, CLOSED_TRADES_KEY)
except Exception: pass
# ==============================================================================
# 📜 قاعدة العقود (Contracts DB)
# ==============================================================================
async def load_contracts_db_async(self):
try:
data = await self.get_file_async(CONTRACTS_DB_KEY)
return json.loads(data) if data else {}
except: return {}
async def save_contracts_db_async(self, data):
await self.upload_json_async(data, CONTRACTS_DB_KEY)
# ==============================================================================
# 🐋 تعلم الحيتان (Whale Learning - FULL LEGACY RESTORED)
# ==============================================================================
async def save_whale_learning_record_async(self, record: Dict[str, Any]):
"""حفظ سجل تعلم حيتان معلق"""
try:
data = await self.get_file_async(WHALE_LEARNING_PENDING_KEY)
pending = json.loads(data) if data else []
pending.append(record)
await self.upload_json_async(pending, WHALE_LEARNING_PENDING_KEY)
except: pass
async def get_pending_whale_learning_records_async(self) -> List[Dict[str, Any]]:
"""جلب السجلات المعلقة للتحليل"""
data = await self.get_file_async(WHALE_LEARNING_PENDING_KEY)
return json.loads(data) if data else []
async def update_completed_whale_learning_record_async(self, completed_record: Dict[str, Any]):
"""نقل السجل من المعلق إلى المكتمل"""
try:
rec_id = completed_record.get("record_id")
if not rec_id: return
# 1. إضافة للمكتمل
comp_data = await self.get_file_async(WHALE_LEARNING_COMPLETED_KEY)
completed = json.loads(comp_data) if comp_data else []
completed.append(completed_record)
if len(completed) > 5000: completed = completed[-5000:]
await self.upload_json_async(completed, WHALE_LEARNING_COMPLETED_KEY)
# 2. حذف من المعلق
pend_data = await self.get_file_async(WHALE_LEARNING_PENDING_KEY)
pending = json.loads(pend_data) if pend_data else []
updated_pending = [r for r in pending if r.get("record_id") != rec_id]
await self.upload_json_async(updated_pending, WHALE_LEARNING_PENDING_KEY)
except: pass
async def get_all_completed_whale_records_async(self) -> List[Dict[str, Any]]:
"""جلب كافة سجلات الحيتان المكتملة"""
data = await self.get_file_async(WHALE_LEARNING_COMPLETED_KEY)
return json.loads(data) if data else []
async def save_whale_learning_config_async(self, config: Dict[str, Any]):
"""حفظ إعدادات الحيتان المحسنة"""
await self.upload_json_async(config, WHALE_LEARNING_CONFIG_KEY)
async def load_whale_learning_config_async(self) -> Dict[str, Any]:
"""تحميل إعدادات الحيتان المحسنة"""
data = await self.get_file_async(WHALE_LEARNING_CONFIG_KEY)
return json.loads(data) if data else {}
print("✅ [R2 V36.0] Service Loaded (Full Cybernetic + Legacy Support).")