Tradtesting / LLM.py
Riy777's picture
Update LLM.py
e2a2b99
# LLM.py (V14.3 - Independent Analyst & Raw Data Verifier)
import os
import traceback
import json
import re
import time
from datetime import datetime
from typing import Dict, Any, Optional
from openai import AsyncOpenAI
# ==============================================================================
# 🔌 إعدادات الاتصال بالنموذج (Model Connection Settings)
# ==============================================================================
LLM_API_URL = os.getenv("LLM_API_URL", "https://integrate.api.nvidia.com/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_MODEL = os.getenv("LLM_MODEL", "nvidia/llama-3.1-nemotron-ultra-253b-v1")
# بارامترات التوليد (مضبوطة للتفكير العميق والتحليل الدقيق)
LLM_TEMPERATURE = 0.2 # حرارة منخفضة للدقة والمنطق
LLM_TOP_P = 0.7
LLM_MAX_TOKENS = 16384 # الحد الأقصى للسماح بتحليل كميات ضخمة من البيانات
LLM_FREQUENCY_PENALTY = 0.0
LLM_PRESENCE_PENALTY = 0.0
CLIENT_TIMEOUT = 300.0
class LLMService:
def __init__(self):
if not LLM_API_KEY:
raise ValueError("❌ [LLM FATAL] LLM_API_KEY environment variable is missing!")
self.client = AsyncOpenAI(
base_url=LLM_API_URL,
api_key=LLM_API_KEY,
timeout=CLIENT_TIMEOUT
)
self.r2_service = None
self.learning_hub = None
print(f"🧠 [LLM V14.3] Independent Analyst (Reasoning Mode) Online. Model: {LLM_MODEL}")
async def _call_llm(self, prompt: str) -> Optional[str]:
"""تنفيذ استدعاء API للنموذج مع تفعيل وضع التفكير."""
# [ 🔥 مفتاح التفعيل ]: هذه العبارة تجبر النموذج على الدخول في وضع التفكير المتسلسل
system_prompt_trigger = "detailed thinking on"
try:
response = await self.client.chat.completions.create(
model=LLM_MODEL,
messages=[
{"role": "system", "content": system_prompt_trigger},
{"role": "user", "content": prompt}
],
temperature=LLM_TEMPERATURE,
top_p=LLM_TOP_P,
max_tokens=LLM_MAX_TOKENS,
frequency_penalty=LLM_FREQUENCY_PENALTY,
presence_penalty=LLM_PRESENCE_PENALTY,
stream=False,
response_format={"type": "json_object"}
)
# حماية ضد الاستجابات الفارغة
if response and response.choices and len(response.choices) > 0:
message = response.choices[0].message
if message and message.content:
return message.content
print("⚠️ [LLM Warning] Received empty response from model.")
return None
except Exception as e:
print(f"❌ [LLM Call Error] API request failed: {e}")
return None
def _parse_json_secure(self, text: str) -> Optional[Dict]:
"""محلل JSON قوي يستخرج الكائن من بين نصوص التفكير."""
try:
if not text: return None
# البحث عن أول كائن JSON صالح يبدأ بـ { وينتهي بـ }
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if json_match:
return json.loads(json_match.group(0))
else:
print("⚠️ [LLM Parser] No JSON object found in response text.")
return None
except json.JSONDecodeError as e:
print(f"⚠️ [LLM Parser] JSON decode failed: {e}")
return None
except Exception as e:
print(f"❌ [LLM Parser] Unexpected error: {e}")
return None
# ==================================================================
# 🧠 الوظيفة الرئيسية 1: قرار الدخول الاستراتيجي (Layer 3 Analysis)
# ==================================================================
async def get_trading_decision(self, candidate_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
symbol = candidate_data.get('symbol', 'UNKNOWN_ASSET')
try:
# جلب سياق التعلم السابق
learning_context = "Playbook: No specific prior learning records found."
if self.learning_hub:
learning_context = await self.learning_hub.get_active_context_for_llm("general", f"{symbol} entry analysis")
# إنشاء البرومبت التحليلي (البيانات الخام)
prompt = self._create_raw_data_analyst_prompt(candidate_data, learning_context)
# استدعاء النموذج
response_text = await self._call_llm(prompt)
if not response_text: return None
# تحليل النتيجة
decision = self._parse_json_secure(response_text)
# أرشفة القرار
if self.r2_service and decision:
await self.r2_service.save_llm_prompts_async(symbol, "raw_analyst_decision", prompt, response_text)
return decision
except Exception as e:
print(f"❌ [LLM Entry Error] Critical failure for {symbol}: {e}")
traceback.print_exc()
return None
# ==================================================================
# 🔄 الوظيفة الرئيسية 2: إعادة التحليل الدوري (Strategic Re-eval)
# ==================================================================
async def re_analyze_trade_async(self, trade_data: Dict[str, Any], current_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
symbol = trade_data.get('symbol', 'UNKNOWN_ASSET')
try:
# جلب سياق الاستراتيجية
learning_context = "Playbook: Stick to original trading plan unless invalidated."
if self.learning_hub:
learning_context = await self.learning_hub.get_active_context_for_llm("strategy", f"{symbol} re-eval")
# إنشاء برومبت إعادة التحليل (البيانات الخام)
prompt = self._create_raw_reanalysis_prompt(trade_data, current_data, learning_context)
# استدعاء النموذج
response_text = await self._call_llm(prompt)
if not response_text: return None
# تحليل النتيجة
decision = self._parse_json_secure(response_text)
# أرشفة القرار
if self.r2_service and decision:
await self.r2_service.save_llm_prompts_async(symbol, "raw_reanalysis_decision", prompt, response_text)
return decision
except Exception as e:
print(f"❌ [LLM Re-Eval Error] Critical failure for {symbol}: {e}")
return None
# ==================================================================
# 📝 أدوات المساعدة في التنسيق (Helpers)
# ==================================================================
def _format_candles_to_csv(self, ohlcv_dict: Dict[str, list], limit: int = 200) -> str:
"""
تحويل بيانات الشموع من صيغة JSON/List إلى صيغة CSV مضغوطة ليفهمها النموذج بسهولة.
"""
csv_output = ""
# تحديد الأولويات للفريمات الزمنية (الأهم فالأهم)
priority_tfs = ['1h', '4h', '1d']
for tf in priority_tfs:
if tf not in ohlcv_dict: continue
candles = ohlcv_dict[tf]
if not candles: continue
# اقتطاع آخر 'limit' شمعة فقط
sliced_candles = candles[-limit:]
csv_output += f"\n=== TIMEFRAME {tf.upper()} (Last {len(sliced_candles)} Candles) ===\n"
csv_output += "Index,Open,High,Low,Close,Volume\n"
for idx, c in enumerate(sliced_candles):
# التنسيق: Index, Open, High, Low, Close, Volume
# (نتأكد من وجود العناصر لتجنب الأخطاء)
if len(c) >= 6:
csv_output += f"{idx},{c[1]},{c[2]},{c[3]},{c[4]},{c[5]}\n"
if not csv_output:
return "NO CANDLE DATA AVAILABLE."
return csv_output
# ==================================================================
# 🧠 هندسة البرومبت: الدخول (Raw Data Analyst Prompt)
# ==================================================================
def _create_raw_data_analyst_prompt(self, data: Dict[str, Any], learning_context: str) -> str:
symbol = data.get('symbol', 'UNKNOWN')
# 1. استخراج بيانات الطبقة الأولى (HEARSAY)
l1_components = data.get('components', {})
titan_score = l1_components.get('titan_score', 0.0)
pat_details = data.get('pattern_details', {})
pat_conf = pat_details.get('pattern_confidence', 0.0)
simple_mc_score = l1_components.get('mc_score', 0.0)
# 2. استخراج بيانات الطبقة الثانية (CONTEXT)
# الحيتان
whale_raw = data.get('l2_raw_values', {}).get('whale', 'No Data')
if whale_raw == "No Data" or whale_raw == "$0" or whale_raw == "No Data ($0)":
whale_section = "WHALE DATA: ⚠️ NO DATA FOUND (Assume Neutral/Unknown)"
else:
whale_section = f"WHALE DATA (Net Flow): {whale_raw} (Positive=Outflow/Sell, Negative=Inflow/Buy? Check Context)"
# الأخبار
news_raw = data.get('news_text', 'No specific news found.')
# مونت كارلو المتقدمة
adv_mc_raw = data.get('l2_raw_values', {}).get('adv_mc', 'No Data')
# 3. استخراج البيانات الخام (THE TRUTH)
ohlcv_raw = data.get('ohlcv', {})
candles_csv_block = self._format_candles_to_csv(ohlcv_raw, limit=200)
return f"""
ROLE: You are the 'Omniscient Brain', the Lead Quantitative Analyst.
OBJECTIVE: Analyze the RAW DATA for {symbol} and decide to TRADE or IGNORE.
⚠️ CRITICAL INSTRUCTION:
The "Local Model Reports" below are generated by smaller, potentially flawed algorithms.
They are strictly HEARSAY. You must NOT trust them blindly.
Your job is to verify them against the RAW CANDLE DATA provided below.
If your visual/statistical analysis of the candles contradicts the Local Models, YOUR ANALYSIS PREVAILS.
========== 1. LOCAL MODEL REPORTS (HEARSAY - TREAT WITH SKEPTICISM) ==========
[A] Technical Indicator Analysis (formerly Titan): Score {titan_score:.2f}/1.00
- Note: Based on standard RSI/MACD/Bollinger calculations.
[B] Pattern Recognition Model: Confidence {pat_conf:.2f}/1.00
- Note: This model DOES NOT identify pattern names. It only outputs a confidence score.
[C] Simple Monte Carlo: Score {simple_mc_score:.2f}
- Method: Basic Random Walk based on recent volatility.
[D] Advanced Monte Carlo: Probability {adv_mc_raw}
- Method: GARCH(1,1) model with Fat-Tailed distribution simulation.
========== 2. EXTERNAL FACTORS (CONTEXT) ==========
[A] {whale_section}
[B] RAW NEWS FEED:
"{news_raw}"
========== 3. THE TRUTH (RAW CANDLE DATA) ==========
Below are the last 200 candles for key timeframes (1H, 4H, 1D).
Index 199 is the MOST RECENT candle.
Format: Index,Open,High,Low,Close,Volume
{candles_csv_block}
========== 4. YOUR ANALYTICAL TASKS ==========
1. **Pattern Hunting:** Scan the CSV data above (especially 1H and 4H). Do you see any classic patterns (Bull Flag, Head & Shoulders, Double Bottom, etc.)?
2. **Trend Verification:** Compare the 'Technical Indicator Analysis' score with the slope of the raw Close prices. Is the score hallucinating a trend where there is actually a range or downtrend?
3. **Volume Analysis:** Look at the Volume column. Is there a volume spike supporting the price move?
4. **Conflict Resolution:** If Local Models say "UP" but Raw Candles show "Lower Highs" (Downtrend), you must REJECT.
========== 5. FINAL DECISION OUTPUT ==========
Based ONLY on your analysis of the raw data:
REQUIRED JSON FORMAT:
{{
"action": "WATCH" or "IGNORE",
"confidence": 0.00 to 1.00,
"identified_patterns": ["List specific patterns YOU found in the raw data (e.g., 'Bull Flag on 1H')"],
"analysis_summary": "Your reasoning. Explicitly mention if you agreed or disagreed with the Local Models.",
"key_raw_evidence": "Cite specific candle indexes or volume figures that support your decision."
}}
"""
# ==================================================================
# 🧠 هندسة البرومبت: إعادة التحليل (Raw Re-Analysis Prompt)
# ==================================================================
def _create_raw_reanalysis_prompt(self, trade: Dict, current: Dict, learning_context: str) -> str:
symbol = trade.get('symbol', 'UNKNOWN')
entry_price = trade.get('entry_price', 0.0)
current_price = current.get('current_price', 0.0)
# حساب مدة الصفقة والربح
try:
entry_time = datetime.fromisoformat(trade.get('entry_time').replace('Z', '+00:00'))
duration_minutes = (datetime.now(entry_time.tzinfo) - entry_time).total_seconds() / 60
except:
duration_minutes = 0.0
pnl_percentage = ((current_price - entry_price) / entry_price) * 100
# 1. البيانات الجديدة (Local Models)
titan_score_now = current.get('titan_score', 0.0)
# 2. البيانات الخارجية (الحيتان والأخبار)
whale_now = current.get('whale_data', {})
# محاولة استخراج القيمة الخام بشكل آمن
whale_net_flow = whale_now.get('accumulation_analysis_24h', {}).get('net_flow_usd', 'No Data')
if whale_net_flow == 'No Data':
whale_section = "WHALE DATA: ⚠️ NO DATA FOUND"
else:
whale_section = f"WHALE DATA (24H Net Flow): ${whale_net_flow}"
news_text_now = current.get('news_text', 'No new significant news.')
# 3. البيانات الخام (الشموع الجديدة)
ohlcv_raw = current.get('ohlcv', {})
# نكتفي بـ 100 شمعة للمتابعة لتقليل الحمل
candles_csv_block = self._format_candles_to_csv(ohlcv_raw, limit=100)
return f"""
ROLE: You are the 'Omniscient Brain', the Lead Quantitative Analyst (Guardian Mode).
EVENT: RE-EVALUATION of an OPEN POSITION.
ASSET: {symbol} | DURATION: {duration_minutes:.1f} min | PnL: {pnl_percentage:+.2f}%
⚠️ CRITICAL INSTRUCTION:
You must analyze the RAW DATA to see if the original trade thesis is INVALIDATED.
Do NOT panic over small fluctuations. Look for structural breaks or trend reversals in the raw candles.
========== 1. POSITION STATUS ==========
* Entry Price: {entry_price}
* Current Price: {current_price}
* Current Targets -> TP: {trade.get('tp_price', 'N/A')} | SL: {trade.get('sl_price', 'N/A')}
========== 2. NEW CONTEXT (HEARSAY) ==========
[A] Tech Indicator Score Now: {titan_score_now:.2f}/1.00
[B] {whale_section}
[C] LATEST NEWS: "{news_text_now[:500]}..."
========== 3. THE TRUTH (LATEST RAW CANDLES) ==========
Index 99 is the MOST RECENT candle.
Format: Index,Open,High,Low,Close,Volume
{candles_csv_block}
========== 4. YOUR ANALYTICAL TASKS ==========
1. **Momentum Check:** Look at the last 10-20 candles. Is the momentum stalling or reversing against you?
2. **Volume check:** Is there heavy volume selling (red candles with high volume)?
3. **Thesis Validation:** Does the raw price action still support the original direction (UP)?
========== 5. FINAL DECISION OUTPUT ==========
Based ONLY on raw data analysis:
REQUIRED JSON FORMAT:
{{
"action": "HOLD" or "EMERGENCY_EXIT" or "UPDATE_TARGETS",
"new_tp": null or float (only if action is UPDATE_TARGETS),
"new_sl": null or float (only if action is UPDATE_TARGETS),
"reasoning": "Concise analysis of why the trade is still valid or broken based on raw candles."
}}
"""
print("✅ LLM Service V13.8 (Robust Null Safety) Loaded")