""" Main Gradio interface for trading analysis platform. This module provides the web UI for interacting with the technical analysis workflow. """ import json import logging import os import time import traceback from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Tuple import gradio as gr # Configure logger logger = logging.getLogger(__name__) from config.default_config import DEFAULT_CONFIG, merge_config from config.models import AnalysisPhase, ChartType, InvestmentStyle, PhaseConfiguration from graph.workflows.conditional_workflow import ConditionalComprehensiveWorkflow from utils.charts.valuation_dashboard import ValuationDashboardGenerator from utils.errors import TradingAnalysisError, format_exception_for_user from web.components.agent_provider_matrix import ( apply_routing_preset, create_agent_provider_matrix, export_routing_config, format_routing_config_status, get_agent_routing_config, import_routing_config, ) from web.components.budget_alerts import ( create_budget_configuration, create_budget_status_display, ) from web.components.chart_viewer import create_chart_viewer, display_chart from web.components.cost_dashboard import format_cost_summary_markdown from web.components.dashboard_grid import DashboardComponent from web.components.investment_style_selector import create_investment_style_selector from web.components.phase_configuration import create_phase_configuration from web.components.phase_report_formatter import ( format_phase_organized_report, format_phase_report_details, ) from web.components.report_viewer import ( create_report_viewer, format_error_report, format_progress_message, ) from web.components.ticker_input import ( create_ticker_examples, create_ticker_input, validate_ticker, ) from web.components.timeframe_selector import ( create_timeframe_selector, validate_timeframe, ) from web.config.api_keys import validate_configuration class TradingInterface: """Main Gradio interface for trading analysis.""" def __init__(self, config: Optional[dict] = None): """ Initialize trading interface. Args: config: Optional configuration override """ self.config = config or DEFAULT_CONFIG self.current_config = self.config.copy() # Mutable current config # Log provider auto-detection provider = self.current_config.get("llm_provider", "openai") logger.info( f"šŸŽÆ TradingInterface initialized with auto-detected provider: {provider}" ) self.conditional_workflow = ConditionalComprehensiveWorkflow( config=self.current_config ) # Valuation dashboard generator (Feature 004) self.dashboard_generator = ValuationDashboardGenerator() # US3: Analysis result caching for multiple timeframe support self.analysis_cache = {} # Format: {cache_key: {"result": dict, "timestamp": float, "metadata": dict}} self.max_cache_size = 50 # Limit cache to 50 entries per session # US3: Report history storage (last N analyses per session) self.report_history = [] # Format: [{"timestamp": str, "ticker": str, "timeframe": str, "report": dict}, ...] self.max_history_size = 10 # Keep last 10 reports per session self.app = self._build_interface() def _extract_phase_reports(self, final_state: dict) -> Tuple[str, str, str, str]: """Extract phase-level reports from workflow state. Args: final_state: Final workflow state Returns: Tuple of (fundamental_report, sentiment_report, research_report, risk_report) """ phase_outputs = final_state.get("phase_outputs", {}) # Extract fundamental phase report fundamental_phase = phase_outputs.get("fundamental") if fundamental_phase and hasattr(fundamental_phase, "agents"): fundamental_parts = [] for agent in fundamental_phase.agents: fundamental_parts.append( f"## {agent.agent_name.replace('_', ' ').title()}\n\n" ) fundamental_parts.append(agent.report) if agent.educational_notes: fundamental_parts.append( f"\n\n### šŸ“š Educational Notes\n\n{agent.educational_notes}" ) fundamental_report = ( "".join(fundamental_parts) if fundamental_parts else "*No fundamental analysis available*" ) else: fundamental_report = "*Fundamental phase not run*" # Extract sentiment phase report sentiment_phase = phase_outputs.get("sentiment") if sentiment_phase and hasattr(sentiment_phase, "agents"): sentiment_parts = [] for agent in sentiment_phase.agents: sentiment_parts.append( f"## {agent.agent_name.replace('_', ' ').title()}\n\n" ) sentiment_parts.append(agent.report) if agent.educational_notes: sentiment_parts.append( f"\n\n### šŸ“š Educational Notes\n\n{agent.educational_notes}" ) sentiment_report = ( "".join(sentiment_parts) if sentiment_parts else "*No sentiment analysis available*" ) else: sentiment_report = "*Sentiment phase not run*" # Extract research synthesis phase report research_phase = phase_outputs.get("research_synthesis") if research_phase and hasattr(research_phase, "agents"): research_parts = [] for agent in research_phase.agents: research_parts.append( f"## {agent.agent_name.replace('_', ' ').title()}\n\n" ) research_parts.append(agent.report) if agent.educational_notes: research_parts.append( f"\n\n### šŸ“š Educational Notes\n\n{agent.educational_notes}" ) research_report = ( "".join(research_parts) if research_parts else "*No research synthesis available*" ) else: research_report = "*Research synthesis phase not run*" # Extract risk phase report risk_phase = phase_outputs.get("risk") if risk_phase and hasattr(risk_phase, "agents"): risk_parts = [] for agent in risk_phase.agents: risk_parts.append( f"## {agent.agent_name.replace('_', ' ').title()}\n\n" ) risk_parts.append(agent.report) if agent.educational_notes: risk_parts.append( f"\n\n### šŸ“š Educational Notes\n\n{agent.educational_notes}" ) risk_report = ( "".join(risk_parts) if risk_parts else "*No risk analysis available*" ) else: risk_report = "*Risk phase not run*" return fundamental_report, sentiment_report, research_report, risk_report def _extract_indicator_chart_paths( self, final_state: dict ) -> Tuple[Optional[str], Optional[str], Optional[str]]: """Extract indicator chart paths from indicator agent metadata. Args: final_state: Final workflow state Returns: Tuple of (rsi_chart_path, macd_chart_path, stoch_chart_path) """ phase_outputs = final_state.get("phase_outputs", {}) technical_phase = phase_outputs.get("technical") if not technical_phase or not hasattr(technical_phase, "agents"): return None, None, None # Find indicator agent for agent in technical_phase.agents: if agent.agent_name == "indicator_agent": # Get chart paths from agent metadata (stored by workflow) if hasattr(agent, "metadata") and isinstance(agent.metadata, dict): chart_paths = agent.metadata.get("chart_paths", []) else: chart_paths = [] # Identify charts by filename (order is not guaranteed) rsi_path = None macd_path = None stoch_path = None for path in chart_paths: if path: if "_rsi_" in path: rsi_path = path elif "_macd_" in path: macd_path = path elif "_stochastic_" in path or "_stoch_" in path: stoch_path = path return rsi_path, macd_path, stoch_path return None, None, None def _extract_agent_outputs(self, final_state: dict) -> Tuple[str, str, str, str]: """Extract individual agent outputs from workflow state (without embedded charts). Args: final_state: Final workflow state Returns: Tuple of (decision_output, indicator_output, pattern_output, trend_output) """ # Get phase outputs phase_outputs = final_state.get("phase_outputs", {}) technical_phase = phase_outputs.get("technical") if not technical_phase: empty_msg = "*No output available - technical phase not run*" return empty_msg, empty_msg, empty_msg, empty_msg # Extract individual agent reports WITHOUT embedded charts decision_output = "*No decision agent output*" indicator_output = "*No indicator agent output*" pattern_output = "*No pattern agent output*" trend_output = "*No trend agent output*" for agent in technical_phase.agents: # Build output with just report text output_parts = [agent.report] # Add educational notes if available if agent.educational_notes: output_parts.append( f"\n\n---\n\n### šŸ“š Educational Notes\n\n{agent.educational_notes}\n" ) # NOTE: Charts are now displayed separately in the UI layout # No longer embedding charts inline full_output = "".join(output_parts) # Assign to appropriate agent if agent.agent_name == "decision_agent": decision_output = full_output elif agent.agent_name == "indicator_agent": indicator_output = full_output elif agent.agent_name == "pattern_agent": pattern_output = full_output elif agent.agent_name == "trend_agent": trend_output = full_output return decision_output, indicator_output, pattern_output, trend_output def _generate_summary(self, final_state: dict, ticker: str) -> str: """Generate executive summary from all analysis phases with portfolio manager decision.""" phase_outputs = final_state.get("phase_outputs", {}) lines = [f"# Executive Summary: {ticker.upper()}\n"] for phase_name, title, emoji in [ ("technical", "Technical Analysis", "šŸ”§"), ("fundamental", "Fundamental Analysis", "šŸ’¼"), ("sentiment", "Sentiment Analysis", "šŸ“°"), ("research_synthesis", "Research Synthesis", "šŸ”¬"), ("risk", "Risk Assessment", "āš ļø"), ]: phase = phase_outputs.get(phase_name) if not phase: continue score = phase.score if hasattr(phase, "score") else None lines.append(f"\n## {emoji} {title}") if score: indicator = "🟢" if score >= 7 else "šŸ”“" if score <= 3 else "🟔" lines.append(f"**Signal**: {indicator} {score:.1f}/10\n") # Get first agent's key insight (extract summary section) agents = phase.agents if hasattr(phase, "agents") else [] for agent in agents[:1]: # Just first agent report = agent.report if hasattr(agent, "report") else "" report_lines = report.split("\n") # Look for Summary section and extract first 3 bullet/numbered points in_summary = False summary_items = [] for i, line in enumerate(report_lines): stripped = line.strip() # Detect summary section if "## Summary" in stripped or "## Key Takeaways" in stripped: in_summary = True continue # If we're in summary, collect items if in_summary: # Stop at next section header if stripped.startswith("##"): break # Collect numbered or bullet items if ( stripped.startswith(("1.", "2.", "3.", "-", "*", "•")) and len(stripped) > 20 ): summary_items.append(stripped) if len(summary_items) >= 3: break # If we found summary items, use them if summary_items: lines.append("\n**Key Points:**\n") for item in summary_items: # Normalize bullet/number format if item[0].isdigit(): lines.append(f"- {item.split('.', 1)[1].strip()}\n") else: lines.append(f"{item}\n") else: # Fallback: Find first substantial narrative paragraph for line in report_lines: stripped = line.strip() if ( stripped and not stripped.startswith("#") and not stripped.startswith("|") and not stripped.startswith(("1.", "2.", "3.", "-", "*")) and len(stripped) > 100 ): lines.append(f"\n{stripped}\n") break # Add portfolio manager decision if available (from decision phase) decision_phase = phase_outputs.get("decision") if decision_phase and hasattr(decision_phase, "agents"): for agent in decision_phase.agents: if agent.agent_name == "portfolio_manager": lines.append("\n---\n") lines.append(f"\n## šŸŽÆ Final Trading Recommendation\n\n") lines.append(agent.report) break lines.append("\n---\n*View detailed phase tabs for complete analysis*") return "\n".join(lines) def _generate_cache_key( self, ticker: str, timeframe: str, enabled_phases: list, llm_provider: str = "openai", ) -> str: """Generate unique cache key for analysis results (US3). Includes provider to ensure different LLMs generate separate cache entries. """ import hashlib phases_str = ",".join( sorted([p.value if hasattr(p, "value") else str(p) for p in enabled_phases]) ) phases_hash = hashlib.md5(phases_str.encode()).hexdigest()[:8] return f"{ticker.upper()}_{timeframe}_{llm_provider}_{phases_hash}" def _get_cached_analysis(self, cache_key: str) -> Optional[dict]: """Retrieve cached analysis result if available (US3).""" if cache_key in self.analysis_cache: cached = self.analysis_cache[cache_key] logger.info(f"Cache hit for key: {cache_key}") return cached["result"] return None def _cache_analysis_result(self, cache_key: str, result: dict, metadata: dict): """Store analysis result in cache with LRU eviction (US3).""" if len(self.analysis_cache) >= self.max_cache_size: # Remove oldest entry oldest_key = min( self.analysis_cache.keys(), key=lambda k: self.analysis_cache[k]["timestamp"], ) del self.analysis_cache[oldest_key] logger.info(f"Cache eviction: removed {oldest_key}") self.analysis_cache[cache_key] = { "result": result, "timestamp": time.time(), "metadata": metadata, } logger.info(f"Cached analysis result for key: {cache_key}") def _add_to_report_history(self, ticker: str, timeframe: str, report: dict): """Add completed analysis to report history (US3).""" import datetime history_entry = { "timestamp": datetime.datetime.now().isoformat(), "ticker": ticker.upper(), "timeframe": timeframe, "report": report, "from_cache": report.get("from_cache", False), } # Add to beginning (most recent first) self.report_history.insert(0, history_entry) # Trim history if needed if len(self.report_history) > self.max_history_size: self.report_history = self.report_history[: self.max_history_size] logger.info( f"Added to history: {ticker} {timeframe} (Total: {len(self.report_history)})" ) def _build_interface(self) -> gr.Blocks: """ Build Gradio interface. Returns: Gradio Blocks app """ with gr.Blocks( title="Multi-Agent Trading Analysis Platform", ) as app: # Header gr.Markdown(""" # šŸ¤– Multi-Agent Trading Analysis Platform Comprehensive stock analysis powered by specialized AI agents for technical, fundamental, sentiment, and risk assessment. """) with gr.Row(): with gr.Column(scale=1): # Input Section gr.Markdown("## šŸ“ Analysis Settings") ticker_input = create_ticker_input() # Investment Style Selector gr.Markdown("### šŸ’¼ Investment Style") style_radio, style_info = create_investment_style_selector() # Phase Configuration gr.Markdown("### šŸ”§ Configure Analysis Phases") ( preset_dropdown, phase_checkboxes, educational_mode_checkbox, validation_output, estimated_time, ) = create_phase_configuration() analyze_button = gr.Button( "šŸš€ Analyze", variant="primary", size="lg", ) # Progress indicator status_output = gr.Textbox( label="Status", value="Ready to analyze", interactive=False, lines=2, ) # Hidden query input (for future feature compatibility) query_input = gr.Textbox(visible=False, value="") # Advanced Settings (collapsible) with gr.Accordion("āš™ļø Advanced Settings", open=False): # Timeframe Customization gr.Markdown("**Timeframe Override**") gr.Markdown( "By default, timeframe is set based on investment style. Enable to customize." ) timeframe_selector = create_timeframe_selector() use_custom_timeframe = gr.Checkbox( label="Use custom timeframe (otherwise use investment style default)", value=False, ) gr.Markdown("---") gr.Markdown("**Indicator Parameters**") # RSI Settings rsi_period = gr.Slider( minimum=2, maximum=100, value=14, step=1, label="RSI Period", info="Default: 14. Higher values = smoother, slower signals", ) # MACD Settings gr.Markdown("**MACD Parameters**") with gr.Row(): macd_fast = gr.Number( value=12, label="Fast Period", minimum=2, maximum=50, step=1, ) macd_slow = gr.Number( value=26, label="Slow Period", minimum=2, maximum=100, step=1, ) macd_signal = gr.Number( value=9, label="Signal Period", minimum=2, maximum=50, step=1, ) # Stochastic Settings gr.Markdown("**Stochastic Parameters**") with gr.Row(): stoch_k = gr.Number( value=14, label="K Period", minimum=2, maximum=50, step=1, ) stoch_d = gr.Number( value=3, label="D Period", minimum=2, maximum=20, step=1, ) gr.Markdown("---") gr.Markdown("### Data Providers") # Data Provider Selection ohlc_provider = gr.Dropdown( choices=["yfinance", "alpha_vantage"], value="yfinance", label="OHLC Data Provider", info="Primary source for price data", ) fundamentals_provider = gr.Dropdown( choices=["alpha_vantage", "yfinance"], value="alpha_vantage", label="Fundamentals Provider", info="Source for company financials", ) gr.Markdown("---") gr.Markdown("### LLM Models") llm_provider = gr.Dropdown( choices=["openai", "anthropic", "huggingface", "qwen"], value="huggingface", label="LLM Provider", info="AI model provider for analysis (HuggingFace = Inference Providers with routing)", ) # Routing policy selector (HuggingFace only) routing_policy = gr.Dropdown( choices=[ ("Auto (default)", "auto"), ("Fastest Response", ":fastest"), ("Cheapest Cost", ":cheapest"), ("Groq", "groq"), ("Together AI", "together"), ("Replicate", "replicate"), ("Cerebras", "cerebras"), ("Fireworks", "fireworks"), ("DeepInfra", "deepinfra"), ("Llama 3.3 70B", "meta-llama/Llama-3.3-70B-Instruct"), ], value="meta-llama/Llama-3.3-70B-Instruct", label="HuggingFace Routing Policy", info="Select routing strategy or specific provider (only applies when HuggingFace is selected)", visible=True, # Will be controlled by llm_provider selection ) # Provider status display provider_status = gr.Textbox( label="Current Provider Configuration", value="āœ“ Provider: HuggingFace | Routing: Llama-3.3-70B-Instruct", interactive=False, elem_id="provider_status_display", ) # Budget configuration ( budget_limit, threshold_75, threshold_90, require_confirmation, ) = create_budget_configuration() budget_status = create_budget_status_display() # Agent routing configuration matrix agent_components = create_agent_provider_matrix() # Routing presets with gr.Accordion("⚔ Quick Presets", open=False): gr.Markdown( "Apply pre-configured routing strategies to all agents" ) with gr.Row(): cost_preset_btn = gr.Button( "šŸ’° Cost Optimized", variant="secondary", size="sm" ) perf_preset_btn = gr.Button( "šŸš€ Performance Optimized", variant="secondary", size="sm", ) balanced_preset_btn = gr.Button( "āš–ļø Balanced", variant="secondary", size="sm" ) reset_preset_btn = gr.Button( "šŸ”„ Reset to Default", variant="secondary", size="sm", ) # Config import/export with gr.Accordion("šŸ’¾ Import/Export Configuration", open=False): config_json = gr.Textbox( label="Configuration JSON", placeholder='{"indicator_agent": {"routing_policy": ":cheapest", "model_tier": "fast"}, ...}', lines=5, ) with gr.Row(): export_btn = gr.Button("šŸ“¤ Export Config", size="sm") import_btn = gr.Button("šŸ“„ Import Config", size="sm") import_status = gr.Textbox( label="Import/Export Status", value="", interactive=False, visible=False, ) # Store config state config_state = gr.State({}) apply_config_btn = gr.Button( "šŸ’¾ Apply Configuration", variant="primary" ) config_status = gr.Textbox( label="Configuration Status", value="Using default configuration", interactive=False, ) with gr.Column(scale=2): # Output Section gr.Markdown("## šŸ“Š Analysis Results") # Tabs for analysis results with gr.Tabs(): with gr.Tab("šŸ“‹ Summary"): gr.Markdown("Executive summary of all analysis phases") summary_output = gr.Markdown( "*Run analysis to see summary*" ) with gr.Tab("šŸ’¹ Valuation Metrics"): gr.Markdown("### Fundamental valuation metrics over time") with gr.Row(): with gr.Column(scale=2): dashboard_component = DashboardComponent() dashboard_charts = ( dashboard_component.create_desktop_grid() ) with gr.Column(scale=1): gr.Markdown(""" ### šŸ“Š Chart Descriptions **Price-to-Earnings (P/E) Ratio** - Measures stock price relative to earnings per share - Higher P/E may indicate growth expectations or overvaluation - Compare to industry averages and historical trends **Price-to-Book (P/B) Ratio** - Compares market value to book value of assets - Below 1.0 may indicate undervaluation - Useful for asset-heavy companies **Return on Equity (ROE)** - Measures profitability relative to shareholder equity - Higher ROE indicates efficient use of equity - Look for consistent or improving trends **Debt-to-Equity Ratio** - Measures financial leverage and risk - Higher ratio indicates more debt financing - Industry-specific benchmarks apply **Free Cash Flow** - Cash available after capital expenditures - Positive and growing FCF indicates financial health - Critical for dividends and growth investments **Revenue Growth** - Year-over-year revenue change - Indicates business expansion or contraction - Consider sustainability and profit margins """) with gr.Tab("šŸ”§ Technical Analysis"): gr.Markdown( "Technical indicators, patterns, and trend analysis" ) # INDICATOR ANALYSIS SECTION gr.Markdown("## šŸ“Š Indicator Analysis") gr.Markdown( "*RSI, MACD, and Stochastic Oscillator analysis*" ) with gr.Row(): with gr.Column(scale=3): gr.Markdown("### Analysis Report") indicator_output = gr.Markdown() with gr.Column(scale=2): gr.Markdown("### šŸ“ˆ Technical Indicators") with gr.Row(): rsi_chart = create_chart_viewer() rsi_chart.label = "RSI" with gr.Row(): macd_chart = create_chart_viewer() macd_chart.label = "MACD" with gr.Row(): stoch_chart = create_chart_viewer() stoch_chart.label = "Stochastic Oscillator" # PATTERN ANALYSIS SECTION gr.Markdown("---") gr.Markdown("## šŸ“‰ Pattern Analysis") gr.Markdown("*Candlestick and chart pattern recognition*") with gr.Row(): with gr.Column(scale=3): gr.Markdown("### Analysis Report") pattern_output = gr.Markdown() with gr.Column(scale=2): gr.Markdown("### šŸ“Š Price Chart") chart_output = create_chart_viewer() # TREND ANALYSIS SECTION gr.Markdown("---") gr.Markdown("## šŸ“ˆ Trend Analysis") gr.Markdown("*Trend direction, strength, and momentum*") with gr.Row(): with gr.Column(): gr.Markdown("### Analysis Report") trend_output = gr.Markdown() with gr.Tab("šŸ’¼ Fundamental Analysis"): gr.Markdown( "Company fundamentals, financial metrics, and valuation" ) fundamental_output = gr.Markdown() with gr.Tab("šŸ“° Sentiment Analysis"): gr.Markdown("Market sentiment and news analysis") sentiment_output = gr.Markdown() with gr.Tab("šŸ”¬ Research Synthesis"): gr.Markdown( "Multi-perspective research and debate synthesis" ) research_output = gr.Markdown() with gr.Tab("āš ļø Risk Assessment"): gr.Markdown("Risk analysis and portfolio considerations") risk_output = gr.Markdown() with gr.Tab("šŸ’° Cost Summary"): gr.Markdown( "LLM API cost breakdown and token usage statistics" ) cost_summary_output = gr.Markdown() # Footer gr.Markdown(""" --- *Note: This is for educational purposes only. Not financial advice.* """) # Event handlers analyze_button.click( fn=self._analyze_with_progress, inputs=[ ticker_input, style_radio, timeframe_selector, use_custom_timeframe, query_input, phase_checkboxes, educational_mode_checkbox, ], outputs=[ summary_output, # Now includes decision indicator_output, pattern_output, trend_output, fundamental_output, sentiment_output, research_output, risk_output, chart_output, rsi_chart, macd_chart, stoch_chart, ] + dashboard_charts + [ status_output, cost_summary_output, ], ) # Configuration event handler apply_config_btn.click( fn=self._apply_configuration, inputs=[ rsi_period, macd_fast, macd_slow, macd_signal, stoch_k, stoch_d, ohlc_provider, fundamentals_provider, llm_provider, routing_policy, budget_limit, threshold_75, threshold_90, require_confirmation, ], outputs=[config_status, config_state, budget_status], ) # Provider status update handlers def update_provider_status(provider: str, policy: str) -> str: """Update provider status display based on selections.""" if provider == "huggingface": # Format routing policy display if policy.startswith(":"): policy_display = policy.upper() elif "/" in policy: policy_display = policy.split("/")[-1] else: policy_display = policy.title() return f"āœ“ Provider: HuggingFace | Routing: {policy_display}" else: provider_names = { "openai": "OpenAI", "anthropic": "Anthropic (Claude)", "qwen": "Qwen (DashScope)", } return ( f"āœ“ Provider: {provider_names.get(provider, provider.title())}" ) # Update status when provider changes llm_provider.change( fn=update_provider_status, inputs=[llm_provider, routing_policy], outputs=provider_status, ) # Update status when routing policy changes routing_policy.change( fn=update_provider_status, inputs=[llm_provider, routing_policy], outputs=provider_status, ) # Control routing policy visibility based on provider selection def control_routing_visibility(provider: str) -> dict: """Show routing policy selector only for HuggingFace.""" return gr.update(visible=(provider == "huggingface")) llm_provider.change( fn=control_routing_visibility, inputs=llm_provider, outputs=routing_policy, ) # Routing preset handlers def apply_cost_preset(): """Apply cost-optimized preset.""" return apply_routing_preset("cost_optimized", agent_components) def apply_perf_preset(): """Apply performance-optimized preset.""" return apply_routing_preset("performance_optimized", agent_components) def apply_balanced_preset(): """Apply balanced preset.""" return apply_routing_preset("balanced", agent_components) def apply_reset_preset(): """Reset to default configuration.""" return apply_routing_preset("default", agent_components) # Get all routing and tier dropdowns as outputs preset_outputs = [] for agent_name in [ "indicator_agent", "pattern_agent", "trend_agent", "decision_agent", "fundamentals_agent", "sentiment_agent", "news_agent", "technical_analyst", "risk_manager", "portfolio_manager", ]: preset_outputs.append(agent_components[agent_name]["routing_policy"]) preset_outputs.append(agent_components[agent_name]["model_tier"]) cost_preset_btn.click( fn=apply_cost_preset, outputs=preset_outputs, ) perf_preset_btn.click( fn=apply_perf_preset, outputs=preset_outputs, ) balanced_preset_btn.click( fn=apply_balanced_preset, outputs=preset_outputs, ) reset_preset_btn.click( fn=apply_reset_preset, outputs=preset_outputs, ) # Config export handler def handle_export(): """Export current routing configuration.""" try: config = get_agent_routing_config(agent_components) json_str = export_routing_config(config) return json_str, gr.update( value="āœ… Configuration exported", visible=True ) except Exception as e: return "", gr.update( value=f"āŒ Export failed: {str(e)}", visible=True ) export_btn.click( fn=handle_export, outputs=[config_json, import_status], ) # Config import handler def handle_import(json_str: str): """Import routing configuration from JSON.""" try: config = import_routing_config(json_str) # Generate updates for all dropdowns updates = [] for agent_name in [ "indicator_agent", "pattern_agent", "trend_agent", "decision_agent", "fundamentals_agent", "sentiment_agent", "news_agent", "technical_analyst", "risk_manager", "portfolio_manager", ]: agent_config = config.get( agent_name, {"routing_policy": "auto", "model_tier": "capable"}, ) updates.append( gr.update(value=agent_config.get("routing_policy", "auto")) ) updates.append( gr.update(value=agent_config.get("model_tier", "capable")) ) updates.append( gr.update( value="āœ… Configuration imported successfully", visible=True ) ) return updates except Exception as e: # Return no updates for dropdowns, only error status updates = [ gr.update() for _ in range(20) ] # 10 agents * 2 dropdowns updates.append( gr.update(value=f"āŒ Import failed: {str(e)}", visible=True) ) return updates import_btn.click( fn=handle_import, inputs=[config_json], outputs=preset_outputs + [import_status], ) return app def _apply_configuration( self, rsi_period: int, macd_fast: int, macd_slow: int, macd_signal: int, stoch_k: int, stoch_d: int, ohlc_provider: str, fundamentals_provider: str, llm_provider: str, routing_policy: str = None, budget_limit: float = 0, threshold_75: float = 75, threshold_90: float = 90, require_confirmation: bool = True, ) -> Tuple[str, dict, str]: """ Apply user configuration. Args: rsi_period: RSI period macd_fast: MACD fast period macd_slow: MACD slow period macd_signal: MACD signal period stoch_k: Stochastic K period stoch_d: Stochastic D period ohlc_provider: OHLC data provider fundamentals_provider: Fundamentals data provider llm_provider: LLM provider routing_policy: Routing policy for HuggingFace (optional) budget_limit: Budget limit in USD threshold_75: 75% threshold for alerts threshold_90: 90% threshold for alerts require_confirmation: Require confirmation at limit Returns: Tuple of (status_message, config_dict, budget_status) """ try: # Build user configuration user_config = { "indicator_parameters": { "rsi_period": int(rsi_period), "macd_fast": int(macd_fast), "macd_slow": int(macd_slow), "macd_signal": int(macd_signal), "stoch_k_period": int(stoch_k), "stoch_d_period": int(stoch_d), }, "data_providers": { "ohlc_primary": ohlc_provider, "fundamentals_primary": fundamentals_provider, }, "llm_provider": llm_provider, } # Add routing policy for HuggingFace if llm_provider == "huggingface" and routing_policy: user_config["routing_policy"] = routing_policy # Add budget configuration budget_status = "" if budget_limit > 0: from utils.cost_tracker import BudgetConfig budget_config = BudgetConfig( limit=budget_limit, threshold_75=threshold_75 / 100.0, threshold_90=threshold_90 / 100.0, require_confirmation_at_limit=require_confirmation, ) user_config["budget_config"] = budget_config budget_status = ( f"āœ… Budget configured: ${budget_limit:.2f} limit\n" f"Alerts at: {threshold_75:.0f}%, {threshold_90:.0f}%, 100%\n" f"Confirmation required: {'Yes' if require_confirmation else 'No'}" ) else: budget_status = "šŸ’¤ No budget configured" # Validate configuration is_valid, error = validate_configuration(user_config) if not is_valid: return f"āŒ Configuration Error: {error}", {}, "āŒ Configuration error" # Merge with defaults self.current_config = merge_config(user_config, DEFAULT_CONFIG) # Reinitialize workflows with new config self.conditional_workflow = ConditionalComprehensiveWorkflow( config=self.current_config ) status = "āœ… Configuration applied successfully!\n\n" status += f"RSI Period: {rsi_period}\n" status += f"MACD: {macd_fast}/{macd_slow}/{macd_signal}\n" status += f"Stochastic: K={stoch_k}, D={stoch_d}\n" status += f"Data Provider: {ohlc_provider}\n" status += f"LLM Provider: {llm_provider}" # Add routing policy info for HuggingFace if llm_provider == "huggingface" and routing_policy: if routing_policy.startswith(":"): policy_display = routing_policy.upper() elif "/" in routing_policy: policy_display = routing_policy.split("/")[-1] else: policy_display = routing_policy.title() status += f"\nRouting Policy: {policy_display}" return status, self.current_config, budget_status except Exception as e: error_msg = format_exception_for_user(e) logger.error(f"Configuration error: {str(e)}") return error_msg, {}, "āŒ Configuration error" def _analyze_with_progress( self, ticker: str, investment_style: Optional[str] = None, timeframe: str = "1w", use_custom_timeframe: bool = False, query: Optional[str] = None, enabled_phases: Optional[list] = None, educational_mode: bool = True, ) -> Tuple[ str, str, str, str, str, str, str, str, Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], str, str, ]: """ Run analysis with progress updates. Args: ticker: Asset ticker symbol investment_style: Investment style for custom phase analysis timeframe: Analysis timeframe (used if use_custom_timeframe is True) use_custom_timeframe: Whether to use custom timeframe or investment style default query: Optional user query enabled_phases: List of enabled phases for custom analysis educational_mode: Whether to include educational content Returns: Tuple of (summary_with_decision, indicator_md, pattern_md, trend_md, fundamental_md, sentiment_md, research_md, risk_md, chart_path, rsi_chart, macd_chart, stoch_chart, pe_chart, pb_chart, ps_chart, ev_chart, margins_chart, roe_chart, growth_chart, fcf_chart, debt_chart, status_message, cost_summary_md) """ try: # Determine timeframe based on investment style if not using custom if not use_custom_timeframe: # Use investment style defaults if investment_style == InvestmentStyle.LONG_TERM.value: timeframe = "1w" # Weekly for long-term elif investment_style == InvestmentStyle.SWING_TRADING.value: timeframe = "1d" # Daily for swing trading else: timeframe = "1w" # Default to weekly # Validate inputs is_valid, error_msg = validate_ticker(ticker) if not is_valid: error_msg_display = f"**Error**: Invalid ticker - {error_msg}" return ( error_msg_display, # summary error_msg_display, # indicator error_msg_display, # pattern error_msg_display, # trend error_msg_display, # fundamental error_msg_display, # sentiment error_msg_display, # research error_msg_display, # risk None, # chart None, # rsi_chart None, # macd_chart None, # stoch_chart None, None, None, None, None, None, None, None, None, # dashboard charts f"āŒ Error: {error_msg}", # status "", # cost_summary ) if not validate_timeframe(timeframe): error_msg_display = f"**Error**: Invalid timeframe - {timeframe}" return ( error_msg_display, # summary error_msg_display, # indicator error_msg_display, # pattern error_msg_display, # trend error_msg_display, # fundamental error_msg_display, # sentiment error_msg_display, # research error_msg_display, # risk None, # chart None, # rsi_chart None, # macd_chart None, # stoch_chart None, None, None, None, None, None, None, None, None, # dashboard charts "āŒ Error: Invalid timeframe", # status "", # cost_summary ) # US3: Check cache for existing analysis # Generate cache key based on analysis parameters (including provider) if enabled_phases: cache_key = self._generate_cache_key( ticker, timeframe, enabled_phases or [], self.current_config.get("llm_provider", "openai"), ) cached_result = self._get_cached_analysis(cache_key) if cached_result: logger.info(f"Returning cached analysis for {ticker} {timeframe}") # Extract cached outputs cache_note = "\n\n*šŸ“¦ Retrieved from cache*" # Get cached decision to merge into summary decision = cached_result.get("decision", "") # Regenerate summary with decision included (for backward compatibility with old cache) # If cache already has decision in summary, this will add cache_note # If cache is old format, this will properly merge decision into summary base_summary = cached_result.get( "summary", f"# Summary: {ticker.upper()}\n\n*Retrieved from cache*", ) # Check if decision is already in summary if decision and "Final Trading Recommendation" not in base_summary: # Old cache format - add decision to summary summary = ( base_summary + f"\n---\n\n## šŸŽÆ Final Trading Recommendation\n\n{decision}" + cache_note ) else: # New cache format or no decision summary = base_summary + cache_note indicator = cached_result.get("indicator", "") + cache_note pattern = cached_result.get("pattern", "") + cache_note trend = cached_result.get("trend", "") + cache_note fundamental = cached_result.get("fundamental", "") + cache_note sentiment = cached_result.get("sentiment", "") + cache_note research = cached_result.get("research", "") + cache_note risk = cached_result.get("risk", "") + cache_note return ( summary, indicator, pattern, trend, fundamental, sentiment, research, risk, cached_result["chart_path"], cached_result.get("rsi_chart"), cached_result.get("macd_chart"), cached_result.get("stoch_chart"), cached_result.get("dashboard_charts", [None] * 7)[ 0 ], # pe_chart cached_result.get("dashboard_charts", [None] * 7)[ 1 ], # pb_chart cached_result.get("dashboard_charts", [None] * 7)[ 2 ], # ps_chart None, # ev_chart (removed) cached_result.get("dashboard_charts", [None] * 7)[ 3 ], # margins_chart cached_result.get("dashboard_charts", [None] * 7)[ 4 ], # roe_chart None, # growth_chart (removed) cached_result.get("dashboard_charts", [None] * 7)[ 5 ], # fcf_chart cached_result.get("dashboard_charts", [None] * 7)[ 6 ], # debt_chart f"āœ… Analysis retrieved from cache for {ticker.upper()}", "", # No cost summary for cached results ) # Update status with phase details phase_count = len(enabled_phases) if enabled_phases else 0 phase_names = ", ".join([p.upper() for p in (enabled_phases or [])]) timeframe_source = "custom" if use_custom_timeframe else "style default" status = f"šŸ”„ Analyzing {ticker.upper()} with {phase_count} phases: {phase_names}\n" status += f"Investment Style: {investment_style or 'general'}\n" status += f"Timeframe: {timeframe.upper()} ({timeframe_source})\n" status += "ā³ This may take 30-120 seconds depending on phases selected..." # Create phase configuration try: # Convert phase names to AnalysisPhase enums phase_enums = [AnalysisPhase(phase) for phase in (enabled_phases or [])] # Create PhaseConfiguration phase_config = PhaseConfiguration( investment_style=InvestmentStyle(investment_style) if investment_style else InvestmentStyle.LONG_TERM, enabled_phases=phase_enums, timeframe=timeframe, educational_mode=educational_mode, ) # Validate configuration validation_errors = phase_config.validate() if validation_errors: error_msg_display = ( f"**Configuration Error**: {', '.join(validation_errors)}" ) return ( error_msg_display, # summary error_msg_display, # indicator error_msg_display, # pattern error_msg_display, # trend error_msg_display, # fundamental error_msg_display, # sentiment error_msg_display, # research error_msg_display, # risk None, # chart None, # rsi_chart None, # macd_chart None, # stoch_chart None, None, None, None, None, None, None, None, None, # dashboard charts f"āŒ Configuration Error: {', '.join(validation_errors)}", # status "", # cost_summary ) # Run conditional workflow final_state = self.conditional_workflow.run( ticker=ticker.strip().upper(), timeframe=timeframe, phase_config=phase_config, user_query=query if query else None, ) except Exception as e: error_msg_display = f"**Phase Configuration Error**: {str(e)}" return ( error_msg_display, # summary error_msg_display, # indicator error_msg_display, # pattern error_msg_display, # trend error_msg_display, # fundamental error_msg_display, # sentiment error_msg_display, # research error_msg_display, # risk None, # chart None, # rsi_chart None, # macd_chart None, # stoch_chart None, None, None, None, None, None, None, None, None, # dashboard charts f"āŒ Error: {str(e)}", # status "", # cost_summary ) # Add config to state for display final_state["config"] = self.current_config # Check for budget alerts budget_alert = final_state.get("budget_alert") if budget_alert: threshold = budget_alert.get("threshold", 0) message = budget_alert.get("message", "") exceeded = budget_alert.get("exceeded", False) # Get cost reduction tips from cost tracker cost_tracker = self.conditional_workflow.cost_tracker.cost_tracker provider = self.current_config.get("llm_provider", "huggingface") routing_policy = self.current_config.get("routing_policy", "N/A") tips = cost_tracker.get_cost_reduction_tips(provider) # Add current provider and routing policy info full_message = message full_message += f"\n\nšŸ“Š Current Configuration:\n" full_message += f" • Provider: {provider}\n" if provider == "huggingface": if routing_policy.startswith(":"): policy_display = routing_policy.upper() elif "/" in routing_policy: policy_display = routing_policy.split("/")[-1] else: policy_display = routing_policy full_message += f" • Routing Policy: {policy_display}\n" full_message += f" • Cost vs Budget: ${cost_tracker.total_cost:.4f} / ${cost_tracker.budget_config.limit:.2f}" # Add cost reduction tips if tips: full_message += "\n\nšŸ’” Cost Reduction Tips:\n" + "\n".join( f" • {tip}" for tip in tips ) # Display appropriate alert based on threshold if threshold == 1.0: # 100% - Budget exceeded if ( exceeded and cost_tracker.budget_config.require_confirmation_at_limit ): gr.Warning( full_message + "\n\nāš ļø Analysis paused - budget limit reached" ) else: gr.Warning(full_message) elif threshold == 0.90: # 90% - Warning threshold gr.Warning(full_message) elif threshold == 0.75: # 75% - Info threshold gr.Info(full_message) # Check for errors if final_state.get("error"): error_msg_display = ( f"**Analysis Failed**: {final_state.get('error', 'Unknown error')}" ) return ( error_msg_display, # summary error_msg_display, # indicator error_msg_display, # pattern error_msg_display, # trend error_msg_display, # fundamental error_msg_display, # sentiment error_msg_display, # research error_msg_display, # risk None, # chart None, # rsi_chart None, # macd_chart None, # stoch_chart None, None, None, None, None, None, None, None, None, # dashboard charts f"āŒ Analysis failed: {final_state.get('error', 'Unknown error')}", # status "", # cost_summary ) # Extract phase reports fundamental_output, sentiment_output, research_output, risk_output = ( self._extract_phase_reports(final_state) ) # Extract technical phase and decision agent outputs decision_output, indicator_output, pattern_output, trend_output = ( self._extract_agent_outputs(final_state) ) # Extract indicator chart paths rsi_chart_path, macd_chart_path, stoch_chart_path = ( self._extract_indicator_chart_paths(final_state) ) # Validate and prepare indicator chart paths for display rsi_chart_path = display_chart(rsi_chart_path) macd_chart_path = display_chart(macd_chart_path) stoch_chart_path = display_chart(stoch_chart_path) # Generate executive summary with portfolio manager decision summary_output = self._generate_summary(final_state, ticker) # Get chart path (price chart for pattern analysis) chart_path = display_chart(final_state.get("chart_path")) # Generate valuation dashboard (Feature 004) # Note: 7 charts after removing EV/EBITDA and Revenue/Earnings Growth dashboard_chart_paths = [None] * 7 # Initialize with None values try: # Calculate date range based on investment style end_date = datetime.now() if investment_style == InvestmentStyle.LONG_TERM.value: start_date = end_date - timedelta(days=365) # 1 year for long-term elif investment_style == InvestmentStyle.SWING_TRADING.value: start_date = end_date - timedelta(days=90) # 3 months for swing else: start_date = end_date - timedelta(days=365) # Default 1 year logger.info(f"Generating valuation dashboard for {ticker}") dashboard = self.dashboard_generator.generate( ticker, start_date, end_date ) logger.info(f"Dashboard generated: {len(dashboard.charts)} charts") # Extract chart file paths in order chart_dir = Path("data/cache/charts") date_str = ( f"{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}" ) # Build expected file paths for each chart type # Note: EV/EBITDA and REVENUE_EARNINGS_GROWTH removed (insufficient data) chart_type_to_path = { ChartType.PE_RATIO: chart_dir / f"{ticker}_pe_ratio_{date_str}.png", ChartType.PB_RATIO: chart_dir / f"{ticker}_pb_ratio_{date_str}.png", ChartType.PS_RATIO: chart_dir / f"{ticker}_ps_ratio_{date_str}.png", ChartType.PROFIT_MARGINS: chart_dir / f"{ticker}_profit_margins_{date_str}.png", ChartType.ROE: chart_dir / f"{ticker}_roe_{date_str}.png", ChartType.FREE_CASH_FLOW: chart_dir / f"{ticker}_fcf_{date_str}.png", ChartType.DEBT_TO_EQUITY: chart_dir / f"{ticker}_debt_equity_{date_str}.png", } # Extract paths in display order (7 charts total) chart_order = [ ChartType.PE_RATIO, ChartType.PB_RATIO, ChartType.PS_RATIO, ChartType.PROFIT_MARGINS, ChartType.ROE, ChartType.FREE_CASH_FLOW, ChartType.DEBT_TO_EQUITY, ] dashboard_chart_paths = [ str(chart_type_to_path[chart_type]) if chart_type_to_path[chart_type].exists() else None for chart_type in chart_order ] except Exception as e: logger.error(f"Failed to generate dashboard: {e}") dashboard_chart_paths = [None] * 7 # Fail gracefully (7 charts) # Format cost summary cost_summary = final_state.get("cost_summary") cost_summary_md = format_cost_summary_markdown(cost_summary) success_status = f"āœ… Analysis complete for {ticker.upper()}" # US3: Cache the analysis result if enabled_phases: cache_result = { "summary": summary_output, "indicator": indicator_output, "pattern": pattern_output, "trend": trend_output, "fundamental": fundamental_output, "sentiment": sentiment_output, "research": research_output, "risk": risk_output, "decision": decision_output, "chart_path": chart_path, "rsi_chart": rsi_chart_path, "macd_chart": macd_chart_path, "stoch_chart": stoch_chart_path, "dashboard_charts": dashboard_chart_paths, "from_cache": False, } metadata = { "ticker": ticker.upper(), "timeframe": timeframe, "investment_style": investment_style, "phase_count": len(enabled_phases), } self._cache_analysis_result(cache_key, cache_result, metadata) # US3: Add to report history self._add_to_report_history( ticker, timeframe, { "indicator": indicator_output, "pattern": pattern_output, "trend": trend_output, "fundamental": fundamental_output, "sentiment": sentiment_output, "research": research_output, "risk": risk_output, "decision": decision_output, "analysis_type": "Phase-Based Analysis", "from_cache": False, }, ) return ( summary_output, # Now includes decision indicator_output, pattern_output, trend_output, fundamental_output, sentiment_output, research_output, risk_output, chart_path, rsi_chart_path, macd_chart_path, stoch_chart_path, dashboard_chart_paths[0], # pe_chart dashboard_chart_paths[1], # pb_chart dashboard_chart_paths[2], # ps_chart None, # ev_chart (removed) dashboard_chart_paths[3], # margins_chart dashboard_chart_paths[4], # roe_chart None, # growth_chart (removed) dashboard_chart_paths[5], # fcf_chart dashboard_chart_paths[6], # debt_chart success_status, cost_summary_md, ) except Exception as e: error_trace = traceback.format_exc() # Log error with full traceback logger.error( json.dumps( { "component": "web_interface", "action": "error", "ticker": ticker, "timeframe": timeframe, "analysis_type": "Phase-Based Analysis", "error": str(e), "error_type": type(e).__name__, "traceback": error_trace, "timestamp": time.time(), } ) ) # Format user-friendly error message user_error_msg = format_exception_for_user(e) error_msg_display = ( f"**Unexpected Error**: {type(e).__name__}\n\n{user_error_msg}" ) return ( error_msg_display, # summary error_msg_display, # indicator error_msg_display, # pattern error_msg_display, # trend error_msg_display, # fundamental error_msg_display, # sentiment error_msg_display, # research error_msg_display, # risk None, # chart None, # rsi_chart None, # macd_chart None, # stoch_chart None, None, None, None, None, None, None, None, None, # dashboard charts user_error_msg, # status "", # cost_summary ) def launch(self, **kwargs): """ Launch Gradio app. Args: **kwargs: Arguments passed to gr.Blocks.launch() """ default_kwargs = { "server_name": "0.0.0.0", "server_port": 7860, "share": False, "show_error": True, } default_kwargs.update(kwargs) return self.app.launch(**default_kwargs) def create_interface(config: Optional[dict] = None) -> TradingInterface: """ Create trading interface instance. Args: config: Optional configuration override Returns: TradingInterface instance """ return TradingInterface(config=config)