import datetime
import os
import subprocess

import numpy as np
import requests
from loguru import logger
from typer import Typer

from mlops_mentor.common import github_headers as headers
from mlops_mentor.common.models import RepoInfo
from mlops_mentor.scraper.models import RepoContent, Report, RepoStats


def create_activity_matrix(
    commits: list,
    max_delta: int = 5,
    min_delta: int = 1,
) -> list[list[int]]:
    """Creates an activity matrix from the commits.

    Each row is a 24-hour window anchored at the first commit (not a calendar
    day) and each column is an hour of the day. The window spans at least
    ``min_delta`` and at most ``max_delta`` weeks. Assumes ``commits`` is
    non-empty.
    """
    # Strip the trailing "Z" so fromisoformat() accepts the timestamp on
    # Python versions older than 3.11.
    commit_times = [
        datetime.datetime.fromisoformat(commit["commit"]["committer"]["date"][:-1])
        for commit in commits
    ]
    commit_times.sort()

    start_time = commit_times[0]
    end_time = max(
        start_time + datetime.timedelta(weeks=min_delta),
        min(start_time + datetime.timedelta(weeks=max_delta), commit_times[-1]),
    )

    num_days = (end_time - start_time).days + 1  # include the last day
    commit_matrix = np.zeros((num_days, 24), dtype=int)

    for commit_time in commit_times:
        if start_time <= commit_time <= end_time:
            day_index = (commit_time - start_time).days
            hour_index = commit_time.hour
            commit_matrix[day_index, hour_index] += 1

    return commit_matrix.tolist()
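# A minimal sketch of the shape this produces (the timestamps below are made
# up for illustration). With min_delta=1 the window is padded to a full week,
# so three commits spread over two days still yield an 8x24 matrix (7 days
# plus the inclusive last day). Rows are 24-hour windows counted from the
# first commit, so the third commit, 26 hours later, lands in row 1:
#
#   >>> matrix = create_activity_matrix([
#   ...     {"commit": {"committer": {"date": "2024-01-01T10:00:00Z"}}},
#   ...     {"commit": {"committer": {"date": "2024-01-01T11:30:00Z"}}},
#   ...     {"commit": {"committer": {"date": "2024-01-02T12:00:00Z"}}},
#   ... ])
#   >>> len(matrix), len(matrix[0])
#   (8, 24)
#   >>> matrix[0][10], matrix[0][11], matrix[1][12]
#   (1, 1, 1)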
def scrape(repo_link: str) -> RepoStats:
    """Scrapes a GitHub repository and collects statistics about it."""
    repo = RepoInfo(repo_url=repo_link)
    if not repo.is_accessible:
        logger.error(f"Repository {repo.repo_url} is not accessible.")
        repo_stats = RepoStats(
            num_contributors=None,
            num_prs=None,
            num_commits_to_main=None,
            average_commit_length_to_main=None,
            latest_commit=None,
            average_commit_length=None,
            contributions_per_contributor=None,
            total_commits=None,
            activity_matrix=None,
            num_docker_files=None,
            num_python_files=None,
            num_workflow_files=None,
            has_requirements_file=None,
            has_cloudbuild=None,
            using_dvc=None,
            repo_size=None,
            readme_length=None,
            actions_passing=None,
            num_warnings=None,
        )
    else:
        logger.info(f"Scraping repository {repo.repo_url}")
        contributors = repo.contributors
        num_contributors = len(contributors)
        prs = repo.prs
        num_prs = len(prs)

        # Statistics for commits made directly to the default branch.
        # Assumes the repository has at least one commit.
        commits = repo.commits
        num_commits_to_main = len(commits)
        commit_messages = [c["commit"]["message"] for c in commits]
        average_commit_length_to_main = sum(len(m) for m in commit_messages) / len(
            commit_messages
        )
        latest_commit = commits[0]["commit"]["author"]["date"]

        # Fold in the commits from merged PRs and attribute each one to a
        # contributor, matching on GitHub login first and falling back to the
        # git author/committer name.
        merged_prs = [p["number"] for p in prs if p["merged_at"] is not None]
        for pr_num in merged_prs:
            pr_commits: list[dict] = requests.get(
                f"{repo.repo_api}/pulls/{pr_num}/commits",
                headers=headers,
                timeout=100,
            ).json()
            commit_messages += [c["commit"]["message"] for c in pr_commits]
            for commit in pr_commits:
                for contributor in contributors:
                    commit_author = commit.get("author")  # GitHub account info
                    commit_committer = commit.get("committer")  # GitHub account info
                    commit_author_name = commit["commit"]["author"]["name"]
                    commit_committer_name = commit["commit"]["committer"]["name"]
                    matches = (
                        (
                            commit_author
                            and commit_author.get("login") == contributor.login
                        )
                        or (
                            commit_author_name
                            and commit_author_name.lower() == contributor.login.lower()
                        )
                        or (
                            commit_committer
                            and commit_committer.get("login") == contributor.login
                        )
                        or (
                            commit_committer_name
                            and commit_committer_name.lower()
                            == contributor.login.lower()
                        )
                    )
                    if matches:
                        contributor.commits_pr += 1
                        break
            commits += pr_commits

        activity_matrix = create_activity_matrix(commits, max_delta=3, min_delta=1)
        average_commit_length = sum(len(m) for m in commit_messages) / len(
            commit_messages
        )
        contributions_per_contributor = [c.total_commits for c in contributors]
        total_commits = sum(contributions_per_contributor)

        repo_content = RepoContent(
            repo_api=repo.repo_api,
            default_branch=repo.default_branch,
        )
        num_docker_files = repo_content.num_docker_files
        num_python_files = repo_content.num_python_files
        num_workflow_files = repo_content.num_workflow_files
        has_requirements_file = repo_content.has_requirements_file
        has_cloudbuild = repo_content.has_cloudbuild
        using_dvc = repo_content.using_dvc
        repo_size = repo_content.repo_size
        readme_length = repo_content.readme_length
        actions_passing = repo_content.actions_passing

        report = Report(
            repo_api=repo.repo_api,
            default_branch=repo.default_branch,
        )
        num_warnings = report.check_answers

        repo_stats = RepoStats(
            num_contributors=num_contributors,
            num_prs=num_prs,
            num_commits_to_main=num_commits_to_main,
            average_commit_length_to_main=average_commit_length_to_main,
            latest_commit=latest_commit,
            average_commit_length=average_commit_length,
            contributions_per_contributor=contributions_per_contributor,
            total_commits=total_commits,
            activity_matrix=activity_matrix,
            num_docker_files=num_docker_files,
            num_python_files=num_python_files,
            num_workflow_files=num_workflow_files,
            has_requirements_file=has_requirements_file,
            has_cloudbuild=has_cloudbuild,
            using_dvc=using_dvc,
            repo_size=repo_size,
            readme_length=readme_length,
            actions_passing=actions_passing,
            num_warnings=num_warnings,
        )
        logger.info(f"Scraping of repository {repo.repo_url} completed successfully")
    logger.info(f"Repo stats: {repo_stats}")
    return repo_stats


def clone(repo_link: str, base_dir: str = "cloned_repos"):
    """Clones the repository into a per-group directory under ``base_dir``."""
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    repo_url = repo_link

    # Create a directory for the group if it doesn't exist
    group_dir = os.path.join(base_dir, f"group_{'.'.join(repo_link.split('/')[-2:])}")
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)

    # Extract the repository name from the URL
    repo_name = repo_url.split("/")[-1].replace(".git", "")

    # Create a directory for the repository
    repo_dir = os.path.join(group_dir, repo_name)
    if not os.path.exists(repo_dir):
        os.makedirs(repo_dir)

    # Clone the repository
    try:
        subprocess.run(["git", "clone", repo_url, repo_dir], check=True)
        logger.info(f"Successfully cloned {repo_url} into {repo_dir}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to clone {repo_url}: {e}")


if __name__ == "__main__":
    app = Typer()
    app.command()(scrape)
    app.command()(clone)
    app()
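# Example CLI invocations (a sketch; the exact module path is an assumption
# inferred from the imports above, e.g. if this file lives at
# mlops_mentor/scraper/scrape.py). Typer exposes each registered function as
# a subcommand named after it:
#
#   python -m mlops_mentor.scraper.scrape scrape https://github.com/<owner>/<repo>
#   python -m mlops_mentor.scraper.scrape clone https://github.com/<owner>/<repo>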