# NOTE: the three lines above this file's imports in the original paste
# ("Spaces: / Sleeping / Sleeping") were Hugging Face Spaces UI residue,
# not source code; preserved here as a comment so the module stays valid.
# --- Module setup: imports, NLTK model, PDF backend probe, HTTP session ---

# Standard library
import os
import tempfile
import logging
from typing import List, Tuple, Dict, Optional

# Third-party
import requests
import nltk
from nltk.tokenize import sent_tokenize
from bs4 import BeautifulSoup, SoupStrainer
from docx import Document
from pptx import Presentation

# Sentence tokenizer model used by sent_tokenize.
# FIX: quiet=True suppresses the download banner on every import; nltk
# skips the actual download when the model is already cached locally.
nltk.download('punkt_tab', quiet=True)

# Faster PDF Extraction: prefer PyMuPDF (fitz), fall back to pypdf.
try:
    import fitz  # PyMuPDF
    _MU_PDF_AVAILABLE = True
except ImportError:
    from pypdf import PdfReader
    _MU_PDF_AVAILABLE = False

# Persistent session for network requests (reuses TCP connections).
session = requests.Session()
session.headers.update({"User-Agent": "vantage-rag-reader/2.0"})
def chunk_text_semantic(
    text: str,
    max_tokens: int = 400,
    overlap_sentences: int = 2,
    tokenizer=None
) -> List[str]:
    """
    Strictly chunks text based on sentence boundaries and token limits.

    Args:
        text: Input text. Lists (of strings, or page dicts carrying a
            "content" key) and other non-str values are coerced to a
            single string before tokenizing.
        max_tokens: Soft cap per chunk; a chunk is flushed once adding
            the next sentence would exceed it.
        overlap_sentences: Trailing sentences carried into the next
            chunk as a sliding-window overlap (0 disables overlap).
        tokenizer: Optional callable returning a token sequence for a
            sentence; when None, whitespace splitting is used.

    Returns:
        List of chunk strings; [] for blank/empty input.
    """
    # Ensure 'text' is a single string even if a list/dict was passed.
    if isinstance(text, list):
        # Join content if it's a list of page dicts or strings.
        text = " ".join(
            str(i.get("content", i)) if isinstance(i, dict) else str(i)
            for i in text
        )
    elif not isinstance(text, str):
        text = str(text) if text else ""
    if not text.strip():
        return []

    def _count_tokens(sentence: str) -> int:
        # Single counting path so the fill loop and the overlap recount
        # always agree on what a "token" is.
        return len(tokenizer(sentence)) if tokenizer else len(sentence.split())

    # sent_tokenize is guaranteed to receive a string at this point.
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_tokens = 0
    for sent in sentences:
        token_count = _count_tokens(sent)
        if current_tokens + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            # Sliding window overlap
            if overlap_sentences > 0:
                current_chunk = current_chunk[-overlap_sentences:]
                # FIX: recount the carried-over sentences with the SAME
                # tokenizer used above; the original always whitespace-split
                # here, so a custom tokenizer made the budget drift.
                current_tokens = sum(_count_tokens(s) for s in current_chunk)
            else:
                current_chunk = []
                current_tokens = 0
        current_chunk.append(sent)
        current_tokens += token_count
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
def extract_pages_from_pdf(path: str) -> List[Tuple[int, str]]:
    """
    Extract text per page using PyMuPDF (fitz) if available, falling
    back to pypdf. PyMuPDF is ~15x faster than pypdf.

    Args:
        path: Filesystem path to the PDF file.

    Returns:
        List of (1-based page number, stripped page text) tuples.
    """
    pages = []
    if _MU_PDF_AVAILABLE:
        with fitz.open(path) as doc:
            for i, page in enumerate(doc, start=1):
                pages.append((i, page.get_text().strip()))
    else:
        reader = PdfReader(path)
        for i, page in enumerate(reader.pages, start=1):
            # FIX: strip in the fallback too, so both backends return
            # identically normalized text (the fitz branch already strips).
            text = page.extract_text() or ""
            pages.append((i, text.strip()))
    return pages
# 1. Word Extraction (.docx)
def extract_text_from_docx(file_path: str) -> List[Dict]:
    """
    Read a .docx file and return virtual pages for citation.

    Word documents have no native page concept, so the concatenated
    paragraph text is sliced into fixed ~2000-character windows, each
    reported as one "page" dict with 1-based "page_num" and "content".
    """
    page_size = 2000  # virtual page length in characters
    document = Document(file_path)
    full_text = "\n".join(paragraph.text for paragraph in document.paragraphs)
    return [
        {
            "page_num": index + 1,
            "content": full_text[start:start + page_size],
        }
        for index, start in enumerate(range(0, len(full_text), page_size))
    ]
# 2. PowerPoint Extraction (.pptx)
def extract_text_from_pptx(file_path: str) -> List[Dict]:
    """
    Read a .pptx file and return one entry per slide.

    Each slide becomes a dict with its 1-based slide number ("page_num")
    and the newline-joined text of every text-bearing shape ("content").
    """
    deck = Presentation(file_path)
    pages = []
    for slide_num, slide in enumerate(deck.slides, start=1):
        texts = [shape.text for shape in slide.shapes if hasattr(shape, "text")]
        pages.append({"page_num": slide_num, "content": "\n".join(texts)})
    return pages
def fetch_and_extract(url: str) -> str:
    """
    Optimized URL fetching with partial HTML parsing.

    PDFs (by content-type or .pdf suffix) are routed through the PDF
    extractor; everything else is parsed as HTML, restricted to <body>
    via SoupStrainer to save RAM/CPU, with script/style/nav/footer/
    header elements removed before text extraction.

    Args:
        url: Absolute URL to fetch.

    Returns:
        Extracted plain text, or "" when the fetch fails.
    """
    try:
        r = session.get(url, timeout=15, allow_redirects=True)
        r.raise_for_status()
    except requests.RequestException as e:
        # FIX: catch only network/HTTP errors (everything session.get and
        # raise_for_status can raise) instead of bare Exception, and use
        # lazy %-style logging args rather than an f-string.
        logging.error("Failed to fetch %s: %s", url, e)
        return ""
    content_type = r.headers.get("content-type", "").lower()
    # If it's a PDF, extract immediately
    if "application/pdf" in content_type or url.lower().endswith(".pdf"):
        return _extract_from_bytes(r.content, ".pdf")
    # If it's HTML, use SoupStrainer to only parse the body (saves RAM/CPU)
    only_body = SoupStrainer("body")
    soup = BeautifulSoup(r.text, "lxml", parse_only=only_body)
    # Remove junk before extracting text
    for junk in soup(["script", "style", "nav", "footer", "header"]):
        junk.decompose()
    return soup.get_text(separator="\n\n", strip=True)
| def _extract_from_bytes(content: bytes, suffix: str) -> str: | |
| """Helper to handle temporary files for bytes-based extraction.""" | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tf: | |
| tf.write(content) | |
| tmp_path = tf.name | |
| try: | |
| if suffix == ".pdf": | |
| pages = extract_pages_from_pdf(tmp_path) | |
| return "\n\n".join(t for _, t in pages if t) | |
| return "" | |
| finally: | |
| if os.path.exists(tmp_path): | |
| os.remove(tmp_path) |