Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import fitz # PyMuPDF | |
| import os | |
| import subprocess | |
| import tempfile | |
| import sys | |
| import toml | |
| import shutil | |
| import zipfile | |
| import io | |
| # Ensure we can import from utils if needed | |
| sys.path.append(os.path.dirname(__file__)) | |
| from utils import toc_processor | |
| from pdfxmeta import pdfxmeta | |
# --- Page chrome and file input ---
st.set_page_config(page_title="PDF Bookmark Generator", layout="wide")
st.title("PDF Table of Contents Generator")

# Short usage blurb shown under the title.
_INTRO = """
**Upload a PDF**, analyze its fonts to find headers, and generate a clean Table of Contents.
"""
st.markdown(_INTRO)

uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
if uploaded_file is not None:
    # The CLI tools (pdftocgen/pdftocio) need a real path on disk, so the
    # upload is persisted to a temp file. Streamlit re-executes this script on
    # every interaction; the original code rewrote a new delete=False temp file
    # each rerun and never unlinked any of them (temp-file leak). We now cache
    # the path in session_state keyed on the file's identity and clean up the
    # previous file when a new one arrives.
    file_id = f"{uploaded_file.name}_{uploaded_file.size}"  # robust proxy for file identity

    if st.session_state.get('current_file_id') != file_id:
        # NEW FILE DETECTED: drop the previous temp PDF, if any.
        old_path = st.session_state.get('input_pdf_path')
        if old_path and os.path.exists(old_path):
            try:
                os.unlink(old_path)
            except OSError:
                pass  # best-effort cleanup; a stale temp file is not fatal

        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
            tmp_pdf.write(uploaded_file.getvalue())
        st.session_state['input_pdf_path'] = tmp_pdf.name

        # Reset all downstream pipeline state so results from the previous
        # document (including stale scan/search tables) are not shown.
        keys_to_reset = [
            'final_pdf_bytes', 'final_zip_bytes', 'final_zip_name',
            'search_matches', 'font_name', 'font_size',
            'scan_results', 'bm_matches', 'back_matter_page',
        ]
        for k in keys_to_reset:
            st.session_state.pop(k, None)
        st.session_state['current_file_id'] = file_id

    input_pdf_path = st.session_state['input_pdf_path']
    st.success(f"Loaded: {uploaded_file.name}")
| # --- Data Source Selection --- | |
| st.header("1. Source Selection") | |
| source_mode = st.radio("Where should the bookmarks come from?", | |
| ["Scan & Generate (Create New)", "Use Existing Bookmarks (Modify)"], | |
| help="Choose 'Scan & Generate' to build new bookmarks from fonts. Choose 'Use Existing' to tidy up bookmarks already in the file.") | |
| # --- Analysis Section (Only for Generate) --- | |
| if source_mode == "Scan & Generate (Create New)": | |
| st.header("2. Analyze Fonts") | |
| if 'font_name' not in st.session_state: | |
| st.session_state['font_name'] = '' | |
| if 'font_size' not in st.session_state: | |
| st.session_state['font_size'] = 18.0 | |
| tab1, tab2 = st.tabs(["Scan for Large Fonts", "Search by Text"]) | |
| with tab1: | |
| if st.button("Find Header Candidates"): | |
| with st.spinner("Scanning PDF for large fonts..."): | |
| doc = fitz.open(input_pdf_path) | |
| candidates = [] | |
| for page in doc[:50]: | |
| text_page = page.get_text("dict") | |
| for block in text_page["blocks"]: | |
| for line in block.get("lines", []): | |
| for span in line["spans"]: | |
| text = span["text"].strip() | |
| if len(text) > 3: | |
| candidates.append({ | |
| "Text": text[:50], | |
| "Font": span["font"], | |
| "Size": round(span["size"], 2), | |
| "Page": page.number + 1 | |
| }) | |
| doc.close() | |
| if candidates: | |
| df = pd.DataFrame(candidates) | |
| summary = df.groupby(['Font', 'Size']).size().reset_index(name='Count') | |
| summary = summary.sort_values(by=['Size', 'Count'], ascending=[False, False]).head(20) | |
| st.session_state['scan_results'] = summary | |
| else: | |
| st.warning("No text found.") | |
| if 'scan_results' in st.session_state: | |
| st.write("### Top Large Fonts Found") | |
| st.dataframe(st.session_state['scan_results'], use_container_width=True) | |
| def update_from_scan(): | |
| val = st.session_state.scan_selector | |
| if val: | |
| f_name = val.split(" (")[0] | |
| f_size = float(val.split("(")[1].replace("pt)", "")) | |
| st.session_state['font_name'] = f_name | |
| st.session_state['font_size'] = f_size | |
| options = st.session_state['scan_results'].apply(lambda x: f"{x['Font']} ({x['Size']}pt)", axis=1) | |
| st.selectbox("Select extraction font:", options, key='scan_selector', on_change=update_from_scan, index=None, placeholder="Choose a font...") | |
| with tab2: | |
| search_query = st.text_input("Enter text to find (e.g., 'Chapter 1')", "") | |
| c1, c2 = st.columns([1, 3]) | |
| with c1: | |
| do_search = st.button("Search Text") | |
| with c2: | |
| is_case_sensitive = st.checkbox("Case Sensitive", value=False) | |
| if do_search: | |
| with st.spinner(f"Searching for '{search_query}'..."): | |
| # Use the robust pdfxmeta library | |
| try: | |
| doc = fitz.open(input_pdf_path) | |
| # pdfxmeta expects a regex pattern, so we escape the query to be safe | |
| import re | |
| safe_pattern = re.escape(search_query) | |
| # extract_meta returns a list of dicts (spans) | |
| results = pdfxmeta.extract_meta(doc, safe_pattern, ign_case=(not is_case_sensitive)) | |
| doc.close() | |
| matches = [] | |
| for res in results: | |
| matches.append({ | |
| "Text": res.get("text", "").strip(), | |
| "Font": res.get("font", ""), | |
| "Size": round(res.get("size", 0), 2), | |
| "Page": res.get("page_index", 0) | |
| }) | |
| # Limit for display safety | |
| if len(matches) > 50: break | |
| if matches: | |
| st.session_state['search_matches'] = pd.DataFrame(matches) | |
| else: | |
| st.warning("No matches found.") | |
| except Exception as e: | |
| st.error(f"Search failed: {e}") | |
| if 'search_matches' in st.session_state: | |
| st.write(f"### Found Matches") | |
| st.dataframe(st.session_state['search_matches'], use_container_width=True) | |
| def update_from_search(): | |
| val = st.session_state.search_selector | |
| if val: | |
| parts = val.split(" (") | |
| f_name = parts[0] | |
| f_size = float(parts[1].split("pt)")[0]) | |
| st.session_state['font_name'] = f_name | |
| st.session_state['font_size'] = f_size | |
| options = st.session_state['search_matches'].apply(lambda x: f"{x['Font']} ({x['Size']}pt) - Pg {x['Page']}", axis=1) | |
| st.selectbox("Select font from match:", options, key='search_selector', on_change=update_from_search, index=None, placeholder="Choose a match...") | |
| # --- Configuration (Only for Generate) --- | |
| st.header("3. Configure Recipe") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| font_name_input = st.text_input("Font Name", key='font_name') | |
| with col2: | |
| font_size_input = st.number_input("Font Size", key='font_size') | |
| greedy = st.checkbox("Greedy Match (Merge multiline specs)", value=True) | |
| # --- Back Matter Configuration --- | |
| with st.expander("Back Matter Configuration (Optional)", expanded=False): | |
| st.markdown("Identify where the **Back Matter** (Index, Glossary, etc.) starts to split it into a separate `999_Back_matter.pdf`.") | |
| # Independent Search for Back Matter | |
| bm_query = st.text_input("Find Back Matter start (e.g., 'Index')", key="bm_search_query") | |
| c_bm1, c_bm2 = st.columns([1, 3]) | |
| with c_bm1: | |
| do_bm_search = st.button("Search Back Matter") | |
| with c_bm2: | |
| bm_case_sensitive = st.checkbox("Case Sensitive", key="bm_sens", value=False) | |
| if do_bm_search: | |
| with st.spinner("Searching..."): | |
| try: | |
| doc = fitz.open(input_pdf_path) | |
| import re | |
| safe_pattern = re.escape(bm_query) | |
| results = pdfxmeta.extract_meta(doc, safe_pattern, ign_case=(not bm_case_sensitive)) | |
| doc.close() | |
| bm_matches = [] | |
| for res in results: | |
| bm_matches.append({ | |
| "Text": res.get("text", "").strip(), | |
| "Page": res.get("page_index", 0) # Display raw (already 1-based from pdfxmeta) | |
| }) | |
| if len(bm_matches) > 50: break | |
| if bm_matches: | |
| st.session_state['bm_matches'] = pd.DataFrame(bm_matches) | |
| else: | |
| st.warning("No matches found.") | |
| except Exception as e: | |
| st.error(f"Search failed: {e}") | |
| if 'bm_matches' in st.session_state: | |
| st.dataframe(st.session_state['bm_matches'], use_container_width=True) | |
| def update_bm_page(): | |
| val = st.session_state.bm_selector | |
| if val: | |
| # Value format: "Page X - Text..." | |
| page_num = int(val.split(" -")[0].replace("Page ", "")) | |
| st.session_state['back_matter_page'] = page_num | |
| bm_options = st.session_state['bm_matches'].apply(lambda x: f"Page {x['Page']} - {x['Text'][:30]}...", axis=1) | |
| st.selectbox("Select Start Page:", bm_options, key='bm_selector', on_change=update_bm_page, index=None, placeholder="Select start page...") | |
| # Manual Override | |
| # Update session state when this input changes | |
| def update_manual_bm(): | |
| st.session_state['back_matter_page'] = st.session_state.back_matter_page_manual | |
| st.number_input("Or manually set Start Page:", min_value=0, value=st.session_state.get('back_matter_page', 0), key='back_matter_page_manual', on_change=update_manual_bm) | |
| else: | |
| # Existing Mode | |
| st.info("Using existing bookmarks. They will be cleaned, numbered, and used for splitting/downloading.") | |
| # --- Generation --- | |
| st.header("4. Process & Generate") | |
| if st.button("Run Pipeline"): | |
| # Validate inputs if generating | |
| if source_mode == "Scan & Generate (Create New)" and not st.session_state.get('font_name'): | |
| st.error("Please specify a font name for extraction.") | |
| else: | |
| with st.status("Running pipeline tasks...", expanded=True) as status: | |
| # Use a temporary directory for all intermediate files | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| status.write(f"Created temp workspace: {temp_dir}") | |
| # Paths | |
| recipe_path = os.path.join(temp_dir, "recipe.toml") | |
| raw_toc_path = os.path.join(temp_dir, "raw.toc") # pdftocgen output | |
| clean_toc_path = os.path.join(temp_dir, "clean.toc") # modify_toc output | |
| output_pdf_path = os.path.join(temp_dir, "final.pdf") | |
| raw_toc_content = "" | |
| if source_mode == "Scan & Generate (Create New)": | |
| # 1. Create Recipe | |
| recipe_data = { | |
| "heading": [{ | |
| "level": 1, | |
| "greedy": greedy, | |
| "font": { | |
| "name": st.session_state['font_name'], | |
| "size": st.session_state['font_size'], | |
| "size_tolerance": 0.1 | |
| } | |
| }] | |
| } | |
| with open(recipe_path, "w") as f: | |
| toml.dump(recipe_data, f) | |
| status.write("β Recipe created") | |
| # 2. Run pdftocgen -> raw.toc | |
| status.write("Running pdftocgen (Scanning)...") | |
| cmd1 = [sys.executable, "-m", "pdftocgen", "-r", recipe_path, input_pdf_path] | |
| process = subprocess.run(cmd1, capture_output=True, text=True, encoding='utf-8') | |
| if process.returncode != 0: | |
| st.error(f"pdftocgen failed: {process.stderr}") | |
| st.stop() | |
| raw_toc_content = process.stdout | |
| status.write("β Headers extracted") | |
| else: | |
| # Existing Bookmarks | |
| status.write("Extracting existing bookmarks...") | |
| # Run pdftocio in extract mode | |
| cmd1 = [sys.executable, "-m", "pdftocio", input_pdf_path] | |
| process = subprocess.run(cmd1, capture_output=True, text=True, encoding='utf-8') | |
| if process.returncode != 0: | |
| st.error(f"pdftocio failed: {process.stderr}") | |
| st.stop() | |
| raw_toc_content = process.stdout | |
| if not raw_toc_content.strip(): | |
| st.warning("No existing bookmarks found!") | |
| st.stop() | |
| status.write("β Existing bookmarks imported") | |
| # 3. Clean Content (Using centralized utility) | |
| status.write("Cleaning and merging bookmarks...") | |
| cleaned_toc_content = toc_processor.process_toc(raw_toc_content) | |
| with open(clean_toc_path, "w", encoding='utf-8') as f: | |
| f.write(cleaned_toc_content) | |
| status.write("β Bookmarks formatted (Double-splits fixed)") | |
| # 4. Write PDF | |
| status.write("Writing to PDF...") | |
| cmd3 = [sys.executable, "-m", "pdftocio", "-t", clean_toc_path, "-o", output_pdf_path, input_pdf_path] | |
| process = subprocess.run(cmd3, capture_output=True, text=True) | |
| if process.returncode != 0: | |
| st.error(f"pdftocio failed: {process.stderr}") | |
| st.stop() | |
| status.write("β PDF saved") | |
| # 5. Read Result for Download | |
| with open(output_pdf_path, "rb") as f: | |
| st.session_state['final_pdf_bytes'] = f.read() | |
| # 6. Split & Zip (The Feature) | |
| # Use a temp file for the zip to avoid memory issues | |
| with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_zip: | |
| tmp_zip_path = tmp_zip.name | |
| try: | |
| # Pass back_matter_page if it exists and is valid | |
| bm_page = st.session_state.get('back_matter_page', 0) | |
| if bm_page == 0: bm_page = None | |
| toc_processor.generate_chapter_splits(output_pdf_path, tmp_zip_path, back_matter_start_page=bm_page) | |
| with open(tmp_zip_path, "rb") as f: | |
| st.session_state['final_zip_bytes'] = f.read() | |
| base_name = os.path.splitext(uploaded_file.name)[0] | |
| st.session_state['final_zip_name'] = f"{base_name}_chapters.zip" | |
| except Exception as e: | |
| st.error(f"Error generating zip: {e}") | |
| finally: | |
| if os.path.exists(tmp_zip_path): | |
| os.unlink(tmp_zip_path) | |
| # --- Persistent Download Area --- | |
| if 'final_pdf_bytes' in st.session_state: | |
| st.success("Pipeline completed successfully!") | |
| st.write("### Downloads") | |
| c_dl1, c_dl2 = st.columns(2) | |
| with c_dl1: | |
| st.download_button( | |
| label="Download Bookmarked PDF", | |
| data=st.session_state['final_pdf_bytes'], | |
| file_name="bookmarked_doc.pdf", | |
| mime="application/pdf", | |
| key="dl_pdf_btn" | |
| ) | |
| with c_dl2: | |
| if 'final_zip_bytes' in st.session_state: | |
| st.download_button( | |
| label=f"Download ZIP ({st.session_state['final_zip_name']})", | |
| data=st.session_state['final_zip_bytes'], | |
| file_name=st.session_state['final_zip_name'], | |
| mime="application/zip", | |
| key="dl_zip_btn" | |
| ) | |
# --- Footer / attribution ---
st.markdown("---")
_FOOTER_HTML = """
<div style="text-align: center; color: #666; font-size: 0.8em;">
Based on <a href="https://github.com/Krasjet/pdf.tocgen" target="_blank">pdf.tocgen</a> by krasjet. <br>
Enhanced with UI, Chapter Splitting, and Metadata Search. Licensed under AGPL-3.0.
</div>
"""
st.markdown(_FOOTER_HTML, unsafe_allow_html=True)