redhairedshanks1 committed on
Commit
09168dc
·
1 Parent(s): d8b9e43

Update api_routes_v2.py

Browse files
Files changed (1) hide show
  1. api_routes_v2.py +105 -105
api_routes_v2.py CHANGED
@@ -1023,150 +1023,149 @@ async def get_session_history(
1023
  ):
1024
  """
1025
  Get conversation history for a session.
1026
- ENHANCEMENT: For pipeline completion messages, add final_output from S3.
 
 
 
1027
  """
1028
  try:
1029
  # Load conversation history from S3
1030
  history = _load_conversation_from_s3(session_id)
1031
-
1032
- # NEW: Add final_output to pipeline completion messages
1033
  enhanced_history = []
 
1034
  for msg in history:
1035
  msg_copy = msg.copy()
1036
-
1037
- # Check if this is a pipeline completion message
1038
- # Look for the specific pattern in your example
 
 
 
 
1039
  content = msg_copy.get("content", "")
1040
- role = msg_copy.get("role", "")
1041
-
1042
- # Identify pipeline completion messages
1043
- # Your example: "🎉 Pipeline completed successfully!"
1044
- if role == "assistant" and "Pipeline completed" in content:
1045
- # Try to find pipeline_id for this message
1046
- pipeline_id = None
1047
-
1048
- # Method 1: Check message metadata (if we stored it)
1049
- if "pipeline_id" in msg_copy:
1050
- pipeline_id = msg_copy.get("pipeline_id")
1051
-
1052
- # Method 2: Look in recent context
1053
- if not pipeline_id:
1054
- # Check the last few assistant messages for pipeline context
1055
- for prev_msg in reversed(enhanced_history):
1056
- prev_content = prev_msg.get("content", "")
1057
- if "Pipeline Created" in prev_content and "output" in prev_msg:
1058
- pipeline_id = prev_msg["output"].get("pipeline_id")
1059
- break
1060
-
1061
- # Load results from S3 if we found a pipeline_id
1062
- if pipeline_id:
1063
- try:
1064
- # Load pipeline results from S3
1065
- s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
1066
- resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
1067
- pipeline_data = json.loads(resp["Body"].read().decode("utf-8"))
1068
-
1069
- # Check if results exist
1070
- if "result" in pipeline_data:
1071
- # Extract the final output from results
1072
- results = pipeline_data["result"]
1073
-
1074
- # Add final_output to the message
1075
- msg_copy["final_output"] = results
1076
-
1077
- # Also store pipeline metadata
1078
- msg_copy["pipeline_metadata"] = {
1079
- "pipeline_id": pipeline_id,
1080
- "pipeline_name": pipeline_data.get("pipeline_name", "Unknown"),
1081
- "status": pipeline_data.get("status", "unknown"),
1082
- "executed_at": pipeline_data.get("updated_at", pipeline_data.get("created_at"))
1083
- }
1084
- except Exception as e:
1085
- print(f"Warning: Could not load pipeline results for {pipeline_id}: {e}")
1086
- # Optionally add error info
1087
- msg_copy["final_output_error"] = str(e)
1088
-
1089
- # Add message_id if not present (existing code)
1090
  if "message_id" not in msg_copy:
1091
  from services.schemas import generate_message_id
1092
  msg_copy["message_id"] = generate_message_id()
1093
-
1094
- # Format timestamp (existing code)
1095
- if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
 
 
1096
  msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
1097
-
1098
  enhanced_history.append(msg_copy)
1099
-
1100
- # Apply limit (existing code)
1101
  if limit and len(enhanced_history) > limit:
1102
  enhanced_history = enhanced_history[-limit:]
1103
-
1104
- # Get session info (existing code)
1105
- s = session_manager.get_session(session_id) or {}
1106
-
1107
- # Enhanced pipelines_history (existing code, but add results)
1108
- pipelines_hist = s.get("pipelines_history", [])
1109
  enhanced_pipelines = []
 
1110
  for pipeline_meta in pipelines_hist:
1111
  enhanced_pipe = pipeline_meta.copy()
1112
-
1113
- # Rename result_preview to result
1114
- if "result_preview" in enhanced_pipe:
1115
- enhanced_pipe["result"] = enhanced_pipe.pop("result_preview")
1116
-
1117
- # Remove internal S3 keys from response
1118
  enhanced_pipe.pop("pipeline_s3_key", None)
1119
-
1120
- # Try to load full results from S3 for pipelines_history too
1121
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
1122
  if pipeline_s3_key:
1123
  try:
1124
- resp = s3.get_object(Bucket=S3_BUCKET, Key=pipeline_s3_key)
1125
- pipeline_def = json.loads(resp["Body"].read().decode("utf-8"))
1126
-
1127
- # Extract tools/components
1128
- components = pipeline_def.get("components") or pipeline_def.get("pipeline_steps", [])
1129
- tools_list = [
1130
- comp.get("tool_name", comp.get("tool", "unknown"))
1131
- for comp in components
1132
- ]
1133
-
1134
- enhanced_pipe["tools"] = tools_list
1135
- enhanced_pipe["component_count"] = len(components)
 
1136
  enhanced_pipe["components"] = components
1137
-
1138
- # Load results if available
 
 
 
 
1139
  if "result" in pipeline_def:
1140
  enhanced_pipe["execution_results"] = pipeline_def["result"]
1141
-
1142
  except Exception as e:
1143
- print(f"Warning: Failed to load pipeline {pipeline_s3_key}: {e}")
1144
- enhanced_pipe["tools"] = []
1145
- enhanced_pipe["component_count"] = 0
1146
  enhanced_pipe["components"] = []
 
 
 
 
1147
  else:
1148
- enhanced_pipe["tools"] = []
1149
- enhanced_pipe["component_count"] = 0
1150
  enhanced_pipe["components"] = []
1151
-
1152
- # Add hasError field if not present
 
1153
  if "hasError" not in enhanced_pipe:
1154
- enhanced_pipe["hasError"] = enhanced_pipe.get("status") == "failed"
1155
-
 
 
1156
  enhanced_pipelines.append(enhanced_pipe)
1157
-
1158
- # Ensure sorting by most recent first
1159
  enhanced_pipelines.sort(
1160
- key=lambda p: p.get("updated_at") or p.get("created_at") or "",
1161
  reverse=True
1162
  )
1163
 
1164
  return {
1165
  "session_id": session_id,
1166
- "history": enhanced_history, # Now includes final_output for pipeline messages
1167
  "count": len(enhanced_history),
1168
  "limit": limit,
1169
- "chat_name": s.get("chat_name"),
1170
  "pipelines_history": enhanced_pipelines
1171
  }
1172
 
@@ -1176,6 +1175,7 @@ async def get_session_history(
1176
  detail=f"Error retrieving session history: {str(e)}"
1177
  )
1178
 
 
1179
  @router.delete("/sessions/{session_id}")
1180
  async def delete_session(session_id: str):
1181
  """
 
1023
  ):
1024
  """
1025
  Get conversation history for a session.
1026
+ Correct behavior:
1027
+ - Preserve pipeline_id on messages
1028
+ - Enrich ONLY using stored pipeline_id
1029
+ - No inference, no guessing, no context scanning
1030
  """
1031
  try:
1032
  # Load conversation history from S3
1033
  history = _load_conversation_from_s3(session_id)
1034
+
 
1035
  enhanced_history = []
1036
+
1037
  for msg in history:
1038
  msg_copy = msg.copy()
1039
+
1040
+ # ALWAYS pass through pipeline_id if present
1041
+ pipeline_id = msg.get("pipeline_id")
1042
+ if pipeline_id:
1043
+ msg_copy["pipeline_id"] = pipeline_id
1044
+
1045
+ role = msg_copy.get("role")
1046
  content = msg_copy.get("content", "")
1047
+
1048
+ # ✅ Enrich ONLY pipeline completion messages WITH pipeline_id
1049
+ if (
1050
+ role == "assistant"
1051
+ and "Pipeline completed" in content
1052
+ and pipeline_id
1053
+ ):
1054
+ try:
1055
+ s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
1056
+ resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
1057
+ pipeline_data = json.loads(
1058
+ resp["Body"].read().decode("utf-8")
1059
+ )
1060
+
1061
+ # Attach final output if present
1062
+ if "result" in pipeline_data:
1063
+ msg_copy["final_output"] = pipeline_data["result"]
1064
+
1065
+ # Attach pipeline metadata
1066
+ msg_copy["pipeline_metadata"] = {
1067
+ "pipeline_id": pipeline_id,
1068
+ "pipeline_name": pipeline_data.get(
1069
+ "pipeline_name", "unknown"
1070
+ ),
1071
+ "status": pipeline_data.get("status", "unknown"),
1072
+ "executed_at": pipeline_data.get(
1073
+ "updated_at",
1074
+ pipeline_data.get("created_at")
1075
+ )
1076
+ }
1077
+
1078
+ except Exception as e:
1079
+ # Non-fatal enrichment failure
1080
+ msg_copy["final_output_error"] = str(e)
1081
+
1082
+ # Ensure message_id exists
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1083
  if "message_id" not in msg_copy:
1084
  from services.schemas import generate_message_id
1085
  msg_copy["message_id"] = generate_message_id()
1086
+
1087
+ # Normalize timestamp
1088
+ if "timestamp" in msg_copy and isinstance(
1089
+ msg_copy["timestamp"], datetime
1090
+ ):
1091
  msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
1092
+
1093
  enhanced_history.append(msg_copy)
1094
+
1095
+ # Apply limit (keep most recent)
1096
  if limit and len(enhanced_history) > limit:
1097
  enhanced_history = enhanced_history[-limit:]
1098
+
1099
+ # Load session metadata
1100
+ session = session_manager.get_session(session_id) or {}
1101
+
1102
+ # ----- PIPELINES HISTORY (unchanged but cleaned) -----
1103
+ pipelines_hist = session.get("pipelines_history", [])
1104
  enhanced_pipelines = []
1105
+
1106
  for pipeline_meta in pipelines_hist:
1107
  enhanced_pipe = pipeline_meta.copy()
1108
+
1109
+ # Remove internal-only keys
 
 
 
 
1110
  enhanced_pipe.pop("pipeline_s3_key", None)
1111
+
1112
+ # Load full pipeline definition if available
1113
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
1114
  if pipeline_s3_key:
1115
  try:
1116
+ resp = s3.get_object(
1117
+ Bucket=S3_BUCKET,
1118
+ Key=pipeline_s3_key
1119
+ )
1120
+ pipeline_def = json.loads(
1121
+ resp["Body"].read().decode("utf-8")
1122
+ )
1123
+
1124
+ components = (
1125
+ pipeline_def.get("components")
1126
+ or pipeline_def.get("pipeline_steps", [])
1127
+ )
1128
+
1129
  enhanced_pipe["components"] = components
1130
+ enhanced_pipe["component_count"] = len(components)
1131
+ enhanced_pipe["tools"] = [
1132
+ c.get("tool_name", c.get("tool", "unknown"))
1133
+ for c in components
1134
+ ]
1135
+
1136
  if "result" in pipeline_def:
1137
  enhanced_pipe["execution_results"] = pipeline_def["result"]
1138
+
1139
  except Exception as e:
 
 
 
1140
  enhanced_pipe["components"] = []
1141
+ enhanced_pipe["component_count"] = 0
1142
+ enhanced_pipe["tools"] = []
1143
+ enhanced_pipe["load_error"] = str(e)
1144
+
1145
  else:
 
 
1146
  enhanced_pipe["components"] = []
1147
+ enhanced_pipe["component_count"] = 0
1148
+ enhanced_pipe["tools"] = []
1149
+
1150
  if "hasError" not in enhanced_pipe:
1151
+ enhanced_pipe["hasError"] = (
1152
+ enhanced_pipe.get("status") == "failed"
1153
+ )
1154
+
1155
  enhanced_pipelines.append(enhanced_pipe)
1156
+
1157
+ # Sort pipelines by recency
1158
  enhanced_pipelines.sort(
1159
+ key=lambda p: p.get("updated_at") or p.get("created_at") or "",
1160
  reverse=True
1161
  )
1162
 
1163
  return {
1164
  "session_id": session_id,
1165
+ "history": enhanced_history,
1166
  "count": len(enhanced_history),
1167
  "limit": limit,
1168
+ "chat_name": session.get("chat_name"),
1169
  "pipelines_history": enhanced_pipelines
1170
  }
1171
 
 
1175
  detail=f"Error retrieving session history: {str(e)}"
1176
  )
1177
 
1178
+
1179
  @router.delete("/sessions/{session_id}")
1180
  async def delete_session(session_id: str):
1181
  """