import os
from urllib.parse import urlparse, urljoin

from flask import request
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# Number of bytes read from the input stream per cipher update.
_CHUNK_SIZE = 64 * 1024
# Length in bytes of the AES-GCM authentication tag stored after the ciphertext.
_GCM_TAG_SIZE = 16


def _resolves_to_host(candidate, host_url):
    """Return True if *candidate*, joined onto *host_url*, stays on that host."""
    # Browsers collapse any run of leading slashes into an authority marker,
    # while urlparse treats "///x" as a path; treat it as off-site.
    if candidate.startswith("///"):
        return False
    try:
        parts = urlparse(candidate)
        # "http:///evil.com" has a scheme but no netloc: urljoin would borrow
        # the local netloc, yet browsers navigate to evil.com.
        if parts.scheme and not parts.netloc:
            return False
        ref_url = urlparse(host_url)
        test_url = urlparse(urljoin(host_url, candidate))
    except ValueError:
        # Malformed URLs (e.g. a broken IPv6 literal) are never safe.
        return False
    return (
        test_url.scheme in ('http', 'https')
        and ref_url.netloc == test_url.netloc
    )


def is_safe_url(target):
    """Return True if redirecting to *target* keeps the user on this host.

    *target* is resolved relative to ``request.host_url``; it is accepted only
    when the result uses http/https and has the same netloc as the current
    request. An empty or ``None`` target resolves to the host URL itself and
    is therefore considered safe.

    Must be called inside a Flask request context.
    """
    candidate = (target or "").strip()
    # Raw control characters are dropped by browsers while parsing, which can
    # turn an innocuous-looking path into "//evil.com".
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in candidate):
        return False
    host_url = request.host_url
    # Browsers treat "\" like "/", but a backslash may also appear inside
    # userinfo, so both the raw and the normalised spelling must be on-site.
    return _resolves_to_host(candidate, host_url) and _resolves_to_host(
        candidate.replace("\\", "/"), host_url
    )


def encrypt_stream(input_stream, output_stream, key, nonce):
    """Encrypt *input_stream* into *output_stream* with AES-GCM.

    The ciphertext is written chunk by chunk, followed by the 16-byte
    authentication tag (GCM only produces the tag once all data has been
    processed, so it necessarily comes last).

    Args:
        input_stream: Binary file-like object providing the plaintext.
        output_stream: Binary file-like object receiving ciphertext + tag.
        key: AES key bytes.
        nonce: GCM nonce; it must never be reused with the same key.
    """
    encryptor = Cipher(algorithms.AES(key), modes.GCM(nonce)).encryptor()
    while True:
        chunk = input_stream.read(_CHUNK_SIZE)
        if not chunk:
            break
        output_stream.write(encryptor.update(chunk))
    encryptor.finalize()
    # The tag is only available after finalize(); append it to the output.
    output_stream.write(encryptor.tag)


def decrypt_stream(input_stream, output_stream, key, nonce, file_size):
    """Decrypt an AES-GCM stream produced by ``encrypt_stream``.

    The last 16 bytes of the input are expected to be the authentication tag.

    WARNING: plaintext is written to *output_stream* before the tag has been
    verified. If this function raises, the caller must discard everything
    that was written.

    Args:
        input_stream: Seekable binary file-like object holding ciphertext + tag.
        output_stream: Binary file-like object receiving the plaintext.
        key: AES key bytes.
        nonce: GCM nonce used during encryption.
        file_size: Total number of bytes in *input_stream* (ciphertext + tag).

    Raises:
        ValueError: If *file_size* is too small to contain a tag, or the tag
            cannot be read in full.
        cryptography.exceptions.InvalidTag: If authentication fails (wrong
            key/nonce, or tampered/truncated data).
    """
    payload_size = file_size - _GCM_TAG_SIZE
    if payload_size < 0:
        raise ValueError(
            "file_size must be at least %d bytes to hold the GCM tag"
            % _GCM_TAG_SIZE
        )

    # Fetch the trailing tag first: the decryptor needs it up front.
    input_stream.seek(payload_size)
    tag = input_stream.read(_GCM_TAG_SIZE)
    if len(tag) != _GCM_TAG_SIZE:
        raise ValueError("could not read the full GCM tag from the input stream")
    input_stream.seek(0)

    decryptor = Cipher(algorithms.AES(key), modes.GCM(nonce, tag)).decryptor()
    read_so_far = 0
    while read_so_far < payload_size:
        # Never read into the tag region at the end of the stream.
        to_read = min(_CHUNK_SIZE, payload_size - read_so_far)
        chunk = input_stream.read(to_read)
        if not chunk:
            # Stream ended early; finalize() below will reject the data.
            break
        output_stream.write(decryptor.update(chunk))
        read_so_far += len(chunk)
    # Verifies the tag; raises InvalidTag on any mismatch.
    decryptor.finalize()