# NOTE: removed non-code residue ("Spaces: / Running / Running") copied in from the Hugging Face Spaces page header.
| # main.py | |
| import asyncio | |
| import os | |
| import shutil | |
| import uuid | |
| import re | |
| import inflect | |
| from urllib.parse import urlparse | |
| from typing import List | |
| from contextlib import asynccontextmanager | |
| from collections import OrderedDict | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
| from pinecone import Pinecone, ServerlessSpec | |
# ── Deferred imports ───────────────────────────────────────────── |
# AIModelManager instance; populated in lifespan() after models load.
ai = None
# inflect engine used to singularize category/folder names.
p = inflect.engine()
MAX_CONCURRENT_INFERENCES = int(os.getenv("MAX_CONCURRENT_INFERENCES", "6"))
# Created in lifespan(); bounds simultaneous model inferences process-wide.
_inference_sem: asyncio.Semaphore
# Bounded LRU pool of Pinecone clients keyed by API key (see _get_pinecone).
_pinecone_pool = OrderedDict()
_POOL_MAX = 64
# Fixed index names: faces are 512-d vectors, objects are 1536-d vectors
# (dimensions match the create_index calls in verify_keys / reset_database).
IDX_FACES = "enterprise-faces"
IDX_OBJECTS = "enterprise-objects"
def _get_pinecone(api_key: str) -> Pinecone:
    """Return a cached Pinecone client for *api_key*, using a bounded LRU pool."""
    if api_key not in _pinecone_pool:
        # Pool is full: evict the least-recently-used client first.
        while len(_pinecone_pool) >= _POOL_MAX:
            _pinecone_pool.popitem(last=False)
        _pinecone_pool[api_key] = Pinecone(api_key=api_key)
    # Mark this key as most-recently-used.
    _pinecone_pool.move_to_end(api_key)
    return _pinecone_pool[api_key]
| # ── Cloudinary: credentials injected per-call, NEVER globally configured. | |
| # If cloudinary.config() is called once, it applies to the whole process — | |
| # User A's credentials would bleed into User B's request under concurrency. | |
def _cld_upload(tmp_path, folder, creds):
    """Upload the file at *tmp_path* into Cloudinary *folder* with per-request creds."""
    return cloudinary.uploader.upload(
        tmp_path,
        folder=folder,
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
def _cld_ping(creds):
    """Cheap credential check against the Cloudinary Admin API."""
    return cloudinary.api.ping(
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
def _cld_root_folders(creds):
    """List the account's top-level Cloudinary folders."""
    return cloudinary.api.root_folders(
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: load the AI models before serving traffic.

    BUG FIX: the @asynccontextmanager decorator (already imported above) was
    missing — FastAPI/Starlette requires the ``lifespan`` argument to be an
    async context manager factory, so the undecorated async generator
    function would fail at application startup.
    """
    global ai, _inference_sem
    # Deferred import: pulling in the model stack at module import time would
    # slow startup of anything that merely imports this module.
    from src.models import AIModelManager
    print("⏳ Loading AI models …")
    # Model construction is blocking/CPU-heavy; run it off the event loop.
    # get_running_loop() replaces the deprecated get_event_loop().
    loop = asyncio.get_running_loop()
    ai = await loop.run_in_executor(None, AIModelManager)
    _inference_sem = asyncio.Semaphore(MAX_CONCURRENT_INFERENCES)
    print("✅ Ready!")
    yield
app = FastAPI(lifespan=lifespan)
# CORS wide open: any origin/method/header, with credentials allowed.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
# Scratch directory for request-scoped temp files (uploads and search queries).
os.makedirs("temp_uploads", exist_ok=True)
def standardize_category_name(name: str) -> str:
    """Normalize a user-entered category: lower-case, underscores, word chars only, singular."""
    slug = name.strip().lower()
    slug = re.sub(r'\s+', '_', slug)
    slug = re.sub(r'[^\w]', '', slug)
    # inflect returns False when the word is already singular.
    singular = p.singular_noun(slug)
    return singular if singular else slug
def sanitize_filename(filename: str) -> str:
    """Make *filename* safe for temp paths: whitespace → '_', then strip
    everything except word chars, dots, and hyphens."""
    underscored = re.sub(r'\s+', '_', filename)
    return re.sub(r'[^\w.\-]', '', underscored)
def get_cloudinary_creds(env_url: str) -> dict:
    """Parse a CLOUDINARY_URL (``cloudinary://<key>:<secret>@<cloud_name>``)
    into a creds dict; empty input yields an empty dict."""
    if not env_url:
        return {}
    parts = urlparse(env_url)
    return {
        "api_key": parts.username,
        "api_secret": parts.password,
        "cloud_name": parts.hostname,
    }
| # ══════════════════════════════════════════════════════════════════ | |
| # 1. VERIFY KEYS & AUTO-BUILD INDEXES | |
| # ══════════════════════════════════════════════════════════════════ | |
async def verify_keys(pinecone_key: str = Form(""), cloudinary_url: str = Form("")):
    """Validate user-supplied credentials and auto-build missing Pinecone indexes.

    Cloudinary: parse the URL and ping the Admin API.
    Pinecone: list indexes, then create the objects (1536-d) and/or faces
    (512-d) serverless indexes if absent.
    """
    if cloudinary_url:
        try:
            cld_creds = get_cloudinary_creds(cloudinary_url)
            if not cld_creds.get("cloud_name"):
                raise ValueError("bad url")
            await asyncio.to_thread(_cld_ping, cld_creds)
        except HTTPException:
            raise
        except Exception:
            raise HTTPException(400, "Invalid Cloudinary Environment URL.")
    if pinecone_key:
        try:
            pc = _get_pinecone(pinecone_key)
            index_names = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
            pending = []
            if IDX_OBJECTS not in index_names:
                pending.append(asyncio.to_thread(
                    pc.create_index, name=IDX_OBJECTS, dimension=1536, metric="cosine",
                    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
                ))
            if IDX_FACES not in index_names:
                pending.append(asyncio.to_thread(
                    pc.create_index, name=IDX_FACES, dimension=512, metric="cosine",
                    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
                ))
            if pending:
                await asyncio.gather(*pending)
        except Exception as e:
            raise HTTPException(400, f"Pinecone Error: {e}")
    return {"message": "Keys verified and indexes ready!"}
| # ══════════════════════════════════════════════════════════════════ | |
| # 2. UPLOAD | |
| # ══════════════════════════════════════════════════════════════════ | |
async def upload_new_images(files: List[UploadFile] = File(...), folder_name: str = Form(...), detect_faces: bool = Form(True), user_pinecone_key: str = Form(""), user_cloudinary_url: str = Form("")):
    """Upload images to Cloudinary and index their embeddings in Pinecone.

    Per file: save to a temp path, upload to Cloudinary, run model inference
    (bounded by the global semaphore), then upsert face vectors into the face
    index and everything else into the object index. The temp file is removed
    even on failure; the first failing file aborts the whole request with 500.
    """
    # DEFENSIVE FIX: The 'or ""' ensures it never becomes None, preventing 500 crashes
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    if not actual_pc_key or not actual_cld_url:
        raise HTTPException(400, "API Keys are missing. If you are a guest, the server is missing its DEFAULT_ secrets in Hugging Face.")
    folder = standardize_category_name(folder_name)
    uploaded_urls = []
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL format.")
    pc = _get_pinecone(actual_pc_key)
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
    for file in files:
        # Random hex prefix avoids collisions between concurrent uploads.
        tmp_path = f"temp_uploads/{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
        try:
            with open(tmp_path, "wb") as buf:
                shutil.copyfileobj(file.file, buf)
            res = await asyncio.to_thread(_cld_upload, tmp_path, folder, creds)
            image_url = res["secure_url"]
            uploaded_urls.append(image_url)
            # Cap concurrent model inference across all in-flight requests.
            async with _inference_sem:
                vectors = await ai.process_image_async(tmp_path, is_query=False, detect_faces=detect_faces)
            face_upserts, object_upserts = [], []
            for v in vectors:
                # Arrays (e.g. numpy) must become plain lists for the Pinecone API.
                vec_list = v["vector"].tolist() if hasattr(v["vector"], "tolist") else v["vector"]
                record = {"id": str(uuid.uuid4()), "values": vec_list, "metadata": {"url": image_url, "folder": folder}}
                # Route each embedding to the matching index's upsert batch.
                (face_upserts if v["type"] == "face" else object_upserts).append(record)
            upsert_tasks = []
            if face_upserts: upsert_tasks.append(asyncio.to_thread(idx_face.upsert, vectors=face_upserts))
            if object_upserts: upsert_tasks.append(asyncio.to_thread(idx_obj.upsert, vectors=object_upserts))
            if upsert_tasks: await asyncio.gather(*upsert_tasks)
        except Exception as e:
            print(f"❌ Upload error: {e}")
            raise HTTPException(500, f"Upload processing failed: {str(e)}")
        finally:
            # Always remove the temp file, success or failure.
            if os.path.exists(tmp_path): os.remove(tmp_path)
    return {"message": "Done!", "urls": uploaded_urls}
| # ══════════════════════════════════════════════════════════════════ | |
| # 3. SEARCH | |
| # ══════════════════════════════════════════════════════════════════ | |
async def search_database(file: UploadFile = File(...), detect_faces: bool = Form(True), user_pinecone_key: str = Form(""), user_cloudinary_url: str = Form("")):
    """Embed the query image and search both Pinecone indexes.

    Each embedding queries its matching index (face → 512-d, object → 1536-d),
    low-score matches are filtered, face scores remapped for the UI, and the
    combined results are deduplicated by URL keeping the best score (top 10).
    """
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    if not actual_pc_key:
        raise HTTPException(400, "Pinecone Key is missing. If you are a guest, the server is missing its DEFAULT_PINECONE_KEY in Hugging Face.")
    tmp_path = f"temp_uploads/query_{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
    try:
        with open(tmp_path, "wb") as buf:
            shutil.copyfileobj(file.file, buf)
        # Cap concurrent model inference across all in-flight requests.
        async with _inference_sem:
            vectors = await ai.process_image_async(tmp_path, is_query=True, detect_faces=detect_faces)
        pc = _get_pinecone(actual_pc_key)
        idx_obj = pc.Index(IDX_OBJECTS)
        idx_face = pc.Index(IDX_FACES)
        async def _query_one(vec_dict: dict):
            # Query the index matching this embedding's type; return filtered hits.
            vec_list = vec_dict["vector"].tolist() if hasattr(vec_dict["vector"], "tolist") else vec_dict["vector"]
            target_idx = idx_face if vec_dict["type"] == "face" else idx_obj
            try:
                res = await asyncio.to_thread(target_idx.query, vector=vec_list, top_k=10, include_metadata=True)
            except Exception as e:
                # A 404 here means the index was never built for this account.
                if "404" in str(e):
                    raise HTTPException(404, f"Pinecone Index not found. Please log in and click 'Verify Keys' in Settings to build the indexes.")
                raise e
            out = []
            for match in res.get("matches", []):
                score = match["score"]
                is_face = vec_dict["type"] == "face"
                # ── Score filtering ──────────────────────────────────────
                # Face lane: GhostFaceNet 512-D cosine similarity.
                # Raw scores 0.3-0.5 = same person. Remap to 75-99% for UI.
                if is_face:
                    RAW_THRESHOLD = 0.35  # matches original cloud_db.py
                    if score < RAW_THRESHOLD:
                        continue
                    # Linear remap [0.35, 1.0] → [0.75, 0.99], clamped at 0.99.
                    ui_score = 0.75 + ((score - RAW_THRESHOLD) / (1.0 - RAW_THRESHOLD)) * 0.24
                    ui_score = min(0.99, ui_score)
                else:
                    # Object lane: SigLIP+DINOv2 1536-D fused cosine similarity.
                    # Matches original cloud_db.py min_score=0.45 floor.
                    # Scores below 0.45 are pure noise — unrelated images.
                    MIN_OBJECT_SCORE = 0.45
                    if score < MIN_OBJECT_SCORE:
                        continue
                    ui_score = score
                caption = "👤 Verified Identity" if is_face else match["metadata"].get("folder", "🎯 Object Match")
                out.append({
                    "url": match["metadata"].get("url") or match["metadata"].get("image_url", ""),  # "image_url" = legacy key from cloud_db.py
                    "score": round(ui_score, 4),
                    "caption": caption,
                })
            return out
        # Fan out one query per embedding, then flatten the per-query lists.
        nested = await asyncio.gather(*[_query_one(v) for v in vectors])
        all_results = [r for sub in nested for r in sub]
        # Deduplicate by URL, keeping the highest-scoring hit per image.
        seen = {}
        for r in all_results:
            url = r["url"]
            if url not in seen or r["score"] > seen[url]["score"]:
                seen[url] = r
        return {"results": sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:10]}
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Search error: {e}")
        raise HTTPException(500, str(e))
    finally:
        # Always remove the temp query file.
        if os.path.exists(tmp_path): os.remove(tmp_path)
| # ══════════════════════════════════════════════════════════════════ | |
| # 4. CATEGORIES | |
| # ══════════════════════════════════════════════════════════════════ | |
async def get_categories(user_cloudinary_url: str = Form("")):
    """List top-level Cloudinary folders as categories; best-effort — any
    failure (missing/invalid creds, API error) yields an empty list."""
    cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    if not cld_url:
        return {"categories": []}
    try:
        creds = get_cloudinary_creds(cld_url)
        if not creds.get("cloud_name"):
            return {"categories": []}
        listing = await asyncio.to_thread(_cld_root_folders, creds)
        names = [folder["name"] for folder in listing.get("folders", [])]
        return {"categories": names}
    except Exception as e:
        print(f"Category fetch error: {e}")
        return {"categories": []}
async def health():
    """Liveness probe: always reports ok."""
    return {"status": "ok"}
| # ══════════════════════════════════════════════════════════════════ | |
| # 5. LIST FOLDER IMAGES | |
| # ══════════════════════════════════════════════════════════════════ | |
def _cld_list_folder_images(folder: str, creds: dict, next_cursor: str = None):
    """Fetch one page (up to 500) of upload-type resources under '<folder>/'.

    Pass the previous response's next_cursor to continue pagination.
    """
    params = {
        "type": "upload",
        "prefix": f"{folder}/",
        "max_results": 500,
        "api_key": creds["api_key"],
        "api_secret": creds["api_secret"],
        "cloud_name": creds["cloud_name"],
    }
    if next_cursor:
        params["next_cursor"] = next_cursor
    return cloudinary.api.resources(**params)
async def list_folder_images(
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
):
    """Return every image (url + public_id) in a Cloudinary folder,
    following the Admin API's cursor-based pagination."""
    cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    images = []
    cursor = None
    while True:
        page = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, cursor)
        images.extend(
            {"url": res["secure_url"], "public_id": res["public_id"]}
            for res in page.get("resources", [])
        )
        cursor = page.get("next_cursor")
        if not cursor:
            break
    return {"images": images, "count": len(images)}
| # ══════════════════════════════════════════════════════════════════ | |
| # 6. DELETE SINGLE IMAGE | |
| # ══════════════════════════════════════════════════════════════════ | |
def url_to_public_id(image_url: str, cloud_name: str) -> str:
    """Extract the Cloudinary public_id from a delivery (secure) URL.

    Expected path layout: /<cloud_name>/image/upload/[v<digits>/]<folder>/<file>.<ext>
    Returns "" when the URL does not contain an 'upload' segment or parsing fails.
    """
    try:
        segments = urlparse(image_url).path.split("/")
        # Everything after the 'upload' delivery segment.
        tail = segments[segments.index("upload") + 1:]
        # Drop the optional version segment ('v' followed by digits only).
        if tail and tail[0][:1] == "v" and tail[0][1:].isdigit():
            tail = tail[1:]
        joined = "/".join(tail)
        # The public_id is the path without the trailing file extension.
        return joined.rsplit(".", 1)[0]
    except Exception:
        return ""
def _cld_delete_resource(public_id: str, creds: dict):
    """Destroy a single Cloudinary asset identified by *public_id*."""
    return cloudinary.uploader.destroy(
        public_id,
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
async def delete_image(
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    image_url: str = Form(""),
    public_id: str = Form(""),
):
    """Delete one image from Cloudinary, then best-effort remove its vectors
    from both Pinecone indexes (matched by stored URL metadata)."""
    pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    # Prefer an explicit public_id; otherwise derive it from the URL.
    pid = public_id or url_to_public_id(image_url, creds["cloud_name"])
    if not pid:
        raise HTTPException(400, "Could not determine public_id.")
    # Delete from Cloudinary first — this is the authoritative store.
    await asyncio.to_thread(_cld_delete_resource, pid, creds)
    # Pinecone cleanup is best-effort: failures are logged, not surfaced.
    if pc_key and image_url:
        try:
            pc = _get_pinecone(pc_key)
            for index_name in [IDX_OBJECTS, IDX_FACES]:
                index = pc.Index(index_name)
                await asyncio.to_thread(index.delete, filter={"url": {"$eq": image_url}})
        except Exception as e:
            print(f"Pinecone delete warning: {e}")
    return {"message": "Image deleted successfully."}
| # ══════════════════════════════════════════════════════════════════ | |
| # 7. DELETE ENTIRE FOLDER | |
| # ══════════════════════════════════════════════════════════════════ | |
def _cld_delete_folder(folder: str, creds: dict):
    """Bulk-delete every Cloudinary asset whose public_id starts with '<folder>/'."""
    return cloudinary.api.delete_resources_by_prefix(
        f"{folder}/",
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
def _cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of the (now empty) folder entry itself.

    Errors are deliberately swallowed: the folder may already be gone or
    Cloudinary may have auto-removed it once emptied.
    """
    try:
        return cloudinary.api.delete_folder(
            folder,
            cloud_name=creds["cloud_name"],
            api_key=creds["api_key"],
            api_secret=creds["api_secret"],
        )
    except Exception:
        pass  # Folder may already be empty/gone
async def delete_folder(
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
):
    """Delete a Cloudinary folder, all its images, and their Pinecone vectors.

    Lists the folder's images first (needed for the per-URL Pinecone
    fallback), then deletes assets + folder from Cloudinary, then cleans up
    Pinecone best-effort: bulk delete by folder metadata, falling back to
    per-image URL deletes if the metadata filter fails.
    """
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    # 1. List all images in folder first (for Pinecone cleanup)
    all_images = []
    next_cursor = None
    while True:
        result = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, next_cursor)
        all_images.extend(result.get("resources", []))
        next_cursor = result.get("next_cursor")
        if not next_cursor:
            break
    # 2. Delete all images from Cloudinary
    await asyncio.to_thread(_cld_delete_folder, folder_name, creds)
    # 3. Remove the folder itself from Cloudinary
    await asyncio.to_thread(_cld_remove_folder, folder_name, creds)
    # 4. Delete Pinecone vectors for each image (best-effort; never fails the request)
    if actual_pc_key:
        try:
            pc = _get_pinecone(actual_pc_key)
            # Try bulk delete by folder metadata filter first
            for idx_name in [IDX_OBJECTS, IDX_FACES]:
                idx = pc.Index(idx_name)
                try:
                    await asyncio.to_thread(idx.delete, filter={"folder": {"$eq": folder_name}})
                except Exception:
                    # Fallback: delete by individual image URLs
                    for img in all_images:
                        try:
                            url = img.get("secure_url", "")
                            if url:
                                await asyncio.to_thread(idx.delete, filter={"url": {"$eq": url}})
                        except Exception:
                            pass
        except Exception as e:
            print(f"Pinecone folder delete warning: {e}")
    return {"message": f"Folder '{folder_name}' and all its contents deleted.", "deleted_count": len(all_images)}
| # ══════════════════════════════════════════════════════════════════ | |
| # 8. RESET DATABASE | |
| # ══════════════════════════════════════════════════════════════════ | |
# Snapshot of the shared-demo credentials; destructive endpoints below refuse
# to run when the caller's keys match these (see _is_default_key).
DEFAULT_PC_KEY = os.getenv("DEFAULT_PINECONE_KEY", "")
DEFAULT_CLD_URL = os.getenv("DEFAULT_CLOUDINARY_URL","")
| def _is_default_key(key: str, default: str) -> bool: | |
| return bool(default) and key.strip() == default.strip() | |
async def reset_database(
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
):
    """Wipe all Cloudinary resources and rebuild both Pinecone indexes.

    Refused (403) when the caller's keys match the shared-demo defaults.
    Cloudinary wipe is best-effort; Pinecone recreation failures raise 500.
    """
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        raise HTTPException(403, "Reset is not allowed on the shared demo database.")
    creds = get_cloudinary_creds(user_cloudinary_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    # Wipe all Cloudinary resources (best-effort: warning only on failure)
    try:
        await asyncio.to_thread(
            lambda: cloudinary.api.delete_all_resources(
                api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"],
            )
        )
    except Exception as e:
        print(f"Cloudinary wipe warning: {e}")
    # Delete and recreate Pinecone indexes
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        delete_tasks = []
        if IDX_OBJECTS in existing:
            delete_tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing:
            delete_tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if delete_tasks:
            await asyncio.gather(*delete_tasks)
            # Short pause between delete and create — presumably to let the
            # deletion propagate before recreating; TODO confirm needed delay.
            await asyncio.sleep(2)
        await asyncio.gather(
            asyncio.to_thread(pc.create_index, name=IDX_OBJECTS, dimension=1536, metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")),
            asyncio.to_thread(pc.create_index, name=IDX_FACES, dimension=512, metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")),
        )
    except Exception as e:
        raise HTTPException(500, f"Pinecone reset error: {e}")
    return {"message": "Database reset complete. All data wiped and indexes recreated."}
| # ══════════════════════════════════════════════════════════════════ | |
| # 9. DELETE ACCOUNT | |
| # ══════════════════════════════════════════════════════════════════ | |
async def delete_account(
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Full data wipe for an account: delete all Cloudinary resources and
    drop both Pinecone indexes.

    Refused (403) for shared-demo credentials. Unlike reset_database, every
    step here is best-effort (warnings only) and indexes are NOT recreated.
    """
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        raise HTTPException(403, "Account deletion is not allowed on the shared demo database.")
    # Full wipe (same as reset)
    # NOTE(review): creds are not validated here — an empty/invalid URL makes
    # the lambda below raise (caught and logged as a warning); confirm intended.
    creds = get_cloudinary_creds(user_cloudinary_url)
    try:
        await asyncio.to_thread(
            lambda: cloudinary.api.delete_all_resources(
                api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"],
            )
        )
    except Exception as e:
        print(f"Account delete Cloudinary warning: {e}")
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        delete_tasks = []
        if IDX_OBJECTS in existing:
            delete_tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing:
            delete_tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if delete_tasks:
            await asyncio.gather(*delete_tasks)
    except Exception as e:
        print(f"Account delete Pinecone warning: {e}")
    return {"message": "Account data deleted. Sign out initiated."}