# face-search / app.py
# Hugging Face Space header (author: Hawk3388, commit 040ae6c "Update app.py")
# — converted from scrape residue to comments so the file parses as Python.
#!/usr/bin/env python3
"""
Automated Reverse Image Search using Google without API key and without Selenium.
Followed by sequential face recognition and matching.
"""
import os
import io
import time
import requests
import tempfile
import asyncio
import aiohttp
import numpy as np
import gradio as gr
from PIL import Image
from urllib.parse import urlparse
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import cosine_similarity
# Suppress TensorFlow logging unless in debug mode
import logging
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Suppress TensorFlow INFO and WARNING messages
logging.getLogger('tensorflow').setLevel(logging.ERROR)
# TensorFlow imports
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
# Face Recognition imports
import face_recognition
from loguru import logger
# PicImageSearch imports
from PicImageSearch import Bing, Tineye, Network
from PicImageSearch.model import BingResponse, TineyeResponse
# ----------------------------------------
# 1. Utility Functions
# ----------------------------------------
def show_progress_bar(current, total, step_name="Processing", width=50):
    """Render a single-line textual progress bar for the current step."""
    if total == 0:
        # Nothing to report; avoids division by zero below.
        return
    fraction = current / total
    filled_cols = int(width * fraction)
    bar = '█' * filled_cols + '░' * (width - filled_cols)
    # \r rewrites the same terminal line on every call.
    print(f'\r{step_name}: [{bar}] {current}/{total} ({fraction:.1%})', end='', flush=True)
    if current == total:
        # Terminate the line once the step is finished.
        print()
def animate_step(step_name, duration=1.0):
    """Display a spinning Braille animation for `duration` seconds."""
    spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
    deadline = time.time() + duration
    tick = 0
    while time.time() < deadline:
        # Cycle through the spinner frames, rewriting the same line.
        print(f'\r{spinner[tick % len(spinner)]} {step_name}...', end='', flush=True)
        time.sleep(0.1)
        tick += 1
    print(f'\r✅ {step_name} completed! ')
# ----------------------------------------
# 2. Face Recognition and VGG16-based Feature Extraction
# ----------------------------------------
def get_face_encoding_from_image(image_path, resize_max=800):
    """
    Detects the first face in an image file and returns the 128D embedding, or None.
    """
    try:
        loaded = face_recognition.load_image_file(image_path)
        # CNN detector: more accurate than HOG, but slower without a GPU.
        face_boxes = face_recognition.face_locations(loaded, model='cnn')
        if not face_boxes:
            return None
        embeddings = face_recognition.face_encodings(loaded, face_boxes)
        return embeddings[0] if embeddings else None
    except Exception:
        # Any failure (missing file, decode error, ...) is reported as "no face".
        return None
def get_face_encoding_from_pil_image(pil_image):
    """
    Detects the first face in a PIL image and returns the 128D embedding, or None.
    """
    try:
        # face_recognition operates on numpy arrays, not PIL objects.
        pixels = np.array(pil_image)
        face_boxes = face_recognition.face_locations(pixels)
        if not face_boxes:
            return None
        embeddings = face_recognition.face_encodings(pixels, face_boxes)
        return embeddings[0] if embeddings else None
    except Exception:
        # Treat every failure as "no face found".
        return None
# Lazily-initialized module-level singleton for the VGG16 feature extractor.
_vgg16_model = None
def get_vgg16_model(debug=False):
    """Return the shared VGG16 model, loading it on first use."""
    global _vgg16_model
    if _vgg16_model is not None:
        return _vgg16_model
    if debug:
        print("Loading VGG16 model...")
    # Headless VGG16 with global average pooling -> fixed-length feature vector.
    _vgg16_model = VGG16(weights="imagenet", include_top=False, pooling="avg")
    if debug:
        print("VGG16 model loaded!")
    return _vgg16_model
def extract_vgg16_features(image: Image.Image, target_size=(224, 224), debug=False):
    """
    Extracts VGG16 features from a PIL image.

    Returns a flattened 1-D feature vector, or None if anything fails.
    """
    try:
        # VGG16 expects a fixed 224x224 input.
        resized = image.resize(target_size, Image.LANCZOS)
        batch = np.expand_dims(img_to_array(resized), axis=0)
        batch = preprocess_input(batch)
        extractor = get_vgg16_model(debug)
        return extractor.predict(batch, verbose=0).flatten()
    except Exception as e:
        if debug:
            print(f"Error in feature extraction: {e}")
        return None
def get_query_features(input_image_path, debug=False):
    """
    Load the input image and extract VGG16 features.

    Returns the flattened feature vector, or None when the file does not
    exist or feature extraction fails.
    """
    if not os.path.exists(input_image_path):
        if debug:
            print(f"[get_query_features] Input image not found: {input_image_path}")
        return None
    try:
        # Context manager guarantees the underlying file handle is released
        # even if feature extraction raises (the original only closed it on
        # the success path).
        with Image.open(input_image_path) as img:
            features = extract_vgg16_features(img.convert('RGB'), debug=debug)
        if features is None and debug:
            print("Error in feature extraction from input image.")
        return features
    except Exception as e:
        if debug:
            print(f"[get_query_features] Error loading: {e}")
        return None
# ----------------------------------------
# 3. PicImageSearch-based Reverse Image Search
# ----------------------------------------
def _result_url_pair(item):
    """Return (image_url, source_page_url) for one search-result item.

    Prefers the full-size image URL over the thumbnail; either element may
    be None when the item lacks the corresponding attribute or it is empty.
    """
    image_url = getattr(item, 'image_url', None) or getattr(item, 'thumbnail', None)
    source_url = getattr(item, 'url', None) or getattr(item, 'source', None)
    return image_url, source_url

def extract_urls_from_bing_response(resp: "BingResponse") -> list:
    """Extracts image URLs and their source page URLs from Bing response.

    Collects (image_url, source_page_url) tuples from both the
    "pages including" and "visual search" sections. Items missing either
    URL are skipped. (The original duplicated the extraction stanza for
    each section; it is now shared via _result_url_pair.)
    """
    url_pairs = []  # List of (image_url, source_page_url) tuples
    for section in (resp.pages_including, resp.visual_search):
        if not section:
            continue
        for item in section:
            image_url, source_url = _result_url_pair(item)
            if image_url and source_url:
                url_pairs.append((image_url, source_url))
    return url_pairs
async def bing_reverse_image_search(image_path, max_results=100, debug=False):
    """
    Performs Bing Reverse Image Search with PicImageSearch
    """
    if debug:
        print(f"[BING-PIC] === Starting Bing Reverse Search ===")
        print(f"[BING-PIC] Image Path: {image_path}")
        print(f"[BING-PIC] Max Results: {max_results}")
    try:
        async with Network(proxies=None) as client:
            # Upload the local file and parse the Bing visual-search response.
            resp = await Bing(client=client).search(file=image_path)
            if debug:
                print(f"[BING-PIC] Search URL: {resp.url}")
            pairs = extract_urls_from_bing_response(resp)
            if debug:
                print(f"[BING-PIC] Found URL pairs: {len(pairs)}")
            return pairs[:max_results]
    except Exception as e:
        print(f"[BING-PIC] ✗ Error: {e}")
        logger.exception("Error in bing_reverse_image_search:")
        return []
async def bing_with_tineye_fallback_search(image_path, max_results=200, debug=False):
    """
    Run Bing reverse search first; fall back to TinEye only when Bing
    returns nothing. Returns a list of (image_url, source_page_url) tuples.
    """
    if debug:
        print(f"[SEARCH] === Starting Reverse Image Search ===")
        print(f"[SEARCH] Strategy: Bing first, TinEye fallback")
        print("\n1. Starting Bing Search...")
    started = time.time()
    bing_pairs = await bing_reverse_image_search(image_path, max_results, debug)
    if debug:
        print(f"[SEARCH] Bing: {len(bing_pairs)} URL pairs in {time.time() - started:.2f}s")
    if bing_pairs:
        # Bing delivered results — no fallback needed.
        if debug:
            print(f"[SEARCH] ✅ Bing found {len(bing_pairs)} results, using Bing results")
            print(f"[SEARCH] === Search Completed (Bing) ===")
        return bing_pairs
    if debug:
        print(f"[SEARCH] ⚠️ Bing found no results, trying TinEye as fallback...")
        print("\n2. Starting TinEye Search...")
    started = time.time()
    tineye_pairs = await tineye_reverse_image_search(image_path, max_results, debug)
    if debug:
        print(f"[SEARCH] TinEye: {len(tineye_pairs)} URL pairs in {time.time() - started:.2f}s")
    if tineye_pairs:
        if debug:
            print(f"[SEARCH] ✅ TinEye found {len(tineye_pairs)} results, using TinEye results")
            print(f"[SEARCH] === Search Completed (TinEye Fallback) ===")
        return tineye_pairs
    # Neither engine produced anything usable.
    if debug:
        print(f"[SEARCH] ❌ Both Bing and TinEye found no results")
        print(f"[SEARCH] === Search Completed (No Results) ===")
    return []
def extract_urls_from_tineye_response(resp: "TineyeResponse") -> list:
    """Extracts image URLs and their source page URLs from TinEye response.

    Returns a list of (image_url, source_page_url) tuples built from the raw
    result items. Items missing either URL are skipped; a falsy response or
    empty `raw` yields an empty list.
    """
    url_pairs = []  # List of (image_url, source_page_url) tuples
    if not (resp and resp.raw):
        return url_pairs
    for item in resp.raw:
        # Prefer the full-size image URL over the thumbnail.
        image_url = getattr(item, 'image_url', None) or getattr(item, 'thumbnail', None)
        # Backlink to the page hosting the image.
        source_url = getattr(item, 'url', None)
        if image_url and source_url:
            url_pairs.append((image_url, source_url))
    return url_pairs
async def tineye_reverse_image_search(image_path, max_results=100, debug=False):
    """
    Performs TinEye Reverse Image Search with PicImageSearch
    """
    if debug:
        print(f"[TINEYE] === Starting TinEye Reverse Search ===")
        print(f"[TINEYE] Image Path: {image_path}")
        print(f"[TINEYE] Max Results: {max_results}")
    try:
        async with Network(proxies=None) as client:
            engine = Tineye(client=client)
            # Best matches first (sorted by score, descending).
            resp = await engine.search(
                file=image_path,
                show_unavailable_domains=False,
                domain="",
                tags="",
                sort="score",
                order="desc",
            )
            if debug:
                if resp and hasattr(resp, 'query_hash'):
                    print(f"[TINEYE] Query Hash: {resp.query_hash}")
                if resp and hasattr(resp, 'total_pages'):
                    print(f"[TINEYE] Total Pages: {resp.total_pages}")
            found = extract_urls_from_tineye_response(resp)
            if debug:
                print(f"[TINEYE] Found URL pairs: {len(found)}")
            return found[:max_results]
    except Exception as e:
        print(f"[TINEYE] ✗ Error: {e}")
        logger.exception("Error in tineye_reverse_image_search:")
        return []
# ----------------------------------------
# 4. Two-stage processing: VGG16 + Face-Recognition
# ----------------------------------------
async def process_candidate_urls_two_stage(url_pairs, query_features, query_face_encoding, vgg16_threshold=0.85, top_k=20, debug=False):
    """
    Two-stage candidate filtering.

    Stage 1 (VGG16): downloads each candidate image and REMOVES images whose
    VGG16 similarity to the query exceeds ``vgg16_threshold`` (near-identical
    copies of the query). An additional pass then removes near-duplicates
    among the remaining candidates.
    Stage 2 (face recognition): compares the remaining faces against the
    query face and keeps matches with >= 50% face similarity.

    Returns a list of (face_similarity, image_url, source_page_url) tuples
    sorted by descending face similarity, deduplicated by source URL and
    limited to ``top_k`` entries.

    BUGFIX vs. the previous version: the duplicate filter added every
    processed index to ``used_indices``, which made every pairwise comparison
    skip and turned the filter into a no-op; candidates are now compared
    against the features of already-kept candidates. VGG16 features are also
    computed once per image and reused instead of being re-extracted.
    """
    stage1_candidates = []  # (vgg16_sim, image_url, source_url, PIL image, features)
    stage2_matches = []
    timeout = aiohttp.ClientTimeout(total=10)
    connector = aiohttp.TCPConnector(limit_per_host=5)
    if debug:
        print(f"=== TWO-STAGE FILTER ===")
        print(f"Stage 1: VGG16-Filter (REMOVES images with >{vgg16_threshold*100:.0f}% image similarity)")
        print(f"Stage 2: Face-Recognition (ALL remaining faces sorted by similarity)")
        print(f"Processing {len(url_pairs)} URL pairs...")
    # STAGE 1: VGG16 filter (reverse filter — drops images too similar to the query)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        if debug:
            print(f"\n--- STAGE 1: VGG16-Filter (removes too similar images) ---")
        total_pairs = len(url_pairs)
        for i, (image_url, source_page_url) in enumerate(url_pairs):
            if not debug:
                show_progress_bar(i + 1, total_pairs, "Stage 1: VGG16 filtering")
            try:
                async with session.get(image_url) as resp:
                    if resp.status != 200:
                        continue
                    ctype = resp.headers.get("Content-Type", "")
                    if "image" not in ctype:
                        continue
                    data = await resp.read()
                    img = Image.open(io.BytesIO(data)).convert('RGB')
            except Exception:
                # Unreachable host, decode error, timeout — skip this candidate.
                continue
            # Extract VGG16 features once; reused for duplicate filtering below.
            features = extract_vgg16_features(img, debug=debug)
            if features is None:
                img.close()
                continue
            try:
                vgg16_similarity = cosine_similarity([query_features], [features])[0][0]
            except Exception:
                img.close()
                continue
            # Keep only images UNDER the threshold; anything above is treated
            # as a (near-)copy of the query image and dropped.
            if vgg16_similarity < vgg16_threshold:
                stage1_candidates.append((vgg16_similarity, image_url, source_page_url, img, features))
                if debug and len(stage1_candidates) % 10 == 0:
                    print(f" Stage 1: {len(stage1_candidates)} candidates kept (from {i+1} processed URL pairs)")
            else:
                img.close()
            # Periodic progress monitoring in debug mode.
            if debug and (i + 1) % 50 == 0:
                print(f" Processed {i+1}/{len(url_pairs)} URL pairs, kept: {len(stage1_candidates)} (removed too similar)")
    if debug:
        print(f"✅ Stage 1 completed: {len(stage1_candidates)} candidates under {vgg16_threshold*100:.0f}% VGG16 similarity kept")
    if not stage1_candidates:
        if debug:
            print("❌ All images were too similar to original (above VGG16 threshold)!")
        return []
    # ADDITIONAL STEP: VGG16 duplicate filtering between candidates.
    if debug:
        print(f"\n--- ADDITIONAL: VGG16 duplicate filtering between candidates ---")
    else:
        animate_step("Removing duplicates", 0.5)
    unique_candidates = []
    kept_features = []  # features of candidates accepted so far
    for vgg16_sim, image_url, source_page_url, img, features in stage1_candidates:
        is_unique = True
        for other_features in kept_features:
            try:
                if cosine_similarity([features], [other_features])[0][0] >= vgg16_threshold:
                    # Too similar to an already-kept candidate.
                    is_unique = False
                    break
            except Exception:
                continue
        if is_unique:
            kept_features.append(features)
            unique_candidates.append((vgg16_sim, image_url, source_page_url, img))
        else:
            img.close()  # drop the duplicate image
    if debug:
        print(f"✅ Duplicate filtering: {len(stage1_candidates)} -> {len(unique_candidates)} unique candidates")
    # STAGE 2: face recognition on all remaining candidates
    if debug:
        print(f"\n--- STAGE 2: Face-Recognition (compare all faces at once) ---")
    all_face_encodings = []
    valid_candidates = []
    total_candidates = len(unique_candidates)
    for i, (vgg16_sim, image_url, source_page_url, img) in enumerate(unique_candidates):
        if not debug:
            show_progress_bar(i + 1, total_candidates, "Stage 2: Face extraction")
        face_encoding = get_face_encoding_from_pil_image(img)
        img.close()
        if face_encoding is not None:
            all_face_encodings.append(face_encoding)
            valid_candidates.append((vgg16_sim, image_url, source_page_url))
        if debug and (i + 1) % 10 == 0:
            print(f" Face extraction: {i+1}/{len(unique_candidates)} processed, faces found: {len(all_face_encodings)}")
    if not all_face_encodings:
        if debug:
            print("❌ No faces found in any candidate images!")
        return []
    if debug:
        print(f"Comparing {len(all_face_encodings)} faces with query face...")
    try:
        # Boolean match decision per face, plus distances for ranking.
        results = face_recognition.compare_faces(all_face_encodings, query_face_encoding)
        face_distances = face_recognition.face_distance(all_face_encodings, query_face_encoding)
        for i, (is_match, face_distance) in enumerate(zip(results, face_distances)):
            if not is_match:
                continue
            face_similarity = 1 - face_distance  # distance -> similarity
            # Discard weak matches below 50% similarity.
            if face_similarity < 0.5:
                if debug:
                    print(f" Filtering out face {i}: {face_similarity*100:.1f}% similarity (not similar)")
                continue
            _, image_url, source_page_url = valid_candidates[i]
            stage2_matches.append((face_similarity, image_url, source_page_url))
        if debug:
            matches_found = len(stage2_matches)
            total_faces = len(results)
            matches_before_filter = sum(1 for r in results if r)
            filtered_faces = matches_before_filter - matches_found
            print(f"✅ Face comparison completed: {matches_before_filter}/{total_faces} faces match, {filtered_faces} filtered out (>50% similarity), {matches_found} kept!")
    except Exception as e:
        if debug:
            print(f"❌ Error in face comparison: {e}")
        return []
    # Sort by face similarity, best match first.
    stage2_matches.sort(key=lambda x: x[0], reverse=True)
    # Deduplicate by source page URL (first/best occurrence wins).
    seen_urls = set()
    unique_matches = []
    for face_similarity, image_url, source_url in stage2_matches:
        if source_url not in seen_urls:
            seen_urls.add(source_url)
            unique_matches.append((face_similarity, image_url, source_url))
    if debug and len(stage2_matches) != len(unique_matches):
        print(f" 🔄 {len(stage2_matches) - len(unique_matches)} duplicates removed")
    # Keep only the top-K best matches.
    unique_matches = unique_matches[:top_k]
    if debug:
        print(f"✅ Stage 2 completed: {len(unique_matches)} unique faces found and sorted")
        if unique_matches:
            print(f" Best face similarity: {unique_matches[0][0] * 100:.1f}%")
            print(f" Worst face similarity: {unique_matches[-1][0] * 100:.1f}%")
    return unique_matches
# ----------------------------------------
# 5. Display of top matches
# ----------------------------------------
def show_top_matches(top_matches, debug=False):
    """
    Downloads the top match URLs again and shows thumbnails with URLs.

    Renders a matplotlib grid (up to 5 columns) of thumbnails, each annotated
    with its face-similarity percentage and a shortened source URL. Images
    that can no longer be downloaded are skipped; if nothing loads at all,
    the matches are printed as a plain-text list instead.

    top_matches: list of (face_similarity, image_url, source_page_url) tuples.
    """
    if not top_matches:
        print("No matches to display.")
        return
    print(f"\n🖼️ Loading {len(top_matches)} images for thumbnail display...")
    cols = min(len(top_matches), 5)
    # Ceiling division so every match gets a grid cell.
    rows = (len(top_matches) + cols - 1)//cols
    plt.figure(figsize=(4*cols, 4*rows))
    successful_images = 0
    failed_images = []  # Track failed image URLs
    total_matches = len(top_matches)
    for i, (similarity, image_url, source_url) in enumerate(top_matches):
        if not debug:
            show_progress_bar(i + 1, total_matches, "Loading thumbnails")
        elif debug:
            print(f" Loading image {i+1}/{len(top_matches)}: {image_url[:60]}...")
        try:
            # Browser-like User-Agent: some image hosts reject unknown clients.
            resp = requests.get(image_url, timeout=10, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
            if resp.status_code != 200:
                if debug:
                    print(f" ❌ HTTP {resp.status_code}")
                failed_images.append((similarity, image_url, source_url))
                continue
            img = Image.open(io.BytesIO(resp.content)).convert('RGB')
            if debug:
                print(f" ✅ Successfully loaded ({img.size})")
        except requests.exceptions.Timeout:
            if debug:
                print(f" ❌ Timeout while loading")
            failed_images.append((similarity, image_url, source_url))
            continue
        except requests.exceptions.RequestException as e:
            if debug:
                print(f" ❌ Network error: {e}")
            failed_images.append((similarity, image_url, source_url))
            continue
        except Exception as e:
            # e.g. undecodable image data
            if debug:
                print(f" ❌ Image error: {e}")
            failed_images.append((similarity, image_url, source_url))
            continue
        # Downscale in place for the grid cell.
        img.thumbnail((200, 200), Image.LANCZOS)
        ax = plt.subplot(rows, cols, i+1)
        ax.imshow(img)
        ax.axis('off')
        # Show similarity and URL
        similarity_percent = similarity * 100
        # Shorten URL for better display (domain + path)
        parsed_url = urlparse(source_url)
        short_url = f"{parsed_url.netloc}"
        if parsed_url.path:
            path_parts = parsed_url.path.split('/')
            if len(path_parts) > 1:
                short_url += f"/.../{path_parts[-1]}"
        # Display text below the image
        ax.text(0.5, -0.15, f"{similarity_percent:.1f}%\n{short_url}",
                transform=ax.transAxes, ha='center', va='top',
                fontsize=8, wrap=True)
        successful_images += 1
    print(f"\n✅ {successful_images}/{len(top_matches)} images successfully loaded")
    if successful_images > 0:
        plt.tight_layout()
        plt.show()
    else:
        print("❌ No images could be loaded!")
        if debug:
            print("Possible causes:")
            print("- Images are no longer available")
            print("- Servers block access")
            print("- Network problems")
        # Show URLs as text fallback when images can't be loaded
        print("\n📋 Showing results as text list since images couldn't be loaded:")
        print("=" * 80)
        for idx, (similarity, image_url, source_url) in enumerate(top_matches, 1):
            similarity_percent = similarity * 100
            print(f"\n🏆 MATCH #{idx}:")
            print(f" Face similarity: {similarity_percent:.1f}%")
            print(f" Source page: {source_url}")
            print(f" Image URL: {image_url}")
            print(f" {'-' * 60}")
        print("=" * 80)
# ----------------------------------------
# Gradio Web Interface for Hugging Face Spaces
# ----------------------------------------
def format_gallery_results(top_matches):
    """
    Format the results for Gradio Gallery output: list of [image, caption] pairs.

    Returns None (not an empty list) when there are no matches, to avoid
    Gallery processing errors on empty input.
    """
    if not top_matches:
        # Return None for empty results to avoid Gallery processing errors
        return None
    gallery = []
    for idx, (face_similarity, image_url, source_url) in enumerate(top_matches, 1):
        face_similarity_percent = face_similarity * 100
        # NOTE(review): a separator between rank and percentage was evidently
        # lost (caption rendered e.g. "#185.2%"); restore an explicit one.
        caption = f"#{idx}: {face_similarity_percent:.1f}% | {source_url}"
        gallery.append([image_url, caption])
    return gallery
async def face_search_gradio(image, max_results=100, top_k=5, vgg16_threshold=0.85, debug=False):
    """
    Main Gradio function: takes a PIL image, runs the search, and returns
    (gallery, status_message).

    The uploaded image is written to a temporary file because the search and
    face-encoding helpers operate on file paths. The temp file is always
    removed afterwards (the original leaked one file per request), and the
    handle is closed before PIL writes to it (avoids Windows file locking).
    """
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        input_path = tmp.name
    image.save(input_path)
    try:
        # Step 1: Extract features
        query_features = get_query_features(input_path, debug=debug)
        if query_features is None:
            return None, "❌ Error: Could not extract VGG16 features from input image."
        query_face_encoding = get_face_encoding_from_image(input_path)
        if query_face_encoding is None:
            return None, "❌ Error: No face detected in input image."
        # Step 2: Reverse Image Search
        url_pairs = await bing_with_tineye_fallback_search(input_path, max_results=max_results, debug=debug)
        if not url_pairs:
            return None, "⚠️ No images found via reverse image search (tried Bing and TinEye)."
        # Step 3: Two-stage matching
        top_matches = await process_candidate_urls_two_stage(
            url_pairs,
            query_features,
            query_face_encoding,
            vgg16_threshold=vgg16_threshold,
            top_k=top_k,
            debug=debug
        )
        # Step 4: Format results
        gallery = format_gallery_results(top_matches)
        if not gallery:
            return None, "⚠️ No matching faces found above similarity thresholds."
        return gallery, f"✅ Found {len(top_matches)} matching faces!"
    finally:
        # Always clean up the temp file, even on early returns/errors.
        try:
            os.remove(input_path)
        except OSError:
            pass
def update_status_visibility(status_text):
    """Make status field visible only when there's a message"""
    has_message = bool(status_text and status_text.strip())
    if not has_message:
        # Nothing to show — clear and hide the textbox.
        return gr.update(value="", visible=False)
    return gr.update(value=status_text, visible=True)
def gradio_sync_wrapper(image):
    """
    Wrapper to run async Gradio function in sync context, with fixed parameters.
    """
    # Fixed search parameters for the public demo UI:
    # max_results=100, top_k=10, vgg16_threshold=0.85, debug off.
    return asyncio.run(
        face_search_gradio(image, 100, 10, 0.85, False)
    )
def hide_status():
    """Hide status message at start of search."""
    # Clear any stale message and collapse the textbox.
    return gr.update(value="", visible=False)
# Build the Gradio UI: image upload -> search button -> results gallery,
# with a status textbox that is shown only when there is a message.
with gr.Blocks() as demo:
    gr.Markdown("""
# Face Search Tool Demo
<div style='margin-bottom: 1em;'>
<b>Upload a photo to find similar faces on the web (Bing/TinEye + Face Recognition).</b><br>
The top 10 results will be shown as a gallery with links. This might take a minute.
</div>
""")
    image_input = gr.Image(type="pil", label="Upload an image", show_label=True)
    submit_btn = gr.Button("🔍 Start Search", elem_id="search-btn")
    gallery = gr.Gallery(label="Top 10 Matches", columns=5, height="auto", show_label=True)
    # Status message below gallery (only shows when there's a message)
    status_msg = gr.Textbox(label="Status", interactive=False, visible=False, show_label=False)
    # Create a chain: first hide status, then run search, then show status if needed
    submit_btn.click(
        hide_status,
        inputs=[],
        outputs=[status_msg]
    ).then(
        gradio_sync_wrapper,
        inputs=[image_input],
        outputs=[gallery, status_msg]
    ).then(
        update_status_visibility,
        inputs=[status_msg],
        outputs=[status_msg]
    )

# Bind on all interfaces (required for Hugging Face Spaces) and create a share link.
demo.launch(share=True, server_name="0.0.0.0")