# masterllm / services / pipeline_manager.py
# Author: ganesh-vilje — commit d6a6b32
# Refactor: Use MongoDB _id for messages, pipeline_id_index for components, standardize outputs to result:{text:{}} format
# services/pipeline_manager.py
"""
Pipeline Manager for MasterLLM V3 Architecture
Manages pipeline execution records:
- Create pipeline records in MongoDB
- Track component execution status
- Store component outputs to S3
- Generate presigned URLs for outputs
- Manage final output compilation
"""
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from pymongo import MongoClient

from services.s3_manager import get_s3_manager
from services.schemas import (
    PipelineSchema, ComponentMetadata, PipelineStatus, ComponentStatus,
    FinalOutputS3, ComponentOutputS3, schema_to_dict,
    generate_component_id, generate_output_id
)
class PipelineManager:
    """Manages pipeline execution records and outputs.

    Responsibilities:
    - Create pipeline metadata/records in MongoDB (``pipelines`` collection)
    - Track per-component execution status
    - Store pipeline definitions and component outputs in S3
    - Generate presigned URLs for final outputs
    - Compile the final output document

    Lightweight metadata lives in MongoDB; full documents live in S3.
    """

    # S3 caps presigned-URL lifetime at 7 days (604800 seconds).
    _PRESIGNED_TTL_SECONDS = 604800

    def __init__(self):
        """Initialize the MongoDB connection and the S3 manager.

        Raises:
            RuntimeError: If the MONGODB_URI environment variable is not set.
        """
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")
        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.pipelines_collection = self.db["pipelines"]
        # S3 manager (project-level singleton)
        self.s3 = get_s3_manager()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now() -> datetime:
        """Current UTC time as a naive datetime.

        Drop-in replacement for the deprecated ``datetime.utcnow()`` that
        produces the same naive representation (and identical isoformat
        output) used throughout the stored records.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _now_iso(cls) -> str:
        """Current UTC time as an ISO-8601 string with a trailing 'Z'."""
        return cls._utc_now().isoformat() + "Z"

    @staticmethod
    def _wrap_output(output: Any) -> Dict[str, Any]:
        """Wrap a raw component output in the standardized result:{text:{}} shape.

        Outputs that are already wrapped are returned unchanged. Unlike a
        bare ``"text" in output.get("result", {})`` check, this guards
        against a non-dict ``result`` value (which would raise TypeError).
        """
        if (
            isinstance(output, dict)
            and isinstance(output.get("result"), dict)
            and "text" in output["result"]
        ):
            return output
        return {"result": {"text": output}}

    @staticmethod
    def _extract_last_output(components_results: List[Dict[str, Any]]) -> Optional[str]:
        """Extract a short text preview from the last completed component.

        Understands both the standardized result:{text:{}} format and legacy
        flat dicts; dict-derived previews are truncated to 500 characters.
        Returns None when no completed component produced a usable result.
        """
        for result in reversed(components_results):
            if result.get("status") != "completed" or not result.get("result"):
                continue
            res = result["result"]
            if isinstance(res, str):
                return res
            if isinstance(res, dict):
                inner = res.get("result")
                if isinstance(inner, dict) and "text" in inner:
                    text_content = inner["text"]
                    if isinstance(text_content, dict):
                        return (text_content.get("text")
                                or text_content.get("summary")
                                or json.dumps(text_content)[:500])
                    return str(text_content)[:500]
                # Fallback for non-wrapped format
                return res.get("text") or res.get("summary") or json.dumps(res)[:500]
            # Unusable result type on the last completed component: stop,
            # mirroring the break-on-first-candidate behavior.
            return None
        return None

    # ------------------------------------------------------------------
    # Metadata-only records (full document in S3)
    # ------------------------------------------------------------------

    def create_pipeline_metadata(
        self,
        pipeline_id: str,
        session_id: str,
        pipeline_name: str,
        s3_key: str,
        status: str = "proposed",
        created_by_message: str = ""
    ) -> bool:
        """Create metadata record for pipeline (MongoDB only). Full document in S3.

        Args:
            pipeline_id: Stable pipeline identifier.
            session_id: Owning session ID.
            pipeline_name: Human-readable pipeline name.
            s3_key: S3 key of the full pipeline document.
            status: Initial status (defaults to "proposed").
            created_by_message: User message that triggered creation.

        Returns:
            True (pymongo exceptions propagate on insert failure).
        """
        now_iso = self._now_iso()
        metadata = {
            "pipeline_id": pipeline_id,
            "session_id": session_id,
            "pipeline_name": pipeline_name,
            "status": status,
            "s3_key": s3_key,
            "final_output_url": None,
            "created_at": now_iso,
            "updated_at": now_iso,
            "created_by_message": created_by_message
        }
        self.pipelines_collection.insert_one(metadata)
        return True

    def update_pipeline_status(
        self,
        pipeline_id: str,
        status: str,
        final_output_url: Optional[str] = None,
        final_output_expires_at: Optional[str] = None
    ) -> bool:
        """Update pipeline status in metadata.

        Returns:
            True if a matching record exists (idempotent updates that leave
            the document unchanged still count as success).
        """
        update_data: Dict[str, Any] = {
            "status": status,
            "updated_at": self._now_iso()
        }
        if final_output_url:
            update_data["final_output_url"] = final_output_url
        if final_output_expires_at:
            update_data["final_output_expires_at"] = final_output_expires_at
        result = self.pipelines_collection.update_one(
            {"pipeline_id": pipeline_id},
            {"$set": update_data}
        )
        # matched_count (not modified_count): re-setting an identical status
        # must not be reported as a failure.
        return result.matched_count > 0

    def get_pipeline_metadata(self, pipeline_id: str) -> Optional[Dict[str, Any]]:
        """Get pipeline metadata by ID (MongoDB ``_id`` excluded)."""
        return self.pipelines_collection.find_one(
            {"pipeline_id": pipeline_id},
            {"_id": 0}
        )

    def get_full_pipeline_document(self, pipeline_id: str) -> Optional[Dict[str, Any]]:
        """Get full pipeline document from S3 via metadata lookup.

        Returns None when the metadata record, its ``s3_key``, or the S3
        download is unavailable (download errors are logged, not raised).
        """
        metadata = self.get_pipeline_metadata(pipeline_id)
        if not metadata:
            return None
        s3_key = metadata.get("s3_key")
        if not s3_key:
            return None
        try:
            return self.s3.download_json(s3_key, add_prefix=False)
        except Exception as e:
            # Best-effort read: callers treat None as "not available".
            print(f"⚠️ Failed to download pipeline document: {e}")
            return None

    # ------------------------------------------------------------------
    # Execution records
    # ------------------------------------------------------------------

    def create_pipeline_record(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        created_from: str = "request",
        created_by_message: str = ""
    ) -> str:
        """
        Create a new pipeline execution record.

        Args:
            session_id: Session ID
            pipeline_definition: Full pipeline definition dict
            created_from: "request" or "edit"
            created_by_message: User's message that triggered pipeline

        Returns:
            execution_id: Unique execution ID
        """
        execution_id = str(uuid.uuid4())
        now = self._utc_now()
        # Accept either "components" or the legacy "pipeline_steps" key.
        pipeline_steps = pipeline_definition.get("components", []) or pipeline_definition.get("pipeline_steps", [])
        # Component IDs follow the format: execution_id_index
        components = [
            ComponentMetadata(
                component_id=generate_component_id(execution_id, i),
                component_name=step.get("tool_name", "unknown"),
                order=i + 1,
                status=ComponentStatus.PENDING
            )
            for i, step in enumerate(pipeline_steps)
        ]
        # Store the full pipeline definition in S3.
        s3_key = f"sessions/{session_id}/pipelines/{execution_id}_definition.json"
        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
        pipeline_record = PipelineSchema(
            execution_id=execution_id,
            session_id=session_id,
            pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Pipeline"),
            created_at=now,
            created_from=created_from,
            created_by_message=created_by_message,
            status=PipelineStatus.PROPOSED,
            pipeline_definition_s3_key=s3_key,
            components=components,
            output_id=generate_output_id()
        )
        self.pipelines_collection.insert_one(schema_to_dict(pipeline_record))
        return execution_id

    def update_component_status(
        self,
        execution_id: str,
        component_name: str,
        status: str,
        output: Optional[Dict[str, Any]] = None,
        error: Optional[str] = None,
        success_message: Optional[str] = None
    ) -> bool:
        """
        Update component execution status.

        Args:
            execution_id: Pipeline execution ID
            component_name: Name of component
            status: "executing", "completed", or "failed"
            output: Component output (will be wrapped in result:{text:{}} format)
            error: Error message if failed
            success_message: Success message if completed

        Returns:
            True if updated successfully, False if the pipeline or component
            was not found.
        """
        now_iso = self._now_iso()
        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline:
            return False
        components = pipeline.get("components", [])
        wrapped_output: Optional[Dict[str, Any]] = None
        component_found = False
        for comp in components:
            if comp["component_name"] != component_name:
                continue
            comp["status"] = status
            if status == "executing":
                comp["started_at"] = now_iso
            elif status in ("completed", "failed"):
                comp["completed_at"] = now_iso
                if success_message:
                    comp["success_message"] = success_message
                if error:
                    comp["error"] = {"message": error} if isinstance(error, str) else error
                    comp["hasError"] = True
                else:
                    comp["hasError"] = False
            if output:
                # Standardize to result:{text:{}}; keep a handle for the
                # temporary S3 upload below instead of re-reading the loop
                # variable after the loop.
                wrapped_output = self._wrap_output(output)
                comp["component_output"] = wrapped_output
            component_found = True
            break
        if not component_found:
            return False
        # Pipeline-level status stays EXECUTING here; terminal states are set
        # by mark_pipeline_completed / mark_pipeline_failed.
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "components": components,
                    "status": PipelineStatus.EXECUTING
                }
            }
        )
        # Store output temporarily (will be compiled into final_output later).
        if wrapped_output is not None:
            temp_key = f"sessions/{pipeline['session_id']}/outputs/{execution_id}/temp_{component_name}.json"
            self.s3.upload_json(temp_key, wrapped_output, add_prefix=False)
        return True

    def mark_pipeline_completed(
        self,
        execution_id: str,
        components_results: List[Dict[str, Any]],
        executor: str = "unknown"
    ) -> Dict[str, Any]:
        """
        Mark pipeline as completed and generate final output.

        Args:
            execution_id: Pipeline execution ID
            components_results: List of {component_name, result, status, message} dicts
            executor: Executor used ("bedrock", "crewai", etc.)

        Returns:
            Dict with final_output_url (presigned) and metadata

        Raises:
            RuntimeError: If no pipeline record exists for execution_id.
        """
        now = self._utc_now()
        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline:
            raise RuntimeError(f"Pipeline {execution_id} not found")
        session_id = pipeline["session_id"]
        # Workflow fails if ANY component failed.
        workflow_status = (
            "failed"
            if any(r.get("status") == "failed" for r in components_results)
            else "completed"
        )
        # Preview text from the last successful component.
        last_node_output = self._extract_last_output(components_results)
        # Build per-component outputs, all wrapped in result:{text:{}}.
        component_outputs = []
        for i, result in enumerate(components_results):
            wrapped_result = self._wrap_output(result.get("result", {}))
            comp_output = ComponentOutputS3(
                component_id=result.get("component_id", f"{execution_id}_{i}"),
                name=result.get("component_name", f"component_{i+1}"),
                order=i + 1,
                status=result.get("status", "unknown"),
                component_output=wrapped_result,
                hasError=result.get("status") == "failed",
                error={"message": result.get("error")} if result.get("error") else None,
                success_message=result.get("success_message"),
                metadata=result.get("metadata"),
                result=wrapped_result
            )
            component_outputs.append(comp_output.model_dump())
        final_output = FinalOutputS3(
            workflow_name=pipeline.get("pipeline_name", "Untitled"),
            workflow_status=workflow_status,
            last_node_output=last_node_output,
            components=component_outputs,
            execution_time=0.0,  # TODO: Calculate from started_at to now
            timestamp=now.isoformat() + "Z"
        )
        # Store final output in S3 and presign it (7-day maximum).
        final_output_key = f"sessions/{session_id}/outputs/{execution_id}/final_output.json"
        self.s3.upload_json(final_output_key, final_output.model_dump(), add_prefix=False)
        presigned = self.s3.generate_presigned_url(
            final_output_key, expires_in=self._PRESIGNED_TTL_SECONDS, add_prefix=False
        )
        # Short preview stored directly in MongoDB for quick listing.
        result_preview = last_node_output[:500] if last_node_output else "No output"
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "status": PipelineStatus.COMPLETED if workflow_status == "completed" else PipelineStatus.FAILED,
                    "final_output_s3_key": final_output_key,
                    "final_output_presigned_url": presigned["presigned_url"],
                    "final_output_presigned_expires_at": presigned["presigned_expires_at"],
                    "completed_at": now.isoformat() + "Z",
                    "executor": executor,
                    "result": result_preview,
                    "hasError": workflow_status == "failed",
                    "error": {"message": "Pipeline execution failed"} if workflow_status == "failed" else None
                }
            }
        )
        return {
            "final_output_url": presigned["presigned_url"],
            "final_output_expires_at": presigned["presigned_expires_at"],
            "final_output_s3_key": final_output_key,
            "workflow_status": workflow_status,
            "last_node_output": last_node_output,
            "result": {"text": {"summary": last_node_output}} if last_node_output else None
        }

    def mark_pipeline_failed(
        self,
        execution_id: str,
        error: str
    ) -> bool:
        """
        Mark pipeline as failed.

        Args:
            execution_id: Pipeline execution ID
            error: Error message

        Returns:
            True if a pipeline record was found and updated, False otherwise
            (previously this always returned True, hiding missing records).
        """
        result = self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "status": PipelineStatus.FAILED,
                    "error": {"message": error} if isinstance(error, str) else error,
                    "hasError": True,
                    "completed_at": self._now_iso()
                }
            }
        )
        return result.matched_count > 0

    def get_pipeline(self, execution_id: str) -> Optional[Dict[str, Any]]:
        """Get pipeline record by execution ID (MongoDB ``_id`` excluded)."""
        return self.pipelines_collection.find_one(
            {"execution_id": execution_id},
            {"_id": 0}
        )

    def get_session_pipelines(
        self,
        session_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get pipelines for a session.

        Args:
            session_id: Session ID
            limit: Maximum number of pipelines to return

        Returns:
            List of pipeline records (latest first)
        """
        cursor = self.pipelines_collection.find(
            {"session_id": session_id},
            {"_id": 0}
        ).sort("created_at", -1).limit(limit)
        return list(cursor)

    def regenerate_presigned_url(self, execution_id: str) -> Optional[Dict[str, str]]:
        """
        Regenerate presigned URL for pipeline output.

        Args:
            execution_id: Pipeline execution ID

        Returns:
            Dict with new presigned_url and expires_at, or None if the
            pipeline is missing or has no stored final output.
        """
        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline or not pipeline.get("final_output_s3_key"):
            return None
        presigned = self.s3.generate_presigned_url(
            pipeline["final_output_s3_key"],
            expires_in=self._PRESIGNED_TTL_SECONDS,
            add_prefix=False
        )
        # Persist the refreshed URL so subsequent reads pick it up.
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "final_output_presigned_url": presigned["presigned_url"],
                    "final_output_presigned_expires_at": presigned["presigned_expires_at"]
                }
            }
        )
        return presigned
# Module-level singleton; lazily constructed on first access.
_pipeline_manager_instance: Optional[PipelineManager] = None


def get_pipeline_manager() -> PipelineManager:
    """Return the shared PipelineManager, constructing it on first use."""
    global _pipeline_manager_instance
    if _pipeline_manager_instance is not None:
        return _pipeline_manager_instance
    _pipeline_manager_instance = PipelineManager()
    return _pipeline_manager_instance