Coverage for tinytroupe / environment / tiny_world.py: 0%

342 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-28 17:48 +0000

1from tinytroupe.environment import logger, default 

2 

3import copy 

4from datetime import datetime, timedelta 

5import textwrap 

6import random 

7import concurrent.futures 

8 

9from tinytroupe.agent import * 

10from tinytroupe.utils import name_or_empty, pretty_datetime 

11import tinytroupe.control as control 

12from tinytroupe.control import transactional 

13from tinytroupe import utils 

14from tinytroupe import config_manager 

15 

16from rich.console import Console 

17 

18from typing import Any, TypeVar, Union 

19AgentOrWorld = Union["TinyPerson", "TinyWorld"] 

20 

class TinyWorld:
    """
    Base class for environments.
    """

    # Process-wide registry of all environments created so far, keyed by
    # environment name (see add_environment / get_environment_by_name).
    all_environments = {} # name -> environment

    # Whether to display environments communications or not, for all environments.
    communication_display = True

31 

32 def __init__(self, name: str=None, agents=[], 

33 initial_datetime=datetime.now(), 

34 interventions=[], 

35 broadcast_if_no_target=True, 

36 max_additional_targets_to_display=3): 

37 """ 

38 Initializes an environment. 

39 

40 Args: 

41 name (str): The name of the environment. 

42 agents (list): A list of agents to add to the environment. 

43 initial_datetifme (datetime): The initial datetime of the environment, or None (i.e., explicit time is optional).  

44 Defaults to the current datetime in the real world. 

45 interventions (list): A list of interventions to apply in the environment at each simulation step. 

46 broadcast_if_no_target (bool): If True, broadcast actions if the target of an action is not found. 

47 max_additional_targets_to_display (int): The maximum number of additional targets to display in a communication. If None,  

48 all additional targets are displayed. 

49 """ 

50 

51 if name is not None: 

52 self.name = name 

53 else: 

54 self.name = f"TinyWorld {utils.fresh_id(self.__class__.__name__)}" 

55 

56 self.current_datetime = initial_datetime 

57 self.broadcast_if_no_target = broadcast_if_no_target 

58 self.simulation_id = None # will be reset later if the agent is used within a specific simulation scope 

59 

60 self.agents = [] 

61 self.name_to_agent = {} # {agent_name: agent, agent_name_2: agent_2, ...} 

62 

63 self._interventions = interventions 

64 

65 # the buffer of communications that have been displayed so far, used for 

66 # saving these communications to another output form later (e.g., caching) 

67 self._displayed_communications_buffer = [] 

68 

69 # a temporary buffer for communications target to make rendering easier 

70 self._target_display_communications_buffer = [] 

71 self._max_additional_targets_to_display = max_additional_targets_to_display 

72 

73 self.console = Console() 

74 

75 # add the environment to the list of all environments 

76 TinyWorld.add_environment(self) 

77 

78 self.add_agents(agents) 

79 

80 ####################################################################### 

81 # Simulation control methods 

82 ####################################################################### 

83 @transactional() 

84 def _step(self, 

85 timedelta_per_step=None, 

86 randomize_agents_order=True, 

87 parallelize=True): # TODO have a configuration for parallelism? 

88 """ 

89 Performs a single step in the environment. This default implementation 

90 simply calls makes all agents in the environment act and properly 

91 handle the resulting actions. Subclasses might override this method to implement  

92 different policies. 

93 """ 

94 

95 # Increase current datetime if timedelta is given. This must happen before 

96 # any other simulation updates, to make sure that the agents are acting 

97 # in the correct time, particularly if only one step is being run. 

98 self._advance_datetime(timedelta_per_step) 

99 

100 # Apply interventions.  

101 #  

102 # Why not in parallel? Owing to the very general nature of their potential effects, 

103 # interventions are never parallelized, since that could introduce unforeseen race conditions. 

104 for intervention in self._interventions: 

105 should_apply_intervention = intervention.check_precondition() 

106 if should_apply_intervention: 

107 if TinyWorld.communication_display: 

108 self._display_intervention_communication(intervention) 

109 intervention.apply_effect() 

110 

111 logger.debug(f"[{self.name}] Intervention '{intervention.name}' was applied.") 

112 

113 # Agents can act in parallel or sequentially 

114 if parallelize: 

115 agents_actions = self._step_in_parallel(timedelta_per_step=timedelta_per_step) 

116 else: 

117 agents_actions = self._step_sequentially(timedelta_per_step=timedelta_per_step, 

118 randomize_agents_order=randomize_agents_order) 

119 

120 return agents_actions 

121 

122 def _step_sequentially(self, timedelta_per_step=None, randomize_agents_order=True): 

123 """ 

124 The sequential version of the _step method to request agents to act.  

125 """ 

126 

127 # agents can act in a random order 

128 reordered_agents = copy.copy(self.agents) 

129 if randomize_agents_order: 

130 random.shuffle(reordered_agents) 

131 

132 # agents can act 

133 agents_actions = {} 

134 for agent in reordered_agents: 

135 logger.debug(f"[{self.name}] Agent {name_or_empty(agent)} is acting.") 

136 actions = agent.act(return_actions=True) 

137 agents_actions[agent.name] = actions 

138 

139 self._handle_actions(agent, agent.pop_latest_actions()) 

140 

141 return agents_actions 

142 

143 def _step_in_parallel(self, timedelta_per_step=None): 

144 """ 

145 A parallelized version of the _step method to request agents to act. 

146 """ 

147 

148 with concurrent.futures.ThreadPoolExecutor() as executor: 

149 futures = {executor.submit(agent.act, return_actions=True): agent for agent in self.agents} 

150 agents_actions = {} 

151 

152 # Wait for all futures to complete 

153 concurrent.futures.wait(futures.keys()) 

154 

155 for future in futures: 

156 agent = futures[future] 

157 try: 

158 actions = future.result() 

159 agents_actions[agent.name] = actions 

160 self._handle_actions(agent, agent.pop_latest_actions()) 

161 except Exception as exc: 

162 logger.error(f"[{self.name}] Agent {name_or_empty(agent)} generated an exception: {exc}") 

163 

164 return agents_actions 

165 

166 

167 

168 def _advance_datetime(self, timedelta): 

169 """ 

170 Advances the current datetime of the environment by the specified timedelta. 

171 

172 Args: 

173 timedelta (timedelta): The timedelta to advance the current datetime by. 

174 """ 

175 if timedelta is not None: 

176 self.current_datetime += timedelta 

177 else: 

178 logger.info(f"[{self.name}] No timedelta provided, so the datetime was not advanced.") 

179 

180 @transactional() 

181 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

182 def run(self, steps: int, timedelta_per_step=None, return_actions=False, randomize_agents_order=True, parallelize=None): 

183 """ 

184 Runs the environment for a given number of steps. 

185 

186 Args: 

187 steps (int): The number of steps to run the environment for. 

188 timedelta_per_step (timedelta, optional): The time interval between steps. Defaults to None. 

189 return_actions (bool, optional): If True, returns the actions taken by the agents. Defaults to False. 

190 randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True. 

191 parallelize (bool, optional): If True, agents act in parallel. Defaults to True. 

192  

193 Returns: 

194 list: A list of actions taken by the agents over time, if return_actions is True. The list has this format: 

195 [{agent_name: [action_1, action_2, ...]}, {agent_name_2: [action_1, action_2, ...]}, ...] 

196 """ 

197 agents_actions_over_time = [] 

198 for i in range(steps): 

199 logger.info(f"[{self.name}] Running world simulation step {i+1} of {steps}.") 

200 

201 if TinyWorld.communication_display: 

202 self._display_step_communication(cur_step=i+1, total_steps=steps, timedelta_per_step=timedelta_per_step) 

203 

204 agents_actions = self._step(timedelta_per_step=timedelta_per_step, randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

205 agents_actions_over_time.append(agents_actions) 

206 

207 if return_actions: 

208 return agents_actions_over_time 

209 

210 @transactional() 

211 def skip(self, steps: int, timedelta_per_step=None): 

212 """ 

213 Skips a given number of steps in the environment. That is to say, time shall pass, but no actions will be taken 

214 by the agents or any other entity in the environment. 

215 

216 Args: 

217 steps (int): The number of steps to skip. 

218 timedelta_per_step (timedelta, optional): The time interval between steps. Defaults to None. 

219 """ 

220 self._advance_datetime(steps * timedelta_per_step) 

221 

222 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

223 def run_minutes(self, minutes: int, randomize_agents_order=True, parallelize=None): 

224 """ 

225 Runs the environment for a given number of minutes. 

226 

227 Args: 

228 minutes (int): The number of minutes to run the environment for. 

229 """ 

230 self.run(steps=minutes, timedelta_per_step=timedelta(minutes=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

231 

232 def skip_minutes(self, minutes: int): 

233 """ 

234 Skips a given number of minutes in the environment. 

235 

236 Args: 

237 minutes (int): The number of minutes to skip. 

238 """ 

239 self.skip(steps=minutes, timedelta_per_step=timedelta(minutes=1)) 

240 

241 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

242 def run_hours(self, hours: int, randomize_agents_order=True, parallelize=None): 

243 """ 

244 Runs the environment for a given number of hours. 

245 

246 Args: 

247 hours (int): The number of hours to run the environment for. 

248 """ 

249 self.run(steps=hours, timedelta_per_step=timedelta(hours=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

250 

251 def skip_hours(self, hours: int): 

252 """ 

253 Skips a given number of hours in the environment. 

254 

255 Args: 

256 hours (int): The number of hours to skip. 

257 """ 

258 self.skip(steps=hours, timedelta_per_step=timedelta(hours=1)) 

259 

260 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

261 def run_days(self, days: int, randomize_agents_order=True, parallelize=None): 

262 """ 

263 Runs the environment for a given number of days. 

264 

265 Args: 

266 days (int): The number of days to run the environment for. 

267 """ 

268 self.run(steps=days, timedelta_per_step=timedelta(days=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

269 

270 def skip_days(self, days: int): 

271 """ 

272 Skips a given number of days in the environment. 

273 

274 Args: 

275 days (int): The number of days to skip. 

276 """ 

277 self.skip(steps=days, timedelta_per_step=timedelta(days=1)) 

278 

279 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

280 def run_weeks(self, weeks: int, randomize_agents_order=True, parallelize=None): 

281 """ 

282 Runs the environment for a given number of weeks. 

283 

284 Args: 

285 weeks (int): The number of weeks to run the environment for. 

286 randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True. 

287 """ 

288 self.run(steps=weeks, timedelta_per_step=timedelta(weeks=1), randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

289 

290 def skip_weeks(self, weeks: int): 

291 """ 

292 Skips a given number of weeks in the environment. 

293 

294 Args: 

295 weeks (int): The number of weeks to skip. 

296 """ 

297 self.skip(steps=weeks, timedelta_per_step=timedelta(weeks=1)) 

298 

299 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

300 def run_months(self, months: int, randomize_agents_order=True, parallelize=None): 

301 """ 

302 Runs the environment for a given number of months. 

303 

304 Args: 

305 months (int): The number of months to run the environment for. 

306 randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True. 

307 """ 

308 self.run(steps=months, timedelta_per_step=timedelta(weeks=4), randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

309 

310 def skip_months(self, months: int): 

311 """ 

312 Skips a given number of months in the environment. 

313 

314 Args: 

315 months (int): The number of months to skip. 

316 """ 

317 self.skip(steps=months, timedelta_per_step=timedelta(weeks=4)) 

318 

319 @config_manager.config_defaults(parallelize="parallel_agent_actions") 

320 def run_years(self, years: int, randomize_agents_order=True, parallelize=None): 

321 """ 

322 Runs the environment for a given number of years. 

323 

324 Args: 

325 years (int): The number of years to run the environment for. 

326 randomize_agents_order (bool, optional): If True, randomizes the order in which agents act. Defaults to True. 

327 """ 

328 self.run(steps=years, timedelta_per_step=timedelta(days=365), randomize_agents_order=randomize_agents_order, parallelize=parallelize) 

329 

330 def skip_years(self, years: int): 

331 """ 

332 Skips a given number of years in the environment. 

333 

334 Args: 

335 years (int): The number of years to skip. 

336 """ 

337 self.skip(steps=years, timedelta_per_step=timedelta(days=365)) 

338 

339 ####################################################################### 

340 # Agent management methods 

341 ####################################################################### 

342 def add_agents(self, agents: list): 

343 """ 

344 Adds a list of agents to the environment. 

345 

346 Args: 

347 agents (list): A list of agents to add to the environment. 

348 """ 

349 for agent in agents: 

350 self.add_agent(agent) 

351 

352 return self # for chaining 

353 

354 def add_agent(self, agent: TinyPerson): 

355 """ 

356 Adds an agent to the environment. The agent must have a unique name within the environment. 

357 

358 Args: 

359 agent (TinyPerson): The agent to add to the environment. 

360  

361 Raises: 

362 ValueError: If the agent name is not unique within the environment. 

363 """ 

364 

365 # check if the agent is not already in the environment 

366 if agent not in self.agents: 

367 logger.debug(f"Adding agent {agent.name} to the environment.") 

368 

369 # Agent names must be unique in the environment.  

370 # Check if the agent name is already there. 

371 if agent.name not in self.name_to_agent: 

372 agent.environment = self 

373 self.agents.append(agent) 

374 self.name_to_agent[agent.name] = agent 

375 else: 

376 raise ValueError(f"Agent names must be unique, but '{agent.name}' is already in the environment.") 

377 else: 

378 logger.warn(f"Agent {agent.name} is already in the environment.") 

379 

380 return self # for chaining 

381 

382 def remove_agent(self, agent: TinyPerson): 

383 """ 

384 Removes an agent from the environment. 

385 

386 Args: 

387 agent (TinyPerson): The agent to remove from the environment. 

388 """ 

389 logger.debug(f"Removing agent {agent.name} from the environment.") 

390 self.agents.remove(agent) 

391 del self.name_to_agent[agent.name] 

392 

393 return self # for chaining 

394 

395 def remove_all_agents(self): 

396 """ 

397 Removes all agents from the environment. 

398 """ 

399 logger.debug(f"Removing all agents from the environment.") 

400 self.agents = [] 

401 self.name_to_agent = {} 

402 

403 return self # for chaining 

404 

405 def get_agent_by_name(self, name: str) -> TinyPerson: 

406 """ 

407 Returns the agent with the specified name. If no agent with that name exists in the environment,  

408 returns None. 

409 

410 Args: 

411 name (str): The name of the agent to return. 

412 

413 Returns: 

414 TinyPerson: The agent with the specified name. 

415 """ 

416 if name in self.name_to_agent: 

417 return self.name_to_agent[name] 

418 else: 

419 return None 

420 

421 ####################################################################### 

422 # Intervention management methods 

423 ####################################################################### 

424 

425 def add_intervention(self, intervention): 

426 """ 

427 Adds an intervention to the environment. 

428 

429 Args: 

430 intervention: The intervention to add to the environment. 

431 """ 

432 self._interventions.append(intervention) 

433 

434 ####################################################################### 

435 # Action handlers 

436 # 

437 # Specific actions issued by agents are handled by the environment, 

438 # because they have effects beyond the agent itself. 

439 ####################################################################### 

440 @transactional() 

441 def _handle_actions(self, source: TinyPerson, actions: list): 

442 """  

443 Handles the actions issued by the agents. 

444 

445 Args: 

446 source (TinyPerson): The agent that issued the actions. 

447 actions (list): A list of actions issued by the agents. Each action is actually a 

448 JSON specification. 

449  

450 """ 

451 for action in actions: 

452 action_type = action["type"] # this is the only required field 

453 content = action["content"] if "content" in action else None 

454 target = action["target"] if "target" in action else None 

455 

456 logger.debug(f"[{self.name}] Handling action {action_type} from agent {name_or_empty(source)}. Content: {content}, target: {target}.") 

457 

458 # only some actions require the enviroment to intervene 

459 if action_type == "REACH_OUT": 

460 self._handle_reach_out(source, content, target) 

461 elif action_type == "TALK": 

462 self._handle_talk(source, content, target) 

463 

464 @transactional() 

465 def _handle_reach_out(self, source_agent: TinyPerson, content: str, target: str): 

466 """ 

467 Handles the REACH_OUT action. This default implementation always allows REACH_OUT to succeed. 

468 Subclasses might override this method to implement different policies. 

469 

470 Args: 

471 source_agent (TinyPerson): The agent that issued the REACH_OUT action. 

472 content (str): The content of the message. 

473 target (str): The target of the message. 

474 """ 

475 

476 # This default implementation always allows REACH_OUT to suceed. 

477 target_agent = self.get_agent_by_name(target) 

478 

479 if target_agent is not None: 

480 source_agent.make_agent_accessible(target_agent) 

481 target_agent.make_agent_accessible(source_agent) 

482 

483 source_agent.socialize(f"{name_or_empty(target_agent)} was successfully reached out, and is now available for interaction.", source=self) 

484 target_agent.socialize(f"{name_or_empty(source_agent)} reached out to you, and is now available for interaction.", source=self) 

485 

486 else: 

487 logger.debug(f"[{self.name}] REACH_OUT action failed: target agent '{target}' not found.") 

488 

489 @transactional() 

490 def _handle_talk(self, source_agent: TinyPerson, content: str, target: str): 

491 """ 

492 Handles the TALK action by delivering the specified content to the specified target. 

493 

494 Args: 

495 source_agent (TinyPerson): The agent that issued the TALK action. 

496 content (str): The content of the message. 

497 target (str, optional): The target of the message. 

498 """ 

499 target_agent = self.get_agent_by_name(target) 

500 

501 logger.debug(f"[{self.name}] Delivering message from {name_or_empty(source_agent)} to {name_or_empty(target_agent)}.") 

502 

503 if target_agent is not None: 

504 target_agent.listen(content, source=source_agent) 

505 elif self.broadcast_if_no_target: 

506 self.broadcast(content, source=source_agent) 

507 

508 ####################################################################### 

509 # Interaction methods 

510 ####################################################################### 

511 @transactional() 

512 def broadcast(self, speech: str, source: AgentOrWorld=None): 

513 """ 

514 Delivers a speech to all agents in the environment. 

515 

516 Args: 

517 speech (str): The content of the message. 

518 source (AgentOrWorld, optional): The agent or environment that issued the message. Defaults to None. 

519 """ 

520 logger.debug(f"[{self.name}] Broadcasting message: '{speech}'.") 

521 

522 for agent in self.agents: 

523 # do not deliver the message to the source 

524 if agent != source: 

525 agent.listen(speech, source=source) 

526 

527 @transactional() 

528 def broadcast_thought(self, thought: str, source: AgentOrWorld=None): 

529 """ 

530 Broadcasts a thought to all agents in the environment. 

531 

532 Args: 

533 thought (str): The content of the thought. 

534 """ 

535 logger.debug(f"[{self.name}] Broadcasting thought: '{thought}'.") 

536 

537 for agent in self.agents: 

538 agent.think(thought) 

539 

540 @transactional() 

541 def broadcast_internal_goal(self, internal_goal: str): 

542 """ 

543 Broadcasts an internal goal to all agents in the environment. 

544 

545 Args: 

546 internal_goal (str): The content of the internal goal. 

547 """ 

548 logger.debug(f"[{self.name}] Broadcasting internal goal: '{internal_goal}'.") 

549 

550 for agent in self.agents: 

551 agent.internalize_goal(internal_goal) 

552 

553 @transactional() 

554 def broadcast_context_change(self, context:list): 

555 """ 

556 Broadcasts a context change to all agents in the environment. 

557 

558 Args: 

559 context (list): The content of the context change. 

560 """ 

561 logger.debug(f"[{self.name}] Broadcasting context change: '{context}'.") 

562 

563 for agent in self.agents: 

564 agent.change_context(context) 

565 

566 def make_everyone_accessible(self): 

567 """ 

568 Makes all agents in the environment accessible to each other. 

569 """ 

570 for agent_1 in self.agents: 

571 for agent_2 in self.agents: 

572 if agent_1 != agent_2: 

573 agent_1.make_agent_accessible(agent_2) 

574 

575 

576 ########################################################### 

577 # Formatting conveniences 

578 ########################################################### 

579 

580 # TODO better names for these "display" methods 

581 def _display_step_communication(self, cur_step, total_steps, timedelta_per_step=None): 

582 """ 

583 Displays the current communication and stores it in a buffer for later use. 

584 """ 

585 rendering = self._pretty_step(cur_step=cur_step, total_steps=total_steps, timedelta_per_step=timedelta_per_step) 

586 

587 self._push_and_display_latest_communication({"kind": 'step', "rendering": rendering, "content": None, "source": None, "target": None}) 

588 

589 def _display_intervention_communication(self, intervention): 

590 """ 

591 Displays the current intervention communication and stores it in a buffer for later use. 

592 """ 

593 rendering = self._pretty_intervention(intervention) 

594 self._push_and_display_latest_communication({"kind": 'intervention', "rendering": rendering, "content": None, "source": None, "target": None}) 

595 

    def _push_and_display_latest_communication(self, communication):
        """
        Pushes the latest communication to the buffer and displays it.

        If the communication repeats the previous one (same source, kind, type and
        content) but with a different target, its rendering is collapsed into a short
        "+ --> target" continuation line instead of a full repeat; after
        _max_additional_targets_to_display such continuations, rendering is suppressed
        entirely. Otherwise the per-target continuation buffer is reset.
        """
        #
        # check if the communication is just repeating the last one for a different target
        #
        if len(self._displayed_communications_buffer) > 0:
            # get values from last communication
            last_communication = self._displayed_communications_buffer[-1]
            last_kind = last_communication["kind"]
            last_target = last_communication["target"]
            last_source = last_communication["source"]
            # extract content/type depending on the communication kind; kinds other
            # than action/stimulus/stimuli (e.g. 'step', 'intervention') carry no
            # comparable content, so they can never be treated as repetitions below.
            if last_kind == 'action':
                last_content = last_communication["content"]["action"]["content"]
                last_type = last_communication["content"]["action"]["type"]
            elif last_kind == 'stimulus':
                last_content = last_communication["content"]["stimulus"]["content"]
                last_type = last_communication["content"]["stimulus"]["type"]
            elif last_kind == 'stimuli':
                # NOTE(review): only the first stimulus in the batch is compared —
                # presumably batches with identical first stimuli are considered repeats.
                last_stimulus = last_communication["content"]["stimuli"][0]
                last_content = last_stimulus["content"]
                last_type = last_stimulus["type"]
            else:
                last_content = None
                last_type = None

            # get values from current communication
            current_kind = communication["kind"]
            current_target = communication["target"]
            current_source = communication["source"]
            if current_kind == 'action':
                current_content = communication["content"]["action"]["content"]
                current_type = communication["content"]["action"]["type"]
            elif current_kind == 'stimulus':
                current_content = communication["content"]["stimulus"]["content"]
                current_type = communication["content"]["stimulus"]["type"]
            elif current_kind == 'stimuli':
                current_stimulus = communication["content"]["stimuli"][0]
                current_content = current_stimulus["content"]
                current_type = current_stimulus["type"]
            else:
                current_content = None
                current_type = None

            # if we are repeating the last communication, let's simplify the rendering
            if (last_source == current_source) and (last_type == current_type) and (last_kind == current_kind) and \
               (last_content is not None) and (last_content == current_content) and \
               (current_target is not None):

                self._target_display_communications_buffer.append(current_target)

                rich_style = utils.RichTextStyle.get_style_for(last_kind, last_type)

                # print the additional target a limited number of times if a max is set, or
                # always if no max is set.
                # NOTE(review): `" " * len(last_source)` assumes last_source is a string
                # when a repetition is detected — TODO confirm against callers.
                if (self._max_additional_targets_to_display is None) or\
                   len(self._target_display_communications_buffer) < self._max_additional_targets_to_display:
                    communication["rendering"] = " " * len(last_source) + f"[{rich_style}] + --> [underline]{current_target}[/][/]"

                elif len(self._target_display_communications_buffer) == self._max_additional_targets_to_display:
                    communication["rendering"] = " " * len(last_source) + f"[{rich_style}] + --> ...others...[/]"

                else: # don't display anything anymore
                    communication["rendering"] = None

            else:
                # no repetition, so just display the communication and reset the targets buffer
                self._target_display_communications_buffer = [] # resets

        else:
            # no repetition, so just display the communication and reset the targets buffer
            self._target_display_communications_buffer = [] # resets

        self._displayed_communications_buffer.append(communication)
        self._display(communication)

675 def pop_and_display_latest_communications(self): 

676 """ 

677 Pops the latest communications and displays them. 

678 """ 

679 communications = self._displayed_communications_buffer 

680 self._displayed_communications_buffer = [] 

681 

682 for communication in communications: 

683 self._display(communication) 

684 

685 return communications 

686 

687 def _display(self, communication:dict): 

688 # unpack the rendering to find more info 

689 content = communication["rendering"] 

690 kind = communication["kind"] 

691 

692 if content is not None: 

693 # render as appropriate 

694 if kind == 'step': 

695 self.console.rule(content) 

696 else: 

697 self.console.print(content) 

698 

699 def clear_communications_buffer(self): 

700 """ 

701 Cleans the communications buffer. 

702 """ 

703 self._displayed_communications_buffer = [] 

704 

705 def __repr__(self): 

706 return f"TinyWorld(name='{self.name}')" 

707 

708 def _pretty_step(self, cur_step, total_steps, timedelta_per_step=None): 

709 rendering = f"{self.name} step {cur_step} of {total_steps}" 

710 if timedelta_per_step is not None: 

711 rendering += f" ({pretty_datetime(self.current_datetime)})" 

712 

713 return rendering 

714 

715 def _pretty_intervention(self, intervention): 

716 indent = " > " 

717 justification = textwrap.fill( 

718 intervention.precondition_justification(), 

719 width=TinyPerson.PP_TEXT_WIDTH, 

720 initial_indent=indent, 

721 subsequent_indent=indent, 

722 ) 

723 

724 rich_style = utils.RichTextStyle.get_style_for("intervention") 

725 rendering = f"[{rich_style}] :zap: [bold] <<{intervention.name}>> Triggered, effects are being applied...[/] \n" + \ 

726 f"[italic]{justification}[/][/]" 

727 # TODO add details about why the intervention was applied 

728 

729 return rendering 

730 

731 def pp_current_interactions(self, simplified=True, skip_system=True): 

732 """ 

733 Pretty prints the current messages from agents in this environment. 

734 """ 

735 print(self.pretty_current_interactions(simplified=simplified, skip_system=skip_system)) 

736 

737 def pretty_current_interactions(self, simplified=True, skip_system=True, max_content_length=default["max_content_display_length"], first_n=None, last_n=None, include_omission_info:bool=True): 

738 """ 

739 Returns a pretty, readable, string with the current messages of agents in this environment. 

740 """ 

741 agent_contents = [] 

742 

743 for agent in self.agents: 

744 agent_content = f"#### Interactions from the point of view of {agent.name} agent:\n" 

745 agent_content += f"**BEGIN AGENT {agent.name} HISTORY.**\n " 

746 agent_content += agent.pretty_current_interactions(simplified=simplified, skip_system=skip_system, max_content_length=max_content_length, first_n=first_n, last_n=last_n, include_omission_info=include_omission_info) + "\n" 

747 agent_content += f"**FINISHED AGENT {agent.name} HISTORY.**\n\n" 

748 agent_contents.append(agent_content) 

749 

750 return "\n".join(agent_contents) 

751 

752 ####################################################################### 

753 # IO 

754 ####################################################################### 

755 

756 def encode_complete_state(self) -> dict: 

757 """ 

758 Encodes the complete state of the environment in a dictionary. 

759 

760 Returns: 

761 dict: A dictionary encoding the complete state of the environment. 

762 """ 

763 to_copy = copy.copy(self.__dict__) 

764 

765 # remove the logger and other fields 

766 del to_copy['console'] 

767 del to_copy['agents'] 

768 del to_copy['name_to_agent'] 

769 del to_copy['current_datetime'] 

770 del to_copy['_interventions'] # TODO: encode interventions 

771 

772 state = copy.deepcopy(to_copy) 

773 

774 # agents are encoded separately 

775 state["agents"] = [agent.encode_complete_state() for agent in self.agents] 

776 

777 # datetime also has to be encoded separately 

778 state["current_datetime"] = self.current_datetime.isoformat() 

779 

780 return state 

781 

    def decode_complete_state(self, state:dict):
        """
        Decodes the complete state of the environment from a dictionary, restoring
        agents in place (agents must already exist in the global agent registry),
        then the datetime, then all remaining attributes.

        Args:
            state (dict): A dictionary encoding the complete state of the environment.

        Returns:
            Self: The environment decoded from the dictionary.

        Raises:
            ValueError: If an agent named in the state cannot be found or decoded.
        """
        # deep-copy so that mutations below never alter the caller's state dict
        state = copy.deepcopy(state)

        #################################
        # restore agents in-place
        #################################
        self.remove_all_agents()
        for agent_state in state["agents"]:
            try:
                try:
                    # NOTE(review): if TinyPerson.get_agent_by_name returns None instead
                    # of raising for a missing agent, this inner except will not fire and
                    # the failure surfaces later as the outer "Could not decode" error —
                    # TODO confirm against TinyPerson's implementation.
                    agent = TinyPerson.get_agent_by_name(agent_state["name"])
                except Exception as e:
                    raise ValueError(f"Could not find agent {agent_state['name']} for environment {self.name}.") from e

                agent.decode_complete_state(agent_state)
                self.add_agent(agent)

            except Exception as e:
                raise ValueError(f"Could not decode agent {agent_state['name']} for environment {self.name}.") from e

        # remove the agent states to update the rest of the environment
        del state["agents"]

        # restore datetime
        state["current_datetime"] = datetime.fromisoformat(state["current_datetime"])

        # restore other fields
        self.__dict__.update(state)

        return self

821 

822 @staticmethod 

823 def add_environment(environment): 

824 """ 

825 Adds an environment to the list of all environments. Environment names must be unique, 

826 so if an environment with the same name already exists, an error is raised. 

827 """ 

828 if environment.name in TinyWorld.all_environments: 

829 raise ValueError(f"Environment names must be unique, but '{environment.name}' is already defined.") 

830 else: 

831 TinyWorld.all_environments[environment.name] = environment 

832 

833 

834 @staticmethod 

835 def set_simulation_for_free_environments(simulation): 

836 """ 

837 Sets the simulation if it is None. This allows free environments to be captured by specific simulation scopes 

838 if desired. 

839 """ 

840 for environment in TinyWorld.all_environments.values(): 

841 if environment.simulation_id is None: 

842 simulation.add_environment(environment) 

843 

844 @staticmethod 

845 def get_environment_by_name(name: str): 

846 """ 

847 Returns the environment with the specified name. If no environment with that name exists,  

848 returns None. 

849 

850 Args: 

851 name (str): The name of the environment to return. 

852 

853 Returns: 

854 TinyWorld: The environment with the specified name. 

855 """ 

856 if name in TinyWorld.all_environments: 

857 return TinyWorld.all_environments[name] 

858 else: 

859 return None 

860 

861 @staticmethod 

862 def clear_environments(): 

863 """ 

864 Clears the list of all environments. 

865 """ 

866 TinyWorld.all_environments = {}