# ============================================================================== # ๐Ÿš€ app.py (V70.6 - GEM-Architect: R2 Source of Truth) # ============================================================================== # - Fix: All UI elements now fetch directly from R2 to prevent Split-Brain. # - Features: X-Ray Mode, Neural Visibility, Atomic UI Updates. # ============================================================================== import os import sys import traceback import asyncio import gc import time import json import logging from datetime import datetime, timedelta from contextlib import asynccontextmanager, redirect_stdout, redirect_stderr from io import StringIO from typing import List, Dict, Any, Optional from fastapi import FastAPI import gradio as gr import pandas as pd import numpy as np import plotly.graph_objects as go # ------------------------------------------------------------------------------ # 1. Logging Configuration # ------------------------------------------------------------------------------ logging.basicConfig( level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s", datefmt="%H:%M:%S", handlers=[logging.StreamHandler(sys.stdout)] ) logger = logging.getLogger("TitanCore") # ------------------------------------------------------------------------------ # 2. Critical Imports & Error Handling # ------------------------------------------------------------------------------ try: # Core Infrastructure from r2 import R2Service, INITIAL_CAPITAL # ML & Data Engines from ml_engine.data_manager import DataManager from ml_engine.processor import MLProcessor, SystemLimits # External Intelligence from whale_monitor.core import EnhancedWhaleMonitor from whale_monitor.rpc_manager import AdaptiveRpcManager from sentiment_news import NewsFetcher from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer # Learning & Execution from learning_hub.adaptive_hub import AdaptiveHub from trade_manager import TradeManager # Optional Components try: from backtest_engine import run_strategic_optimization_task BACKTEST_AVAILABLE = True except ImportError: BACKTEST_AVAILABLE = False logger.warning("โš ๏ธ Backtest Engine not found. Optimization disabled.") try: from periodic_tuner import ContinuousTuner except ImportError: ContinuousTuner = None logger.warning("โš ๏ธ Continuous Tuner not found. Adaptive tuning disabled.") except ImportError as e: logger.critical(f"โŒ [FATAL ERROR] Failed to import core modules: {e}") traceback.print_exc() sys.exit(1) # ------------------------------------------------------------------------------ # 3. Global Context & System State # ------------------------------------------------------------------------------ # Global Instances r2: R2Service = None data_manager: DataManager = None ml_processor: MLProcessor = None adaptive_hub: AdaptiveHub = None trade_manager: TradeManager = None whale_monitor: EnhancedWhaleMonitor = None news_fetcher: NewsFetcher = None senti_analyzer: SentimentIntensityAnalyzer = None class SystemState: """Central State Management for the Application""" def __init__(self): self.ready = False self.cycle_running = False self.training_running = False self.auto_pilot = True self.last_cycle_time: datetime = None self.last_cycle_error = None self.app_start_time = datetime.now() self.last_cycle_logs = "System Initializing..." self.training_status_msg = "Adaptive Mode: Active" self.scan_interval = 60 # Seconds between auto-scans def set_ready(self): self.ready = True self.last_cycle_logs = "โœ… System Ready. R2 Source of Truth Mode." logger.info("โœ… System State set to READY.") def set_cycle_start(self): self.cycle_running = True self.last_cycle_logs = "๐ŸŒ€ [Cycle START] Scanning Markets..." logger.info("๐ŸŒ€ Cycle STARTED.") def set_cycle_end(self, error=None, logs=None): self.cycle_running = False self.last_cycle_time = datetime.now() self.last_cycle_error = str(error) if error else None if logs: self.last_cycle_logs = logs elif error: self.last_cycle_logs = f"โŒ [Cycle ERROR] {error}" logger.error(f"Cycle Error: {error}") else: self.last_cycle_logs = f"โœ… [Cycle END] Finished successfully." logger.info("โœ… Cycle ENDED.") sys_state = SystemState() # ------------------------------------------------------------------------------ # 4. Helper Functions & Utilities # ------------------------------------------------------------------------------ def calculate_duration_str(timestamp_str): """Calculates formatted duration string from a timestamp.""" if not timestamp_str: return "--:--:--" try: if isinstance(timestamp_str, str): start_time = datetime.fromisoformat(timestamp_str) else: start_time = timestamp_str diff = datetime.now() - start_time total_seconds = int(diff.total_seconds()) days = total_seconds // 86400 hours = (total_seconds % 86400) // 3600 minutes = (total_seconds % 3600) // 60 seconds = total_seconds % 60 if days > 0: return f"{days}d {hours:02}:{minutes:02}:{seconds:02}" return f"{hours:02}:{minutes:02}:{seconds:02}" except: return "--:--:--" def format_pnl_split(profit, loss): """HTML formatted profit/loss split.""" return f"+${profit:,.2f} / -${abs(loss):,.2f}" # ------------------------------------------------------------------------------ # 5. Background Daemons (Auto-Pilot) # ------------------------------------------------------------------------------ async def auto_pilot_loop(): """Background task to manage automated trading cycles.""" logger.info("๐Ÿค– [Auto-Pilot] Daemon started.") while True: try: await asyncio.sleep(5) if not sys_state.ready: continue # Update Hub Status Periodically if adaptive_hub and int(time.time()) % 60 == 0: sys_state.training_status_msg = adaptive_hub.get_status() # Trade Manager Watchdog (Reduced freq) if trade_manager and len(trade_manager.open_positions) > 0: if not sys_state.cycle_running: sys_state.last_cycle_logs = trade_manager.latest_guardian_log continue # Trigger Auto Scan if sys_state.auto_pilot and not sys_state.cycle_running and not sys_state.training_running: # Check interval if sys_state.last_cycle_time: elapsed = (datetime.now() - sys_state.last_cycle_time).total_seconds() if elapsed < sys_state.scan_interval: continue logger.info("๐Ÿค– [Auto-Pilot] Triggering scheduled scan...") asyncio.create_task(run_unified_cycle()) await asyncio.sleep(5) except Exception as e: logger.error(f"โš ๏ธ [Auto-Pilot Error] {e}") await asyncio.sleep(30) # ------------------------------------------------------------------------------ # 6. Application Lifespan (Startup/Shutdown) # ------------------------------------------------------------------------------ @asynccontextmanager async def lifespan(app: FastAPI): global r2, data_manager, ml_processor, adaptive_hub, trade_manager, whale_monitor, news_fetcher, senti_analyzer, sys_state logger.info("\n๐Ÿš€ [System] Startup Sequence (Titan V70.6 - R2 Sync)...") try: # 1. Initialize R2 (Persistence) r2 = R2Service() # 2. Initialize Data Manager data_manager = DataManager(contracts_db={}, whale_monitor=None, r2_service=r2) await data_manager.initialize() await data_manager.load_contracts_from_r2() # 3. Initialize Whale Monitor whale_monitor = EnhancedWhaleMonitor(contracts_db=data_manager.get_contracts_db(), r2_service=r2) rpc_mgr = AdaptiveRpcManager(data_manager.http_client) whale_monitor.set_rpc_manager(rpc_mgr) data_manager.whale_monitor = whale_monitor # 4. Initialize External Data Feeds news_fetcher = NewsFetcher() senti_analyzer = SentimentIntensityAnalyzer() # 5. Initialize Adaptive Learning Hub adaptive_hub = AdaptiveHub(r2_service=r2) await adaptive_hub.initialize() # 6. Initialize ML Processor (PatternNet + Oracle + Sniper) ml_processor = MLProcessor(data_manager=data_manager) await ml_processor.initialize() # 7. Initialize Trade Manager trade_manager = TradeManager(r2_service=r2, data_manager=data_manager, processor=ml_processor) trade_manager.learning_hub = adaptive_hub # 8. Inject Continuous Tuner if available if ContinuousTuner: adaptive_hub.tuner = ContinuousTuner(adaptive_hub) logger.info("๐Ÿ”ง [Tuner] Continuous Tuner Injected.") # 9. Start Sentries await trade_manager.sync_internal_state_with_r2() await trade_manager.initialize_sentry_exchanges() await trade_manager.start_sentry_loops() # 10. Finalize Startup sys_state.set_ready() asyncio.create_task(auto_pilot_loop()) logger.info("โœ… [System READY] All modules operational.") yield except Exception as e: logger.critical(f"โŒ [FATAL STARTUP ERROR] {e}") traceback.print_exc() sys.exit(1) finally: # Shutdown Sequence sys_state.ready = False logger.info("๐Ÿ›‘ [System] Initiating Shutdown...") if trade_manager: await trade_manager.stop_sentry_loops() if data_manager: await data_manager.close() if whale_monitor and whale_monitor.rpc_manager: await whale_monitor.rpc_manager.close() logger.info("โœ… [System] Shutdown Complete.") # ------------------------------------------------------------------------------ # 7. Analysis Tasks (Data Fetcher for Layers) # ------------------------------------------------------------------------------ async def _fetch_l2_data_task(candidate_data: Dict[str, Any]) -> Dict[str, Any]: """ Fetches required 15m, 1h, 4h data for L2 (Pattern/Oracle/MC). Does NOT run the models, just prepares the data package. """ try: symbol = candidate_data['symbol'] required_tfs = ["15m", "1h", "4h"] # Concurrent Data Fetching data_tasks = [data_manager.get_latest_ohlcv(symbol, tf, limit=300) for tf in required_tfs] all_data = await asyncio.gather(*data_tasks) ohlcv_data = {} for tf, data in zip(required_tfs, all_data): if data and len(data) > 0: ohlcv_data[tf] = data # Validation: Ensure critical timeframes exist if '15m' not in ohlcv_data: return None current_price = await data_manager.get_latest_price_async(symbol) # Construct Raw Data Package raw_data = { 'symbol': symbol, 'ohlcv': ohlcv_data, 'current_price': current_price, 'timestamp': time.time(), 'dynamic_limits': candidate_data.get('dynamic_limits', {}), 'asset_regime': candidate_data.get('asset_regime', 'UNKNOWN'), 'strategy_tag': candidate_data.get('strategy_tag', 'NONE'), 'strategy_type': candidate_data.get('strategy_type', 'NORMAL'), 'l1_score': candidate_data.get('l1_sort_score', 0) } return raw_data except Exception: return None # ------------------------------------------------------------------------------ # 8. Unified Logic Cycle (The 5-Layer Pipeline) # ------------------------------------------------------------------------------ async def run_unified_cycle(): """ ๐Ÿ’Ž GEM-Architect: The 5-Layer Precision Pipeline """ log_buffer = StringIO() def log_and_print(message): logger.info(message) log_buffer.write(message + '\n') if sys_state.cycle_running or sys_state.training_running: return if not sys_state.ready: return sys_state.set_cycle_start() try: # Sync R2 State (Crucial) await trade_manager.sync_internal_state_with_r2() # Log Active Trades if len(trade_manager.open_positions) > 0: log_and_print(f"โ„น๏ธ [Cycle] Active Positions: {len(trade_manager.open_positions)}") for sym, tr in trade_manager.open_positions.items(): curr_p = await data_manager.get_latest_price_async(sym) entry_p = float(tr.get('entry_price', 0)) pnl = ((curr_p - entry_p) / entry_p) * 100 if entry_p > 0 else 0 log_and_print(f" ๐Ÿ”’ {sym}: {pnl:+.2f}%") # ============================================================================== # ๐ŸŒŠ LAYER 1: Market Breadth & Liquidity Filter # ============================================================================== log_and_print(f" [1/5] ๐ŸŒŠ Layer 1: Market Breadth & Liquidity Screen...") l1_candidates = await data_manager.layer1_rapid_screening(limit=200, adaptive_hub_ref=adaptive_hub) if not l1_candidates: log_and_print("โš ๏ธ [Layer 1] No candidates found (Market unsafe or flat).") sys_state.set_cycle_end(logs=log_buffer.getvalue()) return # ============================================================================== # ๐Ÿง  LAYER 2: Neural Analysis (Pattern + Oracle + MC) # ============================================================================== log_and_print(f" [2/5] ๐Ÿง  Layer 2: Neural Pattern Analysis ({len(l1_candidates)} coins)...") # Batch execute L2 async def process_l2(cand): raw_data = await _fetch_l2_data_task(cand) if not raw_data: return None # Processor now returns full dict even if rejected return await ml_processor.execute_layer2_analysis(raw_data) l2_tasks = [process_l2(c) for c in l1_candidates] l2_results = await asyncio.gather(*l2_tasks) processed_l2 = [res for res in l2_results if res is not None] processed_l2.sort(key=lambda x: x['l2_score'], reverse=True) # ๐Ÿ‘๏ธ X-RAY MODE: Show Top 10 regardless of status top_view = processed_l2[:10] log_and_print(f"๐Ÿ“Š [X-RAY] Top 10 Scanned Candidates (Pass & Fail):") l2_header = f"{'STS':<3} | {'SYMBOL':<9} | {'COMP':<5} | {'PATT':<5} | {'ORCL':<5} | {'MC':<4}" log_and_print("-" * 65) log_and_print(l2_header) for c in top_view: status = "โœ…" if c.get('is_valid', False) else "โŒ" log_and_print(f"{status:<3} | {c['symbol']:<9} | {c['l2_score']:>5.1f} | {c['pattern_score']*100:>5.1f} | {c['oracle_score']*100:>5.1f} | {c['mc_score']*100:>4.1f}") log_and_print("-" * 65) valid_l2 = [c for c in processed_l2 if c.get('is_valid', False)] if not valid_l2: log_and_print("โš ๏ธ [Layer 2] All candidates failed Pattern Gate.") log_and_print(" (Check X-Ray table above to see why scores are low)") sys_state.set_cycle_end(logs=log_buffer.getvalue()) return # ============================================================================== # ๐Ÿ“ก LAYER 3: External Intelligence (News + Whales) # ============================================================================== log_and_print(f"\n [3/5] ๐Ÿ“ก Layer 3: External Intelligence ({len(valid_l2)} candidates)...") final_l3_list = [] for cand in valid_l2: symbol = cand['symbol'] l2_score = cand['l2_score'] # Whale Analysis whale_bonus = 0.0 if whale_monitor: try: w_data = await whale_monitor.get_symbol_whale_activity(symbol, known_price=cand.get('current_price', 0)) if w_data and w_data.get('trading_signal', {}).get('action') == 'BUY': conf = float(w_data.get('trading_signal', {}).get('confidence', 0.5)) whale_bonus = SystemLimits.L3_WHALE_IMPACT_MAX * conf except: pass # News Analysis news_bonus = 0.0 if news_fetcher and senti_analyzer: try: n_data = await news_fetcher.get_news(symbol) summary = n_data.get('summary', '') if "No specific news" not in summary: sent = senti_analyzer.polarity_scores(summary) news_bonus = sent['compound'] * SystemLimits.L3_NEWS_IMPACT_MAX except: pass cand['whale_score'] = whale_bonus cand['news_score'] = news_bonus cand['final_total_score'] = l2_score + whale_bonus + news_bonus final_l3_list.append(cand) final_l3_list.sort(key=lambda x: x['final_total_score'], reverse=True) log_and_print(f" -> {len(final_l3_list)} passed to L3.") l3_header = f"{'SYMBOL':<9} | {'L2':<5} | {'WHALE':<6} | {'NEWS':<6} | {'TOTAL':<6}" log_and_print("-" * 60) log_and_print(l3_header) for c in final_l3_list[:5]: # Show top 5 survivors log_and_print(f"{c['symbol']:<9} | {c['l2_score']:>5.1f} | {c['whale_score']:>6.1f} | {c['news_score']:>6.1f} | {c['final_total_score']:>6.1f}") log_and_print("-" * 60) # ============================================================================== # ๐ŸŽฏ LAYER 4: Sniper (Micro-Structure) - Find The One # ============================================================================== log_and_print(f"\n [4/5] ๐ŸŽฏ Layer 4: Sniper Micro-Analysis...") best_candidate = None best_sniper_score = -1.0 for cand in final_l3_list[:5]: symbol = cand['symbol'] t1m = await data_manager.get_latest_ohlcv(symbol, '1m', 500) ob = await data_manager.get_order_book_snapshot(symbol) if not t1m or not ob: continue sniper_res = await ml_processor.execute_layer4_sniper(symbol, t1m, ob) s_sig = sniper_res.get('signal', 'WAIT') s_conf = sniper_res.get('confidence_prob', 0.0) s_reason = sniper_res.get('reason', 'N/A') log_and_print(f" ๐Ÿ”ญ {symbol:<6} -> Signal: {s_sig} | Conf: {s_conf:.2f} | {s_reason}") if s_sig == 'BUY' and s_conf > best_sniper_score: best_sniper_score = s_conf cand['sniper_entry_price'] = sniper_res.get('entry_price', 0) cand['sniper_score'] = s_conf best_candidate = cand if not best_candidate: log_and_print("๐Ÿ›‘ Layer 4: Sniper found no valid entry points.") sys_state.set_cycle_end(logs=log_buffer.getvalue()) return # ============================================================================== # ๐Ÿ›๏ธ LAYER 5: Governance & Execution # ============================================================================== log_and_print(f"\n [5/5] ๐Ÿ›๏ธ Layer 5: Sending {best_candidate['symbol']} to Governance...") tm_log_buffer = StringIO() with redirect_stdout(tm_log_buffer), redirect_stderr(tm_log_buffer): await trade_manager.select_and_execute_best_signal([best_candidate]) tm_logs = tm_log_buffer.getvalue() for line in tm_logs.splitlines(): if line.strip(): log_and_print(line.strip()) gc.collect() sys_state.set_cycle_end(logs=log_buffer.getvalue()) except Exception as e: logger.error(f"โŒ [Cycle ERROR] {e}") traceback.print_exc() sys_state.set_cycle_end(error=e, logs=log_buffer.getvalue()) # ------------------------------------------------------------------------------ # 9. UI Action Handlers # ------------------------------------------------------------------------------ async def trigger_training_cycle(): if adaptive_hub: return f"๐Ÿค– Adaptive System: {adaptive_hub.get_status()}" return "โš ๏ธ System not ready." async def trigger_strategic_backtest(): if not BACKTEST_AVAILABLE: return "โš ๏ธ Backtest Engine not found." if trade_manager and len(trade_manager.open_positions) > 0: return "โ›” Active trades exist." if sys_state.training_running: return "โš ๏ธ Running." async def _run_bg_task(): sys_state.training_running = True sys_state.training_status_msg = "๐Ÿงช Strategic Backtest Running..." try: logger.info("๐Ÿงช [Manual] Starting Strategic Backtest...") await run_strategic_optimization_task() if adaptive_hub: await adaptive_hub.initialize() logger.info("โœ… [Manual] Backtest Complete.") except Exception as e: logger.error(f"โŒ Backtest Failed: {e}") finally: sys_state.training_running = False sys_state.training_status_msg = adaptive_hub.get_status() if adaptive_hub else "Ready" asyncio.create_task(_run_bg_task()) return "๐Ÿงช Strategic Backtest Started." async def manual_close_current_trade(): # Sync first to be sure await trade_manager.sync_internal_state_with_r2() if not trade_manager.open_positions: return "โš ๏ธ No trade found in R2." symbol = list(trade_manager.open_positions.keys())[0] await trade_manager.force_exit_by_manager(symbol, reason="MANUAL_UI") return f"โœ… Closed {symbol}." async def reset_history_handler(): if trade_manager.open_positions: return "โš ๏ธ Close active trades first." current_state = await r2.get_portfolio_state_async() preserved_capital = current_state.get('current_capital_usd', INITIAL_CAPITAL) await r2.reset_all_stats_async() if trade_manager and trade_manager.smart_portfolio: sp = trade_manager.smart_portfolio sp.state["current_capital"] = preserved_capital sp.state["session_start_balance"] = preserved_capital sp.state["allocated_capital_usd"] = 0.0 sp.state["daily_net_pnl"] = 0.0 sp.state["is_trading_halted"] = False await sp._save_state_to_r2() return f"โœ… History Cleared. Capital Preserved at ${preserved_capital:.2f}" async def reset_capital_handler(): if trade_manager.open_positions: return "โš ๏ธ Close active trades first." if trade_manager and trade_manager.smart_portfolio: sp = trade_manager.smart_portfolio sp.state["current_capital"] = INITIAL_CAPITAL sp.state["session_start_balance"] = INITIAL_CAPITAL sp.state["allocated_capital_usd"] = 0.0 sp.state["daily_net_pnl"] = 0.0 sp.state["is_trading_halted"] = False await sp._save_state_to_r2() return f"โœ… Capital Reset to ${INITIAL_CAPITAL} (History Kept)" async def reset_diagnostics_handler(): await r2.reset_diagnostic_stats_async() return "โœ… Diagnostic Matrix Reset." async def reset_guardians_handler(): await r2.reset_guardian_stats_async() if trade_manager: trade_manager.ai_stats = await r2.get_guardian_stats_async() return "โœ… Guardian Stats Reset." async def toggle_auto_pilot(enable): sys_state.auto_pilot = enable return f"Auto-Pilot: {enable}" async def run_cycle_from_gradio(): if sys_state.cycle_running: return "Busy." asyncio.create_task(run_unified_cycle()) return "๐Ÿš€ Launched." # ------------------------------------------------------------------------------ # 10. UI Data Fetching Logic (Status & Charts) # ------------------------------------------------------------------------------ async def check_live_pnl_and_status(selected_view="Dual-Core (Hybrid)"): """ Poller function to update Gradio UI every few seconds. NOW PURELY R2-BASED TO PREVENT SPLIT-BRAIN. """ empty_chart = go.Figure() empty_chart.update_layout(template="plotly_dark", paper_bgcolor="#0b0f19", plot_bgcolor="#0b0f19", xaxis={'visible':False}, yaxis={'visible':False}) wl_df_empty = pd.DataFrame(columns=["Coin", "Score"]) diag_df_empty = pd.DataFrame(columns=["Model", "Wins", "Losses", "PnL (USD)"]) type_df_empty = pd.DataFrame(columns=["Coin Type", "Wins", "Losses", "Profitability"]) if not sys_state.ready: return "Initializing...", "...", empty_chart, "0.0", "0.0", "0.0", "0.0", "0.0%", wl_df_empty, diag_df_empty, type_df_empty, "Loading...", "Loading...", "Loading..." try: # ๐Ÿ”ฅ DIRECT R2 FETCH (Source of Truth) portfolio = await r2.get_portfolio_state_async() open_trades_raw = await r2.get_open_trades_async() # Parse Trades active_trades_dict = {} if isinstance(open_trades_raw, list): for t in open_trades_raw: active_trades_dict[t['symbol']] = t elif isinstance(open_trades_raw, dict): active_trades_dict = open_trades_raw equity = portfolio.get('current_capital_usd', INITIAL_CAPITAL) daily_pnl = portfolio.get('daily_net_pnl', 0.0) allocated = portfolio.get('allocated_capital_usd', 0.0) is_halted = portfolio.get('is_trading_halted', False) free_cap = max(0.0, equity - allocated) symbol = None; entry_p = 0.0; tp_p = 0.0; sl_p = 0.0; curr_p = 0.0; pnl_pct = 0.0; pnl_val_unrealized = 0.0 active_trade_info = "" trade_dur_str = "--:--:--" # --- Active Trade Details --- if active_trades_dict: symbol = list(active_trades_dict.keys())[0] trade = active_trades_dict[symbol] entry_p = float(trade.get('entry_price', 0.0)) tp_p = float(trade.get('tp_price', 0.0)) sl_p = float(trade.get('sl_price', 0.0)) trade_dur_str = calculate_duration_str(trade.get('entry_time')) decision_data = trade.get('decision_data', {}) gov_grade = decision_data.get('governance_grade', 'N/A') gov_score = decision_data.get('governance_score', 0.0) oracle_score = decision_data.get('oracle_score', 0.0) curr_p = await data_manager.get_latest_price_async(symbol) if curr_p > 0 and entry_p > 0: pnl_pct = ((curr_p - entry_p) / entry_p) * 100 size = float(trade.get('entry_capital', 0.0)) pnl_val_unrealized = size * (pnl_pct / 100) grade_color = "#ccc" if gov_grade == "ULTRA": grade_color = "#ff00ff" elif gov_grade == "STRONG": grade_color = "#00ff00" elif gov_grade == "NORMAL": grade_color = "#00e5ff" active_trade_info = f"""
โฑ๏ธ Time: {trade_dur_str}
๐Ÿ›๏ธ Grade: {gov_grade} ({gov_score:.1f})
๐Ÿ”ฎ Oracle: {oracle_score*100:+.1f}%
""" virtual_equity = equity + pnl_val_unrealized active_pnl_color = "#00ff00" if pnl_val_unrealized >= 0 else "#ff0000" total_t = portfolio.get('total_trades', 0) wins = portfolio.get('winning_trades', 0) losses = portfolio.get('losing_trades', 0) if losses == 0 and total_t > 0: losses = total_t - wins tot_prof = portfolio.get('total_profit_usd', 0.0) tot_loss = portfolio.get('total_loss_usd', 0.0) net_prof = tot_prof - tot_loss win_rate = (wins / total_t * 100) if total_t > 0 else 0.0 halt_status = "HALTED" if is_halted else "ACTIVE" # --- Wallet Widget --- wallet_md = f"""

๐Ÿ’ผ Institutional Portfolio

${virtual_equity:,.2f}
({pnl_val_unrealized:+,.2f} USD)
Allocated:${allocated:.2f}
Free Cap:${free_cap:.2f}
Daily PnL:${daily_pnl:+.2f}

๐Ÿ›ก๏ธ Status: {halt_status}
{active_trade_info}
""" # --- Guardian Stats --- key_map = { "Dual-Core (Hybrid)": "hybrid", "Hydra: Crash (Panic)": "crash", "Hydra: Giveback (Profit)": "giveback", "Hydra: Stagnation (Time)": "stagnation" } target_key = key_map.get(selected_view, "hybrid") # Fetch stats from R2 (not memory) stats_file = await r2.get_guardian_stats_async() stats_data = stats_file.get(target_key, {"total":0, "good":0, "saved":0.0, "missed":0.0}) tot_ds = stats_data['total'] ds_acc = (stats_data['good'] / tot_ds * 100) if tot_ds > 0 else 0.0 history_md = f"""

๐Ÿ“Š Performance

Trades:{total_t}
Win Rate:{win_rate:.1f}%
Wins:{wins} (+${tot_prof:,.2f})
Losses:{losses} (-${tot_loss:,.2f})
Net:${net_prof:,.2f}

๐Ÿ›ก๏ธ Guard IQ ({target_key})

Interventions:{tot_ds}
Accuracy:{ds_acc:.1f}%
Saved:${stats_data['saved']:.2f}
Missed:${stats_data['missed']:.2f}
""" # --- Neural Status --- status_msg = "Inactive" if adaptive_hub: status_msg = adaptive_hub.get_status() neural_md = f"""

๐Ÿง  Micro-Tuner

โšก Status: Active
๐Ÿ”ง Logic: {status_msg}
""" # --- DataFrames --- diag_data = await r2.get_diagnostic_stats_async() diag_list = [] ordered_models = ["Pattern", "Oracle", "Sniper", "MonteCarlo_L", "MonteCarlo_A", "News", "Governance"] for m in ordered_models: stats = diag_data.get(m, {"wins": 0, "losses": 0, "pnl": 0.0, "profit_accum": 0.0, "loss_accum": 0.0}) profit_accum = stats.get('profit_accum', 0.0) loss_accum = stats.get('loss_accum', 0.0) if profit_accum == 0.0 and loss_accum == 0.0 and stats['pnl'] != 0.0: if stats['pnl'] > 0: profit_accum = stats['pnl'] else: loss_accum = abs(stats['pnl']) pnl_str = format_pnl_split(profit_accum, loss_accum) diag_list.append([m, stats['wins'], stats['losses'], pnl_str]) diag_df = pd.DataFrame(diag_list, columns=["Model", "Wins", "Losses", "PnL (USD)"]) type_stats_list = [] if trade_manager and hasattr(trade_manager, 'type_stats'): for t_name, t_data in trade_manager.type_stats.items(): name_clean = t_name.replace("_", " ") wins = t_data.get('wins', 0) losses = t_data.get('losses', 0) prof = t_data.get('profit_usd', 0.0) loss_val = t_data.get('loss_usd', 0.0) profitability_html = format_pnl_split(prof, loss_val) type_stats_list.append([name_clean, wins, losses, profitability_html]) type_df = pd.DataFrame(type_stats_list, columns=["Coin Type", "Wins", "Losses", "Profitability"]) wl_data = [[k, f"{v.get('final_total_score',0):.2f}"] for k, v in trade_manager.watchlist.items()] wl_df = pd.DataFrame(wl_data, columns=["Coin", "Score"]) status_txt = sys_state.last_cycle_logs status_line = f"Cycle: {'RUNNING' if sys_state.cycle_running else 'IDLE'} | Auto-Pilot: {'ON' if sys_state.auto_pilot else 'OFF'}" # --- Charting --- fig = empty_chart if symbol and curr_p > 0: ohlcv = await data_manager.get_latest_ohlcv(symbol, '5m', 120) if ohlcv: df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms') fig = go.Figure(data=[go.Candlestick( x=df['datetime'], open=df['open'], high=df['high'], low=df['low'], close=df['close'], increasing_line_color='#00ff00', decreasing_line_color='#ff0000', name=symbol )]) if entry_p > 0: fig.add_hline(y=entry_p, line_dash="dash", line_color="white", annotation_text="ENTRY", annotation_position="top left") if tp_p > 0: fig.add_hline(y=tp_p, line_color="#00ff00", line_width=2, annotation_text="TP", annotation_position="top left") if sl_p > 0: fig.add_hline(y=sl_p, line_color="#ff0000", line_width=2, annotation_text="SL", annotation_position="bottom left") fig.update_layout( template="plotly_dark", paper_bgcolor="#0b0f19", plot_bgcolor="#0b0f19", margin=dict(l=0, r=40, t=30, b=0), height=400, xaxis_rangeslider_visible=False, title=dict(text=f"{symbol} (Long) | PnL: {pnl_pct:+.2f}%", font=dict(color="white")) ) train_status = sys_state.training_status_msg if sys_state.training_running: train_status = "๐Ÿงช Backtest Running..." return (status_txt, status_line, fig, f"{curr_p:.6f}", f"{entry_p:.6f}", f"{tp_p:.6f}", f"{sl_p:.6f}", f"{pnl_pct:+.2f}%", wl_df, diag_df, type_df, wallet_md, history_md, neural_md) except Exception: traceback.print_exc() return "Error", "Error", empty_chart, "0", "0", "0", "0", "0%", wl_df_empty, diag_df_empty, type_df_empty, "Err", "Err", "Err" # ------------------------------------------------------------------------------ # 11. Gradio Layout & Assembly # ------------------------------------------------------------------------------ def create_gradio_ui(): custom_css = ".gradio-container {background:#0b0f19} .dataframe {background:#1a1a1a!important} .html-box {min-height:180px}" with gr.Blocks(title="Titan V70.6 (R2-Sync)", css=custom_css) as demo: gr.Markdown("# ๐Ÿš€ Titan V70.6 (Neural Core + R2 Sync)") with gr.Row(): # LEFT: Chart & Controls with gr.Column(scale=3): live_chart = gr.Plot(label="Chart") with gr.Row(): t_price = gr.Textbox(label="Price", interactive=False) t_pnl = gr.Textbox(label="PnL %", interactive=False) with gr.Row(): t_entry = gr.Textbox(label="Entry", interactive=False) t_tp = gr.Textbox(label="TP", interactive=False) t_sl = gr.Textbox(label="SL", interactive=False) # RIGHT: Stats & Panels with gr.Column(scale=1): wallet_out = gr.HTML(label="Smart Wallet", elem_classes="html-box") neural_out = gr.HTML(label="Neural Cycles", elem_classes="html-box") # Type Stats Table gr.Markdown("### ๐Ÿ’Ž Opportunity Types") type_stats_out = gr.Dataframe( headers=["Coin Type", "Wins", "Losses", "Profitability"], datatype=["str", "number", "number", "html"], interactive=False, label="Type Performance" ) gr.Markdown("### ๐Ÿ•ต๏ธ Diagnostic Matrix") diagnostic_out = gr.Dataframe( headers=["Model", "Wins", "Losses", "PnL (USD)"], datatype=["str", "number", "number", "html"], interactive=False, label="Model Performance" ) with gr.Row(): btn_reset_diag = gr.Button("๐Ÿงน Reset Matrix", size="sm", variant="secondary") btn_reset_guard = gr.Button("๐Ÿ›ก๏ธ Reset Guardians", size="sm", variant="secondary") gr.HTML("
") stats_dd = gr.Dropdown([ "Dual-Core (Hybrid)", "Hydra: Crash (Panic)", "Hydra: Giveback (Profit)", "Hydra: Stagnation (Time)" ], value="Dual-Core (Hybrid)", label="View Guard Stats") history_out = gr.HTML(label="Stats", elem_classes="html-box") watchlist_out = gr.DataFrame(label="Watchlist") gr.HTML("
") # BOTTOM: Action Bar & Logs with gr.Row(): with gr.Column(scale=1): auto_pilot = gr.Checkbox(label="โœˆ๏ธ Auto-Pilot", value=True) with gr.Row(): btn_run = gr.Button("๐Ÿš€ Scan", variant="primary") btn_close = gr.Button("๐Ÿšจ Close", variant="stop") with gr.Row(): btn_train = gr.Button("๐Ÿค– Status", variant="secondary") btn_backtest = gr.Button("๐Ÿงช Run Strategic Backtest", variant="secondary") with gr.Row(): btn_history_reset = gr.Button("๐Ÿ—‘๏ธ Clear History", variant="secondary") btn_cap_reset = gr.Button("๐Ÿ’ฐ Reset Capital", variant="secondary") status = gr.Markdown("Init...") alert = gr.Textbox(label="Alerts", interactive=False) with gr.Column(scale=3): logs = gr.Textbox(label="Logs", lines=14, autoscroll=True, elem_classes="log-box", type="text") gr.HTML("") # Events btn_run.click(fn=run_cycle_from_gradio, outputs=alert) btn_close.click(fn=manual_close_current_trade, outputs=alert) btn_history_reset.click(fn=reset_history_handler, outputs=alert) btn_cap_reset.click(fn=reset_capital_handler, outputs=alert) btn_train.click(fn=trigger_training_cycle, outputs=alert) btn_backtest.click(fn=trigger_strategic_backtest, outputs=alert) auto_pilot.change(fn=toggle_auto_pilot, inputs=auto_pilot, outputs=alert) btn_reset_diag.click(fn=reset_diagnostics_handler, outputs=alert) btn_reset_guard.click(fn=reset_guardians_handler, outputs=alert) # Periodic Refresh gr.Timer(3).tick( fn=check_live_pnl_and_status, inputs=stats_dd, outputs=[logs, status, live_chart, t_price, t_entry, t_tp, t_sl, t_pnl, watchlist_out, diagnostic_out, type_stats_out, wallet_out, history_out, neural_out] ) return demo # ------------------------------------------------------------------------------ # 12. Execution Entry Point # ------------------------------------------------------------------------------ fast_api_server = FastAPI(lifespan=lifespan) gradio_dashboard = create_gradio_ui() app = gr.mount_gradio_app(app=fast_api_server, blocks=gradio_dashboard, path="/") if __name__ == "__main__": import uvicorn # 0.0.0.0 is crucial for Docker/Colab environments uvicorn.run(app, host="0.0.0.0", port=7860)