# masterllm/services/workflow_manager.py
# Commit aaaba76 (ganesh-vilje) — feat: Unified pipeline lifecycle - single S3
# file per pipeline. Pipeline now stored in S3 with pipeline_id at creation;
# proposals stored in S3, only reference in MongoDB; results appended to same
# S3 file on completion; workflow save uses pipeline_id, works at any stage;
# pipelines_history now in session API response.
# services/workflow_manager.py
"""
Workflow Manager for MasterLLM V3 Architecture
Manages saved workflows:
- Save user-approved pipelines to workflows collection
- NO deduplication (allow duplicates)
- Store workflow definitions in S3
- Retrieve and manage saved workflows
"""
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from pymongo import MongoClient

from services.s3_manager import get_s3_manager
from services.schemas import WorkflowSchema, schema_to_dict
class WorkflowManager:
    """Persists user-approved pipelines ("workflows") to MongoDB + S3.

    The MongoDB ``workflows`` collection holds lightweight metadata records;
    the full pipeline-definition JSON lives in S3 under
    ``workflows/<workflow_id>.json``. Saving never deduplicates — identical
    pipelines produce distinct workflow records by design.
    """

    def __init__(self) -> None:
        """Connect to MongoDB and obtain the shared S3 manager.

        Reads MONGODB_URI (required) and MONGODB_DB (default "masterllm")
        from the environment.

        Raises:
            RuntimeError: If the MONGODB_URI environment variable is not set.
        """
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")
        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.workflows_collection = self.db["workflows"]
        # Shared S3 manager used for the definition payloads.
        self.s3 = get_s3_manager()

    def save_workflow(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        user_message: str,
        source_pipeline_id: Optional[str] = None,
        pipeline_status: Optional[str] = None,
    ) -> str:
        """Save a pipeline as a workflow.

        NOTE: NO DEDUPLICATION — always creates a new workflow even if an
        identical one already exists.

        Args:
            session_id: Session where the workflow was created.
            pipeline_definition: Full pipeline definition (stored in S3).
            user_message: User's message when confirming the save.
            source_pipeline_id: Pipeline ID this workflow came from, if any.
            pipeline_status: Pipeline status when saved ("proposed", "completed").

        Returns:
            workflow_id: Unique workflow ID (UUID4 string).
        """
        workflow_id = str(uuid.uuid4())
        # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+) and
        # yields naive datetimes. MongoDB stores both as UTC milliseconds,
        # so the persisted value is unchanged.
        now = datetime.now(timezone.utc)
        # Full definition goes to S3; Mongo keeps only the key.
        s3_key = f"workflows/{workflow_id}.json"
        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
        # Human-readable preview ("compA → compB → ...").
        # Fall back to "pipeline_steps" when "components" is missing/empty,
        # and guard against an explicit None so iteration cannot TypeError.
        components = (
            pipeline_definition.get("components")
            or pipeline_definition.get("pipeline_steps")
            or []
        )
        component_names = [comp.get("tool_name", "unknown") for comp in components]
        pipeline_preview = " → ".join(component_names) if component_names else "Empty Pipeline"
        workflow_record = WorkflowSchema(
            workflow_id=workflow_id,
            session_id=session_id,
            saved_at=now,
            saved_by_user_message=user_message,
            pipeline_definition_s3_key=s3_key,
            pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Workflow"),
            pipeline_preview=pipeline_preview,
            user_confirmed=True,
            source_pipeline_id=source_pipeline_id,
            pipeline_status=pipeline_status,
        )
        self.workflows_collection.insert_one(schema_to_dict(workflow_record))
        return workflow_id

    def get_workflows(
        self,
        limit: int = 100,
        skip: int = 0,
    ) -> List[Dict[str, Any]]:
        """Get all workflows, newest first.

        Args:
            limit: Maximum number to return.
            skip: Number to skip (for pagination).

        Returns:
            List of workflow metadata records (latest first), without the
            MongoDB ``_id`` field.
        """
        workflows = list(
            self.workflows_collection.find(
                {},
                {"_id": 0},  # Exclude MongoDB _id
            )
            .sort("saved_at", -1)
            .skip(skip)
            .limit(limit)
        )
        return workflows

    def get_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific workflow by ID, including its full definition.

        Args:
            workflow_id: Workflow ID.

        Returns:
            Workflow record with the full definition loaded from S3 under
            "pipeline_definition", or None if not found. If the S3 download
            fails, "pipeline_definition" is None and "error" describes why.
        """
        workflow = self.workflows_collection.find_one(
            {"workflow_id": workflow_id},
            {"_id": 0},
        )
        if not workflow:
            return None
        try:
            pipeline_definition = self.s3.download_json(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False,
            )
            workflow["pipeline_definition"] = pipeline_definition
        except Exception as e:
            # Surface the failure to the caller instead of raising — the
            # metadata record is still useful without the payload.
            workflow["pipeline_definition"] = None
            workflow["error"] = f"Failed to load pipeline definition: {str(e)}"
        return workflow

    def delete_workflow(self, workflow_id: str) -> bool:
        """Delete a workflow from both S3 and MongoDB.

        Args:
            workflow_id: Workflow ID.

        Returns:
            True if the MongoDB record was deleted, False if not found.
        """
        workflow = self.workflows_collection.find_one({"workflow_id": workflow_id})
        if not workflow:
            return False
        # Best-effort S3 cleanup: a failed object delete must not prevent
        # removal of the metadata record (deliberate broad catch).
        try:
            self.s3.delete_object(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False,
            )
        except Exception:
            pass
        result = self.workflows_collection.delete_one({"workflow_id": workflow_id})
        return result.deleted_count > 0

    def get_session_workflows(
        self,
        session_id: str,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """Get workflows created in a specific session, newest first.

        Args:
            session_id: Session ID.
            limit: Maximum number to return.

        Returns:
            List of workflow metadata records (without MongoDB ``_id``).
        """
        workflows = list(
            self.workflows_collection.find(
                {"session_id": session_id},
                {"_id": 0},
            )
            .sort("saved_at", -1)
            .limit(limit)
        )
        return workflows

    def count_workflows(self) -> int:
        """Get total count of saved workflows."""
        return self.workflows_collection.count_documents({})
# Process-wide singleton; created lazily on first access.
_workflow_manager_instance: Optional[WorkflowManager] = None


def get_workflow_manager() -> WorkflowManager:
    """Return the shared WorkflowManager, constructing it on first use."""
    global _workflow_manager_instance
    manager = _workflow_manager_instance
    if manager is None:
        manager = WorkflowManager()
        _workflow_manager_instance = manager
    return manager