Researcher-Agent / tests /test_mcp_e2e.py
vn6295337's picture
Feat: Industry-specific metrics via SIC code detection + company address
ab2f4fb
"""
E2E test for all 6 MCP servers using subprocess+MCP protocol (same as production).
Usage: python tests/test_mcp_e2e.py [TICKER] [COMPANY_NAME]
Default: KO "The Coca-Cola Company"
"""
import asyncio
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Project root: the parent of the directory that contains this file.
PROJECT_ROOT = Path(__file__).parent.parent
# Put the project root first on sys.path; presumably this is what lets the
# `mcp_client` import below resolve when the script is run directly -- TODO confirm.
sys.path.insert(0, str(PROJECT_ROOT))
# Load environment variables from the project's .env file. This runs before
# mcp_client is imported.
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / ".env")
# Import MCP client (subprocess+MCP protocol)
from mcp_client import call_mcp_server
# Default test company, used by main() when no CLI arguments are given.
DEFAULT_TICKER = "KO"
DEFAULT_COMPANY = "The Coca-Cola Company"
# MCP server timeout (seconds); passed as `timeout=` to every call_mcp_server call.
MCP_TIMEOUT = 90.0
class MCPTestResult:
    """Outcome record for one MCP server check.

    Instances start out pessimistic (status "FAIL", no data, zero counts);
    the test coroutine that owns the record fills it in as it goes.
    """

    def __init__(self, name: str):
        # Identity of the MCP under test (e.g. "news", "macro").
        self.name = name

        # Verdict; flipped to "PASS" by the test once validation succeeds.
        self.status = "FAIL"

        # Raw response returned by the MCP call, if any.
        self.data: Optional[Dict] = None

        # Diagnostics gathered during validation (each list is per-instance).
        self.warnings: List[str] = []
        self.errors: List[str] = []

        # Numeric summary fields.
        self.duration_ms = 0
        self.item_count = 0
async def test_fundamentals(ticker: str) -> MCPTestResult:
    """Test fundamentals-basket MCP via subprocess+MCP protocol.

    Calls the ``get_all_sources_fundamentals`` tool and checks that the
    response is a dict with SEC EDGAR and/or Yahoo Finance data under
    ``sources``.

    Args:
        ticker: Ticker symbol forwarded to the MCP tool.

    Returns:
        The populated MCPTestResult. ``status`` is "PASS" only when no errors
        were recorded; ``duration_ms`` is filled in on every exit path,
        including the early returns for malformed or error responses.
    """
    result = MCPTestResult("fundamentals")
    start = datetime.now()
    try:
        data = await call_mcp_server(
            "fundamentals-basket",
            "get_all_sources_fundamentals",
            {"ticker": ticker},
            timeout=MCP_TIMEOUT,
        )
        result.data = data

        if not isinstance(data, dict):
            result.errors.append("Response is not a dict")
            return result
        if "error" in data:
            result.errors.append(f"MCP error: {data['error']}")
            return result

        # Schema validation - fundamentals uses sources.sec_edgar/sources.yahoo_finance
        sources = data.get("sources", {})
        sec_data = sources.get("sec_edgar", {})
        yahoo_data = sources.get("yahoo_finance", {})
        if not sec_data and not yahoo_data:
            result.errors.append("No SEC or Yahoo data")
        else:
            # Count only dict metrics with values (matches extraction logic)
            sec_metrics = sec_data.get("data", {}) if isinstance(sec_data, dict) else {}
            yahoo_metrics = yahoo_data.get("data", {}) if isinstance(yahoo_data, dict) else {}
            sec_count = sum(1 for v in sec_metrics.values() if isinstance(v, dict))
            yahoo_count = sum(1 for v in yahoo_metrics.values() if isinstance(v, dict))
            result.item_count = sec_count + yahoo_count
            if result.item_count == 0:
                result.warnings.append("No data items returned")

        result.status = "PASS" if not result.errors else "FAIL"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the report's error column blank; fall back to the exception type.
        result.errors.append(str(e) or type(e).__name__)
    finally:
        # Runs for early returns too, so failed checks still report a duration.
        result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
    return result
async def test_valuation(ticker: str) -> MCPTestResult:
    """Test valuation-basket MCP via subprocess+MCP protocol.

    Calls the ``get_all_sources_valuation`` tool and checks that the response
    is a dict with a ``sources`` key; the item count is the total number of
    entries in each source's ``data`` mapping.

    Args:
        ticker: Ticker symbol forwarded to the MCP tool.

    Returns:
        The populated MCPTestResult. ``status`` is "PASS" only when no errors
        were recorded; ``duration_ms`` is filled in on every exit path,
        including the early returns for malformed or error responses.
    """
    result = MCPTestResult("valuation")
    start = datetime.now()
    try:
        data = await call_mcp_server(
            "valuation-basket",
            "get_all_sources_valuation",
            {"ticker": ticker},
            timeout=MCP_TIMEOUT,
        )
        result.data = data

        if not isinstance(data, dict):
            result.errors.append("Response is not a dict")
            return result
        if "error" in data:
            result.errors.append(f"MCP error: {data['error']}")
            return result

        # Schema validation
        if "sources" not in data:
            result.errors.append("Missing 'sources' key")
        else:
            sources = data.get("sources", {})
            result.item_count = sum(
                len(v.get("data", {})) if isinstance(v, dict) else 0
                for v in sources.values()
            )
            if result.item_count == 0:
                result.warnings.append("No data items returned")

        result.status = "PASS" if not result.errors else "FAIL"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the report's error column blank; fall back to the exception type.
        result.errors.append(str(e) or type(e).__name__)
    finally:
        # Runs for early returns too, so failed checks still report a duration.
        result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
    return result
async def test_volatility(ticker: str) -> MCPTestResult:
    """Test volatility-basket MCP via subprocess+MCP protocol.

    Calls the ``get_all_sources_volatility`` tool and checks that the response
    is a dict with a ``metrics`` key; the item count is the number of entries
    under ``metrics``.

    Args:
        ticker: Ticker symbol forwarded to the MCP tool.

    Returns:
        The populated MCPTestResult. ``status`` is "PASS" only when no errors
        were recorded; ``duration_ms`` is filled in on every exit path,
        including the early returns for malformed or error responses.
    """
    result = MCPTestResult("volatility")
    start = datetime.now()
    try:
        data = await call_mcp_server(
            "volatility-basket",
            "get_all_sources_volatility",
            {"ticker": ticker},
            timeout=MCP_TIMEOUT,
        )
        result.data = data

        if not isinstance(data, dict):
            result.errors.append("Response is not a dict")
            return result
        if "error" in data:
            result.errors.append(f"MCP error: {data['error']}")
            return result

        # Schema validation
        if "metrics" not in data:
            result.errors.append("Missing 'metrics' key")
        else:
            result.item_count = len(data.get("metrics", {}))
            if result.item_count == 0:
                result.warnings.append("No metrics returned")

        result.status = "PASS" if not result.errors else "FAIL"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the report's error column blank; fall back to the exception type.
        result.errors.append(str(e) or type(e).__name__)
    finally:
        # Runs for early returns too, so failed checks still report a duration.
        result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
    return result
async def test_macro() -> MCPTestResult:
    """Test macro-basket MCP via subprocess+MCP protocol.

    Calls the ``get_all_sources_macro`` tool with no arguments and checks that
    the response is a dict with a ``metrics`` key; the item count is the
    number of entries under ``metrics``.

    Returns:
        The populated MCPTestResult. ``status`` is "PASS" only when no errors
        were recorded; ``duration_ms`` is filled in on every exit path,
        including the early returns for malformed or error responses.
    """
    result = MCPTestResult("macro")
    start = datetime.now()
    try:
        data = await call_mcp_server(
            "macro-basket",
            "get_all_sources_macro",
            {},
            timeout=MCP_TIMEOUT,
        )
        result.data = data

        if not isinstance(data, dict):
            result.errors.append("Response is not a dict")
            return result
        if "error" in data:
            result.errors.append(f"MCP error: {data['error']}")
            return result

        # Schema validation
        if "metrics" not in data:
            result.errors.append("Missing 'metrics' key")
        else:
            result.item_count = len(data.get("metrics", {}))
            if result.item_count == 0:
                result.warnings.append("No metrics returned")

        result.status = "PASS" if not result.errors else "FAIL"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the report's error column blank; fall back to the exception type.
        result.errors.append(str(e) or type(e).__name__)
    finally:
        # Runs for early returns too, so failed checks still report a duration.
        result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
    return result
async def test_news(ticker: str, company_name: str) -> MCPTestResult:
    """Test news-basket MCP via subprocess+MCP protocol.

    Calls the ``get_all_sources_news`` tool and checks that the response is a
    dict with an ``items`` key. The first three items are spot-checked for
    ``title`` and ``url``; a missing field is recorded as a warning only.

    Args:
        ticker: Ticker symbol forwarded to the MCP tool.
        company_name: Company name forwarded to the MCP tool.

    Returns:
        The populated MCPTestResult. ``status`` is "PASS" only when no errors
        were recorded; ``duration_ms`` is filled in on every exit path,
        including the early returns for malformed or error responses.
    """
    result = MCPTestResult("news")
    start = datetime.now()
    try:
        data = await call_mcp_server(
            "news-basket",
            "get_all_sources_news",
            {"ticker": ticker, "company_name": company_name},
            timeout=MCP_TIMEOUT,
        )
        result.data = data

        if not isinstance(data, dict):
            result.errors.append("Response is not a dict")
            return result
        if "error" in data:
            result.errors.append(f"MCP error: {data['error']}")
            return result

        # Schema validation
        if "items" not in data:
            result.errors.append("Missing 'items' key")
        else:
            items = data.get("items", [])
            result.item_count = len(items)
            if result.item_count == 0:
                result.warnings.append("No news items returned")
            else:
                # Validate item schema (first problem found ends the spot check)
                for item in items[:3]:
                    if "title" not in item:
                        result.warnings.append("Item missing 'title'")
                        break
                    if "url" not in item:
                        result.warnings.append("Item missing 'url'")
                        break

        result.status = "PASS" if not result.errors else "FAIL"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the report's error column blank; fall back to the exception type.
        result.errors.append(str(e) or type(e).__name__)
    finally:
        # Runs for early returns too, so failed checks still report a duration.
        result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
    return result
async def test_sentiment(ticker: str, company_name: str) -> MCPTestResult:
    """Test sentiment-basket MCP via subprocess+MCP protocol.

    Calls the ``get_sentiment_basket`` tool and checks that the response is a
    dict with an ``items`` key. The first three items are spot-checked for
    ``title`` and ``url``; a missing field is recorded as a warning only.

    Args:
        ticker: Ticker symbol forwarded to the MCP tool.
        company_name: Company name forwarded to the MCP tool.

    Returns:
        The populated MCPTestResult. ``status`` is "PASS" only when no errors
        were recorded; ``duration_ms`` is filled in on every exit path,
        including the early returns for malformed or error responses.
    """
    result = MCPTestResult("sentiment")
    start = datetime.now()
    try:
        data = await call_mcp_server(
            "sentiment-basket",
            "get_sentiment_basket",
            {"ticker": ticker, "company_name": company_name},
            timeout=MCP_TIMEOUT,
        )
        result.data = data

        if not isinstance(data, dict):
            result.errors.append("Response is not a dict")
            return result
        if "error" in data:
            result.errors.append(f"MCP error: {data['error']}")
            return result

        # Schema validation
        if "items" not in data:
            result.errors.append("Missing 'items' key")
        else:
            items = data.get("items", [])
            result.item_count = len(items)
            if result.item_count == 0:
                result.warnings.append("No sentiment items returned")
            else:
                # Validate item schema (first problem found ends the spot check)
                for item in items[:3]:
                    if "title" not in item:
                        result.warnings.append("Item missing 'title'")
                        break
                    if "url" not in item:
                        result.warnings.append("Item missing 'url'")
                        break

        result.status = "PASS" if not result.errors else "FAIL"
    except Exception as e:
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the report's error column blank; fall back to the exception type.
        result.errors.append(str(e) or type(e).__name__)
    finally:
        # Runs for early returns too, so failed checks still report a duration.
        result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
    return result
async def run_all_tests(ticker: str, company_name: str) -> List[MCPTestResult]:
    """Run every MCP check one after another and collect the results.

    Each check is awaited to completion before the next one starts, and a
    one-line progress message with status and duration is printed per check.
    Results are returned in execution order: the four quantitative baskets
    first, then the two qualitative ones.
    """
    print(f"\nRunning E2E tests for {company_name} ({ticker})...")
    print("-" * 50)

    # (server label, test coroutine function, positional arguments)
    schedule = (
        # Quantitative tests
        ("fundamentals-basket", test_fundamentals, (ticker,)),
        ("valuation-basket", test_valuation, (ticker,)),
        ("volatility-basket", test_volatility, (ticker,)),
        ("macro-basket", test_macro, ()),
        # Qualitative tests
        ("news-basket", test_news, (ticker, company_name)),
        ("sentiment-basket", test_sentiment, (ticker, company_name)),
    )

    collected = []
    for label, check, call_args in schedule:
        print(f"Testing {label}...", end=" ", flush=True)
        outcome = await check(*call_args)
        print(f"{outcome.status} ({outcome.duration_ms}ms)")
        collected.append(outcome)
    return collected
def format_value(val: Any) -> str:
    """Render a metric value as display text.

    ``None`` becomes the placeholder "-"; anything else is passed through
    ``str()`` without further formatting.
    """
    return "-" if val is None else str(val)
def _dict_or_empty(value: Any) -> Dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _quant_payload(results: List[MCPTestResult], name: str) -> Dict:
    """Return the dict payload of the result called *name*, or {} if absent or not a dict."""
    found = next((r for r in results if r.name == name), None)
    return _dict_or_empty(found.data if found else None)


def _flat_metric_rows(payload: Dict, category: str) -> List[Dict]:
    """Build rows from a payload whose metrics live directly under 'metrics'."""
    rows = []
    for metric_name, metric_val in _dict_or_empty(payload.get("metrics")).items():
        if isinstance(metric_val, dict):
            rows.append({
                "metric": metric_name,
                "value": format_value(metric_val.get("value")),
                "data_type": metric_val.get("data_type", "-"),
                "as_of": metric_val.get("as_of", "-"),
                "filed": "-",
                "source": metric_val.get("source", "-"),
                "category": category,
            })
    return rows


def extract_quantitative_rows(results: List[MCPTestResult], ticker: str) -> List[Dict]:
    """Extract quantitative data rows from results.

    Rows are emitted in the order Fundamentals (SEC EDGAR, then Yahoo
    Finance), Valuation, Volatility, Macro. Every row carries the same keys:
    metric, value, data_type, as_of, filed, source, category.

    A result's raw response is stored before it is validated, so it may not
    be a dict; any payload or nested section that is not a dict is treated as
    empty instead of raising.

    Args:
        results: Test results to read; looked up by their ``name``.
        ticker: Accepted for interface compatibility; not used here.

    Returns:
        A list of row dicts (possibly empty).
    """
    rows: List[Dict] = []

    # Fundamentals - uses sources.sec_edgar/sources.yahoo_finance with nested 'data' key
    fund_sources = _dict_or_empty(_quant_payload(results, "fundamentals").get("sources"))

    # SEC EDGAR data - metrics are inside sources.sec_edgar.data
    sec_wrapper = _dict_or_empty(fund_sources.get("sec_edgar"))
    for metric_name, metric_val in _dict_or_empty(sec_wrapper.get("data")).items():
        if isinstance(metric_val, dict):
            rows.append({
                "metric": metric_name,
                "value": format_value(metric_val.get("value")),
                "data_type": metric_val.get("data_type", "FY"),
                "as_of": metric_val.get("end_date", "-"),
                "filed": metric_val.get("filed", "-"),
                "source": "SEC EDGAR",
                "category": "Fundamentals",
            })

    # Yahoo Finance data - metrics are inside sources.yahoo_finance.data
    yahoo_wrapper = _dict_or_empty(fund_sources.get("yahoo_finance"))
    yahoo_as_of = yahoo_wrapper.get("as_of", "-")
    for metric_name, metric_val in _dict_or_empty(yahoo_wrapper.get("data")).items():
        if isinstance(metric_val, dict):
            rows.append({
                "metric": metric_name,
                "value": format_value(metric_val.get("value")),
                "data_type": metric_val.get("period", metric_val.get("data_type", "TTM")),
                # Per-metric date wins; the wrapper-level as_of is the fallback.
                "as_of": metric_val.get("end_date", metric_val.get("as_of", yahoo_as_of)),
                "filed": metric_val.get("filed", "-"),
                "source": "Yahoo Finance",
                "category": "Fundamentals",
            })

    # Valuation
    val_payload = _quant_payload(results, "valuation")
    payload_as_of = val_payload.get("as_of", "-")
    for source_name, source_data in _dict_or_empty(val_payload.get("sources")).items():
        if not isinstance(source_data, dict):
            continue
        for metric_name, metric_val in _dict_or_empty(source_data.get("data")).items():
            # Handle both dict and scalar values
            if isinstance(metric_val, dict):
                value = format_value(metric_val.get("value"))
                data_type = metric_val.get("data_type", "-")
                metric_as_of = metric_val.get("as_of", payload_as_of)
            else:
                value = format_value(metric_val)
                data_type = "-"
                metric_as_of = payload_as_of
            rows.append({
                "metric": metric_name,
                "value": value,
                "data_type": data_type,
                "as_of": metric_as_of,
                # Present so valuation rows share the schema of every other category.
                "filed": "-",
                "source": source_name,
                "category": "Valuation",
            })

    # Volatility and Macro share the same flat 'metrics' layout
    rows.extend(_flat_metric_rows(_quant_payload(results, "volatility"), "Volatility"))
    rows.extend(_flat_metric_rows(_quant_payload(results, "macro"), "Macro"))
    return rows
def extract_date(item: Dict) -> str:
    """Return the leading date portion (YYYY-MM-DD) of an item's timestamp.

    The 'date' field is preferred; 'datetime' is consulted only when 'date'
    is missing or falsy. If neither yields a value the placeholder "-" is
    returned. The value is stringified and cut to its first 10 characters;
    shorter values come back whole.
    """
    stamp = item.get("date")
    if not stamp:
        stamp = item.get("datetime")
    if not stamp:
        return "-"
    # Slicing past the end is harmless, so short strings need no special case.
    return str(stamp)[:10]
def extract_qualitative_rows(results: List[MCPTestResult]) -> List[Dict]:
    """Extract qualitative data rows from results.

    Takes at most the first 10 items from the "news" result and then from the
    "sentiment" result. Each row has the keys title (cut to 80 characters),
    date, source, subreddit, url and category.

    A result's raw response is stored before it is validated, so payloads
    that are not dicts, 'items' values that are not lists, and items that are
    not dicts are skipped instead of raising. Missing or null title, source
    and url values are shown as "-".

    Args:
        results: Test results to read; looked up by their ``name``.

    Returns:
        A list of row dicts (possibly empty), news rows first.
    """
    rows: List[Dict] = []
    for result_name, category in (("news", "News"), ("sentiment", "Sentiment")):
        found = next((r for r in results if r.name == result_name), None)
        payload = found.data if found else None
        items = payload.get("items") if isinstance(payload, dict) else None
        if not isinstance(items, list):
            continue

        for item in items[:10]:  # Limit to 10
            if not isinstance(item, dict):
                continue

            # A null title would break slicing, so normalise before cutting.
            title = item.get("title")
            title = "-" if title is None else str(title)

            subreddit = "-"
            if category == "Sentiment":
                subreddit = item.get("subreddit") or "-"
                # Some items carry the literal text "None" in this field.
                if subreddit == "None":
                    subreddit = "-"

            rows.append({
                "title": title[:80],
                "date": extract_date(item),
                "source": item.get("source") or "-",
                "subreddit": subreddit,
                # "-" (not None) so the report does not render "[Link](None)".
                "url": item.get("url") or "-",
                "category": category,
            })
    return rows
def _md_cell(value: Any) -> str:
    """Return *value* as text that is safe inside a single markdown table cell."""
    text = str(value)
    # A raw newline ends the table row; a raw pipe starts a new column.
    for newline in ("\r\n", "\n", "\r"):
        text = text.replace(newline, " ")
    return text.replace("|", "\\|")


def generate_report(results: List[MCPTestResult], ticker: str, company_name: str) -> str:
    """Generate markdown report.

    The report has a summary table (one row per result), an optional company
    info table taken from the fundamentals payload's 'company' entry, a
    quantitative data table and a qualitative data table. Cell text coming
    from MCP responses or error messages is escaped so that pipes and
    newlines cannot break the table layout; text without those characters is
    emitted unchanged.

    Args:
        results: Test results, rendered in the given order.
        ticker: Ticker symbol shown in the report title.
        company_name: Company name shown in the report title.

    Returns:
        The complete report as one newline-joined string.
    """
    # Expected item counts per MCP (quantitative only - dict metrics with values)
    expected_counts = {
        "fundamentals": 9,  # SEC EDGAR (5 universal) + Yahoo Finance (4 supplementary)
        "valuation": 11,  # Yahoo Finance only (11 universal, excludes ev_ebitda)
        "volatility": 5,  # VIX, VXN, beta, historical_vol, implied_vol
        "macro": 4,  # GDP, interest_rate, CPI, unemployment
    }
    lines = [
        f"# MCP E2E Test Report: {company_name} ({ticker})",
        "",
        "## Summary",
        "",
        "| S/N | MCP | Status | Expected | Actual | Duration | Errors | Warnings |",
        "|-----|-----|--------|----------|--------|----------|--------|----------|",
    ]
    for i, r in enumerate(results, 1):
        expected = expected_counts.get(r.name, "-")
        errors = _md_cell("; ".join(r.errors)) if r.errors else "-"
        warnings = _md_cell("; ".join(r.warnings)) if r.warnings else "-"
        lines.append(f"| {i} | {r.name} | {r.status} | {expected} | {r.item_count} | {r.duration_ms}ms | {errors} | {warnings} |")

    # Company Info (from fundamentals). The raw payload is stored before it is
    # validated, so check its type before reading from it.
    fund_result = next((r for r in results if r.name == "fundamentals"), None)
    fund_data = fund_result.data if fund_result else None
    company = fund_data.get("company") if isinstance(fund_data, dict) else None
    if isinstance(company, dict) and company:
        lines.extend([
            "",
            "---",
            "",
            "## Company Info",
            "",
            "| Field | Value |",
            "|-------|-------|",
            f"| Name | {_md_cell(company.get('name', '-'))} |",
            f"| CIK | {_md_cell(company.get('cik', '-'))} |",
            f"| SIC | {_md_cell(company.get('sic', '-'))} ({_md_cell(company.get('sic_description', '-'))}) |",
            f"| State | {_md_cell(company.get('state_of_incorporation', '-'))} |",
            f"| Fiscal Year End | {_md_cell(company.get('fiscal_year_end', '-'))} |",
        ])
        # Business address
        addr = company.get("business_address")
        if isinstance(addr, dict) and addr:
            # Join only the street parts that are present, so a null street1
            # neither raises nor leaves a dangling comma.
            street = ", ".join(
                str(part) for part in (addr.get("street1"), addr.get("street2")) if part
            )
            city_state_zip = (
                f"{addr.get('city') or ''}, "
                f"{addr.get('stateOrCountry') or ''} {addr.get('zipCode') or ''}"
            )
            lines.append(f"| Address | {_md_cell(street)} |")
            lines.append(f"| | {_md_cell(city_state_zip)} |")

    # Quantitative Data
    lines.extend([
        "",
        "---",
        "",
        "## Quantitative Data",
        "",
        "| S/N | Metric | Value | Data Type | As Of | Source | Category |",
        "|-----|--------|-------|-----------|-------|--------|----------|",
    ])
    quant_rows = extract_quantitative_rows(results, ticker)
    for i, row in enumerate(quant_rows, 1):
        cells = " | ".join(
            _md_cell(row[key])
            for key in ("metric", "value", "data_type", "as_of", "source", "category")
        )
        lines.append(f"| {i} | {cells} |")
    if not quant_rows:
        lines.append("| - | - | - | - | - | - | - |")

    # Qualitative Data
    lines.extend([
        "",
        "---",
        "",
        "## Qualitative Data",
        "",
        "| S/N | Title | Date | Source | Subreddit | URL | Category |",
        "|-----|-------|------|--------|-----------|-----|----------|",
    ])
    qual_rows = extract_qualitative_rows(results)
    for i, row in enumerate(qual_rows, 1):
        url_link = f"[Link]({row['url']})" if row['url'] != "-" else "-"
        cells = " | ".join(
            _md_cell(cell)
            for cell in (
                row["title"], row["date"], row["source"],
                row["subreddit"], url_link, row["category"],
            )
        )
        lines.append(f"| {i} | {cells} |")
    if not qual_rows:
        lines.append("| - | - | - | - | - | - | - |")

    lines.append("")
    return "\n".join(lines)
def main() -> int:
    """Main entry point.

    Reads the optional TICKER and COMPANY_NAME command-line arguments
    (falling back to the module defaults), runs all MCP checks, writes the
    markdown report to ``docs/mcp_test_report_<TICKER>.md`` under the project
    root and prints a pass/fail summary.

    Returns:
        0 when every MCP check passed, otherwise 1 (used as the exit code).
    """
    # Parse args
    ticker = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_TICKER
    company_name = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_COMPANY

    # Run tests
    results = asyncio.run(run_all_tests(ticker, company_name))

    # Generate report
    report = generate_report(results, ticker, company_name)

    # Write report. Create docs/ on demand so a fresh checkout does not fail,
    # and pin UTF-8 because news titles may contain non-ASCII characters that
    # the platform default encoding cannot represent.
    output_path = PROJECT_ROOT / "docs" / f"mcp_test_report_{ticker}.md"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print("-" * 50)
    print(f"Report generated: {output_path}")

    # Summary
    passed = sum(1 for r in results if r.status == "PASS")
    total = len(results)
    print(f"\nResult: {passed}/{total} MCPs passed")
    return 0 if passed == total else 1


# Script entry: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())