# services/workflow_manager.py
"""
Workflow Manager for MasterLLM V3 Architecture

Manages saved workflows:
- Save user-approved pipelines to workflows collection
- NO deduplication (allow duplicates)
- Store workflow definitions in S3
- Retrieve and manage saved workflows
"""

import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from pymongo import MongoClient

from services.s3_manager import get_s3_manager
from services.schemas import WorkflowSchema, schema_to_dict


class WorkflowManager:
    """Manages saved workflows backed by MongoDB (metadata) and S3 (definitions)."""

    def __init__(self):
        """Initialize Workflow Manager.

        Reads connection settings from the environment:
        - MONGODB_URI (required)
        - MONGODB_DB (optional, defaults to "masterllm")

        Raises:
            RuntimeError: If MONGODB_URI is not set.
        """
        # MongoDB connection
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")

        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")

        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.workflows_collection = self.db["workflows"]

        # S3 manager (project helper used to store/load workflow JSON blobs)
        self.s3 = get_s3_manager()

    def save_workflow(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        user_message: str,
        source_pipeline_id: Optional[str] = None,
        pipeline_status: Optional[str] = None,
    ) -> str:
        """
        Save a pipeline as a workflow

        NOTE: NO DEDUPLICATION - always creates a new workflow even if identical

        Args:
            session_id: Session where workflow was created
            pipeline_definition: Full pipeline definition
            user_message: User's message when confirming save
            source_pipeline_id: Pipeline ID this workflow came from
            pipeline_status: Pipeline status when saved ("proposed", "completed")

        Returns:
            workflow_id: Unique workflow ID
        """
        workflow_id = str(uuid.uuid4())
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
        # pymongo stores datetimes as UTC either way.
        now = datetime.now(timezone.utc)

        # Store full pipeline definition in S3; only metadata goes to MongoDB.
        s3_key = f"workflows/{workflow_id}.json"
        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)

        # Create pipeline preview (component names joined).
        # Falls back to "pipeline_steps" when "components" is missing or empty.
        components = pipeline_definition.get("components", []) or pipeline_definition.get("pipeline_steps", [])
        component_names = [comp.get("tool_name", "unknown") for comp in components]
        pipeline_preview = " → ".join(component_names) if component_names else "Empty Pipeline"

        # Create workflow record
        workflow_record = WorkflowSchema(
            workflow_id=workflow_id,
            session_id=session_id,
            saved_at=now,
            saved_by_user_message=user_message,
            pipeline_definition_s3_key=s3_key,
            pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Workflow"),
            pipeline_preview=pipeline_preview,
            user_confirmed=True,
            source_pipeline_id=source_pipeline_id,
            pipeline_status=pipeline_status
        )

        # Insert into MongoDB
        self.workflows_collection.insert_one(schema_to_dict(workflow_record))

        return workflow_id

    def get_workflows(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Get all workflows

        Args:
            limit: Maximum number to return
            skip: Number to skip (for pagination)

        Returns:
            List of workflow records (latest first)
        """
        workflows = list(self.workflows_collection.find(
            {},
            {"_id": 0}  # Exclude MongoDB _id
        ).sort("saved_at", -1).skip(skip).limit(limit))

        return workflows

    def get_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific workflow by ID

        Args:
            workflow_id: Workflow ID

        Returns:
            Workflow record with full definition from S3, or None if not found.
            On S3 failure the record is still returned, with
            "pipeline_definition" set to None and an "error" message attached.
        """
        workflow = self.workflows_collection.find_one(
            {"workflow_id": workflow_id},
            {"_id": 0}
        )

        if not workflow:
            return None

        # Load full pipeline definition from S3
        try:
            pipeline_definition = self.s3.download_json(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False
            )
            workflow["pipeline_definition"] = pipeline_definition
        except Exception as e:
            # Surface the failure on the record instead of raising, so callers
            # can still show workflow metadata.
            workflow["pipeline_definition"] = None
            workflow["error"] = f"Failed to load pipeline definition: {str(e)}"

        return workflow

    def delete_workflow(self, workflow_id: str) -> bool:
        """
        Delete a workflow

        Args:
            workflow_id: Workflow ID

        Returns:
            True if deleted successfully, False if the workflow does not exist.
        """
        workflow = self.workflows_collection.find_one({"workflow_id": workflow_id})

        if not workflow:
            return False

        # Delete from S3 (best-effort: the MongoDB record is the source of
        # truth, so an orphaned S3 object is acceptable).
        try:
            self.s3.delete_object(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False
            )
        except Exception:
            # Continue even if S3 deletion fails
            pass

        # Delete from MongoDB
        result = self.workflows_collection.delete_one({"workflow_id": workflow_id})

        return result.deleted_count > 0

    def get_session_workflows(
        self,
        session_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get workflows created in a specific session

        Args:
            session_id: Session ID
            limit: Maximum number to return

        Returns:
            List of workflow records (latest first)
        """
        workflows = list(self.workflows_collection.find(
            {"session_id": session_id},
            {"_id": 0}
        ).sort("saved_at", -1).limit(limit))

        return workflows

    def count_workflows(self) -> int:
        """Get total count of workflows"""
        return self.workflows_collection.count_documents({})


# Global singleton instance
_workflow_manager_instance: Optional[WorkflowManager] = None


def get_workflow_manager() -> WorkflowManager:
    """Get or create global WorkflowManager instance"""
    global _workflow_manager_instance
    if _workflow_manager_instance is None:
        _workflow_manager_instance = WorkflowManager()
    return _workflow_manager_instance