redhairedshanks1 committed on
Commit
3d55ccd
·
1 Parent(s): cfa5d53

Update api_routes_v2.py

Browse files
Files changed (1) hide show
  1. api_routes_v2.py +369 -60
api_routes_v2.py CHANGED
@@ -421,6 +421,9 @@ from services.pipeline_executor import execute_pipeline_streaming, execute_pipel
421
  from services.session_manager import session_manager
422
  from services.intent_classifier import intent_classifier
423
 
 
 
 
424
  router = APIRouter(prefix="/api/v2", tags=["MasterLLM API V2 - Enhanced"])
425
 
426
  # ========================
@@ -468,6 +471,199 @@ class ChatResponse(BaseModel):
468
  fileUrl: Optional[str] = None
469
 
470
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
471
  # ========================
472
  # HELPERS
473
  # ========================
@@ -532,10 +728,31 @@ def _normalize_history_for_api(chat_id: str) -> List[Message]:
532
  return history
533
 
534
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
535
  def _add_and_mirror_message(chat_id: str, role: str, content: str):
536
  """
537
  Add a message via session_manager AND mirror it into the session doc
538
  so history is always available in responses.
 
 
539
  """
540
  session_manager.add_message(chat_id, role, content)
541
  try:
@@ -545,6 +762,7 @@ def _add_and_mirror_message(chat_id: str, role: str, content: str):
545
  "role": role,
546
  "content": content if isinstance(content, str) else json.dumps(content, ensure_ascii=False),
547
  "timestamp": datetime.utcnow().isoformat() + "Z",
 
548
  })
549
  session_manager.update_session(chat_id, {"messages": msgs})
550
  except Exception:
@@ -634,6 +852,9 @@ def upload_stream_to_s3(chat_id: str, file: UploadFile) -> str:
634
  """
635
  Stream an UploadFile directly to S3, return s3:// URI.
636
  Supports optional SSE via env S3_SSE and S3_KMS_KEY_ID.
 
 
 
637
  """
638
  key = f"{S3_PREFIX}/{chat_id}/{file.filename}"
639
  config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)
@@ -665,8 +886,10 @@ def upload_stream_to_s3(chat_id: str, file: UploadFile) -> str:
665
  )
666
 
667
  s3_uri = f"s3://{S3_BUCKET}/{key}"
668
-
669
- # Store file metadata in session
 
 
670
  session_manager.update_session(chat_id, {
671
  "current_file": s3_uri,
672
  "state": "initial",
@@ -674,10 +897,11 @@ def upload_stream_to_s3(chat_id: str, file: UploadFile) -> str:
674
  "has_file": True,
675
  "file_name": file.filename,
676
  "file_url": s3_uri,
677
- "uploaded_at": datetime.utcnow().isoformat() + "Z"
 
678
  }
679
  })
680
-
681
  # Create a user message with file metadata (instead of system message)
682
  file_message = {
683
  "role": "user",
@@ -689,13 +913,13 @@ def upload_stream_to_s3(chat_id: str, file: UploadFile) -> str:
689
  "file_url": s3_uri
690
  }
691
  }
692
-
693
  # Get existing messages and add the file message
694
  s = session_manager.get_session(chat_id) or {}
695
  msgs = list(s.get("messages", []))
696
  msgs.append(file_message)
697
  session_manager.update_session(chat_id, {"messages": msgs})
698
-
699
  return s3_uri
700
 
701
 
@@ -711,25 +935,19 @@ async def get_all_sessions(
711
  ):
712
  """
713
  Get all session IDs from the database with optional pagination
714
-
715
- Args:
716
- limit: Maximum number of sessions to return (default: 100)
717
- skip: Number of sessions to skip (for pagination)
718
- include_stats: If True, include session statistics
719
-
720
- Returns:
721
- List of sessions with basic info or full stats
722
  """
723
  try:
724
  all_session_ids = session_manager.get_all_session_ids()
725
-
726
  if not all_session_ids:
727
  return {"sessions": [], "pagination": {"total": 0, "returned": 0}}
728
-
729
  # Apply pagination
730
  total_sessions = len(all_session_ids)
731
  paginated_ids = all_session_ids[skip:skip + limit]
732
-
733
  if not include_stats:
734
  # Return just session IDs with pagination info
735
  sessions_basic = [
@@ -740,7 +958,7 @@ async def get_all_sessions(
740
  }
741
  for sid in paginated_ids
742
  ]
743
-
744
  return {
745
  "sessions": sessions_basic,
746
  "pagination": {
@@ -751,7 +969,7 @@ async def get_all_sessions(
751
  "has_more": total_sessions > (skip + limit)
752
  }
753
  }
754
-
755
  # Include detailed statistics for each session
756
  sessions_with_stats = []
757
  for session_id in paginated_ids:
@@ -760,12 +978,12 @@ async def get_all_sessions(
760
  # Format datetime objects for JSON serialization
761
  created_at = session.get("created_at")
762
  last_activity = session.get("last_activity")
763
-
764
  if isinstance(created_at, datetime):
765
  created_at = created_at.isoformat()
766
  if isinstance(last_activity, datetime):
767
  last_activity = last_activity.isoformat()
768
-
769
  sessions_with_stats.append({
770
  "session_id": session_id,
771
  "user_id": session.get("user_id"),
@@ -773,11 +991,12 @@ async def get_all_sessions(
773
  "last_activity": last_activity,
774
  "state": session.get("state", "unknown"),
775
  "current_file": session.get("current_file"),
 
776
  "stats": session.get("stats", {}),
777
  "total_messages": len(session.get("conversation_history", [])),
778
  "pipeline_executions_count": len(session.get("pipeline_executions", []))
779
  })
780
-
781
  return {
782
  "sessions": sessions_with_stats,
783
  "pagination": {
@@ -788,7 +1007,7 @@ async def get_all_sessions(
788
  "has_more": total_sessions > (skip + limit)
789
  }
790
  }
791
-
792
  except Exception as e:
793
  raise HTTPException(
794
  status_code=500,
@@ -807,18 +1026,13 @@ async def get_session_history(
807
  ):
808
  """
809
  Get conversation history for a session
810
-
811
- Args:
812
- session_id: Session identifier
813
- limit: Maximum number of messages to return (default: 50)
814
-
815
- Returns:
816
- Session history including session ID
817
  """
818
  try:
819
  # Get the history using session manager
820
  history = session_manager.get_session_history(session_id, limit)
821
-
822
  # Format datetime objects to ISO strings for JSON serialization
823
  formatted_history = []
824
  for msg in history:
@@ -826,15 +1040,19 @@ async def get_session_history(
826
  if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
827
  msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
828
  formatted_history.append(msg_copy)
829
-
 
 
830
  # Return with session_id included
831
  return {
832
  "session_id": session_id,
833
  "history": formatted_history,
834
  "count": len(formatted_history),
835
- "limit": limit
 
 
836
  }
837
-
838
  except Exception as e:
839
  raise HTTPException(
840
  status_code=500,
@@ -861,6 +1079,13 @@ async def chat_unified(
861
  - Handles casual chat, pipeline request, approve/reject, and edits.
862
  - On approval, executes the pipeline (non-stream) and returns the final result.
863
  Returns assistant_response + full history (role/content).
 
 
 
 
 
 
 
864
  """
865
 
866
  # Support JSON payloads too
@@ -886,13 +1111,21 @@ async def chat_unified(
886
  file_info = None
887
  if file is not None:
888
  s3_uri = upload_stream_to_s3(chat_id, file)
889
- file_info = {"bucket": S3_BUCKET, "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1], "s3_uri": s3_uri}
 
 
 
 
 
 
 
 
890
  session = _get_session_or_init(chat_id)
891
 
892
  # If no message and only a file was sent, respond with an acknowledgement
893
  if (message is None or str(message).strip() == "") and file_info:
894
  friendly = "πŸ“ File uploaded successfully. Tell me what you'd like to do with it (e.g., extract text, get tables, summarize)."
895
- api_data = {"type": "file_uploaded", "file": file_info, "next_action": "send_instruction"}
896
  return _assistant_response_payload(chat_id, friendly, {"intent": "file_uploaded"}, api_data, session.get("state", "initial"))
897
 
898
  # If still no message, nudge the user
@@ -903,6 +1136,8 @@ async def chat_unified(
903
 
904
  # Add user message
905
  _add_and_mirror_message(chat_id, "user", message)
 
 
906
 
907
  # Classify intent
908
  intent_data = intent_classifier.classify_intent(message)
@@ -964,10 +1199,11 @@ async def chat_unified(
964
  prefer_bedrock=bool(prefer_bedrock),
965
  )
966
  session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
967
- friendly = (
968
- f"πŸŽ‰ Pipeline completed successfully!\n"
969
- f"βœ… All done! What else would you like me to help you with?"
970
- )
 
971
  api_data = {
972
  "type": "pipeline_completed",
973
  "result": result,
@@ -976,6 +1212,7 @@ async def chat_unified(
976
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
977
  except Exception as e:
978
  session_manager.update_session(chat_id, {"state": "initial"})
 
979
  friendly = f"❌ Pipeline execution failed: {str(e)}"
980
  api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
981
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
@@ -1015,6 +1252,7 @@ async def chat_unified(
1015
  )
1016
 
1017
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
 
1018
 
1019
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1020
  steps_list = pipeline.get("pipeline_steps", [])
@@ -1074,6 +1312,7 @@ async def chat_unified(
1074
  prefer_bedrock=bool(prefer_bedrock)
1075
  )
1076
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
 
1077
 
1078
  formatted = format_pipeline_for_display(new_pipeline)
1079
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
@@ -1147,6 +1386,12 @@ async def chat_unified_stream(
1147
  - Accepts multipart/form-data (file + message) OR JSON.
1148
  - Uploads file if included.
1149
  - On approval, streams execution progress and final result.
 
 
 
 
 
 
1150
  """
1151
 
1152
  # Parse JSON if needed
@@ -1172,7 +1417,14 @@ async def chat_unified_stream(
1172
  uploaded_file_info = None
1173
  if file is not None:
1174
  s3_uri = upload_stream_to_s3(chat_id, file)
1175
- uploaded_file_info = {"bucket": S3_BUCKET, "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1], "s3_uri": s3_uri}
 
 
 
 
 
 
 
1176
  session = _get_session_or_init(chat_id)
1177
 
1178
  def emit(obj: Dict[str, Any]) -> bytes:
@@ -1199,6 +1451,8 @@ async def chat_unified_stream(
1199
 
1200
  # Add user message
1201
  _add_and_mirror_message(chat_id, "user", message)
 
 
1202
 
1203
  # Classify
1204
  intent_data = intent_classifier.classify_intent(message)
@@ -1237,6 +1491,7 @@ async def chat_unified_stream(
1237
  prefer_bedrock=bool(prefer_bedrock),
1238
  )
1239
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
 
1240
 
1241
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1242
  steps_list = pipeline.get("pipeline_steps", [])
@@ -1323,6 +1578,7 @@ async def chat_unified_stream(
1323
  friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
1324
  session_manager.update_session(chat_id, {"state": "initial"})
1325
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
 
1326
  yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1327
  return
1328
 
@@ -1336,13 +1592,17 @@ async def chat_unified_stream(
1336
  executor=executor_used
1337
  )
1338
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
 
 
 
 
1339
  friendly_final = (
1340
  f"πŸŽ‰ Pipeline Completed Successfully!\n"
1341
  f"- Pipeline: {plan.get('pipeline_name', 'Document Processing')}\n"
1342
  f"- Total Steps: {len(steps_completed)}\n"
1343
  f"- Successful: {success_count}\n"
1344
- f"- Executor: {executor_used}\n"
1345
- f"βœ… All done! What else would you like me to help you with?"
1346
  )
1347
  _add_and_mirror_message(chat_id, "assistant", friendly_final)
1348
  yield emit({"type": "assistant_final", "content": friendly_final, "result": final_payload, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
@@ -1358,6 +1618,7 @@ async def chat_unified_stream(
1358
  friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}"
1359
  session_manager.update_session(chat_id, {"state": "initial"})
1360
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
 
1361
  yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1362
  return
1363
  finally:
@@ -1383,6 +1644,7 @@ async def chat_unified_stream(
1383
  prefer_bedrock=bool(prefer_bedrock)
1384
  )
1385
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
 
1386
  formatted = format_pipeline_for_display(new_pipeline)
1387
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
1388
  _add_and_mirror_message(chat_id, "assistant", friendly)
@@ -1412,6 +1674,11 @@ async def smart_chat(request: ChatRequest):
1412
  """
1413
  Kept for compatibility with existing clients (non-stream).
1414
  For a single all-in-one endpoint, use /api/v2/chat/unified.
 
 
 
 
 
1415
  """
1416
  chat_id = _ensure_chat(request.chat_id)
1417
  session = _get_session_or_init(chat_id)
@@ -1421,6 +1688,8 @@ async def smart_chat(request: ChatRequest):
1421
  session = _get_session_or_init(chat_id)
1422
 
1423
  _add_and_mirror_message(chat_id, "user", request.message)
 
 
1424
 
1425
  intent_data = intent_classifier.classify_intent(request.message)
1426
  current_state = session.get("state", "initial")
@@ -1504,6 +1773,7 @@ async def smart_chat(request: ChatRequest):
1504
  )
1505
 
1506
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
 
1507
 
1508
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1509
  steps_list = pipeline.get("pipeline_steps", [])
@@ -1562,6 +1832,7 @@ async def smart_chat(request: ChatRequest):
1562
  prefer_bedrock=request.prefer_bedrock
1563
  )
1564
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
 
1565
 
1566
  formatted = format_pipeline_for_display(new_pipeline)
1567
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
@@ -1621,6 +1892,12 @@ def smart_chat_stream(request: ChatRequest):
1621
  """
1622
  Kept for compatibility (streaming NDJSON).
1623
  For the all-in-one streaming flow, use /api/v2/chat/unified/stream.
 
 
 
 
 
 
1624
  """
1625
 
1626
  def gen() -> Generator[bytes, None, None]:
@@ -1632,6 +1909,8 @@ def smart_chat_stream(request: ChatRequest):
1632
  session = _get_session_or_init(chat_id)
1633
 
1634
  _add_and_mirror_message(chat_id, "user", request.message)
 
 
1635
 
1636
  intent_data = intent_classifier.classify_intent(request.message)
1637
  current_state = session.get("state", "initial")
@@ -1673,6 +1952,7 @@ def smart_chat_stream(request: ChatRequest):
1673
  prefer_bedrock=request.prefer_bedrock
1674
  )
1675
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
 
1676
 
1677
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1678
  steps_list = pipeline.get("pipeline_steps", [])
@@ -1759,9 +2039,10 @@ def smart_chat_stream(request: ChatRequest):
1759
 
1760
  elif etype == "error":
1761
  err = event.get("error", "Unknown error")
1762
- friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
1763
  session_manager.update_session(chat_id, {"state": "initial"})
1764
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
 
1765
  yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1766
  return
1767
 
@@ -1775,13 +2056,16 @@ def smart_chat_stream(request: ChatRequest):
1775
  executor=executor_used
1776
  )
1777
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
 
 
 
1778
  friendly_final = (
1779
  f"πŸŽ‰ Pipeline Completed Successfully!\n"
1780
  f"- Pipeline: {plan.get('pipeline_name', 'Document Processing')}\n"
1781
  f"- Total Steps: {len(steps_completed)}\n"
1782
  f"- Successful: {success_count}\n"
1783
- f"- Executor: {executor_used}\n"
1784
- f"βœ… All done! What else would you like me to help you with?"
1785
  )
1786
  _add_and_mirror_message(chat_id, "assistant", friendly_final)
1787
  yield emit({"type": "assistant_final", "content": friendly_final, "result": final_payload, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
@@ -1797,6 +2081,7 @@ def smart_chat_stream(request: ChatRequest):
1797
  friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}\n\nCompleted {len(steps_completed)} step(s) before failure."
1798
  session_manager.update_session(chat_id, {"state": "initial"})
1799
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
 
1800
  yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1801
  return
1802
  finally:
@@ -1822,6 +2107,7 @@ def smart_chat_stream(request: ChatRequest):
1822
  prefer_bedrock=request.prefer_bedrock
1823
  )
1824
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
 
1825
  formatted = format_pipeline_for_display(new_pipeline)
1826
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
1827
  _add_and_mirror_message(chat_id, "assistant", friendly)
@@ -1846,6 +2132,10 @@ def smart_chat_stream(request: ChatRequest):
1846
 
1847
  @router.post("/chats/{chat_id}/pipeline/execute", response_model=ChatResponse)
1848
  async def execute_pipeline_now(chat_id: str):
 
 
 
 
1849
  session = session_manager.get_session(chat_id)
1850
  if not session:
1851
  raise HTTPException(status_code=404, detail="Chat not found")
@@ -1865,11 +2155,16 @@ async def execute_pipeline_now(chat_id: str):
1865
  prefer_bedrock=True
1866
  )
1867
  session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
1868
- friendly = "πŸŽ‰ Pipeline completed. Ready for your next task!"
 
 
 
 
1869
  api_data = {"type": "pipeline_completed", "result": result, "pipeline": plan}
1870
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
1871
  except Exception as e:
1872
  session_manager.update_session(chat_id, {"state": "initial"})
 
1873
  friendly = f"❌ Pipeline execution failed: {str(e)}"
1874
  api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
1875
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
@@ -1932,6 +2227,9 @@ def create_chat():
1932
 
1933
  @router.get("/chats/{chat_id}")
1934
  def get_chat(chat_id: str):
 
 
 
1935
  s = session_manager.get_session(chat_id)
1936
  if not s:
1937
  raise HTTPException(status_code=404, detail="Chat not found")
@@ -1941,6 +2239,8 @@ def get_chat(chat_id: str):
1941
  "current_file": s.get("current_file"),
1942
  "created_at": s.get("created_at"),
1943
  "updated_at": s.get("updated_at"),
 
 
1944
  }
1945
 
1946
 
@@ -1955,32 +2255,28 @@ def get_chat(chat_id: str):
1955
  def get_chat_history(chat_id: str):
1956
  """
1957
  Get conversation history for a specific chat
1958
-
1959
- Args:
1960
- chat_id: Chat identifier
1961
-
1962
- Returns:
1963
- Chat history with session ID and file metadata
1964
  """
1965
  s = session_manager.get_session(chat_id)
1966
  if not s:
1967
  raise HTTPException(status_code=404, detail="Chat not found")
1968
-
1969
  # Get the normalized history (system messages filtered out)
1970
  history = [m.dict() for m in _normalize_history_for_api(chat_id)]
1971
-
1972
  # Get file metadata
1973
  file_metadata = s.get("file_metadata", {})
1974
-
1975
  # Format datetime objects for JSON serialization
1976
  created_at = s.get("created_at")
1977
  updated_at = s.get("updated_at")
1978
-
1979
  if isinstance(created_at, datetime):
1980
  created_at = created_at.isoformat()
1981
  if isinstance(updated_at, datetime):
1982
  updated_at = updated_at.isoformat()
1983
-
1984
  # Return with session_id and file metadata
1985
  return {
1986
  "session_id": chat_id,
@@ -1992,7 +2288,9 @@ def get_chat_history(chat_id: str):
1992
  "updated_at": updated_at,
1993
  "file": file_metadata.get("has_file", False),
1994
  "fileName": file_metadata.get("file_name"),
1995
- "fileUrl": file_metadata.get("file_url")
 
 
1996
  }
1997
 
1998
 
@@ -2017,12 +2315,23 @@ async def send_message_to_chat(chat_id: str, payload: ChatRequest):
2017
 
2018
  @router.post("/chats/{chat_id}/upload")
2019
  async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
 
 
 
 
2020
  chat_id = _ensure_chat(chat_id)
2021
  s3_uri = upload_stream_to_s3(chat_id, file)
 
2022
  return {
2023
  "status": "success",
2024
  "message": "File uploaded to S3",
2025
- "file": {"bucket": S3_BUCKET, "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1], "s3_uri": s3_uri},
 
 
 
 
 
 
2026
  "chat_id": chat_id,
2027
  "next_action": "πŸ’¬ Now tell me what you'd like to do with this document"
2028
  }
 
421
  from services.session_manager import session_manager
422
  from services.intent_classifier import intent_classifier
423
 
424
+ # Add or update this import near the top (with other imports)
425
+ from datetime import datetime, timedelta
426
+
427
  router = APIRouter(prefix="/api/v2", tags=["MasterLLM API V2 - Enhanced"])
428
 
429
  # ========================
 
471
  fileUrl: Optional[str] = None
472
 
473
 
474
+
475
+ ## Helpers: presigned S3 URL generation, automatic chat naming, and pipeline-history bookkeeping
476
+
477
+ def _resolve_bedrock_model_for_titles(session: Dict[str, Any]) -> str:
478
+ """
479
+ CHANGE: NEW helper.
480
+ Use the same model as other tasks if available (session.proposed_pipeline._model).
481
+ Fallback to env, then a sane default.
482
+ """
483
+ try:
484
+ model = (session.get("proposed_pipeline") or {}).get("_model")
485
+ except Exception:
486
+ model = None
487
+ if not model:
488
+ model = os.getenv("BEDROCK_MODEL_ID") or os.getenv("BEDROCK_DEFAULT_MODEL") or "anthropic.claude-3-haiku-20240307"
489
+ return model
490
+
491
+
492
+ def _bedrock_invoke_title(model_id: str, prompt_text: str) -> str:
493
+ """
494
+ CHANGE: NEW helper.
495
+ Minimal Bedrock invocation for Anthropic/Titan models to produce a short title.
496
+ If invocation fails, returns 'New Chat'.
497
+ """
498
+ try:
499
+ bedrock_runtime = boto3.client("bedrock-runtime", region_name=AWS_REGION)
500
+ if model_id.startswith("anthropic."):
501
+ # Anthropic Messages on Bedrock
502
+ body = {
503
+ "anthropic_version": "bedrock-2023-05-31",
504
+ "max_tokens": 48,
505
+ "temperature": 0.2,
506
+ "messages": [{"role": "user", "content": [{"type": "text", "text": prompt_text}]}],
507
+ }
508
+ resp = bedrock_runtime.invoke_model(
509
+ modelId=model_id,
510
+ accept="application/json",
511
+ contentType="application/json",
512
+ body=json.dumps(body),
513
+ )
514
+ payload = json.loads(resp["body"].read())
515
+ # Extract first text piece
516
+ text = ""
517
+ if isinstance(payload.get("content"), list) and payload["content"]:
518
+ part = payload["content"][0]
519
+ if isinstance(part, dict):
520
+ text = part.get("text") or ""
521
+ return (text or "").strip().strip('"').strip() or "New Chat"
522
+ else:
523
+ # Titan text models (or similar)
524
+ body = {
525
+ "inputText": prompt_text,
526
+ "textGenerationConfig": {"maxTokenCount": 64, "temperature": 0.2, "stopSequences": []},
527
+ }
528
+ resp = bedrock_runtime.invoke_model(
529
+ modelId=model_id,
530
+ accept="application/json",
531
+ contentType="application/json",
532
+ body=json.dumps(body),
533
+ )
534
+ payload = json.loads(resp["body"].read())
535
+ results = payload.get("results") or []
536
+ if results:
537
+ return (results[0].get("outputText") or "").strip().strip('"').strip() or "New Chat"
538
+ return "New Chat"
539
+ except Exception:
540
+ return "New Chat"
541
+
542
+
543
def _maybe_generate_chat_name(chat_id: str):
    """Auto-generate a succinct chat title once, after the first real user message.

    Skips sessions that already have a ``chat_name`` and ignores the
    auto-inserted "Uploaded file:" stub messages. The title is produced via
    Bedrock, preferring the same model used for other tasks in this session.
    All errors are swallowed so naming never blocks the chat flow.

    Args:
        chat_id: Session identifier to (possibly) name.
    """
    try:
        session = session_manager.get_session(chat_id) or {}
        if session.get("chat_name"):
            return  # already named; we only generate once

        # Locate the first genuine user message, skipping upload stubs.
        first_user = None
        for entry in session.get("messages", []):
            if (entry.get("role") or "") != "user":
                continue
            text = (entry.get("content") or "").strip()
            if text.lower().startswith("uploaded file:"):
                continue
            first_user = text
            break
        if not first_user:
            return

        file_name = (session.get("file_metadata") or {}).get("file_name")
        prompt = (
            "Create a succinct, descriptive 3–6 word title for this chat session based on the first user message.\n"
            "Return only the title, without quotes.\n\n"
            f"First message: {first_user}\n"
            f"File name (optional): {file_name or 'N/A'}"
        )
        model_id = _resolve_bedrock_model_for_titles(session)
        title = _bedrock_invoke_title(model_id, prompt) or "New Chat"
        session_manager.update_session(
            chat_id,
            {
                "chat_name": title[:100],
                "chat_name_generated_at": datetime.utcnow().isoformat() + "Z",
                "chat_name_model": model_id,
            },
        )
    except Exception:
        # Do not block flow on errors
        pass
583
+
584
+
585
+ def _generate_presigned_get_url(bucket: str, key: str, expires_in: int = 604800) -> Dict[str, str]:
586
+ """
587
+ CHANGE: NEW helper.
588
+ Generate a presigned S3 GET URL with max expiry (7 days). We only generate once on upload.
589
+ """
590
+ try:
591
+ url = s3.generate_presigned_url(
592
+ "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expires_in
593
+ )
594
+ expires_at = (datetime.utcnow() + timedelta(seconds=expires_in)).isoformat() + "Z"
595
+ return {"presigned_url": url, "presigned_expires_at": expires_at}
596
+ except Exception:
597
+ return {}
598
+
599
+
600
+ def _push_pipeline_history(chat_id: str, pipeline: Dict[str, Any], created_from: str = "request", message: Optional[str] = None):
601
+ """
602
+ CHANGE: NEW helper.
603
+ Prepend a pipeline record to session.pipelines_history (unbounded, latest first).
604
+ """
605
+ try:
606
+ s = session_manager.get_session(chat_id) or {}
607
+ hist = list(s.get("pipelines_history", []))
608
+ entry = {
609
+ "pipeline": pipeline,
610
+ "created_at": datetime.utcnow().isoformat() + "Z",
611
+ "created_from": created_from,
612
+ "message": message,
613
+ "status": "proposed",
614
+ }
615
+ hist.insert(0, entry)
616
+ session_manager.update_session(chat_id, {"pipelines_history": hist})
617
+ except Exception:
618
+ pass
619
+
620
+
621
+ def _mark_latest_pipeline_history_status(chat_id: str, status: str, result_preview: Optional[str] = None):
622
+ """
623
+ CHANGE: NEW helper.
624
+ Update the most recent pipelines_history entry with status and optional result preview.
625
+ """
626
+ try:
627
+ s = session_manager.get_session(chat_id) or {}
628
+ hist = list(s.get("pipelines_history", []))
629
+ if not hist:
630
+ return
631
+ hist[0]["status"] = status
632
+ if result_preview is not None:
633
+ hist[0]["result_preview"] = result_preview[:500]
634
+ session_manager.update_session(chat_id, {"pipelines_history": hist})
635
+ except Exception:
636
+ pass
637
+
638
+
639
+ def _extract_user_facing_text(obj: Any) -> str:
640
+ """
641
+ CHANGE: NEW helper.
642
+ Heuristically extract user-facing text from pipeline results.
643
+ """
644
+ try:
645
+ if isinstance(obj, str):
646
+ return obj
647
+ if isinstance(obj, dict):
648
+ for k in ["summary", "final_text", "content", "text", "output"]:
649
+ v = obj.get(k)
650
+ if isinstance(v, str) and v.strip():
651
+ return v.strip()
652
+ if isinstance(v, list):
653
+ texts = [x for x in v if isinstance(x, str)]
654
+ if texts:
655
+ return "\n".join(texts[:3]).strip()
656
+ return json.dumps(obj, ensure_ascii=False)[:2000]
657
+ if isinstance(obj, list):
658
+ texts = [x for x in obj if isinstance(x, str)]
659
+ if texts:
660
+ return "\n".join(texts[:5])
661
+ return json.dumps(obj, ensure_ascii=False)[:2000]
662
+ return ""
663
+ except Exception:
664
+ return ""
665
+
666
+
667
  # ========================
668
  # HELPERS
669
  # ========================
 
728
  return history
729
 
730
 
731
+ # def _add_and_mirror_message(chat_id: str, role: str, content: str):
732
+ # """
733
+ # Add a message via session_manager AND mirror it into the session doc
734
+ # so history is always available in responses.
735
+ # """
736
+ # session_manager.add_message(chat_id, role, content)
737
+ # try:
738
+ # s = session_manager.get_session(chat_id) or {}
739
+ # msgs = list(s.get("messages", []))
740
+ # msgs.append({
741
+ # "role": role,
742
+ # "content": content if isinstance(content, str) else json.dumps(content, ensure_ascii=False),
743
+ # "timestamp": datetime.utcnow().isoformat() + "Z",
744
+ # })
745
+ # session_manager.update_session(chat_id, {"messages": msgs})
746
+ # except Exception:
747
+ # # Don't block the flow on mirror issues
748
+ # pass
749
+
750
  def _add_and_mirror_message(chat_id: str, role: str, content: str):
751
  """
752
  Add a message via session_manager AND mirror it into the session doc
753
  so history is always available in responses.
754
+ CHANGE: Now we always include file_data: {"has_file": False} for every message by default,
755
+ so Message.file is present per message in history.
756
  """
757
  session_manager.add_message(chat_id, role, content)
758
  try:
 
762
  "role": role,
763
  "content": content if isinstance(content, str) else json.dumps(content, ensure_ascii=False),
764
  "timestamp": datetime.utcnow().isoformat() + "Z",
765
+ "file_data": {"has_file": False} # CHANGE: ensure explicit boolean on every message
766
  })
767
  session_manager.update_session(chat_id, {"messages": msgs})
768
  except Exception:
 
852
  """
853
  Stream an UploadFile directly to S3, return s3:// URI.
854
  Supports optional SSE via env S3_SSE and S3_KMS_KEY_ID.
855
+ CHANGE:
856
+ - Generate a presigned GET URL with max expiry (7 days) once at upload time.
857
+ - Store presigned_url and presigned_expires_at in session.file_metadata (do not regenerate later).
858
  """
859
  key = f"{S3_PREFIX}/{chat_id}/{file.filename}"
860
  config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)
 
886
  )
887
 
888
  s3_uri = f"s3://{S3_BUCKET}/{key}"
889
+ # CHANGE: generate and store a single presigned URL (max 7 days) for later reuse
890
+ presigned = _generate_presigned_get_url(S3_BUCKET, key, expires_in=604800)
891
+
892
+ # Store file metadata in session (adds presigned fields)
893
  session_manager.update_session(chat_id, {
894
  "current_file": s3_uri,
895
  "state": "initial",
 
897
  "has_file": True,
898
  "file_name": file.filename,
899
  "file_url": s3_uri,
900
+ "uploaded_at": datetime.utcnow().isoformat() + "Z",
901
+ **presigned # CHANGE
902
  }
903
  })
904
+
905
  # Create a user message with file metadata (instead of system message)
906
  file_message = {
907
  "role": "user",
 
913
  "file_url": s3_uri
914
  }
915
  }
916
+
917
  # Get existing messages and add the file message
918
  s = session_manager.get_session(chat_id) or {}
919
  msgs = list(s.get("messages", []))
920
  msgs.append(file_message)
921
  session_manager.update_session(chat_id, {"messages": msgs})
922
+
923
  return s3_uri
924
 
925
 
 
935
  ):
936
  """
937
  Get all session IDs from the database with optional pagination
938
+
939
+ CHANGE: When include_stats=True, include 'chat_name' (added field only).
 
 
 
 
 
 
940
  """
941
  try:
942
  all_session_ids = session_manager.get_all_session_ids()
943
+
944
  if not all_session_ids:
945
  return {"sessions": [], "pagination": {"total": 0, "returned": 0}}
946
+
947
  # Apply pagination
948
  total_sessions = len(all_session_ids)
949
  paginated_ids = all_session_ids[skip:skip + limit]
950
+
951
  if not include_stats:
952
  # Return just session IDs with pagination info
953
  sessions_basic = [
 
958
  }
959
  for sid in paginated_ids
960
  ]
961
+
962
  return {
963
  "sessions": sessions_basic,
964
  "pagination": {
 
969
  "has_more": total_sessions > (skip + limit)
970
  }
971
  }
972
+
973
  # Include detailed statistics for each session
974
  sessions_with_stats = []
975
  for session_id in paginated_ids:
 
978
  # Format datetime objects for JSON serialization
979
  created_at = session.get("created_at")
980
  last_activity = session.get("last_activity")
981
+
982
  if isinstance(created_at, datetime):
983
  created_at = created_at.isoformat()
984
  if isinstance(last_activity, datetime):
985
  last_activity = last_activity.isoformat()
986
+
987
  sessions_with_stats.append({
988
  "session_id": session_id,
989
  "user_id": session.get("user_id"),
 
991
  "last_activity": last_activity,
992
  "state": session.get("state", "unknown"),
993
  "current_file": session.get("current_file"),
994
+ "chat_name": session.get("chat_name"), # CHANGE: added field
995
  "stats": session.get("stats", {}),
996
  "total_messages": len(session.get("conversation_history", [])),
997
  "pipeline_executions_count": len(session.get("pipeline_executions", []))
998
  })
999
+
1000
  return {
1001
  "sessions": sessions_with_stats,
1002
  "pagination": {
 
1007
  "has_more": total_sessions > (skip + limit)
1008
  }
1009
  }
1010
+
1011
  except Exception as e:
1012
  raise HTTPException(
1013
  status_code=500,
 
1026
  ):
1027
  """
1028
  Get conversation history for a session
1029
+
1030
+ CHANGE: Adds 'chat_name' and 'pipelines_history' (added fields only).
 
 
 
 
 
1031
  """
1032
  try:
1033
  # Get the history using session manager
1034
  history = session_manager.get_session_history(session_id, limit)
1035
+
1036
  # Format datetime objects to ISO strings for JSON serialization
1037
  formatted_history = []
1038
  for msg in history:
 
1040
  if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
1041
  msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()
1042
  formatted_history.append(msg_copy)
1043
+
1044
+ s = session_manager.get_session(session_id) or {}
1045
+
1046
  # Return with session_id included
1047
  return {
1048
  "session_id": session_id,
1049
  "history": formatted_history,
1050
  "count": len(formatted_history),
1051
+ "limit": limit,
1052
+ "chat_name": s.get("chat_name"), # CHANGE
1053
+ "pipelines_history": s.get("pipelines_history", []) # CHANGE
1054
  }
1055
+
1056
  except Exception as e:
1057
  raise HTTPException(
1058
  status_code=500,
 
1079
  - Handles casual chat, pipeline request, approve/reject, and edits.
1080
  - On approval, executes the pipeline (non-stream) and returns the final result.
1081
  Returns assistant_response + full history (role/content).
1082
+
1083
+ CHANGE SUMMARY:
1084
+ - Generate session name after first real user message via Bedrock LLM.
1085
+ - Include presigned_url in file ack when a file is uploaded here (from stored metadata; do not regenerate).
1086
+ - Maintain unbounded pipelines_history (latest first) on pipeline create/edit.
1087
+ - Final output after pipeline execution includes actual user-facing result (summary) instead of generic text.
1088
+ - No response field names changed; only fields were added where applicable.
1089
  """
1090
 
1091
  # Support JSON payloads too
 
1111
  file_info = None
1112
  if file is not None:
1113
  s3_uri = upload_stream_to_s3(chat_id, file)
1114
+ # CHANGE: Include presigned fields from saved metadata (no regeneration)
1115
+ meta = (session_manager.get_session(chat_id) or {}).get("file_metadata", {}) or {}
1116
+ file_info = {
1117
+ "bucket": S3_BUCKET,
1118
+ "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1],
1119
+ "s3_uri": s3_uri,
1120
+ "presigned_url": meta.get("presigned_url"), # CHANGE
1121
+ "presigned_expires_at": meta.get("presigned_expires_at") # CHANGE
1122
+ }
1123
  session = _get_session_or_init(chat_id)
1124
 
1125
  # If no message and only a file was sent, respond with an acknowledgement
1126
  if (message is None or str(message).strip() == "") and file_info:
1127
  friendly = "πŸ“ File uploaded successfully. Tell me what you'd like to do with it (e.g., extract text, get tables, summarize)."
1128
+ api_data = {"type": "file_uploaded", "file": file_info, "next_action": "send_instruction"} # CHANGE: file contains presigned
1129
  return _assistant_response_payload(chat_id, friendly, {"intent": "file_uploaded"}, api_data, session.get("state", "initial"))
1130
 
1131
  # If still no message, nudge the user
 
1136
 
1137
  # Add user message
1138
  _add_and_mirror_message(chat_id, "user", message)
1139
+ # CHANGE: maybe generate a session title after first real user message
1140
+ _maybe_generate_chat_name(chat_id)
1141
 
1142
  # Classify intent
1143
  intent_data = intent_classifier.classify_intent(message)
 
1199
  prefer_bedrock=bool(prefer_bedrock),
1200
  )
1201
  session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
1202
+ # CHANGE: include actual user-facing result
1203
+ result_text = _extract_user_facing_text(result)
1204
+ preview = (result_text or json.dumps(result, ensure_ascii=False))[:500]
1205
+ _mark_latest_pipeline_history_status(chat_id, "executed", result_preview=preview) # CHANGE
1206
+ friendly = f"πŸŽ‰ Pipeline completed successfully!\n\n{result_text or 'βœ… All done!'}" # CHANGE
1207
  api_data = {
1208
  "type": "pipeline_completed",
1209
  "result": result,
 
1212
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
1213
  except Exception as e:
1214
  session_manager.update_session(chat_id, {"state": "initial"})
1215
+ _mark_latest_pipeline_history_status(chat_id, "failed", result_preview=str(e)) # CHANGE
1216
  friendly = f"❌ Pipeline execution failed: {str(e)}"
1217
  api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
1218
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
 
1252
  )
1253
 
1254
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
1255
+ _push_pipeline_history(chat_id, pipeline, created_from="request", message=message) # CHANGE
1256
 
1257
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1258
  steps_list = pipeline.get("pipeline_steps", [])
 
1312
  prefer_bedrock=bool(prefer_bedrock)
1313
  )
1314
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
1315
+ _push_pipeline_history(chat_id, new_pipeline, created_from="edit", message=message) # CHANGE
1316
 
1317
  formatted = format_pipeline_for_display(new_pipeline)
1318
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
 
1386
  - Accepts multipart/form-data (file + message) OR JSON.
1387
  - Uploads file if included.
1388
  - On approval, streams execution progress and final result.
1389
+
1390
+ CHANGE SUMMARY:
1391
+ - Generate session name after first real user message via Bedrock LLM.
1392
+ - Include presigned_url in file ack when a file is uploaded here (from stored metadata; do not regenerate).
1393
+ - Maintain pipelines_history on pipeline create/edit, and mark executed/failed after run.
1394
+ - Stream final includes actual user-facing result text.
1395
  """
1396
 
1397
  # Parse JSON if needed
 
1417
  uploaded_file_info = None
1418
  if file is not None:
1419
  s3_uri = upload_stream_to_s3(chat_id, file)
1420
+ meta = (session_manager.get_session(chat_id) or {}).get("file_metadata", {}) or {}
1421
+ uploaded_file_info = {
1422
+ "bucket": S3_BUCKET,
1423
+ "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1],
1424
+ "s3_uri": s3_uri,
1425
+ "presigned_url": meta.get("presigned_url"), # CHANGE
1426
+ "presigned_expires_at": meta.get("presigned_expires_at") # CHANGE
1427
+ }
1428
  session = _get_session_or_init(chat_id)
1429
 
1430
  def emit(obj: Dict[str, Any]) -> bytes:
 
1451
 
1452
  # Add user message
1453
  _add_and_mirror_message(chat_id, "user", message)
1454
+ # CHANGE: Maybe generate chat title
1455
+ _maybe_generate_chat_name(chat_id)
1456
 
1457
  # Classify
1458
  intent_data = intent_classifier.classify_intent(message)
 
1491
  prefer_bedrock=bool(prefer_bedrock),
1492
  )
1493
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
1494
+ _push_pipeline_history(chat_id, pipeline, created_from="request", message=message) # CHANGE
1495
 
1496
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1497
  steps_list = pipeline.get("pipeline_steps", [])
 
1578
  friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
1579
  session_manager.update_session(chat_id, {"state": "initial"})
1580
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
1581
+ _mark_latest_pipeline_history_status(chat_id, "failed", result_preview=str(err)) # CHANGE
1582
  yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1583
  return
1584
 
 
1592
  executor=executor_used
1593
  )
1594
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
1595
+ # CHANGE: include actual user-facing result
1596
+ result_text = _extract_user_facing_text(final_payload)
1597
+ preview = (result_text or json.dumps(final_payload, ensure_ascii=False))[:500]
1598
+ _mark_latest_pipeline_history_status(chat_id, "executed", result_preview=preview) # CHANGE
1599
  friendly_final = (
1600
  f"πŸŽ‰ Pipeline Completed Successfully!\n"
1601
  f"- Pipeline: {plan.get('pipeline_name', 'Document Processing')}\n"
1602
  f"- Total Steps: {len(steps_completed)}\n"
1603
  f"- Successful: {success_count}\n"
1604
+ f"- Executor: {executor_used}\n\n"
1605
+ f"{result_text or ''}"
1606
  )
1607
  _add_and_mirror_message(chat_id, "assistant", friendly_final)
1608
  yield emit({"type": "assistant_final", "content": friendly_final, "result": final_payload, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
 
1618
  friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}"
1619
  session_manager.update_session(chat_id, {"state": "initial"})
1620
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
1621
+ _mark_latest_pipeline_history_status(chat_id, "failed", result_preview=str(e)) # CHANGE
1622
  yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1623
  return
1624
  finally:
 
1644
  prefer_bedrock=bool(prefer_bedrock)
1645
  )
1646
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
1647
+ _push_pipeline_history(chat_id, new_pipeline, created_from="edit", message=message) # CHANGE
1648
  formatted = format_pipeline_for_display(new_pipeline)
1649
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
1650
  _add_and_mirror_message(chat_id, "assistant", friendly)
 
1674
  """
1675
  Kept for compatibility with existing clients (non-stream).
1676
  For a single all-in-one endpoint, use /api/v2/chat/unified.
1677
+
1678
+ CHANGE SUMMARY:
1679
+ - Generate session name after first real user message.
1680
+ - Maintain pipelines_history on pipeline create/edit.
1681
+ - (This legacy endpoint does not execute on approval; unchanged to avoid breaking behavior.)
1682
  """
1683
  chat_id = _ensure_chat(request.chat_id)
1684
  session = _get_session_or_init(chat_id)
 
1688
  session = _get_session_or_init(chat_id)
1689
 
1690
  _add_and_mirror_message(chat_id, "user", request.message)
1691
+ # CHANGE: maybe name the chat
1692
+ _maybe_generate_chat_name(chat_id)
1693
 
1694
  intent_data = intent_classifier.classify_intent(request.message)
1695
  current_state = session.get("state", "initial")
 
1773
  )
1774
 
1775
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
1776
+ _push_pipeline_history(chat_id, pipeline, created_from="request", message=request.message) # CHANGE
1777
 
1778
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1779
  steps_list = pipeline.get("pipeline_steps", [])
 
1832
  prefer_bedrock=request.prefer_bedrock
1833
  )
1834
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
1835
+ _push_pipeline_history(chat_id, new_pipeline, created_from="edit", message=request.message) # CHANGE
1836
 
1837
  formatted = format_pipeline_for_display(new_pipeline)
1838
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
 
1892
  """
1893
  Kept for compatibility (streaming NDJSON).
1894
  For the all-in-one streaming flow, use /api/v2/chat/unified/stream.
1895
+
1896
+ CHANGE SUMMARY:
1897
+ - Generate session name after first real user message.
1898
+ - Maintain pipelines_history on pipeline create/edit.
1899
+ - Mark latest pipelines_history as executed/failed when streaming completes.
1900
+ - Final streamed assistant message includes user-facing result text.
1901
  """
1902
 
1903
  def gen() -> Generator[bytes, None, None]:
 
1909
  session = _get_session_or_init(chat_id)
1910
 
1911
  _add_and_mirror_message(chat_id, "user", request.message)
1912
+ # CHANGE: maybe generate chat name
1913
+ _maybe_generate_chat_name(chat_id)
1914
 
1915
  intent_data = intent_classifier.classify_intent(request.message)
1916
  current_state = session.get("state", "initial")
 
1952
  prefer_bedrock=request.prefer_bedrock
1953
  )
1954
  session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
1955
+ _push_pipeline_history(chat_id, pipeline, created_from="request", message=request.message) # CHANGE
1956
 
1957
  pipeline_name = pipeline.get("pipeline_name", "Document Processing")
1958
  steps_list = pipeline.get("pipeline_steps", [])
 
2039
 
2040
  elif etype == "error":
2041
  err = event.get("error", "Unknown error")
2042
+ friendly_err = f"❌ Pipeline Execution Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
2043
  session_manager.update_session(chat_id, {"state": "initial"})
2044
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
2045
+ _mark_latest_pipeline_history_status(chat_id, "failed", result_preview=str(err)) # CHANGE
2046
  yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
2047
  return
2048
 
 
2056
  executor=executor_used
2057
  )
2058
  success_count = len([s for s in steps_completed if s.get("status") == "completed"])
2059
+ result_text = _extract_user_facing_text(final_payload) # CHANGE
2060
+ preview = (result_text or json.dumps(final_payload, ensure_ascii=False))[:500]
2061
+ _mark_latest_pipeline_history_status(chat_id, "executed", result_preview=preview) # CHANGE
2062
  friendly_final = (
2063
  f"πŸŽ‰ Pipeline Completed Successfully!\n"
2064
  f"- Pipeline: {plan.get('pipeline_name', 'Document Processing')}\n"
2065
  f"- Total Steps: {len(steps_completed)}\n"
2066
  f"- Successful: {success_count}\n"
2067
+ f"- Executor: {executor_used}\n\n"
2068
+ f"{result_text or ''}"
2069
  )
2070
  _add_and_mirror_message(chat_id, "assistant", friendly_final)
2071
  yield emit({"type": "assistant_final", "content": friendly_final, "result": final_payload, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
 
2081
  friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}\n\nCompleted {len(steps_completed)} step(s) before failure."
2082
  session_manager.update_session(chat_id, {"state": "initial"})
2083
  _add_and_mirror_message(chat_id, "assistant", friendly_err)
2084
+ _mark_latest_pipeline_history_status(chat_id, "failed", result_preview=str(e)) # CHANGE
2085
  yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
2086
  return
2087
  finally:
 
2107
  prefer_bedrock=request.prefer_bedrock
2108
  )
2109
  session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
2110
+ _push_pipeline_history(chat_id, new_pipeline, created_from="edit", message=request.message) # CHANGE
2111
  formatted = format_pipeline_for_display(new_pipeline)
2112
  friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
2113
  _add_and_mirror_message(chat_id, "assistant", friendly)
 
2132
 
2133
  @router.post("/chats/{chat_id}/pipeline/execute", response_model=ChatResponse)
2134
  async def execute_pipeline_now(chat_id: str):
2135
+ """
2136
+ CHANGE: After execution, assistant_response includes user-facing result text.
2137
+ Also mark latest pipelines_history entry as executed/failed.
2138
+ """
2139
  session = session_manager.get_session(chat_id)
2140
  if not session:
2141
  raise HTTPException(status_code=404, detail="Chat not found")
 
2155
  prefer_bedrock=True
2156
  )
2157
  session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
2158
+ # CHANGE: include actual result text
2159
+ result_text = _extract_user_facing_text(result)
2160
+ preview = (result_text or json.dumps(result, ensure_ascii=False))[:500]
2161
+ _mark_latest_pipeline_history_status(chat_id, "executed", result_preview=preview) # CHANGE
2162
+ friendly = f"πŸŽ‰ Pipeline completed.\n\n{result_text or ''}" # CHANGE
2163
  api_data = {"type": "pipeline_completed", "result": result, "pipeline": plan}
2164
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
2165
  except Exception as e:
2166
  session_manager.update_session(chat_id, {"state": "initial"})
2167
+ _mark_latest_pipeline_history_status(chat_id, "failed", result_preview=str(e)) # CHANGE
2168
  friendly = f"❌ Pipeline execution failed: {str(e)}"
2169
  api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
2170
  return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
 
2227
 
2228
  @router.get("/chats/{chat_id}")
2229
  def get_chat(chat_id: str):
2230
+ """
2231
+ CHANGE: Add chat_name and pipelines_history to session retrieval (added fields only).
2232
+ """
2233
  s = session_manager.get_session(chat_id)
2234
  if not s:
2235
  raise HTTPException(status_code=404, detail="Chat not found")
 
2239
  "current_file": s.get("current_file"),
2240
  "created_at": s.get("created_at"),
2241
  "updated_at": s.get("updated_at"),
2242
+ "chat_name": s.get("chat_name"), # CHANGE
2243
+ "pipelines_history": s.get("pipelines_history", []) # CHANGE
2244
  }
2245
 
2246
 
 
2255
  def get_chat_history(chat_id: str):
2256
  """
2257
  Get conversation history for a specific chat
2258
+
2259
+ CHANGE: Add chat_name and pipelines_history into the response (added fields only).
 
 
 
 
2260
  """
2261
  s = session_manager.get_session(chat_id)
2262
  if not s:
2263
  raise HTTPException(status_code=404, detail="Chat not found")
2264
+
2265
  # Get the normalized history (system messages filtered out)
2266
  history = [m.dict() for m in _normalize_history_for_api(chat_id)]
2267
+
2268
  # Get file metadata
2269
  file_metadata = s.get("file_metadata", {})
2270
+
2271
  # Format datetime objects for JSON serialization
2272
  created_at = s.get("created_at")
2273
  updated_at = s.get("updated_at")
2274
+
2275
  if isinstance(created_at, datetime):
2276
  created_at = created_at.isoformat()
2277
  if isinstance(updated_at, datetime):
2278
  updated_at = updated_at.isoformat()
2279
+
2280
  # Return with session_id and file metadata
2281
  return {
2282
  "session_id": chat_id,
 
2288
  "updated_at": updated_at,
2289
  "file": file_metadata.get("has_file", False),
2290
  "fileName": file_metadata.get("file_name"),
2291
+ "fileUrl": file_metadata.get("file_url"),
2292
+ "chat_name": s.get("chat_name"), # CHANGE
2293
+ "pipelines_history": s.get("pipelines_history", []) # CHANGE
2294
  }
2295
 
2296
 
 
2315
 
2316
  @router.post("/chats/{chat_id}/upload")
2317
  async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
2318
+ """
2319
+ CHANGE: Include presigned_url and presigned_expires_at in response file object.
2320
+ These are generated once during upload_stream_to_s3 and stored in session; we reuse them here.
2321
+ """
2322
  chat_id = _ensure_chat(chat_id)
2323
  s3_uri = upload_stream_to_s3(chat_id, file)
2324
+ meta = (session_manager.get_session(chat_id) or {}).get("file_metadata", {}) or {} # CHANGE
2325
  return {
2326
  "status": "success",
2327
  "message": "File uploaded to S3",
2328
+ "file": {
2329
+ "bucket": S3_BUCKET,
2330
+ "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1],
2331
+ "s3_uri": s3_uri,
2332
+ "presigned_url": meta.get("presigned_url"), # CHANGE
2333
+ "presigned_expires_at": meta.get("presigned_expires_at") # CHANGE
2334
+ },
2335
  "chat_id": chat_id,
2336
  "next_action": "πŸ’¬ Now tell me what you'd like to do with this document"
2337
  }