| """
|
| Event-Centric Audience Segmentation Service
|
| Author: AI Generated
|
| Created: 2025-11-24 (Refactored for event-centric)
|
| Purpose: Cluster users per event and save to EventAudienceSegment
|
| """
|
|
|
import time
from datetime import datetime
from typing import Dict, List, Tuple

import numpy as np
from bson import ObjectId
from sklearn.cluster import KMeans

from config import settings
from database import db
from models.event_models import EventAudienceSegment, MarketingContent
from services.data_aggregation import UserDataAggregator
from services.model_registry import registry
from services.monitoring import monitor
from services.preprocessing import DataCleaner
|
|
|
|
|
class SegmentationService:
    """Event-centric user segmentation via K-Means clustering.

    Aggregates per-event and global (RFM) user features, clusters users
    with K-Means, interprets each cluster centroid into a named segment,
    and persists the segments to the ``event_audience_segments`` collection.
    """

    # Column order of the feature matrix. Must stay in sync with the row
    # layout built in prepare_feature_matrix() and the centroid
    # interpretation in interpret_cluster().
    _FEATURE_NAMES = [
        'event_tickets',
        'event_spend',
        'is_follower',
        'global_recency',
        'global_frequency',
        'global_monetary',
    ]

    def __init__(self, event_code: str, n_clusters: int = None):
        """
        Initialize segmentation for a specific event.

        Args:
            event_code: Event identifier
            n_clusters: Number of segments (default: from settings)
        """
        self.event_code = event_code
        self.n_clusters = n_clusters or settings.N_CLUSTERS
        self.aggregator = UserDataAggregator(event_code)
        self.data_cleaner = DataCleaner()
        self.kmeans = None        # fitted KMeans model, set by fit_clustering()
        self.scaler = None        # scaler borrowed from DataCleaner after fitting
        self.feature_names = []   # populated by prepare_feature_matrix()

    def prepare_feature_matrix(self, user_data: List[Dict]) -> Tuple[np.ndarray, List[str]]:
        """
        Convert aggregated user data into a numeric feature matrix.

        Uses hybrid approach:
        - Event-specific: ticket_count, spend, is_follower
        - Global: RFM (user's overall power)

        Args:
            user_data: Per-user dicts from the aggregator; each must contain
                'user_id'. Missing feature keys fall back to neutral defaults
                (recency defaults to 999999, i.e. "effectively never").

        Returns: (feature_matrix, user_ids) with rows aligned to user_ids.
        """
        feature_matrix = []
        user_ids = []

        for user in user_data:
            # Row layout mirrors _FEATURE_NAMES exactly.
            feature_matrix.append([
                user.get('event_ticket_count', 0),
                user.get('event_total_spend', 0),
                user.get('is_follower', 0),
                user.get('global_recency', 999999),
                user.get('global_frequency', 0),
                user.get('global_monetary', 0),
            ])
            user_ids.append(str(user['user_id']))

        self.feature_names = list(self._FEATURE_NAMES)
        return np.array(feature_matrix), user_ids

    def fit_clustering(self, feature_matrix: np.ndarray) -> "Tuple[KMeans, List[int]]":
        """
        Clean/normalize features and fit K-Means.

        Args:
            feature_matrix: Raw (n_users, n_features) matrix.

        Returns:
            (fitted KMeans model, indices of rows that survived cleaning).

        Raises:
            ValueError: if fewer valid rows remain than requested clusters.
        """
        normalized_features, valid_indices = self.data_cleaner.clean_user_features(feature_matrix)

        # Keep the fitted scaler so centroids can be mapped back to
        # original units in interpret_cluster().
        self.scaler = self.data_cleaner.scaler

        # Cleaning may drop rows; K-Means needs n_samples >= n_clusters.
        # Fail early with a clear message instead of a deep sklearn error.
        if len(valid_indices) < self.n_clusters:
            raise ValueError(
                f"Only {len(valid_indices)} valid users after cleaning; "
                f"need at least {self.n_clusters} for clustering"
            )

        print(f"🔄 Fitting K-Means with {self.n_clusters} clusters...")
        self.kmeans = KMeans(
            n_clusters=self.n_clusters,
            random_state=settings.RANDOM_STATE,
            n_init=10
        )
        self.kmeans.fit(normalized_features)

        print(f"✓ Clustering complete. Inertia: {self.kmeans.inertia_:.2f}")

        return self.kmeans, valid_indices

    def interpret_cluster(self, cluster_id: int) -> Dict:
        """
        Describe a fitted cluster in original (un-scaled) feature units.

        Must be called after fit_clustering().

        Returns:
            {'segment_name', 'criteria', 'cluster_id'} where criteria maps
            feature names to the centroid's value in original units.
        """
        centroid = self.kmeans.cluster_centers_[cluster_id]
        # Centroids live in normalized space; invert the scaling so the
        # thresholds in _generate_segment_name apply in real units.
        centroid_original = self.scaler.inverse_transform([centroid])[0]

        interpretation = {
            feature_name: float(value)
            for feature_name, value in zip(self.feature_names, centroid_original)
        }

        segment_name = self._generate_segment_name(
            interpretation.get('event_spend', 0),
            interpretation.get('event_tickets', 0),
            interpretation.get('global_monetary', 0),
            interpretation.get('is_follower', 0),
        )

        return {
            "segment_name": segment_name,
            "criteria": interpretation,
            "cluster_id": cluster_id
        }

    def _generate_segment_name(
        self,
        event_spend: float,
        event_tickets: float,
        global_monetary: float,
        is_follower: float
    ) -> str:
        """Generate Vietnamese segment name from centroid characteristics.

        Thresholds are presumably in VND and ticket counts; is_follower is
        a 0-1 rate — TODO confirm units against the aggregator.
        """
        # High spend AND multiple tickets on this event -> VIP.
        if event_spend > 500000 and event_tickets > 2:
            return "Khách Hàng VIP Sự Kiện"
        # Bought at least one ticket with meaningful spend -> active.
        elif event_tickets > 0 and event_spend > 100000:
            return "Khách Hàng Tích Cực"
        # Follows but has not bought for this event -> potential.
        elif is_follower > 0.5 and event_tickets == 0:
            return "Người Theo Dõi Tiềm Năng"
        # Big spender overall but little spend here -> untapped.
        elif global_monetary > 1000000 and event_spend < 100000:
            return "Khách Hàng Chưa Khai Phá"
        # Everyone else -> low engagement.
        else:
            return "Khách Hàng Ít Tương Tác"

    def save_segments_to_db(
        self,
        cluster_interpretations: List[Dict],
        user_ids: List[str],
        labels: np.ndarray
    ) -> "List[ObjectId]":
        """
        Persist one EventAudienceSegment document per cluster.

        Args:
            cluster_interpretations: Output of interpret_cluster() per cluster.
            user_ids: Ids aligned row-for-row with labels.
            labels: Cluster label per user (from the fitted KMeans).

        Returns:
            Inserted MongoDB ObjectIds, in cluster order.
        """
        print("🔄 Saving event segments to database...")

        segment_ids = []

        for cluster_info in cluster_interpretations:
            cluster_id = cluster_info['cluster_id']

            # Collect the users assigned to this cluster.
            member_indices = np.where(labels == cluster_id)[0]
            member_ids = [ObjectId(user_ids[i]) for i in member_indices]

            segment = EventAudienceSegment(
                event_code=self.event_code,
                segment_name=cluster_info['segment_name'],
                segment_type=cluster_info['segment_name'],
                user_count=len(member_ids),
                user_ids=member_ids,
                criteria=cluster_info['criteria'],
                marketing_content=None,
                # NOTE(review): naive UTC timestamps (datetime.utcnow is
                # deprecated in 3.12) — kept for schema compatibility.
                created_at=datetime.utcnow(),
                last_updated=datetime.utcnow()
            )

            result = db.event_audience_segments.insert_one(
                segment.dict(by_alias=True, exclude={'id'})
            )
            segment_ids.append(result.inserted_id)

            print(f" ✓ '{segment.segment_name}': {len(member_ids)} users")

        return segment_ids

    def run_segmentation(self) -> "List[ObjectId]":
        """
        Execute the event-centric segmentation pipeline.

        Steps: aggregate user features -> build matrix -> cluster ->
        interpret centroids -> persist segments -> register model and
        log run metrics.

        Returns:
            Inserted segment ObjectIds, or [] when there are too few users.

        Raises:
            Re-raises any pipeline exception after logging it to the monitor.
        """
        start_time = time.time()

        print("=" * 60)
        print(f"🚀 Segmenting Event: {self.event_code}")
        print("=" * 60)

        try:
            user_data = self.aggregator.aggregate_user_features()

            if len(user_data) < self.n_clusters:
                print(f"⚠ Not enough users ({len(user_data)}) for {self.n_clusters} clusters")
                return []

            feature_matrix, user_ids = self.prepare_feature_matrix(user_data)
            print(f"✓ Feature matrix: {feature_matrix.shape}")

            self.kmeans, valid_indices = self.fit_clustering(feature_matrix)
            # Rows dropped during cleaning must be dropped from the id
            # list too, so labels and user_ids stay aligned.
            user_ids = [user_ids[i] for i in valid_indices]
            labels = self.kmeans.labels_

            cluster_interpretations = [
                self.interpret_cluster(i) for i in range(self.n_clusters)
            ]

            segment_ids = self.save_segments_to_db(
                cluster_interpretations,
                user_ids,
                labels
            )

            # Persist the fitted model for later scoring/reuse.
            registry.save_model(
                self.kmeans,
                f"kmeans_{self.event_code}",
                {
                    "event_code": self.event_code,
                    "n_clusters": self.n_clusters,
                    "n_users": len(user_ids),
                    "inertia": float(self.kmeans.inertia_)
                }
            )

            execution_time = time.time() - start_time
            monitor.log_segmentation_run({
                "event_code": self.event_code,
                "n_users": len(user_ids),
                "n_segments": self.n_clusters,
                "inertia": float(self.kmeans.inertia_),
                "execution_time": execution_time,
                "centroids": self.kmeans.cluster_centers_.tolist()
            })

            print("=" * 60)
            print("✅ Segmentation Complete!")
            print(f"⏱️ Time: {execution_time:.2f}s")
            print("=" * 60)

            return segment_ids

        except Exception as e:
            # Record failure context before propagating to the caller.
            monitor.log_error("segmentation", e, {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters
            })
            raise
|
|
|