minhvtt commited on
Commit
4823fe6
·
verified ·
1 Parent(s): de1a145

Upload 19 files

Browse files
Files changed (3) hide show
  1. app.py +447 -447
  2. config.py +1 -2
  3. services/genai_service.py +90 -121
app.py CHANGED
@@ -1,447 +1,447 @@
1
- """
2
- FastAPI Application for Event-Centric Audience Segmentation AI
3
- Author: AI Generated
4
- Created: 2025-11-24 (Refactored)
5
- Purpose: REST API with event-based endpoints
6
- """
7
-
8
- from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Query
9
- from fastapi.middleware.cors import CORSMiddleware
10
- from pydantic import BaseModel
11
- from typing import List, Dict, Optional, Any
12
- from datetime import datetime
13
- from bson import ObjectId
14
-
15
- # Import services
16
- from services.segmentation_service import SegmentationService
17
- from services.sentiment_service import SentimentAnalysisService
18
- from services.genai_service import GenerativeAIService
19
- from database import db
20
- from config import settings
21
-
22
-
23
- # FastAPI app
24
- app = FastAPI(
25
- title="Audience Segmentation AI - Event-Centric",
26
- description="REST API for per-event audience analysis",
27
- version="2.0.0",
28
- docs_url="/api/docs",
29
- redoc_url="/api/redoc"
30
- )
31
-
32
- # CORS
33
- app.add_middleware(
34
- CORSMiddleware,
35
- allow_origins=["*"],
36
- allow_credentials=True,
37
- allow_methods=["*"],
38
- allow_headers=["*"],
39
- )
40
-
41
-
42
- # Helper
43
- def serialize_doc(doc: Dict) -> Optional[Dict]:
44
- """Convert MongoDB document to JSON-serializable dict"""
45
- if doc is None:
46
- return None
47
- if '_id' in doc:
48
- doc['id'] = str(doc.pop('_id'))
49
-
50
- # Handle nested ObjectIds and lists
51
- for key, value in list(doc.items()):
52
- if isinstance(value, ObjectId):
53
- doc[key] = str(value)
54
- elif isinstance(value, list):
55
- doc[key] = [str(v) if isinstance(v, ObjectId) else v for v in value]
56
- elif isinstance(value, dict):
57
- doc[key] = serialize_doc(value)
58
-
59
- return doc
60
-
61
-
62
- # ===== HEALTH =====
63
- @app.get("/health", tags=["System"])
64
- async def health_check():
65
- """Health check"""
66
- try:
67
- db.client.server_info()
68
- return {
69
- "status": "healthy",
70
- "timestamp": datetime.utcnow(),
71
- "database": "connected"
72
- }
73
- except Exception as e:
74
- raise HTTPException(
75
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
76
- detail=f"Unhealthy: {str(e)}"
77
- )
78
-
79
-
80
- # ===== EVENT ANALYSIS =====
81
- @app.post("/api/events/{event_code}/analyze", tags=["Event Analysis"])
82
- async def analyze_event(event_code: str, background_tasks: BackgroundTasks):
83
- """Run full AI pipeline for an event"""
84
-
85
- def run_pipeline():
86
- # Step 1: Segmentation
87
- seg_service = SegmentationService(event_code)
88
- seg_service.run_segmentation()
89
-
90
- # Step 2: Sentiment
91
- sent_service = SentimentAnalysisService(event_code)
92
- sent_service.analyze_event_comments()
93
-
94
- # Step 3: Email generation
95
- genai_service = GenerativeAIService(event_code)
96
- genai_service.generate_emails_for_all_segments()
97
-
98
- # Step 4: Insights
99
- genai_service.update_sentiment_summary_with_insights()
100
-
101
- background_tasks.add_task(run_pipeline)
102
-
103
- return {
104
- "status": "started",
105
- "message": f"Analysis pipeline started for event {event_code}"
106
- }
107
-
108
-
109
- @app.get("/api/events/{event_code}/dashboard", tags=["Event Analysis"])
110
- async def get_event_dashboard(event_code: str):
111
- """Get complete dashboard for Event Owner"""
112
-
113
- # Get segments
114
- segments = list(db.event_audience_segments.find({"event_code": event_code}))
115
-
116
- # Get sentiment summary
117
- sentiment_summary = db.event_sentiment_summary.find_one({"event_code": event_code})
118
-
119
- return {
120
- "event_code": event_code,
121
- "segments": [serialize_doc(s) for s in segments],
122
- "sentiment_summary": serialize_doc(sentiment_summary) if sentiment_summary else None
123
- }
124
-
125
-
126
- # ===== SEGMENTATION =====
127
- @app.post("/api/events/{event_code}/segmentation/run", tags=["Segmentation"])
128
- async def run_event_segmentation(
129
- event_code: str,
130
- background_tasks: BackgroundTasks,
131
- n_clusters: int = Query(default=5, ge=2, le=10)
132
- ):
133
- """Run segmentation for an event"""
134
-
135
- def run_task():
136
- service = SegmentationService(event_code, n_clusters=n_clusters)
137
- service.run_segmentation()
138
-
139
- background_tasks.add_task(run_task)
140
-
141
- return {
142
- "status": "started",
143
- "message": f"Segmentation started for event {event_code}",
144
- "event_code": event_code
145
- }
146
-
147
-
148
- @app.get("/api/events/{event_code}/segments", tags=["Segmentation"])
149
- async def get_event_segments(
150
- event_code: str,
151
- status_filter: Optional[str] = Query(default=None, description="Filter by Draft, Approved, Sent")
152
- ):
153
- """Get all segments for an event"""
154
-
155
- query = {"event_code": event_code}
156
- if status_filter:
157
- query["marketing_content.status"] = status_filter
158
-
159
- segments = list(db.event_audience_segments.find(query))
160
-
161
- return [serialize_doc(s) for s in segments]
162
-
163
-
164
- @app.get("/api/events/{event_code}/segments/{segment_id}", tags=["Segmentation"])
165
- async def get_segment_detail(event_code: str, segment_id: str):
166
- """Get specific segment details"""
167
-
168
- segment = db.event_audience_segments.find_one({
169
- "_id": ObjectId(segment_id),
170
- "event_code": event_code
171
- })
172
-
173
- if not segment:
174
- raise HTTPException(status_code=404, detail="Segment not found")
175
-
176
- return serialize_doc(segment)
177
-
178
-
179
- @app.get("/api/events/{event_code}/segments/{segment_id}/users", tags=["Segmentation"])
180
- async def get_segment_users(
181
- event_code: str,
182
- segment_id: str,
183
- skip: int = 0,
184
- limit: int = 100
185
- ):
186
- """Get users in a segment with details"""
187
-
188
- segment = db.event_audience_segments.find_one({
189
- "_id": ObjectId(segment_id),
190
- "event_code": event_code
191
- })
192
-
193
- if not segment:
194
- raise HTTPException(status_code=404, detail="Segment not found")
195
-
196
- user_ids = segment.get('user_ids', [])
197
- total_users = len(user_ids)
198
-
199
- # Paginate
200
- paginated_ids = user_ids[skip:skip + limit]
201
-
202
- # Get user details
203
- users = list(db.users.find({
204
- "_id": {"$in": paginated_ids}
205
- }))
206
-
207
- # Enrich with stats (optional)
208
- enriched_users = []
209
- for user in users:
210
- enriched_users.append({
211
- "user_id": str(user['_id']),
212
- "email": user.get('email'),
213
- "full_name": f"{user.get('FirstName', '')} {user.get('LastName', '')}".strip()
214
- })
215
-
216
- return {
217
- "segment_id": segment_id,
218
- "total_users": total_users,
219
- "users": enriched_users
220
- }
221
-
222
-
223
- # ===== APPROVAL WORKFLOW =====
224
- @app.post("/api/events/{event_code}/segments/{segment_id}/approve", tags=["Approval"])
225
- async def approve_segment(
226
- event_code: str,
227
- segment_id: str,
228
- approved_by: Optional[str] = None,
229
- modified_subject: Optional[str] = None,
230
- modified_body: Optional[str] = None
231
- ):
232
- """Event Owner approves marketing content"""
233
-
234
- segment = db.event_audience_segments.find_one({
235
- "_id": ObjectId(segment_id),
236
- "event_code": event_code
237
- })
238
-
239
- if not segment:
240
- raise HTTPException(status_code=404, detail="Segment not found")
241
-
242
- # Update fields
243
- update = {
244
- "marketing_content.status": "Approved",
245
- "marketing_content.approved_at": datetime.utcnow(),
246
- "marketing_content.approved_by": approved_by,
247
- "last_updated": datetime.utcnow()
248
- }
249
-
250
- if modified_subject:
251
- update["marketing_content.email_subject"] = modified_subject
252
- if modified_body:
253
- update["marketing_content.email_body"] = modified_body
254
-
255
- db.event_audience_segments.update_one(
256
- {"_id": ObjectId(segment_id)},
257
- {"$set": update}
258
- )
259
-
260
- updated_segment = db.event_audience_segments.find_one({"_id": ObjectId(segment_id)})
261
-
262
- return {
263
- "status": "success",
264
- "message": "Segment approved",
265
- "segment_id": segment_id,
266
- "marketing_content": updated_segment.get('marketing_content')
267
- }
268
-
269
-
270
- @app.post("/api/events/{event_code}/segments/{segment_id}/send-email", tags=["Approval"])
271
- async def send_segment_email(
272
- event_code: str,
273
- segment_id: str,
274
- send_immediately: bool = True
275
- ):
276
- """Send approved marketing email"""
277
-
278
- segment = db.event_audience_segments.find_one({
279
- "_id": ObjectId(segment_id),
280
- "event_code": event_code
281
- })
282
-
283
- if not segment:
284
- raise HTTPException(status_code=404, detail="Segment not found")
285
-
286
- marketing_content = segment.get('marketing_content', {})
287
- if marketing_content.get('status') != "Approved":
288
- raise HTTPException(status_code=400, detail="Segment not approved yet")
289
-
290
- # TODO: Integrate with email service (SendGrid, AWS SES, etc.)
291
- # For now, just mark as sent
292
-
293
- db.event_audience_segments.update_one(
294
- {"_id": ObjectId(segment_id)},
295
- {"$set": {
296
- "marketing_content.status": "Sent",
297
- "last_updated": datetime.utcnow()
298
- }}
299
- )
300
-
301
- return {
302
- "status": "success",
303
- "message": f"Email sent to {segment.get('user_count', 0)} users",
304
- "segment_id": segment_id,
305
- "emails_sent": segment.get('user_count', 0),
306
- "emails_failed": 0
307
- }
308
-
309
-
310
- # ===== SENTIMENT =====
311
- @app.post("/api/events/{event_code}/sentiment/analyze", tags=["Sentiment"])
312
- async def analyze_event_sentiment(event_code: str, background_tasks: BackgroundTasks):
313
- """Analyze sentiment for event comments"""
314
-
315
- def run_task():
316
- service = SentimentAnalysisService(event_code)
317
- service.analyze_event_comments()
318
-
319
- background_tasks.add_task(run_task)
320
-
321
- return {
322
- "status": "started",
323
- "message": f"Sentiment analysis started for event {event_code}"
324
- }
325
-
326
-
327
- @app.get("/api/events/{event_code}/sentiment/summary", tags=["Sentiment"])
328
- async def get_sentiment_summary(event_code: str):
329
- """Get sentiment summary for an event"""
330
-
331
- summary = db.event_sentiment_summary.find_one({"event_code": event_code})
332
-
333
- if not summary:
334
- raise HTTPException(status_code=404, detail="No sentiment data for this event")
335
-
336
- return serialize_doc(summary)
337
-
338
-
339
- @app.get("/api/events/{event_code}/sentiment/results", tags=["Sentiment"])
340
- async def get_sentiment_results(
341
- event_code: str,
342
- sentiment_label: Optional[str] = None,
343
- skip: int = 0,
344
- limit: int = 100
345
- ):
346
- """Get detailed sentiment results"""
347
-
348
- query = {"event_code": event_code}
349
- if sentiment_label:
350
- query["sentiment_label"] = sentiment_label
351
-
352
- total = db.sentiment_results.count_documents(query)
353
- results = list(
354
- db.sentiment_results.find(query)
355
- .sort("analyzed_at", -1)
356
- .skip(skip)
357
- .limit(limit)
358
- )
359
-
360
- return {
361
- "total": total,
362
- "results": [serialize_doc(r) for r in results]
363
- }
364
-
365
-
366
- # ===== GENAI =====
367
- @app.post("/api/events/{event_code}/genai/generate-emails", tags=["GenAI"])
368
- async def generate_event_emails(event_code: str, background_tasks: BackgroundTasks):
369
- """Generate marketing emails for all segments"""
370
-
371
- def run_task():
372
- service = GenerativeAIService(event_code)
373
- service.generate_emails_for_all_segments()
374
-
375
- background_tasks.add_task(run_task)
376
-
377
- return {
378
- "status": "started",
379
- "message": "Email generation started"
380
- }
381
-
382
-
383
- @app.post("/api/events/{event_code}/genai/generate-insights", tags=["GenAI"])
384
- async def generate_event_insights(event_code: str, background_tasks: BackgroundTasks):
385
- """Generate AI insights from negative feedback"""
386
-
387
- def run_task():
388
- service = GenerativeAIService(event_code)
389
- service.update_sentiment_summary_with_insights()
390
-
391
- background_tasks.add_task(run_task)
392
-
393
- return {
394
- "status": "started",
395
- "message": "Insight generation started"
396
- }
397
-
398
-
399
- # ===== MONITORING =====
400
- @app.get("/api/monitoring/pipelines/{pipeline}/metrics", tags=["Monitoring"])
401
- async def get_pipeline_metrics(
402
- pipeline: str,
403
- event_code: Optional[str] = None,
404
- days: int = 7
405
- ):
406
- """Get performance metrics"""
407
- # TODO: Implement based on monitoring.py
408
- return {
409
- "pipeline": pipeline,
410
- "event_code": event_code,
411
- "message": "Metrics endpoint - implement as needed"
412
- }
413
-
414
-
415
- # ===== ADMIN =====
416
- @app.post("/api/admin/indexes/create", tags=["Admin"])
417
- async def create_indexes():
418
- """Create MongoDB indexes"""
419
- from scripts.create_indexes import create_all_indexes
420
-
421
- try:
422
- create_all_indexes()
423
- return {"status": "success", "message": "Indexes created"}
424
- except Exception as e:
425
- raise HTTPException(status_code=500, detail=str(e))
426
-
427
-
428
- # ===== ROOT =====
429
- @app.get("/")
430
- async def root():
431
- """API root"""
432
- return {
433
- "name": "Audience Segmentation AI - Event-Centric",
434
- "version": "2.0.0",
435
- "docs": "/api/docs",
436
- "health": "/health"
437
- }
438
-
439
-
440
- if __name__ == "__main__":
441
- import uvicorn
442
- uvicorn.run(
443
- "app:app",
444
- host="0.0.0.0",
445
- port=7860,
446
- reload=False
447
- )
 
1
+ """
2
+ FastAPI Application for Event-Centric Audience Segmentation AI
3
+ Author: AI Generated
4
+ Created: 2025-11-24 (Refactored)
5
+ Purpose: REST API with event-based endpoints
6
+ """
7
+
8
+ from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Query
9
+ from fastapi.middleware.cors import CORSMiddleware
10
+ from pydantic import BaseModel
11
+ from typing import List, Dict, Optional, Any
12
+ from datetime import datetime
13
+ from bson import ObjectId
14
+
15
+ # Import services
16
+ from services.segmentation_service import SegmentationService
17
+ from services.sentiment_service import SentimentAnalysisService
18
+ from services.genai_service import GenerativeAIService
19
+ from database import db
20
+ from config import settings
21
+
22
+
23
+ # FastAPI app
24
+ app = FastAPI(
25
+ title="Audience Segmentation AI - Event-Centric",
26
+ description="REST API for per-event audience analysis",
27
+ version="2.0.0",
28
+ docs_url="/api/docs",
29
+ redoc_url="/api/redoc"
30
+ )
31
+
32
+ # CORS
33
+ app.add_middleware(
34
+ CORSMiddleware,
35
+ allow_origins=["*"],
36
+ allow_credentials=True,
37
+ allow_methods=["*"],
38
+ allow_headers=["*"],
39
+ )
40
+
41
+
42
+ # Helper
43
+ def serialize_doc(doc: Dict) -> Optional[Dict]:
44
+ """Convert MongoDB document to JSON-serializable dict"""
45
+ if doc is None:
46
+ return None
47
+ if '_id' in doc:
48
+ doc['id'] = str(doc.pop('_id'))
49
+
50
+ # Handle nested ObjectIds and lists
51
+ for key, value in list(doc.items()):
52
+ if isinstance(value, ObjectId):
53
+ doc[key] = str(value)
54
+ elif isinstance(value, list):
55
+ doc[key] = [str(v) if isinstance(v, ObjectId) else v for v in value]
56
+ elif isinstance(value, dict):
57
+ doc[key] = serialize_doc(value)
58
+
59
+ return doc
60
+
61
+
62
+ # ===== HEALTH =====
63
+ @app.get("/health", tags=["System"])
64
+ async def health_check():
65
+ """Health check"""
66
+ try:
67
+ db.client.server_info()
68
+ return {
69
+ "status": "healthy",
70
+ "timestamp": datetime.utcnow(),
71
+ "database": "connected"
72
+ }
73
+ except Exception as e:
74
+ raise HTTPException(
75
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
76
+ detail=f"Unhealthy: {str(e)}"
77
+ )
78
+
79
+
80
+ # ===== EVENT ANALYSIS =====
81
+ @app.post("/api/events/{event_code}/analyze", tags=["Event Analysis"])
82
+ async def analyze_event(event_code: str, background_tasks: BackgroundTasks):
83
+ """Run full AI pipeline for an event"""
84
+
85
+ def run_pipeline():
86
+ # Step 1: Segmentation
87
+ seg_service = SegmentationService(event_code)
88
+ seg_service.run_segmentation()
89
+
90
+ # Step 2: Sentiment
91
+ sent_service = SentimentAnalysisService(event_code)
92
+ sent_service.analyze_event_comments()
93
+
94
+ # Step 3: Email generation
95
+ genai_service = GenerativeAIService(event_code)
96
+ genai_service.generate_emails_for_all_segments()
97
+
98
+ # Step 4: Insights
99
+ genai_service.update_sentiment_summary_with_insights()
100
+
101
+ background_tasks.add_task(run_pipeline)
102
+
103
+ return {
104
+ "status": "started",
105
+ "message": f"Analysis pipeline started for event {event_code}"
106
+ }
107
+
108
+
109
+ @app.get("/api/events/{event_code}/dashboard", tags=["Event Analysis"])
110
+ async def get_event_dashboard(event_code: str):
111
+ """Get complete dashboard for Event Owner"""
112
+
113
+ # Get segments
114
+ segments = list(db.event_audience_segments.find({"event_code": event_code}))
115
+
116
+ # Get sentiment summary
117
+ sentiment_summary = db.event_sentiment_summary.find_one({"event_code": event_code})
118
+
119
+ return {
120
+ "event_code": event_code,
121
+ "segments": [serialize_doc(s) for s in segments],
122
+ "sentiment_summary": serialize_doc(sentiment_summary) if sentiment_summary else None
123
+ }
124
+
125
+
126
+ # ===== SEGMENTATION =====
127
+ @app.post("/api/events/{event_code}/segmentation/run", tags=["Segmentation"])
128
+ async def run_event_segmentation(
129
+ event_code: str,
130
+ background_tasks: BackgroundTasks,
131
+ n_clusters: int = Query(default=5, ge=2, le=10)
132
+ ):
133
+ """Run segmentation for an event"""
134
+
135
+ def run_task():
136
+ service = SegmentationService(event_code, n_clusters=n_clusters)
137
+ service.run_segmentation()
138
+
139
+ background_tasks.add_task(run_task)
140
+
141
+ return {
142
+ "status": "started",
143
+ "message": f"Segmentation started for event {event_code}",
144
+ "event_code": event_code
145
+ }
146
+
147
+
148
+ @app.get("/api/events/{event_code}/segments", tags=["Segmentation"])
149
+ async def get_event_segments(
150
+ event_code: str,
151
+ status_filter: Optional[str] = Query(default=None, description="Filter by Draft, Approved, Sent")
152
+ ):
153
+ """Get all segments for an event"""
154
+
155
+ query = {"event_code": event_code}
156
+ if status_filter:
157
+ query["marketing_content.status"] = status_filter
158
+
159
+ segments = list(db.event_audience_segments.find(query))
160
+
161
+ return [serialize_doc(s) for s in segments]
162
+
163
+
164
+ @app.get("/api/events/{event_code}/segments/{segment_id}", tags=["Segmentation"])
165
+ async def get_segment_detail(event_code: str, segment_id: str):
166
+ """Get specific segment details"""
167
+
168
+ segment = db.event_audience_segments.find_one({
169
+ "_id": ObjectId(segment_id),
170
+ "event_code": event_code
171
+ })
172
+
173
+ if not segment:
174
+ raise HTTPException(status_code=404, detail="Segment not found")
175
+
176
+ return serialize_doc(segment)
177
+
178
+
179
+ @app.get("/api/events/{event_code}/segments/{segment_id}/users", tags=["Segmentation"])
180
+ async def get_segment_users(
181
+ event_code: str,
182
+ segment_id: str,
183
+ skip: int = 0,
184
+ limit: int = 100
185
+ ):
186
+ """Get users in a segment with details"""
187
+
188
+ segment = db.event_audience_segments.find_one({
189
+ "_id": ObjectId(segment_id),
190
+ "event_code": event_code
191
+ })
192
+
193
+ if not segment:
194
+ raise HTTPException(status_code=404, detail="Segment not found")
195
+
196
+ user_ids = segment.get('user_ids', [])
197
+ total_users = len(user_ids)
198
+
199
+ # Paginate
200
+ paginated_ids = user_ids[skip:skip + limit]
201
+
202
+ # Get user details
203
+ users = list(db.users.find({
204
+ "_id": {"$in": paginated_ids}
205
+ }))
206
+
207
+ # Enrich with stats (optional)
208
+ enriched_users = []
209
+ for user in users:
210
+ enriched_users.append({
211
+ "user_id": str(user['_id']),
212
+ "email": user.get('email'),
213
+ "full_name": f"{user.get('FirstName', '')} {user.get('LastName', '')}".strip()
214
+ })
215
+
216
+ return {
217
+ "segment_id": segment_id,
218
+ "total_users": total_users,
219
+ "users": enriched_users
220
+ }
221
+
222
+
223
+ # ===== APPROVAL WORKFLOW =====
224
+ @app.post("/api/events/{event_code}/segments/{segment_id}/approve", tags=["Approval"])
225
+ async def approve_segment(
226
+ event_code: str,
227
+ segment_id: str,
228
+ approved_by: Optional[str] = None,
229
+ modified_subject: Optional[str] = None,
230
+ modified_body: Optional[str] = None
231
+ ):
232
+ """Event Owner approves marketing content"""
233
+
234
+ segment = db.event_audience_segments.find_one({
235
+ "_id": ObjectId(segment_id),
236
+ "event_code": event_code
237
+ })
238
+
239
+ if not segment:
240
+ raise HTTPException(status_code=404, detail="Segment not found")
241
+
242
+ # Update fields
243
+ update = {
244
+ "marketing_content.status": "Approved",
245
+ "marketing_content.approved_at": datetime.utcnow(),
246
+ "marketing_content.approved_by": approved_by,
247
+ "last_updated": datetime.utcnow()
248
+ }
249
+
250
+ if modified_subject:
251
+ update["marketing_content.email_subject"] = modified_subject
252
+ if modified_body:
253
+ update["marketing_content.email_body"] = modified_body
254
+
255
+ db.event_audience_segments.update_one(
256
+ {"_id": ObjectId(segment_id)},
257
+ {"$set": update}
258
+ )
259
+
260
+ updated_segment = db.event_audience_segments.find_one({"_id": ObjectId(segment_id)})
261
+
262
+ return {
263
+ "status": "success",
264
+ "message": "Segment approved",
265
+ "segment_id": segment_id,
266
+ "marketing_content": updated_segment.get('marketing_content')
267
+ }
268
+
269
+
270
+ @app.post("/api/events/{event_code}/segments/{segment_id}/send-email", tags=["Approval"])
271
+ async def send_segment_email(
272
+ event_code: str,
273
+ segment_id: str,
274
+ send_immediately: bool = True
275
+ ):
276
+ """Send approved marketing email"""
277
+
278
+ segment = db.event_audience_segments.find_one({
279
+ "_id": ObjectId(segment_id),
280
+ "event_code": event_code
281
+ })
282
+
283
+ if not segment:
284
+ raise HTTPException(status_code=404, detail="Segment not found")
285
+
286
+ marketing_content = segment.get('marketing_content', {})
287
+ if marketing_content.get('status') != "Approved":
288
+ raise HTTPException(status_code=400, detail="Segment not approved yet")
289
+
290
+ # TODO: Integrate with email service (SendGrid, AWS SES, etc.)
291
+ # For now, just mark as sent
292
+
293
+ db.event_audience_segments.update_one(
294
+ {"_id": ObjectId(segment_id)},
295
+ {"$set": {
296
+ "marketing_content.status": "Sent",
297
+ "last_updated": datetime.utcnow()
298
+ }}
299
+ )
300
+
301
+ return {
302
+ "status": "success",
303
+ "message": f"Email sent to {segment.get('user_count', 0)} users",
304
+ "segment_id": segment_id,
305
+ "emails_sent": segment.get('user_count', 0),
306
+ "emails_failed": 0
307
+ }
308
+
309
+
310
+ # ===== SENTIMENT =====
311
+ @app.post("/api/events/{event_code}/sentiment/analyze", tags=["Sentiment"])
312
+ async def analyze_event_sentiment(event_code: str, background_tasks: BackgroundTasks):
313
+ """Analyze sentiment for event comments"""
314
+
315
+ def run_task():
316
+ service = SentimentAnalysisService(event_code)
317
+ service.analyze_event_comments()
318
+
319
+ background_tasks.add_task(run_task)
320
+
321
+ return {
322
+ "status": "started",
323
+ "message": f"Sentiment analysis started for event {event_code}"
324
+ }
325
+
326
+
327
+ @app.get("/api/events/{event_code}/sentiment/summary", tags=["Sentiment"])
328
+ async def get_sentiment_summary(event_code: str):
329
+ """Get sentiment summary for an event"""
330
+
331
+ summary = db.event_sentiment_summary.find_one({"event_code": event_code})
332
+
333
+ if not summary:
334
+ raise HTTPException(status_code=404, detail="No sentiment data for this event")
335
+
336
+ return serialize_doc(summary)
337
+
338
+
339
+ @app.get("/api/events/{event_code}/sentiment/results", tags=["Sentiment"])
340
+ async def get_sentiment_results(
341
+ event_code: str,
342
+ sentiment_label: Optional[str] = None,
343
+ skip: int = 0,
344
+ limit: int = 100
345
+ ):
346
+ """Get detailed sentiment results"""
347
+
348
+ query = {"event_code": event_code}
349
+ if sentiment_label:
350
+ query["sentiment_label"] = sentiment_label
351
+
352
+ total = db.sentiment_results.count_documents(query)
353
+ results = list(
354
+ db.sentiment_results.find(query)
355
+ .sort("analyzed_at", -1)
356
+ .skip(skip)
357
+ .limit(limit)
358
+ )
359
+
360
+ return {
361
+ "total": total,
362
+ "results": [serialize_doc(r) for r in results]
363
+ }
364
+
365
+
366
+ # ===== GENAI =====
367
+ @app.post("/api/events/{event_code}/genai/generate-emails", tags=["GenAI"])
368
+ async def generate_event_emails(event_code: str, background_tasks: BackgroundTasks):
369
+ """Generate marketing emails for all segments"""
370
+
371
+ def run_task():
372
+ service = GenerativeAIService(event_code)
373
+ service.generate_emails_for_all_segments()
374
+
375
+ background_tasks.add_task(run_task)
376
+
377
+ return {
378
+ "status": "started",
379
+ "message": "Email generation started"
380
+ }
381
+
382
+
383
+ @app.post("/api/events/{event_code}/genai/generate-insights", tags=["GenAI"])
384
+ async def generate_event_insights(event_code: str, background_tasks: BackgroundTasks):
385
+ """Generate AI insights from negative feedback"""
386
+
387
+ def run_task():
388
+ service = GenerativeAIService(event_code)
389
+ service.update_sentiment_summary_with_insights()
390
+
391
+ background_tasks.add_task(run_task)
392
+
393
+ return {
394
+ "status": "started",
395
+ "message": "Insight generation started"
396
+ }
397
+
398
+
399
+ # ===== MONITORING =====
400
+ @app.get("/api/monitoring/pipelines/{pipeline}/metrics", tags=["Monitoring"])
401
+ async def get_pipeline_metrics(
402
+ pipeline: str,
403
+ event_code: Optional[str] = None,
404
+ days: int = 7
405
+ ):
406
+ """Get performance metrics"""
407
+ # TODO: Implement based on monitoring.py
408
+ return {
409
+ "pipeline": pipeline,
410
+ "event_code": event_code,
411
+ "message": "Metrics endpoint - implement as needed"
412
+ }
413
+
414
+
415
+ # ===== ADMIN =====
416
+ @app.post("/api/admin/indexes/create", tags=["Admin"])
417
+ async def create_indexes():
418
+ """Create MongoDB indexes"""
419
+ from scripts.create_indexes import create_all_indexes
420
+
421
+ try:
422
+ create_all_indexes()
423
+ return {"status": "success", "message": "Indexes created"}
424
+ except Exception as e:
425
+ raise HTTPException(status_code=500, detail=str(e))
426
+
427
+
428
+ # ===== ROOT =====
429
+ @app.get("/")
430
+ async def root():
431
+ """API root"""
432
+ return {
433
+ "name": "Audience Segmentation AI - Event-Centric",
434
+ "version": "2.0.0",
435
+ "docs": "/api/docs",
436
+ "health": "/health"
437
+ }
438
+
439
+
440
+ if __name__ == "__main__":
441
+ import uvicorn
442
+ uvicorn.run(
443
+ "app:app",
444
+ host="0.0.0.0",
445
+ port=7860,
446
+ reload=True
447
+ )
config.py CHANGED
@@ -31,9 +31,8 @@ class Settings(BaseSettings):
31
  # AI Models
32
  SENTIMENT_MODEL: str = "wonrax/phobert-base-vietnamese-sentiment"
33
 
34
- # Vistral LLM (Auto-download via Transformers)
35
  LLM_MODEL_NAME: str = os.getenv("LLM_MODEL_NAME", "Viet-Mistral/Vistral-7B-Chat")
36
- LLM_CACHE_DIR: str = os.getenv("LLM_CACHE_DIR", "./models/cache")
37
 
38
  # Clustering
39
  N_CLUSTERS: int = 5
 
31
  # AI Models
32
  SENTIMENT_MODEL: str = "wonrax/phobert-base-vietnamese-sentiment"
33
 
34
+ # Vistral LLM (Via HuggingFace Inference API)
35
  LLM_MODEL_NAME: str = os.getenv("LLM_MODEL_NAME", "Viet-Mistral/Vistral-7B-Chat")
 
36
 
37
  # Clustering
38
  N_CLUSTERS: int = 5
services/genai_service.py CHANGED
@@ -1,13 +1,12 @@
1
  """
2
  Event-Centric Generative AI Service
3
  Author: AI Generated
4
- Created: 2025-11-24 (Refactored with Transformers)
5
- Purpose: Generate marketing content and insights using Vistral-7B-Chat
6
  """
7
 
8
- import torch
9
- from transformers import AutoModelForCausalLM, AutoTokenizer
10
- from typing import Dict, List, Optional
11
  from datetime import datetime
12
  from bson import ObjectId
13
 
@@ -15,12 +14,12 @@ from database import db
15
  from config import settings
16
  from models.event_models import EventSentimentSummary, AIInsights, MarketingContent
17
  from services.monitoring import monitor
18
- from services.model_registry import registry
19
 
20
 
21
  class GenerativeAIService:
22
  """
23
- Event-centric GenAI using Vistral-7B-Chat via Transformers.
 
24
  """
25
 
26
  def __init__(self, event_code: str):
@@ -32,10 +31,7 @@ class GenerativeAIService:
32
  """
33
  self.event_code = event_code
34
  self.model_name = settings.LLM_MODEL_NAME
35
- self.cache_dir = settings.LLM_CACHE_DIR
36
- self.tokenizer = None
37
- self.model = None
38
- self.device = "cuda" if torch.cuda.is_available() else "cpu"
39
 
40
  # System prompt theo official docs
41
  self.system_prompt = (
@@ -45,99 +41,63 @@ class GenerativeAIService:
45
  "phân biệt chủng tộc, phân biệt giới tính, độc hại, nguy hiểm hoặc bất hợp pháp nào."
46
  )
47
 
48
- def load_model(self):
49
- """
50
- Load Vistral-7B-Chat model using Transformers.
51
- Auto-downloads from HuggingFace Hub.
52
- """
53
- try:
54
- print(f"🔄 Loading Vistral-7B-Chat: {self.model_name}")
55
- print(f" Device: {self.device}")
56
- print(f" Cache: {self.cache_dir}")
57
 
58
- # Load tokenizer
59
- self.tokenizer = AutoTokenizer.from_pretrained(
60
- self.model_name,
61
- cache_dir=self.cache_dir,
62
  token=settings.HF_TOKEN if settings.HF_TOKEN else None
63
  )
64
-
65
- # Load model with appropriate dtype
66
- dtype = torch.float16 if self.device == "cuda" else torch.float32
67
-
68
- print(f" Loading model with dtype={dtype}...")
69
- self.model = AutoModelForCausalLM.from_pretrained(
70
- self.model_name,
71
- torch_dtype=dtype,
72
- device_map="auto" if self.device == "cuda" else None,
73
- cache_dir=self.cache_dir,
74
- token=settings.HF_TOKEN if settings.HF_TOKEN else None,
75
- use_cache=True,
76
- low_cpu_mem_usage=True
77
- )
78
-
79
- # Move to device if CPU
80
- if self.device == "cpu":
81
- self.model = self.model.to(self.device)
82
-
83
- self.model.eval()
84
- print("✓ Model loaded successfully!")
85
-
86
- except Exception as e:
87
- print(f"❌ Error loading model: {str(e)}")
88
- print(f"⚠️ GenAI features will be disabled.")
89
- print(f" Segmentation and Sentiment analysis will still work.")
90
- self.model = None
91
- self.tokenizer = None
92
 
93
  def generate_text(self, prompt: str, max_new_tokens: int = 512) -> str:
94
  """
95
- Generate text using Vistral with chat template.
96
- """
97
- if not self.model or not self.tokenizer:
98
- self.load_model()
99
-
100
- if not self.model:
101
- return ""
102
-
103
- # Build conversation with system prompt
104
- conversation = [
105
- {"role": "system", "content": self.system_prompt},
106
- {"role": "user", "content": prompt}
107
- ]
108
-
109
- # Apply chat template
110
- input_ids = self.tokenizer.apply_chat_template(
111
- conversation,
112
- return_tensors="pt"
113
- ).to(self.model.device)
114
 
115
- # Generate
116
- with torch.no_grad():
117
- output_ids = self.model.generate(
118
- input_ids=input_ids,
119
- max_new_tokens=max_new_tokens,
120
- do_sample=True,
121
- top_p=0.95,
122
- top_k=40,
 
 
 
 
 
 
 
 
 
 
 
 
123
  temperature=0.7,
124
- repetition_penalty=1.05,
125
  )
 
 
 
 
126
 
127
- # Decode (only new tokens)
128
- generated_text = self.tokenizer.batch_decode(
129
- output_ids[:, input_ids.size(1):],
130
- skip_special_tokens=True
131
- )[0].strip()
132
-
133
- return generated_text
134
 
135
  def generate_email_for_segment(self, segment: Dict) -> MarketingContent:
136
  """
137
  Generate personalized email for a segment.
138
  """
139
  # Get event info
140
- event = db.event_versions.find_one({"_id": self.event_code})
141
  event_name = event.get("EventName", "Sự kiện") if event else "Sự kiện"
142
 
143
  # Build prompt
@@ -161,6 +121,14 @@ BODY:
161
 
162
  generated = self.generate_text(prompt, max_new_tokens=400)
163
 
 
 
 
 
 
 
 
 
164
  # Parse response
165
  lines = generated.split('\n')
166
  subject = ""
@@ -199,13 +167,6 @@ BODY:
199
  print("=" * 60)
200
 
201
  try:
202
- if not self.model:
203
- self.load_model()
204
-
205
- if not self.model:
206
- print("⚠️ Model not available, skipping email generation")
207
- return
208
-
209
  # Find segments without marketing content
210
  segments = list(db.event_audience_segments.find({
211
  "event_code": self.event_code,
@@ -294,6 +255,14 @@ BODY:
294
  comments = list(db.post_social_media.aggregate(pipeline))
295
  negative_texts = [c.get('comment_text', '') for c in comments if c.get('comment_text')]
296
 
 
 
 
 
 
 
 
 
297
  # Build prompt
298
  comments_sample = "\n".join([f"- {text[:100]}" for text in negative_texts[:15]])
299
 
@@ -331,32 +300,32 @@ NPS: [số]
331
  suggestions = []
332
  predicted_nps = 60.0
333
 
334
- lines = generated.split('\n')
335
- current_section = None
336
-
337
- for line in lines:
338
- line = line.strip()
339
- if "TOP_ISSUES" in line or "VẤN ĐỀ" in line:
340
- current_section = "issues"
341
- elif "SUGGESTIONS" in line or "ĐỀ XUẤT" in line:
342
- current_section = "suggestions"
343
- elif "NPS" in line:
344
- try:
345
- # Extract number from line
346
- import re
347
- numbers = re.findall(r'\d+', line)
348
- if numbers:
349
- predicted_nps = float(numbers[0])
350
- except:
351
- pass
352
- elif current_section == "issues" and (line.startswith("-") or line[0].isdigit()):
353
- issue = line.lstrip("0123456789.-) ").strip()
354
- if issue and len(issue) > 5:
355
- top_issues.append(issue)
356
- elif current_section == "suggestions" and line.startswith("-"):
357
- suggestion = line.lstrip("- ").strip()
358
- if suggestion and len(suggestion) > 5:
359
- suggestions.append(suggestion)
360
 
361
  # Create summary
362
  total_comments = db.sentiment_results.count_documents({"event_code": self.event_code})
 
1
  """
2
  Event-Centric Generative AI Service
3
  Author: AI Generated
4
+ Created: 2025-11-24 (Using HuggingFace Inference API)
5
+ Purpose: Generate marketing content using Vistral-7B-Chat via API
6
  """
7
 
8
+ from huggingface_hub import InferenceClient
9
+ from typing import Dict, List
 
10
  from datetime import datetime
11
  from bson import ObjectId
12
 
 
14
  from config import settings
15
  from models.event_models import EventSentimentSummary, AIInsights, MarketingContent
16
  from services.monitoring import monitor
 
17
 
18
 
19
  class GenerativeAIService:
20
  """
21
+ Event-centric GenAI using Vistral-7B-Chat via HuggingFace Inference API.
22
+ Much faster and lighter than loading model locally.
23
  """
24
 
25
  def __init__(self, event_code: str):
 
31
  """
32
  self.event_code = event_code
33
  self.model_name = settings.LLM_MODEL_NAME
34
+ self.client = None
 
 
 
35
 
36
  # System prompt theo official docs
37
  self.system_prompt = (
 
41
  "phân biệt chủng tộc, phân biệt giới tính, độc hại, nguy hiểm hoặc bất hợp pháp nào."
42
  )
43
 
44
+ def get_client(self):
45
+ """Get or create InferenceClient"""
46
+ if not self.client:
47
+ print(f"🔄 Initializing HuggingFace Inference API")
48
+ print(f" Model: {self.model_name}")
 
 
 
 
49
 
50
+ self.client = InferenceClient(
51
+ model=self.model_name,
 
 
52
  token=settings.HF_TOKEN if settings.HF_TOKEN else None
53
  )
54
+ print("✓ Inference client ready!")
55
+
56
+ return self.client
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
  def generate_text(self, prompt: str, max_new_tokens: int = 512) -> str:
59
  """
60
+ Generate text using Vistral via HuggingFace Inference API.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
 
62
+ Args:
63
+ prompt: User prompt
64
+ max_new_tokens: Max tokens to generate
65
+
66
+ Returns:
67
+ Generated text
68
+ """
69
+ try:
70
+ client = self.get_client()
71
+
72
+ # Build messages with system prompt
73
+ messages = [
74
+ {"role": "system", "content": self.system_prompt},
75
+ {"role": "user", "content": prompt}
76
+ ]
77
+
78
+ # Call Inference API
79
+ response = client.chat_completion(
80
+ messages=messages,
81
+ max_tokens=max_new_tokens,
82
  temperature=0.7,
83
+ top_p=0.95,
84
  )
85
+
86
+ # Extract generated text
87
+ generated = response.choices[0].message.content
88
+ return generated.strip()
89
 
90
+ except Exception as e:
91
+ print(f"❌ Error calling Inference API: {str(e)}")
92
+ print(f"⚠️ Returning empty response")
93
+ return ""
 
 
 
94
 
95
  def generate_email_for_segment(self, segment: Dict) -> MarketingContent:
96
  """
97
  Generate personalized email for a segment.
98
  """
99
  # Get event info
100
+ event = db.event_versions.find_one({"_id": ObjectId(self.event_code)})
101
  event_name = event.get("EventName", "Sự kiện") if event else "Sự kiện"
102
 
103
  # Build prompt
 
121
 
122
  generated = self.generate_text(prompt, max_new_tokens=400)
123
 
124
+ if not generated:
125
+ return MarketingContent(
126
+ email_subject=f"Ưu đãi đặc biệt cho {segment['segment_name']}",
127
+ email_body="Nội dung email sẽ được tạo khi API khả dụng.",
128
+ status="Draft",
129
+ generated_at=datetime.utcnow()
130
+ )
131
+
132
  # Parse response
133
  lines = generated.split('\n')
134
  subject = ""
 
167
  print("=" * 60)
168
 
169
  try:
 
 
 
 
 
 
 
170
  # Find segments without marketing content
171
  segments = list(db.event_audience_segments.find({
172
  "event_code": self.event_code,
 
255
  comments = list(db.post_social_media.aggregate(pipeline))
256
  negative_texts = [c.get('comment_text', '') for c in comments if c.get('comment_text')]
257
 
258
+ if not negative_texts:
259
+ return AIInsights(
260
+ summary="Không thể truy xuất nội dung feedback tiêu cực.",
261
+ top_issues=[],
262
+ improvement_suggestions=[],
263
+ predicted_nps=60.0
264
+ )
265
+
266
  # Build prompt
267
  comments_sample = "\n".join([f"- {text[:100]}" for text in negative_texts[:15]])
268
 
 
300
  suggestions = []
301
  predicted_nps = 60.0
302
 
303
+ if generated:
304
+ lines = generated.split('\n')
305
+ current_section = None
306
+
307
+ for line in lines:
308
+ line = line.strip()
309
+ if "TOP_ISSUES" in line or "VẤN ĐỀ" in line:
310
+ current_section = "issues"
311
+ elif "SUGGESTIONS" in line or "ĐỀ XUẤT" in line:
312
+ current_section = "suggestions"
313
+ elif "NPS" in line:
314
+ try:
315
+ import re
316
+ numbers = re.findall(r'\d+', line)
317
+ if numbers:
318
+ predicted_nps = float(numbers[0])
319
+ except:
320
+ pass
321
+ elif current_section == "issues" and (line.startswith("-") or line[0].isdigit()):
322
+ issue = line.lstrip("0123456789.-) ").strip()
323
+ if issue and len(issue) > 5:
324
+ top_issues.append(issue)
325
+ elif current_section == "suggestions" and line.startswith("-"):
326
+ suggestion = line.lstrip("- ").strip()
327
+ if suggestion and len(suggestion) > 5:
328
+ suggestions.append(suggestion)
329
 
330
  # Create summary
331
  total_comments = db.sentiment_results.count_documents({"event_code": self.event_code})