import os
import subprocess
from pathlib import Path

import numpy as np
from dotenv import load_dotenv
from langchain_community.vectorstores import Chroma
from supabase import create_client, Client
| |
|
| | |
# Directory containing this file; anchors all bundled resources
# (env file, ChromaDB persistence dir, smart_search/categories.txt).
BASE_DIR = Path(__file__).parent

# Optional .env-style config: ENV_FILE env var overrides the default
# config.env next to this file. A missing file is tolerated because on
# HF Spaces the secrets arrive as real environment variables instead.
env_path = os.environ.get("ENV_FILE", BASE_DIR / "config.env")
if os.path.exists(env_path):
    load_dotenv(env_path)

# Supabase credentials. The NEXT_PUBLIC_* names mirror the frontend's
# variable names so one env file can serve both apps.
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

# Anon-key client for regular reads. Left as None when unconfigured so
# the module still imports and callers can degrade gracefully.
if not SUPABASE_URL or not SUPABASE_KEY:
    print("⚠️ Warning: Supabase env vars not set. Check HF Spaces secrets.")
    supabase = None
else:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Service-role client for privileged operations; also optional.
if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
    supabase_service = None
else:
    supabase_service: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)

# On-disk location of the persisted ChromaDB collection.
CHROMA_DB_PATH = str(BASE_DIR / "chroma_db")
# Lazily-created singleton handle; see get_vector_db().
_vector_db = None
| |
|
def get_vector_db():
    """Return the shared ChromaDB collection (title + description + tags).

    The collection is created on first use and cached in the module-level
    ``_vector_db`` singleton so every caller shares one handle.
    """
    global _vector_db
    if _vector_db is not None:
        return _vector_db
    # Imported lazily so the embedding model is not loaded at import time.
    from models import get_embedder
    _vector_db = Chroma(
        collection_name='products',
        embedding_function=get_embedder(),
        persist_directory=CHROMA_DB_PATH,
    )
    return _vector_db
| |
|
| |
|
def update_vectordb():
    """Index any Supabase products that are missing from ChromaDB.

    Fetches all products, skips ids already present in the collection, and
    embeds a "title description tags" document for each new one.
    No-op (with a warning) when Supabase is not configured.
    """
    if supabase is None:
        print("⚠️ Skipping vector DB update — Supabase not configured")
        return

    print("Fetching products from Supabase...")
    products = supabase.table("products").select("id, title, description, tags").execute().data

    # Hoisted: one handle for get/add/persist instead of three lookups.
    db = get_vector_db()
    # Guard against null metadata entries Chroma may return.
    existing_ids = {m["id"] for m in db.get(include=["metadatas"])["metadatas"] if m}

    contents = []
    metadatas = []
    for product in products:
        pid = product['id']
        if pid in existing_ids:
            continue
        tags_str = ' '.join(product.get('tags') or [])
        title = product.get('title') or ''
        description = product.get('description') or ''
        contents.append(f"{title} {description} {tags_str}")
        metadatas.append({"id": pid, "title": title, "tags": tags_str})

    if contents:
        db.add_texts(texts=contents, metadatas=metadatas)
        db.persist()
        print(f"✅ Added {len(contents)} new products to ChromaDB")
    else:
        print("✅ No new products to add, ChromaDB is up to date")
| |
|
| |
|
def add_product_to_vectordb(product_id: str):
    """
    Add a single product's embedding to ChromaDB.
    Called via API when a new product is created — no need to restart the server.

    Returns a status dict: {"status": "added"|"already_indexed", ...} on
    success, or {"error": ...} when Supabase is missing or the id is unknown.
    """
    if supabase is None:
        return {"error": "Supabase not configured"}

    # Hoisted: reuse one handle for get/add/persist.
    db = get_vector_db()

    # Skip work if this id is already indexed (ignore null metadata entries).
    existing_ids = {m["id"] for m in db.get(include=["metadatas"])["metadatas"] if m}
    if product_id in existing_ids:
        return {"status": "already_indexed", "product_id": product_id}

    response = supabase.table("products").select("id, title, description, tags").eq("id", product_id).execute()
    if not response.data:
        return {"error": f"Product {product_id} not found in Supabase"}

    product = response.data[0]
    tags_str = ' '.join(product.get('tags') or [])
    title = product.get('title') or ''
    description = product.get('description') or ''

    # Same "title description tags" document format used by update_vectordb().
    db.add_texts(
        texts=[f"{title} {description} {tags_str}"],
        metadatas=[{"id": product_id, "title": title, "tags": tags_str}],
    )
    db.persist()

    return {"status": "added", "product_id": product_id, "title": title}
| |
|
| |
|
def get_product_images(product_ids: list) -> dict:
    """
    Fetch product images from the product_images table in Supabase.
    Returns a dict mapping product_id -> image_url (the first image found
    per product). Returns {} on empty input, missing client, or failure.
    """
    # Guard explicitly (consistent with update_vectordb) instead of
    # relying on the except block to swallow a None client.
    if not product_ids or supabase is None:
        return {}

    try:
        response = supabase.table("product_images").select("product_id, url").in_("product_id", list(product_ids)).execute()

        images_map = {}
        for row in response.data:
            pid = row.get("product_id")
            url = row.get("url")
            # Keep only the first URL seen for each product.
            if pid and url and pid not in images_map:
                images_map[pid] = url
        return images_map

    except Exception as e:
        print(f"Error fetching product images: {e}")
        return {}
| |
|
| |
|
def get_product_prices(product_ids: list) -> dict:
    """
    Fetch product prices from the products table in Supabase.
    Returns a dict mapping product_id -> price. Returns {} on empty
    input, missing client, or query failure.
    """
    # Guard explicitly (consistent with update_vectordb) instead of
    # relying on the except block to swallow a None client.
    if not product_ids or supabase is None:
        return {}

    try:
        response = supabase.table("products").select("id, price").in_("id", list(product_ids)).execute()
        # Rows without an id are skipped; price may legitimately be None.
        return {row.get("id"): row.get("price") for row in response.data if row.get("id")}

    except Exception as e:
        print(f"Error fetching product prices: {e}")
        return {}
| |
|
| |
|
def get_product_details(product_id: str) -> dict:
    """
    Fetch complete product details from Supabase by product ID.

    Returns a dict with id, title, description, price, old_price, sku,
    stock, the owning store's name ("sold_by"), and a list of image URLs.
    Returns None when the product does not exist or the lookup fails.
    """
    try:
        rows = supabase.table("products").select("*").eq("id", product_id).execute().data
        if not rows:
            return None
        product = rows[0]

        # Collect every non-empty image URL attached to this product.
        image_rows = supabase.table("product_images").select("url").eq("product_id", product_id).execute().data
        images = [row.get("url") for row in image_rows if row.get("url")]

        # Resolve the owning store's display name, when a store is linked.
        store_name = None
        store_id = product.get("store_id")
        if store_id:
            store_rows = supabase.table("stores").select("name").eq("id", store_id).execute().data
            if store_rows:
                store_name = store_rows[0].get("name")

        details = {key: product.get(key) for key in
                   ("id", "title", "description", "price", "old_price", "sku")}
        details["stock"] = product.get("stock", 0)
        details["sold_by"] = store_name
        details["images"] = images
        return details

    except Exception as e:
        print(f"Error fetching product details: {e}")
        return None
| |
|
| |
|
def get_random_products(limit: int = 10) -> list:
    """
    Fetch products from Supabase to display before searching.
    Returns a list of products with id, title, price, and image_url;
    [] when nothing is found or the query fails.
    """
    try:
        rows = supabase.table("products").select("id, title, price").limit(limit).execute().data
        if not rows:
            return []

        # Attach the first known image for each product via one batch lookup.
        images_map = get_product_images([p.get("id") for p in rows])

        results = []
        for p in rows:
            pid = p.get("id")
            results.append({
                "id": pid,
                "title": p.get("title"),
                "price": p.get("price"),
                "image_url": images_map.get(pid),
            })
        return results

    except Exception as e:
        print(f"Error fetching random products: {e}")
        return []
| |
|
| |
|
def load_categories(file_name=None):
    """Load category names, one per line, from a text file.

    Args:
        file_name: Path to the categories file. Defaults to the bundled
            smart_search/categories.txt next to this module.

    Returns:
        List of non-empty, stripped lines; a small built-in fallback list
        when the file does not exist.
    """
    if file_name is None:
        # Resolved lazily so explicit paths never touch BASE_DIR.
        file_name = str(BASE_DIR / "smart_search" / "categories.txt")
    try:
        # Explicit encoding; iterate the file directly (readlines() is redundant).
        with open(file_name, 'r', encoding='utf-8') as file:
            return [line.strip() for line in file if line.strip()]
    except FileNotFoundError:
        print("Categories.txt file is not found")
        return ["Product", "Electronics", "Fashion", "Home"]
| |
|
| |
|
def sync_categories_from_db():
    """
    Sync categories.txt with the database: fetches all unique product tags
    from Supabase and appends any tags not already present in categories.txt.
    Run at startup so new top-level categories from the DB are always available
    for image classification without manual edits.
    """
    if supabase is None:
        print("⚠️ Skipping category sync — Supabase not configured")
        return

    categories_path = BASE_DIR / "smart_search" / "categories.txt"

    # Compare case-insensitively so "Shoes" and "shoes" don't duplicate.
    existing_lower = {c.lower() for c in load_categories(str(categories_path))}

    print("Syncing categories from DB...")
    response = supabase.table("products").select("tags").execute()

    all_tags: set = set()
    for row in response.data:
        for tag in (row.get("tags") or []):
            if tag and tag.strip():
                all_tags.add(tag.strip())

    new_categories = sorted(t for t in all_tags if t.lower() not in existing_lower)

    if new_categories:
        # Explicit encoding (matches load_categories); leading newline
        # separates the appended block from any unterminated last line.
        with open(categories_path, "a", encoding="utf-8") as f:
            f.write("\n")
            f.writelines(f"{cat}\n" for cat in new_categories)
        print(f"✅ Added {len(new_categories)} new categories to categories.txt")
    else:
        print("✅ categories.txt is already up to date")
| |
|
| |
|
def load_audio_bytes_ffmpeg(audio_bytes):
    """Decode arbitrary audio bytes to a mono 16 kHz float32 numpy array.

    Pipes the bytes through ffmpeg (stdin -> stdout), so any container or
    codec ffmpeg understands is accepted.

    Raises:
        RuntimeError: if ffmpeg exits non-zero. (Previously the failure was
            silent: stderr was discarded and an empty/garbage array returned.)
    """
    result = subprocess.run(
        [
            "ffmpeg", "-i", "pipe:0",
            "-f", "f32le",   # raw 32-bit float little-endian PCM
            "-ac", "1",      # mono
            "-ar", "16000",  # 16 kHz
            "pipe:1",
        ],
        input=audio_bytes,
        capture_output=True,
    )
    if result.returncode != 0:
        # Surface the ffmpeg error instead of silently returning bad data.
        tail = result.stderr.decode(errors="ignore")[-500:]
        raise RuntimeError(f"ffmpeg decode failed: {tail}")
    return np.frombuffer(result.stdout, dtype=np.float32)
| |
|
| |
|
def save_audio_bytes_as_wav(audio_bytes: bytes) -> str:
    """Convert audio bytes to a 16 kHz mono WAV temp file via ffmpeg.

    Returns:
        Path to the temporary .wav file; the caller is responsible for
        deleting it when done.

    Raises:
        RuntimeError: if ffmpeg exits non-zero. The temp file is removed on
            failure so it does not leak. (Previously the return code was
            ignored and a broken path could be returned.)
    """
    import tempfile
    # Create the target file first so ffmpeg writes to a path we own.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        tmp_path = f.name
    result = subprocess.run(
        [
            "ffmpeg", "-y", "-i", "pipe:0",
            "-f", "wav",
            "-ac", "1",      # mono
            "-ar", "16000",  # 16 kHz
            tmp_path,
        ],
        input=audio_bytes,
        capture_output=True,
    )
    if result.returncode != 0:
        # Don't leak the temp file when conversion fails.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        tail = result.stderr.decode(errors="ignore")[-500:]
        raise RuntimeError(f"ffmpeg conversion failed: {tail}")
    return tmp_path
| |
|