# Source: RSS_News_1 / rss_processor.py (broadfield-dev)
# Commit: "Update rss_processor.py" (804e02f, verified)
import os
import feedparser
from chromadb import PersistentClient
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
import logging
from huggingface_hub import HfApi, login, snapshot_download
from datetime import datetime
import dateutil.parser
import hashlib
import json
import re
import requests
import pandas as pd
from datasets import Dataset, load_dataset, concatenate_datasets
# Root logging is configured once, at import time, for the whole script.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Local files and directories used by the pipeline.
LOCAL_DB_DIR = "chroma_db"
RAW_FEEDS_DIR = "raw_rss_feeds"
FEEDS_FILE = "rss_feeds.json"

# Name of the Chroma collection holding the article embeddings.
COLLECTION_NAME = "news_articles"

# Hugging Face Hub access token (read from the environment) and target repos.
HF_API_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = "broadfield-dev/news-rag-db"
DATASET_REPO_ID = "broadfield-dev/RSS-DATASET"

# Upper bound on how many entries are taken from a single feed per run.
MAX_ARTICLES_PER_FEED = 1000
def initialize_hf_api():
    """Log in to the Hugging Face Hub and return an ``HfApi`` client.

    Returns:
        A new ``HfApi`` instance, created after ``login`` succeeded.

    Raises:
        ValueError: if ``HF_API_TOKEN`` is empty or unset.
        Exception: anything raised by ``login`` or ``HfApi`` is logged and
            re-raised unchanged.
    """
    if HF_API_TOKEN:
        try:
            login(token=HF_API_TOKEN)
            return HfApi()
        except Exception as exc:
            logger.error(f"Failed to login to Hugging Face Hub: {exc}")
            raise

    # Reaching this point means no token was configured at all.
    logger.error("Hugging Face API token (HF_TOKEN) not set.")
    raise ValueError("HF_TOKEN environment variable is not set.")


# Shared Hub client. Built at import time, so importing this module without a
# valid HF_TOKEN raises.
hf_api = initialize_hf_api()
def get_embedding_model():
    """Return the shared embedding model, constructing it on the first call.

    The instance is cached as the ``model`` attribute of this function, so
    later calls reuse the same ``HuggingFaceEmbeddings`` object.
    """
    cache = vars(get_embedding_model)
    if "model" not in cache:
        cache["model"] = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    return cache["model"]
def clean_text(html_text):
    """Convert an HTML fragment to whitespace-normalized plain text.

    Content inside ``<p>`` elements is preferred. When there are no ``<p>``
    elements, or they contain no text, every tag is stripped from the whole
    fragment instead.

    Args:
        html_text: HTML string to clean. Any non-string or empty value is
            treated as "no text".

    Returns:
        The plain text with HTML entities decoded and runs of whitespace
        collapsed to single spaces; ``""`` when nothing usable is left.
    """
    import html  # function-scope import so this block brings its own dependency

    if not html_text or not isinstance(html_text, str):
        return ""

    def _strip_tags(fragment):
        # ``[^>]*`` also matches tags that span several lines.
        without_tags = re.sub(r'<[^>]*>', '', fragment)
        # Decode entities (&amp;, &nbsp;, &#8217; ...) after tag removal so an
        # escaped "&lt;b&gt;" stays visible as literal text.
        return ' '.join(html.unescape(without_tags).split())

    # ``<p\b[^>]*>`` accepts attributes (<p class="...">) but not <pre>/<param>.
    paragraphs = re.findall(
        r'<p\b[^>]*>(.*?)</p\s*>', html_text, re.DOTALL | re.IGNORECASE
    )
    if paragraphs:
        text = _strip_tags(' '.join(paragraphs))
        if text:
            return text

    # No paragraphs, or only empty ones: fall back to the whole fragment.
    return _strip_tags(html_text)
def save_raw_rss_to_file(feed_url, content):
    """Write the raw text of a feed to ``RAW_FEEDS_DIR`` (best effort).

    The file name is the feed URL with every non-alphanumeric character
    replaced by ``_`` plus an ``.xml`` suffix. Any failure, including failing
    to create the directory, is logged and swallowed so that archiving the
    raw feed never prevents the feed from being processed.

    Args:
        feed_url: URL the content was fetched from.
        content: Feed body as text; written as UTF-8.
    """
    filename = re.sub(r'[^a-zA-Z0-9]', '_', feed_url) + ".xml"
    filepath = os.path.join(RAW_FEEDS_DIR, filename)
    try:
        # exist_ok avoids the check-then-create race; keeping it inside the
        # try makes directory errors as non-fatal as write errors.
        os.makedirs(RAW_FEEDS_DIR, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        logger.info(f"Saved raw RSS from {feed_url} to {filepath}")
    except Exception as e:
        logger.error(f"Could not save raw RSS from {feed_url}: {e}")
def _entry_description(entry):
    """Return the cleaned body text of a feed entry, or '' if it has none."""
    # Prefer full content, then summary, then description.
    raw = ""
    if 'content' in entry and entry.content:
        raw = entry.content[0].get('value', '')
    if not raw:
        raw = entry.get("summary", entry.get("description", ""))
    return clean_text(raw)


def _entry_published(entry):
    """Return the first parseable entry date as ISO-8601, else 'Unknown Date'."""
    for date_field in ("published", "updated", "created", "pubDate", "dc:date"):
        if date_field in entry:
            try:
                return dateutil.parser.parse(entry[date_field]).isoformat()
            except (ValueError, TypeError, AttributeError, OverflowError):
                # Unparseable value: try the next candidate field.
                continue
    return "Unknown Date"


def _entry_image(entry):
    """Return the first image URL found on a feed entry, else the 'svg' placeholder."""
    # Candidate sources in priority order; each returns a URL or None.
    candidates = (
        lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else None,
        lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else None,
        lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") and e.get("enclosure", {}).get('type', '').startswith('image') else None,
        lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), None),
    )
    for candidate in candidates:
        try:
            img_url = candidate(entry)
        except (IndexError, AttributeError, TypeError):
            continue
        if img_url and isinstance(img_url, str) and img_url.strip():
            return img_url
    return "svg"  # Default fallback image


def fetch_rss_feeds(timeout=30):
    """Download every feed listed in ``FEEDS_FILE`` and collect its articles.

    ``FEEDS_FILE`` is read as a JSON object mapping a category name to a list
    of objects with a ``"url"`` key. Each feed is fetched, archived with
    ``save_raw_rss_to_file`` and parsed. Entries without a link, with a link
    already seen in this run, or without any description text are skipped.
    A failure on one feed is logged and does not stop the others.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up, so
            that one unresponsive server cannot stall the whole run.

    Returns:
        A list of dicts with the keys ``title``, ``link``, ``description``,
        ``published``, ``category`` and ``image``. Empty if ``FEEDS_FILE``
        does not exist.
    """
    articles = []
    seen_links = set()
    try:
        with open(FEEDS_FILE, 'r', encoding='utf-8') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
        return []

    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                logger.warning(f"Skipping feed with no URL in category '{category}'")
                continue
            try:
                logger.info(f"Fetching {feed_url}")
                response = requests.get(
                    feed_url,
                    headers={'User-Agent': 'Mozilla/5.0'},
                    timeout=timeout,
                )
                response.raise_for_status()
                save_raw_rss_to_file(feed_url, response.text)

                # Parse the raw bytes so feedparser can detect the encoding
                # from the XML declaration; requests' text decoding falls
                # back to ISO-8859-1 for text/* responses without a charset.
                feed = feedparser.parse(response.content)
                if feed.bozo:
                    logger.warning(f"Parse error for {feed_url}: {feed.get('bozo_exception')}")
                    # bozo only flags a malformed feed; keep whatever entries
                    # were still recovered and skip only when there are none.
                    if not feed.entries:
                        continue

                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
                    link = entry.get("link", "")
                    if not link or link in seen_links:
                        continue
                    seen_links.add(link)

                    description = _entry_description(entry)
                    if not description:
                        continue

                    articles.append({
                        "title": entry.get("title", "No Title"),
                        "link": link,
                        "description": description,
                        "published": _entry_published(entry),
                        "category": category,
                        "image": _entry_image(entry),
                    })
            except requests.exceptions.RequestException as e:
                logger.error(f"Error fetching {feed_url}: {e}")
            except Exception as e:
                logger.error(f"Error processing {feed_url}: {e}")

    logger.info(f"Total unique articles fetched: {len(articles)}")
    return articles
def process_and_store_articles(articles, batch_size=500):
    """Embed new articles, add them to the local Chroma DB and stage them for upload.

    An article's document ID is the SHA-256 hex digest of its link. Articles
    whose ID is already in the collection, or that repeat a link seen earlier
    in ``articles``, are skipped. The new articles are also written to
    ``local_rss_store.json`` (overwriting it), which ``upload_to_hf_hub``
    reads afterwards.

    Args:
        articles: Iterable of dicts with the keys ``title``, ``link``,
            ``description``, ``published``, ``category`` and ``image``.
        batch_size: Maximum number of documents embedded and added per
            ``collection.add`` call. Chroma rejects an add that exceeds its
            maximum batch size, so a large first run must be split up.

    Returns:
        None. Errors while embedding or storing are logged, not raised.
    """
    os.makedirs(LOCAL_DB_DIR, exist_ok=True)
    client = PersistentClient(path=LOCAL_DB_DIR)
    collection = client.get_or_create_collection(name=COLLECTION_NAME)

    try:
        existing_ids = set(collection.get(include=[])["ids"])
        logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
    except Exception:
        logger.info("No existing DB found or it is empty. Starting fresh.")
        existing_ids = set()

    contents_to_add = []
    metadatas_to_add = []
    ids_to_add = []
    rss_dataset_store = []

    for article in articles:
        if not article.get('link'):
            continue
        doc_id = hashlib.sha256(article['link'].encode('utf-8')).hexdigest()
        if doc_id in existing_ids:
            continue
        # Remember the ID so a repeated link in this batch is not added twice
        # (duplicate IDs would make the Chroma add fail).
        existing_ids.add(doc_id)

        contents_to_add.append(article["description"])
        metadatas_to_add.append({
            "title": article["title"],
            "link": article["link"],
            "published": article["published"],
            "category": article["category"],
            "image": article["image"],
        })
        ids_to_add.append(doc_id)
        rss_dataset_store.append({
            "id": doc_id,
            "published": article["published"],
            "title": article["title"],
            "description": article["description"],
            "link": article["link"],
            "category": article["category"],
            "image": article["image"],
        })

    # The with-block closes the file; no explicit close() is needed.
    with open('local_rss_store.json', 'w') as f:
        json.dump(rss_dataset_store, f)

    if not ids_to_add:
        logger.info("No new articles to add to the database.")
        return

    logger.info(f"Found {len(ids_to_add)} new articles to add to the database.")
    try:
        embedding_model = get_embedding_model()
        step = max(1, int(batch_size))
        for start in range(0, len(ids_to_add), step):
            stop = start + step
            batch_documents = contents_to_add[start:stop]
            collection.add(
                embeddings=embedding_model.embed_documents(batch_documents),
                documents=batch_documents,
                metadatas=metadatas_to_add[start:stop],
                ids=ids_to_add[start:stop],
            )
        logger.info(f"Successfully added {len(ids_to_add)} new articles to DB. Total in DB: {collection.count()}")
    except Exception as e:
        # Batches added before the failure stay in the collection.
        logger.error(f"Error storing articles in ChromaDB: {e}", exc_info=True)
def download_from_hf_hub():
    """Fetch the Chroma DB folder from the Hub unless a local copy exists.

    A local copy is detected by ``chroma.sqlite3`` inside ``LOCAL_DB_DIR``.
    Download failures are only logged as warnings and never raised.
    """
    sqlite_path = os.path.join(LOCAL_DB_DIR, "chroma.sqlite3")
    if os.path.exists(sqlite_path):
        logger.info(f"Local Chroma DB found at '{LOCAL_DB_DIR}', skipping download.")
        return

    try:
        logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
        # Only files under LOCAL_DB_DIR are pulled, into the working directory.
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="dataset",
            local_dir=".",
            local_dir_use_symlinks=False,
            allow_patterns=[f"{LOCAL_DB_DIR}/**"],
            token=HF_API_TOKEN,
        )
        logger.info("Finished downloading DB.")
    except Exception as err:
        logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {err}")
def _push_rss_dataset(rows):
    """Append ``rows`` to the Hub dataset ``DATASET_REPO_ID`` and push the result.

    Only a "not found" error from ``load_dataset`` is taken to mean that the
    dataset does not exist yet. Every other error propagates to the caller,
    so a transient load failure or a schema mismatch can never cause the
    existing Hub dataset to be replaced by just the new rows.
    """
    try:
        existing_dataset = load_dataset(DATASET_REPO_ID, split="train")
    except FileNotFoundError as e:
        # The datasets library's missing-dataset / missing-data-files errors
        # derive from FileNotFoundError.
        logger.info(f"No existing dataset found. Creating new. Details: {e}")
        existing_dataset = None

    if existing_dataset is None:
        final_dataset = Dataset.from_list(rows)
    else:
        logger.info(f"Found existing dataset with {len(existing_dataset)} rows.")
        # Drop rows already on the Hub so a re-run after a partially failed
        # upload does not append duplicates.
        if "id" in existing_dataset.column_names:
            known_ids = set(existing_dataset["id"])
            rows = [row for row in rows if row.get("id") not in known_ids]
        if not rows:
            logger.info("All local RSS rows already exist in the dataset; nothing to push.")
            return
        new_dataset = Dataset.from_list(rows)
        final_dataset = concatenate_datasets([existing_dataset, new_dataset])
        logger.info(f"Appending {len(new_dataset)} new rows. Total size: {len(final_dataset)}")

    final_dataset.push_to_hub(DATASET_REPO_ID)
    logger.info(f"Successfully pushed updated dataset to {DATASET_REPO_ID}")


def upload_to_hf_hub():
    """Upload the Chroma DB, the raw feeds and the new dataset rows to the Hub.

    Three independent steps, each with its own error handling so that one
    failure does not stop the others:

    1. Upload ``LOCAL_DB_DIR`` to ``REPO_ID`` (if the folder exists).
    2. Upload ``RAW_FEEDS_DIR`` to ``REPO_ID`` (if the folder exists).
    3. Append the rows in ``local_rss_store.json`` to ``DATASET_REPO_ID``.

    Errors are logged with tracebacks; nothing is raised to the caller.
    """
    commit_message = f"Update RSS news database and raw feeds {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

    if os.path.exists(LOCAL_DB_DIR):
        try:
            logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
            hf_api.upload_folder(
                folder_path=LOCAL_DB_DIR, path_in_repo=LOCAL_DB_DIR, repo_id=REPO_ID,
                repo_type="dataset", commit_message=commit_message, ignore_patterns=["*.bak", "*.tmp"]
            )
            logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading Chroma DB to Hugging Face Hub: {e}", exc_info=True)

    if os.path.exists(RAW_FEEDS_DIR):
        try:
            logger.info(f"Uploading raw RSS feeds from '{RAW_FEEDS_DIR}' to {REPO_ID}...")
            hf_api.upload_folder(
                folder_path=RAW_FEEDS_DIR, path_in_repo=RAW_FEEDS_DIR, repo_id=REPO_ID,
                repo_type="dataset", commit_message=commit_message
            )
            logger.info(f"Raw feeds folder '{RAW_FEEDS_DIR}' uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading raw feeds to Hugging Face Hub: {e}", exc_info=True)

    try:
        logger.info(f"Processing RSS feeds for {DATASET_REPO_ID}...")
        with open('local_rss_store.json', 'r') as f:
            json_list = json.load(f)
        if not json_list:
            logger.info("No local RSS data to upload.")
        else:
            _push_rss_dataset(json_list)
    except Exception as e:
        logger.error(f"Error appending RSS feeds to Hugging Face Hub: {e}", exc_info=True)
def main():
    """Run one full cycle: download the DB, fetch feeds, store and upload.

    Storing and uploading are skipped when no articles were fetched. Any
    exception escaping a step is logged as critical and not re-raised.
    """
    try:
        download_from_hf_hub()
        fetched = fetch_rss_feeds()
        if not fetched:
            logger.info("No articles fetched, skipping database processing and upload.")
            return
        process_and_store_articles(fetched)
        upload_to_hf_hub()
    except Exception as err:
        logger.critical(f"An unhandled error occurred in main execution: {err}", exc_info=True)


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()