# delta-recommend / recommender.py
# (Hugging Face page residue preserved as comments: "rairo's picture",
#  "Update recommender.py", commit 568991e verified)
import requests
import random
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import logging
# Setup logging
logger = logging.getLogger(__name__)
DELTA_API = "https://delta-api.pricelyst.co.zw"
HEADERS = {"Accept": "application/json"}
# ----------------------------
# FETCHERS
# ----------------------------
def fetch_all_products_paginated(max_pages=5):
    """
    Fetch products from successive pages of the products endpoint.

    Args:
        max_pages: hard cap on the number of pages requested, to limit
            API calls in production.

    Returns:
        list: accumulated product dicts from every page fetched; whatever
        was collected so far if an error or unexpected payload stops the loop.
    """
    logger.info(f"Fetching products from multiple pages (max: {max_pages})...")
    all_products = []
    pages_fetched = 0
    current_page = 1
    while current_page <= max_pages:
        try:
            url = f"{DELTA_API}/api/products?page={current_page}"
            logger.debug(f"API call: GET {url}")
            res = requests.get(url, headers=HEADERS, timeout=30)
            res.raise_for_status()
            raw_response = res.json()
            data = raw_response.get("data", {})
            if isinstance(data, dict) and 'products' in data:
                products = data.get('products', [])
                total_pages = data.get('totalPages', 1)
                logger.info(f"Page {current_page}: Retrieved {len(products)} products")
                all_products.extend(products)
                pages_fetched += 1
                # Stop if we've reached the last page or if no products returned
                if current_page >= total_pages or not products:
                    logger.info(f"Reached end at page {current_page} of {total_pages}")
                    break
                current_page += 1
            else:
                logger.warning(f"Unexpected response format on page {current_page}")
                break
        except Exception as e:
            logger.error(f"Error fetching page {current_page}: {e}")
            break
    # FIX: previously this logged `current_page`, which overshoots by one when
    # the while-loop exhausts max_pages; report pages that actually returned data.
    logger.info(f"Total products fetched across {pages_fetched} pages: {len(all_products)}")
    return all_products
def fetch_all_products():
    """
    Fetch the first page of products, delegating to
    fetch_all_products_paginated when the payload indicates more pages
    and the first page looks incomplete.

    Returns:
        list: product dicts; empty on any error or unexpected payload shape.
    """
    logger.info("Fetching all products...")
    try:
        url = f"{DELTA_API}/api/products"
        logger.debug(f"API call: GET {url}")
        res = requests.get(url, headers=HEADERS, timeout=30)
        res.raise_for_status()  # Raise exception for bad status codes
        # Debug the raw response
        raw_response = res.json()
        logger.debug(f"Raw API response type: {type(raw_response)}")
        logger.debug(f"Raw API response keys: {list(raw_response.keys()) if isinstance(raw_response, dict) else 'Not a dict'}")
        # FIX: a bare-list payload previously crashed on .get() and was
        # swallowed by the generic handler; treat a non-dict payload as the
        # data itself (the debug line above already anticipates this case).
        data = raw_response.get("data", []) if isinstance(raw_response, dict) else raw_response
        logger.debug(f"Data type: {type(data)}")
        # Handle paginated response structure
        if isinstance(data, dict) and 'products' in data:
            logger.info("Detected paginated response format")
            products = data.get('products', [])
            total_count = data.get('totalItemCount', 0)
            current_page = data.get('currentPage', 1)
            total_pages = data.get('totalPages', 1)
            logger.info(f"Pagination info: Page {current_page}/{total_pages}, Total items: {total_count}")
            logger.info(f"Retrieved {len(products)} products from current page")
            # If there are multiple pages and we have few products, fetch more pages
            if total_pages > 1 and len(products) < 50:
                logger.info("Multiple pages available, fetching additional pages...")
                return fetch_all_products_paginated(max_pages=min(5, total_pages))
            # For now, just use the first page. In production, you might want to fetch all pages
            if isinstance(products, list):
                if products:
                    sample_ids = [p.get('id', 'no_id') for p in products[:3]]
                    logger.debug(f"Sample product IDs: {sample_ids}")
                return products
            else:
                logger.error(f"Products field is not a list: {type(products)}")
                return []
        elif isinstance(data, list):
            logger.info("Direct list response format")
            products = data
            if products:
                sample_ids = [p.get('id', 'no_id') for p in products[:3]]
                logger.debug(f"Sample product IDs: {sample_ids}")
            logger.info(f"Retrieved {len(products)} products")
            return products
        else:
            logger.error(f"Unexpected data format: {type(data)}, content: {data}")
            return []
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch products: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching products: {e}")
        logger.exception("Full traceback:")
        return []
def fetch_user_addresses(user_id):
    """Return the list of address dicts for *user_id*, or [] on any failure."""
    logger.info(f"Fetching addresses for user {user_id}...")
    try:
        endpoint = f"{DELTA_API}/api/addresses/{user_id}"
        logger.debug(f"API call: GET {endpoint}")
        response = requests.get(endpoint, headers=HEADERS, timeout=30)
        response.raise_for_status()
        # Decode and sanity-check the payload shape before using it.
        payload = response.json()
        logger.debug(f"Addresses raw response type: {type(payload)}")
        addresses = payload.get("data", [])
        logger.debug(f"Addresses data type: {type(addresses)}")
        logger.info(f"Found {len(addresses) if hasattr(addresses, '__len__') else 'unknown'} addresses for user {user_id}")
        if isinstance(addresses, list) and addresses:
            # Printable "city, suburb" preview per entry for the debug log.
            previews = [
                f"{entry.get('city', 'unknown')}, {entry.get('suburb', 'unknown')}"
                if isinstance(entry, dict)
                else f"invalid_address_type_{type(entry)}"
                for entry in addresses
            ]
            logger.debug(f"User {user_id} addresses: {previews}")
        return addresses if isinstance(addresses, list) else []
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch addresses for user {user_id}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching addresses for user {user_id}: {e}")
        logger.exception("Full traceback:")
        return []
def fetch_all_users():
    """Return every user record from the API, or [] on any failure."""
    logger.info("Fetching all users...")
    try:
        endpoint = f"{DELTA_API}/api/users"
        logger.debug(f"API call: GET {endpoint}")
        response = requests.get(endpoint, headers=HEADERS, timeout=30)
        response.raise_for_status()
        # Inspect the decoded payload before trusting its shape.
        payload = response.json()
        logger.debug(f"Users raw response type: {type(payload)}")
        users = payload.get("data", [])
        logger.debug(f"Users data type: {type(users)}")
        logger.info(f"Successfully fetched {len(users) if hasattr(users, '__len__') else 'unknown'} users")
        return users if isinstance(users, list) else []
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch users: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching users: {e}")
        logger.exception("Full traceback:")
        return []
def fetch_user_cart_items(user_id):
    """Return the unique product ids found across all of *user_id*'s carts."""
    logger.info(f"Fetching cart items for user {user_id}...")
    try:
        # Get user's carts
        carts_endpoint = f"{DELTA_API}/api/carts?user_id={user_id}"
        logger.debug(f"API call: GET {carts_endpoint}")
        carts_response = requests.get(carts_endpoint, headers=HEADERS, timeout=30)
        carts_response.raise_for_status()
        user_carts = carts_response.json().get("data", [])
        logger.info(f"Found {len(user_carts)} carts for user {user_id}")
        seen_products = set()
        # One items request per cart; union all product ids.
        for cart in user_carts:
            cart_id = cart.get("id")
            logger.debug(f"Fetching items for cart {cart_id}")
            items_endpoint = f"{DELTA_API}/api/cart-items?cart_id={cart_id}"
            items_response = requests.get(items_endpoint, headers=HEADERS, timeout=30)
            items_response.raise_for_status()
            cart_items = items_response.json().get("data", [])
            found_ids = [entry["product_id"] for entry in cart_items if "product_id" in entry]
            seen_products.update(found_ids)
            logger.debug(f"Cart {cart_id} contains {len(found_ids)} products")
        result = list(seen_products)
        logger.info(f"User {user_id} has {len(result)} unique products in cart history")
        logger.debug(f"User {user_id} product IDs: {result[:10]}...")  # Show first 10
        return result
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to fetch cart items for user {user_id}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching cart items for user {user_id}: {e}")
        return []
def fetch_users_by_location(city=None, suburb=None):
    """
    Return ids of users with at least one address matching the given location.

    City match is checked before suburb match for each address; the first
    matching address wins per user.

    Args:
        city: city name to match exactly, or None to skip city matching.
        suburb: suburb name to match exactly, or None to skip suburb matching.

    Returns:
        list: unique matching user ids (order not guaranteed).
    """
    logger.info(f"Fetching users by location - city: {city}, suburb: {suburb}")
    all_users = fetch_all_users()
    if not all_users:
        logger.warning("No users found, cannot filter by location")
        return []
    # FIX: collect directly into a set (the old code appended to a list, set a
    # `user_matches` flag that was never read, and deduplicated at the end).
    matching_users = set()
    checked_users = 0
    for u in all_users:
        uid = u.get("id")
        if not uid:
            continue
        checked_users += 1
        # NOTE: one addresses API call per user — this is N+1 by design here.
        for addr in fetch_user_addresses(uid):
            if city and addr.get("city") == city:
                matching_users.add(uid)
                logger.debug(f"User {uid} matches city: {city}")
                break
            elif suburb and addr.get("suburb") == suburb:
                matching_users.add(uid)
                logger.debug(f"User {uid} matches suburb: {suburb}")
                break
        if checked_users % 10 == 0:  # Log progress every 10 users
            logger.debug(f"Checked {checked_users}/{len(all_users)} users, found {len(matching_users)} matches")
    result = list(matching_users)
    logger.info(f"Found {len(result)} users matching location criteria")
    return result
# ----------------------------
# RECOMMENDER CORE
# ----------------------------
def recommend_products(user_id, top_n=5):
    """
    Content-based product recommendations for a user.

    Builds TF-IDF vectors from product text (name, description, codes,
    category and brand info), averages the vectors of the products in the
    user's cart history, and returns the most similar products the user has
    not already interacted with. Falls back to local users' history when the
    user has none (cold start), and finally to a random sample.

    Args:
        user_id: id of the user to recommend for.
        top_n: maximum number of products to return.

    Returns:
        list: product dicts (annotated with 'similarity_score' when the
        similarity path is used); empty on unrecoverable errors.
    """
    logger.info(f"=== STARTING RECOMMENDATION FOR USER {user_id} ===")
    logger.info(f"Parameters: top_n={top_n}")
    # Fetch all products
    all_products = fetch_all_products()
    if not all_products:
        logger.error("No products available - cannot generate recommendations")
        return []
    logger.info(f"Working with {len(all_products)} total products")
    # Step 1: Get user location (used only for the cold-start peer lookup)
    logger.info("Step 1: Getting user location...")
    user_addresses = fetch_user_addresses(user_id)
    user_city = user_addresses[0]["city"] if user_addresses else None
    user_suburb = user_addresses[0]["suburb"] if user_addresses else None
    logger.info(f"User location: city={user_city}, suburb={user_suburb}")
    # Step 2: Get user's history
    logger.info("Step 2: Getting user purchase history...")
    user_history = fetch_user_cart_items(user_id)
    logger.info(f"User has {len(user_history)} products in history")
    # Cold-start fallback: borrow history from users in the same location
    if not user_history:
        logger.info("Step 2a: No user history found - using cold-start approach")
        local_users = fetch_users_by_location(city=user_city, suburb=user_suburb)
        logger.info(f"Found {len(local_users)} local users for cold-start")
        peer_history = []
        for uid in local_users[:10]:  # Limit to first 10 users for performance
            peer_items = fetch_user_cart_items(uid)
            peer_history.extend(peer_items)
            logger.debug(f"Local user {uid} contributed {len(peer_items)} items")
        peer_history = list(set(peer_history))
        logger.info(f"Collected {len(peer_history)} unique products from local users")
        if not peer_history:
            logger.info("No peer history found - returning random products")
            # Last resort: random sample from the catalogue.
            if isinstance(all_products, list) and len(all_products) > 0:
                sample_size = min(top_n, len(all_products))
                random_products = random.sample(all_products, sample_size)
                logger.info(f"Returning {len(random_products)} random products")
                return random_products
            else:
                logger.error(f"all_products is not a proper list: type={type(all_products)}, len={len(all_products) if hasattr(all_products, '__len__') else 'N/A'}")
                return []
        else:
            user_history = peer_history
            logger.info(f"Using peer history: {len(user_history)} products")
    # Step 3: Content vectorization
    logger.info("Step 3: Building content vectors...")
    product_map = {p["id"]: p for p in all_products}
    product_ids = list(product_map.keys())
    logger.info(f"Created product map with {len(product_map)} products")

    def product_text(p):
        """Concatenate a product's textual fields into one TF-IDF document."""
        text_parts = []
        # Basic product info
        text_parts.append(p.get('name', ''))
        text_parts.append(p.get('description', ''))
        text_parts.append(p.get('product_code', ''))
        # Category information (nested)
        category = p.get('category', {})
        if isinstance(category, dict):
            text_parts.append(category.get('name', ''))
            text_parts.append(category.get('code', ''))
        # Brand information (nested)
        brand = p.get('brand', {})
        if isinstance(brand, dict):
            text_parts.append(brand.get('name', ''))
            text_parts.append(brand.get('brand_code', ''))
        # Multiple categories if available
        categories = p.get('categories', [])
        if isinstance(categories, list):
            for cat in categories:
                if isinstance(cat, dict):
                    text_parts.append(cat.get('name', ''))
                    text_parts.append(cat.get('code', ''))
        # Join all non-empty text parts
        text = ' '.join([part for part in text_parts if part and isinstance(part, str)])
        return text.strip()

    try:
        product_texts = [product_text(product_map[pid]) for pid in product_ids]
        logger.info(f"Generated text representations for {len(product_texts)} products")
        # Filter out empty texts (TF-IDF cannot use them)
        valid_indices = [i for i, text in enumerate(product_texts) if text.strip()]
        if not valid_indices:
            logger.error("No valid product texts found for vectorization")
            return []
        valid_product_ids = [product_ids[i] for i in valid_indices]
        valid_product_texts = [product_texts[i] for i in valid_indices]
        logger.info(f"Using {len(valid_product_texts)} products with valid text content")
        vectorizer = TfidfVectorizer(stop_words="english", max_features=1000)
        tfidf_matrix = vectorizer.fit_transform(valid_product_texts)
        logger.info(f"TF-IDF matrix shape: {tfidf_matrix.shape}")
    except Exception as e:
        logger.error(f"Error in vectorization: {e}")
        return []
    # Step 4: Compute similarity
    logger.info("Step 4: Computing similarities...")
    try:
        # FIX: use a set for membership tests (was O(n*m) list scans)
        valid_id_set = set(valid_product_ids)
        user_history_valid = [pid for pid in user_history if pid in valid_id_set]
        logger.info(f"User history items in valid set: {len(user_history_valid)}")
        if not user_history_valid:
            logger.warning("No user history items found in valid product set - returning random products")
            sample_size = min(top_n, len(all_products))
            return random.sample(all_products, sample_size)
        # FIX: precompute id -> row index (repeated .index() calls were O(n^2))
        index_by_id = {pid: i for i, pid in enumerate(valid_product_ids)}
        user_indices = [index_by_id[pid] for pid in user_history_valid]
        logger.debug(f"User product indices: {user_indices[:5]}...")  # Show first 5
        # Compute user preference vector (mean of user history items).
        # FIX: sparse .mean() yields np.matrix; convert to ndarray for sklearn.
        user_vec = np.asarray(tfidf_matrix[user_indices].mean(axis=0))
        logger.info(f"User preference vector shape: {user_vec.shape}")
        # Compute similarities
        sim_scores = cosine_similarity(user_vec, tfidf_matrix).flatten()
        logger.info(f"Computed {len(sim_scores)} similarity scores")
        logger.debug(f"Similarity score range: {sim_scores.min():.4f} to {sim_scores.max():.4f}")
        # Step 5: Rank and filter
        ranked_indices = np.argsort(sim_scores)[::-1]
        logger.info("Step 5: Ranking products and filtering...")
        recommendations = []
        considered_products = 0
        history_set = set(user_history)  # FIX: O(1) exclusion of seen products
        for idx in ranked_indices:
            pid = valid_product_ids[idx]
            score = sim_scores[idx]
            considered_products += 1
            if pid not in history_set:
                product = product_map[pid]
                # Add similarity score for debugging
                if isinstance(product, dict):
                    # FIX: copy before annotating so shared product dicts
                    # (also referenced by all_products) are not mutated.
                    product = dict(product)
                    product['similarity_score'] = float(score)
                recommendations.append(product)
                logger.debug(f"Added recommendation {len(recommendations)}: Product {pid} (score: {score:.4f})")
                if len(recommendations) >= top_n:
                    break
        logger.info(f"Considered {considered_products} products, generated {len(recommendations)} recommendations")
        if recommendations:
            rec_ids = [r.get('id') for r in recommendations if isinstance(r, dict)]
            rec_scores = [r.get('similarity_score', 0) for r in recommendations if isinstance(r, dict)]
            logger.info(f"Final recommendations: IDs {rec_ids}, Scores {rec_scores}")
        logger.info(f"=== RECOMMENDATION COMPLETE FOR USER {user_id} ===")
        return recommendations
    except Exception as e:
        logger.error(f"Error in similarity computation: {e}")
        logger.exception("Full traceback:")
        return []