redhairedshanks1 commited on
Commit
307eb51
·
1 Parent(s): 3f5051e

Non-streaming (all-in-one), Streaming (live progress)

Browse files
Files changed (1) hide show
  1. api_routes_v2.py +644 -112
api_routes_v2.py CHANGED
@@ -400,7 +400,7 @@
400
 
401
  # api_routes_v2.py
402
 
403
- from fastapi import APIRouter, HTTPException, UploadFile, File
404
  from fastapi.responses import StreamingResponse
405
  from pydantic import BaseModel
406
  from typing import Optional, List, Dict, Any, Generator, Callable, Tuple
@@ -409,7 +409,6 @@ import os
409
  from datetime import datetime
410
  from urllib.parse import urlparse
411
  import tempfile
412
- from pathlib import Path
413
 
414
  # AWS S3 (server-side access, no presigned URLs)
415
  import boto3
@@ -428,10 +427,13 @@ router = APIRouter(prefix="/api/v2", tags=["MasterLLM API V2 - Enhanced"])
428
  # CONFIG: S3
429
  # ========================
430
 
431
- AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
432
- S3_BUCKET = os.getenv("S3_BUCKET_NAME", "your-bucket")
433
  S3_PREFIX = os.getenv("S3_PREFIX", "masterllm")
434
 
 
 
 
435
  s3 = boto3.client("s3", region_name=AWS_REGION)
436
 
437
  # ========================
@@ -468,12 +470,9 @@ def _ensure_chat(chat_id: Optional[str]) -> str:
468
  """
469
  Ensure a chat exists; if not provided or missing, create a new one.
470
  """
471
- if chat_id:
472
- if session_manager.get_session(chat_id):
473
- return chat_id
474
- # Create a brand new chat
475
  new_id = session_manager.create_session()
476
- # Warm/init
477
  session_manager.get_session(new_id)
478
  return new_id
479
 
@@ -509,7 +508,7 @@ def _normalize_history_for_api(chat_id: str) -> List[Message]:
509
  def _assistant_response_payload(
510
  chat_id: str,
511
  friendly_response: str,
512
- intent_data: Dict[str, Any],
513
  api_data: Dict[str, Any],
514
  state: str
515
  ) -> ChatResponse:
@@ -520,7 +519,7 @@ def _assistant_response_payload(
520
  return ChatResponse(
521
  assistant_response=friendly_response,
522
  api_response=api_data,
523
- intent=intent_data,
524
  chat_id=chat_id,
525
  state=state,
526
  history=history
@@ -575,38 +574,649 @@ def download_to_temp_file(file_ref: Optional[str]) -> Tuple[Optional[str], Calla
575
  return file_ref, noop
576
 
577
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
578
  # ========================
579
- # SMART CHAT (non-streaming)
580
  # ========================
581
 
582
  @router.post("/chat", response_model=ChatResponse)
583
  async def smart_chat(request: ChatRequest):
584
  """
585
- Intelligent chat endpoint aligned with app.py:
586
- - Classifies intent (casual_chat, question, unclear, pipeline_request, approval, rejection)
587
- - Casual/chat/question/unclear: friendly reply
588
- - Pipeline requests: generates + proposes plan
589
- - Approvals/rejections when in 'pipeline_proposed' state
590
- Returns:
591
- - assistant_response (LLM-visible reply), plus full history like Gradio UI
592
  """
593
  chat_id = _ensure_chat(request.chat_id)
594
  session = _get_session_or_init(chat_id)
595
 
596
- # Update file if provided (can be local path or s3://)
597
  if request.file_path:
598
  session_manager.update_session(chat_id, {"current_file": request.file_path})
599
  session = _get_session_or_init(chat_id)
600
 
601
- # Add user message
602
  session_manager.add_message(chat_id, "user", request.message)
603
 
604
- # Classify intent
605
  intent_data = intent_classifier.classify_intent(request.message)
606
  current_state = session.get("state", "initial")
607
 
608
  try:
609
- # Casual chat
610
  if intent_data["intent"] == "casual_chat":
611
  friendly = intent_classifier.get_friendly_response("casual_chat", request.message)
612
  api_data = {
@@ -621,13 +1231,11 @@ async def smart_chat(request: ChatRequest):
621
  }
622
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
623
 
624
- # Questions
625
  if intent_data["intent"] == "question":
626
  friendly = intent_classifier.get_friendly_response("question", request.message)
627
  api_data = {"type": "informational_response", "message": friendly, "intent_classification": intent_data}
628
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
629
 
630
- # Unclear
631
  if intent_data["intent"] == "unclear":
632
  friendly = intent_classifier.get_friendly_response("unclear", request.message)
633
  api_data = {
@@ -642,37 +1250,29 @@ async def smart_chat(request: ChatRequest):
642
  }
643
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
644
 
645
- # Approval
646
  if intent_data["intent"] == "approval" and current_state == "pipeline_proposed":
647
  proposed = session.get("proposed_pipeline")
648
  if not proposed:
649
  msg = "No pipeline to approve. Please request a task first."
650
- return _assistant_response_payload(
651
- chat_id, msg, intent_data, {"type": "error", "message": msg}, current_state
652
- )
653
 
654
  session_manager.update_session(chat_id, {"state": "executing"})
655
- friendly = (
656
- f"✅ Great! Executing the pipeline: {proposed.get('pipeline_name')}\n\n"
657
- f"⏳ Processing... (Use the streaming endpoint for real-time updates)"
658
- )
659
  api_data = {
660
  "type": "pipeline_approved",
661
  "message": "Pipeline execution started",
662
  "pipeline": proposed,
663
  "execution_status": "started",
664
- "note": "Use /api/v2/chats/{chat_id}/pipeline/execute/stream for real-time progress"
665
  }
666
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "executing")
667
 
668
- # Rejection
669
  if intent_data["intent"] == "rejection" and current_state == "pipeline_proposed":
670
  session_manager.update_session(chat_id, {"state": "initial", "proposed_pipeline": None})
671
  friendly = "👍 No problem! The pipeline has been cancelled. What else would you like me to help you with?"
672
  api_data = {"type": "pipeline_rejected", "message": "Pipeline cancelled by user", "state_reset": True}
673
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "initial")
674
 
675
- # Pipeline request
676
  if intent_data["intent"] == "pipeline_request" and intent_data.get("requires_pipeline", False):
677
  if not session.get("current_file"):
678
  friendly = (
@@ -687,7 +1287,6 @@ async def smart_chat(request: ChatRequest):
687
  }
688
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
689
 
690
- # Generate pipeline (no need to download file)
691
  try:
692
  pipeline = generate_pipeline(
693
  user_input=request.message,
@@ -743,7 +1342,6 @@ async def smart_chat(request: ChatRequest):
743
  }
744
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
745
 
746
- # Modify when pipeline_proposed and user describes changes
747
  if current_state == "pipeline_proposed":
748
  if len(request.message.strip()) > 5:
749
  try:
@@ -775,7 +1373,6 @@ async def smart_chat(request: ChatRequest):
775
  friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```"
776
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
777
 
778
- # Waiting for confirmation
779
  api_data = {
780
  "type": "waiting_for_confirmation",
781
  "message": "Please type 'approve', 'reject', or describe changes",
@@ -784,7 +1381,6 @@ async def smart_chat(request: ChatRequest):
784
  friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```"
785
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
786
 
787
- # Default nudge
788
  friendly = (
789
  "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\n"
790
  "For example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\n"
@@ -799,7 +1395,6 @@ async def smart_chat(request: ChatRequest):
799
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
800
 
801
  except Exception as e:
802
- # Global error handler
803
  error_msg = f"An unexpected error occurred: {str(e)}"
804
  history = _normalize_history_for_api(chat_id)
805
  return ChatResponse(
@@ -812,34 +1407,23 @@ async def smart_chat(request: ChatRequest):
812
  )
813
 
814
 
815
- # ========================
816
- # STREAMING CHAT (NDJSON)
817
- # ========================
818
-
819
  @router.post("/chat/stream")
820
  def smart_chat_stream(request: ChatRequest):
821
  """
822
- Streaming variant of /chat. Emits NDJSON lines mirroring Gradio-like updates.
823
- Content-Type: application/x-ndjson
824
- Events:
825
- - assistant_final: final assistant message
826
- - assistant_delta: accumulated content (during execution approval path)
827
- - status/info/error: additional signals
828
  """
829
 
830
  def gen() -> Generator[bytes, None, None]:
831
  chat_id = _ensure_chat(request.chat_id)
832
  session = _get_session_or_init(chat_id)
833
 
834
- # Update file if provided
835
  if request.file_path:
836
  session_manager.update_session(chat_id, {"current_file": request.file_path})
837
  session = _get_session_or_init(chat_id)
838
 
839
- # Add user message
840
  session_manager.add_message(chat_id, "user", request.message)
841
 
842
- # Classify
843
  intent_data = intent_classifier.classify_intent(request.message)
844
  current_state = session.get("state", "initial")
845
 
@@ -849,14 +1433,12 @@ def smart_chat_stream(request: ChatRequest):
849
  line = json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n"
850
  return line
851
 
852
- # Casual / question / unclear at initial
853
  if intent_data["intent"] in {"casual_chat", "question", "unclear"} and current_state == "initial":
854
  friendly = intent_classifier.get_friendly_response(intent_data["intent"], request.message)
855
  session_manager.add_message(chat_id, "assistant", friendly)
856
  yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
857
  return
858
 
859
- # Initial: pipeline request or nudge
860
  if current_state == "initial":
861
  if not intent_data.get("requires_pipeline", False):
862
  friendly = (
@@ -874,7 +1456,6 @@ def smart_chat_stream(request: ChatRequest):
874
  yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
875
  return
876
 
877
- # Generate pipeline (no need to download file)
878
  yield emit({"type": "status", "message": "Analyzing request and creating a pipeline..."})
879
  try:
880
  pipeline = generate_pipeline(
@@ -906,7 +1487,6 @@ def smart_chat_stream(request: ChatRequest):
906
  yield emit({"type": "assistant_final", "content": friendly, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
907
  return
908
 
909
- # Pipeline proposed: handle approval, rejection, or edit
910
  if current_state == "pipeline_proposed":
911
  if intent_data["intent"] == "approval":
912
  session_manager.update_session(chat_id, {"state": "executing"})
@@ -922,7 +1502,6 @@ def smart_chat_stream(request: ChatRequest):
922
  executor_used = "unknown"
923
  accumulated = initial
924
 
925
- # Download to temp for execution and ensure cleanup
926
  file_ref = session.get("current_file")
927
  local_path, cleanup = download_to_temp_file(file_ref)
928
 
@@ -977,7 +1556,6 @@ def smart_chat_stream(request: ChatRequest):
977
  yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
978
  return
979
 
980
- # Finalize
981
  if final_payload:
982
  session_manager.update_session(chat_id, {"pipeline_result": final_payload, "state": "initial"})
983
  session_manager.save_pipeline_execution(
@@ -1013,7 +1591,6 @@ def smart_chat_stream(request: ChatRequest):
1013
  yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1014
  return
1015
  finally:
1016
- # Clear the temp file after using it
1017
  try:
1018
  cleanup()
1019
  except Exception:
@@ -1027,7 +1604,6 @@ def smart_chat_stream(request: ChatRequest):
1027
  return
1028
 
1029
  else:
1030
- # Treat as edit/modify
1031
  try:
1032
  original_plan = session.get("proposed_pipeline", {})
1033
  edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {request.message}"
@@ -1048,7 +1624,6 @@ def smart_chat_stream(request: ChatRequest):
1048
  yield emit({"type": "assistant_final", "content": friendly, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1049
  return
1050
 
1051
- # Default
1052
  friendly = "Please upload a document and tell me what you'd like me to do (e.g., extract text, summarize, translate)."
1053
  session_manager.add_message(chat_id, "assistant", friendly)
1054
  yield emit({"type": "assistant_final", "content": friendly, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
@@ -1057,15 +1632,11 @@ def smart_chat_stream(request: ChatRequest):
1057
 
1058
 
1059
  # ========================
1060
- # PIPELINE EXECUTE (non-streaming)
1061
  # ========================
1062
 
1063
  @router.post("/chats/{chat_id}/pipeline/execute", response_model=ChatResponse)
1064
  async def execute_pipeline_now(chat_id: str):
1065
- """
1066
- Execute the currently proposed pipeline and return final result (non-streaming).
1067
- Downloads the file to a temp path for execution and deletes it afterward.
1068
- """
1069
  session = session_manager.get_session(chat_id)
1070
  if not session:
1071
  raise HTTPException(status_code=404, detail="Chat not found")
@@ -1087,14 +1658,13 @@ async def execute_pipeline_now(chat_id: str):
1087
  session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
1088
  friendly = "🎉 Pipeline completed. Ready for your next task!"
1089
  api_data = {"type": "pipeline_completed", "result": result, "pipeline": plan}
1090
- return _assistant_response_payload(chat_id, friendly, intent={"intent": "pipeline_execute"}, api_data=api_data, state="initial")
1091
  except Exception as e:
1092
  session_manager.update_session(chat_id, {"state": "initial"})
1093
  friendly = f"❌ Pipeline execution failed: {str(e)}"
1094
  api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
1095
- return _assistant_response_payload(chat_id, friendly, intent={"intent": "pipeline_execute"}, api_data=api_data, state="initial")
1096
  finally:
1097
- # Clear temp file after use
1098
  try:
1099
  cleanup()
1100
  except Exception:
@@ -1103,10 +1673,6 @@ async def execute_pipeline_now(chat_id: str):
1103
 
1104
  @router.post("/chats/{chat_id}/pipeline/execute/stream")
1105
  def execute_pipeline_stream_endpoint(chat_id: str):
1106
- """
1107
- Stream the execution of the currently proposed pipeline (NDJSON).
1108
- Downloads the file to a temp path for execution and deletes it afterward.
1109
- """
1110
  session = session_manager.get_session(chat_id)
1111
  if not session:
1112
  raise HTTPException(status_code=404, detail="Chat not found")
@@ -1132,18 +1698,14 @@ def execute_pipeline_stream_endpoint(chat_id: str):
1132
  prefer_bedrock=True
1133
  ):
1134
  yield emit(event)
1135
-
1136
- # No final event? Return summary info
1137
  yield emit({"type": "info", "message": "Execution finished."})
1138
  except Exception as e:
1139
  yield emit({"type": "error", "error": str(e)})
1140
  finally:
1141
- # Clear temp file after use
1142
  try:
1143
  cleanup()
1144
  except Exception:
1145
  pass
1146
- # Reset state to initial after stream ends
1147
  session_manager.update_session(chat_id, {"state": "initial"})
1148
 
1149
  return StreamingResponse(gen(), media_type="application/x-ndjson")
@@ -1192,52 +1754,22 @@ def get_chat_stats(chat_id: str):
1192
 
1193
  @router.post("/chats/{chat_id}/messages", response_model=ChatResponse)
1194
  async def send_message_to_chat(chat_id: str, payload: ChatRequest):
1195
- """
1196
- Alias for POST /api/v2/chat, bound to a specific chat_id.
1197
- Returns assistant response plus full history (role/content), same as Gradio.
1198
- """
1199
  payload.chat_id = chat_id
1200
  return await smart_chat(payload)
1201
 
1202
 
1203
  # ========================
1204
- # FILE UPLOAD (to S3, no presigned URLs)
1205
  # ========================
1206
 
1207
  @router.post("/chats/{chat_id}/upload")
1208
  async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
1209
- """
1210
- Streams the uploaded file directly to S3; stores only s3:// URI in chat state.
1211
- """
1212
  chat_id = _ensure_chat(chat_id)
1213
- key = f"{S3_PREFIX}/{chat_id}/{file.filename}"
1214
- config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)
1215
-
1216
- try:
1217
- # Stream from request to S3 (no full in-memory read, no local disk)
1218
- s3.upload_fileobj(
1219
- Fileobj=file.file,
1220
- Bucket=S3_BUCKET,
1221
- Key=key,
1222
- ExtraArgs={"ContentType": file.content_type or "application/octet-stream"},
1223
- Config=config
1224
- )
1225
- except ClientError as e:
1226
- code = e.response.get("Error", {}).get("Code", "Unknown")
1227
- msg = f"S3 upload failed: {code}. Check AWS credentials, permissions (s3:PutObject), region and bucket."
1228
- raise HTTPException(
1229
- status_code=403 if code in ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch") else 500,
1230
- detail=msg
1231
- )
1232
-
1233
- s3_uri = f"s3://{S3_BUCKET}/{key}"
1234
- session_manager.update_session(chat_id, {"current_file": s3_uri, "state": "initial"})
1235
- session_manager.add_message(chat_id, "system", f"File uploaded to S3: {s3_uri}")
1236
-
1237
  return {
1238
  "status": "success",
1239
  "message": "File uploaded to S3",
1240
- "file": {"bucket": S3_BUCKET, "key": key, "s3_uri": s3_uri},
1241
  "chat_id": chat_id,
1242
  "next_action": "💬 Now tell me what you'd like to do with this document"
1243
  }
 
400
 
401
  # api_routes_v2.py
402
 
403
+ from fastapi import APIRouter, HTTPException, UploadFile, File, Request, Form
404
  from fastapi.responses import StreamingResponse
405
  from pydantic import BaseModel
406
  from typing import Optional, List, Dict, Any, Generator, Callable, Tuple
 
409
  from datetime import datetime
410
  from urllib.parse import urlparse
411
  import tempfile
 
412
 
413
  # AWS S3 (server-side access, no presigned URLs)
414
  import boto3
 
427
  # CONFIG: S3
428
  # ========================
429
 
430
+ AWS_REGION = os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION") or "us-east-1"
431
+ S3_BUCKET = os.getenv("S3_BUCKET") or os.getenv("S3_BUCKET_NAME")
432
  S3_PREFIX = os.getenv("S3_PREFIX", "masterllm")
433
 
434
+ if not S3_BUCKET:
435
+ raise RuntimeError("Missing S3 bucket. Set S3_BUCKET (or S3_BUCKET_NAME).")
436
+
437
  s3 = boto3.client("s3", region_name=AWS_REGION)
438
 
439
  # ========================
 
470
  """
471
  Ensure a chat exists; if not provided or missing, create a new one.
472
  """
473
+ if chat_id and session_manager.get_session(chat_id):
474
+ return chat_id
 
 
475
  new_id = session_manager.create_session()
 
476
  session_manager.get_session(new_id)
477
  return new_id
478
 
 
508
  def _assistant_response_payload(
509
  chat_id: str,
510
  friendly_response: str,
511
+ intent: Dict[str, Any],
512
  api_data: Dict[str, Any],
513
  state: str
514
  ) -> ChatResponse:
 
519
  return ChatResponse(
520
  assistant_response=friendly_response,
521
  api_response=api_data,
522
+ intent=intent,
523
  chat_id=chat_id,
524
  state=state,
525
  history=history
 
574
  return file_ref, noop
575
 
576
 
577
def upload_stream_to_s3(chat_id: str, file: UploadFile) -> str:
    """
    Stream an UploadFile directly to S3 and return its s3:// URI.

    The upload goes straight from the request body to S3 (multipart above
    8 MiB, up to 4 concurrent parts) — no full in-memory read, no local disk.
    On success the chat session's ``current_file`` is set to the S3 URI and a
    system message is recorded.

    Optional server-side encryption via env vars:
      - S3_SSE=AES256            -> SSE-S3
      - S3_SSE=KMS (or aws:kms)  -> SSE-KMS, optionally with S3_KMS_KEY_ID

    Args:
        chat_id: Chat/session identifier; becomes part of the object key.
        file: The incoming FastAPI UploadFile.

    Returns:
        The ``s3://bucket/key`` URI of the stored object.

    Raises:
        HTTPException: 403 for credential/permission errors, 500 for other
            S3 client errors.
    """
    # Never trust the client-supplied filename: strip any directory parts
    # (both "/" and "\") so a name like "../other-chat/x" cannot escape the
    # per-chat prefix, and fall back to "upload" if the name is empty/None.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    key = f"{S3_PREFIX}/{chat_id}/{safe_name}"
    config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)

    extra_args = {"ContentType": file.content_type or "application/octet-stream"}
    sse = os.getenv("S3_SSE", "").upper()
    if sse == "AES256":
        extra_args["ServerSideEncryption"] = "AES256"
    elif sse in ("KMS", "AWS:KMS"):  # accept both the shorthand and the header value
        extra_args["ServerSideEncryption"] = "aws:kms"
        kms_key = os.getenv("S3_KMS_KEY_ID")
        if kms_key:
            extra_args["SSEKMSKeyId"] = kms_key

    try:
        s3.upload_fileobj(
            Fileobj=file.file,
            Bucket=S3_BUCKET,
            Key=key,
            ExtraArgs=extra_args,
            Config=config
        )
    except ClientError as e:
        code = e.response.get("Error", {}).get("Code", "Unknown")
        msg = f"S3 upload failed: {code}. Check AWS credentials, permissions (s3:PutObject), region and bucket."
        # Chain the original boto3 error so server logs keep the root cause.
        raise HTTPException(
            status_code=403 if code in ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch") else 500,
            detail=msg
        ) from e

    s3_uri = f"s3://{S3_BUCKET}/{key}"
    session_manager.update_session(chat_id, {"current_file": s3_uri, "state": "initial"})
    session_manager.add_message(chat_id, "system", f"File uploaded to S3: {s3_uri}")
    return s3_uri
615
+
616
+
617
+ # ========================
618
+ # UNIFIED CHAT (non-streaming)
619
+ # ========================
620
+
621
+ @router.post("/chat/unified", response_model=ChatResponse)
622
+ async def chat_unified(
623
+ request: Request,
624
+ chat_id: Optional[str] = Form(None),
625
+ message: Optional[str] = Form(None),
626
+ prefer_bedrock: Optional[bool] = Form(True),
627
+ file: Optional[UploadFile] = File(None),
628
+ ):
629
+ """
630
+ One endpoint that behaves like the Gradio chatbot:
631
+ - Accepts multipart/form-data (file + message) OR application/json.
632
+ - If a file is included, it uploads to S3 and sets current_file.
633
+ - Handles casual chat, pipeline request, approve/reject, and edits.
634
+ - On approval, executes the pipeline (non-stream) and returns the final result.
635
+ Returns assistant_response + full history (role/content).
636
+ """
637
+
638
+ # Support JSON payloads too
639
+ content_type = (request.headers.get("content-type") or "").lower()
640
+ file_path_from_json = None
641
+ if "application/json" in content_type:
642
+ body = await request.json()
643
+ chat_id = body.get("chat_id") or chat_id
644
+ message = body.get("message") if "message" in body else message
645
+ prefer_bedrock = body.get("prefer_bedrock", True) if "prefer_bedrock" in body else prefer_bedrock
646
+ file_path_from_json = body.get("file_path")
647
+
648
+ chat_id = _ensure_chat(chat_id)
649
+ session = _get_session_or_init(chat_id)
650
+
651
+ # If JSON included a file_path (e.g., s3://...), attach it
652
+ if file_path_from_json:
653
+ session_manager.update_session(chat_id, {"current_file": file_path_from_json})
654
+ session_manager.add_message(chat_id, "system", f"File attached: {file_path_from_json}")
655
+ session = _get_session_or_init(chat_id)
656
+
657
+ # If a file is included in the form, upload to S3 and attach it
658
+ file_info = None
659
+ if file is not None:
660
+ s3_uri = upload_stream_to_s3(chat_id, file)
661
+ file_info = {"bucket": S3_BUCKET, "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1], "s3_uri": s3_uri}
662
+ session = _get_session_or_init(chat_id)
663
+
664
+ # If no message and only a file was sent, respond with an acknowledgement
665
+ if (message is None or str(message).strip() == "") and file_info:
666
+ friendly = "📁 File uploaded successfully. Tell me what you'd like to do with it (e.g., extract text, get tables, summarize)."
667
+ api_data = {"type": "file_uploaded", "file": file_info, "next_action": "send_instruction"}
668
+ return _assistant_response_payload(chat_id, friendly, {"intent": "file_uploaded"}, api_data, session.get("state", "initial"))
669
+
670
+ # If still no message, nudge the user
671
+ if message is None or str(message).strip() == "":
672
+ friendly = "Please provide a message (e.g., 'extract text', 'get tables', 'summarize')."
673
+ api_data = {"type": "missing_message"}
674
+ return _assistant_response_payload(chat_id, friendly, {"intent": "missing_message"}, api_data, session.get("state", "initial"))
675
+
676
+ # Add user message
677
+ session_manager.add_message(chat_id, "user", message)
678
+
679
+ # Classify intent
680
+ intent_data = intent_classifier.classify_intent(message)
681
+ current_state = session.get("state", "initial")
682
+
683
+ try:
684
+ # Casual chat
685
+ if intent_data["intent"] == "casual_chat":
686
+ friendly = intent_classifier.get_friendly_response("casual_chat", message)
687
+ api_data = {
688
+ "type": "casual_response",
689
+ "message": friendly,
690
+ "intent_classification": intent_data,
691
+ "suggestions": [
692
+ "Upload a document to get started",
693
+ "Ask 'what can you do?' to see capabilities",
694
+ "Type 'help' for usage instructions"
695
+ ]
696
+ }
697
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
698
+
699
+ # Questions
700
+ if intent_data["intent"] == "question":
701
+ friendly = intent_classifier.get_friendly_response("question", message)
702
+ api_data = {"type": "informational_response", "message": friendly, "intent_classification": intent_data}
703
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
704
+
705
+ # Unclear
706
+ if intent_data["intent"] == "unclear":
707
+ friendly = intent_classifier.get_friendly_response("unclear", message)
708
+ api_data = {
709
+ "type": "clarification_needed",
710
+ "message": friendly,
711
+ "intent_classification": intent_data,
712
+ "suggestions": [
713
+ "Be more specific about what you want to do",
714
+ "Use keywords like: extract, summarize, translate, etc.",
715
+ "Type 'help' for examples"
716
+ ]
717
+ }
718
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
719
+
720
+ # Approval (execute now in unified endpoint)
721
+ if intent_data["intent"] == "approval" and current_state == "pipeline_proposed":
722
+ proposed = session.get("proposed_pipeline")
723
+ if not proposed:
724
+ msg = "No pipeline to approve. Please request a task first."
725
+ return _assistant_response_payload(chat_id, msg, intent_data, {"type": "error", "message": msg}, current_state)
726
+
727
+ file_ref = session.get("current_file")
728
+ local_path, cleanup = download_to_temp_file(file_ref)
729
+ session_manager.update_session(chat_id, {"state": "executing"})
730
+
731
+ try:
732
+ result = execute_pipeline(
733
+ pipeline=proposed,
734
+ file_path=local_path,
735
+ session_id=chat_id,
736
+ prefer_bedrock=bool(prefer_bedrock),
737
+ )
738
+ session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
739
+ friendly = (
740
+ f"🎉 Pipeline completed successfully!\n"
741
+ f"✅ All done! What else would you like me to help you with?"
742
+ )
743
+ api_data = {
744
+ "type": "pipeline_completed",
745
+ "result": result,
746
+ "pipeline": proposed
747
+ }
748
+ return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
749
+ except Exception as e:
750
+ session_manager.update_session(chat_id, {"state": "initial"})
751
+ friendly = f"❌ Pipeline execution failed: {str(e)}"
752
+ api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
753
+ return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
754
+ finally:
755
+ try:
756
+ cleanup()
757
+ except Exception:
758
+ pass
759
+
760
+ # Rejection
761
+ if intent_data["intent"] == "rejection" and current_state == "pipeline_proposed":
762
+ session_manager.update_session(chat_id, {"state": "initial", "proposed_pipeline": None})
763
+ friendly = "👍 No problem! The pipeline has been cancelled. What else would you like me to help you with?"
764
+ api_data = {"type": "pipeline_rejected", "message": "Pipeline cancelled by user", "state_reset": True}
765
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "initial")
766
+
767
+ # Pipeline request
768
+ if intent_data["intent"] == "pipeline_request" and intent_data.get("requires_pipeline", False):
769
+ if not session.get("current_file"):
770
+ friendly = (
771
+ "📁 Please upload a document first before I can process it!\n\n"
772
+ "Once you upload a file, I'll be happy to help you with that task."
773
+ )
774
+ api_data = {
775
+ "type": "error",
776
+ "error_code": "NO_FILE_UPLOADED",
777
+ "message": "Document required before pipeline generation",
778
+ "action_required": "upload_file"
779
+ }
780
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
781
+
782
+ try:
783
+ pipeline = generate_pipeline(
784
+ user_input=message,
785
+ file_path=session.get("current_file"),
786
+ prefer_bedrock=bool(prefer_bedrock),
787
+ )
788
+
789
+ session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})
790
+
791
+ pipeline_name = pipeline.get("pipeline_name", "Document Processing")
792
+ steps_list = pipeline.get("pipeline_steps", [])
793
+ steps_summary = "\n".join([f" {i+1}. {step.get('tool', 'Unknown')}" for i, step in enumerate(steps_list)])
794
+
795
+ friendly = (
796
+ f"🎯 **Pipeline Created: {pipeline_name}**\n"
797
+ f"Here's what I'll do:\n{steps_summary}\n"
798
+ f"**Ready to proceed?**\n"
799
+ f"- Type 'approve' or 'yes' to execute\n"
800
+ f"- Type 'reject' or 'no' to cancel\n"
801
+ f"- Describe changes to modify the plan"
802
+ )
803
+
804
+ api_data = {
805
+ "type": "pipeline_generated",
806
+ "message": "Pipeline successfully created",
807
+ "pipeline": pipeline,
808
+ "pipeline_summary": {
809
+ "name": pipeline_name,
810
+ "total_steps": len(steps_list),
811
+ "steps": steps_list,
812
+ "generator": pipeline.get("_generator"),
813
+ "model": pipeline.get("_model")
814
+ },
815
+ "required_action": "approval",
816
+ "next_steps": {
817
+ "approve": "Type 'approve' or 'yes'",
818
+ "reject": "Type 'reject' or 'no'",
819
+ "modify": "Describe your changes"
820
+ }
821
+ }
822
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
823
+
824
+ except Exception as e:
825
+ friendly = (
826
+ f"❌ Oops! I encountered an error while creating the pipeline:\n\n{str(e)}\n\n"
827
+ "Please try rephrasing your request or type 'help' for examples."
828
+ )
829
+ api_data = {
830
+ "type": "error",
831
+ "error_code": "PIPELINE_GENERATION_FAILED",
832
+ "message": str(e),
833
+ "traceback": str(e),
834
+ }
835
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
836
+
837
+ # Modify when pipeline_proposed and user describes changes
838
+ if current_state == "pipeline_proposed":
839
+ if len(message.strip()) > 5:
840
+ try:
841
+ original_plan = session.get("proposed_pipeline", {})
842
+ edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"
843
+ new_pipeline = generate_pipeline(
844
+ user_input=edit_context,
845
+ file_path=session.get("current_file"),
846
+ prefer_bedrock=bool(prefer_bedrock)
847
+ )
848
+ session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
849
+
850
+ formatted = format_pipeline_for_display(new_pipeline)
851
+ friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
852
+
853
+ api_data = {
854
+ "type": "pipeline_modified",
855
+ "message": "Pipeline updated based on user's edits",
856
+ "pipeline": new_pipeline
857
+ }
858
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
859
+ except Exception as e:
860
+ api_data = {
861
+ "type": "edit_failed",
862
+ "error": str(e),
863
+ "message": "Could not modify the plan",
864
+ "action": "Try 'approve' to run as-is, or 'reject' to start over"
865
+ }
866
+ friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```"
867
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
868
+
869
+ # Waiting for confirmation
870
+ api_data = {
871
+ "type": "waiting_for_confirmation",
872
+ "message": "Please type 'approve', 'reject', or describe changes",
873
+ "hint": "You can also say 'edit' for modification hints"
874
+ }
875
+ friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```"
876
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
877
+
878
+ # Default nudge
879
+ friendly = (
880
+ "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\n"
881
+ "For example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\n"
882
+ "Type 'help' to see all capabilities!"
883
+ )
884
+ api_data = {
885
+ "type": "unclear_intent",
886
+ "message": "Could not determine appropriate action",
887
+ "intent_classification": intent_data,
888
+ "current_state": current_state
889
+ }
890
+ return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
891
+
892
+ except Exception as e:
893
+ error_msg = f"An unexpected error occurred: {str(e)}"
894
+ history = _normalize_history_for_api(chat_id)
895
+ return ChatResponse(
896
+ assistant_response=error_msg,
897
+ api_response={"type": "unexpected_error", "error": str(e)},
898
+ intent=intent_data if isinstance(intent_data, dict) else {"intent": "unknown"},
899
+ chat_id=chat_id,
900
+ state=current_state if isinstance(current_state, str) else "initial",
901
+ history=history
902
+ )
903
+
904
+
905
+ # ========================
906
+ # UNIFIED CHAT (streaming, NDJSON)
907
+ # ========================
908
+
909
@router.post("/chat/unified/stream")
async def chat_unified_stream(
    request: Request,
    chat_id: Optional[str] = Form(None),
    message: Optional[str] = Form(None),
    prefer_bedrock: Optional[bool] = Form(True),
    file: Optional[UploadFile] = File(None),
):
    """
    Unified streaming endpoint (NDJSON), same behavior as Gradio:
      - Accepts multipart/form-data (file + message) OR a JSON body.
      - Uploads the file to S3 if one is included.
      - On approval, streams pipeline execution progress and the final result.

    Each NDJSON line is a JSON object carrying at least ``type``, ``chat_id``
    and ``state``. Terminal events use type ``assistant_final`` and include
    the full serialized chat ``history``.

    FIX: the original version declared this endpoint as a plain ``def`` and
    returned ``stream_wrapper()`` — a coroutine object from an inner
    ``async def`` that FastAPI never awaited (sync endpoints run in a
    threadpool and their return value is serialized as-is). The endpoint is
    now ``async def`` and awaits the preparation step itself.
    """

    async def prepare() -> Tuple[str, Optional[str], Optional[bool], Optional[Dict[str, Any]]]:
        """Resolve inputs from form or JSON, ensure the chat, attach/upload files."""
        content_type = (request.headers.get("content-type") or "").lower()
        file_path_from_json = None
        _chat_id, _message, _prefer_bedrock, _file = chat_id, message, prefer_bedrock, file

        # JSON clients send everything in the body; form values act as fallback.
        if "application/json" in content_type:
            body = await request.json()
            _chat_id = body.get("chat_id") or _chat_id
            _message = body.get("message") if "message" in body else _message
            _prefer_bedrock = body.get("prefer_bedrock", True) if "prefer_bedrock" in body else _prefer_bedrock
            file_path_from_json = body.get("file_path")

        _chat_id = _ensure_chat(_chat_id)
        _get_session_or_init(_chat_id)  # warm/init only; the snapshot itself is unused here

        # JSON clients may reference an already-accessible file path instead of uploading.
        if file_path_from_json:
            session_manager.update_session(_chat_id, {"current_file": file_path_from_json})
            session_manager.add_message(_chat_id, "system", f"File attached: {file_path_from_json}")

        # Multipart clients may include the document itself; push it to S3 server-side.
        uploaded_file_info = None
        if _file is not None:
            s3_uri = upload_stream_to_s3(_chat_id, _file)
            uploaded_file_info = {
                "bucket": S3_BUCKET,
                "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1],
                "s3_uri": s3_uri,
            }

        return _chat_id, _message, _prefer_bedrock, uploaded_file_info

    def make_stream(
        chat_id_local: str,
        msg: Optional[str],
        prefer_bedrock_local: bool,
        uploaded_file_info: Optional[Dict[str, Any]],
    ):
        """Build the NDJSON byte generator for this turn."""

        def emit(obj: Dict[str, Any]) -> bytes:
            # Stamp every event with chat id and current state; tolerate a
            # session that disappeared mid-stream instead of raising.
            obj.setdefault("chat_id", chat_id_local)
            obj.setdefault("state", (session_manager.get_session(chat_id_local) or {}).get("state", "initial"))
            return json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n"

        def gen() -> Generator[bytes, None, None]:
            session = _get_session_or_init(chat_id_local)

            def _hist() -> List[Dict[str, Any]]:
                # Serialized transcript attached to every terminal event.
                return [m.dict() for m in _normalize_history_for_api(chat_id_local)]

            # File-only request: acknowledge the upload and stop.
            if (msg is None or str(msg).strip() == "") and uploaded_file_info:
                friendly = "📁 File uploaded successfully. Tell me what you'd like to do with it (e.g., extract text, get tables, summarize)."
                session_manager.add_message(chat_id_local, "assistant", friendly)
                yield emit({"type": "assistant_final", "content": friendly, "file": uploaded_file_info, "history": _hist()})
                return

            # No message and no file: nudge the caller.
            if msg is None or str(msg).strip() == "":
                friendly = "Please provide a message (e.g., 'extract text', 'get tables', 'summarize')."
                session_manager.add_message(chat_id_local, "assistant", friendly)
                yield emit({"type": "assistant_final", "content": friendly, "history": _hist()})
                return

            # Record the user turn, then classify it.
            session_manager.add_message(chat_id_local, "user", msg)
            intent_data = intent_classifier.classify_intent(msg)
            current_state = session.get("state", "initial")

            # Small talk / questions / unclear input outside of any workflow.
            if intent_data["intent"] in {"casual_chat", "question", "unclear"} and current_state == "initial":
                friendly = intent_classifier.get_friendly_response(intent_data["intent"], msg)
                session_manager.add_message(chat_id_local, "assistant", friendly)
                yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": _hist()})
                return

            if current_state == "initial":
                # Not a pipeline request: explain what this assistant can do.
                if not intent_data.get("requires_pipeline", False):
                    friendly = (
                        "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\n"
                        "For example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\n"
                        "Type 'help' to see all capabilities!"
                    )
                    session_manager.add_message(chat_id_local, "assistant", friendly)
                    yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": _hist()})
                    return

                # A pipeline needs a document attached to the session first.
                if not session.get("current_file"):
                    friendly = "📁 Please upload a document first before I can process it!\n\nClick 'Upload Document' to get started."
                    session_manager.add_message(chat_id_local, "assistant", friendly)
                    yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": _hist()})
                    return

                # Propose a pipeline (no need to download the file for planning).
                yield emit({"type": "status", "message": "Analyzing request and creating a pipeline..."})
                try:
                    pipeline = generate_pipeline(
                        user_input=msg,
                        file_path=session.get("current_file"),
                        prefer_bedrock=bool(prefer_bedrock_local)
                    )
                    session_manager.update_session(chat_id_local, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})

                    pipeline_name = pipeline.get("pipeline_name", "Document Processing")
                    steps_list = pipeline.get("pipeline_steps", [])
                    steps_summary = "\n".join([f" {i+1}. {step.get('tool', 'Unknown')}" for i, step in enumerate(steps_list)])

                    friendly = (
                        f"🎯 **Pipeline Created: {pipeline_name}**\n"
                        f"Here's what I'll do:\n{steps_summary}\n"
                        f"**Ready to proceed?**\n"
                        f"- Type 'approve' or 'yes' to execute\n"
                        f"- Type 'reject' or 'no' to cancel\n"
                        f"- Describe changes to modify the plan"
                    )

                    session_manager.add_message(chat_id_local, "assistant", friendly)
                    yield emit({"type": "assistant_final", "content": friendly, "pipeline": pipeline, "history": _hist()})
                    return
                except Exception as e:
                    friendly = f"❌ Error generating pipeline: {str(e)}"
                    session_manager.add_message(chat_id_local, "assistant", friendly)
                    yield emit({"type": "assistant_final", "content": friendly, "error": str(e), "history": _hist()})
                    return

            # A pipeline has been proposed: handle approval, rejection, or edits.
            if current_state == "pipeline_proposed":
                if intent_data["intent"] == "approval":
                    session_manager.update_session(chat_id_local, {"state": "executing"})
                    plan = session.get("proposed_pipeline", {})
                    initial = (
                        f"✅ Approved! Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\n"
                        f"🚀 Processing, please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
                    )
                    yield emit({"type": "assistant_delta", "content": initial})

                    steps_completed = []
                    final_payload = None
                    executor_used = "unknown"
                    accumulated = initial  # running transcript echoed on each delta

                    # Execution needs a local copy of the document (e.g. from S3).
                    file_ref = session.get("current_file")
                    local_path, cleanup = download_to_temp_file(file_ref)

                    try:
                        for event in execute_pipeline_streaming(
                            pipeline=plan,
                            file_path=local_path,
                            session_id=chat_id_local,
                            prefer_bedrock=bool(prefer_bedrock_local)
                        ):
                            etype = event.get("type")

                            if etype == "info":
                                msg2 = f"ℹ️ {event.get('message')} _(Executor: {event.get('executor', 'unknown')})_"
                                accumulated += "\n\n" + msg2
                                yield emit({"type": "assistant_delta", "content": accumulated})

                            elif etype == "step":
                                step_num = event.get("step", 0)
                                tool_name = event.get("tool", "processing")
                                status = event.get("status", "running")
                                if status == "completed" and "observation" in event:
                                    obs_preview = str(event.get("observation"))[:80]
                                    step_msg = f"✅ Step {step_num}: {tool_name} - Completed!\n Preview: {obs_preview}..."
                                elif status == "executing":
                                    step_msg = f"⏳ Step {step_num}: {tool_name} - Processing..."
                                else:
                                    step_msg = f"📍 Step {step_num}: {tool_name}"

                                steps_completed.append({
                                    "step": step_num,
                                    "tool": tool_name,
                                    "status": status,
                                    "executor": event.get("executor", "unknown"),
                                    "observation": event.get("observation"),
                                    "input": event.get("input"),
                                })
                                executor_used = event.get("executor", executor_used)

                                accumulated += "\n\n" + step_msg
                                yield emit({"type": "assistant_delta", "content": accumulated})

                            elif etype == "final":
                                final_payload = event.get("data")
                                executor_used = event.get("executor", executor_used)

                            elif etype == "error":
                                err = event.get("error", "Unknown error")
                                friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
                                session_manager.update_session(chat_id_local, {"state": "initial"})
                                session_manager.add_message(chat_id_local, "assistant", friendly_err)
                                yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": _hist()})
                                return

                        # Stream drained: persist and report the outcome.
                        if final_payload:
                            session_manager.update_session(chat_id_local, {"pipeline_result": final_payload, "state": "initial"})
                            session_manager.save_pipeline_execution(
                                session_id=chat_id_local,
                                pipeline=plan,
                                result=final_payload,
                                file_path=file_ref,
                                executor=executor_used
                            )
                            success_count = len([s for s in steps_completed if s.get("status") == "completed"])
                            friendly_final = (
                                f"🎉 Pipeline Completed Successfully!\n"
                                f"- Pipeline: {plan.get('pipeline_name', 'Document Processing')}\n"
                                f"- Total Steps: {len(steps_completed)}\n"
                                f"- Successful: {success_count}\n"
                                f"- Executor: {executor_used}\n"
                                f"✅ All done! What else would you like me to help you with?"
                            )
                            session_manager.add_message(chat_id_local, "assistant", friendly_final)
                            yield emit({"type": "assistant_final", "content": friendly_final, "result": final_payload, "history": _hist()})
                            return
                        else:
                            done = f"✅ Pipeline Completed! Executed {len(steps_completed)} steps using {executor_used}."
                            session_manager.update_session(chat_id_local, {"state": "initial"})
                            session_manager.add_message(chat_id_local, "assistant", done)
                            yield emit({"type": "assistant_final", "content": done, "history": _hist()})
                            return

                    except Exception as e:
                        friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}\n\nCompleted {len(steps_completed)} step(s) before failure."
                        session_manager.update_session(chat_id_local, {"state": "initial"})
                        session_manager.add_message(chat_id_local, "assistant", friendly_err)
                        yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": _hist()})
                        return
                    finally:
                        # Always remove the temporary local copy of the document.
                        try:
                            cleanup()
                        except Exception:
                            pass

                elif intent_data["intent"] == "rejection":
                    session_manager.update_session(chat_id_local, {"state": "initial", "proposed_pipeline": None})
                    friendly = "👍 No problem! Pipeline cancelled. What else would you like me to help you with?"
                    session_manager.add_message(chat_id_local, "assistant", friendly)
                    yield emit({"type": "assistant_final", "content": friendly, "history": _hist()})
                    return

                else:
                    # Any other input while a plan is pending is treated as an edit request.
                    try:
                        original_plan = session.get("proposed_pipeline", {})
                        edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {msg}"
                        new_pipeline = generate_pipeline(
                            user_input=edit_context,
                            file_path=session.get("current_file"),
                            prefer_bedrock=bool(prefer_bedrock_local)
                        )
                        session_manager.update_session(chat_id_local, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})
                        formatted = format_pipeline_for_display(new_pipeline)
                        friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
                        session_manager.add_message(chat_id_local, "assistant", friendly)
                        yield emit({"type": "assistant_final", "content": friendly, "pipeline": new_pipeline, "history": _hist()})
                        return
                    except Exception as e:
                        friendly = f"❌ Edit failed: {str(e)}"
                        session_manager.add_message(chat_id_local, "assistant", friendly)
                        yield emit({"type": "assistant_final", "content": friendly, "error": str(e), "history": _hist()})
                        return

            # Fallback for any unhandled state.
            friendly = "Please upload a document and tell me what you'd like me to do (e.g., extract text, summarize, translate)."
            session_manager.add_message(chat_id_local, "assistant", friendly)
            yield emit({"type": "assistant_final", "content": friendly, "history": _hist()})

        return gen()

    # BUG FIX: previously a sync `def` endpoint returned the un-awaited
    # coroutine of an inner async wrapper; await the preparation here and
    # hand FastAPI a real StreamingResponse.
    chat_id_local, msg_local, prefer_local, uploaded_info = await prepare()
    return StreamingResponse(
        make_stream(chat_id_local, msg_local, bool(prefer_local), uploaded_info),
        media_type="application/x-ndjson",
    )
1195
+
1196
+
1197
  # ========================
1198
+ # LEGACY SMART CHAT (kept for compatibility)
1199
  # ========================
1200
 
1201
  @router.post("/chat", response_model=ChatResponse)
1202
  async def smart_chat(request: ChatRequest):
1203
  """
1204
+ Kept for compatibility with existing clients (non-stream).
1205
+ For a single all-in-one endpoint, use /api/v2/chat/unified.
 
 
 
 
 
1206
  """
1207
  chat_id = _ensure_chat(request.chat_id)
1208
  session = _get_session_or_init(chat_id)
1209
 
 
1210
  if request.file_path:
1211
  session_manager.update_session(chat_id, {"current_file": request.file_path})
1212
  session = _get_session_or_init(chat_id)
1213
 
 
1214
  session_manager.add_message(chat_id, "user", request.message)
1215
 
 
1216
  intent_data = intent_classifier.classify_intent(request.message)
1217
  current_state = session.get("state", "initial")
1218
 
1219
  try:
 
1220
  if intent_data["intent"] == "casual_chat":
1221
  friendly = intent_classifier.get_friendly_response("casual_chat", request.message)
1222
  api_data = {
 
1231
  }
1232
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
1233
 
 
1234
  if intent_data["intent"] == "question":
1235
  friendly = intent_classifier.get_friendly_response("question", request.message)
1236
  api_data = {"type": "informational_response", "message": friendly, "intent_classification": intent_data}
1237
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
1238
 
 
1239
  if intent_data["intent"] == "unclear":
1240
  friendly = intent_classifier.get_friendly_response("unclear", request.message)
1241
  api_data = {
 
1250
  }
1251
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
1252
 
 
1253
  if intent_data["intent"] == "approval" and current_state == "pipeline_proposed":
1254
  proposed = session.get("proposed_pipeline")
1255
  if not proposed:
1256
  msg = "No pipeline to approve. Please request a task first."
1257
+ return _assistant_response_payload(chat_id, msg, intent_data, {"type": "error", "message": msg}, current_state)
 
 
1258
 
1259
  session_manager.update_session(chat_id, {"state": "executing"})
1260
+ friendly = f"✅ Great! Executing the pipeline: {proposed.get('pipeline_name')}\n\n⏳ Processing... (Use the streaming endpoint for real-time updates)"
 
 
 
1261
  api_data = {
1262
  "type": "pipeline_approved",
1263
  "message": "Pipeline execution started",
1264
  "pipeline": proposed,
1265
  "execution_status": "started",
1266
+ "note": "Use /api/v2/chat/unified/stream for real-time progress"
1267
  }
1268
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "executing")
1269
 
 
1270
  if intent_data["intent"] == "rejection" and current_state == "pipeline_proposed":
1271
  session_manager.update_session(chat_id, {"state": "initial", "proposed_pipeline": None})
1272
  friendly = "👍 No problem! The pipeline has been cancelled. What else would you like me to help you with?"
1273
  api_data = {"type": "pipeline_rejected", "message": "Pipeline cancelled by user", "state_reset": True}
1274
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "initial")
1275
 
 
1276
  if intent_data["intent"] == "pipeline_request" and intent_data.get("requires_pipeline", False):
1277
  if not session.get("current_file"):
1278
  friendly = (
 
1287
  }
1288
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
1289
 
 
1290
  try:
1291
  pipeline = generate_pipeline(
1292
  user_input=request.message,
 
1342
  }
1343
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
1344
 
 
1345
  if current_state == "pipeline_proposed":
1346
  if len(request.message.strip()) > 5:
1347
  try:
 
1373
  friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```"
1374
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
1375
 
 
1376
  api_data = {
1377
  "type": "waiting_for_confirmation",
1378
  "message": "Please type 'approve', 'reject', or describe changes",
 
1381
  friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```"
1382
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, "pipeline_proposed")
1383
 
 
1384
  friendly = (
1385
  "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\n"
1386
  "For example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\n"
 
1395
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
1396
 
1397
  except Exception as e:
 
1398
  error_msg = f"An unexpected error occurred: {str(e)}"
1399
  history = _normalize_history_for_api(chat_id)
1400
  return ChatResponse(
 
1407
  )
1408
 
1409
 
 
 
 
 
1410
  @router.post("/chat/stream")
1411
  def smart_chat_stream(request: ChatRequest):
1412
  """
1413
+ Kept for compatibility (streaming NDJSON).
1414
+ For the all-in-one streaming flow, use /api/v2/chat/unified/stream.
 
 
 
 
1415
  """
1416
 
1417
  def gen() -> Generator[bytes, None, None]:
1418
  chat_id = _ensure_chat(request.chat_id)
1419
  session = _get_session_or_init(chat_id)
1420
 
 
1421
  if request.file_path:
1422
  session_manager.update_session(chat_id, {"current_file": request.file_path})
1423
  session = _get_session_or_init(chat_id)
1424
 
 
1425
  session_manager.add_message(chat_id, "user", request.message)
1426
 
 
1427
  intent_data = intent_classifier.classify_intent(request.message)
1428
  current_state = session.get("state", "initial")
1429
 
 
1433
  line = json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n"
1434
  return line
1435
 
 
1436
  if intent_data["intent"] in {"casual_chat", "question", "unclear"} and current_state == "initial":
1437
  friendly = intent_classifier.get_friendly_response(intent_data["intent"], request.message)
1438
  session_manager.add_message(chat_id, "assistant", friendly)
1439
  yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1440
  return
1441
 
 
1442
  if current_state == "initial":
1443
  if not intent_data.get("requires_pipeline", False):
1444
  friendly = (
 
1456
  yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1457
  return
1458
 
 
1459
  yield emit({"type": "status", "message": "Analyzing request and creating a pipeline..."})
1460
  try:
1461
  pipeline = generate_pipeline(
 
1487
  yield emit({"type": "assistant_final", "content": friendly, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1488
  return
1489
 
 
1490
  if current_state == "pipeline_proposed":
1491
  if intent_data["intent"] == "approval":
1492
  session_manager.update_session(chat_id, {"state": "executing"})
 
1502
  executor_used = "unknown"
1503
  accumulated = initial
1504
 
 
1505
  file_ref = session.get("current_file")
1506
  local_path, cleanup = download_to_temp_file(file_ref)
1507
 
 
1556
  yield emit({"type": "assistant_final", "content": friendly_err, "error": err, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1557
  return
1558
 
 
1559
  if final_payload:
1560
  session_manager.update_session(chat_id, {"pipeline_result": final_payload, "state": "initial"})
1561
  session_manager.save_pipeline_execution(
 
1591
  yield emit({"type": "assistant_final", "content": friendly_err, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1592
  return
1593
  finally:
 
1594
  try:
1595
  cleanup()
1596
  except Exception:
 
1604
  return
1605
 
1606
  else:
 
1607
  try:
1608
  original_plan = session.get("proposed_pipeline", {})
1609
  edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {request.message}"
 
1624
  yield emit({"type": "assistant_final", "content": friendly, "error": str(e), "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
1625
  return
1626
 
 
1627
  friendly = "Please upload a document and tell me what you'd like me to do (e.g., extract text, summarize, translate)."
1628
  session_manager.add_message(chat_id, "assistant", friendly)
1629
  yield emit({"type": "assistant_final", "content": friendly, "history": [m.dict() for m in _normalize_history_for_api(chat_id)]})
 
1632
 
1633
 
1634
  # ========================
1635
+ # PIPELINE EXECUTION ENDPOINTS (keep for compatibility)
1636
  # ========================
1637
 
1638
  @router.post("/chats/{chat_id}/pipeline/execute", response_model=ChatResponse)
1639
  async def execute_pipeline_now(chat_id: str):
 
 
 
 
1640
  session = session_manager.get_session(chat_id)
1641
  if not session:
1642
  raise HTTPException(status_code=404, detail="Chat not found")
 
1658
  session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"})
1659
  friendly = "🎉 Pipeline completed. Ready for your next task!"
1660
  api_data = {"type": "pipeline_completed", "result": result, "pipeline": plan}
1661
+ return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
1662
  except Exception as e:
1663
  session_manager.update_session(chat_id, {"state": "initial"})
1664
  friendly = f"❌ Pipeline execution failed: {str(e)}"
1665
  api_data = {"type": "error", "error_code": "PIPELINE_EXECUTION_FAILED", "message": str(e)}
1666
+ return _assistant_response_payload(chat_id, friendly, {"intent": "pipeline_execute"}, api_data, "initial")
1667
  finally:
 
1668
  try:
1669
  cleanup()
1670
  except Exception:
 
1673
 
1674
  @router.post("/chats/{chat_id}/pipeline/execute/stream")
1675
  def execute_pipeline_stream_endpoint(chat_id: str):
 
 
 
 
1676
  session = session_manager.get_session(chat_id)
1677
  if not session:
1678
  raise HTTPException(status_code=404, detail="Chat not found")
 
1698
  prefer_bedrock=True
1699
  ):
1700
  yield emit(event)
 
 
1701
  yield emit({"type": "info", "message": "Execution finished."})
1702
  except Exception as e:
1703
  yield emit({"type": "error", "error": str(e)})
1704
  finally:
 
1705
  try:
1706
  cleanup()
1707
  except Exception:
1708
  pass
 
1709
  session_manager.update_session(chat_id, {"state": "initial"})
1710
 
1711
  return StreamingResponse(gen(), media_type="application/x-ndjson")
 
1754
 
1755
@router.post("/chats/{chat_id}/messages", response_model=ChatResponse)
async def send_message_to_chat(chat_id: str, payload: ChatRequest):
    """
    Post a message to a specific chat (REST-style alias).

    Overrides any chat_id carried in the request body with the path
    parameter, then delegates to the legacy non-streaming smart_chat
    handler, returning its ChatResponse unchanged.
    """
    payload.chat_id = chat_id
    return await smart_chat(payload)
1759
 
1760
 
1761
  # ========================
1762
+ # FILE UPLOAD (to S3, no presigned URLs) — still available
1763
  # ========================
1764
 
1765
@router.post("/chats/{chat_id}/upload")
async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
    """
    Upload a document into a chat's workspace.

    Ensures the chat exists (creating one if needed), streams the file to S3
    using server-side credentials (no presigned URLs), and returns the S3
    location plus a hint about the next step.
    """
    chat_id = _ensure_chat(chat_id)
    s3_uri = upload_stream_to_s3(chat_id, file)

    # Derive the object key from the URI returned by the uploader.
    s3_key = s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1]

    return {
        "status": "success",
        "message": "File uploaded to S3",
        "file": {"bucket": S3_BUCKET, "key": s3_key, "s3_uri": s3_uri},
        "chat_id": chat_id,
        "next_action": "💬 Now tell me what you'd like to do with this document"
    }