import os
import time
from typing import Any, Dict, List, Optional

import requests
from dotenv import load_dotenv

load_dotenv()

# TODO: Add async variants of these helpers.

bd_apikey = os.getenv("BRIGHTDATA_API_KEY")


def _make_api_request(url, **kwargs):
    """POST to the Bright Data API and return the parsed JSON, or None on failure."""
    headers = {
        "Authorization": f"Bearer {bd_apikey}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(url, headers=headers, **kwargs)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None
    except Exception as e:
        print(f"Unknown error: {e}")
        return None


def poll_snapshot_status(
    snapshot_id: str, max_attempts: int = 200, delay: int = 10
) -> bool:
    """Poll a snapshot's progress endpoint until it is ready, fails, or times out."""
    progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
    headers = {"Authorization": f"Bearer {bd_apikey}"}

    for attempt in range(max_attempts):
        try:
            print(
                f"⏳ Checking snapshot progress... (attempt {attempt + 1}/{max_attempts})"
            )
            response = requests.get(progress_url, headers=headers)
            response.raise_for_status()
            progress_data = response.json()
            status = progress_data.get("status")

            if status == "ready":
                print("✅ Snapshot completed!")
                return True
            elif status == "failed":
                print("❌ Snapshot failed")
                return False
            elif status == "running":
                print("🔄 Still processing...")
                time.sleep(delay)
            else:
                print(f"❓ Unknown status: {status}")
                time.sleep(delay)
        except Exception as e:
            print(f"⚠️ Error checking progress: {e}")
            time.sleep(delay)

    print("⏰ Timeout waiting for snapshot completion")
    return False


def download_snapshot(
    snapshot_id: str, format: str = "json"
) -> Optional[List[Dict[Any, Any]]]:
    """Download a completed snapshot and return its records, or None on failure."""
    download_url = (
        f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format={format}"
    )
    headers = {"Authorization": f"Bearer {bd_apikey}"}
    print(f"Snapshot id: {snapshot_id}")

    try:
        print("📥 Downloading snapshot data...")
        response = requests.get(download_url, headers=headers)
        response.raise_for_status()
        data = response.json()
        print(
            f"🎉 Successfully downloaded {len(data) if isinstance(data, list) else 1} items"
        )
        return data
    except Exception as e:
        print(f"❌ Error downloading snapshot: {e}")
        return None


def _trigger_and_download_snapshot(trigger_url, params, data, operation_name="operation"):
    """Trigger a dataset collection, wait for the snapshot, and download the results."""
    trigger_result = _make_api_request(trigger_url, params=params, json=data)
    if not trigger_result:
        print(f"❌ Failed to trigger {operation_name}")
        return None

    snapshot_id = trigger_result.get("snapshot_id")
    if not snapshot_id:
        print(f"❌ No snapshot_id returned for {operation_name}")
        return None

    if not poll_snapshot_status(snapshot_id):
        return None

    return download_snapshot(snapshot_id)


def reddit_search_api(subreddit_url, date="Today", sort_by="Hot", num_of_posts=12):
    """Discover new posts from a subreddit via Bright Data's Reddit dataset."""
    trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": "gd_lvz8ah06191smkebj4",
        "include_errors": "true",
        "type": "discover_new",
        "discover_by": "subreddit_url",
    }
    data = [
        {
            "url": subreddit_url,
            "sort_by": sort_by,
            "num_of_posts": num_of_posts,
            "sort_by_time": date,
        }
    ]

    raw_data = _trigger_and_download_snapshot(
        trigger_url, params, data, operation_name="reddit"
    )
    if not raw_data:
        return None

    # Keep only the fields downstream consumers need.
    parsed_data = []
    for post in raw_data:
        parsed_post = {
            "title": post.get("title"),
            "url": post.get("url"),
            "user_posted": post.get("user_posted"),
            "description": post.get("description"),
            "upvotes": post.get("num_upvotes"),
            "num_comments": post.get("num_comments"),
            "date_posted": post.get("date_posted"),
        }
        parsed_data.append(parsed_post)

    return {"parsed_posts": parsed_data, "total_found": len(parsed_data)}


def reddit_post_retrieval(urls, days_back=1, load_all_replies=False, comment_limit=""):
    """Retrieve comments for the given Reddit post URLs."""
    if not urls:
        return None

    trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": "gd_lvz8ah06191smkebj4",
        "include_errors": "true",
    }
    data = [
        {
            "url": url,
            "days_back": days_back,
            "load_all_replies": load_all_replies,
            "comment_limit": comment_limit,
        }
        for url in urls
    ]

    raw_data = _trigger_and_download_snapshot(
        trigger_url, params, data, operation_name="reddit comments"
    )
    if not raw_data:
        return None

    parsed_comments = []
    for comment in raw_data:
        parsed_comment = {
            "comment_id": comment.get("comment_id"),
            "content": comment.get("comment"),
            "date": comment.get("date_posted"),
        }
        parsed_comments.append(parsed_comment)

    return {"comments": parsed_comments, "total_retrieved": len(parsed_comments)}


def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Convenience wrapper: search a subreddit and return the parsed posts."""
    reddit_response = reddit_search_api(url)
    if not reddit_response or reddit_response.get("total_found", 0) == 0:
        print("No posts found or error occurred during Reddit search.")
        return None
    return reddit_response
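# Minimal usage sketch: discover recent posts from a subreddit, then fetch
# comments for the first result. Assumes BRIGHTDATA_API_KEY is set in the
# environment (or a .env file); the subreddit URL is only an example, and
# the days_back value is an arbitrary illustration.
if __name__ == "__main__":
    posts = scrape_and_download_reddit("https://www.reddit.com/r/ArtificialInteligence/")
    if posts:
        print(f"Found {posts['total_found']} posts")
        first_url = posts["parsed_posts"][0]["url"]
        comments = reddit_post_retrieval([first_url], days_back=7)
        if comments:
            print(f"Retrieved {comments['total_retrieved']} comments")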