# NOTE(review): the three lines here ("Spaces:" / "Sleeping" / "Sleeping") were
# Hugging Face Spaces status-banner residue from a copy/paste, not code —
# converted to a comment so the module parses.
| from flexible_blog_database import FlexibleBlogDatabase | |
| import os, time, logging, requests, json | |
| from typing import List, Dict, Optional | |
| from llm_agent import process_story | |
| from brightdata_api import reddit_search_api, scrape_and_download_reddit | |
| from supabase_api import insert_blog_post | |
| from collections import OrderedDict | |
| import datetime | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| logger = logging.getLogger("rd_pipeline_bdata") | |
| logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO")) | |
# Maps blog category -> subreddit URL. OrderedDict preserves insertion order so
# the weekday index in process_and_store_post rotates through a stable schedule.
# Fixed: the original literal used the key "Other" twice, so the
# relationship_advice entry was silently overwritten by r/stories — the second
# occurrence is renamed "Stories" to keep both subreddits in the rotation.
url_category_mapping = OrderedDict({
    "Artificial Intelligence": "https://www.reddit.com/r/ArtificialInteligence/",
    "Social": "https://www.reddit.com/r/TrueOffMyChest/",
    "Other": "https://www.reddit.com/r/relationship_advice/",
    "Movies": "https://www.reddit.com/r/movies/",
    "Stories": "https://www.reddit.com/r/stories/",
    "Developers": "https://www.reddit.com/r/developersIndia/",
    "AI Agents": "https://www.reddit.com/r/aiagents/",
})
def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Fetch Reddit posts for *url* via the Bright Data search API.

    Args:
        url: subreddit URL to search.

    Returns:
        The raw API response dict, or None when the search errored or
        reported zero results (``total_found`` missing or 0).

    NOTE(review): this definition deliberately shadows the
    ``scrape_and_download_reddit`` imported from ``brightdata_api`` at the top
    of the file — confirm that import is still needed, or drop it to avoid
    confusion.
    """
    reddit_response = reddit_search_api(url)
    if not reddit_response or reddit_response.get("total_found", 0) == 0:
        # Fixed: was a bare print(); use the module logger like the rest of the file.
        logger.warning("No posts found or error occurred during Reddit search.")
        return None
    return reddit_response
def find_best_post(posts_dict):
    """Return post indexes sorted best-first by a weighted quality score.

    Each post (a dict) is scored on four normalized components:
      - length of its "description" text
      - "upvotes" count
      - "num_comments" count
      - "upvote_ratio" (already in [0, 1], used as-is)

    Args:
        posts_dict: list of post dicts from the Reddit API.

    Returns:
        List of indexes into *posts_dict*, highest score first.

    Raises:
        ValueError: if *posts_dict* is empty or None.
    """
    posts_info = posts_dict
    if not posts_info:
        raise ValueError("No posts found from Reddit API.")
    # weight configuration (tweak as desired)
    weights = {
        "length": 0.3,    # weight for length of post description
        "ups": 0.3,       # weight for upvotes
        "comments": 0.2,  # weight for num_comments
        "ratio": 0.2,     # weight for upvote_ratio — fixed: was defined but never applied
    }
    # maxima for normalization; `or 1` prevents division by zero when every
    # post has an empty/zero value. .get() guards against missing keys
    # (the original raised KeyError if "description" was absent).
    len_max = max(len(p.get("description") or "") for p in posts_info) or 1
    ups_max = max(p.get("upvotes") or 0 for p in posts_info) or 1
    comments_max = max(p.get("num_comments") or 0 for p in posts_info) or 1

    def score(post):
        length_score = len(post.get("description") or "") / len_max
        ups_score = (post.get("upvotes") or 0) / ups_max
        comments_score = (post.get("num_comments") or 0) / comments_max
        ratio_score = post.get("upvote_ratio") or 0  # already normalized to [0, 1]
        return (weights["length"] * length_score +
                weights["ups"] * ups_score +
                weights["comments"] * comments_score +
                weights["ratio"] * ratio_score)

    # indexes sorted by descending score
    return sorted(range(len(posts_info)), key=lambda idx: score(posts_info[idx]), reverse=True)
def process_and_store_post(user_input=None, max_trials=5):
    """Process a story (user-supplied or scraped from Reddit) and store it.

    - If *user_input* is given, it is processed directly.
    - Otherwise the subreddit mapped to today's weekday is scraped and the
      top-scoring candidate posts are tried in order until one succeeds.

    Args:
        user_input: optional raw story text; skips the Reddit scrape when set.
        max_trials: maximum number of candidate Reddit posts to attempt.

    Returns:
        A status string describing the Supabase write, or None on failure.
    """
    # Fixed: category was only defined on the Reddit branch, so the user_input
    # path crashed with NameError at write time. Default to None (stored as-is).
    category = None
    if user_input:
        # Fixed: use the same key write_data reads below ("user_posted") so the
        # author is actually recorded for user-submitted stories.
        meta = {"title": "User Provided Story", "user_posted": "anonymous"}
        result = process_story(user_input, enhanced=False)
    else:
        # Rotate through categories by weekday so each subreddit gets a turn.
        category_list = list(url_category_mapping.keys())
        category = category_list[datetime.date.today().weekday() % len(category_list)]
        response_bd = scrape_and_download_reddit(url=url_category_mapping[category])
        posts = response_bd['parsed_posts'] if response_bd else []
        if not posts:
            logger.warning("No Reddit posts available after retries; aborting.")
            return None
        result = None
        meta = None
        for idx in find_best_post(posts)[:max_trials]:
            post = posts[idx]
            content = post.get("description")
            if not content:
                continue
            try:
                result = process_story(content, enhanced=False)
                meta = post
                break
            except Exception:
                # Occasional process_story failures are expected; log and try
                # the next candidate instead of swallowing silently.
                logger.exception("process_story failed for candidate %s; trying next", idx)
                continue
    # Single guard replaces the two duplicated result/meta checks in the original.
    if not result or not meta:
        logger.error("Could not process any candidate post.")
        return None
    print(f"Story Preview:\n{result['polished_story'][:500]}...")
    keywords = result.get("keywords") or []
    if keywords:
        print("Keywords:", ", ".join(keywords))
    write_data = {
        "title": meta.get("title"),
        "content": result.get("polished_story", ""),
        "author": meta.get("user_posted"),
        "tags": result.get("keywords", []),
        "created_at": meta.get("date_posted"),
        "category": category,
    }
    write_response = insert_blog_post(write_data)
    return f"Data written to Supabase with response: {write_response}"
# Script entry point: run the daily scrape → process → store pipeline once.
if __name__ == "__main__":
    process_and_store_post()