Spaces:
Runtime error
Runtime error
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| import re | |
| import unidecode | |
| import logging | |
| from db.supabase_client import SupabaseClient | |
| # Configure logger | |
| logger = logging.getLogger(__name__) | |
| class CijeneRepository: | |
def __init__(self):
    """Set up the repository with a Supabase client handle."""
    client_provider = SupabaseClient()
    # Stored on the instance; the repository's query methods read it
    # as ``self.supabase``.
    self.supabase = client_provider.get_client()
def get_all_brands(self) -> List[Dict]:
    """Fetch all rows of the ``brands`` table.

    Returns:
        The ``data`` payload of the query response. Only the ``brand_id``
        and ``brand_name`` columns are selected.

    Raises:
        Exception: If the query or the handling of its response fails.
            The underlying error is attached as ``__cause__``.
    """
    try:
        logger.info("Fetching all brands from database...")
        response = (
            self.supabase.table("brands")
            .select("brand_id, brand_name")
            .execute()
        )
        logger.info(f"Successfully fetched {len(response.data)} brands")
        return response.data
    except Exception as e:
        logger.error(f"Error fetching brands: {str(e)}")
        # Chain explicitly so the traceback reports the original failure as
        # the direct cause instead of an error "during handling".
        raise Exception(f"Error fetching brands: {str(e)}") from e
def get_all_store_chains(self) -> List[Dict]:
    """Fetch all rows of the ``store_chains`` table.

    Returns:
        The ``data`` payload of the query response. Only the
        ``store_chain_id`` and ``store_chain_name`` columns are selected.

    Raises:
        Exception: If the query or the handling of its response fails.
            The underlying error is attached as ``__cause__``.
    """
    try:
        logger.info("Fetching all store chains from database...")
        response = (
            self.supabase.table("store_chains")
            .select("store_chain_id, store_chain_name")
            .execute()
        )
        logger.info(f"Successfully fetched {len(response.data)} store chains")
        return response.data
    except Exception as e:
        logger.error(f"Error fetching store chains: {str(e)}")
        # Chain explicitly so the traceback reports the original failure as
        # the direct cause instead of an error "during handling".
        raise Exception(f"Error fetching store chains: {str(e)}") from e
def normalize_string(self, text: str) -> str:
    """Reduce ``text`` to a canonical form used for name comparison.

    Steps, in order: transliterate with ``unidecode``, drop every character
    that is not an ASCII letter, digit or whitespace, collapse whitespace
    runs to a single space, trim the ends, and lowercase.

    Falsy input (``None`` or ``""``) yields an empty string.
    """
    if text:
        transliterated = unidecode.unidecode(text)
        # Punctuation and symbols are removed outright, not replaced by a space.
        alnum_only = re.sub(r'[^a-zA-Z0-9\s]', '', transliterated)
        single_spaced = re.sub(r'\s+', ' ', alnum_only)
        return single_spaced.strip().lower()
    return ""
def safe_log_string(self, text: str) -> str:
    """Return a version of ``text`` that is safe to embed in log messages.

    The text is round-tripped through UTF-8 with ``errors='replace'``, so
    code points that cannot be encoded (such as lone surrogates) come back
    as ``?`` while everything else is returned unchanged.

    Falsy input (``None`` or ``""``) yields an empty string.
    """
    if not text:
        return ""
    try:
        return text.encode('utf-8', errors='replace').decode('utf-8')
    except Exception:
        # Best-effort fallback to an ASCII transliteration. Catching
        # ``Exception`` rather than using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit propagate.
        return unidecode.unidecode(text)
def find_matching_store_chain(self, chain_name: str, store_chains: List[Dict]) -> Optional[str]:
    """Look up the ID of the store chain whose name matches ``chain_name``.

    Names are compared after passing both sides through
    ``normalize_string``. The first entry of ``store_chains`` that matches
    wins.

    Args:
        chain_name: Name to look for.
        store_chains: Dicts providing ``store_chain_name`` and
            ``store_chain_id`` keys.

    Returns:
        The matching entry's ``store_chain_id``, or ``None`` (after logging
        a warning) when no entry matches.
    """
    wanted = self.normalize_string(chain_name)
    safe_chain_name = self.safe_log_string(chain_name)
    logger.debug(f"Looking for store chain match for: '{safe_chain_name}' (normalized: '{wanted}')")

    # Lazy search: candidates are normalized one at a time and the scan
    # stops at the first hit.
    hit = next(
        (
            candidate
            for candidate in store_chains
            if wanted == self.normalize_string(candidate['store_chain_name'])
        ),
        None,
    )

    if hit is None:
        logger.warning(f"No matching store chain found for: '{safe_chain_name}'")
        return None

    safe_store_name = self.safe_log_string(hit['store_chain_name'])
    logger.debug(f"Found match: '{safe_chain_name}' -> '{safe_store_name}' (ID: {hit['store_chain_id']})")
    return hit['store_chain_id']
def check_product_exists(self, ean: str, product_name: str, brand_id: str) -> Optional[str]:
    """Return the ID of an already stored product, if there is one.

    The ``products`` table is searched first by ``product_ean`` and then by
    the ``(product_name, brand_id)`` pair. The first row of the first
    non-empty result is used.

    Args:
        ean: Product EAN. When empty or ``None`` the EAN lookup is skipped,
            so a missing EAN cannot match an unrelated product that was
            also stored without one.
        product_name: Product name, matched exactly.
        brand_id: Brand identifier, matched exactly.

    Returns:
        The ``product_id`` of the matching row, or ``None`` if neither
        lookup finds anything.

    Raises:
        Exception: If a query fails. The underlying error is attached as
            ``__cause__``.
    """
    try:
        # First check by EAN (only meaningful when an EAN was supplied)
        if ean:
            logger.debug(f"Checking if product exists with EAN: {ean}")
            response = (
                self.supabase.table("products")
                .select("product_id")
                .eq("product_ean", ean)
                .execute()
            )
            if response.data:
                logger.debug(f"Product exists with EAN {ean}, ID: {response.data[0]['product_id']}")
                return response.data[0]['product_id']

        # Then check by product_name and brand_id
        safe_name = self.safe_log_string(product_name)
        logger.debug(f"Checking if product exists with name '{safe_name}' and brand_id: {brand_id}")
        response = (
            self.supabase.table("products")
            .select("product_id")
            .eq("product_name", product_name)
            .eq("brand_id", brand_id)
            .execute()
        )
        if response.data:
            logger.debug(f"Product exists with name and brand, ID: {response.data[0]['product_id']}")
            return response.data[0]['product_id']

        logger.debug(f"Product with EAN {ean} or name '{safe_name}' does not exist")
        return None
    except Exception as e:
        logger.error(f"Error checking product existence for EAN {ean}: {str(e)}")
        # Chain explicitly so the original failure is reported as the cause.
        raise Exception(f"Error checking product existence: {str(e)}") from e
def insert_product(self, product_data: Dict) -> str:
    """Insert a row into the ``products`` table and return its ID.

    Args:
        product_data: Row to insert. ``product_name`` and ``product_ean``
            are read for logging; the whole dict is sent to the insert
            call as-is.

    Returns:
        The ``product_id`` of the first row in the insert response.

    Raises:
        Exception: If a required key is missing, the insert fails, or the
            response carries no row. The underlying error is attached as
            ``__cause__``.
    """
    try:
        safe_name = self.safe_log_string(product_data['product_name'])
        logger.info(f"Inserting new product: {safe_name} (EAN: {product_data['product_ean']})")
        response = self.supabase.table("products").insert(product_data).execute()
        product_id = response.data[0]['product_id']
        logger.info(f"Successfully inserted product with ID: {product_id}")
        return product_id
    except Exception as e:
        # ``.get`` here because the failure may itself be a missing
        # ``product_name`` key.
        safe_name = self.safe_log_string(product_data.get('product_name', 'unknown'))
        logger.error(f"Error inserting product {safe_name}: {str(e)}")
        # Chain explicitly so the original failure is reported as the cause.
        raise Exception(f"Error inserting product: {str(e)}") from e
def get_or_create_store_product(self, product_id: str, store_chain_id: str) -> str:
    """Return the ``store_products`` row ID linking a product to a store chain.

    An existing row for the ``(product_id, store_chain_id)`` pair is reused;
    otherwise a new row is inserted.

    Args:
        product_id: Product identifier.
        store_chain_id: Store chain identifier.

    Returns:
        The ``store_product_id`` of the existing or newly inserted row.

    Raises:
        Exception: If the lookup or insert fails. The underlying error is
            attached as ``__cause__``.
    """
    # NOTE(review): lookup and insert are two separate requests, so two
    # concurrent callers could both insert unless the table enforces
    # uniqueness on (product_id, store_chain_id) -- confirm.
    try:
        # Check if store_product mapping already exists
        logger.debug(f"Checking if store_product exists for product {product_id} and store_chain {store_chain_id}")
        response = (
            self.supabase.table("store_products")
            .select("store_product_id")
            .eq("product_id", product_id)
            .eq("store_chain_id", store_chain_id)
            .execute()
        )
        if response.data:
            store_product_id = response.data[0]['store_product_id']
            logger.debug(f"Store_product exists with ID: {store_product_id}")
            return store_product_id

        # Create new store_product mapping
        logger.debug(f"Creating new store_product mapping for product {product_id} and store_chain {store_chain_id}")
        store_product_data = {
            "product_id": product_id,
            "store_chain_id": store_chain_id,
        }
        response = self.supabase.table("store_products").insert(store_product_data).execute()
        store_product_id = response.data[0]['store_product_id']
        logger.debug(f"Successfully created store_product with ID: {store_product_id}")
        return store_product_id
    except Exception as e:
        logger.error(f"Error getting/creating store_product for product {product_id} and store_chain {store_chain_id}: {str(e)}")
        # Chain explicitly so the original failure is reported as the cause.
        raise Exception(f"Error getting/creating store_product: {str(e)}") from e
def insert_price_history(self, price_data: Dict) -> None:
    """Insert one row into the ``product_price_history`` table.

    Args:
        price_data: Row to insert. ``store_product_id`` is read for
            logging; the whole dict is sent to the insert call as-is.

    Raises:
        Exception: If ``store_product_id`` is missing or the insert fails.
            The underlying error is attached as ``__cause__``.
    """
    try:
        logger.debug(f"Inserting price history for store_product {price_data['store_product_id']}")
        self.supabase.table("product_price_history").insert(price_data).execute()
        logger.debug("Price history inserted successfully")
    except Exception as e:
        # ``.get`` here because the failure may itself be a missing key.
        logger.error(f"Error inserting price history for store_product {price_data.get('store_product_id', 'unknown')}: {str(e)}")
        # Chain explicitly so the original failure is reported as the cause.
        raise Exception(f"Error inserting price history: {str(e)}") from e
def combine_quantity_unit(self, quantity: str, unit: str) -> str:
    """Join a quantity and a unit into one compact string with no separator.

    Commas in the quantity are turned into dots and it is trimmed; the unit
    is lowercased and trimmed. Either part may be missing, in which case
    the other is returned on its own. If both are falsy the result is an
    empty string and nothing is logged.
    """
    if not (quantity or unit):
        return ""

    number_part = quantity.replace(',', '.').strip() if quantity else ""
    unit_part = unit.lower().strip() if unit else ""

    # Plain concatenation covers every case: an absent part is an empty
    # string and contributes nothing.
    result = number_part + unit_part

    logger.debug(f"Combined quantity '{quantity}' and unit '{unit}' -> '{result}'")
    return result