Spaces:
Sleeping
Sleeping
| """ | |
| Workflow state management service. | |
| Handles in-memory workflow storage and background execution. | |
| """ | |
| import json | |
| import logging | |
| import os | |
| from datetime import datetime | |
| from src.services.swot_parser import parse_swot_text | |
| from src.utils.analysis_cache import get_cached_analysis, set_cached_analysis | |
# Module-level logger for this service.
logger = logging.getLogger(__name__)

# In-memory workflow storage, keyed by workflow_id.
# NOTE(review): plain dict mutated from run_workflow_background, which is
# described as running in a background thread — assumes CPython dict-operation
# atomicity is sufficient; confirm no finer-grained locking is needed.
WORKFLOWS: dict = {}

# Configurable delay for granular progress events (ms).
# NOTE(review): not referenced in this chunk — presumably consumed by the
# workflow nodes; verify before removing.
METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "300"))
def add_activity_log(workflow_id: str, step: str, message: str):
    """Append a timestamped entry to a workflow's activity log.

    Unknown workflow ids are ignored silently.
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return
    workflow.setdefault("activity_log", []).append({
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "step": step,
        "message": message,
    })
def add_metric(workflow_id: str, source: str, metric: str, value,
               end_date: str = None, fiscal_year: int = None, form: str = None):
    """Record a fetched metric on the workflow and mirror it to the activity log.

    Args:
        workflow_id: Workflow identifier
        source: Data source (e.g., 'fundamentals', 'valuation')
        metric: Metric name (e.g., 'Revenue', 'P/E')
        value: Metric value
        end_date: Fiscal period end date (e.g., '2023-09-30')
        fiscal_year: Fiscal year number (e.g., 2023)
        form: SEC form type ('10-K' for annual, '10-Q' for quarterly)
    """
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None:
        return

    entry = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "source": source,
        "metric": metric,
        "value": value,
    }
    # Attach optional temporal context only when provided.
    if end_date:
        entry["end_date"] = end_date
    if fiscal_year:
        entry["fiscal_year"] = fiscal_year
    if form:
        entry["form"] = form
    workflow.setdefault("metrics", []).append(entry)

    # Human-readable value, annotated with the fiscal period when known.
    shown = f"{value:.2f}" if isinstance(value, float) else str(value)
    if fiscal_year:
        if form == "10-K" or not end_date:
            period_label = f"FY {fiscal_year}"
        else:
            period_label = f"Q{_quarter_from_date(end_date)} {fiscal_year}"
        shown = f"{shown} ({period_label})"
    add_activity_log(workflow_id, source, f"Fetched {metric}: {shown}")

    # A delivered metric implies that source's MCP server finished successfully.
    if "mcp_status" in workflow and source in workflow["mcp_status"]:
        workflow["mcp_status"][source] = "completed"
def set_mcp_executing(workflow_id: str):
    """Mark every MCP server as 'executing' when research starts."""
    workflow = WORKFLOWS.get(workflow_id)
    if workflow is None or "mcp_status" not in workflow:
        return
    statuses = workflow["mcp_status"]
    for source in statuses:
        statuses[source] = "executing"
| def _quarter_from_date(date_str: str) -> int: | |
| """Extract quarter number from a date string (YYYY-MM-DD).""" | |
| if not date_str: | |
| return 0 | |
| try: | |
| month = int(date_str.split("-")[1]) | |
| return (month - 1) // 3 + 1 | |
| except (ValueError, IndexError): | |
| return 0 | |
def update_mcp_status(workflow_id: str, source: str, status: str):
    """Set one MCP server's status (idle/executing/completed/failed)."""
    mcp_status = WORKFLOWS.get(workflow_id, {}).get("mcp_status", {})
    if source in mcp_status:
        mcp_status[source] = status
| def _extract_metrics_from_raw_data(raw_data: dict) -> list: | |
| """Extract metrics array from raw_data for cached analysis display. | |
| Parses the multi_source structure to extract quantitative metrics | |
| in the same format as add_metric() produces. | |
| Args: | |
| raw_data: Parsed raw_data dict from cached analysis | |
| Returns: | |
| List of metric entries with source, metric, value, and temporal fields | |
| """ | |
| metrics = [] | |
| timestamp = datetime.utcnow().isoformat() + "Z" | |
| multi_source = raw_data.get("multi_source", {}) | |
| # Extract fundamentals from SEC EDGAR and Yahoo Finance | |
| fin_all = multi_source.get("fundamentals_all", {}) | |
| sec_data = fin_all.get("sec_edgar", {}).get("data", {}) | |
| yf_fund = fin_all.get("yahoo_finance", {}).get("data", {}) | |
| # Fundamentals metrics to extract | |
| fin_metrics = [ | |
| "revenue", "net_income", "gross_profit", "operating_income", | |
| "gross_margin_pct", "operating_margin_pct", "net_margin_pct", | |
| "free_cash_flow", "operating_cash_flow", "total_assets", | |
| "total_liabilities", "stockholders_equity", "cash", | |
| "long_term_debt", "net_debt", "rd_expense", "eps", "debt_to_equity" | |
| ] | |
| for metric_name in fin_metrics: | |
| # Prefer SEC EDGAR data, fall back to Yahoo Finance | |
| # Track which source the data actually came from | |
| sec_metric = sec_data.get(metric_name) | |
| yf_metric = yf_fund.get(metric_name) | |
| if sec_metric is not None: | |
| metric_data = sec_metric | |
| actual_source = "sec_edgar" | |
| elif yf_metric is not None: | |
| metric_data = yf_metric | |
| actual_source = "yahoo_finance" | |
| else: | |
| continue | |
| entry = { | |
| "timestamp": timestamp, | |
| "source": "fundamentals", | |
| "metric": metric_name, | |
| "data_source": actual_source, # Track actual data source for frontend | |
| } | |
| if isinstance(metric_data, dict): | |
| entry["value"] = metric_data.get("value") | |
| if metric_data.get("end_date"): | |
| entry["end_date"] = metric_data["end_date"] | |
| if metric_data.get("fiscal_year"): | |
| entry["fiscal_year"] = metric_data["fiscal_year"] | |
| if metric_data.get("form"): | |
| entry["form"] = metric_data["form"] | |
| else: | |
| entry["value"] = metric_data | |
| if entry.get("value") is not None: | |
| metrics.append(entry) | |
| # Extract valuation metrics from Yahoo Finance | |
| val_all = multi_source.get("valuation_all", {}) | |
| yf_val = val_all.get("yahoo_finance", {}).get("data", {}) | |
| # Get valuation fetch date if available (point-in-time data) | |
| # MCP server returns regular_market_time from Yahoo Finance quote data | |
| val_fetch_date = ( | |
| yf_val.get("_fetch_date") | |
| or yf_val.get("fetch_date") | |
| or multi_source.get("valuation_all", {}).get("yahoo_finance", {}).get("regular_market_time") | |
| ) | |
| val_metrics = [ | |
| "market_cap", "enterprise_value", "trailing_pe", "forward_pe", | |
| "pb_ratio", "ps_ratio", "trailing_peg", "price_to_fcf", | |
| "ev_ebitda", "ev_revenue", "revenue_growth", "earnings_growth" | |
| ] | |
| for metric_name in val_metrics: | |
| metric_data = yf_val.get(metric_name) | |
| if metric_data is not None: | |
| entry = { | |
| "timestamp": timestamp, | |
| "source": "valuation", | |
| "metric": metric_name, | |
| } | |
| if isinstance(metric_data, dict): | |
| entry["value"] = metric_data.get("value") | |
| # Extract date if available in metric data | |
| if metric_data.get("date"): | |
| entry["end_date"] = metric_data["date"] | |
| elif metric_data.get("end_date"): | |
| entry["end_date"] = metric_data["end_date"] | |
| elif val_fetch_date: | |
| entry["end_date"] = val_fetch_date | |
| else: | |
| entry["value"] = metric_data | |
| if val_fetch_date: | |
| entry["end_date"] = val_fetch_date | |
| if entry.get("value") is not None: | |
| metrics.append(entry) | |
| # Extract volatility metrics | |
| vol_all = multi_source.get("volatility_all", {}) | |
| ctx = vol_all.get("market_volatility_context", {}) | |
| yf_vol = vol_all.get("yahoo_finance", {}).get("data", {}) | |
| # VIX and VXN from market context | |
| for vol_metric in ["vix", "vxn"]: | |
| vol_data = ctx.get(vol_metric, {}) | |
| if vol_data.get("value") is not None: | |
| entry = { | |
| "timestamp": timestamp, | |
| "source": "volatility", | |
| "metric": vol_metric, | |
| "value": vol_data["value"] | |
| } | |
| if vol_data.get("date"): | |
| entry["end_date"] = vol_data["date"] | |
| metrics.append(entry) | |
| # Beta and volatility from Yahoo Finance | |
| # Get volatility fetch date if available (MCP returns generated_at at response level) | |
| vol_fetch_date = ( | |
| yf_vol.get("_fetch_date") | |
| or yf_vol.get("fetch_date") | |
| or (vol_all.get("generated_at", "")[:10] if vol_all.get("generated_at") else None) | |
| ) | |
| for vol_metric in ["beta", "historical_volatility", "implied_volatility"]: | |
| metric_data = yf_vol.get(vol_metric) | |
| if metric_data is not None: | |
| entry = { | |
| "timestamp": timestamp, | |
| "source": "volatility", | |
| "metric": vol_metric, | |
| } | |
| if isinstance(metric_data, dict): | |
| entry["value"] = metric_data.get("value") | |
| if metric_data.get("date"): | |
| entry["end_date"] = metric_data["date"] | |
| elif metric_data.get("end_date"): | |
| entry["end_date"] = metric_data["end_date"] | |
| elif vol_fetch_date: | |
| entry["end_date"] = vol_fetch_date | |
| else: | |
| entry["value"] = metric_data | |
| if vol_fetch_date: | |
| entry["end_date"] = vol_fetch_date | |
| if entry.get("value") is not None: | |
| metrics.append(entry) | |
| # Extract macro indicators | |
| macro_all = multi_source.get("macro_all", {}) | |
| bea_bls = macro_all.get("bea_bls", {}).get("data", {}) | |
| fred = macro_all.get("fred", {}).get("data", {}) | |
| macro_metrics = ["gdp_growth", "cpi_inflation", "unemployment", "interest_rate"] | |
| for metric_name in macro_metrics: | |
| # Prefer BEA/BLS, fall back to FRED | |
| metric_data = bea_bls.get(metric_name) or fred.get(metric_name) | |
| if metric_data is not None and isinstance(metric_data, dict): | |
| if metric_data.get("value") is not None: | |
| entry = { | |
| "timestamp": timestamp, | |
| "source": "macro", | |
| "metric": metric_name, | |
| "value": metric_data["value"] | |
| } | |
| if metric_data.get("date"): | |
| entry["end_date"] = metric_data["date"] | |
| metrics.append(entry) | |
| return metrics | |
def run_workflow_background(workflow_id: str, company_name: str, ticker: str, strategy_focus: str,
                            skip_cache: bool = False, user_api_keys: dict = None):
    """Execute workflow in background thread with progress tracking.

    Mutates WORKFLOWS[workflow_id] in place as the run progresses (status,
    current_step, mcp_status, activity_log, result). Never raises: all
    failures are recorded on the workflow entry as status 'aborted' or 'error'.

    Args:
        workflow_id: Key into the module-level WORKFLOWS store.
        company_name: Human-readable company name.
        ticker: Stock ticker symbol (also used as the cache key).
        strategy_focus: Analysis focus forwarded into the workflow state.
        skip_cache: When True, bypass the analysis cache and run fresh.
        user_api_keys: Optional per-user API keys forwarded to workflow nodes.
    """
    try:
        # --- Cache lookup (unless skip_cache is True) ------------------------
        add_activity_log(workflow_id, "cache", f"Checking cache for {ticker}")
        WORKFLOWS[workflow_id]["current_step"] = "cache"

        if skip_cache:
            add_activity_log(workflow_id, "cache", "Cache skipped - running fresh analysis")
            cached = None
        else:
            cached = get_cached_analysis(ticker)

        if cached:
            # Cache hit - reuse the stored analysis and skip the agentic workflow.
            add_activity_log(workflow_id, "cache", f"Cache HIT - {ticker} analysis found in history")
            add_activity_log(workflow_id, "cache", "Returning cached result (skipping agentic workflow)")

            # Rebuild the metrics array from cached raw_data so the frontend
            # can render them exactly as it would for a live run.
            cached_raw_data = cached.get("raw_data", {})
            cached_metrics = _extract_metrics_from_raw_data(cached_raw_data)

            WORKFLOWS[workflow_id].update({
                "status": "completed",
                "current_step": "completed",
                "revision_count": cached.get("revision_count", 0),
                "score": cached.get("score", 0),
                "data_source": "cache",
                "metrics": cached_metrics,  # Populate metrics for frontend
                "result": {
                    "company_name": cached.get("company_name", company_name),
                    "score": cached.get("score", 0),
                    "revision_count": cached.get("revision_count", 0),
                    "report_length": cached.get("report_length", 0),
                    "critique": cached.get("critique", ""),
                    "swot_data": cached.get("swot_data", {}),
                    "raw_report": cached.get("raw_report", ""),
                    "data_source": "cache",
                    "provider_used": cached.get("provider_used", "cached"),
                    "raw_data": cached.get("raw_data", {}),
                    "_cache_info": cached.get("_cache_info", {})
                }
            })
            return

        add_activity_log(workflow_id, "cache", f"Cache MISS - {ticker} not in history")
        add_activity_log(workflow_id, "cache", "Proceeding with full agentic workflow...")

        # Imported here to avoid circular imports and init issues.
        from src.workflow.graph import app as graph_app

        # --- Run the graph ---------------------------------------------------
        WORKFLOWS[workflow_id]["status"] = "running"
        WORKFLOWS[workflow_id]["current_step"] = "researcher"
        add_activity_log(workflow_id, "input", f"Starting analysis for {company_name} ({ticker})")

        # Initialize MCP status for all data sources.
        WORKFLOWS[workflow_id]["mcp_status"] = {
            "fundamentals": "idle",
            "valuation": "idle",
            "volatility": "idle",
            "macro": "idle",
            "news": "idle",
            "sentiment": "idle"
        }

        # Initial graph state.
        state = {
            "company_name": company_name,
            "ticker": ticker,
            "strategy_focus": strategy_focus,
            "raw_data": None,
            "draft_report": None,
            "critique": None,
            "revision_count": 0,
            "messages": [],
            "score": 0,
            "data_source": "live",
            "provider_used": None,
            "workflow_id": workflow_id,
            "progress_store": WORKFLOWS,
            "user_api_keys": user_api_keys or {}  # Pass user API keys to nodes
        }

        result = graph_app.invoke(state)

        # --- Post-run MCP status bookkeeping ---------------------------------
        sources_available = result.get("sources_available", [])
        sources_failed = result.get("sources_failed", [])
        mcp_status = WORKFLOWS[workflow_id]["mcp_status"]
        for source in sources_available:
            if source in mcp_status:
                mcp_status[source] = "completed"
        for source in sources_failed:
            if source in mcp_status:
                mcp_status[source] = "failed"
                add_activity_log(workflow_id, source, "MCP server failed")

        # Update LLM status based on failed providers and used provider.
        # IMPORTANT: Do this BEFORE checking for errors so frontend sees failures.
        llm_providers_failed = result.get("llm_providers_failed", [])
        provider_used = result.get("provider_used", "")
        # FIX: unlike mcp_status, llm_status is never seeded in this function;
        # use .get() so a missing key is skipped instead of raising KeyError.
        # NOTE(review): presumably the route handler seeds it — TODO confirm.
        llm_status = WORKFLOWS[workflow_id].get("llm_status", {})
        for provider in llm_providers_failed:
            if provider in llm_status:
                llm_status[provider] = "failed"
                add_activity_log(workflow_id, "llm", f"{provider.capitalize()} provider failed")
        if provider_used:
            # provider_used may be "provider:model"; status is keyed by provider.
            provider_name = provider_used.split(":")[0]
            if provider_name in llm_status:
                llm_status[provider_name] = "completed"

        # Abort BEFORE parsing SWOT if the workflow itself reported an error
        # (LLM failures etc).
        if result.get("error"):
            error_msg = result.get("error")
            add_activity_log(workflow_id, "workflow", f"Workflow failed: {error_msg}")
            WORKFLOWS[workflow_id].update({
                "status": "aborted",
                "error": error_msg,
                "current_step": "aborted"
            })
            return

        # --- Assemble the result ---------------------------------------------
        swot_data = parse_swot_text(result.get("draft_report", ""))

        # Supplement with MCP-aggregated SWOT data (ensures weaknesses/threats
        # parsed out of the report aren't lost).
        try:
            raw_data = result.get("raw_data", "{}")
            if isinstance(raw_data, str):
                raw_data = json.loads(raw_data)
            mcp_swot = raw_data.get("aggregated_swot", {})
            if mcp_swot:
                for category in ["strengths", "weaknesses", "opportunities", "threats"]:
                    # Dedupe on the first 50 lowercased characters.
                    existing = set(item.lower()[:50] for item in swot_data.get(category, []))
                    for item in mcp_swot.get(category, []):
                        if item.lower()[:50] not in existing:
                            swot_data[category].append(item)
                            existing.add(item.lower()[:50])
        except Exception as e:
            logger.warning(f"Could not merge MCP SWOT data: {e}")

        # Parse raw_data for MCP display.
        raw_data_parsed = {}
        try:
            raw_data_str = result.get("raw_data", "{}")
            if isinstance(raw_data_str, str):
                raw_data_parsed = json.loads(raw_data_str)
            else:
                raw_data_parsed = raw_data_str or {}
        except Exception as e:
            logger.warning(f"Could not parse raw_data: {e}")

        # Extract business address from company profile.
        company_profile = raw_data_parsed.get("company_profile", {})
        business_address = company_profile.get("business_address", "")

        # Generate data quality notes from metric reference.
        metric_reference = result.get("metric_reference", {})
        quality_notes = {"high_confidence": [], "gaps_or_stale": [], "assumptions": []}
        if metric_reference:
            from src.nodes.analyzer import _generate_data_quality_notes
            quality_notes = _generate_data_quality_notes(metric_reference)
            quality_notes["assumptions"] = []  # LLM assumptions added later if available

        final_result = {
            "company_name": company_name,
            "business_address": business_address,
            "score": result.get("score", 0),
            "revision_count": result.get("revision_count", 0),
            "report_length": len(result.get("draft_report", "")),
            "critique": result.get("critique", ""),
            "quality_notes": quality_notes,
            "swot_data": swot_data,
            "raw_report": result.get("draft_report", ""),
            "data_source": result.get("data_source", "unknown"),
            "provider_used": result.get("provider_used", "unknown"),
            "raw_data": raw_data_parsed
        }

        # Cache the final result for future runs of the same ticker.
        set_cached_analysis(ticker, company_name, final_result)
        add_activity_log(workflow_id, "cache", f"Cached analysis for {ticker}")

        WORKFLOWS[workflow_id].update({
            "status": "completed",
            "current_step": "completed",
            "revision_count": result.get("revision_count", 0),
            "score": result.get("score", 0),
            "result": final_result
        })
    except Exception as e:
        # FIX: log the full traceback; previously the exception was only
        # stringified into the workflow entry and the stack was lost.
        logger.exception("Workflow %s failed: %s", workflow_id, e)
        error_msg = str(e)
        # Aborts (critical, non-retryable): core MCP failures, insufficient
        # data, LLM failures. Everything else is a retryable error.
        is_abort = any(phrase in error_msg for phrase in [
            "Insufficient core data",
            "All MCP servers failed",
            "Need at least 2 of",
            "All LLM providers failed"
        ])
        WORKFLOWS[workflow_id].update({
            "status": "aborted" if is_abort else "error",
            "error": error_msg,
            # FIX: also set current_step so the frontend doesn't show a stale
            # step (matches the in-graph error path above).
            "current_step": "aborted" if is_abort else "error"
        })