|
|
from langflow.base.data import BaseFileComponent |
|
|
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data |
|
|
from langflow.io import BoolInput, IntInput |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class FileComponent(BaseFileComponent):
    """Loads text files (individually or from a zip archive) into Data objects.

    Validates file extensions against TEXT_FILE_TYPES, resolves paths, and can
    process several files concurrently when a processing concurrency greater
    than one is configured.
    """

    display_name = "File"
    description = "Load a file to be used in your project."
    icon = "file-text"
    name = "File"

    # Only plain-text extensions are accepted by this component.
    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=False,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Load every file in *file_list*, sequentially or in parallel.

        Parallel loading kicks in only when both the configured concurrency
        and the number of files are at least two; otherwise files are read
        one at a time.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): Files to load.

        Returns:
            list[BaseFileComponent.BaseFile]: The same files with their
            parsed Data merged in via ``rollup_data``.
        """

        def _load_one(path: str, *, silent_errors: bool = False) -> Data | None:
            """Parse a single text file; return None (or re-raise) on failure."""
            try:
                return parse_text_file_to_data(path, silent_errors=silent_errors)
            except FileNotFoundError as e:
                self.log(f"File not found: {path}. Error: {e}")
                if not silent_errors:
                    raise
            except Exception as e:
                self.log(f"Unexpected error processing {path}: {e}")
                if not silent_errors:
                    raise
            return None

        if not file_list:
            self.log("No files to process.")
            return file_list

        # Deprecated flag still gates concurrency; it defaults to True, so the
        # effective setting is normally just the configured concurrency value.
        concurrency = max(1, self.concurrency_multithreading) if self.use_multithreading else 1
        total = len(file_list)

        parallel_processing_threshold = 2
        if concurrency >= parallel_processing_threshold and total >= parallel_processing_threshold:
            self.log(f"Starting parallel processing of {total} files with concurrency: {concurrency}.")
            paths = [str(entry.path) for entry in file_list]
            loaded = parallel_load_data(
                paths,
                silent_errors=self.silent_errors,
                load_function=_load_one,
                max_concurrency=concurrency,
            )
        else:
            if total > 1:
                self.log(f"Processing {total} files sequentially.")
            loaded = [_load_one(str(entry.path), silent_errors=self.silent_errors) for entry in file_list]

        return self.rollup_data(file_list, loaded)
|
|
|