|
|
from loguru import logger |
|
|
|
|
|
from langflow.custom import Component |
|
|
from langflow.io import DataInput, Output |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class MergeDataComponent(Component):
    """Combine multiple Data objects into a list of Data objects sharing one key set.

    Every key found in the ``data`` mapping of any input is present in each
    output object. Keys that a given input lacks are filled with an empty
    string so that all outputs expose the same keys.
    """

    display_name = "Merge Data"
    description = (
        "Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object."
    )
    icon = "merge"

    inputs = [
        DataInput(
            name="data_inputs",
            display_name="Data Inputs",
            is_list=True,
            info="A list of Data inputs objects to be merged.",
        ),
    ]

    outputs = [
        Output(
            display_name="Merged Data",
            name="merged_data",
            method="merge_data",
        ),
    ]

    def merge_data(self) -> list[Data]:
        """Merge the configured Data inputs into a list of Data objects with identical keys.

        One output object is produced per input, in input order. Each output
        carries the ``text_key`` and ``default_value`` of its corresponding
        input, and its ``data`` holds every key seen across all inputs (in
        first-seen order), with ``""`` substituted for keys the input lacks.

        Returns:
            The merged Data objects, or an empty list when no inputs are
            provided (including when the input is ``None``).

        Raises:
            TypeError: If any item in ``data_inputs`` is not a ``Data`` instance.
        """
        logger.info("Initiating the data merging process.")

        # Normalise a missing (None) input to an empty list so that len() in
        # the log call below cannot fail before the emptiness guard runs.
        data_inputs: list[Data] = self.data_inputs or []
        logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")

        if not data_inputs:
            logger.warning("No data inputs provided. Returning an empty list.")
            return []

        # Collect the union of keys in a dict rather than a set: dicts keep
        # insertion order, so the merged objects get a deterministic,
        # first-seen key order instead of a hash-dependent one.
        all_keys: dict[str, None] = {}
        for idx, data_input in enumerate(data_inputs):
            if not isinstance(data_input, Data):
                error_message = f"Data input at index {idx} is not of type Data."
                logger.error(error_message)
                type_error_message = (
                    f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
                )
                raise TypeError(type_error_message)
            all_keys.update(dict.fromkeys(data_input.data))
        logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")

        try:
            merged_data_list = []
            for idx, data_input in enumerate(data_inputs):
                source = data_input.data
                merged_data_dict = {}

                for key in all_keys:
                    if key in source:
                        merged_data_dict[key] = source[key]
                    else:
                        # Pad absent keys so every merged object has the same key set.
                        log_message = f"Key '{key}' missing in data input at index {idx}. Assigning empty string."
                        logger.debug(log_message)
                        merged_data_dict[key] = ""

                merged_data = Data(
                    text_key=data_input.text_key,
                    data=merged_data_dict,
                    default_value=data_input.default_value,
                )
                merged_data_list.append(merged_data)
                logger.debug(f"Merged Data object created for input at index: {idx}")

        except Exception:
            # Record the traceback for diagnostics, then let the caller see the original error.
            logger.exception("An error occurred during the data merging process.")
            raise

        logger.info("Data merging process completed successfully.")
        return merged_data_list
|
|
|