AlainDeLong committed on
Commit
5ed3596
·
1 Parent(s): 025ec54

perf(api): refactor data insertion to enhance performance

Browse files
app/api/endpoints/analysis.py CHANGED
@@ -2,12 +2,14 @@ from typing import Any, List, Dict
2
  import uuid
3
  from datetime import datetime
4
  from fastapi import APIRouter, HTTPException, status, Request
5
- from trendspy import Trends
6
 
7
  import asyncio
8
  from motor.motor_asyncio import AsyncIOMotorClient
9
  from bson import ObjectId
10
 
 
 
 
11
  from app.core.config import settings
12
  from app.core.clients import qstash_client
13
  from app.schemas.analysis_schema import (
@@ -46,7 +48,7 @@ async def fetch_repr_comments(entity_id):
46
  if not source_ids:
47
  return {"positive": [], "neutral": [], "negative": []}
48
 
49
- # Fetch 2 comments for each sentiment
50
  sentiments = ["positive", "neutral", "negative"]
51
  comment_tasks = []
52
  limit = settings.REPRESENTATIVE_COMMENTS_LIMIT
@@ -418,7 +420,7 @@ async def process_on_demand_job(request: Request):
418
  texts_to_predict = [comment.get("text", "") for comment in batch_comments]
419
 
420
  # Process one small batch at a time
421
- batch_predictions = sentiment_service.predict_batch(texts_to_predict)
422
  all_predictions.extend(batch_predictions)
423
  print(f" - Processed batch {i // batch_size + 1}...")
424
 
 
2
  import uuid
3
  from datetime import datetime
4
  from fastapi import APIRouter, HTTPException, status, Request
 
5
 
6
  import asyncio
7
  from motor.motor_asyncio import AsyncIOMotorClient
8
  from bson import ObjectId
9
 
10
+ import pandas as pd
11
+ from trendspy import Trends
12
+
13
  from app.core.config import settings
14
  from app.core.clients import qstash_client
15
  from app.schemas.analysis_schema import (
 
48
  if not source_ids:
49
  return {"positive": [], "neutral": [], "negative": []}
50
 
51
+ # Fetch new comments for each sentiment
52
  sentiments = ["positive", "neutral", "negative"]
53
  comment_tasks = []
54
  limit = settings.REPRESENTATIVE_COMMENTS_LIMIT
 
420
  texts_to_predict = [comment.get("text", "") for comment in batch_comments]
421
 
422
  # Process one small batch at a time
423
+ batch_predictions = sentiment_service.predict(texts_to_predict)
424
  all_predictions.extend(batch_predictions)
425
  print(f" - Processed batch {i // batch_size + 1}...")
426
 
app/core/config.py CHANGED
@@ -61,6 +61,9 @@ class Settings(BaseSettings):
61
  ON_DEMAND_COMMENTS_PER_VIDEO: int = 100
62
  ON_DEMAND_TOTAL_COMMENTS: int = 500
63
 
 
 
 
64
  # Pydantic model configuration to load from .env file
65
  model_config = SettingsConfigDict(
66
  env_file=".env", env_file_encoding="utf-8", extra="ignore"
 
61
  ON_DEMAND_COMMENTS_PER_VIDEO: int = 100
62
  ON_DEMAND_TOTAL_COMMENTS: int = 500
63
 
64
+ # Inference Batch Size
65
+ INFERENCE_BATCH_SIZE: int = 32
66
+
67
  # Pydantic model configuration to load from .env file
68
  model_config = SettingsConfigDict(
69
  env_file=".env", env_file_encoding="utf-8", extra="ignore"
app/scripts/consumer_job.py CHANGED
@@ -41,7 +41,7 @@ def process_message_batch(
41
  return
42
 
43
  # --- 2. Perform Batch Sentiment Analysis ---
44
- predictions = sentiment_service.predict_batch(texts_to_predict)
45
 
46
  # --- 3. Save data to Database ---
47
  video_id_cache: Dict[str, ObjectId] = {}
 
41
  return
42
 
43
  # --- 2. Perform Batch Sentiment Analysis ---
44
+ predictions = sentiment_service.predict(texts_to_predict)
45
 
46
  # --- 3. Save data to Database ---
47
  video_id_cache: Dict[str, ObjectId] = {}
app/services/sentiment_service.py CHANGED
@@ -1,8 +1,6 @@
1
  import os
2
- from pathlib import Path
3
  from typing import List, Dict, Any
4
-
5
- # from app.core.config import settings
6
 
7
  import torch
8
  import numpy as np
@@ -46,12 +44,6 @@ class SentimentService:
46
  self.device
47
  )
48
 
49
- # self.tokenizer = AutoTokenizer.from_pretrained(model_source)
50
- # self.config = AutoConfig.from_pretrained(model_source)
51
- # self.model = AutoModelForSequenceClassification.from_pretrained(
52
- # model_source
53
- # ).to(self.device)
54
-
55
  self.model.eval() # set model to inference mode
56
  print("Sentiment model loaded successfully.")
57
 
@@ -68,9 +60,10 @@ class SentimentService:
68
  new_text.append(t)
69
  return " ".join(new_text)
70
 
71
- def predict_batch(self, texts: List[str]) -> List[Dict[str, Any]]:
72
  """
73
- Predict sentiment for a batch of texts (batch size is assumed to be small).
 
74
  """
75
  # Preprocess all texts
76
  preprocessed_texts = [self._preprocess_text(text) for text in texts]
@@ -84,34 +77,45 @@ class SentimentService:
84
 
85
  indices, texts_to_predict = zip(*non_empty_texts_with_indices)
86
 
87
- # Tokenize the batch
88
- encoded_inputs = self.tokenizer(
89
- list(texts_to_predict),
90
- return_tensors="pt",
91
- padding=True,
92
- truncation=True,
93
- max_length=512,
94
- ).to(self.device)
95
-
96
- # Run inference
97
- with torch.no_grad():
98
- outputs = self.model(**encoded_inputs)
99
- logits = outputs.logits.detach().cpu().numpy()
100
 
101
- # Explicitly clear intermediate tensors from VRAM
102
- del encoded_inputs, outputs
103
- torch.cuda.empty_cache()
104
-
105
- # Apply softmax to get probabilities
106
- probs = softmax(logits, axis=1)
107
-
108
- # Map predictions to labels with highest probability
109
  predictions = []
110
- for prob in probs:
111
- max_idx = int(np.argmax(prob))
112
- predictions.append(
113
- {"label": self.config.id2label[max_idx], "score": float(prob[max_idx])}
114
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
 
116
  # Map predictions back to their original positions
117
  final_results: List[Dict[str, Any] | None] = [None] * len(texts)
 
1
  import os
 
2
  from typing import List, Dict, Any
3
+ from app.core.config import settings
 
4
 
5
  import torch
6
  import numpy as np
 
44
  self.device
45
  )
46
 
 
 
 
 
 
 
47
  self.model.eval() # set model to inference mode
48
  print("Sentiment model loaded successfully.")
49
 
 
60
  new_text.append(t)
61
  return " ".join(new_text)
62
 
63
+ def predict(self, texts: List[str]) -> List[Dict[str, Any]]:
64
  """
65
+ Predict sentiment for a batch of texts, splitting into sub-batches
66
+ for efficiency on CPU.
67
  """
68
  # Preprocess all texts
69
  preprocessed_texts = [self._preprocess_text(text) for text in texts]
 
77
 
78
  indices, texts_to_predict = zip(*non_empty_texts_with_indices)
79
 
80
+ # --- Define batch size for CPU ---
81
+ batch_size = settings.INFERENCE_BATCH_SIZE
 
 
 
 
 
 
 
 
 
 
 
82
 
 
 
 
 
 
 
 
 
83
  predictions = []
84
+ # --- Process in chunks ---
85
+ for start in range(0, len(texts_to_predict), batch_size):
86
+ sub_texts = texts_to_predict[start : start + batch_size]
87
+
88
+ # Tokenize
89
+ encoded_inputs = self.tokenizer(
90
+ list(sub_texts),
91
+ return_tensors="pt",
92
+ padding=True,
93
+ truncation=True,
94
+ max_length=512,
95
+ ).to(self.device)
96
+
97
+ # Inference
98
+ with torch.no_grad():
99
+ outputs = self.model(**encoded_inputs)
100
+ logits = outputs.logits.detach().cpu().numpy()
101
+
102
+ # Clear memory
103
+ del encoded_inputs, outputs
104
+ if torch.cuda.is_available():
105
+ torch.cuda.empty_cache()
106
+
107
+ # Softmax + map to labels
108
+ probs = softmax(logits, axis=1)
109
+ for prob in probs:
110
+ max_idx = int(np.argmax(prob))
111
+ predictions.append(
112
+ {
113
+ "label": self.config.id2label[max_idx],
114
+ "score": float(prob[max_idx]),
115
+ }
116
+ )
117
+
118
+ print(f" - Processed batch {start // batch_size + 1}...")
119
 
120
  # Map predictions back to their original positions
121
  final_results: List[Dict[str, Any] | None] = [None] * len(texts)