from datetime import datetime
from flexible_blog_database import FlexibleBlogDatabase
from supabase_api import insert_blog_post
import os, time, logging, requests, json
from typing import List, Dict, Optional
from llm_agent import process_story
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger("rd_pipeline")
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
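# Configuration is read from environment variables (loaded from .env via load_dotenv).
# A minimal sketch of a .env file, with placeholder values -- the variable names are
# the ones read by the os.getenv calls in this module:
#
#   LOG_LEVEL=INFO
#   REDDIT_USER_AGENT=script:amplify.rd_pipeline:v1.0 (by u/yourname contact: you@example.com)
#   REDDIT_FALLBACK_JSON=/path/to/cached_listing.json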
def fetch_reddit_posts(url: Optional[str] = None, attempts: int = 3, backoff_base: float = 2.0) -> List[Dict]:
    """Fetch recent posts from Reddit with retry + fallback.

    Returns a list (may be empty). No exceptions bubble for HTTP/JSON failures.
    """
    if not url:
        url = "https://www.reddit.com/r/TrueOffMyChest/hot.json?limit=25&raw_json=1"
    headers = {
        "User-Agent": os.getenv(
            "REDDIT_USER_AGENT",
            "script:amplify.rd_pipeline:v1.0 (by u/exampleuser contact: noreply@example.com)"
        ),
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "close"
    }
    last_err: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=15)
            status = resp.status_code
            if status == 200:
                try:
                    reddit_data = resp.json()
                except ValueError as e:
                    last_err = e
                    logger.warning("JSON decode failed (attempt %s): %s", attempt, e)
                else:
                    posts_info = [
                        {
                            "title": c["data"].get("title"),
                            "post_content": c["data"].get("selftext"),
                            "author": c["data"].get("author"),
                            "upvote_ratio": c["data"].get("upvote_ratio"),
                            "ups": c["data"].get("ups"),
                            "num_comments": c["data"].get("num_comments"),
                        }
                        for c in reddit_data.get("data", {}).get("children", [])
                    ]
                    logger.info("Fetched %d Reddit posts", len(posts_info))
                    return posts_info
            elif status in (403, 429):
                logger.warning("Reddit returned %s (attempt %s/%s)", status, attempt, attempts)
            else:
                logger.warning("Unexpected status %s (attempt %s/%s)", status, attempt, attempts)
            time.sleep(backoff_base ** (attempt - 1))
        except Exception as e:  # broad catch keeps the "no exceptions bubble" contract
            last_err = e
            logger.warning("Error fetching Reddit posts (attempt %s/%s): %s", attempt, attempts, e)
            time.sleep(backoff_base ** (attempt - 1))
    fallback_path = os.getenv("REDDIT_FALLBACK_JSON")
    if fallback_path and os.path.isfile(fallback_path):
        try:
            with open(fallback_path, "r", encoding="utf-8") as f:
                cached = json.load(f)
            posts_info = [
                {
                    "title": c["data"].get("title"),
                    "post_content": c["data"].get("selftext"),
                    "author": c["data"].get("author"),
                    "upvote_ratio": c["data"].get("upvote_ratio"),
                    "ups": c["data"].get("ups"),
                    "num_comments": c["data"].get("num_comments"),
                }
                for c in cached.get("data", {}).get("children", [])
            ]
            logger.info("Loaded %d posts from fallback JSON", len(posts_info))
            return posts_info
        except Exception as e:
            logger.error("Failed reading fallback JSON: %s", e)
    logger.error("All Reddit fetch attempts failed. Last error: %s", last_err)
    return []
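# Illustrative usage sketch (assumptions: the fallback file, if used, mirrors the live
# listing shape {"data": {"children": [{"data": {...}}, ...]}}; the path below is a
# placeholder):
#
#   os.environ["REDDIT_FALLBACK_JSON"] = "/tmp/trueoffmychest_hot.json"
#   posts = fetch_reddit_posts()
#   if posts:
#       print(posts[0]["title"], posts[0]["ups"], posts[0]["num_comments"])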
def find_best_post(posts_info: List[Dict]) -> List[int]:
    """Return post indexes sorted by descending weighted score."""
    if not posts_info:
        raise ValueError("No posts found from Reddit API.")

    # Weight configuration (tweak as desired).
    weights = {
        "length": 0.3,    # weight for length of post_content
        "ups": 0.3,       # weight for ups
        "comments": 0.2,  # weight for num_comments
        "ratio": 0.2      # weight for upvote_ratio
    }

    # Calculate maxima for normalization (`or 1` guards against division by zero).
    len_max = max(len(p["post_content"]) if p["post_content"] else 0 for p in posts_info) or 1
    ups_max = max(p["ups"] or 0 for p in posts_info) or 1
    comments_max = max(p["num_comments"] or 0 for p in posts_info) or 1

    def score(post):
        length_score = (len(post["post_content"]) if post["post_content"] else 0) / len_max
        ups_score = (post["ups"] or 0) / ups_max
        comments_score = (post["num_comments"] or 0) / comments_max
        ratio_score = post["upvote_ratio"] or 0
        return (weights["length"] * length_score +
                weights["ups"] * ups_score +
                weights["comments"] * comments_score +
                weights["ratio"] * ratio_score)

    # Score each post and sort indexes, best first.
    scored_indexes = sorted(
        range(len(posts_info)),
        key=lambda idx: score(posts_info[idx]),
        reverse=True
    )
    return scored_indexes
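# Worked example of the scoring (illustrative, not executed): with two synthetic posts,
# the normalized weighted sum favors the shorter post with far more engagement, so its
# index comes first:
#
#   posts = [
#       {"post_content": "a" * 4000, "ups": 10,  "num_comments": 5,  "upvote_ratio": 0.90},
#       {"post_content": "a" * 500,  "ups": 200, "num_comments": 80, "upvote_ratio": 0.95},
#   ]
#   find_best_post(posts)  # -> [1, 0]
#   # post 1: 0.3*0.125 + 0.3*1.0  + 0.2*1.0    + 0.2*0.95 ≈ 0.73
#   # post 0: 0.3*1.0   + 0.3*0.05 + 0.2*0.0625 + 0.2*0.90 ≈ 0.51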
def process_and_store_post(user_input=None, max_trials=5):
    """
    Simplified + optimized:
    - If user_input is given, process it directly.
    - Else fetch Reddit posts and try the top candidates until one succeeds.
    """
    if user_input:
        meta = {"title": "User Provided Story", "author": "anonymous"}
        result = process_story(user_input, enhanced=False)
    else:
        posts = fetch_reddit_posts()
        if not posts:
            logger.warning("No Reddit posts available after retries; aborting.")
            return None
        order = find_best_post(posts)
        result = None
        meta = None
        for idx in order[:max_trials]:
            post = posts[idx]
            content = post.get("post_content")
            if not content:
                continue
            try:
                result = process_story(content, enhanced=False)
                logger.debug("process_story result: %s", result)
                meta = post
                break
            except Exception as e:
                logger.warning("process_story failed for candidate %s: %s", idx, e)
                continue

    if not result or not meta:
        logger.error("Could not process any candidate post.")
        return None

    print(f"Story Preview:\n{result['polished_story'][:500]}...")
    keywords = result.get("keywords") or []
    if keywords:
        print("Keywords:", ", ".join(keywords))
    write_data = {
        "title": meta.get("title"),
        "content": result.get("polished_story", ""),
        "author": meta.get("author", "unknown"),
        "tags": result.get("keywords", []),
        # Reddit listing items here carry no "date_posted"; fall back to the current time.
        "created_at": meta.get("date_posted", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    }
    logger.debug("Write payload: %s", write_data)

    write_response = insert_blog_post(write_data)
    reddit_done = f"Data written to Supabase with response: {write_response}"
    # Alternative: store locally via FlexibleBlogDatabase instead of Supabase.
    # blog_db = FlexibleBlogDatabase()
    # blog_id = blog_db.create_blog_post(
    #     title=meta.get("title") or "Untitled",
    #     content=result['polished_story'],
    #     author=meta.get("author") or "unknown",
    #     tags=keywords
    # )
    return reddit_done
if __name__ == "__main__":
    process_and_store_post()
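# Hypothetical manual run bypassing Reddit (the story text is a placeholder and assumes
# llm_agent.process_story accepts any free-form string):
#
#   process_and_store_post(user_input="I finally told my family the truth about ...")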