UserSyncUI / docs /api /tinytroupe /control.html
harvesthealth's picture
Upload folder using huggingface_hub
f6686e1 verified
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<meta name="generator" content="pdoc 0.10.0" />
<title>tinytroupe.control API documentation</title>
<meta name="description" content="Simulation controlling mechanisms." />
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css" integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs=" crossorigin>
<link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css" integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg=" crossorigin>
<link rel="stylesheet preload" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/styles/github.min.css" crossorigin>
<style>:root{--highlight-color:#fe9}.flex{display:flex !important}body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}h1:target,h2:target,h3:target,h4:target,h5:target,h6:target{background:var(--highlight-color);padding:.2em 0}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}dt:target .name{background:var(--highlight-color)}.name > 
span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}td{padding:0 .5em}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style>
<style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style>
<style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent !important;color:#000 !important;box-shadow:none !important;text-shadow:none !important}a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% !important}@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style>
<script defer src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/highlight.min.js" integrity="sha256-Uv3H6lx7dJmRfRvH8TH6kJD1TSK1aFcwgx+mdg3epi8=" crossorigin></script>
<script>
// Highlight the code listings once the DOM is ready. highlight.js is loaded from a CDN
// with `defer`, so it has already run by DOMContentLoaded when the download succeeded;
// if it was blocked or rejected, skip highlighting instead of throwing a ReferenceError.
window.addEventListener('DOMContentLoaded', () => {
  if (window.hljs) hljs.initHighlighting();
});
</script>
</head>
<body>
<main>
<article id="content">
<header>
<h1 class="title">Module <code>tinytroupe.control</code></h1>
</header>
<section id="section-intro">
<p>Simulation controlling mechanisms.</p>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">&#34;&#34;&#34;
Simulation controlling mechanisms.
&#34;&#34;&#34;
import json
import os
import tempfile
import threading
import traceback
import tinytroupe
import tinytroupe.utils as utils
import uuid
import logging
logger = logging.getLogger(&#34;tinytroupe&#34;)
# to protect from race conditions when running in parallel
concurrent_execution_lock = threading.Lock()
class Simulation:
STATUS_STOPPED = &#34;stopped&#34;
STATUS_STARTED = &#34;started&#34;
def __init__(self, id=&#34;default&#34;, cached_trace:list=None):
self.id = id
self.agents = []
self.name_to_agent = {} # {agent_name: agent, ...}
self.environments = []
self.factories = [] # e.g., TinyPersonFactory instances
self.name_to_factory = {} # {factory_name: factory, ...}
self.name_to_environment = {} # {environment_name: environment, ...}
self.status = Simulation.STATUS_STOPPED
self.cache_path = f&#34;./tinytroupe-{id}.cache.json&#34; # default cache path
# should we always automatically checkpoint at the end of every transaction?
self.auto_checkpoint = False
# whether there are changes not yet saved to the cache file
self.has_unsaved_cache_changes = False
# whether the agent is under a transaction or not, used for managing
# simulation caching later
self._under_transaction = {None: False}
# whether the agent is under a parallel transactions segment or not, used for managing
# simulation caching later
self._under_parallel_transactions = False
# Cache chain mechanism.
#
# stores a list of simulation states.
# Each state is a tuple (prev_node_hash, event_hash, event_output, state), where prev_node_hash is a hash of the previous node in this chain,
# if any, event_hash is a hash of the event that triggered the transition to this state, if any, event_output is the output of the event,
# if any, and state is the actual complete state that resulted.
if cached_trace is None:
self.cached_trace = []
else:
self.cached_trace = cached_trace
self.cache_misses = 0
self.cache_hits = 0
# Execution chain mechanism.
#
# The actual, current, execution trace. Each state is a tuple (prev_node_hash, event_hash, event_output, state), where prev_node_hash is a hash
# of the previous node in this chain, if any, event_hash is a hash of the event that triggered the transition to this state, if any,
# event_output is the output of the event, if any, and state is the actual complete state that resulted.
self.execution_trace = []
def begin(self, cache_path:str=None, auto_checkpoint:bool=False):
&#34;&#34;&#34;
Marks the start of the simulation being controlled.
Args:
cache_path (str): The path to the cache file. If not specified,
defaults to the default cache path defined in the class.
auto_checkpoint (bool, optional): Whether to automatically checkpoint at the end of each transaction. Defaults to False.
&#34;&#34;&#34;
logger.debug(f&#34;Starting simulation, cache_path={cache_path}, auto_checkpoint={auto_checkpoint}.&#34;)
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
from tinytroupe.factory.tiny_person_factory import TinyPersonFactory
if self.status == Simulation.STATUS_STOPPED:
self.status = Simulation.STATUS_STARTED
else:
raise ValueError(&#34;Simulation is already started.&#34;)
if cache_path is not None:
self.cache_path = cache_path
# should we automatically checkpoint?
self.auto_checkpoint = auto_checkpoint
# clear the agents, environments and other simulated entities, we&#39;ll track them from now on
TinyPerson.clear_agents()
TinyWorld.clear_environments()
TinyFactory.clear_factories()
TinyPersonFactory.clear_factories()
# All automated fresh ids will start from 0 again for this simulation
utils.reset_fresh_id()
# load the cache file, if any
if self.cache_path is not None:
self._load_cache_file(self.cache_path)
def end(self):
&#34;&#34;&#34;
Marks the end of the simulation being controlled.
&#34;&#34;&#34;
logger.debug(&#34;Ending simulation.&#34;)
if self.status == Simulation.STATUS_STARTED:
self.status = Simulation.STATUS_STOPPED
self.checkpoint()
else:
raise ValueError(&#34;Simulation is already stopped.&#34;)
def checkpoint(self):
&#34;&#34;&#34;
Saves current simulation trace to a file.
&#34;&#34;&#34;
logger.debug(&#34;Checkpointing simulation state...&#34;)
# save the cache file
if self.has_unsaved_cache_changes:
self._save_cache_file(self.cache_path)
else:
logger.debug(&#34;No unsaved cache changes to save to file.&#34;)
def add_agent(self, agent):
&#34;&#34;&#34;
Adds an agent to the simulation.
&#34;&#34;&#34;
if agent.name in self.name_to_agent:
raise ValueError(f&#34;Agent names must be unique, but &#39;{agent.name}&#39; is already defined.&#34;)
agent.simulation_id = self.id
self.agents.append(agent)
self.name_to_agent[agent.name] = agent
def add_environment(self, environment):
&#34;&#34;&#34;
Adds an environment to the simulation.
&#34;&#34;&#34;
if environment.name in self.name_to_environment:
raise ValueError(f&#34;Environment names must be unique, but &#39;{environment.name}&#39; is already defined.&#34;)
environment.simulation_id = self.id
self.environments.append(environment)
self.name_to_environment[environment.name] = environment
def add_factory(self, factory):
&#34;&#34;&#34;
Adds a factory to the simulation.
&#34;&#34;&#34;
if factory.name in self.name_to_factory:
raise ValueError(f&#34;Factory names must be unique, but &#39;{factory.name}&#39; is already defined.&#34;)
factory.simulation_id = self.id
self.factories.append(factory)
self.name_to_factory[factory.name] = factory
###################################################################################################
# Cache and execution chain mechanisms
###################################################################################################
def _execution_trace_position(self) -&gt; int:
&#34;&#34;&#34;
Returns the current position in the execution trace, or -1 if the execution trace is empty.
&#34;&#34;&#34;
return len(self.execution_trace) - 1
def _function_call_hash(self, function_name, *args, **kwargs) -&gt; int:
&#34;&#34;&#34;
Computes the hash of the given function call.
&#34;&#34;&#34;
# if functions are passed as arguments to the function, there&#39;s the problem that their
# string representation always changes due to memory position (e.g., &lt;function my_function at 0x7f8d1a7b7d30&gt;).
# so we need to remove the changing suffix in those cases, while preserving the function name if it exists.
# positional arguments
# converts to a list of string representations first
args_str = list(map(str, args))
for i, arg in enumerate(args):
if callable(arg):
args_str[i] = arg.__name__
# keyword arguments
# converts to a list of string representations first
kwargs_str = {k: str(v) for k, v in kwargs.items()}
for k, v in kwargs.items():
if callable(v):
kwargs_str[k] = v.__name__
# then, convert to a single string, to obtain a unique hash
event = str((function_name, args_str, kwargs_str))
# TODO actually compute a short hash of the event string, e.g., using SHA256 ?
# event_hash = utils.custom_hash(event)
return event
def _skip_execution_with_cache(self):
&#34;&#34;&#34;
Skips the current execution, assuming there&#39;s a cached state at the same position.
&#34;&#34;&#34;
assert len(self.cached_trace) &gt; self._execution_trace_position() + 1, &#34;There&#39;s no cached state at the current execution position.&#34;
self.execution_trace.append(self.cached_trace[self._execution_trace_position() + 1])
def _is_transaction_event_cached(self, event_hash, parallel=False) -&gt; bool:
&#34;&#34;&#34;
Checks whether the given event hash matches the corresponding cached one, if any.
If there&#39;s no corresponding cached state, the event is treated as not cached.
&#34;&#34;&#34;
if not parallel:
# there&#39;s cache that could be used
if len(self.cached_trace) &gt; self._execution_trace_position() + 1:
if self._execution_trace_position() &gt;= -1:
# here&#39;s a graphical depiction of the logic:
#
# Cache: c0:(c_prev_node_hash_0, c_event_hash_0, _, c_state_0) ------------------&gt; c1:(c_prev_node_hash_1, c_event_hash_1, _, c_state_1) -&gt; ...
# Execution: e0:(e_prev_node_hash_0, e_event_hash_0, _, e_state_0) -&lt;being computed&gt;-&gt; e1:(e_prev_node_hash_1, &lt;being computed&gt;, &lt;being computed&gt;, &lt;being computed&gt;)
# position = 0 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#
# Must satisfy:
# - event_hash == c_event_hash_1
# - hash(e0) == c_prev_node_hash_1
try:
event_hash_match = event_hash == self.cached_trace[self._execution_trace_position() + 1][1]
except Exception as e:
logger.error(f&#34;Error while checking event hash match: {e}&#34;)
event_hash_match = False
prev_node_match = True # TODO implement real check
return event_hash_match and prev_node_match
else:
raise ValueError(&#34;Execution trace position is invalid, must be &gt;= -1, but is &#34;, self._execution_trace_position())
else: # no cache to use
return False
else: # parallel
if len(self.cached_trace) &gt;= self._execution_trace_position():
if self._execution_trace_position() &gt;= 0:
# parallel stores ignore order, so we need to check instead whether the event hash is a key in the parallel store,
# regardless of the order of the events generated the data therein.
if isinstance(self.cached_trace[self._execution_trace_position()], dict):
event_hash_match = event_hash in self.cached_trace[self._execution_trace_position()].keys()
else:
event_hash_match = False
prev_node_match = True # TODO implement real check
return event_hash_match and prev_node_match
else:
raise ValueError(&#34;Execution trace position is invalid, must be &gt;= 0, but is &#34;, self._execution_trace_position())
def _get_cached_parallel_value(self, event_hash, key):
parallel_store = self.cached_trace[self._execution_trace_position()]
value = parallel_store[event_hash][key]
return value
def _drop_cached_trace_suffix(self):
&#34;&#34;&#34;
Drops the cached trace suffix starting at the current execution trace position. This effectively
refreshes the cache to the current execution state and starts building a new cache from there.
&#34;&#34;&#34;
self.cached_trace = self.cached_trace[:self._execution_trace_position()+1]
def _add_to_execution_trace(self, state: dict, event_hash: int, event_output, parallel=False):
&#34;&#34;&#34;
Adds a state to the execution_trace list. In a parallel segment, only the event output is
stored, keyed by the event hash in the segment&#39;s dictionary.
NOTE: the hash of the previous node is not computed yet (it is always recorded as None), and
no comparison against the cached trace at the same position is performed here.
&#34;&#34;&#34;
# Compute the hash of the previous execution pair, if any
previous_hash = None
if not parallel:
# Create a tuple of (prev_node_hash, event_hash, event_output, state) and append it to the execution_trace list
self.execution_trace.append((previous_hash, event_hash, event_output, state))
else:
with concurrent_execution_lock:
# state is not stored in parallel segments, only outputs
self.execution_trace[-1][event_hash] = {&#34;prev_node_hash&#34;: previous_hash,
&#34;encoded_output&#34;: event_output}
def _add_to_cache_trace(self, state: dict, event_hash: int, event_output, parallel=False):
&#34;&#34;&#34;
Adds a state to the cached_trace list and computes the appropriate hash.
&#34;&#34;&#34;
# Compute the hash of the previous cached pair, if any
previous_hash = None
if self.cached_trace:
previous_hash = utils.custom_hash(self.cached_trace[-1])
if not parallel:
# Create a tuple of (prev_node_hash, event_hash, event_output, state) and append it to the cached_trace list
self.cached_trace.append((previous_hash, event_hash, event_output, state))
else:
with concurrent_execution_lock:
# state is not stored in parallel segments, only outputs
self.cached_trace[-1][event_hash] = {&#34;prev_node_hash&#34;: previous_hash,
&#34;encoded_output&#34;: event_output}
self.has_unsaved_cache_changes = True
def _load_cache_file(self, cache_path:str):
&#34;&#34;&#34;
Loads the cache file from the given path.
&#34;&#34;&#34;
try:
self.cached_trace = json.load(open(cache_path, &#34;r&#34;))
except FileNotFoundError:
logger.info(f&#34;Cache file not found on path: {cache_path}.&#34;)
self.cached_trace = []
def _save_cache_file(self, cache_path:str):
&#34;&#34;&#34;
Saves the cache file to the given path. Always overwrites.
&#34;&#34;&#34;
logger.debug(f&#34;Now saving cache file to {cache_path}.&#34;)
try:
# Create a temporary file
with tempfile.NamedTemporaryFile(&#39;w&#39;, delete=False) as temp:
json.dump(self.cached_trace, temp, indent=4)
# Replace the original file with the temporary file
os.replace(temp.name, cache_path)
except Exception as e:
traceback_string = &#39;&#39;.join(traceback.format_tb(e.__traceback__))
logger.error(f&#34;An error occurred while saving the cache file: {e}\nTraceback:\n{traceback_string}&#34;)
self.has_unsaved_cache_changes = False
###################################################################################################
# Transactional control
###################################################################################################
#
# Regular sequential transactions
#
def begin_transaction(self, id=None):
&#34;&#34;&#34;
Starts a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_transaction[id] = True
self._clear_communications_buffers() # TODO &lt;----------------------------------------------------------------
def end_transaction(self, id=None):
&#34;&#34;&#34;
Ends a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_transaction[id] = False
def is_under_transaction(self, id=None):
&#34;&#34;&#34;
Checks if the agent is under a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
return self._under_transaction.get(id, False)
def _clear_communications_buffers(self):
&#34;&#34;&#34;
Cleans the communications buffers of all agents and environments.
&#34;&#34;&#34;
for agent in self.agents:
agent.clear_communications_buffer()
for environment in self.environments:
environment.clear_communications_buffer()
#
# Parallel transactions
#
def begin_parallel_transactions(self):
&#34;&#34;&#34;
Starts parallel transactions.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_parallel_transactions = True
# add a new parallel segment to the execution and cache traces
self.execution_trace.append({})
self.cached_trace.append({})
def end_parallel_transactions(self):
&#34;&#34;&#34;
Ends parallel transactions.
&#34;&#34;&#34;
self._under_parallel_transactions = False
def is_under_parallel_transactions(self):
&#34;&#34;&#34;
Checks if the agent is under parallel transactions.
&#34;&#34;&#34;
return self._under_parallel_transactions
###################################################################################################
# Simulation state handling
###################################################################################################
def _encode_simulation_state(self) -&gt; dict:
&#34;&#34;&#34;
Encodes the current simulation state, including agents, environments, and other
relevant information.
&#34;&#34;&#34;
state = {}
# Encode agents
state[&#34;agents&#34;] = []
for agent in self.agents:
state[&#34;agents&#34;].append(agent.encode_complete_state())
# Encode environments
state[&#34;environments&#34;] = []
for environment in self.environments:
state[&#34;environments&#34;].append(environment.encode_complete_state())
# Encode factories
state[&#34;factories&#34;] = []
for factory in self.factories:
state[&#34;factories&#34;].append(factory.encode_complete_state())
return state
def _decode_simulation_state(self, state: dict):
&#34;&#34;&#34;
Decodes the given simulation state, including agents, environments, and other
relevant information.
Args:
state (dict): The state to decode.
&#34;&#34;&#34;
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
logger.debug(f&#34;Decoding simulation state: {state[&#39;factories&#39;]}&#34;)
logger.debug(f&#34;Registered factories: {self.name_to_factory}&#34;)
logger.debug(f&#34;Registered agents: {self.name_to_agent}&#34;)
logger.debug(f&#34;Registered environments: {self.name_to_environment}&#34;)
# Decode factories
for factory_state in state[&#34;factories&#34;]:
factory = self.name_to_factory[factory_state[&#34;name&#34;]]
factory.decode_complete_state(factory_state)
# Decode environments
###self.environments = []
for environment_state in state[&#34;environments&#34;]:
try:
environment = self.name_to_environment[environment_state[&#34;name&#34;]]
environment.decode_complete_state(environment_state)
if TinyWorld.communication_display:
environment.pop_and_display_latest_communications()
except Exception as e:
raise ValueError(f&#34;Environment {environment_state[&#39;name&#39;]} is not in the simulation, thus cannot be decoded there.&#34;) from e
# Decode agents (if they were not already decoded by the environment)
####self.agents = []
for agent_state in state[&#34;agents&#34;]:
try:
agent = self.name_to_agent[agent_state[&#34;name&#34;]]
agent.decode_complete_state(agent_state)
# The agent has not yet been decoded because it is not in any environment. So, decode it.
if agent.environment is None:
if TinyPerson.communication_display:
agent.pop_and_display_latest_communications()
except Exception as e:
raise ValueError(f&#34;Agent {agent_state[&#39;name&#39;]} is not in the simulation, thus cannot be decoded there.&#34;) from e
class Transaction:
def __init__(self, obj_under_transaction, simulation, function, *args, **kwargs):
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
self.obj_under_transaction = obj_under_transaction
self.simulation = simulation
self.function_name = function.__name__
self.function = function
self.args = args
self.kwargs = kwargs
#
# If we have an ongoing simulation, set the simulation id of the object under transaction if it is not already set.
#
if simulation is not None:
if hasattr(obj_under_transaction, &#39;simulation_id&#39;) and obj_under_transaction.simulation_id is not None:
if obj_under_transaction.simulation_id != simulation.id:
raise ValueError(f&#34;Object {obj_under_transaction} is already captured by a different simulation (id={obj_under_transaction.simulation_id}), \
and cannot be captured by simulation id={simulation.id}.&#34;)
logger.debug(f&#34;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt; Object {obj_under_transaction} is already captured by simulation {simulation.id}.&#34;)
else:
# if is a TinyPerson, add the agent to the simulation
if isinstance(obj_under_transaction, TinyPerson):
simulation.add_agent(obj_under_transaction)
logger.debug(f&#34;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt; Added agent {obj_under_transaction} to simulation {simulation.id}.&#34;)
# if is a TinyWorld, add the environment to the simulation
elif isinstance(obj_under_transaction, TinyWorld):
simulation.add_environment(obj_under_transaction)
# if is a TinyFactory, add the factory to the simulation
elif isinstance(obj_under_transaction, TinyFactory):
simulation.add_factory(obj_under_transaction)
logger.debug(f&#34;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt; Added factory {obj_under_transaction} to simulation {simulation.id}.&#34;)
else:
raise ValueError(f&#34;Object {obj_under_transaction} (type = {type(obj_under_transaction)}) is not a TinyPerson or TinyWorld instance, and cannot be captured by the simulation.&#34;)
def execute(self, begin_parallel=False, parallel_id=None):
output = None
# Transaction caching will only operate if there is a simulation and it is started
if self.simulation is None or self.simulation.status == Simulation.STATUS_STOPPED:
# Compute the function and return it, no caching, since the simulation is not started
output = self.function(*self.args, **self.kwargs)
elif self.simulation.status == Simulation.STATUS_STARTED:
# Compute the event hash
event_hash = self.simulation._function_call_hash(self.function_name, *self.args, **self.kwargs)
# Sequential and parallel transactions are handled in different ways
if begin_parallel:
self.simulation.begin_parallel_transactions()
# CACHED? Check if the event hash is in the cache
if self.simulation._is_transaction_event_cached(event_hash,
parallel=self.simulation.is_under_parallel_transactions()):
self.simulation.cache_hits += 1
# Restore the full state and return the cached output
logger.info(f&#34;Skipping execution of {self.function_name} with args {self.args} and kwargs {self.kwargs} because it is already cached.&#34;)
# SEQUENTIAL
if not self.simulation.is_under_parallel_transactions():
self.simulation._skip_execution_with_cache()
state = self.simulation.cached_trace[self.simulation._execution_trace_position()][3] # state
self.simulation._decode_simulation_state(state)
# Output encoding/decoding is used to preserve references to TinyPerson and TinyWorld instances
# mainly. Scalar values (int, float, str, bool) and composite values (list, dict) are
# encoded/decoded as is.
encoded_output = self.simulation.cached_trace[self.simulation._execution_trace_position()][2] # output
output = self._decode_function_output(encoded_output)
# PARALLEL
else: # is under parallel transactions
# in parallel segments, state is not restored, only outputs
encoded_output = self.simulation._get_cached_parallel_value(event_hash, &#34;encoded_output&#34;)
output = self._decode_function_output(encoded_output)
else: # not cached
if not begin_parallel:
# in case of beginning a parallel segment, we don&#39;t want to count it as a cache miss,
# since the segment itself will not be cached, but rather the events within it.
self.simulation.cache_misses += 1
if not self.simulation.is_under_transaction(id=parallel_id) and not begin_parallel:
# BEGIN SEQUENTIAL TRANSACTION ###############################################################
#
# if this is the beginning of a parallel segment, we don&#39;t need to begin a transaction, since
# we want to allow additional transactions within the parallel segment (i.e., one-level reentrancy).
if not begin_parallel:
self.simulation.begin_transaction(id=parallel_id)
# Compute the function and encode the relevant output and simulation state
output = self.function(*self.args, **self.kwargs)
self._save_output_with_simulation_state(event_hash, output)
# END TRANSACTION #################################################################
if not begin_parallel:
self.simulation.end_transaction(id=parallel_id)
else: # already under transaction (thus, now a reentrant transaction) OR beginning a parallel segment
# NOTES:
#
# - Reentrant sequential transactions are not cached, since what matters is the final result of
# the top-level transaction.
#
# - The event that starts the parallel transactions segment WILL NOT itself be cached, since
# it is not part of the parallel segment, but rather the beginning of it. This event will be
# reconstructed during runtime from the parallel events within the segment.
output = self.function(*self.args, **self.kwargs)
if begin_parallel:
self.simulation.end_parallel_transactions()
# execute an ad-hoc Transaction to save the simulation state AFTER the parallel segment is done.
Transaction(self.obj_under_transaction, self.simulation, lambda: True).execute(begin_parallel=False, parallel_id=parallel_id)
else:
raise ValueError(f&#34;Simulation status is invalid at this point: {self.simulation.status}&#34;)
# Checkpoint if needed
logger.debug(f&#34;Will attempt to checkpoint simulation state after transaction execution.&#34;)
if self.simulation is not None and self.simulation.auto_checkpoint:
logger.debug(&#34;Auto-checkpointing simulation state after transaction execution.&#34;)
self.simulation.checkpoint()
# after all the transaction is done, return the output - the client will never know about all the complexity we&#39;ve
# gone through to get here.
return output
def _save_output_with_simulation_state(self, event_hash, output):
encoded_output = self._encode_function_output(output)
state = self.simulation._encode_simulation_state()
# immediately drop the cached trace suffix, since we are starting a new execution from this point on.
# in the case of parallel transactions, this will drop everything _after_ the current parallel segment
# (which itself occupies one position only, with a dictionary of event hashes and their outputs).
self.simulation._drop_cached_trace_suffix()
# Cache the result and update the current execution trace. If this is a parallel transaction, the
# cache and execution traces will be updated in a different way.
self.simulation._add_to_cache_trace(state, event_hash, encoded_output,
parallel=self.simulation.is_under_parallel_transactions())
self.simulation._add_to_execution_trace(state, event_hash, encoded_output,
parallel=self.simulation.is_under_parallel_transactions())
def _encode_function_output(self, output) -&gt; dict:
&#34;&#34;&#34;
Encodes the given function output.
&#34;&#34;&#34;
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
# if the output is a supported object, encode it
if output is None:
return None
elif isinstance(output, TinyPerson):
return {&#34;type&#34;: &#34;TinyPersonRef&#34;, &#34;name&#34;: output.name}
elif isinstance(output, TinyWorld):
return {&#34;type&#34;: &#34;TinyWorldRef&#34;, &#34;name&#34;: output.name}
elif isinstance(output, TinyFactory):
return {&#34;type&#34;: &#34;TinyFactoryRef&#34;, &#34;name&#34;: output.name}
elif isinstance(output, list):
encoded_list = []
for item in output:
if isinstance(item, TinyPerson):
encoded_list.append({&#34;type&#34;: &#34;TinyPersonRef&#34;, &#34;name&#34;: item.name})
elif isinstance(item, TinyWorld):
encoded_list.append({&#34;type&#34;: &#34;TinyWorldRef&#34;, &#34;name&#34;: item.name})
elif isinstance(item, TinyFactory):
encoded_list.append({&#34;type&#34;: &#34;TinyFactoryRef&#34;, &#34;name&#34;: item.name})
else:
encoded_list.append({&#34;type&#34;: &#34;JSON&#34;, &#34;value&#34;: item})
return {&#34;type&#34;: &#34;List&#34;, &#34;value&#34;: encoded_list}
elif isinstance(output, (int, float, str, bool, dict, tuple)):
return {&#34;type&#34;: &#34;JSON&#34;, &#34;value&#34;: output}
else:
raise ValueError(f&#34;Unsupported output type: {type(output)}&#34;)
def _decode_function_output(self, encoded_output: dict):
    &#34;&#34;&#34;
    Decodes the given encoded function output.

    By-name references are resolved through the corresponding class lookup, lists are decoded
    item by item, and plain values are returned as stored. None is kept as None.
    &#34;&#34;&#34;
    # local import to avoid circular dependencies
    from tinytroupe.agent import TinyPerson
    from tinytroupe.environment import TinyWorld
    from tinytroupe.factory.tiny_factory import TinyFactory

    def resolve_reference(record):
        # returns (True, entity) for a by-name reference record, (False, None) for anything else
        tag = record[&#34;type&#34;]
        if tag == &#34;TinyPersonRef&#34;:
            return True, TinyPerson.get_agent_by_name(record[&#34;name&#34;])
        if tag == &#34;TinyWorldRef&#34;:
            return True, TinyWorld.get_environment_by_name(record[&#34;name&#34;])
        if tag == &#34;TinyFactoryRef&#34;:
            return True, TinyFactory.get_factory_by_name(record[&#34;name&#34;])
        return False, None

    if encoded_output is None:
        return None

    is_reference, entity = resolve_reference(encoded_output)
    if is_reference:
        return entity

    if encoded_output[&#34;type&#34;] == &#34;List&#34;:
        decoded_items = []
        for record in encoded_output[&#34;value&#34;]:
            item_is_reference, item_entity = resolve_reference(record)
            # items that are not references carry their plain value
            decoded_items.append(item_entity if item_is_reference else record[&#34;value&#34;])
        return decoded_items

    if encoded_output[&#34;type&#34;] == &#34;JSON&#34;:
        return encoded_output[&#34;value&#34;]

    raise ValueError(f&#34;Unsupported output type: {encoded_output[&#39;type&#39;]}&#34;)
def transactional(parallel=False):
    &#34;&#34;&#34;
    A helper decorator that makes a function simulation-transactional.

    Args:
        parallel (bool, optional): Passed to Transaction.execute as begin_parallel. Defaults to False.

    Returns:
        A decorator. The decorated function must receive the object under transaction
        as its first positional argument.
    &#34;&#34;&#34;
    # local import, so that the decorator does not depend on a module-level import
    import functools

    def decorator(func):
        # preserve the name, docstring and other metadata of the decorated function, so that
        # introspection and generated documentation do not show &#34;wrapper&#34; instead
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            obj_under_transaction = args[0]
            simulation = current_simulation()
            obj_sim_id = obj_under_transaction.simulation_id if hasattr(obj_under_transaction, &#39;simulation_id&#39;) else None
            logger.debug(f&#34;-----------------------------------------&gt; Transaction: {func.__name__} with args {args[1:]} and kwargs {kwargs} under simulation {obj_sim_id}, parallel={parallel}.&#34;)
            # the current thread identifies the parallel transaction, if any
            parallel_id = str(threading.current_thread())
            transaction = Transaction(obj_under_transaction, simulation, func, *args, **kwargs)
            result = transaction.execute(begin_parallel=parallel, parallel_id=parallel_id)
            return result
        return wrapper
    return decorator
class SkipTransaction(Exception):
pass
class CacheOutOfSync(Exception):
&#34;&#34;&#34;
Raised when a cached element and the corresponding freshly executed element are out of sync.
&#34;&#34;&#34;
pass
class ExecutionCached(Exception):
&#34;&#34;&#34;
Raised when a proposed execution is already cached.
&#34;&#34;&#34;
pass
###################################################################################################
# Convenience functions
###################################################################################################
def reset():
&#34;&#34;&#34;
Resets the entire simulation control state.
&#34;&#34;&#34;
global _current_simulations, _current_simulation_id
_current_simulations = {&#34;default&#34;: None}
# TODO Currently, only one simulation can be started at a time. In future versions, this should be
# changed to allow multiple simulations to be started at the same time, e.g., for fast
# analyses through parallelization.
_current_simulation_id = None
def _simulation(id=&#34;default&#34;):
    &#34;&#34;&#34;
    Returns the simulation registered under the given id, creating it on first use.
    &#34;&#34;&#34;
    global _current_simulations
    # .get() instead of indexing: only &#34;default&#34; is pre-registered by reset(), so any
    # other id would otherwise fail with a KeyError instead of being created lazily
    if _current_simulations.get(id) is None:
        # pass the id along, so that the simulation id and its default cache path
        # match the key under which the simulation is registered
        _current_simulations[id] = Simulation(id=id)
    return _current_simulations[id]
def begin(cache_path=None, id=&#34;default&#34;, auto_checkpoint=False):
&#34;&#34;&#34;
Marks the start of the simulation being controlled.
&#34;&#34;&#34;
global _current_simulation_id
if _current_simulation_id is None:
_simulation(id).begin(cache_path, auto_checkpoint)
_current_simulation_id = id
else:
raise ValueError(f&#34;Simulation is already started under id {_current_simulation_id}. Currently only one simulation can be started at a time.&#34;)
def end(id=&#34;default&#34;):
&#34;&#34;&#34;
Marks the end of the simulation being controlled.
&#34;&#34;&#34;
global _current_simulation_id
_simulation(id).end()
_current_simulation_id = None
def checkpoint(id=&#34;default&#34;):
&#34;&#34;&#34;
Saves current simulation state.
&#34;&#34;&#34;
_simulation(id).checkpoint()
def current_simulation():
&#34;&#34;&#34;
Returns the current simulation.
&#34;&#34;&#34;
global _current_simulation_id
if _current_simulation_id is not None:
return _simulation(_current_simulation_id)
else:
return None
def cache_hits(id=&#34;default&#34;):
&#34;&#34;&#34;
Returns the number of cache hits.
&#34;&#34;&#34;
return _simulation(id).cache_hits
def cache_misses(id=&#34;default&#34;):
&#34;&#34;&#34;
Returns the number of cache misses.
&#34;&#34;&#34;
return _simulation(id).cache_misses
reset() # initialize the control state</code></pre>
</details>
</section>
<section>
</section>
<section>
</section>
<section>
<h2 class="section-title" id="header-functions">Functions</h2>
<dl>
<dt id="tinytroupe.control.begin"><code class="name flex">
<span>def <span class="ident">begin</span></span>(<span>cache_path=None, id='default', auto_checkpoint=False)</span>
</code></dt>
<dd>
<div class="desc"><p>Marks the start of the simulation being controlled.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def begin(cache_path=None, id=&#34;default&#34;, auto_checkpoint=False):
&#34;&#34;&#34;
Marks the start of the simulation being controlled.
&#34;&#34;&#34;
global _current_simulation_id
if _current_simulation_id is None:
_simulation(id).begin(cache_path, auto_checkpoint)
_current_simulation_id = id
else:
raise ValueError(f&#34;Simulation is already started under id {_current_simulation_id}. Currently only one simulation can be started at a time.&#34;) </code></pre>
</details>
</dd>
<dt id="tinytroupe.control.cache_hits"><code class="name flex">
<span>def <span class="ident">cache_hits</span></span>(<span>id='default')</span>
</code></dt>
<dd>
<div class="desc"><p>Returns the number of cache hits.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def cache_hits(id=&#34;default&#34;):
&#34;&#34;&#34;
Returns the number of cache hits.
&#34;&#34;&#34;
return _simulation(id).cache_hits</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.cache_misses"><code class="name flex">
<span>def <span class="ident">cache_misses</span></span>(<span>id='default')</span>
</code></dt>
<dd>
<div class="desc"><p>Returns the number of cache misses.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def cache_misses(id=&#34;default&#34;):
&#34;&#34;&#34;
Returns the number of cache misses.
&#34;&#34;&#34;
return _simulation(id).cache_misses</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.checkpoint"><code class="name flex">
<span>def <span class="ident">checkpoint</span></span>(<span>id='default')</span>
</code></dt>
<dd>
<div class="desc"><p>Saves current simulation state.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def checkpoint(id=&#34;default&#34;):
&#34;&#34;&#34;
Saves current simulation state.
&#34;&#34;&#34;
_simulation(id).checkpoint()</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.current_simulation"><code class="name flex">
<span>def <span class="ident">current_simulation</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Returns the current simulation.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def current_simulation():
&#34;&#34;&#34;
Returns the current simulation.
&#34;&#34;&#34;
global _current_simulation_id
if _current_simulation_id is not None:
return _simulation(_current_simulation_id)
else:
return None</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.end"><code class="name flex">
<span>def <span class="ident">end</span></span>(<span>id='default')</span>
</code></dt>
<dd>
<div class="desc"><p>Marks the end of the simulation being controlled.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def end(id=&#34;default&#34;):
&#34;&#34;&#34;
Marks the end of the simulation being controlled.
&#34;&#34;&#34;
global _current_simulation_id
_simulation(id).end()
_current_simulation_id = None</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.reset"><code class="name flex">
<span>def <span class="ident">reset</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"><p>Resets the entire simulation control state.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def reset():
&#34;&#34;&#34;
Resets the entire simulation control state.
&#34;&#34;&#34;
global _current_simulations, _current_simulation_id
_current_simulations = {&#34;default&#34;: None}
# TODO Currently, only one simulation can be started at a time. In future versions, this should be
# changed to allow multiple simulations to be started at the same time, e.g., for fast
# analyses through parallelization.
_current_simulation_id = None</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.transactional"><code class="name flex">
<span>def <span class="ident">transactional</span></span>(<span>parallel=False)</span>
</code></dt>
<dd>
<div class="desc"><p>A helper decorator that makes a function simulation-transactional.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def transactional(parallel=False):
    &#34;&#34;&#34;
    A helper decorator that makes a function simulation-transactional.

    Args:
        parallel (bool, optional): Passed to Transaction.execute as begin_parallel. Defaults to False.

    Returns:
        A decorator. The decorated function must receive the object under transaction
        as its first positional argument.
    &#34;&#34;&#34;
    # local import, so that the decorator does not depend on a module-level import
    import functools

    def decorator(func):
        # preserve the name, docstring and other metadata of the decorated function, so that
        # introspection and generated documentation do not show &#34;wrapper&#34; instead
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            obj_under_transaction = args[0]
            simulation = current_simulation()
            obj_sim_id = obj_under_transaction.simulation_id if hasattr(obj_under_transaction, &#39;simulation_id&#39;) else None
            logger.debug(f&#34;-----------------------------------------&gt; Transaction: {func.__name__} with args {args[1:]} and kwargs {kwargs} under simulation {obj_sim_id}, parallel={parallel}.&#34;)
            # the current thread identifies the parallel transaction, if any
            parallel_id = str(threading.current_thread())
            transaction = Transaction(obj_under_transaction, simulation, func, *args, **kwargs)
            result = transaction.execute(begin_parallel=parallel, parallel_id=parallel_id)
            return result
        return wrapper
    return decorator</code></pre>
</details>
</dd>
</dl>
</section>
<section>
<h2 class="section-title" id="header-classes">Classes</h2>
<dl>
<dt id="tinytroupe.control.CacheOutOfSync"><code class="flex name class">
<span>class <span class="ident">CacheOutOfSync</span></span>
<span>(</span><span>*args, **kwargs)</span>
</code></dt>
<dd>
<div class="desc"><p>Raised when a cached element and the corresponding freshly executed element are out of sync.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class CacheOutOfSync(Exception):
&#34;&#34;&#34;
Raised when a cached element and the corresponding freshly executed element are out of sync.
&#34;&#34;&#34;
pass</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li>builtins.Exception</li>
<li>builtins.BaseException</li>
</ul>
</dd>
<dt id="tinytroupe.control.ExecutionCached"><code class="flex name class">
<span>class <span class="ident">ExecutionCached</span></span>
<span>(</span><span>*args, **kwargs)</span>
</code></dt>
<dd>
<div class="desc"><p>Raised when a proposed execution is already cached.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class ExecutionCached(Exception):
&#34;&#34;&#34;
Raised when a proposed execution is already cached.
&#34;&#34;&#34;
pass</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li>builtins.Exception</li>
<li>builtins.BaseException</li>
</ul>
</dd>
<dt id="tinytroupe.control.Simulation"><code class="flex name class">
<span>class <span class="ident">Simulation</span></span>
<span>(</span><span>id='default', cached_trace: list = None)</span>
</code></dt>
<dd>
<div class="desc"></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class Simulation:
STATUS_STOPPED = &#34;stopped&#34;
STATUS_STARTED = &#34;started&#34;
def __init__(self, id=&#34;default&#34;, cached_trace:list=None):
self.id = id
self.agents = []
self.name_to_agent = {} # {agent_name: agent, ...}
self.environments = []
self.factories = [] # e.g., TinyPersonFactory instances
self.name_to_factory = {} # {factory_name: factory, ...}
self.name_to_environment = {} # {environment_name: environment, ...}
self.status = Simulation.STATUS_STOPPED
self.cache_path = f&#34;./tinytroupe-{id}.cache.json&#34; # default cache path
# should we automatically checkpoint at the end of every transaction?
self.auto_checkpoint = False
# whether there are changes not yet saved to the cache file
self.has_unsaved_cache_changes = False
# maps a transaction id to whether that id is currently under a transaction or not,
# used for managing simulation caching later
self._under_transaction = {None: False}
# whether the simulation is under a parallel transactions segment or not, used for managing
# simulation caching later
self._under_parallel_transactions = False
# Cache chain mechanism.
#
# stores a list of simulation states.
# Each state is a tuple (prev_node_hash, event_hash, event_output, state), where prev_node_hash is a hash of the previous node in this chain,
# if any, event_hash is a hash of the event that triggered the transition to this state, if any, event_output is the output of the event,
# if any, and state is the actual complete state that resulted.
# Parallel segments are stored as a dict instead, mapping each event_hash to its prev_node_hash and encoded_output.
if cached_trace is None:
self.cached_trace = []
else:
self.cached_trace = cached_trace
self.cache_misses = 0
self.cache_hits = 0
# Execution chain mechanism.
#
# The actual, current, execution trace. Each state is a tuple (prev_node_hash, event_hash, event_output, state), where prev_node_hash is a hash
# of the previous node in this chain, if any, event_hash is a hash of the event that triggered the transition to this state, if any,
# event_output is the output of the event, if any, and state is the actual complete state that resulted.
# Parallel segments are stored as a dict here too.
self.execution_trace = []
def begin(self, cache_path:str=None, auto_checkpoint:bool=False):
&#34;&#34;&#34;
Marks the start of the simulation being controlled.
Args:
cache_path (str): The path to the cache file. If not specified,
defaults to the default cache path defined in the class.
auto_checkpoint (bool, optional): Whether to automatically checkpoint at the end of each transaction. Defaults to False.
&#34;&#34;&#34;
logger.debug(f&#34;Starting simulation, cache_path={cache_path}, auto_checkpoint={auto_checkpoint}.&#34;)
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
from tinytroupe.factory.tiny_person_factory import TinyPersonFactory
if self.status == Simulation.STATUS_STOPPED:
self.status = Simulation.STATUS_STARTED
else:
raise ValueError(&#34;Simulation is already started.&#34;)
if cache_path is not None:
self.cache_path = cache_path
# should we automatically checkpoint?
self.auto_checkpoint = auto_checkpoint
# clear the agents, environments and other simulated entities, we&#39;ll track them from now on
TinyPerson.clear_agents()
TinyWorld.clear_environments()
TinyFactory.clear_factories()
TinyPersonFactory.clear_factories()
# All automated fresh ids will start from 0 again for this simulation
utils.reset_fresh_id()
# load the cache file, if any
if self.cache_path is not None:
self._load_cache_file(self.cache_path)
def end(self):
&#34;&#34;&#34;
Marks the end of the simulation being controlled.
&#34;&#34;&#34;
logger.debug(&#34;Ending simulation.&#34;)
if self.status == Simulation.STATUS_STARTED:
self.status = Simulation.STATUS_STOPPED
self.checkpoint()
else:
raise ValueError(&#34;Simulation is already stopped.&#34;)
def checkpoint(self):
&#34;&#34;&#34;
Saves current simulation trace to a file.
&#34;&#34;&#34;
logger.debug(&#34;Checkpointing simulation state...&#34;)
# save the cache file
if self.has_unsaved_cache_changes:
self._save_cache_file(self.cache_path)
else:
logger.debug(&#34;No unsaved cache changes to save to file.&#34;)
def add_agent(self, agent):
&#34;&#34;&#34;
Adds an agent to the simulation.
&#34;&#34;&#34;
if agent.name in self.name_to_agent:
raise ValueError(f&#34;Agent names must be unique, but &#39;{agent.name}&#39; is already defined.&#34;)
agent.simulation_id = self.id
self.agents.append(agent)
self.name_to_agent[agent.name] = agent
def add_environment(self, environment):
&#34;&#34;&#34;
Adds an environment to the simulation.
&#34;&#34;&#34;
if environment.name in self.name_to_environment:
raise ValueError(f&#34;Environment names must be unique, but &#39;{environment.name}&#39; is already defined.&#34;)
environment.simulation_id = self.id
self.environments.append(environment)
self.name_to_environment[environment.name] = environment
def add_factory(self, factory):
&#34;&#34;&#34;
Adds a factory to the simulation.
&#34;&#34;&#34;
if factory.name in self.name_to_factory:
raise ValueError(f&#34;Factory names must be unique, but &#39;{factory.name}&#39; is already defined.&#34;)
factory.simulation_id = self.id
self.factories.append(factory)
self.name_to_factory[factory.name] = factory
###################################################################################################
# Cache and execution chain mechanisms
###################################################################################################
def _execution_trace_position(self) -&gt; int:
&#34;&#34;&#34;
Returns the current position in the execution trace, or -1 if the execution trace is empty.
&#34;&#34;&#34;
return len(self.execution_trace) - 1
def _function_call_hash(self, function_name, *args, **kwargs) -&gt; str:
    &#34;&#34;&#34;
    Computes the hash of the given function call.

    No digest is computed at the moment: the returned value is the string representation
    of (function_name, positional arguments, keyword arguments).
    &#34;&#34;&#34;
    def stable_str(value):
        # if functions are passed as arguments to the function, there&#39;s the problem that their
        # string representation always changes due to memory position (e.g., &lt;function my_function at 0x7f8d1a7b7d30&gt;).
        # so callables are represented by their name instead. Callables that have no __name__
        # (e.g., functools.partial objects) fall back to the name of their type, instead of
        # failing with an AttributeError.
        if callable(value):
            return getattr(value, &#34;__name__&#34;, type(value).__name__)
        return str(value)

    # positional arguments, converted to a list of string representations
    args_str = [stable_str(arg) for arg in args]

    # keyword arguments, converted to a dict of string representations
    kwargs_str = {k: stable_str(v) for k, v in kwargs.items()}

    # then, convert to a single string, to obtain a unique hash
    event = str((function_name, args_str, kwargs_str))

    # TODO actually compute a short hash of the event string, e.g., using SHA256 ?
    # event_hash = utils.custom_hash(event)
    return event
def _skip_execution_with_cache(self):
&#34;&#34;&#34;
Skips the current execution, assuming there&#39;s a cached state at the same position.
&#34;&#34;&#34;
assert len(self.cached_trace) &gt; self._execution_trace_position() + 1, &#34;There&#39;s no cached state at the current execution position.&#34;
self.execution_trace.append(self.cached_trace[self._execution_trace_position() + 1])
def _is_transaction_event_cached(self, event_hash, parallel=False) -&gt; bool:
    &#34;&#34;&#34;
    Checks whether the given event hash matches the corresponding cached one, if any.
    If there&#39;s no corresponding cached state, returns False.

    Args:
        event_hash: The hash of the event about to be executed.
        parallel (bool, optional): Whether the event belongs to a parallel segment. Defaults to False.

    Returns:
        bool: True if the cache holds a matching event at the relevant position, False otherwise.

    Raises:
        ValueError: If the execution trace position is invalid for the requested mode.
    &#34;&#34;&#34;
    position = self._execution_trace_position()

    if not parallel:
        # the candidate is the cached node right after the last executed one
        if len(self.cached_trace) &lt;= position + 1:
            return False # no cache to use

        if position &lt; -1:
            raise ValueError(f&#34;Execution trace position is invalid, must be &gt;= -1, but is {position}.&#34;)

        # here&#39;s a depiction of the logic, for position = 0:
        #
        # Cache:     c0:(c_prev_node_hash_0, c_event_hash_0, _, c_state_0) -&gt; c1:(c_prev_node_hash_1, c_event_hash_1, _, c_state_1) -&gt; ...
        # Execution: e0:(e_prev_node_hash_0, e_event_hash_0, _, e_state_0) -&gt; e1:(&lt;being computed&gt;)
        #
        # Must satisfy:
        #   - event_hash == c_event_hash_1
        #   - hash(e0) == c_prev_node_hash_1
        try:
            event_hash_match = event_hash == self.cached_trace[position + 1][1]
        except Exception as e:
            logger.error(f&#34;Error while checking event hash match: {e}&#34;)
            event_hash_match = False

        prev_node_match = True # TODO implement real check
        return event_hash_match and prev_node_match

    # parallel
    if position &lt; 0:
        raise ValueError(f&#34;Execution trace position is invalid, must be &gt;= 0, but is {position}.&#34;)

    # strict comparison, so that the indexing below cannot go past the end of the cached trace
    if len(self.cached_trace) &lt;= position:
        return False # no cache to use

    # parallel stores ignore order, so we need to check instead whether the event hash is a key in the parallel store,
    # regardless of the order of the events that generated the data therein.
    parallel_store = self.cached_trace[position]
    event_hash_match = isinstance(parallel_store, dict) and event_hash in parallel_store

    prev_node_match = True # TODO implement real check
    return event_hash_match and prev_node_match
def _get_cached_parallel_value(self, event_hash, key):
&#34;&#34;&#34;
Returns the value stored under the given key for the given event hash, in the cached
trace entry found at the current execution trace position.
&#34;&#34;&#34;
parallel_store = self.cached_trace[self._execution_trace_position()]
value = parallel_store[event_hash][key]
return value
def _drop_cached_trace_suffix(self):
&#34;&#34;&#34;
Drops the cached trace suffix starting at the current execution trace position. This effectively
refreshes the cache to the current execution state and starts building a new cache from there.
&#34;&#34;&#34;
self.cached_trace = self.cached_trace[:self._execution_trace_position()+1]
def _add_to_execution_trace(self, state: dict, event_hash: int, event_output, parallel=False):
&#34;&#34;&#34;
Adds a state to the execution_trace list.
For sequential transactions, a (previous_hash, event_hash, event_output, state) tuple is appended.
For parallel transactions, only the output is stored, under event_hash, in the last entry
of the execution trace; the state is not stored.
NOTE: no hash is computed and nothing is compared against the cached trace here;
previous_hash is always None at the moment.
&#34;&#34;&#34;
# The hash of the previous execution node is not computed yet, so it is always None
previous_hash = None
if not parallel:
# Append a (previous_hash, event_hash, event_output, state) tuple to the execution_trace list
self.execution_trace.append((previous_hash, event_hash, event_output, state))
else:
with concurrent_execution_lock:
# state is not stored in parallel segments, only outputs
self.execution_trace[-1][event_hash] = {&#34;prev_node_hash&#34;: previous_hash,
&#34;encoded_output&#34;: event_output}
def _add_to_cache_trace(self, state: dict, event_hash: int, event_output, parallel=False):
&#34;&#34;&#34;
Adds a state to the cached_trace list, together with the hash of the last cached entry, if any.
For sequential transactions, a (previous_hash, event_hash, event_output, state) tuple is appended.
For parallel transactions, only the output is stored, under event_hash, in the last entry
of the cached trace; the state is not stored.
Sets has_unsaved_cache_changes, which checkpoint() uses to decide whether to write the cache file.
&#34;&#34;&#34;
# Compute the hash of the last cached entry, if any
previous_hash = None
if self.cached_trace:
previous_hash = utils.custom_hash(self.cached_trace[-1])
if not parallel:
# Append a (previous_hash, event_hash, event_output, state) tuple to the cached_trace list
self.cached_trace.append((previous_hash, event_hash, event_output, state))
else:
with concurrent_execution_lock:
# state is not stored in parallel segments, only outputs
self.cached_trace[-1][event_hash] = {&#34;prev_node_hash&#34;: previous_hash,
&#34;encoded_output&#34;: event_output}
self.has_unsaved_cache_changes = True
def _load_cache_file(self, cache_path:str):
    &#34;&#34;&#34;
    Loads the cache file from the given path.
    If the file does not exist, the cached trace is reset to an empty list.
    &#34;&#34;&#34;
    try:
        # the context manager closes the file handle, even if parsing the JSON fails
        with open(cache_path, &#34;r&#34;) as cache_file:
            self.cached_trace = json.load(cache_file)
    except FileNotFoundError:
        logger.info(f&#34;Cache file not found on path: {cache_path}.&#34;)
        self.cached_trace = []
def _save_cache_file(self, cache_path:str):
    &#34;&#34;&#34;
    Saves the cache file to the given path. Always overwrites.

    The trace is first written to a temporary file, which then replaces the target file.
    Errors are logged, not raised; in that case the cache keeps being flagged as unsaved,
    so that a later checkpoint tries again.
    &#34;&#34;&#34;
    logger.debug(f&#34;Now saving cache file to {cache_path}.&#34;)

    temp_path = None
    try:
        # Create the temporary file next to the target file, because os.replace may fail
        # if source and destination are on different filesystems.
        target_dir = os.path.dirname(os.path.abspath(cache_path))
        with tempfile.NamedTemporaryFile(&#39;w&#39;, dir=target_dir, delete=False) as temp:
            temp_path = temp.name
            json.dump(self.cached_trace, temp, indent=4)

        # Replace the original file with the temporary file
        os.replace(temp_path, cache_path)
        temp_path = None # nothing left to clean up

        # only now the cache file reflects the in-memory cached trace
        self.has_unsaved_cache_changes = False
    except Exception as e:
        traceback_string = &#39;&#39;.join(traceback.format_tb(e.__traceback__))
        logger.error(f&#34;An error occurred while saving the cache file: {e}\nTraceback:\n{traceback_string}&#34;)
    finally:
        # do not leave a stray temporary file behind if the save failed midway
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                logger.debug(f&#34;Could not remove temporary cache file {temp_path}.&#34;)
###################################################################################################
# Transactional control
###################################################################################################
#
# Regular sequential transactions
#
def begin_transaction(self, id=None):
&#34;&#34;&#34;
Starts a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_transaction[id] = True
self._clear_communications_buffers() # TODO &lt;----------------------------------------------------------------
def end_transaction(self, id=None):
&#34;&#34;&#34;
Ends a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_transaction[id] = False
def is_under_transaction(self, id=None):
&#34;&#34;&#34;
Checks if the agent is under a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
return self._under_transaction.get(id, False)
def _clear_communications_buffers(self):
&#34;&#34;&#34;
Cleans the communications buffers of all agents and environments.
&#34;&#34;&#34;
for agent in self.agents:
agent.clear_communications_buffer()
for environment in self.environments:
environment.clear_communications_buffer()
#
# Parallel transactions
#
def begin_parallel_transactions(self):
&#34;&#34;&#34;
Starts parallel transactions.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_parallel_transactions = True
# add a new parallel segment to the execution and cache traces
self.execution_trace.append({})
self.cached_trace.append({})
def end_parallel_transactions(self):
&#34;&#34;&#34;
Ends parallel transactions.
&#34;&#34;&#34;
self._under_parallel_transactions = False
def is_under_parallel_transactions(self):
&#34;&#34;&#34;
Checks if the agent is under parallel transactions.
&#34;&#34;&#34;
return self._under_parallel_transactions
###################################################################################################
# Simulation state handling
###################################################################################################
def _encode_simulation_state(self) -&gt; dict:
&#34;&#34;&#34;
Encodes the current simulation state, including agents, environments, and other
relevant information.
&#34;&#34;&#34;
state = {}
# Encode agents
state[&#34;agents&#34;] = []
for agent in self.agents:
state[&#34;agents&#34;].append(agent.encode_complete_state())
# Encode environments
state[&#34;environments&#34;] = []
for environment in self.environments:
state[&#34;environments&#34;].append(environment.encode_complete_state())
# Encode factories
state[&#34;factories&#34;] = []
for factory in self.factories:
state[&#34;factories&#34;].append(factory.encode_complete_state())
return state
def _decode_simulation_state(self, state: dict):
&#34;&#34;&#34;
Decodes the given simulation state, including agents, environments, and other
relevant information.
Args:
state (dict): The state to decode.
&#34;&#34;&#34;
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
logger.debug(f&#34;Decoding simulation state: {state[&#39;factories&#39;]}&#34;)
logger.debug(f&#34;Registered factories: {self.name_to_factory}&#34;)
logger.debug(f&#34;Registered agents: {self.name_to_agent}&#34;)
logger.debug(f&#34;Registered environments: {self.name_to_environment}&#34;)
# Decode factories
for factory_state in state[&#34;factories&#34;]:
factory = self.name_to_factory[factory_state[&#34;name&#34;]]
factory.decode_complete_state(factory_state)
# Decode environments
###self.environments = []
for environment_state in state[&#34;environments&#34;]:
try:
environment = self.name_to_environment[environment_state[&#34;name&#34;]]
environment.decode_complete_state(environment_state)
if TinyWorld.communication_display:
environment.pop_and_display_latest_communications()
except Exception as e:
raise ValueError(f&#34;Environment {environment_state[&#39;name&#39;]} is not in the simulation, thus cannot be decoded there.&#34;) from e
# Decode agents (if they were not already decoded by the environment)
####self.agents = []
for agent_state in state[&#34;agents&#34;]:
try:
agent = self.name_to_agent[agent_state[&#34;name&#34;]]
agent.decode_complete_state(agent_state)
# The agent has not yet been decoded because it is not in any environment. So, decode it.
if agent.environment is None:
if TinyPerson.communication_display:
agent.pop_and_display_latest_communications()
except Exception as e:
raise ValueError(f&#34;Agent {agent_state[&#39;name&#39;]} is not in the simulation, thus cannot be decoded there.&#34;) from e </code></pre>
</details>
<h3>Class variables</h3>
<dl>
<dt id="tinytroupe.control.Simulation.STATUS_STARTED"><code class="name">var <span class="ident">STATUS_STARTED</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="tinytroupe.control.Simulation.STATUS_STOPPED"><code class="name">var <span class="ident">STATUS_STOPPED</span></code></dt>
<dd>
<div class="desc"></div>
</dd>
</dl>
<h3>Methods</h3>
<dl>
<dt id="tinytroupe.control.Simulation.add_agent"><code class="name flex">
<span>def <span class="ident">add_agent</span></span>(<span>self, agent)</span>
</code></dt>
<dd>
<div class="desc"><p>Adds an agent to the simulation.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def add_agent(self, agent):
&#34;&#34;&#34;
Adds an agent to the simulation.
&#34;&#34;&#34;
if agent.name in self.name_to_agent:
raise ValueError(f&#34;Agent names must be unique, but &#39;{agent.name}&#39; is already defined.&#34;)
agent.simulation_id = self.id
self.agents.append(agent)
self.name_to_agent[agent.name] = agent</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.add_environment"><code class="name flex">
<span>def <span class="ident">add_environment</span></span>(<span>self, environment)</span>
</code></dt>
<dd>
<div class="desc"><p>Adds an environment to the simulation.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def add_environment(self, environment):
&#34;&#34;&#34;
Adds an environment to the simulation.
&#34;&#34;&#34;
if environment.name in self.name_to_environment:
raise ValueError(f&#34;Environment names must be unique, but &#39;{environment.name}&#39; is already defined.&#34;)
environment.simulation_id = self.id
self.environments.append(environment)
self.name_to_environment[environment.name] = environment</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.add_factory"><code class="name flex">
<span>def <span class="ident">add_factory</span></span>(<span>self, factory)</span>
</code></dt>
<dd>
<div class="desc"><p>Adds a factory to the simulation.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def add_factory(self, factory):
&#34;&#34;&#34;
Adds a factory to the simulation.
&#34;&#34;&#34;
if factory.name in self.name_to_factory:
raise ValueError(f&#34;Factory names must be unique, but &#39;{factory.name}&#39; is already defined.&#34;)
factory.simulation_id = self.id
self.factories.append(factory)
self.name_to_factory[factory.name] = factory</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.begin"><code class="name flex">
<span>def <span class="ident">begin</span></span>(<span>self, cache_path: str = None, auto_checkpoint: bool = False)</span>
</code></dt>
<dd>
<div class="desc"><p>Marks the start of the simulation being controlled.</p>
<h2 id="args">Args</h2>
<dl>
<dt><strong><code>cache_path</code></strong> :&ensp;<code>str</code></dt>
<dd>The path to the cache file. If not specified,
defaults to the default cache path defined in the class.</dd>
<dt><strong><code>auto_checkpoint</code></strong> :&ensp;<code>bool</code>, optional</dt>
<dd>Whether to automatically checkpoint at the end of each transaction. Defaults to False.</dd>
</dl></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def begin(self, cache_path:str=None, auto_checkpoint:bool=False):
&#34;&#34;&#34;
Marks the start of the simulation being controlled.
Args:
cache_path (str): The path to the cache file. If not specified,
defaults to the default cache path defined in the class.
auto_checkpoint (bool, optional): Whether to automatically checkpoint at the end of each transaction. Defaults to False.
&#34;&#34;&#34;
logger.debug(f&#34;Starting simulation, cache_path={cache_path}, auto_checkpoint={auto_checkpoint}.&#34;)
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
from tinytroupe.factory.tiny_person_factory import TinyPersonFactory
if self.status == Simulation.STATUS_STOPPED:
self.status = Simulation.STATUS_STARTED
else:
raise ValueError(&#34;Simulation is already started.&#34;)
if cache_path is not None:
self.cache_path = cache_path
# should we automatically checkpoint?
self.auto_checkpoint = auto_checkpoint
# clear the agents, environments and other simulated entities, we&#39;ll track them from now on
TinyPerson.clear_agents()
TinyWorld.clear_environments()
TinyFactory.clear_factories()
TinyPersonFactory.clear_factories()
# All automated fresh ids will start from 0 again for this simulation
utils.reset_fresh_id()
# load the cache file, if any
if self.cache_path is not None:
self._load_cache_file(self.cache_path)</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.begin_parallel_transactions"><code class="name flex">
<span>def <span class="ident">begin_parallel_transactions</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Starts parallel transactions.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def begin_parallel_transactions(self):
&#34;&#34;&#34;
Starts parallel transactions.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_parallel_transactions = True
# add a new parallel segment to the execution and cache traces
self.execution_trace.append({})
self.cached_trace.append({})</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.begin_transaction"><code class="name flex">
<span>def <span class="ident">begin_transaction</span></span>(<span>self, id=None)</span>
</code></dt>
<dd>
<div class="desc"><p>Starts a transaction.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def begin_transaction(self, id=None):
&#34;&#34;&#34;
Starts a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_transaction[id] = True
self._clear_communications_buffers() # TODO &lt;----------------------------------------------------------------</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.checkpoint"><code class="name flex">
<span>def <span class="ident">checkpoint</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Saves current simulation trace to a file.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def checkpoint(self):
&#34;&#34;&#34;
Saves current simulation trace to a file.
&#34;&#34;&#34;
logger.debug(&#34;Checkpointing simulation state...&#34;)
# save the cache file
if self.has_unsaved_cache_changes:
self._save_cache_file(self.cache_path)
else:
logger.debug(&#34;No unsaved cache changes to save to file.&#34;)</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.end"><code class="name flex">
<span>def <span class="ident">end</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Marks the end of the simulation being controlled.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def end(self):
&#34;&#34;&#34;
Marks the end of the simulation being controlled.
&#34;&#34;&#34;
logger.debug(&#34;Ending simulation.&#34;)
if self.status == Simulation.STATUS_STARTED:
self.status = Simulation.STATUS_STOPPED
self.checkpoint()
else:
raise ValueError(&#34;Simulation is already stopped.&#34;)</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.end_parallel_transactions"><code class="name flex">
<span>def <span class="ident">end_parallel_transactions</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Ends parallel transactions.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def end_parallel_transactions(self):
&#34;&#34;&#34;
Ends parallel transactions.
&#34;&#34;&#34;
self._under_parallel_transactions = False</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.end_transaction"><code class="name flex">
<span>def <span class="ident">end_transaction</span></span>(<span>self, id=None)</span>
</code></dt>
<dd>
<div class="desc"><p>Ends a transaction.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def end_transaction(self, id=None):
&#34;&#34;&#34;
Ends a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
self._under_transaction[id] = False</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.is_under_parallel_transactions"><code class="name flex">
<span>def <span class="ident">is_under_parallel_transactions</span></span>(<span>self)</span>
</code></dt>
<dd>
<div class="desc"><p>Checks if the simulation is under parallel transactions.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def is_under_parallel_transactions(self):
&#34;&#34;&#34;
Checks if the agent is under parallel transactions.
&#34;&#34;&#34;
return self._under_parallel_transactions</code></pre>
</details>
</dd>
<dt id="tinytroupe.control.Simulation.is_under_transaction"><code class="name flex">
<span>def <span class="ident">is_under_transaction</span></span>(<span>self, id=None)</span>
</code></dt>
<dd>
<div class="desc"><p>Checks if the simulation is under a transaction for the given <code>id</code>.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def is_under_transaction(self, id=None):
&#34;&#34;&#34;
Checks if the agent is under a transaction.
&#34;&#34;&#34;
with concurrent_execution_lock:
return self._under_transaction.get(id, False)</code></pre>
</details>
</dd>
</dl>
</dd>
<dt id="tinytroupe.control.SkipTransaction"><code class="flex name class">
<span>class <span class="ident">SkipTransaction</span></span>
<span>(</span><span>*args, **kwargs)</span>
</code></dt>
<dd>
<div class="desc"><p>Common base class for all non-exit exceptions.</p></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class SkipTransaction(Exception):
pass</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
<li>builtins.Exception</li>
<li>builtins.BaseException</li>
</ul>
</dd>
<dt id="tinytroupe.control.Transaction"><code class="flex name class">
<span>class <span class="ident">Transaction</span></span>
<span>(</span><span>obj_under_transaction, simulation, function, *args, **kwargs)</span>
</code></dt>
<dd>
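<div class="desc"><p>Wraps a single function call made on a simulated object (a <code>TinyPerson</code>, <code>TinyWorld</code> or <code>TinyFactory</code>) so that <code>execute</code> can cache its output together with the simulation state. If a simulation is given and the object has no <code>simulation_id</code> yet, the object is added to that simulation; a <code>ValueError</code> is raised if the object already belongs to a different simulation or is of an unsupported type.</p></div>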
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">class Transaction:
def __init__(self, obj_under_transaction, simulation, function, *args, **kwargs):
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
self.obj_under_transaction = obj_under_transaction
self.simulation = simulation
self.function_name = function.__name__
self.function = function
self.args = args
self.kwargs = kwargs
#
# If we have an ongoing simulation, set the simulation id of the object under transaction if it is not already set.
#
if simulation is not None:
if hasattr(obj_under_transaction, &#39;simulation_id&#39;) and obj_under_transaction.simulation_id is not None:
if obj_under_transaction.simulation_id != simulation.id:
raise ValueError(f&#34;Object {obj_under_transaction} is already captured by a different simulation (id={obj_under_transaction.simulation_id}), \
and cannot be captured by simulation id={simulation.id}.&#34;)
logger.debug(f&#34;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt; Object {obj_under_transaction} is already captured by simulation {simulation.id}.&#34;)
else:
# if is a TinyPerson, add the agent to the simulation
if isinstance(obj_under_transaction, TinyPerson):
simulation.add_agent(obj_under_transaction)
logger.debug(f&#34;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt; Added agent {obj_under_transaction} to simulation {simulation.id}.&#34;)
# if is a TinyWorld, add the environment to the simulation
elif isinstance(obj_under_transaction, TinyWorld):
simulation.add_environment(obj_under_transaction)
# if is a TinyFactory, add the factory to the simulation
elif isinstance(obj_under_transaction, TinyFactory):
simulation.add_factory(obj_under_transaction)
logger.debug(f&#34;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt; Added factory {obj_under_transaction} to simulation {simulation.id}.&#34;)
else:
raise ValueError(f&#34;Object {obj_under_transaction} (type = {type(obj_under_transaction)}) is not a TinyPerson or TinyWorld instance, and cannot be captured by the simulation.&#34;)
def execute(self, begin_parallel=False, parallel_id=None):
output = None
# Transaction caching will only operate if there is a simulation and it is started
if self.simulation is None or self.simulation.status == Simulation.STATUS_STOPPED:
# Compute the function and return it, no caching, since the simulation is not started
output = self.function(*self.args, **self.kwargs)
elif self.simulation.status == Simulation.STATUS_STARTED:
# Compute the event hash
event_hash = self.simulation._function_call_hash(self.function_name, *self.args, **self.kwargs)
# Sequential and parallel transactions are handled in different ways
if begin_parallel:
self.simulation.begin_parallel_transactions()
# CACHED? Check if the event hash is in the cache
if self.simulation._is_transaction_event_cached(event_hash,
parallel=self.simulation.is_under_parallel_transactions()):
self.simulation.cache_hits += 1
# Restore the full state and return the cached output
logger.info(f&#34;Skipping execution of {self.function_name} with args {self.args} and kwargs {self.kwargs} because it is already cached.&#34;)
# SEQUENTIAL
if not self.simulation.is_under_parallel_transactions():
self.simulation._skip_execution_with_cache()
state = self.simulation.cached_trace[self.simulation._execution_trace_position()][3] # state
self.simulation._decode_simulation_state(state)
# Output encoding/decoding is used to preserve references to TinyPerson and TinyWorld instances
# mainly. Scalar values (int, float, str, bool) and composite values (list, dict) are
# encoded/decoded as is.
encoded_output = self.simulation.cached_trace[self.simulation._execution_trace_position()][2] # output
output = self._decode_function_output(encoded_output)
# PARALLEL
else: # is under parallel transactions
# in parallel segments, state is not restored, only outputs
encoded_output = self.simulation._get_cached_parallel_value(event_hash, &#34;encoded_output&#34;)
output = self._decode_function_output(encoded_output)
else: # not cached
if not begin_parallel:
# in case of beginning a parallel segment, we don&#39;t want to count it as a cache miss,
# since the segment itself will not be cached, but rather the events within it.
self.simulation.cache_misses += 1
if not self.simulation.is_under_transaction(id=parallel_id) and not begin_parallel:
# BEGIN SEQUENTIAL TRANSACTION ###############################################################
#
# if this is the beginning of a parallel segment, we don&#39;t need to begin a transaction, since
# we want to allow additional transactions within the parallel segment (i.e., one-level reentrancy).
if not begin_parallel:
self.simulation.begin_transaction(id=parallel_id)
# Compute the function and encode the relevant output and simulation state
output = self.function(*self.args, **self.kwargs)
self._save_output_with_simulation_state(event_hash, output)
# END TRANSACTION #################################################################
if not begin_parallel:
self.simulation.end_transaction(id=parallel_id)
else: # already under transaction (thus, now a reentrant transaction) OR beginning a parallel segment
# NOTES:
#
# - Reentrant sequential transactions are not cached, since what matters is the final result of
# the top-level transaction.
#
# - The event that starts the parallel transactions segment WILL NOT itself be cached, since
# it is not part of the parallel segment, but rather the beginning of it. This event will be
# reconstructed during runtime from the parallel events within the segment.
output = self.function(*self.args, **self.kwargs)
if begin_parallel:
self.simulation.end_parallel_transactions()
# execute an ad-hoc Transaction to save the simulation state AFTER the parallel segment is done.
Transaction(self.obj_under_transaction, self.simulation, lambda: True).execute(begin_parallel=False, parallel_id=parallel_id)
else:
raise ValueError(f&#34;Simulation status is invalid at this point: {self.simulation.status}&#34;)
# Checkpoint if needed
logger.debug(f&#34;Will attempt to checkpoint simulation state after transaction execution.&#34;)
if self.simulation is not None and self.simulation.auto_checkpoint:
logger.debug(&#34;Auto-checkpointing simulation state after transaction execution.&#34;)
self.simulation.checkpoint()
# after all the transaction is done, return the output - the client will never know about all the complexity we&#39;ve
# gone through to get here.
return output
def _save_output_with_simulation_state(self, event_hash, output):
encoded_output = self._encode_function_output(output)
state = self.simulation._encode_simulation_state()
# immediately drop the cached trace suffix, since we are starting a new execution from this point on.
# in the case of parallel transactions, this will drop everything _after_ the current parallel segment
# (which itself occupies one position only, with a dictionary of event hashes and their outputs).
self.simulation._drop_cached_trace_suffix()
# Cache the result and update the current execution trace. If this is a parallel transaction, the
# cache and execution traces will be updated in a different way.
self.simulation._add_to_cache_trace(state, event_hash, encoded_output,
parallel=self.simulation.is_under_parallel_transactions())
self.simulation._add_to_execution_trace(state, event_hash, encoded_output,
parallel=self.simulation.is_under_parallel_transactions())
def _encode_function_output(self, output) -&gt; dict:
&#34;&#34;&#34;
Encodes the given function output.
&#34;&#34;&#34;
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
# if the output is a supported object, encode it
if output is None:
return None
elif isinstance(output, TinyPerson):
return {&#34;type&#34;: &#34;TinyPersonRef&#34;, &#34;name&#34;: output.name}
elif isinstance(output, TinyWorld):
return {&#34;type&#34;: &#34;TinyWorldRef&#34;, &#34;name&#34;: output.name}
elif isinstance(output, TinyFactory):
return {&#34;type&#34;: &#34;TinyFactoryRef&#34;, &#34;name&#34;: output.name}
elif isinstance(output, list):
encoded_list = []
for item in output:
if isinstance(item, TinyPerson):
encoded_list.append({&#34;type&#34;: &#34;TinyPersonRef&#34;, &#34;name&#34;: item.name})
elif isinstance(item, TinyWorld):
encoded_list.append({&#34;type&#34;: &#34;TinyWorldRef&#34;, &#34;name&#34;: item.name})
elif isinstance(item, TinyFactory):
encoded_list.append({&#34;type&#34;: &#34;TinyFactoryRef&#34;, &#34;name&#34;: item.name})
else:
encoded_list.append({&#34;type&#34;: &#34;JSON&#34;, &#34;value&#34;: item})
return {&#34;type&#34;: &#34;List&#34;, &#34;value&#34;: encoded_list}
elif isinstance(output, (int, float, str, bool, dict, tuple)):
return {&#34;type&#34;: &#34;JSON&#34;, &#34;value&#34;: output}
else:
raise ValueError(f&#34;Unsupported output type: {type(output)}&#34;)
def _decode_function_output(self, encoded_output: dict):
&#34;&#34;&#34;
Decodes the given encoded function output.
&#34;&#34;&#34;
# local import to avoid circular dependencies
from tinytroupe.agent import TinyPerson
from tinytroupe.environment import TinyWorld
from tinytroupe.factory.tiny_factory import TinyFactory
if encoded_output is None:
return None
elif encoded_output[&#34;type&#34;] == &#34;TinyPersonRef&#34;:
return TinyPerson.get_agent_by_name(encoded_output[&#34;name&#34;])
elif encoded_output[&#34;type&#34;] == &#34;TinyWorldRef&#34;:
return TinyWorld.get_environment_by_name(encoded_output[&#34;name&#34;])
elif encoded_output[&#34;type&#34;] == &#34;TinyFactoryRef&#34;:
return TinyFactory.get_factory_by_name(encoded_output[&#34;name&#34;])
elif encoded_output[&#34;type&#34;] == &#34;List&#34;:
decoded_list = []
for item in encoded_output[&#34;value&#34;]:
if item[&#34;type&#34;] == &#34;TinyPersonRef&#34;:
decoded_list.append(TinyPerson.get_agent_by_name(item[&#34;name&#34;]))
elif item[&#34;type&#34;] == &#34;TinyWorldRef&#34;:
decoded_list.append(TinyWorld.get_environment_by_name(item[&#34;name&#34;]))
elif item[&#34;type&#34;] == &#34;TinyFactoryRef&#34;:
decoded_list.append(TinyFactory.get_factory_by_name(item[&#34;name&#34;]))
else:
decoded_list.append(item[&#34;value&#34;])
return decoded_list
elif encoded_output[&#34;type&#34;] == &#34;JSON&#34;:
return encoded_output[&#34;value&#34;]
else:
raise ValueError(f&#34;Unsupported output type: {encoded_output[&#39;type&#39;]}&#34;)</code></pre>
</details>
<h3>Methods</h3>
<dl>
<dt id="tinytroupe.control.Transaction.execute"><code class="name flex">
<span>def <span class="ident">execute</span></span>(<span>self, begin_parallel=False, parallel_id=None)</span>
</code></dt>
<dd>
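<div class="desc"><p>Runs the wrapped function and returns its output. When there is no simulation, or the simulation is stopped, the function is simply called with no caching. When the simulation is started, a cached output for the same call is returned if available (in sequential mode the cached simulation state is restored too); otherwise the function is called and, unless this is a reentrant transaction or the call that begins a parallel segment, its output and the resulting simulation state are cached. If <code>auto_checkpoint</code> is enabled, the simulation is checkpointed afterwards. A <code>ValueError</code> is raised if the simulation status is neither started nor stopped.</p></div>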
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">def execute(self, begin_parallel=False, parallel_id=None):
output = None
# Transaction caching will only operate if there is a simulation and it is started
if self.simulation is None or self.simulation.status == Simulation.STATUS_STOPPED:
# Compute the function and return it, no caching, since the simulation is not started
output = self.function(*self.args, **self.kwargs)
elif self.simulation.status == Simulation.STATUS_STARTED:
# Compute the event hash
event_hash = self.simulation._function_call_hash(self.function_name, *self.args, **self.kwargs)
# Sequential and parallel transactions are handled in different ways
if begin_parallel:
self.simulation.begin_parallel_transactions()
# CACHED? Check if the event hash is in the cache
if self.simulation._is_transaction_event_cached(event_hash,
parallel=self.simulation.is_under_parallel_transactions()):
self.simulation.cache_hits += 1
# Restore the full state and return the cached output
logger.info(f&#34;Skipping execution of {self.function_name} with args {self.args} and kwargs {self.kwargs} because it is already cached.&#34;)
# SEQUENTIAL
if not self.simulation.is_under_parallel_transactions():
self.simulation._skip_execution_with_cache()
state = self.simulation.cached_trace[self.simulation._execution_trace_position()][3] # state
self.simulation._decode_simulation_state(state)
# Output encoding/decoding is used to preserve references to TinyPerson and TinyWorld instances
# mainly. Scalar values (int, float, str, bool) and composite values (list, dict) are
# encoded/decoded as is.
encoded_output = self.simulation.cached_trace[self.simulation._execution_trace_position()][2] # output
output = self._decode_function_output(encoded_output)
# PARALLEL
else: # is under parallel transactions
# in parallel segments, state is not restored, only outputs
encoded_output = self.simulation._get_cached_parallel_value(event_hash, &#34;encoded_output&#34;)
output = self._decode_function_output(encoded_output)
else: # not cached
if not begin_parallel:
# in case of beginning a parallel segment, we don&#39;t want to count it as a cache miss,
# since the segment itself will not be cached, but rather the events within it.
self.simulation.cache_misses += 1
if not self.simulation.is_under_transaction(id=parallel_id) and not begin_parallel:
# BEGIN SEQUENTIAL TRANSACTION ###############################################################
#
# if this is the beginning of a parallel segment, we don&#39;t need to begin a transaction, since
# we want to allow additional transactions within the parallel segment (i.e., one-level reentrancy).
if not begin_parallel:
self.simulation.begin_transaction(id=parallel_id)
# Compute the function and encode the relevant output and simulation state
output = self.function(*self.args, **self.kwargs)
self._save_output_with_simulation_state(event_hash, output)
# END TRANSACTION #################################################################
if not begin_parallel:
self.simulation.end_transaction(id=parallel_id)
else: # already under transaction (thus, now a reentrant transaction) OR beginning a parallel segment
# NOTES:
#
# - Reentrant sequential transactions are not cached, since what matters is the final result of
# the top-level transaction.
#
# - The event that starts the parallel transactions segment WILL NOT itself be cached, since
# it is not part of the parallel segment, but rather the beginning of it. This event will be
# reconstructed during runtime from the parallel events within the segment.
output = self.function(*self.args, **self.kwargs)
if begin_parallel:
self.simulation.end_parallel_transactions()
# execute an ad-hoc Transaction to save the simulation state AFTER the parallel segment is done.
Transaction(self.obj_under_transaction, self.simulation, lambda: True).execute(begin_parallel=False, parallel_id=parallel_id)
else:
raise ValueError(f&#34;Simulation status is invalid at this point: {self.simulation.status}&#34;)
# Checkpoint if needed
logger.debug(f&#34;Will attempt to checkpoint simulation state after transaction execution.&#34;)
if self.simulation is not None and self.simulation.auto_checkpoint:
logger.debug(&#34;Auto-checkpointing simulation state after transaction execution.&#34;)
self.simulation.checkpoint()
# after all the transaction is done, return the output - the client will never know about all the complexity we&#39;ve
# gone through to get here.
return output</code></pre>
</details>
</dd>
</dl>
</dd>
</dl>
</section>
</article>
<nav id="sidebar">
<h1>Index</h1>
<div class="toc">
<ul></ul>
</div>
<ul id="index">
<li><h3>Super-module</h3>
<ul>
<li><code><a title="tinytroupe" href="index.html">tinytroupe</a></code></li>
</ul>
</li>
<li><h3><a href="#header-functions">Functions</a></h3>
<ul class="two-column">
<li><code><a title="tinytroupe.control.begin" href="#tinytroupe.control.begin">begin</a></code></li>
<li><code><a title="tinytroupe.control.cache_hits" href="#tinytroupe.control.cache_hits">cache_hits</a></code></li>
<li><code><a title="tinytroupe.control.cache_misses" href="#tinytroupe.control.cache_misses">cache_misses</a></code></li>
<li><code><a title="tinytroupe.control.checkpoint" href="#tinytroupe.control.checkpoint">checkpoint</a></code></li>
<li><code><a title="tinytroupe.control.current_simulation" href="#tinytroupe.control.current_simulation">current_simulation</a></code></li>
<li><code><a title="tinytroupe.control.end" href="#tinytroupe.control.end">end</a></code></li>
<li><code><a title="tinytroupe.control.reset" href="#tinytroupe.control.reset">reset</a></code></li>
<li><code><a title="tinytroupe.control.transactional" href="#tinytroupe.control.transactional">transactional</a></code></li>
</ul>
</li>
<li><h3><a href="#header-classes">Classes</a></h3>
<ul>
<li>
<h4><code><a title="tinytroupe.control.CacheOutOfSync" href="#tinytroupe.control.CacheOutOfSync">CacheOutOfSync</a></code></h4>
</li>
<li>
<h4><code><a title="tinytroupe.control.ExecutionCached" href="#tinytroupe.control.ExecutionCached">ExecutionCached</a></code></h4>
</li>
<li>
<h4><code><a title="tinytroupe.control.Simulation" href="#tinytroupe.control.Simulation">Simulation</a></code></h4>
<ul class="">
<li><code><a title="tinytroupe.control.Simulation.STATUS_STARTED" href="#tinytroupe.control.Simulation.STATUS_STARTED">STATUS_STARTED</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.STATUS_STOPPED" href="#tinytroupe.control.Simulation.STATUS_STOPPED">STATUS_STOPPED</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.add_agent" href="#tinytroupe.control.Simulation.add_agent">add_agent</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.add_environment" href="#tinytroupe.control.Simulation.add_environment">add_environment</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.add_factory" href="#tinytroupe.control.Simulation.add_factory">add_factory</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.begin" href="#tinytroupe.control.Simulation.begin">begin</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.begin_parallel_transactions" href="#tinytroupe.control.Simulation.begin_parallel_transactions">begin_parallel_transactions</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.begin_transaction" href="#tinytroupe.control.Simulation.begin_transaction">begin_transaction</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.checkpoint" href="#tinytroupe.control.Simulation.checkpoint">checkpoint</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.end" href="#tinytroupe.control.Simulation.end">end</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.end_parallel_transactions" href="#tinytroupe.control.Simulation.end_parallel_transactions">end_parallel_transactions</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.end_transaction" href="#tinytroupe.control.Simulation.end_transaction">end_transaction</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.is_under_parallel_transactions" href="#tinytroupe.control.Simulation.is_under_parallel_transactions">is_under_parallel_transactions</a></code></li>
<li><code><a title="tinytroupe.control.Simulation.is_under_transaction" href="#tinytroupe.control.Simulation.is_under_transaction">is_under_transaction</a></code></li>
</ul>
</li>
<li>
<h4><code><a title="tinytroupe.control.SkipTransaction" href="#tinytroupe.control.SkipTransaction">SkipTransaction</a></code></h4>
</li>
<li>
<h4><code><a title="tinytroupe.control.Transaction" href="#tinytroupe.control.Transaction">Transaction</a></code></h4>
<ul class="">
<li><code><a title="tinytroupe.control.Transaction.execute" href="#tinytroupe.control.Transaction.execute">execute</a></code></li>
</ul>
</li>
</ul>
</li>
</ul>
</nav>
</main>
<footer id="footer">
<p>Generated by <a href="https://pdoc3.github.io/pdoc" title="pdoc: Python API documentation generator"><cite>pdoc</cite> 0.10.0</a>.</p>
</footer>
</body>
</html>