# NOTE: scraped from a Hugging Face Space page (status: "Sleeping") — this
# header is page residue, not part of the program.
import requests
import random
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import logging

# Setup logging
logger = logging.getLogger(__name__)

# Base URL of the Pricelyst "Delta" backend plus default request headers
# shared by every fetcher below.
DELTA_API = "https://delta-api.pricelyst.co.zw"
HEADERS = {"Accept": "application/json"}

# ----------------------------
# FETCHERS
# ----------------------------
def fetch_all_products_paginated(max_pages=5):
    """
    Fetch products page by page until the API reports the last page.

    max_pages: hard cap on API round-trips to avoid excessive calls.
    Returns a flat list of product dicts; stops early on any error or
    unexpected payload shape.
    """
    logger.info(f"Fetching products from multiple pages (max: {max_pages})...")
    collected = []
    page = 1
    while page <= max_pages:
        try:
            endpoint = f"{DELTA_API}/api/products?page={page}"
            logger.debug(f"API call: GET {endpoint}")
            response = requests.get(endpoint, headers=HEADERS, timeout=30)
            response.raise_for_status()
            payload = response.json().get("data", {})
            # Only the paginated dict shape is accepted here.
            if not (isinstance(payload, dict) and 'products' in payload):
                logger.warning(f"Unexpected response format on page {page}")
                break
            batch = payload.get('products', [])
            last_page = payload.get('totalPages', 1)
            logger.info(f"Page {page}: Retrieved {len(batch)} products")
            collected.extend(batch)
            # Stop at the last reported page or when a page comes back empty.
            if page >= last_page or not batch:
                logger.info(f"Reached end at page {page} of {last_page}")
                break
            page += 1
        except Exception as exc:
            logger.error(f"Error fetching page {page}: {exc}")
            break
    logger.info(f"Total products fetched across {page} pages: {len(collected)}")
    return collected
def fetch_all_products():
    """
    Fetch the first page of products from the Delta API.

    Handles both payload shapes the API returns: a paginated dict
    ({'products': [...], 'totalPages': ...}) and a bare list.  When more
    pages exist and the first page is small, delegates to
    fetch_all_products_paginated().  Returns [] on any failure.
    """
    logger.info("Fetching all products...")
    try:
        endpoint = f"{DELTA_API}/api/products"
        logger.debug(f"API call: GET {endpoint}")
        response = requests.get(endpoint, headers=HEADERS, timeout=30)
        response.raise_for_status()  # Raise exception for bad status codes

        body = response.json()
        logger.debug(f"Raw API response type: {type(body)}")
        logger.debug(f"Raw API response keys: {list(body.keys()) if isinstance(body, dict) else 'Not a dict'}")

        payload = body.get("data", [])
        logger.debug(f"Data type: {type(payload)}")

        # Paginated response shape.
        if isinstance(payload, dict) and 'products' in payload:
            logger.info("Detected paginated response format")
            items = payload.get('products', [])
            total_count = payload.get('totalItemCount', 0)
            page_now = payload.get('currentPage', 1)
            page_total = payload.get('totalPages', 1)
            logger.info(f"Pagination info: Page {page_now}/{page_total}, Total items: {total_count}")
            logger.info(f"Retrieved {len(items)} products from current page")

            # Small first page + more pages available -> pull extra pages.
            if page_total > 1 and len(items) < 50:
                logger.info("Multiple pages available, fetching additional pages...")
                return fetch_all_products_paginated(max_pages=min(5, page_total))

            if not isinstance(items, list):
                logger.error(f"Products field is not a list: {type(items)}")
                return []
            if items:
                sample_ids = [p.get('id', 'no_id') for p in items[:3]]
                logger.debug(f"Sample product IDs: {sample_ids}")
            return items

        # Bare-list response shape.
        if isinstance(payload, list):
            logger.info("Direct list response format")
            if payload:
                sample_ids = [p.get('id', 'no_id') for p in payload[:3]]
                logger.debug(f"Sample product IDs: {sample_ids}")
            logger.info(f"Retrieved {len(payload)} products")
            return payload

        logger.error(f"Unexpected data format: {type(payload)}, content: {payload}")
        return []
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch products: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching products: {e}")
        logger.exception("Full traceback:")
        return []
def fetch_user_addresses(user_id):
    """
    Fetch all saved addresses for one user.

    Returns the API's list of address dicts, or [] when the call fails
    or the payload is not a list.
    """
    logger.info(f"Fetching addresses for user {user_id}...")
    try:
        endpoint = f"{DELTA_API}/api/addresses/{user_id}"
        logger.debug(f"API call: GET {endpoint}")
        response = requests.get(endpoint, headers=HEADERS, timeout=30)
        response.raise_for_status()

        body = response.json()
        logger.debug(f"Addresses raw response type: {type(body)}")
        payload = body.get("data", [])
        logger.debug(f"Addresses data type: {type(payload)}")
        logger.info(f"Found {len(payload) if hasattr(payload, '__len__') else 'unknown'} addresses for user {user_id}")

        if isinstance(payload, list) and payload:
            # Compact city/suburb summary, used only for debug logging.
            summary = [
                f"{entry.get('city', 'unknown')}, {entry.get('suburb', 'unknown')}"
                if isinstance(entry, dict)
                else f"invalid_address_type_{type(entry)}"
                for entry in payload
            ]
            logger.debug(f"User {user_id} addresses: {summary}")
        return payload if isinstance(payload, list) else []
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch addresses for user {user_id}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching addresses for user {user_id}: {e}")
        logger.exception("Full traceback:")
        return []
def fetch_all_users():
    """
    Fetch every user record from the Delta API.

    Returns the API's list of user dicts, or [] on failure or when the
    payload is not a list.
    """
    logger.info("Fetching all users...")
    try:
        endpoint = f"{DELTA_API}/api/users"
        logger.debug(f"API call: GET {endpoint}")
        response = requests.get(endpoint, headers=HEADERS, timeout=30)
        response.raise_for_status()

        body = response.json()
        logger.debug(f"Users raw response type: {type(body)}")
        payload = body.get("data", [])
        logger.debug(f"Users data type: {type(payload)}")
        logger.info(f"Successfully fetched {len(payload) if hasattr(payload, '__len__') else 'unknown'} users")
        return payload if isinstance(payload, list) else []
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch users: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching users: {e}")
        logger.exception("Full traceback:")
        return []
def fetch_user_cart_items(user_id):
    """
    Return the unique product IDs found across all of a user's carts.

    Fixes over the original single try block:
      - a cart without an "id" is skipped instead of querying ?cart_id=None;
      - a failure fetching one cart's items no longer discards the IDs
        already collected from the other carts;
      - non-dict cart/item entries are ignored defensively.
    Returns [] when the cart listing itself cannot be fetched.
    """
    logger.info(f"Fetching cart items for user {user_id}...")
    try:
        # Get user's carts
        carts_url = f"{DELTA_API}/api/carts?user_id={user_id}"
        logger.debug(f"API call: GET {carts_url}")
        res = requests.get(carts_url, headers=HEADERS, timeout=30)
        res.raise_for_status()
        carts = res.json().get("data", [])
        if not isinstance(carts, list):
            logger.error(f"Unexpected carts payload for user {user_id}: {type(carts)}")
            return []
        logger.info(f"Found {len(carts)} carts for user {user_id}")

        product_ids = set()
        for cart in carts:
            cart_id = cart.get("id") if isinstance(cart, dict) else None
            if cart_id is None:
                # Cannot query items without a cart id.
                logger.warning(f"Skipping cart without id for user {user_id}")
                continue
            try:
                logger.debug(f"Fetching items for cart {cart_id}")
                items_url = f"{DELTA_API}/api/cart-items?cart_id={cart_id}"
                items_res = requests.get(items_url, headers=HEADERS, timeout=30)
                items_res.raise_for_status()
                items = items_res.json().get("data", [])
                cart_product_ids = [
                    item["product_id"]
                    for item in items
                    if isinstance(item, dict) and "product_id" in item
                ]
                product_ids.update(cart_product_ids)
                logger.debug(f"Cart {cart_id} contains {len(cart_product_ids)} products")
            except Exception as e:
                # One bad cart should not wipe out the rest of the history.
                logger.error(f"Failed to fetch items for cart {cart_id}: {e}")
                continue

        result = list(product_ids)
        logger.info(f"User {user_id} has {len(result)} unique products in cart history")
        logger.debug(f"User {user_id} product IDs: {result[:10]}...")  # Show first 10
        return result
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch cart items for user {user_id}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching cart items for user {user_id}: {e}")
        return []
def fetch_users_by_location(city=None, suburb=None):
    """
    Return IDs of users with a saved address matching the given city
    and/or suburb (exact string match; city is checked first per address).

    Fixes:
      - non-dict address entries are skipped — fetch_user_addresses can
        return them, and calling .get() on one raised AttributeError;
      - non-dict user records are skipped for the same reason;
      - removed the unused `user_matches` flag.
    """
    logger.info(f"Fetching users by location - city: {city}, suburb: {suburb}")
    all_users = fetch_all_users()
    if not all_users:
        logger.warning("No users found, cannot filter by location")
        return []

    matching_users = []
    checked_users = 0
    for u in all_users:
        uid = u.get("id") if isinstance(u, dict) else None
        if not uid:
            continue
        checked_users += 1
        addresses = fetch_user_addresses(uid)
        for addr in addresses:
            if not isinstance(addr, dict):
                continue  # defensive: upstream may return malformed entries
            addr_city = addr.get("city")
            addr_suburb = addr.get("suburb")
            if city and addr_city == city:
                matching_users.append(uid)
                logger.debug(f"User {uid} matches city: {city}")
                break
            elif suburb and addr_suburb == suburb:
                matching_users.append(uid)
                logger.debug(f"User {uid} matches suburb: {suburb}")
                break
        if checked_users % 10 == 0:  # Log progress every 10 users
            logger.debug(f"Checked {checked_users}/{len(all_users)} users, found {len(matching_users)} matches")

    # Deduplicate (a user is appended at most once, but keep this cheap guard).
    result = list(set(matching_users))
    logger.info(f"Found {len(result)} users matching location criteria")
    return result
| # ---------------------------- | |
| # RECOMMENDER CORE | |
| # ---------------------------- | |
def recommend_products(user_id, top_n=5):
    """
    Content-based product recommendations for one user.

    Pipeline:
      1. Fetch all products plus the user's location and cart history.
      2. Cold start: with no history, borrow the cart history of users in
         the same city/suburb; with no peers either, return random products.
      3. TF-IDF over each product's name/description/code and nested
         category/brand text.
      4. Rank every product by cosine similarity to the mean vector of the
         user's history and return the top_n not already in that history.

    Returns a list of product dicts — each annotated in place with a
    'similarity_score' key — or [] on failure.

    Fixes:
      - the mean of a scipy sparse matrix is an np.matrix, which recent
        scikit-learn versions reject; it is now converted to an ndarray;
      - membership tests and index lookups against the valid-ID list were
        O(n) each (O(n^2) overall); replaced with a set and a dict;
      - the first address is read with .get() so a record missing
        'city'/'suburb' (or a non-dict entry) cannot crash the pipeline.
    """
    logger.info(f"=== STARTING RECOMMENDATION FOR USER {user_id} ===")
    logger.info(f"Parameters: top_n={top_n}")

    # Fetch all products
    all_products = fetch_all_products()
    if not all_products:
        logger.error("No products available - cannot generate recommendations")
        return []
    logger.info(f"Working with {len(all_products)} total products")

    # Step 1: Get user location
    logger.info("Step 1: Getting user location...")
    user_addresses = fetch_user_addresses(user_id)
    # FIX: tolerate a missing/malformed first address instead of KeyError.
    first_addr = user_addresses[0] if user_addresses and isinstance(user_addresses[0], dict) else {}
    user_city = first_addr.get("city")
    user_suburb = first_addr.get("suburb")
    logger.info(f"User location: city={user_city}, suburb={user_suburb}")

    # Step 2: Get user's history
    logger.info("Step 2: Getting user purchase history...")
    user_history = fetch_user_cart_items(user_id)
    logger.info(f"User has {len(user_history)} products in history")

    # Cold-start fallback
    if not user_history:
        logger.info("Step 2a: No user history found - using cold-start approach")
        local_users = fetch_users_by_location(city=user_city, suburb=user_suburb)
        logger.info(f"Found {len(local_users)} local users for cold-start")
        peer_history = []
        for uid in local_users[:10]:  # Limit to first 10 users for performance
            peer_items = fetch_user_cart_items(uid)
            peer_history.extend(peer_items)
            logger.debug(f"Local user {uid} contributed {len(peer_items)} items")
        peer_history = list(set(peer_history))
        logger.info(f"Collected {len(peer_history)} unique products from local users")
        if not peer_history:
            logger.info("No peer history found - returning random products")
            # all_products is a non-empty list (guarded above), so sampling is safe.
            sample_size = min(top_n, len(all_products))
            random_products = random.sample(all_products, sample_size)
            logger.info(f"Returning {len(random_products)} random products")
            return random_products
        user_history = peer_history
        logger.info(f"Using peer history: {len(user_history)} products")

    # Step 3: Content vectorization
    logger.info("Step 3: Building content vectors...")
    product_map = {p["id"]: p for p in all_products}
    product_ids = list(product_map.keys())
    logger.info(f"Created product map with {len(product_map)} products")

    def product_text(p):
        """Flatten name/description/code plus nested category and brand
        fields into one space-joined text string for TF-IDF."""
        text_parts = [
            p.get('name', ''),
            p.get('description', ''),
            p.get('product_code', ''),
        ]
        # Category information (nested)
        category = p.get('category', {})
        if isinstance(category, dict):
            text_parts.append(category.get('name', ''))
            text_parts.append(category.get('code', ''))
        # Brand information (nested)
        brand = p.get('brand', {})
        if isinstance(brand, dict):
            text_parts.append(brand.get('name', ''))
            text_parts.append(brand.get('brand_code', ''))
        # Multiple categories if available
        categories = p.get('categories', [])
        if isinstance(categories, list):
            for cat in categories:
                if isinstance(cat, dict):
                    text_parts.append(cat.get('name', ''))
                    text_parts.append(cat.get('code', ''))
        return ' '.join(part for part in text_parts if part and isinstance(part, str)).strip()

    try:
        product_texts = [product_text(product_map[pid]) for pid in product_ids]
        logger.info(f"Generated text representations for {len(product_texts)} products")
        # Drop products with no usable text (TF-IDF would fail on them).
        valid_indices = [i for i, text in enumerate(product_texts) if text.strip()]
        if not valid_indices:
            logger.error("No valid product texts found for vectorization")
            return []
        valid_product_ids = [product_ids[i] for i in valid_indices]
        valid_product_texts = [product_texts[i] for i in valid_indices]
        logger.info(f"Using {len(valid_product_texts)} products with valid text content")
        vectorizer = TfidfVectorizer(stop_words="english", max_features=1000)
        tfidf_matrix = vectorizer.fit_transform(valid_product_texts)
        logger.info(f"TF-IDF matrix shape: {tfidf_matrix.shape}")
    except Exception as e:
        logger.error(f"Error in vectorization: {e}")
        return []

    # Step 4: Compute similarity
    logger.info("Step 4: Computing similarities...")
    try:
        # FIX: O(1) lookups instead of repeated O(n) list scans.
        valid_id_set = set(valid_product_ids)
        position_of = {pid: i for i, pid in enumerate(valid_product_ids)}

        user_history_valid = [pid for pid in user_history if pid in valid_id_set]
        logger.info(f"User history items in valid set: {len(user_history_valid)}")
        if not user_history_valid:
            logger.warning("No user history items found in valid product set - returning random products")
            sample_size = min(top_n, len(all_products))
            return random.sample(all_products, sample_size)

        user_indices = [position_of[pid] for pid in user_history_valid]
        logger.debug(f"User product indices: {user_indices[:5]}...")  # Show first 5

        # Compute user preference vector (mean of user history items).
        # FIX: np.mean over a sparse matrix returns np.matrix, which newer
        # scikit-learn rejects in cosine_similarity; coerce to ndarray.
        user_vectors = tfidf_matrix[user_indices]
        user_vec = np.asarray(np.mean(user_vectors, axis=0))
        logger.info(f"User preference vector shape: {user_vec.shape}")

        # Compute similarities
        sim_scores = cosine_similarity(user_vec, tfidf_matrix).flatten()
        logger.info(f"Computed {len(sim_scores)} similarity scores")
        logger.debug(f"Similarity score range: {sim_scores.min():.4f} to {sim_scores.max():.4f}")

        # Step 5: Rank and filter
        ranked_indices = np.argsort(sim_scores)[::-1]
        logger.info("Step 5: Ranking products and filtering...")
        history_set = set(user_history)  # O(1) exclusion of already-seen products
        recommendations = []
        considered_products = 0
        for idx in ranked_indices:
            pid = valid_product_ids[idx]
            score = sim_scores[idx]
            considered_products += 1
            if pid not in history_set:
                product = product_map[pid]
                # Annotate with the score for debugging; note this mutates the
                # product dict in place (same behavior as before).
                if isinstance(product, dict):
                    product['similarity_score'] = float(score)
                recommendations.append(product)
                logger.debug(f"Added recommendation {len(recommendations)}: Product {pid} (score: {score:.4f})")
                if len(recommendations) >= top_n:
                    break

        logger.info(f"Considered {considered_products} products, generated {len(recommendations)} recommendations")
        if recommendations:
            rec_ids = [r.get('id') for r in recommendations if isinstance(r, dict)]
            rec_scores = [r.get('similarity_score', 0) for r in recommendations if isinstance(r, dict)]
            logger.info(f"Final recommendations: IDs {rec_ids}, Scores {rec_scores}")
        logger.info(f"=== RECOMMENDATION COMPLETE FOR USER {user_id} ===")
        return recommendations
    except Exception as e:
        logger.error(f"Error in similarity computation: {e}")
        logger.exception("Full traceback:")
        return []