AlainDeLong committed on
Commit
59d66b9
·
1 Parent(s): 5ed3596

perf(api): refactor data insertion to enhance performance

Browse files
app/api/endpoints/analysis.py CHANGED
@@ -48,7 +48,7 @@ async def fetch_repr_comments(entity_id):
48
  if not source_ids:
49
  return {"positive": [], "neutral": [], "negative": []}
50
 
51
- # Fetch new comments for each sentiment
52
  sentiments = ["positive", "neutral", "negative"]
53
  comment_tasks = []
54
  limit = settings.REPRESENTATIVE_COMMENTS_LIMIT
@@ -409,36 +409,20 @@ async def process_on_demand_job(request: Request):
409
  print(f"No comments found for on-demand keyword: {keyword}")
410
  return {"message": "No comments found."}
411
 
412
- # 3. Perform Sentiment Analysis IN BATCHES
413
  print(f"Analyzing {len(final_comments)} comments in batches...")
414
- batch_size = settings.CONSUMER_BATCH_SIZE
415
- all_predictions = []
 
416
 
417
- # Loop through comments in chunks of batch_size
418
- for i in range(0, len(final_comments), batch_size):
419
- batch_comments = final_comments[i : i + batch_size]
420
- texts_to_predict = [comment.get("text", "") for comment in batch_comments]
421
-
422
- # Process one small batch at a time
423
- batch_predictions = sentiment_service.predict(texts_to_predict)
424
- all_predictions.extend(batch_predictions)
425
- print(f" - Processed batch {i // batch_size + 1}...")
426
-
427
- # 4. Save results to Database (similar to a mini-consumer)
428
- video_id_cache: Dict[str, ObjectId] = {}
429
- comments_to_insert: List[Dict[str, Any]] = []
430
 
431
  # 4a. Upsert Entity first to get a stable entity_id
432
- video_id = None
433
- for video in videos:
434
- video_id = video.get("id", {}).get("videoId", "")
435
- if video_id:
436
- break
437
-
438
  entity_thumbnail_url = (
439
  videos[0].get("snippet", {}).get("thumbnails", {}).get("high", {}).get("url")
440
  )
441
- entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
442
 
443
  entity_doc = await db.entities.find_one_and_update(
444
  {"keyword": keyword},
@@ -451,8 +435,6 @@ async def process_on_demand_job(request: Request):
451
  "keyword": keyword,
452
  "geo": settings.FETCH_TRENDS_GEO,
453
  "volume": 0, # Placeholder values
454
- # "thumbnail_url": entity_thumbnail_url,
455
- # "video_url": entity_video_url,
456
  "start_date": datetime.now(),
457
  },
458
  },
@@ -462,9 +444,18 @@ async def process_on_demand_job(request: Request):
462
  entity_id = entity_doc["_id"]
463
 
464
  # 4b. Process and save each comment
465
- for comment_data, prediction in zip(final_comments, all_predictions):
 
 
 
 
 
 
466
  sentiment_label = prediction["label"].lower()
467
 
 
 
 
468
  # Upsert Source Video
469
  video_id = comment_data.get("video_id")
470
  source_id: ObjectId | None = video_id_cache.get(video_id)
@@ -474,7 +465,6 @@ async def process_on_demand_job(request: Request):
474
  {
475
  "$set": {"entity_id": entity_id},
476
  "$setOnInsert": {
477
- # "entity_id": entity_id,
478
  "video_id": video_id,
479
  "url": comment_data.get("video_url"),
480
  "title": comment_data.get("video_title"),
@@ -503,35 +493,34 @@ async def process_on_demand_job(request: Request):
503
  }
504
  )
505
 
506
- # Update aggregated results in real-time
507
- await db.analysis_results.update_one(
508
- {"entity_id": entity_id},
509
- {
510
- "$inc": {
511
- f"results.{sentiment_label}_count": 1,
512
- "results.total_comments": 1,
513
- },
514
- "$setOnInsert": {
515
- "entity_id": entity_id,
516
- "analysis_type": "on-demand", # Note the type
517
- "created_at": datetime.now(),
518
- "status": "completed",
519
- "interest_over_time": [],
520
- },
521
- },
522
- upsert=True,
523
- )
524
-
525
  # 4c. Bulk insert all comments after the loop
526
  if comments_to_insert:
527
  await db.comments_youtube.insert_many(comments_to_insert)
528
 
529
- # 4d. Final update to job status
530
- analysis_result_doc = await db.analysis_results.find_one(
531
- {"entity_id": entity_id, "analysis_type": "on-demand"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
532
  )
533
- result_id = analysis_result_doc["_id"] if analysis_result_doc else None
534
 
 
535
  await db.on_demand_jobs.update_one(
536
  {"_id": job_id},
537
  {
 
48
  if not source_ids:
49
  return {"positive": [], "neutral": [], "negative": []}
50
 
51
+ # Fetch newest comments for each sentiment
52
  sentiments = ["positive", "neutral", "negative"]
53
  comment_tasks = []
54
  limit = settings.REPRESENTATIVE_COMMENTS_LIMIT
 
409
  print(f"No comments found for on-demand keyword: {keyword}")
410
  return {"message": "No comments found."}
411
 
412
+ # 3. Perform Sentiment Analysis
413
  print(f"Analyzing {len(final_comments)} comments in batches...")
414
+ texts_to_predict = [comment.get("text", "") for comment in final_comments]
415
+ predictions = sentiment_service.predict(texts_to_predict)
416
+ print(f"Successfully analyzed {len(final_comments)} comments!!!")
417
 
418
+ # 4. Save raw data and aggregate counts in memory to Database (similar to a mini-consumer)
 
 
 
 
 
 
 
 
 
 
 
 
419
 
420
  # 4a. Upsert Entity first to get a stable entity_id
421
+ video_id = videos[0].get("id", {}).get("videoId", "")
422
+ entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
 
 
 
 
423
  entity_thumbnail_url = (
424
  videos[0].get("snippet", {}).get("thumbnails", {}).get("high", {}).get("url")
425
  )
 
426
 
427
  entity_doc = await db.entities.find_one_and_update(
428
  {"keyword": keyword},
 
435
  "keyword": keyword,
436
  "geo": settings.FETCH_TRENDS_GEO,
437
  "volume": 0, # Placeholder values
 
 
438
  "start_date": datetime.now(),
439
  },
440
  },
 
444
  entity_id = entity_doc["_id"]
445
 
446
  # 4b. Process and save each comment
447
+ # Initialize in-memory counters
448
+ sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}
449
+
450
+ video_id_cache: Dict[str, ObjectId] = {}
451
+ comments_to_insert: List[Dict[str, Any]] = []
452
+
453
+ for comment_data, prediction in zip(final_comments, predictions):
454
  sentiment_label = prediction["label"].lower()
455
 
456
+ # Increment the counter in memory instead of calling the DB
457
+ sentiment_counts[sentiment_label] += 1
458
+
459
  # Upsert Source Video
460
  video_id = comment_data.get("video_id")
461
  source_id: ObjectId | None = video_id_cache.get(video_id)
 
465
  {
466
  "$set": {"entity_id": entity_id},
467
  "$setOnInsert": {
 
468
  "video_id": video_id,
469
  "url": comment_data.get("video_url"),
470
  "title": comment_data.get("video_title"),
 
493
  }
494
  )
495
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
496
  # 4c. Bulk insert all comments after the loop
497
  if comments_to_insert:
498
  await db.comments_youtube.insert_many(comments_to_insert)
499
 
500
+ # 4d. Update analysis_results only ONCE with the final aggregated counts
501
+ analysis_result_doc = await db.analysis_results.find_one_and_update(
502
+ {"entity_id": entity_id, "analysis_type": "on-demand"},
503
+ {
504
+ "$inc": {
505
+ "results.positive_count": sentiment_counts["positive"],
506
+ "results.negative_count": sentiment_counts["negative"],
507
+ "results.neutral_count": sentiment_counts["neutral"],
508
+ "results.total_comments": len(final_comments),
509
+ },
510
+ "$setOnInsert": {
511
+ "entity_id": entity_id,
512
+ "analysis_type": "on-demand",
513
+ "created_at": datetime.now(),
514
+ "status": "processing",
515
+ "interest_over_time": [],
516
+ },
517
+ },
518
+ upsert=True,
519
+ return_document=True,
520
  )
521
+ result_id = analysis_result_doc["_id"]
522
 
523
+ # 4e. Final update to job status
524
  await db.on_demand_jobs.update_one(
525
  {"_id": job_id},
526
  {
app/services/sentiment_service.py CHANGED
@@ -115,7 +115,7 @@ class SentimentService:
115
  }
116
  )
117
 
118
- print(f" - Processed batch {start // batch_size + 1}...")
119
 
120
  # Map predictions back to their original positions
121
  final_results: List[Dict[str, Any] | None] = [None] * len(texts)
 
115
  }
116
  )
117
 
118
+ # print(f" - Processed batch {start // batch_size + 1}...")
119
 
120
  # Map predictions back to their original positions
121
  final_results: List[Dict[str, Any] | None] = [None] * len(texts)