Spaces:
Sleeping
Sleeping
feat: Unified pipeline lifecycle - single S3 file per pipeline - Pipeline now stored in S3 with pipeline_id at creation - Proposals stored in S3, only reference in MongoDB - Results appended to same S3 file on completion - Workflow save uses pipeline_id, works at any stage - pipelines_history now in session API response
aaaba76
| # services/workflow_manager.py | |
| """ | |
| Workflow Manager for MasterLLM V3 Architecture | |
| Manages saved workflows: | |
| - Save user-approved pipelines to workflows collection | |
| - NO deduplication (allow duplicates) | |
| - Store workflow definitions in S3 | |
| - Retrieve and manage saved workflows | |
| """ | |
| import uuid | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| from pymongo import MongoClient | |
| from services.s3_manager import get_s3_manager | |
| from services.schemas import WorkflowSchema, schema_to_dict | |
| import os | |
class WorkflowManager:
    """Manage saved workflows.

    Workflow metadata is stored in the MongoDB ``workflows`` collection. The
    full pipeline definition of each workflow is stored as JSON in S3 under
    ``workflows/<workflow_id>.json``; the Mongo record references it through
    ``pipeline_definition_s3_key``.
    """

    def __init__(self):
        """Connect to MongoDB and obtain the S3 manager.

        Reads ``MONGODB_URI`` (required) and ``MONGODB_DB`` (default
        ``"masterllm"``) from the environment.

        Raises:
            RuntimeError: If ``MONGODB_URI`` is not set.
        """
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")
        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.workflows_collection = self.db["workflows"]
        # S3 manager used for the pipeline-definition JSON objects
        self.s3 = get_s3_manager()

    @staticmethod
    def _build_pipeline_preview(pipeline_definition: Dict[str, Any]) -> str:
        """Return the pipeline's component tool names joined with " → ".

        Components are read from ``components`` and, when that is missing,
        None or empty, from ``pipeline_steps``. A component that is not a dict
        or has no truthy ``tool_name`` is shown as ``"unknown"``. Returns
        ``"Empty Pipeline"`` when there are no components at all.
        """
        # Trailing `or []` guards against both keys being present but None.
        components = (
            pipeline_definition.get("components")
            or pipeline_definition.get("pipeline_steps")
            or []
        )
        names = [
            str(comp.get("tool_name") or "unknown") if isinstance(comp, dict) else "unknown"
            for comp in components
        ]
        return " → ".join(names) if names else "Empty Pipeline"

    def save_workflow(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        user_message: str,
        source_pipeline_id: Optional[str] = None,
        pipeline_status: Optional[str] = None
    ) -> str:
        """
        Save a pipeline as a workflow

        NOTE: NO DEDUPLICATION - always creates a new workflow even if identical

        The definition is uploaded to S3 and a metadata record is inserted
        into MongoDB. If the MongoDB insert fails, the just-uploaded S3 object
        is deleted (best effort) and the insert error is re-raised.

        Args:
            session_id: Session where workflow was created
            pipeline_definition: Full pipeline definition
            user_message: User's message when confirming save
            source_pipeline_id: Pipeline ID this workflow came from
            pipeline_status: Pipeline status when saved ("proposed", "completed")

        Returns:
            workflow_id: Unique workflow ID
        """
        workflow_id = str(uuid.uuid4())
        s3_key = f"workflows/{workflow_id}.json"

        # Build the full record before any side effect so that a malformed
        # definition fails without leaving an orphaned S3 object behind.
        workflow_record = WorkflowSchema(
            workflow_id=workflow_id,
            session_id=session_id,
            saved_at=datetime.utcnow(),
            saved_by_user_message=user_message,
            pipeline_definition_s3_key=s3_key,
            pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Workflow"),
            pipeline_preview=self._build_pipeline_preview(pipeline_definition),
            user_confirmed=True,
            source_pipeline_id=source_pipeline_id,
            pipeline_status=pipeline_status
        )
        record = schema_to_dict(workflow_record)

        # Store pipeline definition in S3, then reference it from MongoDB
        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
        try:
            self.workflows_collection.insert_one(record)
        except Exception:
            # Roll back the upload so a failed insert does not leave an
            # unreferenced definition in the bucket. Cleanup is best effort;
            # the insert error is the one the caller needs to see.
            try:
                self.s3.delete_object(s3_key, add_prefix=False)
            except Exception:
                pass
            raise
        return workflow_id

    def get_workflows(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Get all workflows

        Args:
            limit: Maximum number to return
            skip: Number to skip (for pagination)

        Returns:
            List of workflow records (latest first), without MongoDB ``_id``
        """
        workflows = list(self.workflows_collection.find(
            {},
            {"_id": 0}  # Exclude MongoDB _id
        ).sort("saved_at", -1).skip(skip).limit(limit))
        return workflows

    def get_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific workflow by ID

        Args:
            workflow_id: Workflow ID

        Returns:
            Workflow record with full definition from S3 under
            ``pipeline_definition``, or None if not found. If the definition
            cannot be loaded, ``pipeline_definition`` is None and an ``error``
            message is added instead of raising.
        """
        workflow = self.workflows_collection.find_one(
            {"workflow_id": workflow_id},
            {"_id": 0}
        )
        if not workflow:
            return None
        # Load full pipeline definition from S3
        try:
            pipeline_definition = self.s3.download_json(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False
            )
            workflow["pipeline_definition"] = pipeline_definition
        except Exception as e:
            workflow["pipeline_definition"] = None
            workflow["error"] = f"Failed to load pipeline definition: {str(e)}"
        return workflow

    def delete_workflow(self, workflow_id: str) -> bool:
        """
        Delete a workflow

        The S3 definition is removed on a best-effort basis: an S3 failure is
        logged and the MongoDB record is still deleted.

        Args:
            workflow_id: Workflow ID

        Returns:
            True if the MongoDB record was deleted, False if it was not found
        """
        workflow = self.workflows_collection.find_one({"workflow_id": workflow_id})
        if workflow is None:
            return False

        s3_key = workflow.get("pipeline_definition_s3_key")
        if s3_key:
            try:
                self.s3.delete_object(s3_key, add_prefix=False)
            except Exception:
                # Continue even if S3 deletion fails, but leave a trace of the
                # orphaned object rather than swallowing the error silently.
                import logging
                logging.getLogger(__name__).warning(
                    "Failed to delete S3 object %s for workflow %s",
                    s3_key,
                    workflow_id,
                    exc_info=True,
                )

        # Delete from MongoDB
        result = self.workflows_collection.delete_one({"workflow_id": workflow_id})
        return result.deleted_count > 0

    def get_session_workflows(
        self,
        session_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get workflows created in a specific session

        Args:
            session_id: Session ID
            limit: Maximum number to return

        Returns:
            List of workflow records (latest first), without MongoDB ``_id``
        """
        workflows = list(self.workflows_collection.find(
            {"session_id": session_id},
            {"_id": 0}
        ).sort("saved_at", -1).limit(limit))
        return workflows

    def count_workflows(self) -> int:
        """Get total count of workflows"""
        return self.workflows_collection.count_documents({})
# Lazily created, process-wide WorkflowManager shared by all callers.
_workflow_manager_instance: Optional[WorkflowManager] = None


def get_workflow_manager() -> WorkflowManager:
    """Return the shared WorkflowManager, constructing it on first use.

    The instance is cached in a module global, so subsequent calls reuse the
    same MongoDB client and S3 manager created in ``WorkflowManager.__init__``.
    No locking is performed around the first construction.
    """
    global _workflow_manager_instance
    if _workflow_manager_instance is not None:
        return _workflow_manager_instance
    _workflow_manager_instance = WorkflowManager()
    return _workflow_manager_instance