# services/pipeline_manager.py
"""
Pipeline Manager for MasterLLM V3 Architecture

Manages pipeline execution records:
- Create pipeline records in MongoDB
- Track component execution status
- Store component outputs to S3
- Generate presigned URLs for outputs
- Manage final output compilation
"""

import json
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from pymongo import MongoClient

from services.s3_manager import get_s3_manager
from services.schemas import (
    PipelineSchema,
    ComponentMetadata,
    PipelineStatus,
    ComponentStatus,
    FinalOutputS3,
    ComponentOutputS3,
    schema_to_dict,
    generate_component_id,
    generate_output_id,
)

# Presigned URLs live for 7 days — the S3 maximum.
PRESIGNED_URL_TTL_SECONDS = 604800


def _utc_now() -> datetime:
    """Return the current UTC time as a *naive* datetime.

    Replacement for the deprecated ``datetime.utcnow()``; produces the same
    naive-UTC value, so all stored ``isoformat() + "Z"`` timestamps keep the
    exact format already in the database.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _iso_z(dt: datetime) -> str:
    """Format a naive UTC datetime as ISO-8601 with a trailing 'Z'."""
    return dt.isoformat() + "Z"


def _is_wrapped(value: Any) -> bool:
    """Return True if *value* already has the standardized ``result:{text:{}}`` shape.

    NOTE: the previous inline check (``"text" in value.get("result", {})``)
    raised ``TypeError`` whenever the ``"result"`` key held a non-dict value
    (e.g. a plain string); the ``isinstance`` guard below fixes that.
    """
    return (
        isinstance(value, dict)
        and isinstance(value.get("result"), dict)
        and "text" in value["result"]
    )


def _wrap_output(value: Any) -> Dict[str, Any]:
    """Wrap *value* in the standardized ``result:{text:{}}`` format (idempotent)."""
    if _is_wrapped(value):
        return value
    return {"result": {"text": value}}


class PipelineManager:
    """Manages pipeline execution records and outputs."""

    def __init__(self):
        """Initialize MongoDB and S3 connections.

        Reads ``MONGODB_URI`` (required) and ``MONGODB_DB`` (defaults to
        ``"masterllm"``) from the environment.

        Raises:
            RuntimeError: If MONGODB_URI is not set.
        """
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")

        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.pipelines_collection = self.db["pipelines"]

        # S3 manager (project-provided singleton)
        self.s3 = get_s3_manager()

    def create_pipeline_metadata(
        self,
        pipeline_id: str,
        session_id: str,
        pipeline_name: str,
        s3_key: str,
        status: str = "proposed",
        created_by_message: str = ""
    ) -> bool:
        """Create metadata record for pipeline (MongoDB only). Full document in S3.

        Args:
            pipeline_id: Unique pipeline identifier.
            session_id: Owning session ID.
            pipeline_name: Human-readable pipeline name.
            s3_key: S3 key of the full pipeline document.
            status: Initial status (default "proposed").
            created_by_message: User message that triggered creation.

        Returns:
            Always True (insert failures raise from pymongo).
        """
        now = _utc_now()
        metadata = {
            "pipeline_id": pipeline_id,
            "session_id": session_id,
            "pipeline_name": pipeline_name,
            "status": status,
            "s3_key": s3_key,
            "final_output_url": None,
            "created_at": _iso_z(now),
            "updated_at": _iso_z(now),
            "created_by_message": created_by_message
        }
        self.pipelines_collection.insert_one(metadata)
        return True

    def update_pipeline_status(
        self,
        pipeline_id: str,
        status: str,
        final_output_url: str = None,
        final_output_expires_at: str = None
    ) -> bool:
        """Update pipeline status in metadata.

        Optional URL/expiry fields are only written when truthy.

        Returns:
            True if a document was actually modified.
        """
        update_data = {
            "status": status,
            "updated_at": _iso_z(_utc_now())
        }
        if final_output_url:
            update_data["final_output_url"] = final_output_url
        if final_output_expires_at:
            update_data["final_output_expires_at"] = final_output_expires_at

        result = self.pipelines_collection.update_one(
            {"pipeline_id": pipeline_id},
            {"$set": update_data}
        )
        return result.modified_count > 0

    def get_pipeline_metadata(self, pipeline_id: str) -> Optional[Dict[str, Any]]:
        """Get pipeline metadata by ID (MongoDB ``_id`` excluded)."""
        return self.pipelines_collection.find_one(
            {"pipeline_id": pipeline_id},
            {"_id": 0}
        )

    def get_full_pipeline_document(self, pipeline_id: str) -> Optional[Dict[str, Any]]:
        """Get full pipeline document from S3 via metadata lookup.

        Returns:
            The parsed JSON document, or None when the metadata, its S3 key,
            or the S3 download is unavailable (download errors are logged,
            not raised — best-effort by design).
        """
        metadata = self.get_pipeline_metadata(pipeline_id)
        if not metadata:
            return None
        s3_key = metadata.get("s3_key")
        if not s3_key:
            return None
        try:
            return self.s3.download_json(s3_key, add_prefix=False)
        except Exception as e:
            print(f"⚠️ Failed to download pipeline document: {e}")
            return None

    def create_pipeline_record(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        created_from: str = "request",
        created_by_message: str = ""
    ) -> str:
        """
        Create a new pipeline execution record

        Args:
            session_id: Session ID
            pipeline_definition: Full pipeline definition dict
            created_from: "request" or "edit"
            created_by_message: User's message that triggered pipeline

        Returns:
            execution_id: Unique execution ID
        """
        execution_id = str(uuid.uuid4())
        now = _utc_now()

        # Extract components from pipeline; "components" wins unless empty/missing.
        pipeline_steps = (
            pipeline_definition.get("components", [])
            or pipeline_definition.get("pipeline_steps", [])
        )

        # Generate component IDs in format: execution_id_index
        components = [
            ComponentMetadata(
                component_id=generate_component_id(execution_id, i),
                component_name=step.get("tool_name", "unknown"),
                order=i + 1,
                status=ComponentStatus.PENDING
            )
            for i, step in enumerate(pipeline_steps)
        ]

        # Store pipeline definition in S3
        s3_key = f"sessions/{session_id}/pipelines/{execution_id}_definition.json"
        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)

        # Create pipeline record
        output_id = generate_output_id()
        pipeline_record = PipelineSchema(
            execution_id=execution_id,
            session_id=session_id,
            pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Pipeline"),
            created_at=now,
            created_from=created_from,
            created_by_message=created_by_message,
            status=PipelineStatus.PROPOSED,
            pipeline_definition_s3_key=s3_key,
            components=components,
            output_id=output_id
        )

        # Insert into MongoDB
        self.pipelines_collection.insert_one(schema_to_dict(pipeline_record))

        return execution_id

    def update_component_status(
        self,
        execution_id: str,
        component_name: str,
        status: str,
        output: Optional[Dict[str, Any]] = None,
        error: Optional[str] = None,
        success_message: Optional[str] = None
    ) -> bool:
        """
        Update component execution status

        Args:
            execution_id: Pipeline execution ID
            component_name: Name of component
            status: "executing", "completed", or "failed"
            output: Component output (will be wrapped in result:{text:{}} format)
            error: Error message if failed
            success_message: Success message if completed

        Returns:
            True if updated successfully
        """
        now = _utc_now()

        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline:
            return False

        # Update component metadata; capture the matched entry explicitly
        # instead of relying on the for-loop variable leaking out of scope.
        components = pipeline.get("components", [])
        matched: Optional[Dict[str, Any]] = None

        for comp in components:
            if comp["component_name"] != component_name:
                continue

            comp["status"] = status
            if status == "executing":
                comp["started_at"] = _iso_z(now)
            elif status in ["completed", "failed"]:
                comp["completed_at"] = _iso_z(now)

            if success_message:
                comp["success_message"] = success_message
            if error:
                comp["error"] = {"message": error} if isinstance(error, str) else error
                comp["hasError"] = True
            else:
                comp["hasError"] = False

            # Wrap component output in standardized format: result:{text:{}}
            if output:
                comp["component_output"] = _wrap_output(output)

            matched = comp
            break

        if matched is None:
            return False

        # NOTE: pipeline-level status is set to EXECUTING on every component
        # update; mark_pipeline_completed/failed overwrites it at the end.
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "components": components,
                    "status": PipelineStatus.EXECUTING
                }
            }
        )

        # Store output temporarily (will be compiled into final_output later)
        if output:
            temp_key = (
                f"sessions/{pipeline['session_id']}/outputs/"
                f"{execution_id}/temp_{component_name}.json"
            )
            self.s3.upload_json(temp_key, matched["component_output"], add_prefix=False)

        return True

    @staticmethod
    def _extract_last_output(components_results: List[Dict[str, Any]]) -> Optional[str]:
        """Return a short text preview from the most recent completed component.

        Inspects only the *first* completed-with-result entry (scanning from
        the end), mirroring the original break-on-first behavior. Truncates
        to 500 characters.
        """
        for result in reversed(components_results):
            if result.get("status") != "completed" or not result.get("result"):
                continue
            res = result.get("result", {})
            if isinstance(res, dict):
                if _is_wrapped(res):
                    text_content = res["result"]["text"]
                    if isinstance(text_content, dict):
                        return (
                            text_content.get("text")
                            or text_content.get("summary")
                            or json.dumps(text_content)[:500]
                        )
                    return str(text_content)[:500]
                # Fallback for non-wrapped format
                return res.get("text") or res.get("summary") or json.dumps(res)[:500]
            if isinstance(res, str):
                return res
            # Completed but result is neither dict nor str: no preview.
            return None
        return None

    def mark_pipeline_completed(
        self,
        execution_id: str,
        components_results: List[Dict[str, Any]],
        executor: str = "unknown"
    ) -> Dict[str, Any]:
        """
        Mark pipeline as completed and generate final output

        Args:
            execution_id: Pipeline execution ID
            components_results: List of {component_name, result, status, message} dicts
            executor: Executor used ("bedrock", "crewai", etc.)

        Returns:
            Dict with final_output_url (presigned) and metadata

        Raises:
            RuntimeError: If no pipeline record exists for execution_id.
        """
        now = _utc_now()

        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline:
            raise RuntimeError(f"Pipeline {execution_id} not found")

        session_id = pipeline["session_id"]

        # Workflow fails if ANY component failed.
        workflow_status = "completed"
        if any(r.get("status") == "failed" for r in components_results):
            workflow_status = "failed"

        # Last successful node output, extracted from result:{text:{}} format.
        last_node_output = self._extract_last_output(components_results)

        # Build final output for S3 with standardized format
        component_outputs = []
        for i, result in enumerate(components_results):
            wrapped_result = _wrap_output(result.get("result", {}))
            comp_output = ComponentOutputS3(
                component_id=result.get("component_id", f"{execution_id}_{i}"),
                name=result.get("component_name", f"component_{i+1}"),
                order=i + 1,
                status=result.get("status", "unknown"),
                component_output=wrapped_result,
                hasError=result.get("status") == "failed",
                error={"message": result.get("error")} if result.get("error") else None,
                success_message=result.get("success_message"),
                metadata=result.get("metadata"),
                result=wrapped_result
            )
            component_outputs.append(comp_output.model_dump())

        final_output = FinalOutputS3(
            workflow_name=pipeline.get("pipeline_name", "Untitled"),
            workflow_status=workflow_status,
            last_node_output=last_node_output,
            components=component_outputs,
            execution_time=0.0,  # TODO: Calculate from started_at to now
            timestamp=_iso_z(now)
        )

        # Store final output in S3
        final_output_key = f"sessions/{session_id}/outputs/{execution_id}/final_output.json"
        self.s3.upload_json(final_output_key, final_output.model_dump(), add_prefix=False)

        # Generate presigned URL (7 days max)
        presigned = self.s3.generate_presigned_url(
            final_output_key, expires_in=PRESIGNED_URL_TTL_SECONDS, add_prefix=False
        )

        # Extract result preview for MongoDB
        result_preview = last_node_output[:500] if last_node_output else "No output"

        # Update pipeline record
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "status": PipelineStatus.COMPLETED if workflow_status == "completed" else PipelineStatus.FAILED,
                    "final_output_s3_key": final_output_key,
                    "final_output_presigned_url": presigned["presigned_url"],
                    "final_output_presigned_expires_at": presigned["presigned_expires_at"],
                    "completed_at": _iso_z(now),
                    "executor": executor,
                    "result": result_preview,
                    "hasError": workflow_status == "failed",
                    "error": {"message": "Pipeline execution failed"} if workflow_status == "failed" else None
                }
            }
        )

        return {
            "final_output_url": presigned["presigned_url"],
            "final_output_expires_at": presigned["presigned_expires_at"],
            "final_output_s3_key": final_output_key,
            "workflow_status": workflow_status,
            "last_node_output": last_node_output,
            "result": {"text": {"summary": last_node_output}} if last_node_output else None
        }

    def mark_pipeline_failed(
        self,
        execution_id: str,
        error: str
    ) -> bool:
        """
        Mark pipeline as failed

        Args:
            execution_id: Pipeline execution ID
            error: Error message

        Returns:
            True if updated successfully
        """
        now = _utc_now()

        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "status": PipelineStatus.FAILED,
                    "error": {"message": error} if isinstance(error, str) else error,
                    "hasError": True,
                    "completed_at": _iso_z(now)
                }
            }
        )
        return True

    def get_pipeline(self, execution_id: str) -> Optional[Dict[str, Any]]:
        """Get pipeline record by execution ID"""
        return self.pipelines_collection.find_one(
            {"execution_id": execution_id},
            {"_id": 0}  # Exclude MongoDB _id
        )

    def get_session_pipelines(
        self,
        session_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get pipelines for a session

        Args:
            session_id: Session ID
            limit: Maximum number of pipelines to return

        Returns:
            List of pipeline records (latest first)
        """
        return list(
            self.pipelines_collection.find(
                {"session_id": session_id},
                {"_id": 0}
            ).sort("created_at", -1).limit(limit)
        )

    def regenerate_presigned_url(self, execution_id: str) -> Optional[Dict[str, str]]:
        """
        Regenerate presigned URL for pipeline output

        Args:
            execution_id: Pipeline execution ID

        Returns:
            Dict with new presigned_url and expires_at, or None if not found
        """
        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline or not pipeline.get("final_output_s3_key"):
            return None

        # Generate new presigned URL
        presigned = self.s3.generate_presigned_url(
            pipeline["final_output_s3_key"],
            expires_in=PRESIGNED_URL_TTL_SECONDS,
            add_prefix=False
        )

        # Update in MongoDB
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "final_output_presigned_url": presigned["presigned_url"],
                    "final_output_presigned_expires_at": presigned["presigned_expires_at"]
                }
            }
        )

        return presigned


# Global singleton instance
_pipeline_manager_instance: Optional[PipelineManager] = None


def get_pipeline_manager() -> PipelineManager:
    """Get or create global PipelineManager instance"""
    global _pipeline_manager_instance
    if _pipeline_manager_instance is None:
        _pipeline_manager_instance = PipelineManager()
    return _pipeline_manager_instance