| | """Script to run end-to-end evaluation on the benchmark. |
| | Utils and basic architecture credit to https://github.com/web-arena-x/webarena/blob/main/run.py. |
| | """ |
| |
|
| | import argparse |
| | import datetime |
| | import json |
| | import logging |
| | import os |
| | import sys |
| | import math |
| | import ast |
| | import time |
| |
|
| | import backoff |
| | import httpx |
| | from openai import APIConnectionError, APIError, OpenAI, RateLimitError |
| | from requests.exceptions import SSLError |
| | from tqdm import tqdm |
| |
|
| |
|
| | |
| | sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) |
| |
|
| | import lib_run_single |
| | from desktop_env.desktop_env import MAX_RETRIES, DesktopEnv as DesktopEnvBase |
| | from mm_agents.autoglm import AutoGLMAgent |
| | from typing import Optional, Dict, Any |
| | from multiprocessing import Pool |
| |
|
| | |
# --- Logging setup -----------------------------------------------------------
# Root logger collects everything; per-handler levels/filters decide what
# actually lands in each sink.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Timestamp shared by all log files of this run.
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")

# FileHandler raises FileNotFoundError if the target directory is missing,
# so make sure "logs/" exists before attaching the handlers.
os.makedirs("logs", exist_ok=True)

file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")

file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)

# ANSI-colored format: timestamp, level, module/line-process, message.
formatter = logging.Formatter(
    fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)

# Console and the "sdebug" file only show records from the "desktopenv.*"
# logger hierarchy; the other two files keep everything.
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))

logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)

# Module-level logger used by the rest of this script.
logger = logging.getLogger("desktopenv.experiment")
| |
|
| |
|
def config() -> argparse.Namespace:
    """Declare and parse the command-line interface of the evaluation run."""
    parser = argparse.ArgumentParser(description="Run end-to-end evaluation on the benchmark")

    # -- environment ---------------------------------------------------------
    parser.add_argument("--path_to_vm", type=str)
    parser.add_argument(
        "--provider_name",
        type=str,
        default="docker",
        help="Virtualization provider (vmware, docker, aws, azure, gcp, virtualbox)",
    )
    # NOTE(review): store_true with default=True means the flag can never be
    # switched off from the CLI — confirm whether that is intentional.
    parser.add_argument("--headless", action="store_true", default=True, help="Run in headless machine")
    parser.add_argument("--action_space", type=str, default="autoglm_computer_use", help="Action type")
    parser.add_argument(
        "--observation_type",
        choices=["screenshot", "a11y_tree", "screenshot_a11y_tree", "som"],
        default="a11y_tree",
        help="Observation type",
    )
    parser.add_argument("--screen_width", type=int, default=1920)
    parser.add_argument("--screen_height", type=int, default=1080)
    parser.add_argument("--sleep_after_execution", type=float, default=1.0)
    parser.add_argument("--max_steps", type=int, default=50)

    # -- agent ---------------------------------------------------------------
    parser.add_argument("--max_trajectory_length", type=int, default=3)
    parser.add_argument("--test_config_base_dir", type=str, default="evaluation_examples")

    # -- LLM sampling --------------------------------------------------------
    parser.add_argument("--model", type=str, default="autoglm-os")
    parser.add_argument("--temperature", type=float, default=0.4)
    parser.add_argument("--top_p", type=float, default=0.5)
    parser.add_argument("--max_tokens", type=int, default=4096)
    parser.add_argument("--stop_token", type=str, default=None)

    # -- task selection ------------------------------------------------------
    parser.add_argument("--domain", type=str, default="all")
    parser.add_argument("--test_all_meta_path", type=str, default="evaluation_examples/test_nogdrive.json")

    # -- cloud / VM access ---------------------------------------------------
    parser.add_argument("--region", type=str, default="us-east-1", help="AWS region for the VM")
    parser.add_argument("--client_password", type=str, default="", help="Client password")

    # -- output & parallelism ------------------------------------------------
    parser.add_argument("--result_dir", type=str, default="./results")
    parser.add_argument("--num_workers", type=int, default=20, help="Number of parallel workers")

    return parser.parse_args()
| |
|
| |
|
class DesktopEnv(DesktopEnvBase):
    """Desktop environment specialised for the AutoGLM agent.

    Differences from the base class:
      * ``step`` accepts control tokens ('WAIT'/'FAIL'/'DONE'), high-level
        dict actions (OPEN_APP / OPEN_CHROME_TAB) and raw python command
        strings, and attaches the execution result to the observation.
      * ``reset`` reverts to the snapshot only when the environment was
        actually used, uploads the AutoGLM tool package into the VM, and
        starts a LibreOffice UNO socket listener.
      * observations are enriched with the window list, the active window
        and app, and app-specific tool information.
    """

    def step(self, action, pause=2):
        """Execute one agent action and return ``(obs, reward, done, info)``.

        ``action`` is one of:
          * 'WAIT' / 'FAIL' / 'DONE' — control tokens;
          * a dict with ``action_type`` 'OPEN_APP' or 'OPEN_CHROME_TAB';
          * a python command string executed inside the VM.

        The textual result of the action is stored in
        ``observation['exe_result']``.  Reward is always 0 here; scoring is
        done by the evaluator.
        """
        self._step_no += 1
        self.action_history.append(action)
        # Any step dirties the VM; reset() will then revert to the snapshot.
        self.is_environment_used = True

        reward = 0
        done = False
        info = {}
        logger.info(f"Step {self._step_no} in trajectory {self._traj_no} with action: {action}")

        if action in ['WAIT', 'FAIL', 'DONE']:
            if action == 'WAIT':
                time.sleep(pause)
                exe_result = 'Wait ' + str(pause) + ' seconds'
            elif action == 'FAIL':
                done = True
                info = {"fail": True}
                exe_result = 'Finish: fail'
            elif action == 'DONE':
                done = True
                info = {"done": True}
                exe_result = 'Finish: success'
        elif isinstance(action, dict):
            if action['action_type'] == 'OPEN_APP':
                self.setup_controller._launch_setup(action['parameters']['launch_app_command'], shell=True)
                exe_result = 'Open ' + action['parameters']['app_name']
            elif action['action_type'] == 'OPEN_CHROME_TAB':
                self.setup_controller._chrome_open_tabs_setup(action['parameters']['urls_to_open'])
                exe_result = 'Open ' + str(action['parameters']['urls_to_open']) + ' in Chrome successfully'
            else:
                # Previously an unknown dict action left exe_result unbound
                # (NameError below); report it instead.
                exe_result = 'Unsupported dict action: ' + str(action.get('action_type'))
        else:
            # Raw python command string, executed inside the VM.
            result = self.controller.execute_python_command(action)
            try:
                if result['error']:
                    exe_result = result['error'].strip()
                else:
                    exe_result = result['output'].strip()
            except Exception as e:
                exe_result = 'Error Action: ' + action
                logger.error(f"Error executing action: {e}")

        time.sleep(pause)
        observation = self._get_obs()
        observation['exe_result'] = exe_result

        return observation, reward, done, info

    def reset(self, task_config: Optional[Dict[str, Any]] = None, seed=None, options=None) -> Dict[str, Any]:
        """Prepare the environment for a new task and return the first observation.

        Reverts to the snapshot only when the previous task actually touched
        the VM, runs the task's setup (with optional proxy), uploads the
        AutoGLM tool package and starts the LibreOffice UNO listener.
        """
        logger.info("Resetting environment...")
        logger.info("Switching task...")
        logger.info("Setting counters...")
        self._traj_no += 1
        self._step_no = 0
        self.action_history.clear()

        for attempt in range(MAX_RETRIES):
            if task_config is not None:
                # A task may request a proxy, but only honor it when proxying
                # is enabled at system level.
                task_use_proxy = task_config.get("proxy", False) and self.enable_proxy
                if not self.enable_proxy and task_config.get("proxy", False):
                    logger.info("Task requires proxy but proxy is disabled at system level, ignoring proxy requirement.")
                if task_use_proxy != self.current_use_proxy:
                    self.current_use_proxy = task_use_proxy

            if self.is_environment_used:
                logger.info("Environment has been used, reverting to snapshot {}...".format(self.snapshot_name))
                self._revert_to_snapshot()
                logger.info("Starting emulator...")
                self._start_emulator()
                logger.info("Emulator started.")
                self.is_environment_used = False
            else:
                logger.info("Environment is clean, skipping snapshot revert (provider: {}).".format(self.provider_name))

            if task_config is not None:
                if task_config.get("proxy", False) and self.enable_proxy:
                    self.setup_controller._proxy_setup(self.client_password)
                self._set_task_info(task_config)
                self.setup_controller.reset_cache_dir(self.cache_dir)
                logger.info("Setting up environment...")
                success = self.setup_controller.setup(self.config, task_config.get("proxy", False) and self.enable_proxy)
                if success:
                    # A non-empty setup config means the VM was modified.
                    if self.config:
                        self.is_environment_used = True
                    break
                else:
                    logger.error(
                        "Environment setup failed, retrying (%d/%d)...",
                        attempt + 1,
                        MAX_RETRIES,
                    )
                    time.sleep(5)
            else:
                break

        logger.info("Environment setup complete.")

        # Upload the AutoGLM tool package files into the VM home directory.
        import mm_agents.autoglm
        tool_dir = os.path.join(os.path.dirname(mm_agents.autoglm.__file__), 'tools', 'package')
        for file in os.listdir(tool_dir):
            if os.path.isdir(os.path.join(tool_dir, file)):
                continue
            self.setup_controller._upload_file_setup([{
                "local_path": os.path.join(tool_dir, file),
                "path": os.path.join('~', file)
            }])

        # Start a LibreOffice UNO socket listener so office tools can connect.
        self.setup_controller._launch_setup('soffice --accept="socket,host=localhost,port=2002;urp;" --norestore --nologo --nodefault', shell=True)
        time.sleep(5)

        observation = self._get_obs()
        return observation

    def get_current_apps(self):
        """Return ``(app_list, cur_id)`` describing the VM's windows.

        ``app_list`` maps window id -> {'app_name', 'title'} for windows on
        desktop 0; ``cur_id`` is the id of the currently active window.
        Both are obtained by running wmctrl inside the VM.
        """
        apps_code = r"""import subprocess;
command = "wmctrl -xl";
apps = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip().split('\n');
print(apps);"""
        window_code = r"""import subprocess;
command = "wmctrl -a :ACTIVE: -v 2>&1 | grep 'Using window' | awk '{print $3}'";
window_id = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip();
print(window_id);"""

        apps = self.controller.execute_python_command(apps_code)['output'].strip()
        apps = ast.literal_eval(apps)
        app_list = {}

        # `wmctrl -xl` columns: window id, desktop, WM_CLASS, client machine,
        # window title (title may contain spaces, hence maxsplit=4).
        for app in apps:
            parts = app.split(maxsplit=4)
            if len(parts) < 4:
                continue
            if parts[1] != '0':  # keep only windows on desktop 0
                continue
            window_id = parts[0]
            # WM_CLASS is "instance.Class" (each half may itself contain
            # dots); keep the class half, i.e. the last ceil(n/2) pieces.
            app_name = '.'.join(parts[2].split('.')[-(math.ceil(parts[2].count('.') / 2)):])
            # Column 3 is the client machine name; the title is column 4.
            # (The original stored the hostname as the title.)
            title = parts[4] if len(parts) > 4 else ''
            app_list[window_id] = {
                'app_name': app_name,
                'title': title
            }

        cur_id = self.controller.execute_python_command(window_code)['output'].strip()

        return app_list, cur_id

    def maximize_window(self):
        """Best-effort maximize of the active window (up to 5 attempts)."""
        window_state = r"""import subprocess;
command = "xprop -id $(xprop -root _NET_ACTIVE_WINDOW | awk -F' ' '{print $5}') _NET_WM_STATE"
output = subprocess.run(command, shell=True, capture_output=True, text=True).stdout.strip();
print(output);"""
        for _ in range(5):
            try:
                self.setup_controller._launch_setup('wmctrl -r :ACTIVE: -b add,maximized_vert,maximized_horz', shell=True)
                time.sleep(2)
                output = self.controller.execute_python_command(window_state)['output'].strip()
                # Stop once the window is maximized, or when it is not a
                # normal focused window (modal dialogs / taskbar-skipped
                # windows cannot be maximized).
                if '_NET_WM_STATE_FOCUSED' not in output or '_NET_WM_STATE_SKIP_TASKBAR' in output or '_NET_WM_STATE_MODAL' in output or '_NET_WM_STATE_MAXIMIZED' in output:
                    return
            except Exception as e:
                logger.error(f"Failed to maximize window: {e}")
                time.sleep(1)

    def _get_obs(self):
        """Collect the observation dict handed to the agent.

        Includes screenshot, accessibility tree, instruction, the window
        list, the active window/app and (when a tool exists for the active
        app) app-specific state gathered inside the VM.
        """
        # Normalised WM_CLASS app name -> in-VM tool class that can report
        # app-specific environment info.
        tool_list = {
            "libreoffice_calc": "CalcTools",
            "libreoffice_impress": "ImpressTools",
            "libreoffice_writer": "WriterTools",
            "code": "CodeTools",
            "vlc": "VLCTools",
            "google_chrome": "BrowserTools"
        }

        self.maximize_window()

        # Retry the window query up to 3 times; re-raise on the last failure.
        for i in range(3):
            try:
                app_list, cur_id = self.get_current_apps()
                break  # success (the original kept re-querying all 3 times)
            except Exception as e:
                if i == 2:
                    raise
                logger.error(f"Failed to get current apps: {e}")
                time.sleep(1)

        if cur_id in app_list:
            cur_app = app_list[cur_id]['app_name']

            tool_name = cur_app.strip().lower().replace('-', '_')
            if tool_name in tool_list:
                class_name = tool_list[tool_name]
                command = f"from {tool_name} import *; "
                command += f"{class_name}.env_info(); "
                command += f"{class_name}.print_result();"
                app_info = self.controller.execute_python_command(command)['output'].strip()
            else:
                app_info = None
        else:
            cur_app = None
            app_info = None

        tree = self.controller.get_accessibility_tree()
        screenshot = self.controller.get_screenshot()
        if screenshot is None:
            logger.error("Failed to get screenshot.")
            screenshot = b''

        return {
            "screenshot": screenshot,
            "accessibility_tree": tree,
            "instruction": self.instruction,
            "apps": app_list,
            "cur_window_id": cur_id,
            "cur_app": cur_app,
            "app_info": app_info,
        }
| |
|
def get_unfinished(action_space, use_model, observation_type, result_dir, total_file_json):
    """Drop already-finished examples from *total_file_json*.

    Scans the result tree for example directories containing "result.txt"
    (those are finished) and removes them from the per-domain lists.
    Partial runs without "result.txt" have their leftover files deleted so
    they can be re-run cleanly.  Returns the (mutated) mapping.
    """
    target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
    if not os.path.exists(target_dir):
        return total_file_json

    finished = {}
    for domain in os.listdir(target_dir):
        domain_path = os.path.join(target_dir, domain)
        if not os.path.isdir(domain_path):
            continue
        done_ids = []
        for example_id in os.listdir(domain_path):
            if example_id == "onboard":
                continue
            example_path = os.path.join(domain_path, example_id)
            if not os.path.isdir(example_path):
                continue
            if "result.txt" in os.listdir(example_path):
                done_ids.append(example_id)
            else:
                # Unfinished attempt: wipe partial artifacts for a clean rerun.
                for leftover in os.listdir(example_path):
                    os.remove(os.path.join(example_path, leftover))
        finished[domain] = done_ids

    if not finished:
        return total_file_json

    for domain, examples in finished.items():
        if domain in total_file_json:
            total_file_json[domain] = [x for x in total_file_json[domain] if x not in examples]

    return total_file_json
| |
|
| |
|
def get_result(action_space, use_model, observation_type, result_dir, total_file_json):
    """Print the current success rate and return the list of scores.

    Walks the result tree, reading every "result.txt" as a float score
    (unreadable/unparsable files count as 0.0).  Returns None — after
    printing a notice — when there are no results yet.
    """
    target_dir = os.path.join(result_dir, action_space, observation_type, use_model)
    if not os.path.exists(target_dir):
        print("New experiment, no result yet.")
        return None

    all_result = []

    for domain in os.listdir(target_dir):
        domain_path = os.path.join(target_dir, domain)
        if os.path.isdir(domain_path):
            for example_id in os.listdir(domain_path):
                example_path = os.path.join(domain_path, example_id)
                if os.path.isdir(example_path):
                    if "result.txt" in os.listdir(example_path):
                        # Narrow except (was a bare `except:`) and a context
                        # manager (the file handle used to be left open).
                        try:
                            with open(os.path.join(example_path, "result.txt"), "r") as rf:
                                all_result.append(float(rf.read()))
                        except (OSError, ValueError):
                            all_result.append(0.0)

    if not all_result:
        print("New experiment, no result yet.")
        return None
    else:
        print("Current Success Rate:", sum(all_result) / len(all_result) * 100, "%")
        return all_result
| |
|
| | def _worker_run(task): |
| | import json, os, datetime, logging, httpx, backoff |
| | from openai import OpenAI, RateLimitError, APIConnectionError |
| | from types import SimpleNamespace |
| | domain, example_id, args = task |
| | logger = logging.getLogger("desktopenv.experiment") |
| | try: |
| | config_file = os.path.join(args.test_config_base_dir, f"examples/{domain}/{example_id}.json") |
| | with open(config_file, "r", encoding="utf-8") as f: |
| | example = json.load(f) |
| | instruction = example["instruction"] |
| |
|
| | @backoff.on_exception(backoff.constant, (RateLimitError, APIConnectionError), interval=0.1) |
| | def call_llm(messages): |
| | logger.info("Calling LLM...") |
| | |
| | engine = OpenAI(timeout=60.0) |
| | response = engine.chat.completions.create( |
| | model=args.model, |
| | messages=messages, |
| | max_tokens=args.max_tokens, |
| | temperature=args.temperature, |
| | top_p=args.top_p, |
| | ) |
| | logger.info("LLM called successfully.") |
| | return response.choices[0].message.content |
| |
|
| | env = DesktopEnv( |
| | provider_name=args.provider_name, |
| | region=args.region, |
| | client_password=args.client_password, |
| | path_to_vm=args.path_to_vm, |
| | action_space=args.action_space, |
| | screen_size=(args.screen_width, args.screen_height), |
| | headless=args.headless, |
| | os_type="Ubuntu", |
| | require_a11y_tree=args.observation_type in ["a11y_tree", "screenshot_a11y_tree", "som"], |
| | ) |
| | agent = AutoGLMAgent( |
| | action_space=args.action_space, |
| | observation_type=args.observation_type, |
| | max_trajectory_length=args.max_trajectory_length, |
| | client_password=args.client_password, |
| | gen_func=call_llm, |
| | ) |
| |
|
| | example_result_dir = os.path.join( |
| | args.result_dir, |
| | args.action_space, |
| | args.observation_type, |
| | args.model, |
| | domain, |
| | example_id, |
| | ) |
| | os.makedirs(example_result_dir, exist_ok=True) |
| |
|
| | local_scores = [] |
| | try: |
| | lib_run_single.run_single_example_autoglm( |
| | agent, |
| | env, |
| | example, |
| | args.max_steps, |
| | instruction, |
| | args, |
| | example_result_dir, |
| | local_scores, |
| | ) |
| | except Exception as e: |
| | logger.error(f"[并发任务异常] {domain}/{example_id}: {e}") |
| | if hasattr(env, "controller") and env.controller is not None: |
| | try: |
| | env.controller.end_recording(os.path.join(example_result_dir, "recording.mp4")) |
| | except Exception: |
| | pass |
| | with open(os.path.join(example_result_dir, "traj.jsonl"), "a") as f: |
| | f.write(json.dumps({"Error": f"Exception in {domain}/{example_id}: {str(e)}"}) + "\n") |
| | finally: |
| | try: |
| | env.close() |
| | except Exception: |
| | pass |
| |
|
| | score = None |
| | result_path = os.path.join(example_result_dir, "result.txt") |
| | if os.path.exists(result_path): |
| | try: |
| | with open(result_path, "r") as rf: |
| | score = float(rf.read().strip()) |
| | except Exception: |
| | score = 0.0 |
| | else: |
| | score = 0.0 |
| | logger.info(f"[Finish] {domain}/{example_id} score={score}") |
| | return (domain, example_id, score) |
| | except Exception as e: |
| | logger = logging.getLogger("desktopenv.experiment") |
| | logger.error(f"[Initializing Fail] {domain}/{example_id}: {e}") |
| | return (domain, example_id, 0.0) |
| |
|
def test_parallel(args: argparse.Namespace, test_all_meta: dict):
    """Fan all pending (domain, example_id) pairs out over a process pool."""
    from tqdm import tqdm

    tasks = [
        (domain, example_id, args)
        for domain in test_all_meta
        for example_id in test_all_meta[domain]
    ]
    if not tasks:
        logger.info("No pending tasks")
        return
    logger.info(f"Starting parallel execution: {args.num_workers} processes, {len(tasks)} tasks total")

    # Collect worker results as they complete, in arbitrary order.
    results = []
    with Pool(processes=args.num_workers) as pool:
        for outcome in tqdm(pool.imap_unordered(_worker_run, tasks), total=len(tasks), desc="Parallel execution"):
            results.append(outcome)

    scores = [score for (_, _, score) in results if score is not None]
    if scores:
        avg = sum(scores) / len(scores)
        logger.info(f"Parallel execution completed. Average score: {avg}")
    else:
        logger.info("No scores obtained.")
| |
|
if __name__ == "__main__":
    # Tokenizer fork warnings interfere with multiprocessing workers.
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    args = config()

    # Default client password depends on the virtualization provider.
    # (The original had a dead `else: args.client_password = args.client_password`.)
    if args.client_password == "":
        if args.provider_name == "aws":
            args.client_password = "osworld-public-evaluation"
        else:
            args.client_password = "password"

    # Persist the run configuration next to the results for reproducibility.
    path_to_args = os.path.join(
        args.result_dir,
        args.action_space,
        args.observation_type,
        args.model,
        "args.json",
    )
    os.makedirs(os.path.dirname(path_to_args), exist_ok=True)
    with open(path_to_args, "w", encoding="utf-8") as f:
        json.dump(vars(args), f, indent=4)

    with open(args.test_all_meta_path, "r", encoding="utf-8") as f:
        test_all_meta = json.load(f)

    # Optionally restrict the run to a single task domain.
    if args.domain != "all":
        test_all_meta = {args.domain: test_all_meta[args.domain]}

    # Skip examples that already have a result.txt from a previous run.
    test_file_list = get_unfinished(
        args.action_space,
        args.model,
        args.observation_type,
        args.result_dir,
        test_all_meta,
    )
    left_info = ""
    for domain in test_file_list:
        left_info += f"{domain}: {len(test_file_list[domain])}\n"
    logger.info(f"Left tasks:\n{left_info}")

    # Report the success rate accumulated so far, then run what's left.
    get_result(
        args.action_space,
        args.model,
        args.observation_type,
        args.result_dir,
        test_all_meta,
    )
    test_parallel(args, test_file_list)
| |
|