Spaces:
Sleeping
Sleeping
File size: 5,970 Bytes
5113422 5046196 5113422 5046196 5113422 4a0119a 5113422 5046196 5113422 5046196 5113422 5046196 5113422 5046196 5113422 5046196 5113422 5046196 5113422 4a0119a 5113422 4a0119a 5113422 4a0119a 5046196 4a0119a 5113422 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
import requests
import json
import gradio as gr
from dotenv import load_dotenv
import os
load_dotenv()
def get_access_token():
client_secret = os.getenv("CLIENT_SECRET")
client_id = os.getenv("CLIENT_ID")
password = os.getenv("PASSWORD")
username = "Conscious_Abroad_946"
user_agent = "my_reddit_app/0.1 by Conscious_Abroad_946"
auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
data = {
'grant_type': 'password',
'username': username,
'password': password
}
headers = {'User-Agent': user_agent}
response = requests.post(
'https://www.reddit.com/api/v1/access_token',
auth=auth,
data=data,
headers=headers
)
access_token = ""
if response.status_code == 200:
token = response.json()['access_token']
access_token += token
print(f'Access token: {token}')
else:
print(f'Error: {response.status_code}, {response.text}')
print(f"access_token after oauth call is: {access_token}")
return access_token
#column a - links
#column b - posts
#column c - ai generated ready-made response
#even list of links is meaningful improvement
def to_list(x):
if x == None:
return None
if isinstance(x, str):
return [s.strip() for s in x.split(",") if s.strip()] or None
def reddit_hack(keyword, count=10, *, subreddits=None):
#sub_reddit_list = ["legaladviceuk", "housinguk", "ukpersonalfinance"]#
sub_reddit_list = to_list(subreddits)
results_overall = []
access_token = get_access_token()
if not sub_reddit_list:#if no subreddits, then search everything
url = "https://oauth.reddit.com/search"
params = {
"q": keyword,
"limit": count, # clamp to [1,100]
"sort": "relevance",
"t": "year",
"type": "link", # posts only (omit if you want comments/users too)
# "after": "t3_...", # pagination if you add it later
}
headers = {
"Authorization": f"bearer {access_token}",
"User-Agent": "inauguralai-reddit-search-bot/1.0",
}
response = requests.get(url, headers=headers, params=params, timeout=20)
if response.status_code == 200:
results = response.json()
if len(results) == 0:
print("there are no results!")
for idx, post in enumerate(results.get("data", {}).get("children", []), 1):
data = post["data"]
print(f"{idx}. {data['title']} (Score: {data['score']})")
print(f" Link: https://reddit.com{data['permalink']}\n")
results_overall.append(data)
else:
print(f"Error {response.status_code}: {response.text}")
else:
for subreddit in sub_reddit_list:
url = f"https://oauth.reddit.com/r/{subreddit}/search"
# Query parameters
params = {
"q": keyword, # Search query
"limit": count, # Max number of results (up to 100)
"sort": "relevance", # Sort by relevance
"t": "year", # Time filter
"restrict_sr": "true", # Restrict to the subreddit
"include_facets": "false",# Don't include facets
"sr_detail": "false", # Don't expand subreddit details
# Optional params:
# "after": "t3_xyz", # for pagination
# "before": "t3_abc",
# "type": "link,user" # restrict types of results
}
# Headers with OAuth2 token
headers = {
"Authorization": f"bearer {access_token}",
"User-Agent": "inauguralai-reddit-search-bot/1.0"
}
# Make the request
response = requests.get(url, headers=headers, params=params)
# Handle response
if response.status_code == 200:
results = response.json()
if len(results) == 0:
print("there are no results!")
for idx, post in enumerate(results.get("data", {}).get("children", []), 1):
data = post["data"]
print(f"{idx}. {data['title']} (Score: {data['score']})")
print(f" Link: https://reddit.com{data['permalink']}\n")
results_overall.append(data)
else:
print(f"Error {response.status_code}: {response.text}")
#google client id, 625949348680-343bebli3ol5c8aflufclntj67j2s3gg.apps.googleusercontent.com
#google client secret, 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
results_json = json.dumps(results_overall, ensure_ascii=False, indent=2)
print("JSON string:\n", results_json)
results_text = "\n".join(
f"{i+1}. [{r['subreddit']}] {r['title']} (Score: {r['score']})\n https://reddit.com{r['permalink']}"
for i, r in enumerate(results_overall)
)
print("\nPretty text:\n", results_text)
return results_text
def main(keyword, count, subreddits):
print(f"subreddits are {subreddits}")
result = reddit_hack(keyword, count, subreddits=subreddits)
return result
#reddit_hack("ccj", subreddits="legaladviceuk, housinguk, ukpersonalfinance")
#main("ccj","legaladviceuk, housinguk, ukpersonalfinance")
demo = gr.Interface(
fn=main,
inputs=[
gr.Textbox(label="Keyword", placeholder="e.g., ccj"),
gr.Number(label="Max posts", precision=0, minimum=1, value=10),
gr.Textbox(label="Subreddits (comma-separated). Or leave empty and search everywhere", placeholder="music, indieheads")
],
outputs=gr.Textbox(label="Result"),
title="redditHack",
description="enter keyword, and comma seerate string of subreddits"
)
demo.launch(share=True)
|