| |
|
|
import json
import os
import random
import re
import subprocess
import tempfile
import time
import zipfile
from io import BytesIO
from typing import Dict, List, Optional
from urllib.parse import urlparse

import cv2
import numpy as np
import requests
import torch
from flask import Flask, request, jsonify
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from transformers import CLIPProcessor, CLIPModel
|
|
# Flask application object that the route decorators below attach to.
app = Flask(__name__)

# The CLIP processor and model are loaded once at import time and shared by
# every call to solve_image_with_ai.
print("π Loading CLIP model...")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
# This literal was split across two lines by a mis-encoded character.
print("✅ Model ready")

# Upload endpoint used by upload_to_tmpfiles.
TMPFILES_HOST = "https://tmpfiles.org/api/v1/upload"
|
|
def upload_to_tmpfiles(image_path: str) -> Optional[str]:
    """Upload a local file to ``TMPFILES_HOST`` and return a download link.

    Args:
        image_path: Path of the file to upload.

    Returns:
        The URL reported by the host with ``tmpfiles.org/`` rewritten to
        ``tmpfiles.org/dl/``, or ``None`` when the file cannot be read, the
        request fails, the status is not 200, or the response has no URL.
    """
    try:
        with open(image_path, 'rb') as f:
            response = requests.post(TMPFILES_HOST, files={'file': f}, timeout=10)

        if response.status_code != 200:
            return None

        page_url = response.json().get('data', {}).get('url', '')
        if not page_url:
            # A missing URL is a failure; do not hand back an empty string.
            return None
        return page_url.replace('tmpfiles.org/', 'tmpfiles.org/dl/')
    except Exception as e:
        # Best-effort helper: callers treat None as "no screenshot link".
        print(f"Upload error: {e}")
        return None
|
|
def get_chrome_version():
    """Return the local Chrome version as ``(full_version, major_version)``.

    Runs ``google-chrome --version`` and uses the last whitespace-separated
    token of its output as the full version; the major version is the part
    before the first dot. Both items are ``"unknown"`` when the command
    cannot be run, times out, or prints nothing.
    """
    try:
        result = subprocess.run(
            ['google-chrome', '--version'],
            capture_output=True,
            text=True,
            timeout=10,
        )
        version = result.stdout.strip().split()[-1]
    except (OSError, subprocess.SubprocessError, IndexError):
        # Missing binary, timeout, or empty output.
        return "unknown", "unknown"
    return version, version.split('.')[0]
|
|
def download_chromedriver():
    """Return the path of a ChromeDriver binary, downloading it if needed.

    The binary is cached at ``~/.chromedriver/chromedriver``. If that file
    exists it is returned immediately. Otherwise the release matching the
    local Chrome major version is looked up, the linux64 archive is
    downloaded and unpacked, and the binary is moved into place and made
    executable.

    Returns:
        Path of the ChromeDriver executable.

    Raises:
        Exception: Any HTTP, archive or filesystem error is logged and
            re-raised. ``FileNotFoundError`` is raised when the archive does
            not contain the expected binary.
    """
    import shutil

    full_version, major_version = get_chrome_version()
    print(f"π Chrome version: {full_version}")

    driver_dir = os.path.expanduser("~/.chromedriver")
    driver_path = os.path.join(driver_dir, "chromedriver")

    if os.path.exists(driver_path):
        print(f"✅ ChromeDriver exists: {driver_path}")
        return driver_path

    os.makedirs(driver_dir, exist_ok=True)
    zip_path = os.path.join(driver_dir, "chromedriver.zip")
    extracted_folder = os.path.join(driver_dir, "chromedriver-linux64")

    try:
        url = f"https://googlechromelabs.github.io/chrome-for-testing/LATEST_RELEASE_{major_version}"
        response = requests.get(url, timeout=10)
        # Without this check an HTTP error page would be used as the version.
        response.raise_for_status()
        driver_version = response.text.strip()
        print(f"π ChromeDriver version: {driver_version}")

        download_url = f"https://storage.googleapis.com/chrome-for-testing-public/{driver_version}/linux64/chromedriver-linux64.zip"
        print("β¬οΈ Downloading...")

        response = requests.get(download_url, timeout=60)
        response.raise_for_status()

        with open(zip_path, 'wb') as f:
            f.write(response.content)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(driver_dir)

        extracted_driver = os.path.join(extracted_folder, "chromedriver")
        if not os.path.exists(extracted_driver):
            # Do not report success for a path that does not exist.
            raise FileNotFoundError(f"chromedriver not found in archive: {extracted_driver}")

        os.rename(extracted_driver, driver_path)
        os.chmod(driver_path, 0o755)

        print(f"✅ ChromeDriver installed: {driver_path}")
        return driver_path

    except Exception as e:
        print(f"β Download failed: {e}")
        raise
    finally:
        # Remove the archive and the unpacked folder on success and failure.
        if os.path.exists(zip_path):
            os.remove(zip_path)
        shutil.rmtree(extracted_folder, ignore_errors=True)
|
|
def human_move_to_element(driver, element):
    """Queue a jittered series of relative pointer moves and perform them.

    A start position is drawn at random, the element centre is computed from
    its ``location`` and ``size``, and between 15 and 30 relative moves with
    up to 3 pixels of noise per axis are added to one action chain. The chain
    is performed once after the loop, followed by a short random pause.

    Args:
        driver: WebDriver the action chain is bound to.
        element: Element whose location and size define the destination.
    """
    chain = ActionChains(driver)
    pos_x = random.randint(100, 500)
    pos_y = random.randint(100, 500)
    dest_x = element.location['x'] + element.size['width'] / 2
    dest_y = element.location['y'] + element.size['height'] / 2
    step_count = random.randint(15, 30)

    for step in range(step_count):
        fraction = step / step_count
        jitter_x = random.uniform(-3, 3)
        jitter_y = random.uniform(-3, 3)
        next_x = pos_x + (dest_x - pos_x) * fraction + jitter_x
        next_y = pos_y + (dest_y - pos_y) * fraction + jitter_y
        chain.move_by_offset(next_x - pos_x, next_y - pos_y)
        pos_x, pos_y = next_x, next_y
        time.sleep(random.uniform(0.001, 0.005))

    chain.perform()
    time.sleep(random.uniform(0.1, 0.3))
|
|
def human_click(driver, element):
    """Move the pointer with ``human_move_to_element`` and click *element*.

    A random pause of 0.05-0.15 s precedes the click and one of 0.1-0.2 s
    follows it.
    """
    human_move_to_element(driver, element)

    pause_before = random.uniform(0.05, 0.15)
    time.sleep(pause_before)

    element.click()

    pause_after = random.uniform(0.1, 0.2)
    time.sleep(pause_after)
|
|
def create_driver():
    """Launch a headless Chrome WebDriver with this module's fixed options.

    The ChromeDriver binary comes from ``download_chromedriver``. After
    launch a script is executed against the current document and the window
    is sized to 1920x1080.

    Returns:
        The configured ``webdriver.Chrome`` instance.

    Raises:
        Exception: Errors from obtaining the driver binary or from launching
            or configuring the browser propagate. If configuration fails
            after launch, the browser is quit before re-raising.
    """
    print("π Initializing ChromeDriver...")

    driver_path = download_chromedriver()

    options = Options()
    for argument in (
        '--headless=new',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--window-size=1920,1080',
        '--disable-blink-features=AutomationControlled',
        '--disable-web-security',
        '--disable-features=IsolateOrigins,site-per-process',
        '--allow-running-insecure-content',
        '--disable-setuid-sandbox',
        '--disable-software-rasterizer',
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    ):
        options.add_argument(argument)
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)
    options.set_capability('goog:loggingPrefs', {'browser': 'ALL'})

    service = Service(driver_path)
    service.log_path = '/dev/null'

    driver = webdriver.Chrome(service=service, options=options)
    try:
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        driver.set_window_size(1920, 1080)
    except Exception:
        # Do not leave a browser process running if setup fails.
        driver.quit()
        raise

    print("✅ Driver ready")
    return driver
|
|
def solve_image_with_ai(image: Image.Image, target: str) -> float:
    """Score how strongly *image* matches *target* with the shared CLIP model.

    The image is compared against two text prompts, ``"a photo of {target}"``
    and ``"other objects"``.

    Args:
        image: Image to classify.
        target: Object name inserted into the first prompt.

    Returns:
        The softmax probability assigned to the first prompt.
    """
    inputs = clip_processor(
        text=[f"a photo of {target}", "other objects"],
        images=image,
        return_tensors="pt",
        padding=True
    )
    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        outputs = clip_model(**inputs)
    probs = outputs.logits_per_image.softmax(dim=1)
    return probs[0][0].item()
|
|
def detect_puzzle_points(image: Image.Image) -> List[Dict]:
    """Find circles in *image* and return them ordered by ``(y, x)``.

    The image is converted to grayscale and passed to ``cv2.HoughCircles``.
    Each result is a dict with ``number``, ``x``, ``y`` and ``radius``;
    ``number`` is the 1-based position in the detector's output and is
    assigned before the list is sorted.

    Returns:
        The sorted list of circle dicts, or an empty list when none is found.
    """
    pixels = np.array(image)
    grayscale = cv2.cvtColor(pixels, cv2.COLOR_RGB2GRAY)
    found = cv2.HoughCircles(
        grayscale,
        cv2.HOUGH_GRADIENT,
        dp=1,
        minDist=30,
        param1=50,
        param2=30,
        minRadius=8,
        maxRadius=30,
    )
    if found is None:
        return []

    rounded = np.uint16(np.around(found))
    detected = [
        {'number': order, 'x': int(cx), 'y': int(cy), 'radius': int(cr)}
        for order, (cx, cy, cr) in enumerate(rounded[0, :], start=1)
    ]
    detected.sort(key=lambda item: (item['y'], item['x']))
    return detected
|
|
def solve_puzzle_captcha(driver, canvas_element) -> bool:
    """Drag the pointer across the points detected on *canvas_element*.

    A screenshot of the canvas is passed to ``detect_puzzle_points``. The
    pointer is pressed on the first point, moved over the remaining points in
    order, and released. Offsets are given relative to the canvas centre.

    Args:
        driver: WebDriver used to build the action chain.
        canvas_element: Canvas element to capture and interact with.

    Returns:
        ``True`` when the action chain was performed; ``False`` when no
        points were detected or any error occurred (the error is logged).
    """
    try:
        # screenshot_as_png is a property returning bytes, not a method.
        png = canvas_element.screenshot_as_png
        img = Image.open(BytesIO(png))
        points = detect_puzzle_points(img)

        if not points:
            return False

        print(f"✅ Detected {len(points)} puzzle points")

        # Read the element size once rather than on every iteration.
        canvas_size = canvas_element.size
        half_width = canvas_size['width'] / 2
        half_height = canvas_size['height'] / 2

        action = ActionChains(driver)
        start_point, *remaining_points = points
        action.move_to_element_with_offset(
            canvas_element,
            start_point['x'] - half_width,
            start_point['y'] - half_height,
        )
        action.click_and_hold()

        for point in remaining_points:
            action.move_to_element_with_offset(
                canvas_element,
                point['x'] - half_width,
                point['y'] - half_height,
            )
            time.sleep(random.uniform(0.1, 0.3))

        action.release()
        action.perform()
        time.sleep(1)
        return True
    except Exception as e:
        print(f"β Puzzle error: {e}")
        return False
|
|
def extract_challenge_info(driver) -> Optional[Dict]:
    """Read the prompt and tile images of the challenge currently in view.

    Waits up to 10 seconds for an element with class ``prompt-text``, pulls
    the word after "Select all" from its text (trailing ``s`` characters
    stripped, lower-cased), and captures every ``.task-image`` element.

    Returns:
        A dict with ``target`` (``"unknown"`` when the prompt does not
        match), ``images`` (PIL images), ``count`` and ``elements``, or
        ``None`` when anything fails (the error is logged).
    """
    try:
        # until() returns the located element, so no second lookup is needed.
        prompt_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "prompt-text"))
        )
        prompt_text = prompt_element.text
        match = re.search(r'Select all (\w+)', prompt_text, re.IGNORECASE)
        target = match.group(1).rstrip('s').lower() if match else "unknown"

        img_elements = driver.find_elements(By.CSS_SELECTOR, ".task-image")
        # screenshot_as_png is a property returning bytes, not a method.
        images = [
            Image.open(BytesIO(img_elem.screenshot_as_png))
            for img_elem in img_elements
        ]

        return {'target': target, 'images': images, 'count': len(images), 'elements': img_elements}
    except Exception as e:
        print(f"β Extract error: {e}")
        return None
|
|
def solve_grid_captcha(driver, challenge_info) -> bool:
    """Click the tiles scored above the threshold, then the submit button.

    Each image in ``challenge_info['images']`` is scored with
    ``solve_image_with_ai`` against ``challenge_info['target']``; tiles
    scoring above 0.55 are clicked through ``human_click``, followed by the
    ``.button-submit`` element.

    Args:
        driver: WebDriver positioned inside the challenge frame.
        challenge_info: Dict with ``target``, ``images`` and ``elements``.

    Returns:
        ``True`` after submitting; ``False`` when no tile was selected or any
        error occurred (the error is logged).
    """
    try:
        selected_indices = []

        for idx, img in enumerate(challenge_info['images']):
            confidence = solve_image_with_ai(img, challenge_info['target'])
            if confidence > 0.55:
                selected_indices.append(idx)
                print(f"✅ Image {idx}: {confidence:.2%}")
            else:
                print(f"βοΈ Image {idx}: {confidence:.2%}")

        if not selected_indices:
            return False

        elements = challenge_info['elements']
        for idx in selected_indices:
            if idx < len(elements):
                human_click(driver, elements[idx])

        submit_btn = driver.find_element(By.CSS_SELECTOR, ".button-submit")
        human_click(driver, submit_btn)
        time.sleep(2)
        return True
    except Exception as e:
        print(f"β Grid error: {e}")
        return False
|
|
def detect_challenge_type(driver) -> str:
    """Classify the challenge in the current frame by the elements present.

    Returns:
        ``"puzzle"`` if a ``canvas`` element exists, ``"image_grid"`` if a
        ``.task-image`` element exists, otherwise ``"unknown"``. Lookup
        errors are logged and also yield ``"unknown"``.
    """
    try:
        if driver.find_elements(By.CSS_SELECTOR, "canvas"):
            return "puzzle"
        if driver.find_elements(By.CSS_SELECTOR, ".task-image"):
            return "image_grid"
    except Exception as e:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not swallowed.
        print(f"Challenge detection error: {e}")
    return "unknown"
|
|
def screenshot_and_upload(element) -> Optional[str]:
    """Capture *element* as a PNG, upload it, and return the resulting URL.

    The PNG is written to a uniquely named temporary file, passed to
    ``upload_to_tmpfiles``, and the file is removed afterwards whether or
    not the upload succeeded.

    Args:
        element: WebElement to capture.

    Returns:
        The value returned by ``upload_to_tmpfiles``, or ``None`` when the
        capture or file handling fails (the error is logged).
    """
    temp_path = None
    try:
        # screenshot_as_png is a property returning bytes, not a method.
        png = element.screenshot_as_png

        # mkstemp avoids name collisions between concurrent requests.
        fd, temp_path = tempfile.mkstemp(prefix="hcaptcha_", suffix=".png")
        with os.fdopen(fd, 'wb') as f:
            f.write(png)

        return upload_to_tmpfiles(temp_path)
    except Exception as e:
        print(f"β Screenshot error: {e}")
        return None
    finally:
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                # Cleanup is best-effort; a leftover temp file is harmless.
                pass
|
|
def _find_hcaptcha_iframe(driver, keyword):
    """Return the first hCaptcha iframe whose ``src`` contains *keyword*, else ``None``."""
    for iframe in driver.find_elements(By.CSS_SELECTOR, "iframe[src*='hcaptcha']"):
        # get_attribute may return None; treat that as "no match".
        if keyword in (iframe.get_attribute('src') or ''):
            return iframe
    return None


def solve_hcaptcha(sitekey: str, url: str) -> Dict:
    """Load *url* in a fresh browser and work through its hCaptcha widget.

    Steps: open the page, locate and click the checkbox iframe, and if a
    challenge iframe appears, detect its type and dispatch to the puzzle or
    grid handler. The response token is then read from the page. The browser
    is always quit before returning.

    Args:
        sitekey: Accepted for interface compatibility; not used here.
        url: Page to open.

    Returns:
        A dict with ``success`` and, depending on the outcome, ``token``,
        ``method`` or ``challenge_type``, ``error``, ``error_detail`` and
        ``screenshots`` (a list of ``{'type', 'url'}`` dicts).
    """
    driver = None
    screenshot_urls = []

    try:
        driver = create_driver()
        driver.get(url)
        print(f"✅ Opened: {url}")
        time.sleep(random.uniform(2, 4))

        WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "iframe[src*='hcaptcha']")))

        checkbox_iframe = _find_hcaptcha_iframe(driver, 'checkbox')
        if not checkbox_iframe:
            return {'success': False, 'error': 'Checkbox not found'}

        screenshot_url = screenshot_and_upload(checkbox_iframe)
        if screenshot_url:
            screenshot_urls.append({'type': 'checkbox', 'url': screenshot_url})

        driver.switch_to.frame(checkbox_iframe)
        checkbox = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.ID, "checkbox")))
        human_click(driver, checkbox)
        print("✅ Checkbox clicked")
        driver.switch_to.default_content()
        time.sleep(random.uniform(2, 4))

        challenge_iframe = _find_hcaptcha_iframe(driver, 'challenge')
        if not challenge_iframe:
            # No challenge frame: the checkbox alone may have produced a token.
            token = extract_token(driver)
            if token:
                return {'success': True, 'token': token, 'method': 'checkbox_only', 'screenshots': screenshot_urls}
            return {'success': False, 'error': 'No token', 'screenshots': screenshot_urls}

        screenshot_url = screenshot_and_upload(challenge_iframe)
        if screenshot_url:
            screenshot_urls.append({'type': 'challenge', 'url': screenshot_url})

        driver.switch_to.frame(challenge_iframe)
        challenge_type = detect_challenge_type(driver)
        print(f"π― Challenge: {challenge_type}")

        if challenge_type == "puzzle":
            canvas = driver.find_element(By.CSS_SELECTOR, "canvas")
            if not solve_puzzle_captcha(driver, canvas):
                driver.switch_to.default_content()
                return {'success': False, 'error': 'Puzzle failed', 'screenshots': screenshot_urls}

        elif challenge_type == "image_grid":
            challenge_info = extract_challenge_info(driver)
            if not challenge_info:
                driver.switch_to.default_content()
                return {'success': False, 'error': 'Extract failed', 'screenshots': screenshot_urls}

            print(f"π― Target: {challenge_info['target']}")
            if not solve_grid_captcha(driver, challenge_info):
                driver.switch_to.default_content()
                return {'success': False, 'error': 'Grid failed', 'screenshots': screenshot_urls}
        else:
            driver.switch_to.default_content()
            return {'success': False, 'error': f'Unknown: {challenge_type}', 'screenshots': screenshot_urls}

        driver.switch_to.default_content()
        time.sleep(3)
        token = extract_token(driver)

        if token:
            return {'success': True, 'token': token, 'challenge_type': challenge_type, 'screenshots': screenshot_urls}
        return {'success': False, 'error': 'No token', 'screenshots': screenshot_urls}

    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        print(f"β Error: {error_detail}")
        return {'success': False, 'error': str(e), 'error_detail': error_detail, 'screenshots': screenshot_urls}
    finally:
        if driver:
            try:
                driver.quit()
            except Exception as quit_error:
                # Shutdown failures must not replace the result being returned.
                print(f"Driver quit error: {quit_error}")
|
|
def extract_token(driver) -> Optional[str]:
    """Read the response token from the page's hidden form fields.

    After a 2-second pause, the first element named ``h-captcha-response``
    is checked, then the first named ``g-recaptcha-response``. A field that
    is absent no longer prevents the other from being checked.

    Returns:
        The first field value longer than 10 characters, or ``None`` when
        neither field holds one or a lookup fails (the error is logged).
    """
    try:
        time.sleep(2)
        for field_name in ("h-captcha-response", "g-recaptcha-response"):
            # find_elements returns [] instead of raising when nothing
            # matches, so the second field name is still tried.
            fields = driver.find_elements(By.NAME, field_name)
            if not fields:
                continue
            token = fields[0].get_attribute("value")
            if token and len(token) > 10:
                return token
        return None
    except Exception as e:
        print(f"Token extraction error: {e}")
        return None
|
|
@app.route('/solve', methods=['GET'])
def solve():
    """Handle ``GET /solve?sitekey=X&url=Y``.

    Returns:
        HTTP 400 with an error payload when a parameter is missing or ``url``
        is not an absolute http(s) URL; otherwise the JSON-encoded result of
        ``solve_hcaptcha``.
    """
    sitekey = request.args.get('sitekey')
    url = request.args.get('url')

    if not sitekey or not url:
        return jsonify({'success': False, 'error': 'Missing params'}), 400

    # The URL is caller-supplied and is opened in a server-side browser, so
    # only plain web URLs are accepted (no file://, chrome://, etc.).
    try:
        parsed = urlparse(url)
    except ValueError:
        parsed = None
    if parsed is None or parsed.scheme not in ('http', 'https') or not parsed.netloc:
        return jsonify({'success': False, 'error': 'Invalid url'}), 400

    print(f"\n{'='*60}")
    print(f"π Solving: {url}")
    print(f"{'='*60}\n")

    result = solve_hcaptcha(sitekey, url)
    return jsonify(result)
|
|
@app.route('/health', methods=['GET'])
def health():
    """Handle ``GET /health``: report status, model name and Chrome version."""
    chrome_version = get_chrome_version()[0]
    payload = {
        'status': 'online',
        'model': 'CLIP',
        'chrome': chrome_version,
    }
    return jsonify(payload)
|
|
@app.route('/', methods=['GET'])
def root():
    """Handle ``GET /``: describe the service and list its endpoints."""
    endpoints = {
        '/solve': 'GET ?sitekey=X&url=Y',
        '/health': 'GET',
    }
    description = {
        'service': 'hCaptcha Solver',
        'version': '2.3',
        'endpoints': endpoints,
    }
    return jsonify(description)
|
|
if __name__ == '__main__':
    # Print the startup banner, then serve on all interfaces at port 7860.
    for banner_line in ("\nπ€ hCaptcha Solver API v2.3", "π http://0.0.0.0:7860\n"):
        print(banner_line)
    app.run(debug=False, port=7860, host='0.0.0.0')