import html
import time
import threading
from datetime import datetime
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
import gradio as gr
import pandas as pd
import requests
from huggingface_hub import HfApi
# ============================================================
# Constants / shared config
# ============================================================
REPO_KINDS = ("model", "dataset", "space")
REPO_KIND_LABELS = {
"model": "Models",
"dataset": "Datasets",
"space": "Spaces",
}
REPO_KIND_EMOJIS = {
"model": "🧠",
"dataset": "📦",
"space": "🚀",
}
REPO_KIND_PALETTES = {
"model": "blue",
"dataset": "pink",
"space": "orange",
}
RECENT_ACTIVITY_LIMIT = 20
RECENT_ACTIVITY_MAX_PAGES = 300
RECENT_ACTIVITY_MAX_STALLED_PAGES = 5
# ============================================================
# API
# ============================================================
def get_api(token: str | None = None) -> HfApi:
return HfApi(token=token) if token else HfApi()
# ============================================================
# HTTP helpers
# ============================================================
def build_session(accept: str, token: str | None = None) -> requests.Session:
session = requests.Session()
session.headers.update(
{
"User-Agent": "hf-contributions-app/1.0",
"Accept": accept,
"Cache-Control": "no-cache",
}
)
if token:
session.headers["Authorization"] = f"Bearer {token}"
return session
# ============================================================
# Viewer helpers
# ============================================================
def get_viewer_username(profile: gr.OAuthProfile | None) -> str | None:
return getattr(profile, "username", None) if profile is not None else None
def is_self_view(username: str, profile: gr.OAuthProfile | None) -> bool:
username = (username or "").strip()
viewer = get_viewer_username(profile)
return bool(viewer and username and viewer.lower() == username.lower())
# ============================================================
# Recent-activity shared helpers
# ============================================================
def normalize_recent_activity_payload(payload):
if isinstance(payload, list):
return payload, None
if isinstance(payload, dict):
events = payload.get("recentActivity")
if isinstance(events, list):
return events, payload.get("cursor")
for key in ["items", "events", "activity"]:
value = payload.get(key)
if isinstance(value, list):
return value, payload.get("cursor")
return [], None
def get_year_bounds(selected_year: int):
year_start = pd.Timestamp(f"{selected_year}-01-01", tz="UTC")
year_end = pd.Timestamp(f"{selected_year}-12-31 23:59:59.999999", tz="UTC")
return year_start, year_end
def build_events_page_signature(events: list[dict]) -> tuple:
signature = []
for event in events:
if not isinstance(event, dict):
continue
signature.append(
(
event.get("eventId"),
event.get("time"),
event.get("repoId"),
event.get("type"),
event.get("targetUrl"),
)
)
return tuple(signature)
def parse_recent_activity_event_ts(event: dict):
raw_time = event.get("time")
if not raw_time:
return None
try:
return pd.to_datetime(raw_time, utc=True)
except Exception:
return None
def update_recent_activity_progress(
event_ts,
*,
year_start,
year_end,
oldest_event_ts_seen,
page_oldest_event_ts,
seen_target_year,
crossed_before_year_start,
):
if event_ts is None:
return (
oldest_event_ts_seen,
page_oldest_event_ts,
seen_target_year,
crossed_before_year_start,
)
if oldest_event_ts_seen is None or event_ts < oldest_event_ts_seen:
oldest_event_ts_seen = event_ts
if page_oldest_event_ts is None or event_ts < page_oldest_event_ts:
page_oldest_event_ts = event_ts
if year_start <= event_ts <= year_end:
seen_target_year = True
elif event_ts < year_start:
crossed_before_year_start = True
return (
oldest_event_ts_seen,
page_oldest_event_ts,
seen_target_year,
crossed_before_year_start,
)
def update_stalled_pagination_state(
*,
page_oldest_event_ts,
previous_oldest_event_ts_seen,
stalled_oldest_count,
):
if previous_oldest_event_ts_seen is not None and page_oldest_event_ts is not None:
if page_oldest_event_ts >= previous_oldest_event_ts_seen:
stalled_oldest_count += 1
else:
stalled_oldest_count = 0
if page_oldest_event_ts is not None:
previous_oldest_event_ts_seen = page_oldest_event_ts
return previous_oldest_event_ts_seen, stalled_oldest_count
# ============================================================
# Thread-safe rate limiters
# ============================================================
class RateLimiter:
def __init__(self, calls_per_second=10):
self.calls_per_second = calls_per_second
self.last_call = 0.0
self.lock = threading.Lock()
def wait(self):
with self.lock:
now = time.time()
min_interval = 1.0 / self.calls_per_second
elapsed = now - self.last_call
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
self.last_call = time.time()
rate_limiter = RateLimiter(calls_per_second=10)
recent_activity_rate_limiter = RateLimiter(calls_per_second=1.0)
# ============================================================
# Simple in-memory cache
# ============================================================
CACHE = {}
CACHE_TTL = 60 * 10 # 10 minutes
CACHE_LOCK = threading.Lock()
def get_cache(key):
with CACHE_LOCK:
entry = CACHE.get(key)
if not entry:
return None
value, timestamp = entry
if time.time() - timestamp > CACHE_TTL:
del CACHE[key]
return None
print(f"[CACHE HIT] {key}")
return value
def set_cache(key, value):
with CACHE_LOCK:
CACHE[key] = (value, time.time())
# ============================================================
# Username validation / identity resolution
# ============================================================
def empty_identity(username: str, error: str):
username = (username or "").strip()
return {
"exists": False,
"username": username,
"fullname": None,
"display_name": f"@{username}" if username else "",
"error": error,
"source": "empty",
}
def resolve_identity(username: str, profile: gr.OAuthProfile | None):
username = (username or "").strip()
if not username:
return empty_identity("", "No username provided.")
return validate_username(username, profile)
def is_invalid_identity(identity: dict, username: str) -> bool:
username = (username or "").strip()
return bool(username and not identity.get("exists"))
def validate_username(username: str, profile: gr.OAuthProfile | None):
username = (username or "").strip()
if not username:
return empty_identity(username, "No username provided.")
cache_key = ("identity", username.lower())
cached = get_cache(cache_key)
if cached is not None:
return cached
profile_username = get_viewer_username(profile)
profile_name = getattr(profile, "name", None) if profile is not None else None
if profile_username and profile_username.lower() == username.lower():
result = {
"exists": True,
"username": profile_username,
"fullname": profile_name.strip() if isinstance(profile_name, str) and profile_name.strip() else None,
"display_name": (
f"{profile_name.strip()} (@{profile_username})"
if isinstance(profile_name, str) and profile_name.strip()
else f"@{profile_username}"
),
"error": None,
"source": "oauth",
}
set_cache(cache_key, result)
return result
session = build_session("text/html,application/json")
try:
response = session.get(
f"https://huggingface.co/{username}",
timeout=15,
allow_redirects=True,
)
if response.status_code == 404:
result = empty_identity(username, f"Username or org '{username}' was not found.")
result["source"] = "profile_page"
set_cache(cache_key, result)
return result
response.raise_for_status()
result = {
"exists": True,
"username": username,
"fullname": None,
"display_name": f"@{username}",
"error": None,
"source": "profile_page",
}
set_cache(cache_key, result)
return result
except requests.HTTPError:
result = empty_identity(
username,
f"Username or org '{username}' could not be verified."
)
result["source"] = "profile_page_error"
set_cache(cache_key, result)
return result
except requests.RequestException:
result = empty_identity(
username,
"Could not verify this username right now. Please try again."
)
result["source"] = "network_error"
set_cache(cache_key, result)
return result
except Exception:
result = empty_identity(
username,
"Could not verify this username right now. Please try again."
)
result["source"] = "unknown_error"
set_cache(cache_key, result)
return result
# ============================================================
# Public cache only
# ============================================================
def list_items_with_api(api: HfApi, username: str, kind: str):
fetchers = {
"model": api.list_models,
"dataset": api.list_datasets,
"space": api.list_spaces,
}
fetcher = fetchers.get(kind)
return list(fetcher(author=username)) if fetcher else []
@lru_cache(maxsize=1000)
def cached_list_commits_public(repo_id: str, repo_type: str):
return list(HfApi().list_repo_commits(repo_id=repo_id, repo_type=repo_type))
@lru_cache(maxsize=200)
def cached_list_items_public(username: str, kind: str):
return list_items_with_api(HfApi(), username, kind)
def list_items(username: str, kind: str, token: str | None = None):
if token:
return list_items_with_api(get_api(token), username, kind)
return cached_list_items_public(username, kind)
def list_commits(repo_id: str, repo_type: str, token: str | None = None):
if token:
return list(get_api(token).list_repo_commits(repo_id=repo_id, repo_type=repo_type))
return cached_list_commits_public(repo_id, repo_type)
# ============================================================
# Repo activity fetch
# ============================================================
def fetch_commits_for_repo(
repo_id: str,
repo_type: str,
selected_year: int,
token: str | None = None,
):
try:
rate_limiter.wait()
commits = list_commits(repo_id, repo_type, token=token)
commit_dates = []
commit_count = 0
for commit in commits:
commit_created_at = getattr(commit, "created_at", None)
if commit_created_at is None:
continue
dt = pd.to_datetime(commit_created_at).tz_localize(None).date()
if dt.year == selected_year:
commit_dates.append(dt)
commit_count += 1
return commit_dates, commit_count, None
except Exception as e:
return [], 0, f"{repo_type}:{repo_id} -> {str(e)}"
def build_repo_entry(item, kind: str, selected_year: int, token: str | None = None):
repo_id = item.id
is_private = bool(getattr(item, "private", False))
created_dates = []
created_at = getattr(item, "created_at", None)
if created_at is not None:
dt = pd.to_datetime(created_at).tz_localize(None).date()
if dt.year == selected_year:
created_dates.append(dt)
repo_commit_dates, repo_commit_count, err = fetch_commits_for_repo(
repo_id=repo_id,
repo_type=kind,
selected_year=selected_year,
token=token,
)
created_count = len(created_dates)
activity_dates = created_dates + repo_commit_dates
activity_count = created_count + repo_commit_count
return {
"repo_id": repo_id,
"kind": kind,
"is_private": is_private,
"created_dates": created_dates,
"created_count": created_count,
"repo_commit_dates": repo_commit_dates,
"repo_commit_count": repo_commit_count,
"activity_dates": activity_dates,
"activity_count": activity_count,
"error": err,
}
def collect_kind_repo_entries(
username: str,
kind: str,
selected_year: int,
token: str | None = None,
):
try:
items = list_items(username, kind, token=token)
repo_entries = []
errors = []
chunk_size = 5
for i in range(0, len(items), chunk_size):
chunk = items[i:i + chunk_size]
if not chunk:
continue
with ThreadPoolExecutor(max_workers=min(5, len(chunk))) as executor:
futures = {
executor.submit(
build_repo_entry,
item,
kind,
selected_year,
token,
): item.id
for item in chunk
}
for future in as_completed(futures):
entry = future.result()
repo_entries.append(entry)
if entry["error"]:
errors.append(entry["error"])
return {
"total_items": len(items),
"repos": repo_entries,
"errors": errors,
}
except Exception as e:
return {
"total_items": 0,
"repos": [],
"errors": [f"Error fetching {kind}s for {username}: {str(e)}"],
}
def aggregate_repo_entries(repo_entries: list[dict], include_private: bool):
filtered = repo_entries if include_private else [r for r in repo_entries if not r["is_private"]]
created_dates = []
repo_commit_dates = []
activity_dates = []
created_count = 0
repo_commit_count = 0
repos_with_activity = 0
for repo in filtered:
created_dates.extend(repo["created_dates"])
repo_commit_dates.extend(repo["repo_commit_dates"])
activity_dates.extend(repo["activity_dates"])
created_count += repo["created_count"]
repo_commit_count += repo["repo_commit_count"]
if repo["activity_count"] > 0:
repos_with_activity += 1
return {
"created_dates": created_dates,
"created_count": created_count,
"repo_commit_dates": repo_commit_dates,
"repo_commit_count": repo_commit_count,
"activity_dates": activity_dates,
"activity_count": created_count + repo_commit_count,
"repos_with_activity": repos_with_activity,
}
# ============================================================
# Generic activity bucket helpers
# ============================================================
def empty_activity_bucket(title: str):
return {
"title": title,
"fetched": False,
"dates": [],
"count": 0,
"logs": f"{title} not fetched yet.",
}
def empty_bucketed_activity_state(
selected_year: int,
logs_text: str,
bucket_titles: dict[str, str],
):
state = {
"username": "",
"selected_year": int(selected_year),
"fetched": False,
"logs": logs_text,
}
for key, title in bucket_titles.items():
state[key] = empty_activity_bucket(title)
return state
def build_fetched_bucket(title: str, dates: list, selected_year: int, label: str):
return {
"title": title,
"fetched": True,
"dates": dates,
"count": len(dates),
"logs": f"Found {len(dates)} {label} in {selected_year}.",
}
def build_failed_bucket(title: str):
return {
"title": title,
"fetched": True,
"dates": [],
"count": 0,
"logs": "",
}
def build_invalid_section_state(username: str, selected_year: int, empty_state_fn, identity: dict):
state = empty_state_fn(selected_year)
state["username"] = username
state["selected_year"] = int(selected_year)
state["logs"] = identity.get("error") or f"Username or org '{username}' was not found."
return state
# ============================================================
# Community activity state
# ============================================================
def empty_community_state(selected_year: int):
return {
"username": "",
"selected_year": int(selected_year),
"fetched": False,
"pr_opened": empty_activity_bucket("Pull Requests"),
"pr_activity": empty_activity_bucket("PR Activity"),
"thread_opened": empty_activity_bucket("Community Threads"),
"discussion_activity": empty_activity_bucket("Community Activity"),
"logs": "Community activity not fetched yet.",
}
# ============================================================
# Generic recent-activity engine
# ============================================================
def collect_recent_activity(
*,
username: str,
selected_year: int,
activity_type: str,
processor,
build_session_fn,
rate_limiter_obj,
token: str | None = None,
limit: int = RECENT_ACTIVITY_LIMIT,
max_pages: int = RECENT_ACTIVITY_MAX_PAGES,
max_stalled_pages: int = RECENT_ACTIVITY_MAX_STALLED_PAGES,
):
username = (username or "").strip()
selected_year = int(selected_year)
processor_state = processor.init_state()
year_start, year_end = get_year_bounds(selected_year)
session = build_session_fn("application/json", token=token)
cursor = None
skip = 0
pages_fetched = 0
seen_target_year = False
crossed_before_year_start = False
oldest_event_ts_seen = None
previous_oldest_event_ts_seen = None
stalled_oldest_count = 0
seen_cursors = set()
seen_page_signatures = set()
logs = []
try:
while True:
if pages_fetched >= max_pages:
logs.append("Stopped due to safety max pages.")
break
rate_limiter_obj.wait()
params = {
"limit": limit,
"activityType": activity_type,
"feedType": "user",
"entity": username,
}
if cursor:
params["cursor"] = cursor
else:
params["skip"] = skip
response = session.get(
"https://huggingface.co/api/recent-activity",
params=params,
timeout=20,
)
response.raise_for_status()
payload = response.json()
pages_fetched += 1
events, next_cursor = normalize_recent_activity_payload(payload)
if not events:
logs.append("No more events.")
break
page_signature = build_events_page_signature(events)
if page_signature in seen_page_signatures:
logs.append("Stopped because recent-activity returned a repeated page.")
break
seen_page_signatures.add(page_signature)
if next_cursor:
if next_cursor in seen_cursors:
logs.append("Stopped because recent-activity returned a repeated cursor.")
break
seen_cursors.add(next_cursor)
page_oldest_event_ts = None
for event in events:
if not isinstance(event, dict):
continue
event_ts = parse_recent_activity_event_ts(event)
(
oldest_event_ts_seen,
page_oldest_event_ts,
seen_target_year,
crossed_before_year_start,
) = update_recent_activity_progress(
event_ts,
year_start=year_start,
year_end=year_end,
oldest_event_ts_seen=oldest_event_ts_seen,
page_oldest_event_ts=page_oldest_event_ts,
seen_target_year=seen_target_year,
crossed_before_year_start=crossed_before_year_start,
)
processor.process_event(
event=event,
event_ts=event_ts,
year_start=year_start,
year_end=year_end,
state=processor_state,
)
if crossed_before_year_start:
logs.append("Stopped after crossing before selected year.")
break
previous_oldest_event_ts_seen, stalled_oldest_count = update_stalled_pagination_state(
page_oldest_event_ts=page_oldest_event_ts,
previous_oldest_event_ts_seen=previous_oldest_event_ts_seen,
stalled_oldest_count=stalled_oldest_count,
)
if stalled_oldest_count >= max_stalled_pages:
logs.append("Stopped because pagination no longer progressed toward older events.")
break
if next_cursor:
cursor = next_cursor
else:
skip += limit
return processor.build_result(
username=username,
selected_year=selected_year,
state=processor_state,
logs=logs,
pages_fetched=pages_fetched,
oldest_event_ts_seen=oldest_event_ts_seen,
seen_target_year=seen_target_year,
crossed_before_year_start=crossed_before_year_start,
)
except Exception as e:
return processor.build_error_result(
username=username,
selected_year=selected_year,
error=e,
logs=logs,
)
# ============================================================
# Recent-activity processors
# ============================================================
class CommunityActivityProcessor:
TITLE_MAP = {
"pr_opened": "Pull Requests",
"pr_activity": "PR Activity",
"thread_opened": "Community Threads",
"discussion_activity": "Community Activity",
}
LOG_LABEL_MAP = {
"pr_opened": "unique pr_opened event(s)",
"pr_activity": "unique pr_activity event(s)",
"thread_opened": "unique thread_opened event(s)",
"discussion_activity": "unique discussion_activity event(s)",
}
OPEN_EVENT_EPSILON_SECONDS = 90
def __init__(self, username: str):
self.username = (username or "").strip()
def init_state(self):
return {
"dates": {
"pr_opened": [],
"pr_activity": [],
"thread_opened": [],
"discussion_activity": [],
},
"seen_ids": {
"pr_opened": set(),
"pr_activity": set(),
"thread_opened": set(),
"discussion_activity": set(),
},
}
def extract_author_name(self, discussion: dict) -> str | None:
author = discussion.get("author")
if isinstance(author, str):
return author
if isinstance(author, dict):
for key in ["name", "username", "login"]:
value = author.get(key)
if isinstance(value, str):
return value
return None
def parse_ts(self, raw_value):
if not raw_value:
return None
try:
return pd.to_datetime(raw_value, utc=True)
except Exception:
return None
def classify_event(self, event: dict, discussion: dict) -> tuple[str | None, str | None]:
is_pr = bool(discussion.get("isPullRequest", False))
author_name = self.extract_author_name(discussion)
repo_id = event.get("repoId", "unknown-repo")
discussion_num = discussion.get("num", "unknown-num")
event_id = event.get("eventId", "unknown-event")
if is_pr:
if author_name and author_name.lower() == self.username.lower():
return "pr_opened", f"{repo_id}::pr::{discussion_num}"
return "pr_activity", f"event::{event_id}"
if author_name and author_name.lower() == self.username.lower():
return "thread_opened", f"{repo_id}::thread::{discussion_num}"
return "discussion_activity", f"event::{event_id}"
def extract_business_timestamp(self, bucket_key: str, event: dict, discussion: dict):
event_ts = self.parse_ts(event.get("time"))
created_ts = self.parse_ts(discussion.get("createdAt"))
if bucket_key in {"pr_opened", "thread_opened"}:
return created_ts
if bucket_key in {"pr_activity", "discussion_activity"}:
if event_ts is None:
return None
if created_ts is not None:
delta_seconds = abs((event_ts - created_ts).total_seconds())
if delta_seconds <= self.OPEN_EVENT_EPSILON_SECONDS:
return None
return event_ts
return None
def process_event(
self,
*,
event: dict,
event_ts,
year_start,
year_end,
state: dict,
):
_ = event_ts
discussion = event.get("discussionData") or {}
if not isinstance(discussion, dict):
return
bucket_key, unique_id = self.classify_event(event, discussion)
if not bucket_key or not unique_id:
return
business_ts = self.extract_business_timestamp(bucket_key, event, discussion)
if business_ts is None:
return
if not (year_start <= business_ts <= year_end):
return
if unique_id in state["seen_ids"][bucket_key]:
return
state["seen_ids"][bucket_key].add(unique_id)
state["dates"][bucket_key].append(business_ts.tz_localize(None).date())
def build_bucket(self, key: str, dates: list, selected_year: int):
return {
"title": self.TITLE_MAP[key],
"fetched": True,
"dates": dates,
"count": len(dates),
"logs": f"Found {len(dates)} {self.LOG_LABEL_MAP[key]} in {selected_year}.",
}
def build_result(
self,
*,
username: str,
selected_year: int,
state: dict,
logs: list[str],
pages_fetched: int,
oldest_event_ts_seen,
seen_target_year: bool,
crossed_before_year_start: bool,
):
logs = list(logs)
logs.append(f"Fetched {pages_fetched} page(s) from recent-activity.")
if oldest_event_ts_seen is not None:
logs.append(f"Oldest event reached: {oldest_event_ts_seen.date()}.")
logs.append(f"Seen target year: {seen_target_year}.")
logs.append(f"Crossed before year start: {crossed_before_year_start}.")
logs.append(
" | ".join(
f"{self.TITLE_MAP[key]}: {len(state['dates'][key])}"
for key in ["pr_opened", "pr_activity", "thread_opened", "discussion_activity"]
)
)
combined_logs = "\n\n".join(
[
"[Pull Requests]\n" + self.build_bucket("pr_opened", state["dates"]["pr_opened"], selected_year)["logs"],
"[PR Activity]\n" + self.build_bucket("pr_activity", state["dates"]["pr_activity"], selected_year)["logs"],
"[Community Threads]\n" + self.build_bucket("thread_opened", state["dates"]["thread_opened"], selected_year)["logs"],
"[Community Activity]\n" + self.build_bucket("discussion_activity", state["dates"]["discussion_activity"], selected_year)["logs"],
"[Fetch]\n" + "\n".join(logs),
]
)
return {
"username": username,
"selected_year": selected_year,
"fetched": True,
"pr_opened": self.build_bucket("pr_opened", state["dates"]["pr_opened"], selected_year),
"pr_activity": self.build_bucket("pr_activity", state["dates"]["pr_activity"], selected_year),
"thread_opened": self.build_bucket("thread_opened", state["dates"]["thread_opened"], selected_year),
"discussion_activity": self.build_bucket("discussion_activity", state["dates"]["discussion_activity"], selected_year),
"logs": combined_logs,
}
def build_error_result(
self,
*,
username: str,
selected_year: int,
error,
logs: list[str],
):
_ = logs
return {
"username": username,
"selected_year": selected_year,
"fetched": True,
"pr_opened": build_failed_bucket("Pull Requests"),
"pr_activity": build_failed_bucket("PR Activity"),
"thread_opened": build_failed_bucket("Community Threads"),
"discussion_activity": build_failed_bucket("Community Activity"),
"logs": f"Community fetch failed: {str(error)}",
}
class BucketedActivityProcessor:
def __init__(
self,
*,
bucket_titles: dict[str, str],
bucket_log_labels: dict[str, str],
classifier,
error_label: str,
):
self.bucket_titles = bucket_titles
self.bucket_log_labels = bucket_log_labels
self.classifier = classifier
self.error_label = error_label
def init_state(self):
return {
"bucket_dates": {key: [] for key in self.bucket_titles},
"bucket_seen_ids": {key: set() for key in self.bucket_titles},
}
def process_event(
self,
*,
event: dict,
event_ts,
year_start,
year_end,
state: dict,
):
if event_ts is None:
return
if not (year_start <= event_ts <= year_end):
return
bucket_key, unique_id = self.classifier(event)
if not bucket_key or not unique_id:
return
if bucket_key not in state["bucket_dates"]:
return
if unique_id in state["bucket_seen_ids"][bucket_key]:
return
state["bucket_seen_ids"][bucket_key].add(unique_id)
state["bucket_dates"][bucket_key].append(event_ts.tz_localize(None).date())
def build_result(
self,
*,
username: str,
selected_year: int,
state: dict,
logs: list[str],
pages_fetched: int,
oldest_event_ts_seen,
seen_target_year: bool,
crossed_before_year_start: bool,
):
logs = list(logs)
logs.append(f"Fetched {pages_fetched} page(s) from recent-activity.")
if oldest_event_ts_seen is not None:
logs.append(f"Oldest event reached: {oldest_event_ts_seen.date()}.")
logs.append(f"Seen target year: {seen_target_year}.")
logs.append(f"Crossed before year start: {crossed_before_year_start}.")
logs.append(
" | ".join(
f"{self.bucket_titles[key]}: {len(state['bucket_dates'][key])}"
for key in self.bucket_titles
)
)
result = {
"username": username,
"selected_year": selected_year,
"fetched": True,
"logs": "\n".join(logs),
}
for key, title in self.bucket_titles.items():
dates = state["bucket_dates"][key]
result[key] = build_fetched_bucket(
title=title,
dates=dates,
selected_year=selected_year,
label=self.bucket_log_labels[key],
)
return result
def build_error_result(
self,
*,
username: str,
selected_year: int,
error,
logs: list[str],
):
_ = logs
result = {
"username": username,
"selected_year": selected_year,
"fetched": True,
"logs": f"{self.error_label} fetch failed: {str(error)}",
}
for key, title in self.bucket_titles.items():
result[key] = build_failed_bucket(title)
return result
# ============================================================
# Activity classifiers + config
# ============================================================
def classify_upvote_event(event: dict):
target_type = event.get("targetType")
target = event.get("target") or {}
if not isinstance(target, dict):
target = {}
unique_id = event.get("targetUrl") or target.get("id") or target.get("_id")
mapping = {
"paper": "papers",
"article": "blog",
"changelog": "changelog",
}
bucket_key = mapping.get(target_type)
if not bucket_key or not unique_id:
return None, None
return bucket_key, unique_id
def classify_like_event(event: dict):
repo_data = event.get("repoData") or {}
if not isinstance(repo_data, dict):
repo_data = {}
repo_type = event.get("repoType") or repo_data.get("repoType")
unique_id = event.get("repoId") or repo_data.get("id")
mapping = {
"model": "models",
"dataset": "datasets",
"space": "spaces",
}
bucket_key = mapping.get(repo_type)
if not bucket_key or not unique_id:
return None, None
return bucket_key, unique_id
def classify_paper_event(event: dict):
event_type = event.get("type")
paper = event.get("paper") or {}
if not isinstance(paper, dict):
paper = {}
unique_id = paper.get("id")
mapping = {
"paper-daily": "daily",
"paper": "authored",
}
bucket_key = mapping.get(event_type)
if not bucket_key or not unique_id:
return None, None
return bucket_key, unique_id
SINGLE_SECTION_CONFIG = {
"upvotes": {
"section_label": "upvotes activity",
"empty_chart_title": "Upvotes",
"empty_logs_text": "Upvotes activity not fetched yet.",
"needs_fetch_text": "Upvotes activity needs to be fetched for the current username/year.",
"default_mode": "all",
"palette_name": "purple",
"bucket_titles": {
"papers": "Paper Upvotes",
"blog": "Blog Upvotes",
"changelog": "Changelog Upvotes",
},
"bucket_log_labels": {
"papers": "paper upvote(s)",
"blog": "blog upvote(s)",
"changelog": "changelog upvote(s)",
},
"activity_type": "upvote",
"cache_scope": "upvotes",
"classifier": classify_upvote_event,
"mode_labels": {
"papers": "paper upvote(s)",
"blog": "blog upvote(s)",
"changelog": "changelog upvote(s)",
},
"button_text": "Fetch upvotes activity",
"empty_context_text": "Not logged in · Enter a username to fetch upvotes activity",
"radio_choices": [
("All", "all"),
("Papers", "papers"),
("Blog", "blog"),
("Changelog", "changelog"),
],
"radio_label": "Upvotes view",
"radio_elem_id": "upvotes-view-toggle",
"logs_label": "Upvotes Fetch Logs",
"logs_lines": 10,
},
"likes": {
"section_label": "likes activity",
"empty_chart_title": "Likes",
"empty_logs_text": "Likes activity not fetched yet.",
"needs_fetch_text": "Likes activity needs to be fetched for the current username/year.",
"default_mode": "all",
"palette_name": "blue",
"bucket_titles": {
"models": "Model Likes",
"datasets": "Dataset Likes",
"spaces": "Space Likes",
},
"bucket_log_labels": {
"models": "model like(s)",
"datasets": "dataset like(s)",
"spaces": "space like(s)",
},
"activity_type": "like",
"cache_scope": "likes",
"classifier": classify_like_event,
"mode_labels": {
"models": "model like(s)",
"datasets": "dataset like(s)",
"spaces": "space like(s)",
},
"button_text": "Fetch likes activity",
"empty_context_text": "Not logged in · Enter a username to fetch likes activity",
"radio_choices": [
("All", "all"),
("Models", "models"),
("Datasets", "datasets"),
("Spaces", "spaces"),
],
"radio_label": "Likes view",
"radio_elem_id": "likes-view-toggle",
"logs_label": "Likes Fetch Logs",
"logs_lines": 10,
},
"papers": {
"section_label": "papers activity",
"empty_chart_title": "Papers",
"empty_logs_text": "Papers activity not fetched yet.",
"needs_fetch_text": "Papers activity needs to be fetched for the current username/year.",
"default_mode": "all",
"palette_name": "teal",
"bucket_titles": {
"daily": "Daily Papers",
"authored": "Authored Papers",
},
"bucket_log_labels": {
"daily": "daily paper submission(s)",
"authored": "authored paper(s)",
},
"activity_type": "update-paper",
"cache_scope": "papers",
"classifier": classify_paper_event,
"mode_labels": {
"daily": "Daily Papers submission(s)",
"authored": "authored paper(s)",
},
"button_text": "Fetch papers activity",
"empty_context_text": "Not logged in · Enter a username to fetch papers activity",
"radio_choices": [
("All", "all"),
("Daily Papers", "daily"),
("Authored", "authored"),
],
"radio_label": "Papers view",
"radio_elem_id": "papers-view-toggle",
"logs_label": "Papers Fetch Logs",
"logs_lines": 10,
},
}
def empty_single_section_state(section_key: str, selected_year: int):
cfg = SINGLE_SECTION_CONFIG[section_key]
return empty_bucketed_activity_state(
selected_year=selected_year,
logs_text=cfg["empty_logs_text"],
bucket_titles=cfg["bucket_titles"],
)
# ============================================================
# Public activity fetch functions
# ============================================================
def fetch_all_community_activity(username: str, selected_year: int):
username = (username or "").strip()
selected_year = int(selected_year)
cache_key = (username.lower(), selected_year, "community")
cached = get_cache(cache_key)
if cached:
return cached
if not username:
state = empty_community_state(selected_year)
state["fetched"] = True
state["logs"] = "No username provided."
set_cache(cache_key, state)
return state
processor = CommunityActivityProcessor(username=username)
result = collect_recent_activity(
username=username,
selected_year=selected_year,
activity_type="discussion",
processor=processor,
build_session_fn=build_session,
rate_limiter_obj=recent_activity_rate_limiter,
)
set_cache(cache_key, result)
return result
def fetch_single_section_activity(section_key: str, username: str, selected_year: int):
username = (username or "").strip()
selected_year = int(selected_year)
cfg = SINGLE_SECTION_CONFIG[section_key]
cache_key = (username.lower(), selected_year, cfg["cache_scope"])
cached = get_cache(cache_key)
if cached:
return cached
if not username:
state = empty_single_section_state(section_key, selected_year)
state["fetched"] = True
state["logs"] = "No username provided."
set_cache(cache_key, state)
return state
processor = BucketedActivityProcessor(
bucket_titles=cfg["bucket_titles"],
bucket_log_labels=cfg["bucket_log_labels"],
classifier=cfg["classifier"],
error_label=cfg["cache_scope"].capitalize(),
)
result = collect_recent_activity(
username=username,
selected_year=selected_year,
activity_type=cfg["activity_type"],
processor=processor,
build_session_fn=build_session,
rate_limiter_obj=recent_activity_rate_limiter,
)
set_cache(cache_key, result)
return result
# ============================================================
# Contribution grid helpers
# ============================================================
PALETTES = {
"green": ["#0e4429", "#006d32", "#26a641", "#39d353"],
"blue": ["#0a3069", "#0969da", "#54aeff", "#79c0ff"],
"pink": ["#6e1f4d", "#bf3989", "#db61a2", "#f778ba"],
"orange": ["#5f370e", "#a14d00", "#d97706", "#f59e0b"],
"purple": ["#4c1d95", "#6d28d9", "#8b5cf6", "#c4b5fd"],
"teal": ["#0f766e", "#0d9488", "#14b8a6", "#5eead4"],
}
TOOLTIP_SAFE_COLUMNS = 2
def level_from_count(count: int) -> int:
if count <= 0:
return 0
if count == 1:
return 1
if count <= 3:
return 2
if count <= 7:
return 3
return 4
def level_color(palette_name: str, level: int) -> str:
if level <= 0:
return "var(--gh-empty-cell)"
palette = PALETTES[palette_name]
return palette[min(level - 1, len(palette) - 1)]
def build_daily_count_map(df: pd.DataFrame):
if df.empty:
return {}
counts = (
df.copy()
.assign(date=pd.to_datetime(df["date"]).dt.date, count=1)
.groupby("date", as_index=False)["count"]
.sum()
)
return {row["date"]: int(row["count"]) for _, row in counts.iterrows()}
def get_calendar_bounds(year: int):
start = pd.Timestamp(f"{year}-01-01")
end = pd.Timestamp(f"{year}-12-31")
grid_start = start - pd.Timedelta(days=start.dayofweek)
grid_end = end + pd.Timedelta(days=(6 - end.dayofweek))
all_days = pd.date_range(grid_start, grid_end, freq="D")
num_weeks = len(all_days) // 7
return start, end, grid_start, grid_end, all_days, num_weeks
def month_positions_for_year(year: int, grid_start: pd.Timestamp):
month_starts = pd.date_range(f"{year}-01-01", f"{year}-12-31", freq="MS")
return [(d.strftime("%b"), int((d - grid_start).days // 7)) for d in month_starts]
# ============================================================
# Name helpers
# ============================================================
def resolve_display_name(
username: str,
profile: gr.OAuthProfile | None,
identity: dict | None = None,
):
username = (username or "").strip()
if not username:
return ""
profile_username = get_viewer_username(profile)
profile_name = getattr(profile, "name", None) if profile is not None else None
if profile_username and profile_username.lower() == username.lower():
if isinstance(profile_name, str) and profile_name.strip():
return f"{profile_name.strip()} (@{profile_username})"
return f"@{profile_username}"
if identity:
display_name = identity.get("display_name")
if isinstance(display_name, str) and display_name.strip():
return display_name
fullname = identity.get("fullname")
identity_username = identity.get("username") or username
if isinstance(fullname, str) and fullname.strip():
return f"{fullname.strip()} (@{identity_username})"
if isinstance(identity_username, str) and identity_username.strip():
return f"@{identity_username.strip()}"
return f"@{username}"
# ============================================================
# HTML render helpers
# ============================================================
def render_status_badge(text: str):
return f'
{text}
'
def render_metric_cards(
total_activity: int,
created_by_type: dict,
repo_commit_counts_by_type: dict,
selected_year: int,
):
return f"""
Total activity
{total_activity}
Creations + repo contributions in {selected_year}
{REPO_KIND_EMOJIS["model"]} Models created
{created_by_type["model"]}
{repo_commit_counts_by_type["model"]} contributions on model repos
{REPO_KIND_EMOJIS["dataset"]} Datasets created
{created_by_type["dataset"]}
{repo_commit_counts_by_type["dataset"]} contributions on dataset repos
{REPO_KIND_EMOJIS["space"]} Spaces created
{created_by_type["space"]}
{repo_commit_counts_by_type["space"]} contributions on space repos
"""
# ============================================================
# Generic banner helpers
# ============================================================
def viewer_banner_prefix(profile: gr.OAuthProfile | None) -> list[str]:
viewer = get_viewer_username(profile)
if viewer:
return [f"Logged in as {html.escape(viewer)}"]
return ["Not logged in"]
def render_generic_invalid_banner(username: str, error_text: str | None = None):
username = (username or "").strip()
message = error_text or f"Username or org '{username}' was not found."
return render_status_badge(html.escape(message))
def render_section_idle_banner(
section_label: str,
username,
selected_year,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
parts = viewer_banner_prefix(profile)
if not username:
parts.append(f"Enter a username to fetch {section_label.lower()}")
return render_status_badge(" · ".join(parts))
if is_self_view(username, profile):
parts.append(f"Fetch your {section_label.lower()} or enter another username")
else:
parts.append(f"Ready to fetch {section_label.lower()} for {html.escape(username)}")
parts.append(str(selected_year))
return render_status_badge(" · ".join(parts))
def render_section_fetching_banner(
section_label: str,
username,
selected_year,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
if not username:
return render_section_idle_banner(section_label, "", selected_year, profile)
parts = viewer_banner_prefix(profile)
if is_self_view(username, profile):
parts.append(f"Fetching your {section_label.lower()}")
else:
parts.append(f"Fetching {section_label.lower()} for {html.escape(username)}")
parts.append(str(selected_year))
return render_status_badge(" · ".join(parts))
def render_section_loaded_banner(
section_label: str,
state: dict | None,
profile: gr.OAuthProfile | None,
):
year = state.get("selected_year", datetime.now().year) if state else datetime.now().year
if not state or not state.get("username"):
return render_section_idle_banner(section_label, "", year, profile)
username = state["username"]
selected_year = int(state["selected_year"])
parts = viewer_banner_prefix(profile)
if is_self_view(username, profile):
parts.append(f"Viewing your {section_label.lower()}")
else:
parts.append(f"Viewing {section_label.lower()} for {html.escape(username)}")
parts.append(str(selected_year))
return render_status_badge(" · ".join(parts))
# ============================================================
# Repo-specific banners
# ============================================================
def render_idle_context_banner(
username: str,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
viewer = get_viewer_username(profile)
if viewer:
if not username or username.lower() == viewer.lower():
return render_status_badge(
f"Logged in as {html.escape(viewer)} · Fetch your activity or enter another username"
)
return render_status_badge(
f"Logged in as {html.escape(viewer)} · Ready to fetch {html.escape(username)} · Public repos only"
)
if username:
return render_status_badge(
f"Not logged in · Ready to fetch {html.escape(username)} · Public repos only"
)
return render_status_badge("Not logged in · Enter a username to get started")
def render_context_banner(app_state: dict | None, profile: gr.OAuthProfile | None):
if not app_state or not app_state.get("username"):
return render_idle_context_banner("", profile)
username = app_state["username"]
selected_year = int(app_state["selected_year"])
is_own = bool(app_state.get("is_own_profile", False))
can_toggle = bool(app_state.get("can_toggle_visibility", False))
viewer = get_viewer_username(profile)
parts = []
if viewer:
parts.append(f"Logged in as {html.escape(viewer)}")
else:
parts.append("Not logged in")
if is_self_view(username, profile):
parts.append("Viewing your activity")
else:
parts.append(f"Viewing {html.escape(username)}")
parts.append(str(selected_year))
if is_own and can_toggle:
parts.append("Private repos available")
else:
parts.append("Public repos only")
return render_status_badge(" · ".join(parts))
def render_fetching_context_banner(
username,
selected_year,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
viewer = get_viewer_username(profile)
if not username:
return render_idle_context_banner("", profile)
parts = []
if viewer:
parts.append(f"Logged in as {html.escape(viewer)}")
else:
parts.append("Not logged in")
if is_self_view(username, profile):
parts.append("Fetching your activity")
parts.append(str(selected_year))
parts.append("Checking public and private repos")
else:
parts.append(f"Fetching {html.escape(username)}")
parts.append(str(selected_year))
parts.append("Checking public repos")
return render_status_badge(" · ".join(parts))
def render_empty_chart(title: str):
safe_title = html.escape(title)
return f"""
"""
def render_contrib_chart(
df: pd.DataFrame,
year: int,
title: str,
palette_name: str = "green",
subtitle: str | None = None,
):
safe_title = html.escape(title)
safe_subtitle = html.escape(subtitle) if subtitle else ""
counts_map = build_daily_count_map(df)
year_start, year_end, grid_start, _, all_days, num_weeks = get_calendar_bounds(year)
months = month_positions_for_year(year, grid_start)
padded_weeks = num_weeks + (2 * TOOLTIP_SAFE_COLUMNS)
cells_html = []
total = 0
for day in all_days:
day_date = day.date()
in_year = year_start.date() <= day_date <= year_end.date()
count = counts_map.get(day_date, 0) if in_year else 0
total += count
week = int((day - grid_start).days // 7)
display_week = week + TOOLTIP_SAFE_COLUMNS
dow = int(day.dayofweek)
level = level_from_count(count)
color = level_color(palette_name, level) if in_year else "transparent"
label = f"{count} contribution{'s' if count != 1 else ''} on {day.strftime('%b %d, %Y')}"
if in_year:
classes = ["gh-cell"]
if dow == 0:
classes.append("gh-tooltip-bottom")
if week <= 1:
classes.append("gh-tooltip-right")
elif week >= num_weeks - 2:
classes.append("gh-tooltip-left")
class_attr = " ".join(classes)
safe_label = html.escape(label)
cells_html.append(
f"""
"""
)
else:
cells_html.append(
f"""
"""
)
month_labels = []
for label, pos in months:
display_pos = pos + TOOLTIP_SAFE_COLUMNS
month_labels.append(
f'{html.escape(label)}
'
)
legend_colors = ["var(--gh-empty-cell)"] + PALETTES[palette_name]
legend_html = "".join(
f'' for c in legend_colors
)
subtitle_html = f'{safe_subtitle}
' if subtitle else ""
return f"""
{''.join(month_labels)}
Mon
Wed
Fri
{''.join(cells_html)}
Less
{legend_html}
More
"""
def merge_dates(opened_dates: list, activity_dates: list, mode: str):
if mode == "opened":
return list(opened_dates)
if mode == "activity":
return list(activity_dates)
return list(opened_dates) + list(activity_dates)
def render_dual_mode_chart(
state: dict | None,
*,
opened_key: str,
activity_key: str,
mode: str,
username: str,
year: int,
title: str,
palette_name: str,
opened_label: str,
activity_label: str,
total_label: str,
):
if not state or not state.get("fetched"):
return render_empty_chart(title)
opened = state[opened_key]["dates"]
activity = state[activity_key]["dates"]
merged = merge_dates(opened, activity, mode)
df = pd.DataFrame(merged, columns=["date"])
subtitle = {
"opened": f"{len(opened)} {opened_label} by {username} in {year}",
"activity": f"{len(activity)} {activity_label} for {username} in {year}",
"both": f"{len(opened) + len(activity)} total {total_label} for {username} in {year}",
}[mode]
return render_contrib_chart(
df,
year,
title,
palette_name=palette_name,
subtitle=subtitle,
)
def render_pr_combined_chart(community_state: dict | None, mode: str, username: str, year: int):
return render_dual_mode_chart(
community_state,
opened_key="pr_opened",
activity_key="pr_activity",
mode=mode,
username=username,
year=year,
title="Pull Requests",
palette_name="purple",
opened_label="PRs opened",
activity_label="PR follow-up events",
total_label="PR events",
)
def render_discussion_combined_chart(community_state: dict | None, mode: str, username: str, year: int):
return render_dual_mode_chart(
community_state,
opened_key="thread_opened",
activity_key="discussion_activity",
mode=mode,
username=username,
year=year,
title="Community Discussions",
palette_name="teal",
opened_label="threads opened",
activity_label="discussion follow-up events",
total_label="discussion events",
)
def render_single_section_chart(section_key: str, state: dict | None, mode: str, username: str, year: int):
cfg = SINGLE_SECTION_CONFIG[section_key]
if not state or not state.get("fetched"):
return render_empty_chart(cfg["empty_chart_title"])
bucket_keys = list(cfg["bucket_titles"].keys())
bucket_dates = {key: state[key]["dates"] for key in bucket_keys}
if mode in bucket_dates:
merged = list(bucket_dates[mode])
subtitle = f"{len(merged)} {cfg['mode_labels'][mode]} by {username} in {year}"
else:
merged = [d for key in bucket_keys for d in bucket_dates[key]]
subtitle = f"{len(merged)} total {cfg['empty_chart_title'].lower()} event(s) by {username} in {year}"
df = pd.DataFrame(merged, columns=["date"])
return render_contrib_chart(
df,
year,
cfg["empty_chart_title"],
palette_name=cfg["palette_name"],
subtitle=subtitle,
)
# ============================================================
# App state + render layer
# ============================================================
def build_empty_repo_kind_data():
return {
kind: {"total_items": 0, "repos": [], "errors": []}
for kind in REPO_KINDS
}
def build_empty_app_state(
username: str,
selected_year: int,
viewer_username: str | None = None,
identity: dict | None = None,
):
resolved_identity = identity or (
validate_username(username, None) if username else empty_identity("", "No username provided.")
)
return {
"username": username,
"selected_year": int(selected_year),
"viewer_username": viewer_username,
"is_own_profile": False,
"can_toggle_visibility": False,
"default_visibility": "public",
"kind_repo_data": build_empty_repo_kind_data(),
"identity": resolved_identity,
"is_valid_username": bool(resolved_identity.get("exists")),
"validation_error": resolved_identity.get("error"),
}
def build_app_state(
username: str,
selected_year: int,
profile: gr.OAuthProfile | None,
oauth_token: gr.OAuthToken | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
viewer_username = get_viewer_username(profile)
token = oauth_token.token if oauth_token is not None else None
if not username:
return build_empty_app_state(username, selected_year, viewer_username)
identity = validate_username(username, profile)
if not identity.get("exists"):
return {
**build_empty_app_state(
username,
selected_year,
viewer_username,
identity=identity,
),
"identity": identity,
"is_valid_username": False,
"validation_error": identity.get("error") or f"Username or org '{username}' was not found.",
}
is_own_profile = bool(
viewer_username
and username
and viewer_username.lower() == username.lower()
and token
)
can_toggle_visibility = is_own_profile
token_for_fetch = token if is_own_profile else None
scope = "repo_private" if is_own_profile else "repo_public"
cache_key = (username.lower(), selected_year, scope)
cached = get_cache(cache_key)
if cached:
cached_with_identity = dict(cached)
cached_with_identity["identity"] = identity
cached_with_identity["is_valid_username"] = True
cached_with_identity["validation_error"] = None
return cached_with_identity
kind_repo_data = {}
for kind in REPO_KINDS:
kind_repo_data[kind] = collect_kind_repo_entries(
username=username,
kind=kind,
selected_year=selected_year,
token=token_for_fetch,
)
result = {
"username": username,
"selected_year": selected_year,
"viewer_username": viewer_username,
"is_own_profile": is_own_profile,
"can_toggle_visibility": can_toggle_visibility,
"default_visibility": "all" if can_toggle_visibility else "public",
"kind_repo_data": kind_repo_data,
"identity": identity,
"is_valid_username": True,
"validation_error": None,
}
set_cache(cache_key, result)
return result
def render_invalid_app_state(app_state: dict | None):
username = (app_state or {}).get("username", "")
selected_year = int((app_state or {}).get("selected_year", datetime.now().year))
error_text = (app_state or {}).get("validation_error") or f"Username or org '{username}' was not found."
empty_counts = {kind: 0 for kind in REPO_KINDS}
empty_metrics = render_metric_cards(
0,
empty_counts,
empty_counts,
selected_year,
)
return (
"## Ready",
render_generic_invalid_banner(username, error_text),
empty_metrics,
render_empty_chart("All activity"),
render_empty_chart("Models"),
render_empty_chart("Datasets"),
render_empty_chart("Spaces"),
error_text,
)
def render_app_state(
app_state: dict | None,
visibility_mode: str | None,
profile: gr.OAuthProfile | None,
):
if not app_state or not app_state.get("username"):
selected_year = datetime.now().year
empty_counts = {kind: 0 for kind in REPO_KINDS}
empty_metrics = render_metric_cards(
0,
empty_counts,
empty_counts,
selected_year,
)
context_html = render_idle_context_banner("", profile)
return (
"## Ready",
context_html,
empty_metrics,
render_empty_chart("All activity"),
render_empty_chart("Models"),
render_empty_chart("Datasets"),
render_empty_chart("Spaces"),
"No username provided.",
)
if not app_state.get("is_valid_username", True):
return render_invalid_app_state(app_state)
username = app_state["username"]
selected_year = int(app_state["selected_year"])
can_toggle_visibility = bool(app_state["can_toggle_visibility"])
effective_visibility = "all" if (visibility_mode == "all" and can_toggle_visibility) else "public"
include_private = effective_visibility == "all"
kind_views = {}
warnings = []
for kind in REPO_KINDS:
kind_data = app_state["kind_repo_data"][kind]
kind_views[kind] = aggregate_repo_entries(
kind_data["repos"],
include_private=include_private,
)
warnings.extend(kind_data["errors"])
total_activity = sum(v["activity_count"] for v in kind_views.values())
all_activity_dates = []
for kind in REPO_KINDS:
all_activity_dates.extend(kind_views[kind]["activity_dates"])
all_df = pd.DataFrame(all_activity_dates, columns=["date"])
kind_dfs = {
kind: pd.DataFrame(kind_views[kind]["activity_dates"], columns=["date"])
for kind in REPO_KINDS
}
context_html = render_context_banner(app_state, profile)
display_name = resolve_display_name(username, profile, app_state.get("identity"))
metrics_html = render_metric_cards(
total_activity=total_activity,
created_by_type={kind: kind_views[kind]["created_count"] for kind in REPO_KINDS},
repo_commit_counts_by_type={kind: kind_views[kind]["repo_commit_count"] for kind in REPO_KINDS},
selected_year=selected_year,
)
all_chart_html = render_contrib_chart(
all_df,
selected_year,
f"{display_name}'s activity",
palette_name="green",
subtitle=f"Repo creations + repo contributions in {selected_year}",
)
kind_chart_html = {}
for kind in REPO_KINDS:
kind_chart_html[kind] = render_contrib_chart(
kind_dfs[kind],
selected_year,
REPO_KIND_LABELS[kind],
palette_name=REPO_KIND_PALETTES[kind],
subtitle=(
f'{kind_views[kind]["created_count"]} created • '
f'{kind_views[kind]["repo_commit_count"]} repo contributions'
),
)
logs_text = "\n".join(warnings) if warnings else "Done."
return (
f"## {display_name}'s activity in {selected_year}",
context_html,
metrics_html,
all_chart_html,
kind_chart_html["model"],
kind_chart_html["dataset"],
kind_chart_html["space"],
logs_text,
)
# ============================================================
# Standardized section responses
# ============================================================
def build_invalid_single_section_response(
section_key: str,
username: str,
selected_year: int,
identity: dict,
):
cfg = SINGLE_SECTION_CONFIG[section_key]
state = build_invalid_section_state(
username,
selected_year,
lambda y: empty_single_section_state(section_key, y),
identity,
)
return (
state,
render_generic_invalid_banner(username, identity.get("error")),
gr.update(value=cfg["default_mode"], visible="hidden"),
render_empty_chart(cfg["empty_chart_title"]),
state["logs"],
)
def build_idle_single_section_response(
section_key: str,
username: str,
selected_year: int,
profile: gr.OAuthProfile | None,
logs_text: str | None = None,
):
cfg = SINGLE_SECTION_CONFIG[section_key]
state = empty_single_section_state(section_key, selected_year)
return (
state,
render_section_idle_banner(cfg["section_label"], username, selected_year, profile),
gr.update(value=cfg["default_mode"], visible="hidden"),
render_empty_chart(cfg["empty_chart_title"]),
logs_text or cfg["empty_logs_text"],
)
def build_invalid_community_response(
username: str,
selected_year: int,
identity: dict,
):
state = build_invalid_section_state(username, selected_year, empty_community_state, identity)
return (
state,
render_generic_invalid_banner(username, identity.get("error")),
gr.update(value="both", visible="hidden"),
render_empty_chart("Pull Requests"),
gr.update(value="both", visible="hidden"),
render_empty_chart("Community Discussions"),
state["logs"],
)
def build_idle_community_response(
username: str,
selected_year: int,
profile: gr.OAuthProfile | None,
logs_text: str,
):
state = empty_community_state(selected_year)
return (
state,
render_section_idle_banner("community activity", username, selected_year, profile),
gr.update(value="both", visible="hidden"),
render_empty_chart("Pull Requests"),
gr.update(value="both", visible="hidden"),
render_empty_chart("Community Discussions"),
logs_text,
)
# ============================================================
# Stable return-based fetch / reset logic
# ============================================================
def prepare_repo_fetch_ui(
username,
selected_year,
current_visibility_mode,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
_ = int(selected_year)
if is_self_view(username, profile):
value = current_visibility_mode if current_visibility_mode in {"all", "public"} else "all"
visibility_update = gr.update(
choices=[("All repos", "all"), ("Public only", "public")],
value=value,
visible=True,
interactive=True,
)
else:
visibility_update = gr.update(
choices=[("All repos", "all"), ("Public only", "public")],
value="public",
visible="hidden",
interactive=False,
)
return render_fetching_context_banner(username, selected_year, profile), visibility_update
def fetch_and_render_repo_only(
username,
selected_year,
current_visibility_mode,
profile: gr.OAuthProfile | None,
oauth_token: gr.OAuthToken | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
app_state = build_app_state(username, selected_year, profile, oauth_token)
if not app_state.get("is_valid_username", True):
title_md, context_box, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box = render_invalid_app_state(
app_state
)
visibility_update = gr.update(
choices=[("All repos", "all"), ("Public only", "public")],
value="public",
visible="hidden",
interactive=False,
)
return (
app_state,
title_md,
context_box,
visibility_update,
metrics_box,
all_chart,
model_chart,
dataset_chart,
space_chart,
logs_box,
)
requested_visibility = current_visibility_mode if current_visibility_mode in {"all", "public"} else app_state["default_visibility"]
final_visibility = requested_visibility if app_state["can_toggle_visibility"] else "public"
title_md, context_box, metrics_box, all_chart, model_chart, dataset_chart, space_chart, logs_box = render_app_state(
app_state,
final_visibility,
profile,
)
visibility_update = gr.update(
choices=[("All repos", "all"), ("Public only", "public")],
value=final_visibility,
visible=True if app_state["can_toggle_visibility"] else "hidden",
interactive=bool(app_state["can_toggle_visibility"]),
)
return (
app_state,
title_md,
context_box,
visibility_update,
metrics_box,
all_chart,
model_chart,
dataset_chart,
space_chart,
logs_box,
)
def rerender_from_state(app_state, visibility_mode, profile: gr.OAuthProfile | None):
return render_app_state(app_state, visibility_mode, profile)
def maybe_reset_community_section(
current_community_state,
username,
selected_year,
pr_mode,
discussion_mode,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
pr_mode = pr_mode or "both"
discussion_mode = discussion_mode or "both"
identity = resolve_identity(username, profile)
if is_invalid_identity(identity, username):
return build_invalid_community_response(username, selected_year, identity)
if not current_community_state:
return build_idle_community_response(
username,
selected_year,
profile,
"Community activity not fetched yet.",
)
previous_username = (current_community_state.get("username") or "").strip()
previous_year = int(current_community_state.get("selected_year") or selected_year)
same_username = previous_username.lower() == username.lower()
same_year = previous_year == selected_year
if same_username and same_year and current_community_state.get("fetched"):
pr_chart = render_pr_combined_chart(
current_community_state,
pr_mode,
username=username,
year=selected_year,
)
discussion_chart = render_discussion_combined_chart(
current_community_state,
discussion_mode,
username=username,
year=selected_year,
)
return (
current_community_state,
render_section_loaded_banner("community activity", current_community_state, profile),
gr.update(value=pr_mode, visible=True),
pr_chart,
gr.update(value=discussion_mode, visible=True),
discussion_chart,
current_community_state.get("logs", "Community activity already fetched."),
)
return build_idle_community_response(
username,
selected_year,
profile,
"Community activity needs to be fetched for the current username/year.",
)
def prepare_single_section_fetch_ui(
section_key: str,
username,
selected_year,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
section_label = SINGLE_SECTION_CONFIG[section_key]["section_label"]
identity = resolve_identity(username, profile)
if is_invalid_identity(identity, username):
return render_generic_invalid_banner(username, identity.get("error"))
return render_section_fetching_banner(section_label, username, selected_year, profile)
def maybe_reset_single_chart_section(
section_key: str,
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
cfg = SINGLE_SECTION_CONFIG[section_key]
username = (username or "").strip()
selected_year = int(selected_year)
mode = mode or cfg["default_mode"]
identity = resolve_identity(username, profile)
if is_invalid_identity(identity, username):
return build_invalid_single_section_response(section_key, username, selected_year, identity)
if not current_state:
return build_idle_single_section_response(
section_key,
username,
selected_year,
profile,
cfg["empty_logs_text"],
)
previous_username = (current_state.get("username") or "").strip()
previous_year = int(current_state.get("selected_year") or selected_year)
same_username = previous_username.lower() == username.lower()
same_year = previous_year == selected_year
if same_username and same_year and current_state.get("fetched"):
chart = render_single_section_chart(
section_key,
current_state,
mode,
username=username,
year=selected_year,
)
return (
current_state,
render_section_loaded_banner(cfg["section_label"], current_state, profile),
gr.update(value=mode, visible=True),
chart,
current_state.get("logs", "Already fetched."),
)
return build_idle_single_section_response(
section_key,
username,
selected_year,
profile,
cfg["needs_fetch_text"],
)
def fetch_and_render_single_chart_section(
section_key: str,
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
_ = current_state
cfg = SINGLE_SECTION_CONFIG[section_key]
username = (username or "").strip()
selected_year = int(selected_year)
mode = mode or cfg["default_mode"]
identity = resolve_identity(username, profile)
if is_invalid_identity(identity, username):
state, context_box, mode_update, chart, logs_text = build_invalid_single_section_response(
section_key,
username,
selected_year,
identity,
)
return (
state,
context_box,
chart,
logs_text,
mode_update,
)
state = fetch_single_section_activity(section_key, username, selected_year)
chart = render_single_section_chart(
section_key,
state,
mode,
username=username,
year=selected_year,
)
return (
state,
render_section_loaded_banner(cfg["section_label"], state, profile),
chart,
state["logs"],
gr.update(value=mode, visible=True),
)
def rerender_single_chart_from_state(section_key: str, state, mode):
cfg = SINGLE_SECTION_CONFIG[section_key]
if not state:
return render_empty_chart(cfg["empty_chart_title"])
return render_single_section_chart(
section_key,
state,
mode or cfg["default_mode"],
username=(state.get("username") or ""),
year=int(state.get("selected_year") or datetime.now().year),
)
def prepare_community_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None):
username = (username or "").strip()
identity = resolve_identity(username, profile)
if is_invalid_identity(identity, username):
return render_generic_invalid_banner(username, identity.get("error"))
return render_section_fetching_banner("community activity", username, selected_year, profile)
def fetch_and_render_community(
current_state,
username,
selected_year,
pr_mode,
discussion_mode,
profile: gr.OAuthProfile | None,
):
username = (username or "").strip()
selected_year = int(selected_year)
pr_mode = pr_mode or "both"
discussion_mode = discussion_mode or "both"
identity = resolve_identity(username, profile)
if is_invalid_identity(identity, username):
state, context_box, pr_mode_update, pr_chart, discussion_mode_update, discussion_chart, logs_text = build_invalid_community_response(
username,
selected_year,
identity,
)
return (
state,
context_box,
pr_chart,
discussion_chart,
logs_text,
pr_mode_update,
discussion_mode_update,
)
state = fetch_all_community_activity(username, selected_year)
pr_chart = render_pr_combined_chart(
state,
pr_mode,
username=username,
year=selected_year,
)
discussion_chart = render_discussion_combined_chart(
state,
discussion_mode,
username=username,
year=selected_year,
)
return (
state,
render_section_loaded_banner("community activity", state, profile),
pr_chart,
discussion_chart,
state["logs"],
gr.update(value=pr_mode, visible=True),
gr.update(value=discussion_mode, visible=True),
)
def rerender_pr_from_state(community_state, pr_mode):
if not community_state:
return render_empty_chart("Pull Requests")
return render_pr_combined_chart(
community_state,
pr_mode or "both",
username=(community_state.get("username") or ""),
year=int(community_state.get("selected_year") or datetime.now().year),
)
def rerender_discussions_from_state(community_state, discussion_mode):
if not community_state:
return render_empty_chart("Community Discussions")
return render_discussion_combined_chart(
community_state,
discussion_mode or "both",
username=(community_state.get("username") or ""),
year=int(community_state.get("selected_year") or datetime.now().year),
)
# ============================================================
# Explicit wrappers for single sections
# ============================================================
def prepare_upvotes_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None):
return prepare_single_section_fetch_ui("upvotes", username, selected_year, profile)
def prepare_likes_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None):
return prepare_single_section_fetch_ui("likes", username, selected_year, profile)
def prepare_papers_fetch_ui(username, selected_year, profile: gr.OAuthProfile | None):
return prepare_single_section_fetch_ui("papers", username, selected_year, profile)
def fetch_and_render_upvotes_section(
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
return fetch_and_render_single_chart_section(
"upvotes",
current_state,
username,
selected_year,
mode,
profile,
)
def fetch_and_render_likes_section(
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
return fetch_and_render_single_chart_section(
"likes",
current_state,
username,
selected_year,
mode,
profile,
)
def fetch_and_render_papers_section(
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
return fetch_and_render_single_chart_section(
"papers",
current_state,
username,
selected_year,
mode,
profile,
)
def rerender_upvotes_from_state(state, mode):
return rerender_single_chart_from_state("upvotes", state, mode)
def rerender_likes_from_state(state, mode):
return rerender_single_chart_from_state("likes", state, mode)
def rerender_papers_from_state(state, mode):
return rerender_single_chart_from_state("papers", state, mode)
def maybe_reset_upvotes_section(
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
return maybe_reset_single_chart_section(
"upvotes",
current_state,
username,
selected_year,
mode,
profile,
)
def maybe_reset_likes_section(
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
return maybe_reset_single_chart_section(
"likes",
current_state,
username,
selected_year,
mode,
profile,
)
def maybe_reset_papers_section(
current_state,
username,
selected_year,
mode,
profile: gr.OAuthProfile | None,
):
return maybe_reset_single_chart_section(
"papers",
current_state,
username,
selected_year,
mode,
profile,
)
def hydrate_header_on_load(
current_username,
selected_year,
profile: gr.OAuthProfile | None,
):
current_username = (current_username or "").strip()
selected_year = int(selected_year)
profile_username = get_viewer_username(profile)
username_update = gr.update()
effective_username = current_username
if not effective_username and profile_username:
username_update = gr.update(value=profile_username)
effective_username = profile_username
repo_context_html = render_idle_context_banner(effective_username, profile)
community_context_html = render_section_idle_banner("community activity", effective_username, selected_year, profile)
upvotes_context_html = render_section_idle_banner("upvotes activity", effective_username, selected_year, profile)
likes_context_html = render_section_idle_banner("likes activity", effective_username, selected_year, profile)
papers_context_html = render_section_idle_banner("papers activity", effective_username, selected_year, profile)
return (
username_update,
repo_context_html,
community_context_html,
upvotes_context_html,
likes_context_html,
papers_context_html,
)
# ============================================================
# UI helpers
# ============================================================
def build_single_chart_section_ui(section_key: str):
cfg = SINGLE_SECTION_CONFIG[section_key]
gr.Markdown(f"## {cfg['section_label'].capitalize()}")
fetch_button = gr.Button(cfg["button_text"], variant="secondary")
context_box = gr.HTML(render_status_badge(cfg["empty_context_text"]))
with gr.Column(elem_classes="toggles-container"):
view_mode = gr.Radio(
choices=cfg["radio_choices"],
value=cfg["default_mode"],
label=cfg["radio_label"],
visible="hidden",
elem_classes=["gh-toggle-radio"],
elem_id=cfg["radio_elem_id"],
)
chart = gr.HTML(render_empty_chart(cfg["empty_chart_title"]))
logs_box = gr.Textbox(
label=cfg["logs_label"],
interactive=False,
lines=cfg["logs_lines"],
value=cfg["empty_logs_text"],
)
return {
"fetch_button": fetch_button,
"context_box": context_box,
"view_mode": view_mode,
"chart": chart,
"logs_box": logs_box,
}
def wire_single_chart_section(
*,
fetch_button,
context_box,
state_component,
mode_component,
chart_component,
logs_component,
username_component,
year_component,
prepare_fn,
fetch_fn,
rerender_fn,
):
fetch_button.click(
fn=prepare_fn,
inputs=[username_component, year_component],
outputs=context_box,
show_progress="hidden",
).then(
fn=fetch_fn,
inputs=[state_component, username_component, year_component, mode_component],
outputs=[state_component, context_box, chart_component, logs_component, mode_component],
show_progress="full",
show_progress_on=[chart_component, logs_component],
)
mode_component.change(
fn=rerender_fn,
inputs=[state_component, mode_component],
outputs=chart_component,
show_progress="hidden",
)
# ============================================================
# UI
# ============================================================
year_options = list(range(datetime.now().year, 2017, -1))
with gr.Blocks(title="HF Contributions") as demo:
app_state = gr.State(value=None)
community_state = gr.State(value=None)
section_states = {
section_key: gr.State(value=None)
for section_key in SINGLE_SECTION_CONFIG
}
gr.Markdown("# 🤗 Hugging Face Contributions")
with gr.Row(elem_id="top-controls"):
gr.LoginButton("Sign in with Hugging Face", logout_value="Logout ({})")
username = gr.Textbox(
label="Username / org",
placeholder="Enter a Hugging Face username or org",
scale=3,
)
selected_year = gr.Dropdown(
choices=year_options,
value=year_options[0],
label="Year",
scale=1,
)
run_button = gr.Button("Fetch activity", variant="primary", scale=1)
context_box = gr.HTML(
render_status_badge("Not logged in · Enter a username to get started")
)
title_md = gr.Markdown("## Ready")
with gr.Column(elem_classes="toggles-container"):
visibility_mode = gr.Radio(
choices=[("All repos", "all"), ("Public only", "public")],
value="public",
label="Visibility",
visible="hidden",
interactive=False,
elem_classes=["gh-toggle-radio"],
elem_id="repo-visibility-toggle",
)
zero_counts = {kind: 0 for kind in REPO_KINDS}
metrics_box = gr.HTML(
render_metric_cards(
0,
zero_counts,
zero_counts,
year_options[0],
)
)
all_chart = gr.HTML(render_empty_chart("All activity"))
model_chart = gr.HTML(render_empty_chart("Models"))
dataset_chart = gr.HTML(render_empty_chart("Datasets"))
space_chart = gr.HTML(render_empty_chart("Spaces"))
logs_box = gr.Textbox(label="Logs / Warnings", interactive=False, lines=8)
gr.Markdown("## Community activity")
fetch_community_button = gr.Button("Fetch community activity", variant="secondary")
community_context_box = gr.HTML(
render_status_badge("Not logged in · Enter a username to fetch community activity")
)
with gr.Column(elem_classes="toggles-container"):
pr_view_mode = gr.Radio(
choices=[
("Opened", "opened"),
("Follow-up", "activity"),
("Both", "both"),
],
value="both",
label="Pull Requests view",
visible="hidden",
elem_classes=["gh-toggle-radio"],
elem_id="pr-view-toggle",
)
pr_chart = gr.HTML(render_empty_chart("Pull Requests"))
with gr.Column(elem_classes="toggles-container"):
discussion_view_mode = gr.Radio(
choices=[
("Opened", "opened"),
("Follow-up", "activity"),
("Both", "both"),
],
value="both",
label="Community Discussions view",
visible="hidden",
elem_classes=["gh-toggle-radio"],
elem_id="discussion-view-toggle",
)
discussion_chart = gr.HTML(render_empty_chart("Community Discussions"))
community_logs_box = gr.Textbox(
label="Community Fetch Logs",
interactive=False,
lines=12,
value="Community activity not fetched yet.",
)
single_section_components = {}
for section_key in ("upvotes", "likes", "papers"):
single_section_components[section_key] = build_single_chart_section_ui(section_key)
repo_fetch_outputs = [
app_state,
title_md,
context_box,
visibility_mode,
metrics_box,
all_chart,
model_chart,
dataset_chart,
space_chart,
logs_box,
]
repo_render_outputs = [
title_md,
context_box,
metrics_box,
all_chart,
model_chart,
dataset_chart,
space_chart,
logs_box,
]
community_reset_outputs = [
community_state,
community_context_box,
pr_view_mode,
pr_chart,
discussion_view_mode,
discussion_chart,
community_logs_box,
]
community_fetch_outputs = [
community_state,
community_context_box,
pr_chart,
discussion_chart,
community_logs_box,
pr_view_mode,
discussion_view_mode,
]
single_section_outputs = {}
for section_key, components in single_section_components.items():
single_section_outputs[f"{section_key}_reset"] = [
section_states[section_key],
components["context_box"],
components["view_mode"],
components["chart"],
components["logs_box"],
]
single_section_outputs[f"{section_key}_fetch"] = [
section_states[section_key],
components["context_box"],
components["chart"],
components["logs_box"],
components["view_mode"],
]
demo.load(
fn=hydrate_header_on_load,
inputs=[username, selected_year],
outputs=[username, context_box, community_context_box]
+ [single_section_components[k]["context_box"] for k in ("upvotes", "likes", "papers")],
show_progress="hidden",
)
repo_chain = run_button.click(
fn=prepare_repo_fetch_ui,
inputs=[username, selected_year, visibility_mode],
outputs=[context_box, visibility_mode],
show_progress="hidden",
).then(
fn=fetch_and_render_repo_only,
inputs=[username, selected_year, visibility_mode],
outputs=repo_fetch_outputs,
show_progress="full",
show_progress_on=[
metrics_box,
all_chart,
model_chart,
dataset_chart,
space_chart,
logs_box,
],
).then(
fn=maybe_reset_community_section,
inputs=[community_state, username, selected_year, pr_view_mode, discussion_view_mode],
outputs=community_reset_outputs,
show_progress="hidden",
).then(
fn=maybe_reset_upvotes_section,
inputs=[
section_states["upvotes"],
username,
selected_year,
single_section_components["upvotes"]["view_mode"],
],
outputs=single_section_outputs["upvotes_reset"],
show_progress="hidden",
).then(
fn=maybe_reset_likes_section,
inputs=[
section_states["likes"],
username,
selected_year,
single_section_components["likes"]["view_mode"],
],
outputs=single_section_outputs["likes_reset"],
show_progress="hidden",
).then(
fn=maybe_reset_papers_section,
inputs=[
section_states["papers"],
username,
selected_year,
single_section_components["papers"]["view_mode"],
],
outputs=single_section_outputs["papers_reset"],
show_progress="hidden",
)
visibility_mode.change(
fn=rerender_from_state,
inputs=[app_state, visibility_mode],
outputs=repo_render_outputs,
show_progress="hidden",
)
fetch_community_button.click(
fn=prepare_community_fetch_ui,
inputs=[username, selected_year],
outputs=community_context_box,
show_progress="hidden",
).then(
fn=fetch_and_render_community,
inputs=[community_state, username, selected_year, pr_view_mode, discussion_view_mode],
outputs=community_fetch_outputs,
show_progress="full",
show_progress_on=[
pr_chart,
discussion_chart,
community_logs_box,
],
)
pr_view_mode.change(
fn=rerender_pr_from_state,
inputs=[community_state, pr_view_mode],
outputs=pr_chart,
show_progress="hidden",
)
discussion_view_mode.change(
fn=rerender_discussions_from_state,
inputs=[community_state, discussion_view_mode],
outputs=discussion_chart,
show_progress="hidden",
)
wire_single_chart_section(
fetch_button=single_section_components["upvotes"]["fetch_button"],
context_box=single_section_components["upvotes"]["context_box"],
state_component=section_states["upvotes"],
mode_component=single_section_components["upvotes"]["view_mode"],
chart_component=single_section_components["upvotes"]["chart"],
logs_component=single_section_components["upvotes"]["logs_box"],
username_component=username,
year_component=selected_year,
prepare_fn=prepare_upvotes_fetch_ui,
fetch_fn=fetch_and_render_upvotes_section,
rerender_fn=rerender_upvotes_from_state,
)
wire_single_chart_section(
fetch_button=single_section_components["likes"]["fetch_button"],
context_box=single_section_components["likes"]["context_box"],
state_component=section_states["likes"],
mode_component=single_section_components["likes"]["view_mode"],
chart_component=single_section_components["likes"]["chart"],
logs_component=single_section_components["likes"]["logs_box"],
username_component=username,
year_component=selected_year,
prepare_fn=prepare_likes_fetch_ui,
fetch_fn=fetch_and_render_likes_section,
rerender_fn=rerender_likes_from_state,
)
wire_single_chart_section(
fetch_button=single_section_components["papers"]["fetch_button"],
context_box=single_section_components["papers"]["context_box"],
state_component=section_states["papers"],
mode_component=single_section_components["papers"]["view_mode"],
chart_component=single_section_components["papers"]["chart"],
logs_component=single_section_components["papers"]["logs_box"],
username_component=username,
year_component=selected_year,
prepare_fn=prepare_papers_fetch_ui,
fetch_fn=fetch_and_render_papers_section,
rerender_fn=rerender_papers_from_state,
)
demo.queue().launch(ssr_mode=False, mcp_server=False, css_paths=["styles.css"])