# p-ai / app.py
# r3hab's picture
# Update app.py
# cff29f7 verified
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.responses import FileResponse, StreamingResponse
from typing import List, Optional
import PyPDF2
import io
import os
import zipfile
from PIL import Image
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()
# Temporary directory to store uploaded files and the merged PDF
# (relative path, so it resolves against the process working directory).
UPLOAD_FOLDER = "uploads"
# Fixed path of the merged-PDF file inside UPLOAD_FOLDER.
MERGED_PDF_PATH = os.path.join(UPLOAD_FOLDER, "merged.pdf")
# Ensure the temporary directory exists; this runs at import time and is a
# no-op when the directory is already present.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@app.post("/merge_pdfs/")
async def merge_pdfs(files: List[UploadFile] = File(...)):
    """Merge the uploaded PDFs, in upload order, into one PDF download.

    The merged document is built in memory and streamed back, so concurrent
    requests never share (or overwrite) an output file on disk.

    Args:
        files: Uploaded files; each must declare the ``application/pdf``
            content type.

    Returns:
        A streaming ``application/pdf`` response named ``merged.pdf``.

    Raises:
        HTTPException: 400 if no files were uploaded, if a file is not
            declared as a PDF, or if PyPDF2 cannot read it; 500 for any other
            failure while appending a file.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    pdf_merger = PyPDF2.PdfMerger()
    output_buffer = io.BytesIO()
    try:
        for file in files:
            if file.content_type != "application/pdf":
                raise HTTPException(status_code=400, detail=f"Invalid file type: {file.filename} is not a PDF")
            try:
                pdf_content = io.BytesIO(await file.read())
                pdf_merger.append(pdf_content)
            except PyPDF2.errors.PdfReadError:
                raise HTTPException(status_code=400, detail=f"Error reading PDF: {file.filename} may be corrupted")
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"Error processing {file.filename}: {e}")
        pdf_merger.write(output_buffer)
    finally:
        # Release the merger's input streams on success and on every error path.
        pdf_merger.close()

    output_buffer.seek(0)
    return StreamingResponse(
        output_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=merged.pdf"}
    )
def _write_pdf_part(zf, pdf_reader, first_page, last_page, entry_name):
    """Store pages ``[first_page, last_page)`` (0-based) as one PDF entry in ``zf``."""
    pdf_writer = PyPDF2.PdfWriter()
    for page_num in range(first_page, last_page):
        pdf_writer.add_page(pdf_reader.pages[page_num])
    part_buffer = io.BytesIO()
    pdf_writer.write(part_buffer)
    zf.writestr(entry_name, part_buffer.getvalue())


@app.post("/split_pdf/")
async def split_pdf(file: UploadFile = File(...), split_points: Optional[str] = Form(None)):
    """Split a PDF at the given page numbers and return the parts as a zip.

    Each split point ``p`` (1-based) ends a part after page ``p``; pages after
    the last split point form a final part. Split points are sorted before
    use, so the order in which they are supplied does not matter.

    Args:
        file: Uploaded file; must declare the ``application/pdf`` content type.
        split_points: Comma-separated 1-based page numbers, e.g. ``"2,5"``.

    Returns:
        A streaming ``application/zip`` response with one PDF per part.

    Raises:
        HTTPException: 400 if the upload is not a PDF, cannot be read, or the
            split points are missing, malformed, out of range, or repeated.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")
    if not split_points:
        raise HTTPException(status_code=400, detail="No split points provided.")

    try:
        split_pages = sorted(int(x) for x in split_points.split(',') if x.strip())
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid split points format. Please provide comma-separated numbers.")
    if not split_pages:
        raise HTTPException(status_code=400, detail="Invalid split points provided.")

    # Report an unreadable upload as a client error, like the other endpoints do.
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        total_pages = len(pdf_reader.pages)
    except PyPDF2.errors.PdfReadError:
        raise HTTPException(status_code=400, detail="Error reading PDF: The file may be corrupted.")

    if any(page > total_pages or page <= 0 for page in split_pages):
        raise HTTPException(status_code=400, detail=f"Split points must be within the range of pages (1 to {total_pages}).")
    # The list is sorted, so only a repeated value can break strict ordering.
    if any(later <= earlier for earlier, later in zip(split_pages, split_pages[1:])):
        raise HTTPException(status_code=400, detail="Split points must be in increasing order.")

    # A missing filename must not crash the request.
    base_name = (file.filename or "").replace('.pdf', '')

    # Consecutive boundaries delimit the parts; append the document end so any
    # pages after the last split point become the final part.
    boundaries = [0] + split_pages
    if split_pages[-1] < total_pages:
        boundaries.append(total_pages)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for file_index, (first_page, last_page) in enumerate(zip(boundaries, boundaries[1:]), start=1):
            _write_pdf_part(zf, pdf_reader, first_page, last_page, f"split_{file_index}_{base_name}.pdf")

    zip_buffer.seek(0)
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": f"attachment; filename=split_pdfs_{base_name}.zip"}
    )
@app.post("/rotate_pdf/")
async def rotate_pdf(
    file: UploadFile = File(...),
    rotation: int = Form(90),
    page_numbers: Optional[str] = Form(None)
):
    """Rotates pages in a PDF document.

    Args:
        file: Uploaded file; must declare the ``application/pdf`` content type.
        rotation: Rotation angle in degrees; one of 90, 180 or 270.
        page_numbers: Optional comma-separated 1-based page numbers to rotate.
            When omitted or empty, every page is rotated.

    Returns:
        A streaming ``application/pdf`` response named ``rotated_<filename>``.

    Raises:
        HTTPException: 400 for a non-PDF upload, an unsupported rotation,
            malformed or out-of-range page numbers, or an unreadable PDF;
            500 for any other processing failure.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")
    if rotation not in [90, 180, 270]:
        raise HTTPException(status_code=400, detail="Rotation must be 90, 180, or 270 degrees.")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        pdf_writer = PyPDF2.PdfWriter()
        total_pages = len(pdf_reader.pages)

        if page_numbers:
            pages_to_rotate = set()
            for page_num_str in page_numbers.split(','):
                try:
                    page_num = int(page_num_str.strip())
                except ValueError:
                    raise HTTPException(status_code=400, detail="Invalid page numbers format. Please provide comma-separated numbers.")
                if not 1 <= page_num <= total_pages:
                    raise HTTPException(status_code=400, detail=f"Invalid page number: {page_num}. Page numbers must be between 1 and {total_pages}.")
                pages_to_rotate.add(page_num - 1)  # 0-based indexing
        else:
            # Rotate all pages if no specific page numbers are provided
            pages_to_rotate = set(range(total_pages))

        for i, page in enumerate(pdf_reader.pages):
            if i in pages_to_rotate:
                page.rotate(rotation)
            pdf_writer.add_page(page)

        output_buffer = io.BytesIO()
        pdf_writer.write(output_buffer)
        output_buffer.seek(0)
        return StreamingResponse(
            output_buffer,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=rotated_{file.filename}"}
        )
    except HTTPException:
        # The validation errors above are deliberate 4xx responses; without
        # this clause the generic handler below would turn them into 500s.
        raise
    except PyPDF2.errors.PdfReadError:
        raise HTTPException(status_code=400, detail="Error reading PDF: The file may be corrupted.")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing PDF: {e}")
@app.post("/reorder_pdf/")
async def reorder_pdf(
    file: UploadFile = File(...),
    page_order: str = Form(...)
):
    """Reorders pages in a PDF document.

    Args:
        file: Uploaded file; must declare the ``application/pdf`` content type.
        page_order: Comma-separated 1-based page numbers giving the new order.
            It must list every page of the document exactly once.

    Returns:
        A streaming ``application/pdf`` response named ``reordered_<filename>``.

    Raises:
        HTTPException: 400 for a non-PDF upload, a malformed order, an order
            whose length differs from the page count, an out-of-range or
            duplicate page number, or an unreadable PDF; 500 for any other
            processing failure.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        pdf_writer = PyPDF2.PdfWriter()
        total_pages = len(pdf_reader.pages)

        try:
            ordered_pages = [int(x.strip()) - 1 for x in page_order.split(',')]  # Convert to 0-based index
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid page order format. Please provide comma-separated numbers.")

        if len(ordered_pages) != total_pages:
            raise HTTPException(status_code=400, detail="The number of pages in the order does not match the total number of pages in the PDF.")

        seen_indices = set()
        for index in ordered_pages:
            if not (0 <= index < total_pages):
                raise HTTPException(status_code=400, detail=f"Invalid page number in order: {index + 1}. Page numbers must be between 1 and {total_pages}.")
            if index in seen_indices:
                raise HTTPException(status_code=400, detail=f"Duplicate page number in order: {index + 1}.")
            seen_indices.add(index)

        for page_index in ordered_pages:
            pdf_writer.add_page(pdf_reader.pages[page_index])

        output_buffer = io.BytesIO()
        pdf_writer.write(output_buffer)
        output_buffer.seek(0)
        return StreamingResponse(
            output_buffer,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=reordered_{file.filename}"}
        )
    except HTTPException:
        # The validation errors above are deliberate 4xx responses; without
        # this clause the generic handler below would turn them into 500s.
        raise
    except PyPDF2.errors.PdfReadError:
        raise HTTPException(status_code=400, detail="Error reading PDF: The file may be corrupted.")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing PDF: {e}")
@app.post("/images_to_pdf/")
async def images_to_pdf(files: List[UploadFile] = File(...)):
    """Convert the uploaded images into one PDF, one page per upload.

    Each image is saved as a PDF by Pillow and the first page of that PDF is
    appended to the output, in upload order.

    Args:
        files: Uploaded image files.

    Returns:
        A streaming ``application/pdf`` response named ``images.pdf``.

    Raises:
        HTTPException: 400 if no files were uploaded or an upload is not an
            image Pillow can identify; 500 for any other conversion failure.
    """
    # Imported locally so this endpoint does not depend on a change to the
    # module's import block.
    from PIL import UnidentifiedImageError

    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    pdf_writer = PyPDF2.PdfWriter()
    for file in files:
        try:
            # The context manager releases the decoder once the PDF is written.
            with Image.open(io.BytesIO(await file.read())) as img:
                img_buffer = io.BytesIO()
                img.save(img_buffer, format="PDF")
            img_buffer.seek(0)
            pdf_reader = PyPDF2.PdfReader(img_buffer)
            pdf_writer.add_page(pdf_reader.pages[0])
        except UnidentifiedImageError:
            # A file that is not an image is a client error, not a server fault.
            raise HTTPException(status_code=400, detail=f"Invalid file type: {file.filename} is not a supported image")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error processing {file.filename}: {e}")

    output_buffer = io.BytesIO()
    pdf_writer.write(output_buffer)
    output_buffer.seek(0)
    return StreamingResponse(
        output_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=images.pdf"}
    )
def _rgb_pixels_to_png(image_data, pixels):
    """Encode decoded 8-bit DeviceRGB pixel data as PNG bytes, or return None."""
    if '/ColorSpace' not in image_data or image_data['/ColorSpace'] != '/DeviceRGB':
        return None
    if '/BitsPerComponent' not in image_data or image_data['/BitsPerComponent'] != 8:
        return None
    try:
        size = (int(image_data['/Width']), int(image_data['/Height']))
        img = Image.frombytes("RGB", size, pixels)
    except (KeyError, TypeError, ValueError):
        # Missing dimensions or a pixel buffer of the wrong length.
        return None
    png_buffer = io.BytesIO()
    img.save(png_buffer, format="PNG")
    return png_buffer.getvalue()


def _image_entry_payload(image_data):
    """Return ``(extension, data)`` for one image XObject stream."""
    filters = image_data['/Filter'] if '/Filter' in image_data else '/FlateDecode'
    data = image_data.get_data()
    # ``in`` works whether /Filter is a single name or an array of names.
    if '/DCTDecode' in filters:
        return ".jpg", data
    if '/JPXDecode' in filters:
        return ".jp2", data
    if '/FlateDecode' in filters:
        # The decoded stream is bare pixel data, not a PNG file; wrap it in a
        # real PNG when the layout is known, otherwise label it honestly.
        png_bytes = _rgb_pixels_to_png(image_data, data)
        if png_bytes is not None:
            return ".png", png_bytes
        return ".raw", data
    return ".img", data  # Default extension


@app.post("/extract_images/")
async def extract_images(file: UploadFile = File(...)):
    """Extract the image XObjects of every page into a zip archive.

    Entries are named ``page_<n>_image_<name><ext>``. Extraction is best
    effort: a page or image that cannot be processed is reported on stdout
    and skipped, so the archive may be empty.

    Args:
        file: Uploaded file; must declare the ``application/pdf`` content type.

    Returns:
        A streaming ``application/zip`` response with the extracted images.

    Raises:
        HTTPException: 400 if the upload is not a PDF or cannot be read.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        pages = list(pdf_reader.pages)
    except PyPDF2.errors.PdfReadError:
        raise HTTPException(status_code=400, detail="Error reading PDF: The file may be corrupted.")

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for page_num, page in enumerate(pages):
            try:
                if '/Resources' not in page:
                    continue
                resources = page['/Resources']
                if '/XObject' not in resources:
                    continue
                xobjects = list(resources['/XObject'].items())
            except Exception as e:
                print(f"Error extracting images from page {page_num + 1}: {e}")
                continue

            for name, obj in xobjects:
                # One failing image must not drop the rest of the page.
                try:
                    # items() yields unresolved references; get_object()
                    # returns the underlying stream (or the object itself
                    # when it is already direct).
                    image_data = obj.get_object()
                    if '/Subtype' not in image_data or image_data['/Subtype'] != '/Image':
                        continue
                    ext, payload = _image_entry_payload(image_data)
                    zf.writestr(f"page_{page_num + 1}_image_{name[1:]}{ext}", payload)
                except Exception as e:
                    print(f"Error extracting image {name} from page {page_num + 1}: {e}")

    zip_buffer.seek(0)
    base_name = (file.filename or "").replace('.pdf', '')
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": f"attachment; filename=extracted_images_{base_name}.zip"}
    )
@app.delete("/cleanup")
async def cleanup():
    """Delete every regular file directly inside ``UPLOAD_FOLDER``.

    Deletion is best effort: a file that cannot be removed is reported on
    stdout and skipped. Subdirectories are left untouched.

    Returns:
        A dict with a fixed confirmation message.
    """
    for entry_name in os.listdir(UPLOAD_FOLDER):
        target = os.path.join(UPLOAD_FOLDER, entry_name)
        try:
            if not os.path.isfile(target):
                continue
            os.remove(target)
        except Exception as exc:
            print(f"Error deleting file {entry_name}: {exc}")
    return {"message": "Temporary files cleaned up"}