| """ | |
| Stage 3: Response Compiler - Data Fusion | |
| """ | |
| from typing import Dict, Any, List | |
| class ResponseCompiler: | |
| """Stage 3: Compile results from multiple servers""" | |
| def compile(self, raw_results: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Merge results into structured format | |
| Args: | |
| raw_results: Dictionary containing results from MCPExecutor | |
| { | |
| "results": { | |
| "weather": {"status": "success", "data": {...}}, | |
| "soil_properties": {"status": "success", "data": {...}}, | |
| ... | |
| }, | |
| "execution_time_seconds": 3.5 | |
| } | |
| Returns: | |
| { | |
| "successful_servers": List[str], | |
| "failed_servers": List[dict], | |
| "data": Dict[str, Any], | |
| "execution_time": float, | |
| "completeness": str | |
| } | |
| """ | |
| results_dict = raw_results.get("results", {}) | |
| successful = [] | |
| failed = [] | |
| compiled_data = {} | |
| for server_name, result in results_dict.items(): | |
| if result.get("status") == "success": | |
| successful.append(server_name) | |
| compiled_data[server_name] = result.get("data", {}) | |
| else: | |
| failed.append({ | |
| "server": server_name, | |
| "error": result.get("error", "Unknown error") | |
| }) | |
| return { | |
| "successful_servers": successful, | |
| "failed_servers": failed, | |
| "data": compiled_data, | |
| "execution_time": raw_results.get("execution_time_seconds", 0), | |
| "completeness": f"{len(successful)}/{len(results_dict)} servers" | |
| } |