| """ |
| GitHub Login Helper for MCPMark |
| ================================ |
| |
| This module provides GitHub token authentication and validation utilities. |
| Unlike browser-based services, GitHub uses token-based authentication. |
| """ |
|
|
| import json |
| import requests |
| from pathlib import Path |
| from typing import Optional, Dict, Any |
|
|
| from src.base.login_helper import BaseLoginHelper |
| from src.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| class GitHubLoginHelper(BaseLoginHelper): |
| """ |
| Utility helper for GitHub token authentication and validation. |
| """ |
|
|
| def __init__( |
| self, |
| token: Optional[str] = None, |
| state_path: Optional[Path] = None, |
| ) -> None: |
| """ |
| Initialize the GitHub login helper. |
| |
| Args: |
| token: GitHub Personal Access Token |
| state_path: Path to save authentication state |
| """ |
| self.token = token |
| self.state_path = state_path or Path.home() / ".mcpmark" / "github_auth.json" |
|
|
| |
| self.state_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
| def login_and_save_state(self, **kwargs) -> bool: |
| """ |
| Validate GitHub token and save authentication state. |
| |
| Returns: |
| bool: True if authentication successful, False otherwise |
| """ |
| if not self.token: |
| logger.error("No GitHub token provided") |
| return False |
|
|
| try: |
| |
| session = requests.Session() |
| session.headers.update( |
| { |
| "Authorization": f"Bearer {self.token}", |
| "Accept": "application/vnd.github.v3+json", |
| "X-GitHub-Api-Version": "2022-11-28", |
| "User-Agent": "MCPMark/1.0", |
| } |
| ) |
|
|
| |
| response = session.get("https://api.github.com/user") |
|
|
| if response.status_code != 200: |
| logger.error( |
| f"GitHub authentication failed: {response.status_code} {response.text}" |
| ) |
| return False |
|
|
| user_info = response.json() |
| logger.info( |
| f"GitHub authentication successful for user: {user_info['login']}" |
| ) |
|
|
| |
| token_scopes = self._get_token_scopes(session) |
|
|
| |
| auth_state = { |
| "user": user_info, |
| "token_scopes": token_scopes, |
| "authenticated_at": self._get_current_timestamp(), |
| } |
| self._save_auth_state(auth_state) |
|
|
| |
| if not self._verify_required_permissions(token_scopes): |
| logger.warning("GitHub token may not have all required permissions") |
| return False |
|
|
| return True |
|
|
| except Exception as e: |
| logger.error(f"GitHub authentication error: {e}") |
| return False |
|
|
| def _get_token_scopes(self, session: requests.Session) -> list: |
| """Get the scopes available to the current token.""" |
| try: |
| response = session.get("https://api.github.com/user") |
| scopes_header = response.headers.get("X-OAuth-Scopes", "") |
| if scopes_header: |
| return [ |
| scope.strip() for scope in scopes_header.split(",") if scope.strip() |
| ] |
| return [] |
| except Exception as e: |
| logger.warning(f"Could not determine token scopes: {e}") |
| return [] |
|
|
| def _verify_required_permissions(self, scopes: list) -> bool: |
| """ |
| Verify that the token has the minimum required permissions. |
| |
| For MCPMark GitHub tasks, we typically need: |
| - repo (for repository access) |
| - read:user (for user information) |
| """ |
| required_scopes = ["repo"] |
| recommended_scopes = ["repo", "read:user", "read:org"] |
|
|
| has_required = all(scope in scopes for scope in required_scopes) |
| if not has_required: |
| logger.error( |
| f"Token missing required scopes. Required: {required_scopes}, Available: {scopes}" |
| ) |
| return False |
|
|
| has_recommended = all(scope in scopes for scope in recommended_scopes) |
| if not has_recommended: |
| logger.warning( |
| f"Token missing some recommended scopes. Recommended: {recommended_scopes}, Available: {scopes}" |
| ) |
|
|
| return True |
|
|
| def _save_auth_state(self, auth_state: Dict[str, Any]): |
| """Save authentication state to local file.""" |
| try: |
| with open(self.state_path, "w") as f: |
| json.dump(auth_state, f, indent=2, default=str) |
|
|
| |
| self.state_path.chmod(0o600) |
| logger.info(f"Authentication state saved to: {self.state_path}") |
|
|
| except Exception as e: |
| logger.error(f"Failed to save authentication state: {e}") |
|
|
| def _get_current_timestamp(self) -> str: |
| """Get current timestamp in ISO format.""" |
| from datetime import datetime |
|
|
| return datetime.utcnow().isoformat() + "Z" |
|
|
| def get_saved_auth_state(self) -> Optional[Dict[str, Any]]: |
| """Load and return saved authentication state.""" |
| try: |
| if self.state_path.exists(): |
| with open(self.state_path, "r") as f: |
| return json.load(f) |
| except Exception as e: |
| logger.error(f"Failed to load authentication state: {e}") |
| return None |
|
|
| def is_token_valid(self) -> bool: |
| """Check if the current token is still valid.""" |
| if not self.token: |
| return False |
|
|
| try: |
| session = requests.Session() |
| session.headers.update( |
| { |
| "Authorization": f"Bearer {self.token}", |
| "Accept": "application/vnd.github.v3+json", |
| } |
| ) |
|
|
| response = session.get("https://api.github.com/user") |
| return response.status_code == 200 |
|
|
| except Exception: |
| return False |
|
|
| def get_rate_limit_info(self) -> Dict[str, Any]: |
| """Get current rate limit information for the token.""" |
| if not self.token: |
| return {} |
|
|
| try: |
| session = requests.Session() |
| session.headers.update( |
| { |
| "Authorization": f"Bearer {self.token}", |
| "Accept": "application/vnd.github.v3+json", |
| } |
| ) |
|
|
| response = session.get("https://api.github.com/rate_limit") |
| if response.status_code == 200: |
| return response.json() |
|
|
| except Exception as e: |
| logger.warning(f"Failed to get rate limit info: {e}") |
|
|
| return {} |
|
|
| def test_repository_access(self, owner: str, repo: str) -> bool: |
| """Test if the token has access to a specific repository.""" |
| if not self.token: |
| return False |
|
|
| try: |
| session = requests.Session() |
| session.headers.update( |
| { |
| "Authorization": f"Bearer {self.token}", |
| "Accept": "application/vnd.github.v3+json", |
| } |
| ) |
|
|
| response = session.get(f"https://api.github.com/repos/{owner}/{repo}") |
| return response.status_code == 200 |
|
|
| except Exception: |
| return False |
|
|