# cryptic/src/ui/app.py
# Uploaded by: vlbandara
# Commit message: "Upload folder using huggingface_hub"
# Commit: eb27803 (verified)
import os
import sys
import time
import json
import gradio as gr
import threading
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import requests
import sqlite3
from typing import Dict, Any, List, Optional
import openai
# Add project root to path for imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
# Import necessary modules
from src.crypto_analysis.tools.technical_tools import TechnicalAnalysisStrategy, IndicatorCalculator
from src.crypto_analysis.tools.order_tools import AlpacaCryptoOrderTool
from src.crypto_analysis.tools.bitcoin_tools import YahooBitcoinDataTool
from src.crypto_analysis.tools.yahoo_tools import YahooCryptoMarketTool
from src.crypto_analysis.crew import BitcoinAnalysisCrew
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# Initialize the database
# Absolute location of the SQLite file, resolved relative to this module so it
# does not depend on the current working directory.
DB_PATH = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'cryptic.db')
)
# The data directory must exist before sqlite3 can create the database file.
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
def init_database():
    """Create the SQLite tables used by the dashboard if they do not exist.

    Opens the database at ``DB_PATH`` and issues ``CREATE TABLE IF NOT EXISTS``
    for ``strategies``, ``transactions`` and ``analysis_results``, so running
    it against an already-initialized database leaves existing tables alone.

    Raises:
        sqlite3.Error: if the database cannot be opened or a statement fails.
    """
    schema_statements = (
        # Saved strategies
        '''
        CREATE TABLE IF NOT EXISTS strategies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            strategy_text TEXT NOT NULL,
            parameters TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Executed transactions
        '''
        CREATE TABLE IF NOT EXISTS transactions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            action TEXT NOT NULL,
            quantity REAL NOT NULL,
            price REAL NOT NULL,
            status TEXT NOT NULL,
            allocation_percentage INTEGER,
            order_id TEXT,
            strategy_id INTEGER,
            reasoning TEXT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (strategy_id) REFERENCES strategies (id)
        )
        ''',
        # Analysis results
        '''
        CREATE TABLE IF NOT EXISTS analysis_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            signal TEXT NOT NULL,
            confidence INTEGER,
            allocation_percentage INTEGER,
            reasoning TEXT,
            indicator_values TEXT,
            strategy_id INTEGER,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (strategy_id) REFERENCES strategies (id)
        )
        ''',
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
# Initialize the database when module is loaded
init_database()
# Global variables for strategy parameters
# Read by the analysis functions and the background loop; changed by
# update_strategy() and update_strategy_text().
strategy_params = {
    "timeframe_minutes": 60,  # interval between automated analysis runs
    "max_allocation_percentage": 50,  # upper bound applied to the LLM's allocation
    "strategy_text": None  # None until the user enters or loads a strategy
}
# Store analysis results and orders
# In-memory history for this process; the analysis and trade functions append
# to the first two lists, and background_process() reassigns active_trades.
analysis_results = []
orders_history = []
active_trades = []
# Flag to control the background thread
# background_process() loops while `running` is True; the start/stop
# functions toggle it.
running = False
background_thread = None
# Function to get available strategies from the database
def get_saved_strategies():
    """Return every saved strategy as a list of dicts.

    Returns:
        A list of ``{"id", "name", "description"}`` dicts, one per row of the
        ``strategies`` table (empty when the table has no rows).

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, name, description FROM strategies")
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    return [{"id": row[0], "name": row[1], "description": row[2]} for row in rows]
# Function to get a specific strategy by ID
def get_strategy_by_id(strategy_id):
    """Look up one saved strategy by its primary key.

    Args:
        strategy_id: Value matched against ``strategies.id``.

    Returns:
        A dict with ``id``, ``name``, ``description``, ``strategy_text`` and
        ``parameters`` (decoded from its stored JSON text), or ``None`` when
        no row matches.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
        json.JSONDecodeError: if the stored ``parameters`` text is not valid JSON.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, name, description, strategy_text, parameters FROM strategies WHERE id = ?",
            (strategy_id,),
        )
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    if not row:
        return None
    return {
        "id": row[0],
        "name": row[1],
        "description": row[2],
        "strategy_text": row[3],
        "parameters": json.loads(row[4]),
    }
# Function to save a strategy to the database
def save_strategy(name, description, strategy_text, parameters):
    """Create the strategy stored under ``name``, or update it if it exists.

    Args:
        name: Strategy name; the ``strategies.name`` column is UNIQUE, so an
            existing row with this name is updated instead of duplicated.
        description: Free-text description stored in ``description``.
        strategy_text: Strategy text stored in ``strategy_text``.
        parameters: JSON-serializable object stored as text in ``parameters``.

    Returns:
        ``(True, "Strategy saved successfully")`` on success, otherwise
        ``(False, "Error saving strategy: <details>")``. Failures are reported
        through the return value rather than raised.
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        parameters_json = json.dumps(parameters)
        # Check if strategy name already exists
        cursor.execute("SELECT id FROM strategies WHERE name = ?", (name,))
        if cursor.fetchone():
            cursor.execute(
                "UPDATE strategies SET description = ?, strategy_text = ?, parameters = ? WHERE name = ?",
                (description, strategy_text, parameters_json, name)
            )
        else:
            # Bind `description` to the description column; the previous code
            # bound strategy_text twice and silently dropped the description.
            cursor.execute(
                "INSERT INTO strategies (name, description, strategy_text, parameters) VALUES (?, ?, ?, ?)",
                (name, description, strategy_text, parameters_json)
            )
        conn.commit()
        return True, "Strategy saved successfully"
    except Exception as e:
        return False, f"Error saving strategy: {str(e)}"
    finally:
        # Close the connection on the error path too.
        if conn is not None:
            conn.close()
# Function to save analysis result to the database
def save_analysis_result(result, strategy_id=None):
    """Persist one analysis result in the ``analysis_results`` table.

    Args:
        result: Mapping read with ``.get`` for ``signal``, ``confidence``,
            ``allocation_percentage``, ``reasoning`` and
            ``technical_indicators`` (the last is stored as JSON text).
        strategy_id: Optional ``strategies.id`` the result belongs to.

    Returns:
        ``True`` when the row was committed, ``False`` on any error (the error
        is printed, not raised).
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        row = (
            result.get("signal", "unknown"),
            result.get("confidence", 0),
            result.get("allocation_percentage", 0),
            result.get("reasoning", ""),
            # Indicators are stored as a JSON string.
            json.dumps(result.get("technical_indicators", {})),
            strategy_id,
        )
        cursor.execute(
            "INSERT INTO analysis_results (signal, confidence, allocation_percentage, reasoning, indicator_values, strategy_id) VALUES (?, ?, ?, ?, ?, ?)",
            row
        )
        conn.commit()
        return True
    except Exception as e:
        print(f"Error saving analysis result: {e}")
        return False
    finally:
        # Close the connection on the error path too.
        if conn is not None:
            conn.close()
# Function to save transaction to the database
def save_transaction(order_data, strategy_id=None):
    """Persist one executed order in the ``transactions`` table.

    Args:
        order_data: Mapping read with ``.get`` for ``symbol``, ``action``,
            ``quantity``, ``price``, ``status``, ``allocation_percentage``,
            ``order_id`` and ``reasoning``; missing keys fall back to defaults.
        strategy_id: Optional ``strategies.id`` the order belongs to.

    Returns:
        ``True`` when the row was committed, ``False`` on any error (the error
        is printed, not raised).
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        row = (
            order_data.get("symbol", "BTC/USD"),
            order_data.get("action", "unknown"),
            order_data.get("quantity", 0),
            order_data.get("price", 0),
            order_data.get("status", "unknown"),
            order_data.get("allocation_percentage", 0),
            order_data.get("order_id", ""),
            strategy_id,
            order_data.get("reasoning", ""),
        )
        cursor.execute(
            "INSERT INTO transactions (symbol, action, quantity, price, status, allocation_percentage, order_id, strategy_id, reasoning) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            row
        )
        conn.commit()
        return True
    except Exception as e:
        print(f"Error saving transaction: {e}")
        return False
    finally:
        # Close the connection on the error path too.
        if conn is not None:
            conn.close()
# Function to get recent analysis results from the database
def get_recent_analysis_results(limit=10):
    """Return the most recent analysis results, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``id``, ``signal``, ``confidence``,
        ``allocation_percentage``, ``reasoning``, ``timestamp`` and
        ``strategy_name``. ``strategy_name`` is ``"Default Strategy"`` when
        the result has no matching strategy row.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    columns = ("id", "signal", "confidence", "allocation_percentage",
               "reasoning", "timestamp", "strategy_name")
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT ar.id, ar.signal, ar.confidence, ar.allocation_percentage, ar.reasoning,
            ar.timestamp, s.name as strategy_name
            FROM analysis_results ar
            LEFT JOIN strategies s ON ar.strategy_id = s.id
            ORDER BY ar.timestamp DESC
            LIMIT ?
        """, (limit,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    results = []
    for row in rows:
        record = dict(zip(columns, row))
        # The LEFT JOIN yields NULL when no strategy is linked.
        record["strategy_name"] = record["strategy_name"] or "Default Strategy"
        results.append(record)
    return results
# Function to get recent transactions from the database
def get_recent_transactions(limit=10):
    """Return the most recent stored transactions, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``id``, ``symbol``, ``action``, ``quantity``,
        ``price``, ``status``, ``allocation_percentage``, ``timestamp`` and
        ``strategy_name``. ``strategy_name`` is ``"Default Strategy"`` when
        the transaction has no matching strategy row.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    columns = ("id", "symbol", "action", "quantity", "price", "status",
               "allocation_percentage", "timestamp", "strategy_name")
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT t.id, t.symbol, t.action, t.quantity, t.price, t.status,
            t.allocation_percentage, t.timestamp, s.name as strategy_name
            FROM transactions t
            LEFT JOIN strategies s ON t.strategy_id = s.id
            ORDER BY t.timestamp DESC
            LIMIT ?
        """, (limit,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    transactions = []
    for row in rows:
        record = dict(zip(columns, row))
        # The LEFT JOIN yields NULL when no strategy is linked.
        record["strategy_name"] = record["strategy_name"] or "Default Strategy"
        transactions.append(record)
    return transactions
# Function to fetch account information
def fetch_account_info():
    """Return the account data reported by ``AlpacaCryptoOrderTool._check_account()``.

    On any failure the error is printed and a placeholder dict with ``error``,
    ``cash`` and ``equity`` keys is returned instead of raising.
    """
    try:
        return AlpacaCryptoOrderTool()._check_account()
    except Exception as exc:
        print(f"Error fetching account info: {exc}")
        return {"error": str(exc), "cash": 0, "equity": 0}
# Function to reset portfolio (close all positions)
def reset_portfolio():
    """Cancel all open orders, then close all positions, on the Alpaca paper account.

    Credentials are read from the ``ALPACA_API_KEY`` and ``ALPACA_API_SECRET``
    environment variables.

    Returns:
        A human-readable status string: a success message, or an error message
        naming the step that failed. Exceptions are reported in the returned
        string rather than raised.
    """
    # Alpaca's API reference lists 207 Multi-Status as the reply of the bulk
    # cancel/close endpoints, so it must count as success alongside 200/204.
    ok_statuses = (200, 204, 207)
    # requests never times out by default; bound the wait so a stalled
    # connection cannot hang the caller.
    request_timeout = 30
    try:
        headers = {
            "APCA-API-KEY-ID": os.getenv("ALPACA_API_KEY"),
            "APCA-API-SECRET-KEY": os.getenv("ALPACA_API_SECRET")
        }
        # Use paper trading URL
        base_url = "https://paper-api.alpaca.markets"
        # 1. First cancel all open orders
        response = requests.delete(f"{base_url}/v2/orders", headers=headers, timeout=request_timeout)
        if response.status_code not in ok_statuses:
            print(f"Error canceling orders: {response.status_code}, {response.text}")
            return f"Error canceling orders: {response.status_code}"
        # 2. Then close all positions
        response = requests.delete(f"{base_url}/v2/positions", headers=headers, timeout=request_timeout)
        if response.status_code not in ok_statuses:
            print(f"Error closing positions: {response.status_code}, {response.text}")
            return f"Error closing positions: {response.status_code}"
        return "Portfolio reset successfully. All positions closed and orders canceled."
    except Exception as e:
        print(f"Error resetting portfolio: {e}")
        return f"Error resetting portfolio: {str(e)}"
# Function to fetch order history
def fetch_order_history():
    """Fetch up to 100 orders from the Alpaca paper account, keeping BTC symbols.

    Credentials are read from the ``ALPACA_API_KEY`` and ``ALPACA_API_SECRET``
    environment variables.

    Returns:
        A list of dicts with ``id``, ``symbol``, ``side``, ``type``, ``qty``,
        ``status``, ``created_at``, ``filled_at``, ``filled_qty`` and
        ``filled_avg_price``. An empty list is returned on a non-200 response
        or any exception (the error is printed, not raised).
    """
    try:
        headers = {
            "APCA-API-KEY-ID": os.getenv("ALPACA_API_KEY"),
            "APCA-API-SECRET-KEY": os.getenv("ALPACA_API_SECRET")
        }
        # Use paper trading URL
        base_url = "https://paper-api.alpaca.markets"
        # requests never times out by default; bound the wait explicitly.
        response = requests.get(
            f"{base_url}/v2/orders?status=all&limit=100",
            headers=headers,
            timeout=30,
        )
        if response.status_code != 200:
            print(f"Error fetching orders: {response.status_code}, {response.text}")
            return []
        return [
            {
                "id": order["id"],
                "symbol": order["symbol"],
                "side": order["side"],
                "type": order["type"],
                "qty": order["qty"],
                "status": order["status"],
                "created_at": order["created_at"],
                # `.get(key, default)` only covers a missing key; a key that
                # is present with a JSON null needs `or` to get the placeholder.
                "filled_at": order.get("filled_at") or "N/A",
                "filled_qty": order.get("filled_qty") or "0",
                "filled_avg_price": order.get("filled_avg_price") or "0"
            }
            for order in response.json()
            if "BTC" in order.get("symbol", "")
        ]
    except Exception as e:
        print(f"Error in fetch_order_history: {e}")
        return []
# Function to fetch active positions
def fetch_active_positions():
    """Fetch open positions from the Alpaca paper account, keeping BTC symbols.

    Credentials are read from the ``ALPACA_API_KEY`` and ``ALPACA_API_SECRET``
    environment variables.

    Returns:
        A list of dicts with ``symbol``, ``qty``, ``avg_entry_price``,
        ``current_price``, ``market_value`` and ``side`` copied from the API
        response, plus ``profit_loss`` and ``profit_loss_percent`` computed
        here and rounded to two decimals. An empty list is returned on a
        non-200 response or any exception (the error is printed, not raised).
    """
    try:
        headers = {
            "APCA-API-KEY-ID": os.getenv("ALPACA_API_KEY"),
            "APCA-API-SECRET-KEY": os.getenv("ALPACA_API_SECRET")
        }
        base_url = "https://paper-api.alpaca.markets"
        # requests never times out by default; bound the wait explicitly.
        response = requests.get(f"{base_url}/v2/positions", headers=headers, timeout=30)
        if response.status_code != 200:
            print(f"Error fetching positions: {response.status_code}, {response.text}")
            return []
        formatted_positions = []
        for pos in response.json():
            if "BTC" not in pos.get("symbol", ""):
                continue
            # `or 0` also covers a key that is present but null, which would
            # otherwise make float() raise and discard every position.
            current_price = float(pos.get("current_price") or 0)
            avg_entry_price = float(pos.get("avg_entry_price") or 0)
            qty = float(pos.get("qty") or 0)
            profit_loss = (current_price - avg_entry_price) * qty
            # Guard the division for a zero entry price.
            profit_loss_percent = ((current_price / avg_entry_price) - 1) * 100 if avg_entry_price > 0 else 0
            formatted_positions.append({
                "symbol": pos["symbol"],
                "qty": pos["qty"],
                "avg_entry_price": pos["avg_entry_price"],
                "current_price": pos["current_price"],
                "profit_loss": round(profit_loss, 2),
                "profit_loss_percent": round(profit_loss_percent, 2),
                "market_value": pos["market_value"],
                "side": pos["side"]
            })
        return formatted_positions
    except Exception as e:
        print(f"Error in fetch_active_positions: {e}")
        return []
# Function to run technical analysis with just the TA agent
def run_ta_agent_only(strategy_text=None):
    """Run a technical-analysis-only pass and return the resulting signal dict.

    Uses ``strategy_text`` when given, otherwise the text stored in
    ``strategy_params``. Indicator data comes from
    ``TechnicalAnalysisStrategy._run()`` and is interpreted by
    ``interpret_strategy_with_llm``. A successful result is timestamped,
    appended to ``analysis_results`` and saved to the database.

    Returns:
        The signal dict; a "hold" dict with an ``error`` key when no strategy
        text is set or the indicator tool reports an error; or
        ``{"error": ...}`` if an exception is raised.
    """
    def hold_response(error, reasoning):
        # Shared shape of the two "cannot analyze" answers.
        return {
            "error": error,
            "signal": "hold",
            "confidence": 0,
            "allocation_percentage": 0,
            "reasoning": reasoning
        }

    try:
        strategy_tool = TechnicalAnalysisStrategy()
        active_text = strategy_text or strategy_params.get("strategy_text")
        if not active_text:
            return hold_response("No strategy text provided",
                                 "Please enter a strategy description first")
        indicators = strategy_tool._run()
        if "error" in indicators:
            return hold_response(indicators["error"],
                                 f"Error fetching indicator data: {indicators['error']}")
        outcome = interpret_strategy_with_llm(
            indicators, active_text, strategy_params["max_allocation_percentage"]
        )
        outcome["timestamp"] = datetime.now().isoformat()
        analysis_results.append(outcome)
        save_analysis_result(outcome)
        return outcome
    except Exception as exc:
        print(f"Error running TA agent: {exc}")
        return {"error": str(exc)}
# New function to interpret strategy with LLM
def interpret_strategy_with_llm(indicator_data, strategy_text, max_allocation_percentage=50):
    """Ask the OpenAI chat API to turn indicator data and a strategy into a signal.

    Args:
        indicator_data: JSON-serializable indicator values; embedded in the
            prompt and attached to the result as ``technical_indicators``.
        strategy_text: The user's strategy description.
        max_allocation_percentage: Upper bound for ``allocation_percentage``.

    Returns:
        A dict with ``signal`` ("buy", "sell" or "hold"), ``confidence``
        (clamped to 0-95), ``allocation_percentage`` (clamped to
        0-``max_allocation_percentage``), ``reasoning`` and
        ``technical_indicators``. On any exception a "hold" dict with zero
        confidence and allocation is returned instead of raising.
    """
    required_fields = ("signal", "confidence", "allocation_percentage", "reasoning")
    try:
        # Create the system prompt for the LLM
        system_prompt = """
You are a cryptocurrency trading strategy interpreter. Your task is to analyze the provided technical
indicators and price data, then interpret the user's strategy to generate a trading signal.
You must return a JSON object with the following fields:
- signal: "buy", "sell", or "hold"
- confidence: Integer between 0-95 (how confident you are in the signal)
- allocation_percentage: Integer between 0-{max_allocation} (how much of the portfolio to allocate)
- reasoning: String explanation of your decision process
Be pragmatic and conservative. Only give buy/sell signals when the conditions are clearly met.
Base your decision on the indicator values, not on general market sentiment or news.
"""
        # Create the user prompt with all the data
        user_prompt = f"""
# Technical Indicators
{json.dumps(indicator_data, indent=2)}
# Strategy Description
{strategy_text}
Analyze the above data according to the strategy description and generate a trading signal.
Respond only with JSON. Maximum allocation is {max_allocation_percentage}%.
"""
        # Make the call to OpenAI
        response = openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": system_prompt.format(max_allocation=max_allocation_percentage)},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.2,
            response_format={"type": "json_object"}
        )
        result = json.loads(response.choices[0].message.content)
        # Fill in conservative defaults for any field the model left out.
        missing = [k for k in required_fields if k not in result]
        if missing:
            print(f"LLM response missing required fields: {missing}")
            result = {
                "signal": result.get("signal", "hold"),
                "confidence": result.get("confidence", 50),
                "allocation_percentage": result.get("allocation_percentage", 0),
                "reasoning": result.get("reasoning", "No reasoning provided.")
            }
        # Validate the signal the same way the numeric fields are validated:
        # anything outside the documented vocabulary becomes "hold".
        signal = str(result["signal"]).strip().lower()
        if signal not in ("buy", "sell", "hold"):
            print(f"LLM returned unsupported signal {result['signal']!r}; defaulting to hold")
            signal = "hold"
        result["signal"] = signal
        # int(float(...)) also accepts values such as 85.5 or "85.5".
        result["confidence"] = max(0, min(95, int(float(result["confidence"]))))
        result["allocation_percentage"] = max(
            0, min(max_allocation_percentage, int(float(result["allocation_percentage"])))
        )
        # Add indicator values to the result
        result["technical_indicators"] = indicator_data
        print(f"LLM generated signal: {result['signal']} with confidence {result['confidence']}%")
        return result
    except Exception as e:
        print(f"Error interpreting strategy with LLM: {e}")
        import traceback
        traceback.print_exc()
        return {
            "signal": "hold",
            "confidence": 0,
            "allocation_percentage": 0,
            "reasoning": f"Error interpreting strategy with LLM: {str(e)}"
        }
# Function to run the full analysis using the crew
def run_full_analysis():
    """Run the full ``BitcoinAnalysisCrew`` analysis with the current strategy text.

    The result is timestamped, appended to ``analysis_results`` and saved to
    the database. When it carries a dict ``order_execution`` whose ``success``
    is truthy, that order is also appended to ``orders_history`` and saved as
    a transaction.

    Returns:
        The crew's result dict; a "hold" dict with an ``error`` key when no
        strategy text is set; or ``{"error": ...}`` if an exception is raised.
    """
    try:
        strategy_text = strategy_params.get("strategy_text")
        if not strategy_text:
            return {
                "error": "No strategy text provided",
                "signal": "hold",
                "confidence": 0,
                "allocation_percentage": 0,
                "reasoning": "Please enter a strategy description first"
            }
        analysis = BitcoinAnalysisCrew().run_analysis(strategy_text=strategy_text)
        analysis["timestamp"] = datetime.now().isoformat()
        analysis_results.append(analysis)
        save_analysis_result(analysis)
        # Only successfully executed orders are recorded.
        execution = analysis["order_execution"] if "order_execution" in analysis else None
        if isinstance(execution, dict) and execution.get("success", False):
            orders_history.append(execution)
            save_transaction(execution)
        return analysis
    except Exception as exc:
        print(f"Error running crew analysis: {exc}")
        return {"error": str(exc)}
# Background process function
def _append_log(log_path, text):
    """Append ``text`` to the session log file at ``log_path``."""
    with open(log_path, 'a') as f:
        f.write(text)


def _sleep_while_running(seconds, step=10):
    """Sleep for about ``seconds``, checking every ``step`` seconds whether ``running`` was cleared."""
    for _ in range(int(seconds), 0, -step):
        if not running:
            break
        time.sleep(step)


def background_process():
    """Run the automated trading loop until the global ``running`` flag is cleared.

    Each iteration runs ``run_full_analysis()``, refreshes the global
    ``active_trades`` from ``fetch_active_positions()``, writes the outcome to
    a per-session log file next to the database, and then waits
    ``strategy_params["timeframe_minutes"]`` minutes. All waits are sliced
    into 10-second steps so a stop request is noticed quickly.

    Errors inside an iteration are logged and retried after 60 seconds; after
    5 consecutive errors the loop pauses for 30 minutes and resets the counter.
    """
    global running, active_trades
    # Create a log file for the automated trading session
    log_file = os.path.join(os.path.dirname(DB_PATH), f"auto_trading_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
    with open(log_file, 'w') as f:
        f.write(f"Automated trading session started at {datetime.now().isoformat()}\n")
        f.write(f"Strategy parameters: {json.dumps(strategy_params)}\n\n")
    error_count = 0
    max_errors = 5
    while running:
        try:
            print(f"Running full crew analysis at {datetime.now().isoformat()} with params: {strategy_params}")
            _append_log(log_file, f"\n--- Analysis run at {datetime.now().isoformat()} ---\n")
            # Check if strategy text is available
            if not strategy_params.get("strategy_text"):
                print("No strategy text available for automated analysis. Skipping.")
                _append_log(log_file, "No strategy text available for automated analysis. Skipping.\n")
                # Check again in a minute, but stay responsive to a stop request.
                _sleep_while_running(60)
                continue
            # Run the full crew analysis instead of just TA agent
            result = run_full_analysis()
            print(f"Analysis result: {result.get('signal', 'unknown')} with confidence {result.get('confidence', 0)}%")
            # Log the result
            entry = [
                f"Signal: {result.get('signal', 'unknown')}, Confidence: {result.get('confidence', 0)}%, Allocation: {result.get('allocation_percentage', 0)}%\n",
                f"Reasoning: {result.get('reasoning', 'No reasoning provided')}\n",
            ]
            # Log order execution details if any
            if "order_execution" in result and result["order_execution"]:
                order_details = result["order_execution"]
                if isinstance(order_details, dict):
                    entry.append(f"Order executed: {order_details.get('action', 'unknown')} {order_details.get('quantity', 0)} BTC at ${order_details.get('price', 0)}\n")
                else:
                    entry.append(f"Order info: {str(order_details)}\n")
            _append_log(log_file, "".join(entry))
            # Update the active trades
            positions = fetch_active_positions()
            active_trades = positions
            # Log current positions
            entry = ["\nCurrent Positions:\n"]
            if positions:
                for pos in positions:
                    entry.append(f" {pos['symbol']}: {pos['qty']} @ ${pos['avg_entry_price']} - P/L: ${pos['profit_loss']} ({pos['profit_loss_percent']}%)\n")
            else:
                entry.append(" No active positions\n")
            # Log account info
            account = fetch_account_info()
            entry.append(f"\nAccount Balance: ${account.get('cash', 'N/A')}, Equity: ${account.get('equity', 'N/A')}\n")
            _append_log(log_file, "".join(entry))
            # Reset error counter on successful run
            error_count = 0
            # Wait for the full configured interval. The previous code capped
            # the wait at 3600 seconds, so longer timeframes ran every hour.
            interval_seconds = strategy_params["timeframe_minutes"] * 60
            print(f"Sleeping for {interval_seconds} seconds ({strategy_params['timeframe_minutes']} minutes)")
            _sleep_while_running(interval_seconds)
        except Exception as e:
            error_count += 1
            error_message = f"Error in background process: {str(e)}"
            print(error_message)
            import traceback
            trace = traceback.format_exc()
            print(trace)
            # Log error
            _append_log(log_file, f"\nERROR: {error_message}\n" + trace + "\n")
            # If too many consecutive errors, pause for a longer time
            if error_count >= max_errors:
                print(f"Too many errors ({error_count}). Pausing for 30 minutes.")
                _append_log(log_file, f"Too many errors ({error_count}). Pausing for 30 minutes.\n")
                _sleep_while_running(1800)
                # Reset error count after pause
                error_count = 0
            else:
                # Short pause before retrying
                _sleep_while_running(60)
# Function to start the background process
def start_background_process():
    """Start the background analysis thread unless one is already active.

    Returns:
        A status message: started, already running, or still shutting down.
    """
    global running, background_thread
    worker_alive = background_thread is not None and background_thread.is_alive()
    if running and worker_alive:
        return "Background analysis is already running."
    if worker_alive:
        # A stop was requested but the previous worker has not left its loop
        # yet. Setting `running` again now would revive it next to a new
        # thread, leaving two loops running.
        return "Background analysis is still shutting down. Please try again shortly."
    # Either nothing was running, or the previous worker died while `running`
    # was still set; in both cases a fresh worker is needed.
    running = True
    background_thread = threading.Thread(target=background_process)
    background_thread.daemon = True
    background_thread.start()
    return f"Background analysis started. Running full crew analysis every {strategy_params['timeframe_minutes']} minutes."
# Function to stop the background process
def stop_background_process():
    """Clear the global ``running`` flag and report whether anything was running."""
    global running
    if not running:
        return "Background analysis is not running."
    running = False
    return "Background analysis stopped."
# Function to update strategy parameters
def update_strategy(timeframe, max_allocation):
    """Store a new timeframe and maximum allocation in ``strategy_params``.

    Both arguments are converted with ``int`` before being stored.

    Returns:
        A confirmation message, or an error message if conversion or the
        update fails (nothing is raised).
    """
    global strategy_params
    try:
        minutes, allocation = int(timeframe), int(max_allocation)
        strategy_params.update(
            timeframe_minutes=minutes,
            max_allocation_percentage=allocation,
        )
        print(f"Updated strategy parameters: {strategy_params}")
        return f"Strategy parameters updated: Timeframe: {minutes} minutes, Max allocation: {allocation}%"
    except Exception as exc:
        return f"Error updating strategy parameters: {exc}"
# Function to update custom strategy text
def update_strategy_text(strategy_text):
    """Store ``strategy_text`` in ``strategy_params`` and return a status message.

    The first 100 characters are echoed to stdout. Any exception is turned
    into an error message instead of being raised.
    """
    global strategy_params
    try:
        strategy_params["strategy_text"] = strategy_text
        preview = strategy_text[:100]
        print(f"Updated strategy text: {preview}...")
    except Exception as exc:
        return f"Error updating strategy text: {exc}"
    return "Strategy text updated successfully"
# Function to save current strategy
def save_current_strategy(name, description):
    """Save the current ``strategy_params`` as a named strategy.

    Args:
        name: Name to store the strategy under.
        description: Free-text description of the strategy.

    Returns:
        A confirmation message on success, otherwise the error message from
        ``save_strategy`` or from an exception (nothing is raised).
    """
    try:
        # Copy so the stored snapshot is not affected by later UI changes.
        parameters = strategy_params.copy()
        # "strategy_text" is present but None until the user sets it, so a
        # `.get` default alone would not apply; normalize with `or`.
        strategy_text = parameters.get("strategy_text") or ""
        if not strategy_text:
            # The RSI thresholds are not part of the default parameters, so
            # indexing them directly raised KeyError and made this fallback
            # unusable. Mention them only when they are actually present.
            lower = parameters.get("rsi_lower_threshold")
            upper = parameters.get("rsi_upper_threshold")
            if lower is not None and upper is not None:
                strategy_text = f"Default RSI ({lower}-{upper}) and Bollinger Bands strategy"
            else:
                strategy_text = "Default RSI and Bollinger Bands strategy"
        # Save to database
        success, message = save_strategy(name, description, strategy_text, parameters)
        if success:
            return f"Strategy '{name}' saved successfully"
        return message
    except Exception as e:
        return f"Error saving strategy: {e}"
# Function to run a manual trade
def execute_trade(action, symbol, allocation_pct):
    """Place a manual order through ``AlpacaCryptoOrderTool`` and report the outcome.

    A successful order is appended to ``orders_history`` and saved as a
    transaction.

    Returns:
        A status message describing success, the tool's reported error, or an
        exception (nothing is raised).
    """
    try:
        tool = AlpacaCryptoOrderTool()
        outcome = tool._run(
            action=action,
            symbol=symbol,
            allocation_percentage=int(allocation_pct)
        )
        if not outcome.get("success", False):
            return f"Trade failed: {outcome.get('error', 'Unknown error')}"
        orders_history.append(outcome)
        save_transaction(outcome)
        return f"Trade executed: {action.upper()} {symbol} with {allocation_pct}% allocation"
    except Exception as exc:
        return f"Error executing trade: {exc}"
# Function to get account summary for display
def get_account_summary():
    """Build a Markdown summary of cash, equity and open BTC positions.

    Data comes from ``fetch_account_info()`` and ``fetch_active_positions()``.
    """
    account = fetch_account_info()
    positions = fetch_active_positions()
    cash = account.get("cash", "0")
    equity = account.get("equity", "0")
    total_positions = len(positions)
    # Accumulate market value and profit/loss in one pass.
    total_value = 0
    total_pl = 0
    for position in positions:
        total_value += float(position.get("market_value", 0))
        total_pl += position.get("profit_loss", 0)
    return f"""
**Account Summary**
- Cash: ${cash}
- Equity: ${equity}
- Active Positions: {total_positions}
- Positions Value: ${total_value:.2f}
- Total P/L: ${total_pl:.2f}
"""
# Function to format analysis results for display
def format_analysis_results():
    """Render the latest analysis result as Markdown.

    The newest database row is preferred; the in-memory ``analysis_results``
    list is used only when the database has none. Only the database variant
    includes a strategy line.
    """
    stored = get_recent_analysis_results(1)
    if not stored and not analysis_results:
        return "No analysis results available."
    from_db = bool(stored)
    latest = stored[0] if from_db else analysis_results[-1]
    timestamp = latest.get("timestamp", datetime.now().isoformat())
    signal = latest.get("signal", "unknown").upper()
    confidence = latest.get("confidence", 0)
    allocation = latest.get("allocation_percentage", 0)
    reasoning = latest.get("reasoning", "No reasoning provided.")
    if from_db:
        strategy_name = latest.get("strategy_name", "Default Strategy")
        return f"""
**Latest Analysis ({timestamp})**
Strategy: {strategy_name}
Signal: {signal}
Confidence: {confidence}%
Allocation: {allocation}%
Reasoning:
{reasoning}
"""
    return f"""
**Latest Analysis ({timestamp})**
Signal: {signal}
Confidence: {confidence}%
Allocation: {allocation}%
Reasoning:
{reasoning}
"""
# Function to format active positions for display
def format_active_positions():
    """Render the positions from ``fetch_active_positions()`` as Markdown."""
    positions = fetch_active_positions()
    if not positions:
        return "No active positions."
    # One Markdown entry per position, joined after the heading.
    entries = [
        f"""
**{pos['symbol']}**
Quantity: {pos['qty']} BTC
Entry: ${pos['avg_entry_price']}
Current: ${pos['current_price']}
P/L: ${pos['profit_loss']} ({pos['profit_loss_percent']}%)
Value: ${pos['market_value']}
"""
        for pos in positions
    ]
    return "## Active Positions\n\n" + "".join(entries)
# Function to format order history for display
def format_order_history():
    """Render recent trading activity as Markdown.

    The last 10 transactions stored in the database are preferred. When there
    are none, the first 10 orders returned by ``fetch_order_history()`` are
    shown instead.
    """
    stored = get_recent_transactions(10)
    if stored:
        entries = [
            f"""
**{tx['symbol']} {tx['action'].upper()}**
Quantity: {tx['quantity']}
Price: ${tx['price']}
Status: {tx['status']}
Allocation: {tx['allocation_percentage']}%
Date: {tx['timestamp']}
Strategy: {tx['strategy_name']}
"""
            for tx in stored
        ]
        return "## Recent Transactions\n\n" + "".join(entries)
    # Fall back to API if database is empty
    orders = fetch_order_history()
    if not orders:
        return "No order history."
    entries = [
        f"""
**{order['symbol']} {order['side'].upper()}**
Quantity: {order['qty']}
Type: {order['type']}
Status: {order['status']}
Created: {order['created_at']}
Filled: {order.get('filled_at', 'N/A')}
Filled Price: ${order.get('filled_avg_price', 'N/A')}
"""
        for order in orders[:10]
    ]
    return "## Order History (Last 10)\n\n" + "".join(entries)
# Function to get available indicators for display
def get_available_indicators():
    """Render ``IndicatorCalculator.get_available_indicators()`` as a Markdown list."""
    catalog = IndicatorCalculator.get_available_indicators()
    entries = [f"**{name}**: {description}\n\n" for name, description in catalog.items()]
    return "## Available Indicators\n\n" + "".join(entries)
# Function to format detailed analysis results for display
def _format_indicator_value(value):
    """Render ``value`` with two decimals, falling back to ``str`` for non-numeric values."""
    try:
        return f"{value:.2f}"
    except (TypeError, ValueError):
        return str(value)


def format_detailed_analysis_results(result=None):
    """Render one analysis result as a multi-section Markdown report.

    Args:
        result: The analysis dict to render. When ``None``, the newest
            database row is used, then the newest in-memory result.

    Returns:
        The Markdown report, or ``"No analysis results available."`` when
        ``result`` is ``None`` and nothing has been recorded yet.
    """
    if result is None:
        # First try to get from database
        db_results = get_recent_analysis_results(1)
        if db_results:
            result = db_results[0]
        elif analysis_results:
            result = analysis_results[-1]
        else:
            return "No analysis results available."
    # Basic information
    timestamp = result.get("timestamp", datetime.now().isoformat())
    signal = str(result.get("signal") or "unknown").upper()
    confidence = result.get("confidence", 0)
    allocation = result.get("allocation_percentage", 0)
    reasoning = result.get("reasoning", "No reasoning provided.")
    strategy_name = result.get("strategy_name", "Default Strategy")
    sections = [
        f"## Analysis Results ({timestamp})",
        f"### Strategy: {strategy_name}",
        f"### Signal: {signal} | Confidence: {confidence}% | Allocation: {allocation}%",
    ]
    # Technical indicator values (if available)
    if "technical_indicators" in result:
        sections.append("### Technical Indicators")
        indicators = result["technical_indicators"]
        if isinstance(indicators, dict):
            indicators_text = []
            # Display price first
            if "price" in indicators:
                indicators_text.append(f"- **Price**: ${_format_indicator_value(indicators['price'])}")
            named_groups = (("RSI", "rsi"), ("Bollinger Bands", "bb_"), ("MACD", "macd"))
            grouped = [
                (title, {k: v for k, v in indicators.items()
                         if marker in k.lower() and v is not None})
                for title, marker in named_groups
            ]
            # Everything not matched above; keys containing "price" are
            # excluded here, as before.
            grouped.append(("Other Indicators", {
                k: v for k, v in indicators.items()
                if not any(x in k.lower() for x in ["rsi", "bb_", "macd", "price"])
                and v is not None
            }))
            for title, values in grouped:
                if not values:
                    continue
                indicators_text.append(f"- **{title}**:")
                # Values may be strings or nested data, so format defensively
                # instead of applying ':.2f' directly.
                indicators_text.extend(
                    f" - {k}: {_format_indicator_value(v)}" for k, v in values.items()
                )
            sections.append("\n".join(indicators_text))
        else:
            # Not a mapping; show it verbatim rather than fail.
            sections.append(str(indicators))
    # Free-text analysis sections, each shown as a fenced block when present
    for key, title in (
        ("technical_analysis", "Technical Analysis"),
        ("initial_analysis", "Market Context Analysis"),
        ("reflection_analysis", "Sentiment Analysis"),
    ):
        if key in result:
            sections.append(f"### {title}")
            sections.append(f"```\n{result[key]}\n```")
    # Add tool errors section if present
    if "tool_error_summary" in result or "tool_error_assessment" in result:
        sections.append("### Data Limitations")
        if "tool_error_assessment" in result:
            sections.append(f"```\n{result['tool_error_assessment']}\n```")
        if "tool_error_summary" in result:
            sections.append(f"Tool Errors: {result['tool_error_summary']}")
    # Add detailed reasoning
    sections.append("### Detailed Reasoning")
    sections.append(f"```\n{reasoning}\n```")
    for key, title in (
        ("market_outlook", "Market Outlook"),
        ("risk_assessment", "Risk Assessment"),
    ):
        if key in result:
            sections.append(f"### {title}")
            sections.append(f"```\n{result[key]}\n```")
    # Add order execution details if available
    if "order_execution" in result:
        sections.append("### Trade Execution")
        order = result["order_execution"]
        if isinstance(order, dict):
            # Report the real outcome; the status used to be hard-coded to
            # "Success" even when the order reported success=False.
            if "success" not in order:
                status = "Unknown"
            else:
                status = "Success" if order["success"] else "Failed"
            order_details = [f"- **Status**: {status}"]
            order_details.extend(
                f"- **{str(k).replace('_', ' ').title()}**: {v}"
                for k, v in order.items() if k != "success"
            )
            sections.append("\n".join(order_details))
        else:
            sections.append(str(order))
    return "\n\n".join(sections)
# Create the Gradio interface
with gr.Blocks(title="CrypticAI - Bitcoin Trading Dashboard") as app:
gr.Markdown("# CrypticAI - Bitcoin Trading Dashboard")
with gr.Tabs():
# Strategy Tab
with gr.TabItem("Strategy Configuration"):
gr.Markdown("## Strategy Parameters")
with gr.Row():
with gr.Column():
timeframe = gr.Slider(minimum=1, maximum=240, value=strategy_params["timeframe_minutes"], step=1,
label="Timeframe (minutes)")
with gr.Column():
max_allocation = gr.Slider(minimum=5, maximum=100, value=strategy_params["max_allocation_percentage"], step=5,
label="Maximum Allocation (%)")
strategy_update_btn = gr.Button("Update Strategy Parameters")
strategy_message = gr.Textbox(label="Strategy Update Status")
strategy_update_btn.click(update_strategy,
inputs=[timeframe, max_allocation],
outputs=strategy_message)
gr.Markdown("---")
gr.Markdown("## Strategy Management")
# Create a tabbed interface for strategy management
with gr.Tabs():
# Tab 1: Select Existing Strategy
with gr.TabItem("Select Strategy"):
gr.Markdown("### Available Strategies")
# Display saved strategies as a table
saved_strategies = gr.Dataframe(
headers=["ID", "Name", "Description"],
datatype=["number", "str", "str"],
label="Available Strategies"
)
refresh_strategies_btn = gr.Button("Refresh Strategies")
def get_strategies_as_df():
    """Return the saved strategies as table rows of [id, name, description].

    When get_saved_strategies() yields nothing, a single placeholder row is
    returned so the table is never empty.
    """
    saved = get_saved_strategies()
    if saved:
        return [[item["id"], item["name"], item["description"]] for item in saved]
    # Placeholder row shown when there is nothing to list
    return [[0, "No strategies found", "Create a new strategy first"]]
# NOTE(review): this button receives a second click handler further down
# (refresh_all_strategy_displays) that also fills this table, so this binding
# appears redundant — confirm before removing.
refresh_strategies_btn.click(get_strategies_as_df, outputs=saved_strategies)
# Create a dropdown for strategy selection instead of number input
gr.Markdown("### Select a Strategy")
def get_strategy_dropdown_choices():
    """Return (label, value) pairs for the strategy dropdowns.

    Labels have the form "<id> - <name>" and the value is the strategy id.
    A single placeholder pair with value 0 is returned when no strategies
    are saved.
    """
    saved = get_saved_strategies()
    if saved:
        return [(f"{item['id']} - {item['name']}", item['id']) for item in saved]
    return [("No strategies available", 0)]
strategy_dropdown = gr.Dropdown(
choices=get_strategy_dropdown_choices(),
label="Choose Strategy",
value=None
)
# Connect refresh button to also update dropdown
def refresh_all_strategy_displays():
    """Refresh both the strategies table and the strategy dropdown.

    Returns:
        A pair (table_rows, dropdown_update). The second element is a
        gr.update carrying the new choices: returning the bare list would be
        interpreted by Gradio as the dropdown's *value*, leaving the choices
        stale.
    """
    table_rows = get_strategies_as_df()
    dropdown_update = gr.update(choices=get_strategy_dropdown_choices())
    return table_rows, dropdown_update
refresh_strategies_btn.click(
refresh_all_strategy_displays,
outputs=[saved_strategies, strategy_dropdown]
)
# Load, Delete buttons
with gr.Row():
load_btn = gr.Button("Load Selected Strategy", variant="primary")
delete_btn = gr.Button("Delete Selected Strategy", variant="stop")
# Add a status message
strategy_action_message = gr.Textbox(label="Status", interactive=False)
# Add strategy details display section
gr.Markdown("### Selected Strategy Details")
selected_strategy_view = gr.Markdown("No strategy selected")
# Update load strategy function to work with dropdown
def load_selected_strategy(strategy_dropdown_value):
    """Load the chosen strategy into the module-level strategy_params.

    Args:
        strategy_dropdown_value: Value of the strategy dropdown (a strategy
            id); falsy values mean nothing is selected.

    Returns:
        A pair (status_message, markdown_view) for the status textbox and the
        strategy details panel.
    """
    global strategy_params
    if not strategy_dropdown_value:
        return "Please select a strategy first", "No strategy selected"
    try:
        record = get_strategy_by_id(int(strategy_dropdown_value))
        if not record:
            return "Strategy not found", "Strategy not found"
        # Copy the stored logic, then overlay only the parameters that the
        # running configuration already knows about
        strategy_params["strategy_text"] = record["strategy_text"]
        stored_params = record["parameters"]
        for param_name, param_value in stored_params.items():
            if param_name in strategy_params:
                strategy_params[param_name] = param_value
        # Assemble the markdown view line by line (leading and trailing empty
        # entries produce the surrounding newlines)
        timeframe_label = stored_params.get("timeframe_minutes", "N/A")
        allocation_label = stored_params.get("max_allocation_percentage", "N/A")
        view_lines = [
            "",
            f"## {record['name']}",
            f"**Description**: {record['description']}",
            "**Strategy Logic**:",
            "```",
            f"{record['strategy_text']}",
            "```",
            "**Parameters**:",
            f"- Timeframe: {timeframe_label} minutes",
            f"- Max Allocation: {allocation_label}%",
            "",
        ]
        status = f"Strategy '{record['name']}' loaded successfully"
        return status, "\n".join(view_lines)
    except Exception as exc:
        return f"Error loading strategy: {exc}", "Error loading strategy"
# Delete strategy function updated for dropdown
def delete_selected_strategy(strategy_dropdown_value):
    """Delete the strategy chosen in the dropdown from the database.

    Args:
        strategy_dropdown_value: Value of the strategy dropdown (a strategy
            id); falsy values mean nothing is selected.

    Returns:
        A pair (status_message, dropdown_update). The dropdown update carries
        the refreshed choices via gr.update (a bare list would be taken as the
        dropdown value); after a successful delete the selection is cleared
        because it pointed at the removed row.
    """
    if not strategy_dropdown_value:
        return "Please select a strategy first", gr.update(choices=get_strategy_dropdown_choices())
    try:
        strategy_id = int(strategy_dropdown_value)
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            # Look the name up first so it can be reported after deletion
            cursor.execute("SELECT name FROM strategies WHERE id = ?", (strategy_id,))
            row = cursor.fetchone()
            if row is not None:
                cursor.execute("DELETE FROM strategies WHERE id = ?", (strategy_id,))
                conn.commit()
        finally:
            # Always release the connection, even if a statement failed
            conn.close()
        if row is None:
            return "Strategy not found", gr.update(choices=get_strategy_dropdown_choices())
        return (
            f"Strategy '{row[0]}' deleted successfully",
            gr.update(choices=get_strategy_dropdown_choices(), value=None),
        )
    except Exception as e:
        return f"Error deleting strategy: {e}", gr.update(choices=get_strategy_dropdown_choices())
# Connect buttons to new functions
load_btn.click(
load_selected_strategy,
inputs=[strategy_dropdown],
outputs=[strategy_action_message, selected_strategy_view]
)
delete_btn.click(
delete_selected_strategy,
inputs=[strategy_dropdown],
outputs=[strategy_action_message, strategy_dropdown]
)
# Tab 2: Create/Edit Strategy
with gr.TabItem("Create/Edit Strategy"):
gr.Markdown("### Strategy Editor")
# Strategy editor section
strategy_name = gr.Textbox(label="Strategy Name", placeholder="Enter a name for your strategy")
strategy_description = gr.Textbox(label="Strategy Description", placeholder="Enter a brief description of what your strategy does")
# Add a checkbox to indicate if editing existing strategy
is_editing = gr.Checkbox(label="Edit Existing Strategy", value=False)
edit_id = gr.Number(label="Strategy ID to Edit", value=0, visible=False)
# Add a dropdown to select strategy for editing
edit_strategy_dropdown = gr.Dropdown(
choices=get_strategy_dropdown_choices(),
label="Choose Strategy to Edit",
visible=False
)
gr.Markdown("### Strategy Logic")
gr.Markdown("""Write your trading strategy using the available indicators. Be specific about entry and exit conditions.
**Example Strategy:**
```
Buy when RSI is below 30 and the price is near the lower Bollinger Band (position < 0.2).
Sell when RSI is above 70 and the price is near the upper Bollinger Band (position > 0.8).
Increase confidence if the ADX shows a strong trend (> 25) in the same direction.
Use 40% of available capital for trades if confidence is high (> 70), otherwise use 20%.
```
The strategy will be interpreted by AI to generate trading signals based on current market conditions.""")
# Available indicators
with gr.Accordion("View Available Indicators", open=False):
indicators_info = gr.Markdown(get_available_indicators())
# Strategy text editor - moved up before being referenced
strategy_text = gr.TextArea(
label="Strategy Logic",
placeholder="Enter your trading strategy logic here...",
value=strategy_params.get("strategy_text", ""),
lines=10,
max_lines=20
)
# Connect checkbox to show/hide controls
def toggle_edit_mode(is_editing):
    """Show the edit-strategy dropdown only while the edit checkbox is ticked."""
    visibility_change = gr.update(visible=is_editing)
    return {edit_strategy_dropdown: visibility_change}
is_editing.change(
toggle_edit_mode,
inputs=[is_editing],
outputs=[edit_strategy_dropdown]
)
# Function to load strategy into editor
def load_strategy_to_editor(strategy_dropdown_value):
    """Fetch a strategy and return its fields for the editor form.

    Args:
        strategy_dropdown_value: Value of the edit dropdown (a strategy id);
            falsy values mean nothing is selected.

    Returns:
        A 5-tuple (status, name, description, strategy_text, strategy_id).
        On any failure the text fields are empty strings and the id is 0.
    """
    def _empty_form(status):
        # Same blank payload for every failure path; only the status differs
        return status, "", "", "", 0

    if not strategy_dropdown_value:
        return _empty_form("Please select a strategy to edit")
    try:
        strategy_id = int(strategy_dropdown_value)
        record = get_strategy_by_id(strategy_id)
        if record:
            return (
                "Strategy loaded for editing",
                record["name"],
                record["description"],
                record["strategy_text"],
                strategy_id,
            )
        return _empty_form("Strategy not found")
    except Exception as exc:
        return _empty_form(f"Error loading strategy: {exc}")
# Connect edit dropdown to load function
edit_strategy_dropdown.change(
load_strategy_to_editor,
inputs=[edit_strategy_dropdown],
outputs=[strategy_action_message, strategy_name, strategy_description, strategy_text, edit_id]
)
# Save button and status
save_btn = gr.Button("Save Strategy", variant="primary")
save_status = gr.Textbox(label="Save Status", interactive=False)
# Enhanced save function
def save_strategy_enhanced(is_editing, edit_id, name, description, text):
    """Insert a new strategy or update an existing one, then refresh dropdowns.

    Args:
        is_editing: Whether the "Edit Existing Strategy" checkbox is ticked.
        edit_id: Id of the strategy being edited; 0 or None means "none".
        name: Strategy name (required).
        description: Free-text description.
        text: Strategy logic (required).

    Returns:
        A 3-tuple (status_message, select_dropdown_update,
        edit_dropdown_update). Both dropdown elements are gr.update objects
        carrying the current choices; a bare list would be interpreted by
        Gradio as the dropdown value instead of its choices.
    """
    def _dropdown_updates():
        # One independent update object per dropdown output
        choices = get_strategy_dropdown_choices()
        return gr.update(choices=choices), gr.update(choices=choices)

    try:
        if not name or not text:
            return ("Error: Strategy name and logic are required", *_dropdown_updates())
        # Snapshot the current parameters and store the logic alongside them
        parameters = strategy_params.copy()
        parameters["strategy_text"] = text
        # A cleared number field arrives as None; treat it like "no id"
        strategy_id = int(edit_id) if is_editing and edit_id and edit_id > 0 else 0
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            if strategy_id > 0:
                # Update existing strategy
                cursor.execute(
                    "UPDATE strategies SET name = ?, description = ?, strategy_text = ?, parameters = ? WHERE id = ?",
                    (name, description, text, json.dumps(parameters), strategy_id)
                )
                if cursor.rowcount == 0:
                    # No row matched: do not claim success for a no-op update
                    message = f"Error: Strategy with ID {strategy_id} not found; nothing was updated"
                else:
                    message = f"Strategy '{name}' updated successfully"
            else:
                # Create new strategy
                cursor.execute(
                    "INSERT INTO strategies (name, description, strategy_text, parameters) VALUES (?, ?, ?, ?)",
                    (name, description, text, json.dumps(parameters))
                )
                message = f"Strategy '{name}' saved successfully"
            conn.commit()
        finally:
            # Release the connection even when a statement raises
            # (e.g. a duplicate name violating the UNIQUE constraint)
            conn.close()
        return (message, *_dropdown_updates())
    except Exception as e:
        return (f"Error saving strategy: {e}", *_dropdown_updates())
# Connect save button
save_btn.click(
save_strategy_enhanced,
inputs=[is_editing, edit_id, strategy_name, strategy_description, strategy_text],
outputs=[save_status, strategy_dropdown, edit_strategy_dropdown]
)
# Clear form button
clear_btn = gr.Button("Clear Form")
def clear_form():
    """Return reset values for name, description, logic, edit flag and edit id."""
    blank = ""
    return blank, blank, blank, False, 0
clear_btn.click(
clear_form,
outputs=[strategy_name, strategy_description, strategy_text, is_editing, edit_id]
)
gr.Markdown("---")
gr.Markdown("## Execute Analysis")
gr.Markdown("First select a strategy above, then run the analysis pipeline.")
# Check if strategy is loaded - improved formatting
def check_strategy_loaded():
    """Report the currently loaded strategy and toggle the run button.

    Returns:
        A pair (markdown_text, button_update). Without a loaded strategy the
        text is a warning and the button is made non-interactive; otherwise
        the text previews the strategy and the button is enabled.
    """
    loaded_text = strategy_params.get("strategy_text")
    if not loaded_text:
        return (
            "⚠️ No strategy selected! Please load a strategy first.",
            gr.update(interactive=False)
        )
    # Assemble the preview line by line (leading and trailing empty entries
    # produce the surrounding newlines)
    timeframe_label = strategy_params.get('timeframe_minutes', 'N/A')
    allocation_label = strategy_params.get('max_allocation_percentage', 'N/A')
    preview_lines = [
        "",
        "## Current Strategy",
        "```",
        f"{loaded_text}",
        "```",
        "**Parameters**:",
        f"- Timeframe: {timeframe_label} minutes",
        f"- Max Allocation: {allocation_label}%",
        "",
    ]
    strategy_preview = "\n".join(preview_lines)
    return strategy_preview, gr.update(interactive=True)
# Display current strategy and run button
current_strategy_info = gr.Markdown("No strategy loaded")
run_pipeline_btn = gr.Button("Run Analysis Pipeline", variant="primary", interactive=False)
# Add a refresh button to check if strategy is loaded
check_strategy_btn = gr.Button("Check Current Strategy")
check_strategy_btn.click(
check_strategy_loaded,
outputs=[current_strategy_info, run_pipeline_btn]
)
# Run button should automatically check if strategy is loaded
analysis_result = gr.Markdown("Analysis results will appear here...")
# Function to run full analysis and format detailed output
def run_full_analysis_with_check():
    """Run the full analysis if a strategy is loaded and return display text.

    Returns an error message when no strategy text is set, when the analysis
    result contains an "error" entry, or when any exception is raised;
    otherwise the formatted analysis result.
    """
    try:
        if not strategy_params.get("strategy_text"):
            return "Error: No strategy selected. Please load a strategy before running analysis."
        outcome = run_full_analysis()
        if "error" not in outcome:
            return format_detailed_analysis_results(outcome)
        return f"Analysis failed: {outcome['error']}"
    except Exception as exc:
        return f"Error running analysis: {exc}"
# Connect the run button
run_pipeline_btn.click(
run_full_analysis_with_check,
outputs=[analysis_result]
)
# Trading Tab
with gr.TabItem("Trading"):
gr.Markdown("## Account Summary")
account_summary = gr.Markdown("")
with gr.Row():
refresh_account_btn = gr.Button("Refresh Account Info")
reset_portfolio_btn = gr.Button("Reset Portfolio")
reset_portfolio_message = gr.Textbox(label="Reset Portfolio Status")
refresh_account_btn.click(get_account_summary, outputs=account_summary)
reset_portfolio_btn.click(reset_portfolio, outputs=reset_portfolio_message)
gr.Markdown("## Manual Trading")
with gr.Row():
action = gr.Dropdown(["buy", "sell", "check"], label="Action")
symbol = gr.Dropdown(["BTC/USD", "ETH/USD"], label="Symbol", value="BTC/USD")
allocation = gr.Slider(minimum=1, maximum=100, value=10, label="Allocation Percentage")
execute_btn = gr.Button("Execute Trade")
trade_result = gr.Textbox(label="Trade Result")
execute_btn.click(execute_trade, inputs=[action, symbol, allocation], outputs=trade_result)
# Monitoring Tab
with gr.TabItem("Monitoring"):
with gr.Tabs():
with gr.TabItem("Analysis Results"):
gr.Markdown("## Latest Analysis Results")
analysis_history = gr.Dataframe(
headers=["ID", "Timestamp", "Strategy", "Signal", "Confidence", "Allocation"],
datatype=["number", "str", "str", "str", "number", "number"],
label="Recent Analysis Results"
)
def get_analysis_history():
    """Build table rows from get_recent_analysis_results(10).

    Each row is [id, timestamp, strategy name, upper-cased signal,
    confidence, allocation percentage].
    """
    table_rows = []
    for entry in get_recent_analysis_results(10):
        table_rows.append([
            entry["id"],
            entry["timestamp"],
            entry["strategy_name"],
            entry["signal"].upper(),
            entry["confidence"],
            entry["allocation_percentage"],
        ])
    return table_rows
refresh_analysis_btn = gr.Button("Refresh Results")
refresh_analysis_btn.click(get_analysis_history, outputs=analysis_history)
selected_analysis_id = gr.Number(label="Result ID to View", value=0)
view_analysis_btn = gr.Button("View Detailed Analysis")
detailed_analysis = gr.Markdown("Select an analysis result and click 'View Detailed Analysis'")
def view_detailed_analysis(analysis_id):
    """Load one stored analysis result and return it formatted for display.

    Args:
        analysis_id: Id typed into the number field. None (cleared field),
            zero and negative values are rejected with a hint message.

    Returns:
        The formatted analysis text, or a plain message when the id is
        invalid, the row is missing, or retrieval fails (the failure message
        includes the traceback).
    """
    # A cleared number field yields None; comparing None with 0 would raise
    # TypeError before the error handling below is reached
    if analysis_id is None or analysis_id <= 0:
        return "Please select a valid analysis result ID"
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            cursor = conn.cursor()
            # Get the full analysis result
            cursor.execute("""
                SELECT ar.*, s.name as strategy_name
                FROM analysis_results ar
                LEFT JOIN strategies s ON ar.strategy_id = s.id
                WHERE ar.id = ?
            """, (int(analysis_id),))
            row = cursor.fetchone()
            # Capture column names while the connection is still open
            column_names = [column[0] for column in cursor.description]
        finally:
            # Release the connection even if the query raised
            conn.close()
        if row is None:
            return "Analysis result not found"
        # Convert row to dict
        result = dict(zip(column_names, row))
        # Parse JSON fields
        raw_indicators = result.get("indicator_values")
        if raw_indicators:
            try:
                result["technical_indicators"] = json.loads(raw_indicators)
            except json.JSONDecodeError as e:
                result["technical_indicators"] = {"error": f"Failed to parse indicators: {str(e)}"}
        # Format the detailed result
        return format_detailed_analysis_results(result)
    except Exception as e:
        import traceback
        return f"Error retrieving analysis details: {str(e)}\n{traceback.format_exc()}"
view_analysis_btn.click(view_detailed_analysis, inputs=[selected_analysis_id], outputs=detailed_analysis)
with gr.TabItem("Positions"):
gr.Markdown("## Active Positions")
positions_display = gr.Markdown("")
refresh_positions_btn = gr.Button("Refresh Positions")
refresh_positions_btn.click(format_active_positions, outputs=positions_display)
with gr.TabItem("Transactions"):
gr.Markdown("## Transaction History")
transactions_history = gr.Dataframe(
headers=["ID", "Date", "Symbol", "Action", "Quantity", "Price", "Status"],
datatype=["number", "str", "str", "str", "number", "number", "str"],
label="Recent Transactions"
)
def get_transactions_history():
    """Build table rows from get_recent_transactions(20).

    Each row is [id, timestamp, symbol, upper-cased action, quantity as
    float, price as float, status].
    """
    table_rows = []
    for record in get_recent_transactions(20):
        table_rows.append([
            record["id"],
            record["timestamp"],
            record["symbol"],
            record["action"].upper(),
            float(record["quantity"]),
            float(record["price"]),
            record["status"],
        ])
    return table_rows
refresh_tx_btn = gr.Button("Refresh Transactions")
refresh_tx_btn.click(get_transactions_history, outputs=transactions_history)
orders_display = gr.Markdown("")
refresh_orders_btn = gr.Button("Show Full Transaction Details")
refresh_orders_btn.click(format_order_history, outputs=orders_display)
# Automated Analysis Tab
with gr.TabItem("Automated Analysis"):
gr.Markdown("## Background Analysis")
gr.Markdown("""
### Full Crew Automated Analysis
The automated analysis will run the complete Bitcoin Analysis Crew at intervals defined by the timeframe setting.
This includes:
- Technical analysis
- Market sentiment analysis
- Order execution
**All trades and analysis results will be logged** to a file in the data directory for later review.
""")
gr.Markdown(f"Current timeframe: **{strategy_params['timeframe_minutes']} minutes**")
with gr.Row():
start_btn = gr.Button("Start Automated Analysis", variant="primary")
stop_btn = gr.Button("Stop Automated Analysis", variant="stop")
auto_status = gr.Textbox(label="Automation Status")
start_btn.click(start_background_process, outputs=auto_status)
stop_btn.click(stop_background_process, outputs=auto_status)
# Add log path information
log_path_info = gr.Markdown(f"""
**Log File Location**
Trading logs will be saved to:
```
{os.path.dirname(DB_PATH)}
```
A new log file is created for each trading session with format: `auto_trading_log_YYYYMMDD_HHMMSS.txt`
These logs contain:
- All analysis results
- Trade executions
- Account balances
- Profit/loss tracking
- Errors and warnings
""")
auto_result = gr.Markdown("Automated analysis results will appear here...")
# Refresh button with detailed results
auto_refresh = gr.Button("Refresh Latest Results")
# Update to use detailed formatting
# NOTE(review): no `inputs` are wired here, so Gradio will call
# format_detailed_analysis_results with no arguments, whereas the other call
# sites in this file always pass a result dict. Confirm the function has a
# usable default for its argument; otherwise this click will fail — TODO verify.
auto_refresh.click(format_detailed_analysis_results, outputs=auto_result)
# Add warning about strategy selection
gr.Markdown("""
**IMPORTANT**: Make sure you have loaded a strategy from the Strategy Configuration tab before starting automated analysis.
The system will use the currently loaded strategy for all automated runs.
""")
# Add instructions for running for days
gr.Markdown("""
### Running for Extended Periods
To run this application continuously for days:
1. Start the automated analysis with your chosen strategy and timeframe
2. Keep this application running (do not close the browser tab or terminal)
3. If running on a remote server, use tools like `screen` or `tmux` to keep the session alive
**Example terminal command for persistent session**:
```
# Start a new screen session
screen -S cryptic_trading
# Then run your application in that session
python src/ui/app.py
# You can detach from the session with Ctrl+A, D and reconnect later with:
screen -r cryptic_trading
```
""")
# Launch the app
if __name__ == "__main__":
    # Ensure the results directory exists
    os.makedirs("results", exist_ok=True)
    # Run the initial data fetches in order before starting the UI
    for initial_fetch in (fetch_account_info, fetch_order_history, fetch_active_positions):
        initial_fetch()
    # Start the Gradio app without a public share link
    app.launch(share=False)