File size: 28,949 Bytes
14c6469
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
from __future__ import annotations

import asyncio
import re, json, glob
import time
from pathlib import Path
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    TYPE_CHECKING,
    TypedDict,
)

from regex import W

from helpers import (
    files,
    git,
    notification,
    print_style,
    yaml as yaml_helper,
    cache,
    extension,
    watchdog,
    modules,
    functions,
)
from pydantic import BaseModel, Field

from helpers.defer import DeferredTask
from helpers.watchdog import WatchItem

if TYPE_CHECKING:
    from agent import Agent

# Extracts target selector from <meta name="plugin-target" content="...">.
# Matching is case-insensitive; capture group 1 is the value of `content`.
# The tag only matches when `name` comes before `content` and nothing but
# whitespace separates the two attributes.
_META_TARGET_RE = re.compile(
    r'<meta\s+name=["\']plugin-target["\']\s+content=["\']([^"\']+)["\']',
    re.IGNORECASE,
)


# Toggle state of a plugin as reported by get_toggle_state(). "advanced" is
# returned when project- or agent-specific toggle files were found.
ToggleState = Literal["enabled", "disabled", "advanced"]


class PluginAssetFile(TypedDict):
    """One match produced by `find_plugin_assets`."""

    # Path of the matched file or directory.
    path: str
    # Project the match belongs to; "" when the location is not project-scoped.
    project_name: str
    # Agent profile the match belongs to; "" when the location is not profile-scoped.
    agent_profile: str


# File names looked up inside a plugin directory.
META_FILE_NAME = "plugin.yaml"  # metadata file; its presence marks a directory as a plugin
CONFIG_FILE_NAME = "config.json"  # saved plugin settings
CONFIG_DEFAULT_FILE_NAME = "default_config.yaml"  # used when no config.json is found
# Marker files that record a toggle decision at a given location.
DISABLED_FILE_NAME = ".toggle-0"
ENABLED_FILE_NAME = ".toggle-1"
TOGGLE_FILE_PATTERN = ".toggle-[01]"  # glob matching either marker file

HOOKS_SCRIPT = "hooks.py"  # optional per-plugin module imported by call_plugin_hook()
# Cache area names. They all contain "(plugins)", presumably so that
# clear_plugin_cache() can drop them with its "*(plugins)*" pattern.
HOOKS_CACHE_AREA = "plugin_hooks(plugins)"
PLUGINS_LIST_CACHE_AREA = "plugins_list(plugins)"
ENABLED_PLUGINS_LIST_CACHE_AREA = "enabled_plugins(plugins)"
ENABLED_PLUGINS_PATHS_CACHE_AREA = "enabled_plugins_paths(plugins)"


# time.monotonic() value recorded when the last frontend reload notification
# was sent; send_frontend_reload_notification() uses it for throttling.
_last_frontend_reload_notification_at = 0.0


class PluginMetadata(BaseModel):
    """Metadata validated from a plugin's ``plugin.yaml`` file.

    Every field has a default, so a metadata file may omit any of them.
    """

    name: str = ""
    # Human-readable title; the list view falls back to the directory name when empty.
    title: str = ""
    description: str = ""
    version: str = ""
    settings_sections: List[str] = Field(default_factory=list)
    # When set, get_toggle_state() also looks for project-specific toggle files.
    per_project_config: bool = False
    # When set, get_toggle_state() also looks for agent-profile-specific toggle files.
    per_agent_config: bool = False
    # When set, get_toggle_state() reports "enabled" without consulting toggle files.
    always_enabled: bool = False


class PluginListItem(BaseModel):
    """Detailed description of one plugin, built by `get_enhanced_plugins_list`."""

    # Plugin directory name.
    name: str
    # Plugin directory path after files.normalize_a0_path().
    path: str
    # Metadata title, or the directory name when the title is empty.
    display_name: str = ""
    description: str = ""
    version: str = ""
    # author / repo are only filled for custom plugins that are git repositories.
    author: str = ""
    repo: str = ""
    settings_sections: List[str] = Field(default_factory=list)
    per_project_config: bool = False
    per_agent_config: bool = False
    always_enabled: bool = False
    # True when the plugin was loaded from the user plugins directory.
    is_custom: bool = False
    # The has_* flags record the existence of webui/main.html, webui/config.html,
    # README.md, LICENSE and execute.py inside the plugin directory.
    has_main_screen: bool = False
    has_config_screen: bool = False
    has_readme: bool = False
    has_license: bool = False
    has_execute_script: bool = False
    toggle_state: ToggleState = "disabled"
    # HEAD commit hash and commit time; only filled for custom git plugins.
    current_commit: str = ""
    current_commit_timestamp: str = ""
    # "/plugins/<name>/webui/thumbnail.<ext>" for the first thumbnail found, else "".
    thumbnail_url: str = ""


class PluginUpdateInfo(BaseModel):
    """Update status of one custom plugin, built by `get_custom_plugins_updates`.

    ``name``, ``path`` and ``display_name`` come from the plugin list entry;
    all other fields are copied from the result of
    ``git.get_remote_commits_since_local``.
    """

    name: str
    path: str
    display_name: str = ""
    # presumably the number of remote commits not yet present locally - confirm in helpers.git
    commits_since_local: int = 0
    last_remote_commit_at: str = ""
    branch: str = ""
    remote_branch: str = ""
    is_git_repo: bool = False
    is_remote: bool = False
    # Error text reported by the git helper; "" when none was reported.
    error: str = ""


def register_watchdogs():
    """Install the file watchers that report plugin changes.

    Three watchers are registered: one for the plugin roots, one for plugin
    overrides stored inside projects, and one for plugin overrides stored in
    the default and user agent directories. Each of them watches extension
    files, toggle marker files and the hooks script, and forwards the
    affected plugin names to `after_plugin_change`.
    """
    watched_suffixes = ["**/extensions/**/*", TOGGLE_FILE_PATTERN, HOOKS_SCRIPT]

    def with_base(base_path: str):
        # prefix every watched suffix with the given base pattern
        return [base_path + suffix for suffix in watched_suffixes]

    def on_plugin_change(events: list[WatchItem]):
        changed: list[str] = []
        for raw_path, _event in events:
            # the plugin name is the path segment right after the first "/plugins/"
            _, marker, tail = raw_path.replace("\\", "/").partition("/plugins/")
            if not marker:
                continue
            name = tail.partition("/")[0]
            if name and name not in changed:
                changed.append(name)
        print_style.PrintStyle.debug("Plugins watchdog triggered", changed)
        # python modules only need refreshing when a .py file was touched
        touched_python = any(
            event_path.endswith(".py") for event_path, _event in events
        )
        after_plugin_change(changed or None, python_change=touched_python)

    # watcher for the plugin roots
    watchdog.add_watchdog(
        id="plugins_roots",
        roots=get_plugin_roots(),
        patterns=with_base("*/"),
        handler=on_plugin_change,
    )

    from helpers import projects
    from helpers import subagents

    # watcher for plugin overrides in projects/plugins and projects/agents/plugins
    project_patterns = with_base(f"*/{projects.PROJECT_META_DIR}/plugins/")
    project_patterns += with_base(f"*/{projects.PROJECT_META_DIR}/agents/*/plugins/")
    watchdog.add_watchdog(
        id="plugins_projects",
        roots=[files.get_abs_path(projects.PROJECTS_PARENT_DIR)],
        patterns=project_patterns,
        handler=on_plugin_change,
    )

    # watcher for plugin overrides in /agents/plugins and /usr/agents/plugins
    agent_roots = [
        files.get_abs_path(agents_dir)
        for agents_dir in (subagents.DEFAULT_AGENTS_DIR, subagents.USER_AGENTS_DIR)
    ]
    watchdog.add_watchdog(
        id="plugins_agents",
        roots=agent_roots,
        patterns=with_base("*/plugins/*/"),
        handler=on_plugin_change,
    )


@extension.extensible
def after_plugin_change(
    plugin_names: list[str] | None = None,
    python_change: bool = False,
):
    """React to a change in one or more plugins.

    Clears the plugin caches, purges imported plugin modules when
    ``python_change`` is set, and finally asks the frontend to reload if
    needed.

    Args:
        plugin_names: Names of the affected plugins, or None when unknown.
        python_change: Whether Python files were affected.
    """
    clear_plugin_cache(plugin_names)

    if python_change:
        refresh_plugin_modules(plugin_names)

    send_frontend_reload_notification(plugin_names)


def refresh_plugin_modules(plugin_names: list[str] | None = None):
    """Purge imported plugin modules so they are re-imported on next use.

    Without names, both the ``plugins`` and ``usr.plugins`` namespaces are
    purged. With names, ``plugins`` is purged when any name starts with an
    underscore and ``usr.plugins`` when any name does not.
    """
    if not plugin_names:
        modules.purge_namespace("plugins")
        modules.purge_namespace("usr.plugins")
        return

    # True / False end up in the set when at least one name has / lacks the prefix
    underscore_flags = {name.startswith("_") for name in plugin_names}
    if True in underscore_flags:
        modules.purge_namespace("plugins")
    if False in underscore_flags:
        modules.purge_namespace("usr.plugins")


def clear_plugin_cache(plugin_names: list[str] | None = None):
    """Clear the plugin-related cache areas locally and announce it over "/ws".

    ``plugin_names`` is accepted but not used: the same areas are cleared
    whichever plugins changed.
    """
    cache_areas = ["*(plugins)*", "*(extensions)*", "*(api)*"]
    for pattern in cache_areas:
        cache.clear(pattern)

    from helpers.ws_manager import send_data

    # send the "clear_cache" message from a deferred task
    payload = {"areas": cache_areas}
    DeferredTask().start_task(send_data, "clear_cache", payload, endpoint_name="/ws")


def get_plugin_roots(plugin_name: str = "") -> List[str]:
    """Return the user and built-in plugin locations, user location first.

    With a ``plugin_name`` the paths point at that plugin's directory inside
    each root; without one they are the roots themselves.
    """
    user_root = files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR, plugin_name)
    builtin_root = files.get_abs_path(files.PLUGINS_DIR, plugin_name)
    return [user_root, builtin_root]


def get_plugins_list():
    """Return the sorted names of all installed plugins.

    A plugin is a non-hidden directory directly below one of the plugin roots
    that contains the metadata file. When the same name exists in several
    roots it is listed once. The result is cached; an empty result is
    recomputed on each call because the cache check is a truthiness check.

    Returns:
        Plugin directory names in ascending order.
    """
    if cached := cache.get(PLUGINS_LIST_CACHE_AREA, ""):
        return cached

    result: list[str] = []
    seen_names: set[str] = set()
    for root in get_plugin_roots():
        root_dir = Path(root)
        # A root that does not exist (for example no user plugins directory
        # yet) contributes nothing; iterdir() would raise on it.
        if not root_dir.is_dir():
            continue
        for entry in root_dir.iterdir():
            if not entry.is_dir() or entry.name.startswith("."):
                continue
            if entry.name in seen_names:
                continue
            if files.exists(str(entry), META_FILE_NAME):
                seen_names.add(entry.name)
                result.append(entry.name)
    # entries are bare directory names, so a plain sort orders them by name
    result.sort()

    cache.add(PLUGINS_LIST_CACHE_AREA, "", result)
    return result


def get_enhanced_plugins_list(
    custom: bool = True, builtin: bool = True, plugin_names: list[str] | None = None
) -> List[PluginListItem]:
    """Discover plugins by directory convention and describe each one.

    Custom (user) plugins are listed before built-in ones, each group sorted
    by directory name. No de-duplication happens here: a name present in both
    roots yields two entries. A plugin that fails to load is reported through
    ``PrintStyle.error`` and skipped.

    Args:
        custom: Include plugins from the user plugins directory.
        builtin: Include plugins from the built-in plugins directory.
        plugin_names: When non-empty, only plugins with these names are listed.

    Returns:
        One `PluginListItem` per discovered plugin directory.
    """
    results: List[PluginListItem] = []
    allowed_names = set(plugin_names) if plugin_names else None
    thumbnail_extensions = ("png", "jpg", "jpeg", "gif", "webp")

    def load_plugins(root_path: str, is_custom: bool):
        root = Path(root_path)
        # A root that does not exist (for example no user plugins directory
        # yet) contributes nothing; iterdir() would raise on it.
        if not root.is_dir():
            return
        for d in sorted(root.iterdir(), key=lambda p: p.name):
            try:
                if not d.is_dir() or d.name.startswith("."):
                    continue
                if allowed_names is not None and d.name not in allowed_names:
                    continue
                meta_file = str(d / META_FILE_NAME)
                if not files.exists(meta_file):
                    continue
                meta = PluginMetadata.model_validate(files.read_file_yaml(meta_file))
                has_main_screen = files.exists(str(d / "webui" / "main.html"))
                has_config_screen = files.exists(str(d / "webui" / "config.html"))
                has_readme = files.exists(str(d / "README.md"))
                has_license = files.exists(str(d / "LICENSE"))
                has_execute_script = files.exists(str(d / "execute.py"))
                toggle_state = get_toggle_state(d.name)

                # the first thumbnail found wins, in extension order
                thumbnail_url = ""
                for ext in thumbnail_extensions:
                    if (d / "webui" / f"thumbnail.{ext}").is_file():
                        thumbnail_url = f"/plugins/{d.name}/webui/thumbnail.{ext}"
                        break

                # git details are only collected for custom plugins
                current_commit = ""
                current_commit_timestamp = ""
                author = ""
                repo_name = ""
                if is_custom:
                    repo_info = git.get_repo_release_info(str(d))
                    if repo_info.is_git_repo:
                        author = repo_info.author
                        repo_name = repo_info.repo
                        if repo_info.head:
                            current_commit = repo_info.head.hash
                            current_commit_timestamp = repo_info.head.committed_at

                results.append(
                    PluginListItem(
                        name=d.name,
                        path=files.normalize_a0_path(str(d)),
                        display_name=meta.title or d.name,
                        description=meta.description,
                        version=meta.version,
                        author=author,
                        repo=repo_name,
                        settings_sections=meta.settings_sections,
                        per_project_config=meta.per_project_config,
                        per_agent_config=meta.per_agent_config,
                        always_enabled=meta.always_enabled,
                        is_custom=is_custom,
                        has_main_screen=has_main_screen,
                        has_config_screen=has_config_screen,
                        has_readme=has_readme,
                        has_license=has_license,
                        has_execute_script=has_execute_script,
                        toggle_state=toggle_state,
                        current_commit=current_commit,
                        current_commit_timestamp=current_commit_timestamp,
                        thumbnail_url=thumbnail_url,
                    )
                )
            except Exception as e:
                # best effort: one broken plugin must not hide the others
                print_style.PrintStyle.error(f"Failed to load plugin {d.name}: {e}")
                continue

    if custom:
        load_plugins(files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR), True)
    if builtin:
        load_plugins(files.get_abs_path(files.PLUGINS_DIR), False)
    return results


def get_custom_plugins_updates(
    plugin_names: list[str] | None = None,
) -> List[PluginUpdateInfo]:
    """Report the remote update status of custom plugins.

    Args:
        plugin_names: When non-empty, restrict the check to these plugins.

    Returns:
        One `PluginUpdateInfo` per custom plugin, in listing order.
    """

    def _describe(plugin: PluginListItem) -> PluginUpdateInfo:
        # ask the git helper how the local checkout compares to its remote
        remote = git.get_remote_commits_since_local(plugin.path)
        return PluginUpdateInfo(
            name=plugin.name,
            path=plugin.path,
            display_name=plugin.display_name,
            commits_since_local=remote.commits_since_local,
            last_remote_commit_at=remote.last_remote_commit_at,
            branch=remote.branch,
            remote_branch=remote.remote_branch,
            is_git_repo=remote.is_git_repo,
            is_remote=remote.is_remote,
            error=remote.error,
        )

    custom_plugins = get_enhanced_plugins_list(
        custom=True, builtin=False, plugin_names=plugin_names
    )
    return [_describe(plugin) for plugin in custom_plugins]


def get_plugin_meta(plugin_name: str):
    """Load and validate a plugin's metadata file.

    Returns:
        The validated `PluginMetadata`, or None when the plugin is not found.
    """
    plugin_dir = find_plugin_dir(plugin_name)
    if plugin_dir:
        meta_path = files.get_abs_path(plugin_dir, META_FILE_NAME)
        return PluginMetadata.model_validate(files.read_file_yaml(meta_path))
    return None


def find_plugin_dir(plugin_name: str):
    """Return the directory of the named plugin, or None when it is not found.

    The user plugins directory is checked before the built-in one. A
    directory only counts as a plugin when it contains the metadata file.
    """
    if not plugin_name:
        return None

    # candidate locations in priority order: user directory, then default directory
    candidate_roots = (
        (files.USER_DIR, files.PLUGINS_DIR),
        (files.PLUGINS_DIR,),
    )
    for root_parts in candidate_roots:
        meta_path = files.get_abs_path(*root_parts, plugin_name, META_FILE_NAME)
        if files.exists(meta_path):
            return files.get_abs_path(*root_parts, plugin_name)

    return None


@extension.extensible
def uninstall_plugin(plugin_name):
    """Uninstall a plugin: run its ``uninstall`` hook, then delete it.

    The hook runs first, while the plugin's files still exist.
    """
    call_plugin_hook(plugin_name, "uninstall")
    delete_plugin(plugin_name)


@extension.extensible
def delete_plugin(plugin_name: str):
    """Delete a custom plugin together with its asset folders elsewhere.

    Asset folders found outside the plugin directory are removed first, then
    the plugin directory itself, and finally `after_plugin_change` runs.

    Args:
        plugin_name: Directory name of the plugin to delete.

    Raises:
        FileNotFoundError: The plugin directory could not be found.
        ValueError: The plugin is not inside the user plugins directory.
    """
    import os

    plugin_dir = find_plugin_dir(plugin_name)
    if not plugin_dir:
        raise FileNotFoundError(f"Plugin '{plugin_name}' not found")
    custom_plugins_dir = files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR)
    if not files.is_in_dir(plugin_dir, custom_plugins_dir):
        raise ValueError("Only custom plugins can be deleted")

    normalized_plugin_dir = os.path.normpath(plugin_dir)

    def _inside_plugin_dir(path: str) -> bool:
        # Compare on a path-component boundary. A plain startswith() would
        # also match a sibling such as ".../plugins/foobar" when deleting
        # ".../plugins/foo" and leave that asset folder behind.
        candidate = os.path.normpath(path)
        return candidate == normalized_plugin_dir or candidate.startswith(
            normalized_plugin_dir + os.sep
        )

    # delete additional plugin folders (those living outside the main folder)
    assets = [
        asset
        for asset in find_plugin_assets("", plugin_name=plugin_name)
        if not _inside_plugin_dir(asset["path"])
    ]
    for asset in assets:
        files.delete_dir(asset["path"])

    send_frontend_reload_notification(
        [plugin_name]
    )  # send before deletion to properly check the extensions, second notification will be skipped automatically

    # does it have python files? must be checked while the files still exist
    python_change = bool(files.find_existing_paths_by_pattern(plugin_dir + "/**/*.py"))

    # delete main plugin folder
    files.delete_dir(plugin_dir)

    after_plugin_change([plugin_name], python_change=python_change)


def get_plugin_paths(*subpaths: str) -> List[str]:
    """Return existing paths matching ``<root>/*/<subpaths...>`` in every plugin root.

    Without subpaths the pattern is ``<root>/*``. Toggle state is not
    consulted here.
    """
    # "*" stands for any plugin directory; subpaths are appended below it
    pattern = "/".join(("*", *subpaths))
    return [
        match
        for root in get_plugin_roots()
        for match in files.find_existing_paths_by_pattern(
            files.get_abs_path(root, pattern)
        )
    ]


def get_enabled_plugin_paths(agent: Agent | None, *subpaths: str) -> List[str]:
    """Return paths inside the plugins that are enabled for ``agent``.

    Without subpaths, the existing plugin directories themselves are
    returned. With subpaths, they are joined below each plugin directory and
    treated as a pattern; every existing match is returned. Results are
    cached per agent and subpaths.
    """
    cache_key = cache.determine_cache_key(agent, *subpaths)
    if cached := cache.get(ENABLED_PLUGINS_PATHS_CACHE_AREA, cache_key):
        return cached

    paths: list[str] = []
    for plugin in get_enabled_plugins(agent):
        base_dir = find_plugin_dir(plugin)
        if not base_dir:
            continue

        if subpaths:
            pattern = files.get_abs_path(base_dir, *subpaths)
            paths.extend(files.find_existing_paths_by_pattern(pattern))
        elif files.exists(base_dir):
            paths.append(base_dir)

    cache.add(ENABLED_PLUGINS_PATHS_CACHE_AREA, cache_key, paths)
    return paths


def get_enabled_plugins(agent: Agent | None):
    """Return the names of the plugins enabled for ``agent``.

    Plugins are toggled via .toggle-1 / .toggle-0 marker files. Every plugin
    starts out enabled; the markers in the plugin roots and, when an agent is
    given, in the agent's own plugin paths can then switch it off and on
    again. The result is cached per agent.
    """
    cache_key = cache.determine_cache_key(agent)
    if cached := cache.get(ENABLED_PLUGINS_LIST_CACHE_AREA, cache_key):
        return cached

    active = []
    for plugin in get_plugins_list():
        # root plugin paths
        search_paths = get_plugin_roots(plugin)

        # agent paths are placed in front of the root paths
        if agent:
            from helpers import subagents

            search_paths = (
                subagents.get_paths(
                    agent,
                    files.PLUGINS_DIR,
                    plugin,
                    must_exist_completely=True,
                    include_default=False,
                    include_user=False,
                    include_plugins=False,
                    include_project=True,
                )
                + search_paths
            )

        # walk the paths in reverse order so entries at the front of the list are applied last
        if determined_toggle_from_paths(True, reversed(search_paths)):
            active.append(plugin)

    cache.add(ENABLED_PLUGINS_LIST_CACHE_AREA, cache_key, active)
    return active


def determined_toggle_from_paths(default: bool, paths: Iterator[str]):
    """Fold toggle marker files along ``paths`` into a final enabled state.

    Starting from ``default``, each path may flip the state: while enabled,
    a disable marker switches it off; while disabled, an enable marker
    switches it back on. Later paths therefore override earlier ones.
    """
    state = default
    for location in paths:
        # only the marker that could change the current state matters
        marker_name = DISABLED_FILE_NAME if state else ENABLED_FILE_NAME
        marker_found = files.exists(files.get_abs_path(location, marker_name))
        state = (not marker_found) if state else marker_found
    return state


def get_toggle_state(plugin_name: str) -> ToggleState:
    """Return the toggle state of a plugin for display purposes.

    Returns:
        "disabled" when the plugin has no metadata or is switched off in the
        plugin roots, "enabled" when it is always enabled or switched on, and
        "advanced" when project- or agent-specific toggle files exist.
    """
    meta = get_plugin_meta(plugin_name)
    if not meta:
        return "disabled"
    if meta.always_enabled:
        return "enabled"

    # state according to the root plugin paths only
    root_enabled = determined_toggle_from_paths(
        True, reversed(get_plugin_roots(plugin_name))
    )
    state: ToggleState = "enabled" if root_enabled else "disabled"

    # overrides are only looked up for plugins that opt into them
    if not (meta.per_agent_config or meta.per_project_config):
        return state

    overrides = find_plugin_assets(
        TOGGLE_FILE_PATTERN,
        plugin_name=plugin_name,
        project_name="*" if meta.per_project_config else "",
        agent_profile="*" if meta.per_agent_config else "",
        only_first=False,
    )

    # any project- or agent-specific toggle makes the state "advanced"
    for override in overrides:
        if override.get("project_name") or override.get("agent_profile"):
            return "advanced"

    return state


@extension.extensible
def toggle_plugin(
    plugin_name: str,
    enabled: bool,
    project_name: str = "",
    agent_profile: str = "",
    clear_overrides: bool = False,
):
    """Enable or disable a plugin globally, per project and/or per agent profile.

    Args:
        plugin_name: Plugin to toggle.
        enabled: Desired state.
        project_name: Project scope, or "" for none.
        agent_profile: Agent profile scope, or "" for none.
        clear_overrides: Delete every existing toggle marker of the plugin
            (all projects, all profiles) before writing the new one.
    """
    if clear_overrides:
        existing_markers = find_plugin_assets(
            TOGGLE_FILE_PATTERN,
            plugin_name=plugin_name,
            project_name="*",
            agent_profile="*",
            only_first=False,
        )
        for marker in existing_markers:
            files.delete_file(marker["path"])

    # marker paths for the requested scope, keyed by the state they stand for
    marker_paths = {
        state: determine_plugin_asset_path(
            plugin_name, project_name, agent_profile, file_name
        )
        for state, file_name in (
            (True, ENABLED_FILE_NAME),
            (False, DISABLED_FILE_NAME),
        )
    }

    # ensure clean state by deleting both potential files first
    for stale_marker in marker_paths.values():
        files.delete_file(stale_marker)

    files.write_file(marker_paths[bool(enabled)], "")
    after_plugin_change([plugin_name])


@extension.extensible
def get_plugin_config(
    plugin_name: str,
    agent: Agent | None = None,
    project_name: str | None = None,
    agent_profile: str | None = None,
):
    """Load the effective configuration of a plugin.

    The saved ``config.json`` is looked up in every location known to
    `find_plugin_asset`. When none exists, the plugin's default config file
    is used and `_apply_defaults_from_env` is applied to it. The plugin's
    ``get_plugin_config`` hook, if present, receives the loaded value as
    ``default`` and may replace it.

    Args:
        plugin_name: Plugin whose configuration is requested.
        agent: Optional agent; supplies project and profile when they are None.
        project_name: Project scope; None means "derive from ``agent``".
        agent_profile: Profile scope; None means "derive from ``agent``".

    Returns:
        The parsed configuration, or None when no configuration was found
        (including when the plugin itself cannot be found).
    """
    default_used = False

    if project_name is None and agent is not None:
        from helpers import projects

        project_name = projects.get_context_project_name(agent.context)
    if agent_profile is None and agent is not None:
        agent_profile = agent.config.profile

    # find config.json in all possible places
    file = find_plugin_asset(
        plugin_name,
        CONFIG_FILE_NAME,
        project_name=project_name or "",
        agent_profile=agent_profile or "",
    )
    file_path = file.get("path", "") if file else ""

    # use default config if not found
    if not file_path:
        plugin_dir = find_plugin_dir(plugin_name)
        # find_plugin_dir() returns None for an unknown plugin; do not build a
        # path from None, just leave file_path empty
        if plugin_dir:
            file_path = files.get_abs_path(plugin_dir, CONFIG_DEFAULT_FILE_NAME)
        default_used = True

    result = None
    if file_path and files.exists(file_path):
        loader = (
            json.loads if file_path.lower().endswith(".json") else yaml_helper.loads
        )
        result = loader(files.read_file(file_path))

        # the defaults helper walks a mapping, so skip it for anything else
        # (for example an empty default config file)
        if default_used and isinstance(result, dict):
            _apply_defaults_from_env(plugin_name, result)

    # call plugin hook to modify the standard result if needed
    result = call_plugin_hook(
        plugin_name,
        "get_plugin_config",
        default=result,
        agent=agent,
        project_name=project_name,
        agent_profile=agent_profile,
    )

    return result


def get_default_plugin_config(plugin_name: str):
    """Return the default configuration of a plugin.

    The plugin's ``get_default_plugin_config`` hook is asked first and
    receives the path of the default config file. When it yields None, the
    file itself is parsed (JSON for ``.json`` paths, YAML otherwise).

    Returns:
        The default configuration, or None when the plugin is not found or
        provides neither a hook result nor a default config file.
    """
    plugin_dir = find_plugin_dir(plugin_name)
    # find_plugin_dir() returns None for an unknown plugin; there is no
    # default config to load and no path can be built from None
    if not plugin_dir:
        return None

    file_path = files.get_abs_path(plugin_dir, CONFIG_DEFAULT_FILE_NAME)

    # call plugin hook to get the result
    result = call_plugin_hook(
        plugin_name, "get_default_plugin_config", file_path=file_path
    )

    # or do standard load
    if result is None and file_path and files.exists(file_path):
        loader = (
            json.loads if file_path.lower().endswith(".json") else yaml_helper.loads
        )
        result = loader(files.read_file(file_path))

    return result


@extension.extensible
def save_plugin_config(
    plugin_name: str, project_name: str, agent_profile: str, settings: dict
):
    """Persist plugin settings as JSON for the given project/profile scope.

    The plugin's ``save_plugin_config`` hook, if present, receives the
    settings and may return a replacement. Nothing is written when the value
    to save ends up being None.
    """
    target_path = determine_plugin_asset_path(
        plugin_name, project_name, agent_profile, CONFIG_FILE_NAME
    )

    # let the plugin hook adjust the settings first
    hook_arguments = {
        "project_name": project_name,
        "agent_profile": agent_profile,
        "settings": settings,
    }
    to_save = call_plugin_hook(
        plugin_name, "save_plugin_config", default=settings, **hook_arguments
    )

    if to_save is None or not target_path:
        return

    # standard save; a config-only change does not trigger after_plugin_change
    files.write_file(target_path, json.dumps(to_save))


def find_plugin_asset(
    plugin_name: str, *subpaths: str, project_name="", agent_profile=""
):
    """Return the highest-priority asset match for a plugin, or None.

    Thin wrapper around `find_plugin_assets` that stops at the first match.
    """
    matches = find_plugin_assets(
        *subpaths,
        plugin_name=plugin_name,
        project_name=project_name,
        agent_profile=agent_profile,
        only_first=True,
    )
    if not matches:
        return None
    return matches[0]


def find_plugin_assets(
    *subpaths: str,
    plugin_name: str = "*",
    project_name: str = "*",
    agent_profile: str = "*",
    only_first: bool = False,
) -> list[PluginAssetFile]:
    """Locate plugin assets in every place a plugin may keep files.

    Locations are probed in this order:

    1. project + agent profile
    2. project (only when ``agent_profile`` is empty or ``"*"``)
    3. user agent profile
    4. agent profiles shipped inside enabled plugins
    5. default agent profile
    6. user plugins directory
    7. built-in plugins directory

    Args:
        *subpaths: Path segments below the plugin directory.
        plugin_name: Plugin directory name; defaults to ``"*"``.
        project_name: Project name, ``"*"`` to infer it from each match, or
            ``""`` to skip the project locations.
        agent_profile: Agent profile name, ``"*"`` to infer it from each
            match, or ``""`` to skip the agent locations.
        only_first: Stop at the first match.

    Returns:
        The matches in probing order. With ``only_first`` the list holds at
        most one item.
    """
    from helpers import projects, subagents

    results: list[PluginAssetFile] = []

    def _collect(path: str, proj: str, profile: str) -> bool:
        # Appends every match for `path` to `results`; returns True when the
        # search should stop (a match was found and only_first is set).
        # Paths with glob magic are expanded; literal paths are checked directly.
        is_glob = glob.has_magic(path)
        matched_paths = (
            files.find_existing_paths_by_pattern(path)
            if is_glob
            else ([path] if files.exists(path) else [])
        )

        # with a "*" wildcard the concrete name is recovered from each matched path
        need_proj = proj == "*"
        need_prof = profile == "*"

        def _after(s: str, marker: str, last: bool = False) -> str:
            # Path segment that follows `marker` (its first occurrence, or the
            # last one when `last` is set); "" when the marker is absent.
            i = s.rfind(marker) if last else s.find(marker)
            if i == -1:
                return ""
            start = i + len(marker)
            end = s.find("/", start)
            return s[start:] if end == -1 else s[start:end]

        for matched in matched_paths:
            # NOTE(review): the "/projects/" and "/agents/" markers are
            # hard-coded and assume forward slashes - confirm they agree with
            # the directory names used by helpers.projects / helpers.subagents.
            inferred_proj = _after(matched, "/projects/") if need_proj else proj
            inferred_prof = (
                _after(matched, "/agents/", last=True) if need_prof else profile
            )
            results.append(
                {
                    "project_name": inferred_proj,
                    "agent_profile": inferred_prof,
                    "path": matched,
                }
            )
            if only_first:
                return True
        return False

    # project/.a0proj/agents/<profile>/plugins/<plugin_name>/...
    if project_name:
        if agent_profile:
            path = projects.get_project_meta(
                project_name,
                files.AGENTS_DIR,
                agent_profile,
                files.PLUGINS_DIR,
                plugin_name,
                *subpaths,
            )
            if _collect(path, project_name, agent_profile):
                return results
        # NOTE(review): with a concrete agent profile the project-level
        # location below is not probed - confirm this is intended.
        if not agent_profile or agent_profile == "*":
            # project/.a0proj/plugins/<plugin_name>/...
            path = projects.get_project_meta(
                project_name, files.PLUGINS_DIR, plugin_name, *subpaths
            )
            if _collect(path, project_name, ""):
                return results

    # usr/agents/<profile>/plugins/<plugin_name>/...
    if agent_profile:
        path = files.get_abs_path(
            subagents.USER_AGENTS_DIR,
            agent_profile,
            files.PLUGINS_DIR,
            plugin_name,
            *subpaths,
        )
        if _collect(path, "", agent_profile):
            return results

        # usr?/plugins/<any_plugin>/agents/<profile>/plugins/<plugin_name>/...
        # (only plugins that are enabled without an agent context are considered)
        for plugin_base in get_enabled_plugin_paths(None):
            path = files.get_abs_path(
                plugin_base,
                files.AGENTS_DIR,
                agent_profile,
                files.PLUGINS_DIR,
                plugin_name,
                *subpaths,
            )
            if _collect(path, "", agent_profile):
                return results

        # agents/<profile>/plugins/<plugin_name>/...
        path = files.get_abs_path(
            subagents.DEFAULT_AGENTS_DIR,
            agent_profile,
            files.PLUGINS_DIR,
            plugin_name,
            *subpaths,
        )
        if _collect(path, "", agent_profile):
            return results

    # usr/plugins/<plugin_name>/...
    path = files.get_abs_path(files.USER_DIR, files.PLUGINS_DIR, plugin_name, *subpaths)
    if _collect(path, "", ""):
        return results

    # plugins/<plugin_name>/...
    # (last location: the return value of _collect no longer matters)
    path = files.get_abs_path(files.PLUGINS_DIR, plugin_name, *subpaths)
    _collect(path, "", "")

    return results


def determine_plugin_asset_path(
    plugin_name: str, project_name: str, agent_profile: str, *subpaths: str
):
    """Build the path where a plugin asset for the given scope is stored.

    The base is the project's meta directory when ``project_name`` is given
    and the user directory otherwise. An ``agents/<profile>`` level is added
    when ``agent_profile`` is given, followed by
    ``plugins/<plugin_name>/<subpaths...>``.
    """
    if project_name:
        from helpers import projects

        scope_root = projects.get_project_meta(project_name)
    else:
        scope_root = files.get_abs_path(files.USER_DIR)

    if agent_profile:
        scope_root = files.get_abs_path(scope_root, files.AGENTS_DIR, agent_profile)

    return files.get_abs_path(scope_root, files.PLUGINS_DIR, plugin_name, *subpaths)


def send_frontend_reload_notification(plugin_names: list[str] | None = None):
    """Recommend a page reload when changed plugins have webui extensions.

    Nothing is sent when a notification was sent less than ``display_time``
    seconds ago, or when plugin names are given and none of those plugins
    has an ``extensions/webui`` directory. Without names the notification is
    always scheduled. It is sent from a deferred task after a one-second
    delay.
    """
    global _last_frontend_reload_notification_at

    display_time = 5
    elapsed = time.monotonic() - _last_frontend_reload_notification_at
    if elapsed < display_time:
        return

    def _has_webui_extension(name: str):
        plugin_dir = find_plugin_dir(name)
        if not plugin_dir:
            return False
        return files.exists(files.get_abs_path(plugin_dir, "extensions", "webui"))

    if plugin_names and not any(_has_webui_extension(name) for name in plugin_names):
        return

    async def _send_later():
        global _last_frontend_reload_notification_at

        await asyncio.sleep(1)

        # the throttle timestamp is taken at send time, not at scheduling time
        _last_frontend_reload_notification_at = time.monotonic()

        manager = notification.NotificationManager
        manager.send_notification(
            type=notification.NotificationType.INFO,
            priority=notification.NotificationPriority.NORMAL,
            title="Plugins with frontend extensions updated, page reload recommended",
            message="""<button type="button" class="button confirm" onclick="window.location.reload()"><span class="icon material-symbols-outlined">refresh</span>Reload page</button>""",
            detail="",
            display_time=display_time,
            group="plugins_changed",
            id="plugins_frontend_reload",
        )

    DeferredTask().start_task(_send_later)


def call_plugin_hook(
    plugin_name: str, hook_name: str, default: Any = None, *args, **kwargs
):
    """Call a function from a plugin's hooks script, if it exists.

    The hooks module is imported once per plugin and cached, including the
    "no hooks script" outcome. The hook is invoked through
    ``functions.safe_call`` with ``default`` passed along.

    Args:
        plugin_name: Plugin whose hooks script is used.
        hook_name: Name of the function to call inside the hooks module.
        default: Returned when the plugin, its hooks script or the hook
            function is missing.
        *args, **kwargs: Forwarded to the hook.

    Returns:
        The result of the safe call, or ``default`` when there is no hook.
    """
    if cache.has(HOOKS_CACHE_AREA, plugin_name):
        hooks = cache.get(HOOKS_CACHE_AREA, plugin_name)
    else:
        plugin_dir = find_plugin_dir(plugin_name)
        if not plugin_dir:
            return default  # plugin directory not found, skip hooks
        hooks_script = files.get_abs_path(plugin_dir, HOOKS_SCRIPT)
        if files.exists(hooks_script):
            hooks = modules.import_module(hooks_script)
        else:
            hooks = None
        cache.add(HOOKS_CACHE_AREA, plugin_name, hooks)

    hook = getattr(hooks, hook_name, None) if hooks else None
    if not hook:
        return default

    if not asyncio.iscoroutinefunction(hook):
        return functions.safe_call(hook, *args, default=default, **kwargs)

    # NOTE(review): asyncio.run() raises RuntimeError when called from a thread
    # that already runs an event loop - confirm async hooks are never reached
    # from such a context.
    return asyncio.run(functions.safe_call(hook, *args, default=default, **kwargs))


def _apply_defaults_from_env(plugin_name: str, config: dict[str, Any]):
    """Replace config values in place with those from ``get_default_value``.

    Each value is looked up under a name made of the plugin name and the
    chain of keys leading to it, joined with double underscores, e.g.
    ``<plugin>__<section>__<key>``. Nested dicts are processed recursively.
    """
    from helpers.settings import get_default_value

    def _walk(node: dict[str, Any], trail: tuple[str, ...]):
        # only existing keys are reassigned, so the dict never changes size
        # while it is being iterated
        for key in node:
            lookup_name = "__".join((plugin_name, *trail, key))
            resolved = get_default_value(lookup_name, node[key])
            node[key] = resolved
            if isinstance(resolved, dict):
                _walk(resolved, (*trail, key))

    _walk(config, ())