from typing import Dict, Any

from aiflows.base_flows.atomic import AtomicFlow


class MemoryWritingAtomicFlow(AtomicFlow):
    """This class is used to write memory to memory files.

    *Input Interface*:

    - `summary` (str): summary to write to memory file (logs)
    - `memory_files` (dict): dictionary of memory files to write to.

    *Output Interface*:

    - `MemWrite_output` (str): message stating whether the memory write was successful

    *Configuration Parameters*:

    - `input_interface`: the input interface of the atomic flow
    - `output_interface`: the output interface of the atomic flow
    """

    def __init__(self, **kwargs):
        """ Initialize the atomic flow.

        :param kwargs: additional key-value arguments to pass to the atomic flow
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)
        # Keys of the input data that this flow knows how to persist.
        self.supported_memories = ["summary"]

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """ Check whether the input data is valid.

        :param input_data: the input data to check
        :type input_data: Dict[str, Any]
        :raises AssertionError: if memory_files is not passed to MemoryWritingAtomicFlow
        :raises AssertionError: if input data is not supported by MemoryWritingAtomicFlow
        """
        # Raised explicitly (instead of using `assert` statements) so the
        # validation is not stripped when Python runs with -O; the exception
        # type and messages are unchanged.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to MemoryWritingAtomicFlow")
        if not any(item in input_data for item in self.supported_memories):
            raise AssertionError("no memories to write")

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """ Write memory to memory files.

        The summary (if present) is appended to the file at
        ``input_data["memory_files"]["logs"]``, followed by a newline. Any
        error raised while doing so is not propagated: its message is
        returned in ``MemWrite_output`` instead.

        :param input_data: the input data to write
        :type input_data: Dict[str, Any]
        :return: the output data, a dict with the single key ``MemWrite_output``
        :rtype: Dict[str, Any]
        """
        try:
            if "summary" in input_data:
                memory_files = input_data["memory_files"]
                # Explicit raise rather than `assert`, so that under -O the
                # caller still gets this message instead of a bare KeyError.
                if "logs" not in memory_files:
                    raise AssertionError(
                        "there is summary to write, but no logs file in memory_files"
                    )
                # Explicit UTF-8 so non-ASCII summary text does not depend on
                # the platform's locale encoding.
                with open(memory_files["logs"], "a", encoding="utf-8") as file:
                    file.write(input_data["summary"] + "\n")
            return {"MemWrite_output": "Memory Write was successful"}
        except Exception as e:
            # Deliberate best-effort: report the failure as the flow's output
            # rather than raising.
            return {"MemWrite_output": str(e)}

    def run(
            self,
            input_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """ Run the atomic flow.

        Validates the input, then writes the memory.

        :param input_data: the input data to run
        :type input_data: Dict[str, Any]
        :return: the output data
        :rtype: Dict[str, Any]
        :raises AssertionError: if the input data fails validation
        """
        self._check_input(input_data)
        return self._call(input_data)