Spaces:
Sleeping
Sleeping
File size: 4,639 Bytes
92f1a38 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import gc
import json
import torch
import pymupdf, pymupdf4llm
from ast import literal_eval
from pathlib import Path
from datasets import Dataset
from collections import defaultdict
from docling.document_converter import DocumentConverter
from src.config import log
from pathlib import Path
def empty_cache():
    """Free cached accelerator memory after large objects are released.

    Runs a full garbage collection first so Python-side references are gone,
    then asks the active accelerator backend (CUDA or Apple MPS) to release
    its cached allocations back to the device.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # torch.backends.mps.is_available() is the documented MPS check;
    # torch.mps.is_available() is not guaranteed to exist on all builds.
    elif torch.backends.mps.is_available():
        torch.mps.empty_cache()
def extract_text(file, markdown=False, backend="pymupdf", **kwargs):
    """Extract text from a PDF file.

    Args:
        file: path to the PDF file.
        markdown: if True, emit markdown instead of plain text.
        backend: "pymupdf" (fast, CPU) or "docling" (slower, higher quality).
        **kwargs: forwarded to the chosen backend's extraction call.

    Returns:
        str: the extracted text (or markdown).

    Raises:
        ValueError: if `backend` is not one of the supported names.
    """
    if backend == "pymupdf":
        if not markdown:
            with pymupdf.open(file, filetype="pdf") as doc:
                return "\n".join(page.get_text(**kwargs) for page in doc)
        else:
            log.debug("\n\n using pymupdf4llm \n\n")
            return pymupdf4llm.to_markdown(file, show_progress=True, **kwargs)
    elif backend == "docling":
        converter = DocumentConverter(allowed_formats=["pdf"])
        doc = converter.convert(file, **kwargs).document
        res = doc.export_to_markdown() if markdown else doc.export_to_text()
        # Docling holds model weights; drop references and flush device caches.
        del converter, doc
        empty_cache()
        return res
    else:
        # Previously an unknown backend fell through and returned None silently.
        raise ValueError(f"unknown backend: {backend!r} (expected 'pymupdf' or 'docling')")
def _load_pdf_sync(file, markdown=True, fast=False, **kwargs):
    """Synchronous PDF loading function for thread pool execution.

    Chooses the docling backend when an accelerator is present and `fast` is
    False, otherwise falls back to the lighter pymupdf backend.

    Returns:
        tuple: (file stem, extracted text).
    """
    # torch.backends.mps.is_available() is the documented MPS check
    # (consistent with empty_cache); torch.mps.is_available() may not exist.
    use_docling = (not fast) and (
        torch.cuda.is_available() or torch.backends.mps.is_available()
    )
    text = extract_text(
        file,
        markdown,
        backend="docling" if use_docling else "pymupdf",
        **kwargs,
    )
    return (Path(file).stem, text)
def load_pdfs(files, markdown=True, fast_extract=False, **kwargs):
    """
    Load multiple PDF files sequentially.

    Args:
        files: PDF filepaths; None entries are skipped.
        markdown: whether to extract text in markdown
        fast_extract: whether to use pymupdf to extract text in markdown
    Returns:
        list: List of tuples containing (filename, extracted_text)
    """
    # Skip None entries so a partially-populated file list doesn't crash
    # extraction part-way through.
    return [
        _load_pdf_sync(file, markdown, fast_extract, **kwargs)
        for file in files
        if file is not None
    ]
def find_think_tag_in_each_row(tensor):
    """For each row of a 2-D token-id tensor, return one past the column of
    the last `</think>` tag (token id 151668), or 0 when the row has none."""
    tag_cols = {}
    # nonzero() yields (row, col) pairs in row-major order, so a later match
    # in the same row overwrites an earlier one — keeping the last occurrence.
    for row, col in (tensor == 151668).nonzero().tolist():
        tag_cols[row] = col
    if not tag_cols:
        return [0] * len(tensor)
    return [tag_cols.get(row, -1) + 1 for row in range(len(tensor))]
def build_corpus(pdfs, text_splitter, **load_pdf_kwargs):
    """Extract text from PDFs, chunk it, and wrap the chunks in a Dataset.

    Each record carries a globally unique `id`, the source file stem,
    the per-file chunk index, and the chunk text itself.
    """
    records = []
    next_id = 0
    for name, raw in load_pdfs(pdfs, **load_pdf_kwargs):
        for chunk_idx, piece in enumerate(text_splitter.split_text(raw)):
            records.append(
                {
                    "id": next_id,
                    "file": Path(name).stem,
                    "chunk_id": chunk_idx,
                    "chunk": piece,
                }
            )
            next_id += 1
    return Dataset.from_list(records)
def reciprocal_rank_fusion(indices, top_k=3, denom=50):
    """Fuse several ranked lists of document ids with reciprocal-rank scoring.

    Each id accumulates 1 / (rank + denom) across all rankings; the `top_k`
    ids with the highest fused score are returned, best first.
    """
    fused = {}
    for ranking in indices:
        for position, doc_id in enumerate(ranking):
            fused[doc_id] = fused.get(doc_id, 0) + 1 / (position + denom)
    ordered = sorted(fused.items(), key=lambda kv: kv[1], reverse=True)
    return [doc_id for doc_id, _ in ordered[:top_k]]
def clean_rewrite_resp(resp):
    """Best-effort parse of an LLM rewrite response into a Python object.

    Tries strict JSON, then `ast.literal_eval`. If both fail and the string
    contains a `{...}` span, trims to that span (first "{" through last "}")
    and retries recursively.

    Args:
        resp: the raw response string (or an already-parsed object).

    Returns:
        The parsed object when parsing succeeds, otherwise the (possibly
        trimmed) string unchanged.
    """
    try:
        resp = json.loads(resp)  # Parse JSON
    except json.JSONDecodeError:
        try:
            resp = literal_eval(resp)  # Fallback parse
        except Exception:
            pass  # Keep resp as-is if both fail
    # Ensure resp is a string before strip and slicing
    if isinstance(resp, str):
        resp = resp.strip()
        if resp:
            start = resp.find("{")
            if start != -1:
                end = resp[::-1].find("}")
                if end != -1:
                    trimmed = resp[start : len(resp) - end]
                    # BUGFIX: only recurse when trimming actually changed the
                    # string; previously an unparseable input that already
                    # started with "{" and ended with "}" (e.g. "{not json}")
                    # recursed on itself forever -> RecursionError.
                    if trimmed != resp:
                        return clean_rewrite_resp(trimmed)
                    resp = trimmed
    return resp
|