zeracap / app.py
huijio's picture
Update app.py
3275706 verified
import gradio as gr
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
from PIL import Image
import numpy as np
import cv2
from scipy import stats
import requests
from io import BytesIO
import base64
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
import json
# ==================== MODEL DEFINITIONS ====================
class DualPathSiamese(nn.Module):
def __init__(self, embedding_dim=256):
super(DualPathSiamese, self).__init__()
# Deep learning path
resnet = models.resnet50(pretrained=False)
self.cnn_backbone = nn.Sequential(*list(resnet.children())[:-1])
self.cnn_embedding = nn.Sequential(
nn.Linear(2048, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, embedding_dim)
)
# Traditional CV path
self.feature_embedding = nn.Sequential(
nn.Linear(29, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(128, 64)
)
# Fusion layer
self.fusion = nn.Sequential(
nn.Linear(embedding_dim + 64, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(256, embedding_dim)
)
def forward_once(self, img, features):
cnn_out = self.cnn_backbone(img)
cnn_out = cnn_out.view(cnn_out.size(0), -1)
cnn_embed = self.cnn_embedding(cnn_out)
feat_embed = self.feature_embedding(features)
combined = torch.cat([cnn_embed, feat_embed], dim=1)
output = self.fusion(combined)
return F.normalize(output, p=2, dim=1)
def forward(self, img1, img2, features):
feat1 = features[:, :29]
feat2 = features[:, 29:]
output1 = self.forward_once(img1, feat1)
output2 = self.forward_once(img2, feat2)
return output1, output2
class EnsembleSiamese:
def __init__(self, device='cpu'):
self.device = device
self.models = {}
self.model_names = ['dualpath', 'resnet50', 'efficientnet']
self.weights = [0.34, 0.33, 0.33]
# Load DualPath model
self.models['dualpath'] = DualPathSiamese(embedding_dim=256).to(device)
# Load ResNet50 model
resnet = models.resnet50(pretrained=False)
self.models['resnet50'] = self.create_resnet_siamese(resnet, 2048, 256).to(device)
# Load EfficientNet model
from torchvision.models import efficientnet_b3
efficientnet = efficientnet_b3(pretrained=False)
self.models['efficientnet'] = self.create_efficientnet_siamese(efficientnet, 256).to(device)
# Load trained weights
self.load_weights()
print("βœ… Ensemble model initialized!")
def create_resnet_siamese(self, resnet, in_features, embedding_dim):
class ResNetSiam(nn.Module):
def __init__(self):
super(ResNetSiam, self).__init__()
self.backbone = nn.Sequential(*list(resnet.children())[:-1])
self.embedding = nn.Sequential(
nn.Linear(in_features, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, embedding_dim)
)
def forward_once(self, x):
x = self.backbone(x)
x = x.view(x.size(0), -1)
x = self.embedding(x)
return F.normalize(x, p=2, dim=1)
def forward(self, img1, img2, features=None):
return self.forward_once(img1), self.forward_once(img2)
return ResNetSiam()
def create_efficientnet_siamese(self, efficientnet, embedding_dim):
class EfficientNetSiam(nn.Module):
def __init__(self):
super(EfficientNetSiam, self).__init__()
self.backbone = efficientnet.features
self.avgpool = nn.AdaptiveAvgPool2d(1)
self.embedding = nn.Sequential(
nn.Linear(1536, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.4),
nn.Linear(512, embedding_dim)
)
def forward_once(self, x):
x = self.backbone(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
x = self.embedding(x)
return F.normalize(x, p=2, dim=1)
def forward(self, img1, img2, features=None):
return self.forward_once(img1), self.forward_once(img2)
return EfficientNetSiam()
def load_weights(self):
"""Load trained model weights"""
try:
# Load DualPath
dualpath_state = torch.load('ensemble_dualpath.pth', map_location=self.device)
self.models['dualpath'].load_state_dict(dualpath_state['model_state_dict'])
# Load ResNet50
resnet_state = torch.load('ensemble_resnet50.pth', map_location=self.device)
self.models['resnet50'].load_state_dict(resnet_state['model_state_dict'])
# Load EfficientNet
efficient_state = torch.load('ensemble_efficientnet.pth', map_location=self.device)
self.models['efficientnet'].load_state_dict(efficient_state['model_state_dict'])
print("βœ… All model weights loaded successfully!")
except Exception as e:
print(f"❌ Error loading weights: {e}")
def extract_handcrafted_features(self, img_array):
"""Extract traditional CV features from numpy array"""
if img_array is None:
return np.zeros(29)
features = []
# Color histogram
for i in range(3):
hist = cv2.calcHist([img_array], [i], None, [8], [0, 256])
features.extend(hist.flatten() / (hist.sum() + 1e-6))
# HSV features
hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
features.extend([hsv[:,:,i].mean() for i in range(3)])
# Edge density
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
edges = cv2.Canny(gray, 50, 150)
features.append(edges.sum() / edges.size)
# Texture
features.append(cv2.Laplacian(gray, cv2.CV_64F).var())
return np.array(features, dtype=np.float32)
def predict_detailed(self, question_img, answer_imgs, threshold=0.312):
"""Predict similarity with detailed model breakdown"""
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
all_results = []
for answer_idx, answer_img in enumerate(answer_imgs):
# Preprocess images
q_img = transform(question_img.convert('RGB')).unsqueeze(0).to(self.device)
a_img = transform(answer_img.convert('RGB')).unsqueeze(0).to(self.device)
# Extract features
q_features = self.extract_handcrafted_features(np.array(question_img))
a_features = self.extract_handcrafted_features(np.array(answer_img))
features = np.concatenate([q_features, a_features])
features_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(self.device)
# Get predictions from all models
model_predictions = {}
distances = []
confidences = []
for name, model in self.models.items():
model.eval()
with torch.no_grad():
if name == 'dualpath':
out1, out2 = model(q_img, a_img, features_tensor)
else:
out1, out2 = model(q_img, a_img)
dist = F.pairwise_distance(out1, out2)
confidence = max(0, 100 * (1 - dist.item()))
model_predictions[name] = {
'distance': float(dist.item()),
'confidence': float(confidence),
'is_match': bool(dist.item() < threshold)
}
distances.append(dist.item())
confidences.append(confidence)
# Weighted average
weighted_distance = sum(w * d for w, d in zip(self.weights, distances))
weighted_confidence = sum(w * c for w, c in zip(self.weights, confidences))
is_match = weighted_distance < threshold
answer_result = {
'answer_index': answer_idx,
'model_predictions': model_predictions,
'ensemble_distance': float(weighted_distance),
'ensemble_confidence': float(weighted_confidence),
'ensemble_match': bool(is_match),
'final_decision': is_match
}
all_results.append(answer_result)
return all_results
# ==================== INITIALIZE MODEL ====================
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"πŸ”§ Using device: {device}")
ensemble_model = EnsembleSiamese(device=device)
# ==================== GRADIO INTERFACE ====================
def format_detailed_results(results):
"""Format results with detailed model breakdown"""
output = ""
# Find best match
best_match = max(results, key=lambda x: x['ensemble_confidence'])
best_index = best_match['answer_index']
output += "🎯 **FINAL PREDICTION RESULTS** 🎯\n\n"
output += f"**Best Match: Answer {best_index + 1}** \n"
output += f"**Overall Confidence: {best_match['ensemble_confidence']:.2f}%** \n"
output += f"**Distance: {best_match['ensemble_distance']:.4f}** \n"
output += f"**Match: {'βœ… YES' if best_match['final_decision'] else '❌ NO'}** \n\n"
output += "---\n\n"
output += "**πŸ“Š DETAILED MODEL BREAKDOWN:**\n\n"
for result in results:
output += f"## **Answer {result['answer_index'] + 1}**\n"
output += f"**Ensemble:** {result['ensemble_confidence']:.2f}% | Distance: {result['ensemble_distance']:.4f} | {'βœ… MATCH' if result['final_decision'] else '❌ NO MATCH'}\n\n"
for model_name, prediction in result['model_predictions'].items():
emoji = "🟒" if prediction['is_match'] else "πŸ”΄"
output += f" - **{model_name.upper()}:** {emoji} {prediction['confidence']:.2f}% | Distance: {prediction['distance']:.4f}\n"
output += "\n"
# Model agreement analysis
output += "---\n\n"
output += "**🀝 MODEL AGREEMENT ANALYSIS:**\n\n"
for result in results:
matches = sum(1 for pred in result['model_predictions'].values() if pred['is_match'])
total_models = len(result['model_predictions'])
agreement = (matches / total_models) * 100
output += f"**Answer {result['answer_index'] + 1}:** {matches}/{total_models} models agree ({agreement:.1f}% consensus)\n"
return output
def predict_captcha_detailed(question_image, *answer_images):
"""Gradio prediction function with detailed output"""
# Filter out None images
answer_imgs = [img for img in answer_images if img is not None]
if not question_image or len(answer_imgs) == 0:
return "❌ Please upload both question and answer images"
try:
print(f"πŸ” Processing: 1 question + {len(answer_imgs)} answers")
# Get detailed predictions
results = ensemble_model.predict_detailed(question_image, answer_imgs)
# Format output
output = format_detailed_results(results)
# Add technical details
output += "\n---\n\n"
output += "**βš™οΈ TECHNICAL DETAILS:**\n\n"
output += f"- **Threshold:** 0.312 (optimized during training)\n"
output += f"- **Models:** DualPath (CNN + Handcrafted), ResNet50, EfficientNet-B3\n"
output += f"- **Ensemble Weights:** DualPath(34%), ResNet50(33%), EfficientNet(33%)\n"
output += f"- **Training Accuracy:** 98.67%\n"
output += f"- **Device:** {device.upper()}\n"
return output
except Exception as e:
return f"❌ Error during prediction: {str(e)}"
# ==================== FASTAPI SETUP ====================
app = FastAPI(title="CAPTCHA Solver API", version="1.0")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Base64PredictionRequest(BaseModel):
question_base64: str
answers_base64: list[str]
def base64_to_image(base64_string):
"""Convert base64 string to PIL Image"""
try:
# Remove data URL prefix if present
if ',' in base64_string:
base64_string = base64_string.split(',')[1]
image_data = base64.b64decode(base64_string)
return Image.open(BytesIO(image_data))
except Exception as e:
print(f"Error decoding base64: {e}")
return None
@app.post("/api/predict")
async def api_predict_endpoint(request: Base64PredictionRequest):
"""API endpoint for userscript with base64 images"""
try:
print(f"πŸ“₯ Received API request: {len(request.answers_base64)} answers")
# Convert base64 to images
question_img = base64_to_image(request.question_base64)
if not question_img:
return {"success": False, "error": "Failed to decode question image"}
answer_imgs = []
for i, base64_str in enumerate(request.answers_base64):
img = base64_to_image(base64_str)
if img:
answer_imgs.append(img)
print(f"βœ… Decoded answer {i+1}")
else:
print(f"❌ Failed to decode answer {i+1}")
# Use fallback image
answer_imgs.append(Image.new('RGB', (100, 100), color='white'))
if len(answer_imgs) == 0:
return {"success": False, "error": "No answer images could be decoded"}
# Make prediction
results = ensemble_model.predict_detailed(question_img, answer_imgs)
# Find best match
best_index = np.argmax([r['ensemble_confidence'] for r in results])
best_result = results[best_index]
# Prepare response
response_data = {
'success': True,
'predictions': [
{
'answer_index': r['answer_index'],
'ensemble_confidence': r['ensemble_confidence'],
'ensemble_distance': r['ensemble_distance'],
'ensemble_match': r['ensemble_match'],
'model_predictions': r['model_predictions']
} for r in results
],
'best_match': int(best_index),
'best_confidence': float(best_result['ensemble_confidence']),
'best_distance': float(best_result['ensemble_distance'])
}
print(f"βœ… API Prediction complete. Best match: {best_index} with {best_result['ensemble_confidence']:.2f}% confidence")
return response_data
except Exception as e:
print(f"❌ API error: {str(e)}")
return {"success": False, "error": str(e)}
@app.get("/")
async def root():
return {"message": "CAPTCHA Solver API is running!", "version": "1.0", "accuracy": "98.67%"}
@app.get("/health")
async def health_check():
return {"status": "healthy", "models_loaded": len(ensemble_model.models)}
# ==================== GRADIO UI ====================
with gr.Blocks(title="CAPTCHA Solver - Ensemble AI", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ” CAPTCHA Solver - Ensemble Siamese Network
### **Achieved 98.67% Accuracy during Training**
This system uses an ensemble of three advanced neural networks to solve CAPTCHA challenges:
- **DualPath Siamese** (CNN + Handcrafted Features)
- **ResNet50** (Deep Residual Network)
- **EfficientNet-B3** (State-of-the-art Efficiency)
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### πŸ“Έ Upload Images")
question = gr.Image(label="Question Image", type="pil", height=200)
gr.Markdown("### 🎯 Answer Images")
with gr.Row():
answer1 = gr.Image(label="Answer 1", type="pil", height=150)
answer2 = gr.Image(label="Answer 2", type="pil", height=150)
with gr.Row():
answer3 = gr.Image(label="Answer 3", type="pil", height=150)
answer4 = gr.Image(label="Answer 4", type="pil", height=150)
with gr.Row():
answer5 = gr.Image(label="Answer 5", type="pil", height=150)
predict_btn = gr.Button("πŸš€ Analyze CAPTCHA", variant="primary", size="lg")
gr.Markdown("""
### πŸ’‘ Usage Instructions:
1. Upload the **question image** (CAPTCHA challenge)
2. Upload **answer images** (options to choose from)
3. Click **Analyze CAPTCHA** to get detailed predictions
4. The system will show which answer matches best with confidence scores
""")
with gr.Column(scale=2):
gr.Markdown("### πŸ“Š Prediction Results")
output = gr.Markdown(
label="Detailed Analysis",
value="πŸ‘† Upload images and click 'Analyze CAPTCHA' to see predictions here..."
)
# Examples section
gr.Markdown("---")
gr.Markdown("### πŸš€ API Endpoint for Browser Automation")
gr.Markdown("""
**For userscript automation, use this endpoint:**
```
POST https://huijio-zeracap2.hf.space/api/predict
```
**Request Body:**
```json
{
"question_base64": "data:image/jpeg;base64,...",
"answers_base64": ["data:image/jpeg;base64,...", ...]
}
```
""")
# Connect the prediction function
predict_btn.click(
fn=predict_captcha_detailed,
inputs=[question, answer1, answer2, answer3, answer4, answer5],
outputs=output
)
# ==================== RUN APPLICATION ====================
if __name__ == "__main__":
# For Hugging Face Spaces
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)