| import gradio as gr |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torchvision import models, transforms |
| from PIL import Image |
| import numpy as np |
| import cv2 |
| import requests |
| from io import BytesIO |
| import base64 |
| from fastapi import FastAPI |
| from pydantic import BaseModel |
| import uvicorn |
| from fastapi.middleware.cors import CORSMiddleware |
| import json |
| import warnings |
| import threading |
| import time |
| import random |
| from itertools import combinations |
| warnings.filterwarnings('ignore') |
|
|
| |
|
|
def keep_alive_ping(url="https://huijio-easycap.hf.space/api/health",
                    interval=1200, initial_delay=10):
    """Start a daemon thread that periodically requests ``url``.

    The thread waits ``initial_delay`` seconds, then issues a GET request
    every ``interval`` seconds for the lifetime of the process. Request
    failures are printed and never stop the loop.

    Args:
        url: Address to request on every tick.
        interval: Seconds to sleep between two requests.
        initial_delay: Seconds to wait before the first request.
    """
    # NOTE(review): the leading glyphs in the log messages below look like
    # mis-decoded UTF-8 emoji; restore them from the original file if known.
    def ping():
        time.sleep(initial_delay)
        while True:
            try:
                requests.get(url, timeout=10)
                print("π Keep-alive ping sent")
            except Exception as e:
                # Best effort: a failed ping must not kill the thread.
                print(f"β Keep-alive ping failed: {e}")
            time.sleep(interval)

    # Daemon thread so it never blocks interpreter shutdown.
    thread = threading.Thread(target=ping, daemon=True)
    thread.start()
    print("✅ Keep-alive service started")
|
|
| |
|
|
class DualPathSiamese(nn.Module):
    """Siamese branch that fuses a ResNet-50 embedding with a 29-value feature vector.

    Each image goes through the ResNet-50 backbone (final layer removed) and
    a projection head; its 29 extra features go through a second head. The
    two results are concatenated, passed through a fusion head and
    L2-normalised.
    """

    def __init__(self, embedding_dim=256):
        super().__init__()

        # Image path: every ResNet-50 child except the last one.
        backbone = models.resnet50(weights=None)
        self.cnn_backbone = nn.Sequential(*list(backbone.children())[:-1])
        self.cnn_embedding = self._projection_head(2048, 512, embedding_dim, 0.5)

        # Feature path: 29 input values -> 64-dimensional embedding.
        self.feature_embedding = self._projection_head(29, 128, 64, 0.3)

        # Fusion of both paths back to ``embedding_dim``.
        self.fusion = self._projection_head(embedding_dim + 64, 256, embedding_dim, 0.3)

    @staticmethod
    def _projection_head(in_dim, hidden_dim, out_dim, drop_prob):
        """Build a Linear -> BatchNorm -> ReLU -> Dropout -> Linear stack."""
        return nn.Sequential(
            nn.Linear(in_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(drop_prob),
            nn.Linear(hidden_dim, out_dim),
        )

    def forward_once(self, img, features):
        """Embed one image together with its feature vector."""
        pooled = self.cnn_backbone(img)
        image_embedding = self.cnn_embedding(pooled.view(pooled.size(0), -1))
        feature_embedding = self.feature_embedding(features)

        fused = self.fusion(torch.cat([image_embedding, feature_embedding], dim=1))
        return F.normalize(fused, p=2, dim=1)

    def forward(self, img1, img2, features):
        """Embed both images; ``features`` holds img1's 29 values, then img2's."""
        first_features, second_features = features[:, :29], features[:, 29:]
        return (
            self.forward_once(img1, first_features),
            self.forward_once(img2, second_features),
        )
|
|
class EnsembleSiamese:
    """Weighted ensemble of three Siamese networks for pairwise image matching.

    The ensemble holds a ``DualPathSiamese`` model plus ResNet-50 and
    EfficientNet-B3 Siamese models. For a pair of images each model yields
    two L2-normalised embeddings; the per-model Euclidean distances are
    combined with ``weights`` into a single distance.

    The constructor never raises: on any failure it prints the error and
    leaves ``models_loaded`` set to ``False``.

    NOTE(review): several log messages start with glyphs that look like
    mis-decoded UTF-8 emoji; restore them from the original file if known.
    """

    # Weighted distance below which a pair is reported as a match.
    MATCH_THRESHOLD = 0.312
    # 3 channels x 8 histogram bins + 3 HSV means + edge density + Laplacian variance.
    FEATURE_DIM = 29
    # Number of images find_similar_pairs() accepts.
    EXPECTED_IMAGES = 5
    # (key in self.models, label used in log output, checkpoint file name)
    _CHECKPOINTS = (
        ('dualpath', 'DualPath', 'ensemble_dualpath.pth'),
        ('resnet50', 'ResNet50', 'ensemble_resnet50.pth'),
        ('efficientnet', 'EfficientNet', 'ensemble_efficientnet.pth'),
    )

    def __init__(self, device='cpu'):
        """Build the three models on ``device`` and load their checkpoints."""
        self.device = device
        self.models = {}
        self.model_names = ['dualpath', 'resnet50', 'efficientnet']
        self.weights = [0.34, 0.33, 0.33]
        self.models_loaded = False
        self._transform = None

        try:
            # Built once here instead of on every compare_pair() call.
            self._transform = self._build_transform()

            self.models['dualpath'] = DualPathSiamese(embedding_dim=256).to(device)

            resnet = models.resnet50(weights=None)
            self.models['resnet50'] = self.create_resnet_siamese(resnet, 2048, 256).to(device)

            efficientnet = models.efficientnet_b3(weights=None)
            self.models['efficientnet'] = self.create_efficientnet_siamese(efficientnet, 256).to(device)

            self.load_weights()
            self.models_loaded = True
            print("✅ Ensemble model initialized successfully with your trained weights!")

        except Exception as e:
            print(f"β Error initializing models: {e}")
            self.models_loaded = False

    @staticmethod
    def _build_transform():
        """Return the resize / tensor / normalise pipeline applied to each image."""
        return transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    def create_resnet_siamese(self, resnet, in_features, embedding_dim):
        """Wrap ``resnet`` (minus its last child) in a Siamese embedding module."""
        class ResNetSiam(nn.Module):
            def __init__(self):
                super().__init__()
                self.backbone = nn.Sequential(*list(resnet.children())[:-1])
                self.embedding = nn.Sequential(
                    nn.Linear(in_features, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.5),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # ``features`` is accepted for signature parity and ignored.
                return self.forward_once(img1), self.forward_once(img2)

        return ResNetSiam()

    def create_efficientnet_siamese(self, efficientnet, embedding_dim):
        """Wrap ``efficientnet.features`` in a Siamese embedding module."""
        class EfficientNetSiam(nn.Module):
            def __init__(self):
                super().__init__()
                self.backbone = efficientnet.features
                self.avgpool = nn.AdaptiveAvgPool2d(1)
                self.embedding = nn.Sequential(
                    nn.Linear(1536, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.4),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = self.avgpool(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # ``features`` is accepted for signature parity and ignored.
                return self.forward_once(img1), self.forward_once(img2)

        return EfficientNetSiam()

    def load_weights(self):
        """Load the three checkpoints into ``self.models``.

        Raises:
            Exception: whatever ``torch.load`` / ``load_state_dict`` raised,
                re-raised after printing the list of expected files.
        """
        try:
            for key, label, path in self._CHECKPOINTS:
                # NOTE: weights_only=False unpickles arbitrary objects; only
                # use checkpoint files from a trusted source.
                checkpoint = torch.load(path, map_location=self.device, weights_only=False)
                self.models[key].load_state_dict(checkpoint['model_state_dict'])
                print(f"✅ {label} weights loaded")

        except Exception as e:
            print(f"β Error loading your models: {e}")
            print("Please make sure these files are uploaded:")
            for _, _, path in self._CHECKPOINTS:
                print(f"- {path}")
            raise  # bare raise keeps the original traceback

    def extract_handcrafted_features(self, img_array):
        """Extract 29 classical CV features from an RGB array.

        Features, in order: an 8-bin normalised histogram for each of the
        three channels, the mean of each HSV channel, the Canny edge
        density and the variance of the Laplacian.

        Args:
            img_array: Image as a numpy array, or ``None``. The colour
                conversions used here treat it as RGB.

        Returns:
            A float32 vector of length 29; all zeros when ``img_array`` is
            ``None`` or extraction fails.
        """
        if img_array is None:
            return np.zeros(self.FEATURE_DIM, dtype=np.float32)

        try:
            features = []

            for channel in range(3):
                hist = cv2.calcHist([img_array], [channel], None, [8], [0, 256])
                features.extend(hist.flatten() / (hist.sum() + 1e-6))

            hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
            features.extend(hsv[:, :, channel].mean() for channel in range(3))

            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            features.append(edges.sum() / (edges.size + 1e-6))

            features.append(cv2.Laplacian(gray, cv2.CV_64F).var())

            return np.array(features, dtype=np.float32)
        except Exception as e:
            print(f"Feature extraction error: {e}")
            return np.zeros(self.FEATURE_DIM, dtype=np.float32)

    def compare_pair(self, img1, img2):
        """Compare two PIL images and return the ensemble verdict.

        Returns:
            Dict with ``distance`` (weighted distance), ``confidence``
            (``100 * (1 - distance)`` floored at 0), ``is_match`` and
            ``individual_distances`` keyed by model name. On any error the
            pair is reported as distance 1.0 / confidence 0 / no match.
        """
        try:
            # Convert once and reuse for BOTH paths. The handcrafted features
            # used to be computed on the unconverted image, so palette or
            # grayscale inputs failed extraction and silently became zeros.
            rgb1 = img1.convert('RGB')
            rgb2 = img2.convert('RGB')

            transform = self._transform
            if transform is None:
                transform = self._build_transform()
            img1_tensor = transform(rgb1).unsqueeze(0).to(self.device)
            img2_tensor = transform(rgb2).unsqueeze(0).to(self.device)

            pair_features = np.concatenate([
                self.extract_handcrafted_features(np.array(rgb1)),
                self.extract_handcrafted_features(np.array(rgb2)),
            ])
            features_tensor = torch.tensor(pair_features, dtype=torch.float32).unsqueeze(0).to(self.device)

            # Keyed by name so weights and distances can never be mis-paired
            # by dict iteration order.
            distances = {}
            with torch.no_grad():
                for name in self.model_names:
                    model = self.models[name]
                    model.eval()
                    if name == 'dualpath':
                        out1, out2 = model(img1_tensor, img2_tensor, features_tensor)
                    else:
                        out1, out2 = model(img1_tensor, img2_tensor)
                    distances[name] = F.pairwise_distance(out1, out2).item()

            weighted_distance = sum(
                weight * distances[name]
                for name, weight in zip(self.model_names, self.weights)
            )
            confidence = max(0, 100 * (1 - weighted_distance))

            return {
                'distance': float(weighted_distance),
                'confidence': float(confidence),
                'is_match': weighted_distance < self.MATCH_THRESHOLD,
                'individual_distances': dict(distances),
            }

        except Exception as e:
            print(f"Error comparing pair: {e}")
            return {
                'distance': 1.0,
                'confidence': 0,
                'is_match': False,
                'individual_distances': {},
            }

    def find_similar_pairs(self, images):
        """Compare every pair among exactly five images.

        Returns:
            Dict with ``most_similar_pair`` (index tuple with the smallest
            distance), ``matching_pairs`` (index tuples flagged as matches),
            ``all_pair_results`` and ``best_confidence``.

        Raises:
            RuntimeError: if the models were not loaded.
            ValueError: if ``images`` does not contain exactly five items.
        """
        if not self.models_loaded:
            raise RuntimeError("Models not loaded properly")

        if len(images) != self.EXPECTED_IMAGES:
            raise ValueError(f"Expected {self.EXPECTED_IMAGES} images, got {len(images)}")

        print(f"π Comparing all pairs of {len(images)} images...")

        pair_results = []
        for i, j in combinations(range(len(images)), 2):
            result = self.compare_pair(images[i], images[j])
            pair_results.append({
                'pair': (i, j),
                'distance': result['distance'],
                'confidence': result['confidence'],
                'is_match': result['is_match'],
                'individual_distances': result.get('individual_distances', {})
            })
            print(f" Pair ({i},{j}): distance={result['distance']:.4f}, confidence={result['confidence']:.2f}%, match={result['is_match']}")

        if not pair_results:
            raise RuntimeError("No pairs could be compared")

        most_similar = min(pair_results, key=lambda x: x['distance'])
        matching_pairs = [p['pair'] for p in pair_results if p['is_match']]

        print(f"π― Most similar pair: {most_similar['pair']} (distance: {most_similar['distance']:.4f})")
        print(f"✅ Matching pairs found: {matching_pairs}")

        return {
            'most_similar_pair': most_similar['pair'],
            'matching_pairs': matching_pairs,
            'all_pair_results': pair_results,
            'best_confidence': most_similar['confidence']
        }
|
|
| |
|
|
# Run on the GPU when one is visible to torch, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"π§ Using device: {device}")


class FallbackModel:
    """Colour-based stand-in used when the trained ensemble is unavailable.

    Two images are considered similar when their mean RGB colours are close.
    """

    def __init__(self):
        # Always False: this object never holds trained weights.
        self.models_loaded = False

    def find_similar_pairs(self, images):
        """Compare every pair of ``images`` by mean-colour distance.

        Returns a dict with the same keys as
        ``EnsembleSiamese.find_similar_pairs``.
        """
        print("π Using fallback color-based matching...")

        # One mean colour per image, computed once instead of once per pair.
        mean_colors = [
            np.array(image.convert('RGB')).mean(axis=(0, 1)) for image in images
        ]

        pair_results = []
        for i, j in combinations(range(len(images)), 2):
            color_distance = np.linalg.norm(mean_colors[i] - mean_colors[j])
            confidence = max(0, 100 * (1 - color_distance / 255))
            is_match = color_distance < 30

            pair_results.append({
                'pair': (i, j),
                'distance': float(color_distance / 255),
                'confidence': float(confidence),
                'is_match': bool(is_match)
            })

            print(f" Pair ({i},{j}): color_distance={color_distance:.2f}, match={is_match}")

        most_similar = min(pair_results, key=lambda x: x['distance'])
        similar_pairs = [p for p in pair_results if p['is_match']]

        return {
            'most_similar_pair': most_similar['pair'],
            'matching_pairs': [p['pair'] for p in similar_pairs],
            'all_pair_results': pair_results,
            'best_confidence': most_similar['confidence']
        }


try:
    ensemble_model = EnsembleSiamese(device=device)
    # EnsembleSiamese.__init__ catches its own errors and only records the
    # failure in ``models_loaded``; without this check the except branch
    # below never runs and the fallback is unreachable.
    if not ensemble_model.models_loaded:
        raise RuntimeError("ensemble models were not initialized")
    print("π Successfully loaded your trained ensemble models!")
except Exception as e:
    print(f"β Failed to load your models: {e}")
    print("π Using fallback model...")
    ensemble_model = FallbackModel()
|
|
| |
|
|
# HTTP application that exposes the solver endpoints.
app = FastAPI(title="CAPTCHA Solver API - Pair Comparison", version="4.0")

# CORS is left fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with credentials is very
# permissive; confirm this is intended.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
|
|
class CAPTCHARequest(BaseModel):
    """Request body of ``POST /api/solve-captcha``."""

    # Base64-encoded answer images; each entry may carry a "prefix," part
    # (such as a data-URI header), which base64_to_image() strips.
    answers_base64: list[str]
|
|
def base64_to_image(base64_string):
    """Decode a base64 string (optionally ``prefix,payload``) into a PIL image.

    Args:
        base64_string: Base64 text. If it contains a comma, only the part
            after the first comma is decoded.

    Returns:
        The fully loaded ``PIL.Image.Image``, or ``None`` when the input is
        empty, not a string, not valid base64 or not a readable image.
    """
    if not isinstance(base64_string, str) or not base64_string:
        print("Error decoding base64: empty or non-string input")
        return None

    try:
        # Drop a leading "data:...;base64," style prefix when present.
        if ',' in base64_string:
            base64_string = base64_string.split(',', 1)[1]

        image_data = base64.b64decode(base64_string)
        if not image_data:
            print("Error decoding base64: no image data")
            return None

        image = Image.open(BytesIO(image_data))
        # Image.open() is lazy; force the decode here so corrupt or truncated
        # data is reported by this function instead of failing later.
        image.load()
        return image
    except Exception as e:
        # Untrusted input: any decoding failure means "no image".
        print(f"Error decoding base64: {e}")
        return None
|
|
@app.post("/api/solve-captcha")
async def solve_captcha(request: CAPTCHARequest):
    """Main endpoint for the userscript: find the most similar pair among 5 images.

    Returns a JSON object. On success it contains ``similar_indices`` (the
    indices of the most similar pair, in request order) plus the detailed
    pair results; on failure ``{"success": False, "error": ...}``.
    """
    # NOTE(review): the leading glyphs in some log messages look like
    # mis-decoded UTF-8 emoji; restore them from the original file if known.
    try:
        encoded_images = request.answers_base64
        print(f"π₯ Received CAPTCHA solve request: {len(encoded_images)} answer images")

        # Validate the submitted count before decoding. Previously undecodable
        # images were dropped first, so six inputs with one bad image passed
        # the check with indices that no longer matched the request.
        if len(encoded_images) != 5:
            return {"success": False, "error": f"Expected 5 images, got {len(encoded_images)}"}

        answer_imgs = []
        for i, answer_base64 in enumerate(encoded_images):
            img = base64_to_image(answer_base64)
            if img is None:
                print(f" β Failed to decode answer image {i}")
                return {"success": False, "error": f"Failed to decode answer image {i}"}
            answer_imgs.append(img)
            print(f" ✅ Decoded answer image {i}")

        result = ensemble_model.find_similar_pairs(answer_imgs)

        similar_indices = list(result['most_similar_pair'])

        # The fallback object also defines ``models_loaded`` (as False), so
        # the model type must follow the attribute's value, not its presence.
        models_loaded = bool(getattr(ensemble_model, 'models_loaded', False))

        response_data = {
            'success': True,
            'similar_indices': similar_indices,
            'most_similar_pair': result['most_similar_pair'],
            'matching_pairs': result['matching_pairs'],
            'best_confidence': result['best_confidence'],
            'all_pairs': result['all_pair_results'],
            'models_loaded': models_loaded,
            'model_type': 'your_ensemble' if models_loaded else 'fallback'
        }

        print(f"✅ CAPTCHA solved. Similar images: {similar_indices}")
        return response_data

    except Exception as e:
        print(f"β CAPTCHA solve error: {str(e)}")
        return {"success": False, "error": str(e)}
|
|
@app.get("/api/health")
async def health_check():
    """Report service status, which model is active, the device and the time."""
    # Both model classes define ``models_loaded``; the fallback sets it to
    # False. Testing the value (not the attribute's presence) keeps
    # ``model_type`` from always reporting the ensemble.
    models_loaded = bool(getattr(ensemble_model, 'models_loaded', False))
    return {
        "status": "healthy",
        "models_loaded": models_loaded,
        "model_type": "your_ensemble" if models_loaded else "fallback",
        "device": device,
        "timestamp": time.time()
    }
|
|
@app.get("/")
async def root():
    """Return a static description of the service."""
    service_info = dict(
        message="CAPTCHA Solver API - Pair Comparison",
        version="4.0",
        models="Your Ensemble (DualPath + ResNet50 + EfficientNet)",
        method="Compares all pairs of 5 images to find duplicates",
        accuracy="98.67%",
        endpoint="POST /api/solve-captcha",
    )
    return service_info
|
|
| |
|
|
def predict_captcha_gradio(*answer_images):
    """Gradio callback: report the most similar pair among five images.

    Args:
        *answer_images: The uploaded images; ``None`` entries (empty
            slots) are ignored.

    Returns:
        A Markdown report, or an error line when fewer/more than five
        images were supplied or the comparison failed.
    """
    # NOTE(review): several leading glyphs in the strings below look like
    # mis-decoded UTF-8 emoji; restore them from the original file if known.
    answer_imgs = [img for img in answer_images if img is not None]

    if len(answer_imgs) != 5:
        return "β Please upload exactly 5 answer images"

    try:
        result = ensemble_model.find_similar_pairs(answer_imgs)
        similar_indices = list(result['most_similar_pair'])

        # Collect the fragments and join once instead of repeated "+=".
        parts = ["π― **CAPTCHA SOLVER RESULTS - PAIR COMPARISON** π―\n\n"]

        if getattr(ensemble_model, 'models_loaded', False):
            parts.append("π€ **Using Your Trained Ensemble Models**\n")
            parts.append("π **Trained Accuracy: 98.67%**\n\n")
        else:
            parts.append("β οΈ **Using Fallback Model**\n\n")

        # Indices are shown 1-based to match the "Area N" labels in the UI.
        parts.append(f"✅ **DUPLICATE IMAGES FOUND:** Areas {[i + 1 for i in similar_indices]}\n\n")
        parts.append(f"π **Confidence:** {result['best_confidence']:.2f}%\n\n")

        parts.append("**All Pair Comparisons:**\n")
        for pair_result in result['all_pair_results']:
            i, j = pair_result['pair']
            status = "✅ MATCH" if pair_result['is_match'] else "β DIFFERENT"
            parts.append(
                f"Areas {i+1} & {j+1}: {status} | Distance: {pair_result['distance']:.4f} | Confidence: {pair_result['confidence']:.2f}%\n"
            )

        parts.append(
            f"\n**π€ RECOMMENDATION:** Click on areas {similar_indices[0] + 1} and {similar_indices[1] + 1}"
        )

        return "".join(parts)

    except Exception as e:
        return f"β Error: {str(e)}"
|
|
| |
|
|
# Gradio front end: five image slots on the left, the report on the right.
with gr.Blocks(title="CAPTCHA Solver - Pair Comparison", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # π CAPTCHA Solver - Pair Comparison
    ### **Finds Duplicate Images Among 5 Options**

    **How it works:**
    - Compares all 10 possible pairs of the 5 images
    - Finds the two images that are most similar
    - Uses your trained ensemble models (98.67% accuracy)

    **Models:** DualPath + ResNet50 + EfficientNet-B3
    **Method:** Pairwise comparison of all 5 answer images

    **API Endpoint:** `POST /api/solve-captcha`
    """)

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### π― Upload 5 Answer Images")
            gr.Markdown("The system will compare all pairs to find the two duplicates")

            # Options shared by all five upload slots.
            image_options = {"type": "pil", "height": 120}

            # First row holds areas 1-2, second row areas 3-5; components are
            # created in the same order as their labels.
            with gr.Row():
                answer1, answer2 = (
                    gr.Image(label=f"Area {slot}", **image_options) for slot in (1, 2)
                )
            with gr.Row():
                answer3, answer4, answer5 = (
                    gr.Image(label=f"Area {slot}", **image_options) for slot in (3, 4, 5)
                )

            predict_btn = gr.Button("π Find Duplicates", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("### π Pair Comparison Results")
            output = gr.Markdown(
                label="Solution",
                value="Upload 5 images to find duplicates..."
            )

    # Clicking the button passes the five slots to the solver callback.
    answer_slots = [answer1, answer2, answer3, answer4, answer5]
    predict_btn.click(fn=predict_captcha_gradio, inputs=answer_slots, outputs=output)
|
|
| |
# Serve the Gradio UI from the same FastAPI application.
# NOTE(review): a GET "/" route is registered on ``app`` before this mount at
# path "/"; verify which of the two actually answers requests for "/".
app = gr.mount_gradio_app(app, demo, path="/")

# Runs at import time, so the keep-alive thread also starts when the module
# is loaded by an external server rather than executed as a script.
keep_alive_ping()


if __name__ == "__main__":
    # NOTE(review): the leading glyphs below look like mis-decoded UTF-8
    # emoji; restore them from the original file if known.
    print("π Starting CAPTCHA Solver API Server...")
    print("✅ Keep-Alive Service: ACTIVE")
    print("π API URL: https://huijio-easycap.hf.space/api/solve-captcha")
    print("π― Using Your Trained Ensemble Models")
    print("π Method: Pairwise comparison of all 5 images")
    uvicorn.run(app, host="0.0.0.0", port=7860, timeout_keep_alive=60)