# advance-multidoc-rag / src / file_handler.py
# Author: Fnu Mahnoor — commit 299a880 ("Fix file handler")
import os
import tempfile
import logging
import requests
import nltk
nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize
from bs4 import BeautifulSoup, SoupStrainer
from typing import List, Tuple, Dict, Optional
from docx import Document
from pptx import Presentation
# Faster PDF Extraction
try:
import fitz # PyMuPDF
_MU_PDF_AVAILABLE = True
except ImportError:
from pypdf import PdfReader
_MU_PDF_AVAILABLE = False
# Persistent session for network requests.
# A single shared Session reuses HTTP keep-alive connections across calls,
# which is cheaper than creating a new connection per request.
session = requests.Session()
# Custom User-Agent sent on every request made through this session.
session.headers.update({"User-Agent": "vantage-rag-reader/2.0"})
def chunk_text_semantic(
    text: str,
    max_tokens: int = 400,
    overlap_sentences: int = 2,
    tokenizer=None
) -> List[str]:
    """
    Chunk text on sentence boundaries, capping each chunk at ~max_tokens.

    Args:
        text: Input text. Lists (of strings, or page dicts with a "content"
            key) and other non-str types are coerced to a single string.
        max_tokens: Soft upper bound on tokens per chunk. A single sentence
            longer than this still becomes its own chunk.
        overlap_sentences: Number of trailing sentences carried over into
            the next chunk as a sliding-window overlap.
        tokenizer: Optional callable returning a token sequence for a
            string; when omitted, whitespace splitting is used.

    Returns:
        List of chunk strings; empty list for empty/blank input.
    """
    # Coerce non-string inputs so sent_tokenize always receives a str.
    if isinstance(text, list):
        # Join content if it's a list of page dicts or strings.
        text = " ".join(
            str(i.get("content", i)) if isinstance(i, dict) else str(i)
            for i in text
        )
    elif not isinstance(text, str):
        text = str(text) if text else ""
    if not text.strip():
        return []

    # One counting function used everywhere, so per-sentence counts and the
    # overlap recount agree. (Previously the overlap recount always used
    # .split() even when a custom tokenizer was supplied, making chunk
    # budgets inconsistent.)
    if tokenizer:
        count = lambda s: len(tokenizer(s))
    else:
        count = lambda s: len(s.split())

    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_tokens = 0
    for sent in sentences:
        token_count = count(sent)
        if current_tokens + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            # Sliding window overlap: keep the last N sentences.
            if overlap_sentences > 0:
                current_chunk = current_chunk[-overlap_sentences:]
                current_tokens = sum(count(s) for s in current_chunk)
            else:
                current_chunk = []
                current_tokens = 0
        current_chunk.append(sent)
        current_tokens += token_count
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
def extract_pages_from_pdf(path: str) -> List[Tuple[int, str]]:
    """
    Return (page_number, text) tuples for every page of the PDF at *path*.

    Prefers PyMuPDF (fitz) when available — roughly 15x faster than pypdf —
    and falls back to pypdf otherwise. Page numbers are 1-indexed.
    """
    if _MU_PDF_AVAILABLE:
        with fitz.open(path) as doc:
            return [
                (num, page.get_text().strip())
                for num, page in enumerate(doc, start=1)
            ]
    reader = PdfReader(path)
    return [
        (num, page.extract_text() or "")
        for num, page in enumerate(reader.pages, start=1)
    ]
# 1. Word Extraction (.docx)
def extract_text_from_docx(file_path: str, page_size: int = 2000) -> List[Dict]:
    """
    Extract paragraph text from a .docx file as a list of virtual pages.

    .docx files have no native page boundaries, so the concatenated
    paragraph text is split into fixed-size character slices ("virtual
    pages") for citation purposes.

    Args:
        file_path: Path to the .docx file.
        page_size: Characters per virtual page (default 2000, matching the
            previous hard-coded value).

    Returns:
        List of {"page_num": int, "content": str} dicts, 1-indexed.
    """
    doc = Document(file_path)
    full_text = "\n".join(para.text for para in doc.paragraphs)
    # Virtual pagination over the flattened text.
    return [
        {
            "page_num": (start // page_size) + 1,
            "content": full_text[start:start + page_size],
        }
        for start in range(0, len(full_text), page_size)
    ]
# 2. PowerPoint Extraction (.pptx)
def extract_text_from_pptx(file_path: str) -> List[Dict]:
    """
    Extract all shape text from a .pptx file, one entry per slide.

    Returns a list of {"page_num": int, "content": str} dicts where
    page_num is the 1-indexed slide number and content joins each text-
    bearing shape on the slide with newlines.
    """
    prs = Presentation(file_path)
    pages = []
    for slide_num, slide in enumerate(prs.slides, start=1):
        texts = [
            shape.text for shape in slide.shapes if hasattr(shape, "text")
        ]
        pages.append({"page_num": slide_num, "content": "\n".join(texts)})
    return pages
def fetch_and_extract(url: str) -> str:
    """
    Fetch *url* and return its extracted plain text.

    PDF responses (by content-type or .pdf suffix) are routed through the
    PDF extractor; HTML responses are parsed body-only via SoupStrainer
    (saves RAM/CPU) after stripping script/style/navigation chrome.
    Returns "" on any fetch failure.
    """
    try:
        response = session.get(url, timeout=15, allow_redirects=True)
        response.raise_for_status()
    except Exception as exc:
        logging.error(f"Failed to fetch {url}: {exc}")
        return ""

    content_type = response.headers.get("content-type", "").lower()
    # PDFs skip HTML parsing entirely.
    if "application/pdf" in content_type or url.lower().endswith(".pdf"):
        return _extract_from_bytes(response.content, ".pdf")

    # Parse only the <body> — SoupStrainer avoids building the full tree.
    soup = BeautifulSoup(
        response.text, "lxml", parse_only=SoupStrainer("body")
    )
    # Drop non-content elements before extracting text.
    for junk in soup(["script", "style", "nav", "footer", "header"]):
        junk.decompose()
    return soup.get_text(separator="\n\n", strip=True)
def _extract_from_bytes(content: bytes, suffix: str) -> str:
"""Helper to handle temporary files for bytes-based extraction."""
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tf:
tf.write(content)
tmp_path = tf.name
try:
if suffix == ".pdf":
pages = extract_pages_from_pdf(tmp_path)
return "\n\n".join(t for _, t in pages if t)
return ""
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)