File size: 2,782 Bytes
8e549ff
 
 
 
 
 
 
 
 
 
 
52ab60b
 
 
 
8e549ff
 
52ab60b
 
 
 
 
8e549ff
 
 
52ab60b
 
 
 
 
 
 
8e549ff
 
 
 
52ab60b
 
 
 
 
 
 
 
8e549ff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52ab60b
 
 
 
 
 
 
8e549ff
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from typing import Dict, Any
from aiflows.base_flows.atomic import AtomicFlow
class MemoryWritingAtomicFlow(AtomicFlow):
    """Atomic flow that appends memory entries to memory files.

    Currently the only supported memory is ``summary``, which is appended as
    one line to the file registered under the ``logs`` key of ``memory_files``.

    *Input Interface*:
    - `summary` (str): summary to write to the memory file (logs)
    - `memory_files` (dict): dictionary of memory files to write to; the
      ``logs`` entry is the location of the file that receives the summary.

    *Output Interface*:
    - `MemWrite_output` (str): message stating whether the memory write was
      successful; on failure it carries the text of the error instead.

    *Configuration Parameters*:
    - `input_interface`: the input interface of the atomic flow
    - `output_interface`: the output interface of the atomic flow
    """

    def __init__(self, **kwargs):
        """
        Initialize the atomic flow.
        :param kwargs: additional key-value arguments to pass to the atomic flow
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)
        # Keys of ``input_data`` that this flow knows how to persist.
        self.supported_memories = ["summary"]

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """
        Check whether the input data is valid.
        :param input_data: the input data to check
        :type input_data: Dict[str, Any]
        :raises AssertionError: if memory_files is not passed to MemoryWritingAtomicFlow
        :raises AssertionError: if input data is not supported by MemoryWritingAtomicFlow
        """
        # Raise explicitly instead of using ``assert``: assert statements are
        # removed when Python runs with -O, which would silently disable this
        # validation. The exception type and messages are kept unchanged.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to MemoryWritingAtomicFlow")
        if not any(item in input_data for item in self.supported_memories):
            raise AssertionError("no memories to write")

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Write memory to memory files.

        Errors raised while writing (including a missing ``logs`` entry) are
        not propagated; their message is returned in ``MemWrite_output``.
        :param input_data: the input data to write
        :type input_data: Dict[str, Any]
        :return: the output data, a dict with the single key ``MemWrite_output``
        :rtype: Dict[str, Any]
        """
        try:
            if "summary" in input_data:
                memory_files = input_data["memory_files"]
                # Explicit raise (not ``assert``) so the descriptive message is
                # still produced under -O; it is reported through the handler
                # below rather than escaping to the caller.
                if "logs" not in memory_files:
                    raise AssertionError(
                        "there is summary to write, but no logs file in memory_files"
                    )
                logs_file_location = memory_files["logs"]
                summary_to_write = input_data["summary"]
                # Append mode: earlier entries are kept, one summary per line.
                # NOTE(review): the platform default text encoding is used here;
                # confirm it matches whatever reads this file before pinning one.
                with open(logs_file_location, 'a') as file:
                    file.write(summary_to_write + "\n")
            return {"MemWrite_output": "Memory Write was successful"}
        except Exception as e:
            # Deliberately broad: any failure is surfaced as the output message
            # instead of an exception.
            return {"MemWrite_output": str(e)}

    def run(
            self,
            input_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Run the atomic flow: validate the input, then write the memory.
        :param input_data: the input data to run
        :type input_data: Dict[str, Any]
        :return: the output data
        :rtype: Dict[str, Any]
        :raises AssertionError: if the input data fails validation
        """
        self._check_input(input_data)
        return self._call(input_data)