Spaces:
Sleeping
Sleeping
| """ | |
| Plugins API Router | |
| ================== | |
| Lists registered tools, capabilities, and playground plugins. | |
| Provides direct tool execution for the Playground tester. | |
| """ | |
| import asyncio | |
| import contextlib | |
| import json | |
| import logging | |
| import re | |
| import time | |
| from typing import Any, AsyncGenerator | |
| from fastapi import APIRouter, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from deeptutor.runtime.registry.capability_registry import get_capability_registry | |
| from deeptutor.runtime.registry.tool_registry import get_tool_registry | |
| from deeptutor.logging import ConsoleFormatter | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]") | |
| def _discover_plugins() -> list[Any]: | |
| try: | |
| from deeptutor.plugins.loader import discover_plugins | |
| except Exception: | |
| logger.debug("Plugin loader unavailable; returning no plugins.", exc_info=True) | |
| return [] | |
| return discover_plugins() | |
| class ToolExecuteRequest(BaseModel): | |
| params: dict[str, Any] = {} | |
| class CapabilityExecuteRequest(BaseModel): | |
| content: str | |
| tools: list[str] = [] | |
| knowledge_bases: list[str] = [] | |
| language: str = "en" | |
| config: dict[str, Any] = {} | |
| attachments: list[dict[str, Any]] = [] | |
| async def list_plugins(): | |
| tool_registry = get_tool_registry() | |
| capability_registry = get_capability_registry() | |
| plugin_manifests = _discover_plugins() | |
| tools = [ | |
| { | |
| "name": definition.name, | |
| "description": definition.description, | |
| "parameters": [ | |
| { | |
| "name": parameter.name, | |
| "type": parameter.type, | |
| "description": parameter.description, | |
| "required": parameter.required, | |
| "default": parameter.default, | |
| "enum": parameter.enum, | |
| } | |
| for parameter in definition.parameters | |
| ], | |
| } | |
| for definition in tool_registry.get_definitions() | |
| ] | |
| capabilities = capability_registry.get_manifests() | |
| plugins = [ | |
| { | |
| "name": plugin.name, | |
| "type": plugin.type, | |
| "description": plugin.description, | |
| "stages": plugin.stages, | |
| "version": plugin.version, | |
| "author": plugin.author, | |
| } | |
| for plugin in plugin_manifests | |
| ] | |
| return { | |
| "tools": tools, | |
| "capabilities": capabilities, | |
| "plugins": plugins, | |
| } | |
| async def execute_tool(tool_name: str, body: ToolExecuteRequest): | |
| """Execute a single tool with explicit parameters (for Playground testing).""" | |
| registry = get_tool_registry() | |
| tool = registry.get(tool_name) | |
| if not tool: | |
| raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found") | |
| try: | |
| result = await tool.execute(**body.params) | |
| return { | |
| "success": result.success, | |
| "content": result.content, | |
| "sources": result.sources, | |
| "metadata": result.metadata, | |
| } | |
| except Exception as exc: | |
| logger.exception("Tool execution failed: %s", tool_name) | |
| raise HTTPException(status_code=500, detail=str(exc)) | |
| class _QueueLogHandler(logging.Handler): | |
| """Temporary handler that pushes formatted log records into an asyncio queue.""" | |
| def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): | |
| super().__init__(level=logging.DEBUG) | |
| self._queue = queue | |
| self._loop = loop | |
| formatter = ConsoleFormatter(service_prefix=None) | |
| formatter.use_colors = False | |
| self.setFormatter(formatter) | |
| def emit(self, record: logging.LogRecord): | |
| line = ANSI_ESCAPE_RE.sub("", self.format(record)).strip() | |
| if line: | |
| self._loop.call_soon_threadsafe(self._queue.put_nowait, f"[Backend] {line}") | |
| class _QueueTextStream: | |
| """Capture plain stdout/stderr writes and forward complete lines into the queue.""" | |
| def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, stream): | |
| self._queue = queue | |
| self._loop = loop | |
| self._stream = stream | |
| self._buffer = "" | |
| def write(self, text: str) -> int: | |
| if self._stream is not None: | |
| self._stream.write(text) | |
| self._stream.flush() | |
| self._buffer += text | |
| while "\n" in self._buffer: | |
| line, self._buffer = self._buffer.split("\n", 1) | |
| line = line.rstrip("\r") | |
| if line.strip(): | |
| self._loop.call_soon_threadsafe( | |
| self._queue.put_nowait, f"[Backend] {ANSI_ESCAPE_RE.sub('', line)}" | |
| ) | |
| return len(text) | |
| def flush(self): | |
| if self._stream is not None: | |
| self._stream.flush() | |
| def isatty(self) -> bool: | |
| return False | |
| def _collect_project_loggers() -> list[logging.Logger]: | |
| """Collect active project loggers because many do not propagate to the root logger.""" | |
| candidates: list[logging.Logger] = [] | |
| for parent_name in ("deeptutor", "src"): | |
| parent_logger = logging.getLogger(parent_name) | |
| if isinstance(parent_logger, logging.Logger): | |
| candidates.append(parent_logger) | |
| for name, logger_obj in logging.root.manager.loggerDict.items(): | |
| if not (name.startswith("deeptutor") or name.startswith("src")): | |
| continue | |
| if isinstance(logger_obj, logging.Logger): | |
| candidates.append(logger_obj) | |
| unique: list[logging.Logger] = [] | |
| seen: set[int] = set() | |
| for logger_obj in candidates: | |
| key = id(logger_obj) | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| unique.append(logger_obj) | |
| return unique | |
| async def _execute_stream(tool_name: str, params: dict[str, Any]) -> AsyncGenerator[str, None]: | |
| """Run a tool while capturing all deeptutor.* logs and yielding SSE events.""" | |
| registry = get_tool_registry() | |
| tool = registry.get(tool_name) | |
| if not tool: | |
| yield f"event: error\ndata: {json.dumps({'detail': f'Tool {tool_name!r} not found'})}\n\n" | |
| return | |
| log_queue: asyncio.Queue[str] = asyncio.Queue() | |
| loop = asyncio.get_running_loop() | |
| handler = _QueueLogHandler(log_queue, loop) | |
| stdout_stream = _QueueTextStream(log_queue, loop, stream=None) | |
| stderr_stream = _QueueTextStream(log_queue, loop, stream=None) | |
| attached_loggers = _collect_project_loggers() | |
| for logger_obj in attached_loggers: | |
| logger_obj.addHandler(handler) | |
| result_holder: dict[str, Any] = {} | |
| error_holder: dict[str, str] = {} | |
| done = asyncio.Event() | |
| async def _run(): | |
| try: | |
| import sys | |
| stdout_stream._stream = sys.stdout | |
| stderr_stream._stream = sys.stderr | |
| with contextlib.redirect_stdout(stdout_stream), contextlib.redirect_stderr( | |
| stderr_stream | |
| ): | |
| result = await tool.execute(**params) | |
| result_holder["data"] = { | |
| "success": result.success, | |
| "content": result.content, | |
| "sources": result.sources, | |
| "metadata": result.metadata, | |
| } | |
| except Exception as exc: | |
| error_holder["detail"] = str(exc) | |
| finally: | |
| done.set() | |
| task = asyncio.create_task(_run()) | |
| t0 = time.monotonic() | |
| try: | |
| while not done.is_set(): | |
| try: | |
| line = await asyncio.wait_for(log_queue.get(), timeout=0.15) | |
| yield f"event: log\ndata: {json.dumps({'line': line})}\n\n" | |
| except asyncio.TimeoutError: | |
| pass | |
| while not log_queue.empty(): | |
| line = log_queue.get_nowait() | |
| yield f"event: log\ndata: {json.dumps({'line': line})}\n\n" | |
| elapsed_ms = round((time.monotonic() - t0) * 1000) | |
| if error_holder: | |
| yield f"event: error\ndata: {json.dumps({'detail': error_holder['detail'], 'elapsed_ms': elapsed_ms})}\n\n" | |
| else: | |
| payload = {**result_holder.get("data", {}), "elapsed_ms": elapsed_ms} | |
| yield f"event: result\ndata: {json.dumps(payload, default=str)}\n\n" | |
| finally: | |
| for logger_obj in attached_loggers: | |
| if handler in logger_obj.handlers: | |
| logger_obj.removeHandler(handler) | |
| if not task.done(): | |
| task.cancel() | |
| async def execute_tool_stream(tool_name: str, body: ToolExecuteRequest): | |
| """Execute a tool and stream logs + result as SSE.""" | |
| return StreamingResponse( | |
| _execute_stream(tool_name, body.params), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, | |
| ) | |
| async def _execute_capability_stream( | |
| capability_name: str, | |
| body: CapabilityExecuteRequest, | |
| ) -> AsyncGenerator[str, None]: | |
| """Run a capability while streaming logs, trace events, and the final result.""" | |
| from deeptutor.core.context import Attachment, UnifiedContext | |
| from deeptutor.runtime.orchestrator import ChatOrchestrator | |
| orch = ChatOrchestrator() | |
| if capability_name not in orch.list_capabilities(): | |
| yield ( | |
| f"event: error\ndata: " | |
| f"{json.dumps({'detail': f'Capability {capability_name!r} not found'})}\n\n" | |
| ) | |
| return | |
| attachments = [ | |
| Attachment( | |
| type=a.get("type", "file"), | |
| url=a.get("url", ""), | |
| base64=a.get("base64", ""), | |
| filename=a.get("filename", ""), | |
| mime_type=a.get("mime_type", ""), | |
| ) | |
| for a in body.attachments | |
| ] | |
| ctx = UnifiedContext( | |
| user_message=body.content, | |
| enabled_tools=body.tools, | |
| active_capability=capability_name, | |
| knowledge_bases=body.knowledge_bases, | |
| attachments=attachments, | |
| config_overrides=body.config, | |
| language=body.language, | |
| ) | |
| log_queue: asyncio.Queue[str] = asyncio.Queue() | |
| loop = asyncio.get_running_loop() | |
| handler = _QueueLogHandler(log_queue, loop) | |
| stdout_stream = _QueueTextStream(log_queue, loop, stream=None) | |
| stderr_stream = _QueueTextStream(log_queue, loop, stream=None) | |
| attached_loggers = _collect_project_loggers() | |
| for logger_obj in attached_loggers: | |
| logger_obj.addHandler(handler) | |
| final_result: dict[str, Any] | None = None | |
| error_holder: dict[str, str] = {} | |
| done = asyncio.Event() | |
| async def _run(): | |
| nonlocal final_result | |
| try: | |
| import sys | |
| stdout_stream._stream = sys.stdout | |
| stderr_stream._stream = sys.stderr | |
| with contextlib.redirect_stdout(stdout_stream), contextlib.redirect_stderr( | |
| stderr_stream | |
| ): | |
| async for event in orch.handle(ctx): | |
| if event.type.value == "result": | |
| final_result = dict(event.metadata) | |
| continue | |
| await log_queue.put( | |
| "__STREAM_EVENT__" + json.dumps(event.to_dict(), default=str) | |
| ) | |
| except Exception as exc: | |
| error_holder["detail"] = str(exc) | |
| finally: | |
| done.set() | |
| task = asyncio.create_task(_run()) | |
| t0 = time.monotonic() | |
| try: | |
| while not done.is_set(): | |
| try: | |
| line = await asyncio.wait_for(log_queue.get(), timeout=0.15) | |
| if line.startswith("__STREAM_EVENT__"): | |
| payload = line.removeprefix("__STREAM_EVENT__") | |
| yield f"event: stream\ndata: {payload}\n\n" | |
| else: | |
| yield f"event: log\ndata: {json.dumps({'line': line})}\n\n" | |
| except asyncio.TimeoutError: | |
| pass | |
| while not log_queue.empty(): | |
| line = log_queue.get_nowait() | |
| if line.startswith("__STREAM_EVENT__"): | |
| payload = line.removeprefix("__STREAM_EVENT__") | |
| yield f"event: stream\ndata: {payload}\n\n" | |
| else: | |
| yield f"event: log\ndata: {json.dumps({'line': line})}\n\n" | |
| elapsed_ms = round((time.monotonic() - t0) * 1000) | |
| if error_holder: | |
| yield ( | |
| f"event: error\ndata: " | |
| f"{json.dumps({'detail': error_holder['detail'], 'elapsed_ms': elapsed_ms})}\n\n" | |
| ) | |
| else: | |
| yield ( | |
| f"event: result\ndata: " | |
| f"{json.dumps({'success': True, 'data': final_result or {}, 'elapsed_ms': elapsed_ms}, default=str)}\n\n" | |
| ) | |
| finally: | |
| for logger_obj in attached_loggers: | |
| if handler in logger_obj.handlers: | |
| logger_obj.removeHandler(handler) | |
| if not task.done(): | |
| task.cancel() | |
| async def execute_capability_stream( | |
| capability_name: str, | |
| body: CapabilityExecuteRequest, | |
| ): | |
| """Execute a capability and stream logs + trace + final result as SSE.""" | |
| return StreamingResponse( | |
| _execute_capability_stream(capability_name, body), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, | |
| ) | |