"""FastAPI service that extracts text from uploaded PDFs and returns it zipped.

Each uploaded PDF is converted to a ``.txt`` member of a ZIP archive. Pages
without an extractable text layer are rendered to an image and passed through
Tesseract OCR.
"""

import io
import logging
import os
import re
import zipfile
from typing import List

import pymupdf
import pytesseract as tess
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from PIL import Image

app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Directory historically used for generated text/ZIP files. Request handling
# now builds its output in memory, but the directory is still created at import
# time so the module's start-up side effect stays the same.
UPLOADS_DIR = "uploads"
if not os.path.exists(UPLOADS_DIR):
    try:
        # exist_ok guards against another process creating it in between.
        os.makedirs(UPLOADS_DIR, exist_ok=True)
    except Exception:
        logger.exception("Error creating directory %s", UPLOADS_DIR)
        raise

# Anything that is not a word character, dash, underscore, dot or space.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^\w\-_\. ]")

# Stem used when an upload arrives without a filename.
_DEFAULT_STEM = "document"


def extract_text_from_image(image_stream) -> str:
    """Run Tesseract OCR on an image and return the recognized text.

    Args:
        image_stream: Anything ``PIL.Image.open`` accepts (here: a binary
            file-like object holding encoded image data).

    Returns:
        The text produced by ``pytesseract.image_to_string``.
    """
    # The context manager releases the image's resources once OCR is done.
    with Image.open(image_stream) as image:
        return tess.image_to_string(image)


def extract_text_from_pdf(pdf_stream) -> str:
    """Extract the text of every page of a PDF, using OCR where needed.

    Args:
        pdf_stream: The PDF data passed to ``pymupdf.open(stream=...)``
            (the upload endpoint passes the raw ``bytes`` of the file).

    Returns:
        The concatenated text of all pages, in page order.
    """
    page_texts = []
    with pymupdf.open(stream=pdf_stream, filetype="pdf") as pdf_document:
        for page in pdf_document:
            page_text = page.get_text()
            # Decide per page: a page with no text layer is probably a scan,
            # regardless of whether earlier pages contained text.
            if not page_text.strip():
                pix = page.get_pixmap()
                png_bytes = pix.tobytes("png")
                page_text += extract_text_from_image(io.BytesIO(png_bytes))
            page_texts.append(page_text)
    return "".join(page_texts)


def sanitize_filename(filename: str) -> str:
    """Replace every character outside ``[\\w\\-_. ]`` with an underscore.

    Args:
        filename: The client-supplied file name.

    Returns:
        The name with path separators and other special characters replaced.
    """
    return _UNSAFE_FILENAME_CHARS.sub("_", filename)


def _unique_member_name(stem: str, used_names: set) -> str:
    """Return ``<stem>.txt``, suffixed with a counter if already in *used_names*.

    The chosen name is added to *used_names* before returning.
    """
    candidate = f"{stem}.txt"
    counter = 1
    while candidate in used_names:
        candidate = f"{stem}_{counter}.txt"
        counter += 1
    used_names.add(candidate)
    return candidate


@app.post("/upload-pdfs/")
async def upload_pdfs(files: List[UploadFile] = File(...)):
    """Convert the uploaded PDFs to text files and return them as one ZIP.

    The archive is assembled in memory per request, so concurrent requests
    cannot overwrite each other's output and nothing is left behind on disk.

    Args:
        files: The uploaded PDF files.

    Returns:
        A streamed ``converted_texts.zip`` attachment containing one ``.txt``
        member per upload, or a JSON ``{"error": ...}`` body with status 500
        if the archive could not be built.
    """
    # NOTE: text extraction/OCR is CPU-bound and runs on the event loop here.
    entries = []
    used_names = set()
    for file in files:
        pdf_content = await file.read()
        text = extract_text_from_pdf(pdf_content)

        # UploadFile.filename may be missing; fall back to a generic stem.
        sanitized_filename = sanitize_filename(file.filename or _DEFAULT_STEM)
        stem = os.path.splitext(sanitized_filename)[0]
        # Uploads sharing a name get distinct members instead of colliding.
        member_name = _unique_member_name(stem, used_names)
        logger.debug("Adding archive member: %s", member_name)
        entries.append((member_name, text))

    archive = io.BytesIO()
    try:
        with zipfile.ZipFile(archive, "w") as zip_file:
            for member_name, text in entries:
                zip_file.writestr(member_name, text.encode("utf-8"))
    except (OSError, ValueError, zipfile.LargeZipFile):
        logger.exception("Error creating ZIP file")
        return JSONResponse(
            status_code=500,
            content={"error": "Failed to create ZIP file"},
        )

    return StreamingResponse(
        iter([archive.getvalue()]),
        media_type="application/zip",
        headers={
            "Content-Disposition": "attachment; filename=converted_texts.zip"
        },
    )