File size: 6,714 Bytes
f0db057
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aaaba76
 
 
f0db057
 
 
 
 
 
 
 
 
 
aaaba76
 
f0db057
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aaaba76
 
 
f0db057
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# services/workflow_manager.py
"""
Workflow Manager for MasterLLM V3 Architecture

Manages saved workflows:
- Save user-approved pipelines to workflows collection
- NO deduplication (allow duplicates)
- Store workflow definitions in S3
- Retrieve and manage saved workflows
"""
import uuid
from typing import Dict, Any, List, Optional
from datetime import datetime
from pymongo import MongoClient
from services.s3_manager import get_s3_manager
from services.schemas import WorkflowSchema, schema_to_dict
import os


class WorkflowManager:
    """Manages saved workflows"""
    
    def __init__(self):
        """Initialize Workflow Manager"""
        # MongoDB connection
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")
        
        self.client = MongoClient(mongodb_uri)
        self.db = self.client[mongodb_db]
        self.workflows_collection = self.db["workflows"]
        
        # S3 manager
        self.s3 = get_s3_manager()
    
    def save_workflow(
        self,
        session_id: str,
        pipeline_definition: Dict[str, Any],
        user_message: str,
        source_pipeline_id: str = None,
        pipeline_status: str = None
    ) -> str:
        """
        Save a pipeline as a workflow
        
        NOTE: NO DEDUPLICATION - always creates a new workflow even if identical
        
        Args:
            session_id: Session where workflow was created
            pipeline_definition: Full pipeline definition
            user_message: User's message when confirming save
            source_pipeline_id: Pipeline ID this workflow came from
            pipeline_status: Pipeline status when saved ("proposed", "completed")
        
        Returns:
            workflow_id: Unique workflow ID
        """
        workflow_id = str(uuid.uuid4())
        now = datetime.utcnow()
        
        # Store pipeline definition in S3
        s3_key = f"workflows/{workflow_id}.json"
        self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
        
        # Create pipeline preview (component names joined)
        components = pipeline_definition.get("components", []) or pipeline_definition.get("pipeline_steps", [])
        component_names = [comp.get("tool_name", "unknown") for comp in components]
        pipeline_preview = " → ".join(component_names) if component_names else "Empty Pipeline"
        
        # Create workflow record
        workflow_record = WorkflowSchema(
            workflow_id=workflow_id,
            session_id=session_id,
            saved_at=now,
            saved_by_user_message=user_message,
            pipeline_definition_s3_key=s3_key,
            pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Workflow"),
            pipeline_preview=pipeline_preview,
            user_confirmed=True,
            source_pipeline_id=source_pipeline_id,
            pipeline_status=pipeline_status
        )
        
        # Insert into MongoDB
        self.workflows_collection.insert_one(schema_to_dict(workflow_record))
        
        return workflow_id
    
    def get_workflows(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Get all workflows
        
        Args:
            limit: Maximum number to return
            skip: Number to skip (for pagination)
        
        Returns:
            List of workflow records (latest first)
        """
        workflows = list(self.workflows_collection.find(
            {},
            {"_id": 0}  # Exclude MongoDB _id
        ).sort("saved_at", -1).skip(skip).limit(limit))
        
        return workflows
    
    def get_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific workflow by ID
        
        Args:
            workflow_id: Workflow ID
        
        Returns:
            Workflow record with full definition from S3, or None if not found
        """
        workflow = self.workflows_collection.find_one(
            {"workflow_id": workflow_id},
            {"_id": 0}
        )
        
        if not workflow:
            return None
        
        # Load full pipeline definition from S3
        try:
            pipeline_definition = self.s3.download_json(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False
            )
            workflow["pipeline_definition"] = pipeline_definition
        except Exception as e:
            workflow["pipeline_definition"] = None
            workflow["error"] = f"Failed to load pipeline definition: {str(e)}"
        
        return workflow
    
    def delete_workflow(self, workflow_id: str) -> bool:
        """
        Delete a workflow
        
        Args:
            workflow_id: Workflow ID
        
        Returns:
            True if deleted successfully
        """
        workflow = self.workflows_collection.find_one({"workflow_id": workflow_id})
        
        if not workflow:
            return False
        
        # Delete from S3
        try:
            self.s3.delete_object(
                workflow["pipeline_definition_s3_key"],
                add_prefix=False
            )
        except Exception:
            # Continue even if S3 deletion fails
            pass
        
        # Delete from MongoDB
        result = self.workflows_collection.delete_one({"workflow_id": workflow_id})
        
        return result.deleted_count > 0
    
    def get_session_workflows(
        self,
        session_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get workflows created in a specific session
        
        Args:
            session_id: Session ID
            limit: Maximum number to return
        
        Returns:
            List of workflow records
        """
        workflows = list(self.workflows_collection.find(
            {"session_id": session_id},
            {"_id": 0}
        ).sort("saved_at", -1).limit(limit))
        
        return workflows
    
    def count_workflows(self) -> int:
        """Get total count of workflows"""
        return self.workflows_collection.count_documents({})


# Global singleton instance
_workflow_manager_instance: Optional[WorkflowManager] = None


def get_workflow_manager() -> WorkflowManager:
    """Get or create global WorkflowManager instance"""
    global _workflow_manager_instance
    
    if _workflow_manager_instance is None:
        _workflow_manager_instance = WorkflowManager()
    
    return _workflow_manager_instance