File size: 6,210 Bytes
c05ab2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
651af0e
c05ab2d
 
651af0e
 
 
 
c05ab2d
651af0e
c05ab2d
 
 
 
 
 
651af0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c05ab2d
 
 
 
 
 
 
 
 
 
651af0e
c05ab2d
651af0e
 
 
 
c05ab2d
 
 
 
 
 
 
 
 
 
651af0e
c05ab2d
 
 
 
 
651af0e
 
 
 
 
c05ab2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
651af0e
 
c05ab2d
 
2bccb50
c05ab2d
 
 
2bccb50
 
 
 
 
c05ab2d
 
 
 
 
 
 
 
 
2bccb50
 
 
 
 
 
 
 
 
c05ab2d
 
2bccb50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c05ab2d
 
2bccb50
 
 
 
c05ab2d
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""
RSA Decryption utilities for the URL Blink application.
"""
import base64
import json
import os
import logging
from typing import Any, Optional
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend

logger = logging.getLogger(__name__)

# Path to the private key file
PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", "./PRIVATE_KEY.pem")

# Cache the private key after first load
_private_key = None


def load_private_key():
    """
    Load the RSA private key from environment variable or PEM file.
    Caches the key for subsequent calls.
    
    Priority:
    1. PRIVATE_KEY environment variable (PEM content)
    2. PRIVATE_KEY_PATH file
    
    Returns:
        RSA private key object, or None if not available
    """
    global _private_key
    
    if _private_key is not None:
        return _private_key
    
    # Try loading from environment variable first
    private_key_pem = os.getenv("PRIVATE_KEY")
    if private_key_pem:
        try:
            _private_key = serialization.load_pem_private_key(
                private_key_pem.encode(),
                password=None,
                backend=default_backend()
            )
            logger.info("Successfully loaded private key from PRIVATE_KEY env variable")
            return _private_key
        except Exception as e:
            logger.warning(f"Failed to load private key from env: {e}")
    
    # Try loading from file
    try:
        with open(PRIVATE_KEY_PATH, "rb") as key_file:
            _private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend()
            )
        logger.info(f"Successfully loaded private key from {PRIVATE_KEY_PATH}")
        return _private_key
    except FileNotFoundError:
        logger.warning(f"Private key file not found: {PRIVATE_KEY_PATH}")
    except Exception as e:
        logger.warning(f"Failed to load private key from file: {e}")
    
    logger.warning("No private key available - encrypted data will not be decrypted")
    return None


def decrypt_data(encrypted_base64: str) -> Optional[Any]:
    """
    Decrypt base64-encoded RSA encrypted data.
    
    Args:
        encrypted_base64: Base64 URL-safe encoded encrypted string
        
    Returns:
        Decrypted data parsed as JSON, encrypted data if no key available, or None on error
    """
    try:
        # Load the private key
        private_key = load_private_key()
        
        # If no private key, return the encrypted data as-is
        if private_key is None:
            logger.warning("No private key - returning encrypted data")
            return {"encrypted_data": encrypted_base64, "decryption_status": "no_key_available"}
        
        # Decode base64 URL-safe encoded data
        # Add padding if necessary
        padded = encrypted_base64 + '=' * (4 - len(encrypted_base64) % 4)
        encrypted_bytes = base64.urlsafe_b64decode(padded)
        
        # Decrypt using RSA OAEP with SHA256
        decrypted_bytes = private_key.decrypt(
            encrypted_bytes,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        
        # Decode to string
        decrypted_str = decrypted_bytes.decode('utf-8')
        
        # Try to parse as JSON
        try:
            return json.loads(decrypted_str)
        except json.JSONDecodeError:
            # Return as raw string if not valid JSON
            logger.warning("Decrypted data is not valid JSON, returning raw string")
            return {"raw_data": decrypted_str}
            
    except Exception as e:
        logger.error(f"Decryption failed: {e}")
        # Return encrypted data on failure
        return {"encrypted_data": encrypted_base64, "decryption_error": str(e)}


def decrypt_multiple_blocks(encrypted_data: str) -> list[Any]:
    """
    Decrypt multiple concatenated encrypted blocks.
    
    RSA 2048-bit encrypted data is 256 bytes, which can be:
    - 344 chars in base64 with padding
    - 342 chars in base64 without padding (URL-safe)
    
    This function tries multiple block sizes to find the right one.
    
    Args:
        encrypted_data: Concatenated base64-encoded encrypted blocks
        
    Returns:
        List of decrypted data objects
    """
    results = []
    
    # Common block sizes for RSA-2048 in base64
    # 344 = with padding, 342 = without padding
    POSSIBLE_BLOCK_SIZES = [344, 342, 343, 256]
    
    # First, try to decrypt as a single block
    if len(encrypted_data) <= 350:
        result = decrypt_data(encrypted_data)
        if result:
            results.append(result)
        return results
    
    # Try each possible block size
    for block_size in POSSIBLE_BLOCK_SIZES:
        # Check if data length is divisible by block size
        if len(encrypted_data) % block_size == 0:
            blocks_results = []
            success = True
            
            for i in range(0, len(encrypted_data), block_size):
                block = encrypted_data[i:i + block_size]
                result = decrypt_data(block)
                if result and "decryption_error" not in result:
                    blocks_results.append(result)
                else:
                    success = False
                    break
            
            if success and blocks_results:
                logger.info(f"Successfully decrypted {len(blocks_results)} blocks with block_size={block_size}")
                return blocks_results
    
    # Fallback: try to decrypt with default block size 344, collecting all results
    block_size = 344
    logger.warning(f"Falling back to block_size={block_size}, data length={len(encrypted_data)}")
    
    for i in range(0, len(encrypted_data), block_size):
        block = encrypted_data[i:i + block_size]
        if block:
            result = decrypt_data(block)
            if result:
                results.append(result)
    
    return results