ganesh-vilje committed on
Commit
d6a6b32
·
1 Parent(s): 6431123

Refactor: Use MongoDB _id for messages, pipeline_id_index for components, standardize outputs to result:{text:{}} format

Browse files
services/pipeline_manager.py CHANGED
@@ -144,9 +144,10 @@ class PipelineManager:
144
  pipeline_steps = pipeline_definition.get("components", []) or pipeline_definition.get("pipeline_steps", [])
145
  components = []
146
 
 
147
  for i, step in enumerate(pipeline_steps):
148
  component = ComponentMetadata(
149
- component_id=generate_component_id(),
150
  component_name=step.get("tool_name", "unknown"),
151
  order=i + 1,
152
  status=ComponentStatus.PENDING
@@ -193,7 +194,7 @@ class PipelineManager:
193
  execution_id: Pipeline execution ID
194
  component_name: Name of component
195
  status: "executing", "completed", or "failed"
196
- output: Component output (stored temporarily, compiled to final output later)
197
  error: Error message if failed
198
  success_message: Success message if completed
199
 
@@ -229,9 +230,18 @@ class PipelineManager:
229
  else:
230
  comp["hasError"] = False
231
 
232
- # Store component output
233
  if output:
234
- comp["component_output"] = output
 
 
 
 
 
 
 
 
 
235
 
236
  component_found = True
237
  break
@@ -252,9 +262,10 @@ class PipelineManager:
252
 
253
  # Store output temporarily (will be compiled into final_output later)
254
  if output:
255
- # Store in a temporary location, will be moved to final output
 
256
  temp_key = f"sessions/{pipeline['session_id']}/outputs/{execution_id}/temp_{component_name}.json"
257
- self.s3.upload_json(temp_key, output, add_prefix=False)
258
 
259
  return True
260
 
@@ -291,32 +302,53 @@ class PipelineManager:
291
  workflow_status = "failed"
292
  break
293
 
294
- # Get last successful node output
295
  last_node_output = None
296
  for result in reversed(components_results):
297
  if result.get("status") == "completed" and result.get("result"):
298
- # Extract text from result
299
  res = result.get("result", {})
300
  if isinstance(res, dict):
301
- last_node_output = res.get("text") or res.get("summary") or json.dumps(res)[:500]
 
 
 
 
 
 
 
 
 
302
  elif isinstance(res, str):
303
  last_node_output = res
304
  break
305
 
306
- # Build final output for S3
307
  component_outputs = []
308
  for i, result in enumerate(components_results):
 
 
 
 
 
 
 
 
 
 
 
 
309
  comp_output = ComponentOutputS3(
310
- component_id=result.get("component_id", generate_component_id()),
311
  name=result.get("component_name", f"component_{i+1}"),
312
  order=i + 1,
313
  status=result.get("status", "unknown"),
314
- component_output=result.get("result"),
315
  hasError=result.get("status") == "failed",
316
  error={"message": result.get("error")} if result.get("error") else None,
317
  success_message=result.get("success_message"),
318
  metadata=result.get("metadata"),
319
- result=result.get("result")
320
  )
321
  component_outputs.append(comp_output.model_dump())
322
 
@@ -362,7 +394,8 @@ class PipelineManager:
362
  "final_output_expires_at": presigned["presigned_expires_at"],
363
  "final_output_s3_key": final_output_key,
364
  "workflow_status": workflow_status,
365
- "last_node_output": last_node_output
 
366
  }
367
 
368
  def mark_pipeline_failed(
 
144
  pipeline_steps = pipeline_definition.get("components", []) or pipeline_definition.get("pipeline_steps", [])
145
  components = []
146
 
147
+ # Generate component IDs in format: execution_id_index
148
  for i, step in enumerate(pipeline_steps):
149
  component = ComponentMetadata(
150
+ component_id=generate_component_id(execution_id, i),
151
  component_name=step.get("tool_name", "unknown"),
152
  order=i + 1,
153
  status=ComponentStatus.PENDING
 
194
  execution_id: Pipeline execution ID
195
  component_name: Name of component
196
  status: "executing", "completed", or "failed"
197
+ output: Component output (will be wrapped in result:{text:{}} format)
198
  error: Error message if failed
199
  success_message: Success message if completed
200
 
 
230
  else:
231
  comp["hasError"] = False
232
 
233
+ # Wrap component output in standardized format: result:{text:{}}
234
  if output:
235
+ # Check if already wrapped
236
+ if "result" in output and "text" in output.get("result", {}):
237
+ comp["component_output"] = output
238
+ else:
239
+ # Wrap in standardized format
240
+ comp["component_output"] = {
241
+ "result": {
242
+ "text": output
243
+ }
244
+ }
245
 
246
  component_found = True
247
  break
 
262
 
263
  # Store output temporarily (will be compiled into final_output later)
264
  if output:
265
+ # Wrap output before storing
266
+ wrapped_output = comp["component_output"] # Already wrapped above
267
  temp_key = f"sessions/{pipeline['session_id']}/outputs/{execution_id}/temp_{component_name}.json"
268
+ self.s3.upload_json(temp_key, wrapped_output, add_prefix=False)
269
 
270
  return True
271
 
 
302
  workflow_status = "failed"
303
  break
304
 
305
+ # Get last successful node output - extract from result:{text:{}} format
306
  last_node_output = None
307
  for result in reversed(components_results):
308
  if result.get("status") == "completed" and result.get("result"):
309
+ # Extract text from standardized result:{text:{}} format
310
  res = result.get("result", {})
311
  if isinstance(res, dict):
312
+ # Check for result:{text:{}} structure
313
+ if "result" in res and "text" in res.get("result", {}):
314
+ text_content = res["result"]["text"]
315
+ if isinstance(text_content, dict):
316
+ last_node_output = text_content.get("text") or text_content.get("summary") or json.dumps(text_content)[:500]
317
+ else:
318
+ last_node_output = str(text_content)[:500]
319
+ else:
320
+ # Fallback for non-wrapped format
321
+ last_node_output = res.get("text") or res.get("summary") or json.dumps(res)[:500]
322
  elif isinstance(res, str):
323
  last_node_output = res
324
  break
325
 
326
+ # Build final output for S3 with standardized format
327
  component_outputs = []
328
  for i, result in enumerate(components_results):
329
+ # Ensure result is wrapped in result:{text:{}} format
330
+ result_data = result.get("result", {})
331
+ if isinstance(result_data, dict) and "result" in result_data and "text" in result_data.get("result", {}):
332
+ wrapped_result = result_data
333
+ else:
334
+ # Wrap in standardized format
335
+ wrapped_result = {
336
+ "result": {
337
+ "text": result_data
338
+ }
339
+ }
340
+
341
  comp_output = ComponentOutputS3(
342
+ component_id=result.get("component_id", f"{execution_id}_{i}"),
343
  name=result.get("component_name", f"component_{i+1}"),
344
  order=i + 1,
345
  status=result.get("status", "unknown"),
346
+ component_output=wrapped_result,
347
  hasError=result.get("status") == "failed",
348
  error={"message": result.get("error")} if result.get("error") else None,
349
  success_message=result.get("success_message"),
350
  metadata=result.get("metadata"),
351
+ result=wrapped_result
352
  )
353
  component_outputs.append(comp_output.model_dump())
354
 
 
394
  "final_output_expires_at": presigned["presigned_expires_at"],
395
  "final_output_s3_key": final_output_key,
396
  "workflow_status": workflow_status,
397
+ "last_node_output": last_node_output,
398
+ "result": {"text": {"summary": last_node_output}} if last_node_output else None
399
  }
400
 
401
  def mark_pipeline_failed(
services/schemas.py CHANGED
@@ -248,14 +248,9 @@ class PipelineDefinitionS3(BaseModel):
248
  # UTILITY FUNCTIONS
249
  # ========================
250
 
251
- def generate_message_id() -> str:
252
- """Generate unique message ID"""
253
- return f"msg_{uuid.uuid4().hex[:12]}"
254
-
255
-
256
- def generate_component_id() -> str:
257
- """Generate unique component ID"""
258
- return f"comp_{uuid.uuid4().hex[:12]}"
259
 
260
 
261
  def generate_output_id() -> str:
 
248
  # UTILITY FUNCTIONS
249
  # ========================
250
 
251
+ def generate_component_id(pipeline_id: str, index: int) -> str:
252
+ """Generate component ID in format: pipeline_id_index"""
253
+ return f"{pipeline_id}_{index}"
 
 
 
 
 
254
 
255
 
256
  def generate_output_id() -> str:
services/session_manager.py CHANGED
@@ -20,8 +20,7 @@ from pymongo.errors import DuplicateKeyError, ConnectionFailure
20
  from services.s3_manager import get_s3_manager
21
  from services.schemas import (
22
  SessionSchema, MessageSchema, SessionState, MessageRole,
23
- SessionStats, MessageS3Content, schema_to_dict, datetime_to_iso,
24
- generate_message_id
25
  )
26
 
27
 
@@ -92,13 +91,14 @@ class SessionManager:
92
  session_id: str,
93
  role: str,
94
  content: str,
95
- file_data: Optional[Dict[str, Any]] = None
 
96
  ) -> Dict[str, Any]:
97
  """
98
  Add message to session
99
 
100
  Stores:
101
- - Metadata in messages collection
102
  - Full content in S3
103
  - Updates session last_activity
104
 
@@ -107,45 +107,62 @@ class SessionManager:
107
  role: "user", "assistant", or "system"
108
  content: Message content
109
  file_data: Optional file metadata
 
110
 
111
  Returns:
112
- Message metadata dict
113
  """
114
- message_id = generate_message_id()
115
  now = datetime.utcnow()
116
 
117
- # Store full message content in S3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  s3_key = f"sessions/{session_id}/messages/{message_id}.json"
119
  s3_bucket = self.s3.bucket_name
120
 
121
- message_s3_content = MessageS3Content(
122
- message_id=message_id,
123
- role=role,
124
- content=content,
125
- timestamp=now.isoformat() + "Z",
126
- file_data=file_data or {}
127
- )
128
 
129
- self.s3.upload_json(s3_key, message_s3_content.model_dump(), add_prefix=False)
 
130
 
131
- # Create message metadata record
132
- content_preview = content[:200] if len(content) > 200 else content
133
 
134
- message_record = MessageSchema(
135
- message_id=message_id,
136
- session_id=session_id,
137
- role=MessageRole(role),
138
- timestamp=now,
139
- s3_key=s3_key,
140
- s3_bucket=s3_bucket,
141
- content_preview=content_preview,
142
- has_file=bool(file_data and file_data.get("file_id")),
143
- file_id=file_data.get("file_id") if file_data else None
144
  )
145
 
146
- # Insert message metadata
147
- self.messages_collection.insert_one(schema_to_dict(message_record))
148
-
149
  # Update session last_activity and message count
150
  self.sessions_collection.update_one(
151
  {"session_id": session_id},
@@ -155,7 +172,6 @@ class SessionManager:
155
  }
156
  )
157
 
158
-
159
  # Auto-generate chat name after first user message (only if not already generated)
160
  if role == "user" and not content.lower().startswith("uploaded file"):
161
  # Check if chat name already exists to avoid unnecessary generation attempts
@@ -166,8 +182,9 @@ class SessionManager:
166
  if session and not session.get("chat_name"):
167
  self._maybe_generate_chat_name(session_id)
168
 
169
-
170
- return schema_to_dict(message_record)
 
171
 
172
  def get_messages(
173
  self,
@@ -184,14 +201,17 @@ class SessionManager:
184
  include_content: Whether to fetch full content from S3
185
 
186
  Returns:
187
- List of message dicts
188
  """
189
- # Get message metadata from MongoDB
190
  messages = list(self.messages_collection.find(
191
- {"session_id": session_id},
192
- {"_id": 0}
193
  ).sort("timestamp", 1).limit(limit))
194
 
 
 
 
 
195
  if not include_content:
196
  return messages
197
 
@@ -201,6 +221,9 @@ class SessionManager:
201
  full_content = self.s3.download_json(msg["s3_key"], add_prefix=False)
202
  msg["content"] = full_content.get("content", msg["content_preview"])
203
  msg["file_data"] = full_content.get("file_data", {})
 
 
 
204
  except Exception as e:
205
  # If S3 fetch fails, use preview
206
  msg["content"] = msg["content_preview"]
 
20
  from services.s3_manager import get_s3_manager
21
  from services.schemas import (
22
  SessionSchema, MessageSchema, SessionState, MessageRole,
23
+ SessionStats, MessageS3Content, schema_to_dict, datetime_to_iso
 
24
  )
25
 
26
 
 
91
  session_id: str,
92
  role: str,
93
  content: str,
94
+ file_data: Optional[Dict[str, Any]] = None,
95
+ result: Optional[Dict[str, Any]] = None
96
  ) -> Dict[str, Any]:
97
  """
98
  Add message to session
99
 
100
  Stores:
101
+ - Metadata in messages collection (using MongoDB _id as message_id)
102
  - Full content in S3
103
  - Updates session last_activity
104
 
 
107
  role: "user", "assistant", or "system"
108
  content: Message content
109
  file_data: Optional file metadata
110
+ result: Optional pipeline result data (for pipeline completion messages)
111
 
112
  Returns:
113
+ Message metadata dict with MongoDB _id as message_id
114
  """
 
115
  now = datetime.utcnow()
116
 
117
+ # Create message metadata record WITHOUT message_id (MongoDB will generate _id)
118
+ content_preview = content[:200] if len(content) > 200 else content
119
+
120
+ message_record = {
121
+ "session_id": session_id,
122
+ "role": role,
123
+ "timestamp": now.isoformat() + "Z",
124
+ "content_preview": content_preview,
125
+ "has_file": bool(file_data and file_data.get("file_id")),
126
+ "file_id": file_data.get("file_id") if file_data else None,
127
+ "metadata": {}
128
+ }
129
+
130
+ # Add result if provided (for pipeline completion messages)
131
+ if result:
132
+ message_record["result"] = result
133
+
134
+ # Insert message metadata and get the MongoDB _id
135
+ insert_result = self.messages_collection.insert_one(message_record)
136
+ message_id = str(insert_result.inserted_id)
137
+
138
+ # Now store full message content in S3 using the MongoDB _id
139
  s3_key = f"sessions/{session_id}/messages/{message_id}.json"
140
  s3_bucket = self.s3.bucket_name
141
 
142
+ message_s3_content = {
143
+ "message_id": message_id,
144
+ "role": role,
145
+ "content": content,
146
+ "timestamp": now.isoformat() + "Z",
147
+ "file_data": file_data or {}
148
+ }
149
 
150
+ if result:
151
+ message_s3_content["result"] = result
152
 
153
+ self.s3.upload_json(s3_key, message_s3_content, add_prefix=False)
 
154
 
155
+ # Update the message record with S3 info
156
+ self.messages_collection.update_one(
157
+ {"_id": insert_result.inserted_id},
158
+ {
159
+ "$set": {
160
+ "s3_key": s3_key,
161
+ "s3_bucket": s3_bucket
162
+ }
163
+ }
 
164
  )
165
 
 
 
 
166
  # Update session last_activity and message count
167
  self.sessions_collection.update_one(
168
  {"session_id": session_id},
 
172
  }
173
  )
174
 
 
175
  # Auto-generate chat name after first user message (only if not already generated)
176
  if role == "user" and not content.lower().startswith("uploaded file"):
177
  # Check if chat name already exists to avoid unnecessary generation attempts
 
182
  if session and not session.get("chat_name"):
183
  self._maybe_generate_chat_name(session_id)
184
 
185
+ # Return message record with _id as message_id
186
+ message_record["message_id"] = message_id
187
+ return message_record
188
 
189
  def get_messages(
190
  self,
 
201
  include_content: Whether to fetch full content from S3
202
 
203
  Returns:
204
+ List of message dicts with _id converted to message_id
205
  """
206
+ # Get message metadata from MongoDB (include _id this time)
207
  messages = list(self.messages_collection.find(
208
+ {"session_id": session_id}
 
209
  ).sort("timestamp", 1).limit(limit))
210
 
211
+ # Convert MongoDB _id to message_id string
212
+ for msg in messages:
213
+ msg["message_id"] = str(msg.pop("_id"))
214
+
215
  if not include_content:
216
  return messages
217
 
 
221
  full_content = self.s3.download_json(msg["s3_key"], add_prefix=False)
222
  msg["content"] = full_content.get("content", msg["content_preview"])
223
  msg["file_data"] = full_content.get("file_data", {})
224
+ # Include result if present in S3
225
+ if "result" in full_content:
226
+ msg["result"] = full_content["result"]
227
  except Exception as e:
228
  # If S3 fetch fails, use preview
229
  msg["content"] = msg["content_preview"]