|
|
import gradio as gr |
|
|
import subprocess |
|
|
import time |
|
|
import os |
|
|
from selenium import webdriver |
|
|
from selenium.webdriver.chrome.options import Options |
|
|
from selenium.webdriver.chrome.service import Service |
|
|
from webdriver_manager.chrome import ChromeDriverManager |
|
|
from selenium.webdriver.support.ui import WebDriverWait |
|
|
import psutil |
|
|
import socket |
|
|
from pathlib import Path |
|
|
from PIL import Image |
|
|
import io |
|
|
|
|
|
class SimpleChromeController:
    """Launch, stop and drive one headless Chrome through its DevTools port.

    Chrome is started as a child process with remote debugging enabled; each
    browser action (navigate, screenshot, run JavaScript) attaches a
    short-lived Selenium WebDriver to that port and detaches afterwards.

    The action methods report problems by returning a status string (or an
    ``(image, status)`` pair for screenshots) instead of raising.
    """

    # NOTE(review): the leading glyphs in the status strings below look like
    # mis-decoded emoji. They are kept exactly as found; confirm the intended
    # characters against the file's real encoding before changing them.

    # Endpoint Chrome exposes for remote debugging and that Selenium attaches to.
    DEBUG_HOST = "127.0.0.1"
    DEBUG_PORT = 9222

    # Directories handed to Chrome on the command line.
    PROFILE_DIR = "/app/chrome_profile"
    CRASH_DIR = "/app/chrome_crashes"

    # Startup polling: number of probes and the pause before each one.
    STARTUP_ATTEMPTS = 15
    STARTUP_POLL_SECONDS = 2

    def __init__(self):
        """Initialise state and create the directories Chrome needs."""
        self.chrome_process = None  # subprocess.Popen of the launched Chrome
        self.is_running = False     # set once the debugging port answered
        self._driver_path = None    # chromedriver path, resolved on first use
        self.setup_directories()

    def setup_directories(self):
        """Create the directories Chrome writes to, on a best-effort basis.

        Each directory is created (with parents) and chmod-ed to 0o755.
        A directory that cannot be created or chmod-ed is skipped silently.
        """
        home_dir = Path.home()
        directories = [
            Path(self.PROFILE_DIR),
            Path(self.CRASH_DIR),
            home_dir / '.local' / 'share' / 'applications',
            home_dir / '.config',
        ]

        for path in directories:
            try:
                path.mkdir(parents=True, exist_ok=True)
                os.chmod(path, 0o755)
            except OSError:
                # PermissionError is an OSError subclass; setup is best effort.
                continue

    def get_chrome_args(self):
        """Return the Chrome command line (executable first) as a list."""
        return [
            "google-chrome",
            f"--remote-debugging-port={self.DEBUG_PORT}",
            f"--user-data-dir={self.PROFILE_DIR}",
            f"--crash-dumps-dir={self.CRASH_DIR}",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-software-rasterizer",
            "--disable-background-timer-throttling",
            "--disable-renderer-backgrounding",
            "--disable-backgrounding-occluded-windows",
            "--disable-features=TranslateUI,BlinkGenPropertyTrees",
            "--disable-ipc-flooding-protection",
            "--disable-default-apps",
            "--disable-extensions",
            "--disable-component-extensions-with-background-pages",
            "--disable-background-networking",
            "--disable-sync",
            "--disable-translate",
            "--hide-scrollbars",
            "--mute-audio",
            "--no-first-run",
            "--disable-notifications",
            "--disable-popup-blocking",
            "--disable-prompt-on-repost",
            "--disable-hang-monitor",
            "--disable-logging",
            "--disable-login-animations",
            "--disable-modal-animations",
            "--disable-infobars",
            "--headless=new",
            "--window-size=1920,1080",
            "--virtual-time-budget=25000",
            "--disable-crash-reporter",
            "--disable-crashpad",
            "--disable-breakpad",
            "--no-crash-upload",
            "--single-process",
        ]

    def is_chrome_accessible(self):
        """Return True if a TCP connection to the debugging port succeeds."""
        try:
            with socket.create_connection(
                (self.DEBUG_HOST, self.DEBUG_PORT), timeout=2
            ):
                return True
        except OSError:
            # Covers timeout and refusal (both OSError subclasses) as well as
            # other socket failures such as an unreachable network.
            return False

    def start_chrome(self):
        """Start Chrome and wait for its debugging port to come up.

        Any previously running Chrome is killed first. The port is probed up
        to ``STARTUP_ATTEMPTS`` times, ``STARTUP_POLL_SECONDS`` apart.

        Returns:
            A status string describing success, failure (including up to 300
            characters of Chrome's stderr) or an inconclusive start.
        """
        import tempfile

        if self.is_running and self.is_chrome_accessible():
            return "β οΈ Chrome is already running"

        self.kill_chrome()

        try:
            # stderr goes to an anonymous temp file rather than a pipe: nobody
            # drains a pipe while Chrome runs, so a full pipe would block it.
            with tempfile.TemporaryFile() as stderr_log:
                self.chrome_process = subprocess.Popen(
                    self.get_chrome_args(),
                    stdout=subprocess.DEVNULL,
                    stderr=stderr_log,
                    # Own session/process group so kill_chrome can signal the
                    # whole group; replaces preexec_fn, which is not safe in
                    # threaded programs.
                    start_new_session=True,
                )

                for attempt in range(1, self.STARTUP_ATTEMPTS + 1):
                    time.sleep(self.STARTUP_POLL_SECONDS)

                    if self.is_chrome_accessible():
                        self.is_running = True
                        return (
                            "β Chrome started successfully "
                            f"(attempt {attempt}/{self.STARTUP_ATTEMPTS})"
                        )

                    if self.chrome_process.poll() is not None:
                        # Chrome exited during startup: surface its stderr.
                        stderr_log.seek(0)
                        error_msg = stderr_log.read().decode(
                            'utf-8', errors='ignore'
                        )[:300]
                        return f"β Chrome failed to start: {error_msg}"

                return "β οΈ Chrome may have started but debugging port not accessible. Try taking a screenshot to test."

        except Exception as e:  # UI boundary: report the failure, do not raise
            return f"β Failed to start Chrome: {str(e)}"

    def get_driver(self):
        """Return a WebDriver attached to the already-running Chrome.

        Raises:
            RuntimeError: if the debugging port is not reachable.
        """
        if not self.is_chrome_accessible():
            raise RuntimeError("Chrome is not running or not accessible")

        chrome_options = Options()
        chrome_options.add_experimental_option(
            "debuggerAddress", f"{self.DEBUG_HOST}:{self.DEBUG_PORT}"
        )
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Resolve chromedriver once; later actions reuse the same path.
        if self._driver_path is None:
            self._driver_path = ChromeDriverManager().install()

        service = Service(self._driver_path)
        return webdriver.Chrome(service=service, options=chrome_options)

    @staticmethod
    def _normalize_url(url):
        """Strip surrounding whitespace and default the scheme to https://."""
        url = url.strip()
        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url
        return url

    def navigate_to_url(self, url):
        """Load ``url`` in Chrome and wait for the document to finish loading.

        Args:
            url: Address to open; ``https://`` is prepended when no http(s)
                scheme is present.

        Returns:
            A status string with the final URL and page title, or an error.
        """
        if not url or not url.strip():
            return "β Please enter a valid URL"

        if not self.is_chrome_accessible():
            return "β Chrome is not running. Please start Chrome first."

        # Strip before the scheme check so padded input is not double-prefixed.
        url = self._normalize_url(url)

        driver = None
        try:
            driver = self.get_driver()
            driver.set_page_load_timeout(20)
            driver.get(url)

            WebDriverWait(driver, 10).until(
                lambda d: d.execute_script("return document.readyState") == "complete"
            )

            page_title = driver.title or "Unknown"
            return f"π§ Navigated to: {url}\nπ Page title: {page_title}"

        except Exception as e:  # UI boundary: report the failure, do not raise
            return f"β Error navigating to URL: {str(e)}"
        finally:
            if driver is not None:
                driver.quit()

    def take_screenshot(self):
        """Capture the current page as a PIL image.

        If the browser has no page loaded yet (empty URL or ``data:,``),
        https://www.google.com is opened first.

        Returns:
            ``(image, status)``; ``image`` is None when the capture failed.
        """
        if not self.is_chrome_accessible():
            return None, "β Chrome is not running. Please start Chrome first."

        driver = None
        try:
            driver = self.get_driver()

            current_url = driver.current_url
            if not current_url or current_url == "data:,":
                driver.get("https://www.google.com")
                WebDriverWait(driver, 10).until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )

            screenshot_data = driver.get_screenshot_as_png()
            screenshot_image = Image.open(io.BytesIO(screenshot_data))

            page_title = driver.title or "Unknown"
            page_url = driver.current_url or "Unknown"

            return screenshot_image, f"πΈ Screenshot captured!\nπ Page: {page_title}\nπ URL: {page_url}"

        except Exception as e:  # UI boundary: report the failure, do not raise
            return None, f"β Error taking screenshot: {str(e)}"
        finally:
            if driver is not None:
                driver.quit()

    def execute_javascript(self, js_code):
        """Run ``js_code`` in the current page.

        Args:
            js_code: Script source, passed (stripped) to ``execute_script``.

        Returns:
            A status string containing up to 500 characters of the result,
            or an error description.
        """
        if not js_code or not js_code.strip():
            return "β Please enter JavaScript code to execute"

        if not self.is_chrome_accessible():
            return "β Chrome is not running. Please start Chrome first."

        driver = None
        try:
            driver = self.get_driver()
            result = driver.execute_script(js_code.strip())
            return f"β JavaScript executed successfully\nResult: {str(result)[:500]}"

        except Exception as e:  # UI boundary: report the failure, do not raise
            return f"β Error executing JavaScript: {str(e)}"
        finally:
            if driver is not None:
                driver.quit()

    def kill_chrome(self):
        """Terminate every process whose name contains "chrome".

        Also force-kills the process group of the Chrome started by this
        controller, then pauses one second.

        Returns:
            A status string with the number of processes that were sent a
            terminate request.
        """
        import signal

        self.is_running = False
        killed_processes = 0

        try:
            for proc in psutil.process_iter(['pid', 'name']):
                # The name is None when psutil could not read it; treat that
                # as "not chrome" instead of aborting the whole sweep.
                name = proc.info.get('name') or ''
                if 'chrome' not in name.lower():
                    continue
                try:
                    proc.terminate()
                    killed_processes += 1
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except (psutil.Error, OSError):
            # Best effort: an enumeration failure must not block shutdown.
            pass

        if self.chrome_process:
            # Only signal while the child is un-reaped; afterwards its PID may
            # belong to an unrelated process.
            if self.chrome_process.poll() is None:
                try:
                    os.killpg(
                        os.getpgid(self.chrome_process.pid), signal.SIGKILL
                    )
                except (OSError, AttributeError):
                    # Already gone, not permitted, or killpg/SIGKILL are not
                    # available on this platform.
                    pass
                try:
                    # Reap the child so it does not linger as a zombie.
                    self.chrome_process.wait(timeout=5)
                except (subprocess.TimeoutExpired, OSError):
                    pass
            self.chrome_process = None

        time.sleep(1)

        return f"π Chrome processes terminated ({killed_processes} killed)"

    def get_status(self):
        """Return a status string saying whether the debugging port answers."""
        if self.is_chrome_accessible():
            return f"β Chrome is running and accessible on port {self.DEBUG_PORT}"
        return "β Chrome is not running or not accessible"
|
|
|
|
|
# Module-level controller instance referenced by the UI callbacks. Constructing
# it runs SimpleChromeController.__init__, which calls setup_directories().
chrome_controller = SimpleChromeController()
|
|
|
|
|
def create_interface():
    """Assemble the Gradio Blocks UI and bind it to ``chrome_controller``.

    The UI has three tabs: process control/status, navigation plus
    screenshot, and JavaScript execution. The Blocks object is returned
    without being launched.
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Chrome Controller") as app:
        gr.Markdown("# π Chrome Controller Pro")
        gr.Markdown("Control a headless Chrome instance for advanced web automation.")

        with gr.Tabs():
            # Tab 1: start / stop / status of the Chrome process.
            with gr.TabItem("π Control & Status"):
                with gr.Row():
                    launch_button = gr.Button("π Start Chrome", variant="primary", scale=1)
                    halt_button = gr.Button("π Stop Chrome", variant="stop", scale=1)
                    probe_button = gr.Button("π Check Status", scale=1)

                status_box = gr.Textbox(
                    label="Status",
                    interactive=False,
                    lines=5,
                    value="Chrome is not running. Click 'Start Chrome' to begin.",
                )

            # Tab 2: open a URL and capture the rendered page.
            with gr.TabItem("π§ Navigate & Screenshot"):
                with gr.Row():
                    address_box = gr.Textbox(label="URL", placeholder="e.g., google.com", scale=4)
                    go_button = gr.Button("π§ Navigate", variant="primary", scale=1)

                capture_button = gr.Button("πΈ Take Screenshot", variant="secondary")

                capture_view = gr.Image(label="Screenshot", type="pil", height=500)
                action_box = gr.Textbox(label="Action Status", interactive=False)

            # Tab 3: run a script in the current page.
            with gr.TabItem("β‘ Execute JavaScript"):
                script_box = gr.Textbox(
                    label="JavaScript Code",
                    lines=8,
                    value="// Example: return document.title;",
                )
                run_button = gr.Button("β‘ Execute JS", variant="primary")
                script_result_box = gr.Textbox(label="JS Execution Result", interactive=False, lines=8)

        # The three process-control buttons all report into the status box.
        status_actions = (
            (launch_button, chrome_controller.start_chrome),
            (halt_button, chrome_controller.kill_chrome),
            (probe_button, chrome_controller.get_status),
        )
        for button, handler in status_actions:
            button.click(handler, outputs=status_box)

        go_button.click(
            chrome_controller.navigate_to_url,
            inputs=address_box,
            outputs=action_box,
        )
        capture_button.click(
            chrome_controller.take_screenshot,
            outputs=[capture_view, action_box],
        )
        run_button.click(
            chrome_controller.execute_javascript,
            inputs=script_box,
            outputs=script_result_box,
        )

    return app
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all interfaces at
    # port 7860 with a public Gradio share link.
    # NOTE(review): this exposes browser control and JavaScript execution to
    # anyone who can reach the server or the share link - confirm intended.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    demo = create_interface()
    demo.launch(**launch_options)