"""Document/URL text extraction and semantic chunking utilities for RAG ingestion."""

import os
import tempfile
import logging
import requests
import nltk

# Ensure the sentence-tokenizer model is available; quiet avoids startup noise.
nltk.download('punkt_tab', quiet=True)

from nltk.tokenize import sent_tokenize
from bs4 import BeautifulSoup, SoupStrainer
from typing import List, Tuple, Dict, Optional
from docx import Document
from pptx import Presentation

# Faster PDF Extraction: prefer PyMuPDF (fitz), fall back to pypdf.
try:
    import fitz  # PyMuPDF
    _MU_PDF_AVAILABLE = True
except ImportError:
    from pypdf import PdfReader
    _MU_PDF_AVAILABLE = False

# Persistent session for network requests (reuses connections across calls).
session = requests.Session()
session.headers.update({"User-Agent": "vantage-rag-reader/2.0"})


def chunk_text_semantic(
    text: str,
    max_tokens: int = 400,
    overlap_sentences: int = 2,
    tokenizer=None,
) -> List[str]:
    """
    Strictly chunks text based on sentence boundaries and token limits.

    Args:
        text: Input text. For robustness, a list (of strings or of page
            dicts carrying a "content" key) is flattened to one string.
        max_tokens: Soft upper bound on tokens per chunk.
        overlap_sentences: Trailing sentences carried into the next chunk
            as a sliding-window overlap; 0 disables overlap.
        tokenizer: Optional callable returning a token sequence for a
            string; defaults to whitespace splitting.

    Returns:
        List of chunk strings; empty list for blank input.
    """
    # FIX: Ensure 'text' is a single string even if a list/dict was passed.
    if isinstance(text, list):
        # Join content if it's a list of page dicts or strings.
        text = " ".join(
            str(item.get("content", item)) if isinstance(item, dict) else str(item)
            for item in text
        )
    elif not isinstance(text, str):
        text = str(text) if text else ""

    if not text.strip():
        return []

    def _count_tokens(sentence: str) -> int:
        # Single token-counting rule, honoring the custom tokenizer.
        return len(tokenizer(sentence)) if tokenizer else len(sentence.split())

    # Now nltk.sent_tokenize is guaranteed to receive a string.
    sentences = sent_tokenize(text)
    chunks: List[str] = []
    current_chunk: List[str] = []
    current_tokens = 0

    for sent in sentences:
        token_count = _count_tokens(sent)
        if current_tokens + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            if overlap_sentences > 0:
                # Sliding window overlap: carry the last few sentences over.
                current_chunk = current_chunk[-overlap_sentences:]
                # BUG FIX: recount with the same tokenizer used above; the
                # original always whitespace-split here, skewing counts
                # whenever a custom tokenizer was supplied.
                current_tokens = sum(_count_tokens(s) for s in current_chunk)
            else:
                current_chunk = []
                current_tokens = 0
        current_chunk.append(sent)
        current_tokens += token_count

    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks


def extract_pages_from_pdf(path: str) -> List[Tuple[int, str]]:
    """
    Extracts text using PyMuPDF (fitz) if available, falling back to pypdf.
    PyMuPDF is ~15x faster than pypdf.

    Args:
        path: Filesystem path to the PDF.

    Returns:
        List of (1-based page number, stripped page text) tuples.
    """
    pages: List[Tuple[int, str]] = []
    if _MU_PDF_AVAILABLE:
        with fitz.open(path) as doc:
            for i, page in enumerate(doc, start=1):
                pages.append((i, page.get_text().strip()))
    else:
        reader = PdfReader(path)
        for i, page in enumerate(reader.pages, start=1):
            # extract_text() may return None; strip for parity with the
            # PyMuPDF branch above.
            pages.append((i, (page.extract_text() or "").strip()))
    return pages


# 1. Word Extraction (.docx)
def extract_text_from_docx(file_path: str) -> List[Dict]:
    """
    Extracts .docx text as virtual pages for citation purposes.

    Note: docx doesn't have native "pages", so we treat every ~2000
    characters as a virtual page.

    Returns:
        List of {"page_num": int, "content": str} dicts.
    """
    doc = Document(file_path)
    pages: List[Dict] = []
    full_text = "\n".join(para.text for para in doc.paragraphs)

    # Virtual pagination
    page_size = 2000
    for i in range(0, len(full_text), page_size):
        pages.append({
            "page_num": (i // page_size) + 1,
            "content": full_text[i:i + page_size],
        })
    return pages


# 2. PowerPoint Extraction (.pptx)
def extract_text_from_pptx(file_path: str) -> List[Dict]:
    """
    Extracts per-slide text from a .pptx file (one "page" per slide).

    Returns:
        List of {"page_num": int, "content": str} dicts.
    """
    prs = Presentation(file_path)
    pages: List[Dict] = []
    for i, slide in enumerate(prs.slides):
        # Collect text from every shape that exposes a .text attribute.
        slide_text = [shape.text for shape in slide.shapes if hasattr(shape, "text")]
        pages.append({
            "page_num": i + 1,
            "content": "\n".join(slide_text),
        })
    return pages


def fetch_and_extract(url: str) -> str:
    """
    Optimized URL fetching with partial HTML parsing.

    PDFs (by content-type or .pdf extension) are extracted via
    extract_pages_from_pdf; HTML is parsed body-only with boilerplate
    tags stripped. Returns "" on any fetch failure (logged).
    """
    try:
        r = session.get(url, timeout=15, allow_redirects=True)
        r.raise_for_status()
    # Narrowed from bare Exception: session.get / raise_for_status raise
    # subclasses of requests.RequestException.
    except requests.RequestException as e:
        logging.error(f"Failed to fetch {url}: {e}")
        return ""

    content_type = r.headers.get("content-type", "").lower()

    # If it's a PDF, extract immediately
    if "application/pdf" in content_type or url.lower().endswith(".pdf"):
        return _extract_from_bytes(r.content, ".pdf")

    # If it's HTML, use SoupStrainer to only parse the body (saves RAM/CPU)
    only_body = SoupStrainer("body")
    soup = BeautifulSoup(r.text, "lxml", parse_only=only_body)

    # Remove junk before extracting text
    for script in soup(["script", "style", "nav", "footer", "header"]):
        script.decompose()

    return soup.get_text(separator="\n\n", strip=True)


def _extract_from_bytes(content: bytes, suffix: str) -> str:
    """Helper to handle temporary files for bytes-based extraction."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tf:
        tf.write(content)
        tmp_path = tf.name
    try:
        if suffix == ".pdf":
            pages = extract_pages_from_pdf(tmp_path)
            return "\n\n".join(t for _, t in pages if t)
        # Unknown suffix: nothing we know how to extract.
        return ""
    finally:
        # Always clean up the temp file, even if extraction raised.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)