File size: 33,673 Bytes
1ea26af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
#

# utils for our web-agent

import re
import os
import subprocess
import signal
import time
import requests
import base64
import markdownify
from ..agents.utils import KwargsInitializable, rprint, zwarn, zlog

# --
# web state
class WebState:
    """Mutable record of one browser page plus the agent's step bookkeeping.

    Every field is a plain instance attribute declared in ``__init__``;
    ``update`` rejects keys that were not declared there, so the attribute
    list below is the complete schema.
    """

    def __init__(self, **kwargs):
        """Create a state with default values, then apply ``kwargs`` via ``update``."""
        # not-changed
        self.browser_id = ""
        self.page_id = ""
        self.target_url = ""
        # from tree-results
        self.get_accessibility_tree_succeed = False
        self.current_accessibility_tree = ""
        self.step_url = ""
        self.html_md = ""
        self.snapshot = ""
        self.boxed_screenshot = ""  # always store the screenshot here
        self.downloaded_file_path = []
        self.current_has_cookie_popup = False
        self.expanded_part = None
        # step info
        self.curr_step = 0  # step to the root
        self.curr_screenshot_mode = False  # whether we are using screenshot or not?
        self.total_actual_step = 0  # [no-rev] total actual steps including reverting (can serve as ID)
        self.num_revert_state = 0  # [no-rev] number of state reversion
        # (last) action information
        self.action_string = ""
        self.action = None
        self.error_message = ""
        # --
        self.update(**kwargs)

    def get_id(self):  # use these as ID
        """Return the ``(browser_id, page_id, total_actual_step)`` identity tuple."""
        return (self.browser_id, self.page_id, self.total_actual_step)

    def update(self, **kwargs):
        """Overwrite existing attributes; unknown keys fail the assertion."""
        for k, v in kwargs.items():
            assert (k in self.__dict__), f"Attribute not found for {k} <- {v}"
        self.__dict__.update(**kwargs)

    def to_dict(self):
        """Return a shallow copy of the attribute dict (mutable values are shared)."""
        return self.__dict__.copy()

    def copy(self):
        """Return a new ``WebState`` built from ``to_dict()`` (shallow copy)."""
        return WebState(**self.to_dict())

    @classmethod
    def create_from_dict(cls, data):
        """Build a state from a dict such as the one produced by ``to_dict``.

        ``WebEnv.reset_to_state`` calls ``WebState.create_from_dict`` for dict
        inputs, so the constructor has to live on this class.
        """
        return cls(**data)

    def __repr__(self):
        return f"WebState({self.__dict__})"

# --
class MyMarkdownify(markdownify.MarkdownConverter):
    """Markdown converter that drops images and keeps only ``http(s)`` links."""

    def convert_img(self, el, text, parent_tags):
        """Render every image as the empty string."""
        return ""

    def convert_a(self, el, text, parent_tags):
        """Render an anchor.

        Anchors with blank text disappear; anchors whose ``href`` does not
        start with ``http`` are reduced to their stripped text; all others
        become ``[text](href)``.
        """
        label = text.strip() if text else ""
        if not label:
            return ""  # nothing visible to keep
        href = el.get("href") or ""
        if href.startswith(("http", "https")):
            return f"[{label}]({href})"
        return label  # relative / missing target: keep the text only

    @staticmethod
    def md_convert(html: str):
        """Convert ``html`` to markdown, right-stripping lines and dropping empty ones."""
        rendered = MyMarkdownify().convert(html)
        trimmed = (raw.rstrip() for raw in rendered.split("\n"))
        return "\n".join(row for row in trimmed if row)

    @classmethod
    def create_from_dict(cls, data):
        """Instantiate ``cls`` with ``data`` expanded as keyword arguments.

        NOTE(review): ``WebEnv.reset_to_state`` calls
        ``WebState.create_from_dict``, so this helper presumably belongs on
        ``WebState`` rather than on the converter - confirm.
        """
        return cls(**data)

# an opened web browser
class WebEnv(KwargsInitializable):
    def __init__(self, settings=None, starting=True, starting_target_url=None, logger=None, **kwargs):
        # Use configuration from settings - unified web config from [web.env]
        if settings and hasattr(settings, 'web') and hasattr(settings.web, 'env'):
            self.web_ip = settings.web.env.web_ip
            self.web_command = settings.web.env.web_command
            self.web_timeout = settings.web.env.web_timeout
            self.screenshot_boxed = settings.web.env.screenshot_boxed
            self.target_url = settings.web.env.target_url
        else:
            # Fallback defaults if no settings provided
            self.web_ip = "localhost:3000"
            self.web_command = ""
            self.web_timeout = 600
            self.screenshot_boxed = True
            self.target_url = "https://www.bing.com/"
        self.web_ip = settings.web.env.web_ip  # use TOML config from [web.env]
        self.web_command = settings.web.env.web_command  # use TOML config
        self.web_timeout = settings.web.env.web_timeout  # use TOML config
        # self.use_screenshot = False  # add screenshot? -> for simplicity, always store it!
        self.screenshot_boxed = settings.web.env.screenshot_boxed  # use TOML config
        # self.target_url = "https://duckduckgo.com/"  # by default
        self.target_url = settings.web.env.target_url  # use TOML config
        # self.target_url = "https://duckduckgo.com/"  # by default
        self.logger = logger  # 诊断日志器
        # --
        super().__init__(**kwargs)
        # --
        self.state: WebState = None
        self.popen = None  # popen obj for subprocess running
        if starting:
            self.start(starting_target_url)  # start at the beginning
        # --

    def start(self, target_url=None):
        self.stop()  # stop first
        # --
        # optionally start one
        if self.web_command:
            self.popen = subprocess.Popen(self.web_command, shell=True, preexec_fn=os.setsid)  # make a new one
            time.sleep(15)  # wait for some time
            rprint(f"Web-Utils-Start {self.popen}")
        # --
        target_url = target_url if target_url is not None else self.target_url  # otherwise use default
        ### hard code: replace google to bing
        if 'www.google.com' in target_url:
            if not 'www.google.com/maps' in target_url:
                target_url = target_url.replace('www.google.com', 'www.bing.com')
        self.init_state(target_url)

    def stop(self):
        if self.state is not None:
            self.end_state()
            self.state = None
        if self.popen is not None:
            os.killpg(self.popen.pid, signal.SIGKILL)  # kill the PG
            self.popen.kill()
            time.sleep(1)  # slightly wait
            rprint(f"Web-Utils-Kill {self.popen} with {self.popen.poll()}")
            self.popen = None

    def __del__(self):
        self.stop()

    # note: return a copy!
    def get_state(self, export_to_dict=True, return_copy=True):
        assert self.state is not None, "Current state is None, should first start it!"
        if export_to_dict:
            ret = self.state.to_dict()
        elif return_copy:
            ret = self.state.copy()
        else:
            ret = self.state
        return ret

    def get_target_url(self):
        return self.target_url

    # --
    # helpers

    def get_browser(self, storage_state, geo_location):
        """POST to ``/getBrowser`` and return the ``browserId`` from the reply.

        Raises:
            requests.RequestException: when the status code is not 200.
        """
        endpoint = f"http://{self.web_ip}/getBrowser"
        payload = {"storageState": storage_state, "geoLocation": geo_location}

        # instrumentation: outgoing get-browser request
        if self.logger:
            self.logger.info("[WEB_HTTP] Get_Browser_Request: %s", endpoint)
            self.logger.debug("[WEB_HTTP] Get_Browser_Data: %s", payload)

        response = requests.post(endpoint, json=payload, timeout=self.web_timeout)

        if response.status_code != 200:
            # instrumentation: get-browser failed
            if self.logger:
                self.logger.error("[WEB_HTTP] Get_Browser_Failed: Status: %s | Response: %s",
                                response.status_code, response.text)
            raise requests.RequestException(f"Getting browser failed: {response}")

        browser_data = response.json()
        zlog(f"==> Get browser {browser_data}")
        # instrumentation: get-browser succeeded
        if self.logger:
            self.logger.info("[WEB_HTTP] Get_Browser_Success: %s", browser_data)
        return browser_data["browserId"]

    def close_browser(self, browser_id):
        """POST to ``/closeBrowser``; failures are only warned about.

        Always returns ``None``: a non-200 reply or a
        ``requests.RequestException`` is reported through ``zwarn``.
        """
        zlog(f"==> Closing browser {browser_id}")
        endpoint = f"http://{self.web_ip}/closeBrowser"
        payload = {"browserId": browser_id}
        try:  # best effort: never let closing raise a request error
            response = requests.post(endpoint, json=payload, timeout=self.web_timeout)
            if response.status_code != 200:
                zwarn(f"Bad response when closing browser: {response}")
        except requests.RequestException as e:
            zwarn(f"Request Error: {e}")
        return None

    def open_page(self, browser_id, target_url):
        """POST to ``/openPage`` and return the ``pageId`` from the reply.

        Raises:
            requests.RequestException: when the status code is not 200.
        """
        endpoint = f"http://{self.web_ip}/openPage"
        payload = {"browserId": browser_id, "url": target_url}

        # instrumentation: outgoing open-page request
        if self.logger:
            self.logger.info("[WEB_HTTP] Open_Page_Request: %s", endpoint)
            self.logger.info("[WEB_HTTP] Open_Page_Data: Browser: %s | Target: %s", browser_id, target_url)

        response = requests.post(endpoint, json=payload, timeout=self.web_timeout)

        if response.status_code != 200:
            # instrumentation: open-page failed
            if self.logger:
                self.logger.error("[WEB_HTTP] Open_Page_Failed: Status: %s | Response: %s",
                                response.status_code, response.text)
            raise requests.RequestException(f"Open page Request failed: {response}")

        page_data = response.json()
        # instrumentation: open-page succeeded
        if self.logger:
            self.logger.info("[WEB_HTTP] Open_Page_Success: %s", page_data)
        return page_data["pageId"]

    def goto_url(self, browser_id, page_id, target_url):
        """POST to ``/gotoUrl``; return ``True`` on a 200 reply.

        Raises:
            requests.RequestException: when the status code is not 200.
        """
        endpoint = f"http://{self.web_ip}/gotoUrl"
        payload = {"browserId": browser_id, "pageId": page_id, "targetUrl": target_url}
        response = requests.post(endpoint, json=payload, timeout=self.web_timeout)
        if response.status_code != 200:
            raise requests.RequestException(f"GOTO page Request failed: {response}")
        return True

    def process_html(self, html: str):
        """Convert ``html`` to markdown; whitespace-only input is returned unchanged."""
        return MyMarkdownify.md_convert(html) if html.strip() else html

    def process_axtree(self, res_json):
        # --
        def _parse_tree_str(_s):
            if "[2]" in _s:
                _lines = _s.split("[2]", 1)[1].split("\n")
                _lines = [z for z in _lines if z.strip().startswith("[")]
                _lines = [" ".join(z.split()[1:]) for z in _lines]
                return _lines
            else:
                return []
        # --
        def _process_tree_str(_s):
            _s = _s.strip()
            if _s.startswith("Tab 0 (current):"):  # todo(+N): sometimes this line can be strange, simply remove it!
                _s = _s.split("\n", 1)[-1].strip()
            return _s
        # --
        html_md = self.process_html(res_json.get("html", ""))
        AccessibilityTree = _process_tree_str(res_json.get("yaml", ""))
        curr_url = res_json.get("url", "")
        snapshot = res_json.get("snapshot", "")
        fulltree = _process_tree_str(res_json.get("fulltree", ""))
        screenshot = res_json.get("boxed_screenshot", "") if self.screenshot_boxed else res_json.get("nonboxed_screenshot", "")
        downloaded_file_path = res_json.get("downloaded_file_path", [])
        all_at, all_ft = _parse_tree_str(AccessibilityTree), _parse_tree_str(fulltree)
        # all_ft_map = {v: i for i, v in enumerate(all_ft)}
        all_ft_map = {}
        for ii, vv in enumerate(all_ft):
            if vv not in all_ft_map:  # no overwritten to get the minumum one
                all_ft_map[vv] = ii
        _hit_at_idxes = [all_ft_map[z] for z in all_at if z in all_ft_map]
        if _hit_at_idxes:
            _last_hit_idx = max(_hit_at_idxes)
            _remaining = len(all_ft) - (_last_hit_idx + 1)
            if _remaining >= len(_hit_at_idxes) * 0.5:  # note: a simple heuristic
                AccessibilityTree = AccessibilityTree.strip() + "\n(* Scroll down to see more items)"
        # --
        ret = {"current_accessibility_tree": AccessibilityTree, "step_url": curr_url, "html_md": html_md, "snapshot": snapshot, "boxed_screenshot": screenshot, "downloaded_file_path": downloaded_file_path}
        return ret

    def get_accessibility_tree(self, browser_id, page_id, current_round):
        """POST to ``/getAccessibilityTree`` and return ``(succeeded, result_dict)``.

        On a 200 reply the JSON body is passed through ``process_axtree``.
        A non-200 reply or a ``requests.RequestException`` yields ``False``
        together with a dict of empty defaults.
        """
        endpoint = f"http://{self.web_ip}/getAccessibilityTree"
        payload = {
            "browserId": browser_id,
            "pageId": page_id,
            "currentRound": current_round,
        }
        # what callers get back when the tree cannot be fetched
        empty_result = {"current_accessibility_tree": "", "step_url": "", "html_md": "", "snapshot": "", "boxed_screenshot": "", "downloaded_file_path": []}
        try:
            response = requests.post(endpoint, json=payload, timeout=self.web_timeout)
            if response.status_code != 200:
                zwarn(f"Get accessibility tree Request failed with status code: {response.status_code}")
                return False, empty_result
            return True, self.process_axtree(response.json())
        except requests.RequestException as e:
            zwarn(f"Request failed: {e}")
            return False, empty_result

    def action(self, browser_id, page_id, action):
        """POST one parsed action to ``/performAction``.

        ``action`` must carry the keys ``action_name``, ``target_id``,
        ``target_element_type``, ``target_element_name``, ``action_value``
        and ``need_enter``. Returns ``True`` on a 200 reply and ``False`` on
        any other status or on a ``requests.RequestException``.
        """
        endpoint = f"http://{self.web_ip}/performAction"
        payload = dict(
            browserId=browser_id,
            pageId=page_id,
            actionName=action["action_name"],
            targetId=action["target_id"],
            targetElementType=action["target_element_type"],
            targetElementName=action["target_element_name"],
            actionValue=action["action_value"],
            needEnter=action["need_enter"],
        )

        # instrumentation: HTTP request details
        if self.logger:
            self.logger.info("[WEB_HTTP] Request_URL: %s", endpoint)
            self.logger.info("[WEB_HTTP] Request_Data: %s", payload)
            self.logger.debug("[WEB_HTTP] Timeout: %s seconds", self.web_timeout)

        try:
            response = requests.post(endpoint, json=payload, timeout=self.web_timeout)
            succeeded = response.status_code == 200

            # instrumentation: HTTP response details
            if self.logger:
                self.logger.info("[WEB_HTTP] Response_Status: %s", response.status_code)
                if not succeeded:
                    self.logger.error("[WEB_HTTP] Response_Text: %s", response.text)

            if not succeeded:
                zwarn(f"Request failed with status code: {response.status_code} {response.text}")
            return succeeded
        except requests.RequestException as e:
            # instrumentation: HTTP request raised
            if self.logger:
                self.logger.error("[WEB_HTTP] Request_Exception: %s", str(e))
            zwarn(f"Request failed: {e}")
            return False

    # --
    # other helpers

    def is_annoying(self, current_accessbility_tree):
        """Tell whether the tree is just the short "See results closer to you?" prompt.

        True only when that phrase is present and the tree has at most 10 lines.
        """
        has_prompt = "See results closer to you?" in current_accessbility_tree
        return has_prompt and len(current_accessbility_tree.split("\n")) <= 10

    def parse_action_string(self, action_string: str, state):
        """Parse a textual browser command into an action dict.

        The first pattern (in the order listed below) that matches the start
        of ``action_string`` decides ``action_name``; matching ignores case
        and lets ``.`` span newlines. ``click``/``type`` fill ``target_id``;
        ``type``/``scroll``/``stop``/``goto``/``save``/``screenshot`` fill
        ``action_value``. An unrecognised or empty string leaves
        ``action_name`` as ``""``. ``state`` is currently unused.
        """
        action = {"action_name": "", "target_id": None, "action_value": None, "need_enter": None, "target_element_type": None, "target_element_name": None}  # assuming these fields
        if not action_string:
            return action
        grammar = (
            ("click", r"click\s+\[?(\d+)\]?"),
            ("type", r"type\s+\[?(\d+)\]?\s+\{?(.+)\}?"),
            ("scroll", r"scroll\s+(down|up)"),
            ("wait", "wait"),
            ("goback", "goback"),
            ("restart", "restart"),
            ("stop", r"stop(.*)"),
            ("goto", r"goto(.*)"),
            ("save", r"save(.*)"),
            ("screenshot", r"screenshot(.*)"),
            ("nop", r"nop(.*)"),
        )
        takes_target = ("click", "type")
        takes_value = ("type", "scroll", "stop", "goto", "save", "screenshot")
        match_flags = re.IGNORECASE | re.DOTALL  # ignore case and allow \n
        for name, regex in grammar:
            found = re.match(regex, action_string, flags=match_flags)
            if found is None:
                continue
            captured = found.groups()
            action["action_name"] = name
            if name in takes_target:
                action["target_id"] = captured[0]  # target ID
            if name in takes_value:
                value = captured[-1].strip()  # target value
                if name == "type":
                    # quick fix: the greedy capture may swallow the closing brace/quotes
                    value = value.rstrip("}]").rstrip().strip("\"'").strip()
                action["action_value"] = value
            break
        return action

    @staticmethod
    def find_target_element_info(current_accessibility_tree, target_id, action_name):
        if target_id is None:
            return None, None, None
        if action_name == "type":
            tree_to_check = current_accessibility_tree.split("\n")[int(target_id) - 1:]
            for i, line in enumerate(tree_to_check):
                if f"[{target_id}]" in line and ("combobox" in line or "box" not in line):
                    num_tabs = len(line) - len(line.lstrip("\t"))
                    for j in range(i + 1, len(tree_to_check)):
                        curr_num_tabs = len(tree_to_check[j]) - len(tree_to_check[j].lstrip("\t"))
                        if curr_num_tabs <= num_tabs:
                            break
                        if "textbox" in tree_to_check[j] or "searchbox" in tree_to_check[j]:
                            target_element_id = tree_to_check[j].split("]")[0].strip()[1:]
                            # print("CATCHED ONE MISSED TYPE ACTION, changing the type action to", target_element_id)
                            target_id = target_element_id
        target_pattern = r"\[" + re.escape(target_id) + r"\] ([a-z]+) '(.*)'"
        matches = re.finditer(target_pattern, current_accessibility_tree, re.IGNORECASE)
        for match in matches:
            target_element_type, target_element_name = match.groups()
            return target_id, target_element_type, target_element_name
        return target_id, None, None

    @staticmethod
    def get_skip_action(current_accessbility_tree):
        # action_name, target_id, action_value, need_enter = extract_info_from_action("click [5]")
        action_name, target_id, action_value, need_enter = "click", "5", "", None
        target_id, target_element_type, target_element_name = WebEnv.find_target_element_info(current_accessbility_tree, target_id, action_name)
        return {
            "action_name": action_name,
            "target_id": target_id,
            "action_value": action_value,
            "need_enter": need_enter,
            "target_element_type": target_element_type,
            "target_element_name": target_element_name,
        }

    @staticmethod
    def check_if_menu_is_expanded(accessibility_tree, snapshot):
        node_to_expand = {}
        lines = accessibility_tree.split("\n")
        for i, line in enumerate(lines):
            if 'hasPopup: menu' in line and 'expanded: true' in line:
                num_tabs = len(line) - len(line.lstrip("\t"))
                next_tabs = len(lines[i + 1]) - len(lines[i + 1].lstrip("\t"))
                if next_tabs <= num_tabs:
                    # In this case, the menu should be expanded but is not present in the tree
                    target_pattern = r"\[(\d+)\] ([a-z]+) '(.*)'"
                    matches = re.finditer(target_pattern, line, re.IGNORECASE)
                    target_id = None
                    target_element_type = None
                    target_element_name = None
                    for match in matches:
                        target_id, target_element_type, target_element_name = match.groups()
                        break
                    if target_element_type is not None:
                        # locate the menu items from the snapshot instead
                        children = WebEnv.find_node_with_children(snapshot, target_element_type, target_element_name)
                        if children is not None:
                            node_to_expand[i] = (num_tabs + 1, children, target_id, target_element_type, target_element_name)
        new_lines = []
        curr = 1
        if len(node_to_expand) == 0:
            return accessibility_tree, None
        expanded_part = {}
        # add the menu items to the correct location in the tree
        for i, line in enumerate(lines):
            if not line.strip().startswith('['):
                new_lines.append(line)
                continue
            num_tabs = len(line) - len(line.lstrip("\t"))
            content = line.split('] ')[1]
            new_lines.append('\t' * num_tabs + f"[{curr}] {content}")
            curr += 1
            if i in node_to_expand:
                for child in node_to_expand[i][1]:
                    child_content = f"{child.get('role', '')} '{child.get('name', '')}' " + ' '.join([f"{k}: {v}" for k, v in child.items() if k not in ['role', 'name']])
                    tabs = '\t' * node_to_expand[i][0]
                    new_lines.append(f"{tabs}[{curr}] {child_content}")
                    expanded_part[curr] = (node_to_expand[i][2], node_to_expand[i][3], node_to_expand[i][4])
                    curr += 1
        return '\n'.join(new_lines), expanded_part

    @staticmethod
    def find_node_with_children(node, target_role, target_name):
        # Check if the current node matches the target role and name
        if node.get('role') == target_role and node.get('name') == target_name:
            return node.get('children', None)
        # If the node has children, recursively search through them
        children = node.get('children', [])
        for child in children:
            result = WebEnv.find_node_with_children(child, target_role, target_name)
            if result is not None:
                return result
        # If no matching node is found, return None
        return None

    # --
    # main step

    def init_state(self, target_url: str):
        """Open a fresh browser and page on ``target_url`` and store the new state.

        Requests a browser, opens the page, builds a ``WebState`` at step 0,
        fills it with the first accessibility-tree results and assigns it to
        ``self.state``.
        """
        log = self.logger
        # instrumentation: browser initialization begins
        if log:
            log.info("[WEB_INIT] Starting browser initialization")
            log.info("[WEB_INIT] Target_URL: %s", target_url)
            log.info("[WEB_INIT] Web_IP: %s", self.web_ip)

        browser_id = self.get_browser(None, None)
        # instrumentation: browser created
        if log:
            log.info("[WEB_INIT] Browser_Created: %s", browser_id)

        page_id = self.open_page(browser_id, target_url)
        # instrumentation: page opened
        if log:
            log.info("[WEB_INIT] Page_Opened: %s", page_id)

        # both step counters start from 0
        fresh = WebState(browser_id=browser_id, page_id=page_id, target_url=target_url, curr_step=0, total_actual_step=0)
        fresh.update(**self._get_accessibility_tree_results(fresh))

        # instrumentation: state initialized
        if log:
            actual_url = getattr(fresh, 'step_url', 'unknown')
            log.info("[WEB_INIT] State_Initialized: Actual_URL: %s", actual_url)
            if actual_url != target_url:
                log.warning("[WEB_INIT] URL_Mismatch: Expected: %s | Actual: %s", target_url, actual_url)

        self.state = fresh  # set the new state!
        # --

    def end_state(self):
        state = self.state
        self.close_browser(state.browser_id)

    def reset_to_state(self, target_state):
        """Bring the live state back to ``target_state``.

        Args:
            target_state: A ``WebState`` or a dict of its attributes (as
                produced by ``WebState.to_dict``).

        Returns:
            ``True`` when the ids differed and the page was navigated back to
            ``target_state.step_url``; ``False`` when the ids already matched.
            In the latter case the two states are asserted to be identical.
        """
        state = self.state
        if isinstance(target_state, dict):
            # Build directly through the constructor: WebState itself defines
            # no create_from_dict, so going through it raised AttributeError.
            target_state = WebState(**target_state)
        # assert state.browser_id == target_state.browser_id and state.page_id == target_state.page_id, "Mismatched basic IDs"
        if state.get_id() != target_state.get_id():  # need to revert to another URL
            self.goto_url(target_state.browser_id, target_state.page_id, target_state.step_url)
            state.update(browser_id=target_state.browser_id, page_id=target_state.page_id)
            results = self._get_accessibility_tree_results(state)
            state.update(**results)  # update it!
            # --
            # revert other state info
            state.update(curr_step=target_state.curr_step, action_string=target_state.action_string, action=target_state.action, error_message=target_state.error_message)  # no change of total_step!
            state.num_revert_state += 1
            # --
            zlog(f"Reset state with URL={target_state.step_url}")
            return True
        else:
            assert state.to_dict() == target_state.to_dict(), "Mismatched state!"
            zlog("No need for state resetting!")
            return False
        # --

    def _get_accessibility_tree_results(self, state):
        get_accessibility_tree_succeed, curr_res = self.get_accessibility_tree(state.browser_id, state.page_id, state.curr_step)
        current_accessibility_tree = curr_res.get("current_accessibility_tree", "")
        if not get_accessibility_tree_succeed:
            zwarn("Failed to get current_accessibility_tree!!")
        if self.is_annoying(current_accessibility_tree):
            skip_this_action = self.get_skip_action(current_accessibility_tree)
            self.action(state.browser_id, state.page_id, skip_this_action)
            get_accessibility_tree_succeed, curr_res = self.get_accessibility_tree(state.browser_id, state.page_id, state.curr_step)
        # try to close cookie popup
        if "Cookie banner" in current_accessibility_tree:
            current_has_cookie_popup = True  # note: only mark here!
        else:
            current_has_cookie_popup = False
        current_accessibility_tree, expanded_part = self.check_if_menu_is_expanded(current_accessibility_tree, curr_res["snapshot"])
        # --
        # if (not self.use_screenshot) and ("boxed_screenshot" in curr_res):  # note: no storing of snapshot since it is too much
        #     del curr_res["boxed_screenshot"]  # for simplicity, always store it
        # --
        # more checking on axtree
        if not current_accessibility_tree or ("[2]" not in current_accessibility_tree):  # at least we should have some elements!
            curr_res["current_accessibility_tree"] = current_accessibility_tree + "\n**Warning**: The accessibility tree is currently unavailable. Please try some alternative actions. If the issue persists after multiple attempts, consider goback or restart."
        # --
        curr_res.update(get_accessibility_tree_succeed=get_accessibility_tree_succeed, current_has_cookie_popup=current_has_cookie_popup, expanded_part=expanded_part)
        return curr_res

    def step_state(self, action_string: str):
        """Parse ``action_string``, run the resulting action and update ``self.state``.

        A ``[NOENTER]`` marker anywhere in ``action_string`` is removed before
        parsing; when present, a ``type`` action is tagged ``need_enter=False``.
        The step counters on the state are incremented on every call, including
        calls whose action turns out to be malformed.

        Handling by parsed action name:

        * empty name: the action is reported as not well-formatted and
          ``state.error_message`` is set.
        * ``stop`` / ``save`` / ``nop``: nothing is executed.
        * ``screenshot``: sets ``state.curr_screenshot_mode`` from the first
          field of the action value and, if a second field is given, writes the
          decoded ``state.boxed_screenshot`` to that path.
        * anything else: forwarded to ``self.action``; on success the state is
          refreshed from ``self._get_accessibility_tree_results``.

        Args:
            action_string: Raw action text to parse and execute.

        Returns:
            A message string describing the executed step, or the error message
            when the action could not be parsed or executed.
        """
        state = self.state

        # Trace point: the web environment starts handling an action.
        if self.logger:
            self.logger.info("[WEB_ENV] Step_State_Start: %s", action_string)
            self.logger.debug("[WEB_ENV] Current_URL: %s", getattr(state, 'step_url', 'unknown'))

        # "[NOENTER]" is a side-channel marker, not part of the action syntax:
        # remember whether it was present, then strip it before parsing.
        need_enter = "[NOENTER]" not in action_string
        action_string = action_string.replace("[NOENTER]", "").strip()
        action = self.parse_action_string(action_string, state)

        # Trace point: result of action parsing.
        if self.logger:
            self.logger.info("[WEB_ENV] Parsed_Action: %s", action)
        if action["action_name"]:
            if action["action_name"] in ["click", "type"]:  # these need the target element resolved
                target_id, target_element_type, target_element_name = self.find_target_element_info(state.current_accessibility_tree, action["target_id"], action["action_name"])
                if state.expanded_part and int(target_id) in state.expanded_part:
                    # The target is listed in the expanded part: rewrite the action
                    # as a "select" on the mapped element, using the target's name as the value.
                    expand_target_id, expand_target_type, expand_target_name = state.expanded_part[int(target_id)]
                    action.update({"action_name": "select", "target_id": expand_target_id, "action_value": target_element_name, "target_element_type": expand_target_type, "target_element_name": expand_target_name})
                else:
                    action.update({"target_id": target_id, "target_element_type": target_element_type, "target_element_name": target_element_name})
            if action["action_name"] == "type":
                action["need_enter"] = need_enter
        zlog(f"[CallWeb:{state.curr_step}:{state.total_actual_step}] ACTION={action} ACTION_STR={action_string}", timed=True)

        # Execution: counters advance regardless of how the action turns out.
        state.curr_step += 1
        state.total_actual_step += 1
        state.update(action=action, action_string=action_string, error_message="")  # reset the error before running
        if not action["action_name"]:  # unknown / unparsable action
            state.error_message = f"The action you previously choose is not well-formatted: {action_string}. Please double-check if you have selected the correct element or used correct action format."
            ret = state.error_message
            # Trace point: malformed action.
            if self.logger:
                self.logger.error("[WEB_ENV] Action_Parse_Error: %s", action_string)
        elif action["action_name"] in ["stop", "save", "nop"]:  # nothing to execute
            ret = f"Browser step: {action_string}"
            # Trace point: simple action.
            if self.logger:
                self.logger.info("[WEB_ENV] Simple_Action: %s", action["action_name"])
        elif action["action_name"] == "screenshot":
            _old_mode = state.curr_screenshot_mode
            # Pad with empty strings so both fields can always be indexed.
            _fields = action["action_value"].split() + [""] * 2
            _new_mode = _fields[0].lower() in ["1", "true", "yes"]
            _save_path = _fields[1].strip()
            if _save_path:
                try:
                    # Explicit check instead of `assert`, which is removed under
                    # `python -O` and would let an empty file be written silently.
                    if not state.boxed_screenshot.strip():
                        raise ValueError("Screenshot not available!")
                    file_bytes = base64.b64decode(state.boxed_screenshot)
                    _dir = os.path.dirname(_save_path)
                    if _dir:
                        os.makedirs(_dir, exist_ok=True)
                    with open(_save_path, 'wb') as fd:
                        fd.write(file_bytes)
                    save_info = f" (Current screenshot saved to {_save_path}.)"
                except Exception as e:
                    # Best effort: a failed save is reported in the returned text, not raised.
                    save_info = f" (Error {e} when saving screenshot.)"
            else:
                save_info = ""
            state.curr_screenshot_mode = _new_mode
            ret = f"Browser step: {action_string} -> Changing curr_screenshot_mode from {_old_mode} to {_new_mode}" + save_info
        else:
            # Actually perform the action in the browser.
            # Trace point: about to execute a browser action.
            if self.logger:
                self.logger.info("[WEB_ENV] Executing_Browser_Action: %s | Browser_ID: %s | Page_ID: %s",
                                action["action_name"], state.browser_id, state.page_id)

            action_succeed = self.action(state.browser_id, state.page_id, action)

            if not action_succeed:  # execution failed
                state.error_message = f"The action you have chosen cannot be executed: {action_string}. Please double-check if you have selected the correct element or used correct action format."
                ret = state.error_message
                # Trace point: browser action failed.
                if self.logger:
                    self.logger.error("[WEB_ENV] Browser_Action_Failed: %s", action_string)
            else:  # fetch the new page state
                # Trace point: browser action succeeded, fetching new state.
                if self.logger:
                    self.logger.info("[WEB_ENV] Browser_Action_Success: %s", action_string)
                    self.logger.debug("[WEB_ENV] Getting_New_State...")

                results = self._get_accessibility_tree_results(state)
                state.update(**results)  # merge the refreshed page info into the state
                ret = f"Browser step: {action_string}"

                # Trace point: state update finished.
                if self.logger:
                    new_url = getattr(state, 'step_url', 'unknown')
                    self.logger.info("[WEB_ENV] State_Updated: New_URL: %s", new_url)
        return ret
        # --

    # sync files between remote and local dirs
    def sync_files(self):
        """Fetch every path in ``self.state.downloaded_file_path`` that is missing locally.

        Each missing file is requested from the ``/getFile`` endpoint at
        ``self.web_ip``, base64-decoded and written to the same path locally.
        A per-file status summary is logged at the end; nothing is returned.
        """
        def _fetch(_f: str):
            # Download one file; returns True on success, False on any failure.
            endpoint = f"http://{self.web_ip}/getFile"
            try:
                reply = requests.post(endpoint, json={"filename": _f}, timeout=self.web_timeout)
                if reply.status_code != 200:
                    zwarn(f"Get file failed with status code: {reply.status_code}")
                    return False
                content = base64.b64decode(reply.json()["file"])
                # Create the parent directory only when the path actually has one.
                parent_dir = os.path.dirname(_f) if _f else ""
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                with open(_f, 'wb') as out:
                    out.write(content)
                return True
            except Exception as e:
                # Best effort: any failure is logged and reported as False.
                zwarn(f"Request failed: {e}")
                return False

        summary = {}
        for path in self.state.downloaded_file_path:
            if os.path.exists(path):
                summary[path] = "Exist"
            else:
                summary[path] = f"Get[res={_fetch(path)}]"
        zlog(f"Sync files: {summary}")

    def screenshot_mode(self, flag=None):
        """Read and optionally set the screenshot mode stored on ``self.state``.

        Args:
            flag: New value for ``state.curr_screenshot_mode``. ``None`` (the
                default) leaves the mode unchanged.

        Returns:
            A ``(old_mode, new_mode)`` tuple: the mode before the call and the
            mode in effect after it. The two are equal when ``flag`` is ``None``.
        """
        old_mode = self.state.curr_screenshot_mode
        if flag is None:  # pure query, nothing to change
            return old_mode, old_mode
        self.state.curr_screenshot_mode = flag
        # Report the value that was just set; previously the old mode was
        # returned twice even after the mode had changed.
        return old_mode, flag