Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| import xgboost as xgb | |
| import numpy as np | |
| from decimal import Decimal | |
| logger = logging.getLogger(__name__) | |
| class RefundModelService: | |
| """Service to predict refund percentage using XGBoost model""" | |
| _instance = None | |
| _model = None | |
| _feature_columns = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| return cls._instance | |
| def __init__(self): | |
| if self._model is None: | |
| self._load_model() | |
| def _load_model(self): | |
| """Load the XGBoost model and feature columns""" | |
| try: | |
| # Get the model directory (adjust path based on your Django setup) | |
| base_dir = Path(__file__).resolve().parent.parent # backend/ | |
| model_path = base_dir / 'customer_score_models' / 'xgb_model.ubj' | |
| features_path = base_dir / 'customer_score_models' / 'feature_columns.json' | |
| # Load model | |
| self._model = xgb.Booster() | |
| self._model.load_model(str(model_path)) | |
| logger.info(f"XGBoost model loaded from {model_path}") | |
| # Load feature columns | |
| with open(features_path, 'r') as f: | |
| self._feature_columns = json.load(f) | |
| logger.info(f"Feature columns loaded: {len(self._feature_columns)} features") | |
| except Exception as e: | |
| logger.error(f"Error loading refund model: {str(e)}", exc_info=True) | |
| raise | |
| def _prepare_features(self, order_detail): | |
| """ | |
| Prepare feature vector from OrderDetail instance | |
| Expected features from feature_columns.json: | |
| - user_age | |
| - address_pincode | |
| - base_price | |
| - order_quantity | |
| - product_price | |
| - discount_applied | |
| - days_to_return | |
| - is_exchanged | |
| - risk_score (calculated) | |
| - discount_ratio (calculated) | |
| - price_inverse (calculated) | |
| - days_inverse (calculated) | |
| - product_name_* (one-hot encoded) | |
| - payment_method_* (one-hot encoded) | |
| """ | |
| try: | |
| order = order_detail.order | |
| user = order.user | |
| address = order.shipping_address | |
| product = order_detail.product | |
| # Initialize feature dictionary with all features set to 0 | |
| features = {col: 0 for col in self._feature_columns} | |
| # Basic numerical features | |
| features['user_age'] = user.user_age if user.user_age else 30 # default age | |
| features['address_pincode'] = int(address.pincode) if address.pincode.isdigit() else 0 | |
| features['base_price'] = float(product.base_price) | |
| features['order_quantity'] = int(order_detail.order_quantity) | |
| features['product_price'] = float(order_detail.product_price) | |
| features['discount_applied'] = float(order_detail.discount_applied) | |
| features['days_to_return'] = int(order_detail.days_to_return) | |
| features['is_exchanged'] = 1 if order_detail.is_exchanged else 0 | |
| # Calculated features | |
| # Risk score (simple heuristic based on return behavior) | |
| features['risk_score'] = self._calculate_risk_score(order_detail) | |
| # Discount ratio | |
| if features['product_price'] > 0: | |
| features['discount_ratio'] = features['discount_applied'] / features['product_price'] | |
| else: | |
| features['discount_ratio'] = 0 | |
| # Price inverse (1/price for scaling) | |
| if features['product_price'] > 0: | |
| features['price_inverse'] = 1.0 / features['product_price'] | |
| else: | |
| features['price_inverse'] = 0 | |
| # Days inverse | |
| if features['days_to_return'] > 0: | |
| features['days_inverse'] = 1.0 / features['days_to_return'] | |
| else: | |
| features['days_inverse'] = 0 | |
| # One-hot encode product_name | |
| product_name_normalized = product.product_name.strip() | |
| product_feature_name = f"product_name_{product_name_normalized}" | |
| if product_feature_name in features: | |
| features[product_feature_name] = 1 | |
| # One-hot encode payment_method | |
| payment_method_feature = f"payment_method_{order.payment_method}" | |
| if payment_method_feature in features: | |
| features[payment_method_feature] = 1 | |
| # Convert to array in the correct order | |
| feature_array = np.array([features[col] for col in self._feature_columns], dtype=np.float32) | |
| return feature_array | |
| except Exception as e: | |
| logger.error(f"Error preparing features: {str(e)}", exc_info=True) | |
| raise | |
| def _calculate_risk_score(self, order_detail): | |
| """ | |
| Calculate a risk score for the order | |
| This is a simple heuristic - adjust based on your business logic | |
| """ | |
| risk = 0.0 | |
| # Higher discount = higher risk | |
| discount_pct = float(order_detail.discount_applied) | |
| if discount_pct > 20: | |
| risk += 0.3 | |
| elif discount_pct > 10: | |
| risk += 0.15 | |
| # Quick return = higher risk | |
| if order_detail.days_to_return < 7: | |
| risk += 0.3 | |
| elif order_detail.days_to_return < 14: | |
| risk += 0.15 | |
| # High price items = higher risk | |
| if float(order_detail.product_price) > 1000: | |
| risk += 0.2 | |
| return min(risk, 1.0) # Cap at 1.0 | |
| def predict_refund_percentage(self, order_detail): | |
| """ | |
| Predict refund percentage for an order detail | |
| Args: | |
| order_detail: OrderDetail instance | |
| Returns: | |
| float: Refund percentage (0-100) | |
| """ | |
| try: | |
| # Prepare features | |
| feature_array = self._prepare_features(order_detail) | |
| # Create DMatrix for prediction | |
| dmatrix = xgb.DMatrix(feature_array.reshape(1, -1)) | |
| # Predict | |
| prediction = self._model.predict(dmatrix, validate_features=False)[0] | |
| # Ensure prediction is in valid range (0-100) | |
| refund_percentage = max(0.0, min(100.0, float(prediction))) | |
| logger.info(f"Predicted refund percentage for OrderDetail {order_detail.id}: {refund_percentage:.2f}%") | |
| return refund_percentage | |
| except Exception as e: | |
| logger.error(f"Error predicting refund percentage: {str(e)}", exc_info=True) | |
| # Return a default safe percentage on error | |
| return 80.0 # Default to 80% refund on error | |
| def calculate_refund_amount(self, order_detail): | |
| """ | |
| Calculate actual refund amount based on predicted percentage | |
| Args: | |
| order_detail: OrderDetail instance | |
| Returns: | |
| Decimal: Refund amount | |
| """ | |
| try: | |
| refund_percentage = self.predict_refund_percentage(order_detail) | |
| # Calculate base amount (price after discount) | |
| total_price = order_detail.product_price * order_detail.order_quantity | |
| discount_amount = (order_detail.discount_applied / Decimal(100)) * total_price | |
| final_price = total_price - discount_amount | |
| # Apply refund percentage | |
| refund_amount = final_price * Decimal((100 - refund_percentage)) | |
| # Round to 2 decimal places | |
| refund_amount = refund_amount.quantize(Decimal('0.01')) | |
| logger.info( | |
| f"OrderDetail {order_detail.id}: " | |
| f"Total={final_price}, Refund%={refund_percentage:.2f}, " | |
| f"RefundAmount={refund_amount}" | |
| ) | |
| return refund_amount, refund_percentage | |
| except Exception as e: | |
| logger.error(f"Error calculating refund amount: {str(e)}", exc_info=True) | |
| # Return full price on error as a safe default | |
| total = order_detail.product_price * order_detail.order_quantity | |
| return total, 100.0 | |
| # Singleton instance getter | |
| def get_refund_model_service(): | |
| """Get or create the singleton RefundModelService instance""" | |
| return RefundModelService() | |