# TEST-FRANKO / db / cijene_repository.py
# Franko Fišter
# Working script that fetches products by brand
# 028bcd8
from datetime import datetime
from typing import List, Dict, Optional
import re
import unidecode
import logging
from db.supabase_client import SupabaseClient
# Module-level logger used by every CijeneRepository method.
logger = logging.getLogger(__name__)


class CijeneRepository:
    """Repository wrapping the table queries used to store products and prices.

    All database access goes through the client returned by
    ``SupabaseClient().get_client()``. The tables touched here are
    ``brands``, ``store_chains``, ``products``, ``store_products`` and
    ``product_price_history``.

    Database failures are logged and re-raised as a plain ``Exception`` whose
    message is prefixed with the failing operation; the original error is
    attached as ``__cause__``.
    """

    # Compiled once at class-definition time instead of on every call.
    _NON_ALNUM_RE = re.compile(r'[^a-zA-Z0-9\s]')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self):
        """Create the repository and obtain the shared database client."""
        self.supabase = SupabaseClient().get_client()

    def get_all_brands(self) -> List[Dict]:
        """Return every row of ``brands`` as ``brand_id`` / ``brand_name`` dicts.

        Raises:
            Exception: If the query fails.
        """
        try:
            logger.info("Fetching all brands from database...")
            response = self.supabase.table("brands").select("brand_id, brand_name").execute()
            logger.info("Successfully fetched %d brands", len(response.data))
            return response.data
        except Exception as e:
            logger.error("Error fetching brands: %s", e)
            raise Exception(f"Error fetching brands: {e}") from e

    def get_all_store_chains(self) -> List[Dict]:
        """Return every row of ``store_chains`` as id / name dicts.

        Raises:
            Exception: If the query fails.
        """
        try:
            logger.info("Fetching all store chains from database...")
            response = (
                self.supabase.table("store_chains")
                .select("store_chain_id, store_chain_name")
                .execute()
            )
            logger.info("Successfully fetched %d store chains", len(response.data))
            return response.data
        except Exception as e:
            logger.error("Error fetching store chains: %s", e)
            raise Exception(f"Error fetching store chains: {e}") from e

    def normalize_string(self, text: str) -> str:
        """Return a comparison key for *text*.

        The text is passed through ``unidecode``, every character other than
        ASCII letters, digits and whitespace is dropped, whitespace runs are
        collapsed to a single space, and the result is stripped and
        lower-cased. Falsy input (``None`` or ``""``) yields ``""``.
        """
        if not text:
            return ""
        ascii_text = unidecode.unidecode(text)
        alnum_only = self._NON_ALNUM_RE.sub('', ascii_text)
        return self._WHITESPACE_RE.sub(' ', alnum_only).strip().lower()

    def safe_log_string(self, text: str) -> str:
        """Return a version of *text* that can be logged without encoding errors.

        Falsy input yields ``""``. Otherwise the text is round-tripped through
        UTF-8 with ``errors='replace'``; if that fails, the ``unidecode``
        transliteration is returned instead.
        """
        if not text:
            return ""
        try:
            return text.encode('utf-8', errors='replace').decode('utf-8')
        except Exception:
            # Still best-effort, but no longer a bare ``except`` that would
            # also swallow KeyboardInterrupt / SystemExit.
            return unidecode.unidecode(text)

    def find_matching_store_chain(self, chain_name: str, store_chains: List[Dict]) -> Optional[str]:
        """Return the ``store_chain_id`` whose normalized name equals *chain_name*.

        Args:
            chain_name: Name to look up; compared after ``normalize_string``.
            store_chains: Dicts with ``store_chain_id`` and
                ``store_chain_name`` keys.

        Returns:
            The id of the first matching chain, or ``None`` when nothing
            matches or when *chain_name* normalizes to an empty string.
        """
        normalized_chain = self.normalize_string(chain_name)
        safe_chain_name = self.safe_log_string(chain_name)
        logger.debug(
            "Looking for store chain match for: '%s' (normalized: '%s')",
            safe_chain_name,
            normalized_chain,
        )
        # An empty key would "match" any stored chain whose name is empty or
        # None, so only search when there is something to compare.
        if normalized_chain:
            for store_chain in store_chains:
                if normalized_chain == self.normalize_string(store_chain['store_chain_name']):
                    logger.debug(
                        "Found match: '%s' -> '%s' (ID: %s)",
                        safe_chain_name,
                        self.safe_log_string(store_chain['store_chain_name']),
                        store_chain['store_chain_id'],
                    )
                    return store_chain['store_chain_id']
        logger.warning("No matching store chain found for: '%s'", safe_chain_name)
        return None

    def check_product_exists(self, ean: str, product_name: str, brand_id: str) -> Optional[str]:
        """Return the ``product_id`` of an existing product, or ``None``.

        The lookup is by ``product_ean`` first (skipped when *ean* is empty),
        then by the ``(product_name, brand_id)`` pair.

        Raises:
            Exception: If a query fails.
        """
        try:
            # An empty EAN is not an identifier: querying with it would return
            # whichever stored product also lacks an EAN. Fall through to the
            # name/brand lookup instead.
            if ean:
                logger.debug("Checking if product exists with EAN: %s", ean)
                response = (
                    self.supabase.table("products")
                    .select("product_id")
                    .eq("product_ean", ean)
                    .execute()
                )
                if response.data:
                    product_id = response.data[0]['product_id']
                    logger.debug("Product exists with EAN %s, ID: %s", ean, product_id)
                    return product_id

            safe_name = self.safe_log_string(product_name)
            logger.debug(
                "Checking if product exists with name '%s' and brand_id: %s",
                safe_name,
                brand_id,
            )
            response = (
                self.supabase.table("products")
                .select("product_id")
                .eq("product_name", product_name)
                .eq("brand_id", brand_id)
                .execute()
            )
            if response.data:
                product_id = response.data[0]['product_id']
                logger.debug("Product exists with name and brand, ID: %s", product_id)
                return product_id

            logger.debug("Product with EAN %s or name '%s' does not exist", ean, safe_name)
            return None
        except Exception as e:
            logger.error("Error checking product existence for EAN %s: %s", ean, e)
            raise Exception(f"Error checking product existence: {e}") from e

    def insert_product(self, product_data: Dict) -> str:
        """Insert *product_data* into ``products`` and return the new ``product_id``.

        Args:
            product_data: Row to insert; ``product_name`` and ``product_ean``
                are read for logging.

        Raises:
            Exception: If the insert fails or returns no row.
        """
        try:
            safe_name = self.safe_log_string(product_data['product_name'])
            logger.info("Inserting new product: %s (EAN: %s)", safe_name, product_data['product_ean'])
            response = self.supabase.table("products").insert(product_data).execute()
            if not response.data:
                # Clearer than the IndexError that indexing [0] would produce.
                raise RuntimeError("insert returned no rows")
            product_id = response.data[0]['product_id']
            logger.info("Successfully inserted product with ID: %s", product_id)
            return product_id
        except Exception as e:
            safe_name = self.safe_log_string(product_data.get('product_name', 'unknown'))
            logger.error("Error inserting product %s: %s", safe_name, e)
            raise Exception(f"Error inserting product: {e}") from e

    def get_or_create_store_product(self, product_id: str, store_chain_id: str) -> str:
        """Return the ``store_product_id`` linking a product to a store chain.

        An existing ``store_products`` row is reused; otherwise a new one is
        inserted. The lookup and the insert are two separate requests.

        Raises:
            Exception: If a query fails or the insert returns no row.
        """
        try:
            logger.debug(
                "Checking if store_product exists for product %s and store_chain %s",
                product_id,
                store_chain_id,
            )
            response = (
                self.supabase.table("store_products")
                .select("store_product_id")
                .eq("product_id", product_id)
                .eq("store_chain_id", store_chain_id)
                .execute()
            )
            if response.data:
                store_product_id = response.data[0]['store_product_id']
                logger.debug("Store_product exists with ID: %s", store_product_id)
                return store_product_id

            logger.debug(
                "Creating new store_product mapping for product %s and store_chain %s",
                product_id,
                store_chain_id,
            )
            store_product_data = {
                "product_id": product_id,
                "store_chain_id": store_chain_id,
            }
            response = self.supabase.table("store_products").insert(store_product_data).execute()
            if not response.data:
                # Clearer than the IndexError that indexing [0] would produce.
                raise RuntimeError("insert returned no rows")
            store_product_id = response.data[0]['store_product_id']
            logger.debug("Successfully created store_product with ID: %s", store_product_id)
            return store_product_id
        except Exception as e:
            logger.error(
                "Error getting/creating store_product for product %s and store_chain %s: %s",
                product_id,
                store_chain_id,
                e,
            )
            raise Exception(f"Error getting/creating store_product: {e}") from e

    def insert_price_history(self, price_data: Dict) -> None:
        """Insert *price_data* as one row of ``product_price_history``.

        Args:
            price_data: Row to insert; ``store_product_id`` is read for logging.

        Raises:
            Exception: If the insert fails or ``store_product_id`` is missing.
        """
        try:
            logger.debug("Inserting price history for store_product %s", price_data['store_product_id'])
            self.supabase.table("product_price_history").insert(price_data).execute()
            logger.debug("Price history inserted successfully")
        except Exception as e:
            logger.error(
                "Error inserting price history for store_product %s: %s",
                price_data.get('store_product_id', 'unknown'),
                e,
            )
            raise Exception(f"Error inserting price history: {e}") from e

    def combine_quantity_unit(self, quantity: str, unit: str) -> str:
        """Join *quantity* and *unit* into one string with no separator.

        Commas in the quantity are replaced by dots and it is stripped; the
        unit is lower-cased and stripped. A missing or blank part contributes
        nothing, so the result may be just the quantity, just the unit, or
        ``""``.
        """
        if not quantity and not unit:
            return ""
        clean_quantity = quantity.replace(',', '.').strip() if quantity else ""
        clean_unit = unit.lower().strip() if unit else ""
        # Plain concatenation covers every case: an empty part adds nothing.
        result = f"{clean_quantity}{clean_unit}"
        logger.debug("Combined quantity '%s' and unit '%s' -> '%s'", quantity, unit, result)
        return result