| import streamlit as st |
| import os |
| import zipfile |
| from io import BytesIO |
| from PyPDF2 import PdfReader |
| from keybert import KeyBERT |
| from sentence_transformers import SentenceTransformer, util |
|
|
| |
# Module-level models, loaded once when the script is first imported.
# KeyBERT proposes candidate topic keywords from each PDF's raw text.
kw_model = KeyBERT()
# Sentence-transformer used to embed PDF texts and keywords for
# cosine-similarity topic assignment.
semantic_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
|
|
def _extract_texts_and_keywords(uploaded_files):
    """Read each uploaded PDF; return ({name: lowercased text}, {keywords}).

    Shows a progress bar and reports per-file failures without aborting the
    whole batch.
    """
    pdf_texts = {}
    keywords_set = set()
    progress = st.progress(0)
    total_files = len(uploaded_files)

    for i, uploaded_file in enumerate(uploaded_files):
        pdf_name = uploaded_file.name
        try:
            reader = PdfReader(uploaded_file)
            # extract_text() may return None (e.g. image-only pages);
            # substitute "" so join() doesn't raise TypeError.
            text = "".join(page.extract_text() or "" for page in reader.pages)
            pdf_texts[pdf_name] = text.lower()

            extracted_keywords = kw_model.extract_keywords(text, top_n=5)
            for kw, _ in extracted_keywords:
                keywords_set.add(kw.lower())
        except Exception as e:
            st.error(f"Failed to process {pdf_name}: {e}")
        finally:
            progress.progress((i + 1) / total_files)

    progress.progress(1.0)
    return pdf_texts, keywords_set


def _encode_with_progress(items, describe):
    """Embed each (name, text) pair with the sentence-transformer.

    ``items`` is a sequence of (name, text); ``describe(name)`` supplies the
    label used in error messages. Returns {name: tensor embedding}.
    """
    embeddings = {}
    progress = st.progress(0)
    total = len(items)

    for i, (name, text) in enumerate(items):
        try:
            embeddings[name] = semantic_model.encode(text, convert_to_tensor=True)
        except Exception as e:
            st.error(f"Failed to compute embedding for {describe(name)}: {e}")
        finally:
            progress.progress((i + 1) / total)

    progress.progress(1.0)
    return embeddings


def _assign_groups(pdf_embeddings, keyword_embeddings):
    """Assign every PDF to the single keyword with the highest cosine similarity.

    Returns {keyword: [pdf_name, ...]}; keywords with no matches map to [].
    """
    pdf_groups = {keyword: [] for keyword in keyword_embeddings}
    for pdf_name, text_embedding in pdf_embeddings.items():
        best_keyword = None
        max_similarity = -1
        for keyword, keyword_embedding in keyword_embeddings.items():
            similarity = util.pytorch_cos_sim(text_embedding, keyword_embedding).item()
            if similarity > max_similarity:
                max_similarity = similarity
                best_keyword = keyword
        if best_keyword:
            pdf_groups[best_keyword].append(pdf_name)
    return pdf_groups


def _build_zip(pdf_groups, uploaded_files):
    """Pack grouped PDFs into an in-memory ZIP (one folder per keyword).

    Writes straight into a BytesIO via ``writestr`` instead of staging files
    in a shared on-disk folder: the previous temp-folder approach could leave
    user files behind on disk if any step between creation and cleanup failed,
    which matters on a shared Space.
    """
    # O(1) lookup instead of a linear scan per grouped PDF.
    files_by_name = {f.name: f for f in uploaded_files}

    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        for keyword, pdf_names in pdf_groups.items():
            for pdf_name in pdf_names:
                matched_file = files_by_name.get(pdf_name)
                if matched_file is not None:
                    # NOTE(review): the keyword is used verbatim as the archive
                    # folder name; assumes KeyBERT yields path-safe tokens.
                    zip_file.writestr(f"{keyword}/{pdf_name}", matched_file.getvalue())
    zip_buffer.seek(0)
    return zip_buffer


def main():
    """Streamlit entry point: upload PDFs, pick topics, download grouped ZIP."""
    st.title("PDF Topic Grouping App")

    st.warning("""
    **Warning**: Do not enter confidential data into this app when it is running in the cloud.
    Your information may not be secure.
    """)

    st.warning("""
    **Important**: This Space is shared with other users, meaning others can view your results and data.
    Please duplicate this Space to your own Hugging Face account for privacy and security.
    """)

    uploaded_files = st.file_uploader("Upload PDFs", type="pdf", accept_multiple_files=True)
    if not uploaded_files:
        st.info("Please upload PDFs to continue.")
        return

    # Invalidate cached extraction results whenever the uploaded file set changes.
    uploaded_file_names = [f.name for f in uploaded_files]
    if "uploaded_files" not in st.session_state or st.session_state.uploaded_files != uploaded_file_names:
        st.session_state.uploaded_files = uploaded_file_names
        st.session_state.keywords_set = None

    if st.session_state.keywords_set is None:
        st.info("Extracting keywords from PDFs...")
        pdf_texts, keywords_set = _extract_texts_and_keywords(uploaded_files)
        if not pdf_texts:
            st.error("No PDFs could be processed.")
            return
        st.session_state.pdf_texts = pdf_texts
        st.session_state.keywords_set = keywords_set

    # Sort so option order (and the [:2] default) is deterministic across
    # reruns; raw set iteration order is hash-dependent.
    keyword_options = sorted(st.session_state.keywords_set)
    selected_keywords = st.multiselect(
        "Select at least two keywords/topics for grouping:",
        keyword_options,
        default=keyword_options[:2],
    )

    if st.button("Confirm Keyword Selection"):
        if len(selected_keywords) < 2:
            st.error("Please select at least two keywords to continue.")
        else:
            st.session_state.selected_keywords = selected_keywords
            st.session_state.keywords_confirmed = True
    else:
        # Any rerun without the button press requires re-confirmation.
        st.session_state.keywords_confirmed = False

    if not st.session_state.get("keywords_confirmed", False):
        st.stop()

    st.success("Keyword selection confirmed. Processing PDFs...")

    st.info("Precomputing embeddings for PDFs...")
    pdf_embeddings = _encode_with_progress(
        list(st.session_state.pdf_texts.items()),
        lambda name: name,
    )

    st.info("Precomputing embeddings for selected keywords...")
    keyword_embeddings = _encode_with_progress(
        [(kw, kw) for kw in st.session_state.selected_keywords],
        lambda name: f"keyword '{name}'",
    )

    st.info("Assigning PDFs to the most relevant topics...")
    pdf_groups = _assign_groups(pdf_embeddings, keyword_embeddings)

    zip_buffer = _build_zip(pdf_groups, uploaded_files)

    st.success("PDFs processed and grouped successfully!")
    st.download_button(
        label="Download Grouped PDFs",
        data=zip_buffer,
        file_name="grouped_pdfs.zip",
        mime="application/zip",
    )
|
|
# Script entry point (also runs under `streamlit run`, which executes the
# file as __main__).
if __name__ == "__main__":
    main()
|
|