# Trad / app.py
# Riy777's picture
# Update app.py
# 039d23c verified
# ==============================================================================
# 🚀 app.py (V70.6 - GEM-Architect: R2 Source of Truth)
# ==============================================================================
# - Fix: All UI elements now fetch directly from R2 to prevent Split-Brain.
# - Features: X-Ray Mode, Neural Visibility, Atomic UI Updates.
# ==============================================================================
import os
import sys
import traceback
import asyncio
import gc
import time
import json
import logging
from datetime import datetime, timedelta
from contextlib import asynccontextmanager, redirect_stdout, redirect_stderr
from io import StringIO
from typing import List, Dict, Any, Optional
from fastapi import FastAPI
import gradio as gr
import pandas as pd
import numpy as np
import plotly.graph_objects as go
# ------------------------------------------------------------------------------
# 1. Logging Configuration
# ------------------------------------------------------------------------------
# Root logging: INFO and above, short clock-time prefix, everything to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    datefmt="%H:%M:%S",
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Named application logger used throughout this module.
logger = logging.getLogger("TitanCore")
# ------------------------------------------------------------------------------
# 2. Critical Imports & Error Handling
# ------------------------------------------------------------------------------
# Import every project module up front. A failure in a core module aborts the
# process; the two optional modules degrade to a disabled feature instead.
try:
    # Core Infrastructure
    from r2 import R2Service, INITIAL_CAPITAL
    # ML & Data Engines
    from ml_engine.data_manager import DataManager
    from ml_engine.processor import MLProcessor, SystemLimits
    # External Intelligence
    from whale_monitor.core import EnhancedWhaleMonitor
    from whale_monitor.rpc_manager import AdaptiveRpcManager
    from sentiment_news import NewsFetcher
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    # Learning & Execution
    from learning_hub.adaptive_hub import AdaptiveHub
    from trade_manager import TradeManager
    # Optional Components
    try:
        from backtest_engine import run_strategic_optimization_task
        BACKTEST_AVAILABLE = True
    except ImportError:
        # Checked by trigger_strategic_backtest() before it schedules anything.
        BACKTEST_AVAILABLE = False
        logger.warning("⚠️ Backtest Engine not found. Optimization disabled.")
    try:
        from periodic_tuner import ContinuousTuner
    except ImportError:
        # lifespan() only injects the tuner when this name is truthy.
        ContinuousTuner = None
        logger.warning("⚠️ Continuous Tuner not found. Adaptive tuning disabled.")
except ImportError as e:
    # A missing core module is unrecoverable: log, dump the traceback, exit non-zero.
    logger.critical(f"❌ [FATAL ERROR] Failed to import core modules: {e}")
    traceback.print_exc()
    sys.exit(1)
# ------------------------------------------------------------------------------
# 3. Global Context & System State
# ------------------------------------------------------------------------------
# Global Instances
# Service singletons. They start as None and are assigned by lifespan() during
# startup, so any code that may run earlier must tolerate None.
r2: Optional[R2Service] = None
data_manager: Optional[DataManager] = None
ml_processor: Optional[MLProcessor] = None
adaptive_hub: Optional[AdaptiveHub] = None
trade_manager: Optional[TradeManager] = None
whale_monitor: Optional[EnhancedWhaleMonitor] = None
news_fetcher: Optional[NewsFetcher] = None
senti_analyzer: Optional[SentimentIntensityAnalyzer] = None
class SystemState:
    """Central in-memory state shared by the daemon, the cycle and the UI.

    All attributes are plain values that other parts of this module read and
    write directly (flags, the last log text, the auto-scan interval).
    """

    def __init__(self):
        self.ready = False
        self.cycle_running = False
        self.training_running = False
        self.auto_pilot = True
        self.last_cycle_time: Optional[datetime] = None
        self.last_cycle_error: Optional[str] = None
        self.app_start_time = datetime.now()
        self.last_cycle_logs = "System Initializing..."
        self.training_status_msg = "Adaptive Mode: Active"
        self.scan_interval = 60  # Seconds between auto-scans

    def set_ready(self):
        """Mark startup as complete and publish the ready banner."""
        self.ready = True
        self.last_cycle_logs = "✅ System Ready. R2 Source of Truth Mode."
        logger.info("✅ System State set to READY.")

    def set_cycle_start(self):
        """Flag a scan cycle as running and reset the displayed log text."""
        self.cycle_running = True
        self.last_cycle_logs = "🌀 [Cycle START] Scanning Markets..."
        logger.info("🌀 Cycle STARTED.")

    def set_cycle_end(self, error=None, logs=None):
        """Flag the cycle as finished and record its outcome.

        Args:
            error: Exception or message describing a failure, if any.
            logs: Captured cycle log text to display, if any.

        When both ``error`` and ``logs`` are given, the error line is appended
        to the logs so the failure stays visible; previously the logs replaced
        it entirely.
        """
        self.cycle_running = False
        self.last_cycle_time = datetime.now()
        self.last_cycle_error = str(error) if error else None

        if error:
            error_line = f"❌ [Cycle ERROR] {error}"
            if logs:
                # The caller has already logged the failure; only make sure it
                # also shows up in the text that the UI displays.
                separator = "" if logs.endswith("\n") else "\n"
                self.last_cycle_logs = f"{logs}{separator}{error_line}"
            else:
                self.last_cycle_logs = error_line
                logger.error(f"Cycle Error: {error}")
        elif logs:
            self.last_cycle_logs = logs
        else:
            self.last_cycle_logs = "✅ [Cycle END] Finished successfully."
            logger.info("✅ Cycle ENDED.")


sys_state = SystemState()
# ------------------------------------------------------------------------------
# 4. Helper Functions & Utilities
# ------------------------------------------------------------------------------
def calculate_duration_str(timestamp_str):
    """Return the time elapsed since ``timestamp_str`` as ``[Nd ]HH:MM:SS``.

    Args:
        timestamp_str: ISO-8601 string or ``datetime`` (naive or
            timezone-aware). Falsy values are treated as "no timestamp".

    Returns:
        The formatted duration, or ``"--:--:--"`` when the input is missing
        or cannot be interpreted. Timestamps in the future yield ``00:00:00``.
    """
    placeholder = "--:--:--"
    if not timestamp_str:
        return placeholder
    try:
        if isinstance(timestamp_str, str):
            text = timestamp_str.strip()
            # fromisoformat() only understands a trailing "Z" from Python 3.11 on.
            if text.endswith(("Z", "z")):
                text = text[:-1] + "+00:00"
            start_time = datetime.fromisoformat(text)
        else:
            start_time = timestamp_str
        # Take "now" in the timestamp's own timezone: subtracting an aware
        # datetime from a naive one raises TypeError.
        now = datetime.now(start_time.tzinfo)
        total_seconds = max(0, int((now - start_time).total_seconds()))
    except (TypeError, ValueError, AttributeError, OverflowError):
        return placeholder

    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    if days > 0:
        return f"{days}d {hours:02}:{minutes:02}:{seconds:02}"
    return f"{hours:02}:{minutes:02}:{seconds:02}"
def format_pnl_split(profit, loss):
    """Render a profit/loss pair as a green/red HTML snippet.

    Args:
        profit: Accumulated profit in USD; ``None`` is treated as 0.
        loss: Accumulated loss in USD (sign is ignored); ``None`` is treated as 0.

    Returns:
        HTML of the form ``+$P / -$L`` with two decimals and thousands separators.
    """
    # Stored stats may carry explicit nulls; show them as zero instead of raising.
    profit_value = float(profit or 0.0)
    loss_value = abs(float(loss or 0.0))
    return f"<span style='color:#00ff00'>+${profit_value:,.2f}</span> / <span style='color:#ff0000'>-${loss_value:,.2f}</span>"
# ------------------------------------------------------------------------------
# 5. Background Daemons (Auto-Pilot)
# ------------------------------------------------------------------------------
async def auto_pilot_loop():
    """Background daemon that schedules automated scan cycles.

    Polls every 5 seconds. While positions are open it only mirrors the trade
    manager's guardian log; otherwise, when auto-pilot is enabled and nothing
    else is running, it launches ``run_unified_cycle`` once per
    ``sys_state.scan_interval`` seconds. Runs until cancelled.
    """
    logger.info("🤖 [Auto-Pilot] Daemon started.")
    status_refresh_seconds = 60.0
    last_status_refresh = None
    # Strong references to launched cycles: the event loop only keeps weak
    # references to tasks, so an unreferenced task may be garbage-collected.
    pending_cycles = set()
    while True:
        try:
            await asyncio.sleep(5)
            if not sys_state.ready:
                continue

            # Update Hub Status Periodically. A monotonic timer is used because
            # testing `int(time.time()) % 60 == 0` on a 5s poll rarely matches.
            now = time.monotonic()
            refresh_due = last_status_refresh is None or (now - last_status_refresh) >= status_refresh_seconds
            if adaptive_hub and refresh_due and not sys_state.training_running:
                sys_state.training_status_msg = adaptive_hub.get_status()
                last_status_refresh = now

            # Trade Manager Watchdog: no new scans while a position is open.
            if trade_manager and len(trade_manager.open_positions) > 0:
                if not sys_state.cycle_running:
                    sys_state.last_cycle_logs = trade_manager.latest_guardian_log
                continue

            # Trigger Auto Scan
            if sys_state.auto_pilot and not sys_state.cycle_running and not sys_state.training_running:
                # Check interval
                if sys_state.last_cycle_time:
                    elapsed = (datetime.now() - sys_state.last_cycle_time).total_seconds()
                    if elapsed < sys_state.scan_interval:
                        continue
                logger.info("🤖 [Auto-Pilot] Triggering scheduled scan...")
                cycle_task = asyncio.create_task(run_unified_cycle())
                pending_cycles.add(cycle_task)
                cycle_task.add_done_callback(pending_cycles.discard)
                # Give the new task time to flag itself as running.
                await asyncio.sleep(5)
        except Exception as e:
            logger.error(f"⚠️ [Auto-Pilot Error] {e}")
            await asyncio.sleep(30)
# ------------------------------------------------------------------------------
# 6. Application Lifespan (Startup/Shutdown)
# ------------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: build all services on startup, release on shutdown.

    Startup assigns the module-level service singletons in dependency order,
    starts the trade manager's sentry loops and launches the auto-pilot daemon.
    Any startup failure is logged and terminates the process with exit code 1.
    Shutdown cancels the daemon and closes each service; a failure in one
    shutdown step is logged and does not prevent the remaining steps.
    """
    global r2, data_manager, ml_processor, adaptive_hub, trade_manager, whale_monitor, news_fetcher, senti_analyzer, sys_state
    logger.info("\n🚀 [System] Startup Sequence (Titan V70.6 - R2 Sync)...")
    autopilot_task = None
    try:
        # Only the startup sequence is wrapped here, so an exception raised
        # while the app is serving is not reported as a startup failure.
        try:
            # 1. Initialize R2 (Persistence)
            r2 = R2Service()
            # 2. Initialize Data Manager
            data_manager = DataManager(contracts_db={}, whale_monitor=None, r2_service=r2)
            await data_manager.initialize()
            await data_manager.load_contracts_from_r2()
            # 3. Initialize Whale Monitor
            whale_monitor = EnhancedWhaleMonitor(contracts_db=data_manager.get_contracts_db(), r2_service=r2)
            rpc_mgr = AdaptiveRpcManager(data_manager.http_client)
            whale_monitor.set_rpc_manager(rpc_mgr)
            data_manager.whale_monitor = whale_monitor
            # 4. Initialize External Data Feeds
            news_fetcher = NewsFetcher()
            senti_analyzer = SentimentIntensityAnalyzer()
            # 5. Initialize Adaptive Learning Hub
            adaptive_hub = AdaptiveHub(r2_service=r2)
            await adaptive_hub.initialize()
            # 6. Initialize ML Processor (PatternNet + Oracle + Sniper)
            ml_processor = MLProcessor(data_manager=data_manager)
            await ml_processor.initialize()
            # 7. Initialize Trade Manager
            trade_manager = TradeManager(r2_service=r2, data_manager=data_manager, processor=ml_processor)
            trade_manager.learning_hub = adaptive_hub
            # 8. Inject Continuous Tuner if available
            if ContinuousTuner:
                adaptive_hub.tuner = ContinuousTuner(adaptive_hub)
                logger.info("🔧 [Tuner] Continuous Tuner Injected.")
            # 9. Start Sentries
            await trade_manager.sync_internal_state_with_r2()
            await trade_manager.initialize_sentry_exchanges()
            await trade_manager.start_sentry_loops()
            # 10. Finalize Startup
            sys_state.set_ready()
            # Keep the task so it can be cancelled on shutdown and is not
            # garbage-collected while running.
            autopilot_task = asyncio.create_task(auto_pilot_loop())
            logger.info("✅ [System READY] All modules operational.")
        except Exception as e:
            logger.critical(f"❌ [FATAL STARTUP ERROR] {e}")
            traceback.print_exc()
            sys.exit(1)
        yield
    finally:
        # Shutdown Sequence
        sys_state.ready = False
        logger.info("🛑 [System] Initiating Shutdown...")
        if autopilot_task is not None:
            autopilot_task.cancel()
            # return_exceptions=True absorbs the task's own CancelledError.
            await asyncio.gather(autopilot_task, return_exceptions=True)

        shutdown_steps = []
        if trade_manager:
            shutdown_steps.append(("sentry loops", trade_manager.stop_sentry_loops))
        if data_manager:
            shutdown_steps.append(("data manager", data_manager.close))
        rpc_manager = getattr(whale_monitor, "rpc_manager", None) if whale_monitor else None
        if rpc_manager:
            shutdown_steps.append(("rpc manager", rpc_manager.close))
        for label, closer in shutdown_steps:
            try:
                await closer()
            except Exception as e:
                logger.error(f"⚠️ [Shutdown] Failed to close {label}: {e}")
        logger.info("✅ [System] Shutdown Complete.")
# ------------------------------------------------------------------------------
# 7. Analysis Tasks (Data Fetcher for Layers)
# ------------------------------------------------------------------------------
async def _fetch_l2_data_task(candidate_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Fetch the 15m/1h/4h candles and current price for one Layer-2 candidate.

    Does NOT run the models, just prepares the data package.

    Args:
        candidate_data: Layer-1 candidate dict; must contain ``'symbol'``.

    Returns:
        The raw data package for ``execute_layer2_analysis``, or ``None`` when
        the 15m timeframe is unavailable or the fetch fails.
    """
    symbol = None
    try:
        symbol = candidate_data['symbol']
        required_tfs = ("15m", "1h", "4h")
        # Concurrent Data Fetching. return_exceptions=True keeps a failure on
        # one timeframe from discarding the others.
        fetched = await asyncio.gather(
            *(data_manager.get_latest_ohlcv(symbol, tf, limit=300) for tf in required_tfs),
            return_exceptions=True,
        )
        ohlcv_data = {}
        for tf, data in zip(required_tfs, fetched):
            if isinstance(data, BaseException):
                logger.warning(f"⚠️ [L2 Fetch] {symbol} {tf} failed: {data}")
                continue
            if data is not None and len(data) > 0:
                ohlcv_data[tf] = data
        # Validation: Ensure critical timeframes exist
        if '15m' not in ohlcv_data:
            return None
        current_price = await data_manager.get_latest_price_async(symbol)
        # Construct Raw Data Package
        return {
            'symbol': symbol,
            'ohlcv': ohlcv_data,
            'current_price': current_price,
            'timestamp': time.time(),
            'dynamic_limits': candidate_data.get('dynamic_limits', {}),
            'asset_regime': candidate_data.get('asset_regime', 'UNKNOWN'),
            'strategy_tag': candidate_data.get('strategy_tag', 'NONE'),
            'strategy_type': candidate_data.get('strategy_type', 'NORMAL'),
            'l1_score': candidate_data.get('l1_sort_score', 0),
        }
    except Exception as e:
        # Best-effort: one bad candidate must not abort the batch, but the
        # failure should stay visible in the logs.
        logger.warning(f"⚠️ [L2 Fetch] {symbol or 'unknown symbol'} skipped: {e}")
        return None
# ------------------------------------------------------------------------------
# 8. Unified Logic Cycle (The 5-Layer Pipeline)
# ------------------------------------------------------------------------------
async def run_unified_cycle():
    """
    💎 GEM-Architect: The 5-Layer Precision Pipeline

    Runs one full scan: L1 market screen -> L2 neural analysis -> L3 whale and
    news bonuses -> L4 sniper entry search -> L5 hand-off to the trade manager.
    Does nothing if a cycle or training run is already active or the system is
    not ready. Every log line is also collected and stored through
    ``sys_state.set_cycle_end`` for the dashboard.
    """
    log_buffer = StringIO()

    def log_and_print(message):
        logger.info(message)
        log_buffer.write(message + '\n')

    if sys_state.cycle_running or sys_state.training_running:
        return
    if not sys_state.ready:
        return

    sys_state.set_cycle_start()
    try:
        # Sync R2 State (Crucial)
        await trade_manager.sync_internal_state_with_r2()

        # Log Active Trades
        if len(trade_manager.open_positions) > 0:
            log_and_print(f"ℹ️ [Cycle] Active Positions: {len(trade_manager.open_positions)}")
            # Iterate a snapshot: the dict may change while we await prices.
            for sym, tr in list(trade_manager.open_positions.items()):
                curr_p = await data_manager.get_latest_price_async(sym)
                entry_p = float(tr.get('entry_price', 0) or 0)
                # A missing price is reported as 0% rather than crashing the cycle.
                pnl = ((curr_p - entry_p) / entry_p) * 100 if (curr_p and entry_p > 0) else 0
                log_and_print(f" 🔒 {sym}: {pnl:+.2f}%")

        # ==============================================================================
        # 🌊 LAYER 1: Market Breadth & Liquidity Filter
        # ==============================================================================
        log_and_print(f" [1/5] 🌊 Layer 1: Market Breadth & Liquidity Screen...")
        l1_candidates = await data_manager.layer1_rapid_screening(limit=200, adaptive_hub_ref=adaptive_hub)
        if not l1_candidates:
            log_and_print("⚠️ [Layer 1] No candidates found (Market unsafe or flat).")
            sys_state.set_cycle_end(logs=log_buffer.getvalue())
            return

        # ==============================================================================
        # 🧠 LAYER 2: Neural Analysis (Pattern + Oracle + MC)
        # ==============================================================================
        log_and_print(f" [2/5] 🧠 Layer 2: Neural Pattern Analysis ({len(l1_candidates)} coins)...")

        # Batch execute L2
        async def process_l2(cand):
            raw_data = await _fetch_l2_data_task(cand)
            if not raw_data:
                return None
            # Processor now returns full dict even if rejected
            return await ml_processor.execute_layer2_analysis(raw_data)

        # return_exceptions=True: one failing coin must not abort the whole scan.
        l2_results = await asyncio.gather(*(process_l2(c) for c in l1_candidates), return_exceptions=True)
        processed_l2 = []
        for cand, res in zip(l1_candidates, l2_results):
            if isinstance(res, BaseException):
                logger.warning(f"⚠️ [Layer 2] {cand.get('symbol', '?')} analysis failed: {res}")
            elif res is not None:
                processed_l2.append(res)
        processed_l2.sort(key=lambda x: x.get('l2_score', 0.0), reverse=True)

        # 👁️ X-RAY MODE: Show Top 10 regardless of status
        top_view = processed_l2[:10]
        log_and_print(f"📊 [X-RAY] Top 10 Scanned Candidates (Pass & Fail):")
        l2_header = f"{'STS':<3} | {'SYMBOL':<9} | {'COMP':<5} | {'PATT':<5} | {'ORCL':<5} | {'MC':<4}"
        log_and_print("-" * 65)
        log_and_print(l2_header)
        for c in top_view:
            status = "✅" if c.get('is_valid', False) else "❌"
            log_and_print(f"{status:<3} | {c['symbol']:<9} | {c.get('l2_score', 0.0):>5.1f} | {c.get('pattern_score', 0.0)*100:>5.1f} | {c.get('oracle_score', 0.0)*100:>5.1f} | {c.get('mc_score', 0.0)*100:>4.1f}")
        log_and_print("-" * 65)

        valid_l2 = [c for c in processed_l2 if c.get('is_valid', False)]
        if not valid_l2:
            log_and_print("⚠️ [Layer 2] All candidates failed Pattern Gate.")
            log_and_print(" (Check X-Ray table above to see why scores are low)")
            sys_state.set_cycle_end(logs=log_buffer.getvalue())
            return

        # ==============================================================================
        # 📡 LAYER 3: External Intelligence (News + Whales)
        # ==============================================================================
        log_and_print(f"\n [3/5] 📡 Layer 3: External Intelligence ({len(valid_l2)} candidates)...")
        final_l3_list = []
        for cand in valid_l2:
            symbol = cand['symbol']
            l2_score = cand.get('l2_score', 0.0)

            # Whale Analysis (best-effort: a failure just means no bonus)
            whale_bonus = 0.0
            if whale_monitor:
                try:
                    w_data = await whale_monitor.get_symbol_whale_activity(symbol, known_price=cand.get('current_price', 0))
                    if w_data and w_data.get('trading_signal', {}).get('action') == 'BUY':
                        conf = float(w_data.get('trading_signal', {}).get('confidence', 0.5))
                        whale_bonus = SystemLimits.L3_WHALE_IMPACT_MAX * conf
                except Exception as e:
                    logger.debug(f"[Layer 3] Whale lookup failed for {symbol}: {e}")

            # News Analysis (best-effort: a failure just means no bonus)
            news_bonus = 0.0
            if news_fetcher and senti_analyzer:
                try:
                    n_data = await news_fetcher.get_news(symbol)
                    summary = n_data.get('summary', '')
                    if "No specific news" not in summary:
                        sent = senti_analyzer.polarity_scores(summary)
                        news_bonus = sent['compound'] * SystemLimits.L3_NEWS_IMPACT_MAX
                except Exception as e:
                    logger.debug(f"[Layer 3] News lookup failed for {symbol}: {e}")

            cand['whale_score'] = whale_bonus
            cand['news_score'] = news_bonus
            cand['final_total_score'] = l2_score + whale_bonus + news_bonus
            final_l3_list.append(cand)

        final_l3_list.sort(key=lambda x: x['final_total_score'], reverse=True)
        log_and_print(f" -> {len(final_l3_list)} passed to L3.")
        l3_header = f"{'SYMBOL':<9} | {'L2':<5} | {'WHALE':<6} | {'NEWS':<6} | {'TOTAL':<6}"
        log_and_print("-" * 60)
        log_and_print(l3_header)
        for c in final_l3_list[:5]:  # Show top 5 survivors
            log_and_print(f"{c['symbol']:<9} | {c.get('l2_score', 0.0):>5.1f} | {c['whale_score']:>6.1f} | {c['news_score']:>6.1f} | {c['final_total_score']:>6.1f}")
        log_and_print("-" * 60)

        # ==============================================================================
        # 🎯 LAYER 4: Sniper (Micro-Structure) - Find The One
        # ==============================================================================
        log_and_print(f"\n [4/5] 🎯 Layer 4: Sniper Micro-Analysis...")
        best_candidate = None
        best_sniper_score = -1.0
        for cand in final_l3_list[:5]:
            symbol = cand['symbol']
            t1m = await data_manager.get_latest_ohlcv(symbol, '1m', 500)
            ob = await data_manager.get_order_book_snapshot(symbol)
            if not t1m or not ob:
                continue
            # Treat a missing result as an empty one so the defaults below apply.
            sniper_res = await ml_processor.execute_layer4_sniper(symbol, t1m, ob) or {}
            s_sig = sniper_res.get('signal', 'WAIT')
            s_conf = float(sniper_res.get('confidence_prob', 0.0) or 0.0)
            s_reason = sniper_res.get('reason', 'N/A')
            log_and_print(f" 🔭 {symbol:<6} -> Signal: {s_sig} | Conf: {s_conf:.2f} | {s_reason}")
            if s_sig == 'BUY' and s_conf > best_sniper_score:
                best_sniper_score = s_conf
                cand['sniper_entry_price'] = sniper_res.get('entry_price', 0)
                cand['sniper_score'] = s_conf
                best_candidate = cand

        if not best_candidate:
            log_and_print("🛑 Layer 4: Sniper found no valid entry points.")
            sys_state.set_cycle_end(logs=log_buffer.getvalue())
            return

        # ==============================================================================
        # 🏛️ LAYER 5: Governance & Execution
        # ==============================================================================
        log_and_print(f"\n [5/5] 🏛️ Layer 5: Sending {best_candidate['symbol']} to Governance...")
        tm_log_buffer = StringIO()
        # NOTE(review): redirect_stdout/redirect_stderr swap the process-wide
        # streams, so anything printed by other tasks during this await is
        # captured here as well.
        with redirect_stdout(tm_log_buffer), redirect_stderr(tm_log_buffer):
            await trade_manager.select_and_execute_best_signal([best_candidate])
        tm_logs = tm_log_buffer.getvalue()
        for line in tm_logs.splitlines():
            if line.strip():
                log_and_print(line.strip())

        gc.collect()
        sys_state.set_cycle_end(logs=log_buffer.getvalue())
    except asyncio.CancelledError:
        # CancelledError is not an Exception; without this branch a cancelled
        # cycle would leave `cycle_running` stuck at True and block later scans.
        sys_state.set_cycle_end(error="Cycle cancelled", logs=log_buffer.getvalue())
        raise
    except Exception as e:
        logger.error(f"❌ [Cycle ERROR] {e}")
        traceback.print_exc()
        sys_state.set_cycle_end(error=e, logs=log_buffer.getvalue())
# ------------------------------------------------------------------------------
# 9. UI Action Handlers
# ------------------------------------------------------------------------------
async def trigger_training_cycle():
    """Return the adaptive hub's current status text for the "Status" button."""
    if not adaptive_hub:
        return "⚠️ System not ready."
    hub_status = adaptive_hub.get_status()
    return f"🤖 Adaptive System: {hub_status}"
async def trigger_strategic_backtest():
    """Start the strategic backtest in the background and return a status message.

    Refuses to start when the backtest engine is missing, while trades are
    open, or while another training run is active. After the backtest the
    adaptive hub is re-initialised.
    """
    if not BACKTEST_AVAILABLE:
        return "⚠️ Backtest Engine not found."
    if trade_manager and len(trade_manager.open_positions) > 0:
        return "⛔ Active trades exist."
    if sys_state.training_running:
        return "⚠️ Running."

    async def _run_bg_task():
        try:
            logger.info("🧪 [Manual] Starting Strategic Backtest...")
            await run_strategic_optimization_task()
            if adaptive_hub:
                await adaptive_hub.initialize()
            logger.info("✅ [Manual] Backtest Complete.")
        except Exception as e:
            logger.error(f"❌ Backtest Failed: {e}")
        finally:
            sys_state.training_running = False
            sys_state.training_status_msg = adaptive_hub.get_status() if adaptive_hub else "Ready"

    def _release_flag(_task):
        # Covers a task cancelled before its body (and its `finally`) ever ran.
        sys_state.training_running = False

    # Claim the slot before scheduling: setting the flag inside the task left a
    # window in which a second click could start a second backtest.
    sys_state.training_running = True
    sys_state.training_status_msg = "🧪 Strategic Backtest Running..."
    # Keep a reference so the task cannot be garbage-collected while running.
    backtest_task = asyncio.create_task(_run_bg_task())
    backtest_task.add_done_callback(_release_flag)
    sys_state.backtest_task = backtest_task
    return "🧪 Strategic Backtest Started."
async def manual_close_current_trade():
    """Force-close the first open position and return a status message."""
    if not trade_manager:
        return "⚠️ System not ready."
    # Sync first to be sure
    await trade_manager.sync_internal_state_with_r2()
    if not trade_manager.open_positions:
        return "⚠️ No trade found in R2."
    # First key of the positions mapping, without building a list.
    symbol = next(iter(trade_manager.open_positions))
    await trade_manager.force_exit_by_manager(symbol, reason="MANUAL_UI")
    return f"✅ Closed {symbol}."
async def reset_history_handler():
    """Clear all stored statistics while keeping the current capital.

    Refused while trades are open. After the stats are reset, the smart
    portfolio (when present) is restored to the preserved capital and saved.
    """
    # Guard before the first dereference; the check used to come too late.
    if not trade_manager or not r2:
        return "⚠️ System not ready."
    if trade_manager.open_positions:
        return "⚠️ Close active trades first."

    current_state = await r2.get_portfolio_state_async() or {}
    preserved_capital = current_state.get('current_capital_usd', INITIAL_CAPITAL)
    if preserved_capital is None:
        preserved_capital = INITIAL_CAPITAL

    await r2.reset_all_stats_async()

    if trade_manager.smart_portfolio:
        sp = trade_manager.smart_portfolio
        sp.state["current_capital"] = preserved_capital
        sp.state["session_start_balance"] = preserved_capital
        sp.state["allocated_capital_usd"] = 0.0
        sp.state["daily_net_pnl"] = 0.0
        sp.state["is_trading_halted"] = False
        await sp._save_state_to_r2()
    return f"✅ History Cleared. Capital Preserved at ${preserved_capital:.2f}"
async def reset_capital_handler():
    """Reset the smart portfolio's capital to ``INITIAL_CAPITAL``, keeping history.

    Refused while trades are open. Returns a warning instead of a success
    message when there is no smart portfolio to reset.
    """
    # Guard before the first dereference; the check used to come too late.
    if not trade_manager:
        return "⚠️ System not ready."
    if trade_manager.open_positions:
        return "⚠️ Close active trades first."

    sp = trade_manager.smart_portfolio
    if not sp:
        # Nothing was changed, so do not claim success.
        return "⚠️ Smart portfolio unavailable; capital not reset."

    sp.state["current_capital"] = INITIAL_CAPITAL
    sp.state["session_start_balance"] = INITIAL_CAPITAL
    sp.state["allocated_capital_usd"] = 0.0
    sp.state["daily_net_pnl"] = 0.0
    sp.state["is_trading_halted"] = False
    await sp._save_state_to_r2()
    return f"✅ Capital Reset to ${INITIAL_CAPITAL} (History Kept)"
async def reset_diagnostics_handler():
    """Wipe the diagnostic matrix stored in R2 and confirm to the UI."""
    confirmation = "✅ Diagnostic Matrix Reset."
    await r2.reset_diagnostic_stats_async()
    return confirmation
async def reset_guardians_handler():
    """Reset guardian stats in R2 and reload them into the trade manager."""
    confirmation = "✅ Guardian Stats Reset."
    await r2.reset_guardian_stats_async()
    if not trade_manager:
        return confirmation
    # Refresh the in-memory copy from the freshly reset store.
    trade_manager.ai_stats = await r2.get_guardian_stats_async()
    return confirmation
async def toggle_auto_pilot(enable):
    """Store the Auto-Pilot checkbox value and echo it back as an alert."""
    setattr(sys_state, "auto_pilot", enable)
    message = f"Auto-Pilot: {enable}"
    return message
async def run_cycle_from_gradio():
    """Launch a manual scan cycle from the "Scan" button.

    Returns a short status message. The cycle is only launched when it will
    actually run: ``run_unified_cycle`` silently returns when the system is not
    ready or a cycle/training run is active, which used to be reported as
    "Launched".
    """
    if not sys_state.ready:
        return "⚠️ System not ready."
    if sys_state.cycle_running or sys_state.training_running:
        return "Busy."
    # Keep a reference so the task cannot be garbage-collected while running.
    sys_state.manual_cycle_task = asyncio.create_task(run_unified_cycle())
    return "🚀 Launched."
# ------------------------------------------------------------------------------
# 10. UI Data Fetching Logic (Status & Charts)
# ------------------------------------------------------------------------------
async def check_live_pnl_and_status(selected_view="Dual-Core (Hybrid)"):
    """
    Poller function to update Gradio UI every few seconds.

    Portfolio, open trades, guardian stats and diagnostic stats are fetched
    from R2 on every call. The coin-type table and the watchlist are still read
    from the in-memory ``trade_manager``.

    Args:
        selected_view: Label of the guardian stats view chosen in the dropdown;
            unknown labels fall back to the hybrid view.

    Returns:
        A 14-tuple matching the Timer outputs: (logs, status line, chart,
        price, entry, TP, SL, PnL %, watchlist df, diagnostics df,
        type-stats df, wallet HTML, history HTML, neural HTML).
    """
    def _num(value, default=0.0):
        # Stored records may carry explicit nulls; treat them as the default.
        return default if value is None else float(value)

    empty_chart = go.Figure()
    empty_chart.update_layout(template="plotly_dark", paper_bgcolor="#0b0f19", plot_bgcolor="#0b0f19", xaxis={'visible':False}, yaxis={'visible':False})
    wl_df_empty = pd.DataFrame(columns=["Coin", "Score"])
    diag_df_empty = pd.DataFrame(columns=["Model", "Wins", "Losses", "PnL (USD)"])
    type_df_empty = pd.DataFrame(columns=["Coin Type", "Wins", "Losses", "Profitability"])

    if not sys_state.ready:
        return "Initializing...", "...", empty_chart, "0.0", "0.0", "0.0", "0.0", "0.0%", wl_df_empty, diag_df_empty, type_df_empty, "Loading...", "Loading...", "Loading..."

    try:
        # 🔥 DIRECT R2 FETCH (Source of Truth)
        portfolio = await r2.get_portfolio_state_async() or {}
        open_trades_raw = await r2.get_open_trades_async()

        # Parse Trades (R2 may return either a list of trades or a symbol->trade dict)
        active_trades_dict = {}
        if isinstance(open_trades_raw, list):
            for t in open_trades_raw:
                active_trades_dict[t['symbol']] = t
        elif isinstance(open_trades_raw, dict):
            active_trades_dict = open_trades_raw

        equity = _num(portfolio.get('current_capital_usd', INITIAL_CAPITAL), INITIAL_CAPITAL)
        daily_pnl = _num(portfolio.get('daily_net_pnl', 0.0))
        allocated = _num(portfolio.get('allocated_capital_usd', 0.0))
        is_halted = portfolio.get('is_trading_halted', False)
        free_cap = max(0.0, equity - allocated)

        symbol = None
        entry_p = 0.0
        tp_p = 0.0
        sl_p = 0.0
        curr_p = 0.0
        pnl_pct = 0.0
        pnl_val_unrealized = 0.0
        active_trade_info = ""
        trade_dur_str = "--:--:--"

        # --- Active Trade Details ---
        if active_trades_dict:
            symbol = next(iter(active_trades_dict))
            trade = active_trades_dict[symbol]
            entry_p = _num(trade.get('entry_price', 0.0))
            tp_p = _num(trade.get('tp_price', 0.0))
            sl_p = _num(trade.get('sl_price', 0.0))
            trade_dur_str = calculate_duration_str(trade.get('entry_time'))
            decision_data = trade.get('decision_data') or {}
            gov_grade = decision_data.get('governance_grade', 'N/A')
            gov_score = _num(decision_data.get('governance_score', 0.0))
            oracle_score = _num(decision_data.get('oracle_score', 0.0))

            # A failed price lookup must not turn the whole panel into "Error".
            curr_p = _num(await data_manager.get_latest_price_async(symbol))
            if curr_p > 0 and entry_p > 0:
                pnl_pct = ((curr_p - entry_p) / entry_p) * 100
                size = _num(trade.get('entry_capital', 0.0))
                pnl_val_unrealized = size * (pnl_pct / 100)

            grade_color = "#ccc"
            if gov_grade == "ULTRA": grade_color = "#ff00ff"
            elif gov_grade == "STRONG": grade_color = "#00ff00"
            elif gov_grade == "NORMAL": grade_color = "#00e5ff"

            active_trade_info = f"""
<div style='display: flex; justify-content: space-between; font-size: 12px; color: #ccc; margin-top:5px; border-top: 1px solid #333; padding-top: 5px;'>
<span>⏱️ Time:</span> <span style='color: #ffff00;'>{trade_dur_str}</span>
</div>
<div style='display: flex; justify-content: space-between; font-size: 12px; color: #ccc; margin-top:5px;'>
<span>🏛️ Grade:</span> <span style='color: {grade_color}; font-weight:bold;'>{gov_grade} ({gov_score:.1f})</span>
</div>
<div style='display: flex; justify-content: space-between; font-size: 12px; color: #ccc; margin-top:5px;'>
<span>🔮 Oracle:</span> <span style='color: #00ff00;'>{oracle_score*100:+.1f}%</span>
</div>
"""

        virtual_equity = equity + pnl_val_unrealized
        active_pnl_color = "#00ff00" if pnl_val_unrealized >= 0 else "#ff0000"

        total_t = portfolio.get('total_trades', 0)
        wins = portfolio.get('winning_trades', 0)
        losses = portfolio.get('losing_trades', 0)
        # Derive losses when the stored counter was never populated.
        if losses == 0 and total_t > 0: losses = total_t - wins
        tot_prof = _num(portfolio.get('total_profit_usd', 0.0))
        tot_loss = _num(portfolio.get('total_loss_usd', 0.0))
        net_prof = tot_prof - tot_loss
        win_rate = (wins / total_t * 100) if total_t > 0 else 0.0
        halt_status = "<span style='color:red; font-weight:bold;'>HALTED</span>" if is_halted else "<span style='color:#00ff00;'>ACTIVE</span>"

        # --- Wallet Widget ---
        wallet_md = f"""
<div style='background-color: #1a1a1a; padding: 15px; border-radius: 8px; border: 1px solid #333; text-align:center;'>
<h3 style='margin:0; color:#888; font-size:14px;'>💼 Institutional Portfolio</h3>
<div style='font-size: 24px; font-weight: bold; color: white; margin: 5px 0 0 0;'>${virtual_equity:,.2f}</div>
<div style='font-size: 14px; color: {active_pnl_color}; margin-bottom: 5px;'>({pnl_val_unrealized:+,.2f} USD)</div>
<table style='width:100%; font-size:12px; margin-top:5px; color:#ccc;'>
<tr><td>Allocated:</td><td style='text-align:right; color:#ffa500;'>${allocated:.2f}</td></tr>
<tr><td>Free Cap:</td><td style='text-align:right; color:#00ff00;'>${free_cap:.2f}</td></tr>
<tr><td>Daily PnL:</td><td style='text-align:right; color:{"#00ff00" if daily_pnl>=0 else "#ff0000"};'>${daily_pnl:+.2f}</td></tr>
</table>
<hr style='border-color:#444; margin: 10px 0;'>
<div style='display: flex; justify-content: space-between; font-size: 12px; color: #ccc;'>
<span>🛡️ Status:</span> {halt_status}
</div>
{active_trade_info}
</div>
"""

        # --- Guardian Stats ---
        key_map = {
            "Dual-Core (Hybrid)": "hybrid",
            "Hydra: Crash (Panic)": "crash",
            "Hydra: Giveback (Profit)": "giveback",
            "Hydra: Stagnation (Time)": "stagnation"
        }
        target_key = key_map.get(selected_view, "hybrid")
        # Fetch stats from R2 (not memory)
        stats_file = await r2.get_guardian_stats_async() or {}
        stats_data = stats_file.get(target_key) or {}
        # .get() with defaults: a partially filled record must not raise KeyError.
        tot_ds = stats_data.get('total', 0) or 0
        good_ds = stats_data.get('good', 0) or 0
        saved_usd = _num(stats_data.get('saved', 0.0))
        missed_usd = _num(stats_data.get('missed', 0.0))
        ds_acc = (good_ds / tot_ds * 100) if tot_ds > 0 else 0.0

        history_md = f"""
<div style='background-color: #1a1a1a; padding: 10px; border-radius: 8px; border: 1px solid #333; font-size: 12px;'>
<h3 style='margin:0 0 5px 0; color:#888; font-size:14px;'>📊 Performance</h3>
<table style='width:100%; color:white;'>
<tr><td>Trades:</td><td style='text-align:right;'>{total_t}</td></tr>
<tr><td>Win Rate:</td><td style='text-align:right; color:{"#00ff00" if win_rate>=50 else "#ff0000"};'>{win_rate:.1f}%</td></tr>
<tr><td>Wins:</td><td style='text-align:right; color:#00ff00;'>{wins} (+${tot_prof:,.2f})</td></tr>
<tr><td>Losses:</td><td style='text-align:right; color:#ff0000;'>{losses} (-${tot_loss:,.2f})</td></tr>
<tr><td style='border-top:1px solid #444;'>Net:</td><td style='border-top:1px solid #444; text-align:right; color:{"#00ff00" if net_prof>=0 else "#ff0000"};'>${net_prof:,.2f}</td></tr>
</table>
<hr style='border-color:#444; margin: 8px 0;'>
<h3 style='margin:0 0 5px 0; color: #00e5ff; font-size:14px;'>🛡️ Guard IQ ({target_key})</h3>
<table style='width:100%; color:white;'>
<tr><td>Interventions:</td><td style='text-align:right;'>{tot_ds}</td></tr>
<tr><td>Accuracy:</td><td style='text-align:right; color:#00e5ff;'>{ds_acc:.1f}%</td></tr>
<tr><td>Saved:</td><td style='text-align:right; color:#00ff00;'>${saved_usd:.2f}</td></tr>
<tr><td>Missed:</td><td style='text-align:right; color:#ff0000;'>${missed_usd:.2f}</td></tr>
</table>
</div>
"""

        # --- Neural Status ---
        status_msg = "Inactive"
        if adaptive_hub:
            status_msg = adaptive_hub.get_status()
        neural_md = f"""
<div style='background-color: #1a1a1a; padding: 10px; border-radius: 8px; border: 1px solid #333; font-size: 12px; margin-top: 10px;'>
<h3 style='margin:0; color:#00e5ff; font-size:14px;'>🧠 Micro-Tuner</h3>
<table style='width:100%; color:#ccc;'>
<tr style='border-bottom: 1px solid #333;'>
<td style='padding:4px 0;'>⚡ Status:</td>
<td style='text-align:right; color:#ffff00; font-weight:bold;'>Active</td>
</tr>
<tr>
<td style='padding:4px 0;'>🔧 Logic:</td>
<td style='text-align:right; color:#fff;'>{status_msg}</td>
</tr>
</table>
</div>
"""

        # --- DataFrames ---
        diag_data = await r2.get_diagnostic_stats_async() or {}
        diag_list = []
        ordered_models = ["Pattern", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"]
        for m in ordered_models:
            stats = diag_data.get(m) or {}
            model_pnl = _num(stats.get('pnl', 0.0))
            profit_accum = _num(stats.get('profit_accum', 0.0))
            loss_accum = _num(stats.get('loss_accum', 0.0))
            # Records without split accumulators: derive the split from net PnL.
            if profit_accum == 0.0 and loss_accum == 0.0 and model_pnl != 0.0:
                if model_pnl > 0: profit_accum = model_pnl
                else: loss_accum = abs(model_pnl)
            pnl_str = format_pnl_split(profit_accum, loss_accum)
            diag_list.append([m, stats.get('wins', 0), stats.get('losses', 0), pnl_str])
        diag_df = pd.DataFrame(diag_list, columns=["Model", "Wins", "Losses", "PnL (USD)"])

        type_stats_list = []
        if trade_manager and hasattr(trade_manager, 'type_stats'):
            for t_name, t_data in trade_manager.type_stats.items():
                name_clean = t_name.replace("_", " ")
                # Distinct names: do not shadow the portfolio-level wins/losses.
                type_wins = t_data.get('wins', 0)
                type_losses = t_data.get('losses', 0)
                prof = _num(t_data.get('profit_usd', 0.0))
                loss_val = _num(t_data.get('loss_usd', 0.0))
                profitability_html = format_pnl_split(prof, loss_val)
                type_stats_list.append([name_clean, type_wins, type_losses, profitability_html])
        type_df = pd.DataFrame(type_stats_list, columns=["Coin Type", "Wins", "Losses", "Profitability"])

        # Same None-guard as type_stats above.
        watchlist = (getattr(trade_manager, 'watchlist', None) or {}) if trade_manager else {}
        wl_data = [[k, f"{_num(v.get('final_total_score', 0)):.2f}"] for k, v in list(watchlist.items())]
        wl_df = pd.DataFrame(wl_data, columns=["Coin", "Score"])

        status_txt = sys_state.last_cycle_logs
        status_line = f"Cycle: {'RUNNING' if sys_state.cycle_running else 'IDLE'} | Auto-Pilot: {'ON' if sys_state.auto_pilot else 'OFF'}"

        # --- Charting ---
        fig = empty_chart
        if symbol and curr_p > 0:
            ohlcv = await data_manager.get_latest_ohlcv(symbol, '5m', 120)
            if ohlcv:
                df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
                df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
                fig = go.Figure(data=[go.Candlestick(
                    x=df['datetime'], open=df['open'], high=df['high'], low=df['low'], close=df['close'],
                    increasing_line_color='#00ff00', decreasing_line_color='#ff0000', name=symbol
                )])
                if entry_p > 0:
                    fig.add_hline(y=entry_p, line_dash="dash", line_color="white", annotation_text="ENTRY", annotation_position="top left")
                if tp_p > 0:
                    fig.add_hline(y=tp_p, line_color="#00ff00", line_width=2, annotation_text="TP", annotation_position="top left")
                if sl_p > 0:
                    fig.add_hline(y=sl_p, line_color="#ff0000", line_width=2, annotation_text="SL", annotation_position="bottom left")
                fig.update_layout(
                    template="plotly_dark",
                    paper_bgcolor="#0b0f19",
                    plot_bgcolor="#0b0f19",
                    margin=dict(l=0, r=40, t=30, b=0),
                    height=400,
                    xaxis_rangeslider_visible=False,
                    title=dict(text=f"{symbol} (Long) | PnL: {pnl_pct:+.2f}%", font=dict(color="white"))
                )

        return (status_txt, status_line, fig, f"{curr_p:.6f}", f"{entry_p:.6f}", f"{tp_p:.6f}", f"{sl_p:.6f}",
                f"{pnl_pct:+.2f}%", wl_df, diag_df, type_df, wallet_md, history_md, neural_md)
    except Exception:
        # UI boundary: never let a refresh failure propagate into Gradio.
        traceback.print_exc()
        return "Error", "Error", empty_chart, "0", "0", "0", "0", "0%", wl_df_empty, diag_df_empty, type_df_empty, "Err", "Err", "Err"
# ------------------------------------------------------------------------------
# 11. Gradio Layout & Assembly
# ------------------------------------------------------------------------------
def create_gradio_ui():
    """Assemble the Gradio dashboard (layout, button wiring, 3s refresh timer).

    Returns:
        The ``gr.Blocks`` instance that is mounted on the FastAPI app.
    """
    # NOTE(review): the container nesting below was reconstructed from the
    # statement order; confirm it against the original layout.
    theme_css = ".gradio-container {background:#0b0f19} .dataframe {background:#1a1a1a!important} .html-box {min-height:180px}"
    stat_column_types = ["str", "number", "number", "html"]
    guard_views = [
        "Dual-Core (Hybrid)",
        "Hydra: Crash (Panic)",
        "Hydra: Giveback (Profit)",
        "Hydra: Stagnation (Time)"
    ]

    with gr.Blocks(title="Titan V70.6 (R2-Sync)", css=theme_css) as demo:
        gr.Markdown("# 🚀 Titan V70.6 (Neural Core + R2 Sync)")

        with gr.Row():
            # LEFT: Chart & Controls
            with gr.Column(scale=3):
                chart_plot = gr.Plot(label="Chart")
                with gr.Row():
                    price_box = gr.Textbox(label="Price", interactive=False)
                    pnl_box = gr.Textbox(label="PnL %", interactive=False)
                with gr.Row():
                    entry_box = gr.Textbox(label="Entry", interactive=False)
                    tp_box = gr.Textbox(label="TP", interactive=False)
                    sl_box = gr.Textbox(label="SL", interactive=False)

            # RIGHT: Stats & Panels
            with gr.Column(scale=1):
                wallet_panel = gr.HTML(label="Smart Wallet", elem_classes="html-box")
                neural_panel = gr.HTML(label="Neural Cycles", elem_classes="html-box")

                # Type Stats Table
                gr.Markdown("### 💎 Opportunity Types")
                type_table = gr.Dataframe(
                    headers=["Coin Type", "Wins", "Losses", "Profitability"],
                    datatype=stat_column_types,
                    interactive=False,
                    label="Type Performance"
                )

                gr.Markdown("### 🕵️ Diagnostic Matrix")
                diag_table = gr.Dataframe(
                    headers=["Model", "Wins", "Losses", "PnL (USD)"],
                    datatype=stat_column_types,
                    interactive=False,
                    label="Model Performance"
                )
                with gr.Row():
                    reset_diag_btn = gr.Button("🧹 Reset Matrix", size="sm", variant="secondary")
                    reset_guard_btn = gr.Button("🛡️ Reset Guardians", size="sm", variant="secondary")

                gr.HTML("<hr style='border-color:#444; margin: 10px 0;'>")
                guard_view_dd = gr.Dropdown(guard_views, value="Dual-Core (Hybrid)", label="View Guard Stats")
                history_panel = gr.HTML(label="Stats", elem_classes="html-box")
                watchlist_table = gr.DataFrame(label="Watchlist")

        gr.HTML("<hr style='border-color:#333'>")

        # BOTTOM: Action Bar & Logs
        with gr.Row():
            with gr.Column(scale=1):
                autopilot_box = gr.Checkbox(label="✈️ Auto-Pilot", value=True)
                with gr.Row():
                    scan_btn = gr.Button("🚀 Scan", variant="primary")
                    close_btn = gr.Button("🚨 Close", variant="stop")
                with gr.Row():
                    status_btn = gr.Button("🤖 Status", variant="secondary")
                    backtest_btn = gr.Button("🧪 Run Strategic Backtest", variant="secondary")
                with gr.Row():
                    history_reset_btn = gr.Button("🗑️ Clear History", variant="secondary")
                    capital_reset_btn = gr.Button("💰 Reset Capital", variant="secondary")
                status_md = gr.Markdown("Init...")
                alert_box = gr.Textbox(label="Alerts", interactive=False)
            with gr.Column(scale=3):
                log_box = gr.Textbox(label="Logs", lines=14, autoscroll=True, elem_classes="log-box", type="text")
                gr.HTML("<style>.log-box textarea { font-family: 'Consolas', 'Monaco', monospace !important; font-size: 12px !important; white-space: pre !important; }</style>")

        # Events: every action reports its result in the alert box.
        for button, handler in (
            (scan_btn, run_cycle_from_gradio),
            (close_btn, manual_close_current_trade),
            (history_reset_btn, reset_history_handler),
            (capital_reset_btn, reset_capital_handler),
            (status_btn, trigger_training_cycle),
            (backtest_btn, trigger_strategic_backtest),
        ):
            button.click(fn=handler, outputs=alert_box)
        autopilot_box.change(fn=toggle_auto_pilot, inputs=autopilot_box, outputs=alert_box)
        for button, handler in (
            (reset_diag_btn, reset_diagnostics_handler),
            (reset_guard_btn, reset_guardians_handler),
        ):
            button.click(fn=handler, outputs=alert_box)

        # Periodic Refresh: the order must match check_live_pnl_and_status()'s return tuple.
        refresh_outputs = [
            log_box, status_md, chart_plot, price_box, entry_box, tp_box, sl_box, pnl_box,
            watchlist_table, diag_table, type_table, wallet_panel, history_panel, neural_panel,
        ]
        gr.Timer(3).tick(
            fn=check_live_pnl_and_status,
            inputs=guard_view_dd,
            outputs=refresh_outputs
        )
    return demo
# ------------------------------------------------------------------------------
# 12. Execution Entry Point
# ------------------------------------------------------------------------------
# FastAPI server; its lifespan hook performs the startup and shutdown sequences.
fast_api_server = FastAPI(lifespan=lifespan)
# Build the dashboard once at import time.
gradio_dashboard = create_gradio_ui()
# Mount the dashboard at the site root; `app` is the object served by uvicorn.
app = gr.mount_gradio_app(app=fast_api_server, blocks=gradio_dashboard, path="/")
if __name__ == "__main__":
    import uvicorn
    # 0.0.0.0 is crucial for Docker/Colab environments
    uvicorn.run(app, host="0.0.0.0", port=7860)