SHAFI committed on
Commit
3690599
·
1 Parent(s): 3d33162

added migration for likes, views, dislikes, separate collection for cloud news, route news to new collection

Browse files
app/config.py CHANGED
@@ -50,12 +50,13 @@ class Settings(BaseSettings):
50
  # Frontend URL (for unsubscribe links)
51
  FRONTEND_URL: str = "https://segmento.in"
52
 
53
- # Appwrite Database (Phase 2)
54
  APPWRITE_ENDPOINT: str = "https://nyc.cloud.appwrite.io/v1"
55
  APPWRITE_PROJECT_ID: str = ""
56
  APPWRITE_API_KEY: str = ""
57
  APPWRITE_DATABASE_ID: str = "segmento_db"
58
- APPWRITE_COLLECTION_ID: str = "articles"
 
59
 
60
  # Admin Alerting (Optional - Discord/Slack webhook URL)
61
  ADMIN_WEBHOOK_URL: Optional[str] = None
 
50
  # Frontend URL (for unsubscribe links)
51
  FRONTEND_URL: str = "https://segmento.in"
52
 
53
+ # Appwrite Database
54
  APPWRITE_ENDPOINT: str = "https://nyc.cloud.appwrite.io/v1"
55
  APPWRITE_PROJECT_ID: str = ""
56
  APPWRITE_API_KEY: str = ""
57
  APPWRITE_DATABASE_ID: str = "segmento_db"
58
+ APPWRITE_COLLECTION_ID: str = "articles" # Regular articles
59
+ APPWRITE_CLOUD_COLLECTION_ID: str = "" # Phase 3: Cloud news (to be created)
60
 
61
  # Admin Alerting (Optional - Discord/Slack webhook URL)
62
  ADMIN_WEBHOOK_URL: Optional[str] = None
app/main.py CHANGED
@@ -70,6 +70,10 @@ app.include_router(analytics.router, prefix="/api/analytics", tags=["Analytics"]
70
  app.include_router(subscription.router, tags=["Subscription"])
71
  app.include_router(admin.router, prefix="/api/admin", tags=["Admin"])
72
 
 
 
 
 
73
  @app.get("/")
74
  async def root():
75
  """Root endpoint"""
 
70
  app.include_router(subscription.router, tags=["Subscription"])
71
  app.include_router(admin.router, prefix="/api/admin", tags=["Admin"])
72
 
73
+ # Phase 3: Engagement tracking
74
+ from app.routes import engagement
75
+ app.include_router(engagement.router, prefix="/api/engagement", tags=["Engagement"])
76
+
77
  @app.get("/")
78
  async def root():
79
  """Root endpoint"""
app/routes/engagement.py ADDED
@@ -0,0 +1,332 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Engagement API Endpoints
3
+ Handles article likes, views tracking, and trending articles
4
+ """
5
+
6
+ from fastapi import APIRouter, HTTPException, Depends
7
+ from typing import Optional
8
+ from app.services.appwrite_db import get_appwrite_db
9
+ from app.config import settings
10
+ from datetime import datetime, timedelta
11
+ import logging
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+ router = APIRouter()
16
+
17
+
18
@router.post("/articles/{article_id}/like")
async def like_article(article_id: str):
    """
    Increment the like count for an article.

    Phase 3: engagement tracking for article popularity. Looks the article
    up in the regular articles collection first, then falls back to the
    cloud collection (if configured).

    Args:
        article_id: Appwrite document ID.

    Returns:
        Dict with the article_id, the updated likes count, and a success flag.

    Raises:
        HTTPException: 404 if the article exists in neither collection,
            500 on unexpected Appwrite errors.
    """
    try:
        appwrite_db = get_appwrite_db()

        # Try the regular articles collection first.
        collection_id = settings.APPWRITE_COLLECTION_ID

        try:
            doc = appwrite_db.databases.get_document(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=collection_id,
                document_id=article_id
            )
        # FIX: was a bare `except:`, which also swallows SystemExit /
        # KeyboardInterrupt; catch Exception and fall back instead.
        except Exception:
            if not settings.APPWRITE_CLOUD_COLLECTION_ID:
                raise HTTPException(status_code=404, detail="Article not found")
            collection_id = settings.APPWRITE_CLOUD_COLLECTION_ID
            try:
                doc = appwrite_db.databases.get_document(
                    database_id=settings.APPWRITE_DATABASE_ID,
                    collection_id=collection_id,
                    document_id=article_id
                )
            # FIX: an article missing from both collections is a 404,
            # not a 500 from the Appwrite lookup error.
            except Exception:
                raise HTTPException(status_code=404, detail="Article not found")

        # Increment likes.
        # NOTE(review): read-modify-write is not atomic; concurrent likes
        # can be lost. Acceptable for approximate engagement counters.
        new_likes = doc.get('likes', 0) + 1

        # Persist the updated counter.
        appwrite_db.databases.update_document(
            database_id=settings.APPWRITE_DATABASE_ID,
            collection_id=collection_id,
            document_id=article_id,
            data={"likes": new_likes}
        )

        logger.info(f"❤️ Article {article_id[:8]}... liked (total: {new_likes})")

        return {
            "article_id": article_id,
            "likes": new_likes,
            "success": True
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error liking article {article_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
80
+
81
+
82
@router.post("/articles/{article_id}/dislike")
async def dislike_article(article_id: str):
    """
    Increment the dislike count for an article.

    Phase 3: engagement tracking for article feedback. Looks the article
    up in the regular articles collection first, then falls back to the
    cloud collection (if configured).

    Args:
        article_id: Appwrite document ID.

    Returns:
        Dict with the article_id, the updated dislikes count, and a success flag.

    Raises:
        HTTPException: 404 if the article exists in neither collection,
            500 on unexpected Appwrite errors.
    """
    try:
        appwrite_db = get_appwrite_db()

        # Try the regular articles collection first.
        collection_id = settings.APPWRITE_COLLECTION_ID

        try:
            doc = appwrite_db.databases.get_document(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=collection_id,
                document_id=article_id
            )
        # FIX: was a bare `except:`, which also swallows SystemExit /
        # KeyboardInterrupt; catch Exception and fall back instead.
        except Exception:
            if not settings.APPWRITE_CLOUD_COLLECTION_ID:
                raise HTTPException(status_code=404, detail="Article not found")
            collection_id = settings.APPWRITE_CLOUD_COLLECTION_ID
            try:
                doc = appwrite_db.databases.get_document(
                    database_id=settings.APPWRITE_DATABASE_ID,
                    collection_id=collection_id,
                    document_id=article_id
                )
            # FIX: an article missing from both collections is a 404,
            # not a 500 from the Appwrite lookup error.
            except Exception:
                raise HTTPException(status_code=404, detail="Article not found")

        # Increment dislikes.
        # NOTE(review): read-modify-write is not atomic; concurrent updates
        # can be lost. Acceptable for approximate engagement counters.
        new_dislikes = doc.get('dislikes', 0) + 1

        # Persist the updated counter.
        appwrite_db.databases.update_document(
            database_id=settings.APPWRITE_DATABASE_ID,
            collection_id=collection_id,
            document_id=article_id,
            data={"dislikes": new_dislikes}
        )

        logger.info(f"👎 Article {article_id[:8]}... disliked (total: {new_dislikes})")

        return {
            "article_id": article_id,
            "dislikes": new_dislikes,
            "success": True
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error disliking article {article_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
144
+
145
+
146
@router.post("/articles/{article_id}/view")
async def track_view(article_id: str):
    """
    Increment the view count for an article.

    Phase 3: track article views for analytics. Looks the article up in
    the regular articles collection first, then falls back to the cloud
    collection (if configured).

    Args:
        article_id: Appwrite document ID.

    Returns:
        Dict with the article_id, the updated views count, and a success flag.

    Raises:
        HTTPException: 404 if the article exists in neither collection,
            500 on unexpected Appwrite errors.
    """
    try:
        appwrite_db = get_appwrite_db()

        # Try the regular articles collection first.
        collection_id = settings.APPWRITE_COLLECTION_ID

        try:
            doc = appwrite_db.databases.get_document(
                database_id=settings.APPWRITE_DATABASE_ID,
                collection_id=collection_id,
                document_id=article_id
            )
        # FIX: was a bare `except:`, which also swallows SystemExit /
        # KeyboardInterrupt; catch Exception and fall back instead.
        except Exception:
            if not settings.APPWRITE_CLOUD_COLLECTION_ID:
                raise HTTPException(status_code=404, detail="Article not found")
            collection_id = settings.APPWRITE_CLOUD_COLLECTION_ID
            try:
                doc = appwrite_db.databases.get_document(
                    database_id=settings.APPWRITE_DATABASE_ID,
                    collection_id=collection_id,
                    document_id=article_id
                )
            # FIX: an article missing from both collections is a 404,
            # not a 500 from the Appwrite lookup error.
            except Exception:
                raise HTTPException(status_code=404, detail="Article not found")

        # Increment views.
        # NOTE(review): read-modify-write is not atomic; concurrent views
        # can be lost. Acceptable for approximate engagement counters.
        new_views = doc.get('views', 0) + 1

        # Persist the updated counter.
        appwrite_db.databases.update_document(
            database_id=settings.APPWRITE_DATABASE_ID,
            collection_id=collection_id,
            document_id=article_id,
            data={"views": new_views}
        )

        # Log only every 10th view to avoid log spam.
        if new_views % 10 == 0:
            logger.info(f"👁️ Article {article_id[:8]}... reached {new_views} views")

        return {
            "article_id": article_id,
            "views": new_views,
            "success": True
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error tracking view for {article_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
210
+
211
+
212
@router.get("/articles/trending")
async def get_trending_articles(
    hours: int = 24,
    limit: int = 10,
    cloud_only: bool = False
):
    """
    Get trending articles based on views and likes.

    Phase 3: discover popular content.

    Args:
        hours: Time window for trending (default: 24 hours).
        limit: Number of articles to return (default: 10).
        cloud_only: Only return cloud articles (default: False).

    Returns:
        List of trending articles sorted by engagement score.

    Raises:
        HTTPException: 500 on unexpected Appwrite errors.
    """
    try:
        from appwrite.query import Query

        appwrite_db = get_appwrite_db()
        # Naive local time, matching how ingestion stamps published_at.
        cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()

        # Use the cloud collection only when requested AND configured.
        if cloud_only and settings.APPWRITE_CLOUD_COLLECTION_ID:
            collection_id = settings.APPWRITE_CLOUD_COLLECTION_ID
        else:
            collection_id = settings.APPWRITE_COLLECTION_ID

        # Query recent articles sorted by views (descending).
        # FIX: the attribute is stored as 'published_at' (snake_case) by
        # ingestion and queried as 'published_at' by the cleanup janitor;
        # the previous 'publishedAt' filter matched nothing.
        response = appwrite_db.databases.list_documents(
            database_id=settings.APPWRITE_DATABASE_ID,
            collection_id=collection_id,
            queries=[
                Query.greater_than('published_at', cutoff),
                Query.order_desc('views'),
                Query.limit(limit)
            ]
        )

        articles = response['documents']

        # Engagement score = views + likes*5 - dislikes*3: likes weighted
        # higher, dislikes penalized.
        # NOTE(review): only the top-`limit` by views is re-ranked; an
        # article with many likes but few views may never enter the window.
        for article in articles:
            article['engagement_score'] = (
                article.get('views', 0)
                + article.get('likes', 0) * 5
                - article.get('dislikes', 0) * 3
            )

        # Re-sort the window by engagement score.
        articles.sort(key=lambda a: a.get('engagement_score', 0), reverse=True)

        logger.info(f"🔥 Trending: {len(articles)} articles in last {hours}h")

        return {
            "articles": articles[:limit],
            "timeframe_hours": hours,
            "cloud_only": cloud_only,
            "total_count": len(articles)
        }

    except Exception as e:
        logger.error(f"Error getting trending articles: {e}")
        raise HTTPException(status_code=500, detail=str(e))
279
+
280
+
281
@router.get("/articles/popular-cloud")
async def get_popular_cloud_articles(provider: Optional[str] = None, limit: int = 10):
    """
    Get popular cloud articles, optionally filtered by provider.

    Phase 3: cloud-specific trending.

    Args:
        provider: Cloud provider (aws, azure, gcp, ...) or None for all.
        limit: Number of articles to return (default: 10).

    Returns:
        Popular cloud articles sorted by views.

    Raises:
        HTTPException: 404 if the cloud collection is not configured,
            500 on unexpected Appwrite errors.
    """
    try:
        from appwrite.query import Query

        # The cloud collection is optional; bail out early if unset.
        if not settings.APPWRITE_CLOUD_COLLECTION_ID:
            raise HTTPException(status_code=404, detail="Cloud collection not configured")

        db_client = get_appwrite_db()

        # Assemble the query list: optional provider filter first, then
        # ordering and limit.
        query_filters = []
        if provider:
            query_filters.append(Query.equal('provider', provider))
        query_filters.append(Query.order_desc('views'))
        query_filters.append(Query.limit(limit))

        result = db_client.databases.list_documents(
            database_id=settings.APPWRITE_DATABASE_ID,
            collection_id=settings.APPWRITE_CLOUD_COLLECTION_ID,
            queries=query_filters
        )

        matched = result['documents']

        logger.info(f"☁️ Popular cloud articles: {len(matched)} (provider={provider or 'all'})")

        return {
            "articles": matched,
            "provider": provider,
            "total_count": len(matched)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting popular cloud articles: {e}")
        raise HTTPException(status_code=500, detail=str(e))
app/services/ingestion_v2.py CHANGED
@@ -34,8 +34,80 @@ logger = get_professional_logger(__name__)
34
  # ============================================================================
35
  # Space B Configuration
36
  # ============================================================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
 
38
- SPACE_B_URL = "https://workwithshafisk-segmentopulse-factory.hf.space" # Note: Update this with your actual Space B URL
39
  SPACE_B_TIMEOUT = 30 # seconds (Llama-3 is slow on CPU)
40
 
41
 
@@ -313,26 +385,37 @@ async def fetch_latest_news(categories: List[str]) -> Dict[str, List[Article]]:
313
  # Article Processing with Space B + ChromaDB
314
  # ============================================================================
315
 
316
- async def process_and_store_article(url: str, raw_text: str, category: str, title: str = "") -> Optional[Dict]:
 
 
 
 
 
 
317
  """
318
- Phase 2 CQRS: Offload processing to Space B, then store in ChromaDB
319
 
320
  Architecture:
321
  1. Send raw_text to Space B's /process-article endpoint
322
  2. Receive summary + tags from Space B
323
- 3. Generate embeddings locally using sentence-transformers
324
- 4. Store in ChromaDB
 
 
325
 
326
  Args:
327
  url: Article URL (used as ID)
328
  raw_text: Full article content
329
  category: Article category
330
  title: Article title (optional)
 
331
 
332
  Returns:
333
  Dictionary with processing results or None on error
334
  """
335
  try:
 
 
336
  logger.space_b_call(url, "started")
337
 
338
  # -------------------------------------------------------------------------
@@ -377,57 +460,84 @@ async def process_and_store_article(url: str, raw_text: str, category: str, titl
377
  return None
378
 
379
  # -------------------------------------------------------------------------
380
- # Step 2: Generate embeddings locally with sentence-transformers
381
  # -------------------------------------------------------------------------
382
- # ChromaDB vector_store has embedded model (all-MiniLM-L6-v2)
383
- # We'll use the existing upsert_article method
 
 
 
384
 
385
  # -------------------------------------------------------------------------
386
- # Step 3: Prepare article data for ChromaDB
 
 
 
 
 
 
387
  # -------------------------------------------------------------------------
388
  url_hash = hashlib.md5(url.encode()).hexdigest()
389
 
 
 
 
390
  article_data = {
391
  "$id": url_hash,
392
- "title": title or summary[:100], # Use title if available, else first part of summary
393
- "description": summary,
 
 
394
  "url": url,
395
  "source": "Segmento AI",
396
  "category": category,
397
  "published_at": datetime.now().isoformat(),
398
  "image": "", # No image for now
399
- "tags": tags
 
 
 
 
 
 
 
 
 
 
 
 
400
  }
401
 
402
  # -------------------------------------------------------------------------
403
- # Step 4: Store in ChromaDB
404
  # -------------------------------------------------------------------------
405
- # Create combined text: Title + Summary + Tags (for richer embeddings)
406
  tags_text = " ".join(tags) if tags else ""
407
- combined_analysis = f"Summary: {summary}\nTags: {tags_text}"
408
 
409
  # Upsert to vector store (handles embedding generation internally)
410
  vector_store.upsert_article(article_data, combined_analysis)
411
  ingestion_stats.chromadb_upserts += 1
412
  ingestion_stats.articles_saved += 1
413
 
414
- logger.success(f"ChromaDB stored: {title[:50] if title else url[:50]}")
 
415
 
416
  return {
417
  "url": url,
418
- "summary": summary,
419
  "tags": tags,
 
 
420
  "stored": True
421
  }
422
 
423
  except Exception as e:
424
- logger.error(f"[CQRS] Processing failed for {url}: {e}")
425
  return None
426
 
427
 
428
  async def fetch_single_category(category: str) -> List[Article]:
429
- """
430
- Convenience function to fetch a single category
431
 
432
  Args:
433
  category: Category name
 
34
  # ============================================================================
35
  # Space B Configuration
36
  # ============================================================================
37
# Constants
SPACE_B_URL = "https://workwithshafisk-segmentopulse-factory.hf.space"

# Phase 3: Cloud News Categories
CLOUD_CATEGORIES = [
    "cloud-aws",
    "cloud-azure",
    "cloud-gcp",
    "cloud-oracle",
    "cloud-ibm",
    "cloud-alibaba",
    "cloud-digitalocean",
    "cloud-huawei",
    "cloud-cloudflare",
    "cloud-computing"  # General cloud news
]

# Phase 3: Official Cloud Provider Feeds
# Maps feed URL -> (provider_name, is_official).
# NOTE(review): these URLs should match the feed map in
# news_providers.py exactly, or official-feed detection by URL will
# silently miss — verify they stay in sync.
OFFICIAL_CLOUD_FEEDS = {
    "https://aws.amazon.com/blogs/aws/feed/": ("aws", True),
    "https://azure.microsoft.com/en-us/blog/feed/": ("azure", True),
    "https://cloudblog.withgoogle.com/rss/": ("gcp", True),
    "https://blogs.oracle.com/cloud-infrastructure/rss": ("oracle", True),
    "https://www.ibm.com/blog/category/ibm-cloud/feed/": ("ibm", True),
    "https://www.alibabacloud.com/blog/rss.xml": ("alibaba", True),
    "https://www.digitalocean.com/blog/rss.xml": ("digitalocean", True),
    "https://developer.huaweicloud.com/intl/en-us/feed": ("huawei", True),
    "https://blog.cloudflare.com/rss/": ("cloudflare", True)
}


def determine_cloud_provider(category: str, source_feed: str) -> tuple:
    """
    Phase 3: Determine the cloud provider and whether the article came
    from an official provider blog.

    Args:
        category: News category (e.g., "cloud-aws").
        source_feed: RSS feed URL the article was fetched from.

    Returns:
        Tuple of (provider_name, is_official).

    Examples:
        ("aws", True)    - feed URL is a known official blog
        ("azure", False) - aggregated news about Azure
    """
    # Known official feed: the map carries both provider and flag.
    official = OFFICIAL_CLOUD_FEEDS.get(source_feed)
    if official is not None:
        return official

    # Aggregated news: derive the provider from the category name.
    if category.startswith('cloud-'):
        return (category.replace('cloud-', ''), False)

    return ("general", False)


def route_to_collection(category: str, config_obj) -> str:
    """
    Phase 3: Pick the Appwrite collection for an article.

    Args:
        category: Article category.
        config_obj: Settings object exposing the collection IDs.

    Returns:
        Collection ID string (cloud collection for cloud categories when
        configured, otherwise the regular articles collection).
    """
    use_cloud = category in CLOUD_CATEGORIES and bool(config_obj.APPWRITE_CLOUD_COLLECTION_ID)
    return config_obj.APPWRITE_CLOUD_COLLECTION_ID if use_cloud else config_obj.APPWRITE_COLLECTION_ID
110
 
 
111
  SPACE_B_TIMEOUT = 30 # seconds (Llama-3 is slow on CPU)
112
 
113
 
 
385
  # Article Processing with Space B + ChromaDB
386
  # ============================================================================
387
 
388
+ async def process_and_store_article(
389
+ url: str,
390
+ raw_text: str,
391
+ category: str,
392
+ title: str = "",
393
+ source_feed: str = ""
394
+ ) -> Optional[Dict]:
395
  """
396
+ Phase 3: Enhanced processing with cloud detection and engagement metrics
397
 
398
  Architecture:
399
  1. Send raw_text to Space B's /process-article endpoint
400
  2. Receive summary + tags from Space B
401
+ 3. Detect cloud provider and routing
402
+ 4. Add engagement metrics (likes, views)
403
+ 5. Generate embeddings locally using sentence-transformers
404
+ 6. Store in ChromaDB with rich metadata
405
 
406
  Args:
407
  url: Article URL (used as ID)
408
  raw_text: Full article content
409
  category: Article category
410
  title: Article title (optional)
411
+ source_feed: RSS feed URL (for cloud detection)
412
 
413
  Returns:
414
  Dictionary with processing results or None on error
415
  """
416
  try:
417
+ from app.utils import strip_html_if_needed, list_to_comma_separated
418
+
419
  logger.space_b_call(url, "started")
420
 
421
  # -------------------------------------------------------------------------
 
460
  return None
461
 
462
  # -------------------------------------------------------------------------
463
+ # Step 2: Phase 3 - Cloud Detection
464
  # -------------------------------------------------------------------------
465
+ is_cloud = category in CLOUD_CATEGORIES
466
+ provider, is_official = determine_cloud_provider(category, source_feed)
467
+
468
+ if is_cloud:
469
+ logger.info(f"☁️ Cloud article detected: {provider} (official={is_official})")
470
 
471
  # -------------------------------------------------------------------------
472
+ # Step 3: Phase 3 - HTML Stripping & Text Cleaning
473
+ # -------------------------------------------------------------------------
474
+ title_clean = strip_html_if_needed(title) if title else summary[:100]
475
+ summary_clean = strip_html_if_needed(summary)
476
+
477
+ # -------------------------------------------------------------------------
478
+ # Step 4: Prepare article data for ChromaDB with Phase 3 metadata
479
  # -------------------------------------------------------------------------
480
  url_hash = hashlib.md5(url.encode()).hexdigest()
481
 
482
+ # Convert tags list to comma-separated string
483
+ tags_str = list_to_comma_separated(tags)
484
+
485
  article_data = {
486
  "$id": url_hash,
487
+
488
+ # Core content (cleaned)
489
+ "title": title_clean,
490
+ "description": summary_clean,
491
  "url": url,
492
  "source": "Segmento AI",
493
  "category": category,
494
  "published_at": datetime.now().isoformat(),
495
  "image": "", # No image for now
496
+
497
+ # Phase 3: Tags from GLiNER
498
+ "tags": tags_str,
499
+
500
+ # Phase 3: Cloud detection
501
+ "is_cloud_news": is_cloud,
502
+ "cloud_provider": provider if is_cloud else "",
503
+ "is_official": is_official if is_cloud else False,
504
+
505
+ # Phase 3: Engagement metrics
506
+ "likes": 0,
507
+ "dislikes": 0,
508
+ "views": 0
509
  }
510
 
511
  # -------------------------------------------------------------------------
512
+ # Step 5: Store in ChromaDB with Phase 3 enhanced schema
513
  # -------------------------------------------------------------------------
514
+ # Create combined text for embedding: Title + Summary + Tags
515
  tags_text = " ".join(tags) if tags else ""
516
+ combined_analysis = f"Summary: {summary_clean}\nTags: {tags_text}"
517
 
518
  # Upsert to vector store (handles embedding generation internally)
519
  vector_store.upsert_article(article_data, combined_analysis)
520
  ingestion_stats.chromadb_upserts += 1
521
  ingestion_stats.articles_saved += 1
522
 
523
+ cloud_emoji = "☁️" if is_cloud else "📰"
524
+ logger.success(f"{cloud_emoji} ChromaDB stored: {title_clean[:50]}")
525
 
526
  return {
527
  "url": url,
528
+ "summary": summary_clean,
529
  "tags": tags,
530
+ "is_cloud": is_cloud,
531
+ "provider": provider if is_cloud else None,
532
  "stored": True
533
  }
534
 
535
  except Exception as e:
536
+ logger.error(f"[Phase 3 CQRS] Processing failed for {url}: {e}")
537
  return None
538
 
539
 
540
  async def fetch_single_category(category: str) -> List[Article]:
 
 
541
 
542
  Args:
543
  category: Category name
app/services/news_providers.py CHANGED
@@ -494,7 +494,7 @@ class OfficialCloudProvider(NewsProvider):
494
  'cloud-alibaba': 'https://www.alibabacloud.com/blog/feed',
495
  'cloud-digitalocean': 'https://www.digitalocean.com/blog/rss.xml',
496
  'cloud-cloudflare': 'https://blog.cloudflare.com/rss/',
497
- 'cloud-huawei': 'https://blog.huawei.com/feed/', # Generic Huawei blog often used
498
  }
499
 
500
  async def fetch_news(self, category: str, limit: int = 20) -> List[Article]:
 
494
  'cloud-alibaba': 'https://www.alibabacloud.com/blog/feed',
495
  'cloud-digitalocean': 'https://www.digitalocean.com/blog/rss.xml',
496
  'cloud-cloudflare': 'https://blog.cloudflare.com/rss/',
497
+ 'cloud-huawei': 'https://blog.huawei.com', # Generic Huawei blog often used
498
  }
499
 
500
  async def fetch_news(self, category: str, limit: int = 20) -> List[Article]:
app/services/scheduler.py CHANGED
@@ -417,7 +417,11 @@ async def cleanup_old_news():
417
  logger.info("📅 Cutoff Date: %s", cutoff_date.strftime('%Y-%m-%d %H:%M:%S'))
418
  logger.info("🗑️ Articles published before this will be deleted...")
419
 
420
- # Query and delete old articles
 
 
 
 
421
  logger.info("🔍 Querying Appwrite for old articles...")
422
  from appwrite.query import Query
423
 
@@ -430,11 +434,11 @@ async def cleanup_old_news():
430
  ]
431
  )
432
 
433
- logger.info("📊 Found %d old articles to delete", len(response['documents']))
434
 
435
- deleted_count = 0
436
  if len(response['documents']) > 0:
437
- logger.info("🗑️ Deleting articles...")
438
 
439
  for doc in response['documents']:
440
  try:
@@ -450,14 +454,74 @@ async def cleanup_old_news():
450
  except Exception as ve:
451
  logger.warning("⚠️ Vector delete failed (non-critical): %s", ve)
452
 
453
- deleted_count += 1
454
- if deleted_count % 10 == 0:
455
- logger.info(" Progress: %d articles deleted...", deleted_count)
456
  except Exception as e:
457
  logger.error("❌ Error deleting document %s: %s", doc['$id'], e)
458
 
459
- # Clear Redis cache to force refresh from updated database
460
- logger.info("🔄 Clearing Redis cache...")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
461
  cache_service = CacheService()
462
  cache_cleared = 0
463
  for category in CATEGORIES:
@@ -470,10 +534,17 @@ async def cleanup_old_news():
470
  if cache_cleared > 0:
471
  logger.info("✅ Cache cleared for %d categories", cache_cleared)
472
 
 
 
 
 
 
473
  logger.info("")
474
  logger.info("═" * 80)
475
  logger.info("🎉 [CLEANUP JANITOR] COMPLETED!")
476
- logger.info("🗑️ Total Deleted: %d articles", deleted_count)
 
 
477
  logger.info("⏰ Retention: Articles older than %d hours removed", retention_hours)
478
  logger.info("🕐 Completion Time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
479
  logger.info("═" * 80)
 
417
  logger.info("📅 Cutoff Date: %s", cutoff_date.strftime('%Y-%m-%d %H:%M:%S'))
418
  logger.info("🗑️ Articles published before this will be deleted...")
419
 
420
+ # =========================================================================
421
+ # Step 1: Clean Regular Articles Collection
422
+ # =========================================================================
423
+ logger.info("")
424
+ logger.info("📰 [STEP 1] Cleaning regular articles...")
425
  logger.info("🔍 Querying Appwrite for old articles...")
426
  from appwrite.query import Query
427
 
 
434
  ]
435
  )
436
 
437
+ logger.info("📊 Found %d old regular articles to delete", len(response['documents']))
438
 
439
+ deleted_regular = 0
440
  if len(response['documents']) > 0:
441
+ logger.info("🗑️ Deleting regular articles...")
442
 
443
  for doc in response['documents']:
444
  try:
 
454
  except Exception as ve:
455
  logger.warning("⚠️ Vector delete failed (non-critical): %s", ve)
456
 
457
+ deleted_regular += 1
458
+ if deleted_regular % 10 == 0:
459
+ logger.info(" Progress: %d regular articles deleted...", deleted_regular)
460
  except Exception as e:
461
  logger.error("❌ Error deleting document %s: %s", doc['$id'], e)
462
 
463
+ logger.info("✅ Regular articles cleanup: %d deleted", deleted_regular)
464
+
465
+ # =========================================================================
466
+ # Step 2: Clean Cloud Articles Collection (Phase 3)
467
+ # =========================================================================
468
+ deleted_cloud = 0
469
+
470
+ # Only clean cloud collection if it's configured
471
+ if settings.APPWRITE_CLOUD_COLLECTION_ID:
472
+ logger.info("")
473
+ logger.info("☁️ [STEP 2] Cleaning cloud articles...")
474
+ logger.info("🔍 Querying Appwrite for old cloud articles...")
475
+
476
+ try:
477
+ cloud_response = appwrite_db.databases.list_documents(
478
+ database_id=settings.APPWRITE_DATABASE_ID,
479
+ collection_id=settings.APPWRITE_CLOUD_COLLECTION_ID,
480
+ queries=[
481
+ Query.less_than('published_at', cutoff_iso),
482
+ Query.limit(500)
483
+ ]
484
+ )
485
+
486
+ logger.info("📊 Found %d old cloud articles to delete", len(cloud_response['documents']))
487
+
488
+ if len(cloud_response['documents']) > 0:
489
+ logger.info("🗑️ Deleting cloud articles...")
490
+
491
+ for doc in cloud_response['documents']:
492
+ try:
493
+ appwrite_db.databases.delete_document(
494
+ database_id=settings.APPWRITE_DATABASE_ID,
495
+ collection_id=settings.APPWRITE_CLOUD_COLLECTION_ID,
496
+ document_id=doc['$id']
497
+ )
498
+
499
+ # Cleanup from ChromaDB as well
500
+ try:
501
+ vector_store.delete_vector(doc['$id'])
502
+ except Exception as ve:
503
+ logger.warning("⚠️ Vector delete failed (non-critical): %s", ve)
504
+
505
+ deleted_cloud += 1
506
+ if deleted_cloud % 10 == 0:
507
+ logger.info(" Progress: %d cloud articles deleted...", deleted_cloud)
508
+ except Exception as e:
509
+ logger.error("❌ Error deleting cloud document %s: %s", doc['$id'], e)
510
+
511
+ logger.info("✅ Cloud articles cleanup: %d deleted", deleted_cloud)
512
+
513
+ except Exception as e:
514
+ logger.warning("⚠️ Cloud collection cleanup skipped: %s", e)
515
+ logger.info("💡 Cloud collection may not exist yet - this is normal on first run")
516
+ else:
517
+ logger.info("")
518
+ logger.info("⏭️ [STEP 2] Skipping cloud articles (collection not configured)")
519
+
520
+ # =========================================================================
521
+ # Step 3: Clear Redis Cache
522
+ # =========================================================================
523
+ logger.info("")
524
+ logger.info("🔄 [STEP 3] Clearing Redis cache...")
525
  cache_service = CacheService()
526
  cache_cleared = 0
527
  for category in CATEGORIES:
 
534
  if cache_cleared > 0:
535
  logger.info("✅ Cache cleared for %d categories", cache_cleared)
536
 
537
+ # =========================================================================
538
+ # Final Summary
539
+ # =========================================================================
540
+ total_deleted = deleted_regular + deleted_cloud
541
+
542
  logger.info("")
543
  logger.info("═" * 80)
544
  logger.info("🎉 [CLEANUP JANITOR] COMPLETED!")
545
+ logger.info("🗑️ Total Deleted: %d articles", total_deleted)
546
+ logger.info(" 📰 Regular: %d", deleted_regular)
547
+ logger.info(" ☁️ Cloud: %d", deleted_cloud)
548
  logger.info("⏰ Retention: Articles older than %d hours removed", retention_hours)
549
  logger.info("🕐 Completion Time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
550
  logger.info("═" * 80)
app/services/vector_store.py CHANGED
@@ -58,7 +58,14 @@ class VectorStore:
58
 
59
  def upsert_article(self, article_data: Dict, analysis_result: str):
60
  """
61
- Convert article + analysis into vector and save to ChromaDB.
 
 
 
 
 
 
 
62
  """
63
  if not self._initialized:
64
  self._initialize()
@@ -67,47 +74,91 @@ class VectorStore:
67
  return
68
 
69
  try:
70
- # Prepare text for embedding: Title + Summary + Analysis
71
- # We treat the "analysis" as high-value semantic content
72
- combined_text = f"{article_data.get('title', '')} \n {article_data.get('description', '')} \n {analysis_result}"
 
 
 
 
 
 
 
 
 
73
 
74
  # Observability: Log what we are embedding
75
- logger.info("📝 [Index] Embedding Article: '%s'", article_data.get('title', '')[:50])
76
- logger.info(" -> Content Length: %d chars", len(combined_text))
 
77
 
78
  # Generate embedding
79
- embedding = self.embedder.encode(combined_text).tolist()
80
 
81
- # Metadata for filtering
82
  metadata = {
 
83
  "source": article_data.get('source', 'Unknown'),
84
  "category": article_data.get('category', 'General'),
85
- "published_at": str(article_data.get('published_at', '')),
86
  "url": article_data.get('url', ''),
87
- "title": article_data.get('title', ''), # NEW: Store for search retrieval
88
- "description": article_data.get('description', ''), # NEW: Store for search retrieval
89
- "image": article_data.get('image', '') # NEW: Store for search retrieval
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  }
91
 
 
 
 
92
  # Upsert to ChromaDB
93
  # Use Appwrite Document ID ($id) as the ChromaDB ID for 1:1 mapping
94
  doc_id = article_data.get('$id')
95
  if not doc_id:
96
- # Fallback if no ID provided (shouldn't happen with shadow path)
97
  doc_id = article_data.get('url_hash', 'unknown')
98
 
99
  self.collection.upsert(
100
  ids=[doc_id],
101
  embeddings=[embedding],
102
  metadatas=[metadata],
103
- documents=[combined_text] # Optional: store raw text for debugging
104
  )
105
 
106
- logger.info("🧠 [ChromaDB] Upserted vector for: %s", article_data.get('title')[:30])
 
 
 
 
 
107
 
108
  except Exception as e:
109
  logger.error("❌ [ChromaDB] Upsert failed: %s", e)
110
 
 
111
  def search_articles(self, query: str, limit: int = 10) -> List[Dict]:
112
  """
113
  Semantic Search: Find articles conceptually similar to the query.
 
58
 
59
  def upsert_article(self, article_data: Dict, analysis_result: str):
60
  """
61
+ Phase 3: Enhanced vector storage with rich metadata
62
+
63
+ Converts article + AI analysis into searchable vector with:
64
+ - Optimized embedding format: "{Title} : {Summary}"
65
+ - Cloud news detection
66
+ - Engagement metrics (likes, views)
67
+ - Time-aware sorting (Unix timestamp)
68
+ - Tag-based filtering (GLiNER output)
69
  """
70
  if not self._initialized:
71
  self._initialize()
 
74
  return
75
 
76
  try:
77
+ # Import HTML stripping utility
78
+ from app.utils import strip_html_if_needed
79
+ import time
80
+
81
+ # Clean text (only strips if HTML detected)
82
+ title_clean = strip_html_if_needed(article_data.get('title', ''))
83
+ desc_clean = strip_html_if_needed(article_data.get('description', ''))
84
+
85
+ # Phase 3: Optimized Combined Embedding
86
+ # Format: "{Title} : {Summary}"
87
+ # The colon separator helps the model distinguish title from body
88
+ text_to_embed = f"{title_clean} : {analysis_result}"
89
 
90
  # Observability: Log what we are embedding
91
+ logger.info("📝 [Index] Embedding Article: '%s'", title_clean[:50])
92
+ logger.info(" -> Format: '{Title} : {Summary}'")
93
+ logger.info(" -> Total Length: %d chars", len(text_to_embed))
94
 
95
  # Generate embedding
96
+ embedding = self.embedder.encode(text_to_embed).tolist()
97
 
98
+ # Phase 3: Enhanced Metadata Schema
99
  metadata = {
100
+ # Core identification
101
  "source": article_data.get('source', 'Unknown'),
102
  "category": article_data.get('category', 'General'),
 
103
  "url": article_data.get('url', ''),
104
+
105
+ # Display data (cleaned)
106
+ "title": title_clean,
107
+ "description": desc_clean,
108
+ "image": article_data.get('image', ''),
109
+
110
+ # Phase 3: Filtering & Search
111
+ "tags": article_data.get('tags', ''), # GLiNER output (comma-separated)
112
+
113
+ # Phase 3: Time-aware ranking
114
+ "timestamp": int(time.time()), # Unix timestamp (numeric, sortable)
115
+ "published_at": str(article_data.get('published_at', '')), # ISO string
116
+
117
+ # Phase 3: Future features
118
+ "audio_url": "", # Placeholder for TTS
119
+
120
+ # Phase 3: Cloud detection
121
+ "is_cloud_news": article_data.get('is_cloud_news', False),
122
+ "cloud_provider": article_data.get('cloud_provider', ''), # "aws", "azure", etc.
123
+ "is_official": article_data.get('is_official', False), # True if official blog
124
+
125
+ # Phase 3: Engagement metrics (for ranking)
126
+ "likes": article_data.get('likes', 0),
127
+ "dislikes": article_data.get('dislikes', 0),
128
+ "views": article_data.get('views', 0),
129
+
130
+ # Phase 3: Schema versioning
131
+ "processing_version": "v2_phase3"
132
  }
133
 
134
+ # Phase 3: Document field = Llama-3 summary ONLY (not original HTML)
135
+ document = analysis_result
136
+
137
  # Upsert to ChromaDB
138
  # Use Appwrite Document ID ($id) as the ChromaDB ID for 1:1 mapping
139
  doc_id = article_data.get('$id')
140
  if not doc_id:
141
+ # Fallback if no ID provided
142
  doc_id = article_data.get('url_hash', 'unknown')
143
 
144
  self.collection.upsert(
145
  ids=[doc_id],
146
  embeddings=[embedding],
147
  metadatas=[metadata],
148
+ documents=[document]
149
  )
150
 
151
+ # Enhanced logging
152
+ cloud_status = "☁️ CLOUD" if metadata['is_cloud_news'] else "📰 REGULAR"
153
+ logger.info("🧠 [ChromaDB] Upserted: %s | %s | Tags: %s",
154
+ title_clean[:30],
155
+ cloud_status,
156
+ metadata['tags'][:30] if metadata['tags'] else 'None')
157
 
158
  except Exception as e:
159
  logger.error("❌ [ChromaDB] Upsert failed: %s", e)
160
 
161
+
162
  def search_articles(self, query: str, limit: int = 10) -> List[Dict]:
163
  """
164
  Semantic Search: Find articles conceptually similar to the query.
app/utils.py ADDED
@@ -0,0 +1,162 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility Functions for Segmento Pulse
3
+ Provides common helpers for text processing, HTML cleaning, and data transformation
4
+ """
5
+
6
+ import re
7
+ from html import unescape
8
+
9
+
10
def strip_html_if_needed(text: str) -> str:
    """
    Strip HTML tags/entities from *text*, but only when markup is present.

    RSS feeds deliver either plain text or HTML fragments. A cheap
    character scan decides whether the (comparatively expensive) regex
    cleanup is needed at all, so already-clean strings pass through fast.

    Args:
        text: Input text that may or may not contain HTML.

    Returns:
        Text with tags removed, entities decoded, and whitespace collapsed.

    Examples:
        >>> strip_html_if_needed("Plain text")
        'Plain text'

        >>> strip_html_if_needed("<b>Bold</b> text")
        'Bold text'

        >>> strip_html_if_needed("AT&amp;T announces...")
        'AT&T announces...'
    """
    if not text:
        return ""

    # Fast path: none of the markup trigger characters present -> clean.
    has_markup = any(ch in text for ch in '<>&')
    if not has_markup:
        return text.strip()

    # Slow path: drop tags, decode entities, collapse whitespace.
    without_tags = re.sub(r'<[^>]+>', '', text)
    decoded = unescape(without_tags)
    return re.sub(r'\s+', ' ', decoded).strip()
53
+
54
+
55
def detect_html(text: str) -> bool:
    """
    Cheap heuristic check for HTML markup.

    Note: any angle bracket counts, so "3 < 5" also reports True — this
    is a deliberate fast pre-filter, not a parser.

    Args:
        text: Text to inspect.

    Returns:
        True when an angle bracket is present, False otherwise
        (including for empty/falsy input).
    """
    return bool(text) and ('<' in text or '>' in text)
69
+
70
+
71
def truncate_text(text: str, max_length: int = 200, suffix: str = "...") -> str:
    """
    Safely truncate text to at most *max_length* characters.

    Args:
        text: Text to truncate (empty/None passes through unchanged).
        max_length: Maximum length of the result (default: 200).
        suffix: Marker appended when truncation happens (default: "...").

    Returns:
        The original text if it already fits, otherwise a truncated
        string ending in *suffix*. When max_length is smaller than the
        suffix itself, only the suffix is returned.
    """
    if not text or len(text) <= max_length:
        return text

    # Bug fix: clamp the cut point at 0. Previously, a max_length smaller
    # than len(suffix) produced a NEGATIVE slice (text[:-1] etc.), which
    # kept almost the entire string instead of truncating it.
    cut = max(0, max_length - len(suffix))
    return text[:cut].strip() + suffix
87
+
88
+
89
def normalize_url(url: str) -> str:
    """
    Canonicalize a URL for deduplication.

    Whitespace is stripped, trailing slashes dropped, and the whole
    string lowercased so trivially-different spellings of the same link
    compare equal.

    Args:
        url: URL to normalize.

    Returns:
        Normalized URL ("" for falsy input).
    """
    if not url:
        return ""

    cleaned = url.strip()
    cleaned = cleaned.rstrip('/')
    return cleaned.lower()
107
+
108
+
109
def extract_domain(url: str) -> str:
    """
    Extract the bare domain name from a URL.

    Args:
        url: Full URL (e.g., "https://www.TechCrunch.com/2024/news").

    Returns:
        Lowercase domain without scheme, path, port, or a leading
        "www." (e.g., "techcrunch.com"). Returns "" for falsy input.
    """
    if not url:
        return ""

    # Drop the scheme case-insensitively (the old pattern missed
    # "HTTPS://"), then keep everything before the first "/".
    domain = re.sub(r'^https?://', '', url.strip(), flags=re.IGNORECASE)
    domain = domain.split('/')[0]

    # Drop an explicit port (e.g., "example.com:8080" -> "example.com").
    domain = domain.split(':')[0]

    # Lowercase BEFORE removing the prefix so "WWW.Example.com" is
    # handled, and strip "www." only at the START: the old
    # replace('www.', '') removed the substring anywhere, corrupting
    # domains such as "mywww.site" -> "mysite".
    domain = domain.lower()
    if domain.startswith('www.'):
        domain = domain[4:]

    return domain
131
+
132
+
133
def comma_separated_to_list(text: str) -> list:
    """
    Split a comma-separated string into a list of trimmed tokens.

    Empty tokens (from stray or doubled commas) are dropped.

    Args:
        text: Comma-separated string (e.g., "AI,Tech,Cloud").

    Returns:
        List of non-empty strings (e.g., ["AI", "Tech", "Cloud"]).
    """
    if not text:
        return []

    trimmed = (piece.strip() for piece in text.split(','))
    return [token for token in trimmed if token]
147
+
148
+
149
def list_to_comma_separated(items: list) -> str:
    """
    Join a list into a comma-separated string.

    Falsy entries are skipped; everything else is stringified and
    trimmed before joining.

    Args:
        items: List of values.

    Returns:
        Comma-separated string ("" for an empty/falsy list).
    """
    if not items:
        return ""

    parts = [str(entry).strip() for entry in items if entry]
    return ",".join(parts)