| |
| """ |
| agent_add.py -- Add new agents with intake gate + wiki ingestion. |
| |
| Mirror of ``skill_add`` for the agent subject type. Agents live as flat |
| ``.md`` files under ``~/.claude/agents/`` (unlike skills, which each get |
| their own directory). The flow is therefore shorter: |
| |
| validate name -> read content -> intake gate -> copy file -> |
| record embedding -> write wiki entity page -> log |
| |
| Usage |
| ----- |
| # Single agent |
| python agent_add.py --agent-path /path/to/agent.md --name my-agent \ |
| --wiki ~/.claude/skill-wiki --agents-dir ~/.claude/agents |
| |
| # Batch from directory (every *.md at depth 1 is treated as an agent) |
| python agent_add.py --scan-dir /path/to/new-agents/ \ |
| --wiki ~/.claude/skill-wiki --agents-dir ~/.claude/agents |
| """ |
|
|
| import argparse |
| import os |
| import sys |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
| from ctx.core.entity_update import build_update_review, render_update_review |
| from ctx_config import cfg |
| from ctx.adapters.claude_code.install.install_utils import safe_copy_file |
| from intake_pipeline import IntakeRejected, check_intake, record_embedding |
| from wiki_batch_entities import generate_agent_page |
| from ctx.core.wiki.wiki_queue import enqueue_entity_upsert |
| from ctx.core.wiki.wiki_packs import ( |
| load_merged_wiki_pages, |
| write_active_wiki_overlay_pack, |
| ) |
| from ctx.core.wiki.wiki_sync import append_log, ensure_wiki, update_index |
| from ctx.core.wiki.wiki_utils import validate_skill_name |
| from ctx.utils._fs_utils import reject_symlink_path, safe_atomic_write_text |
|
|
# UTC calendar date (YYYY-MM-DD), captured once when the module is imported.
TODAY = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")

# Largest agent file, in bytes, that add_agent will ingest (1 MiB).
_MAX_AGENT_BYTES = 1024 * 1024
|
|
|
|
def install_agent(source: Path, agents_dir: Path, name: str) -> Path:
    """Copy ``source`` to ``agents_dir/<name>.md`` and return that path.

    Agents are flat, single-file entries, so the destination sits directly
    inside ``agents_dir`` with no per-agent subdirectory. The copy itself is
    delegated to ``safe_copy_file`` with ``agents_dir`` as the destination
    root.
    """
    target = agents_dir / f"{name}.md"
    safe_copy_file(source, target, dest_root=agents_dir)
    return target
|
|
|
|
def mirror_agent_body(installed_path: Path, wiki_path: Path, name: str) -> Path:
    """Copy the installed agent file to ``<wiki>/converted-agents/<name>.md``.

    Returns the path of the mirrored copy. The copy is delegated to
    ``safe_copy_file`` with the ``converted-agents`` directory as the
    destination root.
    """
    converted_root = wiki_path / "converted-agents"
    mirrored = converted_root / f"{name}.md"
    safe_copy_file(installed_path, mirrored, dest_root=converted_root)
    return mirrored
|
|
|
|
def write_entity_page(wiki_path: Path, name: str, content: str) -> bool:
    """Persist the agent entity page and report whether it was new.

    "New" means no page text was found for ``name`` before writing. The
    on-disk file ``entities/agents/<name>.md`` is written when it already
    exists or when the wiki has no ``wiki-packs`` directory. When a
    ``wiki-packs`` directory exists, the page is also written into the
    active overlay pack.

    Returns:
        True if no page existed for ``name`` before this call.
    """
    rel = f"entities/agents/{name}.md"
    disk_page = wiki_path / rel
    pack_root = wiki_path / "wiki-packs"

    # Must be decided before any write below changes what can be found.
    created = _read_entity_page_text(wiki_path, name) is None

    write_to_disk = disk_page.exists() or not pack_root.is_dir()
    if write_to_disk:
        reject_symlink_path(disk_page)
        safe_atomic_write_text(disk_page, content, encoding="utf-8")

    if pack_root.is_dir():
        write_active_wiki_overlay_pack(
            packs_dir=pack_root,
            pages={rel: content},
            tombstones=[],
        )
    return created
|
|
|
|
| def _read_entity_page_text(wiki_path: Path, name: str) -> str | None: |
| relpath = f"entities/agents/{name}.md" |
| packs_dir = wiki_path / "wiki-packs" |
| if packs_dir.is_dir(): |
| pages = load_merged_wiki_pages(packs_dir) |
| if relpath in pages: |
| return pages[relpath] |
| entity_page = wiki_path / relpath |
| if entity_page.exists(): |
| return entity_page.read_text(encoding="utf-8", errors="replace") |
| return None |
|
|
|
|
def _existing_agent_review_text(wiki_path: Path, name: str, installed_path: Path) -> str:
    """Build the "existing" side of an update review for agent ``name``.

    Uses the current entity page text when one is found, with the installed
    agent file appended under an ``## Installed agent definition`` heading
    if that file exists. With no entity page, the installed file's text is
    returned on its own.
    """
    page_text = _read_entity_page_text(wiki_path, name)
    if page_text is None:
        return installed_path.read_text(encoding="utf-8", errors="replace")
    if not installed_path.exists():
        return page_text
    installed_body = installed_path.read_text(encoding="utf-8", errors="replace")
    return page_text + f"\n\n## Installed agent definition\n\n{installed_body}"
|
|
|
|
def _proposed_agent_review_text(
    *,
    name: str,
    source_path: Path,
    source_content: str,
) -> str:
    """Build the "proposed" side of an update review for agent ``name``.

    The result is the entity page generated from ``source_path`` followed by
    the raw source text under a ``## Proposed agent definition`` heading.
    """
    generated_page = generate_agent_page(name, source_path)
    heading = "## Proposed agent definition"
    return f"{generated_page}\n\n{heading}\n\n{source_content}"
|
|
|
|
def add_agent(
    *,
    source_path: Path,
    name: str,
    wiki_path: Path,
    agents_dir: Path,
    review_existing: bool = False,
    update_existing: bool = False,
) -> dict:
    """Validate, gate, install, and ingest a single agent definition.

    Steps, in order: run ``validate_skill_name`` on ``name`` and
    ``reject_symlink_path`` on the source, enforce the ``_MAX_AGENT_BYTES``
    size cap, and read the source text. Then either return an update review
    (see below) or install the file, mirror it into the wiki, record its
    embedding, write the entity page, update the index for new pages, append
    a log entry, and enqueue an entity upsert job.

    An agent counts as existing when its installed file or its wiki entity
    page is present. If it exists, ``review_existing`` is true, and
    ``update_existing`` is false, nothing is written and the rendered review
    is returned instead. The intake gate runs only for agents that do not
    exist yet.

    Returns:
        A dict that always has ``name``, ``installed``, ``is_new_page``,
        ``skipped`` and ``update_required``; plus ``update_review`` when the
        call stopped for review, or ``queued_job_id`` after an install.

    Raises:
        ValueError: If the source file is larger than ``_MAX_AGENT_BYTES``.
        IntakeRejected: If the intake gate does not allow a new agent.
    """
    validate_skill_name(name)
    reject_symlink_path(source_path)

    byte_count = source_path.stat().st_size
    if byte_count > _MAX_AGENT_BYTES:
        raise ValueError(
            f"agent file too large ({byte_count:,} bytes). Max "
            f"{_MAX_AGENT_BYTES:,}. Trim before ingestion."
        )

    # utf-8-sig drops a leading BOM; undecodable bytes are replaced, not fatal.
    content = source_path.read_text(encoding="utf-8-sig", errors="replace")
    line_count = len(content.splitlines())

    installed_path = agents_dir / f"{name}.md"
    already_present = installed_path.exists()
    if not already_present:
        already_present = _read_entity_page_text(wiki_path, name) is not None

    if already_present and review_existing and not update_existing:
        existing_text = _existing_agent_review_text(wiki_path, name, installed_path)
        proposed_text = _proposed_agent_review_text(
            name=name,
            source_path=source_path,
            source_content=content,
        )
        review = build_update_review(
            entity_type="agent",
            slug=name,
            existing_text=existing_text,
            proposed_text=proposed_text,
        )
        return {
            "name": name,
            "installed": str(installed_path),
            "is_new_page": False,
            "skipped": True,
            "update_required": True,
            "update_review": render_update_review(review),
        }

    if already_present:
        decision = None
    else:
        decision = check_intake(content, "agents")
        if not decision.allow:
            raise IntakeRejected(decision)

    installed_path = install_agent(source_path, agents_dir, name)
    mirror_agent_body(installed_path, wiki_path, name)

    # Best effort: an embedding failure is reported on stderr and the add continues.
    try:
        record_embedding(subject_id=name, raw_md=content, subject_type="agents")
    except Exception as exc:
        print(
            f"Warning: failed to record intake embedding for {name}: {exc}",
            file=sys.stderr,
        )

    page_content = generate_agent_page(name, installed_path)
    is_new = write_entity_page(wiki_path, name, page_content)
    if is_new:
        update_index(str(wiki_path), [name], subject_type="agents")

    log_details = [
        f"Source: {source_path}",
        f"Installed: {installed_path}",
        f"Lines: {line_count}",
    ]
    warnings = () if decision is None else decision.warnings
    if warnings:
        joined = "; ".join(f"{w.code}:{w.message}" for w in warnings)
        log_details.append("Warnings: " + joined)
    append_log(str(wiki_path), "add-agent", name, log_details)

    action = "created" if is_new else "updated"
    queue_job = enqueue_entity_upsert(
        wiki_path,
        entity_type="agent",
        slug=name,
        entity_path=wiki_path / "entities" / "agents" / f"{name}.md",
        content=page_content,
        action=action,
        source="agent_add",
    )

    return {
        "name": name,
        "installed": str(installed_path),
        "is_new_page": is_new,
        "skipped": False,
        "update_required": False,
        "queued_job_id": queue_job.id,
    }
|
|
|
|
| |
|
|
def main() -> None:
    """Command-line entry point: add one agent or batch-add a directory.

    Exactly one of ``--agent-path`` (together with ``--name``) or
    ``--scan-dir`` must be given. Each candidate is passed to
    :func:`add_agent` with ``review_existing=True``; a status line is printed
    per candidate, followed by a summary.

    Arguments and source paths are validated before the wiki and the agents
    directory are created, so an invalid invocation or an empty scan leaves
    the filesystem untouched.

    Exits with status 1 on invalid arguments, on a missing source path, or
    when any candidate was rejected or raised. Exits with status 0 when a
    scan finds no ``.md`` files.
    """
    parser = argparse.ArgumentParser(description="Add new agents with wiki ingestion")
    parser.add_argument("--agent-path", help="Path to a single agent .md to add")
    parser.add_argument("--name", help="Agent name (required with --agent-path)")
    parser.add_argument(
        "--scan-dir",
        help="Directory of agent .md files to batch-add (non-recursive)",
    )
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="Skip agents already installed (prevents overwrites)",
    )
    parser.add_argument(
        "--update-existing",
        action="store_true",
        help="Apply the reviewed replacement when an agent already exists",
    )
    parser.add_argument("--wiki", default=str(cfg.wiki_dir), help="Wiki path")
    parser.add_argument(
        "--agents-dir",
        default=str(cfg.agents_dir),
        help="Agents install path",
    )
    args = parser.parse_args()

    wiki_path = Path(os.path.expanduser(args.wiki))
    agents_dir = Path(os.path.expanduser(args.agents_dir))

    # Validate the invocation first: none of these checks needs the wiki or
    # the agents directory, so a bad command line must not create them.
    if args.agent_path and args.scan_dir:
        print("Error: use --agent-path or --scan-dir, not both.", file=sys.stderr)
        sys.exit(1)

    if not args.agent_path and not args.scan_dir:
        print("Error: --agent-path or --scan-dir is required.", file=sys.stderr)
        sys.exit(1)

    candidates: list[tuple[Path, str]] = []

    if args.agent_path:
        if not args.name:
            print("Error: --name is required with --agent-path.", file=sys.stderr)
            sys.exit(1)
        source = Path(os.path.expanduser(args.agent_path))
        if not source.exists():
            print(f"Error: {source} does not exist.", file=sys.stderr)
            sys.exit(1)
        candidates.append((source, args.name))

    if args.scan_dir:
        scan_root = Path(os.path.expanduser(args.scan_dir))
        if not scan_root.exists():
            print(f"Error: {scan_root} does not exist.", file=sys.stderr)
            sys.exit(1)
        # Non-recursive: each *.md directly under scan_root is one agent,
        # named after the file stem. Sorted for a stable processing order.
        for agent_md in sorted(scan_root.glob("*.md")):
            candidates.append((agent_md, agent_md.stem))

        if not candidates:
            print(f"No agent .md files found under {scan_root}.", file=sys.stderr)
            sys.exit(0)

    # Only now, with at least one valid candidate, create the destinations.
    ensure_wiki(str(wiki_path))
    agents_dir.mkdir(parents=True, exist_ok=True)

    added = updated = skipped = rejected = errors = 0
    total = len(candidates)
    for i, (source_path, name) in enumerate(candidates, 1):
        if args.skip_existing and (agents_dir / f"{name}.md").exists():
            skipped += 1
            # Throttle output on large batches: first five, then every 100th.
            if skipped <= 5 or skipped % 100 == 0:
                print(f"  [{i}/{total}] [skipped] {name}")
            continue
        try:
            result = add_agent(
                source_path=source_path,
                name=name,
                wiki_path=wiki_path,
                agents_dir=agents_dir,
                review_existing=True,
                update_existing=args.update_existing,
            )
            if result.get("skipped"):
                skipped += 1
                if result.get("update_review"):
                    print(result["update_review"])
                print(f"  [{i}/{total}] [update-review] {name}")
                continue
            if result["is_new_page"]:
                added += 1
                status = "installed"
            else:
                updated += 1
                status = "updated"
            print(f"  [{i}/{total}] [{status}] {name}")
        except IntakeRejected as exc:
            rejected += 1
            codes = ", ".join(f.code for f in exc.decision.failures) or "unknown"
            print(f"  [{i}/{total}] [rejected] {name}: {codes}", file=sys.stderr)
        except Exception as exc:
            # Batch boundary: one failing agent must not abort the others.
            errors += 1
            print(f"  [{i}/{total}] ERROR: {name}: {exc}", file=sys.stderr)

    print(
        f"\nDone: {added} added, {updated} updated, {skipped} skipped, "
        f"{rejected} rejected, {errors} errors"
    )
    if rejected or errors:
        sys.exit(1)
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|