File size: 7,534 Bytes
6dad1de
e5ab379
 
6dad1de
e5ab379
6dad1de
e5ab379
 
6dad1de
 
e5ab379
6dad1de
 
e5ab379
6dad1de
 
 
e5ab379
6dad1de
 
 
 
 
 
 
 
 
 
e5ab379
 
 
6dad1de
 
 
 
 
 
e5ab379
6dad1de
 
 
e5ab379
 
 
 
 
 
 
 
 
 
6dad1de
e5ab379
 
6dad1de
e5ab379
6dad1de
e5ab379
 
 
 
 
 
6dad1de
e5ab379
 
 
 
6dad1de
e5ab379
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6dad1de
e5ab379
 
6dad1de
e5ab379
 
 
6dad1de
e5ab379
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from typing import List, Dict, Any, Optional
import uuid
import threading
from datetime import datetime
from tinytroupe.agent import TinyPerson
from tinytroupe.social_network import NetworkTopology
from tinytroupe.environment.social_tiny_world import SocialTinyWorld, SimulationResult
from tinytroupe.agent.social_types import Content
from tinytroupe.ml_models import EngagementPredictor
from tinytroupe.content_generation import ContentVariantGenerator
from tinytroupe.network_generator import NetworkGenerator

class SimulationConfig:
    def __init__(self, name: str, persona_count: int = 10, network_type: str = "scale_free", **kwargs):
        self.name = name
        self.persona_count = persona_count
        self.network_type = network_type
        self.user_id = kwargs.get("user_id")

class Simulation:
    def __init__(self, id: str, config: SimulationConfig, world: SocialTinyWorld, personas: List[TinyPerson], network: NetworkTopology):
        self.id = id
        self.config = config
        self.world = world
        self.personas = personas
        self.network = network
        self.status = "ready"
        self.created_at = datetime.now()
        self.last_result: Optional[SimulationResult] = None
        self.chat_history: List[Dict[str, Any]] = []
        self.progress = 0.0

class SimulationManager:
    """Manages simulation lifecycle and execution"""
    
    def __init__(self):
        self.simulations: Dict[str, Simulation] = {}
        self.focus_groups: Dict[str, List[TinyPerson]] = {}
        self.predictor = EngagementPredictor()
        self.variant_generator = ContentVariantGenerator()
    
    def create_simulation(self, config: SimulationConfig, focus_group_name: str = None) -> Simulation:
        if focus_group_name and focus_group_name in self.focus_groups:
            personas = self.focus_groups[focus_group_name]
        else:
            from tinytroupe.factory.tiny_person_factory import TinyPersonFactory
            factory = TinyPersonFactory(
                context=config.name,
                total_population_size=config.persona_count
            )
            personas = factory.generate_people(number_of_people=config.persona_count)
        
        # Generate network
        net_gen = NetworkGenerator(personas)
        if config.network_type == "scale_free":
            network = net_gen.generate_scale_free_network(config.persona_count, 2)
        else:
            network = net_gen.generate_small_world_network(config.persona_count, 4, 0.1)
        
        # Create world
        world = SocialTinyWorld(config.name, network=network)
        for persona in personas:
            world.add_agent(persona)
        
        sim_id = str(uuid.uuid4())
        simulation = Simulation(sim_id, config, world, personas, network)
        self.simulations[sim_id] = simulation
        return simulation
    
    def run_simulation(self, simulation_id: str, content: Content, mode: str = "full", background: bool = False) -> Optional[SimulationResult]:
        if simulation_id not in self.simulations:
            raise ValueError(f"Simulation {simulation_id} not found.")
            
        simulation = self.simulations[simulation_id]
        
        if background:
            thread = threading.Thread(target=self._run_simulation_task, args=(simulation, content))
            thread.start()
            return None
        else:
            return self._run_simulation_task(simulation, content)

    def _run_simulation_task(self, simulation: Simulation, content: Content) -> SimulationResult:
        simulation.status = "running"
        simulation.progress = 0.1
        
        initial_viewers = [p.name for p in simulation.personas[:5]] # Seed with first 5
        
        # In a real async scenario, simulate_content_spread would update progress
        result = simulation.world.simulate_content_spread(content, initial_viewers)
        
        simulation.status = "completed"
        simulation.progress = 1.0
        simulation.last_result = result
        return result

    def send_chat_message(self, simulation_id: str, sender: str, message: str) -> Dict[str, Any]:
        sim = self.get_simulation(simulation_id)
        if not sim: raise ValueError(f"Simulation {simulation_id} not found.")
        
        msg = {
            "sender": sender,
            "message": message,
            "timestamp": datetime.now().isoformat()
        }
        sim.chat_history.append(msg)
        
        # Trigger persona responses if it's a "User" message
        if sender == "User":
            # For now, pick a random persona to respond
            import random
            responder = random.choice(sim.personas)
            # In a real implementation, the persona would "think" and "act"
            response_text = f"As a {responder._persona.get('occupation')}, I think: {message[:10]}... sounds interesting!"
            
            response_msg = {
                "sender": responder.name,
                "message": response_text,
                "timestamp": datetime.now().isoformat()
            }
            sim.chat_history.append(response_msg)
            
        return msg

    def get_chat_history(self, simulation_id: str) -> List[Dict[str, Any]]:
        sim = self.get_simulation(simulation_id)
        if not sim: return []
        return sim.chat_history
    
    def get_simulation(self, simulation_id: str, user_id: str = None) -> Optional[Simulation]:
        return self.simulations.get(simulation_id)

    def list_simulations(self) -> List[Dict[str, Any]]:
        return [
            {
                "id": sim.id,
                "name": sim.config.name,
                "status": sim.status,
                "persona_count": len(sim.personas),
                "created_at": sim.created_at.isoformat()
            }
            for sim in self.simulations.values()
        ]

    def get_persona(self, simulation_id: str, persona_name: str) -> Optional[Dict[str, Any]]:
        sim = self.get_simulation(simulation_id)
        if not sim: return None
        for p in sim.personas:
            if p.name == persona_name:
                return p._persona
        return None

    def list_personas(self, simulation_id: str) -> List[Dict[str, Any]]:
        sim = self.get_simulation(simulation_id)
        if not sim: return []
        return [p._persona for p in sim.personas]

    def save_focus_group(self, name: str, personas: List[TinyPerson]):
        self.focus_groups[name] = personas

    def list_focus_groups(self) -> List[str]:
        return list(self.focus_groups.keys())

    def get_focus_group(self, name: str) -> Optional[List[TinyPerson]]:
        return self.focus_groups.get(name)

    def delete_simulation(self, simulation_id: str) -> bool:
        if simulation_id in self.simulations:
            del self.simulations[simulation_id]
            return True
        return False

    def export_simulation(self, simulation_id: str) -> Optional[Dict[str, Any]]:
        sim = self.get_simulation(simulation_id)
        if not sim: return None
        return {
            "id": sim.id,
            "config": {
                "name": sim.config.name,
                "persona_count": sim.config.persona_count,
                "network_type": sim.config.network_type
            },
            "status": sim.status,
            "created_at": sim.created_at.isoformat(),
            "personas": [p._persona for p in sim.personas],
            "network": sim.network.get_metrics()
        }