ganesh-vilje commited on
Commit
f266ceb
Β·
1 Parent(s): 2534de8

feat: Implement remaining V3 architecture fixes

Browse files

Fixes 2, 3, and 5 implemented in app.py:

Pipeline Record Creation (Fix 2):
- Import pipeline_manager and workflow_manager
- Create pipeline record BEFORE execution starts
- Use execution_id instead of session_id for tracking

S3-Only Result Storage (Fix 3):
- Removed pipeline_result storage in session documents
- Removed deprecated save_pipeline_execution() calls
- Results now stored only in S3 via pipeline_manager
- MongoDB pipelines collection properly populated

Workflow Save Prompting (Fix 5):
- Added prompt after successful execution
- Stores pending_workflow_save in session
- Detects user confirmation (save/yes/sure/ok)
- Saves workflow to workflows collection + S3
- Clears pending state after save

Component Status Handling:
- Added component_status event type handling
- Displays component success messages to user
- User-friendly emoji status indicators

Chat Name Fix:
- Only generates once per session (already committed)

Remaining:
- Fix 4: Gradio file upload integration (separate commit)

Files changed (1) hide show
  1. app.py +101 -12
app.py CHANGED
@@ -670,6 +670,10 @@ from services.intent_classifier import intent_classifier
670
  from api_routes import router as api_router
671
  from api_routes_v2 import router as api_router_v2
672
 
 
 
 
 
673
 
674
  # ========================
675
  # BACKGROUND CLEANUP TASK
@@ -859,6 +863,54 @@ def chatbot_response_streaming(message: str, history: List, session_id: str, fil
859
  yield format_chat_history(history, message, friendly_response)
860
  return
861
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
862
  # ========================
863
  # STATE: INITIAL - Generate Pipeline ONLY if intent requires it
864
  # ========================
@@ -961,7 +1013,35 @@ Here's what I'll do:
961
  accumulated_response = initial_message + "\n\n" + "\n".join(progress_messages)
962
  yield format_chat_history(history, message, accumulated_response)
963
 
964
- # Step updates
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
965
  elif event_type == "step":
966
  step_info = {
967
  "step": event.get("step", 0),
@@ -1015,22 +1095,28 @@ Here's what I'll do:
1015
 
1016
  # Process final result
1017
  if final_payload:
 
 
1018
  session_manager.update_session(session_id, {
1019
- "pipeline_result": final_payload,
1020
  "state": ConversationState.INITIAL
1021
  })
1022
 
1023
- # Save execution to MongoDB
1024
- session_manager.save_pipeline_execution(
1025
- session_id=session_id,
1026
- pipeline=plan,
1027
- result=final_payload,
1028
- file_path=session.get("current_file"),
1029
- executor=executor_used
1030
- )
 
1031
 
1032
- # Create user-friendly final response
1033
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
 
 
 
 
1034
  friendly_final = f"""πŸŽ‰ **Pipeline Completed Successfully!**
1035
 
1036
  **Summary:**
@@ -1039,7 +1125,10 @@ Here's what I'll do:
1039
  - Successful: {success_count}
1040
  - Executor: {executor_used}
1041
 
1042
- βœ… All done! What else would you like me to help you with?
 
 
 
1043
 
1044
  <details>
1045
  <summary>πŸ“Š Detailed Results (for developers)</summary>
 
670
  from api_routes import router as api_router
671
  from api_routes_v2 import router as api_router_v2
672
 
673
+ # V3 Architecture managers
674
+ from services.pipeline_manager import get_pipeline_manager
675
+ from services.workflow_manager import get_workflow_manager
676
+
677
 
678
  # ========================
679
  # BACKGROUND CLEANUP TASK
 
863
  yield format_chat_history(history, message, friendly_response)
864
  return
865
 
866
+ # ========================
867
+ # HANDLE WORKFLOW SAVE REQUEST (V3)
868
+ # ========================
869
+ pending_workflow = session.get("pending_workflow_save")
870
+ if pending_workflow and any(phrase in message.lower() for phrase in ["save", "yes", "sure", "ok"]):
871
+ try:
872
+ from services.s3_manager import get_s3_manager
873
+
874
+ pipeline_mgr = get_pipeline_manager()
875
+ workflow_mgr = get_workflow_manager()
876
+ s3 = get_s3_manager()
877
+
878
+ # Get pipeline from pipelines collection
879
+ pipeline_record = pipeline_mgr.get_pipeline(pending_workflow["execution_id"])
880
+
881
+ if pipeline_record and pipeline_record.get("pipeline_definition_s3_key"):
882
+ # Download pipeline definition from S3
883
+ pipeline_def = s3.download_json(pipeline_record["pipeline_definition_s3_key"], add_prefix=False)
884
+
885
+ # Save as workflow
886
+ workflow_id = workflow_mgr.save_workflow(
887
+ session_id=session_id,
888
+ pipeline_definition=pipeline_def,
889
+ user_message=message
890
+ )
891
+
892
+ # Clear pending
893
+ session_manager.update_session(session_id, {"pending_workflow_save": None})
894
+
895
+ response = f"βœ… **Workflow Saved!**\n\nWorkflow ID: `{workflow_id}`\nName: {pending_workflow['pipeline_name']}\n\nYou can now reuse this workflow anytime!\n\nWhat else can I help you with?"
896
+ session_manager.add_message(session_id, "assistant", response)
897
+ yield format_chat_history(history, message, response)
898
+ return
899
+ else:
900
+ # Pipeline record not found
901
+ session_manager.update_session(session_id, {"pending_workflow_save": None})
902
+ response = "⚠️ Sorry, I couldn't find the pipeline to save. The workflow save request has expired.\n\nWhat else can I help you with?"
903
+ session_manager.add_message(session_id, "assistant", response)
904
+ yield format_chat_history(history, message, response)
905
+ return
906
+
907
+ except Exception as e:
908
+ session_manager.update_session(session_id, {"pending_workflow_save": None})
909
+ response = f"❌ Failed to save workflow: {str(e)}\n\nWhat would you like to do next?"
910
+ session_manager.add_message(session_id, "assistant", response)
911
+ yield format_chat_history(history, message, response)
912
+ return
913
+
914
  # ========================
915
  # STATE: INITIAL - Generate Pipeline ONLY if intent requires it
916
  # ========================
 
1013
  accumulated_response = initial_message + "\n\n" + "\n".join(progress_messages)
1014
  yield format_chat_history(history, message, accumulated_response)
1015
 
1016
+ # Component status updates (V3)
1017
+ elif event_type == "component_status":
1018
+ step_info = {
1019
+ "step": event.get("step", 0),
1020
+ "component": event.get("component", "processing"),
1021
+ "status": event.get("status", "running"),
1022
+ "message": event.get("message", ""),
1023
+ "executor": event.get("executor", "unknown")
1024
+ }
1025
+
1026
+ if "observation" in event:
1027
+ step_info["observation"] = event.get("observation")
1028
+
1029
+ steps_completed.append(step_info)
1030
+ executor_used = event.get("executor", executor_used)
1031
+
1032
+ # User-friendly status
1033
+ status_emoji = "βœ…" if event.get('status') == 'completed' else "⏳"
1034
+ component_msg = event.get('message', '')
1035
+ status_line = f"{status_emoji} Step {event.get('step', 0)}: {event.get('component', 'processing')}"
1036
+ if component_msg:
1037
+ status_line += f" - {component_msg}"
1038
+
1039
+ progress_messages.append(status_line)
1040
+ current_progress = "\n".join(progress_messages[-5:]) # Last 5 steps
1041
+ accumulated = initial_message + "\n\n" + current_progress
1042
+ yield format_chat_history(history, message, accumulated)
1043
+
1044
+ # Legacy step updates (backwards compatibility)
1045
  elif event_type == "step":
1046
  step_info = {
1047
  "step": event.get("step", 0),
 
1095
 
1096
  # Process final result
1097
  if final_payload:
1098
+ # V3: DO NOT store results in session - only in S3 via pipeline_manager
1099
+ # The pipeline_manager already handled S3 storage during execution
1100
  session_manager.update_session(session_id, {
 
1101
  "state": ConversationState.INITIAL
1102
  })
1103
 
1104
+ # V3: Store pending workflow save info
1105
+ session_manager.update_session(session_id, {
1106
+ "pending_workflow_save": {
1107
+ "execution_id": execution_id,
1108
+ "pipeline_name": plan.get("pipeline_name", "Untitled")
1109
+ }
1110
+ })
1111
+
1112
+ # REMOVED: Deprecated save_pipeline_execution - now handled by pipeline_manager
1113
 
1114
+ # Create user-friendly final response with workflow save prompt
1115
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
1116
+
1117
+ # V3: Check if we have S3 final output URL
1118
+ final_output_url = final_payload.get("final_output_url", "")
1119
+
1120
  friendly_final = f"""πŸŽ‰ **Pipeline Completed Successfully!**
1121
 
1122
  **Summary:**
 
1125
  - Successful: {success_count}
1126
  - Executor: {executor_used}
1127
 
1128
+ πŸ’Ύ **Would you like to save this workflow for future use?**
1129
+ Type **'save workflow'** or **'yes'** to save it.
1130
+
1131
+ βœ… All done! What else can I help you with?
1132
 
1133
  <details>
1134
  <summary>πŸ“Š Detailed Results (for developers)</summary>