import os
import time
import traceback
import asyncio
import json
from datetime import datetime
from functools import wraps

from backoff import on_exception, expo
from openai import OpenAI, RateLimitError, APITimeoutError, APIStatusError
import numpy as np

# --- LLM Service Configuration ---
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY")
PRIMARY_MODEL = "nvidia/llama-3.1-nemotron-ultra-253b-v1"
COORDINATOR_MODEL = "meta/llama-4-scout-17b-16e-instruct"
# NOTE(review): the two rate-limit constants are not referenced anywhere in
# this module; presumably they are consumed by a limiter elsewhere - confirm.
NVIDIA_RATE_LIMIT_CALLS = 20
NVIDIA_RATE_LIMIT_PERIOD = 60

# System message sent with every request made by call_model_with_retry.
_SYSTEM_PROMPT = (
    "You are a highly skilled AI assistant for crypto spot trading. "
    "Your primary role is to provide detailed reasoning and analysis "
    "before reaching a final decision."
)

# Template for the first (analysis) prompt; filled in with str.format, so
# literal JSON braces in the examples are doubled.
_RE_ANALYSIS_PROMPT_TEMPLATE = """You are an AI assistant for crypto spot trading, focused on managing open trades. Your goal is to provide a clear action based on the latest data. Your output must be direct, concise, and in a specific format.

Analyze the open trade and the latest market data to decide on one of the following actions:
1. **CLOSE_TRADE:** If conditions suggest a strong reason to exit the trade now (e.g., trend reversal, major price drop, or profit target reached).
2. **UPDATE_TP_SL:** If the trade is profitable, update the take-profit and stop-loss levels to secure gains and manage risk.
3. **HOLD:** If conditions are stable and there's no clear reason to change the current trade plan.

The final output must be in the exact JSON format requested. Do not add any extra text.

Open Trade Data:
Asset: {asset}
Entry Price: {entry_price}
Current Stop Loss: {stop_loss}
Current Take Profit: {take_profit}

Latest Market Data:
Current Price: {current_price}
Technical Features: {features_json}
Fibonacci Levels: {fibonacci_json}

Time elapsed since data collection started: {elapsed_seconds:.2f} seconds.

Strict Output Directives:
- action: ("CLOSE_TRADE", "UPDATE_TP_SL", or "HOLD")
- reasoning: (short, specific sentence)
- new_stop_loss: (float or null)
- new_take_profit: (float or null)

Example 1 (CLOSE_TRADE):
{{"action": "CLOSE_TRADE", "reasoning": "Price has dropped below the critical support level.", "new_stop_loss": null, "new_take_profit": null}}

Example 2 (UPDATE_TP_SL):
{{"action": "UPDATE_TP_SL", "reasoning": "Price has moved significantly, trailing stop loss to new support.", "new_stop_loss": {example_stop_loss:.6f}, "new_take_profit": {example_take_profit:.6f}}}

Example 3 (HOLD):
{{"action": "HOLD", "reasoning": "Market is consolidating with no clear trend change.", "new_stop_loss": null, "new_take_profit": null}}
"""

# Template for the second (coordinator) prompt that turns the free-form
# analysis into a JSON object.
_COORDINATOR_PROMPT_TEMPLATE = """You are a data transformation expert. Your sole purpose is to convert the following raw text into a valid JSON object. You must not add any extra text or explanations. Your output must be pure JSON.

Raw Text:
'''{raw_analysis}'''

Required JSON Keys:
- action (string: "CLOSE_TRADE", "HOLD", or "UPDATE_TP_SL")
- reasoning (string)
- new_stop_loss (float or null)
- new_take_profit (float or null)
"""


class LLMService:
    """Thin wrapper around the OpenAI client pointed at NVIDIA's endpoint.

    Provides a retrying chat-completion call and a two-step re-analysis of an
    open trade: a primary model produces an analysis, then a coordinator model
    converts that analysis into a JSON decision.
    """

    def __init__(self):
        """Create the API client.

        Raises:
            ValueError: if the NVIDIA_API_KEY environment variable is unset
                or empty.
        """
        if not NVIDIA_API_KEY:
            raise ValueError("NVIDIA API key not found.")
        self.client = OpenAI(
            base_url="https://integrate.api.nvidia.com/v1",
            api_key=NVIDIA_API_KEY,
        )

    @on_exception(
        expo,
        (APIStatusError, RateLimitError, APITimeoutError),
        max_tries=5,
        max_time=300,
    )
    def call_model_with_retry(self, prompt_content, model_name, response_format):
        """Call model with exponential backoff retry.

        Args:
            prompt_content: Text sent as the user message.
            model_name: Model identifier passed to the chat completions API.
            response_format: Value for the API's ``response_format``
                parameter, or ``None`` to leave it unset.

        Returns:
            The content of the first choice's message, or ``None`` when an
            unexpected (non-retried) error occurred.

        Raises:
            RateLimitError, APITimeoutError, APIStatusError: re-raised so the
                backoff decorator can retry; they propagate to the caller
                once the retry budget is exhausted.
        """
        try:
            request_kwargs = {
                "model": model_name,
                "messages": [
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": prompt_content},
                ],
            }
            # Only forward response_format when one was requested: passing
            # None explicitly is not the same as omitting the parameter.
            if response_format is not None:
                request_kwargs["response_format"] = response_format

            response = self.client.chat.completions.create(**request_kwargs)
            return response.choices[0].message.content
        # RateLimitError is a subclass of APIStatusError, so it has to be
        # handled before the more general clause below.
        except RateLimitError:
            print("❗ Rate limit exceeded. Retrying...")
            raise
        except APITimeoutError:
            print("❗ API timeout. Retrying...")
            raise
        except APIStatusError as e:
            print(f"❌ API status error: {e.status_code} - {e.response}")
            raise
        except Exception as e:
            # Anything else is not retried; report it and signal failure
            # to the caller with None.
            print(f"❌ An unexpected error occurred: {e}")
            return None

    def _convert_numpy_types(self, obj):
        """Recursively converts numpy types to standard Python types.

        Arrays become lists and numpy scalars become Python scalars. Dicts,
        lists and tuples are rebuilt with their contents converted (tuples
        stay tuples). Any other object is returned unchanged.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, dict):
            return {k: self._convert_numpy_types(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._convert_numpy_types(i) for i in obj]
        if isinstance(obj, tuple):
            return tuple(self._convert_numpy_types(i) for i in obj)
        return obj

    def _build_re_analysis_prompt(self, trade, market, elapsed_seconds):
        """Fill the re-analysis prompt template from already-cleaned data."""
        current_price = market['current_price']
        return _RE_ANALYSIS_PROMPT_TEMPLATE.format(
            asset=trade['asset'],
            entry_price=trade['entry_price'],
            stop_loss=trade['stop_loss'],
            take_profit=trade['take_profit_1'],
            current_price=current_price,
            features_json=json.dumps(market['features'], indent=2),
            fibonacci_json=json.dumps(market.get('fibonacci_levels', {}), indent=2),
            elapsed_seconds=elapsed_seconds,
            # Illustrative levels for the UPDATE_TP_SL example only.
            example_stop_loss=current_price * 0.95,
            example_take_profit=current_price * 1.15,
        )

    async def re_analyze_trade_async(self, trade_data, processed_data, data_manager, start_time=None):
        """
        Re-analyzes an open trade with new market data using a language model.

        Args:
            trade_data: Mapping describing the open trade; the keys 'asset',
                'entry_price', 'stop_loss' and 'take_profit_1' are read.
            processed_data: Mapping with the latest market data; the keys
                'current_price' and 'features' are read, 'fibonacci_levels'
                is optional.
            data_manager: Object exposing ``get_market_context_async()``.
            start_time: ``time.time()`` timestamp of when data collection
                started; a falsy value is treated as "not provided".

        Returns:
            The parsed JSON decision produced by the coordinator model, or
            ``None`` if any step fails.
        """
        try:
            elapsed_time_seconds = (time.time() - start_time) if start_time else 0

            # Convert numpy types so the data can be serialised with json.dumps.
            clean_processed_data = self._convert_numpy_types(processed_data)
            clean_trade_data = self._convert_numpy_types(trade_data)

            # NOTE(review): the returned market context is not used in the
            # prompt below. The call is kept because it may have side effects
            # - confirm whether the result should be included.
            await data_manager.get_market_context_async()

            re_analysis_prompt = self._build_re_analysis_prompt(
                clean_trade_data, clean_processed_data, elapsed_time_seconds
            )

            # The client call is blocking, so run it in a worker thread.
            raw_analysis = await asyncio.to_thread(
                self.call_model_with_retry, re_analysis_prompt, PRIMARY_MODEL, None
            )
            if not raw_analysis:
                # Without an analysis the coordinator would be asked to turn
                # the text "None" into a trade decision; stop here instead.
                print("❌ LLM re-analysis returned no content.")
                return None

            print("🧠 LLM re-analysis raw decision:")
            print(raw_analysis)

            coordinator_model_prompt = _COORDINATOR_PROMPT_TEMPLATE.format(
                raw_analysis=raw_analysis
            )
            json_string = await asyncio.to_thread(
                self.call_model_with_retry,
                coordinator_model_prompt,
                COORDINATOR_MODEL,
                {"type": "json_object"},
            )

            return self._safe_json_parse(json_string)
        except Exception as e:
            # Look the asset up defensively: a missing key or a non-dict
            # trade_data may be the very reason we ended up here.
            asset = trade_data.get('asset', 'unknown') if isinstance(trade_data, dict) else 'unknown'
            print(f"❌ Dynamic re-analysis failed for {asset}: {e}")
            traceback.print_exc()
            return None

    def _safe_json_parse(self, s):
        """Safely parses a JSON string.

        Returns the decoded value, or ``None`` when ``s`` is not valid JSON
        or is not a string/bytes object at all (e.g. ``None``).
        """
        try:
            return json.loads(s)
        except (json.JSONDecodeError, TypeError) as e:
            # TypeError covers non-string input such as None.
            print(f"❌ Failed to parse JSON: {e}")
            print(f"Raw string: {s}")
            return None