pdfclean / pdfclean.py
hjbarraza's picture
Upload pdfclean.py with huggingface_hub
f5accf1 verified
#!/usr/bin/env python3
"""
pdfclean.py — Remove watermarks and footers from academic PDFs (any publisher)
Edits PDF content streams directly to surgically remove rotated watermarks and
bottom-of-page footer text without affecting body content.
Uses repetition-based detection: text in the bottom 12% of the page that repeats at
the same Y-position on at least 50% of pages is treated as a footer (works with any
publisher, no hardcoded patterns).
Usage:
    python pdfclean.py document.pdf                # saves <title-slug>.pdf, or document-clean.pdf if no title is found
    python pdfclean.py document.pdf -o output.pdf  # custom output path
    python pdfclean.py document.pdf --dry-run      # scan only, don't modify
"""
import argparse
import re
import sys
import os
import time
import shutil
from collections import defaultdict
try:
import fitz
except ImportError:
print("Missing dependency: pip install pymupdf")
sys.exit(1)
# ============================================================================
# TUI — minimal progress dashboard
# ============================================================================
# Default pipeline step names, in the order the dashboard lists them.
STEPS = ['Decrypt', 'Scan', 'Clean', 'Verify', 'Export']
# ANSI SGR escape sequences used to style dashboard output.
DIM = '\033[2m'
BOLD = '\033[1m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
CYAN = '\033[36m'
# Resets all styling applied by the sequences above.
RESET = '\033[0m'
class Dashboard:
    """Minimal TUI progress dashboard with step tracking and time estimates.

    Live updates are redrawn on stderr (the screen is cleared before each
    redraw); the final summary produced by ``finish`` is printed to stdout.
    """

    def __init__(self, filename, total_pages, steps=None):
        """Create an idle dashboard.

        Args:
            filename: Path of the PDF being processed; only its basename is shown.
            total_pages: Page count displayed in the header.
            steps: Optional list of step names; falls back to the module-level
                ``STEPS`` when omitted or empty.
        """
        self.filename = os.path.basename(filename)
        self.total_pages = total_pages
        self.steps = steps or STEPS
        self.current_step = 0
        self.step_label = ''
        self.start_time = time.time()
        self.step_start = time.time()
        self.step_times = {}  # step_idx → elapsed seconds
        self.sub_progress = 0.0  # 0.0–1.0 within current step
        self.detail = ''  # one-line detail text
        self.findings = []  # detection results to display
        self.warnings = []  # verification warnings
        self.cols = min(shutil.get_terminal_size().columns, 80)

    def set_step(self, idx, label=''):
        """Move to a new pipeline step."""
        # Close out the step being left so its duration shows up in the list.
        if self.current_step < len(self.steps):
            self.step_times[self.current_step] = time.time() - self.step_start
        self.current_step = idx
        self.step_label = label
        self.step_start = time.time()
        self.sub_progress = 0.0
        self.detail = ''
        self._render()

    def progress(self, current, total, detail=''):
        """Update sub-progress within current step.

        The fraction ``current / total`` is clamped to [0.0, 1.0] so the bar
        never overflows and the ETA term never goes negative; a non-positive
        ``total`` is treated as "complete".
        """
        fraction = current / total if total > 0 else 1.0
        self.sub_progress = min(max(fraction, 0.0), 1.0)
        self.detail = detail
        self._render()

    def add_finding(self, text):
        """Record a detection result line and redraw."""
        self.findings.append(text)
        self._render()

    def add_warning(self, text):
        """Record a verification warning and redraw."""
        self.warnings.append(text)
        self._render()

    def finish(self, output_path, md_path=None):
        """Render final summary."""
        self.step_times[self.current_step] = time.time() - self.step_start
        elapsed = time.time() - self.start_time
        sys.stderr.write('\033[2J\033[H')  # clear screen
        # Header
        w = self.cols
        print(f"{BOLD}{'─' * w}{RESET}")
        print(f"{BOLD} pdfclean{RESET} {DIM}{self.filename} · {self.total_pages} pages{RESET}")
        print(f"{BOLD}{'─' * w}{RESET}")
        # Steps with times
        for i, step in enumerate(self.steps):
            t = self.step_times.get(i)
            if t is not None and t > 0.001:
                mark = f'{GREEN}{RESET}'
                time_str = f'{DIM}{t:.1f}s{RESET}'
            elif t is not None:
                # Recorded but effectively instantaneous: shown as skipped.
                mark = f'{DIM}{RESET}'
                time_str = f'{DIM}skip{RESET}'
            else:
                mark = f'{DIM}{RESET}'
                time_str = ''
            print(f" {mark} {step:<12} {time_str}")
        print()
        # Findings
        if self.findings:
            for f in self.findings:
                print(f" {f}")
            print()
        # Warnings
        # Look the Clean step up by name instead of assuming it sits at index 2.
        clean_ran = (
            'Clean' in self.steps
            and self.step_times.get(self.steps.index('Clean'), 0) > 0.001
        )
        if self.warnings:
            for warn in self.warnings:
                print(f" {YELLOW}{warn}{RESET}")
            print()
        elif clean_ran:
            print(f" {GREEN}✓ all clean, all content intact{RESET}")
            print()
        # Output
        print(f" {BOLD}pdf{RESET} {output_path}")
        if md_path:
            print(f" {BOLD}md {RESET} {md_path}")
        # Timing
        print(f"\n{DIM} done in {elapsed:.1f}s{RESET}")
        print(f"{BOLD}{'─' * w}{RESET}")

    def _render(self):
        """Redraw the dashboard to stderr."""
        elapsed = time.time() - self.start_time
        step_elapsed = time.time() - self.step_start
        # Estimate remaining: average step time × remaining steps
        completed_times = [v for v in self.step_times.values() if v > 0.001]
        if completed_times and self.current_step < len(self.steps):
            avg = sum(completed_times) / len(completed_times)
            remaining_steps = len(self.steps) - self.current_step - 1
            # Extrapolate the current step from its progress so far; the 0.01
            # floor avoids dividing by zero at the very start of a step.
            step_remaining = step_elapsed * (1 - self.sub_progress) / max(self.sub_progress, 0.01)
            eta = step_remaining + (avg * remaining_steps)
        else:
            eta = 0
        sys.stderr.write('\033[2J\033[H')  # clear screen
        w = self.cols
        # Header
        print(f"{BOLD}{'─' * w}{RESET}", file=sys.stderr)
        print(f"{BOLD} pdfclean{RESET} {DIM}{self.filename} · {self.total_pages} pages{RESET}", file=sys.stderr)
        print(f"{BOLD}{'─' * w}{RESET}", file=sys.stderr)
        # Step indicators
        for i, step in enumerate(self.steps):
            if i < self.current_step:
                t = self.step_times.get(i, 0)
                if t > 0.001:
                    mark = f'{GREEN}{RESET}'
                    time_str = f'{DIM}{t:.1f}s{RESET}'
                else:
                    mark = f'{DIM}{RESET}'
                    time_str = f'{DIM}skip{RESET}'
            elif i == self.current_step:
                mark = f'{CYAN}{RESET}'
                time_str = f'{DIM}{step_elapsed:.1f}s{RESET}'
            else:
                mark = f'{DIM}·{RESET}'
                time_str = ''
            label = self.step_label if i == self.current_step and self.step_label else step
            print(f" {mark} {label:<12} {time_str}", file=sys.stderr)
        print(file=sys.stderr)
        # Progress bar for current step
        if self.sub_progress > 0:
            # Never let the bar width go negative on very narrow terminals.
            bar_w = max(w - 20, 0)
            filled = int(self.sub_progress * bar_w)
            pct = self.sub_progress * 100
            bar = f"{'█' * filled}{'░' * (bar_w - filled)}"
            print(f" {bar} {pct:5.1f}%", file=sys.stderr)
        else:
            print(f" {DIM}working...{RESET}", file=sys.stderr)
        # Detail line
        if self.detail:
            detail_max = w - 4
            d = self.detail[:detail_max]
            print(f" {DIM}{d}{RESET}", file=sys.stderr)
        print(file=sys.stderr)
        # Findings so far
        for f in self.findings:
            print(f" {f}", file=sys.stderr)
        # Timing footer
        eta_str = f'~{eta:.0f}s left' if eta > 1 else ''
        print(f"\n{DIM} elapsed {elapsed:.1f}s {eta_str}{RESET}", file=sys.stderr)
        print(f"{BOLD}{'─' * w}{RESET}", file=sys.stderr)
        sys.stderr.flush()
class NullDashboard(Dashboard):
    """Dashboard that keeps its bookkeeping but never draws anything.

    Meant for headless/web callers: step changes, progress, findings and
    warnings are still recorded on the instance, while both the live redraw
    and the final summary are no-ops.
    """

    def __init__(self, filename, total_pages, steps=None):
        Dashboard.__init__(self, filename, total_pages, steps)

    def finish(self, output_path, md_path=None):
        # No final summary in headless mode.
        return None

    def _render(self):
        # Suppress all live terminal output.
        return None
# ============================================================================
# PRINT-TO-PDF — flatten encrypted PDFs
# ============================================================================
def print_to_pdf(input_path, dash):
    """Re-print an encrypted/restricted PDF into an unencrypted text-based copy.

    All pages of ``input_path`` are inserted into a fresh document which is
    saved next to the input as ``<name>-printed<ext>``.

    Args:
        input_path: Path of the source PDF.
        dash: Dashboard-like object; its ``progress`` method is called once
            the copy is complete.

    Returns:
        Path of the newly written copy. The caller is responsible for
        deleting it.
    """
    base, ext = os.path.splitext(input_path)
    printed_path = f"{base}-printed{ext}"
    src = fitz.open(input_path)
    try:
        dst = fitz.open()
        try:
            total = len(src)
            dst.insert_pdf(src, from_page=0, to_page=total - 1)
            # insert_pdf copies every page in one call, so report completion
            # once instead of redrawing the dashboard for each page.
            if total:
                dash.progress(total, total, f'page {total}/{total}')
            dst.save(printed_path, garbage=4, deflate=True)
        finally:
            # Release both handles even when insert/save raises.
            dst.close()
    finally:
        src.close()
    return printed_path
# ============================================================================
# DETECTION — find watermarks by text properties, footers by repetition
# ============================================================================
def detect_watermarks(doc):
    """Collect the text of large rotated spans (watermarks) across all pages.

    A span counts as rotated when its bounding-box height exceeds three times
    its font size; it is reported as a watermark when it is additionally
    either set larger than 30pt or taller than 30% of the page.

    Returns:
        Dict with ``watermark_texts`` (set of stripped span texts) and
        ``watermark_pages`` (number of pages containing at least one hit).
    """
    seen_texts = set()
    hit_pages = 0
    for page_no in range(len(doc)):
        current = doc[page_no]
        page_height = current.rect.height
        page_hit = False
        text_blocks = [blk for blk in current.get_text('dict')['blocks'] if blk['type'] == 0]
        for blk in text_blocks:
            for row in blk['lines']:
                for span in row['spans']:
                    content = span['text'].strip()
                    box = span['bbox']
                    extent = box[3] - box[1]
                    # Upright text has a box height close to the font size
                    # (~1.2x); a much taller box indicates rotated text.
                    if extent <= span['size'] * 3:
                        continue
                    big_enough = span['size'] > 30 or extent > page_height * 0.3
                    if big_enough and content:
                        seen_texts.add(content)
                        page_hit = True
        if page_hit:
            hit_pages += 1
    return {
        'watermark_texts': seen_texts,
        'watermark_pages': hit_pages,
    }
def detect_footers(doc, bottom_pct=0.12, min_page_ratio=0.5):
    """Detect footer zones by repetition analysis — publisher-agnostic.

    Spans whose top edge lies in the bottom ``bottom_pct`` of the page are
    grouped into 5-unit Y buckets. A bucket becomes a footer zone when it is
    populated on at least ``max(2, total * min_page_ratio)`` pages.

    Args:
        doc: Document supporting ``len()`` and integer indexing into pages.
        bottom_pct: Fraction of the page height, measured from the bottom,
            that is searched for footers.
        min_page_ratio: Fraction of pages a bucket must appear on.

    Returns:
        Dict with ``footer_zones`` (bucket → samples/avg_size/page_count),
        ``footer_pages`` (number of pages with any footer zone text),
        ``footer_texts`` (sample texts) and ``bottom_pct``. Documents with
        fewer than three pages yield an empty result of the same shape.
    """
    total = len(doc)
    if total < 3:
        # Too few pages for repetition to mean anything.
        return {
            'footer_zones': {},
            'footer_pages': 0,
            'footer_texts': set(),
            'bottom_pct': bottom_pct,
        }
    y_bands = defaultdict(list)
    for i in range(total):
        page = doc[i]
        threshold_y = page.rect.height * (1 - bottom_pct)
        for b in page.get_text('dict')['blocks']:
            if b['type'] != 0:
                continue
            for line in b['lines']:
                for s in line['spans']:
                    text = s['text'].strip()
                    if len(text) < 3:
                        continue
                    bbox = s['bbox']
                    if bbox[1] < threshold_y:
                        continue
                    # Short numeric strings are page numbers, which change on
                    # every page and must not count as repeated footer text.
                    if len(text) < 5 and text.replace('-', '').replace('.', '').isdigit():
                        continue
                    y_bucket = round(bbox[1] / 5) * 5
                    y_bands[y_bucket].append({
                        'page': i, 'text': text,
                        'size': s['size'], 'bbox': bbox,
                    })
    min_pages = max(2, total * min_page_ratio)
    footer_zones = {}
    footer_texts = set()
    footer_pages = set()
    for y_bucket, spans in y_bands.items():
        pages_with_text = {s['page'] for s in spans}
        if len(pages_with_text) < min_pages:
            continue
        sample_texts = {s['text'] for s in spans[:5]}
        footer_zones[y_bucket] = {
            'samples': sample_texts,
            'avg_size': sum(s['size'] for s in spans) / len(spans),
            'page_count': len(pages_with_text),
        }
        footer_texts.update(sample_texts)
        footer_pages.update(pages_with_text)
    return {
        'footer_zones': footer_zones,
        'footer_pages': len(footer_pages),
        'footer_texts': footer_texts,
        'bottom_pct': bottom_pct,
    }
# ============================================================================
# REMOVAL — build regexes from detected patterns and clean content streams
# ============================================================================
def build_watermark_pattern(doc):
    """Build regex to match rotated watermark blocks in content streams.

    The content streams of the first five pages are searched for a ``cm``
    transformation matrix (with a negative third coefficient) immediately
    followed by a ``BT`` text object. The first such matrix whose first
    coefficient is below 0.99 or whose second is above 0.01 is taken as the
    watermark transform.

    Returns:
        A regex source string matching from ``q`` plus the matrix's first two
        coefficients through the next ``TJ ... ET ... Q``, or ``None`` when no
        candidate is found.
    """
    matrix_re = re.compile(
        r'([\d.]+)\s+([\d.]+)\s+(-[\d.]+)\s+([\d.]+)\s+'
        r'[\d.\-]+\s+[\d.\-]+\s+cm\s*\n?'
        r'BT\s*\n?'
        r'[\d.\-]+\s+[\d.\-]+\s+(?:Td|Tm)'
    )
    for i in range(min(5, len(doc))):
        page = doc[i]
        for cx in page.get_contents():
            stream = doc.xref_stream(cx)
            if not stream:
                continue
            # latin-1 maps every byte to a code point, so decoding is lossless.
            text = stream.decode('latin-1', errors='replace')
            for m in matrix_re.finditer(text):
                try:
                    a, b = float(m.group(1)), float(m.group(2))
                except ValueError:
                    # [\d.]+ also matches tokens such as "." or "1.2.3" that
                    # are not numbers; skip them instead of crashing.
                    continue
                if a < 0.99 or b > 0.01:
                    a_val = re.escape(m.group(1))
                    b_val = re.escape(m.group(2))
                    return (
                        r'q\s*\n?'
                        + a_val + r'\s+' + b_val
                        + r'[\s\S]*?TJ\s*\n?\s*ET\s*\n?\s*Q'
                    )
    return None
def clean_pdf(input_path, output_path, dry_run=False, no_markdown=False):
    """Main cleaning pipeline: detect, build patterns, remove, verify, export.

    Args:
        input_path: PDF to clean.
        output_path: Where the cleaned PDF is written.
        dry_run: Scan and report only; nothing is written.
        no_markdown: Skip the markdown export step.

    Returns:
        True when the run finished without verification warnings (including
        the "nothing to clean" and dry-run cases), otherwise False.
    """
    # input_path may later be swapped for a temporary decrypted copy; keep the
    # caller's path for reporting.
    source_path = input_path
    doc = fitz.open(input_path)
    total = len(doc)
    # Determine active steps
    steps = list(STEPS)
    if no_markdown:
        steps = [s for s in steps if s != 'Export']
    dash = Dashboard(input_path, total, steps)

    # Step 0: Decrypt
    was_printed = False
    step_idx = 0
    dash.set_step(step_idx, 'Decrypt')
    if doc.is_encrypted or doc.permissions & fitz.PDF_PERM_MODIFY == 0:
        dash.add_finding(f'{DIM}encrypted → printing unencrypted copy{RESET}')
        printed_path = print_to_pdf(input_path, dash)
        doc.close()
        doc = fitz.open(printed_path)
        input_path = printed_path
        total = len(doc)
        was_printed = True
    # Otherwise the step stays marked as done immediately (no encryption).

    # Step 1: Scan
    step_idx = steps.index('Scan')
    dash.set_step(step_idx, 'Scan')
    dash.progress(0.3, 1, 'detecting watermarks...')
    wm_findings = detect_watermarks(doc)
    dash.progress(0.7, 1, 'detecting footers...')
    ft_findings = detect_footers(doc)
    dash.progress(1, 1, 'done')
    findings = {**wm_findings, **ft_findings}

    # Report findings
    if findings['watermark_texts']:
        dash.add_finding(f'{CYAN}watermarks{RESET} {findings["watermark_pages"]}/{total} pages')
    if findings['footer_zones']:
        n_zones = len(findings['footer_zones'])
        dash.add_finding(f'{CYAN}footers{RESET} {n_zones} zone{"s" if n_zones > 1 else ""}, {findings["footer_pages"]}/{total} pages')

    nothing_found = not findings['watermark_texts'] and not findings['footer_zones']
    if nothing_found:
        dash.add_finding(f'{DIM}nothing to clean{RESET}')
    elif dry_run:
        dash.add_finding(f'{YELLOW}--dry-run: no changes made{RESET}')

    if nothing_found or dry_run:
        # Close before touching the file on disk: an open handle blocks
        # deletion on some platforms and would otherwise leak.
        doc.close()
        if nothing_found and not dry_run:
            # Match clean_pdf_headless: the reported output must exist even
            # when no edits were needed.
            if os.path.abspath(input_path) != os.path.abspath(output_path):
                shutil.copy2(input_path, output_path)
        if was_printed and os.path.exists(input_path):
            os.remove(input_path)
        # A dry run writes nothing, so report the file that was scanned.
        dash.finish(source_path if dry_run else output_path)
        return True

    # Step 2: Clean (clean_text_pdf closes `doc` and reopens input_path itself)
    step_idx = steps.index('Clean')
    success, wm_total, ft_total = clean_text_pdf(doc, findings, input_path, output_path, total, dash, step_idx)
    # Clean up temp printed file
    if was_printed and os.path.exists(input_path):
        os.remove(input_path)
    dash.add_finding(f'{GREEN}removed{RESET} {wm_total} watermarks, {ft_total} footers')

    # Final step: Export markdown (only after a clean verification)
    md_path = None
    if success and not no_markdown:
        step_idx = steps.index('Export')
        md_path = export_markdown(output_path, dash, step_idx)
    dash.finish(output_path, md_path)
    return success
def clean_pdf_headless(input_path, output_path, export_md=True):
    """Run the cleaning pipeline without any terminal output (web/API use).

    Returns:
        Tuple ``(success, warnings, md_path)``: the verification result, a
        list of human-readable messages, and the markdown path (``None`` when
        markdown was not exported or the export failed).
    """
    pdf = fitz.open(input_path)
    page_count = len(pdf)
    if export_md:
        pipeline = list(STEPS)
    else:
        pipeline = [name for name in STEPS if name != 'Export']
    dash = NullDashboard(input_path, page_count, pipeline)

    # Decrypt: restricted documents are re-printed into a temporary copy.
    reprinted = False
    restricted = pdf.is_encrypted or pdf.permissions & fitz.PDF_PERM_MODIFY == 0
    dash.set_step(0, 'Decrypt')
    if restricted:
        temp_copy = print_to_pdf(input_path, dash)
        pdf.close()
        pdf = fitz.open(temp_copy)
        input_path = temp_copy
        page_count = len(pdf)
        reprinted = True

    # Scan
    dash.set_step(pipeline.index('Scan'), 'Scan')
    findings = {}
    findings.update(detect_watermarks(pdf))
    findings.update(detect_footers(pdf))

    has_work = bool(findings['watermark_texts']) or bool(findings['footer_zones'])
    if not has_work:
        # Nothing detected: hand back an unmodified copy.
        pdf.close()
        shutil.copy2(input_path, output_path)
        if reprinted:
            os.remove(input_path)
        return True, ['Nothing to clean — PDF is already clean'], None

    # Clean + verify
    clean_idx = pipeline.index('Clean')
    success, wm_total, ft_total = clean_text_pdf(
        pdf, findings, input_path, output_path, page_count, dash, clean_idx
    )
    if reprinted and os.path.exists(input_path):
        os.remove(input_path)
    messages = [f'Removed {wm_total} watermarks, {ft_total} footers'] + list(dash.warnings)

    # Markdown is exported even when verification flagged leftovers: the PDF
    # was still cleaned, just not perfectly.
    md_path = None
    if export_md:
        export_idx = pipeline.index('Export')
        try:
            md_path = export_markdown(output_path, dash, export_idx)
        except Exception as e:
            messages.append(f'Markdown export failed: {e}')
    return success, messages, md_path
def clean_text_pdf(doc, findings, input_path, output_path, total, dash, step_idx):
    """Clean PDF: watermarks via content stream regex, footers via redaction.

    Args:
        doc: Open document for ``input_path``; used to derive the watermark
            pattern, then closed and reopened from disk.
        findings: Merged results of detect_watermarks/detect_footers.
        input_path: PDF to read.
        output_path: Where the cleaned PDF is saved.
        total: Number of pages to process.
        dash: Dashboard receiving progress and warnings.
        step_idx: Index of the Clean step in ``dash.steps``.

    Returns:
        Tuple ``(ok, wm_total, ft_total)``: whether verification found no
        leftovers and no pages emptied by cleaning, the number of watermark
        blocks removed, and the number of footer spans redacted.
    """
    dash.set_step(step_idx, 'Clean')
    wm_pattern = build_watermark_pattern(doc)
    wm_regex = re.compile(wm_pattern) if wm_pattern else None
    footer_y_buckets = set(findings.get('footer_zones', {}).keys())
    bottom_pct = findings.get('bottom_pct', 0.12)
    # Start from a fresh handle on the file.
    doc.close()
    doc = fitz.open(input_path)
    wm_total = 0
    ft_total = 0
    # Whether each page had meaningful text before cleaning, so verification
    # only flags pages that cleaning actually emptied.
    had_text = []
    try:
        for i in range(total):
            page = doc[i]
            had_text.append(len(page.get_text().strip()) >= 20)
            threshold_y = page.rect.height * (1 - bottom_pct)
            # Remove watermarks
            if wm_regex is not None:
                for cx in page.get_contents():
                    stream = doc.xref_stream(cx)
                    if not stream:
                        continue
                    # latin-1 round-trips every byte, so re-encoding after the
                    # substitution cannot corrupt the stream.
                    text = stream.decode('latin-1', errors='replace')
                    text, count = wm_regex.subn('', text)
                    if count > 0:
                        wm_total += count
                        doc.update_stream(cx, text.encode('latin-1'))
            # Remove footers
            if footer_y_buckets:
                marked = 0
                for b in page.get_text('dict')['blocks']:
                    if b['type'] != 0:
                        continue
                    for line in b['lines']:
                        for s in line['spans']:
                            text = s['text'].strip()
                            if len(text) < 3:
                                continue
                            bbox = s['bbox']
                            if bbox[1] < threshold_y:
                                continue
                            # Same 5-unit bucketing as detect_footers.
                            y_bucket = round(bbox[1] / 5) * 5
                            if y_bucket in footer_y_buckets:
                                page.add_redact_annot(fitz.Rect(bbox), fill=(1, 1, 1))
                                marked += 1
                if marked:
                    ft_total += marked
                    page.apply_redactions()
            dash.progress(i + 1, total, f'page {i+1}/{total}')
        # Save
        doc.save(output_path, garbage=4, deflate=True)
    finally:
        doc.close()

    # Verify
    verify_idx = dash.steps.index('Verify')
    dash.set_step(verify_idx, 'Verify')
    # Sorted so the sampled footer texts are the same on every run.
    footer_samples = sorted(findings['footer_texts'])[:3]
    remaining_wm = 0
    remaining_ft = 0
    empty_pages = 0
    doc = fitz.open(output_path)
    try:
        page_count = len(doc)
        for i in range(page_count):
            text = doc[i].get_text()
            if any(wt in text for wt in findings['watermark_texts']):
                remaining_wm += 1
            if any(ft in text for ft in footer_samples):
                remaining_ft += 1
            # The last three pages are exempt from the emptiness check.
            page_had_text = had_text[i] if i < len(had_text) else True
            if len(text.strip()) < 20 and i < total - 3 and page_had_text:
                empty_pages += 1
            dash.progress(i + 1, page_count, f'page {i+1}/{page_count}')
    finally:
        doc.close()
    if remaining_wm:
        dash.add_warning(f'{remaining_wm} pages still have watermarks')
    if remaining_ft:
        dash.add_warning(f'{remaining_ft} pages still have footers')
    if empty_pages:
        dash.add_warning(f'{empty_pages} pages appear empty')
    return remaining_wm == 0 and remaining_ft == 0 and empty_pages == 0, wm_total, ft_total
# ============================================================================
# TITLE EXTRACTION
# ============================================================================
def extract_title(pdf_path):
    """Extract document title from PDF metadata or largest text on first page.

    The metadata title is used when it is longer than five characters, is not
    the placeholder "unnamed document", and produces a usable slug. Otherwise
    the largest-font span on the first page (ignoring rotated watermark
    spans) is used.

    Returns:
        A filename slug, or ``None`` when no usable title is found.
    """
    doc = fitz.open(pdf_path)
    try:
        # The metadata dict, or its title entry, may be missing/None.
        meta_title = ((doc.metadata or {}).get('title') or '').strip()
        if meta_title and len(meta_title) > 5 and meta_title.lower() != 'unnamed document':
            slug = slugify(meta_title)
            if slug:
                return slug
            # Title held no slug-safe characters; fall back to page text.
        if len(doc) == 0:
            return None
        page = doc[0]
        page_h = page.rect.height
        largest_size = 0
        largest_text = ''
        for b in page.get_text('dict')['blocks']:
            if b['type'] != 0:
                continue
            for line in b['lines']:
                for s in line['spans']:
                    text = s['text'].strip()
                    if len(text) <= 5:
                        continue
                    # Skip rotated watermark spans (same heuristic as detect_watermarks)
                    bbox = s['bbox']
                    height_span = bbox[3] - bbox[1]
                    rotated = height_span > s['size'] * 3
                    if rotated and (s['size'] > 30 or height_span > page_h * 0.3):
                        continue
                    if s['size'] > largest_size:
                        largest_size = s['size']
                        largest_text = text
    finally:
        doc.close()
    return slugify(largest_text) if largest_text else None
def slugify(text):
    """Convert text to a filesystem-safe filename slug.

    Characters other than word characters, whitespace and hyphens are
    dropped; runs of whitespace, underscores and hyphens collapse to a single
    hyphen; the result is lower-cased and limited to 80 characters.

    Returns:
        The slug, or ``None`` when nothing usable remains.
    """
    slug = re.sub(r'[^\w\s-]', '', text)
    slug = re.sub(r'[\s_-]+', '-', slug).strip('-').lower()
    # Truncation can cut right after a separator; strip it again.
    slug = slug[:80].strip('-')
    return slug or None
# ============================================================================
# MARKDOWN EXPORT
# ============================================================================
def _tidy_markdown(md_content):
    """Strip conversion artifacts from markdown text and return the result."""
    # Remove <!-- image --> tags
    md_content = re.sub(r'<!-- image -->\n?', '', md_content)
    # Strip metadata before first heading
    first_heading = re.search(r'^#{1,6}\s+', md_content, re.MULTILINE)
    if first_heading:
        md_content = md_content[first_heading.start():]
    # Fix page-break splits: rejoin paragraphs broken mid-sentence. Heading
    # lines are excluded so a heading is never merged with the paragraph
    # after it; the lookahead keeps the next paragraph's first letter
    # available for a further join.
    md_content = re.sub(
        r'^(?!#{1,6}\s)(.*[a-zA-Z,;\-\u2013\u2014])\n\n(?=[a-z])',
        r'\1 ',
        md_content,
        flags=re.MULTILINE,
    )
    # Remove orphan page numbers; the trailing blank line is only looked at,
    # not consumed, so back-to-back page numbers are all removed.
    md_content = re.sub(r'\n\n\d{1,4}(?=\n\n)', '', md_content)
    return md_content


def export_markdown(pdf_path, dash, step_idx):
    """Convert cleaned PDF to markdown using docling.

    Args:
        pdf_path: Cleaned PDF to convert.
        dash: Dashboard receiving progress and warnings.
        step_idx: Index of the Export step on the dashboard.

    Returns:
        Path of the written ``.md`` file (same base name as ``pdf_path``), or
        ``None`` when docling is not installed.
    """
    try:
        from docling.document_converter import DocumentConverter, PdfFormatOption
        from docling.datamodel.pipeline_options import PdfPipelineOptions
        from docling.datamodel.base_models import InputFormat
    except ImportError:
        dash.add_warning('docling not installed, skipping markdown')
        return None
    dash.set_step(step_idx, 'Export')
    md_path = os.path.splitext(pdf_path)[0] + '.md'
    dash.progress(0.2, 1, 'loading docling...')
    pipeline_options = PdfPipelineOptions()
    pipeline_options.generate_page_images = False
    pipeline_options.generate_picture_images = False
    pipeline_options.do_ocr = False  # skip OCR — we have text-based PDFs
    pipeline_options.do_table_structure = False  # skip heavy table detection
    pipeline_options.do_picture_classification = False
    pipeline_options.do_picture_description = False
    pipeline_options.do_code_enrichment = False
    pipeline_options.do_formula_enrichment = False
    converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        }
    )
    dash.progress(0.4, 1, 'converting pdf → markdown...')
    result = converter.convert(pdf_path)
    md_content = result.document.export_to_markdown()
    dash.progress(0.8, 1, 'cleaning markdown...')
    md_content = _tidy_markdown(md_content)
    dash.progress(1, 1, f'{len(md_content):,} chars')
    with open(md_path, 'w', encoding='utf-8') as f:
        f.write(md_content)
    return md_path
# ============================================================================
# CLI
# ============================================================================
def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 when every input was cleaned successfully,
        1 otherwise.
    """
    parser = argparse.ArgumentParser(description='Remove watermarks and footers from academic PDFs')
    parser.add_argument('input', nargs='+', help='Input PDF file(s)')
    parser.add_argument('-o', '--output', help='Output file (only with single input)')
    parser.add_argument('--dry-run', action='store_true', help='Scan and report only, no modifications')
    parser.add_argument('--no-markdown', action='store_true', help='Skip markdown export')
    args = parser.parse_args(argv)
    if args.output and len(args.input) > 1:
        print("Error: -o/--output only works with a single input file")
        return 1
    all_success = True
    for input_file in args.input:
        if not os.path.exists(input_file):
            print(f"File not found: {input_file}")
            all_success = False
            continue
        try:
            if args.output:
                output = args.output
            else:
                base, ext = os.path.splitext(input_file)
                fallback = f"{base}-clean{ext}"
                title_slug = extract_title(input_file)
                if title_slug:
                    out_dir = os.path.dirname(input_file) or '.'
                    output = os.path.join(out_dir, f"{title_slug}.pdf")
                    # A file already named after its title would otherwise be
                    # both read from and saved to the same path.
                    if os.path.exists(output) and os.path.samefile(output, input_file):
                        output = fallback
                else:
                    output = fallback
            success = clean_pdf(input_file, output, args.dry_run, args.no_markdown)
        except Exception as exc:
            # Top-level boundary: report the failure and keep processing the
            # remaining inputs instead of aborting the whole batch.
            print(f"Error processing {input_file}: {exc}")
            success = False
        if not success:
            all_success = False
    return 0 if all_success else 1


if __name__ == '__main__':
    sys.exit(main())