# services/segmentation_service.py
# (removed web-viewer upload residue: "minhvtt — Upload 15 files — 34b2632 verified")
"""
Event-Centric Audience Segmentation Service
Author: AI Generated
Created: 2025-11-24 (Refactored for event-centric)
Purpose: Cluster users per event and save to EventAudienceSegment
"""
import numpy as np
from typing import List, Dict, Tuple
from datetime import datetime
from sklearn.cluster import KMeans
from bson import ObjectId
from database import db
from config import settings
from models.event_models import EventAudienceSegment, MarketingContent
from services.data_aggregation import UserDataAggregator
from services.preprocessing import DataCleaner
from services.monitoring import monitor
from services.model_registry import registry
class SegmentationService:
    """
    Event-centric user segmentation via K-Means clustering.

    Pipeline: aggregate per-event user features -> clean/normalize ->
    K-Means -> interpret centroids -> persist EventAudienceSegment
    documents -> register model -> log monitoring metrics.
    """

    # Single ordered spec: (feature name, source key in aggregated dict, default).
    # Keeping extraction keys and feature names in ONE place prevents the
    # feature matrix and self.feature_names from drifting out of sync.
    _FEATURE_SPEC = [
        ('event_tickets', 'event_ticket_count', 0),
        ('event_spend', 'event_total_spend', 0),
        ('is_follower', 'is_follower', 0),
        # Large default recency marks users with no recorded global activity.
        ('global_recency', 'global_recency', 999999),
        ('global_frequency', 'global_frequency', 0),
        ('global_monetary', 'global_monetary', 0),
    ]

    def __init__(self, event_code: str, n_clusters: int = None):
        """
        Initialize segmentation for a specific event.

        Args:
            event_code: Event identifier.
            n_clusters: Number of segments (default: settings.N_CLUSTERS).
        """
        self.event_code = event_code
        # `is None` rather than `or`: an explicit (invalid) 0 should fail
        # loudly in KMeans instead of being silently replaced.
        self.n_clusters = n_clusters if n_clusters is not None else settings.N_CLUSTERS
        self.aggregator = UserDataAggregator(event_code)
        self.data_cleaner = DataCleaner()
        self.kmeans = None   # fitted KMeans (set by fit_clustering)
        self.scaler = None   # fitted scaler from DataCleaner (set by fit_clustering)
        self.feature_names = [name for name, _, _ in self._FEATURE_SPEC]

    def prepare_feature_matrix(self, user_data: List[Dict]) -> Tuple[np.ndarray, List[str]]:
        """
        Convert aggregated user data into a feature matrix.

        Hybrid approach:
          - Event-specific: ticket count, spend, follower flag.
          - Global: RFM features (user's overall purchasing power).

        Args:
            user_data: Per-user dicts from the aggregator; each must contain
                'user_id', other keys fall back to _FEATURE_SPEC defaults.

        Returns:
            (feature_matrix, user_ids) with rows aligned to user_ids.
        """
        self.feature_names = [name for name, _, _ in self._FEATURE_SPEC]
        rows = [
            [user.get(key, default) for _, key, default in self._FEATURE_SPEC]
            for user in user_data
        ]
        user_ids = [str(user['user_id']) for user in user_data]
        # Explicit reshape so an empty user list still yields a 2-D matrix
        # instead of a shape-(0,) array.
        matrix = np.asarray(rows, dtype=float).reshape(
            len(user_ids), len(self._FEATURE_SPEC)
        )
        return matrix, user_ids

    def fit_clustering(self, feature_matrix: np.ndarray) -> Tuple["KMeans", List[int]]:
        """
        Clean/normalize features and fit K-Means.

        Args:
            feature_matrix: Raw (n_users, n_features) matrix.

        Returns:
            (fitted KMeans, indices of the rows that survived cleaning).

        Raises:
            ValueError: If cleaning leaves fewer rows than n_clusters.
        """
        normalized_features, valid_indices = self.data_cleaner.clean_user_features(feature_matrix)
        # Keep the fitted scaler so centroids can be mapped back to
        # original units in interpret_cluster.
        self.scaler = self.data_cleaner.scaler
        # Cleaning may drop rows; re-check feasibility here, otherwise
        # KMeans fails with a less helpful error.
        if len(valid_indices) < self.n_clusters:
            raise ValueError(
                f"Only {len(valid_indices)} valid users after cleaning; "
                f"need at least {self.n_clusters} for clustering"
            )
        print(f"🔄 Fitting K-Means with {self.n_clusters} clusters...")
        self.kmeans = KMeans(
            n_clusters=self.n_clusters,
            random_state=settings.RANDOM_STATE,
            n_init=10
        )
        self.kmeans.fit(normalized_features)
        print(f"✓ Clustering complete. Inertia: {self.kmeans.inertia_:.2f}")
        return self.kmeans, valid_indices

    def interpret_cluster(self, cluster_id: int) -> Dict:
        """
        Describe one cluster by its centroid in original feature units.

        Requires fit_clustering to have run (kmeans and scaler set).

        Args:
            cluster_id: Index into the fitted cluster centers.

        Returns:
            Dict with 'segment_name', 'criteria' (feature -> centroid
            value), and 'cluster_id'.
        """
        centroid = self.kmeans.cluster_centers_[cluster_id]
        # Centroids live in normalized space; invert for readable criteria.
        centroid_original = self.scaler.inverse_transform([centroid])[0]
        interpretation = {
            name: float(value)
            for name, value in zip(self.feature_names, centroid_original)
        }
        segment_name = self._generate_segment_name(
            interpretation.get('event_spend', 0),
            interpretation.get('event_tickets', 0),
            interpretation.get('global_monetary', 0),
            interpretation.get('is_follower', 0)
        )
        return {
            "segment_name": segment_name,
            "criteria": interpretation,
            "cluster_id": cluster_id
        }

    def _generate_segment_name(
        self,
        event_spend: float,
        event_tickets: float,
        global_monetary: float,
        is_follower: float
    ) -> str:
        """
        Map centroid characteristics to a Vietnamese segment label.

        Rules are evaluated top-down, first match wins. Thresholds are
        heuristic spend cut-offs (presumably VND — confirm with business).
        """
        # Heavy spenders with multiple tickets for this event
        if event_spend > 500000 and event_tickets > 2:
            return "Khách Hàng VIP Sự Kiện"
        # Bought tickets with moderate spend
        if event_tickets > 0 and event_spend > 100000:
            return "Khách Hàng Tích Cực"
        # Follows the event but has not purchased yet
        if is_follower > 0.5 and event_tickets == 0:
            return "Người Theo Dõi Tiềm Năng"
        # High overall value, low engagement with this event
        if global_monetary > 1000000 and event_spend < 100000:
            return "Khách Hàng Chưa Khai Phá"
        # Default: low engagement all around
        return "Khách Hàng Ít Tương Tác"

    def save_segments_to_db(
        self,
        cluster_interpretations: List[Dict],
        user_ids: List[str],
        labels: np.ndarray
    ) -> List["ObjectId"]:
        """
        Persist one EventAudienceSegment document per cluster.

        Args:
            cluster_interpretations: interpret_cluster output per cluster.
            user_ids: User ids aligned row-for-row with `labels`.
            labels: Cluster assignment per user.

        Returns:
            Inserted MongoDB ObjectIds, in cluster order.
        """
        print("🔄 Saving event segments to database...")
        segment_ids = []
        for cluster_info in cluster_interpretations:
            cluster_id = cluster_info['cluster_id']
            # Members of this cluster (labels align with user_ids by row)
            member_rows = np.where(labels == cluster_id)[0]
            member_ids = [ObjectId(user_ids[i]) for i in member_rows]
            # One timestamp so created_at and last_updated match exactly.
            # NOTE(review): naive UTC kept for schema compatibility.
            now = datetime.utcnow()
            segment = EventAudienceSegment(
                event_code=self.event_code,
                segment_name=cluster_info['segment_name'],
                segment_type=cluster_info['segment_name'],  # TODO: finer taxonomy
                user_count=len(member_ids),
                user_ids=member_ids,
                criteria=cluster_info['criteria'],
                marketing_content=None,  # filled in later by the GenAI step
                created_at=now,
                last_updated=now
            )
            result = db.event_audience_segments.insert_one(
                segment.dict(by_alias=True, exclude={'id'})  # pydantic v1 export API
            )
            segment_ids.append(result.inserted_id)
            print(f" ✓ '{segment.segment_name}': {len(member_ids)} users")
        return segment_ids

    def run_segmentation(self) -> List["ObjectId"]:
        """
        Execute the full event-centric segmentation pipeline.

        Returns:
            Inserted EventAudienceSegment ObjectIds; empty list when there
            are too few users to cluster.

        Raises:
            Re-raises any pipeline exception after logging it to `monitor`.
        """
        import time
        start_time = time.time()

        print("=" * 60)
        print(f"🚀 Segmenting Event: {self.event_code}")
        print("=" * 60)

        try:
            # Step 1: Aggregate event users
            user_data = self.aggregator.aggregate_user_features()
            if len(user_data) < self.n_clusters:
                print(f"⚠ Not enough users ({len(user_data)}) for {self.n_clusters} clusters")
                return []

            # Step 2: Prepare features
            feature_matrix, user_ids = self.prepare_feature_matrix(user_data)
            print(f"✓ Feature matrix: {feature_matrix.shape}")

            # Step 3: Clustering (cleaning may drop rows; keep ids aligned)
            self.kmeans, valid_indices = self.fit_clustering(feature_matrix)
            user_ids = [user_ids[i] for i in valid_indices]

            # Step 4: Labels — kmeans.labels_ already corresponds to the
            # cleaned rows, so no re-transform of the features is needed
            # (the original re-computed normalized features and discarded them).
            labels = self.kmeans.labels_

            # Step 5: Interpret clusters
            cluster_interpretations = [
                self.interpret_cluster(i) for i in range(self.n_clusters)
            ]

            # Step 6: Save to EventAudienceSegment
            segment_ids = self.save_segments_to_db(
                cluster_interpretations, user_ids, labels
            )

            # Step 7: Register the fitted model for later reuse
            metadata = {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters,
                "n_users": len(user_ids),
                "inertia": float(self.kmeans.inertia_)
            }
            registry.save_model(
                self.kmeans,
                f"kmeans_{self.event_code}",
                metadata
            )

            # Step 8: Monitoring metrics
            execution_time = time.time() - start_time
            metrics = {
                "event_code": self.event_code,
                "n_users": len(user_ids),
                "n_segments": self.n_clusters,
                "inertia": float(self.kmeans.inertia_),
                "execution_time": execution_time,
                "centroids": self.kmeans.cluster_centers_.tolist()
            }
            monitor.log_segmentation_run(metrics)

            print("=" * 60)
            print("✅ Segmentation Complete!")
            print(f"⏱️ Time: {execution_time:.2f}s")
            print("=" * 60)
            return segment_ids

        except Exception as e:
            # Log with context, then re-raise so callers can alert/retry.
            monitor.log_error("segmentation", e, {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters
            })
            raise