import os
import logging
import datetime
from collections import OrderedDict

from dotenv import load_dotenv

from llm_agent import process_story
from brightdata_api import reddit_search_api
from supabase_api import insert_blog_post
load_dotenv()
logger = logging.getLogger("rd_pipeline_bdata")
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
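# Category -> subreddit mapping. process_and_store_post() rotates through
# these by weekday, so each category is scraped on a fixed day of the week.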
url_category_mapping = OrderedDict({
    "Artificial Intelligence": "https://www.reddit.com/r/ArtificialInteligence/",
    "Social": "https://www.reddit.com/r/TrueOffMyChest/",
    "Relationships": "https://www.reddit.com/r/relationship_advice/",
    "Movies": "https://www.reddit.com/r/movies/",
    "Stories": "https://www.reddit.com/r/stories/",
    "Developers": "https://www.reddit.com/r/developersIndia/",
    "AI Agents": "https://www.reddit.com/r/aiagents/"
})
def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Fetch posts for a subreddit URL via the Bright Data search API."""
    reddit_response = reddit_search_api(url)
    if not reddit_response or reddit_response.get("total_found", 0) == 0:
        logger.warning("No posts found or error occurred during Reddit search.")
        return None
    return reddit_response
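# Candidate ranking heuristic: longer bodies, more upvotes, and more comments
# all score a post higher; each metric is normalized against the batch maximum
# so no single metric dominates.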
def find_best_post(posts_info):
    """Return post indexes sorted by combined score, best candidate first."""
    if not posts_info:
        raise ValueError("No posts found from Reddit API.")
    # Weight configuration (tweak as desired). Only the relative magnitudes
    # matter, since the scores are used purely for ranking.
    weights = {
        "length": 0.3,    # weight for length of the post body
        "ups": 0.3,       # weight for upvotes
        "comments": 0.2   # weight for number of comments
    }
    # Per-metric maxima for normalization; `or 1` guards against division by zero.
    len_max = max(len(p.get("description") or "") for p in posts_info) or 1
    ups_max = max(p.get("upvotes") or 0 for p in posts_info) or 1
    comments_max = max(p.get("num_comments") or 0 for p in posts_info) or 1
    def score(post):
        # Normalize each metric to [0, 1] against the batch maximum,
        # then combine with the configured weights.
        length_score = len(post.get("description") or "") / len_max
        ups_score = (post.get("upvotes") or 0) / ups_max
        comments_score = (post.get("num_comments") or 0) / comments_max
        return (weights["length"] * length_score +
                weights["ups"] * ups_score +
                weights["comments"] * comments_score)
# Get scores for each post and sort indexes
scored_indexes = sorted(
range(len(posts_info)),
key=lambda idx: score(posts_info[idx]),
reverse=True
)
return scored_indexes
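# Example: for three posts, find_best_post may return [2, 0, 1], meaning
# posts[2] scored highest and should be attempted first.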
def process_and_store_post(user_input=None, max_trials=5):
    """
    Process a story and store it as a blog post.

    - If user_input is given, process it directly.
    - Otherwise fetch today's Reddit posts and try the top-scored
      candidates until one succeeds.
    """
    if user_input:
        category = "User Submitted"
        # User submissions carry no Reddit metadata, so supply minimal
        # fallbacks matching the keys read when building write_data below.
        meta = {
            "title": "User Provided Story",
            "user_posted": "anonymous",
            "date_posted": datetime.datetime.now().isoformat()
        }
        result = process_story(user_input, enhanced=False)
    else:
        # Rotate categories by weekday: Monday maps to the first entry.
        category_list = list(url_category_mapping.keys())
        category = category_list[datetime.date.today().weekday() % len(category_list)]
        response_bd = scrape_and_download_reddit(url=url_category_mapping[category])
        posts = response_bd['parsed_posts'] if response_bd else []
if not posts:
logger.warning("No Reddit posts available after retries; aborting.")
return None
order = find_best_post(posts)
result = None
meta = None
for idx in order[:max_trials]:
post = posts[idx]
content = post.get("description")
if not content:
continue
            try:
                result = process_story(content, enhanced=False)
                meta = post
                break
            except Exception:
                logger.exception("process_story failed for post %d; trying next candidate.", idx)
                continue
        if result is None or meta is None:
            logger.error("Could not process any candidate post.")
            return None
print(f"Story Preview:\n{result['polished_story'][:500]}...")
keywords = result.get("keywords") or []
if keywords:
print("Keywords:", ", ".join(keywords))
write_data = {
"title": meta.get("title"),
"content": result.get("polished_story", ""),
"author": meta.get("user_posted"),
"tags": result.get("keywords", []), # Fixed: use .get() with default empty list
"created_at": meta.get("date_posted"), # Fixed: use date_posted instead of timestamp
"category": category_list[category_index] # Added category field
}
write_response = insert_blog_post(write_data)
reddit_done = f"Data written to Supabase with response: {write_response}"
return reddit_done
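# Example invocations (the string argument below is illustrative):
#   process_and_store_post()                         # scrape today's subreddit
#   process_and_store_post(user_input="My story...") # process a user submission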
if __name__ == "__main__":
    process_and_store_post()