import gradio as gr
import json
import os
from huggingface_hub import HfApi, snapshot_download
import threading
import sys


def log_message(message):
    """Log a message to stderr so it shows up in hosted-notebook (e.g. Colab) output."""
    print(f"[APP_LOG] {message}", file=sys.stderr)


def save_dataset(dataset_entries, filename):
    """Save the dataset entries to a local JSONL file.

    Args:
        dataset_entries: list of dict entries; non-dict items are skipped with a warning.
        filename: target path for the JSONL file.

    Returns:
        A human-readable status string for the UI.
    """
    log_message(f"Attempting to save dataset to local file: {filename}")
    if not dataset_entries:
        log_message("No entries in dataset_entries to save.")
        return "No entries to save."
    try:
        lines = []
        for entry in dataset_entries:
            # Only dict entries are valid JSONL records; skip anything else.
            if isinstance(entry, dict):
                lines.append(json.dumps(entry, ensure_ascii=False))
            else:
                log_message(f"Warning: Skipping non-dictionary entry during local save: {entry}")
        with open(filename, "w", encoding="utf-8") as f:
            # join is O(n) vs quadratic string +=; keep the trailing newline.
            f.write("\n".join(lines) + ("\n" if lines else ""))
        log_message(f"Dataset successfully saved to local file: {filename}")
        return f"Dataset saved successfully to {filename}"
    except Exception as e:
        log_message(f"Error saving local file {filename}: {e}")
        return f"Error saving file: {e}"


def save_to_hf(dataset_entries, hf_token, hf_repo_id, hf_file_path):
    """Upload the dataset entries as a JSONL file to a Hugging Face *dataset* repo.

    Returns:
        A human-readable status string (success URL or error description).
    """
    log_message(f"Attempting to save dataset to Hugging Face Hub: {hf_repo_id}/{hf_file_path}")
    if not dataset_entries:
        log_message("No dataset entries to save to Hugging Face Hub.")
        return "No dataset entries to save to Hugging Face Hub."
    elif not hf_token or not hf_repo_id or not hf_file_path:
        log_message("Missing HF token, repo ID, or file path for saving.")
        return "Please provide Hugging Face API Token, Repository Name, and file path."

    temp_file_path = "temp_dataset.jsonl"
    try:
        api = HfApi(token=hf_token)
        log_message("HfApi initialized.")
        lines = []
        for entry in dataset_entries:
            # Only dict entries are valid JSONL records; skip anything else.
            if isinstance(entry, dict):
                lines.append(json.dumps(entry, ensure_ascii=False))
            else:
                log_message(f"Warning: Skipping non-dictionary entry during HF save: {entry}")
        # Write to a temporary local file; upload_file streams from its path.
        log_message(f"Saving to temporary file for upload: {temp_file_path}")
        with open(temp_file_path, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + ("\n" if lines else ""))
        log_message("Temporary file created.")
        log_message(f"Uploading file to HF Hub: repo_id={hf_repo_id}, path_in_repo={hf_file_path}")
        upload_info = api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=hf_file_path,
            repo_id=hf_repo_id,
            repo_type="dataset",  # Target a dataset repo, not a model repo.
            commit_message="Add or update dataset via Gradio app",
        )
        log_message(f"Upload successful. Info: {upload_info}")
        return f"Dataset saved successfully to Hugging Face Hub: {upload_info.url}"
    except Exception as e:
        log_message(f"HF Save Error: {e}")
        # Translate common failure modes into actionable messages.
        if "Repository not found" in str(e):
            return f"Error: Repository '{hf_repo_id}' not found. Please check the repository ID. Original error: {e}"
        elif "Authentication required" in str(e) or "Invalid token" in str(e):
            return f"Error: Authentication failed. Please check your Hugging Face API Token or ensure the repository is public. Original error: {e}"
        return f"Error saving to Hugging Face Hub: {e}"
    finally:
        # BUGFIX: previously the temp file was only removed on the success path,
        # so a failed upload leaked temp_dataset.jsonl on disk.
        if os.path.exists(temp_file_path):
            log_message(f"Removing temporary file: {temp_file_path}")
            os.remove(temp_file_path)


def load_dataset_from_file(file_obj, local_file_path):
    """Load dataset entries from an uploaded file object or a local file path.

    Args:
        file_obj: an uploaded file-like object (must expose .read/.name), or None.
        local_file_path: a path string, used only when no file object is given.

    Returns:
        (entries, index, status_message, filename) — index is reset to 0.
        Invalid JSON lines are logged and skipped rather than aborting the load.
    """
    log_message("Attempting to load dataset from uploaded file or local path.")
    log_message(f"Received file_obj type: {type(file_obj)}")
    log_message(f"Received local_file_path type: {type(local_file_path)}")
    log_message(f"Received local_file_path value: {local_file_path}")
    loaded_entries = []
    filename = ""
    try:
        if file_obj is not None and hasattr(file_obj, 'read'):
            # Uploaded file object takes precedence over a typed-in path.
            log_message(f"Loading from uploaded file object: {file_obj.name}")
            jsonl_data = file_obj.read().decode("utf-8")
            filename = os.path.basename(file_obj.name)
            log_message(f"Read {len(jsonl_data)} characters from uploaded file object: {filename}")
        elif local_file_path is not None and isinstance(local_file_path, str) and local_file_path.strip():
            file_path = local_file_path.strip()
            log_message(f"Loading from local file path: {file_path}")
            if not os.path.exists(file_path):
                log_message(f"Local file not found: {file_path}")
                return [], 0, f"Error loading file: Local file not found at {file_path}", ""
            with open(file_path, "r", encoding="utf-8") as f:
                jsonl_data = f.read()
            filename = os.path.basename(file_path)
            log_message(f"Read {len(jsonl_data)} characters from local file path: {filename}")
        else:
            log_message("No file uploaded or local path provided.")
            # Keep the 4-tuple shape expected by the UI outputs.
            return [], 0, "Please upload a JSONL file or provide a local path.", ""

        for i, line in enumerate(jsonl_data.strip().split('\n')):
            if line.strip():
                try:
                    loaded_entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    # Skip bad lines but keep loading the rest of the file.
                    log_message(f"Error decoding JSON on line {i+1}: {line.strip()} - {e}")
            else:
                log_message(f"Skipping empty line {i+1} in uploaded file.")

        log_message(f"Successfully loaded {len(loaded_entries)} entries from file: {filename}")
        return loaded_entries, 0, f"Successfully loaded {len(loaded_entries)} entries.", filename
    except Exception as e:
        log_message(f"Error loading file: {e}")
        return [], 0, f"Error loading file: {e}", ""


def load_from_hf(hf_token, hf_repo_id, hf_file_path):
    """Load dataset entries from a JSONL file in a Hugging Face Hub repo.

    Returns:
        (entries, index, status_message, filename_for_save) — index is reset to 0.
    """
    log_message(f"Attempting to load dataset from Hugging Face Hub: {hf_repo_id}/{hf_file_path}")
    if not hf_repo_id or not hf_file_path:
        log_message("Missing HF repo ID or file path for loading.")
        return [], 0, "Please provide Hugging Face Repository ID and file path.", ""
    loaded_entries = []
    filename_for_save = ""
    try:
        log_message(f"Downloading file from HF Hub: repo_id={hf_repo_id}, path_in_repo={hf_file_path}")
        # Token is optional: anonymous download works for public repos.
        downloaded_folder = snapshot_download(repo_id=hf_repo_id, allow_patterns=hf_file_path, token=hf_token if hf_token else None)
        downloaded_file_path = os.path.join(downloaded_folder, hf_file_path)
        log_message(f"File downloaded to temporary path: {downloaded_file_path}")

        if not os.path.exists(downloaded_file_path):
            log_message(f"Downloaded file not found at expected path: {downloaded_file_path}")
            return [], 0, f"Error: File '{hf_file_path}' not found in repository '{hf_repo_id}'. Please check the file path.", ""

        with open(downloaded_file_path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if line.strip():
                    try:
                        loaded_entries.append(json.loads(line))
                    except json.JSONDecodeError as e:
                        # Skip bad lines but keep loading the rest of the file.
                        log_message(f"Error decoding JSON on line {i+1} in HF file: {line.strip()} - {e}")
                else:
                    log_message(f"Skipping empty line {i+1} in HF file.")

        filename_for_save = os.path.basename(hf_file_path)
        log_message(f"Successfully loaded {len(loaded_entries)} entries from Hugging Face Hub file: {filename_for_save}")
        return loaded_entries, 0, f"Successfully loaded {len(loaded_entries)} entri dari Hugging Face Hub.", filename_for_save
    except Exception as e:
        error_message = f"Gagal memuat dari Hugging Face Hub: {e}"
        log_message(f"HF Load Error: {e}")
        # Translate common failure modes into actionable messages.
        if "Repository not found" in str(e):
            error_message = f"Error: Repository '{hf_repo_id}' not found. Please check the repository ID. Original error: {e}"
        elif "Authentication required" in str(e) or "Invalid token" in str(e):
            error_message = f"Error: Authentication failed. Please check your Hugging Face API Token or ensure the repository is public. Original error: {e}"
        elif "allow_patterns" in str(e):
            error_message = f"Error: File path '{hf_file_path}' not found in repository '{hf_repo_id}' or pattern matching failed. Original error: {e}"
        return [], 0, error_message, ""


def add_turn(messages, user_input, assistant_response):
    """Append one user/assistant exchange to the in-progress message list.

    Returns:
        (messages, user_input, assistant_response, status) — on success the two
        input fields are cleared; on validation failure they are returned as-is.
    """
    log_message("Attempting to add user/assistant turn.")
    if not user_input.strip() or not assistant_response.strip():
        log_message("User input or assistant response is empty, not adding turn.")
        return messages, user_input, assistant_response, "Please provide both User Input and Assistant Response."
    messages.append({"role": "user", "content": user_input.strip()})
    messages.append({"role": "assistant", "content": assistant_response.strip()})
    log_message("User/assistant turn added.")
    return messages, "", "", "Turn added successfully."
def clear_turns():
    """Reset the in-progress conversation turns and clear the status message."""
    log_message("Clearing current turns.")
    return [], ""


def add_entry_to_dataset(dataset_entries, system_message, messages):
    """Append the current system message + turns as one dataset entry.

    Returns:
        (dataset_entries, system_message, messages, status, size_label) — on
        success the system message and turns are cleared; on failure the
        current values are returned unchanged so the user can correct them.
    """
    log_message("Attempting to add entry to dataset.")
    new_entry_messages = []
    if system_message.strip():
        new_entry_messages.append({"role": "system", "content": system_message.strip()})
        log_message("System message added to new entry.")
    new_entry_messages.extend(messages)
    log_message(f"New entry messages: {new_entry_messages}")
    if new_entry_messages:
        dataset_entries.append({"messages": new_entry_messages})
        log_message(f"Entry added to dataset. New dataset size: {len(dataset_entries)}")
        return dataset_entries, "", [], "Entry added to dataset!", f"Number of entries: {len(dataset_entries)}"
    else:
        log_message("No messages to add as an entry.")
        return dataset_entries, system_message, messages, "Cannot add empty entry. Add system message or user/assistant turns.", f"Number of entries: {len(dataset_entries)}"


def display_entry(dataset_entries, current_index):
    """Render the entry at current_index and populate the editable widgets.

    Returns 15 outputs in the order the UI wires them:
        (display_markdown, system_message_update, <10 message textbox updates>,
         save_button_update, delete_button_update, edit_status).
    """
    log_message(f"Attempting to display entry at index: {current_index}")
    log_message(f"Current dataset_entries size in display_entry: {len(dataset_entries) if dataset_entries is not None else 0}")

    # Defaults used whenever there is nothing valid to show.
    empty_display_text = "No entries to display yet."
    empty_system_message = ""
    hidden_textboxes = [gr.update(value="", visible=False) for _ in range(10)]
    hide_buttons = gr.update(visible=False)
    clear_status = ""

    if not dataset_entries:
        log_message("dataset_entries is empty, cannot display.")
        return empty_display_text, empty_system_message, *hidden_textboxes, hide_buttons, hide_buttons, clear_status

    total_entries = len(dataset_entries)
    # Clamp the index: deletions may have left it past the end of the list.
    if not (0 <= current_index < total_entries):
        log_message(f"Current index {current_index} out of bounds for dataset size {total_entries}. Adjusting.")
        current_index = max(0, min(current_index, total_entries - 1)) if total_entries > 0 else 0
        log_message(f"Adjusted index: {current_index}")
        if not (0 <= current_index < total_entries):
            log_message("Dataset is empty after index adjustment.")
            return empty_display_text, empty_system_message, *hidden_textboxes, hide_buttons, hide_buttons, clear_status

    entry = dataset_entries[current_index]
    log_message(f"Displaying entry {current_index + 1} of {total_entries}. Entry content sample: {str(entry)[:100]}...")

    display_text = f"Viewing Entry {current_index + 1} of {total_entries}\n\n"
    system_message_content = ""
    messages_content = []

    if entry and 'messages' in entry and isinstance(entry['messages'], list) and entry['messages']:
        first_msg = entry['messages'][0]
        # BUGFIX: use .get() so a malformed first message cannot raise KeyError.
        if isinstance(first_msg, dict) and first_msg.get('role') == 'system':
            system_message_content = first_msg.get('content', '')
            messages_content = entry['messages'][1:]
            log_message("Found system message and user/assistant messages.")
        else:
            # No leading system message: treat everything as user/assistant.
            messages_content = entry['messages']
            log_message("No system message found, displaying all as user/assistant.")
    elif entry and 'messages' in entry and isinstance(entry['messages'], list) and not entry['messages']:
        log_message("Entry has empty messages list.")
    else:
        log_message(f"Warning: Invalid entry format or missing messages key at index {current_index}: {entry}")
        return f"Error displaying entry {current_index + 1}: Invalid format.", "", *hidden_textboxes, hide_buttons, hide_buttons, ""

    for msg in messages_content:
        if isinstance(msg, dict):
            display_text += f"**{msg.get('role', 'user').capitalize()}:** {msg.get('content', '')}\n\n"

    editable_system_message = system_message_content
    # BUGFIX: pad to exactly 10 values. The old code padded relative to
    # messages_content length, so filtering out a non-dict message left the
    # list shorter than 10 and editable_messages[i] below raised IndexError.
    editable_messages = [msg.get('content', '') for msg in messages_content[:10] if isinstance(msg, dict)]
    editable_messages += [""] * (10 - len(editable_messages))

    # Show a textbox for each actual message, up to the 10 the UI provides.
    visible_count = min(len(messages_content), 10)
    textbox_updates = [gr.update(value=editable_messages[i], visible=(i < visible_count)) for i in range(10)]

    log_message("Successfully prepared display text and textbox updates.")
    return display_text, gr.update(value=editable_system_message, visible=True), *textbox_updates, gr.update(visible=True), gr.update(visible=True), ""


def prev_entry(current_index, dataset_entries):
    """Return the index of the previous entry, clamped at 0."""
    log_message(f"Navigating to previous entry from index {current_index}")
    if current_index > 0:
        new_index = current_index - 1
        log_message(f"New index: {new_index}")
        return new_index
    log_message("Already at the beginning (index 0). Staying at 0.")
    return 0


def next_entry(current_index, dataset_entries):
    """Return the index of the next entry, clamped at the last entry (0 if empty)."""
    log_message(f"Navigating to next entry from index {current_index}")
    if len(dataset_entries) > 0 and current_index < len(dataset_entries) - 1:
        new_index = current_index + 1
        log_message(f"New index: {new_index}")
        return new_index
    if len(dataset_entries) > 0:
        log_message("Already at the end. Staying at last index.")
        return len(dataset_entries) - 1
    log_message("Dataset is empty. Staying at index 0.")
    return 0


def go_to_entry(entry_number, dataset_entries):
    """Convert a 1-based entry number into a 0-based index.

    Returns:
        (index, status_message) — on invalid or out-of-range input, index 0
        and an error message describing the valid range.
    """
    log_message(f"Attempting to go to entry number: {entry_number}")
    total_entries = len(dataset_entries)
    default_index = 0  # Fallback index for empty datasets and invalid input.
    try:
        index = int(entry_number) - 1  # UI is 1-based; state is 0-based.
        if 0 <= index < total_entries:
            log_message(f"Valid index calculated: {index}")
            return index, ""
        log_message(f"Calculated index {index} is out of bounds (0 to {total_entries-1 if total_entries > 0 else 0}).")
        return default_index, f"Error: Entry number {entry_number} is out of bounds. Please enter a number between 1 and {total_entries if total_entries > 0 else 1}."
    except (ValueError, TypeError):
        log_message(f"Invalid input for entry number: {entry_number}")
        return default_index, f"Error: Invalid input '{entry_number}'. Please enter a valid integer number."


def update_entry_messages(dataset_entries, current_index, edited_system_message, *edited_contents):
    """Replace the messages of the entry at current_index with edited textbox content.

    Cleared textboxes that corresponded to original messages delete those
    messages; non-empty textboxes beyond the original count append new
    messages with an inferred alternating role.

    Returns:
        (dataset_entries, status_message).
    """
    log_message(f"Attempting to update entry at index: {current_index}")
    if not dataset_entries or not (0 <= current_index < len(dataset_entries)):
        log_message("Cannot update entry: dataset_entries empty or index out of bounds.")
        return dataset_entries, "Error: Cannot update entry. Dataset is empty or index is out of bounds."

    updated_messages = []
    if edited_system_message.strip():
        updated_messages.append({"role": "system", "content": edited_system_message.strip()})
        log_message("Updated system message added.")

    original_messages_in_entry = dataset_entries[current_index].get('messages', [])
    original_user_assistant_messages = [msg for msg in original_messages_in_entry if msg.get('role') in ['user', 'assistant']]
    original_user_assistant_count = len(original_user_assistant_messages)

    # The UI provides up to 10 message textboxes.
    for i in range(min(10, len(edited_contents))):
        edited_content = edited_contents[i]
        if edited_content and edited_content.strip():
            if i < original_user_assistant_count:
                # Existing message: keep its original role.
                updated_messages.append({"role": original_user_assistant_messages[i].get('role', 'user'), "content": edited_content.strip()})
                log_message(f"Updated original message {i+1} with role {original_user_assistant_messages[i].get('role', 'user')}.")
            else:
                # New message beyond the original count: alternate user/assistant.
                if updated_messages:
                    last_role = updated_messages[-1]['role']
                    # BUGFIX: a message that follows the system prompt must be
                    # 'user'; the old expression mapped 'system' -> 'assistant'.
                    new_role = 'assistant' if last_role == 'user' else 'user'
                else:
                    new_role = 'user'  # First message of an otherwise empty entry.
                updated_messages.append({"role": new_role, "content": edited_content.strip()})
                log_message(f"Added new message {i+1} with inferred role {new_role}.")
        elif i < original_user_assistant_count:
            # An emptied textbox deletes the corresponding original message.
            log_message(f"Original message {i+1} was cleared, effectively deleting it.")

    if not updated_messages:
        # Allow an empty result only when the entry originally held just a
        # system message that the user deliberately cleared.
        only_system_cleared = (
            len(original_messages_in_entry) == 1
            and original_messages_in_entry[0].get('role') == 'system'
            and not edited_system_message.strip()
        )
        if not only_system_cleared:
            log_message("Attempted to save an empty entry. Preventing save.")
            return dataset_entries, "Error: Cannot save an empty entry. Add system message or user/assistant turns."

    if 0 <= current_index < len(dataset_entries):
        dataset_entries[current_index]['messages'] = updated_messages
        log_message(f"Entry {current_index + 1} updated successfully. New message count: {len(updated_messages)}")
        return dataset_entries, f"Changes saved for Entry {current_index + 1}."
    else:
        log_message(f"Error updating entry: index {current_index} out of bounds.")
        return dataset_entries, "Error: Cannot update entry. Index out of bounds."
# Remove the currently viewed entry from the dataset.
def delete_entry(dataset_entries, current_index):
    """Delete the entry at current_index.

    Returns:
        (dataset_entries, new_index, status_message) — new_index is clamped to
        the last remaining entry, or reset to 0 when the dataset becomes empty.
    """
    log_message(f"Attempting to delete entry at index: {current_index}")

    total = len(dataset_entries) if dataset_entries else 0
    if total == 0 or not (0 <= current_index < total):
        log_message("Cannot delete entry: dataset_entries empty or index out of bounds.")
        # Nothing was deleted, so the index is returned unchanged.
        return dataset_entries, current_index, "Error: Cannot delete entry. Dataset is empty or index is out of bounds."

    removed_at = current_index  # Remember which 1-based slot the status reports.
    log_message(f"Deleting entry at index {removed_at}.")
    dataset_entries.pop(removed_at)

    remaining = len(dataset_entries)
    if remaining == 0:
        new_index = 0
        log_message("Dataset is empty after deletion. Resetting index to 0.")
    elif removed_at >= remaining:
        new_index = remaining - 1
        log_message(f"Adjusting index after deletion to last entry: {new_index}")
    else:
        new_index = removed_at
        log_message(f"Index remains {new_index} after deletion.")

    return dataset_entries, new_index, f"Entry {removed_at + 1} deleted."
# Define the Gradio Interface.
# Layout: three tabs (Create Entry / View-Edit Entries / Save-Load Dataset)
# sharing four gr.State values that hold the dataset and navigation position.
with gr.Blocks() as demo:
    dataset_entries = gr.State([])  # Use Gradio State to maintain dataset entries
    current_messages = gr.State([])  # Use Gradio State to maintain current messages for creation
    current_entry_index = gr.State(0)  # Use Gradio State for current viewing index
    current_loaded_filename = gr.State("")  # State to hold the name of the currently loaded file

    gr.Markdown("## LLM Dataset Creator")

    with gr.Tabs() as tabs:
        with gr.TabItem("Create Entry", id=0):
            gr.Markdown("### Create a new entry")
            system_message_input = gr.Textbox(label="System Message", lines=5, placeholder="Instruksi peran yang sangat kuat (misalnya: Kamu adalah Yui Airi, teman yang santai...)")
            gr.Markdown("### User and Assistant Messages")
            user_input = gr.Textbox(label="User Input", lines=3)
            assistant_response = gr.Textbox(label="Assistant Response", lines=3)
            with gr.Row():
                add_turn_btn = gr.Button("Add User/Assistant Turn")
                clear_turns_btn = gr.Button("Clear Turns")
            current_turns_output = gr.Markdown("Current Turns:")
            # Dedicated status textbox for this tab.
            create_status_output = gr.Textbox(label="Status", interactive=False)
            add_entry_btn = gr.Button("Add Entry to Dataset")
            gr.Markdown("### Dataset Entries")
            dataset_size_output = gr.Markdown("Number of entries: 0")  # Defined here; also updated from other tabs.

            # add_turn updates state + status, then a chained lambda re-renders
            # the "Current Turns" markdown from the updated message list.
            add_turn_btn.click(
                add_turn,
                inputs=[current_messages, user_input, assistant_response],
                outputs=[current_messages, user_input, assistant_response, create_status_output]
            ).then(
                lambda messages: ("Current Turns:\n" + "\n".join([f"**{msg['role'].capitalize()}:** {msg['content']}" for msg in messages])),
                inputs=[current_messages],
                outputs=[current_turns_output]
            )

            # clear_turns empties the message state, then the chain resets the
            # markdown back to its header text.
            clear_turns_btn.click(
                clear_turns,
                inputs=[],
                outputs=[current_messages, create_status_output]
            ).then(
                lambda: "Current Turns:",
                inputs=[],
                outputs=[current_turns_output]
            )

            # add_entry_to_dataset consumes the system message + turns; the
            # chain then clears the turns display.
            add_entry_btn.click(
                add_entry_to_dataset,
                inputs=[dataset_entries, system_message_input, current_messages],
                outputs=[dataset_entries, system_message_input, current_messages, create_status_output, dataset_size_output]
            ).then(
                lambda: "Current Turns:",
                inputs=[],
                outputs=[current_turns_output]
            )

        with gr.TabItem("View/Edit Entries", id=1):
            gr.Markdown("### View Dataset Entries")
            entry_display = gr.Markdown("No entries to display yet.")

            # Navigation controls.
            with gr.Row():
                prev_btn = gr.Button("Previous")
                next_btn = gr.Button("Next")
                go_to_input = gr.Number(label="Go to Entry #", value=1, precision=0)

            # Editable widgets; hidden until an entry is displayed.
            edited_system_message_input = gr.Textbox(label="System Message", lines=5, visible=False)
            # Fixed pool of 10 message textboxes — display_entry always emits
            # exactly 10 updates, toggling visibility per entry.
            edited_message_inputs = [gr.Textbox(label=f"Message {i+1}", lines=3, visible=False) for i in range(10)]
            save_changes_btn = gr.Button("Save Changes", visible=False)
            delete_entry_btn = gr.Button("Delete Entry", visible=False)
            edit_status_output = gr.Textbox(label="Edit Status", interactive=False)  # Always visible.

            # Every navigation event updates current_entry_index first, then
            # chains display_entry, whose 15 outputs must match this list:
            # display, system box, 10 message boxes, save btn, delete btn, status.
            prev_btn.click(
                prev_entry,
                inputs=[current_entry_index, dataset_entries],
                outputs=[current_entry_index]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            )

            next_btn.click(
                next_entry,
                inputs=[current_entry_index, dataset_entries],
                outputs=[current_entry_index]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            )

            # Both submit and change fire go_to_entry so typing or pressing
            # Enter in the number field navigates.
            go_to_input.submit(
                go_to_entry,
                inputs=[go_to_input, dataset_entries],
                outputs=[current_entry_index, edit_status_output]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            )

            go_to_input.change(
                go_to_entry,
                inputs=[go_to_input, dataset_entries],
                outputs=[current_entry_index, edit_status_output]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            )

            # Saving writes the edited textboxes back into the entry, then
            # re-renders it so the display reflects the saved state.
            save_changes_btn.click(
                update_entry_messages,
                inputs=[dataset_entries, current_entry_index, edited_system_message_input] + edited_message_inputs,
                outputs=[dataset_entries, edit_status_output]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            )

            # Deleting chains twice: first re-display the (adjusted) current
            # entry, then refresh the entry-count label on the Create tab.
            delete_entry_btn.click(
                delete_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[dataset_entries, current_entry_index, edit_status_output]
            ).then(
                fn=display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            ).then(
                lambda entries: f"Number of entries: {len(entries)}",
                inputs=[dataset_entries],
                outputs=[dataset_size_output]
            )

        with gr.TabItem("Save/Load Dataset", id=2):
            gr.Markdown("### Save Dataset")
            # Filename textbox; also overwritten after a successful load.
            filename_to_save = gr.Textbox(label="Enter filename to save", value="dataset.jsonl", key="filename_to_save")
            with gr.Row():
                save_local_btn = gr.Button("Save to File")
                hf_save_btn = gr.Button("Save to Hugging Face Hub")
            save_output = gr.Textbox(label="Save Status", interactive=False)

            with gr.Accordion("Hugging Face Hub (Save)", open=False):
                hf_token_save = gr.Textbox(label="HF API Token", type="password")
                hf_repo_id_save = gr.Textbox(label="HF Repo Name", placeholder="user/repo")
                hf_file_path_save = gr.Textbox(label="File Path in Repo", value="dataset.jsonl")

            save_local_btn.click(
                save_dataset,
                inputs=[dataset_entries, filename_to_save],
                outputs=[save_output]
            )
            hf_save_btn.click(
                save_to_hf,
                inputs=[dataset_entries, hf_token_save, hf_repo_id_save, hf_file_path_save],
                outputs=[save_output]
            )

            gr.Markdown("---")
            gr.Markdown("### Load Dataset")

            # Local load: either an upload or a typed path; one button handles both.
            gr.Markdown("#### Load from Local File")
            uploaded_file = gr.File(label="Upload a JSONL file", file_types=[".jsonl"])
            local_file_path_input = gr.Textbox(label="Or load from local path", placeholder="/path/to/your/dataset.jsonl")
            load_local_btn = gr.Button("Load Local File")

            gr.Markdown("#### Load from Hugging Face Hub")
            with gr.Column():
                hf_token_load = gr.Textbox(label="HF API Token (optional for public repos)", type="password")
                hf_repo_id_load = gr.Textbox(label="HF Repository ID (e.g., your_username/your_repo)")
                hf_file_path_load = gr.Textbox(label="Path file JSONL in repository (e.g., dataset.jsonl)")
                load_hf_btn = gr.Button("Muat dari Hugging Face Hub")
            load_output = gr.Textbox(label="Load Status", interactive=False)

            # Load chain: (1) load entries + reset index, (2) render the first
            # entry, (3) refresh the count label and copy the loaded filename
            # into the save-filename box.
            load_local_btn.click(
                load_dataset_from_file,
                inputs=[uploaded_file, local_file_path_input],
                outputs=[dataset_entries, current_entry_index, load_output, current_loaded_filename]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            ).then(
                lambda entries, loaded_filename: (f"Number of entries: {len(entries)}", loaded_filename),
                inputs=[dataset_entries, current_loaded_filename],
                outputs=[dataset_size_output, filename_to_save]
            )

            load_hf_btn.click(
                load_from_hf,
                inputs=[hf_token_load, hf_repo_id_load, hf_file_path_load],
                outputs=[dataset_entries, current_entry_index, load_output, current_loaded_filename]
            ).then(
                display_entry,
                inputs=[dataset_entries, current_entry_index],
                outputs=[entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
            ).then(
                lambda entries, loaded_filename: (f"Number of entries: {len(entries)}", loaded_filename),
                inputs=[dataset_entries, current_loaded_filename],
                outputs=[dataset_size_output, filename_to_save]
            )

    # On page load, show the entry count and render the first entry (if any).
    # display_entry returns a tuple, so it is concatenated onto the count label.
    demo.load(
        fn=lambda entries: (f"Number of entries: {len(entries)}",) + display_entry(entries, 0),
        inputs=[dataset_entries],
        outputs=[dataset_size_output, entry_display, edited_system_message_input, *edited_message_inputs, save_changes_btn, delete_entry_btn, edit_status_output]
    )

# share=True exposes a public Gradio link (required when running in Colab).
demo.launch(share=True)