| | import os |
| | import traceback |
| | import asyncio |
| | import json |
| | from datetime import datetime |
| | from functools import wraps |
| | from backoff import on_exception, expo |
| | from openai import OpenAI, RateLimitError, APITimeoutError, APIStatusError |
| | import numpy as np |
| |
|
| | |
| | NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY") |
| | PRIMARY_MODEL = "nvidia/llama-3.1-nemotron-ultra-253b-v1" |
| | COORDINATOR_MODEL = "meta/llama-4-scout-17b-16e-instruct" |
| | NVIDIA_RATE_LIMIT_CALLS = 20 |
| | NVIDIA_RATE_LIMIT_PERIOD = 60 |
| |
|
| | class LLMService: |
| | def __init__(self): |
| | if not NVIDIA_API_KEY: |
| | raise ValueError("NVIDIA API key not found.") |
| | self.client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=NVIDIA_API_KEY) |
| | |
| | @on_exception(expo, (APIStatusError, RateLimitError, APITimeoutError), max_tries=5, max_time=300) |
| | def call_model_with_retry(self, prompt_content, model_name, response_format): |
| | """Call model with exponential backoff retry.""" |
| | try: |
| | |
| | messages = [ |
| | {"role": "system", "content": "You are a highly skilled AI assistant for crypto spot trading. Your primary role is to provide detailed reasoning and analysis before reaching a final decision."}, |
| | {"role": "user", "content": prompt_content} |
| | ] |
| | |
| | response = self.client.chat.completions.create( |
| | model=model_name, |
| | messages=messages, |
| | response_format=response_format |
| | ) |
| | return response.choices[0].message.content |
| | except RateLimitError: |
| | print("β Rate limit exceeded. Retrying...") |
| | raise |
| | except APITimeoutError: |
| | print("β API timeout. Retrying...") |
| | raise |
| | except APIStatusError as e: |
| | print(f"β API status error: {e.status_code} - {e.response}") |
| | raise |
| | except Exception as e: |
| | print(f"β An unexpected error occurred: {e}") |
| | return None |
| |
|
| | def _convert_numpy_types(self, obj): |
| | """Recursively converts numpy types to standard Python types.""" |
| | if isinstance(obj, np.ndarray): |
| | return obj.tolist() |
| | elif isinstance(obj, np.generic): |
| | return obj.item() |
| | elif isinstance(obj, dict): |
| | return {k: self._convert_numpy_types(v) for k, v in obj.items()} |
| | elif isinstance(obj, list): |
| | return [self._convert_numpy_types(i) for i in obj] |
| | else: |
| | return obj |
| |
|
| | async def re_analyze_trade_async(self, trade_data, processed_data, data_manager, start_time=None): |
| | """ |
| | Re-analyzes an open trade with new market data using a language model. |
| | """ |
| | try: |
| | current_time = time.time() |
| | elapsed_time_seconds = current_time - start_time if start_time else 0 |
| | |
| | |
| | clean_processed_data = self._convert_numpy_types(processed_data) |
| | clean_trade_data = self._convert_numpy_types(trade_data) |
| |
|
| | market_context = await data_manager.get_market_context_async() |
| |
|
| | re_analysis_prompt = ( |
| | f"""You are an AI assistant for crypto spot trading, focused on managing open trades. Your goal is to provide a clear action based on the latest data. Your output must be direct, concise, and in a specific format. |
| | |
| | Analyze the open trade and the latest market data to decide on one of the following actions: |
| | 1. **CLOSE_TRADE:** If conditions suggest a strong reason to exit the trade now (e.g., trend reversal, major price drop, or profit target reached). |
| | 2. **UPDATE_TP_SL:** If the trade is profitable, update the take-profit and stop-loss levels to secure gains and manage risk. |
| | 3. **HOLD:** If conditions are stable and there's no clear reason to change the current trade plan. |
| | |
| | The final output must be in the exact JSON format requested. Do not add any extra text. |
| | |
| | Open Trade Data: |
| | Asset: {clean_trade_data['asset']} |
| | Entry Price: {clean_trade_data['entry_price']} |
| | Current Stop Loss: {clean_trade_data['stop_loss']} |
| | Current Take Profit: {clean_trade_data['take_profit_1']} |
| | |
| | Latest Market Data: |
| | Current Price: {clean_processed_data['current_price']} |
| | Technical Features: {json.dumps(clean_processed_data['features'], indent=2)} |
| | Fibonacci Levels: {json.dumps(clean_processed_data.get('fibonacci_levels', {}), indent=2)} |
| | Time elapsed since data collection started: {elapsed_time_seconds:.2f} seconds. |
| | |
| | Strict Output Directives: |
| | - action: ("CLOSE_TRADE", "UPDATE_TP_SL", or "HOLD") |
| | - reasoning: (short, specific sentence) |
| | - new_stop_loss: (float or null) |
| | - new_take_profit: (float or null) |
| | |
| | Example 1 (CLOSE_TRADE): |
| | {{"action": "CLOSE_TRADE", "reasoning": "Price has dropped below the critical support level.", "new_stop_loss": null, "new_take_profit": null}} |
| | |
| | Example 2 (UPDATE_TP_SL): |
| | {{"action": "UPDATE_TP_SL", "reasoning": "Price has moved significantly, trailing stop loss to new support.", "new_stop_loss": {clean_processed_data['current_price'] * 0.95:.6f}, "new_take_profit": {clean_processed_data['current_price'] * 1.15:.6f}}} |
| | |
| | Example 3 (HOLD): |
| | {{"action": "HOLD", "reasoning": "Market is consolidating with no clear trend change.", "new_stop_loss": null, "new_take_profit": null}} |
| | """ |
| | ) |
| | print("π§ LLM re-analysis raw decision:") |
| | raw_analysis = await asyncio.to_thread( |
| | self.call_model_with_retry, |
| | re_analysis_prompt, |
| | PRIMARY_MODEL, |
| | None |
| | ) |
| |
|
| | coordinator_model_prompt = ( |
| | f"""You are a data transformation expert. Your sole purpose is to convert the following raw text into a valid JSON object. You must not add any extra text or explanations. Your output must be pure JSON. |
| | |
| | Raw Text: |
| | '''{raw_analysis}''' |
| | |
| | Required JSON Keys: |
| | - action (string: "CLOSE_TRADE", "HOLD", or "UPDATE_TP_SL") |
| | - reasoning (string) |
| | - new_stop_loss (float or null) |
| | - new_take_profit (float or null) |
| | """ |
| | ) |
| | |
| | json_string = await asyncio.to_thread( |
| | self.call_model_with_retry, |
| | coordinator_model_prompt, |
| | COORDINATOR_MODEL, |
| | {"type": "json_object"} |
| | ) |
| | |
| | re_analysis_decision = self._safe_json_parse(json_string) |
| | return re_analysis_decision |
| | |
| | except Exception as e: |
| | print(f"β Dynamic re-analysis failed for {trade_data['asset']}: {e}") |
| | traceback.print_exc() |
| | return None |
| |
|
| | def _safe_json_parse(self, s): |
| | """Safely parses a JSON string.""" |
| | try: |
| | return json.loads(s) |
| | except json.JSONDecodeError as e: |
| | print(f"β Failed to parse JSON: {e}") |
| | print(f"Raw string: {s}") |
| | return None |