Text Generation
Transformers
Diffusers
Safetensors
English
gpt_oss
phillnet-2
gpt-oss
multimodal
image-generation
video-generation
speech
audio
custom-code
conversational
custom_code
Instructions to use ayjays132/Phillnet-2 with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use ayjays132/Phillnet-2 with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-generation", model="ayjays132/Phillnet-2", trust_remote_code=True)
messages = [
    {"role": "user", "content": "Who are you?"},
]
pipe(messages)

# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("ayjays132/Phillnet-2", trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained("ayjays132/Phillnet-2", trust_remote_code=True)
messages = [
    {"role": "user", "content": "Who are you?"},
]
inputs = tokenizer.apply_chat_template(
    messages,
    add_generation_prompt=True,
    tokenize=True,
    return_dict=True,
    return_tensors="pt",
).to(model.device)

outputs = model.generate(**inputs, max_new_tokens=40)
print(tokenizer.decode(outputs[0][inputs["input_ids"].shape[-1]:]))
- Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use ayjays132/Phillnet-2 with vLLM:
Install from pip and serve model
# Install vLLM from pip:
pip install vllm
# Start the vLLM server:
vllm serve "ayjays132/Phillnet-2"
# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:8000/v1/chat/completions" \
    -H "Content-Type: application/json" \
    --data '{
        "model": "ayjays132/Phillnet-2",
        "messages": [
            {
                "role": "user",
                "content": "What is the capital of France?"
            }
        ]
    }'
Use Docker
docker model run hf.co/ayjays132/Phillnet-2
- SGLang
How to use ayjays132/Phillnet-2 with SGLang:
Install from pip and serve model
# Install SGLang from pip:
pip install sglang
# Start the SGLang server:
python3 -m sglang.launch_server \
    --model-path "ayjays132/Phillnet-2" \
    --host 0.0.0.0 \
    --port 30000
# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/chat/completions" \
    -H "Content-Type: application/json" \
    --data '{
        "model": "ayjays132/Phillnet-2",
        "messages": [
            {
                "role": "user",
                "content": "What is the capital of France?"
            }
        ]
    }'
Use Docker images
docker run --gpus all \
    --shm-size 32g \
    -p 30000:30000 \
    -v ~/.cache/huggingface:/root/.cache/huggingface \
    --env "HF_TOKEN=<secret>" \
    --ipc=host \
    lmsysorg/sglang:latest \
    python3 -m sglang.launch_server \
        --model-path "ayjays132/Phillnet-2" \
        --host 0.0.0.0 \
        --port 30000
# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/chat/completions" \
    -H "Content-Type: application/json" \
    --data '{
        "model": "ayjays132/Phillnet-2",
        "messages": [
            {
                "role": "user",
                "content": "What is the capital of France?"
            }
        ]
    }'
- Docker Model Runner
How to use ayjays132/Phillnet-2 with Docker Model Runner:
docker model run hf.co/ayjays132/Phillnet-2
| from __future__ import annotations | |
| import json | |
| import re | |
| import subprocess | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Union | |
| from urllib.parse import quote_plus, unquote, urlparse, parse_qs | |
| from playwright.sync_api import TimeoutError as PlaywrightTimeoutError | |
| from playwright.sync_api import sync_playwright | |
| def _compact_text(text: str, max_chars: int = 1200) -> str: | |
| text = re.sub(r"\s+", " ", (text or "")).strip() | |
| return text[:max_chars] | |
| def _extract_quoted_text(prompt: str) -> Optional[str]: | |
| for pattern in (r'"([^"]{1,160})"', r"'([^']{1,160})'"): | |
| match = re.search(pattern, prompt) | |
| if match: | |
| return match.group(1).strip() | |
| return None | |
def _extract_click_target(prompt: str) -> Optional[str]:
    """Work out which element label the prompt asks to click.

    A quoted fragment wins when the prompt also contains one of the verbs
    click/open/select/follow/press. Otherwise the words following "click"
    (optionally "the" and an element noun such as link/button) are used.
    Returns ``None`` when neither form is present.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    if quoted:
        lowered = text.lower()
        for verb in ("click", "open", "select", "follow", "press"):
            if verb in lowered:
                return quoted

    found = re.search(
        r"click\s+(?:the\s+)?(?:link|button|tab|menu|item)?\s*([A-Za-z0-9][A-Za-z0-9 _-]{1,80})",
        text,
        flags=re.I,
    )
    return found.group(1).strip() if found else None
def _extract_open_target(prompt: str) -> Optional[str]:
    """Work out which site or page the prompt asks to open.

    Resolution order:
      1. A quoted fragment, when the prompt contains an open-style verb
         (open / visit / go to / browse / navigate to).
      2. A bare domain name following such a verb (e.g. "open example.com and ...").
      3. Any free-form phrase following such a verb.

    Args:
        prompt: The user task text; ``None``/empty is treated as "".

    Returns:
        The extracted target string, or ``None`` when no target is found.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    lowered = text.lower()
    if quoted and any(word in lowered for word in ("open", "visit", "go to", "browse", "navigate to")):
        return quoted

    # The domain must be followed by whitespace, punctuation, or end of input.
    # In a raw string the whitespace class is written `\s`; the previous `\\s`
    # matched a literal backslash or the letter "s", so a domain followed by a
    # space never matched and the greedy free-form branch swallowed the rest
    # of the sentence.
    domain_match = re.search(
        r"(?:open|visit|go to|browse|navigate to)\s+(?:the\s+)?((?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,})(?:[\s,;:.!?]|$)",
        text,
        flags=re.I,
    )
    if domain_match:
        return domain_match.group(1).strip()

    match = re.search(
        r"(?:open|visit|go to|browse|navigate to)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 ._-]{2,120})",
        text,
        flags=re.I,
    )
    if match:
        return match.group(1).strip()
    return None
| def _looks_like_domain(target: str) -> bool: | |
| value = (target or "").strip().strip(".,;:!?") | |
| if not value or " " in value: | |
| return False | |
| return bool(re.fullmatch(r"(?:[A-Za-z0-9-]+\.)+[A-Za-z]{2,}", value)) | |
def _extract_type_text(prompt: str) -> Optional[str]:
    """Return the quoted text the prompt asks to type, if any.

    The quoted fragment is returned only when the prompt also contains one of
    the cues type / enter / input / fill / search for; otherwise ``None``.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    if not quoted:
        return None

    lowered = text.lower()
    cues = ("type", "enter", "input", "fill", "search for")
    return quoted if any(cue in lowered for cue in cues) else None
def _extract_search_query(prompt: str) -> Optional[str]:
    """Pull a search query out of the prompt.

    A quoted fragment is preferred when the prompt contains a search cue
    (search for / look up / find / search). Otherwise the phrase following
    "search for", "look up" or "find" is used. Returns ``None`` if neither
    form is present.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    lowered = text.lower()
    if quoted:
        for cue in ("search for", "look up", "find", "search"):
            if cue in lowered:
                return quoted

    found = re.search(
        r"(?:search for|look up|find)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 _:/().,#-]{2,120})",
        text,
        flags=re.I,
    )
    if found is None:
        return None
    return found.group(1).strip()
| def _extract_scroll_direction(prompt: str) -> Optional[str]: | |
| p = (prompt or "").lower() | |
| if any(token in p for token in ("scroll to top", "back to top", "scroll up", "move up")): | |
| return "up" | |
| if any(token in p for token in ("scroll to bottom", "scroll down", "move down", "go lower")): | |
| return "down" | |
| return None | |
def _extract_scroll_target(prompt: str) -> Optional[str]:
    """Find the text the prompt wants scrolled into view.

    A quoted fragment is used when the prompt contains one of the cues
    scroll to / find / locate / until you see. Otherwise the phrase that
    follows one of those cues is returned. ``None`` means no target.
    """
    text = prompt or ""
    quoted = _extract_quoted_text(text)
    if quoted:
        lowered = text.lower()
        for cue in ("scroll to", "find", "locate", "until you see"):
            if cue in lowered:
                return quoted

    found = re.search(
        r"(?:scroll to|until you see|find|locate)\s+(?:the\s+)?([A-Za-z0-9][A-Za-z0-9 _:/().,#-]{2,100})",
        text,
        flags=re.I,
    )
    if found is None:
        return None
    return found.group(1).strip()
| def _should_follow_search(prompt: str) -> bool: | |
| p = (prompt or "").lower() | |
| return any( | |
| token in p | |
| for token in ( | |
| "open", "visit", "go to", "inspect", "read", "summarize", "analyze", | |
| "explore", "browse", "website", "site", "page", "official", "login", | |
| "click", "form", "button", "dashboard", | |
| ) | |
| ) | |
| def _wants_observation(prompt: str) -> bool: | |
| p = (prompt or "").lower() | |
| return any( | |
| token in p | |
| for token in ( | |
| "what page", | |
| "what is open", | |
| "what do you see", | |
| "summarize", | |
| "inspect", | |
| "analyze", | |
| "report", | |
| "title", | |
| "url", | |
| ) | |
| ) | |
def _browser_observation_snippet(payload: Dict[str, Any], max_chars: int = 900) -> str:
    """Render a browser payload as a short, labelled, newline-separated text block.

    Sections appear in a fixed order and only when their value is non-empty:
    ACTION, TITLE, URL, ERROR, SUMMARY (``summary`` falling back to ``text``),
    ACCESSIBILITY and OCR. SUMMARY is compacted to ``max_chars``, ERROR and
    ACCESSIBILITY to half of that, and OCR to a third.
    """

    def _text_of(value: Any) -> str:
        # Falsy values (None, "", 0) collapse to the empty string.
        return str(value or "").strip()

    lines: List[str] = []

    for heading, key in (("ACTION", "action"), ("TITLE", "title"), ("URL", "url")):
        value = _text_of(payload.get(key))
        if value:
            lines.append(f"{heading}: {value}")

    error_text = _text_of(payload.get("error"))
    if error_text:
        lines.append(f"ERROR: {_compact_text(error_text, max_chars // 2)}")

    summary_text = _text_of(payload.get("summary") or payload.get("text"))
    if summary_text:
        lines.append(f"SUMMARY: {_compact_text(summary_text, max_chars)}")

    tree = payload.get("accessibility_tree")
    if tree:
        lines.append(f"ACCESSIBILITY: {_compact_text(str(tree), max_chars // 2)}")

    ocr_block = payload.get("ocr")
    if isinstance(ocr_block, dict):
        recognised = _text_of(ocr_block.get("text"))
        if recognised:
            lines.append(f"OCR: {_compact_text(recognised, max_chars // 3)}")

    return "\n".join(lines).strip()
| def _normalize_label(text: str) -> str: | |
| return re.sub(r"\s+", " ", (text or "").strip().lower()) | |
def _score_text_target(target: str, candidate_text: str, candidate_href: str = "") -> float:
    """Score how well a candidate element matches the wanted label.

    Scoring on normalized (lower-cased, single-spaced) strings:
      * +10.0 when the candidate text equals the target,
      * +5.0 when the target is contained in the candidate text,
      * +1.5 per target token found among the candidate text tokens,
      * +0.5 per target token found among the href tokens.

    Returns 0.0 when the target normalizes to an empty string.
    """
    wanted = _normalize_label(target)
    shown = _normalize_label(candidate_text)
    link = _normalize_label(candidate_href)
    if not wanted:
        return 0.0

    def _tokens(value: str) -> List[str]:
        # Split on non-word runs and drop the empty pieces at the edges.
        return [piece for piece in re.split(r"\W+", value) if piece]

    total = 0.0
    if shown == wanted:
        total += 10.0
    if wanted in shown:
        total += 5.0

    shown_tokens = set(_tokens(shown))
    link_tokens = set(_tokens(link))
    for token in _tokens(wanted):
        if token in shown_tokens:
            total += 1.5
        if token in link_tokens:
            total += 0.5
    return total
def run_browser_agentic_sequence(
    task_prompt: str,
    invoke_browser_action,
    *,
    max_steps: int = 12,
    observation_chars: int = 1500,
) -> Dict[str, Any]:
    """
    Advanced agentic workflow with loop prevention, self-correction, and vision-guided exploration.

    The first step opens a URL found in the prompt, or otherwise dispatches a
    search with the compacted prompt. Each following step picks one action
    (open / type / click_label / scroll / vision) from keyword heuristics on
    the prompt and the labelled elements of the latest payload, executes it
    through ``invoke_browser_action`` and records an observation.

    Args:
        task_prompt: Natural-language task; blank input returns immediately
            with ``done_reason == "empty_prompt"``.
        invoke_browser_action: Callable invoked with keyword arguments only
            (``action=...`` plus one of ``url``/``query``/``text``/``direction``).
            Its return value is read as a dict payload (``url``, ``title``,
            ``text``, ``labeled_elements``, ``working_memory``, ...).
        max_steps: Budget of payload-producing actions; values below 1 are
            raised to 1.
        observation_chars: Character budget handed to the observation
            snippet builder for every recorded step.

    Returns:
        A dict with ``mode``, ``steps``, ``summary``, ``done_reason`` and, for
        a non-empty prompt, ``final_observation`` (the last payload).
    """
    prompt = (task_prompt or "").strip()
    if not prompt:
        return {"mode": "browser_agentic", "steps": [], "summary": "", "done_reason": "empty_prompt"}

    steps: List[Dict[str, Any]] = []
    final_payload: Dict[str, Any] = {}
    step_budget = max(1, int(max_steps))
    remaining = step_budget

    # LOOP PREVENTION: remember click targets that changed nothing and pages
    # that already received a popup cleanup.
    blacklist = set()  # Set of (url, label_idx) that did nothing
    cleaned_urls = set()  # URLs for which a "clean" action was already issued
    last_action_was_stagnant = False

    def _record(payload: Dict[str, Any], *, action: str, target: str = ""):
        observation = _browser_observation_snippet(payload, max_chars=observation_chars)
        # HUD: Push rich activity update
        invoke_browser_action(action="set_hud", query=f"{action.upper()}: {target or 'Analyzing'}")
        steps.append({
            "step": len(steps) + 1,
            "action": action,
            "target": target,
            "url": payload.get("url"),
            "title": payload.get("title"),
            "observation": observation,
            "memory": payload.get("working_memory"),
        })

    def _get_state_hash(payload: Dict[str, Any]) -> tuple:
        url = payload.get("url", "")
        # Fingerprint the first interactive elements so a content change on
        # the same URL still counts as a new state.
        elements = payload.get("labeled_elements") or []
        fingerprint = "|".join([f"{item['idx']}:{item['tag']}" for item in elements[:15]])
        return (url, fingerprint)

    # INITIAL ACTION: Deep Search or Open
    direct_url = re.search(r"https?://[^\s)>\"]+", prompt)
    if direct_url:
        final_payload = invoke_browser_action(action="open", url=direct_url.group(0).rstrip(".,;:!?"))
    else:
        final_payload = invoke_browser_action(action="search", query=_compact_text(prompt, 180))
    _record(final_payload, action="initial_dispatch")
    remaining -= 1

    # HUD: Initialize Mission Visuals
    invoke_browser_action(action="set_hud", query=f"Mission: {prompt[:40]}...")

    while remaining > 0:
        step_num = step_budget - remaining + 1
        invoke_browser_action(action="update_progress", query=str(step_num))

        current_state = _get_state_hash(final_payload)
        page_text = str(final_payload.get("text") or "").lower()
        labels = final_payload.get("labeled_elements") or []
        current_url = final_payload.get("url", "")

        # SELF-CORRECT: If blocked by popups, clean the page. Only one cleanup
        # is attempted per URL: words such as "subscribe" or "sign up" often
        # stay in ordinary page content after the overlay is hidden, and
        # re-cleaning every iteration would burn the whole step budget.
        if current_url not in cleaned_urls and any(
            token in page_text for token in ("cookie", "consent", "accept our", "subscribe", "sign up")
        ):
            cleaned_urls.add(current_url)
            final_payload = invoke_browser_action(action="clean")
            _record(final_payload, action="self_correct_cleanup")
            remaining -= 1
            continue

        # DECISION LOGIC
        action_to_take = "observe"
        target_val = ""
        targets: List[Dict[str, Any]] = []

        # 1. Classification: map the overarching task onto the current page
        #    state with low-level prompt heuristics.
        click_query = _extract_click_target(prompt)
        type_query = _extract_type_text(prompt)
        open_query = _extract_open_target(prompt)

        if open_query and current_url == "":  # Only jump if we are nowhere
            action_to_take = "open"
            target_val = open_query
        elif type_query and any(l['tag'] in ('INPUT', 'TEXTAREA') for l in labels):
            # No input index is passed along; the executor is assumed to
            # pick the primary input — TODO confirm against the browser tool.
            action_to_take = "type"
            target_val = type_query
        elif click_query:
            targets = [
                l for l in labels
                if _score_text_target(click_query, l['text']) > 4
                and (current_url, l['idx']) not in blacklist
            ]
            if targets:
                action_to_take = "click_label"
                target_val = str(targets[0]['idx'])

        if action_to_take == "observe":
            if any(token in prompt.lower() for token in ("scroll", "find", "look for")) or last_action_was_stagnant:
                action_to_take = "scroll"
                target_val = _extract_scroll_direction(prompt) or "down"
            else:
                action_to_take = "vision"

        # EXECUTE
        if action_to_take == "click_label":
            final_payload = invoke_browser_action(action=action_to_take, query=target_val)
        elif action_to_take == "type":
            final_payload = invoke_browser_action(action=action_to_take, text=target_val)
        elif action_to_take == "open":
            final_payload = invoke_browser_action(action=action_to_take, url=target_val)
        elif action_to_take == "scroll":
            final_payload = invoke_browser_action(action=action_to_take, direction=target_val)
        else:
            final_payload = invoke_browser_action(action="vision")

        _record(final_payload, action=action_to_take, target=target_val)

        # STAGNATION CHECK (for stateful actions)
        if action_to_take in ("click_label", "type", "open"):
            new_state = _get_state_hash(final_payload)
            if new_state == current_state:
                if action_to_take == "click_label" and targets:
                    blacklist.add((current_url, targets[0]['idx']))
                last_action_was_stagnant = True
            else:
                last_action_was_stagnant = False

        remaining -= 1

        # Termination condition. NOTE(review): page_text is the text of the
        # payload from *before* this step's action, while the "results" check
        # looks at the new payload — confirm this mix is intended.
        if len(page_text) > 1200 and "results" not in final_payload:
            if not any(token in prompt.lower() for token in ("click", "type", "login", "submit")):
                break

    invoke_browser_action(action="set_hud", query="MISSION: COMPLETE")
    # Report the full budget as the final progress value (previously a
    # hard-coded "12", which only matched the default max_steps).
    invoke_browser_action(action="update_progress", query=str(step_budget))

    return {
        "mode": "browser_agentic",
        "steps": steps,
        "summary": "\n\n".join(step["observation"] for step in steps if step.get("observation")),
        "done_reason": "completed",
        "final_observation": final_payload,
    }
| def _unwrap_yahoo_redirect(url: str) -> str: | |
| try: | |
| parsed = urlparse(url) | |
| if "search.yahoo.com" in parsed.netloc or "r.search.yahoo.com" in parsed.netloc: | |
| qs = parse_qs(parsed.query) | |
| ru_values = qs.get("RU") or qs.get("ru") | |
| if ru_values and ru_values[0]: | |
| return unquote(ru_values[0]) | |
| match = re.search(r"/RU=([^/]+)/", parsed.path) | |
| if match: | |
| return unquote(match.group(1)) | |
| except Exception: | |
| pass | |
| return url | |
| class PlaywrightBrowserTool: | |
| """ | |
| Persistent Playwright browser/computer-use wrapper for Phillnet. | |
| Supports: | |
| - search | |
| - open | |
| - click | |
| - type | |
| - press | |
| - scroll | |
| - scroll_to_text | |
| - vision | |
| - screenshot | |
| - accessibility | |
| - ocr | |
| """ | |
| _OVERLAY_STYLE = """ | |
| #phillnet-cursor { | |
| position: fixed; | |
| width: 32px; | |
| height: 32px; | |
| border: 1px solid rgba(0, 242, 255, 0.4); | |
| border-radius: 50%; | |
| background: radial-gradient(circle, rgba(0, 242, 255, 0.1) 0%, transparent 70%); | |
| box-shadow: 0 0 20px rgba(0, 242, 255, 0.2); | |
| pointer-events: none; | |
| transform: translate(-50%, -50%); | |
| z-index: 2147483647; | |
| transition: left 160ms cubic-bezier(0.19, 1, 0.22, 1), | |
| top 160ms cubic-bezier(0.19, 1, 0.22, 1), | |
| width 0.2s ease, height 0.2s ease, border-color 0.2s ease; | |
| display: flex; | |
| align-items: center; | |
| justify-content: center; | |
| } | |
| #phillnet-cursor-inner { | |
| width: 6px; | |
| height: 6px; | |
| background: #00f2ff; | |
| border-radius: 50%; | |
| box-shadow: 0 0 12px #00f2ff, 0 0 24px rgba(0, 242, 255, 0.6); | |
| transition: transform 0.2s ease, background 0.2s ease; | |
| } | |
| #phillnet-cursor.locked { | |
| width: 48px; | |
| height: 48px; | |
| border-color: #ff00ff; | |
| border-width: 2px; | |
| box-shadow: 0 0 30px rgba(255, 0, 255, 0.4); | |
| } | |
| #phillnet-cursor.locked #phillnet-cursor-inner { | |
| background: #ff00ff; | |
| transform: scale(1.5); | |
| box-shadow: 0 0 15px #ff00ff, 0 0 30px rgba(255, 0, 255, 0.8); | |
| } | |
| #phillnet-cursor-rings { | |
| position: absolute; | |
| width: 100%; height: 100%; | |
| border: 1px solid rgba(0, 242, 255, 0.2); | |
| border-radius: 50%; | |
| animation: phillnet-pulse 2s infinite; | |
| } | |
| @keyframes phillnet-pulse { | |
| 0% { transform: scale(0.8); opacity: 0.8; } | |
| 100% { transform: scale(1.5); opacity: 0; } | |
| } | |
| #phillnet-token-tag { | |
| position: fixed; | |
| padding: 4px 10px; | |
| background: rgba(15, 23, 42, 0.96); | |
| color: #7dd3fc; | |
| font: bold 12px 'Consolas', 'Courier New', monospace; | |
| border: 1px solid rgba(125, 211, 252, 0.4); | |
| border-radius: 6px; | |
| pointer-events: none; | |
| z-index: 2147483647; | |
| white-space: nowrap; | |
| transform: translate(18px, 18px); | |
| box-shadow: 0 4px 12px rgba(0, 0, 0, 0.45); | |
| transition: left 140ms cubic-bezier(0.19, 1, 0.22, 1), top 140ms cubic-bezier(0.19, 1, 0.22, 1); | |
| } | |
| .phillnet-interactive-label { | |
| position: fixed; | |
| background: #00f2ff; | |
| color: #0f172a; | |
| padding: 2px 6px; | |
| border-radius: 4px; | |
| font: bold 10px 'Segoe UI', system-ui, sans-serif; | |
| z-index: 2147483640; | |
| pointer-events: none; | |
| box-shadow: 0 2px 8px rgba(0, 242, 255, 0.5); | |
| transform: translate(-100%, -100%); | |
| opacity: 0.9; | |
| } | |
| .phillnet-component-box { | |
| position: fixed; | |
| border: 1px dashed rgba(0, 242, 255, 0.25); | |
| background: rgba(0, 242, 255, 0.03); | |
| pointer-events: none; | |
| z-index: 2147483630; | |
| } | |
| .phillnet-component-label { | |
| position: absolute; | |
| top: 0; left: 0; | |
| background: rgba(0, 242, 255, 0.8); | |
| color: #0f172a; | |
| padding: 1px 4px; | |
| font: bold 9px monospace; | |
| text-transform: uppercase; | |
| } | |
| #phillnet-reticle { | |
| position: fixed; | |
| width: 60px; height: 60px; | |
| border: 1px dashed rgba(0, 242, 255, 0.4); | |
| border-radius: 50%; | |
| pointer-events: none; | |
| z-index: 2147483647; | |
| transform: translate(-50%, -50%) scale(2); | |
| opacity: 0; | |
| transition: transform 300ms ease-out, opacity 300ms ease; | |
| } | |
| #phillnet-reticle.active { | |
| transform: translate(-50%, -50%) scale(1); | |
| opacity: 1; | |
| } | |
| #phillnet-scanline { | |
| position: fixed; | |
| top: 0; left: 0; width: 100%; height: 2px; | |
| background: linear-gradient(90deg, transparent, rgba(0, 242, 255, 0.8), transparent); | |
| z-index: 2147483645; | |
| pointer-events: none; | |
| opacity: 0; | |
| } | |
| @keyframes phillnet-scan { | |
| 0% { top: 0%; } | |
| 100% { top: 100%; } | |
| } | |
| #phillnet-scanline.scanning { | |
| opacity: 1; | |
| animation: phillnet-scan 2s linear infinite; | |
| } | |
| #phillnet-trail { | |
| position: fixed; | |
| width: 6px; | |
| height: 6px; | |
| border-radius: 50%; | |
| background: rgba(0, 242, 255, 0.3); | |
| pointer-events: none; | |
| transform: translate(-50%, -50%); | |
| z-index: 2147483646; | |
| } | |
| #phillnet-hud { | |
| position: fixed; | |
| top: 16px; | |
| right: 16px; | |
| width: 340px; | |
| background: rgba(10, 15, 25, 0.94); | |
| backdrop-filter: blur(8px); | |
| border: 1px solid rgba(0, 242, 255, 0.25); | |
| border-radius: 12px; | |
| padding: 14px; | |
| color: #7dd3fc; | |
| font: 11px/1.4 'Consolas', monospace; | |
| z-index: 2147483647; | |
| pointer-events: none; | |
| box-shadow: 0 10px 40px rgba(0,0,0,0.6); | |
| display: flex; | |
| flex-direction: column; | |
| gap: 8px; | |
| } | |
| .hud-header { | |
| display: flex; | |
| justify-content: space-between; | |
| align-items: center; | |
| border-bottom: 1px solid rgba(0, 242, 255, 0.1); | |
| padding-bottom: 6px; | |
| } | |
| .hud-title { | |
| font-weight: 800; | |
| letter-spacing: 0.1em; | |
| color: #00f2ff; | |
| text-transform: uppercase; | |
| } | |
| .hud-progress { | |
| height: 2px; | |
| background: rgba(0, 242, 255, 0.1); | |
| flex-grow: 1; | |
| margin: 0 10px; | |
| position: relative; | |
| } | |
| .hud-progress-fill { | |
| position: absolute; | |
| top: 0; left: 0; height: 100%; | |
| background: #00f2ff; | |
| transition: width 0.3s ease; | |
| } | |
| .hud-activity { | |
| max-height: 80px; | |
| overflow: hidden; | |
| display: flex; | |
| flex-direction: column-reverse; | |
| gap: 3px; | |
| } | |
| .activity-item { | |
| color: #94a3b8; | |
| border-left: 2px solid rgba(0, 242, 255, 0.4); | |
| padding-left: 6px; | |
| opacity: 0.8; | |
| animation: ph-fade-in 0.3s ease-out; | |
| } | |
| .activity-item.active { | |
| color: #00f2ff; | |
| opacity: 1; | |
| } | |
| .hud-memory { | |
| background: rgba(0, 242, 255, 0.05); | |
| border-radius: 6px; | |
| padding: 6px; | |
| display: none; | |
| } | |
| .hud-memory.active { display: block; } | |
| .mem-tag { | |
| color: #38bdf8; | |
| font-weight: bold; | |
| margin-right: 4px; | |
| } | |
| @keyframes ph-fade-in { from { opacity: 0; transform: translateX(5px); } to { opacity: 1; transform: translateX(0); } } | |
| #phillnet-keyflash { | |
| position: fixed; | |
| bottom: 24px; | |
| right: 24px; | |
| padding: 12px 18px; | |
| background: rgba(0, 242, 255, 0.9); | |
| color: #0f172a; | |
| font: bold 14px 'Consolas', monospace; | |
| border-radius: 12px; | |
| opacity: 0; | |
| transform: translateY(10px); | |
| transition: opacity 160ms ease, transform 160ms ease; | |
| z-index: 2147483647; | |
| pointer-events: none; | |
| box-shadow: 0 0 20px rgba(0, 242, 255, 0.4); | |
| } | |
| #phillnet-keyflash.show { | |
| opacity: 1; | |
| transform: translateY(0); | |
| } | |
| .phillnet-targeted { | |
| outline: 2px solid #00f2ff !important; | |
| outline-offset: 4px !important; | |
| box-shadow: 0 0 15px rgba(0, 242, 255, 0.4) !important; | |
| scroll-margin: 150px !important; | |
| } | |
| #phillnet-vision-badge { | |
| position: fixed; | |
| bottom: 80px; | |
| right: 24px; | |
| padding: 6px 12px; | |
| background: rgba(15, 23, 42, 0.9); | |
| color: #00f2ff; | |
| font: bold 10px monospace; | |
| border: 1px solid rgba(0, 242, 255, 0.3); | |
| border-radius: 4px; | |
| z-index: 2147483647; | |
| text-transform: uppercase; | |
| opacity: 0; | |
| transition: opacity 300ms ease; | |
| } | |
| #phillnet-vision-badge.active { | |
| opacity: 1; | |
| } | |
| """ | |
| _OVERLAY_SCRIPT = """ | |
| () => { | |
| if (window.__phillnetOverlayInstalled) return; | |
| const style = document.createElement('style'); | |
| style.textContent = `%STYLE%`; | |
| document.head.appendChild(style); | |
| const cursor = document.createElement('div'); | |
| cursor.id = 'phillnet-cursor'; | |
| cursor.innerHTML = '<div id="phillnet-cursor-rings"></div><div id="phillnet-cursor-inner"></div>'; | |
| cursor.style.left = '18px'; | |
| cursor.style.top = '18px'; | |
| const trail = document.createElement('div'); | |
| trail.id = 'phillnet-trail'; | |
| trail.style.left = '18px'; | |
| trail.style.top = '18px'; | |
| const tokenTag = document.createElement('div'); | |
| tokenTag.id = 'phillnet-token-tag'; | |
| tokenTag.textContent = 'PHILLNET'; | |
| const hud = document.createElement('div'); | |
| hud.id = 'phillnet-hud'; | |
| hud.innerHTML = ` | |
| <div class="hud-header"> | |
| <div class="hud-title">PHILLNET NEURAL</div> | |
| <div class="hud-progress"><div class="hud-progress-fill" style="width: 0%"></div></div> | |
| <div id="step-id">STP 0</div> | |
| </div> | |
| <div class="hud-activity" id="activity-log"> | |
| <div class="activity-item active">Initializing Neural Cortex... </div> | |
| </div> | |
| <div class="hud-memory" id="memory-monitor"></div> | |
| `; | |
| const reticle = document.createElement('div'); | |
| reticle.id = 'phillnet-reticle'; | |
| const scanline = document.createElement('div'); | |
| scanline.id = 'phillnet-scanline'; | |
| const keyflash = document.createElement('div'); | |
| keyflash.id = 'phillnet-keyflash'; | |
| const visionBadge = document.createElement('div'); | |
| visionBadge.id = 'phillnet-vision-badge'; | |
| visionBadge.textContent = 'NEURAL VISION ACTIVE'; | |
| document.body.appendChild(trail); | |
| document.body.appendChild(cursor); | |
| document.body.appendChild(tokenTag); | |
| document.body.appendChild(hud); | |
| document.body.appendChild(reticle); | |
| document.body.appendChild(scanline); | |
| document.body.appendChild(keyflash); | |
| document.body.appendChild(visionBadge); | |
| window.__phillnetSetVisionBadge = (active) => { | |
| if (active) visionBadge.classList.add('active'); | |
| else visionBadge.classList.remove('active'); | |
| }; | |
| window.__phillnetMoveCursor = (x, y, label) => { | |
| cursor.style.left = `${x}px`; | |
| cursor.style.top = `${y}px`; | |
| tokenTag.style.left = `${x}px`; | |
| tokenTag.style.top = `${y}px`; | |
| trail.style.left = `${x}px`; | |
| trail.style.top = `${y}px`; | |
| reticle.style.left = `${x}px`; | |
| reticle.style.top = `${y}px`; | |
| if (label) { | |
| tokenTag.textContent = label.toUpperCase(); | |
| const body = hud.querySelector('.ph-body'); | |
| if (body) body.textContent = label; | |
| } | |
| }; | |
| window.__phillnetSetReticle = (active) => { | |
| if (active) { | |
| reticle.classList.add('active'); | |
| cursor.classList.add('locked'); | |
| } else { | |
| reticle.classList.remove('active'); | |
| cursor.classList.remove('locked'); | |
| } | |
| }; | |
| window.__phillnetSetScan = (active) => { | |
| if (active) scanline.classList.add('scanning'); | |
| else scanline.classList.remove('scanning'); | |
| }; | |
| window.__phillnetSetHud = (msg, isMemory = false) => { | |
| if (!msg) return; | |
| if (isMemory) { | |
| const mon = hud.querySelector('#memory-monitor'); | |
| mon.innerHTML = `<span class="mem-tag">MEM:</span> ${msg}`; | |
| mon.classList.add('active'); | |
| setTimeout(() => mon.classList.remove('active'), 2500); | |
| return; | |
| } | |
| const log = hud.querySelector('#activity-log'); | |
| const item = document.createElement('div'); | |
| item.className = 'activity-item active'; | |
| item.textContent = msg; | |
| log.querySelectorAll('.activity-item').forEach(el => el.classList.remove('active')); | |
| log.appendChild(item); | |
| if (log.children.length > 3) log.children[0].remove(); | |
| }; | |
| window.__phillnetUpdateProgress = (current, total) => { | |
| const bar = hud.querySelector('.hud-progress-fill'); | |
| const stp = hud.querySelector('#step-id'); | |
| bar.style.width = `${(current / total) * 100}%`; | |
| stp.textContent = `STP ${current}`; | |
| }; | |
| window.__phillnetMarkTarget = (element, label) => { | |
| try { | |
| document.querySelectorAll('.phillnet-targeted').forEach(node => node.classList.remove('phillnet-targeted')); | |
| if (element) { | |
| element.classList.add('phillnet-targeted'); | |
| if (label) { | |
| const body = hud.querySelector('.ph-body'); | |
| if (body) body.textContent = label; | |
| } | |
| setTimeout(() => { | |
| try { element.classList.remove('phillnet-targeted'); } catch (err) {} | |
| }, 1800); | |
| } | |
| } catch (err) {} | |
| }; | |
| window.__phillnetFlashKey = (text) => { | |
| keyflash.textContent = text || ''; | |
| keyflash.classList.add('show'); | |
| setTimeout(() => keyflash.classList.remove('show'), 850); | |
| }; | |
| window.__phillnetLabelInteractive = () => { | |
| document.querySelectorAll('.phillnet-interactive-label').forEach(el => el.remove()); | |
| const elements = Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]')); | |
| const labeled = []; | |
| elements.forEach((el, idx) => { | |
| const rect = el.getBoundingClientRect(); | |
| if (rect.width > 0 && rect.height > 0 && rect.top >= 0 && rect.top <= window.innerHeight) { | |
| const label = document.createElement('div'); | |
| label.className = 'phillnet-interactive-label'; | |
| label.textContent = idx; | |
| label.style.left = (rect.left + rect.width) + 'px'; | |
| label.style.top = rect.top + 'px'; | |
| document.body.appendChild(label); | |
| labeled.push({ | |
| idx, | |
| tag: el.tagName, | |
| text: (el.innerText || el.value || '').slice(0, 30), | |
| box: {x: rect.left, y: rect.top, w: rect.width, h: rect.height} | |
| }); | |
| } | |
| }); | |
| return labeled; | |
| }; | |
| window.__phillnetScanComponents = () => { | |
| document.querySelectorAll('.phillnet-component-box').forEach(el => el.remove()); | |
| const regions = ['header', 'nav', 'main', 'footer', 'aside', 'section']; | |
| const found = []; | |
| regions.forEach(tag => { | |
| const el = document.querySelector(tag); | |
| if (el) { | |
| const rect = el.getBoundingClientRect(); | |
| if (rect.width > 50 && rect.height > 50) { | |
| const box = document.createElement('div'); | |
| box.className = 'phillnet-component-box'; | |
| box.style.left = rect.left + 'px'; | |
| box.style.top = rect.top + 'px'; | |
| box.style.width = rect.width + 'px'; | |
| box.style.height = rect.height + 'px'; | |
| box.innerHTML = `<div class="phillnet-component-label">${tag}</div>`; | |
| document.body.appendChild(box); | |
| found.push({tag, box: {x: rect.left, y: rect.top, w: rect.width, h: rect.height}}); | |
| } | |
| } | |
| }); | |
| return found; | |
| }; | |
| window.__phillnetCleanPage = () => { | |
| const selectors = [ | |
| '[id*="consent"]', '[class*="consent"]', '[id*="cookie"]', '[class*="cookie"]', | |
| '[id*="modal"]', '[class*="modal"]', '[class*="overlay"]', '[class*="popup"]', | |
| '.ad-banner', '.newsletter-signup' | |
| ]; | |
| let cleaned = 0; | |
| selectors.forEach(sel => { | |
| document.querySelectorAll(sel).forEach(el => { | |
| const rect = el.getBoundingClientRect(); | |
| if (rect.width > 300 || rect.height > 300) { | |
| el.style.display = 'none'; | |
| cleaned++; | |
| } | |
| }); | |
| }); | |
| return cleaned; | |
| }; | |
| window.__phillnetOverlayInstalled = true; | |
| } | |
| """ | |
| def __init__( | |
| self, | |
| *, | |
| headless: bool = False, | |
| timeout_ms: int = 12000, | |
| capture_dir: Optional[str] = None, | |
| enable_ocr: bool = True, | |
| flush_captures_on_init: bool = True, | |
| flush_captures_on_close: bool = True, | |
| capture_keep_last: int = 0, | |
| ): | |
| self.headless = bool(headless) | |
| self.timeout_ms = int(timeout_ms) | |
| self.capture_dir = Path(capture_dir or "outputs/captures") | |
| self.capture_dir.mkdir(parents=True, exist_ok=True) | |
| self.enable_ocr = bool(enable_ocr) | |
| self.flush_captures_on_init = bool(flush_captures_on_init) | |
| self.flush_captures_on_close = bool(flush_captures_on_close) | |
| self.capture_keep_last = max(0, int(capture_keep_last)) | |
| self._pw = None | |
| self._browser = None | |
| self._page = None | |
| self.history: List[str] = [] | |
| self.working_memory: Dict[str, str] = {} | |
| if self.flush_captures_on_init: | |
| self._cleanup_captures() | |
| def _cleanup_captures(self): | |
| try: | |
| files = sorted( | |
| [p for p in self.capture_dir.glob("*.png") if p.is_file()], | |
| key=lambda p: p.stat().st_mtime, | |
| reverse=True, | |
| ) | |
| keep = self.capture_keep_last | |
| for stale in files[keep:]: | |
| try: | |
| stale.unlink(missing_ok=True) | |
| except Exception: | |
| pass | |
| except Exception: | |
| pass | |
def _ensure_page(self):
    """Return the shared page, launching Chromium on first use.

    On first call this starts Playwright, launches Chromium, creates a
    context with a desktop user agent and a 1440x960 viewport, hides
    ``navigator.webdriver`` and registers the overlay init script.

    Returns:
        The Playwright page object cached on ``self._page``.

    Raises:
        Exception: Whatever Playwright raises during launch; the partially
            started browser/driver is torn down before re-raising so that a
            later call can start from scratch.
    """
    if self._page is not None:
        return self._page
    print(f" [!] Launching Stealth Browser (Headless={self.headless})...")

    # Init scripts run before the page's own scripts, when <head>/<body> may
    # not exist yet, so installation is deferred until the DOM is ready and
    # the observer watches the root element instead of ``document.body``.
    # The CSS is embedded as a JSON string literal so backticks, backslashes
    # or ``${`` inside the stylesheet cannot break the script.
    overlay_template = """
(function() {
    const styleText = __PHILLNET_STYLE_JSON__;
    let styleEl = null;
    const installOverlay = () => {
        const styleRoot = document.head || document.documentElement;
        if (styleRoot && (!styleEl || !styleEl.isConnected)) {
            styleEl = document.createElement('style');
            styleEl.textContent = styleText;
            styleRoot.appendChild(styleEl);
        }
        if (!document.body) return;
        if (!document.getElementById('phillnet-cursor')) {
            const cursor = document.createElement('div');
            cursor.id = 'phillnet-cursor';
            cursor.innerHTML = '<div id="phillnet-cursor-inner"></div>';
            document.body.appendChild(cursor);
        }
        if (!document.getElementById('phillnet-hud')) {
            const hud = document.createElement('div');
            hud.id = 'phillnet-hud';
            hud.innerHTML = '<div class="hud-header"><div class="hud-title">PHILLNET NEURAL</div></div>';
            document.body.appendChild(hud);
        }
    };
    const start = () => {
        installOverlay();
        const observer = new MutationObserver(installOverlay);
        observer.observe(document.documentElement, { childList: true, subtree: true });
    };
    if (document.documentElement && document.body) {
        start();
    } else {
        document.addEventListener('DOMContentLoaded', start, { once: true });
    }
    document.addEventListener('mousemove', (e) => {
        const cursor = document.getElementById('phillnet-cursor');
        if (cursor) {
            cursor.style.left = e.clientX + 'px';
            cursor.style.top = e.clientY + 'px';
        }
    });
})();
"""
    # ``json`` is already used elsewhere in this file (``json.loads``).
    overlay_script = overlay_template.replace(
        "__PHILLNET_STYLE_JSON__", json.dumps(str(self._OVERLAY_STYLE))
    )

    self._pw = sync_playwright().start()
    try:
        self._browser = self._pw.chromium.launch(headless=self.headless)
        # Use a realistic user agent
        self._context = self._browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            viewport={"width": 1440, "height": 960},
        )
        page = self._context.new_page()
        # Bypass bot detection: hide webdriver
        page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        page.set_default_timeout(self.timeout_ms)
        page.add_init_script(overlay_script)
    except Exception:
        # Tear down whatever started; a leaked driver would block the next
        # launch attempt. Capture flushing is deliberately not triggered here.
        if self._browser is not None:
            try:
                self._browser.close()
            except Exception:
                pass
        try:
            self._pw.stop()
        except Exception:
            pass
        self._browser = None
        self._context = None
        self._pw = None
        raise
    self._page = page
    return self._page
def close(self):
    """Shut down the browser and Playwright driver, then optionally prune captures.

    Safe to call repeatedly and on an instance whose constructor did not
    finish (the finalizer calls this too): missing attributes are treated as
    "nothing to close". Shutdown errors are swallowed on purpose so cleanup
    always runs to the end.
    """
    browser = getattr(self, "_browser", None)
    if browser is not None:
        try:
            browser.close()
        except Exception:
            pass
    driver = getattr(self, "_pw", None)
    if driver is not None:
        try:
            driver.stop()
        except Exception:
            pass
    self._browser = None
    self._pw = None
    # The context belongs to the browser that was just closed; drop the
    # stale reference together with the page.
    self._context = None
    self._page = None
    if getattr(self, "flush_captures_on_close", False):
        self._cleanup_captures()
| def __del__(self): | |
| self.close() | |
| def _inject_overlay(self, page, label: str = "Phillnet browser mode active"): | |
| try: | |
| # Note: add_init_script handles the logic injection now, | |
| # we just update the HUD label dynamically here. | |
| page.evaluate("(label) => window.__phillnetSetHud && window.__phillnetSetHud(label)", label) | |
| except Exception: | |
| pass | |
| def _move_cursor_to_locator(self, page, locator, label: str): | |
| try: | |
| bbox = locator.bounding_box() | |
| if bbox: | |
| x = bbox["x"] + (bbox["width"] / 2.0) | |
| y = bbox["y"] + (bbox["height"] / 2.0) | |
| # REFLEX: Skip steps if in high-speed mode | |
| move_steps = 1 if getattr(self, "reflex_mode", False) else 18 | |
| page.mouse.move(x, y, steps=move_steps) | |
| if not getattr(self, "reflex_mode", False): | |
| page.evaluate( | |
| "(payload) => window.__phillnetMoveCursor && window.__phillnetMoveCursor(payload.x, payload.y, payload.label)", | |
| {"x": x, "y": y, "label": label}, | |
| ) | |
| page.wait_for_timeout(120) | |
| except Exception: | |
| pass | |
| def _mark_locator(self, page, locator, label: str): | |
| if getattr(self, "reflex_mode", False): return | |
| try: | |
| locator.evaluate( | |
| "(element, label) => window.__phillnetMarkTarget && window.__phillnetMarkTarget(element, label)", | |
| label, | |
| ) | |
| page.wait_for_timeout(90) | |
| except Exception: | |
| pass | |
| def _move_cursor_to_point(self, page, x: float, y: float, label: str): | |
| try: | |
| move_steps = 1 if getattr(self, "reflex_mode", False) else 20 | |
| page.mouse.move(x, y, steps=move_steps) | |
| if not getattr(self, "reflex_mode", False): | |
| page.evaluate( | |
| "(payload) => window.__phillnetMoveCursor && window.__phillnetMoveCursor(payload.x, payload.y, payload.label)", | |
| {"x": x, "y": y, "label": label}, | |
| ) | |
| page.wait_for_timeout(120) | |
| except Exception: | |
| pass | |
def enable_reflex_mode(self, enabled: bool = True):
    """Switch between the low-latency path and the fully animated path."""
    self.reflex_mode = bool(enabled)
    if self.reflex_mode:
        mode_str = "REFLEX (Low Latency)"
    else:
        mode_str = "Standard (Rich Visuals)"
    print("[*] Browser Control Mode: " + mode_str)
def key_down(self, key: str):
    """Hold ``key`` down on the active page's keyboard."""
    self._ensure_page().keyboard.down(key)
def key_up(self, key: str):
    """Release ``key`` on the active page's keyboard."""
    self._ensure_page().keyboard.up(key)
def mouse_move(self, x: float, y: float):
    """Move the mouse straight to ``(x, y)`` without overlay effects."""
    self._ensure_page().mouse.move(x, y)
def mouse_down(self):
    """Press the mouse button at the current pointer position."""
    self._ensure_page().mouse.down()
def mouse_up(self):
    """Release the mouse button at the current pointer position."""
    self._ensure_page().mouse.up()
def _candidate_locators_for_target(self, page, target: str):
    """Rank clickable elements that plausibly match ``target``.

    Several lookup strategies are tried (button role, link role, text match,
    anchors/buttons containing the text and, for multi-word targets, anchors/
    buttons containing the first word). At most six matches per strategy are
    inspected. Each match gets the base score from ``_score_text_target``
    plus bonuses/penalties for its tag, role, input type and disabled state.

    Returns:
        ``(score, strategy_name, locator)`` tuples with a positive score,
        highest score first.
    """

    def attribute(element, attr_name):
        # Attribute reads are best-effort; a failure counts as "not set".
        try:
            return str(element.get_attribute(attr_name) or "")
        except Exception:
            return ""

    clickable = "a,button,[role='button']"
    strategies = [
        ("role_button", page.get_by_role("button", name=target, exact=False)),
        ("role_link", page.get_by_role("link", name=target, exact=False)),
        ("text", page.get_by_text(target, exact=False)),
        ("anchor_button", page.locator(clickable).filter(has_text=target)),
    ]
    raw_target = target or ""
    leading_word = raw_target.split(" ", 1)[0].strip()
    if leading_word and leading_word.lower() != raw_target.lower():
        strategies.append(("first_word", page.locator(clickable).filter(has_text=leading_word)))

    ranked = []
    for strategy, matches in strategies:
        try:
            limit = min(matches.count(), 6)
        except Exception:
            limit = 0
        for position in range(limit):
            try:
                element = matches.nth(position)
                visible_text = _compact_text(element.inner_text(), 180)
                link = attribute(element, "href")
                try:
                    tag_name = str(element.evaluate("(el) => (el.tagName || '').toLowerCase()") or "")
                except Exception:
                    tag_name = ""
                aria_role = attribute(element, "role")
                label_for = attribute(element, "for")
                control_type = attribute(element, "type")
                try:
                    is_disabled = bool(element.is_disabled())
                except Exception:
                    is_disabled = False

                relevance = _score_text_target(target, visible_text, link)
                # Applied one by one, in this order, on top of the text score.
                adjustments = (
                    (tag_name in {"a", "button"}, 2.0),
                    (aria_role in {"button", "link", "menuitem", "tab"}, 1.5),
                    (tag_name in {"input", "textarea", "select"}, 2.5),
                    (control_type in {"submit", "button", "search"}, 1.0),
                    # A <label for=...> only points at the real control.
                    (tag_name == "label" and bool(label_for), -2.0),
                    (is_disabled, -4.0),
                )
                for applies, delta in adjustments:
                    if applies:
                        relevance += delta
                if relevance > 0:
                    ranked.append((relevance, strategy, element))
            except Exception:
                continue
    ranked.sort(key=lambda entry: entry[0], reverse=True)
    return ranked
def _candidate_inputs_for_prompt(self, page, target: str):
    """Rank text-entry elements by how well their attributes match ``target``.

    The first eight inputs/textareas/contenteditables/textboxes are scored by
    ``_score_text_target`` against their placeholder, name, aria-label, id and
    type. Fields that do not match but look like a search box get a fallback
    score of 2.5; disabled fields are penalised by 4.0.

    Returns:
        ``(score, locator)`` pairs with a positive score, best first.
    """
    wanted = _normalize_label(target)
    fields = page.locator("input,textarea,[contenteditable='true'],[role='textbox']")
    try:
        limit = min(fields.count(), 8)
    except Exception:
        limit = 0

    ranked = []
    for position in range(limit):
        try:
            field = fields.nth(position)
            described = []
            for attr_name in ("placeholder", "name", "aria-label", "id", "type"):
                try:
                    raw = str(field.get_attribute(attr_name) or "").strip()
                except Exception:
                    raw = ""
                if raw:
                    described.append(raw)
            description = " ".join(described)

            relevance = _score_text_target(wanted, description, "")
            if relevance <= 0:
                lowered = description.lower()
                if "search" in lowered or "query" in lowered or "find" in lowered:
                    relevance = 2.5
            try:
                if field.is_disabled():
                    relevance -= 4.0
            except Exception:
                pass
            if relevance > 0:
                ranked.append((relevance, field))
        except Exception:
            continue
    ranked.sort(key=lambda entry: entry[0], reverse=True)
    return ranked
| def _iter_click_candidates(self, page, selector: Optional[str], text_target: Optional[str]): | |
| seen = set() | |
| if selector: | |
| yield page.locator(selector).first | |
| return | |
| text_target = (text_target or "").strip() | |
| if not text_target: | |
| return | |
| lowered = text_target.lower() | |
| if any(token in lowered for token in ("textbox", "input", "field", "search box", "search field", "combobox")): | |
| for _, item in self._candidate_inputs_for_prompt(page, text_target): | |
| try: | |
| key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.name || '')") | |
| except Exception: | |
| key = None | |
| if key and key in seen: | |
| continue | |
| if key: | |
| seen.add(key) | |
| yield item | |
| for _, _, item in self._candidate_locators_for_target(page, text_target): | |
| try: | |
| key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.innerText || el.value || '').slice(0,120)") | |
| except Exception: | |
| key = None | |
| if key and key in seen: | |
| continue | |
| if key: | |
| seen.add(key) | |
| yield item | |
| for _, item in self._candidate_inputs_for_prompt(page, text_target): | |
| try: | |
| key = item.evaluate("(el) => (el.tagName || '') + '|' + (el.id || '') + '|' + (el.name || '')") | |
| except Exception: | |
| key = None | |
| if key and key in seen: | |
| continue | |
| if key: | |
| seen.add(key) | |
| yield item | |
| yield page.locator("a,button,input,textarea,[role='button'],[role='link'],[role='textbox']").first | |
| def _click_with_fallback(self, page, candidates, label: str): | |
| errors: List[str] = [] | |
| for candidate in candidates: | |
| try: | |
| target_locator = candidate.first if hasattr(candidate, "first") else candidate | |
| target_locator.scroll_into_view_if_needed(timeout=self.timeout_ms) | |
| self._mark_locator(page, target_locator, label) | |
| self._move_cursor_to_locator(page, target_locator, label) | |
| page.wait_for_timeout(80) | |
| target_locator.click(timeout=self.timeout_ms) | |
| page.wait_for_timeout(1000) | |
| return | |
| except Exception as exc: | |
| errors.append(str(exc)) | |
| continue | |
| raise RuntimeError("; ".join(errors[:3]) or "no clickable candidate succeeded") | |
| def _flash_key(self, page, key_label: str): | |
| try: | |
| page.evaluate("(text) => window.__phillnetFlashKey && window.__phillnetFlashKey(text)", key_label) | |
| except Exception: | |
| pass | |
def _search_results_from_page(self, page, max_results: int = 5, max_snippet_chars: int = 280) -> List[Dict[str, str]]:
    """Extract organic results from the search-results page currently loaded.

    Title anchors are looked up with a list of engine-specific selectors
    (first one that matches wins); if none matches, every ``a[href]`` is
    considered. Links that are not absolute http(s) URLs, that point at
    yahoo.com, or that lead back to a search engine are dropped.

    Args:
        page: Playwright page showing the results.
        max_results: Maximum number of results to return.
        max_snippet_chars: Length limit passed to ``_compact_text`` for snippets.

    Returns:
        A list of ``{"title", "url", "snippet"}`` dicts; ``snippet`` is ``""``
        when none could be found.
    """
    results: List[Dict[str, str]] = []
    title_locators = []
    for selector in (
        "ol.searchCenterMiddle li div.compTitle h3 a",
        "div#web h3 a",
        "a.result__a",
        "a[data-testid='result-title-a']",
    ):
        locator = page.locator(selector)
        if locator.count() > 0:
            title_locators = locator.all()[: max_results * 10]
            break
    if not title_locators:
        title_locators = page.locator("a[href]").all()[: max_results * 20]

    blocked_fragments = (
        "search.yahoo.com/search",
        "duckduckgo.com",
        "bing.com/search",
    )
    # Nearest enclosing result container, most specific first. These used to
    # be one union expression with the ``xpath=`` engine prefix repeated inside
    # it, which is not a valid XPath node-set expression, so the lookup always
    # failed and every snippet came back empty.
    container_xpaths = (
        "xpath=ancestor::li[1]",
        "xpath=ancestor::*[contains(@class,'result')][1]",
        "xpath=ancestor::*[contains(@class,'algo')][1]",
    )
    snippet_selector = ".result__snippet, [data-result='snippet'], div.compText p, p.fc-dustygray, p"

    for anchor in title_locators:
        try:
            href = _unwrap_yahoo_redirect((anchor.get_attribute("href") or "").strip())
            title = _compact_text(anchor.inner_text(), 180)
            if not href or not title:
                continue
            # Also rejects relative links and ``javascript:`` pseudo-URLs.
            if not href.startswith("http"):
                continue
            if urlparse(href).netloc.endswith("yahoo.com"):
                continue
            if any(fragment in href for fragment in blocked_fragments):
                continue

            snippet = ""
            try:
                for container_xpath in container_xpaths:
                    container = anchor.locator(container_xpath)
                    if container.count() == 0:
                        continue
                    snippet_locator = container.first.locator(snippet_selector)
                    if snippet_locator.count() > 0:
                        snippet = _compact_text(snippet_locator.first.inner_text(), max_snippet_chars)
                        break
            except Exception:
                # Snippets are optional; keep the result without one.
                snippet = ""

            results.append({"title": title, "url": href, "snippet": snippet})
            if len(results) >= max_results:
                break
        except Exception:
            continue
    return results
| def _capture_accessibility_tree(self, page) -> Any: | |
| try: | |
| snapshot = page.locator("body").aria_snapshot() | |
| if snapshot: | |
| return snapshot | |
| except Exception: | |
| pass | |
| try: | |
| return page.evaluate( | |
| """ | |
| () => { | |
| const nodes = Array.from(document.querySelectorAll('a,button,input,textarea,select,[role],[aria-label],h1,h2,h3,h4,h5,h6')).slice(0, 120); | |
| return nodes.map((el, idx) => ({ | |
| index: idx, | |
| tag: el.tagName.toLowerCase(), | |
| role: el.getAttribute('role') || '', | |
| ariaLabel: el.getAttribute('aria-label') || '', | |
| text: (el.innerText || el.value || '').trim().slice(0, 160), | |
| name: (el.getAttribute('name') || '').slice(0, 80), | |
| type: (el.getAttribute('type') || '').slice(0, 40), | |
| href: (el.getAttribute('href') || '').slice(0, 220), | |
| })); | |
| } | |
| """ | |
| ) | |
| except Exception: | |
| return [] | |
| def _capture_page_analysis(self, page) -> Dict[str, Any]: | |
| try: | |
| return page.evaluate( | |
| """ | |
| () => { | |
| const pick = (selector, mapper, limit=24) => | |
| Array.from(document.querySelectorAll(selector)).slice(0, limit).map(mapper); | |
| return { | |
| headings: pick('h1,h2,h3,h4', el => (el.innerText || '').trim()).filter(Boolean), | |
| links: pick('a[href]', el => ({ | |
| text: (el.innerText || '').trim().slice(0, 120), | |
| href: (el.href || '').slice(0, 220) | |
| })).filter(x => x.text || x.href), | |
| buttons: pick('button,input[type=button],input[type=submit],[role=button]', el => ({ | |
| text: (el.innerText || el.value || el.getAttribute('aria-label') || '').trim().slice(0, 120), | |
| })).filter(x => x.text), | |
| forms: pick('form', el => ({ | |
| action: (el.getAttribute('action') || '').slice(0, 200), | |
| method: (el.getAttribute('method') || 'get').toLowerCase(), | |
| }), 10), | |
| inputs: pick('input,textarea,select', el => ({ | |
| type: (el.getAttribute('type') || el.tagName.toLowerCase()).slice(0, 60), | |
| name: (el.getAttribute('name') || '').slice(0, 120), | |
| placeholder: (el.getAttribute('placeholder') || '').slice(0, 120), | |
| ariaLabel: (el.getAttribute('aria-label') || '').slice(0, 120), | |
| }), 24), | |
| landmarks: pick('header,nav,main,aside,footer,section,[role]', el => ({ | |
| tag: el.tagName.toLowerCase(), | |
| role: (el.getAttribute('role') || '').slice(0, 60), | |
| label: (el.getAttribute('aria-label') || '').slice(0, 120), | |
| }), 24), | |
| }; | |
| } | |
| """ | |
| ) | |
| except Exception: | |
| return {} | |
| def _capture_screenshot(self, page, prefix: str = "browser") -> str: | |
| path = self.capture_dir / f"{prefix}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}.png" | |
| page.screenshot(path=str(path), full_page=True) | |
| return str(path) | |
| def _run_vision_node(self, image_path: str) -> Dict[str, Any]: | |
| """Breakthrough Vision Node: Extracts deep visual forms from pixels.""" | |
| if not self.enable_ocr: | |
| return {"error": "vision nodes disabled"} | |
| # Try advanced vision first, fallback to basic OCR | |
| script_path = Path(__file__).with_name("browser_vision.js") | |
| if not script_path.exists(): | |
| script_path = Path(__file__).with_name("browser_ocr.js") | |
| if not script_path.exists(): | |
| return {"error": f"vision script missing: {script_path}"} | |
| try: | |
| proc = subprocess.run( | |
| ["node", str(script_path), image_path], | |
| capture_output=True, | |
| text=True, | |
| encoding="utf-8", | |
| errors="replace", | |
| check=True, | |
| timeout=150, | |
| ) | |
| stdout = (proc.stdout or "").strip() | |
| if stdout: | |
| return json.loads(stdout) | |
| return {"error": "vision node produced no output"} | |
| except Exception as exc: | |
| return {"error": str(exc)} | |
| def _compare_visuals(self, path_a: str, path_b: str) -> Dict[str, Any]: | |
| """Compares two screenshots to detect visual state changes (Visual Latch).""" | |
| # Placeholder for real pixel comparison if needed, | |
| # for now we use file existence and basic metadata | |
| return { | |
| "state_changed": path_a != path_b, | |
| "verification": "Visual state transitioned" if path_a != path_b else "No visual change detected" | |
| } | |
def _page_payload(self, page, *, action: str, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Build the observation dict returned by the public browser actions.

    Args:
        page: Playwright page to observe.
        action: Name of the action; stored in the payload and used as the
            screenshot file prefix.
        max_chars: Length limit passed to ``_compact_text`` for the body text.
        include_ocr: When true, run the vision node on the screenshot and
            attach its output as ``vision_node_data`` (plus the legacy ``ocr``
            key when present).

    Returns:
        A dict with the action, URL, title, compacted text, summary,
        accessibility tree, page analysis, overlay label/component data,
        recent history, working memory, screenshot path and agentic context.
    """
    text = _compact_text(page.locator("body").inner_text(), max_chars)
    try:
        # Ask the overlay (if installed) to draw element labels so they show
        # up in the screenshot, then yield briefly for the DOM to settle.
        page.evaluate("window.__phillnetLabelInteractive && window.__phillnetLabelInteractive()")
        page.wait_for_timeout(20)
    except Exception:
        pass
    screenshot_path = self._capture_screenshot(page, prefix=action)

    def read_list(expression):
        # Overlay hooks are optional: a missing hook evaluates to undefined
        # (None here). Each read is isolated so one failing hook does not
        # suppress the other, and the result is always a list.
        try:
            return page.evaluate(expression) or []
        except Exception:
            return []

    labeled_elements = read_list("window.__phillnetGetLastLabels && window.__phillnetGetLastLabels()")
    semantic_components = read_list("window.__phillnetScanComponents && window.__phillnetScanComponents()")

    title = page.title()  # fetched once; used for both "title" and "summary"
    payload = {
        "action": action,
        "url": page.url,
        "title": title,
        "text": text,
        "summary": f"{title} | {text}",
        "accessibility_tree": self._capture_accessibility_tree(page),
        "page_analysis": self._capture_page_analysis(page),
        "labeled_elements": labeled_elements,
        "semantic_components": semantic_components,
        "history": self.history[-5:],
        "working_memory": self.working_memory,
        "screenshot_path": screenshot_path,
    }
    if include_ocr:
        vision_data = self._run_vision_node(screenshot_path)
        payload["vision_node_data"] = vision_data
        # Legacy compatibility: mirror the OCR section at the top level. The
        # type check avoids indexing a list/str result with a string key.
        if isinstance(vision_data, dict) and "ocr" in vision_data:
            payload["ocr"] = vision_data["ocr"]
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def search(
    self,
    query: str,
    *,
    max_results: int = 5,
    max_snippet_chars: int = 280,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Run a web search, falling back across engines until one yields results.

    Engines are tried in order: Google, Yahoo, DuckDuckGo (HTML version).

    Args:
        query: Search terms; blank or ``None`` returns an empty result
            without touching the browser.
        max_results: Maximum number of results to extract.
        max_snippet_chars: Snippet length limit; four times this value is
            used as the page-text limit of the payload.
        include_ocr: Forwarded to ``_page_payload``.

    Returns:
        The page payload of the last page visited, extended with ``query``,
        ``search_engines_tried``, ``search_engines_blocked`` (engines that
        showed a bot challenge), ``results`` and a ``summary`` listing the
        results (or the page summary when there are none).
    """
    query = (query or "").strip()
    if not query:
        return {"action": "search", "query": query, "results": [], "summary": ""}
    page = self._ensure_page()
    encoded = quote_plus(query)
    engines = (
        ("google", f"https://www.google.com/search?q={encoded}"),
        ("yahoo", f"https://search.yahoo.com/search?p={encoded}"),
        ("duckduckgo_html", f"https://html.duckduckgo.com/html/?q={encoded}"),
    )
    challenge_markers = ("confirm you're not a robot", "complete the following challenge", "captcha")
    tried = []
    blocked = []
    results: List[Dict[str, str]] = []
    for engine_name, url in engines:
        tried.append(engine_name)
        try:
            page.goto(url, wait_until="domcontentloaded")
            page.wait_for_timeout(1400)
            self._inject_overlay(page, f"Phillnet browser_mode: {engine_name} search '{query}'")
            results = self._search_results_from_page(page, max_results=max_results, max_snippet_chars=max_snippet_chars)
        except Exception:
            continue
        # Stop as soon as there are results: reading the body text first
        # could fail and throw away results that were already extracted.
        if results:
            break
        try:
            body_text = page.locator("body").inner_text().lower()
        except Exception:
            continue
        if any(marker in body_text for marker in challenge_markers):
            blocked.append(engine_name)
    payload = self._page_payload(page, action="search", max_chars=max_snippet_chars * 4, include_ocr=include_ocr)
    payload.update(
        {
            "query": query,
            "search_engines_tried": tried,
            "search_engines_blocked": blocked,
            "results": results,
            "summary": "\n".join(
                f"- {item['title']} | {item['snippet']} | {item['url']}" for item in results
            ) or payload["summary"],
        }
    )
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def open(self, url: str, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Navigate to ``url`` (whitespace-trimmed) and return the resulting page payload."""
    page = self._ensure_page()
    destination = (url or "").strip()
    page.goto(destination, wait_until="domcontentloaded")
    page.wait_for_timeout(800)
    # The HUD shows the URL the browser actually ended up on.
    hud_label = f"Phillnet browser_mode: open {page.url}"
    self._inject_overlay(page, hud_label)
    payload = self._page_payload(page, action="open", max_chars=max_chars, include_ocr=include_ocr)
    return payload
def click(
    self,
    url: str = "",
    *,
    selector: Optional[str] = None,
    text_target: Optional[str] = None,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Click an element chosen by CSS selector or by visible text.

    Args:
        url: Optional page to open before clicking.
        selector: Selector of the element to click; takes precedence over
            ``text_target``.
        text_target: Text used to look up click candidates when no selector
            is given.
        max_chars: Forwarded to ``_page_payload``.
        include_ocr: Forwarded to ``_page_payload``.

    Returns:
        The page payload after the click, extended with ``text_target`` and/or
        ``selector`` when they were provided.

    Raises:
        ValueError: If neither ``selector`` nor ``text_target`` is given
            (checked before any navigation happens).
        RuntimeError: If no candidate could be clicked.
    """
    if not selector and not text_target:
        raise ValueError("selector or text_target is required for click")
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, "Phillnet browser_mode: click")
    if selector:
        candidates = [page.locator(selector).first]
    else:
        candidates = list(self._iter_click_candidates(page, selector=None, text_target=text_target))
    click_label = f"Phillnet clicking {text_target or selector or 'target'}"
    reticle_js = "(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)"
    # CYBER: Activate Reticle Lock-On
    try:
        page.evaluate(reticle_js, True)
        page.wait_for_timeout(300)
    except Exception:
        pass
    try:
        self._click_with_fallback(page, candidates, click_label)
    finally:
        # Always switch the reticle off again, including when the click
        # fails, so the overlay does not stay stuck in lock-on state.
        try:
            page.evaluate(reticle_js, False)
        except Exception:
            pass
    payload = self._page_payload(page, action="click", max_chars=max_chars, include_ocr=include_ocr)
    if text_target:
        payload["text_target"] = text_target
    if selector:
        payload["selector"] = selector
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def click_label(
    self,
    label_idx: int,
    *,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Click the interactive element with the given overlay label index.

    The element is found by re-running the interactive-element query and
    taking the item at ``label_idx``.
    NOTE(review): this assumes the overlay's labelling routine numbers
    elements with the same query and order — confirm against that script.

    Args:
        label_idx: Zero-based index into the interactive-element list.
        max_chars: Forwarded to ``_page_payload``.
        include_ocr: Forwarded to ``_page_payload``.

    Returns:
        The page payload after the click.

    Raises:
        RuntimeError: If the element is missing or the click fails; the
            original exception is attached as ``__cause__``.
    """
    page = self._ensure_page()
    self._inject_overlay(page, f"Phillnet browser_mode: click label [{label_idx}]")
    reticle_js = "(active) => window.__phillnetSetReticle && window.__phillnetSetReticle(active)"
    try:
        handle = page.evaluate_handle(
            "(idx) => Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'))[idx]",
            label_idx
        )
        element = handle.as_element()
        if not element:
            raise ValueError(f"Label index {label_idx} not found on page")
        box = element.bounding_box()
        if box:
            self._move_cursor_to_point(page, box["x"] + box["width"]/2, box["y"] + box["height"]/2, f"Targeting Label [{label_idx}]")
        # CYBER: Activate Reticle Lock-On
        page.evaluate(reticle_js, True)
        page.wait_for_timeout(200)
        try:
            element.click()
        finally:
            # The click may navigate and invalidate the page context; a
            # failed reticle reset must neither be reported as a failed
            # click nor leave the reticle on after a failed one.
            try:
                page.evaluate(reticle_js, False)
            except Exception:
                pass
        page.wait_for_timeout(800)
    except Exception as e:
        raise RuntimeError(f"Failed to click label {label_idx}: {e}") from e
    return self._page_payload(page, action="click_label", max_chars=max_chars, include_ocr=include_ocr)
def hover_label(
    self,
    label_idx: int,
    *,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Hover over the interactive element at position ``label_idx``.

    The element is looked up by index in the page's list of links, buttons
    and form controls.

    Raises:
        RuntimeError: If the element is missing or hovering fails.
    """
    page = self._ensure_page()
    self._inject_overlay(page, f"Phillnet browser_mode: hover label [{label_idx}]")
    lookup_js = "(idx) => Array.from(document.querySelectorAll('a, button, input, select, textarea, [role=button], [role=link]'))[idx]"
    try:
        target = page.evaluate_handle(lookup_js, label_idx).as_element()
        if not target:
            raise ValueError(f"Label index {label_idx} not found")
        rect = target.bounding_box()
        if rect:
            centre_x = rect["x"] + rect["width"]/2
            centre_y = rect["y"] + rect["height"]/2
            self._move_cursor_to_point(page, centre_x, centre_y, f"Hovering Label [{label_idx}]")
        target.hover()
        page.wait_for_timeout(400)
    except Exception as e:
        raise RuntimeError(f"Hover failed: {e}")
    return self._page_payload(page, action="hover_label", max_chars=max_chars, include_ocr=include_ocr)
def clean_page(self, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Hide large consent/modal/overlay elements via the page-side cleaner, then report the page.

    A failing or missing cleaner is ignored; the payload is returned either way.
    """
    page = self._ensure_page()
    self._inject_overlay(page, "Phillnet browser_mode: cleaning frictions")
    try:
        removed = page.evaluate("window.__phillnetCleanPage()")
    except Exception:
        removed = None
    else:
        try:
            self._flash_key(page, f"CLEANED {removed} OVERLAYS")
            page.wait_for_timeout(600)
        except Exception:
            pass
    return self._page_payload(page, action="clean", max_chars=max_chars, include_ocr=include_ocr)
def type_text(
    self,
    *,
    url: str = "",
    selector: Optional[str] = None,
    text: str = "",
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Type ``text`` into an input field and return the page payload.

    Field choice: the first match of ``selector`` if given; otherwise the
    best-ranked input for ``text`` (when non-empty); otherwise the first
    generic text-entry element. The field is clicked, cleared, and the text
    is typed with a 25 ms per-key delay.
    """
    generic_inputs = "input,textarea,[contenteditable='true'],[role='textbox']"
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, "Phillnet browser_mode: type")

    if selector:
        field = page.locator(selector).first
    else:
        ranked = self._candidate_inputs_for_prompt(page, text) if text else []
        field = ranked[0][1] if ranked else page.locator(generic_inputs).first

    try:
        field.scroll_into_view_if_needed(timeout=self.timeout_ms)
    except Exception:
        pass
    self._mark_locator(page, field, "Phillnet targeting input")
    self._move_cursor_to_locator(page, field, "Phillnet focusing input")
    field.click()
    # The HUD preview is capped at 32 characters.
    self._flash_key(page, f"TYPE {text[:32]}")
    field.fill("")
    field.type(text, delay=25)
    page.wait_for_timeout(500)
    return self._page_payload(page, action="type", max_chars=max_chars, include_ocr=include_ocr)
def press_key(
    self,
    *,
    url: str = "",
    key: str = "Enter",
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Press a single key (optionally after opening ``url``) and return the page payload."""
    page = self._ensure_page()
    if url:
        destination = url.strip()
        page.goto(destination, wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    hud_label = f"Phillnet browser_mode: key {key}"
    self._inject_overlay(page, hud_label)
    self._flash_key(page, f"KEY {key}")
    page.keyboard.press(key)
    # Give the page a moment to react before it is captured.
    page.wait_for_timeout(700)
    payload = self._page_payload(page, action="press", max_chars=max_chars, include_ocr=include_ocr)
    return payload
def scroll(
    self,
    *,
    url: str = "",
    direction: str = "down",
    amount: int = 900,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Scroll the page with the mouse wheel in fifteen small steps.

    ``amount`` is clamped to at least 120 pixels; a ``direction`` of "up"
    (case/whitespace-insensitive) scrolls up, anything else scrolls down.
    The payload additionally carries ``scroll_direction`` and ``scroll_amount``.
    """
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, f"Phillnet browser_mode: scroll {direction}")

    amount = max(120, int(amount))
    going_up = str(direction).lower().strip() == "up"
    if going_up:
        signed_amount = -amount
    else:
        signed_amount = amount

    # Park the cursor slightly right of and below the viewport centre.
    try:
        viewport = page.viewport_size or {"width": 1280, "height": 900}
        anchor_x = max(120, int(viewport.get("width", 1280) * 0.56))
        anchor_y = max(120, int(viewport.get("height", 900) * 0.58))
    except Exception:
        anchor_x, anchor_y = 720, 480
    self._move_cursor_to_point(page, anchor_x, anchor_y, f"Scrolling {direction}")

    # CYBER: Kinetic Smooth Glide
    glide_steps = 15
    per_step = signed_amount / glide_steps
    remaining = glide_steps
    while remaining > 0:
        page.mouse.wheel(0, per_step)
        page.wait_for_timeout(40)
        remaining -= 1
    page.wait_for_timeout(400)

    payload = self._page_payload(page, action="scroll", max_chars=max_chars, include_ocr=include_ocr)
    payload["scroll_direction"] = str(direction)
    payload["scroll_amount"] = int(amount)
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def scroll_to_text(
    self,
    *,
    url: str = "",
    text_target: str = "",
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Scroll the first element containing ``text_target`` into view.

    If the full text is not found, the first word of the target is tried.
    When nothing matches, the page is left where it is and the payload is
    still returned.

    Args:
        url: Optional page to open first.
        text_target: Text to look for (required, whitespace-trimmed).
        max_chars: Forwarded to ``_page_payload``.
        include_ocr: Forwarded to ``_page_payload``.

    Returns:
        The page payload, extended with ``text_target``.

    Raises:
        ValueError: If ``text_target`` is blank (checked before navigating).
    """
    target = (text_target or "").strip()
    if not target:
        raise ValueError("text_target is required for scroll_to_text")
    page = self._ensure_page()
    if url:
        page.goto(url.strip(), wait_until="domcontentloaded")
        page.wait_for_timeout(800)
    self._inject_overlay(page, "Phillnet browser_mode: scroll to text")
    locator = page.get_by_text(target, exact=False)
    if locator.count() == 0:
        first_word = target.split(" ", 1)[0].strip()
        # Re-querying is pointless when the target is a single word.
        if first_word and first_word != target:
            locator = page.get_by_text(first_word, exact=False)
    locator = locator.first
    if locator.count() > 0:
        # Scroll first, then move the cursor (same order as the click/type
        # paths): before scrolling, the element's box can be off-screen.
        # Skipping the scroll for a missing match avoids a 4 s timeout.
        try:
            locator.scroll_into_view_if_needed(timeout=4000)
        except Exception:
            pass
        self._move_cursor_to_locator(page, locator, f"Scrolling to {target[:48]}")
    page.wait_for_timeout(500)
    payload = self._page_payload(page, action="scroll_to_text", max_chars=max_chars, include_ocr=include_ocr)
    payload["text_target"] = target
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
def screenshot(self) -> Dict[str, Any]:
    """Capture a screenshot of the current page and return where it was saved.

    Returns:
        A dict with ``action`` set to ``"screenshot"`` and ``screenshot_path``
        holding whatever ``_capture_screenshot`` returned.
    """
    active_page = self._ensure_page()
    self._inject_overlay(active_page, "Phillnet browser_mode: screenshot")
    saved_path = self._capture_screenshot(active_page, prefix="manual")
    return {"action": "screenshot", "screenshot_path": saved_path}
def accessibility(self) -> Dict[str, Any]:
    """Report the current page's URL, title, accessibility tree and analysis.

    Returns:
        A dict with ``action``, ``url``, ``title``, ``accessibility_tree``
        and ``page_analysis`` keys, the last two produced by the
        corresponding ``_capture_*`` helpers.
    """
    active_page = self._ensure_page()
    report = {"action": "accessibility"}
    # Filled in the same order the fields are reported.
    report["url"] = active_page.url
    report["title"] = active_page.title()
    report["accessibility_tree"] = self._capture_accessibility_tree(active_page)
    report["page_analysis"] = self._capture_page_analysis(active_page)
    return report
def analyze(self, *, max_chars: int = 1600, include_ocr: bool = False) -> Dict[str, Any]:
    """Build an ``analyze`` payload for the current page with the scan overlay on.

    The in-page scan effect is switched on before the payload is gathered and
    is always switched off afterwards, even when gathering the payload raises.

    Args:
        max_chars: Forwarded to ``_page_payload``.
        include_ocr: Forwarded to ``_page_payload``.

    Returns:
        The dict produced by ``_page_payload`` for action ``"analyze"``.
    """
    page = self._ensure_page()
    scan_toggle = "(active) => window.__phillnetSetScan && window.__phillnetSetScan(active)"

    # The scan effect is cosmetic, so failing to start it must not abort the analysis.
    try:
        page.evaluate(scan_toggle, True)
        page.wait_for_timeout(800)
    except Exception:
        pass

    try:
        self._inject_overlay(page, "Phillnet browser_mode: analyze page")
        return self._page_payload(page, action="analyze", max_chars=max_chars, include_ocr=include_ocr)
    finally:
        # Always clear the effect so a failed analysis does not leave it
        # running on the page.
        try:
            page.evaluate(scan_toggle, False)
        except Exception:
            pass
def vision(self, *, max_chars: int = 2400) -> Dict[str, Any]:
    """Layered Vision Fallback Engine: gather five layers of page awareness.

    Builds the ``vision`` page payload (with OCR) and derives from it:

    1. a description of the screenshot from ``_describe_image_with_vision_model``,
    2. a palette / salience summary from ``vision_node_data``,
    3. a structural map from ``semantic_components`` and ``labeled_elements``,
    4. OCR text,
    5. the accessibility tree rendered as a string.

    Args:
        max_chars: Forwarded to ``_page_payload``; layers 4 and 5 are each
            truncated to ``max_chars // 2`` characters.

    Returns:
        The payload dict extended with ``vision_summary``,
        ``awareness_layers`` (all five ``level_*`` keys are always present),
        ``awareness_rating`` (a percentage string) and ``agentic_context``.
    """
    page = self._ensure_page()
    self._inject_overlay(page, "Phillnet: gathering total awareness")
    self._flash_key(page, "EYE OF THE SWARM ACTIVE")

    # Primary Payload Generation
    payload = self._page_payload(page, action="vision", max_chars=max_chars, include_ocr=True)
    awareness_layers = {}
    v_node = payload.get("vision_node_data") or {}
    if not isinstance(v_node, dict):
        v_node = {}

    # LEVEL 1: Neural Describer (Ollama/Moondream)
    # The describer in this file reports its own failures as ordinary strings
    # rather than raising, so those prefixes must be treated as failures too.
    describer_failure_prefixes = (
        "Layer 1",
        "Vision Analysis Failed",
        "Ollama Error",
        "Vision model failed",
    )
    neural_desc = ""
    try:
        img_path = payload.get("screenshot_path")
        if img_path and Path(img_path).exists():
            page.evaluate("() => window.__phillnetSetVisionBadge && window.__phillnetSetVisionBadge(true)")
            neural_desc = str(self._describe_image_with_vision_model(img_path) or "")
    except Exception as e:
        neural_desc = f"Layer 1 Failed: {e}"
    finally:
        try:
            page.evaluate("() => window.__phillnetSetVisionBadge && window.__phillnetSetVisionBadge(false)")
        except Exception:
            pass
    # Always set the key (empty when there was no screenshot) so later reads
    # cannot raise KeyError.
    awareness_layers["level_1_neural_desc"] = neural_desc
    level_1_ok = bool(neural_desc) and not neural_desc.startswith(describer_failure_prefixes)

    # LEVEL 2: Advanced Visual Forms (Colors, Salience)
    level_2_ok = False
    try:
        colors = v_node.get("colors", [])
        salience = v_node.get("salience", [])
        color_str = ", ".join(f"{c['hex']} ({int(c['weight'] * 100)}%)" for c in colors[:3])
        awareness_layers["level_2_visual_forms"] = f"Dominant Palette: {color_str}\nVisual Regions: {len(salience)} quadrants analyzed."
        level_2_ok = True
    except Exception as e:
        awareness_layers["level_2_visual_forms"] = f"Layer 2 Failed: {e}"

    # LEVEL 3: Structural Optical Map (Semantic Components + Labels)
    level_3_ok = False
    try:
        labels = payload.get("labeled_elements") or []
        components = payload.get("semantic_components") or []
        layout_map = [f"{comp['tag'].upper()} at {comp['box']}" for comp in components[:5]]
        layout_map.extend(f"[{lbl['idx']}] {lbl['tag']} '{lbl['text']}'" for lbl in labels[:12])
        awareness_layers["level_3_structural_map"] = "\n".join(layout_map)
        # Success is tracked with a flag, not by searching the text for
        # "Failed": page labels may legitimately contain that word.
        level_3_ok = bool(layout_map)
    except Exception as e:
        awareness_layers["level_3_structural_map"] = f"Layer 3 Failed: {e}"

    # LEVEL 4: OCR (Optical Character Recognition + Word Grounding)
    ocr = v_node.get("ocr") or {}
    ocr_text = ocr.get("text") if isinstance(ocr, dict) else ""
    awareness_layers["level_4_pixel_text"] = str(ocr_text or "")[:max_chars // 2]

    # LEVEL 5: Logical A11y Tree (The Fallback of Last Resort)
    acc = payload.get("accessibility_tree")
    awareness_layers["level_5_logical_tree"] = str(acc)[:max_chars // 2] if acc else ""

    # CONSOLIDATED CORTEX OBSERVATION
    cortex_parts = ["## TOTAL CORTEX AWARENESS REPORT"]
    if level_1_ok:
        cortex_parts.append(f"### OPTICAL REASONING\n{neural_desc}")
    cortex_parts.append(f"### VISUAL FORMS\n{awareness_layers.get('level_2_visual_forms', '')}")
    cortex_parts.append(f"### INTERACTIVE LANDSCAPE\n{awareness_layers.get('level_3_structural_map', '')}")
    if len(awareness_layers["level_4_pixel_text"]) > 10:
        cortex_parts.append(f"### PIXEL DATA (OCR)\n{awareness_layers['level_4_pixel_text']}")
    payload["vision_summary"] = "\n\n".join(cortex_parts)
    payload["awareness_layers"] = awareness_layers

    # Rate the quality of vision
    rating = 0
    if level_1_ok:
        rating += 35
    if level_2_ok:
        rating += 15
    if level_3_ok:
        rating += 25
    if awareness_layers["level_4_pixel_text"]:
        rating += 15
    if awareness_layers["level_5_logical_tree"]:
        rating += 10
    payload["awareness_rating"] = f"{rating}%"
    payload["agentic_context"] = {
        "current_state": "active",
        "can_continue": True,
        "system_instruction": "Evaluate if the current result satisfies your objective. If not, chain the next logical tool call."
    }
    return payload
| def _describe_image_with_vision_model(self, image_path: str) -> str: | |
| """Calls a local/remote vision model (e.g. moondream via Ollama) to describe the screen.""" | |
| try: | |
| import base64 | |
| import requests | |
| with open(image_path, "rb") as image_file: | |
| base64_image = base64.b64encode(image_file.read()).decode('utf-8') | |
| # Default to local Ollama with moondream or the currently used model | |
| url = "http://localhost:11434/api/generate" | |
| payload = { | |
| "model": "moondream", | |
| "prompt": "Analyze this screenshot. Describe the main layout, what central elements are visible, and any prominent text or images. Be concise and technical for an AI agent.", | |
| "images": [base64_image], | |
| "stream": False | |
| } | |
| response = requests.post(url, json=payload, timeout=25) | |
| if response.status_code == 200: | |
| return response.json().get("response", "Vision model failed to return a description.") | |
| return f"Ollama Error: {response.status_code}" | |
| except Exception as e: | |
| return f"Vision Analysis Failed: {e}" | |
def update_progress(self, current: Union[str, int]):
    """Update the HUD progress bar.

    ``current`` is converted with ``int()`` and passed, together with the
    fixed second argument 12, to the page's ``__phillnetUpdateProgress`` hook
    when that hook exists. Conversion or evaluation errors are swallowed; the
    same status dict is returned either way.
    """
    active_page = self._ensure_page()
    try:
        step = int(current)
        script = f"() => window.__phillnetUpdateProgress && window.__phillnetUpdateProgress({step}, 12)"
        active_page.evaluate(script)
    except Exception:
        # Progress display is best effort only.
        pass
    return {"status": "progress_updated"}
def remember(self, key: str, value: str):
    """Store a fact in the browser's working memory.

    Both ``key`` and ``value`` are coerced to ``str``. The returned dict
    carries the live ``working_memory`` mapping itself, not a copy.
    """
    fact_name, fact_value = str(key), str(value)
    self.working_memory[fact_name] = fact_value
    return {"status": "remembered", "memory": self.working_memory}
def run(
    self,
    *,
    action: str,
    query: Optional[str] = None,
    url: Optional[str] = None,
    selector: Optional[str] = None,
    text_target: Optional[str] = None,
    text: Optional[str] = None,
    key: Optional[str] = None,
    direction: Optional[str] = None,
    amount: Optional[int] = None,
    max_results: int = 5,
    max_snippet_chars: int = 280,
    max_chars: int = 1600,
    include_ocr: bool = False,
) -> Dict[str, Any]:
    """Dispatch a named browser action to the matching method.

    ``action`` is stripped and lower-cased, recorded in ``self.history``
    (capped at the 20 most recent entries), and looked up in a dispatch
    table. Missing optional arguments fall back to the defaults shown in the
    table below.

    Returns:
        Whatever the selected handler returns. Any exception, including an
        unsupported action name, is converted into
        ``{"action": <action>, "error": <message>}``; Playwright timeouts get
        a ``"timeout: "`` prefix on the message.
    """
    action = (action or "").strip().lower()
    self.history.append(f"{action} {query or url or ''}".strip())
    if len(self.history) > 20:
        self.history.pop(0)

    def _set_hud() -> Dict[str, Any]:
        # Replace the overlay text with the caller's query.
        page = self._ensure_page()
        self._inject_overlay(page, str(query or ""))
        return {"status": "hud_updated"}

    def _update_progress() -> Dict[str, Any]:
        # Best-effort HUD progress update; errors are deliberately ignored.
        page = self._ensure_page()
        try:
            current = int(query or 0)
            page.evaluate(f"() => window.__phillnetUpdateProgress && window.__phillnetUpdateProgress({current}, 12)")
        except Exception:
            pass
        return {"status": "progress_updated"}

    def _ocr() -> Dict[str, Any]:
        # Screenshot the page and run OCR over the saved image.
        page = self._ensure_page()
        screenshot_path = self._capture_screenshot(page, prefix="ocr")
        return {"action": "ocr", "screenshot_path": screenshot_path, "ocr": self._run_ocr(screenshot_path)}

    def _label_index() -> int:
        # The label number may arrive in any of these three fields.
        return int(query or text_target or text or 0)

    # Handlers are zero-argument callables so that nothing is evaluated until
    # the chosen one runs inside the try block below.
    handlers = {
        "remember": lambda: self.remember(str(query or ""), str(text or "")),
        "set_hud": _set_hud,
        "update_progress": _update_progress,
        "search": lambda: self.search(query or "", max_results=max_results, max_snippet_chars=max_snippet_chars, include_ocr=include_ocr),
        "open": lambda: self.open(url or "", max_chars=max_chars, include_ocr=include_ocr),
        "click": lambda: self.click(url or "", selector=selector, text_target=text_target, max_chars=max_chars, include_ocr=include_ocr),
        "type": lambda: self.type_text(url=url or "", selector=selector, text=text or "", max_chars=max_chars, include_ocr=include_ocr),
        "press": lambda: self.press_key(url=url or "", key=key or "Enter", max_chars=max_chars, include_ocr=include_ocr),
        "scroll": lambda: self.scroll(url=url or "", direction=direction or "down", amount=amount or 900, max_chars=max_chars, include_ocr=include_ocr),
        "scroll_to_text": lambda: self.scroll_to_text(url=url or "", text_target=text_target or text or query or "", max_chars=max_chars, include_ocr=include_ocr),
        "screenshot": lambda: self.screenshot(),
        "accessibility": lambda: self.accessibility(),
        "analyze": lambda: self.analyze(max_chars=max_chars, include_ocr=include_ocr),
        "vision": lambda: self.vision(max_chars=max_chars),
        "ocr": _ocr,
        "click_label": lambda: self.click_label(_label_index(), max_chars=max_chars, include_ocr=include_ocr),
        "hover_label": lambda: self.hover_label(_label_index(), max_chars=max_chars, include_ocr=include_ocr),
        "clean": lambda: self.clean_page(max_chars=max_chars, include_ocr=include_ocr),
    }

    try:
        handler = handlers.get(action)
        if handler is None:
            raise ValueError(f"Unsupported browser action: {action}")
        return handler()
    except PlaywrightTimeoutError as exc:
        return {"action": action, "error": f"timeout: {exc}"}
    except Exception as exc:
        return {"action": action, "error": str(exc)}
class ReflexController:
    """High-speed orchestrator for real-time browser interaction (Games/Reflexes)."""

    def __init__(self, browser_tool: "PlaywrightBrowserTool"):
        """Wrap ``browser_tool``; ``last_burst_time`` holds the last burst's duration in ms."""
        self.tool = browser_tool
        self.last_burst_time = 0

    def execute_burst(self, script: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Execute a sequence of reflex actions in a tight loop.
        Example script: [{'action': 'mouse_move', 'x': 500, 'y': 500}, {'action': 'mouse_down'}]

        Supported actions: ``mouse_move`` (needs ``x``/``y``), ``mouse_down``,
        ``mouse_up``, ``key_down`` / ``key_up`` (need ``key``) and ``wait``
        (optional ``ms``, default 10).

        Returns:
            On success: ``status`` ``"success"``, ``actions_performed`` (the
            actions actually executed, in order), ``actions_skipped``
            (unrecognised action names, which are ignored) and
            ``latency_ms``.
            On failure: ``status`` ``"error"``, ``error`` (the exception
            text), ``failed_action``, plus the ``actions_performed`` so far
            and ``latency_ms``.
        """
        self.tool.enable_reflex_mode(True)
        start = time.perf_counter()
        performed = []
        skipped = []
        current = None
        failure = None
        try:
            # ``script or []`` lets a None script behave like an empty burst.
            for step in script or []:
                current = step.get("action")
                if current == "mouse_move":
                    self.tool.mouse_move(step["x"], step["y"])
                elif current == "mouse_down":
                    self.tool.mouse_down()
                elif current == "mouse_up":
                    self.tool.mouse_up()
                elif current == "key_down":
                    self.tool.key_down(step["key"])
                elif current == "key_up":
                    self.tool.key_up(step["key"])
                elif current == "wait":
                    time.sleep(step.get("ms", 10) / 1000.0)
                else:
                    # Nothing was executed, so do not report it as performed.
                    skipped.append(current)
                    continue
                performed.append(current)
        except Exception as e:
            failure = e

        # Record the duration on both paths so a failed burst is timed too.
        self.last_burst_time = (time.perf_counter() - start) * 1000
        latency = f"{self.last_burst_time:.2f}ms"

        if failure is not None:
            return {
                "status": "error",
                "error": str(failure),
                "failed_action": current,
                "actions_performed": performed,
                "latency_ms": latency,
            }
        return {
            "status": "success",
            "actions_performed": performed,
            "actions_skipped": skipped,
            "latency_ms": latency,
        }