Tes / app.py
ALFHAzero's picture
Upload app.py
56bfde5 verified
import gradio as gr
import json
import os
from huggingface_hub import HfApi, snapshot_download
import threading
import sys
# Add basic logging
def log_message(message):
print(f"[APP_LOG] {message}", file=sys.stderr) # Use stderr so it appears in Colab output
# Function to handle saving dataset
def save_dataset(dataset_entries, filename):
"""Saves the dataset entries to a JSONL file."""
log_message(f"Attempting to save dataset to local file: {filename}")
if not dataset_entries:
log_message("No entries in dataset_entries to save.")
return "No entries to save."
jsonl_data = ""
try:
for entry in dataset_entries:
# Pastikan entri adalah dictionary sebelum di-dumps
if isinstance(entry, dict):
jsonl_data += json.dumps(entry, ensure_ascii=False) + "\n"
else:
log_message(f"Warning: Skipping non-dictionary entry during local save: {entry}") # Log warning
with open(filename, "w", encoding="utf-8") as f:
f.write(jsonl_data)
log_message(f"Dataset successfully saved to local file: {filename}")
return f"Dataset saved successfully to {filename}"
except Exception as e:
log_message(f"Error saving local file {filename}: {e}")
# Include the specific exception 'e' in the error message
return f"Error saving file: {e}"
# Function to handle saving to Hugging Face Hub
def save_to_hf(dataset_entries, hf_token, hf_repo_id, hf_file_path):
"""Saves the dataset entries to Hugging Face Hub."""
log_message(f"Attempting to save dataset to Hugging Face Hub: {hf_repo_id}/{hf_file_path}")
if not dataset_entries:
log_message("No dataset entries to save to Hugging Face Hub.")
return "No dataset entries to save to Hugging Face Hub."
elif not hf_token or not hf_repo_id or not hf_file_path:
log_message("Missing HF token, repo ID, or file path for saving.")
return "Please provide Hugging Face API Token, Repository Name, and file path."
try:
api = HfApi(token=hf_token)
log_message("HfApi initialized.")
jsonl_data = ""
for entry in dataset_entries:
# Pastikan entri adalah dictionary sebelum di-dumps
if isinstance(entry, dict):
jsonl_data += json.dumps(entry, ensure_ascii=False) + "\n"
else:
log_message(f"Warning: Skipping non-dictionary entry during HF save: {entry}") # Log warning
# Save the data to a temporary file to upload
temp_file_path = "temp_dataset.jsonl"
log_message(f"Saving to temporary file for upload: {temp_file_path}")
with open(temp_file_path, "w", encoding="utf-8") as f:
f.write(jsonl_data)
log_message("Temporary file created.")
# Upload the file
log_message(f"Uploading file to HF Hub: repo_id={hf_repo_id}, path_in_repo={hf_file_path}")
upload_info = api.upload_file(
path_or_fileobj=temp_file_path,
path_in_repo=hf_file_path,
repo_id=hf_repo_id,
repo_type="dataset", # Specify repo type as dataset
commit_message="Add or update dataset via Gradio app"
)
log_message(f"Upload successful. Info: {upload_info}")
# Clean up the temporary file
log_message(f"Removing temporary file: {temp_file_path}")
os.remove(temp_file_path)
return f"Dataset saved successfully to Hugging Face Hub: {upload_info.url}"
except Exception as e:
error_message = f"Error saving to Hugging Face Hub: {e}"
log_message(f"HF Save Error: {e}")
# Enhance specific error messages
if "Repository not found" in str(e):
error_message = f"Error: Repository '{hf_repo_id}' not found. Please check the repository ID. Original error: {e}"
elif "Authentication required" in str(e) or "Invalid token" in str(e):
error_message = f"Error: Authentication failed. Please check your Hugging Face API Token or ensure the repository is public. Original error: {e}"
else:
# Include the specific exception 'e' for other errors
error_message = f"Error saving to Hugging Face Hub: {e}"
return error_message
# Function to handle loading dataset from a file
def load_dataset_from_file(file_obj, local_file_path):
"""Loads dataset entries from an uploaded file object or a local file path."""
log_message("Attempting to load dataset from uploaded file or local path.")
log_message(f"Received file_obj type: {type(file_obj)}")
log_message(f"Received local_file_path type: {type(local_file_path)}")
log_message(f"Received local_file_path value: {local_file_path}")
loaded_entries = []
filename = ""
try:
if file_obj is not None and hasattr(file_obj, 'read'): # Handle file object (upload) if provided
log_message(f"Loading from uploaded file object: {file_obj.name}")
jsonl_data = file_obj.read().decode("utf-8")
filename = os.path.basename(file_obj.name)
log_message(f"Read {len(jsonl_data)} characters from uploaded file object: {filename}")
elif local_file_path is not None and isinstance(local_file_path, str) and local_file_path.strip(): # Handle string (local path) if provided and not empty
file_path = local_file_path.strip()
log_message(f"Loading from local file path: {file_path}")
if not os.path.exists(file_path):
log_message(f"Local file not found: {file_path}")
return [], 0, f"Error loading file: Local file not found at {file_path}", ""
with open(file_path, "r", encoding="utf-8") as f:
jsonl_data = f.read()
filename = os.path.basename(file_path)
log_message(f"Read {len(jsonl_data)} characters from local file path: {filename}")
else:
log_message("No file uploaded or local path provided.")
return [], 0, "Please upload a JSONL file or provide a local path.", "" # Return empty data, index, message, and filename
for i, line in enumerate(jsonl_data.strip().split('\n')):
if line.strip(): # Check if line is not empty after stripping whitespace
try:
loaded_entries.append(json.loads(line))
except json.JSONDecodeError as e:
log_message(f"Error decoding JSON on line {i+1}: {line.strip()} - {e}") # Log decoding errors
# Continue processing other lines even if one fails
pass
else:
log_message(f"Skipping empty line {i+1} in uploaded file.")
log_message(f"Successfully loaded {len(loaded_entries)} entries from file: {filename}")
# Return loaded entries, set index to 0, success message, and filename
return loaded_entries, 0, f"Successfully loaded {len(loaded_entries)} entries.", filename
except Exception as e:
log_message(f"Error loading file: {e}")
# Include the specific exception 'e' in the error message
return [], 0, f"Error loading file: {e}", "" # Return empty data, index, and error message
# Function to handle loading from Hugging Face Hub
def load_from_hf(hf_token, hf_repo_id, hf_file_path):
"""Loads dataset entries from Hugging Face Hub."""
log_message(f"Attempting to load dataset from Hugging Face Hub: {hf_repo_id}/{hf_file_path}")
if not hf_repo_id or not hf_file_path:
log_message("Missing HF repo ID or file path for loading.")
return [], 0, "Please provide Hugging Face Repository ID and file path.", ""
loaded_entries = []
filename_for_save = ""
try:
# Download the file from the Hugging Face Hub
log_message(f"Downloading file from HF Hub: repo_id={hf_repo_id}, path_in_repo={hf_file_path}")
# Pass token if provided, otherwise allow anonymous download for public repos
downloaded_folder = snapshot_download(repo_id=hf_repo_id, allow_patterns=hf_file_path, token=hf_token if hf_token else None)
downloaded_file_path = os.path.join(downloaded_folder, hf_file_path)
log_message(f"File downloaded to temporary path: {downloaded_file_path}")
if not os.path.exists(downloaded_file_path):
log_message(f"Downloaded file not found at expected path: {downloaded_file_path}")
# Provide a specific message if the file is not found in the repo
return [], 0, f"Error: File '{hf_file_path}' not found in repository '{hf_repo_id}'. Please check the file path.", ""
with open(downloaded_file_path, "r", encoding="utf-8") as f:
for i, line in enumerate(f):
if line.strip(): # Check if line is not empty after stripping whitespace
try:
loaded_entries.append(json.loads(line))
except json.JSONDecodeError as e:
log_message(f"Error decoding JSON on line {i+1} in HF file: {line.strip()} - {e}") # Log decoding errors
# Continue processing other lines even if one fails
pass
else:
log_message(f"Skipping empty line {i+1} in HF file.")
# Extract filename for saving
filename_for_save = os.path.basename(hf_file_path)
log_message(f"Successfully loaded {len(loaded_entries)} entries from Hugging Face Hub file: {filename_for_save}")
# Return loaded entries, set index to 0, success message, and filename
return loaded_entries, 0, f"Successfully loaded {len(loaded_entries)} entri dari Hugging Face Hub.", filename_for_save
except Exception as e:
error_message = f"Gagal memuat dari Hugging Face Hub: {e}"
log_message(f"HF Load Error: {e}")
# Enhance specific error messages
if "Repository not found" in str(e):
error_message = f"Error: Repository '{hf_repo_id}' not found. Please check the repository ID. Original error: {e}"
elif "Authentication required" in str(e) or "Invalid token" in str(e):
error_message = f"Error: Authentication failed. Please check your Hugging Face API Token or ensure the repository is public. Original error: {e}"
elif "allow_patterns" in str(e): # Handle specific download errors related to patterns
error_message = f"Error: File path '{hf_file_path}' not found in repository '{hf_repo_id}' or pattern matching failed. Original error: {e}"
else:
# Include the specific exception 'e' for other errors
error_message = f"Error loading from Hugging Face Hub: {e}"
return [], 0, error_message, ""
# Function to add a user/assistant turn
def add_turn(messages, user_input, assistant_response):
"""Adds a user and assistant turn to the current messages."""
log_message("Attempting to add user/assistant turn.")
if not user_input.strip() or not assistant_response.strip(): # Added strip() for validation
log_message("User input or assistant response is empty, not adding turn.")
# Return current state and a user-facing message
return messages, user_input, assistant_response, "Please provide both User Input and Assistant Response."
messages.append({"role": "user", "content": user_input.strip()}) # Added strip() for content
messages.append({"role": "assistant", "content": assistant_response.strip()}) # Added strip() for content
log_message("User/assistant turn added.")
# Return updated messages, clear input fields, and return an empty status message
return messages, "", "", "Turn added successfully." # Return updated messages and clear input fields
# Function to clear turns
def clear_turns():
"""Clears the current messages."""
log_message("Clearing current turns.")
return [], "" # Return empty messages and clear status message
# Function to add an entry to the dataset
def add_entry_to_dataset(dataset_entries, system_message, messages):
"""Adds the current system message and turns as a new entry to the dataset."""
log_message("Attempting to add entry to dataset.")
new_entry_messages = []
if system_message.strip(): # Added strip() for validation
new_entry_messages.append({"role": "system", "content": system_message.strip()}) # Added strip() for content
log_message("System message added to new entry.")
new_entry_messages.extend(messages)
log_message(f"New entry messages: {new_entry_messages}")
if new_entry_messages:
dataset_entries.append({"messages": new_entry_messages})
log_message(f"Entry added to dataset. New dataset size: {len(dataset_entries)}")
# After adding, update the dataset size display
return dataset_entries, "", [], "Entry added to dataset!", f"Number of entries: {len(dataset_entries)}"
else:
log_message("No messages to add as an entry.")
# Return current state and a user-facing message
return dataset_entries, system_message, messages, "Cannot add empty entry. Add system message or user/assistant turns.", f"Number of entries: {len(dataset_entries)}"
# Function to display current entry
def display_entry(dataset_entries, current_index):
"""Displays the messages of the current dataset entry and provides editable textboxes."""
log_message(f"Attempting to display entry at index: {current_index}")
log_message(f"Current dataset_entries size in display_entry: {len(dataset_entries) if dataset_entries is not None else 0}")
# Prepare default outputs for empty dataset or invalid index
empty_display_text = "No entries to display yet."
empty_system_message = ""
# Create a list of 10 gr.update objects for textboxes, setting initial values to "" and visible=False
hidden_textboxes = [gr.update(value="", visible=False) for _ in range(10)]
hide_buttons = gr.update(visible=False)
clear_status = ""
if not dataset_entries:
log_message("dataset_entries is empty, cannot display.")
# Return empty state and hide components
# Note the use of *hidden_textboxes to unpack the list into individual arguments
return empty_display_text, empty_system_message, *hidden_textboxes, hide_buttons, hide_buttons, clear_status
total_entries = len(dataset_entries)
# Ensure current_index is within bounds after operations like deletion
if not (0 <= current_index < total_entries):
log_message(f"Current index {current_index} out of bounds for dataset size {total_entries}. Adjusting.")
# Adjust index to the last entry if out of bounds high, or stay at 0 if empty
current_index = max(0, min(current_index, total_entries - 1)) if total_entries > 0 else 0
log_message(f"Adjusted index: {current_index}")
# Re-evaluate based on the adjusted index
if not (0 <= current_index < total_entries): # Check again if dataset became empty
log_message("Dataset is empty after index adjustment.")
# Return empty state and hide components
# Ensure all output components match the function's expected outputs
return empty_display_text, empty_system_message, *hidden_textboxes, hide_buttons, hide_buttons, clear_status
# Proceed with displaying the valid entry
entry = dataset_entries[current_index]
log_message(f"Displaying entry {current_index + 1} of {total_entries}. Entry content sample: {str(entry)[:100]}...") # Log sample of entry
display_text = f"Viewing Entry {current_index + 1} of {total_entries}\n\n"
system_message_content = ""
messages_content = []
# Separate system message from user/assistant messages
if entry and 'messages' in entry and isinstance(entry['messages'], list) and entry['messages']: # Added type check and emptiness check
if entry['messages'][0]['role'] == 'system':
system_message_content = entry['messages'][0]['content']
messages_content = entry['messages'][1:]
log_message("Found system message and user/assistant messages.")
else: # Assume all messages are user/assistant if the first is not system
messages_content = entry['messages']
log_message("No system message found, displaying all as user/assistant.")
elif entry and 'messages' in entry and isinstance(entry['messages'], list) and not entry['messages']:
log_message("Entry has empty messages list.")
# messages_content remains empty
else: # Handle invalid entry format or missing messages key
log_message(f"Warning: Invalid entry format or missing messages key at index {current_index}: {entry}")
# Return error state for this specific entry and hide components
# Ensure all output components match the function's expected outputs
return f"Error displaying entry {current_index + 1}: Invalid format.", "", *hidden_textboxes, hide_buttons, hide_buttons, ""
# Format display text for user/assistant messages
for msg in messages_content:
display_text += f"**{msg['role'].capitalize()}:** {msg['content']}\n\n"
# Prepare values for the editable textboxes
editable_system_message = system_message_content
# Ensure we only populate up to 10 textboxes
# Also ensure message objects have 'content' key
editable_messages = [msg.get('content', '') for msg in messages_content[:10] if isinstance(msg, dict)] + [""] * (10 - len(messages_content[:10])) # Pad with empty strings up to 10, added safety checks
# Update visibility of message textboxes
# Ensure visibility is based on the actual number of messages_content
textbox_updates = [gr.update(value=editable_messages[i], visible=(i < len(messages_content) and i < 10)) for i in range(10)] # Ensure max 10 textboxes
log_message("Successfully prepared display text and textbox updates.")
# Show edit/delete buttons and clear edit status
# Return all output components, including the updated value for edited_system_message_input
return display_text, gr.update(value=editable_system_message, visible=True), *textbox_updates, gr.update(visible=True), gr.update(visible=True), ""
# Function to navigate to the previous entry
def prev_entry(current_index, dataset_entries):
"""Navigates to the previous entry."""
log_message(f"Navigating to previous entry from index {current_index}")
if current_index > 0:
new_index = current_index - 1
log_message(f"New index: {new_index}")
return new_index
log_message("Already at the beginning (index 0). Staying at 0.")
return 0 # Stay at 0 if already at the beginning
# Function to navigate to the next entry
def next_entry(current_index, dataset_entries):
"""Navigates to the next entry."""
log_message(f"Navigating to next entry from index {current_index}")
if len(dataset_entries) > 0 and current_index < len(dataset_entries) - 1:
new_index = current_index + 1
log_message(f"New index: {new_index}")
return new_index
if len(dataset_entries) > 0:
log_message("Already at the end. Staying at last index.")
return len(dataset_entries) - 1 # Stay at the last index if already at the end
log_message("Dataset is empty. Staying at index 0.")
return 0 # If dataset is empty
# Function to go to a specific entry number
def go_to_entry(entry_number, dataset_entries):
"""Navigates to a specific entry number."""
log_message(f"Attempting to go to entry number: {entry_number}")
total_entries = len(dataset_entries)
default_index = 0 if total_entries == 0 else 0 # Default to 0 if empty, or first if not
try:
# Attempt to convert input to integer
index = int(entry_number) - 1
# Validate index range
if 0 <= index < total_entries:
log_message(f"Valid index calculated: {index}")
# Return valid index and empty status message
return index, ""
else:
log_message(f"Calculated index {index} is out of bounds (0 to {total_entries-1 if total_entries > 0 else 0}).")
# Return default index and error message for out of bounds
return default_index, f"Error: Entry number {entry_number} is out of bounds. Please enter a number between 1 and {total_entries if total_entries > 0 else 1}."
except (ValueError, TypeError):
log_message(f"Invalid input for entry number: {entry_number}")
# Return default index and error message for invalid input type
return default_index, f"Error: Invalid input '{entry_number}'. Please enter a valid integer number."
# Function to update messages in the current entry
def update_entry_messages(dataset_entries, current_index, edited_system_message, *edited_contents):
"""Updates the messages of the current entry with edited content."""
log_message(f"Attempting to update entry at index: {current_index}")
if not dataset_entries or not (0 <= current_index < len(dataset_entries)):
log_message("Cannot update entry: dataset_entries empty or index out of bounds.")
# Return current state and a specific error message
return dataset_entries, "Error: Cannot update entry. Dataset is empty or index is out of bounds."
updated_messages = []
# Handle the system message
if edited_system_message.strip(): # Added strip() for validation and content
updated_messages.append({"role": "system", "content": edited_system_message.strip()})
log_message("Updated system message added.")
# Get original user/assistant messages
original_messages_in_entry = dataset_entries[current_index].get('messages', [])
original_user_assistant_messages = [msg for msg in original_messages_in_entry if msg.get('role') in ['user', 'assistant']] # Added role check and get with default
original_user_assistant_count = len(original_user_assistant_messages)
# Iterate through the edited contents provided by the textboxes
# We are assuming a max of 10 editable message textboxes
for i in range(10): # Process up to 10 edited message textboxes
edited_content = edited_contents[i]
# Check if the edited content is not empty
if edited_content.strip():
# If it corresponds to an original message index, use its original role
if i < original_user_assistant_count:
updated_messages.append({"role": original_user_assistant_messages[i].get('role', 'user'), "content": edited_content.strip()}) # Corrected quote
log_message(f"Updated original message {i+1} with role {original_user_assistant_messages[i].get('role', 'user')}.")
# If it's a new message (beyond original count but within the 10 textboxes)
else:
# Determine role based on the last message added in the updated_messages list
if len(updated_messages) > 0:
last_role = updated_messages[-1]['role']
# Alternate roles, assuming the sequence is always user, assistant, user, assistant...
new_role = 'user' if last_role == 'assistant' else 'assistant'
else:
# If no messages exist yet (only system message or initially empty), the first new message is 'user'
new_role = 'user'
updated_messages.append({"role": new_role, "content": edited_content.strip()}) # Corrected quote
log_message(f"Added new message {i+1} with inferred role {new_role}.")
# If edited content is empty and it was an original message, it's effectively deleted (not added to updated_messages)
elif i < original_user_assistant_count:
log_message(f"Original message {i+1} was cleared, effectively deleting it.")
# Check if the updated entry has any messages (system or user/assistant)
if not updated_messages:
# Prevent saving an empty entry if it wasn't originally empty (unless system message was the only thing and is now empty)
# Allow saving an empty messages list if the original entry only had a system message and it was cleared
if not (len(original_messages_in_entry) == 1 and original_messages_in_entry[0]['role'] == 'system' and not edited_system_message.strip()):
log_message("Attempted to save an empty entry. Preventing save.")
# Return current state and a specific error message
return dataset_entries, "Error: Cannot save an empty entry. Add system message or user/assistant turns."
# Update the entry in the dataset_entries list
if 0 <= current_index < len(dataset_entries):
dataset_entries[current_index]['messages'] = updated_messages
log_message(f"Entry {current_index + 1} updated successfully. New message count: {len(updated_messages)}")
return dataset_entries, f"Changes saved for Entry {current_index + 1}."
else:
log_message(f"Error updating entry: index {current_index} out of bounds.")
# Return current state and a specific error message
return dataset_entries, "Error: Cannot update entry. Index out of bounds."
# Function to delete the current entry
def delete_entry(dataset_entries, current_index):
"""Deletes the current entry from the dataset."""
log_message(f"Attempting to delete entry at index: {current_index}")
if not dataset_entries or not (0 <= current_index < len(dataset_entries)):
log_message("Cannot delete entry: dataset_entries empty or index out of bounds.")
# If dataset is already empty or index is invalid, just return current state and an error message
# Return the current index as it hasn't changed due to deletion not happening
return dataset_entries, current_index, "Error: Cannot delete entry. Dataset is empty or index is out of bounds."
deleted_entry_index = current_index # Keep track of the index being deleted
log_message(f"Deleting entry at index {deleted_entry_index}.")
del dataset_entries[current_index]
# Adjust index after deletion
new_index = deleted_entry_index
if new_index >= len(dataset_entries) and len(dataset_entries) > 0:
new_index = len(dataset_entries) - 1
log_message(f"Adjusting index after deletion to last entry: {new_index}")
elif len(dataset_entries) == 0:
new_index = 0 # Reset index if dataset is empty
log_message("Dataset is empty after deletion. Resetting index to 0.")
else:
log_message(f"Index remains {new_index} after deletion.")
# Return updated dataset, new index, and a success message
return dataset_entries, new_index, f"Entry {deleted_entry_index + 1} deleted."
# Define the Gradio Interface
with gr.Blocks() as demo:
dataset_entries = gr.State([]) # Use Gradio State to maintain dataset entries
current_messages = gr.State([]) # Use Gradio State to maintain current messages for creation
current_entry_index = gr.State(0) # Use Gradio State for current viewing index
current_loaded_filename = gr.State("") # State to hold the name of the currently loaded file
gr.Markdown("## LLM Dataset Creator")
with gr.Tabs() as tabs:
with gr.TabItem("Create Entry", id=0):
gr.Markdown("### Create a new entry")
system_message_input = gr.Textbox(label="System Message", lines=5, placeholder="Instruksi peran yang sangat kuat (misalnya: Kamu adalah Yui Airi, teman yang santai...)")
gr.Markdown("### User and Assistant Messages")
user_input = gr.Textbox(label="User Input", lines=3)
assistant_response = gr.Textbox(label="Assistant Response", lines=3)
with gr.Row():
add_turn_btn = gr.Button("Add User/Assistant Turn")
clear_turns_btn = gr.Button("Clear Turns")
current_turns_output = gr.Markdown("Current Turns:")
# Add a dedicated status textbox for this tab
create_status_output = gr.Textbox(label="Status", interactive=False)
add_entry_btn = gr.Button("Add Entry to Dataset")
gr.Markdown("### Dataset Entries")
dataset_size_output = gr.Markdown("Number of entries: 0") # Define dataset_size_output here
# Link add_turn_btn to the add_turn function
add_turn_btn.click(
add_turn,
inputs=[current_messages, user_input, assistant_response],
outputs=[current_messages, user_input, assistant_response, create_status_output] # Update status output
).then( # Chain another event to update the displayed turns and clear status
lambda messages: ("Current Turns:\n" + "\n".join([f"**{msg['role'].capitalize()}:** {msg['content']}" for msg in messages])),
inputs=[current_messages],
outputs=[current_turns_output]
)
# Link clear_turns_btn to the clear_turns function
clear_turns_btn.click(
clear_turns,
inputs=[],
outputs=[current_messages, create_status_output] # Clear messages and status output
).then( # Chain another event to clear the displayed turns
lambda: "Current Turns:",
inputs=[],
outputs=[current_turns_output]
)
# Link add_entry_btn to the add_entry_to_dataset function
add_entry_btn.click(
add_entry_to_dataset,
inputs=[dataset_entries, system_message_input, current_messages],
outputs=[dataset_entries, system_message_input, current_messages, create_status_output, dataset_size_output] # Update status output
).then( # Chain another event to clear turns output
lambda: "Current Turns:",
inputs=[],
outputs=[current_turns_output]
)
with gr.TabItem("View/Edit Entries", id=1):
gr.Markdown("### View Dataset Entries")
entry_display = gr.Markdown("No entries to display yet.") # Define entry_display here
# Components for navigation
with gr.Row():
prev_btn = gr.Button("Previous")
next_btn = gr.Button("Next")
go_to_input = gr.Number(label="Go to Entry #", value=1, precision=0)
# Textbox for editing system message
edited_system_message_input = gr.Textbox(label="System Message", lines=5, visible=False) # Define edited_system_message_input here
# Placeholder textboxes for editing user/assistant messages (assuming max 10 messages for simplicity)
# We need 10 output components for the textboxes
edited_message_inputs = [gr.Textbox(label=f"Message {i+1}", lines=3, visible=False) for i in range(10)] # Define edited_message_inputs here
save_changes_btn = gr.Button("Save Changes", visible=False) # Define save_changes_btn here
delete_entry_btn = gr.Button("Delete Entry", visible=False) # Define delete_entry_btn here
edit_status_output = gr.Textbox(label="Edit Status", interactive=False) # Define edit_status_output here, already visible
# Link navigation buttons and go_to_input to update the current_entry_index and display
# The .then() calls need to output to all 11 textboxes (1 system + 10 messages) and the buttons/status
prev_btn.click(
prev_entry,
inputs=[current_entry_index, dataset_entries],
outputs=[current_entry_index]
).then( # Chain to display the updated entry
display_entry,
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Ensure all outputs are listed
)
next_btn.click(
next_entry,
inputs=[current_entry_index, dataset_entries],
outputs=[current_entry_index]
).then( # Chain to display the updated entry
display_entry,
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Ensure all outputs are listed
)
go_to_input.submit( # Use submit event for number input
go_to_entry,
inputs=[go_to_input, dataset_entries],
outputs=[current_entry_index, edit_status_output] # Output to index and status
).then( # Chain to display the updated entry (or the default if invalid)
display_entry,
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Ensure all outputs are listed
)
# Add event listener for the 'change' event on go_to_input
go_to_input.change( # Trigger on change as well
go_to_entry,
inputs=[go_to_input, dataset_entries],
outputs=[current_entry_index, edit_status_output] # Output to index and status
).then( # Chain to display the updated entry (or the default if invalid)
display_entry,
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Ensure all outputs are listed
)
# Link save_changes_btn to the update_entry_messages function
save_changes_btn.click(
update_entry_messages,
inputs=[dataset_entries, current_entry_index, edited_system_message_input] + edited_message_inputs,
outputs=[dataset_entries, edit_status_output]
).then( # Chain to re-display the entry after saving
display_entry,
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Ensure all outputs are listed
)
# Link delete_entry_btn to the delete_entry function
delete_entry_btn.click(
delete_entry,
inputs=[dataset_entries, current_entry_index], # Pass State objects as inputs to delete_entry
outputs=[dataset_entries, current_entry_index, edit_status_output] # delete_entry returns updated list, new index, and status
).then( # First chained event: display the new current entry
fn=display_entry,
# Take the outputs from delete_entry as inputs for display_entry
# Mapping: delete_entry outputs (dataset_entries, current_index, edit_status_output)
# display_entry expects (dataset_entries, current_index)
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
).then( # Second chained event: update dataset size
lambda entries: f"Number of entries: {len(entries)}",
inputs=[dataset_entries],
outputs=[dataset_size_output]
)
with gr.TabItem("Save/Load Dataset", id=2):
gr.Markdown("### Save Dataset")
# Use the state variable for the filename input's value
filename_to_save = gr.Textbox(label="Enter filename to save", value="dataset.jsonl", key="filename_to_save") # Added key
with gr.Row():
save_local_btn = gr.Button("Save to File") # Changed button label
hf_save_btn = gr.Button("Save to Hugging Face Hub")
save_output = gr.Textbox(label="Save Status", interactive=False) # Already visible
with gr.Accordion("Hugging Face Hub (Save)", open=False):
hf_token_save = gr.Textbox(label="HF API Token", type="password")
hf_repo_id_save = gr.Textbox(label="HF Repo Name", placeholder="user/repo")
hf_file_path_save = gr.Textbox(label="File Path in Repo", value="dataset.jsonl")
# Link save buttons to their respective functions
save_local_btn.click(
save_dataset,
inputs=[dataset_entries, filename_to_save],
outputs=[save_output]
)
hf_save_btn.click(
save_to_hf,
inputs=[dataset_entries, hf_token_save, hf_repo_id_save, hf_file_path_save],
outputs=[save_output]
)
gr.Markdown("---")
gr.Markdown("### Load Dataset")
# Local File Load - Simplified to directly show upload and path input
gr.Markdown("#### Load from Local File")
uploaded_file = gr.File(label="Upload a JSONL file", file_types=[".jsonl"]) # Specify file type
local_file_path_input = gr.Textbox(label="Or load from local path", placeholder="/path/to/your/dataset.jsonl") # New path input
load_local_btn = gr.Button("Load Local File") # Changed button label
# Hugging Face Hub Load
gr.Markdown("#### Load from Hugging Face Hub")
with gr.Column():
hf_token_load = gr.Textbox(label="HF API Token (optional for public repos)", type="password")
hf_repo_id_load = gr.Textbox(label="HF Repository ID (e.g., your_username/your_repo)")
hf_file_path_load = gr.Textbox(label="Path file JSONL in repository (e.g., dataset.jsonl)")
load_hf_btn = gr.Button("Muat dari Hugging Face Hub")
load_output = gr.Textbox(label="Load Status", interactive=False) # Already visible
# Removed Logic to show/hide load columns based on radio button
# Link load buttons to their respective functions
# Modified load_local_btn to handle both upload and path input
load_local_btn.click(
load_dataset_from_file,
inputs=[uploaded_file, local_file_path_input], # Pass both file object and path input
outputs=[dataset_entries, current_entry_index, load_output, current_loaded_filename] # Update state variables and status
).then( # Chain to update dataset size and display the first entry
display_entry, # Call display_entry first
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Update UI components
).then( # Then update dataset size and filename
lambda entries, loaded_filename: (f"Number of entries: {len(entries)}", loaded_filename),
inputs=[dataset_entries, current_loaded_filename],
outputs=[dataset_size_output, filename_to_save] # Update filename_to_save
)
load_hf_btn.click(
load_from_hf,
inputs=[hf_token_load, hf_repo_id_load, hf_file_path_load],
outputs=[dataset_entries, current_entry_index, load_output, current_loaded_filename] # Update state variables and status
).then( # Chain to update dataset size and display the first entry
display_entry, # Call display_entry first
inputs=[dataset_entries, current_entry_index],
outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output] # Update UI components
).then( # Then update dataset size and filename
lambda entries, loaded_filename: (f"Number of entries: {len(entries)}", loaded_filename),
inputs=[dataset_entries, current_loaded_filename],
outputs=[dataset_size_output, filename_to_save] # Update filename_to_save
)
# Add initial display of dataset size and first entry when the app loads
# This will also handle the case after loading
demo.load(
fn=lambda entries: (f"Number of entries: {len(entries)}",) + display_entry(entries, 0), # Also display the first entry
inputs=[dataset_entries],
outputs=[dataset_size_output, entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
)
# To run the app in Colab, you'll need to use the public interface
# demo.launch(share=True)
demo.launch(share=True)