import asyncio
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List

from bson import ObjectId
from fastapi import APIRouter, HTTPException, Request, status
from motor.motor_asyncio import AsyncIOMotorClient
from trendspy import Trends

from app.core.clients import qstash_client
from app.core.config import settings
from app.core.exceptions import QuotaExceededError
from app.schemas.analysis_schema import (
    JobStatusResponseSchema,
    OnDemandRequestSchema,
    OnDemandResponseSchema,
    TrendDetailResponseSchema,
    WeeklyTrendListResponse,
)
from app.services.sentiment_service import SentimentService
from app.services.youtube_service import YouTubeService
|
|
router = APIRouter(prefix=settings.API_PREFIX_TRENDS)

client = AsyncIOMotorClient(settings.MONGODB_CONNECTION_STRING)
db = client[settings.DB_NAME]
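# NOTE: a single module-level Motor client is shared by every request handler.
# Motor is built for asyncio, so one client per event loop is the intended usage.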
|
|
print("Initializing services...")
tr = Trends(request_delay=2.0)
yt_service = YouTubeService(api_key=settings.YT_API_KEY)
sentiment_service = SentimentService()
|
|
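# Representative comments are fetched with one query per sentiment label; the
# three queries are independent, so they run concurrently via asyncio.gather.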
async def fetch_repr_comments(entity_id: ObjectId) -> Dict[str, List[Dict[str, Any]]]:
    """Fetches recent representative comments for an entity, grouped by sentiment."""
    source_docs = await db.sources_youtube.find({"entity_id": entity_id}).to_list(
        length=None
    )
    source_ids = [doc["_id"] for doc in source_docs]

    if not source_ids:
        return {"positive": [], "neutral": [], "negative": []}

    sentiments = ["positive", "neutral", "negative"]
    comment_tasks = []
    limit = settings.REPRESENTATIVE_COMMENTS_LIMIT
    for sentiment in sentiments:
        task = (
            db.comments_youtube.find(
                {"source_id": {"$in": source_ids}, "sentiment": sentiment},
                {"text": 1, "author": 1, "publish_date": 1, "_id": 0},
            )
            .sort("publish_date", -1)
            .limit(limit)
            .to_list(length=limit)
        )
        comment_tasks.append(task)

    results = await asyncio.gather(*comment_tasks)

    # Convert datetime objects to ISO strings so the payload is JSON-serializable.
    for sentiment_list in results:
        for comment in sentiment_list:
            if "publish_date" in comment and hasattr(
                comment["publish_date"], "isoformat"
            ):
                comment["publish_date"] = comment["publish_date"].isoformat()

    return dict(zip(sentiments, results))
|
|
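# Shared by the detail endpoint and the job-status endpoint below.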
async def _get_full_entity_details(
    entity_id: ObjectId, analysis_type: str
) -> Dict[str, Any] | None:
    """
    Fetches all detailed data for an entity. It runs the database query,
    interest data fetching, and comment fetching as concurrent, independent tasks.
    """

    async def fetch_main_data_task():
        """Fetches the main analysis data from the database."""
        pipeline = [
            {"$match": {"entity_id": entity_id, "analysis_type": analysis_type}},
            {"$sort": {"created_at": -1}},
            {"$limit": 1},
            {
                "$lookup": {
                    "from": "entities",
                    "localField": "entity_id",
                    "foreignField": "_id",
                    "as": "entity_info",
                }
            },
            {"$unwind": "$entity_info"},
            {
                "$project": {
                    "analysis_result_id": "$_id",
                    "_id": {"$toString": "$entity_info._id"},
                    "keyword": "$entity_info.keyword",
                    "thumbnail_url": "$entity_info.thumbnail_url",
                    "representative_video_url": "$entity_info.video_url",
                    "analysis": "$results",
                    "interest_over_time": "$interest_over_time",
                }
            },
        ]
        results = await db.analysis_results.aggregate(pipeline).to_list(length=1)
        return results[0] if results else None

    main_data_task = fetch_main_data_task()
    comments_task = fetch_repr_comments(entity_id)
    main_data, rep_comments = await asyncio.gather(main_data_task, comments_task)

    if not main_data:
        return None

    # If the stored document has no interest data, fetch it live and cache it
    # back onto the analysis document so later requests can skip this step.
    if not main_data.get("interest_over_time"):
        print(
            f"Interest data not found in DB for '{main_data['keyword']}'. Fetching live..."
        )

        def blocking_interest_fetch(keyword: str):
            """Synchronous wrapper for the blocking trendspy call."""
            df = tr.interest_over_time(keywords=[keyword], timeframe="now 7-d")
            if df.empty:
                return []
            daily_df = df[[keyword]].resample("D").mean().round(0).astype(int)
            return [
                {"date": index.strftime("%Y-%m-%d"), "value": int(row.iloc[0])}
                for index, row in daily_df.iterrows()
            ]

        try:
            # Run the blocking call in a worker thread to keep the event loop free.
            interest_data_to_cache = await asyncio.to_thread(
                blocking_interest_fetch, main_data["keyword"]
            )
            if interest_data_to_cache:
                main_data["interest_over_time"] = interest_data_to_cache
                await db.analysis_results.update_one(
                    {"_id": main_data["analysis_result_id"]},
                    {"$set": {"interest_over_time": interest_data_to_cache}},
                )
                print(
                    f"Successfully cached interest data for '{main_data['keyword']}'."
                )
        except Exception as e:
            print(f"Could not fetch live interest data: {e}")
            main_data["interest_over_time"] = []

    main_data.pop("analysis_result_id", None)
    return {**main_data, "representative_comments": rep_comments}
|
|
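# The weekly feed joins analysis_results with entities so the client gets the
# keyword and thumbnail alongside the sentiment counts in a single round trip.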
@router.get("/weekly", response_model=WeeklyTrendListResponse)
async def get_weekly_trends():
    """
    Retrieves the latest weekly sentiment analysis results.

    This endpoint fetches data from the 'analysis_results' collection and
    joins it with the 'entities' collection to get keyword and thumbnail details.
    """
    try:
        pipeline = [
            {"$match": {"analysis_type": "weekly"}},
            {"$sort": {"created_at": -1}},
            {"$limit": settings.HOME_PAGE_ENTITIES_LIMIT},
            {
                "$lookup": {
                    "from": "entities",
                    "localField": "entity_id",
                    "foreignField": "_id",
                    "as": "entity_info",
                }
            },
            {"$unwind": "$entity_info"},
            {
                "$project": {
                    "_id": {"$toString": "$entity_info._id"},
                    "keyword": "$entity_info.keyword",
                    "thumbnail_url": "$entity_info.thumbnail_url",
                    "analysis": {
                        "positive_count": "$results.positive_count",
                        "negative_count": "$results.negative_count",
                        "neutral_count": "$results.neutral_count",
                        "total_comments": "$results.total_comments",
                    },
                }
            },
        ]

        results = await db.analysis_results.aggregate(pipeline).to_list(length=None)
        if not results:
            raise HTTPException(
                status_code=404, detail="No weekly analysis results found."
            )

        return {"data": results}

    except HTTPException:
        # Re-raise HTTP errors as-is instead of masking them as 500s below.
        raise
    except Exception as e:
        print(f"An error occurred: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
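# Detail view: the analysis_type path segment selects between the cached weekly
# run and a previously completed on-demand run for the same entity.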
@router.get("/{analysis_type}/{entity_id}", response_model=TrendDetailResponseSchema)
async def get_trend_detail_by_type(analysis_type: str, entity_id: str):
    """
    Retrieves detailed information for a single entity, specifying
    whether to fetch the 'weekly' or 'on-demand' analysis result.
    """
    if analysis_type not in ["weekly", "on-demand"]:
        raise HTTPException(
            status_code=400,
            detail="Invalid analysis type. Must be 'weekly' or 'on-demand'.",
        )

    try:
        entity_obj_id = ObjectId(entity_id)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid entity ID format.")

    full_details = await _get_full_entity_details(entity_obj_id, analysis_type)
    if not full_details:
        raise HTTPException(
            status_code=404,
            detail=f"'{analysis_type}' analysis for this entity not found.",
        )

    return full_details
|
|
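# On-demand flow: reuse an existing weekly result when one exists; otherwise
# record a pending job and hand the heavy work to QStash, which calls the
# /analysis/process-job webhook below.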
@router.post(
    "/analysis/on-demand",
    status_code=status.HTTP_202_ACCEPTED,
    response_model=OnDemandResponseSchema,
)
async def create_on_demand_analysis(request_data: OnDemandRequestSchema):
    """
    Handles an on-demand analysis request.
    First, it checks if a recent 'weekly' analysis for the keyword exists.
    If yes, it returns a 'found' status with the entity_id for immediate redirection.
    If not, it queues a new analysis job via QStash and returns a 'queued' status.
    """
    if not request_data.keyword or not request_data.keyword.strip():
        raise HTTPException(status_code=400, detail="Keyword cannot be empty.")

    keyword = request_data.keyword.lower().strip()

    # Short-circuit: if a weekly analysis already covers this keyword, point the
    # client straight at it instead of queuing duplicate work.
    entity = await db.entities.find_one({"keyword": keyword})
    if entity:
        analysis = await db.analysis_results.find_one(
            {"entity_id": entity["_id"], "analysis_type": "weekly"}
        )
        if analysis:
            print(
                f"Found existing weekly analysis for '{keyword}'. Returning redirect info."
            )
            return {"status": "found", "entity_id": str(entity["_id"])}

    print(f"No weekly analysis found for '{keyword}'. Queuing a new job.")

    job_id = str(uuid.uuid4())
    job_document = {
        "_id": job_id,
        "keyword": keyword,
        "status": "pending",
        "created_at": datetime.now(),
        "updated_at": datetime.now(),
        "result_id": None,
    }
    await db.on_demand_jobs.insert_one(job_document)

    callback_url = f"{settings.BASE_URL}{settings.API_PREFIX}{settings.API_VERSION}{settings.API_PREFIX_TRENDS}/analysis/process-job"
    print(
        f"Queuing job {job_id} for keyword '{keyword}' with callback to {callback_url}"
    )

    try:
        qstash_client.message.publish_json(
            url=callback_url, body={"keyword": keyword, "job_id": job_id}, retries=0
        )
    except Exception as e:
        # Mark the job as failed so the status endpoint never reports a job
        # that was never queued as pending forever.
        await db.on_demand_jobs.update_one(
            {"_id": job_id}, {"$set": {"status": "failed"}}
        )
        print(f"Error publishing to QStash: {e}")
        raise HTTPException(status_code=500, detail="Failed to queue analysis job.")

    return {"status": "queued", "job_id": job_id}
|
|
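# Status endpoint for on-demand jobs; suitable for client-side polling until
# the job completes or fails.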
@router.get("/analysis/status/{job_id}", response_model=JobStatusResponseSchema)
async def get_analysis_status(job_id: str):
    """
    Checks the status of an on-demand analysis job from the 'on_demand_jobs' collection.
    If complete or failed, it returns the final result or an error message.
    """
    job = await db.on_demand_jobs.find_one({"_id": job_id})
    if not job:
        raise HTTPException(status_code=404, detail="Job not found.")

    response_data = {
        "_id": job["_id"],
        "status": job["status"],
        "keyword": job["keyword"],
        "result": None,
        "error_message": job.get("error_message"),
    }

    # For completed jobs, embed the full analysis payload so the client can
    # render the detail view without a second request.
    if job["status"] == "completed" and job.get("result_id"):
        analysis_doc = await db.analysis_results.find_one({"_id": job["result_id"]})
        if analysis_doc and analysis_doc.get("entity_id"):
            entity_id = analysis_doc["entity_id"]
            full_details = await _get_full_entity_details(entity_id, "on-demand")
            response_data["result"] = full_details

    return response_data
|
|
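# QStash webhook. The whole pipeline (search videos, fetch comments, classify
# sentiment, persist results) runs inside this request; failures are written
# back to the job document so the status endpoint can surface them.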
@router.post("/analysis/process-job", include_in_schema=False)
async def process_on_demand_job(request: Request):
    """
    A webhook endpoint called by QStash to perform the full analysis for a
    single keyword. It fetches data, runs sentiment analysis, and saves all
    results to the database.
    """
    start = time.perf_counter()

    job_data = await request.json()
    keyword = job_data.get("keyword")
    job_id = job_data.get("job_id")

    if not job_id:
        raise HTTPException(status_code=400, detail="Job ID is missing.")

    if not keyword:
        await db.on_demand_jobs.update_one(
            {"_id": job_id},
            {"$set": {"status": "failed", "updated_at": datetime.now()}},
        )
        raise HTTPException(status_code=400, detail="Keyword is missing, job ignored.")

    await db.on_demand_jobs.update_one(
        {"_id": job_id},
        {"$set": {"status": "processing", "updated_at": datetime.now()}},
    )
    print(f"Processing job {job_id} for keyword: {keyword}")

    try:
        # Step 1: find candidate videos for the keyword.
        videos = yt_service.search_videos(query_string=keyword)
        if not videos:
            error_msg = (
                f"No videos found for on-demand keyword '{keyword}' of job {job_id}."
            )
            print(error_msg)
            await db.on_demand_jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "status": "failed",
                        "error_message": error_msg,
                        "updated_at": datetime.now(),
                    }
                },
            )
            raise HTTPException(status_code=404, detail=error_msg)

        # Step 2: collect comments across videos until the configured cap is hit.
        comments_for_entity: List[Dict[str, Any]] = []
        for video in videos:
            video_id = video.get("id", {}).get("videoId")
            snippet = video.get("snippet", {})
            if not video_id or not snippet:
                continue

            comments = yt_service.fetch_comments(
                video_id=video_id, limit=settings.ON_DEMAND_COMMENTS_PER_VIDEO
            )
            for comment in comments:
                comment["video_id"] = video_id
                comment["video_title"] = snippet.get("title")
                comment["video_publish_date"] = snippet.get("publishedAt")
                comment["video_url"] = f"https://www.youtube.com/watch?v={video_id}"
            comments_for_entity.extend(comments)

            if len(comments_for_entity) >= settings.ON_DEMAND_TOTAL_COMMENTS:
                break

        final_comments = comments_for_entity[: settings.ON_DEMAND_TOTAL_COMMENTS]
        if not final_comments:
            error_msg = (
                f"No comments found for on-demand keyword '{keyword}' of job {job_id}."
            )
            print(error_msg)
            await db.on_demand_jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "status": "failed",
                        "error_message": error_msg,
                        "updated_at": datetime.now(),
                    }
                },
            )
            raise HTTPException(status_code=404, detail=error_msg)

        # Step 3: run the blocking sentiment model in a worker thread so the
        # event loop stays responsive.
        print(
            f"Analyzing {len(final_comments)} comments for job {job_id} in a background thread..."
        )
        texts_to_predict = [comment.get("text", "") for comment in final_comments]
        predictions = await asyncio.to_thread(
            sentiment_service.predict, texts_to_predict
        )
        print(f"Successfully analyzed {len(final_comments)} comments for job {job_id}.")

        # Step 4: upsert the entity, using the first video for the thumbnail and
        # representative video link.
        video_id = videos[0].get("id", {}).get("videoId", "")
        entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
        entity_thumbnail_url = (
            videos[0]
            .get("snippet", {})
            .get("thumbnails", {})
            .get("high", {})
            .get("url")
        )

        entity_doc = await db.entities.find_one_and_update(
            {"keyword": keyword},
            {
                "$set": {
                    "thumbnail_url": entity_thumbnail_url,
                    "video_url": entity_video_url,
                },
                "$setOnInsert": {
                    "keyword": keyword,
                    "geo": settings.FETCH_TRENDS_GEO,
                    "volume": 0,
                    "start_date": datetime.now(),
                },
            },
            upsert=True,
            return_document=True,
        )
        entity_id = entity_doc["_id"]

        # Step 5: upsert one source document per video and store each classified
        # comment, tallying sentiment counts as we go.
        sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}
        video_id_cache: Dict[str, ObjectId] = {}
        comments_to_insert: List[Dict[str, Any]] = []

        for comment_data, prediction in zip(final_comments, predictions):
            sentiment_label = prediction["label"].lower()
            sentiment_counts[sentiment_label] += 1

            video_id = comment_data.get("video_id")
            source_id: ObjectId | None = video_id_cache.get(video_id)
            if not source_id:
                source_doc = await db.sources_youtube.find_one_and_update(
                    {"video_id": video_id},
                    {
                        "$set": {"entity_id": entity_id},
                        "$setOnInsert": {
                            "video_id": video_id,
                            "url": comment_data.get("video_url"),
                            "title": comment_data.get("video_title"),
                            "publish_date": datetime.strptime(
                                comment_data.get("video_publish_date"),
                                "%Y-%m-%dT%H:%M:%SZ",
                            ),
                        },
                    },
                    upsert=True,
                    return_document=True,
                )
                source_id = source_doc["_id"]
                video_id_cache[video_id] = source_id

            comments_to_insert.append(
                {
                    "source_id": source_id,
                    "comment_id": comment_data.get("comment_id"),
                    "text": comment_data.get("text"),
                    "author": comment_data.get("author"),
                    "publish_date": datetime.strptime(
                        comment_data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
                    ),
                    "sentiment": sentiment_label,
                }
            )

        if comments_to_insert:
            await db.comments_youtube.insert_many(comments_to_insert)

        # Step 6: fold the new counts into the on-demand analysis document.
        analysis_result_doc = await db.analysis_results.find_one_and_update(
            {"entity_id": entity_id, "analysis_type": "on-demand"},
            {
                "$inc": {
                    "results.positive_count": sentiment_counts["positive"],
                    "results.negative_count": sentiment_counts["negative"],
                    "results.neutral_count": sentiment_counts["neutral"],
                    "results.total_comments": len(final_comments),
                },
                "$setOnInsert": {
                    "entity_id": entity_id,
                    "analysis_type": "on-demand",
                    "created_at": datetime.now(),
                    "status": "processing",
                    "interest_over_time": [],
                },
            },
            upsert=True,
            return_document=True,
        )
        result_id = analysis_result_doc["_id"]

        await db.on_demand_jobs.update_one(
            {"_id": job_id},
            {
                "$set": {
                    "status": "completed",
                    "result_id": result_id,
                    "updated_at": datetime.now(),
                }
            },
        )
    except HTTPException:
        # The 404s above already recorded the failure; re-raise them unchanged
        # rather than letting the generic handler relabel them as 500s.
        raise
    except QuotaExceededError as e:
        error_msg = str(e)
        print(f"Quota exceeded for job {job_id}: {error_msg}")
        await db.on_demand_jobs.update_one(
            {"_id": job_id},
            {
                "$set": {
                    "status": "failed",
                    "error_message": error_msg,
                    "updated_at": datetime.now(),
                }
            },
        )
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=error_msg
        )
    except Exception as e:
        error_msg = str(e)
        print(f"An error occurred processing job {job_id}: {error_msg}")
        await db.on_demand_jobs.update_one(
            {"_id": job_id},
            {
                "$set": {
                    "status": "failed",
                    "error_message": error_msg,
                    "updated_at": datetime.now(),
                }
            },
        )
        raise HTTPException(
            status_code=500, detail="An internal processing error occurred."
        )

    end = time.perf_counter()
    print(
        f"Successfully processed and saved analysis for job {job_id} in {end - start:.6f} seconds."
    )
    return {"message": f"Job {job_id} for '{keyword}' processed successfully."}