AlainDeLong commited on
Commit
54c2a57
·
1 Parent(s): a5411db

chore: prepare project for first deployment

Browse files
.gitignore ADDED
@@ -0,0 +1,210 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Byte-compiled / optimized / DLL files
2
+ __pycache__/
3
+ *.py[codz]
4
+ *$py.class
5
+
6
+ # C extensions
7
+ *.so
8
+
9
+ # Distribution / packaging
10
+ .Python
11
+ build/
12
+ develop-eggs/
13
+ dist/
14
+ downloads/
15
+ eggs/
16
+ .eggs/
17
+ lib/
18
+ lib64/
19
+ parts/
20
+ sdist/
21
+ var/
22
+ wheels/
23
+ share/python-wheels/
24
+ *.egg-info/
25
+ .installed.cfg
26
+ *.egg
27
+ MANIFEST
28
+
29
+ # PyInstaller
30
+ # Usually these files are written by a python script from a template
31
+ # before PyInstaller builds the exe, so as to inject date/other infos into it.
32
+ *.manifest
33
+ *.spec
34
+
35
+ # Installer logs
36
+ pip-log.txt
37
+ pip-delete-this-directory.txt
38
+
39
+ # Unit test / coverage reports
40
+ htmlcov/
41
+ .tox/
42
+ .nox/
43
+ .coverage
44
+ .coverage.*
45
+ .cache
46
+ nosetests.xml
47
+ coverage.xml
48
+ *.cover
49
+ *.py.cover
50
+ .hypothesis/
51
+ .pytest_cache/
52
+ cover/
53
+
54
+ # Translations
55
+ *.mo
56
+ *.pot
57
+
58
+ # Django stuff:
59
+ *.log
60
+ local_settings.py
61
+ db.sqlite3
62
+ db.sqlite3-journal
63
+
64
+ # Flask stuff:
65
+ instance/
66
+ .webassets-cache
67
+
68
+ # Scrapy stuff:
69
+ .scrapy
70
+
71
+ # Sphinx documentation
72
+ docs/_build/
73
+
74
+ # PyBuilder
75
+ .pybuilder/
76
+ target/
77
+
78
+ # Jupyter Notebook
79
+ .ipynb_checkpoints
80
+
81
+ # IPython
82
+ profile_default/
83
+ ipython_config.py
84
+
85
+ # pyenv
86
+ # For a library or package, you might want to ignore these files since the code is
87
+ # intended to run in multiple environments; otherwise, check them in:
88
+ # .python-version
89
+
90
+ # pipenv
91
+ # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
92
+ # However, in case of collaboration, if having platform-specific dependencies or dependencies
93
+ # having no cross-platform support, pipenv may install dependencies that don't work, or not
94
+ # install all needed dependencies.
95
+ #Pipfile.lock
96
+
97
+ # UV
98
+ # Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
99
+ # This is especially recommended for binary packages to ensure reproducibility, and is more
100
+ # commonly ignored for libraries.
101
+ #uv.lock
102
+
103
+ # poetry
104
+ # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
105
+ # This is especially recommended for binary packages to ensure reproducibility, and is more
106
+ # commonly ignored for libraries.
107
+ # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
108
+ #poetry.lock
109
+ #poetry.toml
110
+
111
+ # pdm
112
+ # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
113
+ # pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
114
+ # https://pdm-project.org/en/latest/usage/project/#working-with-version-control
115
+ #pdm.lock
116
+ #pdm.toml
117
+ .pdm-python
118
+ .pdm-build/
119
+
120
+ # pixi
121
+ # Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
122
+ #pixi.lock
123
+ # Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
124
+ # in the .venv directory. It is recommended not to include this directory in version control.
125
+ .pixi
126
+
127
+ # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
128
+ __pypackages__/
129
+
130
+ # Celery stuff
131
+ celerybeat-schedule
132
+ celerybeat.pid
133
+
134
+ # SageMath parsed files
135
+ *.sage.py
136
+
137
+ # Environments
138
+ .env
139
+ .envrc
140
+ .venv
141
+ env/
142
+ venv/
143
+ ENV/
144
+ env.bak/
145
+ venv.bak/
146
+
147
+ # Spyder project settings
148
+ .spyderproject
149
+ .spyproject
150
+
151
+ # Rope project settings
152
+ .ropeproject
153
+
154
+ # mkdocs documentation
155
+ /site
156
+
157
+ # mypy
158
+ .mypy_cache/
159
+ .dmypy.json
160
+ dmypy.json
161
+
162
+ # Pyre type checker
163
+ .pyre/
164
+
165
+ # pytype static type analyzer
166
+ .pytype/
167
+
168
+ # Cython debug symbols
169
+ cython_debug/
170
+
171
+ # PyCharm
172
+ # JetBrains specific template is maintained in a separate JetBrains.gitignore that can
173
+ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
174
+ # and can be added to the global gitignore or merged into this file. For a more nuclear
175
+ # option (not recommended) you can uncomment the following to ignore the entire idea folder.
176
+ .idea/
177
+
178
+ # Abstra
179
+ # Abstra is an AI-powered process automation framework.
180
+ # Ignore directories containing user credentials, local state, and settings.
181
+ # Learn more at https://abstra.io/docs
182
+ .abstra/
183
+
184
+ # Visual Studio Code
185
+ # Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
186
+ # that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
187
+ # and can be added to the global gitignore or merged into this file. However, if you prefer,
188
+ # you could uncomment the following to ignore the entire vscode folder
189
+ .vscode/
190
+
191
+ # Ruff stuff:
192
+ .ruff_cache/
193
+
194
+ # PyPI configuration file
195
+ .pypirc
196
+
197
+ # Cursor
198
+ # Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
199
+ # exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
200
+ # refer to https://docs.cursor.com/context/ignore-files
201
+ .cursorignore
202
+ .cursorindexingignore
203
+
204
+ # Marimo
205
+ marimo/_static/
206
+ marimo/_lsp/
207
+ __marimo__/
208
+
209
+ # Kafka Commands
210
+ command.txt
Dockerfile ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # use small official image
2
+ FROM python:3.11-slim
3
+
4
+ # set workdir
5
+ WORKDIR /app
6
+
7
+ # copy only requirements first (for cache)
8
+ COPY requirements.txt .
9
+
10
+ RUN pip install --no-cache-dir -r requirements.txt
11
+
12
+ # copy code
13
+ COPY . .
14
+
15
+ # uvicorn listen on 0.0.0.0:7860 (Spaces default port)
16
+ CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860", "--proxy-headers"]
README.md CHANGED
@@ -1,5 +1,5 @@
1
  ---
2
- title: Backend Sentiment System
3
  emoji: 👁
4
  colorFrom: pink
5
  colorTo: yellow
 
1
  ---
2
+ title: Develop Social Sentiment Analysis System
3
  emoji: 👁
4
  colorFrom: pink
5
  colorTo: yellow
app/__init__.py ADDED
File without changes
app/api/__init__.py ADDED
File without changes
app/api/endpoints/__init__.py ADDED
File without changes
app/api/endpoints/analysis.py ADDED
@@ -0,0 +1,548 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Any, List, Dict
2
+ import uuid
3
+ from datetime import datetime
4
+ from fastapi import APIRouter, HTTPException, status, Request
5
+
6
+ import asyncio
7
+ from motor.motor_asyncio import AsyncIOMotorClient
8
+ from bson import ObjectId
9
+
10
+ import pandas as pd
11
+ from trendspy import Trends
12
+
13
+ from app.core.config import settings
14
+ from app.core.clients import qstash_client, qstash_receiver
15
+ from app.schemas.analysis_schema import (
16
+ WeeklyTrendResponseSchema,
17
+ WeeklyTrendListResponse,
18
+ TrendDetailResponseSchema,
19
+ OnDemandRequestSchema,
20
+ OnDemandResponseSchema,
21
+ JobStatusResponseSchema,
22
+ )
23
+ from app.services.sentiment_service import SentimentService
24
+ from app.services.youtube_service import YouTubeService
25
+
26
+ # Create a router to organize endpoints
27
+ # router = APIRouter(prefix="/trends", tags=["trends"])
28
+ router = APIRouter(prefix=settings.API_PREFIX_TRENDS)
29
+
30
+ # --- MongoDB Connection ---
31
+ client = AsyncIOMotorClient(settings.MONGODB_CONNECTION_STRING)
32
+ db = client[settings.DB_NAME]
33
+
34
+ # Initialize services once when the application starts.
35
+ # This avoids reloading the heavy AI model on every request.
36
+ print("Initializing services...")
37
+ tr = Trends(request_delay=2.0)
38
+ yt_service = YouTubeService(api_key=settings.YT_API_KEY)
39
+ sentiment_service = SentimentService()
40
+
41
+
42
+ async def fetch_repr_comments(entity_id):
43
+ # Find all source videos linked to this entity
44
+ source_docs = await db.sources_youtube.find({"entity_id": entity_id}).to_list(
45
+ length=None
46
+ )
47
+ source_ids = [doc["_id"] for doc in source_docs]
48
+
49
+ if not source_ids:
50
+ return {"positive": [], "neutral": [], "negative": []}
51
+
52
+ # Fetch 2 comments for each sentiment
53
+ sentiments = ["positive", "neutral", "negative"]
54
+ comment_tasks = []
55
+ limit = settings.REPRESENTATIVE_COMMENTS_LIMIT
56
+ for sentiment in sentiments:
57
+ task = (
58
+ db.comments_youtube.find(
59
+ {"source_id": {"$in": source_ids}, "sentiment": sentiment},
60
+ {"text": 1, "author": 1, "publish_date": 1, "_id": 0},
61
+ )
62
+ .sort("publish_date", -1)
63
+ .limit(limit)
64
+ .to_list(length=limit)
65
+ )
66
+
67
+ comment_tasks.append(task)
68
+
69
+ results = await asyncio.gather(*comment_tasks)
70
+ # Convert datetime objects to string format for JSON response
71
+ for sentiment_list in results:
72
+ for comment in sentiment_list:
73
+ if "publish_date" in comment and hasattr(
74
+ comment["publish_date"], "isoformat"
75
+ ):
76
+ comment["publish_date"] = comment["publish_date"].isoformat()
77
+
78
+ return dict(zip(sentiments, results))
79
+
80
+
81
+ async def _get_full_entity_details(
82
+ entity_id: ObjectId, analysis_type: str
83
+ ) -> Dict[str, Any] | None:
84
+ """
85
+ Fetches all detailed data for an entity. It runs the database query,
86
+ interest data fetching, and comment fetching as concurrent, independent tasks.
87
+ """
88
+
89
+ async def fetch_main_data_task():
90
+ """Fetches the main analysis data from the database."""
91
+ pipeline = [
92
+ {"$match": {"entity_id": entity_id, "analysis_type": analysis_type}},
93
+ {"$sort": {"created_at": -1}},
94
+ {"$limit": 1},
95
+ {
96
+ "$lookup": {
97
+ "from": "entities",
98
+ "localField": "entity_id",
99
+ "foreignField": "_id",
100
+ "as": "entity_info",
101
+ }
102
+ },
103
+ {"$unwind": "$entity_info"},
104
+ {
105
+ "$project": {
106
+ "analysis_result_id": "$_id",
107
+ "_id": {"$toString": "$entity_info._id"},
108
+ "keyword": "$entity_info.keyword",
109
+ "thumbnail_url": "$entity_info.thumbnail_url",
110
+ "representative_video_url": "$entity_info.video_url",
111
+ "analysis": "$results",
112
+ "interest_over_time": "$interest_over_time",
113
+ }
114
+ },
115
+ ]
116
+ results = await db.analysis_results.aggregate(pipeline).to_list(length=1)
117
+ return results[0] if results else None
118
+
119
+ # Run the main DB query and comment fetching concurrently
120
+ main_data_task = fetch_main_data_task()
121
+ comments_task = fetch_repr_comments(entity_id)
122
+
123
+ main_data, rep_comments = await asyncio.gather(main_data_task, comments_task)
124
+
125
+ if not main_data:
126
+ # If the main entity/analysis is not found, we can't proceed.
127
+ return None
128
+
129
+ # Now, handle the interest data fetching based on the result of the main query
130
+ if not main_data.get("interest_over_time"):
131
+ print(
132
+ f"Interest data not found in DB for '{main_data['keyword']}'. Fetching live..."
133
+ )
134
+
135
+ def blocking_interest_fetch(keyword: str):
136
+ """Synchronous wrapper for the blocking trendspy call."""
137
+ df = tr.interest_over_time(keywords=[keyword], timeframe="now 7-d")
138
+ if df.empty:
139
+ return []
140
+ daily_df = df[[keyword]].resample("D").mean().round(0).astype(int)
141
+ return [
142
+ {"date": index.strftime("%Y-%m-%d"), "value": int(row.iloc[0])}
143
+ for index, row in daily_df.iterrows()
144
+ ]
145
+
146
+ try:
147
+ # Run the blocking call in a separate thread to not block the server
148
+ interest_data_to_cache = await asyncio.to_thread(
149
+ blocking_interest_fetch, main_data["keyword"]
150
+ )
151
+
152
+ if interest_data_to_cache:
153
+ main_data["interest_over_time"] = interest_data_to_cache
154
+ await db.analysis_results.update_one(
155
+ {"_id": main_data["analysis_result_id"]},
156
+ {"$set": {"interest_over_time": interest_data_to_cache}},
157
+ )
158
+ print(
159
+ f"Successfully cached interest data for '{main_data['keyword']}'."
160
+ )
161
+ except Exception as e:
162
+ print(f"Could not fetch live interest data: {e}")
163
+ main_data["interest_over_time"] = []
164
+
165
+ # Combine all results
166
+ main_data.pop("analysis_result_id", None)
167
+ return {**main_data, "representative_comments": rep_comments}
168
+
169
+
170
+ @router.get("/weekly", response_model=WeeklyTrendListResponse)
171
+ async def get_weekly_trends():
172
+ """
173
+ Retrieves the latest weekly sentiment analysis results.
174
+
175
+ This endpoint fetches data from the 'analysis_results' collection and
176
+ joins it with the 'entities' collection to get keyword and thumbnail details.
177
+ """
178
+ try:
179
+ # MongoDB Aggregation Pipeline to join collections
180
+ pipeline = [
181
+ # 1. Filter for weekly analysis and sort by date to get the latest run
182
+ {"$match": {"analysis_type": "weekly"}},
183
+ {"$sort": {"created_at": -1}},
184
+ {"$limit": settings.HOME_PAGE_ENTITIES_LIMIT},
185
+ # 2. Join with the 'entities' collection
186
+ {
187
+ "$lookup": {
188
+ "from": "entities",
189
+ "localField": "entity_id",
190
+ "foreignField": "_id",
191
+ "as": "entity_info",
192
+ }
193
+ },
194
+ # 3. Deconstruct the entity_info array
195
+ {"$unwind": "$entity_info"},
196
+ # 4. Project the final structure for the API response
197
+ {
198
+ "$project": {
199
+ "_id": {"$toString": "$entity_info._id"},
200
+ "keyword": "$entity_info.keyword",
201
+ "thumbnail_url": "$entity_info.thumbnail_url",
202
+ "analysis": {
203
+ "positive_count": "$results.positive_count",
204
+ "negative_count": "$results.negative_count",
205
+ "neutral_count": "$results.neutral_count",
206
+ "total_comments": "$results.total_comments",
207
+ },
208
+ }
209
+ },
210
+ ]
211
+
212
+ results = await db.analysis_results.aggregate(pipeline).to_list(length=None)
213
+ if not results:
214
+ raise HTTPException(status_code=500, detail="Internal server error")
215
+
216
+ response_data = {"data": results}
217
+ return response_data
218
+
219
+ except Exception as e:
220
+ # Log the error for debugging
221
+ print(f"An error occurred: {e}")
222
+ raise HTTPException(status_code=500, detail="Internal server error")
223
+
224
+
225
+ @router.get("/{analysis_type}/{entity_id}", response_model=TrendDetailResponseSchema)
226
+ async def get_trend_detail_by_type(analysis_type: str, entity_id: str):
227
+ """
228
+ Retrieves detailed information for a single entity, specifying
229
+ whether to fetch the 'weekly' or 'on-demand' analysis result.
230
+ """
231
+ if analysis_type not in ["weekly", "on-demand"]:
232
+ raise HTTPException(
233
+ status_code=400,
234
+ detail="Invalid analysis type. Must be 'weekly' or 'on-demand'.",
235
+ )
236
+
237
+ try:
238
+ entity_obj_id = ObjectId(entity_id)
239
+ except Exception:
240
+ raise HTTPException(status_code=400, detail="Invalid entity ID format.")
241
+
242
+ # Call the helper function with the specified type
243
+ full_details = await _get_full_entity_details(entity_obj_id, analysis_type)
244
+
245
+ if not full_details:
246
+ raise HTTPException(
247
+ status_code=404,
248
+ detail=f"'{analysis_type}' analysis for this entity not found.",
249
+ )
250
+
251
+ return full_details
252
+
253
+
254
+ @router.post(
255
+ "/analysis/on-demand",
256
+ status_code=status.HTTP_202_ACCEPTED,
257
+ response_model=OnDemandResponseSchema,
258
+ )
259
+ async def create_on_demand_analysis(request_data: OnDemandRequestSchema):
260
+ """
261
+ Handles an on-demand analysis request.
262
+ First, it checks if a recent 'weekly' analysis for the keyword exists.
263
+ If yes, it returns a 'found' status with the entity_id for immediate redirection.
264
+ If not, it queues a new analysis job via QStash and returns a 'queued' status.
265
+ """
266
+ if not request_data.keyword or not request_data.keyword.strip():
267
+ raise HTTPException(status_code=400, detail="Keyword cannot be empty.")
268
+
269
+ # Convert incoming keyword to lowercase for consistent matching
270
+ keyword = request_data.keyword.lower().strip()
271
+
272
+ # Check for existing weekly analysis
273
+ entity = await db.entities.find_one({"keyword": keyword})
274
+ if entity:
275
+ analysis = await db.analysis_results.find_one(
276
+ {"entity_id": entity["_id"], "analysis_type": "weekly"}
277
+ )
278
+ if analysis:
279
+ print(
280
+ f"Found existing weekly analysis for '{keyword}'. Returning redirect info."
281
+ )
282
+ # Return a different response if data already exists
283
+ return {"status": "found", "entity_id": str(entity["_id"])}
284
+
285
+ # If no existing analysis, proceed with queuing a new job
286
+ print(f"No weekly analysis found for '{keyword}'. Queuing a new job.")
287
+
288
+ job_id = str(uuid.uuid4())
289
+
290
+ job_document = {
291
+ "_id": job_id,
292
+ "keyword": keyword,
293
+ "status": "pending",
294
+ "created_at": datetime.now(),
295
+ "updated_at": datetime.now(),
296
+ "result_id": None,
297
+ }
298
+ await db.on_demand_jobs.insert_one(job_document)
299
+
300
+ # callback_url = f"{settings.BASE_URL}/api/v1/trends/analysis/process-job"
301
+ callback_url = f"{settings.BASE_URL}{settings.API_PREFIX}{settings.API_VERSION}{settings.API_PREFIX_TRENDS}/analysis/process-job"
302
+
303
+ print(
304
+ f"Queuing job {job_id} for keyword '{keyword}' with callback to {callback_url}"
305
+ )
306
+
307
+ try:
308
+ qstash_client.message.publish_json(
309
+ url=callback_url, body={"keyword": keyword, "job_id": job_id}, retries=3
310
+ )
311
+ except Exception as e:
312
+ # If publishing fails, update the job status to 'failed'
313
+ await db.on_demand_jobs.update_one(
314
+ {"_id": job_id}, {"$set": {"status": "failed"}}
315
+ )
316
+ print(f"Error publishing to QStash: {e}")
317
+ raise HTTPException(status_code=500, detail="Failed to queue analysis job.")
318
+
319
+ return {"status": "queued", "job_id": job_id}
320
+
321
+
322
+ @router.get("/analysis/status/{job_id}", response_model=JobStatusResponseSchema)
323
+ async def get_analysis_status(job_id: str):
324
+ """
325
+ Checks the status of an on-demand analysis job from the 'on_demand_jobs' collection.
326
+ """
327
+ job = await db.on_demand_jobs.find_one({"_id": job_id})
328
+
329
+ if not job:
330
+ raise HTTPException(status_code=404, detail="Job not found.")
331
+
332
+ response_data = {
333
+ "_id": job["_id"],
334
+ "status": job["status"],
335
+ "keyword": job["keyword"],
336
+ "result": None,
337
+ }
338
+
339
+ # If job is completed, fetch the full result data
340
+ if job["status"] == "completed" and job.get("result_id"):
341
+ analysis_doc = await db.analysis_results.find_one({"_id": job["result_id"]})
342
+
343
+ # Check if the analysis document exists and contains an entity_id
344
+ if analysis_doc and analysis_doc.get("entity_id"):
345
+ # Get the correct entity_id from the analysis document
346
+ entity_id = analysis_doc["entity_id"]
347
+
348
+ # Call the helper with the correct entity_id and type
349
+ full_details = await _get_full_entity_details(entity_id, "on-demand")
350
+ response_data["result"] = full_details
351
+
352
+ return response_data
353
+
354
+
355
+ @router.post("/analysis/process-job", include_in_schema=False)
356
+ async def process_on_demand_job(request: Request):
357
+ """
358
+ A webhook endpoint called by QStash to perform the full analysis for a
359
+ single keyword. It fetches data, runs sentiment analysis, and saves all
360
+ results to the database.
361
+ """
362
+ # 1. Initialization
363
+ job_data = await request.json()
364
+ keyword = job_data.get("keyword")
365
+ job_id = job_data.get("job_id")
366
+
367
+ if not keyword:
368
+ # Acknowledge the request but do nothing if keyword is missing
369
+ return {"message": "Keyword is missing, job ignored."}
370
+
371
+ print(f"Processing job {job_id} for keyword: {keyword}")
372
+ # Update job status to 'processing'
373
+ await db.on_demand_jobs.update_one(
374
+ {"_id": job_id},
375
+ {"$set": {"status": "processing", "updated_at": datetime.now()}},
376
+ )
377
+
378
+ # 2. Fetch data (similar to a mini-producer)
379
+ # Note: For on-demand, I might use a smaller fetching strategy
380
+ videos = yt_service.search_videos(query_string=keyword)
381
+ if not videos:
382
+ print(f"No videos found for on-demand keyword: {keyword}")
383
+ return {"message": "No videos found."}
384
+
385
+ comments_for_entity: List[Dict[str, Any]] = []
386
+ for video in videos:
387
+ video_id = video.get("id", {}).get("videoId")
388
+ snippet = video.get("snippet", {})
389
+ if not video_id or not snippet:
390
+ continue
391
+
392
+ comments = yt_service.fetch_comments(
393
+ video_id=video_id, limit=settings.ON_DEMAND_COMMENTS_PER_VIDEO
394
+ ) # Smaller limit for on-demand
395
+
396
+ for comment in comments:
397
+ comment["video_id"] = video_id
398
+ comment["video_title"] = snippet.get("title")
399
+ comment["video_publish_date"] = snippet.get("publishedAt")
400
+ comment["video_url"] = f"https://www.youtube.com/watch?v={video_id}"
401
+ comments_for_entity.extend(comments)
402
+
403
+ if (
404
+ len(comments_for_entity) >= settings.ON_DEMAND_TOTAL_COMMENTS
405
+ ): # Smaller total limit for on-demand
406
+ break
407
+
408
+ final_comments = comments_for_entity[: settings.ON_DEMAND_TOTAL_COMMENTS]
409
+ if not final_comments:
410
+ print(f"No comments found for on-demand keyword: {keyword}")
411
+ return {"message": "No comments found."}
412
+
413
+ # 3. Perform Sentiment Analysis IN BATCHES
414
+ print(f"Analyzing {len(final_comments)} comments in batches...")
415
+ batch_size = settings.CONSUMER_BATCH_SIZE
416
+ all_predictions = []
417
+
418
+ # Loop through comments in chunks of batch_size
419
+ for i in range(0, len(final_comments), batch_size):
420
+ batch_comments = final_comments[i : i + batch_size]
421
+ texts_to_predict = [comment.get("text", "") for comment in batch_comments]
422
+
423
+ # Process one small batch at a time
424
+ batch_predictions = sentiment_service.predict_batch(texts_to_predict)
425
+ all_predictions.extend(batch_predictions)
426
+ print(f" - Processed batch {i // batch_size + 1}...")
427
+
428
+ # 4. Save results to Database (similar to a mini-consumer)
429
+ video_id_cache: Dict[str, ObjectId] = {}
430
+ comments_to_insert: List[Dict[str, Any]] = []
431
+
432
+ # 4a. Upsert Entity first to get a stable entity_id
433
+ video_id = None
434
+ for video in videos:
435
+ video_id = video.get("id", {}).get("videoId", "")
436
+ if video_id:
437
+ break
438
+
439
+ entity_thumbnail_url = (
440
+ videos[0].get("snippet", {}).get("thumbnails", {}).get("high", {}).get("url")
441
+ )
442
+ entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
443
+
444
+ entity_doc = await db.entities.find_one_and_update(
445
+ {"keyword": keyword},
446
+ {
447
+ "$set": {
448
+ "thumbnail_url": entity_thumbnail_url,
449
+ "video_url": entity_video_url,
450
+ },
451
+ "$setOnInsert": {
452
+ "keyword": keyword,
453
+ "geo": settings.FETCH_TRENDS_GEO,
454
+ "volume": 0, # Placeholder values
455
+ # "thumbnail_url": entity_thumbnail_url,
456
+ # "video_url": entity_video_url,
457
+ "start_date": datetime.now(),
458
+ },
459
+ },
460
+ upsert=True,
461
+ return_document=True,
462
+ )
463
+ entity_id = entity_doc["_id"]
464
+
465
+ # 4b. Process and save each comment
466
+ for comment_data, prediction in zip(final_comments, all_predictions):
467
+ sentiment_label = prediction["label"].lower()
468
+
469
+ # Upsert Source Video
470
+ video_id = comment_data.get("video_id")
471
+ source_id: ObjectId | None = video_id_cache.get(video_id)
472
+ if not source_id:
473
+ source_doc = await db.sources_youtube.find_one_and_update(
474
+ {"video_id": video_id},
475
+ {
476
+ "$set": {"entity_id": entity_id},
477
+ "$setOnInsert": {
478
+ # "entity_id": entity_id,
479
+ "video_id": video_id,
480
+ "url": comment_data.get("video_url"),
481
+ "title": comment_data.get("video_title"),
482
+ "publish_date": datetime.strptime(
483
+ comment_data.get("video_publish_date"), "%Y-%m-%dT%H:%M:%SZ"
484
+ ),
485
+ },
486
+ },
487
+ upsert=True,
488
+ return_document=True,
489
+ )
490
+ source_id = source_doc["_id"]
491
+ video_id_cache[video_id] = source_id
492
+
493
+ # Prepare comment for bulk insertion
494
+ comments_to_insert.append(
495
+ {
496
+ "source_id": source_id,
497
+ "comment_id": comment_data.get("comment_id"),
498
+ "text": comment_data.get("text"),
499
+ "author": comment_data.get("author"),
500
+ "publish_date": datetime.strptime(
501
+ comment_data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
502
+ ),
503
+ "sentiment": sentiment_label,
504
+ }
505
+ )
506
+
507
+ # Update aggregated results in real-time
508
+ await db.analysis_results.update_one(
509
+ {"entity_id": entity_id},
510
+ {
511
+ "$inc": {
512
+ f"results.{sentiment_label}_count": 1,
513
+ "results.total_comments": 1,
514
+ },
515
+ "$setOnInsert": {
516
+ "entity_id": entity_id,
517
+ "analysis_type": "on-demand", # Note the type
518
+ "created_at": datetime.now(),
519
+ "status": "completed",
520
+ "interest_over_time": [],
521
+ },
522
+ },
523
+ upsert=True,
524
+ )
525
+
526
+ # 4c. Bulk insert all comments after the loop
527
+ if comments_to_insert:
528
+ await db.comments_youtube.insert_many(comments_to_insert)
529
+
530
+ # 4d. Final update to job status
531
+ analysis_result_doc = await db.analysis_results.find_one(
532
+ {"entity_id": entity_id, "analysis_type": "on-demand"}
533
+ )
534
+ result_id = analysis_result_doc["_id"] if analysis_result_doc else None
535
+
536
+ await db.on_demand_jobs.update_one(
537
+ {"_id": job_id},
538
+ {
539
+ "$set": {
540
+ "status": "completed",
541
+ "result_id": result_id,
542
+ "updated_at": datetime.now(),
543
+ }
544
+ },
545
+ )
546
+
547
+ print(f"Successfully processed and saved analysis for job {job_id}")
548
+ return {"message": f"Job {job_id} for '{keyword}' processed successfully."}
app/cli.py ADDED
File without changes
app/core/__init__.py ADDED
File without changes
app/core/clients.py ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from qstash import QStash, Receiver
2
+ from app.core.config import settings
3
+
4
+ # Initialize the QStash client for publishing messages
5
+ # qstash_client = QStash(token=settings.QSTASH_TOKEN, base_url=settings.QSTASH_URL)
6
+ qstash_client = QStash(token=settings.QSTASH_TOKEN)
7
+
8
+ # Initialize the QStash receiver for verifying incoming messages
9
+ qstash_receiver = Receiver(
10
+ current_signing_key=settings.QSTASH_CURRENT_SIGNING_KEY,
11
+ next_signing_key=settings.QSTASH_NEXT_SIGNING_KEY,
12
+ )
app/core/config.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic_settings import BaseSettings, SettingsConfigDict
2
+ from urllib.parse import quote_plus
3
+
4
+
5
+ class Settings(BaseSettings):
6
+ """
7
+ Manages application settings and loads environment variables from a .env file.
8
+ It constructs the complete MongoDB connection string from individual components.
9
+ """
10
+
11
+ # Database name
12
+ DB_NAME: str
13
+
14
+ # AI Model name
15
+ SENTIMENT_MODEL: str
16
+
17
+ # YouTube API Key
18
+ YT_API_KEY: str
19
+
20
+ # MongoDB connection details
21
+ MONGODB_URI: str
22
+ MONGODB_USR: str
23
+ MONGODB_PWD: str
24
+
25
+ # Kafka Name Topic
26
+ KAFKA_TOPIC: str
27
+
28
+ # Data Fetching Strategy Settings
29
+ FETCH_NUM_ENTITIES: int = 20
30
+ FETCH_VIDEOS_PER_ENTITY: int = 5
31
+ FETCH_COMMENTS_PER_VIDEO: int = 100
32
+ FETCH_TOTAL_COMMENTS_PER_ENTITY: int = 500
33
+ FETCH_TRENDS_GEO: str = "US"
34
+ FETCH_TRENDS_WITHIN_DAYS: int = 7
35
+
36
+ # Consumer Performance Settings
37
+ CONSUMER_BATCH_SIZE: int = 32
38
+ CONSUMER_BATCH_TIMEOUT_SECONDS: int = 5
39
+
40
+ # FastAPI
41
+ API_PREFIX: str = "/api"
42
+ API_VERSION: str = "/v1"
43
+ API_PREFIX_TRENDS: str
44
+ DEBUG: bool = False
45
+
46
+ # QStash Settings
47
+ QSTASH_URL: str = "https://qstash.upstash.io"
48
+ QSTASH_TOKEN: str
49
+ QSTASH_CURRENT_SIGNING_KEY: str
50
+ QSTASH_NEXT_SIGNING_KEY: str
51
+
52
+ # Base URL for the application, used for constructing callback URLs
53
+ BASE_URL: str = "http://localhost:8000"
54
+
55
+ # Display Settings
56
+ HOME_PAGE_ENTITIES_LIMIT: int = 10
57
+ REPRESENTATIVE_COMMENTS_LIMIT: int = 3
58
+
59
+ # On-Demand Use Case
60
+ ON_DEMAND_COMMENTS_PER_VIDEO: int = 100
61
+ ON_DEMAND_TOTAL_COMMENTS: int = 500
62
+
63
+ # Pydantic model configuration to load from .env file
64
+ model_config = SettingsConfigDict(
65
+ env_file=".env", env_file_encoding="utf-8", extra="ignore"
66
+ )
67
+
68
+ @property
69
+ def MONGODB_CONNECTION_STRING(self) -> str:
70
+ """
71
+ Constructs and returns the full MongoDB connection string,
72
+ properly escaping the username and password.
73
+ """
74
+ # Escape username and password to handle special characters
75
+ escaped_usr = quote_plus(self.MONGODB_USR)
76
+ escaped_pwd = quote_plus(self.MONGODB_PWD)
77
+
78
+ # Replace placeholders in the URI
79
+ return self.MONGODB_URI.replace("<username>", escaped_usr).replace(
80
+ "<password>", escaped_pwd
81
+ )
82
+
83
+
84
+ # Create a single instance of the Settings class
85
+ settings = Settings()
app/core/db.py ADDED
File without changes
app/main.py ADDED
@@ -0,0 +1,37 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI
2
+ from fastapi.middleware.cors import CORSMiddleware
3
+
4
+ from app.core.config import settings
5
+ from app.api.endpoints import analysis
6
+
7
+ app = FastAPI(
8
+ title="Sentiment Analysis API",
9
+ description="API for analyzing sentiment of trending topics.",
10
+ version="1.0.0",
11
+ docs_url="/docs",
12
+ redoc_url="/redoc"
13
+ )
14
+
15
+ app.add_middleware(
16
+ CORSMiddleware,
17
+ allow_origins=["*"],
18
+ allow_credentials=True,
19
+ allow_methods=["*"],
20
+ allow_headers=["*"]
21
+ )
22
+
23
+ app.include_router(analysis.router, prefix=f"{settings.API_PREFIX}{settings.API_VERSION}", tags=["Analysis"])
24
+
25
+
26
+ @app.get("/", tags=["Root"])
27
+ def read_root() -> dict:
28
+ """
29
+ Root endpoint to check if the API is running.
30
+ """
31
+ return {"message": "Welcome to the Sentiment Analysis API"}
32
+
33
+
34
+ if __name__ == '__main__':
35
+ import uvicorn
36
+
37
+ uvicorn.run("app.main:app", host='0.0.0.0', port=8000, reload=True)
app/schemas/__init__.py ADDED
File without changes
app/schemas/analysis_schema.py ADDED
@@ -0,0 +1,117 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Literal, Union
2
+ from pydantic import BaseModel, Field
3
+
4
+
5
# --- Schemas for Weekly Trends ---
class AnalysisResultSchema(BaseModel):
    """
    Aggregated sentiment counts for a single entity.

    All counters default to 0 so a freshly discovered entity can be
    represented before any comments have been analyzed.
    """

    # Overall number of analyzed comments for the entity.
    total_comments: int = 0
    positive_count: int = 0
    neutral_count: int = 0
    negative_count: int = 0
15
+
16
+
17
class WeeklyTrendResponseSchema(BaseModel):
    """
    A single trending entity as returned by the weekly-trends endpoint.
    """

    # Populated from MongoDB's "_id" key via the alias. NOTE(review): without
    # populate_by_name enabled, input data must use the "_id" key, not
    # "entity_id" — confirm this matches how the endpoint builds responses.
    entity_id: str = Field(..., alias="_id")
    keyword: str
    thumbnail_url: str | None = None
    # Aggregated sentiment counters for this entity.
    analysis: AnalysisResultSchema
26
+
27
+
28
class WeeklyTrendListResponse(BaseModel):
    """
    Top-level envelope for the weekly trends endpoint response.
    """

    # List of trending entities; ordering is decided by the API layer —
    # TODO confirm (presumably by volume or comment count).
    data: List[WeeklyTrendResponseSchema]
34
+
35
+
36
# --- Schemas for Trend Detail ---
class InterestOverTimePoint(BaseModel):
    """
    One data point of the interest-over-time chart.
    """

    # Calendar day, formatted "YYYY-MM-DD" by the producer job.
    date: str
    # Interest value for that day (Google-Trends-style relative score).
    value: int
44
+
45
+
46
class CommentSchema(BaseModel):
    """
    A single representative comment shown in the trend detail view.
    """

    publish_date: str
    text: str
    author: str
54
+
55
+
56
class RepresentativeCommentsSchema(BaseModel):
    """
    Representative comments bucketed by predicted sentiment.
    """

    positive: List[CommentSchema]
    neutral: List[CommentSchema]
    negative: List[CommentSchema]
64
+
65
+
66
class TrendDetailResponseSchema(WeeklyTrendResponseSchema):
    """
    Full detail response for a single entity.

    Inherits the basic fields (entity_id, keyword, thumbnail_url, analysis)
    from WeeklyTrendResponseSchema and adds the chart data and comments.
    """

    representative_video_url: str | None = None
    interest_over_time: List[InterestOverTimePoint]
    representative_comments: RepresentativeCommentsSchema
75
+
76
+
77
# --- Schemas for On-Demand Analysis ---
class OnDemandRequestSchema(BaseModel):
    """
    Request body for a user-initiated on-demand analysis.
    """

    # Free-text keyword to analyze.
    keyword: str
84
+
85
+
86
# Define two distinct response schemas
class QueuedResponseSchema(BaseModel):
    """
    Response returned when a new analysis job was successfully queued.
    """

    # Literal discriminator distinguishing this variant in the Union below.
    status: Literal["queued"]
    job_id: str
94
+
95
+
96
class FoundResponseSchema(BaseModel):
    """
    Response returned when an existing analysis already covers the keyword.
    """

    # Literal discriminator distinguishing this variant in the Union below.
    status: Literal["found"]
    entity_id: str
103
+
104
+
105
# The final response model is a Union of the two possibilities; the distinct
# Literal "status" values keep the two members unambiguous during validation.
OnDemandResponseSchema = Union[QueuedResponseSchema, FoundResponseSchema]
107
+
108
+
109
class JobStatusResponseSchema(BaseModel):
    """
    Status of an on-demand analysis job, polled by the client.
    """

    # Mongo document id, exposed as job_id (populated via the "_id" alias).
    job_id: str = Field(..., alias="_id")
    status: str
    keyword: str
    # Full analysis payload; None until the job has completed.
    result: TrendDetailResponseSchema | None = None
app/scripts/__init__.py ADDED
File without changes
app/scripts/consumer_job.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Any, List, Dict
2
+ import json
3
+ import time
4
+ from datetime import datetime
5
+ from confluent_kafka import Consumer, KafkaError, Message
6
+
7
+ from pymongo import MongoClient
8
+ from pymongo.database import Database
9
+ from bson import ObjectId
10
+
11
+ from app.core.config import settings
12
+ from app.services.sentiment_service import SentimentService
13
+
14
+
15
def process_message_batch(
    batch: List[Message],
    sentiment_service: SentimentService,
    db: Database,
) -> None:
    """
    Process one batch of Kafka messages end-to-end.

    For each message: run sentiment analysis on the comment text, upsert the
    trending entity and its source video, bulk-insert the raw comments, and
    increment the aggregated per-sentiment counters.

    NOTE(review): json.loads / strptime below assume well-formed messages; a
    single malformed payload raises and aborts the whole batch — TODO confirm
    whether the producer guarantees the schema or a try/except is needed.
    """
    if not batch:
        return

    print(f"Processing a batch of {len(batch)} messages...")

    # --- 1. Prepare data for the model ---
    messages_data: List[Dict[str, Any]] = []
    texts_to_predict: List[str] = []

    for msg in batch:
        # Assumes the payload is UTF-8 JSON as produced by producer_job.
        message_data = json.loads(msg.value().decode("utf-8"))
        messages_data.append(message_data)
        texts_to_predict.append(
            message_data.get("video_and_comment_data", {}).get("text", "")
        )

    # NOTE(review): this guard can never fire — one (possibly empty) string
    # is appended per message, so the list is empty only when batch is empty,
    # which was already handled above.
    if not texts_to_predict:
        print("Batch contains only empty comments after preprocessing. Skipping.")
        return

    # --- 2. Perform Batch Sentiment Analysis ---
    # predict_batch is expected to return one prediction per input text, in
    # order, so the zip below pairs them 1:1 with messages_data.
    predictions = sentiment_service.predict_batch(texts_to_predict)

    # --- 3. Save data to Database ---
    # Cache video_id -> Mongo _id to avoid re-upserting the same video
    # multiple times within one batch.
    video_id_cache: Dict[str, ObjectId] = {}
    comments_to_insert: List[Dict[str, Any]] = []

    for message_data, prediction in zip(messages_data, predictions):
        entity_keyword = message_data.get("entity_keyword")
        entity_thumbnail = message_data.get("entity_thumbnail_url")
        entity_video_url = message_data.get("entity_video_url")
        entity_volume = message_data.get("entity_volume")
        interest_data = message_data.get("interest_over_time")
        data = message_data.get("video_and_comment_data", {})

        video_id = data.get("video_id")
        video_title = data.get("video_title")
        video_publish_date_str = data.get("video_publish_date")
        video_url = data.get("video_url")

        # Lower-cased label is used both on the comment document and in the
        # dynamic "results.<label>_count" field name below.
        sentiment_label = prediction["label"].lower()

        # Skip messages missing the minimum identifying fields.
        if not all([entity_keyword, entity_volume is not None, video_id]):
            continue

        # 3a. Upsert Entity and get its ID
        entity_doc = db.entities.find_one_and_update(
            {"keyword": entity_keyword},
            {
                "$set": {
                    "volume": entity_volume,
                    "thumbnail_url": entity_thumbnail,
                    "video_url": entity_video_url,
                },
                "$setOnInsert": {
                    "keyword": entity_keyword,
                    "geo": settings.FETCH_TRENDS_GEO,
                    # "volume": entity_volume,
                    # "thumbnail_url": entity_thumbnail,
                    # "video_url": entity_video_url,
                    # NOTE(review): naive local time; consider a tz-aware UTC
                    # timestamp for consistency across hosts.
                    "start_date": datetime.now(),
                },
            },
            upsert=True,
            return_document=True,
        )
        entity_id = entity_doc["_id"]

        # 3b. Upsert Source Video and get its ID (with in-batch caching)
        source_id: ObjectId | None = video_id_cache.get(video_id)
        if not source_id:
            source_doc = db.sources_youtube.find_one_and_update(
                {"video_id": video_id},
                {
                    "$set": {"entity_id": entity_id},
                    "$setOnInsert": {
                        "entity_id": entity_id,
                        "video_id": video_id,
                        "url": video_url,
                        "title": video_title,
                        # Assumes RFC 3339 "…Z" format from the YouTube API —
                        # raises ValueError (or TypeError on None) otherwise.
                        "publish_date": datetime.strptime(
                            video_publish_date_str, "%Y-%m-%dT%H:%M:%SZ"
                        ),
                    },
                },
                upsert=True,
                return_document=True,
            )
            source_id = source_doc["_id"]
            video_id_cache[video_id] = source_id

        # 3c. Prepare comment for bulk insertion
        comments_to_insert.append(
            {
                "source_id": source_id,
                "comment_id": data.get("comment_id"),
                "text": data.get("text"),
                "author": data.get("author"),
                "publish_date": datetime.strptime(
                    data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
                ),
                "sentiment": sentiment_label,
            }
        )

        # 3d. Update Aggregated Analysis Result
        db.analysis_results.update_one(
            {"entity_id": entity_id},
            {
                "$inc": {
                    f"results.{sentiment_label}_count": 1,
                    "results.total_comments": 1,
                },
                "$setOnInsert": {
                    "entity_id": entity_id,
                    "analysis_type": "weekly",
                    "created_at": datetime.now(),
                    "status": "completed",
                    # Interest series is frozen at first insert; later
                    # messages for the same entity do not refresh it.
                    "interest_over_time": interest_data,
                },
            },
            upsert=True,
        )

    # 3e. Bulk insert all comments from the batch
    if comments_to_insert:
        db.comments_youtube.insert_many(comments_to_insert)
        print(f"Inserted {len(comments_to_insert)} raw comments into database.")
151
+
152
+
153
def run_consumer_job() -> None:
    """
    Consume raw comment messages from Kafka in batches, run sentiment
    analysis, and persist the results to MongoDB.

    A batch is flushed when it reaches ``CONSUMER_BATCH_SIZE`` messages or
    when a partially filled batch has been idle longer than
    ``CONSUMER_BATCH_TIMEOUT_SECONDS``. Offsets are committed synchronously
    only after a batch has been fully processed (at-least-once delivery).
    """
    # --- 1. Initialization ---
    print("Initializing services...")
    sentiment_service = SentimentService()
    mongo_client = MongoClient(settings.MONGODB_CONNECTION_STRING)
    db = mongo_client[settings.DB_NAME]

    kafka_conf = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "sentiment_analyzer_group",
        "auto.offset.reset": "earliest",
        # Manual commits: offsets only advance after a batch is persisted.
        "enable.auto.commit": False,
    }
    consumer = Consumer(kafka_conf)
    consumer.subscribe([settings.KAFKA_TOPIC])

    print("Consumer job started. Waiting for messages...")

    # --- 2. Batch Processing Loop ---
    message_batch: List[Message] = []
    last_process_time = time.time()

    def _flush_batch() -> None:
        """Process the current batch, commit its offsets, reset the timer."""
        nonlocal last_process_time
        process_message_batch(message_batch, sentiment_service, db)
        # Commit the consumed offsets for the whole assignment. The previous
        # code passed the last poll() result as `message=`, which was None in
        # the timeout branch — committing without a message is the intended,
        # unambiguous form.
        consumer.commit(asynchronous=False)
        message_batch.clear()
        last_process_time = time.time()

    try:
        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                # No new message: flush a partial batch that has been
                # waiting longer than the configured timeout.
                if message_batch and (
                    time.time() - last_process_time
                    > settings.CONSUMER_BATCH_TIMEOUT_SECONDS
                ):
                    _flush_batch()
                continue

            if msg.error():
                # Reaching the end of a partition is expected; report
                # anything else but keep consuming.
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    print(f"Kafka error: {msg.error()}")
                continue

            # Add message to the batch and flush once it is full.
            message_batch.append(msg)
            if len(message_batch) >= settings.CONSUMER_BATCH_SIZE:
                _flush_batch()

    except KeyboardInterrupt:
        print("Stopping consumer job...")
        # Fix: process AND commit the final partial batch. Previously the
        # batch was processed but its offsets were never committed, so the
        # same messages were re-consumed (and re-inserted) on the next start.
        if message_batch:
            _flush_batch()
    finally:
        consumer.close()
        mongo_client.close()
        print("Consumer and DB connection closed.")
219
+
220
+
221
+ if __name__ == "__main__":
222
+ run_consumer_job()
app/scripts/producer_job.py ADDED
@@ -0,0 +1,155 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import pandas as pd
3
+ from typing import Any, List, Dict
4
+ from datetime import datetime as dt, timedelta
5
+ from trendspy import Trends
6
+ from confluent_kafka import Producer
7
+
8
+ from app.core.config import settings
9
+ from app.services.youtube_service import YouTubeService
10
+
11
+
12
def get_rfc_time_ago(days: int) -> str:
    """
    Return the RFC 3339 timestamp for `days` days before now, in UTC.

    The returned string carries a trailing "Z" (the UTC/Zulu designator), so
    the instant must be computed from the UTC clock. The previous version
    used the naive local clock (``dt.now()``), which yielded a wrong instant
    on any non-UTC host; the YouTube API's ``publishedAfter`` filter expects
    a genuine UTC timestamp.
    """
    from datetime import timezone  # local import: module top only imports datetime/timedelta

    time_ago = dt.now(timezone.utc) - timedelta(days=days)
    return time_ago.strftime("%Y-%m-%dT%H:%M:%SZ")
18
+
19
+
20
def run_producer_job() -> None:
    """
    Fetch trending topics, search related YouTube videos, collect comments,
    and publish them to Kafka for the consumer job.

    Each Kafka message carries the full entity context (keyword, volume,
    thumbnail, representative video URL, interest-over-time series) plus one
    comment; the entity keyword is used as the message key so all comments
    for an entity land on the same partition.
    """
    # --- 1. Initialization ---
    yt_service = YouTubeService(api_key=settings.YT_API_KEY)

    # NOTE(review): broker address is hard-coded here (and in the consumer);
    # consider moving it into Settings.
    kafka_conf = {"bootstrap.servers": "localhost:9092"}
    producer = Producer(kafka_conf)

    kafka_topic = settings.KAFKA_TOPIC
    print("Producer job started...")

    # --- 2. Get Trending Entities ---
    try:
        trends_client = Trends()
        trends = trends_client.trending_now(
            geo=settings.FETCH_TRENDS_GEO, hours=24 * settings.FETCH_TRENDS_WITHIN_DAYS
        )
        # Highest-volume trends first; keep only the configured top N.
        trends.sort(key=lambda item: item.volume, reverse=True)
        top_trends = trends[: settings.FETCH_NUM_ENTITIES]
        print(f"Successfully fetched {len(top_trends)} trending entities.")
    except Exception as e:
        print(f"Failed to fetch trends: {e}")
        return

    # Only consider videos published within the trend window.
    time_filter = get_rfc_time_ago(days=settings.FETCH_TRENDS_WITHIN_DAYS)

    # --- 3. Process Each Entity ---
    for trend in top_trends:
        entity_keyword = trend.keyword
        print(f"\n--- Processing entity: {entity_keyword} ---")

        # --- Fetch and process interest over time data ---
        interest_data = []
        try:
            # A fresh client per entity — presumably to avoid shared session
            # state in trendspy; TODO confirm one client could be reused.
            trends_client_for_interest = Trends()
            df = trends_client_for_interest.interest_over_time(
                keywords=[entity_keyword], timeframe=f"now {settings.FETCH_TRENDS_WITHIN_DAYS}-d"
            )
            if not df.empty:
                # Downsample to one value per day; assumes the frame has a
                # DatetimeIndex — TODO confirm trendspy's return shape.
                daily_df = df[[entity_keyword]].resample('D').mean().round(0).astype(int)
                interest_data = [
                    {"date": index.strftime('%Y-%m-%d'), "value": int(row.iloc[0])}
                    for index, row in daily_df.iterrows()
                ]
                print(f"Successfully fetched interest over time data for '{entity_keyword}'.")
        except Exception as e:
            print(f"Could not fetch interest over time data for '{entity_keyword}': {e}")

        # --- 3a. Contextual Query Building ---
        # Sort related keywords by length to prioritize more specific ones
        related_keywords = sorted(trend.trend_keywords, key=len, reverse=True)
        # Combine the main keyword with the top 2 longest related keywords
        keywords_to_search = [entity_keyword] + related_keywords[:2]
        # Remove duplicates while preserving order (dict keeps insertion order)
        keywords_to_search = list(dict.fromkeys(keywords_to_search))
        # Create a query like: '"long keyword" OR "main keyword"'
        query_string = " OR ".join([f'"{k}"' for k in keywords_to_search])

        print(f"Constructed query: {query_string}")

        # --- 3b. Search and Get Thumbnail ---
        videos = yt_service.search_videos(
            query_string=query_string, published_after=time_filter
        )

        if not videos:
            print(f"No videos found for '{entity_keyword}'. Skipping...")
            continue

        # The top search result supplies the entity's thumbnail and
        # representative video link.
        first_video = videos[0]
        entity_thumbnail_url = first_video.get("snippet", {}).get("thumbnails", {}).get("high", {}).get("url")
        # Construct the representative video URL
        video_id = first_video.get("id", {}).get("videoId", "")
        entity_video_url = f"https://www.youtube.com/watch?v={video_id}" if video_id else None

        # --- 3c. Fetch Comments with Smart Sampling ---
        comments_for_entity: List[Dict[str, Any]] = []
        for video in videos:
            video_id = video.get("id", {}).get("videoId")
            snippet = video.get("snippet", {})
            if not video_id or not snippet:
                continue

            print(
                f"Fetching comments from video: {snippet.get('title', 'N/A')[:50]}..."
            )
            comments = yt_service.fetch_comments(
                video_id=video_id, limit=settings.FETCH_COMMENTS_PER_VIDEO
            )

            # Attach video context so the consumer can upsert the source doc.
            for comment in comments:
                comment["video_id"] = video_id
                comment["video_title"] = snippet.get("title")
                comment["video_publish_date"] = snippet.get("publishedAt")
                comment["video_url"] = f"https://www.youtube.com/watch?v={video_id}"

            comments_for_entity.extend(comments)

            # Stop early once enough comments were gathered for this entity.
            if len(comments_for_entity) >= settings.FETCH_TOTAL_COMMENTS_PER_ENTITY:
                break

        final_comments = comments_for_entity[: settings.FETCH_TOTAL_COMMENTS_PER_ENTITY]

        # --- 4. Produce Messages to Kafka ---
        if not final_comments:
            print(f"No comments collected for '{entity_keyword}'.")
            continue

        print(
            f"Producing {len(final_comments)} comments for '{entity_keyword}' to Kafka topic '{kafka_topic}'..."
        )
        for comment in final_comments:
            message_payload = {
                "entity_keyword": entity_keyword,
                "entity_thumbnail_url": entity_thumbnail_url,
                "entity_video_url": entity_video_url,
                "entity_volume": trend.volume,
                "interest_over_time": interest_data,
                "video_and_comment_data": comment,
            }
            producer.produce(
                kafka_topic,
                # Keyword as key: comments of one entity share a partition.
                key=entity_keyword.encode("utf-8"),
                value=json.dumps(message_payload).encode("utf-8"),
            )

    # Block until every queued message is delivered (or fails).
    producer.flush()
    print("\nProducer job finished. All messages flushed to Kafka.")
152
+
153
+
154
+ if __name__ == "__main__":
155
+ run_producer_job()
app/services/__init__.py ADDED
File without changes
app/services/sentiment_service.py ADDED
@@ -0,0 +1,103 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Dict, Any
2
+ from app.core.config import settings
3
+
4
+ import torch
5
+ import numpy as np
6
+ from scipy.special import softmax
7
+ from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig
8
+
9
+
10
class SentimentService:
    """
    Loads the sentiment analysis model once and serves batched predictions.

    The model is a Hugging Face sequence-classification checkpoint named by
    ``settings.SENTIMENT_MODEL``. Predictions are dicts of the form
    ``{"label": <id2label name>, "score": <max softmax probability>}``.
    """

    def __init__(self) -> None:
        """
        Load tokenizer, config, and model onto the GPU when available,
        otherwise the CPU, and switch the model to inference mode.
        """
        # Select device (GPU if available, otherwise CPU)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        if self.device.type == "cuda":
            print(
                f"GPU found: {torch.cuda.get_device_name(0)}. Loading model onto GPU."
            )
        else:
            print("GPU not found. Loading model onto CPU.")

        # Config is loaded separately to obtain the id2label mapping.
        model_name = settings.SENTIMENT_MODEL
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.config = AutoConfig.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name).to(
            self.device
        )
        self.model.eval()  # inference mode (disables dropout etc.)
        print("Sentiment model loaded successfully.")

    def _preprocess_text(self, text: str) -> str:
        """
        Normalize a comment: @mentions become "@user" and links become
        "http" (the convention Twitter-style sentiment models are trained
        on). Non-string input yields an empty string.
        """
        if not isinstance(text, str):
            return ""
        tokens: List[str] = []
        for t in text.split(" "):
            t = "@user" if t.startswith("@") and len(t) > 1 else t
            t = "http" if t.startswith("http") else t
            tokens.append(t)
        return " ".join(tokens)

    def predict_batch(self, texts: List[str]) -> List[Dict[str, Any]]:
        """
        Predict sentiment for a batch of texts.

        Always returns exactly ``len(texts)`` predictions, in input order.
        Texts that are empty after preprocessing receive a default neutral
        prediction instead of being dropped — callers zip the result against
        their inputs, so the lengths must match.
        """
        default_prediction = {"label": "neutral", "score": 1.0}

        # Preprocess all texts
        preprocessed_texts = [self._preprocess_text(text) for text in texts]

        # Keep only non-empty texts and remember their original indices
        non_empty_texts_with_indices = [
            (i, text) for i, text in enumerate(preprocessed_texts) if text.strip()
        ]
        if not non_empty_texts_with_indices:
            # Fix: previously returned [], which silently broke callers that
            # expect one prediction per input text.
            return [dict(default_prediction) for _ in texts]

        indices, texts_to_predict = zip(*non_empty_texts_with_indices)

        # Tokenize the batch
        encoded_inputs = self.tokenizer(
            list(texts_to_predict),
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        ).to(self.device)

        # Run inference
        with torch.no_grad():
            outputs = self.model(**encoded_inputs)
            logits = outputs.logits.detach().cpu().numpy()

        # Explicitly release intermediate tensors; only touch the CUDA
        # allocator when we are actually running on a GPU.
        del encoded_inputs, outputs
        if self.device.type == "cuda":
            torch.cuda.empty_cache()

        # Apply softmax to get probabilities
        probs = softmax(logits, axis=1)

        # Map each row to its highest-probability label
        predictions = []
        for prob in probs:
            max_idx = int(np.argmax(prob))
            predictions.append(
                {"label": self.config.id2label[max_idx], "score": float(prob[max_idx])}
            )

        # Scatter predictions back to their original positions
        final_results: List[Dict[str, Any] | None] = [None] * len(texts)
        for original_index, prediction in zip(indices, predictions):
            final_results[original_index] = prediction

        # Fill slots for empty texts with the default neutral prediction
        return [
            res if res is not None else dict(default_prediction)
            for res in final_results
        ]
app/services/youtube_service.py ADDED
@@ -0,0 +1,106 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Any, List, Dict
2
+ from app.core.config import settings
3
+
4
+ from googleapiclient.discovery import build, Resource
5
+ from googleapiclient.errors import HttpError
6
+
7
+
8
class YouTubeService:
    """
    A service class for interacting with the YouTube Data API (v3).

    Errors from the API are logged and swallowed: search failures return an
    empty list, and comment-fetch failures return whatever was collected so
    far, so callers never need to catch HttpError themselves.
    """

    def __init__(self, api_key: str) -> None:
        # Build the API client eagerly; construction failures are re-raised
        # because the service is unusable without a client.
        self.api_key: str = api_key
        try:
            self.youtube: Resource = build("youtube", "v3", developerKey=api_key)
        except Exception as e:
            print(f"Failed to build Youtube service: {e}")
            raise

    def search_videos(
        self, query_string: str, published_after: str | None = None
    ) -> List[Dict[str, Any]]:
        """
        Search for videos matching `query_string`.

        `published_after` is an RFC 3339 timestamp ("...Z") restricting
        results; None means no lower bound. Returns the raw API "items"
        list (each item has "id" and "snippet"), or [] on any error.
        """
        try:
            search_request = self.youtube.search().list(
                q=query_string,
                part="snippet",
                type="video",
                maxResults=settings.FETCH_VIDEOS_PER_ENTITY,
                regionCode="US",
                relevanceLanguage="en",
                publishedAfter=published_after,
                order="relevance",
            )

            response = search_request.execute()
            return response.get("items", [])

        except HttpError as e:
            print(
                f"An HTTP error {e.resp.status} occurred during video search: {e.content}"
            )
            return []
        except Exception as e:
            print(f"An unexpected error occurred during video search: {e}")
            return []

    def fetch_comments(self, video_id: str, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch top-level comments for `video_id`, paginating until `limit`
        comments are collected or no more pages remain.

        Returns a list of dicts with keys: comment_id, text, author,
        publish_date. On error (including disabled comments), returns the
        comments collected so far rather than raising.
        """
        comments: List[Dict[str, Any]] = []
        next_page_token: str | None = None

        try:
            while len(comments) < limit:
                # Each page requests the API maximum (100); the final result
                # is truncated to `limit` below.
                response = (
                    self.youtube.commentThreads()
                    .list(
                        part="snippet",
                        videoId=video_id,
                        maxResults=100,
                        textFormat="plainText",
                        order="relevance",  # Fetch most relevant comments first
                        pageToken=next_page_token,
                    )
                    .execute()
                )

                for item in response.get("items", []):
                    # Only top-level comments are collected; replies are ignored.
                    snippet = (
                        item.get("snippet", {})
                        .get("topLevelComment", {})
                        .get("snippet", {})
                    )
                    if snippet:
                        comment_data = {
                            "comment_id": item.get("id"),
                            "text": snippet.get("textDisplay"),
                            "author": snippet.get("authorDisplayName"),
                            "publish_date": snippet.get("publishedAt"),
                        }
                        comments.append(comment_data)

                next_page_token = response.get("nextPageToken")
                if not next_page_token:
                    # No more pages of comments
                    break

        except HttpError as e:
            # It's common for comments to be disabled, so we'll log it but not treat as a fatal error.
            if "commentsDisabled" in str(e.content):
                print(f"Comments are disabled for video {video_id}.")
            else:
                print(
                    f"An HTTP error {e.resp.status} occurred fetching comments: {e.content}"
                )
        except Exception as e:
            print(f"An unexpected error occurred fetching comments: {e}")

        # Always return the comments collected so far, truncated to the limit
        return comments[:limit]
requirements.txt ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ aiokafka==0.12.0
2
+ confluent-kafka==2.11.1
3
+ datasets==4.0.0
4
+ fastapi[all]==0.116.1
5
+ google-api-python-client==2.181.0
6
+ google-auth-oauthlib==1.2.2
7
+ httpx==0.28.1
8
+ motor==3.7.1
9
+ numpy==2.2.6
10
+ pydantic==2.11.8
+ pydantic-settings  # NOTE: app.core.config uses SettingsConfigDict (pydantic_settings); pin a version before deploying
11
+ pymongo==4.14.0
12
+ pyngrok==7.3.0
13
+ python-dotenv==1.1.1
14
+ qstash==3.0.0
15
+ requests==2.32.5
16
+ scipy==1.16.2
17
+ transformers==4.56.1
18
+ trendspy==0.1.6
19
+ uvicorn==0.35.0
20
+ torch==2.6.0