File size: 5,089 Bytes
c07baa6
b6e1b94
 
c07baa6
 
299a880
 
b6e1b94
299a880
c07baa6
 
 
 
 
 
b6e1b94
c07baa6
 
 
 
 
 
 
 
 
b6e1b94
 
 
c07baa6
b6e1b94
 
c07baa6
 
 
 
 
 
 
 
 
 
 
 
b6e1b94
 
c07baa6
b6e1b94
 
 
 
 
 
 
 
c07baa6
b6e1b94
c07baa6
 
 
 
 
 
 
 
b6e1b94
 
 
 
 
 
 
 
 
 
c07baa6
 
 
 
b6e1b94
 
c07baa6
 
 
 
 
 
 
 
b6e1b94
 
c07baa6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b6e1b94
c07baa6
 
 
 
 
 
 
 
 
 
 
 
 
 
b6e1b94
 
 
c07baa6
 
 
 
 
 
 
b6e1b94
 
 
 
c07baa6
b6e1b94
c07baa6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b6e1b94
c07baa6
 
 
b6e1b94
c07baa6
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

import os
import tempfile
import logging
import requests
import nltk
nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize
from bs4 import BeautifulSoup, SoupStrainer
from typing import List, Tuple, Dict, Optional
from docx import Document
from pptx import Presentation


# Faster PDF Extraction
try:
    import fitz  # PyMuPDF: C-backed, much faster than pure-Python pypdf
    _MU_PDF_AVAILABLE = True
except ImportError:
    # Fall back to the slower pure-Python reader when PyMuPDF is not
    # installed; extract_pages_from_pdf() branches on _MU_PDF_AVAILABLE.
    from pypdf import PdfReader
    _MU_PDF_AVAILABLE = False

# Persistent session for network requests
# Reusing one Session gives connection pooling/keep-alive across fetches.
session = requests.Session()
session.headers.update({"User-Agent": "vantage-rag-reader/2.0"})

def chunk_text_semantic(
    text: str,
    max_tokens: int = 400,
    overlap_sentences: int = 2,
    tokenizer=None
) -> List[str]:
    """
    Strictly chunks text based on sentence boundaries and token limits.

    Args:
        text: Input text. Lists (of strings, or page dicts with a
            "content" key) and other non-str values are coerced to a
            single string; falsy non-strings become "".
        max_tokens: Soft cap on tokens per chunk. A single sentence longer
            than the cap still becomes its own chunk.
        overlap_sentences: Number of trailing sentences carried over to the
            start of the next chunk (sliding-window overlap).
        tokenizer: Optional callable mapping a sentence to a token sequence;
            when None, tokens are counted by whitespace splitting.

    Returns:
        List of chunk strings; empty list for blank input.
    """
    # FIX: Ensure 'text' is a single string even if a list/dict was passed
    if isinstance(text, list):
        # Join content if it's a list of page dicts or strings
        text = " ".join([str(i.get("content", i)) if isinstance(i, dict) else str(i) for i in text])
    elif not isinstance(text, str):
        text = str(text) if text else ""

    if not text.strip():
        return []

    # One counting function used everywhere, so the main loop and the
    # overlap recount agree. (Previously the overlap recount always used
    # whitespace splitting, drifting from a custom tokenizer's counts.)
    def _count(sentence: str) -> int:
        return len(tokenizer(sentence)) if tokenizer else len(sentence.split())

    # Now nltk.sent_tokenize is guaranteed to receive a string
    sentences = sent_tokenize(text)
    chunks: List[str] = []
    current_chunk: List[str] = []
    current_tokens = 0

    for sent in sentences:
        token_count = _count(sent)

        if current_tokens + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))

            # Sliding window overlap
            if overlap_sentences > 0:
                current_chunk = current_chunk[-overlap_sentences:]
                current_tokens = sum(_count(s) for s in current_chunk)
            else:
                current_chunk = []
                current_tokens = 0

        current_chunk.append(sent)
        current_tokens += token_count

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks


def extract_pages_from_pdf(path: str) -> List[Tuple[int, str]]:
    """
    Extracts per-page text from a PDF as (1-based page number, text) tuples.

    Uses PyMuPDF (fitz) if available, falling back to pypdf.
    PyMuPDF is ~15x faster than pypdf.
    """
    pages: List[Tuple[int, str]] = []
    if _MU_PDF_AVAILABLE:
        with fitz.open(path) as doc:
            for i, page in enumerate(doc, start=1):
                pages.append((i, page.get_text().strip()))
    else:
        reader = PdfReader(path)
        for i, page in enumerate(reader.pages, start=1):
            # extract_text() may return None; strip to match the
            # PyMuPDF branch so both backends yield comparable output.
            pages.append((i, (page.extract_text() or "").strip()))
    return pages

# 1. Word Extraction (.docx)
def extract_text_from_docx(file_path: str) -> List[Dict]:
    """
    Read a .docx file and return a list of virtual pages.

    Word documents have no native page breaks in the XML, so the joined
    paragraph text is sliced into ~2000-character windows, each emitted as
    {"page_num": <1-based index>, "content": <slice>} for citation purposes.
    """
    document = Document(file_path)
    combined = "\n".join(paragraph.text for paragraph in document.paragraphs)

    page_size = 2000
    return [
        {"page_num": index + 1, "content": combined[start:start + page_size]}
        for index, start in enumerate(range(0, len(combined), page_size))
    ]

# 2. PowerPoint Extraction (.pptx)
def extract_text_from_pptx(file_path: str) -> List[Dict]:
    """
    Read a .pptx deck and return one page dict per slide.

    Each entry is {"page_num": <1-based slide index>, "content": <text of
    all shapes on the slide, newline-joined>}.
    """
    deck = Presentation(file_path)
    results = []
    for slide_no, slide in enumerate(deck.slides, start=1):
        texts = [shape.text for shape in slide.shapes if hasattr(shape, "text")]
        results.append({"page_num": slide_no, "content": "\n".join(texts)})
    return results

def fetch_and_extract(url: str) -> str:
    """
    Fetch *url* and return its plain-text content.

    PDFs (by content-type or .pdf extension) are routed through the
    bytes-based extractor; everything else is parsed as HTML, restricted
    to the <body> via SoupStrainer to save RAM/CPU, with boilerplate
    tags removed. Returns "" on any fetch failure.
    """
    try:
        response = session.get(url, timeout=15, allow_redirects=True)
        response.raise_for_status()
    except Exception as exc:
        logging.error(f"Failed to fetch {url}: {exc}")
        return ""

    ctype = response.headers.get("content-type", "").lower()

    # PDF responses are extracted straight from the raw bytes.
    if "application/pdf" in ctype or url.lower().endswith(".pdf"):
        return _extract_from_bytes(response.content, ".pdf")

    # HTML path: parse only <body>, then drop non-content tags.
    soup = BeautifulSoup(response.text, "lxml", parse_only=SoupStrainer("body"))
    for junk in soup(["script", "style", "nav", "footer", "header"]):
        junk.decompose()

    return soup.get_text(separator="\n\n", strip=True)

def _extract_from_bytes(content: bytes, suffix: str) -> str:
    """Helper to handle temporary files for bytes-based extraction."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tf:
        tf.write(content)
        tmp_path = tf.name
    try:
        if suffix == ".pdf":
            pages = extract_pages_from_pdf(tmp_path)
            return "\n\n".join(t for _, t in pages if t)
        return ""
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)