|
|
from fastapi import FastAPI, File, UploadFile |
|
|
import pytesseract as tess |
|
|
from PIL import Image |
|
|
import io |
|
|
import os |
|
|
import pymupdf |
|
|
import zipfile |
|
|
from fastapi.responses import StreamingResponse |
|
|
from typing import List |
|
|
import re |
|
|
import logging |
|
|
|
|
|
# ASGI application object; the route decorator further down registers on it.
app = FastAPI()

# Configure the root logger so that DEBUG-level records emitted through the
# module-level ``logging.debug`` / ``logging.error`` calls are not filtered out.
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
|
|
|
|
# Name of the uploads directory; it is created at import time if missing.
UPLOADS_DIR = "uploads"

try:
    # ``exist_ok=True`` replaces the separate existence check: there is no
    # window between "check" and "create" for another process to race into,
    # and a non-directory already sitting at this path is reported here
    # instead of being silently accepted.
    os.makedirs(UPLOADS_DIR, exist_ok=True)
except OSError as e:
    logging.error(f"Error creating directory {UPLOADS_DIR}: {e}")
    raise
|
|
|
|
|
def extract_text_from_image(image_stream):
    """Run Tesseract OCR on an image and return the recognised text.

    Args:
        image_stream: Whatever ``PIL.Image.open`` accepts; the caller in this
            module passes an ``io.BytesIO`` holding encoded image bytes.

    Returns:
        The string produced by ``pytesseract.image_to_string``.
    """
    # The context manager releases the image's resources as soon as OCR is
    # finished instead of leaving that to garbage collection.
    with Image.open(image_stream) as image:
        return tess.image_to_string(image)
|
|
|
|
|
def extract_text_from_pdf(pdf_stream):
    """Extract the text of a PDF, using OCR for pages with no text layer.

    Args:
        pdf_stream: The raw PDF content, handed to ``pymupdf.open`` as its
            ``stream`` argument (the upload handler passes ``bytes``).

    Returns:
        The text of all pages concatenated in page order. For a page whose
        embedded text is empty or whitespace-only, the OCR result of the
        rendered page is appended as well.
    """
    parts = []
    # Open the document in a ``with`` block so it is closed even when a page
    # fails to load or OCR raises.
    with pymupdf.open(stream=pdf_stream, filetype="pdf") as pdf_document:
        for page_num in range(pdf_document.page_count):
            page = pdf_document.load_page(page_num)
            page_text = page.get_text()
            parts.append(page_text)

            # Decide per page: testing the accumulated text would disable the
            # OCR fallback for every scanned page that follows a text page.
            if not page_text.strip():
                pix = page.get_pixmap()
                # ``tobytes("png")`` returns the encoded image bytes, which is
                # what BytesIO needs; ``pil_save`` writes to a target and has
                # no usable return value.
                image_stream = io.BytesIO(pix.tobytes("png"))
                parts.append(extract_text_from_image(image_stream))

    return "".join(parts)
|
|
|
|
|
def sanitize_filename(filename):
    """Return *filename* with unsafe characters replaced by underscores.

    Every character that is not a word character, hyphen, underscore, dot or
    space is replaced with ``_``; this includes both path separators, so the
    result is always a single path component.

    Args:
        filename: The client-supplied file name. May be ``None`` or empty,
            since an upload is not guaranteed to carry a name.

    Returns:
        The sanitised name, or ``"unnamed"`` when no name was supplied.
    """
    # Without this guard ``re.sub`` raises TypeError on None, and an empty
    # name would produce an output file with no stem.
    if not filename:
        return "unnamed"
    return re.sub(r'[^\w\-_\. ]', '_', filename)
|
|
|
|
|
@app.post("/upload-pdfs/")
async def upload_pdfs(files: List[UploadFile] = File(...)):
    """Convert uploaded PDFs to text and return them as one ZIP download.

    Each upload is read in full, passed through ``extract_text_from_pdf`` and
    stored in the archive as ``<sanitised stem>.txt``.

    Args:
        files: The uploaded files from the multipart request.

    Returns:
        A ``StreamingResponse`` carrying ``converted_texts.zip``, or the dict
        ``{"error": "Failed to create ZIP file"}`` if the archive could not
        be assembled.
    """
    zip_name = "converted_texts.zip"
    entries = []
    used_names = set()

    for file in files:
        pdf_content = await file.read()
        # NOTE(review): extraction runs synchronously inside this coroutine,
        # so a slow PDF/OCR job holds up other requests on the same event
        # loop; confirm whether that is acceptable for the deployment.
        text = extract_text_from_pdf(pdf_content)

        # An upload is not guaranteed to carry a file name.
        sanitized_filename = sanitize_filename(file.filename or "unnamed")
        stem = os.path.splitext(sanitized_filename)[0] or "unnamed"

        # Two uploads with the same name must not overwrite each other inside
        # the archive, so later duplicates get a numeric suffix.
        txt_name = f"{stem}.txt"
        suffix = 1
        while txt_name in used_names:
            txt_name = f"{stem}_{suffix}.txt"
            suffix += 1
        used_names.add(txt_name)

        logging.debug(f"Adding to archive: {txt_name}")
        entries.append((txt_name, text))

    # The archive is assembled in memory rather than at a fixed path on disk:
    # concurrent requests no longer share (and clobber) the same .txt/.zip
    # files, nothing is left behind after the response, and no file handle
    # stays open once the response has been sent.
    archive_buffer = io.BytesIO()
    try:
        with zipfile.ZipFile(archive_buffer, "w") as zip_file:
            for txt_name, text in entries:
                # ``writestr`` encodes ``str`` data as UTF-8, matching the
                # encoding the text files were previously written with.
                zip_file.writestr(txt_name, text)
    except (OSError, ValueError, zipfile.LargeZipFile) as e:
        logging.error(f"Error creating ZIP file {zip_name}: {e}")
        return {"error": "Failed to create ZIP file"}

    return StreamingResponse(
        # Send the finished archive as a single chunk.
        iter([archive_buffer.getvalue()]),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=converted_texts.zip"},
    )
|
|
|