Spaces:
Sleeping
Sleeping
| """ | |
| Permission validator for API credentials. | |
| Validates that provided credentials have the necessary permissions/scopes | |
| for the requested actions. | |
| """ | |
| from typing import Optional, Set, Dict, List | |
| import re | |
class PermissionValidator:
    """Validates API permissions before executing actions.

    Every validator is a *mock*: it checks only the textual format of the
    credential and then compares the requested scopes against a hard-coded,
    simulated scope set. No network call is made.

    All validators return a dict that always carries the keys ``valid``,
    ``has_scopes``, ``missing_scopes``, ``error`` and ``warning`` (the last
    two are ``None`` when not applicable). Some results carry the extra keys
    ``docs_url`` or ``note``.
    """

    # Accepted credential prefixes, kept as tuples so they can be handed
    # straight to ``str.startswith``.
    _GITHUB_PREFIXES = ('ghp_', 'gho_', 'ghs_', 'github_pat_')
    _SLACK_PREFIXES = ('xoxb-', 'xoxp-')
    _LINEAR_PREFIXES = ('lin_api_',)
    # ``secret_`` is the legacy Notion prefix, ``ntn_`` the current one.
    _NOTION_PREFIXES = ('secret_', 'ntn_')
    _STRIPE_PREFIXES = ('sk_test_', 'sk_live_', 'rk_test_', 'rk_live_')
    _HEX_DIGITS = frozenset('0123456789abcdef')

    def __init__(self):
        # Dispatch table: service name -> bound validator method. Each
        # validator takes (credentials, required_scopes, action).
        self.validators = {
            'github': self._validate_github_token,
            'slack': self._validate_slack_token,
            'linear': self._validate_linear_token,
            'notion': self._validate_notion_token,
            'stripe': self._validate_stripe_key,
            'sentry': self._validate_sentry_token,
        }

    def validate(self, service: str, action: str, credentials: dict, required_scopes: Set[str]) -> Dict:
        """
        Validate credentials have required permissions.

        Args:
            service: Service name (e.g., 'github')
            action: Action being performed (forwarded to the validator)
            credentials: Credential dict (format varies by service);
                ``None`` is treated as an empty dict
            required_scopes: Required permission scopes. A set is expected,
                but any iterable of scopes, a single scope string, or
                ``None`` (no scopes required) is accepted.

        Returns:
            Dict with validation result:
            {
                'valid': bool,
                'has_scopes': Set[str],
                'missing_scopes': Set[str],
                'error': Optional[str],
                'warning': Optional[str]
            }
            Unknown services yield ``valid=False`` with an explanatory error.
        """
        validator_func = self.validators.get(service)
        if validator_func is None:
            return self._result(
                False,
                missing_scopes=self._as_scope_set(required_scopes),
                error=f"Unknown service: {service}",
            )
        return validator_func(credentials, required_scopes, action)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_scope_set(required_scopes) -> Set[str]:
        """Return a fresh set of scopes built from whatever the caller passed."""
        if required_scopes is None:
            return set()
        if isinstance(required_scopes, str):
            # A bare string is one scope, not an iterable of characters.
            return {required_scopes}
        # Always copy so results never alias the caller's collection.
        return set(required_scopes)

    @staticmethod
    def _read_secret(credentials, key: str):
        """Return ``credentials[key]``, or ``''`` when it is absent or falsy."""
        if credentials is None:
            return ''
        return credentials.get(key) or ''

    @staticmethod
    def _has_prefix(secret, prefixes) -> bool:
        """Return True when ``secret`` is a string starting with one of ``prefixes``."""
        return isinstance(secret, str) and secret.startswith(prefixes)

    @staticmethod
    def _result(valid: bool, has_scopes=None, missing_scopes=None,
                error: Optional[str] = None, warning: Optional[str] = None,
                **extra) -> Dict:
        """Build a result dict that always contains the five documented keys."""
        result = {
            'valid': valid,
            'has_scopes': set() if has_scopes is None else set(has_scopes),
            'missing_scopes': set() if missing_scopes is None else set(missing_scopes),
            'error': error,
            'warning': warning,
        }
        result.update(extra)
        return result

    def _scope_outcome(self, granted: Set[str], required: Set[str], *,
                       list_missing: bool = False,
                       warning: Optional[str] = None,
                       docs_url: Optional[str] = None,
                       note: Optional[str] = None) -> Dict:
        """Compare granted and required scopes and build the final result.

        Args:
            granted: Scopes the credential is assumed to have.
            required: Scopes the action needs.
            list_missing: When True, the failure warning lists the missing
                scopes (sorted, so the message is deterministic).
            warning: Fixed failure warning, used when ``list_missing`` is False.
            docs_url: Added to the failure result when given.
            note: Added to the success result when given.
        """
        missing = required - granted
        if not missing:
            extra = {} if note is None else {'note': note}
            return self._result(True, has_scopes=granted, **extra)

        if list_missing:
            warning = f'Missing required scopes: {", ".join(sorted(map(str, missing)))}'
        extra = {} if docs_url is None else {'docs_url': docs_url}
        return self._result(
            False,
            has_scopes=granted,
            missing_scopes=missing,
            error='INSUFFICIENT_PERMISSIONS',
            warning=warning,
            **extra,
        )

    # ------------------------------------------------------------------
    # Per-service validators. ``action`` is accepted so every validator
    # shares one signature; none of them currently uses it.
    # ------------------------------------------------------------------

    def _validate_github_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
        """
        Validate GitHub token.

        In mock mode, we do basic validation. In real mode (Phase 2),
        we'd make an API call to check actual scopes.
        """
        required = self._as_scope_set(required_scopes)
        token = self._read_secret(credentials, 'token')

        if not token:
            return self._result(False, missing_scopes=required, error='Missing GitHub token')

        # Check token format (ghp_, gho_, ghs_, github_pat_)
        if not self._has_prefix(token, self._GITHUB_PREFIXES):
            return self._result(
                False,
                missing_scopes=required,
                error='Invalid GitHub token format. Expected format: ghp_xxx or github_pat_xxx',
                warning='Token format appears incorrect',
            )

        # In Phase 2, we'd actually verify scopes via API.
        # For now, simulate based on token type.
        return self._scope_outcome(
            self._simulate_github_scopes(token),
            required,
            list_missing=True,
            docs_url='https://docs.github.com/en/apps/oauth-apps/building-oauth-apps/scopes-for-oauth-apps',
            note='✓ All required permissions present',
        )

    def _simulate_github_scopes(self, token: str) -> Set[str]:
        """
        Simulate GitHub token scopes based on token type.

        In Phase 2, this will be replaced with actual API verification.
        """
        # Personal access tokens (ghp_) are simulated with broader scopes
        if token.startswith('ghp_'):
            return {'repo', 'user', 'read:org', 'workflow'}
        # OAuth tokens (gho_) are simulated with limited scopes
        if token.startswith('gho_'):
            return {'repo'}
        # Fine-grained PATs (github_pat_) - assume repo access
        if token.startswith('github_pat_'):
            return {'repo', 'read:org'}
        # Default (e.g. ghs_): minimal permissions
        return {'public_repo'}

    def _validate_slack_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
        """Validate Slack token (xoxb- bot tokens or xoxp- user tokens)."""
        required = self._as_scope_set(required_scopes)
        token = self._read_secret(credentials, 'token')

        if not token:
            return self._result(False, missing_scopes=required, error='Missing Slack token')

        if not self._has_prefix(token, self._SLACK_PREFIXES):
            return self._result(
                False,
                missing_scopes=required,
                error='Invalid Slack token format. Expected xoxb-xxx (bot) or xoxp-xxx (user)',
            )

        # Simulate scopes (Phase 2: verify via API)
        return self._scope_outcome(
            self._simulate_slack_scopes(token),
            required,
            list_missing=True,
            docs_url='https://api.slack.com/scopes',
        )

    def _simulate_slack_scopes(self, token: str) -> Set[str]:
        """Simulate Slack token scopes."""
        if token.startswith('xoxb-'):
            # Simulated bot-token scopes
            return {'chat:write', 'channels:history', 'channels:read'}
        # Any other token is treated as a user token with broader access
        return {'chat:write', 'channels:history', 'channels:read', 'channels:manage', 'files:write'}

    def _validate_linear_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
        """Validate Linear API key (expects the ``lin_api_`` prefix)."""
        required = self._as_scope_set(required_scopes)
        api_key = self._read_secret(credentials, 'api_key')

        if not api_key:
            return self._result(False, missing_scopes=required, error='Missing Linear API key')

        if not self._has_prefix(api_key, self._LINEAR_PREFIXES):
            return self._result(
                False,
                missing_scopes=required,
                error='Invalid Linear API key format',
                warning='Expected format: lin_api_xxx',
            )

        # Linear is modelled with simple read/write permissions;
        # assume full access for now.
        return self._scope_outcome({'read', 'write'}, required)

    def _validate_notion_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
        """Validate Notion integration token (``ntn_`` or legacy ``secret_`` prefix)."""
        required = self._as_scope_set(required_scopes)
        token = self._read_secret(credentials, 'token')

        if not token:
            return self._result(False, missing_scopes=required, error='Missing Notion integration token')

        if not self._has_prefix(token, self._NOTION_PREFIXES):
            return self._result(
                False,
                missing_scopes=required,
                error='Invalid Notion token format',
                warning='Expected format: ntn_xxx or secret_xxx',
            )

        # Assume full permissions (Notion integrations have granular page-level permissions)
        granted = {'pages:write', 'pages:read', 'databases:read', 'databases:write'}
        return self._scope_outcome(granted, required)

    def _validate_stripe_key(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
        """Validate Stripe API key (secret ``sk_`` or restricted ``rk_`` keys)."""
        required = self._as_scope_set(required_scopes)
        api_key = self._read_secret(credentials, 'api_key')

        if not api_key:
            return self._result(False, missing_scopes=required, error='Missing Stripe API key')

        # Publishable keys (pk_) are deliberately not accepted here.
        if not self._has_prefix(api_key, self._STRIPE_PREFIXES):
            return self._result(
                False,
                missing_scopes=required,
                error='Invalid Stripe API key format',
                warning='Expected format: sk_test_xxx or sk_live_xxx',
            )

        if api_key.startswith('sk_'):
            # Secret keys are assumed to grant everything that was requested.
            granted = set(required)
        else:
            # Restricted keys are simulated with limited access.
            granted = {'customers_read'}

        return self._scope_outcome(
            granted,
            required,
            warning='Restricted key may not have required permissions',
        )

    def _validate_sentry_token(self, credentials: dict, required_scopes: Set[str], action: str) -> Dict:
        """Validate Sentry auth token (expects a 64-character hex string)."""
        required = self._as_scope_set(required_scopes)
        token = self._read_secret(credentials, 'token')

        if not token:
            return self._result(False, missing_scopes=required, error='Missing Sentry auth token')

        looks_like_hex = (
            isinstance(token, str)
            and len(token) == 64
            and all(ch in self._HEX_DIGITS for ch in token.lower())
        )
        if not looks_like_hex:
            return self._result(
                False,
                missing_scopes=required,
                error='Invalid Sentry token format',
                warning='Expected 64-character hexadecimal string',
            )

        # Assume full project permissions
        granted = {'project:read', 'project:write', 'event:read', 'event:write'}
        return self._scope_outcome(granted, required)