""" Stage 3: Response Compiler - Data Fusion """ from typing import Dict, Any, List class ResponseCompiler: """Stage 3: Compile results from multiple servers""" def compile(self, raw_results: Dict[str, Any]) -> Dict[str, Any]: """ Merge results into structured format Args: raw_results: Dictionary containing results from MCPExecutor { "results": { "weather": {"status": "success", "data": {...}}, "soil_properties": {"status": "success", "data": {...}}, ... }, "execution_time_seconds": 3.5 } Returns: { "successful_servers": List[str], "failed_servers": List[dict], "data": Dict[str, Any], "execution_time": float, "completeness": str } """ results_dict = raw_results.get("results", {}) successful = [] failed = [] compiled_data = {} for server_name, result in results_dict.items(): if result.get("status") == "success": successful.append(server_name) compiled_data[server_name] = result.get("data", {}) else: failed.append({ "server": server_name, "error": result.get("error", "Unknown error") }) return { "successful_servers": successful, "failed_servers": failed, "data": compiled_data, "execution_time": raw_results.get("execution_time_seconds", 0), "completeness": f"{len(successful)}/{len(results_dict)} servers" }