Tradi / LLM.py
Riy777's picture
Rename LLm.py to LLM.py
96a6dcd verified
import asyncio
import json
import os
import time
import traceback
from datetime import datetime
from functools import wraps

import numpy as np
from backoff import expo, on_exception
from openai import APIConnectionError, APIStatusError, APITimeoutError, OpenAI, RateLimitError
# --- LLM Service Configuration ---
# API key for NVIDIA's OpenAI-compatible endpoint; None when the variable is unset.
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY")
# Model that produces the free-text trade analysis.
PRIMARY_MODEL = "nvidia/llama-3.1-nemotron-ultra-253b-v1"
# Model that converts the primary model's text into strict JSON.
COORDINATOR_MODEL = "meta/llama-4-scout-17b-16e-instruct"
# Rate-limit budget (calls per period, period in seconds).
# NOTE(review): not referenced anywhere else in this file as seen — confirm it is used.
NVIDIA_RATE_LIMIT_CALLS, NVIDIA_RATE_LIMIT_PERIOD = 20, 60
class LLMService:
    """Client for NVIDIA's OpenAI-compatible chat endpoint used to re-analyze open trades."""

    # Decisions re_analyze_trade_async is allowed to return.
    _VALID_ACTIONS = frozenset({"CLOSE_TRADE", "UPDATE_TP_SL", "HOLD"})

    def __init__(self):
        """Create the OpenAI-compatible client for the NVIDIA endpoint.

        Raises:
            ValueError: if NVIDIA_API_KEY (read from the environment at import
                time) is unset or empty.
        """
        if not NVIDIA_API_KEY:
            raise ValueError("NVIDIA API key not found.")
        self.client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=NVIDIA_API_KEY)

    @on_exception(
        expo,
        (APIStatusError, RateLimitError, APITimeoutError, APIConnectionError),
        max_tries=5,
        max_time=300,
        # Stop retrying at once on HTTP errors a retry cannot fix (400, 401,
        # 403, 404, ...). 408, 409, 429 and 5xx are treated as transient.
        # Timeouts/connection errors are not APIStatusError, so they retry.
        giveup=lambda exc: (
            isinstance(exc, APIStatusError)
            and exc.status_code not in (408, 409, 429)
            and exc.status_code < 500
        ),
    )
    def call_model_with_retry(self, prompt_content, model_name, response_format):
        """Send one chat-completion request and return the reply text.

        The decorator retries with exponential backoff (at most 5 attempts,
        300 s total) on rate-limit, timeout, connection and transient HTTP
        status errors.

        Args:
            prompt_content: Text of the user message.
            model_name: Model identifier sent to the endpoint.
            response_format: Optional ``response_format`` payload such as
                ``{"type": "json_object"}``; left out of the request when None.

        Returns:
            The first choice's message content, or None when an unexpected
            (non-API) error occurs.

        Raises:
            RateLimitError, APITimeoutError, APIConnectionError, APIStatusError:
                re-raised so the decorator can retry; the final one propagates
                once retries are exhausted or abandoned.
        """
        # NOTE(review): an earlier comment here claimed "detailed thinking" was
        # toggled on, but this system prompt contains no such directive — confirm
        # whether the primary model needs one.
        messages = [
            {"role": "system", "content": "You are a highly skilled AI assistant for crypto spot trading. Your primary role is to provide detailed reasoning and analysis before reaching a final decision."},
            {"role": "user", "content": prompt_content},
        ]
        request_kwargs = {"model": model_name, "messages": messages}
        # Only send response_format when requested instead of an explicit null.
        if response_format is not None:
            request_kwargs["response_format"] = response_format
        try:
            response = self.client.chat.completions.create(**request_kwargs)
            return response.choices[0].message.content
        except RateLimitError:
            print("❗ Rate limit exceeded. Retrying...")
            raise
        except APITimeoutError:
            # Must precede APIConnectionError: APITimeoutError subclasses it.
            print("❗ API timeout. Retrying...")
            raise
        except APIConnectionError:
            print("❗ API connection error. Retrying...")
            raise
        except APIStatusError as e:
            print(f"❌ API status error: {e.status_code} - {e.response}")
            raise
        except Exception as e:
            # Best-effort: unexpected errors are logged and reported as None.
            print(f"❌ An unexpected error occurred: {e}")
            return None

    def _convert_numpy_types(self, obj):
        """Recursively replace numpy values with plain Python equivalents.

        Makes nested data safe for ``json.dumps``, which rejects values such
        as ``np.int64``. Arrays become (nested) lists and numpy scalars become
        Python scalars. Dicts (keys and values), lists and tuples are rebuilt
        with converted members; tuples and namedtuples keep their type.
        Anything else is returned unchanged.
        """
        if isinstance(obj, np.ndarray):
            # tolist() yields Python scalars for numeric dtypes; recursing also
            # converts numpy scalars stored in object-dtype arrays.
            return self._convert_numpy_types(obj.tolist())
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, dict):
            return {
                self._convert_numpy_types(k): self._convert_numpy_types(v)
                for k, v in obj.items()
            }
        if isinstance(obj, list):
            return [self._convert_numpy_types(item) for item in obj]
        if isinstance(obj, tuple):
            items = [self._convert_numpy_types(item) for item in obj]
            # namedtuples take positional fields, plain tuples take an iterable.
            return type(obj)(*items) if hasattr(obj, "_fields") else tuple(items)
        return obj

    def _build_re_analysis_prompt(self, clean_trade_data, clean_processed_data, elapsed_time_seconds):
        """Build the free-text decision prompt sent to PRIMARY_MODEL."""
        return (
            f"""You are an AI assistant for crypto spot trading, focused on managing open trades. Your goal is to provide a clear action based on the latest data. Your output must be direct, concise, and in a specific format.
Analyze the open trade and the latest market data to decide on one of the following actions:
1. **CLOSE_TRADE:** If conditions suggest a strong reason to exit the trade now (e.g., trend reversal, major price drop, or profit target reached).
2. **UPDATE_TP_SL:** If the trade is profitable, update the take-profit and stop-loss levels to secure gains and manage risk.
3. **HOLD:** If conditions are stable and there's no clear reason to change the current trade plan.
The final output must be in the exact JSON format requested. Do not add any extra text.
Open Trade Data:
Asset: {clean_trade_data['asset']}
Entry Price: {clean_trade_data['entry_price']}
Current Stop Loss: {clean_trade_data['stop_loss']}
Current Take Profit: {clean_trade_data['take_profit_1']}
Latest Market Data:
Current Price: {clean_processed_data['current_price']}
Technical Features: {json.dumps(clean_processed_data['features'], indent=2)}
Fibonacci Levels: {json.dumps(clean_processed_data.get('fibonacci_levels', {}), indent=2)}
Time elapsed since data collection started: {elapsed_time_seconds:.2f} seconds.
Strict Output Directives:
- action: ("CLOSE_TRADE", "UPDATE_TP_SL", or "HOLD")
- reasoning: (short, specific sentence)
- new_stop_loss: (float or null)
- new_take_profit: (float or null)
Example 1 (CLOSE_TRADE):
{{"action": "CLOSE_TRADE", "reasoning": "Price has dropped below the critical support level.", "new_stop_loss": null, "new_take_profit": null}}
Example 2 (UPDATE_TP_SL):
{{"action": "UPDATE_TP_SL", "reasoning": "Price has moved significantly, trailing stop loss to new support.", "new_stop_loss": {clean_processed_data['current_price'] * 0.95:.6f}, "new_take_profit": {clean_processed_data['current_price'] * 1.15:.6f}}}
Example 3 (HOLD):
{{"action": "HOLD", "reasoning": "Market is consolidating with no clear trend change.", "new_stop_loss": null, "new_take_profit": null}}
"""
        )

    def _build_coordinator_prompt(self, raw_analysis):
        """Build the prompt asking COORDINATOR_MODEL to turn raw text into JSON."""
        return (
            f"""You are a data transformation expert. Your sole purpose is to convert the following raw text into a valid JSON object. You must not add any extra text or explanations. Your output must be pure JSON.
Raw Text:
'''{raw_analysis}'''
Required JSON Keys:
- action (string: "CLOSE_TRADE", "HOLD", or "UPDATE_TP_SL")
- reasoning (string)
- new_stop_loss (float or null)
- new_take_profit (float or null)
"""
        )

    async def re_analyze_trade_async(self, trade_data, processed_data, data_manager, start_time=None):
        """
        Re-analyzes an open trade with new market data using a language model.

        The pipeline has two stages. PRIMARY_MODEL writes a free-text decision.
        COORDINATOR_MODEL, in JSON mode, then converts that text into a dict
        with keys ``action``, ``reasoning``, ``new_stop_loss`` and
        ``new_take_profit``. The blocking SDK calls run in worker threads via
        ``asyncio.to_thread``.

        Args:
            trade_data: Open-trade mapping; reads ``asset``, ``entry_price``,
                ``stop_loss`` and ``take_profit_1``.
            processed_data: Market snapshot; reads ``current_price`` and
                ``features``, and optionally ``fibonacci_levels``.
            data_manager: Object providing ``get_market_context_async()``.
            start_time: Optional ``time.time()``-style timestamp; when given,
                the seconds elapsed since then are included in the prompt.

        Returns:
            The decision dict, or None if any stage fails, the primary model
            returns no text, or the decision's ``action`` is not one of
            CLOSE_TRADE / UPDATE_TP_SL / HOLD.
        """
        # Resolve the asset label up front so the error handler below cannot
        # itself raise on malformed trade_data.
        try:
            asset = trade_data["asset"]
        except (KeyError, TypeError, IndexError):
            asset = "<unknown>"
        try:
            current_time = time.time()
            elapsed_time_seconds = current_time - start_time if start_time is not None else 0
            # Convert numpy types so json.dumps in the prompt does not fail.
            clean_processed_data = self._convert_numpy_types(processed_data)
            clean_trade_data = self._convert_numpy_types(trade_data)
            # NOTE(review): the market context is fetched but not used in the
            # prompt — confirm whether it should be included.
            market_context = await data_manager.get_market_context_async()

            re_analysis_prompt = self._build_re_analysis_prompt(
                clean_trade_data, clean_processed_data, elapsed_time_seconds
            )
            raw_analysis = await asyncio.to_thread(
                self.call_model_with_retry,
                re_analysis_prompt,
                PRIMARY_MODEL,
                None,
            )
            print("🧠 LLM re-analysis raw decision:")
            print(raw_analysis)
            if not raw_analysis:
                # Without primary output the coordinator would be asked to
                # "convert" the text 'None' and could invent a trading decision.
                print(f"❌ Primary model returned no analysis for {asset}; skipping.")
                return None

            json_string = await asyncio.to_thread(
                self.call_model_with_retry,
                self._build_coordinator_prompt(raw_analysis),
                COORDINATOR_MODEL,
                {"type": "json_object"},
            )
            re_analysis_decision = self._safe_json_parse(json_string)
            if re_analysis_decision is None:
                return None  # _safe_json_parse already logged the failure.
            if (
                not isinstance(re_analysis_decision, dict)
                or re_analysis_decision.get("action") not in self._VALID_ACTIONS
            ):
                print(f"❌ Invalid re-analysis decision for {asset}: {re_analysis_decision!r}")
                return None
            return re_analysis_decision
        except Exception as e:
            print(f"❌ Dynamic re-analysis failed for {asset}: {e}")
            traceback.print_exc()
            return None

    def _safe_json_parse(self, s):
        """Parse JSON text, logging and returning None instead of raising.

        Args:
            s: JSON text (str/bytes). None or other non-text input, e.g. when
                the model call produced nothing, counts as a parse failure.

        Returns:
            The decoded value, or None on failure.
        """
        if s is None:
            print("❌ Failed to parse JSON: no content returned")
            return None
        try:
            return json.loads(s)
        except (json.JSONDecodeError, TypeError) as e:
            print(f"❌ Failed to parse JSON: {e}")
            print(f"Raw string: {s}")
            return None