Spaces:
Build error
Build error
| """ | |
| Functions for viewing and filtering JSON log files containing LLM prompts and responses. | |
| """ | |
| import json | |
| from typing import Dict, List, Optional, Tuple | |
| import gradio as gr | |
def load_log_file_handler(log_file):
    """Respond to a log-file upload: load it and seed every filter dropdown.

    Args:
        log_file: The uploaded file. May be ``None`` (nothing uploaded), an
            object whose ``name`` attribute is used as the path, or the path
            itself.

    Returns:
        An 8-tuple, in this order: the loaded log entries, five
        ``gr.Dropdown`` objects (batch, task type, group, model choice,
        validated), the prompt markdown and the response markdown. When
        nothing was uploaded or nothing could be loaded, the entries are an
        empty list, every dropdown has no choices, and both markdown strings
        carry the explanatory message.
    """

    def _blank_outputs(message):
        # Same shape as the success tuple, but with nothing to choose from.
        return (
            [],  # log_data_state
            gr.Dropdown(choices=[]),  # log_batch_dropdown
            gr.Dropdown(choices=[]),  # log_task_type_dropdown
            gr.Dropdown(choices=[]),  # log_group_dropdown
            gr.Dropdown(choices=[]),  # log_model_choice_dropdown
            gr.Dropdown(choices=[]),  # log_validated_dropdown
            f"### Prompt\n\n{message}",  # log_prompt_markdown
            f"### Response\n\n{message}",  # log_response_markdown
        )

    if log_file is None:
        return _blank_outputs("No file uploaded.")

    # Use the object's ``name`` attribute as the path when it has one.
    path = getattr(log_file, "name", log_file)
    entries = load_log_file(path)
    if not entries:
        return _blank_outputs("Error: Could not load log file or file is empty.")

    batches, task_types, groups, model_choices, validated_values = (
        extract_unique_filter_values(entries)
    )

    # Every dropdown starts with an "All" entry meaning "do not filter".
    batch_options = ["All", *(str(number) for number in batches)]
    task_type_options = ["All", *task_types]
    group_options = ["All", *groups]
    model_choice_options = ["All", *model_choices]
    validated_options = ["All", *validated_values]

    # With no filters applied, every entry is shown.
    prompt, response = get_prompt_and_response(entries)

    return (
        entries,  # log_data_state
        gr.Dropdown(choices=batch_options, value="All"),  # log_batch_dropdown
        gr.Dropdown(choices=task_type_options, value="All"),  # log_task_type_dropdown
        gr.Dropdown(choices=group_options, value="All"),  # log_group_dropdown
        gr.Dropdown(choices=model_choice_options, value="All"),  # log_model_choice_dropdown
        gr.Dropdown(choices=validated_options, value="All"),  # log_validated_dropdown
        f"### Prompt\n\n{prompt}",  # log_prompt_markdown
        f"### Response\n\n{response}",  # log_response_markdown
    )
def filter_log_display(log_data, batch_str, task_type, group, model_choice, validated):
    """Build the prompt/response markdown for the current filter selections.

    Args:
        log_data: Loaded log entries; a falsy value yields a placeholder.
        batch_str: Selected batch as text, or "All" / empty for no filter.
            Text that cannot be converted to an integer also means no filter.
        task_type: Selected task type, or "All" for no filter.
        group: Selected group, or "All" for no filter.
        model_choice: Selected model choice, or "All" for no filter.
        validated: Selected validated value, or "All" for no filter.

    Returns:
        Tuple of (prompt markdown, response markdown).
    """
    if not log_data:
        return (
            "### Prompt\n\nNo log data available.",
            "### Response\n\nNo log data available.",
        )

    def _selection_or_none(selection):
        # The "All" sentinel means "do not filter on this field".
        return None if selection == "All" else selection

    batch_number = None
    if batch_str and batch_str != "All":
        try:
            batch_number = int(batch_str)
        except (ValueError, TypeError):
            # Unparseable selection: fall back to not filtering by batch.
            batch_number = None

    prompt_text, response_text = get_prompt_and_response(
        log_data,
        batch_number,
        _selection_or_none(task_type),
        _selection_or_none(group),
        _selection_or_none(model_choice),
        _selection_or_none(validated),
    )
    return f"### Prompt\n\n{prompt_text}", f"### Response\n\n{response_text}"
# Shared change handler for every filter dropdown
def update_log_display_on_filter(
    log_data, batch_str, task_type, group, model_choice, validated
):
    """Re-render the prompt/response display after any filter selection changes.

    Forwards all arguments unchanged to ``filter_log_display`` and returns
    its result.
    """
    return filter_log_display(
        log_data=log_data,
        batch_str=batch_str,
        task_type=task_type,
        group=group,
        model_choice=model_choice,
        validated=validated,
    )
def load_log_file(file_path: str) -> List[Dict]:
    """
    Load a JSON log file containing LLM prompts and responses.

    The file is read as UTF-8; a leading byte-order mark is skipped and
    undecodable bytes are replaced instead of raising. Only a top-level JSON
    array is accepted, and only its object (dict) elements are kept, so the
    declared ``List[Dict]`` return type actually holds and callers can use
    ``entry.get(...)`` on every element.

    Args:
        file_path: Path to the JSON file

    Returns:
        List of dictionaries containing log entries. Empty when the file
        cannot be opened or parsed, when the top-level value is not a list,
        or when the list holds no objects.
    """
    try:
        with open(file_path, "r", encoding="utf-8-sig", errors="replace") as f:
            data = json.load(f)
    except Exception as e:
        # Deliberate best-effort boundary: any failure to read or parse is
        # reported and treated as "no data" rather than propagated.
        print(f"Error loading log file: {e}")
        return []

    if not isinstance(data, list):
        return []
    # Drop stray scalars/arrays/nulls so every returned element is a dict.
    return [entry for entry in data if isinstance(entry, dict)]
def parse_batch_number(batch: str) -> int:
    """
    Convert a batch label into its integer batch number.

    Surrounding whitespace and any trailing colons are ignored, so "1",
    "1:" and " 2: " are all accepted.

    Args:
        batch: Batch label (e.g., "1", "1:", "2")

    Returns:
        The batch number, or 0 when the label is empty/falsy or is not a
        valid integer
    """
    if not batch:
        return 0

    label = batch.strip().rstrip(":")
    try:
        number = int(label)
    except (ValueError, TypeError):
        number = 0
    return number
def extract_unique_batches_and_task_types(
    log_data: List[Dict],
) -> Tuple[List[int], List[str]]:
    """
    Collect the distinct batch numbers and task types present in the log.

    Batch values are parsed with ``parse_batch_number``; only results greater
    than zero are kept. Falsy task types are skipped.

    Args:
        log_data: List of log entry dictionaries

    Returns:
        Tuple of (sorted unique batch numbers, sorted unique task types)
    """
    parsed_batches = (
        parse_batch_number(str(entry["batch"]))
        for entry in log_data
        if "batch" in entry
    )
    batch_numbers = {number for number in parsed_batches if number > 0}
    task_type_names = {
        entry["task_type"]
        for entry in log_data
        if "task_type" in entry and entry["task_type"]
    }
    return sorted(batch_numbers), sorted(task_type_names)
def extract_unique_filter_values(
    log_data: List[Dict],
) -> Tuple[List[int], List[str], List[str], List[str], List[str]]:
    """
    Extract unique values for all filter fields from log data.

    Batch values are parsed with ``parse_batch_number`` and kept only when
    greater than zero. Task types are kept as-is when truthy. Group, model
    choice and validated values are converted to strings; for these three,
    boolean and numeric values are kept even when falsy (``False``, ``0``),
    because they are real values a user may want to filter on. ``None``,
    empty strings and other empty values are still treated as missing.

    Args:
        log_data: List of log entry dictionaries

    Returns:
        Tuple of (batches, task_types, groups, model_choices, validated_values),
        each sorted
    """
    batches = set()
    task_types = set()
    groups = set()
    model_choices = set()
    validated_values = set()
    # Fields whose values are offered to the user in string form.
    stringified_fields = (
        ("group", groups),
        ("model_choice", model_choices),
        ("validated", validated_values),
    )

    for entry in log_data:
        if "batch" in entry:
            batch_num = parse_batch_number(str(entry["batch"]))
            if batch_num > 0:
                batches.add(batch_num)
        if "task_type" in entry:
            task_type = entry["task_type"]
            if task_type:
                task_types.add(task_type)
        for key, bucket in stringified_fields:
            if key not in entry:
                continue
            value = entry[key]
            # A plain truthiness test would silently drop False and 0;
            # bool is a subclass of int, so (int, float) covers it.
            if value or isinstance(value, (int, float)):
                bucket.add(str(value))

    return (
        sorted(batches),
        sorted(task_types),
        sorted(groups),
        sorted(model_choices),
        sorted(validated_values),
    )
def filter_log_entries(
    log_data: List[Dict],
    batch: Optional[int] = None,
    task_type: Optional[str] = None,
    group: Optional[str] = None,
    model_choice: Optional[str] = None,
    validated: Optional[str] = None,
) -> List[Dict]:
    """
    Keep only the log entries that satisfy every supplied criterion.

    A criterion left as ``None`` is not applied. Batch values are compared
    after ``parse_batch_number``; group, model choice and validated values
    are compared in string form; task type is compared as stored. A missing
    key counts as batch "0" or as an empty string respectively.

    Args:
        log_data: List of log entry dictionaries
        batch: Optional batch number to filter by
        task_type: Optional task type to filter by
        group: Optional group to filter by
        model_choice: Optional model choice to filter by
        validated: Optional validated value to filter by

    Returns:
        Filtered list of log entries, in their original order
    """
    kept = []
    for entry in log_data:
        if batch is not None:
            if parse_batch_number(str(entry.get("batch", "0"))) != batch:
                continue
        if task_type is not None and entry.get("task_type", "") != task_type:
            continue
        if group is not None and str(entry.get("group", "")) != group:
            continue
        if (
            model_choice is not None
            and str(entry.get("model_choice", "")) != model_choice
        ):
            continue
        if validated is not None and str(entry.get("validated", "")) != validated:
            continue
        kept.append(entry)
    return kept
def get_prompt_and_response(
    log_data: List[Dict],
    batch: Optional[int] = None,
    task_type: Optional[str] = None,
    group: Optional[str] = None,
    model_choice: Optional[str] = None,
    validated: Optional[str] = None,
) -> Tuple[str, str]:
    """
    Get prompt and response text for filtered log entries.

    Entries are selected with ``filter_log_entries``. When several entries
    match, each non-empty prompt/response is prefixed with an
    ``**Entry N**`` header (N is the position among the matches, starting at
    1) and the pieces are joined with a horizontal-rule separator. Prompt
    and response values that are not strings are converted with ``str`` so
    that a single matching entry is handled the same way as several.

    Args:
        log_data: List of log entry dictionaries
        batch: Optional batch number to filter by
        task_type: Optional task type to filter by
        group: Optional group to filter by
        model_choice: Optional model choice to filter by
        validated: Optional validated value to filter by

    Returns:
        Tuple of (prompt_text, response_text). If nothing matches, a
        "no entries" message and an empty string. If entries match but none
        has a prompt (or response), the corresponding placeholder text.
    """
    filtered = filter_log_entries(
        log_data, batch, task_type, group, model_choice, validated
    )
    if not filtered:
        return "No entries found for the selected batch and task type.", ""

    # Headers only add noise when there is a single entry to show.
    numbered = len(filtered) > 1
    prompts = []
    responses = []
    for idx, entry in enumerate(filtered, 1):
        header = f"**Entry {idx}**\n\n" if numbered else ""
        prompt = entry.get("prompt", "")
        response = entry.get("response", "")
        # Formatting through an f-string stringifies non-string values in
        # both the single- and multi-entry case, so the joins below cannot
        # fail on, e.g., a list of chat messages.
        if prompt:
            prompts.append(f"{header}{prompt}")
        if response:
            responses.append(f"{header}{response}")

    separator = "\n\n---\n\n"
    prompt_text = separator.join(prompts) if prompts else "No prompt found."
    response_text = separator.join(responses) if responses else "No response found."
    return prompt_text, response_text
def load_and_initialize_log_viewer(
    file_path: str,
) -> Tuple[List[int], List[str], str, str]:
    """
    Load a log file and produce the viewer's initial, unfiltered state.

    Args:
        file_path: Path to the JSON log file

    Returns:
        Tuple of (batch_numbers, task_types, default_prompt, default_response).
        When the file yields no entries, both lists are empty, the prompt is
        a "no log data" message and the response is an empty string.
    """
    entries = load_log_file(file_path)
    if entries:
        batch_numbers, task_type_names = extract_unique_batches_and_task_types(
            entries
        )
        # No filters are passed, so the defaults cover every entry.
        default_prompt, default_response = get_prompt_and_response(entries)
        return batch_numbers, task_type_names, default_prompt, default_response
    return [], [], "No log data found in file.", ""