Duibonduil commited on
Commit
974d4df
·
verified ·
1 Parent(s): 4abb3dc

Upload 2 files

Browse files
aworld/output/storage/artifact_repository.py ADDED
@@ -0,0 +1,286 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import os
3
+ import time
4
+ import uuid
5
+ from enum import Enum
6
+ from pathlib import Path
7
+ from typing import Dict, Any, Optional, List
8
+
9
+ from pydantic import BaseModel
10
+
11
+ from aworld.output import Artifact
12
+
13
+
14
class ArtifactRepository:
    """Abstract base class for versioned artifact storage backends."""

    def __init__(self):
        """
        Initialize the artifact repository
        """
        pass

    def load_index(self) -> Dict[str, Any]:
        """Load or create index file"""
        pass

    def save_index(self, index_data: Dict[str, Any]) -> None:
        """Save index to file"""

    def store_artifact(self,
                       artifact: Artifact
                       ) -> str:
        """
        Store artifact and return its version identifier

        Args:
            artifact: Artifact to be stored

        Returns:
            Version identifier
        """
        pass

    def delete_artifact(self, artifact_id: str) -> bool:
        """
        Delete artifact from repository

        Args:
            artifact_id: Artifact identifier

        Returns:
            Whether deletion was successful
        """
        pass

    def retrieve_latest_artifact(self, artifact_id: str) -> Optional[Dict[str, Any]]:
        """Return the latest stored data of the artifact, or None if absent"""
        pass

    def get_artifact_versions(self, artifact_id: str) -> List[Dict[str, Any]]:
        """
        Get information about all versions of an artifact

        Args:
            artifact_id: Artifact identifier

        Returns:
            List of version information
        """
        pass

    def super_path(self) -> str:
        """Return the backend's root storage path or prefix"""
        pass

    def artifact_path(self, artifact_id) -> str:
        """Return the location of the artifact's content file under the root path"""
        return self.super_path() + f"/artifact/{artifact_id}/index.json"

    def generate_tree_data(self, workspace_name: str) -> dict:
        """
        Abstract method: Generate a directory tree structure for the workspace.

        Args:
            workspace_name: Name of the workspace (for root node)

        Returns:
            Directory tree as dict
        """
        raise NotImplementedError()
79
+
80
+
81
class CommonEncoder(json.JSONEncoder):
    """JSON encoder that understands Enum members and pydantic models."""

    def default(self, obj):
        if isinstance(obj, Enum):
            # Serialize enums by member name (matches EnumDecoder's protocol).
            return obj.name
        if isinstance(obj, BaseModel):
            # BUG FIX: model_dump_json() returns a JSON *string*, which would be
            # double-encoded inside the surrounding document; model_dump()
            # returns a plain dict that serializes as a nested object.
            return obj.model_dump()
        return json.JSONEncoder.default(self, obj)
88
+
89
class EnumDecoder(json.JSONDecoder):
    """JSON decoder that revives enum members encoded as
    ``{"__enum__": ..., "__enum_type__": ..., "__enum_value__": ...}`` dicts."""

    def decode(self, s, **kwargs):
        parsed_json = super().decode(s, **kwargs)
        # BUG FIX: the top-level JSON value is not always an object; lists and
        # scalars used to crash on .items().
        if isinstance(parsed_json, dict):
            for key, value in parsed_json.items():
                if isinstance(value, dict) and value.get("__enum__"):
                    # The enum class must be resolvable in this module's globals.
                    enum_type = globals()[value["__enum_type__"]]
                    parsed_json[key] = enum_type[value["__enum_value__"]]
        return parsed_json
98
+
99
+
100
class LocalArtifactRepository(ArtifactRepository):
    """Artifact storage layer: manages versioned artifacts through content-addressable storage"""

    def __init__(self, storage_path: str):
        """
        Initialize the artifact repository

        Args:
            storage_path: Directory path for storing data
        """
        super().__init__()
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.index_path = self.storage_path / "index.json"
        self.index = self.load_index()

    def load_index(self) -> Dict[str, Any]:
        """Load the index file, creating an empty one when missing or corrupt"""
        if self.index_path.exists():
            try:
                with open(self.index_path, 'r') as f:
                    return json.load(f)
            except json.JSONDecodeError:
                # Corrupt index file: fall back to an empty index instead of crashing.
                return {"artifacts": [], "versions": []}
        else:
            index = {"artifacts": [], "versions": []}
            self._save_index(index)
            return index

    def save_index(self, workspace_data) -> None:
        """Persist the given index/workspace data to disk"""
        self._save_index(workspace_data)

    def _save_index(self, index: Dict[str, Any]) -> None:
        """Save index to file"""
        with open(self.index_path, 'w') as f:
            json.dump(index, f, indent=2, ensure_ascii=False, cls=CommonEncoder)

    def store_artifact(self,
                       artifact: Artifact
                       ) -> str:
        """
        Store artifact (content and attachments) and record it in the index

        Args:
            artifact: Artifact to be stored

        Returns:
            Version identifier (currently always 'success')
        """
        # Create version record
        version = {
            "hash": artifact.artifact_id,
            "timestamp": time.time(),
            "metadata": artifact.metadata or {}
        }

        # Store content
        data = artifact.to_dict()
        content_path = Path(self.artifact_path(artifact.artifact_id))
        content_path.parent.mkdir(parents=True, exist_ok=True)
        with open(content_path, 'w') as f:
            json.dump(data, f, indent=2, ensure_ascii=False, cls=CommonEncoder)

        # Store attachments next to the content file
        if artifact.attachments:
            for attachment in artifact.attachments:
                attachment_path = content_path.parent / attachment.filename
                attachment_path.parent.mkdir(parents=True, exist_ok=True)
                with open(attachment_path, 'w') as f:
                    f.write(attachment.content)

        # Update index. BUG FIX: the version record was previously built but
        # never written, so get_artifact_versions could not find anything.
        # Uses the same schema as OSSArtifactRepository: a list of
        # {'artifact_id', 'type', 'version'} records, latest version only.
        for item in self.index["artifacts"]:
            if item.get('artifact_id') == artifact.artifact_id:
                item['version'] = version
                break
        else:
            self.index["artifacts"].append({
                'artifact_id': artifact.artifact_id,
                'type': 'artifact',
                'version': version
            })
        self._save_index(self.index)

        return "success"

    def retrieve_latest_artifact(self, artifact_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the latest stored content of an artifact

        Args:
            artifact_id: Artifact identifier

        Returns:
            Stored data, or None if it doesn't exist
        """
        artifact_path = self.artifact_path(artifact_id)

        if not Path(artifact_path).exists():
            return None

        with open(artifact_path, 'r') as f:
            return json.load(f)

    def get_artifact_versions(self, artifact_id: str) -> List[Dict[str, Any]]:
        """
        Get information about all versions of an artifact
        (currently only the latest version is tracked)

        Args:
            artifact_id: Artifact identifier

        Returns:
            List of version information
        """
        # BUG FIX: self.index["artifacts"] is a list of records; the previous
        # code indexed it with the artifact_id string, raising TypeError for
        # every lookup. Scan the records instead.
        for item in self.index["artifacts"]:
            if item.get('artifact_id') == artifact_id:
                version_info = item["version"].copy()
                version_info["artifact_id"] = artifact_id
                return [version_info]
        return []

    def delete_artifact(self, artifact_id: str) -> bool:
        """
        Delete the specified artifact and its attachments from storage

        Args:
            artifact_id: Artifact identifier

        Returns:
            Whether deletion was successful
        """
        import shutil

        content_path = Path(self.artifact_path(artifact_id))
        if not content_path.exists():
            return False

        # Remove the whole artifact directory (content file and attachments)
        # in one call instead of walking the tree by hand.
        shutil.rmtree(content_path.parent, ignore_errors=True)

        # Keep the index consistent with what is on disk.
        self.index["artifacts"] = [
            item for item in self.index["artifacts"]
            if item.get('artifact_id') != artifact_id
        ]
        self._save_index(self.index)

        return True

    def super_path(self) -> str:
        """Root storage directory as a string"""
        return str(self.storage_path)

    def generate_tree_data(self, workspace_name: str) -> dict:
        """
        Generate a directory tree structure based on the actual local workspace folder structure.

        Args:
            workspace_name: Name of the workspace (for root node)

        Returns:
            Directory tree as dict
        """
        def build_tree(path: str, parent_id: str, depth: int = 1) -> dict:
            node = {
                "name": os.path.basename(path) or workspace_name,
                "id": str(uuid.uuid4()),
                "type": "dir" if os.path.isdir(path) else "file",
                "parentId": parent_id,
                "depth": depth,
                "expanded": False,
                "children": []
            }
            if os.path.isdir(path):
                # Sort entries so the generated tree is deterministic.
                for entry in sorted(os.listdir(path)):
                    full_path = os.path.join(path, entry)
                    node["children"].append(build_tree(full_path, node["id"], depth + 1))
            return node

        root_path = str(self.storage_path)
        if not os.path.exists(root_path):
            return {
                "name": workspace_name,
                "id": "-1",
                "children": []
            }
        tree = build_tree(root_path, "-1", 1)
        tree["name"] = workspace_name
        tree["id"] = "-1"
        tree["parentId"] = None
        tree["depth"] = 0
        return tree
285
+
286
+
aworld/output/storage/oss_artifact_repository.py ADDED
@@ -0,0 +1,283 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import hashlib
2
+ import json
3
+ import time
4
+ import uuid
5
+ from typing import Dict, Any, Optional, List, Literal
6
+ from .artifact_repository import ArtifactRepository, CommonEncoder
7
+ from aworld.output.artifact import Artifact, ArtifactAttachment
8
+
9
+
10
class OSSArtifactRepository(ArtifactRepository):
    """
    Artifact storage implementation based on Alibaba Cloud OSS, similar to
    LocalArtifactRepository but using OSS as the backend.
    """

    def __init__(self,
                 access_key_id: str,
                 access_key_secret: str,
                 endpoint: str,
                 bucket_name: str,
                 storage_path: str = "aworld/workspaces/"):
        """
        Initialize OSS artifact repository

        Args:
            access_key_id: OSS access key ID
            access_key_secret: OSS access key secret
            endpoint: OSS service endpoint
            bucket_name: OSS bucket name
            storage_path: Storage prefix, defaults to "aworld/workspaces/"
        """
        # oss2 is imported lazily so the module can be loaded without the SDK.
        import oss2

        super().__init__()
        self.auth = oss2.Auth(access_key_id, access_key_secret)
        self.bucket = oss2.Bucket(self.auth, endpoint, bucket_name)
        # Normalize the prefix to exactly one trailing slash.
        self.prefix = storage_path.rstrip('/') + '/'
        self.index_key = f"{self.prefix}index.json"
        self.index = self.load_index()

    def load_index(self) -> Dict[str, Any]:
        """
        Load or create index file from OSS

        Returns:
            Index dictionary
        """
        import oss2

        try:
            result = self.bucket.get_object(self.index_key)
            content = result.read().decode('utf-8')
            return json.loads(content)
        except oss2.exceptions.NoSuchKey:
            # First use of this prefix: create an empty index object in OSS.
            index = {"artifacts": [], "versions": []}
            self._save_index(index)
            return index
        except Exception as e:
            print(f"Failed to load index file: {e}")
            return {"artifacts": [], "versions": []}

    def save_index(self, index: Dict[str, Any]) -> None:
        """
        Save the current index to OSS
        """
        self._save_index(index)

    def _save_index(self, index: Dict[str, Any]) -> None:
        """
        Save index file to OSS

        Args:
            index: Index dictionary
        """
        try:
            content = json.dumps(index, indent=2, ensure_ascii=False, cls=CommonEncoder)
            self.bucket.put_object(self.index_key, content.encode('utf-8'))
        except Exception as e:
            print(f"Failed to save index file: {e}")
            raise

    def artifact_path(self, artifact_id: str) -> str:
        """
        Get the OSS key for a given artifact's content file

        Args:
            artifact_id: Artifact identifier

        Returns:
            OSS object key string
        """
        return f"{self.prefix}artifact/{artifact_id}/index.json"

    def attachment_path(self, artifact_id: str, filename: str) -> str:
        """
        Get the OSS key for an artifact attachment

        Args:
            artifact_id: Artifact identifier
            filename: Attachment filename

        Returns:
            OSS object key string
        """
        # BUG FIX: the filename was previously not interpolated into the key,
        # so every attachment of an artifact mapped to the same OSS object.
        return f"{self.prefix}artifact/{artifact_id}/attachments/{filename}"

    def store_artifact(self, artifact: Artifact) -> str:
        """
        Store artifact and its attachments to OSS

        Args:
            artifact: Artifact instance to be stored

        Returns:
            Version identifier (always 'success' for now)
        """
        try:
            # Prepare version record
            version = {
                "hash": artifact.artifact_id,
                "timestamp": time.time(),
                "metadata": artifact.metadata or {}
            }
            # Store artifact content
            data = artifact.to_dict()
            content_key = self.artifact_path(artifact.artifact_id)
            content = json.dumps(data, indent=2, ensure_ascii=False, cls=CommonEncoder)
            self.bucket.put_object(content_key, content.encode('utf-8'))
            # Store attachments if any
            if artifact.attachments:
                for attachment in artifact.attachments:
                    if isinstance(attachment, ArtifactAttachment):
                        attachment_key = self.attachment_path(artifact.artifact_id, attachment.filename)
                        self.bucket.put_object(attachment_key, attachment.content.encode('utf-8'))
            # Update index: replace the version of an existing record, or
            # append a new record (latest version only is tracked).
            artifact_exists = False
            for item in self.index["artifacts"]:
                if item['artifact_id'] == artifact.artifact_id:
                    item['version'] = version
                    artifact_exists = True
                    break
            if not artifact_exists:
                self.index["artifacts"].append({
                    'artifact_id': artifact.artifact_id,
                    'type': 'artifact',
                    'version': version
                })
            self._save_index(self.index)
            return "success"
        except Exception as e:
            print(f"Storage failed: {e}")
            raise

    def retrieve_latest_artifact(self, artifact_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the latest version of artifact based on artifact ID

        Args:
            artifact_id: Artifact identifier

        Returns:
            Stored data as dict, or None if it doesn't exist
        """
        import oss2

        try:
            content_key = self.artifact_path(artifact_id)
            try:
                result = self.bucket.get_object(content_key)
                content = result.read().decode('utf-8')
                return json.loads(content)
            except oss2.exceptions.NoSuchKey:
                print(f"Content file doesn't exist: {content_key}")
                return None
        except Exception as e:
            print(f"Failed to retrieve latest artifact: {e}")
            return None

    def get_artifact_versions(self, artifact_id: str) -> List[Dict[str, Any]]:
        """
        Get information about all versions of an artifact (currently only latest version is tracked)

        Args:
            artifact_id: Artifact identifier

        Returns:
            List of version information
        """
        try:
            for artifact in self.index["artifacts"]:
                if artifact['artifact_id'] == artifact_id:
                    version_info = artifact["version"].copy()
                    version_info["artifact_id"] = artifact_id
                    return [version_info]
            return []
        except Exception as e:
            print(f"Failed to get version information: {e}")
            return []

    def delete_artifact(self, artifact_id: str) -> bool:
        """
        Delete the specified artifact and its attachments from OSS

        Args:
            artifact_id: Artifact identifier

        Returns:
            Whether deletion was successful
        """
        import oss2

        try:
            # Delete artifact content
            content_key = self.artifact_path(artifact_id)
            try:
                self.bucket.delete_object(content_key)
            except oss2.exceptions.NoSuchKey:
                pass
            # Delete attachments (list objects under attachments/)
            attachment_prefix = f"{self.prefix}artifact/{artifact_id}/attachments/"
            for obj in oss2.ObjectIterator(self.bucket, prefix=attachment_prefix):
                self.bucket.delete_object(obj.key)
            # Remove from index (returns immediately, so mutating during
            # enumerate is safe here)
            for i, artifact in enumerate(self.index["artifacts"]):
                if artifact['artifact_id'] == artifact_id:
                    del self.index["artifacts"][i]
                    self._save_index(self.index)
                    return True
            return False
        except Exception as e:
            print(f"Failed to delete artifact: {e}")
            return False

    def list_artifacts(self) -> List[Dict[str, Any]]:
        """
        List all artifacts in the repository

        Returns:
            List of artifact information
        """
        try:
            return [
                {
                    "artifact_id": artifact["artifact_id"],
                    "type": artifact["type"],
                    "timestamp": artifact["version"]["timestamp"],
                    "metadata": artifact["version"]["metadata"]
                }
                for artifact in self.index["artifacts"]
            ]
        except Exception as e:
            print(f"Failed to list artifacts: {e}")
            return []

    def generate_tree_data(self, workspace_name: str) -> dict:
        """
        Generate a directory tree structure based on the OSS workspace folder structure.

        Args:
            workspace_name: Name of the workspace (for root node)

        Returns:
            Directory tree as dict
        """
        import oss2

        all_keys = [obj.key for obj in oss2.ObjectIterator(self.bucket, prefix=self.prefix)]
        # Strip the workspace prefix; the index file itself is not shown.
        rel_keys = [key[len(self.prefix):] for key in all_keys if key != self.index_key]
        # Build tree
        root = {
            "name": workspace_name,
            "id": "-1",
            "type": "dir",
            "parentId": None,
            "depth": 0,
            "expanded": False,
            "children": []
        }
        node_map = {"": root}  # map from relative path to its tree node
        for key in rel_keys:
            parts = [p for p in key.split('/') if p]
            cur_path = ""
            parent_path = ""
            for depth, part in enumerate(parts):
                parent_path = cur_path
                cur_path = f"{cur_path}/{part}" if cur_path else part
                if cur_path not in node_map:
                    # The last path component of an object key is a file;
                    # every intermediate component is a directory.
                    node = {
                        "name": part,
                        "id": str(uuid.uuid4()),
                        "type": "dir" if depth < len(parts) - 1 else "file",
                        "parentId": node_map[parent_path]["id"],
                        "depth": depth + 1,
                        "expanded": False,
                        "children": []
                    }
                    node_map[parent_path]["children"].append(node)
                    node_map[cur_path] = node
        return root