"""Reddit -> LLM -> Supabase blog pipeline (Bright Data variant).

Scrapes a weekday-dependent subreddit, scores the candidate posts, polishes
the best one with an LLM (``process_story``) and writes the result to the
blog table via ``insert_blog_post``.
"""

from collections import OrderedDict
import datetime
import json  # noqa: F401  (kept: imported by the original file)
import logging
import os
import time  # noqa: F401  (kept: imported by the original file)
from typing import Dict, List, Optional  # noqa: F401

import requests  # noqa: F401  (kept: imported by the original file)
from dotenv import load_dotenv

# NOTE: `scrape_and_download_reddit` is intentionally NOT imported from
# brightdata_api anymore — this module defines its own function of the same
# name below, and the original import was immediately shadowed by it.
from brightdata_api import reddit_search_api
from flexible_blog_database import FlexibleBlogDatabase  # noqa: F401
from llm_agent import process_story
from supabase_api import insert_blog_post

load_dotenv()

logger = logging.getLogger("rd_pipeline_bdata")
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))

# One subreddit per weekday (Mon=0 .. Sun=6), selected round-robin by index.
# BUG FIX: the original literal used the key "Other" twice; in a dict literal
# the later duplicate silently overwrites the earlier one, so
# r/relationship_advice was dropped and only six categories remained for
# seven weekdays.  The two entries now have distinct names.
url_category_mapping = OrderedDict({
    "Artificial Intelligence": "https://www.reddit.com/r/ArtificialInteligence/",
    "Social": "https://www.reddit.com/r/TrueOffMyChest/",
    "Relationships": "https://www.reddit.com/r/relationship_advice/",
    "Movies": "https://www.reddit.com/r/movies/",
    "Stories": "https://www.reddit.com/r/stories/",
    "Developers": "https://www.reddit.com/r/developersIndia/",
    "AI Agents": "https://www.reddit.com/r/aiagents/",
})


def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Fetch posts for *url* through the Bright Data search API.

    Returns the raw response dict, or ``None`` when the request failed or
    reported zero results (``total_found == 0``).
    """
    reddit_response = reddit_search_api(url)
    if not reddit_response or reddit_response.get("total_found", 0) == 0:
        print("No posts found or error occurred during Reddit search.")
        return None
    return reddit_response


def find_best_post(posts_dict):
    """Return post indexes in descending order of a weighted quality score.

    Score = 0.3 * normalized description length
          + 0.3 * normalized upvotes
          + 0.2 * normalized comment count.

    Raises:
        ValueError: if *posts_dict* is empty or falsy.
    """
    posts_info = posts_dict
    if not posts_info:
        raise ValueError("No posts found from Reddit API.")

    # Weight configuration (tweak as desired).
    # BUG FIX: the original also declared a "ratio" weight (upvote_ratio)
    # that was never used by score(); the dead entry is removed.  Relative
    # ranking is unchanged.
    weights = {
        "length": 0.3,   # weight for length of post description
        "ups": 0.3,      # weight for upvotes
        "comments": 0.2, # weight for num_comments
    }

    # Maxima for normalization; `or 1` guards against division by zero when
    # every post has an empty/zero field.
    len_max = max(len(p["description"]) if p["description"] else 0 for p in posts_info) or 1
    ups_max = max(p["upvotes"] or 0 for p in posts_info) or 1
    comments_max = max(p["num_comments"] or 0 for p in posts_info) or 1

    def score(post):
        length_score = (len(post["description"]) if post["description"] else 0) / len_max
        ups_score = (post["upvotes"] or 0) / ups_max
        comments_score = (post["num_comments"] or 0) / comments_max
        return (weights["length"] * length_score
                + weights["ups"] * ups_score
                + weights["comments"] * comments_score)

    # Indexes of posts_info sorted best-first.
    return sorted(range(len(posts_info)), key=lambda idx: score(posts_info[idx]), reverse=True)


def process_and_store_post(user_input=None, max_trials=5):
    """Process one story and store it as a blog post.

    - If *user_input* is given, it is processed directly.
    - Otherwise the weekday's subreddit is scraped and up to *max_trials*
      top-scored candidates are tried until one processes successfully.

    Returns:
        A status string describing the Supabase write, or ``None`` on failure.
    """
    if user_input:
        meta = {"title": "User Provided Story", "author": "anonymous"}
        # BUG FIX: the original referenced category_list/category_index in
        # write_data even on this path, where they were never defined, which
        # raised NameError.  A fixed category label is used instead.
        category = "User Provided"
        result = process_story(user_input, enhanced=False)
    else:
        # Pick today's category: weekday() is Mon=0 .. Sun=6.
        today = datetime.date.today()
        category_list = list(url_category_mapping.keys())
        category = category_list[today.weekday() % len(category_list)]

        response_bd = scrape_and_download_reddit(url=url_category_mapping[category])
        posts = response_bd["parsed_posts"] if response_bd else []
        if not posts:
            logger.warning("No Reddit posts available after retries; aborting.")
            return None

        order = find_best_post(posts)
        result = None
        meta = None
        for idx in order[:max_trials]:
            post = posts[idx]
            content = post.get("description")
            if not content:
                continue
            try:
                result = process_story(content, enhanced=False)
                meta = post
                break
            except Exception:
                # Best-effort: an LLM failure on one candidate just moves us
                # to the next one.
                continue

        if result is None or not meta:
            logger.error("Could not process any candidate post.")
            return None

    if not result or not meta:
        return None

    print(f"Story Preview:\n{result['polished_story'][:500]}...")
    keywords = result.get("keywords") or []
    if keywords:
        print("Keywords:", ", ".join(keywords))

    write_data = {
        "title": meta.get("title"),
        "content": result.get("polished_story", ""),
        # NOTE(review): user-input path stores author under "author" in meta,
        # so this is None there — presumably acceptable; confirm with schema.
        "author": meta.get("user_posted"),
        "tags": result.get("keywords", []),
        "created_at": meta.get("date_posted"),
        "category": category,
    }
    write_response = insert_blog_post(write_data)
    return f"Data written to Supabase with response: {write_response}"


if __name__ == "__main__":
    process_and_store_post()