# Tradtesting / r2.py
# Riy777's picture
# Update r2.py
# c382613 verified
# raw
# history blame
# 22.1 kB
# r2.py (V13.0 - GEM-Architect: Full Code with DeepSteward Audit)
import os
import traceback
import json
import time
from datetime import datetime, timedelta
import asyncio
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
from typing import List, Dict, Any, Optional
# ==============================================================================
# ⚙️ التكوين والإعدادات
# ==============================================================================
# R2 credentials, read from the process environment (each is None when unset).
R2_ACCOUNT_ID, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY = (
    os.environ.get(env_name)
    for env_name in ("R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY")
)

# Bucket that holds every object key listed below.
BUCKET_NAME = "trading"

# Default starting balance (USD) used for a fresh or reset portfolio.
INITIAL_CAPITAL = 10.0

# Object keys inside the bucket: portfolio and trades.
PORTFOLIO_STATE_KEY = "portfolio_state.json"
OPEN_TRADES_KEY = "open_trades.json"
CLOSED_TRADES_KEY = "closed_trades_history.json"

# Object keys inside the bucket: candidates and contracts.
CANDIDATES_KEY = "Candidates.json"
CONTRACTS_DB_KEY = "contracts.json"

# Object keys inside the bucket: logs, prompts and learning data.
SYSTEM_LOGS_KEY = "system_logs.json"
LLM_PROMPTS_KEY = "llm_prompts.json"
LEARNING_DATA_KEY = "learning_data.json"
ANALYSIS_AUDIT_KEY = "analysis_audit_log.json"

# Object keys inside the bucket: whale-learning records and configuration.
WHALE_LEARNING_PENDING_KEY = "learning_whale_pending_records.json"
WHALE_LEARNING_COMPLETED_KEY = "learning_whale_completed_records.json"
WHALE_LEARNING_CONFIG_KEY = "learning_whale_optimal_config.json"
class R2Service:
    """JSON persistence layer over a Cloudflare R2 bucket (S3-compatible API).

    Every document is stored as one pretty-printed JSON object under a fixed
    key. The ``*_async`` methods are coroutines, but they call the synchronous
    boto3 client directly, so each call occupies the event loop until the
    request returns.

    NOTE: ``acquire_lock`` checks for the lock object and then creates it in
    two separate requests, so two processes can both see the lock as absent.
    """

    # Sentinel meaning "caller did not pass a default" (avoids a shared
    # mutable default argument).
    _NO_DEFAULT = object()
    _LOCK_KEY = "lock.txt"
    _DEEP_STEWARD_AUDIT_KEY = "DeepSteward_Audit_Log.json"

    def __init__(self):
        """Create the boto3 S3 client pointed at the account's R2 endpoint.

        Raises:
            RuntimeError: if the client cannot be constructed; the original
                exception is chained as ``__cause__``.
        """
        try:
            endpoint_url = f"https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com"
            self.s3_client = boto3.client(
                's3',
                endpoint_url=endpoint_url,
                aws_access_key_id=R2_ACCESS_KEY_ID,
                aws_secret_access_key=R2_SECRET_ACCESS_KEY,
            )
            self.lock_acquired = False
            self.BUCKET_NAME = BUCKET_NAME
            self._open_trades_warning_printed = False
            self._portfolio_warning_printed = False
            self._contracts_warning_printed = False
        except Exception as e:
            raise RuntimeError(f"Failed to initialize S3 client: {e}") from e

    # ==========================================================================
    # Internal helpers
    # ==========================================================================
    @staticmethod
    def _is_missing_key(error) -> bool:
        """Return True when a ClientError reports that the object does not exist."""
        return error.response.get('Error', {}).get('Code') == 'NoSuchKey'

    @staticmethod
    def _fresh_portfolio_state() -> Dict[str, Any]:
        """Build the zeroed portfolio document seeded with INITIAL_CAPITAL."""
        return {
            "current_capital_usd": INITIAL_CAPITAL,
            "invested_capital_usd": 0.0,
            "initial_capital_usd": INITIAL_CAPITAL,
            "total_trades": 0,
            "winning_trades": 0,
            "losing_trades": 0,
            "total_profit_usd": 0.0,
            "total_loss_usd": 0.0,
            "win_rate": 0.0,
            "last_update": datetime.now().isoformat()
        }

    def _read_json_or_default(self, key: str, default: Any) -> Any:
        """Fetch and decode ``key``; return ``default`` only if the object is missing.

        Any other ClientError (and any decoding error) propagates, so that
        read-modify-write callers never replace an existing document with a
        near-empty one after a transient failure.
        """
        try:
            response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=key)
        except ClientError as e:
            if self._is_missing_key(e):
                return default
            raise
        return json.loads(response['Body'].read())

    def _read_json_lenient(self, key: str, default: Any) -> Any:
        """Like ``_read_json_or_default`` but map every ClientError to ``default``.

        Used by plain readers whose contract is "return an empty value when
        storage cannot be read"; failures other than a missing key are logged.
        """
        try:
            return self._read_json_or_default(key, default)
        except ClientError as e:
            print(f"⚠️ [R2] Could not read {key}: {e}")
            return default

    def _put_json(self, key: str, data: Any, ensure_ascii: bool = False) -> None:
        """Serialize ``data`` (indent=2) and upload it under ``key``; errors propagate."""
        body = json.dumps(data, indent=2, ensure_ascii=ensure_ascii).encode('utf-8')
        self.s3_client.put_object(
            Bucket=self.BUCKET_NAME, Key=key, Body=body, ContentType="application/json"
        )

    def _append_capped(self, key: str, entry: Any, limit: int, *,
                       field: Optional[str] = None, ensure_ascii: bool = False) -> None:
        """Append ``entry`` to the list stored at ``key`` and keep the last ``limit`` items.

        With ``field`` set, the document is a dict and the list lives under
        that field; otherwise the document itself is the list.
        """
        empty_document = [] if field is None else {field: []}
        document = self._read_json_or_default(key, empty_document)
        entries = document if field is None else document[field]
        entries.append(entry)
        if len(entries) > limit:
            del entries[:-limit]  # trim in place so `document` sees the change
        self._put_json(key, document, ensure_ascii=ensure_ascii)

    # ==========================================================================
    # Full reset
    # ==========================================================================
    async def reset_all_stats_async(self):
        """Reset the portfolio, the closed-trades history and the system logs.

        Returns:
            True when all three documents were rewritten, False on any error
            (the error and its traceback are printed).
        """
        try:
            print("🔄 [R2 Reset] بدء عملية التصفير...")
            # 1. Restore the portfolio to its initial value.
            await self.save_portfolio_state_async(self._fresh_portfolio_state())
            # 2. Clear the closed-trades history.
            self._put_json(CLOSED_TRADES_KEY, [], ensure_ascii=True)
            # 3. Clear the system logs.
            self._put_json(SYSTEM_LOGS_KEY, {"logs": []}, ensure_ascii=True)
            print("✅ [R2 Reset] تم تصفير جميع البيانات والمحفظة بنجاح.")
            return True
        except Exception as e:
            print(f"❌ [R2 Reset Error] فشل التصفير: {e}")
            traceback.print_exc()
            return False

    # ==========================================================================
    # Lock management
    # ==========================================================================
    def acquire_lock(self, max_retries=3):
        """Try to create the lock object, retrying up to ``max_retries`` times.

        Each attempt issues ``head_object``; a '404' response means the lock
        is free and the lock object is then written. Any other outcome waits
        one second before the next attempt.

        Returns:
            True if the lock object was created, False after all attempts.
        """
        for attempt in range(max_retries):
            try:
                try:
                    self.s3_client.head_object(Bucket=self.BUCKET_NAME, Key=self._LOCK_KEY)
                    print(f"🔒 Lock file exists. Attempt {attempt + 1}/{max_retries}. Waiting...")
                    time.sleep(1)
                except ClientError as e:
                    if e.response['Error']['Code'] == '404':
                        self.s3_client.put_object(
                            Bucket=self.BUCKET_NAME, Key=self._LOCK_KEY, Body=b''
                        )
                        self.lock_acquired = True
                        print("✅ Lock acquired.")
                        return True
                    raise
            except Exception as e:
                print(f"❌ Failed to acquire lock: {e}")
                time.sleep(1)
        return False

    def release_lock(self):
        """Delete the lock object if this instance holds it; failures are printed."""
        if not self.lock_acquired:
            return
        try:
            self.s3_client.delete_object(Bucket=self.BUCKET_NAME, Key=self._LOCK_KEY)
            print("✅ Lock released.")
            self.lock_acquired = False
        except Exception as e:
            print(f"❌ Failed to release lock: {e}")

    # ==========================================================================
    # Candidates
    # ==========================================================================
    async def save_candidates_async(self, candidates):
        """Store ``candidates`` with a timestamp and count; failures are printed."""
        try:
            data = {
                "timestamp": datetime.now().isoformat(),
                "total_candidates": len(candidates),
                "candidates": candidates
            }
            self._put_json(CANDIDATES_KEY, data)
            print(f"✅ تم حفظ {len(candidates)} مرشح في ملف Candidates في R2")
        except Exception as e:
            print(f"❌ فشل حفظ المرشحين في R2: {e}")

    async def load_candidates_async(self):
        """Return the stored candidate list, or ``[]`` when the object is missing.

        Raises:
            ClientError: for any storage error other than a missing key.
        """
        return self._read_json_or_default(CANDIDATES_KEY, {}).get('candidates', [])

    # ==========================================================================
    # Logs and debugging records
    # ==========================================================================
    async def save_llm_prompts_async(self, symbol, prompt_type, prompt_content, analysis_data=None):
        """Append one prompt record to the prompt log (last 2000 kept).

        Best effort: failures are printed and never raised. A read failure
        other than "missing key" aborts the append instead of overwriting
        the existing log.
        """
        try:
            new_prompt = {
                "timestamp": datetime.now().isoformat(),
                "symbol": symbol,
                "prompt_type": prompt_type,
                "prompt_content": prompt_content,
                "analysis_data": analysis_data
            }
            self._append_capped(LLM_PROMPTS_KEY, new_prompt, 2000, field="prompts")
        except Exception as e:
            print(f"❌ [R2] Failed to save LLM prompt log: {e}")

    async def save_system_logs_async(self, log_data):
        """Append ``log_data`` (with a timestamp) to the system log (last 2000 kept).

        Keys in ``log_data`` override the generated ``timestamp``. Best
        effort: failures are printed and never raised.
        """
        try:
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                **log_data
            }
            self._append_capped(SYSTEM_LOGS_KEY, log_entry, 2000, field="logs")
        except Exception as e:
            print(f"❌ [R2] Failed to save system log: {e}")

    # ==========================================================================
    # DeepSteward post-exit analysis records
    # ==========================================================================
    async def append_deep_steward_audit(self, audit_record: Dict[str, Any]):
        """Append a post-exit analysis record to the DeepSteward audit log.

        Only the most recent 1000 records are kept so the file does not grow
        without bound. Failures are printed and never raised.
        """
        try:
            self._append_capped(self._DEEP_STEWARD_AUDIT_KEY, audit_record, 1000)
            print(f"📝 [R2 Audit] تم تسجيل نتيجة قرار DeepSteward لـ {audit_record.get('symbol')}")
        except Exception as e:
            print(f"❌ [R2 Error] فشل حفظ سجل التدقيق: {e}")

    # ==========================================================================
    # General learning data
    # ==========================================================================
    async def save_learning_data_async(self, learning_data):
        """Store ``learning_data`` wrapped with a timestamp; failures are printed."""
        try:
            data = {
                "timestamp": datetime.now().isoformat(),
                "learning_data": learning_data
            }
            self._put_json(LEARNING_DATA_KEY, data)
        except Exception as e:
            print(f"❌ [R2] Failed to save learning data: {e}")

    async def load_learning_data_async(self):
        """Return the stored learning document, or ``{}`` on any ClientError."""
        return self._read_json_lenient(LEARNING_DATA_KEY, {})

    # ==========================================================================
    # Portfolio accounting
    # ==========================================================================
    async def get_portfolio_state_async(self):
        """Load the portfolio state, creating and saving a fresh one if missing.

        Older documents are patched with the fields ``losing_trades``,
        ``total_loss_usd`` and ``initial_capital_usd`` when absent.

        Raises:
            ClientError: for any storage error other than a missing key.
        """
        try:
            response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=PORTFOLIO_STATE_KEY)
            state = json.loads(response['Body'].read())
            # Patch documents written before these fields existed.
            state.setdefault('losing_trades', 0)
            state.setdefault('total_loss_usd', 0.0)
            state.setdefault('initial_capital_usd', INITIAL_CAPITAL)
            # A successful load re-arms the "no portfolio" warning.
            self._portfolio_warning_printed = False
            print(f"💰 Portfolio loaded: Current Capital ${state.get('current_capital_usd', 0):.2f}")
            return state
        except ClientError as e:
            if not self._is_missing_key(e):
                raise
            # Test the flag's value, not its existence: __init__ always
            # creates it, so an existence check never fires the first time.
            if not getattr(self, '_portfolio_warning_printed', False):
                print(f"⚠️ No portfolio state found. Initializing with ${INITIAL_CAPITAL}")
                self._portfolio_warning_printed = True
            initial_state = self._fresh_portfolio_state()
            await self.save_portfolio_state_async(initial_state)
            return initial_state

    async def save_portfolio_state_async(self, state):
        """Stamp ``state['last_update']`` and store the portfolio document.

        Raises:
            Exception: any failure is printed and then re-raised.
        """
        try:
            state['last_update'] = datetime.now().isoformat()
            self._put_json(PORTFOLIO_STATE_KEY, state, ensure_ascii=True)
            print(f"💾 Portfolio saved: ${state.get('current_capital_usd', 0):.2f}")
        except Exception as e:
            print(f"❌ Failed to save portfolio state: {e}")
            raise

    # ==========================================================================
    # Trades (open trades and history)
    # ==========================================================================
    async def get_open_trades_async(self):
        """Return the stored open-trades list, or ``[]`` on any ClientError."""
        return self._read_json_lenient(OPEN_TRADES_KEY, [])

    async def save_open_trades_async(self, trades):
        """Store the open-trades list; failures are printed and not raised."""
        try:
            self._put_json(OPEN_TRADES_KEY, trades, ensure_ascii=True)
        except Exception as e:
            print(f"❌ Failed to save open trades: {e}")

    async def append_to_closed_trades_history(self, trade_data: Dict[str, Any]):
        """Append a closed trade to the history (last 1000 kept).

        Failures are printed and never raised. A read failure other than
        "missing key" aborts the append instead of overwriting the history.
        """
        try:
            self._append_capped(CLOSED_TRADES_KEY, trade_data, 1000, ensure_ascii=True)
            print(f"📜 History archived for {trade_data.get('symbol')}")
        except Exception as e:
            print(f"❌ Failed to archive trade: {e}")

    async def get_closed_trades_history(self):
        """Return the closed-trades history, or ``[]`` on any ClientError."""
        return self._read_json_lenient(CLOSED_TRADES_KEY, [])

    # ==========================================================================
    # Contracts database
    # ==========================================================================
    async def load_contracts_db_async(self):
        """Return the stored contracts document, or ``{}`` on any ClientError."""
        return self._read_json_lenient(CONTRACTS_DB_KEY, {})

    async def save_contracts_db_async(self, data):
        """Store the contracts document; failures are printed and not raised."""
        try:
            self._put_json(CONTRACTS_DB_KEY, data, ensure_ascii=True)
        except Exception as e:
            print(f"❌ [R2] Failed to save contracts DB: {e}")

    # ==========================================================================
    # Utilities
    # ==========================================================================
    async def get_trade_by_symbol_async(self, symbol):
        """Return the first open trade for ``symbol`` with status 'OPEN', else None.

        Entries lacking ``symbol`` or ``status`` are skipped rather than
        aborting the search. Any error yields None.
        """
        try:
            open_trades = await self.get_open_trades_async()
            for trade in open_trades:
                if trade.get('symbol') == symbol and trade.get('status') == 'OPEN':
                    return trade
            return None
        except Exception:
            return None

    async def update_trade_monitoring_status_async(self, symbol, is_monitored):
        """Set ``is_monitored`` on the first open trade matching ``symbol``.

        Returns:
            True when a matching trade was found and a save was attempted
            (save errors are logged by ``save_open_trades_async`` and are not
            reflected here); False when no trade matched or on any error.
        """
        try:
            open_trades = await self.get_open_trades_async()
            for trade in open_trades:
                if trade.get('symbol') == symbol:
                    trade['is_monitored'] = is_monitored
                    await self.save_open_trades_async(open_trades)
                    return True
            return False
        except Exception:
            return False

    async def get_monitored_trades_async(self):
        """Return the open trades whose ``is_monitored`` is truthy; ``[]`` on error."""
        try:
            open_trades = await self.get_open_trades_async()
            return [trade for trade in open_trades if trade.get('is_monitored', False)]
        except Exception:
            return []

    async def save_analysis_audit_log_async(self, audit_data):
        """Append ``audit_data`` to the analysis audit log (last 50 kept).

        Best effort: failures are printed and never raised.
        """
        try:
            self._append_capped(ANALYSIS_AUDIT_KEY, audit_data, 50)
        except Exception as e:
            print(f"❌ [R2] Failed to save analysis audit log: {e}")

    # ==========================================================================
    # Whale learning
    # ==========================================================================
    async def _load_json_file_from_r2(self, key: str, default: Any = _NO_DEFAULT) -> Any:
        """Load and decode ``key``; return ``default`` on any failure.

        When ``default`` is omitted a new empty list is returned on failure,
        so callers never share (or mutate) one list across calls.
        """
        if default is self._NO_DEFAULT:
            default = []
        try:
            response = self.s3_client.get_object(Bucket=self.BUCKET_NAME, Key=key)
            return json.loads(response['Body'].read())
        except Exception:
            return default

    async def _save_json_file_to_r2(self, key: str, data: Any):
        """Store ``data`` under ``key``; failures are printed and not raised."""
        try:
            self._put_json(key, data)
        except Exception as e:
            print(f"❌ [R2] Failed to save {key}: {e}")

    async def save_whale_learning_record_async(self, record: Dict[str, Any]):
        """Append ``record`` to the pending whale-learning records.

        Best effort: failures are printed and never raised. A read failure
        other than "missing key" aborts the append instead of replacing the
        pending file with a single record.
        """
        try:
            pending = self._read_json_or_default(WHALE_LEARNING_PENDING_KEY, [])
            pending.append(record)
            self._put_json(WHALE_LEARNING_PENDING_KEY, pending)
        except Exception as e:
            print(f"❌ [R2] Failed to save whale learning record: {e}")

    async def get_pending_whale_learning_records_async(self) -> List[Dict[str, Any]]:
        """Return the pending whale-learning records, or ``[]`` on any failure."""
        return await self._load_json_file_from_r2(WHALE_LEARNING_PENDING_KEY, default=[])

    async def update_completed_whale_learning_record_async(self, completed_record: Dict[str, Any]):
        """Move a record from the pending file to the completed file.

        The record is appended to the completed file (last 5000 kept) and
        only then removed from pending by ``record_id``, so a failed write of
        the completed file leaves the pending entry in place. Records without
        a ``record_id`` are ignored. Failures are printed and never raised.
        """
        try:
            record_id = completed_record.get("record_id")
            if not record_id:
                return
            self._append_capped(WHALE_LEARNING_COMPLETED_KEY, completed_record, 5000)
            pending = self._read_json_or_default(WHALE_LEARNING_PENDING_KEY, [])
            remaining = [r for r in pending if r.get("record_id") != record_id]
            self._put_json(WHALE_LEARNING_PENDING_KEY, remaining)
        except Exception as e:
            print(f"❌ [R2] Failed to update completed whale learning record: {e}")

    async def get_all_completed_whale_records_async(self) -> List[Dict[str, Any]]:
        """Return the completed whale-learning records, or ``[]`` on any failure."""
        return await self._load_json_file_from_r2(WHALE_LEARNING_COMPLETED_KEY, default=[])

    async def save_whale_learning_config_async(self, config: Dict[str, Any]):
        """Store the whale-learning configuration; failures are printed."""
        await self._save_json_file_to_r2(WHALE_LEARNING_CONFIG_KEY, config)

    async def load_whale_learning_config_async(self) -> Dict[str, Any]:
        """Return the whale-learning configuration, or ``{}`` on any failure."""
        return await self._load_json_file_from_r2(WHALE_LEARNING_CONFIG_KEY, default={})
# Module-level statement: prints a banner each time this module's top-level code runs.
print("✅ Full R2 Service Loaded - Accounting, History, Reset & Audit Ready")