"""FastAPI service exposing text and table extraction for uploaded documents.

Each upload is written to a temporary file, copied to S3, run through the
requested extractor, and the temporary file is removed afterwards.
"""

import os
import shutil
import tempfile
import traceback
from typing import Any, Dict, List, TypedDict

from fastapi import Body, FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# === LangGraph ===
from langgraph.graph import StateGraph
# from langgraph.checkpoint import MemorySaver

# === Service Imports ===
from services.extract_text import extract_text_from_file, extract_images_with_fitz
from services.extract_table import extract_tables_from_file
# from services.vector_store import get_entry, upsert_entry  # ❌ Disabled cache
from services.s3_utils import upload_to_s3

# NOTE: An earlier revision of this module cached results per ``filename``
# through services.vector_store (``get_entry`` / ``upsert_entry``) and returned
# the cached "text" / "tables" entry when one was present. That path is
# disabled; the commented-out copy of the old module was removed from this
# file and remains available in version control.

# === FastAPI Init ===
api = FastAPI()


# === Root Health Check ===
@api.get("/")
async def root():
    """Return a static payload signalling that the service is up."""
    return {"status": "ok", "message": "Space is running"}


# === Shared helpers ===
def save_temp_file(file: UploadFile) -> str:
    """Persist an upload to a temporary file and mirror it to S3.

    The upload is streamed to disk, the handle is closed, and the file is
    uploaded under the key ``documents/<file.filename>``.

    Args:
        file: The uploaded file received by an endpoint.

    Returns:
        Path of the temporary file. The caller is responsible for deleting it.

    Raises:
        Exception: Whatever ``upload_to_s3`` raises; the temporary file is
            removed before the exception propagates.
    """
    # delete=False keeps the file on disk after the handle is closed, so it
    # can be reopened by path for the upload and the extraction step.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        shutil.copyfileobj(file.file, tmp)

    try:
        upload_to_s3(tmp.name, f"documents/{file.filename}")
    except Exception:
        # Nobody else knows this path yet, so clean up here before re-raising.
        os.remove(tmp.name)
        raise

    print(f"📤 Uploaded {file.filename} to S3")
    return tmp.name


def _extract_from_upload(extractor, file, filename, start_page, end_page):
    """Run ``extractor`` on a temp copy of ``file`` and always delete the copy.

    ``extractor`` is called as ``extractor(fh, start_page, end_page, filename)``
    with ``fh`` opened in binary mode; its return value is passed through.
    """
    path = save_temp_file(file)
    try:
        with open(path, "rb") as fh:
            return extractor(fh, start_page, end_page, filename)
    finally:
        # Remove the temp file even when extraction raises.
        os.remove(path)


# === LangGraph Nodes ===
def extract_text_node(state):
    """Extract text for ``state["filename"]`` and store it in ``state["text"]``.

    Reads the file path from ``state["temp_files"][filename]`` and the optional
    ``start_page`` / ``end_page`` entries (``None`` when absent), then returns
    the same, mutated ``state`` mapping.
    """
    filename = state["filename"]
    path = state["temp_files"][filename]
    start_page = state.get("start_page")
    end_page = state.get("end_page")
    with open(path, "rb") as fh:
        state["text"] = extract_text_from_file(fh, start_page, end_page, filename)
    return state


def extract_tables_node(state):
    """Extract tables for ``state["filename"]`` into ``state["tables"]``.

    Reads the file path from ``state["temp_files"][filename]`` and the optional
    ``start_page`` / ``end_page`` entries (``None`` when absent), then returns
    the same, mutated ``state`` mapping.
    """
    filename = state["filename"]
    path = state["temp_files"][filename]
    start_page = state.get("start_page")
    end_page = state.get("end_page")
    with open(path, "rb") as fh:
        state["tables"] = extract_tables_from_file(fh, start_page, end_page, filename)
    return state


# Maps an extraction kind to the node function that performs it.
node_map = {
    "text": extract_text_node,
    "table": extract_tables_node,
}


# === Individual APIs ===
# NOTE(review): both handlers are ``async`` but perform blocking file/S3 work;
# confirm whether they should be plain ``def`` so FastAPI runs them in its
# threadpool.
@api.post("/api/text")
async def extract_text_api(
    file: UploadFile = File(...),
    filename: str = Form(...),
    start_page: int = Form(...),
    end_page: int = Form(...),
):
    """Extract text from the uploaded file and return it as ``{"text": ...}``."""
    # cache = get_entry(filename) or {}  # ❌ disabled
    text = _extract_from_upload(
        extract_text_from_file, file, filename, start_page, end_page
    )
    # cache.pop("filename", None)
    # upsert_entry(filename, **cache)  # ❌ disabled
    return {"text": text}


@api.post("/api/tables")
async def extract_table_api(
    file: UploadFile = File(...),
    filename: str = Form(...),
    start_page: int = Form(...),
    end_page: int = Form(...),
):
    """Extract tables from the uploaded file and return ``{"tables": ...}``."""
    # cache = get_entry(filename) or {}  # ❌ disabled
    tables = _extract_from_upload(
        extract_tables_from_file, file, filename, start_page, end_page
    )
    # cache.pop("filename", None)
    # upsert_entry(filename, **cache)  # ❌ disabled
    return {"tables": tables}


if __name__ == "__main__":
    import uvicorn

    # ⬇️ Removed reload=True (causing shutdowns in Hugging Face Spaces)
    # Pass the application object itself: the previous import string "app:app"
    # named an attribute this module does not define (the instance is `api`).
    uvicorn.run(api, host="0.0.0.0", port=7860)