Spaces:
Paused
Paused
| <html lang="en"> | |
| <head> | |
| <meta charset="utf-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" /> | |
| <meta name="generator" content="pdoc 0.10.0" /> | |
| <title>tinytroupe.control API documentation</title> | |
| <meta name="description" content="Simulation controlling mechanisms." /> | |
| <link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css" integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs=" crossorigin> | |
| <link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css" integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg=" crossorigin> | |
| <link rel="stylesheet preload" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/styles/github.min.css" crossorigin> | |
| <style>:root{--highlight-color:#fe9}.flex{display:flex }body{line-height:1.5em}#content{padding:20px}#sidebar{padding:30px;overflow:hidden}#sidebar > *:last-child{margin-bottom:2cm}.http-server-breadcrumbs{font-size:130%;margin:0 0 15px 0}#footer{font-size:.75em;padding:5px 30px;border-top:1px solid #ddd;text-align:right}#footer p{margin:0 0 0 1em;display:inline-block}#footer p:last-child{margin-right:30px}h1,h2,h3,h4,h5{font-weight:300}h1{font-size:2.5em;line-height:1.1em}h2{font-size:1.75em;margin:1em 0 .50em 0}h3{font-size:1.4em;margin:25px 0 10px 0}h4{margin:0;font-size:105%}h1:target,h2:target,h3:target,h4:target,h5:target,h6:target{background:var(--highlight-color);padding:.2em 0}a{color:#058;text-decoration:none;transition:color .3s ease-in-out}a:hover{color:#e82}.title code{font-weight:bold}h2[id^="header-"]{margin-top:2em}.ident{color:#900}pre code{background:#f8f8f8;font-size:.8em;line-height:1.4em}code{background:#f2f2f1;padding:1px 4px;overflow-wrap:break-word}h1 code{background:transparent}pre{background:#f8f8f8;border:0;border-top:1px solid #ccc;border-bottom:1px solid #ccc;margin:1em 0;padding:1ex}#http-server-module-list{display:flex;flex-flow:column}#http-server-module-list div{display:flex}#http-server-module-list dt{min-width:10%}#http-server-module-list p{margin-top:0}.toc ul,#index{list-style-type:none;margin:0;padding:0}#index code{background:transparent}#index h3{border-bottom:1px solid #ddd}#index ul{padding:0}#index h4{margin-top:.6em;font-weight:bold}@media (min-width:200ex){#index .two-column{column-count:2}}@media (min-width:300ex){#index .two-column{column-count:3}}dl{margin-bottom:2em}dl dl:last-child{margin-bottom:4em}dd{margin:0 0 1em 3em}#header-classes + dl > dd{margin-bottom:3em}dd dd{margin-left:2em}dd p{margin:10px 0}.name{background:#eee;font-weight:bold;font-size:.85em;padding:5px 10px;display:inline-block;min-width:40%}.name:hover{background:#e0e0e0}dt:target .name{background:var(--highlight-color)}.name > span:first-child{white-space:nowrap}.name.class > span:nth-child(2){margin-left:.4em}.inherited{color:#999;border-left:5px solid #eee;padding-left:1em}.inheritance em{font-style:normal;font-weight:bold}.desc h2{font-weight:400;font-size:1.25em}.desc h3{font-size:1em}.desc dt code{background:inherit}.source summary,.git-link-div{color:#666;text-align:right;font-weight:400;font-size:.8em;text-transform:uppercase}.source summary > *{white-space:nowrap;cursor:pointer}.git-link{color:inherit;margin-left:1em}.source pre{max-height:500px;overflow:auto;margin:0}.source pre code{font-size:12px;overflow:visible}.hlist{list-style:none}.hlist li{display:inline}.hlist li:after{content:',\2002'}.hlist li:last-child:after{content:none}.hlist .hlist{display:inline;padding-left:1em}img{max-width:100%}td{padding:0 .5em}.admonition{padding:.1em .5em;margin-bottom:1em}.admonition-title{font-weight:bold}.admonition.note,.admonition.info,.admonition.important{background:#aef}.admonition.todo,.admonition.versionadded,.admonition.tip,.admonition.hint{background:#dfd}.admonition.warning,.admonition.versionchanged,.admonition.deprecated{background:#fd4}.admonition.error,.admonition.danger,.admonition.caution{background:lightpink}</style> | |
| <style media="screen and (min-width: 700px)">@media screen and (min-width:700px){#sidebar{width:30%;height:100vh;overflow:auto;position:sticky;top:0}#content{width:70%;max-width:100ch;padding:3em 4em;border-left:1px solid #ddd}pre code{font-size:1em}.item .name{font-size:1em}main{display:flex;flex-direction:row-reverse;justify-content:flex-end}.toc ul ul,#index ul{padding-left:1.5em}.toc > ul > li{margin-top:.5em}}</style> | |
| <style media="print">@media print{#sidebar h1{page-break-before:always}.source{display:none}}@media print{*{background:transparent ;color:#000 ;box-shadow:none ;text-shadow:none }a[href]:after{content:" (" attr(href) ")";font-size:90%}a[href][title]:after{content:none}abbr[title]:after{content:" (" attr(title) ")"}.ir a:after,a[href^="javascript:"]:after,a[href^="#"]:after{content:""}pre,blockquote{border:1px solid #999;page-break-inside:avoid}thead{display:table-header-group}tr,img{page-break-inside:avoid}img{max-width:100% }@page{margin:0.5cm}p,h2,h3{orphans:3;widows:3}h1,h2,h3,h4,h5,h6{page-break-after:avoid}}</style> | |
| <script defer src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/10.1.1/highlight.min.js" integrity="sha256-Uv3H6lx7dJmRfRvH8TH6kJD1TSK1aFcwgx+mdg3epi8=" crossorigin></script> | |
| <script>window.addEventListener('DOMContentLoaded', () => hljs.initHighlighting())</script> | |
| </head> | |
| <body> | |
| <main> | |
| <article id="content"> | |
| <header> | |
| <h1 class="title">Module <code>tinytroupe.control</code></h1> | |
| </header> | |
| <section id="section-intro"> | |
| <p>Simulation controlling mechanisms.</p> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">""" | |
| Simulation controlling mechanisms. | |
| """ | |
| import json | |
| import os | |
| import tempfile | |
| import threading | |
| import traceback | |
| import tinytroupe | |
| import tinytroupe.utils as utils | |
| import uuid | |
| import logging | |
| logger = logging.getLogger("tinytroupe") | |
| # to protect from race conditions when running in parallel | |
| concurrent_execution_lock = threading.Lock() | |
| class Simulation: | |
| STATUS_STOPPED = "stopped" | |
| STATUS_STARTED = "started" | |
| def __init__(self, id="default", cached_trace:list=None): | |
| self.id = id | |
| self.agents = [] | |
| self.name_to_agent = {} # {agent_name: agent, ...} | |
| self.environments = [] | |
| self.factories = [] # e.g., TinyPersonFactory instances | |
| self.name_to_factory = {} # {factory_name: factory, ...} | |
| self.name_to_environment = {} # {environment_name: environment, ...} | |
| self.status = Simulation.STATUS_STOPPED | |
| self.cache_path = f"./tinytroupe-{id}.cache.json" # default cache path | |
| # should we always automatically checkpoint at the every transaction? | |
| self.auto_checkpoint = False | |
| # whether there are changes not yet saved to the cache file | |
| self.has_unsaved_cache_changes = False | |
| # whether the agent is under a transaction or not, used for managing | |
| # simulation caching later | |
| self._under_transaction = {None: False} | |
| # whether the agent is under a parallel transactions segment or not, used for managing | |
| # simulation caching later | |
| self._under_parallel_transactions = False | |
| # Cache chain mechanism. | |
| # | |
| # stores a list of simulation states. | |
| # Each state is a tuple (prev_node_hash, event_hash, event_output, state), where prev_node_hash is a hash of the previous node in this chain, | |
| # if any, event_hash is a hash of the event that triggered the transition to this state, if any, event_output is the output of the event, | |
| # if any, and state is the actual complete state that resulted. | |
| if cached_trace is None: | |
| self.cached_trace = [] | |
| else: | |
| self.cached_trace = cached_trace | |
| self.cache_misses = 0 | |
| self.cache_hits = 0 | |
| # Execution chain mechanism. | |
| # | |
| # The actual, current, execution trace. Each state is a tuple (prev_node_hash, event_hash, state), where prev_node_hash is a hash | |
| # of the previous node in this chain, if any, event_hash is a hash of the event that triggered the transition to this state, if any, | |
| # event_output is the output of the event, if any, and state is the actual complete state that resulted. | |
| self.execution_trace = [] | |
| def begin(self, cache_path:str=None, auto_checkpoint:bool=False): | |
| """ | |
| Marks the start of the simulation being controlled. | |
| Args: | |
| cache_path (str): The path to the cache file. If not specified, | |
| defaults to the default cache path defined in the class. | |
| auto_checkpoint (bool, optional): Whether to automatically checkpoint at the end of each transaction. Defaults to False. | |
| """ | |
| logger.debug(f"Starting simulation, cache_path={cache_path}, auto_checkpoint={auto_checkpoint}.") | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| from tinytroupe.factory.tiny_person_factory import TinyPersonFactory | |
| if self.status == Simulation.STATUS_STOPPED: | |
| self.status = Simulation.STATUS_STARTED | |
| else: | |
| raise ValueError("Simulation is already started.") | |
| if cache_path is not None: | |
| self.cache_path = cache_path | |
| # should we automatically checkpoint? | |
| self.auto_checkpoint = auto_checkpoint | |
| # clear the agents, environments and other simulated entities, we'll track them from now on | |
| TinyPerson.clear_agents() | |
| TinyWorld.clear_environments() | |
| TinyFactory.clear_factories() | |
| TinyPersonFactory.clear_factories() | |
| # All automated fresh ids will start from 0 again for this simulation | |
| utils.reset_fresh_id() | |
| # load the cache file, if any | |
| if self.cache_path is not None: | |
| self._load_cache_file(self.cache_path) | |
| def end(self): | |
| """ | |
| Marks the end of the simulation being controlled. | |
| """ | |
| logger.debug("Ending simulation.") | |
| if self.status == Simulation.STATUS_STARTED: | |
| self.status = Simulation.STATUS_STOPPED | |
| self.checkpoint() | |
| else: | |
| raise ValueError("Simulation is already stopped.") | |
| def checkpoint(self): | |
| """ | |
| Saves current simulation trace to a file. | |
| """ | |
| logger.debug("Checkpointing simulation state...") | |
| # save the cache file | |
| if self.has_unsaved_cache_changes: | |
| self._save_cache_file(self.cache_path) | |
| else: | |
| logger.debug("No unsaved cache changes to save to file.") | |
| def add_agent(self, agent): | |
| """ | |
| Adds an agent to the simulation. | |
| """ | |
| if agent.name in self.name_to_agent: | |
| raise ValueError(f"Agent names must be unique, but '{agent.name}' is already defined.") | |
| agent.simulation_id = self.id | |
| self.agents.append(agent) | |
| self.name_to_agent[agent.name] = agent | |
| def add_environment(self, environment): | |
| """ | |
| Adds an environment to the simulation. | |
| """ | |
| if environment.name in self.name_to_environment: | |
| raise ValueError(f"Environment names must be unique, but '{environment.name}' is already defined.") | |
| environment.simulation_id = self.id | |
| self.environments.append(environment) | |
| self.name_to_environment[environment.name] = environment | |
| def add_factory(self, factory): | |
| """ | |
| Adds a factory to the simulation. | |
| """ | |
| if factory.name in self.name_to_factory: | |
| raise ValueError(f"Factory names must be unique, but '{factory.name}' is already defined.") | |
| factory.simulation_id = self.id | |
| self.factories.append(factory) | |
| self.name_to_factory[factory.name] = factory | |
| ################################################################################################### | |
| # Cache and execution chain mechanisms | |
| ################################################################################################### | |
| def _execution_trace_position(self) -> int: | |
| """ | |
| Returns the current position in the execution trace, or -1 if the execution trace is empty. | |
| """ | |
| return len(self.execution_trace) - 1 | |
| def _function_call_hash(self, function_name, *args, **kwargs) -> int: | |
| """ | |
| Computes the hash of the given function call. | |
| """ | |
| # if functions are passed as arguments to the function, there's the problem that their | |
| # string representation always changes due to memory position (e.g., <function my_function at 0x7f8d1a7b7d30>). | |
| # so we need to remove the changing suffix in those cases, while preserving the function name if it exists. | |
| # positional arguments | |
| # covnerts to a list of string representations first | |
| args_str = list(map(str, args)) | |
| for i, arg in enumerate(args): | |
| if callable(arg): | |
| args_str[i] = arg.__name__ | |
| # keyword arguments | |
| # converts to a list of string representations first | |
| kwargs_str = {k: str(v) for k, v in kwargs.items()} | |
| for k, v in kwargs.items(): | |
| if callable(v): | |
| kwargs_str[k] = v.__name__ | |
| # then, convert to a single string, to obtain a unique hash | |
| event = str((function_name, args_str, kwargs_str)) | |
| # TODO actually compute a short hash of the event string, e.g., using SHA256 ? | |
| # event_hash = utils.custom_hash(event) | |
| return event | |
| def _skip_execution_with_cache(self): | |
| """ | |
| Skips the current execution, assuming there's a cached state at the same position. | |
| """ | |
| assert len(self.cached_trace) > self._execution_trace_position() + 1, "There's no cached state at the current execution position." | |
| self.execution_trace.append(self.cached_trace[self._execution_trace_position() + 1]) | |
| def _is_transaction_event_cached(self, event_hash, parallel=False) -> bool: | |
| """ | |
| Checks whether the given event hash matches the corresponding cached one, if any. | |
| If there's no corresponding cached state, returns True. | |
| """ | |
| if not parallel: | |
| # there's cache that could be used | |
| if len(self.cached_trace) > self._execution_trace_position() + 1: | |
| if self._execution_trace_position() >= -1: | |
| # here's a graphical depiction of the logic: | |
| # | |
| # Cache: c0:(c_prev_node_hash_0, c_event_hash_0, _, c_state_0) ------------------> c1:(c_prev_node_hash_1, c_event_hash_1, _, c_state_1) -> ... | |
| # Execution: e0:(e_prev_node_hash_0, e_event_hash_0, _, e_state_0) -<being computed>-> e1:(e_prev_node_hash_1, <being computed>, <being computed>, <being computed>) | |
| # position = 0 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
| # | |
| # Must satisfy: | |
| # - event_hash == c_event_hash_1 | |
| # - hash(e0) == c_prev_node_hash_1 | |
| try: | |
| event_hash_match = event_hash == self.cached_trace[self._execution_trace_position() + 1][1] | |
| except Exception as e: | |
| logger.error(f"Error while checking event hash match: {e}") | |
| event_hash_match = False | |
| prev_node_match = True # TODO implement real check | |
| return event_hash_match and prev_node_match | |
| else: | |
| raise ValueError("Execution trace position is invalid, must be >= -1, but is ", self._execution_trace_position()) | |
| else: # no cache to use | |
| return False | |
| else: # parallel | |
| if len(self.cached_trace) >= self._execution_trace_position(): | |
| if self._execution_trace_position() >= 0: | |
| # parallel stores ignore order, so we need to check instead whether the event hash is a key in the parallel store, | |
| # regardless of the order of the events generated the data therein. | |
| if isinstance(self.cached_trace[self._execution_trace_position()], dict): | |
| event_hash_match = event_hash in self.cached_trace[self._execution_trace_position()].keys() | |
| else: | |
| event_hash_match = False | |
| prev_node_match = True # TODO implement real check | |
| return event_hash_match and prev_node_match | |
| else: | |
| raise ValueError("Execution trace position is invalid, must be >= 0, but is ", self._execution_trace_position()) | |
| def _get_cached_parallel_value(self, event_hash, key): | |
| parallel_store = self.cached_trace[self._execution_trace_position()] | |
| value = parallel_store[event_hash][key] | |
| return value | |
| def _drop_cached_trace_suffix(self): | |
| """ | |
| Drops the cached trace suffix starting at the current execution trace position. This effectively | |
| refreshes the cache to the current execution state and starts building a new cache from there. | |
| """ | |
| self.cached_trace = self.cached_trace[:self._execution_trace_position()+1] | |
| def _add_to_execution_trace(self, state: dict, event_hash: int, event_output, parallel=False): | |
| """ | |
| Adds a state to the execution_trace list and computes the appropriate hash. | |
| The computed hash is compared to the hash of the cached trace at the same position, | |
| and if they don't match, the execution is aborted. Similarly, the event_hash is compared | |
| to the hash of the event in the cached trace at the same position, and if they don't match, the execution | |
| is aborted. | |
| """ | |
| # Compute the hash of the previous execution pair, if any | |
| previous_hash = None | |
| if not parallel: | |
| # Create a tuple of (hash, state) and append it to the execution_trace list | |
| self.execution_trace.append((previous_hash, event_hash, event_output, state)) | |
| else: | |
| with concurrent_execution_lock: | |
| # state is not stored in parallel segments, only outputs | |
| self.execution_trace[-1][event_hash] = {"prev_node_hash": previous_hash, | |
| "encoded_output": event_output} | |
| def _add_to_cache_trace(self, state: dict, event_hash: int, event_output, parallel=False): | |
| """ | |
| Adds a state to the cached_trace list and computes the appropriate hash. | |
| """ | |
| # Compute the hash of the previous cached pair, if any | |
| previous_hash = None | |
| if self.cached_trace: | |
| previous_hash = utils.custom_hash(self.cached_trace[-1]) | |
| if not parallel: | |
| # Create a tuple of (hash, state) and append it to the cached_trace list | |
| self.cached_trace.append((previous_hash, event_hash, event_output, state)) | |
| else: | |
| with concurrent_execution_lock: | |
| # state is not stored in parallel segments, only outputs | |
| self.cached_trace[-1][event_hash] = {"prev_node_hash": previous_hash, | |
| "encoded_output": event_output} | |
| self.has_unsaved_cache_changes = True | |
| def _load_cache_file(self, cache_path:str): | |
| """ | |
| Loads the cache file from the given path. | |
| """ | |
| try: | |
| self.cached_trace = json.load(open(cache_path, "r")) | |
| except FileNotFoundError: | |
| logger.info(f"Cache file not found on path: {cache_path}.") | |
| self.cached_trace = [] | |
| def _save_cache_file(self, cache_path:str): | |
| """ | |
| Saves the cache file to the given path. Always overwrites. | |
| """ | |
| logger.debug(f"Now saving cache file to {cache_path}.") | |
| try: | |
| # Create a temporary file | |
| with tempfile.NamedTemporaryFile('w', delete=False) as temp: | |
| json.dump(self.cached_trace, temp, indent=4) | |
| # Replace the original file with the temporary file | |
| os.replace(temp.name, cache_path) | |
| except Exception as e: | |
| traceback_string = ''.join(traceback.format_tb(e.__traceback__)) | |
| logger.error(f"An error occurred while saving the cache file: {e}\nTraceback:\n{traceback_string}") | |
| self.has_unsaved_cache_changes = False | |
| ################################################################################################### | |
| # Transactional control | |
| ################################################################################################### | |
| # | |
| # Regular sequential transactions | |
| # | |
| def begin_transaction(self, id=None): | |
| """ | |
| Starts a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_transaction[id] = True | |
| self._clear_communications_buffers() # TODO <---------------------------------------------------------------- | |
| def end_transaction(self, id=None): | |
| """ | |
| Ends a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_transaction[id] = False | |
| def is_under_transaction(self, id=None): | |
| """ | |
| Checks if the agent is under a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| return self._under_transaction.get(id, False) | |
| def _clear_communications_buffers(self): | |
| """ | |
| Cleans the communications buffers of all agents and environments. | |
| """ | |
| for agent in self.agents: | |
| agent.clear_communications_buffer() | |
| for environment in self.environments: | |
| environment.clear_communications_buffer() | |
| # | |
| # Parallel transactions | |
| # | |
| def begin_parallel_transactions(self): | |
| """ | |
| Starts parallel transactions. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_parallel_transactions = True | |
| # add a new parallel segment to the execution and cache traces | |
| self.execution_trace.append({}) | |
| self.cached_trace.append({}) | |
| def end_parallel_transactions(self): | |
| """ | |
| Ends parallel transactions. | |
| """ | |
| self._under_parallel_transactions = False | |
| def is_under_parallel_transactions(self): | |
| """ | |
| Checks if the agent is under parallel transactions. | |
| """ | |
| return self._under_parallel_transactions | |
| ################################################################################################### | |
| # Simulation state handling | |
| ################################################################################################### | |
| def _encode_simulation_state(self) -> dict: | |
| """ | |
| Encodes the current simulation state, including agents, environments, and other | |
| relevant information. | |
| """ | |
| state = {} | |
| # Encode agents | |
| state["agents"] = [] | |
| for agent in self.agents: | |
| state["agents"].append(agent.encode_complete_state()) | |
| # Encode environments | |
| state["environments"] = [] | |
| for environment in self.environments: | |
| state["environments"].append(environment.encode_complete_state()) | |
| # Encode factories | |
| state["factories"] = [] | |
| for factory in self.factories: | |
| state["factories"].append(factory.encode_complete_state()) | |
| return state | |
| def _decode_simulation_state(self, state: dict): | |
| """ | |
| Decodes the given simulation state, including agents, environments, and other | |
| relevant information. | |
| Args: | |
| state (dict): The state to decode. | |
| """ | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| logger.debug(f"Decoding simulation state: {state['factories']}") | |
| logger.debug(f"Registered factories: {self.name_to_factory}") | |
| logger.debug(f"Registered agents: {self.name_to_agent}") | |
| logger.debug(f"Registered environments: {self.name_to_environment}") | |
| # Decode factories | |
| for factory_state in state["factories"]: | |
| factory = self.name_to_factory[factory_state["name"]] | |
| factory.decode_complete_state(factory_state) | |
| # Decode environments | |
| ###self.environments = [] | |
| for environment_state in state["environments"]: | |
| try: | |
| environment = self.name_to_environment[environment_state["name"]] | |
| environment.decode_complete_state(environment_state) | |
| if TinyWorld.communication_display: | |
| environment.pop_and_display_latest_communications() | |
| except Exception as e: | |
| raise ValueError(f"Environment {environment_state['name']} is not in the simulation, thus cannot be decoded there.") from e | |
| # Decode agents (if they were not already decoded by the environment) | |
| ####self.agents = [] | |
| for agent_state in state["agents"]: | |
| try: | |
| agent = self.name_to_agent[agent_state["name"]] | |
| agent.decode_complete_state(agent_state) | |
| # The agent has not yet been decoded because it is not in any environment. So, decode it. | |
| if agent.environment is None: | |
| if TinyPerson.communication_display: | |
| agent.pop_and_display_latest_communications() | |
| except Exception as e: | |
| raise ValueError(f"Agent {agent_state['name']} is not in the simulation, thus cannot be decoded there.") from e | |
| class Transaction: | |
| def __init__(self, obj_under_transaction, simulation, function, *args, **kwargs): | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| self.obj_under_transaction = obj_under_transaction | |
| self.simulation = simulation | |
| self.function_name = function.__name__ | |
| self.function = function | |
| self.args = args | |
| self.kwargs = kwargs | |
| # | |
| # If we have an ongoing simulation, set the simulation id of the object under transaction if it is not already set. | |
| # | |
| if simulation is not None: | |
| if hasattr(obj_under_transaction, 'simulation_id') and obj_under_transaction.simulation_id is not None: | |
| if obj_under_transaction.simulation_id != simulation.id: | |
| raise ValueError(f"Object {obj_under_transaction} is already captured by a different simulation (id={obj_under_transaction.simulation_id}), \ | |
| and cannot be captured by simulation id={simulation.id}.") | |
| logger.debug(f">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Object {obj_under_transaction} is already captured by simulation {simulation.id}.") | |
| else: | |
| # if is a TinyPerson, add the agent to the simulation | |
| if isinstance(obj_under_transaction, TinyPerson): | |
| simulation.add_agent(obj_under_transaction) | |
| logger.debug(f">>>>>>>>>>>>>>>>>>>>>>> Added agent {obj_under_transaction} to simulation {simulation.id}.") | |
| # if is a TinyWorld, add the environment to the simulation | |
| elif isinstance(obj_under_transaction, TinyWorld): | |
| simulation.add_environment(obj_under_transaction) | |
| # if is a TinyFactory, add the factory to the simulation | |
| elif isinstance(obj_under_transaction, TinyFactory): | |
| simulation.add_factory(obj_under_transaction) | |
| logger.debug(f">>>>>>>>>>>>>>>>>>>>>>> Added factory {obj_under_transaction} to simulation {simulation.id}.") | |
| else: | |
| raise ValueError(f"Object {obj_under_transaction} (type = {type(obj_under_transaction)}) is not a TinyPerson or TinyWorld instance, and cannot be captured by the simulation.") | |
| def execute(self, begin_parallel=False, parallel_id=None): | |
| output = None | |
| # Transaction caching will only operate if there is a simulation and it is started | |
| if self.simulation is None or self.simulation.status == Simulation.STATUS_STOPPED: | |
| # Compute the function and return it, no caching, since the simulation is not started | |
| output = self.function(*self.args, **self.kwargs) | |
| elif self.simulation.status == Simulation.STATUS_STARTED: | |
| # Compute the event hash | |
| event_hash = self.simulation._function_call_hash(self.function_name, *self.args, **self.kwargs) | |
| # Sequential and parallel transactions are handled in different ways | |
| if begin_parallel: | |
| self.simulation.begin_parallel_transactions() | |
| # CACHED? Check if the event hash is in the cache | |
| if self.simulation._is_transaction_event_cached(event_hash, | |
| parallel=self.simulation.is_under_parallel_transactions()): | |
| self.simulation.cache_hits += 1 | |
| # Restore the full state and return the cached output | |
| logger.info(f"Skipping execution of {self.function_name} with args {self.args} and kwargs {self.kwargs} because it is already cached.") | |
| # SEQUENTIAL | |
| if not self.simulation.is_under_parallel_transactions(): | |
| self.simulation._skip_execution_with_cache() | |
| state = self.simulation.cached_trace[self.simulation._execution_trace_position()][3] # state | |
| self.simulation._decode_simulation_state(state) | |
| # Output encoding/decoding is used to preserve references to TinyPerson and TinyWorld instances | |
| # mainly. Scalar values (int, float, str, bool) and composite values (list, dict) are | |
| # encoded/decoded as is. | |
| encoded_output = self.simulation.cached_trace[self.simulation._execution_trace_position()][2] # output | |
| output = self._decode_function_output(encoded_output) | |
| # PARALLEL | |
| else: # is under parallel transactions | |
| # in parallel segments, state is not restored, only outputs | |
| encoded_output = self.simulation._get_cached_parallel_value(event_hash, "encoded_output") | |
| output = self._decode_function_output(encoded_output) | |
| else: # not cached | |
| if not begin_parallel: | |
| # in case of beginning a parallel segment, we don't want to count it as a cache miss, | |
| # since the segment itself will not be cached, but rather the events within it. | |
| self.simulation.cache_misses += 1 | |
| if not self.simulation.is_under_transaction(id=parallel_id) and not begin_parallel: | |
| # BEGIN SEQUENTIAL TRANSACTION ############################################################### | |
| # | |
| # if this is the beginning of a parallel segment, we don't need to begin a transaction, since | |
| # we want to allow additional transactions within the parallel segment (i.e., one-level reentrancy). | |
| if not begin_parallel: | |
| self.simulation.begin_transaction(id=parallel_id) | |
| # Compute the function and encode the relevant output and simulation state | |
| output = self.function(*self.args, **self.kwargs) | |
| self._save_output_with_simulation_state(event_hash, output) | |
| # END TRANSACTION ################################################################# | |
| if not begin_parallel: | |
| self.simulation.end_transaction(id=parallel_id) | |
| else: # already under transaction (thus, now a reentrant transaction) OR beginning a parallel segment | |
| # NOTES: | |
| # | |
| # - Reentrant sequential transactions are not cached, since what matters is the final result of | |
| # the top-level transaction. | |
| # | |
| # - The event that starts the parallel transactions segment WILL NOT itself be cached, since | |
| # it is not part of the parallel segment, but rather the beginning of it. This event will be | |
| # reconstructed during runtime from the parallel events within the segment. | |
| output = self.function(*self.args, **self.kwargs) | |
| if begin_parallel: | |
| self.simulation.end_parallel_transactions() | |
| # execute an ad-hoc Transaction to save the simulation state AFTER the parallel segment is done. | |
| Transaction(self.obj_under_transaction, self.simulation, lambda: True).execute(begin_parallel=False, parallel_id=parallel_id) | |
| else: | |
| raise ValueError(f"Simulation status is invalid at this point: {self.simulation.status}") | |
| # Checkpoint if needed | |
| logger.debug(f"Will attempt to checkpoint simulation state after transaction execution.") | |
| if self.simulation is not None and self.simulation.auto_checkpoint: | |
| logger.debug("Auto-checkpointing simulation state after transaction execution.") | |
| self.simulation.checkpoint() | |
| # after all the transaction is done, return the output - the client will never know about all the complexity we've | |
| # gone through to get here. | |
| return output | |
| def _save_output_with_simulation_state(self, event_hash, output): | |
| encoded_output = self._encode_function_output(output) | |
| state = self.simulation._encode_simulation_state() | |
| # immediately drop the cached trace suffix, since we are starting a new execution from this point on. | |
| # in the case of parallel transactions, this will drop everything _after_ the current parallel segment | |
| # (which itself occupies one position only, with a dictionary of event hashes and their outputs). | |
| self.simulation._drop_cached_trace_suffix() | |
| # Cache the result and update the current execution trace. If this is a parallel transaction, the | |
| # cache and execution traces will be updated in a different way. | |
| self.simulation._add_to_cache_trace(state, event_hash, encoded_output, | |
| parallel=self.simulation.is_under_parallel_transactions()) | |
| self.simulation._add_to_execution_trace(state, event_hash, encoded_output, | |
| parallel=self.simulation.is_under_parallel_transactions()) | |
| def _encode_function_output(self, output) -> dict: | |
| """ | |
| Encodes the given function output. | |
| """ | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| # if the output is a supported object, encode it | |
| if output is None: | |
| return None | |
| elif isinstance(output, TinyPerson): | |
| return {"type": "TinyPersonRef", "name": output.name} | |
| elif isinstance(output, TinyWorld): | |
| return {"type": "TinyWorldRef", "name": output.name} | |
| elif isinstance(output, TinyFactory): | |
| return {"type": "TinyFactoryRef", "name": output.name} | |
| elif isinstance(output, list): | |
| encoded_list = [] | |
| for item in output: | |
| if isinstance(item, TinyPerson): | |
| encoded_list.append({"type": "TinyPersonRef", "name": item.name}) | |
| elif isinstance(item, TinyWorld): | |
| encoded_list.append({"type": "TinyWorldRef", "name": item.name}) | |
| elif isinstance(item, TinyFactory): | |
| encoded_list.append({"type": "TinyFactoryRef", "name": item.name}) | |
| else: | |
| encoded_list.append({"type": "JSON", "value": item}) | |
| return {"type": "List", "value": encoded_list} | |
| elif isinstance(output, (int, float, str, bool, dict, tuple)): | |
| return {"type": "JSON", "value": output} | |
| else: | |
| raise ValueError(f"Unsupported output type: {type(output)}") | |
| def _decode_function_output(self, encoded_output: dict): | |
| """ | |
| Decodes the given encoded function output. | |
| """ | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| if encoded_output is None: | |
| return None | |
| elif encoded_output["type"] == "TinyPersonRef": | |
| return TinyPerson.get_agent_by_name(encoded_output["name"]) | |
| elif encoded_output["type"] == "TinyWorldRef": | |
| return TinyWorld.get_environment_by_name(encoded_output["name"]) | |
| elif encoded_output["type"] == "TinyFactoryRef": | |
| return TinyFactory.get_factory_by_name(encoded_output["name"]) | |
| elif encoded_output["type"] == "List": | |
| decoded_list = [] | |
| for item in encoded_output["value"]: | |
| if item["type"] == "TinyPersonRef": | |
| decoded_list.append(TinyPerson.get_agent_by_name(item["name"])) | |
| elif item["type"] == "TinyWorldRef": | |
| decoded_list.append(TinyWorld.get_environment_by_name(item["name"])) | |
| elif item["type"] == "TinyFactoryRef": | |
| decoded_list.append(TinyFactory.get_factory_by_name(item["name"])) | |
| else: | |
| decoded_list.append(item["value"]) | |
| return decoded_list | |
| elif encoded_output["type"] == "JSON": | |
| return encoded_output["value"] | |
| else: | |
| raise ValueError(f"Unsupported output type: {encoded_output['type']}") | |
| def transactional(parallel=False): | |
| """ | |
| A helper decorator that makes a function simulation-transactional. | |
| """ | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| obj_under_transaction = args[0] | |
| simulation = current_simulation() | |
| obj_sim_id = obj_under_transaction.simulation_id if hasattr(obj_under_transaction, 'simulation_id') else None | |
| logger.debug(f"-----------------------------------------> Transaction: {func.__name__} with args {args[1:]} and kwargs {kwargs} under simulation {obj_sim_id}, parallel={parallel}.") | |
| parallel_id = str(threading.current_thread()) | |
| transaction = Transaction(obj_under_transaction, simulation, func, *args, **kwargs) | |
| result = transaction.execute(begin_parallel=parallel, parallel_id=parallel_id) | |
| return result | |
| return wrapper | |
| return decorator | |
| class SkipTransaction(Exception): | |
| pass | |
| class CacheOutOfSync(Exception): | |
| """ | |
| Raised when a cached and the corresponding freshly executed elements are out of sync. | |
| """ | |
| pass | |
| class ExecutionCached(Exception): | |
| """ | |
| Raised when a proposed execution is already cached. | |
| """ | |
| pass | |
| ################################################################################################### | |
| # Convenience functions | |
| ################################################################################################### | |
| def reset(): | |
| """ | |
| Resets the entire simulation control state. | |
| """ | |
| global _current_simulations, _current_simulation_id | |
| _current_simulations = {"default": None} | |
| # TODO Currently, only one simulation can be started at a time. In future versions, this should be | |
| # changed to allow multiple simulations to be started at the same time, e.g., for fast | |
| # analyses through parallelization. | |
| _current_simulation_id = None | |
| def _simulation(id="default"): | |
| global _current_simulations | |
| if _current_simulations[id] is None: | |
| _current_simulations[id] = Simulation() | |
| return _current_simulations[id] | |
| def begin(cache_path=None, id="default", auto_checkpoint=False): | |
| """ | |
| Marks the start of the simulation being controlled. | |
| """ | |
| global _current_simulation_id | |
| if _current_simulation_id is None: | |
| _simulation(id).begin(cache_path, auto_checkpoint) | |
| _current_simulation_id = id | |
| else: | |
| raise ValueError(f"Simulation is already started under id {_current_simulation_id}. Currently only one simulation can be started at a time.") | |
| def end(id="default"): | |
| """ | |
| Marks the end of the simulation being controlled. | |
| """ | |
| global _current_simulation_id | |
| _simulation(id).end() | |
| _current_simulation_id = None | |
| def checkpoint(id="default"): | |
| """ | |
| Saves current simulation state. | |
| """ | |
| _simulation(id).checkpoint() | |
| def current_simulation(): | |
| """ | |
| Returns the current simulation. | |
| """ | |
| global _current_simulation_id | |
| if _current_simulation_id is not None: | |
| return _simulation(_current_simulation_id) | |
| else: | |
| return None | |
| def cache_hits(id="default"): | |
| """ | |
| Returns the number of cache hits. | |
| """ | |
| return _simulation(id).cache_hits | |
| def cache_misses(id="default"): | |
| """ | |
| Returns the number of cache misses. | |
| """ | |
| return _simulation(id).cache_misses | |
| reset() # initialize the control state</code></pre> | |
| </details> | |
| </section> | |
| <section> | |
| </section> | |
| <section> | |
| </section> | |
| <section> | |
| <h2 class="section-title" id="header-functions">Functions</h2> | |
| <dl> | |
| <dt id="tinytroupe.control.begin"><code class="name flex"> | |
| <span>def <span class="ident">begin</span></span>(<span>cache_path=None, id='default', auto_checkpoint=False)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Marks the start of the simulation being controlled.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def begin(cache_path=None, id="default", auto_checkpoint=False): | |
| """ | |
| Marks the start of the simulation being controlled. | |
| """ | |
| global _current_simulation_id | |
| if _current_simulation_id is None: | |
| _simulation(id).begin(cache_path, auto_checkpoint) | |
| _current_simulation_id = id | |
| else: | |
| raise ValueError(f"Simulation is already started under id {_current_simulation_id}. Currently only one simulation can be started at a time.") </code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.cache_hits"><code class="name flex"> | |
| <span>def <span class="ident">cache_hits</span></span>(<span>id='default')</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Returns the number of cache hits.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def cache_hits(id="default"): | |
| """ | |
| Returns the number of cache hits. | |
| """ | |
| return _simulation(id).cache_hits</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.cache_misses"><code class="name flex"> | |
| <span>def <span class="ident">cache_misses</span></span>(<span>id='default')</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Returns the number of cache misses.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def cache_misses(id="default"): | |
| """ | |
| Returns the number of cache misses. | |
| """ | |
| return _simulation(id).cache_misses</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.checkpoint"><code class="name flex"> | |
| <span>def <span class="ident">checkpoint</span></span>(<span>id='default')</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Saves current simulation state.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def checkpoint(id="default"): | |
| """ | |
| Saves current simulation state. | |
| """ | |
| _simulation(id).checkpoint()</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.current_simulation"><code class="name flex"> | |
| <span>def <span class="ident">current_simulation</span></span>(<span>)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Returns the current simulation.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def current_simulation(): | |
| """ | |
| Returns the current simulation. | |
| """ | |
| global _current_simulation_id | |
| if _current_simulation_id is not None: | |
| return _simulation(_current_simulation_id) | |
| else: | |
| return None</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.end"><code class="name flex"> | |
| <span>def <span class="ident">end</span></span>(<span>id='default')</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Marks the end of the simulation being controlled.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def end(id="default"): | |
| """ | |
| Marks the end of the simulation being controlled. | |
| """ | |
| global _current_simulation_id | |
| _simulation(id).end() | |
| _current_simulation_id = None</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.reset"><code class="name flex"> | |
| <span>def <span class="ident">reset</span></span>(<span>)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Resets the entire simulation control state.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def reset(): | |
| """ | |
| Resets the entire simulation control state. | |
| """ | |
| global _current_simulations, _current_simulation_id | |
| _current_simulations = {"default": None} | |
| # TODO Currently, only one simulation can be started at a time. In future versions, this should be | |
| # changed to allow multiple simulations to be started at the same time, e.g., for fast | |
| # analyses through parallelization. | |
| _current_simulation_id = None</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.transactional"><code class="name flex"> | |
| <span>def <span class="ident">transactional</span></span>(<span>parallel=False)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>A helper decorator that makes a function simulation-transactional.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def transactional(parallel=False): | |
| """ | |
| A helper decorator that makes a function simulation-transactional. | |
| """ | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| obj_under_transaction = args[0] | |
| simulation = current_simulation() | |
| obj_sim_id = obj_under_transaction.simulation_id if hasattr(obj_under_transaction, 'simulation_id') else None | |
| logger.debug(f"-----------------------------------------> Transaction: {func.__name__} with args {args[1:]} and kwargs {kwargs} under simulation {obj_sim_id}, parallel={parallel}.") | |
| parallel_id = str(threading.current_thread()) | |
| transaction = Transaction(obj_under_transaction, simulation, func, *args, **kwargs) | |
| result = transaction.execute(begin_parallel=parallel, parallel_id=parallel_id) | |
| return result | |
| return wrapper | |
| return decorator</code></pre> | |
| </details> | |
| </dd> | |
| </dl> | |
| </section> | |
| <section> | |
| <h2 class="section-title" id="header-classes">Classes</h2> | |
| <dl> | |
| <dt id="tinytroupe.control.CacheOutOfSync"><code class="flex name class"> | |
| <span>class <span class="ident">CacheOutOfSync</span></span> | |
| <span>(</span><span>*args, **kwargs)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Raised when a cached and the corresponding freshly executed elements are out of sync.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">class CacheOutOfSync(Exception): | |
| """ | |
| Raised when a cached and the corresponding freshly executed elements are out of sync. | |
| """ | |
| pass</code></pre> | |
| </details> | |
| <h3>Ancestors</h3> | |
| <ul class="hlist"> | |
| <li>builtins.Exception</li> | |
| <li>builtins.BaseException</li> | |
| </ul> | |
| </dd> | |
| <dt id="tinytroupe.control.ExecutionCached"><code class="flex name class"> | |
| <span>class <span class="ident">ExecutionCached</span></span> | |
| <span>(</span><span>*args, **kwargs)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Raised when a proposed execution is already cached.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">class ExecutionCached(Exception): | |
| """ | |
| Raised when a proposed execution is already cached. | |
| """ | |
| pass</code></pre> | |
| </details> | |
| <h3>Ancestors</h3> | |
| <ul class="hlist"> | |
| <li>builtins.Exception</li> | |
| <li>builtins.BaseException</li> | |
| </ul> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation"><code class="flex name class"> | |
| <span>class <span class="ident">Simulation</span></span> | |
| <span>(</span><span>id='default', cached_trace: list = None)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">class Simulation: | |
| STATUS_STOPPED = "stopped" | |
| STATUS_STARTED = "started" | |
| def __init__(self, id="default", cached_trace:list=None): | |
| self.id = id | |
| self.agents = [] | |
| self.name_to_agent = {} # {agent_name: agent, ...} | |
| self.environments = [] | |
| self.factories = [] # e.g., TinyPersonFactory instances | |
| self.name_to_factory = {} # {factory_name: factory, ...} | |
| self.name_to_environment = {} # {environment_name: environment, ...} | |
| self.status = Simulation.STATUS_STOPPED | |
| self.cache_path = f"./tinytroupe-{id}.cache.json" # default cache path | |
| # should we always automatically checkpoint at the every transaction? | |
| self.auto_checkpoint = False | |
| # whether there are changes not yet saved to the cache file | |
| self.has_unsaved_cache_changes = False | |
| # whether the agent is under a transaction or not, used for managing | |
| # simulation caching later | |
| self._under_transaction = {None: False} | |
| # whether the agent is under a parallel transactions segment or not, used for managing | |
| # simulation caching later | |
| self._under_parallel_transactions = False | |
| # Cache chain mechanism. | |
| # | |
| # stores a list of simulation states. | |
| # Each state is a tuple (prev_node_hash, event_hash, event_output, state), where prev_node_hash is a hash of the previous node in this chain, | |
| # if any, event_hash is a hash of the event that triggered the transition to this state, if any, event_output is the output of the event, | |
| # if any, and state is the actual complete state that resulted. | |
| if cached_trace is None: | |
| self.cached_trace = [] | |
| else: | |
| self.cached_trace = cached_trace | |
| self.cache_misses = 0 | |
| self.cache_hits = 0 | |
| # Execution chain mechanism. | |
| # | |
| # The actual, current, execution trace. Each state is a tuple (prev_node_hash, event_hash, state), where prev_node_hash is a hash | |
| # of the previous node in this chain, if any, event_hash is a hash of the event that triggered the transition to this state, if any, | |
| # event_output is the output of the event, if any, and state is the actual complete state that resulted. | |
| self.execution_trace = [] | |
| def begin(self, cache_path:str=None, auto_checkpoint:bool=False): | |
| """ | |
| Marks the start of the simulation being controlled. | |
| Args: | |
| cache_path (str): The path to the cache file. If not specified, | |
| defaults to the default cache path defined in the class. | |
| auto_checkpoint (bool, optional): Whether to automatically checkpoint at the end of each transaction. Defaults to False. | |
| """ | |
| logger.debug(f"Starting simulation, cache_path={cache_path}, auto_checkpoint={auto_checkpoint}.") | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| from tinytroupe.factory.tiny_person_factory import TinyPersonFactory | |
| if self.status == Simulation.STATUS_STOPPED: | |
| self.status = Simulation.STATUS_STARTED | |
| else: | |
| raise ValueError("Simulation is already started.") | |
| if cache_path is not None: | |
| self.cache_path = cache_path | |
| # should we automatically checkpoint? | |
| self.auto_checkpoint = auto_checkpoint | |
| # clear the agents, environments and other simulated entities, we'll track them from now on | |
| TinyPerson.clear_agents() | |
| TinyWorld.clear_environments() | |
| TinyFactory.clear_factories() | |
| TinyPersonFactory.clear_factories() | |
| # All automated fresh ids will start from 0 again for this simulation | |
| utils.reset_fresh_id() | |
| # load the cache file, if any | |
| if self.cache_path is not None: | |
| self._load_cache_file(self.cache_path) | |
| def end(self): | |
| """ | |
| Marks the end of the simulation being controlled. | |
| """ | |
| logger.debug("Ending simulation.") | |
| if self.status == Simulation.STATUS_STARTED: | |
| self.status = Simulation.STATUS_STOPPED | |
| self.checkpoint() | |
| else: | |
| raise ValueError("Simulation is already stopped.") | |
| def checkpoint(self): | |
| """ | |
| Saves current simulation trace to a file. | |
| """ | |
| logger.debug("Checkpointing simulation state...") | |
| # save the cache file | |
| if self.has_unsaved_cache_changes: | |
| self._save_cache_file(self.cache_path) | |
| else: | |
| logger.debug("No unsaved cache changes to save to file.") | |
| def add_agent(self, agent): | |
| """ | |
| Adds an agent to the simulation. | |
| """ | |
| if agent.name in self.name_to_agent: | |
| raise ValueError(f"Agent names must be unique, but '{agent.name}' is already defined.") | |
| agent.simulation_id = self.id | |
| self.agents.append(agent) | |
| self.name_to_agent[agent.name] = agent | |
| def add_environment(self, environment): | |
| """ | |
| Adds an environment to the simulation. | |
| """ | |
| if environment.name in self.name_to_environment: | |
| raise ValueError(f"Environment names must be unique, but '{environment.name}' is already defined.") | |
| environment.simulation_id = self.id | |
| self.environments.append(environment) | |
| self.name_to_environment[environment.name] = environment | |
| def add_factory(self, factory): | |
| """ | |
| Adds a factory to the simulation. | |
| """ | |
| if factory.name in self.name_to_factory: | |
| raise ValueError(f"Factory names must be unique, but '{factory.name}' is already defined.") | |
| factory.simulation_id = self.id | |
| self.factories.append(factory) | |
| self.name_to_factory[factory.name] = factory | |
| ################################################################################################### | |
| # Cache and execution chain mechanisms | |
| ################################################################################################### | |
| def _execution_trace_position(self) -> int: | |
| """ | |
| Returns the current position in the execution trace, or -1 if the execution trace is empty. | |
| """ | |
| return len(self.execution_trace) - 1 | |
| def _function_call_hash(self, function_name, *args, **kwargs) -> int: | |
| """ | |
| Computes the hash of the given function call. | |
| """ | |
| # if functions are passed as arguments to the function, there's the problem that their | |
| # string representation always changes due to memory position (e.g., <function my_function at 0x7f8d1a7b7d30>). | |
| # so we need to remove the changing suffix in those cases, while preserving the function name if it exists. | |
| # positional arguments | |
| # covnerts to a list of string representations first | |
| args_str = list(map(str, args)) | |
| for i, arg in enumerate(args): | |
| if callable(arg): | |
| args_str[i] = arg.__name__ | |
| # keyword arguments | |
| # converts to a list of string representations first | |
| kwargs_str = {k: str(v) for k, v in kwargs.items()} | |
| for k, v in kwargs.items(): | |
| if callable(v): | |
| kwargs_str[k] = v.__name__ | |
| # then, convert to a single string, to obtain a unique hash | |
| event = str((function_name, args_str, kwargs_str)) | |
| # TODO actually compute a short hash of the event string, e.g., using SHA256 ? | |
| # event_hash = utils.custom_hash(event) | |
| return event | |
| def _skip_execution_with_cache(self): | |
| """ | |
| Skips the current execution, assuming there's a cached state at the same position. | |
| """ | |
| assert len(self.cached_trace) > self._execution_trace_position() + 1, "There's no cached state at the current execution position." | |
| self.execution_trace.append(self.cached_trace[self._execution_trace_position() + 1]) | |
| def _is_transaction_event_cached(self, event_hash, parallel=False) -> bool: | |
| """ | |
| Checks whether the given event hash matches the corresponding cached one, if any. | |
| If there's no corresponding cached state, returns True. | |
| """ | |
| if not parallel: | |
| # there's cache that could be used | |
| if len(self.cached_trace) > self._execution_trace_position() + 1: | |
| if self._execution_trace_position() >= -1: | |
| # here's a graphical depiction of the logic: | |
| # | |
| # Cache: c0:(c_prev_node_hash_0, c_event_hash_0, _, c_state_0) ------------------> c1:(c_prev_node_hash_1, c_event_hash_1, _, c_state_1) -> ... | |
| # Execution: e0:(e_prev_node_hash_0, e_event_hash_0, _, e_state_0) -<being computed>-> e1:(e_prev_node_hash_1, <being computed>, <being computed>, <being computed>) | |
| # position = 0 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
| # | |
| # Must satisfy: | |
| # - event_hash == c_event_hash_1 | |
| # - hash(e0) == c_prev_node_hash_1 | |
| try: | |
| event_hash_match = event_hash == self.cached_trace[self._execution_trace_position() + 1][1] | |
| except Exception as e: | |
| logger.error(f"Error while checking event hash match: {e}") | |
| event_hash_match = False | |
| prev_node_match = True # TODO implement real check | |
| return event_hash_match and prev_node_match | |
| else: | |
| raise ValueError("Execution trace position is invalid, must be >= -1, but is ", self._execution_trace_position()) | |
| else: # no cache to use | |
| return False | |
| else: # parallel | |
| if len(self.cached_trace) >= self._execution_trace_position(): | |
| if self._execution_trace_position() >= 0: | |
| # parallel stores ignore order, so we need to check instead whether the event hash is a key in the parallel store, | |
| # regardless of the order of the events generated the data therein. | |
| if isinstance(self.cached_trace[self._execution_trace_position()], dict): | |
| event_hash_match = event_hash in self.cached_trace[self._execution_trace_position()].keys() | |
| else: | |
| event_hash_match = False | |
| prev_node_match = True # TODO implement real check | |
| return event_hash_match and prev_node_match | |
| else: | |
| raise ValueError("Execution trace position is invalid, must be >= 0, but is ", self._execution_trace_position()) | |
| def _get_cached_parallel_value(self, event_hash, key): | |
| parallel_store = self.cached_trace[self._execution_trace_position()] | |
| value = parallel_store[event_hash][key] | |
| return value | |
| def _drop_cached_trace_suffix(self): | |
| """ | |
| Drops the cached trace suffix starting at the current execution trace position. This effectively | |
| refreshes the cache to the current execution state and starts building a new cache from there. | |
| """ | |
| self.cached_trace = self.cached_trace[:self._execution_trace_position()+1] | |
| def _add_to_execution_trace(self, state: dict, event_hash: int, event_output, parallel=False): | |
| """ | |
| Adds a state to the execution_trace list and computes the appropriate hash. | |
| The computed hash is compared to the hash of the cached trace at the same position, | |
| and if they don't match, the execution is aborted. Similarly, the event_hash is compared | |
| to the hash of the event in the cached trace at the same position, and if they don't match, the execution | |
| is aborted. | |
| """ | |
| # Compute the hash of the previous execution pair, if any | |
| previous_hash = None | |
| if not parallel: | |
| # Create a tuple of (hash, state) and append it to the execution_trace list | |
| self.execution_trace.append((previous_hash, event_hash, event_output, state)) | |
| else: | |
| with concurrent_execution_lock: | |
| # state is not stored in parallel segments, only outputs | |
| self.execution_trace[-1][event_hash] = {"prev_node_hash": previous_hash, | |
| "encoded_output": event_output} | |
| def _add_to_cache_trace(self, state: dict, event_hash: int, event_output, parallel=False): | |
| """ | |
| Adds a state to the cached_trace list and computes the appropriate hash. | |
| """ | |
| # Compute the hash of the previous cached pair, if any | |
| previous_hash = None | |
| if self.cached_trace: | |
| previous_hash = utils.custom_hash(self.cached_trace[-1]) | |
| if not parallel: | |
| # Create a tuple of (hash, state) and append it to the cached_trace list | |
| self.cached_trace.append((previous_hash, event_hash, event_output, state)) | |
| else: | |
| with concurrent_execution_lock: | |
| # state is not stored in parallel segments, only outputs | |
| self.cached_trace[-1][event_hash] = {"prev_node_hash": previous_hash, | |
| "encoded_output": event_output} | |
| self.has_unsaved_cache_changes = True | |
| def _load_cache_file(self, cache_path:str): | |
| """ | |
| Loads the cache file from the given path. | |
| """ | |
| try: | |
| self.cached_trace = json.load(open(cache_path, "r")) | |
| except FileNotFoundError: | |
| logger.info(f"Cache file not found on path: {cache_path}.") | |
| self.cached_trace = [] | |
| def _save_cache_file(self, cache_path:str): | |
| """ | |
| Saves the cache file to the given path. Always overwrites. | |
| """ | |
| logger.debug(f"Now saving cache file to {cache_path}.") | |
| try: | |
| # Create a temporary file | |
| with tempfile.NamedTemporaryFile('w', delete=False) as temp: | |
| json.dump(self.cached_trace, temp, indent=4) | |
| # Replace the original file with the temporary file | |
| os.replace(temp.name, cache_path) | |
| except Exception as e: | |
| traceback_string = ''.join(traceback.format_tb(e.__traceback__)) | |
| logger.error(f"An error occurred while saving the cache file: {e}\nTraceback:\n{traceback_string}") | |
| self.has_unsaved_cache_changes = False | |
| ################################################################################################### | |
| # Transactional control | |
| ################################################################################################### | |
| # | |
| # Regular sequential transactions | |
| # | |
| def begin_transaction(self, id=None): | |
| """ | |
| Starts a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_transaction[id] = True | |
| self._clear_communications_buffers() # TODO <---------------------------------------------------------------- | |
| def end_transaction(self, id=None): | |
| """ | |
| Ends a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_transaction[id] = False | |
| def is_under_transaction(self, id=None): | |
| """ | |
| Checks if the agent is under a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| return self._under_transaction.get(id, False) | |
| def _clear_communications_buffers(self): | |
| """ | |
| Cleans the communications buffers of all agents and environments. | |
| """ | |
| for agent in self.agents: | |
| agent.clear_communications_buffer() | |
| for environment in self.environments: | |
| environment.clear_communications_buffer() | |
| # | |
| # Parallel transactions | |
| # | |
| def begin_parallel_transactions(self): | |
| """ | |
| Starts parallel transactions. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_parallel_transactions = True | |
| # add a new parallel segment to the execution and cache traces | |
| self.execution_trace.append({}) | |
| self.cached_trace.append({}) | |
| def end_parallel_transactions(self): | |
| """ | |
| Ends parallel transactions. | |
| """ | |
| self._under_parallel_transactions = False | |
| def is_under_parallel_transactions(self): | |
| """ | |
| Checks if the agent is under parallel transactions. | |
| """ | |
| return self._under_parallel_transactions | |
| ################################################################################################### | |
| # Simulation state handling | |
| ################################################################################################### | |
| def _encode_simulation_state(self) -> dict: | |
| """ | |
| Encodes the current simulation state, including agents, environments, and other | |
| relevant information. | |
| """ | |
| state = {} | |
| # Encode agents | |
| state["agents"] = [] | |
| for agent in self.agents: | |
| state["agents"].append(agent.encode_complete_state()) | |
| # Encode environments | |
| state["environments"] = [] | |
| for environment in self.environments: | |
| state["environments"].append(environment.encode_complete_state()) | |
| # Encode factories | |
| state["factories"] = [] | |
| for factory in self.factories: | |
| state["factories"].append(factory.encode_complete_state()) | |
| return state | |
| def _decode_simulation_state(self, state: dict): | |
| """ | |
| Decodes the given simulation state, including agents, environments, and other | |
| relevant information. | |
| Args: | |
| state (dict): The state to decode. | |
| """ | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| logger.debug(f"Decoding simulation state: {state['factories']}") | |
| logger.debug(f"Registered factories: {self.name_to_factory}") | |
| logger.debug(f"Registered agents: {self.name_to_agent}") | |
| logger.debug(f"Registered environments: {self.name_to_environment}") | |
| # Decode factories | |
| for factory_state in state["factories"]: | |
| factory = self.name_to_factory[factory_state["name"]] | |
| factory.decode_complete_state(factory_state) | |
| # Decode environments | |
| ###self.environments = [] | |
| for environment_state in state["environments"]: | |
| try: | |
| environment = self.name_to_environment[environment_state["name"]] | |
| environment.decode_complete_state(environment_state) | |
| if TinyWorld.communication_display: | |
| environment.pop_and_display_latest_communications() | |
| except Exception as e: | |
| raise ValueError(f"Environment {environment_state['name']} is not in the simulation, thus cannot be decoded there.") from e | |
| # Decode agents (if they were not already decoded by the environment) | |
| ####self.agents = [] | |
| for agent_state in state["agents"]: | |
| try: | |
| agent = self.name_to_agent[agent_state["name"]] | |
| agent.decode_complete_state(agent_state) | |
| # The agent has not yet been decoded because it is not in any environment. So, decode it. | |
| if agent.environment is None: | |
| if TinyPerson.communication_display: | |
| agent.pop_and_display_latest_communications() | |
| except Exception as e: | |
| raise ValueError(f"Agent {agent_state['name']} is not in the simulation, thus cannot be decoded there.") from e </code></pre> | |
| </details> | |
| <h3>Class variables</h3> | |
| <dl> | |
| <dt id="tinytroupe.control.Simulation.STATUS_STARTED"><code class="name">var <span class="ident">STATUS_STARTED</span></code></dt> | |
| <dd> | |
| <div class="desc"></div> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.STATUS_STOPPED"><code class="name">var <span class="ident">STATUS_STOPPED</span></code></dt> | |
| <dd> | |
| <div class="desc"></div> | |
| </dd> | |
| </dl> | |
| <h3>Methods</h3> | |
| <dl> | |
| <dt id="tinytroupe.control.Simulation.add_agent"><code class="name flex"> | |
| <span>def <span class="ident">add_agent</span></span>(<span>self, agent)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Adds an agent to the simulation.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def add_agent(self, agent): | |
| """ | |
| Adds an agent to the simulation. | |
| """ | |
| if agent.name in self.name_to_agent: | |
| raise ValueError(f"Agent names must be unique, but '{agent.name}' is already defined.") | |
| agent.simulation_id = self.id | |
| self.agents.append(agent) | |
| self.name_to_agent[agent.name] = agent</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.add_environment"><code class="name flex"> | |
| <span>def <span class="ident">add_environment</span></span>(<span>self, environment)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Adds an environment to the simulation.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def add_environment(self, environment): | |
| """ | |
| Adds an environment to the simulation. | |
| """ | |
| if environment.name in self.name_to_environment: | |
| raise ValueError(f"Environment names must be unique, but '{environment.name}' is already defined.") | |
| environment.simulation_id = self.id | |
| self.environments.append(environment) | |
| self.name_to_environment[environment.name] = environment</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.add_factory"><code class="name flex"> | |
| <span>def <span class="ident">add_factory</span></span>(<span>self, factory)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Adds a factory to the simulation.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def add_factory(self, factory): | |
| """ | |
| Adds a factory to the simulation. | |
| """ | |
| if factory.name in self.name_to_factory: | |
| raise ValueError(f"Factory names must be unique, but '{factory.name}' is already defined.") | |
| factory.simulation_id = self.id | |
| self.factories.append(factory) | |
| self.name_to_factory[factory.name] = factory</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.begin"><code class="name flex"> | |
| <span>def <span class="ident">begin</span></span>(<span>self, cache_path: str = None, auto_checkpoint: bool = False)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Marks the start of the simulation being controlled.</p> | |
| <h2 id="args">Args</h2> | |
| <dl> | |
| <dt><strong><code>cache_path</code></strong> : <code>str</code></dt> | |
| <dd>The path to the cache file. If not specified, | |
| defaults to the default cache path defined in the class.</dd> | |
| <dt><strong><code>auto_checkpoint</code></strong> : <code>bool</code>, optional</dt> | |
| <dd>Whether to automatically checkpoint at the end of each transaction. Defaults to False.</dd> | |
| </dl></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def begin(self, cache_path:str=None, auto_checkpoint:bool=False): | |
| """ | |
| Marks the start of the simulation being controlled. | |
| Args: | |
| cache_path (str): The path to the cache file. If not specified, | |
| defaults to the default cache path defined in the class. | |
| auto_checkpoint (bool, optional): Whether to automatically checkpoint at the end of each transaction. Defaults to False. | |
| """ | |
| logger.debug(f"Starting simulation, cache_path={cache_path}, auto_checkpoint={auto_checkpoint}.") | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| from tinytroupe.factory.tiny_person_factory import TinyPersonFactory | |
| if self.status == Simulation.STATUS_STOPPED: | |
| self.status = Simulation.STATUS_STARTED | |
| else: | |
| raise ValueError("Simulation is already started.") | |
| if cache_path is not None: | |
| self.cache_path = cache_path | |
| # should we automatically checkpoint? | |
| self.auto_checkpoint = auto_checkpoint | |
| # clear the agents, environments and other simulated entities, we'll track them from now on | |
| TinyPerson.clear_agents() | |
| TinyWorld.clear_environments() | |
| TinyFactory.clear_factories() | |
| TinyPersonFactory.clear_factories() | |
| # All automated fresh ids will start from 0 again for this simulation | |
| utils.reset_fresh_id() | |
| # load the cache file, if any | |
| if self.cache_path is not None: | |
| self._load_cache_file(self.cache_path)</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.begin_parallel_transactions"><code class="name flex"> | |
| <span>def <span class="ident">begin_parallel_transactions</span></span>(<span>self)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Starts parallel transactions.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def begin_parallel_transactions(self): | |
| """ | |
| Starts parallel transactions. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_parallel_transactions = True | |
| # add a new parallel segment to the execution and cache traces | |
| self.execution_trace.append({}) | |
| self.cached_trace.append({})</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.begin_transaction"><code class="name flex"> | |
| <span>def <span class="ident">begin_transaction</span></span>(<span>self, id=None)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Starts a transaction.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def begin_transaction(self, id=None): | |
| """ | |
| Starts a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_transaction[id] = True | |
| self._clear_communications_buffers() # TODO <----------------------------------------------------------------</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.checkpoint"><code class="name flex"> | |
| <span>def <span class="ident">checkpoint</span></span>(<span>self)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Saves current simulation trace to a file.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def checkpoint(self): | |
| """ | |
| Saves current simulation trace to a file. | |
| """ | |
| logger.debug("Checkpointing simulation state...") | |
| # save the cache file | |
| if self.has_unsaved_cache_changes: | |
| self._save_cache_file(self.cache_path) | |
| else: | |
| logger.debug("No unsaved cache changes to save to file.")</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.end"><code class="name flex"> | |
| <span>def <span class="ident">end</span></span>(<span>self)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Marks the end of the simulation being controlled.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def end(self): | |
| """ | |
| Marks the end of the simulation being controlled. | |
| """ | |
| logger.debug("Ending simulation.") | |
| if self.status == Simulation.STATUS_STARTED: | |
| self.status = Simulation.STATUS_STOPPED | |
| self.checkpoint() | |
| else: | |
| raise ValueError("Simulation is already stopped.")</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.end_parallel_transactions"><code class="name flex"> | |
| <span>def <span class="ident">end_parallel_transactions</span></span>(<span>self)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Ends parallel transactions.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def end_parallel_transactions(self): | |
| """ | |
| Ends parallel transactions. | |
| """ | |
| self._under_parallel_transactions = False</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.end_transaction"><code class="name flex"> | |
| <span>def <span class="ident">end_transaction</span></span>(<span>self, id=None)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Ends a transaction.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def end_transaction(self, id=None): | |
| """ | |
| Ends a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| self._under_transaction[id] = False</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.is_under_parallel_transactions"><code class="name flex"> | |
| <span>def <span class="ident">is_under_parallel_transactions</span></span>(<span>self)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Checks if the agent is under parallel transactions.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def is_under_parallel_transactions(self): | |
| """ | |
| Checks if the agent is under parallel transactions. | |
| """ | |
| return self._under_parallel_transactions</code></pre> | |
| </details> | |
| </dd> | |
| <dt id="tinytroupe.control.Simulation.is_under_transaction"><code class="name flex"> | |
| <span>def <span class="ident">is_under_transaction</span></span>(<span>self, id=None)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Checks if the agent is under a transaction.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def is_under_transaction(self, id=None): | |
| """ | |
| Checks if the agent is under a transaction. | |
| """ | |
| with concurrent_execution_lock: | |
| return self._under_transaction.get(id, False)</code></pre> | |
| </details> | |
| </dd> | |
| </dl> | |
| </dd> | |
| <dt id="tinytroupe.control.SkipTransaction"><code class="flex name class"> | |
| <span>class <span class="ident">SkipTransaction</span></span> | |
| <span>(</span><span>*args, **kwargs)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"><p>Common base class for all non-exit exceptions.</p></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">class SkipTransaction(Exception): | |
| pass</code></pre> | |
| </details> | |
| <h3>Ancestors</h3> | |
| <ul class="hlist"> | |
| <li>builtins.Exception</li> | |
| <li>builtins.BaseException</li> | |
| </ul> | |
| </dd> | |
| <dt id="tinytroupe.control.Transaction"><code class="flex name class"> | |
| <span>class <span class="ident">Transaction</span></span> | |
| <span>(</span><span>obj_under_transaction, simulation, function, *args, **kwargs)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">class Transaction: | |
| def __init__(self, obj_under_transaction, simulation, function, *args, **kwargs): | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| self.obj_under_transaction = obj_under_transaction | |
| self.simulation = simulation | |
| self.function_name = function.__name__ | |
| self.function = function | |
| self.args = args | |
| self.kwargs = kwargs | |
| # | |
| # If we have an ongoing simulation, set the simulation id of the object under transaction if it is not already set. | |
| # | |
| if simulation is not None: | |
| if hasattr(obj_under_transaction, 'simulation_id') and obj_under_transaction.simulation_id is not None: | |
| if obj_under_transaction.simulation_id != simulation.id: | |
| raise ValueError(f"Object {obj_under_transaction} is already captured by a different simulation (id={obj_under_transaction.simulation_id}), \ | |
| and cannot be captured by simulation id={simulation.id}.") | |
| logger.debug(f">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Object {obj_under_transaction} is already captured by simulation {simulation.id}.") | |
| else: | |
| # if is a TinyPerson, add the agent to the simulation | |
| if isinstance(obj_under_transaction, TinyPerson): | |
| simulation.add_agent(obj_under_transaction) | |
| logger.debug(f">>>>>>>>>>>>>>>>>>>>>>> Added agent {obj_under_transaction} to simulation {simulation.id}.") | |
| # if is a TinyWorld, add the environment to the simulation | |
| elif isinstance(obj_under_transaction, TinyWorld): | |
| simulation.add_environment(obj_under_transaction) | |
| # if is a TinyFactory, add the factory to the simulation | |
| elif isinstance(obj_under_transaction, TinyFactory): | |
| simulation.add_factory(obj_under_transaction) | |
| logger.debug(f">>>>>>>>>>>>>>>>>>>>>>> Added factory {obj_under_transaction} to simulation {simulation.id}.") | |
| else: | |
| raise ValueError(f"Object {obj_under_transaction} (type = {type(obj_under_transaction)}) is not a TinyPerson or TinyWorld instance, and cannot be captured by the simulation.") | |
| def execute(self, begin_parallel=False, parallel_id=None): | |
| output = None | |
| # Transaction caching will only operate if there is a simulation and it is started | |
| if self.simulation is None or self.simulation.status == Simulation.STATUS_STOPPED: | |
| # Compute the function and return it, no caching, since the simulation is not started | |
| output = self.function(*self.args, **self.kwargs) | |
| elif self.simulation.status == Simulation.STATUS_STARTED: | |
| # Compute the event hash | |
| event_hash = self.simulation._function_call_hash(self.function_name, *self.args, **self.kwargs) | |
| # Sequential and parallel transactions are handled in different ways | |
| if begin_parallel: | |
| self.simulation.begin_parallel_transactions() | |
| # CACHED? Check if the event hash is in the cache | |
| if self.simulation._is_transaction_event_cached(event_hash, | |
| parallel=self.simulation.is_under_parallel_transactions()): | |
| self.simulation.cache_hits += 1 | |
| # Restore the full state and return the cached output | |
| logger.info(f"Skipping execution of {self.function_name} with args {self.args} and kwargs {self.kwargs} because it is already cached.") | |
| # SEQUENTIAL | |
| if not self.simulation.is_under_parallel_transactions(): | |
| self.simulation._skip_execution_with_cache() | |
| state = self.simulation.cached_trace[self.simulation._execution_trace_position()][3] # state | |
| self.simulation._decode_simulation_state(state) | |
| # Output encoding/decoding is used to preserve references to TinyPerson and TinyWorld instances | |
| # mainly. Scalar values (int, float, str, bool) and composite values (list, dict) are | |
| # encoded/decoded as is. | |
| encoded_output = self.simulation.cached_trace[self.simulation._execution_trace_position()][2] # output | |
| output = self._decode_function_output(encoded_output) | |
| # PARALLEL | |
| else: # is under parallel transactions | |
| # in parallel segments, state is not restored, only outputs | |
| encoded_output = self.simulation._get_cached_parallel_value(event_hash, "encoded_output") | |
| output = self._decode_function_output(encoded_output) | |
| else: # not cached | |
| if not begin_parallel: | |
| # in case of beginning a parallel segment, we don't want to count it as a cache miss, | |
| # since the segment itself will not be cached, but rather the events within it. | |
| self.simulation.cache_misses += 1 | |
| if not self.simulation.is_under_transaction(id=parallel_id) and not begin_parallel: | |
| # BEGIN SEQUENTIAL TRANSACTION ############################################################### | |
| # | |
| # if this is the beginning of a parallel segment, we don't need to begin a transaction, since | |
| # we want to allow additional transactions within the parallel segment (i.e., one-level reentrancy). | |
| if not begin_parallel: | |
| self.simulation.begin_transaction(id=parallel_id) | |
| # Compute the function and encode the relevant output and simulation state | |
| output = self.function(*self.args, **self.kwargs) | |
| self._save_output_with_simulation_state(event_hash, output) | |
| # END TRANSACTION ################################################################# | |
| if not begin_parallel: | |
| self.simulation.end_transaction(id=parallel_id) | |
| else: # already under transaction (thus, now a reentrant transaction) OR beginning a parallel segment | |
| # NOTES: | |
| # | |
| # - Reentrant sequential transactions are not cached, since what matters is the final result of | |
| # the top-level transaction. | |
| # | |
| # - The event that starts the parallel transactions segment WILL NOT itself be cached, since | |
| # it is not part of the parallel segment, but rather the beginning of it. This event will be | |
| # reconstructed during runtime from the parallel events within the segment. | |
| output = self.function(*self.args, **self.kwargs) | |
| if begin_parallel: | |
| self.simulation.end_parallel_transactions() | |
| # execute an ad-hoc Transaction to save the simulation state AFTER the parallel segment is done. | |
| Transaction(self.obj_under_transaction, self.simulation, lambda: True).execute(begin_parallel=False, parallel_id=parallel_id) | |
| else: | |
| raise ValueError(f"Simulation status is invalid at this point: {self.simulation.status}") | |
| # Checkpoint if needed | |
| logger.debug(f"Will attempt to checkpoint simulation state after transaction execution.") | |
| if self.simulation is not None and self.simulation.auto_checkpoint: | |
| logger.debug("Auto-checkpointing simulation state after transaction execution.") | |
| self.simulation.checkpoint() | |
| # after all the transaction is done, return the output - the client will never know about all the complexity we've | |
| # gone through to get here. | |
| return output | |
| def _save_output_with_simulation_state(self, event_hash, output): | |
| encoded_output = self._encode_function_output(output) | |
| state = self.simulation._encode_simulation_state() | |
| # immediately drop the cached trace suffix, since we are starting a new execution from this point on. | |
| # in the case of parallel transactions, this will drop everything _after_ the current parallel segment | |
| # (which itself occupies one position only, with a dictionary of event hashes and their outputs). | |
| self.simulation._drop_cached_trace_suffix() | |
| # Cache the result and update the current execution trace. If this is a parallel transaction, the | |
| # cache and execution traces will be updated in a different way. | |
| self.simulation._add_to_cache_trace(state, event_hash, encoded_output, | |
| parallel=self.simulation.is_under_parallel_transactions()) | |
| self.simulation._add_to_execution_trace(state, event_hash, encoded_output, | |
| parallel=self.simulation.is_under_parallel_transactions()) | |
| def _encode_function_output(self, output) -> dict: | |
| """ | |
| Encodes the given function output. | |
| """ | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| # if the output is a supported object, encode it | |
| if output is None: | |
| return None | |
| elif isinstance(output, TinyPerson): | |
| return {"type": "TinyPersonRef", "name": output.name} | |
| elif isinstance(output, TinyWorld): | |
| return {"type": "TinyWorldRef", "name": output.name} | |
| elif isinstance(output, TinyFactory): | |
| return {"type": "TinyFactoryRef", "name": output.name} | |
| elif isinstance(output, list): | |
| encoded_list = [] | |
| for item in output: | |
| if isinstance(item, TinyPerson): | |
| encoded_list.append({"type": "TinyPersonRef", "name": item.name}) | |
| elif isinstance(item, TinyWorld): | |
| encoded_list.append({"type": "TinyWorldRef", "name": item.name}) | |
| elif isinstance(item, TinyFactory): | |
| encoded_list.append({"type": "TinyFactoryRef", "name": item.name}) | |
| else: | |
| encoded_list.append({"type": "JSON", "value": item}) | |
| return {"type": "List", "value": encoded_list} | |
| elif isinstance(output, (int, float, str, bool, dict, tuple)): | |
| return {"type": "JSON", "value": output} | |
| else: | |
| raise ValueError(f"Unsupported output type: {type(output)}") | |
| def _decode_function_output(self, encoded_output: dict): | |
| """ | |
| Decodes the given encoded function output. | |
| """ | |
| # local import to avoid circular dependencies | |
| from tinytroupe.agent import TinyPerson | |
| from tinytroupe.environment import TinyWorld | |
| from tinytroupe.factory.tiny_factory import TinyFactory | |
| if encoded_output is None: | |
| return None | |
| elif encoded_output["type"] == "TinyPersonRef": | |
| return TinyPerson.get_agent_by_name(encoded_output["name"]) | |
| elif encoded_output["type"] == "TinyWorldRef": | |
| return TinyWorld.get_environment_by_name(encoded_output["name"]) | |
| elif encoded_output["type"] == "TinyFactoryRef": | |
| return TinyFactory.get_factory_by_name(encoded_output["name"]) | |
| elif encoded_output["type"] == "List": | |
| decoded_list = [] | |
| for item in encoded_output["value"]: | |
| if item["type"] == "TinyPersonRef": | |
| decoded_list.append(TinyPerson.get_agent_by_name(item["name"])) | |
| elif item["type"] == "TinyWorldRef": | |
| decoded_list.append(TinyWorld.get_environment_by_name(item["name"])) | |
| elif item["type"] == "TinyFactoryRef": | |
| decoded_list.append(TinyFactory.get_factory_by_name(item["name"])) | |
| else: | |
| decoded_list.append(item["value"]) | |
| return decoded_list | |
| elif encoded_output["type"] == "JSON": | |
| return encoded_output["value"] | |
| else: | |
| raise ValueError(f"Unsupported output type: {encoded_output['type']}")</code></pre> | |
| </details> | |
| <h3>Methods</h3> | |
| <dl> | |
| <dt id="tinytroupe.control.Transaction.execute"><code class="name flex"> | |
| <span>def <span class="ident">execute</span></span>(<span>self, begin_parallel=False, parallel_id=None)</span> | |
| </code></dt> | |
| <dd> | |
| <div class="desc"></div> | |
| <details class="source"> | |
| <summary> | |
| <span>Expand source code</span> | |
| </summary> | |
| <pre><code class="python">def execute(self, begin_parallel=False, parallel_id=None): | |
| output = None | |
| # Transaction caching will only operate if there is a simulation and it is started | |
| if self.simulation is None or self.simulation.status == Simulation.STATUS_STOPPED: | |
| # Compute the function and return it, no caching, since the simulation is not started | |
| output = self.function(*self.args, **self.kwargs) | |
| elif self.simulation.status == Simulation.STATUS_STARTED: | |
| # Compute the event hash | |
| event_hash = self.simulation._function_call_hash(self.function_name, *self.args, **self.kwargs) | |
| # Sequential and parallel transactions are handled in different ways | |
| if begin_parallel: | |
| self.simulation.begin_parallel_transactions() | |
| # CACHED? Check if the event hash is in the cache | |
| if self.simulation._is_transaction_event_cached(event_hash, | |
| parallel=self.simulation.is_under_parallel_transactions()): | |
| self.simulation.cache_hits += 1 | |
| # Restore the full state and return the cached output | |
| logger.info(f"Skipping execution of {self.function_name} with args {self.args} and kwargs {self.kwargs} because it is already cached.") | |
| # SEQUENTIAL | |
| if not self.simulation.is_under_parallel_transactions(): | |
| self.simulation._skip_execution_with_cache() | |
| state = self.simulation.cached_trace[self.simulation._execution_trace_position()][3] # state | |
| self.simulation._decode_simulation_state(state) | |
| # Output encoding/decoding is used to preserve references to TinyPerson and TinyWorld instances | |
| # mainly. Scalar values (int, float, str, bool) and composite values (list, dict) are | |
| # encoded/decoded as is. | |
| encoded_output = self.simulation.cached_trace[self.simulation._execution_trace_position()][2] # output | |
| output = self._decode_function_output(encoded_output) | |
| # PARALLEL | |
| else: # is under parallel transactions | |
| # in parallel segments, state is not restored, only outputs | |
| encoded_output = self.simulation._get_cached_parallel_value(event_hash, "encoded_output") | |
| output = self._decode_function_output(encoded_output) | |
| else: # not cached | |
| if not begin_parallel: | |
| # in case of beginning a parallel segment, we don't want to count it as a cache miss, | |
| # since the segment itself will not be cached, but rather the events within it. | |
| self.simulation.cache_misses += 1 | |
| if not self.simulation.is_under_transaction(id=parallel_id) and not begin_parallel: | |
| # BEGIN SEQUENTIAL TRANSACTION ############################################################### | |
| # | |
| # if this is the beginning of a parallel segment, we don't need to begin a transaction, since | |
| # we want to allow additional transactions within the parallel segment (i.e., one-level reentrancy). | |
| if not begin_parallel: | |
| self.simulation.begin_transaction(id=parallel_id) | |
| # Compute the function and encode the relevant output and simulation state | |
| output = self.function(*self.args, **self.kwargs) | |
| self._save_output_with_simulation_state(event_hash, output) | |
| # END TRANSACTION ################################################################# | |
| if not begin_parallel: | |
| self.simulation.end_transaction(id=parallel_id) | |
| else: # already under transaction (thus, now a reentrant transaction) OR beginning a parallel segment | |
| # NOTES: | |
| # | |
| # - Reentrant sequential transactions are not cached, since what matters is the final result of | |
| # the top-level transaction. | |
| # | |
| # - The event that starts the parallel transactions segment WILL NOT itself be cached, since | |
| # it is not part of the parallel segment, but rather the beginning of it. This event will be | |
| # reconstructed during runtime from the parallel events within the segment. | |
| output = self.function(*self.args, **self.kwargs) | |
| if begin_parallel: | |
| self.simulation.end_parallel_transactions() | |
| # execute an ad-hoc Transaction to save the simulation state AFTER the parallel segment is done. | |
| Transaction(self.obj_under_transaction, self.simulation, lambda: True).execute(begin_parallel=False, parallel_id=parallel_id) | |
| else: | |
| raise ValueError(f"Simulation status is invalid at this point: {self.simulation.status}") | |
| # Checkpoint if needed | |
| logger.debug(f"Will attempt to checkpoint simulation state after transaction execution.") | |
| if self.simulation is not None and self.simulation.auto_checkpoint: | |
| logger.debug("Auto-checkpointing simulation state after transaction execution.") | |
| self.simulation.checkpoint() | |
| # after all the transaction is done, return the output - the client will never know about all the complexity we've | |
| # gone through to get here. | |
| return output</code></pre> | |
| </details> | |
| </dd> | |
| </dl> | |
| </dd> | |
| </dl> | |
| </section> | |
| </article> | |
| <nav id="sidebar"> | |
| <h1>Index</h1> | |
| <div class="toc"> | |
| <ul></ul> | |
| </div> | |
| <ul id="index"> | |
| <li><h3>Super-module</h3> | |
| <ul> | |
| <li><code><a title="tinytroupe" href="index.html">tinytroupe</a></code></li> | |
| </ul> | |
| </li> | |
| <li><h3><a href="#header-functions">Functions</a></h3> | |
| <ul class="two-column"> | |
| <li><code><a title="tinytroupe.control.begin" href="#tinytroupe.control.begin">begin</a></code></li> | |
| <li><code><a title="tinytroupe.control.cache_hits" href="#tinytroupe.control.cache_hits">cache_hits</a></code></li> | |
| <li><code><a title="tinytroupe.control.cache_misses" href="#tinytroupe.control.cache_misses">cache_misses</a></code></li> | |
| <li><code><a title="tinytroupe.control.checkpoint" href="#tinytroupe.control.checkpoint">checkpoint</a></code></li> | |
| <li><code><a title="tinytroupe.control.current_simulation" href="#tinytroupe.control.current_simulation">current_simulation</a></code></li> | |
| <li><code><a title="tinytroupe.control.end" href="#tinytroupe.control.end">end</a></code></li> | |
| <li><code><a title="tinytroupe.control.reset" href="#tinytroupe.control.reset">reset</a></code></li> | |
| <li><code><a title="tinytroupe.control.transactional" href="#tinytroupe.control.transactional">transactional</a></code></li> | |
| </ul> | |
| </li> | |
| <li><h3><a href="#header-classes">Classes</a></h3> | |
| <ul> | |
| <li> | |
| <h4><code><a title="tinytroupe.control.CacheOutOfSync" href="#tinytroupe.control.CacheOutOfSync">CacheOutOfSync</a></code></h4> | |
| </li> | |
| <li> | |
| <h4><code><a title="tinytroupe.control.ExecutionCached" href="#tinytroupe.control.ExecutionCached">ExecutionCached</a></code></h4> | |
| </li> | |
| <li> | |
| <h4><code><a title="tinytroupe.control.Simulation" href="#tinytroupe.control.Simulation">Simulation</a></code></h4> | |
| <ul class=""> | |
| <li><code><a title="tinytroupe.control.Simulation.STATUS_STARTED" href="#tinytroupe.control.Simulation.STATUS_STARTED">STATUS_STARTED</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.STATUS_STOPPED" href="#tinytroupe.control.Simulation.STATUS_STOPPED">STATUS_STOPPED</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.add_agent" href="#tinytroupe.control.Simulation.add_agent">add_agent</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.add_environment" href="#tinytroupe.control.Simulation.add_environment">add_environment</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.add_factory" href="#tinytroupe.control.Simulation.add_factory">add_factory</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.begin" href="#tinytroupe.control.Simulation.begin">begin</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.begin_parallel_transactions" href="#tinytroupe.control.Simulation.begin_parallel_transactions">begin_parallel_transactions</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.begin_transaction" href="#tinytroupe.control.Simulation.begin_transaction">begin_transaction</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.checkpoint" href="#tinytroupe.control.Simulation.checkpoint">checkpoint</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.end" href="#tinytroupe.control.Simulation.end">end</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.end_parallel_transactions" href="#tinytroupe.control.Simulation.end_parallel_transactions">end_parallel_transactions</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.end_transaction" href="#tinytroupe.control.Simulation.end_transaction">end_transaction</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.is_under_parallel_transactions" href="#tinytroupe.control.Simulation.is_under_parallel_transactions">is_under_parallel_transactions</a></code></li> | |
| <li><code><a title="tinytroupe.control.Simulation.is_under_transaction" href="#tinytroupe.control.Simulation.is_under_transaction">is_under_transaction</a></code></li> | |
| </ul> | |
| </li> | |
| <li> | |
| <h4><code><a title="tinytroupe.control.SkipTransaction" href="#tinytroupe.control.SkipTransaction">SkipTransaction</a></code></h4> | |
| </li> | |
| <li> | |
| <h4><code><a title="tinytroupe.control.Transaction" href="#tinytroupe.control.Transaction">Transaction</a></code></h4> | |
| <ul class=""> | |
| <li><code><a title="tinytroupe.control.Transaction.execute" href="#tinytroupe.control.Transaction.execute">execute</a></code></li> | |
| </ul> | |
| </li> | |
| </ul> | |
| </li> | |
| </ul> | |
| </nav> | |
| </main> | |
| <footer id="footer"> | |
| <p>Generated by <a href="https://pdoc3.github.io/pdoc" title="pdoc: Python API documentation generator"><cite>pdoc</cite> 0.10.0</a>.</p> | |
| </footer> | |
| </body> | |
| </html> |