Spaces:
Build error
Build error
| import os | |
| import json | |
| import torch | |
| import clip | |
| import faiss | |
| import numpy as np | |
| from PIL import Image | |
| import gradio as gr | |
| import openai | |
| import requests | |
| import sqlite3 | |
| from tqdm import tqdm | |
| from io import BytesIO | |
| from datetime import datetime | |
| from pathlib import Path | |
# ─────────────────────────────────────────────
# 🧠 STEP 1: LOAD CLIP MODEL
# ─────────────────────────────────────────────
# Run CLIP on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the ViT-B/32 CLIP checkpoint onto the selected device. `preprocess` is
# applied to every PIL image before it is passed to `model.encode_image`.
model, preprocess = clip.load("ViT-B/32", device=device)
# ─────────────────────────────────────────────
# STEP 2: PATH CONFIGURATION
# ─────────────────────────────────────────────
# Default file locations for a Hugging Face Space deployment.
# The base directory comes from the HF_SPACE_PATH environment variable and
# falls back to the current working directory (".") when it is not set.
HF_SPACE_PATH = os.environ.get("HF_SPACE_PATH", ".")
DEFAULT_JSON_PATH, DEFAULT_DB_PATH = (
    os.path.join(HF_SPACE_PATH, filename)
    for filename in ("profiles.json", "tinder_profiles.db")
)
# ─────────────────────────────────────────────
# STEP 3: DATABASE SETUP
# ─────────────────────────────────────────────
def setup_database(db_path=DEFAULT_DB_PATH):
    """Create the SQLite schema used by the scanner if it does not exist yet.

    Two tables are created: ``profiles`` (one row per profile, keyed by a text
    ``id``) and ``photos`` (one row per photo URL, with an optional embedding
    blob and a reference to the owning profile).

    Args:
        db_path: Path of the SQLite database file to create or open.

    Returns:
        The ``db_path`` that was initialised.

    Raises:
        sqlite3.Error: If the database cannot be opened or the schema cannot
            be created. The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Create tables if they don't exist
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS profiles (
                id TEXT PRIMARY KEY,
                name TEXT,
                age INTEGER,
                bio TEXT,
                added_date TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS photos (
                photo_id INTEGER PRIMARY KEY AUTOINCREMENT,
                profile_id TEXT,
                url TEXT UNIQUE,
                embedding BLOB,
                FOREIGN KEY (profile_id) REFERENCES profiles(id)
            )
        ''')
        conn.commit()
    finally:
        # Always release the connection, even when schema creation fails.
        conn.close()
    print(f"β Database initialized at {db_path}")
    return db_path
# ─────────────────────────────────────────────
# 📦 STEP 4: PROFILE DATA MANAGEMENT
# ─────────────────────────────────────────────
def load_profile_data(json_file_path=None, json_data=None):
    """Load profile data from a JSON file or from already-parsed JSON data.

    Sources are tried in this order:

    1. ``json_file_path``, when it is given and the file exists.
    2. ``json_data``, when it is not ``None``. An explicitly supplied empty
       container is returned as-is rather than being replaced by a fallback.
    3. The default ``profiles.json`` at ``DEFAULT_JSON_PATH``, when it exists.
    4. A single hard-coded sample profile.

    Args:
        json_file_path: Optional path of a JSON file containing the profiles.
        json_data: Optional already-parsed profile data.

    Returns:
        The loaded profile data (the file contents, ``json_data`` itself, or
        the sample list).

    Raises:
        OSError: If a file that exists cannot be read.
        json.JSONDecodeError: If a file does not contain valid JSON.
    """
    if json_file_path and os.path.exists(json_file_path):
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(json_file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    if json_data is not None:
        return json_data

    # Default to profiles.json in the Hugging Face space
    if os.path.exists(DEFAULT_JSON_PATH):
        with open(DEFAULT_JSON_PATH, "r", encoding="utf-8") as f:
            return json.load(f)

    # Sample data structure as fallback
    return [
        {
            "Id": "sample-id",
            "Name": "Sample Profile",
            "Age": 25,
            "Bio": "Sample bio",
            "Photos": [
                "https://example.com/sample.jpg",
            ],
        },
    ]
def store_profiles_in_db(profiles, db_path=DEFAULT_DB_PATH):
    """Insert profiles and their photo URLs into the SQLite database.

    Profiles whose ID is already stored and photo URLs that are already stored
    are skipped, so the function can be re-run on overlapping data. Photos are
    inserted with a NULL embedding.

    Args:
        profiles: Iterable of dicts with the optional keys ``Id``, ``Name``,
            ``Age``, ``Bio`` and ``Photos`` (a list of URLs).
        db_path: Path of the SQLite database file.

    Returns:
        A ``(new_profiles, new_photos)`` tuple with the number of rows added
        to each table.

    Raises:
        sqlite3.Error: On database failures. The connection is closed before
            the error propagates and uncommitted inserts are discarded.
    """
    # Local import: only needed for the fallback-ID digest below.
    import hashlib

    today = datetime.now().strftime("%Y-%m-%d")
    new_profiles = 0
    new_photos = 0

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        for profile in tqdm(profiles, desc="Storing profiles"):
            profile_id = profile.get("Id")
            if profile_id is None:
                # The built-in hash() of a str is salted per process, so it
                # yields a different ID on every run and defeats the
                # "already exists" check. A content digest is stable.
                identity = f"{profile.get('Name', '')}{profile.get('Age', 0)}"
                profile_id = hashlib.sha256(identity.encode("utf-8")).hexdigest()

            name = profile.get("Name", "Unknown")
            age = profile.get("Age", 0)
            bio = profile.get("Bio", "")

            # Check if profile exists
            cursor.execute("SELECT id FROM profiles WHERE id=?", (profile_id,))
            if cursor.fetchone() is None:
                cursor.execute(
                    "INSERT INTO profiles (id, name, age, bio, added_date) VALUES (?, ?, ?, ?, ?)",
                    (profile_id, name, age, bio, today),
                )
                new_profiles += 1

            # Add photos ("Photos" may be missing or explicitly null)
            for photo_url in profile.get("Photos") or []:
                cursor.execute("SELECT photo_id FROM photos WHERE url=?", (photo_url,))
                if cursor.fetchone() is None:
                    cursor.execute(
                        "INSERT INTO photos (profile_id, url, embedding) VALUES (?, ?, NULL)",
                        (profile_id, photo_url),
                    )
                    new_photos += 1
        conn.commit()
    finally:
        conn.close()
    return new_profiles, new_photos
# ─────────────────────────────────────────────
# 🖼️ STEP 5: IMAGE PROCESSING & EMBEDDINGS
# ─────────────────────────────────────────────
def download_and_process_image(url):
    """Fetch the image at ``url`` and return it as an RGB PIL image.

    The request uses a 10-second timeout. Any failure while downloading or
    decoding is reported on stdout and results in ``None`` instead of an
    exception.
    """
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        payload = BytesIO(resp.content)
        return Image.open(payload).convert("RGB")
    except Exception as e:
        print(f"β οΈ Error downloading image from {url}: {e}")
        return None
def generate_and_store_embeddings(db_path=DEFAULT_DB_PATH, max_images=1000):
    """Compute CLIP embeddings for photos that have none and store them.

    Selects up to ``max_images`` photos whose ``embedding`` column is NULL
    (and whose profile row exists), downloads each image, encodes it with the
    module-level CLIP ``model``, L2-normalises the vector and writes it back
    as raw float32 bytes.

    Args:
        db_path: Path of the SQLite database file.
        max_images: Maximum number of photos to process in this call.

    Returns:
        A ``(processed, errors)`` tuple: the number of embeddings stored and
        the number of photos that failed to download or encode.

    Raises:
        sqlite3.Error: If the initial query or the final commit fails. The
            connection is closed before the error propagates.
    """
    processed = 0
    errors = 0

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Get photos without embeddings
        cursor.execute("""
            SELECT p.photo_id, p.url
            FROM photos p
            JOIN profiles pr ON p.profile_id = pr.id
            WHERE p.embedding IS NULL
            LIMIT ?
        """, (max_images,))
        photos = cursor.fetchall()

        print(f"π§ Generating CLIP embeddings for {len(photos)} new images...")
        for photo_id, url in tqdm(photos, desc="Processing images"):
            try:
                img = download_and_process_image(url)
                if img is None:
                    errors += 1
                    continue

                img_input = preprocess(img).unsqueeze(0).to(device)
                with torch.no_grad():
                    features = model.encode_image(img_input)

                # Force float32 before serialising: the loader reads the blob
                # back with dtype=float32, so any other dtype coming out of
                # the model (e.g. half precision) would be misinterpreted.
                emb = features.cpu().numpy().astype(np.float32).flatten()
                norm = np.linalg.norm(emb)
                if not np.isfinite(norm) or norm == 0:
                    raise ValueError("embedding has zero or non-finite norm")
                emb /= norm  # Normalize

                # Store the embedding as a binary blob
                cursor.execute(
                    "UPDATE photos SET embedding = ? WHERE photo_id = ?",
                    (emb.tobytes(), photo_id),
                )
                processed += 1

                # Commit every 10 images to avoid losing work
                if processed % 10 == 0:
                    conn.commit()
            except Exception as e:
                # Best effort: one bad image must not abort the whole batch.
                print(f"β οΈ Error with {url}: {e}")
                errors += 1
        conn.commit()
    finally:
        conn.close()

    print(f"β Finished embedding {processed} images with {errors} errors.")
    return processed, errors
def load_embeddings_from_db(db_path=DEFAULT_DB_PATH):
    """Read every stored embedding with its photo URL and profile fields.

    Returns:
        A ``(embeddings_array, image_urls, profile_info)`` tuple. The array
        stacks one float32 row per photo (or is an empty float32 array when
        nothing is stored); the two lists are aligned with its rows, and each
        ``profile_info`` entry is a dict with the keys ``Id``, ``Name``,
        ``Age`` and ``Bio``.
    """
    connection = sqlite3.connect(db_path)
    cur = connection.cursor()
    cur.execute("""
        SELECT p.embedding, p.url, pr.id, pr.name, pr.age, pr.bio
        FROM photos p
        JOIN profiles pr ON p.profile_id = pr.id
        WHERE p.embedding IS NOT NULL
    """)
    rows = cur.fetchall()
    connection.close()

    # Keep only rows that actually carry embedding bytes.
    usable = [row for row in rows if row[0]]

    # Convert each blob back into a float32 vector.
    vectors = [np.frombuffer(row[0], dtype=np.float32) for row in usable]
    image_urls = [row[1] for row in usable]
    profile_info = [
        {"Id": row[2], "Name": row[3], "Age": row[4], "Bio": row[5]}
        for row in usable
    ]

    if vectors:
        embeddings_array = np.vstack(vectors).astype("float32")
    else:
        embeddings_array = np.array([]).astype("float32")

    print(f"π Loaded {len(embeddings_array)} embeddings from database")
    return embeddings_array, image_urls, profile_info
# ─────────────────────────────────────────────
# ⚡ STEP 6: BUILD FAISS INDEX
# ─────────────────────────────────────────────
def build_faiss_index(embeddings):
    """Create a flat inner-product FAISS index holding ``embeddings``.

    Returns ``None`` when ``embeddings`` is empty; otherwise the index is
    sized from the second dimension of the array and filled with every row.
    """
    if not len(embeddings):
        return None

    flat_index = faiss.IndexFlatIP(embeddings.shape[1])
    flat_index.add(embeddings)
    return flat_index
# ─────────────────────────────────────────────
# STEP 7: OPENAI API SETUP
# ─────────────────────────────────────────────
def init_openai():
    """Configure the OpenAI client from the OPENAI_API_KEY environment variable.

    Prints a warning when the variable is missing or empty.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    openai.api_key = api_key
    if api_key:
        return
    print("β οΈ Warning: OPENAI_API_KEY not found. GPT-4 analysis will not be available.")
# ─────────────────────────────────────────────
# STEP 8: SEARCH FUNCTIONALITY
# ─────────────────────────────────────────────
def search_similar_faces(user_image, index, image_urls, profile_info, top_k=20, min_score=0.80):
    """Find stored photos similar to ``user_image`` using CLIP + FAISS.

    The query image is encoded with the module-level CLIP ``model``,
    L2-normalised and searched against ``index``. Hits scoring below
    ``min_score`` are discarded, and the remaining photos are re-downloaded
    so they can be displayed.

    Args:
        user_image: PIL image to look up.
        index: FAISS index to search, or ``None`` when no data is loaded.
        image_urls: Photo URLs aligned with the rows of ``index``.
        profile_info: Profile dicts aligned with the rows of ``index``.
        top_k: Number of nearest neighbours requested from the index.
        min_score: Minimum similarity score a hit needs to be kept.

    Returns:
        Always a 3-tuple ``(matching_images, match_details, risk_score)``.
        ``match_details`` holds dicts with the keys ``url``, ``score`` and
        ``info``; ``risk_score`` is the mean kept score scaled to 0-100.
        When no index is available or the query image cannot be encoded, the
        reason is printed and ``([], [], 0)`` is returned, so callers can
        unpack the result the same way on every path.
    """
    if index is None:
        print("No index available. Please load profile data first.")
        return [], [], 0

    try:
        user_image = user_image.convert("RGB")
        tensor = preprocess(user_image).unsqueeze(0).to(device)
        with torch.no_grad():
            query_emb = model.encode_image(tensor).cpu().numpy().astype("float32")
        query_emb /= np.linalg.norm(query_emb)
    except Exception as e:
        print(f"Image preprocessing failed: {e}")
        return [], [], 0

    # Search for more matches than we need (we'll filter by score)
    scores, indices = index.search(query_emb, top_k)

    matching_images = []
    match_details = []
    for idx, score in zip(indices.flatten(), scores.flatten()):
        # FAISS pads the result with label -1 when fewer than top_k vectors
        # exist; such a label must never be used as a (negative) list index.
        if idx < 0 or score < min_score:
            continue
        try:
            url = image_urls[idx]
            info = profile_info[idx]
            img = download_and_process_image(url)
            if img is not None:
                matching_images.append(img)
                match_details.append({
                    "url": url,
                    "score": float(score),
                    "info": info,
                })
        except Exception as e:
            print(f"β οΈ Error processing match at index {idx}: {e}")

    # Calculate risk score based on high-quality matches only
    match_scores = [d["score"] for d in match_details]
    risk_score = min(100, int(np.mean(match_scores) * 100)) if match_scores else 0
    return matching_images, match_details, risk_score
# ─────────────────────────────────────────────
# 🧠 STEP 9: GPT-4 ANALYSIS
# ─────────────────────────────────────────────
def generate_gpt4_analysis(match_details):
    """Ask GPT-4 for a short, playful assessment of the given matches.

    Returns a fixed notice when no API key is configured or when
    ``match_details`` is empty. Any error while building the prompt or
    calling the API is returned as an "(OpenAI error)" string instead of
    being raised.
    """
    if not openai.api_key:
        return "GPT-4 analysis not available (API key not configured)"
    if not match_details:
        return "No high-similarity matches found for analysis"

    try:
        name_list = ", ".join(
            f"{match['info']['Name']} ({match['info']['Age']})"
            for match in match_details
        )
        score_list = ", ".join(f"{match['score']:.2f}" for match in match_details)

        prompt = (
            f"The uploaded face matches closely with: {name_list} with similarity scores: {score_list}. "
            "These are very high similarity matches (0.80-1.00 range). "
            "Based on this, should the user be suspicious? "
            "Analyze like a funny but smart AI dating detective. Keep it concise."
        )
        messages = [
            {"role": "system", "content": "You're a playful but intelligent AI face-matching analyst."},
            {"role": "user", "content": prompt},
        ]
        completion = openai.chat.completions.create(model="gpt-4", messages=messages)
        return completion.choices[0].message.content
    except Exception as e:
        return f"(OpenAI error): {e}"
# ─────────────────────────────────────────────
# STEP 10: APPLICATION CLASS
# ─────────────────────────────────────────────
class TinderScanner:
    """Application state: the FAISS index plus the metadata aligned with it.

    Attributes are plain instance fields:
      index        -- FAISS index over all stored embeddings, or None.
      image_urls   -- photo URLs aligned with the index rows.
      profile_info -- profile dicts aligned with the index rows.
      profiles     -- raw profile data from the most recent load_data() call.
      db_path      -- path of the SQLite database backing the scanner.
    """

    def __init__(self):
        """Create the database schema if needed and configure OpenAI."""
        self.index = None
        self.image_urls = []
        self.profile_info = []
        self.profiles = []
        # Setup database
        self.db_path = setup_database()
        # Initialize OpenAI
        init_openai()

    def _refresh_index(self):
        """Reload embeddings from the database and rebuild the index.

        Returns True when at least one embedding was found and indexed. When
        none is found, any previously built index is left untouched.
        """
        embeddings, self.image_urls, self.profile_info = load_embeddings_from_db(self.db_path)
        if len(embeddings) == 0:
            return False
        self.index = build_faiss_index(embeddings)
        return True

    def init_from_database(self):
        """Build the index from embeddings already stored in the database.

        Returns a human-readable status message; errors are reported in the
        message rather than raised.
        """
        try:
            if self._refresh_index():
                return f"β Successfully loaded {len(self.image_urls)} photos from database"
            return "β οΈ No embeddings found in database. Upload profile data first."
        except Exception as e:
            return f"β Error loading from database: {e}"

    def load_data(self, json_text=None, json_file=None):
        """Load profile data, embed new photos and rebuild the index.

        Args:
            json_text: Optional JSON string holding the profile list.
            json_file: Optional path of a JSON file, used when ``json_text``
                is empty. With neither given, the default profiles.json
                location is tried.

        Returns:
            A human-readable status message; errors are reported in the
            message rather than raised.
        """
        try:
            # Load profiles from JSON
            if json_text:
                json_data = json.loads(json_text)
                self.profiles = load_profile_data(json_data=json_data)
            elif json_file:
                self.profiles = load_profile_data(json_file_path=json_file)
            else:
                # Try to load from default location
                self.profiles = load_profile_data(json_file_path=DEFAULT_JSON_PATH)

            if not self.profiles:
                return "β οΈ No profile data found"

            # Store profiles in database
            new_profiles, new_photos = store_profiles_in_db(self.profiles, self.db_path)
            # Generate embeddings for new photos
            processed, errors = generate_and_store_embeddings(self.db_path)

            # Load all embeddings (including newly processed ones)
            if self._refresh_index():
                return (f"β Database updated: {new_profiles} new profiles, {new_photos} new photos, "
                        f"{processed} photos processed. Total: {len(self.image_urls)} photos indexed.")
            return "β οΈ No valid images found in the provided data"
        except Exception as e:
            return f"β Error loading data: {e}"

    def scan_face(self, user_image, min_score=0.80):
        """Find stored photos similar to ``user_image``.

        Args:
            user_image: PIL image to look up, or ``None`` when nothing was
                uploaded.
            min_score: Minimum similarity score a match needs to be reported.

        Returns:
            A 4-tuple ``(images, captions, risk_text, explanation)`` matching
            the four UI outputs: the matched images, one caption line per
            match, the risk score as "N/100", and the GPT-4 analysis or a
            status message.
        """
        # Try to initialize from database if not already. The index itself is
        # checked instead of parsing the status message for a keyword.
        if self.index is None:
            self.init_from_database()
            if self.index is None:
                return [], "", "", "Please load profile data first by providing JSON input"

        if user_image is None:
            return [], "", "", "Please upload a face image"

        # Only the first three fields are used; slicing keeps this working
        # even if the search function appends an error message on failure.
        result = search_similar_faces(
            user_image, self.index, self.image_urls, self.profile_info,
            min_score=min_score,
        )
        images, match_details, risk_score = result[:3]

        if not match_details:
            # Report the threshold actually applied, not a fixed 0.80.
            return [], "", "0/100", f"No matches with similarity score ≥ {min_score:.2f} found"

        # Format match captions
        captions = [
            f"{detail['info']['Name']} ({detail['info']['Age']}) - Score: {detail['score']:.2f}"
            for detail in match_details
        ]

        # Generate GPT-4 analysis
        explanation = generate_gpt4_analysis(match_details)
        return images, "\n".join(captions), f"{risk_score}/100", explanation
# ─────────────────────────────────────────────
# 🖥️ STEP 11: GRADIO UI
# ─────────────────────────────────────────────
def create_ui():
    """Build the two-tab Gradio interface and return the Blocks app.

    The "Setup Data" tab loads profiles (from the default profiles.json or
    from pasted JSON) into a shared ``TinderScanner``; the "Scan Face" tab
    runs an uploaded image against it with a 0.80 minimum score.
    """
    scanner = TinderScanner()

    def load_default_profiles():
        # No arguments: the scanner falls back to the default JSON location.
        return scanner.load_data()

    def run_scan(img):
        return scanner.scan_face(img, min_score=0.80)

    with gr.Blocks(title="Tinder Scanner Pro") as demo:
        gr.Markdown("# π Tinder Scanner Pro β High-Similarity Face Matcher")
        gr.Markdown("Scan a face image to find high-similarity matches (0.80-1.00) in Tinder profiles.")

        with gr.Tabs():
            with gr.TabItem("Setup Data"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Load from profiles.json (auto)")
                        auto_button = gr.Button("Load from profiles.json", variant="primary")
                        gr.Markdown("### OR: Paste JSON Data")
                        pasted_json = gr.Textbox(
                            label="JSON Profile Data",
                            placeholder='Paste JSON data here. Format: [{"Id": "...", "Name": "...", "Age": 25, "Photos": ["url1", "url2"]}]',
                            lines=10,
                        )
                        manual_button = gr.Button("Load Pasted Data", variant="secondary")
                        status_box = gr.Textbox(label="Status")

                auto_button.click(fn=load_default_profiles, outputs=[status_box])
                manual_button.click(
                    fn=scanner.load_data,
                    inputs=[pasted_json],
                    outputs=[status_box],
                )

            with gr.TabItem("Scan Face"):
                with gr.Row():
                    with gr.Column():
                        face_input = gr.Image(type="pil", label="Upload a Face Image")
                        scan_button = gr.Button("Run the Scan", variant="primary")
                    with gr.Column():
                        gallery = gr.Gallery(label="π High-Similarity Matches", columns=[3], height="auto")
                        details_box = gr.Textbox(label="Match Details")
                        score_box = gr.Textbox(label="π¨ Similarity Score")
                        analysis_box = gr.Textbox(label="π§ GPT-4 Analysis")

                scan_button.click(
                    fn=run_scan,
                    inputs=[face_input],
                    outputs=[gallery, details_box, score_box, analysis_box],
                )

    return demo
# ─────────────────────────────────────────────
# STEP 12: MAIN EXECUTION
# ─────────────────────────────────────────────
# Build and launch the Gradio app only when this file is run as a script,
# so importing the module does not start a server.
if __name__ == "__main__":
    demo = create_ui()
    demo.launch()