Spaces:
Sleeping
Sleeping
| # -------------------------------------------------------- | |
| # Multi-Document Summarizer (using BART + Clustering) | |
| # -------------------------------------------------------- | |
| import numpy as np | |
| import streamlit as st | |
| from transformers import BartTokenizer, BartForConditionalGeneration | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.cluster import KMeans | |
| from nltk.tokenize import sent_tokenize | |
| import nltk | |
| import PyPDF2 | |
| from io import BytesIO | |
# Streamlit page configuration (page title and centered layout).
st.set_page_config(
    layout="centered",
    page_title="Multi-Document Summarizer",
)
| # -------------------------------------------------------- | |
| # Download NLTK data | |
| # -------------------------------------------------------- | |
def download_nltk_data():
    """Make sure the NLTK sentence-tokenizer resources are installed.

    ``punkt`` and ``punkt_tab`` are probed independently. Checking only
    ``punkt`` would skip the download entirely on a machine that already has
    ``punkt`` but lacks ``punkt_tab``, which ``sent_tokenize`` may need
    depending on the installed NLTK version.
    """
    for resource in ('punkt', 'punkt_tab'):
        try:
            nltk.data.find(f'tokenizers/{resource}')
        except LookupError:
            # Missing locally: fetch it quietly so no download log is printed.
            nltk.download(resource, quiet=True)


download_nltk_data()
| # -------------------------------------------------------- | |
| # Load BART Model | |
| # -------------------------------------------------------- | |
@st.cache_resource
def load_model():
    """Load the ``facebook/bart-large-cnn`` tokenizer and model.

    Streamlit re-executes the whole script on every user interaction, so the
    result is cached with ``st.cache_resource``: the checkpoint is loaded once
    per server process instead of on every rerun.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
    model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')
    return tokenizer, model


tokenizer, model = load_model()
| # -------------------------------------------------------- | |
| # Helper Functions | |
| # -------------------------------------------------------- | |
def extract_text_from_pdf(file) -> str:
    """Extract the text of every page of an uploaded PDF.

    Args:
        file: A binary file-like object exposing ``read()`` (for example a
            Streamlit uploaded file) that contains the PDF bytes.

    Returns:
        The page texts joined by newlines, with surrounding whitespace
        stripped. Pages that yield no text are skipped, so a PDF without any
        extractable text gives an empty string.
    """
    pdf_reader = PyPDF2.PdfReader(BytesIO(file.read()))
    # extract_text() may return None/"" for a page; drop those pages.
    page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
    # Join with a newline so the last word of one page is not fused with the
    # first word of the next page.
    return "\n".join(chunk for chunk in page_texts if chunk).strip()
def chunk_text(text, max_words=800):
    """Split long text into sentence-aligned chunks for summarization.

    Args:
        text: The text to split.
        max_words: Maximum number of whitespace-separated words per chunk.

    Returns:
        A list of chunk strings. Text that already fits within ``max_words``
        is returned unchanged as a single-element list. Otherwise sentences
        are packed greedily into chunks of at most ``max_words`` words. A
        single sentence longer than ``max_words`` becomes its own (oversized)
        chunk rather than being cut mid-sentence.
    """
    if len(text.split()) <= max_words:
        return [text]

    chunks, current_chunk, current_word_count = [], [], 0
    for sentence in sent_tokenize(text):
        sentence_words = len(sentence.split())
        if current_word_count + sentence_words <= max_words:
            current_chunk.append(sentence)
            current_word_count += sentence_words
            continue
        # The sentence does not fit: flush what has been collected so far.
        # The guard avoids emitting an empty chunk when the very first
        # sentence is already longer than max_words.
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        current_chunk = [sentence]
        current_word_count = sentence_words

    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
def _generate_summary(text, max_length, min_length):
    """Run one BART beam-search summarization pass over ``text``."""
    # Input beyond 1024 tokens is truncated by the tokenizer.
    inputs = tokenizer([text], max_length=1024, truncation=True, return_tensors='pt')
    summary_ids = model.generate(
        inputs['input_ids'],
        attention_mask=inputs['attention_mask'],
        num_beams=4,
        max_length=max_length,
        min_length=min_length,
        early_stopping=True
    )
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)


def summarize_large_text(text, max_length=150, min_length=50):
    """Summarize text of any length by chunking and combining summaries.

    Args:
        text: The text to summarize.
        max_length: Maximum generated length (in tokens) of each chunk summary.
        min_length: Minimum generated length (in tokens) of each chunk summary.

    Returns:
        The summary string. Empty or whitespace-only input returns ``""``
        without invoking the model. Text that fits in a single chunk returns
        that chunk's summary directly. Longer text is summarized chunk by
        chunk, and the joined chunk summaries are condensed in one final
        pass (80-200 generated tokens).
    """
    if not text or not text.strip():
        # Nothing to summarize; generating from empty input would only
        # produce made-up text.
        return ""

    summaries = [
        _generate_summary(chunk, max_length, min_length)
        for chunk in chunk_text(text, max_words=800)
    ]
    if len(summaries) == 1:
        # A single chunk has nothing to combine; summarizing the summary
        # again would double the inference time and force at least 80 tokens
        # out of an already short text.
        return summaries[0]

    # Combine all chunk summaries and re-summarize them.
    # NOTE: with many chunks the joined summaries can exceed the 1024-token
    # input limit, in which case the tail is truncated by the tokenizer.
    combined_summary_text = " ".join(summaries)
    return _generate_summary(combined_summary_text, max_length=200, min_length=80)
def cluster_documents(documents, n_clusters=3):
    """Cluster similar documents using TF-IDF features and KMeans.

    Args:
        documents: Sequence of document strings.
        n_clusters: Desired number of clusters; clamped to the range
            ``1..len(documents)`` because KMeans cannot produce more clusters
            than there are samples.

    Returns:
        A NumPy integer array with one cluster label per document (empty when
        ``documents`` is empty).

    Raises:
        ValueError: Propagated from ``TfidfVectorizer`` when the documents
            contain no usable terms (for example only English stop words).
    """
    documents = list(documents)
    if not documents:
        return np.empty(0, dtype=np.int32)

    n_clusters = max(1, min(n_clusters, len(documents)))
    vectorizer = TfidfVectorizer(stop_words='english')
    X = vectorizer.fit_transform(documents)
    # n_init is set explicitly because its default differs between
    # scikit-learn versions; pinning it keeps the clustering reproducible.
    kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42).fit(X)
    return kmeans.labels_
def multi_document_summarize(documents):
    """Summarize multiple related documents using clustering + BART.

    Args:
        documents: List of document strings.

    Returns:
        A dict with three keys:

        * ``'individual_summaries'``: one summary per document, in input order.
        * ``'cluster_summaries'``: one ``{'doc_indices', 'summary'}`` dict per
          cluster (empty when fewer than two documents are given).
        * ``'final_summary'``: the overall summary. For a single document
          this is its individual summary; for an empty input it is ``None``.
    """
    results = {
        'individual_summaries': [],
        'cluster_summaries': [],
        'final_summary': None
    }
    if not documents:
        # Nothing to summarize; return the empty result instead of failing
        # on the [0] lookup below.
        return results

    # 1) Individual summaries
    results['individual_summaries'] = [summarize_large_text(doc) for doc in documents]

    if len(documents) == 1:
        results['final_summary'] = results['individual_summaries'][0]
        return results

    # 2) Clustering (only meaningful with more than one document)
    n_clusters = min(3, len(documents))
    clusters = cluster_documents(documents, n_clusters=n_clusters)
    for cluster_id in np.unique(clusters):
        doc_indices = [i for i, c in enumerate(clusters) if c == cluster_id]
        if len(doc_indices) == 1:
            # A one-document cluster has the same input as that document's
            # individual summary, so reuse it rather than running the model
            # a second time.
            cluster_summary = results['individual_summaries'][doc_indices[0]]
        else:
            combined_text = " ".join(documents[i] for i in doc_indices)
            cluster_summary = summarize_large_text(combined_text)
        results['cluster_summaries'].append({
            'doc_indices': doc_indices,
            'summary': cluster_summary
        })

    # 3) Final overall summary built from every individual and cluster summary
    all_summaries = results['individual_summaries'] + [
        cs['summary'] for cs in results['cluster_summaries']
    ]
    results['final_summary'] = summarize_large_text(" ".join(all_summaries))
    return results
| # -------------------------------------------------------- | |
| # --------------------------- STREAMLIT UI --------------------------- | |
# NOTE: st.set_page_config() is already called once near the top of this
# script, before any other Streamlit command, so it is not repeated here.
st.title("π Multi-Document + PDF Summarization App")
st.write("Upload multiple text or PDF files to get summaries, clusters, and a final combined summary.")

uploaded_files = st.file_uploader(
    "π€ Upload text or PDF files",
    type=['txt', 'pdf'],
    accept_multiple_files=True
)

if uploaded_files:
    documents = []
    for file_index, file in enumerate(uploaded_files):
        file_name = file.name
        st.markdown(f"**π File:** `{file_name}`")

        # Reading happens per file inside a try block so that one corrupt PDF
        # or oddly encoded text file is reported and skipped instead of
        # crashing the whole app.
        try:
            if file_name.lower().endswith('.pdf'):
                file.seek(0)  # make sure the PDF is read from its first byte
                text = extract_text_from_pdf(file)
            else:
                # getvalue() returns the full content regardless of the
                # current read position; "utf-8-sig" drops a leading BOM and
                # undecodable bytes are replaced rather than raising.
                text = file.getvalue().decode("utf-8-sig", errors="replace")
        except Exception as e:
            st.warning(f"Could not read `{file_name}`: {e}")
            continue

        if not text.strip():
            st.warning(f"β οΈ No text found in `{file_name}` β skipping.")
            continue

        preview = text[:700] + "..." if len(text) > 700 else text
        # An explicit key keeps widget IDs unique when two uploads share a name.
        st.text_area(
            f"π Preview of {file_name}",
            preview,
            height=150,
            key=f"preview_{file_index}"
        )
        documents.append(text)

    if not documents:
        st.error("No readable text found in uploaded files.")
    elif st.button("π Generate Summary"):
        with st.spinner("β³ Summarizing documents... please wait (this may take 1β2 minutes)..."):
            try:
                results = multi_document_summarize(documents)

                # Individual summaries
                st.subheader("π Individual Document Summaries")
                for i, summary in enumerate(results['individual_summaries']):
                    with st.expander(f"Document {i+1} Summary"):
                        st.write(summary)

                # Cluster summaries
                if results['cluster_summaries']:
                    st.subheader("π§© Cluster Summaries")
                    for i, cluster in enumerate(results['cluster_summaries']):
                        with st.expander(f"Cluster {i+1} (Documents: {', '.join(str(x+1) for x in cluster['doc_indices'])})"):
                            st.write(cluster['summary'])

                # Final combined summary
                st.subheader("π Final Comprehensive Summary")
                st.success(results['final_summary'])
            except Exception as e:
                # Top-level UI boundary: show the failure instead of a traceback.
                st.error(f"β An error occurred: {str(e)}")
else:
    st.info("π Please upload one or more `.txt` or `.pdf` files to start summarizing.")