# Aus_F/services/data_aggregation.py
# Uploaded by minhvtt -- "Upload 19 files" (commit ea06065, verified)
"""
Data Aggregation Pipeline for Event-Centric User Segmentation
Author: AI Generated
Created: 2025-11-24 (Fixed for actual MongoDB schema)
Purpose: Aggregate user features based on EMBEDDED UserFollows and nested comments
"""
from typing import List, Dict
from datetime import datetime
from bson import ObjectId
from database import db
from config import settings
class UserDataAggregator:
    """
    Aggregate per-user behavioural features for a single event.

    Data sources actually queried by this class:
    - the ``users`` collection, including the embedded ``UserFollows`` array
    - the payments collection named by ``settings.COLLECTION_PAYMENTS``

    NOTE(review): the file header also mentions nested comments
    (PostSocialMedia.Images.UserCommentPosts) as an interaction source, but
    no stage of the pipeline built here reads them. Confirm whether comment
    based interaction is handled elsewhere or still needs to be implemented.
    """

    def __init__(self, event_code: str):
        """
        Initialize the aggregator for a specific event.

        Args:
            event_code: Event identifier (ObjectId string). It is converted
                with ``ObjectId(...)`` only when the pipeline is built, so an
                invalid value fails in ``aggregate_user_features``, not here.
        """
        self.event_code = event_code
        self.db = db

    @staticmethod
    def _completed_payments_lookup(as_field: str, extra_conditions=()) -> Dict:
        """Build a ``$lookup`` stage joining a user's "Completed" payments.

        Args:
            as_field: Name of the array field that receives the payments.
            extra_conditions: Additional ``$expr`` conditions inserted between
                the user-id match and the status match.

        Returns:
            The ``$lookup`` stage as a dict.
        """
        conditions = [{"$eq": ["$userId", "$$user_id"]}]
        conditions.extend(extra_conditions)
        conditions.append({"$eq": ["$status", "Completed"]})
        return {
            "$lookup": {
                "from": settings.COLLECTION_PAYMENTS,
                "let": {"user_id": "$_id"},
                "pipeline": [
                    {"$match": {"$expr": {"$and": conditions}}},
                ],
                "as": as_field,
            }
        }

    def _build_pipeline(self) -> List[Dict]:
        """Build the aggregation pipeline run against the ``users`` collection.

        Returns:
            The ordered list of aggregation stages.
        """
        # Convert once; both the ticket lookup and the follow check need it.
        event_oid = ObjectId(self.event_code)

        return [
            # Stage 1: Start with Active users only
            {"$match": {"status": "Active"}},
            # Stage 2: Completed ticket purchases for THIS EVENT
            self._completed_payments_lookup(
                "event_tickets",
                extra_conditions=[{"$eq": ["$eventCode", event_oid]}],
            ),
            # Stage 3: Flag users following THIS EVENT (embedded UserFollows).
            # $ifNull guards users that have no UserFollows array at all.
            {
                "$addFields": {
                    "is_following_event": {
                        "$cond": {
                            "if": {
                                "$in": [
                                    event_oid,
                                    {
                                        "$map": {
                                            "input": {"$ifNull": ["$UserFollows", []]},
                                            "as": "follow",
                                            "in": "$$follow.eventCode",
                                        }
                                    },
                                ]
                            },
                            "then": 1,
                            "else": 0,
                        }
                    }
                }
            },
            # Stage 4: Keep only users who interacted with THIS EVENT.
            # This filter reads nothing produced by the global payments
            # lookup, so it runs first: the unfiltered lookup below is then
            # executed only for the users that survive.
            {
                "$match": {
                    "$or": [
                        {"event_tickets": {"$ne": []}},  # Bought tickets
                        {"is_following_event": 1},  # Following event
                    ]
                }
            },
            # Stage 5: ALL completed payments of the user, for global RFM
            self._completed_payments_lookup("all_payments"),
            # Stage 6: Event-specific metrics and global RFM raw values
            {
                "$addFields": {
                    # Event-specific features
                    "event_ticket_count": {"$size": "$event_tickets"},
                    "event_total_spend": {"$sum": "$event_tickets.amount"},
                    # Global RFM
                    "global_total_spend": {"$sum": "$all_payments.amount"},
                    "global_transaction_count": {"$size": "$all_payments"},
                    "global_last_transaction": {"$max": "$all_payments.transactionDate"},
                }
            },
            # Stage 7: Recency in days since the last transaction; 999999 is
            # the sentinel used when no last-transaction date is available.
            {
                "$addFields": {
                    "global_recency_days": {
                        "$cond": {
                            "if": {"$ne": ["$global_last_transaction", None]},
                            "then": {
                                "$dateDiff": {
                                    "startDate": "$global_last_transaction",
                                    "endDate": "$$NOW",
                                    "unit": "day",
                                }
                            },
                            "else": 999999,
                        }
                    }
                }
            },
            # Stage 8: Project final feature vector
            {
                "$project": {
                    "_id": 1,
                    "user_id": "$_id",
                    "email": 1,
                    "firstName": "$firstName",
                    "lastName": "$lastName",
                    # Event-specific features
                    "event_ticket_count": 1,
                    "event_total_spend": 1,
                    "is_follower": "$is_following_event",
                    # Global features
                    "global_recency": "$global_recency_days",
                    "global_frequency": "$global_transaction_count",
                    "global_monetary": "$global_total_spend",
                }
            },
        ]

    def aggregate_user_features(self) -> List[Dict]:
        """
        Aggregate user features for the configured event.

        An Active user is included when they either:
        1. have at least one "Completed" payment for this event
           (Payment.eventCode), or
        2. follow this event (User.UserFollows.eventCode).

        NOTE(review): commenting on the event's posts was previously listed
        as a third criterion, but the pipeline does not implement it.

        Returns:
            One dict per matching user with the keys ``_id``, ``user_id``,
            ``email``, ``firstName``, ``lastName``, ``event_ticket_count``,
            ``event_total_spend``, ``is_follower``, ``global_recency``,
            ``global_frequency`` and ``global_monetary``.
        """
        pipeline = self._build_pipeline()
        print(f"🔄 Running aggregation for event: {self.event_code}")
        results = list(self.db.users.aggregate(pipeline, allowDiskUse=True))
        print(f"✓ Found {len(results)} users who interacted with this event")
        return results