Spaces:
Paused
Paused
File size: 13,126 Bytes
7d4338a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 | from __future__ import annotations
from datetime import datetime, timezone
import json
import os
import time
from turtle import stamp
import urllib.request
import uuid
import zipfile
from pathlib import Path
from typing import Any
from helpers import files, print_style, plugins, git
from helpers import yaml as yaml_helper
from helpers.plugins import (
META_FILE_NAME,
PluginMetadata,
get_plugins_list,
after_plugin_change,
)
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename
def _get_user_plugins_dir() -> str:
"""Return absolute path to usr/plugins/."""
return files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR)
def _get_plugin_name(meta: PluginMetadata) -> str:
plugin_name = (meta.name or "").strip()
if not plugin_name:
raise ValueError(f"{META_FILE_NAME} is missing required field 'name'")
return plugin_name
def validate_plugin_dir(path: str, plugin_name: str = "") -> PluginMetadata:
"""Check directory contains plugin.yaml and return parsed metadata.
Raises ValueError if plugin.yaml is missing or invalid."""
meta_path = os.path.join(path, META_FILE_NAME)
if not os.path.isfile(meta_path):
raise ValueError(f"No {META_FILE_NAME} found in {os.path.basename(path)}")
with open(meta_path, "r", encoding="utf-8") as f:
content = f.read()
data = yaml_helper.loads(content)
model = PluginMetadata.model_validate(data)
if plugin_name and plugin_name != model.name:
raise ValueError(
f"Plugin name is incorrect: expected '{plugin_name}', got '{model.name}'. The author needs to correct this in the plugin.yaml file."
)
return model
def check_plugin_conflict(name: str) -> None:
"""Raise ValueError if a plugin with this name already exists in usr/plugins/."""
dest = os.path.join(_get_user_plugins_dir(), name)
if os.path.exists(dest):
raise ValueError(f"Plugin '{name}' is already installed")
def _find_plugin_root(extracted_dir: str) -> str:
"""Walk extracted directory to find the parent of plugin.yaml.
Returns absolute path to the plugin root directory."""
for root, dirs, dir_files in os.walk(extracted_dir):
if META_FILE_NAME in dir_files:
return root
raise ValueError(f"No {META_FILE_NAME} found in the uploaded archive")
def install_uploaded_zip(plugin_file: FileStorage) -> dict:
"""Persist an uploaded ZIP temporarily and install it."""
original_filename = Path((plugin_file.filename or "").strip()).name
if not original_filename:
raise ValueError("No file selected")
tmp_dir = Path(files.get_abs_path("tmp", "plugin_uploads"))
tmp_dir.mkdir(parents=True, exist_ok=True)
temp_name = secure_filename(original_filename) or "plugin.zip"
if not temp_name.lower().endswith(".zip"):
temp_name = f"{temp_name}.zip"
unique = uuid.uuid4().hex[:8]
stamp = time.strftime("%Y%m%d_%H%M%S")
tmp_path = str(tmp_dir / f"plugin_{stamp}_{unique}_{temp_name}")
plugin_file.save(tmp_path)
return install_from_zip(tmp_path, original_filename=original_filename)
def install_from_zip(zip_path: str, original_filename: str | None = None) -> dict:
"""Extract ZIP, find plugin.yaml, move its parent to usr/plugins/.
Returns dict with plugin name and metadata.
Cleans up tmp files regardless of outcome."""
temp_name = f"tmp_plugin_{time.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
extract_dir = files.get_abs_path(files.TEMP_DIR, "plugin_installs", temp_name)
extract_dir = files.create_dir_safe(extract_dir)
dest = ""
try:
try:
# Extract with path traversal protection
with zipfile.ZipFile(zip_path, "r") as z:
for member in z.namelist():
member_path = os.path.realpath(os.path.join(extract_dir, member))
if not (files.is_in_dir(member_path, extract_dir)):
raise ValueError(f"Unsafe path in archive: {member}")
z.extractall(extract_dir)
# Find plugin.yaml
plugin_root = _find_plugin_root(extract_dir)
meta = validate_plugin_dir(plugin_root)
plugin_name = _get_plugin_name(meta)
check_plugin_conflict(plugin_name)
# Move to usr/plugins/
dest = os.path.join(_get_user_plugins_dir(), plugin_name)
files.create_dir(os.path.dirname(dest))
files.move_dir(plugin_root, dest)
except Exception as e:
print_style.PrintStyle.error(f"Failed to validate plugin: {e}")
files.delete_dir(extract_dir)
raise
# run installation hook
try:
run_install_hook(plugin_name)
except Exception as e:
print_style.PrintStyle.error(
f"Failed to run installation hook for {plugin_name}: {e}"
)
files.delete_dir(dest)
raise
# does it have python files?
python_change = bool(files.find_existing_paths_by_pattern(dest+"/**/*.py"))
after_plugin_change([plugin_name], python_change=python_change)
return {
"success": True,
"plugin_name": plugin_name,
"title": meta.title or plugin_name,
"path": files.deabsolute_path(dest),
}
finally:
# Cleanup: extracted files and the archive
try:
files.delete_dir(extract_dir)
files.delete_file(zip_path)
except Exception as e:
pass
def _download_thumbnail(thumbnail_url: str, plugin_dir: str) -> None:
"""Download thumbnail from URL to plugin_dir/webui/thumbnail.<ext>. Non-fatal."""
try:
if not thumbnail_url:
return
from urllib.parse import urlparse
parsed = urlparse(thumbnail_url)
if parsed.scheme not in ("http", "https"):
return
_allowed_exts = {"png", "jpg", "jpeg", "gif", "webp"}
url_path = parsed.path.lower()
ext = url_path.rsplit(".", 1)[-1] if "." in url_path else ""
if ext not in _allowed_exts:
ext = "png"
webui_dir = Path(plugin_dir) / "webui"
webui_dir.mkdir(parents=True, exist_ok=True)
dest = webui_dir / f"thumbnail.{ext}"
req = urllib.request.Request(thumbnail_url, headers={"User-Agent": "AgentZero"})
with urllib.request.urlopen(req, timeout=10) as resp:
dest.write_bytes(resp.read())
except Exception as e:
print_style.PrintStyle.warning(f"Failed to download plugin thumbnail: {e}")
def install_from_git(url: str, token: str | None = None, plugin_name: str = "", thumbnail_url: str = "") -> dict:
"""Clone git repo into usr/plugins/, validate plugin.yaml.
Returns dict with plugin name and metadata."""
from helpers.git import clone_repo
temp_name = f"tmp_plugin_{time.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
git_dir = files.get_abs_path(files.TEMP_DIR, "plugins_installer", temp_name)
try:
files.create_dir_safe(git_dir)
clone_repo(url, git_dir, token=token or None)
meta = validate_plugin_dir(git_dir, plugin_name=plugin_name)
plugin_name = _get_plugin_name(meta)
check_plugin_conflict(plugin_name)
final_dir = os.path.join(_get_user_plugins_dir(), plugin_name)
files.move_dir(git_dir, final_dir)
except Exception as e:
# No plugin.yaml — remove cloned repo
print_style.PrintStyle.error(f"Failed to validate plugin: {e}")
files.delete_dir(git_dir)
raise
_download_thumbnail(thumbnail_url, final_dir)
# run installation hook
try:
run_install_hook(plugin_name)
except Exception as e:
print_style.PrintStyle.error(
f"Failed to run installation hook for {plugin_name}: {e}"
)
files.delete_dir(final_dir)
raise
# does it have python files?
python_change = bool(files.find_existing_paths_by_pattern(final_dir+"/**/*.py"))
after_plugin_change([plugin_name],python_change=python_change)
return {
"success": True,
"plugin_name": plugin_name,
"title": meta.title or plugin_name,
"path": files.deabsolute_path(final_dir),
}
def update_from_git(plugin_name: str) -> dict:
plugin_name = (plugin_name or "").strip()
if not plugin_name:
raise ValueError("Missing plugin_name")
plugin_dir = plugins.find_plugin_dir(plugin_name)
if not plugin_dir:
raise ValueError("Plugin not found")
custom_plugins_dir = files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR)
if not files.is_in_dir(plugin_dir, custom_plugins_dir):
raise ValueError("Only custom plugins can be updated")
try:
run_pre_update_hook(plugin_name)
except Exception as e:
print_style.PrintStyle.error(
f"Failed to run pre-update hook for {plugin_name}: {e}"
)
raise
try:
repo = git.update_repo(plugin_dir)
meta = plugins.get_plugin_meta(plugin_name)
except Exception as e:
print_style.PrintStyle.error(f"Failed to update plugin: {e}")
raise
try:
run_install_hook(plugin_name)
except Exception as e:
print_style.PrintStyle.error(
f"Failed to run installation hook for {plugin_name}: {e}"
)
raise
after_plugin_change([plugin_name])
head = repo.head.commit
return {
"ok": True,
"success": True,
"plugin_name": plugin_name,
"title": meta.title if meta else plugin_name,
"path": files.deabsolute_path(plugin_dir),
"current_commit": head.hexsha,
"current_commit_timestamp": datetime.fromtimestamp(head.committed_date, timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
"version": getattr(meta, "version", "") or "",
"branch": repo.active_branch.name if not repo.head.is_detached else "",
"remote_url": git.strip_auth_from_url(repo.remotes.origin.url) if repo.remotes else "",
"directory_name": Path(plugin_dir).name,
}
def run_install_hook(plugin_name: str):
return plugins.call_plugin_hook(plugin_name, "install")
def run_pre_update_hook(plugin_name: str):
return plugins.call_plugin_hook(plugin_name, "pre_update")
def get_plugin_hub_index(force: bool = False) -> dict[str, Any]:
"""Return the plugin index plus installed Plugin Hub keys."""
index_data = fetch_plugin_index(force=force)
if not isinstance(index_data, dict):
raise ValueError("Plugin index response was not a JSON object")
plugins = index_data.get("plugins")
if not isinstance(plugins, dict):
raise ValueError("Plugin index payload is missing a valid 'plugins' map")
from helpers.plugins import find_plugin_dir
installed_dirs = set(get_plugins_list())
installed_keys: list[str] = []
_thumb_exts = ("png", "jpg", "jpeg", "gif", "webp")
for key, plugin_data in plugins.items():
if not isinstance(plugin_data, dict):
continue
if key not in installed_dirs:
continue
installed_keys.append(key)
# Backfill thumbnail for plugins installed before this feature existed
plugin_dir = find_plugin_dir(key)
if not plugin_dir:
continue
webui_dir = Path(plugin_dir) / "webui"
has_thumb = any((webui_dir / f"thumbnail.{ext}").is_file() for ext in _thumb_exts)
if has_thumb:
continue
thumb_url = plugin_data.get("thumbnail") or ""
if not thumb_url:
from urllib.parse import urlparse
raw_base = None
github = plugin_data.get("github") or ""
if github:
try:
parsed = urlparse(github)
if parsed.netloc == "github.com":
parts = parsed.path.strip("/").split("/")
if len(parts) >= 2:
raw_base = f"https://raw.githubusercontent.com/{parts[0]}/{parts[1]}"
except Exception:
pass
if raw_base:
thumb_url = f"{raw_base}/main/thumbnail.png"
if thumb_url:
_download_thumbnail(thumb_url, plugin_dir)
return {"index": index_data, "installed_plugins": installed_keys}
def fetch_plugin_index(force: bool = False) -> dict:
"""Download the plugin index from GitHub releases."""
index_url = "https://github.com/agent0ai/a0-plugins/releases/download/generated-index/index.json"
if force:
separator = "&" if "?" in index_url else "?"
index_url = f"{index_url}{separator}ts={time.time_ns()}"
headers = {"User-Agent": "AgentZero"}
if force:
headers["Cache-Control"] = "no-cache"
headers["Pragma"] = "no-cache"
req = urllib.request.Request(index_url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode())
return data
|