# # utils for our web-agent import re import os import subprocess import signal import time import requests import base64 import markdownify from ..agents.utils import KwargsInitializable, rprint, zwarn, zlog # -- # web state class WebState: def __init__(self, **kwargs): # not-changed self.browser_id = "" self.page_id = "" self.target_url = "" # from tree-results self.get_accessibility_tree_succeed = False self.current_accessibility_tree = "" self.step_url = "" self.html_md = "" self.snapshot = "" self.boxed_screenshot = "" # always store the screenshot here self.downloaded_file_path = [] self.current_has_cookie_popup = False self.expanded_part = None # step info self.curr_step = 0 # step to the root self.curr_screenshot_mode = False # whether we are using screenshot or not? self.total_actual_step = 0 # [no-rev] total actual steps including reverting (can serve as ID) self.num_revert_state = 0 # [no-rev] number of state reversion # (last) action information self.action_string = "" self.action = None self.error_message = "" # -- self.update(**kwargs) def get_id(self): # use these as ID return (self.browser_id, self.page_id, self.total_actual_step) def update(self, **kwargs): for k, v in kwargs.items(): assert (k in self.__dict__), f"Attribute not found for {k} <- {v}" self.__dict__.update(**kwargs) def to_dict(self): return self.__dict__.copy() def copy(self): return WebState(**self.to_dict()) def __repr__(self): return f"WebState({self.__dict__})" # -- class MyMarkdownify(markdownify.MarkdownConverter): def convert_img(self, el, text, parent_tags): return "" # simply ignore image def convert_a(self, el, text, parent_tags): if (not text) or (not text.strip()): return "" # empty text = text.strip() # simply strip! href = el.get("href") if not href: href = "" if not any(href.startswith(z) for z in ["http", "https"]): ret = text # simply no links # ret = "" # more aggressively remove things! (nope, removing too much...) else: ret = f"[{text}]({href})" return ret @staticmethod def md_convert(html: str): html_md = MyMarkdownify().convert(html) valid_lines = [] for line in html_md.split("\n"): line = line.rstrip() if not line: continue valid_lines.append(line) ret = "\n".join(valid_lines) return ret @classmethod def create_from_dict(cls, data): """Create WebState instance from dictionary""" return cls(**data) # an opened web browser class WebEnv(KwargsInitializable): def __init__(self, settings=None, starting=True, starting_target_url=None, logger=None, **kwargs): # Use configuration from settings - unified web config from [web.env] if settings and hasattr(settings, 'web') and hasattr(settings.web, 'env'): self.web_ip = settings.web.env.web_ip self.web_command = settings.web.env.web_command self.web_timeout = settings.web.env.web_timeout self.screenshot_boxed = settings.web.env.screenshot_boxed self.target_url = settings.web.env.target_url else: # Fallback defaults if no settings provided self.web_ip = "localhost:3000" self.web_command = "" self.web_timeout = 600 self.screenshot_boxed = True self.target_url = "https://www.bing.com/" self.web_ip = settings.web.env.web_ip # use TOML config from [web.env] self.web_command = settings.web.env.web_command # use TOML config self.web_timeout = settings.web.env.web_timeout # use TOML config # self.use_screenshot = False # add screenshot? -> for simplicity, always store it! self.screenshot_boxed = settings.web.env.screenshot_boxed # use TOML config # self.target_url = "https://duckduckgo.com/" # by default self.target_url = settings.web.env.target_url # use TOML config # self.target_url = "https://duckduckgo.com/" # by default self.logger = logger # 诊断日志器 # -- super().__init__(**kwargs) # -- self.state: WebState = None self.popen = None # popen obj for subprocess running if starting: self.start(starting_target_url) # start at the beginning # -- def start(self, target_url=None): self.stop() # stop first # -- # optionally start one if self.web_command: self.popen = subprocess.Popen(self.web_command, shell=True, preexec_fn=os.setsid) # make a new one time.sleep(15) # wait for some time rprint(f"Web-Utils-Start {self.popen}") # -- target_url = target_url if target_url is not None else self.target_url # otherwise use default ### hard code: replace google to bing if 'www.google.com' in target_url: if not 'www.google.com/maps' in target_url: target_url = target_url.replace('www.google.com', 'www.bing.com') self.init_state(target_url) def stop(self): if self.state is not None: self.end_state() self.state = None if self.popen is not None: os.killpg(self.popen.pid, signal.SIGKILL) # kill the PG self.popen.kill() time.sleep(1) # slightly wait rprint(f"Web-Utils-Kill {self.popen} with {self.popen.poll()}") self.popen = None def __del__(self): self.stop() # note: return a copy! def get_state(self, export_to_dict=True, return_copy=True): assert self.state is not None, "Current state is None, should first start it!" if export_to_dict: ret = self.state.to_dict() elif return_copy: ret = self.state.copy() else: ret = self.state return ret def get_target_url(self): return self.target_url # -- # helpers def get_browser(self, storage_state, geo_location): url = f"http://{self.web_ip}/getBrowser" data = {"storageState": storage_state, "geoLocation": geo_location} # 埋点:获取浏览器请求 if self.logger: self.logger.info("[WEB_HTTP] Get_Browser_Request: %s", url) self.logger.debug("[WEB_HTTP] Get_Browser_Data: %s", data) response = requests.post(url, json=data, timeout=self.web_timeout) if response.status_code == 200: browser_data = response.json() zlog(f"==> Get browser {browser_data}") # 埋点:获取浏览器成功 if self.logger: self.logger.info("[WEB_HTTP] Get_Browser_Success: %s", browser_data) return browser_data["browserId"] else: # 埋点:获取浏览器失败 if self.logger: self.logger.error("[WEB_HTTP] Get_Browser_Failed: Status: %s | Response: %s", response.status_code, response.text) raise requests.RequestException(f"Getting browser failed: {response}") def close_browser(self, browser_id): url = f"http://{self.web_ip}/closeBrowser" data = {"browserId": browser_id} zlog(f"==> Closing browser {browser_id}") try: # put try here response = requests.post(url, json=data, timeout=self.web_timeout) if response.status_code == 200: return None else: zwarn(f"Bad response when closing browser: {response}") except requests.RequestException as e: zwarn(f"Request Error: {e}") return None def open_page(self, browser_id, target_url): url = f"http://{self.web_ip}/openPage" data = {"browserId": browser_id, "url": target_url} # 埋点:打开页面请求 if self.logger: self.logger.info("[WEB_HTTP] Open_Page_Request: %s", url) self.logger.info("[WEB_HTTP] Open_Page_Data: Browser: %s | Target: %s", browser_id, target_url) response = requests.post(url, json=data, timeout=self.web_timeout) if response.status_code == 200: page_data = response.json() # 埋点:打开页面成功 if self.logger: self.logger.info("[WEB_HTTP] Open_Page_Success: %s", page_data) return page_data["pageId"] else: # 埋点:打开页面失败 if self.logger: self.logger.error("[WEB_HTTP] Open_Page_Failed: Status: %s | Response: %s", response.status_code, response.text) raise requests.RequestException(f"Open page Request failed: {response}") def goto_url(self, browser_id, page_id, target_url): url = f"http://{self.web_ip}/gotoUrl" data = {"browserId": browser_id, "pageId": page_id, "targetUrl": target_url} response = requests.post(url, json=data, timeout=self.web_timeout) if response.status_code == 200: return True else: raise requests.RequestException(f"GOTO page Request failed: {response}") def process_html(self, html: str): if not html.strip(): return html # empty return MyMarkdownify.md_convert(html) def process_axtree(self, res_json): # -- def _parse_tree_str(_s): if "[2]" in _s: _lines = _s.split("[2]", 1)[1].split("\n") _lines = [z for z in _lines if z.strip().startswith("[")] _lines = [" ".join(z.split()[1:]) for z in _lines] return _lines else: return [] # -- def _process_tree_str(_s): _s = _s.strip() if _s.startswith("Tab 0 (current):"): # todo(+N): sometimes this line can be strange, simply remove it! _s = _s.split("\n", 1)[-1].strip() return _s # -- html_md = self.process_html(res_json.get("html", "")) AccessibilityTree = _process_tree_str(res_json.get("yaml", "")) curr_url = res_json.get("url", "") snapshot = res_json.get("snapshot", "") fulltree = _process_tree_str(res_json.get("fulltree", "")) screenshot = res_json.get("boxed_screenshot", "") if self.screenshot_boxed else res_json.get("nonboxed_screenshot", "") downloaded_file_path = res_json.get("downloaded_file_path", []) all_at, all_ft = _parse_tree_str(AccessibilityTree), _parse_tree_str(fulltree) # all_ft_map = {v: i for i, v in enumerate(all_ft)} all_ft_map = {} for ii, vv in enumerate(all_ft): if vv not in all_ft_map: # no overwritten to get the minumum one all_ft_map[vv] = ii _hit_at_idxes = [all_ft_map[z] for z in all_at if z in all_ft_map] if _hit_at_idxes: _last_hit_idx = max(_hit_at_idxes) _remaining = len(all_ft) - (_last_hit_idx + 1) if _remaining >= len(_hit_at_idxes) * 0.5: # note: a simple heuristic AccessibilityTree = AccessibilityTree.strip() + "\n(* Scroll down to see more items)" # -- ret = {"current_accessibility_tree": AccessibilityTree, "step_url": curr_url, "html_md": html_md, "snapshot": snapshot, "boxed_screenshot": screenshot, "downloaded_file_path": downloaded_file_path} return ret def get_accessibility_tree(self, browser_id, page_id, current_round): url = f"http://{self.web_ip}/getAccessibilityTree" data = { "browserId": browser_id, "pageId": page_id, "currentRound": current_round, } default_axtree = "" # default empty default_res = {"current_accessibility_tree": default_axtree, "step_url": "", "html_md": "", "snapshot": "", "boxed_screenshot": "", "downloaded_file_path": []} try: response = requests.post(url, json=data, timeout=self.web_timeout) if response.status_code == 200: res_json = response.json() res_dict = self.process_axtree(res_json) return True, res_dict else: zwarn(f"Get accessibility tree Request failed with status code: {response.status_code}") return False, default_res except requests.RequestException as e: zwarn(f"Request failed: {e}") return False, default_res def action(self, browser_id, page_id, action): url = f"http://{self.web_ip}/performAction" data = { "browserId": browser_id, "pageId": page_id, "actionName": action["action_name"], "targetId": action["target_id"], "targetElementType": action["target_element_type"], "targetElementName": action["target_element_name"], "actionValue": action["action_value"], "needEnter": action["need_enter"], } # 埋点:HTTP 请求详情 if self.logger: self.logger.info("[WEB_HTTP] Request_URL: %s", url) self.logger.info("[WEB_HTTP] Request_Data: %s", data) self.logger.debug("[WEB_HTTP] Timeout: %s seconds", self.web_timeout) try: response = requests.post(url, json=data, timeout=self.web_timeout) # 埋点:HTTP 响应详情 if self.logger: self.logger.info("[WEB_HTTP] Response_Status: %s", response.status_code) if response.status_code != 200: self.logger.error("[WEB_HTTP] Response_Text: %s", response.text) if response.status_code == 200: return True else: zwarn(f"Request failed with status code: {response.status_code} {response.text}") return False except requests.RequestException as e: # 埋点:HTTP 请求异常 if self.logger: self.logger.error("[WEB_HTTP] Request_Exception: %s", str(e)) zwarn(f"Request failed: {e}") return False # -- # other helpers def is_annoying(self, current_accessbility_tree): if "See results closer to you?" in current_accessbility_tree and len(current_accessbility_tree.split("\n")) <= 10: return True return False def parse_action_string(self, action_string: str, state): patterns = {"click": r"click\s+\[?(\d+)\]?", "type": r"type\s+\[?(\d+)\]?\s+\{?(.+)\}?", "scroll": r"scroll\s+(down|up)", "wait": "wait", "goback": "goback", "restart": "restart", "stop": r"stop(.*)", "goto": r"goto(.*)", "save": r"save(.*)", "screenshot": r"screenshot(.*)", "nop": r"nop(.*)"} action = {"action_name": "", "target_id": None, "action_value": None, "need_enter": None, "target_element_type": None, "target_element_name": None} # assuming these fields if action_string: for key, pat in patterns.items(): m = re.match(pat, action_string, flags=(re.IGNORECASE|re.DOTALL)) # ignore case and allow \n if m: action["action_name"] = key if key in ["click", "type"]: action["target_id"] = m.groups()[0] # target ID if key in ["type", "scroll", "stop", "goto", "save", "screenshot"]: action["action_value"] = m.groups()[-1].strip() # target value if key == "type": # quick fix action["action_value"] = action["action_value"].rstrip("}]").rstrip().strip("\"'").strip() # if key == "restart": # action["action_value"] = state.target_url # restart break return action @staticmethod def find_target_element_info(current_accessibility_tree, target_id, action_name): if target_id is None: return None, None, None if action_name == "type": tree_to_check = current_accessibility_tree.split("\n")[int(target_id) - 1:] for i, line in enumerate(tree_to_check): if f"[{target_id}]" in line and ("combobox" in line or "box" not in line): num_tabs = len(line) - len(line.lstrip("\t")) for j in range(i + 1, len(tree_to_check)): curr_num_tabs = len(tree_to_check[j]) - len(tree_to_check[j].lstrip("\t")) if curr_num_tabs <= num_tabs: break if "textbox" in tree_to_check[j] or "searchbox" in tree_to_check[j]: target_element_id = tree_to_check[j].split("]")[0].strip()[1:] # print("CATCHED ONE MISSED TYPE ACTION, changing the type action to", target_element_id) target_id = target_element_id target_pattern = r"\[" + re.escape(target_id) + r"\] ([a-z]+) '(.*)'" matches = re.finditer(target_pattern, current_accessibility_tree, re.IGNORECASE) for match in matches: target_element_type, target_element_name = match.groups() return target_id, target_element_type, target_element_name return target_id, None, None @staticmethod def get_skip_action(current_accessbility_tree): # action_name, target_id, action_value, need_enter = extract_info_from_action("click [5]") action_name, target_id, action_value, need_enter = "click", "5", "", None target_id, target_element_type, target_element_name = WebEnv.find_target_element_info(current_accessbility_tree, target_id, action_name) return { "action_name": action_name, "target_id": target_id, "action_value": action_value, "need_enter": need_enter, "target_element_type": target_element_type, "target_element_name": target_element_name, } @staticmethod def check_if_menu_is_expanded(accessibility_tree, snapshot): node_to_expand = {} lines = accessibility_tree.split("\n") for i, line in enumerate(lines): if 'hasPopup: menu' in line and 'expanded: true' in line: num_tabs = len(line) - len(line.lstrip("\t")) next_tabs = len(lines[i + 1]) - len(lines[i + 1].lstrip("\t")) if next_tabs <= num_tabs: # In this case, the menu should be expanded but is not present in the tree target_pattern = r"\[(\d+)\] ([a-z]+) '(.*)'" matches = re.finditer(target_pattern, line, re.IGNORECASE) target_id = None target_element_type = None target_element_name = None for match in matches: target_id, target_element_type, target_element_name = match.groups() break if target_element_type is not None: # locate the menu items from the snapshot instead children = WebEnv.find_node_with_children(snapshot, target_element_type, target_element_name) if children is not None: node_to_expand[i] = (num_tabs + 1, children, target_id, target_element_type, target_element_name) new_lines = [] curr = 1 if len(node_to_expand) == 0: return accessibility_tree, None expanded_part = {} # add the menu items to the correct location in the tree for i, line in enumerate(lines): if not line.strip().startswith('['): new_lines.append(line) continue num_tabs = len(line) - len(line.lstrip("\t")) content = line.split('] ')[1] new_lines.append('\t' * num_tabs + f"[{curr}] {content}") curr += 1 if i in node_to_expand: for child in node_to_expand[i][1]: child_content = f"{child.get('role', '')} '{child.get('name', '')}' " + ' '.join([f"{k}: {v}" for k, v in child.items() if k not in ['role', 'name']]) tabs = '\t' * node_to_expand[i][0] new_lines.append(f"{tabs}[{curr}] {child_content}") expanded_part[curr] = (node_to_expand[i][2], node_to_expand[i][3], node_to_expand[i][4]) curr += 1 return '\n'.join(new_lines), expanded_part @staticmethod def find_node_with_children(node, target_role, target_name): # Check if the current node matches the target role and name if node.get('role') == target_role and node.get('name') == target_name: return node.get('children', None) # If the node has children, recursively search through them children = node.get('children', []) for child in children: result = WebEnv.find_node_with_children(child, target_role, target_name) if result is not None: return result # If no matching node is found, return None return None # -- # main step def init_state(self, target_url: str): # 埋点:开始初始化浏览器状态 if self.logger: self.logger.info("[WEB_INIT] Starting browser initialization") self.logger.info("[WEB_INIT] Target_URL: %s", target_url) self.logger.info("[WEB_INIT] Web_IP: %s", self.web_ip) browser_id = self.get_browser(None, None) # 埋点:浏览器创建成功 if self.logger: self.logger.info("[WEB_INIT] Browser_Created: %s", browser_id) page_id = self.open_page(browser_id, target_url) # 埋点:页面打开成功 if self.logger: self.logger.info("[WEB_INIT] Page_Opened: %s", page_id) curr_step = 0 state = WebState(browser_id=browser_id, page_id=page_id, target_url=target_url, curr_step=curr_step, total_actual_step=curr_step) # start from 0 results = self._get_accessibility_tree_results(state) state.update(**results) # update it! # 埋点:状态初始化完成 if self.logger: actual_url = getattr(state, 'step_url', 'unknown') self.logger.info("[WEB_INIT] State_Initialized: Actual_URL: %s", actual_url) if actual_url != target_url: self.logger.warning("[WEB_INIT] URL_Mismatch: Expected: %s | Actual: %s", target_url, actual_url) # -- self.state = state # set the new state! # -- def end_state(self): state = self.state self.close_browser(state.browser_id) def reset_to_state(self, target_state): state = self.state if isinstance(target_state, dict): target_state = WebState.create_from_dict(target_state) # assert state.browser_id == target_state.browser_id and state.page_id == target_state.page_id, "Mismatched basic IDs" if state.get_id() != target_state.get_id(): # need to revert to another URL self.goto_url(target_state.browser_id, target_state.page_id, target_state.step_url) state.update(browser_id=target_state.browser_id, page_id=target_state.page_id) results = self._get_accessibility_tree_results(state) state.update(**results) # update it! # -- # revert other state info state.update(curr_step=target_state.curr_step, action_string=target_state.action_string, action=target_state.action, error_message=target_state.error_message) # no change of total_step! state.num_revert_state += 1 # -- zlog(f"Reset state with URL={target_state.step_url}") return True else: assert state.to_dict() == target_state.to_dict(), "Mismatched state!" zlog("No need for state resetting!") return False # -- def _get_accessibility_tree_results(self, state): get_accessibility_tree_succeed, curr_res = self.get_accessibility_tree(state.browser_id, state.page_id, state.curr_step) current_accessibility_tree = curr_res.get("current_accessibility_tree", "") if not get_accessibility_tree_succeed: zwarn("Failed to get current_accessibility_tree!!") if self.is_annoying(current_accessibility_tree): skip_this_action = self.get_skip_action(current_accessibility_tree) self.action(state.browser_id, state.page_id, skip_this_action) get_accessibility_tree_succeed, curr_res = self.get_accessibility_tree(state.browser_id, state.page_id, state.curr_step) # try to close cookie popup if "Cookie banner" in current_accessibility_tree: current_has_cookie_popup = True # note: only mark here! else: current_has_cookie_popup = False current_accessibility_tree, expanded_part = self.check_if_menu_is_expanded(current_accessibility_tree, curr_res["snapshot"]) # -- # if (not self.use_screenshot) and ("boxed_screenshot" in curr_res): # note: no storing of snapshot since it is too much # del curr_res["boxed_screenshot"] # for simplicity, always store it # -- # more checking on axtree if not current_accessibility_tree or ("[2]" not in current_accessibility_tree): # at least we should have some elements! curr_res["current_accessibility_tree"] = current_accessibility_tree + "\n**Warning**: The accessibility tree is currently unavailable. Please try some alternative actions. If the issue persists after multiple attempts, consider goback or restart." # -- curr_res.update(get_accessibility_tree_succeed=get_accessibility_tree_succeed, current_has_cookie_popup=current_has_cookie_popup, expanded_part=expanded_part) return curr_res def step_state(self, action_string: str): state = self.state # 埋点:WebEnv 开始执行动作 if self.logger: self.logger.info("[WEB_ENV] Step_State_Start: %s", action_string) self.logger.debug("[WEB_ENV] Current_URL: %s", getattr(state, 'step_url', 'unknown')) # -- need_enter = True if "[NOENTER]" in action_string: need_enter = False action_string = action_string.replace("[NOENTER]", "") # note: ugly quick fix ... # -- action_string = action_string.strip() # parse action action = self.parse_action_string(action_string, state) # 埋点:动作解析结果 if self.logger: self.logger.info("[WEB_ENV] Parsed_Action: %s", action) if action["action_name"]: if action["action_name"] in ["click", "type"]: # need more handling target_id, target_element_type, target_element_name = self.find_target_element_info(state.current_accessibility_tree, action["target_id"], action["action_name"]) if state.expanded_part and int(target_id) in state.expanded_part: expand_target_id, expand_target_type, expand_target_name = state.expanded_part[int(target_id)] action.update({"action_name": "select", "target_id": expand_target_id, "action_value": target_element_name, "target_element_type": expand_target_type, "target_element_name": expand_target_name}) else: action.update({"target_id": target_id, "target_element_type": target_element_type, "target_element_name": target_element_name}) if action["action_name"] == "type": action["need_enter"] = need_enter zlog(f"[CallWeb:{state.curr_step}:{state.total_actual_step}] ACTION={action} ACTION_STR={action_string}", timed=True) # -- # execution state.curr_step += 1 state.total_actual_step += 1 state.update(action=action, action_string=action_string, error_message="") # first update some of the things if not action["action_name"]: # UNK action state.error_message = f"The action you previously choose is not well-formatted: {action_string}. Please double-check if you have selected the correct element or used correct action format." ret = state.error_message # 埋点:动作格式错误 if self.logger: self.logger.error("[WEB_ENV] Action_Parse_Error: %s", action_string) elif action["action_name"] in ["stop", "save", "nop"]: # ok, nothing to do ret = f"Browser step: {action_string}" # 埋点:简单动作执行 if self.logger: self.logger.info("[WEB_ENV] Simple_Action: %s", action["action_name"]) elif action["action_name"] == "screenshot": _old_mode = state.curr_screenshot_mode _fields = action["action_value"].split() + [""] * 2 _new_mode = _fields[0].lower() in ["1", "true", "yes"] _save_path = _fields[1].strip() if _save_path: try: assert state.boxed_screenshot.strip(), "Screenshot not available!" file_bytes = base64.b64decode(state.boxed_screenshot) _dir = os.path.dirname(_save_path) if _dir: os.makedirs(_dir, exist_ok=True) with open(_save_path, 'wb') as fd: fd.write(file_bytes) save_info = f" (Current screenshot saved to {_save_path}.)" except Exception as e: save_info = f" (Error {e} when saving screenshot.)" else: save_info = "" state.curr_screenshot_mode = _new_mode ret = f"Browser step: {action_string} -> Changing curr_screenshot_mode from {_old_mode} to {_new_mode}" + save_info else: # actually perform action # 埋点:即将执行浏览器动作 if self.logger: self.logger.info("[WEB_ENV] Executing_Browser_Action: %s | Browser_ID: %s | Page_ID: %s", action["action_name"], state.browser_id, state.page_id) action_succeed = self.action(state.browser_id, state.page_id, action) if not action_succeed: # no succeed state.error_message = f"The action you have chosen cannot be executed: {action_string}. Please double-check if you have selected the correct element or used correct action format." ret = state.error_message # 埋点:浏览器动作执行失败 if self.logger: self.logger.error("[WEB_ENV] Browser_Action_Failed: %s", action_string) else: # get new states # 埋点:浏览器动作执行成功,获取新状态 if self.logger: self.logger.info("[WEB_ENV] Browser_Action_Success: %s", action_string) self.logger.debug("[WEB_ENV] Getting_New_State...") results = self._get_accessibility_tree_results(state) state.update(**results) # update it! ret = f"Browser step: {action_string}" # 埋点:状态更新完成 if self.logger: new_url = getattr(state, 'step_url', 'unknown') self.logger.info("[WEB_ENV] State_Updated: New_URL: %s", new_url) return ret # -- # sync files between remote and local dirs def sync_files(self): # -- def _get_file(_f: str): url = f"http://{self.web_ip}/getFile" data = {"filename": _f} try: response = requests.post(url, json=data, timeout=self.web_timeout) if response.status_code == 200: res_json = response.json() base64_str = res_json["file"] file_bytes = base64.b64decode(base64_str) if _f: _dir = os.path.dirname(_f) if _dir: os.makedirs(_dir, exist_ok=True) with open(_f, 'wb') as fd: # Change output filename as needed fd.write(file_bytes) return True else: zwarn(f"Get file failed with status code: {response.status_code}") return False except Exception as e: zwarn(f"Request failed: {e}") return False # -- files = {} for file in self.state.downloaded_file_path: if not os.path.exists(file): fres = _get_file(file) files[file] = f"Get[res={fres}]" else: files[file] = "Exist" zlog(f"Sync files: {files}") def screenshot_mode(self, flag=None): old_mode = self.state.curr_screenshot_mode new_mode = old_mode if flag is not None: # set as flag self.state.curr_screenshot_mode = flag return old_mode, new_mode