| from fastapi import FastAPI, File, UploadFile, HTTPException, Form |
| from fastapi.responses import FileResponse, StreamingResponse |
| from typing import List, Optional |
| import PyPDF2 |
| import io |
| import os |
| import zipfile |
| from PIL import Image |
|
|
# Directory used for files the service writes to disk, and the path of the
# merged-PDF file inside it.
UPLOAD_FOLDER = "uploads"
MERGED_PDF_PATH = os.path.join(UPLOAD_FOLDER, "merged.pdf")

# Create the upload directory at import time; a no-op when it already exists.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Application object that the route decorators in this module register on.
app = FastAPI()
|
|
@app.post("/merge_pdfs/")
async def merge_pdfs(files: List[UploadFile] = File(...)):
    """Merge the uploaded PDFs, in the order received, into one PDF download.

    Args:
        files: Uploaded files; each must declare the ``application/pdf``
            content type.

    Returns:
        A streaming ``application/pdf`` response offered as ``merged.pdf``.

    Raises:
        HTTPException: 400 when no files are sent, when an upload is not
            declared as a PDF, or when a PDF cannot be parsed; 500 for any
            other failure while reading or appending an upload.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    pdf_merger = PyPDF2.PdfMerger()
    try:
        for file in files:
            if file.content_type != "application/pdf":
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid file type: {file.filename} is not a PDF",
                )

            try:
                pdf_merger.append(io.BytesIO(await file.read()))
            except PyPDF2.errors.PdfReadError as exc:
                raise HTTPException(
                    status_code=400,
                    detail=f"Error reading PDF: {file.filename} may be corrupted",
                ) from exc
            except Exception as exc:
                raise HTTPException(
                    status_code=500,
                    detail=f"Error processing {file.filename}: {exc}",
                ) from exc

        # Build the result in memory rather than in one fixed file on disk:
        # a shared path lets concurrent requests overwrite each other's
        # output and serve one client's document to another.
        output_buffer = io.BytesIO()
        pdf_merger.write(output_buffer)
    finally:
        # Release the merger's input streams on every exit path.
        pdf_merger.close()

    output_buffer.seek(0)
    return StreamingResponse(
        output_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=merged.pdf"},
    )
|
|
@app.post("/split_pdf/")
async def split_pdf(file: UploadFile = File(...), split_points: Optional[str] = Form(None)):
    """Split a PDF at the given page numbers and return the parts as a ZIP.

    Split points are 1-based page numbers; each one is the last page of a
    part. They are sorted before use, so the order in which they are sent
    does not matter. Pages after the final split point form one extra part.

    Args:
        file: The PDF to split; must declare ``application/pdf``.
        split_points: Comma-separated page numbers, e.g. ``"2,5"``.

    Returns:
        A streaming ``application/zip`` response with entries named
        ``split_<n>_<original name>.pdf``.

    Raises:
        HTTPException: 400 when the upload is not declared as a PDF, the
            PDF cannot be parsed, or the split points are missing,
            malformed, out of range or duplicated.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")

    if not split_points:
        raise HTTPException(status_code=400, detail="No split points provided.")

    try:
        split_pages = sorted(int(x) for x in split_points.split(',') if x.strip())
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail="Invalid split points format. Please provide comma-separated numbers.",
        ) from None
    if not split_pages:
        raise HTTPException(status_code=400, detail="Invalid split points provided.")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        total_pages = len(pdf_reader.pages)
    except PyPDF2.errors.PdfReadError as exc:
        # Same response the rotate/reorder endpoints give for an unreadable PDF.
        raise HTTPException(
            status_code=400,
            detail="Error reading PDF: The file may be corrupted.",
        ) from exc

    if any(page > total_pages or page <= 0 for page in split_pages):
        raise HTTPException(
            status_code=400,
            detail=f"Split points must be within the range of pages (1 to {total_pages}).",
        )

    # The list is already sorted, so the only ordering problem left is a
    # repeated value, which would produce an empty part.
    if len(set(split_pages)) != len(split_pages):
        raise HTTPException(status_code=400, detail="Split points must not contain duplicates.")

    # The client-supplied filename is untrusted: keep only its final path
    # component (so it cannot add directories to archive entry names) and
    # drop quotes (so it cannot break out of the quoted header value).
    original_name = os.path.basename(
        (file.filename or "document.pdf").replace("\\", "/")
    ).replace('"', "")
    stem, extension = os.path.splitext(original_name)
    base_name = stem if extension.lower() == ".pdf" else original_name

    # End page (exclusive, 0-based) of every part, including the remainder
    # after the last split point when there is one.
    part_ends = list(split_pages)
    if part_ends[-1] < total_pages:
        part_ends.append(total_pages)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        start_page = 0
        for file_index, end_page in enumerate(part_ends, start=1):
            pdf_writer = PyPDF2.PdfWriter()
            for page_num in range(start_page, end_page):
                pdf_writer.add_page(pdf_reader.pages[page_num])

            part_buffer = io.BytesIO()
            pdf_writer.write(part_buffer)
            zf.writestr(f"split_{file_index}_{base_name}.pdf", part_buffer.getvalue())
            start_page = end_page

    zip_buffer.seek(0)
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="split_pdfs_{base_name}.zip"'},
    )
|
|
@app.post("/rotate_pdf/")
async def rotate_pdf(
    file: UploadFile = File(...),
    rotation: int = Form(90),
    page_numbers: Optional[str] = Form(None)
):
    """Rotate pages of a PDF and return the modified document.

    Args:
        file: The PDF to process; must declare ``application/pdf``.
        rotation: Angle passed to each selected page's ``rotate``; must be
            90, 180 or 270.
        page_numbers: Comma-separated 1-based page numbers to rotate. When
            omitted or empty, every page is rotated.

    Returns:
        A streaming ``application/pdf`` response named
        ``rotated_<original name>``.

    Raises:
        HTTPException: 400 for a non-PDF upload, an unsupported rotation,
            malformed or out-of-range page numbers, or an unreadable PDF;
            500 for any other processing failure.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")

    if rotation not in (90, 180, 270):
        raise HTTPException(status_code=400, detail="Rotation must be 90, 180, or 270 degrees.")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        total_pages = len(pdf_reader.pages)

        if page_numbers:
            pages_to_rotate = set()
            for page_num_str in page_numbers.split(','):
                try:
                    page_num = int(page_num_str.strip())
                except ValueError:
                    raise HTTPException(
                        status_code=400,
                        detail="Invalid page numbers format. Please provide comma-separated numbers.",
                    ) from None
                if not 1 <= page_num <= total_pages:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Invalid page number: {page_num}. Page numbers must be between 1 and {total_pages}.",
                    )
                pages_to_rotate.add(page_num - 1)
        else:
            pages_to_rotate = set(range(total_pages))

        pdf_writer = PyPDF2.PdfWriter()
        for i, page in enumerate(pdf_reader.pages):
            if i in pages_to_rotate:
                page.rotate(rotation)
            pdf_writer.add_page(page)

        output_buffer = io.BytesIO()
        pdf_writer.write(output_buffer)
        output_buffer.seek(0)
    except HTTPException:
        # Validation errors raised above already carry the right status;
        # without this clause the generic handler below would turn them
        # into 500 responses.
        raise
    except PyPDF2.errors.PdfReadError as exc:
        raise HTTPException(
            status_code=400,
            detail="Error reading PDF: The file may be corrupted.",
        ) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error processing PDF: {exc}") from exc

    # The client-supplied filename is untrusted: keep only its final path
    # component and drop quotes so it cannot break the quoted header value.
    safe_name = os.path.basename(
        (file.filename or "document.pdf").replace("\\", "/")
    ).replace('"', "")

    return StreamingResponse(
        output_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="rotated_{safe_name}"'},
    )
|
|
@app.post("/reorder_pdf/")
async def reorder_pdf(
    file: UploadFile = File(...),
    page_order: str = Form(...)
):
    """Reorder the pages of a PDF and return the rearranged document.

    Args:
        file: The PDF to process; must declare ``application/pdf``.
        page_order: Comma-separated 1-based page numbers giving the new
            order. It must list every page of the document exactly once.

    Returns:
        A streaming ``application/pdf`` response named
        ``reordered_<original name>``.

    Raises:
        HTTPException: 400 for a non-PDF upload, a malformed order, a wrong
            number of entries, an out-of-range or duplicate page number, or
            an unreadable PDF; 500 for any other processing failure.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        total_pages = len(pdf_reader.pages)

        try:
            ordered_pages = [int(x.strip()) - 1 for x in page_order.split(',')]
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Invalid page order format. Please provide comma-separated numbers.",
            ) from None

        if len(ordered_pages) != total_pages:
            raise HTTPException(
                status_code=400,
                detail="The number of pages in the order does not match the total number of pages in the PDF.",
            )

        seen_indices = set()
        for index in ordered_pages:
            if not (0 <= index < total_pages):
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid page number in order: {index + 1}. Page numbers must be between 1 and {total_pages}.",
                )
            if index in seen_indices:
                raise HTTPException(
                    status_code=400,
                    detail=f"Duplicate page number in order: {index + 1}.",
                )
            seen_indices.add(index)

        pdf_writer = PyPDF2.PdfWriter()
        for page_index in ordered_pages:
            pdf_writer.add_page(pdf_reader.pages[page_index])

        output_buffer = io.BytesIO()
        pdf_writer.write(output_buffer)
        output_buffer.seek(0)
    except HTTPException:
        # Validation errors raised above already carry the right status;
        # without this clause the generic handler below would turn them
        # into 500 responses.
        raise
    except PyPDF2.errors.PdfReadError as exc:
        raise HTTPException(
            status_code=400,
            detail="Error reading PDF: The file may be corrupted.",
        ) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Error processing PDF: {exc}") from exc

    # The client-supplied filename is untrusted: keep only its final path
    # component and drop quotes so it cannot break the quoted header value.
    safe_name = os.path.basename(
        (file.filename or "document.pdf").replace("\\", "/")
    ).replace('"', "")

    return StreamingResponse(
        output_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="reordered_{safe_name}"'},
    )
|
|
@app.post("/images_to_pdf/")
async def images_to_pdf(files: List[UploadFile] = File(...)):
    """Convert the uploaded images into one PDF, one page per image.

    Pages appear in the order the files were received. Each image is
    rendered to a single-page PDF by Pillow, and that page is appended to
    the output document.

    Args:
        files: Uploaded image files.

    Returns:
        A streaming ``application/pdf`` response named ``images.pdf``.

    Raises:
        HTTPException: 400 when no files are sent or an upload cannot be
            read as an image; 500 for any other failure while converting
            an upload.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    pdf_writer = PyPDF2.PdfWriter()

    for file in files:
        try:
            # The context manager releases the decoder's resources as soon
            # as the page has been rendered.
            with Image.open(io.BytesIO(await file.read())) as img:
                img_buffer = io.BytesIO()
                try:
                    img.save(img_buffer, format="PDF")
                except ValueError:
                    # Pillow's PDF writer raises ValueError for image modes
                    # it cannot encode; retry after converting to RGB.
                    img_buffer = io.BytesIO()
                    img.convert("RGB").save(img_buffer, format="PDF")

            img_buffer.seek(0)
            pdf_writer.add_page(PyPDF2.PdfReader(img_buffer).pages[0])
        except OSError as exc:
            # Pillow reports unidentifiable or truncated image data as
            # OSError; that is a problem with the upload, not the server.
            raise HTTPException(
                status_code=400,
                detail=f"Error reading image: {file.filename} is not a valid image",
            ) from exc
        except Exception as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Error processing {file.filename}: {exc}",
            ) from exc

    output_buffer = io.BytesIO()
    pdf_writer.write(output_buffer)
    output_buffer.seek(0)

    return StreamingResponse(
        output_buffer,
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=images.pdf"},
    )
|
|
def _pdf_entry(pdf_dict, key, default=None):
    """Return ``pdf_dict[key]`` or *default* when the key is absent.

    Subscripting is used instead of ``.get`` so that PyPDF2 resolves an
    indirect reference stored under the key.
    """
    return pdf_dict[key] if key in pdf_dict else default


def _image_payload(image_obj):
    """Return ``(extension, data)`` for one PDF image XObject.

    JPEG and JPEG 2000 streams are returned as stored. Flate-decoded 8-bit
    DeviceRGB samples are wrapped into a real PNG file; any other decoded
    sample data is returned unchanged with a ``.raw`` extension.
    """
    filters = _pdf_entry(image_obj, '/Filter', '/FlateDecode')
    data = image_obj.get_data()

    if '/DCTDecode' in filters:
        return ".jpg", data
    if '/JPXDecode' in filters:
        return ".jp2", data
    if '/FlateDecode' not in filters:
        return ".img", data

    # Flate streams decode to bare pixel samples, which are not a PNG file.
    # Encode them as PNG only when the layout is known: 8-bit DeviceRGB
    # whose byte count matches width * height * 3.
    width = int(_pdf_entry(image_obj, '/Width', 0))
    height = int(_pdf_entry(image_obj, '/Height', 0))
    is_rgb8 = (
        _pdf_entry(image_obj, '/ColorSpace') == '/DeviceRGB'
        and _pdf_entry(image_obj, '/BitsPerComponent', 8) == 8
    )
    if is_rgb8 and width > 0 and height > 0 and len(data) == width * height * 3:
        png_buffer = io.BytesIO()
        Image.frombytes("RGB", (width, height), data).save(png_buffer, format="PNG")
        return ".png", png_buffer.getvalue()
    return ".raw", data


@app.post("/extract_images/")
async def extract_images(file: UploadFile = File(...)):
    """Extract the image XObjects of every page of a PDF into a ZIP archive.

    Extraction is best-effort: a page or image that cannot be processed is
    reported on stdout and skipped, so the archive may be incomplete or
    empty.

    Args:
        file: The PDF to inspect; must declare ``application/pdf``.

    Returns:
        A streaming ``application/zip`` response whose entries are named
        ``page_<n>_image_<xobject name><ext>``.

    Raises:
        HTTPException: 400 when the upload is not declared as a PDF or the
            PDF cannot be parsed.
    """
    if file.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file is not a PDF")

    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(await file.read()))
        pages = list(pdf_reader.pages)
    except PyPDF2.errors.PdfReadError as exc:
        # Same response the rotate/reorder endpoints give for an unreadable PDF.
        raise HTTPException(
            status_code=400,
            detail="Error reading PDF: The file may be corrupted.",
        ) from exc

    # The client-supplied filename is untrusted: keep only its final path
    # component and drop quotes so it cannot break the quoted header value.
    original_name = os.path.basename(
        (file.filename or "document.pdf").replace("\\", "/")
    ).replace('"', "")
    stem, extension = os.path.splitext(original_name)
    base_name = stem if extension.lower() == ".pdf" else original_name

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for page_num, page in enumerate(pages, start=1):
            try:
                resources = _pdf_entry(page, '/Resources', {})
                xobjects = _pdf_entry(resources, '/XObject', {})
                entries = list(xobjects.items())
            except Exception as e:
                print(f"Error extracting images from page {page_num}: {e}")
                continue

            for name, reference in entries:
                try:
                    # items() yields the stored values without resolving
                    # them; get_object() dereferences an indirect reference
                    # and returns a direct object unchanged.
                    image_obj = reference.get_object()
                    if _pdf_entry(image_obj, '/Subtype') != '/Image':
                        continue
                    ext, payload = _image_payload(image_obj)
                    zf.writestr(f"page_{page_num}_image_{name[1:]}{ext}", payload)
                except Exception as e:
                    # One unreadable image must not discard the others.
                    print(f"Error extracting image {name} from page {page_num}: {e}")

    zip_buffer.seek(0)
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="extracted_images_{base_name}.zip"'},
    )
|
|
@app.delete("/cleanup")
async def cleanup():
    """Delete every regular file directly inside the upload folder.

    Entries that are not regular files are left untouched. A failure on one
    entry is printed and does not stop the remaining entries from being
    processed.

    Returns:
        A dict holding a fixed confirmation message.
    """
    for entry_name in os.listdir(UPLOAD_FOLDER):
        target = os.path.join(UPLOAD_FOLDER, entry_name)
        try:
            if not os.path.isfile(target):
                continue
            os.remove(target)
        except Exception as err:
            print(f"Error deleting file {entry_name}: {err}")

    return {"message": "Temporary files cleaned up"}