jebin2 commited on
Commit
24b2623
·
1 Parent(s): 2bccb50

Fix encryption: support direct RSA-OAEP and hybrid RSA+AES-GCM modes

Browse files
Files changed (1) hide show
  1. encryption.py +110 -74
encryption.py CHANGED
@@ -1,5 +1,9 @@
1
  """
2
- RSA Decryption utilities for the URL Blink application.
 
 
 
 
3
  """
4
  import base64
5
  import json
@@ -8,6 +12,7 @@ import logging
8
  from typing import Any, Optional
9
  from cryptography.hazmat.primitives import serialization, hashes
10
  from cryptography.hazmat.primitives.asymmetric import padding
 
11
  from cryptography.hazmat.backends import default_backend
12
 
13
  logger = logging.getLogger(__name__)
@@ -69,66 +74,122 @@ def load_private_key():
69
  return None
70
 
71
 
72
- def decrypt_data(encrypted_base64: str) -> Optional[Any]:
73
  """
74
- Decrypt base64-encoded RSA encrypted data.
75
 
76
  Args:
77
- encrypted_base64: Base64 URL-safe encoded encrypted string
 
78
 
79
  Returns:
80
- Decrypted data parsed as JSON, encrypted data if no key available, or None on error
81
  """
82
- try:
83
- # Load the private key
84
- private_key = load_private_key()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
 
86
- # If no private key, return the encrypted data as-is
87
- if private_key is None:
88
- logger.warning("No private key - returning encrypted data")
89
- return {"encrypted_data": encrypted_base64, "decryption_status": "no_key_available"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
 
91
- # Decode base64 URL-safe encoded data
92
- # Add padding if necessary
93
- padded = encrypted_base64 + '=' * (4 - len(encrypted_base64) % 4)
94
- encrypted_bytes = base64.urlsafe_b64decode(padded)
 
 
 
 
 
 
 
 
 
 
95
 
96
- # Decrypt using RSA OAEP with SHA256
97
- decrypted_bytes = private_key.decrypt(
98
- encrypted_bytes,
99
- padding.OAEP(
100
- mgf=padding.MGF1(algorithm=hashes.SHA256()),
101
- algorithm=hashes.SHA256(),
102
- label=None
103
- )
104
- )
105
 
106
- # Decode to string
107
- decrypted_str = decrypted_bytes.decode('utf-8')
 
 
 
 
 
108
 
109
- # Try to parse as JSON
110
  try:
111
  return json.loads(decrypted_str)
112
  except json.JSONDecodeError:
113
- # Return as raw string if not valid JSON
114
- logger.warning("Decrypted data is not valid JSON, returning raw string")
115
  return {"raw_data": decrypted_str}
116
 
117
  except Exception as e:
118
  logger.error(f"Decryption failed: {e}")
119
- # Return encrypted data on failure
120
- return {"encrypted_data": encrypted_base64, "decryption_error": str(e)}
121
 
122
 
123
  def decrypt_multiple_blocks(encrypted_data: str) -> list[Any]:
124
  """
125
  Decrypt multiple concatenated encrypted blocks.
126
 
127
- RSA 2048-bit encrypted data is 256 bytes, which can be:
128
- - 344 chars in base64 with padding
129
- - 342 chars in base64 without padding (URL-safe)
130
-
131
- This function tries multiple block sizes to find the right one.
132
 
133
  Args:
134
  encrypted_data: Concatenated base64-encoded encrypted blocks
@@ -138,46 +199,21 @@ def decrypt_multiple_blocks(encrypted_data: str) -> list[Any]:
138
  """
139
  results = []
140
 
141
- # Common block sizes for RSA-2048 in base64
142
- # 344 = with padding, 342 = without padding
143
- POSSIBLE_BLOCK_SIZES = [344, 342, 343, 256]
 
 
 
 
144
 
145
- # First, try to decrypt as a single block
146
- if len(encrypted_data) <= 350:
 
 
 
147
  result = decrypt_data(encrypted_data)
148
  if result:
149
  results.append(result)
150
- return results
151
-
152
- # Try each possible block size
153
- for block_size in POSSIBLE_BLOCK_SIZES:
154
- # Check if data length is divisible by block size
155
- if len(encrypted_data) % block_size == 0:
156
- blocks_results = []
157
- success = True
158
-
159
- for i in range(0, len(encrypted_data), block_size):
160
- block = encrypted_data[i:i + block_size]
161
- result = decrypt_data(block)
162
- if result and "decryption_error" not in result:
163
- blocks_results.append(result)
164
- else:
165
- success = False
166
- break
167
-
168
- if success and blocks_results:
169
- logger.info(f"Successfully decrypted {len(blocks_results)} blocks with block_size={block_size}")
170
- return blocks_results
171
-
172
- # Fallback: try to decrypt with default block size 344, collecting all results
173
- block_size = 344
174
- logger.warning(f"Falling back to block_size={block_size}, data length={len(encrypted_data)}")
175
-
176
- for i in range(0, len(encrypted_data), block_size):
177
- block = encrypted_data[i:i + block_size]
178
- if block:
179
- result = decrypt_data(block)
180
- if result:
181
- results.append(result)
182
 
183
  return results
 
1
  """
2
+ RSA/Hybrid Decryption utilities for the URL Blink application.
3
+
4
+ Supports two encryption modes from the client:
5
+ 1. Direct RSA-OAEP (for data ≤ 190 bytes)
6
+ 2. Hybrid RSA-OAEP + AES-GCM (for data > 190 bytes)
7
  """
8
  import base64
9
  import json
 
12
  from typing import Any, Optional
13
  from cryptography.hazmat.primitives import serialization, hashes
14
  from cryptography.hazmat.primitives.asymmetric import padding
15
+ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
16
  from cryptography.hazmat.backends import default_backend
17
 
18
  logger = logging.getLogger(__name__)
 
74
  return None
75
 
76
 
77
+ def decrypt_direct(payload: dict, private_key) -> str:
78
  """
79
+ Decrypt directly RSA-OAEP encrypted data.
80
 
81
  Args:
82
+ payload: Dict with 'data' field containing base64 RSA-encrypted data
83
+ private_key: RSA private key object
84
 
85
  Returns:
86
+ Decrypted plaintext string
87
  """
88
+ encrypted_bytes = base64.b64decode(payload['data'])
89
+ decrypted = private_key.decrypt(
90
+ encrypted_bytes,
91
+ padding.OAEP(
92
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
93
+ algorithm=hashes.SHA256(),
94
+ label=None
95
+ )
96
+ )
97
+ return decrypted.decode('utf-8')
98
+
99
+
100
+ def decrypt_hybrid(payload: dict, private_key) -> str:
101
+ """
102
+ Decrypt hybrid RSA+AES-GCM encrypted data.
103
+
104
+ Args:
105
+ payload: Dict with 'key' (RSA-encrypted AES key), 'iv', and 'data' (AES-encrypted)
106
+ private_key: RSA private key object
107
 
108
+ Returns:
109
+ Decrypted plaintext string
110
+ """
111
+ # 1. Decrypt the AES key with RSA-OAEP
112
+ encrypted_aes_key = base64.b64decode(payload['key'])
113
+ aes_key = private_key.decrypt(
114
+ encrypted_aes_key,
115
+ padding.OAEP(
116
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
117
+ algorithm=hashes.SHA256(),
118
+ label=None
119
+ )
120
+ )
121
+
122
+ # 2. Decrypt the data with AES-GCM
123
+ iv = base64.b64decode(payload['iv'])
124
+ encrypted_data = base64.b64decode(payload['data'])
125
+
126
+ # AES-GCM: the tag is appended to the ciphertext (last 16 bytes)
127
+ # Split ciphertext and tag
128
+ tag = encrypted_data[-16:]
129
+ ciphertext = encrypted_data[:-16]
130
+
131
+ cipher = Cipher(
132
+ algorithms.AES(aes_key),
133
+ modes.GCM(iv, tag),
134
+ backend=default_backend()
135
+ )
136
+ decryptor = cipher.decryptor()
137
+ decrypted = decryptor.update(ciphertext) + decryptor.finalize()
138
+
139
+ return decrypted.decode('utf-8')
140
+
141
+
142
+ def decrypt_data(encrypted_base64: str) -> Optional[Any]:
143
+ """
144
+ Decrypt data encrypted by the client usageService.
145
+
146
+ The encrypted data format is: btoa(JSON.stringify({ type: 'direct'|'hybrid', ... }))
147
+
148
+ Args:
149
+ encrypted_base64: The outer base64 string from the client
150
 
151
+ Returns:
152
+ Decrypted data parsed as JSON, or error info if decryption fails
153
+ """
154
+ private_key = load_private_key()
155
+
156
+ # If no private key, return the encrypted data as-is
157
+ if private_key is None:
158
+ logger.warning("No private key - returning encrypted data")
159
+ return {"encrypted_data": encrypted_base64, "decryption_status": "no_key_available"}
160
+
161
+ try:
162
+ # Decode outer base64 and parse JSON
163
+ outer_json = base64.b64decode(encrypted_base64).decode('utf-8')
164
+ payload = json.loads(outer_json)
165
 
166
+ encryption_type = payload.get('type')
 
 
 
 
 
 
 
 
167
 
168
+ if encryption_type == 'direct':
169
+ decrypted_str = decrypt_direct(payload, private_key)
170
+ elif encryption_type == 'hybrid':
171
+ decrypted_str = decrypt_hybrid(payload, private_key)
172
+ else:
173
+ logger.error(f"Unknown encryption type: {encryption_type}")
174
+ return {"encrypted_data": encrypted_base64, "decryption_error": f"Unknown type: {encryption_type}"}
175
 
176
+ # Try to parse decrypted string as JSON
177
  try:
178
  return json.loads(decrypted_str)
179
  except json.JSONDecodeError:
 
 
180
  return {"raw_data": decrypted_str}
181
 
182
  except Exception as e:
183
  logger.error(f"Decryption failed: {e}")
184
+ return {"encrypted_data": encrypted_base64[:100] + "...", "decryption_error": str(e)}
 
185
 
186
 
187
  def decrypt_multiple_blocks(encrypted_data: str) -> list[Any]:
188
  """
189
  Decrypt multiple concatenated encrypted blocks.
190
 
191
+ Each block is a complete base64 JSON payload. Since blocks can vary in size,
192
+ we need to find block boundaries by looking for valid JSON structures.
 
 
 
193
 
194
  Args:
195
  encrypted_data: Concatenated base64-encoded encrypted blocks
 
199
  """
200
  results = []
201
 
202
+ # If it looks like a single block (starts with valid base64 for JSON)
203
+ # Try single block first
204
+ if encrypted_data:
205
+ result = decrypt_data(encrypted_data)
206
+ if result and "decryption_error" not in result:
207
+ results.append(result)
208
+ return results
209
 
210
+ # If single block fails, the data might be multiple blocks
211
+ # Since each block is base64(JSON), we need to find block boundaries
212
+ # Common approach: try to find where one base64 ends and another begins
213
+ # For now, store the error result from single attempt
214
+ if not results:
215
  result = decrypt_data(encrypted_data)
216
  if result:
217
  results.append(result)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
 
219
  return results