# feat: Add endpoint to delete a collection and its associated biometric
# data from Firestore and SQLite. (commit 0ba9d44)
import functools
import gc
import io
import json
import logging
import os
import pickle
import sqlite3
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import firebase_admin
import numpy as np
import requests
import uvicorn
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException, Form, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from firebase_admin import credentials, storage, firestore
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from PIL import Image
from pillow_heif import register_heif_opener
from pydantic import BaseModel

# Register HEIC/HEIF support with Pillow before any image is opened.
register_heif_opener()
# Setup Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # module-wide logger used by every handler below
# --- Retry Decorator for Network Operations ---
def retry_on_failure(max_retries=3, delay=1, backoff=2):
    """Decorator factory: retry the wrapped call with exponential backoff.

    Args:
        max_retries: total attempts; the last failure's exception is re-raised.
        delay: initial sleep in seconds between attempts.
        backoff: multiplier applied to the delay after each failed attempt.
    """
    def decorator(func):
        @functools.wraps(func)  # preserve __name__/__doc__ (the logs below rely on __name__)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries >= max_retries:
                        logger.error(f"{func.__name__} failed after {max_retries} retries: {e}")
                        raise
                    logger.warning(f"{func.__name__} failed (attempt {retries}/{max_retries}), retrying in {current_delay}s: {e}")
                    time.sleep(current_delay)
                    current_delay *= backoff
            return None  # unreachable when max_retries >= 1; kept for safety
        return wrapper
    return decorator
@retry_on_failure(max_retries=3, delay=1, backoff=2)
def fetch_url_with_retry(url, timeout=15):
    """Download a URL's body, retrying transient failures via the decorator."""
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()  # treat HTTP 4xx/5xx as failures worth retrying
    return resp.content
# --- Configuration ---
DB_PATH = "facematch.db"  # local SQLite cache of face embeddings
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'webp', 'heic'}  # HEIC handled via pillow_heif
GOOGLE_DRIVE_SCOPES = ['https://www.googleapis.com/auth/drive.readonly']  # read-only Drive access
FIREBASE_BUCKET_NAME = os.getenv("FIREBASE_STORAGE_BUCKET", "face-matching-773sec.firebasestorage.app")
# --- Database Connection Pool ---
# One shared SQLite connection guarded by a lock; created lazily in get_db().
_db_lock = threading.Lock()
_db_conn = None
def get_db():
    """Return the shared SQLite connection, creating it on first use.

    The singleton connection is opened with check_same_thread=False and WAL
    journaling so it can be shared across request threads; callers serialize
    cursor use through _db_lock.
    """
    global _db_conn
    with _db_lock:
        if _db_conn is None:
            conn = sqlite3.connect(DB_PATH, check_same_thread=False)
            conn.execute("PRAGMA journal_mode=WAL")
            _db_conn = conn
            logger.info("Database connection pool initialized")
    return _db_conn
# Initialize Firebase Admin exactly once (guard against double import under reloaders).
if not firebase_admin._apps:
    try:
        # Try to load from ENV first (for Hugging Face / Cloud)
        firebase_key_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_KEY")
        if firebase_key_json:
            # The env var holds the full service-account JSON as a string
            cred_dict = json.loads(firebase_key_json)
            cred = credentials.Certificate(cred_dict)
            logger.info("Loaded Firebase credentials from environment variable")
        else:
            # Fallback to local file
            cred = credentials.Certificate("serviceAccountKey.json")
            logger.info("Loaded Firebase credentials from local file")
        firebase_admin.initialize_app(cred, {
            'storageBucket': FIREBASE_BUCKET_NAME
        })
        logger.info(f"Firebase Admin initialized with bucket: {FIREBASE_BUCKET_NAME}")
    except Exception as e:
        # NOTE(review): failures are only logged here, but firestore.client()
        # below will then raise at import time — confirm fail-fast is intended.
        logger.error(f"Failed to initialize Firebase Admin: {e}")
# Module-wide Firestore client used by all handlers.
db_firestore = firestore.client()
# Per-user progress tracking for background indexing jobs.
user_indexing_status = {}

def get_user_status(uid: str):
    """Return (creating if absent) the mutable indexing-progress dict for a user."""
    fresh = {
        "is_indexing": False,
        "total": 0,
        "current": 0,
        "folder_id": None,
    }
    return user_indexing_status.setdefault(uid, fresh)
# Initialize App
app = FastAPI(title="FaceMatch Backend")
# Permissive CORS so the separately-hosted frontend can call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for now (Vercel app)
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
def read_root():
return {"message": "FaceMatch Backend is running. GO ONLY TO /docs for API documentation."}
# --- Database ---
def init_db():
    """Create/upgrade the local SQLite schema and performance indexes.

    Tables are created with CREATE TABLE IF NOT EXISTS; column additions are
    applied as ALTER TABLE "migrations" whose sqlite3.OperationalError is
    swallowed when the column already exists (or, for `images`, when the
    table has not been created yet — the CREATE below then includes the
    columns directly).
    """
    conn = get_db()
    c = conn.cursor()
    # Track indexed folders
    c.execute('''
        CREATE TABLE IF NOT EXISTS folders (
            id TEXT PRIMARY KEY,
            name TEXT,
            last_indexed TEXT,
            file_count INTEGER DEFAULT 0
        )
    ''')
    c.execute('''
        CREATE TABLE IF NOT EXISTS coupons (
            code TEXT PRIMARY KEY,
            tier TEXT,
            discount_percent INTEGER DEFAULT 0,
            usage_limit INTEGER DEFAULT 1,
            used_count INTEGER DEFAULT 0,
            expiry_date TEXT
        )
    ''')
    c.execute('''
        CREATE TABLE IF NOT EXISTS users (
            uid TEXT PRIMARY KEY,
            tier TEXT DEFAULT 'free',
            matches_count INTEGER DEFAULT 0,
            max_matches INTEGER DEFAULT 100,
            subscription_period_end TEXT,
            last_reset TEXT
        )
    ''')
    # migrations — each ALTER is a no-op (caught OperationalError) when the
    # column exists or the target table is missing.
    try:
        c.execute("ALTER TABLE images ADD COLUMN firebase_url TEXT")
        logger.info("Migrated images: added firebase_url")
    except sqlite3.OperationalError: pass
    try:
        c.execute("ALTER TABLE folders ADD COLUMN source_url TEXT")
        logger.info("Migrated folders: added source_url")
    except sqlite3.OperationalError: pass
    try:
        c.execute("ALTER TABLE images ADD COLUMN folder_id TEXT")
        logger.info("Migrated images: added folder_id")
    except sqlite3.OperationalError: pass
    try:
        c.execute("ALTER TABLE folders ADD COLUMN owner_id TEXT")
        logger.info("Migrated folders: added owner_id")
    except sqlite3.OperationalError: pass
    try:
        c.execute("ALTER TABLE folders ADD COLUMN shared_with TEXT")
    except sqlite3.OperationalError: pass
    try:
        c.execute("ALTER TABLE folders ADD COLUMN is_public INTEGER DEFAULT 0")
    except sqlite3.OperationalError: pass
    # One row per detected face: composite key (file id, face index).
    c.execute('''
        CREATE TABLE IF NOT EXISTS images (
            id TEXT,
            face_index INTEGER,
            name TEXT,
            drive_link TEXT,
            thumbnail_link TEXT,
            embedding BLOB,
            firebase_url TEXT,
            folder_id TEXT,
            PRIMARY KEY(id, face_index)
        )
    ''')
    # Create indexes for performance optimization
    c.execute('CREATE INDEX IF NOT EXISTS idx_images_folder_id ON images(folder_id)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_images_id ON images(id)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_folders_owner_id ON folders(owner_id)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_folders_is_public ON folders(is_public)')
    logger.info("Database indexes created successfully.")
    conn.commit()
    logger.info("Database initialized successfully.")
# Run schema setup at import time so all handlers can assume the tables exist.
init_db()
# --- Startup Sync: Rehydrate SQLite from Firestore ---
def startup_sync():
    """Auto-rehydrate SQLite from Firestore on server startup if database is empty.

    SQLite acts as the local cache; Firestore holds the durable copy. When
    the local `images` table is empty, every collection's 'biometrics'
    subcollection is copied back into SQLite along with folder metadata.
    """
    try:
        conn = get_db()
        with _db_lock:
            c = conn.cursor()
            c.execute("SELECT COUNT(*) FROM images")
            count = c.fetchone()[0]
        if count > 0:
            logger.info(f"Database already has {count} images. Skipping startup sync.")
            return
        logger.info("Database is empty. Starting rehydration from Firestore...")
        # Fetch all collections from Firestore
        collections = db_firestore.collection('collections').stream()
        total_rehydrated = 0
        for collection_doc in collections:
            folder_id = collection_doc.id
            folder_data = collection_doc.to_dict()
            logger.info(f"Rehydrating collection: {folder_data.get('name', folder_id)}")
            # Fetch biometrics for this collection
            biometrics_docs = db_firestore.collection('collections').document(folder_id).collection('biometrics').stream()
            batch_rows = []
            for bio_doc in biometrics_docs:
                bio_data = bio_doc.to_dict()
                for face in bio_data.get('faces', []):
                    # Embeddings are stored in Firestore as plain float lists;
                    # convert back to float64 arrays and pickle for SQLite.
                    emb = np.array(face['embedding'], dtype=np.float64)
                    batch_rows.append((
                        bio_doc.id,
                        face['face_index'],
                        bio_data.get('name'),
                        bio_data.get('drive_link'),
                        bio_data.get('thumbnail_link'),
                        pickle.dumps(emb),
                        folder_id
                    ))
            if batch_rows:
                with _db_lock:
                    c = conn.cursor()
                    c.executemany(
                        "INSERT OR REPLACE INTO images (id, face_index, name, drive_link, thumbnail_link, embedding, folder_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
                        batch_rows
                    )
                    conn.commit()
                total_rehydrated += len(batch_rows)
                logger.info(f"Rehydrated {len(batch_rows)} faces from {folder_data.get('name', folder_id)}")
            # Also restore folder metadata
            with _db_lock:
                c = conn.cursor()
                c.execute(
                    "INSERT OR REPLACE INTO folders (id, name, last_indexed, file_count, source_url, owner_id, is_public) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        folder_id,
                        folder_data.get('name'),
                        folder_data.get('last_indexed'),
                        folder_data.get('file_count', 0),
                        folder_data.get('source_url'),
                        folder_data.get('owner_id'),
                        1 if folder_data.get('is_public', False) else 0
                    )
                )
                conn.commit()
        if total_rehydrated > 0:
            logger.info(f"✓ Startup sync complete. Rehydrated {total_rehydrated} total faces from Firestore.")
        else:
            logger.info("No data found in Firestore to rehydrate.")
    except Exception as e:
        # Non-fatal: the server still boots, just with an empty local cache.
        logger.error(f"Startup sync failed: {e}")
# Run startup sync
startup_sync()
# --- Firestore Migration: Users, Coupons, Folders ---
def get_or_create_user(uid: str):
    """Fetch the user's usage document, creating a default 'free' record if absent."""
    user_ref = db_firestore.collection('users').document(uid)
    snapshot = user_ref.get()
    if snapshot.exists:
        return snapshot.to_dict()
    # First contact: seed the free-tier defaults.
    defaults = {
        "uid": uid,
        "tier": "free",
        "matches_count": 0,
        "max_matches": 100,
        "subscription_period_end": None,
        "last_reset": time.strftime("%Y-%m-%d %H:%M:%S"),
    }
    user_ref.set(defaults)
    return defaults
def increment_usage(uid: str, amount: int = 1):
    """Atomically bump the user's match counter in Firestore."""
    db_firestore.collection('users').document(uid).update(
        {"matches_count": firestore.Increment(amount)}
    )
@app.get("/user-usage/{uid}")
def get_user_usage(uid: str):
return get_or_create_user(uid)
@app.post("/upgrade-plan")
def upgrade_plan(uid: str = Form(...), tier: str = Form(...)):
limits = {"free": 100, "basic": 500, "pro": 2500, "enterprise": 999999999}
if tier not in limits:
raise HTTPException(status_code=400, detail="Invalid tier")
user_ref = db_firestore.collection('users').document(uid)
user_ref.update({
"tier": tier,
"max_matches": limits[tier],
"matches_count": 0
})
return {"message": f"Successfully upgraded to {tier} plan"}
@app.post("/coupons")
def create_coupon(code: str = Form(...), tier: str = Form(...), usage_limit: int = Form(1), discount: int = Form(0)):
coupon_ref = db_firestore.collection('coupons').document(code)
if coupon_ref.get().exists:
raise HTTPException(status_code=400, detail="Coupon code already exists")
coupon_ref.set({
"code": code,
"tier": tier,
"usage_limit": usage_limit,
"used_count": 0,
"discount_percent": discount
})
return {"message": "Coupon created successfully"}
@app.get("/coupons")
def list_coupons():
docs = db_firestore.collection('coupons').stream()
return [doc.to_dict() for doc in docs]
@app.post("/apply-coupon")
def apply_coupon(uid: str = Form(...), code: str = Form(...)):
coupon_ref = db_firestore.collection('coupons').document(code)
doc = coupon_ref.get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Coupon not found")
coupon = doc.to_dict()
if coupon['used_count'] >= coupon['usage_limit']:
raise HTTPException(status_code=403, detail="Coupon usage limit reached")
limits = {"free": 100, "basic": 500, "pro": 2500, "enterprise": 999999999}
tier = coupon['tier']
# Atomic transaction
batch = db_firestore.batch()
user_ref = db_firestore.collection('users').document(uid)
batch.update(user_ref, {"tier": tier, "max_matches": limits[tier], "matches_count": 0})
batch.update(coupon_ref, {"used_count": firestore.Increment(1)})
batch.commit()
return {"message": f"Coupon applied! You are now on the {tier} plan.", "tier": tier}
# --- Face Recognition (Lazy Load) ---
# Imported lazily so the server can boot even if dlib/cmake are broken.
face_recognition = None
def get_face_recognition():
    """Return the face_recognition module, importing it on first use."""
    global face_recognition
    if face_recognition is not None:
        return face_recognition
    try:
        import face_recognition as fr
    except ImportError:
        logger.error("Biometric Engine failed to initialize")
        raise HTTPException(status_code=500, detail="Face recognition library not available")
    face_recognition = fr
    logger.info("Biometric Engine initialized successfully")
    return face_recognition
# --- Google Drive Helpers ---
def get_drive_service():
    """Build an authenticated Google Drive v3 client.

    Credential sources, in order: the GOOGLE_USER_CREDENTIALS env var (a JSON
    string) or a local user_credentials.json file. Expired tokens are
    refreshed when a refresh token is present.

    Raises:
        Exception: when no credentials are found or the token refresh fails.
    """
    creds_json = os.getenv('GOOGLE_USER_CREDENTIALS')
    creds_data = None
    if creds_json:
        # os.getenv always returns a string, so parse it directly
        # (the old io.StringIO/isinstance dance was dead code).
        creds_data = json.loads(creds_json)
        logger.info("Loaded Google Drive credentials from GOOGLE_USER_CREDENTIALS environment variable")
    elif os.path.exists('user_credentials.json'):
        with open('user_credentials.json', 'r') as f:
            creds_data = json.load(f)
        logger.info("Loaded Google Drive credentials from local user_credentials.json")
    if not creds_data:
        logger.error("No Google Drive credentials found! Please set GOOGLE_USER_CREDENTIALS or provide user_credentials.json")
        raise Exception("No Google Credentials found (ENV or user_credentials.json)")
    creds = Credentials.from_authorized_user_info(creds_data, GOOGLE_DRIVE_SCOPES)
    if creds.expired and creds.refresh_token:
        try:
            creds.refresh(GoogleRequest())
        except Exception as e:
            logger.error(f"Failed to refresh Google Token: {e}. The token might be expired or revoked.")
            raise Exception("Google Drive Token Expired. Please run 'python3 generate_token.py' to re-authenticate.")
    return build('drive', 'v3', credentials=creds, cache_discovery=False)
def index_drive_folder(gallery_id: str, owner_id: str):
    """Background task: recursively scan a Drive folder and index all faces.

    Pipeline: resolve the source Drive folder id from the gallery id, walk
    the folder tree, skip already-indexed file ids, then download and encode
    the remainder on a small thread pool, persisting embeddings to SQLite
    (local search) and Firestore (durable copy) while updating the owner's
    progress counters.

    Args:
        gallery_id: unique id in the form "<drive_folder_id>_<uid>_<ms>".
        owner_id: uid of the triggering user (used for progress tracking).
    """
    logger.info(f"Starting optimized index for gallery: {gallery_id} (Owner: {owner_id})")
    status = get_user_status(owner_id)
    try:
        service = get_drive_service()
        # Extract original Drive folder ID from unique gallery ID (format: drive_id_uid_timestamp)
        # Use rsplit to handle folder IDs that might contain underscores
        parts = gallery_id.rsplit('_', 2)  # Split from right, max 2 times
        drive_folder_id = parts[0] if len(parts) == 3 else gallery_id
        logger.info(f"Extracted Drive folder ID: {drive_folder_id} from gallery ID: {gallery_id}")
        # 1. Get folder metadata from Google Drive
        folder_meta = service.files().get(fileId=drive_folder_id, fields="name").execute()
        folder_name = folder_meta.get('name', 'Unknown Folder')
        # Check if gallery already has a custom name in Firestore
        try:
            existing_doc = db_firestore.collection('collections').document(gallery_id).get()
            if existing_doc.exists:
                existing_name = existing_doc.to_dict().get('name')
                if existing_name and existing_name not in ["New Gallery", "Pending..."]:
                    folder_name = existing_name  # Preserve custom name
        except Exception as e:
            logger.warning(f"Could not check existing gallery name: {e}")
        status.update({
            "is_indexing": True,
            "total": 0,
            "current": 0,
            "folder_id": gallery_id
        })
        # 2. Recursive file discovery
        all_items = []
        def find_files(f_id):
            # Depth-first walk: collect image files, recurse into subfolders.
            query = f"'{f_id}' in parents and trashed=false"
            results = service.files().list(q=query, fields="files(id, name, mimeType, webContentLink, thumbnailLink)").execute()
            for f in results.get('files', []):
                if 'image/' in f['mimeType'] or f['mimeType'] == 'application/octet-stream':  # Octet-stream for some HEIC
                    all_items.append(f)
                elif f['mimeType'] == 'application/vnd.google-apps.folder':
                    find_files(f['id'])
        find_files(drive_folder_id)
        status["total"] = len(all_items)
        # 3. Bulk skip check (Get existing IDs)
        # Dedicated connection: this task runs in a background thread,
        # separate from the shared get_db() connection.
        conn = sqlite3.connect(DB_PATH)
        c = conn.cursor()
        c.execute("PRAGMA journal_mode=WAL")
        c.execute("SELECT DISTINCT id FROM images")
        existing_ids = {row[0] for row in c.fetchall()}
        # NOTE(review): the skip set spans ALL galleries, so a file already
        # indexed under another gallery is not re-added to this new one —
        # confirm this cross-gallery dedup is intended.
        to_process = [item for item in all_items if item['id'] not in existing_ids]
        skipped = len(all_items) - len(to_process)
        status["current"] = skipped
        logger.info(f"Discovery complete: {len(all_items)} total nodes, {skipped} already cached, {len(to_process)} to process.")
        if skipped > 0:
            logger.info(f"Skipping {skipped} previously indexed nodes to save time.")
        def process_single_file(item):
            # Worker: download one file, detect faces, return SQLite rows (or []).
            file_id = item['id']
            try:
                # Thread-safe service and recognition instances
                logger.debug(f"Worker active for node: {item['name']}")
                local_service = get_drive_service()
                local_fr = get_face_recognition()
                request = local_service.files().get_media(fileId=file_id)
                fh = io.BytesIO()
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    _, done = downloader.next_chunk()
                fh.seek(0)
                pil_img = Image.open(fh).convert("RGB")
                # Biometric optimization: Downscale to 1000px max
                if max(pil_img.size) > 1000:
                    pil_img.thumbnail((1000, 1000), Image.LANCZOS)
                image_np = np.array(pil_img)
                encodings = local_fr.face_encodings(image_np)
                logger.info(f"Biometric signal isolated: Detected {len(encodings)} subjects in {item['name']}")
                # Safety check: If a normal image has 100+ faces, it's likely noise/false positives
                if len(encodings) > 100:
                    logger.warning(f"Abnormal face count ({len(encodings)}) in {item['name']}. Likely false positives. Skipping.")
                    return []
                # Cleanup memory immediately
                del pil_img
                del image_np
                gc.collect()
                results = []
                for idx, encoding in enumerate(encodings):
                    results.append((
                        file_id,
                        idx,
                        item['name'],
                        item.get('webContentLink'),
                        item.get('thumbnailLink'),
                        pickle.dumps(encoding)
                    ))
                return results
            except Exception as e:
                logger.error(f"Failed processing {file_id}: {e}")
                return []
        # 4. Parallel Execution
        from concurrent.futures import as_completed
        newly_indexed_files = 0
        total_faces_found = 0
        # Firestore batch setup for persistence
        firestore_batch = db_firestore.batch()
        batch_count = 0
        # Reduced max_workers to 2 to avoid CPU starvation on HF Spaces
        with ThreadPoolExecutor(max_workers=2) as executor:
            # Distribute tasks to workers
            future_to_item = {executor.submit(process_single_file, item): item for item in to_process}
            for future in as_completed(future_to_item):
                item = future_to_item[future]
                try:
                    file_results = future.result()
                    if file_results:
                        # SQLite insert happens on this (single) consumer
                        # thread, so the shared cursor is not contended.
                        c.executemany(
                            "INSERT INTO images (id, face_index, name, drive_link, thumbnail_link, embedding, folder_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            [(*r, gallery_id) for r in file_results]
                        )
                        # Firestore Persistence (Sub-collection 'biometrics')
                        # Group by file_id to optimize Firestore writes
                        face_data = []
                        for r in file_results:
                            # r structure: (id, face_index, name, drive_link, thumbnail_link, pickle_blob)
                            encoding = pickle.loads(r[5])
                            face_data.append({
                                "face_index": r[1],
                                "embedding": encoding.tolist()
                            })
                        file_biometrics_ref = db_firestore.collection('collections').document(gallery_id).collection('biometrics').document(item['id'])
                        firestore_batch.set(file_biometrics_ref, {
                            "name": item['name'],
                            "drive_link": item.get('webContentLink'),
                            "thumbnail_link": item.get('thumbnailLink'),
                            "faces": face_data
                        })
                        batch_count += 1
                        # Commit Firestore batch every 100 files
                        if batch_count >= 100:
                            firestore_batch.commit()
                            firestore_batch = db_firestore.batch()
                            batch_count = 0
                        newly_indexed_files += 1
                        total_faces_found += len(file_results)
                        logger.info(f"Biometric indexing successful: {item['name']} ({len(file_results)} faces)")
                    else:
                        logger.debug(f"No biometric data found: {item['name']}")
                except Exception as e:
                    logger.error(f"Worker failure for {item['id']}: {e}")
                # Update progress immediately
                status["current"] += 1
                # Commit every 5 completions to keep progress bar fluid
                if status["current"] % 5 == 0:
                    conn.commit()
        # Final commits
        conn.commit()
        if batch_count > 0:
            firestore_batch.commit()
        # 5. Update collection metadata in Firestore for persistence
        folder_ref = db_firestore.collection('collections').document(gallery_id)
        folder_ref.set({
            "id": gallery_id,
            "name": folder_name,
            "last_indexed": time.strftime("%Y-%m-%d %H:%M:%S"),
            "file_count": len(all_items),
            "source_url": f"https://drive.google.com/drive/folders/{drive_folder_id}",
            "source_folder_id": drive_folder_id,
            "owner_id": owner_id
        }, merge=True)
        # 6. Local cache for Search Join performance
        try:
            conn_local = get_db()
            with _db_lock:
                c_local = conn_local.cursor()
                c_local.execute("INSERT OR REPLACE INTO folders (id, name, last_indexed, file_count, source_url, owner_id) VALUES (?, ?, ?, ?, ?, ?)",
                                (gallery_id, folder_name, time.strftime("%Y-%m-%d %H:%M:%S"), len(all_items), f"https://drive.google.com/drive/folders/{drive_folder_id}", owner_id))
                conn_local.commit()
        except Exception as e:
            logger.error(f"Failed to update local folders cache: {e}")
        conn.close()
        logger.info(f"Indexing complete. Processed {newly_indexed_files} images, found {total_faces_found} total faces.")
    except Exception as e:
        logger.error(f"Global indexing failure: {e}")
    finally:
        # Always clear the user's "indexing" flag, even on failure.
        status["is_indexing"] = False
def index_public_url(url: str, owner_id: str, gallery_id: str = None):
    """Background task to index a single public image URL.

    Downloads the image, extracts face embeddings, and records them in both
    SQLite and the Firestore collection document. Skips URLs that were
    already indexed (matched on drive_link).

    Fixes over the previous version: the SQLite connection is now always
    closed (it leaked on the early returns and on errors), ``logger.warn``
    (deprecated alias) became ``logger.warning``, and the bare ``except:``
    around the folders-cache write is narrowed to sqlite3.Error.
    """
    logger.info(f"Indexing public image node: {url} for {owner_id}")
    status = get_user_status(owner_id)
    folder_id = gallery_id if gallery_id else f"public_{owner_id}"  # Use provided gallery_id or fallback
    conn = None
    try:
        status.update({
            "is_indexing": True,
            "total": 1,
            "current": 0,
            "folder_id": folder_id
        })
        # Check if already indexed
        conn = sqlite3.connect(DB_PATH)
        c = conn.cursor()
        c.execute("SELECT id FROM images WHERE drive_link=?", (url,))
        if c.fetchone():
            logger.info("URL already indexed as a public image node")
            return
        # Fetch image with retry logic
        contents = fetch_url_with_retry(url, timeout=15)
        fr = get_face_recognition()
        image = fr.load_image_file(io.BytesIO(contents))
        encodings = fr.face_encodings(image)
        if not encodings:
            logger.warning("No faces found in public image node")
            return
        file_name = url.split("/")[-1] or "public_image.jpg"
        for idx, encoding in enumerate(encodings):
            c.execute(
                "INSERT INTO images (id, face_index, name, drive_link, thumbnail_link, embedding, folder_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (f"url_{int(time.time())}_{idx}", idx, file_name, url, url, pickle.dumps(encoding), folder_id)
            )
        # Add to Firestore collections
        folder_ref = db_firestore.collection('collections').document(folder_id)
        folder_ref.set({
            "id": folder_id,
            "name": "Public Web Nodes",
            "last_indexed": time.strftime("%Y-%m-%d %H:%M:%S"),
            "file_count": 1,
            "source_url": url,
            "owner_id": owner_id
        }, merge=True)
        # Local cache (best effort)
        try:
            c.execute("INSERT OR REPLACE INTO folders (id, name, last_indexed, file_count, source_url, owner_id) VALUES (?, ?, ?, ?, ?, ?)",
                      (folder_id, "Public Web Nodes", time.strftime("%Y-%m-%d %H:%M:%S"), 1, url, owner_id))
        except sqlite3.Error as e:
            logger.warning(f"Failed to update local folders cache: {e}")
        conn.commit()
        logger.info(f"Registered {len(encodings)} biometric markers from public node")
    except Exception as e:
        logger.error(f"Public node indexing failed: {e}")
    finally:
        # Close the connection on every path (previously leaked on early return).
        if conn is not None:
            conn.close()
        status["current"] = 1
        status["is_indexing"] = False
# --- Endpoints ---
@app.get("/status")
def get_status(uid: str, background_tasks: BackgroundTasks):
    """Return overall index size plus the caller's indexing progress.

    NOTE(review): uid arrives as a plain query param; an auth header/token
    would be safer.
    """
    conn = get_db()
    with _db_lock:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(DISTINCT id) FROM images")
        indexed_count = cursor.fetchone()[0]
    progress = get_user_status(uid)
    return {
        "status": "online",
        "total_indexed": indexed_count,
        "is_indexing": progress["is_indexing"],
        "current": progress["current"],
        "total": progress["total"],
        "folder_id": progress["folder_id"],
    }
@app.get("/folders")
def get_folders(uid: Optional[str] = None):
"""Returns a list of indexed folders accessible to the user (owned, shared, or public)"""
if not uid:
return {"collections": []}
collections_ref = db_firestore.collection('collections')
# We want (owner_id == uid) OR (shared_with contains uid) OR (is_public == True)
# Since Firestore OR queries are complex, we'll fetch them separately or filter.
# Given the scale, fetching owned, shared, and public separately is fine.
folders = {}
# 1. Owned folders
owned_docs = collections_ref.where("owner_id", "==", uid).stream()
for doc in owned_docs:
folders[doc.id] = doc.to_dict()
# 2. Shared folders
try:
shared_docs = collections_ref.where("shared_with", "array_contains", uid).stream()
for doc in shared_docs:
folders[doc.id] = doc.to_dict()
except Exception as e:
logger.warning(f"Shared folders query failed: {e}")
# 3. Public folders
try:
public_docs = collections_ref.where("is_public", "==", True).stream()
for doc in public_docs:
folders[doc.id] = doc.to_dict()
except Exception as e:
logger.warning(f"Public folders query failed: {e}")
# Convert dict to list and sort
folder_list = list(folders.values())
# Ensure ID is present in each object for the frontend
for item, fid in zip(folder_list, folders.keys()):
if 'id' not in item:
item['id'] = fid
folder_list.sort(key=lambda x: x.get('last_indexed', ''), reverse=True)
return {"collections": folder_list}
@app.post("/collections/toggle-public")
def toggle_collection_public(collection_id: str = Form(...), uid: str = Form(...)):
"""Toggles the public status of a collection (Owner only)"""
# 1. Update Firestore
doc_ref = db_firestore.collection("collections").document(collection_id)
doc = doc_ref.get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Collection not found")
data = doc.to_dict()
if data.get("owner_id") != uid:
raise HTTPException(status_code=403, detail="Only the owner can change visibility")
new_public = not data.get("is_public", False)
doc_ref.update({"is_public": new_public})
# 2. Update SQLite
try:
conn = get_db()
with _db_lock:
c = conn.cursor()
c.execute("UPDATE folders SET is_public = ? WHERE id = ?", (1 if new_public else 0, collection_id))
conn.commit()
except Exception as e:
logger.error(f"Failed to sync visibility to SQLite: {e}")
return {"message": "Visibility updated", "is_public": new_public}
@app.post("/collections/join")
def join_collection(uid: str = Form(...), collection_id: str = Form(...)):
"""Allows a user to add an existing collection to their list (Share/Import)"""
col_ref = db_firestore.collection('collections').document(collection_id)
doc = col_ref.get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Collection not found")
data = doc.to_dict()
# If already owner, do nothing
if data.get('owner_id') == uid:
return {"message": "You already own this collection"}
# Add to shared_with
col_ref.update({
"shared_with": firestore.ArrayUnion([uid])
})
return {"message": f"Successfully added collection: {data.get('name')}"}
@app.post("/collections/rename")
def rename_collection(collection_id: str = Form(...), new_name: str = Form(...), uid: str = Form(...)):
"""Rename a gallery/collection"""
if not new_name or len(new_name.strip()) == 0:
raise HTTPException(status_code=400, detail="Gallery name cannot be empty")
# Verify collection exists and user has permission
col_ref = db_firestore.collection('collections').document(collection_id)
doc = col_ref.get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Gallery not found")
data = doc.to_dict()
# Check if user is owner
if data.get('owner_id') != uid:
raise HTTPException(status_code=403, detail="Only the owner can rename this gallery")
# Update name in Firestore
col_ref.update({"name": new_name.strip()})
# Update local SQLite cache
try:
conn = get_db()
with _db_lock:
c = conn.cursor()
c.execute("UPDATE folders SET name = ? WHERE id = ?", (new_name.strip(), collection_id))
conn.commit()
logger.info(f"Renamed gallery {collection_id} to '{new_name}'")
except Exception as e:
logger.error(f"Failed to update SQLite folder name: {e}")
return {
"message": "Gallery renamed successfully",
"collection_id": collection_id,
"new_name": new_name.strip()
}
@app.delete("/collections/{collection_id}")
def delete_collection(collection_id: str, uid: str = Form(...)):
"""Deletes a specific collection and its associated biometric data"""
# 1. Verify ownership and existence
col_ref = db_firestore.collection('collections').document(collection_id)
doc = col_ref.get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Collection not found")
data = doc.to_dict()
if data.get('owner_id') != uid:
raise HTTPException(status_code=403, detail="Only the owner can delete this collection")
try:
# 2. Delete from Firestore (Recursive delete of subcollections is tricky in client SDKs,
# so we'll delete the document and let a cloud function handle cleanup or just delete known subcollections)
# For this MVP, we will try to delete the biometrics subcollection manually if possible
# Delete biometrics subcollection documents (batch)
bio_docs = col_ref.collection('biometrics').limit(500).stream()
batch = db_firestore.batch()
count = 0
for bio_doc in bio_docs:
batch.delete(bio_doc.reference)
count += 1
if count >= 400:
batch.commit()
batch = db_firestore.batch()
count = 0
if count > 0:
batch.commit()
# Delete the collection document itself
col_ref.delete()
# 3. Delete from SQLite
conn = get_db()
with _db_lock:
c = conn.cursor()
# Delete images associated with this folder
c.execute("DELETE FROM images WHERE folder_id = ?", (collection_id,))
# Delete the folder record
c.execute("DELETE FROM folders WHERE id = ?", (collection_id,))
conn.commit()
logger.info(f"Deleted gallery {collection_id} and associated data.")
return {"message": f"Gallery '{data.get('name')}' deleted successfully"}
except Exception as e:
logger.error(f"Failed to delete collection {collection_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/clear")
def clear_database():
"""Wipes all face biometric data but preserves source link metadata in Firestore for re-indexing"""
conn = get_db()
try:
with _db_lock:
c = conn.cursor()
c.execute("DELETE FROM images")
conn.commit()
# Reset file counts in Firestore collections to 0, but keep the links
docs = db_firestore.collection('collections').stream()
for doc in docs:
doc.reference.update({"file_count": 0})
logger.info("Database cleared. Biometric data purged. Source links preserved in cloud.")
return {"message": "Success! Biometric data purged. Source links preserved in cloud."}
except Exception as e:
logger.error(f"Failed to clear database: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/index")
def trigger_index(background_tasks: BackgroundTasks, folder_url: str = Form(...), uid: str = Form(...), gallery_name: Optional[str] = Form(None)):
"""Triggers background indexing of a Google Drive folder or Public URL - Creates NEW gallery each time"""
status = get_user_status(uid)
if status["is_indexing"]:
raise HTTPException(status_code=409, detail="Indexing already in progress for this user")
# 1. Extract Info
source_folder_id = None
folder_name = gallery_name or "New Gallery"
is_drive = False
if "drive.google.com/drive/folders" in folder_url:
try:
source_folder_id = folder_url.split('/')[-1].split('?')[0]
is_drive = True
except:
raise HTTPException(status_code=400, detail="Invalid Google Drive Folder URL")
elif folder_url.startswith("http"):
source_folder_id = "web_image"
folder_name = gallery_name or "Web Link"
else:
raise HTTPException(status_code=400, detail="Invalid URL format. Must be G-Drive Folder or Public Image Link.")
# 2. Create UNIQUE gallery ID (allows same source to be indexed multiple times as different galleries)
unique_gallery_id = f"{source_folder_id}_{uid}_{int(time.time() * 1000)}"
# 3. Pre-create record in Firestore so it shows up in UI immediately
try:
col_ref = db_firestore.collection('collections').document(unique_gallery_id)
col_ref.set({
"id": unique_gallery_id,
"name": folder_name,
"last_indexed": "Pending...",
"file_count": 0,
"source_url": folder_url,
"source_folder_id": source_folder_id, # Original Drive folder ID for reference
"owner_id": uid,
"is_public": False,
"created_at": time.strftime("%Y-%m-%d %H:%M:%S")
})
logger.info(f"Created new gallery: {unique_gallery_id} with name '{folder_name}'")
except Exception as e:
logger.error(f"Failed to pre-create collection record: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create gallery: {e}")
# 4. Start Task
if is_drive:
background_tasks.add_task(index_drive_folder, unique_gallery_id, uid)
return {
"message": f"New gallery created: {folder_name}",
"type": "drive_folder",
"gallery_id": unique_gallery_id,
"gallery_name": folder_name
}
else:
background_tasks.add_task(index_public_url, folder_url, uid, unique_gallery_id)
return {
"message": f"New gallery created: {folder_name}",
"type": "public_url",
"gallery_id": unique_gallery_id,
"gallery_name": folder_name
}
@app.get("/thumbnail/{file_id}")
async def get_thumbnail(file_id: str):
"""Proxy image content from Google Drive to avoid CORs/Auth issues on frontend"""
try:
# Check if we have a Firebase URL for this file first (Optimized)
conn = get_db()
with _db_lock:
c = conn.cursor()
c.execute("SELECT firebase_url FROM images WHERE id=?", (file_id,))
row = c.fetchone()
if row and row[0]:
# Redirect to Firebase URL if available
from fastapi.responses import RedirectResponse
return RedirectResponse(url=row[0])
service = get_drive_service()
request = service.files().get_media(fileId=file_id)
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
_, done = downloader.next_chunk()
fh.seek(0)
return StreamingResponse(fh, media_type="image/jpeg")
except Exception as e:
# Fallback placeholder
return StreamingResponse(io.BytesIO(b""), media_type="image/jpeg")
@app.post("/search")
async def search_face(
request: Request,
file: Optional[UploadFile] = File(None),
url: Optional[str] = Form(None),
limit: int = Form(1),
uid: str = Form(...),
folder_id: Optional[str] = Form(None),
threshold: float = Form(0.6)
):
logger.info(f"Received search request for UID {uid}. File: {file.filename if file else 'None'}, URL: {url}, Limit: {limit}, FolderId: {folder_id}, Threshold: {threshold}")
# Check Usage Limit
user_stats = get_or_create_user(uid)
if user_stats["matches_count"] >= user_stats["max_matches"]:
raise HTTPException(status_code=403, detail="Search usage limit reached. Please upgrade your plan.")
if not file and not url:
raise HTTPException(status_code=400, detail="No search subject provided. Upload a file or provide a public URL.")
# Variable to hold the stable Firebase URL for the query image
query_firebase_url = None
try:
fr = get_face_recognition()
contents = b""
if file:
contents = await file.read()
logger.info(f"Read {len(contents)} bytes from upload")
elif url:
logger.info(f"Fetching remote image from URL: {url}")
contents = fetch_url_with_retry(url, timeout=15)
logger.info(f"Fetched {len(contents)} bytes from remote URL")
# --- BACKUP QUERY IMAGE TO FIREBASE ---
try:
if firebase_admin._apps:
bucket = storage.bucket()
# Unique name for query
query_filename = f"queries/query_{int(time.time())}_{np.random.randint(1000,9999)}.jpg"
blob = bucket.blob(query_filename)
blob.upload_from_string(contents, content_type='image/jpeg')
blob.make_public()
query_firebase_url = blob.public_url
logger.info(f"Query image backed up to Firebase: {query_firebase_url}")
except Exception as fb_e:
logger.error(f"Failed to backup query image to Firebase: {fb_e}")
# --------------------------------------
image_stream = io.BytesIO(contents)
# Compute Query Embedding
try:
# Try standard open (handled by registered heif opener)
try:
pil_image = Image.open(image_stream)
except Exception as e:
# ... (rest of search function)
logger.info(f"Standard Image.open failed, trying direct pillow_heif: {e}")
import pillow_heif
image_stream.seek(0)
heif_file = pillow_heif.read_heif(image_stream)
pil_image = Image.frombytes(
heif_file.mode,
heif_file.size,
heif_file.data,
"raw",
heif_file.mode,
heif_file.stride,
)
if pil_image.mode != "RGB":
pil_image = pil_image.convert("RGB")
# Convert PIL image to numpy array for face_recognition
image = np.array(pil_image)
query_encodings = fr.face_encodings(image)
except Exception as e:
logger.error(f"Image processing error: {e}")
raise HTTPException(status_code=400, detail=f"Invalid image format or processing error: {e}")
if len(query_encodings) == 0:
logger.info("No face found in query image")
return {"hasMatch": False, "message": "No face found in query image"}
query_encoding = query_encodings[0]
# Compare with DB
logger.info("Opening database for matching")
conn = get_db()
with _db_lock:
c = conn.cursor()
# Left join to get source_url from folders table using folder_id
# If folder_id is provided, filter by it
if folder_id:
logger.info(f"Filtering search to folder: {folder_id}")
c.execute("""
SELECT i.id, i.name, i.drive_link, i.thumbnail_link, i.embedding, i.firebase_url, f.source_url
FROM images i
LEFT JOIN folders f ON i.folder_id = f.id
WHERE i.folder_id = ?
""", (folder_id,))
else:
logger.info("Searching across all folders")
c.execute("""
SELECT i.id, i.name, i.drive_link, i.thumbnail_link, i.embedding, i.firebase_url, f.source_url
FROM images i
LEFT JOIN folders f ON i.folder_id = f.id
""")
rows = c.fetchall()
# DEBUG: Check if there are ANY rows in the table at all
c.execute("SELECT COUNT(*) FROM images")
global_count = c.fetchone()[0]
logger.info(f"Found {len(rows)} image faces in database for folder '{folder_id}'. Total faces in DB: {global_count}")
if not rows:
# Rehydrate from Firestore if SQLite is empty
logger.info(f"SQLite empty for folder {folder_id}. Checking Firestore for persistence...")
try:
# 1. Verify folder exists in Firestore
f_doc = db_firestore.collection('collections').document(folder_id).get()
if f_doc.exists:
# 2. Fetch all biometrics for this folder
bio_docs = db_firestore.collection('collections').document(folder_id).collection('biometrics').stream()
rehydrated_rows = []
conn_re = sqlite3.connect(DB_PATH)
c_re = conn_re.cursor()
c_re.execute("PRAGMA journal_mode=WAL")
count = 0
for doc in bio_docs:
d = doc.to_dict()
for face in d.get('faces', []):
emb = np.array(face['embedding'], dtype=np.float64)
rehydrated_rows.append((
doc.id,
face['face_index'],
d.get('name'),
d.get('drive_link'),
d.get('thumbnail_link'),
pickle.dumps(emb),
folder_id
))
count += 1
if rehydrated_rows:
logger.info(f"Rehydrating {count} faces from Firestore into SQLite...")
c_re.executemany(
"INSERT OR REPLACE INTO images (id, face_index, name, drive_link, thumbnail_link, embedding, folder_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
rehydrated_rows
)
conn_re.commit()
# Re-run search query
c_re.execute("""
SELECT i.id, i.name, i.drive_link, i.thumbnail_link, i.embedding, i.firebase_url, f.source_url
FROM images i
LEFT JOIN folders f ON i.folder_id = f.id
WHERE i.folder_id = ?
""", (folder_id,))
rows = c_re.fetchall()
global_count += count
conn_re.close()
except Exception as e:
logger.error(f"Rehydration failed: {e}")
# Final check after rehydration attempt
if not rows:
if global_count == 0:
msg = "Your database is empty. Please index a Google Drive folder first."
else:
msg = f"No images found for the selected gallery '{folder_id}'. Please re-index it."
return {"hasMatch": False, "message": msg}
known_encodings = []
metadata = []
for row in rows:
try:
encoding = pickle.loads(row[4])
known_encodings.append(encoding)
metadata.append({
"file_id": row[0],
"name": row[1],
"drive_link": row[2],
"thumbnail": row[3],
"firebase_url": row[5],
"source_url": row[6]
})
except Exception as e:
logger.warning(f"Failed to unpickle encoding for row {row[0]}: {e}")
continue
if not known_encodings:
return {"hasMatch": False, "message": "No valid face encodings found in database"}
# Check matches and distances
face_distances = fr.face_distance(known_encodings, query_encoding)
# Zip distances with metadata
results = []
base_url = str(request.base_url).rstrip('/')
for i, dist in enumerate(face_distances):
if dist <= threshold: # Configurable tolerance threshold (default: 0.6)
content_id = metadata[i]['file_id']
thumb = metadata[i]['thumbnail']
firebase_ref = metadata[i]['firebase_url']
# Priority: Firebase URL > G-Drive Proxy > Original Link
if firebase_ref:
thumb = firebase_ref
elif not content_id.startswith('url_'):
# If it's a G-Drive file and no firebase_url, use our proxy endpoint to avoid 403
thumb = f"{base_url}/thumbnail/{content_id}"
results.append({
"driveLink": metadata[i]['drive_link'],
"thumbnail": thumb,
"confidence": float(1 - dist),
"name": metadata[i]['name'],
"firebaseUrl": firebase_ref,
"sourceUrl": metadata[i]['source_url']
})
# Sort by best confidence
results.sort(key=lambda x: x['confidence'], reverse=True)
# Apply limit (if limit is 0, return all)
final_results = results if limit == 0 else results[:limit]
if final_results:
logger.info(f"Match found! Returning {len(final_results)} results.")
# Record matched images found against quota
increment_usage(uid, len(final_results))
return {
"hasMatch": True,
"matches": final_results,
"query_backup_url": query_firebase_url
}
logger.info("No match found below tolerance")
return {
"hasMatch": False,
"query_backup_url": query_firebase_url
}
except HTTPException as he:
raise he
except Exception as e:
logger.exception("Unexpected error during search")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)