llm_topic_modelling / tools /view_logs.py
seanpedrickcase's picture
Sync: Fix on mismatched tables where batch size was 1
c7b1c6d
"""
Functions for viewing and filtering JSON log files containing LLM prompts and responses.
"""
import json
from typing import Dict, List, Optional, Tuple
import gradio as gr
def load_log_file_handler(log_file):
"""Handle log file upload and initialize dropdowns."""
if log_file is None:
return (
[], # log_data_state
gr.Dropdown(choices=[]), # log_batch_dropdown
gr.Dropdown(choices=[]), # log_task_type_dropdown
gr.Dropdown(choices=[]), # log_group_dropdown
gr.Dropdown(choices=[]), # log_model_choice_dropdown
gr.Dropdown(choices=[]), # log_validated_dropdown
"### Prompt\n\nNo file uploaded.", # log_prompt_markdown
"### Response\n\nNo file uploaded.", # log_response_markdown
)
file_path = log_file.name if hasattr(log_file, "name") else log_file
log_data = load_log_file(file_path)
if not log_data:
return (
[], # log_data_state
gr.Dropdown(choices=[]), # log_batch_dropdown
gr.Dropdown(choices=[]), # log_task_type_dropdown
gr.Dropdown(choices=[]), # log_group_dropdown
gr.Dropdown(choices=[]), # log_model_choice_dropdown
gr.Dropdown(choices=[]), # log_validated_dropdown
"### Prompt\n\nError: Could not load log file or file is empty.", # log_prompt_markdown
"### Response\n\nError: Could not load log file or file is empty.", # log_response_markdown
)
# Extract unique values for all filters
batches, task_types, groups, model_choices, validated_values = (
extract_unique_filter_values(log_data)
)
# Format choices as strings for dropdown, add "All" option
batch_choices = ["All"] + [str(b) for b in batches] if batches else ["All"]
task_type_choices = ["All"] + task_types if task_types else ["All"]
group_choices = ["All"] + groups if groups else ["All"]
model_choice_choices = ["All"] + model_choices if model_choices else ["All"]
validated_choices = ["All"] + validated_values if validated_values else ["All"]
# Get default prompt and response (show all if no filters)
prompt, response = get_prompt_and_response(log_data)
# Format for markdown display
prompt_display = f"### Prompt\n\n{prompt}"
response_display = f"### Response\n\n{response}"
return (
log_data, # log_data_state
gr.Dropdown(choices=batch_choices, value="All"), # log_batch_dropdown
gr.Dropdown(choices=task_type_choices, value="All"), # log_task_type_dropdown
gr.Dropdown(choices=group_choices, value="All"), # log_group_dropdown
gr.Dropdown(
choices=model_choice_choices, value="All"
), # log_model_choice_dropdown
gr.Dropdown(choices=validated_choices, value="All"), # log_validated_dropdown
prompt_display, # log_prompt_markdown
response_display, # log_response_markdown
)
def filter_log_display(log_data, batch_str, task_type, group, model_choice, validated):
"""Filter and display log entries based on all filter criteria."""
if not log_data:
return (
"### Prompt\n\nNo log data available.",
"### Response\n\nNo log data available.",
)
# Convert batch string to int, handle "All" option
batch = None
if batch_str and batch_str != "All":
try:
batch = int(batch_str)
except (ValueError, TypeError):
batch = None
# Handle "All" option for all filters
task_type_filter = None if task_type == "All" else task_type
group_filter = None if group == "All" else group
model_choice_filter = None if model_choice == "All" else model_choice
validated_filter = None if validated == "All" else validated
# Get filtered prompt and response
prompt, response = get_prompt_and_response(
log_data,
batch,
task_type_filter,
group_filter,
model_choice_filter,
validated_filter,
)
# Format for markdown display
prompt_display = f"### Prompt\n\n{prompt}"
response_display = f"### Response\n\n{response}"
return prompt_display, response_display
# Handle batch and task type changes
def update_log_display_on_filter(
log_data, batch_str, task_type, group, model_choice, validated
):
"""Update display when any filter changes."""
return filter_log_display(
log_data, batch_str, task_type, group, model_choice, validated
)
def load_log_file(file_path: str) -> List[Dict]:
"""
Load a JSON log file containing LLM prompts and responses.
Args:
file_path: Path to the JSON file
Returns:
List of dictionaries containing log entries
"""
try:
with open(file_path, "r", encoding="utf-8-sig", errors="replace") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except (FileNotFoundError, json.JSONDecodeError, Exception) as e:
print(f"Error loading log file: {e}")
return []
def parse_batch_number(batch: str) -> int:
"""
Parse batch number from string format.
Handles formats like "1", "1:", "2", "2:", etc.
Args:
batch: Batch string (e.g., "1", "1:", "2")
Returns:
Integer batch number
"""
if not batch:
return 0
# Remove colon if present and strip whitespace
batch_clean = batch.strip().rstrip(":")
try:
return int(batch_clean)
except (ValueError, TypeError):
return 0
def extract_unique_batches_and_task_types(
log_data: List[Dict],
) -> Tuple[List[int], List[str]]:
"""
Extract unique batch numbers and task types from log data.
Args:
log_data: List of log entry dictionaries
Returns:
Tuple of (sorted list of unique batch numbers, sorted list of unique task types)
"""
batches = set()
task_types = set()
for entry in log_data:
if "batch" in entry:
batch_num = parse_batch_number(str(entry["batch"]))
if batch_num > 0:
batches.add(batch_num)
if "task_type" in entry:
task_type = entry["task_type"]
if task_type:
task_types.add(task_type)
return sorted(list(batches)), sorted(list(task_types))
def extract_unique_filter_values(
log_data: List[Dict],
) -> Tuple[List[int], List[str], List[str], List[str], List[str]]:
"""
Extract unique values for all filter fields from log data.
Args:
log_data: List of log entry dictionaries
Returns:
Tuple of (batches, task_types, groups, model_choices, validated_values)
"""
batches = set()
task_types = set()
groups = set()
model_choices = set()
validated_values = set()
for entry in log_data:
if "batch" in entry:
batch_num = parse_batch_number(str(entry["batch"]))
if batch_num > 0:
batches.add(batch_num)
if "task_type" in entry:
task_type = entry["task_type"]
if task_type:
task_types.add(task_type)
if "group" in entry:
group = entry["group"]
if group:
groups.add(str(group))
if "model_choice" in entry:
model_choice = entry["model_choice"]
if model_choice:
model_choices.add(str(model_choice))
if "validated" in entry:
validated = entry["validated"]
if validated:
validated_values.add(str(validated))
return (
sorted(list(batches)),
sorted(list(task_types)),
sorted(list(groups)),
sorted(list(model_choices)),
sorted(list(validated_values)),
)
def filter_log_entries(
log_data: List[Dict],
batch: Optional[int] = None,
task_type: Optional[str] = None,
group: Optional[str] = None,
model_choice: Optional[str] = None,
validated: Optional[str] = None,
) -> List[Dict]:
"""
Filter log entries by batch number, task type, group, model_choice, and/or validated.
Args:
log_data: List of log entry dictionaries
batch: Optional batch number to filter by
task_type: Optional task type to filter by
group: Optional group to filter by
model_choice: Optional model choice to filter by
validated: Optional validated value to filter by
Returns:
Filtered list of log entries
"""
filtered = []
for entry in log_data:
match_batch = True
match_task_type = True
match_group = True
match_model_choice = True
match_validated = True
if batch is not None:
entry_batch = parse_batch_number(str(entry.get("batch", "0")))
match_batch = entry_batch == batch
if task_type is not None:
entry_task_type = entry.get("task_type", "")
match_task_type = entry_task_type == task_type
if group is not None:
entry_group = str(entry.get("group", ""))
match_group = entry_group == group
if model_choice is not None:
entry_model_choice = str(entry.get("model_choice", ""))
match_model_choice = entry_model_choice == model_choice
if validated is not None:
entry_validated = str(entry.get("validated", ""))
match_validated = entry_validated == validated
if (
match_batch
and match_task_type
and match_group
and match_model_choice
and match_validated
):
filtered.append(entry)
return filtered
def get_prompt_and_response(
log_data: List[Dict],
batch: Optional[int] = None,
task_type: Optional[str] = None,
group: Optional[str] = None,
model_choice: Optional[str] = None,
validated: Optional[str] = None,
) -> Tuple[str, str]:
"""
Get prompt and response text for filtered log entries.
If multiple entries match, concatenate them.
Args:
log_data: List of log entry dictionaries
batch: Optional batch number to filter by
task_type: Optional task type to filter by
group: Optional group to filter by
model_choice: Optional model choice to filter by
validated: Optional validated value to filter by
Returns:
Tuple of (prompt_text, response_text)
"""
filtered = filter_log_entries(
log_data, batch, task_type, group, model_choice, validated
)
if not filtered:
return "No entries found for the selected batch and task type.", ""
# If multiple entries, combine them with separators
prompts = []
responses = []
for idx, entry in enumerate(filtered, 1):
prompt = entry.get("prompt", "")
response = entry.get("response", "")
# Add entry number if multiple entries
entry_header = f"**Entry {idx}**" if len(filtered) > 1 else ""
if prompt:
if entry_header:
prompts.append(f"{entry_header}\n\n{prompt}")
else:
prompts.append(prompt)
if response:
if entry_header:
responses.append(f"{entry_header}\n\n{response}")
else:
responses.append(response)
prompt_text = "\n\n---\n\n".join(prompts) if prompts else "No prompt found."
response_text = "\n\n---\n\n".join(responses) if responses else "No response found."
return prompt_text, response_text
def load_and_initialize_log_viewer(
file_path: str,
) -> Tuple[List[int], List[str], str, str]:
"""
Load log file and initialize viewer with default values.
Args:
file_path: Path to the JSON log file
Returns:
Tuple of (batch_numbers, task_types, default_prompt, default_response)
"""
log_data = load_log_file(file_path)
if not log_data:
return [], [], "No log data found in file.", ""
batches, task_types = extract_unique_batches_and_task_types(log_data)
# Get default prompt and response (first entry or all if no filters)
prompt, response = get_prompt_and_response(log_data)
return batches, task_types, prompt, response