Spaces:
Sleeping
Sleeping
File size: 6,714 Bytes
f0db057 aaaba76 f0db057 aaaba76 f0db057 aaaba76 f0db057 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# services/workflow_manager.py
"""
Workflow Manager for MasterLLM V3 Architecture
Manages saved workflows:
- Save user-approved pipelines to workflows collection
- NO deduplication (allow duplicates)
- Store workflow definitions in S3
- Retrieve and manage saved workflows
"""
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

from pymongo import MongoClient

from services.s3_manager import get_s3_manager
from services.schemas import WorkflowSchema, schema_to_dict
class WorkflowManager:
    """Manage saved workflows.

    Workflow metadata lives in the MongoDB ``workflows`` collection. The full
    pipeline definition of each workflow is stored as JSON in S3 under
    ``workflows/<workflow_id>.json`` and referenced from the Mongo record via
    ``pipeline_definition_s3_key``.
    """

    # Reports best-effort failures (S3 cleanup, definition loads) instead of
    # silently discarding them.
    _logger = logging.getLogger(__name__)

    def __init__(self):
        """Create the MongoDB client/collection handles and get the S3 manager.

        Reads ``MONGODB_URI`` (required) and ``MONGODB_DB`` (default
        ``"masterllm"``) from the environment.

        Raises:
            RuntimeError: If ``MONGODB_URI`` is unset or empty.
        """
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")

        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.workflows_collection = self.db["workflows"]

        self.s3 = get_s3_manager()

    @staticmethod
    def _pipeline_preview(pipeline_definition: Dict[str, Any]) -> str:
        """Return the components' tool names joined by " → ", or "Empty Pipeline"."""
        # Definitions carry their steps under "components" or "pipeline_steps";
        # a missing/None/empty "components" falls back to "pipeline_steps", and
        # a None value for either is treated as empty instead of crashing.
        components = (
            pipeline_definition.get("components")
            or pipeline_definition.get("pipeline_steps")
            or []
        )
        # `or` also covers a present-but-None tool_name, which would otherwise
        # make str.join raise TypeError.
        names = [comp.get("tool_name") or "unknown" for comp in components]
        return " → ".join(names) if names else "Empty Pipeline"

    def _delete_s3_object_quietly(self, s3_key: str) -> None:
        """Best-effort delete of an S3 object (no key prefix); logs on failure."""
        try:
            self.s3.delete_object(s3_key, add_prefix=False)
        except Exception:
            self._logger.warning(
                "Failed to delete S3 object %s", s3_key, exc_info=True
            )

    def save_workflow(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        user_message: str,
        source_pipeline_id: Optional[str] = None,
        pipeline_status: Optional[str] = None,
    ) -> str:
        """Save a pipeline as a new workflow.

        NOTE: NO DEDUPLICATION - always creates a new workflow even if an
        identical one already exists.

        The Mongo document is built first, then the definition is uploaded to
        S3, then the document is inserted. If the insert fails, the uploaded
        S3 object is removed (best effort) and the error is re-raised, so no
        orphaned definition is left behind.

        Args:
            session_id: Session where the workflow was created.
            pipeline_definition: Full pipeline definition (stored in S3).
            user_message: User's message when confirming the save.
            source_pipeline_id: Pipeline ID this workflow came from, if any.
            pipeline_status: Pipeline status when saved (e.g. "proposed",
                "completed").

        Returns:
            The new workflow's ID (a random UUID4 string).

        Raises:
            Whatever the S3 upload or the MongoDB insert raises.
        """
        workflow_id = str(uuid.uuid4())
        # Naive UTC timestamp: same value and type as the deprecated
        # datetime.utcnow(), keeping saved_at consistent with existing records.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        s3_key = f"workflows/{workflow_id}.json"

        # Build the document before uploading anything, so a malformed
        # definition fails here without leaving an object in S3.
        workflow_record = WorkflowSchema(
            workflow_id=workflow_id,
            session_id=session_id,
            saved_at=now,
            saved_by_user_message=user_message,
            pipeline_definition_s3_key=s3_key,
            pipeline_name=pipeline_definition.get("pipeline_name") or "Untitled Workflow",
            pipeline_preview=self._pipeline_preview(pipeline_definition),
            user_confirmed=True,
            source_pipeline_id=source_pipeline_id,
            pipeline_status=pipeline_status,
        )
        document = schema_to_dict(workflow_record)

        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
        try:
            self.workflows_collection.insert_one(document)
        except Exception:
            # Don't leave an S3 object that no workflow record points to.
            self._delete_s3_object_quietly(s3_key)
            raise

        return workflow_id

    def get_workflows(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """Return workflow records, newest ``saved_at`` first.

        Only the Mongo metadata is returned; pipeline definitions are not
        loaded from S3.

        Args:
            limit: Maximum number of records to return.
            skip: Number of records to skip (for pagination).

        Returns:
            List of workflow records without the Mongo ``_id`` field.
        """
        cursor = self.workflows_collection.find(
            {},
            {"_id": 0}  # Exclude MongoDB _id (not JSON-serializable as-is)
        )
        return list(cursor.sort("saved_at", -1).skip(skip).limit(limit))

    def get_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """Return one workflow record together with its S3 pipeline definition.

        If the definition cannot be loaded, the record is still returned with
        ``pipeline_definition`` set to None and an ``error`` message added.

        Args:
            workflow_id: Workflow ID.

        Returns:
            The workflow record (without ``_id``), or None if not found.
        """
        workflow = self.workflows_collection.find_one(
            {"workflow_id": workflow_id},
            {"_id": 0}
        )
        if not workflow:
            return None

        try:
            workflow["pipeline_definition"] = self.s3.download_json(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False
            )
        except Exception as e:
            # Deliberately best-effort: callers still get the metadata.
            self._logger.warning(
                "Failed to load pipeline definition for workflow %s",
                workflow_id,
                exc_info=True,
            )
            workflow["pipeline_definition"] = None
            workflow["error"] = f"Failed to load pipeline definition: {str(e)}"

        return workflow

    def delete_workflow(self, workflow_id: str) -> bool:
        """Delete a workflow record and (best effort) its S3 definition.

        The Mongo record is removed atomically first. Removing the S3 object
        afterwards means an S3 failure can at worst leave an unreferenced
        object, never a record pointing at a missing definition.

        Args:
            workflow_id: Workflow ID.

        Returns:
            True if a workflow record was deleted, False if none matched.
        """
        workflow = self.workflows_collection.find_one_and_delete(
            {"workflow_id": workflow_id},
            projection={"pipeline_definition_s3_key": 1},
        )
        if workflow is None:
            return False

        s3_key = workflow.get("pipeline_definition_s3_key")
        if s3_key:
            # Failure here is logged, not raised: the workflow is already gone.
            self._delete_s3_object_quietly(s3_key)
        return True

    def get_session_workflows(
        self,
        session_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Return workflows created in one session, newest ``saved_at`` first.

        Args:
            session_id: Session ID.
            limit: Maximum number of records to return.

        Returns:
            List of workflow records without the Mongo ``_id`` field.
        """
        cursor = self.workflows_collection.find(
            {"session_id": session_id},
            {"_id": 0}
        )
        return list(cursor.sort("saved_at", -1).limit(limit))

    def count_workflows(self) -> int:
        """Return the total number of documents in the workflows collection."""
        return self.workflows_collection.count_documents({})
# Process-wide WorkflowManager, created lazily by get_workflow_manager().
# NOTE(review): creation is not guarded by a lock; concurrent first calls
# could each construct a manager — confirm whether that matters to callers.
_workflow_manager_instance: Optional[WorkflowManager] = None


def get_workflow_manager() -> WorkflowManager:
    """Return the shared WorkflowManager, constructing it on first use."""
    global _workflow_manager_instance
    if _workflow_manager_instance is not None:
        return _workflow_manager_instance
    _workflow_manager_instance = WorkflowManager()
    return _workflow_manager_instance
|