import asyncio
from pathlib import Path
from typing import Union, List, Tuple, Optional, Callable, Dict
from llama_index.core import SimpleDirectoryReader
from evoagentx.rag.schema import Document
from evoagentx.core.logging import logger
# You could follow the LlamaIndex tutorial to develop a valid Reader for a new file format:
# https://docs.llamaindex.ai/en/stable/module_guides/loading/simpledirectoryreader/
class LLamaIndexReader:
"""A universal file reader based on LlamaIndex's SimpleDirectoryReader.
This class provides a flexible interface for loading documents from files or directories,
supporting various formats (e.g., PDF, Word, Markdown) with customizable filtering and metadata.
Attributes:
recursive (bool): Whether to recursively load files from directories.
exclude_hidden (bool): Whether to exclude hidden files (starting with '.').
num_workers (Optional[int]): Number of worker threads for parallel loading.
num_files_limits (Optional[int]): Maximum number of files to load.
custom_metadata_function (Optional[Callable]): Custom function to extract metadata.
extern_file_extractor (Optional[Dict]): Custom file extractors for specific file types.
errors (str): Error handling strategy for file reading (e.g., 'ignore', 'strict').
encoding (str): File encoding (default: 'utf-8').
"""
def __init__(
self,
recursive: bool = False,
exclude_hidden: bool = True,
num_workers: Optional[int] = None,
num_files_limits: Optional[int] = None,
custom_metadata_function: Optional[Callable] = None,
extern_file_extractor: Optional[Dict] = None,
errors: str = "ignore",
encoding: str = "utf-8",
):
self.recursive = recursive
self.exclude_hidden = exclude_hidden
self.num_workers = num_workers
self.num_files_limits = num_files_limits
self.custom_metadata_function = custom_metadata_function
self.extern_file_extractor = extern_file_extractor
self.errors = errors
self.encoding = encoding
def _validate_path(self, path: Union[str, Path]) -> Path:
"""Validate and convert a path to a Path object.
Args:
path: A string or Path object representing a file or directory.
Returns:
Path: A validated Path object.
Raises:
FileNotFoundError: If the path does not exist.
ValueError: If the path is invalid.
"""
path = Path(path)
if not path.exists():
logger.error(f"Path does not exist: {path}")
raise FileNotFoundError(f"Path does not exist: {path}")
return path
def _check_input(
self, input_data: Union[str, List, Tuple], is_file: bool = True
) -> Union[List[Path], Path]:
"""Check input to a list of Path objects or a single Path for directories.
Args:
input_data: A string, list, or tuple of file/directory paths.
is_file: Whether to treat input as file paths (True) or directory (False).
Returns:
Union[List[Path], Path]: Valied file paths or directory path.
Raises:
ValueError: If input type is invalid.
"""
if isinstance(input_data, str):
return self._validate_path(input_data)
elif isinstance(input_data, (list, tuple)):
if is_file:
return [self._validate_path(p) for p in input_data]
else:
return self._validate_path(input_data[0])
else:
logger.error(f"Invalid input type: {type(input_data)}")
raise ValueError(f"Invalid input type: {type(input_data)}")
def load(
self,
file_paths: Union[str, List, Tuple],
exclude_files: Optional[Union[str, List, Tuple]] = None,
filter_file_by_suffix: Optional[Union[str, List, Tuple]] = None,
merge_by_file: bool = False,
show_progress: bool = False,
use_async: bool = False,
) -> List[Document]:
"""Load documents from files or directories.
Args:
file_paths: A string, list, or tuple of file paths or a directory path.
exclude_files: Files to exclude from loading.
filter_file_by_suffix: File extensions to include (e.g., ['.pdf', '.docx']).
Returns:
List[Document]: List of loaded documents.
Raises:
FileNotFoundError: If input paths are invalid.
RuntimeError: If document loading fails.
"""
try:
input_files = None
input_dir = None
if isinstance(file_paths, (list, tuple)):
input_files = self._check_input(file_paths, is_file=True)
else:
path = self._check_input(file_paths, is_file=False)
if path.is_dir():
input_dir = path
else:
input_files = [path]
exclude_files = (
self._check_input(exclude_files, is_file=True)
if exclude_files
else None
)
filter_file_by_suffix = (
list(filter_file_by_suffix)
if isinstance(filter_file_by_suffix, (list, tuple))
else [filter_file_by_suffix]
if isinstance(filter_file_by_suffix, str)
else None
)
reader = SimpleDirectoryReader(
input_dir=input_dir,
input_files=input_files,
exclude=exclude_files,
exclude_hidden=self.exclude_hidden,
recursive=self.recursive,
required_exts=filter_file_by_suffix,
num_files_limit=self.num_files_limits,
file_metadata=self.custom_metadata_function,
file_extractor=self.extern_file_extractor,
encoding=self.encoding,
errors=self.errors,
)
llama_docs = asyncio.run(reader.aload_data(show_progress=show_progress, num_workers=self.num_workers)) if use_async \
else reader.load_data(show_progress=show_progress)
if merge_by_file:
file_to_docs = {}
for doc in llama_docs:
file_path = doc.metadata.get("file_path", "")
if file_path not in file_to_docs:
file_to_docs[file_path] = []
file_to_docs[file_path].append(doc)
documents = []
for file_path, docs in file_to_docs.items():
combined_text = "\n".join(doc.text for doc in docs)
combined = docs[0].copy()
combined.text_resource.text = combined_text
combined.metadata["page_count"] = len(docs)
documents.append(Document.from_llama_document(combined))
else:
documents = [Document.from_llama_document(doc) for doc in llama_docs]
logger.info(f"Loaded {len(documents)} documents")
return documents
except Exception as e:
logger.error(f"Failed to load documents: {str(e)}")
raise RuntimeError(f"Failed to load documents: {str(e)}") |