AlainDeLong commited on
Commit
564f32e
·
1 Parent(s): 1556ad1

feat(api): add quota-exceeded exception

Browse files
app/api/endpoints/analysis.py CHANGED
@@ -13,6 +13,7 @@ from trendspy import Trends
13
 
14
  from app.core.config import settings
15
  from app.core.clients import qstash_client
 
16
  from app.schemas.analysis_schema import (
17
  WeeklyTrendListResponse,
18
  TrendDetailResponseSchema,
@@ -306,7 +307,7 @@ async def create_on_demand_analysis(request_data: OnDemandRequestSchema):
306
 
307
  try:
308
  qstash_client.message.publish_json(
309
- url=callback_url, body={"keyword": keyword, "job_id": job_id}, retries=3
310
  )
311
  except Exception as e:
312
  # If publishing fails, update the job status to 'failed'
@@ -323,6 +324,7 @@ async def create_on_demand_analysis(request_data: OnDemandRequestSchema):
323
  async def get_analysis_status(job_id: str):
324
  """
325
  Checks the status of an on-demand analysis job from the 'on_demand_jobs' collection.
 
326
  """
327
  job = await db.on_demand_jobs.find_one({"_id": job_id})
328
 
@@ -334,6 +336,7 @@ async def get_analysis_status(job_id: str):
334
  "status": job["status"],
335
  "keyword": job["keyword"],
336
  "result": None,
 
337
  }
338
 
339
  # If job is completed, fetch the full result data
@@ -360,179 +363,261 @@ async def process_on_demand_job(request: Request):
360
  results to the database.
361
  """
362
  start = time.perf_counter()
 
363
  # 1. Initialization
364
  job_data = await request.json()
 
365
  keyword = job_data.get("keyword")
366
  job_id = job_data.get("job_id")
367
 
 
 
 
368
  if not keyword:
369
  # Acknowledge the request but do nothing if keyword is missing
370
- return {"message": "Keyword is missing, job ignored."}
 
 
 
 
 
371
 
372
- print(f"Processing job {job_id} for keyword: {keyword}")
373
  # Update job status to 'processing'
374
  await db.on_demand_jobs.update_one(
375
  {"_id": job_id},
376
  {"$set": {"status": "processing", "updated_at": datetime.now()}},
377
  )
 
378
 
379
- # 2. Fetch data (similar to a mini-producer)
380
- # Note: For on-demand, I might use a smaller fetching strategy
381
- videos = yt_service.search_videos(query_string=keyword)
382
- if not videos:
383
- print(f"No videos found for on-demand keyword: {keyword}")
384
- return {"message": "No videos found."}
385
-
386
- comments_for_entity: List[Dict[str, Any]] = []
387
- for video in videos:
388
- video_id = video.get("id", {}).get("videoId")
389
- snippet = video.get("snippet", {})
390
- if not video_id or not snippet:
391
- continue
392
-
393
- comments = yt_service.fetch_comments(
394
- video_id=video_id, limit=settings.ON_DEMAND_COMMENTS_PER_VIDEO
395
- ) # Smaller limit for on-demand
396
-
397
- for comment in comments:
398
- comment["video_id"] = video_id
399
- comment["video_title"] = snippet.get("title")
400
- comment["video_publish_date"] = snippet.get("publishedAt")
401
- comment["video_url"] = f"https://www.youtube.com/watch?v={video_id}"
402
- comments_for_entity.extend(comments)
403
-
404
- if (
405
- len(comments_for_entity) >= settings.ON_DEMAND_TOTAL_COMMENTS
406
- ): # Smaller total limit for on-demand
407
- break
408
-
409
- final_comments = comments_for_entity[: settings.ON_DEMAND_TOTAL_COMMENTS]
410
- if not final_comments:
411
- print(f"No comments found for on-demand keyword: {keyword}")
412
- return {"message": "No comments found."}
413
-
414
- # 3. Perform Sentiment Analysis
415
- print(f"Analyzing {len(final_comments)} comments in batches...")
416
- texts_to_predict = [comment.get("text", "") for comment in final_comments]
417
- predictions = sentiment_service.predict(texts_to_predict)
418
- print(f"Successfully analyzed {len(final_comments)} comments!!!")
419
-
420
- # 4. Save raw data and aggregate counts in memory to Database (similar to a mini-consumer)
421
-
422
- # 4a. Upsert Entity first to get a stable entity_id
423
- video_id = videos[0].get("id", {}).get("videoId", "")
424
- entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
425
- entity_thumbnail_url = (
426
- videos[0].get("snippet", {}).get("thumbnails", {}).get("high", {}).get("url")
427
- )
428
-
429
- entity_doc = await db.entities.find_one_and_update(
430
- {"keyword": keyword},
431
- {
432
- "$set": {
433
- "thumbnail_url": entity_thumbnail_url,
434
- "video_url": entity_video_url,
435
- },
436
- "$setOnInsert": {
437
- "keyword": keyword,
438
- "geo": settings.FETCH_TRENDS_GEO,
439
- "volume": 0, # Placeholder values
440
- "start_date": datetime.now(),
441
- },
442
- },
443
- upsert=True,
444
- return_document=True,
445
- )
446
- entity_id = entity_doc["_id"]
447
-
448
- # 4b. Process and save each comment
449
- # Initialize in-memory counters
450
- sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}
451
-
452
- video_id_cache: Dict[str, ObjectId] = {}
453
- comments_to_insert: List[Dict[str, Any]] = []
454
 
455
- for comment_data, prediction in zip(final_comments, predictions):
456
- sentiment_label = prediction["label"].lower()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
457
 
458
- # Increment the counter in memory instead of calling the DB
459
- sentiment_counts[sentiment_label] += 1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
460
 
461
- # Upsert Source Video
462
- video_id = comment_data.get("video_id")
463
- source_id: ObjectId | None = video_id_cache.get(video_id)
464
- if not source_id:
465
- source_doc = await db.sources_youtube.find_one_and_update(
466
- {"video_id": video_id},
467
  {
468
- "$set": {"entity_id": entity_id},
469
- "$setOnInsert": {
470
- "video_id": video_id,
471
- "url": comment_data.get("video_url"),
472
- "title": comment_data.get("video_title"),
473
- "publish_date": datetime.strptime(
474
- comment_data.get("video_publish_date"), "%Y-%m-%dT%H:%M:%SZ"
475
- ),
476
- },
477
- },
478
- upsert=True,
479
- return_document=True,
480
  )
481
- source_id = source_doc["_id"]
482
- video_id_cache[video_id] = source_id
483
 
484
- # Prepare comment for bulk insertion
485
- comments_to_insert.append(
 
 
 
 
 
486
  {
487
- "source_id": source_id,
488
- "comment_id": comment_data.get("comment_id"),
489
- "text": comment_data.get("text"),
490
- "author": comment_data.get("author"),
491
- "publish_date": datetime.strptime(
492
- comment_data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
493
- ),
494
- "sentiment": sentiment_label,
495
- }
 
 
 
 
 
 
 
496
  )
 
497
 
498
- # 4c. Bulk insert all comments after the loop
499
- if comments_to_insert:
500
- await db.comments_youtube.insert_many(comments_to_insert)
501
-
502
- # 4d. Update analysis_results only ONCE with the final aggregated counts
503
- analysis_result_doc = await db.analysis_results.find_one_and_update(
504
- {"entity_id": entity_id, "analysis_type": "on-demand"},
505
- {
506
- "$inc": {
507
- "results.positive_count": sentiment_counts["positive"],
508
- "results.negative_count": sentiment_counts["negative"],
509
- "results.neutral_count": sentiment_counts["neutral"],
510
- "results.total_comments": len(final_comments),
511
  },
512
- "$setOnInsert": {
513
- "entity_id": entity_id,
514
- "analysis_type": "on-demand",
515
- "created_at": datetime.now(),
516
- "status": "processing",
517
- "interest_over_time": [],
 
 
 
 
 
 
518
  },
519
- },
520
- upsert=True,
521
- return_document=True,
522
- )
523
- result_id = analysis_result_doc["_id"]
524
 
525
- # 4e. Final update to job status
526
- await db.on_demand_jobs.update_one(
527
- {"_id": job_id},
528
- {
529
- "$set": {
530
- "status": "completed",
531
- "result_id": result_id,
532
- "updated_at": datetime.now(),
533
- }
534
- },
535
- )
 
 
 
 
 
 
 
 
 
 
 
 
536
 
537
  end = time.perf_counter()
538
  print(
 
13
 
14
  from app.core.config import settings
15
  from app.core.clients import qstash_client
16
+ from app.core.exceptions import QuotaExceededError
17
  from app.schemas.analysis_schema import (
18
  WeeklyTrendListResponse,
19
  TrendDetailResponseSchema,
 
307
 
308
  try:
309
  qstash_client.message.publish_json(
310
+ url=callback_url, body={"keyword": keyword, "job_id": job_id}, retries=0
311
  )
312
  except Exception as e:
313
  # If publishing fails, update the job status to 'failed'
 
324
  async def get_analysis_status(job_id: str):
325
  """
326
  Checks the status of an on-demand analysis job from the 'on_demand_jobs' collection.
327
+ If complete or failed, it returns the final result or an error message.
328
  """
329
  job = await db.on_demand_jobs.find_one({"_id": job_id})
330
 
 
336
  "status": job["status"],
337
  "keyword": job["keyword"],
338
  "result": None,
339
+ "error_message": job.get("error_message"),
340
  }
341
 
342
  # If job is completed, fetch the full result data
 
363
  results to the database.
364
  """
365
  start = time.perf_counter()
366
+
367
  # 1. Initialization
368
  job_data = await request.json()
369
+ print(job_data)
370
  keyword = job_data.get("keyword")
371
  job_id = job_data.get("job_id")
372
 
373
+ if not job_id:
374
+ raise HTTPException(status_code=400, detail="Job ID is missing.")
375
+
376
  if not keyword:
377
  # Acknowledge the request but do nothing if keyword is missing
378
+ # If we have a job_id but no keyword, mark the job as failed.
379
+ await db.on_demand_jobs.update_one(
380
+ {"_id": job_id},
381
+ {"$set": {"status": "failed", "updated_at": datetime.now()}},
382
+ )
383
+ raise HTTPException(status_code=400, detail="Keyword is missing, job ignored.")
384
 
 
385
  # Update job status to 'processing'
386
  await db.on_demand_jobs.update_one(
387
  {"_id": job_id},
388
  {"$set": {"status": "processing", "updated_at": datetime.now()}},
389
  )
390
+ print(f"Processing job {job_id} for keyword: {keyword}")
391
 
392
+ try:
393
+ # 2. Fetch data (similar to a mini-producer)
394
+ # Note: For on-demand, I might use a smaller fetching strategy
395
+ videos = yt_service.search_videos(query_string=keyword)
396
+ if not videos:
397
+ error_msg: str = f"No videos found for on-demand keyword '{keyword}'."
398
+ print(error_msg)
399
+
400
+ # Update job status to failed and raise an exception
401
+ await db.on_demand_jobs.update_one(
402
+ {"_id": job_id},
403
+ {
404
+ "$set": {
405
+ "status": "failed",
406
+ "error_message": error_msg,
407
+ "updated_at": datetime.now(),
408
+ }
409
+ },
410
+ )
411
+ raise HTTPException(
412
+ status_code=404,
413
+ detail=error_msg,
414
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
415
 
416
+ comments_for_entity: List[Dict[str, Any]] = []
417
+ for video in videos:
418
+ video_id = video.get("id", {}).get("videoId")
419
+ snippet = video.get("snippet", {})
420
+ if not video_id or not snippet:
421
+ continue
422
+
423
+ comments = yt_service.fetch_comments(
424
+ video_id=video_id, limit=settings.ON_DEMAND_COMMENTS_PER_VIDEO
425
+ ) # Smaller limit for on-demand
426
+
427
+ for comment in comments:
428
+ comment["video_id"] = video_id
429
+ comment["video_title"] = snippet.get("title")
430
+ comment["video_publish_date"] = snippet.get("publishedAt")
431
+ comment["video_url"] = f"https://www.youtube.com/watch?v={video_id}"
432
+ comments_for_entity.extend(comments)
433
+
434
+ if (
435
+ len(comments_for_entity) >= settings.ON_DEMAND_TOTAL_COMMENTS
436
+ ): # Smaller total limit for on-demand
437
+ break
438
+
439
+ final_comments = comments_for_entity[: settings.ON_DEMAND_TOTAL_COMMENTS]
440
+ if not final_comments:
441
+ error_msg = f"No comments found for on-demand keyword '{keyword}'."
442
+ print(error_msg)
443
+
444
+ # Update job status to failed and raise an exception
445
+ await db.on_demand_jobs.update_one(
446
+ {"_id": job_id},
447
+ {
448
+ "$set": {
449
+ "status": "failed",
450
+ "error_message": error_msg,
451
+ "updated_at": datetime.now(),
452
+ }
453
+ },
454
+ )
455
+ raise HTTPException(status_code=404, detail=error_msg)
456
+
457
+ # 3. Perform Sentiment Analysis
458
+ print(f"Analyzing {len(final_comments)} comments in batches...")
459
+ texts_to_predict = [comment.get("text", "") for comment in final_comments]
460
+ predictions = sentiment_service.predict(texts_to_predict)
461
+ print(f"Successfully analyzed {len(final_comments)} comments!!!")
462
+
463
+ # 4. Save raw data and aggregate counts in memory to Database (similar to a mini-consumer)
464
+
465
+ # 4a. Upsert Entity first to get a stable entity_id
466
+ video_id = videos[0].get("id", {}).get("videoId", "")
467
+ entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
468
+ entity_thumbnail_url = (
469
+ videos[0]
470
+ .get("snippet", {})
471
+ .get("thumbnails", {})
472
+ .get("high", {})
473
+ .get("url")
474
+ )
475
 
476
+ entity_doc = await db.entities.find_one_and_update(
477
+ {"keyword": keyword},
478
+ {
479
+ "$set": {
480
+ "thumbnail_url": entity_thumbnail_url,
481
+ "video_url": entity_video_url,
482
+ },
483
+ "$setOnInsert": {
484
+ "keyword": keyword,
485
+ "geo": settings.FETCH_TRENDS_GEO,
486
+ "volume": 0, # Placeholder values
487
+ "start_date": datetime.now(),
488
+ },
489
+ },
490
+ upsert=True,
491
+ return_document=True,
492
+ )
493
+ entity_id = entity_doc["_id"]
494
+
495
+ # 4b. Process and save each comment
496
+ # Initialize in-memory counters
497
+ sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}
498
+
499
+ video_id_cache: Dict[str, ObjectId] = {}
500
+ comments_to_insert: List[Dict[str, Any]] = []
501
+
502
+ for comment_data, prediction in zip(final_comments, predictions):
503
+ sentiment_label = prediction["label"].lower()
504
+
505
+ # Increment the counter in memory instead of calling the DB
506
+ sentiment_counts[sentiment_label] += 1
507
+
508
+ # Upsert Source Video
509
+ video_id = comment_data.get("video_id")
510
+ source_id: ObjectId | None = video_id_cache.get(video_id)
511
+ if not source_id:
512
+ source_doc = await db.sources_youtube.find_one_and_update(
513
+ {"video_id": video_id},
514
+ {
515
+ "$set": {"entity_id": entity_id},
516
+ "$setOnInsert": {
517
+ "video_id": video_id,
518
+ "url": comment_data.get("video_url"),
519
+ "title": comment_data.get("video_title"),
520
+ "publish_date": datetime.strptime(
521
+ comment_data.get("video_publish_date"),
522
+ "%Y-%m-%dT%H:%M:%SZ",
523
+ ),
524
+ },
525
+ },
526
+ upsert=True,
527
+ return_document=True,
528
+ )
529
+ source_id = source_doc["_id"]
530
+ video_id_cache[video_id] = source_id
531
 
532
+ # Prepare comment for bulk insertion
533
+ comments_to_insert.append(
 
 
 
 
534
  {
535
+ "source_id": source_id,
536
+ "comment_id": comment_data.get("comment_id"),
537
+ "text": comment_data.get("text"),
538
+ "author": comment_data.get("author"),
539
+ "publish_date": datetime.strptime(
540
+ comment_data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
541
+ ),
542
+ "sentiment": sentiment_label,
543
+ }
 
 
 
544
  )
 
 
545
 
546
+ # 4c. Bulk insert all comments after the loop
547
+ if comments_to_insert:
548
+ await db.comments_youtube.insert_many(comments_to_insert)
549
+
550
+ # 4d. Update analysis_results only ONCE with the final aggregated counts
551
+ analysis_result_doc = await db.analysis_results.find_one_and_update(
552
+ {"entity_id": entity_id, "analysis_type": "on-demand"},
553
  {
554
+ "$inc": {
555
+ "results.positive_count": sentiment_counts["positive"],
556
+ "results.negative_count": sentiment_counts["negative"],
557
+ "results.neutral_count": sentiment_counts["neutral"],
558
+ "results.total_comments": len(final_comments),
559
+ },
560
+ "$setOnInsert": {
561
+ "entity_id": entity_id,
562
+ "analysis_type": "on-demand",
563
+ "created_at": datetime.now(),
564
+ "status": "processing",
565
+ "interest_over_time": [],
566
+ },
567
+ },
568
+ upsert=True,
569
+ return_document=True,
570
  )
571
+ result_id = analysis_result_doc["_id"]
572
 
573
+ # 4e. Final update to job status
574
+ await db.on_demand_jobs.update_one(
575
+ {"_id": job_id},
576
+ {
577
+ "$set": {
578
+ "status": "completed",
579
+ "result_id": result_id,
580
+ "updated_at": datetime.now(),
581
+ }
 
 
 
 
582
  },
583
+ )
584
+ except QuotaExceededError as e: # Catch the specific QuotaExceededError
585
+ error_msg = str(e)
586
+ print(f"Quota exceeded for job {job_id}: {error_msg}")
587
+ await db.on_demand_jobs.update_one(
588
+ {"_id": job_id},
589
+ {
590
+ "$set": {
591
+ "status": "failed",
592
+ "error_message": error_msg,
593
+ "updated_at": datetime.now(),
594
+ }
595
  },
596
+ )
 
 
 
 
597
 
598
+ # Surface the quota failure to QStash as HTTP 429 (Too Many Requests)
599
+ raise HTTPException(
600
+ status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=error_msg
601
+ )
602
+ except Exception as e: # Fallback handler: record the error message on the job
603
+ # Use the actual exception message for the error_message
604
+ error_msg = str(e)
605
+ print(f"An error occurred processing job {job_id}: {error_msg}")
606
+ await db.on_demand_jobs.update_one(
607
+ {"_id": job_id},
608
+ {
609
+ "$set": {
610
+ "status": "failed",
611
+ "error_message": error_msg,
612
+ "updated_at": datetime.now(),
613
+ }
614
+ },
615
+ )
616
+
617
+ # Raise a generic exception to QStash
618
+ raise HTTPException(
619
+ status_code=500, detail="An internal processing error occurred."
620
+ )
621
 
622
  end = time.perf_counter()
623
  print(
app/core/exceptions.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
class QuotaExceededError(Exception):
    """Raised when the YouTube Data API reports that its daily quota is exhausted."""
app/schemas/analysis_schema.py CHANGED
@@ -115,3 +115,4 @@ class JobStatusResponseSchema(BaseModel):
115
  status: str
116
  keyword: str
117
  result: TrendDetailResponseSchema | None = None
 
 
115
  status: str
116
  keyword: str
117
  result: TrendDetailResponseSchema | None = None
118
+ error_message: str | None = None # To send error messages to the frontend
app/services/youtube_service.py CHANGED
@@ -1,5 +1,6 @@
1
  from typing import Any, List, Dict
2
  from app.core.config import settings
 
3
 
4
  from googleapiclient.discovery import build, Resource
5
  from googleapiclient.errors import HttpError
@@ -40,6 +41,12 @@ class YouTubeService:
40
  return response.get("items", [])
41
 
42
  except HttpError as e:
 
 
 
 
 
 
43
  print(
44
  f"An HTTP error {e.resp.status} occurred during video search: {e.content}"
45
  )
@@ -92,6 +99,12 @@ class YouTubeService:
92
  break
93
 
94
  except HttpError as e:
 
 
 
 
 
 
95
  # It's common for comments to be disabled, so we'll log it but not treat as a fatal error.
96
  if "commentsDisabled" in str(e.content):
97
  print(f"Comments are disabled for video {video_id}.")
 
1
  from typing import Any, List, Dict
2
  from app.core.config import settings
3
+ from app.core.exceptions import QuotaExceededError
4
 
5
  from googleapiclient.discovery import build, Resource
6
  from googleapiclient.errors import HttpError
 
41
  return response.get("items", [])
42
 
43
  except HttpError as e:
44
+ # Specific error handling for quota exceeded
45
+ content_str = e.content.decode("utf-8")
46
+ if e.resp.status == 403 and "quotaExceeded" in content_str:
47
+ print("YouTube API quota exceeded.")
48
+ raise QuotaExceededError("YouTube API quota exceeded")
49
+
50
  print(
51
  f"An HTTP error {e.resp.status} occurred during video search: {e.content}"
52
  )
 
99
  break
100
 
101
  except HttpError as e:
102
+ # Specific error handling for quota exceeded
103
+ content_str = e.content.decode("utf-8")
104
+ if e.resp.status == 403 and "quotaExceeded" in content_str:
105
+ print("YouTube API quota exceeded.")
106
+ raise QuotaExceededError("YouTube API quota has been exceeded.")
107
+
108
  # It's common for comments to be disabled, so we'll log it but not treat as a fatal error.
109
  if "commentsDisabled" in str(e.content):
110
  print(f"Comments are disabled for video {video_id}.")