import json
import os
import sqlite3
import sys
import threading
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
import requests

# Add project root to path for imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

# Import necessary modules
from src.crypto_analysis.tools.technical_tools import TechnicalAnalysisStrategy, IndicatorCalculator
from src.crypto_analysis.tools.order_tools import AlpacaCryptoOrderTool
from src.crypto_analysis.tools.bitcoin_tools import YahooBitcoinDataTool
from src.crypto_analysis.tools.yahoo_tools import YahooCryptoMarketTool
from src.crypto_analysis.crew import BitcoinAnalysisCrew

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# Initialize the database
DB_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../data/cryptic.db'))
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)


def init_database():
    """Initialize the SQLite database with necessary tables.

    Creates the ``strategies``, ``transactions`` and ``analysis_results``
    tables in the database at ``DB_PATH`` if they do not exist yet. The
    connection is always closed, even when a statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()

        # Create strategies table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS strategies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                strategy_text TEXT NOT NULL,
                parameters TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Create transactions table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS transactions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                action TEXT NOT NULL,
                quantity REAL NOT NULL,
                price REAL NOT NULL,
                status TEXT NOT NULL,
                allocation_percentage INTEGER,
                order_id TEXT,
                strategy_id INTEGER,
                reasoning TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (strategy_id) REFERENCES strategies (id)
            )
        ''')

        # Create analysis_results table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS analysis_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                signal TEXT NOT NULL,
                confidence INTEGER,
                allocation_percentage INTEGER,
                reasoning TEXT,
                indicator_values TEXT,
                strategy_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (strategy_id) REFERENCES strategies (id)
            )
        ''')

        conn.commit()
    finally:
        # Release the file handle even if a CREATE TABLE statement raised.
        conn.close()


# Initialize the database when module is loaded
init_database()
# Global variables for strategy parameters
strategy_params = {
    "timeframe_minutes": 60,
    "max_allocation_percentage": 50,
    "strategy_text": None,
}

# Store analysis results and orders
analysis_results = []
orders_history = []
active_trades = []

# Flag to control the background thread
running = False
background_thread = None


def get_saved_strategies():
    """Return every saved strategy as a list of ``{"id", "name", "description"}`` dicts."""
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute("SELECT id, name, description FROM strategies").fetchall()
    finally:
        conn.close()
    return [{"id": row[0], "name": row[1], "description": row[2]} for row in rows]


def get_strategy_by_id(strategy_id):
    """Return the strategy with the given id, or ``None`` when no row matches.

    The ``parameters`` column is stored as JSON text and is decoded into a
    Python object in the returned dict.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT id, name, description, strategy_text, parameters FROM strategies WHERE id = ?",
            (strategy_id,),
        ).fetchone()
    finally:
        conn.close()

    if row is None:
        return None
    return {
        "id": row[0],
        "name": row[1],
        "description": row[2],
        "strategy_text": row[3],
        "parameters": json.loads(row[4]),
    }


def save_strategy(name, description, strategy_text, parameters):
    """Insert a strategy, or update the existing one that has the same name.

    Args:
        name: Unique strategy name; used as the lookup key for updates.
        description: Free-text description shown in the strategy list.
        strategy_text: The strategy logic in natural language.
        parameters: JSON-serializable parameter mapping.

    Returns:
        A ``(success, message)`` tuple. Failures are reported through the
        message rather than raised.
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        parameters_json = json.dumps(parameters)

        # Check if strategy name already exists
        cursor.execute("SELECT id FROM strategies WHERE name = ?", (name,))
        if cursor.fetchone():
            # Update existing strategy
            cursor.execute(
                "UPDATE strategies SET description = ?, strategy_text = ?, parameters = ? WHERE name = ?",
                (description, strategy_text, parameters_json, name),
            )
        else:
            # Insert new strategy. The second bound value must be the
            # description; binding strategy_text there stored the strategy
            # logic in the description column and lost the description.
            cursor.execute(
                "INSERT INTO strategies (name, description, strategy_text, parameters) VALUES (?, ?, ?, ?)",
                (name, description, strategy_text, parameters_json),
            )

        conn.commit()
        return True, "Strategy saved successfully"
    except Exception as e:
        return False, f"Error saving strategy: {str(e)}"
    finally:
        if conn is not None:
            conn.close()
def save_analysis_result(result, strategy_id=None):
    """Persist one analysis result in the ``analysis_results`` table.

    Args:
        result: Mapping with optional ``signal``, ``confidence``,
            ``allocation_percentage``, ``reasoning`` and
            ``technical_indicators`` keys; missing keys get defaults.
        strategy_id: Optional id of the strategy that produced the result.

    Returns:
        ``True`` when the row was written, ``False`` on any error (the error
        is printed, not raised).
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        row = (
            result.get("signal", "unknown"),
            result.get("confidence", 0),
            result.get("allocation_percentage", 0),
            result.get("reasoning", ""),
            # Technical indicators are stored as a JSON string
            json.dumps(result.get("technical_indicators", {})),
            strategy_id,
        )
        conn.execute(
            "INSERT INTO analysis_results (signal, confidence, allocation_percentage, reasoning, indicator_values, strategy_id) VALUES (?, ?, ?, ?, ?, ?)",
            row,
        )
        conn.commit()
        return True
    except Exception as e:
        print(f"Error saving analysis result: {e}")
        return False
    finally:
        # Close on the failure path too, so a bad insert does not leak the handle.
        if conn is not None:
            conn.close()


def save_transaction(order_data, strategy_id=None):
    """Persist one executed order in the ``transactions`` table.

    Args:
        order_data: Mapping describing the order; missing keys get defaults
            (``symbol`` defaults to ``"BTC/USD"``).
        strategy_id: Optional id of the strategy that triggered the order.

    Returns:
        ``True`` when the row was written, ``False`` on any error (the error
        is printed, not raised).
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        row = (
            order_data.get("symbol", "BTC/USD"),
            order_data.get("action", "unknown"),
            order_data.get("quantity", 0),
            order_data.get("price", 0),
            order_data.get("status", "unknown"),
            order_data.get("allocation_percentage", 0),
            order_data.get("order_id", ""),
            strategy_id,
            order_data.get("reasoning", ""),
        )
        conn.execute(
            "INSERT INTO transactions (symbol, action, quantity, price, status, allocation_percentage, order_id, strategy_id, reasoning) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            row,
        )
        conn.commit()
        return True
    except Exception as e:
        print(f"Error saving transaction: {e}")
        return False
    finally:
        if conn is not None:
            conn.close()
def get_recent_analysis_results(limit=10):
    """Return the newest analysis results, most recent first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``id``, ``signal``, ``confidence``,
        ``allocation_percentage``, ``reasoning``, ``timestamp`` and
        ``strategy_name``. Rows without a matching strategy are labelled
        ``"Default Strategy"``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            """
            SELECT ar.id, ar.signal, ar.confidence, ar.allocation_percentage,
                   ar.reasoning, ar.timestamp, s.name as strategy_name
            FROM analysis_results ar
            LEFT JOIN strategies s ON ar.strategy_id = s.id
            ORDER BY ar.timestamp DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    columns = (
        "id", "signal", "confidence", "allocation_percentage",
        "reasoning", "timestamp", "strategy_name",
    )
    results = []
    for row in rows:
        record = dict(zip(columns, row))
        record["strategy_name"] = record["strategy_name"] or "Default Strategy"
        results.append(record)
    return results


def get_recent_transactions(limit=10):
    """Return the newest stored transactions, most recent first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``id``, ``symbol``, ``action``, ``quantity``,
        ``price``, ``status``, ``allocation_percentage``, ``timestamp`` and
        ``strategy_name`` (``"Default Strategy"`` when no strategy matches).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            """
            SELECT t.id, t.symbol, t.action, t.quantity, t.price, t.status,
                   t.allocation_percentage, t.timestamp, s.name as strategy_name
            FROM transactions t
            LEFT JOIN strategies s ON t.strategy_id = s.id
            ORDER BY t.timestamp DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        conn.close()

    columns = (
        "id", "symbol", "action", "quantity", "price", "status",
        "allocation_percentage", "timestamp", "strategy_name",
    )
    transactions = []
    for row in rows:
        record = dict(zip(columns, row))
        record["strategy_name"] = record["strategy_name"] or "Default Strategy"
        transactions.append(record)
    return transactions


def fetch_account_info():
    """Return the account data reported by the Alpaca order tool.

    On failure the error is printed and a dict with ``error``, ``cash`` and
    ``equity`` keys is returned instead of raising.
    """
    try:
        order_tool = AlpacaCryptoOrderTool()
        return order_tool._check_account()
    except Exception as e:
        print(f"Error fetching account info: {e}")
        return {"error": str(e), "cash": 0, "equity": 0}


def reset_portfolio():
    """Cancel all open orders, then close all positions on the paper account.

    Returns:
        A human-readable status message; errors are reported in the message
        rather than raised.
    """
    try:
        # Create a session with authentication
        headers = {
            "APCA-API-KEY-ID": os.getenv("ALPACA_API_KEY"),
            "APCA-API-SECRET-KEY": os.getenv("ALPACA_API_SECRET"),
        }

        # Use paper trading URL
        base_url = "https://paper-api.alpaca.markets"

        # The bulk DELETE endpoints answer 207 Multi-Status (one entry per
        # order/position) when they succeed, so 207 must count as success.
        success_statuses = (200, 204, 207)

        # 1. First cancel all open orders
        response = requests.delete(f"{base_url}/v2/orders", headers=headers, timeout=30)
        if response.status_code not in success_statuses:
            print(f"Error canceling orders: {response.status_code}, {response.text}")
            return f"Error canceling orders: {response.status_code}"

        # 2. Then close all positions
        response = requests.delete(f"{base_url}/v2/positions", headers=headers, timeout=30)
        if response.status_code not in success_statuses:
            print(f"Error closing positions: {response.status_code}, {response.text}")
            return f"Error closing positions: {response.status_code}"

        return "Portfolio reset successfully. All positions closed and orders canceled."
    except Exception as e:
        print(f"Error resetting portfolio: {e}")
        return f"Error resetting portfolio: {str(e)}"
except Exception as e: print(f"Error resetting portfolio: {e}") return f"Error resetting portfolio: {str(e)}" # Function to fetch order history def fetch_order_history(): try: # Create a session with authentication api_key = os.getenv("ALPACA_API_KEY") api_secret = os.getenv("ALPACA_API_SECRET") headers = { "APCA-API-KEY-ID": api_key, "APCA-API-SECRET-KEY": api_secret } # Use paper trading URL base_url = "https://paper-api.alpaca.markets" # Fetch orders response = requests.get(f"{base_url}/v2/orders?status=all&limit=100", headers=headers) if response.status_code == 200: orders = response.json() # Filter for BTC orders and format them btc_orders = [order for order in orders if "BTC" in order.get("symbol", "")] formatted_orders = [] for order in btc_orders: formatted_orders.append({ "id": order["id"], "symbol": order["symbol"], "side": order["side"], "type": order["type"], "qty": order["qty"], "status": order["status"], "created_at": order["created_at"], "filled_at": order.get("filled_at", "N/A"), "filled_qty": order.get("filled_qty", "0"), "filled_avg_price": order.get("filled_avg_price", "0") }) return formatted_orders else: print(f"Error fetching orders: {response.status_code}, {response.text}") return [] except Exception as e: print(f"Error in fetch_order_history: {e}") return [] # Function to fetch active positions def fetch_active_positions(): try: api_key = os.getenv("ALPACA_API_KEY") api_secret = os.getenv("ALPACA_API_SECRET") headers = { "APCA-API-KEY-ID": api_key, "APCA-API-SECRET-KEY": api_secret } base_url = "https://paper-api.alpaca.markets" # Fetch positions response = requests.get(f"{base_url}/v2/positions", headers=headers) if response.status_code == 200: positions = response.json() # Filter for BTC positions btc_positions = [pos for pos in positions if "BTC" in pos.get("symbol", "")] formatted_positions = [] for pos in btc_positions: # Calculate profit/loss current_price = float(pos.get("current_price", 0)) avg_entry_price = 
float(pos.get("avg_entry_price", 0)) qty = float(pos.get("qty", 0)) profit_loss = (current_price - avg_entry_price) * qty profit_loss_percent = ((current_price / avg_entry_price) - 1) * 100 if avg_entry_price > 0 else 0 formatted_positions.append({ "symbol": pos["symbol"], "qty": pos["qty"], "avg_entry_price": pos["avg_entry_price"], "current_price": pos["current_price"], "profit_loss": round(profit_loss, 2), "profit_loss_percent": round(profit_loss_percent, 2), "market_value": pos["market_value"], "side": pos["side"] }) return formatted_positions else: print(f"Error fetching positions: {response.status_code}, {response.text}") return [] except Exception as e: print(f"Error in fetch_active_positions: {e}") return [] # Function to run technical analysis with just the TA agent def run_ta_agent_only(strategy_text=None): try: tech_strategy = TechnicalAnalysisStrategy() # Use the provided strategy text or the global one strategy_text = strategy_text or strategy_params.get("strategy_text") if not strategy_text: return { "error": "No strategy text provided", "signal": "hold", "confidence": 0, "allocation_percentage": 0, "reasoning": "Please enter a strategy description first" } # Get the indicator data from the tool indicator_data = tech_strategy._run() if "error" in indicator_data: return { "error": indicator_data["error"], "signal": "hold", "confidence": 0, "allocation_percentage": 0, "reasoning": f"Error fetching indicator data: {indicator_data['error']}" } # Use OpenAI to interpret the strategy based on the indicator data signal_data = interpret_strategy_with_llm(indicator_data, strategy_text, strategy_params["max_allocation_percentage"]) # Add timestamp signal_data["timestamp"] = datetime.now().isoformat() # Add to analysis results analysis_results.append(signal_data) # Save to database save_analysis_result(signal_data) return signal_data except Exception as e: print(f"Error running TA agent: {e}") return {"error": str(e)} # New function to interpret strategy with 
LLM def interpret_strategy_with_llm(indicator_data, strategy_text, max_allocation_percentage=50): try: # Create the system prompt for the LLM system_prompt = """ You are a cryptocurrency trading strategy interpreter. Your task is to analyze the provided technical indicators and price data, then interpret the user's strategy to generate a trading signal. You must return a JSON object with the following fields: - signal: "buy", "sell", or "hold" - confidence: Integer between 0-95 (how confident you are in the signal) - allocation_percentage: Integer between 0-{max_allocation} (how much of the portfolio to allocate) - reasoning: String explanation of your decision process Be pragmatic and conservative. Only give buy/sell signals when the conditions are clearly met. Base your decision on the indicator values, not on general market sentiment or news. """ # Prepare price data - might need to be retrieved from indicator_data price = indicator_data.get("price", 0) # Create the user prompt with all the data user_prompt = f""" # Technical Indicators {json.dumps(indicator_data, indent=2)} # Strategy Description {strategy_text} Analyze the above data according to the strategy description and generate a trading signal. Respond only with JSON. Maximum allocation is {max_allocation_percentage}%. 
""" # Make the call to OpenAI response = openai.chat.completions.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": system_prompt.format(max_allocation=max_allocation_percentage)}, {"role": "user", "content": user_prompt} ], temperature=0.2, response_format={"type": "json_object"} ) # Extract and parse the response content response_content = response.choices[0].message.content result = json.loads(response_content) # Ensure the result has the expected fields if not all(k in result for k in ["signal", "confidence", "allocation_percentage", "reasoning"]): missing = [k for k in ["signal", "confidence", "allocation_percentage", "reasoning"] if k not in result] print(f"LLM response missing required fields: {missing}") result = { "signal": result.get("signal", "hold"), "confidence": result.get("confidence", 50), "allocation_percentage": result.get("allocation_percentage", 0), "reasoning": result.get("reasoning", "No reasoning provided.") } # Ensure values are within expected ranges result["confidence"] = max(0, min(95, int(result["confidence"]))) result["allocation_percentage"] = max(0, min(max_allocation_percentage, int(result["allocation_percentage"]))) # Add indicator values to the result result["technical_indicators"] = indicator_data print(f"LLM generated signal: {result['signal']} with confidence {result['confidence']}%") return result except Exception as e: print(f"Error interpreting strategy with LLM: {e}") import traceback traceback.print_exc() return { "signal": "hold", "confidence": 0, "allocation_percentage": 0, "reasoning": f"Error interpreting strategy with LLM: {str(e)}" } # Function to run the full analysis using the crew def run_full_analysis(): try: # Get the strategy text from global parameters strategy_text = strategy_params.get("strategy_text") if not strategy_text: return { "error": "No strategy text provided", "signal": "hold", "confidence": 0, "allocation_percentage": 0, "reasoning": "Please enter a strategy description first" 
} crew = BitcoinAnalysisCrew() result = crew.run_analysis(strategy_text=strategy_text) # Add timestamp result["timestamp"] = datetime.now().isoformat() # Add to analysis results analysis_results.append(result) # Save to database save_analysis_result(result) # Check if order was executed, if so add to orders if "order_execution" in result and isinstance(result["order_execution"], dict): if result["order_execution"].get("success", False): orders_history.append(result["order_execution"]) save_transaction(result["order_execution"]) return result except Exception as e: print(f"Error running crew analysis: {e}") return {"error": str(e)} # Background process function def background_process(): global running # Create a log file for the automated trading session log_file = os.path.join(os.path.dirname(DB_PATH), f"auto_trading_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt") with open(log_file, 'w') as f: f.write(f"Automated trading session started at {datetime.now().isoformat()}\n") f.write(f"Strategy parameters: {json.dumps(strategy_params)}\n\n") error_count = 0 max_errors = 5 while running: try: print(f"Running full crew analysis at {datetime.now().isoformat()} with params: {strategy_params}") # Log to file as well with open(log_file, 'a') as f: f.write(f"\n--- Analysis run at {datetime.now().isoformat()} ---\n") # Check if strategy text is available if not strategy_params.get("strategy_text"): print("No strategy text available for automated analysis. Skipping.") with open(log_file, 'a') as f: f.write("No strategy text available for automated analysis. 
Skipping.\n") time.sleep(60) # Sleep for a minute and check again continue # Run the full crew analysis instead of just TA agent result = run_full_analysis() print(f"Analysis result: {result.get('signal', 'unknown')} with confidence {result.get('confidence', 0)}%") # Log the result with open(log_file, 'a') as f: f.write(f"Signal: {result.get('signal', 'unknown')}, Confidence: {result.get('confidence', 0)}%, Allocation: {result.get('allocation_percentage', 0)}%\n") f.write(f"Reasoning: {result.get('reasoning', 'No reasoning provided')}\n") # Log order execution details if any if "order_execution" in result and result["order_execution"]: order_details = result["order_execution"] if isinstance(order_details, dict): f.write(f"Order executed: {order_details.get('action', 'unknown')} {order_details.get('quantity', 0)} BTC at ${order_details.get('price', 0)}\n") else: f.write(f"Order info: {str(order_details)}\n") # Update the active trades positions = fetch_active_positions() global active_trades active_trades = positions # Log current positions with open(log_file, 'a') as f: f.write("\nCurrent Positions:\n") if positions: for pos in positions: f.write(f" {pos['symbol']}: {pos['qty']} @ ${pos['avg_entry_price']} - P/L: ${pos['profit_loss']} ({pos['profit_loss_percent']}%)\n") else: f.write(" No active positions\n") # Log account info account = fetch_account_info() f.write(f"\nAccount Balance: ${account.get('cash', 'N/A')}, Equity: ${account.get('equity', 'N/A')}\n") # Reset error counter on successful run error_count = 0 # Sleep for the specified timeframe interval interval_seconds = strategy_params["timeframe_minutes"] * 60 print(f"Sleeping for {interval_seconds} seconds ({strategy_params['timeframe_minutes']} minutes)") # Sleep in smaller increments to allow for quicker stopping for _ in range(min(interval_seconds, 3600), 0, -10): # Sleep in 10-second increments if not running: break time.sleep(10) except Exception as e: error_count += 1 error_message = f"Error in 
background process: {str(e)}" print(error_message) import traceback trace = traceback.format_exc() print(trace) # Log error with open(log_file, 'a') as f: f.write(f"\nERROR: {error_message}\n") f.write(trace + "\n") # If too many consecutive errors, pause for a longer time if error_count >= max_errors: print(f"Too many errors ({error_count}). Pausing for 30 minutes.") with open(log_file, 'a') as f: f.write(f"Too many errors ({error_count}). Pausing for 30 minutes.\n") # Sleep but still check for stop signal for _ in range(1800, 0, -10): if not running: break time.sleep(10) # Reset error count after pause error_count = 0 else: # Short pause before retrying time.sleep(60) # Function to start the background process def start_background_process(): global running, background_thread if not running: running = True background_thread = threading.Thread(target=background_process) background_thread.daemon = True background_thread.start() return f"Background analysis started. Running full crew analysis every {strategy_params['timeframe_minutes']} minutes." else: return "Background analysis is already running." # Function to stop the background process def stop_background_process(): global running if running: running = False return "Background analysis stopped." else: return "Background analysis is not running." 
# Function to update strategy parameters def update_strategy(timeframe, max_allocation): global strategy_params try: # Convert values to appropriate types timeframe = int(timeframe) max_allocation = int(max_allocation) # Update the global dictionary strategy_params["timeframe_minutes"] = timeframe strategy_params["max_allocation_percentage"] = max_allocation print(f"Updated strategy parameters: {strategy_params}") return f"Strategy parameters updated: Timeframe: {timeframe} minutes, Max allocation: {max_allocation}%" except Exception as e: return f"Error updating strategy parameters: {e}" # Function to update custom strategy text def update_strategy_text(strategy_text): global strategy_params try: strategy_params["strategy_text"] = strategy_text print(f"Updated strategy text: {strategy_text[:100]}...") return "Strategy text updated successfully" except Exception as e: return f"Error updating strategy text: {e}" # Function to save current strategy def save_current_strategy(name, description): try: # Get current strategy parameters parameters = strategy_params.copy() strategy_text = parameters.get("strategy_text", "") # If strategy text is empty, provide a default description if not strategy_text: strategy_text = f"Default RSI ({parameters['rsi_lower_threshold']}-{parameters['rsi_upper_threshold']}) and Bollinger Bands strategy" # Save to database success, message = save_strategy(name, description, strategy_text, parameters) if success: return f"Strategy '{name}' saved successfully" else: return message except Exception as e: return f"Error saving strategy: {e}" # Function to run a manual trade def execute_trade(action, symbol, allocation_pct): try: order_tool = AlpacaCryptoOrderTool() result = order_tool._run( action=action, symbol=symbol, allocation_percentage=int(allocation_pct) ) if result.get("success", False): orders_history.append(result) # Save to database save_transaction(result) return f"Trade executed: {action.upper()} {symbol} with {allocation_pct}% 
allocation" else: return f"Trade failed: {result.get('error', 'Unknown error')}" except Exception as e: return f"Error executing trade: {e}" # Function to get account summary for display def get_account_summary(): account = fetch_account_info() positions = fetch_active_positions() cash = account.get("cash", "0") equity = account.get("equity", "0") total_positions = len(positions) total_value = sum(float(pos.get("market_value", 0)) for pos in positions) total_pl = sum(pos.get("profit_loss", 0) for pos in positions) return f""" **Account Summary** - Cash: ${cash} - Equity: ${equity} - Active Positions: {total_positions} - Positions Value: ${total_value:.2f} - Total P/L: ${total_pl:.2f} """ # Function to format analysis results for display def format_analysis_results(): # First try to get from database db_results = get_recent_analysis_results(1) if db_results: latest = db_results[0] timestamp = latest.get("timestamp", datetime.now().isoformat()) signal = latest.get("signal", "unknown").upper() confidence = latest.get("confidence", 0) allocation = latest.get("allocation_percentage", 0) reasoning = latest.get("reasoning", "No reasoning provided.") strategy_name = latest.get("strategy_name", "Default Strategy") return f""" **Latest Analysis ({timestamp})** Strategy: {strategy_name} Signal: {signal} Confidence: {confidence}% Allocation: {allocation}% Reasoning: {reasoning} """ # Fallback to memory if database is empty if not analysis_results: return "No analysis results available." 
latest = analysis_results[-1] timestamp = latest.get("timestamp", datetime.now().isoformat()) signal = latest.get("signal", "unknown").upper() confidence = latest.get("confidence", 0) allocation = latest.get("allocation_percentage", 0) reasoning = latest.get("reasoning", "No reasoning provided.") return f""" **Latest Analysis ({timestamp})** Signal: {signal} Confidence: {confidence}% Allocation: {allocation}% Reasoning: {reasoning} """ # Function to format active positions for display def format_active_positions(): positions = fetch_active_positions() if not positions: return "No active positions." result = "## Active Positions\n\n" for pos in positions: result += f""" **{pos['symbol']}** Quantity: {pos['qty']} BTC Entry: ${pos['avg_entry_price']} Current: ${pos['current_price']} P/L: ${pos['profit_loss']} ({pos['profit_loss_percent']}%) Value: ${pos['market_value']} """ return result # Function to format order history for display def format_order_history(): # Try to get from database first db_transactions = get_recent_transactions(10) if db_transactions: result = "## Recent Transactions\n\n" for tx in db_transactions: result += f""" **{tx['symbol']} {tx['action'].upper()}** Quantity: {tx['quantity']} Price: ${tx['price']} Status: {tx['status']} Allocation: {tx['allocation_percentage']}% Date: {tx['timestamp']} Strategy: {tx['strategy_name']} """ return result # Fall back to API if database is empty orders = fetch_order_history() if not orders: return "No order history." 
result = "## Order History (Last 10)\n\n" for order in orders[:10]: result += f""" **{order['symbol']} {order['side'].upper()}** Quantity: {order['qty']} Type: {order['type']} Status: {order['status']} Created: {order['created_at']} Filled: {order.get('filled_at', 'N/A')} Filled Price: ${order.get('filled_avg_price', 'N/A')} """ return result # Function to get available indicators for display def get_available_indicators(): indicators = IndicatorCalculator.get_available_indicators() result = "## Available Indicators\n\n" for name, description in indicators.items(): result += f"**{name}**: {description}\n\n" return result # Function to format detailed analysis results for display def format_detailed_analysis_results(result=None): if result is None: # First try to get from database db_results = get_recent_analysis_results(1) if db_results: result = db_results[0] elif analysis_results: result = analysis_results[-1] else: return "No analysis results available." # Basic information timestamp = result.get("timestamp", datetime.now().isoformat()) signal = result.get("signal", "unknown").upper() confidence = result.get("confidence", 0) allocation = result.get("allocation_percentage", 0) reasoning = result.get("reasoning", "No reasoning provided.") strategy_name = result.get("strategy_name", "Default Strategy") # Detailed sections sections = [] # Add header sections.append(f"## Analysis Results ({timestamp})") sections.append(f"### Strategy: {strategy_name}") sections.append(f"### Signal: {signal} | Confidence: {confidence}% | Allocation: {allocation}%") # Technical indicator values (if available) if "technical_indicators" in result: sections.append("### Technical Indicators") indicators = result["technical_indicators"] indicators_text = [] # Display price first if "price" in indicators: indicators_text.append(f"- **Price**: ${indicators['price']:.2f}") # Display RSI values rsi_indicators = {k: v for k, v in indicators.items() if "rsi" in k.lower() and v is not None} if 
rsi_indicators: indicators_text.append("- **RSI**:") for k, v in rsi_indicators.items(): indicators_text.append(f" - {k}: {v:.2f}") # Display Bollinger Bands bb_indicators = {k: v for k, v in indicators.items() if "bb_" in k.lower() and v is not None} if bb_indicators: indicators_text.append("- **Bollinger Bands**:") for k, v in bb_indicators.items(): indicators_text.append(f" - {k}: {v:.2f}") # Display MACD macd_indicators = {k: v for k, v in indicators.items() if "macd" in k.lower() and v is not None} if macd_indicators: indicators_text.append("- **MACD**:") for k, v in macd_indicators.items(): indicators_text.append(f" - {k}: {v:.2f}") # Display other indicators other_indicators = {k: v for k, v in indicators.items() if not any(x in k.lower() for x in ["rsi", "bb_", "macd", "price"]) and v is not None} if other_indicators: indicators_text.append("- **Other Indicators**:") for k, v in other_indicators.items(): indicators_text.append(f" - {k}: {v:.2f}") sections.append("\n".join(indicators_text)) # Add technical analysis section if "technical_analysis" in result: sections.append("### Technical Analysis") sections.append(f"```\n{result['technical_analysis']}\n```") # Add initial analysis section if "initial_analysis" in result: sections.append("### Market Context Analysis") sections.append(f"```\n{result['initial_analysis']}\n```") # Add reflection analysis section if "reflection_analysis" in result: sections.append("### Sentiment Analysis") sections.append(f"```\n{result['reflection_analysis']}\n```") # Add tool errors section if present if "tool_error_summary" in result or "tool_error_assessment" in result: sections.append("### Data Limitations") if "tool_error_assessment" in result: sections.append(f"```\n{result['tool_error_assessment']}\n```") if "tool_error_summary" in result: sections.append(f"Tool Errors: {result['tool_error_summary']}") # Add detailed reasoning sections.append("### Detailed Reasoning") sections.append(f"```\n{reasoning}\n```") # Add market 
outlook if available if "market_outlook" in result: sections.append("### Market Outlook") sections.append(f"```\n{result['market_outlook']}\n```") # Add risk assessment if available if "risk_assessment" in result: sections.append("### Risk Assessment") sections.append(f"```\n{result['risk_assessment']}\n```") # Add order execution details if available if "order_execution" in result: sections.append("### Trade Execution") if isinstance(result["order_execution"], dict): order = result["order_execution"] order_details = ["- **Status**: Success"] for k, v in order.items(): if k != "success": order_details.append(f"- **{k.replace('_', ' ').title()}**: {v}") sections.append("\n".join(order_details)) else: sections.append(str(result["order_execution"])) return "\n\n".join(sections) # Create the Gradio interface with gr.Blocks(title="CrypticAI - Bitcoin Trading Dashboard") as app: gr.Markdown("# CrypticAI - Bitcoin Trading Dashboard") with gr.Tabs(): # Strategy Tab with gr.TabItem("Strategy Configuration"): gr.Markdown("## Strategy Parameters") with gr.Row(): with gr.Column(): timeframe = gr.Slider(minimum=1, maximum=240, value=strategy_params["timeframe_minutes"], step=1, label="Timeframe (minutes)") with gr.Column(): max_allocation = gr.Slider(minimum=5, maximum=100, value=strategy_params["max_allocation_percentage"], step=5, label="Maximum Allocation (%)") strategy_update_btn = gr.Button("Update Strategy Parameters") strategy_message = gr.Textbox(label="Strategy Update Status") strategy_update_btn.click(update_strategy, inputs=[timeframe, max_allocation], outputs=strategy_message) gr.Markdown("---") gr.Markdown("## Strategy Management") # Create a tabbed interface for strategy management with gr.Tabs(): # Tab 1: Select Existing Strategy with gr.TabItem("Select Strategy"): gr.Markdown("### Available Strategies") # Display saved strategies as a table saved_strategies = gr.Dataframe( headers=["ID", "Name", "Description"], datatype=["number", "str", "str"], label="Available 
Strategies" ) refresh_strategies_btn = gr.Button("Refresh Strategies") def get_strategies_as_df(): strategies = get_saved_strategies() if not strategies: return [[0, "No strategies found", "Create a new strategy first"]] return [[s["id"], s["name"], s["description"]] for s in strategies] refresh_strategies_btn.click(get_strategies_as_df, outputs=saved_strategies) # Create a dropdown for strategy selection instead of number input gr.Markdown("### Select a Strategy") def get_strategy_dropdown_choices(): strategies = get_saved_strategies() if not strategies: return [("No strategies available", 0)] return [(f"{s['id']} - {s['name']}", s['id']) for s in strategies] strategy_dropdown = gr.Dropdown( choices=get_strategy_dropdown_choices(), label="Choose Strategy", value=None ) # Connect refresh button to also update dropdown def refresh_all_strategy_displays(): df = get_strategies_as_df() choices = get_strategy_dropdown_choices() return df, choices refresh_strategies_btn.click( refresh_all_strategy_displays, outputs=[saved_strategies, strategy_dropdown] ) # Load, Delete buttons with gr.Row(): load_btn = gr.Button("Load Selected Strategy", variant="primary") delete_btn = gr.Button("Delete Selected Strategy", variant="stop") # Add a status message strategy_action_message = gr.Textbox(label="Status", interactive=False) # Add strategy details display section gr.Markdown("### Selected Strategy Details") selected_strategy_view = gr.Markdown("No strategy selected") # Update load strategy function to work with dropdown def load_selected_strategy(strategy_dropdown_value): if not strategy_dropdown_value: return "Please select a strategy first", "No strategy selected" try: strategy_id = int(strategy_dropdown_value) strategy = get_strategy_by_id(strategy_id) if not strategy: return "Strategy not found", "Strategy not found" # Update global parameters global strategy_params strategy_params["strategy_text"] = strategy["strategy_text"] for key, value in strategy["parameters"].items(): 
if key in strategy_params: strategy_params[key] = value # Create a markdown view of the strategy - improved formatting view_text = f""" ## {strategy['name']} **Description**: {strategy['description']} **Strategy Logic**: ``` {strategy['strategy_text']} ``` **Parameters**: - Timeframe: {strategy["parameters"].get("timeframe_minutes", "N/A")} minutes - Max Allocation: {strategy["parameters"].get("max_allocation_percentage", "N/A")}% """ # Return success message and view return f"Strategy '{strategy['name']}' loaded successfully", view_text except Exception as e: return f"Error loading strategy: {e}", "Error loading strategy" # Delete strategy function updated for dropdown def delete_selected_strategy(strategy_dropdown_value): if not strategy_dropdown_value: return "Please select a strategy first", get_strategy_dropdown_choices() try: strategy_id = int(strategy_dropdown_value) conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Check if strategy exists cursor.execute("SELECT name FROM strategies WHERE id = ?", (strategy_id,)) result = cursor.fetchone() if not result: conn.close() return "Strategy not found", get_strategy_dropdown_choices() strategy_name = result[0] # Delete the strategy cursor.execute("DELETE FROM strategies WHERE id = ?", (strategy_id,)) conn.commit() conn.close() # Get updated choices new_choices = get_strategy_dropdown_choices() return f"Strategy '{strategy_name}' deleted successfully", new_choices except Exception as e: return f"Error deleting strategy: {e}", get_strategy_dropdown_choices() # Connect buttons to new functions load_btn.click( load_selected_strategy, inputs=[strategy_dropdown], outputs=[strategy_action_message, selected_strategy_view] ) delete_btn.click( delete_selected_strategy, inputs=[strategy_dropdown], outputs=[strategy_action_message, strategy_dropdown] ) # Tab 2: Create/Edit Strategy with gr.TabItem("Create/Edit Strategy"): gr.Markdown("### Strategy Editor") # Strategy editor section strategy_name = 
gr.Textbox(label="Strategy Name", placeholder="Enter a name for your strategy") strategy_description = gr.Textbox(label="Strategy Description", placeholder="Enter a brief description of what your strategy does") # Add a checkbox to indicate if editing existing strategy is_editing = gr.Checkbox(label="Edit Existing Strategy", value=False) edit_id = gr.Number(label="Strategy ID to Edit", value=0, visible=False) # Add a dropdown to select strategy for editing edit_strategy_dropdown = gr.Dropdown( choices=get_strategy_dropdown_choices(), label="Choose Strategy to Edit", visible=False ) gr.Markdown("### Strategy Logic") gr.Markdown("""Write your trading strategy using the available indicators. Be specific about entry and exit conditions. **Example Strategy:** ``` Buy when RSI is below 30 and the price is near the lower Bollinger Band (position < 0.2). Sell when RSI is above 70 and the price is near the upper Bollinger Band (position > 0.8). Increase confidence if the ADX shows a strong trend (> 25) in the same direction. Use 40% of available capital for trades if confidence is high (> 70), otherwise use 20%. 
``` The strategy will be interpreted by AI to generate trading signals based on current market conditions.""") # Available indicators with gr.Accordion("View Available Indicators", open=False): indicators_info = gr.Markdown(get_available_indicators()) # Strategy text editor - moved up before being referenced strategy_text = gr.TextArea( label="Strategy Logic", placeholder="Enter your trading strategy logic here...", value=strategy_params.get("strategy_text", ""), lines=10, max_lines=20 ) # Connect checkbox to show/hide controls def toggle_edit_mode(is_editing): return { edit_strategy_dropdown: gr.update(visible=is_editing), } is_editing.change( toggle_edit_mode, inputs=[is_editing], outputs=[edit_strategy_dropdown] ) # Function to load strategy into editor def load_strategy_to_editor(strategy_dropdown_value): if not strategy_dropdown_value: return "Please select a strategy to edit", "", "", "", 0 try: strategy_id = int(strategy_dropdown_value) strategy = get_strategy_by_id(strategy_id) if not strategy: return "Strategy not found", "", "", "", 0 return "Strategy loaded for editing", strategy["name"], strategy["description"], strategy["strategy_text"], strategy_id except Exception as e: return f"Error loading strategy: {e}", "", "", "", 0 # Connect edit dropdown to load function edit_strategy_dropdown.change( load_strategy_to_editor, inputs=[edit_strategy_dropdown], outputs=[strategy_action_message, strategy_name, strategy_description, strategy_text, edit_id] ) # Save button and status save_btn = gr.Button("Save Strategy", variant="primary") save_status = gr.Textbox(label="Save Status", interactive=False) # Enhanced save function def save_strategy_enhanced(is_editing, edit_id, name, description, text): try: if not name or not text: return "Error: Strategy name and logic are required", get_strategy_dropdown_choices(), get_strategy_dropdown_choices() # Get current strategy parameters parameters = strategy_params.copy() parameters["strategy_text"] = text conn = 
sqlite3.connect(DB_PATH) cursor = conn.cursor() strategy_id = int(edit_id) if is_editing and edit_id > 0 else 0 if is_editing and strategy_id > 0: # Update existing strategy cursor.execute( "UPDATE strategies SET name = ?, description = ?, strategy_text = ?, parameters = ? WHERE id = ?", (name, description, text, json.dumps(parameters), strategy_id) ) message = f"Strategy '{name}' updated successfully" else: # Create new strategy cursor.execute( "INSERT INTO strategies (name, description, strategy_text, parameters) VALUES (?, ?, ?, ?)", (name, description, text, json.dumps(parameters)) ) message = f"Strategy '{name}' saved successfully" conn.commit() conn.close() # Update dropdown choices new_choices = get_strategy_dropdown_choices() return message, new_choices, new_choices except Exception as e: return f"Error saving strategy: {e}", get_strategy_dropdown_choices(), get_strategy_dropdown_choices() # Connect save button save_btn.click( save_strategy_enhanced, inputs=[is_editing, edit_id, strategy_name, strategy_description, strategy_text], outputs=[save_status, strategy_dropdown, edit_strategy_dropdown] ) # Clear form button clear_btn = gr.Button("Clear Form") def clear_form(): return "", "", "", False, 0 clear_btn.click( clear_form, outputs=[strategy_name, strategy_description, strategy_text, is_editing, edit_id] ) gr.Markdown("---") gr.Markdown("## Execute Analysis") gr.Markdown("First select a strategy above, then run the analysis pipeline.") # Check if strategy is loaded - improved formatting def check_strategy_loaded(): if not strategy_params.get("strategy_text"): return ( "⚠️ No strategy selected! 
Please load a strategy first.", gr.update(interactive=False) ) else: # Format the strategy preview in a more structured way strategy_text = strategy_params.get("strategy_text", "") # Properly format the strategy preview strategy_preview = f""" ## Current Strategy ``` {strategy_text} ``` **Parameters**: - Timeframe: {strategy_params.get('timeframe_minutes', 'N/A')} minutes - Max Allocation: {strategy_params.get('max_allocation_percentage', 'N/A')}% """ return ( strategy_preview, gr.update(interactive=True) ) # Display current strategy and run button current_strategy_info = gr.Markdown("No strategy loaded") run_pipeline_btn = gr.Button("Run Analysis Pipeline", variant="primary", interactive=False) # Add a refresh button to check if strategy is loaded check_strategy_btn = gr.Button("Check Current Strategy") check_strategy_btn.click( check_strategy_loaded, outputs=[current_strategy_info, run_pipeline_btn] ) # Run button should automatically check if strategy is loaded analysis_result = gr.Markdown("Analysis results will appear here...") # Function to run full analysis and format detailed output def run_full_analysis_with_check(): try: if not strategy_params.get("strategy_text"): return "Error: No strategy selected. Please load a strategy before running analysis." 
result = run_full_analysis() if "error" in result: return f"Analysis failed: {result['error']}" return format_detailed_analysis_results(result) except Exception as e: return f"Error running analysis: {str(e)}" # Connect the run button run_pipeline_btn.click( run_full_analysis_with_check, outputs=[analysis_result] ) # Trading Tab with gr.TabItem("Trading"): gr.Markdown("## Account Summary") account_summary = gr.Markdown("") with gr.Row(): refresh_account_btn = gr.Button("Refresh Account Info") reset_portfolio_btn = gr.Button("Reset Portfolio") reset_portfolio_message = gr.Textbox(label="Reset Portfolio Status") refresh_account_btn.click(get_account_summary, outputs=account_summary) reset_portfolio_btn.click(reset_portfolio, outputs=reset_portfolio_message) gr.Markdown("## Manual Trading") with gr.Row(): action = gr.Dropdown(["buy", "sell", "check"], label="Action") symbol = gr.Dropdown(["BTC/USD", "ETH/USD"], label="Symbol", value="BTC/USD") allocation = gr.Slider(minimum=1, maximum=100, value=10, label="Allocation Percentage") execute_btn = gr.Button("Execute Trade") trade_result = gr.Textbox(label="Trade Result") execute_btn.click(execute_trade, inputs=[action, symbol, allocation], outputs=trade_result) # Monitoring Tab with gr.TabItem("Monitoring"): with gr.Tabs(): with gr.TabItem("Analysis Results"): gr.Markdown("## Latest Analysis Results") analysis_history = gr.Dataframe( headers=["ID", "Timestamp", "Strategy", "Signal", "Confidence", "Allocation"], datatype=["number", "str", "str", "str", "number", "number"], label="Recent Analysis Results" ) def get_analysis_history(): results = get_recent_analysis_results(10) return [ [r["id"], r["timestamp"], r["strategy_name"], r["signal"].upper(), r["confidence"], r["allocation_percentage"]] for r in results ] refresh_analysis_btn = gr.Button("Refresh Results") refresh_analysis_btn.click(get_analysis_history, outputs=analysis_history) selected_analysis_id = gr.Number(label="Result ID to View", value=0) view_analysis_btn 
= gr.Button("View Detailed Analysis") detailed_analysis = gr.Markdown("Select an analysis result and click 'View Detailed Analysis'") def view_detailed_analysis(analysis_id): if analysis_id <= 0: return "Please select a valid analysis result ID" try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Get the full analysis result cursor.execute(""" SELECT ar.*, s.name as strategy_name FROM analysis_results ar LEFT JOIN strategies s ON ar.strategy_id = s.id WHERE ar.id = ? """, (int(analysis_id),)) row = cursor.fetchone() conn.close() if not row: return "Analysis result not found" # Convert row to dict column_names = [description[0] for description in cursor.description] result = {column_names[i]: row[i] for i in range(len(column_names))} # Parse JSON fields if "indicator_values" in result and result["indicator_values"]: try: result["technical_indicators"] = json.loads(result["indicator_values"]) except json.JSONDecodeError as e: result["technical_indicators"] = {"error": f"Failed to parse indicators: {str(e)}"} # Format the detailed result return format_detailed_analysis_results(result) except Exception as e: import traceback return f"Error retrieving analysis details: {str(e)}\n{traceback.format_exc()}" view_analysis_btn.click(view_detailed_analysis, inputs=[selected_analysis_id], outputs=detailed_analysis) with gr.TabItem("Positions"): gr.Markdown("## Active Positions") positions_display = gr.Markdown("") refresh_positions_btn = gr.Button("Refresh Positions") refresh_positions_btn.click(format_active_positions, outputs=positions_display) with gr.TabItem("Transactions"): gr.Markdown("## Transaction History") transactions_history = gr.Dataframe( headers=["ID", "Date", "Symbol", "Action", "Quantity", "Price", "Status"], datatype=["number", "str", "str", "str", "number", "number", "str"], label="Recent Transactions" ) def get_transactions_history(): txs = get_recent_transactions(20) return [ [tx["id"], tx["timestamp"], tx["symbol"], tx["action"].upper(), 
float(tx["quantity"]), float(tx["price"]), tx["status"]] for tx in txs ] refresh_tx_btn = gr.Button("Refresh Transactions") refresh_tx_btn.click(get_transactions_history, outputs=transactions_history) orders_display = gr.Markdown("") refresh_orders_btn = gr.Button("Show Full Transaction Details") refresh_orders_btn.click(format_order_history, outputs=orders_display) # Automated Analysis Tab with gr.TabItem("Automated Analysis"): gr.Markdown("## Background Analysis") gr.Markdown(""" ### Full Crew Automated Analysis The automated analysis will run the complete Bitcoin Analysis Crew at intervals defined by the timeframe setting. This includes: - Technical analysis - Market sentiment analysis - Order execution **All trades and analysis results will be logged** to a file in the data directory for later review. """) gr.Markdown(f"Current timeframe: **{strategy_params['timeframe_minutes']} minutes**") with gr.Row(): start_btn = gr.Button("Start Automated Analysis", variant="primary") stop_btn = gr.Button("Stop Automated Analysis", variant="stop") auto_status = gr.Textbox(label="Automation Status") start_btn.click(start_background_process, outputs=auto_status) stop_btn.click(stop_background_process, outputs=auto_status) # Add log path information log_path_info = gr.Markdown(f""" **Log File Location** Trading logs will be saved to: ``` {os.path.dirname(DB_PATH)} ``` A new log file is created for each trading session with format: `auto_trading_log_YYYYMMDD_HHMMSS.txt` These logs contain: - All analysis results - Trade executions - Account balances - Profit/loss tracking - Errors and warnings """) auto_result = gr.Markdown("Automated analysis results will appear here...") # Refresh button with detailed results auto_refresh = gr.Button("Refresh Latest Results") # Update to use detailed formatting auto_refresh.click(format_detailed_analysis_results, outputs=auto_result) # Add warning about strategy selection gr.Markdown(""" **IMPORTANT**: Make sure you have loaded a strategy 
from the Strategy Configuration tab before starting automated analysis. The system will use the currently loaded strategy for all automated runs. """) # Add instructions for running for days gr.Markdown(""" ### Running for Extended Periods To run this application continuously for days: 1. Start the automated analysis with your chosen strategy and timeframe 2. Keep this application running (do not close the browser tab or terminal) 3. If running on a remote server, use tools like `screen` or `tmux` to keep the session alive **Example terminal command for persistent session**: ``` # Start a new screen session screen -S cryptic_trading # Then run your application in that session python src/ui/app.py # You can detach from the session with Ctrl+A, D and reconnect later with: screen -r cryptic_trading ``` """) # Launch the app if __name__ == "__main__": # Ensure directories exist os.makedirs("results", exist_ok=True) # Run initial data fetching fetch_account_info() fetch_order_history() fetch_active_positions() # Launch the app app.launch(share=False)