apigateway / encryption.py
jebin2's picture
Initial commit: FastAPI URL Blink application
c05ab2d
raw
history blame
4.42 kB
"""
RSA Decryption utilities for the URL Blink application.
"""
import base64
import json
import os
import logging
from typing import Any, Optional
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Path to the private key file: taken from the PRIVATE_KEY_PATH environment
# variable, falling back to ./PRIVATE_KEY.pem when the variable is unset.
PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", "./PRIVATE_KEY.pem")
# Cache for the private key; stays None until load_private_key() has
# successfully loaded the key, after which that object is reused.
_private_key = None
def load_private_key():
    """
    Load the RSA private key from the PEM file.

    The key is read from ``PRIVATE_KEY_PATH`` on the first successful call and
    cached in the module-level ``_private_key``; subsequent calls return the
    cached object without reading the file again.

    Returns:
        The private key object returned by
        ``serialization.load_pem_private_key``.

    Raises:
        FileNotFoundError: If the private key file doesn't exist.
        ValueError: If the file cannot be read or parsed as a PEM private key
            (it is loaded with ``password=None``). The underlying exception
            is attached as ``__cause__``.
    """
    global _private_key

    # Fast path: the key was already loaded by an earlier call.
    if _private_key is not None:
        return _private_key

    try:
        with open(PRIVATE_KEY_PATH, "rb") as key_file:
            _private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend()
            )
        logger.info(f"Successfully loaded private key from {PRIVATE_KEY_PATH}")
        return _private_key
    except FileNotFoundError:
        # A missing file is reported as-is so callers can tell it apart from
        # a malformed key.
        logger.error(f"Private key file not found: {PRIVATE_KEY_PATH}")
        raise
    except Exception as e:
        logger.error(f"Failed to load private key: {e}")
        # Chain explicitly so the root cause is kept on the ValueError.
        raise ValueError(f"Invalid private key: {e}") from e
def decrypt_data(encrypted_base64: str) -> Optional[Any]:
    """
    Decrypt base64-encoded RSA encrypted data.

    The input is decoded as URL-safe base64 (missing ``=`` padding is
    restored first), decrypted with the private key from
    ``load_private_key()`` using RSA-OAEP (SHA-256 for both the hash and
    MGF1, no label), decoded as UTF-8 and finally parsed as JSON.

    Args:
        encrypted_base64: Base64 URL-safe encoded encrypted string, with or
            without trailing ``=`` padding.

    Returns:
        The decrypted payload parsed as JSON (which may be any JSON value,
        including ``None`` for ``null``). If the plaintext is not valid JSON,
        a dict of the form ``{"raw_data": <plaintext str>}`` is returned.

    Raises:
        ValueError: If the input is empty, or if loading the key, base64
            decoding, RSA decryption or UTF-8 decoding fails. For the latter
            cases the underlying exception is attached as ``__cause__``.
    """
    # Fail fast with a clear message instead of a confusing ciphertext-length
    # error from the RSA layer.
    if not encrypted_base64:
        raise ValueError("Failed to decrypt data: empty input")

    try:
        # Load the private key
        private_key = load_private_key()

        # Restore the padding stripped from URL-safe base64: add only as many
        # '=' as needed to reach a multiple of 4 (zero when already aligned).
        padded = encrypted_base64 + "=" * (-len(encrypted_base64) % 4)
        encrypted_bytes = base64.urlsafe_b64decode(padded)

        # Decrypt using RSA OAEP with SHA256
        decrypted_bytes = private_key.decrypt(
            encrypted_bytes,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

        # Decode to string
        decrypted_str = decrypted_bytes.decode('utf-8')

        # Try to parse as JSON
        try:
            return json.loads(decrypted_str)
        except json.JSONDecodeError:
            # Not JSON: hand the plaintext back wrapped in a dict.
            logger.warning("Decrypted data is not valid JSON, returning raw string")
            return {"raw_data": decrypted_str}
    except Exception as e:
        logger.error(f"Decryption failed: {e}")
        # Every failure surfaces as ValueError; chain to keep the root cause.
        raise ValueError(f"Failed to decrypt data: {e}") from e
def decrypt_multiple_blocks(encrypted_data: str, block_size: int = 344) -> list[Any]:
    """
    Decrypt multiple concatenated encrypted blocks.

    RSA 2048-bit encrypted data is typically 256 bytes, which is ~344 chars in base64.
    The input is cut into consecutive slices of ``block_size`` characters and
    each slice is passed to ``decrypt_data``. Blocks that fail to decrypt are
    logged and skipped, so one bad block does not discard the others.

    Args:
        encrypted_data: Concatenated base64-encoded encrypted blocks
        block_size: Size of each encrypted block in base64 chars (default 344 for RSA-2048)

    Returns:
        List of decrypted data objects, in input order. Blocks that failed to
        decrypt, or whose payload decoded to ``None`` (JSON ``null``), are
        omitted. Other falsy payloads (``0``, ``False``, ``""``, ``[]``,
        ``{}``) are kept. Empty input yields an empty list.

    Raises:
        ValueError: If ``block_size`` is not a positive integer.
    """
    # A zero step would make range() raise an opaque error and a negative one
    # would silently yield no blocks; reject both up front.
    if block_size <= 0:
        raise ValueError(f"block_size must be a positive integer, got {block_size}")

    results: list[Any] = []

    # Nothing to decrypt: avoid a pointless decrypt attempt and error log.
    if not encrypted_data:
        return results

    # Input no longer than one block is handled as a single block; this only
    # affects the wording of the error log below.
    single_block = len(encrypted_data) <= block_size

    for start in range(0, len(encrypted_data), block_size):
        block = encrypted_data[start:start + block_size]
        try:
            result = decrypt_data(block)
        except Exception as e:
            label = "single block" if single_block else f"block {start // block_size}"
            logger.error(f"Failed to decrypt {label}: {e}")
            continue
        # Compare against None explicitly so valid falsy payloads survive.
        if result is not None:
            results.append(result)

    return results