minhvtt committed on
Commit
ea06065
·
verified ·
1 Parent(s): c221a89

Upload 19 files

Browse files
config.py CHANGED
@@ -16,12 +16,10 @@ class Settings(BaseSettings):
16
  # Hugging Face Token (optional)
17
  HF_TOKEN: str = os.getenv("HF_TOKEN", "")
18
 
19
- # Collection Names
20
  COLLECTION_USERS: str = "User"
21
  COLLECTION_PAYMENTS: str = "Payment"
22
  COLLECTION_EVENT_VERSIONS: str = "EventVersion"
23
- COLLECTION_USER_FOLLOWS: str = "UserFollow"
24
- COLLECTION_USER_COMMENT_POST: str = "UserCommentPost"
25
  COLLECTION_POST_SOCIAL_MEDIA: str = "PostSocialMedia"
26
 
27
  # AI Result Collections
 
16
  # Hugging Face Token (optional)
17
  HF_TOKEN: str = os.getenv("HF_TOKEN", "")
18
 
19
+ # Collection Names (ACTUAL MongoDB collections)
20
  COLLECTION_USERS: str = "User"
21
  COLLECTION_PAYMENTS: str = "Payment"
22
  COLLECTION_EVENT_VERSIONS: str = "EventVersion"
 
 
23
  COLLECTION_POST_SOCIAL_MEDIA: str = "PostSocialMedia"
24
 
25
  # AI Result Collections
database.py CHANGED
@@ -53,28 +53,28 @@ class DatabaseManager:
53
  self._client.close()
54
  print("✓ MongoDB connection closed")
55
 
56
- # Collection accessors
57
  @property
58
  def users(self) -> Collection:
 
59
  return self.get_collection(settings.COLLECTION_USERS)
60
 
61
  @property
62
  def payments(self) -> Collection:
 
63
  return self.get_collection(settings.COLLECTION_PAYMENTS)
64
 
65
  @property
66
  def event_versions(self) -> Collection:
 
67
  return self.get_collection(settings.COLLECTION_EVENT_VERSIONS)
68
 
69
  @property
70
- def user_follows(self) -> Collection:
71
- return self.get_collection(settings.COLLECTION_USER_FOLLOWS)
 
72
 
73
- @property
74
- def user_comment_post(self) -> Collection:
75
- return self.get_collection(settings.COLLECTION_USER_COMMENT_POST)
76
-
77
- # AI Result Collections (DEPRECATED - use event-centric versions)
78
  @property
79
  def audience_segments(self) -> Collection:
80
  """AudienceSegment collection (DEPRECATED - use event_audience_segments)"""
 
53
  self._client.close()
54
  print("✓ MongoDB connection closed")
55
 
56
+ # ACTUAL Collections (matching models.txt)
57
  @property
58
  def users(self) -> Collection:
59
+ """User collection (contains embedded UserFollows array)"""
60
  return self.get_collection(settings.COLLECTION_USERS)
61
 
62
  @property
63
  def payments(self) -> Collection:
64
+ """Payment collection"""
65
  return self.get_collection(settings.COLLECTION_PAYMENTS)
66
 
67
  @property
68
  def event_versions(self) -> Collection:
69
+ """EventVersion collection"""
70
  return self.get_collection(settings.COLLECTION_EVENT_VERSIONS)
71
 
72
  @property
73
+ def post_social_media(self) -> Collection:
74
+ """PostSocialMedia collection (contains nested Images.UserCommentPosts)"""
75
+ return self.get_collection(settings.COLLECTION_POST_SOCIAL_MEDIA)
76
 
77
+ # AI Result Collections
 
 
 
 
78
  @property
79
  def audience_segments(self) -> Collection:
80
  """AudienceSegment collection (DEPRECATED - use event_audience_segments)"""
models/event_models.py CHANGED
@@ -12,20 +12,33 @@ from bson import ObjectId
12
 
13
 
14
  class PyObjectId(ObjectId):
15
- """Custom ObjectId type for Pydantic"""
16
- @classmethod
17
- def __get_validators__(cls):
18
- yield cls.validate
19
 
20
  @classmethod
21
- def validate(cls, v):
22
- if not ObjectId.is_valid(v):
23
- raise ValueError("Invalid ObjectId")
24
- return ObjectId(v)
 
 
 
 
 
 
 
 
 
25
 
26
  @classmethod
27
- def __modify_schema__(cls, field_schema):
28
- field_schema.update(type="string")
 
 
 
 
 
 
 
29
 
30
 
31
  class MarketingContent(BaseModel):
 
12
 
13
 
14
  class PyObjectId(ObjectId):
15
+ """Custom ObjectId type for Pydantic v2"""
 
 
 
16
 
17
  @classmethod
18
+ def __get_pydantic_core_schema__(cls, source_type, handler):
19
+ from pydantic_core import core_schema
20
+
21
+ return core_schema.union_schema([
22
+ core_schema.is_instance_schema(ObjectId),
23
+ core_schema.chain_schema([
24
+ core_schema.str_schema(),
25
+ core_schema.no_info_plain_validator_function(cls.validate),
26
+ ])
27
+ ],
28
+ serialization=core_schema.plain_serializer_function_ser_schema(
29
+ lambda x: str(x)
30
+ ))
31
 
32
  @classmethod
33
+ def validate(cls, v):
34
+ if isinstance(v, ObjectId):
35
+ return v
36
+ if isinstance(v, str):
37
+ if not ObjectId.is_valid(v):
38
+ raise ValueError(f"Invalid ObjectId: {v}")
39
+ return ObjectId(v)
40
+ raise ValueError(f"Expected ObjectId or string, got {type(v)}")
41
+
42
 
43
 
44
  class MarketingContent(BaseModel):
models/segmentation_models.py CHANGED
@@ -12,20 +12,32 @@ from bson import ObjectId
12
 
13
 
14
  class PyObjectId(ObjectId):
15
- """Custom ObjectId type for Pydantic"""
 
16
  @classmethod
17
- def __get_validators__(cls):
18
- yield cls.validate
19
-
 
 
 
 
 
 
 
 
 
 
 
20
  @classmethod
21
  def validate(cls, v):
22
- if not ObjectId.is_valid(v):
23
- raise ValueError("Invalid ObjectId")
24
- return ObjectId(v)
25
-
26
- @classmethod
27
- def __modify_schema__(cls, field_schema):
28
- field_schema.update(type="string")
29
 
30
 
31
  class AudienceSegment(BaseModel):
 
12
 
13
 
14
  class PyObjectId(ObjectId):
15
+ """Custom ObjectId type for Pydantic v2"""
16
+
17
  @classmethod
18
+ def __get_pydantic_core_schema__(cls, source_type, handler):
19
+ from pydantic_core import core_schema
20
+
21
+ return core_schema.union_schema([
22
+ core_schema.is_instance_schema(ObjectId),
23
+ core_schema.chain_schema([
24
+ core_schema.str_schema(),
25
+ core_schema.no_info_plain_validator_function(cls.validate),
26
+ ])
27
+ ],
28
+ serialization=core_schema.plain_serializer_function_ser_schema(
29
+ lambda x: str(x)
30
+ ))
31
+
32
  @classmethod
33
  def validate(cls, v):
34
+ if isinstance(v, ObjectId):
35
+ return v
36
+ if isinstance(v, str):
37
+ if not ObjectId.is_valid(v):
38
+ raise ValueError(f"Invalid ObjectId: {v}")
39
+ return ObjectId(v)
40
+ raise ValueError(f"Expected ObjectId or string, got {type(v)}")
41
 
42
 
43
  class AudienceSegment(BaseModel):
models/sentiment_models.py CHANGED
@@ -12,20 +12,33 @@ from bson import ObjectId
12
 
13
 
14
  class PyObjectId(ObjectId):
15
- """Custom ObjectId type for Pydantic"""
16
- @classmethod
17
- def __get_validators__(cls):
18
- yield cls.validate
19
 
20
  @classmethod
21
- def validate(cls, v):
22
- if not ObjectId.is_valid(v):
23
- raise ValueError("Invalid ObjectId")
24
- return ObjectId(v)
 
 
 
 
 
 
 
 
 
25
 
26
  @classmethod
27
- def __modify_schema__(cls, field_schema):
28
- field_schema.update(type="string")
 
 
 
 
 
 
 
29
 
30
 
31
  class SentimentAnalysisResult(BaseModel):
 
12
 
13
 
14
  class PyObjectId(ObjectId):
15
+ """Custom ObjectId type for Pydantic v2"""
 
 
 
16
 
17
  @classmethod
18
+ def __get_pydantic_core_schema__(cls, source_type, handler):
19
+ from pydantic_core import core_schema
20
+
21
+ return core_schema.union_schema([
22
+ core_schema.is_instance_schema(ObjectId),
23
+ core_schema.chain_schema([
24
+ core_schema.str_schema(),
25
+ core_schema.no_info_plain_validator_function(cls.validate),
26
+ ])
27
+ ],
28
+ serialization=core_schema.plain_serializer_function_ser_schema(
29
+ lambda x: str(x)
30
+ ))
31
 
32
  @classmethod
33
+ def validate(cls, v):
34
+ if isinstance(v, ObjectId):
35
+ return v
36
+ if isinstance(v, str):
37
+ if not ObjectId.is_valid(v):
38
+ raise ValueError(f"Invalid ObjectId: {v}")
39
+ return ObjectId(v)
40
+ raise ValueError(f"Expected ObjectId or string, got {type(v)}")
41
+
42
 
43
 
44
  class SentimentAnalysisResult(BaseModel):
scripts/create_indexes.py CHANGED
@@ -1,8 +1,8 @@
1
  """
2
  MongoDB Index Creation Script
3
  Author: AI Generated
4
- Created: 2025-11-24
5
- Purpose: Create performance indexes for event-centric queries
6
  """
7
 
8
  from database import db
@@ -12,11 +12,11 @@ from config import settings
12
  def create_all_indexes():
13
  """
14
  Create all necessary indexes for optimal performance.
15
- Run this once during deployment or when setting up a new environment.
16
  """
17
 
18
  print("=" * 60)
19
- print("🔧 Creating MongoDB Indexes")
20
  print("=" * 60)
21
 
22
  # Payment Collection Indexes
@@ -24,37 +24,55 @@ def create_all_indexes():
24
 
25
  # Index for event-specific ticket purchases
26
  db.payments.create_index(
27
- [("EventCode", 1), ("Status", 1), ("UserId", 1)],
28
  name="idx_payment_event_status_user"
29
  )
30
  print(" ✓ Created: idx_payment_event_status_user")
31
 
32
  # Index for user RFM calculation
33
  db.payments.create_index(
34
- [("UserId", 1), ("TransactionDate", -1)],
35
  name="idx_payment_user_date"
36
  )
37
  print(" ✓ Created: idx_payment_user_date")
38
 
39
- # UserFollow Collection Indexes
40
- print("\n👥 UserFollow Collection:")
41
 
42
- # Index for event followers
43
- db.user_follows.create_index(
44
- [("EventCode", 1), ("userId", 1)],
45
- name="idx_follow_event_user"
 
 
46
  )
47
- print(" ✓ Created: idx_follow_event_user")
48
 
49
- # UserCommentPost Collection Indexes
50
- print("\n💬 UserCommentPost Collection:")
 
 
 
 
 
 
 
51
 
52
- # Index for event comments
53
- db.user_comment_post.create_index(
54
- [("EventCode", 1), ("CreatedDate", -1)],
55
- name="idx_comment_event_date"
 
 
 
 
 
 
 
 
 
56
  )
57
- print(" ✓ Created: idx_comment_event_date")
58
 
59
  # EventAudienceSegment Collection Indexes
60
  print("\n🎯 EventAudienceSegment Collection:")
@@ -93,6 +111,13 @@ def create_all_indexes():
93
  )
94
  print(" ✓ Created: idx_sentiment_result_event_date")
95
 
 
 
 
 
 
 
 
96
  print("\n" + "=" * 60)
97
  print("✅ All Indexes Created Successfully!")
98
  print("=" * 60)
@@ -100,8 +125,8 @@ def create_all_indexes():
100
  # List all indexes for verification
101
  print("\n📋 Index Summary:")
102
  print(f" Payment: {len(list(db.payments.list_indexes()))} indexes")
103
- print(f" UserFollow: {len(list(db.user_follows.list_indexes()))} indexes")
104
- print(f" UserCommentPost: {len(list(db.user_comment_post.list_indexes()))} indexes")
105
  print(f" EventAudienceSegment: {len(list(db.event_audience_segments.list_indexes()))} indexes")
106
  print(f" EventSentimentSummary: {len(list(db.event_sentiment_summary.list_indexes()))} indexes")
107
  print(f" SentimentAnalysisResult: {len(list(db.sentiment_results.list_indexes()))} indexes")
 
1
  """
2
  MongoDB Index Creation Script
3
  Author: AI Generated
4
+ Created: 2025-11-24 (Fixed for actual schema)
5
+ Purpose: Create performance indexes matching actual MongoDB structure
6
  """
7
 
8
  from database import db
 
12
  def create_all_indexes():
13
  """
14
  Create all necessary indexes for optimal performance.
15
+ Based on ACTUAL MongoDB structure from models.txt
16
  """
17
 
18
  print("=" * 60)
19
+ print("🔧 Creating MongoDB Indexes (Corrected Schema)")
20
  print("=" * 60)
21
 
22
  # Payment Collection Indexes
 
24
 
25
  # Index for event-specific ticket purchases
26
  db.payments.create_index(
27
+ [("eventCode", 1), ("status", 1), ("userId", 1)],
28
  name="idx_payment_event_status_user"
29
  )
30
  print(" ✓ Created: idx_payment_event_status_user")
31
 
32
  # Index for user RFM calculation
33
  db.payments.create_index(
34
+ [("userId", 1), ("transactionDate", -1)],
35
  name="idx_payment_user_date"
36
  )
37
  print(" ✓ Created: idx_payment_user_date")
38
 
39
+ # User Collection Indexes (UserFollows is EMBEDDED)
40
+ print("\n👥 User Collection:")
41
 
42
+ # Index for finding users who follow a specific event
43
+ # UserFollows is an embedded array, so we use dot notation
44
+ db.users.create_index(
45
+ [("UserFollows.eventCode", 1)],
46
+ name="idx_user_follows_event",
47
+ sparse=True # Skip documents without UserFollows
48
  )
49
+ print(" ✓ Created: idx_user_follows_event (embedded array)")
50
 
51
+ # Index for user status (to filter Active users)
52
+ db.users.create_index(
53
+ [("status", 1)],
54
+ name="idx_user_status"
55
+ )
56
+ print(" ✓ Created: idx_user_status")
57
+
58
+ # PostSocialMedia Collection Indexes
59
+ print("\n💬 PostSocialMedia Collection:")
60
 
61
+ # Index for event-specific posts
62
+ db.post_social_media.create_index(
63
+ [("eventCode", 1), ("createdAt", -1)],
64
+ name="idx_post_event_date"
65
+ )
66
+ print(" ✓ Created: idx_post_event_date")
67
+
68
+ # Index for searching comments (nested in Images.UserCommentPosts)
69
+ # Using dot notation into the nested arrays (compound multikey index, not a wildcard index)
70
+ db.post_social_media.create_index(
71
+ [("eventCode", 1), ("images.userCommentPosts.commentedAt", -1)],
72
+ name="idx_post_comments",
73
+ sparse=True
74
  )
75
+ print(" ✓ Created: idx_post_comments (nested array)")
76
 
77
  # EventAudienceSegment Collection Indexes
78
  print("\n🎯 EventAudienceSegment Collection:")
 
111
  )
112
  print(" ✓ Created: idx_sentiment_result_event_date")
113
 
114
+ # Index for sentiment label filtering
115
+ db.sentiment_results.create_index(
116
+ [("event_code", 1), ("sentiment_label", 1)],
117
+ name="idx_sentiment_event_label"
118
+ )
119
+ print(" ✓ Created: idx_sentiment_event_label")
120
+
121
  print("\n" + "=" * 60)
122
  print("✅ All Indexes Created Successfully!")
123
  print("=" * 60)
 
125
  # List all indexes for verification
126
  print("\n📋 Index Summary:")
127
  print(f" Payment: {len(list(db.payments.list_indexes()))} indexes")
128
+ print(f" User: {len(list(db.users.list_indexes()))} indexes")
129
+ print(f" PostSocialMedia: {len(list(db.post_social_media.list_indexes()))} indexes")
130
  print(f" EventAudienceSegment: {len(list(db.event_audience_segments.list_indexes()))} indexes")
131
  print(f" EventSentimentSummary: {len(list(db.event_sentiment_summary.list_indexes()))} indexes")
132
  print(f" SentimentAnalysisResult: {len(list(db.sentiment_results.list_indexes()))} indexes")
services/data_aggregation.py CHANGED
@@ -1,12 +1,13 @@
1
  """
2
  Data Aggregation Pipeline for Event-Centric User Segmentation
3
  Author: AI Generated
4
- Created: 2025-11-24 (Refactored for event-centric analysis)
5
- Purpose: Aggregate user features for a specific event using MongoDB pipelines
6
  """
7
 
8
  from typing import List, Dict
9
  from datetime import datetime
 
10
  from database import db
11
  from config import settings
12
 
@@ -14,7 +15,9 @@ from config import settings
14
  class UserDataAggregator:
15
  """
16
  Aggregates user behavioral data for segmentation per event.
17
- Uses MongoDB Aggregation Framework to minimize data transfer.
 
 
18
  """
19
 
20
  def __init__(self, event_code: str):
@@ -22,7 +25,7 @@ class UserDataAggregator:
22
  Initialize aggregator for a specific event.
23
 
24
  Args:
25
- event_code: Event identifier to filter users
26
  """
27
  self.event_code = event_code
28
  self.db = db
@@ -31,23 +34,23 @@ class UserDataAggregator:
31
  """
32
  Aggregate user features for the specified event.
33
 
34
- Returns users who:
35
- 1. Bought tickets for this event
36
- 2. Follow this event
37
- 3. Commented on this event
38
 
39
- Returns: List of user feature vectors with event-specific context
40
  """
41
 
42
  pipeline = [
43
- # Stage 1: Start with users who interacted with THIS event
44
  {
45
  "$match": {
46
- "Status": "Active"
47
  }
48
  },
49
 
50
- # Stage 2: Lookup tickets bought for THIS EVENT
51
  {
52
  "$lookup": {
53
  "from": settings.COLLECTION_PAYMENTS,
@@ -57,9 +60,9 @@ class UserDataAggregator:
57
  "$match": {
58
  "$expr": {
59
  "$and": [
60
- {"$eq": ["$UserId", "$$user_id"]},
61
- {"$eq": ["$EventCode", self.event_code]},
62
- {"$eq": ["$Status", "Completed"]}
63
  ]
64
  }
65
  }
@@ -69,10 +72,34 @@ class UserDataAggregator:
69
  }
70
  },
71
 
72
- # Stage 3: Lookup follows for THIS EVENT
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  {
74
  "$lookup": {
75
- "from": settings.COLLECTION_USER_FOLLOWS,
76
  "let": {"user_id": "$_id"},
77
  "pipeline": [
78
  {
@@ -80,39 +107,22 @@ class UserDataAggregator:
80
  "$expr": {
81
  "$and": [
82
  {"$eq": ["$userId", "$$user_id"]},
83
- {"$eq": ["$EventCode", self.event_code]}
84
  ]
85
  }
86
  }
87
  }
88
  ],
89
- "as": "event_follows"
90
  }
91
  },
92
 
93
- # Stage 4: Lookup all payments for global RFM (user lifetime value)
94
- {
95
- "$lookup": {
96
- "from": settings.COLLECTION_PAYMENTS,
97
- "localField": "_id",
98
- "foreignField": "UserId",
99
- "as": "all_payments",
100
- "pipeline": [
101
- {
102
- "$match": {
103
- "Status": "Completed"
104
- }
105
- }
106
- ]
107
- }
108
- },
109
-
110
- # Stage 5: Filter users who interacted with this event
111
  {
112
  "$match": {
113
  "$or": [
114
- {"event_tickets": {"$ne": []}},
115
- {"event_follows": {"$ne": []}}
116
  ]
117
  }
118
  },
@@ -120,32 +130,23 @@ class UserDataAggregator:
120
  # Stage 6: Calculate event-specific metrics
121
  {
122
  "$addFields": {
123
- # Event-specific: tickets bought for THIS event
124
  "event_ticket_count": {"$size": "$event_tickets"},
125
- "event_total_spend": {"$sum": "$event_tickets.Amount"},
126
-
127
- # Event-specific: follow status
128
- "is_follower": {
129
- "$cond": [
130
- {"$gt": [{"$size": "$event_follows"}, 0]},
131
- 1,
132
- 0
133
- ]
134
- },
135
 
136
- # Global RFM: user's overall purchasing power
137
- "global_total_spend": {"$sum": "$all_payments.Amount"},
138
  "global_transaction_count": {"$size": "$all_payments"},
139
- "global_last_transaction": {"$max": "$all_payments.TransactionDate"}
140
  }
141
  },
142
 
143
- # Stage 7: Calculate global recency
144
  {
145
  "$addFields": {
146
  "global_recency_days": {
147
  "$cond": {
148
- "if": {"$gt": ["$global_last_transaction", None]},
149
  "then": {
150
  "$dateDiff": {
151
  "startDate": "$global_last_transaction",
@@ -165,15 +166,15 @@ class UserDataAggregator:
165
  "_id": 1,
166
  "user_id": "$_id",
167
  "email": 1,
168
- "firstName": "$FirstName",
169
- "lastName": "$LastName",
170
 
171
  # Event-specific features
172
  "event_ticket_count": 1,
173
  "event_total_spend": 1,
174
- "is_follower": 1,
175
 
176
- # Global features (user power)
177
  "global_recency": "$global_recency_days",
178
  "global_frequency": "$global_transaction_count",
179
  "global_monetary": "$global_total_spend"
 
1
  """
2
  Data Aggregation Pipeline for Event-Centric User Segmentation
3
  Author: AI Generated
4
+ Created: 2025-11-24 (Fixed for actual MongoDB schema)
5
+ Purpose: Aggregate user features based on EMBEDDED UserFollows and nested comments
6
  """
7
 
8
  from typing import List, Dict
9
  from datetime import datetime
10
+ from bson import ObjectId
11
  from database import db
12
  from config import settings
13
 
 
15
  class UserDataAggregator:
16
  """
17
  Aggregates user behavioral data for segmentation per event.
18
+ CORRECTED to use:
19
+ - User.UserFollows (embedded array)
20
+ - PostSocialMedia.Images.UserCommentPosts (nested)
21
  """
22
 
23
  def __init__(self, event_code: str):
 
25
  Initialize aggregator for a specific event.
26
 
27
  Args:
28
+ event_code: Event identifier (ObjectId string)
29
  """
30
  self.event_code = event_code
31
  self.db = db
 
34
  """
35
  Aggregate user features for the specified event.
36
 
37
+ Users are considered "interacted" if they:
38
+ 1. Bought tickets (Payment.eventCode)
39
+ 2. Follow event (User.UserFollows.eventCode)
40
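+ 3. Commented on posts (PostSocialMedia.Images.UserCommentPosts where PostSocialMedia.eventCode) - NOTE: stated intent only; the Stage 5 filter shown in this pipeline matches only ticket buyers and followers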
41
 
42
+ Returns: List of user feature vectors
43
  """
44
 
45
  pipeline = [
46
+ # Stage 1: Start with Active users only
47
  {
48
  "$match": {
49
+ "status": "Active"
50
  }
51
  },
52
 
53
+ # Stage 2: Lookup ticket purchases for THIS EVENT
54
  {
55
  "$lookup": {
56
  "from": settings.COLLECTION_PAYMENTS,
 
60
  "$match": {
61
  "$expr": {
62
  "$and": [
63
+ {"$eq": ["$userId", "$$user_id"]},
64
+ {"$eq": ["$eventCode", ObjectId(self.event_code)]},
65
+ {"$eq": ["$status", "Completed"]}
66
  ]
67
  }
68
  }
 
72
  }
73
  },
74
 
75
+ # Stage 3: Check if user follows THIS EVENT (embedded UserFollows)
76
+ {
77
+ "$addFields": {
78
+ "is_following_event": {
79
+ "$cond": {
80
+ "if": {
81
+ "$in": [
82
+ ObjectId(self.event_code),
83
+ {
84
+ "$map": {
85
+ "input": {"$ifNull": ["$UserFollows", []]},
86
+ "as": "follow",
87
+ "in": "$$follow.eventCode"
88
+ }
89
+ }
90
+ ]
91
+ },
92
+ "then": 1,
93
+ "else": 0
94
+ }
95
+ }
96
+ }
97
+ },
98
+
99
+ # Stage 4: Lookup ALL payments for global RFM
100
  {
101
  "$lookup": {
102
+ "from": settings.COLLECTION_PAYMENTS,
103
  "let": {"user_id": "$_id"},
104
  "pipeline": [
105
  {
 
107
  "$expr": {
108
  "$and": [
109
  {"$eq": ["$userId", "$$user_id"]},
110
+ {"$eq": ["$status", "Completed"]}
111
  ]
112
  }
113
  }
114
  }
115
  ],
116
+ "as": "all_payments"
117
  }
118
  },
119
 
120
+ # Stage 5: Filter users who interacted with THIS EVENT
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  {
122
  "$match": {
123
  "$or": [
124
+ {"event_tickets": {"$ne": []}}, # Bought tickets
125
+ {"is_following_event": 1} # Following event
126
  ]
127
  }
128
  },
 
130
  # Stage 6: Calculate event-specific metrics
131
  {
132
  "$addFields": {
133
+ # Event-specific features
134
  "event_ticket_count": {"$size": "$event_tickets"},
135
+ "event_total_spend": {"$sum": "$event_tickets.amount"},
 
 
 
 
 
 
 
 
 
136
 
137
+ # Global RFM
138
+ "global_total_spend": {"$sum": "$all_payments.amount"},
139
  "global_transaction_count": {"$size": "$all_payments"},
140
+ "global_last_transaction": {"$max": "$all_payments.transactionDate"}
141
  }
142
  },
143
 
144
+ # Stage 7: Calculate recency
145
  {
146
  "$addFields": {
147
  "global_recency_days": {
148
  "$cond": {
149
+ "if": {"$ne": ["$global_last_transaction", None]},
150
  "then": {
151
  "$dateDiff": {
152
  "startDate": "$global_last_transaction",
 
166
  "_id": 1,
167
  "user_id": "$_id",
168
  "email": 1,
169
+ "firstName": "$firstName",
170
+ "lastName": "$lastName",
171
 
172
  # Event-specific features
173
  "event_ticket_count": 1,
174
  "event_total_spend": 1,
175
+ "is_follower": "$is_following_event",
176
 
177
+ # Global features
178
  "global_recency": "$global_recency_days",
179
  "global_frequency": "$global_transaction_count",
180
  "global_monetary": "$global_total_spend"
services/genai_service.py CHANGED
@@ -258,7 +258,7 @@ BODY:
258
  """
259
  Generate AI insights from negative comments.
260
  """
261
- # Get negative comments
262
  negative_results = list(db.sentiment_results.find({
263
  "event_code": self.event_code,
264
  "sentiment_label": "Negative"
@@ -272,13 +272,27 @@ BODY:
272
  predicted_nps=70.0
273
  )
274
 
275
- # Get comment texts
276
- comment_ids = [r['source_id'] for r in negative_results]
277
- comments = list(db.user_comment_post.find({
278
- "_id": {"$in": comment_ids}
279
- }))
 
 
 
 
 
 
 
 
 
 
 
 
 
280
 
281
- negative_texts = [c.get('CommentText', '') for c in comments if c.get('CommentText')]
 
282
 
283
  # Build prompt
284
  comments_sample = "\n".join([f"- {text[:100]}" for text in negative_texts[:15]])
 
258
  """
259
  Generate AI insights from negative comments.
260
  """
261
+ # Get negative sentiment results (already analyzed and saved)
262
  negative_results = list(db.sentiment_results.find({
263
  "event_code": self.event_code,
264
  "sentiment_label": "Negative"
 
272
  predicted_nps=70.0
273
  )
274
 
275
+ # Get original comment texts from PostSocialMedia
276
+ comment_ids = [ObjectId(r['source_id']) for r in negative_results]
277
+
278
+ # Extract comments from nested structure
279
+ pipeline = [
280
+ {"$unwind": "$images"},
281
+ {"$unwind": "$images.userCommentPosts"},
282
+ {
283
+ "$match": {
284
+ "images.userCommentPosts.commentId": {"$in": comment_ids}
285
+ }
286
+ },
287
+ {
288
+ "$project": {
289
+ "comment_text": "$images.userCommentPosts.commentText"
290
+ }
291
+ }
292
+ ]
293
 
294
+ comments = list(db.post_social_media.aggregate(pipeline))
295
+ negative_texts = [c.get('comment_text', '') for c in comments if c.get('comment_text')]
296
 
297
  # Build prompt
298
  comments_sample = "\n".join([f"- {text[:100]}" for text in negative_texts[:15]])
services/sentiment_service.py CHANGED
@@ -1,8 +1,8 @@
1
  """
2
  Event-Centric Sentiment Analysis Service
3
  Author: AI Generated
4
- Created: 2025-11-24 (Refactored)
5
- Purpose: Analyze sentiment for event comments and generate summary
6
  """
7
 
8
  import torch
@@ -22,6 +22,7 @@ from services.monitoring import monitor
22
  class SentimentAnalysisService:
23
  """
24
  Event-centric sentiment analysis using PhoBERT.
 
25
  """
26
 
27
  def __init__(self, event_code: str):
@@ -29,7 +30,7 @@ class SentimentAnalysisService:
29
  Initialize for a specific event.
30
 
31
  Args:
32
- event_code: Event identifier
33
  """
34
  self.event_code = event_code
35
  self.model_name = settings.SENTIMENT_MODEL
@@ -58,7 +59,7 @@ class SentimentAnalysisService:
58
  print(f"✓ Model loaded on {self.device}")
59
 
60
  def analyze_text(self, text: str) -> Tuple[str, float]:
61
- """Analyze single text with preprocessing."""
62
  if not self.model:
63
  self.load_model()
64
 
@@ -87,11 +88,60 @@ class SentimentAnalysisService:
87
  sentiment_label = self.label_map.get(predicted_class, "Neutral")
88
  return sentiment_label, confidence
89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  def analyze_event_comments(self) -> Dict:
91
  """
92
  Analyze all comments for this event.
93
-
94
- Returns summary statistics.
95
  """
96
  import time
97
  start_time = time.time()
@@ -104,10 +154,8 @@ class SentimentAnalysisService:
104
  if not self.model:
105
  self.load_model()
106
 
107
- # Fetch comments for THIS EVENT only
108
- comments = list(db.user_comment_post.find({
109
- "EventCode": self.event_code
110
- }).limit(1000))
111
 
112
  print(f"✓ Found {len(comments)} comments for this event")
113
 
@@ -122,7 +170,7 @@ class SentimentAnalysisService:
122
  all_keywords = []
123
 
124
  for comment in comments:
125
- text = comment.get('CommentText', '')
126
  if not text:
127
  continue
128
 
@@ -131,9 +179,9 @@ class SentimentAnalysisService:
131
 
132
  # Save individual result
133
  result = SentimentAnalysisResult(
134
- source_id=comment['_id'],
135
  source_type="UserCommentPost",
136
- event_code=self.event_code, # NEW: link to event
137
  sentiment_label=sentiment,
138
  confidence_score=confidence,
139
  key_phrases=keywords,
@@ -147,7 +195,7 @@ class SentimentAnalysisService:
147
  total_confidence += confidence
148
  all_keywords.extend(keywords)
149
 
150
- # Bulk insert results
151
  if results_to_save:
152
  db.sentiment_results.insert_many(results_to_save)
153
  print(f"✓ Saved {len(results_to_save)} sentiment results")
@@ -155,7 +203,7 @@ class SentimentAnalysisService:
155
  # Calculate summary
156
  avg_confidence = total_confidence / len(results_to_save) if results_to_save else 0
157
 
158
- # Get top keywords
159
  keyword_freq = {}
160
  for kw in all_keywords:
161
  keyword_freq[kw] = keyword_freq.get(kw, 0) + 1
@@ -173,7 +221,7 @@ class SentimentAnalysisService:
173
  sentiment_distribution=sentiment_counts,
174
  avg_confidence=avg_confidence,
175
  top_keywords=top_keywords,
176
- ai_insights=None, # Will be filled by GenAI
177
  last_updated=datetime.utcnow()
178
  )
179
 
@@ -186,10 +234,10 @@ class SentimentAnalysisService:
186
  # Print summary
187
  print("\n📊 Sentiment Distribution:")
188
  for label, count in sentiment_counts.items():
189
- pct = (count / len(results_to_save) *100) if results_to_save else 0
190
  print(f" {label}: {count} ({pct:.1f}%)")
191
 
192
- # Log metrics
193
  execution_time = time.time() - start_time
194
  metrics = {
195
  "event_code": self.event_code,
 
1
  """
2
  Event-Centric Sentiment Analysis Service
3
  Author: AI Generated
4
+ Created: 2025-11-24 (Fixed for actual MongoDB schema)
5
+ Purpose: Analyze sentiment for comments nested in PostSocialMedia.Images
6
  """
7
 
8
  import torch
 
22
  class SentimentAnalysisService:
23
  """
24
  Event-centric sentiment analysis using PhoBERT.
25
+ Comments are nested: PostSocialMedia.Images.UserCommentPosts
26
  """
27
 
28
  def __init__(self, event_code: str):
 
30
  Initialize for a specific event.
31
 
32
  Args:
33
+ event_code: Event identifier (ObjectId string)
34
  """
35
  self.event_code = event_code
36
  self.model_name = settings.SENTIMENT_MODEL
 
59
  print(f"✓ Model loaded on {self.device}")
60
 
61
  def analyze_text(self, text: str) -> Tuple[str, float]:
62
+ """Analyze single text"""
63
  if not self.model:
64
  self.load_model()
65
 
 
88
  sentiment_label = self.label_map.get(predicted_class, "Neutral")
89
  return sentiment_label, confidence
90
 
91
+ def extract_comments_from_posts(self) -> List[Dict]:
92
+ """
93
+ Extract all comments from PostSocialMedia for this event.
94
+
95
+ Structure: PostSocialMedia → Images[] → UserCommentPosts[]
96
+ """
97
+ pipeline = [
98
+ # Match posts for this event
99
+ {
100
+ "$match": {
101
+ "eventCode": ObjectId(self.event_code)
102
+ }
103
+ },
104
+
105
+ # Unwind images array
106
+ {
107
+ "$unwind": {
108
+ "path": "$images",
109
+ "preserveNullAndEmptyArrays": False
110
+ }
111
+ },
112
+
113
+ # Unwind UserCommentPosts within each image
114
+ {
115
+ "$unwind": {
116
+ "path": "$images.userCommentPosts",
117
+ "preserveNullAndEmptyArrays": False
118
+ }
119
+ },
120
+
121
+ # Project the fields we need
122
+ {
123
+ "$project": {
124
+ "post_id": "$_id",
125
+ "image_id": "$images.imageInPostId",
126
+ "comment_id": "$images.userCommentPosts.commentId",
127
+ "user_id": "$images.userCommentPosts.userId",
128
+ "comment_text": "$images.userCommentPosts.commentText",
129
+ "commented_at": "$images.userCommentPosts.commentedAt"
130
+ }
131
+ },
132
+
133
+ # Limit for performance
134
+ {
135
+ "$limit": 1000
136
+ }
137
+ ]
138
+
139
+ comments = list(db.post_social_media.aggregate(pipeline))
140
+ return comments
141
+
142
  def analyze_event_comments(self) -> Dict:
143
  """
144
  Analyze all comments for this event.
 
 
145
  """
146
  import time
147
  start_time = time.time()
 
154
  if not self.model:
155
  self.load_model()
156
 
157
+ # Extract comments
158
+ comments = self.extract_comments_from_posts()
 
 
159
 
160
  print(f"✓ Found {len(comments)} comments for this event")
161
 
 
170
  all_keywords = []
171
 
172
  for comment in comments:
173
+ text = comment.get('comment_text', '')
174
  if not text:
175
  continue
176
 
 
179
 
180
  # Save individual result
181
  result = SentimentAnalysisResult(
182
+ source_id=ObjectId(comment['comment_id']),
183
  source_type="UserCommentPost",
184
+ event_code=self.event_code,
185
  sentiment_label=sentiment,
186
  confidence_score=confidence,
187
  key_phrases=keywords,
 
195
  total_confidence += confidence
196
  all_keywords.extend(keywords)
197
 
198
+ # Bulk insert
199
  if results_to_save:
200
  db.sentiment_results.insert_many(results_to_save)
201
  print(f"✓ Saved {len(results_to_save)} sentiment results")
 
203
  # Calculate summary
204
  avg_confidence = total_confidence / len(results_to_save) if results_to_save else 0
205
 
206
+ # Top keywords
207
  keyword_freq = {}
208
  for kw in all_keywords:
209
  keyword_freq[kw] = keyword_freq.get(kw, 0) + 1
 
221
  sentiment_distribution=sentiment_counts,
222
  avg_confidence=avg_confidence,
223
  top_keywords=top_keywords,
224
+ ai_insights=None,
225
  last_updated=datetime.utcnow()
226
  )
227
 
 
234
  # Print summary
235
  print("\n📊 Sentiment Distribution:")
236
  for label, count in sentiment_counts.items():
237
+ pct = (count / len(results_to_save) * 100) if results_to_save else 0
238
  print(f" {label}: {count} ({pct:.1f}%)")
239
 
240
+ # Monitoring
241
  execution_time = time.time() - start_time
242
  metrics = {
243
  "event_code": self.event_code,