from __future__ import annotations
import json
import re
import subprocess
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote_plus, unquote, urlparse, parse_qs
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
def _compact_text(text: str, max_chars: int = 1200) -> str:
text = re.sub(r"\s+", " ", (text or "")).strip()
return text[:max_chars]
def _extract_quoted_text(prompt: str) -> Optional[str]:
for pattern in (r'"([^"]{1,160})"', r"'([^']{1,160})'"):
match = re.search(pattern, prompt)
if match:
return match.group(1).strip()
return None
def _extract_click_target(prompt: str) -> Optional[str]:
    """Guess the label of the element the prompt asks to click.

    A quoted phrase wins whenever the prompt also contains one of the
    interaction verbs (click/open/select/follow/press).  Otherwise the words
    following "click [the] [link|button|tab|menu|item]" are used.  Returns
    ``None`` when neither heuristic applies.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    if quoted:
        lowered = text.lower()
        for verb in ("click", "open", "select", "follow", "press"):
            if verb in lowered:
                return quoted
    found = re.search(
        r"click\s+(?:the\s+)?(?:link|button|tab|menu|item)?\s*([A-Za-z0-9][A-Za-z0-9 _-]{1,80})",
        text,
        flags=re.I,
    )
    return found.group(1).strip() if found else None
def _extract_open_target(prompt: str) -> Optional[str]:
    """Guess what the prompt asks the browser to open.

    Heuristics, in priority order:

    1. A quoted phrase, when the prompt contains a navigation verb
       (open / visit / go to / browse / navigate to).
    2. A bare domain name directly after a navigation verb, e.g.
       "open example.com and click Login" -> ``"example.com"``.
    3. The free text following a navigation verb (up to ~120 characters).

    Returns ``None`` when no heuristic matches.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    lowered = text.lower()
    if quoted and any(word in lowered for word in ("open", "visit", "go to", "browse", "navigate to")):
        return quoted

    # The domain must be followed by whitespace, punctuation, or end of text.
    # The class needs a real `\s`: in a raw string `\\s` would instead mean
    # "a literal backslash or the letter s", so a domain followed by a space
    # would never match and the greedy free-text branch would win.
    domain_match = re.search(
        r"(?:open|visit|go to|browse|navigate to)\s+(?:the\s+)?((?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,})(?:[\s,;:.!?]|$)",
        text,
        flags=re.I,
    )
    if domain_match:
        return domain_match.group(1).strip()

    match = re.search(
        r"(?:open|visit|go to|browse|navigate to)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 ._-]{2,120})",
        text,
        flags=re.I,
    )
    if match:
        return match.group(1).strip()
    return None
def _looks_like_domain(target: str) -> bool:
value = (target or "").strip().strip(".,;:!?")
if not value or " " in value:
return False
return bool(re.fullmatch(r"(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,}", value))
def _extract_type_text(prompt: str) -> Optional[str]:
    """Return the quoted text the prompt asks to type, if any.

    The quoted phrase is only used when the prompt also contains a typing
    cue (type / enter / input / fill / search for); otherwise ``None``.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    if not quoted:
        return None
    lowered = text.lower()
    typing_cues = ("type", "enter", "input", "fill", "search for")
    return quoted if any(cue in lowered for cue in typing_cues) else None
def _extract_search_query(prompt: str) -> Optional[str]:
    """Guess the search query contained in *prompt*.

    A quoted phrase is preferred when the prompt mentions searching
    (search for / look up / find / search).  Failing that, the text after
    "search for", "look up" or "find" (optionally followed by "the") is
    returned.  ``None`` means no query could be identified.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    lowered = text.lower()
    if quoted:
        for cue in ("search for", "look up", "find", "search"):
            if cue in lowered:
                return quoted
    found = re.search(
        r"(?:search for|look up|find)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 _:/().,#-]{2,120})",
        text,
        flags=re.I,
    )
    return found.group(1).strip() if found else None
def _extract_scroll_direction(prompt: str) -> Optional[str]:
p = (prompt or "").lower()
if any(token in p for token in ("scroll to top", "back to top", "scroll up", "move up")):
return "up"
if any(token in p for token in ("scroll to bottom", "scroll down", "move down", "go lower")):
return "down"
return None
def _extract_scroll_target(prompt: str) -> Optional[str]:
    """Guess the text the prompt wants scrolled into view.

    A quoted phrase is used when the prompt contains a locating cue
    (scroll to / find / locate / until you see); otherwise the words that
    follow such a cue are returned.  ``None`` when nothing is found.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    if quoted:
        lowered = text.lower()
        for cue in ("scroll to", "find", "locate", "until you see"):
            if cue in lowered:
                return quoted
    found = re.search(
        r"(?:scroll to|until you see|find|locate)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 _:/().,#-]{2,100})",
        text,
        flags=re.I,
    )
    return found.group(1).strip() if found else None
def _should_follow_search(prompt: str) -> bool:
p = (prompt or "").lower()
return any(
token in p
for token in (
"open", "visit", "go to", "inspect", "read", "summarize", "analyze",
"explore", "browse", "website", "site", "page", "official", "login",
"click", "form", "button", "dashboard",
)
)
def _wants_observation(prompt: str) -> bool:
p = (prompt or "").lower()
return any(
token in p
for token in (
"what page",
"what is open",
"what do you see",
"summarize",
"inspect",
"analyze",
"report",
"title",
"url",
)
)
def _browser_observation_snippet(payload: Dict[str, Any], max_chars: int = 900) -> str:
    """Render a browser payload as a short multi-line text observation.

    Lines are emitted, in order and only when non-empty, for: ACTION, TITLE,
    URL, ERROR (capped at ``max_chars // 2``), SUMMARY (``summary`` falling
    back to ``text``, capped at ``max_chars``), ACCESSIBILITY (capped at
    ``max_chars // 2``) and OCR (``payload["ocr"]["text"]``, capped at
    ``max_chars // 3``).
    """

    def _as_text(value: Any) -> str:
        return str(value or "").strip()

    half_budget = max_chars // 2
    lines: List[str] = []

    # Short header fields are copied verbatim.
    for heading, key in (("ACTION", "action"), ("TITLE", "title"), ("URL", "url")):
        value = _as_text(payload.get(key))
        if value:
            lines.append(f"{heading}: {value}")

    error = _as_text(payload.get("error"))
    if error:
        lines.append(f"ERROR: {_compact_text(error, half_budget)}")

    summary = _as_text(payload.get("summary") or payload.get("text"))
    if summary:
        lines.append(f"SUMMARY: {_compact_text(summary, max_chars)}")

    tree = payload.get("accessibility_tree")
    if tree:
        lines.append(f"ACCESSIBILITY: {_compact_text(str(tree), half_budget)}")

    ocr = payload.get("ocr")
    if isinstance(ocr, dict):
        ocr_text = _as_text(ocr.get("text"))
        if ocr_text:
            lines.append(f"OCR: {_compact_text(ocr_text, max_chars // 3)}")

    return "\n".join(lines).strip()
def _normalize_label(text: str) -> str:
return re.sub(r"\s+", " ", (text or "").strip().lower())
def _score_text_target(target: str, candidate_text: str, candidate_href: str = "") -> float:
    """Score how well a candidate element matches the wanted label.

    After normalising all inputs: +10 for an exact text match, +5 when the
    target is a substring of the candidate text, +1.5 for every target word
    present in the candidate text and +0.5 for every target word present in
    the href.  An empty target always scores ``0.0``.
    """
    wanted = _normalize_label(target)
    shown = _normalize_label(candidate_text)
    link = _normalize_label(candidate_href)
    if not wanted:
        return 0.0

    def _words(value: str) -> List[str]:
        return [word for word in re.split(r"\W+", value) if word]

    shown_words = set(_words(shown))
    link_words = set(_words(link))

    total = 0.0
    if shown == wanted:
        total += 10.0
    if wanted in shown:
        total += 5.0
    # Repeated words in the target are counted once per occurrence.
    for word in _words(wanted):
        if word in shown_words:
            total += 1.5
        if word in link_words:
            total += 0.5
    return total
def run_browser_agentic_sequence(
    task_prompt: str,
    invoke_browser_action,
    *,
    max_steps: int = 12,
    observation_chars: int = 1500,
) -> Dict[str, Any]:
    """
    Drive the browser through a heuristic observe/decide/act loop for one task.

    The first step opens the URL found in the prompt, or searches for the
    prompt text.  Each following step picks one action from the prompt and the
    latest payload: popup cleanup, open, type, click a labelled element,
    scroll, or a vision observation.

    Args:
        task_prompt: Natural-language task; blank input returns immediately
            with ``done_reason="empty_prompt"``.
        invoke_browser_action: Callable invoked with keyword arguments
            (``action=...`` plus ``url`` / ``query`` / ``text`` /
            ``direction``).  Its return value is used as a dict payload.
        max_steps: Step budget (at least 1).  The initial dispatch and every
            loop iteration, cleanups included, consume one step.
        observation_chars: Character cap for each recorded observation.

    Returns:
        Dict with ``mode``, ``steps`` (one record per executed action),
        ``summary`` (observations joined by blank lines), ``done_reason`` and
        ``final_observation`` (the last payload received).
    """
    prompt = (task_prompt or "").strip()
    if not prompt:
        return {"mode": "browser_agentic", "steps": [], "summary": "", "done_reason": "empty_prompt"}

    steps: List[Dict[str, Any]] = []
    final_payload: Dict[str, Any] = {}
    budget = max(1, int(max_steps))
    remaining = budget

    # LOOP PREVENTION: Track visited states and failed targets
    visited_states = set()  # Set of (url, label fingerprint)
    blacklist = set()  # Set of (url, label_idx) whose click changed nothing
    cleaned_urls = set()  # URLs for which popup cleanup was already attempted
    last_action_was_stagnant = False

    def _record(payload: Dict[str, Any], *, action: str, target: str = ""):
        observation = _browser_observation_snippet(payload, max_chars=observation_chars)
        # HUD: Push rich activity update
        invoke_browser_action(action="set_hud", query=f"{action.upper()}: {target or 'Analyzing'}")
        steps.append({
            "step": len(steps) + 1,
            "action": action,
            "target": target,
            "url": payload.get("url"),
            "title": payload.get("title"),
            "observation": observation,
            "memory": payload.get("working_memory"),
        })

    def _get_state_hash(payload: Dict[str, Any]) -> tuple:
        url = payload.get("url", "")
        # Fingerprint the first interactive elements to detect content changes
        elements = payload.get("labeled_elements") or []
        label_fingerprint = "|".join([f"{item['idx']}:{item['tag']}" for item in elements[:15]])
        return (url, label_fingerprint)

    # The prompt never changes, so its intent is extracted once up front.
    prompt_lower = prompt.lower()
    click_query = _extract_click_target(prompt)
    type_query = _extract_type_text(prompt)
    open_query = _extract_open_target(prompt)

    # INITIAL ACTION: Deep Search or Open
    direct_url = re.search(r"https?://[^\s)>\"]+", prompt)
    if direct_url:
        final_payload = invoke_browser_action(action="open", url=direct_url.group(0).rstrip(".,;:!?"))
    else:
        final_payload = invoke_browser_action(action="search", query=_compact_text(prompt, 180))
    _record(final_payload, action="initial_dispatch")
    visited_states.add(_get_state_hash(final_payload))
    remaining -= 1

    # HUD: Initialize Mission Visuals
    invoke_browser_action(action="set_hud", query=f"Mission: {prompt[:40]}...")

    while remaining > 0:
        step_num = budget - remaining + 1
        invoke_browser_action(action="update_progress", query=str(step_num))
        current_state = _get_state_hash(final_payload)
        page_text = str(final_payload.get("text") or "").lower()
        labels = final_payload.get("labeled_elements") or []
        current_url = final_payload.get("url", "")

        # SELF-CORRECT: If blocked by popups, clean the page.
        # Cleanup is tried once per URL: the trigger words usually stay in the
        # page text after cleaning (footer links, sign-up buttons), so
        # repeating it would spend the whole step budget on "clean".
        looks_blocked = any(
            token in page_text for token in ("cookie", "consent", "accept our", "subscribe", "sign up")
        )
        if looks_blocked and current_url not in cleaned_urls:
            cleaned_urls.add(current_url)
            final_payload = invoke_browser_action(action="clean")
            _record(final_payload, action="self_correct_cleanup")
            remaining -= 1
            continue

        # DECISION LOGIC
        action_to_take = "observe"
        target_val = ""
        targets: List[Dict[str, Any]] = []

        if open_query and current_url == "":  # Only jump if we are nowhere
            action_to_take = "open"
            target_val = open_query
        elif type_query and any(item["tag"] in ("INPUT", "TEXTAREA") for item in labels):
            # No input index is passed on; the action handler chooses the field.
            action_to_take = "type"
            target_val = type_query
        elif click_query:
            targets = [
                item for item in labels
                if _score_text_target(click_query, item["text"]) > 4
                and (current_url, item["idx"]) not in blacklist
            ]
            if targets:
                action_to_take = "click_label"
                target_val = str(targets[0]["idx"])

        if action_to_take == "observe":
            if any(token in prompt_lower for token in ("scroll", "find", "look for")) or last_action_was_stagnant:
                action_to_take = "scroll"
                target_val = _extract_scroll_direction(prompt) or "down"
            else:
                action_to_take = "vision"

        # EXECUTE
        if action_to_take == "click_label":
            final_payload = invoke_browser_action(action=action_to_take, query=target_val)
        elif action_to_take == "type":
            final_payload = invoke_browser_action(action=action_to_take, text=target_val)
        elif action_to_take == "open":
            final_payload = invoke_browser_action(action=action_to_take, url=target_val)
        elif action_to_take == "scroll":
            final_payload = invoke_browser_action(action=action_to_take, direction=target_val)
        else:
            final_payload = invoke_browser_action(action="vision")
        _record(final_payload, action=action_to_take, target=target_val)

        # STAGNATION CHECK (for stateful actions)
        if action_to_take in ("click_label", "type", "open"):
            new_state = _get_state_hash(final_payload)
            if new_state == current_state:
                if action_to_take == "click_label" and targets:
                    # This label did nothing here; do not pick it again.
                    blacklist.add((current_url, targets[0]["idx"]))
                last_action_was_stagnant = True
            else:
                visited_states.add(new_state)
                last_action_was_stagnant = False
        remaining -= 1

        # Termination condition.  `page_text` is the text observed before this
        # step's action; only the "results" key is read from the new payload.
        if len(page_text) > 1200 and "results" not in final_payload:
            if not any(token in prompt_lower for token in ("click", "type", "login", "submit")):
                break

    invoke_browser_action(action="set_hud", query="MISSION: COMPLETE")
    # NOTE(review): "12" equals the default max_steps; presumably it marks a
    # full progress bar -- confirm against the update_progress handler.
    invoke_browser_action(action="update_progress", query="12")
    return {
        "mode": "browser_agentic",
        "steps": steps,
        "summary": "\n\n".join(step["observation"] for step in steps if step.get("observation")),
        "done_reason": "completed",
        "final_observation": final_payload,
    }
def _unwrap_yahoo_redirect(url: str) -> str:
try:
parsed = urlparse(url)
if "search.yahoo.com" in parsed.netloc or "r.search.yahoo.com" in parsed.netloc:
qs = parse_qs(parsed.query)
ru_values = qs.get("RU") or qs.get("ru")
if ru_values and ru_values[0]:
return unquote(ru_values[0])
match = re.search(r"/RU=([^/]+)/", parsed.path)
if match:
return unquote(match.group(1))
except Exception:
pass
return url
class PlaywrightBrowserTool:
    """
    Persistent Playwright browser/computer-use wrapper for Phillnet.

    One Chromium page is launched on first use (see ``_ensure_page``) and is
    reused by later calls until ``close()`` shuts the browser down.  PNG
    captures live in ``capture_dir`` and are pruned by ``_cleanup_captures``.

    Supports:
    - search
    - open
    - click
    - type
    - press
    - scroll
    - scroll_to_text
    - vision
    - screenshot
    - accessibility
    - ocr
    """
_OVERLAY_STYLE = """
#phillnet-cursor {
position: fixed;
width: 32px;
height: 32px;
border: 1px solid rgba(0, 242, 255, 0.4);
border-radius: 50%;
background: radial-gradient(circle, rgba(0, 242, 255, 0.1) 0%, transparent 70%);
box-shadow: 0 0 20px rgba(0, 242, 255, 0.2);
pointer-events: none;
transform: translate(-50%, -50%);
z-index: 2147483647;
transition: left 160ms cubic-bezier(0.19, 1, 0.22, 1),
top 160ms cubic-bezier(0.19, 1, 0.22, 1),
width 0.2s ease, height 0.2s ease, border-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
#phillnet-cursor-inner {
width: 6px;
height: 6px;
background: #00f2ff;
border-radius: 50%;
box-shadow: 0 0 12px #00f2ff, 0 0 24px rgba(0, 242, 255, 0.6);
transition: transform 0.2s ease, background 0.2s ease;
}
#phillnet-cursor.locked {
width: 48px;
height: 48px;
border-color: #ff00ff;
border-width: 2px;
box-shadow: 0 0 30px rgba(255, 0, 255, 0.4);
}
#phillnet-cursor.locked #phillnet-cursor-inner {
background: #ff00ff;
transform: scale(1.5);
box-shadow: 0 0 15px #ff00ff, 0 0 30px rgba(255, 0, 255, 0.8);
}
#phillnet-cursor-rings {
position: absolute;
width: 100%; height: 100%;
border: 1px solid rgba(0, 242, 255, 0.2);
border-radius: 50%;
animation: phillnet-pulse 2s infinite;
}
@keyframes phillnet-pulse {
0% { transform: scale(0.8); opacity: 0.8; }
100% { transform: scale(1.5); opacity: 0; }
}
#phillnet-token-tag {
position: fixed;
padding: 4px 10px;
background: rgba(15, 23, 42, 0.96);
color: #7dd3fc;
font: bold 12px 'Consolas', 'Courier New', monospace;
border: 1px solid rgba(125, 211, 252, 0.4);
border-radius: 6px;
pointer-events: none;
z-index: 2147483647;
white-space: nowrap;
transform: translate(18px, 18px);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.45);
transition: left 140ms cubic-bezier(0.19, 1, 0.22, 1), top 140ms cubic-bezier(0.19, 1, 0.22, 1);
}
.phillnet-interactive-label {
position: fixed;
background: #00f2ff;
color: #0f172a;
padding: 2px 6px;
border-radius: 4px;
font: bold 10px 'Segoe UI', system-ui, sans-serif;
z-index: 2147483640;
pointer-events: none;
box-shadow: 0 2px 8px rgba(0, 242, 255, 0.5);
transform: translate(-100%, -100%);
opacity: 0.9;
}
.phillnet-component-box {
position: fixed;
border: 1px dashed rgba(0, 242, 255, 0.25);
background: rgba(0, 242, 255, 0.03);
pointer-events: none;
z-index: 2147483630;
}
.phillnet-component-label {
position: absolute;
top: 0; left: 0;
background: rgba(0, 242, 255, 0.8);
color: #0f172a;
padding: 1px 4px;
font: bold 9px monospace;
text-transform: uppercase;
}
#phillnet-reticle {
position: fixed;
width: 60px; height: 60px;
border: 1px dashed rgba(0, 242, 255, 0.4);
border-radius: 50%;
pointer-events: none;
z-index: 2147483647;
transform: translate(-50%, -50%) scale(2);
opacity: 0;
transition: transform 300ms ease-out, opacity 300ms ease;
}
#phillnet-reticle.active {
transform: translate(-50%, -50%) scale(1);
opacity: 1;
}
#phillnet-scanline {
position: fixed;
top: 0; left: 0; width: 100%; height: 2px;
background: linear-gradient(90deg, transparent, rgba(0, 242, 255, 0.8), transparent);
z-index: 2147483645;
pointer-events: none;
opacity: 0;
}
@keyframes phillnet-scan {
0% { top: 0%; }
100% { top: 100%; }
}
#phillnet-scanline.scanning {
opacity: 1;
animation: phillnet-scan 2s linear infinite;
}
#phillnet-trail {
position: fixed;
width: 6px;
height: 6px;
border-radius: 50%;
background: rgba(0, 242, 255, 0.3);
pointer-events: none;
transform: translate(-50%, -50%);
z-index: 2147483646;
}
#phillnet-hud {
position: fixed;
top: 16px;
right: 16px;
width: 340px;
background: rgba(10, 15, 25, 0.94);
backdrop-filter: blur(8px);
border: 1px solid rgba(0, 242, 255, 0.25);
border-radius: 12px;
padding: 14px;
color: #7dd3fc;
font: 11px/1.4 'Consolas', monospace;
z-index: 2147483647;
pointer-events: none;
box-shadow: 0 10px 40px rgba(0,0,0,0.6);
display: flex;
flex-direction: column;
gap: 8px;
}
.hud-header {
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 1px solid rgba(0, 242, 255, 0.1);
padding-bottom: 6px;
}
.hud-title {
font-weight: 800;
letter-spacing: 0.1em;
color: #00f2ff;
text-transform: uppercase;
}
.hud-progress {
height: 2px;
background: rgba(0, 242, 255, 0.1);
flex-grow: 1;
margin: 0 10px;
position: relative;
}
.hud-progress-fill {
position: absolute;
top: 0; left: 0; height: 100%;
background: #00f2ff;
transition: width 0.3s ease;
}
.hud-activity {
max-height: 80px;
overflow: hidden;
display: flex;
flex-direction: column-reverse;
gap: 3px;
}
.activity-item {
color: #94a3b8;
border-left: 2px solid rgba(0, 242, 255, 0.4);
padding-left: 6px;
opacity: 0.8;
animation: ph-fade-in 0.3s ease-out;
}
.activity-item.active {
color: #00f2ff;
opacity: 1;
}
.hud-memory {
background: rgba(0, 242, 255, 0.05);
border-radius: 6px;
padding: 6px;
display: none;
}
.hud-memory.active { display: block; }
.mem-tag {
color: #38bdf8;
font-weight: bold;
margin-right: 4px;
}
@keyframes ph-fade-in { from { opacity: 0; transform: translateX(5px); } to { opacity: 1; transform: translateX(0); } }
#phillnet-keyflash {
position: fixed;
bottom: 24px;
right: 24px;
padding: 12px 18px;
background: rgba(0, 242, 255, 0.9);
color: #0f172a;
font: bold 14px 'Consolas', monospace;
border-radius: 12px;
opacity: 0;
transform: translateY(10px);
transition: opacity 160ms ease, transform 160ms ease;
z-index: 2147483647;
pointer-events: none;
box-shadow: 0 0 20px rgba(0, 242, 255, 0.4);
}
#phillnet-keyflash.show {
opacity: 1;
transform: translateY(0);
}
.phillnet-targeted {
outline: 2px solid #00f2ff !important;
outline-offset: 4px !important;
box-shadow: 0 0 15px rgba(0, 242, 255, 0.4) !important;
scroll-margin: 150px !important;
}
#phillnet-vision-badge {
position: fixed;
bottom: 80px;
right: 24px;
padding: 6px 12px;
background: rgba(15, 23, 42, 0.9);
color: #00f2ff;
font: bold 10px monospace;
border: 1px solid rgba(0, 242, 255, 0.3);
border-radius: 4px;
z-index: 2147483647;
text-transform: uppercase;
opacity: 0;
transition: opacity 300ms ease;
}
#phillnet-vision-badge.active {
opacity: 1;
}
"""
_OVERLAY_SCRIPT = """
() => {
if (window.__phillnetOverlayInstalled) return;
const style = document.createElement('style');
style.textContent = `%STYLE%`;
document.head.appendChild(style);
const cursor = document.createElement('div');
cursor.id = 'phillnet-cursor';
cursor.innerHTML = '
';
cursor.style.left = '18px';
cursor.style.top = '18px';
const trail = document.createElement('div');
trail.id = 'phillnet-trail';
trail.style.left = '18px';
trail.style.top = '18px';
const tokenTag = document.createElement('div');
tokenTag.id = 'phillnet-token-tag';
tokenTag.textContent = 'PHILLNET';
const hud = document.createElement('div');
hud.id = 'phillnet-hud';
hud.innerHTML = `
Initializing Neural Cortex...
`;
const reticle = document.createElement('div');
reticle.id = 'phillnet-reticle';
const scanline = document.createElement('div');
scanline.id = 'phillnet-scanline';
const keyflash = document.createElement('div');
keyflash.id = 'phillnet-keyflash';
const visionBadge = document.createElement('div');
visionBadge.id = 'phillnet-vision-badge';
visionBadge.textContent = 'NEURAL VISION ACTIVE';
document.body.appendChild(trail);
document.body.appendChild(cursor);
document.body.appendChild(tokenTag);
document.body.appendChild(hud);
document.body.appendChild(reticle);
document.body.appendChild(scanline);
document.body.appendChild(keyflash);
document.body.appendChild(visionBadge);
window.__phillnetSetVisionBadge = (active) => {
if (active) visionBadge.classList.add('active');
else visionBadge.classList.remove('active');
};
window.__phillnetMoveCursor = (x, y, label) => {
cursor.style.left = `${x}px`;
cursor.style.top = `${y}px`;
tokenTag.style.left = `${x}px`;
tokenTag.style.top = `${y}px`;
trail.style.left = `${x}px`;
trail.style.top = `${y}px`;
reticle.style.left = `${x}px`;
reticle.style.top = `${y}px`;
if (label) {
tokenTag.textContent = label.toUpperCase();
const body = hud.querySelector('.ph-body');
if (body) body.textContent = label;
}
};
window.__phillnetSetReticle = (active) => {
if (active) {
reticle.classList.add('active');
cursor.classList.add('locked');
} else {
reticle.classList.remove('active');
cursor.classList.remove('locked');
}
};
window.__phillnetSetScan = (active) => {
if (active) scanline.classList.add('scanning');
else scanline.classList.remove('scanning');
};
window.__phillnetSetHud = (msg, isMemory = false) => {
if (!msg) return;
if (isMemory) {
const mon = hud.querySelector('#memory-monitor');
mon.innerHTML = `MEM: ${msg}`;
mon.classList.add('active');
setTimeout(() => mon.classList.remove('active'), 2500);
return;
}
const log = hud.querySelector('#activity-log');
const item = document.createElement('div');
item.className = 'activity-item active';
item.textContent = msg;
log.querySelectorAll('.activity-item').forEach(el => el.classList.remove('active'));
log.appendChild(item);
if (log.children.length > 3) log.children[0].remove();
};
window.__phillnetUpdateProgress = (current, total) => {
const bar = hud.querySelector('.hud-progress-fill');
const stp = hud.querySelector('#step-id');
bar.style.width = `${(current / total) * 100}%`;
stp.textContent = `STP ${current}`;
};
window.__phillnetMarkTarget = (element, label) => {
try {
document.querySelectorAll('.phillnet-targeted').forEach(node => node.classList.remove('phillnet-targeted'));
if (element) {
element.classList.add('phillnet-targeted');
if (label) {
const body = hud.querySelector('.ph-body');
if (body) body.textContent = label;
}
setTimeout(() => {
try { element.classList.remove('phillnet-targeted'); } catch (err) {}
}, 1800);
}
} catch (err) {}
};
window.__phillnetFlashKey = (text) => {
keyflash.textContent = text || '';
keyflash.classList.add('show');
setTimeout(() => keyflash.classList.remove('show'), 850);
};
window.__phillnetLabelInteractive = () => {
document.querySelectorAll('.phillnet-interactive-label').forEach(el => el.remove());
const elements = Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'));
const labeled = [];
elements.forEach((el, idx) => {
const rect = el.getBoundingClientRect();
if (rect.width > 0 && rect.height > 0 && rect.top >= 0 && rect.top <= window.innerHeight) {
const label = document.createElement('div');
label.className = 'phillnet-interactive-label';
label.textContent = idx;
label.style.left = (rect.left + rect.width) + 'px';
label.style.top = rect.top + 'px';
document.body.appendChild(label);
labeled.push({
idx,
tag: el.tagName,
text: (el.innerText || el.value || '').slice(0, 30),
box: {x: rect.left, y: rect.top, w: rect.width, h: rect.height}
});
}
});
return labeled;
};
window.__phillnetScanComponents = () => {
document.querySelectorAll('.phillnet-component-box').forEach(el => el.remove());
const regions = ['header', 'nav', 'main', 'footer', 'aside', 'section'];
const found = [];
regions.forEach(tag => {
const el = document.querySelector(tag);
if (el) {
const rect = el.getBoundingClientRect();
if (rect.width > 50 && rect.height > 50) {
const box = document.createElement('div');
box.className = 'phillnet-component-box';
box.style.left = rect.left + 'px';
box.style.top = rect.top + 'px';
box.style.width = rect.width + 'px';
box.style.height = rect.height + 'px';
box.innerHTML = `${tag}
`;
document.body.appendChild(box);
found.push({tag, box: {x: rect.left, y: rect.top, w: rect.width, h: rect.height}});
}
}
});
return found;
};
window.__phillnetCleanPage = () => {
const selectors = [
'[id*="consent"]', '[class*="consent"]', '[id*="cookie"]', '[class*="cookie"]',
'[id*="modal"]', '[class*="modal"]', '[class*="overlay"]', '[class*="popup"]',
'.ad-banner', '.newsletter-signup'
];
let cleaned = 0;
selectors.forEach(sel => {
document.querySelectorAll(sel).forEach(el => {
const rect = el.getBoundingClientRect();
if (rect.width > 300 || rect.height > 300) {
el.style.display = 'none';
cleaned++;
}
});
});
return cleaned;
};
window.__phillnetOverlayInstalled = true;
}
"""
def __init__(
self,
*,
headless: bool = False,
timeout_ms: int = 12000,
capture_dir: Optional[str] = None,
enable_ocr: bool = True,
flush_captures_on_init: bool = True,
flush_captures_on_close: bool = True,
capture_keep_last: int = 0,
):
self.headless = bool(headless)
self.timeout_ms = int(timeout_ms)
self.capture_dir = Path(capture_dir or "outputs/captures")
self.capture_dir.mkdir(parents=True, exist_ok=True)
self.enable_ocr = bool(enable_ocr)
self.flush_captures_on_init = bool(flush_captures_on_init)
self.flush_captures_on_close = bool(flush_captures_on_close)
self.capture_keep_last = max(0, int(capture_keep_last))
self._pw = None
self._browser = None
self._page = None
self.history: List[str] = []
self.working_memory: Dict[str, str] = {}
if self.flush_captures_on_init:
self._cleanup_captures()
def _cleanup_captures(self):
try:
files = sorted(
[p for p in self.capture_dir.glob("*.png") if p.is_file()],
key=lambda p: p.stat().st_mtime,
reverse=True,
)
keep = self.capture_keep_last
for stale in files[keep:]:
try:
stale.unlink(missing_ok=True)
except Exception:
pass
except Exception:
pass
def _ensure_page(self):
if self._page is not None:
return self._page
print(f" [!] Launching Stealth Browser (Headless={self.headless})...")
self._pw = sync_playwright().start()
# Use a realistic user agent
self._browser = self._pw.chromium.launch(headless=self.headless)
self._context = self._browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
viewport={"width": 1440, "height": 960}
)
self._page = self._context.new_page()
# Bypass bot detection: hide webdriver
self._page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
self._page.set_default_timeout(self.timeout_ms)
# PERSISTENT OVERLAY INJECTION
# Inject both style and HUD persistence script as a high-priority init script
full_init_script = f"""
(function() {{
const style = document.createElement('style');
style.textContent = `{self._OVERLAY_STYLE}`;
document.head.appendChild(style);
const installOverlay = () => {{
if (document.getElementById('phillnet-cursor')) return;
const cursor = document.createElement('div');
cursor.id = 'phillnet-cursor';
cursor.innerHTML = '';
document.body.appendChild(cursor);
const hud = document.createElement('div');
hud.id = 'phillnet-hud';
hud.innerHTML = '';
document.body.appendChild(hud);
}};
// Ensure installation on DOM load and any subsequent updates
installOverlay();
const observer = new MutationObserver(installOverlay);
observer.observe(document.body, {{ childList: true, subtree: true }});
document.addEventListener('mousemove', (e) => {{
const cursor = document.getElementById('phillnet-cursor');
if (cursor) {{
cursor.style.left = e.clientX + 'px';
cursor.style.top = e.clientY + 'px';
}}
}});
}})();
"""
self._page.add_init_script(full_init_script)
return self._page
def close(self):
    """Shut down the browser and the Playwright driver, then reset all handles.

    Safe to call repeatedly and on an instance whose constructor failed part
    way: missing attributes are treated as "nothing to close".  Errors from
    the browser or driver are ignored because this is also used as a
    finalizer.  Old captures are pruned afterwards when
    ``flush_captures_on_close`` is set.
    """
    browser = getattr(self, "_browser", None)
    if browser is not None:
        try:
            browser.close()
        except Exception:
            pass  # best-effort teardown; the process may already be gone

    driver = getattr(self, "_pw", None)
    if driver is not None:
        try:
            driver.stop()
        except Exception:
            pass  # best-effort teardown

    self._browser = None
    # The context belongs to the closed browser; drop the stale reference too.
    self._context = None
    self._pw = None
    self._page = None

    if getattr(self, "flush_captures_on_close", False):
        self._cleanup_captures()
def __del__(self):
self.close()
def _inject_overlay(self, page, label: str = "Phillnet browser mode active"):
try:
# Note: add_init_script handles the logic injection now,
# we just update the HUD label dynamically here.
page.evaluate("(label) => window.__phillnetSetHud && window.__phillnetSetHud(label)", label)
except Exception:
pass
def _move_cursor_to_locator(self, page, locator, label: str):
try:
bbox = locator.bounding_box()
if bbox:
x = bbox["x"] + (bbox["width"] / 2.0)
y = bbox["y"] + (bbox["height"] / 2.0)
# REFLEX: Skip steps if in high-speed mode
move_steps = 1 if getattr(self, "reflex_mode", False) else 18
page.mouse.move(x, y, steps=move_steps)
if not getattr(self, "reflex_mode", False):
page.evaluate(
"(payload) => window.__phillnetMoveCursor && window.__phillnetMoveCursor(payload.x, payload.y, payload.label)",
{"x": x, "y": y, "label": label},
)
page.wait_for_timeout(120)
except Exception:
pass
def _mark_locator(self, page, locator, label: str):
if getattr(self, "reflex_mode", False): return
try:
locator.evaluate(
"(element, label) => window.__phillnetMarkTarget && window.__phillnetMarkTarget(element, label)",
label,
)
page.wait_for_timeout(90)
except Exception:
pass
def _move_cursor_to_point(self, page, x: float, y: float, label: str):
try:
move_steps = 1 if getattr(self, "reflex_mode", False) else 20
page.mouse.move(x, y, steps=move_steps)
if not getattr(self, "reflex_mode", False):
page.evaluate(
"(payload) => window.__phillnetMoveCursor && window.__phillnetMoveCursor(payload.x, payload.y, payload.label)",
{"x": x, "y": y, "label": label},
)
page.wait_for_timeout(120)
except Exception:
pass
def enable_reflex_mode(self, enabled: bool = True):
    """Switch the low-latency ("reflex") path on or off and announce the mode."""
    self.reflex_mode = bool(enabled)
    if enabled:
        mode_str = "REFLEX (Low Latency)"
    else:
        mode_str = "Standard (Rich Visuals)"
    print(f"[*] Browser Control Mode: {mode_str}")
def key_down(self, key: str):
    """Press and hold *key* on the shared page's keyboard."""
    self._ensure_page().keyboard.down(key)
def key_up(self, key: str):
    """Release *key* on the shared page's keyboard."""
    self._ensure_page().keyboard.up(key)
def mouse_move(self, x: float, y: float):
    """Move the mouse pointer of the shared page to ``(x, y)``."""
    self._ensure_page().mouse.move(x, y)
def mouse_down(self):
    """Press the mouse button on the shared page."""
    self._ensure_page().mouse.down()
def mouse_up(self):
    """Release the mouse button on the shared page."""
    self._ensure_page().mouse.up()
def _candidate_locators_for_target(self, page, target: str):
    """Collect and rank page elements that may correspond to *target*.

    Several lookup strategies are tried (button role, link role, visible
    text, clickable elements containing the text, and clickable elements
    containing just the first word).  At most six hits per strategy are
    scored with ``_score_text_target`` plus bonuses and penalties based on
    tag, role, input type, ``<label for>`` and disabled state.

    Returns:
        ``(score, strategy_name, locator)`` tuples with a positive score,
        best first.
    """
    clickable_css = "a,button,[role='button']"
    strategies = [
        ("role_button", page.get_by_role("button", name=target, exact=False)),
        ("role_link", page.get_by_role("link", name=target, exact=False)),
        ("text", page.get_by_text(target, exact=False)),
        ("anchor_button", page.locator(clickable_css).filter(has_text=target)),
    ]
    leading_word = (target or "").split(" ", 1)[0].strip()
    if leading_word and leading_word.lower() != (target or "").lower():
        strategies.append(("first_word", page.locator(clickable_css).filter(has_text=leading_word)))

    # Each probe fails soft: a missing detail must not discard the element.
    def _attribute(node, attr_name: str) -> str:
        try:
            return str(node.get_attribute(attr_name) or "")
        except Exception:
            return ""

    def _tag_name(node) -> str:
        try:
            return str(node.evaluate("(el) => (el.tagName || '').toLowerCase()") or "")
        except Exception:
            return ""

    def _is_disabled(node) -> bool:
        try:
            return bool(node.is_disabled())
        except Exception:
            return False

    ranked = []
    for strategy_name, matches in strategies:
        try:
            limit = min(matches.count(), 6)
        except Exception:
            limit = 0
        for position in range(limit):
            try:
                node = matches.nth(position)
                visible_text = _compact_text(node.inner_text(), 180)
                href = _attribute(node, "href")
                tag = _tag_name(node)
                role = _attribute(node, "role")
                html_for = _attribute(node, "for")
                input_type = _attribute(node, "type")
                disabled = _is_disabled(node)

                adjustments = (
                    (tag in {"a", "button"}, 2.0),
                    (role in {"button", "link", "menuitem", "tab"}, 1.5),
                    (tag in {"input", "textarea", "select"}, 2.5),
                    (input_type in {"submit", "button", "search"}, 1.0),
                    (tag == "label" and bool(html_for), -2.0),
                    (disabled, -4.0),
                )
                score = _score_text_target(target, visible_text, href)
                for applies, delta in adjustments:
                    if applies:
                        score += delta
                if score > 0:
                    ranked.append((score, strategy_name, node))
            except Exception:
                continue
    return sorted(ranked, key=lambda entry: entry[0], reverse=True)
def _candidate_inputs_for_prompt(self, page, target: str):
    """Rank the page's text-entry elements against *target*.

    The first eight inputs, textareas, contenteditables and textboxes are
    described by their placeholder / name / aria-label / id / type
    attributes and scored with ``_score_text_target``.  An element with no
    match still gets 2.5 when its description mentions search/query/find;
    disabled elements lose 4 points.

    Returns:
        ``(score, locator)`` pairs with a positive score, best first.
    """
    wanted = _normalize_label(target)
    fields = page.locator("input,textarea,[contenteditable='true'],[role='textbox']")
    try:
        total = min(fields.count(), 8)
    except Exception:
        total = 0

    ranked = []
    for position in range(total):
        try:
            field = fields.nth(position)
            described = []
            for attr_name in ("placeholder", "name", "aria-label", "id", "type"):
                try:
                    attr_value = str(field.get_attribute(attr_name) or "").strip()
                except Exception:
                    attr_value = ""
                if attr_value:
                    described.append(attr_value)
            description = " ".join(described)

            score = _score_text_target(wanted, description, "")
            looks_like_search = any(term in description.lower() for term in ("search", "query", "find"))
            if score <= 0 and looks_like_search:
                score = 2.5
            try:
                if field.is_disabled():
                    score -= 4.0
            except Exception:
                pass
            if score > 0:
                ranked.append((score, field))
        except Exception:
            continue
    return sorted(ranked, key=lambda entry: entry[0], reverse=True)
def _iter_click_candidates(self, page, selector: Optional[str], text_target: Optional[str]):
seen = set()
if selector:
yield page.locator(selector).first
return
text_target = (text_target or "").strip()
if not text_target:
return
lowered = text_target.lower()
if any(token in lowered for token in ("textbox", "input", "field", "search box", "search field", "combobox")):
for _, item in self._candidate_inputs_for_prompt(page, text_target):
try:
key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.name || '')")
except Exception:
key = None
if key and key in seen:
continue
if key:
seen.add(key)
yield item
for _, _, item in self._candidate_locators_for_target(page, text_target):
try:
key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.innerText || el.value || '').slice(0,120)")
except Exception:
key = None
if key and key in seen:
continue
if key:
seen.add(key)
yield item
for _, item in self._candidate_inputs_for_prompt(page, text_target):
try:
key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.name || '')")
except Exception:
key = None
if key and key in seen:
continue
if key:
seen.add(key)
yield item
yield page.locator("a,button,input,textarea,[role='button'],[role='link'],[role='textbox']").first
def _click_with_fallback(self, page, candidates, label: str):
errors: List[str] = []
for candidate in candidates:
try:
target_locator = candidate.first if hasattr(candidate, "first") else candidate
target_locator.scroll_into_view_if_needed(timeout=self.timeout_ms)
self._mark_locator(page, target_locator, label)
self._move_cursor_to_locator(page, target_locator, label)
page.wait_for_timeout(80)
target_locator.click(timeout=self.timeout_ms)
page.wait_for_timeout(1000)
return
except Exception as exc:
errors.append(str(exc))
continue
raise RuntimeError("; ".join(errors[:3]) or "no clickable candidate succeeded")
def _flash_key(self, page, key_label: str):
try:
page.evaluate("(text) => window.__phillnetFlashKey && window.__phillnetFlashKey(text)", key_label)
except Exception:
pass
def _search_results_from_page(self, page, max_results: int = 5, max_snippet_chars: int = 280) -> List[Dict[str, str]]:
results: List[Dict[str, str]] = []
title_locators = []
for selector in (
"ol.searchCenterMiddle li div.compTitle h3 a",
"div#web h3 a",
"a.result__a",
"a[data-testid='result-title-a']",
):
locator = page.locator(selector)
if locator.count() > 0:
title_locators = locator.all()[: max_results * 10]
break
if not title_locators:
title_locators = page.locator("a[href]").all()[: max_results * 20]
for anchor in title_locators:
try:
href = _unwrap_yahoo_redirect((anchor.get_attribute("href") or "").strip())
title = _compact_text(anchor.inner_text(), 180)
if not href or not title:
continue
if not href.startswith("http"):
continue
if href.startswith("/") or href.startswith("javascript:"):
continue
parsed_href = urlparse(href)
if parsed_href.netloc.endswith("yahoo.com"):
continue
if any(blocked in href for blocked in (
"search.yahoo.com/search",
"duckduckgo.com",
"bing.com/search",
)):
continue
snippet = ""
try:
parent = anchor.locator(
"xpath=ancestor::li[1] | xpath=ancestor::*[contains(@class,'result')][1] | xpath=ancestor::*[contains(@class,'algo')][1]"
).first
snippet_locator = parent.locator(
".result__snippet, [data-result='snippet'], div.compText p, p.fc-dustygray, p"
)
if snippet_locator.count() > 0:
snippet = _compact_text(snippet_locator.first.inner_text(), max_snippet_chars)
except Exception:
snippet = ""
results.append({"title": title, "url": href, "snippet": snippet})
if len(results) >= max_results:
break
except Exception:
continue
return results
def _capture_accessibility_tree(self, page) -> Any:
try:
snapshot = page.locator("body").aria_snapshot()
if snapshot:
return snapshot
except Exception:
pass
try:
return page.evaluate(
"""
() => {
const nodes = Array.from(document.querySelectorAll('a,button,input,textarea,select,[role],[aria-label],h1,h2,h3,h4,h5,h6')).slice(0, 120);
return nodes.map((el, idx) => ({
index: idx,
tag: el.tagName.toLowerCase(),
role: el.getAttribute('role') || '',
ariaLabel: el.getAttribute('aria-label') || '',
text: (el.innerText || el.value || '').trim().slice(0, 160),
name: (el.getAttribute('name') || '').slice(0, 80),
type: (el.getAttribute('type') || '').slice(0, 40),
href: (el.getAttribute('href') || '').slice(0, 220),
}));
}
"""
)
except Exception:
return []
def _capture_page_analysis(self, page) -> Dict[str, Any]:
try:
return page.evaluate(
"""
() => {
const pick = (selector, mapper, limit=24) =>
Array.from(document.querySelectorAll(selector)).slice(0, limit).map(mapper);
return {
headings: pick('h1,h2,h3,h4', el => (el.innerText || '').trim()).filter(Boolean),
links: pick('a[href]', el => ({
text: (el.innerText || '').trim().slice(0, 120),
href: (el.href || '').slice(0, 220)
})).filter(x => x.text || x.href),
buttons: pick('button,input[type=button],input[type=submit],[role=button]', el => ({
text: (el.innerText || el.value || el.getAttribute('aria-label') || '').trim().slice(0, 120),
})).filter(x => x.text),
forms: pick('form', el => ({
action: (el.getAttribute('action') || '').slice(0, 200),
method: (el.getAttribute('method') || 'get').toLowerCase(),
}), 10),
inputs: pick('input,textarea,select', el => ({
type: (el.getAttribute('type') || el.tagName.toLowerCase()).slice(0, 60),
name: (el.getAttribute('name') || '').slice(0, 120),
placeholder: (el.getAttribute('placeholder') || '').slice(0, 120),
ariaLabel: (el.getAttribute('aria-label') || '').slice(0, 120),
}), 24),
landmarks: pick('header,nav,main,aside,footer,section,[role]', el => ({
tag: el.tagName.toLowerCase(),
role: (el.getAttribute('role') || '').slice(0, 60),
label: (el.getAttribute('aria-label') || '').slice(0, 120),
}), 24),
};
}
"""
)
except Exception:
return {}
def _capture_screenshot(self, page, prefix: str = "browser") -> str:
path = self.capture_dir / f"{prefix}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}.png"
page.screenshot(path=str(path), full_page=True)
return str(path)
def _run_vision_node(self, image_path: str) -> Dict[str, Any]:
"""Breakthrough Vision Node: Extracts deep visual forms from pixels."""
if not self.enable_ocr:
return {"error": "vision nodes disabled"}
# Try advanced vision first, fallback to basic OCR
script_path = Path(__file__).with_name("browser_vision.js")
if not script_path.exists():
script_path = Path(__file__).with_name("browser_ocr.js")
if not script_path.exists():
return {"error": f"vision script missing: {script_path}"}
try:
proc = subprocess.run(
["node", str(script_path), image_path],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
check=True,
timeout=150,
)
stdout = (proc.stdout or "").strip()
if stdout:
return json.loads(stdout)
return {"error": "vision node produced no output"}
except Exception as exc:
return {"error": str(exc)}
def _compare_visuals(self, path_a: str, path_b: str) -> Dict[str, Any]:
"""Compares two screenshots to detect visual state changes (Visual Latch)."""
# Placeholder for real pixel comparison if needed,
# for now we use file existence and basic metadata
return {
"state_changed": path_a != path_b,
"verification": "Visual state transitioned" if path_a != path_b else "No visual change detected"
}
def _page_payload(self, page, *, action: str, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Build the observation dict returned after a browser action.

    The payload contains the page URL, title, compacted body text, a summary,
    the accessibility tree, the page analysis, element labels and components
    reported by the in-page helpers, recent history, working memory and a
    fresh screenshot path. With *include_ocr*, the vision node output is added
    under ``vision_node_data`` (and its ``ocr`` entry is mirrored to ``ocr``).
    """
    body_text = _compact_text(page.locator("body").inner_text(), max_chars)

    # Refresh the interactive-element labels before the screenshot is taken.
    try:
        page.evaluate("window.__phillnetLabelInteractive && window.__phillnetLabelInteractive()")
        page.wait_for_timeout(20)
    except Exception:
        pass

    shot_path = self._capture_screenshot(page, prefix=action)

    labels = []
    components = []
    try:
        labels = page.evaluate("window.__phillnetGetLastLabels && window.__phillnetGetLastLabels()")
        components = page.evaluate("window.__phillnetScanComponents && window.__phillnetScanComponents()")
    except Exception:
        pass

    payload: Dict[str, Any] = {}
    payload["action"] = action
    payload["url"] = page.url
    payload["title"] = page.title()
    payload["text"] = body_text
    payload["summary"] = f"{page.title()} | {body_text}"
    payload["accessibility_tree"] = self._capture_accessibility_tree(page)
    payload["page_analysis"] = self._capture_page_analysis(page)
    payload["labeled_elements"] = labels
    payload["semantic_components"] = components
    payload["history"] = self.history[-5:]
    payload["working_memory"] = self.working_memory
    payload["screenshot_path"] = shot_path

    if include_ocr:
        vision_data = self._run_vision_node(shot_path)
        payload["vision_node_data"] = vision_data
        # Older consumers read the OCR result from the top level.
        if "ocr" in vision_data:
            payload["ocr"] = vision_data["ocr"]

    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def search(
    self,
    query: str,
    *,
    max_results: int = 5,
    max_snippet_chars: int = 280,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Run a web search, trying several engines until one yields results.

    Engines are tried in the order google, yahoo, duckduckgo_html. The loop
    stops at the first engine whose results page produces at least one parsed
    result; an engine that errors or yields nothing is skipped.

    Returns:
        The page payload of the last visited page, extended with ``query``,
        ``search_engines_tried``, ``results`` and a ``summary`` listing the
        results (or the page summary when there are none). An empty query
        returns a minimal dict without opening a page.
    """
    query = (query or "").strip()
    if not query:
        return {"action": "search", "query": query, "results": [], "summary": ""}

    page = self._ensure_page()
    encoded = quote_plus(query)
    engines = (
        ("google", f"https://www.google.com/search?q={encoded}"),
        ("yahoo", f"https://search.yahoo.com/search?p={encoded}"),
        ("duckduckgo_html", f"https://html.duckduckgo.com/html/?q={encoded}"),
    )
    challenge_markers = ("confirm you're not a robot", "complete the following challenge", "captcha")

    tried = []
    results: List[Dict[str, str]] = []
    for engine_name, engine_url in engines:
        tried.append(engine_name)
        try:
            page.goto(engine_url, wait_until="domcontentloaded")
            page.wait_for_timeout(1400)
            self._inject_overlay(page, f"Phillnet browser_mode: {engine_name} search '{query}'")
            results = self._search_results_from_page(
                page, max_results=max_results, max_snippet_chars=max_snippet_chars
            )
            body_text = page.locator("body").inner_text().lower()
            if results:
                break
            # A bot challenge and a plain empty page are handled alike:
            # move on to the next engine.
            if any(marker in body_text for marker in challenge_markers):
                continue
        except Exception:
            continue

    payload = self._page_payload(page, action="search", max_chars=max_snippet_chars * 4, include_ocr=include_ocr)
    result_lines = [f"- {item['title']} | {item['snippet']} | {item['url']}" for item in results]
    payload.update(
        {
            "query": query,
            "search_engines_tried": tried,
            "results": results,
            "summary": "\n".join(result_lines) or payload["summary"],
        }
    )
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def open(self, url: str, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Navigate to *url* (whitespace-trimmed) and return the page payload."""
    page = self._ensure_page()
    destination = (url or "").strip()
    page.goto(destination, wait_until="domcontentloaded")
    page.wait_for_timeout(800)
    self._inject_overlay(page, f"Phillnet browser_mode: open {page.url}")
    return self._page_payload(page, action="open", max_chars=max_chars, include_ocr=include_ocr)
def click(
    self,
    url: str = "",
    *,
    selector: Optional[str] = None,
    text_target: Optional[str] = None,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Click an element chosen by CSS *selector* or by visible *text_target*.

    Args:
        url: Optional page to navigate to before clicking.
        selector: Selector whose first match is clicked; takes precedence.
        text_target: Text used to rank clickable candidates.
        max_chars: Forwarded to the page payload.
        include_ocr: Forwarded to the page payload.

    Returns:
        The page payload after the click, with ``selector`` / ``text_target``
        echoed back when they were supplied.

    Raises:
        ValueError: when neither *selector* nor *text_target* is given.
        RuntimeError: when no candidate could be clicked.
    """
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, "Phillnet browser_mode: click")
    if selector:
        candidates = [page.locator(selector).first]
    elif text_target:
        candidates = list(self._iter_click_candidates(page, selector=None, text_target=text_target))
    else:
        raise ValueError("selector or text_target is required for click")
    click_label = f"Phillnet clicking {text_target or selector or 'target'}"

    reticle_js = "(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)"
    # The reticle is decoration only: failing to toggle it never blocks a click.
    try:
        page.evaluate(reticle_js, True)
        page.wait_for_timeout(300)
    except Exception:
        pass
    try:
        self._click_with_fallback(page, candidates, click_label)
    finally:
        # Switch the reticle off even when the click failed, so a failed
        # action does not leave the overlay stuck on the page.
        try:
            page.evaluate(reticle_js, False)
        except Exception:
            pass

    payload = self._page_payload(page, action="click", max_chars=max_chars, include_ocr=include_ocr)
    if text_target:
        payload["text_target"] = text_target
    if selector:
        payload["selector"] = selector
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def click_label(
    self,
    label_idx: int,
    *,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Click the interactive element at position *label_idx*.

    The element is looked up by index in the document-order list of
    ``a, button, input, select, textarea, [role=button], [role=link]``.
    NOTE(review): this must match the ordering used by the in-page labelling
    helper, which is not visible here — confirm.

    Returns:
        The page payload after the click.

    Raises:
        RuntimeError: when the element is missing or the click fails; the
            original exception is attached as the cause.
    """
    page = self._ensure_page()
    self._inject_overlay(page, f"Phillnet browser_mode: click label [{label_idx}]")
    reticle_js = "(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)"
    try:
        handle = page.evaluate_handle(
            "(idx) => Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'))[idx]",
            label_idx
        )
        element = handle.as_element()
        if not element:
            raise ValueError(f"Label index {label_idx} not found on page")
        box = element.bounding_box()
        if box:
            self._move_cursor_to_point(page, box["x"] + box["width"]/2, box["y"] + box["height"]/2, f"Targeting Label [{label_idx}]")
        page.evaluate(reticle_js, True)
        page.wait_for_timeout(200)
        try:
            element.click()
        finally:
            # Best-effort reset: it must run after a failed click, and it must
            # not turn a successful click into an error (the click may have
            # navigated away, invalidating the page's script context).
            try:
                page.evaluate(reticle_js, False)
            except Exception:
                pass
        page.wait_for_timeout(800)
    except Exception as e:
        raise RuntimeError(f"Failed to click label {label_idx}: {e}") from e
    return self._page_payload(page, action="click_label", max_chars=max_chars, include_ocr=include_ocr)
def hover_label(
    self,
    label_idx: int,
    *,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Hover the interactive element at position *label_idx*.

    The element is looked up by index in the document-order list of
    ``a, button, input, select, textarea, [role=button], [role=link]``.

    Returns:
        The page payload after hovering.

    Raises:
        RuntimeError: when the element is missing or hovering fails; the
            original exception is attached as the cause.
    """
    page = self._ensure_page()
    self._inject_overlay(page, f"Phillnet browser_mode: hover label [{label_idx}]")
    try:
        handle = page.evaluate_handle(
            "(idx) => Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'))[idx]",
            label_idx
        )
        element = handle.as_element()
        if not element:
            raise ValueError(f"Label index {label_idx} not found")
        box = element.bounding_box()
        if box:
            self._move_cursor_to_point(page, box["x"] + box["width"]/2, box["y"] + box["height"]/2, f"Hovering Label [{label_idx}]")
        element.hover()
        page.wait_for_timeout(400)
    except Exception as e:
        # Chain the cause so the underlying Playwright error stays visible.
        raise RuntimeError(f"Hover failed: {e}") from e
    return self._page_payload(page, action="hover_label", max_chars=max_chars, include_ocr=include_ocr)
def clean_page(self, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Invoke the in-page cleanup helper, then return the page payload.

    The cleanup is best-effort: if the helper is missing or fails, the
    payload is still returned.
    """
    page = self._ensure_page()
    self._inject_overlay(page, "Phillnet browser_mode: cleaning frictions")
    try:
        removed = page.evaluate("window.__phillnetCleanPage()")
        self._flash_key(page, f"CLEANED {removed} OVERLAYS")
        page.wait_for_timeout(600)
    except Exception:
        pass
    return self._page_payload(page, action="clean", max_chars=max_chars, include_ocr=include_ocr)
def type_text(
    self,
    *,
    url: str = "",
    selector: Optional[str] = None,
    text: str = "",
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Type *text* into an input and return the page payload.

    The input is the first match of *selector* when given; otherwise the
    best-ranked input for *text*; otherwise the first editable element on the
    page. The field is clicked, cleared and then typed into key by key.
    """
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, "Phillnet browser_mode: type")

    any_input = "input,textarea,[contenteditable='true'],[role='textbox']"
    if selector:
        field = page.locator(selector).first
    elif text:
        ranked = self._candidate_inputs_for_prompt(page, text)
        if ranked:
            field = ranked[0][1]
        else:
            field = page.locator(any_input).first
    else:
        field = page.locator(any_input).first

    try:
        field.scroll_into_view_if_needed(timeout=self.timeout_ms)
    except Exception:
        pass
    self._mark_locator(page, field, "Phillnet targeting input")
    self._move_cursor_to_locator(page, field, "Phillnet focusing input")
    field.click()
    self._flash_key(page, f"TYPE {text[:32]}")
    field.fill("")  # clear existing content before typing
    field.type(text, delay=25)
    page.wait_for_timeout(500)
    return self._page_payload(page, action="type", max_chars=max_chars, include_ocr=include_ocr)
def press_key(
    self,
    *,
    url: str = "",
    key: str = "Enter",
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Press *key* on the page keyboard (optionally after opening *url*)."""
    page = self._ensure_page()
    if url:
        destination = url.strip()
        page.goto(destination, wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, f"Phillnet browser_mode: key {key}")
    self._flash_key(page, f"KEY {key}")
    keyboard = page.keyboard
    keyboard.press(key)
    page.wait_for_timeout(700)
    return self._page_payload(page, action="press", max_chars=max_chars, include_ocr=include_ocr)
def scroll(
    self,
    *,
    url: str = "",
    direction: str = "down",
    amount: int = 900,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Scroll the page with the mouse wheel in 15 equal steps.

    *amount* is clamped to at least 120; only the direction ``"up"``
    (case-insensitive, surrounding whitespace ignored) scrolls upwards. The
    payload additionally carries ``scroll_direction`` and ``scroll_amount``.
    """
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, f"Phillnet browser_mode: scroll {direction}")

    amount = max(120, int(amount))
    going_up = str(direction).lower().strip() == "up"
    signed_amount = -amount if going_up else amount

    # Park the cursor slightly right of and below the viewport centre before
    # wheeling; fixed coordinates are used when the viewport is unavailable.
    try:
        viewport = page.viewport_size or {"width": 1280, "height": 900}
        center_x = max(120, int(viewport.get("width", 1280) * 0.56))
        center_y = max(120, int(viewport.get("height", 900) * 0.58))
    except Exception:
        center_x, center_y = 720, 480
    self._move_cursor_to_point(page, center_x, center_y, f"Scrolling {direction}")

    step_count = 15
    per_step = signed_amount / step_count
    mouse_steps = 0
    while mouse_steps < step_count:
        page.mouse.wheel(0, per_step)
        page.wait_for_timeout(40)
        mouse_steps += 1
    page.wait_for_timeout(400)

    payload = self._page_payload(page, action="scroll", max_chars=max_chars, include_ocr=include_ocr)
    payload["scroll_direction"] = str(direction)
    payload["scroll_amount"] = int(amount)
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def scroll_to_text(
    self,
    *,
    url: str = "",
    text_target: str = "",
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Scroll the first element containing *text_target* into view.

    If the full text is not found, the search is retried with its first
    word. When nothing matches, no scrolling happens but the payload is
    still returned.

    Raises:
        ValueError: when *text_target* is empty.
    """
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, "Phillnet browser_mode: scroll to text")

    target = (text_target or "").strip()
    if not target:
        raise ValueError("text_target is required for scroll_to_text")

    matches = page.get_by_text(target, exact=False)
    if matches.count() == 0:
        leading_word = target.partition(" ")[0].strip()
        if leading_word:
            matches = page.get_by_text(leading_word, exact=False)
    hit = matches.first
    if hit.count() > 0:
        self._move_cursor_to_locator(page, hit, f"Scrolling to {target[:48]}")
        try:
            hit.scroll_into_view_if_needed(timeout=4000)
        except Exception:
            pass
    page.wait_for_timeout(500)

    payload = self._page_payload(page, action="scroll_to_text", max_chars=max_chars, include_ocr=include_ocr)
    payload["text_target"] = target
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def screenshot(self) -> Dict[str, Any]:
    """Capture a screenshot with the ``manual`` prefix and return its path."""
    page = self._ensure_page()
    self._inject_overlay(page, "Phillnet browser_mode: screenshot")
    saved_path = self._capture_screenshot(page, prefix="manual")
    return {"action": "screenshot", "screenshot_path": saved_path}
def accessibility(self) -> Dict[str, Any]:
    """Return the URL, title, accessibility tree and analysis of the page."""
    page = self._ensure_page()
    report: Dict[str, Any] = {"action": "accessibility"}
    report["url"] = page.url
    report["title"] = page.title()
    report["accessibility_tree"] = self._capture_accessibility_tree(page)
    report["page_analysis"] = self._capture_page_analysis(page)
    return report
def analyze(self, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Return the page payload while the in-page scan overlay is shown.

    The scan overlay is switched on before the payload is captured and is
    always switched off afterwards, including when capturing raises.
    """
    page = self._ensure_page()
    scan_js = "(active) => window.__phillnetSetScan && window.__phillnetSetScan(active)"
    # The overlay is decoration only: failing to toggle it is not an error.
    try:
        page.evaluate(scan_js, True)
        page.wait_for_timeout(800)
    except Exception:
        pass
    try:
        self._inject_overlay(page, "Phillnet browser_mode: analyze page")
        return self._page_payload(page, action="analyze", max_chars=max_chars, include_ocr=include_ocr)
    finally:
        # Runs on success and on failure so the overlay never stays stuck on.
        try:
            page.evaluate(scan_js, False)
        except Exception:
            pass
def vision(self, *, max_chars: int = 2400) -> Dict[str, Any]:
    """Build a layered description of the page from five sources.

    The layers are: (1) a vision-model description of the screenshot,
    (2) colour/salience data from the vision node, (3) labelled elements and
    semantic components, (4) OCR text and (5) the accessibility tree. A layer
    that fails is recorded as a failure message or an empty string; it never
    aborts the call.

    Returns:
        The page payload extended with ``awareness_layers``,
        ``vision_summary`` and ``awareness_rating`` (a percentage string
        computed from the layers that produced usable data).
    """
    page = self._ensure_page()
    self._inject_overlay(page, "Phillnet: gathering total awareness")
    self._flash_key(page, "EYE OF THE SWARM ACTIVE")

    payload = self._page_payload(page, action="vision", max_chars=max_chars, include_ocr=True)
    awareness_layers: Dict[str, Any] = {}
    v_node = payload.get("vision_node_data")
    if not isinstance(v_node, dict):
        # The vision script may emit any JSON value; only a dict is usable.
        v_node = {}

    # LEVEL 1: description of the screenshot by the vision model.
    try:
        img_path = payload.get("screenshot_path")
        if img_path and Path(img_path).exists():
            page.evaluate("() => window.__phillnetSetVisionBadge && window.__phillnetSetVisionBadge(true)")
            desc = self._describe_image_with_vision_model(img_path)
            awareness_layers["level_1_neural_desc"] = desc if isinstance(desc, str) else str(desc or "")
        else:
            # Always set the key: the summary and rating below read it.
            awareness_layers["level_1_neural_desc"] = "Layer 1 Failed: screenshot unavailable"
    except Exception as e:
        awareness_layers["level_1_neural_desc"] = f"Layer 1 Failed: {e}"
    finally:
        try:
            page.evaluate("() => window.__phillnetSetVisionBadge && window.__phillnetSetVisionBadge(false)")
        except Exception:
            pass
    neural_desc = awareness_layers["level_1_neural_desc"]
    # The describer reports its own failures as text instead of raising, so
    # those messages must be recognised here as failures too.
    neural_ok = bool(neural_desc) and not neural_desc.startswith(
        ("Layer 1", "Vision Analysis Failed", "Ollama Error", "Vision model failed")
    )

    # LEVEL 2: dominant colours and salient regions from the vision node.
    try:
        colors = v_node.get("colors", [])
        salience = v_node.get("salience", [])
        color_str = ", ".join([f"{c['hex']} ({int(c['weight']*100)}%)" for c in colors[:3]])
        awareness_layers["level_2_visual_forms"] = f"Dominant Palette: {color_str}\nVisual Regions: {len(salience)} quadrants analyzed."
    except Exception as e:
        awareness_layers["level_2_visual_forms"] = f"Layer 2 Failed: {e}"

    # LEVEL 3: semantic components followed by labelled interactive elements.
    try:
        labels = payload.get("labeled_elements") or []
        components = payload.get("semantic_components") or []
        layout_map = []
        for comp in components[:5]:
            layout_map.append(f"{comp['tag'].upper()} at {comp['box']}")
        for lbl in labels[:12]:
            layout_map.append(f"[{lbl['idx']}] {lbl['tag']} '{lbl['text']}'")
        awareness_layers["level_3_structural_map"] = "\n".join(layout_map)
    except Exception as e:
        awareness_layers["level_3_structural_map"] = f"Layer 3 Failed: {e}"

    # LEVEL 4: OCR text, tolerating a missing or malformed OCR entry.
    ocr = v_node.get("ocr") or {}
    ocr_text = ocr.get("text") if isinstance(ocr, dict) else ""
    awareness_layers["level_4_pixel_text"] = str(ocr_text or "")[:max_chars // 2]

    # LEVEL 5: accessibility tree as a last-resort textual description.
    acc = payload.get("accessibility_tree")
    awareness_layers["level_5_logical_tree"] = str(acc)[:max_chars // 2] if acc else ""

    cortex_parts = ["## TOTAL CORTEX AWARENESS REPORT"]
    if neural_ok:
        cortex_parts.append(f"### OPTICAL REASONING\n{neural_desc}")
    cortex_parts.append(f"### VISUAL FORMS\n{awareness_layers.get('level_2_visual_forms', '')}")
    cortex_parts.append(f"### INTERACTIVE LANDSCAPE\n{awareness_layers.get('level_3_structural_map', '')}")
    if len(awareness_layers["level_4_pixel_text"]) > 10:
        cortex_parts.append(f"### PIXEL DATA (OCR)\n{awareness_layers['level_4_pixel_text']}")
    payload["vision_summary"] = "\n\n".join(cortex_parts)
    payload["awareness_layers"] = awareness_layers

    # Each layer that produced usable data contributes a fixed weight.
    rating = 0
    if neural_ok:
        rating += 35
    if awareness_layers.get("level_2_visual_forms") and "Failed" not in awareness_layers["level_2_visual_forms"]:
        rating += 15
    if awareness_layers.get("level_3_structural_map") and "Failed" not in awareness_layers["level_3_structural_map"]:
        rating += 25
    if awareness_layers.get("level_4_pixel_text"):
        rating += 15
    if awareness_layers.get("level_5_logical_tree"):
        rating += 10
    payload["awareness_rating"] = f"{rating}%"
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def _describe_image_with_vision_model(self, image_path: str) -> str:
"""Calls a local/remote vision model (e.g. moondream via Ollama) to describe the screen."""
try:
import base64
import requests
with open(image_path, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode('utf-8')
# Default to local Ollama with moondream or the currently used model
url = "http://localhost:11434/api/generate"
payload = {
"model": "moondream",
"prompt": "Analyze this screenshot. Describe the main layout, what central elements are visible, and any prominent text or images. Be concise and technical for an AI agent.",
"images": [base64_image],
"stream": False
}
response = requests.post(url, json=payload, timeout=25)
if response.status_code == 200:
return response.json().get("response", "Vision model failed to return a description.")
return f"Ollama Error: {response.status_code}"
except Exception as e:
return f"Vision Analysis Failed: {e}"
def update_progress(self, current: Union[str, int]):
    """Update the HUD progress bar to step *current* of 12.

    A value that cannot be converted to ``int`` and any page error are
    ignored; the same status dict is returned either way.
    """
    page = self._ensure_page()
    try:
        step = int(current)
        script = f"() => window.__phillnetUpdateProgress && window.__phillnetUpdateProgress({step}, 12)"
        page.evaluate(script)
    except Exception:
        pass
    return {"status": "progress_updated"}
def remember(self, key: str, value: str):
    """Store ``str(value)`` under ``str(key)`` in the working memory.

    Returns a status dict that includes the working-memory mapping itself.
    """
    name, fact = str(key), str(value)
    self.working_memory[name] = fact
    return {"status": "remembered", "memory": self.working_memory}
def run(
    self,
    *,
    action: str,
    query: Optional[str] = None,
    url: Optional[str] = None,
    selector: Optional[str] = None,
    text_target: Optional[str] = None,
    text: Optional[str] = None,
    key: Optional[str] = None,
    direction: Optional[str] = None,
    amount: Optional[int] = None,
    max_results: int = 5,
    max_snippet_chars: int = 280,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Dispatch one browser *action* by name and return its result.

    The action name is trimmed and lower-cased, and the call is appended to
    ``self.history`` (trimmed from the front once it exceeds 20 entries).
    Exceptions are not propagated: a failure is returned as
    ``{"action": ..., "error": ...}``, with a ``timeout:`` prefix for
    Playwright timeouts. An unknown action is reported the same way.
    """
    action = (action or "").strip().lower()
    self.history.append(f"{action} {query or url or ''}".strip())
    if len(self.history) > 20:
        self.history.pop(0)

    def set_hud():
        page = self._ensure_page()
        self._inject_overlay(page, str(query or ""))
        return {"status": "hud_updated"}

    def progress():
        page = self._ensure_page()
        try:
            current = int(query or 0)
            page.evaluate(f"() => window.__phillnetUpdateProgress && window.__phillnetUpdateProgress({current}, 12)")
        except Exception:
            pass
        return {"status": "progress_updated"}

    def ocr_only():
        page = self._ensure_page()
        screenshot_path = self._capture_screenshot(page, prefix="ocr")
        return {"action": "ocr", "screenshot_path": screenshot_path, "ocr": self._run_ocr(screenshot_path)}

    def label_index() -> int:
        # The label number may arrive in any of the three text-like arguments.
        return int(query or text_target or text or 0)

    # Handlers are lazy so that only the selected action touches the browser.
    handlers = {
        "remember": lambda: self.remember(str(query or ""), str(text or "")),
        "set_hud": set_hud,
        "update_progress": progress,
        "search": lambda: self.search(query or "", max_results=max_results, max_snippet_chars=max_snippet_chars, include_ocr=include_ocr),
        "open": lambda: self.open(url or "", max_chars=max_chars, include_ocr=include_ocr),
        "click": lambda: self.click(url or "", selector=selector, text_target=text_target, max_chars=max_chars, include_ocr=include_ocr),
        "type": lambda: self.type_text(url=url or "", selector=selector, text=text or "", max_chars=max_chars, include_ocr=include_ocr),
        "press": lambda: self.press_key(url=url or "", key=key or "Enter", max_chars=max_chars, include_ocr=include_ocr),
        "scroll": lambda: self.scroll(url=url or "", direction=direction or "down", amount=amount or 900, max_chars=max_chars, include_ocr=include_ocr),
        "scroll_to_text": lambda: self.scroll_to_text(url=url or "", text_target=text_target or text or query or "", max_chars=max_chars, include_ocr=include_ocr),
        "screenshot": lambda: self.screenshot(),
        "accessibility": lambda: self.accessibility(),
        "analyze": lambda: self.analyze(max_chars=max_chars, include_ocr=include_ocr),
        "vision": lambda: self.vision(max_chars=max_chars),
        "ocr": ocr_only,
        "click_label": lambda: self.click_label(label_index(), max_chars=max_chars, include_ocr=include_ocr),
        "hover_label": lambda: self.hover_label(label_index(), max_chars=max_chars, include_ocr=include_ocr),
        "clean": lambda: self.clean_page(max_chars=max_chars, include_ocr=include_ocr),
    }
    try:
        handler = handlers.get(action)
        if handler is None:
            raise ValueError(f"Unsupported browser action: {action}")
        return handler()
    except PlaywrightTimeoutError as exc:
        return {"action": action, "error": f"timeout: {exc}"}
    except Exception as exc:
        return {"action": action, "error": str(exc)}
class ReflexController:
    """Runs short scripted bursts of low-level mouse and keyboard actions."""

    def __init__(self, browser_tool: "PlaywrightBrowserTool"):
        # Browser tool that receives the low-level input calls.
        self.tool = browser_tool
        # Duration of the last successful burst, in milliseconds.
        self.last_burst_time = 0

    def execute_burst(self, script: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute a sequence of reflex actions back to back.

        Supported actions are ``mouse_move`` (needs ``x`` and ``y``),
        ``mouse_down``, ``mouse_up``, ``key_down`` / ``key_up`` (need ``key``)
        and ``wait`` (optional ``ms``, default 10).

        Example script:
            [{'action': 'mouse_move', 'x': 500, 'y': 500}, {'action': 'mouse_down'}]

        Returns:
            On success: ``status``, ``actions_performed`` (only the actions
            that were really executed), ``actions_skipped`` (unrecognised
            action names) and ``latency_ms``. On failure: ``status``,
            ``error`` and the ``actions_performed`` before the failing step.

        NOTE(review): reflex mode is enabled here and never disabled; whether
        it should be switched off after the burst cannot be told from this
        block — confirm.
        """
        self.tool.enable_reflex_mode(True)
        start = time.perf_counter()
        performed: List[Any] = []
        skipped: List[Any] = []
        try:
            for step in script:
                action = step.get("action")
                if action == "mouse_move":
                    self.tool.mouse_move(step["x"], step["y"])
                elif action == "mouse_down":
                    self.tool.mouse_down()
                elif action == "mouse_up":
                    self.tool.mouse_up()
                elif action == "key_down":
                    self.tool.key_down(step["key"])
                elif action == "key_up":
                    self.tool.key_up(step["key"])
                elif action == "wait":
                    time.sleep(step.get("ms", 10) / 1000.0)
                else:
                    # Nothing was executed, so do not report it as performed.
                    skipped.append(action)
                    continue
                performed.append(action)
        except Exception as e:
            # Keep the partial progress so the caller knows where it stopped.
            return {"status": "error", "error": str(e), "actions_performed": performed}
        self.last_burst_time = (time.perf_counter() - start) * 1000
        return {
            "status": "success",
            "actions_performed": performed,
            "actions_skipped": skipped,
            "latency_ms": f"{self.last_burst_time:.2f}ms"
        }