# Reddit-Poster / app.py
# NOTE: the following Hugging Face Space page chrome was captured together with
# the file; it is not Python, so it is preserved here as comments:
#   Nymbo's picture — Update app.py — 38c4d52 verified
"""
Reddit-Poster: Plan and post updates to many subreddits efficiently.
Features
- Authenticate with Reddit using PRAW (script app credentials).
- Paste a list of subreddits and fetch basic posting constraints.
- Provide a title template and body once; auto-generate per-subreddit posts.
- Throttle actions to avoid spam bans; queue enabled, single-concurrency events.
- Open prefilled compose pages in your browser (manual review per sub).
- Optional: submit via API (text/link) with per-post delay.
Notes
- Credentials are held only in-memory per session; not persisted.
- For image/galleries, we currently open the compose page; API submit for images is
not universally accepted across subs and requires extra checks.
"""
from __future__ import annotations
import os
import time
import html
import json
import textwrap
import threading
import webbrowser
import re
import tempfile
import shutil
from pathlib import Path
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlsplit, urlunsplit, parse_qsl
import gradio as gr
import requests
try:
import praw # PRAW: Python Reddit API Wrapper
except Exception as e: # pragma: no cover
praw = None # handled in UI
try: # Rate limit exception is in prawcore
from prawcore.exceptions import RateLimitExceeded # type: ignore
except Exception: # pragma: no cover
RateLimitExceeded = None # type: ignore
try: # Other useful exceptions for subreddit fetch handling
from prawcore.exceptions import NotFound, Forbidden, Redirect, BadRequest # type: ignore
except Exception: # pragma: no cover
NotFound = Forbidden = Redirect = BadRequest = None # type: ignore
# -------------------------
# Constants / Helpers
# -------------------------
REDDIT_TITLE_MAX = 300  # Current Reddit title character cap
REDDIT_BODY_MAX = 40000  # Current selftext (text-post body) character cap
PREFILL_BODY_MAX = 9500  # Conservative limit for URL prefill payload (avoid extremely long query URLs)
def _truncate(s: str, limit: int) -> str:
if s is None:
return ""
if len(s) <= limit:
return s
return s[: max(0, limit - 1)] + "…"
def _safe_join_lines(text: str) -> str:
# Normalize Windows/Mac newlines, ensure body is clean
return (text or "").replace("\r\n", "\n").replace("\r", "\n").strip()
_MD_IMAGE_RE = re.compile(r"!\[(?P<alt>[^\]]*)\]\((?P<url>https?://[^)\s]+)\)")
def _extract_markdown_images(md: str) -> List[Tuple[str, str]]:
"""Return list of (alt, url) for markdown image tags in the given text."""
out: List[Tuple[str, str]] = []
if not md:
return out
for m in _MD_IMAGE_RE.finditer(md):
alt = m.group("alt") or ""
url = m.group("url")
out.append((alt, url))
return out
def _strip_markdown_images(md: str) -> str:
"""Remove markdown image tags from text (keeps the rest)."""
if not md:
return md
return _MD_IMAGE_RE.sub("", md).strip()
def _normalize_sub_name(s: str) -> str:
s = s.strip()
s = s.removeprefix("r/").removeprefix("/r/")
return s
def _prefill_submit_url(
subreddit: str,
*,
title: str = "",
text: Optional[str] = None,
url: Optional[str] = None,
nsfw: bool = False,
spoiler: bool = False,
kind: Optional[str] = None,
) -> str:
# New Reddit submit (prefill works when logged in). Old style still honored.
# https://www.reddit.com/r/{sub}/submit?title=...&text=... or &url=...
from urllib.parse import urlencode
params = {"title": title}
if text is not None:
params["text"] = text
# Some variants also look for selftext
params["selftext"] = text
if url is not None:
params["url"] = url
# Infer kind if not explicitly provided
if kind is None:
if text is not None and url is None:
kind = "self"
elif url is not None and text is None:
kind = "link"
if kind:
params["kind"] = kind
if nsfw:
params["nsfw"] = "true"
if spoiler:
params["spoiler"] = "true"
return f"https://www.reddit.com/r/{subreddit}/submit?" + urlencode(params)
@dataclass
class SubInfo:
    """Snapshot of one subreddit's posting constraints, used for planning."""
    name: str  # canonical display name as reported by Reddit
    submission_type: str  # "any" | "link" | "self"
    allow_images: bool  # whether image submissions appear allowed
    allow_videos: bool  # whether video submissions appear allowed
    link_flair_enabled: bool  # whether link flair templates are enabled
    nsfw: bool  # subreddit marked over-18
    title_limit: int = REDDIT_TITLE_MAX  # per-sub title cap (site-wide default)
@dataclass
class PlannedPost:
    """One fully-resolved post the user intends to make to a single subreddit."""
    subreddit: str  # target subreddit display name
    post_type: str  # "text" | "link" — final type after auto-flipping
    title: str  # templated + truncated title
    body: str  # normalized body (kept even for link posts, for preview)
    link: Optional[str]  # URL for link posts; None for text posts
    compose_url: str  # prefilled reddit.com submit URL for manual posting
    needs_flair: bool  # True when the subreddit requires a link flair
    flair_id: Optional[str] = None  # chosen flair template id, if any
    flair_text: Optional[str] = None  # chosen flair text, if any
    notes: str = ""  # human-readable planning notes/warnings
    images: Optional[List[Tuple[str, str]]] = None  # list of (alt, url) found in body
    local_images: Optional[List[str]] = None  # local file paths uploaded via UI
    local_captions: Optional[List[str]] = None  # optional captions, aligned with local_images
def _ensure_praw_available() -> Optional[str]:
    """Return an error message when PRAW failed to import, else None."""
    if praw is not None:
        return None
    return "PRAW library not available. Install 'praw' in this environment."
def create_reddit(client_id: str, client_secret: str, user_agent: str, username: str, password: str):
    """Build an authenticated PRAW client via the script-app password flow.

    Raises ValueError when any credential field is blank; PRAW/prawcore
    errors from a failed login propagate to the caller.
    """
    fields = {
        "client_id": client_id,
        "client_secret": client_secret,
        "user_agent": user_agent,
        "username": username,
        "password": password,
    }
    missing = [name for name, value in fields.items() if not value]
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")
    client = praw.Reddit(**fields)
    # Force one round-trip so bad credentials fail here rather than later.
    _ = client.user.me()
    return client
def fetch_sub_info(reddit, sub: str) -> SubInfo:
    """Read a subreddit's basic posting constraints into a SubInfo snapshot.

    Reading lazy PRAW attributes triggers API requests, so only the fields
    the planner needs are touched.
    """
    sr = reddit.subreddit(sub)

    def _flag(attr: str) -> bool:
        # Missing attributes default to False (older/limited API payloads).
        return bool(getattr(sr, attr, False))

    return SubInfo(
        name=sr.display_name,
        submission_type=str(sr.submission_type or "any"),
        allow_images=_flag("allow_images"),
        allow_videos=_flag("allow_videos"),
        link_flair_enabled=_flag("link_flair_enabled"),
        nsfw=bool(sr.over18),
        title_limit=REDDIT_TITLE_MAX,
    )
def list_link_flairs(reddit, sub: str) -> List[Dict[str, str]]:
    """Return the subreddit's link-flair templates as simple dicts.

    Best-effort: any PRAW/network failure yields an empty list so callers
    never have to handle flair errors themselves.
    """
    try:
        templates = reddit.subreddit(sub).flair.link_templates  # type: ignore[attr-defined]
        return [
            {
                "id": tpl.get("id", ""),
                "text": tpl.get("text", ""),
                "mod_only": tpl.get("mod_only", False),
                "required": tpl.get("required", False),
            }
            for tpl in templates
        ]
    except Exception:
        return []
def generate_plan(
    title_template: str,
    body: str,
    link: str | None,
    subs: List[SubInfo],
    nsfw: bool,
    spoiler: bool,
    prefer_text: bool = False,
    *,
    uploaded_image_paths: Optional[List[str]] = None,
    uploaded_captions: Optional[List[str]] = None,
) -> List[PlannedPost]:
    """Build one PlannedPost per subreddit from a shared title template and body.

    For each subreddit the plan: formats/truncates the title, picks text vs
    link (auto-flipping to satisfy the sub's submission_type and recording a
    note), builds a prefilled compose URL (body truncated for URL length),
    looks up required/selected flairs, and annotates how images (markdown or
    uploaded) will be handled. Relies on the module-level _STATE singleton
    (defined later in this file) at call time.
    """
    body_norm = _safe_join_lines(body)
    images = _extract_markdown_images(body_norm)
    plan: List[PlannedPost] = []
    for si in subs:
        # {subreddit} and {r} are both offered as template placeholders.
        title = (title_template or "").format(subreddit=si.name, r=si.name)
        title = _truncate(title.strip(), si.title_limit)
        # Decide initial desired type (before subreddit restrictions)
        initial_post_type = "text" if (prefer_text or not link) else "link"
        final_post_type = initial_post_type
        use_link: Optional[str] = link if initial_post_type == "link" else None
        note = ""
        body_norm_post = body_norm
        # Adapt to subreddit submission constraints with auto-flip + notes
        allowed = (si.submission_type or "any").lower()
        if allowed == "self":
            # Text-only subreddit
            if final_post_type == "link":
                final_post_type = "text"
                use_link = None
                # Embed the link into the body so content still gets shared
                if link and (link not in body_norm_post):
                    body_norm_post = body_norm_post + ("\n\n" if body_norm_post else "") + str(link)
                note = (note + " " if note else "") + "Auto-flipped: subreddit is text-only; embedded link into body."
        elif allowed == "link":
            # Link-only subreddit
            if final_post_type == "text":
                if link:
                    final_post_type = "link"
                    use_link = link
                    note = (note + " " if note else "") + "Auto-flipped: subreddit is link-only; switched to link."
                else:
                    # No URL available; we must stay text but warn the user clearly
                    final_post_type = "text"
                    use_link = None
                    note = (note + " " if note else "") + "Warning: subreddit is link-only but no URL provided; staying text (composer may reject)."
        else:
            # 'any' or unknown -> keep initial choice
            if final_post_type == "text" and link and (link not in body_norm_post) and prefer_text:
                # When user prefers text but provided a link, include it for convenience
                body_norm_post = body_norm_post + ("\n\n" if body_norm_post else "") + str(link)
        post_type = final_post_type
        # Compose URL for manual posting.
        # For very large bodies, truncate in the prefill URL but keep full body inside plan object.
        prefill_body = body_norm_post if post_type == "text" else None
        truncated_for_prefill = False
        if prefill_body is not None and len(prefill_body) > PREFILL_BODY_MAX:
            prefill_body = prefill_body[:PREFILL_BODY_MAX - 1] + "…"
            truncated_for_prefill = True
        compose_url = _prefill_submit_url(
            si.name,
            title=title,
            text=(prefill_body if post_type == "text" else None),
            url=use_link if post_type == "link" else None,
            nsfw=nsfw,
            spoiler=spoiler,
            kind=("self" if post_type == "text" else "link"),
        )
        # Flair lookup needs an authenticated client; skip when not connected.
        flairs = list_link_flairs(_STATE.reddit, si.name) if _STATE.reddit else []
        needs_flair = any(f.get("required", False) for f in flairs)
        if needs_flair:
            note = (note + " " if note else "") + "Flair required; you'll need to pick one in the composer."
        # Body cap (for preview/storage; link posts keep body for preview/reference only)
        use_body = _truncate(body_norm_post, REDDIT_BODY_MAX)
        if truncated_for_prefill:
            note = (note + " " if note else "") + f"Prefill truncated to {PREFILL_BODY_MAX} chars for URL; full body preserved.".strip()
        # Any flair the user pre-selected for this sub (keyed lower-case).
        chosen = _STATE.selected_flairs.get(si.name.lower()) if hasattr(_STATE, 'selected_flairs') else None
        flair_id = chosen.get("id") if chosen else None
        flair_text = chosen.get("text") if chosen else None
        if flair_id and note:
            note = f"{note} Flair selected: {flair_text}".strip()
        elif flair_id and not note:
            note = f"Flair selected: {flair_text}"
        # If markdown images detected, annotate plan so the submitter can decide how to handle
        imgs_for_sub = images[:] if images else []
        if imgs_for_sub:
            if si.allow_images:
                note = (note + " " if note else "") + f"Detected {len(imgs_for_sub)} image(s) in body. API can convert to image/gallery post."
            else:
                note = (note + " " if note else "") + f"Detected {len(imgs_for_sub)} image(s) but subreddit may not allow images; inline markdown images won't render on Reddit."
        # Uploaded local images (apply to all subs equally)
        local_imgs = [p for p in (uploaded_image_paths or []) if p]
        local_caps = (uploaded_captions or [])
        if local_imgs:
            if si.allow_images:
                note = (note + " " if note else "") + f"Using {len(local_imgs)} uploaded image(s) for media post."
            else:
                note = (note + " " if note else "") + f"Uploaded images provided but subreddit may not allow images."
        plan.append(PlannedPost(
            subreddit=si.name,
            post_type=post_type,
            title=title,
            body=use_body,
            link=use_link,
            compose_url=compose_url,
            needs_flair=needs_flair,
            flair_id=flair_id,
            flair_text=flair_text,
            notes=note,
            images=imgs_for_sub,
            local_images=local_imgs if local_imgs else None,
            local_captions=local_caps if local_caps else None,
        ))
    return plan
def open_compose_tabs(plan: List[PlannedPost], delay_seconds: float = 2.0) -> str:
    """Open each planned post's compose URL in the default browser.

    Sleeps *delay_seconds* between tabs (but not after the last one) so the
    browser isn't hammered with simultaneous opens.
    """
    total = len(plan)
    for position, post in enumerate(plan, start=1):
        webbrowser.open_new_tab(post.compose_url)
        if delay_seconds > 0 and position < total:
            time.sleep(delay_seconds)
    return f"Opened {total} compose tabs. Review and submit manually to respect sub rules."
"""Submission utilities (advanced throttled version defined below)."""
# Throttled submit implementation (replaces basic version above)
def submit_via_api(
    reddit,
    plan: List[PlannedPost],
    *,
    per_post_delay: float = 15.0,  # used if no throttle config set
    apply_nsfw_flag: bool = False,
    spoiler: bool = False,
    duplicate_window_days: int = 14,
    skip_duplicates: bool = True,
    convert_markdown_images: bool = False,
    comment_text_under_media: bool = True,
):
    """Submit posts via Reddit API (text/link) applying advanced throttle rules if configured.

    Yields log lines as they are generated for real-time display.

    Behavior:
    - Stops early once the rolling global post cap (_posts_in_window) is hit.
    - Checks the user's recent submissions for duplicates and skips or
      proceeds depending on *skip_duplicates*.
    - For text posts, switches to an image/gallery submission when uploaded
      images exist (or markdown images with *convert_markdown_images*) and
      the cached SubInfo allows images; the image-stripped body is then
      best-effort replied under the media post.
    - Every outcome is appended to _STATE.post_history, which feeds the
      throttle and global-cap calculations.
    """
    if reddit is None:
        yield "Not authenticated."
        return
    # Fall back to a minimal config when the user never set one in the UI.
    cfg: ThrottleConfig = _STATE.throttle or ThrottleConfig(min_delay=per_post_delay)
    ordered = _enforce_order(plan, cfg)
    for idx, p in enumerate(ordered, start=1):
        if _posts_in_window(cfg) >= cfg.global_max_posts:
            yield f"🚫 Global cap {cfg.global_max_posts} reached in last {cfg.global_window_minutes}m — stopping."
            break
        # Duplicate guard
        is_dup, reason = _is_duplicate_recent(reddit, p.subreddit, p.title, (p.link if p.post_type == "link" else None), duplicate_window_days)
        if is_dup:
            msg = f"[{idx}/{len(plan)}] r/{p.subreddit}: Duplicate detected ({reason})."
            if skip_duplicates:
                yield msg + " Skipping."
                # Record the skip so the throttle still sees activity.
                _STATE.post_history.append({
                    "timestamp": _now_ts().isoformat(),
                    "subreddit": p.subreddit,
                    "method": "duplicate_skip",
                    "reason": reason or "duplicate",
                })
                continue
            else:
                yield msg + " Proceeding due to settings."
        # Collect throttle logs in a temp list and yield them
        throttle_logs: List[str] = []
        _sleep_before_post(p.subreddit, cfg, throttle_logs, idx)
        for log in throttle_logs:
            yield log
        try:
            sr = reddit.subreddit(p.subreddit)
            submit_kwargs = {"title": p.title}
            # flair (prefer id)
            if p.flair_id:
                submit_kwargs["flair_id"] = p.flair_id
            elif p.flair_text:
                submit_kwargs["flair_text"] = p.flair_text
            if p.post_type == "text":
                # Prefer uploaded local images; otherwise optionally convert markdown images.
                si = _STATE.sub_info_cache.get(p.subreddit.lower(), SubInfo(p.subreddit, "any", False, False, False, False))
                if (p.local_images and si.allow_images) or (convert_markdown_images and p.images and si.allow_images):
                    sub_logs: List[str] = []
                    submission = _submit_images_with_ratelimit(sr, p, sub_logs)
                    for log in sub_logs:
                        yield log
                    # Best-effort: post the text (minus image tags) as a comment.
                    if submission and comment_text_under_media:
                        try:
                            body_no_imgs = _strip_markdown_images(p.body)
                            if body_no_imgs:
                                submission.reply(body_no_imgs)
                        except Exception:
                            pass
                else:
                    submit_kwargs["selftext"] = p.body
                    sub_logs = []
                    submission = _submit_with_ratelimit(sr, submit_kwargs, sub_logs)
                    for log in sub_logs:
                        yield log
            elif p.post_type == "link" and p.link:
                submit_kwargs["url"] = p.link
                sub_logs = []
                submission = _submit_with_ratelimit(sr, submit_kwargs, sub_logs)
                for log in sub_logs:
                    yield log
            else:
                yield f"[{idx}/{len(plan)}] r/{p.subreddit}: Skipped unsupported type '{p.post_type}'."
                continue
            # Post-submit flags are best-effort; failures are ignored.
            if apply_nsfw_flag:
                try: submission.mod.nsfw()
                except Exception: pass
            if spoiler:
                try: submission.mod.spoiler()
                except Exception: pass
            yield f"[{idx}/{len(plan)}] r/{p.subreddit}: Submitted (id={submission.id})."
            _STATE.post_history.append({
                "timestamp": _now_ts().isoformat(),
                "subreddit": p.subreddit,
                "method": "api",
                "id": submission.id,
            })
        except Exception as e:
            # Keep going: one failed subreddit shouldn't abort the batch.
            yield f"[{idx}/{len(plan)}] r/{p.subreddit}: ERROR {e}"
            _STATE.post_history.append({
                "timestamp": _now_ts().isoformat(),
                "subreddit": p.subreddit,
                "method": "api_error",
                "error": str(e),
            })
def _download_image_to_temp(url: str, tmpdir: Path, idx: int) -> Optional[Path]:
    """Download *url* into *tmpdir* as img_<idx><ext>; return the path or None.

    Best-effort: any network or filesystem failure returns None so the
    caller can skip this image instead of aborting the submission.
    """
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        ext = None
        # Prefer the Content-Type header for picking an extension.
        ctype = resp.headers.get("content-type", "").lower()
        if "image/" in ctype:
            ext = "." + ctype.split("/", 1)[1].split(";")[0].strip()
            if ext == ".jpeg":
                ext = ".jpg"
        if not ext:
            # Fall back to a known suffix on the URL itself.
            for cand in (".png", ".jpg", ".jpeg", ".gif", ".webp"):
                if url.lower().endswith(cand):
                    ext = ".jpg" if cand == ".jpeg" else cand
                    break
        if not ext:
            ext = ".jpg"  # safe default when nothing else matched
        target = tmpdir / f"img_{idx}{ext}"
        target.write_bytes(resp.content)
        return target
    except Exception:
        return None
def _submit_images_with_ratelimit(sr, p: PlannedPost, logs: List[str]):
    """Submit a single image or gallery. If local images are provided, use them; otherwise download from body URLs.

    Falls back to a plain text submission when no usable image files are
    found. Rate-limit errors are retried up to 3 attempts, sleeping the
    time suggested by the exception (default 60s). The temp download
    directory is always removed, even on failure.
    """
    tmpdir = Path(tempfile.mkdtemp(prefix="reddit_poster_"))
    files: List[Tuple[str, Path]] = []  # (caption, path)
    try:
        if p.local_images:
            # Use uploaded local images first
            caps = p.local_captions or []
            for i, path_str in enumerate(p.local_images):
                try:
                    path = Path(path_str)
                    # Captions are aligned by index; missing ones become "".
                    cap = caps[i] if i < len(caps) else ""
                    if path.exists():
                        files.append((cap, path))
                except Exception:
                    continue
        else:
            # Fall back to downloading images from markdown URLs
            for i, (alt, url) in enumerate(p.images or []):
                path = _download_image_to_temp(url, tmpdir, i)
                if path:
                    files.append((alt, path))
        if not files:
            # Fallback to text submit if no files were fetched
            submit_kwargs = {"title": p.title, "selftext": p.body}
            # preserve flair
            if p.flair_id:
                submit_kwargs["flair_id"] = p.flair_id
            elif p.flair_text:
                submit_kwargs["flair_text"] = p.flair_text
            return _submit_with_ratelimit(sr, submit_kwargs, logs)
        # One image -> submit_image; many -> submit_gallery
        if len(files) == 1:
            caption, path = files[0]
            attempts = 0
            while True:
                try:
                    attempts += 1
                    return sr.submit_image(title=p.title, image_path=str(path), flair_id=p.flair_id if p.flair_id else None, flair_text=p.flair_text if (p.flair_text and not p.flair_id) else None)
                except Exception as e:
                    # Only rate-limit errors are retried; anything else re-raises.
                    if RateLimitExceeded and isinstance(e, RateLimitExceeded):
                        sleep_s = getattr(e, "sleep_time", None) or 60
                        logs.append(f"⛔ Rate limited (image): sleeping {int(sleep_s)}s.")
                        time.sleep(float(sleep_s))
                        if attempts < 3:
                            continue
                    raise
        else:
            # Build images payload: list of dicts with image_path and optional caption
            images_payload = []
            for cap, path in files:
                item = {"image_path": str(path)}
                if cap:
                    # Reddit gallery captions are length-limited; clip defensively.
                    item["caption"] = cap[:180]
                images_payload.append(item)
            attempts = 0
            while True:
                try:
                    attempts += 1
                    return sr.submit_gallery(title=p.title, images=images_payload, flair_id=p.flair_id if p.flair_id else None, flair_text=p.flair_text if (p.flair_text and not p.flair_id) else None)
                except Exception as e:
                    if RateLimitExceeded and isinstance(e, RateLimitExceeded):
                        sleep_s = getattr(e, "sleep_time", None) or 60
                        logs.append(f"⛔ Rate limited (gallery): sleeping {int(sleep_s)}s.")
                        time.sleep(float(sleep_s))
                        if attempts < 3:
                            continue
                    raise
    finally:
        # Always clean up downloaded temp files.
        try:
            shutil.rmtree(tmpdir, ignore_errors=True)
        except Exception:
            pass
def ui_open_tabs(open_delay: float):
    """UI callback: open compose tabs for the cached plan, if one exists."""
    if not _STATE.plan_cache:
        return "No plan generated yet."
    return open_compose_tabs(_STATE.plan_cache, delay_seconds=float(open_delay))
def ui_submit_api(api_delay: float, nsfw_flag: bool, spoiler_flag: bool, dup_days: int = 14, skip_dups: bool = True, convert_md_images: bool = True, comment_body_under_media: bool = True):
    """UI callback: submit the cached plan, streaming the cumulative log text."""
    if _STATE.reddit is None:
        yield "Not authenticated."
        return
    if not _STATE.plan_cache:
        yield "No plan generated. Generate plan before submitting."
        return
    # Re-yield the whole transcript each time so the textbox shows history.
    transcript: List[str] = []
    for line in submit_via_api(
        _STATE.reddit,
        _STATE.plan_cache,
        per_post_delay=float(api_delay),
        apply_nsfw_flag=bool(nsfw_flag),
        spoiler=bool(spoiler_flag),
        duplicate_window_days=int(dup_days),
        skip_duplicates=bool(skip_dups),
        convert_markdown_images=bool(convert_md_images),
        comment_text_under_media=bool(comment_body_under_media),
    ):
        transcript.append(line)
        yield "\n".join(transcript)
# -------------------------
# Gradio App
# -------------------------
class _State:
    """Process-wide mutable session state (see the _STATE singleton below).

    All attributes live at class level and are shared; the app serves a
    single session, so this acts as a simple global registry. Nothing here
    is persisted to disk.
    """
    reddit = None  # praw.Reddit | None — set after successful auth
    sub_info_cache: Dict[str, SubInfo] = {}  # lower-cased sub name -> SubInfo
    flairs_cache: Dict[str, List[Dict[str, str]]] = {}  # lower-cased sub name -> flair templates
    selected_flairs: Dict[str, Dict[str, str]] = {}  # lower-cased sub name -> chosen flair
    plan_cache: List[PlannedPost] = []  # most recently generated plan
    throttle = None  # ThrottleConfig | None — set via ui_set_throttle
    post_history: List[Dict[str, str]] = []  # simple in-memory log entries
    verbose: bool = True  # toggle for noisy console logging
    me_name: Optional[str] = None  # authenticated username for duplicate checks
    # Track subreddits omitted from the rules table (e.g., banned, 404, etc.)
    omitted_subs: List[Dict[str, str]] = []
_STATE = _State()
# -------------------------
# Advanced Throttling Utilities
# -------------------------
from datetime import datetime, timedelta
import random
@dataclass
class ThrottleConfig:
    """Throttle settings applied when submitting through the API.

    min_delay: base minimum seconds between ANY two posts (global).
    jitter: random +/- seconds mixed into every enforced delay so the
        posting cadence doesn't look machine-regular.
    per_sub_cooldown: minutes to wait before posting to the same sub again.
    global_window_minutes: size of the rolling window for the global cap.
    global_max_posts: hard ceiling on posts inside the rolling window.
    randomize_order: shuffle the plan before posting.
    """
    min_delay: float = 20.0
    jitter: float = 5.0
    per_sub_cooldown: float = 60.0
    global_window_minutes: float = 30.0
    global_max_posts: int = 8
    randomize_order: bool = False

    def to_dict(self) -> Dict[str, object]:
        """Serialize the config to a plain dict (e.g. for JSON display)."""
        return asdict(self)
def _now_ts() -> datetime:
return datetime.utcnow()
def _posts_in_window(cfg: ThrottleConfig) -> int:
    """Count history entries that fall inside the rolling global window."""
    cutoff = _now_ts() - timedelta(minutes=cfg.global_window_minutes)
    recent = [e for e in _STATE.post_history if datetime.fromisoformat(e["timestamp"]) >= cutoff]
    return len(recent)
def _last_post_time() -> Optional[datetime]:
    """Timestamp of the most recent history entry, or None when empty."""
    history = _STATE.post_history
    return datetime.fromisoformat(history[-1]["timestamp"]) if history else None
def _last_post_time_for_sub(sub: str) -> Optional[datetime]:
    """Most recent history timestamp for *sub*, scanning newest-first."""
    return next(
        (
            datetime.fromisoformat(entry["timestamp"])
            for entry in reversed(_STATE.post_history)
            if entry.get("subreddit") == sub
        ),
        None,
    )
def _enforce_order(plan: List[PlannedPost], cfg: ThrottleConfig) -> List[PlannedPost]:
if not cfg.randomize_order:
return plan
shuffled = plan[:]
random.shuffle(shuffled)
return shuffled
def _sleep_before_post(sub: str, cfg: ThrottleConfig, logs: List[str], idx: int):
    """Sleep long enough to satisfy the global and per-sub throttle rules.

    Appends a human-readable line to *logs* when a sleep actually happens.
    The very first post of a run (idx == 1) never sleeps.
    """
    now = _now_ts()
    wait = 0.0
    # Global rule: at least min_delay seconds since the last post anywhere.
    prev_any = _last_post_time()
    if prev_any is not None:
        wait = max(wait, cfg.min_delay - (now - prev_any).total_seconds())
    # Per-sub rule: cooldown (minutes) since the last post to this sub.
    prev_sub = _last_post_time_for_sub(sub)
    if prev_sub is not None:
        wait = max(wait, cfg.per_sub_cooldown * 60.0 - (now - prev_sub).total_seconds())
    # Jitter randomizes the cadence; it may shorten as well as lengthen.
    if cfg.jitter > 0:
        wait += random.uniform(-cfg.jitter, cfg.jitter)
    wait = max(wait, 0.0)
    if wait > 0 and idx > 1:  # skip sleeping before first post
        logs.append(f"⏳ Throttle: sleeping {wait:.1f}s before posting to r/{sub} (global + per-sub rules).")
        time.sleep(wait)
# -------------------------
# Duplicate Guard Utilities
# -------------------------
def _normalize_title(s: str) -> str:
s = (s or "").strip().casefold()
# collapse whitespace
s = re.sub(r"\s+", " ", s)
return s
def _normalize_url(u: Optional[str]) -> Optional[str]:
if not u:
return None
try:
parts = urlsplit(u)
scheme = parts.scheme.lower() or "https"
netloc = parts.netloc.lower()
path = parts.path.rstrip("/")
# remove tracking params commonly used
q = []
for k, v in parse_qsl(parts.query, keep_blank_values=True):
lk = k.lower()
if lk.startswith("utm_") or lk in {"ref", "ref_source", "source"}:
continue
q.append((k, v))
query = "&".join([f"{k}={v}" for k, v in q])
return urlunsplit((scheme, netloc, path, query, ""))
except Exception:
return u
def _is_duplicate_recent(reddit, subreddit: str, title: str, link: Optional[str], window_days: int) -> Tuple[bool, Optional[str]]:
    """Return (is_dup, reason) against the authenticated user's recent submissions.

    Scans up to the 200 newest submissions of the logged-in user, comparing
    normalized title (always) and normalized URL (link posts only) within
    the last *window_days*. Any failure — no auth, API error, malformed
    item — is treated as "not a duplicate" so submission is never blocked
    by the check itself.
    """
    try:
        # Without an authenticated username we cannot look anything up.
        if reddit is None or not getattr(_STATE, "me_name", None):
            return False, None
        me = reddit.redditor(_STATE.me_name)
        cutoff_ts = time.time() - (window_days * 86400)
        target_title = _normalize_title(title)
        target_url = _normalize_url(link)
        for subm in me.submissions.new(limit=200):
            try:
                # Only compare submissions made to the same subreddit.
                if getattr(subm.subreddit, "display_name", "").lower() != subreddit.lower():
                    continue
                if getattr(subm, "created_utc", 0) < cutoff_ts:
                    break  # listing is newest first
                # same title?
                if target_title and _normalize_title(getattr(subm, "title", "")) == target_title:
                    return True, "same title in recent submissions"
                # same URL? (only meaningful for link posts)
                if target_url:
                    subm_url = _normalize_url(getattr(subm, "url", None))
                    if subm_url and subm_url == target_url:
                        return True, "same URL in recent submissions"
            except Exception:
                continue
        return False, None
    except Exception:
        return False, None
# -------------------------
# Rate-limit aware submit
# -------------------------
def _submit_with_ratelimit(sr, submit_kwargs: Dict[str, object], logs: List[str]):
    """Call sr.submit with automatic handling for RateLimitExceeded.

    Sleeps exactly the time suggested by Reddit and retries (up to 3
    attempts total); the final rate-limited attempt still sleeps, then
    re-raises. Non-rate-limit errors propagate immediately. Throttle
    sleeps are intentionally not applied here, to avoid double-sleeping.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return sr.submit(**submit_kwargs)
        except Exception as exc:
            # Anything other than an explicit rate limit is not retried.
            if not (RateLimitExceeded and isinstance(exc, RateLimitExceeded)):  # type: ignore[arg-type]
                raise
            # Prefer the structured sleep_time; else parse the message,
            # which looks like "... in 5 minutes" or "... in 43 seconds".
            wait = getattr(exc, "sleep_time", None)
            if not wait:
                m = re.search(r"in\s+(\d+)\s+(second|seconds|minute|minutes)", str(exc), re.I)
                if m:
                    amount = int(m.group(1))
                    wait = amount * (60 if m.group(2).lower().startswith("minute") else 1)
            if not wait:
                wait = 60  # fallback 1 minute
            logs.append(f"⛔ Rate limited: sleeping {int(wait)}s exactly as requested by Reddit.")
            time.sleep(float(wait))
            if attempt >= 3:
                raise
def ui_set_throttle(min_delay, jitter, per_sub_cooldown, global_window, global_cap, randomize):
    """UI callback: store a new ThrottleConfig and echo it as pretty JSON."""
    config = ThrottleConfig(
        min_delay=float(min_delay),
        jitter=float(jitter),
        per_sub_cooldown=float(per_sub_cooldown),
        global_window_minutes=float(global_window),
        global_max_posts=int(global_cap),
        randomize_order=bool(randomize),
    )
    _STATE.throttle = config
    return json.dumps(config.to_dict(), indent=2)
def ui_connect(client_id, client_secret, user_agent, username, password):
    """UI callback: authenticate against Reddit and report the outcome.

    On success stores the client and username in _STATE and clears the
    omitted-subreddits list; on failure resets _STATE.reddit to None.
    """
    err = _ensure_praw_available()
    if err:
        return gr.update(value=f"❌ {err}"), None
    try:
        client = create_reddit(client_id, client_secret, user_agent, username, password)
        _STATE.reddit = client
        me = client.user.me()
        _STATE.me_name = str(me)
        # Reset omitted list on new auth
        _STATE.omitted_subs = []
    except Exception as e:
        _STATE.reddit = None
        return f"❌ Authentication failed: {e}", None
    return f"✅ Authenticated as u/{me}", json.dumps({"user": str(me)}, indent=2)
def ui_fetch_rules(subs_text: str, per_request_delay: float = 1.0):
    """Fetch basic posting constraints AND cache available link flairs for each subreddit.

    Returns a 4-tuple of Gradio updates: summary text, rules table, omitted
    summary, and omitted table. Subreddits where the user is banned, or
    that raise (404/403/redirect/etc.), are collected into the omitted
    list instead of aborting the whole fetch. Results are cached on
    _STATE (sub_info_cache / flairs_cache) keyed by lower-cased name.
    """
    if _STATE.reddit is None:
        return "Not authenticated.", gr.update(), gr.update(value=""), gr.update(headers=["subreddit", "reason"], value=[])
    subs = [_normalize_sub_name(s) for s in subs_text.splitlines() if s.strip()]
    if _STATE.verbose:
        print(f"[fetch_rules] Request for {len(subs)} subs")
    infos: List[SubInfo] = []
    rows: List[List[str]] = []
    omitted: List[Dict[str, str]] = []
    for i, name in enumerate(subs, start=1):
        cache_key = name.lower()
        try:
            # First, check if user is banned from this subreddit; this requires fetching the about data.
            sr = _STATE.reddit.subreddit(name)
            is_banned = bool(getattr(sr, "user_is_banned", False))
            if is_banned:
                omitted.append({"subreddit": name, "reason": "banned"})
                if _STATE.verbose:
                    print(f" [{i}/{len(subs)}] r/{name}: omitted (banned)")
                continue
            # Fetch SubInfo (will also trigger network and raise if 404, etc.)
            if cache_key in _STATE.sub_info_cache:
                si = _STATE.sub_info_cache[cache_key]
            else:
                si = fetch_sub_info(_STATE.reddit, name)
                _STATE.sub_info_cache[cache_key] = si
            # Fetch flairs if flair enabled
            flairs: List[Dict[str, str]] = []
            if si.link_flair_enabled:
                flairs = list_link_flairs(_STATE.reddit, si.name)
            _STATE.flairs_cache[cache_key] = flairs
            infos.append(si)
            rows.append([
                si.name,
                si.submission_type,
                "yes" if si.allow_images else "no",
                "yes" if si.allow_videos else "no",
                "yes" if si.link_flair_enabled else "no",
                "yes" if si.nsfw else "no",
            ])
            if _STATE.verbose:
                print(f" [{i}/{len(subs)}] r/{si.name}: flair_templates={len(_STATE.flairs_cache.get(cache_key, []))}")
            # Gentle pacing between per-sub API calls.
            time.sleep(float(per_request_delay))
        except Exception as e:
            # Classify common error cases
            reason = None
            etype = type(e).__name__
            # prawcore exception classes may be None if the import failed.
            if NotFound and isinstance(e, NotFound):
                reason = "not found (404)"
            elif Forbidden and isinstance(e, Forbidden):
                reason = "forbidden (403)"
            elif Redirect and isinstance(e, Redirect):
                reason = "redirect/invalid subreddit"
            else:
                reason = f"error: {etype}"
            omitted.append({"subreddit": name, "reason": reason})
            if _STATE.verbose:
                print(f" OMIT {name}: {reason}")
    # Save omitted list to state
    _STATE.omitted_subs = omitted
    summary = f"Fetched {len(infos)} subreddits."
    omit_summary = (
        f"Omitted {len(omitted)} subreddit(s)." if omitted else "No subreddits were omitted."
    )
    return (
        summary,
        gr.update(headers=["subreddit", "allowed", "images", "videos", "flair", "nsfw"], value=rows),
        gr.update(value=omit_summary),
        gr.update(headers=["subreddit", "reason"], value=[[o["subreddit"], o["reason"]] for o in omitted]),
    )
def ui_generate_plan(title_tmpl: str, body: str, link: str | None, nsfw: bool, spoiler: bool, prefer_text: bool, subs_table_rows, uploaded_images_paths=None, uploaded_captions_text: str = ""):
    """UI callback: build PlannedPosts for the subreddits shown in the rules table.

    Accepts the table value as a pandas DataFrame, a list of rows, or None,
    resolves each row's subreddit against _STATE.sub_info_cache, runs
    generate_plan, stores the result on _STATE.plan_cache, and returns a
    summary string plus a Dataframe update for the review table.
    """
    if _STATE.reddit is None:
        return "Not authenticated.", gr.update()
    # Build SubInfo list from cache using rows
    # Convert incoming value (could be pandas DataFrame, list, None) into list of lists
    rows_input = subs_table_rows
    if rows_input is None:
        resolved_rows: List[List[str]] = []
    elif hasattr(rows_input, "values") and hasattr(rows_input, "columns"):
        # Likely a pandas DataFrame
        try:
            resolved_rows = rows_input.values.tolist()
        except Exception:
            resolved_rows = []
    elif isinstance(rows_input, list):
        resolved_rows = rows_input
    else:
        resolved_rows = []
    subs: List[SubInfo] = []
    if _STATE.verbose:
        print(f"[generate_plan] Received raw type={type(rows_input)} row_count={len(resolved_rows)}")
        if len(resolved_rows) > 0:
            print(f"[generate_plan] Sample first row={resolved_rows[0]}")
        print(f"[generate_plan] Current cache keys={list(_STATE.sub_info_cache.keys())}")
    for r in resolved_rows:
        if not r:
            continue
        name = r[0]  # first column of the rules table is the subreddit name
        # attempt direct key, then lower-case fallback
        si = _STATE.sub_info_cache.get(name) or _STATE.sub_info_cache.get(str(name).lower())
        if si:
            subs.append(si)
        elif _STATE.verbose:
            print(f"[generate_plan] No cache hit for row name='{name}' (tried '{str(name).lower()}')")
    if not subs and resolved_rows and _STATE.verbose:
        print("[generate_plan] WARNING: No SubInfo objects resolved; likely a cache key case mismatch or empty table value.")
    # Handle uploaded images/captions
    up_paths: Optional[List[str]] = None
    if isinstance(uploaded_images_paths, list) and uploaded_images_paths:
        up_paths = [str(p) for p in uploaded_images_paths if p]
    caps_list: Optional[List[str]] = None
    if isinstance(uploaded_captions_text, str) and uploaded_captions_text.strip():
        # One caption per line, aligned by index with the uploaded images.
        caps_list = [ln.strip() for ln in uploaded_captions_text.splitlines()]
    plan = generate_plan(
        title_tmpl,
        body,
        link or None,
        subs,
        nsfw,
        spoiler,
        prefer_text=prefer_text,
        uploaded_image_paths=up_paths,
        uploaded_captions=caps_list,
    )
    _STATE.plan_cache = plan
    headers = ["subreddit", "type", "title_len", "body_len", "needs_flair", "flair", "notes"]
    rows = []
    for p in plan:
        flair_display = p.flair_text or (p.flair_id if p.flair_id else "")
        rows.append([
            p.subreddit,
            p.post_type,
            str(len(p.title)),
            str(len(p.body)),
            "yes" if p.needs_flair else "no",
            flair_display,
            p.notes,
        ])
    if _STATE.verbose:
        print(f"[generate_plan] Planned {len(plan)} posts.")
        if len(plan) > 0:
            print("[generate_plan] First plan entry sample:", asdict(plan[0]))
    return f"Planned {len(plan)} posts.", gr.update(headers=headers, value=rows)
# -------------------------
# UI Construction
# -------------------------
with gr.Blocks(title="Reddit Poster", fill_width=True, theme="Nymbo/Nymbo_Theme") as demo:
gr.Markdown("""
# Reddit Poster
Plan compliant posts across many subreddits, with throttling and safe review.
## WARNING: YOU CAN EASILY GET YOURSELF BANNED FROM REDDIT BY SPAMMING. USE WITH EXTREME CAUTION.
1) Connect to Reddit with your app credentials. 2) Paste subreddits and fetch constraints. 3) Provide your title/body. 4) Generate a Plan. 5) Open compose tabs or submit via API.
""")
with gr.Accordion("1) Credentials", open=True):
with gr.Row():
client_id = gr.Textbox(label="Client ID", value=os.getenv("REDDIT_CLIENT_ID", ""))
client_secret = gr.Textbox(label="Client Secret", type="password", value=os.getenv("REDDIT_CLIENT_SECRET", ""))
user_agent = gr.Textbox(label="User Agent", value=os.getenv("REDDIT_USER_AGENT", "RedditPoster/1.0 by u/your_username"))
with gr.Row():
username = gr.Textbox(label="Username", value=os.getenv("REDDIT_USERNAME", ""))
password = gr.Textbox(label="Password", type="password", value=os.getenv("REDDIT_PASSWORD", ""))
connect_btn = gr.Button("Connect")
auth_status = gr.Markdown()
auth_info = gr.JSON(label="Auth Info", visible=False)
with gr.Accordion("2) Subreddits", open=True):
subs_text = gr.Textbox(label="Subreddits (one per line)", placeholder="technology\npython\nlearnprogramming")
with gr.Row():
per_req_delay = gr.Slider(0, 5, value=1.0, step=0.5, label="Delay between rule fetches (sec)")
fetch_btn = gr.Button("Fetch Rules")
rules_summary = gr.Markdown()
subs_table = gr.Dataframe(headers=["subreddit", "allowed", "images", "videos", "flair", "nsfw"], value=[], interactive=False)
gr.Markdown("### Omitted Subreddits")
omitted_summary = gr.Markdown(value="No subreddits were omitted.")
omitted_table = gr.Dataframe(headers=["subreddit", "reason"], value=[], interactive=False)
def ui_preview_post(index: int):
    """Render a markdown preview of the planned post at *index*.

    Returns an explanatory message when no plan exists or the index is
    outside the cached plan's bounds.
    """
    plan = _STATE.plan_cache
    if not plan:
        return "No plan yet. Generate a plan first."
    last = len(plan) - 1
    if not (0 <= index <= last):
        return f"Index out of range (0-{last})."
    post = plan[index]
    sections = [
        f"### Preview: r/{post.subreddit}",
        f"**Type:** {post.post_type}",
        f"**Title ({len(post.title)} chars):** {post.title}",
    ]
    if post.post_type == "text":
        sections.append("\n**Body:**\n\n" + post.body)
    else:
        if post.link:
            sections.append(f"\n**Link:** {post.link}")
        if post.body:
            # Non-text posts do not submit the body; show it for reference only.
            sections.append("\n(Body not submitted; shown here only for reference)\n\n" + post.body)
    # Media info: local uploads take precedence over remote image URLs.
    local_imgs = getattr(post, 'local_images', None)
    remote_imgs = getattr(post, 'images', None)
    if local_imgs:
        sections.append(f"\nWill upload {len(local_imgs)} local image(s) on submit.")
    elif remote_imgs:
        sections.append(f"\nWill attempt to fetch {len(remote_imgs)} image(s) from URLs on submit.")
    if post.needs_flair:
        sections.append("\n> ⚠️ This subreddit requires a link flair. You'll need to choose one manually.")
    return "\n\n".join(sections)
    # --- Step 3: one title template/body/link shared across all subreddits ---
    with gr.Accordion("3) Content", open=True):
        title_tmpl = gr.Textbox(label="Title Template", placeholder="New update for {subreddit}: Feature X is live!", info="You can use {subreddit}.")
        body = gr.Textbox(label="Body (Markdown)", lines=12)
        link = gr.Textbox(label="Link (optional)")
        with gr.Row():
            nsfw = gr.Checkbox(label="NSFW")
            spoiler = gr.Checkbox(label="Spoiler")
            prefer_text = gr.Checkbox(label="Prefer text posts (embed link in body)", value=False)
        with gr.Accordion("Optional: Upload images for media post", open=False):
            # type="filepath" yields local paths that the submit handler can upload.
            uploaded_images = gr.Files(label="Upload images (1-20)", file_types=["image"], type="filepath")
            uploaded_captions = gr.Textbox(label="Captions (optional, one per line; aligned with uploaded images)", lines=4)
    # --- Step 4: generate the per-subreddit plan and review it ---
    with gr.Accordion("4) Plan & Review", open=True):
        gen_btn = gr.Button("Generate Plan")
        plan_summary = gr.Markdown()
        plan_table = gr.Dataframe(headers=["subreddit", "type", "title_len", "body_len", "needs_flair", "flair", "notes"], value=[], interactive=False)
        # Hidden JSON export of the full plan; refreshed after each generation.
        export_json = gr.JSON(label="Plan JSON", value=[], visible=False)
        with gr.Row():
            # Slider bounds (0, 0) are placeholders; re-bounded after planning.
            preview_index = gr.Slider(0, 0, value=0, step=1, label="Preview Index")
            refresh_preview = gr.Button("Show Preview")
        post_preview = gr.Markdown(label="Post Preview")
with gr.Accordion("Flair Selection", open=False):
flair_status = gr.Markdown("Load or generate a plan to see flairs.")
# Pre-create placeholder radio groups for up to N subreddits
MAX_FLAIR_SUBS = 50
flair_radio_groups = [] # list[tuple[gr.Markdown, gr.Radio]]
for i in range(50):
label_md = gr.Markdown(f"Subreddit Slot {i+1}", visible=False)
radio = gr.Radio(choices=[], label="Flairs", interactive=True, visible=False)
flair_radio_groups.append((label_md, radio))
with gr.Row():
apply_all_flairs_btn = gr.Button("Apply Flair Selections")
clear_all_flairs_btn = gr.Button("Clear All Flairs")
    # --- Step 5: act on the plan (open compose tabs or submit via API) ---
    with gr.Accordion("5) Act", open=True):
        # Throttle knobs feed ui_set_throttle; conservative defaults reduce ban risk.
        with gr.Accordion("Throttle Settings", open=False):
            with gr.Row():
                th_min_delay = gr.Slider(0, 180, value=20.0, step=1.0, label="Min delay between posts (s)")
                th_jitter = gr.Slider(0, 60, value=5.0, step=0.5, label="Jitter ± (s)")
            with gr.Row():
                th_per_sub = gr.Slider(0, 360, value=60.0, step=5.0, label="Per-subreddit cooldown (min)")
                th_window = gr.Slider(5, 240, value=30.0, step=5.0, label="Global window (min)")
            with gr.Row():
                th_global_cap = gr.Slider(1, 200, value=8, step=1, label="Global max posts / window")
                th_random = gr.Checkbox(label="Randomize order", value=False)
            apply_throttle_btn = gr.Button("Apply Throttle Config")
            throttle_preview = gr.JSON(label="Active Throttle Config", value={})
        # Manual path: open prefilled compose pages in browser tabs for review.
        with gr.Row():
            open_delay = gr.Slider(0, 10, value=2.0, step=0.5, label="Delay between opened tabs (sec)")
            open_btn = gr.Button("Open Compose Tabs")
        open_log = gr.Textbox(label="Open Status", interactive=False)
        gr.Markdown("---")
        gr.Markdown("Submitting via API is optional and riskier. Prefer manual review via compose tabs; throttle conservatively.")
        # Automated path: direct API submission with its own delay + duplicate guard.
        with gr.Row():
            api_delay = gr.Slider(0, 120, value=20.0, step=5.0, label="Delay between API submissions (sec)")
            nsfw_flag = gr.Checkbox(label="Mark NSFW via API")
            spoiler_flag = gr.Checkbox(label="Mark Spoiler via API")
            dup_days = gr.Slider(1, 60, value=14, step=1, label="Duplicate window (days)")
            skip_dups = gr.Checkbox(label="Skip duplicates (same title/URL)", value=True)
        with gr.Row():
            convert_md_images = gr.Checkbox(label="Convert markdown images in body to image/gallery post", value=True)
            comment_body_under_media = gr.Checkbox(label="After posting media, comment with body text", value=True)
        submit_btn = gr.Button("Submit via API (text/link/image)")
        submit_log = gr.Textbox(label="Submission Log", lines=10)
# Event wiring
connect_btn.click(
fn=ui_connect,
inputs=[client_id, client_secret, user_agent, username, password],
outputs=[auth_status, auth_info],
concurrency_limit=1,
api_name="connect",
)
fetch_btn.click(
fn=ui_fetch_rules,
inputs=[subs_text, per_req_delay],
outputs=[rules_summary, subs_table, omitted_summary, omitted_table],
concurrency_limit=1,
api_name="fetch_rules",
)
def _populate_flair_radios():
    """Fill each pre-created label/radio pair with the cached flair templates.

    Returns ``[status_markdown, label_update, radio_update, ...]`` with one
    interleaved (label, radio) update pair per slot; unused slots are hidden.
    """
    def _label_for(entry):
        # Radio labels encode display text and template id as "text | id".
        return f"{entry['text']} | {entry['id']}" if entry.get('text') else entry['id']

    sub_names = sorted({info.name for info in _STATE.sub_info_cache.values()})
    # Only as many subreddits as there are pre-created slots can be shown.
    shown = sub_names[:len(flair_radio_groups)]
    component_updates = []
    for name in shown:
        key = name.lower()
        options = [_label_for(f) for f in _STATE.flairs_cache.get(key, [])]
        preselected = None
        if key in _STATE.selected_flairs:
            current = _STATE.selected_flairs[key]
            wanted = f"{current.get('text')} | {current.get('id')}" if current.get('text') else current.get('id')
            if wanted in options:
                preselected = wanted
        component_updates.append(gr.update(value=f"### r/{name}", visible=True))
        component_updates.append(gr.update(choices=options, value=preselected, interactive=bool(options), visible=True))
    # Hide every slot that has no subreddit assigned this round.
    for _ in range(len(flair_radio_groups) - len(shown)):
        component_updates.append(gr.update(value="", visible=False))
        component_updates.append(gr.update(choices=[], value=None, interactive=False, visible=False))
    if sub_names:
        status = f"Flair sets loaded for {min(len(sub_names), len(flair_radio_groups))} subreddits."
    else:
        status = "No subreddits loaded yet."
    return [status, *component_updates]
def _apply_selected_flairs(*radio_values):
    """Persist radio selections into ``_STATE.selected_flairs`` and refresh the plan table.

    ``radio_values`` arrive in the same sorted-subreddit order used by
    the flair-population handler, one value per radio component.
    Returns ``(status_message, plan_table_update)``.
    """
    subs = sorted({si.name for si in _STATE.sub_info_cache.values()})
    for idx, sub in enumerate(subs):
        if idx >= len(flair_radio_groups):
            break
        choice = radio_values[idx]
        if not choice:
            continue
        key = sub.lower()
        # Labels are "text | id"; the template id is always the LAST segment,
        # so rsplit (not split) keeps flair text that itself contains " | "
        # intact and stores the correct id.
        if " | " in choice:
            text_part, id_part = choice.rsplit(" | ", 1)
            _STATE.selected_flairs[key] = {"id": id_part, "text": text_part}
        else:
            _STATE.selected_flairs[key] = {"id": choice, "text": None}
    # Rebuild the plan table rows so the flair column reflects the new picks.
    rows = []
    for p in _STATE.plan_cache:
        # resolve newly applied flair if any
        chosen = _STATE.selected_flairs.get(p.subreddit.lower())
        if chosen:
            p.flair_id = chosen.get('id')
            p.flair_text = chosen.get('text')
        flair_display = p.flair_text or (p.flair_id if p.flair_id else "")
        rows.append([
            p.subreddit,
            p.post_type,
            str(len(p.title)),
            str(len(p.body)),
            "yes" if p.needs_flair else "no",
            flair_display,
            p.notes,
        ])
    return "Applied flair selections.", gr.update(value=rows)
def _clear_all_flairs():
    """Drop every stored flair selection and blank the plan table's flair column.

    Returns ``(status_message, plan_table_update)``.
    """
    _STATE.selected_flairs.clear()
    rows = []
    # Single pass: reset each planned post's flair and rebuild its table row.
    for planned in _STATE.plan_cache:
        planned.flair_id = None
        planned.flair_text = None
        rows.append([
            planned.subreddit,
            planned.post_type,
            str(len(planned.title)),
            str(len(planned.body)),
            "yes" if planned.needs_flair else "no",
            "",
            planned.notes,
        ])
    return "Cleared all flairs.", gr.update(value=rows)
    # Build list of outputs for _populate_flair_radios: flair_status + (label, radio) pairs
    # Output order must match the interleaved label/radio updates the handler returns.
    _populate_outputs = [flair_status]
    for label_md, radio in flair_radio_groups:
        _populate_outputs.append(label_md)
        _populate_outputs.append(radio)
    # Trigger population after fetch rules and after plan generation (in case of new subs)
    # queue=False: cheap UI refreshes that bypass the single-concurrency queue.
    fetch_btn.click(
        fn=_populate_flair_radios,
        inputs=None,
        outputs=_populate_outputs,
        queue=False,
    )
    gen_btn.click(
        fn=_populate_flair_radios,
        inputs=None,
        outputs=_populate_outputs,
        queue=False,
    )
    # Apply / Clear buttons wiring
    # Inputs are the radio components only, in the same order as the groups.
    apply_all_flairs_btn.click(
        fn=_apply_selected_flairs,
        inputs=[r for _, r in flair_radio_groups],
        outputs=[flair_status, plan_table],
    )
    clear_all_flairs_btn.click(
        fn=_clear_all_flairs,
        inputs=None,
        outputs=[flair_status, plan_table],
    )
def _update_preview_slider():
    """Re-bound the preview slider to the current plan size (disabled when empty)."""
    total = len(_STATE.plan_cache)
    if total:
        return gr.update(minimum=0, maximum=total - 1, value=0, interactive=True)
    return gr.update(minimum=0, maximum=0, value=0, interactive=False)
    # Plan generation chain: build plan -> export JSON -> re-bound preview slider.
    gen_btn.click(
        fn=ui_generate_plan,
        inputs=[title_tmpl, body, link, nsfw, spoiler, prefer_text, subs_table, uploaded_images, uploaded_captions],
        outputs=[plan_summary, plan_table],
        concurrency_limit=1,
        api_name="generate_plan",
    ).then(
        # Serialize the cached plan for the hidden JSON export component.
        lambda: json.dumps([asdict(p) for p in _STATE.plan_cache], indent=2),
        None,
        export_json,
    ).then(
        _update_preview_slider,
        None,
        preview_index,
    )
    refresh_preview.click(
        fn=ui_preview_post,
        inputs=[preview_index],
        outputs=[post_preview],
        api_name="preview_post",
        concurrency_limit=1,
    )
    open_btn.click(
        fn=ui_open_tabs,
        inputs=[open_delay],
        outputs=[open_log],
        concurrency_limit=1,
        api_name="open_tabs",
    )
    submit_btn.click(
        fn=ui_submit_api,
        inputs=[api_delay, nsfw_flag, spoiler_flag, dup_days, skip_dups, convert_md_images, comment_body_under_media],
        outputs=[submit_log],
        concurrency_limit=1,
        api_name="submit_api",
    )
    apply_throttle_btn.click(
        fn=ui_set_throttle,
        inputs=[th_min_delay, th_jitter, th_per_sub, th_window, th_global_cap, th_random],
        outputs=[throttle_preview],
        concurrency_limit=1,
        api_name="set_throttle",
    )
# Single-concurrency queue: one handler runs at a time across the whole app.
demo.queue(default_concurrency_limit=1)

if __name__ == "__main__":
    demo.launch(inbrowser=True, show_error=True)