#!/usr/bin/env python3
"""Flask service that drives headless Chrome through an hCaptcha widget.

The module exposes three HTTP endpoints (``/``, ``/health``, ``/solve``).
``/solve`` opens the requested page in a Selenium-controlled Chrome, clicks
the hCaptcha checkbox, and, when a challenge appears, attempts it either by
CLIP-based image classification (image grid) or by Hough-circle detection
(canvas puzzle).

Importing this module loads the CLIP model, which is slow and may download
weights.
"""
import json
import os
import random
import re
import shutil
import subprocess
import tempfile
import time
import traceback
import zipfile
from io import BytesIO
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

import cv2
import numpy as np
import requests
import torch
from flask import Flask, request, jsonify
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from transformers import CLIPProcessor, CLIPModel

app = Flask(__name__)

print("šŸ”„ Loading CLIP model...")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
print("āœ… Model ready")

TMPFILES_HOST = "https://tmpfiles.org/api/v1/upload"

# CSS selector matching every iframe whose src mentions hcaptcha.
_HCAPTCHA_IFRAME_SELECTOR = "iframe[src*='hcaptcha']"


def upload_to_tmpfiles(image_path: str) -> Optional[str]:
    """Upload the file at *image_path* to tmpfiles.org.

    Returns:
        The direct-download URL, or ``None`` when the upload fails, the
        server answers with a non-200 status, or the response carries no URL.
    """
    try:
        with open(image_path, 'rb') as f:
            response = requests.post(TMPFILES_HOST, files={'file': f}, timeout=10)
        if response.status_code != 200:
            return None
        url = response.json().get('data', {}).get('url', '')
        if not url:
            return None
        # The API returns a landing-page URL; the "/dl/" variant serves the file.
        return url.replace('tmpfiles.org/', 'tmpfiles.org/dl/')
    except Exception as e:
        # Best-effort: screenshots are diagnostics only, never fail the solve.
        print(f"Upload error: {e}")
        return None


def get_chrome_version() -> Tuple[str, str]:
    """Return ``(full_version, major_version)`` of the installed google-chrome.

    Both values are ``"unknown"`` when the binary is missing, times out, or
    prints nothing parseable.
    """
    try:
        result = subprocess.run(
            ['google-chrome', '--version'],
            capture_output=True,
            text=True,
            timeout=10,
        )
        version = result.stdout.strip().split()[-1]
        return version, version.split('.')[0]
    except (OSError, subprocess.SubprocessError, IndexError):
        return "unknown", "unknown"


def download_chromedriver() -> str:
    """Ensure a ChromeDriver binary exists under ``~/.chromedriver``.

    An existing binary is reused as-is. Otherwise the release matching the
    installed Chrome major version is downloaded (linux64 build) and unpacked.

    Returns:
        Path to the chromedriver executable.

    Raises:
        Exception: any download, HTTP-status, or extraction failure is logged
            and re-raised.
    """
    full_version, major_version = get_chrome_version()
    print(f"šŸ” Chrome version: {full_version}")

    driver_dir = os.path.expanduser("~/.chromedriver")
    driver_path = os.path.join(driver_dir, "chromedriver")

    if os.path.exists(driver_path):
        print(f"āœ… ChromeDriver exists: {driver_path}")
        return driver_path

    os.makedirs(driver_dir, exist_ok=True)

    try:
        url = f"https://googlechromelabs.github.io/chrome-for-testing/LATEST_RELEASE_{major_version}"
        response = requests.get(url, timeout=10)
        # Without this, an error page body would be used as the version string.
        response.raise_for_status()
        driver_version = response.text.strip()
        print(f"šŸ” ChromeDriver version: {driver_version}")

        download_url = f"https://storage.googleapis.com/chrome-for-testing-public/{driver_version}/linux64/chromedriver-linux64.zip"
        print("ā¬‡ļø Downloading...")

        zip_path = os.path.join(driver_dir, "chromedriver.zip")
        response = requests.get(download_url, timeout=60)
        response.raise_for_status()
        with open(zip_path, 'wb') as f:
            f.write(response.content)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(driver_dir)
        os.remove(zip_path)

        extracted_folder = os.path.join(driver_dir, "chromedriver-linux64")
        extracted_driver = os.path.join(extracted_folder, "chromedriver")
        if not os.path.exists(extracted_driver):
            raise FileNotFoundError(f"chromedriver not found in archive: {extracted_driver}")
        os.rename(extracted_driver, driver_path)
        os.chmod(driver_path, 0o755)

        if os.path.exists(extracted_folder):
            shutil.rmtree(extracted_folder)

        print(f"āœ… ChromeDriver installed: {driver_path}")
        return driver_path
    except Exception as e:
        print(f"āŒ Download failed: {e}")
        raise


def human_move_to_element(driver, element):
    """Queue a jittered multi-step pointer path towards *element* and run it.

    The path starts from a randomly chosen point, not the pointer's actual
    position, and is built from relative ``move_by_offset`` steps.
    """
    action = ActionChains(driver)
    current_x = random.randint(100, 500)
    current_y = random.randint(100, 500)
    target_x = element.location['x'] + element.size['width'] / 2
    target_y = element.location['y'] + element.size['height'] / 2

    steps = random.randint(15, 30)
    for i in range(steps):
        progress = i / steps
        noise_x = random.uniform(-3, 3)
        noise_y = random.uniform(-3, 3)
        intermediate_x = current_x + (target_x - current_x) * progress + noise_x
        intermediate_y = current_y + (target_y - current_y) * progress + noise_y
        action.move_by_offset(intermediate_x - current_x, intermediate_y - current_y)
        current_x = intermediate_x
        current_y = intermediate_y
        time.sleep(random.uniform(0.001, 0.005))

    action.perform()
    time.sleep(random.uniform(0.1, 0.3))


def human_click(driver, element):
    """Move the pointer towards *element*, then click it with short pauses."""
    human_move_to_element(driver, element)
    time.sleep(random.uniform(0.05, 0.15))
    element.click()
    time.sleep(random.uniform(0.1, 0.2))


def create_driver():
    """Create and return a headless Chrome WebDriver with a 1920x1080 window."""
    print("šŸ”„ Initializing ChromeDriver...")
    driver_path = download_chromedriver()

    options = Options()
    for argument in (
        '--headless=new',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--window-size=1920,1080',
        '--disable-blink-features=AutomationControlled',
        '--disable-web-security',
        '--disable-features=IsolateOrigins,site-per-process',
        '--allow-running-insecure-content',
        '--disable-setuid-sandbox',
        '--disable-software-rasterizer',
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    ):
        options.add_argument(argument)
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)
    options.set_capability('goog:loggingPrefs', {'browser': 'ALL'})

    service = Service(driver_path)
    service.log_path = '/dev/null'

    driver = webdriver.Chrome(service=service, options=options)
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
    driver.set_window_size(1920, 1080)
    print("āœ… Driver ready")
    return driver


def solve_image_with_ai(image: Image.Image, target: str) -> float:
    """Return CLIP's probability that *image* shows *target*.

    The image is scored against two prompts, ``"a photo of {target}"`` and
    ``"other objects"``; the softmax weight of the first prompt is returned.
    """
    inputs = clip_processor(
        text=[f"a photo of {target}", "other objects"],
        images=image,
        return_tensors="pt",
        padding=True,
    )
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        outputs = clip_model(**inputs)
    probs = outputs.logits_per_image.softmax(dim=1)
    return probs[0][0].item()


def detect_puzzle_points(image: Image.Image) -> List[Dict]:
    """Detect circles in *image* and return them sorted top-to-bottom, left-to-right.

    Each entry is ``{'number', 'x', 'y', 'radius'}``. ``number`` is the
    1-based detection index assigned before sorting, so it does not follow
    the order of the returned list. An empty list means nothing was detected.
    """
    # Normalise to 3-channel RGB so the colour conversion sees a known layout.
    img_array = np.array(image.convert("RGB"))
    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
    circles = cv2.HoughCircles(
        gray,
        cv2.HOUGH_GRADIENT,
        dp=1,
        minDist=30,
        param1=50,
        param2=30,
        minRadius=8,
        maxRadius=30,
    )

    if circles is None:
        return []

    circles = np.uint16(np.around(circles))
    points = [
        {'number': i + 1, 'x': int(x), 'y': int(y), 'radius': int(r)}
        for i, (x, y, r) in enumerate(circles[0, :])
    ]
    points.sort(key=lambda p: (p['y'], p['x']))
    return points


def solve_puzzle_captcha(driver, canvas_element) -> bool:
    """Drag the pointer through the circles detected on *canvas_element*.

    Returns:
        ``True`` when a drag gesture was performed, ``False`` when no points
        were detected or any step raised.
    """
    try:
        # screenshot_as_png is a property that yields bytes, not a method.
        png = canvas_element.screenshot_as_png
        img = Image.open(BytesIO(png))
        points = detect_puzzle_points(img)

        if not points:
            return False

        print(f"āœ… Detected {len(points)} puzzle points")

        # Offsets are relative to the element centre, hence the half-size shift.
        half_width = canvas_element.size['width'] / 2
        half_height = canvas_element.size['height'] / 2

        action = ActionChains(driver)
        start_point = points[0]
        action.move_to_element_with_offset(
            canvas_element,
            start_point['x'] - half_width,
            start_point['y'] - half_height,
        )
        action.click_and_hold()

        for point in points[1:]:
            action.move_to_element_with_offset(
                canvas_element,
                point['x'] - half_width,
                point['y'] - half_height,
            )
            time.sleep(random.uniform(0.1, 0.3))

        action.release()
        action.perform()
        time.sleep(1)
        return True
    except Exception as e:
        print(f"āŒ Puzzle error: {e}")
        return False


def _singularize(word: str) -> str:
    """Drop a single plural 's' from *word*; words ending in 'ss' are kept."""
    if word.endswith('s') and not word.endswith('ss'):
        return word[:-1]
    return word


def extract_challenge_info(driver) -> Optional[Dict]:
    """Read the prompt and tile images of an image-grid challenge.

    Returns:
        ``{'target', 'images', 'count', 'elements'}``, where ``target`` is
        the lower-cased word after "Select all" (``"unknown"`` when the
        prompt does not match), or ``None`` when anything raises.
    """
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "prompt-text"))
        )
        prompt_text = driver.find_element(By.CLASS_NAME, "prompt-text").text
        match = re.search(r'Select all (\w+)', prompt_text, re.IGNORECASE)
        target = _singularize(match.group(1).lower()) if match else "unknown"

        img_elements = driver.find_elements(By.CSS_SELECTOR, ".task-image")
        # screenshot_as_png is a property that yields bytes, not a method.
        images = [Image.open(BytesIO(elem.screenshot_as_png)) for elem in img_elements]

        return {
            'target': target,
            'images': images,
            'count': len(images),
            'elements': img_elements,
        }
    except Exception as e:
        print(f"āŒ Extract error: {e}")
        return None


def solve_grid_captcha(driver, challenge_info) -> bool:
    """Click every tile scored above 0.55 for the target, then submit.

    Returns:
        ``True`` after submitting, ``False`` when no tile passes the
        threshold or any step raises.
    """
    try:
        selected_indices = []
        for idx, img in enumerate(challenge_info['images']):
            confidence = solve_image_with_ai(img, challenge_info['target'])
            if confidence > 0.55:
                selected_indices.append(idx)
                print(f"āœ… Image {idx}: {confidence:.2%}")
            else:
                print(f"ā­ļø Image {idx}: {confidence:.2%}")

        if not selected_indices:
            return False

        elements = challenge_info['elements']
        for idx in selected_indices:
            if idx < len(elements):
                human_click(driver, elements[idx])

        submit_btn = driver.find_element(By.CSS_SELECTOR, ".button-submit")
        human_click(driver, submit_btn)
        time.sleep(2)
        return True
    except Exception as e:
        print(f"āŒ Grid error: {e}")
        return False


def detect_challenge_type(driver) -> str:
    """Return ``"puzzle"``, ``"image_grid"`` or ``"unknown"`` for the current frame.

    A ``canvas`` element takes precedence over ``.task-image`` elements.
    """
    try:
        if driver.find_elements(By.CSS_SELECTOR, "canvas"):
            return "puzzle"
        if driver.find_elements(By.CSS_SELECTOR, ".task-image"):
            return "image_grid"
        return "unknown"
    except Exception:
        return "unknown"


def screenshot_and_upload(element) -> Optional[str]:
    """Screenshot *element*, upload it via :func:`upload_to_tmpfiles`, return the URL.

    The temporary PNG is removed afterwards. Returns ``None`` on any failure.
    """
    temp_path = None
    try:
        # screenshot_as_png is a property that yields bytes, not a method.
        png = element.screenshot_as_png
        with tempfile.NamedTemporaryFile(prefix="hcaptcha_", suffix=".png", delete=False) as f:
            temp_path = f.name
            f.write(png)
        return upload_to_tmpfiles(temp_path)
    except Exception as e:
        print(f"āŒ Screenshot error: {e}")
        return None
    finally:
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass


def _find_hcaptcha_iframe(driver, keyword: str):
    """Return the first hCaptcha iframe whose ``src`` contains *keyword*, else ``None``."""
    for iframe in driver.find_elements(By.CSS_SELECTOR, _HCAPTCHA_IFRAME_SELECTOR):
        # get_attribute may return None for an iframe without a src.
        if keyword in (iframe.get_attribute('src') or ''):
            return iframe
    return None


def _record_screenshot(element, kind: str, screenshot_urls: List[Dict]) -> None:
    """Upload a screenshot of *element* and append ``{'type', 'url'}`` on success."""
    screenshot_url = screenshot_and_upload(element)
    if screenshot_url:
        screenshot_urls.append({'type': kind, 'url': screenshot_url})


def solve_hcaptcha(sitekey: str, url: str) -> Dict:
    """Open *url*, click the hCaptcha checkbox and attempt any challenge shown.

    Args:
        sitekey: accepted for API compatibility; not used by this function.
        url: page to load in the browser.

    Returns:
        A dict with ``'success'`` plus either ``'token'`` (and ``'method'``
        or ``'challenge_type'``) or ``'error'``. Most results also carry
        ``'screenshots'``, and unexpected exceptions add ``'error_detail'``
        with the traceback. The driver is always quit before returning.
    """
    driver = None
    screenshot_urls = []

    try:
        driver = create_driver()
        driver.get(url)
        print(f"āœ… Opened: {url}")
        time.sleep(random.uniform(2, 4))

        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, _HCAPTCHA_IFRAME_SELECTOR))
        )

        checkbox_iframe = _find_hcaptcha_iframe(driver, 'checkbox')
        if not checkbox_iframe:
            return {'success': False, 'error': 'Checkbox not found'}

        _record_screenshot(checkbox_iframe, 'checkbox', screenshot_urls)

        driver.switch_to.frame(checkbox_iframe)
        checkbox = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "checkbox"))
        )
        human_click(driver, checkbox)
        print("āœ… Checkbox clicked")
        driver.switch_to.default_content()
        time.sleep(random.uniform(2, 4))

        challenge_iframe = _find_hcaptcha_iframe(driver, 'challenge')
        if not challenge_iframe:
            # No challenge frame: the checkbox alone may have produced a token.
            token = extract_token(driver)
            if token:
                return {
                    'success': True,
                    'token': token,
                    'method': 'checkbox_only',
                    'screenshots': screenshot_urls,
                }
            return {'success': False, 'error': 'No token', 'screenshots': screenshot_urls}

        _record_screenshot(challenge_iframe, 'challenge', screenshot_urls)

        driver.switch_to.frame(challenge_iframe)
        challenge_type = detect_challenge_type(driver)
        print(f"šŸŽÆ Challenge: {challenge_type}")

        # Early returns below skip switching frames back: the driver is quit
        # in the finally clause regardless.
        if challenge_type == "puzzle":
            canvas = driver.find_element(By.CSS_SELECTOR, "canvas")
            if not solve_puzzle_captcha(driver, canvas):
                return {'success': False, 'error': 'Puzzle failed', 'screenshots': screenshot_urls}
        elif challenge_type == "image_grid":
            challenge_info = extract_challenge_info(driver)
            if not challenge_info:
                return {'success': False, 'error': 'Extract failed', 'screenshots': screenshot_urls}
            print(f"šŸŽÆ Target: {challenge_info['target']}")
            if not solve_grid_captcha(driver, challenge_info):
                return {'success': False, 'error': 'Grid failed', 'screenshots': screenshot_urls}
        else:
            return {
                'success': False,
                'error': f'Unknown: {challenge_type}',
                'screenshots': screenshot_urls,
            }

        # The response field lives in the top-level document, not the iframe.
        driver.switch_to.default_content()
        time.sleep(3)

        token = extract_token(driver)
        if token:
            return {
                'success': True,
                'token': token,
                'challenge_type': challenge_type,
                'screenshots': screenshot_urls,
            }
        return {'success': False, 'error': 'No token', 'screenshots': screenshot_urls}

    except Exception as e:
        error_detail = traceback.format_exc()
        print(f"āŒ Error: {error_detail}")
        return {
            'success': False,
            'error': str(e),
            'error_detail': error_detail,
            'screenshots': screenshot_urls,
        }
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                # Best-effort shutdown; the result has already been decided.
                pass


def extract_token(driver) -> Optional[str]:
    """Return the response token from the page, or ``None`` if there is none.

    Waits two seconds, then checks the ``h-captcha-response`` fields followed
    by the ``g-recaptcha-response`` fields. A value counts as a token only
    when it is longer than 10 characters.
    """
    time.sleep(2)
    for field_name in ("h-captcha-response", "g-recaptcha-response"):
        try:
            # find_elements returns [] instead of raising, so a missing first
            # field no longer prevents the fallback field from being checked.
            for field in driver.find_elements(By.NAME, field_name):
                token = field.get_attribute("value")
                if token and len(token) > 10:
                    return token
        except Exception as e:
            print(f"Token lookup error ({field_name}): {e}")
    return None


@app.route('/solve', methods=['GET'])
def solve():
    """Handle ``GET /solve?sitekey=X&url=Y`` and return the solve result as JSON.

    Responds with HTTP 400 when a parameter is missing or *url* is not an
    absolute http(s) URL.
    """
    sitekey = request.args.get('sitekey')
    url = request.args.get('url')

    if not sitekey or not url:
        return jsonify({'success': False, 'error': 'Missing params'}), 400

    # The URL is caller-supplied and handed to a local browser; reject
    # file://, chrome:// and other non-web schemes.
    parsed = urlparse(url)
    if parsed.scheme not in ('http', 'https') or not parsed.netloc:
        return jsonify({'success': False, 'error': 'Invalid url'}), 400

    print(f"\n{'='*60}")
    print(f"šŸš€ Solving: {url}")
    print(f"{'='*60}\n")

    result = solve_hcaptcha(sitekey, url)
    return jsonify(result)


@app.route('/health', methods=['GET'])
def health():
    """Report service status, model name and the installed Chrome version."""
    full_version, _major_version = get_chrome_version()
    return jsonify({
        'status': 'online',
        'model': 'CLIP',
        'chrome': full_version,
    })


@app.route('/', methods=['GET'])
def root():
    """Describe the service and its endpoints."""
    return jsonify({
        'service': 'hCaptcha Solver',
        'version': '2.3',
        'endpoints': {
            '/solve': 'GET ?sitekey=X&url=Y',
            '/health': 'GET',
        },
    })


if __name__ == '__main__':
    print("\nšŸ¤– hCaptcha Solver API v2.3")
    print("🌐 http://0.0.0.0:7860\n")
    app.run(host='0.0.0.0', port=7860, debug=False)