MemoryWritingFlowModule / MemoryWritingAtomicFlow.py
Tachi67's picture
Upload 6 files
52ab60b verified
from typing import Dict, Any
from aiflows.base_flows.atomic import AtomicFlow
class MemoryWritingAtomicFlow(AtomicFlow):
"""This class is used to write memory to memory files.
*Input Interface*:
- `summary` (str): summary to write to memory file (logs)
- `memory_files` (dict): dictionary of memory files to write to.
*Output Interface*:
- `MemWrite_output` (str): message that whether memory write was successful
*Configuration Parameters*:
- `input_interface`: the input interface of the atomic flow
- `output_interface`: the output interface of the atomic flow
"""
def __init__(self, **kwargs):
"""
Initialize the atomic flow.
:param kwargs: additional key-value arguments to pass to the atomic flow
:type kwargs: Dict[str, Any]
"""
super().__init__(**kwargs)
self.supported_memories = ["summary"]
def _check_input(self, input_data: Dict[str, Any]):
"""
Check whether the input data is valid.
:param input_data: the input data to check
:type input_data: Dict[str, Any]
:raises AssertionError: if memory_files is not passed to MemoryWritingAtomicFlow
:raises AssertionError: if input data is not supported by MemoryWritingAtomicFlow
"""
assert "memory_files" in input_data, "memory_files not passed to MemoryWritingAtomicFlow"
assert any(item in input_data for item in self.supported_memories), "no memories to write"
def _call(self, input_data: Dict[str, Any]):
"""
Write memory to memory files.
:param input_data: the input data to write
:type input_data: Dict[str, Any]
:return: the output data
:rtype: Dict[str, Any]
:raises AssertionError: if logs is not in memory_files
"""
try:
if "summary" in input_data:
assert "logs" in input_data["memory_files"], "there is summary to write, but no logs file in memory_files"
logs_file_location = input_data["memory_files"]["logs"]
summary_to_write = input_data["summary"]
with open(logs_file_location, 'a') as file:
file.write(summary_to_write + "\n")
return {"MemWrite_output": "Memory Write was successful"}
except Exception as e:
return {"MemWrite_output": str(e)}
def run(
self,
input_data: Dict[str, Any]
):
"""
Run the atomic flow.
:param input_data: the input data to run
:type input_data: Dict[str, Any]
:return: the output data
:rtype: Dict[str, Any]
"""
self._check_input(input_data)
return self._call(input_data)