import base64
import io
import os
import streamlit as st
import pandas as pd
from pathlib import Path
from PIL import Image, ImageDraw
import ast
import sys
import time
print("[result_viewer] MODULE LOADED", file=sys.stderr, flush=True)
# Load .env from repo root or script dir so HF_IMAGES_LOCAL_PATH is set when running e.g. streamlit run
def _load_dotenv():
    """Populate ``os.environ`` from the first readable ``.env`` file.

    The parent of this script's directory is checked first, then the script
    directory itself; once one file has been read the search stops.

    Parsing rules:
      * blank lines, ``#`` comment lines and lines without ``=`` are ignored;
      * a leading ``export `` (shell style) is dropped;
      * keys and values are stripped of whitespace, values additionally of
        surrounding double/single quotes;
      * variables already present in the environment are never overwritten.
    """
    script_dir = Path(__file__).resolve().parent
    for base in (script_dir.parent, script_dir):
        env_file = base / ".env"
        if not env_file.exists():
            continue
        try:
            # Explicit encoding: the default is locale-dependent. "utf-8-sig"
            # also tolerates a BOM written by some editors.
            text = env_file.read_text(encoding="utf-8-sig")
        except (OSError, UnicodeDecodeError):
            # Best-effort configuration: an unreadable .env must not prevent
            # the app from importing, so try the next candidate instead.
            continue
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            key, sep, value = line.partition("=")
            if not sep:
                continue
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            if key and key not in os.environ:
                os.environ[key] = value
        # Only the first file that could be read is applied.
        break


_load_dotenv()
# Path to local dataset (HF_IMAGES_LOCAL_PATH = where HF CLI downloaded it, or save_to_disk output).
# Read once at import time; an unset variable yields the empty string.
HF_IMAGES_LOCAL_PATH = os.environ.get("HF_IMAGES_LOCAL_PATH", "").strip()
# External links (technical report, code, dataset, organisations).
# NOTE(review): presumably rendered in the page header/badges - the consumers are not in this section.
TECHNICAL_REPORT_1_LINK = "http://www.fig.inc/blog/gui-pertubed-breaking-browser-use-models/?utm_source=huggingface&utm_medium=readme&utm_campaign=guip-p2"
CODE_LINK = "https://github.com/ManifoldRG/GUI-DR"
DATA_LINK = "https://huggingface.co/datasets/figai/GUI-Perturbed"
FIG_LINK = "https://fig.inc/"
MANIFOLDRG_LINK = "https://www.manifoldrg.com/"
# Media (logos): try script dir then repo root so it works from scripts/ or src/ (e.g. HF Space)
# _script_dir is the directory containing this file; _repo_root is simply its parent.
_script_dir = Path(__file__).resolve().parent
_repo_root = _script_dir.parent
# Resolved once at import: <script dir>/media if it exists, otherwise <repo root>/media
# (the fallback is not checked for existence here).
MEDIA_DIR = (_script_dir / "media") if (_script_dir / "media").exists() else (_repo_root / "media")
# Names of the perturbation variants.
# NOTE(review): presumably matches values of the CSV "variant" column - confirm against the data.
PERTURBATION_VARIANTS = ["precision", "style", "text_shrink"]
def _logo_data_uri(filename):
    """Encode a logo file as a base64 ``data:`` URI suitable for an HTML ``img src``.

    The file is looked up under MEDIA_DIR first and then under
    ``<repo root>/media``. Returns None when neither location has it. The
    MIME type is derived from the file extension, defaulting to image/png.
    """
    candidates = (MEDIA_DIR / filename, _repo_root / "media" / filename)
    logo_path = next((p for p in candidates if p.exists()), None)
    if logo_path is None:
        return None

    mime_by_extension = {
        "png": "image/png",
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "webp": "image/webp",
        "svg": "image/svg+xml",
    }
    extension = logo_path.suffix.lower().lstrip(".")
    mime = mime_by_extension.get(extension, "image/png")
    encoded = base64.b64encode(logo_path.read_bytes()).decode()
    return f"data:{mime};base64,{encoded}"
def _badge_icon_html(kind, fig_data_uri):
"""Return inline HTML for a small badge icon. kind: 'fig' | 'github' | 'huggingface'."""
style = "width:14px;height:14px;margin-right:5px;flex-shrink:0;vertical-align:middle;"
try:
if kind == "fig" and fig_data_uri:
return f''
if kind == "github":
svg = (
'"
)
return svg
if kind == "huggingface":
hf_path = (
"M12.025 1.13c-5.77 0-10.449 4.647-10.449 10.378 0 1.112.178 2.181.503 3.185.064-.222.203-.444.416-.577a.96.96 0 0 1 .524-.15c.293 0 .584.124.84.284.278.173.48.408.71.694.226.282.458.611.684.951v-.014c.017-.324.106-.622.264-.874s.403-.487.762-.543c.3-.047.596.06.787.203s.31.313.4.467c.15.257.212.468.233.542.01.026.653 1.552 1.657 2.54.616.605 1.01 1.223 1.082 1.912.055.537-.096 1.059-.38 1.572.637.121 1.294.187 1.967.187.657 0 1.298-.063 1.921-.178-.287-.517-.44-1.041-.384-1.581.07-.69.465-1.307 1.081-1.913 1.004-.987 1.647-2.513 1.657-2.539.021-.074.083-.285.233-.542.09-.154.208-.323.4-.467a1.08 1.08 0 0 1 .787-.203c.359.056.604.29.762.543s.247.55.265.874v.015c.225-.34.457-.67.683-.952.23-.286.432-.52.71-.694.257-.16.547-.284.84-.285a.97.97 0 0 1 .524.151c.228.143.373.388.43.625l.006.04a10.3 10.3 0 0 0 .534-3.273c0-5.731-4.678-10.378-10.449-10.378M8.327 6.583a1.5 1.5 0 0 1 .713.174 1.487 1.487 0 0 1 .617 2.013c-.183.343-.762-.214-1.102-.094-.38.134-.532.914-.917.71a1.487 1.487 0 0 1 .69-2.803m7.486 0a1.487 1.487 0 0 1 .689 2.803c-.385.204-.536-.576-.916-.71-.34-.12-.92.437-1.103.094a1.487 1.487 0 0 1 .617-2.013 1.5 1.5 0 0 1 .713-.174m-10.68 1.55a.96.96 0 1 1 0 1.921.96.96 0 0 1 0-1.92m13.838 0a.96.96 0 1 1 0 1.92.96.96 0 0 1 0-1.92M8.489 11.458c.588.01 1.965 1.157 3.572 1.164 1.607-.007 2.984-1.155 3.572-1.164.196-.003.305.12.305.454 0 .886-.424 2.328-1.563 3.202-.22-.756-1.396-1.366-1.63-1.32q-.011.001-.02.006l-.044.026-.01.008-.03.024q-.018.017-.035.036l-.032.04a1 1 0 0 0-.058.09l-.014.025q-.049.088-.11.19a1 1 0 0 1-.083.116 1.2 1.2 0 0 1-.173.18q-.035.029-.075.058a1.3 1.3 0 0 1-.251-.243 1 1 0 0 1-.076-.107c-.124-.193-.177-.363-.337-.444-.034-.016-.104-.008-.2.022q-.094.03-.216.087-.06.028-.125.063l-.13.074q-.067.04-.136.086a3 3 0 0 0-.135.096 3 3 0 0 0-.26.219 2 2 0 0 0-.12.121 2 2 0 0 0-.106.128l-.002.002a2 2 0 0 0-.09.132l-.001.001a1.2 1.2 0 0 0-.105.212q-.013.036-.024.073c-1.139-.875-1.563-2.317-1.563-3.203 0-.334.109-.457.305-.454m.836 
10.354c.824-1.19.766-2.082-.365-3.194-1.13-1.112-1.789-2.738-1.789-2.738s-.246-.945-.806-.858-.97 1.499.202 2.362c1.173.864-.233 1.45-.685.64-.45-.812-1.683-2.896-2.322-3.295s-1.089-.175-.938.647 2.822 2.813 2.562 3.244-1.176-.506-1.176-.506-2.866-2.567-3.49-1.898.473 1.23 2.037 2.16c1.564.932 1.686 1.178 1.464 1.53s-3.675-2.511-4-1.297c-.323 1.214 3.524 1.567 3.287 2.405-.238.839-2.71-1.587-3.216-.642-.506.946 3.49 2.056 3.522 2.064 1.29.33 4.568 1.028 5.713-.624m5.349 0c-.824-1.19-.766-2.082.365-3.194 1.13-1.112 1.789-2.738 1.789-2.738s.246-.945.806-.858.97 1.499-.202 2.362c-1.173.864.233 1.45.685.64.451-.812 1.683-2.896 2.322-3.295s1.089-.175.938.647-2.822 2.813-2.562 3.244 1.176-.506 1.176-.506 2.866-2.567 3.49-1.898-.473 1.23-2.037 2.16c-1.564.932-1.686 1.178-1.464 1.53s3.675-2.511 4-1.297c.323 1.214-3.524 1.567-3.287 2.405.238.839 2.71-1.587 3.216-.642.506.946-3.49 2.056-3.522 2.064-1.29.33-4.568 1.028-5.713-.624"
)
svg_str = (
'"
)
b64 = base64.b64encode(svg_str.encode("utf-8")).decode("ascii")
data_uri = f"data:image/svg+xml;base64,{b64}"
return f'
'
except Exception:
pass
return ""
st.set_page_config(page_title="GUI Perturbation Evaluation Viewer", page_icon="🔬", layout="wide")
# Theme and layout styles; system fonts only (no external CDN requests)
st.markdown("""
""", unsafe_allow_html=True)
def _parse_success(value):
    """Normalize a ``hit_box_accuracy`` cell to a plain Python bool.

    Accepts Python bools, numpy booleans (what pandas hands back for a
    boolean column) and the strings "true"/"false" in any case, with
    optional surrounding whitespace. Anything else, including None, NaN and
    numbers, is treated as False.
    """
    # Function-scope import: numpy is a pandas dependency, and numpy.bool_ is
    # not a subclass of bool, so a plain isinstance(value, bool) misses it.
    import numpy as np

    if isinstance(value, (bool, np.bool_)):
        return bool(value)
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return False
def _csv_mtime():
    """Return the modification time of the results CSV, or None if it is not found.

    The CSV is looked up under ``<repo root>/data`` first, then under
    ``<script dir>/data``; the first existing file wins.
    """
    candidates = (
        root / "data" / "baseline_results_full_new.csv"
        for root in (_repo_root, _script_dir)
    )
    found = next((csv_file for csv_file in candidates if csv_file.exists()), None)
    if found is None:
        return None
    return found.stat().st_mtime
@st.cache_data
def _load_data_cached(mtime):
    """Read and clean the results CSV; cached per distinct ``mtime`` value.

    ``mtime`` is not used in the body - it exists purely as part of the cache
    key. It deliberately has no leading underscore: Streamlit excludes
    underscore-prefixed parameters from the cache key, so a parameter named
    ``_mtime`` could never invalidate the cache.

    Returns an empty DataFrame when the CSV is found in neither
    ``<repo root>/data`` nor ``<script dir>/data``.
    """
    csv_path = None
    for base in (_repo_root, _script_dir):
        candidate = base / "data" / "baseline_results_full_new.csv"
        if candidate.exists():
            csv_path = candidate
            break
    if csv_path is None:
        return pd.DataFrame()

    df = pd.read_csv(csv_path, low_memory=False)
    if "interesting_cases" in df.columns:
        # .copy() so the column assignment below writes to an independent
        # frame instead of a filtered slice (avoids SettingWithCopyWarning).
        df = df[df["interesting_cases"] != "Invalid"].copy()
    # Same normalisation for bool and string cells: only "true" counts as success.
    df["success"] = df["hit_box_accuracy"].astype(str).str.lower() == "true"
    return df


def load_data(_mtime=None):
    """Load and clean data. Tries repo root (HF Space: /app/data/) then script dir.

    Args:
        _mtime: Modification time of the CSV (see ``_csv_mtime``). A new value
            forces a reload; the same value returns the cached frame.

    Returns:
        The cleaned DataFrame with a boolean ``success`` column, or an empty
        DataFrame when no CSV is found.
    """
    # The public parameter name is kept for existing callers; the value is
    # forwarded under a name that Streamlit actually includes in the cache key.
    return _load_data_cached(_mtime)


# Keep ``load_data.clear()`` working for callers that clear the cache explicitly.
load_data.clear = _load_data_cached.clear
def _debug_csv_paths():
    """List the CSV locations that are probed, for triage when no data is found.

    Returns a list of ``(description, exists)`` tuples: one per candidate
    location, followed by a final ``("__file__ = ...", None)`` entry.
    """
    labelled_bases = (("repo_root", _repo_root), ("script_dir", _script_dir))
    candidates = [
        (label, base / "data" / "baseline_results_full_new.csv")
        for label, base in labelled_bases
    ]
    report = [(f"{label}: {csv_file}", csv_file.exists()) for label, csv_file in candidates]
    report.append((f"__file__ = {__file__}", None))
    return report
def resolve_image_path(row):
    """Locate the screenshot file for a results row on the local filesystem.

    Lookup order:
      1. variant-specific filename patterns inside the directory named by the
         row's ``image_path`` (relative paths are taken from the script dir);
      2. the exact ``image_path`` relative to the script directory;
      3. the bare file name under ``HF_IMAGES_LOCAL_PATH`` or its ``images/``
         subdirectory, when that variable is set.

    Args:
        row: Mapping-like row (dict or pandas Series) with ``image_path``,
            ``step_index`` and ``variant`` entries.

    Returns:
        A ``Path`` to an existing file, or None when nothing matches or the
        row has no usable ``image_path``.
    """
    image_path = row.get("image_path", "")
    # Only a non-empty string can be a path; NaN/None cells mean "no image".
    if not isinstance(image_path, str) or not image_path:
        return None
    # Drop a leading "/mnt/" so the remainder is treated as a relative path.
    image_path = image_path.removeprefix("/mnt/")
    image_path_obj = Path(image_path)
    if image_path_obj.is_absolute():
        image_dir = image_path_obj.parent
    else:
        image_dir = _script_dir / image_path_obj.parent

    step_index = row.get("step_index")
    # pandas turns an integer column containing NaN into floats; render 3.0 as
    # "3" so the filename patterns can still match.
    if isinstance(step_index, float) and step_index.is_integer():
        step_index = int(step_index)
    step_idx = str(step_index)
    variant = row.get("variant", "")

    patterns = (
        f"step_{step_idx}_{variant}_*.png",
        f"step_{step_idx}_*{variant}*.png",
        f"*{variant}*step_{step_idx}*.png",
        f"step_{step_idx}_*.png",
    )
    for pattern in patterns:
        # Sorted so the pick is deterministic when several files match
        # (glob order depends on the filesystem).
        matching = sorted(image_dir.glob(pattern))
        if matching:
            return matching[0]

    exact = _script_dir / image_path
    if exact.exists():
        return exact

    if HF_IMAGES_LOCAL_PATH:
        base = Path(HF_IMAGES_LOCAL_PATH)
        name = image_path_obj.name
        for candidate in (base / name, base / "images" / name):
            if candidate.exists():
                return candidate
    return None
def _get_local_dataset_path():
    """Return the local dataset root as a string.

    Precedence: the resolved ``HF_IMAGES_LOCAL_PATH`` when set; otherwise
    ``/data/gui_perturbed_subset`` when a ``/data`` directory exists;
    otherwise ``<repo root>/data/gui_perturbed_subset``. The returned path is
    not checked for existence.
    """
    if HF_IMAGES_LOCAL_PATH:
        configured = Path(HF_IMAGES_LOCAL_PATH)
        return str(configured.resolve())
    if not Path("/data").is_dir():
        return str(_repo_root / "data" / "gui_perturbed_subset")
    return "/data/gui_perturbed_subset"
def _row_to_key(row):
    """Build the ``(task_id, step_index, variant)`` lookup key for a CSV row.

    Returns ``(str, int, str)``, or None when any of the three fields is
    missing/NaN or the step index cannot be converted to an int.
    """
    fields = [row.get(name) for name in ("task_id", "step_index", "variant")]
    if any(pd.isna(value) for value in fields):
        return None
    task, step, variant_name = fields
    try:
        step_number = int(step)
    except (TypeError, ValueError):
        return None
    return (str(task), step_number, str(variant_name))
@st.cache_data
def _load_local_dataset(path):
    """Build a lookup index over the dataset's parquet files without reading images.

    Only the key columns (``task_id``, ``step_index`` and ``visual_variant``,
    falling back to ``variant``) are read from each file.

    Args:
        path: Dataset root. Parquet files are taken from ``<path>/data`` when
            that directory exists, otherwise found recursively under ``path``.

    Returns:
        ``(parquet_paths, index, error_msg)``. On success ``parquet_paths`` is
        the list of indexed files, ``index`` maps
        ``(task_id: str, step_index: int, variant: str)`` to
        ``(file_path, row_idx)`` and ``error_msg`` is None. On failure the
        first two items are None and ``error_msg`` describes the problem.
        When a key occurs more than once, the last occurrence wins.
    """
    if not path:
        return None, None, "path is empty"
    base = Path(path).resolve()
    if not base.exists():
        return None, None, f"path does not exist: {base}"
    try:
        import pyarrow.parquet as pq
    except ImportError as e:
        return None, None, f"pyarrow import failed: {e}"

    data_dir = base / "data"
    # Sorted in both branches so "last occurrence wins" is deterministic.
    if data_dir.is_dir():
        parquet_files = sorted(data_dir.glob("*.parquet"))
    else:
        parquet_files = sorted(base.rglob("*.parquet"))
    if not parquet_files:
        return None, None, f"no parquet files under {base}"

    index = {}
    paths = []
    for pf in parquet_files:
        table = None
        variant_col = None
        for candidate_col in ("visual_variant", "variant"):
            try:
                table = pq.read_table(pf, columns=["task_id", "step_index", candidate_col])
            except Exception:
                # Best-effort: a missing column or unreadable file just means
                # this column name / file is not usable.
                continue
            variant_col = candidate_col
            break
        if table is None:
            continue

        pf_str = str(pf)
        paths.append(pf_str)
        # to_pylist() converts each column once; nulls become real None
        # values, so the None check below actually skips null cells.
        rows = zip(
            table.column("task_id").to_pylist(),
            table.column("step_index").to_pylist(),
            table.column(variant_col).to_pylist(),
        )
        for row_idx, (task_id, step_index, variant) in enumerate(rows):
            if task_id is None or step_index is None or variant is None:
                continue
            try:
                key = (str(task_id), int(step_index), str(variant))
            except (TypeError, ValueError, OverflowError):
                continue
            index[key] = (pf_str, row_idx)

    if not index:
        return None, None, "no valid rows in parquet files"
    return paths, index, None
def _read_screenshot_from_parquet(file_path, row_idx):
    """Load one row's screenshot from a parquet file as a PIL Image.

    Only the row group containing ``row_idx`` is read. The ``screenshot``
    column is tried first, then ``image``; a cell is usable when it is a
    dict carrying non-None ``bytes``. Returns None when pyarrow is missing,
    the index is out of range, or nothing could be decoded.
    """
    try:
        import pyarrow.parquet as pq
    except ImportError:
        return None

    def _decode_cell(cell):
        # Arrow scalars are unwrapped to Python objects first.
        value = cell.as_py() if hasattr(cell, "as_py") else cell
        if not isinstance(value, dict):
            return None
        payload = value.get("bytes")
        if payload is None:
            return None
        if hasattr(payload, "as_py"):
            payload = payload.as_py()
        if not isinstance(payload, bytes):
            payload = bytes(payload)
        return Image.open(io.BytesIO(payload))

    def _cell_at(parquet_file, column, global_idx):
        # Walk the row groups until the one holding global_idx is found.
        first_row = 0
        for group in range(parquet_file.metadata.num_row_groups):
            group_rows = parquet_file.metadata.row_group(group).num_rows
            if global_idx < first_row + group_rows:
                group_table = parquet_file.read_row_group(group, columns=[column])
                return group_table.column(column)[global_idx - first_row]
            first_row += group_rows
        return None

    try:
        parquet_file = pq.ParquetFile(file_path)
        if row_idx >= parquet_file.metadata.num_rows or row_idx < 0:
            return None
        for column in ("screenshot", "image"):
            try:
                picture = _decode_cell(_cell_at(parquet_file, column, row_idx))
            except Exception:
                # Column absent or cell undecodable: try the next column name.
                continue
            if picture is not None:
                return picture
    except Exception:
        pass
    return None
def _ensure_dataset_loaded():
    """Make sure the parquet lookup index is available in ``st.session_state``.

    Does nothing when ``_ds_index`` and ``_ds_parquet_paths`` are already set.
    Otherwise the dataset is indexed via ``_load_local_dataset`` and, on
    success, ``_ds_parquet_paths``, ``_ds_index`` and ``_ds_base_path`` are
    stored. On failure the reason is kept under ``_ds_load_error`` so it can
    be shown when triaging missing images, instead of being dropped.
    """
    state = st.session_state
    if "_ds_index" in state and "_ds_parquet_paths" in state:
        return

    path = _get_local_dataset_path()
    result = _load_local_dataset(path)
    if isinstance(result, (list, tuple)) and len(result) >= 2:
        parquet_paths, index = result[0], result[1]
        load_error = result[2] if len(result) > 2 else None
    else:
        parquet_paths, index, load_error = None, None, "unexpected return from _load_local_dataset"

    if parquet_paths is not None and index is not None:
        state["_ds_parquet_paths"] = parquet_paths
        state["_ds_index"] = index
        state["_ds_base_path"] = path
        # A successful load supersedes any error from an earlier attempt.
        if "_ds_load_error" in state:
            del state["_ds_load_error"]
    else:
        state["_ds_load_error"] = load_error or "dataset could not be loaded"
def _pil_from_row(row_data):
    """Convert a dataset row's ``screenshot``/``image`` entry to a PIL Image.

    Handles, in this order: a PIL image (returned as is), raw ``bytes``, a
    dict with ``bytes`` and/or a ``path`` relative to the dataset root, and
    array-likes exposing ``__array__`` (uint8, at least 2-D). Arrow scalars
    are unwrapped with ``as_py()`` first.

    Args:
        row_data: A dict, or any object with ``keys()`` that ``dict()`` accepts.

    Returns:
        A PIL Image, or None when the row has no usable image.
    """
    if row_data is None:
        return None
    if not isinstance(row_data, dict):
        if not hasattr(row_data, "keys"):
            return None
        row_data = dict(row_data)

    def _has_value(value):
        # Same notion of "present" as a truthiness test, except that objects
        # whose truth value is ambiguous (numpy arrays) count as present
        # instead of raising ValueError.
        if value is None:
            return False
        try:
            return bool(value)
        except (ValueError, TypeError):
            return True

    img = None
    for column in ("screenshot", "image"):
        candidate = row_data.get(column)
        if _has_value(candidate):
            img = candidate
            break
    if img is None:
        return None

    if hasattr(img, "as_py"):
        img = img.as_py()
        if img is None:
            return None
    if isinstance(img, Image.Image):
        return img
    if isinstance(img, bytes):
        return Image.open(io.BytesIO(img))

    if isinstance(img, dict):
        if "bytes" in img and img["bytes"]:
            b = img["bytes"]
            if hasattr(b, "as_py"):
                b = b.as_py()
            try:
                if not isinstance(b, bytes):
                    b = bytes(b)
                return Image.open(io.BytesIO(b))
            except Exception:
                # Undecodable bytes: fall through to the path lookup below.
                pass
        path_val = img.get("path")
        if path_val and isinstance(path_val, str):
            base = HF_IMAGES_LOCAL_PATH or st.session_state.get("_ds_base_path") or _get_local_dataset_path()
            if base:
                base_path = Path(base).resolve()
                for unresolved in (base_path / path_val, base_path / "images" / path_val):
                    candidate = unresolved.resolve()
                    # is_relative_to rejects paths that resolve outside the
                    # dataset root (e.g. via "..").
                    if candidate.is_relative_to(base_path) and candidate.exists():
                        try:
                            return Image.open(candidate)
                        except Exception:
                            # Unreadable file: try the next location.
                            pass

    if hasattr(img, "__array__"):
        try:
            import numpy as np

            arr = np.asarray(img)
            if arr.dtype == np.uint8 and arr.ndim >= 2:
                return Image.fromarray(arr)
        except Exception:
            pass
    return None
def get_image_for_row(row):
    """Return the screenshot for a row as a PIL Image, or None.

    A local image file is preferred. If none can be opened, the parquet
    dataset index is consulted and the single matching row is read lazily.
    """
    local_file = resolve_image_path(row)
    if local_file and local_file.exists():
        try:
            return Image.open(local_file)
        except Exception:
            # Unreadable local file: fall back to the parquet dataset.
            pass

    _ensure_dataset_loaded()
    lookup_key = _row_to_key(row)
    if lookup_key is None:
        return None
    ds_index = st.session_state.get("_ds_index")
    if ds_index and lookup_key in ds_index:
        parquet_file, row_number = ds_index[lookup_key]
        return _read_screenshot_from_parquet(parquet_file, row_number)
    return None
def format_raw_prediction(raw_pred):
    """Stringify a raw prediction for display; missing values (None/NaN) give None."""
    if pd.isna(raw_pred):
        return None
    return str(raw_pred)
def parse_coords(coord_str):
    """Parse a coordinate value into an ``(x, y)`` tuple of ints, or None.

    Accepts a string literal such as ``'[553, 86]'`` or ``'(553, 86)'`` as
    well as an already-parsed list/tuple. Only the first two elements are
    used and each is truncated with ``int()``.

    Returns None for missing values (None/NaN), unparsable strings,
    sequences with fewer than two elements, and elements that cannot be
    converted to int (including infinities).
    """
    if isinstance(coord_str, (list, tuple)):
        coords = coord_str
    elif isinstance(coord_str, str):
        try:
            # literal_eval only evaluates Python literals, never arbitrary code.
            coords = ast.literal_eval(coord_str.strip())
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return None
    else:
        # None, NaN and any other non-string scalar carry no coordinates.
        return None

    if not isinstance(coords, (list, tuple)) or len(coords) < 2:
        return None
    try:
        return (int(coords[0]), int(coords[1]))
    except (ValueError, TypeError, OverflowError):
        return None
# Solid cursor colors; semi-transparent so overlapping cursors stay visible
CONTRAST_OUTLINE = (50, 50, 50)  # RGB outline drawn around every cursor
CURSOR_ALPHA = 180  # cursor fill alpha (0-255)
# Per-model cursor fill color (RGB) and display label, keyed by model id.
MODEL_STYLES = dict(
    gta1=dict(color=(0, 0, 0), label="GTA1"),
    qwen25vl=dict(color=(255, 255, 255), label="Qwen2.5VL"),
    uitars15=dict(color=(255, 165, 0), label="UI-TARS-1.5"),
)


def _model_label(model):
    """Return the display label for a model key; unknown keys are returned unchanged."""
    style = MODEL_STYLES.get(model)
    if style is None:
        return model
    return style["label"]
def _arrow_points(scale):
    """Return the cursor-arrow outline as ``(dx, dy)`` offsets from its tip.

    The tip sits at the origin and the arrow extends down and to the right;
    every offset except the tip itself is multiplied by ``scale``.
    """
    # Unscaled vertices after the tip and the straight-down left edge.
    tail_shape = ((12, 36), (21, 54), (27, 51), (18, 33), (33, 33))
    outline = [(0, 0), (0, 48 * scale)]
    outline.extend((dx * scale, dy * scale) for dx, dy in tail_shape)
    return outline
def _draw_cursor_arrow(draw, cx, cy, fill_color, scale=1.0, outline_color=None):
    """Draw the arrow cursor as a filled polygon with its tip at ``(cx, cy)``.

    ``outline_color`` falls back to CONTRAST_OUTLINE when None. The outline
    width grows with ``scale`` and is at least 1 pixel.
    """
    if outline_color is None:
        outline_color = CONTRAST_OUTLINE
    vertices = []
    for dx, dy in _arrow_points(scale):
        vertices.append((int(cx + dx), int(cy + dy)))
    stroke = max(1, int(2 * scale))
    draw.polygon(vertices, fill=fill_color, outline=outline_color, width=stroke)
def draw_model_prediction(draw, coords, model, scale=1.0, alpha=255):
    """Draw one model's predicted click point as an arrow cursor.

    Nothing is drawn when ``coords`` is empty/None. The fill color comes from
    MODEL_STYLES (grey for unknown models) combined with ``alpha``; the
    outline is CONTRAST_OUTLINE at full opacity.
    """
    if not coords:
        return
    tip_x = int(coords[0])
    tip_y = int(coords[1])
    fallback_rgb = (180, 180, 180)
    rgb = MODEL_STYLES.get(model, {}).get('color', fallback_rgb)
    _draw_cursor_arrow(
        draw,
        tip_x,
        tip_y,
        (*rgb, alpha),
        scale,
        (*CONTRAST_OUTLINE, 255),
    )
def _draw_dashed_rect(draw, x, y, w, h, color, width, dash_length=8, gap_length=8):
    """Draw a dashed rectangle with top-left corner ``(x, y)`` and size ``w`` x ``h``.

    The four edges are drawn clockwise starting with the top edge. Each edge
    restarts its dash pattern at its own first corner; zero-length edges are
    skipped.
    """
    period = dash_length + gap_length
    corners = [(x, y), (x + w, y), (x + w, y + h), (x, y + h)]
    for k, origin in enumerate(corners):
        target = corners[(k + 1) % len(corners)]
        dx = target[0] - origin[0]
        dy = target[1] - origin[1]
        length = (dx**2 + dy**2) ** 0.5
        if length == 0:
            continue
        dash_count = int(length / period) + 1
        for i in range(dash_count):
            # Dash i covers the fraction [t0, t1] of the edge, clipped at its end.
            t0 = i * period / length
            t1 = min(1.0, (i * period + dash_length) / length)
            dash_start = (origin[0] + dx * t0, origin[1] + dy * t0)
            dash_end = (origin[0] + dx * t1, origin[1] + dy * t1)
            draw.line([dash_start, dash_end], fill=color, width=width)
def annotate_image_multi_model(img, rows_by_model, selected_models, draw_predictions=False):
    """Annotate a screenshot with the ground-truth box and, optionally, model cursors.

    Args:
        img: Source PIL image; it is not modified.
        rows_by_model: Mapping of model key to that model's result row. The
            ground-truth box is taken from the first row's
            ``ground_truth_bbox``, parsed as ``[x, y, w, h]``.
        selected_models: Model keys whose predictions may be drawn.
        draw_predictions: When True, draw each selected model's predicted
            point (``coordinates`` column) as a semi-transparent cursor.

    Returns:
        A new RGBA image carrying the annotations.
    """
    # convert() already returns a new image, so the input stays untouched.
    annotated_img = img.convert("RGBA")
    draw = ImageDraw.Draw(annotated_img)

    first_row = next(iter(rows_by_model.values()), None)
    if first_row is not None and pd.notna(first_row.get("ground_truth_bbox")):
        try:
            gt_bbox = ast.literal_eval(first_row["ground_truth_bbox"])
            if len(gt_bbox) >= 4:
                x, y, w, h = gt_bbox[0], gt_bbox[1], gt_bbox[2], gt_bbox[3]
                # Two-tone outline (red outside, yellow inside) so the box
                # stands out on both light and dark backgrounds.
                outer_color, inner_color = (255, 0, 0), (255, 255, 0)
                _draw_dashed_rect(draw, x, y, w, h, outer_color, 5)
                offset = 2
                if w > 2 * offset and h > 2 * offset:
                    _draw_dashed_rect(
                        draw, x + offset, y + offset, w - 2 * offset, h - 2 * offset, inner_color, 3
                    )
        except (ValueError, TypeError, SyntaxError):
            # A malformed bbox only means no ground-truth overlay.
            pass

    if draw_predictions:
        for model in selected_models:
            if model not in rows_by_model:
                continue
            coords = parse_coords(rows_by_model[model].get('coordinates'))
            if not coords:
                continue
            # Drawing an alpha fill directly on an RGBA image overwrites the
            # pixels (alpha included) instead of blending. Each cursor is
            # therefore drawn on its own transparent layer and composited, so
            # it is see-through over the screenshot and earlier cursors.
            overlay = Image.new("RGBA", annotated_img.size, (0, 0, 0, 0))
            draw_model_prediction(ImageDraw.Draw(overlay), coords, model, alpha=CURSOR_ALPHA)
            annotated_img = Image.alpha_composite(annotated_img, overlay)
    return annotated_img
# Max display width for images sent via WebSocket
_MAX_IMG_W = 1100


def _prep_for_display(img):
    """Return an RGB copy of ``img`` that is at most ``_MAX_IMG_W`` pixels wide.

    Wider images are downscaled with LANCZOS resampling, keeping the aspect
    ratio, to keep the payload sent to the browser small.
    """
    if img.width <= _MAX_IMG_W:
        return img.convert("RGB")
    ratio = _MAX_IMG_W / img.width
    target_size = (_MAX_IMG_W, int(img.height * ratio))
    resized = img.resize(target_size, Image.LANCZOS)
    return resized.convert("RGB")
def _render_model_status(row):
"""Render just the Success/Failure badge."""
success = row['success']
status_class = "gui-viewer-status-success" if success else "gui-viewer-status-failure"
status_text = "Success" if success else "Failure"
st.markdown(f"
Explore how 7B GUI grounding models perform on original vs. perturbed screenshots from GUI-Perturbed