ganesh-vilje committed on
Commit
7c2e633
·
1 Parent(s): 3804e08

feat: API response corrections - add message_id, component tracking, and error handling

Browse files

- Added ID generation utilities (message_id, component_id, output_id)
- Enhanced ComponentMetadata with component_output, hasError, and structured errors
- Renamed result_preview to result in pipeline responses
- Added hasError flag to ChatResponse model
- Removed internal fields (current_file, pipeline_s3_key) from API responses
- Updated session_manager to use generate_message_id()
- Enhanced pipeline_manager with component tracking and error handling
- Updated API routes to expose new response structures

All changes are backward compatible. No database migration required.

api_routes_v2.py CHANGED
@@ -40,6 +40,7 @@ s3 = boto3.client("s3", region_name=AWS_REGION)
40
  # ========================
41
 
42
  class Message(BaseModel):
 
43
  role: str
44
  content: str
45
  timestamp: Optional[str] = None
@@ -48,9 +49,11 @@ class Message(BaseModel):
48
  fileUrl: Optional[str] = None
49
 
50
  class ChatResponse(BaseModel):
 
51
  assistant_response: str
52
  output: Dict[str, Any] = Field(default_factory=dict)
53
  final_output: Optional[Dict[str, Any]] = None
 
54
  exception: Optional[str] = None
55
  api_response: Dict[str, Any]
56
  intent: Dict[str, Any]
@@ -594,6 +597,10 @@ def _assistant_response_payload(
594
  """
595
  Create ChatResponse payload with all required fields.
596
  """
 
 
 
 
597
  # Persist assistant message to S3
598
  _add_and_mirror_message(chat_id, "assistant", friendly_response)
599
 
@@ -602,9 +609,11 @@ def _assistant_response_payload(
602
  file_metadata = session.get("file_metadata", {})
603
 
604
  return ChatResponse(
 
605
  assistant_response=friendly_response,
606
  output=output or {},
607
  final_output=final_output,
 
608
  exception=exception,
609
  api_response=api_data,
610
  intent=intent,
@@ -804,7 +813,7 @@ async def get_all_sessions(
804
  "created_at": created_at,
805
  "last_activity": last_activity,
806
  "state": session.get("state", "unknown"),
807
- "current_file": session.get("current_file"),
808
  "chat_name": session.get("chat_name"), # CHANGE: added field
809
  "stats": session.get("stats", {}),
810
  "total_messages": len(session.get("conversation_history", [])),
@@ -866,6 +875,14 @@ async def get_session_history(
866
  enhanced_pipelines = []
867
  for pipeline_meta in pipelines_hist:
868
  enhanced_pipe = pipeline_meta.copy()
 
 
 
 
 
 
 
 
869
  # Load pipeline definition from S3 to get components/tools
870
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
871
  if pipeline_s3_key:
@@ -892,6 +909,11 @@ async def get_session_history(
892
  enhanced_pipe["tools"] = []
893
  enhanced_pipe["component_count"] = 0
894
  enhanced_pipe["components"] = []
 
 
 
 
 
895
  enhanced_pipelines.append(enhanced_pipe)
896
 
897
  # Ensure sorting by most recent first (updated_at or created_at)
 
40
  # ========================
41
 
42
  class Message(BaseModel):
43
+ message_id: Optional[str] = None # Unique message identifier
44
  role: str
45
  content: str
46
  timestamp: Optional[str] = None
 
49
  fileUrl: Optional[str] = None
50
 
51
  class ChatResponse(BaseModel):
52
+ message_id: Optional[str] = None # Message ID for assistant response
53
  assistant_response: str
54
  output: Dict[str, Any] = Field(default_factory=dict)
55
  final_output: Optional[Dict[str, Any]] = None
56
+ hasError: bool = False # Error flag
57
  exception: Optional[str] = None
58
  api_response: Dict[str, Any]
59
  intent: Dict[str, Any]
 
597
  """
598
  Create ChatResponse payload with all required fields.
599
  """
600
+ # Generate message_id for assistant response
601
+ from services.schemas import generate_message_id
602
+ message_id = generate_message_id()
603
+
604
  # Persist assistant message to S3
605
  _add_and_mirror_message(chat_id, "assistant", friendly_response)
606
 
 
609
  file_metadata = session.get("file_metadata", {})
610
 
611
  return ChatResponse(
612
+ message_id=message_id,
613
  assistant_response=friendly_response,
614
  output=output or {},
615
  final_output=final_output,
616
+ hasError=bool(exception),
617
  exception=exception,
618
  api_response=api_data,
619
  intent=intent,
 
813
  "created_at": created_at,
814
  "last_activity": last_activity,
815
  "state": session.get("state", "unknown"),
816
+ # REMOVED: current_file (not needed for session list)
817
  "chat_name": session.get("chat_name"), # CHANGE: added field
818
  "stats": session.get("stats", {}),
819
  "total_messages": len(session.get("conversation_history", [])),
 
875
  enhanced_pipelines = []
876
  for pipeline_meta in pipelines_hist:
877
  enhanced_pipe = pipeline_meta.copy()
878
+
879
+ # CHANGE: Rename result_preview to result
880
+ if "result_preview" in enhanced_pipe:
881
+ enhanced_pipe["result"] = enhanced_pipe.pop("result_preview")
882
+
883
+ # CHANGE: Remove internal S3 keys from response
884
+ enhanced_pipe.pop("pipeline_s3_key", None)
885
+
886
  # Load pipeline definition from S3 to get components/tools
887
  pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
888
  if pipeline_s3_key:
 
909
  enhanced_pipe["tools"] = []
910
  enhanced_pipe["component_count"] = 0
911
  enhanced_pipe["components"] = []
912
+
913
+ # CHANGE: Add hasError field if not present
914
+ if "hasError" not in enhanced_pipe:
915
+ enhanced_pipe["hasError"] = enhanced_pipe.get("status") == "failed"
916
+
917
  enhanced_pipelines.append(enhanced_pipe)
918
 
919
  # Ensure sorting by most recent first (updated_at or created_at)
services/pipeline_manager.py CHANGED
@@ -17,7 +17,8 @@ from pymongo import MongoClient
17
  from services.s3_manager import get_s3_manager
18
  from services.schemas import (
19
  PipelineSchema, ComponentMetadata, PipelineStatus, ComponentStatus,
20
- FinalOutputS3, ComponentOutputS3, schema_to_dict
 
21
  )
22
  import os
23
 
@@ -145,6 +146,7 @@ class PipelineManager:
145
 
146
  for i, step in enumerate(pipeline_steps):
147
  component = ComponentMetadata(
 
148
  component_name=step.get("tool_name", "unknown"),
149
  order=i + 1,
150
  status=ComponentStatus.PENDING
@@ -156,6 +158,7 @@ class PipelineManager:
156
  self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
157
 
158
  # Create pipeline record
 
159
  pipeline_record = PipelineSchema(
160
  execution_id=execution_id,
161
  session_id=session_id,
@@ -165,7 +168,8 @@ class PipelineManager:
165
  created_by_message=created_by_message,
166
  status=PipelineStatus.PROPOSED,
167
  pipeline_definition_s3_key=s3_key,
168
- components=components
 
169
  )
170
 
171
  # Insert into MongoDB
@@ -220,7 +224,14 @@ class PipelineManager:
220
  comp["success_message"] = success_message
221
 
222
  if error:
223
- comp["error"] = error
 
 
 
 
 
 
 
224
 
225
  component_found = True
226
  break
@@ -296,11 +307,15 @@ class PipelineManager:
296
  component_outputs = []
297
  for i, result in enumerate(components_results):
298
  comp_output = ComponentOutputS3(
 
299
  name=result.get("component_name", f"component_{i+1}"),
300
  order=i + 1,
301
  status=result.get("status", "unknown"),
 
 
 
302
  success_message=result.get("success_message"),
303
- error=result.get("error"),
304
  result=result.get("result")
305
  )
306
  component_outputs.append(comp_output.model_dump())
@@ -335,7 +350,9 @@ class PipelineManager:
335
  "final_output_presigned_expires_at": presigned["presigned_expires_at"],
336
  "completed_at": now.isoformat() + "Z",
337
  "executor": executor,
338
- "result_preview": result_preview
 
 
339
  }
340
  }
341
  )
@@ -370,7 +387,8 @@ class PipelineManager:
370
  {
371
  "$set": {
372
  "status": PipelineStatus.FAILED,
373
- "error": error,
 
374
  "completed_at": now.isoformat() + "Z"
375
  }
376
  }
 
17
  from services.s3_manager import get_s3_manager
18
  from services.schemas import (
19
  PipelineSchema, ComponentMetadata, PipelineStatus, ComponentStatus,
20
+ FinalOutputS3, ComponentOutputS3, schema_to_dict,
21
+ generate_component_id, generate_output_id
22
  )
23
  import os
24
 
 
146
 
147
  for i, step in enumerate(pipeline_steps):
148
  component = ComponentMetadata(
149
+ component_id=generate_component_id(),
150
  component_name=step.get("tool_name", "unknown"),
151
  order=i + 1,
152
  status=ComponentStatus.PENDING
 
158
  self.s3.upload_json(s3_key, pipeline_definition, add_prefix=False)
159
 
160
  # Create pipeline record
161
+ output_id = generate_output_id()
162
  pipeline_record = PipelineSchema(
163
  execution_id=execution_id,
164
  session_id=session_id,
 
168
  created_by_message=created_by_message,
169
  status=PipelineStatus.PROPOSED,
170
  pipeline_definition_s3_key=s3_key,
171
+ components=components,
172
+ output_id=output_id
173
  )
174
 
175
  # Insert into MongoDB
 
224
  comp["success_message"] = success_message
225
 
226
  if error:
227
+ comp["error"] = {"message": error} if isinstance(error, str) else error
228
+ comp["hasError"] = True
229
+ else:
230
+ comp["hasError"] = False
231
+
232
+ # Store component output
233
+ if output:
234
+ comp["component_output"] = output
235
 
236
  component_found = True
237
  break
 
307
  component_outputs = []
308
  for i, result in enumerate(components_results):
309
  comp_output = ComponentOutputS3(
310
+ component_id=result.get("component_id", generate_component_id()),
311
  name=result.get("component_name", f"component_{i+1}"),
312
  order=i + 1,
313
  status=result.get("status", "unknown"),
314
+ component_output=result.get("result"),
315
+ hasError=result.get("status") == "failed",
316
+ error={"message": result.get("error")} if result.get("error") else None,
317
  success_message=result.get("success_message"),
318
+ metadata=result.get("metadata"),
319
  result=result.get("result")
320
  )
321
  component_outputs.append(comp_output.model_dump())
 
350
  "final_output_presigned_expires_at": presigned["presigned_expires_at"],
351
  "completed_at": now.isoformat() + "Z",
352
  "executor": executor,
353
+ "result": result_preview,
354
+ "hasError": workflow_status == "failed",
355
+ "error": {"message": "Pipeline execution failed"} if workflow_status == "failed" else None
356
  }
357
  }
358
  )
 
387
  {
388
  "$set": {
389
  "status": PipelineStatus.FAILED,
390
+ "error": {"message": error} if isinstance(error, str) else error,
391
+ "hasError": True,
392
  "completed_at": now.isoformat() + "Z"
393
  }
394
  }
services/schemas.py CHANGED
@@ -13,6 +13,7 @@ from typing import Dict, Any, List, Optional
13
  from datetime import datetime
14
  from pydantic import BaseModel, Field
15
  from enum import Enum
 
16
 
17
 
18
  # ========================
@@ -24,6 +25,9 @@ class SessionState(str, Enum):
24
  INITIAL = "initial"
25
  PIPELINE_PROPOSED = "pipeline_proposed"
26
  EXECUTING = "executing"
 
 
 
27
 
28
 
29
  class PipelineStatus(str, Enum):
@@ -38,7 +42,9 @@ class ComponentStatus(str, Enum):
38
  """Component execution states"""
39
  PENDING = "pending"
40
  EXECUTING = "executing"
 
41
  COMPLETED = "completed"
 
42
  FAILED = "failed"
43
 
44
 
@@ -55,13 +61,17 @@ class MessageRole(str, Enum):
55
 
56
  class ComponentMetadata(BaseModel):
57
  """Component execution metadata within a pipeline"""
 
58
  component_name: str
59
  order: int
60
  status: ComponentStatus = ComponentStatus.PENDING
61
  started_at: Optional[datetime] = None
62
  completed_at: Optional[datetime] = None
 
 
 
63
  success_message: Optional[str] = None # Component-wise success message
64
- error: Optional[str] = None
65
 
66
 
67
  class SessionStats(BaseModel):
@@ -127,13 +137,15 @@ class PipelineSchema(BaseModel):
127
  pipeline_definition_s3_key: str # Full pipeline definition in S3
128
  components: List[ComponentMetadata] # Component metadata only
129
  final_output_s3_key: Optional[str] = None # Contains ALL component outputs
 
130
  final_output_presigned_url: Optional[str] = None
131
  final_output_presigned_expires_at: Optional[datetime] = None
132
  executed_at: Optional[datetime] = None
133
  completed_at: Optional[datetime] = None
134
  executor: Optional[str] = None # "bedrock", "crewai", "langchain"
135
- result_preview: Optional[str] = None # First 500 chars of result
136
- error: Optional[str] = None
 
137
  metadata: Dict[str, Any] = Field(default_factory=dict)
138
 
139
  class Config:
@@ -198,11 +210,15 @@ class MessageS3Content(BaseModel):
198
 
199
  class ComponentOutputS3(BaseModel):
200
  """Schema for individual component output in final_output.json"""
 
201
  name: str
202
  order: int
203
  status: str
 
 
 
204
  success_message: Optional[str] = None
205
- error: Optional[str] = None
206
  result: Optional[Dict[str, Any]] = None
207
 
208
 
@@ -232,6 +248,20 @@ class PipelineDefinitionS3(BaseModel):
232
  # UTILITY FUNCTIONS
233
  # ========================
234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
235
  def datetime_to_iso(dt: Optional[datetime]) -> Optional[str]:
236
  """Convert datetime to ISO string"""
237
  return dt.isoformat() + "Z" if dt else None
 
13
  from datetime import datetime
14
  from pydantic import BaseModel, Field
15
  from enum import Enum
16
+ import uuid
17
 
18
 
19
  # ========================
 
25
  INITIAL = "initial"
26
  PIPELINE_PROPOSED = "pipeline_proposed"
27
  EXECUTING = "executing"
28
+ IN_PROGRESS = "in_progress" # Pipeline is running
29
+ COMPLETED = "completed" # Pipeline completed successfully
30
+ FAILED = "failed" # Pipeline failed
31
 
32
 
33
  class PipelineStatus(str, Enum):
 
42
  """Component execution states"""
43
  PENDING = "pending"
44
  EXECUTING = "executing"
45
+ RUNNING = "running" # Alias for executing
46
  COMPLETED = "completed"
47
+ SUCCESS = "success" # Alias for completed
48
  FAILED = "failed"
49
 
50
 
 
61
 
62
  class ComponentMetadata(BaseModel):
63
  """Component execution metadata within a pipeline"""
64
+ component_id: str # Unique identifier for component
65
  component_name: str
66
  order: int
67
  status: ComponentStatus = ComponentStatus.PENDING
68
  started_at: Optional[datetime] = None
69
  completed_at: Optional[datetime] = None
70
+ component_output: Optional[Any] = None # Output from component execution
71
+ hasError: bool = False # Error flag
72
+ error: Optional[Dict[str, Any]] = None # Error details {error_code, message, details}
73
  success_message: Optional[str] = None # Component-wise success message
74
+ metadata: Optional[Dict[str, Any]] = None # Duration, tokens, etc.
75
 
76
 
77
  class SessionStats(BaseModel):
 
137
  pipeline_definition_s3_key: str # Full pipeline definition in S3
138
  components: List[ComponentMetadata] # Component metadata only
139
  final_output_s3_key: Optional[str] = None # Contains ALL component outputs
140
+ output_id: Optional[str] = None # ID for downloading output
141
  final_output_presigned_url: Optional[str] = None
142
  final_output_presigned_expires_at: Optional[datetime] = None
143
  executed_at: Optional[datetime] = None
144
  completed_at: Optional[datetime] = None
145
  executor: Optional[str] = None # "bedrock", "crewai", "langchain"
146
+ result: Optional[str] = None # Renamed from result_preview - output if completed
147
+ hasError: bool = False # Error flag
148
+ error: Optional[Dict[str, Any]] = None # Error details
149
  metadata: Dict[str, Any] = Field(default_factory=dict)
150
 
151
  class Config:
 
210
 
211
  class ComponentOutputS3(BaseModel):
212
  """Schema for individual component output in final_output.json"""
213
+ component_id: str
214
  name: str
215
  order: int
216
  status: str
217
+ component_output: Optional[Any] = None
218
+ hasError: bool = False
219
+ error: Optional[Dict[str, Any]] = None
220
  success_message: Optional[str] = None
221
+ metadata: Optional[Dict[str, Any]] = None
222
  result: Optional[Dict[str, Any]] = None
223
 
224
 
 
248
  # UTILITY FUNCTIONS
249
  # ========================
250
 
251
+ def generate_message_id() -> str:
252
+ """Generate unique message ID"""
253
+ return f"msg_{uuid.uuid4().hex[:12]}"
254
+
255
+
256
+ def generate_component_id() -> str:
257
+ """Generate unique component ID"""
258
+ return f"comp_{uuid.uuid4().hex[:12]}"
259
+
260
+
261
+ def generate_output_id() -> str:
262
+ """Generate unique output ID for downloads"""
263
+ return f"output_{uuid.uuid4().hex[:12]}"
264
+
265
  def datetime_to_iso(dt: Optional[datetime]) -> Optional[str]:
266
  """Convert datetime to ISO string"""
267
  return dt.isoformat() + "Z" if dt else None
services/session_manager.py CHANGED
@@ -20,7 +20,8 @@ from pymongo.errors import DuplicateKeyError, ConnectionFailure
20
  from services.s3_manager import get_s3_manager
21
  from services.schemas import (
22
  SessionSchema, MessageSchema, SessionState, MessageRole,
23
- SessionStats, MessageS3Content, schema_to_dict, datetime_to_iso
 
24
  )
25
 
26
 
@@ -110,7 +111,7 @@ class SessionManager:
110
  Returns:
111
  Message metadata dict
112
  """
113
- message_id = str(uuid.uuid4())
114
  now = datetime.utcnow()
115
 
116
  # Store full message content in S3
 
20
  from services.s3_manager import get_s3_manager
21
  from services.schemas import (
22
  SessionSchema, MessageSchema, SessionState, MessageRole,
23
+ SessionStats, MessageS3Content, schema_to_dict, datetime_to_iso,
24
+ generate_message_id
25
  )
26
 
27
 
 
111
  Returns:
112
  Message metadata dict
113
  """
114
+ message_id = generate_message_id()
115
  now = datetime.utcnow()
116
 
117
  # Store full message content in S3