Spaces:
Sleeping
Sleeping
| import concurrent.futures | |
| import time | |
| import os | |
| import json | |
| import re | |
| import logging | |
| from typing import List, Dict, Any | |
| import tinytroupe | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.factory import TinyPersonFactory | |
| from tinytroupe.extraction import ResultsExtractor | |
| from backend.core.config import settings | |
| from backend.services.persona_matcher import persona_matcher | |
| logger = logging.getLogger(__name__) | |
class TinyTroupeSimulationManager:
    """Run persona generation and content simulations as tracked jobs.

    Both public entry points report progress and the final outcome through a
    caller-supplied ``job_registry`` object (its ``update_job`` method) rather
    than through return values or raised exceptions.
    """

    # Directory where newly generated personas are written before the git push.
    PERSONA_DIR = "/app/personas"
    # Minimum "_assureness_score" for a stored persona to be reused.
    MIN_ASSURENESS_SCORE = 85
    # Score used when the extractor yields no usable value for a rating.
    DEFAULT_SCORE = 50

    def __init__(self):
        """Create the thread pool used to process personas in parallel."""
        self.max_concurrency = settings.MAX_CONCURRENCY
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrency)

    @staticmethod
    def _safe_filename(name, fallback="persona"):
        """Turn a persona name into a file-system friendly stem.

        The name is lower-cased, stripped of everything except word
        characters, whitespace and hyphens, has runs of whitespace/hyphens
        collapsed to ``_`` and is truncated to 60 characters.

        Args:
            name: Raw persona name; ``None`` is treated as an empty name.
            fallback: Stem returned when nothing survives sanitising, so the
                caller never ends up writing a file called ``.json``.

        Returns:
            The sanitised stem, or ``fallback`` if the result would be empty.
        """
        stem = str(name or "").lower().strip()
        stem = re.sub(r"[^\w\s-]", "", stem)
        stem = re.sub(r"[\s-]+", "_", stem)[:60]
        return stem or fallback

    @staticmethod
    def _coerce_score(value, default=50):
        """Convert an extracted rating into an integer within 0..100.

        Extraction output is model-generated, so a rating may arrive as an
        int, a float, a numeric string, ``None`` or free text. Anything that
        cannot be read as a finite number yields ``default`` instead of
        raising, and numeric values are clamped to the 0..100 range that the
        extraction objective asks for.

        Args:
            value: Raw rating as returned by the extractor.
            default: Value returned when ``value`` is not numeric.

        Returns:
            An ``int`` between 0 and 100 inclusive, or ``default``.
        """
        try:
            score = int(float(value))
        except (TypeError, ValueError, OverflowError):
            # Covers None, free text such as "85/100", NaN and infinities.
            return default
        return max(0, min(100, score))

    @staticmethod
    def _aggregate_results(results):
        """Summarise per-persona results into the job's final payload.

        Entries containing an ``"error"`` key are excluded from the averages
        and from the insights, but are still present in ``agent_dialogue``.

        Args:
            results: List of per-persona result dicts.

        Returns:
            Dict with floor-averaged ``impact_score``, ``attention`` and
            ``relevance`` (0 when no persona succeeded), up to three
            ``key_insights`` comments and the full ``agent_dialogue`` list.
        """
        successful = [r for r in results if "error" not in r]
        valid_count = len(successful)

        def average(field):
            if not valid_count:
                return 0
            return sum(r.get(field, 0) for r in successful) // valid_count

        return {
            "impact_score": average("impact_score"),
            "attention": average("attention"),
            "relevance": average("relevance"),
            "key_insights": [r.get("comment") for r in successful][:3],
            "agent_dialogue": results,
        }

    def _generate_new_personas(self, business_description, customer_profile, missing_count, job_id, job_registry):
        """Generate personas with TinyPersonFactory and persist each as JSON.

        Each usable generated persona is tagged with an ``_assureness_score``
        of 100, written to ``PERSONA_DIR`` and collected for the caller. Job
        progress is moved from 20 towards 80 as the generated people are
        handled.

        Args:
            business_description: Passed to the factory as ``context``.
            customer_profile: Passed as ``sampling_space_description``.
            missing_count: Number of personas to generate (must be > 0).
            job_id: Identifier used for logging and progress updates.
            job_registry: Object exposing ``update_job``.

        Returns:
            List of persona dicts that were generated and saved.

        Raises:
            Exception: Whatever the factory or the file system raises; the
                caller is expected to turn this into a FAILED job.
        """
        from tinytroupe.factory import TinyPersonFactory

        factory = TinyPersonFactory(
            sampling_space_description=customer_profile,
            total_population_size=missing_count,
            context=business_description
        )
        logger.info(f"Job {job_id}: Generating {missing_count} personas via TinyPersonFactory with parallelize=True...")
        people = factory.generate_people(missing_count, parallelize=True)

        # Create the target directory once instead of once per persona.
        os.makedirs(self.PERSONA_DIR, exist_ok=True)

        generated = []
        for i, person in enumerate(people):
            if person is not None and getattr(person, '_persona', None) is not None:
                persona_data = person._persona
                persona_data["_assureness_score"] = 100  # New ones are perfectly matched to the description
                generated.append(persona_data)

                # A name made only of punctuation (or a missing name) would
                # sanitise to "", so fall back to a job-scoped stem.
                fallback_stem = self._safe_filename(f"persona_{job_id}_{i}")
                safe_name = self._safe_filename(getattr(person, "name", None), fallback=fallback_stem)
                file_path = os.path.join(self.PERSONA_DIR, f"{safe_name}.json")
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(persona_data, f, indent=4)

            # Capped so an unexpectedly long result list cannot push the
            # generation phase past its 20..80 progress window.
            progress = min(80, 20 + int((i + 1) / missing_count * 60))
            job_registry.update_job(job_id, progress_percentage=progress)
        return generated

    def generate_personas_async(self, business_description: str, customer_profile: str, num_personas: int, job_id: str, job_registry):
        """Assemble ``num_personas`` personas for a job, reusing stored ones first.

        Stored personas returned by ``persona_matcher.scan_tresor`` with an
        ``_assureness_score`` of at least ``MIN_ASSURENESS_SCORE`` are reused;
        any shortfall is generated through TinyPersonFactory. Newly generated
        personas trigger a background git push.

        NOTE(review): despite the name this is a regular blocking method;
        presumably the caller runs it on a background worker - confirm.

        Args:
            business_description: Description of the business.
            customer_profile: Description of the target customers.
            num_personas: Total number of personas requested.
            job_id: Identifier of the job to update.
            job_registry: Object exposing ``update_job``.

        Returns:
            None. The outcome is written to the job: COMPLETED with
            ``results={"personas": [...]}``, or FAILED with a message.
        """
        try:
            # 1. First, check if there are any matching personas in the local /app/personas repo
            matched_personas = persona_matcher.scan_tresor(business_description, customer_profile)
            valid_personas = [
                p for p in matched_personas
                if p.get("_assureness_score", 0) >= self.MIN_ASSURENESS_SCORE
            ]

            # Number of personas we still need to generate
            missing_count = max(0, num_personas - len(valid_personas))
            logger.info(f"Job {job_id}: Found {len(valid_personas)} matching personas. Generating {missing_count} new personas.")
            job_registry.update_job(job_id, progress_percentage=20)

            # 2. Generate missing personas via TinyTroupe LLM call
            new_personas = []
            if missing_count > 0:
                try:
                    new_personas = self._generate_new_personas(
                        business_description, customer_profile, missing_count, job_id, job_registry
                    )
                except Exception as e:
                    logger.exception(f"Error during persona generation: {e}")
                    job_registry.update_job(job_id, status="FAILED", message=f"LLM Error: {str(e)}")
                    return

            # Combine, never returning more than the requested count.
            all_personas = (valid_personas[:num_personas] + new_personas)[:num_personas]

            # Push new personas
            if new_personas:
                from backend.services.git_sync import git_sync
                git_sync.background_push(commit_message=f"Added {len(new_personas)} new personas for job {job_id}")

            job_registry.update_job(
                job_id,
                status="COMPLETED",
                progress_percentage=100,
                results={"personas": all_personas}
            )
        except Exception as e:
            logger.exception(f"Job {job_id} failed: {e}")
            job_registry.update_job(job_id, status="FAILED", message=str(e))

    def _process_person(self, person: "TinyPerson", content_text, format_type):
        """Show the content to one persona and extract its ratings.

        Runs on a worker thread of ``self._executor``. It deliberately does
        not touch the job registry, so progress bookkeeping stays in the
        coordinating thread.

        Args:
            person: The TinyPerson that should react to the content.
            content_text: The content under test.
            format_type: Label for the kind of content, used in the prompts.

        Returns:
            Dict with ``name``, the three integer scores and ``comment`` on
            success, or ``{"name": ..., "error": ...}`` if anything raised.
        """
        try:
            # Send prompt context
            prompt = f"Please read this {format_type}:\n\n'{content_text}'"
            person.listen_and_act(prompt)

            # Use TinyTroupe native ResultsExtractor
            extractor = ResultsExtractor()
            objective = "Rate the impact, attention, and relevance of the content based on the agent's background. Ratings must be integer scores between 0 and 100. Also provide a detailed analytical comment."
            extracted_data = extractor.extract_results_from_agent(
                person,
                extraction_objective=objective,
                situation=f"Testing a new {format_type}",
                fields=["impact_score", "attention", "relevance", "comment"],
                fields_hints={
                    "impact_score": "Integer between 0 and 100",
                    "attention": "Integer between 0 and 100",
                    "relevance": "Integer between 0 and 100",
                    "comment": "A string containing a descriptive paragraph"
                },
                verbose=False
            )

            # Only discard the extraction when it is not usable as a mapping.
            if not isinstance(extracted_data, dict):
                extracted_data = {}

            return {
                "name": person.name,
                "impact_score": self._coerce_score(extracted_data.get("impact_score"), self.DEFAULT_SCORE),
                "attention": self._coerce_score(extracted_data.get("attention"), self.DEFAULT_SCORE),
                "relevance": self._coerce_score(extracted_data.get("relevance"), self.DEFAULT_SCORE),
                "comment": extracted_data.get("comment", "No comment provided.")
            }
        except Exception as e:
            logger.exception(f"Person {person.name} failed to process: {e}")
            return {"name": person.name, "error": str(e)}

    def run_simulation_async(self, job_id: str, content_text: str, personas_data: List[Dict[str, Any]], format_type: str, parameters: Dict[str, Any], job_registry):
        """Simulate how each persona reacts to a piece of content.

        A TinyPerson is built for every entry of ``personas_data``; entries
        that fail to load are logged and skipped. The personas are processed
        in parallel on the manager's thread pool and the individual results
        are aggregated into the job's final payload.

        NOTE(review): despite the name this is a regular blocking method;
        presumably the caller runs it on a background worker - confirm.

        Args:
            job_id: Identifier of the job to update.
            content_text: The content shown to each persona.
            personas_data: Persona dicts; ``"name"`` is read from each.
            format_type: Label for the kind of content, used in the prompts.
            parameters: Accepted for interface compatibility; not read here.
            job_registry: Object exposing ``update_job``.

        Returns:
            None. The outcome is written to the job: COMPLETED with the
            aggregated results, or FAILED with a message.
        """
        try:
            job_registry.update_job(job_id, progress_percentage=10, status="RUNNING")

            # Instantiate TinyPersons
            persons = []
            for p_data in personas_data:
                try:
                    p = TinyPerson(name=p_data.get("name", "Unknown"))
                    # Make sure TinyPerson uses the loaded dictionary for the _persona structure
                    p._persona = p_data
                    persons.append(p)
                except Exception as e:
                    logger.error(f"Failed to load person data: {e}")

            if not persons:
                job_registry.update_job(job_id, status="FAILED", message="No valid personas provided")
                return

            futures = [
                self._executor.submit(self._process_person, p, content_text, format_type)
                for p in persons
            ]

            # Progress is derived from the number of finished personas and is
            # written from this thread only: no read-modify-write race between
            # workers, and the value stays within the 10..90 window.
            results = []
            total = len(persons)
            for finished, future in enumerate(concurrent.futures.as_completed(futures), start=1):
                results.append(future.result())
                job_registry.update_job(job_id, progress_percentage=10 + (finished * 80) // total)

            job_registry.update_job(
                job_id,
                status="COMPLETED",
                progress_percentage=100,
                results=self._aggregate_results(results)
            )
        except Exception as e:
            logger.exception(f"Simulation Job {job_id} failed: {e}")
            job_registry.update_job(job_id, status="FAILED", message=str(e))
# Module-level manager instance, constructed when this module is imported.
tinytroupe_manager = TinyTroupeSimulationManager()