#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Repository Service

This module provides functionality for cloning and managing Git repositories.
"""

import logging
import os
import re
import shutil
import tempfile

from git import Repo
from git.exc import GitCommandError

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


class RepositoryService:
    """
    Service for cloning and managing Git repositories.

    Repositories are cloned into uniquely named directories below
    ``base_temp_dir``. The object returned by ``Repo.clone_from`` (or
    ``Repo(path)``) is kept in ``self.repos``, keyed by the checkout path,
    until the checkout is cleaned up.
    """

    # Accepted GitHub URL shapes. They are applied with ``fullmatch`` so the
    # whole string must conform (``$`` alone would tolerate a trailing newline).
    _GITHUB_URL_PATTERNS = (
        re.compile(r'https?://github\.com/[\w.-]+/[\w.-]+(\.git)?'),  # https://github.com/user/repo[.git]
        re.compile(r'git@github\.com:[\w.-]+/[\w.-]+(\.git)?'),  # git@github.com:user/repo[.git]
    )

    def __init__(self, base_temp_dir=None):
        """
        Initialize the RepositoryService.

        Args:
            base_temp_dir (str, optional): Base directory for temporary repositories.
                If None (or empty), the system temp directory is used.
        """
        self.base_temp_dir = base_temp_dir or tempfile.gettempdir()
        # Maps checkout path -> repository object for that checkout.
        self.repos = {}
        logger.info("Initialized RepositoryService with base temp dir: %s", self.base_temp_dir)

    def validate_github_url(self, url):
        """
        Validate if the provided URL is a valid GitHub repository URL.

        Args:
            url (str): The GitHub repository URL to validate.

        Returns:
            bool: True if the URL is valid, False otherwise (including when
                ``url`` is not a string).
        """
        if not isinstance(url, str):
            return False
        return any(pattern.fullmatch(url) for pattern in self._GITHUB_URL_PATTERNS)

    def normalize_github_url(self, url):
        """
        Normalize a GitHub URL to a consistent format.

        SSH URLs (``git@github.com:user/repo``) are rewritten to their HTTPS
        form and a trailing ``.git`` is dropped. Anything else is returned
        unchanged.

        Args:
            url (str): The GitHub repository URL to normalize.

        Returns:
            str: The normalized URL.
        """
        ssh_prefix = 'git@github.com:'

        # Convert SSH URL to HTTPS URL
        if url.startswith(ssh_prefix):
            user_repo = url[len(ssh_prefix):]
            if user_repo.endswith('.git'):
                user_repo = user_repo[:-4]
            return f"https://github.com/{user_repo}"

        # Ensure HTTPS URL ends without .git
        if url.startswith('http') and url.endswith('.git'):
            return url[:-4]
        return url

    def extract_repo_name(self, url):
        """
        Extract repository name from a GitHub URL.

        Args:
            url (str): The GitHub repository URL.

        Returns:
            str: The repository name (last path segment of the normalized URL).
        """
        normalized_url = self.normalize_github_url(url)
        return normalized_url.split('/')[-1]

    def clone_repository(self, url, branch=None):
        """
        Clone a Git repository from the provided URL.

        Args:
            url (str): The repository URL to clone.
            branch (str, optional): The branch to checkout. If None, the default branch is used.

        Returns:
            str: The path to the cloned repository.

        Raises:
            ValueError: If the URL is not a valid GitHub repository URL.
            GitCommandError: If there's an error during the Git operation.
        """
        if not self.validate_github_url(url):
            raise ValueError(f"Invalid GitHub repository URL: {url}")

        repo_name = self.extract_repo_name(url)
        # A random suffix keeps repeated clones of the same repository apart.
        repo_dir = os.path.join(self.base_temp_dir, f"codereview_{repo_name}_{os.urandom(4).hex()}")
        logger.info("Cloning repository %s to %s", url, repo_dir)

        # Only pass ``branch`` when one was requested so the default branch
        # is used otherwise.
        clone_kwargs = {'branch': branch} if branch else {}
        try:
            repo = Repo.clone_from(url, repo_dir, **clone_kwargs)
        except Exception as e:
            # Whatever went wrong, do not leave a partial checkout behind;
            # the original exception is re-raised unchanged for the caller.
            logger.error("Error cloning repository %s: %s", url, e)
            shutil.rmtree(repo_dir, ignore_errors=True)
            raise

        branch_label = f"branch: {branch}" if branch else "default branch"
        logger.info("Cloned repository %s (%s) to %s", url, branch_label, repo_dir)

        # Store the repository instance
        self.repos[repo_dir] = repo
        return repo_dir

    @staticmethod
    def _measure_tree(root):
        """Return ``(total_bytes, file_count)`` for all files below ``root``."""
        total_bytes = 0
        file_count = 0
        for dirpath, _, filenames in os.walk(root):
            file_count += len(filenames)
            for filename in filenames:
                try:
                    total_bytes += os.path.getsize(os.path.join(dirpath, filename))
                except OSError:
                    # e.g. a dangling symlink or a file removed mid-walk:
                    # it still counts as a file but contributes no size.
                    continue
        return total_bytes, file_count

    def get_repository_info(self, repo_path):
        """
        Get information about a repository.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: A dictionary containing repository information. It is empty
                if the repository cannot be opened, and holds only ``path``
                and ``error`` if gathering the details fails.
        """
        repo = self.repos.get(repo_path)
        if repo is None:
            try:
                repo = Repo(repo_path)
            except Exception as e:
                logger.error("Error opening repository at %s: %s", repo_path, e)
                return {}
            self.repos[repo_path] = repo

        try:
            # Get the active branch
            try:
                active_branch = repo.active_branch.name
            except TypeError:
                # Detached HEAD state
                active_branch = 'HEAD detached'

            # Get the latest commit
            latest_commit = repo.head.commit

            # Get remote URL
            try:
                remote_url = repo.remotes.origin.url
            except AttributeError:
                remote_url = 'No remote URL found'

            # Repository size (approximate) and file count in a single walk
            repo_size, file_count = self._measure_tree(repo_path)

            return {
                'path': repo_path,
                'active_branch': active_branch,
                'latest_commit': {
                    'hash': latest_commit.hexsha,
                    'author': f"{latest_commit.author.name} <{latest_commit.author.email}>",
                    'date': latest_commit.committed_datetime.isoformat(),
                    'message': latest_commit.message.strip(),
                },
                'remote_url': remote_url,
                'size_bytes': repo_size,
                'file_count': file_count,
            }
        except Exception as e:
            logger.error("Error getting repository info for %s: %s", repo_path, e)
            return {
                'path': repo_path,
                'error': str(e),
            }

    @staticmethod
    def _remove_tree(path):
        """Best-effort recursive delete; return True when nothing is left at ``path``."""
        shutil.rmtree(path, ignore_errors=True)
        return not os.path.lexists(path)

    def cleanup_repository(self, repo_path):
        """
        Clean up a cloned repository.

        Args:
            repo_path (str): The path to the repository to clean up.

        Returns:
            bool: True if nothing is left at ``repo_path`` afterwards,
                False otherwise.
        """
        logger.info("Cleaning up repository at %s", repo_path)

        # Remove the repository from the tracked repos
        repo = self.repos.pop(repo_path, None)
        if repo is not None:
            # Release the handle before deleting the files underneath it.
            try:
                repo.close()
            except Exception as e:
                logger.warning("Error closing repository at %s: %s", repo_path, e)

        # Remove the directory
        try:
            removed = self._remove_tree(repo_path)
        except Exception as e:
            logger.error("Error cleaning up repository at %s: %s", repo_path, e)
            return False

        if not removed:
            # rmtree runs with ignore_errors=True, so a failed delete is only
            # visible by checking that the path is really gone.
            logger.error(
                "Error cleaning up repository at %s: directory could not be fully removed",
                repo_path,
            )
            return False
        return True

    def cleanup_all_repositories(self):
        """
        Clean up all cloned repositories.

        Returns:
            bool: True if all cleanups were successful, False otherwise.
        """
        logger.info("Cleaning up all repositories")
        # Snapshot the keys (cleanup mutates self.repos) and build the full
        # result list first so every repository is attempted even after a failure.
        results = [self.cleanup_repository(repo_path) for repo_path in list(self.repos)]
        return all(results)