Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import fitz # PyMuPDF | |
| import torch | |
| from transformers import pipeline | |
| import time, logging, re, difflib | |
| import matplotlib | |
| matplotlib.use('Agg') | |
| import matplotlib.pyplot as plt | |
| import io | |
| from PIL import Image | |
logging.basicConfig(level=logging.ERROR)

# This value is handed to every pipeline's ``device`` argument below.
device = -1  # CPU-only
print("β οΈ CPU-only. Using faster models!")


def _load_pipeline_or_exit(label, task, model, **kwargs):
    """Build a transformers pipeline, or print the failure and exit with status 1.

    Args:
        label: Human-readable model name; used only in the error message.
        task: Pipeline task name, forwarded to ``pipeline`` as its first argument.
        model: Model identifier, forwarded as ``model=``.
        **kwargs: Extra keyword arguments forwarded to ``pipeline`` unchanged.

    Returns:
        The object returned by ``pipeline``.

    Raises:
        SystemExit: With code 1 when ``pipeline`` raises any ``Exception``.
    """
    try:
        return pipeline(task, model=model, device=device, **kwargs)
    except Exception as e:
        print(f"β {label} model loading failed: {str(e)}")
        # The bare ``exit`` builtin is injected by the ``site`` module and is
        # missing under ``python -S`` or in frozen builds; raising SystemExit
        # directly has the same effect and always works.
        raise SystemExit(1) from e


# Load faster summarizer
summarizer = _load_pipeline_or_exit(
    "Summarizer",
    "summarization",
    "sshleifer/distilbart-cnn-12-6",
    torch_dtype=torch.float32,
)

# Load QA model
qa_pipeline = _load_pipeline_or_exit(
    "QA",
    "question-answering",
    "deepset/roberta-base-squad2",
)
def visualize_chunk_status(chunk_data):
    """Render a horizontal bar chart of per-chunk processing time and status.

    Args:
        chunk_data: List of dicts, one per chunk. Each dict must provide
            ``'chunk'`` (used for the bar label) and ``'status'`` (used to
            pick the bar colour); ``'time'`` is optional and defaults to 0.1.

    Returns:
        A PIL image opened from an in-memory PNG of the chart.
    """
    status_colors = {'summarized': 'green', 'skipped': 'orange', 'error': 'red'}
    labels = [f"C{entry['chunk']}" for entry in chunk_data]
    # Statuses not listed above are drawn in gray rather than raising KeyError.
    colors = [status_colors.get(entry['status'], 'gray') for entry in chunk_data]
    times = [entry.get('time', 0.1) for entry in chunk_data]

    fig, ax = plt.subplots(figsize=(10, 2.5))
    try:
        ax.barh(labels, times, color=colors)
        ax.set_xlabel("Time (s)")
        ax.set_title("π Chunk Processing Status")
        # Call the Figure's own methods instead of plt.tight_layout() /
        # plt.savefig(): the pyplot functions operate on the global "current
        # figure", which may belong to another request when calls overlap.
        fig.tight_layout()
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # Always release the figure, even when drawing or saving fails.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)
def summarize_file(file_bytes):
    """Summarize a PDF in fixed-size chunks and chart how each chunk was handled.

    The extracted text is cleaned (``$...$`` delimiters removed, ``\\cap``
    spelled out, whitespace collapsed, non-ASCII characters dropped), cut to
    at most 300000 characters and split into 1000-character chunks. Chunks in
    which more than half of the characters are non-alphanumeric are skipped;
    the rest are passed to the module-level ``summarizer``.

    Args:
        file_bytes: Raw PDF content, passed to ``fitz.open`` as ``stream``.

    Returns:
        A ``(report, image)`` tuple. ``report`` is the text report with one
        section per chunk and ``image`` is the result of
        ``visualize_chunk_status``. If extraction fails or yields no text,
        returns ``(error_message, None)`` instead.
    """
    max_chars = 300000  # allow full but reasonable size
    chunk_size = 1000
    start = time.time()

    try:
        # The context manager closes the document even when extraction raises;
        # previously the document was opened and never closed.
        with fitz.open(stream=file_bytes, filetype="pdf") as doc:
            text = "".join(page.get_text("text") for page in doc)
    except Exception as e:
        return f"β Text extraction failed: {str(e)}", None

    # Strip inline-math delimiters but keep what was between them.
    text = re.sub(r"\$\s*([^$]+)\s*\$", r"\1", text)
    text = re.sub(r"\\cap", "intersection", text)
    text = re.sub(r"\s+", " ", text).strip()
    text = "".join(c for c in text if ord(c) < 128)

    # Dropping non-ASCII characters can leave nothing but spaces behind.
    if not text.strip():
        return "β No text found", None

    text = text[:max_chars]
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    summaries = []
    chunk_info = []
    for number, chunk in enumerate(chunks, start=1):
        chunk_start = time.time()
        non_alnum = sum(1 for c in chunk if not c.isalnum())
        if non_alnum / len(chunk) > 0.5:
            summaries.append(f"### Chunk {number}: Skipped (equation-heavy)")
            status = 'skipped'
        else:
            # One failing chunk must not abort the whole document, so errors
            # are recorded in the report and the loop continues.
            try:
                summary = summarizer(
                    chunk, max_length=150, min_length=50, do_sample=False
                )[0]['summary_text']
                summaries.append(f"### Chunk {number}\n{summary}")
                status = 'summarized'
            except Exception as e:
                summaries.append(f"### Chunk {number}: β Error: {str(e)}")
                status = 'error'
        chunk_info.append(
            {'chunk': number, 'status': status, 'time': time.time() - chunk_start}
        )

    formatted_chunks = "\n\n---\n\n".join(summaries)
    elapsed = time.time() - start
    final_summary = (
        f"**Characters Processed**: {len(text)}\n"
        f"**Total Time**: {elapsed:.2f} seconds\n"
        "## πΉ Summary by Chunks\n"
        f"{formatted_chunks}\n"
    )
    image = visualize_chunk_status(chunk_info)
    return final_summary, image
def find_relevant_passages(text, question, num_passages=5):
    """Return the sentences of ``text`` most relevant to ``question``.

    ``text`` is split into sentences after ``.``, ``?`` or ``!``. Each
    sentence is scored by the number of distinct lower-cased word tokens it
    shares with the question; a sentence sharing none falls back to a
    ``difflib`` similarity ratio between the lower-cased strings, so it
    still ranks below any sentence with a real word match.

    Args:
        text: Document text to search.
        question: The user's question.
        num_passages: Maximum number of sentences to return; values below
            zero are treated as zero.

    Returns:
        The selected sentences joined by single spaces, best score first.
        Sentences with equal scores keep their order of appearance in
        ``text``. Returns ``""`` when nothing is selected.
    """
    question_lower = question.lower()
    # Tokenise on word characters so trailing punctuation ("capital?") does
    # not prevent a match against the bare word ("capital").
    question_tokens = set(re.findall(r"\w+", question_lower))

    scored = []
    for passage in re.split(r'(?<=[.?!])\s+', text):
        passage_lower = passage.lower()
        score = len(question_tokens.intersection(re.findall(r"\w+", passage_lower)))
        if score == 0:
            # Compare lower-cased strings so the fallback is case-insensitive
            # like the token match above.
            score = difflib.SequenceMatcher(None, question_lower, passage_lower).ratio()
        scored.append((score, passage))

    # Sort on the score alone: the sort is stable, so ties stay in document
    # order instead of being ordered by the passage text.
    scored.sort(key=lambda item: item[0], reverse=True)
    best = scored[:max(num_passages, 0)]
    return " ".join(passage for _, passage in best)
def answer_question(file_bytes, question):
    """Answer ``question`` from the text of an uploaded PDF.

    The question is validated first so that a blank question returns
    immediately without parsing the PDF. The PDF text is whitespace-collapsed,
    stripped of non-ASCII characters and cut to 300000 characters; the
    sentences picked by ``find_relevant_passages`` are then given to the
    module-level ``qa_pipeline`` as context.

    Args:
        file_bytes: Raw PDF content, passed to ``fitz.open`` as ``stream``.
        question: The user's question; ``None`` is treated as empty.

    Returns:
        A string with the answer and its score, or a message describing why
        no answer could be produced. Exceptions from extraction and from the
        QA step are reported in the returned string rather than raised.
    """
    # ``None.strip()`` would raise AttributeError outside any try block, so a
    # missing question is handled the same way as a blank one.
    if not (question or "").strip():
        return "β οΈ Please enter a valid question."

    try:
        # The context manager closes the document even when extraction raises;
        # previously the document was opened and never closed.
        with fitz.open(stream=file_bytes, filetype="pdf") as doc:
            text = "".join(page.get_text("text") for page in doc)
    except Exception as e:
        return f"β Text extraction failed: {str(e)}"

    text = re.sub(r"\s+", " ", text).strip()
    text = "".join(c for c in text if ord(c) < 128)
    context = text[:300000]

    try:
        relevant_context = find_relevant_passages(context, question)
        result = qa_pipeline(question=question, context=relevant_context)
        return f"**Answer**: {result['answer']}\n\n**Score**: {result['score']:.2f}"
    except Exception as e:
        return f"β QA failed: {str(e)}"
# Summarizer UI: one PDF upload in; the text report and the status chart out.
_summary_input = gr.File(label="π Upload PDF", type="binary")
_summary_outputs = [
    gr.Textbox(label="π Summarized Output", lines=30, show_copy_button=True),
    gr.Image(label="π Visual Process Flow", type="pil"),
]
summarizer_ui = gr.Interface(
    title="π AI-Powered PDF Summarizer",
    description="Summarizes long PDFs and visualizes chunk-level processing.",
    fn=summarize_file,
    inputs=_summary_input,
    outputs=_summary_outputs,
)

# Q&A UI: a PDF upload and a question in; the answer text out.
_qa_inputs = [
    gr.File(label="π Upload PDF", type="binary"),
    gr.Textbox(label="β Ask a Question"),
]
_qa_output = gr.Textbox(label="π Answer")
qa_ui = gr.Interface(
    title="π PDF Q&A Assistant",
    description="Ask natural language questions from the uploaded PDF.",
    fn=answer_question,
    inputs=_qa_inputs,
    outputs=_qa_output,
)
# Tabs: serve both interfaces from one app when the file is run as a script.
if __name__ == "__main__":
    try:
        tabbed_app = gr.TabbedInterface(
            [summarizer_ui, qa_ui],
            ["π Summarizer", "β Q&A Assistant"],
        )
        tabbed_app.launch(server_port=7860)
    except Exception as e:
        print(f"β Gradio launch failed: {str(e)}")
        # Exit non-zero so a failed launch is not reported as success; the
        # model-loading failures earlier in this file also exit with status 1.
        raise SystemExit(1) from e