# Reddit-Finder — app.py
"""Reddit Finder: Discover new subreddits and grow your outreach list.
This app connects to Reddit through PRAW and lets you search for subreddits,
apply filters, and curate a master list that you can export or copy into other
tools. It is designed as a companion to the Reddit Poster workflow.
"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, List, Optional

import gradio as gr

try:  # pragma: no cover - handled gracefully in the UI
    import praw  # type: ignore
except Exception:  # pragma: no cover
    praw = None  # type: ignore

try:  # pragma: no cover
    from prawcore import Forbidden, OAuthException, ResponseException  # type: ignore
except Exception:  # pragma: no cover
    Forbidden = OAuthException = ResponseException = None  # type: ignore
@dataclass(slots=True)
class SearchFilters:
    """Client-side filter criteria applied to subreddit search results.

    A value of ``None`` (or ``False`` for the booleans) disables that filter.
    """

    min_subscribers: Optional[int] = None   # drop results below this subscriber count
    max_subscribers: Optional[int] = None   # drop results above this subscriber count
    min_active_users: Optional[int] = None  # require at least this many active users
    include_nsfw: bool = False              # keep over-18 communities when True
    require_images: bool = False            # require allow_images on the subreddit
    require_videos: bool = False            # require allow_videos on the subreddit
    language_whitelist: Optional[set[str]] = None  # lowercase language codes to allow
    include_keywords: Optional[List[str]] = None   # ALL must appear in keyword text
    exclude_keywords: Optional[List[str]] = None   # NONE may appear in keyword text
    skip_existing: bool = True              # hide subreddits already on the master list
@dataclass(slots=True)
class SubredditInfo:
name: str
title: str
subscribers: int
active_user_count: Optional[int]
over18: bool
allow_images: bool
allow_videos: bool
language: Optional[str]
created_utc: float
public_description: str
primary_topic: Optional[str]
url: str
@property
def created_iso(self) -> str:
return datetime.utcfromtimestamp(self.created_utc).strftime("%Y-%m-%d")
@property
def keyword_text(self) -> str:
return " ".join(
filter(
none_or_nonempty,
[
self.name,
self.title,
self.public_description,
self.primary_topic or "",
],
),
).lower()
def matches(self, filters: SearchFilters, existing_norm: set[str]) -> bool:
if filters.min_subscribers is not None and self.subscribers < filters.min_subscribers:
return False
if filters.max_subscribers is not None and self.subscribers > filters.max_subscribers:
return False
if filters.min_active_users is not None:
if self.active_user_count is None or self.active_user_count < filters.min_active_users:
return False
if not filters.include_nsfw and self.over18:
return False
if filters.require_images and not self.allow_images:
return False
if filters.require_videos and not self.allow_videos:
return False
if filters.language_whitelist and self.language:
if self.language.lower() not in filters.language_whitelist:
return False
if filters.include_keywords:
kw_text = self.keyword_text
if not all(keyword in kw_text for keyword in filters.include_keywords):
return False
if filters.exclude_keywords:
kw_text = self.keyword_text
if any(keyword in kw_text for keyword in filters.exclude_keywords):
return False
if filters.skip_existing and normalize_name(self.name) in existing_norm:
return False
return True
def to_row(self) -> List[str]:
return [
self.name,
self.title,
f"{self.subscribers:,}",
"?" if self.active_user_count is None else f"{self.active_user_count:,}",
"NSFW" if self.over18 else "Safe",
"Yes" if self.allow_images else "No",
"Yes" if self.allow_videos else "No",
self.language or "",
self.primary_topic or "",
self.created_iso,
self.url,
]
def none_or_nonempty(value: Optional[str]) -> bool:
    """Return ``True`` when *value* is a non-empty string (``None`` counts as empty)."""
    return value is not None and value != ""
def normalize_name(name: str) -> str:
    """Normalize a subreddit reference to a bare, casefolded name.

    Strips surrounding whitespace and a leading ``r/`` or ``/r/`` prefix so
    that entries like ``"/r/Python "`` and ``"python"`` compare equal.
    Casefolding happens *before* prefix removal, so uppercase prefixes such
    as ``"R/Python"`` are stripped too (the previous order missed them).
    """
    folded = name.strip().casefold()
    return folded.removeprefix("r/").removeprefix("/r/")
class _State:
    """Process-wide mutable session state for this single-user Gradio app.

    Attributes are declared as class-level defaults; the module-level
    ``_STATE`` singleton below is mutated in place by the UI callbacks.
    """

    reddit = None  # praw.Reddit | None — live client after a successful connect
    latest_results: List[SubredditInfo] = []  # results of the most recent search
    master_list: List[str] = []  # curated subreddit names, in display form
    connected_user: Optional[str] = None  # resolved username when authenticated
    authenticated: bool = False  # True only for script (username/password) logins


# Singleton shared by every callback in this module.
_STATE = _State()
def _ensure_praw_available() -> Optional[str]:
    """Return an error message when PRAW failed to import, else ``None``."""
    if praw is not None:
        return None
    return (
        "PRAW is not available. Please install dependencies via requirements.txt "
        "and restart the app."
    )
def _is_unauthorized(exc: Exception) -> bool:
    """Heuristically decide whether *exc* represents a 401/unauthorized error."""
    if ResponseException and isinstance(exc, ResponseException):
        response = getattr(exc, "response", None)
        return getattr(response, "status_code", None) == 401
    # OAuthException may be None when prawcore failed to import.
    return bool(OAuthException and isinstance(exc, OAuthException))
def _response_status(exc: Exception) -> Optional[int]:
    """Extract an HTTP status code from *exc*, or ``None`` when unknown."""
    if ResponseException and isinstance(exc, ResponseException):
        return getattr(getattr(exc, "response", None), "status_code", None)
    if Forbidden and isinstance(exc, Forbidden):
        return 403
    return None
def create_reddit(
    client_id: str,
    client_secret: str,
    user_agent: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> tuple[object, Optional[str], bool]:
    """Build a PRAW client and probe its authorization level.

    Returns ``(reddit, resolved_username_or_None, authenticated)`` where
    ``authenticated`` is True only for script-style (username + password)
    logins.

    Raises:
        ValueError: when an app credential is missing, or only one of
            username/password was supplied.
        Exception: re-raises PRAW/prawcore errors; a 401 falls back to
            read-only mode unless a script login was explicitly requested.
    """
    # Validate the always-required application credentials first.
    base_missing = [
        k
        for k, v in {
            "client_id": client_id,
            "client_secret": client_secret,
            "user_agent": user_agent,
        }.items()
        if not v
    ]
    if base_missing:
        raise ValueError(f"Missing required field(s): {', '.join(base_missing)}")
    username = (username or "").strip()
    password = (password or "").strip()
    # Username and password must be provided together or not at all.
    if username or password:
        if not (username and password):
            raise ValueError("Provide both username and password for authenticated login.")
    kwargs = dict(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
    if username and password:
        kwargs.update(username=username, password=password)
    reddit = praw.Reddit(**kwargs)
    try:
        # user.me() forces a token fetch, surfacing bad credentials early.
        me = reddit.user.me()
        authenticated = bool(username and password)
        if me is None:
            if authenticated:
                # Script login but no identity returned — assume it worked.
                return reddit, (username or None), True
            reddit.read_only = True
            return reddit, None, False
        user_name = getattr(me, "name", username or None)
        return reddit, user_name, authenticated
    except Exception as exc:
        if _is_unauthorized(exc):
            if username and password:
                # A 401 on an explicit script login is a real credential error.
                raise
            # App-only credentials rejected for identity calls: degrade to
            # read-only public access instead of failing the whole connect.
            reddit.read_only = True
            return reddit, None, False
        raise
def _build_info(sr) -> SubredditInfo:
    """Convert a PRAW ``Subreddit`` object into a plain ``SubredditInfo``.

    ``getattr`` defaults are used throughout because lazily-loaded PRAW
    objects can lack attributes depending on the endpoint that produced them.
    """
    return SubredditInfo(
        name=sr.display_name,
        title=sr.title or sr.display_name,
        subscribers=int(getattr(sr, "subscribers", 0) or 0),
        active_user_count=getattr(sr, "active_user_count", None),
        over18=bool(getattr(sr, "over18", False)),
        allow_images=bool(getattr(sr, "allow_images", False)),
        allow_videos=bool(getattr(sr, "allow_videos", False)),
        language=(getattr(sr, "lang", None) or getattr(sr, "language", None) or None),
        # Fall back to "now" when the API omits the creation timestamp.
        # datetime.utcnow() is deprecated (Py3.12); use the tz-aware form.
        created_utc=float(getattr(sr, "created_utc", datetime.now(timezone.utc).timestamp())),
        public_description=(getattr(sr, "public_description", "") or ""),
        primary_topic=getattr(sr, "primary_topic", None),
        url=f"https://reddit.com{sr.url}",
    )
def _parse_keywords(text: str) -> Optional[List[str]]:
words = [w.strip().lower() for w in text.split(",") if w.strip()]
return words or None
def _parse_language_codes(text: str) -> Optional[set[str]]:
codes = {part.strip().lower() for part in text.split(",") if part.strip()}
return codes or None
def _filters_from_inputs(
    min_subs: Optional[float],
    max_subs: Optional[float],
    min_active: Optional[float],
    include_nsfw: bool,
    require_images: bool,
    require_videos: bool,
    language_codes_text: str,
    include_keywords_text: str,
    exclude_keywords_text: str,
    skip_existing: bool,
) -> SearchFilters:
    """Translate raw Gradio widget values into a ``SearchFilters`` instance.

    Gradio Number widgets yield floats (or ``None``); a falsy value
    (``None`` or ``0``) is treated as "no limit" for the numeric filters.
    """

    def _as_int(value: Optional[float]) -> Optional[int]:
        return int(value) if value else None

    return SearchFilters(
        min_subscribers=_as_int(min_subs),
        max_subscribers=_as_int(max_subs),
        min_active_users=_as_int(min_active),
        include_nsfw=bool(include_nsfw),
        require_images=bool(require_images),
        require_videos=bool(require_videos),
        language_whitelist=_parse_language_codes(language_codes_text),
        include_keywords=_parse_keywords(include_keywords_text),
        exclude_keywords=_parse_keywords(exclude_keywords_text),
        skip_existing=bool(skip_existing),
    )
def search_subreddits(
    reddit,
    query: str,
    limit: int,
    sort: str,
    time_filter: str,
    filters: SearchFilters,
) -> tuple[List[SubredditInfo], Optional[str]]:
    """Search Reddit for subreddits and apply the client-side filters.

    Returns ``(results, note)`` where *note* is a user-facing message when a
    fallback path was taken. Sorting is approximated client-side: "top" maps
    to subscriber count, "activity"/"hot" to active users, "new" to creation
    date. NOTE(review): *time_filter* is accepted for interface symmetry but
    currently unused by this function.

    Raises:
        ValueError: when *query* is blank.
        RuntimeError: on 401 responses or when every search path fails.
    """
    if not query.strip():
        raise ValueError("Search query must not be empty.")
    # Perform a robust search without passing params that can trip certain PRAW builds.
    results: Optional[List[SubredditInfo]] = None
    note: Optional[str] = None
    # Clamp the requested limit to 1..100; None lets PRAW use its default.
    max_limit = max(0, min(int(limit) if limit else 0, 100)) or None

    def _execute_search(include_nsfw_flag: bool) -> List[SubredditInfo]:
        # Raw API params: restrict to subreddit results and toggle NSFW.
        params = {
            "include_over_18": "true" if include_nsfw_flag else "false",
            "type": "sr",
        }
        generator = reddit.subreddits.search(
            query=query,
            limit=max_limit,
            params=params,
        )
        return [_build_info(sr) for sr in generator]

    try:
        results = _execute_search(filters.include_nsfw)
    except Exception as exc:
        if _is_unauthorized(exc):
            raise RuntimeError("Reddit returned 401 Unauthorized. Check credentials or reconnect with script login.") from exc
        status = _response_status(exc)
        # A 403 with NSFW requested usually means the account/app is
        # restricted from adult content — retry once without NSFW.
        if status == 403 and filters.include_nsfw:
            try:
                results = _execute_search(False)
                note = (
                    "Primary search returned 403 (likely NSFW restricted). Retried without NSFW communities. "
                    "Authenticate with script credentials and enable NSFW if you need adult results. "
                    f"Original error: {exc}"
                )
            except Exception as inner:
                if _is_unauthorized(inner):
                    raise RuntimeError("Reddit returned 401 Unauthorized during retry search.") from inner
                # Remember the newest error and fall through to the
                # name-based fallback below.
                exc = inner
                status = _response_status(inner)
                results = None
    if results is None:
        # Fallback to name-based search if the main path fails.
        try:
            found = reddit.subreddits.search_by_name(query, include_nsfw=filters.include_nsfw, exact=False)
        except Exception as inner:
            if _is_unauthorized(inner):
                raise RuntimeError("Reddit returned 401 Unauthorized during fallback search.") from inner
            raise RuntimeError(f"Reddit search failed: {inner}") from inner
        note = (
            "Primary search failed; fell back to name-only search (Reddit limits this to ~10 results). "
            f"Error: {exc}"
        )
        limit_slice = max(0, min(int(limit) if limit else 0, 100))
        results = [_build_info(sr) for sr in (found[: limit_slice])]
    # Apply client-side filtering (NSFW, images/videos, language, keywords, existing list, etc.)
    existing_norm = {normalize_name(name) for name in _STATE.master_list}
    filtered = [info for info in results if info.matches(filters, existing_norm)]

    # Apply client-side sorting approximation for convenience
    def sort_key(info: SubredditInfo):
        if sort == "top":
            return (-(info.subscribers or 0),)
        if sort in ("activity", "hot"):
            return (-(info.active_user_count or 0),)
        if sort == "new":
            return (-(info.created_utc or 0.0),)
        # relevance -> keep original order; return zero key
        return (0,)

    if sort and sort != "relevance":
        filtered.sort(key=sort_key)
    # Final cap (in case filtering/sorting expanded aspects)
    limit_cap = max(0, min(int(limit) if limit else 0, 100))
    return filtered[: limit_cap or None], note
def _format_results_table(infos: Iterable[SubredditInfo]) -> List[List[str]]:
return [info.to_row() for info in infos]
def _master_text() -> str:
    """Serialize the master list, one subreddit name per line."""
    lines = list(_STATE.master_list)
    return "\n".join(lines)
def _master_urls_text() -> str:
    """Render every master-list entry as a full ``www.reddit.com`` URL."""
    rendered: List[str] = []
    for entry in _STATE.master_list:
        stripped = entry.strip()
        folded = stripped.lower()
        # Strip an optional r/ or /r/ prefix (case-insensitively) before
        # rebuilding the canonical URL.
        if folded.startswith("/r/"):
            stripped = stripped[3:]
        elif folded.startswith("r/"):
            stripped = stripped[2:]
        rendered.append(f"https://www.reddit.com/r/{stripped}")
    return "\n".join(rendered)
def ui_connect(client_id: str, client_secret: str, user_agent: str, username: Optional[str] = None, password: Optional[str] = None):
    """Gradio handler: build the PRAW client and report connection status.

    Returns updates for the (status, details) textboxes. The live client and
    auth flags are stored on the module-level ``_STATE`` for this session.
    """
    err = _ensure_praw_available()
    if err:
        return gr.update(value=f"⚠️ {err}"), gr.update(value="")
    try:
        reddit, resolved_user, authenticated = create_reddit(
            client_id,
            client_secret,
            user_agent,
            username=username,
            password=password,
        )
        _STATE.reddit = reddit
        _STATE.connected_user = resolved_user
        _STATE.authenticated = authenticated
        if authenticated:
            # Prefer the API-resolved name, then the typed username.
            who = resolved_user or (username.strip() if isinstance(username, str) else None) or "authenticated user"
            status = f"✅ Connected as {who}"
            mode_note = "Authenticated mode with script credentials."
        else:
            status = "✅ Connected in read-only mode"
            mode_note = (
                "Public search only. Provide username & password for script-level access (recommended for 401 issues)."
            )
        info = (
            "Credentials stored in memory for this session only. "
            "You can now search for subreddits. "
            f"{mode_note}"
        )
        return gr.update(value=status), gr.update(value=info)
    except Exception as exc:  # pragma: no cover - auth errors
        # Reset connection state so later searches fail fast with a clear hint.
        _STATE.reddit = None
        _STATE.connected_user = None
        _STATE.authenticated = False
        if _is_unauthorized(exc):
            message = (
                "❌ Authorization failed (401). Double-check client ID/secret and ensure the app is enabled for "
                "script access. If you're using read-only credentials, Reddit may be rejecting them due to "
                "missing scopes—consider adding username/password or refreshing the credentials."
            )
        else:
            message = f"❌ Connection failed: {exc}"
        return gr.update(value=message), gr.update(value="")
def ui_search(
    query: str,
    limit: int,
    sort: str,
    time_filter: str,
    min_subs: Optional[float],
    max_subs: Optional[float],
    min_active: Optional[float],
    include_nsfw: bool,
    require_images: bool,
    require_videos: bool,
    language_codes_text: str,
    include_keywords_text: str,
    exclude_keywords_text: str,
    skip_existing: bool,
):
    """Gradio handler: run a subreddit search and refresh the result widgets.

    Returns updates for (status markdown, results dataframe, selection
    checklist). The latest results are cached on ``_STATE.latest_results``.
    """
    if _STATE.reddit is None:
        return (
            gr.update(value="Please connect with Reddit credentials first."),
            gr.update(value=[]),
            gr.update(choices=[], value=[]),
        )
    filters = _filters_from_inputs(
        min_subs,
        max_subs,
        min_active,
        include_nsfw,
        require_images,
        require_videos,
        language_codes_text,
        include_keywords_text,
        exclude_keywords_text,
        skip_existing,
    )
    try:
        found, note = search_subreddits(
            _STATE.reddit,
            query=query,
            limit=limit,
            sort=sort,
            time_filter=time_filter,
            filters=filters,
        )
        _STATE.latest_results = found
        table = _format_results_table(found)
        choices = [info.name for info in found]
        status = (
            f"Found {len(found)} subreddit(s) matching filters. "
            "Use the checklist below to add to your master list."
        )
        if note:
            # Surface fallback/retry notes from search_subreddits in italics.
            status += f"\n\n*{note}*"
        return gr.update(value=status), gr.update(value=table), gr.update(choices=choices, value=[])
    except Exception as exc:
        return gr.update(value=f"❌ Search failed: {exc}"), gr.update(value=[]), gr.update(choices=[], value=[])
def ui_add_to_master(selected: List[str], existing_text: str):
    """Merge the checked search results into the master list (deduplicated).

    Returns updates for (master textbox, feedback, selection reset, URL view).
    """
    if not selected:
        return (
            gr.update(value=_master_text()),
            gr.update(value="Nothing selected."),
            gr.update(value=[]),
            gr.update(value=_master_urls_text()),
        )
    by_norm = {normalize_name(entry): entry for entry in _STATE.master_list}
    new_count = 0
    for candidate in selected:
        key = normalize_name(candidate)
        if key in by_norm:
            continue
        by_norm[key] = candidate
        new_count += 1
    _STATE.master_list = sorted(by_norm.values(), key=str.casefold)
    feedback = f"Added {new_count} subreddit(s) to master list." if new_count else "No new subreddits added."
    return (
        gr.update(value=_master_text()),
        gr.update(value=feedback),
        gr.update(value=[]),
        gr.update(value=_master_urls_text()),
    )
def ui_remove_from_master(remove_text: str, status: str):
    """Remove the listed subreddits (one per line) from the master list.

    The *status* parameter is wired in as an input but unused; it is kept
    for interface compatibility with the event binding.
    """
    requested = [normalize_name(raw) for raw in (remove_text or "").splitlines() if raw.strip()]
    if not requested:
        return (
            gr.update(value=_master_text()),
            gr.update(value="Provide subreddit names (one per line) to remove."),
            gr.update(value=""),
            gr.update(value=_master_urls_text()),
        )
    targets = set(requested)
    kept = [entry for entry in _STATE.master_list if normalize_name(entry) not in targets]
    removed_count = len(_STATE.master_list) - len(kept)
    _STATE.master_list = kept
    return (
        gr.update(value=_master_text()),
        gr.update(value=f"Removed {removed_count} subreddit(s)."),
        gr.update(value=""),
        gr.update(value=_master_urls_text()),
    )
def ui_load_master(text: str):
    """Replace the master list with the (deduplicated, sorted) textbox contents."""
    deduped: dict[str, str] = {}
    for raw in (text or "").splitlines():
        entry = raw.strip()
        if entry:
            # Last occurrence wins for entries that normalize identically,
            # matching the original dict-comprehension behavior.
            deduped[normalize_name(entry)] = entry
    _STATE.master_list = sorted(deduped.values(), key=str.casefold)
    return (
        gr.update(value=_master_text()),
        gr.update(value=f"Loaded {len(_STATE.master_list)} unique subreddit(s)."),
        gr.update(value=""),
        gr.update(value=_master_urls_text()),
    )
def ui_clear_master():
    """Drop every entry from the master list and reset the related widgets."""
    _STATE.master_list = []
    message = "Master list cleared."
    return (
        gr.update(value=""),
        gr.update(value=message),
        gr.update(value=""),
        gr.update(value=_master_urls_text()),
    )
def ui_download_master():
    """Write the master list to a temporary file and return its path.

    ``gr.DownloadButton`` handlers must return a file path (or ``None``).
    The previous ``gr.FileData(data=..., file_name=...)`` call does not match
    Gradio's FileData model (which exposes ``path``/``url`` fields, not
    ``data``/``file_name`` keyword arguments) and fails at runtime.
    """
    import tempfile

    text = _master_text()
    handle = tempfile.NamedTemporaryFile(
        "w",
        encoding="utf-8",
        suffix=".txt",
        prefix="reddit_master_list_",
        delete=False,  # Gradio must be able to read the file after we close it
    )
    with handle as fh:
        fh.write(text)
    return handle.name
# ---------------------------------------------------------------------------
# UI definition: four accordion sections (credentials, search, results,
# master list) inside a single Blocks app. Event wiring follows below.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Reddit Finder", fill_width=True, theme="Nymbo/Nymbo_Theme") as demo:
    gr.Markdown(
        """
        # Reddit Finder
        Explore Reddit and build a curated list of subreddits for future outreach.
        1. Connect with your Reddit app credentials (script or installed app).
        2. Search for subreddits, tweak filters, and review the results table.
        3. Add selected communities to your master list and export when ready.
        """
    )
    # Section 1: Reddit API credentials (held in memory only; see _STATE).
    with gr.Accordion("1) Credentials", open=True):
        with gr.Row():
            client_id = gr.Textbox(label="Client ID", placeholder="Your Reddit app client_id")
            client_secret = gr.Textbox(label="Client Secret", type="password")
            user_agent = gr.Textbox(label="User Agent", value="reddit-finder by u/yourname")
        with gr.Row():
            username_tb = gr.Textbox(label="Username (optional)", placeholder="reddit_username")
            password_tb = gr.Textbox(label="Password (optional)", type="password")
        gr.Markdown(
            "Provide username & password for script applications (recommended). Leave blank to use read-only app-only access."
        )
        connect_btn = gr.Button("Connect", variant="primary")
        conn_status = gr.Textbox(label="Status", interactive=False)
        conn_info = gr.Textbox(label="Details", interactive=False)
    # Section 2: search query plus every client-side filter control.
    with gr.Accordion("2) Search", open=True):
        query = gr.Textbox(label="Search query", placeholder="e.g. productivity apps, cozy coding")
        with gr.Row():
            limit = gr.Slider(label="Max results", minimum=5, maximum=100, step=5, value=25)
            sort = gr.Radio(label="Sort", choices=["relevance", "activity", "hot", "new", "top"], value="relevance")
            time_filter = gr.Radio(label="Time filter", choices=["hour", "day", "week", "month", "year", "all"], value="all")
        with gr.Row():
            min_subs = gr.Number(label="Min subscribers", value=None)
            max_subs = gr.Number(label="Max subscribers", value=None)
            min_active = gr.Number(label="Min active users", value=None)
        with gr.Row():
            include_nsfw = gr.Checkbox(label="Include NSFW", value=False)
            require_images = gr.Checkbox(label="Require images enabled", value=False)
            require_videos = gr.Checkbox(label="Require videos enabled", value=False)
        with gr.Row():
            language_codes = gr.Textbox(label="Language codes (comma separated)", placeholder="en, es, fr")
            include_keywords = gr.Textbox(label="Include keywords (comma separated)", placeholder="mechanical, diy")
            exclude_keywords = gr.Textbox(label="Exclude keywords (comma separated)", placeholder="nsfw, politics")
        skip_existing = gr.Checkbox(label="Skip subreddits already in master list", value=True)
        search_btn = gr.Button("Search", variant="primary")
        search_status = gr.Markdown()
    # Section 3: results table plus the checklist used to pick additions.
    with gr.Accordion("3) Results", open=True):
        results_table = gr.Dataframe(
            headers=[
                "subreddit",
                "title",
                "subscribers",
                "active",
                "nsfw",
                "images",
                "videos",
                "lang",
                "topic",
                "created",
                "url",
            ],
            interactive=False,
            wrap=True,
        )
        selection = gr.CheckboxGroup(label="Select subreddit(s) to add", choices=[])
        add_btn = gr.Button("Add to master list", variant="secondary")
        result_feedback = gr.Textbox(label="Result feedback", interactive=False)
    # Section 4: the curated master list, URL view, and import/export tools.
    with gr.Accordion("4) Master List", open=True):
        master_text = gr.Textbox(label="Master list", lines=12, interactive=True, placeholder="Subreddit names, one per line")
        master_urls = gr.Textbox(
            label="Master list as URLs",
            lines=12,
            interactive=False,
            show_copy_button=True,
            value=_master_urls_text(),
        )
        gr.Markdown("This URL view updates automatically when the master list changes.")
        with gr.Row():
            load_btn = gr.Button("Load from textbox")
            remove_box = gr.Textbox(label="Remove (one per line)")
            remove_btn = gr.Button("Remove from list")
        with gr.Row():
            clear_btn = gr.Button("Clear all", variant="stop")
            download_btn = gr.DownloadButton("Download master list", variant="primary")
        master_status = gr.Textbox(label="Master status", interactive=False)
    # ------------------------------------------------------------------
    # Event wiring: connect, search, and master-list maintenance actions.
    # ------------------------------------------------------------------
    connect_btn.click(
        fn=ui_connect,
        inputs=[client_id, client_secret, user_agent, username_tb, password_tb],
        outputs=[conn_status, conn_info],
        api_name="connect",
    )
    search_btn.click(
        fn=ui_search,
        inputs=[
            query,
            limit,
            sort,
            time_filter,
            min_subs,
            max_subs,
            min_active,
            include_nsfw,
            require_images,
            require_videos,
            language_codes,
            include_keywords,
            exclude_keywords,
            skip_existing,
        ],
        outputs=[search_status, results_table, selection],
        api_name="search",
        concurrency_limit=1,  # one search at a time; shared _STATE is not locked
    )
    add_btn.click(
        fn=ui_add_to_master,
        inputs=[selection, master_text],
        outputs=[master_text, result_feedback, selection, master_urls],
    )
    load_btn.click(
        fn=ui_load_master,
        inputs=[master_text],
        outputs=[master_text, master_status, remove_box, master_urls],
    )
    remove_btn.click(
        fn=ui_remove_from_master,
        inputs=[remove_box, master_status],
        outputs=[master_text, master_status, remove_box, master_urls],
    )
    clear_btn.click(
        fn=ui_clear_master,
        inputs=None,
        outputs=[master_text, master_status, remove_box, master_urls],
    )
    download_btn.click(
        fn=ui_download_master,
        inputs=None,
        outputs=download_btn,
    )

# Serialize all callbacks by default: the module-level _STATE is shared
# and none of the handlers take a lock.
demo.queue(default_concurrency_limit=1)

if __name__ == "__main__":
    demo.launch()