Spaces:
Running
Running
| """ | |
| core/authorization.py - Scan Authorization Management | |
| Track which targets you're authorized to scan for legal compliance | |
| """ | |
| import json | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from typing import Tuple, Optional, List, Dict | |
| import logging | |
logger = logging.getLogger(__name__)


class AuthorizationManager:
    """Manage scan authorizations for legal compliance.

    Records live in ``self.authorizations``, a dict keyed by the lower-cased
    domain (or ``*.domain`` wildcard), and are persisted as JSON to
    ``self.auth_file``. Timestamps written by this class are naive ISO-8601
    strings expressed in UTC.
    """

    def __init__(self, auth_file: str = "authorizations.json"):
        """
        Initialize authorization manager

        Args:
            auth_file: Path to authorization storage file
        """
        self.auth_file = Path(auth_file)
        self.authorizations = self._load_authorizations()

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``; kept naive so it
        compares cleanly with the naive ISO strings already stored on disk.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_expiry(auth: object) -> Optional[datetime]:
        """Return the record's expiry as a naive UTC datetime, or None.

        None is returned when the record is not a mapping, has no ``expiry``
        key, or the value is not a valid ISO-8601 string. Timezone-aware
        values are converted to naive UTC so they can be compared with
        ``_utcnow()``.
        """
        try:
            expiry = datetime.fromisoformat(auth["expiry"])
        except (KeyError, TypeError, ValueError):
            return None
        if expiry.tzinfo is not None:
            offset = expiry.utcoffset() or timedelta(0)
            expiry = expiry.replace(tzinfo=None) - offset
        return expiry

    def _load_authorizations(self) -> Dict:
        """Load authorizations from file.

        Returns:
            dict: The stored records, or an empty dict when the file is
            missing, unreadable, or does not contain a JSON object.
        """
        if not self.auth_file.exists():
            return {}
        try:
            with open(self.auth_file, encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load authorizations: {e}")
            return {}
        if not isinstance(data, dict):
            # Every other method assumes a dict; a list or scalar here would
            # only fail later in a less obvious place.
            logger.error("Failed to load authorizations: top-level JSON value is not an object")
            return {}
        return data

    def _save_authorizations(self) -> bool:
        """Save authorizations to file.

        Returns:
            bool: True if the file was written, False if writing failed
            (the failure is logged, not raised).
        """
        try:
            self.auth_file.parent.mkdir(parents=True, exist_ok=True)
            with open(self.auth_file, 'w', encoding="utf-8") as f:
                json.dump(self.authorizations, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to save authorizations: {e}")
            return False
        return True

    def add_authorization(
        self,
        domain: str,
        email: str,
        days_valid: int = 30,
        proof_path: str = "",
        notes: str = ""
    ) -> bool:
        """
        Add authorized target with proof of permission

        Args:
            domain: Domain to authorize (e.g., "example.com" or "*.example.com")
            email: Contact email for authorization
            days_valid: Number of days authorization is valid
            proof_path: Path to written authorization document
            notes: Additional notes

        Returns:
            bool: True if the record was stored and persisted. False if the
            domain is empty, or if saving failed (in which case the record
            is still present in memory for this process).
        """
        domain = domain.lower().strip()
        if not domain:
            logger.warning("Cannot add authorization: empty domain")
            return False

        # One timestamp for both fields so expiry - added == days_valid exactly.
        now = self._utcnow()
        self.authorizations[domain] = {
            'email': email,
            'added': now.isoformat(),
            'expiry': (now + timedelta(days=days_valid)).isoformat(),
            'days_valid': days_valid,
            'proof_path': proof_path,
            'notes': notes,
            'status': 'active'
        }
        if not self._save_authorizations():
            return False
        logger.info(f"Added authorization for {domain} (expires in {days_valid} days)")
        return True

    def is_authorized(self, domain: str) -> Tuple[bool, str]:
        """
        Check if domain is authorized for scanning

        An exact-match record always decides the result. Otherwise every
        matching ``*.base`` wildcard is consulted and the domain is
        authorized if any of them is currently valid, so the answer does not
        depend on the order records were added.

        Args:
            domain: Domain to check

        Returns:
            tuple: (is_authorized, reason)
        """
        domain = domain.lower().strip()

        # Check exact match
        if domain in self.authorizations:
            return self._check_authorization_status(domain)

        # Check wildcard matches (*.example.com); the wildcard also covers
        # the base domain itself.
        first_failure = None
        for auth_domain in self.authorizations:
            if not auth_domain.startswith('*.'):
                continue
            base_domain = auth_domain[2:]
            if domain == base_domain or domain.endswith(f'.{base_domain}'):
                result = self._check_authorization_status(auth_domain)
                if result[0]:
                    return result
                if first_failure is None:
                    first_failure = result

        if first_failure is not None:
            return first_failure
        return False, "No authorization found for this domain"

    def _check_authorization_status(self, domain: str) -> Tuple[bool, str]:
        """Check authorization status for a specific stored key.

        Fails closed: a record that is malformed or has no parseable expiry
        is reported as not authorized rather than raising.
        """
        auth = self.authorizations[domain]
        if not isinstance(auth, dict):
            logger.error(f"Authorization record for {domain} is malformed")
            return False, "Authorization record is malformed"

        # Check if explicitly revoked
        if auth.get('status') == 'revoked':
            return False, "Authorization has been revoked"

        # Check expiry
        expiry = self._parse_expiry(auth)
        if expiry is None:
            logger.error(f"Authorization record for {domain} has a missing or invalid expiry date")
            return False, "Authorization record has a missing or invalid expiry date"

        now = self._utcnow()
        if now > expiry:
            return False, f"Authorization expired on {expiry.strftime('%Y-%m-%d')}"
        days_remaining = (expiry - now).days
        return True, f"Authorized (expires in {days_remaining} days)"

    def revoke_authorization(self, domain: str) -> bool:
        """
        Revoke authorization for a domain

        Args:
            domain: Domain to revoke

        Returns:
            bool: True if the record was revoked and persisted; False if the
            domain is unknown, its record is malformed, or saving failed.
        """
        domain = domain.lower().strip()
        auth = self.authorizations.get(domain)
        if auth is None:
            logger.warning(f"Cannot revoke: {domain} not found")
            return False
        if not isinstance(auth, dict):
            logger.error(f"Cannot revoke: record for {domain} is malformed")
            return False

        auth['status'] = 'revoked'
        auth['revoked_at'] = self._utcnow().isoformat()
        if not self._save_authorizations():
            return False
        logger.info(f"Revoked authorization for {domain}")
        return True

    def extend_authorization(self, domain: str, days: int = 30) -> bool:
        """
        Extend authorization expiry

        The extra days are added to the stored expiry date (not to the
        current time), and the record's status is left unchanged.

        Args:
            domain: Domain to extend
            days: Additional days

        Returns:
            bool: True if the expiry was updated and persisted; False if the
            domain is unknown, its expiry cannot be parsed, or saving failed.
        """
        domain = domain.lower().strip()
        auth = self.authorizations.get(domain)
        if auth is None:
            logger.warning(f"Cannot extend: {domain} not found")
            return False

        current_expiry = self._parse_expiry(auth)
        if current_expiry is None:
            logger.error(f"Cannot extend: {domain} has a missing or invalid expiry date")
            return False

        auth['expiry'] = (current_expiry + timedelta(days=days)).isoformat()
        if not self._save_authorizations():
            return False
        logger.info(f"Extended authorization for {domain} by {days} days")
        return True

    def list_authorizations(self, active_only: bool = True) -> List[Dict]:
        """
        List all authorizations

        Records whose expiry cannot be parsed are skipped with a warning.
        A record with no ``status`` key counts as ``'active'``, matching how
        the authorization check treats it. Expired records whose status is
        still ``'active'`` are included, with a negative ``days_remaining``.

        Args:
            active_only: Only show authorizations whose status is 'active'

        Returns:
            list: Authorization records sorted by ascending days_remaining
        """
        now = self._utcnow()
        records = []
        for domain, auth in self.authorizations.items():
            expiry = self._parse_expiry(auth)
            if expiry is None:
                logger.warning(f"Skipping malformed authorization record for {domain}")
                continue
            status = auth.get('status', 'active')
            if active_only and status != 'active':
                continue
            records.append({
                'domain': domain,
                'email': auth.get('email', ''),
                'added': auth.get('added', ''),
                'expiry': auth['expiry'],
                'days_remaining': (expiry - now).days,
                'status': status,
                'proof_path': auth.get('proof_path', ''),
                'notes': auth.get('notes', '')
            })
        return sorted(records, key=lambda x: x['days_remaining'])

    def get_expiring_soon(self, days: int = 7) -> List[Dict]:
        """
        Get authorizations expiring soon

        Args:
            days: Number of days to consider "soon"

        Returns:
            list: Active, not-yet-expired records with at most ``days``
            whole days remaining
        """
        return [
            auth
            for auth in self.list_authorizations(active_only=True)
            if 0 <= auth['days_remaining'] <= days
        ]

    def export_authorizations(self, output_path: str) -> bool:
        """
        Export authorizations to a file

        Args:
            output_path: Path to export file (parent directories are created)

        Returns:
            bool: True if successful
        """
        try:
            output = Path(output_path)
            output.parent.mkdir(parents=True, exist_ok=True)
            with open(output, 'w', encoding="utf-8") as f:
                json.dump(self.authorizations, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to export authorizations: {e}")
            return False
        logger.info(f"Exported authorizations to {output_path}")
        return True

    def import_authorizations(self, input_path: str, merge: bool = True) -> bool:
        """
        Import authorizations from a file

        The file must contain a JSON object mapping domains to records.
        Domain keys are lower-cased and stripped so they match lookups;
        records without a parseable ``expiry`` are skipped with a warning.
        Nothing is changed if the file is missing, unreadable, or not a JSON
        object.

        Args:
            input_path: Path to import file
            merge: Merge with existing (True) or replace (False)

        Returns:
            bool: True if the import was applied and persisted
        """
        try:
            input_file = Path(input_path)
            if not input_file.exists():
                logger.error(f"Import file not found: {input_path}")
                return False
            with open(input_file, encoding="utf-8") as f:
                imported = json.load(f)
        except Exception as e:
            logger.error(f"Failed to import authorizations: {e}")
            return False

        if not isinstance(imported, dict):
            logger.error("Failed to import authorizations: top-level JSON value is not an object")
            return False

        cleaned = {}
        for domain, record in imported.items():
            if self._parse_expiry(record) is None:
                logger.warning(f"Skipping malformed imported record for {domain}")
                continue
            cleaned[domain.lower().strip()] = record

        if merge:
            self.authorizations.update(cleaned)
        else:
            self.authorizations = cleaned

        if not self._save_authorizations():
            return False
        logger.info(f"Imported authorizations from {input_path}")
        return True
# Module-wide shared manager, created on first request
_auth_manager = None


def get_auth_manager(auth_file: str = "authorizations.json") -> AuthorizationManager:
    """Return the shared AuthorizationManager, building it on first use.

    ``auth_file`` is only consulted when the shared instance does not exist
    yet; once it has been created, later calls return that same instance
    regardless of the path passed in.
    """
    global _auth_manager
    if _auth_manager is not None:
        return _auth_manager
    _auth_manager = AuthorizationManager(auth_file)
    return _auth_manager
| # Convenience functions | |
def is_authorized(domain: str) -> Tuple[bool, str]:
    """Check a domain against the shared manager.

    Returns:
        tuple: (is_authorized, reason), as produced by
        ``AuthorizationManager.is_authorized``.
    """
    return get_auth_manager().is_authorized(domain)
def add_authorization(
    domain: str,
    email: str,
    days_valid: int = 30,
    proof_path: str = "",
    notes: str = "",
) -> bool:
    """Add an authorization through the shared manager.

    Mirrors ``AuthorizationManager.add_authorization`` so the proof document
    path and notes can be recorded from this shortcut as well; both are
    optional, so existing three-argument calls behave as before.

    Args:
        domain: Domain to authorize (e.g., "example.com" or "*.example.com")
        email: Contact email for authorization
        days_valid: Number of days authorization is valid
        proof_path: Path to written authorization document
        notes: Additional notes

    Returns:
        bool: Whatever the manager's ``add_authorization`` returns
    """
    manager = get_auth_manager()
    return manager.add_authorization(
        domain,
        email,
        days_valid,
        proof_path=proof_path,
        notes=notes,
    )