# amplify/backend/rd_pipeline_bdata.py
# github-actions
# Sync from GitHub Fri Dec 26 12:29:52 UTC 2025
# aff341e
from flexible_blog_database import FlexibleBlogDatabase
import os, time, logging, requests, json
from typing import List, Dict, Optional
from llm_agent import process_story
from brightdata_api import reddit_search_api, scrape_and_download_reddit
from supabase_api import insert_blog_post
from collections import OrderedDict
import datetime
from dotenv import load_dotenv
# Run python-dotenv's loader at import time so that the os.getenv() lookup
# below can see values supplied through a .env file.
load_dotenv()
# Module-wide logger under a fixed name (not __name__).
logger = logging.getLogger("rd_pipeline_bdata")
# Root logging level is taken from the LOG_LEVEL environment variable and
# falls back to "INFO" when the variable is unset.
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
# Blog category name -> subreddit URL to scrape for that category.
# process_and_store_post() picks one entry per run by indexing the key list
# with today's weekday modulo the number of keys.
#
# NOTE(review): the key "Other" appears twice in this literal. In a dict
# display the later value replaces the earlier one, so the mapping ends up
# with 6 entries, "Other" resolves to r/stories, and the r/relationship_advice
# URL is never used. If both subreddits are meant to be scraped, one of the
# two entries needs a distinct category name.
url_category_mapping = OrderedDict({
    "Artificial Intelligence": "https://www.reddit.com/r/ArtificialInteligence/",
    "Social": "https://www.reddit.com/r/TrueOffMyChest/",
    "Other": "https://www.reddit.com/r/relationship_advice/",
    "Movies": "https://www.reddit.com/r/movies/",
    "Other": "https://www.reddit.com/r/stories/",
    "Developers": "https://www.reddit.com/r/developersIndia/",
    "AI Agents": "https://www.reddit.com/r/aiagents/"
})
def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Search a subreddit URL and return the response only if it has posts.

    NOTE(review): this definition replaces the function of the same name
    imported from ``brightdata_api`` at the top of the module; from here on
    the module-level name refers to this wrapper.

    Args:
        url: Subreddit URL handed to ``reddit_search_api``.

    Returns:
        The ``reddit_search_api`` response when it is truthy and its
        ``"total_found"`` value (treated as 0 when absent) is non-zero;
        otherwise ``None`` after printing a notice.
    """
    search_result = reddit_search_api(url)
    has_posts = bool(search_result) and search_result.get("total_found", 0) != 0
    if has_posts:
        return search_result
    print("No posts found or error occurred during Reddit search.")
    return None
def find_best_post(posts_dict):
    """Rank posts by a weighted engagement score; return indexes, best first.

    Each post gets a weighted sum of four components, each normalized by the
    largest value of that component across all posts:

    * length of ``"description"``
    * ``"upvotes"``
    * ``"num_comments"``
    * ``"upvote_ratio"``

    Args:
        posts_dict: Non-empty sequence of post dicts (a list, despite the
            parameter name). Any of the fields above may be missing or
            ``None``; such values count as 0.

    Returns:
        A list of indexes into ``posts_dict`` ordered by descending score.
        Posts with equal scores keep their original relative order.

    Raises:
        ValueError: If ``posts_dict`` is empty or ``None``.
    """
    if not posts_dict:
        raise ValueError("No posts found from Reddit API.")

    # Weight configuration (tweak as desired); the weights sum to 1.0.
    weights = {
        "length": 0.3,    # length of the post description
        "ups": 0.3,       # upvote count
        "comments": 0.2,  # comment count
        "ratio": 0.2,     # upvote ratio
    }

    def number(post, key):
        """Return post[key] when it is a real number, else 0."""
        value = post.get(key)
        return value if isinstance(value, (int, float)) else 0

    # Raw component values per post. .get() is used throughout so a post that
    # lacks a field is scored as 0 for it instead of raising KeyError.
    # NOTE(review): the "upvote_ratio" key name comes from the weight's
    # original comment - confirm against the parsed-post schema. Posts without
    # the field contribute 0 for this term, which leaves the ranking of
    # ratio-less data exactly as it was before the term was wired in.
    components = [
        {
            "length": len(post.get("description") or ""),
            "ups": number(post, "upvotes"),
            "comments": number(post, "num_comments"),
            "ratio": number(post, "upvote_ratio"),
        }
        for post in posts_dict
    ]

    # Per-component maxima for normalization; `or 1` avoids dividing by zero
    # when every post has 0 for a component.
    maxima = {
        name: max(values[name] for values in components) or 1
        for name in weights
    }

    scores = [
        sum(weights[name] * (values[name] / maxima[name]) for name in weights)
        for values in components
    ]

    # sorted() is stable, also with reverse=True, so ties keep input order.
    return sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
def _select_reddit_story(max_trials):
    """Scrape today's subreddit and process the best candidate that succeeds.

    The category is chosen by today's weekday modulo the number of keys in
    ``url_category_mapping``. Candidates are tried in ``find_best_post``
    order, looking at no more than ``max_trials`` of them.

    Returns:
        ``(result, post, category)`` for the first candidate whose
        ``process_story`` call returns a truthy result, or ``None`` when no
        posts were available or every candidate failed.
    """
    categories = list(url_category_mapping)
    category = categories[datetime.date.today().weekday() % len(categories)]

    response_bd = scrape_and_download_reddit(url=url_category_mapping[category])
    # Tolerate a response without "parsed_posts" instead of raising KeyError.
    posts = (response_bd.get("parsed_posts") or []) if response_bd else []
    if not posts:
        logger.warning("No Reddit posts available after retries; aborting.")
        return None

    for idx in find_best_post(posts)[:max_trials]:
        post = posts[idx]
        content = post.get("description")
        if not content:
            continue
        try:
            result = process_story(content, enhanced=False)
        except Exception:
            # Best effort: record why this candidate failed, then try the next.
            logger.exception("process_story failed for candidate post %s; trying next.", idx)
            continue
        if result:
            return result, post, category
        # An empty result is treated like a failure so later candidates
        # still get their chance.

    logger.error("Could not process any candidate post.")
    return None


def process_and_store_post(user_input=None, max_trials=5):
    """Turn one story into a blog post and insert it into Supabase.

    - If ``user_input`` is given, it is processed directly.
    - Otherwise Reddit posts are fetched for today's category and the top
      candidates are tried until one is processed successfully.

    Args:
        user_input: Optional raw story text supplied by the caller.
        max_trials: Upper bound on how many ranked Reddit candidates are
            considered (ignored when ``user_input`` is given).

    Returns:
        A status string containing the ``insert_blog_post`` response, or
        ``None`` when no story could be obtained or processed.

    Raises:
        Whatever ``process_story`` raises on the ``user_input`` path, and
        whatever ``insert_blog_post`` raises; neither is caught here.
    """
    if user_input:
        result = process_story(user_input, enhanced=False)
        if not result:
            return None
        meta = {"title": "User Provided Story", "author": "anonymous"}
        # This path has no subreddit-derived category (the name used to be
        # undefined here, raising NameError). "Other" is an existing key of
        # url_category_mapping. NOTE(review): confirm it is the desired
        # bucket for user-submitted stories.
        category = "Other"
    else:
        selection = _select_reddit_story(max_trials)
        if selection is None:
            return None
        result, meta, category = selection

    story = result.get("polished_story") or ""
    print(f"Story Preview:\n{story[:500]}...")
    keywords = result.get("keywords") or []
    if keywords:
        print("Keywords:", ", ".join(keywords))

    write_data = {
        "title": meta.get("title"),
        "content": story,
        # Reddit posts carry "user_posted"; the user-input meta uses "author".
        "author": meta.get("user_posted") or meta.get("author"),
        "tags": keywords,
        "created_at": meta.get("date_posted"),  # None for user-provided stories
        "category": category,
    }
    write_response = insert_blog_post(write_data)
    return f"Data written to Supabase with response: {write_response}"
# Script entry point: when the file is executed directly, run one pipeline
# pass with the default arguments (no user_input, so the Reddit path is taken).
if __name__ == "__main__":
    process_and_store_post()