File size: 32,281 Bytes
399b80c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
"""OpenSpace MCP Server

Exposes the following tools to MCP clients:
  execute_task   — Delegate a task (auto-registers skills, auto-searches, auto-evolves)
  search_skills  — Standalone search across local & cloud skills
  fix_skill      — Manually fix a broken skill (FIX only; DERIVED/CAPTURED via execute_task)
  upload_skill   — Upload a local skill to cloud (pre-saved metadata, bot decides visibility)

Usage:
    python -m openspace.mcp_server                     # stdio (default)
    python -m openspace.mcp_server --transport sse     # SSE on port 8080
    python -m openspace.mcp_server --port 9090         # SSE on custom port

Environment variables: see ``openspace/host_detection/`` and ``openspace/cloud/auth.py``.
"""

from __future__ import annotations

import asyncio
import inspect
import json
import logging
import os
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional


class _MCPSafeStdout:
    """Stdout wrapper: binary (.buffer) β†’ real stdout, text (.write) β†’ stderr."""

    def __init__(self, real_stdout, stderr):
        self._real = real_stdout
        self._stderr = stderr

    @property
    def buffer(self):
        return self._real.buffer

    def fileno(self):
        return self._real.fileno()

    def write(self, s):
        return self._stderr.write(s)

    def writelines(self, lines):
        return self._stderr.writelines(lines)

    def flush(self):
        self._stderr.flush()
        self._real.flush()

    def isatty(self):
        return self._stderr.isatty()

    @property
    def encoding(self):
        return self._stderr.encoding

    @property
    def errors(self):
        return self._stderr.errors

    @property
    def closed(self):
        return self._stderr.closed

    def readable(self):
        return False

    def writable(self):
        return True

    def seekable(self):
        return False

    def __getattr__(self, name):
        return getattr(self._stderr, name)

# Log directory: "logs" next to the package directory that contains this file.
# Created at import time so the file handlers below can open their files.
_LOG_DIR = Path(__file__).resolve().parent.parent / "logs"
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Keep the original stdout; the wrapper installed below still needs it for
# ``.buffer`` / ``fileno()``.
_real_stdout = sys.stdout

# Windows pipe buffers are small. When using stdio MCP transport,
# the parent process only reads stdout for MCP messages and does NOT
# drain stderr. Heavy log/print output during execute_task fills the stderr
# pipe buffer, blocking this process on write() → deadlock → timeout.
# Redirect stderr to a log file on Windows to prevent this.
if os.name == "nt":
    # Line-buffered and intentionally left open for the life of the process.
    _stderr_file = open(
        _LOG_DIR / "mcp_stderr.log", "a", encoding="utf-8", buffering=1
    )
    sys.stderr = _stderr_file

# From here on, text written to sys.stdout goes to stderr, while
# ``sys.stdout.buffer`` is still the real stdout's binary buffer.
sys.stdout = _MCPSafeStdout(_real_stdout, sys.stderr)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    # Explicit UTF-8 (matching mcp_stderr.log): without it the handler uses the
    # locale encoding, and messages containing characters outside that
    # encoding fail to be written.
    handlers=[logging.FileHandler(_LOG_DIR / "mcp_server.log", encoding="utf-8")],
)
logger = logging.getLogger("openspace.mcp_server")

from mcp.server.fastmcp import FastMCP

# Optional keyword arguments for the FastMCP constructor.
_fastmcp_kwargs: dict = {}
try:
    # Pass ``description`` only when FastMCP.__init__ declares that parameter.
    if "description" in inspect.signature(FastMCP.__init__).parameters:
        _fastmcp_kwargs["description"] = (
            "OpenSpace: Unite the Agents. Evolve the Mind. Rebuild the World."
        )
except (TypeError, ValueError):
    # The constructor signature could not be inspected; create the server
    # without a description.
    pass

# Server object on which the tools below are registered via ``@mcp.tool()``.
mcp = FastMCP("OpenSpace", **_fastmcp_kwargs)

# Shared OpenSpace engine, created lazily by ``_get_openspace``; the lock
# serialises its creation.
_openspace_instance = None
_openspace_lock = asyncio.Lock()
# SkillStore created by ``_get_store`` when the engine's own store cannot be used.
_standalone_store = None

# Internal state: tracks bot skill directories already registered this session.
_registered_skill_dirs: set = set()

# Name of the sidecar file holding pre-saved upload metadata inside a skill directory.
_UPLOAD_META_FILENAME = ".upload_meta.json"


async def _get_openspace():
    """Return the shared OpenSpace engine, creating and initialising it on first use.

    Configuration is read from the environment:

      - ``OPENSPACE_MODEL``: model name handed to ``build_llm_kwargs``.
      - ``OPENSPACE_WORKSPACE``: workspace directory; also the base for the
        recording log directory (falls back to the package root).
      - ``OPENSPACE_MAX_ITERATIONS``: integer, default 20.  An invalid value
        is logged and replaced by the default instead of aborting
        initialisation.
      - ``OPENSPACE_ENABLE_RECORDING``: "true"/"1"/"yes" (case-insensitive)
        enables recording; default "true".
      - ``OPENSPACE_BACKEND_SCOPE``: optional comma-separated backend list.
      - ``OPENSPACE_HOST_SKILL_DIRS``: optional comma-separated skill
        directories registered right after initialisation.

    Returns:
        The module-level ``OpenSpace`` instance.
    """
    global _openspace_instance
    if _openspace_instance is not None and _openspace_instance.is_initialized():
        return _openspace_instance

    async with _openspace_lock:
        # Check again now that the lock is held: another coroutine may have
        # finished initialisation while this one was waiting.
        if _openspace_instance is not None and _openspace_instance.is_initialized():
            return _openspace_instance

        logger.info("Initializing OpenSpace engine ...")
        from openspace.tool_layer import OpenSpace, OpenSpaceConfig
        from openspace.host_detection import build_llm_kwargs, build_grounding_config_path

        env_model = os.environ.get("OPENSPACE_MODEL", "")
        workspace = os.environ.get("OPENSPACE_WORKSPACE")

        max_iter_raw = os.environ.get("OPENSPACE_MAX_ITERATIONS", "20")
        try:
            max_iter = int(max_iter_raw)
        except ValueError:
            # A malformed setting should not make every tool call fail.
            logger.warning(
                "Invalid OPENSPACE_MAX_ITERATIONS=%r; falling back to 20", max_iter_raw
            )
            max_iter = 20

        enable_rec = os.environ.get("OPENSPACE_ENABLE_RECORDING", "true").lower() in ("true", "1", "yes")

        backend_scope_raw = os.environ.get("OPENSPACE_BACKEND_SCOPE")
        backend_scope = (
            [b.strip() for b in backend_scope_raw.split(",") if b.strip()]
            if backend_scope_raw else None
        )

        config_path = build_grounding_config_path()
        model, llm_kwargs = build_llm_kwargs(env_model)

        # Recordings go under <workspace>/logs/recordings, or under the package
        # root when no workspace is configured.
        _pkg_root = str(Path(__file__).resolve().parent.parent)
        recording_base = workspace or _pkg_root
        recording_log_dir = str(Path(recording_base) / "logs" / "recordings")

        config = OpenSpaceConfig(
            llm_model=model,
            llm_kwargs=llm_kwargs,
            workspace_dir=workspace,
            grounding_max_iterations=max_iter,
            enable_recording=enable_rec,
            # Only the "shell" backend is recorded here ("mcp" and "web" are not enabled).
            recording_backends=["shell"] if enable_rec else None,
            recording_log_dir=recording_log_dir,
            backend_scope=backend_scope,
            grounding_config_path=config_path,
        )

        _openspace_instance = OpenSpace(config=config)
        await _openspace_instance.initialize()
        logger.info("OpenSpace engine ready (model=%s).", model)

        # Auto-register host bot skill directories from env (set once by human).
        # NOTE(review): _auto_register_skill_dirs calls _get_openspace() again
        # while this lock is held; that only returns without waiting on the
        # lock if is_initialized() is already true at this point — confirm.
        host_skill_dirs_raw = os.environ.get("OPENSPACE_HOST_SKILL_DIRS", "")
        if host_skill_dirs_raw:
            dirs = [d.strip() for d in host_skill_dirs_raw.split(",") if d.strip()]
            if dirs:
                await _auto_register_skill_dirs(dirs)
                logger.info("Auto-registered host skill dirs from OPENSPACE_HOST_SKILL_DIRS: %s", dirs)

        return _openspace_instance


def _get_store():
    """Return a usable SkillStore.

    Prefers the ``_skill_store`` held by the initialised OpenSpace engine
    when it is present and not closed.  Otherwise returns the module-level
    standalone store, creating a new one if none exists yet or the previous
    one has been closed.
    """
    global _standalone_store

    engine = _openspace_instance
    if engine and engine.is_initialized():
        shared = getattr(engine, "_skill_store", None)
        if shared and not shared._closed:
            return shared

    needs_new_store = _standalone_store is None or _standalone_store._closed
    if needs_new_store:
        # Imported lazily, only when a standalone store actually has to be built.
        from openspace.skill_engine import SkillStore
        _standalone_store = SkillStore()
    return _standalone_store


def _get_cloud_client():
    """Build an OpenSpaceClient from the configured cloud credentials.

    The auth headers and API base URL come from ``get_openspace_auth()``.
    Per the original note, a ``CloudError`` is raised when cloud access is
    not configured (raised by the cloud layer, not by this function itself).
    """
    from openspace.cloud.auth import get_openspace_auth
    from openspace.cloud.client import OpenSpaceClient

    headers, base_url = get_openspace_auth()
    client = OpenSpaceClient(headers, base_url)
    return client


def _write_upload_meta(skill_dir: Path, info: Dict[str, Any]) -> None:
    """Persist upload metadata as a ``.upload_meta.json`` sidecar in *skill_dir*.

    Written after an evolution (``execute_task`` auto-evolve or ``fix_skill``)
    so that ``upload_skill`` can later pick up the pre-saved values.

    Args:
        skill_dir: Directory of the skill the metadata belongs to.
        info: Source mapping; only ``origin``, ``parent_skill_ids``,
            ``change_summary``, ``created_by`` and ``tags`` are copied,
            each with a default when missing.

    Failures are logged as warnings and never raised.
    """
    # (key, value used when *info* lacks the key), in output order.
    field_defaults = (
        ("origin", "imported"),
        ("parent_skill_ids", []),
        ("change_summary", ""),
        ("created_by", "openspace"),
        ("tags", []),
    )
    payload = {key: info.get(key, fallback) for key, fallback in field_defaults}

    sidecar = skill_dir / _UPLOAD_META_FILENAME
    try:
        text = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
        sidecar.write_text(text, encoding="utf-8")
        logger.debug(f"Wrote upload metadata to {sidecar}")
    except Exception as e:
        # Best effort: a missing sidecar must not fail the calling tool.
        logger.warning(f"Failed to write upload metadata: {e}")


def _read_upload_meta(skill_dir: Path) -> Dict[str, Any]:
    """Read upload metadata with three-tier fallback.

    Resolution order:
      1. ``.upload_meta.json`` sidecar file (written right after evolution)
      2. SkillStore DB lookup by path (long-term persistence)
      3. Empty dict (caller applies defaults)

    This ensures metadata survives even if the sidecar file is deleted
    or the user comes back to upload much later.

    Args:
        skill_dir: Directory of the skill whose metadata is wanted.

    Returns:
        A metadata dict; empty when neither the sidecar nor the DB yields one.
        Read or lookup failures are logged and never raised.
    """
    # Tier 1: sidecar file
    meta_path = skill_dir / _UPLOAD_META_FILENAME
    if meta_path.exists():
        try:
            data = json.loads(meta_path.read_text(encoding="utf-8"))
        except (ValueError, OSError) as e:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a file that is not valid UTF-8.
            logger.warning(f"Failed to read upload metadata file: {e}")
        else:
            if isinstance(data, dict):
                if data:
                    return data
            else:
                # Callers use ``.get`` on the result, so a list/str/number
                # payload must not be returned; fall through to the DB.
                logger.warning(
                    f"Ignoring upload metadata file with non-object JSON: {meta_path}"
                )

    # Tier 2: DB lookup
    try:
        store = _get_store()
        rec = store.load_record_by_path(str(skill_dir))
        if rec:
            logger.debug(f"Upload metadata resolved from DB for {skill_dir}")
            return {
                "origin": rec.lineage.origin.value,
                "parent_skill_ids": rec.lineage.parent_skill_ids,
                "change_summary": rec.lineage.change_summary,
                "created_by": rec.lineage.created_by or "",
                "tags": rec.tags,
            }
    except Exception as e:
        # Best effort: any store failure degrades to tier 3.
        logger.debug(f"DB upload metadata lookup failed: {e}")

    # Tier 3: nothing found
    return {}


async def _auto_register_skill_dirs(skill_dirs: List[str]) -> int:
    """Scan *skill_dirs* and register the skills found with OpenSpace.

    Entries that are not existing directories are ignored.  The remaining
    directories are passed to the engine's SkillRegistry on every call, and
    whatever the registry reports as added is synced into the SkillStore.

    Args:
        skill_dirs: Directory paths to scan.

    Returns:
        Number of skills the registry reported as added; 0 when no entry is
        a directory or the registry is unavailable.
    """
    existing_dirs = []
    for entry in skill_dirs:
        candidate = Path(entry)
        if candidate.is_dir():
            existing_dirs.append(candidate)
    if not existing_dirs:
        return 0

    engine = await _get_openspace()
    registry = engine._skill_registry
    if not registry:
        logger.warning("_auto_register_skill_dirs: SkillRegistry not initialized")
        return 0

    discovered = registry.discover_from_dirs(existing_dirs)

    new_db_records = 0
    if discovered:
        new_db_records = await _get_store().sync_from_registry(discovered)

    # Decide the log wording before the directories are recorded as seen.
    all_seen_before = all(entry in _registered_skill_dirs for entry in skill_dirs)
    _registered_skill_dirs.update(skill_dirs)

    if discovered:
        verb = "Re-scanned & found" if all_seen_before else "Auto-registered"
        logger.info(
            f"{verb} {len(discovered)} skill(s) from {len(existing_dirs)} dir(s), "
            f"{new_db_records} new DB record(s)"
        )
    return len(discovered)


async def _cloud_search_and_import(task: str, limit: int = 8) -> List[Dict[str, Any]]:
    """Find cloud skills relevant to *task* and import the best matches locally.

    Steps performed here:
      1. Fetch up to 200 metadata items from the cloud (with embeddings when
         an embedding API key is available).
      2. Rank them against *task* with ``SkillSearchEngine`` (asking for
         ``limit * 2`` results).
      3. Keep at most *limit* public cloud hits that carry a ``skill_id``
         and import each one.

    Per the original author's note this is stage 1 of a two-stage pipeline:
    it deliberately imports more skills than will be used (default 8) so
    that the later local selection has a larger pool to choose from.

    Args:
        task: Natural-language task used as the search query.
        limit: Maximum number of cloud skills to import.

    Returns:
        One summary dict per successful import call (``skill_id``, ``name``,
        ``import_status``, ``local_path``).  Any failure is logged and yields
        an empty list or a skipped entry; nothing is raised.
    """
    try:
        from openspace.cloud.search import (
            SkillSearchEngine, build_cloud_candidates,
        )
        from openspace.cloud.embedding import generate_embedding, resolve_embedding_api

        cloud = _get_cloud_client()
        api_key, _ = resolve_embedding_api()
        use_embedding = bool(api_key)

        metadata_items = await asyncio.to_thread(
            cloud.fetch_metadata, include_embedding=use_embedding, limit=200,
        )
        if not metadata_items:
            return []

        pool = build_cloud_candidates(metadata_items)
        if not pool:
            return []

        # Embed the task only when the cloud items were fetched with embeddings.
        task_vector: Optional[List[float]] = None
        if use_embedding:
            task_vector = await asyncio.to_thread(
                generate_embedding, task,
            )

        ranked = SkillSearchEngine().search(
            task, pool, query_embedding=task_vector, limit=limit * 2,
        )

        def _importable(entry) -> bool:
            # Public cloud result with a usable id.
            if entry.get("source") != "cloud":
                return False
            if entry.get("visibility", "public") != "public":
                return False
            return bool(entry.get("skill_id"))

        to_import = [entry for entry in ranked if _importable(entry)][:limit]

        import_results: List[Dict[str, Any]] = []
        for hit in to_import:
            try:
                outcome = await _do_import_cloud_skill(skill_id=hit["skill_id"])
                summary = {
                    "skill_id": hit["skill_id"],
                    "name": hit.get("name", ""),
                    "import_status": outcome.get("status", "error"),
                    "local_path": outcome.get("local_path", ""),
                }
                import_results.append(summary)
            except Exception as e:
                # One failed import must not stop the remaining ones.
                logger.warning(f"Cloud import failed for {hit['skill_id']}: {e}")

        if import_results:
            logger.info(f"Cloud search imported {len(import_results)} skill(s)")
        return import_results

    except Exception as e:
        logger.warning(f"_cloud_search_and_import failed (non-fatal): {e}")
        return []


async def _do_import_cloud_skill(skill_id: str, target_dir: Optional[str] = None) -> Dict[str, Any]:
    """Download a cloud skill and register it locally.

    The download directory is chosen in this order:
      1. *target_dir*, when given.
      2. ``<NANOBOT_WORKSPACE or OPENCLAW_STATE_DIR>/skills`` (created if
         missing), when either environment variable is set.
      3. The first entry of the grounding config's ``skill_dirs``.
      4. ``skills`` next to this file.

    Args:
        skill_id: Cloud identifier of the skill to import.
        target_dir: Optional explicit download directory.

    Returns:
        The dict returned by ``client.import_skill`` with a ``registered``
        flag added: True only when the downloaded directory exists and was
        registered and synced into the SkillStore.
    """
    client = _get_cloud_client()

    if target_dir:
        base_dir = Path(target_dir)
    else:
        host_ws = (
            os.environ.get("NANOBOT_WORKSPACE")
            or os.environ.get("OPENCLAW_STATE_DIR")
        )
        if host_ws:
            base_dir = Path(host_ws) / "skills"
            base_dir.mkdir(parents=True, exist_ok=True)
        else:
            openspace = await _get_openspace()
            skill_cfg = openspace._grounding_config.skills if openspace._grounding_config else None
            if skill_cfg and skill_cfg.skill_dirs:
                base_dir = Path(skill_cfg.skill_dirs[0])
            else:
                base_dir = Path(__file__).resolve().parent / "skills"

    # import_skill is a blocking call; run it off the event loop.
    result = await asyncio.to_thread(client.import_skill, skill_id, base_dir)

    # Guard on the raw value: Path("") is Path("."), which always exists, so a
    # result without "local_path" would otherwise register the current
    # working directory as a skill.
    local_path = result.get("local_path") or ""
    skill_dir = Path(local_path) if local_path else None
    if skill_dir is not None and skill_dir.exists():
        openspace = await _get_openspace()
        registry = openspace._skill_registry
        if registry:
            meta = registry.register_skill_dir(skill_dir)
            if meta:
                store = _get_store()
                await store.sync_from_registry([meta])
                result["registered"] = True

    result.setdefault("registered", False)
    return result


def _format_task_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Format an OpenSpace execution result for MCP transport."""
    tool_execs = result.get("tool_executions", [])
    tool_summary = [
        {
            "tool": te.get("tool_name", te.get("tool", "")),
            "status": te.get("status", ""),
            "error": te.get("error", "")[:200] if te.get("error") else None,
        }
        for te in tool_execs[:20]
    ]

    output: Dict[str, Any] = {
        "status": result.get("status", "unknown"),
        "response": result.get("response", ""),
        "execution_time": round(result.get("execution_time", 0), 2),
        "iterations": result.get("iterations", 0),
        "skills_used": result.get("skills_used", []),
        "task_id": result.get("task_id", ""),
        "tool_call_count": len(tool_execs),
        "tool_summary": tool_summary,
    }
    if result.get("warning"):
        output["warning"] = result["warning"]

    # Format evolved_skills with skill_dir and upload_ready flag
    raw_evolved = result.get("evolved_skills", [])
    if raw_evolved:
        formatted_evolved = []
        for es in raw_evolved:
            skill_path = es.get("path", "")
            skill_dir = str(Path(skill_path).parent) if skill_path else ""
            formatted_evolved.append({
                "skill_dir": skill_dir,
                "name": es.get("name", ""),
                "origin": es.get("origin", ""),
                "change_summary": es.get("change_summary", ""),
                "upload_ready": bool(skill_dir),
            })
        output["evolved_skills"] = formatted_evolved
        # Prompt the calling agent to upload evolved skills
        names = [es["name"] for es in formatted_evolved if es.get("upload_ready")]
        if names:
            output["action_required"] = (
                f"OpenSpace auto-evolved {len(names)} skill(s): {', '.join(names)}. "
                f"Follow the 'When to upload' rules in your delegate-task skill to "
                f"decide visibility, then upload via upload_skill. "
                f"Tell the user what you evolved and what you uploaded."
            )

    return output


def _json_ok(data: Any) -> str:
    return json.dumps(data, ensure_ascii=False, indent=2)


def _json_error(error: Any, **extra) -> str:
    return json.dumps({"error": str(error), **extra}, ensure_ascii=False)


# MCP Tools (4 tools)
@mcp.tool()
async def execute_task(
    task: str,
    workspace_dir: str | None = None,
    max_iterations: int | None = None,
    skill_dirs: list[str] | None = None,
    search_scope: str = "all",
) -> str:
    """Execute a task with OpenSpace's full grounding engine.

    OpenSpace will:
    1. Auto-register bot skills from skill_dirs (if provided)
    2. Search for relevant skills (scope controls local vs cloud+local)
    3. Attempt skill-guided execution → fallback to pure tools
    4. Auto-analyze → auto-evolve (FIX/DERIVED/CAPTURED) if needed

    If skills are auto-evolved, the response includes ``evolved_skills``
    with ``upload_ready: true``.  Call ``upload_skill`` with just the
    ``skill_dir`` + ``visibility`` to upload — metadata is pre-saved.

    Note: This call blocks until the task completes (may take minutes).
    Set MCP client tool-call timeout ≥ 600 seconds.

    Args:
        task: The task instruction (natural language).
        workspace_dir: Working directory. Defaults to OPENSPACE_WORKSPACE env.
        max_iterations: Max agent iterations (default: 20).
        skill_dirs: Bot's skill directories to auto-register so OpenSpace
                    can select and track them.  Directories are re-scanned
                    on every call to discover skills created since the last
                    invocation.
        search_scope: Skill search scope before execution.
                      "all" (default) — local + cloud; falls back to local
                      if no API key is configured.
                      "local" — local SkillRegistry only (fast, no cloud).
    """
    try:
        openspace = await _get_openspace()

        # Re-scan host skill directories (from env) to pick up skills
        # created by the host bot since the last call.
        host_skill_dirs_raw = os.environ.get("OPENSPACE_HOST_SKILL_DIRS", "")
        if host_skill_dirs_raw:
            env_dirs = [d.strip() for d in host_skill_dirs_raw.split(",") if d.strip()]
            if env_dirs:
                await _auto_register_skill_dirs(env_dirs)

        # Auto-register bot skill directories (from call parameter)
        if skill_dirs:
            await _auto_register_skill_dirs(skill_dirs)

        # Cloud search + import (if requested).  Any other scope value skips
        # the cloud step; failures inside it are logged and yield no imports.
        imported_skills: List[Dict[str, Any]] = []
        if search_scope == "all":
            imported_skills = await _cloud_search_and_import(task)

        # Execute
        result = await openspace.execute(
            task=task,
            workspace_dir=workspace_dir,
            max_iterations=max_iterations,
        )

        # Write .upload_meta.json for each evolved skill (into the parent
        # directory of the evolved skill's recorded path).
        for es in result.get("evolved_skills", []):
            skill_path = es.get("path", "")
            if skill_path:
                _write_upload_meta(Path(skill_path).parent, es)

        formatted = _format_task_result(result)
        if imported_skills:
            formatted["imported_skills"] = imported_skills
        return _json_ok(formatted)

    except Exception as e:
        # Tool boundary: report the failure as a JSON payload instead of raising.
        logger.error(f"execute_task failed: {e}", exc_info=True)
        return _json_error(e, status="error", traceback=traceback.format_exc(limit=5))


@mcp.tool()
async def search_skills(
    query: str,
    source: str = "all",
    limit: int = 20,
    auto_import: bool = True,
) -> str:
    """Search skills across local registry and cloud community.

    Standalone search for browsing / discovery.  Use this when the bot
    wants to find available skills, then decide whether to handle the
    task locally or delegate to ``execute_task``.

    **Scope difference from execute_task**:
      - ``search_skills`` returns results to the bot for decision-making.
      - ``execute_task``'s internal search feeds directly into execution
        (the bot never sees the search results).

    Uses hybrid ranking: BM25 → embedding re-rank → lexical boost.
    Embedding requires OPENAI_API_KEY; falls back to lexical-only without it.

    Args:
        query: Search query text (natural language or keywords).
        source: "all" (cloud + local), "local", or "cloud".  Default: "all".
        limit: Maximum results to return (default: 20).
        auto_import: Auto-download top public cloud skills (default: True).
    """
    try:
        from openspace.cloud.search import hybrid_search_skills

        q = query.strip()
        if not q:
            # Nothing to search for: answer with an empty result set.
            return _json_ok({"results": [], "count": 0})

        # Re-scan host skill directories so newly created skills are searchable.
        local_skills = None
        store = None
        if source in ("all", "local"):
            openspace = await _get_openspace()

            host_skill_dirs_raw = os.environ.get("OPENSPACE_HOST_SKILL_DIRS", "")
            if host_skill_dirs_raw:
                env_dirs = [d.strip() for d in host_skill_dirs_raw.split(",") if d.strip()]
                if env_dirs:
                    await _auto_register_skill_dirs(env_dirs)

            registry = openspace._skill_registry
            if registry:
                local_skills = registry.list_skills()
                store = _get_store()

        results = await hybrid_search_skills(
            query=q,
            local_skills=local_skills,
            store=store,
            source=source,
            limit=limit,
        )

        # At most this many cloud hits are downloaded per search.
        _AUTO_IMPORT_MAX = 3
        import_summary: List[Dict[str, Any]] = []
        if auto_import:
            cloud_results = [
                r for r in results
                if r.get("source") == "cloud"
                and r.get("visibility", "public") == "public"
                and r.get("skill_id")
            ][:_AUTO_IMPORT_MAX]
            for cr in cloud_results:
                try:
                    imp_result = await _do_import_cloud_skill(skill_id=cr["skill_id"])
                    status = imp_result.get("status", "error")
                    import_summary.append({
                        "skill_id": cr["skill_id"],
                        "name": cr.get("name", ""),
                        "import_status": status,
                        "local_path": imp_result.get("local_path", ""),
                    })
                    if status in ("success", "already_exists"):
                        # Annotate the search hit itself so the caller sees
                        # where the skill now lives locally.
                        cr["auto_imported"] = True
                        cr["local_path"] = imp_result.get("local_path", "")
                except Exception as imp_err:
                    logger.warning(f"auto_import failed for {cr['skill_id']}: {imp_err}")
                    import_summary.append({
                        "skill_id": cr["skill_id"],
                        # Same identifying fields as the success entries.
                        "name": cr.get("name", ""),
                        "import_status": "error",
                        "error": str(imp_err),
                    })

        output: Dict[str, Any] = {"results": results, "count": len(results)}
        if import_summary:
            output["auto_import_summary"] = import_summary
        return _json_ok(output)

    except Exception as e:
        # Tool boundary: report the failure as a JSON payload instead of raising.
        logger.error(f"search_skills failed: {e}", exc_info=True)
        return _json_error(e)


@mcp.tool()
async def fix_skill(
    skill_dir: str,
    direction: str,
) -> str:
    """Manually fix a broken skill.

    This is the **only** manual evolution entry point.  DERIVED and
    CAPTURED evolutions are triggered automatically by ``execute_task``
    (they need a task to run).  Use ``fix_skill`` when:

      - A skill's instructions are wrong or outdated
      - The bot knows exactly which skill is broken and what to fix
      - Auto-evolution inside ``execute_task`` didn't catch the issue

    The skill does NOT need to be pre-registered in OpenSpace —
    provide the skill directory path and OpenSpace will register it
    automatically before fixing.

    After fixing, the new skill is saved locally and ``.upload_meta.json``
    is pre-written.  Call ``upload_skill`` with just ``skill_dir`` +
    ``visibility`` to upload.

    Args:
        skill_dir: Path to the broken skill directory (must contain SKILL.md).
        direction: What's broken and how to fix it.  Be specific:
                   e.g. "The API endpoint changed from v1 to v2" or
                   "Add retry logic for HTTP 429 rate limit errors".
    """
    try:
        from openspace.skill_engine.types import EvolutionSuggestion, EvolutionType
        from openspace.skill_engine.evolver import EvolutionContext, EvolutionTrigger

        # A blank or whitespace-only direction gives the evolver nothing to act on.
        if not direction or not direction.strip():
            return _json_error("direction is required — describe what to fix.")

        skill_path = Path(skill_dir)
        skill_md = skill_path / "SKILL.md"
        if not skill_md.exists():
            return _json_error(f"SKILL.md not found in {skill_dir}")

        openspace = await _get_openspace()
        registry = openspace._skill_registry
        if not registry:
            return _json_error("SkillRegistry not initialized")
        if not openspace._skill_evolver:
            return _json_error("Skill evolution is not enabled")

        # Step 1: Register the skill (idempotent)
        meta = registry.register_skill_dir(skill_path)
        if not meta:
            return _json_error(f"Failed to register skill from {skill_dir}")

        store = _get_store()
        await store.sync_from_registry([meta])

        # Step 2: Load record + content
        rec = store.load_record(meta.skill_id)
        if not rec:
            return _json_error(f"Failed to load skill record for {meta.skill_id}")

        evolver = openspace._skill_evolver
        content = evolver._load_skill_content(rec)
        if not content:
            return _json_error(f"Cannot load content for skill: {meta.skill_id}")

        # Step 3: Run FIX evolution, passing up to 5 stored analyses of this
        # skill along as context.
        recent = store.load_analyses(skill_id=meta.skill_id, limit=5)

        ctx = EvolutionContext(
            trigger=EvolutionTrigger.ANALYSIS,
            suggestion=EvolutionSuggestion(
                evolution_type=EvolutionType.FIX,
                target_skill_ids=[meta.skill_id],
                direction=direction,
            ),
            skill_records=[rec],
            skill_contents=[content],
            skill_dirs=[skill_path],
            recent_analyses=recent,
            available_tools=evolver._available_tools,
        )

        logger.info(f"fix_skill: {meta.skill_id} — {direction[:100]}")
        new_record = await evolver.evolve(ctx)

        if not new_record:
            return _json_ok({
                "status": "failed",
                "error": "Evolution did not produce a new skill.",
            })

        # Step 4: Write .upload_meta.json next to the new skill; fall back to
        # the input directory when the new record carries no path.
        new_skill_dir = Path(new_record.path).parent if new_record.path else skill_path
        _write_upload_meta(new_skill_dir, {
            "origin": new_record.lineage.origin.value,
            "parent_skill_ids": new_record.lineage.parent_skill_ids,
            "change_summary": new_record.lineage.change_summary,
            "created_by": new_record.lineage.created_by or "openspace",
            "tags": new_record.tags,
        })

        return _json_ok({
            "status": "success",
            "new_skill": {
                "skill_dir": str(new_skill_dir),
                "name": new_record.name,
                "origin": new_record.lineage.origin.value,
                "change_summary": new_record.lineage.change_summary,
                "upload_ready": True,
            },
        })

    except Exception as e:
        # Tool boundary: report the failure as a JSON payload instead of raising.
        logger.error(f"fix_skill failed: {e}", exc_info=True)
        return _json_error(e, status="error", traceback=traceback.format_exc(limit=5))


@mcp.tool()
async def upload_skill(
    skill_dir: str,
    visibility: str = "public",
    origin: str | None = None,
    parent_skill_ids: list[str] | None = None,
    tags: list[str] | None = None,
    created_by: str | None = None,
    change_summary: str | None = None,
) -> str:
    """Upload a local skill to the cloud.

    For evolved skills (from ``execute_task`` or ``fix_skill``), most
    metadata is **pre-saved** in ``.upload_meta.json``.  The bot only
    needs to provide:

      - ``skill_dir`` — path to the skill directory
      - ``visibility`` — "public" or "private"

    All other parameters are optional overrides.  If omitted, pre-saved
    values are used.  If no pre-saved values exist (or a pre-saved value
    is null), sensible defaults are applied.

    **origin + parent_skill_ids constraints** (enforced by cloud):
      - imported / captured → parent_skill_ids must be empty
      - derived → at least 1 parent
      - fixed → exactly 1 parent

    Args:
        skill_dir: Path to skill directory (must contain SKILL.md).
        visibility: "public" or "private".  This is the one thing the
                    bot MUST decide.
        origin: Override origin.  Default: from .upload_meta.json or "imported".
        parent_skill_ids: Override parents.  Default: from .upload_meta.json,
                          else an empty list.
        tags: Override tags.  Default: from .upload_meta.json, else an
              empty list.
        created_by: Override creator.  Default: from .upload_meta.json,
                    else an empty string.
        change_summary: Override summary.  Default: from .upload_meta.json,
                        else an empty string.

    Returns:
        The string built by ``_json_ok`` from the cloud client's result on
        success, or by ``_json_error`` when SKILL.md is missing or any
        exception is raised (exceptions are logged and never propagate).
    """
    try:
        skill_path = Path(skill_dir)
        # is_file() rather than exists(): a directory that happens to be
        # named SKILL.md must not pass the check.
        if not (skill_path / "SKILL.md").is_file():
            return _json_error(f"SKILL.md not found in {skill_dir}")

        # Read pre-saved metadata (written by execute_task/fix_skill).
        # ``or {}`` keeps the lookups below safe should the helper hand
        # back None or an empty value.
        meta = _read_upload_meta(skill_path) or {}

        def _resolve(override, key, default):
            """Pick the explicit override, else the pre-saved value, else *default*."""
            if override is not None:
                return override
            saved = meta.get(key)
            # dict.get(key, default) only falls back when the key is
            # absent; a stored null must fall back to the default too.
            return default if saved is None else saved

        # Merge: explicit params override pre-saved values
        final_origin = _resolve(origin, "origin", "imported")
        final_parents = _resolve(parent_skill_ids, "parent_skill_ids", [])
        final_tags = _resolve(tags, "tags", [])
        final_created_by = _resolve(created_by, "created_by", "")
        final_change_summary = _resolve(change_summary, "change_summary", "")

        client = _get_cloud_client()
        # The client call is synchronous; run it in a worker thread so the
        # event loop is not blocked during the upload.
        result = await asyncio.to_thread(
            client.upload_skill,
            skill_path,
            visibility=visibility,
            origin=final_origin,
            parent_skill_ids=final_parents,
            tags=final_tags,
            created_by=final_created_by,
            change_summary=final_change_summary,
        )
        return _json_ok(result)

    except Exception as e:
        logger.error(f"upload_skill failed: {e}", exc_info=True)
        return _json_error(e, status="error", traceback=traceback.format_exc(limit=5))

def run_mcp_server() -> None:
    """Console-script entry point for ``openspace-mcp``.

    Reads two command-line options and starts the module-level ``mcp``
    server accordingly:

      - ``--transport``: ``stdio`` (default) or ``sse``
      - ``--port``: integer, default 8080; only forwarded for ``sse``
    """
    import argparse

    cli = argparse.ArgumentParser(description="OpenSpace MCP Server")
    cli.add_argument("--transport", choices=["stdio", "sse"], default="stdio")
    cli.add_argument("--port", type=int, default=8080)
    opts = cli.parse_args()

    # ``choices`` restricts the transport to exactly these two values, so
    # anything that is not SSE is stdio.
    if opts.transport != "sse":
        mcp.run(transport="stdio")
        return

    # NOTE(review): whether ``run`` accepts an ``sse_params`` keyword depends
    # on the server class behind ``mcp`` (not visible here) — confirm.
    mcp.run(transport="sse", sse_params={"port": opts.port})


# Allow running this module directly as a script; this delegates to the same
# function documented above as the console-script entry point.
if __name__ == "__main__":
    run_mcp_server()