Spaces:
Sleeping
Sleeping
| """ | |
| E2E test for all 6 MCP servers using subprocess+MCP protocol (same as production). | |
| Usage: python tests/test_mcp_e2e.py [TICKER] [COMPANY_NAME] | |
| Default: KO "The Coca-Cola Company" | |
| """ | |
| import asyncio | |
| import sys | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| # Project root | |
| PROJECT_ROOT = Path(__file__).parent.parent | |
| sys.path.insert(0, str(PROJECT_ROOT)) | |
| # Load environment variables from project .env | |
| from dotenv import load_dotenv | |
| load_dotenv(PROJECT_ROOT / ".env") | |
| # Import MCP client (subprocess+MCP protocol) | |
| from mcp_client import call_mcp_server | |
| # Default test company | |
| DEFAULT_TICKER = "KO" | |
| DEFAULT_COMPANY = "The Coca-Cola Company" | |
| # MCP server timeout (seconds) | |
| MCP_TIMEOUT = 90.0 | |
| class MCPTestResult: | |
| """Result from testing a single MCP.""" | |
| def __init__(self, name: str): | |
| self.name = name | |
| self.status = "FAIL" | |
| self.data: Optional[Dict] = None | |
| self.errors: List[str] = [] | |
| self.warnings: List[str] = [] | |
| self.item_count = 0 | |
| self.duration_ms = 0 | |
| async def test_fundamentals(ticker: str) -> MCPTestResult: | |
| """Test fundamentals-basket MCP via subprocess+MCP protocol.""" | |
| result = MCPTestResult("fundamentals") | |
| start = datetime.now() | |
| try: | |
| data = await call_mcp_server( | |
| "fundamentals-basket", | |
| "get_all_sources_fundamentals", | |
| {"ticker": ticker}, | |
| timeout=MCP_TIMEOUT | |
| ) | |
| result.data = data | |
| if not isinstance(data, dict): | |
| result.errors.append("Response is not a dict") | |
| return result | |
| if "error" in data: | |
| result.errors.append(f"MCP error: {data['error']}") | |
| return result | |
| # Schema validation - fundamentals uses sources.sec_edgar/sources.yahoo_finance | |
| sources = data.get("sources", {}) | |
| sec_data = sources.get("sec_edgar", {}) | |
| yahoo_data = sources.get("yahoo_finance", {}) | |
| if not sec_data and not yahoo_data: | |
| result.errors.append("No SEC or Yahoo data") | |
| else: | |
| # Count only dict metrics with values (matches extraction logic) | |
| sec_metrics = sec_data.get("data", {}) if isinstance(sec_data, dict) else {} | |
| yahoo_metrics = yahoo_data.get("data", {}) if isinstance(yahoo_data, dict) else {} | |
| sec_count = sum(1 for v in sec_metrics.values() if isinstance(v, dict)) | |
| yahoo_count = sum(1 for v in yahoo_metrics.values() if isinstance(v, dict)) | |
| result.item_count = sec_count + yahoo_count | |
| if result.item_count == 0: | |
| result.warnings.append("No data items returned") | |
| result.status = "PASS" if not result.errors else "FAIL" | |
| except Exception as e: | |
| result.errors.append(str(e)) | |
| result.duration_ms = int((datetime.now() - start).total_seconds() * 1000) | |
| return result | |
| async def test_valuation(ticker: str) -> MCPTestResult: | |
| """Test valuation-basket MCP via subprocess+MCP protocol.""" | |
| result = MCPTestResult("valuation") | |
| start = datetime.now() | |
| try: | |
| data = await call_mcp_server( | |
| "valuation-basket", | |
| "get_all_sources_valuation", | |
| {"ticker": ticker}, | |
| timeout=MCP_TIMEOUT | |
| ) | |
| result.data = data | |
| if not isinstance(data, dict): | |
| result.errors.append("Response is not a dict") | |
| return result | |
| if "error" in data: | |
| result.errors.append(f"MCP error: {data['error']}") | |
| return result | |
| # Schema validation | |
| if "sources" not in data: | |
| result.errors.append("Missing 'sources' key") | |
| else: | |
| sources = data.get("sources", {}) | |
| result.item_count = sum( | |
| len(v.get("data", {})) if isinstance(v, dict) else 0 | |
| for v in sources.values() | |
| ) | |
| if result.item_count == 0: | |
| result.warnings.append("No data items returned") | |
| result.status = "PASS" if not result.errors else "FAIL" | |
| except Exception as e: | |
| result.errors.append(str(e)) | |
| result.duration_ms = int((datetime.now() - start).total_seconds() * 1000) | |
| return result | |
| async def test_volatility(ticker: str) -> MCPTestResult: | |
| """Test volatility-basket MCP via subprocess+MCP protocol.""" | |
| result = MCPTestResult("volatility") | |
| start = datetime.now() | |
| try: | |
| data = await call_mcp_server( | |
| "volatility-basket", | |
| "get_all_sources_volatility", | |
| {"ticker": ticker}, | |
| timeout=MCP_TIMEOUT | |
| ) | |
| result.data = data | |
| if not isinstance(data, dict): | |
| result.errors.append("Response is not a dict") | |
| return result | |
| if "error" in data: | |
| result.errors.append(f"MCP error: {data['error']}") | |
| return result | |
| # Schema validation | |
| if "metrics" not in data: | |
| result.errors.append("Missing 'metrics' key") | |
| else: | |
| result.item_count = len(data.get("metrics", {})) | |
| if result.item_count == 0: | |
| result.warnings.append("No metrics returned") | |
| result.status = "PASS" if not result.errors else "FAIL" | |
| except Exception as e: | |
| result.errors.append(str(e)) | |
| result.duration_ms = int((datetime.now() - start).total_seconds() * 1000) | |
| return result | |
| async def test_macro() -> MCPTestResult: | |
| """Test macro-basket MCP via subprocess+MCP protocol.""" | |
| result = MCPTestResult("macro") | |
| start = datetime.now() | |
| try: | |
| data = await call_mcp_server( | |
| "macro-basket", | |
| "get_all_sources_macro", | |
| {}, | |
| timeout=MCP_TIMEOUT | |
| ) | |
| result.data = data | |
| if not isinstance(data, dict): | |
| result.errors.append("Response is not a dict") | |
| return result | |
| if "error" in data: | |
| result.errors.append(f"MCP error: {data['error']}") | |
| return result | |
| # Schema validation | |
| if "metrics" not in data: | |
| result.errors.append("Missing 'metrics' key") | |
| else: | |
| result.item_count = len(data.get("metrics", {})) | |
| if result.item_count == 0: | |
| result.warnings.append("No metrics returned") | |
| result.status = "PASS" if not result.errors else "FAIL" | |
| except Exception as e: | |
| result.errors.append(str(e)) | |
| result.duration_ms = int((datetime.now() - start).total_seconds() * 1000) | |
| return result | |
| async def test_news(ticker: str, company_name: str) -> MCPTestResult: | |
| """Test news-basket MCP via subprocess+MCP protocol.""" | |
| result = MCPTestResult("news") | |
| start = datetime.now() | |
| try: | |
| data = await call_mcp_server( | |
| "news-basket", | |
| "get_all_sources_news", | |
| {"ticker": ticker, "company_name": company_name}, | |
| timeout=MCP_TIMEOUT | |
| ) | |
| result.data = data | |
| if not isinstance(data, dict): | |
| result.errors.append("Response is not a dict") | |
| return result | |
| if "error" in data: | |
| result.errors.append(f"MCP error: {data['error']}") | |
| return result | |
| # Schema validation | |
| if "items" not in data: | |
| result.errors.append("Missing 'items' key") | |
| else: | |
| items = data.get("items", []) | |
| result.item_count = len(items) | |
| if result.item_count == 0: | |
| result.warnings.append("No news items returned") | |
| else: | |
| # Validate item schema | |
| for item in items[:3]: | |
| if "title" not in item: | |
| result.warnings.append("Item missing 'title'") | |
| break | |
| if "url" not in item: | |
| result.warnings.append("Item missing 'url'") | |
| break | |
| result.status = "PASS" if not result.errors else "FAIL" | |
| except Exception as e: | |
| result.errors.append(str(e)) | |
| result.duration_ms = int((datetime.now() - start).total_seconds() * 1000) | |
| return result | |
| async def test_sentiment(ticker: str, company_name: str) -> MCPTestResult: | |
| """Test sentiment-basket MCP via subprocess+MCP protocol.""" | |
| result = MCPTestResult("sentiment") | |
| start = datetime.now() | |
| try: | |
| data = await call_mcp_server( | |
| "sentiment-basket", | |
| "get_sentiment_basket", | |
| {"ticker": ticker, "company_name": company_name}, | |
| timeout=MCP_TIMEOUT | |
| ) | |
| result.data = data | |
| if not isinstance(data, dict): | |
| result.errors.append("Response is not a dict") | |
| return result | |
| if "error" in data: | |
| result.errors.append(f"MCP error: {data['error']}") | |
| return result | |
| # Schema validation | |
| if "items" not in data: | |
| result.errors.append("Missing 'items' key") | |
| else: | |
| items = data.get("items", []) | |
| result.item_count = len(items) | |
| if result.item_count == 0: | |
| result.warnings.append("No sentiment items returned") | |
| else: | |
| # Validate item schema | |
| for item in items[:3]: | |
| if "title" not in item: | |
| result.warnings.append("Item missing 'title'") | |
| break | |
| if "url" not in item: | |
| result.warnings.append("Item missing 'url'") | |
| break | |
| result.status = "PASS" if not result.errors else "FAIL" | |
| except Exception as e: | |
| result.errors.append(str(e)) | |
| result.duration_ms = int((datetime.now() - start).total_seconds() * 1000) | |
| return result | |
| async def run_all_tests(ticker: str, company_name: str) -> List[MCPTestResult]: | |
| """Run all MCP tests.""" | |
| print(f"\nRunning E2E tests for {company_name} ({ticker})...") | |
| print("-" * 50) | |
| # Run tests - some in parallel, some sequential to avoid import conflicts | |
| results = [] | |
| # Quantitative tests | |
| print("Testing fundamentals-basket...", end=" ", flush=True) | |
| r = await test_fundamentals(ticker) | |
| print(f"{r.status} ({r.duration_ms}ms)") | |
| results.append(r) | |
| print("Testing valuation-basket...", end=" ", flush=True) | |
| r = await test_valuation(ticker) | |
| print(f"{r.status} ({r.duration_ms}ms)") | |
| results.append(r) | |
| print("Testing volatility-basket...", end=" ", flush=True) | |
| r = await test_volatility(ticker) | |
| print(f"{r.status} ({r.duration_ms}ms)") | |
| results.append(r) | |
| print("Testing macro-basket...", end=" ", flush=True) | |
| r = await test_macro() | |
| print(f"{r.status} ({r.duration_ms}ms)") | |
| results.append(r) | |
| # Qualitative tests | |
| print("Testing news-basket...", end=" ", flush=True) | |
| r = await test_news(ticker, company_name) | |
| print(f"{r.status} ({r.duration_ms}ms)") | |
| results.append(r) | |
| print("Testing sentiment-basket...", end=" ", flush=True) | |
| r = await test_sentiment(ticker, company_name) | |
| print(f"{r.status} ({r.duration_ms}ms)") | |
| results.append(r) | |
| return results | |
| def format_value(val: Any) -> str: | |
| """Format a value for display - raw output for now.""" | |
| if val is None: | |
| return "-" | |
| # Just return string representation of value | |
| return str(val) | |
| def extract_quantitative_rows(results: List[MCPTestResult], ticker: str) -> List[Dict]: | |
| """Extract quantitative data rows from results.""" | |
| rows = [] | |
| # Fundamentals - uses sources.sec_edgar/sources.yahoo_finance with nested 'data' key | |
| fund_result = next((r for r in results if r.name == "fundamentals"), None) | |
| if fund_result and fund_result.data: | |
| sources = fund_result.data.get("sources", {}) | |
| # SEC EDGAR data - metrics are inside sources.sec_edgar.data | |
| sec_wrapper = sources.get("sec_edgar", {}) | |
| sec_data = sec_wrapper.get("data", {}) if isinstance(sec_wrapper, dict) else {} | |
| for metric_name, metric_val in sec_data.items(): | |
| if isinstance(metric_val, dict): | |
| rows.append({ | |
| "metric": metric_name, | |
| "value": format_value(metric_val.get("value")), | |
| "data_type": metric_val.get("data_type", "FY"), | |
| "as_of": metric_val.get("end_date", "-"), | |
| "filed": metric_val.get("filed", "-"), | |
| "source": "SEC EDGAR", | |
| "category": "Fundamentals", | |
| }) | |
| # Yahoo Finance data - metrics are inside sources.yahoo_finance.data | |
| yahoo_wrapper = sources.get("yahoo_finance", {}) | |
| yahoo_as_of = yahoo_wrapper.get("as_of", "-") if isinstance(yahoo_wrapper, dict) else "-" | |
| yahoo_data = yahoo_wrapper.get("data", {}) if isinstance(yahoo_wrapper, dict) else {} | |
| for metric_name, metric_val in yahoo_data.items(): | |
| if isinstance(metric_val, dict): | |
| rows.append({ | |
| "metric": metric_name, | |
| "value": format_value(metric_val.get("value")), | |
| "data_type": metric_val.get("period", metric_val.get("data_type", "TTM")), | |
| "as_of": metric_val.get("end_date", metric_val.get("as_of", yahoo_as_of)), | |
| "filed": metric_val.get("filed", "-"), | |
| "source": "Yahoo Finance", | |
| "category": "Fundamentals", | |
| }) | |
| # Valuation | |
| val_result = next((r for r in results if r.name == "valuation"), None) | |
| if val_result and val_result.data: | |
| sources = val_result.data.get("sources", {}) | |
| as_of = val_result.data.get("as_of", "-") | |
| for source_name, source_data in sources.items(): | |
| if isinstance(source_data, dict) and "data" in source_data: | |
| for metric_name, metric_val in source_data["data"].items(): | |
| # Handle both dict and scalar values | |
| if isinstance(metric_val, dict): | |
| value = format_value(metric_val.get("value")) | |
| data_type = metric_val.get("data_type", "-") | |
| metric_as_of = metric_val.get("as_of", as_of) | |
| else: | |
| value = format_value(metric_val) | |
| data_type = "-" | |
| metric_as_of = as_of | |
| rows.append({ | |
| "metric": metric_name, | |
| "value": value, | |
| "data_type": data_type, | |
| "as_of": metric_as_of, | |
| "source": source_name, | |
| "category": "Valuation", | |
| }) | |
| # Volatility | |
| vol_result = next((r for r in results if r.name == "volatility"), None) | |
| if vol_result and vol_result.data: | |
| metrics = vol_result.data.get("metrics", {}) | |
| for metric_name, metric_val in metrics.items(): | |
| if isinstance(metric_val, dict): | |
| rows.append({ | |
| "metric": metric_name, | |
| "value": format_value(metric_val.get("value")), | |
| "data_type": metric_val.get("data_type", "-"), | |
| "as_of": metric_val.get("as_of", "-"), | |
| "filed": "-", | |
| "source": metric_val.get("source", "-"), | |
| "category": "Volatility", | |
| }) | |
| # Macro | |
| macro_result = next((r for r in results if r.name == "macro"), None) | |
| if macro_result and macro_result.data: | |
| metrics = macro_result.data.get("metrics", {}) | |
| for metric_name, metric_val in metrics.items(): | |
| if isinstance(metric_val, dict): | |
| rows.append({ | |
| "metric": metric_name, | |
| "value": format_value(metric_val.get("value")), | |
| "data_type": metric_val.get("data_type", "-"), | |
| "as_of": metric_val.get("as_of", "-"), | |
| "filed": "-", | |
| "source": metric_val.get("source", "-"), | |
| "category": "Macro", | |
| }) | |
| return rows | |
| def extract_date(item: Dict) -> str: | |
| """Extract date (YYYY-MM-DD) from item, checking both date and datetime fields.""" | |
| # Try 'date' first, then 'datetime' | |
| val = item.get("date") or item.get("datetime") or "-" | |
| if val == "-": | |
| return val | |
| # Extract just the date portion (first 10 chars: YYYY-MM-DD) | |
| val_str = str(val) | |
| if len(val_str) >= 10: | |
| return val_str[:10] | |
| return val_str | |
| def extract_qualitative_rows(results: List[MCPTestResult]) -> List[Dict]: | |
| """Extract qualitative data rows from results.""" | |
| rows = [] | |
| # News | |
| news_result = next((r for r in results if r.name == "news"), None) | |
| if news_result and news_result.data: | |
| items = news_result.data.get("items", []) | |
| for item in items[:10]: # Limit to 10 | |
| rows.append({ | |
| "title": item.get("title", "-")[:80], | |
| "date": extract_date(item), | |
| "source": item.get("source", "-"), | |
| "subreddit": "-", | |
| "url": item.get("url", "-"), | |
| "category": "News", | |
| }) | |
| # Sentiment | |
| sent_result = next((r for r in results if r.name == "sentiment"), None) | |
| if sent_result and sent_result.data: | |
| items = sent_result.data.get("items", []) | |
| for item in items[:10]: # Limit to 10 | |
| subreddit = item.get("subreddit") or "-" | |
| rows.append({ | |
| "title": item.get("title", "-")[:80], | |
| "date": extract_date(item), | |
| "source": item.get("source", "-"), | |
| "subreddit": subreddit if subreddit != "None" else "-", | |
| "url": item.get("url", "-"), | |
| "category": "Sentiment", | |
| }) | |
| return rows | |
| def generate_report(results: List[MCPTestResult], ticker: str, company_name: str) -> str: | |
| """Generate markdown report.""" | |
| # Expected item counts per MCP (quantitative only - dict metrics with values) | |
| expected_counts = { | |
| "fundamentals": 9, # SEC EDGAR (5 universal) + Yahoo Finance (4 supplementary) | |
| "valuation": 11, # Yahoo Finance only (11 universal, excludes ev_ebitda) | |
| "volatility": 5, # VIX, VXN, beta, historical_vol, implied_vol | |
| "macro": 4, # GDP, interest_rate, CPI, unemployment | |
| } | |
| lines = [ | |
| f"# MCP E2E Test Report: {company_name} ({ticker})", | |
| "", | |
| "## Summary", | |
| "", | |
| "| S/N | MCP | Status | Expected | Actual | Duration | Errors | Warnings |", | |
| "|-----|-----|--------|----------|--------|----------|--------|----------|", | |
| ] | |
| for i, r in enumerate(results, 1): | |
| expected = expected_counts.get(r.name, "-") | |
| errors = "; ".join(r.errors) if r.errors else "-" | |
| warnings = "; ".join(r.warnings) if r.warnings else "-" | |
| lines.append(f"| {i} | {r.name} | {r.status} | {expected} | {r.item_count} | {r.duration_ms}ms | {errors} | {warnings} |") | |
| # Company Info (from fundamentals) | |
| fund_result = next((r for r in results if r.name == "fundamentals"), None) | |
| if fund_result and fund_result.data: | |
| company = fund_result.data.get("company", {}) | |
| if company: | |
| lines.extend([ | |
| "", | |
| "---", | |
| "", | |
| "## Company Info", | |
| "", | |
| f"| Field | Value |", | |
| f"|-------|-------|", | |
| f"| Name | {company.get('name', '-')} |", | |
| f"| CIK | {company.get('cik', '-')} |", | |
| f"| SIC | {company.get('sic', '-')} ({company.get('sic_description', '-')}) |", | |
| f"| State | {company.get('state_of_incorporation', '-')} |", | |
| f"| Fiscal Year End | {company.get('fiscal_year_end', '-')} |", | |
| ]) | |
| # Business address | |
| addr = company.get("business_address", {}) | |
| if addr: | |
| street = addr.get("street1", "") | |
| if addr.get("street2"): | |
| street += f", {addr.get('street2')}" | |
| city_state_zip = f"{addr.get('city', '')}, {addr.get('stateOrCountry', '')} {addr.get('zipCode', '')}" | |
| lines.append(f"| Address | {street} |") | |
| lines.append(f"| | {city_state_zip} |") | |
| # Quantitative Data | |
| lines.extend([ | |
| "", | |
| "---", | |
| "", | |
| "## Quantitative Data", | |
| "", | |
| "| S/N | Metric | Value | Data Type | As Of | Source | Category |", | |
| "|-----|--------|-------|-----------|-------|--------|----------|", | |
| ]) | |
| quant_rows = extract_quantitative_rows(results, ticker) | |
| for i, row in enumerate(quant_rows, 1): | |
| lines.append(f"| {i} | {row['metric']} | {row['value']} | {row['data_type']} | {row['as_of']} | {row['source']} | {row['category']} |") | |
| if not quant_rows: | |
| lines.append("| - | - | - | - | - | - | - |") | |
| # Qualitative Data | |
| lines.extend([ | |
| "", | |
| "---", | |
| "", | |
| "## Qualitative Data", | |
| "", | |
| "| S/N | Title | Date | Source | Subreddit | URL | Category |", | |
| "|-----|-------|------|--------|-----------|-----|----------|", | |
| ]) | |
| qual_rows = extract_qualitative_rows(results) | |
| for i, row in enumerate(qual_rows, 1): | |
| url_link = f"[Link]({row['url']})" if row['url'] != "-" else "-" | |
| lines.append(f"| {i} | {row['title']} | {row['date']} | {row['source']} | {row['subreddit']} | {url_link} | {row['category']} |") | |
| if not qual_rows: | |
| lines.append("| - | - | - | - | - | - | - |") | |
| lines.append("") | |
| return "\n".join(lines) | |
| def main(): | |
| """Main entry point.""" | |
| # Parse args | |
| ticker = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_TICKER | |
| company_name = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_COMPANY | |
| # Run tests | |
| results = asyncio.run(run_all_tests(ticker, company_name)) | |
| # Generate report | |
| report = generate_report(results, ticker, company_name) | |
| # Write report | |
| output_path = PROJECT_ROOT / "docs" / f"mcp_test_report_{ticker}.md" | |
| output_path.write_text(report) | |
| print("-" * 50) | |
| print(f"Report generated: {output_path}") | |
| # Summary | |
| passed = sum(1 for r in results if r.status == "PASS") | |
| total = len(results) | |
| print(f"\nResult: {passed}/{total} MCPs passed") | |
| return 0 if passed == total else 1 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |