akashub
feat(v1): Added pipeline.py, compiler.py
b522293
raw
history blame
1.82 kB
"""
Stage 3: Response Compiler - Data Fusion
"""
from typing import Dict, Any, List
class ResponseCompiler:
"""Stage 3: Compile results from multiple servers"""
def compile(self, raw_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Merge results into structured format
Args:
raw_results: Dictionary containing results from MCPExecutor
{
"results": {
"weather": {"status": "success", "data": {...}},
"soil_properties": {"status": "success", "data": {...}},
...
},
"execution_time_seconds": 3.5
}
Returns:
{
"successful_servers": List[str],
"failed_servers": List[dict],
"data": Dict[str, Any],
"execution_time": float,
"completeness": str
}
"""
results_dict = raw_results.get("results", {})
successful = []
failed = []
compiled_data = {}
for server_name, result in results_dict.items():
if result.get("status") == "success":
successful.append(server_name)
compiled_data[server_name] = result.get("data", {})
else:
failed.append({
"server": server_name,
"error": result.get("error", "Unknown error")
})
return {
"successful_servers": successful,
"failed_servers": failed,
"data": compiled_data,
"execution_time": raw_results.get("execution_time_seconds", 0),
"completeness": f"{len(successful)}/{len(results_dict)} servers"
}