# visual-search-api2/src/services/db_client.py
# Last change: "Update src/services/db_client.py" (commit c56ede7, AdarshDRC), 6.11 kB
import logging
import time
from typing import Any, Dict, List

import cloudinary
import cloudinary.api
import cloudinary.uploader
from pinecone import Pinecone, ServerlessSpec

from src.core.config import IDX_FACES, IDX_OBJECTS
class PineconePool:
    """Cache of Pinecone clients keyed by API key.

    A client is built the first time a given key is requested and the same
    object is handed back on every later request for that key.
    """

    def __init__(self):
        # Maps api_key -> the Pinecone client constructed for that key.
        self._clients = {}

    def get(self, api_key: str) -> Pinecone:
        """Return the client for *api_key*, constructing it on first use."""
        client = self._clients.get(api_key)
        if client is None:
            client = Pinecone(api_key=api_key)
            self._clients[api_key] = client
        return client
# Module-level PineconePool instance, created once when this module is imported.
pinecone_pool = PineconePool()
def _set_cld_config(creds: dict):
    """Apply *creds* to the Cloudinary SDK's global configuration.

    Reads ``cloud_name``, ``api_key`` and ``api_secret`` from *creds* (a
    missing key is passed through as ``None``) and always enables ``secure``.
    """
    credential_keys = ("cloud_name", "api_key", "api_secret")
    settings = {key: creds.get(key) for key in credential_keys}
    cloudinary.config(secure=True, **settings)
def cld_ping(creds: dict):
    """Ping the Cloudinary Admin API using *creds*.

    The ping response is discarded; nothing is caught here, so any exception
    raised by the SDK reaches the caller.
    """
    _set_cld_config(creds)
    cloudinary.api.ping()
def cld_upload(file_obj, folder: str, creds: dict) -> dict:
    """Upload *file_obj* into *folder* and return the uploader's response."""
    _set_cld_config(creds)
    upload_response = cloudinary.uploader.upload(file_obj, folder=folder)
    return upload_response
def cld_root_folders(creds: dict) -> dict:
    """Return the Admin API's ``root_folders`` response for the account in *creds*."""
    _set_cld_config(creds)
    folders_response = cloudinary.api.root_folders()
    return folders_response
def cld_list_folder_images(folder: str, creds: dict, cursor: str = None, page_size: int = 100) -> dict:
    """Fetch one page of uploaded resources whose public ID starts with ``<folder>/``.

    Args:
        folder: Folder name; a trailing ``/`` is appended to form the prefix.
        creds: Cloudinary credentials applied before the request.
        cursor: Pagination cursor from a previous response; omitted from the
            request when falsy.
        page_size: Value sent as ``max_results``.

    Returns:
        The raw response of ``cloudinary.api.resources``.
    """
    _set_cld_config(creds)
    query = dict(type="upload", prefix=f"{folder}/", max_results=page_size)
    if cursor:
        query.update(next_cursor=cursor)
    return cloudinary.api.resources(**query)
def cld_delete_resource(public_id: str, creds: dict):
    """Destroy the single asset identified by *public_id*.

    The uploader's response is not returned to the caller.
    """
    _set_cld_config(creds)
    cloudinary.uploader.destroy(public_id)
def cld_delete_folder_resources(folder: str, creds: dict):
    """Delete every uploaded asset whose public ID starts with ``<folder>/``.

    Cloudinary's bulk delete-by-prefix removes only a bounded batch per
    request and signals leftover work by returning a ``next_cursor``. A single
    call therefore leaves large folders partially populated, so the request
    is repeated with the returned cursor until none comes back.

    Args:
        folder: Folder name; a trailing ``/`` is appended to form the prefix.
        creds: Cloudinary credentials applied before the requests.
    """
    _set_cld_config(creds)
    prefix = f"{folder}/"
    cursor = None
    while True:
        options = {"next_cursor": cursor} if cursor else {}
        response = cloudinary.api.delete_resources_by_prefix(prefix, **options)
        following = response.get("next_cursor")
        # Stop when the API reports nothing left, or (defensively) when it
        # hands back the same cursor again, which would otherwise loop forever.
        if not following or following == cursor:
            break
        cursor = following
def cld_remove_folder(folder: str, creds: dict):
    """Remove *folder* from Cloudinary on a best-effort basis.

    A failure to delete the folder never propagates to the caller, but it is
    logged as a warning (with traceback) instead of being silently discarded,
    so misconfigured credentials or non-empty folders remain diagnosable.

    Args:
        folder: Path of the folder to delete.
        creds: Cloudinary credentials applied before the request.
    """
    _set_cld_config(creds)
    try:
        cloudinary.api.delete_folder(folder)
    except Exception:
        # Deliberately best-effort: record the failure and carry on.
        logging.getLogger(__name__).warning(
            "Cloudinary folder %r could not be removed", folder, exc_info=True
        )
def cld_delete_all_paginated(creds: dict) -> int:
    """Delete all ``upload``-type resources, page by page.

    Resources are listed 500 at a time, but Cloudinary's ``delete_resources``
    accepts at most 100 public IDs per call, so each listed page is deleted
    in chunks of 100 rather than in one oversized request.

    Args:
        creds: Cloudinary credentials applied before the requests.

    Returns:
        The number of public IDs submitted for deletion.
    """
    _set_cld_config(creds)
    delete_batch_size = 100  # documented per-call limit of delete_resources
    deleted = 0
    cursor = None
    while True:
        kwargs = {"type": "upload", "max_results": 500}
        if cursor:
            kwargs["next_cursor"] = cursor
        page = cloudinary.api.resources(**kwargs)
        public_ids = [resource["public_id"] for resource in page.get("resources", [])]
        if not public_ids:
            break
        for start in range(0, len(public_ids), delete_batch_size):
            batch = public_ids[start:start + delete_batch_size]
            cloudinary.api.delete_resources(batch)
            deleted += len(batch)
        cursor = page.get("next_cursor")
        if not cursor:
            break
    return deleted
def ensure_indexes(pc: Pinecone) -> List[str]:
    """Create whichever of the faces/objects indexes does not exist yet.

    Missing indexes are created as serverless (aws / us-east-1) cosine
    indexes; the faces index uses dimension 1024 and the other one 1536.

    Returns:
        Names of the indexes created by this call, in creation order.
    """
    present = {index.name for index in pc.list_indexes()}
    newly_created = []
    for name in (IDX_FACES, IDX_OBJECTS):
        if name in present:
            continue
        dimension = 1024 if name == IDX_FACES else 1536
        serverless = ServerlessSpec(cloud="aws", region="us-east-1")
        pc.create_index(
            name=name,
            dimension=dimension,
            metric="cosine",
            spec=serverless,
        )
        newly_created.append(name)
    return newly_created
def delete_and_recreate_indexes(pc: Pinecone):
    """Drop the faces/objects indexes that exist, pause, then recreate them.

    NOTE(review): the scraped source lost its indentation; the 5-second pause
    is assumed to run once after the delete loop, not per index -- confirm.
    """
    present = {index.name for index in pc.list_indexes()}
    for name in (IDX_FACES, IDX_OBJECTS):
        if name not in present:
            continue
        pc.delete_index(name)
    # Fixed pause between the deletions and the re-creation.
    time.sleep(5)
    ensure_indexes(pc)
def search_faces(idx, vec: List[float], det_score: float) -> Dict[str, Any]:
    """Query the face index and keep the best-scoring hit per image URL.

    Args:
        idx: Index handle exposing ``query(vector=..., top_k=..., include_metadata=...)``.
        vec: Query embedding.
        det_score: Not used by this function; kept so existing callers keep working.

    Returns:
        Mapping of image URL to ``{"raw_score", "face_crop", "folder"}`` for
        the highest-scoring match of that URL. Matches without a URL are
        skipped.
    """
    res = idx.query(vector=vec, top_k=50, include_metadata=True)
    image_map: Dict[str, Any] = {}
    # `or` guards cover a key that is present but null, which `.get(key, default)`
    # alone would pass through as None and crash on the next attribute access.
    for match in res.get("matches") or []:
        meta = match.get("metadata") or {}
        url = meta.get("url")
        if not url:
            continue
        score = match.get("score", 0)
        current = image_map.get(url)
        if current is None or current["raw_score"] < score:
            image_map[url] = {
                "raw_score": score,
                "face_crop": meta.get("face_crop", ""),
                "folder": meta.get("folder", "uncategorized"),
            }
    return image_map
import numpy as np
def search_objects(idx, vec: List[float], filter_dict: dict = None) -> List[Dict[str, Any]]:
    """Query the object index and return its matches as plain dicts.

    Args:
        idx: Index handle exposing ``query(**kwargs)``.
        vec: Query embedding.
        filter_dict: Optional metadata filter; only sent when truthy.

    Returns:
        One ``{"url", "score", "raw_score", "folder"}`` dict per match, in the
        order the index returned them, or ``[]`` when there are no matches or
        the result set looks like an undifferentiated cluster (see below).
    """
    query_kwargs = {"vector": vec, "top_k": 50, "include_metadata": True}
    if filter_dict:
        query_kwargs["filter"] = filter_dict
    res = idx.query(**query_kwargs)
    # `or []` also covers a "matches" key that is present but null.
    matches = res.get("matches") or []
    if not matches:
        return []

    # Score drop-off heuristic: compare the first match with the fifth
    # (this relies on the index returning matches best-first).
    scores = [m.get("score", 0) for m in matches]
    if len(scores) >= 5:
        top_score = scores[0]
        gradient = top_score - scores[4]
        # A mediocre best score with almost no drop-off down to the fifth hit
        # means nothing stands out from its neighbours, so the whole set is
        # treated as noise instead of applying a hard global score cutoff.
        if top_score < 0.65 and gradient < 0.05:
            return []

    results = []
    for match in matches:
        # Tolerate matches whose metadata is missing or null.
        meta = match.get("metadata") or {}
        raw_score = match.get("score", 0)
        results.append({
            "url": meta.get("url", ""),
            "score": round(raw_score, 4),
            "raw_score": raw_score,
            "folder": meta.get("folder", "uncategorized"),
        })
    return results
def merge_face_results(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse per-face match groups into one ranked, URL-unique list.

    Every group contributes the entries under its ``"matches"`` key (a group
    without that key contributes nothing). For each URL only the entry with
    the strictly highest ``"score"`` survives; on a tie the earlier entry is
    kept. The survivors are returned sorted by score, highest first.
    """
    best_by_url = {}
    for group in groups:
        for candidate in group.get("matches", []):
            key = candidate["url"]
            holder = best_by_url.get(key)
            if holder is None or holder["score"] < candidate["score"]:
                best_by_url[key] = candidate
    ranked = list(best_by_url.values())
    ranked.sort(key=lambda entry: entry["score"], reverse=True)
    return ranked
def merge_object_results(nested_results: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Flatten several object-result lists into one ranked, URL-unique list.

    For each URL only the entry with the strictly highest ``"score"`` is kept
    (the earlier entry wins a tie). The result is sorted by score, highest
    first.
    """
    top_by_url = {}
    flattened = (match for batch in nested_results for match in batch)
    for match in flattened:
        url = match["url"]
        incumbent = top_by_url.get(url)
        if incumbent is not None and not incumbent["score"] < match["score"]:
            continue
        top_by_url[url] = match
    return sorted(top_by_url.values(), key=lambda match: match["score"], reverse=True)