# split-pdf / main.py
# anujakkulkarni's picture
# Upload main.py
# 6723bab verified
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from PyPDF2 import PdfReader, PdfWriter
from typing import List, Optional
import os
from datetime import datetime, timedelta
from uuid import uuid4
import uvicorn
try:
from azure.storage.blob import (
BlobServiceClient,
ContentSettings,
BlobSasPermissions,
generate_blob_sas,
)
AZURE_AVAILABLE = True
except ImportError:
AZURE_AVAILABLE = False
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# Local working directory for uploaded and split PDFs (created at import time).
OUTPUT_FOLDER = "output_pdfs"
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# Azure Blob Storage settings, read once from the environment.
# Surrounding whitespace is stripped from every value.
AZURE_STORAGE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "").strip()
AZURE_STORAGE_ACCOUNT_NAME = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "").strip()
AZURE_STORAGE_ACCOUNT_KEY = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "").strip()
AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "invoice-splits").strip()

# First segment of every blob path produced by the /split-pdf endpoint.
ROOT_FOLDER = os.environ.get("ROOT_FOLDER", "POD").strip()
def sanitize_name(value: str) -> str:
    """Turn a file name (or path) into a safe folder/blob-friendly stem.

    The directory part and the final extension are dropped, every character
    that is not alphanumeric, ``-`` or ``_`` becomes ``_``, and leading or
    trailing ``.``/``_`` characters are trimmed.

    Args:
        value: File name or path; ``None`` and ``""`` are accepted.

    Returns:
        The sanitised stem, or ``"uploaded"`` when nothing usable remains.
    """
    stem = os.path.splitext(os.path.basename(value or ""))[0]
    if not stem:
        stem = "uploaded"

    cleaned_chars = []
    for ch in stem:
        keep = ch.isalnum() or ch in "-_"
        cleaned_chars.append(ch if keep else "_")

    cleaned = "".join(cleaned_chars).strip("._")
    return cleaned if cleaned else "uploaded"
def get_blob_service_client():
    """Build a ``BlobServiceClient`` from the configured credentials.

    The connection string takes precedence; otherwise the account name and
    account key pair is used.

    Returns:
        A ``BlobServiceClient`` instance.

    Raises:
        HTTPException: 500 when ``azure-storage-blob`` could not be imported
            or when no usable credentials are configured.
    """
    if not AZURE_AVAILABLE:
        raise HTTPException(
            status_code=500,
            detail="azure-storage-blob is not installed",
        )

    if AZURE_STORAGE_CONNECTION_STRING:
        return BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)

    if not (AZURE_STORAGE_ACCOUNT_NAME and AZURE_STORAGE_ACCOUNT_KEY):
        raise HTTPException(
            status_code=500,
            detail="Azure storage credentials are not configured",
        )

    endpoint = f"https://{AZURE_STORAGE_ACCOUNT_NAME}.blob.core.windows.net"
    return BlobServiceClient(account_url=endpoint, credential=AZURE_STORAGE_ACCOUNT_KEY)
def ensure_container_exists(blob_service_client):
    """Create the container named by ``AZURE_CONTAINER_NAME`` if it is missing.

    Args:
        blob_service_client: Client used to obtain the container client.
    """
    container = blob_service_client.get_container_client(AZURE_CONTAINER_NAME)
    if container.exists():
        return
    container.create_container()
def build_blob_path(parts) -> str:
    """Join *parts* into a ``/``-separated blob path.

    Each part is converted with ``str()``, backslashes are turned into
    forward slashes, and leading/trailing slashes are removed. Parts that are
    empty after that are skipped, so the result never contains empty segments
    produced by a part's own leading or trailing separators.

    Args:
        parts: Iterable of path components (any objects; ``str()`` is applied).

    Returns:
        The joined path, or ``""`` when no non-empty part remains.
    """
    # Normalise separators *before* stripping so that a part such as "\\b\\"
    # is trimmed like "/b/" instead of leaving "//" in the joined path.
    normalized = (str(part).replace("\\", "/").strip("/") for part in parts)
    return "/".join(segment for segment in normalized if segment)
def create_blob_url(blob_name: str) -> str:
    """Return an HTTPS URL for *blob_name* in the configured container.

    When an account name and key are available, a read-only SAS token valid
    for seven days is appended; otherwise the plain (unsigned) URL is
    returned.

    The explicit ``AZURE_STORAGE_ACCOUNT_NAME`` / ``AZURE_STORAGE_ACCOUNT_KEY``
    pair is used when both are set. If the pair is incomplete and a connection
    string is configured, ``AccountName`` / ``AccountKey`` are taken from the
    connection string, so deployments configured with only a connection string
    no longer get a URL with an empty account name.

    Args:
        blob_name: Blob path inside the container (``/``-separated).

    Returns:
        The blob URL, with ``?<sas>`` appended when a SAS could be generated.
        The host suffix is always ``blob.core.windows.net``.
    """
    from datetime import timezone
    from urllib.parse import quote

    account_name = AZURE_STORAGE_ACCOUNT_NAME
    account_key = AZURE_STORAGE_ACCOUNT_KEY
    if not (account_name and account_key) and AZURE_STORAGE_CONNECTION_STRING:
        # Connection strings are "Key=Value;Key=Value"; values (e.g. base64
        # keys) may themselves contain "=", so split on the first one only.
        settings = dict(
            segment.split("=", 1)
            for segment in AZURE_STORAGE_CONNECTION_STRING.split(";")
            if "=" in segment
        )
        account_name = settings.get("AccountName", "").strip() or account_name
        account_key = settings.get("AccountKey", "").strip() or account_key

    # Percent-encode the blob name so spaces, "#", "?" etc. in uploaded file
    # names do not corrupt the URL; "/" stays as the virtual-folder separator.
    base_url = (
        f"https://{account_name}.blob.core.windows.net/"
        f"{AZURE_CONTAINER_NAME}/{quote(blob_name, safe='/')}"
    )
    if not (account_name and account_key):
        return base_url

    sas_token = generate_blob_sas(
        account_name=account_name,
        account_key=account_key,
        container_name=AZURE_CONTAINER_NAME,
        # The SAS is computed over the raw (unencoded) blob name.
        blob_name=blob_name,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.now(timezone.utc) + timedelta(days=7),
    )
    return f"{base_url}?{sas_token}"
def upload_pdf_to_blob(blob_service_client, local_path: str, blob_name: str) -> str:
    """Upload a local PDF to the configured container and return its URL.

    An existing blob with the same name is overwritten, and the content type
    is set to ``application/pdf``.

    Args:
        blob_service_client: Client used to obtain the blob client.
        local_path: Path of the PDF file on disk.
        blob_name: Destination blob path inside ``AZURE_CONTAINER_NAME``.

    Returns:
        The URL produced by ``create_blob_url`` for *blob_name*.
    """
    target = blob_service_client.get_blob_client(
        container=AZURE_CONTAINER_NAME, blob=blob_name
    )
    with open(local_path, "rb") as pdf_stream:
        pdf_settings = ContentSettings(content_type="application/pdf")
        target.upload_blob(pdf_stream, overwrite=True, content_settings=pdf_settings)
    return create_blob_url(blob_name)
def split_pdf(input_path, output_dir, pages_per_split=30):
    """Split a PDF into consecutive chunks and write each chunk to disk.

    For chunk *n* (1-based) the folders ``<output_dir>/Split_n/Raw`` and
    ``<output_dir>/Split_n/Invoices`` are created, and the chunk is written
    to ``Raw`` as ``Split_n_<first>_to_<last>.pdf`` (1-based page numbers).

    Args:
        input_path: Path of the source PDF.
        output_dir: Directory under which the ``Split_n`` folders are created.
        pages_per_split: Maximum number of pages per chunk.

    Returns:
        A list with one dict per chunk describing its id, index, page range,
        file name and local paths.
    """
    reader = PdfReader(input_path)
    page_count = len(reader.pages)
    blocks = []

    chunk_starts = range(0, page_count, pages_per_split)
    for split_index, first in enumerate(chunk_starts, start=1):
        last = min(first + pages_per_split, page_count)

        # Collect the pages of this chunk into a fresh writer.
        writer = PdfWriter()
        for page_num in range(first, last):
            writer.add_page(reader.pages[page_num])

        split_label = f"Split_{split_index}"
        split_folder = os.path.join(output_dir, split_label)
        raw_folder = os.path.join(split_folder, "Raw")
        invoices_folder = os.path.join(split_folder, "Invoices")
        for folder in (raw_folder, invoices_folder):
            os.makedirs(folder, exist_ok=True)

        filename = f"{split_label}_{first + 1}_to_{last}.pdf"
        output_path = os.path.join(raw_folder, filename)
        with open(output_path, "wb") as out_file:
            writer.write(out_file)

        blocks.append(
            {
                "split_id": split_label,
                "split_index": split_index,
                "start_page": first + 1,
                "end_page": last,
                "filename": filename,
                "local_path": output_path,
                "local_split_folder": split_folder,
                "local_raw_folder": raw_folder,
                "local_invoices_folder": invoices_folder,
            }
        )

    return blocks
def _safe_upload_filename(raw_name) -> str:
    """Reduce a client-supplied file name to a bare name without any path part."""
    # Clients may send Windows-style paths. On POSIX, os.path.basename does not
    # treat "\\" as a separator, and build_blob_path later converts "\\" to "/",
    # which would let the client inject extra blob path segments. Normalise
    # the separators first so only the final component survives.
    candidate = os.path.basename((raw_name or "").replace("\\", "/"))
    if candidate in ("", ".", ".."):
        # These would resolve to a directory rather than a file.
        return "uploaded.pdf"
    return candidate


@app.post("/split-pdf")
async def split_pdf_api(
    files: Optional[List[UploadFile]] = File(None),
    file: Optional[List[UploadFile]] = File(None),
):
    """Split every uploaded PDF into chunks and upload everything to blob storage.

    Uploads are accepted under either form field, ``files`` or ``file``. Each
    upload is saved locally, split with ``split_pdf``, and both the original
    and the chunks are uploaded using this hierarchy::

        ROOT_FOLDER / {batch_id} / FileName / Raw
        ROOT_FOLDER / {batch_id} / FileName / Splitted / Split_n / Raw
        ROOT_FOLDER / {batch_id} / FileName / Splitted / Split_n / Invoices

    ``Split_n`` in blob paths is numbered across the whole batch, not per file.

    Returns:
        A dict with ``blob_folder``, ``batch_id``, a per-file ``files`` list
        (original and split blob paths/URLs) and a ``next_service`` entry
        whose ``handoff`` list describes every uploaded split.

    Raises:
        HTTPException: 400 when no file was uploaded; 500 (from
            ``get_blob_service_client``) when storage is not usable.
    """
    from datetime import timezone

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    job_id = f"{timestamp}_{uuid4().hex[:8]}"

    uploaded_files = [*(files or []), *(file or [])]
    if not uploaded_files:
        raise HTTPException(status_code=400, detail="No files uploaded")

    blob_service_client = get_blob_service_client()
    ensure_container_exists(blob_service_client)

    response_files = []
    split_files_for_invoice_service = []
    used_file_stems = set()
    batch_split_counter = 1

    for upload in uploaded_files:
        safe_filename = _safe_upload_filename(upload.filename)

        # Two uploads with the same stem get "_2", "_3", ... appended so each
        # file has its own folder inside the batch.
        base_stem = sanitize_name(safe_filename)
        safe_file_stem = base_stem
        suffix = 1
        while safe_file_stem in used_file_stems:
            suffix += 1
            safe_file_stem = f"{base_stem}_{suffix}"
        used_file_stems.add(safe_file_stem)

        # Requested hierarchy:
        # ROOT_FOLDER / {batch_id} / FileName / Raw
        # ROOT_FOLDER / {batch_id} / FileName / Splitted / Split_n / Raw
        # ROOT_FOLDER / {batch_id} / FileName / Splitted / Split_n / Invoices
        batch_local_base = os.path.join(OUTPUT_FOLDER, job_id, safe_file_stem)
        raw_job_folder = os.path.join(batch_local_base, "Raw")
        split_job_folder = os.path.join(batch_local_base, "Splitted")
        os.makedirs(raw_job_folder, exist_ok=True)
        os.makedirs(split_job_folder, exist_ok=True)

        input_path = os.path.join(raw_job_folder, safe_filename)
        with open(input_path, "wb") as f:
            f.write(await upload.read())

        blocks = split_pdf(input_path, split_job_folder)

        raw_blob_name = build_blob_path([
            ROOT_FOLDER,
            job_id,
            safe_file_stem,
            "Raw",
            safe_filename,
        ])
        raw_blob_url = upload_pdf_to_blob(
            blob_service_client, input_path, raw_blob_name)

        split_files = []
        for block in blocks:
            # Blob-side split ids are unique across the whole batch.
            unique_split_id = f"Split_{batch_split_counter}"
            batch_split_counter += 1

            split_base = [
                ROOT_FOLDER,
                job_id,
                safe_file_stem,
                "Splitted",
                unique_split_id,
            ]
            split_blob_name = build_blob_path(
                [*split_base, "Raw", block["filename"]])
            split_invoices_blob_folder = build_blob_path(
                [*split_base, "Invoices"])

            split_blob_url = upload_pdf_to_blob(
                blob_service_client,
                block["local_path"],
                split_blob_name,
            )

            split_files_for_invoice_service.append({
                "batch_id": job_id,
                "file_name": safe_file_stem,
                "split_id": unique_split_id,
                "split_raw_blob_path": split_blob_name,
                "split_raw_url": split_blob_url,
                "target_invoices_blob_folder": split_invoices_blob_folder,
                "app3_form_data": {
                    "batch_id": job_id,
                    "target_invoices_blob_folder": split_invoices_blob_folder,
                    "split_id": unique_split_id,
                    "file_name": safe_file_stem,
                },
            })
            split_files.append({
                "split_id": unique_split_id,
                "pages": f"{block['start_page']}-{block['end_page']}",
                "Raw": {
                    "filename": block["filename"],
                    "blob_path": split_blob_name,
                    "url": split_blob_url,
                },
                "Invoices": [],
            })

        response_files.append({
            "file_name": safe_file_stem,
            "Raw": {
                "original_file": {
                    "filename": safe_filename,
                    "blob_path": raw_blob_name,
                    "url": raw_blob_url,
                }
            },
            "Splitted": split_files,
        })

    return {
        "blob_folder": ROOT_FOLDER,
        "batch_id": job_id,
        "files": response_files,
        "next_service": {
            "service": "splitpdffile (2).py",
            "handoff": split_files_for_invoice_service,
        },
    }
@app.get("/pdfs/{job_id}/{filename}")
def get_pdf(job_id: str, filename: str):
    """Serve the first file named *filename* found anywhere under a job folder.

    Args:
        job_id: Name of the job folder directly under ``OUTPUT_FOLDER``.
        filename: Exact file name to look for.

    Returns:
        A ``FileResponse`` for the first match found by ``os.walk``.

    Raises:
        HTTPException: 404 when the job id does not name a direct child of
            ``OUTPUT_FOLDER`` or when no matching file exists.
    """
    output_root = os.path.realpath(OUTPUT_FOLDER)
    job_root = os.path.realpath(os.path.join(output_root, job_id))
    # A job folder is always a direct child of OUTPUT_FOLDER. Values such as
    # "." or ".." would otherwise make os.walk scan (and serve files from)
    # directories outside the intended job folder.
    if os.path.dirname(job_root) != output_root:
        raise HTTPException(status_code=404, detail="File not found")

    for root, _, files in os.walk(job_root):
        if filename in files:
            return FileResponse(os.path.join(root, filename))
    raise HTTPException(status_code=404, detail="File not found")
@app.get("/raw-pdfs/{job_id}/{filename}")
def get_raw_pdf(job_id: str, filename: str):
    """Serve *filename* from a ``Raw`` folder inside a job folder.

    Only directories whose name equals ``raw`` (case-insensitive) are
    considered while walking the job folder.

    Args:
        job_id: Name of the job folder directly under ``OUTPUT_FOLDER``.
        filename: Exact file name to look for.

    Returns:
        A ``FileResponse`` for the first match found by ``os.walk``.

    Raises:
        HTTPException: 404 when the job id does not name a direct child of
            ``OUTPUT_FOLDER`` or when no matching file exists.
    """
    output_root = os.path.realpath(OUTPUT_FOLDER)
    job_root = os.path.realpath(os.path.join(output_root, job_id))
    # Reject ".", ".." and anything else that is not a direct child of
    # OUTPUT_FOLDER, so the walk below cannot leave the job folder.
    if os.path.dirname(job_root) != output_root:
        raise HTTPException(status_code=404, detail="File not found")

    for root, _, files in os.walk(job_root):
        if os.path.basename(root).lower() == "raw" and filename in files:
            return FileResponse(os.path.join(root, filename))
    raise HTTPException(status_code=404, detail="File not found")
if __name__ == "__main__":
    # Direct execution: serve the app on the loopback interface, port 8001,
    # with auto-reload disabled.
    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8001,
        reload=False,
    )