import os
import time
import requests
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv

load_dotenv()

# TODO: add an async variant of _make_api_request (a sketch follows it below)
# from google.colab import userdata  # Colab alternative for reading secrets
bd_apikey = os.getenv("BRIGHTDATA_API_KEY")

def _make_api_request(url, **kwargs):
    """POST to a Bright Data endpoint and return the parsed JSON, or None on failure."""
    headers = {
        "Authorization": f"Bearer {bd_apikey}",
        "Content-Type": "application/json",
    }
    kwargs.setdefault("timeout", 30)  # don't hang forever on a stalled request
    try:
        response = requests.post(url, headers=headers, **kwargs)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None
    except Exception as e:
        print(f"Unknown error: {e}")
        return None
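
# A minimal async sketch for the TODO above, assuming aiohttp as the HTTP client.
# _make_api_request_async and the aiohttp dependency are not part of the original
# module, so the sketch is left commented out.
#
# import aiohttp
#
# async def _make_api_request_async(url, **kwargs):
#     headers = {
#         "Authorization": f"Bearer {bd_apikey}",
#         "Content-Type": "application/json",
#     }
#     try:
#         async with aiohttp.ClientSession() as session:
#             async with session.post(url, headers=headers, **kwargs) as response:
#                 response.raise_for_status()
#                 return await response.json()
#     except aiohttp.ClientError as e:
#         print(f"API request failed: {e}")
#         return None
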
def poll_snapshot_status(
    snapshot_id: str, max_attempts: int = 200, delay: int = 10
) -> bool:
    """Poll the Bright Data progress endpoint until the snapshot is ready, fails, or we time out."""
    progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
    headers = {"Authorization": f"Bearer {bd_apikey}"}

    for attempt in range(max_attempts):
        try:
            print(
                f"⏳ Checking snapshot progress... (attempt {attempt + 1}/{max_attempts})"
            )
            response = requests.get(progress_url, headers=headers, timeout=30)
            response.raise_for_status()

            status = response.json().get("status")
            if status == "ready":
                print("✅ Snapshot completed!")
                return True
            elif status == "failed":
                print("❌ Snapshot failed")
                return False
            elif status == "running":
                print("🔄 Still processing...")
                time.sleep(delay)
            else:
                print(f"❓ Unknown status: {status}")
                time.sleep(delay)
        except Exception as e:
            print(f"⚠️ Error checking progress: {e}")
            time.sleep(delay)

    print("⏰ Timeout waiting for snapshot completion")
    return False
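
# Example call (sketch): "s_abc123" is a placeholder snapshot id. Shorter delays
# suit small jobs; the defaults above allow roughly 200 * 10s of waiting.
#
# if poll_snapshot_status("s_abc123", max_attempts=30, delay=5):
#     print("ready to download")
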
def download_snapshot(
    snapshot_id: str, format: str = "json"
) -> Optional[List[Dict[Any, Any]]]:
    """Download a completed snapshot and return its records, or None on failure."""
    download_url = (
        f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format={format}"
    )
    headers = {"Authorization": f"Bearer {bd_apikey}"}
    print(f"Snapshot id: {snapshot_id}")

    try:
        print("📥 Downloading snapshot data...")
        response = requests.get(download_url, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        print(
            f"📊 Successfully downloaded {len(data) if isinstance(data, list) else 1} items"
        )
        return data
    except Exception as e:
        print(f"❌ Error downloading snapshot: {e}")
        return None
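
# A small persistence sketch: save_snapshot_locally is a hypothetical helper, not
# part of the original module, and the default filename is illustrative. It reuses
# download_snapshot above so repeated runs can read from disk instead.
import json


def save_snapshot_locally(snapshot_id: str, path: str = "snapshot.json") -> bool:
    """Download a snapshot and write it to disk as pretty-printed JSON."""
    data = download_snapshot(snapshot_id)
    if data is None:
        return False
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return True
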
def _trigger_and_download_snapshot(trigger_url, params, data, operation_name="operation"):
    """Trigger a dataset collection, wait for it to complete, then download the results."""
    trigger_result = _make_api_request(trigger_url, params=params, json=data)
    print("===================")
    print(trigger_result)
    if not trigger_result:
        print(f"❌ Failed to trigger {operation_name}")
        return None

    snapshot_id = trigger_result.get("snapshot_id")
    if not snapshot_id:
        print(f"❌ No snapshot_id returned for {operation_name}")
        return None

    if not poll_snapshot_status(snapshot_id):
        return None

    return download_snapshot(snapshot_id)
def reddit_search_api(subreddit_url, date="Today", sort_by="Hot", num_of_posts=12):
    """Discover recent posts from a subreddit via Bright Data's Reddit dataset."""
    trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": "gd_lvz8ah06191smkebj4",
        "include_errors": "true",
        "type": "discover_new",
        "discover_by": "subreddit_url",
    }
    data = [
        {
            "url": subreddit_url,
            "sort_by": sort_by,
            "num_of_posts": num_of_posts,
            "sort_by_time": date,
        }
    ]

    raw_data = _trigger_and_download_snapshot(
        trigger_url, params, data, operation_name="reddit"
    )
    if not raw_data:
        return None

    # Keep only the fields downstream consumers need.
    parsed_data = []
    for post in raw_data:
        print(post)
        parsed_post = {
            "title": post.get("title"),
            "url": post.get("url"),
            "user_posted": post.get("user_posted"),
            "description": post.get("description"),
            "upvotes": post.get("num_upvotes"),
            "num_comments": post.get("num_comments"),
            "date_posted": post.get("date_posted"),
        }
        parsed_data.append(parsed_post)

    return {"parsed_posts": parsed_data, "total_found": len(parsed_data)}
def reddit_post_retrieval(urls, days_back=1, load_all_replies=False, comment_limit=""):
    """Retrieve comments for the given Reddit post URLs via Bright Data."""
    if not urls:
        return None

    trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": "gd_lvz8ah06191smkebj4",
        "include_errors": "true",
    }
    data = [
        {
            "url": url,
            "days_back": days_back,
            "load_all_replies": load_all_replies,
            "comment_limit": comment_limit,
        }
        for url in urls
    ]

    raw_data = _trigger_and_download_snapshot(
        trigger_url, params, data, operation_name="reddit comments"
    )
    if not raw_data:
        return None

    parsed_comments = []
    for comment in raw_data:
        parsed_comment = {
            "comment_id": comment.get("comment_id"),
            "content": comment.get("comment"),
            "date": comment.get("date_posted"),
        }
        parsed_comments.append(parsed_comment)

    return {"comments": parsed_comments, "total_retrieved": len(parsed_comments)}
def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Convenience wrapper: search the default subreddit and return parsed posts."""
    reddit_response = reddit_search_api(url)
    if not reddit_response or reddit_response.get("total_found", 0) == 0:
        print("No posts found or error occurred during Reddit search.")
        return None
    return reddit_response
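
# Minimal manual test (sketch): the __main__ guard keeps importing this module
# from ever triggering a Bright Data job; running the file directly exercises the
# full flow with the default subreddit.
if __name__ == "__main__":
    posts = scrape_and_download_reddit()
    if posts:
        print(f"Fetched {posts['total_found']} posts")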