import os
import subprocess
from pathlib import Path
from typing import Optional

import numpy as np
from dotenv import load_dotenv
from supabase import create_client, Client
from langchain_community.vectorstores import Chroma

## Get the base directory (src folder)
BASE_DIR = Path(__file__).parent

## Loading Environment Variables
# Load from config.env if it exists (local dev), otherwise rely on system env vars (HF Spaces)
env_path = os.environ.get("ENV_FILE", BASE_DIR / "config.env")
if os.path.exists(env_path):
    load_dotenv(env_path)

SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

if not SUPABASE_URL or not SUPABASE_KEY:
    print("⚠️ Warning: Supabase env vars not set. Check HF Spaces secrets.")
    supabase = None
else:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Service-role client — bypasses RLS, used by recommenders to read user data
if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
    supabase_service = None
else:
    supabase_service: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)

## Loading the Vector Database (lazy — created on first use)
CHROMA_DB_PATH = str(BASE_DIR / "chroma_db")
_vector_db = None


def get_vector_db():
    """Single ChromaDB collection — title + description + tags."""
    global _vector_db
    if _vector_db is None:
        # Imported lazily to avoid loading the embedding model at module import time.
        from models import get_embedder
        _vector_db = Chroma(
            collection_name='products',
            embedding_function=get_embedder(),
            persist_directory=CHROMA_DB_PATH
        )
    return _vector_db


def _product_text_and_meta(product: dict) -> tuple:
    """Build the embedding document and metadata for one product row.

    Shared by update_vectordb() and add_product_to_vectordb() so both
    index products with an identical "title description tags" format.
    """
    tags = product.get('tags') or []
    tags_str = ' '.join(tags)
    title = product.get('title') or ''
    description = product.get('description') or ''
    content = f"{title} {description} {tags_str}"
    meta = {"id": product['id'], "title": title, "tags": tags_str}
    return content, meta


def _indexed_ids() -> set:
    """Return the set of product ids already present in the ChromaDB collection."""
    return {m["id"] for m in get_vector_db().get(include=["metadatas"])["metadatas"]}


def update_vectordb():
    """Bulk-sync ChromaDB with Supabase: embed every product not yet indexed."""
    if supabase is None:
        print("⚠️ Skipping vector DB update — Supabase not configured")
        return
    print("Fetching products from Supabase...")
    products = supabase.table("products").select("id, title, description, tags").execute().data
    existing_ids = _indexed_ids()

    contents = []
    metadatas = []
    for product in products:
        pid = product['id']
        if pid in existing_ids:
            continue
        # Track as we go so duplicate rows in the fetch aren't indexed twice.
        existing_ids.add(pid)
        content, meta = _product_text_and_meta(product)
        contents.append(content)
        metadatas.append(meta)

    if contents:
        get_vector_db().add_texts(texts=contents, metadatas=metadatas)
        get_vector_db().persist()
        print(f"✅ Added {len(contents)} new products to ChromaDB")
    else:
        print("✅ No new products to add, ChromaDB is up to date")


def add_product_to_vectordb(product_id: str) -> dict:
    """
    Add a single product's embedding to ChromaDB.
    Called via API when a new product is created — no need to restart the server.

    Returns a status dict: {"status": ...} on success, {"error": ...} on failure.
    """
    if supabase is None:
        return {"error": "Supabase not configured"}

    # Check if already indexed
    if product_id in _indexed_ids():
        return {"status": "already_indexed", "product_id": product_id}

    # Fetch product from Supabase
    response = supabase.table("products").select("id, title, description, tags").eq("id", product_id).execute()
    if not response.data:
        return {"error": f"Product {product_id} not found in Supabase"}

    content, meta = _product_text_and_meta(response.data[0])
    get_vector_db().add_texts(texts=[content], metadatas=[meta])
    get_vector_db().persist()
    return {"status": "added", "product_id": product_id, "title": meta["title"]}


def get_product_images(product_ids: list) -> dict:
    """
    Fetch product images from the product_images table in Supabase.
    Returns a dict mapping product_id -> image_url (first image wins when
    a product has several).
    """
    if not product_ids or supabase is None:
        return {}
    try:
        # Query product_images table for the given product IDs
        response = supabase.table("product_images").select("product_id, url").in_("product_id", list(product_ids)).execute()
        images_map = {}
        for row in response.data:
            pid = row.get("product_id")
            url = row.get("url")
            if pid and url and pid not in images_map:
                images_map[pid] = url
        return images_map
    except Exception as e:
        print(f"Error fetching product images: {e}")
        return {}


def get_product_prices(product_ids: list) -> dict:
    """
    Fetch product prices from the products table in Supabase.
    Returns a dict mapping product_id -> price.
    """
    if not product_ids or supabase is None:
        return {}
    try:
        response = supabase.table("products").select("id, price").in_("id", list(product_ids)).execute()
        prices_map = {}
        for row in response.data:
            pid = row.get("id")
            if pid:
                prices_map[pid] = row.get("price")
        return prices_map
    except Exception as e:
        print(f"Error fetching product prices: {e}")
        return {}


def get_product_details(product_id: str) -> Optional[dict]:
    """
    Fetch complete product details from Supabase by product ID.
    Returns product info including title, description, price, old_price,
    sku, stock, store name, etc., or None if not found / on error.
    """
    if supabase is None:
        return None
    try:
        response = supabase.table("products").select("*").eq("id", product_id).execute()
        if not response.data:
            return None
        product = response.data[0]

        # Get product images
        images_response = supabase.table("product_images").select("url").eq("product_id", product_id).execute()
        images = [img.get("url") for img in images_response.data if img.get("url")]

        # Get store name
        store_name = None
        store_id = product.get("store_id")
        if store_id:
            store_response = supabase.table("stores").select("name").eq("id", store_id).execute()
            if store_response.data:
                store_name = store_response.data[0].get("name")

        return {
            "id": product.get("id"),
            "title": product.get("title"),
            "description": product.get("description"),
            "price": product.get("price"),
            "old_price": product.get("old_price"),
            "sku": product.get("sku"),
            "stock": product.get("stock", 0),
            "sold_by": store_name,
            "images": images,
        }
    except Exception as e:
        print(f"Error fetching product details: {e}")
        return None


def get_random_products(limit: int = 10) -> list:
    """
    Fetch products from Supabase to display before searching.
    Returns a list of products with id, title, price, and image_url.
    """
    if supabase is None:
        return []
    try:
        response = supabase.table("products").select("id, title, price").limit(limit).execute()
        if not response.data:
            return []
        products = response.data
        images_map = get_product_images([p.get("id") for p in products])
        return [
            {
                "id": p.get("id"),
                "title": p.get("title"),
                "price": p.get("price"),
                "image_url": images_map.get(p.get("id"))
            }
            for p in products
        ]
    except Exception as e:
        print(f"Error fetching random products: {e}")
        return []


def load_categories(file_name=None):
    """Load category names (one per line, blanks skipped) from categories.txt.

    Falls back to a small default list when the file is missing.
    """
    if file_name is None:
        file_name = str(BASE_DIR / "smart_search" / "categories.txt")
    try:
        with open(file_name, 'r', encoding="utf-8") as file:
            return [line.strip() for line in file if line.strip()]
    except FileNotFoundError:
        print("Categories.txt file is not found")
        return ["Product", "Electronics", "Fashion", "Home"]


def sync_categories_from_db():
    """
    Sync categories.txt with the database: fetches all unique product tags
    from Supabase and appends any tags not already present in categories.txt.
    Run at startup so new top-level categories from the DB are always
    available for image classification without manual edits.
    """
    if supabase is None:
        print("⚠️ Skipping category sync — Supabase not configured")
        return

    categories_path = BASE_DIR / "smart_search" / "categories.txt"

    # Load existing categories (case-insensitive set for comparison)
    existing = load_categories(str(categories_path))
    existing_lower = {c.lower() for c in existing}

    print("Syncing categories from DB...")
    response = supabase.table("products").select("tags").execute()
    all_tags: set = set()
    for row in response.data:
        for tag in (row.get("tags") or []):
            if tag and tag.strip():
                all_tags.add(tag.strip())

    new_categories = sorted(t for t in all_tags if t.lower() not in existing_lower)
    if new_categories:
        # Only insert a separating newline when the file doesn't already end
        # with one — avoids accumulating blank lines on repeated syncs.
        needs_newline = False
        try:
            with open(categories_path, "rb") as f:
                f.seek(-1, os.SEEK_END)
                needs_newline = f.read(1) != b"\n"
        except OSError:
            pass  # missing or empty file: nothing to separate from
        with open(categories_path, "a", encoding="utf-8") as f:
            if needs_newline:
                f.write("\n")
            f.writelines(f"{cat}\n" for cat in new_categories)
        print(f"✅ Added {len(new_categories)} new categories to categories.txt")
    else:
        print("✅ categories.txt is already up to date")


def load_audio_bytes_ffmpeg(audio_bytes: bytes) -> np.ndarray:
    """Decode arbitrary audio bytes to 16 kHz mono float32 PCM via ffmpeg.

    Raises RuntimeError (with ffmpeg's stderr) if decoding fails — previously
    a failure silently produced an empty array.
    """
    result = subprocess.run(
        ["ffmpeg", "-i", "pipe:0", "-f", "f32le", "-ac", "1", "-ar", "16000", "pipe:1"],
        input=audio_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"ffmpeg decode failed: {result.stderr.decode(errors='replace')[-500:]}"
        )
    return np.frombuffer(result.stdout, dtype=np.float32)


def save_audio_bytes_as_wav(audio_bytes: bytes) -> str:
    """Convert audio bytes to a 16 kHz mono WAV temp file via ffmpeg. Returns the file path.

    The caller is responsible for deleting the returned temp file.
    Raises RuntimeError if ffmpeg fails (the temp file is cleaned up first,
    so a broken/empty WAV is never handed back).
    """
    import tempfile
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        tmp_path = f.name
    result = subprocess.run(
        ["ffmpeg", "-y", "-i", "pipe:0", "-f", "wav", "-ac", "1", "-ar", "16000", tmp_path],
        input=audio_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise RuntimeError(
            f"ffmpeg conversion failed: {result.stderr.decode(errors='replace')[-500:]}"
        )
    return tmp_path