Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import tempfile | |
| import gradio as gr | |
| from transformers import pipeline | |
| import pdfplumber | |
| from gtts import gTTS | |
| import nltk | |
| import numpy as np | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from pydub import AudioSegment | |
| import faiss | |
| from sentence_transformers import SentenceTransformer | |
| from groq import Groq | |
| from diffusers import StableDiffusionPipeline | |
| import torch | |
| from PIL import Image | |
| # ========================================================== | |
| # NLTK Setup | |
| # ========================================================== | |
def _ensure_nltk_tokenizer(resource_name):
    """Download an NLTK tokenizer resource unless it is already installed."""
    resource_path = f"tokenizers/{resource_name}"
    try:
        nltk.data.find(resource_path)
    except LookupError:
        # nltk.data.find signals a missing resource with LookupError.
        nltk.download(resource_name)


# Make sure both tokenizer resources are present before any sentence splitting.
for _tokenizer_name in ("punkt", "punkt_tab"):
    _ensure_nltk_tokenizer(_tokenizer_name)
| # ========================================================== | |
| # Environment Setup | |
| # ========================================================== | |
# API key for the Groq chat client; an empty string means "not configured".
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
| # ========================================================== | |
| # Model Setup | |
| # ========================================================== | |
# Pick the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

# Initialize models
print("Loading models... please wait β³")

# Summarization model
SUMMARIZER_MODEL = "facebook/bart-large-cnn"
try:
    summarizer = pipeline(
        "summarization",
        model=SUMMARIZER_MODEL,
        # Place the model on the detected device (0 = first GPU, -1 = CPU)
        # instead of leaving DEVICE as a purely informational value.
        device=0 if DEVICE == "cuda" else -1,
    )
    print("β Summarizer loaded successfully.")
except Exception as e:
    # Keep the app importable; callers check `summarizer is None`.
    print("β Summarizer load error:", e)
    summarizer = None
# Embedding model for RAG
def _load_embedder():
    """Return the sentence-embedding model, or None when loading fails."""
    try:
        model = SentenceTransformer('all-MiniLM-L6-v2')
        print("β Embedding model loaded successfully.")
        return model
    except Exception as e:
        print("β Embedding model load error:", e)
        return None


embedder = _load_embedder()
# Stable Diffusion for diagram generation
def _load_stable_diffusion():
    """Return the Stable Diffusion pipeline, or None when loading fails.

    With CUDA available the pipeline is loaded in float16 with the safety
    checker disabled and moved to the GPU; otherwise it is loaded with the
    library defaults and kept on the CPU.
    """
    try:
        if not torch.cuda.is_available():
            cpu_pipeline = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
            loaded = cpu_pipeline.to("cpu")
        else:
            gpu_pipeline = StableDiffusionPipeline.from_pretrained(
                "runwayml/stable-diffusion-v1-5",
                torch_dtype=torch.float16,
                safety_checker=None,
                requires_safety_checker=False
            )
            loaded = gpu_pipeline.to("cuda")
        print("β Stable Diffusion loaded successfully.")
        return loaded
    except Exception as e:
        print("β Stable Diffusion load error:", e)
        return None


sd_pipe = _load_stable_diffusion()
# Groq client
def _make_groq_client():
    """Return a Groq client built from GROQ_API_KEY, or None without a key."""
    try:
        client = None
        if GROQ_API_KEY:
            client = Groq(api_key=GROQ_API_KEY)
        if not client:
            print("β οΈ Groq API key not found. Chat functionality will be limited.")
        else:
            print("β Groq client initialized successfully.")
        return client
    except Exception as e:
        print("β Groq client initialization error:", e)
        return None


groq_client = _make_groq_client()
| # ========================================================== | |
| # Utility Functions | |
| # ========================================================== | |
# A bibliography heading: the word "References" alone on its line, optionally
# preceded by a section number ("7. References") or followed by a colon.
_REFERENCES_HEADING = re.compile(
    r'^[ \t]*(?:\d+\.?[ \t]+)?references[ \t]*:?[ \t]*$',
    re.IGNORECASE | re.MULTILINE,
)


def clean_text(text: str) -> str:
    """Clean extracted PDF text.

    Steps, in order:
      1. Normalize ``\\r\\n`` / ``\\r`` line endings to ``\\n``.
      2. Drop the bibliography: everything from the last line that consists
         solely of a "References" heading. The word "references" inside a
         sentence no longer truncates the document.
      3. Replace runs of non-ASCII characters with a space.
      4. Collapse all whitespace (including newlines) to single spaces.

    Args:
        text: Raw text as extracted from the PDF.

    Returns:
        The cleaned, single-line, stripped text. Calling the function again
        on its own output returns the same string.
    """
    text = re.sub(r'\r\n?', '\n', text)

    # Must run before whitespace is collapsed: the heading is recognized by
    # standing alone on a line.
    headings = list(_REFERENCES_HEADING.finditer(text))
    if headings:
        text = text[:headings[-1].start()]

    text = re.sub(r'[^\x00-\x7F]+', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def extract_text_from_pdf(path: str) -> str:
    """Return the text of every page of the PDF at *path*.

    Pages that yield no text are skipped; each remaining page is followed by a
    blank line. Failures are reported through the return value rather than
    raised: the result is "No text extracted from PDF." when nothing was
    found and "Error extracting text: ..." when reading failed.
    """
    try:
        with pdfplumber.open(path) as document:
            page_texts = [page.extract_text() for page in document.pages]
        combined = "".join(f"{content}\n\n" for content in page_texts if content).strip()
        return combined or "No text extracted from PDF."
    except Exception as e:
        return f"Error extracting text: {e}"
def sentence_tokenize(text: str):
    """Split *text* into stripped sentences, dropping those of 10 chars or fewer."""
    kept = []
    for raw_sentence in nltk.tokenize.sent_tokenize(text):
        candidate = raw_sentence.strip()
        if len(candidate) > 10:
            kept.append(candidate)
    return kept
def chunk_text(text: str, max_chars=1500):
    """Split text into chunks for summarization.

    Sentences (from ``sentence_tokenize``) are packed greedily, joined by a
    single space, while the joined chunk stays within *max_chars*. A single
    sentence longer than the limit becomes a chunk of its own.

    Args:
        text: Text to split.
        max_chars: Target upper bound on the length of a chunk.

    Returns:
        A list of non-empty chunk strings, in document order.
    """
    chunks, current = [], ""
    for sentence in sentence_tokenize(text):
        if not current:
            # Start a chunk unconditionally. Flushing here would emit an
            # empty chunk whenever the first sentence alone exceeds the limit.
            current = sentence
        elif len(current) + len(sentence) < max_chars:
            current = f"{current} {sentence}"
        else:
            chunks.append(current)
            current = sentence
    if current:
        chunks.append(current)
    return chunks
def extract_keywords_tfidf(text: str, top_k=8):
    """Extract keywords using TF-IDF.

    The text is split into paragraphs (blank-line separated) and each one is
    treated as a document. When that yields fewer than two documents, for
    example because whitespace was already collapsed, the text is split on
    sentence-ending punctuation instead so the vectorizer still sees several
    documents. Terms (uni- and bigrams, English stop words removed) are
    ranked by their mean TF-IDF score.

    Args:
        text: Source text.
        top_k: Maximum number of keywords to return.

    Returns:
        Up to *top_k* keyword strings, best first. An empty list when there
        is no usable text or when vectorization fails (the error is printed).
    """
    source = text or ""
    documents = [p.strip() for p in re.split(r'\n{2,}', source) if p.strip()]
    if len(documents) < 2:
        documents = [s.strip() for s in re.split(r'(?<=[.!?])\s+', source) if s.strip()]
    if not documents:
        return []

    try:
        vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 2))
        matrix = vectorizer.fit_transform(documents)
        features = vectorizer.get_feature_names_out()
        scores = np.asarray(matrix.mean(axis=0)).ravel()
        ranked = np.argsort(scores)[::-1][:top_k]
        return [str(features[i]) for i in ranked]
    except Exception as e:
        # Best effort: keywords are optional, but do not hide the reason.
        print(f"Keyword extraction error: {e}")
        return []
| # ========================================================== | |
| # Adaptive Summarization | |
| # ========================================================== | |
def summarize_long_text(text: str) -> str:
    """Adaptive summarization based on PDF length.

    The text is cleaned with ``clean_text``; summary length limits and chunk
    size are then chosen from the cleaned length. Text that fits in one chunk
    is summarized directly. Longer text is split with ``chunk_text``, the
    first six chunks are summarized individually, and those partial
    summaries are summarized once more.

    Args:
        text: Document text.

    Returns:
        The summary, or an explanatory message when the model is unavailable,
        the text is empty, or no chunk could be summarized. If only the final
        merge pass fails, the concatenated chunk summaries are returned.

    Raises:
        Whatever the summarization pipeline raises on the single-chunk path.
    """
    if summarizer is None:
        return "Summarization model unavailable."

    text = clean_text(text)
    length = len(text)
    if not length:
        return "No text available to summarize."

    # Dynamic summarization scaling
    if length < 1500:
        max_len, min_len, chunk_size = 180, 60, 1400
    elif length < 5000:
        max_len, min_len, chunk_size = 250, 100, 1600
    elif length < 15000:
        max_len, min_len, chunk_size = 350, 150, 1800
    else:
        max_len, min_len, chunk_size = 500, 200, 2000

    def _summarize(passage, longest, shortest):
        # truncation=True clips over-long input to the model's limit instead
        # of failing on it (the merged chunk summaries can be long).
        result = summarizer(
            passage,
            max_length=longest,
            min_length=shortest,
            do_sample=False,
            truncation=True,
        )
        return result[0]["summary_text"]

    if length <= chunk_size:
        return _summarize(text, max_len, min_len)

    # Only the first six chunks are summarized, which bounds processing time.
    partial_summaries = []
    for part in chunk_text(text, max_chars=chunk_size)[:6]:
        try:
            partial_summaries.append(_summarize(part, 200, 80))
        except Exception as e:
            print(f"Chunk summarization error: {e}")

    if not partial_summaries:
        return "Summarization failed for every section of the document."

    combined = " ".join(partial_summaries)
    try:
        return _summarize(combined, max_len, min_len)
    except Exception as e:
        # The per-chunk summaries are still a usable result.
        print(f"Final summarization error: {e}")
        return combined
| # ========================================================== | |
| # Diagram Generation with Stable Diffusion | |
| # ========================================================== | |
def generate_diagram(summary: str, keywords: str) -> "Image.Image | None":
    """Generate a diagram based on summary and keywords.

    Args:
        summary: Text to illustrate; only the first 500 characters are used.
        keywords: Optional keyword string appended to the prompt.

    Returns:
        The generated image, or ``None`` when both inputs are blank, the
        Stable Diffusion pipeline is unavailable, or generation fails (the
        error is printed).
    """
    summary = summary or ""
    keywords = keywords or ""
    # Nothing to illustrate: skip the expensive generation entirely.
    if not summary.strip() and not keywords.strip():
        return None
    if sd_pipe is None:
        return None

    try:
        # Create a prompt for diagram generation
        prompt = f"educational diagram, infographic style, clean and professional, illustrating: {summary[:500]}. Keywords: {keywords}"
        # Fewer denoising steps without CUDA to keep generation time tolerable.
        steps = 25 if torch.cuda.is_available() else 15
        with torch.no_grad():
            image = sd_pipe(
                prompt,
                num_inference_steps=steps,
                guidance_scale=7.5,
                width=512,
                height=512,
            ).images[0]
        return image
    except Exception as e:
        print(f"Diagram generation error: {e}")
        return None
| # ========================================================== | |
| # RAG Chatbot Functions | |
| # ========================================================== | |
class PDFChatBot:
    """Retrieval-augmented chat over a single processed PDF.

    The document text is split into chunks, embedded with the module-level
    ``embedder`` and stored in a FAISS inner-product index. Questions are
    answered by retrieving the most similar chunks and sending them, together
    with the question, to the module-level ``groq_client``.
    """

    # Exact messages extract_text_from_pdf returns instead of document text.
    _EXTRACTION_FAILURE_PREFIXES = ("Error extracting text:", "No text extracted from PDF.")

    # Minimum cosine similarity for a chunk to count as relevant.
    SIMILARITY_THRESHOLD = 0.3

    # Groq models tried in order until one answers.
    GROQ_MODELS = (
        "llama-3.3-70b-versatile",
        "llama-3.1-8b-instant",
        "llama-3.2-3b-preview",
        "llama-3.2-1b-preview",
        "mixtral-8x7b-32768",
    )

    def __init__(self):
        self.vector_store = None
        self.chunks = []
        self.current_pdf_text = ""
        self.is_processed = False

    def _reset(self):
        """Forget the previously processed document."""
        self.vector_store = None
        self.chunks = []
        self.current_pdf_text = ""
        self.is_processed = False

    def process_pdf_for_chat(self, pdf_text: str):
        """Process PDF text for RAG system.

        Any previously indexed document is discarded first, so a failed call
        never leaves old vectors paired with new chunks.

        Args:
            pdf_text: Extracted document text, or one of the failure messages
                produced by ``extract_text_from_pdf``.

        Returns:
            True when the text was chunked, embedded and indexed; False when
            the input is empty or a failure message, no chunks were produced,
            or the embedding model is unavailable.
        """
        self._reset()
        if not pdf_text or pdf_text.startswith(self._EXTRACTION_FAILURE_PREFIXES):
            return False

        cleaned = clean_text(pdf_text)
        # Chunk the text
        chunks = self._create_chunks(cleaned, chunk_size=500, overlap=50)
        if embedder is None or not chunks:
            return False

        # Create embeddings; FAISS expects contiguous float32 rows.
        embeddings = np.ascontiguousarray(embedder.encode(chunks), dtype="float32")
        index = faiss.IndexFlatIP(embeddings.shape[1])
        # Normalize embeddings so inner product equals cosine similarity
        faiss.normalize_L2(embeddings)
        index.add(embeddings)

        # Commit only after every step succeeded.
        self.current_pdf_text = cleaned
        self.chunks = chunks
        self.vector_store = index
        self.is_processed = True
        return True

    def _create_chunks(self, text: str, chunk_size: int = 500, overlap: int = 50):
        """Create overlapping chunks of text.

        Args:
            text: Text to split (sentences come from ``sentence_tokenize``).
            chunk_size: Target maximum chunk length in characters.
            overlap: Maximum number of characters of trailing whole sentences
                repeated at the start of the next chunk.

        Returns:
            A list of non-empty chunk strings.
        """
        return self._pack_sentences(sentence_tokenize(text), chunk_size, overlap)

    @staticmethod
    def _pack_sentences(sentences, chunk_size, overlap):
        """Greedily pack sentences into chunks with sentence-level overlap."""
        chunks = []
        current = []
        for sentence in sentences:
            if current and len(" ".join(current)) + 1 + len(sentence) > chunk_size:
                chunks.append(" ".join(current))
                # Carry over trailing sentences that fit in the overlap budget.
                tail, tail_len = [], 0
                for previous in reversed(current):
                    extra = len(previous) + (1 if tail else 0)
                    if tail_len + extra > overlap:
                        break
                    tail.append(previous)
                    tail_len += extra
                tail.reverse()
                # Drop the carry-over if it leaves no room for the new sentence.
                if tail and tail_len + 1 + len(sentence) > chunk_size:
                    tail = []
                current = tail
            current.append(sentence)
        if current:
            chunks.append(" ".join(current))
        return chunks

    def get_relevant_chunks(self, query: str, top_k: int = 3):
        """Retrieve relevant chunks for a query.

        Args:
            query: The user's question.
            top_k: Maximum number of chunks to return.

        Returns:
            Chunks whose similarity to the query exceeds
            ``SIMILARITY_THRESHOLD``, best first. An empty list when nothing
            is indexed, the query is empty, or retrieval fails.
        """
        if self.vector_store is None or not self.chunks:
            return []
        if embedder is None or not query or top_k <= 0:
            return []
        try:
            # Encode query
            query_embedding = np.ascontiguousarray(embedder.encode([query]), dtype="float32")
            faiss.normalize_L2(query_embedding)
            # Never ask for more neighbours than there are chunks.
            neighbours = min(top_k, len(self.chunks))
            scores, indices = self.vector_store.search(query_embedding, neighbours)
            # FAISS pads missing results with index -1, hence the lower bound.
            return [
                self.chunks[i]
                for i, score in zip(indices[0], scores[0])
                if 0 <= i < len(self.chunks) and score > self.SIMILARITY_THRESHOLD
            ]
        except Exception as e:
            print(f"Error in retrieval: {e}")
            return []

    def generate_answer(self, query: str, chat_history):
        """Generate answer using RAG with Groq.

        Args:
            query: The user's question.
            chat_history: Conversation history supplied by the chat UI;
                accepted for interface compatibility and not used.

        Returns:
            The model's answer, or a message explaining why none is available.
        """
        if groq_client is None:
            return "Groq API not available. Please set your GROQ_API_KEY in the Hugging Face Spaces secrets."
        if not self.is_processed:
            return "Please upload and process a PDF first. Go to the 'PDF Summarizer' tab to upload your PDF."

        # Get relevant context
        relevant_chunks = self.get_relevant_chunks(query)
        if not relevant_chunks:
            return "No relevant information found in the PDF for your question."
        context = "\n\n".join(relevant_chunks[:3])  # Use top 3 chunks

        # Create prompt
        prompt = (
            "Based on the following context from a PDF document, please answer the user's question.\n"
            "Context:\n"
            f"{context}\n"
            f"Question: {query}\n"
            "Please provide a helpful and accurate answer based only on the given context. "
            "If the context doesn't contain enough information to fully answer the question, please say so."
        )

        # Try each model in turn; the first successful completion wins.
        for model in self.GROQ_MODELS:
            try:
                completion = groq_client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0.7,
                    max_tokens=1024,
                    top_p=1,
                    stream=False,
                )
                return completion.choices[0].message.content
            except Exception as model_error:
                print(f"Model {model} failed: {model_error}")
        return "All available models failed. Please check your Groq API access."


# Initialize chatbot
chatbot = PDFChatBot()
| # ========================================================== | |
| # Text-to-Speech | |
| # ========================================================== | |
def _remove_if_exists(path):
    """Delete *path*, ignoring a missing file or an unset path."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        pass


def text_to_speech(text):
    """Convert text to speech and ensure WAV output.

    Only the first 900 characters are spoken. The audio is produced as MP3
    and then converted to WAV; the intermediate MP3 is always removed.

    Args:
        text: Text to speak.

    Returns:
        Path of a temporary WAV file (the caller owns it), or ``None`` when
        *text* is empty or synthesis/conversion fails. No temporary files are
        left behind on failure.
    """
    if not text or not text.strip():
        return None

    mp3_path = wav_path = None
    try:
        # Reserve the two paths and close the handles right away so the
        # writers below can reopen the files.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as mp3_file:
            mp3_path = mp3_file.name
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as wav_file:
            wav_path = wav_file.name

        # Generate TTS (MP3)
        gTTS(text=text[:900], lang="en").save(mp3_path)
        # Convert to WAV for browser playback
        AudioSegment.from_mp3(mp3_path).export(wav_path, format="wav")
        return wav_path
    except Exception as e:
        print("TTS error:", e)
        _remove_if_exists(wav_path)
        return None
    finally:
        # Clean up MP3 file on success and on failure alike.
        _remove_if_exists(mp3_path)
| # ========================================================== | |
| # PDF Processing - Main Function | |
| # ========================================================== | |
def process_pdf(pdf_file):
    """Main handler to process PDF - this will be shared across all tabs.

    Extracts and cleans the text, then produces the summary, keywords,
    spoken summary and diagram, and indexes the text for the chatbot.

    Args:
        pdf_file: Path of the uploaded PDF, or a falsy value when none was
            uploaded.

    Returns:
        A 6-tuple ``(text, summary, audio, keywords, diagram, status)``.
        ``audio`` and ``diagram`` may be ``None``. On failure ``text``
        carries the error message and the other outputs are empty.
    """
    if not pdf_file:
        return "Please upload a PDF.", "", None, "", None, "No PDF uploaded"

    text = extract_text_from_pdf(pdf_file)
    # Match the exact failure messages so that a document which merely
    # starts with the word "Error" is still processed.
    if text.startswith(("Error extracting text:", "No text extracted from PDF.")):
        return text, "", None, "", None, "Failed to extract text"

    text = clean_text(text)
    if not text:
        # Cleaning can remove everything, e.g. when the text is all non-ASCII.
        return "No text extracted from PDF.", "", None, "", None, "Failed to extract text"

    summary = summarize_long_text(text)
    keywords = ", ".join(extract_keywords_tfidf(text))
    audio = text_to_speech(summary)
    # Generate diagram
    diagram = generate_diagram(summary, keywords)
    # Also process for chatbot
    chat_ready = chatbot.process_pdf_for_chat(text)

    # Return status message for chat tab
    if chat_ready:
        status_message = "β PDF processed successfully! You can now chat with this PDF in the 'Chat with PDF' tab."
    else:
        status_message = "PDF processed, but it could not be indexed for chat."
    return text, summary, audio, keywords, diagram, status_message
| # ========================================================== | |
| # Gradio Interface with Shared PDF State | |
| # ========================================================== | |
def create_interface():
    """Build the Gradio Blocks UI and wire its event handlers.

    The app has four tabs: PDF Summarizer (upload and results), AI Diagram
    (stand-alone diagram generation), Chat with PDF, and About.
    Processing a PDF also fills the diagram tab's inputs and refreshes the
    chat tab's status line.

    Returns:
        The ``gr.Blocks`` application, not yet launched.
    """

    def _chat_status_html():
        # Report what the chatbot can actually do instead of assuming success.
        if chatbot.is_processed:
            return "<div>β PDF processed successfully! You can now chat with your document.</div>"
        return "<div>This PDF could not be prepared for chat. Please process a PDF in the 'PDF Summarizer' tab.</div>"

    with gr.Blocks(
        title="AI PDF Summarizer Pro",
        theme=gr.themes.Soft()
    ) as demo:
        gr.Markdown("""
        # AI PDF Summarizer Pro
        *Upload once, use everywhere across all tabs*
        """)

        # --- Main Tab: PDF Summarizer ---
        with gr.Tab("π PDF Summarizer"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Upload Your PDF")
                    gr.Markdown("Upload a PDF here and it will be automatically available in all other tabs.")
                    pdf_input = gr.File(
                        label="Upload PDF Document",
                        file_types=[".pdf"],
                        type="filepath"
                    )
                    process_btn = gr.Button(
                        "Process PDF",
                        variant="primary",
                        size="lg"
                    )
                with gr.Column(scale=2):
                    with gr.Accordion("Extracted Text", open=False):
                        extracted_text = gr.Textbox(
                            label="",
                            lines=8,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Row():
                        with gr.Column():
                            summary_box = gr.Textbox(
                                label="AI Summary",
                                lines=4,
                                interactive=False,
                                show_copy_button=True
                            )
                        with gr.Column():
                            keywords_box = gr.Textbox(
                                label="Top Keywords",
                                lines=2,
                                interactive=False
                            )
                    with gr.Row():
                        with gr.Column():
                            audio_box = gr.Audio(
                                label="Summary Audio",
                                type="filepath",
                                interactive=False
                            )
                        with gr.Column():
                            diagram_box = gr.Image(
                                label="AI Generated Diagram",
                                interactive=False,
                                height=200
                            )
            # Status message
            status_display = gr.HTML(
                value="<div>No PDF processed yet. Upload a PDF and click 'Process PDF'.</div>"
            )

        # --- Tab: AI Diagram Generator ---
        with gr.Tab("πΌοΈ AI Diagram"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Create Diagram")
                    gr.Markdown("Create diagrams using the summary from your uploaded PDF or enter custom text.")
                    diagram_summary_input = gr.Textbox(
                        label="Summary Text",
                        lines=3,
                        placeholder="Text from your PDF summary will appear here after processing..."
                    )
                    diagram_keywords_input = gr.Textbox(
                        label="Keywords (optional)",
                        placeholder="Keywords from your PDF will appear here..."
                    )
                    generate_diagram_btn = gr.Button(
                        "Generate Diagram",
                        variant="primary"
                    )
                with gr.Column():
                    gr.Markdown("### Generated Diagram")
                    diagram_output = gr.Image(
                        label="",
                        interactive=False,
                        height=400,
                        show_download_button=True
                    )

        # --- Tab: Chat with PDF ---
        with gr.Tab("π¬ Chat with PDF"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Chat with Your PDF")
                    gr.Markdown("""
                    **Ask questions about your uploaded PDF**
                    Simply go to the **PDF Summarizer** tab, upload and process your PDF, then come back here to start chatting!
                    """)
                    # Display current PDF status
                    chat_status_display = gr.HTML(
                        value="<div>Please upload and process a PDF in the 'PDF Summarizer' tab first.</div>"
                    )
                with gr.Column(scale=2):
                    chatbot_interface = gr.ChatInterface(
                        fn=chatbot.generate_answer,
                        title="Chat with Your PDF",
                        description="Ask questions about the content of your uploaded PDF document",
                        examples=[
                            "What is the main topic of this document?",
                            "Can you summarize the key points?",
                            "What are the most important findings?",
                            "Explain the methodology used",
                            "What conclusions does the author reach?"
                        ]
                    )

        # --- Tab: About ---
        with gr.Tab("βΉοΈ About"):
            gr.Markdown("""
            ## About AI PDF Summarizer Pro
            **One PDF Upload, Multiple AI Features**
            Upload your PDF once in the **PDF Summarizer** tab and use it across all features:
            - **π PDF Summarizer**: Extract text, generate summaries, get keywords
            - **πΌοΈ AI Diagram**: Create visual diagrams from your content
            - **π¬ Chat with PDF**: Ask questions and get instant answers
            ### How it works:
            1. Upload your PDF in the **PDF Summarizer** tab
            2. Click **Process PDF**
            3. The same PDF is automatically available in all other tabs
            4. No need to re-upload - seamless experience!
            ### Powered by:
            - Hugging Face Transformers
            - Stable Diffusion
            - Groq API
            - FAISS Vector Search
            ### Setup Instructions:
            For full functionality, add your Groq API key in Hugging Face Spaces secrets:
            - Go to your Space settings
            - Add a secret named `GROQ_API_KEY` with your Groq API key
            """)

        # --- Event Handlers ---
        # Main PDF processing - updates all tabs
        process_btn.click(
            process_pdf,
            inputs=[pdf_input],
            outputs=[extracted_text, summary_box, audio_box, keywords_box, diagram_box, status_display]
        ).then(
            # Update the diagram tab inputs with the generated summary and keywords
            lambda summary, keywords: (summary, keywords),
            inputs=[summary_box, keywords_box],
            outputs=[diagram_summary_input, diagram_keywords_input]
        ).then(
            # Update chat status from the chatbot's real state
            _chat_status_html,
            outputs=[chat_status_display]
        )

        # Standalone diagram generation
        generate_diagram_btn.click(
            generate_diagram,
            inputs=[diagram_summary_input, diagram_keywords_input],
            outputs=[diagram_output]
        )
    return demo
| # ========================================================== | |
| # Launch Application | |
| # ========================================================== | |
if __name__ == "__main__":
    print("Starting AI PDF Summarizer Pro Version")
    print("Key Feature: Upload PDF once, use across all tabs!")
    # The models were loaded when the module was imported; report what
    # actually succeeded rather than printing a success mark for each one.
    print("Model status:")
    component_status = (
        ("Summarization Model", summarizer),
        ("Embedding Model", embedder),
        ("Diagram Generation", sd_pipe),
        ("Chat Model", groq_client),
    )
    for label, component in component_status:
        state = "ready" if component is not None else "unavailable"
        print(f"  {label}: {state}")
    demo = create_interface()
    demo.launch(share=False)