# AUXteam - Upload folder using huggingface_hub (commit 00a91e8, verified)
from app.core.config import settings
import gradio as gr
from fastapi import FastAPI
import argparse
import os
import sys
import tempfile
import threading
import json
import traceback
import concurrent.futures
import time
import base64
from run import webvoyager_run
from utils import generate_persona
import re
import logging
# Set up FastAPI for health checks
from app.main import app
@app.get("/health")
def health():
    """Health-check endpoint: always reports an "ok" status."""
    return dict(status="ok")
def _first_text(content):
    """Return the text of a chat message ``content`` field, or '' if none is found.

    ``content`` may be a plain string, or a list of parts whose first element
    is a dict carrying a ``text`` key. Any other shape yields an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list) and content and isinstance(content[0], dict):
        text = content[0].get('text', '')
        return text if isinstance(text, str) else ''
    return ''


def format_log_for_gradio(log_content):
    """
    Format one raw log entry into a more readable form for the Gradio UI.

    Args:
        log_content: A raw log entry. Usually a JSON-encoded chat message
            (with ``role`` and ``content``) or an object with an ``error``
            key, but it may also be a plain string such as "Observing...".

    Returns:
        The formatted text. Input that is not valid JSON, or that decodes to
        anything other than a JSON object, is returned unchanged. A JSON
        object matching none of the known shapes yields an empty string.
    """
    try:
        # The log content may or may not be a valid JSON string.
        log_data = json.loads(log_content)
    except (json.JSONDecodeError, TypeError):
        return log_content

    # Valid JSON is not necessarily an object: "123", "null" or a quoted
    # string decode to values without .get(), so pass them through as-is.
    if not isinstance(log_data, dict):
        return log_content

    formatted_output = ""
    role = log_data.get('role')

    if role == 'user':
        # This is the initial prompt.
        formatted_output += "--- Starting Task ---\n"
        content = log_data.get('content')
        task_match = re.search(
            r'Now given a task: (.*?)\s+Please interact with',
            _first_text(content),
        )
        if task_match:
            formatted_output += f"Task: {task_match.group(1)}\n"

        # A second part of type image_url is the screenshot; show a
        # placeholder instead of the (potentially huge) URL.
        parts = content if isinstance(content, list) else []
        if (
            len(parts) > 1
            and isinstance(parts[1], dict)
            and parts[1].get('type') == 'image_url'
        ):
            formatted_output += "Processing screenshot...\n"

    elif role == 'assistant':
        # This is the agent's response.
        content = _first_text(log_data.get('content'))
        thought_match = re.search(r'Thought: (.*?)\nAction:', content, re.DOTALL)
        action_match = re.search(r'Action: (.*)', content, re.DOTALL)

        if thought_match:
            formatted_output += f"Thought: {thought_match.group(1).strip()}\n"
        if action_match:
            action = action_match.group(1).strip()
            # Make action more human-readable.
            # NOTE(review): these are case-sensitive substring replacements
            # over the whole action text, so the words "click"/"type"/"scroll"
            # inside typed content are rewritten too - confirm this is intended.
            action = (
                action.replace("click", "Clicking element")
                .replace("type", "Typing into element")
                .replace("scroll", "Scrolling")
            )
            formatted_output += f"Action: {action}\n"

    elif 'error' in log_data:
        formatted_output += f"An error occurred: {log_data['error']}. Please check the container logs for more details."

    return formatted_output
def _read_agent_log(task_dir):
    """Return the text of ``agent.log`` inside *task_dir*, or None if it cannot be read."""
    try:
        with open(os.path.join(task_dir, 'agent.log'), 'r') as f:
            return f.read()
    except (OSError, UnicodeDecodeError):
        # Best effort: the file may not exist yet or may be mid-write.
        return None


def _latest_screenshot_path(task_dir):
    """Return the path of the last ``screenshot*.png`` in *task_dir* in natural order, or None."""
    try:
        files = [
            name for name in os.listdir(task_dir)
            if name.startswith('screenshot') and name.endswith('.png')
        ]
        if not files:
            return None
        # Sort by number (e.g. screenshot1, screenshot1_action, screenshot2).
        files.sort(key=lambda x: [int(c) if c.isdigit() else c for c in re.split(r'(\d+)', x)])
        return os.path.join(task_dir, files[-1])
    except (OSError, TypeError, ValueError):
        # If the directory read fails momentarily, just skip this update.
        return None


def run_script_for_gradio(url, task, use_persona, persona_criteria=None):
    """
    Run the webvoyager script for Gradio, streaming output and screenshots.

    This is a generator: each yielded value updates the four UI outputs.

    Args:
        url: Address of the website the agent should start on.
        task: Natural-language description of the task to perform.
        use_persona: When falsy, ``persona_criteria`` is ignored.
        persona_criteria: Optional description passed to ``generate_persona``.
            When given (and ``use_persona`` is truthy) a persona is generated
            first, and the run stops if that fails.

    Yields:
        Tuples ``(screenshot_html, agent_log, debug_log, status)`` where
        ``screenshot_html`` is an ``<img>`` tag with the latest screenshot
        embedded as base64 (or an empty string before the first one),
        ``agent_log`` is the accumulated formatted log, ``debug_log`` is the
        content of ``agent.log``, and ``status`` is a short status line.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        task_file_path = os.path.join(temp_dir, 'task.jsonl')
        with open(task_file_path, 'w') as f:
            # Serialize with json.dumps so quotes or backslashes in the URL or
            # task cannot produce an invalid JSON line.
            f.write(json.dumps({"id": "custom_task", "web": url, "ques": task}))

        args = argparse.Namespace(
            test_file=task_file_path,
            max_iter=5,
            api_key=os.environ.get("BLABLADOR_API_KEY"),
            api_base_url="https://api.helmholtz-blablador.fz-juelich.de/v1",
            api_model=settings.MODEL_LARGE,
            output_dir=os.path.join(temp_dir, 'results'),
            seed=None,
            max_attached_imgs=1,
            temperature=1.0,
            download_dir=os.path.join(temp_dir, 'downloads'),
            text_only=False,
            headless=True,
            save_accessibility_tree=False,
            force_device_scale=False,
            window_width=1024,
            window_height=768,
            fix_box_color=False
        )

        os.makedirs(args.output_dir, exist_ok=True)
        os.makedirs(args.download_dir, exist_ok=True)

        task_dir = os.path.join(args.output_dir, 'taskcustom_task')
        os.makedirs(task_dir, exist_ok=True)

        # Import run here to avoid circular dependency if any, but mainly to use its setup_logger
        from run import setup_logger
        setup_logger(task_dir)

        full_log = ""
        last_screenshot_html = ""  # Keep track of the last image to ensure we always show something
        debug_log = ""
        raw_log_file_path = os.path.join(task_dir, "raw_log.txt")

        # Ensure the raw_log file exists
        with open(raw_log_file_path, "w") as f:
            f.write("")

        persona = None
        if not use_persona:
            persona_criteria = None

        if persona_criteria:
            full_log += "--- Initializing TinyTroupe Persona ---\n"
            yield last_screenshot_html, full_log, debug_log, "--- Initializing TinyTroupe Persona ---"

            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(generate_persona, persona_criteria)

                # Poll for log updates while persona is being generated
                while not future.done():
                    new_debug_log = _read_agent_log(task_dir)
                    if new_debug_log is not None and new_debug_log != debug_log:
                        # Use the last non-blank newly appended line as the status update
                        status_lines = [
                            line for line in new_debug_log[len(debug_log):].split('\n')
                            if line.strip()
                        ]
                        if status_lines:
                            last_status = status_lines[-1]
                            yield last_screenshot_html, full_log, new_debug_log, f"Status: {last_status}"
                        debug_log = new_debug_log
                    time.sleep(1.0)

                try:
                    persona = future.result()
                except Exception as e:
                    # Report the failure in the UI log and fall through to the
                    # "failed to generate persona" path instead of letting the
                    # exception escape the generator.
                    full_log += f"Persona generation raised an error: {e}\n"
                    persona = None

            # Final debug log read after persona generation
            final_debug_log = _read_agent_log(task_dir)
            if final_debug_log is not None:
                debug_log = final_debug_log

            if persona:
                full_log += f"Persona generated: {persona.get('name', 'Unknown')}\n"
                yield last_screenshot_html, full_log, debug_log, "Raw log will be available here."
            else:
                full_log += "Failed to generate persona. STOPPING execution as requested.\n"
                yield last_screenshot_html, full_log, debug_log, "Task stopped due to persona generation failure."
                return

        try:
            # We'll get real-time updates by iterating through the run function
            with open(raw_log_file_path, "w") as raw_log_file:
                for i, log_entry in enumerate(webvoyager_run(args, {"id": "custom_task", "web": url, "ques": task}, task_dir, persona=persona)):
                    raw_log_file.write(log_entry + '\n')
                    raw_log_file.flush()

                    # Status entries are shown verbatim and skip step formatting.
                    try:
                        log_data = json.loads(log_entry)
                        if log_data.get("status"):
                            full_log += f"{log_data['status']}\n"
                            yield last_screenshot_html, full_log, debug_log, f"Log file: {raw_log_file_path}"
                            continue
                    except (json.JSONDecodeError, AttributeError):
                        pass  # Not a status update, proceed as normal

                    formatted_log = format_log_for_gradio(log_entry)
                    if formatted_log:
                        full_log += f"--- Step {i+1} ---\n{formatted_log}\n"

                    # Read agent.log for debug info
                    try:
                        with open(os.path.join(task_dir, 'agent.log'), 'r') as f:
                            debug_log = f.read()
                    except FileNotFoundError:
                        debug_log = "agent.log not found."

                    # Embed the newest screenshot, if any, as a base64 data URI.
                    current_screenshot = _latest_screenshot_path(task_dir)
                    if current_screenshot and os.path.exists(current_screenshot):
                        with open(current_screenshot, "rb") as img_file:
                            b64_data = base64.b64encode(img_file.read()).decode('utf-8')
                            last_screenshot_html = f"<img src='data:image/png;base64,{b64_data}' style='max-width: 100%;'>"

                    yield last_screenshot_html, full_log, debug_log, f"Log file: {raw_log_file_path}"

        except Exception as e:
            # Top-level boundary: surface any failure in the UI with its traceback.
            tb = traceback.format_exc()
            full_log += f"An error occurred: {e}\n\nFull Traceback:\n{tb}"
            yield last_screenshot_html, full_log, debug_log, f"Error: {e}"
# Gradio UI: inputs on the left, the agent's view and output on the right,
# and a second row with the debug log and raw-log status.
with gr.Blocks() as iface:
    gr.Markdown("# WebVoyager")
    gr.Markdown("An LMM-powered web agent that can complete user instructions end-to-end.")

    with gr.Row():
        with gr.Column():
            url_input = gr.Textbox(
                label="URL",
                placeholder="Enter the URL of the website",
            )
            task_input = gr.Textbox(
                label="Task",
                placeholder="Describe the task to perform",
            )
            criteria_input = gr.Textbox(
                label="Persona Criteria (TinyTroupe)",
                placeholder="Describe the persona you want (e.g. salesman for CRM)",
            )
            use_persona_toggle = gr.Checkbox(label="Use Persona", value=True)
            submit_btn = gr.Button("Submit")

        with gr.Column():
            screenshot_output = gr.HTML(label="Agent's View")
            agent_output = gr.Textbox(
                label="Agent Output",
                lines=20,
                interactive=False,
            )

    with gr.Row():
        debug_output = gr.Textbox(
            label="Debug Log",
            lines=10,
            interactive=False,
        )
        raw_log_status = gr.Markdown(label="Raw Log Status")

    # The input order matches the parameters of run_script_for_gradio; the
    # output order matches the four values it yields.
    submit_btn.click(
        fn=run_script_for_gradio,
        inputs=[url_input, task_input, use_persona_toggle, criteria_input],
        outputs=[screenshot_output, agent_output, debug_output, raw_log_status],
        api_name="execute_task",
    )
# Attach the Gradio interface to the FastAPI app at the root path.
app = gr.mount_gradio_app(app, iface, path="/")


if __name__ == "__main__":
    # uvicorn is imported only when this file is executed as a script.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )