charSLee013
feat: complete Hugging Face Spaces deployment with production-ready CognitiveKernel-Launchpad
1ea26af
#
import time
import re
import random
from ..agents.agent import MultiStepAgent, register_template, AgentResult
from ..agents.tool import StopTool, AskLLMTool, SimpleSearchTool
from ..agents.utils import zwarn, CodeExecutor, rprint
from ..ck_web.agent import WebAgent
# SmolWeb alternative removed
from ..ck_file.agent import FileAgent
from .prompts import PROMPTS as CK_PROMPTS
# --
class CKAgent(MultiStepAgent):
def __init__(self, settings, logger=None, **kwargs):
# note: this is a little tricky since things will get re-init again in super().__init__
# Initialize search_backend attribute for KwargsInitializable
self.search_backend = None
# Dedicated single-thread executor for action code to ensure thread-affinity
self._action_executor = None
# Store settings reference
self.settings = settings
# sub-agents - pass settings to each sub-agent during construction
# Extract child configs from kwargs (do not pass them to super().__init__)
web_kwargs = (kwargs.get('web_agent') or {}).copy()
file_kwargs = (kwargs.get('file_agent') or {}).copy()
# Pass all web_agent kwargs through; WebAgent will consume model/max_steps/web_env_kwargs/etc.
self.web_agent = WebAgent(settings=settings, logger=logger, **web_kwargs)
# Likewise for file agent (model/max_steps/etc.)
self.file_agent = FileAgent(settings=settings, **file_kwargs)
self.tool_ask_llm = AskLLMTool()
# Configure search backend from config.toml if provided
search_backend = kwargs.get('search_backend')
if search_backend:
try:
from ..agents.search.config import SearchConfigManager
SearchConfigManager.initialize_from_string(search_backend)
except Exception as e:
# LET IT CRASH - don't hide configuration errors
raise RuntimeError(f"Failed to configure search backend {search_backend}: {e}") from e
# Create search tool (will use configured backend or factory default)
self.tool_simple_search = SimpleSearchTool()
# Choose ck_end template by verbosity style (less|medium|more)
style = kwargs.get('end_template', 'less')
_end_map = {
'less': 'ck_end_less',
'medium': 'ck_end_medium',
'more': 'ck_end_more',
}
end_tpl = _end_map.get(style, 'ck_end_less')
feed_kwargs = dict(
name="ck_agent",
description="Cognitive Kernel, an initial autopilot system.",
templates={"plan": "ck_plan", "action": "ck_action", "end": end_tpl}, # template names
active_functions=["web_agent", "file_agent", "stop", "ask_llm", "simple_web_search"], # enable the useful modules
sub_agent_names=["web_agent", "file_agent"], # note: another tricky point, use name rather than the objects themselves
tools=[StopTool(agent=self), self.tool_ask_llm, self.tool_simple_search], # add related tools
max_steps=16, # still give it more steps
max_time_limit=4200, # 70 minutes
exec_timeout_with_call=1000, # if calling sub-agent
exec_timeout_wo_call=200, # if not calling sub-agent
)
# Apply configuration overrides (remove internal-only keys first)
# Strip child sections so super().__init__ won't reconstruct sub-agents
filtered = {k: v for k, v in kwargs.items() if k not in ('web_agent', 'file_agent', 'end_template')}
feed_kwargs.update(filtered)
# Parallel processing removed - single execution path only
register_template(CK_PROMPTS) # add web prompts
super().__init__(**feed_kwargs)
self.tool_ask_llm.set_llm(self.model) # another tricky part, we need to assign LLM later
self.tool_simple_search.set_llm(self.model)
# --
def get_function_definition(self, short: bool):
raise RuntimeError("Should NOT use CKAgent as a sub-agent!")
def _ensure_action_executor(self):
if self._action_executor is None:
from concurrent.futures import ThreadPoolExecutor
# Single dedicated worker thread to keep Playwright and sub-agents in one thread
self._action_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ck_action")
def step_action(self, action_res, action_input_kwargs, **kwargs):
"""Execute single action step in a dedicated thread (to avoid asyncio-loop conflicts)."""
self._ensure_action_executor()
def _do_execute():
python_executor = CodeExecutor()
python_executor.add_global_vars(**self.ACTIVE_FUNCTIONS)
_exec_timeout = self.exec_timeout_with_call if any((z in action_res["code"]) for z in self.sub_agent_names) else self.exec_timeout_wo_call
python_executor.run(action_res["code"], catch_exception=True, timeout=_exec_timeout)
ret = python_executor.get_print_results()
rprint(f"Obtain action res = {ret}", style="white on yellow")
return ret
# Run user action code on the dedicated worker thread and wait for completion
future = self._action_executor.submit(_do_execute)
return future.result()
def end_run(self, session):
ret = super().end_run(session)
# Cleanly shutdown the dedicated action executor to release resources
if self._action_executor is not None:
self._action_executor.shutdown(wait=True)
self._action_executor = None
return ret