hf-contributions-graph / hf_contributions.py
efecelik's picture
Initial release: GitHub-style contribution graph for HF users
20a2ac1
"""
Hugging Face Hub Contributions Fetcher
Fetches contribution data (commits) from a user's models, datasets, and spaces
using the Hugging Face Hub API.
"""
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

from huggingface_hub import HfApi
# Configure logging at import time with INFO as the threshold, so the progress
# messages emitted via ``logger.info`` below are not filtered out by default.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for the progress and warning messages in this file.
logger = logging.getLogger(__name__)
@dataclass
class ContributionStats:
    """Aggregated contribution figures for a single Hugging Face user.

    Field order is part of the interface (the dataclass can be built
    positionally), so new fields should only ever be appended.
    """

    # --- Totals -----------------------------------------------------------
    # Number of commits counted inside the look-back window.
    total_commits: int
    # Number of repositories (models + datasets + spaces) that were scanned.
    total_repos: int
    # Per-kind repository counts.
    models_count: int
    datasets_count: int
    spaces_count: int

    # --- Streaks (in days) --------------------------------------------------
    # Longest run of consecutive days with at least one commit.
    longest_streak: int
    # Run of consecutive active days ending on the most recent day.
    current_streak: int

    # --- Busiest day --------------------------------------------------------
    # "YYYY-MM-DD" of the day with the most commits, or None without commits.
    most_active_day: Optional[str]
    # Commit count on ``most_active_day`` (0 when there is none).
    most_active_count: int

    # --- Raw breakdowns -----------------------------------------------------
    # Maps "YYYY-MM-DD" -> number of commits on that day.
    contributions_by_date: dict[str, int]
    # Maps repo id -> list of per-commit records (date, message, repo_type).
    contributions_by_repo: dict[str, list[dict]]
def get_date_range(days: int = 365) -> list[str]:
    """Return the last ``days`` calendar days as "YYYY-MM-DD" strings.

    The list is ordered oldest first and ends with today's date. "Today" is
    taken in UTC rather than from the host's local clock: contribution keys
    are derived from commit timestamps, which the Hub reports in UTC, so a
    local-time window would be shifted by a day on hosts far from UTC and
    commits could fall outside the range.

    Args:
        days: Number of days to include, counting today. Zero or a negative
            value yields an empty list.

    Returns:
        Consecutive ISO-formatted dates, oldest first.
    """
    today = datetime.now(timezone.utc).date()
    return [
        (today - timedelta(days=offset)).isoformat()
        for offset in range(days - 1, -1, -1)
    ]
def calculate_streaks(contributions: dict[str, int], days: int = 365) -> tuple[int, int]:
    """Compute the longest and the current contribution streak.

    Args:
        contributions: Mapping of "YYYY-MM-DD" to the commit count on that day.
        days: Size of the window (ending today) that is examined.

    Returns:
        ``(longest_streak, current_streak)``. The current streak is the run of
        active days that ends on the last day of the window; it is 0 when that
        day has no contributions.
    """
    window = get_date_range(days)
    # One flag per day in the window, oldest first: True when the day is active.
    active_flags = [contributions.get(day, 0) > 0 for day in window]

    # Longest run of consecutive active days anywhere in the window.
    best_run = 0
    running = 0
    for is_active in active_flags:
        running = running + 1 if is_active else 0
        if running > best_run:
            best_run = running

    # Current streak: walk back from the most recent day until the first gap.
    trailing_run = 0
    for is_active in reversed(active_flags):
        if not is_active:
            break
        trailing_run += 1

    return best_run, trailing_run
def _commit_utc_date(commit):
    """Return the UTC calendar date of ``commit``, or None if it has no usable timestamp."""
    # Commit objects expose the timestamp as ``created_at``; ``date`` is kept
    # as a fallback for other shapes of commit record.
    stamp = getattr(commit, "created_at", None)
    if stamp is None:
        stamp = getattr(commit, "date", None)

    if isinstance(stamp, str):
        try:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            stamp = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
        except ValueError:
            return None

    if not isinstance(stamp, datetime):
        return None

    if stamp.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to already be in UTC.
        stamp = stamp.replace(tzinfo=timezone.utc)
    return stamp.astimezone(timezone.utc).date()


def fetch_user_contributions(
    username: str,
    token: Optional[str] = None,
    days: int = 365,
    progress_callback=None
) -> ContributionStats:
    """
    Fetch all contributions for a Hugging Face user.

    Lists the user's models, datasets and spaces, walks the commit history of
    each repository and counts every commit whose UTC date falls inside the
    look-back window (today in UTC and the ``days - 1`` calendar days before
    it). Failures while listing a repo kind or while fetching one repo's
    commits are logged as warnings and skipped, so the result is best-effort.

    Args:
        username: The HF username to fetch contributions for
        token: Optional HF token for accessing private repos
        days: Number of days to look back (default 365)
        progress_callback: Optional callable taking one ``str``; it receives
            every progress message that is also logged at INFO level

    Returns:
        ContributionStats object with all contribution data
    """
    api = HfApi(token=token)
    contributions_by_date = defaultdict(int)
    contributions_by_repo = defaultdict(list)

    # Oldest calendar day (UTC) that still counts. Filtering by date instead
    # of by instant keeps the window aligned to whole days, and using UTC
    # matches the timezone of the commit timestamps being compared.
    earliest_day = datetime.now(timezone.utc).date() - timedelta(days=days - 1)

    def update_progress(message: str):
        if progress_callback:
            progress_callback(message)
        logger.info(message)

    # (repo_type passed to the API, plural label used in messages, lister)
    repo_kinds = (
        ("model", "models", api.list_models),
        ("dataset", "datasets", api.list_datasets),
        ("space", "spaces", api.list_spaces),
    )

    # Collect all repos
    repos = []
    kind_counts = {}
    for repo_type, plural, list_repos in repo_kinds:
        update_progress(f"Fetching {plural} for {username}...")
        try:
            repo_ids = [repo.id for repo in list_repos(author=username)]
        except Exception as e:
            # Best effort: one failing listing must not abort the whole run.
            logger.warning("Error fetching %s: %s", plural, e)
            repo_ids = []
        kind_counts[plural] = len(repo_ids)
        repos.extend((repo_type, repo_id) for repo_id in repo_ids)

    total_repos = len(repos)
    update_progress(f"Found {total_repos} repositories. Fetching commits...")

    # Fetch commits for each repo
    for position, (repo_type, repo_id) in enumerate(repos, start=1):
        update_progress(f"Processing {position}/{total_repos}: {repo_id}")
        try:
            commits = list(api.list_repo_commits(repo_id, repo_type=repo_type))
        except Exception as e:
            logger.warning("Error fetching commits for %s: %s", repo_id, e)
            continue

        for commit in commits:
            commit_day = _commit_utc_date(commit)
            # Skip commits without a usable timestamp or older than the window.
            if commit_day is None or commit_day < earliest_day:
                continue

            date_str = commit_day.isoformat()
            contributions_by_date[date_str] += 1
            contributions_by_repo[repo_id].append({
                "date": date_str,
                "message": getattr(commit, 'title', getattr(commit, 'message', 'No message')),
                "repo_type": repo_type
            })

    # Calculate statistics
    total_commits = sum(contributions_by_date.values())
    longest_streak, current_streak = calculate_streaks(contributions_by_date, days)

    most_active_day = None
    most_active_count = 0
    if contributions_by_date:
        most_active_day = max(contributions_by_date, key=contributions_by_date.get)
        most_active_count = contributions_by_date[most_active_day]

    return ContributionStats(
        total_commits=total_commits,
        total_repos=total_repos,
        models_count=kind_counts["models"],
        datasets_count=kind_counts["datasets"],
        spaces_count=kind_counts["spaces"],
        longest_streak=longest_streak,
        current_streak=current_streak,
        most_active_day=most_active_day,
        most_active_count=most_active_count,
        contributions_by_date=dict(contributions_by_date),
        contributions_by_repo=dict(contributions_by_repo)
    )
if __name__ == "__main__":
    # Manual smoke test: fetch the stats of a known active account and print
    # a short summary. Requires network access to the Hugging Face Hub.
    stats = fetch_user_contributions("huggingface")
    summary_lines = (
        f"Total commits: {stats.total_commits}",
        f"Total repos: {stats.total_repos}",
        f"Longest streak: {stats.longest_streak} days",
    )
    for summary_line in summary_lines:
        print(summary_line)