Spaces:
Sleeping
Sleeping
Refactor: Use MongoDB _id for messages, pipeline_id_index for components, standardize outputs to result:{text:{}} format
d6a6b32
| # services/pipeline_manager.py | |
| """ | |
| Pipeline Manager for MasterLLM V3 Architecture | |
| Manages pipeline execution records: | |
| - Create pipeline records in MongoDB | |
| - Track component execution status | |
| - Store component outputs to S3 | |
| - Generate presigned URLs for outputs | |
| - Manage final output compilation | |
| """ | |
| import json | |
| import uuid | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| from pymongo import MongoClient | |
| from services.s3_manager import get_s3_manager | |
| from services.schemas import ( | |
| PipelineSchema, ComponentMetadata, PipelineStatus, ComponentStatus, | |
| FinalOutputS3, ComponentOutputS3, schema_to_dict, | |
| generate_component_id, generate_output_id | |
| ) | |
| import os | |
class PipelineManager:
    """Manages pipeline execution records and their outputs.

    Metadata lives in MongoDB (``pipelines`` collection); full pipeline
    documents and component outputs are persisted to S3.
    """

    def __init__(self):
        """Connect to MongoDB (from environment) and resolve the S3 manager.

        Raises:
            RuntimeError: if MONGODB_URI is not set in the environment.
        """
        uri = os.getenv("MONGODB_URI")
        if not uri:
            raise RuntimeError("MONGODB_URI environment variable not set")
        db_name = os.getenv("MONGODB_DB", "masterllm")

        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.pipelines_collection = self.db["pipelines"]

        # Shared S3 manager used for definitions, temp outputs and final outputs.
        self.s3 = get_s3_manager()
| def create_pipeline_metadata( | |
| self, | |
| pipeline_id: str, | |
| session_id: str, | |
| pipeline_name: str, | |
| s3_key: str, | |
| status: str = "proposed", | |
| created_by_message: str = "" | |
| ) -> bool: | |
| """Create metadata record for pipeline (MongoDB only). Full document in S3.""" | |
| now = datetime.utcnow() | |
| metadata = { | |
| "pipeline_id": pipeline_id, | |
| "session_id": session_id, | |
| "pipeline_name": pipeline_name, | |
| "status": status, | |
| "s3_key": s3_key, | |
| "final_output_url": None, | |
| "created_at": now.isoformat() + "Z", | |
| "updated_at": now.isoformat() + "Z", | |
| "created_by_message": created_by_message | |
| } | |
| self.pipelines_collection.insert_one(metadata) | |
| return True | |
| def update_pipeline_status( | |
| self, | |
| pipeline_id: str, | |
| status: str, | |
| final_output_url: str = None, | |
| final_output_expires_at: str = None | |
| ) -> bool: | |
| """Update pipeline status in metadata.""" | |
| update_data = { | |
| "status": status, | |
| "updated_at": datetime.utcnow().isoformat() + "Z" | |
| } | |
| if final_output_url: | |
| update_data["final_output_url"] = final_output_url | |
| if final_output_expires_at: | |
| update_data["final_output_expires_at"] = final_output_expires_at | |
| result = self.pipelines_collection.update_one( | |
| {"pipeline_id": pipeline_id}, | |
| {"$set": update_data} | |
| ) | |
| return result.modified_count > 0 | |
| def get_pipeline_metadata(self, pipeline_id: str) -> Optional[Dict[str, Any]]: | |
| """Get pipeline metadata by ID.""" | |
| return self.pipelines_collection.find_one( | |
| {"pipeline_id": pipeline_id}, | |
| {"_id": 0} | |
| ) | |
| def get_full_pipeline_document(self, pipeline_id: str) -> Optional[Dict[str, Any]]: | |
| """Get full pipeline document from S3 via metadata lookup.""" | |
| metadata = self.get_pipeline_metadata(pipeline_id) | |
| if not metadata: | |
| return None | |
| s3_key = metadata.get("s3_key") | |
| if not s3_key: | |
| return None | |
| try: | |
| return self.s3.download_json(s3_key, add_prefix=False) | |
| except Exception as e: | |
| print(f"⚠️ Failed to download pipeline document: {e}") | |
| return None | |
| def create_pipeline_record( | |
| self, | |
| session_id: str, | |
| pipeline_definition: Dict[str, Any], | |
| created_from: str = "request", | |
| created_by_message: str = "" | |
| ) -> str: | |
| """ | |
| Create a new pipeline execution record | |
| Args: | |
| session_id: Session ID | |
| pipeline_definition: Full pipeline definition dict | |
| created_from: "request" or "edit" | |
| created_by_message: User's message that triggered pipeline | |
| Returns: | |
| execution_id: Unique execution ID | |
| """ | |
| execution_id = str(uuid.uuid4()) | |
| now = datetime.utcnow() | |
| # Extract components from pipeline | |
| pipeline_steps = pipeline_definition.get("components", []) or pipeline_definition.get("pipeline_steps", []) | |
| components = [] | |
| # Generate component IDs in format: execution_id_index | |
| for i, step in enumerate(pipeline_steps): | |
| component = ComponentMetadata( | |
| component_id=generate_component_id(execution_id, i), | |
| component_name=step.get("tool_name", "unknown"), | |
| order=i + 1, | |
| status=ComponentStatus.PENDING | |
| ) | |
| components.append(component) | |
| # Store pipeline definition in S3 | |
| s3_key = f"sessions/{session_id}/pipelines/{execution_id}_definition.json" | |
| self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False) | |
| # Create pipeline record | |
| output_id = generate_output_id() | |
| pipeline_record = PipelineSchema( | |
| execution_id=execution_id, | |
| session_id=session_id, | |
| pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Pipeline"), | |
| created_at=now, | |
| created_from=created_from, | |
| created_by_message=created_by_message, | |
| status=PipelineStatus.PROPOSED, | |
| pipeline_definition_s3_key=s3_key, | |
| components=components, | |
| output_id=output_id | |
| ) | |
| # Insert into MongoDB | |
| self.pipelines_collection.insert_one(schema_to_dict(pipeline_record)) | |
| return execution_id | |
| def update_component_status( | |
| self, | |
| execution_id: str, | |
| component_name: str, | |
| status: str, | |
| output: Optional[Dict[str, Any]] = None, | |
| error: Optional[str] = None, | |
| success_message: Optional[str] = None | |
| ) -> bool: | |
| """ | |
| Update component execution status | |
| Args: | |
| execution_id: Pipeline execution ID | |
| component_name: Name of component | |
| status: "executing", "completed", or "failed" | |
| output: Component output (will be wrapped in result:{text:{}} format) | |
| error: Error message if failed | |
| success_message: Success message if completed | |
| Returns: | |
| True if updated successfully | |
| """ | |
| now = datetime.utcnow() | |
| # Find the component in the pipeline | |
| pipeline = self.pipelines_collection.find_one({"execution_id": execution_id}) | |
| if not pipeline: | |
| return False | |
| # Update component metadata | |
| components = pipeline.get("components", []) | |
| component_found = False | |
| for comp in components: | |
| if comp["component_name"] == component_name: | |
| comp["status"] = status | |
| if status == "executing": | |
| comp["started_at"] = now.isoformat() + "Z" | |
| elif status in ["completed", "failed"]: | |
| comp["completed_at"] = now.isoformat() + "Z" | |
| if success_message: | |
| comp["success_message"] = success_message | |
| if error: | |
| comp["error"] = {"message": error} if isinstance(error, str) else error | |
| comp["hasError"] = True | |
| else: | |
| comp["hasError"] = False | |
| # Wrap component output in standardized format: result:{text:{}} | |
| if output: | |
| # Check if already wrapped | |
| if "result" in output and "text" in output.get("result", {}): | |
| comp["component_output"] = output | |
| else: | |
| # Wrap in standardized format | |
| comp["component_output"] = { | |
| "result": { | |
| "text": output | |
| } | |
| } | |
| component_found = True | |
| break | |
| if not component_found: | |
| return False | |
| # Update in MongoDB | |
| self.pipelines_collection.update_one( | |
| {"execution_id": execution_id}, | |
| { | |
| "$set": { | |
| "components": components, | |
| "status": PipelineStatus.EXECUTING | |
| } | |
| } | |
| ) | |
| # Store output temporarily (will be compiled into final_output later) | |
| if output: | |
| # Wrap output before storing | |
| wrapped_output = comp["component_output"] # Already wrapped above | |
| temp_key = f"sessions/{pipeline['session_id']}/outputs/{execution_id}/temp_{component_name}.json" | |
| self.s3.upload_json(temp_key, wrapped_output, add_prefix=False) | |
| return True | |
    def mark_pipeline_completed(
        self,
        execution_id: str,
        components_results: List[Dict[str, Any]],
        executor: str = "unknown"
    ) -> Dict[str, Any]:
        """
        Mark pipeline as completed and generate final output.

        Compiles the per-component results into a FinalOutputS3 document,
        stores it in S3, generates a presigned URL for it, and updates the
        MongoDB record with terminal status and a 500-char result preview.

        Args:
            execution_id: Pipeline execution ID
            components_results: List of {component_name, result, status, message} dicts
            executor: Executor used ("bedrock", "crewai", etc.)
        Returns:
            Dict with final_output_url (presigned) and metadata
        Raises:
            RuntimeError: if no pipeline record exists for execution_id.
        """
        now = datetime.utcnow()
        # Get pipeline record
        pipeline = self.pipelines_collection.find_one({"execution_id": execution_id})
        if not pipeline:
            raise RuntimeError(f"Pipeline {execution_id} not found")
        session_id = pipeline["session_id"]
        # Determine workflow status (failed if ANY component failed)
        workflow_status = "completed"
        for result in components_results:
            if result.get("status") == "failed":
                workflow_status = "failed"
                break
        # Get last successful node output - extract from result:{text:{}} format.
        # Walk the results in reverse so the LAST completed component wins.
        last_node_output = None
        for result in reversed(components_results):
            if result.get("status") == "completed" and result.get("result"):
                # Extract text from standardized result:{text:{}} format
                res = result.get("result", {})
                if isinstance(res, dict):
                    # Check for result:{text:{}} structure
                    if "result" in res and "text" in res.get("result", {}):
                        text_content = res["result"]["text"]
                        if isinstance(text_content, dict):
                            # Prefer explicit text/summary keys; otherwise serialize
                            # the dict and truncate to keep the preview small.
                            last_node_output = text_content.get("text") or text_content.get("summary") or json.dumps(text_content)[:500]
                        else:
                            last_node_output = str(text_content)[:500]
                    else:
                        # Fallback for non-wrapped format
                        last_node_output = res.get("text") or res.get("summary") or json.dumps(res)[:500]
                elif isinstance(res, str):
                    last_node_output = res
                break
        # Build final output for S3 with standardized format
        component_outputs = []
        for i, result in enumerate(components_results):
            # Ensure result is wrapped in result:{text:{}} format
            result_data = result.get("result", {})
            if isinstance(result_data, dict) and "result" in result_data and "text" in result_data.get("result", {}):
                wrapped_result = result_data
            else:
                # Wrap in standardized format
                wrapped_result = {
                    "result": {
                        "text": result_data
                    }
                }
            # NOTE(review): component_output and result both carry the wrapped
            # payload — presumably for backward compatibility; confirm readers.
            comp_output = ComponentOutputS3(
                component_id=result.get("component_id", f"{execution_id}_{i}"),
                name=result.get("component_name", f"component_{i+1}"),
                order=i + 1,
                status=result.get("status", "unknown"),
                component_output=wrapped_result,
                hasError=result.get("status") == "failed",
                error={"message": result.get("error")} if result.get("error") else None,
                success_message=result.get("success_message"),
                metadata=result.get("metadata"),
                result=wrapped_result
            )
            component_outputs.append(comp_output.model_dump())
        final_output = FinalOutputS3(
            workflow_name=pipeline.get("pipeline_name", "Untitled"),
            workflow_status=workflow_status,
            last_node_output=last_node_output,
            components=component_outputs,
            execution_time=0.0,  # TODO: Calculate from started_at to now
            timestamp=now.isoformat() + "Z"
        )
        # Store final output in S3
        final_output_key = f"sessions/{session_id}/outputs/{execution_id}/final_output.json"
        self.s3.upload_json(final_output_key, final_output.model_dump(), add_prefix=False)
        # Generate presigned URL (7 days = 604800 s, the S3 maximum)
        presigned = self.s3.generate_presigned_url(final_output_key, expires_in=604800, add_prefix=False)
        # Extract result preview for MongoDB (keep the record small)
        result_preview = last_node_output[:500] if last_node_output else "No output"
        # Update pipeline record with terminal status and output pointers
        self.pipelines_collection.update_one(
            {"execution_id": execution_id},
            {
                "$set": {
                    "status": PipelineStatus.COMPLETED if workflow_status == "completed" else PipelineStatus.FAILED,
                    "final_output_s3_key": final_output_key,
                    "final_output_presigned_url": presigned["presigned_url"],
                    "final_output_presigned_expires_at": presigned["presigned_expires_at"],
                    "completed_at": now.isoformat() + "Z",
                    "executor": executor,
                    "result": result_preview,
                    "hasError": workflow_status == "failed",
                    "error": {"message": "Pipeline execution failed"} if workflow_status == "failed" else None
                }
            }
        )
        return {
            "final_output_url": presigned["presigned_url"],
            "final_output_expires_at": presigned["presigned_expires_at"],
            "final_output_s3_key": final_output_key,
            "workflow_status": workflow_status,
            "last_node_output": last_node_output,
            "result": {"text": {"summary": last_node_output}} if last_node_output else None
        }
| def mark_pipeline_failed( | |
| self, | |
| execution_id: str, | |
| error: str | |
| ) -> bool: | |
| """ | |
| Mark pipeline as failed | |
| Args: | |
| execution_id: Pipeline execution ID | |
| error: Error message | |
| Returns: | |
| True if updated successfully | |
| """ | |
| now = datetime.utcnow() | |
| self.pipelines_collection.update_one( | |
| {"execution_id": execution_id}, | |
| { | |
| "$set": { | |
| "status": PipelineStatus.FAILED, | |
| "error": {"message": error} if isinstance(error, str) else error, | |
| "hasError": True, | |
| "completed_at": now.isoformat() + "Z" | |
| } | |
| } | |
| ) | |
| return True | |
| def get_pipeline(self, execution_id: str) -> Optional[Dict[str, Any]]: | |
| """Get pipeline record by execution ID""" | |
| return self.pipelines_collection.find_one( | |
| {"execution_id": execution_id}, | |
| {"_id": 0} # Exclude MongoDB _id | |
| ) | |
| def get_session_pipelines( | |
| self, | |
| session_id: str, | |
| limit: int = 10 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Get pipelines for a session | |
| Args: | |
| session_id: Session ID | |
| limit: Maximum number of pipelines to return | |
| Returns: | |
| List of pipeline records (latest first) | |
| """ | |
| pipelines = list(self.pipelines_collection.find( | |
| {"session_id": session_id}, | |
| {"_id": 0} | |
| ).sort("created_at", -1).limit(limit)) | |
| return pipelines | |
| def regenerate_presigned_url(self, execution_id: str) -> Optional[Dict[str, str]]: | |
| """ | |
| Regenerate presigned URL for pipeline output | |
| Args: | |
| execution_id: Pipeline execution ID | |
| Returns: | |
| Dict with new presigned_url and expires_at, or None if not found | |
| """ | |
| pipeline = self.pipelines_collection.find_one({"execution_id": execution_id}) | |
| if not pipeline or not pipeline.get("final_output_s3_key"): | |
| return None | |
| # Generate new presigned URL | |
| presigned = self.s3.generate_presigned_url( | |
| pipeline["final_output_s3_key"], | |
| expires_in=604800, | |
| add_prefix=False | |
| ) | |
| # Update in MongoDB | |
| self.pipelines_collection.update_one( | |
| {"execution_id": execution_id}, | |
| { | |
| "$set": { | |
| "final_output_presigned_url": presigned["presigned_url"], | |
| "final_output_presigned_expires_at": presigned["presigned_expires_at"] | |
| } | |
| } | |
| ) | |
| return presigned | |
# Global singleton instance
_pipeline_manager_instance: Optional[PipelineManager] = None


def get_pipeline_manager() -> PipelineManager:
    """Get or create global PipelineManager instance.

    Lazily constructs the manager on first call (which opens the MongoDB
    connection and resolves the S3 manager) and reuses it afterwards.
    NOTE(review): not thread-safe — two threads racing the first call could
    each build a manager; confirm initialization happens on one thread.
    """
    global _pipeline_manager_instance
    if _pipeline_manager_instance is None:
        _pipeline_manager_instance = PipelineManager()
    return _pipeline_manager_instance