|
|
|
|
|
import os |
|
|
import zipfile |
|
|
import uuid |
|
|
import subprocess |
|
|
import difflib |
|
|
import io |
|
|
import pdfplumber |
|
|
import pandas as pd |
|
|
from pypdf import PdfWriter, PdfReader, Transformation |
|
|
from pdf2image import convert_from_path |
|
|
from pdf2docx import Converter |
|
|
from PIL import Image |
|
|
from pptx import Presentation |
|
|
from pptx.util import Inches |
|
|
|
|
|
from reportlab.lib.pagesizes import A4, letter |
|
|
from reportlab.lib import colors |
|
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer |
|
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
|
from reportlab.lib.enums import TA_JUSTIFY |
|
|
from reportlab.pdfgen import canvas |
|
|
from reportlab.lib.units import inch |
|
|
|
|
|
from config import TEMP_DIR |
|
|
|
|
|
class PDFEngine:
    """Toolbox of PDF operations: merge, split, rotate, compress, protect,
    convert (Word/PPTX/images), compare, and metadata editing.

    All outputs are written into TEMP_DIR with a random unique prefix so that
    concurrent operations never collide. External tools: Ghostscript (``gs``)
    for repair/compression; poppler via ``pdf2image`` for rasterizing.
    User-facing messages are intentionally in Spanish.
    """

    @staticmethod
    def _get_output_path(filename: str) -> str:
        """Return a collision-free path inside TEMP_DIR for *filename*.

        BUG FIX: the original ignored ``filename`` entirely and hard-coded a
        ``(unknown)`` placeholder, so every output lost its name and extension.
        """
        unique_name = f"{uuid.uuid4().hex[:8]}_{filename}"
        return os.path.join(TEMP_DIR, unique_name)

    def get_pdf_info(self, file_path: str) -> dict:
        """Return basic info for a PDF: page count, file name and title.

        Never raises: unreadable/corrupt files yield a harmless placeholder.
        """
        try:
            reader = PdfReader(file_path)
            meta = reader.metadata
            title = meta.title if meta and meta.title else "Sin título"
            return {"pages": len(reader.pages), "name": os.path.basename(file_path), "title": title}
        except Exception:
            # Narrowed from a bare except: still swallow read errors by design.
            return {"pages": 0, "name": "Error", "title": ""}

    def _parse_range_groups(self, range_str: str, max_pages: int) -> list:
        """Parse a range expression like ``"1-3, 5"`` into page-index groups.

        Returns a list of ``{"label": <token>, "indices": [zero-based pages]}``.
        Out-of-range bounds are clamped to ``1..max_pages``; empty or invalid
        tokens are silently skipped.
        """
        groups = []
        for part in range_str.split(','):
            part = part.strip()
            if not part:
                continue
            current_group = []
            if '-' in part:
                try:
                    start, end = map(int, part.split('-'))
                except ValueError:
                    continue
                start = max(1, start)
                end = min(max_pages, end)
                if start <= end:
                    current_group = list(range(start - 1, end))
            else:
                try:
                    p = int(part)
                except ValueError:
                    continue
                if 1 <= p <= max_pages:
                    current_group = [p - 1]
            if current_group:
                groups.append({"label": part, "indices": current_group})
        return groups

    def generate_preview(self, f, p):
        """Render page *p* of PDF *f* as a ~400px-high JPEG; path or None."""
        try:
            imgs = convert_from_path(f, first_page=p, last_page=p, size=(None, 400))
            if imgs:
                out = self._get_output_path(f"preview_pg{p}.jpg")
                imgs[0].save(out, "JPEG")
                return out
        except Exception:
            # Narrowed from a bare except: preview failure is non-fatal.
            return None

    def get_rotated_preview(self, f, a):
        """Preview of page 1 rotated by *a* degrees clockwise; None on failure."""
        if not f:
            return None
        try:
            imgs = convert_from_path(f, first_page=1, last_page=1, size=(None, 500))
            if not imgs:
                return None
            img = imgs[0]
            if a != 0:
                # PIL rotates counter-clockwise; negate for clockwise UI semantics.
                img = img.rotate(-a, expand=True)
            out = self._get_output_path(f"rot_prev_{a}.jpg")
            img.save(out, "JPEG")
            return out
        except Exception:
            return None

    def get_preview_indices_from_string(self, range_str: str, max_pages: int) -> list:
        """Return the sorted unique boundary pages (1-based) named in *range_str*.

        For ``"a-b"`` tokens only the two endpoints (clamped) are kept, so the
        caller can preview just the edges of each range.
        """
        key_pages = []
        for part in range_str.split(','):
            part = part.strip()
            if '-' in part:
                try:
                    s, e = map(int, part.split('-'))
                except ValueError:
                    continue
                key_pages.extend([max(1, min(s, max_pages)), max(1, min(e, max_pages))])
            else:
                try:
                    p = int(part)
                except ValueError:
                    continue
                if 1 <= p <= max_pages:
                    key_pages.append(p)
        return sorted(set(key_pages))

    def merge_pdfs(self, file_paths: list, order_indices: list = None, use_numbering: bool = False) -> str:
        """Concatenate PDFs, optionally reordered and stamped with page numbers.

        :param file_paths: paths of the PDFs to merge (required, non-empty).
        :param order_indices: optional permutation of ``range(len(file_paths))``;
            ignored (with fallback to the given order) when malformed.
        :param use_numbering: stamp "Página X de N" footers on the result.
        :returns: path of the merged PDF.
        :raises ValueError: when no files are given.
        """
        if not file_paths:
            raise ValueError("No hay archivos.")

        ordered = file_paths
        if order_indices and len(order_indices) == len(file_paths):
            try:
                ordered = [file_paths[int(i)] for i in order_indices]
            except Exception:
                # Bad index or non-numeric entry: keep the original order.
                ordered = file_paths

        m = PdfWriter()
        for p in ordered:
            m.append(p)

        temp_out = self._get_output_path("temp_unido.pdf")
        with open(temp_out, "wb") as f:
            m.write(f)

        if use_numbering:
            final_out = self._add_page_numbers(temp_out)
            try:
                os.remove(temp_out)
            except Exception:
                pass  # best-effort cleanup of the intermediate file
            return final_out

        return temp_out

    def _add_page_numbers(self, file_path: str) -> str:
        """Return a copy of *file_path* with a centered "Página X de N" footer."""
        reader = PdfReader(file_path)
        writer = PdfWriter()
        num_pages = len(reader.pages)

        for i, page in enumerate(reader.pages):
            # Build a one-page overlay PDF holding only the footer text.
            packet = io.BytesIO()
            can = canvas.Canvas(packet, pagesize=letter)
            page_width = float(page.mediabox.width)
            text = f"Página {i+1} de {num_pages}"
            can.setFont("Helvetica", 10)
            can.drawCentredString(page_width / 2.0, 20, text)
            can.save()

            packet.seek(0)
            overlay = PdfReader(packet)
            page.merge_page(overlay.pages[0])
            writer.add_page(page)

        out = self._get_output_path("unido_numerado.pdf")
        with open(out, "wb") as f:
            writer.write(f)
        return out

    def add_watermark(self, file_path: str, text: str) -> str:
        """Stamp a translucent diagonal *text* watermark on every page.

        :raises ValueError: when either argument is missing.
        """
        if not file_path or not text:
            raise ValueError("Falta archivo o texto.")

        reader = PdfReader(file_path)
        writer = PdfWriter()

        # Render the watermark once; the same overlay page is merged everywhere.
        packet = io.BytesIO()
        can = canvas.Canvas(packet, pagesize=letter)
        can.setFont("Helvetica-Bold", 50)
        can.setFillColorRGB(0.5, 0.5, 0.5, 0.3)  # grey at 30% alpha
        can.saveState()
        can.translate(300, 400)
        can.rotate(45)
        can.drawCentredString(0, 0, text)
        can.restoreState()
        can.save()

        packet.seek(0)
        watermark_pdf = PdfReader(packet)
        watermark_page = watermark_pdf.pages[0]

        for page in reader.pages:
            page.merge_page(watermark_page)
            writer.add_page(page)

        out = self._get_output_path("marca_agua.pdf")
        with open(out, "wb") as f:
            writer.write(f)
        return out

    def repair_pdf(self, file_path: str) -> str:
        """Rewrite a damaged PDF through Ghostscript's pdfwrite device.

        :raises ValueError: when no file is given.
        :raises RuntimeError: when Ghostscript fails or cannot be launched.
        """
        if not file_path:
            raise ValueError("Falta archivo.")

        out = self._get_output_path("reparado.pdf")
        cmd = [
            "gs",
            "-o", out,
            "-sDEVICE=pdfwrite",
            "-dPDFSETTINGS=/default",
            "-dInteract=N",
            "-dNOPAUSE", "-dQUIET", "-dBATCH",
            file_path,
        ]

        try:
            subprocess.run(cmd, check=True)
            return out
        except subprocess.CalledProcessError:
            raise RuntimeError("Ghostscript no pudo reparar el archivo (daño severo).")
        except Exception as e:
            # e.g. FileNotFoundError when gs is not installed.
            raise RuntimeError(f"Error sistema: {e}")

    def split_pdf_custom(self, file_path: str, range_str: str) -> str:
        """Split the PDF into one file per range group and return a ZIP of them.

        :raises ValueError: on missing file or an unusable range expression.
        """
        if not file_path:
            raise ValueError("Falta archivo.")
        r = PdfReader(file_path)
        g = self._parse_range_groups(range_str, len(r.pages))
        if not g:
            raise ValueError("Rango inválido.")

        gen = []
        # splitext instead of str.replace: safe for names containing ".pdf" mid-string.
        base = os.path.splitext(os.path.basename(file_path))[0]
        for group in g:
            w = PdfWriter()
            for i in group["indices"]:
                w.add_page(r.pages[i])
            safe = group["label"].replace(" ", "")
            p = self._get_output_path(f"{base}_part_{safe}.pdf")
            with open(p, "wb") as f:
                w.write(f)
            gen.append(p)

        zp = self._get_output_path(f"{base}_split.zip")
        with zipfile.ZipFile(zp, 'w') as z:
            for f in gen:
                z.write(f, arcname=os.path.basename(f))
        return zp

    def reorder_pages(self, file_path: str, order_str: str) -> str:
        """Rebuild the PDF with pages in the order described by *order_str*.

        Pages may repeat or be dropped, exactly as the expression dictates.
        """
        if not file_path:
            raise ValueError("Falta archivo.")
        r = PdfReader(file_path)
        g = self._parse_range_groups(order_str, len(r.pages))
        if not g:
            raise ValueError("Orden inválido.")

        w = PdfWriter()
        for group in g:
            for i in group["indices"]:
                w.add_page(r.pages[i])

        out = self._get_output_path("reordenado.pdf")
        with open(out, "wb") as f:
            w.write(f)
        return out

    def compress_pdf(self, file_path: str, power: int = 3) -> str:
        """Compress via Ghostscript. *power*: 1=/prepress, 3=/ebook, 4=/screen.

        Unknown levels fall back to /ebook.
        :raises RuntimeError: when Ghostscript fails or is missing.
        """
        if not file_path:
            raise ValueError("Falta archivo.")
        q = {1: "/prepress", 3: "/ebook", 4: "/screen"}
        gs_set = q.get(power, "/ebook")
        out = self._get_output_path("comprimido.pdf")
        cmd = ["gs", "-sDEVICE=pdfwrite", "-dCompatibilityLevel=1.4", f"-dPDFSETTINGS={gs_set}", "-dNOPAUSE", "-dQUIET", "-dBATCH", f"-sOutputFile={out}", file_path]
        try:
            subprocess.run(cmd, check=True)
            return out
        except Exception:
            # Narrowed from a bare except; covers CalledProcessError and missing gs.
            raise RuntimeError("Error comprimiendo (Ghostscript).")

    def protect_pdf(self, file_path: str, password: str) -> str:
        """Encrypt the PDF with *password* and return the protected copy."""
        if not file_path or not password:
            raise ValueError("Faltan datos.")
        try:
            r = PdfReader(file_path)
            w = PdfWriter()
            for p in r.pages:
                w.add_page(p)
            w.encrypt(password)
            out = self._get_output_path("protegido.pdf")
            with open(out, "wb") as f:
                w.write(f)
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}")

    def rotate_pdf(self, file_path: str, angle: int) -> str:
        """Rotate every page by *angle* degrees (multiples of 90) and save."""
        if not file_path:
            raise ValueError("Falta archivo.")
        try:
            r = PdfReader(file_path)
            w = PdfWriter()
            for p in r.pages:
                p.rotate(angle)
                w.add_page(p)
            out = self._get_output_path(f"rotado_{angle}.pdf")
            with open(out, "wb") as f:
                w.write(f)
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}")

    def update_metadata(self, f, t, a, s):
        """Rewrite *f* with Title=*t*, Author=*a*, Subject=*s* metadata."""
        if not f:
            raise ValueError("Falta archivo.")
        try:
            r = PdfReader(f)
            w = PdfWriter()
            for p in r.pages:
                w.add_page(p)
            w.add_metadata({"/Title": t, "/Author": a, "/Subject": s, "/Producer": "OpenPDF Tools"})
            out = self._get_output_path("meta.pdf")
            with open(out, "wb") as outf:
                w.write(outf)
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}")

    def extract_text(self, f):
        """Dump each page's extractable text to a UTF-8 ``.txt`` file."""
        if not f:
            raise ValueError("Falta archivo.")
        try:
            r = PdfReader(f)
            txts = []
            for i, p in enumerate(r.pages):
                t = p.extract_text()
                if t:
                    txts.append(f"--- Pág {i+1} ---\n{t}\n")
            # splitext instead of str.replace: safe for names containing ".pdf" mid-string.
            out = self._get_output_path(os.path.splitext(os.path.basename(f))[0] + ".txt")
            with open(out, "w", encoding="utf-8") as file:
                file.write("\n".join(txts))
            return out
        except Exception as e:
            raise RuntimeError(f"Error: {e}")

    def compare_pdfs_text(self, path_a: str, path_b: str) -> str:
        """Build a word-level diff report PDF between two documents.

        Deleted words are red/struck-through, added words green/bold. Returns
        the report's path; raises ValueError/RuntimeError on bad input.
        """
        if not path_a or not path_b:
            raise ValueError("Faltan archivos.")

        def get_all_words(path):
            # Flatten the whole document into a single word list.
            try:
                reader = PdfReader(path)
                text = ""
                for page in reader.pages:
                    extracted = page.extract_text()
                    if extracted:
                        text += extracted + " "
                return text.split()
            except Exception as e:
                raise RuntimeError(f"Error leyendo PDF: {e}")

        words_a = get_all_words(path_a)
        words_b = get_all_words(path_b)
        diff = difflib.ndiff(words_a, words_b)

        output_path = self._get_output_path("informe_diferencias_palabras.pdf")
        doc = SimpleDocTemplate(output_path, pagesize=A4)
        styles = getSampleStyleSheet()

        style_body = ParagraphStyle(
            'Body',
            parent=styles['BodyText'],
            alignment=TA_JUSTIFY,
            fontSize=11,
            leading=14
        )

        story = []
        story.append(Paragraph("Informe de Comparación (Modo Palabras)", styles['Heading1']))
        story.append(Spacer(1, 12))

        legend = '<b>Leyenda:</b> <font color="red"><strike>Eliminado</strike></font> | <font color="green"><b>Añadido</b></font> | Texto Común'
        story.append(Paragraph(legend, style_body))
        story.append(Spacer(1, 12))
        story.append(Paragraph(f"<b>A:</b> {os.path.basename(path_a)} | <b>B:</b> {os.path.basename(path_b)}", style_body))
        story.append(Spacer(1, 12))

        current_html = ""
        word_count = 0

        for token in diff:
            code = token[:2]
            word = token[2:]
            # BUG FIX: the original "escaping" replaced each character with
            # itself (a no-op); literal &, <, > in the PDF text would break
            # reportlab's Paragraph XML parser. Escape to real entities.
            safe_word = word.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')

            chunk = ""
            if code == '- ':
                chunk = f'<font color="red"><strike>{safe_word}</strike></font> '
            elif code == '+ ':
                chunk = f'<font color="green"><b>{safe_word}</b></font> '
            elif code == '  ':
                chunk = f'{safe_word} '
            # '? ' hint lines contribute no chunk but still advance word_count,
            # matching the original flush cadence.

            current_html += chunk
            word_count += 1

            # Flush into a paragraph only at a common word, so a markup span is
            # never split across paragraphs.
            if word_count > 300 and code == '  ':
                story.append(Paragraph(current_html, style_body))
                story.append(Spacer(1, 6))
                current_html = ""
                word_count = 0

        if current_html:
            story.append(Paragraph(current_html, style_body))

        doc.build(story)
        return output_path

    def pdf_to_pptx(self, f):
        """Convert each PDF page into a full-slide image in a new PPTX."""
        if not f:
            raise ValueError("Falta archivo.")
        try:
            imgs = convert_from_path(f, dpi=150)
            prs = Presentation()
            blank = 6  # index of the blank slide layout in the default template
            for i, img in enumerate(imgs):
                ip = self._get_output_path(f"slide_{i}.jpg")
                img.save(ip, "JPEG")
                slide = prs.slides.add_slide(prs.slide_layouts[blank])
                slide.shapes.add_picture(ip, Inches(0), Inches(0), width=prs.slide_width)
            out = self._get_output_path(os.path.splitext(os.path.basename(f))[0] + ".pptx")
            prs.save(out)
            return out
        except Exception as e:
            raise RuntimeError(f"Error PPTX: {e}")

    def pdf_to_word(self, f):
        """Convert the PDF to DOCX via pdf2docx and return the output path."""
        if not f:
            raise ValueError("Falta archivo.")
        try:
            # splitext instead of str.replace: safe for names containing ".pdf" mid-string.
            out = self._get_output_path(os.path.splitext(os.path.basename(f))[0] + ".docx")
            cv = Converter(f)
            cv.convert(out, start=0, end=None)
            cv.close()
            return out
        except Exception as e:
            raise RuntimeError(f"Error Word: {e}")

    def pdf_to_images_zip(self, f):
        """Rasterize every page to JPEG (150 dpi) and return them zipped."""
        if not f:
            raise ValueError("Falta archivo.")
        try:
            imgs = convert_from_path(f, dpi=150)
            paths = []
            base = os.path.splitext(os.path.basename(f))[0]
            for i, img in enumerate(imgs):
                p = self._get_output_path(f"{base}_{i+1}.jpg")
                img.save(p, "JPEG")
                paths.append(p)
            zp = self._get_output_path(f"{base}_imgs.zip")
            with zipfile.ZipFile(zp, 'w') as z:
                for p in paths:
                    z.write(p, arcname=os.path.basename(p))
            return zp
        except Exception as e:
            # Narrowed from a bare except; chain the cause for debuggability.
            raise RuntimeError("Error imgs") from e