#!/usr/bin/env python3
"""
pdfclean.py — Remove watermarks and footers from academic PDFs (any publisher)

Edits PDF content streams directly to surgically remove rotated watermarks
and bottom-of-page footer text without affecting body content.

Uses repetition-based detection: text that repeats at the same Y-position
on at least 50% of pages = footer (works with any publisher, no hardcoded
patterns).

Usage:
    python pdfclean.py document.pdf                 # saves <title-slug>.pdf, or document-clean.pdf if no title is found
    python pdfclean.py document.pdf -o output.pdf   # custom output path
    python pdfclean.py document.pdf --dry-run       # scan only, don't modify
"""

import argparse
import re
import sys
import os
import time
import shutil
from collections import defaultdict


class _MissingModule:
    """Stand-in for a missing optional import; raises ImportError on first use."""

    def __init__(self, hint):
        self._hint = hint

    def __getattr__(self, name):
        # Private/dunder lookups come from introspection (copy, pickle, test
        # collectors); answer those with the conventional AttributeError.
        if name.startswith('_'):
            raise AttributeError(name)
        raise ImportError(self._hint)


try:
    import fitz
except ImportError:
    if __name__ == '__main__':
        # Script mode: keep the friendly message and the non-zero exit code.
        print("Missing dependency: pip install pymupdf")
        sys.exit(1)
    # Library mode (e.g. the headless pipeline imported by another program):
    # never terminate the host process at import time. The first real use of
    # `fitz` raises ImportError carrying the same install hint.
    fitz = _MissingModule("Missing dependency: pip install pymupdf")


# ============================================================================
# TUI — minimal progress dashboard
# ============================================================================

STEPS = ['Decrypt', 'Scan', 'Clean', 'Verify', 'Export']

DIM = '\033[2m'
BOLD = '\033[1m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
CYAN = '\033[36m'
RESET = '\033[0m'


class Dashboard:
    """Minimal TUI progress dashboard with step tracking and time estimates.

    Live updates are drawn on stderr (full-screen redraw on every change);
    the final summary produced by `finish` is printed on stdout.
    """

    def __init__(self, filename, total_pages, steps=None):
        self.filename = os.path.basename(filename)
        self.total_pages = total_pages
        # Copy so later edits by the caller cannot change the step order.
        self.steps = list(steps) if steps else list(STEPS)
        self.current_step = 0
        self.step_label = ''
        self.start_time = time.time()
        self.step_start = time.time()
        self.step_times = {}     # step_idx → elapsed seconds
        self.sub_progress = 0.0  # 0.0–1.0 within current step
        self.detail = ''         # one-line detail text
        self.findings = []       # detection results to display
        self.warnings = []       # verification warnings
        self.cols = min(shutil.get_terminal_size().columns, 80)

    def set_step(self, idx, label=''):
        """Move to a new pipeline step, recording the time of the one left."""
        if self.current_step < len(self.steps):
            self.step_times[self.current_step] = time.time() - self.step_start
        self.current_step = idx
        self.step_label = label
        self.step_start = time.time()
        self.sub_progress = 0.0
        self.detail = ''
        self._render()

    def progress(self, current, total, detail=''):
        """Update sub-progress within current step.

        The ratio is clamped to 0.0–1.0 so an overshooting caller cannot
        draw a bar wider than the terminal; a non-positive `total` counts
        as complete.
        """
        ratio = current / total if total > 0 else 1.0
        self.sub_progress = min(max(ratio, 0.0), 1.0)
        self.detail = detail
        self._render()

    def add_finding(self, text):
        """Append a detection result line and redraw."""
        self.findings.append(text)
        self._render()

    def add_warning(self, text):
        """Append a verification warning and redraw."""
        self.warnings.append(text)
        self._render()

    def _print_header(self, stream):
        """Print the three-line title banner to *stream*."""
        rule = f"{BOLD}{'─' * self.cols}{RESET}"
        print(rule, file=stream)
        print(f"{BOLD} pdfclean{RESET} {DIM}{self.filename} · {self.total_pages} pages{RESET}", file=stream)
        print(rule, file=stream)

    def finish(self, output_path, md_path=None):
        """Render final summary."""
        self.step_times[self.current_step] = time.time() - self.step_start
        elapsed = time.time() - self.start_time

        sys.stderr.write('\033[2J\033[H')  # clear screen

        # Header
        w = self.cols
        self._print_header(sys.stdout)

        # Steps with times
        for i, step in enumerate(self.steps):
            t = self.step_times.get(i)
            if t is not None and t > 0.001:
                mark = f'{GREEN}✓{RESET}'
                time_str = f'{DIM}{t:.1f}s{RESET}'
            elif t is not None:
                # Recorded but effectively instantaneous: shown as skipped.
                mark = f'{DIM}–{RESET}'
                time_str = f'{DIM}skip{RESET}'
            else:
                mark = f'{DIM}–{RESET}'
                time_str = ''
            print(f" {mark} {step:<12} {time_str}")

        print()

        # Findings
        if self.findings:
            for f in self.findings:
                print(f" {f}")
            print()

        # Warnings, or the all-clear when the clean step actually ran.
        # Look the step up by name: custom `steps` lists need not have
        # 'Clean' at index 2.
        clean_idx = self.steps.index('Clean') if 'Clean' in self.steps else None
        if self.warnings:
            for warn in self.warnings:
                print(f" {YELLOW}⚠ {warn}{RESET}")
            print()
        elif clean_idx is not None and self.step_times.get(clean_idx, 0) > 0.001:
            print(f" {GREEN}✓ all clean, all content intact{RESET}")
            print()

        # Output
        print(f" {BOLD}pdf{RESET} {output_path}")
        if md_path:
            print(f" {BOLD}md {RESET} {md_path}")

        # Timing
        print(f"\n{DIM} done in {elapsed:.1f}s{RESET}")
        print(f"{BOLD}{'─' * w}{RESET}")

    def _render(self):
        """Redraw the dashboard to stderr."""
        elapsed = time.time() - self.start_time
        step_elapsed = time.time() - self.step_start

        # Estimate remaining: time left in the current step (extrapolated
        # from its sub-progress) + average step time × remaining steps.
        completed_times = [v for v in self.step_times.values() if v > 0.001]
        if completed_times and self.current_step < len(self.steps):
            avg = sum(completed_times) / len(completed_times)
            remaining_steps = len(self.steps) - self.current_step - 1
            step_remaining = step_elapsed * (1 - self.sub_progress) / max(self.sub_progress, 0.01)
            eta = step_remaining + (avg * remaining_steps)
        else:
            eta = 0

        sys.stderr.write('\033[2J\033[H')  # clear screen

        w = self.cols

        # Header
        self._print_header(sys.stderr)

        # Step indicators
        for i, step in enumerate(self.steps):
            if i < self.current_step:
                t = self.step_times.get(i, 0)
                if t > 0.001:
                    mark = f'{GREEN}✓{RESET}'
                    time_str = f'{DIM}{t:.1f}s{RESET}'
                else:
                    mark = f'{DIM}–{RESET}'
                    time_str = f'{DIM}skip{RESET}'
            elif i == self.current_step:
                mark = f'{CYAN}›{RESET}'
                time_str = f'{DIM}{step_elapsed:.1f}s{RESET}'
            else:
                mark = f'{DIM}·{RESET}'
                time_str = ''

            label = self.step_label if i == self.current_step and self.step_label else step
            print(f" {mark} {label:<12} {time_str}", file=sys.stderr)

        print(file=sys.stderr)

        # Progress bar for current step
        if self.sub_progress > 0:
            bar_w = max(w - 20, 0)
            filled = int(self.sub_progress * bar_w)
            pct = self.sub_progress * 100
            bar = f"{'█' * filled}{'░' * (bar_w - filled)}"
            print(f" {bar} {pct:5.1f}%", file=sys.stderr)
        else:
            print(f" {DIM}working...{RESET}", file=sys.stderr)

        # Detail line
        if self.detail:
            detail_max = w - 4
            d = self.detail[:detail_max]
            print(f" {DIM}{d}{RESET}", file=sys.stderr)

        print(file=sys.stderr)

        # Findings so far
        for f in self.findings:
            print(f" {f}", file=sys.stderr)

        # Timing footer
        eta_str = f'~{eta:.0f}s left' if eta > 1 else ''
        print(f"\n{DIM} elapsed {elapsed:.1f}s {eta_str}{RESET}", file=sys.stderr)
        print(f"{BOLD}{'─' * w}{RESET}", file=sys.stderr)

        sys.stderr.flush()


class NullDashboard(Dashboard):
    """Silent dashboard for headless/web usage.

    Keeps all bookkeeping (step times, findings, warnings) but draws nothing.
    """

    def _render(self):
        pass

    def finish(self, output_path, md_path=None):
        pass


# ============================================================================
# PRINT-TO-PDF — flatten encrypted PDFs
# ============================================================================

def print_to_pdf(input_path, dash):
    """Re-print an encrypted/restricted PDF into an unencrypted text-based copy.

    The copy is written next to the input as ``<name>-printed<ext>``.

    Args:
        input_path: Path of the PDF to copy.
        dash: Dashboard receiving per-page progress updates.

    Returns:
        Path of the written copy. The caller is responsible for deleting it.
    """
    base, ext = os.path.splitext(input_path)
    printed_path = f"{base}-printed{ext}"

    src = fitz.open(input_path)
    try:
        dst = fitz.open()
        try:
            total = len(src)
            dst.insert_pdf(src, from_page=0, to_page=total - 1)
            # NOTE: the copy above is a single call; this loop only animates
            # the dashboard, it does not reflect real per-page work.
            for i in range(total):
                dash.progress(i + 1, total, f'page {i+1}/{total}')
            dst.save(printed_path, garbage=4, deflate=True)
        finally:
            dst.close()
    finally:
        # Close both handles even when the copy or the save fails.
        src.close()
    return printed_path


# ============================================================================
# DETECTION — find watermarks by text properties, footers by repetition
# ============================================================================

def _iter_text_spans(page):
    """Yield every text span dict on *page*, skipping non-text blocks."""
    for block in page.get_text('dict')['blocks']:
        if block['type'] != 0:
            continue
        for line in block['lines']:
            yield from line['spans']


def detect_watermarks(doc):
    """Scan all pages for large rotated text (watermarks).

    Args:
        doc: Iterable of pages exposing ``rect.height`` and
            ``get_text('dict')`` (a PyMuPDF document).

    Returns:
        Dict with ``watermark_texts`` (set of stripped span texts) and
        ``watermark_pages`` (number of pages carrying at least one).
    """
    findings = {
        'watermark_texts': set(),
        'watermark_pages': 0,
    }

    for page in doc:
        page_h = page.rect.height
        has_wm = False

        for s in _iter_text_spans(page):
            text = s['text'].strip()
            bbox = s['bbox']
            height_span = bbox[3] - bbox[1]
            # Rotated watermarks have height_span far exceeding font size
            # (e.g. ratio > 3), normal titles are proportional (~1.2)
            rotated = height_span > s['size'] * 3
            if rotated and (s['size'] > 30 or height_span > page_h * 0.3) and text:
                findings['watermark_texts'].add(text)
                has_wm = True

        if has_wm:
            findings['watermark_pages'] += 1

    return findings


def detect_footers(doc, bottom_pct=0.12, min_page_ratio=0.5):
    """Detect footer zones by repetition analysis — publisher-agnostic.

    Spans whose top edge lies in the bottom ``bottom_pct`` of the page are
    grouped into 5-unit Y buckets; a bucket populated on enough pages is
    reported as a footer zone.

    Args:
        doc: Sized iterable of pages exposing ``rect.height`` and
            ``get_text('dict')`` (a PyMuPDF document).
        bottom_pct: Fraction of the page height, measured from the bottom,
            that is searched for footers.
        min_page_ratio: Fraction of pages a bucket must appear on (never
            fewer than 2 pages).

    Returns:
        Dict with ``footer_zones`` (y_bucket → samples/avg_size/page_count),
        ``footer_pages`` (count of pages touching any zone),
        ``footer_texts`` (set of sample texts) and ``bottom_pct``.
        Documents with fewer than 3 pages yield empty results, since
        repetition is meaningless there.
    """
    total = len(doc)
    if total < 3:
        return {
            'footer_zones': {},
            'footer_pages': 0,
            'footer_texts': set(),
            'bottom_pct': bottom_pct,  # same shape as the full result
        }

    y_bands = defaultdict(list)

    for i, page in enumerate(doc):
        threshold_y = page.rect.height * (1 - bottom_pct)

        for s in _iter_text_spans(page):
            text = s['text'].strip()
            if not text or len(text) < 3:
                continue
            bbox = s['bbox']
            if bbox[1] < threshold_y:
                continue
            # Short numeric strings are page numbers, not publisher footers.
            if len(text) < 5 and text.replace('-', '').replace('.', '').isdigit():
                continue

            y_bucket = round(bbox[1] / 5) * 5
            y_bands[y_bucket].append({
                'page': i,
                'text': text,
                'size': s['size'],
                'bbox': bbox,
            })

    min_pages = max(2, total * min_page_ratio)
    footer_zones = {}
    footer_texts = set()
    footer_pages = set()

    for y_bucket, spans in y_bands.items():
        pages_with_text = {s['page'] for s in spans}
        if len(pages_with_text) < min_pages:
            continue
        # Only the first few spans are sampled; they are used later as
        # probes when verifying the cleaned file.
        sample_texts = {s['text'] for s in spans[:5]}
        footer_zones[y_bucket] = {
            'samples': sample_texts,
            'avg_size': sum(s['size'] for s in spans) / len(spans),
            'page_count': len(pages_with_text),
        }
        footer_texts.update(sample_texts)
        footer_pages.update(pages_with_text)

    return {
        'footer_zones': footer_zones,
        'footer_pages': len(footer_pages),
        'footer_texts': footer_texts,
        'bottom_pct': bottom_pct,
    }


# ============================================================================
# REMOVAL — build regexes from detected patterns and clean content streams
============================================================================ def build_watermark_pattern(doc): """Build regex to match rotated watermark blocks in content streams.""" for i in range(min(5, len(doc))): page = doc[i] for cx in page.get_contents(): stream = doc.xref_stream(cx) if not stream: continue text = stream.decode('latin-1', errors='replace') for m in re.finditer( r'([\d.]+)\s+([\d.]+)\s+(-[\d.]+)\s+([\d.]+)\s+' r'[\d.\-]+\s+[\d.\-]+\s+cm\s*\n?' r'BT\s*\n?' r'[\d.\-]+\s+[\d.\-]+\s+(?:Td|Tm)', text ): a, b = float(m.group(1)), float(m.group(2)) if a < 0.99 or b > 0.01: a_val = re.escape(m.group(1)) b_val = re.escape(m.group(2)) pat = ( r'q\s*\n?' + a_val + r'\s+' + b_val + r'[\s\S]*?TJ\s*\n?\s*ET\s*\n?\s*Q' ) return pat return None def clean_pdf(input_path, output_path, dry_run=False, no_markdown=False): """Main cleaning pipeline: detect, build patterns, remove, verify, export.""" doc = fitz.open(input_path) total = len(doc) # Determine active steps steps = list(STEPS) if no_markdown: steps = [s for s in steps if s != 'Export'] dash = Dashboard(input_path, total, steps) # Step 0: Decrypt was_printed = False step_idx = 0 if doc.is_encrypted or doc.permissions & fitz.PDF_PERM_MODIFY == 0: dash.set_step(step_idx, 'Decrypt') dash.add_finding(f'{DIM}encrypted → printing unencrypted copy{RESET}') printed_path = print_to_pdf(input_path, dash) doc.close() doc = fitz.open(printed_path) input_path = printed_path total = len(doc) was_printed = True else: dash.set_step(step_idx, 'Decrypt') # mark as done immediately (no encryption) # Step 1: Scan step_idx = steps.index('Scan') dash.set_step(step_idx, 'Scan') dash.progress(0.3, 1, 'detecting watermarks...') wm_findings = detect_watermarks(doc) dash.progress(0.7, 1, 'detecting footers...') ft_findings = detect_footers(doc) dash.progress(1, 1, 'done') findings = {**wm_findings, **ft_findings} # Report findings if findings['watermark_texts']: dash.add_finding(f'{CYAN}watermarks{RESET} 
{findings["watermark_pages"]}/{total} pages') if findings['footer_zones']: n_zones = len(findings['footer_zones']) dash.add_finding(f'{CYAN}footers{RESET} {n_zones} zone{"s" if n_zones > 1 else ""}, {findings["footer_pages"]}/{total} pages') if not findings['watermark_texts'] and not findings['footer_zones']: dash.add_finding(f'{DIM}nothing to clean{RESET}') if was_printed: os.remove(input_path) dash.finish(output_path) return True if dry_run: dash.add_finding(f'{YELLOW}--dry-run: no changes made{RESET}') if was_printed: os.remove(input_path) dash.finish(input_path) return True # Step 2: Clean step_idx = steps.index('Clean') success, wm_total, ft_total = clean_text_pdf(doc, findings, input_path, output_path, total, dash, step_idx) # Clean up temp printed file if was_printed and os.path.exists(input_path): os.remove(input_path) dash.add_finding(f'{GREEN}removed{RESET} {wm_total} watermarks, {ft_total} footers') # Step 4: Export markdown md_path = None if success and not no_markdown: step_idx = steps.index('Export') md_path = export_markdown(output_path, dash, step_idx) dash.finish(output_path, md_path) return success def clean_pdf_headless(input_path, output_path, export_md=True): """Headless cleaning pipeline for web/API usage. 
Returns (success, warnings, md_path).""" doc = fitz.open(input_path) total = len(doc) steps = list(STEPS) if export_md else [s for s in STEPS if s != 'Export'] dash = NullDashboard(input_path, total, steps) was_printed = False step_idx = 0 if doc.is_encrypted or doc.permissions & fitz.PDF_PERM_MODIFY == 0: dash.set_step(step_idx, 'Decrypt') printed_path = print_to_pdf(input_path, dash) doc.close() doc = fitz.open(printed_path) input_path = printed_path total = len(doc) was_printed = True else: dash.set_step(step_idx, 'Decrypt') step_idx = steps.index('Scan') dash.set_step(step_idx, 'Scan') wm_findings = detect_watermarks(doc) ft_findings = detect_footers(doc) findings = {**wm_findings, **ft_findings} if not findings['watermark_texts'] and not findings['footer_zones']: doc.close() shutil.copy2(input_path, output_path) if was_printed: os.remove(input_path) return True, ['Nothing to clean — PDF is already clean'], None step_idx = steps.index('Clean') success, wm_total, ft_total = clean_text_pdf(doc, findings, input_path, output_path, total, dash, step_idx) if was_printed and os.path.exists(input_path): os.remove(input_path) warnings = list(dash.warnings) warnings.insert(0, f'Removed {wm_total} watermarks, {ft_total} footers') # Export markdown even if verification found remaining issues — # the PDF was still cleaned, just not perfectly md_path = None if export_md: step_idx = steps.index('Export') try: md_path = export_markdown(output_path, dash, step_idx) except Exception as e: warnings.append(f'Markdown export failed: {e}') return success, warnings, md_path def clean_text_pdf(doc, findings, input_path, output_path, total, dash, step_idx): """Clean PDF: watermarks via content stream regex, footers via redaction.""" dash.set_step(step_idx, 'Clean') wm_pattern = build_watermark_pattern(doc) footer_y_buckets = set(findings.get('footer_zones', {}).keys()) doc.close() doc = fitz.open(input_path) wm_total = 0 ft_total = 0 for i in range(total): page = doc[i] page_h = 
page.rect.height threshold_y = page_h * (1 - findings.get('bottom_pct', 0.12)) # Remove watermarks if wm_pattern: for cx in page.get_contents(): stream = doc.xref_stream(cx) if not stream: continue text = stream.decode('latin-1', errors='replace') text, count = re.subn(wm_pattern, '', text) if count > 0: wm_total += count doc.update_stream(cx, text.encode('latin-1')) # Remove footers if footer_y_buckets: blocks = page.get_text('dict')['blocks'] for b in blocks: if b['type'] != 0: continue for line in b['lines']: for s in line['spans']: text = s['text'].strip() if not text or len(text) < 3: continue bbox = s['bbox'] if bbox[1] < threshold_y: continue y_bucket = round(bbox[1] / 5) * 5 if y_bucket in footer_y_buckets: rect = fitz.Rect(bbox) page.add_redact_annot(rect, fill=(1, 1, 1)) ft_total += 1 page.apply_redactions() dash.progress(i + 1, total, f'page {i+1}/{total}') # Save doc.save(output_path, garbage=4, deflate=True) doc.close() # Verify verify_idx = dash.steps.index('Verify') dash.set_step(verify_idx, 'Verify') doc = fitz.open(output_path) remaining_wm = 0 remaining_ft = 0 empty_pages = 0 for i in range(len(doc)): text = doc[i].get_text() for wt in findings['watermark_texts']: if wt in text: remaining_wm += 1 break for ft in list(findings['footer_texts'])[:3]: if ft in text: remaining_ft += 1 break if len(text.strip()) < 20 and i < total - 3: empty_pages += 1 dash.progress(i + 1, len(doc), f'page {i+1}/{len(doc)}') doc.close() if remaining_wm: dash.add_warning(f'{remaining_wm} pages still have watermarks') if remaining_ft: dash.add_warning(f'{remaining_ft} pages still have footers') if empty_pages: dash.add_warning(f'{empty_pages} pages appear empty') return remaining_wm == 0 and remaining_ft == 0 and empty_pages == 0, wm_total, ft_total # ============================================================================ # TITLE EXTRACTION # ============================================================================ def extract_title(pdf_path): """Extract document 
title from PDF metadata or largest text on first page.""" doc = fitz.open(pdf_path) meta_title = doc.metadata.get('title', '').strip() if meta_title and len(meta_title) > 5 and meta_title.lower() != 'unnamed document': doc.close() return slugify(meta_title) page = doc[0] page_h = page.rect.height largest_size = 0 largest_text = '' for b in page.get_text('dict')['blocks']: if b['type'] != 0: continue for line in b['lines']: for s in line['spans']: text = s['text'].strip() if not text or len(text) <= 5: continue # Skip rotated watermark spans (same heuristic as detect_watermarks) bbox = s['bbox'] height_span = bbox[3] - bbox[1] rotated = height_span > s['size'] * 3 if rotated and (s['size'] > 30 or height_span > page_h * 0.3): continue if s['size'] > largest_size: largest_size = s['size'] largest_text = text doc.close() return slugify(largest_text) if largest_text else None def slugify(text): """Convert text to a filesystem-safe filename slug.""" slug = re.sub(r'[^\w\s-]', '', text) slug = re.sub(r'[\s_]+', '-', slug).strip('-').lower() return slug[:80] if slug else None # ============================================================================ # MARKDOWN EXPORT # ============================================================================ def export_markdown(pdf_path, dash, step_idx): """Convert cleaned PDF to markdown using docling.""" try: from docling.document_converter import DocumentConverter, PdfFormatOption from docling.datamodel.pipeline_options import PdfPipelineOptions from docling.datamodel.base_models import InputFormat except ImportError: dash.add_warning('docling not installed, skipping markdown') return None dash.set_step(step_idx, 'Export') md_path = os.path.splitext(pdf_path)[0] + '.md' dash.progress(0.2, 1, 'loading docling...') pipeline_options = PdfPipelineOptions() pipeline_options.generate_page_images = False pipeline_options.generate_picture_images = False pipeline_options.do_ocr = False # skip OCR — we have text-based PDFs 
pipeline_options.do_table_structure = False # skip heavy table detection pipeline_options.do_picture_classification = False pipeline_options.do_picture_description = False pipeline_options.do_code_enrichment = False pipeline_options.do_formula_enrichment = False converter = DocumentConverter( format_options={ InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options) } ) dash.progress(0.4, 1, 'converting pdf → markdown...') result = converter.convert(pdf_path) md_content = result.document.export_to_markdown() dash.progress(0.8, 1, 'cleaning markdown...') # Remove tags md_content = re.sub(r'\n?', '', md_content) # Strip metadata before first heading first_heading = re.search(r'^#{1,6}\s+', md_content, re.MULTILINE) if first_heading: md_content = md_content[first_heading.start():] # Fix page-break splits: rejoin paragraphs broken mid-sentence md_content = re.sub( r'([a-zA-Z,;\-\u2013\u2014])\n\n([a-z])', r'\1 \2', md_content ) # Remove orphan page numbers md_content = re.sub(r'\n\n\d{1,4}\n\n', '\n\n', md_content) dash.progress(1, 1, f'{len(md_content):,} chars') with open(md_path, 'w', encoding='utf-8') as f: f.write(md_content) return md_path # ============================================================================ # CLI # ============================================================================ if __name__ == '__main__': parser = argparse.ArgumentParser(description='Remove watermarks and footers from academic PDFs') parser.add_argument('input', nargs='+', help='Input PDF file(s)') parser.add_argument('-o', '--output', help='Output file (only with single input)') parser.add_argument('--dry-run', action='store_true', help='Scan and report only, no modifications') parser.add_argument('--no-markdown', action='store_true', help='Skip markdown export') args = parser.parse_args() if args.output and len(args.input) > 1: print("Error: -o/--output only works with a single input file") sys.exit(1) all_success = True for input_file in args.input: if not 
os.path.exists(input_file): print(f"File not found: {input_file}") all_success = False continue if args.output: output = args.output else: title_slug = extract_title(input_file) out_dir = os.path.dirname(input_file) or '.' if title_slug: output = os.path.join(out_dir, f"{title_slug}.pdf") else: base, ext = os.path.splitext(input_file) output = f"{base}-clean{ext}" success = clean_pdf(input_file, output, args.dry_run, args.no_markdown) if not success: all_success = False sys.exit(0 if all_success else 1)