charSLee013
feat: complete Hugging Face Spaces deployment with production-ready CognitiveKernel-Launchpad
1ea26af
#
import os
import re
import shutil
import urllib.request
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor
from ..agents.agent import MultiStepAgent, register_template, ActionResult
from ..agents.model import LLM
from ..agents.utils import zwarn, rprint, have_images_in_messages
from ..agents.tool import SimpleSearchTool
from .utils import WebEnv
from .playwright_utils import PlaywrightWebEnv
from .prompts import PROMPTS as WEB_PROMPTS
# --
# pre-defined actions: simply convert things to str
def web_click(id: int, link_name=""):
    """Wrap a ``click [<id>] <link_name>`` command in an ActionResult."""
    command = f"click [{id}] {link_name}"
    return ActionResult(command)
def web_type(id: int, content: str, enter=True):
    """Wrap a ``type [<id>] <content>`` command in an ActionResult.

    When ``enter`` is falsy the literal marker ``[NOENTER]`` is appended to the
    command (presumably telling the env not to press Enter afterwards).
    """
    command = f"type [{id}] {content}"
    if not enter:
        command = f"type [{id}] {content}[NOENTER]"
    return ActionResult(command)
def web_scroll_up():
    """Wrap the ``scroll up`` command in an ActionResult."""
    command = "scroll up"
    return ActionResult(command)
def web_scroll_down():
    """Wrap the ``scroll down`` command in an ActionResult."""
    command = "scroll down"
    return ActionResult(command)
def web_wait():
    """Wrap the ``wait`` command in an ActionResult."""
    command = "wait"
    return ActionResult(command)
def web_goback():
    """Wrap the ``goback`` command in an ActionResult."""
    command = "goback"
    return ActionResult(command)
def web_restart():
    """Wrap the ``restart`` command in an ActionResult."""
    command = "restart"
    return ActionResult(command)
def web_goto(url: str):
    """Wrap a ``goto <url>`` command in an ActionResult."""
    command = f"goto {url}"
    return ActionResult(command)
class ThreadedWebEnv:
    """A thin proxy that runs the builtin PlaywrightWebEnv entirely on a dedicated thread.

    Every operation (construction, state queries, actions, teardown) is submitted
    to a single-worker executor and awaited, which ensures sync Playwright APIs
    never execute on an asyncio event-loop thread.
    """

    def __init__(self, **kwargs):
        """Start the worker thread and build the real env on it.

        Args:
            **kwargs: Forwarded unchanged to ``PlaywrightWebEnv``.

        Raises:
            Exception: Whatever constructing ``PlaywrightWebEnv`` raises. The
                worker thread is shut down before the error propagates so a
                failed construction does not leak a thread.
        """
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ck_web_env")
        self._env = None

        def _create():
            # Import here so tests can monkeypatch ck_pro.ck_web.playwright_utils.PlaywrightWebEnv
            from .playwright_utils import PlaywrightWebEnv as _PWE
            return _PWE(**kwargs)

        # Construct the real env on the dedicated thread
        try:
            self._env = self._executor.submit(_create).result()
        except BaseException:
            # Nobody will ever call stop() on a half-built proxy, so release the
            # worker thread here before re-raising.
            executor, self._executor = self._executor, None
            executor.shutdown(wait=True)
            raise

    def _call(self, fn_name, *args, **kwargs):
        """Invoke ``self._env.<fn_name>(*args, **kwargs)`` on the worker thread.

        Returns:
            The value returned by the underlying env method.

        Raises:
            RuntimeError: If the proxy has already been stopped.
        """
        executor = self._executor
        if executor is None:
            raise RuntimeError("ThreadedWebEnv has already been stopped")

        def _invoke():
            return getattr(self._env, fn_name)(*args, **kwargs)

        return executor.submit(_invoke).result()

    # Public methods used by WebAgent
    def get_state(self):
        """Return the underlying env's ``get_state()`` result."""
        return self._call("get_state")

    def step_state(self, action_string: str) -> str:
        """Run ``step_state(action_string)`` on the underlying env."""
        return self._call("step_state", action_string)

    def sync_files(self):
        """Run ``sync_files()`` on the underlying env."""
        return self._call("sync_files")

    def stop(self):
        """Stop the underlying env on its own thread, then shut the executor down.

        Safe to call more than once: calls after the first are no-ops.
        """
        executor = self._executor
        if executor is None:
            return  # already stopped (or construction failed)
        self._executor = None

        def _cleanup():
            # Drop the reference first so it is released even if stop() raises.
            env, self._env = self._env, None
            if env is None:
                return
            try:
                env.stop()
            finally:
                # Also stop the browser pool, when the env exposes one, even if
                # env.stop() failed.
                browser_pool = getattr(env, "browser_pool", None)
                if browser_pool:
                    browser_pool.stop()

        try:
            executor.submit(_cleanup).result()
        finally:
            executor.shutdown(wait=True)
# def web_stop(answer, summary): return ActionResult(f"stop [{answer}] ({summary})") # use self-defined function!
# --
class WebAgent(MultiStepAgent):
def __init__(self, settings=None, logger=None, **kwargs):
    """Set up web-agent defaults, the multimodal model and the web action table.

    Args:
        settings: Optional settings object; stored as-is on ``self.settings``.
        logger: Optional logger; stored on ``self.logger``.
        **kwargs: Overrides merged over the default agent arguments and passed
            to ``super().__init__``. A truthy ``model`` entry is additionally
            used to build the multimodal LLM.
    """
    # NOTE: this is a little tricky -- the attributes below are assigned first
    # because things will get re-initialized again in super().__init__.
    agent_kwargs = {
        "name": "web_agent",
        "description": "A web agent helping to browse and operate web pages to solve a specific task.",
        "templates": {"plan": "web_plan", "action": "web_action", "end": "web_end"},  # template names
        "max_steps": 16,
    }
    agent_kwargs.update(kwargs)
    self.logger = logger  # logger injected by the caller
    self.settings = settings  # keep a reference to the settings object
    self.web_env_kwargs = {}  # kwargs for web env
    # If the page is identical for this many steps, the prompt says so explicitly.
    self.check_nodiff_steps = 3
    # Budget in bytes (around 4 bytes per token, e.g. 2K bytes ~ 500 tokens); 0 disables html_md.
    self.html_md_budget = 0
    # 'on': always attach screenshots, 'off': never; any other value (such as
    # the default "auto") follows the page's current screenshot mode.
    self.use_multimodal = "auto"
    # Reuse the main model config for the multimodal model when one is given.
    model_cfg = kwargs.get('model')
    if model_cfg:
        self.model_multimodal = LLM(**model_cfg.copy())
    else:
        # Lazy/default init to avoid validation errors when not needed
        self.model_multimodal = LLM(_default_init=True)
    # Fuse mechanism is fully automatic - no manual configuration needed
    # self.searcher = SimpleSearchTool(max_results=16, list_enum=False)  # use more!
    # --
    register_template(WEB_PROMPTS)  # add web prompts
    super().__init__(**agent_kwargs)
    self.web_envs = {}  # session_id -> ENV
    browser_actions = {
        "click": web_click,
        "type": web_type,
        "scroll_up": web_scroll_up,
        "scroll_down": web_scroll_down,
        "wait": web_wait,
        "goback": web_goback,
        "restart": web_restart,
        "goto": web_goto,
    }
    self.ACTIVE_FUNCTIONS.update(browser_actions)
    # self.ACTIVE_FUNCTIONS.update(stop=self._my_stop, save=self._my_save, search=self._my_search)
    agent_actions = {
        "stop": self._my_stop,
        "save": self._my_save,
        "screenshot": self._my_screenshot,
    }
    self.ACTIVE_FUNCTIONS.update(agent_actions)
# --
# note: a specific stop function!
def _my_stop(self, answer: str = None, summary: str = None, output: str = None):
    """Record the final answer and return the ``stop`` action.

    ``output`` takes precedence over ``answer`` whenever it is truthy. The
    formatted message is stored through ``self.put_final_result`` and also
    returned as the result of the ``stop`` ActionResult.
    """
    final_answer = output if output else answer
    message = f"Final answer: [{final_answer}] ({summary})"
    self.put_final_result(message)  # mark end and put final result
    return ActionResult("stop", message)
# note: special save
def _my_save(self, remote_path: str, local_path: str):
    """Save ``remote_path`` (an http(s) URL or a file path) to ``local_path``.

    The parent directory of ``local_path`` is created when needed. Nothing is
    copied when source and destination are the same path, including when they
    only differ by whitespace around ``remote_path``.

    Args:
        remote_path: Source URL (``http://`` / ``https://``) or source file path.
        local_path: Destination file path.

    Returns:
        ActionResult: a ``save`` action whose result is a "Save Succeed" or
        "Save Failed" message. Failures are reported in the message rather
        than raised (deliberate best-effort).
    """
    try:
        target_dir = os.path.dirname(local_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        if local_path != remote_path:
            remote_path = remote_path.strip()
            # Re-check after normalization: copying a file onto itself would
            # raise shutil.SameFileError and be reported as a failure.
            if local_path != remote_path:
                if remote_path.startswith(("http://", "https://")):  # retrieve from the web
                    urllib.request.urlretrieve(remote_path, local_path)
                else:  # simply copy!
                    shutil.copyfile(remote_path, local_path)
        ret = f"Save Succeed: from remote_path = {remote_path} to local_path = {local_path}"
    except Exception as e:
        ret = f"Save Failed with {e}: from remote_path = {remote_path} to local_path = {local_path}"
    return ActionResult("save", ret)
# note: whether use the screenshot mode
def _my_screenshot(self, flag: bool, save_path=""):
    """Build the ``screenshot <0|1> <save_path>`` action from ``flag``."""
    mode_bit = int(flag)
    command = f"screenshot {mode_bit} {save_path}"
    return ActionResult(command)
def get_function_definition(self, short: bool):
    """Return the textual tool description of ``web_agent``.

    Args:
        short: When truthy, return the one-line signature summary; otherwise
            return the full fenced-code description with its docstring.
    """
    if not short:
        return """- web_agent
```python
def web_agent(task: str) -> dict:
\""" Employs a web browser to navigate and interact with web pages to accomplish a specific task.
Args:
task (str): A detailed description of the task to perform. This may include:
- The target website(s) to visit (include valid URLs).
- Specific output formatting requirements.
- Instructions to download files (specify desired output path if needed).
Returns:
dict: A dictionary with the following structure:
{
'output': <str> # The well-formatted answer, strictly following any specified output format.
'log': <str> # Additional notes, such as steps taken, issues encountered, or relevant context.
}
Notes:
- If the `task` specifies an output format, ensure the 'output' field matches it exactly.
- The web agent can download files, but cannot process or analyze them. If file analysis is required, save the file to a local path and return control to an external planner or file agent for further processing.
Example:
>>> answer = web_agent(task="What is the current club of Messi? (Format your output directly as 'club_name'.)")
>>> print(answer) # directly print the full result dictionary
\"""
```"""
    return "- def web_agent(task: str, target_url: str = None) -> Dict: # Employs a web browser to navigate and interact with web pages to accomplish a specific task. Note that the web agent is limited to downloading files and cannot process or analyze them."
def __call__(self, task: str, **kwargs):  # allow *args styled calling
    """Run the agent on ``task``; a plain pass-through to the base class."""
    outcome = super().__call__(task, **kwargs)
    return outcome
def init_run(self, session):
    """Create the web environment for ``session`` and register it by session id.

    The HTTP-API backed ``WebEnv`` is used when the ``web_ip`` health check
    succeeds; otherwise the builtin Playwright env is started through
    ``ThreadedWebEnv``.
    """
    super().init_run(session)
    session_id = session.id
    assert session_id not in self.web_envs
    env_kwargs = self.web_env_kwargs.copy()
    target_url = session.info.get("target_url")
    if target_url:
        env_kwargs["starting_target_url"] = target_url
    env_kwargs["logger"] = self.logger  # hand the logger over to the web env
    # Pick the web env implementation automatically: prefer the HTTP API and
    # fall back to the builtin Playwright env when it is unreachable.
    web_ip = env_kwargs.get("web_ip", "localhost:3000")
    if not self._test_web_ip_connection(web_ip):
        if self.logger:
            self.logger.info("[WEB_AGENT] HTTP API unavailable, using builtin")
        # Use the builtin implementation; only forward the options it accepts.
        accepted = ("starting_target_url", "logger", "headless", "max_browsers", "web_timeout")
        builtin_kwargs = {key: env_kwargs[key] for key in env_kwargs if key in accepted}
        # Run builtin PlaywrightWebEnv entirely on a dedicated thread to avoid asyncio-loop conflicts
        self.web_envs[session_id] = ThreadedWebEnv(**builtin_kwargs)
        return
    if self.logger:
        self.logger.info("[WEB_AGENT] Using HTTP API (web_ip: %s)", web_ip)
    self.web_envs[session_id] = WebEnv(**env_kwargs)
def _test_web_ip_connection(self, web_ip: str) -> bool:
"""测试web_ip连接性"""
try:
import requests
response = requests.get(f"http://{web_ip}/health", timeout=5)
return response.status_code == 200
except Exception:
return False
def end_run(self, session):
    """Finish the run and tear down the session's web environment.

    The environment is removed from ``self.web_envs`` and stopped even when
    ``super().end_run`` raises, so a failed run neither leaks the browser nor
    leaves a stale entry behind. A session without a registered environment
    is tolerated.

    Returns:
        Whatever ``super().end_run(session)`` returns.
    """
    try:
        ret = super().end_run(session)
    finally:
        # Unregister first so the entry is gone even if stop() itself fails.
        web_env = self.web_envs.pop(session.id, None)
        if web_env is not None:
            web_env.stop()
    return ret
def step_call(self, messages, session, model=None):
    """Call an LLM on ``messages`` and return its response.

    An explicitly passed ``model`` is always used. Otherwise the multimodal
    model is chosen when the session requests it (``session.info["use_multimodal"]``)
    or the messages contain images, and the regular model in all other cases.
    """
    wants_multimodal = session.info.get("use_multimodal", False) or have_images_in_messages(messages)
    if model is not None:
        chosen = model
    elif wants_multimodal:
        chosen = self.model_multimodal
    else:
        chosen = self.model
    return chosen(messages)
def step_prepare(self, session, state):
    """Assemble the prompt inputs for the next step from the current web state.

    Adds the current page info, the previous step's page info (keys suffixed
    with ``_old``; ``web_page_old`` is "N/A" on the first step), the ``html_md``
    excerpt, and an error/warning note when the page has not changed recently.

    Returns:
        tuple: ``(input_kwargs, extra_kwargs)``, with the session's web env
        placed in ``extra_kwargs["web_env"]``.
    """
    input_kwargs, extra_kwargs = super().step_prepare(session, state)
    web_env = self.web_envs[session.id]
    web_state = web_env.get_state()
    page_now = self._prep_page(web_state)
    input_kwargs.update(page_now)  # update for the current one
    if session.num_of_steps() > 1:  # has previous step
        step_before = session.get_specific_step(-2)  # the step before
        page_before = self._prep_page(step_before["action"]["web_state_before"], suffix="_old")
        input_kwargs.update(page_before)
    else:
        input_kwargs["web_page_old"] = "N/A"
    input_kwargs["html_md"] = self._prep_html_md(web_state)
    # --
    # check web page differences
    if session.num_of_steps() >= self.check_nodiff_steps and self.check_nodiff_steps > 1:
        recent_pages = [
            self._prep_page(step["action"]["web_state_before"])
            for step in session.get_latest_steps(count=self.check_nodiff_steps - 1)
        ]
        recent_pages.append(page_now)
        first_page = recent_pages[0]
        if all(page == first_page for page in recent_pages):  # error
            # Instrumentation: stuck on the same page for the whole window.
            if self.logger:
                self.logger.warning("[WEB_FALLBACK] Trigger: stuck_same_page | Method: stop_function | Result: error_message_added | Impact: task_termination")
            input_kwargs["web_page"] += "\n(* Error: Notice that we have been stuck at the same page for many steps, use the `stop` function to terminate and report related errors!!)"
        elif recent_pages[-1] == recent_pages[-2]:  # warning
            # Instrumentation: the page did not change since the last step.
            if self.logger:
                self.logger.debug("[WEB_DECISION] page_unchanged -> warning_message")
            input_kwargs["web_page"] += "\n(* Warning: Notice that the web page has not been changed.)"
    # --
    extra_kwargs["web_env"] = web_env
    return input_kwargs, extra_kwargs
def step_action(self, action_res, action_input_kwargs, web_env=None, **kwargs):
    """Execute the decided action on the browser and return its observation.

    The web state before the action is stored in place under
    ``action_res["web_state_before"]``. A non-ActionResult outcome of the base
    ``step_action`` is turned into a ``nop`` browser action whose result is the
    stringified observation.

    Returns:
        The action's own result when it has one, otherwise the browser's step
        result; ``"Browser error: ..."`` when the browser interaction raised.
    """
    action_res["web_state_before"] = web_env.get_state()  # inplace storage of the web-state before the action
    executed = super().step_action(action_res, action_input_kwargs)  # get action from code execution
    if isinstance(executed, ActionResult):
        action_str = executed.action
        action_result = executed.result
    else:
        obs_text = self.get_obs_str(None, obs=executed, add_seq_enum=False)
        action_str = "nop"  # no-operation
        action_result = obs_text.strip()
    log = self.logger
    # Instrumentation: before the browser action runs.
    if log:
        url_before = web_env.get_state().get('current_url', 'unknown')
        log.info("[WEB_BROWSER] Executing: %s", action_str)
        log.debug("[WEB_STATE] Before_URL: %s", url_before)
    # state step
    try:  # execute the action on the browser
        step_result = web_env.step_state(action_str)
        # use action result if there are direct ones
        ret = step_result if action_result is None else action_result
        web_env.sync_files()
        # Instrumentation: after the browser action ran.
        if log:
            url_after = web_env.get_state().get('current_url', 'unknown')
            log.info("[WEB_BROWSER] Result: success | URL: %s", url_after)
            if url_after != url_before:
                log.info("[WEB_STATE] URL_Changed: %s -> %s", url_before, url_after)
    except Exception as e:
        zwarn("web_env execution error!")
        ret = f"Browser error: {e}"
        # Instrumentation: the browser action failed.
        if log:
            log.error("[WEB_BROWSER] Error: %s", str(e))
    return ret
# --
# other helpers
def _prep_page(self, web_state, suffix=""):
_ss = web_state
_ret = _ss["current_accessibility_tree"]
if _ss["error_message"]:
_ret = _ret + "\n(Note: " + _ss["error_message"] + ")"
elif _ss["current_has_cookie_popup"]:
_ret = _ret + "\n(Note: There is a cookie banner on the page, please accept the cookie banner.)"
ret = {"web_page": _ret, "downloaded_file_path": _ss["downloaded_file_path"]}
# --
if self.use_multimodal == 'on': # always on
ret["screenshot"] = _ss["boxed_screenshot"]
elif self.use_multimodal == 'off':
ret["screenshot_note"] = "The current system does not support webpage screenshots. Please refer to the accessibility tree to understand the current webpage."
else: # adaptive decision
if web_state.get("curr_screenshot_mode"): # currently on
ret["screenshot"] = _ss["boxed_screenshot"]
else:
ret["screenshot_note"] = "The current system's screenshot mode is off. If you need the screenshots, please use the corresponding action to turn it on."
# --
if suffix:
ret = {k+suffix: v for k, v in ret.items()}
return ret
def _prep_html_md(self, web_state):
_IGNORE_LINE_LEN = 7 # ignore md line if <= this
_LOCAL_WINDOW = 2 # -W -> +W
_budget = self.html_md_budget
if _budget <= 0:
return ""
# --
axtree, html_md = web_state["current_accessibility_tree"], web_state.get("html_md", "")
# first locate raw texts from axtree
axtree_texts = []
for line in axtree.split("\n"):
m = re.findall(r"(?:StaticText|link)\s+'(.*)'", line)
axtree_texts.extend(m)
# then locate to the html ones
html_lines = [z for z in html_md.split("\n") if z.strip() and len(z) > _IGNORE_LINE_LEN]
hit_lines = set()
_last_hit = 0
for one_t in axtree_texts:
_curr = _last_hit
while _curr < len(html_lines):
if one_t in html_lines[_curr]: # hit
hit_lines.update([ii for ii in range(_curr-_LOCAL_WINDOW, _curr+_LOCAL_WINDOW+1) if ii>=0 and ii<len(html_lines)]) # add local window
_last_hit = _curr
break
_curr += 1
# get the contents
_last_idx = -1
_all_addings = []
_all_adding_lines = []
for line_idx in sorted(hit_lines):
if _budget < 0:
break
_line = html_lines[line_idx].rstrip()
adding = f"...\n{_line}" if (line_idx > _last_idx+1) else _line
_all_addings.append(adding)
_all_adding_lines.append(line_idx)
_budget -= len(adding.encode()) # with regard to bytes!
_last_idx = line_idx
while _budget > 0: # add more lines if we still have budget
_last_idx = _last_idx + 1
if _last_idx >= len(html_lines):
break
_line = html_lines[_last_idx].rstrip()
_all_addings.append(_line)
_all_adding_lines.append(_last_idx)
_budget -= len(_line.encode()) # with regard to bytes!
if _last_idx < len(html_lines):
_all_addings.append("...")
final_ret = "\n".join(_all_addings)
return final_ret
def set_multimodal(self, use_multimodal):
    """Update ``self.use_multimodal``; ``None`` leaves the current value untouched."""
    if use_multimodal is None:
        return
    self.use_multimodal = use_multimodal
def get_multimodal(self):
    """Return the current ``use_multimodal`` setting."""
    current_mode = self.use_multimodal
    return current_mode