Spaces:
Sleeping
Sleeping
| """ | |
| RSA Decryption utilities for the URL Blink application. | |
| """ | |
| import base64 | |
| import json | |
| import os | |
| import logging | |
| from typing import Any, Optional | |
| from cryptography.hazmat.primitives import serialization, hashes | |
| from cryptography.hazmat.primitives.asymmetric import padding | |
| from cryptography.hazmat.backends import default_backend | |
| logger = logging.getLogger(__name__) | |
| # Path to the private key file | |
| PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", "./PRIVATE_KEY.pem") | |
| # Cache the private key after first load | |
| _private_key = None | |
| def load_private_key(): | |
| """ | |
| Load the RSA private key from the PEM file. | |
| Caches the key for subsequent calls. | |
| Returns: | |
| RSA private key object | |
| Raises: | |
| FileNotFoundError: If private key file doesn't exist | |
| ValueError: If private key is invalid | |
| """ | |
| global _private_key | |
| if _private_key is not None: | |
| return _private_key | |
| try: | |
| with open(PRIVATE_KEY_PATH, "rb") as key_file: | |
| _private_key = serialization.load_pem_private_key( | |
| key_file.read(), | |
| password=None, | |
| backend=default_backend() | |
| ) | |
| logger.info(f"Successfully loaded private key from {PRIVATE_KEY_PATH}") | |
| return _private_key | |
| except FileNotFoundError: | |
| logger.error(f"Private key file not found: {PRIVATE_KEY_PATH}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Failed to load private key: {e}") | |
| raise ValueError(f"Invalid private key: {e}") | |
| def decrypt_data(encrypted_base64: str) -> Optional[Any]: | |
| """ | |
| Decrypt base64-encoded RSA encrypted data. | |
| Args: | |
| encrypted_base64: Base64 URL-safe encoded encrypted string | |
| Returns: | |
| Decrypted data parsed as JSON, or raw string if not valid JSON | |
| Raises: | |
| ValueError: If decryption fails | |
| """ | |
| try: | |
| # Load the private key | |
| private_key = load_private_key() | |
| # Decode base64 URL-safe encoded data | |
| # Add padding if necessary | |
| padded = encrypted_base64 + '=' * (4 - len(encrypted_base64) % 4) | |
| encrypted_bytes = base64.urlsafe_b64decode(padded) | |
| # Decrypt using RSA OAEP with SHA256 | |
| decrypted_bytes = private_key.decrypt( | |
| encrypted_bytes, | |
| padding.OAEP( | |
| mgf=padding.MGF1(algorithm=hashes.SHA256()), | |
| algorithm=hashes.SHA256(), | |
| label=None | |
| ) | |
| ) | |
| # Decode to string | |
| decrypted_str = decrypted_bytes.decode('utf-8') | |
| # Try to parse as JSON | |
| try: | |
| return json.loads(decrypted_str) | |
| except json.JSONDecodeError: | |
| # Return as raw string if not valid JSON | |
| logger.warning("Decrypted data is not valid JSON, returning raw string") | |
| return {"raw_data": decrypted_str} | |
| except Exception as e: | |
| logger.error(f"Decryption failed: {e}") | |
| raise ValueError(f"Failed to decrypt data: {e}") | |
| def decrypt_multiple_blocks(encrypted_data: str, block_size: int = 344) -> list[Any]: | |
| """ | |
| Decrypt multiple concatenated encrypted blocks. | |
| RSA 2048-bit encrypted data is typically 256 bytes, which is ~344 chars in base64. | |
| Args: | |
| encrypted_data: Concatenated base64-encoded encrypted blocks | |
| block_size: Size of each encrypted block in base64 chars (default 344 for RSA-2048) | |
| Returns: | |
| List of decrypted data objects | |
| """ | |
| results = [] | |
| # If the data is smaller than block_size, treat as single block | |
| if len(encrypted_data) <= block_size: | |
| try: | |
| result = decrypt_data(encrypted_data) | |
| if result: | |
| results.append(result) | |
| except Exception as e: | |
| logger.error(f"Failed to decrypt single block: {e}") | |
| return results | |
| # Split into blocks and decrypt each | |
| for i in range(0, len(encrypted_data), block_size): | |
| block = encrypted_data[i:i + block_size] | |
| if block: # Skip empty blocks | |
| try: | |
| result = decrypt_data(block) | |
| if result: | |
| results.append(result) | |
| except Exception as e: | |
| logger.error(f"Failed to decrypt block {i // block_size}: {e}") | |
| continue | |
| return results | |