| """ |
| FastAPI application for PDF redaction using NER |
| """ |
import logging
import os
import shutil
import uuid
from pathlib import Path
from typing import List, Optional, Dict
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel

from app.redaction import PDFRedactor
from client_supabase import supabase
|
|
| |
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


app = FastAPI(
    title="PDF Redaction API",
    description="Redact sensitive information from PDFs using Named Entity Recognition",
    version="1.0.0",
)

# Allowed CORS origins, read from the comma-separated CORS_ALLOW_ORIGINS environment
# variable (e.g. "https://app.example.com,https://admin.example.com").
# Unset or blank falls back to ["*"], the previously hard-coded value.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive; configure an explicit allow-list in production.
CORS_ALLOW_ORIGINS: List[str] = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOW_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Local scratch directories, resolved relative to the process working directory:
# uploaded source PDFs go to UPLOAD_DIR, redacted results to OUTPUT_DIR.
UPLOAD_DIR = Path("uploads")
OUTPUT_DIR = Path("outputs")
for _scratch_dir in (UPLOAD_DIR, OUTPUT_DIR):
    _scratch_dir.mkdir(exist_ok=True)
del _scratch_dir

# One redactor instance, shared by every endpoint and background task in this module.
redactor = PDFRedactor()

# In-memory job state for /redact_by_request, keyed by request_id.
# Values written in this module: "pending", "processing", "completed", "failed".
# NOTE: plain module-level dict -> per-process; lost on restart and not shared
# between multiple worker processes.
redaction_status: Dict[str, str] = {}
|
|
| |
# One detected entity, as reported in the POST /redact response.
class RedactionEntity(BaseModel):
    # Entity label, copied from the redactor result's "entity_type".
    entity_type: str
    # Entity text, copied from the redactor result's "entity_text".
    entity_text: str
    # Page of the entity's first word; 0 when the entity has no words.
    # NOTE(review): page numbering base comes from PDFRedactor -- if it is 0-based,
    # 0 is ambiguous with "no words"; confirm.
    page: int
    # Number of words the entity spans (len of the redactor's "words" list).
    word_count: int
|
|
# Response body for POST /redact.
class RedactionResponse(BaseModel):
    # uuid4 string naming the job's local upload/output files.
    job_id: str
    # "completed" on success; failures are raised as HTTPException instead.
    status: str
    # Human-readable summary, e.g. "Successfully redacted N entities".
    message: str
    entities: Optional[List[RedactionEntity]] = None
    # Relative download path of the form "/download/<job_id>".
    redacted_file_url: Optional[str] = None
|
|
# Response body for POST /redact_by_request/{request_id} and
# GET /redaction_status/{request_id}.
class RedactionStatusResponse(BaseModel):
    request_id: str
    # One of "pending", "processing", "completed", "failed", or "not_found".
    status: str
    # Public storage URLs of the request's files; only filled by
    # /redaction_status once the job is "completed", otherwise empty.
    files: List[str]
    message: str
|
|
|
|
# Response body for GET / and GET /health.
class HealthResponse(BaseModel):
    # Always "healthy" when the endpoint answers at all.
    status: str
    # API version string reported by the health endpoints.
    version: str
    # Result of redactor.is_model_loaded().
    model_loaded: bool
|
|
| |
def get_public_url(bucket: str, storage_path: str) -> str:
    """Build the public Supabase Storage URL for an object.

    Args:
        bucket: Storage bucket name.
        storage_path: Object path inside the bucket; ``/`` separators are kept.

    Returns:
        ``{SUPABASE_URL}/storage/v1/object/public/{bucket}/{storage_path}`` with
        the bucket and path percent-encoded (e.g. spaces become ``%20``).

    Raises:
        RuntimeError: if the ``SUPABASE_URL`` environment variable is unset or empty.
    """
    # Strip a trailing slash so "https://x.supabase.co/" does not yield "//storage".
    base_url = (os.getenv("SUPABASE_URL") or "").rstrip("/")
    if not base_url:
        # Fail loudly: the old f-string silently produced "None/storage/..." URLs.
        raise RuntimeError("SUPABASE_URL environment variable is not set")
    encoded_bucket = quote(bucket, safe="")
    encoded_path = quote(storage_path, safe="/")
    return f"{base_url}/storage/v1/object/public/{encoded_bucket}/{encoded_path}"
def cleanup_files(job_id: str):
    """Delete the uploaded source PDF for ``job_id`` from ``UPLOAD_DIR``.

    Scheduled as a background task by POST /redact (it runs once the response has
    been sent, not after a timed delay) and called directly by DELETE /cleanup.
    The redacted output in ``OUTPUT_DIR`` is deliberately left in place so
    GET /download/{job_id} keeps working.

    Best-effort: filesystem errors are logged with a traceback, never raised.
    """
    upload_path = UPLOAD_DIR / f"{job_id}.pdf"
    try:
        # missing_ok avoids the exists()/unlink() race with a concurrent cleanup.
        upload_path.unlink(missing_ok=True)
        logger.info("Cleaned up files for job %s", job_id)
    except OSError:
        logger.exception("Error cleaning up files for job %s", job_id)
|
|
def cleanup_temp_files(paths: List[Path]):
    """Delete every path in ``paths``, silently skipping ones that do not exist.

    Uses ``unlink(missing_ok=True)`` instead of an ``exists()`` pre-check, so a
    file removed between the check and the delete can no longer raise
    ``FileNotFoundError``. Other ``OSError``s (e.g. permission errors) still
    propagate to the caller.
    """
    for path in paths:
        path.unlink(missing_ok=True)
|
|
def download_file_from_supabase(bucket: str, storage_path: str, local_path: Path):
    """Fetch ``storage_path`` from Supabase Storage ``bucket`` into ``local_path``.

    Raises:
        Exception: when the download returns an empty/falsy payload.
    """
    logger.info(f"Downloading {storage_path} to {local_path}")
    payload = supabase.storage.from_(bucket).download(storage_path)
    if payload:
        local_path.write_bytes(payload)
        return
    raise Exception(f"Failed to download {storage_path}")
|
|
def upload_file_to_supabase(bucket: str, storage_path: str, local_path: Path):
    """Upload the PDF at ``local_path`` to ``storage_path`` in Supabase Storage ``bucket``.

    The request is sent with ``upsert`` enabled and a PDF content type, so an
    object already stored at ``storage_path`` is expected to be replaced.
    """
    logger.info(f"Uploading {local_path} to {storage_path}")
    pdf_bytes = local_path.read_bytes()
    upload_options = {"upsert": "true", "content-type": "application/pdf"}
    bucket_client = supabase.storage.from_(bucket)
    bucket_client.upload(path=storage_path, file=pdf_bytes, file_options=upload_options)
|
|
def _redact_storage_file(bucket: str, storage_path: str) -> None:
    """Redact one stored PDF in place: download, redact locally, upload over the original."""
    local_upload = UPLOAD_DIR / f"{uuid.uuid4()}.pdf"
    local_output = OUTPUT_DIR / f"{uuid.uuid4()}_redacted.pdf"
    try:
        download_file_from_supabase(bucket, storage_path, local_upload)
        redactor.redact_document(pdf_path=str(local_upload), output_path=str(local_output))
        # Same storage_path as the source: the redacted PDF replaces the original object.
        upload_file_to_supabase(bucket, storage_path, local_output)
    finally:
        # Runs on failure too, so an error mid-job no longer leaves the downloaded
        # (unredacted) document or a partial output behind on local disk.
        cleanup_temp_files([local_upload, local_output])


def redact_request(request_id: str, bucket: str = "doc_storage"):
    """Background task: redact every stored file attached to ``request_id``.

    Reads the ``storage_path`` of each row in the ``request_files`` table that
    matches ``request_id``. Each file is downloaded from ``bucket``, redacted with the
    shared ``redactor``, and uploaded back to the same path. Progress is recorded in
    ``redaction_status``: ``"processing"`` while running, then ``"completed"`` or
    ``"failed"``.

    Files are processed sequentially and the first error stops the job, so files
    handled before the error stay redacted while later ones are untouched. Errors
    are logged and recorded as ``"failed"``; nothing is raised to the caller.
    """
    try:
        redaction_status[request_id] = "processing"

        response = (
            supabase
            .from_("request_files")
            .select("id, storage_path")
            .eq("request_id", request_id)
            .execute()
        )
        files = response.data
        if not files:
            raise Exception(f"No files found for request {request_id}")

        for file in files:
            _redact_storage_file(bucket, file["storage_path"])

        redaction_status[request_id] = "completed"
    except Exception:
        # Top-level boundary of a background task: nothing upstream can catch this,
        # so keep the traceback in the log and surface the failure via the status.
        logger.exception("Redaction failed for %s", request_id)
        redaction_status[request_id] = "failed"
|
|
| |
@app.get("/", response_model=HealthResponse)
async def root():
    """Root endpoint: returns exactly the same payload as GET /health."""
    return await health_check()
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Liveness probe reporting the API version and whether the NER model is loaded.

    The version is read from ``app.version`` so it cannot drift from the value
    passed to ``FastAPI(...)`` and advertised in the OpenAPI metadata.
    """
    return HealthResponse(
        status="healthy",
        version=app.version,
        model_loaded=redactor.is_model_loaded(),
    )
|
|
@app.post("/redact", response_model=RedactionResponse)
async def redact_pdf(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    dpi: int = 300,
    entity_types: Optional[str] = None,
):
    """Redact an uploaded PDF and return the entities that were found.

    Args:
        file: Uploaded PDF; the filename must end in ``.pdf`` (any case).
        dpi: Forwarded to the redactor; must be positive.
        entity_types: Optional comma-separated entity labels, passed to the
            redactor as ``entity_filter`` (``None`` when omitted).

    The redacted file stays in ``OUTPUT_DIR`` for GET /download/{job_id}. The
    uploaded original is removed by a background task after the response is sent.

    Raises:
        HTTPException: 400 for a non-PDF filename or non-positive ``dpi``;
            500 if saving or redacting the document fails.
    """
    # UploadFile.filename can be None; previously that crashed with AttributeError.
    if not (file.filename or "").lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")
    if dpi <= 0:
        raise HTTPException(status_code=400, detail="dpi must be a positive integer")

    job_id = str(uuid.uuid4())
    upload_path = UPLOAD_DIR / f"{job_id}.pdf"
    output_path = OUTPUT_DIR / f"{job_id}_redacted.pdf"
    try:
        # Disk copy and NER redaction are blocking; run them in the threadpool so a
        # long job does not stall the event loop (health checks, other requests).
        # NOTE(review): assumes PDFRedactor tolerates calls from worker threads.
        with upload_path.open("wb") as buffer:
            await run_in_threadpool(shutil.copyfileobj, file.file, buffer)

        entity_filter = None
        if entity_types:
            entity_filter = [et.strip() for et in entity_types.split(",")]

        result = await run_in_threadpool(
            redactor.redact_document,
            pdf_path=str(upload_path),
            output_path=str(output_path),
            dpi=dpi,
            entity_filter=entity_filter,
        )

        response_entities = [
            RedactionEntity(
                entity_type=e["entity_type"],
                entity_text=e["entity_text"],
                page=e["words"][0]["page"] if e["words"] else 0,
                word_count=len(e["words"]),
            )
            for e in result["entities"]
        ]

        background_tasks.add_task(cleanup_files, job_id)

        return RedactionResponse(
            job_id=job_id,
            status="completed",
            message=f"Successfully redacted {len(result['entities'])} entities",
            entities=response_entities,
            redacted_file_url=f"/download/{job_id}",
        )
    except Exception as e:
        logger.exception("Error processing job %s", job_id)
        upload_path.unlink(missing_ok=True)
        output_path.unlink(missing_ok=True)
        raise HTTPException(status_code=500, detail=f"Error processing PDF: {str(e)}") from e
|
|
@app.get("/download/{job_id}")
async def download_redacted_pdf(job_id: str):
    """Serve the redacted PDF produced by POST /redact for ``job_id``.

    Raises:
        HTTPException: 404 when no redacted output exists for the job.
    """
    redacted = OUTPUT_DIR / f"{job_id}_redacted.pdf"
    if redacted.exists():
        return FileResponse(
            path=redacted,
            media_type="application/pdf",
            filename=f"redacted_{job_id}.pdf",
        )
    raise HTTPException(status_code=404, detail="Redacted file not found")
|
|
@app.delete("/cleanup/{job_id}")
async def cleanup_job(job_id: str):
    """Remove all local files for ``job_id``: the uploaded input and the redacted output.

    Raises:
        HTTPException: 500 if an error escapes while deleting (``cleanup_files``
            itself logs instead of raising).
    """
    try:
        cleanup_files(job_id)
        redacted = OUTPUT_DIR / f"{job_id}_redacted.pdf"
        if redacted.exists():
            redacted.unlink()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error cleaning up: {str(exc)}")
    else:
        return {"message": f"Successfully cleaned up files for job {job_id}"}
|
|
@app.get("/stats")
async def get_stats():
    """Report how many PDFs sit in the local scratch dirs and whether the model is loaded."""

    def _count_pdfs(directory: Path) -> int:
        # Count matches without materialising the glob into a list.
        return sum(1 for _ in directory.glob("*.pdf"))

    return {
        "pending_uploads": _count_pdfs(UPLOAD_DIR),
        "processed_files": _count_pdfs(OUTPUT_DIR),
        "model_loaded": redactor.is_model_loaded(),
    }
|
|
| |
@app.post("/redact_by_request/{request_id}", response_model=RedactionStatusResponse)
async def redact_by_request(request_id: str, background_tasks: BackgroundTasks):
    """Start redacting all stored files of ``request_id`` in the background.

    If a job for this request is already queued (``"pending"``) or running
    (``"processing"``), no new job is scheduled and the current status is returned.
    Otherwise the status is set to ``"pending"`` and ``redact_request`` is
    scheduled; poll GET /redaction_status/{request_id} for the outcome.
    """
    current = redaction_status.get(request_id)
    # "pending" is in flight too: the background task only flips the status to
    # "processing" once it starts, so repeat calls in that window used to schedule
    # duplicate jobs. No await between this check and the write below, so it cannot
    # interleave with another request on the same event loop.
    if current in ("pending", "processing"):
        return RedactionStatusResponse(
            request_id=request_id,
            status=current,
            files=[],
            message="Redaction already in progress",
        )
    redaction_status[request_id] = "pending"
    background_tasks.add_task(redact_request, request_id)
    return RedactionStatusResponse(
        request_id=request_id,
        status="pending",
        files=[],
        message="Redaction started in background",
    )
|
|
@app.get("/redaction_status/{request_id}", response_model=RedactionStatusResponse)
async def get_redaction_status(request_id: str):
    """Report the in-memory status of a /redact_by_request job.

    When the job is ``"completed"`` the response also lists the public URLs of
    every file of the request; those objects were overwritten with their redacted
    versions by ``redact_request``. Unknown request ids report ``"not_found"``.
    """
    status = redaction_status.get(request_id, "not_found")

    files: List[str] = []
    if status == "completed":
        response = (
            supabase
            .from_("request_files")
            .select("storage_path")
            .eq("request_id", request_id)
            .execute()
        )
        files = [
            get_public_url("doc_storage", row["storage_path"])
            for row in response.data or []
        ]

    # One message per status that redact_by_request/redact_request can set.
    # The old nested ternary had no "processing" branch, so running jobs
    # were reported as "Request not found".
    messages = {
        "pending": "Redaction pending",
        "processing": "Redaction in progress",
        "completed": "Redaction completed",
        "failed": "Redaction failed",
    }
    message = messages.get(status, "Request not found")

    return RedactionStatusResponse(
        request_id=request_id,
        status=status,
        files=files,
        message=message,
    )
|
|
| |
| |
| |
| |
| |
| |
| |
| |