redhairedshanks1 commited on
Commit
5dc0941
Β·
1 Parent(s): 6187f40

Update api_routes_v2.py

Browse files
Files changed (1) hide show
  1. api_routes_v2.py +86 -29
api_routes_v2.py CHANGED
@@ -948,50 +948,102 @@ async def get_session_history(
948
  limit: int = 50
949
  ):
950
  """
951
- Get conversation history for a session
952
- CHANGE: Adds 'chat_name' and 'pipelines_history' (added fields only).
953
  """
954
  try:
955
- # FIXED: Load conversation history from S3 (where messages are actually stored)
956
- # Previously was using session_manager.get_session_history() which reads from
957
- # MongoDB messages collection, but messages are stored in S3 conversation file
958
  history = _load_conversation_from_s3(session_id)
959
 
960
- # Apply limit
961
- if limit and len(history) > limit:
962
- history = history[-limit:] # Get last N messages
963
-
964
- # Format datetime objects to ISO strings for JSON serialization
965
- # Also add message_id to each message if not present
966
- from services.schemas import generate_message_id
967
- formatted_history = []
968
  for msg in history:
969
  msg_copy = msg.copy()
970
 
971
- # Add message_id if not present
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
972
  if "message_id" not in msg_copy:
 
973
  msg_copy["message_id"] = generate_message_id()
974
 
 
975
  if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
976
  msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
977
- formatted_history.append(msg_copy)
978
-
 
 
 
 
 
 
979
  s = session_manager.get_session(session_id) or {}
980
 
981
- # Enhanced pipelines_history with component metadata, sorted by most recent first
982
  pipelines_hist = s.get("pipelines_history", [])
983
  enhanced_pipelines = []
984
  for pipeline_meta in pipelines_hist:
985
  enhanced_pipe = pipeline_meta.copy()
986
 
987
- # CHANGE: Rename result_preview to result
988
  if "result_preview" in enhanced_pipe:
989
  enhanced_pipe["result"] = enhanced_pipe.pop("result_preview")
990
 
991
- # CHANGE: Remove internal S3 keys from response
992
  enhanced_pipe.pop("pipeline_s3_key", None)
993
 
994
- # Load pipeline definition from S3 to get components/tools
995
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
996
  if pipeline_s3_key:
997
  try:
@@ -1007,7 +1059,12 @@ async def get_session_history(
1007
 
1008
  enhanced_pipe["tools"] = tools_list
1009
  enhanced_pipe["component_count"] = len(components)
1010
- enhanced_pipe["components"] = components # Full component details
 
 
 
 
 
1011
  except Exception as e:
1012
  print(f"Warning: Failed to load pipeline {pipeline_s3_key}: {e}")
1013
  enhanced_pipe["tools"] = []
@@ -1018,26 +1075,25 @@ async def get_session_history(
1018
  enhanced_pipe["component_count"] = 0
1019
  enhanced_pipe["components"] = []
1020
 
1021
- # CHANGE: Add hasError field if not present
1022
  if "hasError" not in enhanced_pipe:
1023
  enhanced_pipe["hasError"] = enhanced_pipe.get("status") == "failed"
1024
 
1025
  enhanced_pipelines.append(enhanced_pipe)
1026
 
1027
- # Ensure sorting by most recent first (updated_at or created_at)
1028
  enhanced_pipelines.sort(
1029
  key=lambda p: p.get("updated_at") or p.get("created_at") or "",
1030
  reverse=True
1031
  )
1032
 
1033
- # Return with session_id included
1034
  return {
1035
  "session_id": session_id,
1036
- "history": formatted_history,
1037
- "count": len(formatted_history),
1038
  "limit": limit,
1039
- "chat_name": s.get("chat_name"), # CHANGE
1040
- "pipelines_history": enhanced_pipelines # CHANGE: enhanced with components
1041
  }
1042
 
1043
  except Exception as e:
@@ -1323,7 +1379,8 @@ async def chat_unified(
1323
  friendly = "πŸŽ‰ Pipeline completed successfully!"
1324
  output = {
1325
  "component_summary": "Pipeline executed successfully",
1326
- "steps": total_steps
 
1327
  }
1328
  api_type = "pipeline_completed"
1329
  exception_msg = None
 
948
  limit: int = 50
949
  ):
950
  """
951
+ Get conversation history for a session.
952
+ ENHANCEMENT: For pipeline completion messages, add final_output from S3.
953
  """
954
  try:
955
+ # Load conversation history from S3
 
 
956
  history = _load_conversation_from_s3(session_id)
957
 
958
+ # NEW: Add final_output to pipeline completion messages
959
+ enhanced_history = []
 
 
 
 
 
 
960
  for msg in history:
961
  msg_copy = msg.copy()
962
 
963
+ # Check if this is a pipeline completion message
964
+ # Look for the specific pattern in your example
965
+ content = msg_copy.get("content", "")
966
+ role = msg_copy.get("role", "")
967
+
968
+ # Identify pipeline completion messages
969
+ # Your example: "πŸŽ‰ Pipeline completed successfully!"
970
+ if role == "assistant" and "Pipeline completed" in content:
971
+ # Try to find pipeline_id for this message
972
+ pipeline_id = None
973
+
974
+ # Method 1: Check message metadata (if we stored it)
975
+ if "pipeline_id" in msg_copy:
976
+ pipeline_id = msg_copy.get("pipeline_id")
977
+
978
+ # Method 2: Look in recent context
979
+ if not pipeline_id:
980
+ # Check the last few assistant messages for pipeline context
981
+ for prev_msg in reversed(enhanced_history):
982
+ prev_content = prev_msg.get("content", "")
983
+ if "Pipeline Created" in prev_content and "output" in prev_msg:
984
+ pipeline_id = prev_msg["output"].get("pipeline_id")
985
+ break
986
+
987
+ # Load results from S3 if we found a pipeline_id
988
+ if pipeline_id:
989
+ try:
990
+ # Load pipeline results from S3
991
+ s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
992
+ resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
993
+ pipeline_data = json.loads(resp["Body"].read().decode("utf-8"))
994
+
995
+ # Check if results exist
996
+ if "result" in pipeline_data:
997
+ # Extract the final output from results
998
+ results = pipeline_data["result"]
999
+
1000
+ # Add final_output to the message
1001
+ msg_copy["final_output"] = results
1002
+
1003
+ # Also store pipeline metadata
1004
+ msg_copy["pipeline_metadata"] = {
1005
+ "pipeline_id": pipeline_id,
1006
+ "pipeline_name": pipeline_data.get("pipeline_name", "Unknown"),
1007
+ "status": pipeline_data.get("status", "unknown"),
1008
+ "executed_at": pipeline_data.get("updated_at", pipeline_data.get("created_at"))
1009
+ }
1010
+ except Exception as e:
1011
+ print(f"Warning: Could not load pipeline results for {pipeline_id}: {e}")
1012
+ # Optionally add error info
1013
+ msg_copy["final_output_error"] = str(e)
1014
+
1015
+ # Add message_id if not present (existing code)
1016
  if "message_id" not in msg_copy:
1017
+ from services.schemas import generate_message_id
1018
  msg_copy["message_id"] = generate_message_id()
1019
 
1020
+ # Format timestamp (existing code)
1021
  if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
1022
  msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
1023
+
1024
+ enhanced_history.append(msg_copy)
1025
+
1026
+ # Apply limit (existing code)
1027
+ if limit and len(enhanced_history) > limit:
1028
+ enhanced_history = enhanced_history[-limit:]
1029
+
1030
+ # Get session info (existing code)
1031
  s = session_manager.get_session(session_id) or {}
1032
 
1033
+ # Enhanced pipelines_history (existing code, but add results)
1034
  pipelines_hist = s.get("pipelines_history", [])
1035
  enhanced_pipelines = []
1036
  for pipeline_meta in pipelines_hist:
1037
  enhanced_pipe = pipeline_meta.copy()
1038
 
1039
+ # Rename result_preview to result
1040
  if "result_preview" in enhanced_pipe:
1041
  enhanced_pipe["result"] = enhanced_pipe.pop("result_preview")
1042
 
1043
+ # Remove internal S3 keys from response
1044
  enhanced_pipe.pop("pipeline_s3_key", None)
1045
 
1046
+ # Try to load full results from S3 for pipelines_history too
1047
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
1048
  if pipeline_s3_key:
1049
  try:
 
1059
 
1060
  enhanced_pipe["tools"] = tools_list
1061
  enhanced_pipe["component_count"] = len(components)
1062
+ enhanced_pipe["components"] = components
1063
+
1064
+ # Load results if available
1065
+ if "result" in pipeline_def:
1066
+ enhanced_pipe["execution_results"] = pipeline_def["result"]
1067
+
1068
  except Exception as e:
1069
  print(f"Warning: Failed to load pipeline {pipeline_s3_key}: {e}")
1070
  enhanced_pipe["tools"] = []
 
1075
  enhanced_pipe["component_count"] = 0
1076
  enhanced_pipe["components"] = []
1077
 
1078
+ # Add hasError field if not present
1079
  if "hasError" not in enhanced_pipe:
1080
  enhanced_pipe["hasError"] = enhanced_pipe.get("status") == "failed"
1081
 
1082
  enhanced_pipelines.append(enhanced_pipe)
1083
 
1084
+ # Ensure sorting by most recent first
1085
  enhanced_pipelines.sort(
1086
  key=lambda p: p.get("updated_at") or p.get("created_at") or "",
1087
  reverse=True
1088
  )
1089
 
 
1090
  return {
1091
  "session_id": session_id,
1092
+ "history": enhanced_history, # Now includes final_output for pipeline messages
1093
+ "count": len(enhanced_history),
1094
  "limit": limit,
1095
+ "chat_name": s.get("chat_name"),
1096
+ "pipelines_history": enhanced_pipelines
1097
  }
1098
 
1099
  except Exception as e:
 
1379
  friendly = "πŸŽ‰ Pipeline completed successfully!"
1380
  output = {
1381
  "component_summary": "Pipeline executed successfully",
1382
+ "steps": total_steps,
1383
+ "pipeline_id": pipeline_id
1384
  }
1385
  api_type = "pipeline_completed"
1386
  exception_msg = None