Spaces:
Runtime error
Runtime error
| from typing import Dict, Any, List | |
| from datetime import datetime, timedelta | |
| import time | |
| import requests | |
| from io import BytesIO | |
| import asyncio | |
| from db.supabase_client import SupabaseClient | |
| from utils.image_processing import process_and_store_product_image | |
| class PromoProductRepository: | |
| def __init__(self): | |
| self.supabase = SupabaseClient().get_client() | |
| def fix_promo_date(self, promo_date: str, date_type: str = "start") -> str: | |
| """Replace invalid promo dates with appropriate fallback dates""" | |
| if promo_date is None: | |
| fallback_date = datetime.now() if date_type == "start" else datetime.now() + timedelta(days=7) | |
| print(f"⚠️ {date_type} date is None, using fallback: {fallback_date.isoformat()}") | |
| return fallback_date.isoformat() | |
| try: | |
| # Parse the date string | |
| dt = datetime.fromisoformat(promo_date.replace('Z', '+00:00')) | |
| # Check for Unix epoch start date (1970-01-01) | |
| if dt.year == 1970 and dt.month == 1 and dt.day == 1: | |
| fallback_date = datetime.now() if date_type == "start" else datetime.now() + timedelta(days=7) | |
| print(f"⚠️ {date_type} date is Unix epoch (1970), using fallback: {fallback_date.isoformat()}") | |
| return fallback_date.isoformat() | |
| # Check for dates too far in the past (more than 1 year ago) | |
| if dt < datetime.now() - timedelta(days=365): | |
| fallback_date = datetime.now() if date_type == "start" else datetime.now() + timedelta(days=7) | |
| print(f"⚠️ {date_type} date too old ({dt.date()}), using fallback: {fallback_date.isoformat()}") | |
| return fallback_date.isoformat() | |
| # Check for dates too far in the future (more than 1 year from now) | |
| if dt > datetime.now() + timedelta(days=365): | |
| fallback_date = datetime.now() if date_type == "start" else datetime.now() + timedelta(days=7) | |
| print(f"⚠️ {date_type} date too far in future ({dt.date()}), using fallback: {fallback_date.isoformat()}") | |
| return fallback_date.isoformat() | |
| return promo_date | |
| except Exception as e: | |
| # If parsing fails, replace with fallback | |
| fallback_date = datetime.now() if date_type == "start" else datetime.now() + timedelta(days=7) | |
| print(f"⚠️ {date_type} date parsing failed ({promo_date}), using fallback: {fallback_date.isoformat()}") | |
| return fallback_date.isoformat() | |
| def check_dictionary(self, product_name: str, store: str) -> str | None: | |
| """Check dictionary for existing product match""" | |
| if not product_name or not store: | |
| return None | |
| # Clean and format store name for column lookup | |
| store_key = store.lower().strip() | |
| column_name = f"promo_input_{store_key}" | |
| try: | |
| # Use a more explicit query approach | |
| query = self.supabase.table("product_input_dictionary").select("product_id") | |
| # Apply the filter dynamically | |
| result = query.filter(column_name, "eq", product_name).execute() | |
| # Validate the response structure | |
| if (result and | |
| hasattr(result, 'data') and | |
| result.data is not None and | |
| len(result.data) > 0): | |
| product_id = result.data[0].get("product_id") | |
| if product_id: | |
| print(f"✅ Found existing product ID {product_id} for '{product_name}' in column '{column_name}'") | |
| return product_id | |
| print(f"📝 No match found for '{product_name}' in column '{column_name}'") | |
| return None | |
| except Exception as e: | |
| print(f"❌ Error checking dictionary for '{product_name}' in column '{column_name}': {e}") | |
| return None | |
| def normalize_store_name(self, name: str) -> str: | |
| """Helper function for relaxed string comparison""" | |
| if not name: | |
| return "" | |
| import unicodedata | |
| normalized = unicodedata.normalize('NFD', name.lower()) | |
| return ''.join(c for c in normalized if unicodedata.category(c) != 'Mn' and c.isalnum()) | |
| def get_all_store_chains(self) -> List[Dict]: | |
| """Get all store chains""" | |
| try: | |
| result = self.supabase.table("store_chains") \ | |
| .select("store_chain_id, store_chain_name") \ | |
| .execute() | |
| return [{"id": chain["store_chain_id"], "name": chain["store_chain_name"]} | |
| for chain in result.data] | |
| except Exception as e: | |
| print(f"Error fetching store chains: {e}") | |
| return [] | |
| def get_stores_by_chain(self, chain_id: str) -> List[Dict]: | |
| """Get stores for a specific chain""" | |
| try: | |
| result = self.supabase.table("stores") \ | |
| .select("store_id, store_location, store_address") \ | |
| .eq("store_chain_id", chain_id) \ | |
| .execute() | |
| return [{"id": store["store_id"], | |
| "location": store["store_location"], | |
| "address": store["store_address"]} | |
| for store in result.data] | |
| except Exception as e: | |
| print(f"Error fetching stores for chain {chain_id}: {e}") | |
| return [] | |
| def validate_date_range(self, start_date: str, end_date: str) -> bool: | |
| """Validate and limit date range to prevent timeout issues""" | |
| try: | |
| start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) | |
| end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) | |
| # Calculate number of days | |
| days_diff = (end_dt - start_dt).days + 1 | |
| if days_diff > 90: # Limit to 90 days to prevent timeouts | |
| print(f"⚠️ Date range too large ({days_diff} days), limiting to 90 days") | |
| return False | |
| if days_diff < 1: # End date before start date | |
| print(f"⚠️ Invalid date range (end before start), skipping") | |
| return False | |
| print(f"📅 Date range validated: {days_diff} days ({start_dt.date()} to {end_dt.date()})") | |
| return True | |
| except Exception as e: | |
| print(f"❌ Error validating date range: {e}") | |
| return False | |
| def process_single_store_pricing(self, store_id: str, product_id: str, | |
| start_date: str, end_date: str, price: float) -> bool: | |
| """Process pricing for a single store with enhanced timeout handling""" | |
| max_retries = 3 | |
| retry_delay = 2 | |
| # Validate date range first | |
| if not self.validate_date_range(start_date, end_date): | |
| print(f"❌ Skipping store {store_id} due to invalid date range") | |
| return False | |
| for attempt in range(max_retries): | |
| try: | |
| print(f" 🔄 Attempt {attempt + 1}: Processing store {store_id}") | |
| # Check if store-product relationship exists with timeout | |
| print(f" 📊 Checking store-product relationship...") | |
| store_product_result = self.supabase.table("store_products") \ | |
| .select("store_product_id") \ | |
| .eq("store_id", store_id) \ | |
| .eq("product_id", product_id) \ | |
| .maybe_single() \ | |
| .execute() | |
| if store_product_result.data: | |
| store_product_id = store_product_result.data["store_product_id"] | |
| print(f" ✅ Found existing store-product relationship: {store_product_id}") | |
| else: | |
| # Create new store-product relationship | |
| print(f" ➕ Creating new store-product relationship...") | |
| new_store_product = self.supabase.table("store_products") \ | |
| .insert({"store_id": store_id, "product_id": product_id}) \ | |
| .select("store_product_id") \ | |
| .single() \ | |
| .execute() | |
| store_product_id = new_store_product.data["store_product_id"] | |
| print(f" ✅ Created store-product relationship: {store_product_id}") | |
| # Count existing entries first to understand the scope | |
| print(f" 🔍 Checking existing price history entries...") | |
| existing_count_result = self.supabase.table("product_price_history") \ | |
| .select("*", count="exact") \ | |
| .eq("store_product_id", store_product_id) \ | |
| .gte("price_date", start_date) \ | |
| .lte("price_date", end_date) \ | |
| .execute() | |
| existing_count = existing_count_result.count if existing_count_result.count else 0 | |
| print(f" 📈 Found {existing_count} existing price entries to delete") | |
| # Delete existing entries in smaller batches if there are many | |
| if existing_count > 0: | |
| print(f" 🗑️ Deleting {existing_count} existing entries...") | |
| if existing_count > 100: | |
| # For large deletions, do it in smaller chunks | |
| print(f" ⚠️ Large deletion detected, processing in chunks...") | |
| # Delete in 30-day chunks to avoid timeouts | |
| current_start = datetime.fromisoformat(start_date.replace('Z', '+00:00')) | |
| end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) | |
| while current_start <= end_dt: | |
| chunk_end = min(current_start + timedelta(days=30), end_dt) | |
| chunk_start_str = current_start.strftime("%Y-%m-%d") | |
| chunk_end_str = chunk_end.strftime("%Y-%m-%d") | |
| print(f" 🗑️ Deleting chunk: {chunk_start_str} to {chunk_end_str}") | |
| self.supabase.table("product_price_history") \ | |
| .delete() \ | |
| .eq("store_product_id", store_product_id) \ | |
| .gte("price_date", chunk_start_str) \ | |
| .lte("price_date", chunk_end_str) \ | |
| .execute() | |
| current_start = chunk_end + timedelta(days=1) | |
| time.sleep(0.2) # Small delay between chunks | |
| else: | |
| # Small deletion, do it all at once | |
| self.supabase.table("product_price_history") \ | |
| .delete() \ | |
| .eq("store_product_id", store_product_id) \ | |
| .gte("price_date", start_date) \ | |
| .lte("price_date", end_date) \ | |
| .execute() | |
| # Create price history entries in very small batches | |
| print(f" 📊 Creating new price history entries...") | |
| start_dt = datetime.fromisoformat(start_date.replace('Z', '+00:00')) | |
| end_dt = datetime.fromisoformat(end_date.replace('Z', '+00:00')) | |
| current_date = start_dt | |
| batch_size = 25 # Very small batch size for Konzum | |
| price_entries = [] | |
| total_days = (end_dt - start_dt).days + 1 | |
| processed_days = 0 | |
| while current_date <= end_dt: | |
| price_entries.append({ | |
| "store_product_id": store_product_id, | |
| "current_price": price, | |
| "price_date": current_date.strftime("%Y-%m-%d") | |
| }) | |
| current_date += timedelta(days=1) | |
| processed_days += 1 | |
| # Insert in small batches | |
| if len(price_entries) >= batch_size: | |
| print(f" 📈 Inserting batch ({processed_days}/{total_days} days)") | |
| self.supabase.table("product_price_history") \ | |
| .insert(price_entries) \ | |
| .execute() | |
| price_entries = [] | |
| time.sleep(0.3) # Longer delay for Konzum | |
| # Insert remaining entries | |
| if price_entries: | |
| print(f" 📈 Inserting final batch ({processed_days}/{total_days} days)") | |
| self.supabase.table("product_price_history") \ | |
| .insert(price_entries) \ | |
| .execute() | |
| print(f" ✅ Successfully processed store {store_id}") | |
| return True | |
| except Exception as e: | |
| error_msg = str(e) | |
| if ("520" in error_msg or "timeout" in error_msg.lower()) and attempt < max_retries - 1: | |
| print(f" ⚠️ Timeout/520 error on attempt {attempt + 1}, retrying in {retry_delay}s...") | |
| time.sleep(retry_delay) | |
| retry_delay *= 2 # Exponential backoff | |
| continue | |
| else: | |
| print(f" ❌ Error processing store {store_id}: {e}") | |
| return False | |
| return False | |
| def process_product_pricing(self, product_id: str, store_name: str, start_date: str, | |
| end_date: str, promo_price: float, regular_price: float) -> bool: | |
| """Process product pricing for date range across all stores in a chain""" | |
| if not product_id or not store_name: | |
| print("Missing required parameters for price processing") | |
| return False | |
| try: | |
| print(f"Starting price processing for product ID: {product_id}") | |
| # Fix invalid dates BEFORE processing | |
| print(f"📅 Original dates - Start: {start_date}, End: {end_date}") | |
| fixed_start_date = self.fix_promo_date(start_date, "start") | |
| fixed_end_date = self.fix_promo_date(end_date, "end") | |
| print(f"📅 Fixed dates - Start: {fixed_start_date}, End: {fixed_end_date}") | |
| # Use the fixed dates | |
| start_date = fixed_start_date | |
| end_date = fixed_end_date | |
| # Get all store chains | |
| store_chains = self.get_all_store_chains() | |
| # Normalize the promo store name | |
| promo_store_normalized = self.normalize_store_name(store_name) | |
| # Find matching store chain with relaxed comparison | |
| matched_chain = None | |
| for chain in store_chains: | |
| chain_normalized = self.normalize_store_name(chain["name"]) | |
| if (promo_store_normalized in chain_normalized or | |
| chain_normalized in promo_store_normalized): | |
| matched_chain = chain | |
| print(f"✅ Matched store chain: {matched_chain['name']} (ID: {matched_chain['id']})") | |
| break | |
| if not matched_chain: | |
| print("No matching store chain found") | |
| return False | |
| # Get stores for the matched chain | |
| stores_in_chain = self.get_stores_by_chain(matched_chain["id"]) | |
| if not stores_in_chain: | |
| print(f"No stores found for chain ID: {matched_chain['id']}") | |
| return False | |
| # Use promo price if available, otherwise use regular price | |
| price_to_use = promo_price if promo_price and promo_price > 0 else regular_price or 0 | |
| successful_stores = 0 | |
| total_stores = len(stores_in_chain) | |
| print(f"📊 Processing {total_stores} stores for {matched_chain['name']}") | |
| # Process each store individually with delays | |
| for i, store in enumerate(stores_in_chain): | |
| print(f"Processing store {i+1}/{total_stores}: {store['location']} (ID: {store['id']})") | |
| success = self.process_single_store_pricing( | |
| store_id=store["id"], | |
| product_id=product_id, | |
| start_date=start_date, | |
| end_date=end_date, | |
| price=price_to_use | |
| ) | |
| if success: | |
| successful_stores += 1 | |
| print(f" ✅ Store {i+1}/{total_stores} completed successfully") | |
| else: | |
| print(f" ❌ Store {i+1}/{total_stores} failed") | |
| success_rate = successful_stores / total_stores if total_stores > 0 else 0 | |
| print(f"✅ Completed price processing: {successful_stores}/{total_stores} stores ({success_rate:.1%})") | |
| # Consider it successful if at least 80% of stores were updated | |
| threshold = 0.8 | |
| return success_rate >= threshold | |
| except Exception as e: | |
| print(f"Error processing product pricing: {e}") | |
| return False | |
| def process_product_image_sync(self, picture_id: str, product_id: str) -> bool: | |
| """Process product image using direct function calls - sync wrapper""" | |
| if not picture_id or not product_id: | |
| print("No image or product ID provided for image processing") | |
| return False | |
| try: | |
| print(f"🖼️ Processing image for product ID: {product_id}") | |
| # Get the original image URL (same pattern as admin dashboard) | |
| original_image_url = f"https://backend.360promo.hr/contents/products/{picture_id}.jpg" | |
| # Fetch the image | |
| print(f"📥 Downloading image from: {original_image_url}") | |
| response = requests.get(original_image_url, timeout=30) | |
| if not response.ok: | |
| print(f"❌ Failed to fetch image: HTTP {response.status_code}") | |
| return False | |
| # Create a mock UploadFile object from the downloaded image | |
| class MockUploadFile: | |
| def __init__(self, content: bytes, filename: str): | |
| self.file = BytesIO(content) | |
| self.filename = filename | |
| self.content_type = "image/jpeg" | |
| async def read(self) -> bytes: | |
| self.file.seek(0) | |
| return self.file.read() | |
| mock_file = MockUploadFile(response.content, f"product_{picture_id}.jpg") | |
| # Run the async function in a new event loop | |
| async def process_image(): | |
| return await process_and_store_product_image( | |
| file=mock_file, | |
| remove_bg=True, | |
| upscale=True, | |
| scale_factor=2, | |
| process_order="remove_first", | |
| product_id=product_id | |
| ) | |
| # Process the image directly using the imported function | |
| print(f"🔄 Processing image directly...") | |
| # Check if we're in an event loop | |
| try: | |
| loop = asyncio.get_running_loop() | |
| # We're in an async context, run in thread pool | |
| import concurrent.futures | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| future = executor.submit(asyncio.run, process_image()) | |
| result = future.result(timeout=60) | |
| except RuntimeError: | |
| # No event loop running, we can use asyncio.run | |
| result = asyncio.run(process_image()) | |
| if result.get('status') == 'success': | |
| print(f"✅ Image processed successfully: {result.get('image_url')}") | |
| return True | |
| else: | |
| print(f"❌ Image processing failed: {result}") | |
| return False | |
| except Exception as e: | |
| print(f"❌ Error processing product image: {e}") | |
| return False | |
| def upsert_multiple_products(self, products: List[Dict[str, Any]]) -> int: | |
| """ | |
| Upsert multiple promo products in batches with dictionary check and image processing | |
| Returns the number of successfully processed products | |
| """ | |
| batch_size = 100 | |
| successfully_processed = 0 | |
| automatically_adjusted = 0 # Counter for products found in dictionary | |
| upserted_to_promo = 0 # Counter for products added to promo_products table | |
| failed_pricing_updates = 0 # Counter for failed pricing updates | |
| images_processed = 0 # Counter for successfully processed images | |
| images_failed = 0 # Counter for failed image processing | |
| date_fixes = 0 # Counter for fixed dates | |
| timestamp = datetime.now().isoformat() | |
| for i in range(0, len(products), batch_size): | |
| batch = products[i:i+batch_size] | |
| for product in batch: | |
| store = product.get("store") | |
| name = product.get("name") | |
| picture_id = product.get("pictureId") | |
| try: | |
| # Check dictionary first | |
| existing_product_id = self.check_dictionary(name, store) | |
| if existing_product_id: | |
| # Product exists in dictionary - update pricing and process image | |
| print(f"Found existing product ID {existing_product_id} for '{name}' from '{store}' - updating pricing and processing image") | |
| # Check if dates need fixing | |
| original_start = product.get("promoStartDate") | |
| original_end = product.get("promoEndDate") | |
| if (original_start is None or | |
| original_start == "1970-01-01T00:00:00Z" or | |
| original_end is None or | |
| original_end == "1970-01-01T00:00:00Z"): | |
| date_fixes += 1 | |
| # Process pricing | |
| pricing_success = self.process_product_pricing( | |
| product_id=existing_product_id, | |
| store_name=store, | |
| start_date=product.get("promoStartDate"), | |
| end_date=product.get("promoEndDate"), | |
| promo_price=product.get("promoPrice"), | |
| regular_price=product.get("regularPrice") | |
| ) | |
| # Process image if available (using sync wrapper) | |
| image_success = False | |
| if picture_id: | |
| image_success = self.process_product_image_sync(picture_id, existing_product_id) | |
| if image_success: | |
| images_processed += 1 | |
| print(f"🖼️ Successfully processed image for: {name}") | |
| else: | |
| images_failed += 1 | |
| print(f"🖼️ Failed to process image for: {name}") | |
| if pricing_success: | |
| successfully_processed += 1 | |
| automatically_adjusted += 1 | |
| print(f"✅ Automatically adjusted pricing for: {name}") | |
| else: | |
| failed_pricing_updates += 1 | |
| print(f"❌ Failed to update pricing for: {name}") | |
| else: | |
| # Product not in dictionary - proceed with normal upsert to promo_products | |
| formatted_promo_product = { | |
| "store": store, | |
| "picture_id": product.get("pictureId"), | |
| "name": name, | |
| "description": product.get("description", ""), | |
| "promo_start_date": product.get("promoStartDate"), | |
| "promo_end_date": product.get("promoEndDate"), | |
| "regular_price": product.get("regularPrice"), | |
| "promo_price": product.get("promoPrice"), | |
| "last_updated": timestamp | |
| } | |
| # Check if product exists in promo_products | |
| result = self.supabase.table("promo_products").select("*") \ | |
| .eq("store", store) \ | |
| .eq("name", name) \ | |
| .execute() | |
| if result.data and len(result.data) > 0: | |
| # Update existing promo product | |
| record_id = result.data[0]["id"] | |
| self.supabase.table("promo_products") \ | |
| .update(formatted_promo_product) \ | |
| .eq("id", record_id) \ | |
| .execute() | |
| print(f"🔄 Updated existing promo product: {name}") | |
| else: | |
| # Insert new promo product | |
| self.supabase.table("promo_products") \ | |
| .insert(formatted_promo_product) \ | |
| .execute() | |
| print(f"➕ Inserted new promo product: {name}") | |
| successfully_processed += 1 | |
| upserted_to_promo += 1 | |
| # Print progress periodically | |
| total_processed = successfully_processed + failed_pricing_updates | |
| if total_processed % 50 == 0: | |
| print(f"Processed {total_processed} / {len(products)} products so far...") | |
| except Exception as e: | |
| print(f"Failed to process product '{name}' from '{store}': {str(e)}") | |
| continue | |
| # Detailed summary logging | |
| total_processed = successfully_processed + failed_pricing_updates | |
| print(f"\n{'='*60}") | |
| print(f"SCRAPING PROCESS SUMMARY") | |
| print(f"{'='*60}") | |
| print(f"📊 Total products processed: {len(products)}") | |
| print(f"✅ Successfully processed: {successfully_processed}") | |
| print(f"🔧 Automatically adjusted (existing products): {automatically_adjusted}") | |
| print(f"📋 Upserted to promo_products table: {upserted_to_promo}") | |
| print(f"⚠️ Failed pricing updates: {failed_pricing_updates}") | |
| print(f"🖼️ Images successfully processed: {images_processed}") | |
| print(f"🖼️ Images failed to process: {images_failed}") | |
| print(f"📅 Invalid dates fixed: {date_fixes}") | |
| print(f"❌ Failed to process: {len(products) - total_processed}") | |
| print(f"{'='*60}") | |
| if automatically_adjusted > 0: | |
| print(f"🎯 {automatically_adjusted} products were found in the dictionary and had their pricing automatically updated across all stores in their respective chains.") | |
| if images_processed > 0: | |
| print(f"🖼️ {images_processed} product images were successfully processed and updated.") | |
| if images_failed > 0: | |
| print(f"⚠️ {images_failed} product images failed to process.") | |
| if date_fixes > 0: | |
| print(f"📅 {date_fixes} products had invalid dates (null/1970) that were automatically corrected.") | |
| if upserted_to_promo > 0: | |
| print(f"📝 {upserted_to_promo} products were added/updated in the temporary promo_products table for manual review.") | |
| if failed_pricing_updates > 0: | |
| print(f"⚠️ {failed_pricing_updates} products had dictionary matches but failed pricing updates (likely due to API limits).") | |
| print(f"{'='*60}\n") | |
| return successfully_processed | |