|
|
"""
|
|
|
Data Aggregation Pipeline for Event-Centric User Segmentation
|
|
|
Author: AI Generated
|
|
|
Created: 2025-11-24 (Fixed for actual MongoDB schema)
|
|
|
Purpose: Aggregate user features based on EMBEDDED UserFollows and nested comments
|
|
|
"""
|
|
|
|
|
|
from typing import List, Dict
|
|
|
from datetime import datetime
|
|
|
from bson import ObjectId
|
|
|
from database import db
|
|
|
from config import settings
|
|
|
|
|
|
|
|
|
class UserDataAggregator:
    """
    Aggregate per-user behavioural features for a single event.

    The aggregation runs on the ``users`` collection and reads:
    - ``User.UserFollows`` (embedded array) to detect followers of the event
    - the payments collection (``settings.COLLECTION_PAYMENTS``) for the
      event's ticket purchases and for global spend statistics

    NOTE: ``PostSocialMedia.Images.UserCommentPosts`` (nested comments) is
    listed as an interaction source in the design notes, but no stage of the
    pipeline built here queries it. Users who only commented (no completed
    ticket payment, not following) are therefore not returned.
    """

    # Recency reported for users that have no completed payment at all.
    NO_TRANSACTION_RECENCY_DAYS = 999999

    def __init__(self, event_code: str):
        """
        Initialize the aggregator for a specific event.

        Args:
            event_code: Event identifier (ObjectId string). It is converted
                to an ``ObjectId`` when the aggregation is run, not here.
        """
        self.event_code = event_code
        self.db = db

    @staticmethod
    def build_pipeline(event_oid: "ObjectId", payments_collection: str) -> List[Dict]:
        """
        Build the aggregation pipeline without executing it.

        Kept separate from the database call so the pipeline can be inspected
        on its own.

        Args:
            event_oid: Event id, compared against ``Payment.eventCode`` and
                ``User.UserFollows.eventCode``.
            payments_collection: Name of the payments collection to join.

        Returns:
            The list of aggregation stages, in execution order.
        """
        # Completed payments of the current user ($$user_id) for this event.
        event_tickets_lookup = {
            "$lookup": {
                "from": payments_collection,
                "let": {"user_id": "$_id"},
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    {"$eq": ["$userId", "$$user_id"]},
                                    {"$eq": ["$eventCode", event_oid]},
                                    {"$eq": ["$status", "Completed"]},
                                ]
                            }
                        }
                    }
                ],
                "as": "event_tickets",
            }
        }

        # 1 when the event id appears among the user's embedded follows,
        # else 0. $ifNull guards users that have no UserFollows field.
        follow_flag = {
            "$addFields": {
                "is_following_event": {
                    "$cond": {
                        "if": {
                            "$in": [
                                event_oid,
                                {
                                    "$map": {
                                        "input": {"$ifNull": ["$UserFollows", []]},
                                        "as": "follow",
                                        "in": "$$follow.eventCode",
                                    }
                                },
                            ]
                        },
                        "then": 1,
                        "else": 0,
                    }
                }
            }
        }

        # Keep only users who bought a ticket or follow the event. This
        # filter depends only on fields computed above, so it is applied
        # BEFORE the global payments lookup: the (expensive) join then runs
        # only for interacting users instead of for every active user.
        interacted_only = {
            "$match": {
                "$or": [
                    {"event_tickets": {"$ne": []}},
                    {"is_following_event": 1},
                ]
            }
        }

        # All completed payments of the user, across every event.
        all_payments_lookup = {
            "$lookup": {
                "from": payments_collection,
                "let": {"user_id": "$_id"},
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    {"$eq": ["$userId", "$$user_id"]},
                                    {"$eq": ["$status", "Completed"]},
                                ]
                            }
                        }
                    }
                ],
                "as": "all_payments",
            }
        }

        # Event-level and global totals derived from the two joined arrays.
        totals = {
            "$addFields": {
                "event_ticket_count": {"$size": "$event_tickets"},
                "event_total_spend": {"$sum": "$event_tickets.amount"},
                "global_total_spend": {"$sum": "$all_payments.amount"},
                "global_transaction_count": {"$size": "$all_payments"},
                "global_last_transaction": {"$max": "$all_payments.transactionDate"},
            }
        }

        # Whole days since the most recent completed payment; users without
        # any payment get the sentinel value instead.
        # NOTE(review): $dateDiff needs a date-typed transactionDate and a
        # server that supports the operator - confirm against deployment.
        recency = {
            "$addFields": {
                "global_recency_days": {
                    "$cond": {
                        "if": {"$ne": ["$global_last_transaction", None]},
                        "then": {
                            "$dateDiff": {
                                "startDate": "$global_last_transaction",
                                "endDate": "$$NOW",
                                "unit": "day",
                            }
                        },
                        "else": UserDataAggregator.NO_TRANSACTION_RECENCY_DAYS,
                    }
                }
            }
        }

        # Final feature vector: identity fields, event features, global RFM.
        feature_projection = {
            "$project": {
                "_id": 1,
                "user_id": "$_id",
                "email": 1,
                "firstName": "$firstName",
                "lastName": "$lastName",
                "event_ticket_count": 1,
                "event_total_spend": 1,
                "is_follower": "$is_following_event",
                "global_recency": "$global_recency_days",
                "global_frequency": "$global_transaction_count",
                "global_monetary": "$global_total_spend",
            }
        }

        return [
            {"$match": {"status": "Active"}},
            event_tickets_lookup,
            follow_flag,
            interacted_only,
            all_payments_lookup,
            totals,
            recency,
            feature_projection,
        ]

    def aggregate_user_features(self) -> List[Dict]:
        """
        Aggregate user features for the configured event.

        Only users whose ``status`` is ``"Active"`` are considered. A user is
        returned when they:
        1. Bought tickets (a ``Completed`` payment with ``Payment.eventCode``
           equal to the event), or
        2. Follow the event (``User.UserFollows.eventCode``).

        Comment-based interaction (``PostSocialMedia.Images.UserCommentPosts``)
        is NOT evaluated by this pipeline.

        Returns:
            List of user feature vectors (one dict per user) with the keys
            ``_id``, ``user_id``, ``email``, ``firstName``, ``lastName``,
            ``event_ticket_count``, ``event_total_spend``, ``is_follower``,
            ``global_recency``, ``global_frequency`` and ``global_monetary``.

        Raises:
            Whatever ``ObjectId(self.event_code)`` raises when the event code
            is not a valid ObjectId string.
        """
        # Convert once; the same id is used by both the ticket lookup and the
        # follower check.
        event_oid = ObjectId(self.event_code)
        pipeline = self.build_pipeline(event_oid, settings.COLLECTION_PAYMENTS)

        print(f"🔄 Running aggregation for event: {self.event_code}")
        results = list(self.db.users.aggregate(pipeline, allowDiskUse=True))
        print(f"✓ Found {len(results)} users who interacted with this event")

        return results
|
|
|
|