ganesh-vilje committed on
Commit
aaaba76
·
1 Parent(s): 6648afb

feat: Unified pipeline lifecycle - single S3 file per pipeline - Pipeline now stored in S3 with pipeline_id at creation - Proposals stored in S3, only reference in MongoDB - Results appended to same S3 file on completion - Workflow save uses pipeline_id, works at any stage - pipelines_history now in session API response

Browse files
api_routes_v2.py CHANGED
@@ -2553,13 +2553,19 @@ def get_session_complete_history(session_id: str, limit: int = 50):
2553
 
2554
  messages = session_manager.get_messages(session_id, limit=limit, include_content=True)
2555
 
 
 
 
 
 
2556
  return {
2557
  "session_id": session_id,
2558
  "chat_name": session.get("chat_name"),
2559
  "created_at": session.get("created_at"),
2560
  "last_activity": session.get("last_activity"),
2561
  "state": session.get("state"),
2562
- "messages": messages
 
2563
  }
2564
  except HTTPException:
2565
  raise
@@ -2584,6 +2590,69 @@ def get_session_pipeline_executions(session_id: str, limit: int = 50):
2584
  raise HTTPException(status_code=500, detail=f"Failed to get pipeline executions: {str(e)}")
2585
 
2586
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2587
  # ========================
2588
  # HEALTH
2589
  # ========================
 
2553
 
2554
  messages = session_manager.get_messages(session_id, limit=limit, include_content=True)
2555
 
2556
+ # V3: Get pipeline history
2557
+ from services.pipeline_manager import get_pipeline_manager
2558
+ pipeline_mgr = get_pipeline_manager()
2559
+ pipelines_history = pipeline_mgr.get_session_pipelines(session_id, limit=50)
2560
+
2561
  return {
2562
  "session_id": session_id,
2563
  "chat_name": session.get("chat_name"),
2564
  "created_at": session.get("created_at"),
2565
  "last_activity": session.get("last_activity"),
2566
  "state": session.get("state"),
2567
+ "messages": messages,
2568
+ "pipelines_history": pipelines_history # NEW: Pipeline history
2569
  }
2570
  except HTTPException:
2571
  raise
 
2590
  raise HTTPException(status_code=500, detail=f"Failed to get pipeline executions: {str(e)}")
2591
 
2592
 
2593
+ # ========================
2594
+ # V3 WORKFLOW SAVE API
2595
+ # ========================
2596
+
2597
+ @router.post("/workflows/save")
2598
+ def save_workflow_from_pipeline(data: dict):
2599
+ """
2600
+ Save workflow from pipeline (works at any stage: proposed, executing, completed)
2601
+ Uses pipeline_id instead of execution_id
2602
+ """
2603
+ pipeline_id = data.get("pipeline_id")
2604
+ workflow_name = data.get("workflow_name") # Optional override
2605
+
2606
+ if not pipeline_id:
2607
+ raise HTTPException(status_code=400, detail="pipeline_id is required")
2608
+
2609
+ try:
2610
+ from services.pipeline_manager import get_pipeline_manager
2611
+ from services.workflow_manager import get_workflow_manager
2612
+
2613
+ pipeline_mgr = get_pipeline_manager()
2614
+ workflow_mgr = get_workflow_manager()
2615
+
2616
+ # Get pipeline metadata
2617
+ pipeline_metadata = pipeline_mgr.get_pipeline_metadata(pipeline_id)
2618
+ if not pipeline_metadata:
2619
+ raise HTTPException(status_code=404, detail="Pipeline not found")
2620
+
2621
+ # Download full pipeline document from S3
2622
+ pipeline_doc = pipeline_mgr.get_full_pipeline_document(pipeline_id)
2623
+ if not pipeline_doc:
2624
+ raise HTTPException(status_code=404, detail="Pipeline document not found in S3")
2625
+
2626
+ # Extract definition
2627
+ pipeline_def = pipeline_doc.get("definition", {})
2628
+
2629
+ # Override name if provided
2630
+ if workflow_name:
2631
+ pipeline_def["pipeline_name"] = workflow_name
2632
+
2633
+ # Save as workflow with source tracking
2634
+ workflow_id = workflow_mgr.save_workflow(
2635
+ session_id=pipeline_doc.get("session_id", "unknown"),
2636
+ pipeline_definition=pipeline_def,
2637
+ user_message=f"Saved from pipeline {pipeline_id}",
2638
+ source_pipeline_id=pipeline_id,
2639
+ pipeline_status=pipeline_doc.get("status", "unknown")
2640
+ )
2641
+
2642
+ return {
2643
+ "workflow_id": workflow_id,
2644
+ "pipeline_name": pipeline_def.get("pipeline_name", "Untitled"),
2645
+ "source_pipeline_id": pipeline_id,
2646
+ "pipeline_status": pipeline_doc.get("status"),
2647
+ "message": "Workflow saved successfully"
2648
+ }
2649
+
2650
+ except HTTPException:
2651
+ raise
2652
+ except Exception as e:
2653
+ raise HTTPException(status_code=500, detail=f"Failed to save workflow: {str(e)}")
2654
+
2655
+
2656
  # ========================
2657
  # HEALTH
2658
  # ========================
app.py CHANGED
@@ -978,29 +978,32 @@ def chatbot_response_streaming(message: str, history: List, session_id: str, fil
978
  workflow_mgr = get_workflow_manager()
979
  s3 = get_s3_manager()
980
 
981
- # Get pipeline from pipelines collection
982
- pipeline_record = pipeline_mgr.get_pipeline(pending_workflow["execution_id"])
 
983
 
984
- if pipeline_record and pipeline_record.get("pipeline_definition_s3_key"):
985
- # Download pipeline definition from S3
986
- pipeline_def = s3.download_json(pipeline_record["pipeline_definition_s3_key"], add_prefix=False)
987
 
988
- # Save as workflow
989
  workflow_id = workflow_mgr.save_workflow(
990
  session_id=session_id,
991
  pipeline_definition=pipeline_def,
992
- user_message=message
 
 
993
  )
994
 
995
  # Clear pending
996
  session_manager.update_session(session_id, {"pending_workflow_save": None})
997
 
998
- response = f"✅ **Workflow Saved!**\n\nWorkflow ID: `{workflow_id}`\nName: {pending_workflow['pipeline_name']}\n\nYou can now reuse this workflow anytime!\n\nWhat else can I help you with?"
999
  session_manager.add_message(session_id, "assistant", response)
1000
  yield format_chat_history(history, message, response)
1001
  return
1002
  else:
1003
- # Pipeline record not found
1004
  session_manager.update_session(session_id, {"pending_workflow_save": None})
1005
  response = "⚠️ Sorry, I couldn't find the pipeline to save. The workflow save request has expired.\n\nWhat else can I help you with?"
1006
  session_manager.add_message(session_id, "assistant", response)
@@ -1043,9 +1046,44 @@ def chatbot_response_streaming(message: str, history: List, session_id: str, fil
1043
  prefer_bedrock=True
1044
  )
1045
 
1046
- # Save proposed pipeline to session
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1047
  session_manager.update_session(session_id, {
1048
- "proposed_pipeline": pipeline,
 
1049
  "state": ConversationState.PIPELINE_PROPOSED
1050
  })
1051
 
@@ -1088,25 +1126,41 @@ Here's what I'll do:
1088
  if "approve" in user_input or "yes" in user_input:
1089
  session_manager.update_session(session_id, {"state": ConversationState.EXECUTING})
1090
 
1091
- plan = session.get("proposed_pipeline", {})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1092
 
1093
- # V3: Initialize execution_id before use (fallback to session_id)
1094
- execution_id = session_id
 
 
 
 
 
 
 
 
1095
 
1096
- # V3: Create pipeline record BEFORE execution
1097
- try:
1098
- pipeline_mgr = get_pipeline_manager()
1099
- execution_id = pipeline_mgr.create_pipeline_record(
1100
- session_id=session_id,
1101
- pipeline_definition=plan,
1102
- created_from="request",
1103
- created_by_message=message
1104
- )
1105
- print(f"✅ Created pipeline record: {execution_id}")
1106
- except Exception as e:
1107
- print(f"⚠️ Failed to create pipeline record: {e}")
1108
- print(f" Using session_id as fallback")
1109
- # execution_id already initialized to session_id
1110
 
1111
  # Initial status - User-friendly
1112
  initial_message = f"✅ **Approved!** Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\n🚀 Processing... please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
@@ -1215,17 +1269,42 @@ Here's what I'll do:
1215
  return
1216
 
1217
  # Process final result
1218
- if final_payload:
1219
- # V3: DO NOT store results in session - only in S3 via pipeline_manager
1220
- # The pipeline_manager already handled S3 storage during execution
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1221
  session_manager.update_session(session_id, {
1222
  "state": ConversationState.INITIAL
1223
  })
1224
 
1225
- # V3: Store pending workflow save info
1226
  session_manager.update_session(session_id, {
1227
  "pending_workflow_save": {
1228
- "execution_id": execution_id,
1229
  "pipeline_name": plan.get("pipeline_name", "Untitled")
1230
  }
1231
  })
 
978
  workflow_mgr = get_workflow_manager()
979
  s3 = get_s3_manager()
980
 
981
+ # V3: Get full pipeline document from S3 using pipeline_id
982
+ pipeline_id = pending_workflow.get("pipeline_id")
983
+ pipeline_doc = pipeline_mgr.get_full_pipeline_document(pipeline_id)
984
 
985
+ if pipeline_doc and pipeline_doc.get("definition"):
986
+ # Extract definition from pipeline document
987
+ pipeline_def = pipeline_doc["definition"]
988
 
989
+ # Save as workflow with source tracking
990
  workflow_id = workflow_mgr.save_workflow(
991
  session_id=session_id,
992
  pipeline_definition=pipeline_def,
993
+ user_message=message,
994
+ source_pipeline_id=pipeline_id,
995
+ pipeline_status=pipeline_doc.get("status", "unknown")
996
  )
997
 
998
  # Clear pending
999
  session_manager.update_session(session_id, {"pending_workflow_save": None})
1000
 
1001
+ response = f"✅ **Workflow Saved!**\n\nWorkflow ID: `{workflow_id}`\nName: {pending_workflow['pipeline_name']}\nSource Pipeline: `{pipeline_id[:8]}...`\n\nYou can now reuse this workflow anytime!\n\nWhat else can I help you with?"
1002
  session_manager.add_message(session_id, "assistant", response)
1003
  yield format_chat_history(history, message, response)
1004
  return
1005
  else:
1006
+ # Pipeline document not found
1007
  session_manager.update_session(session_id, {"pending_workflow_save": None})
1008
  response = "⚠️ Sorry, I couldn't find the pipeline to save. The workflow save request has expired.\n\nWhat else can I help you with?"
1009
  session_manager.add_message(session_id, "assistant", response)
 
1046
  prefer_bedrock=True
1047
  )
1048
 
1049
+ # V3: Create pipeline_id and upload to S3
1050
+ pipeline_id = str(uuid.uuid4())
1051
+
1052
+ # Build initial pipeline document
1053
+ pipeline_doc = {
1054
+ "pipeline_id": pipeline_id,
1055
+ "session_id": session_id,
1056
+ "pipeline_name": pipeline.get("pipeline_name"),
1057
+ "status": "proposed",
1058
+ "created_at": datetime.utcnow().isoformat() + "Z",
1059
+ "created_by_message": message,
1060
+ "definition": pipeline,
1061
+ "execution": None,
1062
+ "results": None
1063
+ }
1064
+
1065
+ # Upload to S3
1066
+ from services.s3_manager import get_s3_manager
1067
+ s3 = get_s3_manager()
1068
+ pipeline_s3_key = f"sessions/{session_id}/pipelines/{pipeline_id}.json"
1069
+ s3.upload_json(pipeline_s3_key, pipeline_doc, add_prefix=False)
1070
+
1071
+ # Create metadata in MongoDB
1072
+ from services.pipeline_manager import get_pipeline_manager
1073
+ pipeline_mgr = get_pipeline_manager()
1074
+ pipeline_mgr.create_pipeline_metadata(
1075
+ pipeline_id=pipeline_id,
1076
+ session_id=session_id,
1077
+ pipeline_name=pipeline.get("pipeline_name"),
1078
+ s3_key=pipeline_s3_key,
1079
+ status="proposed",
1080
+ created_by_message=message
1081
+ )
1082
+
1083
+ # Update session with reference only (not full pipeline)
1084
  session_manager.update_session(session_id, {
1085
+ "current_pipeline_id": pipeline_id,
1086
+ "current_pipeline_s3_key": pipeline_s3_key,
1087
  "state": ConversationState.PIPELINE_PROPOSED
1088
  })
1089
 
 
1126
  if "approve" in user_input or "yes" in user_input:
1127
  session_manager.update_session(session_id, {"state": ConversationState.EXECUTING})
1128
 
1129
+ # V3: Get pipeline references from session
1130
+ pipeline_s3_key = session.get("current_pipeline_s3_key")
1131
+ pipeline_id = session.get("current_pipeline_id")
1132
+
1133
+ # Download pipeline from S3
1134
+ from services.s3_manager import get_s3_manager
1135
+ s3 = get_s3_manager()
1136
+ from services.pipeline_manager import get_pipeline_manager
1137
+ pipeline_mgr = get_pipeline_manager()
1138
+
1139
+ if pipeline_s3_key:
1140
+ try:
1141
+ pipeline_doc = s3.download_json(pipeline_s3_key, add_prefix=False)
1142
+ plan = pipeline_doc["definition"]
1143
+ except Exception as e:
1144
+ print(f"⚠️ Failed to download pipeline from S3: {e}")
1145
+ plan = {}
1146
+ pipeline_id = session_id
1147
+ else:
1148
+ # Fallback for old sessions without S3 storage
1149
+ plan = session.get("proposed_pipeline", {})
1150
+ pipeline_id = session_id
1151
 
1152
+ # Update pipeline status to executing
1153
+ if pipeline_s3_key and pipeline_doc:
1154
+ pipeline_doc["status"] = "executing"
1155
+ pipeline_doc["execution"] = {
1156
+ "started_at": datetime.utcnow().isoformat() + "Z",
1157
+ "executor": "unknown",
1158
+ "components_status": []
1159
+ }
1160
+ s3.upload_json(pipeline_s3_key, pipeline_doc, add_prefix=False)
1161
+ pipeline_mgr.update_pipeline_status(pipeline_id, "executing")
1162
 
1163
+ execution_id = pipeline_id
 
 
 
 
 
 
 
 
 
 
 
 
 
1164
 
1165
  # Initial status - User-friendly
1166
  initial_message = f"✅ **Approved!** Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\n🚀 Processing... please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
 
1269
  return
1270
 
1271
  # Process final result
1272
+ # V3: Update pipeline document with results in S3
1273
+ if pipeline_s3_key:
1274
+ try:
1275
+ pipeline_doc = s3.download_json(pipeline_s3_key, add_prefix=False)
1276
+ pipeline_doc["status"] = "completed"
1277
+ if pipeline_doc.get("execution"):
1278
+ pipeline_doc["execution"]["completed_at"] = datetime.utcnow().isoformat() + "Z"
1279
+ pipeline_doc["execution"]["executor"] = executor_used
1280
+ pipeline_doc["results"] = {
1281
+ "final_output_url": final_payload.get("final_output_url"),
1282
+ "final_output_expires_at": final_payload.get("final_output_expires_at"),
1283
+ "components_executed": final_payload.get("components_executed"),
1284
+ "last_node_output": final_payload.get("last_node_output"),
1285
+ "workflow_status": "completed"
1286
+ }
1287
+ s3.upload_json(pipeline_s3_key, pipeline_doc, add_prefix=False)
1288
+
1289
+ # Update MongoDB metadata
1290
+ pipeline_mgr.update_pipeline_status(
1291
+ pipeline_id,
1292
+ "completed",
1293
+ final_output_url=final_payload.get("final_output_url"),
1294
+ final_output_expires_at=final_payload.get("final_output_expires_at")
1295
+ )
1296
+ except Exception as e:
1297
+ print(f"⚠️ Failed to update pipeline document: {e}")
1298
+
1299
+ # Update session state
1300
  session_manager.update_session(session_id, {
1301
  "state": ConversationState.INITIAL
1302
  })
1303
 
1304
+ # V3: Store pending workflow save info with pipeline_id
1305
  session_manager.update_session(session_id, {
1306
  "pending_workflow_save": {
1307
+ "pipeline_id": pipeline_id,
1308
  "pipeline_name": plan.get("pipeline_name", "Untitled")
1309
  }
1310
  })
services/pipeline_manager.py CHANGED
@@ -41,6 +41,82 @@ class PipelineManager:
41
  # S3 manager
42
  self.s3 = get_s3_manager()
43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
  def create_pipeline_record(
45
  self,
46
  session_id: str,
 
41
  # S3 manager
42
  self.s3 = get_s3_manager()
43
 
44
+
45
+ def create_pipeline_metadata(
46
+ self,
47
+ pipeline_id: str,
48
+ session_id: str,
49
+ pipeline_name: str,
50
+ s3_key: str,
51
+ status: str = "proposed",
52
+ created_by_message: str = ""
53
+ ) -> bool:
54
+ """Create metadata record for pipeline (MongoDB only). Full document in S3."""
55
+ now = datetime.utcnow()
56
+
57
+ metadata = {
58
+ "pipeline_id": pipeline_id,
59
+ "session_id": session_id,
60
+ "pipeline_name": pipeline_name,
61
+ "status": status,
62
+ "s3_key": s3_key,
63
+ "final_output_url": None,
64
+ "created_at": now.isoformat() + "Z",
65
+ "updated_at": now.isoformat() + "Z",
66
+ "created_by_message": created_by_message
67
+ }
68
+
69
+ self.pipelines_collection.insert_one(metadata)
70
+ return True
71
+
72
+ def update_pipeline_status(
73
+ self,
74
+ pipeline_id: str,
75
+ status: str,
76
+ final_output_url: str = None,
77
+ final_output_expires_at: str = None
78
+ ) -> bool:
79
+ """Update pipeline status in metadata."""
80
+ update_data = {
81
+ "status": status,
82
+ "updated_at": datetime.utcnow().isoformat() + "Z"
83
+ }
84
+
85
+ if final_output_url:
86
+ update_data["final_output_url"] = final_output_url
87
+ if final_output_expires_at:
88
+ update_data["final_output_expires_at"] = final_output_expires_at
89
+
90
+ result = self.pipelines_collection.update_one(
91
+ {"pipeline_id": pipeline_id},
92
+ {"$set": update_data}
93
+ )
94
+ return result.modified_count > 0
95
+
96
+ def get_pipeline_metadata(self, pipeline_id: str) -> Optional[Dict[str, Any]]:
97
+ """Get pipeline metadata by ID."""
98
+ return self.pipelines_collection.find_one(
99
+ {"pipeline_id": pipeline_id},
100
+ {"_id": 0}
101
+ )
102
+
103
+ def get_full_pipeline_document(self, pipeline_id: str) -> Optional[Dict[str, Any]]:
104
+ """Get full pipeline document from S3 via metadata lookup."""
105
+ metadata = self.get_pipeline_metadata(pipeline_id)
106
+ if not metadata:
107
+ return None
108
+
109
+ s3_key = metadata.get("s3_key")
110
+ if not s3_key:
111
+ return None
112
+
113
+ try:
114
+ return self.s3.download_json(s3_key, add_prefix=False)
115
+ except Exception as e:
116
+ print(f"⚠️ Failed to download pipeline document: {e}")
117
+ return None
118
+
119
+
120
  def create_pipeline_record(
121
  self,
122
  session_id: str,
services/schemas.py CHANGED
@@ -153,6 +153,8 @@ class WorkflowSchema(BaseModel):
153
  pipeline_preview: str # "Extract text → Summarize → Translate"
154
  user_confirmed: bool = True # User explicitly confirmed save
155
  tags: List[str] = Field(default_factory=list)
 
 
156
  metadata: Dict[str, Any] = Field(default_factory=dict)
157
 
158
  class Config:
 
153
  pipeline_preview: str # "Extract text → Summarize → Translate"
154
  user_confirmed: bool = True # User explicitly confirmed save
155
  tags: List[str] = Field(default_factory=list)
156
+ source_pipeline_id: Optional[str] = None # Pipeline ID this workflow came from
157
+ pipeline_status: Optional[str] = None # Status when saved: "proposed", "completed"
158
  metadata: Dict[str, Any] = Field(default_factory=dict)
159
 
160
  class Config:
services/workflow_manager.py CHANGED
@@ -40,7 +40,9 @@ class WorkflowManager:
40
  self,
41
  session_id: str,
42
  pipeline_definition: Dict[str, Any],
43
- user_message: str
 
 
44
  ) -> str:
45
  """
46
  Save a pipeline as a workflow
@@ -51,6 +53,8 @@ class WorkflowManager:
51
  session_id: Session where workflow was created
52
  pipeline_definition: Full pipeline definition
53
  user_message: User's message when confirming save
 
 
54
 
55
  Returns:
56
  workflow_id: Unique workflow ID
@@ -76,7 +80,9 @@ class WorkflowManager:
76
  pipeline_definition_s3_key=s3_key,
77
  pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Workflow"),
78
  pipeline_preview=pipeline_preview,
79
- user_confirmed=True
 
 
80
  )
81
 
82
  # Insert into MongoDB
 
40
  self,
41
  session_id: str,
42
  pipeline_definition: Dict[str, Any],
43
+ user_message: str,
44
+ source_pipeline_id: str = None,
45
+ pipeline_status: str = None
46
  ) -> str:
47
  """
48
  Save a pipeline as a workflow
 
53
  session_id: Session where workflow was created
54
  pipeline_definition: Full pipeline definition
55
  user_message: User's message when confirming save
56
+ source_pipeline_id: Pipeline ID this workflow came from
57
+ pipeline_status: Pipeline status when saved ("proposed", "completed")
58
 
59
  Returns:
60
  workflow_id: Unique workflow ID
 
80
  pipeline_definition_s3_key=s3_key,
81
  pipeline_name=pipeline_definition.get("pipeline_name", "Untitled Workflow"),
82
  pipeline_preview=pipeline_preview,
83
+ user_confirmed=True,
84
+ source_pipeline_id=source_pipeline_id,
85
+ pipeline_status=pipeline_status
86
  )
87
 
88
  # Insert into MongoDB