import html import time import threading from datetime import datetime from functools import lru_cache from concurrent.futures import ThreadPoolExecutor, as_completed import gradio as gr import pandas as pd import requests from huggingface_hub import HfApi # ============================================================ # Constants / shared config # ============================================================ REPO_KINDS = ("model", "dataset", "space") REPO_KIND_LABELS = { "model": "Models", "dataset": "Datasets", "space": "Spaces", } REPO_KIND_EMOJIS = { "model": "🧠", "dataset": "📦", "space": "🚀", } REPO_KIND_PALETTES = { "model": "blue", "dataset": "pink", "space": "orange", } RECENT_ACTIVITY_LIMIT = 20 RECENT_ACTIVITY_MAX_PAGES = 300 RECENT_ACTIVITY_MAX_STALLED_PAGES = 5 # ============================================================ # API # ============================================================ def get_api(token: str | None = None) -> HfApi: return HfApi(token=token) if token else HfApi() # ============================================================ # HTTP helpers # ============================================================ def build_session(accept: str, token: str | None = None) -> requests.Session: session = requests.Session() session.headers.update( { "User-Agent": "hf-contributions-app/1.0", "Accept": accept, "Cache-Control": "no-cache", } ) if token: session.headers["Authorization"] = f"Bearer {token}" return session # ============================================================ # Viewer helpers # ============================================================ def get_viewer_username(profile: gr.OAuthProfile | None) -> str | None: return getattr(profile, "username", None) if profile is not None else None def is_self_view(username: str, profile: gr.OAuthProfile | None) -> bool: username = (username or "").strip() viewer = get_viewer_username(profile) return bool(viewer and username and viewer.lower() == username.lower()) # ============================================================ # Recent-activity shared helpers # ============================================================ def normalize_recent_activity_payload(payload): if isinstance(payload, list): return payload, None if isinstance(payload, dict): events = payload.get("recentActivity") if isinstance(events, list): return events, payload.get("cursor") for key in ["items", "events", "activity"]: value = payload.get(key) if isinstance(value, list): return value, payload.get("cursor") return [], None def get_year_bounds(selected_year: int): year_start = pd.Timestamp(f"{selected_year}-01-01", tz="UTC") year_end = pd.Timestamp(f"{selected_year}-12-31 23:59:59.999999", tz="UTC") return year_start, year_end def build_events_page_signature(events: list[dict]) -> tuple: signature = [] for event in events: if not isinstance(event, dict): continue signature.append( ( event.get("eventId"), event.get("time"), event.get("repoId"), event.get("type"), event.get("targetUrl"), ) ) return tuple(signature) def parse_recent_activity_event_ts(event: dict): raw_time = event.get("time") if not raw_time: return None try: return pd.to_datetime(raw_time, utc=True) except Exception: return None def update_recent_activity_progress( event_ts, *, year_start, year_end, oldest_event_ts_seen, page_oldest_event_ts, seen_target_year, crossed_before_year_start, ): if event_ts is None: return ( oldest_event_ts_seen, page_oldest_event_ts, seen_target_year, crossed_before_year_start, ) if oldest_event_ts_seen is None or event_ts < oldest_event_ts_seen: oldest_event_ts_seen = event_ts if page_oldest_event_ts is None or event_ts < page_oldest_event_ts: page_oldest_event_ts = event_ts if year_start <= event_ts <= year_end: seen_target_year = True elif event_ts < year_start: crossed_before_year_start = True return ( oldest_event_ts_seen, page_oldest_event_ts, seen_target_year, crossed_before_year_start, ) def update_stalled_pagination_state( *, page_oldest_event_ts, previous_oldest_event_ts_seen, stalled_oldest_count, ): if previous_oldest_event_ts_seen is not None and page_oldest_event_ts is not None: if page_oldest_event_ts >= previous_oldest_event_ts_seen: stalled_oldest_count += 1 else: stalled_oldest_count = 0 if page_oldest_event_ts is not None: previous_oldest_event_ts_seen = page_oldest_event_ts return previous_oldest_event_ts_seen, stalled_oldest_count # ============================================================ # Thread-safe rate limiters # ============================================================ class RateLimiter: def __init__(self, calls_per_second=10): self.calls_per_second = calls_per_second self.last_call = 0.0 self.lock = threading.Lock() def wait(self): with self.lock: now = time.time() min_interval = 1.0 / self.calls_per_second elapsed = now - self.last_call if elapsed < min_interval: time.sleep(min_interval - elapsed) self.last_call = time.time() rate_limiter = RateLimiter(calls_per_second=10) recent_activity_rate_limiter = RateLimiter(calls_per_second=1.0) # ============================================================ # Simple in-memory cache # ============================================================ CACHE = {} CACHE_TTL = 60 * 10 # 10 minutes CACHE_LOCK = threading.Lock() def get_cache(key): with CACHE_LOCK: entry = CACHE.get(key) if not entry: return None value, timestamp = entry if time.time() - timestamp > CACHE_TTL: del CACHE[key] return None print(f"[CACHE HIT] {key}") return value def set_cache(key, value): with CACHE_LOCK: CACHE[key] = (value, time.time()) # ============================================================ # Username validation / identity resolution # ============================================================ def empty_identity(username: str, error: str): username = (username or "").strip() return { "exists": False, "username": username, "fullname": None, "display_name": f"@{username}" if username else "", "error": error, "source": "empty", } def resolve_identity(username: str, profile: gr.OAuthProfile | None): username = (username or "").strip() if not username: return empty_identity("", "No username provided.") return validate_username(username, profile) def is_invalid_identity(identity: dict, username: str) -> bool: username = (username or "").strip() return bool(username and not identity.get("exists")) def validate_username(username: str, profile: gr.OAuthProfile | None): username = (username or "").strip() if not username: return empty_identity(username, "No username provided.") cache_key = ("identity", username.lower()) cached = get_cache(cache_key) if cached is not None: return cached profile_username = get_viewer_username(profile) profile_name = getattr(profile, "name", None) if profile is not None else None if profile_username and profile_username.lower() == username.lower(): result = { "exists": True, "username": profile_username, "fullname": profile_name.strip() if isinstance(profile_name, str) and profile_name.strip() else None, "display_name": ( f"{profile_name.strip()} (@{profile_username})" if isinstance(profile_name, str) and profile_name.strip() else f"@{profile_username}" ), "error": None, "source": "oauth", } set_cache(cache_key, result) return result session = build_session("text/html,application/json") try: response = session.get( f"https://huggingface.co/{username}", timeout=15, allow_redirects=True, ) if response.status_code == 404: result = empty_identity(username, f"Username or org '{username}' was not found.") result["source"] = "profile_page" set_cache(cache_key, result) return result response.raise_for_status() result = { "exists": True, "username": username, "fullname": None, "display_name": f"@{username}", "error": None, "source": "profile_page", } set_cache(cache_key, result) return result except requests.HTTPError: result = empty_identity( username, f"Username or org '{username}' could not be verified." ) result["source"] = "profile_page_error" set_cache(cache_key, result) return result except requests.RequestException: result = empty_identity( username, "Could not verify this username right now. Please try again." ) result["source"] = "network_error" set_cache(cache_key, result) return result except Exception: result = empty_identity( username, "Could not verify this username right now. Please try again." ) result["source"] = "unknown_error" set_cache(cache_key, result) return result # ============================================================ # Public cache only # ============================================================ def list_items_with_api(api: HfApi, username: str, kind: str): fetchers = { "model": api.list_models, "dataset": api.list_datasets, "space": api.list_spaces, } fetcher = fetchers.get(kind) return list(fetcher(author=username)) if fetcher else [] @lru_cache(maxsize=1000) def cached_list_commits_public(repo_id: str, repo_type: str): return list(HfApi().list_repo_commits(repo_id=repo_id, repo_type=repo_type)) @lru_cache(maxsize=200) def cached_list_items_public(username: str, kind: str): return list_items_with_api(HfApi(), username, kind) def list_items(username: str, kind: str, token: str | None = None): if token: return list_items_with_api(get_api(token), username, kind) return cached_list_items_public(username, kind) def list_commits(repo_id: str, repo_type: str, token: str | None = None): if token: return list(get_api(token).list_repo_commits(repo_id=repo_id, repo_type=repo_type)) return cached_list_commits_public(repo_id, repo_type) # ============================================================ # Repo activity fetch # ============================================================ def fetch_commits_for_repo( repo_id: str, repo_type: str, selected_year: int, token: str | None = None, ): try: rate_limiter.wait() commits = list_commits(repo_id, repo_type, token=token) commit_dates = [] commit_count = 0 for commit in commits: commit_created_at = getattr(commit, "created_at", None) if commit_created_at is None: continue dt = pd.to_datetime(commit_created_at).tz_localize(None).date() if dt.year == selected_year: commit_dates.append(dt) commit_count += 1 return commit_dates, commit_count, None except Exception as e: return [], 0, f"{repo_type}:{repo_id} -> {str(e)}" def build_repo_entry(item, kind: str, selected_year: int, token: str | None = None): repo_id = item.id is_private = bool(getattr(item, "private", False)) created_dates = [] created_at = getattr(item, "created_at", None) if created_at is not None: dt = pd.to_datetime(created_at).tz_localize(None).date() if dt.year == selected_year: created_dates.append(dt) repo_commit_dates, repo_commit_count, err = fetch_commits_for_repo( repo_id=repo_id, repo_type=kind, selected_year=selected_year, token=token, ) created_count = len(created_dates) activity_dates = created_dates + repo_commit_dates activity_count = created_count + repo_commit_count return { "repo_id": repo_id, "kind": kind, "is_private": is_private, "created_dates": created_dates, "created_count": created_count, "repo_commit_dates": repo_commit_dates, "repo_commit_count": repo_commit_count, "activity_dates": activity_dates, "activity_count": activity_count, "error": err, } def collect_kind_repo_entries( username: str, kind: str, selected_year: int, token: str | None = None, ): try: items = list_items(username, kind, token=token) repo_entries = [] errors = [] chunk_size = 5 for i in range(0, len(items), chunk_size): chunk = items[i:i + chunk_size] if not chunk: continue with ThreadPoolExecutor(max_workers=min(5, len(chunk))) as executor: futures = { executor.submit( build_repo_entry, item, kind, selected_year, token, ): item.id for item in chunk } for future in as_completed(futures): entry = future.result() repo_entries.append(entry) if entry["error"]: errors.append(entry["error"]) return { "total_items": len(items), "repos": repo_entries, "errors": errors, } except Exception as e: return { "total_items": 0, "repos": [], "errors": [f"Error fetching {kind}s for {username}: {str(e)}"], } def aggregate_repo_entries(repo_entries: list[dict], include_private: bool): filtered = repo_entries if include_private else [r for r in repo_entries if not r["is_private"]] created_dates = [] repo_commit_dates = [] activity_dates = [] created_count = 0 repo_commit_count = 0 repos_with_activity = 0 for repo in filtered: created_dates.extend(repo["created_dates"]) repo_commit_dates.extend(repo["repo_commit_dates"]) activity_dates.extend(repo["activity_dates"]) created_count += repo["created_count"] repo_commit_count += repo["repo_commit_count"] if repo["activity_count"] > 0: repos_with_activity += 1 return { "created_dates": created_dates, "created_count": created_count, "repo_commit_dates": repo_commit_dates, "repo_commit_count": repo_commit_count, "activity_dates": activity_dates, "activity_count": created_count + repo_commit_count, "repos_with_activity": repos_with_activity, } # ============================================================ # Generic activity bucket helpers # ============================================================ def empty_activity_bucket(title: str): return { "title": title, "fetched": False, "dates": [], "count": 0, "logs": f"{title} not fetched yet.", } def empty_bucketed_activity_state( selected_year: int, logs_text: str, bucket_titles: dict[str, str], ): state = { "username": "", "selected_year": int(selected_year), "fetched": False, "logs": logs_text, } for key, title in bucket_titles.items(): state[key] = empty_activity_bucket(title) return state def build_fetched_bucket(title: str, dates: list, selected_year: int, label: str): return { "title": title, "fetched": True, "dates": dates, "count": len(dates), "logs": f"Found {len(dates)} {label} in {selected_year}.", } def build_failed_bucket(title: str): return { "title": title, "fetched": True, "dates": [], "count": 0, "logs": "", } def build_invalid_section_state(username: str, selected_year: int, empty_state_fn, identity: dict): state = empty_state_fn(selected_year) state["username"] = username state["selected_year"] = int(selected_year) state["logs"] = identity.get("error") or f"Username or org '{username}' was not found." return state # ============================================================ # Community activity state # ============================================================ def empty_community_state(selected_year: int): return { "username": "", "selected_year": int(selected_year), "fetched": False, "pr_opened": empty_activity_bucket("Pull Requests"), "pr_activity": empty_activity_bucket("PR Activity"), "thread_opened": empty_activity_bucket("Community Threads"), "discussion_activity": empty_activity_bucket("Community Activity"), "logs": "Community activity not fetched yet.", } # ============================================================ # Generic recent-activity engine # ============================================================ def collect_recent_activity( *, username: str, selected_year: int, activity_type: str, processor, build_session_fn, rate_limiter_obj, token: str | None = None, limit: int = RECENT_ACTIVITY_LIMIT, max_pages: int = RECENT_ACTIVITY_MAX_PAGES, max_stalled_pages: int = RECENT_ACTIVITY_MAX_STALLED_PAGES, ): username = (username or "").strip() selected_year = int(selected_year) processor_state = processor.init_state() year_start, year_end = get_year_bounds(selected_year) session = build_session_fn("application/json", token=token) cursor = None skip = 0 pages_fetched = 0 seen_target_year = False crossed_before_year_start = False oldest_event_ts_seen = None previous_oldest_event_ts_seen = None stalled_oldest_count = 0 seen_cursors = set() seen_page_signatures = set() logs = [] try: while True: if pages_fetched >= max_pages: logs.append("Stopped due to safety max pages.") break rate_limiter_obj.wait() params = { "limit": limit, "activityType": activity_type, "feedType": "user", "entity": username, } if cursor: params["cursor"] = cursor else: params["skip"] = skip response = session.get( "https://huggingface.co/api/recent-activity", params=params, timeout=20, ) response.raise_for_status() payload = response.json() pages_fetched += 1 events, next_cursor = normalize_recent_activity_payload(payload) if not events: logs.append("No more events.") break page_signature = build_events_page_signature(events) if page_signature in seen_page_signatures: logs.append("Stopped because recent-activity returned a repeated page.") break seen_page_signatures.add(page_signature) if next_cursor: if next_cursor in seen_cursors: logs.append("Stopped because recent-activity returned a repeated cursor.") break seen_cursors.add(next_cursor) page_oldest_event_ts = None for event in events: if not isinstance(event, dict): continue event_ts = parse_recent_activity_event_ts(event) ( oldest_event_ts_seen, page_oldest_event_ts, seen_target_year, crossed_before_year_start, ) = update_recent_activity_progress( event_ts, year_start=year_start, year_end=year_end, oldest_event_ts_seen=oldest_event_ts_seen, page_oldest_event_ts=page_oldest_event_ts, seen_target_year=seen_target_year, crossed_before_year_start=crossed_before_year_start, ) processor.process_event( event=event, event_ts=event_ts, year_start=year_start, year_end=year_end, state=processor_state, ) if crossed_before_year_start: logs.append("Stopped after crossing before selected year.") break previous_oldest_event_ts_seen, stalled_oldest_count = update_stalled_pagination_state( page_oldest_event_ts=page_oldest_event_ts, previous_oldest_event_ts_seen=previous_oldest_event_ts_seen, stalled_oldest_count=stalled_oldest_count, ) if stalled_oldest_count >= max_stalled_pages: logs.append("Stopped because pagination no longer progressed toward older events.") break if next_cursor: cursor = next_cursor else: skip += limit return processor.build_result( username=username, selected_year=selected_year, state=processor_state, logs=logs, pages_fetched=pages_fetched, oldest_event_ts_seen=oldest_event_ts_seen, seen_target_year=seen_target_year, crossed_before_year_start=crossed_before_year_start, ) except Exception as e: return processor.build_error_result( username=username, selected_year=selected_year, error=e, logs=logs, ) # ============================================================ # Recent-activity processors # ============================================================ class CommunityActivityProcessor: TITLE_MAP = { "pr_opened": "Pull Requests", "pr_activity": "PR Activity", "thread_opened": "Community Threads", "discussion_activity": "Community Activity", } LOG_LABEL_MAP = { "pr_opened": "unique pr_opened event(s)", "pr_activity": "unique pr_activity event(s)", "thread_opened": "unique thread_opened event(s)", "discussion_activity": "unique discussion_activity event(s)", } OPEN_EVENT_EPSILON_SECONDS = 90 def __init__(self, username: str): self.username = (username or "").strip() def init_state(self): return { "dates": { "pr_opened": [], "pr_activity": [], "thread_opened": [], "discussion_activity": [], }, "seen_ids": { "pr_opened": set(), "pr_activity": set(), "thread_opened": set(), "discussion_activity": set(), }, } def extract_author_name(self, discussion: dict) -> str | None: author = discussion.get("author") if isinstance(author, str): return author if isinstance(author, dict): for key in ["name", "username", "login"]: value = author.get(key) if isinstance(value, str): return value return None def parse_ts(self, raw_value): if not raw_value: return None try: return pd.to_datetime(raw_value, utc=True) except Exception: return None def classify_event(self, event: dict, discussion: dict) -> tuple[str | None, str | None]: is_pr = bool(discussion.get("isPullRequest", False)) author_name = self.extract_author_name(discussion) repo_id = event.get("repoId", "unknown-repo") discussion_num = discussion.get("num", "unknown-num") event_id = event.get("eventId", "unknown-event") if is_pr: if author_name and author_name.lower() == self.username.lower(): return "pr_opened", f"{repo_id}::pr::{discussion_num}" return "pr_activity", f"event::{event_id}" if author_name and author_name.lower() == self.username.lower(): return "thread_opened", f"{repo_id}::thread::{discussion_num}" return "discussion_activity", f"event::{event_id}" def extract_business_timestamp(self, bucket_key: str, event: dict, discussion: dict): event_ts = self.parse_ts(event.get("time")) created_ts = self.parse_ts(discussion.get("createdAt")) if bucket_key in {"pr_opened", "thread_opened"}: return created_ts if bucket_key in {"pr_activity", "discussion_activity"}: if event_ts is None: return None if created_ts is not None: delta_seconds = abs((event_ts - created_ts).total_seconds()) if delta_seconds <= self.OPEN_EVENT_EPSILON_SECONDS: return None return event_ts return None def process_event( self, *, event: dict, event_ts, year_start, year_end, state: dict, ): _ = event_ts discussion = event.get("discussionData") or {} if not isinstance(discussion, dict): return bucket_key, unique_id = self.classify_event(event, discussion) if not bucket_key or not unique_id: return business_ts = self.extract_business_timestamp(bucket_key, event, discussion) if business_ts is None: return if not (year_start <= business_ts <= year_end): return if unique_id in state["seen_ids"][bucket_key]: return state["seen_ids"][bucket_key].add(unique_id) state["dates"][bucket_key].append(business_ts.tz_localize(None).date()) def build_bucket(self, key: str, dates: list, selected_year: int): return { "title": self.TITLE_MAP[key], "fetched": True, "dates": dates, "count": len(dates), "logs": f"Found {len(dates)} {self.LOG_LABEL_MAP[key]} in {selected_year}.", } def build_result( self, *, username: str, selected_year: int, state: dict, logs: list[str], pages_fetched: int, oldest_event_ts_seen, seen_target_year: bool, crossed_before_year_start: bool, ): logs = list(logs) logs.append(f"Fetched {pages_fetched} page(s) from recent-activity.") if oldest_event_ts_seen is not None: logs.append(f"Oldest event reached: {oldest_event_ts_seen.date()}.") logs.append(f"Seen target year: {seen_target_year}.") logs.append(f"Crossed before year start: {crossed_before_year_start}.") logs.append( " | ".join( f"{self.TITLE_MAP[key]}: {len(state['dates'][key])}" for key in ["pr_opened", "pr_activity", "thread_opened", "discussion_activity"] ) ) combined_logs = "\n\n".join( [ "[Pull Requests]\n" + self.build_bucket("pr_opened", state["dates"]["pr_opened"], selected_year)["logs"], "[PR Activity]\n" + self.build_bucket("pr_activity", state["dates"]["pr_activity"], selected_year)["logs"], "[Community Threads]\n" + self.build_bucket("thread_opened", state["dates"]["thread_opened"], selected_year)["logs"], "[Community Activity]\n" + self.build_bucket("discussion_activity", state["dates"]["discussion_activity"], selected_year)["logs"], "[Fetch]\n" + "\n".join(logs), ] ) return { "username": username, "selected_year": selected_year, "fetched": True, "pr_opened": self.build_bucket("pr_opened", state["dates"]["pr_opened"], selected_year), "pr_activity": self.build_bucket("pr_activity", state["dates"]["pr_activity"], selected_year), "thread_opened": self.build_bucket("thread_opened", state["dates"]["thread_opened"], selected_year), "discussion_activity": self.build_bucket("discussion_activity", state["dates"]["discussion_activity"], selected_year), "logs": combined_logs, } def build_error_result( self, *, username: str, selected_year: int, error, logs: list[str], ): _ = logs return { "username": username, "selected_year": selected_year, "fetched": True, "pr_opened": build_failed_bucket("Pull Requests"), "pr_activity": build_failed_bucket("PR Activity"), "thread_opened": build_failed_bucket("Community Threads"), "discussion_activity": build_failed_bucket("Community Activity"), "logs": f"Community fetch failed: {str(error)}", } class BucketedActivityProcessor: def __init__( self, *, bucket_titles: dict[str, str], bucket_log_labels: dict[str, str], classifier, error_label: str, ): self.bucket_titles = bucket_titles self.bucket_log_labels = bucket_log_labels self.classifier = classifier self.error_label = error_label def init_state(self): return { "bucket_dates": {key: [] for key in self.bucket_titles}, "bucket_seen_ids": {key: set() for key in self.bucket_titles}, } def process_event( self, *, event: dict, event_ts, year_start, year_end, state: dict, ): if event_ts is None: return if not (year_start <= event_ts <= year_end): return bucket_key, unique_id = self.classifier(event) if not bucket_key or not unique_id: return if bucket_key not in state["bucket_dates"]: return if unique_id in state["bucket_seen_ids"][bucket_key]: return state["bucket_seen_ids"][bucket_key].add(unique_id) state["bucket_dates"][bucket_key].append(event_ts.tz_localize(None).date()) def build_result( self, *, username: str, selected_year: int, state: dict, logs: list[str], pages_fetched: int, oldest_event_ts_seen, seen_target_year: bool, crossed_before_year_start: bool, ): logs = list(logs) logs.append(f"Fetched {pages_fetched} page(s) from recent-activity.") if oldest_event_ts_seen is not None: logs.append(f"Oldest event reached: {oldest_event_ts_seen.date()}.") logs.append(f"Seen target year: {seen_target_year}.") logs.append(f"Crossed before year start: {crossed_before_year_start}.") logs.append( " | ".join( f"{self.bucket_titles[key]}: {len(state['bucket_dates'][key])}" for key in self.bucket_titles ) ) result = { "username": username, "selected_year": selected_year, "fetched": True, "logs": "\n".join(logs), } for key, title in self.bucket_titles.items(): dates = state["bucket_dates"][key] result[key] = build_fetched_bucket( title=title, dates=dates, selected_year=selected_year, label=self.bucket_log_labels[key], ) return result def build_error_result( self, *, username: str, selected_year: int, error, logs: list[str], ): _ = logs result = { "username": username, "selected_year": selected_year, "fetched": True, "logs": f"{self.error_label} fetch failed: {str(error)}", } for key, title in self.bucket_titles.items(): result[key] = build_failed_bucket(title) return result # ============================================================ # Activity classifiers + config # ============================================================ def classify_upvote_event(event: dict): target_type = event.get("targetType") target = event.get("target") or {} if not isinstance(target, dict): target = {} unique_id = event.get("targetUrl") or target.get("id") or target.get("_id") mapping = { "paper": "papers", "article": "blog", "changelog": "changelog", } bucket_key = mapping.get(target_type) if not bucket_key or not unique_id: return None, None return bucket_key, unique_id def classify_like_event(event: dict): repo_data = event.get("repoData") or {} if not isinstance(repo_data, dict): repo_data = {} repo_type = event.get("repoType") or repo_data.get("repoType") unique_id = event.get("repoId") or repo_data.get("id") mapping = { "model": "models", "dataset": "datasets", "space": "spaces", } bucket_key = mapping.get(repo_type) if not bucket_key or not unique_id: return None, None return bucket_key, unique_id def classify_paper_event(event: dict): event_type = event.get("type") paper = event.get("paper") or {} if not isinstance(paper, dict): paper = {} unique_id = paper.get("id") mapping = { "paper-daily": "daily", "paper": "authored", } bucket_key = mapping.get(event_type) if not bucket_key or not unique_id: return None, None return bucket_key, unique_id SINGLE_SECTION_CONFIG = { "upvotes": { "section_label": "upvotes activity", "empty_chart_title": "Upvotes", "empty_logs_text": "Upvotes activity not fetched yet.", "needs_fetch_text": "Upvotes activity needs to be fetched for the current username/year.", "default_mode": "all", "palette_name": "purple", "bucket_titles": { "papers": "Paper Upvotes", "blog": "Blog Upvotes", "changelog": "Changelog Upvotes", }, "bucket_log_labels": { "papers": "paper upvote(s)", "blog": "blog upvote(s)", "changelog": "changelog upvote(s)", }, "activity_type": "upvote", "cache_scope": "upvotes", "classifier": classify_upvote_event, "mode_labels": { "papers": "paper upvote(s)", "blog": "blog upvote(s)", "changelog": "changelog upvote(s)", }, "button_text": "Fetch upvotes activity", "empty_context_text": "Not logged in · Enter a username to fetch upvotes activity", "radio_choices": [ ("All", "all"), ("Papers", "papers"), ("Blog", "blog"), ("Changelog", "changelog"), ], "radio_label": "Upvotes view", "radio_elem_id": "upvotes-view-toggle", "logs_label": "Upvotes Fetch Logs", "logs_lines": 10, }, "likes": { "section_label": "likes activity", "empty_chart_title": "Likes", "empty_logs_text": "Likes activity not fetched yet.", "needs_fetch_text": "Likes activity needs to be fetched for the current username/year.", "default_mode": "all", "palette_name": "blue", "bucket_titles": { "models": "Model Likes", "datasets": "Dataset Likes", "spaces": "Space Likes", }, "bucket_log_labels": { "models": "model like(s)", "datasets": "dataset like(s)", "spaces": "space like(s)", }, "activity_type": "like", "cache_scope": "likes", "classifier": classify_like_event, "mode_labels": { "models": "model like(s)", "datasets": "dataset like(s)", "spaces": "space like(s)", }, "button_text": "Fetch likes activity", "empty_context_text": "Not logged in · Enter a username to fetch likes activity", "radio_choices": [ ("All", "all"), ("Models", "models"), ("Datasets", "datasets"), ("Spaces", "spaces"), ], "radio_label": "Likes view", "radio_elem_id": "likes-view-toggle", "logs_label": "Likes Fetch Logs", "logs_lines": 10, }, "papers": { "section_label": "papers activity", "empty_chart_title": "Papers", "empty_logs_text": "Papers activity not fetched yet.", "needs_fetch_text": "Papers activity needs to be fetched for the current username/year.", "default_mode": "all", "palette_name": "teal", "bucket_titles": { "daily": "Daily Papers", "authored": "Authored Papers", }, "bucket_log_labels": { "daily": "daily paper submission(s)", "authored": "authored paper(s)", }, "activity_type": "update-paper", "cache_scope": "papers", "classifier": classify_paper_event, "mode_labels": { "daily": "Daily Papers submission(s)", "authored": "authored paper(s)", }, "button_text": "Fetch papers activity", "empty_context_text": "Not logged in · Enter a username to fetch papers activity", "radio_choices": [ ("All", "all"), ("Daily Papers", "daily"), ("Authored", "authored"), ], "radio_label": "Papers view", "radio_elem_id": "papers-view-toggle", "logs_label": "Papers Fetch Logs", "logs_lines": 10, }, } def empty_single_section_state(section_key: str, selected_year: int): cfg = SINGLE_SECTION_CONFIG[section_key] return empty_bucketed_activity_state( selected_year=selected_year, logs_text=cfg["empty_logs_text"], bucket_titles=cfg["bucket_titles"], ) # ============================================================ # Public activity fetch functions # ============================================================ def fetch_all_community_activity(username: str, selected_year: int): username = (username or "").strip() selected_year = int(selected_year) cache_key = (username.lower(), selected_year, "community") cached = get_cache(cache_key) if cached: return cached if not username: state = empty_community_state(selected_year) state["fetched"] = True state["logs"] = "No username provided." set_cache(cache_key, state) return state processor = CommunityActivityProcessor(username=username) result = collect_recent_activity( username=username, selected_year=selected_year, activity_type="discussion", processor=processor, build_session_fn=build_session, rate_limiter_obj=recent_activity_rate_limiter, ) set_cache(cache_key, result) return result def fetch_single_section_activity(section_key: str, username: str, selected_year: int): username = (username or "").strip() selected_year = int(selected_year) cfg = SINGLE_SECTION_CONFIG[section_key] cache_key = (username.lower(), selected_year, cfg["cache_scope"]) cached = get_cache(cache_key) if cached: return cached if not username: state = empty_single_section_state(section_key, selected_year) state["fetched"] = True state["logs"] = "No username provided." set_cache(cache_key, state) return state processor = BucketedActivityProcessor( bucket_titles=cfg["bucket_titles"], bucket_log_labels=cfg["bucket_log_labels"], classifier=cfg["classifier"], error_label=cfg["cache_scope"].capitalize(), ) result = collect_recent_activity( username=username, selected_year=selected_year, activity_type=cfg["activity_type"], processor=processor, build_session_fn=build_session, rate_limiter_obj=recent_activity_rate_limiter, ) set_cache(cache_key, result) return result # ============================================================ # Contribution grid helpers # ============================================================ PALETTES = { "green": ["#0e4429", "#006d32", "#26a641", "#39d353"], "blue": ["#0a3069", "#0969da", "#54aeff", "#79c0ff"], "pink": ["#6e1f4d", "#bf3989", "#db61a2", "#f778ba"], "orange": ["#5f370e", "#a14d00", "#d97706", "#f59e0b"], "purple": ["#4c1d95", "#6d28d9", "#8b5cf6", "#c4b5fd"], "teal": ["#0f766e", "#0d9488", "#14b8a6", "#5eead4"], } TOOLTIP_SAFE_COLUMNS = 2 def level_from_count(count: int) -> int: if count <= 0: return 0 if count == 1: return 1 if count <= 3: return 2 if count <= 7: return 3 return 4 def level_color(palette_name: str, level: int) -> str: if level <= 0: return "var(--gh-empty-cell)" palette = PALETTES[palette_name] return palette[min(level - 1, len(palette) - 1)] def build_daily_count_map(df: pd.DataFrame): if df.empty: return {} counts = ( df.copy() .assign(date=pd.to_datetime(df["date"]).dt.date, count=1) .groupby("date", as_index=False)["count"] .sum() ) return {row["date"]: int(row["count"]) for _, row in counts.iterrows()} def get_calendar_bounds(year: int): start = pd.Timestamp(f"{year}-01-01") end = pd.Timestamp(f"{year}-12-31") grid_start = start - pd.Timedelta(days=start.dayofweek) grid_end = end + pd.Timedelta(days=(6 - end.dayofweek)) all_days = pd.date_range(grid_start, grid_end, freq="D") num_weeks = len(all_days) // 7 return start, end, grid_start, grid_end, all_days, num_weeks def month_positions_for_year(year: int, grid_start: pd.Timestamp): month_starts = pd.date_range(f"{year}-01-01", f"{year}-12-31", freq="MS") return [(d.strftime("%b"), int((d - grid_start).days // 7)) for d in month_starts] # ============================================================ # Name helpers # ============================================================ def resolve_display_name( username: str, profile: gr.OAuthProfile | None, identity: dict | None = None, ): username = (username or "").strip() if not username: return "" profile_username = get_viewer_username(profile) profile_name = getattr(profile, "name", None) if profile is not None else None if profile_username and profile_username.lower() == username.lower(): if isinstance(profile_name, str) and profile_name.strip(): return f"{profile_name.strip()} (@{profile_username})" return f"@{profile_username}" if identity: display_name = identity.get("display_name") if isinstance(display_name, str) and display_name.strip(): return display_name fullname = identity.get("fullname") identity_username = identity.get("username") or username if isinstance(fullname, str) and fullname.strip(): return f"{fullname.strip()} (@{identity_username})" if isinstance(identity_username, str) and identity_username.strip(): return f"@{identity_username.strip()}" return f"@{username}" # ============================================================ # HTML render helpers # ============================================================ def render_status_badge(text: str): return f'
{text}
' def render_metric_cards( total_activity: int, created_by_type: dict, repo_commit_counts_by_type: dict, selected_year: int, ): return f"""
Total activity
{total_activity}
Creations + repo contributions in {selected_year}
{REPO_KIND_EMOJIS["model"]} Models created
{created_by_type["model"]}
{repo_commit_counts_by_type["model"]} contributions on model repos
{REPO_KIND_EMOJIS["dataset"]} Datasets created
{created_by_type["dataset"]}
{repo_commit_counts_by_type["dataset"]} contributions on dataset repos
{REPO_KIND_EMOJIS["space"]} Spaces created
{created_by_type["space"]}
{repo_commit_counts_by_type["space"]} contributions on space repos
""" # ============================================================ # Generic banner helpers # ============================================================ def viewer_banner_prefix(profile: gr.OAuthProfile | None) -> list[str]: viewer = get_viewer_username(profile) if viewer: return [f"Logged in as {html.escape(viewer)}"] return ["Not logged in"] def render_generic_invalid_banner(username: str, error_text: str | None = None): username = (username or "").strip() message = error_text or f"Username or org '{username}' was not found." return render_status_badge(html.escape(message)) def render_section_idle_banner( section_label: str, username, selected_year, profile: gr.OAuthProfile | None, ): username = (username or "").strip() selected_year = int(selected_year) parts = viewer_banner_prefix(profile) if not username: parts.append(f"Enter a username to fetch {section_label.lower()}") return render_status_badge(" · ".join(parts)) if is_self_view(username, profile): parts.append(f"Fetch your {section_label.lower()} or enter another username") else: parts.append(f"Ready to fetch {section_label.lower()} for {html.escape(username)}") parts.append(str(selected_year)) return render_status_badge(" · ".join(parts)) def render_section_fetching_banner( section_label: str, username, selected_year, profile: gr.OAuthProfile | None, ): username = (username or "").strip() selected_year = int(selected_year) if not username: return render_section_idle_banner(section_label, "", selected_year, profile) parts = viewer_banner_prefix(profile) if is_self_view(username, profile): parts.append(f"Fetching your {section_label.lower()}") else: parts.append(f"Fetching {section_label.lower()} for {html.escape(username)}") parts.append(str(selected_year)) return render_status_badge(" · ".join(parts)) def render_section_loaded_banner( section_label: str, state: dict | None, profile: gr.OAuthProfile | None, ): year = state.get("selected_year", datetime.now().year) if state else datetime.now().year if not state or not state.get("username"): return render_section_idle_banner(section_label, "", year, profile) username = state["username"] selected_year = int(state["selected_year"]) parts = viewer_banner_prefix(profile) if is_self_view(username, profile): parts.append(f"Viewing your {section_label.lower()}") else: parts.append(f"Viewing {section_label.lower()} for {html.escape(username)}") parts.append(str(selected_year)) return render_status_badge(" · ".join(parts)) # ============================================================ # Repo-specific banners # ============================================================ def render_idle_context_banner( username: str, profile: gr.OAuthProfile | None, ): username = (username or "").strip() viewer = get_viewer_username(profile) if viewer: if not username or username.lower() == viewer.lower(): return render_status_badge( f"Logged in as {html.escape(viewer)} · Fetch your activity or enter another username" ) return render_status_badge( f"Logged in as {html.escape(viewer)} · Ready to fetch {html.escape(username)} · Public repos only" ) if username: return render_status_badge( f"Not logged in · Ready to fetch {html.escape(username)} · Public repos only" ) return render_status_badge("Not logged in · Enter a username to get started") def render_context_banner(app_state: dict | None, profile: gr.OAuthProfile | None): if not app_state or not app_state.get("username"): return render_idle_context_banner("", profile) username = app_state["username"] selected_year = int(app_state["selected_year"]) is_own = bool(app_state.get("is_own_profile", False)) can_toggle = bool(app_state.get("can_toggle_visibility", False)) viewer = get_viewer_username(profile) parts = [] if viewer: parts.append(f"Logged in as {html.escape(viewer)}") else: parts.append("Not logged in") if is_self_view(username, profile): parts.append("Viewing your activity") else: parts.append(f"Viewing {html.escape(username)}") parts.append(str(selected_year)) if is_own and can_toggle: parts.append("Private repos available") else: parts.append("Public repos only") return render_status_badge(" · ".join(parts)) def render_fetching_context_banner( username, selected_year, profile: gr.OAuthProfile | None, ): username = (username or "").strip() selected_year = int(selected_year) viewer = get_viewer_username(profile) if not username: return render_idle_context_banner("", profile) parts = [] if viewer: parts.append(f"Logged in as {html.escape(viewer)}") else: parts.append("Not logged in") if is_self_view(username, profile): parts.append("Fetching your activity") parts.append(str(selected_year)) parts.append("Checking public and private repos") else: parts.append(f"Fetching {html.escape(username)}") parts.append(str(selected_year)) parts.append("Checking public repos") return render_status_badge(" · ".join(parts)) def render_empty_chart(title: str): safe_title = html.escape(title) return f"""
{safe_title}
No activity found.
""" def render_contrib_chart( df: pd.DataFrame, year: int, title: str, palette_name: str = "green", subtitle: str | None = None, ): safe_title = html.escape(title) safe_subtitle = html.escape(subtitle) if subtitle else "" counts_map = build_daily_count_map(df) year_start, year_end, grid_start, _, all_days, num_weeks = get_calendar_bounds(year) months = month_positions_for_year(year, grid_start) padded_weeks = num_weeks + (2 * TOOLTIP_SAFE_COLUMNS) cells_html = [] total = 0 for day in all_days: day_date = day.date() in_year = year_start.date() <= day_date <= year_end.date() count = counts_map.get(day_date, 0) if in_year else 0 total += count week = int((day - grid_start).days // 7) display_week = week + TOOLTIP_SAFE_COLUMNS dow = int(day.dayofweek) level = level_from_count(count) color = level_color(palette_name, level) if in_year else "transparent" label = f"{count} contribution{'s' if count != 1 else ''} on {day.strftime('%b %d, %Y')}" if in_year: classes = ["gh-cell"] if dow == 0: classes.append("gh-tooltip-bottom") if week <= 1: classes.append("gh-tooltip-right") elif week >= num_weeks - 2: classes.append("gh-tooltip-left") class_attr = " ".join(classes) safe_label = html.escape(label) cells_html.append( f""" """ ) else: cells_html.append( f""" """ ) month_labels = [] for label, pos in months: display_pos = pos + TOOLTIP_SAFE_COLUMNS month_labels.append( f'
{html.escape(label)}
' ) legend_colors = ["var(--gh-empty-cell)"] + PALETTES[palette_name] legend_html = "".join( f'' for c in legend_colors ) subtitle_html = f'
{safe_subtitle}
' if subtitle else "" return f"""
{safe_title}
{subtitle_html}
{total} contributions
{''.join(month_labels)}
Mon
Wed
Fri
{''.join(cells_html)}
Less {legend_html} More
""" def merge_dates(opened_dates: list, activity_dates: list, mode: str): if mode == "opened": return list(opened_dates) if mode == "activity": return list(activity_dates) return list(opened_dates) + list(activity_dates) def render_dual_mode_chart( state: dict | None, *, opened_key: str, activity_key: str, mode: str, username: str, year: int, title: str, palette_name: str, opened_label: str, activity_label: str, total_label: str, ): if not state or not state.get("fetched"): return render_empty_chart(title) opened = state[opened_key]["dates"] activity = state[activity_key]["dates"] merged = merge_dates(opened, activity, mode) df = pd.DataFrame(merged, columns=["date"]) subtitle = { "opened": f"{len(opened)} {opened_label} by {username} in {year}", "activity": f"{len(activity)} {activity_label} for {username} in {year}", "both": f"{len(opened) + len(activity)} total {total_label} for {username} in {year}", }[mode] return render_contrib_chart( df, year, title, palette_name=palette_name, subtitle=subtitle, ) def render_pr_combined_chart(community_state: dict | None, mode: str, username: str, year: int): return render_dual_mode_chart( community_state, opened_key="pr_opened", activity_key="pr_activity", mode=mode, username=username, year=year, title="Pull Requests", palette_name="purple", opened_label="PRs opened", activity_label="PR follow-up events", total_label="PR events", ) def render_discussion_combined_chart(community_state: dict | None, mode: str, username: str, year: int): return render_dual_mode_chart( community_state, opened_key="thread_opened", activity_key="discussion_activity", mode=mode, username=username, year=year, title="Community Discussions", palette_name="teal", opened_label="threads opened", activity_label="discussion follow-up events", total_label="discussion events", ) def render_single_section_chart(section_key: str, state: dict | None, mode: str, username: str, year: int): cfg = SINGLE_SECTION_CONFIG[section_key] if not state or not state.get("fetched"): return render_empty_chart(cfg["empty_chart_title"]) bucket_keys = list(cfg["bucket_titles"].keys()) bucket_dates = {key: state[key]["dates"] for key in bucket_keys} if mode in bucket_dates: merged = list(bucket_dates[mode]) subtitle = f"{len(merged)} {cfg['mode_labels'][mode]} by {username} in {year}" else: merged = [d for key in bucket_keys for d in bucket_dates[key]] subtitle = f"{len(merged)} total {cfg['empty_chart_title'].lower()} event(s) by {username} in {year}" df = pd.DataFrame(merged, columns=["date"]) return render_contrib_chart( df, year, cfg["empty_chart_title"], palette_name=cfg["palette_name"], subtitle=subtitle, ) # ============================================================ # App state + render layer # ============================================================ def build_empty_repo_kind_data(): return { kind: {"total_items": 0, "repos": [], "errors": []} for kind in REPO_KINDS } def build_empty_app_state( username: str, selected_year: int, viewer_username: str | None = None, identity: dict | None = None, ): resolved_identity = identity or ( validate_username(username, None) if username else empty_identity("", "No username provided.") ) return { "username": username, "selected_year": int(selected_year), "viewer_username": viewer_username, "is_own_profile": False, "can_toggle_visibility": False, "default_visibility": "public", "kind_repo_data": build_empty_repo_kind_data(), "identity": resolved_identity, "is_valid_username": bool(resolved_identity.get("exists")), "validation_error": resolved_identity.get("error"), } def build_app_state( username: str, selected_year: int, profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None, ): username = (username or "").strip() selected_year = int(selected_year) viewer_username = get_viewer_username(profile) token = oauth_token.token if oauth_token is not None else None if not username: return build_empty_app_state(username, selected_year, viewer_username) identity = validate_username(username, profile) if not identity.get("exists"): return { **build_empty_app_state( username, selected_year, viewer_username, identity=identity, ), "identity": identity, "is_valid_username": False, "validation_error": identity.get("error") or f"Username or org '{username}' was not found.", } is_own_profile = bool( viewer_username and username and viewer_username.lower() == username.lower() and token ) can_toggle_visibility = is_own_profile token_for_fetch = token if is_own_profile else None scope = "repo_private" if is_own_profile else "repo_public" cache_key = (username.lower(), selected_year, scope) cached = get_cache(cache_key) if cached: cached_with_identity = dict(cached) cached_with_identity["identity"] = identity cached_with_identity["is_valid_username"] = True cached_with_identity["validation_error"] = None return cached_with_identity kind_repo_data = {} for kind in REPO_KINDS: kind_repo_data[kind] = collect_kind_repo_entries( username=username, kind=kind, selected_year=selected_year, token=token_for_fetch, ) result = { "username": username, "selected_year": selected_year, "viewer_username": viewer_username, "is_own_profile": is_own_profile, "can_toggle_visibility": can_toggle_visibility, "default_visibility": "all" if can_toggle_visibility else "public", "kind_repo_data": kind_repo_data, "identity": identity, "is_valid_username": True, "validation_error": None, } set_cache(cache_key, result) return result def render_invalid_app_state(app_state: dict | None): username = (app_state or {}).get("username", "") selected_year = int((app_state or {}).get("selected_year", datetime.now().year)) error_text = (app_state or {}).get("validation_error") or f"Username or org '{username}' was not found." empty_counts = {kind: 0 for kind in REPO_KINDS} empty_metrics = render_metric_cards( 0, empty_counts, empty_counts, selected_year, ) return ( "## Ready", render_generic_invalid_banner(username, error_text), empty_metrics, render_empty_chart("All activity"), render_empty_chart("Models"), render_empty_chart("Datasets"), render_empty_chart("Spaces"), error_text, ) def render_app_state( app_state: dict | None, visibility_mode: str | None, profile: gr.OAuthProfile | None, ): if not app_state or not app_state.get("username"): selected_year = datetime.now().year empty_counts = {kind: 0 for kind in REPO_KINDS} empty_metrics = render_metric_cards( 0, empty_counts, empty_counts, selected_year, ) context_html = render_idle_context_banner("", profile) return ( "## Ready", context_html, empty_metrics, render_empty_chart("All activity"), render_empty_chart("Models"), render_empty_chart("Datasets"), render_empty_chart("Spaces"), "No username provided.", ) if not app_state.get("is_valid_username", True): return render_invalid_app_state(app_state) username = app_state["username"] selected_year = int(app_state["selected_year"]) can_toggle_visibility = bool(app_state["can_toggle_visibility"]) effective_visibility = "all" if (visibility_mode == "all" and can_toggle_visibility) else "public" include_private = effective_visibility == "all" kind_views = {} warnings = [] for kind in REPO_KINDS: kind_data = app_state["kind_repo_data"][kind] kind_views[kind] = aggregate_repo_entries( kind_data["repos"], include_private=include_private, ) warnings.extend(kind_data["errors"]) total_activity = sum(v["activity_count"] for v in kind_views.values()) all_activity_dates = [] for kind in REPO_KINDS: all_activity_dates.extend(kind_views[kind]["activity_dates"]) all_df = pd.DataFrame(all_activity_dates, columns=["date"]) kind_dfs = { kind: pd.DataFrame(kind_views[kind]["activity_dates"], columns=["date"]) for kind in REPO_KINDS } context_html = render_context_banner(app_state, profile) display_name = resolve_display_name(username, profile, app_state.get("identity")) metrics_html = render_metric_cards( total_activity=total_activity, created_by_type={kind: kind_views[kind]["created_count"] for kind in REPO_KINDS}, repo_commit_counts_by_type={kind: kind_views[kind]["repo_commit_count"] for kind in REPO_KINDS}, selected_year=selected_year, ) all_chart_html = render_contrib_chart( all_df, selected_year, f"{display_name}'s activity", palette_name="green", subtitle=f"Repo creations + repo contributions in {selected_year}", ) kind_chart_html = {} for kind in REPO_KINDS: kind_chart_html[kind] = render_contrib_chart( kind_dfs[kind], selected_year, REPO_KIND_LABELS[kind], palette_name=REPO_KIND_PALETTES[kind], subtitle=( f'{kind_views[kind]["created_count"]} created • ' f'{kind_views[kind]["repo_commit_count"]} repo contributions' ), ) logs_text = "\n".join(warnings) if warnings else "Done." return ( f"## {display_name}'s activity in {selected_year}", context_html, metrics_html, all_chart_html, kind_chart_html["model"], kind_chart_html["dataset"], kind_chart_html["space"], logs_text, ) # ============================================================ # Standardized section responses # ============================================================ def build_invalid_single_section_response( section_key: str, username: str, selected_year: int, identity: dict, ): cfg = SINGLE_SECTION_CONFIG[section_key] state = build_invalid_section_state( username, selected_year, lambda y: empty_single_section_state(section_key, y), identity, ) return ( state, render_generic_invalid_banner(username, identity.get("error")), gr.update(value=cfg["default_mode"], visible="hidden"), render_empty_chart(cfg["empty_chart_title"]), state["logs"], ) def build_idle_single_section_response( section_key: str, username: str, selected_year: int, profile: gr.OAuthProfile | None, logs_text: str | None = None, ): cfg = SINGLE_SECTION_CONFIG[section_key] state = empty_single_section_state(section_key, selected_year) return ( state, render_section_idle_banner(cfg["section_label"], username, selected_year, profile), gr.update(value=cfg["default_mode"], visible="hidden"), render_empty_chart(cfg["empty_chart_title"]), logs_text or cfg["empty_logs_text"], ) def build_invalid_community_response( username: str, selected_year: int, identity: dict, ): state = build_invalid_section_state(username, selected_year, empty_community_state, identity) return ( state, render_generic_invalid_banner(username, identity.get("error")), gr.update(value="both", visible="hidden"), render_empty_chart("Pull Requests"), gr.update(value="both", visible="hidden"), render_empty_chart("Community Discussions"), state["logs"], ) def build_idle_community_response( username: str, selected_year: int, profile: gr.OAuthProfile | None, logs_text: str, ): state = empty_community_state(selected_year) return ( state, render_section_idle_banner("community activity", username, selected_year, profile), gr.update(value="both", visible="hidden"), render_empty_chart("Pull Requests"), gr.update(value="both", visible="hidden"), render_empty_chart("Community Discussions"), logs_text, ) # ============================================================ # Stable return-based fetch / reset logic # ============================================================ def prepare_repo_fetch_ui( username, selected_year, current_visibility_mode, profile: gr.OAuthProfile | None, ): username = (username or "").strip() _ = int(selected_year) if is_self_view(username, profile): value = current_visibility_mode if current_visibility_mode in {"all", "public"} else "all" visibility_update = gr.update( choices=[("All repos", "all"), ("Public only", "public")], value=value, visible=True, interactive=True, ) else: visibility_update = gr.update( choices=[("All repos", "all"), ("Public only", "public")], value="public", visible="hidden", interactive=False, ) return render_fetching_context_banner(username, selected_year, profile), visibility_update def fetch_and_render_repo_only( username, selected_year, current_visibility_mode, profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None, ): username = (username or "").strip() selected_year = int(selected_year) app_state = build_app_state(username, selected_year, profile, oauth_token) if not app_state.get("is_valid_username", True): title_md, context_box, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box = render_invalid_app_state( app_state ) visibility_update = gr.update( choices=[("All repos", "all"), ("Public only", "public")], value="public", visible="hidden", interactive=False, ) return ( app_state, title_md, context_box, visibility_update, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box, ) requested_visibility = current_visibility_mode if current_visibility_mode in {"all", "public"} else app_state["default_visibility"] final_visibility = requested_visibility if app_state["can_toggle_visibility"] else "public" title_md, context_box, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box = render_app_state( app_state, final_visibility, profile, ) visibility_update = gr.update( choices=[("All repos", "all"), ("Public only", "public")], value=final_visibility, visible=True if app_state["can_toggle_visibility"] else "hidden", interactive=bool(app_state["can_toggle_visibility"]), ) return ( app_state, title_md, context_box, visibility_update, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box, ) def rerender_from_state(app_state, visibility_mode, profile: gr.OAuthProfile | None): return render_app_state(app_state, visibility_mode, profile) def maybe_reset_community_section( current_community_state, username, selected_year, pr_mode, discussion_mode, profile: gr.OAuthProfile | None, ): username = (username or "").strip() selected_year = int(selected_year) pr_mode = pr_mode or "both" discussion_mode = discussion_mode or "both" identity = resolve_identity(username, profile) if is_invalid_identity(identity, username): return build_invalid_community_response(username, selected_year, identity) if not current_community_state: return build_idle_community_response( username, selected_year, profile, "Community activity not fetched yet.", ) previous_username = (current_community_state.get("username") or "").strip() previous_year = int(current_community_state.get("selected_year") or selected_year) same_username = previous_username.lower() == username.lower() same_year = previous_year == selected_year if same_username and same_year and current_community_state.get("fetched"): pr_chart = render_pr_combined_chart( current_community_state, pr_mode, username=username, year=selected_year, ) discussion_chart = render_discussion_combined_chart( current_community_state, discussion_mode, username=username, year=selected_year, ) return ( current_community_state, render_section_loaded_banner("community activity", current_community_state, profile), gr.update(value=pr_mode, visible=True), pr_chart, gr.update(value=discussion_mode, visible=True), discussion_chart, current_community_state.get("logs", "Community activity already fetched."), ) return build_idle_community_response( username, selected_year, profile, "Community activity needs to be fetched for the current username/year.", ) def prepare_single_section_fetch_ui( section_key: str, username, selected_year, profile: gr.OAuthProfile | None, ): username = (username or "").strip() section_label = SINGLE_SECTION_CONFIG[section_key]["section_label"] identity = resolve_identity(username, profile) if is_invalid_identity(identity, username): return render_generic_invalid_banner(username, identity.get("error")) return render_section_fetching_banner(section_label, username, selected_year, profile) def maybe_reset_single_chart_section( section_key: str, current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): cfg = SINGLE_SECTION_CONFIG[section_key] username = (username or "").strip() selected_year = int(selected_year) mode = mode or cfg["default_mode"] identity = resolve_identity(username, profile) if is_invalid_identity(identity, username): return build_invalid_single_section_response(section_key, username, selected_year, identity) if not current_state: return build_idle_single_section_response( section_key, username, selected_year, profile, cfg["empty_logs_text"], ) previous_username = (current_state.get("username") or "").strip() previous_year = int(current_state.get("selected_year") or selected_year) same_username = previous_username.lower() == username.lower() same_year = previous_year == selected_year if same_username and same_year and current_state.get("fetched"): chart = render_single_section_chart( section_key, current_state, mode, username=username, year=selected_year, ) return ( current_state, render_section_loaded_banner(cfg["section_label"], current_state, profile), gr.update(value=mode, visible=True), chart, current_state.get("logs", "Already fetched."), ) return build_idle_single_section_response( section_key, username, selected_year, profile, cfg["needs_fetch_text"], ) def fetch_and_render_single_chart_section( section_key: str, current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): _ = current_state cfg = SINGLE_SECTION_CONFIG[section_key] username = (username or "").strip() selected_year = int(selected_year) mode = mode or cfg["default_mode"] identity = resolve_identity(username, profile) if is_invalid_identity(identity, username): state, context_box, mode_update, chart, logs_text = build_invalid_single_section_response( section_key, username, selected_year, identity, ) return ( state, context_box, chart, logs_text, mode_update, ) state = fetch_single_section_activity(section_key, username, selected_year) chart = render_single_section_chart( section_key, state, mode, username=username, year=selected_year, ) return ( state, render_section_loaded_banner(cfg["section_label"], state, profile), chart, state["logs"], gr.update(value=mode, visible=True), ) def rerender_single_chart_from_state(section_key: str, state, mode): cfg = SINGLE_SECTION_CONFIG[section_key] if not state: return render_empty_chart(cfg["empty_chart_title"]) return render_single_section_chart( section_key, state, mode or cfg["default_mode"], username=(state.get("username") or ""), year=int(state.get("selected_year") or datetime.now().year), ) def prepare_community_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None): username = (username or "").strip() identity = resolve_identity(username, profile) if is_invalid_identity(identity, username): return render_generic_invalid_banner(username, identity.get("error")) return render_section_fetching_banner("community activity", username, selected_year, profile) def fetch_and_render_community( current_state, username, selected_year, pr_mode, discussion_mode, profile: gr.OAuthProfile | None, ): username = (username or "").strip() selected_year = int(selected_year) pr_mode = pr_mode or "both" discussion_mode = discussion_mode or "both" identity = resolve_identity(username, profile) if is_invalid_identity(identity, username): state, context_box, pr_mode_update, pr_chart, discussion_mode_update, discussion_chart, logs_text = build_invalid_community_response( username, selected_year, identity, ) return ( state, context_box, pr_chart, discussion_chart, logs_text, pr_mode_update, discussion_mode_update, ) state = fetch_all_community_activity(username, selected_year) pr_chart = render_pr_combined_chart( state, pr_mode, username=username, year=selected_year, ) discussion_chart = render_discussion_combined_chart( state, discussion_mode, username=username, year=selected_year, ) return ( state, render_section_loaded_banner("community activity", state, profile), pr_chart, discussion_chart, state["logs"], gr.update(value=pr_mode, visible=True), gr.update(value=discussion_mode, visible=True), ) def rerender_pr_from_state(community_state, pr_mode): if not community_state: return render_empty_chart("Pull Requests") return render_pr_combined_chart( community_state, pr_mode or "both", username=(community_state.get("username") or ""), year=int(community_state.get("selected_year") or datetime.now().year), ) def rerender_discussions_from_state(community_state, discussion_mode): if not community_state: return render_empty_chart("Community Discussions") return render_discussion_combined_chart( community_state, discussion_mode or "both", username=(community_state.get("username") or ""), year=int(community_state.get("selected_year") or datetime.now().year), ) # ============================================================ # Explicit wrappers for single sections # ============================================================ def prepare_upvotes_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None): return prepare_single_section_fetch_ui("upvotes", username, selected_year, profile) def prepare_likes_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None): return prepare_single_section_fetch_ui("likes", username, selected_year, profile) def prepare_papers_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None): return prepare_single_section_fetch_ui("papers", username, selected_year, profile) def fetch_and_render_upvotes_section( current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): return fetch_and_render_single_chart_section( "upvotes", current_state, username, selected_year, mode, profile, ) def fetch_and_render_likes_section( current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): return fetch_and_render_single_chart_section( "likes", current_state, username, selected_year, mode, profile, ) def fetch_and_render_papers_section( current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): return fetch_and_render_single_chart_section( "papers", current_state, username, selected_year, mode, profile, ) def rerender_upvotes_from_state(state, mode): return rerender_single_chart_from_state("upvotes", state, mode) def rerender_likes_from_state(state, mode): return rerender_single_chart_from_state("likes", state, mode) def rerender_papers_from_state(state, mode): return rerender_single_chart_from_state("papers", state, mode) def maybe_reset_upvotes_section( current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): return maybe_reset_single_chart_section( "upvotes", current_state, username, selected_year, mode, profile, ) def maybe_reset_likes_section( current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): return maybe_reset_single_chart_section( "likes", current_state, username, selected_year, mode, profile, ) def maybe_reset_papers_section( current_state, username, selected_year, mode, profile: gr.OAuthProfile | None, ): return maybe_reset_single_chart_section( "papers", current_state, username, selected_year, mode, profile, ) def hydrate_header_on_load( current_username, selected_year, profile: gr.OAuthProfile | None, ): current_username = (current_username or "").strip() selected_year = int(selected_year) profile_username = get_viewer_username(profile) username_update = gr.update() effective_username = current_username if not effective_username and profile_username: username_update = gr.update(value=profile_username) effective_username = profile_username repo_context_html = render_idle_context_banner(effective_username, profile) community_context_html = render_section_idle_banner("community activity", effective_username, selected_year, profile) upvotes_context_html = render_section_idle_banner("upvotes activity", effective_username, selected_year, profile) likes_context_html = render_section_idle_banner("likes activity", effective_username, selected_year, profile) papers_context_html = render_section_idle_banner("papers activity", effective_username, selected_year, profile) return ( username_update, repo_context_html, community_context_html, upvotes_context_html, likes_context_html, papers_context_html, ) # ============================================================ # UI helpers # ============================================================ def build_single_chart_section_ui(section_key: str): cfg = SINGLE_SECTION_CONFIG[section_key] gr.Markdown(f"## {cfg['section_label'].capitalize()}") fetch_button = gr.Button(cfg["button_text"], variant="secondary") context_box = gr.HTML(render_status_badge(cfg["empty_context_text"])) with gr.Column(elem_classes="toggles-container"): view_mode = gr.Radio( choices=cfg["radio_choices"], value=cfg["default_mode"], label=cfg["radio_label"], visible="hidden", elem_classes=["gh-toggle-radio"], elem_id=cfg["radio_elem_id"], ) chart = gr.HTML(render_empty_chart(cfg["empty_chart_title"])) logs_box = gr.Textbox( label=cfg["logs_label"], interactive=False, lines=cfg["logs_lines"], value=cfg["empty_logs_text"], ) return { "fetch_button": fetch_button, "context_box": context_box, "view_mode": view_mode, "chart": chart, "logs_box": logs_box, } def wire_single_chart_section( *, fetch_button, context_box, state_component, mode_component, chart_component, logs_component, username_component, year_component, prepare_fn, fetch_fn, rerender_fn, ): fetch_button.click( fn=prepare_fn, inputs=[username_component, year_component], outputs=context_box, show_progress="hidden", ).then( fn=fetch_fn, inputs=[state_component, username_component, year_component, mode_component], outputs=[state_component, context_box, chart_component, logs_component, mode_component], show_progress="full", show_progress_on=[chart_component, logs_component], ) mode_component.change( fn=rerender_fn, inputs=[state_component, mode_component], outputs=chart_component, show_progress="hidden", ) # ============================================================ # UI # ============================================================ year_options = list(range(datetime.now().year, 2017, -1)) with gr.Blocks(title="HF Contributions") as demo: app_state = gr.State(value=None) community_state = gr.State(value=None) section_states = { section_key: gr.State(value=None) for section_key in SINGLE_SECTION_CONFIG } gr.Markdown("# 🤗 Hugging Face Contributions") with gr.Row(elem_id="top-controls"): gr.LoginButton("Sign in with Hugging Face", logout_value="Logout ({})") username = gr.Textbox( label="Username / org", placeholder="Enter a Hugging Face username or org", scale=3, ) selected_year = gr.Dropdown( choices=year_options, value=year_options[0], label="Year", scale=1, ) run_button = gr.Button("Fetch activity", variant="primary", scale=1) context_box = gr.HTML( render_status_badge("Not logged in · Enter a username to get started") ) title_md = gr.Markdown("## Ready") with gr.Column(elem_classes="toggles-container"): visibility_mode = gr.Radio( choices=[("All repos", "all"), ("Public only", "public")], value="public", label="Visibility", visible="hidden", interactive=False, elem_classes=["gh-toggle-radio"], elem_id="repo-visibility-toggle", ) zero_counts = {kind: 0 for kind in REPO_KINDS} metrics_box = gr.HTML( render_metric_cards( 0, zero_counts, zero_counts, year_options[0], ) ) all_chart = gr.HTML(render_empty_chart("All activity")) model_chart = gr.HTML(render_empty_chart("Models")) dataset_chart = gr.HTML(render_empty_chart("Datasets")) space_chart = gr.HTML(render_empty_chart("Spaces")) logs_box = gr.Textbox(label="Logs / Warnings", interactive=False, lines=8) gr.Markdown("## Community activity") fetch_community_button = gr.Button("Fetch community activity", variant="secondary") community_context_box = gr.HTML( render_status_badge("Not logged in · Enter a username to fetch community activity") ) with gr.Column(elem_classes="toggles-container"): pr_view_mode = gr.Radio( choices=[ ("Opened", "opened"), ("Follow-up", "activity"), ("Both", "both"), ], value="both", label="Pull Requests view", visible="hidden", elem_classes=["gh-toggle-radio"], elem_id="pr-view-toggle", ) pr_chart = gr.HTML(render_empty_chart("Pull Requests")) with gr.Column(elem_classes="toggles-container"): discussion_view_mode = gr.Radio( choices=[ ("Opened", "opened"), ("Follow-up", "activity"), ("Both", "both"), ], value="both", label="Community Discussions view", visible="hidden", elem_classes=["gh-toggle-radio"], elem_id="discussion-view-toggle", ) discussion_chart = gr.HTML(render_empty_chart("Community Discussions")) community_logs_box = gr.Textbox( label="Community Fetch Logs", interactive=False, lines=12, value="Community activity not fetched yet.", ) single_section_components = {} for section_key in ("upvotes", "likes", "papers"): single_section_components[section_key] = build_single_chart_section_ui(section_key) repo_fetch_outputs = [ app_state, title_md, context_box, visibility_mode, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box, ] repo_render_outputs = [ title_md, context_box, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box, ] community_reset_outputs = [ community_state, community_context_box, pr_view_mode, pr_chart, discussion_view_mode, discussion_chart, community_logs_box, ] community_fetch_outputs = [ community_state, community_context_box, pr_chart, discussion_chart, community_logs_box, pr_view_mode, discussion_view_mode, ] single_section_outputs = {} for section_key, components in single_section_components.items(): single_section_outputs[f"{section_key}_reset"] = [ section_states[section_key], components["context_box"], components["view_mode"], components["chart"], components["logs_box"], ] single_section_outputs[f"{section_key}_fetch"] = [ section_states[section_key], components["context_box"], components["chart"], components["logs_box"], components["view_mode"], ] demo.load( fn=hydrate_header_on_load, inputs=[username, selected_year], outputs=[username, context_box, community_context_box] + [single_section_components[k]["context_box"] for k in ("upvotes", "likes", "papers")], show_progress="hidden", ) repo_chain = run_button.click( fn=prepare_repo_fetch_ui, inputs=[username, selected_year, visibility_mode], outputs=[context_box, visibility_mode], show_progress="hidden", ).then( fn=fetch_and_render_repo_only, inputs=[username, selected_year, visibility_mode], outputs=repo_fetch_outputs, show_progress="full", show_progress_on=[ metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box, ], ).then( fn=maybe_reset_community_section, inputs=[community_state, username, selected_year, pr_view_mode, discussion_view_mode], outputs=community_reset_outputs, show_progress="hidden", ).then( fn=maybe_reset_upvotes_section, inputs=[ section_states["upvotes"], username, selected_year, single_section_components["upvotes"]["view_mode"], ], outputs=single_section_outputs["upvotes_reset"], show_progress="hidden", ).then( fn=maybe_reset_likes_section, inputs=[ section_states["likes"], username, selected_year, single_section_components["likes"]["view_mode"], ], outputs=single_section_outputs["likes_reset"], show_progress="hidden", ).then( fn=maybe_reset_papers_section, inputs=[ section_states["papers"], username, selected_year, single_section_components["papers"]["view_mode"], ], outputs=single_section_outputs["papers_reset"], show_progress="hidden", ) visibility_mode.change( fn=rerender_from_state, inputs=[app_state, visibility_mode], outputs=repo_render_outputs, show_progress="hidden", ) fetch_community_button.click( fn=prepare_community_fetch_ui, inputs=[username, selected_year], outputs=community_context_box, show_progress="hidden", ).then( fn=fetch_and_render_community, inputs=[community_state, username, selected_year, pr_view_mode, discussion_view_mode], outputs=community_fetch_outputs, show_progress="full", show_progress_on=[ pr_chart, discussion_chart, community_logs_box, ], ) pr_view_mode.change( fn=rerender_pr_from_state, inputs=[community_state, pr_view_mode], outputs=pr_chart, show_progress="hidden", ) discussion_view_mode.change( fn=rerender_discussions_from_state, inputs=[community_state, discussion_view_mode], outputs=discussion_chart, show_progress="hidden", ) wire_single_chart_section( fetch_button=single_section_components["upvotes"]["fetch_button"], context_box=single_section_components["upvotes"]["context_box"], state_component=section_states["upvotes"], mode_component=single_section_components["upvotes"]["view_mode"], chart_component=single_section_components["upvotes"]["chart"], logs_component=single_section_components["upvotes"]["logs_box"], username_component=username, year_component=selected_year, prepare_fn=prepare_upvotes_fetch_ui, fetch_fn=fetch_and_render_upvotes_section, rerender_fn=rerender_upvotes_from_state, ) wire_single_chart_section( fetch_button=single_section_components["likes"]["fetch_button"], context_box=single_section_components["likes"]["context_box"], state_component=section_states["likes"], mode_component=single_section_components["likes"]["view_mode"], chart_component=single_section_components["likes"]["chart"], logs_component=single_section_components["likes"]["logs_box"], username_component=username, year_component=selected_year, prepare_fn=prepare_likes_fetch_ui, fetch_fn=fetch_and_render_likes_section, rerender_fn=rerender_likes_from_state, ) wire_single_chart_section( fetch_button=single_section_components["papers"]["fetch_button"], context_box=single_section_components["papers"]["context_box"], state_component=section_states["papers"], mode_component=single_section_components["papers"]["view_mode"], chart_component=single_section_components["papers"]["chart"], logs_component=single_section_components["papers"]["logs_box"], username_component=username, year_component=selected_year, prepare_fn=prepare_papers_fetch_ui, fetch_fn=fetch_and_render_papers_section, rerender_fn=rerender_papers_from_state, ) demo.queue().launch(ssr_mode=False, mcp_server=False, css_paths=["styles.css"])