"""Content-based product recommender backed by the Delta pricing API.

Fetches products, users, addresses and cart history over HTTP, builds a
TF-IDF representation of product text, and ranks products by cosine
similarity to a user's cart history (with a location-based cold-start
fallback for users with no history).
"""

import requests
import random
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import logging

# Setup logging (handlers/levels are expected to be configured by the app).
logger = logging.getLogger(__name__)

DELTA_API = "https://delta-api.pricelyst.co.zw"
HEADERS = {"Accept": "application/json"}

# ----------------------------
# FETCHERS
# ----------------------------

def fetch_all_products_paginated(max_pages=5):
    """Fetch products from multiple pages if needed.

    Args:
        max_pages: limit to prevent excessive API calls in production.

    Returns:
        list: product dicts accumulated across the pages fetched
        (empty on total failure).
    """
    logger.info(f"Fetching products from multiple pages (max: {max_pages})...")

    all_products = []
    current_page = 1

    while current_page <= max_pages:
        try:
            url = f"{DELTA_API}/api/products?page={current_page}"
            logger.debug(f"API call: GET {url}")
            res = requests.get(url, headers=HEADERS, timeout=30)
            res.raise_for_status()

            raw_response = res.json()
            data = raw_response.get("data", {})

            if isinstance(data, dict) and 'products' in data:
                products = data.get('products', [])
                total_pages = data.get('totalPages', 1)
                logger.info(f"Page {current_page}: Retrieved {len(products)} products")
                all_products.extend(products)

                # Stop if we've reached the last page or if no products returned
                if current_page >= total_pages or not products:
                    logger.info(f"Reached end at page {current_page} of {total_pages}")
                    break
                current_page += 1
            else:
                logger.warning(f"Unexpected response format on page {current_page}")
                break
        except Exception as e:
            # Best-effort: keep whatever pages succeeded so far.
            logger.error(f"Error fetching page {current_page}: {e}")
            break

    logger.info(f"Total products fetched across {current_page} pages: {len(all_products)}")
    return all_products


def fetch_all_products():
    """Fetch the product catalogue, handling both response formats.

    The API may return either a direct list under ``data`` or a paginated
    dict (``{'products': [...], 'totalPages': N, ...}``). For paginated
    responses with a short first page, additional pages are fetched via
    :func:`fetch_all_products_paginated`.

    Returns:
        list: product dicts, or [] on any failure.
    """
    logger.info("Fetching all products...")
    try:
        url = f"{DELTA_API}/api/products"
        logger.debug(f"API call: GET {url}")
        res = requests.get(url, headers=HEADERS, timeout=30)
        res.raise_for_status()  # Raise exception for bad status codes

        # Debug the raw response
        raw_response = res.json()
        logger.debug(f"Raw API response type: {type(raw_response)}")
        logger.debug(f"Raw API response keys: {list(raw_response.keys()) if isinstance(raw_response, dict) else 'Not a dict'}")

        data = raw_response.get("data", [])
        logger.debug(f"Data type: {type(data)}")

        # Handle paginated response structure
        if isinstance(data, dict) and 'products' in data:
            logger.info("Detected paginated response format")
            products = data.get('products', [])
            total_count = data.get('totalItemCount', 0)
            current_page = data.get('currentPage', 1)
            total_pages = data.get('totalPages', 1)

            logger.info(f"Pagination info: Page {current_page}/{total_pages}, Total items: {total_count}")
            logger.info(f"Retrieved {len(products)} products from current page")

            # If there are multiple pages and we have few products, fetch more pages
            if total_pages > 1 and len(products) < 50:
                logger.info("Multiple pages available, fetching additional pages...")
                return fetch_all_products_paginated(max_pages=min(5, total_pages))

            # For now, just use the first page. In production, you might want to fetch all pages
            if isinstance(products, list):
                if products:
                    sample_ids = [p.get('id', 'no_id') for p in products[:3]]
                    logger.debug(f"Sample product IDs: {sample_ids}")
                return products
            else:
                logger.error(f"Products field is not a list: {type(products)}")
                return []

        elif isinstance(data, list):
            logger.info("Direct list response format")
            products = data
            if products:
                sample_ids = [p.get('id', 'no_id') for p in products[:3]]
                logger.debug(f"Sample product IDs: {sample_ids}")
            logger.info(f"Retrieved {len(products)} products")
            return products
        else:
            logger.error(f"Unexpected data format: {type(data)}, content: {data}")
            return []

    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch products: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching products: {e}")
        logger.exception("Full traceback:")
        return []


def fetch_user_addresses(user_id):
    """Fetch the list of address dicts for ``user_id`` ([] on failure)."""
    logger.info(f"Fetching addresses for user {user_id}...")
    try:
        url = f"{DELTA_API}/api/addresses/{user_id}"
        logger.debug(f"API call: GET {url}")
        res = requests.get(url, headers=HEADERS, timeout=30)
        res.raise_for_status()

        # Debug the raw response
        raw_response = res.json()
        logger.debug(f"Addresses raw response type: {type(raw_response)}")

        data = raw_response.get("data", [])
        logger.debug(f"Addresses data type: {type(data)}")
        logger.info(f"Found {len(data) if hasattr(data, '__len__') else 'unknown'} addresses for user {user_id}")

        if isinstance(data, list) and data:
            # Build a safe, loggable summary even for malformed entries.
            safe_addresses = []
            for addr in data:
                if isinstance(addr, dict):
                    city = addr.get('city', 'unknown')
                    suburb = addr.get('suburb', 'unknown')
                    safe_addresses.append(f"{city}, {suburb}")
                else:
                    safe_addresses.append(f"invalid_address_type_{type(addr)}")
            logger.debug(f"User {user_id} addresses: {safe_addresses}")

        return data if isinstance(data, list) else []

    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch addresses for user {user_id}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching addresses for user {user_id}: {e}")
        logger.exception("Full traceback:")
        return []


def fetch_all_users():
    """Fetch all users as a list of dicts ([] on failure)."""
    logger.info("Fetching all users...")
    try:
        url = f"{DELTA_API}/api/users"
        logger.debug(f"API call: GET {url}")
        res = requests.get(url, headers=HEADERS, timeout=30)
        res.raise_for_status()

        # Debug the raw response
        raw_response = res.json()
        logger.debug(f"Users raw response type: {type(raw_response)}")

        data = raw_response.get("data", [])
        logger.debug(f"Users data type: {type(data)}")
        logger.info(f"Successfully fetched {len(data) if hasattr(data, '__len__') else 'unknown'} users")

        return data if isinstance(data, list) else []

    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch users: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching users: {e}")
        logger.exception("Full traceback:")
        return []


def fetch_user_cart_items(user_id):
    """Return the unique product IDs across all of a user's carts.

    Performs one request for the user's carts, then one request per cart
    for its items. Returns [] on any failure.
    """
    logger.info(f"Fetching cart items for user {user_id}...")
    try:
        # Get user's carts
        carts_url = f"{DELTA_API}/api/carts?user_id={user_id}"
        logger.debug(f"API call: GET {carts_url}")
        res = requests.get(carts_url, headers=HEADERS, timeout=30)
        res.raise_for_status()
        carts = res.json().get("data", [])
        logger.info(f"Found {len(carts)} carts for user {user_id}")

        product_ids = set()
        for cart in carts:
            cart_id = cart.get("id")
            logger.debug(f"Fetching items for cart {cart_id}")
            items_url = f"{DELTA_API}/api/cart-items?cart_id={cart_id}"
            items_res = requests.get(items_url, headers=HEADERS, timeout=30)
            items_res.raise_for_status()
            items = items_res.json().get("data", [])
            cart_product_ids = [item["product_id"] for item in items if "product_id" in item]
            product_ids.update(cart_product_ids)
            logger.debug(f"Cart {cart_id} contains {len(cart_product_ids)} products")

        result = list(product_ids)
        logger.info(f"User {user_id} has {len(result)} unique products in cart history")
        logger.debug(f"User {user_id} product IDs: {result[:10]}...")  # Show first 10
        return result

    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch cart items for user {user_id}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching cart items for user {user_id}: {e}")
        return []


def fetch_users_by_location(city=None, suburb=None):
    """Return unique user IDs with an address matching ``city`` or ``suburb``.

    NOTE(review): this performs one address request per user (N+1 calls);
    acceptable for small user counts, a candidate for a server-side filter.
    """
    logger.info(f"Fetching users by location - city: {city}, suburb: {suburb}")
    all_users = fetch_all_users()
    if not all_users:
        logger.warning("No users found, cannot filter by location")
        return []

    matching_users = []
    checked_users = 0

    for u in all_users:
        uid = u.get("id")
        if not uid:
            continue
        checked_users += 1

        addresses = fetch_user_addresses(uid)
        for addr in addresses:
            addr_city = addr.get("city")
            addr_suburb = addr.get("suburb")
            # First matching address is enough; stop scanning this user.
            if city and addr_city == city:
                matching_users.append(uid)
                logger.debug(f"User {uid} matches city: {city}")
                break
            elif suburb and addr_suburb == suburb:
                matching_users.append(uid)
                logger.debug(f"User {uid} matches suburb: {suburb}")
                break

        if checked_users % 10 == 0:  # Log progress every 10 users
            logger.debug(f"Checked {checked_users}/{len(all_users)} users, found {len(matching_users)} matches")

    result = list(set(matching_users))
    logger.info(f"Found {len(result)} users matching location criteria")
    return result

# ----------------------------
# RECOMMENDER CORE
# ----------------------------

def _product_text(p):
    """Build a flat text representation of a product for TF-IDF.

    Concatenates name/description/code plus nested category and brand
    fields, skipping anything missing or non-string.
    """
    text_parts = []

    # Basic product info
    text_parts.append(p.get('name', ''))
    text_parts.append(p.get('description', ''))
    text_parts.append(p.get('product_code', ''))

    # Category information (nested)
    category = p.get('category', {})
    if isinstance(category, dict):
        text_parts.append(category.get('name', ''))
        text_parts.append(category.get('code', ''))

    # Brand information (nested)
    brand = p.get('brand', {})
    if isinstance(brand, dict):
        text_parts.append(brand.get('name', ''))
        text_parts.append(brand.get('brand_code', ''))

    # Multiple categories if available
    categories = p.get('categories', [])
    if isinstance(categories, list):
        for cat in categories:
            if isinstance(cat, dict):
                text_parts.append(cat.get('name', ''))
                text_parts.append(cat.get('code', ''))

    # Join all non-empty text parts
    text = ' '.join([part for part in text_parts if part and isinstance(part, str)])
    return text.strip()


def recommend_products(user_id, top_n=5):
    """Recommend up to ``top_n`` products for ``user_id``.

    Pipeline:
        1. Resolve the user's location (first address).
        2. Load the user's cart history; cold-start fallback borrows
           history from users in the same city/suburb, else random picks.
        3. Vectorize product text with TF-IDF.
        4. Score products by cosine similarity to the mean history vector.
        5. Return the top-ranked products not already in the history,
           each annotated with a ``similarity_score``.

    Returns:
        list[dict]: recommended products, or [] on failure.
    """
    logger.info(f"=== STARTING RECOMMENDATION FOR USER {user_id} ===")
    logger.info(f"Parameters: top_n={top_n}")

    # Fetch all products
    all_products = fetch_all_products()
    if not all_products:
        logger.error("No products available - cannot generate recommendations")
        return []
    logger.info(f"Working with {len(all_products)} total products")

    # Step 1: Get user location
    logger.info("Step 1: Getting user location...")
    user_addresses = fetch_user_addresses(user_id)
    # FIX: use .get() so an address dict missing 'city'/'suburb' cannot
    # raise KeyError (fetch_user_addresses already treats these as optional).
    first_addr = user_addresses[0] if user_addresses and isinstance(user_addresses[0], dict) else {}
    user_city = first_addr.get("city")
    user_suburb = first_addr.get("suburb")
    logger.info(f"User location: city={user_city}, suburb={user_suburb}")

    # Step 2: Get user's history
    logger.info("Step 2: Getting user purchase history...")
    user_history = fetch_user_cart_items(user_id)
    logger.info(f"User has {len(user_history)} products in history")

    # Cold-start fallback
    if not user_history:
        logger.info("Step 2a: No user history found - using cold-start approach")
        local_users = fetch_users_by_location(city=user_city, suburb=user_suburb)
        logger.info(f"Found {len(local_users)} local users for cold-start")

        peer_history = []
        for uid in local_users[:10]:  # Limit to first 10 users for performance
            peer_items = fetch_user_cart_items(uid)
            peer_history.extend(peer_items)
            logger.debug(f"Local user {uid} contributed {len(peer_items)} items")

        peer_history = list(set(peer_history))
        logger.info(f"Collected {len(peer_history)} unique products from local users")

        if not peer_history:
            logger.info("No peer history found - returning random products")
            if isinstance(all_products, list) and len(all_products) > 0:
                sample_size = min(top_n, len(all_products))
                random_products = random.sample(all_products, sample_size)
                logger.info(f"Returning {len(random_products)} random products")
                return random_products
            else:
                logger.error(f"all_products is not a proper list: type={type(all_products)}, len={len(all_products) if hasattr(all_products, '__len__') else 'N/A'}")
                return []
        else:
            user_history = peer_history
            logger.info(f"Using peer history: {len(user_history)} products")

    # Step 3: Content vectorization
    logger.info("Step 3: Building content vectors...")
    # FIX: skip malformed products without an 'id' instead of raising KeyError.
    product_map = {p["id"]: p for p in all_products if isinstance(p, dict) and "id" in p}
    product_ids = list(product_map.keys())
    logger.info(f"Created product map with {len(product_map)} products")

    try:
        product_texts = [_product_text(product_map[pid]) for pid in product_ids]
        logger.info(f"Generated text representations for {len(product_texts)} products")

        # Filter out empty texts
        valid_indices = [i for i, text in enumerate(product_texts) if text.strip()]
        if not valid_indices:
            logger.error("No valid product texts found for vectorization")
            return []

        valid_product_ids = [product_ids[i] for i in valid_indices]
        valid_product_texts = [product_texts[i] for i in valid_indices]
        logger.info(f"Using {len(valid_product_texts)} products with valid text content")

        vectorizer = TfidfVectorizer(stop_words="english", max_features=1000)
        tfidf_matrix = vectorizer.fit_transform(valid_product_texts)
        logger.info(f"TF-IDF matrix shape: {tfidf_matrix.shape}")
    except Exception as e:
        logger.error(f"Error in vectorization: {e}")
        return []

    # Step 4: Compute similarity
    logger.info("Step 4: Computing similarities...")
    try:
        # FIX: O(1) lookups instead of repeated list scans (was O(n^2)).
        valid_id_set = set(valid_product_ids)
        id_to_index = {pid: i for i, pid in enumerate(valid_product_ids)}

        # Find user history items that exist in our valid product set
        user_history_valid = [pid for pid in user_history if pid in valid_id_set]
        logger.info(f"User history items in valid set: {len(user_history_valid)}")

        if not user_history_valid:
            logger.warning("No user history items found in valid product set - returning random products")
            sample_size = min(top_n, len(all_products))
            return random.sample(all_products, sample_size)

        # Get indices of user history items
        user_indices = [id_to_index[pid] for pid in user_history_valid]
        logger.debug(f"User product indices: {user_indices[:5]}...")  # Show first 5

        # Compute user preference vector (mean of user history items).
        # FIX: sparse .mean() yields np.matrix, which newer scikit-learn
        # rejects — convert to a plain ndarray before cosine_similarity.
        user_vec = np.asarray(tfidf_matrix[user_indices].mean(axis=0))
        logger.info(f"User preference vector shape: {user_vec.shape}")

        # Compute similarities
        sim_scores = cosine_similarity(user_vec, tfidf_matrix).flatten()
        logger.info(f"Computed {len(sim_scores)} similarity scores")
        logger.debug(f"Similarity score range: {sim_scores.min():.4f} to {sim_scores.max():.4f}")

        # Step 5: Rank and filter
        ranked_indices = np.argsort(sim_scores)[::-1]
        logger.info("Step 5: Ranking products and filtering...")

        history_set = set(user_history)  # FIX: O(1) exclusion checks
        recommendations = []
        considered_products = 0

        for idx in ranked_indices:
            pid = valid_product_ids[idx]
            score = sim_scores[idx]
            considered_products += 1

            if pid not in history_set:
                product = product_map[pid]
                # Add similarity score for debugging
                if isinstance(product, dict):
                    product['similarity_score'] = float(score)
                recommendations.append(product)
                logger.debug(f"Added recommendation {len(recommendations)}: Product {pid} (score: {score:.4f})")

            if len(recommendations) >= top_n:
                break

        logger.info(f"Considered {considered_products} products, generated {len(recommendations)} recommendations")

        if recommendations:
            rec_ids = [r.get('id') for r in recommendations if isinstance(r, dict)]
            rec_scores = [r.get('similarity_score', 0) for r in recommendations if isinstance(r, dict)]
            logger.info(f"Final recommendations: IDs {rec_ids}, Scores {rec_scores}")

        logger.info(f"=== RECOMMENDATION COMPLETE FOR USER {user_id} ===")
        return recommendations

    except Exception as e:
        logger.error(f"Error in similarity computation: {e}")
        logger.exception("Full traceback:")
        return []