amplify / backend /rd_pipeline_local.py
github-actions
Sync from GitHub Fri Dec 26 12:29:52 UTC 2025
aff341e
from supabase_auth import datetime
from flexible_blog_database import FlexibleBlogDatabase
from supabase_api import insert_blog_post
import os, time, logging, requests, json
from typing import List, Dict, Optional
from llm_agent import process_story
from dotenv import load_dotenv
load_dotenv()
logger = logging.getLogger("rd_pipeline")
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
def fetch_reddit_posts(url: Optional[str] = None, attempts: int = 3, backoff_base: float = 2.0) -> List[Dict]:
"""Fetch recent posts from Reddit with retry + fallback.
Returns list (may be empty). No exceptions bubble for HTTP/JSON failures.
"""
if not url:
url = "https://www.reddit.com/r/TrueOffMyChest/hot.json?limit=25&raw_json=1"
headers = {
"User-Agent": os.getenv(
"REDDIT_USER_AGENT",
"script:amplify.rd_pipeline:v1.0 (by u/exampleuser contact: noreply@example.com)"
),
"Accept": "application/json",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "close"
}
last_err: Optional[Exception] = None
for attempt in range(1, attempts + 1):
try:
resp = requests.get(url, headers=headers, timeout=15)
status = resp.status_code
if status == 200:
try:
reddit_data = resp.json()
except ValueError as e:
last_err = e
logger.warning("JSON decode failed (attempt %s): %s", attempt, e)
else:
posts_info = [
{
"title": c["data"].get("title"),
"post_content": c["data"].get("selftext"),
"author": c["data"].get("author"),
"upvote_ratio": c["data"].get("upvote_ratio"),
"ups": c["data"].get("ups"),
"num_comments": c["data"].get("num_comments"),
}
for c in reddit_data.get("data", {}).get("children", [])
]
logger.info("Fetched %d Reddit posts", len(posts_info))
return posts_info
elif status in (403, 429):
logger.warning("Reddit returned %s (attempt %s/%s)", status, attempt, attempts)
else:
logger.warning("Unexpected status %s (attempt %s/%s)", status, attempt, attempts)
time.sleep(backoff_base ** (attempt - 1))
except (requests.RequestException, Exception) as e:
last_err = e
logger.warning("Error fetching Reddit posts (attempt %s/%s): %s", attempt, attempts, e)
time.sleep(backoff_base ** (attempt - 1))
fallback_path = os.getenv("REDDIT_FALLBACK_JSON")
if fallback_path and os.path.isfile(fallback_path):
try:
with open(fallback_path, "r", encoding="utf-8") as f:
cached = json.load(f)
posts_info = [
{
"title": c["data"].get("title"),
"post_content": c["data"].get("selftext"),
"author": c["data"].get("author"),
"upvote_ratio": c["data"].get("upvote_ratio"),
"ups": c["data"].get("ups"),
"num_comments": c["data"].get("num_comments"),
}
for c in cached.get("data", {}).get("children", [])
]
logger.info("Loaded %d posts from fallback JSON", len(posts_info))
return posts_info
except Exception as e:
logger.error("Failed reading fallback JSON: %s", e)
logger.error("All Reddit fetch attempts failed. Last error: %s", last_err)
return []
def find_best_post(posts_dict):
"""Return post indexes in descending order based on scoring"""
posts_info = posts_dict
if not posts_info:
raise ValueError("No posts found from Reddit API.")
# weight configuration (tweak as desired)
weights = {
"length": 0.3, # weight for length of post_content
"ups": 0.3, # weight for ups
"comments": 0.2, # weight for num_comments
"ratio": 0.2 # weight for upvote_ratio
}
# calculate maxima for normalization
len_max = max(len(p["post_content"]) if p["post_content"] else 0 for p in posts_info) or 1
ups_max = max(p["ups"] or 0 for p in posts_info) or 1
comments_max = max(p["num_comments"] or 0 for p in posts_info) or 1
def score(post):
length_score = (len(post["post_content"]) if post["post_content"] else 0) / len_max
ups_score = (post["ups"] or 0) / ups_max
comments_score = (post["num_comments"] or 0) / comments_max
ratio_score = post["upvote_ratio"] or 0
return (weights["length"] * length_score +
weights["ups"] * ups_score +
weights["comments"] * comments_score +
weights["ratio"] * ratio_score)
# Get scores for each post and sort indexes
scored_indexes = sorted(
range(len(posts_info)),
key=lambda idx: score(posts_info[idx]),
reverse=True
)
return scored_indexes
def process_and_store_post(user_input=None, max_trials=5):
"""
Simplified + optimized:
- If user_input given, process it directly.
- Else fetch Reddit posts, try top candidates until one succeeds.
"""
if user_input:
raw_story = user_input
meta = {"title": "User Provided Story", "author": "anonymous"}
result = process_story(raw_story, enhanced=False)
else:
posts = fetch_reddit_posts()
if not posts:
logger.warning("No Reddit posts available after retries; aborting.")
return None
order = find_best_post(posts)
result = None
meta = None
for idx in order[:max_trials]:
post = posts[idx]
content = post.get("post_content")
if not content:
continue
try:
result = process_story(content, enhanced=False)
print(result)
raw_story = content
meta = post
break
except Exception as e:
print(f"Exception occurred : {str(e)}")
continue
if result is None or not meta:
logger.error("Could not process any candidate post.")
return None
if not result or not meta:
return None
print(f"Story Preview:\n{result['polished_story'][:500]}...")
keywords = result.get("keywords") or []
if keywords:
print("Keywords:", ", ".join(keywords))
from datetime import datetime
write_data = {
"title": meta.get("title"),
"content": result.get("polished_story", ""),
"author": meta.get("author", "unknown"),
"tags": result.get("keywords", []), # Fixed: use .get() with default empty list
"created_at": meta.get("date_posted", datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # Fixed: use date_posted instead of timestamp
}
#print(f"Write Data : {write_data}")
# print("==========================")
# print(f"Here are the meta details : {meta}")
# print("==========================")
# print(f"Here is the write data : {write_data}")
# print("==========================")
write_response = insert_blog_post(write_data)
reddit_done = f"Data written to Supabase with response: {write_response}"
# blog_db = FlexibleBlogDatabase()
# blog_id = blog_db.create_blog_post(
# title=meta.get("title") or "Untitled",
# content=result['polished_story'],
# author=meta.get("author") or "unknown",
# tags=keywords
# )
return reddit_done
if __name__ == "__main__":
process_and_store_post()