Spaces:
Running
Running
| from langflow.base.data import BaseFileComponent | |
| from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data | |
| from langflow.io import BoolInput, IntInput | |
| from langflow.schema import Data | |
class FileComponent(BaseFileComponent):
    """Load text files — individual files or valid members of a zip archive.

    Resolves and validates file paths against the supported text extensions and,
    when several files are supplied, can optionally fan the parsing work out
    across threads.
    """

    display_name = "File"
    description = "Load a file to be used in your project."
    icon = "file-text"
    name = "File"

    # Only plain-text file types are accepted for parsing.
    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=False,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Parse every file in ``file_list``, sequentially or in parallel.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): Files to parse.

        Returns:
            list[BaseFileComponent.BaseFile]: The same files with their parsed
            ``Data`` merged back in via ``rollup_data``.
        """

        def load_one(file_path: str, *, silent_errors: bool = False) -> Data | None:
            """Parse a single text file; log failures and swallow them when silent."""
            try:
                return parse_text_file_to_data(file_path, silent_errors=silent_errors)
            except FileNotFoundError as e:
                self.log(f"File not found: {file_path}. Error: {e}")
                if silent_errors:
                    return None
                raise
            except Exception as e:
                self.log(f"Unexpected error processing {file_path}: {e}")
                if silent_errors:
                    return None
                raise

        if not file_list:
            self.log("No files to process.")
            return file_list

        # Honour the deprecated toggle: multithreading off forces a single worker.
        worker_count = max(1, self.concurrency_multithreading) if self.use_multithreading else 1
        total = len(file_list)
        min_batch = 2  # below this many files/workers, threading buys nothing

        if worker_count >= min_batch and total >= min_batch:
            self.log(f"Starting parallel processing of {total} files with concurrency: {worker_count}.")
            paths = [str(item.path) for item in file_list]
            parsed = parallel_load_data(
                paths,
                silent_errors=self.silent_errors,
                load_function=load_one,
                max_concurrency=worker_count,
            )
        else:
            if total > 1:
                self.log(f"Processing {total} files sequentially.")
            parsed = []
            for item in file_list:
                parsed.append(load_one(str(item.path), silent_errors=self.silent_errors))

        # Merge the parsed Data back onto the BaseFile objects.
        return self.rollup_data(file_list, parsed)