# raij-ai / utils.py
# Synced from GitHub by github-actions[bot] on 2026-03-11 15:51:10 UTC (commit 0e027ea)
import os
import subprocess
import numpy as np
from pathlib import Path
from dotenv import load_dotenv
from supabase import create_client, Client
from langchain_community.vectorstores import Chroma
## Get the base directory (src folder)
BASE_DIR = Path(__file__).parent

## Loading Environment Variables
# Load from config.env if it exists (local dev), otherwise rely on system env vars (HF Spaces)
env_path = os.environ.get("ENV_FILE", BASE_DIR / "config.env")
if os.path.exists(env_path):
    load_dotenv(env_path)

# Supabase settings; the NEXT_PUBLIC_* names suggest they are shared with a frontend — confirm
SUPABASE_URL = os.getenv("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

# Anon-key client. Left as None when config is missing so the module still imports;
# every consumer below checks `supabase is None` before use.
if not SUPABASE_URL or not SUPABASE_KEY:
    print("⚠️ Warning: Supabase env vars not set. Check HF Spaces secrets.")
    supabase = None
else:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Service-role client β€” bypasses RLS, used by recommenders to read user data
if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
    supabase_service = None
else:
    supabase_service: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)

## Loading the Vector Database (lazy β€” created on first use)
CHROMA_DB_PATH = str(BASE_DIR / "chroma_db")
_vector_db = None  # module-level cache; populated on first get_vector_db() call
def get_vector_db():
    """Return the shared ChromaDB 'products' collection (title + description + tags), creating it lazily."""
    global _vector_db
    if _vector_db is not None:
        return _vector_db
    # Imported lazily so the embedding model is not loaded at module import time
    from models import get_embedder
    _vector_db = Chroma(
        collection_name='products',
        embedding_function=get_embedder(),
        persist_directory=CHROMA_DB_PATH,
    )
    return _vector_db
def update_vectordb():
    """Bulk-index every Supabase product that is not yet present in ChromaDB."""
    if supabase is None:
        print("⚠️ Skipping vector DB update β€” Supabase not configured")
        return
    print("Fetching products from Supabase...")
    products = supabase.table("products").select("id, title, description, tags").execute().data
    db = get_vector_db()
    # IDs already indexed, read from the collection's stored metadatas
    indexed = {meta["id"] for meta in db.get(include=["metadatas"])["metadatas"]}
    texts, metas = [], []
    for row in products:
        if row['id'] in indexed:
            continue
        title = row.get('title') or ''
        description = row.get('description') or ''
        tags_str = ' '.join(row.get('tags') or [])
        texts.append(f"{title} {description} {tags_str}")
        metas.append({"id": row['id'], "title": title, "tags": tags_str})
    if not texts:
        print("βœ… No new products to add, ChromaDB is up to date")
        return
    db.add_texts(texts=texts, metadatas=metas)
    db.persist()
    print(f"βœ… Added {len(texts)} new products to ChromaDB")
def add_product_to_vectordb(product_id: str):
    """
    Index a single product's embedding in ChromaDB.
    Called via API when a new product is created β€” no server restart needed.
    Returns a status dict describing what happened.
    """
    if supabase is None:
        return {"error": "Supabase not configured"}
    db = get_vector_db()
    # Skip if this product is already in the collection
    indexed = {meta["id"] for meta in db.get(include=["metadatas"])["metadatas"]}
    if product_id in indexed:
        return {"status": "already_indexed", "product_id": product_id}
    # Pull the product row from Supabase
    response = supabase.table("products").select("id, title, description, tags").eq("id", product_id).execute()
    if not response.data:
        return {"error": f"Product {product_id} not found in Supabase"}
    row = response.data[0]
    title = row.get('title') or ''
    description = row.get('description') or ''
    tags_str = ' '.join(row.get('tags') or [])
    db.add_texts(
        texts=[f"{title} {description} {tags_str}"],
        metadatas=[{"id": product_id, "title": title, "tags": tags_str}],
    )
    db.persist()
    return {"status": "added", "product_id": product_id, "title": title}
def get_product_images(product_ids: list) -> dict:
    """
    Fetch product images from the product_images table in Supabase.

    Returns a dict mapping product_id -> image_url (the first image wins when a
    product has several). Returns {} on empty input, when Supabase is not
    configured, or on any query error.
    """
    if not product_ids:
        return {}
    # Explicit guard: previously a None client fell through to the broad
    # except below as an AttributeError and logged a misleading error.
    if supabase is None:
        return {}
    try:
        # Query product_images table for the given product IDs
        response = supabase.table("product_images").select("product_id, url").in_("product_id", list(product_ids)).execute()
        # Build a mapping of product_id -> url (use first image if multiple)
        images_map = {}
        for row in response.data:
            pid = row.get("product_id")
            url = row.get("url")
            if pid and url and pid not in images_map:
                images_map[pid] = url
        return images_map
    except Exception as e:
        print(f"Error fetching product images: {e}")
        return {}
def get_product_prices(product_ids: list) -> dict:
    """
    Fetch product prices from the products table in Supabase.

    Returns a dict mapping product_id -> price. Returns {} on empty input,
    when Supabase is not configured, or on any query error.
    """
    if not product_ids:
        return {}
    # Explicit guard: previously a None client fell through to the broad
    # except below as an AttributeError and logged a misleading error.
    if supabase is None:
        return {}
    try:
        response = supabase.table("products").select("id, price").in_("id", list(product_ids)).execute()
        prices_map = {}
        for row in response.data:
            pid = row.get("id")
            price = row.get("price")
            if pid:
                # price may legitimately be None; keep it so callers can decide
                prices_map[pid] = price
        return prices_map
    except Exception as e:
        print(f"Error fetching product prices: {e}")
        return {}
def get_product_details(product_id: str) -> dict:
    """
    Fetch complete product details from Supabase by product ID.

    Runs up to three queries: the product row, its images, and (when the
    product has a store_id) the store name.

    Returns:
        dict with id, title, description, price, old_price, sku, stock,
        sold_by (store name or None), and images (list of URLs) β€” or
        None when the product is not found or any query fails.
        NOTE(review): annotation says `dict` but None is returned on the
        not-found/error paths; callers must handle None.
    """
    try:
        response = supabase.table("products").select("*").eq("id", product_id).execute()
        if not response.data:
            return None
        product = response.data[0]
        # Get product images (all of them, in table order)
        images_response = supabase.table("product_images").select("url").eq("product_id", product_id).execute()
        images = [img.get("url") for img in images_response.data if img.get("url")]
        # Get store name (extra query; only when the product belongs to a store)
        store_name = None
        store_id = product.get("store_id")
        if store_id:
            store_response = supabase.table("stores").select("name").eq("id", store_id).execute()
            if store_response.data:
                store_name = store_response.data[0].get("name")
        return {
            "id": product.get("id"),
            "title": product.get("title"),
            "description": product.get("description"),
            "price": product.get("price"),
            "old_price": product.get("old_price"),
            "sku": product.get("sku"),
            "stock": product.get("stock", 0),
            "sold_by": store_name,
            "images": images,
        }
    except Exception as e:
        # Broad catch: also absorbs the AttributeError raised when supabase is None
        print(f"Error fetching product details: {e}")
        return None
def get_random_products(limit: int = 10) -> list:
    """
    Fetch up to `limit` products from Supabase to display before searching.
    Returns a list of dicts with id, title, price, and image_url; [] on error.
    """
    try:
        rows = supabase.table("products").select("id, title, price").limit(limit).execute().data
        if not rows:
            return []
        # One batched lookup for all thumbnails instead of per-product queries
        images_map = get_product_images([row.get("id") for row in rows])
        results = []
        for row in rows:
            pid = row.get("id")
            results.append({
                "id": pid,
                "title": row.get("title"),
                "price": row.get("price"),
                "image_url": images_map.get(pid),
            })
        return results
    except Exception as e:
        print(f"Error fetching random products: {e}")
        return []
def load_categories(file_name=None):
    """
    Read category names (one per line, blanks skipped) from a text file.
    Defaults to smart_search/categories.txt; falls back to a small built-in
    list when the file does not exist.
    """
    path = file_name if file_name is not None else str(BASE_DIR / "smart_search" / "categories.txt")
    try:
        with open(path, 'r') as fh:
            return [line.strip() for line in fh if line.strip()]
    except FileNotFoundError:
        print("Categories.txt file is not found")
        return ["Product", "Electronics", "Fashion", "Home"]
def sync_categories_from_db():
    """
    Sync categories.txt with the database: fetch all unique product tags from
    Supabase and append any that categories.txt does not already contain
    (case-insensitive comparison). Run at startup so new top-level categories
    from the DB are available for image classification without manual edits.
    """
    if supabase is None:
        print("⚠️ Skipping category sync β€” Supabase not configured")
        return
    categories_path = BASE_DIR / "smart_search" / "categories.txt"
    # Lower-cased view of what's already on disk, for case-insensitive dedupe
    known = {c.lower() for c in load_categories(str(categories_path))}
    print("Syncing categories from DB...")
    rows = supabase.table("products").select("tags").execute().data
    db_tags = {
        tag.strip()
        for row in rows
        for tag in (row.get("tags") or [])
        if tag and tag.strip()
    }
    new_categories = sorted(t for t in db_tags if t.lower() not in known)
    if not new_categories:
        print("βœ… categories.txt is already up to date")
        return
    with open(categories_path, "a") as fh:
        fh.write("\n")
        fh.writelines(f"{cat}\n" for cat in new_categories)
    print(f"βœ… Added {len(new_categories)} new categories to categories.txt")
def load_audio_bytes_ffmpeg(audio_bytes):
    """
    Decode arbitrary audio bytes into a mono 16 kHz float32 waveform.

    Pipes the bytes through ffmpeg (stdin -> stdout as raw f32le samples) and
    returns them as a 1-D numpy array. Deliberately best-effort: if ffmpeg
    cannot decode the input, the result is an empty array (stderr is captured
    and discarded, matching the original behavior).
    """
    # subprocess.run replaces the manual Popen/communicate pair; list argv
    # (shell=False) keeps untrusted bytes out of any shell.
    result = subprocess.run(
        [
            "ffmpeg", "-i", "pipe:0",
            "-f", "f32le",   # raw 32-bit little-endian float samples
            "-ac", "1",      # mono
            "-ar", "16000",  # 16 kHz sample rate
            "pipe:1",
        ],
        input=audio_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return np.frombuffer(result.stdout, dtype=np.float32)
def save_audio_bytes_as_wav(audio_bytes: bytes) -> str:
    """Convert audio bytes to a 16 kHz mono WAV temp file via ffmpeg. Returns the file path."""
    import tempfile
    # Reserve a temp filename; ffmpeg will overwrite it (-y) with the WAV output
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
        wav_path = tmp.name
    proc = subprocess.Popen(
        [
            "ffmpeg", "-y", "-i", "pipe:0",
            "-f", "wav",
            "-ac", "1",      # mono
            "-ar", "16000",  # 16 kHz sample rate
            wav_path,
        ],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    proc.communicate(input=audio_bytes)
    return wav_path