from datetime import datetime

import json
import logging
import os
import time
from typing import Dict, List, Optional

import requests
from dotenv import load_dotenv

from flexible_blog_database import FlexibleBlogDatabase  # used only by the commented-out storage path below
from llm_agent import process_story
from supabase_api import insert_blog_post

load_dotenv()

logger = logging.getLogger("rd_pipeline")
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
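
# Environment variables read by this module (names taken from the code below;
# the example values are illustrative assumptions, not confirmed defaults):
#   LOG_LEVEL            - logging level, e.g. INFO or DEBUG
#   REDDIT_USER_AGENT    - descriptive User-Agent string sent to Reddit
#   REDDIT_FALLBACK_JSON - optional path to a cached listing JSON used when
#                          all live fetch attempts fail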


def _extract_posts(payload: Dict) -> List[Dict]:
    """Pull the fields the pipeline needs out of a Reddit listing payload."""
    return [
        {
            "title": c["data"].get("title"),
            "post_content": c["data"].get("selftext"),
            "author": c["data"].get("author"),
            "upvote_ratio": c["data"].get("upvote_ratio"),
            "ups": c["data"].get("ups"),
            "num_comments": c["data"].get("num_comments"),
        }
        for c in payload.get("data", {}).get("children", [])
    ]


def fetch_reddit_posts(url: Optional[str] = None, attempts: int = 3, backoff_base: float = 2.0) -> List[Dict]:
    """Fetch recent posts from Reddit with retry + fallback.

    Returns a list (possibly empty); HTTP and JSON failures are logged,
    never raised.
    """
    if not url:
        url = "https://www.reddit.com/r/TrueOffMyChest/hot.json?limit=25&raw_json=1"
    headers = {
        "User-Agent": os.getenv(
            "REDDIT_USER_AGENT",
            "script:amplify.rd_pipeline:v1.0 (by u/exampleuser contact: noreply@example.com)",
        ),
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "close",
    }
    last_err: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=15)
            status = resp.status_code
            if status == 200:
                try:
                    reddit_data = resp.json()
                except ValueError as e:
                    last_err = e
                    logger.warning("JSON decode failed (attempt %s): %s", attempt, e)
                else:
                    posts_info = _extract_posts(reddit_data)
                    logger.info("Fetched %d Reddit posts", len(posts_info))
                    return posts_info
            elif status in (403, 429):
                logger.warning("Reddit returned %s (attempt %s/%s)", status, attempt, attempts)
            else:
                logger.warning("Unexpected status %s (attempt %s/%s)", status, attempt, attempts)
        except Exception as e:  # requests errors, malformed payloads, etc.
            last_err = e
            logger.warning("Error fetching Reddit posts (attempt %s/%s): %s", attempt, attempts, e)
        if attempt < attempts:  # no point backing off after the final attempt
            time.sleep(backoff_base ** (attempt - 1))
    # All live attempts failed: try a locally cached listing, if configured.
    fallback_path = os.getenv("REDDIT_FALLBACK_JSON")
    if fallback_path and os.path.isfile(fallback_path):
        try:
            with open(fallback_path, "r", encoding="utf-8") as f:
                cached = json.load(f)
            posts_info = _extract_posts(cached)
            logger.info("Loaded %d posts from fallback JSON", len(posts_info))
            return posts_info
        except Exception as e:
            logger.error("Failed reading fallback JSON: %s", e)
    logger.error("All Reddit fetch attempts failed. Last error: %s", last_err)
    return []
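

# A minimal usage sketch for the retry/fallback path. The subreddit URL and
# file name below are illustrative assumptions, not project defaults; the
# fallback file must hold a raw Reddit listing of the shape
# {"data": {"children": [{"data": {...}}, ...]}}.
#
#     os.environ["REDDIT_FALLBACK_JSON"] = "cached_listing.json"
#     posts = fetch_reddit_posts(
#         url="https://www.reddit.com/r/AskReddit/hot.json?limit=10&raw_json=1",
#         attempts=2,
#         backoff_base=1.5,
#     )
#     # posts == [] only if the live fetch and the fallback file both fail.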


def find_best_post(posts_info: List[Dict]) -> List[int]:
    """Return post indexes sorted by composite score, best first."""
    if not posts_info:
        raise ValueError("No posts found from Reddit API.")
    # Weight configuration (tweak as desired).
    weights = {
        "length": 0.3,    # weight for length of post_content
        "ups": 0.3,       # weight for ups
        "comments": 0.2,  # weight for num_comments
        "ratio": 0.2,     # weight for upvote_ratio
    }
    # Maxima for normalization; `or 1` guards against division by zero.
    len_max = max(len(p["post_content"]) if p["post_content"] else 0 for p in posts_info) or 1
    ups_max = max(p["ups"] or 0 for p in posts_info) or 1
    comments_max = max(p["num_comments"] or 0 for p in posts_info) or 1

    def score(post: Dict) -> float:
        length_score = (len(post["post_content"]) if post["post_content"] else 0) / len_max
        ups_score = (post["ups"] or 0) / ups_max
        comments_score = (post["num_comments"] or 0) / comments_max
        ratio_score = post["upvote_ratio"] or 0
        return (weights["length"] * length_score
                + weights["ups"] * ups_score
                + weights["comments"] * comments_score
                + weights["ratio"] * ratio_score)

    # Score each post and sort indexes, highest score first.
    return sorted(range(len(posts_info)), key=lambda idx: score(posts_info[idx]), reverse=True)
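

# Worked example of the scoring above, as a self-check with made-up numbers
# (not called by the pipeline). With the default weights, the maximum
# possible score is 0.3 + 0.3 + 0.2 + 0.2 = 1.0.
def _demo_find_best_post() -> None:
    sample = [
        {"post_content": "x" * 100, "ups": 10, "num_comments": 5, "upvote_ratio": 0.9},
        {"post_content": "x" * 50, "ups": 40, "num_comments": 20, "upvote_ratio": 0.7},
        {"post_content": None, "ups": 0, "num_comments": 0, "upvote_ratio": None},
    ]
    # Post 0: 0.3*1.00 + 0.3*0.25 + 0.2*0.25 + 0.2*0.9 = 0.605
    # Post 1: 0.3*0.50 + 0.3*1.00 + 0.2*1.00 + 0.2*0.7 = 0.790
    # Post 2: scores 0.0, so the expected order is [1, 0, 2]
    assert find_best_post(sample) == [1, 0, 2]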


def process_and_store_post(user_input: Optional[str] = None, max_trials: int = 5) -> Optional[str]:
    """Process a story and store it as a blog post.

    If user_input is given, process it directly; otherwise fetch Reddit
    posts and try the top-scored candidates until one succeeds.
    """
    if user_input:
        meta = {"title": "User Provided Story", "author": "anonymous"}
        result = process_story(user_input, enhanced=False)
    else:
        posts = fetch_reddit_posts()
        if not posts:
            logger.warning("No Reddit posts available after retries; aborting.")
            return None
        order = find_best_post(posts)
        result = None
        meta = None
        for idx in order[:max_trials]:
            post = posts[idx]
            content = post.get("post_content")
            if not content:
                continue
            try:
                result = process_story(content, enhanced=False)
                meta = post
                break
            except Exception as e:
                logger.warning("process_story failed for candidate %s: %s", idx, e)
                continue
    if not result or not meta:
        logger.error("Could not process any candidate post.")
        return None
print(f"Story Preview:\n{result['polished_story'][:500]}...")
keywords = result.get("keywords") or []
if keywords:
print("Keywords:", ", ".join(keywords))
from datetime import datetime
write_data = {
"title": meta.get("title"),
"content": result.get("polished_story", ""),
"author": meta.get("author", "unknown"),
"tags": result.get("keywords", []), # Fixed: use .get() with default empty list
"created_at": meta.get("date_posted", datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # Fixed: use date_posted instead of timestamp
}
#print(f"Write Data : {write_data}")
# print("==========================")
# print(f"Here are the meta details : {meta}")
# print("==========================")
# print(f"Here is the write data : {write_data}")
# print("==========================")
    write_response = insert_blog_post(write_data)
    reddit_done = f"Data written to Supabase with response: {write_response}"
    # Alternative storage path via FlexibleBlogDatabase, kept for reference:
    # blog_db = FlexibleBlogDatabase()
    # blog_id = blog_db.create_blog_post(
    #     title=meta.get("title") or "Untitled",
    #     content=result["polished_story"],
    #     author=meta.get("author") or "unknown",
    #     tags=keywords,
    # )
    return reddit_done
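

# Sketch of the user-supplied path (the story text is a made-up placeholder):
#
#     process_and_store_post(user_input="Today I finally told my family the truth...")
#
# This skips the Reddit fetch and scoring entirely and stores the processed
# story under the generic "User Provided Story" metadata defined above.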


if __name__ == "__main__":
    process_and_store_post()