# amplify/backend/brightdata_api.py
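"""Bright Data dataset-API helpers for scraping Reddit posts and comments.

Flow: trigger a dataset collection, poll the snapshot's progress, then
download the results. All functions return parsed dicts, or None on failure.
"""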
import os
import time
import requests
from typing import List, Dict, Any, Optional

from dotenv import load_dotenv

load_dotenv()

# TODO: Add an async request helper (a minimal sketch follows _make_api_request below).

bd_apikey = os.getenv("BRIGHTDATA_API_KEY")
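# Optional guard: warn early if the key is missing, since every request below
# would otherwise fail with a 401 from the API.
if not bd_apikey:
    print("⚠️ BRIGHTDATA_API_KEY is not set; Bright Data requests will fail.")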
def _make_api_request(url, **kwargs):
    """POST to a Bright Data endpoint and return the JSON body, or None on error."""
    headers = {
        "Authorization": f"Bearer {bd_apikey}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(url, headers=headers, timeout=30, **kwargs)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None
    except Exception as e:
        # Catch-all for non-HTTP failures, e.g. a body that is not valid JSON.
        print(f"Unknown error: {e}")
        return None
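# Sketch for the async TODO above: rather than reimplementing the HTTP call,
# run the existing synchronous helper in a worker thread. Assumes Python 3.9+
# (asyncio.to_thread); swap in an async HTTP client if true async I/O is needed.
import asyncio


async def _make_api_request_async(url, **kwargs):
    """Async wrapper around _make_api_request; does not block the event loop."""
    return await asyncio.to_thread(_make_api_request, url, **kwargs)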
def poll_snapshot_status(
    snapshot_id: str, max_attempts: int = 200, delay: int = 10
) -> bool:
    """Poll a snapshot's progress until it is ready, fails, or we time out."""
    progress_url = f"https://api.brightdata.com/datasets/v3/progress/{snapshot_id}"
    headers = {"Authorization": f"Bearer {bd_apikey}"}
    for attempt in range(max_attempts):
        try:
            print(
                f"⏳ Checking snapshot progress... (attempt {attempt + 1}/{max_attempts})"
            )
            response = requests.get(progress_url, headers=headers, timeout=30)
            response.raise_for_status()
            progress_data = response.json()
            status = progress_data.get("status")
            if status == "ready":
                print("✅ Snapshot completed!")
                return True
            elif status == "failed":
                print("❌ Snapshot failed")
                return False
            elif status == "running":
                print("🔄 Still processing...")
                time.sleep(delay)
            else:
                print(f"❓ Unknown status: {status}")
                time.sleep(delay)
        except Exception as e:
            print(f"⚠️ Error checking progress: {e}")
            time.sleep(delay)
    print("⏰ Timeout waiting for snapshot completion")
    return False
def download_snapshot(
    snapshot_id: str, fmt: str = "json"
) -> Optional[List[Dict[Any, Any]]]:
    """Download a completed snapshot and return its records, or None on error."""
    download_url = (
        f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format={fmt}"
    )
    headers = {"Authorization": f"Bearer {bd_apikey}"}
    print(f"Snapshot id: {snapshot_id}")
    try:
        print("📥 Downloading snapshot data...")
        response = requests.get(download_url, headers=headers, timeout=60)
        response.raise_for_status()
        data = response.json()
        print(
            f"🎉 Successfully downloaded {len(data) if isinstance(data, list) else 1} items"
        )
        return data
    except Exception as e:
        print(f"❌ Error downloading snapshot: {e}")
        return None
def _trigger_and_download_snapshot(trigger_url, params, data, operation_name="operation"):
    """Trigger a dataset collection, wait for it to finish, and download the results."""
    trigger_result = _make_api_request(trigger_url, params=params, json=data)
    if not trigger_result:
        print(f"❌ Failed to trigger {operation_name}")
        return None
    snapshot_id = trigger_result.get("snapshot_id")
    if not snapshot_id:
        print(f"❌ No snapshot_id returned for {operation_name}")
        return None
    if not poll_snapshot_status(snapshot_id):
        return None
    return download_snapshot(snapshot_id)
def reddit_search_api(subreddit_url, date="Today", sort_by="Hot", num_of_posts=12):
    """Discover recent posts from a subreddit via Bright Data's Reddit dataset."""
    trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": "gd_lvz8ah06191smkebj4",
        "include_errors": "true",
        "type": "discover_new",
        "discover_by": "subreddit_url",
    }
    data = [
        {
            "url": subreddit_url,
            "sort_by": sort_by,
            "num_of_posts": num_of_posts,
            "sort_by_time": date,
        }
    ]
    raw_data = _trigger_and_download_snapshot(
        trigger_url, params, data, operation_name="reddit"
    )
    if not raw_data:
        return None
    # Keep only the fields downstream consumers need.
    parsed_data = []
    for post in raw_data:
        parsed_post = {
            "title": post.get("title"),
            "url": post.get("url"),
            "user_posted": post.get("user_posted"),
            "description": post.get("description"),
            "upvotes": post.get("num_upvotes"),
            "num_comments": post.get("num_comments"),
            "date_posted": post.get("date_posted"),
        }
        parsed_data.append(parsed_post)
    return {"parsed_posts": parsed_data, "total_found": len(parsed_data)}
def reddit_post_retrieval(urls, days_back=1, load_all_replies=False, comment_limit=""):
    """Fetch comments for the given Reddit post URLs; returns None on failure."""
    if not urls:
        return None
    trigger_url = "https://api.brightdata.com/datasets/v3/trigger"
    params = {
        "dataset_id": "gd_lvz8ah06191smkebj4",
        "include_errors": "true",
    }
    data = [
        {
            "url": url,
            "days_back": days_back,
            "load_all_replies": load_all_replies,
            "comment_limit": comment_limit,
        }
        for url in urls
    ]
    raw_data = _trigger_and_download_snapshot(
        trigger_url, params, data, operation_name="reddit comments"
    )
    if not raw_data:
        return None
    parsed_comments = []
    for comment in raw_data:
        parsed_comment = {
            "comment_id": comment.get("comment_id"),
            "content": comment.get("comment"),
            "date": comment.get("date_posted"),
        }
        parsed_comments.append(parsed_comment)
    return {"comments": parsed_comments, "total_retrieved": len(parsed_comments)}
def scrape_and_download_reddit(url="https://www.reddit.com/r/ArtificialInteligence/"):
    """Convenience wrapper: search a subreddit and return the parsed posts."""
    reddit_response = reddit_search_api(url)
    if not reddit_response or reddit_response.get("total_found", 0) == 0:
        print("No posts found or error occurred during Reddit search.")
        return None
    return reddit_response
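# Minimal manual smoke test, assuming BRIGHTDATA_API_KEY is set in .env. Each
# run triggers a real Bright Data collection, so it consumes account credits.
if __name__ == "__main__":
    posts = scrape_and_download_reddit()
    if posts:
        print(f"Fetched {posts['total_found']} posts")
        # Optionally pull comments for the first post found.
        first_url = posts["parsed_posts"][0].get("url")
        if first_url:
            comments = reddit_post_retrieval([first_url])
            if comments:
                print(f"Fetched {comments['total_retrieved']} comments")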