import os import time import json import subprocess import urllib.request import urllib.parse class BrowserController: """Jarvis's standalone hands for browser automation.""" def __init__(self, reporting_url="http://localhost:18700/api/redteam/report"): self.playwright = None self.browser = None self.context = None self.page = None self.reporting_url = reporting_url # ๐Ÿ”ฑ ํด๋ผ์šฐ๋“œ ํ™˜๊ฒฝ: HUGIN_URL์ด ์„ค์ •๋œ ๊ฒฝ์šฐ ์›๊ฒฉ HTTP ๋ชจ๋“œ๋กœ ๋™์ž‘ self.hugin_remote_url = os.environ.get("HUGIN_URL", "").rstrip("/") def _report(self, message, level="INFO"): """Reports progress back to the Red Team console.""" try: import requests requests.post(self.reporting_url, json={ "message": message, "level": level, "agent": "Jarvis Brain (Autonomous)" }, timeout=2) except: pass def start(self, headless: bool = True) -> str: """Initializes the browser.""" try: if self.playwright or self.browser: self.stop() self._report(f"[STATUS] ์ž์œจ ๋ธŒ๋ผ์šฐ์ € ์ œ์–ด ์—”์ง„ ์ดˆ๊ธฐํ™” ์ค‘... (headless={headless})", "INFO") # Apply nest_asyncio to handle cases where an event loop is already running (e.g. within GenAI SDK) try: import nest_asyncio nest_asyncio.apply() except ImportError: pass from playwright.sync_api import sync_playwright self.playwright = sync_playwright().start() self.browser = self.playwright.chromium.launch( headless=headless, args=["--no-sandbox", "--disable-setuid-sandbox"] ) # Grant clipboard permissions self.context = self.browser.new_context( permissions=['clipboard-read', 'clipboard-write'] ) self.page = self.context.new_page() self._report("[STATUS] ์ œ๊ตญ ์ „์šฉ ๋ธŒ๋ผ์šฐ์ € ๊ธฐ๋™ ์™„๋ฃŒ.", "INFO") return "Browser started." except Exception as e: import traceback traceback.print_exc() self._report(f"[ERROR] ๋ธŒ๋ผ์šฐ์ € ์ดˆ๊ธฐํ™” ์‹คํŒจ: {str(e)}", "ERROR") return f"Error starting browser: {str(e)}" def open_url(self, url: str): """Navigates to a URL.""" self._report(f"[ACTION] ํƒ€๊ฒŸ ์œ„์ƒ ์ ‘์† ์ค‘: {url}", "ACTION") try: self.page.goto(url, timeout=30000) return f"Successfully opened {url}" except Exception as e: self._report(f"[ERROR] ์ ‘์† ์‹คํŒจ: {str(e)}", "ERROR") return f"Error opening {url}: {str(e)}" def click(self, selector: str): """Clicks an element.""" self._report(f"[ACTION] ์š”์†Œ ํƒ€๊ฒฉ ์‹œ๋„: {selector}", "ACTION") try: self.page.click(selector, timeout=10000) return f"Clicked {selector}" except Exception as e: self._report(f"[ERROR] ํด๋ฆญ ์‹คํŒจ: {str(e)}", "ERROR") return f"Error clicking {selector}: {str(e)}" def type_text(self, selector: str, text: str): """Types text into an element.""" self._report(f"[ACTION] ๋ฐ์ดํ„ฐ ์ฃผ์ž…: {selector} -> [REDACTED]", "ACTION") try: self.page.fill(selector, text, timeout=10000) return f"Typed text into {selector}" except Exception as e: self._report(f"[ERROR] ์ž…๋ ฅ ์‹คํŒจ: {str(e)}", "ERROR") return f"Error typing into {selector}: {str(e)}" def smart_type(self, selector: str, text: str): """ Types text using clipboard bypass (Smart Type). Essential for Korean input on Flutter/CanvasKit interfaces. """ self._report(f"[ACTION] ์Šค๋งˆํŠธ ๋ฐ์ดํ„ฐ ์ฃผ์ž… (์šฐํšŒ): {selector} -> [REDACTED]", "ACTION") try: # 1. Focus element if selector: self.page.click(selector, timeout=5000) # 2. Inject text into clipboard via JS escaped_text = text.replace("`", "\\`").replace("${", "\\${") self.page.evaluate(f"navigator.clipboard.writeText(`{escaped_text}`)") # 3. Paste via Keyboard time.sleep(0.2) self.page.keyboard.press("Control+V") return f"Smart typed (Pasted) text into {selector or 'current focus'}" except Exception as e: self._report(f"[ERROR] ์Šค๋งˆํŠธ ์ž…๋ ฅ ์‹คํŒจ: {str(e)}", "ERROR") # Fallback to standard fill return self.type_text(selector, text) def get_content(self) -> str: """Gets page content for analysis.""" self._report("[STATUS] ํƒ€์ผ“ ๋ฐ์ดํ„ฐ ์ •๋ฐ€ ๋ถ„์„ ์ค‘...", "INFO") try: return self.page.content() except Exception as e: return f"Error getting content: {str(e)}" def capture(self, filename: str = "audit_snapshot.png"): """Takes a screenshot.""" try: path = os.path.join(os.getcwd(), "docker_data", "shared_workspace", "JarvisRun", filename) self.page.screenshot(path=path, timeout=5000) self._report(f"[STATUS] ๊ฐ์‚ฌ ์ฆ๊ฑฐ ํ™•๋ณด: {filename}", "INFO") return f"Screenshot saved to {path}" except Exception as e: self._report(f"[ERROR] ์บก์ฒ˜ ์‹คํŒจ: {str(e)}", "ERROR") return f"Error taking screenshot: {str(e)}" def press_key(self, selector: str, key: str): """Presses a key on an element (e.g. Enter to submit).""" self._report(f"[ACTION] ํ‚ค ์ž…๋ ฅ ์‹คํ–‰: {key} on {selector}", "ACTION") try: self.page.press(selector, key, timeout=10000) return f"Pressed {key} on {selector}" except Exception as e: self._report(f"[ERROR] ํ‚ค ์ž…๋ ฅ ์‹คํŒจ: {str(e)}", "ERROR") return f"Error pressing {key} on {selector}: {str(e)}" def get_text(self, selector: str): """Gets the text content of a specific element.""" self._report(f"[STATUS] ์š”์†Œ ํ…์ŠคํŠธ ์ถ”์ถœ: {selector}", "INFO") try: return self.page.inner_text(selector) except: return "(element not found or empty)" def wait_for_selector(self, selector: str, timeout: int = 10000): """Waits for an element to appear on the page.""" self._report(f"[STATUS] ์š”์†Œ ๋Œ€๊ธฐ ์ค‘: {selector}", "INFO") try: self.page.wait_for_selector(selector, timeout=timeout) return f"Element {selector} found." except: return f"Element {selector} not found (timeout)." def send_chat_message(self, selector: str, message: str, submit_key: str = "Enter"): """Types a chat message and submits it โ€” the core attack primitive.""" self._report(f"[ACTION] ์ฑ„ํŒ… ๊ณต๊ฒฉ ํŽ˜์ด๋กœ๋“œ ์ฃผ์ž…: {message[:80]}...", "ACTION") try: self.page.fill(selector, message) import time time.sleep(0.5) self.page.press(selector, submit_key) self._report(f"[STATUS] ํŽ˜์ด๋กœ๋“œ ์ „์†ก ์™„๋ฃŒ. ์‘๋‹ต ๋Œ€๊ธฐ ์ค‘...", "ACTION") time.sleep(3) # Wait for AI response return f"Chat message sent: {message[:50]}" except Exception as e: self._report(f"[ERROR] ์ฑ„ํŒ… ์ „์†ก ์‹คํŒจ: {str(e)}", "ACTION") return f"Error: {str(e)}" def hugin_navigate(self, url: str): """Navigates the Master's visible browser using the Hugin relay.""" self._report(f"[HUGIN Recon] [Hugin Relay] Navigating to: {url}", "ACTION") return self._run_hugin("navigate", url) def hugin_screenshot(self) -> str: """Takes a screenshot of the Master's visible browser.""" self._report("[ACTION] [Hugin Relay] Taking screenshot...", "ACTION") return self._run_hugin("screenshot") def hugin_click(self, text: str): """Clicks an element with the specified text in the visible browser.""" self._report(f"[ACTION] [Hugin Relay] Clicking text: {text}", "ACTION") return self._run_hugin("click_text", text) def hugin_type(self, text: str): """Types text into the currently active element in the visible browser.""" self._report(f"[ACTION] [Hugin Relay] Typing text: [REDACTED]", "ACTION") return self._run_hugin("type_text", text) def _run_hugin(self, action, value=None): """Helper to run hugin - remote HTTP if HUGIN_URL set, else local script.""" # ๐Ÿ”ฑ ํด๋ผ์šฐ๋“œ ๋ชจ๋“œ: HUGIN_URL ํ™˜๊ฒฝ๋ณ€์ˆ˜๊ฐ€ ์žˆ์œผ๋ฉด ์›๊ฒฉ ํ›„๊ธด9 API ํ˜ธ์ถœ if self.hugin_remote_url: return self._run_hugin_remote(action, value) # ๋กœ์ปฌ ๋ชจ๋“œ: ๋กœ์ปฌ Node.js ์Šคํฌ๋ฆฝํŠธ ์‹คํ–‰ return self._run_hugin_local(action, value) def _run_hugin_remote(self, action, value=None): """Calls the remote Hugin9 cloud service via HTTP.""" try: if action == "navigate": url = f"{self.hugin_remote_url}/search?query={urllib.parse.quote(str(value or ''))}" elif action == "screenshot": url = f"{self.hugin_remote_url}/" else: # ๋ฒ”์šฉ: /search ๋กœ ์ฒ˜๋ฆฌ url = f"{self.hugin_remote_url}/search?query={urllib.parse.quote(str(value or action))}" self._report(f"[HUGIN REMOTE] Calling: {url}", "INFO") req = urllib.request.Request(url, headers={"User-Agent": "ShadowBrain/1.0"}) with urllib.request.urlopen(req, timeout=30) as resp: data = resp.read().decode("utf-8") result = json.loads(data) self._report(f"[HUGIN REMOTE] Success: {str(result)[:100]}", "INFO") return json.dumps(result, ensure_ascii=False) except Exception as e: self._report(f"[ERROR] Hugin Remote Failed: {str(e)}", "ERROR") return f"Remote Hugin Error: {str(e)}" def _run_hugin_local(self, action, value=None): """Runs the local hugin_control.js script (local environment only).""" try: proj_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) script_path = os.path.join(proj_root, "scripts", "hugin", "hugin_control.js") args = ["node", script_path, action] if value: args.append(value) # Increase timeout to navigate slow pages result = subprocess.run(args, capture_output=True, encoding='utf-8', timeout=90) if result.returncode == 0: print(f"[RECON DEBUG] Hugin Success: {result.stdout.strip()}") return result.stdout.strip() else: err = result.stderr or result.stdout print(f"[RECON DEBUG] Hugin Error ({result.returncode}): {err}") self._report(f"[ERROR] Hugin Error: {err[:100]}", "ERROR") return f"Error: {err}" except Exception as e: self._report(f"[ERROR] Hugin Execution Failed: {str(e)}", "ERROR") return f"Failed: {str(e)}" def local_playwright_search(self, query: str, engine: str = "naver") -> str: """Performs a highly optimized local search using Playwright (Odin Strategy).""" self._report(f"[ACTION] ๋กœ์ปฌ ๋ธŒ๋ผ์šฐ์ €(Playwright) ์ •๋ฐ€ ๊ฒ€์ƒ‰ ๊ธฐ๋™: {query}", "ACTION") import re from datetime import datetime, timezone try: import nest_asyncio nest_asyncio.apply() except ImportError: pass # Helper algorithms from Odin def compact_text(text, limit=900): text = re.sub(r"\s+", " ", (text or "")).strip() return text[:limit] def add_result(results, title, link, snippet): title = compact_text(title, 160) link = (link or "").strip() snippet = compact_text(snippet, 700) if not title or not link.startswith("http"): return if any(item.get("link") == link for item in results): return results.append({"title": title, "link": link, "snippet": snippet}) def add_page_summary(results, source_name, url, text): summary = compact_text(text, 1400) if summary: add_result(results, f"{source_name} ๊ฒ€์ƒ‰ ํŽ˜์ด์ง€ ์š”์•ฝ", url, summary) def generic_anchor_results(page, results, selectors, max_results=8): for selector in selectors: for item in page.query_selector_all(selector): if len(results) >= max_results: return try: title = item.inner_text().strip() link = item.get_attribute("href") or "" parent = item.evaluate_handle("node => node.closest('li, div, section, article') || node.parentElement") snippet = "" if parent: try: snippet = parent.as_element().inner_text() except Exception: snippet = "" add_result(results, title, link, snippet) except Exception: continue def korea_president_direct_sources(page, q): if not re.search(r"(๋Œ€ํ•œ๋ฏผ๊ตญ|ํ•œ๊ตญ|์šฐ๋ฆฌ๋‚˜๋ผ|๋Œ€ํ†ต๋ น|ํ˜„์ง|ํ˜„์žฌ)", q): return [] if "๋Œ€ํ†ต๋ น" not in q: return [] direct_results = [] sources = [ ( "Wikipedia - ๋Œ€ํ•œ๋ฏผ๊ตญ ๋Œ€ํ†ต๋ น", "https://ko.wikipedia.org/wiki/%EB%8C%80%ED%95%9C%EB%AF%BC%EA%B5%AD%EC%9D%98_%EB%8C%80%ED%86%B5%EB%A0%B9", ), ( "Wikipedia - ๋Œ€ํ•œ๋ฏผ๊ตญ ๋Œ€ํ†ต๋ น ๋ชฉ๋ก", "https://ko.wikipedia.org/wiki/%EB%8C%80%ED%95%9C%EB%AF%BC%EA%B5%AD_%EB%8C%80%ED%86%B5%EB%A0%B9_%EB%AA%A9%EB%A1%9D", ), ("๋Œ€ํ•œ๋ฏผ๊ตญ ๋Œ€ํ†ต๋ น์‹ค", "https://www.president.go.kr/"), ] for title, url in sources: if len(direct_results) >= 3: break try: page.goto(url, wait_until="domcontentloaded", timeout=10000) page.wait_for_timeout(1000) text = page.locator("body").inner_text(timeout=4000) if text: add_result(direct_results, title, url, text) except Exception: continue return direct_results # Search Engines def naver_search(page, encoded_q): results = [] url = f"https://search.naver.com/search.naver?where=nexearch&query={encoded_q}" try: page.goto(url, wait_until="domcontentloaded", timeout=9000) page.wait_for_timeout(1200) add_page_summary(results, "๋„ค์ด๋ฒ„", url, page.locator("body").inner_text(timeout=3000)) generic_anchor_results(page, results, [ "a.news_tit", "a.title_link", "a.link_tit", "a.api_txt_lines", "a.link_name", "a[href*='news.naver.com']", "a[href*='namu.wiki']", "a[href*='wikipedia.org']", "a[href^='http']", ]) except Exception: pass return results def daum_search(page, encoded_q): results = [] url = f"https://search.daum.net/search?w=tot&q={encoded_q}" try: page.goto(url, wait_until="domcontentloaded", timeout=9000) page.wait_for_timeout(1000) add_page_summary(results, "๋‹ค์Œ", url, page.locator("body").inner_text(timeout=3000)) generic_anchor_results(page, results, [ "a.tit-g", "a.tit_main", "strong.tit-g a", "div.wrap_tit a", "a[href^='http']", ]) except Exception: pass return results def google_search(page, encoded_q): results = [] url = f"https://www.google.com/search?q={encoded_q}" try: page.goto(url, wait_until="domcontentloaded", timeout=8000) page.wait_for_timeout(1000) for item in page.query_selector_all("div.g")[:8]: try: title_el = item.query_selector("h3") link_el = item.query_selector("a") snippet_el = item.query_selector("div.VwiC3b, span.aCOpbc") add_result( results, title_el.inner_text() if title_el else "", link_el.get_attribute("href") if link_el else "", snippet_el.inner_text() if snippet_el else "", ) except Exception: continue except Exception: pass return results from playwright.sync_api import sync_playwright now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") enriched_query = f"{query} ํ˜„์žฌ ์ตœ์‹  {datetime.now().year}" encoded_query = urllib.parse.quote(enriched_query) # ๐Ÿ”ฑ [Imperial Fix] Playwright Sync API๋Š” ์‹คํ–‰ ์ค‘์ธ asyncio ์ด๋ฒคํŠธ ๋ฃจํ”„ ์•ˆ์—์„œ # ํ˜ธ์ถœ๋˜๋ฉด "Please use the Async API instead" ์—๋Ÿฌ๋กœ ์ฆ‰์‹œ ์‹คํŒจํ•œ๋‹ค(์›น๊ฒ€์ƒ‰ ๋ถˆ๊ฐ€ โ†’ # LLM์ด '์ผ์‹œ์  ์˜ค๋ฅ˜'๋กœ ๋ณด๊ณ ํ•˜๋˜ ์›์ธ). ํ˜ธ์ถœ ์Šคํƒ์˜ ๋ฃจํ”„ ์ƒํƒœ์™€ ๋ฌด๊ด€ํ•˜๊ฒŒ ํ•ญ์ƒ # ๋™์ž‘ํ•˜๋„๋ก, ์‹ค์ œ sync Playwright ์ž‘์—…์€ ์ „์šฉ ์Šค๋ ˆ๋“œ(๋ฃจํ”„ ์—†์Œ)์—์„œ ์‹คํ–‰ํ•œ๋‹ค. def _do_search(): pw = None browser = None results = [] try: pw = sync_playwright().start() browser = pw.chromium.launch(headless=True, args=["--no-sandbox", "--disable-setuid-sandbox"]) context = browser.new_context( locale="ko-KR", timezone_id="Asia/Seoul", user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36" ) page = context.new_page() # 1. ๋Œ€ํ†ต๋ น ๋ฐ ํ•œ๊ตญ ํ˜„์ง ๊ด€๋ จ ํŠนํ™” ์†Œ์Šค ์šฐ์„  ์ •์ฐฐ for item in korea_president_direct_sources(page, query): add_result(results, item.get("title"), item.get("link"), item.get("snippet")) # 2. ๋ฒ”์šฉ ํฌํ„ธ ์ˆœ์ฐจ ํƒ์ƒ‰ (๋„ค์ด๋ฒ„ โž” ๋‹ค์Œ โž” ๊ตฌ๊ธ€) engine_key = (engine or "naver").lower().strip() searchers = { "naver": [naver_search], "daum": [daum_search], "google": [google_search], "all": [naver_search, daum_search, google_search], }.get(engine_key, [naver_search]) for searcher in searchers: if len(results) >= 5: break for item in searcher(page, encoded_query): add_result(results, item.get("title"), item.get("link"), item.get("snippet")) # ์ •์ฐฐ ๊ฒฐ๊ณผ ์กฐ๋ฆฝ lines = [ "=== [Local Playwright Search Grounding Context] ===", f"์ •์ฐฐ ๊ธฐ์ค€ ์‹œ๊ฐ(UTC): {now_utc}", f"์งˆ์˜์–ด: '{query}' (์ตœ์‹ ์„ฑ ๊ฐ•ํ™”๋ฅผ ์œ„ํ•œ ํ™•์žฅ ํ‚ค์›Œ๋“œ: '{enriched_query}')", "์•„๋ž˜๋Š” ์›น๊ฒ€์ƒ‰์œผ๋กœ ์ด๋ฏธ ์ˆ˜์ง‘์„ ์™„๋ฃŒํ•œ ์‹ค์ œ ๊ฒฐ๊ณผ๋‹ค. (์ถ”๊ฐ€ ๊ฒ€์ƒ‰ ๋ถˆํ•„์š”)", "" ] if not results: lines.append("โš ๏ธ ์›น ์ •์ฐฐ์„ ์‹œ๋„ํ•˜์˜€์œผ๋‚˜ ์œ ํšจํ•œ ์ •๋ณด๋ฅผ ์ˆ˜์ง‘ํ•˜์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค.") lines.append("") lines.append("[์ค‘์š” ์ง€์นจ] ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๊ฐ€ ๋น„์–ด ์žˆ๋‹ค. ์ถ”์ธกํ•˜์ง€ ๋ง๊ณ , ๊ฒฐ๊ณผ๋ฅผ ์ฐพ์ง€ ๋ชปํ–ˆ๋‹ค๊ณ  ์†”์งํžˆ ํ•œ ์ค„๋กœ ์•Œ๋ฆฌ๊ณ  ๋‹ค๋ฅธ ๊ฒ€์ƒ‰์–ด๋ฅผ ์ œ์•ˆํ•˜๋ผ.") lines.append("=================================================") else: for idx, r in enumerate(results[:6]): lines.append(f"[์ •๋ณด {idx + 1}] Title: {r['title']}") lines.append(f"URL: {r['link']}") lines.append(f"Snippet: {r['snippet']}") lines.append("") lines.append("[์ค‘์š” ์ง€์นจ]") lines.append( "์œ„ ๊ฒฐ๊ณผ๋Š” ์ด๋ฏธ ์ˆ˜์ง‘์ด ๋๋‚œ ์ตœ์‹  ์ •๋ณด๋‹ค. ์ง€๊ธˆ ์ด ์‘๋‹ต์—์„œ ์ฆ‰์‹œ ํ•ต์‹ฌ์„ " "์š”์•ฝํ•ด ๋‹ตํ•˜๋ผ. '๊ฒ€์ƒ‰ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค / ์ •๋ณด๋ฅผ ์ˆ˜์ง‘ํ•˜๋Š” ๋Œ€๋กœ / ์š”์•ฝํ•ด ๋“œ๋ฆฌ๊ฒ ์Šต๋‹ˆ๋‹ค' " "๊ฐ™์€ ์˜ˆ๊ณ ๋งŒ ๋‚จ๊ธฐ๊ณ  ํ„ด์„ ๋๋‚ด๋Š” ๊ฒƒ์€ ์ ˆ๋Œ€ ๊ธˆ์ง€๋‹ค โ€” ๋ฐ˜๋“œ์‹œ ์‹ค์ œ ์š”์•ฝ ๋ณธ๋ฌธ์„ ์ž‘์„ฑํ•˜๋ผ. " "๋ชจ๋ธ์˜ ๊ธฐ์กด ์ง€์‹๊ณผ ์ถฉ๋Œํ•˜๋ฉด ์œ„ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๋ฅผ ์ตœ์šฐ์„ ์œผ๋กœ ์‹ ๋ขฐํ•˜๋ผ. " "์ถœ์ฒ˜๊ฐ€ ํ•„์š”ํ•˜๋ฉด ์œ„ URL์„ ์ธ์šฉํ•˜๋ผ." ) lines.append("=================================================") return "\n".join(lines) except Exception as e: import traceback tb = traceback.format_exc() self._report(f"[ERROR] ๋กœ์ปฌ ๋ธŒ๋ผ์šฐ์ € ๊ฒ€์ƒ‰ ์‹คํŒจ: {str(e)}", "ERROR") # ์ •ํ™•ํ•œ ์›์ธ ์ถ”์ ์„ ์œ„ํ•ด ์ „์ฒด ํŠธ๋ ˆ์ด์Šค๋ฐฑ์„ ์ฝ˜์†”/ํ‘œ์ค€์ถœ๋ ฅ์— ๋‚จ๊ธด๋‹ค. print(f"[Web Search][ERROR] engine={engine} query={query!r}\n{tb}", flush=True) return f"Error executing local search: {str(e)}" finally: try: if browser: browser.close() if pw: pw.stop() except Exception: pass # ์ „์šฉ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ โ†’ ํ˜ธ์ถœ ์ธก asyncio ๋ฃจํ”„ ์œ ๋ฌด์™€ ๋ฌด๊ด€ํ•˜๊ฒŒ sync Playwright ๋™์ž‘ ๋ณด์žฅ import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=1) as _ex: try: return _ex.submit(_do_search).result(timeout=60) except concurrent.futures.TimeoutError: self._report("[ERROR] ๋กœ์ปฌ ๋ธŒ๋ผ์šฐ์ € ๊ฒ€์ƒ‰ ํƒ€์ž„์•„์›ƒ(60s)", "ERROR") return "Error executing local search: timeout(60s)" def stop(self) -> None: """Shuts down the browser with defensive error handling.""" try: if self.page: self.page.close() if self.context: self.context.close() if self.browser: self.browser.close() if self.playwright: self.playwright.stop() except Exception as e: # Silently handle greenlet/thread errors during cleanup print(f"[RECON DEBUG] Browser stop warning: {e}") finally: self.page = None self.context = None self.browser = None self.playwright = None self._report("[STATUS] ์ž์œจ ๋ธŒ๋ผ์šฐ์ € ์ œ์–ด ์„ธ์…˜ ์ข…๋ฃŒ.", "INFO") if __name__ == "__main__": # Test script ctrl = BrowserController() try: ctrl.start() ctrl.open_url("http://localhost:18889") ctrl.capture("test_audit.png") finally: ctrl.stop()