redhairedshanks1 commited on
Commit
896941f
Β·
1 Parent(s): 09168dc

adding pipeline status into the message response

Browse files
Files changed (1) hide show
  1. api_routes_v2.py +247 -125
api_routes_v2.py CHANGED
@@ -645,13 +645,24 @@ def _add_and_mirror_message(
645
  content: str,
646
  *,
647
  pipeline_id: Optional[str] = None,
 
648
  pipeline_status: Optional[str] = None,
649
  pipeline_result: Optional[dict] = None,
650
  file_metadata: Optional[dict] = None,
651
  ):
652
  """
653
  V3 RULE: Append message to S3 conversation.
654
- Now includes pipeline results in assistant messages.
 
 
 
 
 
 
 
 
 
 
655
  """
656
  # 1. Load existing
657
  current_messages = _load_conversation_from_s3(chat_id)
@@ -663,34 +674,44 @@ def _add_and_mirror_message(
663
  "timestamp": datetime.utcnow().isoformat() + "Z"
664
  }
665
 
666
- # NEW: Add pipeline results if available (for assistant messages)
667
- if role == "assistant" and pipeline_result:
668
- # Check if this is a pipeline completion message
669
- if "pipeline completed" in content.lower() or "pipeline failed" in content.lower():
670
- # Extract the text/result from pipeline
671
- execution_results = pipeline_result.get("execution_results", {})
672
-
673
- # Try to extract the main result text
 
 
 
 
 
 
 
 
 
674
  result_text = ""
 
675
 
 
676
  # Check for image descriptions
677
  if execution_results.get("image_descriptions"):
678
  image_desc = execution_results["image_descriptions"]
679
  if image_desc.get("result") and isinstance(image_desc["result"], list) and len(image_desc["result"]) > 0:
680
  page_result = image_desc["result"][0]
681
- # Use Gemini description if available
682
  if page_result.get("gemini", {}).get("description"):
683
  result_text = page_result["gemini"]["description"]
684
  elif page_result.get("mistral", {}).get("description"):
685
  result_text = page_result["mistral"]["description"]
686
 
687
- # Check for extracted text (for text extraction pipelines)
688
  elif execution_results.get("text"):
689
  result_text = execution_results["text"]
690
 
691
  # Check for component results
692
  elif execution_results.get("components_executed"):
693
- for comp in execution_results["components_executed"]:
694
  comp_result = comp.get("result", {})
695
  if comp_result.get("text"):
696
  result_text = comp_result["text"]
@@ -706,20 +727,26 @@ def _add_and_mirror_message(
706
  result_text = page_result["mistral"]["description"]
707
  break
708
 
709
- # Add result to message if we found something
710
- if result_text:
711
- new_msg["result"] = {
712
- "text": result_text,
713
- "pipeline_id": pipeline_result.get("pipeline_id"),
714
- "status": pipeline_result.get("status", "completed")
715
- }
716
- elif pipeline_result.get("error"):
717
- # For error cases
718
- new_msg["result"] = {
719
- "error": pipeline_result.get("error"),
720
- "pipeline_id": pipeline_result.get("pipeline_id"),
721
- "status": "failed"
722
- }
 
 
 
 
 
 
723
 
724
  # Add file metadata if provided
725
  if file_metadata:
@@ -750,22 +777,45 @@ def _assistant_response_payload(
750
  final_output: Optional[Dict[str, Any]] = None,
751
  exception: Optional[str] = None,
752
  pipeline_result: Optional[Dict[str, Any]] = None,
753
- pipeline_id: Optional[str] = None, # βœ… ADD THIS PARAMETER
 
754
  ) -> ChatResponse:
755
  """
756
  Create ChatResponse payload with all required fields.
 
 
 
 
 
 
 
 
 
 
 
 
 
757
  """
758
  # Generate message_id for assistant response
759
  from services.schemas import generate_message_id
760
  message_id = generate_message_id()
761
 
762
- # Persist assistant message to S3 WITH pipeline results AND pipeline_id
 
 
 
 
 
 
 
763
  _add_and_mirror_message(
764
  chat_id=chat_id,
765
  role="assistant",
766
  content=friendly_response,
767
- pipeline_result=pipeline_result, # Pass pipeline results
768
- pipeline_id=pipeline_id # βœ… PASS THE PIPELINE_ID HERE
 
 
769
  )
770
 
771
  # Get file metadata from session
@@ -1023,10 +1073,11 @@ async def get_session_history(
1023
  ):
1024
  """
1025
  Get conversation history for a session.
1026
- Correct behavior:
1027
- - Preserve pipeline_id on messages
1028
- - Enrich ONLY using stored pipeline_id
1029
- - No inference, no guessing, no context scanning
 
1030
  """
1031
  try:
1032
  # Load conversation history from S3
@@ -1037,58 +1088,66 @@ async def get_session_history(
1037
  for msg in history:
1038
  msg_copy = msg.copy()
1039
 
1040
- # βœ… ALWAYS pass through pipeline_id if present
1041
- pipeline_id = msg.get("pipeline_id")
1042
- if pipeline_id:
1043
- msg_copy["pipeline_id"] = pipeline_id
1044
 
1045
- role = msg_copy.get("role")
1046
- content = msg_copy.get("content", "")
 
1047
 
1048
- # βœ… Enrich ONLY pipeline completion messages WITH pipeline_id
 
 
 
 
1049
  if (
1050
- role == "assistant"
1051
- and "Pipeline completed" in content
1052
- and pipeline_id
1053
  ):
 
1054
  try:
1055
  s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
1056
  resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
1057
- pipeline_data = json.loads(
1058
- resp["Body"].read().decode("utf-8")
1059
- )
1060
 
1061
- # Attach final output if present
1062
  if "result" in pipeline_data:
1063
- msg_copy["final_output"] = pipeline_data["result"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1064
 
1065
- # Attach pipeline metadata
 
 
 
 
 
 
1066
  msg_copy["pipeline_metadata"] = {
1067
  "pipeline_id": pipeline_id,
1068
- "pipeline_name": pipeline_data.get(
1069
- "pipeline_name", "unknown"
1070
- ),
1071
  "status": pipeline_data.get("status", "unknown"),
1072
- "executed_at": pipeline_data.get(
1073
- "updated_at",
1074
- pipeline_data.get("created_at")
1075
- )
1076
  }
1077
-
1078
- except Exception as e:
1079
- # Non-fatal enrichment failure
1080
- msg_copy["final_output_error"] = str(e)
1081
-
1082
- # Ensure message_id exists
1083
- if "message_id" not in msg_copy:
1084
- from services.schemas import generate_message_id
1085
- msg_copy["message_id"] = generate_message_id()
1086
-
1087
- # Normalize timestamp
1088
- if "timestamp" in msg_copy and isinstance(
1089
- msg_copy["timestamp"], datetime
1090
- ):
1091
- msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
1092
 
1093
  enhanced_history.append(msg_copy)
1094
 
@@ -1099,7 +1158,7 @@ async def get_session_history(
1099
  # Load session metadata
1100
  session = session_manager.get_session(session_id) or {}
1101
 
1102
- # ----- PIPELINES HISTORY (unchanged but cleaned) -----
1103
  pipelines_hist = session.get("pipelines_history", [])
1104
  enhanced_pipelines = []
1105
 
@@ -1113,13 +1172,8 @@ async def get_session_history(
1113
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
1114
  if pipeline_s3_key:
1115
  try:
1116
- resp = s3.get_object(
1117
- Bucket=S3_BUCKET,
1118
- Key=pipeline_s3_key
1119
- )
1120
- pipeline_def = json.loads(
1121
- resp["Body"].read().decode("utf-8")
1122
- )
1123
 
1124
  components = (
1125
  pipeline_def.get("components")
@@ -1141,16 +1195,13 @@ async def get_session_history(
1141
  enhanced_pipe["component_count"] = 0
1142
  enhanced_pipe["tools"] = []
1143
  enhanced_pipe["load_error"] = str(e)
1144
-
1145
  else:
1146
  enhanced_pipe["components"] = []
1147
  enhanced_pipe["component_count"] = 0
1148
  enhanced_pipe["tools"] = []
1149
 
1150
  if "hasError" not in enhanced_pipe:
1151
- enhanced_pipe["hasError"] = (
1152
- enhanced_pipe.get("status") == "failed"
1153
- )
1154
 
1155
  enhanced_pipelines.append(enhanced_pipe)
1156
 
@@ -1415,6 +1466,9 @@ async def chat_unified(
1415
  file_ref = session.get("current_file")
1416
  local_path, cleanup = download_to_temp_file(file_ref)
1417
  session_manager.update_session(chat_id, {"state": "executing"})
 
 
 
1418
 
1419
  try:
1420
  result = execute_pipeline(
@@ -1435,9 +1489,7 @@ async def chat_unified(
1435
  is_success = (pipeline_status == "completed" and completed_steps == total_steps and not has_error)
1436
 
1437
  # V3: Update pipeline status in S3
1438
- pipeline_id = proposed.get("pipeline_id")
1439
  if pipeline_id:
1440
- # Mark as failed if not successful, otherwise completed
1441
  final_status = "completed" if is_success else "failed"
1442
  _update_pipeline_status(pipeline_id, chat_id, final_status, result=result)
1443
  _record_model_attribution(
@@ -1496,7 +1548,7 @@ async def chat_unified(
1496
  "pipeline": api_pipeline
1497
  }
1498
 
1499
- # Return response with pipeline result included
1500
  return _assistant_response_payload(
1501
  chat_id=chat_id,
1502
  friendly_response=friendly,
@@ -1506,8 +1558,9 @@ async def chat_unified(
1506
  output=output,
1507
  final_output=final_output,
1508
  exception=exception_msg,
1509
- pipeline_result=result, # NEW: Pass the pipeline results
1510
- pipeline_id=pipeline_id # βœ… PASS pipeline_id HERE
 
1511
  )
1512
  else:
1513
  # Pipeline failed or partially completed
@@ -1523,7 +1576,6 @@ async def chat_unified(
1523
  })
1524
 
1525
  if failed_components:
1526
- # Show specific component error
1527
  first_error = failed_components[0]
1528
  friendly = f"❌ Pipeline failed: {first_error['tool_name']} - {first_error['error']}"
1529
  else:
@@ -1533,20 +1585,20 @@ async def chat_unified(
1533
  "component_summary": f"Pipeline {pipeline_status}",
1534
  "steps": total_steps,
1535
  "completed": completed_steps,
1536
- "failed": total_steps - completed_steps
 
1537
  }
1538
  final_output = {"text": f"Pipeline execution {pipeline_status} with {completed_steps}/{total_steps} steps completed"}
1539
  api_type = "pipeline_failed" if pipeline_status == "failed" else "pipeline_partial"
1540
  exception_msg = error_msg
1541
 
1542
- # Build api_response data for failure
1543
  api_data = {
1544
  "type": api_type,
1545
  "result": result,
1546
  "pipeline": proposed
1547
  }
1548
 
1549
- # Return response with pipeline result included (even for failures)
1550
  return _assistant_response_payload(
1551
  chat_id=chat_id,
1552
  friendly_response=friendly,
@@ -1556,12 +1608,12 @@ async def chat_unified(
1556
  output=output,
1557
  final_output=final_output,
1558
  exception=exception_msg,
1559
- pipeline_result=result, # NEW: Pass results even for failures
1560
- pipeline_id=pipeline_id # βœ… PASS pipeline_id HERE
 
1561
  )
1562
  except Exception as e:
1563
  session_manager.update_session(chat_id, {"state": "initial"})
1564
- pipeline_id = proposed.get("pipeline_id")
1565
  if pipeline_id:
1566
  _update_pipeline_status(pipeline_id, chat_id, "failed", result={"error": str(e)})
1567
 
@@ -1573,7 +1625,6 @@ async def chat_unified(
1573
  pipeline_mgr = get_pipeline_manager()
1574
  pipeline_record = pipeline_mgr.get_pipeline(pipeline_id)
1575
  if pipeline_record and pipeline_record.get("components"):
1576
- # Find first failed component
1577
  for comp in pipeline_record.get("components", []):
1578
  if comp.get("hasError") or comp.get("status") == "failed":
1579
  failed_component = {
@@ -1601,13 +1652,7 @@ async def chat_unified(
1601
  if failed_component:
1602
  api_data["failed_component"] = failed_component
1603
 
1604
- # Create error result for the response
1605
- error_result = {
1606
- "error": str(e),
1607
- "pipeline_id": pipeline_id,
1608
- "status": "failed"
1609
- }
1610
-
1611
  return _assistant_response_payload(
1612
  chat_id=chat_id,
1613
  friendly_response=friendly,
@@ -1615,8 +1660,9 @@ async def chat_unified(
1615
  api_data=api_data,
1616
  state="initial",
1617
  exception=str(e),
1618
- pipeline_result=error_result, # NEW: Include error in pipeline_result
1619
- pipeline_id=pipeline_id # βœ… PASS pipeline_id HERE
 
1620
  )
1621
  finally:
1622
  try:
@@ -1726,13 +1772,16 @@ async def chat_unified(
1726
  ]
1727
  }
1728
 
 
1729
  return _assistant_response_payload(
1730
  chat_id=chat_id,
1731
  friendly_response=friendly,
1732
  intent=intent_data,
1733
  api_data=api_data,
1734
  state="pipeline_proposed",
1735
- output=output
 
 
1736
  )
1737
 
1738
  except Exception as e:
@@ -1801,13 +1850,16 @@ async def chat_unified(
1801
  "modification": "edited"
1802
  }
1803
 
 
1804
  return _assistant_response_payload(
1805
  chat_id=chat_id,
1806
  friendly_response=friendly,
1807
  intent=intent_data,
1808
  api_data=api_data,
1809
  state="pipeline_proposed",
1810
- output=output
 
 
1811
  )
1812
  except Exception as e:
1813
  api_data = {
@@ -1930,8 +1982,6 @@ async def chat_unified_stream(
1930
 
1931
  def emit(obj: Dict[str, Any]) -> bytes:
1932
  obj.setdefault("chat_id", chat_id)
1933
- # CHANGE: Fetch latest chat_name from session_manager to ensure it's up-to-date
1934
- # (e.g. if generated during the stream)
1935
  current_session = session_manager.get_session(chat_id) or {}
1936
  obj.setdefault("chat_name", current_session.get("chat_name"))
1937
  obj.setdefault("state", current_session.get("state", "initial"))
@@ -2025,11 +2075,22 @@ async def chat_unified_stream(
2025
  f"- Type 'reject' or 'no' to cancel\n"
2026
  f"- Describe changes to modify the plan"
2027
  )
2028
- _add_and_mirror_message(chat_id, "assistant", friendly)
 
 
 
 
 
 
 
 
 
2029
  yield emit({
2030
  "type": "assistant_final",
2031
  "content": friendly,
2032
  "pipeline": pipeline,
 
 
2033
  "output": {
2034
  "pipeline_id": pipeline_id,
2035
  "pipeline_name": pipeline_name,
@@ -2057,6 +2118,8 @@ async def chat_unified_stream(
2057
  if intent_data["intent"] == "approval":
2058
  session_manager.update_session(chat_id, {"state": "executing"})
2059
  plan = session_local.get("proposed_pipeline", {})
 
 
2060
  initial = (
2061
  f"βœ… Approved! Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\n"
2062
  f"πŸš€ Processing, please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
@@ -2069,8 +2132,6 @@ async def chat_unified_stream(
2069
  local_path, cleanup = download_to_temp_file(file_ref)
2070
 
2071
  try:
2072
- pipeline_id = plan.get("pipeline_id")
2073
-
2074
  for event in execute_pipeline_streaming(
2075
  pipeline=plan,
2076
  file_path=local_path,
@@ -2117,7 +2178,16 @@ async def chat_unified_stream(
2117
  err = event.get("error", "Unknown error")
2118
  friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
2119
  session_manager.update_session(chat_id, {"state": "initial"})
2120
- _add_and_mirror_message(chat_id, "assistant", friendly_err)
 
 
 
 
 
 
 
 
 
2121
 
2122
  # V3: Update status + result in S3
2123
  if pipeline_id:
@@ -2127,7 +2197,9 @@ async def chat_unified_stream(
2127
  "type": "assistant_final",
2128
  "content": friendly_err,
2129
  "error": err,
2130
- "exception": str(err)
 
 
2131
  })
2132
  return
2133
 
@@ -2136,14 +2208,14 @@ async def chat_unified_stream(
2136
 
2137
  # V3: Update pipeline status
2138
  if pipeline_id:
2139
- _update_pipeline_status(pipeline_id, chat_id, "completed", result=final_payload)
2140
- _record_model_attribution(
2141
  pipeline_id=pipeline_id,
2142
  session_id=chat_id,
2143
  model_provider=plan.get("_model_provider", "unknown"),
2144
  model_name=plan.get("_model", "unknown"),
2145
  is_fallback=False
2146
- )
2147
 
2148
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
2149
  result_text = _extract_user_facing_text(final_payload)
@@ -2155,15 +2227,28 @@ async def chat_unified_stream(
2155
  f"- Executor: {executor_used}\n\n"
2156
  f"{result_text or ''}"
2157
  )
2158
- _add_and_mirror_message(chat_id, "assistant", friendly_final)
 
 
 
 
 
 
 
 
 
 
2159
  yield emit({
2160
  "type": "assistant_final",
2161
  "content": friendly_final,
2162
  "result": final_payload,
 
 
2163
  "output": {
2164
  "component_summary": f"Executed {success_count} steps successfully",
2165
  "steps_completed": success_count,
2166
- "total_steps": len(steps_completed)
 
2167
  },
2168
  "final_output": {
2169
  "text": result_text,
@@ -2175,18 +2260,42 @@ async def chat_unified_stream(
2175
  # Success but no payload?
2176
  done = f"βœ… Pipeline Completed! Executed {len(steps_completed)} steps."
2177
  session_manager.update_session(chat_id, {"state": "initial"})
2178
- _add_and_mirror_message(chat_id, "assistant", done)
 
 
 
 
 
 
 
 
 
 
2179
  # V3 check
2180
  if pipeline_id:
2181
  _update_pipeline_status(pipeline_id, chat_id, "completed", result={"message": "Completed without output"})
2182
 
2183
- yield emit({"type": "assistant_final", "content": done})
 
 
 
 
 
2184
  return
2185
 
2186
  except Exception as e:
2187
  friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}"
2188
  session_manager.update_session(chat_id, {"state": "initial"})
2189
- _add_and_mirror_message(chat_id, "assistant", friendly_err)
 
 
 
 
 
 
 
 
 
2190
 
2191
  # V3 Update
2192
  if pipeline_id:
@@ -2196,7 +2305,9 @@ async def chat_unified_stream(
2196
  "type": "assistant_final",
2197
  "content": friendly_err,
2198
  "error": str(e),
2199
- "exception": str(e)
 
 
2200
  })
2201
  return
2202
  finally:
@@ -2213,6 +2324,7 @@ async def chat_unified_stream(
2213
  return
2214
 
2215
  else:
 
2216
  try:
2217
  original_plan = session_local.get("proposed_pipeline", {})
2218
  edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"
@@ -2233,7 +2345,15 @@ async def chat_unified_stream(
2233
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
2234
  formatted = format_pipeline_for_display(new_pipeline)
2235
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
2236
- _add_and_mirror_message(chat_id, "assistant", friendly)
 
 
 
 
 
 
 
 
2237
 
2238
  # Get steps list for output
2239
  steps_list = new_pipeline.get("pipeline_steps", new_pipeline.get("components", []))
@@ -2242,6 +2362,8 @@ async def chat_unified_stream(
2242
  "type": "assistant_final",
2243
  "content": friendly,
2244
  "pipeline": new_pipeline,
 
 
2245
  "output": {
2246
  "pipeline_id": pipeline_id,
2247
  "pipeline_name": new_pipeline.get("pipeline_name", "Document Processing"),
 
645
  content: str,
646
  *,
647
  pipeline_id: Optional[str] = None,
648
+ pipeline_action: Optional[str] = None, # "created" | "executed" | "failed"
649
  pipeline_status: Optional[str] = None,
650
  pipeline_result: Optional[dict] = None,
651
  file_metadata: Optional[dict] = None,
652
  ):
653
  """
654
  V3 RULE: Append message to S3 conversation.
655
+ Now properly includes pipeline_id and results in messages.
656
+
657
+ Args:
658
+ chat_id: Session ID
659
+ role: "user" | "assistant" | "system"
660
+ content: Message text
661
+ pipeline_id: ID of related pipeline (if any)
662
+ pipeline_action: "created" | "executed" | "failed"
663
+ pipeline_status: Pipeline status (for executed pipelines)
664
+ pipeline_result: Full pipeline result (for executed pipelines)
665
+ file_metadata: File info if message has file attachment
666
  """
667
  # 1. Load existing
668
  current_messages = _load_conversation_from_s3(chat_id)
 
674
  "timestamp": datetime.utcnow().isoformat() + "Z"
675
  }
676
 
677
+ # βœ… FIX: Add pipeline_id if provided
678
+ if pipeline_id:
679
+ new_msg["pipeline_id"] = pipeline_id
680
+
681
+ # βœ… FIX: Add pipeline_action if provided
682
+ if pipeline_action:
683
+ new_msg["pipeline_action"] = pipeline_action
684
+
685
+ # βœ… FIX: Add result for executed/failed pipelines
686
+ if role == "assistant" and pipeline_action in ("executed", "failed") and pipeline_id:
687
+ result_data = {
688
+ "pipeline_id": pipeline_id,
689
+ "status": pipeline_status or (pipeline_result.get("status") if pipeline_result else None) or pipeline_action,
690
+ }
691
+
692
+ if pipeline_result:
693
+ # Extract user-facing text from result
694
  result_text = ""
695
+ execution_results = pipeline_result.get("execution_results", pipeline_result)
696
 
697
+ # Try to extract the main result text from various locations
698
  # Check for image descriptions
699
  if execution_results.get("image_descriptions"):
700
  image_desc = execution_results["image_descriptions"]
701
  if image_desc.get("result") and isinstance(image_desc["result"], list) and len(image_desc["result"]) > 0:
702
  page_result = image_desc["result"][0]
 
703
  if page_result.get("gemini", {}).get("description"):
704
  result_text = page_result["gemini"]["description"]
705
  elif page_result.get("mistral", {}).get("description"):
706
  result_text = page_result["mistral"]["description"]
707
 
708
+ # Check for extracted text
709
  elif execution_results.get("text"):
710
  result_text = execution_results["text"]
711
 
712
  # Check for component results
713
  elif execution_results.get("components_executed"):
714
+ for comp in execution_results.get("components_executed", []):
715
  comp_result = comp.get("result", {})
716
  if comp_result.get("text"):
717
  result_text = comp_result["text"]
 
727
  result_text = page_result["mistral"]["description"]
728
  break
729
 
730
+ # Fallback to generic extraction
731
+ if not result_text:
732
+ result_text = _extract_user_facing_text(pipeline_result)
733
+
734
+ result_data["text"] = result_text
735
+
736
+ # Add error if present
737
+ if pipeline_result.get("error"):
738
+ result_data["error"] = pipeline_result.get("error")
739
+
740
+ # Add summary stats
741
+ if pipeline_result.get("completed_steps") is not None:
742
+ result_data["completed_steps"] = pipeline_result.get("completed_steps")
743
+ if pipeline_result.get("total_steps") is not None:
744
+ result_data["total_steps"] = pipeline_result.get("total_steps")
745
+
746
+ elif pipeline_action == "failed":
747
+ result_data["error"] = "Pipeline execution failed"
748
+
749
+ new_msg["result"] = result_data
750
 
751
  # Add file metadata if provided
752
  if file_metadata:
 
777
  final_output: Optional[Dict[str, Any]] = None,
778
  exception: Optional[str] = None,
779
  pipeline_result: Optional[Dict[str, Any]] = None,
780
+ pipeline_id: Optional[str] = None,
781
+ pipeline_action: Optional[str] = None, # βœ… NEW: "created" | "executed" | "failed"
782
  ) -> ChatResponse:
783
  """
784
  Create ChatResponse payload with all required fields.
785
+
786
+ Args:
787
+ chat_id: Session ID
788
+ friendly_response: User-friendly message
789
+ intent: Intent classification result
790
+ api_data: Technical API response data
791
+ state: Current session state
792
+ output: Pipeline summary output
793
+ final_output: Final downloadable result
794
+ exception: Error message if any
795
+ pipeline_result: Full pipeline execution result
796
+ pipeline_id: Pipeline ID (for tracking in messages)
797
+ pipeline_action: "created" | "executed" | "failed"
798
  """
799
  # Generate message_id for assistant response
800
  from services.schemas import generate_message_id
801
  message_id = generate_message_id()
802
 
803
+ # Determine pipeline status from result or exception
804
+ pipeline_status = None
805
+ if pipeline_result:
806
+ pipeline_status = pipeline_result.get("status")
807
+ elif exception:
808
+ pipeline_status = "failed"
809
+
810
+ # Persist assistant message to S3 WITH pipeline data
811
  _add_and_mirror_message(
812
  chat_id=chat_id,
813
  role="assistant",
814
  content=friendly_response,
815
+ pipeline_id=pipeline_id,
816
+ pipeline_action=pipeline_action,
817
+ pipeline_status=pipeline_status,
818
+ pipeline_result=pipeline_result,
819
  )
820
 
821
  # Get file metadata from session
 
1073
  ):
1074
  """
1075
  Get conversation history for a session.
1076
+
1077
+ V3 FIX:
1078
+ - pipeline_id and pipeline_action are now stored IN the messages
1079
+ - result is embedded for executed pipelines
1080
+ - Only enrich if result is missing (fallback)
1081
  """
1082
  try:
1083
  # Load conversation history from S3
 
1088
  for msg in history:
1089
  msg_copy = msg.copy()
1090
 
1091
+ # Ensure message_id exists
1092
+ if "message_id" not in msg_copy:
1093
+ from services.schemas import generate_message_id
1094
+ msg_copy["message_id"] = generate_message_id()
1095
 
1096
+ # Normalize timestamp
1097
+ if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
1098
+ msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
1099
 
1100
+ # βœ… pipeline_id and pipeline_action are now IN the message - no guessing needed!
1101
+ pipeline_id = msg_copy.get("pipeline_id")
1102
+ pipeline_action = msg_copy.get("pipeline_action")
1103
+
1104
+ # βœ… Enrich ONLY if result is missing for executed/failed pipelines
1105
  if (
1106
+ pipeline_id
1107
+ and pipeline_action in ("executed", "failed")
1108
+ and not msg_copy.get("result")
1109
  ):
1110
+ # Fallback: Load result from S3 pipeline file
1111
  try:
1112
  s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
1113
  resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
1114
+ pipeline_data = json.loads(resp["Body"].read().decode("utf-8"))
 
 
1115
 
 
1116
  if "result" in pipeline_data:
1117
+ # Extract user-facing text
1118
+ result_text = _extract_user_facing_text(pipeline_data["result"])
1119
+ msg_copy["result"] = {
1120
+ "pipeline_id": pipeline_id,
1121
+ "status": pipeline_data.get("status", pipeline_action),
1122
+ "text": result_text,
1123
+ }
1124
+ if pipeline_data["result"].get("error"):
1125
+ msg_copy["result"]["error"] = pipeline_data["result"]["error"]
1126
+ except Exception as e:
1127
+ # Non-fatal enrichment failure
1128
+ msg_copy["result"] = {
1129
+ "pipeline_id": pipeline_id,
1130
+ "status": "unknown",
1131
+ "error": f"Could not load result: {str(e)}"
1132
+ }
1133
 
1134
+ # βœ… Add pipeline_metadata for created/executed pipelines
1135
+ if pipeline_id:
1136
+ try:
1137
+ s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
1138
+ resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
1139
+ pipeline_data = json.loads(resp["Body"].read().decode("utf-8"))
1140
+
1141
  msg_copy["pipeline_metadata"] = {
1142
  "pipeline_id": pipeline_id,
1143
+ "pipeline_name": pipeline_data.get("pipeline_name", "unknown"),
 
 
1144
  "status": pipeline_data.get("status", "unknown"),
1145
+ "created_at": pipeline_data.get("created_at"),
1146
+ "updated_at": pipeline_data.get("updated_at"),
 
 
1147
  }
1148
+ except Exception:
1149
+ # Keep existing pipeline_id, just no extra metadata
1150
+ pass
 
 
 
 
 
 
 
 
 
 
 
 
1151
 
1152
  enhanced_history.append(msg_copy)
1153
 
 
1158
  # Load session metadata
1159
  session = session_manager.get_session(session_id) or {}
1160
 
1161
+ # ----- PIPELINES HISTORY -----
1162
  pipelines_hist = session.get("pipelines_history", [])
1163
  enhanced_pipelines = []
1164
 
 
1172
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
1173
  if pipeline_s3_key:
1174
  try:
1175
+ resp = s3.get_object(Bucket=S3_BUCKET, Key=pipeline_s3_key)
1176
+ pipeline_def = json.loads(resp["Body"].read().decode("utf-8"))
 
 
 
 
 
1177
 
1178
  components = (
1179
  pipeline_def.get("components")
 
1195
  enhanced_pipe["component_count"] = 0
1196
  enhanced_pipe["tools"] = []
1197
  enhanced_pipe["load_error"] = str(e)
 
1198
  else:
1199
  enhanced_pipe["components"] = []
1200
  enhanced_pipe["component_count"] = 0
1201
  enhanced_pipe["tools"] = []
1202
 
1203
  if "hasError" not in enhanced_pipe:
1204
+ enhanced_pipe["hasError"] = enhanced_pipe.get("status") == "failed"
 
 
1205
 
1206
  enhanced_pipelines.append(enhanced_pipe)
1207
 
 
1466
  file_ref = session.get("current_file")
1467
  local_path, cleanup = download_to_temp_file(file_ref)
1468
  session_manager.update_session(chat_id, {"state": "executing"})
1469
+
1470
+ # βœ… Get pipeline_id from proposed pipeline
1471
+ pipeline_id = proposed.get("pipeline_id")
1472
 
1473
  try:
1474
  result = execute_pipeline(
 
1489
  is_success = (pipeline_status == "completed" and completed_steps == total_steps and not has_error)
1490
 
1491
  # V3: Update pipeline status in S3
 
1492
  if pipeline_id:
 
1493
  final_status = "completed" if is_success else "failed"
1494
  _update_pipeline_status(pipeline_id, chat_id, final_status, result=result)
1495
  _record_model_attribution(
 
1548
  "pipeline": api_pipeline
1549
  }
1550
 
1551
+ # βœ… Return response with pipeline_id and pipeline_action
1552
  return _assistant_response_payload(
1553
  chat_id=chat_id,
1554
  friendly_response=friendly,
 
1558
  output=output,
1559
  final_output=final_output,
1560
  exception=exception_msg,
1561
+ pipeline_result=result,
1562
+ pipeline_id=pipeline_id,
1563
+ pipeline_action="executed" # βœ… ADD THIS
1564
  )
1565
  else:
1566
  # Pipeline failed or partially completed
 
1576
  })
1577
 
1578
  if failed_components:
 
1579
  first_error = failed_components[0]
1580
  friendly = f"❌ Pipeline failed: {first_error['tool_name']} - {first_error['error']}"
1581
  else:
 
1585
  "component_summary": f"Pipeline {pipeline_status}",
1586
  "steps": total_steps,
1587
  "completed": completed_steps,
1588
+ "failed": total_steps - completed_steps,
1589
+ "pipeline_id": pipeline_id
1590
  }
1591
  final_output = {"text": f"Pipeline execution {pipeline_status} with {completed_steps}/{total_steps} steps completed"}
1592
  api_type = "pipeline_failed" if pipeline_status == "failed" else "pipeline_partial"
1593
  exception_msg = error_msg
1594
 
 
1595
  api_data = {
1596
  "type": api_type,
1597
  "result": result,
1598
  "pipeline": proposed
1599
  }
1600
 
1601
+ # βœ… Return response with pipeline_id and pipeline_action="failed"
1602
  return _assistant_response_payload(
1603
  chat_id=chat_id,
1604
  friendly_response=friendly,
 
1608
  output=output,
1609
  final_output=final_output,
1610
  exception=exception_msg,
1611
+ pipeline_result=result,
1612
+ pipeline_id=pipeline_id,
1613
+ pipeline_action="failed" # βœ… ADD THIS
1614
  )
1615
  except Exception as e:
1616
  session_manager.update_session(chat_id, {"state": "initial"})
 
1617
  if pipeline_id:
1618
  _update_pipeline_status(pipeline_id, chat_id, "failed", result={"error": str(e)})
1619
 
 
1625
  pipeline_mgr = get_pipeline_manager()
1626
  pipeline_record = pipeline_mgr.get_pipeline(pipeline_id)
1627
  if pipeline_record and pipeline_record.get("components"):
 
1628
  for comp in pipeline_record.get("components", []):
1629
  if comp.get("hasError") or comp.get("status") == "failed":
1630
  failed_component = {
 
1652
  if failed_component:
1653
  api_data["failed_component"] = failed_component
1654
 
1655
+ # βœ… Return with pipeline_id and pipeline_action="failed"
 
 
 
 
 
 
1656
  return _assistant_response_payload(
1657
  chat_id=chat_id,
1658
  friendly_response=friendly,
 
1660
  api_data=api_data,
1661
  state="initial",
1662
  exception=str(e),
1663
+ pipeline_result=error_result,
1664
+ pipeline_id=pipeline_id,
1665
+ pipeline_action="failed" # βœ… ADD THIS
1666
  )
1667
  finally:
1668
  try:
 
1772
  ]
1773
  }
1774
 
1775
+ # βœ… Return with pipeline_id and pipeline_action="created"
1776
  return _assistant_response_payload(
1777
  chat_id=chat_id,
1778
  friendly_response=friendly,
1779
  intent=intent_data,
1780
  api_data=api_data,
1781
  state="pipeline_proposed",
1782
+ output=output,
1783
+ pipeline_id=pipeline_id,
1784
+ pipeline_action="created" # βœ… ADD THIS
1785
  )
1786
 
1787
  except Exception as e:
 
1850
  "modification": "edited"
1851
  }
1852
 
1853
+ # βœ… Return with pipeline_id and pipeline_action="created"
1854
  return _assistant_response_payload(
1855
  chat_id=chat_id,
1856
  friendly_response=friendly,
1857
  intent=intent_data,
1858
  api_data=api_data,
1859
  state="pipeline_proposed",
1860
+ output=output,
1861
+ pipeline_id=pipeline_id,
1862
+ pipeline_action="created" # βœ… ADD THIS
1863
  )
1864
  except Exception as e:
1865
  api_data = {
 
1982
 
1983
  def emit(obj: Dict[str, Any]) -> bytes:
1984
  obj.setdefault("chat_id", chat_id)
 
 
1985
  current_session = session_manager.get_session(chat_id) or {}
1986
  obj.setdefault("chat_name", current_session.get("chat_name"))
1987
  obj.setdefault("state", current_session.get("state", "initial"))
 
2075
  f"- Type 'reject' or 'no' to cancel\n"
2076
  f"- Describe changes to modify the plan"
2077
  )
2078
+
2079
+ # βœ… FIX: Add pipeline_id and pipeline_action to message
2080
+ _add_and_mirror_message(
2081
+ chat_id,
2082
+ "assistant",
2083
+ friendly,
2084
+ pipeline_id=pipeline_id,
2085
+ pipeline_action="created"
2086
+ )
2087
+
2088
  yield emit({
2089
  "type": "assistant_final",
2090
  "content": friendly,
2091
  "pipeline": pipeline,
2092
+ "pipeline_id": pipeline_id, # βœ… Include in emit
2093
+ "pipeline_action": "created",
2094
  "output": {
2095
  "pipeline_id": pipeline_id,
2096
  "pipeline_name": pipeline_name,
 
2118
  if intent_data["intent"] == "approval":
2119
  session_manager.update_session(chat_id, {"state": "executing"})
2120
  plan = session_local.get("proposed_pipeline", {})
2121
+ pipeline_id = plan.get("pipeline_id") # βœ… Get pipeline_id
2122
+
2123
  initial = (
2124
  f"βœ… Approved! Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\n"
2125
  f"πŸš€ Processing, please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
 
2132
  local_path, cleanup = download_to_temp_file(file_ref)
2133
 
2134
  try:
 
 
2135
  for event in execute_pipeline_streaming(
2136
  pipeline=plan,
2137
  file_path=local_path,
 
2178
  err = event.get("error", "Unknown error")
2179
  friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
2180
  session_manager.update_session(chat_id, {"state": "initial"})
2181
+
2182
+ # βœ… FIX: Add pipeline_id and pipeline_action to message
2183
+ _add_and_mirror_message(
2184
+ chat_id,
2185
+ "assistant",
2186
+ friendly_err,
2187
+ pipeline_id=pipeline_id,
2188
+ pipeline_action="failed",
2189
+ pipeline_result={"error": str(err), "status": "failed"}
2190
+ )
2191
 
2192
  # V3: Update status + result in S3
2193
  if pipeline_id:
 
2197
  "type": "assistant_final",
2198
  "content": friendly_err,
2199
  "error": err,
2200
+ "exception": str(err),
2201
+ "pipeline_id": pipeline_id,
2202
+ "pipeline_action": "failed"
2203
  })
2204
  return
2205
 
 
2208
 
2209
  # V3: Update pipeline status
2210
  if pipeline_id:
2211
+ _update_pipeline_status(pipeline_id, chat_id, "completed", result=final_payload)
2212
+ _record_model_attribution(
2213
  pipeline_id=pipeline_id,
2214
  session_id=chat_id,
2215
  model_provider=plan.get("_model_provider", "unknown"),
2216
  model_name=plan.get("_model", "unknown"),
2217
  is_fallback=False
2218
+ )
2219
 
2220
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
2221
  result_text = _extract_user_facing_text(final_payload)
 
2227
  f"- Executor: {executor_used}\n\n"
2228
  f"{result_text or ''}"
2229
  )
2230
+
2231
+ # βœ… FIX: Add pipeline_id and pipeline_action to message
2232
+ _add_and_mirror_message(
2233
+ chat_id,
2234
+ "assistant",
2235
+ friendly_final,
2236
+ pipeline_id=pipeline_id,
2237
+ pipeline_action="executed",
2238
+ pipeline_result=final_payload
2239
+ )
2240
+
2241
  yield emit({
2242
  "type": "assistant_final",
2243
  "content": friendly_final,
2244
  "result": final_payload,
2245
+ "pipeline_id": pipeline_id,
2246
+ "pipeline_action": "executed",
2247
  "output": {
2248
  "component_summary": f"Executed {success_count} steps successfully",
2249
  "steps_completed": success_count,
2250
+ "total_steps": len(steps_completed),
2251
+ "pipeline_id": pipeline_id
2252
  },
2253
  "final_output": {
2254
  "text": result_text,
 
2260
  # Success but no payload?
2261
  done = f"βœ… Pipeline Completed! Executed {len(steps_completed)} steps."
2262
  session_manager.update_session(chat_id, {"state": "initial"})
2263
+
2264
+ # βœ… FIX: Add pipeline_id and pipeline_action to message
2265
+ _add_and_mirror_message(
2266
+ chat_id,
2267
+ "assistant",
2268
+ done,
2269
+ pipeline_id=pipeline_id,
2270
+ pipeline_action="executed",
2271
+ pipeline_result={"message": "Completed without output", "status": "completed"}
2272
+ )
2273
+
2274
  # V3 check
2275
  if pipeline_id:
2276
  _update_pipeline_status(pipeline_id, chat_id, "completed", result={"message": "Completed without output"})
2277
 
2278
+ yield emit({
2279
+ "type": "assistant_final",
2280
+ "content": done,
2281
+ "pipeline_id": pipeline_id,
2282
+ "pipeline_action": "executed"
2283
+ })
2284
  return
2285
 
2286
  except Exception as e:
2287
  friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}"
2288
  session_manager.update_session(chat_id, {"state": "initial"})
2289
+
2290
+ # βœ… FIX: Add pipeline_id and pipeline_action to message
2291
+ _add_and_mirror_message(
2292
+ chat_id,
2293
+ "assistant",
2294
+ friendly_err,
2295
+ pipeline_id=pipeline_id,
2296
+ pipeline_action="failed",
2297
+ pipeline_result={"error": str(e), "status": "failed"}
2298
+ )
2299
 
2300
  # V3 Update
2301
  if pipeline_id:
 
2305
  "type": "assistant_final",
2306
  "content": friendly_err,
2307
  "error": str(e),
2308
+ "exception": str(e),
2309
+ "pipeline_id": pipeline_id,
2310
+ "pipeline_action": "failed"
2311
  })
2312
  return
2313
  finally:
 
2324
  return
2325
 
2326
  else:
2327
+ # Edit request
2328
  try:
2329
  original_plan = session_local.get("proposed_pipeline", {})
2330
  edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"
 
2345
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
2346
  formatted = format_pipeline_for_display(new_pipeline)
2347
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
2348
+
2349
+ # βœ… FIX: Add pipeline_id and pipeline_action to message
2350
+ _add_and_mirror_message(
2351
+ chat_id,
2352
+ "assistant",
2353
+ friendly,
2354
+ pipeline_id=pipeline_id,
2355
+ pipeline_action="created"
2356
+ )
2357
 
2358
  # Get steps list for output
2359
  steps_list = new_pipeline.get("pipeline_steps", new_pipeline.get("components", []))
 
2362
  "type": "assistant_final",
2363
  "content": friendly,
2364
  "pipeline": new_pipeline,
2365
+ "pipeline_id": pipeline_id,
2366
+ "pipeline_action": "created",
2367
  "output": {
2368
  "pipeline_id": pipeline_id,
2369
  "pipeline_name": new_pipeline.get("pipeline_name", "Document Processing"),