from typing import Dict, Any

from flows.base_flows.atomic import AtomicFlow


class MemoryWritingAtomicFlow(AtomicFlow):
    """Atomic flow that appends memory items from its input to files on disk.

    The only memory kind handled is ``"summary"``, which is appended as one
    line to the file whose path is stored under ``memory_files["logs"]``.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``AtomicFlow`` and register memory kinds."""
        super().__init__(**kwargs)
        # Keys of ``input_data`` that this flow knows how to persist.
        self.supported_memories = ["summary"]

    def _check_input(self, input_data: Dict[str, Any]):
        """Validate that ``input_data`` names the memory files and has something to write.

        Raises:
            AssertionError: if ``"memory_files"`` is missing, or if none of
                ``self.supported_memories`` is present in ``input_data``.
        """
        # NOTE(review): ``assert`` is stripped under ``python -O``; kept as-is so
        # the exception type seen by existing callers does not change.
        assert "memory_files" in input_data, "memory_files not passed to MemoryWritingAtomicFlow"
        assert any(item in input_data for item in self.supported_memories), "no memories to write"

    def _call(self, input_data: Dict[str, Any]):
        """Append the summary, if present, to the logs file.

        Does nothing when ``"summary"`` is not a key of ``input_data``.

        Raises:
            AssertionError: if a summary is given but ``memory_files`` has no
                ``"logs"`` entry.
        """
        if "summary" in input_data:
            assert "logs" in input_data["memory_files"], "there is summary to write, but no logs file in memory_files"
            logs_file_location = input_data["memory_files"]["logs"]
            summary_to_write = input_data["summary"]
            # Append mode: earlier entries are kept, one summary per line.
            with open(logs_file_location, 'a') as file:
                file.write(summary_to_write + "\n")

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the input, write the memories, and report success.

        Args:
            input_data: must contain ``"memory_files"`` and at least one
                supported memory key (currently ``"summary"``).

        Returns:
            A dict with the single key ``"MemWrite_output"`` holding a
            success message.

        Raises:
            AssertionError: propagated from ``_check_input`` / ``_call`` when
                the input is incomplete.
        """
        self._check_input(input_data)
        self._call(input_data)
        response = {}
        response["MemWrite_output"] = "Memory Write was successful"
        # Previously the response was built but never returned, so callers got None.
        return response