Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Automated reverse image search using Bing (with TinEye as a fallback) via PicImageSearch, without an API key and without Selenium. | |
| Followed by sequential face recognition and matching. | |
| """ | |
| import os | |
| import io | |
| import time | |
| import requests | |
| import tempfile | |
| import asyncio | |
| import aiohttp | |
| import numpy as np | |
| import gradio as gr | |
| from PIL import Image | |
| from urllib.parse import urlparse | |
| import matplotlib.pyplot as plt | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| # Suppress TensorFlow logging unless in debug mode | |
| import logging | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Suppress TensorFlow INFO and WARNING messages | |
| logging.getLogger('tensorflow').setLevel(logging.ERROR) | |
| # TensorFlow imports | |
| from tensorflow.keras.applications import VGG16 | |
| from tensorflow.keras.applications.vgg16 import preprocess_input | |
| from tensorflow.keras.preprocessing.image import img_to_array | |
| # Face Recognition imports | |
| import face_recognition | |
| from loguru import logger | |
| # PicImageSearch imports | |
| from PicImageSearch import Bing, Tineye, Network | |
| from PicImageSearch.model import BingResponse, TineyeResponse | |
| # ---------------------------------------- | |
| # 1. Utility Functions | |
| # ---------------------------------------- | |
def show_progress_bar(current, total, step_name="Processing", width=50):
    """
    Redraw a single-line text progress bar on stdout.

    The line is rewritten in place (leading carriage return, no trailing
    newline) until ``current`` equals ``total``, at which point a newline is
    emitted. Nothing is printed when ``total`` is 0.
    """
    if total == 0:
        return
    fraction = current / total
    done_cells = int(width * fraction)
    cells = "".join(("█" * done_cells, "░" * (width - done_cells)))
    line = f'\r{step_name}: [{cells}] {current}/{total} ({fraction:.1%})'
    # Carriage return + no newline keeps the bar on one terminal line
    print(line, end='', flush=True)
    if current == total:
        print()  # Finish the line once the last step is reached
def animate_step(step_name, duration=1.0):
    """
    Show a spinner for ``step_name`` for roughly ``duration`` seconds.

    The spinner frame is redrawn every 0.1 s on the same terminal line; once
    the time is up a final "completed" line is printed.
    """
    frames = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
    deadline = time.time() + duration
    tick = 0
    while time.time() < deadline:
        frame = frames[tick % len(frames)]
        print(f'\r{frame} {step_name}...', end='', flush=True)
        time.sleep(0.1)
        tick += 1
    print(f'\r✅ {step_name} completed! ')
| # ---------------------------------------- | |
| # 2. Face Recognition and VGG16-based Feature Extraction | |
| # ---------------------------------------- | |
def get_face_encoding_from_image(image_path, resize_max=800):
    """
    Detects the first face in an image file and returns the 128D embedding, or None.

    Args:
        image_path: Path of the image file to analyse.
        resize_max: Longest allowed edge in pixels. Larger images are
            downscaled (aspect ratio preserved) before detection so the CNN
            detector does not have to process full-resolution photos. Pass a
            falsy value (0 / None) to disable resizing.

    Returns:
        The encoding of the first detected face, or None when the file cannot
        be loaded, no face is found, or any step fails (best effort).
    """
    try:
        # Load image using face_recognition (returns an RGB numpy array)
        img = face_recognition.load_image_file(image_path)
        # Honor resize_max: shrink oversized inputs before the CNN detector runs
        if resize_max and max(img.shape[:2]) > resize_max:
            pil_img = Image.fromarray(img)
            pil_img.thumbnail((resize_max, resize_max), Image.LANCZOS)
            img = np.array(pil_img)
        # Get face locations and encodings
        locations = face_recognition.face_locations(img, model='cnn')
        if not locations:
            return None
        encodings = face_recognition.face_encodings(img, locations)
        return encodings[0] if encodings else None
    except Exception:
        # Deliberate best effort: callers treat None as "no usable face"
        return None
def get_face_encoding_from_pil_image(pil_image):
    """
    Detects the first face in a PIL image and returns the 128D embedding, or None.

    The image is normalised to RGB first, so inputs in other PIL modes
    (e.g. RGBA or palette images) are analysed instead of being rejected by
    the detector.

    Returns:
        The encoding of the first detected face, or None when no face is
        found or any step fails (best effort).
    """
    try:
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')
        # Convert PIL image to numpy array for face_recognition
        img_array = np.array(pil_image)
        # Get face locations and encodings
        locations = face_recognition.face_locations(img_array)
        if not locations:
            return None
        encodings = face_recognition.face_encodings(img_array, locations)
        return encodings[0] if encodings else None
    except Exception:
        # Deliberate best effort: callers treat None as "no usable face"
        return None
# Process-wide VGG16 instance, created on first use
_vgg16_model = None


def get_vgg16_model(debug=False):
    """Return the shared VGG16 model, building it on the first call only."""
    global _vgg16_model
    if _vgg16_model is not None:
        return _vgg16_model
    if debug:
        print("Loading VGG16 model...")
    _vgg16_model = VGG16(weights="imagenet", include_top=False, pooling="avg")
    if debug:
        print("VGG16 model loaded!")
    return _vgg16_model
def extract_vgg16_features(image: Image.Image, target_size=(224, 224), debug=False):
    """
    Compute a flat VGG16 feature vector for a PIL image.

    The image is resized to ``target_size``, turned into a one-element batch,
    run through ``preprocess_input`` and fed to the shared VGG16 model.
    Returns the flattened prediction, or None if any step raises (the error
    is printed only when ``debug`` is set).
    """
    try:
        resized = image.resize(target_size, Image.LANCZOS)
        batch = preprocess_input(np.expand_dims(img_to_array(resized), axis=0))
        return get_vgg16_model(debug).predict(batch, verbose=0).flatten()
    except Exception as err:
        if debug:
            print(f"Error in feature extraction: {err}")
        return None
def get_query_features(input_image_path, debug=False):
    """
    Loads the input image and extracts VGG16 features.

    Args:
        input_image_path: Path of the query image on disk.
        debug: When True, print diagnostics for missing files and failures.

    Returns:
        The feature vector produced by ``extract_vgg16_features``, or None if
        the file does not exist, cannot be opened, or feature extraction fails.
    """
    if not os.path.exists(input_image_path):
        if debug:
            print(f"[get_query_features] Input image not found: {input_image_path}")
        return None
    try:
        # The context manager releases the underlying file handle as soon as
        # the RGB copy exists
        with Image.open(input_image_path) as source:
            img = source.convert('RGB')
        try:
            features = extract_vgg16_features(img, debug=debug)
        finally:
            img.close()  # Always free the decoded copy, even on error
    except Exception as e:
        if debug:
            print(f"[get_query_features] Error loading: {e}")
        return None
    if features is None and debug:
        print("Error in feature extraction from input image.")
    return features
| # ---------------------------------------- | |
| # 3. PicImageSearch-based Reverse Image Search | |
| # ---------------------------------------- | |
def _first_truthy_attr(item, *names):
    """Return the first truthy attribute of ``item`` among ``names``, else None."""
    for name in names:
        value = getattr(item, name, None)
        if value:
            return value
    return None


def _bing_items_to_url_pairs(items):
    """Turn one Bing result section into (image_url, source_page_url) tuples."""
    pairs = []
    for item in items or []:
        # Prefer the original image over the thumbnail
        image_url = _first_truthy_attr(item, 'image_url', 'thumbnail')
        # Prefer the page URL, fall back to the 'source' attribute
        source_url = _first_truthy_attr(item, 'url', 'source')
        if image_url and source_url:
            pairs.append((image_url, source_url))
    return pairs


def extract_urls_from_bing_response(resp: "BingResponse") -> list:
    """
    Extracts image URLs and their source page URLs from Bing response.

    Both the ``pages_including`` and the ``visual_search`` sections are read,
    in that order. Items lacking either an image URL or a source URL are
    skipped. A missing response or missing/empty sections yield no pairs.

    Returns:
        List of (image_url, source_page_url) tuples.
    """
    if resp is None:
        return []
    url_pairs = []
    for section in ('pages_including', 'visual_search'):
        url_pairs.extend(_bing_items_to_url_pairs(getattr(resp, section, None)))
    return url_pairs
async def bing_reverse_image_search(image_path, max_results=100, debug=False):
    """
    Run a Bing reverse image search for a local file via PicImageSearch.

    Returns at most ``max_results`` (image_url, source_page_url) tuples.
    Any failure is printed and logged, and an empty list is returned.
    """
    if debug:
        for line in (
            "[BING-PIC] === Starting Bing Reverse Search ===",
            f"[BING-PIC] Image Path: {image_path}",
            f"[BING-PIC] Max Results: {max_results}",
        ):
            print(line)
    try:
        async with Network(proxies=None) as client:
            resp = await Bing(client=client).search(file=image_path)
            if debug:
                print(f"[BING-PIC] Search URL: {resp.url}")
            found_pairs = extract_urls_from_bing_response(resp)
            if debug:
                print(f"[BING-PIC] Found URL pairs: {len(found_pairs)}")
            return found_pairs[:max_results]
    except Exception as e:
        print(f"[BING-PIC] ✗ Error: {e}")
        logger.exception("Error in bing_reverse_image_search:")
        return []
async def bing_with_tineye_fallback_search(image_path, max_results=200, debug=False):
    """
    Search Bing first and consult TinEye only when Bing returns nothing.

    Returns the URL pairs of the first engine that produced results, or an
    empty list when both engines come back empty.
    """
    if debug:
        print("[SEARCH] === Starting Reverse Image Search ===")
        print("[SEARCH] Strategy: Bing first, TinEye fallback")
        print("\n1. Starting Bing Search...")

    # --- Primary engine: Bing ---
    started = time.time()
    bing_url_pairs = await bing_reverse_image_search(image_path, max_results, debug)
    elapsed = time.time() - started
    if debug:
        print(f"[SEARCH] Bing: {len(bing_url_pairs)} URL pairs in {elapsed:.2f}s")

    if bing_url_pairs:
        if debug:
            print(f"[SEARCH] ✅ Bing found {len(bing_url_pairs)} results, using Bing results")
            print("[SEARCH] === Search Completed (Bing) ===")
        return bing_url_pairs

    # --- Fallback engine: TinEye ---
    if debug:
        print("[SEARCH] ⚠️ Bing found no results, trying TinEye as fallback...")
        print("\n2. Starting TinEye Search...")
    started = time.time()
    tineye_url_pairs = await tineye_reverse_image_search(image_path, max_results, debug)
    elapsed = time.time() - started
    if debug:
        print(f"[SEARCH] TinEye: {len(tineye_url_pairs)} URL pairs in {elapsed:.2f}s")

    if not tineye_url_pairs:
        if debug:
            print("[SEARCH] ❌ Both Bing and TinEye found no results")
            print("[SEARCH] === Search Completed (No Results) ===")
        return []

    if debug:
        print(f"[SEARCH] ✅ TinEye found {len(tineye_url_pairs)} results, using TinEye results")
        print("[SEARCH] === Search Completed (TinEye Fallback) ===")
    return tineye_url_pairs
def extract_urls_from_tineye_response(resp: TineyeResponse) -> list:
    """
    Collect (image_url, source_page_url) tuples from a TinEye response.

    Each entry of ``resp.raw`` contributes one pair: the image URL (falling
    back to the thumbnail) together with the item's ``url`` as the source
    page. Entries missing either value are skipped.
    """
    url_pairs = []
    if not (resp and resp.raw):
        return url_pairs
    for item in resp.raw:
        image_url = getattr(item, 'image_url', None) or getattr(item, 'thumbnail', None)
        source_url = getattr(item, 'url', None)  # backlink to the hosting page
        if image_url and source_url:
            url_pairs.append((image_url, source_url))
    return url_pairs
async def tineye_reverse_image_search(image_path, max_results=100, debug=False):
    """
    Run a TinEye reverse image search for a local file via PicImageSearch.

    Returns at most ``max_results`` (image_url, source_page_url) tuples.
    Any failure is printed and logged, and an empty list is returned.
    """
    if debug:
        for line in (
            "[TINEYE] === Starting TinEye Reverse Search ===",
            f"[TINEYE] Image Path: {image_path}",
            f"[TINEYE] Max Results: {max_results}",
        ):
            print(line)
    search_options = dict(
        file=image_path,
        show_unavailable_domains=False,
        domain="",
        tags="",
        sort="score",
        order="desc",
    )
    try:
        async with Network(proxies=None) as client:
            resp = await Tineye(client=client).search(**search_options)
            if debug:
                for attr, label in (("query_hash", "Query Hash"), ("total_pages", "Total Pages")):
                    if resp and hasattr(resp, attr):
                        print(f"[TINEYE] {label}: {getattr(resp, attr)}")
            found_pairs = extract_urls_from_tineye_response(resp)
            if debug:
                print(f"[TINEYE] Found URL pairs: {len(found_pairs)}")
            return found_pairs[:max_results]
    except Exception as e:
        print(f"[TINEYE] ✗ Error: {e}")
        logger.exception("Error in tineye_reverse_image_search:")
        return []
| # ---------------------------------------- | |
| # 4. Two-stage processing: VGG16 + Face-Recognition | |
| # ---------------------------------------- | |
# Face matches below this similarity (1 - face distance) are discarded
_MIN_FACE_SIMILARITY = 0.5


async def _fetch_rgb_image(session, image_url):
    """Download ``image_url`` and decode it as an RGB PIL image; None on any failure."""
    try:
        async with session.get(image_url) as resp:
            if resp.status != 200:
                return None
            if "image" not in resp.headers.get("Content-Type", ""):
                return None
            data = await resp.read()
        return Image.open(io.BytesIO(data)).convert('RGB')
    except Exception:
        return None


async def _stage1_vgg16_filter(url_pairs, query_features, vgg16_threshold, debug):
    """Download all candidates and keep those whose VGG16 similarity to the query is below the threshold."""
    kept = []  # (vgg16_similarity, image_url, source_page_url, img, features)
    total_pairs = len(url_pairs)
    timeout = aiohttp.ClientTimeout(total=10)
    connector = aiohttp.TCPConnector(limit_per_host=5)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        for i, (image_url, source_page_url) in enumerate(url_pairs):
            if not debug:
                show_progress_bar(i + 1, total_pairs, "Stage 1: VGG16 filtering")
            img = await _fetch_rgb_image(session, image_url)
            if img is None:
                continue
            # Extract VGG16 features
            features = extract_vgg16_features(img, debug=debug)
            if features is None:
                img.close()
                continue
            # Calculate VGG16 similarity
            try:
                vgg16_similarity = cosine_similarity([query_features], [features])[0][0]
            except Exception:
                img.close()
                continue
            # Stage 1 Filter: Only images UNDER VGG16 threshold (less similar)
            if vgg16_similarity < vgg16_threshold:
                kept.append((vgg16_similarity, image_url, source_page_url, img, features))
                if debug and len(kept) % 10 == 0:
                    print(f" Stage 1: {len(kept)} candidates kept (from {i+1} processed URL pairs)")
            else:
                img.close()  # Too similar to the query image: filtered out
            # Monitoring
            if debug and (i + 1) % 50 == 0:
                print(f" Processed {i+1}/{total_pairs} URL pairs, kept: {len(kept)} (removed too similar)")
    return kept


def _features_too_similar(features, other_features, threshold):
    """Return True when two feature vectors reach the similarity threshold; False if the comparison fails."""
    try:
        return cosine_similarity([features], [other_features])[0][0] >= threshold
    except Exception:
        return False


def _drop_near_duplicate_candidates(candidates, vgg16_threshold):
    """Keep the first candidate of every group of mutually too-similar images; close the others."""
    unique = []  # (image_url, source_page_url, img)
    unique_features = []
    for _, image_url, source_page_url, img, features in candidates:
        if any(_features_too_similar(features, other, vgg16_threshold) for other in unique_features):
            img.close()  # Near-duplicate of an already selected image
            continue
        unique.append((image_url, source_page_url, img))
        unique_features.append(features)
    return unique


def _collect_face_encodings(candidates, debug):
    """Extract one face encoding per candidate image (closing each image) and return (encodings, sources)."""
    encodings = []
    sources = []  # (image_url, source_page_url), index-aligned with encodings
    total_candidates = len(candidates)
    for i, (image_url, source_page_url, img) in enumerate(candidates):
        if not debug:
            show_progress_bar(i + 1, total_candidates, "Stage 2: Face extraction")
        face_encoding = get_face_encoding_from_pil_image(img)
        img.close()
        if face_encoding is not None:
            encodings.append(face_encoding)
            sources.append((image_url, source_page_url))
        # Monitoring
        if debug and (i + 1) % 10 == 0:
            print(f" Face extraction: {i+1}/{total_candidates} processed, faces found: {len(encodings)}")
    return encodings, sources


def _match_faces(face_encodings, sources, query_face_encoding, debug):
    """Compare all candidate faces with the query face and return the sufficiently similar ones."""
    matches = []
    results = face_recognition.compare_faces(face_encodings, query_face_encoding)
    # Distances are needed as well, to rank the matches
    face_distances = face_recognition.face_distance(face_encodings, query_face_encoding)
    for i, (is_match, face_distance) in enumerate(zip(results, face_distances)):
        if not is_match:
            continue
        face_similarity = 1 - face_distance  # Convert distance to similarity
        if face_similarity < _MIN_FACE_SIMILARITY:
            if debug:
                print(f" Filtering out face {i}: {face_similarity*100:.1f}% similarity (not similar)")
            continue
        image_url, source_page_url = sources[i]
        matches.append((face_similarity, image_url, source_page_url))
    if debug:
        matches_found = len(matches)
        total_faces = len(results)
        matches_before_filter = sum(1 for r in results if r)
        filtered_faces = matches_before_filter - matches_found
        print(f"✅ Face comparison completed: {matches_before_filter}/{total_faces} faces match, {filtered_faces} filtered out (<{_MIN_FACE_SIMILARITY*100:.0f}% similarity), {matches_found} kept!")
    return matches


async def process_candidate_urls_two_stage(url_pairs, query_features, query_face_encoding, vgg16_threshold=0.85, top_k=20, debug=False):
    """
    Two-stage approach:
    1. VGG16-Filter: REMOVE images whose general similarity to the query
       reaches ``vgg16_threshold`` (too similar/identical), then drop
       near-duplicates among the remaining candidates using the same threshold.
    2. Face-Recognition: keep the candidates whose first detected face matches
       the query face and sort them by face similarity.

    Args:
        url_pairs: Sequence of (image_url, source_page_url) tuples.
        query_features: VGG16 feature vector of the query image.
        query_face_encoding: Face encoding of the query image.
        vgg16_threshold: Cosine-similarity cut-off used in stage 1.
        top_k: Maximum number of matches returned.
        debug: Print detailed progress instead of progress bars.

    Returns:
        List of (face_similarity, image_url, source_page_url) tuples sorted by
        descending face similarity, at most one entry per source page URL.
        Empty when nothing survives a stage or face comparison fails.
    """
    if debug:
        print("=== TWO-STAGE FILTER ===")
        print(f"Stage 1: VGG16-Filter (REMOVES images with >{vgg16_threshold*100:.0f}% image similarity)")
        print("Stage 2: Face-Recognition (ALL remaining faces sorted by similarity)")
        print(f"Processing {len(url_pairs)} URL pairs...")

    # Nothing to download: skip creating an HTTP session altogether
    if not url_pairs:
        return []

    # STAGE 1: VGG16-Filter (REVERSE - removes too similar images)
    if debug:
        print("\n--- STAGE 1: VGG16-Filter (removes too similar images) ---")
    stage1_candidates = await _stage1_vgg16_filter(url_pairs, query_features, vgg16_threshold, debug)
    if debug:
        print(f"✅ Stage 1 completed: {len(stage1_candidates)} candidates under {vgg16_threshold*100:.0f}% VGG16 similarity kept")
    if not stage1_candidates:
        if debug:
            print("❌ All images were too similar to original (above VGG16 threshold)!")
        return []

    # ADDITIONAL STEP: VGG16 duplicate filtering between candidates.
    # The features computed in stage 1 are reused, so no second model pass is needed.
    if debug:
        print("\n--- ADDITIONAL: VGG16 duplicate filtering between candidates ---")
    else:
        animate_step("Removing duplicates", 0.5)
    unique_candidates = _drop_near_duplicate_candidates(stage1_candidates, vgg16_threshold)
    if debug:
        print(f"✅ Duplicate filtering: {len(stage1_candidates)} → {len(unique_candidates)} unique candidates")

    # STAGE 2: Face-Recognition on ALL filtered candidates
    if debug:
        print("\n--- STAGE 2: Face-Recognition (compare all faces at once) ---")
    all_face_encodings, face_sources = _collect_face_encodings(unique_candidates, debug)
    if not all_face_encodings:
        if debug:
            print("❌ No faces found in any candidate images!")
        return []

    if debug:
        print(f"Comparing {len(all_face_encodings)} faces with query face...")
    try:
        stage2_matches = _match_faces(all_face_encodings, face_sources, query_face_encoding, debug)
    except Exception as e:
        if debug:
            print(f"❌ Error in face comparison: {e}")
        return []

    # Sort by face similarity (descending order)
    stage2_matches.sort(key=lambda match: match[0], reverse=True)

    # Remove duplicates (same source URLs), keeping the best-ranked entry
    seen_urls = set()
    unique_matches = []
    for face_similarity, image_url, source_url in stage2_matches:
        if source_url not in seen_urls:
            seen_urls.add(source_url)
            unique_matches.append((face_similarity, image_url, source_url))
    if debug and len(stage2_matches) != len(unique_matches):
        print(f" 🔄 {len(stage2_matches) - len(unique_matches)} duplicates removed")

    # Limit to top-K
    unique_matches = unique_matches[:top_k]
    if debug:
        print(f"✅ Stage 2 completed: {len(unique_matches)} unique faces found and sorted")
        if unique_matches:
            best_face_sim = unique_matches[0][0] * 100
            worst_face_sim = unique_matches[-1][0] * 100
            print(f" Best face similarity: {best_face_sim:.1f}%")
            print(f" Worst face similarity: {worst_face_sim:.1f}%")
    return unique_matches
| # ---------------------------------------- | |
| # 5. Display of top matches | |
| # ---------------------------------------- | |
def _download_match_image(image_url, debug=False):
    """Download ``image_url`` and decode it as an RGB PIL image; return None on any failure."""
    try:
        resp = requests.get(image_url, timeout=10, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        if resp.status_code != 200:
            if debug:
                print(f" ❌ HTTP {resp.status_code}")
            return None
        img = Image.open(io.BytesIO(resp.content)).convert('RGB')
        if debug:
            print(f" ✅ Successfully loaded ({img.size})")
        return img
    except requests.exceptions.Timeout:
        if debug:
            print(" ❌ Timeout while loading")
    except requests.exceptions.RequestException as e:
        if debug:
            print(f" ❌ Network error: {e}")
    except Exception as e:
        if debug:
            print(f" ❌ Image error: {e}")
    return None


def _short_source_label(source_url):
    """Shorten a source URL to ``domain/.../last-path-segment`` for display."""
    parsed_url = urlparse(source_url)
    short_url = f"{parsed_url.netloc}"
    if parsed_url.path:
        path_parts = parsed_url.path.split('/')
        if len(path_parts) > 1:
            short_url += f"/.../{path_parts[-1]}"
    return short_url


def show_top_matches(top_matches, debug=False):
    """
    Downloads the top match URLs again and shows thumbnails with URLs.

    Args:
        top_matches: Sequence of (similarity, image_url, source_url) tuples.
        debug: Print per-image diagnostics instead of a progress bar.

    Each successfully downloaded image is drawn into a matplotlib grid (up to
    five columns) at the position of its rank, captioned with the similarity
    and a shortened source URL. If no image can be loaded, the figure is
    closed again and the matches are printed as a text list instead.
    """
    if not top_matches:
        print("No matches to display.")
        return
    total_matches = len(top_matches)
    print(f"\n🖼️ Loading {total_matches} images for thumbnail display...")
    cols = min(total_matches, 5)
    rows = (total_matches + cols - 1) // cols
    fig = plt.figure(figsize=(4*cols, 4*rows))
    successful_images = 0
    for i, (similarity, image_url, source_url) in enumerate(top_matches):
        if debug:
            print(f" Loading image {i+1}/{total_matches}: {image_url[:60]}...")
        else:
            show_progress_bar(i + 1, total_matches, "Loading thumbnails")
        img = _download_match_image(image_url, debug)
        if img is None:
            continue
        img.thumbnail((200, 200), Image.LANCZOS)
        ax = plt.subplot(rows, cols, i+1)
        ax.imshow(img)
        ax.axis('off')
        # Display similarity and shortened URL below the image
        ax.text(0.5, -0.15, f"{similarity * 100:.1f}%\n{_short_source_label(source_url)}",
                transform=ax.transAxes, ha='center', va='top',
                fontsize=8, wrap=True)
        successful_images += 1
    print(f"\n✅ {successful_images}/{total_matches} images successfully loaded")
    if successful_images > 0:
        plt.tight_layout()
        plt.show()
        return
    # Nothing was drawn: release the empty figure instead of leaking it
    plt.close(fig)
    print("❌ No images could be loaded!")
    if debug:
        print("Possible causes:")
        print("- Images are no longer available")
        print("- Servers block access")
        print("- Network problems")
    # Show URLs as text fallback when images can't be loaded
    print("\n📋 Showing results as text list since images couldn't be loaded:")
    print("=" * 80)
    for idx, (similarity, image_url, source_url) in enumerate(top_matches, 1):
        similarity_percent = similarity * 100
        print(f"\n🏆 MATCH #{idx}:")
        print(f" Face similarity: {similarity_percent:.1f}%")
        print(f" Source page: {source_url}")
        print(f" Image URL: {image_url}")
        print(f" {'-' * 60}")
    print("=" * 80)
| # ---------------------------------------- | |
| # Gradio Web Interface for Hugging Face Spaces | |
| # ---------------------------------------- | |
def format_gallery_results(top_matches):
    """
    Build the Gradio Gallery payload from the ranked matches.

    Each (face_similarity, image_url, source_url) match becomes an
    ``[image_url, caption]`` pair whose caption carries the 1-based rank, the
    similarity in percent and the source URL. Empty input yields None rather
    than an empty list, to avoid Gallery processing errors.
    """
    if not top_matches:
        return None
    return [
        [image_url, f"#{rank} – {face_similarity * 100:.1f}% | {source_url}"]
        for rank, (face_similarity, image_url, source_url) in enumerate(top_matches, 1)
    ]
async def face_search_gradio(image, max_results=100, top_k=5, vgg16_threshold=0.85, debug=False):
    """
    Main Gradio function: takes a PIL image, runs the search, returns results as gallery and status message.

    Args:
        image: Uploaded PIL image, or None when nothing was uploaded.
        max_results: Maximum number of reverse-search results to process.
        top_k: Maximum number of matches to return.
        vgg16_threshold: VGG16 similarity cut-off passed to the two-stage filter.
        debug: Enable verbose console output in the pipeline.

    Returns:
        Tuple ``(gallery, status)``. ``gallery`` is a list of
        ``[image_url, caption]`` pairs, or None when there is nothing to
        show; ``status`` is a human-readable message.
    """
    if image is None:
        # The button can be pressed without an upload
        return None, "❌ Error: No input image provided."

    # Reserve a temp path only; the handle is closed before the image is
    # written so the file can be reopened by the search libraries
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        input_path = tmp.name
    try:
        # JPEG cannot store alpha/palette modes, so normalise to RGB first
        image.convert("RGB").save(input_path, format="JPEG")

        # Step 1: Extract features
        query_features = get_query_features(input_path, debug=debug)
        if query_features is None:
            return None, "❌ Error: Could not extract VGG16 features from input image."
        query_face_encoding = get_face_encoding_from_image(input_path)
        if query_face_encoding is None:
            return None, "❌ Error: No face detected in input image."

        # Step 2: Reverse Image Search
        url_pairs = await bing_with_tineye_fallback_search(input_path, max_results=max_results, debug=debug)
        if not url_pairs:
            return None, "⚠️ No images found via reverse image search (tried Bing and TinEye)."

        # Step 3: Two-stage matching
        top_matches = await process_candidate_urls_two_stage(
            url_pairs,
            query_features,
            query_face_encoding,
            vgg16_threshold=vgg16_threshold,
            top_k=top_k,
            debug=debug,
        )

        # Step 4: Format results
        gallery = format_gallery_results(top_matches)
        if not gallery:
            return None, "⚠️ No matching faces found above similarity thresholds."
        return gallery, f"✅ Found {len(top_matches)} matching faces!"
    finally:
        # The temp file was created with delete=False, so remove it explicitly
        try:
            os.remove(input_path)
        except OSError:
            pass
def update_status_visibility(status_text):
    """Show the status field with its text when non-blank; otherwise clear and hide it."""
    has_message = bool(status_text and status_text.strip())
    if not has_message:
        return gr.update(value="", visible=False)
    return gr.update(value=status_text, visible=True)
def gradio_sync_wrapper(image):
    """
    Synchronous entry point for the UI: runs ``face_search_gradio`` to
    completion with a fixed set of search parameters.
    """
    # The most important parameters are pinned here instead of being exposed in the UI
    fixed_settings = dict(
        max_results=100,
        top_k=10,
        vgg16_threshold=0.85,
        debug=False,
    )
    return asyncio.run(face_search_gradio(image, **fixed_settings))
def hide_status():
    """Clear the status message and hide it when a new search starts."""
    hidden_and_cleared = gr.update(value="", visible=False)
    return hidden_and_cleared
# Build the UI at import time so `demo` is available to importers
with gr.Blocks() as demo:
    gr.Markdown("""
# Face Search Tool Demo
<div style='margin-bottom: 1em;'>
<b>Upload a photo to find similar faces on the web (Bing/TinEye + Face Recognition).</b><br>
The top 10 results will be shown as a gallery with links. This might take a minute.
</div>
""")
    image_input = gr.Image(type="pil", label="Upload an image", show_label=True)
    submit_btn = gr.Button("🔍 Start Search", elem_id="search-btn")
    gallery = gr.Gallery(label="Top 10 Matches", columns=5, height="auto", show_label=True)
    # Status message below gallery (only shows when there's a message)
    status_msg = gr.Textbox(label="Status", interactive=False, visible=False, show_label=False)
    # Create a chain: first hide status, then run search, then show status if needed
    submit_btn.click(
        hide_status,
        inputs=[],
        outputs=[status_msg]
    ).then(
        gradio_sync_wrapper,
        inputs=[image_input],
        outputs=[gallery, status_msg]
    ).then(
        update_status_visibility,
        inputs=[status_msg],
        outputs=[status_msg]
    )

# Start the server only when executed as a script, so importing this module
# (e.g. for tests or reuse of the pipeline functions) does not launch a server
if __name__ == "__main__":
    demo.launch(share=True, server_name="0.0.0.0")