redhairedshanks1's picture
Update app.py
5a92fca verified
# import os
# import tempfile
# from typing import List
# from fastapi import FastAPI, UploadFile, File, Form
# from fastapi.responses import JSONResponse
# from pydantic import BaseModel
# from fastapi import Body
# import traceback
# from typing import TypedDict, Dict, Any
# # === LangGraph ===
# from langgraph.graph import StateGraph
# # from langgraph.checkpoint import MemorySaver
# # === Service Imports ===
# from services.extract_text import extract_text_from_file, extract_images_with_fitz
# from services.extract_table import extract_tables_from_file
# from services.vector_store import get_entry, upsert_entry
# from services.s3_utils import upload_to_s3
# # === FastAPI Init ===
# api = FastAPI()
# # === Shared helpers ===
# def save_temp_file(file: UploadFile) -> str:
# tmp = tempfile.NamedTemporaryFile(delete=False)
# tmp.write(file.file.read())
# tmp.flush()
# upload_to_s3(tmp.name, f"documents/{file.filename}")
# print(f"πŸ“€ Uploaded {file.filename} to S3")
# return tmp.name
# # === LangGraph Nodes ===
# def extract_text_node(state):
# filename = state["filename"]
# path = state["temp_files"][filename]
# start_page = state.get("start_page")
# end_page = state.get("end_page")
# with open(path, "rb") as fh:
# state["text"] = extract_text_from_file(fh, start_page, end_page, filename)
# return state
# def extract_tables_node(state):
# filename = state["filename"]
# path = state["temp_files"][filename]
# start_page = state.get("start_page")
# end_page = state.get("end_page")
# with open(path, "rb") as fh:
# state["tables"] = extract_tables_from_file(fh, start_page, end_page, filename)
# return state
# node_map = {
# "text": extract_text_node,
# "table": extract_tables_node
# }
# # === Individual APIs ===
# @api.post("/api/text")
# async def extract_text_api(
# file: UploadFile = File(...),
# filename: str = Form(...),
# start_page: int = Form(...),
# end_page: int = Form(...)
# ):
# cache = get_entry(filename) or {}
# if "text" in cache:
# return {"text": cache["text"]}
# path = save_temp_file(file)
# with open(path, "rb") as fh:
# cache["text"] = extract_text_from_file(fh, start_page, end_page, filename)
# os.remove(path)
# cache.pop("filename", None)
# upsert_entry(filename, **cache)
# return {"text": cache["text"]}
# @api.post("/api/tables")
# async def extract_table_api(
# file: UploadFile = File(...),
# filename: str = Form(...),
# start_page: int = Form(...),
# end_page: int = Form(...)
# ):
# cache = get_entry(filename) or {}
# if "tables" in cache:
# return {"tables": cache["tables"]}
# path = save_temp_file(file)
# with open(path, "rb") as fh:
# cache["tables"] = extract_tables_from_file(fh, start_page, end_page, filename)
# os.remove(path)
# cache.pop("filename", None)
# upsert_entry(filename, **cache)
# return {"tables": cache["tables"]}
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run("app:api", host="0.0.0.0", port=7860, reload=True)
import os
import tempfile
from typing import List
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from fastapi import Body
import traceback
from typing import TypedDict, Dict, Any
# === LangGraph ===
from langgraph.graph import StateGraph
# from langgraph.checkpoint import MemorySaver
# === Service Imports ===
from services.extract_text import extract_text_from_file, extract_images_with_fitz
from services.extract_table import extract_tables_from_file
# from services.vector_store import get_entry, upsert_entry # ❌ Disabled cache
from services.s3_utils import upload_to_s3
# === FastAPI Init ===
api = FastAPI()
# === Root Health Check ===
@api.get("/")
async def root():
return {"status": "ok", "message": "Space is running"}
# === Shared helpers ===
def save_temp_file(file: UploadFile) -> str:
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.write(file.file.read())
tmp.flush()
upload_to_s3(tmp.name, f"documents/{file.filename}")
print(f"πŸ“€ Uploaded {file.filename} to S3")
return tmp.name
# === LangGraph Nodes ===
def extract_text_node(state):
filename = state["filename"]
path = state["temp_files"][filename]
start_page = state.get("start_page")
end_page = state.get("end_page")
with open(path, "rb") as fh:
state["text"] = extract_text_from_file(fh, start_page, end_page, filename)
return state
def extract_tables_node(state):
filename = state["filename"]
path = state["temp_files"][filename]
start_page = state.get("start_page")
end_page = state.get("end_page")
with open(path, "rb") as fh:
state["tables"] = extract_tables_from_file(fh, start_page, end_page, filename)
return state
node_map = {
"text": extract_text_node,
"table": extract_tables_node
}
# === Individual APIs ===
@api.post("/api/text")
async def extract_text_api(
file: UploadFile = File(...),
filename: str = Form(...),
start_page: int = Form(...),
end_page: int = Form(...),
):
# cache = get_entry(filename) or {} # ❌ disabled
path = save_temp_file(file)
with open(path, "rb") as fh:
text = extract_text_from_file(fh, start_page, end_page, filename)
os.remove(path)
# cache.pop("filename", None)
# upsert_entry(filename, **cache) # ❌ disabled
return {"text": text}
@api.post("/api/tables")
async def extract_table_api(
file: UploadFile = File(...),
filename: str = Form(...),
start_page: int = Form(...),
end_page: int = Form(...),
):
# cache = get_entry(filename) or {} # ❌ disabled
path = save_temp_file(file)
with open(path, "rb") as fh:
tables = extract_tables_from_file(fh, start_page, end_page, filename)
os.remove(path)
# cache.pop("filename", None)
# upsert_entry(filename, **cache) # ❌ disabled
return {"tables": tables}
if __name__ == "__main__":
import uvicorn
# ⬇️ Removed reload=True (causing shutdowns in Hugging Face Spaces)
uvicorn.run("app:app", host="0.0.0.0", port=7860)