jebin2 commited on
Commit
7c8be99
Β·
1 Parent(s): a83c10f

worker test

Browse files
Files changed (1) hide show
  1. tests/test_encryption_service.py +529 -0
tests/test_encryption_service.py ADDED
@@ -0,0 +1,529 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Rigorous Tests for Encryption Service.
3
+
4
+ Tests cover:
5
+ 1. Private key loading (env, file, caching)
6
+ 2. Direct RSA-OAEP decryption
7
+ 3. Hybrid RSA+AES-GCM decryption
8
+ 4. Main decrypt_data entry point
9
+ 5. Multiple block decryption
10
+ 6. Error handling and edge cases
11
+
12
+ Uses real cryptographic operations with test keypairs.
13
+ """
14
+ import pytest
15
+ import base64
16
+ import json
17
+ import os
18
+ import tempfile
19
+ from unittest.mock import patch, MagicMock
20
+ from cryptography.hazmat.primitives import serialization, hashes
21
+ from cryptography.hazmat.primitives.asymmetric import rsa, padding
22
+ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
23
+ from cryptography.hazmat.backends import default_backend
24
+
25
+
26
+ # =============================================================================
27
+ # Test Fixtures - Generate RSA keypair for testing
28
+ # =============================================================================
29
+
30
+ @pytest.fixture(scope="module")
31
+ def test_keypair():
32
+ """Generate RSA keypair for testing."""
33
+ private_key = rsa.generate_private_key(
34
+ public_exponent=65537,
35
+ key_size=2048,
36
+ backend=default_backend()
37
+ )
38
+ public_key = private_key.public_key()
39
+
40
+ # Get PEM encoded private key
41
+ private_pem = private_key.private_bytes(
42
+ encoding=serialization.Encoding.PEM,
43
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
44
+ encryption_algorithm=serialization.NoEncryption()
45
+ ).decode('utf-8')
46
+
47
+ return {
48
+ "private_key": private_key,
49
+ "public_key": public_key,
50
+ "private_pem": private_pem
51
+ }
52
+
53
+
54
+ def encrypt_direct(public_key, plaintext: str) -> str:
55
+ """Encrypt data using RSA-OAEP (for testing)."""
56
+ encrypted = public_key.encrypt(
57
+ plaintext.encode('utf-8'),
58
+ padding.OAEP(
59
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
60
+ algorithm=hashes.SHA256(),
61
+ label=None
62
+ )
63
+ )
64
+ payload = {
65
+ "type": "direct",
66
+ "data": base64.b64encode(encrypted).decode('utf-8')
67
+ }
68
+ return base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
69
+
70
+
71
+ def encrypt_hybrid(public_key, plaintext: str) -> str:
72
+ """Encrypt data using hybrid RSA+AES-GCM (for testing)."""
73
+ # Generate random AES key and IV
74
+ aes_key = os.urandom(32) # 256-bit AES key
75
+ iv = os.urandom(12) # 96-bit IV for GCM
76
+
77
+ # Encrypt plaintext with AES-GCM
78
+ cipher = Cipher(
79
+ algorithms.AES(aes_key),
80
+ modes.GCM(iv),
81
+ backend=default_backend()
82
+ )
83
+ encryptor = cipher.encryptor()
84
+ ciphertext = encryptor.update(plaintext.encode('utf-8')) + encryptor.finalize()
85
+
86
+ # Append auth tag to ciphertext
87
+ encrypted_data = ciphertext + encryptor.tag
88
+
89
+ # Encrypt AES key with RSA-OAEP
90
+ encrypted_aes_key = public_key.encrypt(
91
+ aes_key,
92
+ padding.OAEP(
93
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
94
+ algorithm=hashes.SHA256(),
95
+ label=None
96
+ )
97
+ )
98
+
99
+ payload = {
100
+ "type": "hybrid",
101
+ "key": base64.b64encode(encrypted_aes_key).decode('utf-8'),
102
+ "iv": base64.b64encode(iv).decode('utf-8'),
103
+ "data": base64.b64encode(encrypted_data).decode('utf-8')
104
+ }
105
+ return base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')
106
+
107
+
108
+ # =============================================================================
109
+ # 1. Private Key Loading Tests
110
+ # =============================================================================
111
+
112
+ class TestPrivateKeyLoading:
113
+ """Test load_private_key function."""
114
+
115
+ def test_load_key_from_env_variable(self, test_keypair):
116
+ """Load private key from PRIVATE_KEY env variable."""
117
+ import services.encryption_service as es
118
+ es._private_key = None # Reset cache
119
+
120
+ with patch.dict(os.environ, {"PRIVATE_KEY": test_keypair["private_pem"]}):
121
+ key = es.load_private_key()
122
+ assert key is not None
123
+
124
+ def test_load_key_from_file(self, test_keypair):
125
+ """Load private key from file when env var missing."""
126
+ import services.encryption_service as es
127
+ es._private_key = None # Reset cache
128
+
129
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as f:
130
+ f.write(test_keypair["private_pem"])
131
+ temp_path = f.name
132
+
133
+ try:
134
+ with patch.dict(os.environ, {}, clear=True):
135
+ os.environ.pop("PRIVATE_KEY", None)
136
+ with patch.object(es, 'PRIVATE_KEY_PATH', temp_path):
137
+ es._private_key = None
138
+ key = es.load_private_key()
139
+ assert key is not None
140
+ finally:
141
+ os.unlink(temp_path)
142
+
143
+ def test_returns_none_when_no_key(self):
144
+ """Return None when both env and file are missing."""
145
+ import services.encryption_service as es
146
+ es._private_key = None # Reset cache
147
+
148
+ with patch.dict(os.environ, {}, clear=True):
149
+ os.environ.pop("PRIVATE_KEY", None)
150
+ with patch.object(es, 'PRIVATE_KEY_PATH', '/nonexistent/path.pem'):
151
+ es._private_key = None
152
+ key = es.load_private_key()
153
+ assert key is None
154
+
155
+ def test_key_is_cached(self, test_keypair):
156
+ """Key is cached after first load."""
157
+ import services.encryption_service as es
158
+ es._private_key = None # Reset cache
159
+
160
+ with patch.dict(os.environ, {"PRIVATE_KEY": test_keypair["private_pem"]}):
161
+ key1 = es.load_private_key()
162
+ key2 = es.load_private_key()
163
+ assert key1 is key2
164
+
165
+ def test_invalid_pem_handling(self):
166
+ """Invalid PEM content falls back to file."""
167
+ import services.encryption_service as es
168
+ es._private_key = None # Reset cache
169
+
170
+ with patch.dict(os.environ, {"PRIVATE_KEY": "not-valid-pem"}):
171
+ with patch.object(es, 'PRIVATE_KEY_PATH', '/nonexistent/path.pem'):
172
+ es._private_key = None
173
+ key = es.load_private_key()
174
+ assert key is None # Falls through to None
175
+
176
+
177
+ # =============================================================================
178
+ # 2. Direct RSA Decryption Tests
179
+ # =============================================================================
180
+
181
+ class TestDirectDecryption:
182
+ """Test decrypt_direct function."""
183
+
184
+ def test_decrypt_valid_rsa_data(self, test_keypair):
185
+ """Decrypt valid RSA-OAEP encrypted data."""
186
+ from services.encryption_service import decrypt_direct
187
+
188
+ plaintext = "Hello, World!"
189
+ encrypted = test_keypair["public_key"].encrypt(
190
+ plaintext.encode('utf-8'),
191
+ padding.OAEP(
192
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
193
+ algorithm=hashes.SHA256(),
194
+ label=None
195
+ )
196
+ )
197
+
198
+ payload = {"data": base64.b64encode(encrypted).decode('utf-8')}
199
+ result = decrypt_direct(payload, test_keypair["private_key"])
200
+
201
+ assert result == plaintext
202
+
203
+ def test_invalid_base64(self, test_keypair):
204
+ """Handle invalid base64 input."""
205
+ from services.encryption_service import decrypt_direct
206
+
207
+ payload = {"data": "not-valid-base64!!!"}
208
+
209
+ with pytest.raises(Exception):
210
+ decrypt_direct(payload, test_keypair["private_key"])
211
+
212
+ def test_corrupted_encrypted_data(self, test_keypair):
213
+ """Handle corrupted encrypted data."""
214
+ from services.encryption_service import decrypt_direct
215
+
216
+ # Random bytes that aren't valid RSA ciphertext
217
+ payload = {"data": base64.b64encode(os.urandom(256)).decode('utf-8')}
218
+
219
+ with pytest.raises(Exception):
220
+ decrypt_direct(payload, test_keypair["private_key"])
221
+
222
+
223
+ # =============================================================================
224
+ # 3. Hybrid RSA+AES-GCM Decryption Tests
225
+ # =============================================================================
226
+
227
+ class TestHybridDecryption:
228
+ """Test decrypt_hybrid function."""
229
+
230
+ def test_decrypt_valid_hybrid_data(self, test_keypair):
231
+ """Decrypt valid hybrid RSA+AES-GCM data."""
232
+ from services.encryption_service import decrypt_hybrid
233
+
234
+ plaintext = "This is a longer message that exceeds 190 bytes and needs hybrid encryption!"
235
+
236
+ # Encrypt with test helper
237
+ aes_key = os.urandom(32)
238
+ iv = os.urandom(12)
239
+
240
+ cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv), backend=default_backend())
241
+ encryptor = cipher.encryptor()
242
+ ciphertext = encryptor.update(plaintext.encode('utf-8')) + encryptor.finalize()
243
+ encrypted_data = ciphertext + encryptor.tag
244
+
245
+ encrypted_aes_key = test_keypair["public_key"].encrypt(
246
+ aes_key,
247
+ padding.OAEP(
248
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
249
+ algorithm=hashes.SHA256(),
250
+ label=None
251
+ )
252
+ )
253
+
254
+ payload = {
255
+ "key": base64.b64encode(encrypted_aes_key).decode('utf-8'),
256
+ "iv": base64.b64encode(iv).decode('utf-8'),
257
+ "data": base64.b64encode(encrypted_data).decode('utf-8')
258
+ }
259
+
260
+ result = decrypt_hybrid(payload, test_keypair["private_key"])
261
+ assert result == plaintext
262
+
263
+ def test_tampered_ciphertext_fails(self, test_keypair):
264
+ """Tampered ciphertext fails GCM authentication."""
265
+ from services.encryption_service import decrypt_hybrid
266
+
267
+ plaintext = "Original message"
268
+ aes_key = os.urandom(32)
269
+ iv = os.urandom(12)
270
+
271
+ cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv), backend=default_backend())
272
+ encryptor = cipher.encryptor()
273
+ ciphertext = encryptor.update(plaintext.encode('utf-8')) + encryptor.finalize()
274
+ encrypted_data = ciphertext + encryptor.tag
275
+
276
+ # Tamper with ciphertext
277
+ tampered_data = bytearray(encrypted_data)
278
+ tampered_data[0] ^= 0xFF # Flip bits
279
+
280
+ encrypted_aes_key = test_keypair["public_key"].encrypt(
281
+ aes_key,
282
+ padding.OAEP(
283
+ mgf=padding.MGF1(algorithm=hashes.SHA256()),
284
+ algorithm=hashes.SHA256(),
285
+ label=None
286
+ )
287
+ )
288
+
289
+ payload = {
290
+ "key": base64.b64encode(encrypted_aes_key).decode('utf-8'),
291
+ "iv": base64.b64encode(iv).decode('utf-8'),
292
+ "data": base64.b64encode(bytes(tampered_data)).decode('utf-8')
293
+ }
294
+
295
+ with pytest.raises(Exception): # GCM auth failure
296
+ decrypt_hybrid(payload, test_keypair["private_key"])
297
+
298
+ def test_invalid_aes_key(self, test_keypair):
299
+ """Handle corrupted/invalid AES key."""
300
+ from services.encryption_service import decrypt_hybrid
301
+
302
+ payload = {
303
+ "key": base64.b64encode(os.urandom(256)).decode('utf-8'), # Random, not RSA encrypted
304
+ "iv": base64.b64encode(os.urandom(12)).decode('utf-8'),
305
+ "data": base64.b64encode(os.urandom(100)).decode('utf-8')
306
+ }
307
+
308
+ with pytest.raises(Exception):
309
+ decrypt_hybrid(payload, test_keypair["private_key"])
310
+
311
+
312
+ # =============================================================================
313
+ # 4. Main decrypt_data Entry Point Tests
314
+ # =============================================================================
315
+
316
+ class TestDecryptData:
317
+ """Test decrypt_data main entry function."""
318
+
319
+ def test_no_key_returns_status(self):
320
+ """Return no_key_available when no private key."""
321
+ import services.encryption_service as es
322
+ es._private_key = None
323
+
324
+ with patch.object(es, 'load_private_key', return_value=None):
325
+ result = es.decrypt_data("some-encrypted-data")
326
+
327
+ assert result["decryption_status"] == "no_key_available"
328
+ assert "encrypted_data" in result
329
+
330
+ def test_decrypt_direct_type(self, test_keypair):
331
+ """Decrypt data with type='direct'."""
332
+ import services.encryption_service as es
333
+ es._private_key = test_keypair["private_key"]
334
+
335
+ plaintext = '{"message": "hello"}'
336
+ encrypted = encrypt_direct(test_keypair["public_key"], plaintext)
337
+
338
+ result = es.decrypt_data(encrypted)
339
+
340
+ assert result["message"] == "hello"
341
+
342
+ def test_decrypt_hybrid_type(self, test_keypair):
343
+ """Decrypt data with type='hybrid'."""
344
+ import services.encryption_service as es
345
+ es._private_key = test_keypair["private_key"]
346
+
347
+ plaintext = '{"data": "long message here"}'
348
+ encrypted = encrypt_hybrid(test_keypair["public_key"], plaintext)
349
+
350
+ result = es.decrypt_data(encrypted)
351
+
352
+ assert result["data"] == "long message here"
353
+
354
+ def test_unknown_type_returns_error(self, test_keypair):
355
+ """Unknown encryption type returns error."""
356
+ import services.encryption_service as es
357
+ es._private_key = test_keypair["private_key"]
358
+
359
+ payload = {"type": "unknown_type", "data": "something"}
360
+ encrypted = base64.b64encode(json.dumps(payload).encode()).decode()
361
+
362
+ result = es.decrypt_data(encrypted)
363
+
364
+ assert "decryption_error" in result
365
+ assert "unknown" in result["decryption_error"].lower()
366
+
367
+ def test_invalid_outer_base64(self, test_keypair):
368
+ """Invalid outer base64 returns error."""
369
+ import services.encryption_service as es
370
+ es._private_key = test_keypair["private_key"]
371
+
372
+ result = es.decrypt_data("not-valid-base64!!!")
373
+
374
+ assert "decryption_error" in result
375
+
376
+ def test_invalid_json_payload(self, test_keypair):
377
+ """Invalid JSON payload returns error."""
378
+ import services.encryption_service as es
379
+ es._private_key = test_keypair["private_key"]
380
+
381
+ # Valid base64 but not JSON
382
+ encrypted = base64.b64encode(b"not json content").decode()
383
+
384
+ result = es.decrypt_data(encrypted)
385
+
386
+ assert "decryption_error" in result
387
+
388
+ def test_non_json_decrypted_returns_raw(self, test_keypair):
389
+ """Non-JSON decrypted content returns raw_data."""
390
+ import services.encryption_service as es
391
+ es._private_key = test_keypair["private_key"]
392
+
393
+ plaintext = "just plain text, not JSON"
394
+ encrypted = encrypt_direct(test_keypair["public_key"], plaintext)
395
+
396
+ result = es.decrypt_data(encrypted)
397
+
398
+ assert result["raw_data"] == plaintext
399
+
400
+
401
+ # =============================================================================
402
+ # 5. Multiple Blocks Tests
403
+ # =============================================================================
404
+
405
+ class TestMultipleBlocks:
406
+ """Test decrypt_multiple_blocks function."""
407
+
408
+ def test_decrypt_multiple_valid_blocks(self, test_keypair):
409
+ """Decrypt multiple valid encrypted blocks."""
410
+ import services.encryption_service as es
411
+ es._private_key = test_keypair["private_key"]
412
+
413
+ plaintext1 = '{"id": 1}'
414
+ plaintext2 = '{"id": 2}'
415
+
416
+ encrypted1 = encrypt_direct(test_keypair["public_key"], plaintext1)
417
+ encrypted2 = encrypt_direct(test_keypair["public_key"], plaintext2)
418
+
419
+ combined = f"{encrypted1},{encrypted2}"
420
+
421
+ results = es.decrypt_multiple_blocks(combined)
422
+
423
+ assert len(results) == 2
424
+ assert results[0]["id"] == 1
425
+ assert results[1]["id"] == 2
426
+
427
+ def test_empty_input_returns_empty_list(self):
428
+ """Empty input returns empty list."""
429
+ import services.encryption_service as es
430
+
431
+ results = es.decrypt_multiple_blocks("")
432
+
433
+ assert results == []
434
+
435
+ def test_handles_whitespace(self, test_keypair):
436
+ """Handle extra whitespace in input."""
437
+ import services.encryption_service as es
438
+ es._private_key = test_keypair["private_key"]
439
+
440
+ plaintext = '{"id": 1}'
441
+ encrypted = encrypt_direct(test_keypair["public_key"], plaintext)
442
+
443
+ # Add whitespace
444
+ combined = f" {encrypted} , {encrypted} "
445
+
446
+ results = es.decrypt_multiple_blocks(combined)
447
+
448
+ assert len(results) == 2
449
+
450
+ def test_mixed_valid_invalid_blocks(self, test_keypair):
451
+ """Handle mixed valid and invalid blocks."""
452
+ import services.encryption_service as es
453
+ es._private_key = test_keypair["private_key"]
454
+
455
+ valid = encrypt_direct(test_keypair["public_key"], '{"valid": true}')
456
+ invalid = "not-valid-encrypted-data"
457
+
458
+ combined = f"{valid},{invalid}"
459
+
460
+ results = es.decrypt_multiple_blocks(combined)
461
+
462
+ assert len(results) == 2
463
+ assert results[0]["valid"] == True
464
+ assert "decryption_error" in results[1]
465
+
466
+
467
+ # =============================================================================
468
+ # 6. Edge Cases and Security Tests
469
+ # =============================================================================
470
+
471
+ class TestEdgeCases:
472
+ """Test edge cases and security scenarios."""
473
+
474
+ def test_empty_plaintext(self, test_keypair):
475
+ """Handle empty plaintext."""
476
+ import services.encryption_service as es
477
+ es._private_key = test_keypair["private_key"]
478
+
479
+ plaintext = ""
480
+ encrypted = encrypt_direct(test_keypair["public_key"], plaintext)
481
+
482
+ result = es.decrypt_data(encrypted)
483
+
484
+ assert result["raw_data"] == ""
485
+
486
+ def test_unicode_plaintext(self, test_keypair):
487
+ """Handle unicode plaintext."""
488
+ import services.encryption_service as es
489
+ es._private_key = test_keypair["private_key"]
490
+
491
+ plaintext = '{"emoji": "πŸ”πŸ”‘", "chinese": "εŠ ε―†"}'
492
+ encrypted = encrypt_direct(test_keypair["public_key"], plaintext)
493
+
494
+ result = es.decrypt_data(encrypted)
495
+
496
+ assert result["emoji"] == "πŸ”πŸ”‘"
497
+ assert result["chinese"] == "εŠ ε―†"
498
+
499
+ def test_large_payload_hybrid(self, test_keypair):
500
+ """Handle large payload with hybrid encryption."""
501
+ import services.encryption_service as es
502
+ es._private_key = test_keypair["private_key"]
503
+
504
+ # Create large payload (> 190 bytes which requires hybrid)
505
+ large_data = {"data": "x" * 1000}
506
+ plaintext = json.dumps(large_data)
507
+ encrypted = encrypt_hybrid(test_keypair["public_key"], plaintext)
508
+
509
+ result = es.decrypt_data(encrypted)
510
+
511
+ assert len(result["data"]) == 1000
512
+
513
+ def test_payload_at_rsa_limit(self, test_keypair):
514
+ """Handle payload near RSA size limit."""
515
+ import services.encryption_service as es
516
+ es._private_key = test_keypair["private_key"]
517
+
518
+ # RSA-OAEP with SHA-256 and 2048-bit key: max ~190 bytes
519
+ # Test with something just under
520
+ plaintext = '{"d":"' + 'x' * 150 + '"}'
521
+ encrypted = encrypt_direct(test_keypair["public_key"], plaintext)
522
+
523
+ result = es.decrypt_data(encrypted)
524
+
525
+ assert len(result["d"]) == 150
526
+
527
+
528
+ if __name__ == "__main__":
529
+ pytest.main([__file__, "-v"])