redhairedshanks1 commited on
Commit
6f4a1bb
·
1 Parent(s): 6e1f348

Update api_routes_v2.py

Browse files
Files changed (1) hide show
  1. api_routes_v2.py +65 -28
api_routes_v2.py CHANGED
@@ -403,13 +403,13 @@
403
  from fastapi import APIRouter, HTTPException, UploadFile, File
404
  from fastapi.responses import StreamingResponse
405
  from pydantic import BaseModel
406
- from typing import Optional, List, Dict, Any, Generator
407
  import json
408
  import os
409
  from datetime import datetime
410
  from urllib.parse import urlparse
411
- from pathlib import Path
412
  import tempfile
 
413
 
414
  # AWS S3 (server-side access, no presigned URLs)
415
  import boto3
@@ -465,12 +465,34 @@ class ChatResponse(BaseModel):
465
  # ========================
466
 
467
  def _ensure_chat(chat_id: Optional[str]) -> str:
468
- """Create a new chat if none provided."""
469
- return chat_id or session_manager.create_session()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
470
 
471
 
472
  def _normalize_history_for_api(chat_id: str) -> List[Message]:
473
- """Return history in Gradio-like format: [{role, content, timestamp}]"""
 
 
474
  session = session_manager.get_session(chat_id) or {}
475
  raw_messages = session.get("messages") or []
476
  history: List[Message] = []
@@ -505,7 +527,7 @@ def _assistant_response_payload(
505
  )
506
 
507
 
508
- def parse_s3_uri(uri: str):
509
  """
510
  Parse s3://bucket/key to (bucket, key).
511
  """
@@ -515,9 +537,9 @@ def parse_s3_uri(uri: str):
515
  return p.netloc, p.path.lstrip("/")
516
 
517
 
518
- def download_to_temp_file(file_ref: Optional[str]) -> (Optional[str], callable):
519
  """
520
- If file_ref is an S3 URI, download to a temporary file and return path + cleanup callback.
521
  If local path or None, return as-is and a no-op cleanup.
522
  """
523
  def noop():
@@ -526,7 +548,7 @@ def download_to_temp_file(file_ref: Optional[str]) -> (Optional[str], callable):
526
  if not file_ref:
527
  return None, noop
528
 
529
- if file_ref.startswith("s3://"):
530
  bucket, key = parse_s3_uri(file_ref)
531
  suffix = os.path.splitext(key)[1] or ""
532
  fd, temp_path = tempfile.mkstemp(prefix="masterllm_", suffix=suffix)
@@ -569,12 +591,12 @@ async def smart_chat(request: ChatRequest):
569
  - assistant_response (LLM-visible reply), plus full history like Gradio UI
570
  """
571
  chat_id = _ensure_chat(request.chat_id)
572
- session = session_manager.get_session(chat_id) or {}
573
 
574
  # Update file if provided (can be local path or s3://)
575
  if request.file_path:
576
  session_manager.update_session(chat_id, {"current_file": request.file_path})
577
- session = session_manager.get_session(chat_id)
578
 
579
  # Add user message
580
  session_manager.add_message(chat_id, "user", request.message)
@@ -665,7 +687,7 @@ async def smart_chat(request: ChatRequest):
665
  }
666
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
667
 
668
- # Generate pipeline (pass the stored file ref; generator usually doesn't need bytes)
669
  try:
670
  pipeline = generate_pipeline(
671
  user_input=request.message,
@@ -807,12 +829,12 @@ def smart_chat_stream(request: ChatRequest):
807
 
808
  def gen() -> Generator[bytes, None, None]:
809
  chat_id = _ensure_chat(request.chat_id)
810
- session = session_manager.get_session(chat_id) or {}
811
 
812
  # Update file if provided
813
  if request.file_path:
814
  session_manager.update_session(chat_id, {"current_file": request.file_path})
815
- session = session_manager.get_session(chat_id)
816
 
817
  # Add user message
818
  session_manager.add_message(chat_id, "user", request.message)
@@ -824,7 +846,6 @@ def smart_chat_stream(request: ChatRequest):
824
  def emit(obj: Dict[str, Any]):
825
  obj.setdefault("chat_id", chat_id)
826
  obj.setdefault("state", session_manager.get_session(chat_id).get("state", current_state))
827
- # Add history only on finals to keep payloads small
828
  line = json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n"
829
  return line
830
 
@@ -1045,7 +1066,9 @@ async def execute_pipeline_now(chat_id: str):
1045
  Execute the currently proposed pipeline and return final result (non-streaming).
1046
  Downloads the file to a temp path for execution and deletes it afterward.
1047
  """
1048
- session = session_manager.get_session(chat_id) or {}
 
 
1049
  plan = session.get("proposed_pipeline")
1050
  if not plan:
1051
  raise HTTPException(status_code=400, detail="No pipeline proposed for this chat.")
@@ -1084,7 +1107,9 @@ def execute_pipeline_stream_endpoint(chat_id: str):
1084
  Stream the execution of the currently proposed pipeline (NDJSON).
1085
  Downloads the file to a temp path for execution and deletes it afterward.
1086
  """
1087
- session = session_manager.get_session(chat_id) or {}
 
 
1088
  plan = session.get("proposed_pipeline")
1089
  if not plan:
1090
  raise HTTPException(status_code=400, detail="No pipeline proposed for this chat.")
@@ -1188,14 +1213,22 @@ async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
1188
  key = f"{S3_PREFIX}/{chat_id}/{file.filename}"
1189
  config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)
1190
 
1191
- # Stream from request to S3 (no full in-memory read, no local disk)
1192
- s3.upload_fileobj(
1193
- Fileobj=file.file,
1194
- Bucket=S3_BUCKET,
1195
- Key=key,
1196
- ExtraArgs={"ContentType": file.content_type or "application/octet-stream"},
1197
- Config=config
1198
- )
 
 
 
 
 
 
 
 
1199
 
1200
  s3_uri = f"s3://{S3_BUCKET}/{key}"
1201
  session_manager.update_session(chat_id, {"current_file": s3_uri, "state": "initial"})
@@ -1214,10 +1247,14 @@ async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
1214
  @router.get("/chats/{chat_id}/file")
1215
  def download_chat_file(chat_id: str):
1216
  s = session_manager.get_session(chat_id)
1217
- if not s or not s.get("current_file", "").startswith("s3://"):
 
 
 
 
1218
  raise HTTPException(status_code=404, detail="No S3 file attached to this chat")
1219
 
1220
- bucket, key = parse_s3_uri(s["current_file"])
1221
  try:
1222
  obj = s3.get_object(Bucket=bucket, Key=key)
1223
  except ClientError as e:
@@ -1241,4 +1278,4 @@ def download_chat_file(chat_id: str):
1241
 
1242
  @router.get("/health")
1243
  def health_check():
1244
- return {"status": "ok", "service": "MasterLLM v2.0", "time": datetime.utcnow().isoformat() + "Z"}
 
403
  from fastapi import APIRouter, HTTPException, UploadFile, File
404
  from fastapi.responses import StreamingResponse
405
  from pydantic import BaseModel
406
+ from typing import Optional, List, Dict, Any, Generator, Callable, Tuple
407
  import json
408
  import os
409
  from datetime import datetime
410
  from urllib.parse import urlparse
 
411
  import tempfile
412
+ from pathlib import Path
413
 
414
  # AWS S3 (server-side access, no presigned URLs)
415
  import boto3
 
465
  # ========================
466
 
467
  def _ensure_chat(chat_id: Optional[str]) -> str:
468
+ """
469
+ Ensure a chat exists; if not provided or missing, create a new one.
470
+ """
471
+ if chat_id:
472
+ if session_manager.get_session(chat_id):
473
+ return chat_id
474
+ # Create a brand new chat
475
+ new_id = session_manager.create_session()
476
+ # Warm/init
477
+ session_manager.get_session(new_id)
478
+ return new_id
479
+
480
+
481
+ def _get_session_or_init(chat_id: str) -> Dict[str, Any]:
482
+ """
483
+ Always return a session dict; initialize if missing.
484
+ """
485
+ session = session_manager.get_session(chat_id)
486
+ if not session:
487
+ session_manager.update_session(chat_id, {"state": "initial", "messages": []})
488
+ session = session_manager.get_session(chat_id) or {"state": "initial", "messages": []}
489
+ return session
490
 
491
 
492
  def _normalize_history_for_api(chat_id: str) -> List[Message]:
493
+ """
494
+ Return history in Gradio-like format: [{role, content, timestamp}]
495
+ """
496
  session = session_manager.get_session(chat_id) or {}
497
  raw_messages = session.get("messages") or []
498
  history: List[Message] = []
 
527
  )
528
 
529
 
530
+ def parse_s3_uri(uri: str) -> Tuple[str, str]:
531
  """
532
  Parse s3://bucket/key to (bucket, key).
533
  """
 
537
  return p.netloc, p.path.lstrip("/")
538
 
539
 
540
+ def download_to_temp_file(file_ref: Optional[str]) -> Tuple[Optional[str], Callable[[], None]]:
541
  """
542
+ If file_ref is an S3 URI, download to a temporary file and return (path, cleanup).
543
  If local path or None, return as-is and a no-op cleanup.
544
  """
545
  def noop():
 
548
  if not file_ref:
549
  return None, noop
550
 
551
+ if isinstance(file_ref, str) and file_ref.startswith("s3://"):
552
  bucket, key = parse_s3_uri(file_ref)
553
  suffix = os.path.splitext(key)[1] or ""
554
  fd, temp_path = tempfile.mkstemp(prefix="masterllm_", suffix=suffix)
 
591
  - assistant_response (LLM-visible reply), plus full history like Gradio UI
592
  """
593
  chat_id = _ensure_chat(request.chat_id)
594
+ session = _get_session_or_init(chat_id)
595
 
596
  # Update file if provided (can be local path or s3://)
597
  if request.file_path:
598
  session_manager.update_session(chat_id, {"current_file": request.file_path})
599
+ session = _get_session_or_init(chat_id)
600
 
601
  # Add user message
602
  session_manager.add_message(chat_id, "user", request.message)
 
687
  }
688
  return _assistant_response_payload(chat_id, friendly, intent_data, api_data, current_state)
689
 
690
+ # Generate pipeline (no need to download file)
691
  try:
692
  pipeline = generate_pipeline(
693
  user_input=request.message,
 
829
 
830
  def gen() -> Generator[bytes, None, None]:
831
  chat_id = _ensure_chat(request.chat_id)
832
+ session = _get_session_or_init(chat_id)
833
 
834
  # Update file if provided
835
  if request.file_path:
836
  session_manager.update_session(chat_id, {"current_file": request.file_path})
837
+ session = _get_session_or_init(chat_id)
838
 
839
  # Add user message
840
  session_manager.add_message(chat_id, "user", request.message)
 
846
  def emit(obj: Dict[str, Any]):
847
  obj.setdefault("chat_id", chat_id)
848
  obj.setdefault("state", session_manager.get_session(chat_id).get("state", current_state))
 
849
  line = json.dumps(obj, ensure_ascii=False).encode("utf-8") + b"\n"
850
  return line
851
 
 
1066
  Execute the currently proposed pipeline and return final result (non-streaming).
1067
  Downloads the file to a temp path for execution and deletes it afterward.
1068
  """
1069
+ session = session_manager.get_session(chat_id)
1070
+ if not session:
1071
+ raise HTTPException(status_code=404, detail="Chat not found")
1072
  plan = session.get("proposed_pipeline")
1073
  if not plan:
1074
  raise HTTPException(status_code=400, detail="No pipeline proposed for this chat.")
 
1107
  Stream the execution of the currently proposed pipeline (NDJSON).
1108
  Downloads the file to a temp path for execution and deletes it afterward.
1109
  """
1110
+ session = session_manager.get_session(chat_id)
1111
+ if not session:
1112
+ raise HTTPException(status_code=404, detail="Chat not found")
1113
  plan = session.get("proposed_pipeline")
1114
  if not plan:
1115
  raise HTTPException(status_code=400, detail="No pipeline proposed for this chat.")
 
1213
  key = f"{S3_PREFIX}/{chat_id}/{file.filename}"
1214
  config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)
1215
 
1216
+ try:
1217
+ # Stream from request to S3 (no full in-memory read, no local disk)
1218
+ s3.upload_fileobj(
1219
+ Fileobj=file.file,
1220
+ Bucket=S3_BUCKET,
1221
+ Key=key,
1222
+ ExtraArgs={"ContentType": file.content_type or "application/octet-stream"},
1223
+ Config=config
1224
+ )
1225
+ except ClientError as e:
1226
+ code = e.response.get("Error", {}).get("Code", "Unknown")
1227
+ msg = f"S3 upload failed: {code}. Check AWS credentials, permissions (s3:PutObject), region and bucket."
1228
+ raise HTTPException(
1229
+ status_code=403 if code in ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch") else 500,
1230
+ detail=msg
1231
+ )
1232
 
1233
  s3_uri = f"s3://{S3_BUCKET}/{key}"
1234
  session_manager.update_session(chat_id, {"current_file": s3_uri, "state": "initial"})
 
1247
  @router.get("/chats/{chat_id}/file")
1248
  def download_chat_file(chat_id: str):
1249
  s = session_manager.get_session(chat_id)
1250
+ if not s:
1251
+ raise HTTPException(status_code=404, detail="Chat not found")
1252
+
1253
+ file_ref = s.get("current_file")
1254
+ if not file_ref or not isinstance(file_ref, str) or not file_ref.startswith("s3://"):
1255
  raise HTTPException(status_code=404, detail="No S3 file attached to this chat")
1256
 
1257
+ bucket, key = parse_s3_uri(file_ref)
1258
  try:
1259
  obj = s3.get_object(Bucket=bucket, Key=key)
1260
  except ClientError as e:
 
1278
 
1279
  @router.get("/health")
1280
  def health_check():
1281
+ return {"status": "ok", "service": "MasterLLM v2.0", "time": datetime.utcnow().isoformat() + "Z"}