# fix: Align analyzer field names with MCP data structure (commit 2db41a2)
# Standard library
import json
import time
from typing import Any

# Third-party
from langsmith import traceable
# VADER Sentiment Analysis
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Local
from src.llm_client import get_llm_client
# Module-level singleton; populated on first use by _get_vader().
_vader_analyzer = None


def _get_vader():
    """Return the shared VADER analyzer, constructing it on first call."""
    global _vader_analyzer
    if _vader_analyzer is not None:
        return _vader_analyzer
    _vader_analyzer = SentimentIntensityAnalyzer()
    return _vader_analyzer
def _compute_vader_sentiment(texts: list) -> dict:
    """
    Score a batch of texts with VADER and summarize the compound scores.

    Args:
        texts: Strings to score (headlines, titles, etc.). Entries that are
            empty or not strings are skipped.

    Returns:
        A summary dict:
        {
            "avg_compound": 0.42,
            "min_compound": -0.31,
            "max_compound": 0.78,
            "positive_count": 3,
            "negative_count": 1,
            "neutral_count": 1,
            "total_count": 5
        }
        or None when no scorable text was supplied.
    """
    if not texts:
        return None
    analyzer = _get_vader()
    compounds = [
        analyzer.polarity_scores(text)["compound"]
        for text in texts
        if text and isinstance(text, str)
    ]
    if not compounds:
        return None
    # Standard VADER thresholds: > 0.05 positive, < -0.05 negative,
    # everything in between counts as neutral.
    positive = sum(1 for c in compounds if c > 0.05)
    negative = sum(1 for c in compounds if c < -0.05)
    return {
        "avg_compound": round(sum(compounds) / len(compounds), 3),
        "min_compound": round(min(compounds), 3),
        "max_compound": round(max(compounds), 3),
        "positive_count": positive,
        "negative_count": negative,
        "neutral_count": len(compounds) - positive - negative,
        "total_count": len(compounds),
    }
# Financial institution detection for EV/EBITDA exclusion
# (debt is part of a financial firm's operating model, so EV/EBITDA is
# suppressed for companies matched by any of the lookups below).

# Case-insensitive substring matches against a company's sector name.
FINANCIAL_SECTORS = {
    "financial services", "financial", "banking", "banks",
    "insurance", "real estate investment trust", "reit",
    "investment management", "capital markets", "diversified financial services",
    "consumer finance", "asset management", "mortgage finance",
}

# Case-insensitive substring matches against a company's industry name.
FINANCIAL_INDUSTRIES = {
    "banks", "regional banks", "diversified banks", "money center banks",
    "insurance", "life insurance", "property insurance", "reinsurance",
    "real estate", "reit", "mortgage reits", "equity reits",
    "asset management", "investment banking", "capital markets",
    "consumer finance", "specialty finance",
}

# Fallback: known financial tickers when sector data unavailable
# (exact, upper-cased ticker match).
FINANCIAL_TICKERS = {
    "JPM", "BAC", "WFC", "GS", "MS", "C", "USB", "PNC", "TFC", "COF",
    "AXP", "BLK", "SCHW", "CME", "ICE", "SPGI", "MCO",
    "BRK.A", "BRK.B", "MET", "PRU", "AIG", "ALL", "TRV", "PGR", "CB",
    "AMT", "PLD", "CCI", "EQIX", "PSA", "O", "WELL", "AVB", "EQR",
}
# =============================================================================
# REVISION MODE: Conditional Focus Area Blocks
# These are included in revision prompts based on which rubric criteria failed
# =============================================================================

# Included when the "evidence grounding" rubric criterion failed.
EVIDENCE_GROUNDING_BLOCK = """
**EVIDENCE GROUNDING (Critical)**
- Every claim must cite a specific metric from the input data
- Use exact field names: `revenue`, `net_margin_pct`, `trailing_pe`, etc.
- Format citations as: "[Metric]: [Value] ([Source], [Period])"
- If a metric was flagged as fabricated, remove it entirely or replace with actual data
"""

# Included when the "constraint compliance" rubric criterion failed.
CONSTRAINT_COMPLIANCE_BLOCK = """
**CONSTRAINT COMPLIANCE (Critical)**
- Remove any language that sounds like investment advice
- Check all temporal labels — TTM vs FY vs Q must match the source
- Add confidence levels to key conclusions: (High/Medium/Low)
- Do not use EV/EBITDA for financial institutions
- For missing data, state "DATA NOT PROVIDED" — do not estimate
"""

# Included when the "specificity" rubric criterion failed.
SPECIFICITY_BLOCK = """
**SPECIFICITY & ACTIONABILITY**
- Replace generic statements with company-specific observations
- Quantify every claim possible: not "strong margins" but "31.0% operating margin"
- Remove business clichés: "leveraging," "best-in-class," "synergies"
"""

# Included when the "strategic insight" rubric criterion failed.
INSIGHT_BLOCK = """
**STRATEGIC INSIGHT**
- Connect observations across data baskets (e.g., link margin trends to macro rates)
- Go beyond restating metrics — explain WHY they matter
- Identify non-obvious relationships in the data
"""

# Included when the "completeness" rubric criterion failed.
COMPLETENESS_BLOCK = """
**COMPLETENESS & BALANCE**
- Ensure ALL required sections are present (Strengths, Weaknesses, Opportunities, Threats, Data Quality Notes)
- Balance quadrants — no section should be filler or disproportionately thin
"""

# Included when the "clarity" rubric criterion failed.
CLARITY_BLOCK = """
**CLARITY & STRUCTURE**
- Use consistent formatting throughout
- Ensure no contradictions across sections
- Make output scannable — executives should grasp key points in 30 seconds
"""
def _is_financial_institution(sector: str, industry: str, ticker: str) -> bool:
    """Return True when the company looks like a financial institution.

    Used to suppress EV/EBITDA, which is not meaningful for banks,
    insurers, and REITs. Checks, in order: sector substring match,
    industry substring match, then a known-ticker fallback.
    """
    sector_norm = (sector or "").lower().strip()
    industry_norm = (industry or "").lower().strip()
    return (
        any(term in sector_norm for term in FINANCIAL_SECTORS)
        or any(term in industry_norm for term in FINANCIAL_INDUSTRIES)
        or bool(ticker and ticker.upper() in FINANCIAL_TICKERS)
    )
def _extract_company_profile(raw_data: str) -> dict:
"""Extract company profile details from SEC EDGAR and Yahoo Finance data."""
try:
data = json.loads(raw_data)
except json.JSONDecodeError:
return {}
multi_source = data.get("multi_source", {})
profile = {}
# Try SEC EDGAR for business address (most authoritative)
# Handle both old format (with "data" wrapper) and new flat format
fin_all = multi_source.get("fundamentals_all", {})
sec_source = fin_all.get("sec_edgar", {})
# Check if old format with "data" wrapper or new flat format
sec_data = sec_source.get("data", sec_source) if "data" in sec_source else sec_source
sec_profile = sec_data.get("company_info", {}) or sec_data.get("profile", {})
if sec_profile:
# SEC EDGAR company info
city = sec_profile.get("city", "")
state = sec_profile.get("state", sec_profile.get("stateOrCountry", ""))
if city and state:
profile["business_address"] = f"{city}, {state}"
profile["cik"] = sec_profile.get("cik", "")
profile["sic"] = sec_profile.get("sic", "")
profile["sic_description"] = sec_profile.get("sicDescription", "")
# Try Yahoo Finance for sector/industry and other details
yf_val_source = multi_source.get("valuation_all", {}).get("yahoo_finance", {})
yf_val = yf_val_source.get("data", yf_val_source) if "data" in yf_val_source else yf_val_source
yf_profile = yf_val.get("profile", {})
if not yf_profile:
# Try fundamentals yahoo_finance
yf_fund_source = fin_all.get("yahoo_finance", {})
yf_fund = yf_fund_source.get("data", yf_fund_source) if "data" in yf_fund_source else yf_fund_source
yf_profile = yf_fund.get("profile", {})
if yf_profile:
profile["sector"] = yf_profile.get("sector", "")
profile["industry"] = yf_profile.get("industry", "")
profile["employees"] = yf_profile.get("fullTimeEmployees", "")
profile["website"] = yf_profile.get("website", "")
# Yahoo Finance may also have address
if not profile.get("business_address"):
city = yf_profile.get("city", "")
state = yf_profile.get("state", "")
country = yf_profile.get("country", "")
if city:
addr_parts = [city]
if state:
addr_parts.append(state)
if country and country != "United States":
addr_parts.append(country)
profile["business_address"] = ", ".join(addr_parts)
return profile
def _add_activity_log(workflow_id, progress_store, step, message):
"""Helper to add activity log entry."""
if workflow_id and progress_store:
from src.services.workflow_store import add_activity_log
add_activity_log(workflow_id, step, message)
def _extract_temporal_metric(metric_data: dict) -> dict:
"""Extract metric value with temporal metadata (fiscal year, period end, form type)."""
if not isinstance(metric_data, dict):
return {"value": metric_data}
return {
"value": metric_data.get("value"),
"end_date": metric_data.get("end_date"),
"fiscal_year": metric_data.get("fiscal_year"),
"form": metric_data.get("form"), # "10-K" (annual) or "10-Q" (quarterly)
}
def _extract_valuation_metric(metric_data: dict) -> dict:
"""Extract valuation metric with as_of date (new MCP structure)."""
if not isinstance(metric_data, dict):
return {"value": metric_data}
return {
"value": metric_data.get("value"),
"end_date": metric_data.get("as_of"), # MCP uses "as_of" for valuation
}
def _get_fiscal_period_label(metric: dict) -> str:
"""Format fiscal period label from temporal data (e.g., 'FY 2023' or 'Q3 2024')."""
if not isinstance(metric, dict):
return ""
form = metric.get("form", "")
fy = metric.get("fiscal_year")
end_date = metric.get("end_date")
if not fy:
return ""
if form == "10-K":
return f"FY {fy}"
elif form == "10-Q" and end_date:
try:
# Parse quarter from end date
month = int(end_date.split("-")[1])
quarter = (month - 1) // 3 + 1
return f"Q{quarter} {fy}"
except (ValueError, IndexError):
return f"FY {fy}"
return f"FY {fy}"
def _format_currency(value):
"""Format large numbers as currency (B/M)."""
if value is None:
return "N/A"
if isinstance(value, dict):
value = value.get("value")
if value is None:
return "N/A"
if isinstance(value, (int, float)):
if abs(value) >= 1e12:
return f"${value/1e12:.2f}T"
if abs(value) >= 1e9:
return f"${value/1e9:.2f}B"
if abs(value) >= 1e6:
return f"${value/1e6:.0f}M"
return f"${value:,.0f}"
return str(value)
def _format_number(value, suffix="", decimals=2):
"""Format a number with optional suffix."""
if value is None:
return "N/A"
if isinstance(value, dict):
value = value.get("value")
if value is None:
return "N/A"
if isinstance(value, (int, float)):
return f"{value:.{decimals}f}{suffix}"
return str(value)
def _get_period_label(metric_data: dict) -> str:
"""Get period label from metric data (e.g., 'FY 2024', 'Q3 2024', '2024-11')."""
if not isinstance(metric_data, dict):
return ""
# Check for fiscal year/form info
fy = metric_data.get("fiscal_year")
form = metric_data.get("form", "")
end_date = metric_data.get("end_date", "")
date = metric_data.get("date", "")
if fy:
if form == "10-K":
return f"FY {fy}"
elif form == "10-Q" and end_date:
try:
month = int(end_date.split("-")[1])
quarter = (month - 1) // 3 + 1
return f"Q{quarter} {fy}"
except:
return f"FY {fy}"
return f"FY {fy}"
# Fallback to date
if end_date:
return end_date[:10]
if date:
return str(date)[:10]
return ""
def _get_value(metric_data) -> any:
"""Extract value from metric data (handles both dict and plain values)."""
if isinstance(metric_data, dict):
return metric_data.get("value")
return metric_data
def _generate_data_report(raw_data: str, is_financial: bool = False) -> str:
    """Generate complete multi-source data report with simple tables.

    Builds a Markdown document with Financials, Valuation, Volatility,
    Macro, News, and Sentiment sections; each section is emitted only when
    its source data is present.

    Args:
        raw_data: JSON string of research data
        is_financial: If True, exclude EV/EBITDA for financial institutions
    Returns:
        Markdown report string, or an error string if raw_data is not JSON.
    """
    try:
        data = json.loads(raw_data)
    except json.JSONDecodeError:
        return "Error: Could not parse data"
    lines = []
    company = data.get("company_name", "Unknown")
    ticker = data.get("ticker", "N/A")
    multi_source = data.get("multi_source", {})
    metrics = data.get("metrics", {})
    lines.append(f"# Data Report: {company} ({ticker})")
    lines.append("")
    # ========== FINANCIALS ==========
    fin_all = multi_source.get("fundamentals_all", {})
    # Handle both old format (with "data" wrapper) and new flat format
    sec_source = fin_all.get("sec_edgar", {})
    sec_data = sec_source.get("data", sec_source) if "data" in sec_source else sec_source
    yf_source = fin_all.get("yahoo_finance", {})
    yf_data = yf_source.get("data", yf_source) if "data" in yf_source else yf_source
    if sec_data or yf_data:
        lines.append("## Financials")
        lines.append("Primary: SEC EDGAR | Secondary: Yahoo Finance")
        lines.append("")
        lines.append("| Metric | Period | SEC EDGAR | Yahoo Finance |")
        lines.append("|--------|--------|-----------|---------------|")
        # (display name, payload key, formatter) triples for each row.
        fin_metrics = [
            ("Revenue", "revenue", _format_currency),
            ("Net Income", "net_income", _format_currency),
            ("Gross Profit", "gross_profit", _format_currency),
            ("Operating Income", "operating_income", _format_currency),
            ("Gross Margin %", "gross_margin_pct", lambda v: _format_number(v, "%")),
            ("Operating Margin %", "operating_margin_pct", lambda v: _format_number(v, "%")),
            ("Net Margin %", "net_margin_pct", lambda v: _format_number(v, "%")),
            ("Free Cash Flow", "free_cash_flow", _format_currency),
            ("Operating Cash Flow", "operating_cash_flow", _format_currency),
            ("Total Assets", "total_assets", _format_currency),
            ("Total Liabilities", "total_liabilities", _format_currency),
            ("Stockholders Equity", "stockholders_equity", _format_currency),
            ("Cash", "cash", _format_currency),
            ("Long-term Debt", "long_term_debt", _format_currency),
            ("Net Debt", "net_debt", _format_currency),
            ("R&D Expense", "rd_expense", _format_currency),
        ]
        for name, key, fmt in fin_metrics:
            sec_val = sec_data.get(key)
            yf_val = yf_data.get(key)
            # Prefer SEC's period label; fall back to Yahoo's.
            period = _get_period_label(sec_val) or _get_period_label(yf_val)
            sec_str = fmt(_get_value(sec_val)) if sec_val else "N/A"
            yf_str = fmt(_get_value(yf_val)) if yf_val else "N/A"
            # Skip rows where neither source has data.
            if sec_str != "N/A" or yf_str != "N/A":
                lines.append(f"| {name} | {period} | {sec_str} | {yf_str} |")
        lines.append("")
    # ========== VALUATION ==========
    val_all = multi_source.get("valuation_all", {})
    yf_val_src = val_all.get("yahoo_finance", {})
    yf_val = yf_val_src.get("data", yf_val_src) if "data" in yf_val_src else yf_val_src
    av_val_src = val_all.get("alpha_vantage", {})
    av_val = av_val_src.get("data", av_val_src) if "data" in av_val_src else av_val_src
    if yf_val or av_val:
        lines.append("## Valuation")
        lines.append("Primary: Yahoo Finance | Secondary: Alpha Vantage")
        lines.append("")
        lines.append("| Metric | Yahoo Finance | Alpha Vantage |")
        lines.append("|--------|---------------|---------------|")
        val_metrics = [
            ("Market Cap", "market_cap", _format_currency),
            ("Enterprise Value", "enterprise_value", _format_currency),
            ("P/E Trailing", "trailing_pe", lambda v: _format_number(v, "x")),
            ("P/E Forward", "forward_pe", lambda v: _format_number(v, "x")),
            ("P/B Ratio", "pb_ratio", lambda v: _format_number(v, "x")),
            ("P/S Ratio", "ps_ratio", lambda v: _format_number(v, "x")),
            ("PEG Ratio", "trailing_peg", lambda v: _format_number(v, "x")),
            ("Price/FCF", "price_to_fcf", lambda v: _format_number(v, "x")),
            # Growth rates may arrive as fractions (<10 in magnitude) or
            # already-scaled percentages; scale only the former.
            ("Revenue Growth", "revenue_growth", lambda v: _format_number(v * 100 if v and abs(v) < 10 else v, "%") if v else "N/A"),
            ("Earnings Growth", "earnings_growth", lambda v: _format_number(v * 100 if v and abs(v) < 10 else v, "%") if v else "N/A"),
        ]
        # Only include EV/EBITDA for non-financial companies
        if not is_financial:
            val_metrics.insert(6, ("EV/EBITDA", "ev_ebitda", lambda v: _format_number(v, "x")))
            val_metrics.insert(7, ("EV/Revenue", "ev_revenue", lambda v: _format_number(v, "x")))
        for name, key, fmt in val_metrics:
            y = yf_val.get(key)
            a = av_val.get(key)
            ys = fmt(_get_value(y)) if y is not None else "N/A"
            avs = fmt(_get_value(a)) if a is not None else "N/A"
            if ys != "N/A" or avs != "N/A":
                lines.append(f"| {name} | {ys} | {avs} |")
        lines.append("")
    # ========== VOLATILITY ==========
    vol_all = multi_source.get("volatility_all", {})
    if vol_all:
        lines.append("## Volatility")
        lines.append("Primary: FRED + Yahoo | Secondary: Alpha Vantage")
        lines.append("")
        lines.append("| Metric | Date | Primary | Secondary |")
        lines.append("|--------|------|---------|-----------|")
        ctx = vol_all.get("market_volatility_context", {})
        vix = ctx.get("vix", {})
        vxn = ctx.get("vxn", {})
        yf_vol_src = vol_all.get("yahoo_finance", {})
        yf_vol = yf_vol_src.get("data", yf_vol_src) if "data" in yf_vol_src else yf_vol_src
        av_vol_src = vol_all.get("alpha_vantage", {})
        av_vol = av_vol_src.get("data", av_vol_src) if "data" in av_vol_src else av_vol_src
        # VIX
        if vix.get("value"):
            lines.append(f"| VIX | {vix.get('date', '')} | {_format_number(vix.get('value'))} | - |")
        # VXN
        if vxn.get("value"):
            lines.append(f"| VXN | {vxn.get('date', '')} | {_format_number(vxn.get('value'))} | - |")
        # Beta
        beta_yf = _get_value(yf_vol.get("beta"))
        beta_av = _get_value(av_vol.get("beta")) if av_vol else None
        if beta_yf or beta_av:
            lines.append(f"| Beta | - | {_format_number(beta_yf, '', 3)} | {_format_number(beta_av, '', 3) if beta_av else 'N/A'} |")
        # Historical Volatility
        hv_yf = _get_value(yf_vol.get("historical_volatility"))
        hv_av = _get_value(av_vol.get("historical_volatility")) if av_vol else None
        if hv_yf or hv_av:
            lines.append(f"| Historical Volatility | - | {_format_number(hv_yf, '%')} | {_format_number(hv_av, '%') if hv_av else 'N/A'} |")
        # Implied Volatility
        iv_yf = _get_value(yf_vol.get("implied_volatility"))
        if iv_yf:
            lines.append(f"| Implied Volatility | - | {_format_number(iv_yf, '%')} | N/A |")
        lines.append("")
    # ========== MACRO ==========
    macro_all = multi_source.get("macro_all", {})
    if macro_all:
        lines.append("## Macro Indicators")
        lines.append("Primary: BEA/BLS | Secondary: FRED")
        lines.append("")
        lines.append("| Metric | Period | BEA/BLS | FRED |")
        lines.append("|--------|--------|---------|------|")
        bea_src = macro_all.get("bea_bls", {})
        bea_bls = bea_src.get("data", bea_src) if "data" in bea_src else bea_src
        fred_src = macro_all.get("fred", {})
        fred = fred_src.get("data", fred_src) if "data" in fred_src else fred_src
        # GDP Growth
        gdp_p = bea_bls.get("gdp_growth", {}) or {}
        gdp_f = fred.get("gdp_growth", {}) or {}
        gdp_date = gdp_p.get("date", "") or gdp_f.get("date", "")
        lines.append(f"| GDP Growth | {gdp_date} | {_format_number(gdp_p.get('value'), '%')} | {_format_number(gdp_f.get('value'), '%')} |")
        # CPI/Inflation
        cpi_p = bea_bls.get("cpi_inflation", {}) or {}
        cpi_f = fred.get("cpi_inflation", {}) or {}
        cpi_date = cpi_p.get("date", "") or cpi_f.get("date", "")
        lines.append(f"| Inflation (CPI YoY) | {cpi_date} | {_format_number(cpi_p.get('value'), '%')} | {_format_number(cpi_f.get('value'), '%')} |")
        # Unemployment
        unemp_p = bea_bls.get("unemployment", {}) or {}
        unemp_f = fred.get("unemployment", {}) or {}
        unemp_date = unemp_p.get("date", "") or unemp_f.get("date", "")
        lines.append(f"| Unemployment | {unemp_date} | {_format_number(unemp_p.get('value'), '%')} | {_format_number(unemp_f.get('value'), '%')} |")
        # Fed Funds Rate (FRED only)
        rates = fred.get("interest_rate", {}) or {}
        lines.append(f"| Fed Funds Rate | {rates.get('date', '')} | - | {_format_number(rates.get('value'), '%')} |")
        lines.append("")
    # ========== NEWS ==========
    news = metrics.get("news", {})
    if news:
        # New format: {tavily: [...], nyt: [...], newsapi: [...]}
        all_articles = []
        for source in ["tavily", "nyt", "newsapi"]:
            for article in news.get(source, []):
                # Tag each article with its originating source.
                all_articles.append({**article, "source": source})
        if all_articles:
            lines.append("## News Articles")
            lines.append("")
            lines.append("| # | Title | Source | URL |")
            lines.append("|---|-------|--------|-----|")
            # Cap the table at the first 10 articles.
            for i, article in enumerate(all_articles[:10], 1):
                title = article.get("title", "Untitled")
                source = article.get("source", "Unknown")
                url = article.get("url", "")
                lines.append(f"| {i} | {title} | {source} | {url} |")
            lines.append("")
    # ========== SENTIMENT ==========
    sentiment = metrics.get("sentiment", {})
    if sentiment:
        # New format: {finnhub: [...], reddit: [...]}
        finnhub_articles = sentiment.get("finnhub", [])
        reddit_posts = sentiment.get("reddit", [])
        lines.append("## Sentiment Analysis")
        lines.append("")
        lines.append("| Source | Items |")
        lines.append("|--------|-------|")
        lines.append(f"| Finnhub | {len(finnhub_articles)} articles |")
        lines.append(f"| Reddit | {len(reddit_posts)} posts |")
        lines.append("")
        # Show Finnhub articles
        if finnhub_articles:
            lines.append("### Finnhub Articles")
            lines.append("")
            lines.append("| # | Title | URL |")
            lines.append("|---|-------|-----|")
            for i, article in enumerate(finnhub_articles[:10], 1):
                title = article.get("title", "Untitled")
                url = article.get("url", "")
                lines.append(f"| {i} | {title} | {url} |")
            lines.append("")
        # Show Reddit posts
        if reddit_posts:
            lines.append("### Reddit Posts")
            lines.append("")
            lines.append("| # | Title | URL |")
            lines.append("|---|-------|-----|")
            for i, post in enumerate(reddit_posts[:10], 1):
                title = post.get("title", "Untitled")
                url = post.get("url", "")
                lines.append(f"| {i} | {title} | {url} |")
            lines.append("")
    lines.append("---")
    lines.append("")
    return "\n".join(lines)
def _extract_key_metrics(raw_data: str) -> dict:
    """Extract and format key metrics from raw JSON data, preserving temporal info.

    Each basket (fundamentals, valuation, volatility, macro, news,
    sentiment) is read from data["metrics"] first and falls back to
    data["multi_source"] when missing or errored. Returns
    {"error": ...} when raw_data is not valid JSON.
    """
    try:
        data = json.loads(raw_data)
    except json.JSONDecodeError:
        return {"error": "Could not parse raw data"}
    metrics = data.get("metrics", {})
    # Extract company profile for business address
    company_profile = data.get("company_profile", {})
    extracted = {
        "company": data.get("company_name", "Unknown"),
        "ticker": data.get("ticker", "N/A"),
        "business_address": company_profile.get("business_address", ""),
        "fundamentals": {},
        "valuation": {},
        "volatility": {},
        "macro": {},
        "news": {},
        "sentiment": {},
        "aggregated_swot": data.get("aggregated_swot", {})
    }
    # Extract fundamentals with temporal data
    # Structure varies:
    # Formats supported:
    # - Old: {"sec_edgar": {"data": {...}}, "yahoo_finance": {"data": {...}}}
    # - New (flat): {"sec_edgar": {...}, "yahoo_finance": {...}}
    fin = metrics.get("fundamentals", {})
    if not fin or "error" in fin:
        fin = data.get("multi_source", {}).get("fundamentals_all", {})
    if fin and "error" not in fin:
        # Handle both old format (with "data" wrapper) and new flat format
        sec_source = fin.get("sec_edgar", {})
        sec_data = sec_source.get("data", sec_source) if "data" in sec_source else sec_source
        yf_source = fin.get("yahoo_finance", {})
        yf_data = yf_source.get("data", yf_source) if "data" in yf_source else yf_source
        # Merge with SEC as primary
        fin_data = {**yf_data, **sec_data}  # SEC overwrites YF where both exist
        extracted["fundamentals"] = {
            "revenue": _extract_temporal_metric(fin_data.get("revenue", {})),
            "revenue_cagr_3yr": fin_data.get("revenue_growth_3yr"),
            "net_margin": _extract_temporal_metric(fin_data.get("net_margin_pct", {})),
            "gross_margin": _extract_temporal_metric(fin_data.get("gross_margin_pct", {})),
            "operating_margin": _extract_temporal_metric(fin_data.get("operating_margin_pct", {})),
            "eps": _extract_temporal_metric(fin_data.get("eps", {})),
            "debt_to_equity": _extract_temporal_metric(fin_data.get("debt_to_equity", {})),
            "free_cash_flow": _extract_temporal_metric(fin_data.get("free_cash_flow", {})),
            "net_income": _extract_temporal_metric(fin_data.get("net_income", {})),
        }
    # Extract valuation (with temporal data)
    # Handle both old format (with "data" wrapper) and new flat format
    val = metrics.get("valuation", {})
    if not val or "error" in val:
        val = data.get("multi_source", {}).get("valuation_all", {})
    if val and "error" not in val:
        # New MCP structure: {yahoo_finance: {...}, alpha_vantage: {...}}
        # Check both sources - yahoo_finance is primary, alpha_vantage is fallback
        yf_val = val.get("yahoo_finance", {})
        av_val = val.get("alpha_vantage", {})
        extracted["valuation"] = {
            "pe_trailing": _extract_valuation_metric(yf_val.get("trailing_pe") or av_val.get("trailing_pe", {})),
            "pe_forward": _extract_valuation_metric(yf_val.get("forward_pe") or av_val.get("forward_pe", {})),
            "pb_ratio": _extract_valuation_metric(yf_val.get("pb_ratio") or av_val.get("pb_ratio", {})),
            "ps_ratio": _extract_valuation_metric(yf_val.get("ps_ratio") or av_val.get("ps_ratio", {})),
            # NOTE: alpha_vantage is consulted first for EV/EBITDA,
            # unlike the yahoo-first order of the other ratios.
            "ev_ebitda": _extract_valuation_metric(av_val.get("ev_ebitda") or yf_val.get("ev_ebitda", {})),
            "valuation_signal": val.get("overall_signal"),
        }
    # Extract volatility (with temporal data)
    # New structure: {fred: {vix: {...}}, yahoo_finance: {beta: {...}}}
    vol = metrics.get("volatility", {})
    if not vol or "error" in vol:
        vol = data.get("multi_source", {}).get("volatility_all", {})
    if vol and "error" not in vol:
        # Yahoo Finance data (beta, historical volatility)
        yf_vol_source = vol.get("yahoo_finance", {})
        yf_vol = yf_vol_source.get("data", yf_vol_source) if "data" in yf_vol_source else yf_vol_source
        # FRED data (VIX)
        fred_source = vol.get("fred", {})
        fred_vol = fred_source.get("data", fred_source) if "data" in fred_source else fred_source
        extracted["volatility"] = {
            "beta": _extract_valuation_metric(yf_vol.get("beta", {})),
            "vix": _extract_valuation_metric(fred_vol.get("vix", {})),
            "historical_volatility": _extract_valuation_metric(yf_vol.get("historical_volatility", {})),
        }
    # Extract macro (with temporal data)
    # New structure: {bea: {gdp_growth: {...}}, bls: {unemployment_rate: {...}}, fred: {fed_funds_rate: {...}}}
    macro = metrics.get("macro", {})
    if not macro or "error" in macro:
        macro = data.get("multi_source", {}).get("macro_all", {})
    if macro and "error" not in macro:
        # BEA data (GDP)
        bea_source = macro.get("bea", {})
        bea = bea_source.get("data", bea_source) if "data" in bea_source else bea_source
        # BLS data (unemployment, CPI)
        bls_source = macro.get("bls", {})
        bls = bls_source.get("data", bls_source) if "data" in bls_source else bls_source
        # FRED data (interest rates)
        fred_source = macro.get("fred", {})
        fred = fred_source.get("data", fred_source) if "data" in fred_source else fred_source
        extracted["macro"] = {
            "gdp_growth": _extract_valuation_metric(bea.get("gdp_growth", {})),
            "interest_rate": _extract_valuation_metric(fred.get("interest_rate", {})),
            "inflation": _extract_valuation_metric(bls.get("cpi_inflation", {})),
            "unemployment": _extract_valuation_metric(bls.get("unemployment", {})),
        }
    # Extract news with VADER sentiment
    # New format: {tavily: [...], nyt: [...], newsapi: [...]}
    news = metrics.get("news", {})
    if news and "error" not in news:
        all_articles = []
        for source in ["tavily", "nyt", "newsapi"]:
            all_articles.extend(news.get(source, []))
        headlines = [a.get("title", "") for a in all_articles if a.get("title")]
        # Compute VADER sentiment on headlines
        vader_news = _compute_vader_sentiment(headlines)
        extracted["news"] = {
            "article_count": len(all_articles),
            # Keep only the first 5 headlines, truncated to 100 chars each.
            "headlines": [a.get("title", "")[:100] for a in all_articles[:5]],
            "vader_sentiment": vader_news,
        }
    # Extract sentiment with VADER on reddit posts
    # New format: {finnhub: [...], reddit: [...]}
    sent = metrics.get("sentiment", {})
    if sent and "error" not in sent:
        reddit_posts = sent.get("reddit", [])
        reddit_titles = [p.get("title", "") for p in reddit_posts if p.get("title")]
        # Compute VADER sentiment on reddit titles
        vader_reddit = _compute_vader_sentiment(reddit_titles)
        extracted["sentiment"] = {
            "finnhub_count": len(sent.get("finnhub", [])),
            "reddit_count": len(reddit_posts),
            "vader_reddit": vader_reddit,
        }
    return extracted
def _format_metrics_for_prompt(extracted: dict, is_financial: bool = False) -> str:
    """Format extracted metrics into a clear text for the LLM.

    Renders the dict produced by _extract_key_metrics() as labeled plain-
    text sections (financials, valuation, volatility, macro, news,
    sentiment, SWOT signals), attaching fiscal-period labels where the
    temporal metadata allows.

    Args:
        extracted: Extracted metrics dictionary
        is_financial: If True, exclude EV/EBITDA from valuation metrics
    Returns:
        Newline-joined prompt text.
    """
    lines = []
    lines.append(f"Company: {extracted['company']} ({extracted['ticker']})")
    lines.append("")
    # Financials (with temporal context)
    fin = extracted.get("fundamentals", {})
    if fin:
        lines.append("=== FINANCIALS (from SEC EDGAR) ===")
        # Revenue with fiscal period
        revenue = fin.get("revenue", {})
        if isinstance(revenue, dict) and revenue.get("value"):
            period = _get_fiscal_period_label(revenue)
            period_str = f" ({period})" if period else ""
            lines.append(f"- Revenue: ${revenue['value']:,.0f}{period_str}")
        elif isinstance(revenue, (int, float)):
            # Legacy plain-number payload: no period info available.
            lines.append(f"- Revenue: ${revenue:,.0f}")
        cagr = fin.get("revenue_cagr_3yr")
        if cagr:
            if isinstance(cagr, dict) and cagr.get("value") is not None:
                lines.append(f"- Revenue CAGR (3yr): {cagr['value']:.1f}%")
            elif isinstance(cagr, (int, float)):
                lines.append(f"- Revenue CAGR (3yr): {cagr:.1f}%")
        # Net margin with fiscal period
        net_margin = fin.get("net_margin", {})
        if isinstance(net_margin, dict) and net_margin.get("value") is not None:
            period = _get_fiscal_period_label(net_margin)
            period_str = f" ({period})" if period else ""
            lines.append(f"- Net Margin: {net_margin['value']:.1f}%{period_str}")
        elif isinstance(net_margin, (int, float)):
            lines.append(f"- Net Margin: {net_margin:.1f}%")
        # EPS with fiscal period
        eps = fin.get("eps", {})
        if isinstance(eps, dict) and eps.get("value"):
            period = _get_fiscal_period_label(eps)
            period_str = f" ({period})" if period else ""
            lines.append(f"- EPS: ${eps['value']:.2f}{period_str}")
        elif isinstance(eps, (int, float)):
            lines.append(f"- EPS: ${eps:.2f}")
        # Debt/Equity with fiscal period
        d_to_e = fin.get("debt_to_equity", {})
        if isinstance(d_to_e, dict) and d_to_e.get("value") is not None:
            period = _get_fiscal_period_label(d_to_e)
            period_str = f" ({period})" if period else ""
            lines.append(f"- Debt/Equity: {d_to_e['value']:.2f}{period_str}")
        elif isinstance(d_to_e, (int, float)):
            lines.append(f"- Debt/Equity: {d_to_e:.2f}")
        # Free Cash Flow with fiscal period
        fcf = fin.get("free_cash_flow", {})
        if isinstance(fcf, dict) and fcf.get("value"):
            period = _get_fiscal_period_label(fcf)
            period_str = f" ({period})" if period else ""
            lines.append(f"- Free Cash Flow: ${fcf['value']:,.0f}{period_str}")
        elif isinstance(fcf, (int, float)):
            lines.append(f"- Free Cash Flow: ${fcf:,.0f}")
        lines.append("")
    # Helper to extract value from temporal dict or plain value
    def _get_val(d):
        if isinstance(d, dict):
            return d.get("value")
        return d
    # Valuation
    val = extracted.get("valuation", {})
    if val:
        lines.append("=== VALUATION (from Yahoo Finance) ===")
        pe_t = _get_val(val.get("pe_trailing"))
        pe_f = _get_val(val.get("pe_forward"))
        pb = _get_val(val.get("pb_ratio"))
        ps = _get_val(val.get("ps_ratio"))
        ev = _get_val(val.get("ev_ebitda"))
        if pe_t:
            lines.append(f"- P/E Ratio (trailing): {pe_t:.1f}")
        if pe_f:
            lines.append(f"- P/E Ratio (forward): {pe_f:.1f}")
        if pb:
            lines.append(f"- P/B Ratio: {pb:.2f}")
        if ps:
            lines.append(f"- P/S Ratio: {ps:.2f}")
        # EV/EBITDA suppressed for financial institutions.
        if ev and not is_financial:
            lines.append(f"- EV/EBITDA: {ev:.1f}")
        if val.get("valuation_signal"):
            lines.append(f"- Overall Signal: {val['valuation_signal']}")
        lines.append("")
    # Volatility
    vol = extracted.get("volatility", {})
    if vol:
        lines.append("=== VOLATILITY/RISK ===")
        beta = _get_val(vol.get("beta"))
        vix = _get_val(vol.get("vix"))
        hv = _get_val(vol.get("historical_volatility"))
        if beta:
            lines.append(f"- Beta: {beta:.2f}")
        if vix:
            lines.append(f"- VIX (market fear index): {vix:.1f}")
        if hv:
            lines.append(f"- Historical Volatility: {hv:.1f}%")
        lines.append("")
    # Macro
    macro = extracted.get("macro", {})
    if macro:
        lines.append("=== MACROECONOMIC ENVIRONMENT (from FRED) ===")
        gdp = _get_val(macro.get("gdp_growth"))
        ir = _get_val(macro.get("interest_rate"))
        inf = _get_val(macro.get("inflation"))
        unemp = _get_val(macro.get("unemployment"))
        if gdp:
            lines.append(f"- GDP Growth: {gdp:.1f}%")
        if ir:
            lines.append(f"- Federal Funds Rate: {ir:.2f}%")
        if inf:
            lines.append(f"- Inflation (CPI): {inf:.1f}%")
        if unemp:
            lines.append(f"- Unemployment: {unemp:.1f}%")
        lines.append("")
    # News with VADER sentiment
    news = extracted.get("news", {})
    if news:
        lines.append("=== RECENT NEWS ===")
        lines.append(f"- Articles found: {news.get('article_count', 0)}")
        # VADER sentiment scores for news
        vader_news = news.get("vader_sentiment")
        if vader_news:
            lines.append(f"- VADER Sentiment: {vader_news['avg_compound']:.2f} (range: {vader_news['min_compound']:.2f} to {vader_news['max_compound']:.2f})")
            lines.append(f"  Breakdown: {vader_news['positive_count']} positive, {vader_news['negative_count']} negative, {vader_news['neutral_count']} neutral")
        for headline in news.get("headlines", []):
            lines.append(f"  • {headline}")
        lines.append("")
    # Sentiment with VADER for reddit
    sent = extracted.get("sentiment", {})
    if sent:
        lines.append("=== MARKET SENTIMENT ===")
        # NOTE(review): composite_score / overall_category are not produced
        # by _extract_key_metrics in this file — presumably set by another
        # data path; confirm before removing.
        if sent.get("composite_score") is not None:
            lines.append(f"- Composite Score: {sent['composite_score']:.2f}")
        if sent.get("overall_category"):
            lines.append(f"- Overall: {sent['overall_category']}")
        # VADER sentiment scores for reddit
        vader_reddit = sent.get("vader_reddit")
        if vader_reddit:
            lines.append(f"- Reddit VADER: {vader_reddit['avg_compound']:.2f} (range: {vader_reddit['min_compound']:.2f} to {vader_reddit['max_compound']:.2f})")
            lines.append(f"  Breakdown: {vader_reddit['positive_count']} positive, {vader_reddit['negative_count']} negative, {vader_reddit['neutral_count']} neutral")
        lines.append("")
    # Pre-built SWOT hints from MCP servers
    swot = extracted.get("aggregated_swot", {})
    if any(swot.get(k) for k in ["strengths", "weaknesses", "opportunities", "threats"]):
        lines.append("=== DATA-DRIVEN SWOT SIGNALS (from metrics analysis) ===")
        for category in ["strengths", "weaknesses", "opportunities", "threats"]:
            items = swot.get(category, [])
            if items:
                lines.append(f"{category.upper()}:")
                for item in items:
                    lines.append(f"  • {item}")
        lines.append("")
    return "\n".join(lines)
# ============================================================
# METRIC REFERENCE TABLE - For Hallucination Prevention (Layer 1)
# ============================================================
import hashlib
def _format_metric_for_reference(key: str, value, temporal_info: dict = None) -> tuple:
"""
Format a single metric for the reference table with exact as-of date.
Returns:
tuple: (formatted_string, as_of_date)
"""
if value is None:
return None, None
# Format value based on metric type
if key in ("revenue", "net_income", "free_cash_flow", "market_cap", "enterprise_value",
"total_assets", "total_liabilities", "stockholders_equity", "operating_cash_flow"):
# Use human-readable format with B/M suffixes
if abs(value) >= 1e9:
formatted = f"${value/1e9:.1f}B"
elif abs(value) >= 1e6:
formatted = f"${value/1e6:.0f}M"
else:
formatted = f"${value:,.0f}"
elif key in ("net_margin", "gross_margin", "operating_margin", "gdp_growth",
"inflation", "unemployment", "historical_volatility", "revenue_cagr_3yr"):
formatted = f"{value:.1f}%"
elif key in ("interest_rate",):
formatted = f"{value:.2f}%"
elif key in ("pe_trailing", "pe_forward", "ps_ratio", "ev_ebitda", "vix"):
formatted = f"{value:.1f}"
elif key in ("pb_ratio", "debt_to_equity", "beta"):
formatted = f"{value:.2f}"
elif key in ("eps",):
formatted = f"${value:.2f}"
elif key in ("composite_score",):
formatted = f"{value:.1f}"
else:
# Default formatting for unknown metrics
if isinstance(value, float):
formatted = f"{value:.2f}"
else:
formatted = str(value)
# Extract actual date (not fiscal period label)
as_of_date = None
if temporal_info and isinstance(temporal_info, dict):
as_of_date = temporal_info.get("end_date") # e.g., "2024-09-28"
if as_of_date:
formatted = f"{formatted} (as of {as_of_date})"
return formatted, as_of_date
def _generate_metric_reference_table(extracted: dict, is_financial: bool = False) -> tuple:
"""
Generate an immutable metric reference table for LLM grounding.
Args:
extracted: Extracted metrics dictionary from _extract_key_metrics()
is_financial: If True, exclude EV/EBITDA
Returns:
tuple: (table_string, metric_lookup_dict)
"""
lines = [
"=" * 60,
"METRIC REFERENCE TABLE - COPY VALUES EXACTLY AS SHOWN",
"=" * 60,
"",
"CRITICAL INSTRUCTION:",
"- Copy metric values EXACTLY as shown (including $, %, decimals)",
"- Do NOT round, estimate, or approximate numbers",
"- Do NOT invent metrics not listed below",
"- Include the 'as of' date when citing temporal metrics",
"",
]
lookup = {}
mid = 1
# Define categories and their metric keys
categories = [
("FUNDAMENTALS", "fundamentals", [
"revenue", "net_income", "net_margin", "gross_margin", "operating_margin",
"eps", "debt_to_equity", "free_cash_flow", "revenue_cagr_3yr"
]),
("VALUATION", "valuation", [
"pe_trailing", "pe_forward", "pb_ratio", "ps_ratio", "ev_ebitda"
]),
("VOLATILITY", "volatility", [
"beta", "vix", "historical_volatility"
]),
("MACRO", "macro", [
"gdp_growth", "interest_rate", "inflation", "unemployment"
]),
]
for label, cat_key, metric_keys in categories:
data = extracted.get(cat_key, {})
if not data:
continue
category_lines = []
for metric_key in metric_keys:
metric_val = data.get(metric_key)
if metric_val is None:
continue
# Skip EV/EBITDA for financial institutions
if is_financial and metric_key == "ev_ebitda":
continue
# Handle temporal metrics (dict with value and end_date)
if isinstance(metric_val, dict) and metric_val.get("value") is not None:
raw_value = metric_val["value"]
formatted, as_of_date = _format_metric_for_reference(
metric_key, raw_value, metric_val
)
elif isinstance(metric_val, (int, float)):
raw_value = metric_val
formatted, as_of_date = _format_metric_for_reference(metric_key, raw_value)
else:
continue # Skip non-numeric
if formatted:
ref_id = f"M{mid:02d}"
category_lines.append(f" {ref_id}: {metric_key} = {formatted}")
lookup[ref_id] = {
"key": metric_key,
"raw_value": raw_value,
"formatted": formatted,
"as_of_date": as_of_date,
"category": cat_key
}
mid += 1
if category_lines:
lines.append(f"[{label}]")
lines.extend(category_lines)
lines.append("")
# Add VADER sentiment metrics (news and reddit)
sentiment_lines = []
# News VADER sentiment
news_data = extracted.get("news", {})
if news_data.get("vader_sentiment"):
vader = news_data["vader_sentiment"]
ref_id = f"M{mid:02d}"
formatted = f"{vader['avg_compound']:.2f}"
sentiment_lines.append(f" {ref_id}: news_sentiment = {formatted} ({vader['total_count']} articles)")
lookup[ref_id] = {
"key": "news_sentiment",
"raw_value": vader['avg_compound'],
"formatted": formatted,
"as_of_date": None,
"category": "sentiment"
}
mid += 1
# Reddit VADER sentiment
sent_data = extracted.get("sentiment", {})
if sent_data.get("vader_reddit"):
vader = sent_data["vader_reddit"]
ref_id = f"M{mid:02d}"
formatted = f"{vader['avg_compound']:.2f}"
sentiment_lines.append(f" {ref_id}: reddit_sentiment = {formatted} ({vader['total_count']} posts)")
lookup[ref_id] = {
"key": "reddit_sentiment",
"raw_value": vader['avg_compound'],
"formatted": formatted,
"as_of_date": None,
"category": "sentiment"
}
mid += 1
if sentiment_lines:
lines.append("[SENTIMENT]")
lines.extend(sentiment_lines)
lines.append("")
lines.append("=" * 60)
lines.append("")
return "\n".join(lines), lookup
def _compute_reference_hash(metric_lookup: dict) -> str:
"""Compute SHA256 hash of metric lookup for integrity verification."""
# Sort keys for deterministic serialization
serialized = json.dumps(metric_lookup, sort_keys=True, default=str)
return hashlib.sha256(serialized.encode()).hexdigest()
def _verify_reference_integrity(metric_lookup: dict, stored_hash: str) -> bool:
"""Verify metric lookup hasn't been corrupted."""
if not metric_lookup or not stored_hash:
return False
return _compute_reference_hash(metric_lookup) == stored_hash
def _format_reference_log(metric_lookup: dict) -> str:
"""Format metric reference as compact single-line log for activity display."""
if not metric_lookup:
return "No metrics extracted"
parts = []
for ref_id in sorted(metric_lookup.keys()):
entry = metric_lookup[ref_id]
key = entry.get("key", "unknown")
formatted = entry.get("formatted", "N/A")
# Shorten large numbers for compact display
if "$" in formatted and len(formatted) > 15:
# Convert $394,328,000,000 to $394.3B
raw = entry.get("raw_value", 0)
if isinstance(raw, (int, float)) and abs(raw) >= 1e9:
formatted = f"${raw/1e9:.1f}B"
elif isinstance(raw, (int, float)) and abs(raw) >= 1e6:
formatted = f"${raw/1e6:.0f}M"
# Remove "as of" date for compact display
if " (as of " in formatted:
formatted = formatted.split(" (as of ")[0]
parts.append(f"{key}={formatted}")
return ", ".join(parts)
def _format_metric_key(key: str) -> str:
"""Format metric key to human-readable name (e.g., pb_ratio -> P/B Ratio)."""
METRIC_NAMES = {
"revenue": "Revenue", "net_income": "Net Income", "net_margin": "Net Margin",
"net_margin_pct": "Net Margin", "gross_margin": "Gross Margin", "operating_margin": "Operating Margin",
"free_cash_flow": "Free Cash Flow", "operating_cash_flow": "Operating Cash Flow",
"total_assets": "Total Assets", "total_liabilities": "Total Liabilities",
"stockholders_equity": "Stockholders' Equity", "debt_to_equity": "Debt/Equity",
"eps": "EPS", "market_cap": "Market Cap", "enterprise_value": "Enterprise Value",
"trailing_pe": "P/E (Trailing)", "forward_pe": "P/E (Forward)",
"pb_ratio": "P/B Ratio", "ps_ratio": "P/S Ratio", "trailing_peg": "PEG Ratio",
"price_to_fcf": "Price/FCF", "ev_ebitda": "EV/EBITDA", "ev_revenue": "EV/Revenue",
"vix": "VIX", "beta": "Beta", "historical_volatility": "Historical Volatility",
"gdp_growth": "GDP Growth", "interest_rate": "Interest Rate",
"cpi_inflation": "Inflation", "unemployment": "Unemployment",
}
return METRIC_NAMES.get(key, key.replace("_", " ").title())
def _generate_data_quality_notes(metric_reference: dict) -> dict:
"""
Generate deterministic data quality assessment from metric reference.
Returns:
{
"high_confidence": ["Revenue", "Net Margin", ...],
"gaps_or_stale": ["EPS (stale: 2024-06-30)", "Debt/Equity (missing)"],
}
"""
from datetime import datetime, timedelta
high_confidence = []
gaps_or_stale = []
threshold = timedelta(days=30)
today = datetime.now()
for ref_id, entry in metric_reference.items():
key = entry.get("key", "unknown")
display_name = _format_metric_key(key)
raw_value = entry.get("raw_value")
as_of_date = entry.get("as_of_date")
if raw_value is None:
gaps_or_stale.append(f"{display_name} (missing)")
elif as_of_date:
try:
date = datetime.strptime(as_of_date, "%Y-%m-%d")
if today - date > threshold:
gaps_or_stale.append(f"{display_name} (stale: {as_of_date})")
else:
high_confidence.append(display_name)
except ValueError:
high_confidence.append(display_name)
else:
high_confidence.append(display_name)
return {
"high_confidence": high_confidence,
"gaps_or_stale": gaps_or_stale,
}
# New institutional-grade prompt.
# System prompt for the Analyzer LLM. The {ev_ebitda_note} placeholder is the
# only dynamic part: _build_analyzer_prompt fills it with either an exclusion
# note (financial institutions) or the extra EV metric names. Do not add other
# literal braces here without escaping — the string goes through str.format().
ANALYZER_SYSTEM_PROMPT = """You are a senior financial analyst producing institutional-grade SWOT analyses.
## DATA GROUNDING RULES (CRITICAL)
1. USE ONLY the provided data. Never invent or assume metrics not given.
2. CITE specific numbers for every finding (e.g., "Net margin: 24.3%", "P/E: 21.3x").
3. If data is missing, state "Insufficient data" - do NOT fabricate.
4. Distinguish trailing (historical) vs forward (projected) metrics.
## AVAILABLE DATA BASKETS
### Fundamentals (SEC EDGAR + Yahoo Finance)
revenue, net_income, net_margin_pct, gross_margin_pct, operating_margin_pct,
total_assets, total_liabilities, stockholders_equity, free_cash_flow,
operating_cash_flow, long_term_debt, debt_to_equity, eps
### Valuation (Yahoo Finance)
market_cap, enterprise_value, trailing_pe, forward_pe, pb_ratio, ps_ratio,
trailing_peg, price_to_fcf, revenue_growth, earnings_growth
{ev_ebitda_note}
### Volatility (FRED + Yahoo)
vix, vxn, beta, historical_volatility, implied_volatility
### Macro (BEA/BLS/FRED)
gdp_growth, interest_rate, cpi_inflation, unemployment
### News & Sentiment
News articles with title, source, url
Sentiment scores from Finnhub and Reddit
## WHAT YOU DO NOT DO
- Provide buy/sell/hold recommendations
- Compare to sector/peer benchmarks (data not provided)
- Speculate beyond provided data
- Use vague hedge words without quantification"""
def _build_revision_prompt(
    critique_details: dict,
    company_data: str,
    current_draft: str,
    is_financial: bool,
    extracted: dict = None
) -> str:
    """Build revision prompt with conditional focus areas based on failed criteria.

    Focus blocks are only included for criteria whose Critic score fell below
    the threshold, keeping the prompt short when few criteria failed.

    Args:
        critique_details: Structured dict from Critic with scores and feedback
        company_data: Formatted metrics string for reference
        current_draft: The current SWOT draft to be revised
        is_financial: Whether the company is a financial institution
        extracted: Extracted metrics dict for reference table generation

    Returns:
        Complete revision prompt string
    """
    # Generate metric reference table for revision (same as initial mode)
    reference_table = ""
    if extracted:
        reference_table, _ = _generate_metric_reference_table(extracted, is_financial)
    scores = critique_details.get("scores", {})
    # Determine which focus areas to include based on failed criteria.
    # The *_BLOCK names are module-level prompt snippets defined elsewhere in
    # this file. Missing scores default to 10 (passing), so a criterion is
    # only targeted when the Critic explicitly scored it low.
    # NOTE(review): constraint_compliance uses a lower bar (6) than the other
    # criteria (7) — presumably intentional; confirm against the Critic rubric.
    focus_areas = []
    if scores.get("evidence_grounding", 10) < 7:
        focus_areas.append(EVIDENCE_GROUNDING_BLOCK)
    if scores.get("constraint_compliance", 10) < 6:
        focus_areas.append(CONSTRAINT_COMPLIANCE_BLOCK)
    if scores.get("specificity_actionability", 10) < 7:
        focus_areas.append(SPECIFICITY_BLOCK)
    if scores.get("strategic_insight", 10) < 7:
        focus_areas.append(INSIGHT_BLOCK)
    if scores.get("completeness_balance", 10) < 7:
        focus_areas.append(COMPLETENESS_BLOCK)
    if scores.get("clarity_structure", 10) < 7:
        focus_areas.append(CLARITY_BLOCK)
    # Format critic feedback components
    deficiencies = critique_details.get("key_deficiencies", [])
    strengths = critique_details.get("strengths_to_preserve", [])
    feedback = critique_details.get("actionable_feedback", [])
    # Build deficiencies section
    deficiencies_text = "\n".join(f"- {d}" for d in deficiencies) if deficiencies else "- None specified"
    # Build strengths section
    strengths_text = "\n".join(f"- {s}" for s in strengths) if strengths else "- None specified"
    # Build feedback section (numbered so the LLM can address items in order)
    feedback_text = "\n".join(f"{i+1}. {f}" for i, f in enumerate(feedback)) if feedback else "- None specified"
    # Build focus areas section
    focus_areas_text = "\n".join(focus_areas) if focus_areas else "Address all deficiencies listed above."
    # Add EV/EBITDA note for financial institutions
    ev_note = ""
    if is_financial:
        ev_note = "\n**Note:** This is a financial institution - EV/EBITDA is excluded from analysis."
    prompt = f"""{reference_table}## REVISION MODE ACTIVATED
You previously generated a SWOT analysis that did not meet quality standards. You are now in revision mode.
### YOUR TASK
1. **Review the Critic's feedback** carefully
2. **Address each deficiency** listed in priority order
3. **Preserve strengths** explicitly called out — do not regress on what worked
4. **Regenerate the complete SWOT** — not a partial patch
5. **Use EXACT values from the METRIC REFERENCE TABLE above** — do not round or estimate
### CRITIC FEEDBACK
Status: {critique_details.get('status', 'REJECTED')}
Weighted Score: {critique_details.get('weighted_score', 0):.1f} / 10
**Key Deficiencies:**
{deficiencies_text}
**Strengths to Preserve:**
{strengths_text}
**Actionable Feedback:**
{feedback_text}
### FOCUS AREAS FOR THIS REVISION
{focus_areas_text}
### REVISION RULES
**DO:**
- Fix every item in "Key Deficiencies" — these are blocking issues
- Apply each point in "Actionable Feedback" — these are specific instructions
- Keep everything listed under "Strengths to Preserve" — do not modify these sections
- **Use EXACT metric values from the METRIC REFERENCE TABLE** — copy numbers verbatim
- **Include [M##] citation after every metric value** — e.g., "$394.3B [M01]"
- Include the 'as of' date when citing temporal metrics
{ev_note}
**DO NOT:**
- Ignore lower-priority feedback items — address all of them
- Introduce new metrics not in the original input data
- **Round, estimate, or approximate any numbers** — use exact values only
- **Omit [M##] citations** — they are required for automatic verification
- Remove content that was working well
- Add defensive caveats or apologies about the revision
- Reference the revision process in your output — produce a clean SWOT as if first attempt
### REFERENCE DATA
{company_data}
### CURRENT DRAFT (to revise)
{current_draft}
### OUTPUT INSTRUCTIONS
Produce a complete, revised SWOT analysis with this exact structure (3-5 points per section):
## Strengths
- [M01] Revenue: $394.3B - Strong market position with substantial scale
- [M02] Net Margin: 24.3% - High profitability indicates pricing power
## Weaknesses
- [M04] Debt/Equity: 1.87 - Elevated leverage increases financial risk
## Opportunities
- [M12] GDP Growth: 4.3% - Favorable macro environment for expansion
## Threats
- [M13] Interest Rate: 3.72% - Higher borrowing costs may impact margins
CRITICAL REQUIREMENTS:
1. Each point MUST start with metric reference in brackets: [M##]
2. Format: [M##] Metric: Value - Strategic insight
3. Use EXACT values from the METRIC REFERENCE TABLE - do NOT round
4. Keep insights concise (one sentence)
5. Include 3-5 points per section
Do not:
- Include any preamble about revisions
- Reference the Critic's feedback in your output
Simply output the improved SWOT as a clean, final deliverable."""
    return prompt
def _build_analyzer_prompt(company: str, ticker: str, formatted_data: str,
                           is_financial: bool, extracted: dict = None) -> tuple:
    """Build analyzer prompt with metric reference table for hallucination prevention.

    Args:
        company: Company name
        ticker: Stock ticker
        formatted_data: Formatted metrics text
        is_financial: If True, exclude EV/EBITDA
        extracted: Extracted metrics dict (for reference table generation)

    Returns:
        tuple: (prompt_string, metric_lookup_dict, reference_hash) — the
        lookup and hash are empty when `extracted` is not provided.
    """
    # Generate metric reference table if extracted data is available
    reference_table = ""
    metric_lookup = {}
    ref_hash = ""
    if extracted:
        reference_table, metric_lookup = _generate_metric_reference_table(extracted, is_financial)
        # Hash lets downstream validation detect a corrupted/tampered lookup.
        ref_hash = _compute_reference_hash(metric_lookup)
    # The note fills {ev_ebitda_note} in ANALYZER_SYSTEM_PROMPT: for financials
    # it is a standalone exclusion line; otherwise it extends the valuation
    # metrics list in-place (hence the leading ", ").
    if is_financial:
        ev_note = "\nNote: EV/EBITDA excluded - not meaningful for financial institutions."
    else:
        ev_note = ", ev_ebitda, ev_revenue"
    system = ANALYZER_SYSTEM_PROMPT.format(ev_ebitda_note=ev_note)
    # Reference table comes FIRST so the exact values anchor everything below.
    prompt = f"""{reference_table}{system}
=== DATA FOR {company} ({ticker}) ===
{formatted_data}
=== OUTPUT FORMAT ===
Produce a SWOT analysis with this exact structure (3-5 points per section):
## Strengths
- [M01] Revenue: $394.3B - Strong market position with substantial scale
- [M02] Net Margin: 24.3% - High profitability indicates pricing power
## Weaknesses
- [M04] Debt/Equity: 1.87 - Elevated leverage increases financial risk
## Opportunities
- [M12] GDP Growth: 4.3% - Favorable macro environment for expansion
## Threats
- [M13] Interest Rate: 3.72% - Higher borrowing costs may impact margins
CRITICAL REQUIREMENTS:
1. Each point MUST start with metric reference in brackets: [M##]
2. Format: [M##] Metric: Value - Strategic insight
3. Use EXACT values from the METRIC REFERENCE TABLE - do NOT round
4. Keep insights concise (one sentence)
5. Include 3-5 points per section"""
    return prompt, metric_lookup, ref_hash
@traceable(name="Analyzer")
def analyzer_node(state, workflow_id=None, progress_store=None):
    """LangGraph node: generate (initial mode) or revise (revision mode) the SWOT draft.

    Reads from state: raw_data, company_name, ticker, user_api_keys,
    critique_details, draft_report, revision_count, workflow_id, progress_store.
    Writes to state: draft_report, provider_used, revision_count,
    llm_providers_failed; initial mode also sets data_report,
    metric_reference and metric_reference_hash; failure paths set either
    error (fatal, initial mode) or analyzer_revision_skipped (revision mode).

    Args:
        state: Workflow state dict (mutated in place and returned).
        workflow_id: Optional progress-tracking id; falls back to state.
        progress_store: Optional shared progress mapping; falls back to state.

    Returns:
        The (mutated) state dict.
    """
    # Extract workflow_id and progress_store from state (graph invokes with state only)
    if workflow_id is None:
        workflow_id = state.get("workflow_id")
    if progress_store is None:
        progress_store = state.get("progress_store")
    # Update progress if tracking is enabled
    # NOTE(review): no "workflow_id in progress_store" guard here, unlike the
    # later updates — confirm the entry is always pre-created by the caller.
    if workflow_id and progress_store:
        progress_store[workflow_id].update({
            "current_step": "analyzer",
            "revision_count": state.get("revision_count", 0),
            "score": state.get("score", 0)
        })
    # Use user-provided API keys if available
    user_keys = state.get("user_api_keys", {})
    llm = get_llm_client(user_keys) if user_keys else get_llm_client()
    raw = state["raw_data"]
    company = state["company_name"]
    ticker = state.get("ticker", "")
    # Extract company profile and detect financial institution
    company_profile = _extract_company_profile(raw)
    sector = company_profile.get("sector", "")
    industry = company_profile.get("industry", "")
    is_financial = _is_financial_institution(sector, industry, ticker)
    if is_financial:
        _add_activity_log(workflow_id, progress_store, "analyzer",
                          f"Financial institution detected - excluding EV/EBITDA")
    # Extract and format metrics for better LLM understanding
    extracted = _extract_key_metrics(raw)
    formatted_data = _format_metrics_for_prompt(extracted, is_financial=is_financial)
    # Generate detailed data report (shown before SWOT)
    data_report = _generate_data_report(raw, is_financial=is_financial)
    # Detect revision mode: if we have critique_details with REJECTED status
    # (revision_count may still be 0 on first revision loop)
    critique_details = state.get("critique_details", {})
    is_revision = bool(critique_details) and critique_details.get("status") == "REJECTED"
    # Debug: Log critique details presence
    print(f"[DEBUG] Analyzer: critique_details={bool(critique_details)}, status={critique_details.get('status')}, is_revision={is_revision}")
    if is_revision and critique_details:
        # REVISION MODE: Use enhanced revision prompt with Critic feedback
        current_revision = state.get("revision_count", 0) + 1
        _add_activity_log(workflow_id, progress_store, "analyzer",
                          f"Revision #{current_revision} in progress...")
        prompt = _build_revision_prompt(
            critique_details=critique_details,
            company_data=formatted_data,
            current_draft=state.get("draft_report", ""),
            is_financial=is_financial,
            extracted=extracted
        )
        # Update progress with revision info
        if workflow_id and progress_store:
            progress_store[workflow_id].update({
                "current_step": "analyzer",
                "revision_count": current_revision,
            })
    else:
        # INITIAL MODE: Use standard analyzer prompt
        _add_activity_log(workflow_id, progress_store, "analyzer",
                          f"Calling LLM to generate SWOT analysis...")
        prompt, metric_lookup, ref_hash = _build_analyzer_prompt(
            company, ticker, formatted_data, is_financial, extracted
        )
        # Store metric reference for validation (Layer 1 hallucination prevention)
        # NOTE(review): only set in initial mode — revisions reuse the stored table.
        state["metric_reference"] = metric_lookup
        state["metric_reference_hash"] = ref_hash
        # Log reference values for manual verification
        ref_log = _format_reference_log(metric_lookup)
        _add_activity_log(workflow_id, progress_store, "analyzer",
                          f"Reference values: {ref_log}")
        current_revision = 0
    # In revision mode, add delay before LLM call to avoid rate limits
    # (Critic just called LLM, so we need to wait)
    if is_revision:
        print("Waiting 10s before revision LLM call (rate limit buffer)...")
        time.sleep(10)
    start_time = time.time()
    # temperature=0 keeps output deterministic so metric citations are stable
    response, provider, error, providers_failed = llm.query(prompt, temperature=0)
    elapsed = time.time() - start_time
    # Log failed providers and update LLM status in real-time
    for pf in providers_failed:
        _add_activity_log(workflow_id, progress_store, "analyzer", f"LLM {pf['name']} failed: {pf['error']}")
        # Update LLM status in real-time for frontend
        if workflow_id and progress_store and workflow_id in progress_store:
            llm_status = progress_store[workflow_id].get("llm_status", {})
            if pf["name"] in llm_status:
                llm_status[pf["name"]] = "failed"
    # Track failed providers in state for frontend
    if "llm_providers_failed" not in state:
        state["llm_providers_failed"] = []
    state["llm_providers_failed"].extend([pf["name"] for pf in providers_failed])
    # Update successful provider status
    if provider and workflow_id and progress_store and workflow_id in progress_store:
        llm_status = progress_store[workflow_id].get("llm_status", {})
        # provider is "name:model"; status keys use the bare name
        provider_name = provider.split(":")[0]
        if provider_name in llm_status:
            llm_status[provider_name] = "completed"
    if error:
        if is_revision:
            # REVISION MODE ERROR: Graceful degradation - keep previous draft
            _add_activity_log(workflow_id, progress_store, "analyzer", f"Revision failed: {error}")
            if current_revision == 1:
                _add_activity_log(workflow_id, progress_store, "analyzer",
                                  "Using initial draft (revision unavailable)")
            else:
                _add_activity_log(workflow_id, progress_store, "analyzer",
                                  f"Using revision #{current_revision - 1} draft (further revision unavailable)")
            # Don't set error - allow workflow to complete with previous draft
            state["analyzer_revision_skipped"] = True
            state["revision_count"] = current_revision
        else:
            # INITIAL MODE ERROR: Abort workflow
            state["draft_report"] = f"Error generating analysis: {error}"
            state["provider_used"] = None
            state["error"] = error  # Signal workflow to abort
            _add_activity_log(workflow_id, progress_store, "analyzer", f"LLM error: {error}")
            _add_activity_log(workflow_id, progress_store, "analyzer",
                              "Workflow aborted - all LLM providers unavailable")
    else:
        if is_revision:
            # REVISION MODE SUCCESS: Update draft with revision
            state["draft_report"] = response
            state["provider_used"] = provider
            state["analyzer_revision_skipped"] = False
            state["revision_count"] = current_revision
            _add_activity_log(workflow_id, progress_store, "analyzer",
                              f"Revision #{current_revision} completed via {provider} ({elapsed:.1f}s)")
        else:
            # INITIAL MODE SUCCESS: Combine data report with SWOT analysis
            swot_section = f"## SWOT Analysis\n\n{response}"
            full_report = f"{data_report}\n{swot_section}"
            state["draft_report"] = full_report
            state["data_report"] = data_report  # Store separately for frontend flexibility
            state["provider_used"] = provider
            _add_activity_log(workflow_id, progress_store, "analyzer",
                              f"SWOT generated via {provider} ({elapsed:.1f}s)")
    # Update progress with final revision count
    if workflow_id and progress_store:
        progress_store[workflow_id].update({
            "revision_count": state.get("revision_count", 0),
            "score": state.get("score", 0)
        })
    return state