File size: 9,638 Bytes
985c397
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# SPDX-License-Identifier: LGPL-2.1-or-later

# ***************************************************************************
# *   Copyright (c) 2025 Samuel Abels <knipknap@gmail.com>                  *
# *                                                                         *
# *   This program is free software; you can redistribute it and/or modify  *
# *   it under the terms of the GNU Lesser General Public License (LGPL)    *
# *   as published by the Free Software Foundation; either version 2 of     *
# *   the License, or (at your option) any later version.                   *
# *   for detail see the LICENCE text file.                                 *
# *                                                                         *
# *   This program is distributed in the hope that it will be useful,       *
# *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
# *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
# *   GNU Library General Public License for more details.                  *
# *                                                                         *
# *   You should have received a copy of the GNU Library General Public     *
# *   License along with this program; if not, write to the Free Software   *
# *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  *
# *   USA                                                                   *
# *                                                                         *
# ***************************************************************************
import pprint
from typing import Dict, List, Optional
from ..uri import AssetUri
from .base import AssetStore


class MemoryStore(AssetStore):
    """
    An in-memory implementation of the AssetStore.

    This store keeps all asset data in memory and is primarily intended for
    testing and demonstration purposes. It does not provide persistence.

    Assets are addressed by asset type, asset id and a version string.
    Versions are numeric strings: ``create`` stores version "1" and each
    ``update`` stores the next number. The most recently added version is
    treated as the latest one.
    """

    def __init__(self, name: str, *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        # asset_type -> asset_id -> version -> raw asset bytes
        self._data: Dict[str, Dict[str, Dict[str, bytes]]] = {}
        # asset_type -> asset_id -> versions in insertion order (last is latest)
        self._versions: Dict[str, Dict[str, List[str]]] = {}

    async def get(self, uri: AssetUri) -> bytes:
        """
        Returns the stored bytes for the given URI.

        If the URI carries no version, the latest stored version is used.

        Raises:
            FileNotFoundError: If the asset type, id or version is unknown.
        """
        version = uri.version or self._get_latest_version(uri.asset_type, uri.asset_id)
        try:
            return self._data[uri.asset_type][uri.asset_id][version]
        except KeyError:
            raise FileNotFoundError(f"Asset not found: {uri}") from None

    async def exists(self, uri: AssetUri) -> bool:
        """
        Returns True if the asset (at the given or latest version) is stored.
        """
        asset_type = uri.asset_type
        asset_id = uri.asset_id
        version = uri.version or self._get_latest_version(asset_type, asset_id)
        return version in self._data.get(asset_type, {}).get(asset_id, {})

    async def delete(self, uri: AssetUri) -> None:
        """
        Deletes an asset, or a single version of it.

        If the URI carries a version, only that version is removed; the asset
        itself disappears once its last version is gone. Without a version the
        whole asset is removed. Deleting something that does not exist is a
        silent no-op.
        """
        asset_type = uri.asset_type
        asset_id = uri.asset_id
        version = uri.version

        assets = self._data.get(asset_type)
        if assets is None or asset_id not in assets:
            # Deleting non-existent asset should not raise an error
            return

        if version:
            stored_versions = assets[asset_id]
            if version in stored_versions:
                del stored_versions[version]
                known = self._versions.get(asset_type, {}).get(asset_id)
                if known is not None:
                    # Drop every occurrence so no stale entry can be picked
                    # up as the latest version later on.
                    known[:] = [v for v in known if v != version]
                if not stored_versions:
                    # That was the last version: forget the asset entirely.
                    self._forget_asset(asset_type, asset_id)
        else:
            self._forget_asset(asset_type, asset_id)

        # Clean up empty asset types
        if asset_type in self._data and not self._data[asset_type]:
            del self._data[asset_type]
        if asset_type in self._versions and not self._versions[asset_type]:
            del self._versions[asset_type]

    def _forget_asset(self, asset_type: str, asset_id: str) -> None:
        """Removes all data and version bookkeeping of a single asset."""
        self._data.get(asset_type, {}).pop(asset_id, None)
        self._versions.get(asset_type, {}).pop(asset_id, None)

    async def create(self, asset_type: str, asset_id: str, data: bytes) -> AssetUri:
        """
        Stores a new asset as version "1" and returns its URI.

        For simplicity, creating an asset that already exists overwrites it:
        the previous data and its whole version history are replaced, so the
        version list never contains duplicates and the returned version is
        also the latest one. A real store might handle this differently or
        raise an error.
        """
        version = "1"
        self._data.setdefault(asset_type, {})[asset_id] = {version: data}
        self._versions.setdefault(asset_type, {})[asset_id] = [version]
        return AssetUri(f"{asset_type}://{asset_id}/{version}")

    async def update(self, uri: AssetUri, data: bytes) -> AssetUri:
        """
        Stores the data as a new version of an existing asset.

        Any version in the given URI is ignored; the new version number is
        always the latest stored version plus one.

        Returns:
            AssetUri: The URI of the newly stored version.

        Raises:
            FileNotFoundError: If the asset does not exist.
        """
        asset_type = uri.asset_type
        asset_id = uri.asset_id

        if asset_type not in self._data or asset_id not in self._data[asset_type]:
            raise FileNotFoundError(f"Asset not found for update: {uri}")

        # Update should create a new version
        latest_version = self._get_latest_version(asset_type, asset_id)
        version = str(int(latest_version or 0) + 1)

        self._data[asset_type][asset_id][version] = data
        self._versions.setdefault(asset_type, {}).setdefault(asset_id, []).append(version)

        return AssetUri(f"{asset_type}://{asset_id}/{version}")

    async def list_assets(
        self, asset_type: str | None = None, limit: int | None = None, offset: int | None = None
    ) -> List[AssetUri]:
        """
        Lists the latest version of every asset, optionally filtered by type.

        Args:
            asset_type: Only list assets of this type; None lists all types.
            limit: Maximum number of URIs to return; None means no limit.
            offset: Number of leading URIs to skip; None means 0.
        """
        all_uris: List[AssetUri] = []
        for current_type, assets in self._data.items():
            if asset_type is not None and current_type != asset_type:
                continue
            for asset_id in assets:
                latest_version = self._get_latest_version(current_type, asset_id)
                if latest_version:
                    all_uris.append(AssetUri(f"{current_type}://{asset_id}/{latest_version}"))

        # Apply offset and limit
        start = offset if offset is not None else 0
        end = start + limit if limit is not None else len(all_uris)
        return all_uris[start:end]

    async def count_assets(self, asset_type: str | None = None) -> int:
        """
        Counts assets in the store, optionally filtered by asset type.

        Each asset is counted once, regardless of how many versions it has.
        """
        if asset_type is None:
            return sum(len(assets_by_id) for assets_by_id in self._data.values())
        return len(self._data.get(asset_type, {}))

    async def list_versions(self, uri: AssetUri) -> List[AssetUri]:
        """
        Returns one URI per stored version of the asset, oldest first.

        An unknown asset yields an empty list.
        """
        asset_type = uri.asset_type
        asset_id = uri.asset_id
        versions = self._versions.get(asset_type, {}).get(asset_id, [])
        return [AssetUri(f"{asset_type}://{asset_id}/{version}") for version in versions]

    async def is_empty(self, asset_type: str | None = None) -> bool:
        """
        Returns True if the store (or the given asset type) holds no assets.
        """
        if asset_type is None:
            return not self._data
        return not self._data.get(asset_type)

    def _get_latest_version(self, asset_type: str, asset_id: str) -> Optional[str]:
        """
        Returns the most recently added version of an asset, or None if the
        asset is unknown or has no versions.
        """
        versions = self._versions.get(asset_type, {}).get(asset_id)
        return versions[-1] if versions else None

    def dump(self, print: bool = False) -> Dict[str, Dict[str, Dict[str, bytes]]]:
        """
        Dumps the entire content of the memory store.

        Args:
            print (bool): If True, additionally pretty-prints the structure to
                          the console, excluding the asset data itself.
                          (The name shadows the builtin; it is kept so that
                          existing keyword callers keep working.)

        Returns:
            Dict[str, Dict[str, Dict[str, bytes]]]: The stored data, keyed by
            asset type, asset id and version. This is the store's internal
            dictionary, not a copy, and it is returned whether or not
            printing was requested.
        """
        if not print:
            return self._data

        # Replace the raw bytes with a short size summary for readability.
        printable_data = {
            asset_type: {
                asset_id: {
                    version: f"<data skipped, {len(data_bytes)} bytes>"
                    for version, data_bytes in versions.items()
                }
                for asset_id, versions in assets.items()
            }
            for asset_type, assets in self._data.items()
        }

        pprint.pprint(printable_data, indent=4)
        return self._data