Spaces:
Sleeping
Sleeping
| """ | |
| Reddit-Poster: Plan and post updates to many subreddits efficiently. | |
| Features | |
| - Authenticate with Reddit using PRAW (script app credentials). | |
| - Paste a list of subreddits and fetch basic posting constraints. | |
| - Provide a title template and body once; auto-generate per-subreddit posts. | |
| - Throttle actions to avoid spam bans; queue enabled, single-concurrency events. | |
| - Open prefilled compose pages in your browser (manual review per sub). | |
| - Optional: submit via API (text/link) with per-post delay. | |
| Notes | |
| - Credentials are held only in-memory per session; not persisted. | |
| - For image/galleries, we currently open the compose page; API submit for images is | |
| not universally accepted across subs and requires extra checks. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import time | |
| import html | |
| import json | |
| import textwrap | |
| import threading | |
| import webbrowser | |
| import re | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from dataclasses import dataclass, asdict, field | |
| from typing import Dict, List, Optional, Tuple | |
| from urllib.parse import urlsplit, urlunsplit, parse_qsl | |
| import gradio as gr | |
| import requests | |
| try: | |
| import praw # PRAW: Python Reddit API Wrapper | |
| except Exception as e: # pragma: no cover | |
| praw = None # handled in UI | |
| try: # Rate limit exception is in prawcore | |
| from prawcore.exceptions import RateLimitExceeded # type: ignore | |
| except Exception: # pragma: no cover | |
| RateLimitExceeded = None # type: ignore | |
| try: # Other useful exceptions for subreddit fetch handling | |
| from prawcore.exceptions import NotFound, Forbidden, Redirect, BadRequest # type: ignore | |
| except Exception: # pragma: no cover | |
| NotFound = Forbidden = Redirect = BadRequest = None # type: ignore | |
| # ------------------------- | |
| # Constants / Helpers | |
| # ------------------------- | |
| REDDIT_TITLE_MAX = 300 # Current Reddit title character cap | |
| REDDIT_BODY_MAX = 40000 # Current selftext cap | |
| PREFILL_BODY_MAX = 9500 # Conservative limit for URL prefill payload (avoid extremely long query URLs) | |
| def _truncate(s: str, limit: int) -> str: | |
| if s is None: | |
| return "" | |
| if len(s) <= limit: | |
| return s | |
| return s[: max(0, limit - 1)] + "…" | |
| def _safe_join_lines(text: str) -> str: | |
| # Normalize Windows/Mac newlines, ensure body is clean | |
| return (text or "").replace("\r\n", "\n").replace("\r", "\n").strip() | |
| _MD_IMAGE_RE = re.compile(r"!\[(?P<alt>[^\]]*)\]\((?P<url>https?://[^)\s]+)\)") | |
| def _extract_markdown_images(md: str) -> List[Tuple[str, str]]: | |
| """Return list of (alt, url) for markdown image tags in the given text.""" | |
| out: List[Tuple[str, str]] = [] | |
| if not md: | |
| return out | |
| for m in _MD_IMAGE_RE.finditer(md): | |
| alt = m.group("alt") or "" | |
| url = m.group("url") | |
| out.append((alt, url)) | |
| return out | |
| def _strip_markdown_images(md: str) -> str: | |
| """Remove markdown image tags from text (keeps the rest).""" | |
| if not md: | |
| return md | |
| return _MD_IMAGE_RE.sub("", md).strip() | |
| def _normalize_sub_name(s: str) -> str: | |
| s = s.strip() | |
| s = s.removeprefix("r/").removeprefix("/r/") | |
| return s | |
| def _prefill_submit_url( | |
| subreddit: str, | |
| *, | |
| title: str = "", | |
| text: Optional[str] = None, | |
| url: Optional[str] = None, | |
| nsfw: bool = False, | |
| spoiler: bool = False, | |
| kind: Optional[str] = None, | |
| ) -> str: | |
| # New Reddit submit (prefill works when logged in). Old style still honored. | |
| # https://www.reddit.com/r/{sub}/submit?title=...&text=... or &url=... | |
| from urllib.parse import urlencode | |
| params = {"title": title} | |
| if text is not None: | |
| params["text"] = text | |
| # Some variants also look for selftext | |
| params["selftext"] = text | |
| if url is not None: | |
| params["url"] = url | |
| # Infer kind if not explicitly provided | |
| if kind is None: | |
| if text is not None and url is None: | |
| kind = "self" | |
| elif url is not None and text is None: | |
| kind = "link" | |
| if kind: | |
| params["kind"] = kind | |
| if nsfw: | |
| params["nsfw"] = "true" | |
| if spoiler: | |
| params["spoiler"] = "true" | |
| return f"https://www.reddit.com/r/{subreddit}/submit?" + urlencode(params) | |
| class SubInfo: | |
| name: str | |
| submission_type: str # "any" | "link" | "self" | |
| allow_images: bool | |
| allow_videos: bool | |
| link_flair_enabled: bool | |
| nsfw: bool | |
| title_limit: int = REDDIT_TITLE_MAX | |
| class PlannedPost: | |
| subreddit: str | |
| post_type: str # "text" | "link" | |
| title: str | |
| body: str | |
| link: Optional[str] | |
| compose_url: str | |
| needs_flair: bool | |
| flair_id: Optional[str] = None | |
| flair_text: Optional[str] = None | |
| notes: str = "" | |
| images: Optional[List[Tuple[str, str]]] = None # list of (alt, url) found in body | |
| local_images: Optional[List[str]] = None # local file paths uploaded via UI | |
| local_captions: Optional[List[str]] = None # optional captions, aligned with local_images | |
| def _ensure_praw_available() -> Optional[str]: | |
| if praw is None: | |
| return "PRAW library not available. Install 'praw' in this environment." | |
| return None | |
| def create_reddit(client_id: str, client_secret: str, user_agent: str, username: str, password: str): | |
| missing = [k for k, v in { | |
| "client_id": client_id, | |
| "client_secret": client_secret, | |
| "user_agent": user_agent, | |
| "username": username, | |
| "password": password, | |
| }.items() if not v] | |
| if missing: | |
| raise ValueError(f"Missing required fields: {', '.join(missing)}") | |
| reddit = praw.Reddit( | |
| client_id=client_id, | |
| client_secret=client_secret, | |
| user_agent=user_agent, | |
| username=username, | |
| password=password, | |
| ) | |
| # Lightweight auth check | |
| _ = reddit.user.me() | |
| return reddit | |
| def fetch_sub_info(reddit, sub: str) -> SubInfo: | |
| sr = reddit.subreddit(sub) | |
| # Accessing properties triggers API requests; keep minimal | |
| return SubInfo( | |
| name=sr.display_name, | |
| submission_type=str(sr.submission_type or "any"), | |
| allow_images=bool(getattr(sr, "allow_images", False)), | |
| allow_videos=bool(getattr(sr, "allow_videos", False)), | |
| link_flair_enabled=bool(getattr(sr, "link_flair_enabled", False)), | |
| nsfw=bool(sr.over18), | |
| title_limit=REDDIT_TITLE_MAX, | |
| ) | |
| def list_link_flairs(reddit, sub: str) -> List[Dict[str, str]]: | |
| try: | |
| sr = reddit.subreddit(sub) | |
| # PRAW: returns a ListingGenerator of flair templates | |
| # We convert to simple dicts with id and text | |
| flairs = [] | |
| for f in sr.flair.link_templates: # type: ignore[attr-defined] | |
| flairs.append({ | |
| "id": f.get("id", ""), | |
| "text": f.get("text", ""), | |
| "mod_only": f.get("mod_only", False), | |
| "required": f.get("required", False), | |
| }) | |
| return flairs | |
| except Exception: | |
| return [] | |
| def generate_plan( | |
| title_template: str, | |
| body: str, | |
| link: str | None, | |
| subs: List[SubInfo], | |
| nsfw: bool, | |
| spoiler: bool, | |
| prefer_text: bool = False, | |
| *, | |
| uploaded_image_paths: Optional[List[str]] = None, | |
| uploaded_captions: Optional[List[str]] = None, | |
| ) -> List[PlannedPost]: | |
| body_norm = _safe_join_lines(body) | |
| images = _extract_markdown_images(body_norm) | |
| plan: List[PlannedPost] = [] | |
| for si in subs: | |
| title = (title_template or "").format(subreddit=si.name, r=si.name) | |
| title = _truncate(title.strip(), si.title_limit) | |
| # Decide initial desired type (before subreddit restrictions) | |
| initial_post_type = "text" if (prefer_text or not link) else "link" | |
| final_post_type = initial_post_type | |
| use_link: Optional[str] = link if initial_post_type == "link" else None | |
| note = "" | |
| body_norm_post = body_norm | |
| # Adapt to subreddit submission constraints with auto-flip + notes | |
| allowed = (si.submission_type or "any").lower() | |
| if allowed == "self": | |
| # Text-only subreddit | |
| if final_post_type == "link": | |
| final_post_type = "text" | |
| use_link = None | |
| # Embed the link into the body so content still gets shared | |
| if link and (link not in body_norm_post): | |
| body_norm_post = body_norm_post + ("\n\n" if body_norm_post else "") + str(link) | |
| note = (note + " " if note else "") + "Auto-flipped: subreddit is text-only; embedded link into body." | |
| elif allowed == "link": | |
| # Link-only subreddit | |
| if final_post_type == "text": | |
| if link: | |
| final_post_type = "link" | |
| use_link = link | |
| note = (note + " " if note else "") + "Auto-flipped: subreddit is link-only; switched to link." | |
| else: | |
| # No URL available; we must stay text but warn the user clearly | |
| final_post_type = "text" | |
| use_link = None | |
| note = (note + " " if note else "") + "Warning: subreddit is link-only but no URL provided; staying text (composer may reject)." | |
| else: | |
| # 'any' or unknown -> keep initial choice | |
| if final_post_type == "text" and link and (link not in body_norm_post) and prefer_text: | |
| # When user prefers text but provided a link, include it for convenience | |
| body_norm_post = body_norm_post + ("\n\n" if body_norm_post else "") + str(link) | |
| post_type = final_post_type | |
| # Compose URL for manual posting | |
| # For very large bodies, truncate in the prefill URL but keep full body inside plan object. | |
| prefill_body = body_norm_post if post_type == "text" else None | |
| truncated_for_prefill = False | |
| if prefill_body is not None and len(prefill_body) > PREFILL_BODY_MAX: | |
| prefill_body = prefill_body[:PREFILL_BODY_MAX - 1] + "…" | |
| truncated_for_prefill = True | |
| compose_url = _prefill_submit_url( | |
| si.name, | |
| title=title, | |
| text=(prefill_body if post_type == "text" else None), | |
| url=use_link if post_type == "link" else None, | |
| nsfw=nsfw, | |
| spoiler=spoiler, | |
| kind=("self" if post_type == "text" else "link"), | |
| ) | |
| flairs = list_link_flairs(_STATE.reddit, si.name) if _STATE.reddit else [] | |
| needs_flair = any(f.get("required", False) for f in flairs) | |
| if needs_flair: | |
| note = (note + " " if note else "") + "Flair required; you'll need to pick one in the composer." | |
| # Body cap (for preview/storage; link posts keep body for preview/reference only) | |
| use_body = _truncate(body_norm_post, REDDIT_BODY_MAX) | |
| if truncated_for_prefill: | |
| note = (note + " " if note else "") + f"Prefill truncated to {PREFILL_BODY_MAX} chars for URL; full body preserved.".strip() | |
| chosen = _STATE.selected_flairs.get(si.name.lower()) if hasattr(_STATE, 'selected_flairs') else None | |
| flair_id = chosen.get("id") if chosen else None | |
| flair_text = chosen.get("text") if chosen else None | |
| if flair_id and note: | |
| note = f"{note} Flair selected: {flair_text}".strip() | |
| elif flair_id and not note: | |
| note = f"Flair selected: {flair_text}" | |
| # If markdown images detected, annotate plan so the submitter can decide how to handle | |
| imgs_for_sub = images[:] if images else [] | |
| if imgs_for_sub: | |
| if si.allow_images: | |
| note = (note + " " if note else "") + f"Detected {len(imgs_for_sub)} image(s) in body. API can convert to image/gallery post." | |
| else: | |
| note = (note + " " if note else "") + f"Detected {len(imgs_for_sub)} image(s) but subreddit may not allow images; inline markdown images won't render on Reddit." | |
| # Uploaded local images (apply to all subs equally) | |
| local_imgs = [p for p in (uploaded_image_paths or []) if p] | |
| local_caps = (uploaded_captions or []) | |
| if local_imgs: | |
| if si.allow_images: | |
| note = (note + " " if note else "") + f"Using {len(local_imgs)} uploaded image(s) for media post." | |
| else: | |
| note = (note + " " if note else "") + f"Uploaded images provided but subreddit may not allow images." | |
| plan.append(PlannedPost( | |
| subreddit=si.name, | |
| post_type=post_type, | |
| title=title, | |
| body=use_body, | |
| link=use_link, | |
| compose_url=compose_url, | |
| needs_flair=needs_flair, | |
| flair_id=flair_id, | |
| flair_text=flair_text, | |
| notes=note, | |
| images=imgs_for_sub, | |
| local_images=local_imgs if local_imgs else None, | |
| local_captions=local_caps if local_caps else None, | |
| )) | |
| return plan | |
| def open_compose_tabs(plan: List[PlannedPost], delay_seconds: float = 2.0) -> str: | |
| """Open each compose URL in the default browser with delay.""" | |
| count = 0 | |
| for p in plan: | |
| webbrowser.open_new_tab(p.compose_url) | |
| count += 1 | |
| if delay_seconds > 0 and count < len(plan): | |
| time.sleep(delay_seconds) | |
| return f"Opened {count} compose tabs. Review and submit manually to respect sub rules." | |
| """Submission utilities (advanced throttled version defined below).""" | |
| # Throttled submit implementation (replaces basic version above) | |
| def submit_via_api( | |
| reddit, | |
| plan: List[PlannedPost], | |
| *, | |
| per_post_delay: float = 15.0, # used if no throttle config set | |
| apply_nsfw_flag: bool = False, | |
| spoiler: bool = False, | |
| duplicate_window_days: int = 14, | |
| skip_duplicates: bool = True, | |
| convert_markdown_images: bool = False, | |
| comment_text_under_media: bool = True, | |
| ): | |
| """Submit posts via Reddit API (text/link) applying advanced throttle rules if configured. | |
| Yields log lines as they are generated for real-time display. | |
| """ | |
| if reddit is None: | |
| yield "Not authenticated." | |
| return | |
| cfg: ThrottleConfig = _STATE.throttle or ThrottleConfig(min_delay=per_post_delay) | |
| ordered = _enforce_order(plan, cfg) | |
| for idx, p in enumerate(ordered, start=1): | |
| if _posts_in_window(cfg) >= cfg.global_max_posts: | |
| yield f"🚫 Global cap {cfg.global_max_posts} reached in last {cfg.global_window_minutes}m — stopping." | |
| break | |
| # Duplicate guard | |
| is_dup, reason = _is_duplicate_recent(reddit, p.subreddit, p.title, (p.link if p.post_type == "link" else None), duplicate_window_days) | |
| if is_dup: | |
| msg = f"[{idx}/{len(plan)}] r/{p.subreddit}: Duplicate detected ({reason})." | |
| if skip_duplicates: | |
| yield msg + " Skipping." | |
| _STATE.post_history.append({ | |
| "timestamp": _now_ts().isoformat(), | |
| "subreddit": p.subreddit, | |
| "method": "duplicate_skip", | |
| "reason": reason or "duplicate", | |
| }) | |
| continue | |
| else: | |
| yield msg + " Proceeding due to settings." | |
| # Collect throttle logs in a temp list and yield them | |
| throttle_logs: List[str] = [] | |
| _sleep_before_post(p.subreddit, cfg, throttle_logs, idx) | |
| for log in throttle_logs: | |
| yield log | |
| try: | |
| sr = reddit.subreddit(p.subreddit) | |
| submit_kwargs = {"title": p.title} | |
| # flair (prefer id) | |
| if p.flair_id: | |
| submit_kwargs["flair_id"] = p.flair_id | |
| elif p.flair_text: | |
| submit_kwargs["flair_text"] = p.flair_text | |
| if p.post_type == "text": | |
| # Prefer uploaded local images; otherwise optionally convert markdown images. | |
| si = _STATE.sub_info_cache.get(p.subreddit.lower(), SubInfo(p.subreddit, "any", False, False, False, False)) | |
| if (p.local_images and si.allow_images) or (convert_markdown_images and p.images and si.allow_images): | |
| sub_logs: List[str] = [] | |
| submission = _submit_images_with_ratelimit(sr, p, sub_logs) | |
| for log in sub_logs: | |
| yield log | |
| if submission and comment_text_under_media: | |
| try: | |
| body_no_imgs = _strip_markdown_images(p.body) | |
| if body_no_imgs: | |
| submission.reply(body_no_imgs) | |
| except Exception: | |
| pass | |
| else: | |
| submit_kwargs["selftext"] = p.body | |
| sub_logs = [] | |
| submission = _submit_with_ratelimit(sr, submit_kwargs, sub_logs) | |
| for log in sub_logs: | |
| yield log | |
| elif p.post_type == "link" and p.link: | |
| submit_kwargs["url"] = p.link | |
| sub_logs = [] | |
| submission = _submit_with_ratelimit(sr, submit_kwargs, sub_logs) | |
| for log in sub_logs: | |
| yield log | |
| else: | |
| yield f"[{idx}/{len(plan)}] r/{p.subreddit}: Skipped unsupported type '{p.post_type}'." | |
| continue | |
| if apply_nsfw_flag: | |
| try: submission.mod.nsfw() | |
| except Exception: pass | |
| if spoiler: | |
| try: submission.mod.spoiler() | |
| except Exception: pass | |
| yield f"[{idx}/{len(plan)}] r/{p.subreddit}: Submitted (id={submission.id})." | |
| _STATE.post_history.append({ | |
| "timestamp": _now_ts().isoformat(), | |
| "subreddit": p.subreddit, | |
| "method": "api", | |
| "id": submission.id, | |
| }) | |
| except Exception as e: | |
| yield f"[{idx}/{len(plan)}] r/{p.subreddit}: ERROR {e}" | |
| _STATE.post_history.append({ | |
| "timestamp": _now_ts().isoformat(), | |
| "subreddit": p.subreddit, | |
| "method": "api_error", | |
| "error": str(e), | |
| }) | |
| def _download_image_to_temp(url: str, tmpdir: Path, idx: int) -> Optional[Path]: | |
| try: | |
| resp = requests.get(url, timeout=30) | |
| resp.raise_for_status() | |
| # Guess extension from content-type or url | |
| ext = None | |
| ctype = resp.headers.get("content-type", "").lower() | |
| if "image/" in ctype: | |
| ext = "." + ctype.split("/", 1)[1].split(";")[0].strip() | |
| if ext in {".jpeg"}: ext = ".jpg" | |
| if not ext: | |
| for cand in [".png", ".jpg", ".jpeg", ".gif", ".webp"]: | |
| if url.lower().endswith(cand): | |
| ext = ".jpg" if cand == ".jpeg" else cand | |
| break | |
| if not ext: | |
| ext = ".jpg" | |
| fname = tmpdir / f"img_{idx}{ext}" | |
| with open(fname, "wb") as f: | |
| f.write(resp.content) | |
| return fname | |
| except Exception: | |
| return None | |
| def _submit_images_with_ratelimit(sr, p: PlannedPost, logs: List[str]): | |
| """Submit a single image or gallery. If local images are provided, use them; otherwise download from body URLs.""" | |
| tmpdir = Path(tempfile.mkdtemp(prefix="reddit_poster_")) | |
| files: List[Tuple[str, Path]] = [] # (caption, path) | |
| try: | |
| if p.local_images: | |
| # Use uploaded local images first | |
| caps = p.local_captions or [] | |
| for i, path_str in enumerate(p.local_images): | |
| try: | |
| path = Path(path_str) | |
| cap = caps[i] if i < len(caps) else "" | |
| if path.exists(): | |
| files.append((cap, path)) | |
| except Exception: | |
| continue | |
| else: | |
| # Fall back to downloading images from markdown URLs | |
| for i, (alt, url) in enumerate(p.images or []): | |
| path = _download_image_to_temp(url, tmpdir, i) | |
| if path: | |
| files.append((alt, path)) | |
| if not files: | |
| # Fallback to text submit if no files were fetched | |
| submit_kwargs = {"title": p.title, "selftext": p.body} | |
| # preserve flair | |
| if p.flair_id: | |
| submit_kwargs["flair_id"] = p.flair_id | |
| elif p.flair_text: | |
| submit_kwargs["flair_text"] = p.flair_text | |
| return _submit_with_ratelimit(sr, submit_kwargs, logs) | |
| # One image -> submit_image; many -> submit_gallery | |
| if len(files) == 1: | |
| caption, path = files[0] | |
| attempts = 0 | |
| while True: | |
| try: | |
| attempts += 1 | |
| return sr.submit_image(title=p.title, image_path=str(path), flair_id=p.flair_id if p.flair_id else None, flair_text=p.flair_text if (p.flair_text and not p.flair_id) else None) | |
| except Exception as e: | |
| if RateLimitExceeded and isinstance(e, RateLimitExceeded): | |
| sleep_s = getattr(e, "sleep_time", None) or 60 | |
| logs.append(f"⛔ Rate limited (image): sleeping {int(sleep_s)}s.") | |
| time.sleep(float(sleep_s)) | |
| if attempts < 3: | |
| continue | |
| raise | |
| else: | |
| # Build images payload: list of dicts with image_path and optional caption | |
| images_payload = [] | |
| for cap, path in files: | |
| item = {"image_path": str(path)} | |
| if cap: | |
| item["caption"] = cap[:180] | |
| images_payload.append(item) | |
| attempts = 0 | |
| while True: | |
| try: | |
| attempts += 1 | |
| return sr.submit_gallery(title=p.title, images=images_payload, flair_id=p.flair_id if p.flair_id else None, flair_text=p.flair_text if (p.flair_text and not p.flair_id) else None) | |
| except Exception as e: | |
| if RateLimitExceeded and isinstance(e, RateLimitExceeded): | |
| sleep_s = getattr(e, "sleep_time", None) or 60 | |
| logs.append(f"⛔ Rate limited (gallery): sleeping {int(sleep_s)}s.") | |
| time.sleep(float(sleep_s)) | |
| if attempts < 3: | |
| continue | |
| raise | |
| finally: | |
| try: | |
| shutil.rmtree(tmpdir, ignore_errors=True) | |
| except Exception: | |
| pass | |
| def ui_open_tabs(open_delay: float): | |
| if not _STATE.plan_cache: | |
| return "No plan generated yet." | |
| msg = open_compose_tabs(_STATE.plan_cache, delay_seconds=float(open_delay)) | |
| return msg | |
| def ui_submit_api(api_delay: float, nsfw_flag: bool, spoiler_flag: bool, dup_days: int = 14, skip_dups: bool = True, convert_md_images: bool = True, comment_body_under_media: bool = True): | |
| if _STATE.reddit is None: | |
| yield "Not authenticated." | |
| return | |
| if not _STATE.plan_cache: | |
| yield "No plan generated. Generate plan before submitting." | |
| return | |
| # Accumulate logs for display | |
| accumulated_logs = [] | |
| for log_line in submit_via_api( | |
| _STATE.reddit, | |
| _STATE.plan_cache, | |
| per_post_delay=float(api_delay), | |
| apply_nsfw_flag=bool(nsfw_flag), | |
| spoiler=bool(spoiler_flag), | |
| duplicate_window_days=int(dup_days), | |
| skip_duplicates=bool(skip_dups), | |
| convert_markdown_images=bool(convert_md_images), | |
| comment_text_under_media=bool(comment_body_under_media), | |
| ): | |
| accumulated_logs.append(log_line) | |
| yield "\n".join(accumulated_logs) | |
| # ------------------------- | |
| # Gradio App | |
| # ------------------------- | |
| class _State: | |
| reddit = None # praw.Reddit | None | |
| sub_info_cache: Dict[str, SubInfo] = {} | |
| flairs_cache: Dict[str, List[Dict[str, str]]] = {} | |
| selected_flairs: Dict[str, Dict[str, str]] = {} | |
| plan_cache: List[PlannedPost] = [] | |
| throttle = None # ThrottleConfig | None | |
| post_history: List[Dict[str, str]] = [] # simple in-memory log entries | |
| verbose: bool = True # toggle for noisy console logging | |
| me_name: Optional[str] = None # authenticated username for duplicate checks | |
| # Track subreddits omitted from the rules table (e.g., banned, 404, etc.) | |
| omitted_subs: List[Dict[str, str]] = [] | |
| _STATE = _State() | |
| # ------------------------- | |
| # Advanced Throttling Utilities | |
| # ------------------------- | |
| from datetime import datetime, timedelta | |
| import random | |
| class ThrottleConfig: | |
| """Configuration controlling automated throttling when submitting via API. | |
| Attributes: | |
| min_delay: Base minimum seconds between ANY two posts (global). | |
| jitter: Random +/- seconds added to each enforced delay (prevents pattern detection). | |
| per_sub_cooldown: Minutes before another post to the same subreddit. | |
| global_window_minutes: Rolling window size in minutes for global cap. | |
| global_max_posts: Maximum posts allowed inside the rolling window (defensive). | |
| randomize_order: Shuffle plan order prior to posting. | |
| """ | |
| min_delay: float = 20.0 | |
| jitter: float = 5.0 | |
| per_sub_cooldown: float = 60.0 | |
| global_window_minutes: float = 30.0 | |
| global_max_posts: int = 8 | |
| randomize_order: bool = False | |
| def to_dict(self) -> Dict[str, object]: | |
| return asdict(self) | |
| def _now_ts() -> datetime: | |
| return datetime.utcnow() | |
| def _posts_in_window(cfg: ThrottleConfig) -> int: | |
| cutoff = _now_ts() - timedelta(minutes=cfg.global_window_minutes) | |
| return sum(1 for p in _STATE.post_history if datetime.fromisoformat(p["timestamp"]) >= cutoff) | |
| def _last_post_time() -> Optional[datetime]: | |
| if not _STATE.post_history: | |
| return None | |
| return datetime.fromisoformat(_STATE.post_history[-1]["timestamp"]) | |
| def _last_post_time_for_sub(sub: str) -> Optional[datetime]: | |
| for entry in reversed(_STATE.post_history): | |
| if entry.get("subreddit") == sub: | |
| return datetime.fromisoformat(entry["timestamp"]) | |
| return None | |
| def _enforce_order(plan: List[PlannedPost], cfg: ThrottleConfig) -> List[PlannedPost]: | |
| if not cfg.randomize_order: | |
| return plan | |
| shuffled = plan[:] | |
| random.shuffle(shuffled) | |
| return shuffled | |
| def _sleep_before_post(sub: str, cfg: ThrottleConfig, logs: List[str], idx: int): | |
| now = _now_ts() | |
| delay_needed = 0.0 | |
| last_global = _last_post_time() | |
| if last_global is not None: | |
| elapsed = (now - last_global).total_seconds() | |
| if elapsed < cfg.min_delay: | |
| delay_needed = cfg.min_delay - elapsed | |
| last_sub = _last_post_time_for_sub(sub) | |
| if last_sub is not None: | |
| sub_elapsed = (now - last_sub).total_seconds() | |
| sub_needed = (cfg.per_sub_cooldown * 60.0) - sub_elapsed | |
| if sub_needed > delay_needed: | |
| delay_needed = sub_needed | |
| # Apply jitter | |
| if cfg.jitter > 0: | |
| delay_needed += random.uniform(-cfg.jitter, cfg.jitter) | |
| if delay_needed < 0: | |
| delay_needed = 0 | |
| if delay_needed > 0 and idx > 1: # skip sleeping before first post | |
| logs.append(f"⏳ Throttle: sleeping {delay_needed:.1f}s before posting to r/{sub} (global + per-sub rules).") | |
| time.sleep(delay_needed) | |
| # ------------------------- | |
| # Duplicate Guard Utilities | |
| # ------------------------- | |
| def _normalize_title(s: str) -> str: | |
| s = (s or "").strip().casefold() | |
| # collapse whitespace | |
| s = re.sub(r"\s+", " ", s) | |
| return s | |
| def _normalize_url(u: Optional[str]) -> Optional[str]: | |
| if not u: | |
| return None | |
| try: | |
| parts = urlsplit(u) | |
| scheme = parts.scheme.lower() or "https" | |
| netloc = parts.netloc.lower() | |
| path = parts.path.rstrip("/") | |
| # remove tracking params commonly used | |
| q = [] | |
| for k, v in parse_qsl(parts.query, keep_blank_values=True): | |
| lk = k.lower() | |
| if lk.startswith("utm_") or lk in {"ref", "ref_source", "source"}: | |
| continue | |
| q.append((k, v)) | |
| query = "&".join([f"{k}={v}" for k, v in q]) | |
| return urlunsplit((scheme, netloc, path, query, "")) | |
| except Exception: | |
| return u | |
| def _is_duplicate_recent(reddit, subreddit: str, title: str, link: Optional[str], window_days: int) -> Tuple[bool, Optional[str]]: | |
| """Return (is_dup, reason) against the authenticated user's recent submissions.""" | |
| try: | |
| if reddit is None or not getattr(_STATE, "me_name", None): | |
| return False, None | |
| me = reddit.redditor(_STATE.me_name) | |
| cutoff_ts = time.time() - (window_days * 86400) | |
| target_title = _normalize_title(title) | |
| target_url = _normalize_url(link) | |
| for subm in me.submissions.new(limit=200): | |
| try: | |
| if getattr(subm.subreddit, "display_name", "").lower() != subreddit.lower(): | |
| continue | |
| if getattr(subm, "created_utc", 0) < cutoff_ts: | |
| break # listing is newest first | |
| # same title? | |
| if target_title and _normalize_title(getattr(subm, "title", "")) == target_title: | |
| return True, "same title in recent submissions" | |
| # same URL? (only meaningful for link posts) | |
| if target_url: | |
| subm_url = _normalize_url(getattr(subm, "url", None)) | |
| if subm_url and subm_url == target_url: | |
| return True, "same URL in recent submissions" | |
| except Exception: | |
| continue | |
| return False, None | |
| except Exception: | |
| return False, None | |
| # ------------------------- | |
| # Rate-limit aware submit | |
| # ------------------------- | |
| def _submit_with_ratelimit(sr, submit_kwargs: Dict[str, object], logs: List[str]): | |
| """Call sr.submit with automatic handling for RateLimitExceeded. | |
| Sleeps exactly the time suggested by Reddit and retries (up to 3 attempts). | |
| This function intentionally does not call throttle sleeps to avoid double-sleeping. | |
| """ | |
| attempts = 0 | |
| while True: | |
| try: | |
| attempts += 1 | |
| return sr.submit(**submit_kwargs) | |
| except Exception as e: | |
| # Handle rate limit explicitly if available | |
| if RateLimitExceeded and isinstance(e, RateLimitExceeded): # type: ignore[arg-type] | |
| # Prefer provided sleep_time; otherwise parse from message | |
| sleep_s = getattr(e, "sleep_time", None) | |
| if not sleep_s: | |
| # Parse patterns like "in 5 minutes" or "in 43 seconds" | |
| m = re.search(r"in\s+(\d+)\s+(second|seconds|minute|minutes)", str(e), re.I) | |
| if m: | |
| val = int(m.group(1)) | |
| unit = m.group(2).lower() | |
| sleep_s = val * (60 if unit.startswith("minute") else 1) | |
| if not sleep_s: | |
| sleep_s = 60 # fallback 1 minute | |
| logs.append(f"⛔ Rate limited: sleeping {int(sleep_s)}s exactly as requested by Reddit.") | |
| time.sleep(float(sleep_s)) | |
| if attempts < 3: | |
| continue | |
| # Non-rate limit or max attempts reached | |
| raise | |
| def ui_set_throttle(min_delay, jitter, per_sub_cooldown, global_window, global_cap, randomize): | |
| cfg = ThrottleConfig( | |
| min_delay=float(min_delay), | |
| jitter=float(jitter), | |
| per_sub_cooldown=float(per_sub_cooldown), | |
| global_window_minutes=float(global_window), | |
| global_max_posts=int(global_cap), | |
| randomize_order=bool(randomize), | |
| ) | |
| _STATE.throttle = cfg | |
| return json.dumps(cfg.to_dict(), indent=2) | |
| def ui_connect(client_id, client_secret, user_agent, username, password): | |
| err = _ensure_praw_available() | |
| if err: | |
| return gr.update(value=f"❌ {err}"), None | |
| try: | |
| reddit = create_reddit(client_id, client_secret, user_agent, username, password) | |
| _STATE.reddit = reddit | |
| me = reddit.user.me() | |
| _STATE.me_name = str(me) | |
| # Reset omitted list on new auth | |
| _STATE.omitted_subs = [] | |
| return f"✅ Authenticated as u/{me}", json.dumps({"user": str(me)}, indent=2) | |
| except Exception as e: | |
| _STATE.reddit = None | |
| return f"❌ Authentication failed: {e}", None | |
| def ui_fetch_rules(subs_text: str, per_request_delay: float = 1.0): | |
| """Fetch basic posting constraints AND cache available link flairs for each subreddit.""" | |
| if _STATE.reddit is None: | |
| return "Not authenticated.", gr.update(), gr.update(value=""), gr.update(headers=["subreddit", "reason"], value=[]) | |
| subs = [_normalize_sub_name(s) for s in subs_text.splitlines() if s.strip()] | |
| if _STATE.verbose: | |
| print(f"[fetch_rules] Request for {len(subs)} subs") | |
| infos: List[SubInfo] = [] | |
| rows: List[List[str]] = [] | |
| omitted: List[Dict[str, str]] = [] | |
| for i, name in enumerate(subs, start=1): | |
| cache_key = name.lower() | |
| try: | |
| # First, check if user is banned from this subreddit; this requires fetching the about data. | |
| sr = _STATE.reddit.subreddit(name) | |
| is_banned = bool(getattr(sr, "user_is_banned", False)) | |
| if is_banned: | |
| omitted.append({"subreddit": name, "reason": "banned"}) | |
| if _STATE.verbose: | |
| print(f" [{i}/{len(subs)}] r/{name}: omitted (banned)") | |
| continue | |
| # Fetch SubInfo (will also trigger network and raise if 404, etc.) | |
| if cache_key in _STATE.sub_info_cache: | |
| si = _STATE.sub_info_cache[cache_key] | |
| else: | |
| si = fetch_sub_info(_STATE.reddit, name) | |
| _STATE.sub_info_cache[cache_key] = si | |
| # Fetch flairs if flair enabled | |
| flairs: List[Dict[str, str]] = [] | |
| if si.link_flair_enabled: | |
| flairs = list_link_flairs(_STATE.reddit, si.name) | |
| _STATE.flairs_cache[cache_key] = flairs | |
| infos.append(si) | |
| rows.append([ | |
| si.name, | |
| si.submission_type, | |
| "yes" if si.allow_images else "no", | |
| "yes" if si.allow_videos else "no", | |
| "yes" if si.link_flair_enabled else "no", | |
| "yes" if si.nsfw else "no", | |
| ]) | |
| if _STATE.verbose: | |
| print(f" [{i}/{len(subs)}] r/{si.name}: flair_templates={len(_STATE.flairs_cache.get(cache_key, []))}") | |
| time.sleep(float(per_request_delay)) | |
| except Exception as e: | |
| # Classify common error cases | |
| reason = None | |
| etype = type(e).__name__ | |
| if NotFound and isinstance(e, NotFound): | |
| reason = "not found (404)" | |
| elif Forbidden and isinstance(e, Forbidden): | |
| reason = "forbidden (403)" | |
| elif Redirect and isinstance(e, Redirect): | |
| reason = "redirect/invalid subreddit" | |
| else: | |
| reason = f"error: {etype}" | |
| omitted.append({"subreddit": name, "reason": reason}) | |
| if _STATE.verbose: | |
| print(f" OMIT {name}: {reason}") | |
| # Save omitted list to state | |
| _STATE.omitted_subs = omitted | |
| summary = f"Fetched {len(infos)} subreddits." | |
| omit_summary = ( | |
| f"Omitted {len(omitted)} subreddit(s)." if omitted else "No subreddits were omitted." | |
| ) | |
| return ( | |
| summary, | |
| gr.update(headers=["subreddit", "allowed", "images", "videos", "flair", "nsfw"], value=rows), | |
| gr.update(value=omit_summary), | |
| gr.update(headers=["subreddit", "reason"], value=[[o["subreddit"], o["reason"]] for o in omitted]), | |
| ) | |
| def ui_generate_plan(title_tmpl: str, body: str, link: str | None, nsfw: bool, spoiler: bool, prefer_text: bool, subs_table_rows, uploaded_images_paths=None, uploaded_captions_text: str = ""): | |
| if _STATE.reddit is None: | |
| return "Not authenticated.", gr.update() | |
| # Build SubInfo list from cache using rows | |
| # Convert incoming value (could be pandas DataFrame, list, None) into list of lists | |
| rows_input = subs_table_rows | |
| if rows_input is None: | |
| resolved_rows: List[List[str]] = [] | |
| elif hasattr(rows_input, "values") and hasattr(rows_input, "columns"): | |
| # Likely a pandas DataFrame | |
| try: | |
| resolved_rows = rows_input.values.tolist() | |
| except Exception: | |
| resolved_rows = [] | |
| elif isinstance(rows_input, list): | |
| resolved_rows = rows_input | |
| else: | |
| resolved_rows = [] | |
| subs: List[SubInfo] = [] | |
| if _STATE.verbose: | |
| print(f"[generate_plan] Received raw type={type(rows_input)} row_count={len(resolved_rows)}") | |
| if len(resolved_rows) > 0: | |
| print(f"[generate_plan] Sample first row={resolved_rows[0]}") | |
| print(f"[generate_plan] Current cache keys={list(_STATE.sub_info_cache.keys())}") | |
| for r in resolved_rows: | |
| if not r: | |
| continue | |
| name = r[0] | |
| # attempt direct key, then lower-case fallback | |
| si = _STATE.sub_info_cache.get(name) or _STATE.sub_info_cache.get(str(name).lower()) | |
| if si: | |
| subs.append(si) | |
| elif _STATE.verbose: | |
| print(f"[generate_plan] No cache hit for row name='{name}' (tried '{str(name).lower()}')") | |
| if not subs and resolved_rows and _STATE.verbose: | |
| print("[generate_plan] WARNING: No SubInfo objects resolved; likely a cache key case mismatch or empty table value.") | |
| # Handle uploaded images/captions | |
| up_paths: Optional[List[str]] = None | |
| if isinstance(uploaded_images_paths, list) and uploaded_images_paths: | |
| up_paths = [str(p) for p in uploaded_images_paths if p] | |
| caps_list: Optional[List[str]] = None | |
| if isinstance(uploaded_captions_text, str) and uploaded_captions_text.strip(): | |
| caps_list = [ln.strip() for ln in uploaded_captions_text.splitlines()] | |
| plan = generate_plan( | |
| title_tmpl, | |
| body, | |
| link or None, | |
| subs, | |
| nsfw, | |
| spoiler, | |
| prefer_text=prefer_text, | |
| uploaded_image_paths=up_paths, | |
| uploaded_captions=caps_list, | |
| ) | |
| _STATE.plan_cache = plan | |
| headers = ["subreddit", "type", "title_len", "body_len", "needs_flair", "flair", "notes"] | |
| rows = [] | |
| for p in plan: | |
| flair_display = p.flair_text or (p.flair_id if p.flair_id else "") | |
| rows.append([ | |
| p.subreddit, | |
| p.post_type, | |
| str(len(p.title)), | |
| str(len(p.body)), | |
| "yes" if p.needs_flair else "no", | |
| flair_display, | |
| p.notes, | |
| ]) | |
| if _STATE.verbose: | |
| print(f"[generate_plan] Planned {len(plan)} posts.") | |
| if len(plan) > 0: | |
| print("[generate_plan] First plan entry sample:", asdict(plan[0])) | |
| return f"Planned {len(plan)} posts.", gr.update(headers=headers, value=rows) | |
| # ------------------------- | |
| # UI Construction | |
| # ------------------------- | |
| with gr.Blocks(title="Reddit Poster", fill_width=True, theme="Nymbo/Nymbo_Theme") as demo: | |
| gr.Markdown(""" | |
| # Reddit Poster | |
| Plan compliant posts across many subreddits, with throttling and safe review. | |
| ## WARNING: YOU CAN EASILY GET YOURSELF BANNED FROM REDDIT BY SPAMMING. USE WITH EXTREME CAUTION. | |
| 1) Connect to Reddit with your app credentials. 2) Paste subreddits and fetch constraints. 3) Provide your title/body. 4) Generate a Plan. 5) Open compose tabs or submit via API. | |
| """) | |
| with gr.Accordion("1) Credentials", open=True): | |
| with gr.Row(): | |
| client_id = gr.Textbox(label="Client ID", value=os.getenv("REDDIT_CLIENT_ID", "")) | |
| client_secret = gr.Textbox(label="Client Secret", type="password", value=os.getenv("REDDIT_CLIENT_SECRET", "")) | |
| user_agent = gr.Textbox(label="User Agent", value=os.getenv("REDDIT_USER_AGENT", "RedditPoster/1.0 by u/your_username")) | |
| with gr.Row(): | |
| username = gr.Textbox(label="Username", value=os.getenv("REDDIT_USERNAME", "")) | |
| password = gr.Textbox(label="Password", type="password", value=os.getenv("REDDIT_PASSWORD", "")) | |
| connect_btn = gr.Button("Connect") | |
| auth_status = gr.Markdown() | |
| auth_info = gr.JSON(label="Auth Info", visible=False) | |
| with gr.Accordion("2) Subreddits", open=True): | |
| subs_text = gr.Textbox(label="Subreddits (one per line)", placeholder="technology\npython\nlearnprogramming") | |
| with gr.Row(): | |
| per_req_delay = gr.Slider(0, 5, value=1.0, step=0.5, label="Delay between rule fetches (sec)") | |
| fetch_btn = gr.Button("Fetch Rules") | |
| rules_summary = gr.Markdown() | |
| subs_table = gr.Dataframe(headers=["subreddit", "allowed", "images", "videos", "flair", "nsfw"], value=[], interactive=False) | |
| gr.Markdown("### Omitted Subreddits") | |
| omitted_summary = gr.Markdown(value="No subreddits were omitted.") | |
| omitted_table = gr.Dataframe(headers=["subreddit", "reason"], value=[], interactive=False) | |
| def ui_preview_post(index: int): | |
| # Return markdown preview for selected planned post | |
| if not _STATE.plan_cache: | |
| return "No plan yet. Generate a plan first." | |
| if index < 0 or index >= len(_STATE.plan_cache): | |
| return f"Index out of range (0-{len(_STATE.plan_cache)-1})." | |
| p = _STATE.plan_cache[index] | |
| md = [f"### Preview: r/{p.subreddit}"] | |
| md.append(f"**Type:** {p.post_type}") | |
| md.append(f"**Title ({len(p.title)} chars):** {p.title}") | |
| if p.post_type == "text": | |
| md.append("\n**Body:**\n\n" + p.body) | |
| else: | |
| if p.link: | |
| md.append(f"\n**Link:** {p.link}") | |
| if p.body: | |
| md.append("\n(Body not submitted; shown here only for reference)\n\n" + p.body) | |
| # Media info | |
| if getattr(p, 'local_images', None): | |
| md.append(f"\nWill upload {len(p.local_images)} local image(s) on submit.") | |
| elif getattr(p, 'images', None): | |
| md.append(f"\nWill attempt to fetch {len(p.images)} image(s) from URLs on submit.") | |
| if p.needs_flair: | |
| md.append("\n> ⚠️ This subreddit requires a link flair. You'll need to choose one manually.") | |
| return "\n\n".join(md) | |
| with gr.Accordion("3) Content", open=True): | |
| title_tmpl = gr.Textbox(label="Title Template", placeholder="New update for {subreddit}: Feature X is live!", info="You can use {subreddit}.") | |
| body = gr.Textbox(label="Body (Markdown)", lines=12) | |
| link = gr.Textbox(label="Link (optional)") | |
| with gr.Row(): | |
| nsfw = gr.Checkbox(label="NSFW") | |
| spoiler = gr.Checkbox(label="Spoiler") | |
| prefer_text = gr.Checkbox(label="Prefer text posts (embed link in body)", value=False) | |
| with gr.Accordion("Optional: Upload images for media post", open=False): | |
| uploaded_images = gr.Files(label="Upload images (1-20)", file_types=["image"], type="filepath") | |
| uploaded_captions = gr.Textbox(label="Captions (optional, one per line; aligned with uploaded images)", lines=4) | |
| with gr.Accordion("4) Plan & Review", open=True): | |
| gen_btn = gr.Button("Generate Plan") | |
| plan_summary = gr.Markdown() | |
| plan_table = gr.Dataframe(headers=["subreddit", "type", "title_len", "body_len", "needs_flair", "flair", "notes"], value=[], interactive=False) | |
| export_json = gr.JSON(label="Plan JSON", value=[], visible=False) | |
| with gr.Row(): | |
| preview_index = gr.Slider(0, 0, value=0, step=1, label="Preview Index") | |
| refresh_preview = gr.Button("Show Preview") | |
| post_preview = gr.Markdown(label="Post Preview") | |
| with gr.Accordion("Flair Selection", open=False): | |
| flair_status = gr.Markdown("Load or generate a plan to see flairs.") | |
| # Pre-create placeholder radio groups for up to N subreddits | |
| MAX_FLAIR_SUBS = 50 | |
| flair_radio_groups = [] # list[tuple[gr.Markdown, gr.Radio]] | |
| for i in range(50): | |
| label_md = gr.Markdown(f"Subreddit Slot {i+1}", visible=False) | |
| radio = gr.Radio(choices=[], label="Flairs", interactive=True, visible=False) | |
| flair_radio_groups.append((label_md, radio)) | |
| with gr.Row(): | |
| apply_all_flairs_btn = gr.Button("Apply Flair Selections") | |
| clear_all_flairs_btn = gr.Button("Clear All Flairs") | |
| with gr.Accordion("5) Act", open=True): | |
| with gr.Accordion("Throttle Settings", open=False): | |
| with gr.Row(): | |
| th_min_delay = gr.Slider(0, 180, value=20.0, step=1.0, label="Min delay between posts (s)") | |
| th_jitter = gr.Slider(0, 60, value=5.0, step=0.5, label="Jitter ± (s)") | |
| with gr.Row(): | |
| th_per_sub = gr.Slider(0, 360, value=60.0, step=5.0, label="Per-subreddit cooldown (min)") | |
| th_window = gr.Slider(5, 240, value=30.0, step=5.0, label="Global window (min)") | |
| with gr.Row(): | |
| th_global_cap = gr.Slider(1, 200, value=8, step=1, label="Global max posts / window") | |
| th_random = gr.Checkbox(label="Randomize order", value=False) | |
| apply_throttle_btn = gr.Button("Apply Throttle Config") | |
| throttle_preview = gr.JSON(label="Active Throttle Config", value={}) | |
| with gr.Row(): | |
| open_delay = gr.Slider(0, 10, value=2.0, step=0.5, label="Delay between opened tabs (sec)") | |
| open_btn = gr.Button("Open Compose Tabs") | |
| open_log = gr.Textbox(label="Open Status", interactive=False) | |
| gr.Markdown("---") | |
| gr.Markdown("Submitting via API is optional and riskier. Prefer manual review via compose tabs; throttle conservatively.") | |
| with gr.Row(): | |
| api_delay = gr.Slider(0, 120, value=20.0, step=5.0, label="Delay between API submissions (sec)") | |
| nsfw_flag = gr.Checkbox(label="Mark NSFW via API") | |
| spoiler_flag = gr.Checkbox(label="Mark Spoiler via API") | |
| dup_days = gr.Slider(1, 60, value=14, step=1, label="Duplicate window (days)") | |
| skip_dups = gr.Checkbox(label="Skip duplicates (same title/URL)", value=True) | |
| with gr.Row(): | |
| convert_md_images = gr.Checkbox(label="Convert markdown images in body to image/gallery post", value=True) | |
| comment_body_under_media = gr.Checkbox(label="After posting media, comment with body text", value=True) | |
| submit_btn = gr.Button("Submit via API (text/link/image)") | |
| submit_log = gr.Textbox(label="Submission Log", lines=10) | |
| # Event wiring | |
| connect_btn.click( | |
| fn=ui_connect, | |
| inputs=[client_id, client_secret, user_agent, username, password], | |
| outputs=[auth_status, auth_info], | |
| concurrency_limit=1, | |
| api_name="connect", | |
| ) | |
| fetch_btn.click( | |
| fn=ui_fetch_rules, | |
| inputs=[subs_text, per_req_delay], | |
| outputs=[rules_summary, subs_table, omitted_summary, omitted_table], | |
| concurrency_limit=1, | |
| api_name="fetch_rules", | |
| ) | |
| def _populate_flair_radios(): | |
| """Populate per-subreddit radio groups with cached flair templates.""" | |
| subs = sorted({si.name for si in _STATE.sub_info_cache.values()}) | |
| updates = [] # interleaved label, radio updates | |
| for idx, sub in enumerate(subs): | |
| if idx >= len(flair_radio_groups): | |
| break | |
| label_md, radio = flair_radio_groups[idx] | |
| key = sub.lower() | |
| flairs = _STATE.flairs_cache.get(key, []) | |
| choices = [f"{f['text']} | {f['id']}" if f.get('text') else f['id'] for f in flairs] if flairs else [] | |
| selected = None | |
| if key in _STATE.selected_flairs: | |
| cur = _STATE.selected_flairs[key] | |
| match_label = f"{cur.get('text')} | {cur.get('id')}" if cur.get('text') else cur.get('id') | |
| if match_label in choices: | |
| selected = match_label | |
| updates.append(gr.update(value=f"### r/{sub}", visible=True)) | |
| updates.append(gr.update(choices=choices, value=selected, interactive=bool(choices), visible=True)) | |
| # Hide remaining | |
| for j in range(len(subs), len(flair_radio_groups)): | |
| label_md, radio = flair_radio_groups[j] | |
| updates.append(gr.update(value="", visible=False)) | |
| updates.append(gr.update(choices=[], value=None, interactive=False, visible=False)) | |
| status = f"Flair sets loaded for {min(len(subs), len(flair_radio_groups))} subreddits." if subs else "No subreddits loaded yet." | |
| return [status, *updates] | |
| def _apply_selected_flairs(*radio_values): | |
| # radio_values order corresponds to flair_radio_groups radios only | |
| subs = sorted({si.name for si in _STATE.sub_info_cache.values()}) | |
| for idx, sub in enumerate(subs): | |
| if idx >= len(flair_radio_groups): | |
| break | |
| choice = radio_values[idx] | |
| if not choice: | |
| continue | |
| key = sub.lower() | |
| # parse id from label | |
| if " | " in choice: | |
| text_part, id_part = choice.split(" | ", 1) | |
| _STATE.selected_flairs[key] = {"id": id_part, "text": text_part} | |
| else: | |
| _STATE.selected_flairs[key] = {"id": choice, "text": None} | |
| # Refresh plan table flair column | |
| rows = [] | |
| for p in _STATE.plan_cache: | |
| # resolve newly applied flair if any | |
| chosen = _STATE.selected_flairs.get(p.subreddit.lower()) | |
| if chosen: | |
| p.flair_id = chosen.get('id') | |
| p.flair_text = chosen.get('text') | |
| flair_display = p.flair_text or (p.flair_id if p.flair_id else "") | |
| rows.append([ | |
| p.subreddit, | |
| p.post_type, | |
| str(len(p.title)), | |
| str(len(p.body)), | |
| "yes" if p.needs_flair else "no", | |
| flair_display, | |
| p.notes, | |
| ]) | |
| return "Applied flair selections.", gr.update(value=rows) | |
| def _clear_all_flairs(): | |
| _STATE.selected_flairs.clear() | |
| for p in _STATE.plan_cache: | |
| p.flair_id = None | |
| p.flair_text = None | |
| rows = [] | |
| for p in _STATE.plan_cache: | |
| rows.append([ | |
| p.subreddit, | |
| p.post_type, | |
| str(len(p.title)), | |
| str(len(p.body)), | |
| "yes" if p.needs_flair else "no", | |
| "", | |
| p.notes, | |
| ]) | |
| return "Cleared all flairs.", gr.update(value=rows) | |
| # Build list of outputs for _populate_flair_radios: flair_status + (label, radio) pairs | |
| _populate_outputs = [flair_status] | |
| for label_md, radio in flair_radio_groups: | |
| _populate_outputs.append(label_md) | |
| _populate_outputs.append(radio) | |
| # Trigger population after fetch rules and after plan generation (in case of new subs) | |
| fetch_btn.click( | |
| fn=_populate_flair_radios, | |
| inputs=None, | |
| outputs=_populate_outputs, | |
| queue=False, | |
| ) | |
| gen_btn.click( | |
| fn=_populate_flair_radios, | |
| inputs=None, | |
| outputs=_populate_outputs, | |
| queue=False, | |
| ) | |
| # Apply / Clear buttons wiring | |
| apply_all_flairs_btn.click( | |
| fn=_apply_selected_flairs, | |
| inputs=[r for _, r in flair_radio_groups], | |
| outputs=[flair_status, plan_table], | |
| ) | |
| clear_all_flairs_btn.click( | |
| fn=_clear_all_flairs, | |
| inputs=None, | |
| outputs=[flair_status, plan_table], | |
| ) | |
| def _update_preview_slider(): | |
| # Adjust slider bounds dynamically after planning; disable if no posts | |
| count = len(_STATE.plan_cache) | |
| if count == 0: | |
| return gr.update(minimum=0, maximum=0, value=0, interactive=False) | |
| return gr.update(minimum=0, maximum=count-1, value=0, interactive=True) | |
| gen_btn.click( | |
| fn=ui_generate_plan, | |
| inputs=[title_tmpl, body, link, nsfw, spoiler, prefer_text, subs_table, uploaded_images, uploaded_captions], | |
| outputs=[plan_summary, plan_table], | |
| concurrency_limit=1, | |
| api_name="generate_plan", | |
| ).then( | |
| lambda: json.dumps([asdict(p) for p in _STATE.plan_cache], indent=2), | |
| None, | |
| export_json, | |
| ).then( | |
| _update_preview_slider, | |
| None, | |
| preview_index, | |
| ) | |
| refresh_preview.click( | |
| fn=ui_preview_post, | |
| inputs=[preview_index], | |
| outputs=[post_preview], | |
| api_name="preview_post", | |
| concurrency_limit=1, | |
| ) | |
| open_btn.click( | |
| fn=ui_open_tabs, | |
| inputs=[open_delay], | |
| outputs=[open_log], | |
| concurrency_limit=1, | |
| api_name="open_tabs", | |
| ) | |
| submit_btn.click( | |
| fn=ui_submit_api, | |
| inputs=[api_delay, nsfw_flag, spoiler_flag, dup_days, skip_dups, convert_md_images, comment_body_under_media], | |
| outputs=[submit_log], | |
| concurrency_limit=1, | |
| api_name="submit_api", | |
| ) | |
| apply_throttle_btn.click( | |
| fn=ui_set_throttle, | |
| inputs=[th_min_delay, th_jitter, th_per_sub, th_window, th_global_cap, th_random], | |
| outputs=[throttle_preview], | |
| concurrency_limit=1, | |
| api_name="set_throttle", | |
| ) | |
| demo.queue(default_concurrency_limit=1) | |
| if __name__ == "__main__": | |
| demo.launch(inbrowser=True, show_error=True) | |