| import gradio as gr |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torchvision import models, transforms |
| from PIL import Image |
| import numpy as np |
| import cv2 |
| from scipy import stats |
| import requests |
| from io import BytesIO |
| import base64 |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel |
| import uvicorn |
| from fastapi.middleware.cors import CORSMiddleware |
| import json |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| # ==================== MODEL DEFINITIONS ==================== |
| |
class DualPathSiamese(nn.Module):
    """Siamese network fusing a ResNet50 CNN path with a handcrafted-feature path.

    Each branch maps an (image, 29-dim handcrafted feature vector) pair to an
    L2-normalized embedding of size ``embedding_dim``.
    """

    def __init__(self, embedding_dim=256):
        super().__init__()

        # Deep path: ResNet50 trunk (classifier head removed) followed by an
        # MLP that projects the 2048-d pooled features to the embedding size.
        backbone = models.resnet50(weights=None)
        self.cnn_backbone = nn.Sequential(*list(backbone.children())[:-1])
        self.cnn_embedding = nn.Sequential(
            nn.Linear(2048, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, embedding_dim),
        )

        # Traditional-CV path: embeds the 29 handcrafted features.
        self.feature_embedding = nn.Sequential(
            nn.Linear(29, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
        )

        # Fusion head: combines both paths into the final embedding.
        self.fusion = nn.Sequential(
            nn.Linear(embedding_dim + 64, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, embedding_dim),
        )

    def forward_once(self, img, features):
        """Embed one (image, 29-dim features) pair; output has unit L2 norm."""
        pooled = torch.flatten(self.cnn_backbone(img), 1)
        deep = self.cnn_embedding(pooled)
        shallow = self.feature_embedding(features)
        fused = self.fusion(torch.cat([deep, shallow], dim=1))
        return F.normalize(fused, p=2, dim=1)

    def forward(self, img1, img2, features):
        """``features`` carries both 29-dim vectors concatenated: [query | answer]."""
        emb1 = self.forward_once(img1, features[:, :29])
        emb2 = self.forward_once(img2, features[:, 29:])
        return emb1, emb2
| |
class EnsembleSiamese:
    """Weighted ensemble of three Siamese networks for CAPTCHA image matching.

    Members (keyed in ``self.models``):
      - 'dualpath'     : DualPathSiamese (CNN embedding + handcrafted features)
      - 'resnet50'     : ResNet50-backed Siamese network
      - 'efficientnet' : EfficientNet-B3-backed Siamese network

    The final decision is a weighted average of the per-model embedding
    distances. If checkpoints cannot be loaded, the models fall back to
    random weights so the application can still start (degraded accuracy).
    """

    def __init__(self, device='cpu'):
        self.device = device
        self.models = {}
        self.model_names = ['dualpath', 'resnet50', 'efficientnet']
        # Ensemble weights, aligned with model_names; they sum to 1.0.
        self.weights = [0.34, 0.33, 0.33]
        self.models_loaded = False

        try:
            # Build the three architectures without pretrained backbones;
            # trained weights are loaded from checkpoint files below.
            self.models['dualpath'] = DualPathSiamese(embedding_dim=256).to(device)

            resnet = models.resnet50(weights=None)
            self.models['resnet50'] = self.create_resnet_siamese(resnet, 2048, 256).to(device)

            from torchvision.models import efficientnet_b3
            efficientnet = efficientnet_b3(weights=None)
            self.models['efficientnet'] = self.create_efficientnet_siamese(efficientnet, 256).to(device)

            self.load_weights()
            self.models_loaded = True
            print("✅ Ensemble model initialized successfully!")

        except Exception as e:
            print(f"β Error initializing models: {e}")
            self.models_loaded = False

    def create_resnet_siamese(self, resnet, in_features, embedding_dim):
        """Wrap a torchvision ResNet as a Siamese net emitting L2-normalized
        embeddings of size ``embedding_dim``."""
        class ResNetSiam(nn.Module):
            def __init__(self):
                super(ResNetSiam, self).__init__()
                # Drop the final FC layer; keep conv trunk + global avg pool.
                self.backbone = nn.Sequential(*list(resnet.children())[:-1])
                self.embedding = nn.Sequential(
                    nn.Linear(in_features, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.5),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # ``features`` is accepted (and ignored) so all ensemble
                # members share a uniform call signature.
                return self.forward_once(img1), self.forward_once(img2)

        return ResNetSiam()

    def create_efficientnet_siamese(self, efficientnet, embedding_dim):
        """Wrap an EfficientNet-B3 as a Siamese net emitting L2-normalized
        embeddings of size ``embedding_dim``."""
        class EfficientNetSiam(nn.Module):
            def __init__(self):
                super(EfficientNetSiam, self).__init__()
                self.backbone = efficientnet.features
                self.avgpool = nn.AdaptiveAvgPool2d(1)
                self.embedding = nn.Sequential(
                    # 1536 = channel count of EfficientNet-B3's last feature map.
                    nn.Linear(1536, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.4),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = self.avgpool(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # ``features`` accepted (and ignored) for a uniform signature.
                return self.forward_once(img1), self.forward_once(img2)

        return EfficientNetSiam()

    def load_weights(self):
        """Load trained checkpoints for all three models.

        NOTE(review): ``weights_only=False`` deserializes arbitrary pickled
        objects — only load checkpoint files from a trusted source.
        On any failure, all models are re-initialized with random weights
        so predictions can still run (best-effort fallback).
        """
        try:
            dualpath_state = torch.load('ensemble_dualpath.pth', map_location=self.device, weights_only=False)
            self.models['dualpath'].load_state_dict(dualpath_state['model_state_dict'])
            print("✅ DualPath weights loaded")

            resnet_state = torch.load('ensemble_resnet50.pth', map_location=self.device, weights_only=False)
            self.models['resnet50'].load_state_dict(resnet_state['model_state_dict'])
            print("✅ ResNet50 weights loaded")

            efficient_state = torch.load('ensemble_efficientnet.pth', map_location=self.device, weights_only=False)
            self.models['efficientnet'].load_state_dict(efficient_state['model_state_dict'])
            print("✅ EfficientNet weights loaded")

        except Exception as e:
            print(f"β οΈ Partial weight loading error: {e}")
            # Initialize with random weights if loading fails
            for name, model in self.models.items():
                model.apply(self._init_weights)
            print("π Models initialized with random weights")

    def _init_weights(self, m):
        """Xavier-initialize Linear layers (used by the random fallback)."""
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                m.bias.data.fill_(0.01)

    def extract_handcrafted_features(self, img_array):
        """Extract a 29-dim handcrafted feature vector from an RGB numpy image.

        Layout: 3x8-bin normalized color histograms (24) + mean H/S/V (3)
        + Canny edge density (1) + Laplacian variance as texture (1).
        Returns zeros on None input or any failure so the pipeline never
        crashes at this stage.
        """
        if img_array is None:
            return np.zeros(29)

        try:
            features = []

            # Normalized 8-bin histogram per RGB channel.
            for i in range(3):
                hist = cv2.calcHist([img_array], [i], None, [8], [0, 256])
                features.extend(hist.flatten() / (hist.sum() + 1e-6))

            # Mean hue / saturation / value.
            hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
            features.extend([hsv[:,:,i].mean() for i in range(3)])

            # Edge density (Canny output is 0/255, so this is 255 * edge fraction).
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            features.append(edges.sum() / (edges.size + 1e-6))

            # Texture sharpness via Laplacian variance.
            features.append(cv2.Laplacian(gray, cv2.CV_64F).var())

            return np.array(features, dtype=np.float32)
        except Exception as e:
            print(f"Feature extraction error: {e}")
            return np.zeros(29)

    def predict_detailed(self, question_img, answer_imgs, threshold=0.312):
        """Compare ``question_img`` against each image in ``answer_imgs``.

        Args:
            question_img: PIL image of the CAPTCHA question.
            answer_imgs: list of PIL candidate answer images.
            threshold: embedding-distance cutoff for declaring a match
                (0.312 per the training-time tuning noted in the UI text).

        Returns:
            One dict per answer with per-model and ensemble
            distances/confidences and the final match decision.
        """
        if not self.models_loaded:
            # Fallback payload: zero confidence / no match for every answer.
            return [{
                'answer_index': i,
                'model_predictions': {
                    'dualpath': {'distance': 1.0, 'confidence': 0.0, 'is_match': False},
                    'resnet50': {'distance': 1.0, 'confidence': 0.0, 'is_match': False},
                    'efficientnet': {'distance': 1.0, 'confidence': 0.0, 'is_match': False}
                },
                'ensemble_confidence': 0.0,
                'ensemble_distance': 1.0,
                'ensemble_match': False,
                'final_decision': False
            } for i in range(len(answer_imgs))]

        # Standard ImageNet preprocessing (matches the pretrained backbones).
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        all_results = []

        for answer_idx, answer_img in enumerate(answer_imgs):
            try:
                # Preprocess both images to (1, 3, 224, 224) tensors.
                q_img = transform(question_img.convert('RGB')).unsqueeze(0).to(self.device)
                a_img = transform(answer_img.convert('RGB')).unsqueeze(0).to(self.device)

                # Handcrafted features for the dual-path model: [query | answer].
                q_features = self.extract_handcrafted_features(np.array(question_img))
                a_features = self.extract_handcrafted_features(np.array(answer_img))
                features = np.concatenate([q_features, a_features])
                features_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(self.device)

                # Collect per-model predictions.
                model_predictions = {}
                distances = []
                confidences = []

                for name, model in self.models.items():
                    model.eval()
                    with torch.no_grad():
                        if name == 'dualpath':
                            out1, out2 = model(q_img, a_img, features_tensor)
                        else:
                            out1, out2 = model(q_img, a_img)

                        dist = F.pairwise_distance(out1, out2)
                        # Map distance to a 0-100% confidence (clamped at 0).
                        confidence = max(0, 100 * (1 - dist.item()))

                        model_predictions[name] = {
                            'distance': float(dist.item()),
                            'confidence': float(confidence),
                            'is_match': bool(dist.item() < threshold)
                        }

                        distances.append(dist.item())
                        confidences.append(confidence)

                # Ensemble = weighted average over the three models.
                weighted_distance = sum(w * d for w, d in zip(self.weights, distances))
                weighted_confidence = sum(w * c for w, c in zip(self.weights, confidences))
                is_match = weighted_distance < threshold

                answer_result = {
                    'answer_index': answer_idx,
                    'model_predictions': model_predictions,
                    'ensemble_distance': float(weighted_distance),
                    'ensemble_confidence': float(weighted_confidence),
                    'ensemble_match': bool(is_match),
                    'final_decision': is_match
                }

                all_results.append(answer_result)

            except Exception as e:
                print(f"Error processing answer {answer_idx}: {e}")
                # Keep index alignment by appending a zero-confidence fallback.
                all_results.append({
                    'answer_index': answer_idx,
                    'model_predictions': {
                        'dualpath': {'distance': 1.0, 'confidence': 0.0, 'is_match': False},
                        'resnet50': {'distance': 1.0, 'confidence': 0.0, 'is_match': False},
                        'efficientnet': {'distance': 1.0, 'confidence': 0.0, 'is_match': False}
                    },
                    'ensemble_confidence': 0.0,
                    'ensemble_distance': 1.0,
                    'ensemble_match': False,
                    'final_decision': False
                })

        return all_results
| |
| # ==================== INITIALIZE MODEL ==================== |
| |
# Pick the compute device (prefer GPU) and build the ensemble once at startup.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"π§ Using device: {device}")
ensemble_model = EnsembleSiamese(device=device)
| |
| # ==================== GRADIO INTERFACE ==================== |
| |
def format_detailed_results(results):
    """Render predict_detailed() output as a markdown report.

    Includes the overall best match, a per-answer/per-model breakdown,
    a model-agreement summary, and a warning when any confidence looks
    suspiciously high (often a sign of unloaded weights).

    Fixes: the original had several string literals corrupted/split across
    lines (the "Match:" line, the "MATCH" line, and the empty `output += "`
    separators); they are reconstructed here as valid f-strings/markdown rules.
    """
    if not ensemble_model.models_loaded:
        return "β οΈ **MODELS NOT PROPERLY LOADED**\n\nPlease check that all model files are uploaded:\n- ensemble_dualpath.pth\n- ensemble_resnet50.pth\n- ensemble_efficientnet.pth\n\nCurrently using fallback mode with random weights."

    output = ""

    # Best match = highest ensemble confidence among usable results.
    valid_results = [r for r in results if r['ensemble_confidence'] > 0]
    if not valid_results:
        return "β No valid predictions could be made. Please check your images."

    best_match = max(valid_results, key=lambda x: x['ensemble_confidence'])
    best_index = best_match['answer_index']

    output += "π― **FINAL PREDICTION RESULTS** π―\n\n"
    output += f"**Best Match: Answer {best_index + 1}** \n"
    output += f"**Overall Confidence: {best_match['ensemble_confidence']:.2f}%** \n"
    output += f"**Distance: {best_match['ensemble_distance']:.4f}** \n"
    output += f"**Match: {'✅ YES' if best_match['final_decision'] else '❌ NO'}** \n\n"

    output += "---\n\n"
    output += "**π DETAILED MODEL BREAKDOWN:**\n\n"

    for result in results:
        output += f"## **Answer {result['answer_index'] + 1}**\n"
        output += f"**Ensemble:** {result['ensemble_confidence']:.2f}% | Distance: {result['ensemble_distance']:.4f} | {'✅ MATCH' if result['final_decision'] else '❌ NO MATCH'}\n\n"

        for model_name, prediction in result['model_predictions'].items():
            emoji = "π’" if prediction['is_match'] else "π΄"
            output += f"  - **{model_name.upper()}:** {emoji} {prediction['confidence']:.2f}% | Distance: {prediction['distance']:.4f}\n"

        output += "\n"

    # Model agreement: fraction of the three models voting "match" per answer.
    output += "---\n\n"
    output += "**π€ MODEL AGREEMENT ANALYSIS:**\n\n"

    for result in results:
        matches = sum(1 for pred in result['model_predictions'].values() if pred['is_match'])
        total_models = len(result['model_predictions'])
        agreement = (matches / total_models) * 100

        consensus_emoji = "π’" if agreement > 66 else "π‘" if agreement > 33 else "π΄"
        output += f"**Answer {result['answer_index'] + 1}:** {consensus_emoji} {matches}/{total_models} models agree ({agreement:.1f}% consensus)\n"

    # Near-100% confidence usually means random/unloaded weights.
    suspicious = any(any(pred['confidence'] > 99.9 for pred in r['model_predictions'].values()) for r in results)
    if suspicious:
        output += "\n---\n\n"
        output += "β οΈ **WARNING:** Some models are showing unusually high confidence scores. This may indicate model loading issues.\n"

    return output
| |
def predict_captcha_detailed(question_image, *answer_images):
    """Gradio handler: run the ensemble on the uploaded images and return a
    markdown report (prediction breakdown + static technical details).

    Fixes: reconstructs two string literals that were corrupted/split across
    lines in the original (the separator before TECHNICAL DETAILS and the
    "Models Loaded" line).
    """
    # Drop empty answer slots (the UI always passes five image inputs).
    answer_imgs = [img for img in answer_images if img is not None]

    if not question_image or len(answer_imgs) == 0:
        return "β Please upload both question and answer images"

    try:
        print(f"π Processing: 1 question + {len(answer_imgs)} answers")

        # Get detailed predictions
        results = ensemble_model.predict_detailed(question_image, answer_imgs)

        # Format output
        output = format_detailed_results(results)

        # Append static technical details for transparency.
        output += "\n---\n\n"
        output += "**βοΈ TECHNICAL DETAILS:**\n\n"
        output += f"- **Threshold:** 0.312 (optimized during training)\n"
        output += f"- **Models:** DualPath (CNN + Handcrafted), ResNet50, EfficientNet-B3\n"
        output += f"- **Ensemble Weights:** DualPath(34%), ResNet50(33%), EfficientNet(33%)\n"
        output += f"- **Training Accuracy:** 98.67%\n"
        output += f"- **Device:** {device.upper()}\n"
        output += f"- **Models Loaded:** {'✅ YES' if ensemble_model.models_loaded else '❌ NO'}\n"

        return output

    except Exception as e:
        return f"β Error during prediction: {str(e)}"
| |
| # ==================== FASTAPI SETUP ==================== |
| |
# FastAPI application exposing the prediction API consumed by the userscript.
app = FastAPI(title="CAPTCHA Solver API", version="1.0")

# Add CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# wide open — any site can call this API, and browsers reject wildcard
# origins when credentials are allowed. Consider restricting origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
# Request body for POST /api/predict: base64-encoded images, optionally
# carrying a "data:image/...;base64," data-URL prefix (stripped on decode).
class Base64PredictionRequest(BaseModel):
    question_base64: str  # the CAPTCHA question image
    answers_base64: list[str]  # candidate answer images, in display order
| |
def base64_to_image(base64_string):
    """Decode a base64 string (optionally a data URL) into a PIL Image.

    Returns None when the payload cannot be decoded or opened as an image.
    """
    try:
        # Strip a "data:image/...;base64," prefix when present.
        payload = base64_string.split(',')[1] if ',' in base64_string else base64_string
        raw_bytes = base64.b64decode(payload)
        return Image.open(BytesIO(raw_bytes))
    except Exception as exc:
        print(f"Error decoding base64: {exc}")
        return None
| |
@app.post("/api/predict")
async def api_predict_endpoint(request: Base64PredictionRequest):
    """API endpoint for the userscript: accepts base64 images and returns
    per-answer ensemble predictions plus the best-matching answer index.

    Errors are reported as {"success": False, "error": ...} with HTTP 200 so
    the client can handle them uniformly.

    Fixes:
      - reconstructs two print strings corrupted/split across lines;
      - BUG FIX: 'best_match' previously reported the argmax position within
        the *filtered* valid_results list, which is wrong whenever some
        results are filtered out — it now reports the original answer_index.
    """
    try:
        print(f"π₯ Received API request: {len(request.answers_base64)} answers")

        # Convert base64 to images
        question_img = base64_to_image(request.question_base64)
        if not question_img:
            return {"success": False, "error": "Failed to decode question image"}

        answer_imgs = []
        for i, base64_str in enumerate(request.answers_base64):
            img = base64_to_image(base64_str)
            if img:
                answer_imgs.append(img)
                print(f"✅ Decoded answer {i+1}")
            else:
                print(f"β Failed to decode answer {i+1}")
                # White placeholder keeps answer indices aligned with the request.
                answer_imgs.append(Image.new('RGB', (100, 100), color='white'))

        if len(answer_imgs) == 0:
            return {"success": False, "error": "No answer images could be decoded"}

        # Make prediction
        results = ensemble_model.predict_detailed(question_img, answer_imgs)

        # Find best match among results with usable confidence.
        valid_results = [r for r in results if r['ensemble_confidence'] > 0]
        if not valid_results:
            return {"success": False, "error": "No valid predictions could be made"}

        best_pos = int(np.argmax([r['ensemble_confidence'] for r in valid_results]))
        best_result = valid_results[best_pos]
        best_answer = int(best_result['answer_index'])

        # Prepare response
        response_data = {
            'success': True,
            'predictions': [
                {
                    'answer_index': r['answer_index'],
                    'ensemble_confidence': r['ensemble_confidence'],
                    'ensemble_distance': r['ensemble_distance'],
                    'ensemble_match': r['ensemble_match'],
                    'model_predictions': r['model_predictions']
                } for r in results
            ],
            'best_match': best_answer,
            'best_confidence': float(best_result['ensemble_confidence']),
            'best_distance': float(best_result['ensemble_distance']),
            'models_loaded': ensemble_model.models_loaded
        }

        print(f"✅ API Prediction complete. Best match: {best_answer} with {best_result['ensemble_confidence']:.2f}% confidence")
        return response_data

    except Exception as e:
        print(f"β API error: {str(e)}")
        return {"success": False, "error": str(e)}
| |
@app.get("/")
async def root():
    """Service banner: reports API version and whether model weights loaded."""
    info = {
        "message": "CAPTCHA Solver API is running!",
        "version": "1.0",
        "accuracy": "98.67%",
        "models_loaded": ensemble_model.models_loaded,
    }
    return info
| |
@app.get("/health")
async def health_check():
    """Liveness probe: reports model-load state and the active device."""
    return dict(
        status="healthy",
        models_loaded=ensemble_model.models_loaded,
        device=device,
    )
| |
| # ==================== GRADIO UI ==================== |
| |
# Gradio UI: one question image, up to five answer images, and a markdown
# panel showing the detailed ensemble analysis.
# Fix: the status Markdown's f-string was corrupted/split across lines in the
# original; it is reconstructed here as a single valid literal.
with gr.Blocks(title="CAPTCHA Solver - Ensemble AI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # π CAPTCHA Solver - Ensemble Siamese Network
    ### **Achieved 98.67% Accuracy during Training**

    This system uses an ensemble of three advanced neural networks to solve CAPTCHA challenges.
    """)

    # Status indicator
    status = gr.Markdown(
        value=f"**Status:** {'✅ Models Loaded Successfully' if ensemble_model.models_loaded else 'β οΈ Models Not Properly Loaded - Using Fallback Mode'}"
    )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### πΈ Upload Images")
            question = gr.Image(label="Question Image", type="pil", height=200)

            gr.Markdown("### π― Answer Images")
            with gr.Row():
                answer1 = gr.Image(label="Answer 1", type="pil", height=150)
                answer2 = gr.Image(label="Answer 2", type="pil", height=150)
            with gr.Row():
                answer3 = gr.Image(label="Answer 3", type="pil", height=150)
                answer4 = gr.Image(label="Answer 4", type="pil", height=150)
            with gr.Row():
                answer5 = gr.Image(label="Answer 5", type="pil", height=150)

            predict_btn = gr.Button("π Analyze CAPTCHA", variant="primary", size="lg")

        with gr.Column(scale=2):
            gr.Markdown("### π Prediction Results")
            output = gr.Markdown(
                label="Detailed Analysis",
                value="π Upload images and click 'Analyze CAPTCHA' to see predictions here..."
            )

    # Wire the button to the prediction handler.
    predict_btn.click(
        fn=predict_captcha_detailed,
        inputs=[question, answer1, answer2, answer3, answer4, answer5],
        outputs=output
    )
| |
| # ==================== RUN APPLICATION ==================== |
| |
# Entry point: serve the Gradio UI on all interfaces, port 7860.
# NOTE(review): the FastAPI `app` defined above is not mounted or run here;
# presumably it is served separately (e.g. via uvicorn or
# gr.mount_gradio_app) — confirm the intended deployment.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)