| from typing import Dict, Any | |
| from flows.base_flows.atomic import AtomicFlow | |
| class MemoryWritingAtomicFlow(AtomicFlow): | |
| def __init__(self, **kwargs): | |
| super.__init__(**kwargs) | |
| self.supported_memories = ["summary"] | |
| def _check_input(self, input_data: Dict[str, Any]): | |
| assert "memory_files" in input_data, "memory_files not passed to MemoryWritingAtomicFlow" | |
| assert any(item in input_data for item in self.supported_memories), "no memories to write" | |
| def _call(self, input_data: Dict[str, Any]): | |
| if "summary" in input_data: | |
| assert "logs" in input_data["memory_files"], "there is summary to write, but no logs file in memory_files" | |
| logs_file_location = input_data["memory_files"]["logs"] | |
| summary_to_write = input_data["summary"] | |
| with open(logs_file_location, 'a') as file: | |
| file.write(summary_to_write + "\n") | |
| def run( | |
| self, | |
| input_data: Dict[str, Any] | |
| ): | |
| self._check_input(input_data) | |
| self._call(input_data) |