"""LLM-driven utilities for generating synthetic user personas, interviewing
them, and producing research reports."""

import json
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from pydantic import BaseModel

from utils import call_llm
from prompts import (
    GENERATE_USER_PARAMETERS_PROMPT,
    GENERATE_SYNTHETIC_PERSONAS_PROMPT,
    CHAT_WITH_PERSONA_PROMPT,
    ASK_QUESTIONS_TO_PERSONA_PROMPT,
    GENERATE_REPORT_PROMPT,
    CHAT_WITH_REPORT_PROMPT,
    GENERATE_AUDIENCE_NAME_PROMPT,
    UX_FSM_SIMPLE_PROMPT,
    persona_schema,
    answers_schema,
)

logger = logging.getLogger(__name__)

# Upper bound on threads used when fanning out LLM calls in parallel.
MAX_WORKERS = 10


def generate_user_parameters(audience: str, scope: str, n: int = 24) -> List[str]:
    """Build the list of parameters used to describe each persona.

    Combines a fixed set of standard parameters with additional,
    audience-specific parameters suggested by the LLM.

    Args:
        audience: Target audience description.
        scope: Research scope.
        n: Total number of parameters to request in the prompt.

    Returns:
        The standard parameters followed by the LLM-suggested ones.
    """
    standard_parameters = ["Name", "Age", "Location", "Profession"]
    # Clamp so n_specific can never go negative when n < 20 (the previous
    # code would ask the prompt for a negative count of specific params).
    n_general = min(20, n)
    n_specific = n - n_general
    prompt = GENERATE_USER_PARAMETERS_PROMPT.format(
        audience=audience,
        scope=scope,
        standard_parameters=standard_parameters,
        n=n,
        n_specific=n_specific,
        n_general=n_general,
    )

    class Response(BaseModel):
        additional_parameters: list[str]

    response = call_llm(
        prompt=prompt, response_format=Response, model_type="mid", temperature=0
    )
    additional_parameters = json.loads(response)["additional_parameters"]
    return standard_parameters + additional_parameters


def build_previous_personas_context(previous_personas: List[Dict[str, Any]]) -> str:
    """Format already-generated personas into a text context block.

    The 'answers' key is excluded: only persona attributes matter for
    differentiating newly generated personas from existing ones.
    """
    previous_personas_context = "\n\n--- Existing Personas Context ---\n"
    for i, persona in enumerate(previous_personas, 1):
        previous_personas_context += f"\nExisting Persona {i}:\n"
        persona_str = "\n".join(
            [f"{param}: {value}" for param, value in persona.items() if param != 'answers']
        )
        previous_personas_context += persona_str + "\n"
    previous_personas_context += "--- End Existing Personas Context ---"
    return previous_personas_context


def generate_synthetic_personas(
    num_personas: int,
    audience: str,
    previous_personas: Optional[List[Dict[str, Any]]] = None,
) -> Dict:
    """Generate synthetic personas, ensuring variability by considering
    previously generated personas.

    Retries LLM calls if fewer personas than requested are returned, up to
    a limit.

    Args:
        num_personas: Total number of personas to generate.
        audience: Target audience for the personas.
        previous_personas: Optional list of already existing personas to
            ensure differentiation.

    Returns:
        Dictionary containing the list of newly generated personas under
        the "users_personas" key.
    """
    all_new_personas: List[Dict[str, Any]] = []
    max_iterations = 5  # Safety break to prevent infinite loops
    current_iteration = 0

    while len(all_new_personas) < num_personas and current_iteration < max_iterations:
        current_iteration += 1
        needed_personas = num_personas - len(all_new_personas)
        logger.info(
            f"Iteration {current_iteration}/{max_iterations}: Requesting {needed_personas} "
            f"more personas (Total needed: {num_personas}, Have: {len(all_new_personas)})..."
        )

        response_format = persona_schema(needed_personas)
        # Combine the caller-provided personas with those generated in this
        # function's earlier iterations so the LLM can differentiate.
        current_context_personas = (previous_personas or []) + all_new_personas

        prompt = GENERATE_SYNTHETIC_PERSONAS_PROMPT.format(
            needed_personas=needed_personas, audience=audience
        )
        if current_context_personas:
            # Add context and the requirement for differentiation.
            prompt += "\n\n"
            prompt += (
                f"To ensure diversity, we have already generated "
                f"{len(current_context_personas)} persona(s) for this audience. "
                f"Their details are listed below.\n"
            )
            prompt += (
                "LAST IMPORTANT REQUIREMENT: Each new persona you generate "
                "MUST be significantly different from these existing ones.\n"
            )
            prompt += build_previous_personas_context(current_context_personas)

        try:
            response_str = call_llm(
                prompt=prompt,
                response_format=response_format,
                temperature=1,
                model_type="mid",
                shuffle=False,
            )
            response_data = json.loads(response_str)
            iteration_personas = response_data.get("users_personas", [])
            num_received_iteration = len(iteration_personas)
            logger.info(
                f"Iteration {current_iteration}: Received {num_received_iteration} "
                f"personas (requested {needed_personas})."
            )
            if num_received_iteration == 0 and needed_personas > 0:
                logger.warning(
                    f"Iteration {current_iteration}: Received 0 personas despite "
                    f"needing {needed_personas}. Stopping attempts for this request."
                )
                break  # Stop if LLM returns 0 when we still need more
            all_new_personas.extend(iteration_personas)
        except Exception as e:
            logger.error(
                f"Iteration {current_iteration}: Error during LLM call or processing: {e}"
            )
            # Abort retries on any LLM/parsing error rather than looping.
            break

    # Final check and logging.
    if len(all_new_personas) < num_personas:
        logger.warning(
            f"generate_synthetic_personas finished after {current_iteration} iterations, "
            f"but only generated {len(all_new_personas)}/{num_personas} requested personas."
        )
    else:
        logger.info(
            f"generate_synthetic_personas successfully generated {len(all_new_personas)} "
            f"personas in {current_iteration} iterations."
        )
    return {"users_personas": all_new_personas}


def ask_single_question_to_persona(persona: dict, question: str) -> str:
    """Asks a single question to a single persona and returns the answer.

    On failure, returns an error placeholder string instead of raising.
    """
    try:
        prompt = CHAT_WITH_PERSONA_PROMPT.format(persona=persona, question=question)
        answer = call_llm(prompt=prompt, temperature=0, model_type="low", shuffle=False)
        return answer
    except Exception as e:
        logger.error(
            f"Error asking question '{question}' to persona "
            f"{persona.get('Name', 'Unknown')}: {e}"
        )
        return f"Error generating answer for question: {question}"


def ask_all_questions_to_persona(
    persona: dict, questions: List[str], context: Optional[str] = None
) -> List[str]:
    """Ask all questions to a single persona in one structured LLM call.

    Args:
        persona: The persona to answer as.
        questions: Questions to ask.
        context: Optional extra context appended to the prompt.

    Returns:
        A list of answers, one per question. On failure, a list of error
        placeholders of the same length — keeping the return type
        consistent with the success path (previously a bare string was
        returned here, breaking callers that expect a list).
    """
    response_format = answers_schema(len(questions))
    try:
        prompt = ASK_QUESTIONS_TO_PERSONA_PROMPT.format(
            persona=persona, questions=questions, num_questions=len(questions)
        )
        if context:
            prompt += (
                f"\n\nHere is some context that might be relevant to the questions: {context}"
            )
        response_str = call_llm(
            prompt=prompt,
            temperature=0.5,
            model_type="mid",
            response_format=response_format,
            shuffle=False,
        )
        response_data = json.loads(response_str)
        return response_data.get("answers", [])
    except Exception as e:
        logger.error(
            f"Error asking questions to persona {persona.get('Name', 'Unknown')}: {e}"
        )
        # One placeholder per question so callers always get a list.
        return ["Error generating answer"] * len(questions)


def add_answers_to_users_bulk(
    users_personas: List, questions: List[str], context: Optional[str] = None
) -> List[Dict]:
    """
    Adds answers to each user persona by asking all questions at once for
    each persona. Processes all personas in parallel for better efficiency.

    Args:
        users_personas: List of user personas.
        questions: List of questions to ask each persona.
        context: Optional extra context passed to each persona's prompt.

    Returns:
        List of personas with their answers added under "answers".
    """
    personas_list = users_personas
    if not personas_list or not questions:
        # Return original personas if no personas or no questions.
        for p in personas_list:
            p["answers"] = []  # Ensure 'answers' key exists even if empty
        return personas_list

    def process_persona(persona):
        # Helper: ask every question to one persona; never raises.
        try:
            persona["answers"] = ask_all_questions_to_persona(persona, questions, context)
        except Exception as e:
            logger.error(
                f"Error processing all questions for persona "
                f"{persona.get('Name', 'Unknown')}: {e}"
            )
            persona["answers"] = ["Error generating answer"] * len(questions)
        return persona

    # Process all personas in parallel, bounded by MAX_WORKERS.
    max_workers = min(MAX_WORKERS, len(personas_list))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        updated_personas = list(executor.map(process_persona, personas_list))
    return updated_personas


def add_answers_to_users(users_personas: List, questions: List[str]) -> List[Dict]:
    """
    Adds answers to each user persona by processing a flat list of
    (persona, question) pairs in parallel. Ensures the order of answers
    matches the order of questions for each persona.
    """
    personas_list = users_personas
    if not personas_list or not questions:
        # Return original personas if no personas or no questions.
        for p in personas_list:
            p["answers"] = []  # Ensure 'answers' key exists even if empty
        return personas_list

    num_questions = len(questions)

    # 1. Flat task list: one (persona, question) pair per LLM call.
    tasks = [(persona, question) for persona in personas_list for question in questions]

    def process_task(task_tuple):
        persona_dict, question_str = task_tuple
        return ask_single_question_to_persona(persona_dict, question_str)

    # Adjust max_workers based on total tasks and API limits.
    max_workers = min(MAX_WORKERS, len(tasks))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 2. executor.map preserves input order, so answers align with tasks.
        flat_answers = list(executor.map(process_task, tasks))

    # 3. Slice the flat answer list back into per-persona chunks.
    fleet = []
    for idx, persona in enumerate(personas_list):
        start = idx * num_questions
        persona["answers"] = flat_answers[start:start + num_questions]
        fleet.append(persona)
    return fleet


def generate_content(fleet, questions=None, scope=None) -> str:
    """Serialize the fleet (and optionally scope/questions) into report text."""
    content = ""
    if scope:
        content += f"Scope of Research:\n{scope}\n\n"
    if questions:
        content += "Questions:\n"
        for i, question in enumerate(questions, 1):
            content += f"Q{i}: {question}\n"
        content += "\n"
    for i, user in enumerate(fleet, 1):
        content += f"### User {i} ###\n"
        for key, value in user.items():
            if key != "answers":
                content += f"{key}: {value}\n"
        content += "\n"
        for j, answer in enumerate(user.get("answers", []), 1):
            content += f"Q{j}: {answer}\n\n"
        content += "\n---\n\n"
    return content


def generate_report(questions, fleet, scope) -> str:
    """Generate a research report from the fleet's answers via the LLM."""
    content = generate_content(questions=questions, fleet=fleet, scope=scope)
    prompt = GENERATE_REPORT_PROMPT.format(content=content, scope=scope)
    report_text = call_llm(prompt=prompt, model_type="mid", temperature=0)
    return report_text


def chat_with_persona(
    persona: dict, question: str, conversation_history: Optional[List[dict]] = None
) -> str:
    """
    Chat with a specific persona, taking into account conversation history
    if provided.

    Args:
        persona: The user persona to chat with.
        question: The current question to ask.
        conversation_history: List of previous Q&A pairs, if any.

    Returns:
        The persona's answer to the question.
    """
    history_context = ""
    if conversation_history:
        history_context = "\nPrevious conversation:\n"
        for chat in conversation_history:
            history_context += f"Q: {chat['question']}\n"
            history_context += f"A: {chat['answer']}\n"

    prompt = CHAT_WITH_PERSONA_PROMPT.format(persona=persona, question=question)
    if conversation_history:
        prompt += (
            f"\nHere you have the previous conversation, make sure to answer the "
            f"question in a way that is consistent with it:\n{history_context}"
        )
    return call_llm(prompt=prompt, temperature=0.5, model_type="mid", shuffle=False)


def chat_with_report(users: List[dict], question: str, questions: List[str]) -> str:
    """
    Chat with the content of a report, using the provided users' data.

    Args:
        users: List of user personas with their answers (fleet).
        question: The question to ask about the report content.
        questions: List of questions that were asked to the users.

    Returns:
        The answer based on the report content.
    """
    # Generate the content string that would be used in the report.
    content = generate_content(fleet=users, questions=questions)
    prompt = CHAT_WITH_REPORT_PROMPT.format(content=content, question=question)
    return call_llm(prompt=prompt, temperature=0, model_type="low")


def generate_audience_name(audience: str, scope: str) -> str:
    """
    Generate a concise audience name based on the provided audience
    description and scope.

    Args:
        audience: Detailed audience description.
        scope: Research scope.

    Returns:
        String containing a concise audience name.
    """
    prompt = GENERATE_AUDIENCE_NAME_PROMPT.format(audience=audience, scope=scope)
    audience_name = call_llm(prompt=prompt, temperature=0, model_type="low")
    return audience_name.strip()


def ux_testing_fsm(
    persona: dict,
    task: str,
    image: str,
    available_actions: list,
    session_history: Optional[list] = None,
) -> dict:
    """
    Conduct simple FSM-based UX testing with a persona.

    Args:
        persona: User persona to conduct testing with.
        task: The task the persona needs to accomplish.
        image: URL of the current interface image.
        available_actions: List of available actions in current state.
        session_history: List of previous steps in this session.

    Returns:
        Dictionary with action_taken, thought, task_finished, and
        task_difficulty.
    """
    # Format available actions for the prompt.
    actions_text = ", ".join(available_actions)

    # Format session history.
    if session_history:
        history_text = "Previous steps in this session:\n"
        for i, step in enumerate(session_history, 1):
            history_text += (
                f"Step {i}: Action '{step.get('action_taken', 'unknown')}' - "
                f"{step.get('thought', 'No thought recorded')}\n"
            )
    else:
        history_text = "This is the first step of the session."

    prompt = UX_FSM_SIMPLE_PROMPT.format(
        persona=persona,
        task=task,
        available_actions=actions_text,
        session_history=history_text,
    )

    # Structured JSON response format: the action enum is restricted to the
    # currently available actions.
    response_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "ux_testing_response",
            "schema": {
                "type": "object",
                "properties": {
                    "action_taken": {
                        "type": "string",
                        "description": "The action chosen from available actions",
                        "enum": available_actions,
                    },
                    "thought": {
                        "type": "string",
                        "description": "Reasoning for the action",
                    },
                    "task_finished": {
                        "type": "boolean",
                        "description": "Whether the task is complete",
                    },
                    "task_difficulty": {
                        "type": ["number", "null"],
                        "minimum": 1.0,
                        "maximum": 5.0,
                        "description": "Difficulty rating if task is finished",
                    },
                },
                "required": ["action_taken", "thought", "task_finished", "task_difficulty"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    }

    try:
        # Call LLM with the image and structured response format.
        response = call_llm(
            prompt=prompt,
            temperature=0.7,
            model_type="mid",
            images=[image],
            response_format=response_format,
        )
        parsed_response = json.loads(response)

        # Defensive check: the model may still emit an action outside the enum.
        if parsed_response.get("action_taken") not in available_actions:
            logger.warning(
                f"Persona chose invalid action: {parsed_response.get('action_taken')}. "
                f"Using first available action."
            )
            parsed_response["action_taken"] = (
                available_actions[0] if available_actions else "unknown"
            )

        logger.info(
            f"UX FSM testing completed for persona: {persona.get('Name', 'Unknown')}"
        )
        logger.info(f"Action taken: {parsed_response.get('action_taken')}")
        logger.info(f"Task finished: {parsed_response.get('task_finished')}")
        return parsed_response
    except Exception as e:
        logger.error(
            f"Error during UX FSM testing for persona "
            f"{persona.get('Name', 'Unknown')}: {e}"
        )
        return {
            "action_taken": available_actions[0] if available_actions else "unknown",
            "thought": f"Error occurred during testing: {str(e)}",
            "task_finished": False,
            "task_difficulty": None,
        }