File size: 5,082 Bytes
5e56bcf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
upif.core.licensing
~~~~~~~~~~~~~~~~~~~

Manages Commercial Licensing and Feature Unlocking.
Integrates with Gumroad API for key verification and uses local AES encryption
to cache license state offline.

:copyright: (c) 2025 Yash Dhone.
:license: Proprietary, see LICENSE for details.
"""

import os
import json
import logging
import requests
import base64
from typing import Optional, Dict
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

logger = logging.getLogger("upif.licensing")

class LicenseManager:
    """
    License Manager.

    Verifies a license key online against the Gumroad license API and keeps
    an encrypted copy of the resulting license state on disk so that later
    startups can be validated without network access.

    Attributes:
        storage_path: Location of the encrypted license cache file.
        license_data: The in-memory license record (a dict with the keys
            ``key``, ``uses``, ``email``, ``timestamp`` and ``tier``), or
            ``None`` when no license has been activated or loaded yet.

    Security Note:
    In a real-world scenario, the '_INTERNAL_KEY' and salt should be:
    1. Obfuscated via Cython compilation (which we do in build.py).
    2. Ideally fetched from a separate secure enclave or injected at build time.

    As long as the key material below is static and shipped with the code,
    the local cache is obfuscated rather than protected: anyone who can read
    this class can decrypt or forge a cache file.
    """

    # Placeholder: Developer should replace this
    PRODUCT_PERMALINK = "xokvjp"

    # Obfuscated internal key material (Static for Demo)
    _INTERNAL_SALT = b'upif_secure_salt_2025'
    _INTERNAL_KEY = b'static_key_for_demo_purposes_only'

    def __init__(self, storage_path: str = ".upif_license.enc"):
        """
        Args:
            storage_path (str): File used to cache the encrypted license.
        """
        self.storage_path = storage_path
        self._cipher = self._get_cipher()
        self.license_data: Optional[Dict] = None

    def _get_cipher(self) -> "Fernet":
        """
        Derives the Fernet key from the internal secret using PBKDF2.

        PBKDF2-HMAC-SHA256 stretches '_INTERNAL_KEY' into the 32 bytes Fernet
        expects. Fernet splits those bytes into a signing half and an
        encryption half (AES-128-CBC + HMAC-SHA256), which is what makes the
        cache file tamper-evident.
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self._INTERNAL_SALT,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(self._INTERNAL_KEY))
        return Fernet(key)

    def activate(self, license_key: str, product_permalink: Optional[str] = None) -> bool:
        """
        Activates the license online via Gumroad API.

        Never raises: every failure (bad input, network trouble, malformed
        response, rejected key) is logged and reported as ``False``.

        Args:
            license_key (str): The user's key (e.g. from email). Surrounding
                whitespace is ignored.
            product_permalink (str, optional): Override product ID.

        Returns:
            bool: True if activation successful.
        """
        # Reject unusable input before spending a network round-trip on it.
        if not isinstance(license_key, str) or not license_key.strip():
            logger.error("License Activation Failed: empty license key")
            return False
        license_key = license_key.strip()

        permalink = product_permalink or self.PRODUCT_PERMALINK
        url = "https://api.gumroad.com/v2/licenses/verify"

        # Keep the try body limited to transport + decoding so that only those
        # failures are reported as network errors.
        try:
            logger.info("Contacting License Server...")
            # Timeout is critical to avoid hanging app startup
            response = requests.post(url, data={
                "product_permalink": permalink,
                "license_key": license_key
            }, timeout=10)
            # No raise_for_status(): the rejection reason is read from the
            # JSON body below, whatever the HTTP status was.
            data = response.json()
        except Exception as e:
            # Best-effort boundary: any transport or JSON-decoding failure is
            # a failed activation, not a crash.
            logger.error("License Verification Network Error: %s", e)
            return False

        if not isinstance(data, dict):
            logger.error("License Activation Failed: unexpected server response")
            return False

        if data.get("success") is not True:
            logger.error("License Activation Failed: %s", data.get("message"))
            return False

        # "purchase" may be missing or null; normalise it to a dict.
        purchase = data.get("purchase")
        if not isinstance(purchase, dict):
            purchase = {}

        # A key can still verify after the money went back to the buyer;
        # such a purchase must not unlock paid features.
        if purchase.get("refunded") or purchase.get("chargebacked"):
            logger.error(
                "License Activation Failed: purchase was refunded or charged back"
            )
            return False

        # Valid License: Cache minimal data locally
        self.license_data = {
            "key": license_key,
            "uses": data.get("uses"),
            "email": purchase.get("email"),
            "timestamp": purchase.get("created_at"),
            "tier": "PRO"  # Logic to distinguish tiers could go here
        }
        self._save_local()
        logger.info("License Activated. PRO Features Unlocked.")
        return True

    def validate_offline(self) -> bool:
        """
        Checks for a valid, tamper-evident local license file.
        Useful for air-gapped or offline startups.

        Returns:
            bool: True if a license record is already in memory or could be
            loaded from the local cache file.
        """
        if isinstance(self.license_data, dict) and self.license_data:
            return True
        return self._load_local()

    def _save_local(self) -> None:
        """
        Encrypts and persists the license state.

        The token is written to a temporary sibling file and then moved over
        the real path, so an interrupted write cannot leave a truncated cache
        that would later be reported as tampered. Failures are logged, not
        raised: an activation stays valid in memory even if it cannot be
        cached.
        """
        if not self.license_data:
            return

        tmp_path = f"{self.storage_path}.tmp"
        try:
            raw_json = json.dumps(self.license_data).encode('utf-8')
            encrypted = self._cipher.encrypt(raw_json)

            # Owner-only permissions where the platform honours them; the
            # file holds the license key and the purchaser's e-mail.
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
            fd = os.open(tmp_path, flags, 0o600)
            with os.fdopen(fd, "wb") as f:
                f.write(encrypted)

            os.replace(tmp_path, self.storage_path)
        except Exception as e:
            logger.error("Failed to persist license: %s", e)
            # Best-effort cleanup of the partial temp file; nothing useful
            # can be done if this fails as well.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def _load_local(self) -> bool:
        """
        Decrypts and validates the local license file.

        Returns:
            bool: True if the file exists, decrypts with the internal key and
            contains a non-empty JSON object; ``license_data`` is only
            updated in that case.
        """
        try:
            with open(self.storage_path, "rb") as f:
                encrypted = f.read()
        except FileNotFoundError:
            # No cache yet: the normal state before the first activation.
            return False
        except OSError as e:
            logger.warning("Local license file unreadable: %s", e)
            return False

        try:
            decrypted = self._cipher.decrypt(encrypted)
            payload = json.loads(decrypted.decode('utf-8'))
        except Exception:
            # Decryption failure implies file tampering or wrong key
            logger.warning("Local license file corrupted or tampered.")
            return False

        # A well-formed token is not enough: callers use the payload as a
        # dict, so anything else (or an empty record) is treated as invalid.
        if not isinstance(payload, dict) or not payload:
            logger.warning("Local license file corrupted or tampered.")
            return False

        self.license_data = payload
        return True

    def get_tier(self) -> str:
        """Returns 'PRO' or 'BASELINE'."""
        if self.validate_offline():
            tier = self.license_data.get("tier", "BASELINE")
            # Never hand a non-string tier to callers.
            if isinstance(tier, str):
                return tier
        return "BASELINE"