|
|
import json |
|
|
from pathlib import Path |
|
|
|
|
|
from json_repair import repair_json |
|
|
|
|
|
from langflow.custom import Component |
|
|
from langflow.io import FileInput, MessageTextInput, MultilineInput, Output |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class JSONToDataComponent(Component):
    """Legacy component that turns JSON text into ``Data`` objects.

    The JSON can come from exactly one of three inputs: an uploaded file
    (``json_file``), a file path given as text (``json_path``), or a raw JSON
    string (``json_string``).
    """

    display_name = "Load JSON"
    description = (
        "Convert a JSON file, JSON from a file path, or a JSON string to a Data object or a list of Data objects"
    )
    icon = "braces"
    name = "JSONtoData"
    legacy = True

    inputs = [
        FileInput(
            name="json_file",
            display_name="JSON File",
            file_types=["json"],
            info="Upload a JSON file to convert to a Data object or list of Data objects",
        ),
        MessageTextInput(
            name="json_path",
            display_name="JSON File Path",
            info="Provide the path to the JSON file as pure text",
        ),
        MultilineInput(
            name="json_string",
            display_name="JSON String",
            info="Enter a valid JSON string (object or array) to convert to a Data object or list of Data objects",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="convert_json_to_data"),
    ]

    def convert_json_to_data(self) -> Data | list[Data]:
        """Parse the single provided JSON source into ``Data``.

        Returns:
            A list with one ``Data`` per element when the top-level JSON value
            is an array, otherwise a single ``Data`` wrapping the parsed value.

        Raises:
            ValueError: If zero or several inputs are provided, if the file
                does not have a ``.json`` suffix, if the source is empty, or if
                reading/parsing fails (the original exception is chained).
        """
        provided = [self.json_file, self.json_path, self.json_string]
        if sum(bool(field) for field in provided) != 1:
            msg = "Please provide exactly one of: JSON file, file path, or JSON string."
            self.status = msg
            raise ValueError(msg)

        # Message used when no parseable text could be obtained. Previously the
        # empty-content case re-raised whatever `self.status` happened to hold.
        failure_message = "The provided JSON input is empty."
        json_data: str | None = None

        try:
            if self.json_file or self.json_path:
                # Uploaded files go through `resolve_path`; a typed path is used as given.
                file_path = Path(self.resolve_path(self.json_file)) if self.json_file else Path(self.json_path)
                if file_path.suffix.lower() == ".json":
                    json_data = file_path.read_text(encoding="utf-8")
                else:
                    failure_message = "The provided file must be a JSON file."
            else:
                json_data = self.json_string

            if json_data:
                try:
                    parsed_data = json.loads(json_data)
                except json.JSONDecodeError:
                    # Strict parsing failed: let json_repair attempt a fix, then
                    # parse again. A second failure propagates to the handler below.
                    repaired_json_string = repair_json(json_data)
                    parsed_data = json.loads(repaired_json_string)

                # NOTE(review): presumably `Data` expects a mapping; non-dict
                # values may fail validation here - confirm against `Data`.
                if isinstance(parsed_data, list):
                    result = [Data(data=item) for item in parsed_data]
                else:
                    result = Data(data=parsed_data)
                self.status = result
                return result

        except (json.JSONDecodeError, SyntaxError, ValueError) as e:
            error_message = f"Invalid JSON or Python literal: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

        except Exception as e:
            # Anything else (e.g. I/O errors while reading the file) is surfaced
            # as a ValueError so callers only need to handle one exception type.
            error_message = f"An error occurred: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

        # Reached only when nothing was parsed: wrong file suffix or empty content.
        self.status = failure_message
        raise ValueError(failure_message)
|
|
|