| import time |
| import os |
| import tempfile |
| import shutil |
| import logging |
| import json |
| from fastapi import FastAPI, File, UploadFile, HTTPException |
| from fastapi.responses import StreamingResponse |
| from pypdf import PdfReader |
| from pypdf.generic import NameObject, IndirectObject |
| from commonforms import prepare_form |
|
|
|
|
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# FastAPI application instance that the route decorators in this module attach to.
app = FastAPI()
|
|
def _resolve(obj):
    """Return the target of an indirect reference; pass any other value through."""
    return obj.get_object() if isinstance(obj, IndirectObject) else obj


def pdf_page_generator(pdf_path: str):
    """Yield newline-terminated JSON records describing form widgets, page by page.

    One record is produced per page:

    * ``{"page": i, "status": "success", "fields": [...]}`` where every field
      holds ``name`` (from ``/T``), ``ft`` (from ``/FT``) and ``bbox`` (from
      ``/Rect``, or ``None`` unless it has exactly four entries).
    * ``{"page": i, "status": "error", "msg": ...}`` when parsing that page
      raised; later pages are still processed.

    If the file cannot be opened, or is encrypted and the empty password does
    not unlock it, a single ``{"error": ...}`` record is yielded and the
    generator stops.

    Args:
        pdf_path: Filesystem path of the PDF to inspect.

    Yields:
        str: One JSON document followed by ``"\\n"``.
    """
    try:
        reader = PdfReader(pdf_path)
    except Exception as e:
        logger.error("Failed to open PDF %s: %s", pdf_path, e)
        yield json.dumps({"error": "Failed to open PDF", "details": str(e)}) + "\n"
        return

    if reader.is_encrypted:
        # decrypt() reports a rejected password through a falsy return value
        # instead of raising, so the result has to be checked as well as any
        # exception (e.g. an unsupported encryption scheme).
        try:
            unlocked = reader.decrypt("")
        except Exception as e:
            logger.warning("Could not decrypt %s: %s", pdf_path, e)
            unlocked = False
        if not unlocked:
            yield json.dumps({"error": "File is encrypted"}) + "\n"
            return

    for page_idx, page in enumerate(reader.pages):
        try:
            page_fields = []

            # dict.get() may hand back an unresolved reference to the array.
            annots = _resolve(page.get("/Annots"))
            for annot_ref in annots or ():
                annot = _resolve(annot_ref)
                if annot is None:
                    continue
                if annot.get("/Subtype") != NameObject("/Widget"):
                    continue

                rect = _resolve(annot.get("/Rect"))
                # NOTE(review): only the widget dictionary itself is read; if
                # /T or /FT live on a /Parent field they are reported as None.
                field_type = annot.get("/FT")
                field_name = annot.get("/T")

                if rect and len(rect) == 4:
                    bbox = {
                        "x0": float(rect[0]), "y0": float(rect[1]),
                        "x1": float(rect[2]), "y1": float(rect[3]),
                    }
                else:
                    bbox = None

                page_fields.append({
                    "name": str(field_name) if field_name else None,
                    "ft": str(field_type) if field_type else None,
                    "bbox": bbox,
                })

            result_chunk = {
                "page": page_idx,
                "status": "success",
                "fields": page_fields,
            }
            yield json.dumps(result_chunk) + "\n"

        except Exception as e:
            # A malformed page must not abort the stream; report and move on.
            logger.warning("Error parsing page %s: %s", page_idx, e)
            yield json.dumps({"page": page_idx, "status": "error", "msg": str(e)}) + "\n"
|
|
@app.post("/extract-fields-stream")
async def extract_fields_stream(file: UploadFile = File(...)):
    """Run form preparation on an uploaded PDF and stream its fields as NDJSON.

    The upload is copied to a temporary file, passed through ``prepare_form``
    into a second temporary file, and the result is streamed page by page via
    ``pdf_page_generator``. Both temporary files are removed when streaming
    finishes or when processing fails.

    Args:
        file: The uploaded PDF.

    Returns:
        StreamingResponse with media type ``application/x-ndjson``.

    Raises:
        HTTPException: Status 500 if ``prepare_form`` produces no output
            ("Processing failed.") or if any other step raises (detail is the
            error text).
    """
    input_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
    output_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")

    input_path = input_tmp.name
    output_path = output_tmp.name

    # Only the reserved path is needed for the output file. Closing the handle
    # immediately avoids leaking a descriptor per request and keeps the path
    # writable on platforms that lock open files.
    output_tmp.close()

    def remove_temp_files() -> None:
        """Best-effort removal of both temporary files; never raises."""
        for path in (input_path, output_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError as exc:
                logger.warning("Could not remove temp file %s: %s", path, exc)

    try:
        with input_tmp as upload_copy:
            shutil.copyfileobj(file.file, upload_copy)
        await file.close()

        # NOTE(review): prepare_form is called synchronously inside this
        # coroutine, so the event loop is blocked until it returns; consider
        # offloading it to a worker thread if that becomes a problem.
        prepare_form(input_path, output_path, model_or_path="FFDNet-L", confidence=0.3)

        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            raise HTTPException(status_code=500, detail="Processing failed.")
    except HTTPException:
        # Deliberate HTTP errors keep their own detail instead of being
        # re-wrapped by the generic handler below.
        remove_temp_files()
        raise
    except Exception as e:
        remove_temp_files()
        raise HTTPException(status_code=500, detail=str(e)) from e

    def cleanup_wrapper():
        # The files must outlive this function's return because the response
        # body is produced lazily; delete them once the generator is exhausted
        # or closed.
        try:
            yield from pdf_page_generator(output_path)
        finally:
            remove_temp_files()

    return StreamingResponse(
        cleanup_wrapper(),
        media_type="application/x-ndjson",
    )