Spaces:
Sleeping
Sleeping
| from src.llm_client import get_llm_client | |
| from langsmith import traceable | |
| import time | |
| import json | |
| # VADER Sentiment Analysis | |
| from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
_vader_analyzer = None


def _get_vader():
    """Return the shared VADER analyzer, building it on first use.

    The instance is stored in the module-level ``_vader_analyzer`` so that
    every later call hands back the same object.
    """
    global _vader_analyzer
    if _vader_analyzer is not None:
        return _vader_analyzer
    _vader_analyzer = SentimentIntensityAnalyzer()
    return _vader_analyzer
| def _compute_vader_sentiment(texts: list) -> dict: | |
| """ | |
| Compute VADER sentiment scores for a list of texts. | |
| Args: | |
| texts: List of strings (headlines, titles, etc.) | |
| Returns: | |
| { | |
| "avg_compound": 0.42, | |
| "min_compound": -0.31, | |
| "max_compound": 0.78, | |
| "positive_count": 3, | |
| "negative_count": 1, | |
| "neutral_count": 1, | |
| "total_count": 5 | |
| } | |
| or None if no texts provided | |
| """ | |
| if not texts: | |
| return None | |
| vader = _get_vader() | |
| scores = [] | |
| for text in texts: | |
| if text and isinstance(text, str): | |
| score = vader.polarity_scores(text)["compound"] | |
| scores.append(score) | |
| if not scores: | |
| return None | |
| return { | |
| "avg_compound": round(sum(scores) / len(scores), 3), | |
| "min_compound": round(min(scores), 3), | |
| "max_compound": round(max(scores), 3), | |
| "positive_count": sum(1 for s in scores if s > 0.05), | |
| "negative_count": sum(1 for s in scores if s < -0.05), | |
| "neutral_count": sum(1 for s in scores if -0.05 <= s <= 0.05), | |
| "total_count": len(scores) | |
| } | |
# Financial institution detection for EV/EBITDA exclusion.
# The three sets below are read by _is_financial_institution().

# Lower-case sector names; tested as substrings of the lower-cased sector.
FINANCIAL_SECTORS = {
    "financial services", "financial", "banking", "banks",
    "insurance", "real estate investment trust", "reit",
    "investment management", "capital markets", "diversified financial services",
    "consumer finance", "asset management", "mortgage finance",
}
# Lower-case industry names; tested as substrings of the lower-cased industry.
FINANCIAL_INDUSTRIES = {
    "banks", "regional banks", "diversified banks", "money center banks",
    "insurance", "life insurance", "property insurance", "reinsurance",
    "real estate", "reit", "mortgage reits", "equity reits",
    "asset management", "investment banking", "capital markets",
    "consumer finance", "specialty finance",
}
# Fallback: known financial tickers when sector data unavailable.
# Upper-case symbols; compared against ticker.upper() (exact match).
FINANCIAL_TICKERS = {
    "JPM", "BAC", "WFC", "GS", "MS", "C", "USB", "PNC", "TFC", "COF",
    "AXP", "BLK", "SCHW", "CME", "ICE", "SPGI", "MCO",
    "BRK.A", "BRK.B", "MET", "PRU", "AIG", "ALL", "TRV", "PGR", "CB",
    "AMT", "PLD", "CCI", "EQIX", "PSA", "O", "WELL", "AVB", "EQR",
}
# =============================================================================
# REVISION MODE: Conditional Focus Area Blocks
# These are included in revision prompts based on which rubric criteria failed
# =============================================================================
# Each constant is a Markdown fragment: a bold heading followed by bullet
# instructions. The text is prompt content, so edit wording deliberately.

# Focus block: every claim must cite a metric from the input data.
EVIDENCE_GROUNDING_BLOCK = """
**EVIDENCE GROUNDING (Critical)**
- Every claim must cite a specific metric from the input data
- Use exact field names: `revenue`, `net_margin_pct`, `trailing_pe`, etc.
- Format citations as: "[Metric]: [Value] ([Source], [Period])"
- If a metric was flagged as fabricated, remove it entirely or replace with actual data
"""
# Focus block: advice language, temporal labels, confidence levels, missing data.
CONSTRAINT_COMPLIANCE_BLOCK = """
**CONSTRAINT COMPLIANCE (Critical)**
- Remove any language that sounds like investment advice
- Check all temporal labels — TTM vs FY vs Q must match the source
- Add confidence levels to key conclusions: (High/Medium/Low)
- Do not use EV/EBITDA for financial institutions
- For missing data, state "DATA NOT PROVIDED" — do not estimate
"""
# Focus block: company-specific, quantified statements.
SPECIFICITY_BLOCK = """
**SPECIFICITY & ACTIONABILITY**
- Replace generic statements with company-specific observations
- Quantify every claim possible: not "strong margins" but "31.0% operating margin"
- Remove business clichés: "leveraging," "best-in-class," "synergies"
"""
# Focus block: cross-basket connections and explanation of why metrics matter.
INSIGHT_BLOCK = """
**STRATEGIC INSIGHT**
- Connect observations across data baskets (e.g., link margin trends to macro rates)
- Go beyond restating metrics — explain WHY they matter
- Identify non-obvious relationships in the data
"""
# Focus block: all required sections present and balanced.
COMPLETENESS_BLOCK = """
**COMPLETENESS & BALANCE**
- Ensure ALL required sections are present (Strengths, Weaknesses, Opportunities, Threats, Data Quality Notes)
- Balance quadrants — no section should be filler or disproportionately thin
"""
# Focus block: consistent formatting, no contradictions, scannable output.
CLARITY_BLOCK = """
**CLARITY & STRUCTURE**
- Use consistent formatting throughout
- Ensure no contradictions across sections
- Make output scannable — executives should grasp key points in 30 seconds
"""
def _is_financial_institution(sector: str, industry: str, ticker: str) -> bool:
    """Decide whether a company counts as a financial institution.

    For such companies EV/EBITDA is not meaningful. Checks run in order and
    the first hit wins: a ``FINANCIAL_SECTORS`` entry contained in the sector,
    a ``FINANCIAL_INDUSTRIES`` entry contained in the industry, then an exact
    upper-cased ticker match in ``FINANCIAL_TICKERS``.

    Args:
        sector: Sector name; ``None``/empty is treated as "".
        industry: Industry name; ``None``/empty is treated as "".
        ticker: Ticker symbol; skipped when falsy.

    Returns:
        ``True`` if any check matches, otherwise ``False``.
    """
    normalized_sector = (sector or "").lower().strip()
    normalized_industry = (industry or "").lower().strip()

    for haystack, needles in (
        (normalized_sector, FINANCIAL_SECTORS),
        (normalized_industry, FINANCIAL_INDUSTRIES),
    ):
        if any(needle in haystack for needle in needles):
            return True

    return bool(ticker) and ticker.upper() in FINANCIAL_TICKERS
| def _extract_company_profile(raw_data: str) -> dict: | |
| """Extract company profile details from SEC EDGAR and Yahoo Finance data.""" | |
| try: | |
| data = json.loads(raw_data) | |
| except json.JSONDecodeError: | |
| return {} | |
| multi_source = data.get("multi_source", {}) | |
| profile = {} | |
| # Try SEC EDGAR for business address (most authoritative) | |
| # Handle both old format (with "data" wrapper) and new flat format | |
| fin_all = multi_source.get("fundamentals_all", {}) | |
| sec_source = fin_all.get("sec_edgar", {}) | |
| # Check if old format with "data" wrapper or new flat format | |
| sec_data = sec_source.get("data", sec_source) if "data" in sec_source else sec_source | |
| sec_profile = sec_data.get("company_info", {}) or sec_data.get("profile", {}) | |
| if sec_profile: | |
| # SEC EDGAR company info | |
| city = sec_profile.get("city", "") | |
| state = sec_profile.get("state", sec_profile.get("stateOrCountry", "")) | |
| if city and state: | |
| profile["business_address"] = f"{city}, {state}" | |
| profile["cik"] = sec_profile.get("cik", "") | |
| profile["sic"] = sec_profile.get("sic", "") | |
| profile["sic_description"] = sec_profile.get("sicDescription", "") | |
| # Try Yahoo Finance for sector/industry and other details | |
| yf_val_source = multi_source.get("valuation_all", {}).get("yahoo_finance", {}) | |
| yf_val = yf_val_source.get("data", yf_val_source) if "data" in yf_val_source else yf_val_source | |
| yf_profile = yf_val.get("profile", {}) | |
| if not yf_profile: | |
| # Try fundamentals yahoo_finance | |
| yf_fund_source = fin_all.get("yahoo_finance", {}) | |
| yf_fund = yf_fund_source.get("data", yf_fund_source) if "data" in yf_fund_source else yf_fund_source | |
| yf_profile = yf_fund.get("profile", {}) | |
| if yf_profile: | |
| profile["sector"] = yf_profile.get("sector", "") | |
| profile["industry"] = yf_profile.get("industry", "") | |
| profile["employees"] = yf_profile.get("fullTimeEmployees", "") | |
| profile["website"] = yf_profile.get("website", "") | |
| # Yahoo Finance may also have address | |
| if not profile.get("business_address"): | |
| city = yf_profile.get("city", "") | |
| state = yf_profile.get("state", "") | |
| country = yf_profile.get("country", "") | |
| if city: | |
| addr_parts = [city] | |
| if state: | |
| addr_parts.append(state) | |
| if country and country != "United States": | |
| addr_parts.append(country) | |
| profile["business_address"] = ", ".join(addr_parts) | |
| return profile | |
| def _add_activity_log(workflow_id, progress_store, step, message): | |
| """Helper to add activity log entry.""" | |
| if workflow_id and progress_store: | |
| from src.services.workflow_store import add_activity_log | |
| add_activity_log(workflow_id, step, message) | |
| def _extract_temporal_metric(metric_data: dict) -> dict: | |
| """Extract metric value with temporal metadata (fiscal year, period end, form type).""" | |
| if not isinstance(metric_data, dict): | |
| return {"value": metric_data} | |
| return { | |
| "value": metric_data.get("value"), | |
| "end_date": metric_data.get("end_date"), | |
| "fiscal_year": metric_data.get("fiscal_year"), | |
| "form": metric_data.get("form"), # "10-K" (annual) or "10-Q" (quarterly) | |
| } | |
| def _extract_valuation_metric(metric_data: dict) -> dict: | |
| """Extract valuation metric with as_of date (new MCP structure).""" | |
| if not isinstance(metric_data, dict): | |
| return {"value": metric_data} | |
| return { | |
| "value": metric_data.get("value"), | |
| "end_date": metric_data.get("as_of"), # MCP uses "as_of" for valuation | |
| } | |
| def _get_fiscal_period_label(metric: dict) -> str: | |
| """Format fiscal period label from temporal data (e.g., 'FY 2023' or 'Q3 2024').""" | |
| if not isinstance(metric, dict): | |
| return "" | |
| form = metric.get("form", "") | |
| fy = metric.get("fiscal_year") | |
| end_date = metric.get("end_date") | |
| if not fy: | |
| return "" | |
| if form == "10-K": | |
| return f"FY {fy}" | |
| elif form == "10-Q" and end_date: | |
| try: | |
| # Parse quarter from end date | |
| month = int(end_date.split("-")[1]) | |
| quarter = (month - 1) // 3 + 1 | |
| return f"Q{quarter} {fy}" | |
| except (ValueError, IndexError): | |
| return f"FY {fy}" | |
| return f"FY {fy}" | |
| def _format_currency(value): | |
| """Format large numbers as currency (B/M).""" | |
| if value is None: | |
| return "N/A" | |
| if isinstance(value, dict): | |
| value = value.get("value") | |
| if value is None: | |
| return "N/A" | |
| if isinstance(value, (int, float)): | |
| if abs(value) >= 1e12: | |
| return f"${value/1e12:.2f}T" | |
| if abs(value) >= 1e9: | |
| return f"${value/1e9:.2f}B" | |
| if abs(value) >= 1e6: | |
| return f"${value/1e6:.0f}M" | |
| return f"${value:,.0f}" | |
| return str(value) | |
| def _format_number(value, suffix="", decimals=2): | |
| """Format a number with optional suffix.""" | |
| if value is None: | |
| return "N/A" | |
| if isinstance(value, dict): | |
| value = value.get("value") | |
| if value is None: | |
| return "N/A" | |
| if isinstance(value, (int, float)): | |
| return f"{value:.{decimals}f}{suffix}" | |
| return str(value) | |
| def _get_period_label(metric_data: dict) -> str: | |
| """Get period label from metric data (e.g., 'FY 2024', 'Q3 2024', '2024-11').""" | |
| if not isinstance(metric_data, dict): | |
| return "" | |
| # Check for fiscal year/form info | |
| fy = metric_data.get("fiscal_year") | |
| form = metric_data.get("form", "") | |
| end_date = metric_data.get("end_date", "") | |
| date = metric_data.get("date", "") | |
| if fy: | |
| if form == "10-K": | |
| return f"FY {fy}" | |
| elif form == "10-Q" and end_date: | |
| try: | |
| month = int(end_date.split("-")[1]) | |
| quarter = (month - 1) // 3 + 1 | |
| return f"Q{quarter} {fy}" | |
| except: | |
| return f"FY {fy}" | |
| return f"FY {fy}" | |
| # Fallback to date | |
| if end_date: | |
| return end_date[:10] | |
| if date: | |
| return str(date)[:10] | |
| return "" | |
| def _get_value(metric_data) -> any: | |
| """Extract value from metric data (handles both dict and plain values).""" | |
| if isinstance(metric_data, dict): | |
| return metric_data.get("value") | |
| return metric_data | |
| def _generate_data_report(raw_data: str, is_financial: bool = False) -> str: | |
| """Generate complete multi-source data report with simple tables. | |
| Args: | |
| raw_data: JSON string of research data | |
| is_financial: If True, exclude EV/EBITDA for financial institutions | |
| """ | |
| try: | |
| data = json.loads(raw_data) | |
| except json.JSONDecodeError: | |
| return "Error: Could not parse data" | |
| lines = [] | |
| company = data.get("company_name", "Unknown") | |
| ticker = data.get("ticker", "N/A") | |
| multi_source = data.get("multi_source", {}) | |
| metrics = data.get("metrics", {}) | |
| lines.append(f"# Data Report: {company} ({ticker})") | |
| lines.append("") | |
| # ========== FINANCIALS ========== | |
| fin_all = multi_source.get("fundamentals_all", {}) | |
| # Handle both old format (with "data" wrapper) and new flat format | |
| sec_source = fin_all.get("sec_edgar", {}) | |
| sec_data = sec_source.get("data", sec_source) if "data" in sec_source else sec_source | |
| yf_source = fin_all.get("yahoo_finance", {}) | |
| yf_data = yf_source.get("data", yf_source) if "data" in yf_source else yf_source | |
| if sec_data or yf_data: | |
| lines.append("## Financials") | |
| lines.append("Primary: SEC EDGAR | Secondary: Yahoo Finance") | |
| lines.append("") | |
| lines.append("| Metric | Period | SEC EDGAR | Yahoo Finance |") | |
| lines.append("|--------|--------|-----------|---------------|") | |
| fin_metrics = [ | |
| ("Revenue", "revenue", _format_currency), | |
| ("Net Income", "net_income", _format_currency), | |
| ("Gross Profit", "gross_profit", _format_currency), | |
| ("Operating Income", "operating_income", _format_currency), | |
| ("Gross Margin %", "gross_margin_pct", lambda v: _format_number(v, "%")), | |
| ("Operating Margin %", "operating_margin_pct", lambda v: _format_number(v, "%")), | |
| ("Net Margin %", "net_margin_pct", lambda v: _format_number(v, "%")), | |
| ("Free Cash Flow", "free_cash_flow", _format_currency), | |
| ("Operating Cash Flow", "operating_cash_flow", _format_currency), | |
| ("Total Assets", "total_assets", _format_currency), | |
| ("Total Liabilities", "total_liabilities", _format_currency), | |
| ("Stockholders Equity", "stockholders_equity", _format_currency), | |
| ("Cash", "cash", _format_currency), | |
| ("Long-term Debt", "long_term_debt", _format_currency), | |
| ("Net Debt", "net_debt", _format_currency), | |
| ("R&D Expense", "rd_expense", _format_currency), | |
| ] | |
| for name, key, fmt in fin_metrics: | |
| sec_val = sec_data.get(key) | |
| yf_val = yf_data.get(key) | |
| period = _get_period_label(sec_val) or _get_period_label(yf_val) | |
| sec_str = fmt(_get_value(sec_val)) if sec_val else "N/A" | |
| yf_str = fmt(_get_value(yf_val)) if yf_val else "N/A" | |
| if sec_str != "N/A" or yf_str != "N/A": | |
| lines.append(f"| {name} | {period} | {sec_str} | {yf_str} |") | |
| lines.append("") | |
| # ========== VALUATION ========== | |
| val_all = multi_source.get("valuation_all", {}) | |
| yf_val_src = val_all.get("yahoo_finance", {}) | |
| yf_val = yf_val_src.get("data", yf_val_src) if "data" in yf_val_src else yf_val_src | |
| av_val_src = val_all.get("alpha_vantage", {}) | |
| av_val = av_val_src.get("data", av_val_src) if "data" in av_val_src else av_val_src | |
| if yf_val or av_val: | |
| lines.append("## Valuation") | |
| lines.append("Primary: Yahoo Finance | Secondary: Alpha Vantage") | |
| lines.append("") | |
| lines.append("| Metric | Yahoo Finance | Alpha Vantage |") | |
| lines.append("|--------|---------------|---------------|") | |
| val_metrics = [ | |
| ("Market Cap", "market_cap", _format_currency), | |
| ("Enterprise Value", "enterprise_value", _format_currency), | |
| ("P/E Trailing", "trailing_pe", lambda v: _format_number(v, "x")), | |
| ("P/E Forward", "forward_pe", lambda v: _format_number(v, "x")), | |
| ("P/B Ratio", "pb_ratio", lambda v: _format_number(v, "x")), | |
| ("P/S Ratio", "ps_ratio", lambda v: _format_number(v, "x")), | |
| ("PEG Ratio", "trailing_peg", lambda v: _format_number(v, "x")), | |
| ("Price/FCF", "price_to_fcf", lambda v: _format_number(v, "x")), | |
| ("Revenue Growth", "revenue_growth", lambda v: _format_number(v * 100 if v and abs(v) < 10 else v, "%") if v else "N/A"), | |
| ("Earnings Growth", "earnings_growth", lambda v: _format_number(v * 100 if v and abs(v) < 10 else v, "%") if v else "N/A"), | |
| ] | |
| # Only include EV/EBITDA for non-financial companies | |
| if not is_financial: | |
| val_metrics.insert(6, ("EV/EBITDA", "ev_ebitda", lambda v: _format_number(v, "x"))) | |
| val_metrics.insert(7, ("EV/Revenue", "ev_revenue", lambda v: _format_number(v, "x"))) | |
| for name, key, fmt in val_metrics: | |
| y = yf_val.get(key) | |
| a = av_val.get(key) | |
| ys = fmt(_get_value(y)) if y is not None else "N/A" | |
| avs = fmt(_get_value(a)) if a is not None else "N/A" | |
| if ys != "N/A" or avs != "N/A": | |
| lines.append(f"| {name} | {ys} | {avs} |") | |
| lines.append("") | |
| # ========== VOLATILITY ========== | |
| vol_all = multi_source.get("volatility_all", {}) | |
| if vol_all: | |
| lines.append("## Volatility") | |
| lines.append("Primary: FRED + Yahoo | Secondary: Alpha Vantage") | |
| lines.append("") | |
| lines.append("| Metric | Date | Primary | Secondary |") | |
| lines.append("|--------|------|---------|-----------|") | |
| ctx = vol_all.get("market_volatility_context", {}) | |
| vix = ctx.get("vix", {}) | |
| vxn = ctx.get("vxn", {}) | |
| yf_vol_src = vol_all.get("yahoo_finance", {}) | |
| yf_vol = yf_vol_src.get("data", yf_vol_src) if "data" in yf_vol_src else yf_vol_src | |
| av_vol_src = vol_all.get("alpha_vantage", {}) | |
| av_vol = av_vol_src.get("data", av_vol_src) if "data" in av_vol_src else av_vol_src | |
| # VIX | |
| if vix.get("value"): | |
| lines.append(f"| VIX | {vix.get('date', '')} | {_format_number(vix.get('value'))} | - |") | |
| # VXN | |
| if vxn.get("value"): | |
| lines.append(f"| VXN | {vxn.get('date', '')} | {_format_number(vxn.get('value'))} | - |") | |
| # Beta | |
| beta_yf = _get_value(yf_vol.get("beta")) | |
| beta_av = _get_value(av_vol.get("beta")) if av_vol else None | |
| if beta_yf or beta_av: | |
| lines.append(f"| Beta | - | {_format_number(beta_yf, '', 3)} | {_format_number(beta_av, '', 3) if beta_av else 'N/A'} |") | |
| # Historical Volatility | |
| hv_yf = _get_value(yf_vol.get("historical_volatility")) | |
| hv_av = _get_value(av_vol.get("historical_volatility")) if av_vol else None | |
| if hv_yf or hv_av: | |
| lines.append(f"| Historical Volatility | - | {_format_number(hv_yf, '%')} | {_format_number(hv_av, '%') if hv_av else 'N/A'} |") | |
| # Implied Volatility | |
| iv_yf = _get_value(yf_vol.get("implied_volatility")) | |
| if iv_yf: | |
| lines.append(f"| Implied Volatility | - | {_format_number(iv_yf, '%')} | N/A |") | |
| lines.append("") | |
| # ========== MACRO ========== | |
| macro_all = multi_source.get("macro_all", {}) | |
| if macro_all: | |
| lines.append("## Macro Indicators") | |
| lines.append("Primary: BEA/BLS | Secondary: FRED") | |
| lines.append("") | |
| lines.append("| Metric | Period | BEA/BLS | FRED |") | |
| lines.append("|--------|--------|---------|------|") | |
| bea_src = macro_all.get("bea_bls", {}) | |
| bea_bls = bea_src.get("data", bea_src) if "data" in bea_src else bea_src | |
| fred_src = macro_all.get("fred", {}) | |
| fred = fred_src.get("data", fred_src) if "data" in fred_src else fred_src | |
| # GDP Growth | |
| gdp_p = bea_bls.get("gdp_growth", {}) or {} | |
| gdp_f = fred.get("gdp_growth", {}) or {} | |
| gdp_date = gdp_p.get("date", "") or gdp_f.get("date", "") | |
| lines.append(f"| GDP Growth | {gdp_date} | {_format_number(gdp_p.get('value'), '%')} | {_format_number(gdp_f.get('value'), '%')} |") | |
| # CPI/Inflation | |
| cpi_p = bea_bls.get("cpi_inflation", {}) or {} | |
| cpi_f = fred.get("cpi_inflation", {}) or {} | |
| cpi_date = cpi_p.get("date", "") or cpi_f.get("date", "") | |
| lines.append(f"| Inflation (CPI YoY) | {cpi_date} | {_format_number(cpi_p.get('value'), '%')} | {_format_number(cpi_f.get('value'), '%')} |") | |
| # Unemployment | |
| unemp_p = bea_bls.get("unemployment", {}) or {} | |
| unemp_f = fred.get("unemployment", {}) or {} | |
| unemp_date = unemp_p.get("date", "") or unemp_f.get("date", "") | |
| lines.append(f"| Unemployment | {unemp_date} | {_format_number(unemp_p.get('value'), '%')} | {_format_number(unemp_f.get('value'), '%')} |") | |
| # Fed Funds Rate (FRED only) | |
| rates = fred.get("interest_rate", {}) or {} | |
| lines.append(f"| Fed Funds Rate | {rates.get('date', '')} | - | {_format_number(rates.get('value'), '%')} |") | |
| lines.append("") | |
| # ========== NEWS ========== | |
| news = metrics.get("news", {}) | |
| if news: | |
| # New format: {tavily: [...], nyt: [...], newsapi: [...]} | |
| all_articles = [] | |
| for source in ["tavily", "nyt", "newsapi"]: | |
| for article in news.get(source, []): | |
| all_articles.append({**article, "source": source}) | |
| if all_articles: | |
| lines.append("## News Articles") | |
| lines.append("") | |
| lines.append("| # | Title | Source | URL |") | |
| lines.append("|---|-------|--------|-----|") | |
| for i, article in enumerate(all_articles[:10], 1): | |
| title = article.get("title", "Untitled") | |
| source = article.get("source", "Unknown") | |
| url = article.get("url", "") | |
| lines.append(f"| {i} | {title} | {source} | {url} |") | |
| lines.append("") | |
| # ========== SENTIMENT ========== | |
| sentiment = metrics.get("sentiment", {}) | |
| if sentiment: | |
| # New format: {finnhub: [...], reddit: [...]} | |
| finnhub_articles = sentiment.get("finnhub", []) | |
| reddit_posts = sentiment.get("reddit", []) | |
| lines.append("## Sentiment Analysis") | |
| lines.append("") | |
| lines.append("| Source | Items |") | |
| lines.append("|--------|-------|") | |
| lines.append(f"| Finnhub | {len(finnhub_articles)} articles |") | |
| lines.append(f"| Reddit | {len(reddit_posts)} posts |") | |
| lines.append("") | |
| # Show Finnhub articles | |
| if finnhub_articles: | |
| lines.append("### Finnhub Articles") | |
| lines.append("") | |
| lines.append("| # | Title | URL |") | |
| lines.append("|---|-------|-----|") | |
| for i, article in enumerate(finnhub_articles[:10], 1): | |
| title = article.get("title", "Untitled") | |
| url = article.get("url", "") | |
| lines.append(f"| {i} | {title} | {url} |") | |
| lines.append("") | |
| # Show Reddit posts | |
| if reddit_posts: | |
| lines.append("### Reddit Posts") | |
| lines.append("") | |
| lines.append("| # | Title | URL |") | |
| lines.append("|---|-------|-----|") | |
| for i, post in enumerate(reddit_posts[:10], 1): | |
| title = post.get("title", "Untitled") | |
| url = post.get("url", "") | |
| lines.append(f"| {i} | {title} | {url} |") | |
| lines.append("") | |
| lines.append("---") | |
| lines.append("") | |
| return "\n".join(lines) | |
| def _extract_key_metrics(raw_data: str) -> dict: | |
| """Extract and format key metrics from raw JSON data, preserving temporal info.""" | |
| try: | |
| data = json.loads(raw_data) | |
| except json.JSONDecodeError: | |
| return {"error": "Could not parse raw data"} | |
| metrics = data.get("metrics", {}) | |
| # Extract company profile for business address | |
| company_profile = data.get("company_profile", {}) | |
| extracted = { | |
| "company": data.get("company_name", "Unknown"), | |
| "ticker": data.get("ticker", "N/A"), | |
| "business_address": company_profile.get("business_address", ""), | |
| "fundamentals": {}, | |
| "valuation": {}, | |
| "volatility": {}, | |
| "macro": {}, | |
| "news": {}, | |
| "sentiment": {}, | |
| "aggregated_swot": data.get("aggregated_swot", {}) | |
| } | |
| # Extract fundamentals with temporal data | |
| # Structure varies: | |
| # Formats supported: | |
| # - Old: {"sec_edgar": {"data": {...}}, "yahoo_finance": {"data": {...}}} | |
| # - New (flat): {"sec_edgar": {...}, "yahoo_finance": {...}} | |
| fin = metrics.get("fundamentals", {}) | |
| if not fin or "error" in fin: | |
| fin = data.get("multi_source", {}).get("fundamentals_all", {}) | |
| if fin and "error" not in fin: | |
| # Handle both old format (with "data" wrapper) and new flat format | |
| sec_source = fin.get("sec_edgar", {}) | |
| sec_data = sec_source.get("data", sec_source) if "data" in sec_source else sec_source | |
| yf_source = fin.get("yahoo_finance", {}) | |
| yf_data = yf_source.get("data", yf_source) if "data" in yf_source else yf_source | |
| # Merge with SEC as primary | |
| fin_data = {**yf_data, **sec_data} # SEC overwrites YF where both exist | |
| extracted["fundamentals"] = { | |
| "revenue": _extract_temporal_metric(fin_data.get("revenue", {})), | |
| "revenue_cagr_3yr": fin_data.get("revenue_growth_3yr"), | |
| "net_margin": _extract_temporal_metric(fin_data.get("net_margin_pct", {})), | |
| "gross_margin": _extract_temporal_metric(fin_data.get("gross_margin_pct", {})), | |
| "operating_margin": _extract_temporal_metric(fin_data.get("operating_margin_pct", {})), | |
| "eps": _extract_temporal_metric(fin_data.get("eps", {})), | |
| "debt_to_equity": _extract_temporal_metric(fin_data.get("debt_to_equity", {})), | |
| "free_cash_flow": _extract_temporal_metric(fin_data.get("free_cash_flow", {})), | |
| "net_income": _extract_temporal_metric(fin_data.get("net_income", {})), | |
| } | |
| # Extract valuation (with temporal data) | |
| # Handle both old format (with "data" wrapper) and new flat format | |
| val = metrics.get("valuation", {}) | |
| if not val or "error" in val: | |
| val = data.get("multi_source", {}).get("valuation_all", {}) | |
| if val and "error" not in val: | |
| # New MCP structure: {yahoo_finance: {...}, alpha_vantage: {...}} | |
| # Check both sources - yahoo_finance is primary, alpha_vantage is fallback | |
| yf_val = val.get("yahoo_finance", {}) | |
| av_val = val.get("alpha_vantage", {}) | |
| extracted["valuation"] = { | |
| "pe_trailing": _extract_valuation_metric(yf_val.get("trailing_pe") or av_val.get("trailing_pe", {})), | |
| "pe_forward": _extract_valuation_metric(yf_val.get("forward_pe") or av_val.get("forward_pe", {})), | |
| "pb_ratio": _extract_valuation_metric(yf_val.get("pb_ratio") or av_val.get("pb_ratio", {})), | |
| "ps_ratio": _extract_valuation_metric(yf_val.get("ps_ratio") or av_val.get("ps_ratio", {})), | |
| "ev_ebitda": _extract_valuation_metric(av_val.get("ev_ebitda") or yf_val.get("ev_ebitda", {})), | |
| "valuation_signal": val.get("overall_signal"), | |
| } | |
| # Extract volatility (with temporal data) | |
| # New structure: {fred: {vix: {...}}, yahoo_finance: {beta: {...}}} | |
| vol = metrics.get("volatility", {}) | |
| if not vol or "error" in vol: | |
| vol = data.get("multi_source", {}).get("volatility_all", {}) | |
| if vol and "error" not in vol: | |
| # Yahoo Finance data (beta, historical volatility) | |
| yf_vol_source = vol.get("yahoo_finance", {}) | |
| yf_vol = yf_vol_source.get("data", yf_vol_source) if "data" in yf_vol_source else yf_vol_source | |
| # FRED data (VIX) | |
| fred_source = vol.get("fred", {}) | |
| fred_vol = fred_source.get("data", fred_source) if "data" in fred_source else fred_source | |
| extracted["volatility"] = { | |
| "beta": _extract_valuation_metric(yf_vol.get("beta", {})), | |
| "vix": _extract_valuation_metric(fred_vol.get("vix", {})), | |
| "historical_volatility": _extract_valuation_metric(yf_vol.get("historical_volatility", {})), | |
| } | |
| # Extract macro (with temporal data) | |
| # New structure: {bea: {gdp_growth: {...}}, bls: {unemployment_rate: {...}}, fred: {fed_funds_rate: {...}}} | |
| macro = metrics.get("macro", {}) | |
| if not macro or "error" in macro: | |
| macro = data.get("multi_source", {}).get("macro_all", {}) | |
| if macro and "error" not in macro: | |
| # BEA data (GDP) | |
| bea_source = macro.get("bea", {}) | |
| bea = bea_source.get("data", bea_source) if "data" in bea_source else bea_source | |
| # BLS data (unemployment, CPI) | |
| bls_source = macro.get("bls", {}) | |
| bls = bls_source.get("data", bls_source) if "data" in bls_source else bls_source | |
| # FRED data (interest rates) | |
| fred_source = macro.get("fred", {}) | |
| fred = fred_source.get("data", fred_source) if "data" in fred_source else fred_source | |
| extracted["macro"] = { | |
| "gdp_growth": _extract_valuation_metric(bea.get("gdp_growth", {})), | |
| "interest_rate": _extract_valuation_metric(fred.get("interest_rate", {})), | |
| "inflation": _extract_valuation_metric(bls.get("cpi_inflation", {})), | |
| "unemployment": _extract_valuation_metric(bls.get("unemployment", {})), | |
| } | |
| # Extract news with VADER sentiment | |
| # New format: {tavily: [...], nyt: [...], newsapi: [...]} | |
| news = metrics.get("news", {}) | |
| if news and "error" not in news: | |
| all_articles = [] | |
| for source in ["tavily", "nyt", "newsapi"]: | |
| all_articles.extend(news.get(source, [])) | |
| headlines = [a.get("title", "") for a in all_articles if a.get("title")] | |
| # Compute VADER sentiment on headlines | |
| vader_news = _compute_vader_sentiment(headlines) | |
| extracted["news"] = { | |
| "article_count": len(all_articles), | |
| "headlines": [a.get("title", "")[:100] for a in all_articles[:5]], | |
| "vader_sentiment": vader_news, | |
| } | |
| # Extract sentiment with VADER on reddit posts | |
| # New format: {finnhub: [...], reddit: [...]} | |
| sent = metrics.get("sentiment", {}) | |
| if sent and "error" not in sent: | |
| reddit_posts = sent.get("reddit", []) | |
| reddit_titles = [p.get("title", "") for p in reddit_posts if p.get("title")] | |
| # Compute VADER sentiment on reddit titles | |
| vader_reddit = _compute_vader_sentiment(reddit_titles) | |
| extracted["sentiment"] = { | |
| "finnhub_count": len(sent.get("finnhub", [])), | |
| "reddit_count": len(reddit_posts), | |
| "vader_reddit": vader_reddit, | |
| } | |
| return extracted | |
| def _format_metrics_for_prompt(extracted: dict, is_financial: bool = False) -> str: | |
| """Format extracted metrics into a clear text for the LLM. | |
| Args: | |
| extracted: Extracted metrics dictionary | |
| is_financial: If True, exclude EV/EBITDA from valuation metrics | |
| """ | |
| lines = [] | |
| lines.append(f"Company: {extracted['company']} ({extracted['ticker']})") | |
| lines.append("") | |
| # Financials (with temporal context) | |
| fin = extracted.get("fundamentals", {}) | |
| if fin: | |
| lines.append("=== FINANCIALS (from SEC EDGAR) ===") | |
| # Revenue with fiscal period | |
| revenue = fin.get("revenue", {}) | |
| if isinstance(revenue, dict) and revenue.get("value"): | |
| period = _get_fiscal_period_label(revenue) | |
| period_str = f" ({period})" if period else "" | |
| lines.append(f"- Revenue: ${revenue['value']:,.0f}{period_str}") | |
| elif isinstance(revenue, (int, float)): | |
| lines.append(f"- Revenue: ${revenue:,.0f}") | |
| cagr = fin.get("revenue_cagr_3yr") | |
| if cagr: | |
| if isinstance(cagr, dict) and cagr.get("value") is not None: | |
| lines.append(f"- Revenue CAGR (3yr): {cagr['value']:.1f}%") | |
| elif isinstance(cagr, (int, float)): | |
| lines.append(f"- Revenue CAGR (3yr): {cagr:.1f}%") | |
| # Net margin with fiscal period | |
| net_margin = fin.get("net_margin", {}) | |
| if isinstance(net_margin, dict) and net_margin.get("value") is not None: | |
| period = _get_fiscal_period_label(net_margin) | |
| period_str = f" ({period})" if period else "" | |
| lines.append(f"- Net Margin: {net_margin['value']:.1f}%{period_str}") | |
| elif isinstance(net_margin, (int, float)): | |
| lines.append(f"- Net Margin: {net_margin:.1f}%") | |
| # EPS with fiscal period | |
| eps = fin.get("eps", {}) | |
| if isinstance(eps, dict) and eps.get("value"): | |
| period = _get_fiscal_period_label(eps) | |
| period_str = f" ({period})" if period else "" | |
| lines.append(f"- EPS: ${eps['value']:.2f}{period_str}") | |
| elif isinstance(eps, (int, float)): | |
| lines.append(f"- EPS: ${eps:.2f}") | |
| # Debt/Equity with fiscal period | |
| d_to_e = fin.get("debt_to_equity", {}) | |
| if isinstance(d_to_e, dict) and d_to_e.get("value") is not None: | |
| period = _get_fiscal_period_label(d_to_e) | |
| period_str = f" ({period})" if period else "" | |
| lines.append(f"- Debt/Equity: {d_to_e['value']:.2f}{period_str}") | |
| elif isinstance(d_to_e, (int, float)): | |
| lines.append(f"- Debt/Equity: {d_to_e:.2f}") | |
| # Free Cash Flow with fiscal period | |
| fcf = fin.get("free_cash_flow", {}) | |
| if isinstance(fcf, dict) and fcf.get("value"): | |
| period = _get_fiscal_period_label(fcf) | |
| period_str = f" ({period})" if period else "" | |
| lines.append(f"- Free Cash Flow: ${fcf['value']:,.0f}{period_str}") | |
| elif isinstance(fcf, (int, float)): | |
| lines.append(f"- Free Cash Flow: ${fcf:,.0f}") | |
| lines.append("") | |
| # Helper to extract value from temporal dict or plain value | |
| def _get_val(d): | |
| if isinstance(d, dict): | |
| return d.get("value") | |
| return d | |
| # Valuation | |
| val = extracted.get("valuation", {}) | |
| if val: | |
| lines.append("=== VALUATION (from Yahoo Finance) ===") | |
| pe_t = _get_val(val.get("pe_trailing")) | |
| pe_f = _get_val(val.get("pe_forward")) | |
| pb = _get_val(val.get("pb_ratio")) | |
| ps = _get_val(val.get("ps_ratio")) | |
| ev = _get_val(val.get("ev_ebitda")) | |
| if pe_t: | |
| lines.append(f"- P/E Ratio (trailing): {pe_t:.1f}") | |
| if pe_f: | |
| lines.append(f"- P/E Ratio (forward): {pe_f:.1f}") | |
| if pb: | |
| lines.append(f"- P/B Ratio: {pb:.2f}") | |
| if ps: | |
| lines.append(f"- P/S Ratio: {ps:.2f}") | |
| if ev and not is_financial: | |
| lines.append(f"- EV/EBITDA: {ev:.1f}") | |
| if val.get("valuation_signal"): | |
| lines.append(f"- Overall Signal: {val['valuation_signal']}") | |
| lines.append("") | |
| # Volatility | |
| vol = extracted.get("volatility", {}) | |
| if vol: | |
| lines.append("=== VOLATILITY/RISK ===") | |
| beta = _get_val(vol.get("beta")) | |
| vix = _get_val(vol.get("vix")) | |
| hv = _get_val(vol.get("historical_volatility")) | |
| if beta: | |
| lines.append(f"- Beta: {beta:.2f}") | |
| if vix: | |
| lines.append(f"- VIX (market fear index): {vix:.1f}") | |
| if hv: | |
| lines.append(f"- Historical Volatility: {hv:.1f}%") | |
| lines.append("") | |
| # Macro | |
| macro = extracted.get("macro", {}) | |
| if macro: | |
| lines.append("=== MACROECONOMIC ENVIRONMENT (from FRED) ===") | |
| gdp = _get_val(macro.get("gdp_growth")) | |
| ir = _get_val(macro.get("interest_rate")) | |
| inf = _get_val(macro.get("inflation")) | |
| unemp = _get_val(macro.get("unemployment")) | |
| if gdp: | |
| lines.append(f"- GDP Growth: {gdp:.1f}%") | |
| if ir: | |
| lines.append(f"- Federal Funds Rate: {ir:.2f}%") | |
| if inf: | |
| lines.append(f"- Inflation (CPI): {inf:.1f}%") | |
| if unemp: | |
| lines.append(f"- Unemployment: {unemp:.1f}%") | |
| lines.append("") | |
| # News with VADER sentiment | |
| news = extracted.get("news", {}) | |
| if news: | |
| lines.append("=== RECENT NEWS ===") | |
| lines.append(f"- Articles found: {news.get('article_count', 0)}") | |
| # VADER sentiment scores for news | |
| vader_news = news.get("vader_sentiment") | |
| if vader_news: | |
| lines.append(f"- VADER Sentiment: {vader_news['avg_compound']:.2f} (range: {vader_news['min_compound']:.2f} to {vader_news['max_compound']:.2f})") | |
| lines.append(f" Breakdown: {vader_news['positive_count']} positive, {vader_news['negative_count']} negative, {vader_news['neutral_count']} neutral") | |
| for headline in news.get("headlines", []): | |
| lines.append(f" • {headline}") | |
| lines.append("") | |
| # Sentiment with VADER for reddit | |
| sent = extracted.get("sentiment", {}) | |
| if sent: | |
| lines.append("=== MARKET SENTIMENT ===") | |
| if sent.get("composite_score") is not None: | |
| lines.append(f"- Composite Score: {sent['composite_score']:.2f}") | |
| if sent.get("overall_category"): | |
| lines.append(f"- Overall: {sent['overall_category']}") | |
| # VADER sentiment scores for reddit | |
| vader_reddit = sent.get("vader_reddit") | |
| if vader_reddit: | |
| lines.append(f"- Reddit VADER: {vader_reddit['avg_compound']:.2f} (range: {vader_reddit['min_compound']:.2f} to {vader_reddit['max_compound']:.2f})") | |
| lines.append(f" Breakdown: {vader_reddit['positive_count']} positive, {vader_reddit['negative_count']} negative, {vader_reddit['neutral_count']} neutral") | |
| lines.append("") | |
| # Pre-built SWOT hints from MCP servers | |
| swot = extracted.get("aggregated_swot", {}) | |
| if any(swot.get(k) for k in ["strengths", "weaknesses", "opportunities", "threats"]): | |
| lines.append("=== DATA-DRIVEN SWOT SIGNALS (from metrics analysis) ===") | |
| for category in ["strengths", "weaknesses", "opportunities", "threats"]: | |
| items = swot.get(category, []) | |
| if items: | |
| lines.append(f"{category.upper()}:") | |
| for item in items: | |
| lines.append(f" • {item}") | |
| lines.append("") | |
| return "\n".join(lines) | |
| # ============================================================ | |
| # METRIC REFERENCE TABLE - For Hallucination Prevention (Layer 1) | |
| # ============================================================ | |
| import hashlib | |
| def _format_metric_for_reference(key: str, value, temporal_info: dict = None) -> tuple: | |
| """ | |
| Format a single metric for the reference table with exact as-of date. | |
| Returns: | |
| tuple: (formatted_string, as_of_date) | |
| """ | |
| if value is None: | |
| return None, None | |
| # Format value based on metric type | |
| if key in ("revenue", "net_income", "free_cash_flow", "market_cap", "enterprise_value", | |
| "total_assets", "total_liabilities", "stockholders_equity", "operating_cash_flow"): | |
| # Use human-readable format with B/M suffixes | |
| if abs(value) >= 1e9: | |
| formatted = f"${value/1e9:.1f}B" | |
| elif abs(value) >= 1e6: | |
| formatted = f"${value/1e6:.0f}M" | |
| else: | |
| formatted = f"${value:,.0f}" | |
| elif key in ("net_margin", "gross_margin", "operating_margin", "gdp_growth", | |
| "inflation", "unemployment", "historical_volatility", "revenue_cagr_3yr"): | |
| formatted = f"{value:.1f}%" | |
| elif key in ("interest_rate",): | |
| formatted = f"{value:.2f}%" | |
| elif key in ("pe_trailing", "pe_forward", "ps_ratio", "ev_ebitda", "vix"): | |
| formatted = f"{value:.1f}" | |
| elif key in ("pb_ratio", "debt_to_equity", "beta"): | |
| formatted = f"{value:.2f}" | |
| elif key in ("eps",): | |
| formatted = f"${value:.2f}" | |
| elif key in ("composite_score",): | |
| formatted = f"{value:.1f}" | |
| else: | |
| # Default formatting for unknown metrics | |
| if isinstance(value, float): | |
| formatted = f"{value:.2f}" | |
| else: | |
| formatted = str(value) | |
| # Extract actual date (not fiscal period label) | |
| as_of_date = None | |
| if temporal_info and isinstance(temporal_info, dict): | |
| as_of_date = temporal_info.get("end_date") # e.g., "2024-09-28" | |
| if as_of_date: | |
| formatted = f"{formatted} (as of {as_of_date})" | |
| return formatted, as_of_date | |
| def _generate_metric_reference_table(extracted: dict, is_financial: bool = False) -> tuple: | |
| """ | |
| Generate an immutable metric reference table for LLM grounding. | |
| Args: | |
| extracted: Extracted metrics dictionary from _extract_key_metrics() | |
| is_financial: If True, exclude EV/EBITDA | |
| Returns: | |
| tuple: (table_string, metric_lookup_dict) | |
| """ | |
| lines = [ | |
| "=" * 60, | |
| "METRIC REFERENCE TABLE - COPY VALUES EXACTLY AS SHOWN", | |
| "=" * 60, | |
| "", | |
| "CRITICAL INSTRUCTION:", | |
| "- Copy metric values EXACTLY as shown (including $, %, decimals)", | |
| "- Do NOT round, estimate, or approximate numbers", | |
| "- Do NOT invent metrics not listed below", | |
| "- Include the 'as of' date when citing temporal metrics", | |
| "", | |
| ] | |
| lookup = {} | |
| mid = 1 | |
| # Define categories and their metric keys | |
| categories = [ | |
| ("FUNDAMENTALS", "fundamentals", [ | |
| "revenue", "net_income", "net_margin", "gross_margin", "operating_margin", | |
| "eps", "debt_to_equity", "free_cash_flow", "revenue_cagr_3yr" | |
| ]), | |
| ("VALUATION", "valuation", [ | |
| "pe_trailing", "pe_forward", "pb_ratio", "ps_ratio", "ev_ebitda" | |
| ]), | |
| ("VOLATILITY", "volatility", [ | |
| "beta", "vix", "historical_volatility" | |
| ]), | |
| ("MACRO", "macro", [ | |
| "gdp_growth", "interest_rate", "inflation", "unemployment" | |
| ]), | |
| ] | |
| for label, cat_key, metric_keys in categories: | |
| data = extracted.get(cat_key, {}) | |
| if not data: | |
| continue | |
| category_lines = [] | |
| for metric_key in metric_keys: | |
| metric_val = data.get(metric_key) | |
| if metric_val is None: | |
| continue | |
| # Skip EV/EBITDA for financial institutions | |
| if is_financial and metric_key == "ev_ebitda": | |
| continue | |
| # Handle temporal metrics (dict with value and end_date) | |
| if isinstance(metric_val, dict) and metric_val.get("value") is not None: | |
| raw_value = metric_val["value"] | |
| formatted, as_of_date = _format_metric_for_reference( | |
| metric_key, raw_value, metric_val | |
| ) | |
| elif isinstance(metric_val, (int, float)): | |
| raw_value = metric_val | |
| formatted, as_of_date = _format_metric_for_reference(metric_key, raw_value) | |
| else: | |
| continue # Skip non-numeric | |
| if formatted: | |
| ref_id = f"M{mid:02d}" | |
| category_lines.append(f" {ref_id}: {metric_key} = {formatted}") | |
| lookup[ref_id] = { | |
| "key": metric_key, | |
| "raw_value": raw_value, | |
| "formatted": formatted, | |
| "as_of_date": as_of_date, | |
| "category": cat_key | |
| } | |
| mid += 1 | |
| if category_lines: | |
| lines.append(f"[{label}]") | |
| lines.extend(category_lines) | |
| lines.append("") | |
| # Add VADER sentiment metrics (news and reddit) | |
| sentiment_lines = [] | |
| # News VADER sentiment | |
| news_data = extracted.get("news", {}) | |
| if news_data.get("vader_sentiment"): | |
| vader = news_data["vader_sentiment"] | |
| ref_id = f"M{mid:02d}" | |
| formatted = f"{vader['avg_compound']:.2f}" | |
| sentiment_lines.append(f" {ref_id}: news_sentiment = {formatted} ({vader['total_count']} articles)") | |
| lookup[ref_id] = { | |
| "key": "news_sentiment", | |
| "raw_value": vader['avg_compound'], | |
| "formatted": formatted, | |
| "as_of_date": None, | |
| "category": "sentiment" | |
| } | |
| mid += 1 | |
| # Reddit VADER sentiment | |
| sent_data = extracted.get("sentiment", {}) | |
| if sent_data.get("vader_reddit"): | |
| vader = sent_data["vader_reddit"] | |
| ref_id = f"M{mid:02d}" | |
| formatted = f"{vader['avg_compound']:.2f}" | |
| sentiment_lines.append(f" {ref_id}: reddit_sentiment = {formatted} ({vader['total_count']} posts)") | |
| lookup[ref_id] = { | |
| "key": "reddit_sentiment", | |
| "raw_value": vader['avg_compound'], | |
| "formatted": formatted, | |
| "as_of_date": None, | |
| "category": "sentiment" | |
| } | |
| mid += 1 | |
| if sentiment_lines: | |
| lines.append("[SENTIMENT]") | |
| lines.extend(sentiment_lines) | |
| lines.append("") | |
| lines.append("=" * 60) | |
| lines.append("") | |
| return "\n".join(lines), lookup | |
| def _compute_reference_hash(metric_lookup: dict) -> str: | |
| """Compute SHA256 hash of metric lookup for integrity verification.""" | |
| # Sort keys for deterministic serialization | |
| serialized = json.dumps(metric_lookup, sort_keys=True, default=str) | |
| return hashlib.sha256(serialized.encode()).hexdigest() | |
| def _verify_reference_integrity(metric_lookup: dict, stored_hash: str) -> bool: | |
| """Verify metric lookup hasn't been corrupted.""" | |
| if not metric_lookup or not stored_hash: | |
| return False | |
| return _compute_reference_hash(metric_lookup) == stored_hash | |
| def _format_reference_log(metric_lookup: dict) -> str: | |
| """Format metric reference as compact single-line log for activity display.""" | |
| if not metric_lookup: | |
| return "No metrics extracted" | |
| parts = [] | |
| for ref_id in sorted(metric_lookup.keys()): | |
| entry = metric_lookup[ref_id] | |
| key = entry.get("key", "unknown") | |
| formatted = entry.get("formatted", "N/A") | |
| # Shorten large numbers for compact display | |
| if "$" in formatted and len(formatted) > 15: | |
| # Convert $394,328,000,000 to $394.3B | |
| raw = entry.get("raw_value", 0) | |
| if isinstance(raw, (int, float)) and abs(raw) >= 1e9: | |
| formatted = f"${raw/1e9:.1f}B" | |
| elif isinstance(raw, (int, float)) and abs(raw) >= 1e6: | |
| formatted = f"${raw/1e6:.0f}M" | |
| # Remove "as of" date for compact display | |
| if " (as of " in formatted: | |
| formatted = formatted.split(" (as of ")[0] | |
| parts.append(f"{key}={formatted}") | |
| return ", ".join(parts) | |
| def _format_metric_key(key: str) -> str: | |
| """Format metric key to human-readable name (e.g., pb_ratio -> P/B Ratio).""" | |
| METRIC_NAMES = { | |
| "revenue": "Revenue", "net_income": "Net Income", "net_margin": "Net Margin", | |
| "net_margin_pct": "Net Margin", "gross_margin": "Gross Margin", "operating_margin": "Operating Margin", | |
| "free_cash_flow": "Free Cash Flow", "operating_cash_flow": "Operating Cash Flow", | |
| "total_assets": "Total Assets", "total_liabilities": "Total Liabilities", | |
| "stockholders_equity": "Stockholders' Equity", "debt_to_equity": "Debt/Equity", | |
| "eps": "EPS", "market_cap": "Market Cap", "enterprise_value": "Enterprise Value", | |
| "trailing_pe": "P/E (Trailing)", "forward_pe": "P/E (Forward)", | |
| "pb_ratio": "P/B Ratio", "ps_ratio": "P/S Ratio", "trailing_peg": "PEG Ratio", | |
| "price_to_fcf": "Price/FCF", "ev_ebitda": "EV/EBITDA", "ev_revenue": "EV/Revenue", | |
| "vix": "VIX", "beta": "Beta", "historical_volatility": "Historical Volatility", | |
| "gdp_growth": "GDP Growth", "interest_rate": "Interest Rate", | |
| "cpi_inflation": "Inflation", "unemployment": "Unemployment", | |
| } | |
| return METRIC_NAMES.get(key, key.replace("_", " ").title()) | |
| def _generate_data_quality_notes(metric_reference: dict) -> dict: | |
| """ | |
| Generate deterministic data quality assessment from metric reference. | |
| Returns: | |
| { | |
| "high_confidence": ["Revenue", "Net Margin", ...], | |
| "gaps_or_stale": ["EPS (stale: 2024-06-30)", "Debt/Equity (missing)"], | |
| } | |
| """ | |
| from datetime import datetime, timedelta | |
| high_confidence = [] | |
| gaps_or_stale = [] | |
| threshold = timedelta(days=30) | |
| today = datetime.now() | |
| for ref_id, entry in metric_reference.items(): | |
| key = entry.get("key", "unknown") | |
| display_name = _format_metric_key(key) | |
| raw_value = entry.get("raw_value") | |
| as_of_date = entry.get("as_of_date") | |
| if raw_value is None: | |
| gaps_or_stale.append(f"{display_name} (missing)") | |
| elif as_of_date: | |
| try: | |
| date = datetime.strptime(as_of_date, "%Y-%m-%d") | |
| if today - date > threshold: | |
| gaps_or_stale.append(f"{display_name} (stale: {as_of_date})") | |
| else: | |
| high_confidence.append(display_name) | |
| except ValueError: | |
| high_confidence.append(display_name) | |
| else: | |
| high_confidence.append(display_name) | |
| return { | |
| "high_confidence": high_confidence, | |
| "gaps_or_stale": gaps_or_stale, | |
| } | |
| # Institutional-grade system prompt template for the analyzer LLM. | |
| # It is rendered with str.format(ev_ebitda_note=...) in _build_analyzer_prompt, | |
| # so {ev_ebitda_note} is the only placeholder; any literal brace added to this | |
| # text would have to be doubled ({{ }}) to survive formatting. | |
| ANALYZER_SYSTEM_PROMPT = """You are a senior financial analyst producing institutional-grade SWOT analyses. | |
| ## DATA GROUNDING RULES (CRITICAL) | |
| 1. USE ONLY the provided data. Never invent or assume metrics not given. | |
| 2. CITE specific numbers for every finding (e.g., "Net margin: 24.3%", "P/E: 21.3x"). | |
| 3. If data is missing, state "Insufficient data" - do NOT fabricate. | |
| 4. Distinguish trailing (historical) vs forward (projected) metrics. | |
| ## AVAILABLE DATA BASKETS | |
| ### Fundamentals (SEC EDGAR + Yahoo Finance) | |
| revenue, net_income, net_margin_pct, gross_margin_pct, operating_margin_pct, | |
| total_assets, total_liabilities, stockholders_equity, free_cash_flow, | |
| operating_cash_flow, long_term_debt, debt_to_equity, eps | |
| ### Valuation (Yahoo Finance) | |
| market_cap, enterprise_value, trailing_pe, forward_pe, pb_ratio, ps_ratio, | |
| trailing_peg, price_to_fcf, revenue_growth, earnings_growth | |
| {ev_ebitda_note} | |
| ### Volatility (FRED + Yahoo) | |
| vix, vxn, beta, historical_volatility, implied_volatility | |
| ### Macro (BEA/BLS/FRED) | |
| gdp_growth, interest_rate, cpi_inflation, unemployment | |
| ### News & Sentiment | |
| News articles with title, source, url | |
| Sentiment scores from Finnhub and Reddit | |
| ## WHAT YOU DO NOT DO | |
| - Provide buy/sell/hold recommendations | |
| - Compare to sector/peer benchmarks (data not provided) | |
| - Speculate beyond provided data | |
| - Use vague hedge words without quantification""" | |
| def _build_revision_prompt( | |
| critique_details: dict, | |
| company_data: str, | |
| current_draft: str, | |
| is_financial: bool, | |
| extracted: dict = None | |
| ) -> str: | |
| """Build revision prompt with conditional focus areas based on failed criteria. | |
| Args: | |
| critique_details: Structured dict from Critic with scores and feedback | |
| company_data: Formatted metrics string for reference | |
| current_draft: The current SWOT draft to be revised | |
| is_financial: Whether the company is a financial institution | |
| extracted: Extracted metrics dict for reference table generation | |
| Returns: | |
| Complete revision prompt string | |
| """ | |
| # Generate metric reference table for revision (same as initial mode) | |
| reference_table = "" | |
| if extracted: | |
| reference_table, _ = _generate_metric_reference_table(extracted, is_financial) | |
| scores = critique_details.get("scores", {}) | |
| # Determine which focus areas to include based on failed criteria | |
| focus_areas = [] | |
| if scores.get("evidence_grounding", 10) < 7: | |
| focus_areas.append(EVIDENCE_GROUNDING_BLOCK) | |
| if scores.get("constraint_compliance", 10) < 6: | |
| focus_areas.append(CONSTRAINT_COMPLIANCE_BLOCK) | |
| if scores.get("specificity_actionability", 10) < 7: | |
| focus_areas.append(SPECIFICITY_BLOCK) | |
| if scores.get("strategic_insight", 10) < 7: | |
| focus_areas.append(INSIGHT_BLOCK) | |
| if scores.get("completeness_balance", 10) < 7: | |
| focus_areas.append(COMPLETENESS_BLOCK) | |
| if scores.get("clarity_structure", 10) < 7: | |
| focus_areas.append(CLARITY_BLOCK) | |
| # Format critic feedback components | |
| deficiencies = critique_details.get("key_deficiencies", []) | |
| strengths = critique_details.get("strengths_to_preserve", []) | |
| feedback = critique_details.get("actionable_feedback", []) | |
| # Build deficiencies section | |
| deficiencies_text = "\n".join(f"- {d}" for d in deficiencies) if deficiencies else "- None specified" | |
| # Build strengths section | |
| strengths_text = "\n".join(f"- {s}" for s in strengths) if strengths else "- None specified" | |
| # Build feedback section | |
| feedback_text = "\n".join(f"{i+1}. {f}" for i, f in enumerate(feedback)) if feedback else "- None specified" | |
| # Build focus areas section | |
| focus_areas_text = "\n".join(focus_areas) if focus_areas else "Address all deficiencies listed above." | |
| # Add EV/EBITDA note for financial institutions | |
| ev_note = "" | |
| if is_financial: | |
| ev_note = "\n**Note:** This is a financial institution - EV/EBITDA is excluded from analysis." | |
| prompt = f"""{reference_table}## REVISION MODE ACTIVATED | |
| You previously generated a SWOT analysis that did not meet quality standards. You are now in revision mode. | |
| ### YOUR TASK | |
| 1. **Review the Critic's feedback** carefully | |
| 2. **Address each deficiency** listed in priority order | |
| 3. **Preserve strengths** explicitly called out — do not regress on what worked | |
| 4. **Regenerate the complete SWOT** — not a partial patch | |
| 5. **Use EXACT values from the METRIC REFERENCE TABLE above** — do not round or estimate | |
| ### CRITIC FEEDBACK | |
| Status: {critique_details.get('status', 'REJECTED')} | |
| Weighted Score: {critique_details.get('weighted_score', 0):.1f} / 10 | |
| **Key Deficiencies:** | |
| {deficiencies_text} | |
| **Strengths to Preserve:** | |
| {strengths_text} | |
| **Actionable Feedback:** | |
| {feedback_text} | |
| ### FOCUS AREAS FOR THIS REVISION | |
| {focus_areas_text} | |
| ### REVISION RULES | |
| **DO:** | |
| - Fix every item in "Key Deficiencies" — these are blocking issues | |
| - Apply each point in "Actionable Feedback" — these are specific instructions | |
| - Keep everything listed under "Strengths to Preserve" — do not modify these sections | |
| - **Use EXACT metric values from the METRIC REFERENCE TABLE** — copy numbers verbatim | |
| - **Include [M##] citation after every metric value** — e.g., "$394.3B [M01]" | |
| - Include the 'as of' date when citing temporal metrics | |
| {ev_note} | |
| **DO NOT:** | |
| - Ignore lower-priority feedback items — address all of them | |
| - Introduce new metrics not in the original input data | |
| - **Round, estimate, or approximate any numbers** — use exact values only | |
| - **Omit [M##] citations** — they are required for automatic verification | |
| - Remove content that was working well | |
| - Add defensive caveats or apologies about the revision | |
| - Reference the revision process in your output — produce a clean SWOT as if first attempt | |
| ### REFERENCE DATA | |
| {company_data} | |
| ### CURRENT DRAFT (to revise) | |
| {current_draft} | |
| ### OUTPUT INSTRUCTIONS | |
| Produce a complete, revised SWOT analysis with this exact structure (3-5 points per section): | |
| ## Strengths | |
| - [M01] Revenue: $394.3B - Strong market position with substantial scale | |
| - [M02] Net Margin: 24.3% - High profitability indicates pricing power | |
| ## Weaknesses | |
| - [M04] Debt/Equity: 1.87 - Elevated leverage increases financial risk | |
| ## Opportunities | |
| - [M12] GDP Growth: 4.3% - Favorable macro environment for expansion | |
| ## Threats | |
| - [M13] Interest Rate: 3.72% - Higher borrowing costs may impact margins | |
| CRITICAL REQUIREMENTS: | |
| 1. Each point MUST start with metric reference in brackets: [M##] | |
| 2. Format: [M##] Metric: Value - Strategic insight | |
| 3. Use EXACT values from the METRIC REFERENCE TABLE - do NOT round | |
| 4. Keep insights concise (one sentence) | |
| 5. Include 3-5 points per section | |
| Do not: | |
| - Include any preamble about revisions | |
| - Reference the Critic's feedback in your output | |
| Simply output the improved SWOT as a clean, final deliverable.""" | |
| return prompt | |
def _build_analyzer_prompt(company: str, ticker: str, formatted_data: str,
                           is_financial: bool, extracted: dict = None) -> tuple:
    """Assemble the initial-mode analyzer prompt and its grounding metadata.

    The prompt is the metric reference table (when ``extracted`` is given),
    followed by the system prompt, the company data and the output-format
    instructions.

    Args:
        company: Company name
        ticker: Stock ticker
        formatted_data: Formatted metrics text
        is_financial: If True, exclude EV/EBITDA
        extracted: Extracted metrics dict (for reference table generation)

    Returns:
        tuple: (prompt_string, metric_lookup_dict, reference_hash). Without
        ``extracted`` the lookup is ``{}`` and the hash is ``""``.
    """
    table_text, lookup, digest = "", {}, ""
    if extracted:
        table_text, lookup = _generate_metric_reference_table(extracted, is_financial)
        digest = _compute_reference_hash(lookup)

    # Text substituted for the {ev_ebitda_note} placeholder of the system prompt.
    ev_note = (
        "\nNote: EV/EBITDA excluded - not meaningful for financial institutions."
        if is_financial
        else ", ev_ebitda, ev_revenue"
    )
    system_text = ANALYZER_SYSTEM_PROMPT.format(ev_ebitda_note=ev_note)

    prompt = f"""{table_text}{system_text}
=== DATA FOR {company} ({ticker}) ===
{formatted_data}
=== OUTPUT FORMAT ===
Produce a SWOT analysis with this exact structure (3-5 points per section):
## Strengths
- [M01] Revenue: $394.3B - Strong market position with substantial scale
- [M02] Net Margin: 24.3% - High profitability indicates pricing power
## Weaknesses
- [M04] Debt/Equity: 1.87 - Elevated leverage increases financial risk
## Opportunities
- [M12] GDP Growth: 4.3% - Favorable macro environment for expansion
## Threats
- [M13] Interest Rate: 3.72% - Higher borrowing costs may impact margins
CRITICAL REQUIREMENTS:
1. Each point MUST start with metric reference in brackets: [M##]
2. Format: [M##] Metric: Value - Strategic insight
3. Use EXACT values from the METRIC REFERENCE TABLE - do NOT round
4. Keep insights concise (one sentence)
5. Include 3-5 points per section"""
    return prompt, lookup, digest
def _analyzer_progress_entry(progress_store, workflow_id):
    """Return the progress record for ``workflow_id``, or None if tracking is off or the id is unknown."""
    if not (workflow_id and progress_store):
        return None
    try:
        return progress_store[workflow_id]
    except KeyError:
        return None


def _analyzer_set_llm_status(progress_store, workflow_id, provider_name, status):
    """Set ``status`` for a provider that is already listed in the record's "llm_status" map."""
    entry = _analyzer_progress_entry(progress_store, workflow_id)
    if entry is None:
        return
    llm_status = entry.get("llm_status", {})
    if provider_name in llm_status:
        llm_status[provider_name] = status


def analyzer_node(state, workflow_id=None, progress_store=None):
    """Generate (or revise) the SWOT draft for the company described in ``state``.

    Initial mode builds the analyzer prompt, stores the metric reference and
    its hash in ``state`` and writes "data report + SWOT" to
    ``state["draft_report"]``. Revision mode is entered when
    ``state["critique_details"]`` has status "REJECTED"; it builds the
    revision prompt, waits 10 seconds before calling the LLM, and replaces
    the draft with the LLM response.

    On LLM failure, initial mode sets ``state["error"]`` (signalling the
    workflow to abort), while revision mode keeps the previous draft and
    sets ``state["analyzer_revision_skipped"] = True``.

    Args:
        state: Workflow state mapping; read and mutated in place.
        workflow_id: Progress-tracking id; taken from ``state`` when None.
        progress_store: Mapping of workflow id to progress record; taken from
            ``state`` when None. Updates are skipped when the id has no record.

    Returns:
        The same ``state`` object, updated.
    """
    # Extract workflow_id and progress_store from state (graph invokes with state only)
    if workflow_id is None:
        workflow_id = state.get("workflow_id")
    if progress_store is None:
        progress_store = state.get("progress_store")

    def _update_progress(fields):
        entry = _analyzer_progress_entry(progress_store, workflow_id)
        if entry is not None:
            entry.update(fields)

    def _log(message):
        _add_activity_log(workflow_id, progress_store, "analyzer", message)

    _update_progress({
        "current_step": "analyzer",
        "revision_count": state.get("revision_count", 0),
        "score": state.get("score", 0)
    })

    # Use user-provided API keys if available
    user_keys = state.get("user_api_keys", {})
    llm = get_llm_client(user_keys) if user_keys else get_llm_client()

    raw = state["raw_data"]
    company = state["company_name"]
    ticker = state.get("ticker", "")

    # Extract company profile and detect financial institution
    company_profile = _extract_company_profile(raw)
    sector = company_profile.get("sector", "")
    industry = company_profile.get("industry", "")
    is_financial = _is_financial_institution(sector, industry, ticker)
    if is_financial:
        _log("Financial institution detected - excluding EV/EBITDA")

    # Extract and format metrics for better LLM understanding
    extracted = _extract_key_metrics(raw)
    formatted_data = _format_metrics_for_prompt(extracted, is_financial=is_financial)

    # Generate detailed data report (shown before SWOT)
    data_report = _generate_data_report(raw, is_financial=is_financial)

    # Detect revision mode: critique_details with REJECTED status
    # (revision_count may still be 0 on first revision loop).
    # A stored None is treated the same as a missing critique.
    critique_details = state.get("critique_details") or {}
    is_revision = critique_details.get("status") == "REJECTED"

    # Debug: Log critique details presence
    print(f"[DEBUG] Analyzer: critique_details={bool(critique_details)}, status={critique_details.get('status')}, is_revision={is_revision}")

    if is_revision:
        # REVISION MODE: Use enhanced revision prompt with Critic feedback
        current_revision = state.get("revision_count", 0) + 1
        _log(f"Revision #{current_revision} in progress...")
        prompt = _build_revision_prompt(
            critique_details=critique_details,
            company_data=formatted_data,
            current_draft=state.get("draft_report", ""),
            is_financial=is_financial,
            extracted=extracted
        )
        # Update progress with revision info
        _update_progress({
            "current_step": "analyzer",
            "revision_count": current_revision,
        })
    else:
        # INITIAL MODE: Use standard analyzer prompt
        current_revision = 0
        _log("Calling LLM to generate SWOT analysis...")
        prompt, metric_lookup, ref_hash = _build_analyzer_prompt(
            company, ticker, formatted_data, is_financial, extracted
        )
        # Store metric reference for validation (Layer 1 hallucination prevention)
        state["metric_reference"] = metric_lookup
        state["metric_reference_hash"] = ref_hash
        # Log reference values for manual verification
        _log(f"Reference values: {_format_reference_log(metric_lookup)}")

    # In revision mode, add delay before LLM call to avoid rate limits
    # (Critic just called LLM, so we need to wait)
    if is_revision:
        print("Waiting 10s before revision LLM call (rate limit buffer)...")
        time.sleep(10)

    start_time = time.time()
    response, provider, error, providers_failed = llm.query(prompt, temperature=0)
    elapsed = time.time() - start_time
    providers_failed = providers_failed or []

    # Log failed providers and update LLM status in real-time for frontend
    for pf in providers_failed:
        _log(f"LLM {pf['name']} failed: {pf['error']}")
        _analyzer_set_llm_status(progress_store, workflow_id, pf["name"], "failed")

    # Track failed providers in state for frontend (once, after the loop)
    if "llm_providers_failed" not in state:
        state["llm_providers_failed"] = []
    state["llm_providers_failed"].extend([pf["name"] for pf in providers_failed])

    # Update successful provider status
    if provider:
        _analyzer_set_llm_status(
            progress_store, workflow_id, provider.split(":")[0], "completed"
        )

    if error:
        if is_revision:
            # REVISION MODE ERROR: Graceful degradation - keep previous draft
            _log(f"Revision failed: {error}")
            if current_revision == 1:
                _log("Using initial draft (revision unavailable)")
            else:
                _log(f"Using revision #{current_revision - 1} draft (further revision unavailable)")
            # Don't set error - allow workflow to complete with previous draft
            state["analyzer_revision_skipped"] = True
            state["revision_count"] = current_revision
        else:
            # INITIAL MODE ERROR: Abort workflow
            state["draft_report"] = f"Error generating analysis: {error}"
            state["provider_used"] = None
            state["error"] = error  # Signal workflow to abort
            _log(f"LLM error: {error}")
            _log("Workflow aborted - all LLM providers unavailable")
    elif is_revision:
        # REVISION MODE SUCCESS: Update draft with revision
        state["draft_report"] = response
        state["provider_used"] = provider
        state["analyzer_revision_skipped"] = False
        state["revision_count"] = current_revision
        _log(f"Revision #{current_revision} completed via {provider} ({elapsed:.1f}s)")
    else:
        # INITIAL MODE SUCCESS: Combine data report with SWOT analysis
        swot_section = f"## SWOT Analysis\n\n{response}"
        state["draft_report"] = f"{data_report}\n{swot_section}"
        state["data_report"] = data_report  # Store separately for frontend flexibility
        state["provider_used"] = provider
        _log(f"SWOT generated via {provider} ({elapsed:.1f}s)")

    # Update progress with final revision count
    _update_progress({
        "revision_count": state.get("revision_count", 0),
        "score": state.get("score", 0)
    })
    return state