faizee07 commited on
Commit
02fe9c6
·
verified ·
1 Parent(s): 2ae2c3e

Create workflows/rss_poller.py

Browse files
Files changed (1) hide show
  1. workflows/rss_poller.py +67 -0
workflows/rss_poller.py ADDED
@@ -0,0 +1,67 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from prefect import flow, task
2
+ import feedparser
3
+ from typing import List, Dict
4
+ from datetime import datetime
5
+ from database.operations import is_entry_processed, save_rss_entry, SessionLocal
6
+ from config.settings import settings
7
+ import hashlib
8
+
9
+ @task(retries=3, retry_delay_seconds=60)
10
+ def fetch_rss_feed(feed_url: str) -> List[Dict]:
11
+ """Fetch and parse RSS feed"""
12
+ feed = feedparser.parse(feed_url)
13
+ entries = []
14
+
15
+ for entry in feed.entries:
16
+ # Create unique ID for entry
17
+ entry_id = hashlib.md5(
18
+ f"{feed_url}{entry.get('link', '')}".encode()
19
+ ).hexdigest()
20
+
21
+ entries.append({
22
+ "entry_id": entry_id,
23
+ "feed_url": feed_url,
24
+ "title": entry.get("title", ""),
25
+ "link": entry.get("link", ""),
26
+ "published": datetime(*entry.published_parsed[:6]) if hasattr(entry, 'published_parsed') else datetime.utcnow(),
27
+ "content": entry.get("summary", "") or entry.get("description", "")
28
+ })
29
+
30
+ return entries
31
+
32
+ @task
33
+ def filter_new_entries(entries: List[Dict]) -> List[Dict]:
34
+ """Filter out already processed entries"""
35
+ db = SessionLocal()
36
+ new_entries = []
37
+
38
+ try:
39
+ for entry in entries:
40
+ if not is_entry_processed(db, entry["entry_id"]):
41
+ # Save to database
42
+ save_rss_entry(
43
+ db,
44
+ entry_id=entry["entry_id"],
45
+ feed_url=entry["feed_url"],
46
+ title=entry["title"],
47
+ link=entry["link"],
48
+ published=entry["published"],
49
+ content=entry["content"]
50
+ )
51
+ new_entries.append(entry)
52
+ finally:
53
+ db.close()
54
+
55
+ return new_entries
56
+
57
+ @flow(name="RSS Feed Polling")
58
+ def poll_rss_feeds():
59
+ """Poll all RSS feeds and return new entries"""
60
+ all_new_entries = []
61
+
62
+ for feed_url in settings.rss_feed_list:
63
+ entries = fetch_rss_feed(feed_url)
64
+ new_entries = filter_new_entries(entries)
65
+ all_new_entries.extend(new_entries)
66
+
67
+ return all_new_entries