mcpmark / src /mcp_services /github /github_login_helper.py
haochengsama's picture
Add files using upload-large-folder tool
a2ec7b6 verified
Raw
History Blame Contribute Delete
7.67 kB
"""
GitHub Login Helper for MCPMark
================================
This module provides GitHub token authentication and validation utilities.
Unlike browser-based services, GitHub uses token-based authentication.
"""
import json
import requests
from pathlib import Path
from typing import Optional, Dict, Any
from src.base.login_helper import BaseLoginHelper
from src.logger import get_logger
logger = get_logger(__name__)
class GitHubLoginHelper(BaseLoginHelper):
"""
Utility helper for GitHub token authentication and validation.
"""
def __init__(
self,
token: Optional[str] = None,
state_path: Optional[Path] = None,
) -> None:
"""
Initialize the GitHub login helper.
Args:
token: GitHub Personal Access Token
state_path: Path to save authentication state
"""
self.token = token
self.state_path = state_path or Path.home() / ".mcpmark" / "github_auth.json"
# Ensure state directory exists
self.state_path.parent.mkdir(parents=True, exist_ok=True)
def login_and_save_state(self, **kwargs) -> bool:
"""
Validate GitHub token and save authentication state.
Returns:
bool: True if authentication successful, False otherwise
"""
if not self.token:
logger.error("No GitHub token provided")
return False
try:
# Validate token by making an authenticated request
session = requests.Session()
session.headers.update(
{
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github.v3+json",
"X-GitHub-Api-Version": "2022-11-28",
"User-Agent": "MCPMark/1.0",
}
)
# Get user information
response = session.get("https://api.github.com/user")
if response.status_code != 200:
logger.error(
f"GitHub authentication failed: {response.status_code} {response.text}"
)
return False
user_info = response.json()
logger.info(
f"GitHub authentication successful for user: {user_info['login']}"
)
# Get token scopes
token_scopes = self._get_token_scopes(session)
# Save authentication state
auth_state = {
"user": user_info,
"token_scopes": token_scopes,
"authenticated_at": self._get_current_timestamp(),
}
self._save_auth_state(auth_state)
# Verify required permissions
if not self._verify_required_permissions(token_scopes):
logger.warning("GitHub token may not have all required permissions")
return False
return True
except Exception as e:
logger.error(f"GitHub authentication error: {e}")
return False
def _get_token_scopes(self, session: requests.Session) -> list:
"""Get the scopes available to the current token."""
try:
response = session.get("https://api.github.com/user")
scopes_header = response.headers.get("X-OAuth-Scopes", "")
if scopes_header:
return [
scope.strip() for scope in scopes_header.split(",") if scope.strip()
]
return []
except Exception as e:
logger.warning(f"Could not determine token scopes: {e}")
return []
def _verify_required_permissions(self, scopes: list) -> bool:
"""
Verify that the token has the minimum required permissions.
For MCPMark GitHub tasks, we typically need:
- repo (for repository access)
- read:user (for user information)
"""
required_scopes = ["repo"] # Minimum requirement
recommended_scopes = ["repo", "read:user", "read:org"]
has_required = all(scope in scopes for scope in required_scopes)
if not has_required:
logger.error(
f"Token missing required scopes. Required: {required_scopes}, Available: {scopes}"
)
return False
has_recommended = all(scope in scopes for scope in recommended_scopes)
if not has_recommended:
logger.warning(
f"Token missing some recommended scopes. Recommended: {recommended_scopes}, Available: {scopes}"
)
return True
def _save_auth_state(self, auth_state: Dict[str, Any]):
"""Save authentication state to local file."""
try:
with open(self.state_path, "w") as f:
json.dump(auth_state, f, indent=2, default=str)
# Set restrictive permissions (user read/write only)
self.state_path.chmod(0o600)
logger.info(f"Authentication state saved to: {self.state_path}")
except Exception as e:
logger.error(f"Failed to save authentication state: {e}")
def _get_current_timestamp(self) -> str:
"""Get current timestamp in ISO format."""
from datetime import datetime
return datetime.utcnow().isoformat() + "Z"
def get_saved_auth_state(self) -> Optional[Dict[str, Any]]:
"""Load and return saved authentication state."""
try:
if self.state_path.exists():
with open(self.state_path, "r") as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load authentication state: {e}")
return None
def is_token_valid(self) -> bool:
"""Check if the current token is still valid."""
if not self.token:
return False
try:
session = requests.Session()
session.headers.update(
{
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github.v3+json",
}
)
response = session.get("https://api.github.com/user")
return response.status_code == 200
except Exception:
return False
def get_rate_limit_info(self) -> Dict[str, Any]:
"""Get current rate limit information for the token."""
if not self.token:
return {}
try:
session = requests.Session()
session.headers.update(
{
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github.v3+json",
}
)
response = session.get("https://api.github.com/rate_limit")
if response.status_code == 200:
return response.json()
except Exception as e:
logger.warning(f"Failed to get rate limit info: {e}")
return {}
def test_repository_access(self, owner: str, repo: str) -> bool:
"""Test if the token has access to a specific repository."""
if not self.token:
return False
try:
session = requests.Session()
session.headers.update(
{
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github.v3+json",
}
)
response = session.get(f"https://api.github.com/repos/{owner}/{repo}")
return response.status_code == 200
except Exception:
return False