import streamlit as st import json import PyPDF2 from docling.document_converter import DocumentConverter import re from io import BytesIO import openai import anthropic # Add import for Anthropic's Claude models import pandas as pd import itertools import random import math from tqdm import tqdm # Setup page config st.set_page_config( page_title="Template Generator", layout="wide", initial_sidebar_state="expanded", ) # Initialize OpenAI client (you'll need to provide your API key) def get_openai_client(): api_key = st.session_state.get("api_key", "") if api_key: return openai.OpenAI(api_key=api_key) return None def get_anthropic_client(): api_key = st.session_state.get("anthropic_api_key", "") if api_key: return anthropic.Anthropic(api_key=api_key) return None def call_model_api(prompt, model, temperature=0.7, max_tokens=1000): """ Abstraction function to call the appropriate LLM API based on the model name. Args: prompt (str): The prompt to send to the model model (str): The model name (e.g., "gpt-4", "claude-3-opus-latest") temperature (float): Creativity parameter (0.0 to 1.0) max_tokens (int): Maximum number of tokens to generate Returns: str: The generated text response """ # Check if it's a Claude model if model.startswith("claude"): client = get_anthropic_client() if not client: return "Error: No Anthropic API key provided." try: response = client.messages.create( model=model, messages=[{"role": "user", "content": prompt}], max_tokens=max_tokens, temperature=temperature, ) return response.content[0].text except Exception as e: return f"Error calling Anthropic API: {str(e)}" # Otherwise, use OpenAI else: client = get_openai_client() if not client: return "Error: No OpenAI API key provided." try: response = client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], max_tokens=max_tokens, temperature=temperature, ) return response.choices[0].message.content except Exception as e: return f"Error calling OpenAI API: {str(e)}" # @st.cache_resource def get_document_converter(): """Cache the DocumentConverter to prevent reloading on each interaction""" return None # Return None initially def get_or_create_document_converter(): """Get existing converter or create a new one only when needed""" converter = get_document_converter() if converter is None: converter = DocumentConverter() # Update the cached value get_document_converter._cached_obj = converter return converter def create_example_templates(): examples = [ { "name": "Character Generator", "description": "Generate fantasy character descriptions based on selected traits", "version": "1.0.0", "input": [ { "name": "race", "description": "Character's fantasy race", "type": "categorical", "options": ["Human", "Elf", "Dwarf", "Orc", "Halfling"], "min": 1, "max": 1, }, { "name": "class", "description": "Character's profession or class", "type": "categorical", "options": ["Warrior", "Mage", "Rogue", "Cleric", "Ranger"], "min": 1, "max": 1, }, { "name": "alignment", "description": "Character's moral alignment", "type": "categorical", "options": [ "Lawful Good", "Neutral", "Chaotic Evil", "Lawful Evil", "Chaotic Good", ], "min": 1, "max": 1, }, ], "output": [ { "name": "character_name", "description": "Generated character name", "type": "string", "min": 3, "max": 30, }, { "name": "background", "description": "Character background story", "type": "string", "min": 100, "max": 500, }, ], "prompt": "Create a fantasy character with the following traits:\nRace: {race}\nClass: {class}\nAlignment: {alignment}\n\nGenerate a suitable name and background story for this character.", }, { "name": "Recipe Generator", "description": "Generate cooking recipes based on ingredients and cuisine", "version": "1.0.0", "input": [ { "name": "cuisine", "description": "Style of cooking", "type": "categorical", "options": ["Italian", "Mexican", "Chinese", "Indian", "French"], "min": 1, "max": 1, }, { "name": "main_ingredient", "description": "Primary ingredient", "type": "categorical", "options": ["Chicken", "Beef", "Fish", "Tofu", "Vegetables"], "min": 1, "max": 1, }, { "name": "dietary_restriction", "description": "Dietary requirements", "type": "categorical", "options": [ "None", "Vegetarian", "Vegan", "Gluten-free", "Dairy-free", ], "min": 1, "max": 1, }, ], "output": [ { "name": "recipe_name", "description": "Name of the recipe", "type": "string", "min": 5, "max": 50, }, { "name": "ingredients", "description": "List of ingredients needed", "type": "string", "min": 50, "max": 300, }, { "name": "instructions", "description": "Cooking instructions", "type": "string", "min": 100, "max": 500, }, ], "prompt": "Create a {cuisine} recipe using {main_ingredient} as the main ingredient. The recipe should be {dietary_restriction}.\n\nProvide a recipe name, list of ingredients, and cooking instructions.", }, { "name": "Product Description", "description": "Generate marketing descriptions for products", "version": "1.0.0", "input": [ { "name": "product_type", "description": "Type of product", "type": "categorical", "options": [ "Smartphone", "Laptop", "Headphones", "Smartwatch", "Camera", ], "min": 1, "max": 1, }, { "name": "target_audience", "description": "Target customer demographic", "type": "categorical", "options": [ "Students", "Professionals", "Gamers", "Creatives", "Seniors", ], "min": 1, "max": 1, }, { "name": "price_tier", "description": "Price category", "type": "categorical", "options": [ "Budget", "Mid-range", "Premium", "Luxury", "Enterprise", ], "min": 1, "max": 1, }, ], "output": [ { "name": "product_name", "description": "Generated product name", "type": "string", "min": 5, "max": 30, }, { "name": "tagline", "description": "Short marketing tagline", "type": "string", "min": 10, "max": 100, }, { "name": "description", "description": "Full product description", "type": "string", "min": 100, "max": 500, }, ], "prompt": "Create a marketing description for a {price_tier} {product_type} targeted at {target_audience}.\n\nProvide a product name, catchy tagline, and compelling product description.", }, ] return examples # Create a function to display example outputs def create_example_outputs(template): # Predefined outputs for each template if template["name"] == "Character Generator": outputs = { "Human Warrior Lawful Good": { "character_name": "Sir Galahad Ironheart", "background": "Born to a noble family in the kingdom of Valorhaven, Sir Galahad trained from childhood in the arts of combat. After saving the king's daughter from bandits, he was knighted and now serves as captain of the royal guard. His unwavering dedication to justice and honor has made him a legend throughout the realm, though his strict adherence to the code of chivalry sometimes puts him at odds with more pragmatic allies.", }, "Elf Mage Chaotic Good": { "character_name": "Lyraniel Starweaver", "background": "Raised in the ancient forest of Eldrath, Lyraniel discovered her affinity for arcane magic when she accidentally set a tree ablaze during an argument. Rather than follow the structured magical traditions of her people, she left to study diverse magical practices across the continent. She now uses her considerable powers to protect the innocent and fight tyranny, though her methods are often unpredictable and sometimes cause as much chaos as they resolve.", }, "Dwarf Rogue Neutral": { "character_name": "Grimble Lockpick", "background": "Once a respected jeweler in the mountain halls of Karak-Dûm, Grimble's curiosity about the perfect lock led him down a different path. Neither malicious nor heroic, he sees himself as a professional who offers specialized services for the right price. His reputation for being able to open any lock or disarm any trap has made him sought after by adventurers and nobles alike, though he remains careful to avoid political entanglements that might limit his freedom.", }, } elif template["name"] == "Recipe Generator": outputs = { "Italian Chicken None": { "recipe_name": "Tuscan Herb-Roasted Chicken", "ingredients": "- 4 chicken breasts\n- 3 tbsp olive oil\n- 4 cloves garlic, minced\n- 1 tbsp fresh rosemary, chopped\n- 1 tbsp fresh thyme, chopped\n- 1 lemon, zested and juiced\n- 1 cup cherry tomatoes, halved\n- 1/2 cup chicken broth\n- 1/4 cup dry white wine\n- Salt and pepper to taste\n- Fresh basil for garnish", "instructions": "1. Preheat oven to 375°F (190°C).\n2. Season chicken breasts with salt and pepper.\n3. In a large oven-safe skillet, heat olive oil over medium-high heat.\n4. Sear chicken breasts for 3-4 minutes per side until golden brown.\n5. Add garlic, rosemary, and thyme to the pan and cook for 1 minute until fragrant.\n6. Add lemon zest, lemon juice, cherry tomatoes, chicken broth, and white wine.\n7. Transfer skillet to the oven and roast for 20-25 minutes until chicken is cooked through.\n8. Garnish with fresh basil before serving.", }, "Mexican Vegetables Vegetarian": { "recipe_name": "Roasted Vegetable Enchiladas Verde", "ingredients": "- 2 zucchini, diced\n- 1 red bell pepper, diced\n- 1 yellow bell pepper, diced\n- 1 red onion, sliced\n- 2 cups mushrooms, sliced\n- 3 tbsp olive oil\n- 2 tsp cumin\n- 1 tsp chili powder\n- 1 tsp oregano\n- 8 corn tortillas\n- 2 cups salsa verde\n- 1 1/2 cups shredded Monterey Jack cheese\n- 1 avocado, sliced\n- 1/4 cup cilantro, chopped\n- Lime wedges for serving", "instructions": "1. Preheat oven to 425°F (220°C).\n2. Toss zucchini, bell peppers, onion, and mushrooms with olive oil, cumin, chili powder, oregano, salt, and pepper.\n3. Spread vegetables on a baking sheet and roast for 20 minutes, stirring halfway through.\n4. Reduce oven temperature to 375°F (190°C).\n5. Warm tortillas slightly to make them pliable.\n6. Fill each tortilla with roasted vegetables and roll up.\n7. Place enchiladas seam-side down in a baking dish.\n8. Pour salsa verde over enchiladas and sprinkle with cheese.\n9. Bake for 20-25 minutes until cheese is melted and bubbly.\n10. Garnish with avocado slices and cilantro. Serve with lime wedges.", }, } elif template["name"] == "Product Description": outputs = { "Smartphone Professionals Premium": { "product_name": "ExecuTech Pro X9", "tagline": "Seamless productivity meets uncompromising elegance.", "description": 'The ExecuTech Pro X9 redefines what a business smartphone can be. Crafted with aerospace-grade materials and featuring our revolutionary 6.7" CrystalClear AMOLED display, the Pro X9 ensures your presentations and video conferences look impeccable in any lighting condition. The advanced 5-lens camera system with AI enhancement captures professional-quality images for your reports and social media, while the dedicated security co-processor keeps your sensitive data protected with military-grade encryption. With an impressive 36-hour battery life and our proprietary RapidCharge technology, the Pro X9 keeps pace with your demanding schedule. Experience the perfect balance of performance and sophistication that successful professionals deserve.', }, "Headphones Gamers Mid-range": { "product_name": "SonicStrike GT-500", "tagline": "Hear every move. Dominate every game.", "description": "Level up your gaming experience with the SonicStrike GT-500 gaming headset. Engineered specifically for competitive gamers, these headphones feature our proprietary 50mm UltraBass drivers that deliver thunderous lows while maintaining crystal-clear highs, allowing you to hear enemy footsteps with pinpoint accuracy. The detachable boom microphone with noise-cancellation ensures your teammates hear your callouts clearly, even in the heat of battle. With memory foam ear cushions wrapped in breathable mesh fabric, the GT-500 remains comfortable during marathon gaming sessions. Compatible with all major gaming platforms and featuring customizable RGB lighting through our GameSync app, the SonicStrike GT-500 offers premium features at a price that won't break the bank. Your gaming advantage starts here.", }, } else: outputs = {} return outputs def calculate_cartesian_product_size(categorical_vars): """Calculate the size of the Cartesian product based on selected options.""" if not categorical_vars: return 0, [] # Calculate the product size product_size = 1 var_counts = [] for var in categorical_vars: options = var.get("options", []) # Use selected_options if available, otherwise use all options selected_options = var.get("selected_options", options) min_sel = var.get("min", 1) max_sel = var.get("max", 1) # Use only selected options for calculation options_to_use = [opt for opt in options if opt in selected_options] # If no options selected, use all options if not options_to_use: options_to_use = options # Single selection case if min_sel == 1 and max_sel == 1: count = len(options_to_use) else: # Multi-selection case - calculate combinations count = 0 # Include min selections from math import comb if len(options_to_use) >= min_sel: count += comb(len(options_to_use), min_sel) # Include max selections if different from min if max_sel != min_sel and len(options_to_use) >= max_sel: count += comb(len(options_to_use), max_sel) # Include some intermediate selections if applicable for size in range(min_sel + 1, max_sel): if len(options_to_use) >= size: count += min( 3, comb(len(options_to_use), size) ) # Take up to 3 samples var_counts.append({"name": var["name"], "count": count}) product_size *= max(count, 1) # Avoid multiplying by zero return product_size, var_counts @st.cache_data def parse_documents(uploaded_files): """Parse multiple document files and extract their text content.""" if not uploaded_files: return "" import tempfile import os converter = get_or_create_document_converter() content = "" for file in uploaded_files: try: file_type = file.name.split(".")[-1].lower() # Handle text files directly if file_type == "txt": content += file.getvalue().decode("utf-8") # Use converter for other supported file types elif file_type in ["pdf", "docx", "html"]: # Create a temporary file with the correct extension with tempfile.NamedTemporaryFile( delete=False, suffix=f".{file_type}" ) as tmp_file: # Write the uploaded file content to the temp file tmp_file.write(file.getvalue()) tmp_path = tmp_file.name # Convert using the file path instead of the UploadedFile object source = converter.convert(tmp_path) content += source.document.export_to_markdown() # Clean up the temporary file os.unlink(tmp_path) else: st.warning(f"Unsupported file type: {file.name}") except Exception as e: st.error(f"Error processing file {file.name}: {str(e)}") return content # Add this function after parse_documents function def parse_template_file(uploaded_template): """Parse an uploaded template JSON file and validate its structure.""" try: # Read the file content if uploaded_template.name.endswith(".json"): template_content = uploaded_template.getvalue().decode("utf-8") template_spec = json.loads(template_content) # Sanitize the template to remove UI-specific keys template_spec = sanitize_template_spec(template_spec) # Validate the template structure required_keys = [ "name", "version", "description", "input", "output", "prompt", ] for key in required_keys: if key not in template_spec: return None, f"Invalid template: Missing '{key}' field" # Validate input and output arrays if not isinstance(template_spec["input"], list): return None, "Invalid template: 'input' must be an array" if not isinstance(template_spec["output"], list): return None, "Invalid template: 'output' must be an array" # Check that each input and output has required fields for i, input_var in enumerate(template_spec["input"]): if not all(k in input_var for k in ["name", "description", "type"]): return ( None, f"Invalid template: Input variable at index {i} is missing required fields", ) for i, output_var in enumerate(template_spec["output"]): if not all(k in output_var for k in ["name", "description", "type"]): return ( None, f"Invalid template: Output variable at index {i} is missing required fields", ) return template_spec, None else: return None, "Uploaded file must be a JSON file" except json.JSONDecodeError: return None, "Invalid JSON format in the uploaded template file" except Exception as e: return None, f"Error parsing template file: {str(e)}" def sanitize_template_spec(template_spec): """ Remove UI-specific keys from template specification that shouldn't be part of the template. Args: template_spec (dict): The template specification to sanitize Returns: dict: Sanitized template specification """ if not template_spec: return template_spec # Create a deep copy to avoid modifying the original sanitized_spec = template_spec.copy() # List of UI-specific keys that should be removed ui_specific_keys = ["previous_options", "selected_options"] # Clean input variables if "input" in sanitized_spec and isinstance(sanitized_spec["input"], list): for i, var in enumerate(sanitized_spec["input"]): # Remove UI-specific keys from each variable sanitized_spec["input"][i] = { k: v for k, v in var.items() if k not in ui_specific_keys } # Clean output variables if "output" in sanitized_spec and isinstance(sanitized_spec["output"], list): for i, var in enumerate(sanitized_spec["output"]): # Remove UI-specific keys from each variable sanitized_spec["output"][i] = { k: v for k, v in var.items() if k not in ui_specific_keys } return sanitized_spec # LLM call function def call_llm(prompt, model="gpt-3.5-turbo"): """Call the LLM API to generate text based on the prompt.""" try: # Get output specifications from the template if available output_specs = "" if st.session_state.show_template_editor and st.session_state.template_spec: output_vars = st.session_state.template_spec.get("output", []) if output_vars: output_specs = "Please generate output with the following specifications in JSON format:\n" for var in output_vars: output_specs += ( f"- {var['name']}: {var['description']} (Type: {var['type']})" ) if var.get("options"): output_specs += f", Options: {var['options']}" output_specs += "\n" # Add the output specs to the prompt prompt = f"{prompt}\n\n{output_specs}\n\nReturn ONLY a JSON object with the output variables, with no additional text or explanation." result = call_model_api( model=model, prompt=prompt, max_tokens=1000, temperature=st.session_state.get("temperature", 0.7), ) # Try to parse as JSON if the template has output variables if ( st.session_state.show_template_editor and st.session_state.template_spec and st.session_state.template_spec.get("output") ): # Extract JSON from the response json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*\{[\s\S]*\}\s*$" json_match = re.search(json_pattern, result) if json_match: json_str = json_match.group(1) if json_match.group(1) else result # Clean up any remaining markdown or comments json_str = re.sub(r"```.*|```", "", json_str).strip() try: output_data = json.loads(json_str) # Store the parsed JSON in session state for proper rendering st.session_state.json_output = output_data return output_data except: pass else: try: output_data = json.loads(result) # Store the parsed JSON in session state for proper rendering st.session_state.json_output = output_data return output_data except: pass # If we couldn't parse as JSON or it's not meant to be JSON, return as is return result except Exception as e: st.error(f"Error calling LLM API: {str(e)}") return f"Error: {str(e)}" # Function to generate a template based on instructions and documents def generate_template_from_instructions(instructions, document_content=""): """ Use LLM to generate a template specification based on user instructions and document content. """ # Prepare the prompt for the LLM prompt = f""" You are a template designer for an LLM-powered content generation system. Create a template specification based on the following instructions: INSTRUCTIONS: {instructions} {"DOCUMENT CONTENT (EXCERPT):" + document_content + "..." if document_content else "NO DOCUMENTS PROVIDED"} Generate a JSON template specification with the following structure: {{ "name": "A descriptive name for the template", "version": "1.0.0", "description": "A brief description of what this template does", "input": [ {{ "name": "variable_name", "description": "What this variable represents", "type": "string/int/float/bool/categorical", "min": minimum_value_or_length, "max": maximum_value_or_length, "options": ["option1", "option2"] (only for categorical type) }}, ... more input variables ], "output": [ {{ "name": "output_variable_name", "description": "What this output represents", "type": "string/int/float/bool/categorical" }}, ... more output variables ], "prompt": "A template string with {{variable_name}} placeholders that will be replaced with actual values" }} Make sure the prompt includes all input variables and is designed to produce the expected outputs. The prompt should address an LLM as if it was a combination of a system prompt and user input, and must contain information around formatting, structure and context for the LLM to generate the desired content as derived from these instructions and/or documents. If a 'lore' or 'knowledge_base' should be incorporated, include {{lore}} in the prompt template. If document content was provided, design the template to effectively use that information. """ try: # Call the LLM to generate the template template_text = call_model_api( model=st.session_state.model, prompt=prompt, max_tokens=4096, temperature=0.7, ) # Extract the JSON part from the response json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*{[\s\S]*}\s*$" json_match = re.search(json_pattern, template_text) if json_match: json_str = json_match.group(1) if json_match.group(1) else template_text # Clean up any remaining markdown or comments json_str = re.sub(r"```.*|```", "", json_str).strip() template_spec = json.loads(json_str, strict=False) return template_spec else: # If no JSON format found, try to parse the entire response try: template_spec = json.loads(template_text, strict=False) return template_spec except: st.warning("LLM didn't return valid JSON. Using fallback template.") return create_fallback_template(instructions) except Exception as e: st.error(f"Error generating template: {str(e)}") return create_fallback_template(instructions) # Add these functions after the generate_template_from_instructions function def generate_improved_prompt_template(template_spec, knowledge_base=""): """ Use LLM to generate an improved prompt template based on current template variables. """ if not st.session_state.get("api_key") and not st.session_state.get( "anthropic_api_key" ): st.error("Please provide an OpenAI or Anthropic API key to rewrite the prompt.") return template_spec["prompt"] # Extract template information for context input_vars = template_spec["input"] output_vars = template_spec["output"] template_description = template_spec["description"] # Format variable information for the prompt input_vars_text = "\n".join( [ f"- {var['name']}: {var['description']} (Type: {var['type']})" + (f", Options: {var['options']}" if var.get("options") else "") for var in input_vars ] ) output_vars_text = "\n".join( [ f"- {var['name']}: {var['description']} (Type: {var['type']})" for var in output_vars ] ) # Prepare the prompt for the LLM prompt = f""" You are an expert at designing effective prompts for LLMs. Rewrite the prompt template based on the following details: TEMPLATE PURPOSE: {template_description} INPUT VARIABLES: {input_vars_text} OUTPUT VARIABLES: {output_vars_text} {"KNOWLEDGE BASE AVAILABLE:" if knowledge_base else "NO KNOWLEDGE BASE AVAILABLE."} {knowledge_base if knowledge_base else ""} Current prompt template: {template_spec["prompt"]} Please create an improved prompt template that: 1. Uses all input variables (in curly braces like {{variable_name}}) 2. Is designed to generate the specified outputs 3. Includes {{lore}} where background information or context should be inserted 4. Is clear, specific, and well-structured 5. Provides enough guidance to the LLM to generate high-quality results Return ONLY the revised prompt template text, with no additional explanations. """ try: # Call the LLM to generate the improved prompt template improved_template = call_model_api( model=st.session_state.model, prompt=prompt, max_tokens=4096, temperature=0.7, ) # Remove any markdown code block formatting if present improved_template = re.sub(r"```.*\n|```", "", improved_template) return improved_template except Exception as e: st.error(f"Error generating improved prompt: {str(e)}") return template_spec["prompt"] # Fallback template if generation fails def create_fallback_template(instructions=""): """Create a basic template to use as fallback.""" return { "name": "Generated Template", "version": "1.0.0", "description": instructions, "input": [ { "name": "input_1", "description": "First input variable", "type": "string", "min": 1, "max": 100, } ], "output": [ { "name": "output_1", "description": "Generated output", "type": "string", "min": 10, "max": 1000, } ], "prompt": "Based on the following information:\n{input_1}\n\nAnd considering this additional context:\n{lore}\n\nGenerate the following output.", } def generate_synthetic_inputs_hybrid(template_spec, num_samples=10, max_retries=3): """ Generate synthetic input data using a hybrid approach: - Programmatically generate combinations of categorical variables - Use LLM to fill in non-categorical variables - Process row by row for resilience """ if not st.session_state.get("api_key") and not st.session_state.get( "anthropic_api_key" ): st.error("Please provide an OpenAI API key to generate synthetic data.") return [] # Extract all variables from the template input_vars = template_spec["input"] # Separate categorical and non-categorical variables categorical_vars = [ var for var in input_vars if var["type"] == "categorical" and var.get("options") ] non_categorical_vars = [var for var in input_vars if var not in categorical_vars] default_value_vars = [var for var in input_vars if "default_value" in var] # Process in batches and show progress with st.spinner(f"Generating {num_samples} synthetic inputs..."): progress_bar = st.progress(0) results = [] # If we have categorical variables, use them to create base permutations if categorical_vars: st.info( f"Generating permutations for {len(categorical_vars)} categorical variables" ) # Create permutations of categorical values permutations = generate_categorical_permutations( categorical_vars, num_samples ) # For each permutation, fill in non-categorical variables for i, perm in enumerate(permutations): # Update progress progress_bar.progress(min((i + 1) / len(permutations), 1.0)) # Create a complete row by adding non-categorical values row = perm.copy() # Add default values first for var in default_value_vars: row[var["name"]] = var["default_value"] # Generate values for remaining non-categorical variables remaining_non_cat_vars = [ var for var in non_categorical_vars if var not in default_value_vars ] if remaining_non_cat_vars: non_cat_values = generate_non_categorical_values( remaining_non_cat_vars, perm, max_retries ) row.update(non_cat_values) results.append(row) # Stop if we have enough samples if len(results) >= num_samples: break else: # No categorical variables, generate each row individually for i in range(num_samples): # Update progress progress_bar.progress(min((i + 1) / num_samples, 1.0)) # Generate a complete row of values row = generate_single_row(input_vars, max_retries) if row: results.append(row) # Ensure we have the requested number of samples while len(results) < num_samples: # Generate additional rows if needed row = generate_single_row(input_vars, max_retries) if row: results.append(row) # Ensure progress bar completes progress_bar.progress(1.0) return results[:num_samples] def generate_categorical_permutations(categorical_vars, target_count): """Generate efficient permutations of categorical variables.""" # Build option sets for each categorical variable option_sets = [] for var in categorical_vars: var_name = var["name"] options = var.get("options", []) min_sel = var.get("min", 1) max_sel = var.get("max", 1) # Get selected options if they exist selected_options = var.get("selected_options", options) # Use only selected options for permutation options_to_use = [opt for opt in options if opt in selected_options] # If no options selected, use all options if not options_to_use: options_to_use = options # Single selection case if min_sel == 1 and max_sel == 1: option_sets.append([(var_name, opt) for opt in options_to_use]) else: # Multi-selection case - generate varied selection sizes var_options = [] # Include min selections for combo in itertools.combinations(options_to_use, min_sel): var_options.append((var_name, list(combo))) # Include max selections if different from min if max_sel != min_sel: for combo in itertools.combinations(options_to_use, max_sel): var_options.append((var_name, list(combo))) # Include some intermediate selections if applicable for size in range(min_sel + 1, max_sel): combos = list(itertools.combinations(options_to_use, size)) if combos: sample_size = min(3, len(combos)) # Take up to 3 samples for combo in random.sample(combos, sample_size): var_options.append((var_name, list(combo))) option_sets.append(var_options) # Generate permutations all_permutations = [] for combo in itertools.product(*option_sets): perm = {name: value for name, value in combo} all_permutations.append(perm) # If we have too many permutations, sample a diverse subset if len(all_permutations) > target_count: return random.sample(all_permutations, target_count) # If we don't have enough, duplicate with variations while len(all_permutations) < target_count: # Clone an existing permutation new_perm = random.choice(all_permutations).copy() # Modify a random categorical value if possible if categorical_vars: var = random.choice(categorical_vars) var_name = var["name"] options = var.get("options", []) selected_options = var.get("selected_options", options) # Use only selected options for variation options_to_use = [opt for opt in options if opt in selected_options] if not options_to_use: options_to_use = options if options_to_use and len(options_to_use) > 1: if var.get("min", 1) == 1 and var.get("max", 1) == 1: # For single selection, choose a different option current = new_perm[var_name] other_options = [opt for opt in options_to_use if opt != current] if other_options: new_perm[var_name] = random.choice(other_options) else: # For multi-selection, modify the selection current_selection = new_perm[var_name] min_sel = var.get("min", 1) max_sel = var.get("max", 1) # Decide whether to add or remove an item if len(current_selection) < max_sel and random.random() > 0.5: # Add an item not already in the selection available = [ opt for opt in options_to_use if opt not in current_selection ] if available: current_selection.append(random.choice(available)) elif len(current_selection) > min_sel: # Remove a random item idx_to_remove = random.randrange(len(current_selection)) current_selection.pop(idx_to_remove) all_permutations.append(new_perm) return all_permutations def generate_non_categorical_values(non_cat_vars, existing_values, max_retries): """Generate values for non-categorical variables given existing categorical values.""" if not non_cat_vars: return {} # Separate string and numeric variables llm_vars = [var for var in non_cat_vars if var["type"] == "string"] numeric_vars = [var for var in non_cat_vars if var["type"] in ["int", "float"]] # Sample numeric values within the specified range result_values = {} # result_values_descr = {} # Uncomment to include the var description, i.e. units so the LLM understands the numerical values # Otherwise, good practice is to include units in numerical vars names (e.g. price_in_euros instead of price) for var in numeric_vars: name = var["name"] var_min = var.get("min") var_max = var.get("max") # description = var.get("description") if var_min is None or var_max is None: result_values[name] = get_default_value(var) # result_values_descr[name] = get_default_value(var) else: try: if var["type"] == "int": result_values[name] = random.randint(int(var_min), int(var_max)) # result_values_descr[name] = [result_values[name], description] elif var["type"] == "float": result_values[name] = round(random.uniform(float(var_min), float(var_max)), 2) # result_values_descr[name] = [result_values[name], description] except: result_values[name] = get_default_value(var) # result_values_descr[name] = get_default_value(var) # Format the string variables for the prompt if llm_vars: vars_text = "\n".join( [f"- {var['name']}: {var['description']} (Type: string)" for var in llm_vars] ) # Combine categorical and numeric values for LLM context # context_values = {**existing_values, **result_values_descr} context_values = {**existing_values, **result_values} print(context_values) # Create prompt with existing categorical and numerical values as context prompt = f""" As a synthetic data generator, create values for these variables: {vars_text} These values should be coherent with the existing categorical and/or numerical values: {json.dumps(context_values, indent=2)} Return ONLY a JSON object with the new variable values: {{ "variable_name_1": value1, "variable_name_2": value2 }} """ # print("*************** PROMPT FOR STR VAR:", prompt) for attempt in range(max_retries): try: response = call_model_api( model=st.session_state.model, prompt=prompt, max_tokens=1000, temperature=st.session_state.temperature, ) result = response.strip() # Extract JSON json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*\{[\s\S]*\}\s*$" json_match = re.search(json_pattern, result) if json_match: json_str = json_match.group(1) if json_match.group(1) else result json_str = re.sub(r"```.*|```", "", json_str).strip() try: values = json.loads(json_str, strict=False) if isinstance(values, dict): result_values.update(values) return result_values except: pass else: try: values = json.loads(result, strict=False) if isinstance(values, dict): result_values.update(values) return result_values except: pass except Exception as e: if attempt == max_retries - 1: st.warning(f"Failed to generate string values: {str(e)}") # Fallback: generate empty values for all string variables for var in llm_vars: result_values[var["name"]] = get_default_value(var) return result_values def generate_single_row(all_vars, max_retries): """Generate a complete row of data using hybrid logic: - Use LLM for string/categorical vars - Sample int/float within range """ numeric_vars = [var for var in all_vars if var["type"] in ["int", "float"]] llm_vars = [var for var in all_vars if var["type"] in ["string", "categorical"]] row = {} # Sample numeric vars for var in numeric_vars: name = var["name"] var_min = var.get("min") var_max = var.get("max") if var_min is None or var_max is None: row[name] = get_default_value(var) else: try: if var["type"] == "int": row[name] = random.randint(int(var_min), int(var_max)) elif var["type"] == "float": row[name] = round(random.uniform(float(var_min), float(var_max)), 2) except: row[name] = get_default_value(var) # Generate string and categorical via LLM if llm_vars: vars_text = "\n".join( [ f"- {var['name']}: {var['description']} (Type: {var['type']})" + ( f", Options: {var['options']}" if var["type"] == "categorical" and var.get("options") else "" ) for var in llm_vars ] ) prompt = f""" You are a synthetic data generator. Generate values for the following variables: {vars_text} Based on this partial row: {json.dumps(row, indent=2)} Return ONLY a JSON object with the new values: {{ "var_name_1": value1, "var_name_2": value2 }} For categorical variables that allow multiple selections, return a list of values. """ # print("*************** PROMPT FOR STR,CAT VAR:", prompt) for attempt in range(max_retries): try: response = call_model_api( model=st.session_state.model, prompt=prompt, max_tokens=1000, temperature=st.session_state.temperature, ) result = response.strip() json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*\{[\s\S]*\}\s*$" json_match = re.search(json_pattern, result) if json_match: json_str = json_match.group(1) if json_match.group(1) else result json_str = re.sub(r"```.*|```", "", json_str).strip() values = json.loads(json_str, strict=False) if isinstance(values, dict): row.update(values) break else: values = json.loads(result, strict=False) if isinstance(values, dict): row.update(values) break except Exception as e: if attempt == max_retries - 1: st.warning(f"Failed to generate string/categorical values: {str(e)}") return row if row else None def get_default_value(var): """Generate a default value for a variable based on its type.""" var_type = var["type"] if var_type == "string": return "N/A" elif var_type == "int": min_val = var.get("min", 0) max_val = var.get("max", 100) return min_val elif var_type == "float": min_val = float(var.get("min", 0)) max_val = float(var.get("max", 1)) return min_val elif var_type == "bool": return False elif var_type == "categorical": options = var.get("options", []) min_sel = var.get("min", 1) if options: if min_sel == 1 and var.get("max", 1) == 1: return options[0] else: return options[:min_sel] else: return None return None def generate_synthetic_outputs( template_spec, input_data, knowledge_base="", max_retries=3 ): """Generate synthetic output data based on template and input data with retry logic.""" output_vars = template_spec["output"] prompt_template = template_spec["prompt"] # Format output variable information for the prompt output_vars_text = "\n".join( [ f"- {var['name']}: {var['description']} (Type: {var['type']}) {'Options: '+str(var['options']) if var.get('options') else ''}" for var in output_vars ] ) input_vars = template_spec["input"] input_vars_text = "\n".join( [ f"- {var['name']}: {var['description']} (Type: {var['type']})" for var in input_vars ] ) output_format = "{" for var in output_vars: output_format += f'"{var["name"]}": output, ' output_format = output_format.rstrip(", ") + "}" results = [] # Create a progress bar progress_bar = st.progress(0) try: input_var_names = [var["name"] for var in template_spec["input"]] for i, input_item in enumerate(input_data): # Filter out variables not defined in the template spec input_item = {k: v for k, v in input_item.items() if k in input_var_names} # Fill the prompt template with input values filled_prompt = prompt_template for var_name, var_value in input_item.items(): filled_prompt = filled_prompt.replace(f"{{{var_name}}}", str(var_value)) # Replace {lore} with knowledge base if present if "{lore}" in filled_prompt: filled_prompt = filled_prompt.replace("{lore}", knowledge_base) # Create a prompt for generating synthetic output generation_prompt = f""" You are generating synthetic output data based on the following input: DEFINITION OF INPUT VARIABLES: {input_vars_text} INPUT DATA: {json.dumps(input_item, indent=2)} PROMPT USED: {filled_prompt} REQUIRED OUTPUT VARIABLES: {output_vars_text} Generate realistic output data for these variables. Return ONLY a JSON object with the below format, using the names of the required output variables as keys: {output_format} Use appropriate data types for each variable. Return ONLY the JSON object with no additional text or explanation. The response must be valid JSON that can be parsed directly. """ # debug logs: # print("*************Filtered Input:", input_item) # print("*************Generated Prompt:", generation_prompt) output_data = None for attempt in range(max_retries): try: response = call_model_api( model=st.session_state.model, prompt=generation_prompt, max_tokens=2000, temperature=st.session_state.temperature, ) result = response.strip() # Extract JSON from the response json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*\{[\s\S]*\}\s*$" json_match = re.search(json_pattern, result) if json_match: json_str = ( json_match.group(1) if json_match.group(1) else result ) # Clean up any remaining markdown or comments json_str = re.sub(r"```.*|```", "", json_str).strip() try: output_data = json.loads(json_str, strict=False) # Validate that we got a dictionary if isinstance(output_data, dict): # Check if all required output variables are present required_vars = [var["name"] for var in output_vars] if all(var in output_data for var in required_vars): break # Valid output, exit retry loop else: missing_vars = [ var for var in required_vars if var not in output_data ] st.warning( f"Attempt {attempt+1} for input {i+1}: Missing output variables: {missing_vars}. Retrying..." ) else: st.warning( f"Attempt {attempt+1} for input {i+1}: Generated output is not a dictionary. Retrying..." ) except json.JSONDecodeError: st.warning( f"Attempt {attempt+1} for input {i+1}: Failed to parse JSON. Retrying..." ) else: # Try to parse the entire response as JSON try: output_data = json.loads(result, strict=False) # Validate that we got a dictionary if isinstance(output_data, dict): # Check if all required output variables are present required_vars = [var["name"] for var in output_vars] if all(var in output_data for var in required_vars): break # Valid output, exit retry loop else: missing_vars = [ var for var in required_vars if var not in output_data ] st.warning( f"Attempt {attempt+1} for input {i+1}: Missing output variables: {missing_vars}. Retrying..." ) else: st.warning( f"Attempt {attempt+1} for input {i+1}: Generated output is not a dictionary. Retrying..." ) except json.JSONDecodeError: st.warning( f"Attempt {attempt+1} for input {i+1}: Failed to parse JSON. Retrying..." ) except Exception as e: st.warning( f"Attempt {attempt+1} for input {i+1}: Error generating output: {str(e)}. Retrying..." ) # If we've reached the max retries, log the error if attempt == max_retries - 1: st.error( f"Failed to generate valid output for input {i+1} after {max_retries} attempts." ) output_data = { "error": f"Failed to generate valid output after {max_retries} attempts" } # Combine input and output data if output_data: combined_data = {**input_item, **output_data} results.append(combined_data) else: results.append({**input_item, "error": "Failed to generate output"}) # Update progress bar progress_bar.progress((i + 1) / len(input_data)) finally: # Ensure progress bar reaches 100% when done if len(input_data) > 0: progress_bar.progress(1.0) return results def suggest_variable_values_from_kb( variable_name, variable_type, knowledge_base, model="gpt-3.5-turbo" ): """ Use LLM to suggest possible values for a variable based on the knowledge base content. Especially useful for categorical variables to extract options from documents. """ if not knowledge_base: return None # Truncate knowledge base if it's too long kb_excerpt = ( knowledge_base[:100000] + "..." if len(knowledge_base) > 100000 else knowledge_base ) prompt = f""" Based on the following knowledge base content, suggest appropriate values for a variable named "{variable_name}" of type "{variable_type}". KNOWLEDGE BASE EXCERPT: {kb_excerpt} TASK: Extract or suggest appropriate values for this variable from the knowledge base. If the variable type is "categorical", return a list of possible options found in the knowledge base. If the variable type is "string", suggest a few example values. If the variable type is "int" or "float", suggest appropriate min/max ranges. If the variable type is "bool", suggest appropriate true/false conditions. Return your response as a JSON object with the following structure: For categorical: {{"options": ["option1", "option2", ...]}} For string: {{"examples": ["example1", "example2", ...], "min": min_length, "max": max_length}} For int/float: {{"min": minimum_value, "max": maximum_value, "examples": [value1, value2, ...]}} For bool: {{"examples": ["condition for true", "condition for false"]}} Only include values that are actually present or strongly implied in the knowledge base. """ try: result = call_model_api( model=model, prompt=prompt, max_tokens=1000, temperature=0.3, ) # Extract JSON from the response json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*\{[\s\S]*\}\s*$" json_match = re.search(json_pattern, result) if json_match: json_str = json_match.group(1) if json_match.group(1) else result json_str = re.sub(r"```.*|```", "", json_str).strip() try: suggestions = json.loads(json_str, strict=False) return suggestions except: pass else: try: suggestions = json.loads(result, strict=False) return suggestions except: pass return None except Exception as e: print(f"Error suggesting variable values: {str(e)}") return None @st.cache_data def analyze_knowledge_base(knowledge_base, model="gpt-4o-mini"): """ Analyze the knowledge base to extract potential variable names and values. This can be used to suggest variables when creating a new template. """ if not knowledge_base: return None # Truncate knowledge base if it's too long kb_excerpt = ( knowledge_base[:100000] + "..." if len(knowledge_base) > 100000 else knowledge_base ) prompt = f""" Analyze the following knowledge base content and identify potential variables that could be used in a template. KNOWLEDGE BASE EXCERPT: {kb_excerpt} TASK: 1. Identify key entities, attributes, or concepts that could be used as variables 2. For each variable, suggest an appropriate type (string, int, float, bool, categorical) 3. For categorical variables, suggest possible options Return your analysis as a JSON array with the following structure: [ {{ "name": "variable_name", "description": "what this variable represents", "type": "string/int/float/bool/categorical", "options": ["option1", "option2", ...] (only for categorical type) }}, ... ] Focus on extracting variables that appear frequently or seem important in the knowledge base. """ try: result = call_model_api( model=model, prompt=prompt, max_tokens=2000, temperature=0.3, ) # Extract JSON from the response json_pattern = r"```json\s*([\s\S]*?)\s*```|^\s*\[[\s\S]*\]\s*$" json_match = re.search(json_pattern, result) if json_match: json_str = json_match.group(1) if json_match.group(1) else result json_str = re.sub(r"```.*|```", "", json_str).strip() try: suggestions = json.loads(json_str, strict=False) return suggestions except: pass else: try: suggestions = json.loads(result, strict=False) return suggestions except: pass return None except Exception as e: print(f"Error analyzing knowledge base: {str(e)}") return None # Initialize session state if "template_spec" not in st.session_state: st.session_state.template_spec = None if "knowledge_base" not in st.session_state: st.session_state.knowledge_base = "" if "show_template_editor" not in st.session_state: st.session_state.show_template_editor = False if "user_inputs" not in st.session_state: st.session_state.user_inputs = {} if "generated_output" not in st.session_state: st.session_state.generated_output = "" if "uploaded_filenames" not in st.session_state: st.session_state.uploaded_filenames = [] if "kb_cleared" not in st.session_state: st.session_state.kb_cleared = False # Sidebar setup with st.sidebar: st.title("Template Generator") st.write("Create templates for generating content with LLMs.") # API Key inputs st.subheader("API Keys") api_key = st.text_input("OpenAI API Key", type="password") if api_key: st.session_state.api_key = api_key anthropic_api_key = st.text_input("Anthropic API Key", type="password") if anthropic_api_key: st.session_state.anthropic_api_key = anthropic_api_key # Model selection st.subheader("Model Selection") model_provider = st.radio( "Select Model Provider", options=["OpenAI", "Anthropic"], index=0, ) if model_provider == "OpenAI": st.session_state.model = st.selectbox( "Select OpenAI Model", options=[ "gpt-4o-mini", "gpt-4.1-mini", "gpt-4.1", "gpt-4o", "gpt-4.1-nano", ], index=1, ) else: # Anthropic st.session_state.model = st.selectbox( "Select Claude Model", options=[ "claude-3-7-sonnet-latest", "claude-3-5-haiku-latest", "claude-3-5-sonnet-latest", "claude-3-opus-latest", ], index=1, # Default to Sonnet as a good balance of capability and cost ) # Main application layout st.title("Template Generator") # Create tabs for workflow tab1, tab2, tab3 = st.tabs(["Setup", "Edit and Use Template", "Generate Data"]) with tab1: st.header("Project Setup") # Add option to either upload a template or create a new one setup_option = st.radio( "Choose how to start your project", options=[ "Create new template from documents", "Upload existing template", "Create an empty template", ], index=0, ) if ( setup_option == "Create new template from documents" or setup_option == "Create an empty template" ): # Add Examples section st.markdown("---") st.subheader("Or try one of our examples") # Get example templates example_templates = create_example_templates() # Create columns for example cards cols = st.columns(len(example_templates)) # Display each example in a card for i, (col, template) in enumerate(zip(cols, example_templates)): with col: st.markdown(f"#### {template['name']}") st.markdown(f"*{template['description']}*") # Show input variables with st.expander("Inputs and Outputs", expanded=False): st.markdown("**Inputs:**") for inp in template["input"]: st.markdown(f"- {inp['name']}: {inp['type']}") # Show output variables st.markdown("**Outputs:**") for out in template["output"]: st.markdown(f"- {out['name']}: {out['type']}") # Button to use this example if st.button(f"Use this example", key=f"use_example_{i}"): st.session_state.template_spec = template st.session_state.show_template_editor = True # Create some example outputs to show example_outputs = create_example_outputs(template) # Store example outputs in session state st.session_state.example_outputs = example_outputs # Success message st.success( f"Example template loaded! Go to the 'Edit Template' tab to see it in action." ) # Rerun to update the UI # st.rerun() if setup_option == "Upload existing template": st.subheader("Upload Template File") uploaded_template = st.file_uploader( "Upload a template JSON file", type=["json"], help="Upload a previously created template file (.json)", ) if uploaded_template: template_spec, error = parse_template_file(uploaded_template) if error: st.error(error) else: # Sanitize the template to remove UI-specific keys template_spec = sanitize_template_spec(template_spec) st.success(f"Successfully loaded template: {template_spec['name']}") # Show template preview with st.expander("Template Preview", expanded=False): st.json(template_spec) # Button to use this template if st.button("Use This Template"): st.session_state.template_spec = template_spec st.session_state.show_template_editor = True st.success( "Template loaded! Go to the 'Edit Template' tab to customize it." ) elif setup_option == "Create new template from documents": # Step 1: Upload Knowledge Base st.subheader("Step 1: Upload Knowledge Base") uploaded_files = st.file_uploader( "Upload documents to use as knowledge base", accept_multiple_files=True, type=["pdf", "txt", "html"], ) # Rest of your existing code for document processing... if uploaded_files and not st.session_state.kb_cleared: # Track filenames for UI feedback st.session_state.uploaded_filenames = [file.name for file in uploaded_files] with st.spinner("Processing documents..."): st.session_state.knowledge_base = parse_documents(uploaded_files) st.success(f"Processed {len(uploaded_files)} documents") with st.expander("Preview extracted content"): st.text_area( "Extracted Text", value=st.session_state.knowledge_base, height=200, disabled=True, ) # Step 2: Provide Instructions st.subheader("Step 2: Provide Instructions") instructions = st.text_area( "Describe what you want to create", placeholder="Describe what you want to create (e.g., 'Create a character background generator with name, faction, and race as inputs...')", height=150, ) # Generate Template button if st.button("Generate Template"): if not st.session_state.get("api_key") and not st.session_state.get( "anthropic_api_key" ): st.error( "Please provide an OpenAI API key in the sidebar before generating a template." ) elif instructions: with st.spinner("Analyzing instructions and generating template..."): # Generate template based on instructions and document content st.session_state.template_spec = ( generate_template_from_instructions( instructions, st.session_state.knowledge_base ) ) st.session_state.show_template_editor = True st.success( "Template generated! Go to the 'Edit Template' tab to customize it." ) else: st.warning("Please provide instructions first") elif setup_option == "Create an empty template": st.subheader("Create Empty Template") st.info( "This option creates a minimal template that you can customize in the 'Edit Template' tab." ) # Optional: Allow setting a name and description for the template template_name = st.text_input("Template Name", value="Custom Template") template_description = st.text_area( "Template Description", value="A custom template created from scratch" ) if st.button("Create Empty Template"): # Create a minimal template structure st.session_state.template_spec = { "name": template_name, "version": "1.0.0", "description": template_description, "input": [ { "name": "input_1", "description": "First input variable", "type": "string", "min": 1, "max": 100, } ], "output": [ { "name": "output_1", "description": "Generated output", "type": "string", "min": 10, "max": 1000, } ], "prompt": "Based on the following information:\n{input_1}\n\nGenerate the following output.", } st.session_state.show_template_editor = True st.success( "Empty template created! Go to the 'Edit Template' tab to customize it." ) # Optional: Initialize an empty knowledge base if "knowledge_base" not in st.session_state: st.session_state.knowledge_base = "" with tab2: if st.session_state.show_template_editor and st.session_state.template_spec: st.header("Template Editor") st.subheader(st.session_state.template_spec["name"]) # Initialize session state variables if "suggested_variables" not in st.session_state: st.session_state.suggested_variables = [] if "added_suggestions" not in st.session_state: st.session_state.added_suggestions = set() if ( "last_template" not in st.session_state or st.session_state.last_template != st.session_state.template_spec ): st.session_state.user_inputs = {} st.session_state.last_template = st.session_state.template_spec if "show_variable_editor" not in st.session_state: st.session_state.show_variable_editor = None if "show_output_editor" not in st.session_state: st.session_state.show_output_editor = None if "show_suggested_vars" not in st.session_state: st.session_state.show_suggested_vars = False # Create main layout with left (settings) and right (generation) columns left_col, right_col = st.columns([3, 2]) # LEFT COLUMN - Settings with left_col: # Basic template information with st.expander("Template Information (Metadata)", expanded=False): col1, col2 = st.columns(2) with col1: st.session_state.template_spec["name"] = st.text_input( "Template Name", value=st.session_state.template_spec["name"] ) with col2: st.session_state.template_spec["version"] = st.text_input( "Version", value=st.session_state.template_spec["version"] ) st.session_state.template_spec["description"] = st.text_area( "Description", value=st.session_state.template_spec["description"], height=100, ) # Prompt Template Section with st.expander("Prompt Template", expanded=True): st.info( "Use {variable_name} to refer to input variables in your template" ) # Add buttons for prompt management col1, col2 = st.columns([1, 1]) with col1: rewrite_prompt = st.button("AI Rewrite Prompt") with col2: reroll_prompt = st.button("Reroll Prompt Variation") # Handle prompt rewriting if rewrite_prompt or reroll_prompt: with st.spinner("Generating improved prompt template..."): improved_template = generate_improved_prompt_template( st.session_state.template_spec, st.session_state.knowledge_base, ) # Only update if we got a valid result back if improved_template and len(improved_template) > 10: st.session_state.template_spec["prompt"] = improved_template st.success("Prompt template updated!") # Display the prompt template prompt_template = st.text_area( "Edit the prompt template", value=st.session_state.template_spec["prompt"], height=200, ) st.session_state.template_spec["prompt"] = prompt_template # Knowledge Base Management Section with st.expander("Knowledge Base Management", expanded=False): st.info("Upload and manage documents to use as knowledge base") # Upload interface uploaded_files = st.file_uploader( "Upload documents", accept_multiple_files=True, type=["pdf", "txt", "docx", "html"], ) # Handle document processing if uploaded_files: # Choose how to handle new uploads handle_method = st.radio( "How to handle new documents?", ["Replace existing", "Append to existing"], horizontal=True, ) if st.button("Process Documents"): parse_documents.clear() analyze_knowledge_base.clear() st.session_state.kb_cleared = True with st.spinner("Processing documents..."): if handle_method == "Replace existing": new_content = parse_documents(uploaded_files) st.session_state.knowledge_base = new_content st.session_state.uploaded_filenames = [ file.name for file in uploaded_files ] else: # Append # Find new files by comparing filenames new_files = [] duplicate_files = [] for file in uploaded_files: if file.name in st.session_state.uploaded_filenames: duplicate_files.append(file.name) else: new_files.append(file) st.session_state.uploaded_filenames.append( file.name ) # Process only new files if new_files: new_content = parse_documents(new_files) st.session_state.knowledge_base += ( "\n\n" + new_content ) # Provide feedback about duplicates if duplicate_files: st.info( f"Skipped {len(duplicate_files)} duplicate files: {', '.join(duplicate_files)}" ) # Reset any analysis that depends on knowledge base if "suggested_variables" in st.session_state: st.session_state.suggested_variables = [] st.session_state.show_suggested_vars = False st.success(f"Processed {len(uploaded_files)} documents") st.rerun() # Display knowledge base information if st.session_state.knowledge_base: st.write( f"Knowledge base size: {len(st.session_state.knowledge_base)} characters" ) # Clear knowledge base button # Display uploaded filenames if st.session_state.uploaded_filenames: st.write("Uploaded files:") for filename in st.session_state.uploaded_filenames: st.write(f"- {filename}") if st.button("Clear Knowledge Base"): analyze_knowledge_base.clear() st.session_state.knowledge_base = "" st.session_state.kb_cleared = True st.session_state.uploaded_filenames = [] if "suggested_variables" in st.session_state: st.session_state.suggested_variables = [] st.session_state.show_suggested_vars = False st.success("Knowledge base cleared") st.rerun() # Option to edit knowledge base directly edit_kb = st.checkbox("Edit knowledge base directly") if edit_kb: new_content = st.text_area( "Edit knowledge base content", value=st.session_state.knowledge_base, height=300, ) if st.button("Update Knowledge Base"): analyze_knowledge_base.clear() st.session_state.knowledge_base = new_content if "suggested_variables" in st.session_state: st.session_state.suggested_variables = [] st.session_state.show_suggested_vars = False st.success("Knowledge base updated") st.rerun() # Add knowledge base as input variable option if st.session_state.knowledge_base: kb_var_option = st.checkbox( "Create input variable from knowledge base" ) if kb_var_option: # Allow editing the content to include as variable kb_content = st.text_area( "Edit knowledge base content for input variable", value=st.session_state.knowledge_base, height=300, ) # Create input variable name kb_var_name = st.text_input( "Input variable name", value="kb_content" ) # Add button to create the input variable if st.button("Add as input variable"): # Check if variable already exists var_exists = False for var in st.session_state.template_spec["input"]: if var["name"] == kb_var_name: var_exists = True var["description"] = "Knowledge base content" var["type"] = "string" var["default_value"] = kb_content st.success( f"Updated existing input variable '{kb_var_name}'" ) break if not var_exists: # Create new input variable new_var = { "name": kb_var_name, "description": "Knowledge base content", "type": "string", "min": len(kb_content), "max": len(kb_content) * 2, "default_value": kb_content, } st.session_state.template_spec["input"].append( new_var ) st.success( f"Added new input variable '{kb_var_name}'" ) # Remind user to update prompt template st.info( f"Remember to use {{{kb_var_name}}} in your prompt template" ) # Knowledge Base Analysis Section if st.session_state.knowledge_base: with st.expander("Knowledge Base Analysis", expanded=False): st.info( "Analyze the knowledge base to suggest variables and values" ) if st.button( "Analyze Knowledge Base for Variables", key="analyze_kb_button_input", ): client = get_openai_client() if not client: st.error( "Please provide an OpenAI API key to analyze the knowledge base." ) else: with st.spinner("Analyzing knowledge base..."): suggested_vars = analyze_knowledge_base( st.session_state.knowledge_base ) if suggested_vars: st.session_state.suggested_variables = ( suggested_vars ) st.session_state.show_suggested_vars = True st.success( f"Found {len(suggested_vars)} potential variables in the knowledge base" ) else: st.warning( "Could not extract variables from the knowledge base" ) # Display suggested variables if they exist if ( st.session_state.suggested_variables and st.session_state.show_suggested_vars ): st.subheader("Suggested Variables") for i, var in enumerate(st.session_state.suggested_variables): # Generate a unique ID for this variable var_id = f"{var['name']}_{i}" # Check if this variable has already been added if var_id in st.session_state.added_suggestions: continue col1, col2 = st.columns([4, 1]) with col1: st.markdown( f"**{var['name']}** ({var['type']}): {var['description']}" ) if var.get("options"): st.markdown(f"Options: {', '.join(var['options'])}") with col2: if st.button("Add", key=f"add_suggested_{var_id}"): # Add this variable to the template new_var = { "name": var["name"], "description": var["description"], "type": var["type"], } if var.get("options"): new_var["options"] = var["options"] if var["type"] in ["string", "int", "float"]: new_var["min"] = 1 new_var["max"] = 100 # Add to input variables st.session_state.template_spec["input"].append( new_var ) # Mark this variable as added st.session_state.added_suggestions.add(var_id) # Show success message st.success( f"Added {var['name']} to input variables!" ) # Input Variables Section with st.expander("Input Variables", expanded=True): # Add input variable button col1, col2 = st.columns([3, 1]) with col1: new_input_name = st.text_input( "New input variable name", key="new_input_name" ) with col2: if st.button("Add Input Variable"): new_var = { "name": ( new_input_name if new_input_name else f"new_input_{len(st.session_state.template_spec['input']) + 1}" ), "description": "New input variable", "type": "string", "min": 1, "max": 100, } st.session_state.template_spec["input"].append(new_var) # Display input variables with integrated input fields st.subheader("Input Variables") # Create a container for the variables for i, input_var in enumerate(st.session_state.template_spec["input"]): var_name = input_var["name"] var_type = input_var["type"] var_desc = input_var["description"] with st.container(): # Variable header with description st.markdown(f"##### {var_name}\n###### {var_desc}") # Create columns for the variable controls col1, col2, col3 = st.columns([3, 1, 1]) with col1: # Create the appropriate input field based on variable type if var_type == "string": # Check if this is a knowledge base variable with default value if "default_value" in input_var: use_default = st.checkbox( f"Use default value for {var_name}", value=True, key=f"use_default_{var_name}", ) if use_default: st.session_state.user_inputs[var_name] = ( input_var["default_value"] ) st.text_area( f"Default value for {var_name}", value=input_var["default_value"][:500] + ( "..." if len(input_var["default_value"]) > 500 else "" ), height=150, disabled=True, key=f"preview_{var_name}", ) else: st.session_state.user_inputs[var_name] = ( st.text_area( f"Enter value for {var_name}", value=input_var["default_value"], height=150, key=f"use_{var_name}", ) ) else: st.session_state.user_inputs[var_name] = ( st.text_input( f"Enter value for {var_name}", key=f"use_{var_name}", ) ) elif var_type == "int": st.session_state.user_inputs[var_name] = ( st.number_input( f"Enter value for {var_name}", min_value=input_var.get("min", None), max_value=input_var.get("max", None), step=1, key=f"use_{var_name}", ) ) elif var_type == "float": st.session_state.user_inputs[var_name] = ( st.number_input( f"Enter value for {var_name}", min_value=float(input_var.get("min", 0)), max_value=float(input_var.get("max", 100)), key=f"use_{var_name}", ) ) elif var_type == "bool": st.session_state.user_inputs[var_name] = st.checkbox( f"Select value for {var_name}", key=f"use_{var_name}", ) elif var_type == "categorical": options = input_var.get("options", []) min_selections = input_var.get("min", 1) max_selections = input_var.get("max", 1) if options: if min_selections == 1 and max_selections == 1: # Single selection st.session_state.user_inputs[var_name] = ( st.selectbox( f"Select value for {var_name}", options=options, key=f"use_{var_name}", ) ) else: # Multi-selection st.session_state.user_inputs[var_name] = ( st.multiselect( f"Select {min_selections}-{max_selections} values for {var_name}", options=options, default=( options[:min_selections] if len(options) >= min_selections else options ), key=f"use_{var_name}", ) ) else: st.warning(f"No options defined for {var_name}") with col2: # Button to edit this variable if st.button("Edit Settings", key=f"edit_input_{i}"): st.session_state.show_variable_editor = i with col3: # Button to remove this variable if st.button("Remove", key=f"remove_input_{i}"): st.session_state.template_spec["input"].pop(i) st.rerun() # Show editor if this variable is selected if st.session_state.show_variable_editor == i: with st.container(): st.markdown("---") st.markdown( f"##### Variable Settings: {input_var['name']}" ) # Name and description input_var["name"] = st.text_input( "Name", value=input_var["name"], key=f"input_name_{i}", ) input_var["description"] = st.text_input( "Description", value=input_var["description"], key=f"input_desc_{i}", ) # Type selection var_type = st.selectbox( "Type", options=[ "string", "int", "float", "bool", "categorical", ], index=[ "string", "int", "float", "bool", "categorical", ].index(input_var["type"]), key=f"input_type_{i}", ) input_var["type"] = var_type # Type-specific settings if var_type in ["string", "int", "float"]: col1, col2 = st.columns(2) with col1: input_var["min"] = st.number_input( "Min", value=int(input_var.get("min", 0)), key=f"input_min_{i}", ) with col2: input_var["max"] = st.number_input( "Max", value=int(input_var.get("max", 100)), key=f"input_max_{i}", ) if var_type == "categorical": # Suggest options from KB button if st.button( "Suggest Options from KB", key=f"suggest_input_{i}", ): client = get_openai_client() if not client: st.error( "Please provide an OpenAI API key to suggest options." ) elif not st.session_state.knowledge_base: st.warning( "No knowledge base available. Please upload documents first." ) else: with st.spinner( f"Suggesting options for {input_var['name']}..." ): suggestions = ( suggest_variable_values_from_kb( input_var["name"], "categorical", st.session_state.knowledge_base, ) ) if ( suggestions and "options" in suggestions ): input_var["options"] = suggestions[ "options" ] st.success( f"Found {len(suggestions['options'])} options" ) else: st.warning( "Could not find suitable options in the knowledge base" ) # Options editor options = input_var.get("options", []) options_str = st.text_area( "Options (one per line)", value="\n".join(options), key=f"input_options_{i}", ) input_var["options"] = [ opt.strip() for opt in options_str.split("\n") if opt.strip() ] # Min/max selections col1, col2 = st.columns(2) with col1: input_var["min"] = st.number_input( "Min selections", value=int(input_var.get("min", 1)), min_value=0, key=f"input_cat_min_{i}", ) with col2: input_var["max"] = st.number_input( "Max selections", value=int(input_var.get("max", 1)), min_value=1, key=f"input_cat_max_{i}", ) # Close editor button if st.button("Done Editing", key=f"done_input_{i}"): st.session_state.show_variable_editor = None st.rerun() st.markdown("---") st.divider() # Output Variables Section with st.expander("Output Variables", expanded=True): # Add output variable button col1, col2 = st.columns([3, 1]) with col1: new_output_name = st.text_input( "New output variable name", key="new_output_name" ) with col2: if st.button("Add Output Variable"): new_var = { "name": ( new_output_name if new_output_name else f"new_output_{len(st.session_state.template_spec['output']) + 1}" ), "description": "New output variable", "type": "string", "min": 1, "max": 100, } st.session_state.template_spec["output"].append(new_var) # Display output variables in a table-like format st.subheader("Output Variables") # Create a container for the variables for i, output_var in enumerate( st.session_state.template_spec["output"] ): col1, col2, col3 = st.columns([3, 1, 1]) with col1: st.markdown( f"**{output_var['name']}** - {output_var['description']}" ) with col2: # Button to edit this variable if st.button("Edit", key=f"edit_output_{i}"): st.session_state.show_output_editor = i with col3: # Button to remove this variable if st.button("Remove", key=f"remove_output_{i}"): st.session_state.template_spec["output"].pop(i) st.rerun() # Show editor if this variable is selected if st.session_state.show_output_editor == i: with st.container(): st.markdown("---") st.markdown( f"##### Edit Output Variable: {output_var['name']}" ) # Name and description output_var["name"] = st.text_input( "Name", value=output_var["name"], key=f"output_name_{i}" ) output_var["description"] = st.text_input( "Description", value=output_var["description"], key=f"output_desc_{i}", ) # Type selection var_type = st.selectbox( "Type", options=[ "string", "int", "float", "bool", "categorical", ], index=[ "string", "int", "float", "bool", "categorical", ].index(output_var["type"]), key=f"output_type_{i}", ) output_var["type"] = var_type # Type-specific settings if var_type in ["string", "int", "float"]: col1, col2 = st.columns(2) with col1: output_var["min"] = st.number_input( "Min", value=int(output_var.get("min", 0)), key=f"output_min_{i}", ) with col2: output_var["max"] = st.number_input( "Max", value=int(output_var.get("max", 100)), key=f"output_max_{i}", ) if var_type == "categorical": # Suggest options from KB button if st.button( "Suggest Options from KB", key=f"suggest_output_{i}" ): client = get_openai_client() if not client: st.error( "Please provide an OpenAI API key to suggest options." ) elif not st.session_state.knowledge_base: st.warning( "No knowledge base available. Please upload documents first." ) else: with st.spinner( f"Suggesting options for {output_var['name']}..." ): suggestions = ( suggest_variable_values_from_kb( output_var["name"], "categorical", st.session_state.knowledge_base, ) ) if suggestions and "options" in suggestions: output_var["options"] = suggestions[ "options" ] st.success( f"Found {len(suggestions['options'])} options" ) else: st.warning( "Could not find suitable options in the knowledge base" ) # Options editor options = output_var.get("options", []) options_str = st.text_area( "Options (one per line)", value="\n".join(options), key=f"output_options_{i}", ) output_var["options"] = [ opt.strip() for opt in options_str.split("\n") if opt.strip() ] # Min/max selections col1, col2 = st.columns(2) with col1: output_var["min"] = st.number_input( "Min selections", value=int(output_var.get("min", 1)), min_value=0, key=f"output_cat_min_{i}", ) with col2: output_var["max"] = st.number_input( "Max selections", value=int(output_var.get("max", 1)), min_value=1, key=f"output_cat_max_{i}", ) # Close editor button if st.button("Done Editing", key=f"done_output_{i}"): st.session_state.show_output_editor = None st.rerun() st.markdown("---") # Template JSON with st.expander("Template JSON", expanded=False): st.json(st.session_state.template_spec) # Download button template_json = json.dumps(st.session_state.template_spec, indent=2) st.download_button( label="Download Template JSON", data=template_json, file_name="template_spec.json", mime="application/json", ) # RIGHT COLUMN - Generation with right_col: st.header("Generation") # Handle the lore/knowledge base as a special variable prompt_template = st.session_state.template_spec["prompt"] if "{lore}" in prompt_template: with st.expander("Document Knowledge Base", expanded=False): st.markdown("##### Document Knowledge Base") # Display info about the knowledge base if st.session_state.knowledge_base: st.success( f"Using content from {len(st.session_state.uploaded_filenames) if 'uploaded_filenames' in st.session_state else 'uploaded'} documents as knowledge base" ) # Use a button to toggle knowledge base content view instead of an expander if st.button( "View/Hide Knowledge Base Content", key="toggle_kb_view" ): st.session_state.show_kb_content = not st.session_state.get( "show_kb_content", False ) if st.session_state.get("show_kb_content", False): st.text_area( "Knowledge base content", value=st.session_state.knowledge_base[:2000] + ( "..." if len(st.session_state.knowledge_base) > 2000 else "" ), height=200, disabled=True, ) # Add option to edit if needed use_edited_lore = st.checkbox("Edit knowledge base content") if use_edited_lore: st.session_state.user_inputs["lore"] = st.text_area( "Edit knowledge base for this generation", value=st.session_state.knowledge_base, height=300, ) else: st.session_state.user_inputs["lore"] = ( st.session_state.knowledge_base ) else: st.warning( "No documents uploaded. You can provide custom lore below." ) st.session_state.user_inputs["lore"] = st.text_area( "Enter background information or context", placeholder="Enter custom lore or background information here...", height=150, ) # Temperature control slider st.session_state.temperature = st.slider( "Temperature (creativity level)", min_value=0.0, max_value=1.0, value=0.7, step=0.05 ) # Generate Output button if st.button("Generate Output", key="generate_button"): # Check if API key is provided if not st.session_state.get("api_key") and not st.session_state.get( "anthropic_api_key" ): st.error( "Please provide an OpenAI or Anthropic API key in the sidebar before generating output." ) else: # Fill the prompt template with user-provided values filled_prompt = prompt_template for var_name, var_value in st.session_state.user_inputs.items(): filled_prompt = filled_prompt.replace( f"{{{var_name}}}", str(var_value) ) # Show the filled prompt with st.expander("View populated prompt"): st.text_area( "Prompt sent to LLM", value=filled_prompt, height=200, disabled=True, ) # Call LLM with the filled prompt # Create a single input data item from user inputs input_data = [st.session_state.user_inputs.copy()] # Create a copy of the template spec template_spec_copy = st.session_state.template_spec.copy() # Call generate_synthetic_outputs with the input data with st.spinner("Generating output..."): model_selected = st.session_state.model generated_outputs = generate_synthetic_outputs( template_spec_copy, input_data, st.session_state.knowledge_base, max_retries=3, ) # Extract the first output (since we only have one input) if generated_outputs and len(generated_outputs) > 0: # The output contains both input and output fields # We only want to display the output fields output_vars = [ var["name"] for var in template_spec_copy["output"] ] output_data = { k: v for k, v in generated_outputs[0].items() if k in output_vars } st.session_state.generated_output = output_data else: st.session_state.generated_output = { "error": "Failed to generate output" } # Display generated output if ( "generated_output" in st.session_state and st.session_state.generated_output ): st.header("Generated Output") # Check if the output is a dictionary (JSON) if isinstance(st.session_state.generated_output, dict): # Display as JSON st.json(st.session_state.generated_output) # Option to save the output as JSON output_json = json.dumps( st.session_state.generated_output, indent=2 ) st.download_button( label="Download Output (JSON)", data=output_json, file_name="generated_output.json", mime="application/json", ) else: # Display as text st.write(st.session_state.generated_output) # Option to save the output as text st.download_button( label="Download Output", data=str(st.session_state.generated_output), file_name="generated_output.txt", mime="text/plain", ) else: st.info( "No template has been generated yet. Go to the 'Setup' tab to create one." ) with tab3: if st.session_state.show_template_editor and st.session_state.template_spec: st.header("Generate Synthetic Data") with st.expander("Template Information", expanded=False): st.json(st.session_state.template_spec) # Data generation controls st.subheader("Generation Settings") col1, col2 = st.columns(2) with col1: num_samples = st.number_input( "Number of samples to generate", min_value=1, max_value=100, value=5 ) with col2: # Store the temperature value in session state st.session_state.temperature = st.slider( "Temperature (creativity)", min_value=0.1, max_value=1.0, value=0.7, step=0.1, ) # Initialize containers for generated data if "synthetic_inputs" not in st.session_state: st.session_state.synthetic_inputs = [] if "synthetic_outputs" not in st.session_state: st.session_state.synthetic_outputs = [] if "combined_data" not in st.session_state: st.session_state.combined_data = [] if "show_json_columns" not in st.session_state: st.session_state.show_json_columns = False if "modified_prompt_template" not in st.session_state: st.session_state.modified_prompt_template = "" if "selected_samples" not in st.session_state: st.session_state.selected_samples = [] # Add option selection for categorical variables categorical_vars = [ var for var in st.session_state.template_spec["input"] if var["type"] == "categorical" and var.get("options") ] # In tab3, modify the categorical variable options section if categorical_vars: st.subheader("Categorical Variable Options") st.info( "Select which options to include in the permutations for each categorical variable." ) # Create a copy of the template spec for modification template_spec_copy = st.session_state.template_spec.copy() template_spec_copy["input"] = st.session_state.template_spec["input"].copy() # Initialize UI state for categorical variables if not present if "categorical_ui_state" not in st.session_state: st.session_state.categorical_ui_state = {} # For each categorical variable, allow selecting options for i, var in enumerate( [ v for v in template_spec_copy["input"] if v["type"] == "categorical" and v.get("options") ] ): var_name = var["name"] # Initialize UI state for this variable if not present if var_name not in st.session_state.categorical_ui_state: st.session_state.categorical_ui_state[var_name] = { "selected_options": var.get("options", []).copy(), "previous_options": var.get("options", []).copy(), } with st.expander( f"{var['name']} - {var['description']}", expanded=False ): options = var.get("options", []) # Get UI state for this variable ui_state = st.session_state.categorical_ui_state[var_name] # Filter selected_options to only include valid options ui_state["selected_options"] = [ opt for opt in ui_state["selected_options"] if opt in options ] # Check for new options that need to be automatically selected previous_options = ui_state["previous_options"] # Find new options that weren't in the previous options list new_options = [ opt for opt in options if opt not in previous_options ] # Add new options to selected_options if new_options: ui_state["selected_options"].extend(new_options) # Store current options for future comparison ui_state["previous_options"] = options.copy() # Add "Select All" and "Clear All" buttons col1, col2 = st.columns([1, 1]) with col1: if st.button( f"Select All Options for {var['name']}", key=f"select_all_{i}", ): ui_state["selected_options"] = options.copy() with col2: if st.button( f"Clear All Options for {var['name']}", key=f"clear_all_{i}" ): ui_state["selected_options"] = [] # Create multiselect for options ui_state["selected_options"] = st.multiselect( f"Select options to include for {var['name']}", options=options, default=ui_state["selected_options"], key=f"options_select_{i}", ) # Show selected count st.write( f"Selected {len(ui_state['selected_options'])} out of {len(options)} options" ) # Create a temporary copy of the variable with selected_options for the calculation # but don't modify the actual template var_copy = var.copy() var_copy["selected_options"] = ui_state["selected_options"] # Update the template spec copy with the selected options for calculation purposes only for j, input_var in enumerate(template_spec_copy["input"]): if input_var["name"] == var["name"]: template_spec_copy["input"][j] = var_copy break # Calculate and display Cartesian product size product_size, var_counts = calculate_cartesian_product_size( [v for v in template_spec_copy["input"] if v["type"] == "categorical"] ) st.subheader("Combination Analysis") st.info(f"Total number of possible combinations: {product_size:,}") # Display breakdown of combinations st.write("Breakdown by variable:") for var in var_counts: st.write(f"- {var['name']}: {var['count']:,} possible values") if product_size > num_samples: st.warning( f"Note: Only {num_samples} samples will be generated from the {product_size:,} possible combinations" ) elif product_size < num_samples: st.warning( f"Note: Some combinations will be repeated to reach {num_samples} samples (only {product_size:,} unique combinations possible)" ) # Generate inputs button if st.button("Generate Synthetic Inputs"): if not st.session_state.get("api_key") and not st.session_state.get( "anthropic_api_key" ): st.error( "Please provide an OpenAI or Anthropic API key in the sidebar." ) else: with st.spinner(f"Generating {num_samples} synthetic input samples..."): # Create a clean template spec without UI state variables clean_template_spec = st.session_state.template_spec.copy() clean_template_spec["input"] = st.session_state.template_spec[ "input" ].copy() # If we have categorical variables, apply the selected options from UI state if categorical_vars: for i, var in enumerate(clean_template_spec["input"]): if ( var["type"] == "categorical" and var.get("options") and var["name"] in st.session_state.categorical_ui_state ): # Create a copy of the variable with selected_options for generation var_copy = var.copy() var_copy["selected_options"] = ( st.session_state.categorical_ui_state[var["name"]][ "selected_options" ] ) clean_template_spec["input"][i] = var_copy st.session_state.synthetic_inputs = ( generate_synthetic_inputs_hybrid( clean_template_spec, num_samples=num_samples ) ) else: st.session_state.synthetic_inputs = ( generate_synthetic_inputs_hybrid( clean_template_spec, num_samples=num_samples ) ) if st.session_state.synthetic_inputs: st.success( f"Generated {len(st.session_state.synthetic_inputs)} input samples" ) # Reset selected samples when new inputs are generated st.session_state.selected_samples = [] # Reset modified prompt when new inputs are generated st.session_state.modified_prompt_template = ( st.session_state.template_spec["prompt"] ) # Display generated inputs if available if st.session_state.synthetic_inputs: st.subheader("Generated Input Data") # Show data in a table input_df = pd.DataFrame(st.session_state.synthetic_inputs) st.dataframe(input_df) # Download button for inputs input_csv = input_df.to_csv(index=False) st.download_button( label="Download Input Data (CSV)", data=input_csv, file_name="synthetic_inputs.csv", mime="text/csv", ) # Sample selection for output generation st.subheader("Generate Outputs") # Initialize the modified prompt template if not already done if not st.session_state.modified_prompt_template: st.session_state.modified_prompt_template = ( st.session_state.template_spec["prompt"] ) # Allow editing the prompt template with st.expander("View/Edit Prompt Template", expanded=False): st.info( "You can modify the prompt template used for generating outputs. Use {variable_name} to refer to input variables." ) st.session_state.modified_prompt_template = st.text_area( "Prompt Template", value=st.session_state.modified_prompt_template, height=200, ) # Button to reset to original template if st.button("Reset to Original Template"): st.session_state.modified_prompt_template = ( st.session_state.template_spec["prompt"] ) st.success("Prompt template reset to original") # Sample selection options selection_method = st.radio( "Select samples for output generation", options=["Generate for all samples", "Select specific samples"], index=0, ) if selection_method == "Select specific samples": # Create a list of sample indices for selection sample_options = [ f"Sample {i+1}" for i in range(len(st.session_state.synthetic_inputs)) ] # Allow multi-selection of samples selected_indices = st.multiselect( "Select samples to generate outputs for", options=range(len(sample_options)), format_func=lambda i: sample_options[i], ) # Store selected samples st.session_state.selected_samples = selected_indices # Preview selected samples if selected_indices: st.write(f"Selected {len(selected_indices)} samples:") selected_df = pd.DataFrame( [st.session_state.synthetic_inputs[i] for i in selected_indices] ) st.dataframe(selected_df) else: # Use all samples st.session_state.selected_samples = list( range(len(st.session_state.synthetic_inputs)) ) # Preview the prompt for a selected sample if st.session_state.selected_samples: with st.expander("Preview Prompt for Sample", expanded=False): # Let user select which sample to preview preview_index = st.selectbox( "Select a sample to preview prompt", options=st.session_state.selected_samples, format_func=lambda i: f"Sample {i+1}", ) # Get the selected sample sample = st.session_state.synthetic_inputs[preview_index] # Fill the prompt template with sample values filled_prompt = st.session_state.modified_prompt_template for var_name, var_value in sample.items(): filled_prompt = filled_prompt.replace( f"{{{var_name}}}", str(var_value) ) # Replace {lore} with knowledge base if present if "{lore}" in filled_prompt: filled_prompt = filled_prompt.replace( "{lore}", st.session_state.knowledge_base ) # Show the filled prompt st.text_area( "Filled Prompt", value=filled_prompt, height=300, disabled=True ) # Advanced output generation options with st.expander("Advanced Output Generation Options", expanded=False): st.info("Configure options for generating multiple outputs per input") # Option to generate multiple outputs for some inputs enable_multiple_outputs = st.checkbox( "Generate multiple outputs for some inputs", help="Enable generating multiple variations of outputs for selected inputs", ) if enable_multiple_outputs: # Proportion of inputs to duplicate duplicate_proportion = st.slider( "Proportion of inputs to generate multiple outputs for", min_value=0.0, max_value=1.0, value=0.2, step=0.1, help="What fraction of the input samples should have multiple outputs", ) # Number of outputs per duplicated input outputs_per_input = st.number_input( "Number of outputs per selected input", min_value=2, max_value=5, value=2, help="How many different outputs to generate for each selected input", ) # Preview the effect if st.session_state.selected_samples: num_selected = len(st.session_state.selected_samples) num_to_duplicate = math.ceil( num_selected * duplicate_proportion ) total_outputs = (num_selected - num_to_duplicate) + ( num_to_duplicate * outputs_per_input ) st.write( f"This will result in approximately {total_outputs} total outputs:" ) st.write( f"- {num_selected - num_to_duplicate} inputs with 1 output" ) st.write( f"- {num_to_duplicate} inputs with {outputs_per_input} outputs each" ) # Generate outputs button if st.button("Generate Outputs for Selected Samples"): if not st.session_state.get("api_key") and not st.session_state.get( "anthropic_api_key" ): st.error( "Please provide an OpenAI or Anthropic API key in the sidebar." ) elif not st.session_state.selected_samples: st.error("No samples selected for output generation.") else: # Create a copy of the template spec with the modified prompt modified_template = st.session_state.template_spec.copy() modified_template["prompt"] = ( st.session_state.modified_prompt_template ) # Get only the selected samples selected_inputs = [ st.session_state.synthetic_inputs[i] for i in st.session_state.selected_samples ] # Handle multiple outputs if enabled if enable_multiple_outputs: # Calculate how many inputs should have multiple outputs num_to_duplicate = math.ceil( len(selected_inputs) * duplicate_proportion ) # Randomly select inputs for multiple outputs duplicate_indices = random.sample( range(len(selected_inputs)), num_to_duplicate ) # Create the expanded input list expanded_inputs = [] for i, input_data in enumerate(selected_inputs): if i in duplicate_indices: # Add multiple copies for selected inputs expanded_inputs.extend([input_data] * outputs_per_input) else: # Add single copy for other inputs expanded_inputs.append(input_data) # Update selected_inputs with the expanded list selected_inputs = expanded_inputs with st.spinner( f"Generating outputs for {len(selected_inputs)} samples..." ): generated_outputs = generate_synthetic_outputs( modified_template, selected_inputs, st.session_state.knowledge_base, ) if generated_outputs: # If we're generating for all samples, replace the combined data if selection_method == "Generate for all samples": st.session_state.combined_data = generated_outputs else: # For specific samples, we need to handle the case of multiple outputs if enable_multiple_outputs: # Simply use all generated outputs as the combined data st.session_state.combined_data = generated_outputs else: # Handle single outputs as before if not st.session_state.combined_data or len( st.session_state.combined_data ) != len(st.session_state.synthetic_inputs): st.session_state.combined_data = [None] * len( st.session_state.synthetic_inputs ) # Update only the selected samples for i, output_idx in enumerate( st.session_state.selected_samples ): if i < len(generated_outputs): st.session_state.combined_data[output_idx] = ( generated_outputs[i] ) # Remove any None values (samples that haven't been generated yet) st.session_state.combined_data = [ item for item in st.session_state.combined_data if item is not None ] st.success(f"Generated {len(generated_outputs)} outputs") # Display combined data if available if st.session_state.combined_data: st.subheader("Complete Dataset (Inputs + Outputs)") # Get all available column names from the data all_columns = pd.DataFrame(st.session_state.combined_data).columns.tolist() # Let the user select columns to exclude from input JSON st.session_state.columns_to_drop = st.multiselect( "Select input variables to exclude:", options=all_columns, default=st.session_state.get("columns_to_drop", []), ) # Add this function before the prepare_dataframe_with_json_columns function def prepare_dataframe_for_parquet(df): """ Convert DataFrame columns to types compatible with Parquet format. Args: df (pd.DataFrame): Input DataFrame Returns: pd.DataFrame: DataFrame with converted types """ df_copy = df.copy() for col in df_copy.columns: # Check if column contains lists or dictionaries if df_copy[col].apply(lambda x: isinstance(x, (list, dict))).any(): # Convert lists and dictionaries to JSON strings df_copy[col] = df_copy[col].apply( lambda x: ( json.dumps(x) if isinstance(x, (list, dict)) else x ) ) # Check for mixed types that might cause issues if ( df_copy[col] .apply(lambda x: isinstance(x, (bool, int, float, str))) .all() ): # Column has consistent primitive types, leave as is continue else: # Convert any complex or mixed types to strings df_copy[col] = df_copy[col].apply(str) return df_copy # Create a function to prepare the dataframe with JSON columns def prepare_dataframe_with_json_columns( data, template_spec, show_json_columns=False, columns_to_drop=None ): df = pd.DataFrame(data) # Drop specified columns from the dataframe if columns_to_drop: df = df.drop( columns=[col for col in columns_to_drop if col in df.columns] ) else: columns_to_drop = [] # Create input and output JSON columns input_vars = [ var["name"] for var in template_spec["input"] if var["name"] not in columns_to_drop ] output_vars = [var["name"] for var in template_spec["output"]] # Create input JSON column df["input"] = df.apply( lambda row: json.dumps( {var: row[var] for var in input_vars if var in row} ), axis=1, ) # Create output JSON column df["output"] = df.apply( lambda row: json.dumps( {var: row[var] for var in output_vars if var in row} ), axis=1, ) # If not showing JSON columns in UI, remove them for display only if not show_json_columns: display_df = df.drop(columns=["input", "output"]) else: display_df = df # Return the same filtered df for export (full_df) return df, display_df # Toggle for showing JSON columns st.session_state.show_json_columns = st.checkbox( "Show input/output JSON columns", value=st.session_state.show_json_columns, ) # Prepare dataframe with JSON columns full_df, display_df = prepare_dataframe_with_json_columns( st.session_state.combined_data, st.session_state.template_spec, st.session_state.show_json_columns, columns_to_drop=st.session_state.columns_to_drop, ) # Show data in a table st.dataframe(display_df) # Download buttons for different formats col1, col2, col3 = st.columns(3) with col1: # CSV download combined_csv = full_df.to_csv(index=False) st.download_button( label="Download Dataset (CSV)", data=combined_csv, file_name="synthetic_dataset.csv", mime="text/csv", ) with col2: # JSON download using cleaned dataframe json_ready_df = full_df.drop(columns=["input", "output"]) combined_json = json.dumps( json_ready_df.to_dict(orient="records"), indent=2 ) st.download_button( label="Download Dataset (JSON)", data=combined_json, file_name="synthetic_dataset.json", mime="application/json", ) with col3: # Parquet download try: # Create a BytesIO object to hold the Parquet file parquet_buffer = BytesIO() # Convert DataFrame to Parquet-compatible types parquet_df = prepare_dataframe_for_parquet(full_df) # Write the DataFrame to the BytesIO object in Parquet format parquet_df.to_parquet(parquet_buffer, index=False) # Reset the buffer's position to the beginning parquet_buffer.seek(0) st.download_button( label="Download Dataset (Parquet)", data=parquet_buffer, file_name="synthetic_dataset.parquet", mime="application/octet-stream", ) except Exception as e: st.error(f"Error creating Parquet file: {str(e)}") st.info( "To use Parquet format, install pyarrow with: pip install pyarrow" ) else: st.info( "No template has been generated yet. Go to the 'Setup' tab to create one." )