|
|
from typing import Any, List, Dict |
|
|
import json |
|
|
import time |
|
|
from datetime import datetime |
|
|
from confluent_kafka import Consumer, KafkaError, Message |
|
|
|
|
|
from pymongo import MongoClient |
|
|
from pymongo.database import Database |
|
|
from bson import ObjectId |
|
|
|
|
|
from app.core.config import settings |
|
|
from app.services.sentiment_service import SentimentService |
|
|
|
|
|
|
|
|
def process_message_batch(
    batch: List[Message],
    sentiment_service: SentimentService,
    db: Database,
) -> None:
    """
    Process a batch of Kafka messages: run sentiment analysis on the comment
    texts and persist the results to the entity, source, comment, and
    analysis-result collections.

    Messages that are not valid JSON/UTF-8 or whose comment text is empty are
    skipped up front, so a single bad record cannot fail the whole batch.
    Delivery is at-least-once; the entity/source upserts are idempotent, but
    the analysis-result counters are incremented per processed comment.
    """
    if not batch:
        return

    print(f"Processing a batch of {len(batch)} messages...")

    # messages_data and texts_to_predict are built strictly in lockstep so the
    # zip with `predictions` below keeps each prediction paired with its message.
    messages_data: List[Dict[str, Any]] = []
    texts_to_predict: List[str] = []

    for msg in batch:
        try:
            message_data = json.loads(msg.value().decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # A malformed payload should not poison the rest of the batch.
            print(f"Skipping malformed message: {exc}")
            continue
        text = message_data.get("video_and_comment_data", {}).get("text", "")
        if not text:
            # Empty comments carry no sentiment signal; drop them here so the
            # "only empty comments" guard below can actually trigger.
            continue
        messages_data.append(message_data)
        texts_to_predict.append(text)

    if not texts_to_predict:
        print("Batch contains only empty comments after preprocessing. Skipping.")
        return

    predictions = sentiment_service.predict(texts_to_predict)

    # Cache video_id -> sources_youtube _id so repeated comments on the same
    # video hit the database only once per batch.
    video_id_cache: Dict[str, ObjectId] = {}
    comments_to_insert: List[Dict[str, Any]] = []

    for message_data, prediction in zip(messages_data, predictions):
        entity_keyword = message_data.get("entity_keyword")
        entity_thumbnail = message_data.get("entity_thumbnail_url")
        entity_video_url = message_data.get("entity_video_url")
        entity_volume = message_data.get("entity_volume")
        interest_data = message_data.get("interest_over_time")
        data = message_data.get("video_and_comment_data", {})

        video_id = data.get("video_id")
        video_title = data.get("video_title")
        video_publish_date_str = data.get("video_publish_date")
        video_url = data.get("video_url")

        sentiment_label = prediction["label"].lower()

        # Skip records missing the keys every collection update depends on.
        # `entity_volume` is compared against None explicitly so that 0 is valid.
        if not all([entity_keyword, entity_volume is not None, video_id]):
            continue

        # Upsert the entity: mutable fields are refreshed on every sighting,
        # identity fields are written only on first insert.
        entity_doc = db.entities.find_one_and_update(
            {"keyword": entity_keyword},
            {
                "$set": {
                    "volume": entity_volume,
                    "thumbnail_url": entity_thumbnail,
                    "video_url": entity_video_url,
                },
                "$setOnInsert": {
                    "keyword": entity_keyword,
                    "geo": settings.FETCH_TRENDS_GEO,
                    "start_date": datetime.now(),
                },
            },
            upsert=True,
            return_document=True,
        )
        entity_id = entity_doc["_id"]

        source_id: ObjectId | None = video_id_cache.get(video_id)
        if source_id is None:
            # NOTE(review): strptime assumes video_publish_date is always
            # present and in "%Y-%m-%dT%H:%M:%SZ" form — confirm upstream.
            source_doc = db.sources_youtube.find_one_and_update(
                {"video_id": video_id},
                {
                    "$set": {"entity_id": entity_id},
                    "$setOnInsert": {
                        "entity_id": entity_id,
                        "video_id": video_id,
                        "url": video_url,
                        "title": video_title,
                        "publish_date": datetime.strptime(
                            video_publish_date_str, "%Y-%m-%dT%H:%M:%SZ"
                        ),
                    },
                },
                upsert=True,
                return_document=True,
            )
            source_id = source_doc["_id"]
            video_id_cache[video_id] = source_id

        # Raw comments are collected for one bulk insert after the loop.
        comments_to_insert.append(
            {
                "source_id": source_id,
                "comment_id": data.get("comment_id"),
                "text": data.get("text"),
                "author": data.get("author"),
                "publish_date": datetime.strptime(
                    data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
                ),
                "sentiment": sentiment_label,
            }
        )

        # Increment the aggregate counters for this entity's weekly analysis.
        db.analysis_results.update_one(
            {"entity_id": entity_id},
            {
                "$inc": {
                    f"results.{sentiment_label}_count": 1,
                    "results.total_comments": 1,
                },
                "$setOnInsert": {
                    "entity_id": entity_id,
                    "analysis_type": "weekly",
                    "created_at": datetime.now(),
                    "status": "completed",
                    "interest_over_time": interest_data,
                },
            },
            upsert=True,
        )

    if comments_to_insert:
        db.comments_youtube.insert_many(comments_to_insert)
        print(f"Inserted {len(comments_to_insert)} raw comments into database.")
|
|
|
|
|
|
|
|
def run_consumer_job() -> None:
    """
    Consume raw comments from Kafka in batches, run sentiment analysis,
    and save the results into MongoDB.

    A batch is flushed either when it reaches CONSUMER_BATCH_SIZE messages or
    when a partial batch has been idle longer than
    CONSUMER_BATCH_TIMEOUT_SECONDS. Offsets are committed synchronously only
    AFTER a batch has been fully processed (at-least-once delivery).
    """
    print("Initializing services...")
    sentiment_service = SentimentService()
    mongo_client = MongoClient(settings.MONGODB_CONNECTION_STRING)
    db = mongo_client[settings.DB_NAME]

    kafka_conf = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "sentiment_analyzer_group",
        "auto.offset.reset": "earliest",
        # Manual commits: offsets are acknowledged only after the batch is stored.
        "enable.auto.commit": False,
    }
    consumer = Consumer(kafka_conf)
    consumer.subscribe([settings.KAFKA_TOPIC])

    print("Consumer job started. Waiting for messages...")

    message_batch: List[Message] = []
    last_process_time = time.time()

    def _flush_batch() -> None:
        """Process the accumulated batch, commit offsets, and reset state."""
        nonlocal last_process_time
        process_message_batch(message_batch, sentiment_service, db)
        # Commit the consumer's current position synchronously. On the timeout
        # path there is no message object to commit against, so commit the
        # consumed offsets directly.
        consumer.commit(asynchronous=False)
        message_batch.clear()
        last_process_time = time.time()

    try:
        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                # No new message: flush a stale partial batch on timeout.
                if message_batch and (
                    time.time() - last_process_time
                    > settings.CONSUMER_BATCH_TIMEOUT_SECONDS
                ):
                    _flush_batch()
                continue

            if msg.error():
                # End-of-partition is informational, not a real error.
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    print(f"Kafka error: {msg.error()}")
                continue

            message_batch.append(msg)
            if len(message_batch) >= settings.CONSUMER_BATCH_SIZE:
                _flush_batch()

    except KeyboardInterrupt:
        print("Stopping consumer job...")
        # Flush (and commit) any in-flight messages so they are not
        # reprocessed as duplicates on the next run.
        if message_batch:
            _flush_batch()
    finally:
        consumer.close()
        mongo_client.close()
        print("Consumer and DB connection closed.")
|
|
|
|
|
|
|
|
# Script entry point: start the long-running Kafka consumer loop.
if __name__ == "__main__":
    run_consumer_job()
|
|
|