from __future__ import annotations import json import re import subprocess import time import uuid from pathlib import Path from typing import Any, Dict, List, Optional, Union from urllib.parse import quote_plus, unquote, urlparse, parse_qs from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import sync_playwright def _compact_text(text: str, max_chars: int = 1200) -> str: text = re.sub(r"\s+", " ", (text or "")).strip() return text[:max_chars] def _extract_quoted_text(prompt: str) -> Optional[str]: for pattern in (r'"([^"]{1,160})"', r"'([^']{1,160})'"): match = re.search(pattern, prompt) if match: return match.group(1).strip() return None def _extract_click_target(prompt: str) -> Optional[str]: quoted = _extract_quoted_text(prompt or "") if quoted and any(word in (prompt or "").lower() for word in ("click", "open", "select", "follow", "press")): return quoted match = re.search(r"click\s+(?:the\s+)?(?:link|button|tab|menu|item)?\s*([A-Za-z0-9][A-Za-z0-9 _-]{1,80})", prompt or "", flags=re.I) if match: return match.group(1).strip() return None def _extract_open_target(prompt: str) -> Optional[str]: quoted = _extract_quoted_text(prompt or "") lowered = (prompt or "").lower() if quoted and any(word in lowered for word in ("open", "visit", "go to", "browse", "navigate to")): return quoted domain_match = re.search( r"(?:open|visit|go to|browse|navigate to)\s+(?:the\s+)?((?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,})(?:[\\s,;:.!?]|$)", prompt or "", flags=re.I, ) if domain_match: return domain_match.group(1).strip() match = re.search( r"(?:open|visit|go to|browse|navigate to)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 ._-]{2,120})", prompt or "", flags=re.I, ) if match: return match.group(1).strip() return None def _looks_like_domain(target: str) -> bool: value = (target or "").strip().strip(".,;:!?") if not value or " " in value: return False return bool(re.fullmatch(r"(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,}", value)) def _extract_type_text(prompt: str) -> Optional[str]: quoted = _extract_quoted_text(prompt or "") if quoted and any(word in (prompt or "").lower() for word in ("type", "enter", "input", "fill", "search for")): return quoted return None def _extract_search_query(prompt: str) -> Optional[str]: quoted = _extract_quoted_text(prompt or "") lowered = (prompt or "").lower() if quoted and any(word in lowered for word in ("search for", "look up", "find", "search")): return quoted match = re.search( r"(?:search for|look up|find)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 _:/().,#-]{2,120})", prompt or "", flags=re.I, ) if match: return match.group(1).strip() return None def _extract_scroll_direction(prompt: str) -> Optional[str]: p = (prompt or "").lower() if any(token in p for token in ("scroll to top", "back to top", "scroll up", "move up")): return "up" if any(token in p for token in ("scroll to bottom", "scroll down", "move down", "go lower")): return "down" return None def _extract_scroll_target(prompt: str) -> Optional[str]: quoted = _extract_quoted_text(prompt or "") if quoted and any(word in (prompt or "").lower() for word in ("scroll to", "find", "locate", "until you see")): return quoted match = re.search( r"(?:scroll to|until you see|find|locate)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 _:/().,#-]{2,100})", prompt or "", flags=re.I, ) if match: return match.group(1).strip() return None def _should_follow_search(prompt: str) -> bool: p = (prompt or "").lower() return any( token in p for token in ( "open", "visit", "go to", "inspect", "read", "summarize", "analyze", "explore", "browse", "website", "site", "page", "official", "login", "click", "form", "button", "dashboard", ) ) def _wants_observation(prompt: str) -> bool: p = (prompt or "").lower() return any( token in p for token in ( "what page", "what is open", "what do you see", "summarize", "inspect", "analyze", "report", "title", "url", ) ) def _browser_observation_snippet(payload: Dict[str, Any], max_chars: int = 900) -> str: parts: List[str] = [] action = str(payload.get("action") or "").strip() title = str(payload.get("title") or "").strip() url = str(payload.get("url") or "").strip() error = str(payload.get("error") or "").strip() summary = str(payload.get("summary") or payload.get("text") or "").strip() if action: parts.append(f"ACTION: {action}") if title: parts.append(f"TITLE: {title}") if url: parts.append(f"URL: {url}") if error: parts.append(f"ERROR: {_compact_text(error, max_chars // 2)}") if summary: parts.append(f"SUMMARY: {_compact_text(summary, max_chars)}") acc = payload.get("accessibility_tree") if acc: parts.append(f"ACCESSIBILITY: {_compact_text(str(acc), max_chars // 2)}") ocr = payload.get("ocr") if isinstance(ocr, dict): ocr_text = str(ocr.get("text") or "").strip() if ocr_text: parts.append(f"OCR: {_compact_text(ocr_text, max_chars // 3)}") return "\n".join(parts).strip() def _normalize_label(text: str) -> str: return re.sub(r"\s+", " ", (text or "").strip().lower()) def _score_text_target(target: str, candidate_text: str, candidate_href: str = "") -> float: target_n = _normalize_label(target) candidate_n = _normalize_label(candidate_text) href_n = _normalize_label(candidate_href) if not target_n: return 0.0 score = 0.0 if candidate_n == target_n: score += 10.0 if target_n and target_n in candidate_n: score += 5.0 target_tokens = [token for token in re.split(r"\W+", target_n) if token] candidate_tokens = set(token for token in re.split(r"\W+", candidate_n) if token) href_tokens = set(token for token in re.split(r"\W+", href_n) if token) for token in target_tokens: if token in candidate_tokens: score += 1.5 if token in href_tokens: score += 0.5 return score def run_browser_agentic_sequence( task_prompt: str, invoke_browser_action, *, max_steps: int = 12, observation_chars: int = 1500, ) -> Dict[str, Any]: """ Advanced agentic workflow with loop prevention, self-correction, and vision-guided exploration. """ prompt = (task_prompt or "").strip() if not prompt: return {"mode": "browser_agentic", "steps": [], "summary": "", "done_reason": "empty_prompt"} steps: List[Dict[str, Any]] = [] final_payload: Dict[str, Any] = {} remaining = max(1, int(max_steps)) # LOOP PREVENTION: Track visited states and failed targets visited_states = set() # Set of (url, content_hash) blacklist = set() # Set of (url, label_idx) that did nothing last_action_was_stagnant = False def _record(payload: Dict[str, Any], *, action: str, target: str = ""): observation = _browser_observation_snippet(payload, max_chars=observation_chars) # HUD: Push rich activity update invoke_browser_action(action="set_hud", query=f"{action.upper()}: {target or 'Analyzing'}") steps.append({ "step": len(steps) + 1, "action": action, "target": target, "url": payload.get("url"), "title": payload.get("title"), "observation": observation, "memory": payload.get("working_memory") }) def _get_state_hash(payload: Dict[str, Any]) -> tuple: url = payload.get("url", "") # Create a stable hash of the interactive elements to detect content changes labels = payload.get("labeled_elements") or [] label_fingerprint = "|".join([f"{l['idx']}:{l['tag']}" for l in labels[:15]]) return (url, label_fingerprint) # INITIAL ACTION: Deep Search or Open direct_url = re.search(r"https?://[^\s)>\"]+", prompt) if direct_url: final_payload = invoke_browser_action(action="open", url=direct_url.group(0).rstrip(".,;:!?")) else: final_payload = invoke_browser_action(action="search", query=_compact_text(prompt, 180)) _record(final_payload, action="initial_dispatch") visited_states.add(_get_state_hash(final_payload)) remaining -= 1 # HUD: Initialize Mission Visuals invoke_browser_action(action="set_hud", query=f"Mission: {prompt[:40]}...") while remaining > 0: step_num = int(max_steps - remaining + 1) invoke_browser_action(action="update_progress", query=str(step_num)) current_state = _get_state_hash(final_payload) page_text = str(final_payload.get("text") or "").lower() labels = final_payload.get("labeled_elements") or [] current_url = final_payload.get("url", "") # SELF-CORRECT: If blocked by popups, clean the page if any(token in page_text for token in ("cookie", "consent", "accept our", "subscribe", "sign up")): final_payload = invoke_browser_action(action="clean") _record(final_payload, action="self_correct_cleanup") remaining -= 1 continue # DECISION LOGIC action_to_take = "observe" target_val = "" # 1. Classification: What does the task actually want right now? # Use low-level heuristics to map the overarching task to the current page state click_query = _extract_click_target(prompt) type_query = _extract_type_text(prompt) search_query = _extract_search_query(prompt) open_query = _extract_open_target(prompt) if open_query and current_url == "": # Only jump if we are nowhere action_to_take = "open" target_val = open_query elif type_query and any(l['tag'] in ('INPUT', 'TEXTAREA') for l in labels): # Find the best input box inputs = [l for l in labels if l['tag'] in ('INPUT', 'TEXTAREA')] if inputs: action_to_take = "type" target_val = type_query # Note: In a real system we'd pick the input index, but here we'll assume the primary input elif click_query: targets = [ l for l in labels if _score_text_target(click_query, l['text']) > 4 and (current_url, l['idx']) not in blacklist ] if targets: action_to_take = "click_label" target_val = str(targets[0]['idx']) if action_to_take == "observe": if any(token in prompt.lower() for token in ("scroll", "find", "look for")) or last_action_was_stagnant: action_to_take = "scroll" target_val = _extract_scroll_direction(prompt) or "down" else: action_to_take = "vision" # EXECUTE if action_to_take == "click_label": final_payload = invoke_browser_action(action=action_to_take, query=target_val) elif action_to_take == "type": final_payload = invoke_browser_action(action=action_to_take, text=target_val) elif action_to_take == "open": final_payload = invoke_browser_action(action=action_to_take, url=target_val) elif action_to_take == "scroll": final_payload = invoke_browser_action(action=action_to_take, direction=target_val) else: final_payload = invoke_browser_action(action="vision") _record(final_payload, action=action_to_take, target=target_val) # STAGNATION CHECK (for stateful actions) if action_to_take in ("click_label", "type", "open"): new_state = _get_state_hash(final_payload) if new_state == current_state: if action_to_take == "click_label" and targets: blacklist.add((current_url, targets[0]['idx'])) last_action_was_stagnant = True else: visited_states.add(new_state) last_action_was_stagnant = False remaining -= 1 # Termination condition if len(page_text) > 1200 and "results" not in final_payload: if not any(token in prompt.lower() for token in ("click", "type", "login", "submit")): break invoke_browser_action(action="set_hud", query="MISSION: COMPLETE") invoke_browser_action(action="update_progress", query="12") return { "mode": "browser_agentic", "steps": steps, "summary": "\n\n".join(step["observation"] for step in steps if step.get("observation")), "done_reason": "completed", "final_observation": final_payload, } def _unwrap_yahoo_redirect(url: str) -> str: try: parsed = urlparse(url) if "search.yahoo.com" in parsed.netloc or "r.search.yahoo.com" in parsed.netloc: qs = parse_qs(parsed.query) ru_values = qs.get("RU") or qs.get("ru") if ru_values and ru_values[0]: return unquote(ru_values[0]) match = re.search(r"/RU=([^/]+)/", parsed.path) if match: return unquote(match.group(1)) except Exception: pass return url class PlaywrightBrowserTool: """ Persistent Playwright browser/computer-use wrapper for Phillnet. Supports: - search - open - click - type - press - scroll - scroll_to_text - vision - screenshot - accessibility - ocr """ _OVERLAY_STYLE = """ #phillnet-cursor { position: fixed; width: 32px; height: 32px; border: 1px solid rgba(0, 242, 255, 0.4); border-radius: 50%; background: radial-gradient(circle, rgba(0, 242, 255, 0.1) 0%, transparent 70%); box-shadow: 0 0 20px rgba(0, 242, 255, 0.2); pointer-events: none; transform: translate(-50%, -50%); z-index: 2147483647; transition: left 160ms cubic-bezier(0.19, 1, 0.22, 1), top 160ms cubic-bezier(0.19, 1, 0.22, 1), width 0.2s ease, height 0.2s ease, border-color 0.2s ease; display: flex; align-items: center; justify-content: center; } #phillnet-cursor-inner { width: 6px; height: 6px; background: #00f2ff; border-radius: 50%; box-shadow: 0 0 12px #00f2ff, 0 0 24px rgba(0, 242, 255, 0.6); transition: transform 0.2s ease, background 0.2s ease; } #phillnet-cursor.locked { width: 48px; height: 48px; border-color: #ff00ff; border-width: 2px; box-shadow: 0 0 30px rgba(255, 0, 255, 0.4); } #phillnet-cursor.locked #phillnet-cursor-inner { background: #ff00ff; transform: scale(1.5); box-shadow: 0 0 15px #ff00ff, 0 0 30px rgba(255, 0, 255, 0.8); } #phillnet-cursor-rings { position: absolute; width: 100%; height: 100%; border: 1px solid rgba(0, 242, 255, 0.2); border-radius: 50%; animation: phillnet-pulse 2s infinite; } @keyframes phillnet-pulse { 0% { transform: scale(0.8); opacity: 0.8; } 100% { transform: scale(1.5); opacity: 0; } } #phillnet-token-tag { position: fixed; padding: 4px 10px; background: rgba(15, 23, 42, 0.96); color: #7dd3fc; font: bold 12px 'Consolas', 'Courier New', monospace; border: 1px solid rgba(125, 211, 252, 0.4); border-radius: 6px; pointer-events: none; z-index: 2147483647; white-space: nowrap; transform: translate(18px, 18px); box-shadow: 0 4px 12px rgba(0, 0, 0, 0.45); transition: left 140ms cubic-bezier(0.19, 1, 0.22, 1), top 140ms cubic-bezier(0.19, 1, 0.22, 1); } .phillnet-interactive-label { position: fixed; background: #00f2ff; color: #0f172a; padding: 2px 6px; border-radius: 4px; font: bold 10px 'Segoe UI', system-ui, sans-serif; z-index: 2147483640; pointer-events: none; box-shadow: 0 2px 8px rgba(0, 242, 255, 0.5); transform: translate(-100%, -100%); opacity: 0.9; } .phillnet-component-box { position: fixed; border: 1px dashed rgba(0, 242, 255, 0.25); background: rgba(0, 242, 255, 0.03); pointer-events: none; z-index: 2147483630; } .phillnet-component-label { position: absolute; top: 0; left: 0; background: rgba(0, 242, 255, 0.8); color: #0f172a; padding: 1px 4px; font: bold 9px monospace; text-transform: uppercase; } #phillnet-reticle { position: fixed; width: 60px; height: 60px; border: 1px dashed rgba(0, 242, 255, 0.4); border-radius: 50%; pointer-events: none; z-index: 2147483647; transform: translate(-50%, -50%) scale(2); opacity: 0; transition: transform 300ms ease-out, opacity 300ms ease; } #phillnet-reticle.active { transform: translate(-50%, -50%) scale(1); opacity: 1; } #phillnet-scanline { position: fixed; top: 0; left: 0; width: 100%; height: 2px; background: linear-gradient(90deg, transparent, rgba(0, 242, 255, 0.8), transparent); z-index: 2147483645; pointer-events: none; opacity: 0; } @keyframes phillnet-scan { 0% { top: 0%; } 100% { top: 100%; } } #phillnet-scanline.scanning { opacity: 1; animation: phillnet-scan 2s linear infinite; } #phillnet-trail { position: fixed; width: 6px; height: 6px; border-radius: 50%; background: rgba(0, 242, 255, 0.3); pointer-events: none; transform: translate(-50%, -50%); z-index: 2147483646; } #phillnet-hud { position: fixed; top: 16px; right: 16px; width: 340px; background: rgba(10, 15, 25, 0.94); backdrop-filter: blur(8px); border: 1px solid rgba(0, 242, 255, 0.25); border-radius: 12px; padding: 14px; color: #7dd3fc; font: 11px/1.4 'Consolas', monospace; z-index: 2147483647; pointer-events: none; box-shadow: 0 10px 40px rgba(0,0,0,0.6); display: flex; flex-direction: column; gap: 8px; } .hud-header { display: flex; justify-content: space-between; align-items: center; border-bottom: 1px solid rgba(0, 242, 255, 0.1); padding-bottom: 6px; } .hud-title { font-weight: 800; letter-spacing: 0.1em; color: #00f2ff; text-transform: uppercase; } .hud-progress { height: 2px; background: rgba(0, 242, 255, 0.1); flex-grow: 1; margin: 0 10px; position: relative; } .hud-progress-fill { position: absolute; top: 0; left: 0; height: 100%; background: #00f2ff; transition: width 0.3s ease; } .hud-activity { max-height: 80px; overflow: hidden; display: flex; flex-direction: column-reverse; gap: 3px; } .activity-item { color: #94a3b8; border-left: 2px solid rgba(0, 242, 255, 0.4); padding-left: 6px; opacity: 0.8; animation: ph-fade-in 0.3s ease-out; } .activity-item.active { color: #00f2ff; opacity: 1; } .hud-memory { background: rgba(0, 242, 255, 0.05); border-radius: 6px; padding: 6px; display: none; } .hud-memory.active { display: block; } .mem-tag { color: #38bdf8; font-weight: bold; margin-right: 4px; } @keyframes ph-fade-in { from { opacity: 0; transform: translateX(5px); } to { opacity: 1; transform: translateX(0); } } #phillnet-keyflash { position: fixed; bottom: 24px; right: 24px; padding: 12px 18px; background: rgba(0, 242, 255, 0.9); color: #0f172a; font: bold 14px 'Consolas', monospace; border-radius: 12px; opacity: 0; transform: translateY(10px); transition: opacity 160ms ease, transform 160ms ease; z-index: 2147483647; pointer-events: none; box-shadow: 0 0 20px rgba(0, 242, 255, 0.4); } #phillnet-keyflash.show { opacity: 1; transform: translateY(0); } .phillnet-targeted { outline: 2px solid #00f2ff !important; outline-offset: 4px !important; box-shadow: 0 0 15px rgba(0, 242, 255, 0.4) !important; scroll-margin: 150px !important; } #phillnet-vision-badge { position: fixed; bottom: 80px; right: 24px; padding: 6px 12px; background: rgba(15, 23, 42, 0.9); color: #00f2ff; font: bold 10px monospace; border: 1px solid rgba(0, 242, 255, 0.3); border-radius: 4px; z-index: 2147483647; text-transform: uppercase; opacity: 0; transition: opacity 300ms ease; } #phillnet-vision-badge.active { opacity: 1; } """ _OVERLAY_SCRIPT = """ () => { if (window.__phillnetOverlayInstalled) return; const style = document.createElement('style'); style.textContent = `%STYLE%`; document.head.appendChild(style); const cursor = document.createElement('div'); cursor.id = 'phillnet-cursor'; cursor.innerHTML = '
'; cursor.style.left = '18px'; cursor.style.top = '18px'; const trail = document.createElement('div'); trail.id = 'phillnet-trail'; trail.style.left = '18px'; trail.style.top = '18px'; const tokenTag = document.createElement('div'); tokenTag.id = 'phillnet-token-tag'; tokenTag.textContent = 'PHILLNET'; const hud = document.createElement('div'); hud.id = 'phillnet-hud'; hud.innerHTML = `
PHILLNET NEURAL
STP 0
Initializing Neural Cortex...
`; const reticle = document.createElement('div'); reticle.id = 'phillnet-reticle'; const scanline = document.createElement('div'); scanline.id = 'phillnet-scanline'; const keyflash = document.createElement('div'); keyflash.id = 'phillnet-keyflash'; const visionBadge = document.createElement('div'); visionBadge.id = 'phillnet-vision-badge'; visionBadge.textContent = 'NEURAL VISION ACTIVE'; document.body.appendChild(trail); document.body.appendChild(cursor); document.body.appendChild(tokenTag); document.body.appendChild(hud); document.body.appendChild(reticle); document.body.appendChild(scanline); document.body.appendChild(keyflash); document.body.appendChild(visionBadge); window.__phillnetSetVisionBadge = (active) => { if (active) visionBadge.classList.add('active'); else visionBadge.classList.remove('active'); }; window.__phillnetMoveCursor = (x, y, label) => { cursor.style.left = `${x}px`; cursor.style.top = `${y}px`; tokenTag.style.left = `${x}px`; tokenTag.style.top = `${y}px`; trail.style.left = `${x}px`; trail.style.top = `${y}px`; reticle.style.left = `${x}px`; reticle.style.top = `${y}px`; if (label) { tokenTag.textContent = label.toUpperCase(); const body = hud.querySelector('.ph-body'); if (body) body.textContent = label; } }; window.__phillnetSetReticle = (active) => { if (active) { reticle.classList.add('active'); cursor.classList.add('locked'); } else { reticle.classList.remove('active'); cursor.classList.remove('locked'); } }; window.__phillnetSetScan = (active) => { if (active) scanline.classList.add('scanning'); else scanline.classList.remove('scanning'); }; window.__phillnetSetHud = (msg, isMemory = false) => { if (!msg) return; if (isMemory) { const mon = hud.querySelector('#memory-monitor'); mon.innerHTML = `MEM: ${msg}`; mon.classList.add('active'); setTimeout(() => mon.classList.remove('active'), 2500); return; } const log = hud.querySelector('#activity-log'); const item = document.createElement('div'); item.className = 'activity-item active'; item.textContent = msg; log.querySelectorAll('.activity-item').forEach(el => el.classList.remove('active')); log.appendChild(item); if (log.children.length > 3) log.children[0].remove(); }; window.__phillnetUpdateProgress = (current, total) => { const bar = hud.querySelector('.hud-progress-fill'); const stp = hud.querySelector('#step-id'); bar.style.width = `${(current / total) * 100}%`; stp.textContent = `STP ${current}`; }; window.__phillnetMarkTarget = (element, label) => { try { document.querySelectorAll('.phillnet-targeted').forEach(node => node.classList.remove('phillnet-targeted')); if (element) { element.classList.add('phillnet-targeted'); if (label) { const body = hud.querySelector('.ph-body'); if (body) body.textContent = label; } setTimeout(() => { try { element.classList.remove('phillnet-targeted'); } catch (err) {} }, 1800); } } catch (err) {} }; window.__phillnetFlashKey = (text) => { keyflash.textContent = text || ''; keyflash.classList.add('show'); setTimeout(() => keyflash.classList.remove('show'), 850); }; window.__phillnetLabelInteractive = () => { document.querySelectorAll('.phillnet-interactive-label').forEach(el => el.remove()); const elements = Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]')); const labeled = []; elements.forEach((el, idx) => { const rect = el.getBoundingClientRect(); if (rect.width > 0 && rect.height > 0 && rect.top >= 0 && rect.top <= window.innerHeight) { const label = document.createElement('div'); label.className = 'phillnet-interactive-label'; label.textContent = idx; label.style.left = (rect.left + rect.width) + 'px'; label.style.top = rect.top + 'px'; document.body.appendChild(label); labeled.push({ idx, tag: el.tagName, text: (el.innerText || el.value || '').slice(0, 30), box: {x: rect.left, y: rect.top, w: rect.width, h: rect.height} }); } }); return labeled; }; window.__phillnetScanComponents = () => { document.querySelectorAll('.phillnet-component-box').forEach(el => el.remove()); const regions = ['header', 'nav', 'main', 'footer', 'aside', 'section']; const found = []; regions.forEach(tag => { const el = document.querySelector(tag); if (el) { const rect = el.getBoundingClientRect(); if (rect.width > 50 && rect.height > 50) { const box = document.createElement('div'); box.className = 'phillnet-component-box'; box.style.left = rect.left + 'px'; box.style.top = rect.top + 'px'; box.style.width = rect.width + 'px'; box.style.height = rect.height + 'px'; box.innerHTML = `
${tag}
`; document.body.appendChild(box); found.push({tag, box: {x: rect.left, y: rect.top, w: rect.width, h: rect.height}}); } } }); return found; }; window.__phillnetCleanPage = () => { const selectors = [ '[id*="consent"]', '[class*="consent"]', '[id*="cookie"]', '[class*="cookie"]', '[id*="modal"]', '[class*="modal"]', '[class*="overlay"]', '[class*="popup"]', '.ad-banner', '.newsletter-signup' ]; let cleaned = 0; selectors.forEach(sel => { document.querySelectorAll(sel).forEach(el => { const rect = el.getBoundingClientRect(); if (rect.width > 300 || rect.height > 300) { el.style.display = 'none'; cleaned++; } }); }); return cleaned; }; window.__phillnetOverlayInstalled = true; } """ def __init__( self, *, headless: bool = False, timeout_ms: int = 12000, capture_dir: Optional[str] = None, enable_ocr: bool = True, flush_captures_on_init: bool = True, flush_captures_on_close: bool = True, capture_keep_last: int = 0, ): self.headless = bool(headless) self.timeout_ms = int(timeout_ms) self.capture_dir = Path(capture_dir or "outputs/captures") self.capture_dir.mkdir(parents=True, exist_ok=True) self.enable_ocr = bool(enable_ocr) self.flush_captures_on_init = bool(flush_captures_on_init) self.flush_captures_on_close = bool(flush_captures_on_close) self.capture_keep_last = max(0, int(capture_keep_last)) self._pw = None self._browser = None self._page = None self.history: List[str] = [] self.working_memory: Dict[str, str] = {} if self.flush_captures_on_init: self._cleanup_captures() def _cleanup_captures(self): try: files = sorted( [p for p in self.capture_dir.glob("*.png") if p.is_file()], key=lambda p: p.stat().st_mtime, reverse=True, ) keep = self.capture_keep_last for stale in files[keep:]: try: stale.unlink(missing_ok=True) except Exception: pass except Exception: pass def _ensure_page(self): if self._page is not None: return self._page print(f" [!] Launching Stealth Browser (Headless={self.headless})...") self._pw = sync_playwright().start() # Use a realistic user agent self._browser = self._pw.chromium.launch(headless=self.headless) self._context = self._browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", viewport={"width": 1440, "height": 960} ) self._page = self._context.new_page() # Bypass bot detection: hide webdriver self._page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") self._page.set_default_timeout(self.timeout_ms) # PERSISTENT OVERLAY INJECTION # Inject both style and HUD persistence script as a high-priority init script full_init_script = f""" (function() {{ const style = document.createElement('style'); style.textContent = `{self._OVERLAY_STYLE}`; document.head.appendChild(style); const installOverlay = () => {{ if (document.getElementById('phillnet-cursor')) return; const cursor = document.createElement('div'); cursor.id = 'phillnet-cursor'; cursor.innerHTML = '
'; document.body.appendChild(cursor); const hud = document.createElement('div'); hud.id = 'phillnet-hud'; hud.innerHTML = '
PHILLNET NEURAL
'; document.body.appendChild(hud); }}; // Ensure installation on DOM load and any subsequent updates installOverlay(); const observer = new MutationObserver(installOverlay); observer.observe(document.body, {{ childList: true, subtree: true }}); document.addEventListener('mousemove', (e) => {{ const cursor = document.getElementById('phillnet-cursor'); if (cursor) {{ cursor.style.left = e.clientX + 'px'; cursor.style.top = e.clientY + 'px'; }} }}); }})(); """ self._page.add_init_script(full_init_script) return self._page def close(self): if self._browser is not None: try: self._browser.close() except Exception: pass if self._pw is not None: try: self._pw.stop() except Exception: pass self._browser = None self._pw = None self._page = None if self.flush_captures_on_close: self._cleanup_captures() def __del__(self): self.close() def _inject_overlay(self, page, label: str = "Phillnet browser mode active"): try: # Note: add_init_script handles the logic injection now, # we just update the HUD label dynamically here. page.evaluate("(label) => window.__phillnetSetHud && window.__phillnetSetHud(label)", label) except Exception: pass def _move_cursor_to_locator(self, page, locator, label: str): try: bbox = locator.bounding_box() if bbox: x = bbox["x"] + (bbox["width"] / 2.0) y = bbox["y"] + (bbox["height"] / 2.0) # REFLEX: Skip steps if in high-speed mode move_steps = 1 if getattr(self, "reflex_mode", False) else 18 page.mouse.move(x, y, steps=move_steps) if not getattr(self, "reflex_mode", False): page.evaluate( "(payload) => window.__phillnetMoveCursor && window.__phillnetMoveCursor(payload.x, payload.y, payload.label)", {"x": x, "y": y, "label": label}, ) page.wait_for_timeout(120) except Exception: pass def _mark_locator(self, page, locator, label: str): if getattr(self, "reflex_mode", False): return try: locator.evaluate( "(element, label) => window.__phillnetMarkTarget && window.__phillnetMarkTarget(element, label)", label, ) page.wait_for_timeout(90) except Exception: pass def _move_cursor_to_point(self, page, x: float, y: float, label: str): try: move_steps = 1 if getattr(self, "reflex_mode", False) else 20 page.mouse.move(x, y, steps=move_steps) if not getattr(self, "reflex_mode", False): page.evaluate( "(payload) => window.__phillnetMoveCursor && window.__phillnetMoveCursor(payload.x, payload.y, payload.label)", {"x": x, "y": y, "label": label}, ) page.wait_for_timeout(120) except Exception: pass def enable_reflex_mode(self, enabled: bool = True): """Toggle low-latency execution path.""" self.reflex_mode = bool(enabled) mode_str = "REFLEX (Low Latency)" if enabled else "Standard (Rich Visuals)" print(f"[*] Browser Control Mode: {mode_str}") def key_down(self, key: str): page = self._ensure_page() page.keyboard.down(key) def key_up(self, key: str): page = self._ensure_page() page.keyboard.up(key) def mouse_move(self, x: float, y: float): page = self._ensure_page() page.mouse.move(x, y) def mouse_down(self): page = self._ensure_page() page.mouse.down() def mouse_up(self): page = self._ensure_page() page.mouse.up() def _candidate_locators_for_target(self, page, target: str): candidates = [] selectors = [ ("role_button", page.get_by_role("button", name=target, exact=False)), ("role_link", page.get_by_role("link", name=target, exact=False)), ("text", page.get_by_text(target, exact=False)), ("anchor_button", page.locator("a,button,[role='button']").filter(has_text=target)), ] first_word = (target or "").split(" ", 1)[0].strip() if first_word and first_word.lower() != (target or "").lower(): selectors.append(("first_word", page.locator("a,button,[role='button']").filter(has_text=first_word))) for name, locator in selectors: try: count = min(locator.count(), 6) except Exception: count = 0 for index in range(count): try: item = locator.nth(index) text = _compact_text(item.inner_text(), 180) href = "" tag = "" role = "" html_for = "" input_type = "" disabled = False try: href = str(item.get_attribute("href") or "") except Exception: href = "" try: tag = str(item.evaluate("(el) => (el.tagName || '').toLowerCase()") or "") except Exception: tag = "" try: role = str(item.get_attribute("role") or "") except Exception: role = "" try: html_for = str(item.get_attribute("for") or "") except Exception: html_for = "" try: input_type = str(item.get_attribute("type") or "") except Exception: input_type = "" try: disabled = bool(item.is_disabled()) except Exception: disabled = False score = _score_text_target(target, text, href) if tag in {"a", "button"}: score += 2.0 if role in {"button", "link", "menuitem", "tab"}: score += 1.5 if tag in {"input", "textarea", "select"}: score += 2.5 if input_type in {"submit", "button", "search"}: score += 1.0 if tag == "label" and html_for: score -= 2.0 if disabled: score -= 4.0 if score > 0: candidates.append((score, name, item)) except Exception: continue candidates.sort(key=lambda triplet: triplet[0], reverse=True) return candidates def _candidate_inputs_for_prompt(self, page, target: str): target_n = _normalize_label(target) candidates = [] locator = page.locator("input,textarea,[contenteditable='true'],[role='textbox']") try: count = min(locator.count(), 8) except Exception: count = 0 for index in range(count): try: item = locator.nth(index) attrs = [] for name in ("placeholder", "name", "aria-label", "id", "type"): try: value = str(item.get_attribute(name) or "").strip() except Exception: value = "" if value: attrs.append(value) label_blob = " ".join(attrs) score = _score_text_target(target_n, label_blob, "") if score <= 0 and any(term in label_blob.lower() for term in ("search", "query", "find")): score = 2.5 try: if item.is_disabled(): score -= 4.0 except Exception: pass if score > 0: candidates.append((score, item)) except Exception: continue candidates.sort(key=lambda pair: pair[0], reverse=True) return candidates def _iter_click_candidates(self, page, selector: Optional[str], text_target: Optional[str]): seen = set() if selector: yield page.locator(selector).first return text_target = (text_target or "").strip() if not text_target: return lowered = text_target.lower() if any(token in lowered for token in ("textbox", "input", "field", "search box", "search field", "combobox")): for _, item in self._candidate_inputs_for_prompt(page, text_target): try: key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.name || '')") except Exception: key = None if key and key in seen: continue if key: seen.add(key) yield item for _, _, item in self._candidate_locators_for_target(page, text_target): try: key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.innerText || el.value || '').slice(0,120)") except Exception: key = None if key and key in seen: continue if key: seen.add(key) yield item for _, item in self._candidate_inputs_for_prompt(page, text_target): try: key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.name || '')") except Exception: key = None if key and key in seen: continue if key: seen.add(key) yield item yield page.locator("a,button,input,textarea,[role='button'],[role='link'],[role='textbox']").first def _click_with_fallback(self, page, candidates, label: str): errors: List[str] = [] for candidate in candidates: try: target_locator = candidate.first if hasattr(candidate, "first") else candidate target_locator.scroll_into_view_if_needed(timeout=self.timeout_ms) self._mark_locator(page, target_locator, label) self._move_cursor_to_locator(page, target_locator, label) page.wait_for_timeout(80) target_locator.click(timeout=self.timeout_ms) page.wait_for_timeout(1000) return except Exception as exc: errors.append(str(exc)) continue raise RuntimeError("; ".join(errors[:3]) or "no clickable candidate succeeded") def _flash_key(self, page, key_label: str): try: page.evaluate("(text) => window.__phillnetFlashKey && window.__phillnetFlashKey(text)", key_label) except Exception: pass def _search_results_from_page(self, page, max_results: int = 5, max_snippet_chars: int = 280) -> List[Dict[str, str]]: results: List[Dict[str, str]] = [] title_locators = [] for selector in ( "ol.searchCenterMiddle li div.compTitle h3 a", "div#web h3 a", "a.result__a", "a[data-testid='result-title-a']", ): locator = page.locator(selector) if locator.count() > 0: title_locators = locator.all()[: max_results * 10] break if not title_locators: title_locators = page.locator("a[href]").all()[: max_results * 20] for anchor in title_locators: try: href = _unwrap_yahoo_redirect((anchor.get_attribute("href") or "").strip()) title = _compact_text(anchor.inner_text(), 180) if not href or not title: continue if not href.startswith("http"): continue if href.startswith("/") or href.startswith("javascript:"): continue parsed_href = urlparse(href) if parsed_href.netloc.endswith("yahoo.com"): continue if any(blocked in href for blocked in ( "search.yahoo.com/search", "duckduckgo.com", "bing.com/search", )): continue snippet = "" try: parent = anchor.locator( "xpath=ancestor::li[1] | xpath=ancestor::*[contains(@class,'result')][1] | xpath=ancestor::*[contains(@class,'algo')][1]" ).first snippet_locator = parent.locator( ".result__snippet, [data-result='snippet'], div.compText p, p.fc-dustygray, p" ) if snippet_locator.count() > 0: snippet = _compact_text(snippet_locator.first.inner_text(), max_snippet_chars) except Exception: snippet = "" results.append({"title": title, "url": href, "snippet": snippet}) if len(results) >= max_results: break except Exception: continue return results def _capture_accessibility_tree(self, page) -> Any: try: snapshot = page.locator("body").aria_snapshot() if snapshot: return snapshot except Exception: pass try: return page.evaluate( """ () => { const nodes = Array.from(document.querySelectorAll('a,button,input,textarea,select,[role],[aria-label],h1,h2,h3,h4,h5,h6')).slice(0, 120); return nodes.map((el, idx) => ({ index: idx, tag: el.tagName.toLowerCase(), role: el.getAttribute('role') || '', ariaLabel: el.getAttribute('aria-label') || '', text: (el.innerText || el.value || '').trim().slice(0, 160), name: (el.getAttribute('name') || '').slice(0, 80), type: (el.getAttribute('type') || '').slice(0, 40), href: (el.getAttribute('href') || '').slice(0, 220), })); } """ ) except Exception: return [] def _capture_page_analysis(self, page) -> Dict[str, Any]: try: return page.evaluate( """ () => { const pick = (selector, mapper, limit=24) => Array.from(document.querySelectorAll(selector)).slice(0, limit).map(mapper); return { headings: pick('h1,h2,h3,h4', el => (el.innerText || '').trim()).filter(Boolean), links: pick('a[href]', el => ({ text: (el.innerText || '').trim().slice(0, 120), href: (el.href || '').slice(0, 220) })).filter(x => x.text || x.href), buttons: pick('button,input[type=button],input[type=submit],[role=button]', el => ({ text: (el.innerText || el.value || el.getAttribute('aria-label') || '').trim().slice(0, 120), })).filter(x => x.text), forms: pick('form', el => ({ action: (el.getAttribute('action') || '').slice(0, 200), method: (el.getAttribute('method') || 'get').toLowerCase(), }), 10), inputs: pick('input,textarea,select', el => ({ type: (el.getAttribute('type') || el.tagName.toLowerCase()).slice(0, 60), name: (el.getAttribute('name') || '').slice(0, 120), placeholder: (el.getAttribute('placeholder') || '').slice(0, 120), ariaLabel: (el.getAttribute('aria-label') || '').slice(0, 120), }), 24), landmarks: pick('header,nav,main,aside,footer,section,[role]', el => ({ tag: el.tagName.toLowerCase(), role: (el.getAttribute('role') || '').slice(0, 60), label: (el.getAttribute('aria-label') || '').slice(0, 120), }), 24), }; } """ ) except Exception: return {} def _capture_screenshot(self, page, prefix: str = "browser") -> str: path = self.capture_dir / f"{prefix}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}.png" page.screenshot(path=str(path), full_page=True) return str(path) def _run_vision_node(self, image_path: str) -> Dict[str, Any]: """Breakthrough Vision Node: Extracts deep visual forms from pixels.""" if not self.enable_ocr: return {"error": "vision nodes disabled"} # Try advanced vision first, fallback to basic OCR script_path = Path(__file__).with_name("browser_vision.js") if not script_path.exists(): script_path = Path(__file__).with_name("browser_ocr.js") if not script_path.exists(): return {"error": f"vision script missing: {script_path}"} try: proc = subprocess.run( ["node", str(script_path), image_path], capture_output=True, text=True, encoding="utf-8", errors="replace", check=True, timeout=150, ) stdout = (proc.stdout or "").strip() if stdout: return json.loads(stdout) return {"error": "vision node produced no output"} except Exception as exc: return {"error": str(exc)} def _compare_visuals(self, path_a: str, path_b: str) -> Dict[str, Any]: """Compares two screenshots to detect visual state changes (Visual Latch).""" # Placeholder for real pixel comparison if needed, # for now we use file existence and basic metadata return { "state_changed": path_a != path_b, "verification": "Visual state transitioned" if path_a != path_b else "No visual change detected" } def _page_payload(self, page, *, action: str, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]: text = _compact_text(page.locator("body").inner_text(), max_chars) # 🔗 High-Speed OODA: Visual injection without blocking delays try: # Ensure premium visuals are active page.evaluate("window.__phillnetLabelInteractive && window.__phillnetLabelInteractive()") # Small yield for DOM stability page.wait_for_timeout(20) except Exception: pass screenshot_path = self._capture_screenshot(page, prefix=action) # Capture semantic metadata after screenshot to overlap compute labeled_elements = [] semantic_components = [] try: labeled_elements = page.evaluate("window.__phillnetGetLastLabels && window.__phillnetGetLastLabels()") semantic_components = page.evaluate("window.__phillnetScanComponents && window.__phillnetScanComponents()") except Exception: pass payload = { "action": action, "url": page.url, "title": page.title(), "text": text, "summary": f"{page.title()} | {text}", "accessibility_tree": self._capture_accessibility_tree(page), "page_analysis": self._capture_page_analysis(page), "labeled_elements": labeled_elements, "semantic_components": semantic_components, "history": self.history[-5:], "working_memory": self.working_memory, "screenshot_path": screenshot_path, } if include_ocr: payload["vision_node_data"] = self._run_vision_node(screenshot_path) # Legacy compatibility if "vision_node_data" in payload and "ocr" in payload["vision_node_data"]: payload["ocr"] = payload["vision_node_data"]["ocr"] payload["agentic_context"] = { "current_state": "active", "can_continue": True, "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call." } return payload def search( self, query: str, *, max_results: int = 5, max_snippet_chars: int = 280, include_ocr: bool = False, ) -> Dict[str, Any]: query = (query or "").strip() if not query: return {"action": "search", "query": query, "results": [], "summary": ""} page = self._ensure_page() tried = [] results: List[Dict[str, str]] = [] for engine_name, url in ( ("google", f"https://www.google.com/search?q={quote_plus(query)}"), ("yahoo", f"https://search.yahoo.com/search?p={quote_plus(query)}"), ("duckduckgo_html", f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"), ): tried.append(engine_name) try: page.goto(url, wait_until="domcontentloaded") page.wait_for_timeout(1400) self._inject_overlay(page, f"Phillnet browser_mode: {engine_name} search '{query}'") results = self._search_results_from_page(page, max_results=max_results, max_snippet_chars=max_snippet_chars) body_text = page.locator("body").inner_text().lower() if results: break if any(challenge in body_text for challenge in ("confirm you're not a robot", "complete the following challenge", "captcha")): continue except Exception: continue payload = self._page_payload(page, action="search", max_chars=max_snippet_chars * 4, include_ocr=include_ocr) payload.update( { "query": query, "search_engines_tried": tried, "results": results, "summary": "\n".join( f"- {item['title']} | {item['snippet']} | {item['url']}" for item in results ) or payload["summary"], } ) payload["agentic_context"] = { "current_state": "active", "can_continue": True, "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call." } return payload def open(self, url: str, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]: page = self._ensure_page() page.goto((url or "").strip(), wait_until="domcontentloaded") page.wait_for_timeout(800) self._inject_overlay(page, f"Phillnet browser_mode: open {page.url}") return self._page_payload(page, action="open", max_chars=max_chars, include_ocr=include_ocr) def click( self, url: str = "", *, selector: Optional[str] = None, text_target: Optional[str] = None, max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() if url: page.goto(url.strip(), wait_until="domcontentloaded") page.wait_for_timeout(800) self._inject_overlay(page, "Phillnet browser_mode: click") if selector: candidates = [page.locator(selector).first] elif text_target: candidates = list(self._iter_click_candidates(page, selector=None, text_target=text_target)) else: raise ValueError("selector or text_target is required for click") click_label = f"Phillnet clicking {text_target or selector or 'target'}" # CYBER: Activate Reticle Lock-On try: page.evaluate("(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)", True) page.wait_for_timeout(300) except Exception: pass self._click_with_fallback(page, candidates, click_label) try: page.evaluate("(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)", False) except Exception: pass payload = self._page_payload(page, action="click", max_chars=max_chars, include_ocr=include_ocr) if text_target: payload["text_target"] = text_target if selector: payload["selector"] = selector payload["agentic_context"] = { "current_state": "active", "can_continue": True, "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call." } return payload def click_label( self, label_idx: int, *, max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() self._inject_overlay(page, f"Phillnet browser_mode: click label [{label_idx}]") # Find element by label index try: # We re-query the elements in the same order as __phillnetLabelInteractive handle = page.evaluate_handle( f"(idx) => Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'))[idx]", label_idx ) element = handle.as_element() if not element: raise ValueError(f"Label index {label_idx} not found on page") # Convert element handle to locator or just use the handle for movement/clicking # Playwright mouse move on element handle? box = element.bounding_box() if box: self._move_cursor_to_point(page, box["x"] + box["width"]/2, box["y"] + box["height"]/2, f"Targeting Label [{label_idx}]") # CYBER: Activate Reticle Lock-On page.evaluate("(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)", True) page.wait_for_timeout(200) element.click() page.evaluate("(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)", False) page.wait_for_timeout(800) except Exception as e: raise RuntimeError(f"Failed to click label {label_idx}: {e}") return self._page_payload(page, action="click_label", max_chars=max_chars, include_ocr=include_ocr) def hover_label( self, label_idx: int, *, max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() self._inject_overlay(page, f"Phillnet browser_mode: hover label [{label_idx}]") try: handle = page.evaluate_handle( f"(idx) => Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'))[idx]", label_idx ) element = handle.as_element() if not element: raise ValueError(f"Label index {label_idx} not found") box = element.bounding_box() if box: self._move_cursor_to_point(page, box["x"] + box["width"]/2, box["y"] + box["height"]/2, f"Hovering Label [{label_idx}]") element.hover() page.wait_for_timeout(400) except Exception as e: raise RuntimeError(f"Hover failed: {e}") return self._page_payload(page, action="hover_label", max_chars=max_chars, include_ocr=include_ocr) def clean_page(self, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]: page = self._ensure_page() self._inject_overlay(page, "Phillnet browser_mode: cleaning frictions") try: cleaned_count = page.evaluate("window.__phillnetCleanPage()") self._flash_key(page, f"CLEANED {cleaned_count} OVERLAYS") page.wait_for_timeout(600) except Exception: pass return self._page_payload(page, action="clean", max_chars=max_chars, include_ocr=include_ocr) def type_text( self, *, url: str = "", selector: Optional[str] = None, text: str = "", max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() if url: page.goto(url.strip(), wait_until="domcontentloaded") page.wait_for_timeout(800) self._inject_overlay(page, "Phillnet browser_mode: type") if selector: locator = page.locator(selector).first elif text: input_candidates = self._candidate_inputs_for_prompt(page, text) locator = input_candidates[0][1] if input_candidates else page.locator("input,textarea,[contenteditable='true'],[role='textbox']").first else: locator = page.locator("input,textarea,[contenteditable='true'],[role='textbox']").first try: locator.scroll_into_view_if_needed(timeout=self.timeout_ms) except Exception: pass self._mark_locator(page, locator, "Phillnet targeting input") self._move_cursor_to_locator(page, locator, "Phillnet focusing input") locator.click() self._flash_key(page, f"TYPE {text[:32]}") locator.fill("") locator.type(text, delay=25) page.wait_for_timeout(500) return self._page_payload(page, action="type", max_chars=max_chars, include_ocr=include_ocr) def press_key( self, *, url: str = "", key: str = "Enter", max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() if url: page.goto(url.strip(), wait_until="domcontentloaded") page.wait_for_timeout(800) self._inject_overlay(page, f"Phillnet browser_mode: key {key}") self._flash_key(page, f"KEY {key}") page.keyboard.press(key) page.wait_for_timeout(700) return self._page_payload(page, action="press", max_chars=max_chars, include_ocr=include_ocr) def scroll( self, *, url: str = "", direction: str = "down", amount: int = 900, max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() if url: page.goto(url.strip(), wait_until="domcontentloaded") page.wait_for_timeout(800) self._inject_overlay(page, f"Phillnet browser_mode: scroll {direction}") amount = max(120, int(amount)) signed_amount = -amount if str(direction).lower().strip() == "up" else amount try: viewport = page.viewport_size or {"width": 1280, "height": 900} center_x = max(120, int(viewport.get("width", 1280) * 0.56)) center_y = max(120, int(viewport.get("height", 900) * 0.58)) except Exception: center_x, center_y = 720, 480 self._move_cursor_to_point(page, center_x, center_y, f"Scrolling {direction}") # CYBER: Kinetic Smooth Glide steps = 15 step_amount = signed_amount / steps for _ in range(steps): page.mouse.wheel(0, step_amount) page.wait_for_timeout(40) page.wait_for_timeout(400) payload = self._page_payload(page, action="scroll", max_chars=max_chars, include_ocr=include_ocr) payload["scroll_direction"] = str(direction) payload["scroll_amount"] = int(amount) payload["agentic_context"] = { "current_state": "active", "can_continue": True, "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call." } return payload def scroll_to_text( self, *, url: str = "", text_target: str = "", max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: page = self._ensure_page() if url: page.goto(url.strip(), wait_until="domcontentloaded") page.wait_for_timeout(800) self._inject_overlay(page, "Phillnet browser_mode: scroll to text") target = (text_target or "").strip() if not target: raise ValueError("text_target is required for scroll_to_text") locator = page.get_by_text(target, exact=False) if locator.count() == 0: first_word = target.split(" ", 1)[0].strip() if first_word: locator = page.get_by_text(first_word, exact=False) locator = locator.first if locator.count() > 0: self._move_cursor_to_locator(page, locator, f"Scrolling to {target[:48]}") try: locator.scroll_into_view_if_needed(timeout=4000) except Exception: pass page.wait_for_timeout(500) payload = self._page_payload(page, action="scroll_to_text", max_chars=max_chars, include_ocr=include_ocr) payload["text_target"] = target payload["agentic_context"] = { "current_state": "active", "can_continue": True, "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call." } return payload def screenshot(self) -> Dict[str, Any]: page = self._ensure_page() self._inject_overlay(page, "Phillnet browser_mode: screenshot") return {"action": "screenshot", "screenshot_path": self._capture_screenshot(page, prefix="manual")} def accessibility(self) -> Dict[str, Any]: page = self._ensure_page() return { "action": "accessibility", "url": page.url, "title": page.title(), "accessibility_tree": self._capture_accessibility_tree(page), "page_analysis": self._capture_page_analysis(page), } def analyze(self, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]: page = self._ensure_page() # CYBER: Neural Sweep Activation try: page.evaluate("(active) => window.__phillnetSetScan && window.__phillnetSetScan(active)", True) page.wait_for_timeout(800) except Exception: pass self._inject_overlay(page, "Phillnet browser_mode: analyze page") res = self._page_payload(page, action="analyze", max_chars=max_chars, include_ocr=include_ocr) try: page.evaluate("(active) => window.__phillnetSetScan && window.__phillnetSetScan(active)", False) except Exception: pass return res def vision(self, *, max_chars: int = 2400) -> Dict[str, Any]: """Layered Vision Fallback Engine: Ensures 'Total Awareness' across 5 levels.""" page = self._ensure_page() self._inject_overlay(page, "Phillnet: gathering total awareness") self._flash_key(page, "EYE OF THE SWARM ACTIVE") # Primary Payload Generation payload = self._page_payload(page, action="vision", max_chars=max_chars, include_ocr=True) awareness_layers = {} v_node = payload.get("vision_node_data") or {} # LEVEL 1: Neural Describer (Ollama/Moondream) try: img_path = payload.get("screenshot_path") if img_path and Path(img_path).exists(): page.evaluate("() => window.__phillnetSetVisionBadge && window.__phillnetSetVisionBadge(true)") desc = self._describe_image_with_vision_model(img_path) awareness_layers["level_1_neural_desc"] = desc except Exception as e: awareness_layers["level_1_neural_desc"] = f"Layer 1 Failed: {e}" finally: try: page.evaluate("() => window.__phillnetSetVisionBadge && window.__phillnetSetVisionBadge(false)") except: pass # LEVEL 2: Advanced Visual Forms (Colors, Salience) try: colors = v_node.get("colors", []) salience = v_node.get("salience", []) color_str = ", ".join([f"{c['hex']} ({int(c['weight']*100)}%)" for c in colors[:3]]) awareness_layers["level_2_visual_forms"] = f"Dominant Palette: {color_str}\nVisual Regions: {len(salience)} quadrants analyzed." except Exception as e: awareness_layers["level_2_visual_forms"] = f"Layer 2 Failed: {e}" # LEVEL 3: Structural Optical Map (Semantic Components + Labels) try: labels = payload.get("labeled_elements") or [] components = payload.get("semantic_components") or [] layout_map = [] for comp in components[:5]: layout_map.append(f"{comp['tag'].upper()} at {comp['box']}") for lbl in labels[:12]: layout_map.append(f"[{lbl['idx']}] {lbl['tag']} '{lbl['text']}'") awareness_layers["level_3_structural_map"] = "\n".join(layout_map) except Exception as e: awareness_layers["level_3_structural_map"] = f"Layer 3 Failed: {e}" # LEVEL 4: OCR (Optical Character Recognition + Word Grounding) ocr = v_node.get("ocr") or {} awareness_layers["level_4_pixel_text"] = ocr.get("text", "")[:max_chars // 2] if isinstance(ocr, dict) else "" # LEVEL 5: Logical A11y Tree (The Fallback of Last Resort) acc = payload.get("accessibility_tree") awareness_layers["level_5_logical_tree"] = str(acc)[:max_chars // 2] if acc else "" # CONSOLIDATED CORTEX OBSERVATION cortex_parts = ["## TOTAL CORTEX AWARENESS REPORT"] if "level_1_neural_desc" in awareness_layers and not awareness_layers["level_1_neural_desc"].startswith("Layer 1"): cortex_parts.append(f"### OPTICAL REASONING\n{awareness_layers['level_1_neural_desc']}") cortex_parts.append(f"### VISUAL FORMS\n{awareness_layers.get('level_2_visual_forms', '')}") cortex_parts.append(f"### INTERACTIVE LANDSCAPE\n{awareness_layers.get('level_3_structural_map', '')}") if len(awareness_layers["level_4_pixel_text"]) > 10: cortex_parts.append(f"### PIXEL DATA (OCR)\n{awareness_layers['level_4_pixel_text']}") payload["vision_summary"] = "\n\n".join(cortex_parts) payload["awareness_layers"] = awareness_layers # Rate the quality of vision rating = 0 if not awareness_layers["level_1_neural_desc"].startswith("Layer 1"): rating += 35 if awareness_layers.get("level_2_visual_forms") and "Failed" not in awareness_layers["level_2_visual_forms"]: rating += 15 if awareness_layers.get("level_3_structural_map") and "Failed" not in awareness_layers["level_3_structural_map"]: rating += 25 if awareness_layers.get("level_4_pixel_text"): rating += 15 if awareness_layers.get("level_5_logical_tree"): rating += 10 payload["awareness_rating"] = f"{rating}%" payload["agentic_context"] = { "current_state": "active", "can_continue": True, "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call." } return payload def _describe_image_with_vision_model(self, image_path: str) -> str: """Calls a local/remote vision model (e.g. moondream via Ollama) to describe the screen.""" try: import base64 import requests with open(image_path, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode('utf-8') # Default to local Ollama with moondream or the currently used model url = "http://localhost:11434/api/generate" payload = { "model": "moondream", "prompt": "Analyze this screenshot. Describe the main layout, what central elements are visible, and any prominent text or images. Be concise and technical for an AI agent.", "images": [base64_image], "stream": False } response = requests.post(url, json=payload, timeout=25) if response.status_code == 200: return response.json().get("response", "Vision model failed to return a description.") return f"Ollama Error: {response.status_code}" except Exception as e: return f"Vision Analysis Failed: {e}" def update_progress(self, current: Union[str, int]): """Update the HUD progress bar.""" page = self._ensure_page() try: val = int(current) page.evaluate(f"() => window.__phillnetUpdateProgress && window.__phillnetUpdateProgress({val}, 12)") except Exception: pass return {"status": "progress_updated"} def remember(self, key: str, value: str): """Store a fact in the browser's working memory.""" self.working_memory[str(key)] = str(value) return {"status": "remembered", "memory": self.working_memory} def run( self, *, action: str, query: Optional[str] = None, url: Optional[str] = None, selector: Optional[str] = None, text_target: Optional[str] = None, text: Optional[str] = None, key: Optional[str] = None, direction: Optional[str] = None, amount: Optional[int] = None, max_results: int = 5, max_snippet_chars: int = 280, max_chars: int = 1600, include_ocr: bool = False, ) -> Dict[str, Any]: action = (action or "").strip().lower() self.history.append(f"{action} {query or url or ''}".strip()) if len(self.history) > 20: self.history.pop(0) try: if action == "remember": return self.remember(str(query or ""), str(text or "")) if action == "set_hud": page = self._ensure_page() self._inject_overlay(page, str(query or "")) return {"status": "hud_updated"} if action == "update_progress": page = self._ensure_page() try: current = int(query or 0) page.evaluate(f"() => window.__phillnetUpdateProgress && window.__phillnetUpdateProgress({current}, 12)") except Exception: pass return {"status": "progress_updated"} if action == "search": return self.search(query or "", max_results=max_results, max_snippet_chars=max_snippet_chars, include_ocr=include_ocr) if action == "open": return self.open(url or "", max_chars=max_chars, include_ocr=include_ocr) if action == "click": return self.click(url or "", selector=selector, text_target=text_target, max_chars=max_chars, include_ocr=include_ocr) if action == "type": return self.type_text(url=url or "", selector=selector, text=text or "", max_chars=max_chars, include_ocr=include_ocr) if action == "press": return self.press_key(url=url or "", key=key or "Enter", max_chars=max_chars, include_ocr=include_ocr) if action == "scroll": return self.scroll(url=url or "", direction=direction or "down", amount=amount or 900, max_chars=max_chars, include_ocr=include_ocr) if action == "scroll_to_text": return self.scroll_to_text(url=url or "", text_target=text_target or text or query or "", max_chars=max_chars, include_ocr=include_ocr) if action == "screenshot": return self.screenshot() if action == "accessibility": return self.accessibility() if action == "analyze": return self.analyze(max_chars=max_chars, include_ocr=include_ocr) if action == "vision": return self.vision(max_chars=max_chars) if action == "ocr": page = self._ensure_page() screenshot_path = self._capture_screenshot(page, prefix="ocr") return {"action": "ocr", "screenshot_path": screenshot_path, "ocr": self._run_ocr(screenshot_path)} if action == "click_label": label_idx = int(query or text_target or text or 0) return self.click_label(label_idx, max_chars=max_chars, include_ocr=include_ocr) if action == "hover_label": label_idx = int(query or text_target or text or 0) return self.hover_label(label_idx, max_chars=max_chars, include_ocr=include_ocr) if action == "clean": return self.clean_page(max_chars=max_chars, include_ocr=include_ocr) raise ValueError(f"Unsupported browser action: {action}") except PlaywrightTimeoutError as exc: return {"action": action, "error": f"timeout: {exc}"} except Exception as exc: return {"action": action, "error": str(exc)} class ReflexController: """High-speed orchestrator for real-time browser interaction (Games/Reflexes).""" def __init__(self, browser_tool: PlaywrightBrowserTool): self.tool = browser_tool self.last_burst_time = 0 def execute_burst(self, script: List[Dict[str, Any]]) -> Dict[str, Any]: """ Execute a sequence of reflex actions in a tight loop. Example script: [{'action': 'mouse_move', 'x': 500, 'y': 500}, {'action': 'mouse_down'}] """ self.tool.enable_reflex_mode(True) start = time.perf_counter() results = [] try: for step in script: action = step.get("action") if action == "mouse_move": self.tool.mouse_move(step["x"], step["y"]) elif action == "mouse_down": self.tool.mouse_down() elif action == "mouse_up": self.tool.mouse_up() elif action == "key_down": self.tool.key_down(step["key"]) elif action == "key_up": self.tool.key_up(step["key"]) elif action == "wait": time.sleep(step.get("ms", 10) / 1000.0) results.append(action) except Exception as e: return {"status": "error", "error": str(e)} self.last_burst_time = (time.perf_counter() - start) * 1000 return { "status": "success", "actions_performed": results, "latency_ms": f"{self.last_burst_time:.2f}ms" }