"""
Workflow state management service.
Handles in-memory workflow storage and background execution.
"""

import json
import logging
import os
from datetime import datetime
from typing import Optional

from src.services.swot_parser import parse_swot_text
from src.utils.analysis_cache import get_cached_analysis, set_cached_analysis

logger = logging.getLogger(__name__)


# In-memory workflow storage
WORKFLOWS: dict = {}
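
# Shape of a typical entry (a sketch; the exact seed fields are set by the
# API layer that creates the workflow before the background run starts):
#
#   WORKFLOWS[workflow_id] = {
#       "status": "running",        # running -> completed / aborted / error
#       "current_step": "cache",
#       "activity_log": [],         # appended to by add_activity_log()
#       "metrics": [],              # appended to by add_metric()
#       "mcp_status": {...},        # per-source: idle/executing/completed/failed
#       "llm_status": {...},        # per-provider status, e.g. "completed"/"failed"
#   }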

# Configurable delay for granular progress events (ms)
METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "300"))


def add_activity_log(workflow_id: str, step: str, message: str):
    """Add an entry to the workflow activity log."""
    if workflow_id in WORKFLOWS:
        if "activity_log" not in WORKFLOWS[workflow_id]:
            WORKFLOWS[workflow_id]["activity_log"] = []
        WORKFLOWS[workflow_id]["activity_log"].append({
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "step": step,
            "message": message
        })


def add_metric(workflow_id: str, source: str, metric: str, value,
               end_date: Optional[str] = None, fiscal_year: Optional[int] = None,
               form: Optional[str] = None):
    """Add a metric to the workflow metrics array and activity log.

    Args:
        workflow_id: Workflow identifier
        source: Data source (e.g., 'fundamentals', 'valuation')
        metric: Metric name (e.g., 'Revenue', 'P/E')
        value: Metric value
        end_date: Fiscal period end date (e.g., '2023-09-30')
        fiscal_year: Fiscal year number (e.g., 2023)
        form: SEC form type ('10-K' for annual, '10-Q' for quarterly)
    """
    if workflow_id in WORKFLOWS:
        if "metrics" not in WORKFLOWS[workflow_id]:
            WORKFLOWS[workflow_id]["metrics"] = []

        metric_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "source": source,
            "metric": metric,
            "value": value
        }

        # Add temporal fields if provided
        if end_date:
            metric_entry["end_date"] = end_date
        if fiscal_year:
            metric_entry["fiscal_year"] = fiscal_year
        if form:
            metric_entry["form"] = form

        WORKFLOWS[workflow_id]["metrics"].append(metric_entry)

        # Build display value with fiscal period if available
        display_value = f"{value:.2f}" if isinstance(value, float) else str(value)
        if fiscal_year:
            if form == "10-K" or not end_date:
                period_label = f"FY {fiscal_year}"
            else:
                period_label = f"Q{_quarter_from_date(end_date)} {fiscal_year}"
            display_value = f"{display_value} ({period_label})"
        add_activity_log(workflow_id, source, f"Fetched {metric}: {display_value}")

        # Update MCP status to completed when we get a metric
        if "mcp_status" in WORKFLOWS[workflow_id] and source in WORKFLOWS[workflow_id]["mcp_status"]:
            WORKFLOWS[workflow_id]["mcp_status"][source] = "completed"


def set_mcp_executing(workflow_id: str):
    """Set all MCP servers to 'executing' state when research starts."""
    if workflow_id in WORKFLOWS and "mcp_status" in WORKFLOWS[workflow_id]:
        for source in WORKFLOWS[workflow_id]["mcp_status"]:
            WORKFLOWS[workflow_id]["mcp_status"][source] = "executing"


def _quarter_from_date(date_str: str) -> int:
    """Extract quarter number from a date string (YYYY-MM-DD)."""
    if not date_str:
        return 0
    try:
        month = int(date_str.split("-")[1])
        return (month - 1) // 3 + 1
    except (ValueError, IndexError):
        return 0
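
# e.g. _quarter_from_date("2023-09-30") == 3 (month 9 -> (9 - 1) // 3 + 1 = 3);
# empty or malformed dates fall back to quarter 0.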


def update_mcp_status(workflow_id: str, source: str, status: str):
    """Update MCP server status (idle/executing/completed/failed)."""
    if workflow_id in WORKFLOWS and "mcp_status" in WORKFLOWS[workflow_id]:
        if source in WORKFLOWS[workflow_id]["mcp_status"]:
            WORKFLOWS[workflow_id]["mcp_status"][source] = status


def _extract_metrics_from_raw_data(raw_data: dict) -> list:
    """Extract metrics array from raw_data for cached analysis display.

    Parses the multi_source structure to extract quantitative metrics
    in the same format as add_metric() produces.

    Args:
        raw_data: Parsed raw_data dict from cached analysis

    Returns:
        List of metric entries with source, metric, value, and temporal fields
    """
    metrics = []
    timestamp = datetime.utcnow().isoformat() + "Z"

    multi_source = raw_data.get("multi_source", {})

    # Extract fundamentals from SEC EDGAR and Yahoo Finance
    fin_all = multi_source.get("fundamentals_all", {})
    sec_data = fin_all.get("sec_edgar", {}).get("data", {})
    yf_fund = fin_all.get("yahoo_finance", {}).get("data", {})

    # Fundamentals metrics to extract
    fin_metrics = [
        "revenue", "net_income", "gross_profit", "operating_income",
        "gross_margin_pct", "operating_margin_pct", "net_margin_pct",
        "free_cash_flow", "operating_cash_flow", "total_assets",
        "total_liabilities", "stockholders_equity", "cash",
        "long_term_debt", "net_debt", "rd_expense", "eps", "debt_to_equity"
    ]

    for metric_name in fin_metrics:
        # Prefer SEC EDGAR data, fall back to Yahoo Finance
        # Track which source the data actually came from
        sec_metric = sec_data.get(metric_name)
        yf_metric = yf_fund.get(metric_name)

        if sec_metric is not None:
            metric_data = sec_metric
            actual_source = "sec_edgar"
        elif yf_metric is not None:
            metric_data = yf_metric
            actual_source = "yahoo_finance"
        else:
            continue

        entry = {
            "timestamp": timestamp,
            "source": "fundamentals",
            "metric": metric_name,
            "data_source": actual_source,  # Track actual data source for frontend
        }
        if isinstance(metric_data, dict):
            entry["value"] = metric_data.get("value")
            if metric_data.get("end_date"):
                entry["end_date"] = metric_data["end_date"]
            if metric_data.get("fiscal_year"):
                entry["fiscal_year"] = metric_data["fiscal_year"]
            if metric_data.get("form"):
                entry["form"] = metric_data["form"]
        else:
            entry["value"] = metric_data

        if entry.get("value") is not None:
            metrics.append(entry)

    # Extract valuation metrics from Yahoo Finance
    val_all = multi_source.get("valuation_all", {})
    yf_val = val_all.get("yahoo_finance", {}).get("data", {})

    # Get valuation fetch date if available (point-in-time data)
    # MCP server returns regular_market_time from Yahoo Finance quote data
    val_fetch_date = (
        yf_val.get("_fetch_date")
        or yf_val.get("fetch_date")
        or val_all.get("yahoo_finance", {}).get("regular_market_time")
    )

    val_metrics = [
        "market_cap", "enterprise_value", "trailing_pe", "forward_pe",
        "pb_ratio", "ps_ratio", "trailing_peg", "price_to_fcf",
        "ev_ebitda", "ev_revenue", "revenue_growth", "earnings_growth"
    ]

    for metric_name in val_metrics:
        metric_data = yf_val.get(metric_name)
        if metric_data is not None:
            entry = {
                "timestamp": timestamp,
                "source": "valuation",
                "metric": metric_name,
            }
            if isinstance(metric_data, dict):
                entry["value"] = metric_data.get("value")
                # Extract date if available in metric data
                if metric_data.get("date"):
                    entry["end_date"] = metric_data["date"]
                elif metric_data.get("end_date"):
                    entry["end_date"] = metric_data["end_date"]
                elif val_fetch_date:
                    entry["end_date"] = val_fetch_date
            else:
                entry["value"] = metric_data
                if val_fetch_date:
                    entry["end_date"] = val_fetch_date

            if entry.get("value") is not None:
                metrics.append(entry)

    # Extract volatility metrics
    vol_all = multi_source.get("volatility_all", {})
    ctx = vol_all.get("market_volatility_context", {})
    yf_vol = vol_all.get("yahoo_finance", {}).get("data", {})

    # VIX and VXN from market context
    for vol_metric in ["vix", "vxn"]:
        vol_data = ctx.get(vol_metric, {})
        if vol_data.get("value") is not None:
            entry = {
                "timestamp": timestamp,
                "source": "volatility",
                "metric": vol_metric,
                "value": vol_data["value"]
            }
            if vol_data.get("date"):
                entry["end_date"] = vol_data["date"]
            metrics.append(entry)

    # Beta and volatility from Yahoo Finance
    # Get volatility fetch date if available (MCP returns generated_at at response level)
    vol_fetch_date = (
        yf_vol.get("_fetch_date")
        or yf_vol.get("fetch_date")
        or (vol_all.get("generated_at", "")[:10] if vol_all.get("generated_at") else None)
    )

    for vol_metric in ["beta", "historical_volatility", "implied_volatility"]:
        metric_data = yf_vol.get(vol_metric)
        if metric_data is not None:
            entry = {
                "timestamp": timestamp,
                "source": "volatility",
                "metric": vol_metric,
            }
            if isinstance(metric_data, dict):
                entry["value"] = metric_data.get("value")
                if metric_data.get("date"):
                    entry["end_date"] = metric_data["date"]
                elif metric_data.get("end_date"):
                    entry["end_date"] = metric_data["end_date"]
                elif vol_fetch_date:
                    entry["end_date"] = vol_fetch_date
            else:
                entry["value"] = metric_data
                if vol_fetch_date:
                    entry["end_date"] = vol_fetch_date

            if entry.get("value") is not None:
                metrics.append(entry)

    # Extract macro indicators
    macro_all = multi_source.get("macro_all", {})
    bea_bls = macro_all.get("bea_bls", {}).get("data", {})
    fred = macro_all.get("fred", {}).get("data", {})

    macro_metrics = ["gdp_growth", "cpi_inflation", "unemployment", "interest_rate"]

    for metric_name in macro_metrics:
        # Prefer BEA/BLS, fall back to FRED
        metric_data = bea_bls.get(metric_name) or fred.get(metric_name)
        if metric_data is not None and isinstance(metric_data, dict):
            if metric_data.get("value") is not None:
                entry = {
                    "timestamp": timestamp,
                    "source": "macro",
                    "metric": metric_name,
                    "value": metric_data["value"]
                }
                if metric_data.get("date"):
                    entry["end_date"] = metric_data["date"]
                metrics.append(entry)

    return metrics
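
# A minimal sketch of the raw_data shape this function consumes (key names are
# inferred from the lookups above; real MCP payloads may carry more fields):
#
#   {
#       "multi_source": {
#           "fundamentals_all": {
#               "sec_edgar": {"data": {"revenue": {"value": 383285000000.0,
#                                                  "end_date": "2023-09-30",
#                                                  "fiscal_year": 2023,
#                                                  "form": "10-K"}}},
#               "yahoo_finance": {"data": {...}},
#           },
#           "valuation_all": {"yahoo_finance": {"data": {"trailing_pe": 29.4}}},
#           "volatility_all": {"market_volatility_context": {"vix": {"value": 14.2,
#                                                                    "date": "2024-01-05"}}},
#           "macro_all": {"fred": {"data": {"cpi_inflation": {"value": 3.1,
#                                                             "date": "2023-12-01"}}}},
#       }
#   }
#
# Each extracted entry mirrors what add_metric() produces, e.g.:
#   {"timestamp": ..., "source": "fundamentals", "metric": "revenue",
#    "value": 383285000000.0, "data_source": "sec_edgar",
#    "end_date": "2023-09-30", "fiscal_year": 2023, "form": "10-K"}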


def run_workflow_background(workflow_id: str, company_name: str, ticker: str, strategy_focus: str,
                            skip_cache: bool = False, user_api_keys: Optional[dict] = None):
    """Execute workflow in background thread with progress tracking."""
    try:
        # Check cache first (unless skip_cache is True)
        add_activity_log(workflow_id, "cache", f"Checking cache for {ticker}")
        WORKFLOWS[workflow_id]["current_step"] = "cache"

        if skip_cache:
            add_activity_log(workflow_id, "cache", f"Cache skipped - running fresh analysis")
            cached = None
        else:
            cached = get_cached_analysis(ticker)

        if cached:
            # Cache hit - use cached result
            add_activity_log(workflow_id, "cache", f"Cache HIT - {ticker} analysis found in history")
            add_activity_log(workflow_id, "cache", f"Returning cached result (skipping agentic workflow)")

            # Extract metrics from cached raw_data for frontend display
            cached_raw_data = cached.get("raw_data", {})
            cached_metrics = _extract_metrics_from_raw_data(cached_raw_data)

            WORKFLOWS[workflow_id].update({
                "status": "completed",
                "current_step": "completed",
                "revision_count": cached.get("revision_count", 0),
                "score": cached.get("score", 0),
                "data_source": "cache",
                "metrics": cached_metrics,  # Populate metrics for frontend
                "result": {
                    "company_name": cached.get("company_name", company_name),
                    "score": cached.get("score", 0),
                    "revision_count": cached.get("revision_count", 0),
                    "report_length": cached.get("report_length", 0),
                    "critique": cached.get("critique", ""),
                    "swot_data": cached.get("swot_data", {}),
                    "raw_report": cached.get("raw_report", ""),
                    "data_source": "cache",
                    "provider_used": cached.get("provider_used", "cached"),
                    "raw_data": cached.get("raw_data", {}),
                    "_cache_info": cached.get("_cache_info", {})
                }
            })
            return

        add_activity_log(workflow_id, "cache", f"Cache MISS - {ticker} not in history")
        add_activity_log(workflow_id, "cache", f"Proceeding with full agentic workflow...")

        # Import here to avoid circular imports and init issues
        from src.workflow.graph import app as graph_app

        # Update status to running
        WORKFLOWS[workflow_id]["status"] = "running"
        WORKFLOWS[workflow_id]["current_step"] = "researcher"
        add_activity_log(workflow_id, "input", f"Starting analysis for {company_name} ({ticker})")

        # Initialize MCP status
        WORKFLOWS[workflow_id]["mcp_status"] = {
            "fundamentals": "idle",
            "valuation": "idle",
            "volatility": "idle",
            "macro": "idle",
            "news": "idle",
            "sentiment": "idle"
        }

        # Initialize state
        state = {
            "company_name": company_name,
            "ticker": ticker,
            "strategy_focus": strategy_focus,
            "raw_data": None,
            "draft_report": None,
            "critique": None,
            "revision_count": 0,
            "messages": [],
            "score": 0,
            "data_source": "live",
            "provider_used": None,
            "workflow_id": workflow_id,
            "progress_store": WORKFLOWS,
            "user_api_keys": user_api_keys or {}  # Pass user API keys to nodes
        }

        # Execute workflow
        result = graph_app.invoke(state)
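
        # Keys consumed from the graph result below (inferred from this function;
        # the node implementations define the authoritative contract):
        #   sources_available / sources_failed   -> MCP status updates
        #   llm_providers_failed / provider_used -> LLM status updates
        #   error                                -> aborts the workflow early
        #   draft_report, critique, score, revision_count, raw_data,
        #   metric_reference, data_source        -> final result payload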

        # Update MCP status based on sources
        sources_available = result.get("sources_available", [])
        sources_failed = result.get("sources_failed", [])
        mcp_status = WORKFLOWS[workflow_id]["mcp_status"]

        for source in sources_available:
            if source in mcp_status:
                mcp_status[source] = "completed"

        for source in sources_failed:
            if source in mcp_status:
                mcp_status[source] = "failed"
                add_activity_log(workflow_id, source, f"MCP server failed")

        # Update LLM status based on failed providers and used provider
        # IMPORTANT: Do this BEFORE checking for errors so frontend sees failures
        llm_providers_failed = result.get("llm_providers_failed", [])
        provider_used = result.get("provider_used", "")
        # Use setdefault as a guard in case the API layer has not seeded llm_status
        llm_status = WORKFLOWS[workflow_id].setdefault("llm_status", {})

        # Mark failed providers
        for provider in llm_providers_failed:
            if provider in llm_status:
                llm_status[provider] = "failed"
                add_activity_log(workflow_id, "llm", f"{provider.capitalize()} provider failed")

        # Mark the used provider as completed
        if provider_used:
            provider_name = provider_used.split(":")[0]
            if provider_name in llm_status:
                llm_status[provider_name] = "completed"

        # Check if workflow ended with an error (LLM failures etc)
        # Do this BEFORE parsing SWOT so we properly abort on errors
        if result.get("error"):
            error_msg = result.get("error")
            add_activity_log(workflow_id, "workflow", f"Workflow failed: {error_msg}")
            WORKFLOWS[workflow_id].update({
                "status": "aborted",
                "error": error_msg,
                "current_step": "aborted"
            })
            return

        # Parse SWOT from draft report
        swot_data = parse_swot_text(result.get("draft_report", ""))

        # Supplement with MCP-aggregated SWOT data (ensures weaknesses/threats aren't lost)
        try:
            raw_data = result.get("raw_data", "{}")
            if isinstance(raw_data, str):
                raw_data = json.loads(raw_data)
            mcp_swot = raw_data.get("aggregated_swot", {})
            if mcp_swot:
                # Add MCP items that aren't already in parsed data
                for category in ["strengths", "weaknesses", "opportunities", "threats"]:
                    existing = set(item.lower()[:50] for item in swot_data.get(category, []))
                    for item in mcp_swot.get(category, []):
                        # Only add if not a duplicate (compare first 50 chars, lowercased)
                        key = item.lower()[:50]
                        if key not in existing:
                            swot_data.setdefault(category, []).append(item)
                            existing.add(key)
        except Exception as e:
            logger.warning(f"Could not merge MCP SWOT data: {e}")

        # Parse raw_data for MCP display
        raw_data_parsed = {}
        try:
            raw_data_str = result.get("raw_data", "{}")
            if isinstance(raw_data_str, str):
                raw_data_parsed = json.loads(raw_data_str)
            else:
                raw_data_parsed = raw_data_str or {}
        except Exception as e:
            logger.warning(f"Could not parse raw_data: {e}")

        # Extract business address from company profile
        company_profile = raw_data_parsed.get("company_profile", {})
        business_address = company_profile.get("business_address", "")

        # Generate data quality notes from metric reference
        metric_reference = result.get("metric_reference", {})
        quality_notes = {"high_confidence": [], "gaps_or_stale": [], "assumptions": []}
        if metric_reference:
            from src.nodes.analyzer import _generate_data_quality_notes
            quality_notes = _generate_data_quality_notes(metric_reference)
            quality_notes["assumptions"] = []  # LLM assumptions added later if available

        # Build final result
        final_result = {
            "company_name": company_name,
            "business_address": business_address,
            "score": result.get("score", 0),
            "revision_count": result.get("revision_count", 0),
            "report_length": len(result.get("draft_report", "")),
            "critique": result.get("critique", ""),
            "quality_notes": quality_notes,
            "swot_data": swot_data,
            "raw_report": result.get("draft_report", ""),
            "data_source": result.get("data_source", "unknown"),
            "provider_used": result.get("provider_used", "unknown"),
            "raw_data": raw_data_parsed
        }

        # Cache the final result
        set_cached_analysis(ticker, company_name, final_result)
        add_activity_log(workflow_id, "cache", f"Cached analysis for {ticker}")

        # Update with final result
        WORKFLOWS[workflow_id].update({
            "status": "completed",
            "current_step": "completed",
            "revision_count": result.get("revision_count", 0),
            "score": result.get("score", 0),
            "result": final_result
        })

    except Exception as e:
        error_msg = str(e)
        # Determine if this is an abort (critical) or error (retryable)
        # Aborts: Core MCP failures, insufficient data, LLM failures
        is_abort = any(phrase in error_msg for phrase in [
            "Insufficient core data",
            "All MCP servers failed",
            "Need at least 2 of",
            "All LLM providers failed"
        ])

        WORKFLOWS[workflow_id].update({
            "status": "aborted" if is_abort else "error",
            "error": error_msg
        })
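

# How a caller might launch this worker (a sketch; the actual route handler
# lives elsewhere and seeds the WORKFLOWS entry before starting the thread):
#
#   import threading, uuid
#
#   workflow_id = str(uuid.uuid4())
#   WORKFLOWS[workflow_id] = {"status": "running", "current_step": "cache",
#                             "llm_status": {}, "activity_log": [], "metrics": []}
#   threading.Thread(
#       target=run_workflow_background,
#       args=(workflow_id, "Apple Inc.", "AAPL", "growth"),
#       daemon=True,
#   ).start()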