File size: 4,639 Bytes
92f1a38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import gc
import json
import torch
import pymupdf, pymupdf4llm

from ast import literal_eval
from pathlib import Path
from datasets import Dataset
from collections import defaultdict
from docling.document_converter import DocumentConverter

from src.config import log
from pathlib import Path


def empty_cache():
    """Run the GC and release cached accelerator memory (CUDA or MPS).

    Safe to call on CPU-only builds: the accelerator-specific calls are
    guarded by availability checks.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # torch.backends.mps.is_available() is the documented availability check;
    # torch.mps.is_available() is missing on older torch builds.
    elif torch.backends.mps.is_available():
        torch.mps.empty_cache()


def extract_text(file, markdown=False, backend="pymupdf", **kwargs):
    """Extract text from a PDF file.

    Args:
        file: path to the PDF file.
        markdown: if True, return Markdown-formatted text instead of plain text.
        backend: "pymupdf" (fast, CPU-only) or "docling" (layout-aware,
            benefits from an accelerator).
        **kwargs: forwarded to the selected backend's extraction call.

    Returns:
        str: the extracted text.

    Raises:
        ValueError: if `backend` is not "pymupdf" or "docling".
    """
    if backend == "pymupdf":
        if markdown:
            log.debug("\n\n using pymupdf4llm \n\n")
            return pymupdf4llm.to_markdown(file, show_progress=True, **kwargs)
        with pymupdf.open(file, filetype="pdf") as doc:
            return "\n".join(page.get_text(**kwargs) for page in doc)

    if backend == "docling":
        converter = DocumentConverter(allowed_formats=["pdf"])
        doc = converter.convert(file, **kwargs).document
        res = doc.export_to_markdown() if markdown else doc.export_to_text()
        # Docling models may hold accelerator memory; drop references and
        # flush caches before returning.
        del converter, doc
        empty_cache()
        return res

    # Previously fell through and silently returned None.
    raise ValueError(f"Unknown backend: {backend!r}; expected 'pymupdf' or 'docling'")


def _load_pdf_sync(file, markdown=True, fast=False, **kwargs):
    """Synchronous PDF loading function for thread pool execution.

    Uses the docling backend when an accelerator is available and `fast` is
    False; otherwise falls back to pymupdf.

    Args:
        file: path to the PDF file.
        markdown: whether to extract Markdown instead of plain text.
        fast: force the fast pymupdf backend even when an accelerator exists.
        **kwargs: forwarded to `extract_text`.

    Returns:
        tuple: (file stem, extracted text).
    """
    # torch.backends.mps.is_available() is the documented availability check;
    # torch.mps.is_available() is missing on older torch builds.
    use_docling = (not fast) and (
        torch.cuda.is_available() or torch.backends.mps.is_available()
    )
    text = extract_text(
        file,
        markdown,
        backend="docling" if use_docling else "pymupdf",
        **kwargs,
    )

    return (Path(file).stem, text)


def load_pdfs(files, markdown=True, fast_extract=False, **kwargs):
    """
    Load multiple PDF files

    Args:
        files: PDF filepaths
        markdown: whether to extract text in markdown
        fast_extract: whether to use pymupdf to extract text in markdown
    Returns:
        list: List of tuples containing (filename, extracted_text)
    """
    # Sequential extraction; removed the dead commented-out ThreadPoolExecutor
    # variant (restore from VCS history if concurrency is needed again).
    return [_load_pdf_sync(file, markdown, fast_extract, **kwargs) for file in files]


def find_think_tag_in_each_row(tensor, tag_id=151668):
    """Find the position just past a tag token in each row of a 2-D id tensor.

    Args:
        tensor: 2-D tensor of token ids, shape (rows, seq_len).
        tag_id: token id to look for; defaults to 151668, the Qwen-style
            `</think>` token.

    Returns:
        list[int]: for each row, the 1-based column index immediately after
        the tag's last occurrence, or 0 when the row has no tag.
    """
    # nonzero() on a 2-D mask yields (row, col) pairs; dict() maps row -> col
    # and keeps the LAST occurrence per row, since later pairs overwrite
    # earlier keys.
    res = dict((tensor == tag_id).nonzero().tolist())
    if not res:
        return [0] * len(tensor)
    # Rows without the tag map to -1, which the +1 shift turns into 0.
    return [res.get(idx, -1) + 1 for idx in range(len(tensor))]


def build_corpus(pdfs, text_splitter, **load_pdf_kwargs):
    """Chunk the text of each PDF and collect the chunks into a Dataset.

    Args:
        pdfs: iterable of PDF file paths, forwarded to `load_pdfs`.
        text_splitter: object exposing `split_text(str) -> list[str]`.
        **load_pdf_kwargs: extra keyword arguments for `load_pdfs`.

    Returns:
        Dataset with columns: id (global chunk counter), file (source stem),
        chunk_id (index within the file), chunk (the text itself).
    """
    records = []
    for file_name, raw_text in load_pdfs(pdfs, **load_pdf_kwargs):
        stem = Path(file_name).stem
        for chunk_idx, piece in enumerate(text_splitter.split_text(raw_text)):
            records.append(
                {
                    # len(records) before append == running global chunk id
                    "id": len(records),
                    "file": stem,
                    "chunk_id": chunk_idx,
                    "chunk": piece,
                }
            )
    return Dataset.from_list(records)


def reciprocal_rank_fusion(indices, top_k=3, denom=50):
    scores = defaultdict(int)
    for row in indices:
        for rank, idx in enumerate(row):
            scores[idx] += 1 / (rank + denom)
    results = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
    return [idx for idx, _ in results]


def clean_rewrite_resp(resp):
    """Best-effort parse of a model "rewrite" response into a Python object.

    Tries JSON first, then `ast.literal_eval`. If both fail and `resp` is a
    string, trims everything outside the outermost `{...}` span and retries.

    Args:
        resp: raw response text (or an already-parsed object, which is
            returned unchanged).

    Returns:
        The parsed object (usually a dict), or the cleaned/original string
        when parsing never succeeds.
    """
    try:
        resp = json.loads(resp)  # Parse JSON
    # TypeError covers non-string input (e.g. an already-parsed dict),
    # which json.loads would otherwise propagate.
    except (json.JSONDecodeError, TypeError):
        try:
            resp = literal_eval(resp)  # Fallback parse
        except Exception:
            pass  # Keep resp as-is if both fail

    # Ensure resp is a string before strip and slicing
    if isinstance(resp, str):
        resp = resp.strip()
        if resp:
            start = resp.find("{")
            if start != -1:
                end = resp[::-1].find("}")
                if end != -1:
                    trimmed = resp[start : len(resp) - end]
                    # Only recurse when trimming made progress; otherwise an
                    # un-parseable but already brace-delimited string like
                    # "{bad}" recursed forever (RecursionError).
                    if trimmed != resp:
                        return clean_rewrite_resp(trimmed)
                    resp = trimmed
    return resp