Coverage for tinytroupe / environment / tiny_world.py: 0%
342 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-28 17:48 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-28 17:48 +0000
1from tinytroupe.environment import logger, default
3import copy
4from datetime import datetime, timedelta
5import textwrap
6import random
7import concurrent.futures
9from tinytroupe.agent import *
10from tinytroupe.utils import name_or_empty, pretty_datetime
11import tinytroupe.control as control
12from tinytroupe.control import transactional
13from tinytroupe import utils
14from tinytroupe import config_manager
16from rich.console import Console
18from typing import Any, TypeVar, Union
19AgentOrWorld = Union["TinyPerson", "TinyWorld"]
class TinyWorld:
    """
    Base class for environments.

    An environment keeps a set of uniquely named agents, an optional simulated clock and a list
    of interventions. It drives the simulation by asking agents to act and by handling the
    actions that have effects beyond the acting agent itself (e.g., TALK and REACH_OUT).
    """

    # A dict of all environments created so far.
    all_environments = {}  # name -> environment

    # Whether to display environments communications or not, for all environments.
    communication_display = True

    # Sentinel meaning "use the real-world time at construction". A dedicated object is needed
    # because None is itself a meaningful value for initial_datetime (no explicit time).
    _USE_CURRENT_DATETIME = object()

    def __init__(self, name: str=None, agents=None,
                 initial_datetime=_USE_CURRENT_DATETIME,
                 interventions=None,
                 broadcast_if_no_target=True,
                 max_additional_targets_to_display=3):
        """
        Initializes an environment.

        Args:
            name (str): The name of the environment. If None, a fresh name is generated.
            agents (list): A list of agents to add to the environment. Defaults to no agents.
            initial_datetime (datetime): The initial datetime of the environment, or None (i.e., explicit time is optional).
                Defaults to the current datetime in the real world, taken when the environment is created.
            interventions (list): A list of interventions to apply in the environment at each simulation step.
                Defaults to a new empty list. A list passed by the caller is stored as-is (not copied).
            broadcast_if_no_target (bool): If True, broadcast actions if the target of an action is not found.
            max_additional_targets_to_display (int): The maximum number of additional targets to display in a communication. If None,
                all additional targets are displayed.

        Raises:
            ValueError: If an environment with the same name already exists, or if two of the given
                agents share the same name.
        """

        if name is not None:
            self.name = name
        else:
            self.name = f"TinyWorld {utils.fresh_id(self.__class__.__name__)}"

        # Resolve the default here, at call time. A `datetime.now()` default in the signature
        # would be evaluated only once, when the class is defined.
        if initial_datetime is TinyWorld._USE_CURRENT_DATETIME:
            initial_datetime = datetime.now()
        self.current_datetime = initial_datetime

        self.broadcast_if_no_target = broadcast_if_no_target
        self.simulation_id = None  # will be reset later if the agent is used within a specific simulation scope

        self.agents = []
        self.name_to_agent = {}  # {agent_name: agent, agent_name_2: agent_2, ...}

        # A fresh list per instance: add_intervention() mutates this list, so a shared default
        # list would leak interventions between unrelated environments.
        self._interventions = interventions if interventions is not None else []

        # the buffer of communications that have been displayed so far, used for
        # saving these communications to another output form later (e.g., caching)
        self._displayed_communications_buffer = []

        # a temporary buffer for communications target to make rendering easier
        self._target_display_communications_buffer = []
        self._max_additional_targets_to_display = max_additional_targets_to_display

        self.console = Console()

        # add the environment to the list of all environments
        TinyWorld.add_environment(self)

        self.add_agents(agents if agents is not None else [])

    #######################################################################
    # Simulation control methods
    #######################################################################
    @transactional()
    def _step(self,
              timedelta_per_step=None,
              randomize_agents_order=True,
              parallelize=True):  # TODO have a configuration for parallelism?
        """
        Performs a single step in the environment. This default implementation
        advances the clock, applies the interventions whose precondition holds, and then
        makes all agents in the environment act, handling the resulting actions.
        Subclasses might override this method to implement different policies.

        Args:
            timedelta_per_step (timedelta, optional): How much to advance the clock before acting.
            randomize_agents_order (bool): Whether to shuffle the agents; only used when acting sequentially.
            parallelize (bool): Whether agents act in parallel or sequentially.

        Returns:
            dict: A mapping from agent name to the actions that agent produced in this step.
        """

        # Increase current datetime if timedelta is given. This must happen before
        # any other simulation updates, to make sure that the agents are acting
        # in the correct time, particularly if only one step is being run.
        self._advance_datetime(timedelta_per_step)

        # Apply interventions.
        #
        # Why not in parallel? Owing to the very general nature of their potential effects,
        # interventions are never parallelized, since that could introduce unforeseen race conditions.
        for intervention in self._interventions:
            should_apply_intervention = intervention.check_precondition()
            if should_apply_intervention:
                if TinyWorld.communication_display:
                    self._display_intervention_communication(intervention)
                intervention.apply_effect()

                logger.debug(f"[{self.name}] Intervention '{intervention.name}' was applied.")

        # Agents can act in parallel or sequentially
        if parallelize:
            agents_actions = self._step_in_parallel(timedelta_per_step=timedelta_per_step)
        else:
            agents_actions = self._step_sequentially(timedelta_per_step=timedelta_per_step,
                                                     randomize_agents_order=randomize_agents_order)

        return agents_actions

    def _step_sequentially(self, timedelta_per_step=None, randomize_agents_order=True):
        """
        The sequential version of the _step method to request agents to act.

        Args:
            timedelta_per_step (timedelta, optional): Accepted for symmetry with _step; not used here.
            randomize_agents_order (bool): If True, agents act in a random order.

        Returns:
            dict: A mapping from agent name to the actions that agent produced.
        """

        # agents can act in a random order; shuffle a copy so self.agents keeps its order
        reordered_agents = copy.copy(self.agents)
        if randomize_agents_order:
            random.shuffle(reordered_agents)

        # agents can act
        agents_actions = {}
        for agent in reordered_agents:
            logger.debug(f"[{self.name}] Agent {name_or_empty(agent)} is acting.")
            actions = agent.act(return_actions=True)
            agents_actions[agent.name] = actions

            self._handle_actions(agent, agent.pop_latest_actions())

        return agents_actions

    def _step_in_parallel(self, timedelta_per_step=None):
        """
        A parallelized version of the _step method to request agents to act.

        Each agent's act() call is submitted to a thread pool. Once all calls have finished, the
        resulting actions are handled one agent at a time. An exception raised for one agent is
        logged and that agent is left out of the result, so the other agents are still processed.

        Args:
            timedelta_per_step (timedelta, optional): Accepted for symmetry with _step; not used here.

        Returns:
            dict: A mapping from agent name to the actions that agent produced.
        """

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {executor.submit(agent.act, return_actions=True): agent for agent in self.agents}
            agents_actions = {}

            # Wait for all futures to complete
            concurrent.futures.wait(futures.keys())

            for future in futures:
                agent = futures[future]
                try:
                    actions = future.result()
                    agents_actions[agent.name] = actions
                    self._handle_actions(agent, agent.pop_latest_actions())
                except Exception as exc:
                    logger.error(f"[{self.name}] Agent {name_or_empty(agent)} generated an exception: {exc}")

        return agents_actions

    def _advance_datetime(self, timedelta):
        """
        Advances the current datetime of the environment by the specified timedelta.

        Nothing is advanced (and this is logged) when no timedelta is given, or when the
        environment has no explicit datetime.

        Args:
            timedelta (timedelta): The timedelta to advance the current datetime by, or None.
        """
        if timedelta is None:
            logger.info(f"[{self.name}] No timedelta provided, so the datetime was not advanced.")
        elif self.current_datetime is None:
            # An environment may be created without an explicit time; there is nothing to advance.
            logger.info(f"[{self.name}] The environment has no explicit datetime, so the datetime was not advanced.")
        else:
            self.current_datetime += timedelta

    @transactional()
    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run(self, steps: int, timedelta_per_step=None, return_actions=False, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of steps.

        Args:
            steps (int): The number of steps to run the environment for.
            timedelta_per_step (timedelta, optional): The time interval between steps. Defaults to None.
            return_actions (bool, optional): If True, returns the actions taken by the agents. Defaults to False.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. When left as None, the value is
                taken from the "parallel_agent_actions" configuration entry.

        Returns:
            list: A list of actions taken by the agents over time, if return_actions is True (otherwise None).
                The list has this format:
                [{agent_name: [action_1, action_2, ...]}, {agent_name_2: [action_1, action_2, ...]}, ...]
        """
        agents_actions_over_time = []
        for i in range(steps):
            logger.info(f"[{self.name}] Running world simulation step {i+1} of {steps}.")

            if TinyWorld.communication_display:
                self._display_step_communication(cur_step=i+1, total_steps=steps, timedelta_per_step=timedelta_per_step)

            agents_actions = self._step(timedelta_per_step=timedelta_per_step, randomize_agents_order=randomize_agents_order, parallelize=parallelize)
            agents_actions_over_time.append(agents_actions)

        if return_actions:
            return agents_actions_over_time

    @transactional()
    def skip(self, steps: int, timedelta_per_step=None):
        """
        Skips a given number of steps in the environment. That is to say, time shall pass, but no actions will be taken
        by the agents or any other entity in the environment.

        Args:
            steps (int): The number of steps to skip.
            timedelta_per_step (timedelta, optional): The time interval between steps. Defaults to None,
                in which case no time passes.
        """
        if timedelta_per_step is None:
            # `steps * None` would raise TypeError; let _advance_datetime report the no-op instead.
            self._advance_datetime(None)
        else:
            self._advance_datetime(steps * timedelta_per_step)

    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run_minutes(self, minutes: int, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of minutes, one step per minute.

        Args:
            minutes (int): The number of minutes to run the environment for.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. Defaults to the configured value.
        """
        self.run(steps=minutes, timedelta_per_step=timedelta(minutes=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize)

    def skip_minutes(self, minutes: int):
        """
        Skips a given number of minutes in the environment.

        Args:
            minutes (int): The number of minutes to skip.
        """
        self.skip(steps=minutes, timedelta_per_step=timedelta(minutes=1))

    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run_hours(self, hours: int, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of hours, one step per hour.

        Args:
            hours (int): The number of hours to run the environment for.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. Defaults to the configured value.
        """
        self.run(steps=hours, timedelta_per_step=timedelta(hours=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize)

    def skip_hours(self, hours: int):
        """
        Skips a given number of hours in the environment.

        Args:
            hours (int): The number of hours to skip.
        """
        self.skip(steps=hours, timedelta_per_step=timedelta(hours=1))

    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run_days(self, days: int, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of days, one step per day.

        Args:
            days (int): The number of days to run the environment for.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. Defaults to the configured value.
        """
        self.run(steps=days, timedelta_per_step=timedelta(days=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize)

    def skip_days(self, days: int):
        """
        Skips a given number of days in the environment.

        Args:
            days (int): The number of days to skip.
        """
        self.skip(steps=days, timedelta_per_step=timedelta(days=1))

    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run_weeks(self, weeks: int, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of weeks, one step per week.

        Args:
            weeks (int): The number of weeks to run the environment for.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. Defaults to the configured value.
        """
        self.run(steps=weeks, timedelta_per_step=timedelta(weeks=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize)

    def skip_weeks(self, weeks: int):
        """
        Skips a given number of weeks in the environment.

        Args:
            weeks (int): The number of weeks to skip.
        """
        self.skip(steps=weeks, timedelta_per_step=timedelta(weeks=1))

    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run_months(self, months: int, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of months, one step per month.
        A month is approximated as 4 weeks (28 days).

        Args:
            months (int): The number of months to run the environment for.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. Defaults to the configured value.
        """
        self.run(steps=months, timedelta_per_step=timedelta(weeks=4), randomize_agents_order=randomize_agents_order, parallelize=parallelize)

    def skip_months(self, months: int):
        """
        Skips a given number of months in the environment. A month is approximated as 4 weeks (28 days).

        Args:
            months (int): The number of months to skip.
        """
        self.skip(steps=months, timedelta_per_step=timedelta(weeks=4))

    @config_manager.config_defaults(parallelize="parallel_agent_actions")
    def run_years(self, years: int, randomize_agents_order=True, parallelize=None):
        """
        Runs the environment for a given number of years, one step per year.
        A year is approximated as 365 days.

        Args:
            years (int): The number of years to run the environment for.
            randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True.
            parallelize (bool, optional): If True, agents act in parallel. Defaults to the configured value.
        """
        self.run(steps=years, timedelta_per_step=timedelta(days=365), randomize_agents_order=randomize_agents_order, parallelize=parallelize)

    def skip_years(self, years: int):
        """
        Skips a given number of years in the environment. A year is approximated as 365 days.

        Args:
            years (int): The number of years to skip.
        """
        self.skip(steps=years, timedelta_per_step=timedelta(days=365))

    #######################################################################
    # Agent management methods
    #######################################################################
    def add_agents(self, agents: list):
        """
        Adds a list of agents to the environment.

        Args:
            agents (list): A list of agents to add to the environment.

        Returns:
            TinyWorld: This environment, to allow chaining.
        """
        for agent in agents:
            self.add_agent(agent)

        return self  # for chaining

    def add_agent(self, agent: TinyPerson):
        """
        Adds an agent to the environment. The agent must have a unique name within the environment.
        Adding an agent that is already present only logs a warning.

        Args:
            agent (TinyPerson): The agent to add to the environment.

        Returns:
            TinyWorld: This environment, to allow chaining.

        Raises:
            ValueError: If the agent name is not unique within the environment.
        """

        # check if the agent is not already in the environment
        if agent not in self.agents:
            logger.debug(f"Adding agent {agent.name} to the environment.")

            # Agent names must be unique in the environment.
            # Check if the agent name is already there.
            if agent.name not in self.name_to_agent:
                agent.environment = self
                self.agents.append(agent)
                self.name_to_agent[agent.name] = agent
            else:
                raise ValueError(f"Agent names must be unique, but '{agent.name}' is already in the environment.")
        else:
            # Logger.warn is a deprecated alias; warning() is the supported spelling.
            logger.warning(f"Agent {agent.name} is already in the environment.")

        return self  # for chaining

    def remove_agent(self, agent: TinyPerson):
        """
        Removes an agent from the environment.

        Args:
            agent (TinyPerson): The agent to remove from the environment.

        Returns:
            TinyWorld: This environment, to allow chaining.

        Raises:
            ValueError: If the agent is not in the environment (raised by list.remove).
        """
        logger.debug(f"Removing agent {agent.name} from the environment.")
        self.agents.remove(agent)
        del self.name_to_agent[agent.name]

        return self  # for chaining

    def remove_all_agents(self):
        """
        Removes all agents from the environment.

        Returns:
            TinyWorld: This environment, to allow chaining.
        """
        logger.debug("Removing all agents from the environment.")
        self.agents = []
        self.name_to_agent = {}

        return self  # for chaining

    def get_agent_by_name(self, name: str) -> TinyPerson:
        """
        Returns the agent with the specified name. If no agent with that name exists in the environment,
        returns None.

        Args:
            name (str): The name of the agent to return.

        Returns:
            TinyPerson: The agent with the specified name, or None.
        """
        return self.name_to_agent.get(name)

    #######################################################################
    # Intervention management methods
    #######################################################################

    def add_intervention(self, intervention):
        """
        Adds an intervention to the environment. Interventions are checked, in insertion order,
        at every simulation step.

        Args:
            intervention: The intervention to add to the environment.
        """
        self._interventions.append(intervention)

    #######################################################################
    # Action handlers
    #
    # Specific actions issued by agents are handled by the environment,
    # because they have effects beyond the agent itself.
    #######################################################################
    @transactional()
    def _handle_actions(self, source: TinyPerson, actions: list):
        """
        Handles the actions issued by the agents.

        Args:
            source (TinyPerson): The agent that issued the actions.
            actions (list): A list of actions issued by the agents. Each action is actually a
                JSON specification, in which only the "type" field is required.
        """
        for action in actions:
            action_type = action["type"]  # this is the only required field
            content = action["content"] if "content" in action else None
            target = action["target"] if "target" in action else None

            logger.debug(f"[{self.name}] Handling action {action_type} from agent {name_or_empty(source)}. Content: {content}, target: {target}.")

            # only some actions require the environment to intervene
            if action_type == "REACH_OUT":
                self._handle_reach_out(source, content, target)
            elif action_type == "TALK":
                self._handle_talk(source, content, target)

    @transactional()
    def _handle_reach_out(self, source_agent: TinyPerson, content: str, target: str):
        """
        Handles the REACH_OUT action. This default implementation always allows REACH_OUT to succeed.
        Subclasses might override this method to implement different policies.

        Args:
            source_agent (TinyPerson): The agent that issued the REACH_OUT action.
            content (str): The content of the message (not used by this default implementation).
            target (str): The name of the target agent.
        """

        # This default implementation always allows REACH_OUT to succeed.
        target_agent = self.get_agent_by_name(target)

        if target_agent is not None:
            source_agent.make_agent_accessible(target_agent)
            target_agent.make_agent_accessible(source_agent)

            source_agent.socialize(f"{name_or_empty(target_agent)} was successfully reached out, and is now available for interaction.", source=self)
            target_agent.socialize(f"{name_or_empty(source_agent)} reached out to you, and is now available for interaction.", source=self)

        else:
            logger.debug(f"[{self.name}] REACH_OUT action failed: target agent '{target}' not found.")

    @transactional()
    def _handle_talk(self, source_agent: TinyPerson, content: str, target: str):
        """
        Handles the TALK action by delivering the specified content to the specified target.
        If the target is not found, the content is broadcast when broadcast_if_no_target is set;
        otherwise it is dropped.

        Args:
            source_agent (TinyPerson): The agent that issued the TALK action.
            content (str): The content of the message.
            target (str, optional): The name of the target agent.
        """
        target_agent = self.get_agent_by_name(target)

        logger.debug(f"[{self.name}] Delivering message from {name_or_empty(source_agent)} to {name_or_empty(target_agent)}.")

        if target_agent is not None:
            target_agent.listen(content, source=source_agent)
        elif self.broadcast_if_no_target:
            self.broadcast(content, source=source_agent)

    #######################################################################
    # Interaction methods
    #######################################################################
    @transactional()
    def broadcast(self, speech: str, source: AgentOrWorld=None):
        """
        Delivers a speech to all agents in the environment, except the source itself.

        Args:
            speech (str): The content of the message.
            source (AgentOrWorld, optional): The agent or environment that issued the message. Defaults to None.
        """
        logger.debug(f"[{self.name}] Broadcasting message: '{speech}'.")

        for agent in self.agents:
            # do not deliver the message to the source
            if agent != source:
                agent.listen(speech, source=source)

    @transactional()
    def broadcast_thought(self, thought: str, source: AgentOrWorld=None):
        """
        Broadcasts a thought to all agents in the environment.

        Args:
            thought (str): The content of the thought.
            source (AgentOrWorld, optional): Currently not used; every agent receives the thought.
        """
        logger.debug(f"[{self.name}] Broadcasting thought: '{thought}'.")

        for agent in self.agents:
            agent.think(thought)

    @transactional()
    def broadcast_internal_goal(self, internal_goal: str):
        """
        Broadcasts an internal goal to all agents in the environment.

        Args:
            internal_goal (str): The content of the internal goal.
        """
        logger.debug(f"[{self.name}] Broadcasting internal goal: '{internal_goal}'.")

        for agent in self.agents:
            agent.internalize_goal(internal_goal)

    @transactional()
    def broadcast_context_change(self, context:list):
        """
        Broadcasts a context change to all agents in the environment.

        Args:
            context (list): The content of the context change.
        """
        logger.debug(f"[{self.name}] Broadcasting context change: '{context}'.")

        for agent in self.agents:
            agent.change_context(context)

    def make_everyone_accessible(self):
        """
        Makes all agents in the environment accessible to each other.
        """
        for agent_1 in self.agents:
            for agent_2 in self.agents:
                if agent_1 != agent_2:
                    agent_1.make_agent_accessible(agent_2)

    ###########################################################
    # Formatting conveniences
    ###########################################################

    # TODO better names for these "display" methods
    def _display_step_communication(self, cur_step, total_steps, timedelta_per_step=None):
        """
        Displays the header of the current step and stores it in a buffer for later use.
        """
        rendering = self._pretty_step(cur_step=cur_step, total_steps=total_steps, timedelta_per_step=timedelta_per_step)

        self._push_and_display_latest_communication({"kind": 'step', "rendering": rendering, "content": None, "source": None, "target": None})

    def _display_intervention_communication(self, intervention):
        """
        Displays the current intervention communication and stores it in a buffer for later use.
        """
        rendering = self._pretty_intervention(intervention)
        self._push_and_display_latest_communication({"kind": 'intervention', "rendering": rendering, "content": None, "source": None, "target": None})

    @staticmethod
    def _communication_content_and_type(communication):
        """
        Extracts the (content, type) pair of a communication, according to its kind.

        For 'action' and 'stimulus' kinds the pair comes from the single action/stimulus; for the
        'stimuli' kind it comes from the first stimulus. Any other kind yields (None, None).
        """
        kind = communication["kind"]
        if kind == 'action':
            payload = communication["content"]["action"]
        elif kind == 'stimulus':
            payload = communication["content"]["stimulus"]
        elif kind == 'stimuli':
            payload = communication["content"]["stimuli"][0]
        else:
            return None, None

        return payload["content"], payload["type"]

    def _push_and_display_latest_communication(self, communication):
        """
        Pushes the given communication to the environment's buffer of displayed communications
        and displays it.

        When the communication merely repeats the previous one for a different target, its
        rendering is replaced by a compact "additional target" line. Once the configured maximum
        of additional targets is reached, a single "others" line is shown and further repetitions
        are not rendered at all.

        Args:
            communication (dict): A dict with the keys "kind", "rendering", "content", "source" and
                "target". Its "rendering" entry may be overwritten in place.
        """
        #
        # check if the communication is just repeating the last one for a different target
        #
        if len(self._displayed_communications_buffer) > 0:
            # get values from last communication
            last_communication = self._displayed_communications_buffer[-1]
            last_kind = last_communication["kind"]
            last_source = last_communication["source"]
            last_content, last_type = self._communication_content_and_type(last_communication)

            # get values from current communication
            current_kind = communication["kind"]
            current_target = communication["target"]
            current_source = communication["source"]
            current_content, current_type = self._communication_content_and_type(communication)

            # if we are repeating the last communication, let's simplify the rendering
            if (last_source == current_source) and (last_type == current_type) and (last_kind == current_kind) and \
               (last_content is not None) and (last_content == current_content) and \
               (current_target is not None):

                self._target_display_communications_buffer.append(current_target)

                rich_style = utils.RichTextStyle.get_style_for(last_kind, last_type)

                # print the additional target a limited number of times if a max is set, or
                # always if no max is set.
                if (self._max_additional_targets_to_display is None) or\
                   len(self._target_display_communications_buffer) < self._max_additional_targets_to_display:
                    communication["rendering"] = " " * len(last_source) + f"[{rich_style}] + --> [underline]{current_target}[/][/]"

                elif len(self._target_display_communications_buffer) == self._max_additional_targets_to_display:
                    communication["rendering"] = " " * len(last_source) + f"[{rich_style}] + --> ...others...[/]"

                else:  # don't display anything anymore
                    communication["rendering"] = None

            else:
                # no repetition, so just display the communication and reset the targets buffer
                self._target_display_communications_buffer = []  # resets

        else:
            # no repetition, so just display the communication and reset the targets buffer
            self._target_display_communications_buffer = []  # resets

        self._displayed_communications_buffer.append(communication)
        self._display(communication)

    def pop_and_display_latest_communications(self):
        """
        Pops the latest communications and displays them.

        Returns:
            list: The communications that were in the buffer, which is left empty.
        """
        communications = self._displayed_communications_buffer
        self._displayed_communications_buffer = []

        for communication in communications:
            self._display(communication)

        return communications

    def _display(self, communication:dict):
        """
        Renders a single communication on the console. Communications whose rendering is None
        are skipped; 'step' communications are drawn as a horizontal rule.
        """
        # unpack the rendering to find more info
        content = communication["rendering"]
        kind = communication["kind"]

        if content is not None:
            # render as appropriate
            if kind == 'step':
                self.console.rule(content)
            else:
                self.console.print(content)

    def clear_communications_buffer(self):
        """
        Cleans the communications buffer.
        """
        self._displayed_communications_buffer = []

    def __repr__(self):
        return f"TinyWorld(name='{self.name}')"

    def _pretty_step(self, cur_step, total_steps, timedelta_per_step=None):
        """
        Builds the header text of a step. The current datetime is included only when
        a timedelta per step is given.
        """
        rendering = f"{self.name} step {cur_step} of {total_steps}"
        if timedelta_per_step is not None:
            rendering += f" ({pretty_datetime(self.current_datetime)})"

        return rendering

    def _pretty_intervention(self, intervention):
        """
        Builds the rich-markup text announcing that an intervention was triggered, followed by
        the (wrapped and indented) justification of its precondition.
        """
        indent = " > "
        justification = textwrap.fill(
            intervention.precondition_justification(),
            width=TinyPerson.PP_TEXT_WIDTH,
            initial_indent=indent,
            subsequent_indent=indent,
        )

        rich_style = utils.RichTextStyle.get_style_for("intervention")
        rendering = f"[{rich_style}] :zap: [bold] <<{intervention.name}>> Triggered, effects are being applied...[/] \n" + \
                    f"[italic]{justification}[/][/]"
        # TODO add details about why the intervention was applied

        return rendering

    def pp_current_interactions(self, simplified=True, skip_system=True):
        """
        Pretty prints the current messages from agents in this environment.
        """
        print(self.pretty_current_interactions(simplified=simplified, skip_system=skip_system))

    def pretty_current_interactions(self, simplified=True, skip_system=True, max_content_length=default["max_content_display_length"], first_n=None, last_n=None, include_omission_info:bool=True):
        """
        Returns a pretty, readable, string with the current messages of agents in this environment.
        There is one section per agent; the formatting options are forwarded to each agent's own
        pretty_current_interactions.
        """
        agent_contents = []

        for agent in self.agents:
            agent_content = f"#### Interactions from the point of view of {agent.name} agent:\n"
            agent_content += f"**BEGIN AGENT {agent.name} HISTORY.**\n "
            agent_content += agent.pretty_current_interactions(simplified=simplified, skip_system=skip_system, max_content_length=max_content_length, first_n=first_n, last_n=last_n, include_omission_info=include_omission_info) + "\n"
            agent_content += f"**FINISHED AGENT {agent.name} HISTORY.**\n\n"
            agent_contents.append(agent_content)

        return "\n".join(agent_contents)

    #######################################################################
    # IO
    #######################################################################

    def encode_complete_state(self) -> dict:
        """
        Encodes the complete state of the environment in a dictionary.

        Agents are encoded through their own encode_complete_state, and the datetime as an ISO
        string (or None when the environment has no explicit datetime). The console and the
        interventions are not encoded.

        Returns:
            dict: A dictionary encoding the complete state of the environment.
        """
        to_copy = copy.copy(self.__dict__)

        # remove the fields that cannot be deep-copied as-is or that are encoded separately
        del to_copy['console']
        del to_copy['agents']
        del to_copy['name_to_agent']
        del to_copy['current_datetime']
        del to_copy['_interventions']  # TODO: encode interventions

        state = copy.deepcopy(to_copy)

        # agents are encoded separately
        state["agents"] = [agent.encode_complete_state() for agent in self.agents]

        # datetime also has to be encoded separately; it may legitimately be None
        if self.current_datetime is not None:
            state["current_datetime"] = self.current_datetime.isoformat()
        else:
            state["current_datetime"] = None

        return state

    def decode_complete_state(self, state:dict):
        """
        Decodes the complete state of the environment from a dictionary.

        Args:
            state (dict): A dictionary encoding the complete state of the environment.
                It is deep-copied first, so the caller's dictionary is not modified.

        Returns:
            Self: The environment decoded from the dictionary.

        Raises:
            ValueError: If an agent listed in the state cannot be found or decoded.
        """
        state = copy.deepcopy(state)

        #################################
        # restore agents in-place
        #################################
        self.remove_all_agents()
        for agent_state in state["agents"]:
            try:
                try:
                    agent = TinyPerson.get_agent_by_name(agent_state["name"])
                except Exception as e:
                    raise ValueError(f"Could not find agent {agent_state['name']} for environment {self.name}.") from e

                # in case the lookup reports a miss by returning None rather than raising
                if agent is None:
                    raise ValueError(f"Could not find agent {agent_state['name']} for environment {self.name}.")

                agent.decode_complete_state(agent_state)
                self.add_agent(agent)

            except Exception as e:
                raise ValueError(f"Could not decode agent {agent_state['name']} for environment {self.name}.") from e

        # remove the agent states to update the rest of the environment
        del state["agents"]

        # restore datetime (None means the environment has no explicit datetime)
        if state["current_datetime"] is not None:
            state["current_datetime"] = datetime.fromisoformat(state["current_datetime"])

        # restore other fields
        self.__dict__.update(state)

        return self

    @staticmethod
    def add_environment(environment):
        """
        Adds an environment to the list of all environments. Environment names must be unique,
        so if an environment with the same name already exists, an error is raised.

        Raises:
            ValueError: If an environment with the same name is already registered.
        """
        if environment.name in TinyWorld.all_environments:
            raise ValueError(f"Environment names must be unique, but '{environment.name}' is already defined.")
        else:
            TinyWorld.all_environments[environment.name] = environment

    @staticmethod
    def set_simulation_for_free_environments(simulation):
        """
        Sets the simulation if it is None. This allows free environments to be captured by specific simulation scopes
        if desired.
        """
        for environment in TinyWorld.all_environments.values():
            if environment.simulation_id is None:
                simulation.add_environment(environment)

    @staticmethod
    def get_environment_by_name(name: str):
        """
        Returns the environment with the specified name. If no environment with that name exists,
        returns None.

        Args:
            name (str): The name of the environment to return.

        Returns:
            TinyWorld: The environment with the specified name, or None.
        """
        return TinyWorld.all_environments.get(name)

    @staticmethod
    def clear_environments():
        """
        Clears the list of all environments.
        """
        TinyWorld.all_environments = {}