Spaces:
Running
Running
| import os | |
| import time | |
| import json | |
| import subprocess | |
| import urllib.request | |
| import urllib.parse | |
class BrowserController:
    """Jarvis's standalone hands for browser automation.

    Wraps a Playwright (sync API) Chromium session plus the "Hugin" relay
    (remote HTTP service or local Node.js script). Public action methods
    return a human-readable status string instead of raising, and push
    progress messages to the reporting endpoint via ``_report``.

    NOTE(review): the non-ASCII message literals below are reproduced exactly
    as they appear in the reviewed copy of this file, where the Korean text
    looks mis-encoded -- confirm against the original file's encoding.
    """

    def __init__(self, reporting_url="http://localhost:18700/api/redteam/report"):
        """Create an idle controller; no browser is launched until ``start()``.

        Args:
            reporting_url: HTTP endpoint that ``_report`` POSTs messages to.
        """
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.reporting_url = reporting_url
        # Cloud mode: when HUGIN_URL is set, Hugin actions are sent over HTTP
        # to that base URL instead of running the local Node.js script.
        self.hugin_remote_url = os.environ.get("HUGIN_URL", "").rstrip("/")

    def _report(self, message, level="INFO"):
        """Best-effort POST of a progress message to the Red Team console.

        Args:
            message: Text to report.
            level: Severity label sent alongside the message.
        """
        try:
            # Imported lazily so a missing ``requests`` only disables reporting.
            import requests
            requests.post(self.reporting_url, json={
                "message": message,
                "level": level,
                "agent": "Jarvis Brain (Autonomous)"
            }, timeout=2)
        except Exception:
            # Reporting must never break automation. ``Exception`` (not a bare
            # ``except``) so KeyboardInterrupt/SystemExit still propagate.
            pass

    def _close_handles(self) -> None:
        """Close page, context, browser and driver, each independently.

        A failure while closing one handle is logged and does not prevent the
        remaining handles from being closed. All attributes end up ``None``.
        """
        teardown_order = (
            ("page", "close"),
            ("context", "close"),
            ("browser", "close"),
            ("playwright", "stop"),
        )
        for attr_name, closer_name in teardown_order:
            handle = getattr(self, attr_name, None)
            setattr(self, attr_name, None)
            if not handle:
                continue
            try:
                getattr(handle, closer_name)()
            except Exception as e:
                # Greenlet/thread errors during cleanup are logged, not raised.
                print(f"[RECON DEBUG] Browser stop warning: {e}")

    def start(self, headless: bool = True) -> str:
        """Launch Chromium and open a fresh page with clipboard permissions.

        Any session already held by this controller is stopped first.

        Args:
            headless: Passed through to ``chromium.launch``.

        Returns:
            ``"Browser started."`` on success, otherwise an error string.
        """
        try:
            if self.playwright or self.browser:
                self.stop()
            self._report(f"[STATUS] ์์จ ๋ธ๋ผ์ฐ์ ์ ์ด ์์ง ์ด๊ธฐํ ์ค... (headless={headless})", "INFO")
            # Apply nest_asyncio to handle cases where an event loop is already
            # running (e.g. within GenAI SDK).
            try:
                import nest_asyncio
                nest_asyncio.apply()
            except ImportError:
                pass
            from playwright.sync_api import sync_playwright
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=headless,
                args=["--no-sandbox", "--disable-setuid-sandbox"]
            )
            # Grant clipboard permissions (needed by smart_type).
            self.context = self.browser.new_context(
                permissions=['clipboard-read', 'clipboard-write']
            )
            self.page = self.context.new_page()
            self._report("[STATUS] ์ ๊ตญ ์ ์ฉ ๋ธ๋ผ์ฐ์ ๊ธฐ๋ ์๋ฃ.", "INFO")
            return "Browser started."
        except Exception as e:
            import traceback
            traceback.print_exc()
            # Release whatever was created before the failure so a half-started
            # driver/browser process is not left running.
            self._close_handles()
            self._report(f"[ERROR] ๋ธ๋ผ์ฐ์ ์ด๊ธฐํ ์คํจ: {str(e)}", "ERROR")
            return f"Error starting browser: {str(e)}"

    def open_url(self, url: str):
        """Navigate the page to ``url`` (30 s timeout); returns a status string."""
        self._report(f"[ACTION] ํ๊ฒ ์์ ์ ์ ์ค: {url}", "ACTION")
        try:
            self.page.goto(url, timeout=30000)
            return f"Successfully opened {url}"
        except Exception as e:
            self._report(f"[ERROR] ์ ์ ์คํจ: {str(e)}", "ERROR")
            return f"Error opening {url}: {str(e)}"

    def click(self, selector: str):
        """Click the element matching ``selector`` (10 s timeout)."""
        self._report(f"[ACTION] ์์ ํ๊ฒฉ ์๋: {selector}", "ACTION")
        try:
            self.page.click(selector, timeout=10000)
            return f"Clicked {selector}"
        except Exception as e:
            self._report(f"[ERROR] ํด๋ฆญ ์คํจ: {str(e)}", "ERROR")
            return f"Error clicking {selector}: {str(e)}"

    def type_text(self, selector: str, text: str):
        """Fill ``text`` into the element matching ``selector`` (10 s timeout).

        The text itself is never included in the progress report.
        """
        self._report(f"[ACTION] ๋ฐ์ดํฐ ์ฃผ์ : {selector} -> [REDACTED]", "ACTION")
        try:
            self.page.fill(selector, text, timeout=10000)
            return f"Typed text into {selector}"
        except Exception as e:
            self._report(f"[ERROR] ์ ๋ ฅ ์คํจ: {str(e)}", "ERROR")
            return f"Error typing into {selector}: {str(e)}"

    def smart_type(self, selector: str, text: str):
        """Type text using clipboard bypass (Smart Type).

        Essential for Korean input on Flutter/CanvasKit interfaces. The text
        is written to the clipboard in the page and pasted with Control+V.
        On any failure the method falls back to ``type_text``.

        Args:
            selector: Element to focus first; falsy means "current focus".
            text: Text to paste.

        Returns:
            A status string (from this method or from the fallback).
        """
        self._report(f"[ACTION] ์ค๋งํธ ๋ฐ์ดํฐ ์ฃผ์ (์ฐํ): {selector} -> [REDACTED]", "ACTION")
        try:
            # 1. Focus element
            if selector:
                self.page.click(selector, timeout=5000)
            # 2. Put the text on the clipboard. The text is passed as an
            #    argument rather than spliced into the script source, so
            #    backslashes, backticks and "${" cannot break or alter the JS.
            self.page.evaluate("text => navigator.clipboard.writeText(text)", text)
            # 3. Paste via keyboard
            time.sleep(0.2)
            self.page.keyboard.press("Control+V")
            return f"Smart typed (Pasted) text into {selector or 'current focus'}"
        except Exception as e:
            self._report(f"[ERROR] ์ค๋งํธ ์ ๋ ฅ ์คํจ: {str(e)}", "ERROR")
            # Fallback to standard fill
            return self.type_text(selector, text)

    def get_content(self) -> str:
        """Return the page's HTML content, or an error string on failure."""
        self._report("[STATUS] ํ์ผ ๋ฐ์ดํฐ ์ ๋ฐ ๋ถ์ ์ค...", "INFO")
        try:
            return self.page.content()
        except Exception as e:
            return f"Error getting content: {str(e)}"

    def capture(self, filename: str = "audit_snapshot.png"):
        """Save a screenshot under ``<cwd>/docker_data/shared_workspace/JarvisRun``.

        Args:
            filename: File name joined onto that directory.

        Returns:
            A status string containing the full path, or an error string.
        """
        try:
            path = os.path.join(os.getcwd(), "docker_data", "shared_workspace", "JarvisRun", filename)
            self.page.screenshot(path=path, timeout=5000)
            self._report(f"[STATUS] ๊ฐ์ฌ ์ฆ๊ฑฐ ํ๋ณด: {filename}", "INFO")
            return f"Screenshot saved to {path}"
        except Exception as e:
            self._report(f"[ERROR] ์บก์ฒ ์คํจ: {str(e)}", "ERROR")
            return f"Error taking screenshot: {str(e)}"

    def press_key(self, selector: str, key: str):
        """Press ``key`` on the element matching ``selector`` (e.g. Enter to submit)."""
        self._report(f"[ACTION] ํค ์ ๋ ฅ ์คํ: {key} on {selector}", "ACTION")
        try:
            self.page.press(selector, key, timeout=10000)
            return f"Pressed {key} on {selector}"
        except Exception as e:
            self._report(f"[ERROR] ํค ์ ๋ ฅ ์คํจ: {str(e)}", "ERROR")
            return f"Error pressing {key} on {selector}: {str(e)}"

    def get_text(self, selector: str):
        """Return the inner text of ``selector``, or a placeholder on failure."""
        self._report(f"[STATUS] ์์ ํ ์คํธ ์ถ์ถ: {selector}", "INFO")
        try:
            return self.page.inner_text(selector)
        except Exception:
            return "(element not found or empty)"

    def wait_for_selector(self, selector: str, timeout: int = 10000):
        """Wait up to ``timeout`` ms for ``selector``; returns a status string."""
        self._report(f"[STATUS] ์์ ๋๊ธฐ ์ค: {selector}", "INFO")
        try:
            self.page.wait_for_selector(selector, timeout=timeout)
            return f"Element {selector} found."
        except Exception:
            return f"Element {selector} not found (timeout)."

    def send_chat_message(self, selector: str, message: str, submit_key: str = "Enter"):
        """Fill a chat input, submit it, then pause for the reply.

        Args:
            selector: Chat input element.
            message: Text to send (truncated in reports and in the result).
            submit_key: Key pressed on the input to submit.

        Returns:
            A confirmation string, or ``"Error: ..."`` on failure.
        """
        self._report(f"[ACTION] ์ฑํ ๊ณต๊ฒฉ ํ์ด๋ก๋ ์ฃผ์ : {message[:80]}...", "ACTION")
        try:
            self.page.fill(selector, message)
            time.sleep(0.5)
            self.page.press(selector, submit_key)
            self._report(f"[STATUS] ํ์ด๋ก๋ ์ ์ก ์๋ฃ. ์๋ต ๋๊ธฐ ์ค...", "ACTION")
            time.sleep(3)  # Wait for AI response
            return f"Chat message sent: {message[:50]}"
        except Exception as e:
            # Failures are reported at ERROR level, like every other method.
            self._report(f"[ERROR] ์ฑํ ์ ์ก ์คํจ: {str(e)}", "ERROR")
            return f"Error: {str(e)}"

    def hugin_navigate(self, url: str):
        """Navigate the Master's visible browser using the Hugin relay."""
        self._report(f"[HUGIN Recon] [Hugin Relay] Navigating to: {url}", "ACTION")
        return self._run_hugin("navigate", url)

    def hugin_screenshot(self) -> str:
        """Take a screenshot of the Master's visible browser via Hugin."""
        self._report("[ACTION] [Hugin Relay] Taking screenshot...", "ACTION")
        return self._run_hugin("screenshot")

    def hugin_click(self, text: str):
        """Click an element with the given text in the visible browser."""
        self._report(f"[ACTION] [Hugin Relay] Clicking text: {text}", "ACTION")
        return self._run_hugin("click_text", text)

    def hugin_type(self, text: str):
        """Type text into the active element of the visible browser."""
        self._report(f"[ACTION] [Hugin Relay] Typing text: [REDACTED]", "ACTION")
        return self._run_hugin("type_text", text)

    def _run_hugin(self, action, value=None):
        """Dispatch a Hugin action: remote HTTP if HUGIN_URL is set, else local script."""
        if self.hugin_remote_url:
            return self._run_hugin_remote(action, value)
        return self._run_hugin_local(action, value)

    def _run_hugin_remote(self, action, value=None):
        """Call the remote Hugin service over HTTP and return its JSON as text.

        ``"screenshot"`` requests the service root; every other action is sent
        to ``/search?query=...`` (``"navigate"`` with ``value`` only, other
        actions with ``value`` or, if empty, the action name).

        Returns:
            The decoded JSON re-serialised as a string, or an error string.
        """
        try:
            if action == "navigate":
                url = f"{self.hugin_remote_url}/search?query={urllib.parse.quote(str(value or ''))}"
            elif action == "screenshot":
                url = f"{self.hugin_remote_url}/"
            else:
                # Generic case: handled through /search
                url = f"{self.hugin_remote_url}/search?query={urllib.parse.quote(str(value or action))}"
            self._report(f"[HUGIN REMOTE] Calling: {url}", "INFO")
            req = urllib.request.Request(url, headers={"User-Agent": "ShadowBrain/1.0"})
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = resp.read().decode("utf-8")
            result = json.loads(data)
            self._report(f"[HUGIN REMOTE] Success: {str(result)[:100]}", "INFO")
            return json.dumps(result, ensure_ascii=False)
        except Exception as e:
            self._report(f"[ERROR] Hugin Remote Failed: {str(e)}", "ERROR")
            return f"Remote Hugin Error: {str(e)}"

    def _run_hugin_local(self, action, value=None):
        """Run ``scripts/hugin/hugin_control.js`` with node (local environment only).

        The script path is resolved three directories above this file.

        Returns:
            The script's stripped stdout on exit code 0, otherwise an
            ``"Error: ..."`` / ``"Failed: ..."`` string.
        """
        try:
            proj_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
            script_path = os.path.join(proj_root, "scripts", "hugin", "hugin_control.js")
            args = ["node", script_path, action]
            if value:
                # argv entries must be strings; coerce so non-str values work.
                args.append(str(value))
            # Generous timeout so slow pages can finish navigating.
            result = subprocess.run(args, capture_output=True, encoding='utf-8', timeout=90)
            if result.returncode == 0:
                print(f"[RECON DEBUG] Hugin Success: {result.stdout.strip()}")
                return result.stdout.strip()
            err = result.stderr or result.stdout
            print(f"[RECON DEBUG] Hugin Error ({result.returncode}): {err}")
            self._report(f"[ERROR] Hugin Error: {err[:100]}", "ERROR")
            return f"Error: {err}"
        except Exception as e:
            self._report(f"[ERROR] Hugin Execution Failed: {str(e)}", "ERROR")
            return f"Failed: {str(e)}"

    def local_playwright_search(self, query: str, engine: str = "naver") -> str:
        """Run a web search in a private headless browser and format the hits.

        The search uses its own Playwright/Chromium instance (independent of
        ``start()``), running in a dedicated worker thread with a 60 s limit.

        Args:
            query: Search terms; a freshness suffix and the current year are
                appended before the query is sent to the engines.
            engine: ``"naver"``, ``"daum"``, ``"google"`` or ``"all"``;
                unknown values fall back to naver.

        Returns:
            A text block listing up to six results (or a "no results"
            notice), or a string starting with
            ``"Error executing local search:"`` on failure/timeout.
        """
        self._report(f"[ACTION] ๋ก์ปฌ ๋ธ๋ผ์ฐ์ (Playwright) ์ ๋ฐ ๊ฒ์ ๊ธฐ๋: {query}", "ACTION")
        import re
        from datetime import datetime, timezone
        try:
            import nest_asyncio
            nest_asyncio.apply()
        except ImportError:
            pass

        # --- Result helpers -------------------------------------------------
        def compact_text(text, limit=900):
            """Collapse whitespace runs and truncate to ``limit`` characters."""
            text = re.sub(r"\s+", " ", (text or "")).strip()
            return text[:limit]

        def add_result(results, title, link, snippet):
            """Append a result unless it lacks a title/http link or is a duplicate link."""
            title = compact_text(title, 160)
            link = (link or "").strip()
            snippet = compact_text(snippet, 700)
            if not title or not link.startswith("http"):
                return
            if any(item.get("link") == link for item in results):
                return
            results.append({"title": title, "link": link, "snippet": snippet})

        def add_page_summary(results, source_name, url, text):
            """Record the results page's own body text as a summary entry."""
            summary = compact_text(text, 1400)
            if summary:
                add_result(results, f"{source_name} ๊ฒ์ ํ์ด์ง ์์ฝ", url, summary)

        def generic_anchor_results(page, results, selectors, max_results=8):
            """Harvest anchors matching ``selectors`` until ``max_results`` entries exist."""
            for selector in selectors:
                for item in page.query_selector_all(selector):
                    if len(results) >= max_results:
                        return
                    try:
                        title = item.inner_text().strip()
                        link = item.get_attribute("href") or ""
                        # Use the nearest container's text as the snippet.
                        parent = item.evaluate_handle("node => node.closest('li, div, section, article') || node.parentElement")
                        snippet = ""
                        if parent:
                            try:
                                snippet = parent.as_element().inner_text()
                            except Exception:
                                snippet = ""
                        add_result(results, title, link, snippet)
                    except Exception:
                        continue

        def korea_president_direct_sources(page, q):
            """Fetch fixed reference pages when the query matches the keyword checks."""
            if not re.search(r"(๋ํ๋ฏผ๊ตญ|ํ๊ตญ|์ฐ๋ฆฌ๋๋ผ|๋ํต๋ น|ํ์ง|ํ์ฌ)", q):
                return []
            if "๋ํต๋ น" not in q:
                return []
            direct_results = []
            sources = [
                (
                    "Wikipedia - ๋ํ๋ฏผ๊ตญ ๋ํต๋ น",
                    "https://ko.wikipedia.org/wiki/%EB%8C%80%ED%95%9C%EB%AF%BC%EA%B5%AD%EC%9D%98_%EB%8C%80%ED%86%B5%EB%A0%B9",
                ),
                (
                    "Wikipedia - ๋ํ๋ฏผ๊ตญ ๋ํต๋ น ๋ชฉ๋ก",
                    "https://ko.wikipedia.org/wiki/%EB%8C%80%ED%95%9C%EB%AF%BC%EA%B5%AD_%EB%8C%80%ED%86%B5%EB%A0%B9_%EB%AA%A9%EB%A1%9D",
                ),
                ("๋ํ๋ฏผ๊ตญ ๋ํต๋ น์ค", "https://www.president.go.kr/"),
            ]
            for title, url in sources:
                if len(direct_results) >= 3:
                    break
                try:
                    page.goto(url, wait_until="domcontentloaded", timeout=10000)
                    page.wait_for_timeout(1000)
                    text = page.locator("body").inner_text(timeout=4000)
                    if text:
                        add_result(direct_results, title, url, text)
                except Exception:
                    continue
            return direct_results

        # --- Search engines (each returns a list; failures yield what was collected) ---
        def naver_search(page, encoded_q):
            results = []
            url = f"https://search.naver.com/search.naver?where=nexearch&query={encoded_q}"
            try:
                page.goto(url, wait_until="domcontentloaded", timeout=9000)
                page.wait_for_timeout(1200)
                add_page_summary(results, "๋ค์ด๋ฒ", url, page.locator("body").inner_text(timeout=3000))
                generic_anchor_results(page, results, [
                    "a.news_tit",
                    "a.title_link",
                    "a.link_tit",
                    "a.api_txt_lines",
                    "a.link_name",
                    "a[href*='news.naver.com']",
                    "a[href*='namu.wiki']",
                    "a[href*='wikipedia.org']",
                    "a[href^='http']",
                ])
            except Exception:
                pass
            return results

        def daum_search(page, encoded_q):
            results = []
            url = f"https://search.daum.net/search?w=tot&q={encoded_q}"
            try:
                page.goto(url, wait_until="domcontentloaded", timeout=9000)
                page.wait_for_timeout(1000)
                add_page_summary(results, "๋ค์", url, page.locator("body").inner_text(timeout=3000))
                generic_anchor_results(page, results, [
                    "a.tit-g",
                    "a.tit_main",
                    "strong.tit-g a",
                    "div.wrap_tit a",
                    "a[href^='http']",
                ])
            except Exception:
                pass
            return results

        def google_search(page, encoded_q):
            results = []
            url = f"https://www.google.com/search?q={encoded_q}"
            try:
                page.goto(url, wait_until="domcontentloaded", timeout=8000)
                page.wait_for_timeout(1000)
                for item in page.query_selector_all("div.g")[:8]:
                    try:
                        title_el = item.query_selector("h3")
                        link_el = item.query_selector("a")
                        snippet_el = item.query_selector("div.VwiC3b, span.aCOpbc")
                        add_result(
                            results,
                            title_el.inner_text() if title_el else "",
                            link_el.get_attribute("href") if link_el else "",
                            snippet_el.inner_text() if snippet_el else "",
                        )
                    except Exception:
                        continue
            except Exception:
                pass
            return results

        now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        enriched_query = f"{query} ํ์ฌ ์ต์ {datetime.now().year}"
        encoded_query = urllib.parse.quote(enriched_query)

        # The Playwright sync API fails immediately with "Please use the Async
        # API instead" when called inside a running asyncio event loop. To work
        # regardless of the caller's loop state, the actual sync Playwright
        # work runs in a dedicated thread (which has no loop).
        def _do_search():
            pw = None
            browser = None
            results = []
            try:
                # Imported inside the try so a missing playwright package is
                # returned as an error string, like every other failure here.
                from playwright.sync_api import sync_playwright
                pw = sync_playwright().start()
                browser = pw.chromium.launch(headless=True, args=["--no-sandbox", "--disable-setuid-sandbox"])
                context = browser.new_context(
                    locale="ko-KR",
                    timezone_id="Asia/Seoul",
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
                )
                page = context.new_page()
                # 1. Probe the specialised direct sources first.
                for item in korea_president_direct_sources(page, query):
                    add_result(results, item.get("title"), item.get("link"), item.get("snippet"))
                # 2. Query the general portals in order, stopping once five
                #    results have been collected.
                engine_key = (engine or "naver").lower().strip()
                searchers = {
                    "naver": [naver_search],
                    "daum": [daum_search],
                    "google": [google_search],
                    "all": [naver_search, daum_search, google_search],
                }.get(engine_key, [naver_search])
                for searcher in searchers:
                    if len(results) >= 5:
                        break
                    for item in searcher(page, encoded_query):
                        add_result(results, item.get("title"), item.get("link"), item.get("snippet"))
                # 3. Assemble the report text.
                lines = [
                    "=== [Local Playwright Search Grounding Context] ===",
                    f"์ ์ฐฐ ๊ธฐ์ค ์๊ฐ(UTC): {now_utc}",
                    f"์ง์์ด: '{query}' (์ต์ ์ฑ ๊ฐํ๋ฅผ ์ํ ํ์ฅ ํค์๋: '{enriched_query}')",
                    "์๋๋ ์น๊ฒ์์ผ๋ก ์ด๋ฏธ ์์ง์ ์๋ฃํ ์ค์ ๊ฒฐ๊ณผ๋ค. (์ถ๊ฐ ๊ฒ์ ๋ถํ์)",
                    ""
                ]
                if not results:
                    lines.append("โ ๏ธ ์น ์ ์ฐฐ์ ์๋ํ์์ผ๋ ์ ํจํ ์ ๋ณด๋ฅผ ์์งํ์ง ๋ชปํ์ต๋๋ค.")
                    lines.append("")
                    lines.append("[์ค์ ์ง์นจ] ๊ฒ์ ๊ฒฐ๊ณผ๊ฐ ๋น์ด ์๋ค. ์ถ์ธกํ์ง ๋ง๊ณ , ๊ฒฐ๊ณผ๋ฅผ ์ฐพ์ง ๋ชปํ๋ค๊ณ ์์งํ ํ ์ค๋ก ์๋ฆฌ๊ณ ๋ค๋ฅธ ๊ฒ์์ด๋ฅผ ์ ์ํ๋ผ.")
                    lines.append("=================================================")
                else:
                    for idx, r in enumerate(results[:6]):
                        lines.append(f"[์ ๋ณด {idx + 1}] Title: {r['title']}")
                        lines.append(f"URL: {r['link']}")
                        lines.append(f"Snippet: {r['snippet']}")
                        lines.append("")
                    lines.append("[์ค์ ์ง์นจ]")
                    lines.append(
                        "์ ๊ฒฐ๊ณผ๋ ์ด๋ฏธ ์์ง์ด ๋๋ ์ต์ ์ ๋ณด๋ค. ์ง๊ธ ์ด ์๋ต์์ ์ฆ์ ํต์ฌ์ "
                        "์์ฝํด ๋ตํ๋ผ. '๊ฒ์ํ๊ฒ ์ต๋๋ค / ์ ๋ณด๋ฅผ ์์งํ๋ ๋๋ก / ์์ฝํด ๋๋ฆฌ๊ฒ ์ต๋๋ค' "
                        "๊ฐ์ ์๊ณ ๋ง ๋จ๊ธฐ๊ณ ํด์ ๋๋ด๋ ๊ฒ์ ์ ๋ ๊ธ์ง๋ค โ ๋ฐ๋์ ์ค์ ์์ฝ ๋ณธ๋ฌธ์ ์์ฑํ๋ผ. "
                        "๋ชจ๋ธ์ ๊ธฐ์กด ์ง์๊ณผ ์ถฉ๋ํ๋ฉด ์ ๊ฒ์ ๊ฒฐ๊ณผ๋ฅผ ์ต์ฐ์ ์ผ๋ก ์ ๋ขฐํ๋ผ. "
                        "์ถ์ฒ๊ฐ ํ์ํ๋ฉด ์ URL์ ์ธ์ฉํ๋ผ."
                    )
                    lines.append("=================================================")
                return "\n".join(lines)
            except Exception as e:
                import traceback
                tb = traceback.format_exc()
                self._report(f"[ERROR] ๋ก์ปฌ ๋ธ๋ผ์ฐ์ ๊ฒ์ ์คํจ: {str(e)}", "ERROR")
                # Print the full traceback so the exact cause can be traced.
                print(f"[Web Search][ERROR] engine={engine} query={query!r}\n{tb}", flush=True)
                return f"Error executing local search: {str(e)}"
            finally:
                try:
                    if browser:
                        browser.close()
                    if pw:
                        pw.stop()
                except Exception:
                    pass

        # Run in a dedicated thread so sync Playwright works whether or not the
        # caller has an asyncio loop running.
        import concurrent.futures
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = executor.submit(_do_search)
        try:
            return future.result(timeout=60)
        except concurrent.futures.TimeoutError:
            self._report("[ERROR] ๋ก์ปฌ ๋ธ๋ผ์ฐ์ ๊ฒ์ ํ์์์(60s)", "ERROR")
            return "Error executing local search: timeout(60s)"
        finally:
            # wait=False: a ``with`` block would join the worker on exit and
            # silently defeat the timeout. The worker closes its own browser
            # in its ``finally`` when it eventually finishes.
            executor.shutdown(wait=False)

    def stop(self) -> None:
        """Shut down the browser session; safe to call when nothing is running."""
        try:
            self._close_handles()
        finally:
            self._report("[STATUS] ์์จ ๋ธ๋ผ์ฐ์ ์ ์ด ์ธ์ ์ข ๋ฃ.", "INFO")
if __name__ == "__main__":
    # Manual smoke test: launch the browser, load a page served on localhost,
    # save one screenshot, and always shut the session down afterwards --
    # even if one of the steps raises.
    controller = BrowserController()
    try:
        controller.start()
        controller.open_url("http://localhost:18889")
        controller.capture("test_audit.png")
    finally:
        controller.stop()