File size: 18,220 Bytes
1ea26af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#

import os
import re
import shutil
import urllib.request
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor

from ..agents.agent import MultiStepAgent, register_template, ActionResult
from ..agents.model import LLM
from ..agents.utils import zwarn, rprint, have_images_in_messages
from ..agents.tool import SimpleSearchTool

from .utils import WebEnv
from .playwright_utils import PlaywrightWebEnv
from .prompts import PROMPTS as WEB_PROMPTS

# --
# pre-defined actions: simply convert things to str
def web_click(id: int, link_name=""):
    """Build the `click` action for element `id`, optionally tagged with its link name."""
    return ActionResult("click [{}] {}".format(id, link_name))


def web_type(id: int, content: str, enter=True):
    """Build the `type` action for element `id`; `[NOENTER]` is appended when `enter` is falsy."""
    command = "type [{}] {}".format(id, content)
    if not enter:
        command += "[NOENTER]"
    return ActionResult(command)


def web_scroll_up():
    """Build the `scroll up` action."""
    return ActionResult("scroll up")


def web_scroll_down():
    """Build the `scroll down` action."""
    return ActionResult("scroll down")


def web_wait():
    """Build the `wait` action."""
    return ActionResult("wait")


def web_goback():
    """Build the `goback` action."""
    return ActionResult("goback")


def web_restart():
    """Build the `restart` action."""
    return ActionResult("restart")


def web_goto(url: str):
    """Build the `goto` action for `url`."""
    return ActionResult("goto {}".format(url))
class ThreadedWebEnv:
    """A thin proxy that runs the builtin PlaywrightWebEnv entirely on a dedicated thread.

    Every call is submitted to a single-worker executor, which ensures sync
    Playwright APIs never execute on an asyncio event-loop thread.
    """

    def __init__(self, **kwargs):
        """Start the worker thread and construct the real env on it.

        Args:
            **kwargs: Forwarded unchanged to ``PlaywrightWebEnv``.

        Raises:
            Whatever constructing ``PlaywrightWebEnv`` raises. The executor is
            shut down first so a failed construction does not leave an idle
            worker thread behind.
        """
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ck_web_env")
        self._env = None

        def _create():
            # Import here so tests can monkeypatch ck_pro.ck_web.playwright_utils.PlaywrightWebEnv
            from .playwright_utils import PlaywrightWebEnv as _PWE
            return _PWE(**kwargs)

        # Construct the real env on the dedicated thread
        try:
            self._env = self._executor.submit(_create).result()
        except BaseException:
            # Do not leak the worker thread when the env cannot be built
            self._executor.shutdown(wait=True)
            raise

    def _call(self, fn_name, *args, **kwargs):
        """Run ``self._env.<fn_name>(*args, **kwargs)`` on the worker thread and return its result."""
        def _invoke():
            return getattr(self._env, fn_name)(*args, **kwargs)
        return self._executor.submit(_invoke).result()

    # Public methods used by WebAgent
    def get_state(self):
        """Proxy for ``env.get_state()``."""
        return self._call("get_state")

    def step_state(self, action_string: str) -> str:
        """Proxy for ``env.step_state(action_string)``."""
        return self._call("step_state", action_string)

    def sync_files(self):
        """Proxy for ``env.sync_files()``."""
        return self._call("sync_files")

    def stop(self):
        """Stop the underlying env on its own thread, then shut the executor down.

        Safe to call more than once: once the env has been released, further
        calls only (re-)shut down the executor, which is a no-op.

        Raises:
            Whatever ``env.stop()`` or ``browser_pool.stop()`` raises; the env
            reference is cleared and the executor is shut down regardless.
        """
        if self._env is None:
            # Already stopped (or never built): nothing to clean up on the worker
            self._executor.shutdown(wait=True)
            return

        def _cleanup():
            # Detach first so the env is released even if stopping it fails
            env, self._env = self._env, None
            if env is None:
                return
            try:
                env.stop()
            finally:
                pool = getattr(env, "browser_pool", None)
                if pool:
                    pool.stop()

        try:
            self._executor.submit(_cleanup).result()
        finally:
            self._executor.shutdown(wait=True)

# def web_stop(answer, summary): return ActionResult(f"stop [{answer}] ({summary})")  # use self-defined function!
# --

class WebAgent(MultiStepAgent):
    def __init__(self, settings=None, logger=None, **kwargs):
        """Set up the web agent's defaults, multimodal model, prompt templates and browser actions.

        Args:
            settings: Stored as-is on `self.settings`.
            logger: Optional logger stored on `self.logger`; the methods of this class only log
                when it is truthy.
            **kwargs: Overrides merged over the default agent arguments before they are passed to
                `MultiStepAgent.__init__`. A truthy `model` entry is additionally copied and used
                to build the multimodal LLM.
        """
        # note: this is a little tricky since things will get re-init again in super().__init__
        feed_kwargs = dict(
            name="web_agent",
            description="A web agent helping to browse and operate web pages to solve a specific task.",
            templates={"plan": "web_plan", "action": "web_action", "end": "web_end"},  # template names
            max_steps=16,
        )
        feed_kwargs.update(kwargs)
        self.logger = logger  # logger passed in from outside
        self.settings = settings  # Store settings reference
        self.web_env_kwargs = {}  # kwargs for web env
        self.check_nodiff_steps = 3  # if for 3 steps, we have the same web page, then explicitly indicating this!
        self.html_md_budget = 0  # budget in bytes (around 4 bytes per token, for example: 2K bytes ~ 500 tokens; 0 means not using this)
        self.use_multimodal = "auto"  # "on" / "off" force screenshots on or off in _prep_page; "auto" defers to the env's current screenshot mode
        # Use same model config as main model for multimodal (if provided); otherwise lazy init
        multimodal_kwargs = kwargs.get('model', {}).copy() if kwargs.get('model') else None
        if multimodal_kwargs:
            self.model_multimodal = LLM(**multimodal_kwargs)
        else:
            # Lazy/default init to avoid validation errors when not needed
            self.model_multimodal = LLM(_default_init=True)

        # Fuse mechanism is fully automatic - no manual configuration needed

        # self.searcher = SimpleSearchTool(max_results=16, list_enum=False)  # use more!
        # --
        register_template(WEB_PROMPTS)  # add web prompts
        super().__init__(**feed_kwargs)
        self.web_envs = {}  # session_id -> ENV
        # browser actions that are simply rendered to action strings
        self.ACTIVE_FUNCTIONS.update(click=web_click, type=web_type, scroll_up=web_scroll_up, scroll_down=web_scroll_down, wait=web_wait, goback=web_goback, restart=web_restart, goto=web_goto)
        # self.ACTIVE_FUNCTIONS.update(stop=self._my_stop, save=self._my_save, search=self._my_search)
        # agent-specific actions implemented as bound methods
        self.ACTIVE_FUNCTIONS.update(stop=self._my_stop, save=self._my_save, screenshot=self._my_screenshot)
        # --

    # note: a specific stop function!
    def _my_stop(self, answer: str = None, summary: str = None, output: str = None):
        """Record the final answer and return the `stop` action.

        A truthy `output` takes precedence over `answer`; the formatted text is
        stored via `put_final_result` and also carried in the returned result.
        """
        final_value = output if output else answer
        final_text = f"Final answer: [{final_value}] ({summary})"
        self.put_final_result(final_text)  # mark end and put final result
        return ActionResult("stop", final_text)

    # note: special save
    def _my_save(self, remote_path: str, local_path: str):
        """Copy or download `remote_path` to `local_path` and report the outcome.

        The parent directory of `local_path` is created when needed. Nothing is
        transferred when both paths are equal. http(s) sources are downloaded,
        anything else is copied as a local file. Any exception is caught and
        reported in the returned `save` action instead of being raised.
        """
        try:
            target_dir = os.path.dirname(local_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)
            if local_path != remote_path:
                remote_path = remote_path.strip()
                is_web_source = remote_path.startswith(("http://", "https://"))
                if is_web_source:
                    # retrieve from the web
                    urllib.request.urlretrieve(remote_path, local_path)
                else:
                    # plain local copy
                    shutil.copyfile(remote_path, local_path)
            message = f"Save Succeed: from remote_path = {remote_path} to local_path = {local_path}"
        except Exception as e:
            message = f"Save Failed with {e}: from remote_path = {remote_path} to local_path = {local_path}"
        return ActionResult("save", message)

    # note: whether use the screenshot mode
    def _my_screenshot(self, flag: bool, save_path=""):
        """Build the `screenshot` action; `flag` is rendered as 0/1, followed by `save_path`."""
        command = "screenshot {} {}".format(int(flag), save_path)
        return ActionResult(command)

    def get_function_definition(self, short: bool):
        if short:
            return "- def web_agent(task: str, target_url: str = None) -> Dict:  # Employs a web browser to navigate and interact with web pages to accomplish a specific task. Note that the web agent is limited to downloading files and cannot process or analyze them."
        else:
            return """- web_agent
```python
def web_agent(task: str) -> dict:
    \""" Employs a web browser to navigate and interact with web pages to accomplish a specific task.
    Args:
        task (str): A detailed description of the task to perform. This may include:
            - The target website(s) to visit (include valid URLs).
            - Specific output formatting requirements.
            - Instructions to download files (specify desired output path if needed).
    Returns:
        dict: A dictionary with the following structure:
            {
                'output': <str>  # The well-formatted answer, strictly following any specified output format.
                'log': <str>     # Additional notes, such as steps taken, issues encountered, or relevant context.
            }
    Notes:
        - If the `task` specifies an output format, ensure the 'output' field matches it exactly.
        - The web agent can download files, but cannot process or analyze them. If file analysis is required, save the file to a local path and return control to an external planner or file agent for further processing.
    Example:
        >>> answer = web_agent(task="What is the current club of Messi? (Format your output directly as 'club_name'.)")
        >>> print(answer)  # directly print the full result dictionary
    \"""
```"""

    def __call__(self, task: str, **kwargs):  # allow *args styled calling
        """Run the agent on `task` by delegating to the base class with the same keyword arguments."""
        outcome = super().__call__(task, **kwargs)
        return outcome

    def init_run(self, session):
        """Create and register the web environment for `session`.

        The HTTP-API-backed `WebEnv` is used when the health probe of `web_ip`
        succeeds; otherwise the builtin Playwright env is started behind
        `ThreadedWebEnv` with only the keyword arguments it accepts here.
        """
        super().init_run(session)
        session_id = session.id
        assert session_id not in self.web_envs
        env_kwargs = self.web_env_kwargs.copy()
        target_url = session.info.get("target_url")
        if target_url:
            env_kwargs["starting_target_url"] = target_url
        env_kwargs["logger"] = self.logger  # hand the logger over to the web env

        # Pick the web env implementation automatically: HTTP API first, builtin Playwright as fallback
        web_ip = env_kwargs.get("web_ip", "localhost:3000")
        api_available = self._test_web_ip_connection(web_ip)

        if not api_available:
            if self.logger:
                self.logger.info("[WEB_AGENT] HTTP API unavailable, using builtin")
            # builtin implementation: keep only the supported options
            supported_keys = ("starting_target_url", "logger", "headless", "max_browsers", "web_timeout")
            builtin_kwargs = {}
            for key, value in env_kwargs.items():
                if key in supported_keys:
                    builtin_kwargs[key] = value
            # Run builtin PlaywrightWebEnv entirely on a dedicated thread to avoid asyncio-loop conflicts
            self.web_envs[session_id] = ThreadedWebEnv(**builtin_kwargs)
            return

        if self.logger:
            self.logger.info("[WEB_AGENT] Using HTTP API (web_ip: %s)", web_ip)
        self.web_envs[session_id] = WebEnv(**env_kwargs)

    def _test_web_ip_connection(self, web_ip: str) -> bool:
        """测试web_ip连接性"""
        try:
            import requests
            response = requests.get(f"http://{web_ip}/health", timeout=5)
            return response.status_code == 200
        except Exception:
            return False

    def end_run(self, session):
        """Finish the run and always release the session's web environment.

        The env is removed from `self.web_envs` and stopped even when the base
        class' `end_run` raises, and it is removed from the registry before
        `stop()` is called so a failing `stop()` cannot leave a stale entry
        behind (which would trip the assertion in a later `init_run`).

        Returns:
            Whatever the base class' `end_run` returns.
        """
        try:
            ret = super().end_run(session)
        finally:
            # pop first: the registry entry must go away even if stop() fails
            env = self.web_envs.pop(session.id, None)
            if env is not None:
                env.stop()
        return ret

    def step_call(self, messages, session, model=None):
        """Call a model on `messages` and return its response.

        When no `model` is given, the multimodal model is chosen if the session
        info sets `use_multimodal` or the messages contain images; otherwise
        the regular model is used.
        """
        wants_images = session.info.get("use_multimodal", False) or have_images_in_messages(messages)
        if model is None:
            # decide which model to use
            if wants_images:
                model = self.model_multimodal
            else:
                model = self.model
        return model(messages)

    def step_prepare(self, session, state):
        """Extend the base step inputs with the current/previous page and stuck-page hints.

        Adds the prepared current page, the previous step's page (suffix
        `_old`, or "N/A" for `web_page_old` when there is none), the `html_md`
        excerpt, and appends a warning or error note to `web_page` when the
        page has not changed recently. The web env is passed on through the
        extra kwargs as `web_env`.
        """
        input_kwargs, extra_kwargs = super().step_prepare(session, state)
        env = self.web_envs[session.id]
        current_state = env.get_state()
        current_page = self._prep_page(current_state)
        input_kwargs.update(current_page)  # update for the current one
        if session.num_of_steps() <= 1:
            input_kwargs["web_page_old"] = "N/A"
        else:
            # there is a previous step: expose the page it started from
            previous_step = session.get_specific_step(-2)
            previous_page = self._prep_page(previous_step["action"]["web_state_before"], suffix="_old")
            input_kwargs.update(previous_page)
        input_kwargs["html_md"] = self._prep_html_md(current_state)
        # --
        # check web page differences
        window = self.check_nodiff_steps
        if session.num_of_steps() >= window and window > 1:
            recent_steps = session.get_latest_steps(count=window - 1)
            pages = [self._prep_page(step["action"]["web_state_before"]) for step in recent_steps]
            pages.append(current_page)
            reference_page = pages[0]
            if all(page == reference_page for page in pages):  # error
                # instrumentation: stuck on the same page for the whole window
                if self.logger:
                    self.logger.warning("[WEB_FALLBACK] Trigger: stuck_same_page | Method: stop_function | Result: error_message_added | Impact: task_termination")
                input_kwargs["web_page"] += "\n(* Error: Notice that we have been stuck at the same page for many steps, use the `stop` function to terminate and report related errors!!)"
            elif pages[-1] == pages[-2]:  # warning
                # instrumentation: the page did not change since the last step
                if self.logger:
                    self.logger.debug("[WEB_DECISION] page_unchanged -> warning_message")
                input_kwargs["web_page"] += "\n(* Warning: Notice that the web page has not been changed.)"
        # --
        extra_kwargs["web_env"] = env
        return input_kwargs, extra_kwargs

    def step_action(self, action_res, action_input_kwargs, web_env=None, **kwargs):
        """Execute the step's action on the browser and return the observation.

        The web state before the action is stored in place in
        `action_res["web_state_before"]`. A base-class result that is not an
        `ActionResult` is turned into a `nop` browser action whose observation
        text becomes the result. Browser-side exceptions are caught and
        returned as a "Browser error: ..." string.
        """
        action_res["web_state_before"] = web_env.get_state()  # inplace storage of the web-state before the action
        outcome = super().step_action(action_res, action_input_kwargs)  # get action from code execution
        if not isinstance(outcome, ActionResult):
            obs_text = self.get_obs_str(None, obs=outcome, add_seq_enum=False)
            action_str, action_result = "nop", obs_text.strip()  # no-operation
        else:
            action_str, action_result = outcome.action, outcome.result

        # instrumentation: before the browser action
        if self.logger:
            current_state = web_env.get_state()
            current_url = current_state.get('current_url', 'unknown')
            self.logger.info("[WEB_BROWSER] Executing: %s", action_str)
            self.logger.debug("[WEB_STATE] Before_URL: %s", current_url)

        # state step
        try:  # execute the action on the browser
            browser_obs = web_env.step_state(action_str)
            # a direct action result, when present, wins over the browser's observation
            if action_result is None:
                ret = browser_obs
            else:
                ret = action_result
            web_env.sync_files()

            # instrumentation: after the browser action
            if self.logger:
                new_state = web_env.get_state()
                new_url = new_state.get('current_url', 'unknown')
                self.logger.info("[WEB_BROWSER] Result: success | URL: %s", new_url)
                if new_url != current_url:
                    self.logger.info("[WEB_STATE] URL_Changed: %s -> %s", current_url, new_url)

        except Exception as e:
            zwarn("web_env execution error!")
            ret = f"Browser error: {e}"
            # instrumentation: the browser action failed
            if self.logger:
                self.logger.error("[WEB_BROWSER] Error: %s", str(e))
        return ret

    # --
    # other helpers

    def _prep_page(self, web_state, suffix=""):
        _ss = web_state
        _ret = _ss["current_accessibility_tree"]
        if _ss["error_message"]:
            _ret = _ret + "\n(Note: " + _ss["error_message"] + ")"
        elif _ss["current_has_cookie_popup"]:
            _ret = _ret + "\n(Note: There is a cookie banner on the page, please accept the cookie banner.)"
        ret = {"web_page": _ret, "downloaded_file_path": _ss["downloaded_file_path"]}
        # --
        if self.use_multimodal == 'on':  # always on
            ret["screenshot"] = _ss["boxed_screenshot"]
        elif self.use_multimodal == 'off':
            ret["screenshot_note"] = "The current system does not support webpage screenshots. Please refer to the accessibility tree to understand the current webpage."
        else:  # adaptive decision
            if web_state.get("curr_screenshot_mode"):  # currently on
                ret["screenshot"] = _ss["boxed_screenshot"]
            else:
                ret["screenshot_note"] = "The current system's screenshot mode is off. If you need the screenshots, please use the corresponding action to turn it on."
        # --
        if suffix:
            ret = {k+suffix: v for k, v in ret.items()}
        return ret

    def _prep_html_md(self, web_state):
        _IGNORE_LINE_LEN = 7  # ignore md line if <= this
        _LOCAL_WINDOW = 2  # -W -> +W
        _budget = self.html_md_budget
        if _budget <= 0:
            return ""
        # --
        axtree, html_md = web_state["current_accessibility_tree"], web_state.get("html_md", "")
        # first locate raw texts from axtree
        axtree_texts = []
        for line in axtree.split("\n"):
            m = re.findall(r"(?:StaticText|link)\s+'(.*)'", line)
            axtree_texts.extend(m)
        # then locate to the html ones
        html_lines = [z for z in html_md.split("\n") if z.strip() and len(z) > _IGNORE_LINE_LEN]
        hit_lines = set()
        _last_hit = 0
        for one_t in axtree_texts:
            _curr = _last_hit
            while _curr < len(html_lines):
                if one_t in html_lines[_curr]: # hit
                    hit_lines.update([ii for ii in range(_curr-_LOCAL_WINDOW, _curr+_LOCAL_WINDOW+1) if ii>=0 and ii<len(html_lines)])  # add local window
                    _last_hit = _curr
                    break
                _curr += 1
        # get the contents
        _last_idx = -1
        _all_addings = []
        _all_adding_lines = []
        for line_idx in sorted(hit_lines):
            if _budget < 0:
                break
            _line = html_lines[line_idx].rstrip()
            adding = f"...\n{_line}" if (line_idx > _last_idx+1) else _line
            _all_addings.append(adding)
            _all_adding_lines.append(line_idx)
            _budget -= len(adding.encode())  # with regard to bytes!
            _last_idx = line_idx
        while _budget > 0:  # add more lines if we still have budget
            _last_idx = _last_idx + 1
            if _last_idx >= len(html_lines):
                break
            _line = html_lines[_last_idx].rstrip()
            _all_addings.append(_line)
            _all_adding_lines.append(_last_idx)
            _budget -= len(_line.encode())  # with regard to bytes!
        if _last_idx < len(html_lines):
            _all_addings.append("...")
        final_ret = "\n".join(_all_addings)
        return final_ret

    def set_multimodal(self, use_multimodal):
        if use_multimodal is not None:
            self.use_multimodal = use_multimodal

    def get_multimodal(self):
        return self.use_multimodal