| |
| |
| """Gradio frontend for remotely controlling an ECS-hosted OpAgent worker. |
| |
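Environment variables:
    OPAGENT_API_BASE_URL        ECS API base URL, e.g. https://your-ecs-host:8000
    OPAGENT_API_KEY             Optional API key sent as X-API-Key
    OPAGENT_POLL_INTERVAL       Default status-polling interval in seconds (default 5)
    OPAGENT_HTTP_TIMEOUT        Default HTTP timeout in seconds (default 300)
    OPAGENT_SUBMIT_TIMEOUT      Timeout for task submission (defaults to OPAGENT_HTTP_TIMEOUT)
    OPAGENT_STATUS_TIMEOUT      Timeout for status/trajectory/cancel requests (defaults to OPAGENT_HTTP_TIMEOUT)
    OPAGENT_IMAGE_TIMEOUT       Timeout for screenshot downloads (defaults to OPAGENT_HTTP_TIMEOUT)
    OPAGENT_DEFAULT_START_URL   Start URL pre-filled in the UI
    OPAGENT_DEFAULT_QUERY       Task description pre-filled in the UI
    OPAGENT_USE_SYSTEM_PROXY    Set to 1 to honor system/environment proxy settings
    ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, SG_ID
                                Credentials and security-group ID used at startup to open ports 8000/8080
    PORT                        Port the Gradio server listens on (default 7860)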
| """ |
|
|
| from __future__ import annotations |
|
|
| from functools import lru_cache |
| import json |
| import os |
| import time |
| from io import BytesIO |
| from typing import Any, Dict, Generator, List, Optional, Tuple |
| from urllib.parse import urlparse, urlunparse |
|
|
| import gradio as gr |
| import requests |
| from PIL import Image |
| import subprocess |
| import sys |
| from alibabacloud_ecs20140526.client import Client as EcsClient |
| from alibabacloud_tea_openapi import models as open_api_models |
| from alibabacloud_ecs20140526 import models as ecs_models |
|
|
# Connection settings for the remote ECS worker. Unset values default to ""
# (not None) so the string-handling helpers never receive None.
DEFAULT_API_BASE_URL = os.getenv("OPAGENT_API_BASE_URL", "")
DEFAULT_API_KEY = os.getenv("OPAGENT_API_KEY", "")
# Polling cadence and HTTP timeouts, all in seconds. The per-request timeouts
# fall back to OPAGENT_HTTP_TIMEOUT when not set individually.
DEFAULT_POLL_INTERVAL = float(os.getenv("OPAGENT_POLL_INTERVAL", "5"))
DEFAULT_TIMEOUT = float(os.getenv("OPAGENT_HTTP_TIMEOUT", "300"))
DEFAULT_SUBMIT_TIMEOUT = float(os.getenv("OPAGENT_SUBMIT_TIMEOUT", str(DEFAULT_TIMEOUT)))
DEFAULT_STATUS_TIMEOUT = float(os.getenv("OPAGENT_STATUS_TIMEOUT", str(DEFAULT_TIMEOUT)))
DEFAULT_IMAGE_TIMEOUT = float(os.getenv("OPAGENT_IMAGE_TIMEOUT", str(DEFAULT_TIMEOUT)))
# Values pre-filled in the UI form.
DEFAULT_START_URL = os.getenv("OPAGENT_DEFAULT_START_URL", "https://www.amazon.com")
DEFAULT_QUERY = os.getenv("OPAGENT_DEFAULT_QUERY", "find a physical PS5 copy of NBA 2K25")
# Proxy settings from the environment are only honored when this is exactly "1".
USE_SYSTEM_PROXY = os.getenv("OPAGENT_USE_SYSTEM_PROXY", "0") == "1"
# Placeholder shown in the history panel before any step has been recorded.
EMPTY_HISTORY_MARKDOWN = "### Step History\n_No steps yet._"
|
|
|
|
@lru_cache(maxsize=1)
def get_http_session() -> requests.Session:
    """Return the single ``requests.Session`` shared by every HTTP call here.

    The session is built once and memoized by ``lru_cache``. Unless
    ``USE_SYSTEM_PROXY`` is enabled, it is configured to ignore environment
    settings (``trust_env = False``) and carries no proxies.
    """
    shared = requests.Session()
    if USE_SYSTEM_PROXY:
        return shared
    # Bypass any proxy configuration inherited from the environment.
    shared.trust_env = False
    shared.proxies.clear()
    return shared
|
|
|
|
def build_headers(api_key: Optional[str]) -> Dict[str, str]:
    """Build the JSON request headers for the OpAgent API.

    Args:
        api_key: Key to send as ``X-API-Key``. ``None``, empty, or
            whitespace-only values mean "no key", and the header is omitted.

    Returns:
        A dict that always has ``Content-Type: application/json`` and, when a
        key is given, the stripped ``X-API-Key``.
    """
    headers = {"Content-Type": "application/json"}
    # api_key can be None: the UI state is seeded from os.getenv, which
    # returns None for an unset variable.
    key = (api_key or "").strip()
    if key:
        headers["X-API-Key"] = key
    return headers
|
|
|
|
def normalize_base_url(base_url: Optional[str]) -> str:
    """Return ``base_url`` without surrounding whitespace or trailing slashes.

    Raises:
        ValueError: if ``base_url`` is None, empty, or only whitespace/slashes.
    """
    # Treat None (e.g. OPAGENT_API_BASE_URL unset) like an empty string so
    # callers get the intended ValueError instead of an AttributeError.
    cleaned = (base_url or "").strip().rstrip("/")
    if not cleaned:
        raise ValueError("API base URL cannot be empty")
    return cleaned
|
|
|
|
def fetch_image(image_url: Optional[str], api_key: Optional[str]) -> Optional[Image.Image]:
    """Download an image and return it as an RGB PIL image.

    Args:
        image_url: URL of the image. A falsy value short-circuits to None.
        api_key: Optional key sent as ``X-API-Key``. None or blank sends no header.

    Returns:
        The decoded image converted to RGB, or None when ``image_url`` is empty.

    Raises:
        requests.HTTPError: on a non-2xx response. Network errors from
        ``requests`` and decode errors from PIL propagate unchanged.
    """
    if not image_url:
        return None

    key = (api_key or "").strip()
    response = get_http_session().get(
        image_url,
        headers={"X-API-Key": key} if key else None,
        timeout=DEFAULT_IMAGE_TIMEOUT,
    )
    response.raise_for_status()
    # convert() produces a new, fully loaded image, so the source handle can
    # be closed as soon as it returns.
    with Image.open(BytesIO(response.content)) as raw:
        return raw.convert("RGB")
|
|
|
|
def format_status(task: Dict[str, Any]) -> str:
    """Render a task-status payload as a Markdown summary for the status panel.

    Always includes status, step progress and current page. Latest action
    (as JSON), latest reasoning (truncated to 500 characters, blockquoted),
    final answer, error, and the last 12 events are appended when present.
    """
    lines = [
        f"### Task Status: {task.get('status', 'unknown')}",
        f"- Current Step: {task.get('current_step', 0)} / {task.get('max_steps', 0)}",
        f"- Current Page: {task.get('current_url') or '-'}",
    ]

    latest_action = task.get("latest_action")
    if latest_action:
        lines.append("- Latest Action:")
        lines.append(f"\n```json\n{json.dumps(latest_action, ensure_ascii=False)}\n```")

    latest_think = task.get("latest_think")
    if latest_think:
        latest_think = str(latest_think)
        preview = latest_think[:500] + ("..." if len(latest_think) > 500 else "")
        # Prefix every line: a blank line in multi-paragraph reasoning would
        # otherwise end the Markdown blockquote after the first paragraph.
        quoted = "\n".join(f"> {line}" for line in preview.splitlines())
        lines.append("- Latest Reasoning:")
        lines.append(f"\n{quoted}")

    final_answer = task.get("final_answer")
    if final_answer:
        lines.append("\n### Final Answer")
        # str(): the payload is decoded JSON, so this need not be a string,
        # and "\n".join below requires one.
        lines.append(str(final_answer))

    error = task.get("error")
    if error:
        lines.append("\n### Error")
        lines.append(str(error))

    events = task.get("events") or []
    if events:
        lines.append("\n### Recent Logs")
        lines.extend(f"- {event}" for event in events[-12:])

    return "\n".join(lines)
|
|
|
|
def fetch_trajectory(api_base_url: str, api_key: str, task_id: str) -> Dict[str, Any]:
    """Fetch the recorded trajectory of ``task_id``.

    Sends ``GET {base}/tasks/{task_id}/trajectory`` with the status timeout
    and returns the decoded JSON body. Raises ``requests.HTTPError`` on a
    non-2xx response.
    """
    endpoint = "{}/tasks/{}/trajectory".format(normalize_base_url(api_base_url), task_id)
    reply = get_http_session().get(
        endpoint,
        headers=build_headers(api_key),
        timeout=DEFAULT_STATUS_TIMEOUT,
    )
    reply.raise_for_status()
    return reply.json()
|
|
|
|
def build_artifact_url(trajectory_url: Optional[str], artifact_path: Optional[str]) -> Optional[str]:
    """Map a server-side artifact path to a URL beside the trajectory file.

    The artifact is addressed as ``<trajectory dir>/<artifact parent dir>/<file>``
    on the same scheme and host as ``trajectory_url``, with query and fragment
    dropped. When ``artifact_path`` has no parent directory, the file is placed
    directly in the trajectory directory.

    Returns:
        The artifact URL, or None when either argument is empty or
        ``artifact_path`` does not name a file (e.g. ends with a slash).
    """
    if not trajectory_url or not artifact_path:
        return None

    parsed = urlparse(trajectory_url)
    # Drop the trajectory file name and keep its directory.
    trajectory_dir = parsed.path.rsplit("/", 1)[0]
    file_name = os.path.basename(artifact_path)
    if not file_name:
        return None
    # Only the artifact's immediate parent directory is mirrored in the URL.
    parent_dir = os.path.basename(os.path.dirname(artifact_path))
    # Skip an empty parent so the path never contains "//".
    segments = [trajectory_dir, parent_dir, file_name] if parent_dir else [trajectory_dir, file_name]
    return urlunparse(parsed._replace(path="/".join(segments), query="", fragment=""))
|
|
|
|
def format_history_markdown(steps: List[Dict[str, Any]]) -> str:
    """Render executed steps as Markdown for the history panel.

    Each step becomes a ``#### Step N · action`` heading followed by its URL,
    its JSON-encoded params and, if non-empty, its error. An empty ``steps``
    list yields ``EMPTY_HISTORY_MARKDOWN``.
    """
    if not steps:
        return EMPTY_HISTORY_MARKDOWN

    def _render(entry: Dict[str, Any]) -> List[str]:
        # Markdown lines for one step; missing fields fall back to placeholders.
        encoded = json.dumps(entry.get("params") or {}, ensure_ascii=False)
        chunk = [
            f"#### Step {entry.get('step', '-')} · {entry.get('action_type', 'unknown')}",
            f"- URL: {entry.get('url') or '-'}",
            f"- Params: `{encoded}`",
        ]
        failure = entry.get("error") or ""
        if failure:
            chunk.append(f"- Error: {failure}")
        return chunk

    rendered = ["### Step History"]
    for entry in steps:
        rendered.extend(_render(entry))
    return "\n".join(rendered)
|
|
|
|
def build_history_from_trajectory(
    api_key: str,
    trajectory_url: Optional[str],
    trajectory: Dict[str, Any],
) -> Tuple[List[Tuple[Image.Image, str]], str]:
    """Rebuild the screenshot gallery and Markdown history from a trajectory.

    For every step, the annotated screenshot (or, failing that, the raw
    screenshot) is resolved relative to ``trajectory_url`` and downloaded.
    Steps whose image cannot be resolved or fetched are left out of the
    gallery but still appear in the Markdown history.

    Returns:
        A pair of (gallery items as ``(image, caption)`` tuples, history Markdown).
    """
    steps = trajectory.get("steps") or []
    gallery: List[Tuple[Image.Image, str]] = []

    for entry in steps:
        url = build_artifact_url(
            trajectory_url,
            entry.get("annotated_screenshot") or entry.get("screenshot"),
        )
        picture = None
        if url:
            try:
                picture = fetch_image(url, api_key)
            except Exception:
                # Best effort: one missing screenshot must not hide the others.
                picture = None
        if picture is not None:
            label = f"Step {entry.get('step', '-')}: {entry.get('action_type', 'unknown')}"
            gallery.append((picture, label))

    return gallery, format_history_markdown(steps)
|
|
|
|
def submit_task(
    api_base_url: str,
    api_key: str,
    start_url: str,
    query: str,
    max_steps: int,
) -> Dict[str, Any]:
    """Create a remote task via ``POST {base}/tasks``.

    The JSON body carries the trimmed start URL (``None`` when blank), the
    trimmed query, and ``max_steps`` coerced to int. Returns the decoded JSON
    response. Raises ``requests.HTTPError`` on a non-2xx status.
    """
    trimmed_url = start_url.strip()
    body = {
        "url": trimmed_url if trimmed_url else None,
        "query": query.strip(),
        "max_steps": int(max_steps),
    }
    endpoint = normalize_base_url(api_base_url) + "/tasks"
    reply = get_http_session().post(
        endpoint,
        headers=build_headers(api_key),
        json=body,
        timeout=DEFAULT_SUBMIT_TIMEOUT,
    )
    reply.raise_for_status()
    return reply.json()
|
|
|
|
def get_task_status(api_base_url: str, api_key: str, task_id: str) -> Dict[str, Any]:
    """Fetch the current status payload of ``task_id``.

    Sends ``GET {base}/tasks/{task_id}`` with the status timeout and returns
    the decoded JSON. Raises ``requests.HTTPError`` on a non-2xx response.
    """
    endpoint = "{}/tasks/{}".format(normalize_base_url(api_base_url), task_id)
    reply = get_http_session().get(
        endpoint,
        headers=build_headers(api_key),
        timeout=DEFAULT_STATUS_TIMEOUT,
    )
    reply.raise_for_status()
    return reply.json()
|
|
|
|
def send_heartbeat(api_base_url: str, api_key: str, task_id: str) -> None:
    """Best-effort ping telling the ECS server that this frontend is still alive.

    Posts to ``{base}/tasks/{task_id}/heartbeat`` with a 5-second timeout.
    Every exception (bad base URL, network failure, ...) is discarded so a
    transient problem never interrupts the UI polling loop. The response
    status is not inspected.
    """
    try:
        endpoint = f"{normalize_base_url(api_base_url)}/tasks/{task_id}/heartbeat"
        get_http_session().post(endpoint, headers=build_headers(api_key), timeout=5)
    except Exception:
        return
|
|
|
|
def cancel_remote_task(api_base_url: str, api_key: str, task_state: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Ask the ECS server to cancel the task recorded in ``task_state``.

    Posts to ``{base}/tasks/{task_id}/cancel``, then re-reads the task status
    so the UI shows the server's view. Any ``message`` in the cancel response
    is appended to the task's event log before rendering.

    Returns:
        ``(status Markdown, refreshed task dict)`` for the status panel and state.

    Raises:
        gr.Error: when no task id is known, or the cancel/status requests fail.
    """
    task_id = (task_state or {}).get("task_id")
    if not task_id:
        raise gr.Error("There is no running task to cancel")

    try:
        response = get_http_session().post(
            f"{normalize_base_url(api_base_url)}/tasks/{task_id}/cancel",
            headers=build_headers(api_key),
            timeout=DEFAULT_STATUS_TIMEOUT,
        )
        response.raise_for_status()
        cancel_result = response.json()
    except Exception as exc:
        # Show a readable message in the UI instead of an opaque failure.
        raise gr.Error(f"Failed to cancel task: {exc}") from exc

    try:
        task = get_task_status(api_base_url, api_key, task_id)
    except Exception as exc:
        raise gr.Error(f"Cancel requested, but failed to query task status: {exc}") from exc

    # "events" may be missing or explicitly null in the status payload, so
    # setdefault() alone is not enough before appending.
    events = list(task.get("events") or [])
    message = cancel_result.get("message") if isinstance(cancel_result, dict) else None
    if message:
        events.append(message)
    task["events"] = events
    return format_status(task), task
|
|
|
|
# Task statuses after which polling stops.
_TERMINAL_STATUSES = frozenset({"done", "error", "cancelled"})
# Consecutive status-poll failures tolerated before the UI gives up on a task.
_MAX_STATUS_POLL_FAILURES = 3


def run_remote_task(
    api_base_url: str,
    api_key: str,
    start_url: str,
    query: str,
    max_steps: int,
    poll_interval: float,
) -> Generator[Tuple[str, Optional[Image.Image], str, Dict[str, Any], List[Tuple[Image.Image, str]], str], None, None]:
    """Submit a task and stream its progress to the Gradio UI.

    Each yielded tuple is ``(status Markdown, latest screenshot, final answer,
    task-state dict, history gallery, history Markdown)``, matching the outputs
    wired to the submit button.

    While the task runs, one history entry is recorded per distinct step number
    seen at poll time. Once the task reaches a terminal status, the history is
    rebuilt from the stored trajectory when that succeeds.

    Raises:
        gr.Error: on an empty query, a failed submission, or
            ``_MAX_STATUS_POLL_FAILURES`` consecutive failed status polls.
    """
    if not (query or "").strip():
        raise gr.Error("Task description cannot be empty")

    try:
        created = submit_task(api_base_url, api_key, start_url, query, max_steps)
        task_id = created["task_id"]
    except Exception as exc:
        raise gr.Error(f"Failed to submit task: {exc}") from exc

    intro = f"### Task Submitted\n- Task ID: `{task_id}`\n- Status URL: {created.get('status_url', '-')}"
    yield intro, None, "", {"task_id": task_id, "status": "queued"}, [], EMPTY_HISTORY_MARKDOWN

    last_image_url: Optional[str] = None
    last_image: Optional[Image.Image] = None
    history_gallery: List[Tuple[Image.Image, str]] = []
    history_steps: List[Dict[str, Any]] = []
    recorded_steps = set()
    consecutive_failures = 0
    sleep_seconds = max(0.5, float(poll_interval))

    while True:
        try:
            task = get_task_status(api_base_url, api_key, task_id)
        except Exception as exc:
            consecutive_failures += 1
            if consecutive_failures >= _MAX_STATUS_POLL_FAILURES:
                raise gr.Error(f"Failed to query task status: {exc}") from exc
            # Tolerate a transient blip. Keep signalling liveness while retrying.
            send_heartbeat(api_base_url, api_key, task_id)
            time.sleep(sleep_seconds)
            continue
        consecutive_failures = 0

        send_heartbeat(api_base_url, api_key, task_id)

        image_url = (
            task.get("display_image_url")
            or task.get("latest_annotated_screenshot_url")
            or task.get("latest_screenshot_url")
        )
        if image_url and image_url != last_image_url:
            try:
                last_image = fetch_image(image_url, api_key)
                last_image_url = image_url
            except Exception:
                # Keep showing the previous screenshot; retry on the next poll.
                pass

        current_step = int(task.get("current_step") or 0)
        latest_action = task.get("latest_action") or {}
        if current_step > 0 and latest_action and current_step not in recorded_steps:
            action_type = latest_action.get("action_type", "unknown")
            history_steps.append(
                {
                    "step": current_step,
                    "action_type": action_type,
                    "url": task.get("current_url") or "-",
                    "params": latest_action,
                    "error": task.get("error", ""),
                }
            )
            if last_image is not None:
                history_gallery.append((last_image.copy(), f"Step {current_step}: {action_type}"))
            recorded_steps.add(current_step)

        final_answer = task.get("final_answer") or ""
        history_markdown = format_history_markdown(history_steps)
        finished = task.get("status") in _TERMINAL_STATUSES

        if finished:
            # Live history only samples the step visible at each poll; the
            # server-side trajectory presumably lists every step.
            try:
                trajectory = fetch_trajectory(api_base_url, api_key, task_id)
                rebuilt_gallery, rebuilt_markdown = build_history_from_trajectory(
                    api_key,
                    task.get("trajectory_url"),
                    trajectory,
                )
            except Exception:
                # Fall back to the live-sampled history.
                pass
            else:
                # Don't let an empty trajectory wipe out what we already collected.
                if trajectory.get("steps"):
                    history_markdown = rebuilt_markdown
                if rebuilt_gallery:
                    history_gallery = rebuilt_gallery

        yield format_status(task), last_image, final_answer, task, history_gallery, history_markdown

        if finished:
            break

        time.sleep(sleep_seconds)
|
|
|
|
def build_demo() -> gr.Blocks:
    """Build the Gradio Blocks UI and wire its buttons to the remote-task handlers.

    Layout is defined by statement order inside the ``gr.Blocks`` context.
    Submit streams ``run_remote_task`` output into the status, screenshot,
    answer, state and history components. Stop calls ``cancel_remote_task``.
    """
    with gr.Blocks(title="OpAgent Remote Demo") as demo:
        gr.Markdown("# OpAgent Remote Demo\nHuggingface frontend for remote execution on ECS.")

        # Endpoint and key come from the environment and are held in gr.State,
        # so they are passed to the handlers without being rendered as inputs.
        api_base_url = gr.State(DEFAULT_API_BASE_URL)
        api_key = gr.State(DEFAULT_API_KEY)

        # Task parameters sent with each submission.
        with gr.Row():
            start_url = gr.Textbox(label="Start URL", value=DEFAULT_START_URL, placeholder="https://www.bilibili.com")
            max_steps = gr.Slider(label="Max Steps", minimum=5, maximum=100, value=30, step=1)
            poll_interval = gr.Slider(label="Polling Interval (seconds)", minimum=1, maximum=10, value=DEFAULT_POLL_INTERVAL, step=1)

        query = gr.Textbox(label="Task Description", value=DEFAULT_QUERY, lines=4, placeholder="Describe what you want the browser agent to do")

        with gr.Row():
            submit_button = gr.Button("Submit Task", variant="primary")
            cancel_button = gr.Button("Stop Task", variant="stop")

        # Live progress: Markdown status next to the latest screenshot.
        with gr.Row():
            status_markdown = gr.Markdown(label="Status")
            latest_image = gr.Image(label="Execution Screenshot", type="pil")

        final_answer = gr.Textbox(label="Final Answer", lines=6)
        # Last task dict yielded by run_remote_task; cancel reads its "task_id".
        task_state = gr.State({})

        gr.Markdown("### Historical Steps")
        history_gallery = gr.Gallery(label="Step Screenshots", columns=4, height=360, preview=True)
        history_markdown = gr.Markdown(EMPTY_HISTORY_MARKDOWN)

        # run_remote_task is a generator, so each yield updates these outputs in order.
        submit_button.click(
            fn=run_remote_task,
            inputs=[api_base_url, api_key, start_url, query, max_steps, poll_interval],
            outputs=[status_markdown, latest_image, final_answer, task_state, history_gallery, history_markdown],
        )

        cancel_button.click(
            fn=cancel_remote_task,
            inputs=[api_base_url, api_key, task_state],
            outputs=[status_markdown, task_state],
        )

    return demo
|
|
|
|
def get_my_public_ip() -> Optional[str]:
    """Return this host's public IPv4 address, as reported by an echo service.

    Each service is queried with ``curl`` over IPv4 (5 s limit). The first
    response that parses as an IPv4 address is returned, because the
    security-group rules opened at startup use a ``/32`` CIDR. Returns None
    when every service fails, ``curl`` is unavailable, or no valid address
    comes back.
    """
    import ipaddress

    for url in ("https://ifconfig.me", "https://icanhazip.com"):
        try:
            # Argument list without a shell: nothing is interpreted by /bin/sh.
            output = subprocess.check_output(
                ["curl", "-4", "-s", "-m", "5", url],
                text=True,
                timeout=10,
            )
        except (subprocess.SubprocessError, OSError):
            # Fall through to the next service instead of giving up entirely.
            continue
        candidate = output.strip()
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            # e.g. an HTML error page; never feed that into a firewall rule.
            continue
        return candidate
    return None
|
|
def update_security_group(ip: Optional[str]) -> None:
    """Open TCP ports 8000 and 8080 on the ECS security group for ``ip``.

    Credentials come from ALIBABA_CLOUD_ACCESS_KEY_ID /
    ALIBABA_CLOUD_ACCESS_KEY_SECRET and the group ID from SG_ID. One ingress
    rule for ``{ip}/32`` is added per port in region cn-hangzhou.

    Errors whose text mentions ``RuleAlreadyExist`` are ignored; other API
    errors are printed, not raised. When ``ip`` or a required environment
    variable is missing, a warning is printed and nothing is changed.
    """
    if not ip:
        print("[!] Public IP unknown; skipping security group update")
        return

    ak = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    sk = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    sg_id = os.environ.get("SG_ID")
    required = (
        ("ALIBABA_CLOUD_ACCESS_KEY_ID", ak),
        ("ALIBABA_CLOUD_ACCESS_KEY_SECRET", sk),
        ("SG_ID", sg_id),
    )
    missing = [name for name, value in required if not value]
    if missing:
        print(f"[!] Missing environment variables {', '.join(missing)}; skipping security group update")
        return

    config = open_api_models.Config(access_key_id=ak, access_key_secret=sk, endpoint="ecs.cn-hangzhou.aliyuncs.com")
    client = EcsClient(config)

    # Log: "Auto-configuring security group, allowing IP: <ip>"
    print(f"[*] 自动配置安全组,允许 IP: {ip}")
    for port in (8000, 8080):
        try:
            req = ecs_models.AuthorizeSecurityGroupRequest(
                region_id="cn-hangzhou",
                security_group_id=sg_id,
                ip_protocol="tcp",
                port_range=f"{port}/{port}",
                source_cidr_ip=f"{ip}/32",
                description="Auto-opened for HF Space",
            )
            client.authorize_security_group(req)
            # Log: "Port <port> opened for <ip>"
            print(f"[+] 端口 {port} 已对 {ip} 放行")
        except Exception as e:
            # An error mentioning RuleAlreadyExist means the rule is already in place.
            if "RuleAlreadyExist" not in str(e):
                # Log: "Rule configuration error: <e>"
                print(f"[!] 规则配置异常: {e}")
|
|
def main() -> None:
    """Open the ECS security group for this host, then launch the Gradio UI.

    The security-group update is best-effort: failures are reported and the
    UI still starts, since a matching rule may already exist.
    """
    my_ip = get_my_public_ip()
    if my_ip:
        try:
            update_security_group(my_ip)
        except Exception as exc:
            print(f"[!] Security group update failed: {exc}")
    else:
        print("[!] Could not determine public IP; skipping security group update")

    demo = build_demo()
    # Limit each event listener to one concurrent run; further requests queue.
    demo.queue(default_concurrency_limit=1)
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))


if __name__ == "__main__":
    main()
|
|