MCPilot / src /permissions /validator.py
girish-hari's picture
checking-in the project source code
2358888
"""
Permission validator for API credentials.
Validates that provided credentials have the necessary permissions/scopes
for the requested actions.
"""
from typing import Optional, Set, Dict, List
import re
class PermissionValidator:
"""Validates API permissions before executing actions."""
def __init__(self):
self.validators = {
'github': self._validate_github_token,
'slack': self._validate_slack_token,
'linear': self._validate_linear_token,
'notion': self._validate_notion_token,
'stripe': self._validate_stripe_key,
'sentry': self._validate_sentry_token,
}
def validate(self, service: str, action: str, credentials: dict, required_scopes: Set[str]) -> Dict:
"""
Validate credentials have required permissions.
Args:
service: Service name (e.g., 'github')
action: Action being performed
credentials: Credential dict (format varies by service)
required_scopes: Set of required permission scopes
Returns:
Dict with validation result:
{
'valid': bool,
'has_scopes': Set[str],
'missing_scopes': Set[str],
'error': Optional[str],
'warning': Optional[str]
}
"""
if service not in self.validators:
return {
'valid': False,
'error': f"Unknown service: {service}",
'has_scopes': set(),
'missing_scopes': required_scopes
}
validator_func = self.validators[service]
return validator_func(credentials, required_scopes, action)
def _validate_github_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
"""
Validate GitHub token.
In mock mode, we do basic validation. In real mode (Phase 2),
we'd make an API call to check actual scopes.
"""
token = credentials.get('token', '')
# Basic format validation
if not token:
return {
'valid': False,
'error': 'Missing GitHub token',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Check token format (ghp_, gho_, ghs_, etc.)
valid_prefixes = ['ghp_', 'gho_', 'ghs_', 'github_pat_']
if not any(token.startswith(prefix) for prefix in valid_prefixes):
return {
'valid': False,
'error': 'Invalid GitHub token format. Expected format: ghp_xxx or github_pat_xxx',
'has_scopes': set(),
'missing_scopes': required_scopes,
'warning': 'Token format appears incorrect'
}
# In Phase 2, we'd actually verify scopes via API
# For now, we'll simulate based on token type
simulated_scopes = self._simulate_github_scopes(token)
missing_scopes = required_scopes - simulated_scopes
if missing_scopes:
return {
'valid': False,
'error': 'INSUFFICIENT_PERMISSIONS',
'has_scopes': simulated_scopes,
'missing_scopes': missing_scopes,
'warning': f'Missing required scopes: {", ".join(missing_scopes)}',
'docs_url': 'https://docs.github.com/en/apps/oauth-apps/building-oauth-apps/scopes-for-oauth-apps'
}
return {
'valid': True,
'has_scopes': simulated_scopes,
'missing_scopes': set(),
'note': '✓ All required permissions present'
}
def _simulate_github_scopes(self, token: str) -> Set[str]:
"""
Simulate GitHub token scopes based on token type.
In Phase 2, this will be replaced with actual API verification.
"""
# Personal access tokens (ghp_) typically have broader scopes
if token.startswith('ghp_'):
return {'repo', 'user', 'read:org', 'workflow'}
# OAuth tokens (gho_) have limited scopes
elif token.startswith('gho_'):
return {'repo'}
# Fine-grained PATs (github_pat_) - assume repo access
elif token.startswith('github_pat_'):
return {'repo', 'read:org'}
# Default: minimal permissions
return {'public_repo'}
def _validate_slack_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
"""Validate Slack token."""
token = credentials.get('token', '')
if not token:
return {
'valid': False,
'error': 'Missing Slack token',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Slack bot tokens start with xoxb-
# Slack user tokens start with xoxp-
if not (token.startswith('xoxb-') or token.startswith('xoxp-')):
return {
'valid': False,
'error': 'Invalid Slack token format. Expected xoxb-xxx (bot) or xoxp-xxx (user)',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Simulate scopes (Phase 2: verify via API)
simulated_scopes = self._simulate_slack_scopes(token)
missing_scopes = required_scopes - simulated_scopes
if missing_scopes:
return {
'valid': False,
'error': 'INSUFFICIENT_PERMISSIONS',
'has_scopes': simulated_scopes,
'missing_scopes': missing_scopes,
'warning': f'Missing required scopes: {", ".join(missing_scopes)}',
'docs_url': 'https://api.slack.com/scopes'
}
return {
'valid': True,
'has_scopes': simulated_scopes,
'missing_scopes': set()
}
def _simulate_slack_scopes(self, token: str) -> Set[str]:
"""Simulate Slack token scopes."""
if token.startswith('xoxb-'):
# Bot tokens typically have these scopes
return {'chat:write', 'channels:history', 'channels:read'}
else:
# User tokens have broader access
return {'chat:write', 'channels:history', 'channels:read', 'channels:manage', 'files:write'}
def _validate_linear_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
"""Validate Linear API key."""
api_key = credentials.get('api_key', '')
if not api_key:
return {
'valid': False,
'error': 'Missing Linear API key',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Linear API keys start with lin_api_
if not api_key.startswith('lin_api_'):
return {
'valid': False,
'error': 'Invalid Linear API key format',
'has_scopes': set(),
'missing_scopes': required_scopes,
'warning': 'Expected format: lin_api_xxx'
}
# Linear has simple read/write permissions
# Assume full access for now
simulated_scopes = {'read', 'write'}
missing_scopes = required_scopes - simulated_scopes
if missing_scopes:
return {
'valid': False,
'error': 'INSUFFICIENT_PERMISSIONS',
'has_scopes': simulated_scopes,
'missing_scopes': missing_scopes
}
return {'valid': True, 'has_scopes': simulated_scopes, 'missing_scopes': set()}
def _validate_notion_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
"""Validate Notion integration token."""
token = credentials.get('token', '')
if not token:
return {
'valid': False,
'error': 'Missing Notion integration token',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Notion tokens start with secret_
if not token.startswith('secret_'):
return {
'valid': False,
'error': 'Invalid Notion token format',
'has_scopes': set(),
'missing_scopes': required_scopes,
'warning': 'Expected format: secret_xxx'
}
# Assume full permissions (Notion integrations have granular page-level permissions)
simulated_scopes = {'pages:write', 'pages:read', 'databases:read', 'databases:write'}
missing_scopes = required_scopes - simulated_scopes
if missing_scopes:
return {
'valid': False,
'error': 'INSUFFICIENT_PERMISSIONS',
'has_scopes': simulated_scopes,
'missing_scopes': missing_scopes
}
return {'valid': True, 'has_scopes': simulated_scopes, 'missing_scopes': set()}
def _validate_stripe_key(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
"""Validate Stripe API key."""
api_key = credentials.get('api_key', '')
if not api_key:
return {
'valid': False,
'error': 'Missing Stripe API key',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Stripe keys: sk_ (secret), pk_ (publishable), rk_ (restricted)
if not any(api_key.startswith(prefix) for prefix in ['sk_test_', 'sk_live_', 'rk_test_', 'rk_live_']):
return {
'valid': False,
'error': 'Invalid Stripe API key format',
'has_scopes': set(),
'missing_scopes': required_scopes,
'warning': 'Expected format: sk_test_xxx or sk_live_xxx'
}
# Assume full permissions for secret keys
if api_key.startswith('sk_'):
simulated_scopes = required_scopes # Full access
else:
simulated_scopes = {'customers_read'} # Restricted keys have limited access
missing_scopes = required_scopes - simulated_scopes
if missing_scopes:
return {
'valid': False,
'error': 'INSUFFICIENT_PERMISSIONS',
'has_scopes': simulated_scopes,
'missing_scopes': missing_scopes,
'warning': 'Restricted key may not have required permissions'
}
return {'valid': True, 'has_scopes': simulated_scopes, 'missing_scopes': set()}
def _validate_sentry_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
"""Validate Sentry auth token."""
token = credentials.get('token', '')
if not token:
return {
'valid': False,
'error': 'Missing Sentry auth token',
'has_scopes': set(),
'missing_scopes': required_scopes
}
# Sentry tokens are typically 64-character hex strings
if len(token) != 64 or not all(c in '0123456789abcdef' for c in token.lower()):
return {
'valid': False,
'error': 'Invalid Sentry token format',
'has_scopes': set(),
'missing_scopes': required_scopes,
'warning': 'Expected 64-character hexadecimal string'
}
# Assume full project permissions
simulated_scopes = {'project:read', 'project:write', 'event:read', 'event:write'}
missing_scopes = required_scopes - simulated_scopes
if missing_scopes:
return {
'valid': False,
'error': 'INSUFFICIENT_PERMISSIONS',
'has_scopes': simulated_scopes,
'missing_scopes': missing_scopes
}
return {'valid': True, 'has_scopes': simulated_scopes, 'missing_scopes': set()}