|
|
|
|
|
|
|
|
import os |
|
|
import logging |
|
|
import hashlib |
|
|
import folder_paths |
|
|
|
|
|
|
|
|
# Module-level logger used by the code in this module.
logger = logging.getLogger("VibeVoice")
|
|
|
|
|
class LoadTextFromFileNode:
    """Node that loads the text content of a ``.txt`` file.

    Selectable entries have the form ``"<directory>/<filename>"`` where
    ``<directory>`` is one of ``input``, ``output`` or ``temp``; the actual
    folder is obtained from the ``folder_paths`` module.
    """

    # Entry offered by INPUT_TYPES when no file is available; every other
    # method treats it as "nothing selected".
    _NO_FILES_PLACEHOLDER = "No text files found in any directory"

    # Source directory name -> name of the ``folder_paths`` function that
    # returns its location. The function is looked up lazily, at call time.
    _DIRECTORY_GETTERS = {
        "input": "get_input_directory",
        "output": "get_output_directory",
        "temp": "get_temp_directory",
    }

    @classmethod
    def INPUT_TYPES(cls):
        """Build the input spec: one ``file`` choice listing every .txt file.

        Returns:
            dict: ``{"required": {"file": (choices, options)}}`` where
            ``choices`` is the sorted list of ``"directory/filename"``
            entries, or a single placeholder entry when nothing was found.
        """
        all_files = []
        for dir_name in cls._DIRECTORY_GETTERS:
            all_files.extend(
                f"{dir_name}/{name}"
                for name in cls.get_files_for_directory(dir_name)
            )

        if not all_files:
            all_files = [cls._NO_FILES_PLACEHOLDER]

        return {
            "required": {
                "file": (sorted(all_files), {
                    "tooltip": "Select a text file to load (format: directory/filename)"
                }),
            }
        }

    @classmethod
    def get_files_for_directory(cls, source_dir):
        """Get list of text files for the selected directory.

        Args:
            source_dir: ``"input"``, ``"output"`` or ``"temp"``.

        Returns:
            list[str]: Names of the regular files ending in ``.txt``
            (case-insensitive) directly inside the directory. Empty when
            ``source_dir`` is unknown or the directory cannot be listed.
        """
        dir_path = cls._get_directory(source_dir)
        if dir_path is None:
            return []

        files = []
        try:
            for name in os.listdir(dir_path):
                is_text = name.lower().endswith(".txt")
                if is_text and os.path.isfile(os.path.join(dir_path, name)):
                    files.append(name)
        except Exception as e:
            # Best effort: a directory that cannot be listed must not break
            # building the input spec, so log and return what we have.
            logger.warning(f"Error listing files in {source_dir}: {e}")

        return files

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "load_text"
    CATEGORY = "VibeVoiceWrapper"
    DESCRIPTION = "Load text content from a .txt file"

    @classmethod
    def _get_directory(cls, source_dir):
        """Return the folder for ``source_dir``, or ``None`` if it is unknown."""
        getter_name = cls._DIRECTORY_GETTERS.get(source_dir)
        if getter_name is None:
            return None
        return getattr(folder_paths, getter_name)()

    @staticmethod
    def _is_within_directory(base_dir, candidate):
        """Return True when ``candidate`` lies inside ``base_dir``.

        Paths are normalised with ``abspath`` (not ``realpath``) so ``..``
        segments and absolute paths are caught while symlinked files placed
        inside the directory keep working.
        """
        base = os.path.normcase(os.path.abspath(base_dir))
        target = os.path.normcase(os.path.abspath(candidate))
        try:
            return os.path.commonpath([base, target]) == base
        except ValueError:
            # Raised e.g. for paths on different drives: cannot be inside.
            return False

    @classmethod
    def _locate(cls, file):
        """Resolve a ``"directory/filename"`` entry to a path on disk.

        Returns:
            tuple: ``(status, source_dir, filename, file_path)``. ``status``
            is one of ``"no_file"``, ``"invalid_format"``, ``"invalid_dir"``,
            ``"outside_dir"``, ``"not_found"`` or ``"ok"``; the remaining
            items are ``None`` when they could not be determined.
        """
        if not file or file == cls._NO_FILES_PLACEHOLDER:
            return "no_file", None, None, None
        if "/" not in file:
            return "invalid_format", None, None, None

        source_dir, filename = file.split("/", 1)
        dir_path = cls._get_directory(source_dir)
        if dir_path is None:
            return "invalid_dir", source_dir, filename, None

        file_path = os.path.join(dir_path, filename)
        # The entry is caller-supplied: refuse anything that escapes the
        # selected directory (``..`` segments, absolute file names).
        if not cls._is_within_directory(dir_path, file_path):
            return "outside_dir", source_dir, filename, file_path
        if not os.path.exists(file_path):
            return "not_found", source_dir, filename, file_path
        return "ok", source_dir, filename, file_path

    def load_text(self, file: str):
        """Load text content from file.

        Args:
            file: Entry of the form ``"directory/filename"``.

        Returns:
            tuple: One-element tuple holding the file content as ``str``.

        Raises:
            Exception: If no valid file is selected, the entry is malformed,
                the path escapes its directory, the file is missing, empty
                or whitespace-only, or it cannot be read/decoded as UTF-8.
        """
        try:
            status, source_dir, _, file_path = self._locate(file)
            if status == "no_file":
                raise Exception("Please select a valid text file.")
            if status == "invalid_format":
                raise Exception(f"Invalid file format: {file}")
            if status == "invalid_dir":
                raise Exception(f"Invalid source directory: {source_dir}")
            if status == "outside_dir":
                raise Exception(f"Invalid file path: {file}")
            if status == "not_found":
                raise Exception(f"File not found: {file_path}")

            with open(file_path, 'r', encoding='utf-8') as f:
                text_content = f.read()

            if not text_content.strip():
                raise Exception("File is empty or contains only whitespace")

            return (text_content,)

        except UnicodeDecodeError as e:
            raise Exception(
                f"Encoding error reading file: {str(e)}. File may not be UTF-8 encoded."
            ) from e
        except Exception as e:
            logger.error(f"Failed to load text file: {str(e)}")
            raise Exception(f"Error loading text file: {str(e)}") from e

    @classmethod
    def IS_CHANGED(cls, file):
        """Cache key for ComfyUI.

        Returns:
            str: SHA-256 hex digest of the file content, or a descriptive
            marker string when the entry cannot be resolved or read.
        """
        status, _, _, file_path = cls._locate(file)
        if status == "no_file":
            return "no_file"
        if status == "invalid_format":
            return f"{file}_invalid"
        if status == "invalid_dir":
            return f"{file}_invalid_dir"
        if status == "outside_dir":
            return f"{file}_invalid_path"
        if status == "not_found":
            return f"{file}_not_found"

        try:
            digest = hashlib.sha256()
            with open(file_path, 'rb') as f:
                # Hash in chunks so large files are not held in memory.
                chunk = f.read(65536)
                while chunk:
                    digest.update(chunk)
                    chunk = f.read(65536)
            return digest.hexdigest()
        except OSError:
            return f"{file}_error"

    @classmethod
    def VALIDATE_INPUTS(cls, file, **kwargs):
        """Validate that the file exists.

        Returns:
            ``True`` when the entry resolves to an existing path inside one
            of the known directories, otherwise a ``str`` describing the
            problem.
        """
        status, source_dir, filename, _ = cls._locate(file)
        if status == "no_file":
            return "No valid text file selected"
        if status == "invalid_format":
            return f"Invalid file format: {file}"
        if status == "invalid_dir":
            return f"Invalid source directory: {source_dir}"
        if status == "outside_dir":
            return f"Invalid file path: {file}"
        if status == "not_found":
            return f"File not found: {filename} in {source_dir}"

        return True