"""
Data Aggregation Pipeline for Event-Centric User Segmentation

Author: AI Generated
Created: 2025-11-24 (Fixed for actual MongoDB schema)
Purpose: Aggregate per-event user features from payments and the EMBEDDED
         User.UserFollows array. Comment-based interaction
         (PostSocialMedia.Images.UserCommentPosts) is NOT yet implemented.
"""
from typing import List, Dict
from datetime import datetime
from bson import ObjectId
from database import db
from config import settings


class UserDataAggregator:
    """
    Aggregates user behavioral data for segmentation per event.

    Data sources read by the pipeline:
    - ``settings.COLLECTION_PAYMENTS`` (fields used: ``userId``, ``eventCode``,
      ``status``, ``amount``, ``transactionDate``)
    - ``User.UserFollows`` (embedded array; each item's ``eventCode`` is read)

    NOTE(review): the original design also lists
    ``PostSocialMedia.Images.UserCommentPosts`` (nested comments) as an
    interaction source, but no pipeline stage reads it yet. The collection
    name and comment field names need confirming before it can be added.
    """

    def __init__(self, event_code: str):
        """
        Initialize aggregator for a specific event.

        Args:
            event_code: Event identifier (ObjectId hex string). It is converted
                with ``bson.ObjectId`` inside ``aggregate_user_features``, so a
                malformed value is reported there rather than here.
        """
        self.event_code = event_code
        self.db = db

    def aggregate_user_features(self) -> List[Dict]:
        """
        Aggregate user features for the specified event.

        A user is included when their ``status`` is ``"Active"`` and they:
        1. bought tickets for this event (a ``Completed`` payment whose
           ``eventCode`` matches), or
        2. follow this event (an item in ``User.UserFollows`` whose
           ``eventCode`` matches).

        Comment-based interaction is not implemented (see class docstring).

        Returns:
            One dict per matching user with keys ``_id``, ``user_id``,
            ``email``, ``firstName``, ``lastName``, ``event_ticket_count``,
            ``event_total_spend``, ``is_follower`` (1/0), ``global_recency``
            (days since the latest completed transaction, or 999999 when there
            is none), ``global_frequency`` and ``global_monetary``.

        Raises:
            bson.errors.InvalidId: if ``self.event_code`` is not a valid
                ObjectId string.
        """
        # Convert once and reuse; this is also where a bad event_code fails.
        event_oid = ObjectId(self.event_code)
        pipeline = self._build_pipeline(event_oid)

        print(f"🔄 Running aggregation for event: {self.event_code}")
        results = list(self.db.users.aggregate(pipeline, allowDiskUse=True))
        print(f"✓ Found {len(results)} users who interacted with this event")
        return results

    def _build_pipeline(self, event_oid: ObjectId) -> List[Dict]:
        """Build the aggregation pipeline (run on ``users``) for ``event_oid``."""
        return [
            # Stage 1: Start with Active users only
            {"$match": {"status": "Active"}},
            # Stage 2: Lookup completed ticket purchases for THIS EVENT
            {
                "$lookup": {
                    "from": settings.COLLECTION_PAYMENTS,
                    "let": {"user_id": "$_id"},
                    "pipeline": [
                        {
                            "$match": {
                                "$expr": {
                                    "$and": [
                                        {"$eq": ["$userId", "$$user_id"]},
                                        {"$eq": ["$eventCode", event_oid]},
                                        {"$eq": ["$status", "Completed"]},
                                    ]
                                }
                            }
                        }
                    ],
                    "as": "event_tickets",
                }
            },
            # Stage 3: Check if user follows THIS EVENT (embedded UserFollows);
            # $ifNull treats a missing/null UserFollows as an empty array.
            {
                "$addFields": {
                    "is_following_event": {
                        "$cond": {
                            "if": {
                                "$in": [
                                    event_oid,
                                    {
                                        "$map": {
                                            "input": {"$ifNull": ["$UserFollows", []]},
                                            "as": "follow",
                                            "in": "$$follow.eventCode",
                                        }
                                    },
                                ]
                            },
                            "then": 1,
                            "else": 0,
                        }
                    }
                }
            },
            # Stage 4: Keep only users who interacted with THIS EVENT.
            # Placed before the global-payments $lookup (it only needs stages
            # 2-3) so that the second join runs for interacted users only.
            {
                "$match": {
                    "$or": [
                        {"event_tickets": {"$ne": []}},  # Bought tickets
                        {"is_following_event": 1},  # Following event
                    ]
                }
            },
            # Stage 5: Lookup ALL completed payments for global RFM
            {
                "$lookup": {
                    "from": settings.COLLECTION_PAYMENTS,
                    "let": {"user_id": "$_id"},
                    "pipeline": [
                        {
                            "$match": {
                                "$expr": {
                                    "$and": [
                                        {"$eq": ["$userId", "$$user_id"]},
                                        {"$eq": ["$status", "Completed"]},
                                    ]
                                }
                            }
                        }
                    ],
                    "as": "all_payments",
                }
            },
            # Stage 6: Calculate event-specific and global metrics
            {
                "$addFields": {
                    # Event-specific features
                    "event_ticket_count": {"$size": "$event_tickets"},
                    "event_total_spend": {"$sum": "$event_tickets.amount"},
                    # Global RFM
                    "global_total_spend": {"$sum": "$all_payments.amount"},
                    "global_transaction_count": {"$size": "$all_payments"},
                    # $max over an empty array yields null (handled in stage 7)
                    "global_last_transaction": {"$max": "$all_payments.transactionDate"},
                }
            },
            # Stage 7: Calculate recency ($dateDiff requires MongoDB 5.0+);
            # 999999 is a sentinel for "no dated completed transaction".
            {
                "$addFields": {
                    "global_recency_days": {
                        "$cond": {
                            "if": {"$ne": ["$global_last_transaction", None]},
                            "then": {
                                "$dateDiff": {
                                    "startDate": "$global_last_transaction",
                                    "endDate": "$$NOW",
                                    "unit": "day",
                                }
                            },
                            "else": 999999,
                        }
                    }
                }
            },
            # Stage 8: Project final feature vector
            {
                "$project": {
                    "_id": 1,
                    "user_id": "$_id",
                    "email": 1,
                    "firstName": "$firstName",
                    "lastName": "$lastName",
                    # Event-specific features
                    "event_ticket_count": 1,
                    "event_total_spend": 1,
                    "is_follower": "$is_following_event",
                    # Global features
                    "global_recency": "$global_recency_days",
                    "global_frequency": "$global_transaction_count",
                    "global_monetary": "$global_total_spend",
                }
            },
        ]