Spaces:
Build error
Build error
| import os | |
| import json | |
| import uuid | |
| import random | |
| import yaml | |
| import litellm | |
| import tqdm | |
| import concurrent.futures | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Reuse components from the existing codebase | |
| from prompts import PROMPTS, format_prompt, load_prompts | |
| from utils import save_results, generate_user_id | |
| from ibfs import generate_strategies, answer_query | |
| from zero_shot import zero_shot_answer | |
# Load the prompt templates once at import time. IBFSAgent._setup_prompts
# indexes PROMPTS["ibfs"][...] directly, so PROMPTS is presumably populated by
# this call -- TODO confirm against prompts.load_prompts.
# NOTE(review): the earlier comment also mentioned environment variables, but
# nothing visible in this file loads them.
load_prompts()
class UserAgent:
    """Simulates a user with a preferred answer and decision-making behavior.

    The agent first asks the LLM for a "preferred answer" to a query and then,
    at every search step, picks the strategy the LLM judges most likely to lead
    to that answer. With probability ``epsilon`` it picks uniformly at random
    instead.
    """

    def __init__(self, llm_model: str = "gpt-4o", epsilon: float = 0.2):
        """
        Initialize the UserAgent with properties.

        Args:
            llm_model: The LLM model to use for agent decisions
            epsilon: Probability of making a random choice instead of optimal
        """
        self.llm_model = llm_model
        self.epsilon = epsilon
        self.preferred_answer = None
        self.query = None
        self.id = generate_user_id()

    def set_preferences(self, query: str) -> str:
        """
        Set a query and generate the preferred answer for this user.

        Any exception raised by the LLM call propagates to the caller.

        Args:
            query: The question the user wants answered

        Returns:
            The preferred answer text returned by the LLM (also stored on
            ``self.preferred_answer``).
        """
        self.query = query
        # Generate the user's preferred answer using the LLM
        messages = [
            {"role": "system",
             "content": "You are generating a preferred answer that a user has in mind for their query. This represents what the user is hoping to learn or the perspective they're hoping to see."},
            {"role": "user",
             "content": f"For the query: '{query}', generate a detailed, thoughtful answer that will serve as the user's preferred answer. This is the information or perspective they are hoping to find. Make it 20 words."}
        ]
        response = litellm.completion(
            model=self.llm_model,
            messages=messages,
            max_tokens=1000
        )
        self.preferred_answer = response.choices[0].message.content
        return self.preferred_answer

    def choose_strategy(self, strategies: List[str]) -> int:
        """
        Choose a strategy from the provided options.

        Args:
            strategies: List of strategy descriptions

        Returns:
            Index of the chosen strategy (0-based). Returns 0 when there are no
            strategies or no preferred answer has been set.
        """
        import re

        # Nothing to choose from. This check must come before the epsilon
        # branch: random.randint(0, -1) raises ValueError on an empty list.
        if not strategies:
            return 0

        last_index = len(strategies) - 1

        # With probability epsilon, make a random choice
        if random.random() < self.epsilon:
            return random.randint(0, last_index)

        # Without a preference there is nothing to compare against
        if not self.preferred_answer:
            return 0

        # Ask the LLM which strategy best matches the preferred answer
        strategy_list = "\n".join(f"{i + 1}. {s}" for i, s in enumerate(strategies))
        user_prompt = (
            "\n"
            f"Query: {self.query}\n"
            f"User's preferred answer: {self.preferred_answer}\n"
            "Available strategies:\n"
            f"{strategy_list}\n"
            "Which strategy (provide the number only) would most likely lead to an answer that matches the user's preferred answer? Respond with only a single number representing your choice.\n"
        )
        messages = [
            {"role": "system",
             "content": "You are helping a user select the strategy that would most likely lead to their preferred answer."},
            {"role": "user", "content": user_prompt}
        ]
        try:
            response = litellm.completion(
                model=self.llm_model,
                messages=messages,
                temperature=0.2,
                max_tokens=10
            )
            content = response.choices[0].message.content.strip()
            # The first integer in the reply is the 1-based choice
            match = re.search(r'\d+', content)
            if match:
                choice = int(match.group()) - 1  # Convert to 0-based index
                if 0 <= choice <= last_index:
                    return choice
            # Unparseable or out-of-range reply: fall back to a random choice
            return random.randint(0, last_index)
        except Exception as e:
            print(f"Error in choosing strategy: {e}")
            # Best effort: a failed LLM call degrades to a random choice
            return random.randint(0, last_index)
class IBFSAgent:
    """Implements the Interactive Best-First Search process.

    At every step the agent asks the LLM for ``branching_factor`` candidate
    strategies, either initial ones or refinements of the most recently
    selected strategy. It finally answers the query with the last strategy on
    the selected path.
    """

    def __init__(self,
                 llm_model: str = "gpt-4o",
                 diversity_level: str = "medium",
                 branching_factor: int = 4,
                 max_depth: int = 2):
        """
        Initialize the IBFSAgent with properties.

        Args:
            llm_model: The LLM model to use for generating candidates
            diversity_level: How diverse the generated candidates should be (low, medium, high)
            branching_factor: Number of candidates to generate at each step
            max_depth: Maximum depth/iterations of the IBFS process

        Raises:
            KeyError: If diversity_level is not one of "low", "medium", "high"
                (raised by _setup_prompts).
        """
        self.llm_model = llm_model
        self.diversity_level = diversity_level
        self.branching_factor = branching_factor
        self.max_depth = max_depth
        self.id = generate_user_id()
        # Set up diversity-specific prompts
        self._setup_prompts()

    def _setup_prompts(self):
        """Set up the candidate generation and refinement prompts based on diversity level."""
        # Base prompts come from the shared PROMPTS dictionary
        initial = PROMPTS["ibfs"]["initial_strategies"]
        continuation = PROMPTS["ibfs"]["continuation_strategies"]
        self.base_system_prompt = initial["system"]
        self.base_user_prompt = initial["user"]
        self.refinement_system_prompt = continuation["system"]
        self.refinement_user_prompt = continuation["user"]

        # Extra instruction appended to the system prompt, keyed by diversity level
        diversity_instructions = {
            "low": (
                "\n"
                "The strategies you generate can be similar to each other and explore related approaches.\n"
                "There's no need to make them very different from each other.\n"
            ),
            "medium": (
                "\n"
                "Each strategy should represent a somewhat different approach to answering the question.\n"
                "Try to include some variety in the approaches.\n"
            ),
            "high": (
                "\n"
                "Each strategy should represent a substantially different approach to answering the question.\n"
                "Make sure the strategies are maximally diverse from each other - consider entirely different angles,\n"
                "methodologies, perspectives, and areas of knowledge.\n"
            ),
        }
        self.diversity_instructions = diversity_instructions[self.diversity_level]

    @staticmethod
    def _parse_strategies(content: str, branching_factor: int) -> List[str]:
        """Extract exactly ``branching_factor`` strategies from an LLM reply.

        Args:
            content: Raw text returned by the LLM
            branching_factor: Number of strategies to return

        Returns:
            A list of exactly ``branching_factor`` stripped strategy strings,
            truncated or padded with placeholder entries as needed.
        """
        import re

        # Numbered items: "N. I can answer by ...". The heading line runs to
        # the end of the line or to the next "N. I can answer by" marker on
        # the same line. It must NOT stop at an arbitrary digit, otherwise
        # "reviewing 2020 data" is cut off at "reviewing". Following lines
        # belong to the item until one starts with "N.".
        numbered_pattern = (
            r'\d+\.\s*(I can answer by'
            r'(?:(?!\d+\.\s*I can answer by)[^\n])*'
            r'(?:\n(?!\d+\.)[^\n]*)*)'
        )
        strategies = re.findall(numbered_pattern, content, re.IGNORECASE)

        # If we didn't find enough strategies with that format, try un-numbered parsing
        if len(strategies) < branching_factor:
            strategies = re.findall(
                r'(?:^|\n)(I can answer by[^\n]*(?:\n(?!I can answer by)[^\n]*)*)',
                content,
                re.IGNORECASE)

        # Clean up and cap at the requested number of strategies
        strategies = [s.strip() for s in strategies][:branching_factor]

        # If we still don't have enough strategies, create generic ones
        while len(strategies) < branching_factor:
            strategies.append(
                f"I can answer by using approach #{len(strategies) + 1} (Note: Strategy generation incomplete)")
        return strategies

    def generate_strategies(self, query: str, current_path: Optional[List[str]] = None) -> List[str]:
        """
        Generate strategy options for the current step.

        Args:
            query: The user's query
            current_path: List of previously selected strategies; None or empty
                requests initial strategies, otherwise the last entry is refined

        Returns:
            List of exactly ``branching_factor`` strategy descriptions. If the
            LLM call or parsing fails, placeholder strategies are returned.
        """
        format_args = {
            "query": query,
            "k": self.branching_factor
        }
        if not current_path:
            # Initial generation
            system_prompt = self.base_system_prompt + "\n" + self.diversity_instructions
            user_prompt = self.base_user_prompt
        else:
            # Refinement of the most recently selected strategy
            system_prompt = self.refinement_system_prompt + "\n" + self.diversity_instructions
            user_prompt = self.refinement_user_prompt
            format_args["selected_strategy"] = current_path[-1]

        # Format the prompts
        messages = [
            {"role": "system", "content": format_prompt(system_prompt, **format_args)},
            {"role": "user", "content": format_prompt(user_prompt, **format_args)}
        ]
        try:
            response = litellm.completion(
                model=self.llm_model,
                messages=messages,
                temperature=0.7,
                max_tokens=1000
            )
            content = response.choices[0].message.content
            return self._parse_strategies(content, self.branching_factor)
        except Exception as e:
            print(f"Error generating strategies: {e}")
            # Return fallback strategies so the simulation can continue
            return [f"I can answer by approach #{i + 1} (Error: Could not generate strategies)" for i in
                    range(self.branching_factor)]

    def generate_final_answer(self, query: str, strategy_path: List[str]) -> str:
        """
        Generate the final answer based on the selected strategy path.

        Args:
            query: The original user query
            strategy_path: List of selected strategies

        Returns:
            Final answer to the query, or a fixed message when no strategy was selected
        """
        if not strategy_path:
            return "No strategy was selected to generate an answer."
        # Only the last (most refined) strategy is handed to answer_query
        return answer_query(query, strategy_path[-1])
def run_simulation(query: str,
                   user_agent: UserAgent,
                   ibfs_agent: IBFSAgent) -> Dict[str, Any]:
    """
    Simulate one user working through the IBFS loop for a single query.

    The user agent first fixes its preferred answer. Then, for each of
    ``ibfs_agent.max_depth`` rounds, the IBFS agent proposes strategies and the
    user agent picks one. The final answer is generated from the resulting path.

    Args:
        query: The question to be answered
        user_agent: The UserAgent instance
        ibfs_agent: The IBFSAgent instance

    Returns:
        Dictionary containing the simulation results
    """
    # The user decides what answer they are hoping for before the search starts
    user_agent.set_preferences(query)

    chosen_path = []  # strategies picked so far, in order
    steps = []        # everything offered and picked at each depth

    for level in range(ibfs_agent.max_depth):
        options = ibfs_agent.generate_strategies(query, chosen_path)
        picked = user_agent.choose_strategy(options)
        steps.append({
            "depth": level,
            "strategies": options,
            "choice_idx": picked,
            "chosen_strategy": options[picked]
        })
        chosen_path.append(options[picked])

    # Answer the query using the path the user walked
    answer = ibfs_agent.generate_final_answer(query, chosen_path)

    return {
        "query": query,
        "user_id": user_agent.id,
        "ibfs_id": ibfs_agent.id,
        "user_preferred_answer": user_agent.preferred_answer,
        "final_answer": answer,
        "strategy_path": chosen_path,
        "history": steps,
        "ibfs_config": {
            "diversity_level": ibfs_agent.diversity_level,
            "branching_factor": ibfs_agent.branching_factor,
            "max_depth": ibfs_agent.max_depth
        },
        "user_config": {
            "epsilon": user_agent.epsilon
        },
        "timestamp": datetime.now().isoformat()
    }
def evaluate_answer_similarity(answer1: str, answer2: str, model: str = "gpt-4o") -> float:
    """
    Evaluate the similarity between two answers using the LLM.

    Args:
        answer1: First answer
        answer2: Second answer
        model: LLM used as the judge (defaults to the previously hard-coded "gpt-4o")

    Returns:
        Similarity score as a float clamped to [0.0, 1.0]. Returns 0.5 when the
        LLM call fails or its reply contains no number.
    """
    import re

    user_prompt = (
        "\n"
        "Answer 1:\n"
        f"{answer1}\n"
        "Answer 2:\n"
        f"{answer2}\n"
        "On a scale from 0 to 1, how similar are these answers in terms of content, perspective, and key information?\n"
        "Provide only a single number as your response.\n"
    )
    messages = [
        {"role": "system",
         "content": "You are evaluating the similarity between a user's preferred answer and a generated answer. Provide a similarity score from 0 to 1, where 1 means identical in content and perspective, and 0 means completely different."},
        {"role": "user", "content": user_prompt}
    ]
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            temperature=0.1,
            max_tokens=10
        )
        content = response.choices[0].message.content.strip()
        # Accept "0.8", "1" and also ".8". The decimal alternative comes first
        # so ".8" is not misread as the integer 8 and clamped to 1.
        match = re.search(r'\d*\.\d+|\d+', content)
        if match is None:
            # Default score if parsing fails
            return 0.5
        # Clamp with float bounds so the result is always a float
        return max(0.0, min(float(match.group()), 1.0))
    except Exception as e:
        # Best effort: one failed judgement must not abort a whole experiment
        print(f"Error evaluating similarity: {e}")
        return 0.5
def process_simulation(args):
    """
    Run one simulation from a packed argument tuple (worker entry point).

    Builds both agents from the given configs, runs the simulation, scores the
    final answer against the user's preferred answer, and writes the result to
    ``experiment_results/<experiment_id>_sim_<sim_count>.json``.

    Args:
        args: Tuple containing (query, user_config, ibfs_config, experiment_id, sim_count)

    Returns:
        The simulation result dictionary, or a dictionary with an "error" key
        (plus experiment_id, simulation_id and query) if anything failed.
    """
    query, user_config, ibfs_config, experiment_id, sim_count = args
    try:
        # Fresh agents for every simulation so runs do not share state
        simulated_user = UserAgent(epsilon=user_config["epsilon"])
        searcher = IBFSAgent(
            diversity_level=ibfs_config["diversity_level"],
            branching_factor=ibfs_config["branching_factor"],
            max_depth=ibfs_config["max_depth"]
        )

        outcome = run_simulation(query, simulated_user, searcher)

        # Score how close the final answer came to what the user wanted
        outcome["similarity_score"] = evaluate_answer_similarity(
            simulated_user.preferred_answer,
            outcome["final_answer"]
        )

        # Metadata tying the record back to its experiment
        outcome["experiment_id"] = experiment_id
        outcome["simulation_id"] = sim_count

        # Persist the individual record
        os.makedirs("experiment_results", exist_ok=True)
        simulation_id = f"{experiment_id}_sim_{sim_count}"
        with open(f"experiment_results/{simulation_id}.json", "w") as f:
            json.dump(outcome, f, indent=2)
    except Exception as e:
        print(f"Error in simulation {sim_count}: {e}")
        return {
            "error": str(e),
            "experiment_id": experiment_id,
            "simulation_id": sim_count,
            "query": query
        }
    else:
        return outcome
def run_experiment(queries: List[str],
                   diversity_levels: List[str],
                   branching_factors: List[int],
                   max_depths: List[int],
                   epsilon_values: List[float],
                   repetitions: int = 3,
                   max_workers: int = 4) -> List[Dict[str, Any]]:
    """
    Run every combination of the given settings, in parallel worker processes.

    One simulation is scheduled per (query, diversity, branching factor, depth,
    epsilon, repetition) combination. The experiment config and the aggregated
    results are written under ``experiment_results/``.

    Args:
        queries: List of queries to test
        diversity_levels: List of diversity levels to test
        branching_factors: List of branching factors to test
        max_depths: List of max depths to test
        epsilon_values: List of epsilon values to test
        repetitions: Number of repetitions for each configuration
        max_workers: Maximum number of parallel workers

    Returns:
        List of results for all simulations, in completion order
    """
    os.makedirs("experiment_results", exist_ok=True)

    # Timestamp-based ID shared by every file this experiment writes
    experiment_id = f"exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Record what was run before running it
    config = {
        "experiment_id": experiment_id,
        "queries": queries,
        "diversity_levels": diversity_levels,
        "branching_factors": branching_factors,
        "max_depths": max_depths,
        "epsilon_values": epsilon_values,
        "repetitions": repetitions,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_config.json", "w") as f:
        json.dump(config, f, indent=2)

    # Full grid of settings, each repeated `repetitions` times
    grid = [
        (query, diversity, branching, depth, epsilon)
        for query in queries
        for diversity in diversity_levels
        for branching in branching_factors
        for depth in max_depths
        for epsilon in epsilon_values
        for _ in range(repetitions)
    ]
    # The running index doubles as the simulation ID
    simulation_args = [
        (
            query,
            {"epsilon": epsilon},
            {
                "diversity_level": diversity,
                "branching_factor": branching,
                "max_depth": depth
            },
            experiment_id,
            sim_index,
        )
        for sim_index, (query, diversity, branching, depth, epsilon) in enumerate(grid)
    ]

    # Fan out to worker processes and collect results as they finish
    results = []
    with tqdm.tqdm(total=len(simulation_args), desc="Running simulations") as pbar, \
            concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        pending = [executor.submit(process_simulation, job) for job in simulation_args]
        for finished in concurrent.futures.as_completed(pending):
            results.append(finished.result())
            pbar.update(1)

    # Save aggregated results
    with open(f"experiment_results/{experiment_id}_all_results.json", "w") as f:
        json.dump(results, f, indent=2)
    return results
def analyze_results(experiment_id: str) -> Dict[str, Any]:
    """
    Analyze the results of an experiment.

    Reads every per-simulation JSON file for the experiment from
    ``experiment_results/``, aggregates similarity scores per configuration and
    per query, and writes the analysis to
    ``experiment_results/<experiment_id>_analysis.json``.

    Args:
        experiment_id: ID of the experiment to analyze

    Returns:
        Dictionary with analysis results: "experiment_id", "total_simulations"
        (all loaded records, including failed ones), "summary" (one entry per
        configuration, best average first), "query_analysis" and "timestamp".
    """
    # Files that share the experiment prefix but are not simulation records.
    # The zero-shot comparison file must be listed too: it has no
    # "ibfs_config", so loading it used to crash every later analysis run.
    non_simulation_suffixes = (
        "config.json",
        "all_results.json",
        "analysis.json",
        "zero_shot_comparison.json",
    )

    # Load all simulation results for this experiment. Sorting makes tie-breaks
    # below independent of the directory listing order.
    results = []
    for filename in sorted(os.listdir("experiment_results")):
        if not filename.startswith(experiment_id) or filename.endswith(non_simulation_suffixes):
            continue
        with open(f"experiment_results/{filename}", "r") as f:
            try:
                results.append(json.load(f))
            except json.JSONDecodeError:
                print(f"Error loading {filename}")

    aggregated = {}      # config tuple -> scores and queries seen for it
    query_analysis = {}  # query -> best config and score statistics
    for result in results:
        if "error" in result:
            continue  # Skip failed simulations

        config = {
            "diversity_level": result["ibfs_config"]["diversity_level"],
            "branching_factor": result["ibfs_config"]["branching_factor"],
            "max_depth": result["ibfs_config"]["max_depth"],
            "epsilon": result["user_config"]["epsilon"]
        }
        score = result["similarity_score"]
        query = result["query"]

        # Per-configuration aggregation
        bucket = aggregated.setdefault(tuple(config.values()), {
            "similarity_scores": [],
            "config": config,
            "queries": []
        })
        bucket["similarity_scores"].append(score)
        if query not in bucket["queries"]:
            bucket["queries"].append(query)

        # Per-query aggregation
        per_query = query_analysis.setdefault(query, {
            "best_config": None,
            "best_similarity": -1,
            "configs_tested": 0,
            "avg_similarity": 0,
            "all_scores": []
        })
        per_query["all_scores"].append(score)
        # Strict ">" keeps the first record seen when scores tie
        if score > per_query["best_similarity"]:
            per_query["best_similarity"] = score
            per_query["best_config"] = dict(config)

    # Calculate summary statistics
    summary = []
    for data in aggregated.values():
        scores = data["similarity_scores"]
        count = len(scores)
        # Compute the mean once instead of once per element
        mean = sum(scores) / count if count else 0
        std_deviation = (sum((x - mean) ** 2 for x in scores) / count) ** 0.5 if count else 0
        summary.append({
            "config": data["config"],
            "avg_similarity": mean,
            "min_similarity": min(scores) if scores else 0,
            "max_similarity": max(scores) if scores else 0,
            "std_deviation": std_deviation,  # population standard deviation
            "num_samples": count,
            "queries_tested": len(data["queries"])
        })
    # Sort by average similarity (descending)
    summary.sort(key=lambda x: x["avg_similarity"], reverse=True)

    # Calculate query statistics
    for data in query_analysis.values():
        # Remove the raw scores to keep the analysis file smaller
        scores = data.pop("all_scores")
        data["avg_similarity"] = sum(scores) / len(scores) if scores else 0
        # Counts successful simulations for the query, not distinct configurations
        data["configs_tested"] = len(scores)

    # Save the analysis
    analysis = {
        "experiment_id": experiment_id,
        "total_simulations": len(results),
        "summary": summary,
        "query_analysis": query_analysis,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_analysis.json", "w") as f:
        json.dump(analysis, f, indent=2)
    return analysis
def compare_to_zero_shot(experiment_id: str, queries: List[str]) -> Dict[str, Any]:
    """
    Compare IBFS results to zero-shot answers.

    For every query a zero-shot answer is generated and scored against the
    preferred answer of the best-scoring IBFS simulation for that query, using
    the query's best configuration from analyze_results. Queries whose
    zero-shot generation failed are skipped, so error text is never scored.
    The comparison is written to
    ``experiment_results/<experiment_id>_zero_shot_comparison.json``.

    Args:
        experiment_id: ID of the experiment to compare
        queries: List of queries to test with zero-shot

    Returns:
        Comparison results: average similarities, their average difference and
        the per-query comparisons
    """
    # First, get zero-shot answers for all queries
    zero_shot_results = []
    print("Generating zero-shot answers...")
    for query in tqdm.tqdm(queries):
        try:
            answer = zero_shot_answer(query)
            zero_shot_results.append({
                "query": query,
                "zero_shot_answer": answer
            })
        except Exception as e:
            print(f"Error generating zero-shot answer for '{query}': {e}")
            zero_shot_results.append({
                "query": query,
                "zero_shot_answer": f"Error: {str(e)}",
                "error": True
            })

    # Load the IBFS experiment results
    analysis = analyze_results(experiment_id)
    # Overall best configuration, used when a query has no entry of its own
    best_config = analysis["summary"][0]["config"] if analysis["summary"] else None

    # Read the simulation records once instead of once per query
    non_simulation_suffixes = (
        "config.json",
        "all_results.json",
        "analysis.json",
        "zero_shot_comparison.json",
    )
    simulation_records = []
    for filename in os.listdir("experiment_results"):
        if not filename.startswith(experiment_id) or filename.endswith(non_simulation_suffixes):
            continue
        try:
            with open(f"experiment_results/{filename}", "r") as f:
                record = json.load(f)
        except (OSError, json.JSONDecodeError):
            continue  # Unreadable or corrupt file: ignore it, as before
        if isinstance(record, dict):
            simulation_records.append(record)

    # For each query, compare the best IBFS result to the zero-shot answer
    comparison = []
    print("Comparing zero-shot to IBFS results...")
    for zero_shot_result in tqdm.tqdm(zero_shot_results):
        query = zero_shot_result["query"]
        if zero_shot_result.get("error"):
            print(f"Skipping comparison for '{query}': zero-shot generation failed")
            continue
        # Do not name this local `zero_shot_answer`: that would shadow the
        # imported function for the whole body and break the call above.
        baseline_answer = zero_shot_result["zero_shot_answer"]

        # Find the best IBFS configuration for this query
        query_data = analysis.get("query_analysis", {}).get(query, {})
        best_config_for_query = query_data.get("best_config", best_config)
        if not best_config_for_query:
            print(f"No valid configuration found for query: {query}")
            continue

        # Simulations for this query that ran with exactly that configuration
        matching_results = [
            record for record in simulation_records
            if record.get("query") == query
            and all(
                record.get("ibfs_config", {}).get(field) == best_config_for_query.get(field)
                for field in ("diversity_level", "branching_factor", "max_depth")
            )
            and record.get("user_config", {}).get("epsilon") == best_config_for_query.get("epsilon")
        ]
        if not matching_results:
            continue

        # Highest-scoring matching simulation (first one wins on ties)
        best_ibfs_result = max(matching_results, key=lambda r: r.get("similarity_score", 0))

        # Score the zero-shot answer against the same preferred answer
        preferred_answer = best_ibfs_result.get("user_preferred_answer", "")
        zero_shot_similarity = evaluate_answer_similarity(preferred_answer, baseline_answer)
        # The IBFS similarity was already calculated during the simulation
        ibfs_similarity = best_ibfs_result.get("similarity_score", 0)
        comparison.append({
            "query": query,
            "zero_shot_similarity": zero_shot_similarity,
            "ibfs_similarity": ibfs_similarity,
            "difference": ibfs_similarity - zero_shot_similarity,
            "ibfs_config": best_config_for_query
        })

    # Calculate overall metrics
    compared = len(comparison)
    zero_shot_avg = sum(item["zero_shot_similarity"] for item in comparison) / compared if compared else 0
    ibfs_avg = sum(item["ibfs_similarity"] for item in comparison) / compared if compared else 0
    avg_difference = sum(item["difference"] for item in comparison) / compared if compared else 0

    # Save the comparison results
    comparison_results = {
        "experiment_id": experiment_id,
        "zero_shot_avg_similarity": zero_shot_avg,
        "ibfs_avg_similarity": ibfs_avg,
        "avg_difference": avg_difference,
        "query_comparisons": comparison,
        "timestamp": datetime.now().isoformat()
    }
    with open(f"experiment_results/{experiment_id}_zero_shot_comparison.json", "w") as f:
        json.dump(comparison_results, f, indent=2)
    return comparison_results
if __name__ == "__main__":
    # load_prompts() is already called at import time; call it again only if
    # PROMPTS is still empty.
    if not PROMPTS:
        load_prompts()

    # Sample queries to test
    queries = [
        "What are the environmental impacts of electric vehicles compared to traditional gasoline vehicles?",
        "How has artificial intelligence changed the job market in the past decade?",
        "What are the most effective strategies for reducing stress and anxiety?",
        "What are the arguments for and against universal basic income?",
    ]

    # Full experiment grid (the test run below uses only a slice of it)
    diversity_levels = ["low", "medium", "high"]
    branching_factors = [2, 4, 8]
    max_depths = [1, 2, 4]
    epsilon_values = [0.1, 0.3]

    # Reduced grid for a test run: 1 query x 2 diversity levels x 2 branching
    # factors x 2 depths x 1 epsilon x 10 repetitions = 80 simulations.
    print("Running test experiment...")
    test_settings = {
        "queries": queries[:1],                   # first query only
        "diversity_levels": diversity_levels[:2],  # low and medium diversity
        "branching_factors": [2, 4],               # b=2 and b=4
        "max_depths": [1, 2],                      # m=1 and m=2
        "epsilon_values": [0.2],                   # epsilon=0.2
        "repetitions": 10,                         # 10 repetitions per configuration
        "max_workers": 7,                          # 7 parallel worker processes
    }
    test_results = run_experiment(**test_settings)