File size: 7,963 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
from pathlib import Path
from typing import Union, List, Tuple, Optional
from PIL import Image
from llama_index.core.schema import ImageDocument
from evoagentx.core.logging import logger

class MultimodalReader:
    """Path-based image file reader for multimodal RAG.

    Collects image files from explicit paths or from a directory and wraps
    each one in an ``ImageDocument`` that carries the file path and basic
    file metadata. The image bytes themselves are not stored in the
    document (``image=None``); each file is only opened briefly to confirm
    that it is a readable image and to detect its format.

    Attributes:
        recursive (bool): Whether to recursively read directories.
        exclude_hidden (bool): Whether to exclude hidden files (name starting with '.').
        num_files_limits (Optional[int]): Maximum number of files to read.
            ``None`` (or ``0``) disables the limit.
        errors (str): Error handling strategy for file reading. ``'strict'``
            re-raises per-file failures; any other value (e.g. ``'ignore'``)
            logs the failure and skips the file.
    """

    # File extensions (lower-case, dot-prefixed) that ``load`` treats as images.
    IMAGE_SUFFIXES = frozenset(
        {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"}
    )

    def __init__(
        self,
        recursive: bool = False,
        exclude_hidden: bool = True,
        num_files_limits: Optional[int] = None,
        errors: str = "ignore",
    ):
        self.recursive = recursive
        self.exclude_hidden = exclude_hidden
        self.num_files_limits = num_files_limits
        self.errors = errors

    def _validate_path(self, path: Union[str, Path]) -> Path:
        """Convert a path to a ``Path`` object and check that it exists.

        Args:
            path: A string or Path object representing a file or directory.

        Returns:
            Path: The converted path.

        Raises:
            FileNotFoundError: If the path does not exist.
        """
        path = Path(path)
        if not path.exists():
            logger.error(f"Path does not exist: {path}")
            raise FileNotFoundError(f"Path does not exist: {path}")
        return path

    def _check_input(
        self, input_data: Union[str, Path, List, Tuple], is_file: bool = True
    ) -> Union[List[Path], Path]:
        """Validate user input and convert it to ``Path`` object(s).

        Args:
            input_data: A single path (string or ``Path``), or a list/tuple
                of paths.
            is_file: Only relevant for list/tuple input. When True, every
                element is validated and a list is returned; when False,
                only the first element is validated and returned.

        Returns:
            Union[List[Path], Path]: A single ``Path`` for single-path input
            (regardless of ``is_file``) or for sequence input with
            ``is_file=False``; a list of ``Path`` objects for sequence input
            with ``is_file=True``.

        Raises:
            FileNotFoundError: If any validated path does not exist.
            ValueError: If the input type is unsupported, or if an empty
                sequence is given with ``is_file=False``.
        """
        if isinstance(input_data, (str, Path)):
            return self._validate_path(input_data)

        if isinstance(input_data, (list, tuple)):
            if is_file:
                return [self._validate_path(p) for p in input_data]
            if not input_data:
                # Nothing to take the first element from; fail with the
                # documented exception type instead of a bare IndexError.
                logger.error("Empty path sequence provided")
                raise ValueError("Empty path sequence provided")
            return self._validate_path(input_data[0])

        logger.error(f"Invalid input type: {type(input_data)}")
        raise ValueError(f"Invalid input type: {type(input_data)}")

    @staticmethod
    def _normalize_suffixes(
        suffixes: Optional[Union[str, List, Tuple]],
    ) -> Optional[set]:
        """Return the suffix filter as a set of lower-case, dot-prefixed strings.

        Returns None when no usable filter was supplied (``None``, an empty
        sequence, or an unsupported type), which means "do not filter".
        """
        if isinstance(suffixes, str):
            suffixes = [suffixes]
        elif not isinstance(suffixes, (list, tuple)):
            return None

        normalized = set()
        for suffix in suffixes:
            suffix = str(suffix).lower()
            # An empty string is kept as-is so it still matches files
            # that have no extension at all.
            if suffix and not suffix.startswith("."):
                suffix = "." + suffix
            normalized.add(suffix)
        return normalized or None

    def load(
        self,
        file_paths: Union[str, List, Tuple],
        exclude_files: Optional[Union[str, List, Tuple]] = None,
        filter_file_by_suffix: Optional[Union[str, List, Tuple]] = None,
        merge_by_file: bool = False,
        show_progress: bool = False,
    ) -> List["ImageDocument"]:
        """Load images from files or directories.

        Args:
            file_paths: A list/tuple of file paths, or a single path that is
                either one file or a directory to scan.
            exclude_files: A path or list/tuple of paths to exclude. Matching
                is done on the file name only, so every collected file with
                the same name is excluded.
            filter_file_by_suffix: File extensions to include (e.g., ['.png', '.jpg']).
                Matching is case-insensitive and the leading dot is optional.
            merge_by_file: Whether to merge documents by file (unused for images, kept for compatibility).
            show_progress: Whether to log each file name as it is processed.

        Returns:
            List[ImageDocument]: List of loaded ImageDocuments. Files whose
            extension is not in ``IMAGE_SUFFIXES`` are skipped silently.

        Raises:
            RuntimeError: If anything fails while resolving the inputs, or if
                an image fails to load while ``errors == 'strict'``. The
                original exception (e.g. ``FileNotFoundError`` for a missing
                path) is attached as ``__cause__``.
        """
        try:
            input_files = None
            input_dir = None
            if isinstance(file_paths, (list, tuple)):
                input_files = self._check_input(file_paths, is_file=True)
            else:
                path = self._check_input(file_paths, is_file=False)
                if path.is_dir():
                    input_dir = path
                else:
                    input_files = [path]

            excluded_names = set()
            if exclude_files:
                excluded = self._check_input(exclude_files, is_file=True)
                # A single path comes back as a bare Path; wrap it so the
                # name collection below can iterate over it.
                if isinstance(excluded, Path):
                    excluded = [excluded]
                excluded_names = {p.name for p in excluded}

            suffix_filter = self._normalize_suffixes(filter_file_by_suffix)

            # Get all files to process
            all_files = []
            if input_files:
                all_files = list(input_files)
            elif input_dir is not None:
                pattern = "**/*" if self.recursive else "*"
                # Sort so that truncation by num_files_limits is deterministic;
                # glob itself yields entries in arbitrary order.
                all_files = sorted(f for f in input_dir.glob(pattern) if f.is_file())

                if self.exclude_hidden:
                    all_files = [f for f in all_files if not f.name.startswith('.')]

            # Apply exclusions
            if excluded_names:
                all_files = [f for f in all_files if f.name not in excluded_names]

            # Apply suffix filter
            if suffix_filter:
                all_files = [f for f in all_files if f.suffix.lower() in suffix_filter]

            # Apply file limit
            if self.num_files_limits:
                all_files = all_files[:self.num_files_limits]

            # Process files
            documents = []
            for file_path in all_files:
                if show_progress:
                    logger.info(f"Processing: {file_path.name}")

                if file_path.suffix.lower() not in self.IMAGE_SUFFIXES:
                    continue

                try:
                    img_doc = self._process_image(file_path)
                except Exception as e:
                    logger.error(f"Failed to process {file_path}: {str(e)}")
                    if self.errors == "strict":
                        raise
                    continue

                # None means the image could not be read and errors are ignored.
                if img_doc is not None:
                    documents.append(img_doc)

            logger.info(f"Loaded {len(documents)} image documents")
            return documents

        except Exception as e:
            logger.error(f"Failed to load documents: {str(e)}")
            raise RuntimeError(f"Failed to load documents: {str(e)}") from e

    def _process_image(self, file_path: Path) -> Optional["ImageDocument"]:
        """Build an ``ImageDocument`` for a single image file.

        Args:
            file_path: Path of the image file.

        Returns:
            Optional[ImageDocument]: The document, or None when the file
            could not be processed and ``errors`` is not ``'strict'``.

        Raises:
            Exception: Whatever opening the image, reading file stats or
                building the document raised, when ``errors == 'strict'``.
        """
        try:
            # Open the image only to confirm it is readable and to detect
            # its format; the pixel data is not kept.
            with Image.open(file_path) as img:
                format_name = img.format or 'Unknown'

            # One stat() call serves all three metadata fields.
            file_stat = file_path.stat()

            # Create ImageDocument with path reference only - no image bytes
            # This avoids unnecessary memory usage and processing
            return ImageDocument(
                text="",  # No text content for pure images
                image=None,  # No image bytes - load on demand
                image_path=str(file_path),
                image_mimetype=f"image/{format_name.lower()}",
                metadata={
                    "file_path": str(file_path),
                    "file_name": file_path.name,
                    "file_type": file_path.suffix,
                    "file_size": file_stat.st_size,
                    "creation_date": str(file_stat.st_ctime),
                    "last_modified_date": str(file_stat.st_mtime),
                },
            )
        except Exception as e:
            logger.error(f"Failed to process image {file_path}: {str(e)}")
            if self.errors == "strict":
                raise
            return None