"""Gradio front-end for driving a headless Chrome instance.

A single Chrome process is started with the DevTools remote-debugging port
open; Selenium attaches to that port for each action (navigate, screenshot,
execute JavaScript) and detaches again afterwards.
"""

import io
import os
import signal
import socket
import subprocess
import tempfile
import time
from pathlib import Path
from typing import List, Optional, Tuple

import gradio as gr
import psutil
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Address of the Chrome DevTools remote-debugging endpoint.
DEBUG_HOST = "127.0.0.1"
DEBUG_PORT = 9222

# Start-up polling: number of probes and the delay before each probe.
STARTUP_ATTEMPTS = 15
STARTUP_POLL_SECONDS = 2


class SimpleChromeController:
    """Start, stop and drive one headless Chrome process via its debug port."""

    def __init__(self):
        self.chrome_process = None  # subprocess.Popen handle of the Chrome we launched
        self.is_running = False  # True once the debug port answered after start_chrome()
        self._stderr_log = None  # temp file receiving Chrome's stderr output
        self._driver_path = None  # chromedriver path, resolved on first get_driver()
        self.setup_directories()

    def setup_directories(self) -> None:
        """Create the directories Chrome is pointed at, on a best-effort basis.

        Failures (for example a read-only filesystem) are deliberately
        ignored so the app can still start.
        """
        home_dir = Path.home()
        directories = [
            Path('/app/chrome_profile'),
            Path('/app/chrome_crashes'),
            home_dir / '.local' / 'share' / 'applications',
            home_dir / '.config',
        ]

        for path in directories:
            try:
                path.mkdir(parents=True, exist_ok=True)
                os.chmod(path, 0o755)
            except OSError:
                # PermissionError is an OSError subclass; continue silently.
                pass

    def get_chrome_args(self) -> List[str]:
        """Return the Chrome command line used for containerized environments."""
        return [
            "google-chrome",
            f"--remote-debugging-port={DEBUG_PORT}",
            "--user-data-dir=/app/chrome_profile",
            "--crash-dumps-dir=/app/chrome_crashes",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-software-rasterizer",
            "--disable-background-timer-throttling",
            "--disable-renderer-backgrounding",
            "--disable-backgrounding-occluded-windows",
            "--disable-features=TranslateUI,BlinkGenPropertyTrees",
            "--disable-ipc-flooding-protection",
            "--disable-default-apps",
            "--disable-extensions",
            "--disable-component-extensions-with-background-pages",
            "--disable-background-networking",
            "--disable-sync",
            "--disable-translate",
            "--hide-scrollbars",
            "--mute-audio",
            "--no-first-run",
            "--disable-notifications",
            "--disable-popup-blocking",
            "--disable-prompt-on-repost",
            "--disable-hang-monitor",
            "--disable-logging",
            "--disable-login-animations",
            "--disable-modal-animations",
            "--disable-infobars",
            "--headless=new",
            "--window-size=1920,1080",
            "--virtual-time-budget=25000",
            "--disable-crash-reporter",
            "--disable-crashpad",
            "--disable-breakpad",
            "--no-crash-upload",
            "--single-process",
        ]

    def is_chrome_accessible(self) -> bool:
        """Return True if a TCP connection to the debugging port succeeds."""
        try:
            with socket.create_connection((DEBUG_HOST, DEBUG_PORT), timeout=2):
                return True
        except OSError:
            # Covers timeouts and refused connections as well as other
            # socket-level failures (unreachable network, reset, ...).
            return False

    def _read_stderr_log(self) -> str:
        """Return everything Chrome has written to its stderr log so far."""
        log = self._stderr_log
        if log is None:
            return ""
        log.seek(0)
        return log.read().decode('utf-8', errors='ignore')

    def _discard_stderr_log(self) -> None:
        """Close the stderr temp file, if one is open."""
        log = self._stderr_log
        self._stderr_log = None
        if log is not None:
            try:
                log.close()
            except OSError:
                pass

    def start_chrome(self) -> str:
        """Start the Chrome process and wait for its debugging port.

        Returns:
            A human-readable status message for the UI.
        """
        if self.is_running and self.is_chrome_accessible():
            return "⚠️ Chrome is already running"

        self.kill_chrome()

        try:
            # stdout is discarded and stderr goes to a temp file rather than
            # a pipe: nothing reads from the process while it runs, so a pipe
            # could fill up and block Chrome.
            self._stderr_log = tempfile.TemporaryFile()
            self.chrome_process = subprocess.Popen(
                self.get_chrome_args(),
                stdout=subprocess.DEVNULL,
                stderr=self._stderr_log,
                # Own session/process group so kill_chrome() can signal the
                # whole group.
                start_new_session=True,
            )

            # Wait for Chrome to start
            for attempt in range(STARTUP_ATTEMPTS):
                time.sleep(STARTUP_POLL_SECONDS)
                if self.is_chrome_accessible():
                    self.is_running = True
                    return (
                        "✅ Chrome started successfully "
                        f"(attempt {attempt + 1}/{STARTUP_ATTEMPTS})"
                    )

                if self.chrome_process.poll() is not None:
                    error_msg = self._read_stderr_log()[:300]
                    self._discard_stderr_log()
                    self.chrome_process = None
                    return f"❌ Chrome failed to start: {error_msg}"

            return "⚠️ Chrome may have started but debugging port not accessible. Try taking a screenshot to test."

        except Exception as e:
            # UI boundary: report the failure instead of raising into Gradio.
            if self.chrome_process is None:
                # Popen itself failed, so nothing will ever use the log.
                self._discard_stderr_log()
            return f"❌ Failed to start Chrome: {str(e)}"

    def get_driver(self):
        """Return a WebDriver attached to the already-running Chrome.

        Raises:
            RuntimeError: If the debugging port is not reachable.
        """
        if not self.is_chrome_accessible():
            raise RuntimeError("Chrome is not running or not accessible")

        chrome_options = Options()
        chrome_options.add_experimental_option(
            "debuggerAddress", f"{DEBUG_HOST}:{DEBUG_PORT}"
        )
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Resolve the chromedriver binary once and reuse the path afterwards,
        # instead of invoking the driver manager on every action.
        if self._driver_path is None:
            self._driver_path = ChromeDriverManager().install()
        service = Service(self._driver_path)
        return webdriver.Chrome(service=service, options=chrome_options)

    @staticmethod
    def _wait_for_page_load(driver, timeout: float = 10) -> None:
        """Block until ``document.readyState`` is ``"complete"`` or time out."""
        WebDriverWait(driver, timeout).until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )

    def navigate_to_url(self, url: str) -> str:
        """Navigate the browser to ``url`` and report the resulting page title.

        Surrounding whitespace is removed and ``https://`` is prepended when
        the URL carries no http(s) scheme.

        Returns:
            A human-readable status message for the UI.
        """
        # Strip before inspecting the scheme, so padded input is neither
        # double-prefixed nor passed on with stray whitespace.
        url = (url or "").strip()
        if not url:
            return "❌ Please enter a valid URL"

        if not self.is_chrome_accessible():
            return "❌ Chrome is not running. Please start Chrome first."

        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url

        driver = None
        try:
            driver = self.get_driver()
            driver.set_page_load_timeout(20)
            driver.get(url)
            self._wait_for_page_load(driver)
            page_title = driver.title or "Unknown"
            return f"🧭 Navigated to: {url}\n📄 Page title: {page_title}"
        except Exception as e:
            return f"❌ Error navigating to URL: {str(e)}"
        finally:
            if driver:
                driver.quit()

    def take_screenshot(self) -> Tuple[Optional[Image.Image], str]:
        """Capture the current page as a PIL image.

        If the browser is still on the initial ``data:,`` page, Google is
        loaded first so there is something to capture.

        Returns:
            ``(image, message)``; ``image`` is None when the capture failed.
        """
        if not self.is_chrome_accessible():
            return None, "❌ Chrome is not running. Please start Chrome first."

        driver = None
        try:
            driver = self.get_driver()
            current_url = driver.current_url
            if not current_url or current_url == "data:,":
                driver.get("https://www.google.com")
                self._wait_for_page_load(driver)

            screenshot_data = driver.get_screenshot_as_png()
            screenshot_image = Image.open(io.BytesIO(screenshot_data))

            page_title = driver.title or "Unknown"
            page_url = driver.current_url or "Unknown"

            return screenshot_image, f"📸 Screenshot captured!\n📄 Page: {page_title}\n🔗 URL: {page_url}"
        except Exception as e:
            return None, f"❌ Error taking screenshot: {str(e)}"
        finally:
            if driver:
                driver.quit()

    def execute_javascript(self, js_code: str) -> str:
        """Run ``js_code`` in the current page and report its result.

        Returns:
            A status message; the stringified result is cut to 500 characters.
        """
        if not js_code or not js_code.strip():
            return "❌ Please enter JavaScript code to execute"

        if not self.is_chrome_accessible():
            return "❌ Chrome is not running. Please start Chrome first."

        driver = None
        try:
            driver = self.get_driver()
            result = driver.execute_script(js_code.strip())
            return f"✅ JavaScript executed successfully\nResult: {str(result)[:500]}"
        except Exception as e:
            return f"❌ Error executing JavaScript: {str(e)}"
        finally:
            if driver:
                driver.quit()

    def kill_chrome(self) -> str:
        """Terminate every process whose name contains "chrome".

        The process launched by ``start_chrome`` is additionally killed via
        its process group and reaped.

        Returns:
            A status message with the number of processes signalled.
        """
        self.is_running = False
        killed_processes = 0

        try:
            for proc in psutil.process_iter(['pid', 'name']):
                # The name can be None when it could not be read; treat that
                # as "not Chrome" rather than aborting the whole scan.
                name = proc.info.get('name') or ''
                if 'chrome' not in name.lower():
                    continue
                try:
                    proc.terminate()
                    killed_processes += 1
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except psutil.Error:
            # Best effort: a failed scan must not prevent the cleanup below.
            pass

        child = self.chrome_process
        if child is not None:
            try:
                if hasattr(os, 'killpg'):
                    os.killpg(os.getpgid(child.pid), signal.SIGKILL)
                else:
                    child.kill()
            except OSError:
                # The process (group) is already gone or cannot be signalled.
                pass
            try:
                # Reap the child so it does not linger as a zombie.
                child.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
            self.chrome_process = None

        self._discard_stderr_log()

        time.sleep(1)
        return f"🛑 Chrome processes terminated ({killed_processes} killed)"

    def get_status(self) -> str:
        """Report whether the debugging port is currently reachable."""
        if self.is_chrome_accessible():
            return f"✅ Chrome is running and accessible on port {DEBUG_PORT}"
        else:
            return "⭕ Chrome is not running or not accessible"


chrome_controller = SimpleChromeController()


def create_interface():
    """Build the Gradio Blocks UI and wire it to ``chrome_controller``."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Chrome Controller") as demo:
        gr.Markdown("# 🌐 Chrome Controller Pro")
        gr.Markdown("Control a headless Chrome instance for advanced web automation.")

        with gr.Tabs():
            with gr.TabItem("🚀 Control & Status"):
                with gr.Row():
                    start_btn = gr.Button("🚀 Start Chrome", variant="primary", scale=1)
                    stop_btn = gr.Button("🛑 Stop Chrome", variant="stop", scale=1)
                    status_btn = gr.Button("📊 Check Status", scale=1)
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    lines=5,
                    value="Chrome is not running. Click 'Start Chrome' to begin."
                )

            with gr.TabItem("🧭 Navigate & Screenshot"):
                with gr.Row():
                    url_input = gr.Textbox(label="URL", placeholder="e.g., google.com", scale=4)
                    navigate_btn = gr.Button("🧭 Navigate", variant="primary", scale=1)
                screenshot_btn = gr.Button("📸 Take Screenshot", variant="secondary")
                screenshot_output = gr.Image(label="Screenshot", type="pil", height=500)
                screenshot_status = gr.Textbox(label="Action Status", interactive=False)

            with gr.TabItem("⚡ Execute JavaScript"):
                js_input = gr.Textbox(
                    label="JavaScript Code",
                    lines=8,
                    value="// Example: return document.title;"
                )
                execute_btn = gr.Button("⚡ Execute JS", variant="primary")
                js_output = gr.Textbox(label="JS Execution Result", interactive=False, lines=8)

        # Event Handlers
        start_btn.click(chrome_controller.start_chrome, outputs=status_output)
        stop_btn.click(chrome_controller.kill_chrome, outputs=status_output)
        status_btn.click(chrome_controller.get_status, outputs=status_output)

        navigate_btn.click(
            chrome_controller.navigate_to_url,
            inputs=url_input,
            outputs=screenshot_status
        )
        screenshot_btn.click(
            chrome_controller.take_screenshot,
            outputs=[screenshot_output, screenshot_status]
        )
        execute_btn.click(
            chrome_controller.execute_javascript,
            inputs=js_input,
            outputs=js_output
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        # NOTE(review): share=True requests a public tunnel URL. This app lets
        # any visitor navigate and run JavaScript in a --no-sandbox browser,
        # and Gradio is understood to ignore the flag on Hugging Face Spaces -
        # confirm it is really wanted before deploying elsewhere.
        share=True,
    )