H / app.py
Kenqt's picture
Upload 4 files
1053eaa verified
#!/usr/bin/env python3
from flask import Flask, request, jsonify
import time
import random
import json
import re
from io import BytesIO
from typing import Dict, List
import tempfile
import os
import subprocess
import zipfile
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from PIL import Image
import numpy as np
import cv2
from transformers import CLIPProcessor, CLIPModel
import torch
import requests
app = Flask(__name__)
print("πŸ”„ Loading CLIP model...")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
print("βœ… Model ready")
TMPFILES_HOST = "https://tmpfiles.org/api/v1/upload"
def upload_to_tmpfiles(image_path: str) -> str:
try:
with open(image_path, 'rb') as f:
files = {'file': f}
response = requests.post(TMPFILES_HOST, files=files, timeout=10)
if response.status_code == 200:
data = response.json()
url = data.get('data', {}).get('url', '')
return url.replace('tmpfiles.org/', 'tmpfiles.org/dl/')
return None
except Exception as e:
print(f"Upload error: {e}")
return None
def get_chrome_version():
try:
result = subprocess.run(['google-chrome', '--version'], capture_output=True, text=True)
version = result.stdout.strip().split()[-1]
major_version = version.split('.')[0]
return version, major_version
except:
return "unknown", "unknown"
def download_chromedriver():
full_version, major_version = get_chrome_version()
print(f"πŸ” Chrome version: {full_version}")
driver_dir = os.path.expanduser("~/.chromedriver")
driver_path = os.path.join(driver_dir, "chromedriver")
if os.path.exists(driver_path):
print(f"βœ… ChromeDriver exists: {driver_path}")
return driver_path
os.makedirs(driver_dir, exist_ok=True)
try:
url = f"https://googlechromelabs.github.io/chrome-for-testing/LATEST_RELEASE_{major_version}"
response = requests.get(url, timeout=10)
driver_version = response.text.strip()
print(f"πŸ” ChromeDriver version: {driver_version}")
download_url = f"https://storage.googleapis.com/chrome-for-testing-public/{driver_version}/linux64/chromedriver-linux64.zip"
print(f"⬇️ Downloading...")
zip_path = os.path.join(driver_dir, "chromedriver.zip")
response = requests.get(download_url, timeout=60)
with open(zip_path, 'wb') as f:
f.write(response.content)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(driver_dir)
extracted_driver = os.path.join(driver_dir, "chromedriver-linux64", "chromedriver")
if os.path.exists(extracted_driver):
os.rename(extracted_driver, driver_path)
os.chmod(driver_path, 0o755)
os.remove(zip_path)
import shutil
extracted_folder = os.path.join(driver_dir, "chromedriver-linux64")
if os.path.exists(extracted_folder):
shutil.rmtree(extracted_folder)
print(f"βœ… ChromeDriver installed: {driver_path}")
return driver_path
except Exception as e:
print(f"❌ Download failed: {e}")
raise e
def human_move_to_element(driver, element):
action = ActionChains(driver)
current_x = random.randint(100, 500)
current_y = random.randint(100, 500)
target_x = element.location['x'] + element.size['width'] / 2
target_y = element.location['y'] + element.size['height'] / 2
steps = random.randint(15, 30)
for i in range(steps):
progress = i / steps
noise_x = random.uniform(-3, 3)
noise_y = random.uniform(-3, 3)
intermediate_x = current_x + (target_x - current_x) * progress + noise_x
intermediate_y = current_y + (target_y - current_y) * progress + noise_y
action.move_by_offset(intermediate_x - current_x, intermediate_y - current_y)
current_x = intermediate_x
current_y = intermediate_y
time.sleep(random.uniform(0.001, 0.005))
action.perform()
time.sleep(random.uniform(0.1, 0.3))
def human_click(driver, element):
human_move_to_element(driver, element)
time.sleep(random.uniform(0.05, 0.15))
element.click()
time.sleep(random.uniform(0.1, 0.2))
def create_driver():
print("πŸ”„ Initializing ChromeDriver...")
driver_path = download_chromedriver()
options = Options()
options.add_argument('--headless=new')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1920,1080')
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--disable-web-security')
options.add_argument('--disable-features=IsolateOrigins,site-per-process')
options.add_argument('--allow-running-insecure-content')
options.add_argument('--disable-setuid-sandbox')
options.add_argument('--disable-software-rasterizer')
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
options.set_capability('goog:loggingPrefs', {'browser': 'ALL'})
service = Service(driver_path)
service.log_path = '/dev/null'
driver = webdriver.Chrome(service=service, options=options)
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
driver.set_window_size(1920, 1080)
print("βœ… Driver ready")
return driver
def solve_image_with_ai(image: Image.Image, target: str) -> float:
inputs = clip_processor(
text=[f"a photo of {target}", "other objects"],
images=image,
return_tensors="pt",
padding=True
)
outputs = clip_model(**inputs)
probs = outputs.logits_per_image.softmax(dim=1)
return probs[0][0].item()
def detect_puzzle_points(image: Image.Image) -> List[Dict]:
img_array = np.array(image)
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, dp=1, minDist=30, param1=50, param2=30, minRadius=8, maxRadius=30)
points = []
if circles is not None:
circles = np.uint16(np.around(circles))
for i, (x, y, r) in enumerate(circles[0, :]):
points.append({'number': i + 1, 'x': int(x), 'y': int(y), 'radius': int(r)})
points = sorted(points, key=lambda p: (p['y'], p['x']))
return points
def solve_puzzle_captcha(driver, canvas_element) -> bool:
try:
png = canvas_element.screenshot_as_png()
img = Image.open(BytesIO(png))
points = detect_puzzle_points(img)
if not points:
return False
print(f"βœ… Detected {len(points)} puzzle points")
action = ActionChains(driver)
start_point = points[0]
offset_x = start_point['x'] - canvas_element.size['width'] / 2
offset_y = start_point['y'] - canvas_element.size['height'] / 2
action.move_to_element_with_offset(canvas_element, offset_x, offset_y)
action.click_and_hold()
for point in points[1:]:
offset_x = point['x'] - canvas_element.size['width'] / 2
offset_y = point['y'] - canvas_element.size['height'] / 2
action.move_to_element_with_offset(canvas_element, offset_x, offset_y)
time.sleep(random.uniform(0.1, 0.3))
action.release()
action.perform()
time.sleep(1)
return True
except Exception as e:
print(f"❌ Puzzle error: {e}")
return False
def extract_challenge_info(driver):
try:
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, "prompt-text")))
prompt_text = driver.find_element(By.CLASS_NAME, "prompt-text").text
match = re.search(r'Select all (\w+)', prompt_text, re.IGNORECASE)
target = match.group(1).rstrip('s').lower() if match else "unknown"
images = []
img_elements = driver.find_elements(By.CSS_SELECTOR, ".task-image")
for img_elem in img_elements:
png = img_elem.screenshot_as_png()
img = Image.open(BytesIO(png))
images.append(img)
return {'target': target, 'images': images, 'count': len(images), 'elements': img_elements}
except Exception as e:
print(f"❌ Extract error: {e}")
return None
def solve_grid_captcha(driver, challenge_info) -> bool:
try:
selected_indices = []
for idx, img in enumerate(challenge_info['images']):
confidence = solve_image_with_ai(img, challenge_info['target'])
if confidence > 0.55:
selected_indices.append(idx)
print(f"βœ… Image {idx}: {confidence:.2%}")
else:
print(f"⏭️ Image {idx}: {confidence:.2%}")
if not selected_indices:
return False
for idx in selected_indices:
if idx < len(challenge_info['elements']):
human_click(driver, challenge_info['elements'][idx])
submit_btn = driver.find_element(By.CSS_SELECTOR, ".button-submit")
human_click(driver, submit_btn)
time.sleep(2)
return True
except Exception as e:
print(f"❌ Grid error: {e}")
return False
def detect_challenge_type(driver) -> str:
try:
if driver.find_elements(By.CSS_SELECTOR, "canvas"):
return "puzzle"
elif driver.find_elements(By.CSS_SELECTOR, ".task-image"):
return "image_grid"
else:
return "unknown"
except:
return "unknown"
def screenshot_and_upload(element) -> str:
try:
png = element.screenshot_as_png()
temp_dir = tempfile.gettempdir()
temp_path = os.path.join(temp_dir, f"hcaptcha_{int(time.time())}_{random.randint(1000,9999)}.png")
with open(temp_path, 'wb') as f:
f.write(png)
url = upload_to_tmpfiles(temp_path)
try:
os.remove(temp_path)
except:
pass
return url
except Exception as e:
print(f"❌ Screenshot error: {e}")
return None
def solve_hcaptcha(sitekey: str, url: str) -> Dict:
driver = None
screenshot_urls = []
try:
driver = create_driver()
driver.get(url)
print(f"βœ… Opened: {url}")
time.sleep(random.uniform(2, 4))
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CSS_SELECTOR, "iframe[src*='hcaptcha']")))
iframes = driver.find_elements(By.CSS_SELECTOR, "iframe[src*='hcaptcha']")
checkbox_iframe = None
for iframe in iframes:
if 'checkbox' in iframe.get_attribute('src'):
checkbox_iframe = iframe
break
if not checkbox_iframe:
return {'success': False, 'error': 'Checkbox not found'}
screenshot_url = screenshot_and_upload(checkbox_iframe)
if screenshot_url:
screenshot_urls.append({'type': 'checkbox', 'url': screenshot_url})
driver.switch_to.frame(checkbox_iframe)
checkbox = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.ID, "checkbox")))
human_click(driver, checkbox)
print("βœ… Checkbox clicked")
driver.switch_to.default_content()
time.sleep(random.uniform(2, 4))
iframes = driver.find_elements(By.CSS_SELECTOR, "iframe[src*='hcaptcha']")
challenge_iframe = None
for iframe in iframes:
if 'challenge' in iframe.get_attribute('src'):
challenge_iframe = iframe
break
if not challenge_iframe:
token = extract_token(driver)
if token:
return {'success': True, 'token': token, 'method': 'checkbox_only', 'screenshots': screenshot_urls}
else:
return {'success': False, 'error': 'No token', 'screenshots': screenshot_urls}
screenshot_url = screenshot_and_upload(challenge_iframe)
if screenshot_url:
screenshot_urls.append({'type': 'challenge', 'url': screenshot_url})
driver.switch_to.frame(challenge_iframe)
challenge_type = detect_challenge_type(driver)
print(f"🎯 Challenge: {challenge_type}")
if challenge_type == "puzzle":
canvas = driver.find_element(By.CSS_SELECTOR, "canvas")
success = solve_puzzle_captcha(driver, canvas)
if not success:
driver.switch_to.default_content()
return {'success': False, 'error': 'Puzzle failed', 'screenshots': screenshot_urls}
elif challenge_type == "image_grid":
challenge_info = extract_challenge_info(driver)
if not challenge_info:
driver.switch_to.default_content()
return {'success': False, 'error': 'Extract failed', 'screenshots': screenshot_urls}
print(f"🎯 Target: {challenge_info['target']}")
success = solve_grid_captcha(driver, challenge_info)
if not success:
driver.switch_to.default_content()
return {'success': False, 'error': 'Grid failed', 'screenshots': screenshot_urls}
else:
driver.switch_to.default_content()
return {'success': False, 'error': f'Unknown: {challenge_type}', 'screenshots': screenshot_urls}
driver.switch_to.default_content()
time.sleep(3)
token = extract_token(driver)
if token:
return {'success': True, 'token': token, 'challenge_type': challenge_type, 'screenshots': screenshot_urls}
else:
return {'success': False, 'error': 'No token', 'screenshots': screenshot_urls}
except Exception as e:
import traceback
error_detail = traceback.format_exc()
print(f"❌ Error: {error_detail}")
return {'success': False, 'error': str(e), 'error_detail': error_detail, 'screenshots': screenshot_urls}
finally:
if driver:
try:
driver.quit()
except:
pass
def extract_token(driver):
try:
time.sleep(2)
token_element = driver.find_element(By.NAME, "h-captcha-response")
token = token_element.get_attribute("value")
if token and len(token) > 10:
return token
token_element = driver.find_element(By.NAME, "g-recaptcha-response")
token = token_element.get_attribute("value")
if token and len(token) > 10:
return token
return None
except:
return None
@app.route('/solve', methods=['GET'])
def solve():
sitekey = request.args.get('sitekey')
url = request.args.get('url')
if not sitekey or not url:
return jsonify({'success': False, 'error': 'Missing params'}), 400
print(f"\n{'='*60}")
print(f"πŸš€ Solving: {url}")
print(f"{'='*60}\n")
result = solve_hcaptcha(sitekey, url)
return jsonify(result)
@app.route('/health', methods=['GET'])
def health():
full_version, major_version = get_chrome_version()
return jsonify({
'status': 'online',
'model': 'CLIP',
'chrome': full_version
})
@app.route('/', methods=['GET'])
def root():
return jsonify({
'service': 'hCaptcha Solver',
'version': '2.3',
'endpoints': {
'/solve': 'GET ?sitekey=X&url=Y',
'/health': 'GET'
}
})
if __name__ == '__main__':
print("\nπŸ€– hCaptcha Solver API v2.3")
print("🌐 http://0.0.0.0:7860\n")
app.run(host='0.0.0.0', port=7860, debug=False)