|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import re |
|
|
import unittest |
|
|
import warnings |
|
|
from binascii import unhexlify, hexlify |
|
|
|
|
|
from Crypto.Util.py3compat import tobytes |
|
|
from Crypto.Util.strxor import strxor_c |
|
|
from Crypto.SelfTest.st_common import list_test_cases |
|
|
|
|
|
from Crypto.Hash import BLAKE2b, BLAKE2s |
|
|
|
|
|
|
|
|
class Blake2Test(unittest.TestCase): |
|
|
|
|
|
def test_new_positive(self): |
|
|
|
|
|
h = self.BLAKE2.new(digest_bits=self.max_bits) |
|
|
for new_func in self.BLAKE2.new, h.new: |
|
|
|
|
|
for dbits in range(8, self.max_bits + 1, 8): |
|
|
hobj = new_func(digest_bits=dbits) |
|
|
self.assertEqual(hobj.digest_size, dbits // 8) |
|
|
|
|
|
for dbytes in range(1, self.max_bytes + 1): |
|
|
hobj = new_func(digest_bytes=dbytes) |
|
|
self.assertEqual(hobj.digest_size, dbytes) |
|
|
|
|
|
digest1 = new_func(data=b"\x90", digest_bytes=self.max_bytes).digest() |
|
|
digest2 = new_func(digest_bytes=self.max_bytes).update(b"\x90").digest() |
|
|
self.assertEqual(digest1, digest2) |
|
|
|
|
|
new_func(data=b"A", key=b"5", digest_bytes=self.max_bytes) |
|
|
|
|
|
hobj = h.new() |
|
|
self.assertEqual(hobj.digest_size, self.max_bytes) |
|
|
|
|
|
def test_new_negative(self): |
|
|
|
|
|
h = self.BLAKE2.new(digest_bits=self.max_bits) |
|
|
for new_func in self.BLAKE2.new, h.new: |
|
|
self.assertRaises(TypeError, new_func, |
|
|
digest_bytes=self.max_bytes, |
|
|
digest_bits=self.max_bits) |
|
|
self.assertRaises(ValueError, new_func, digest_bytes=0) |
|
|
self.assertRaises(ValueError, new_func, |
|
|
digest_bytes=self.max_bytes + 1) |
|
|
self.assertRaises(ValueError, new_func, digest_bits=7) |
|
|
self.assertRaises(ValueError, new_func, digest_bits=15) |
|
|
self.assertRaises(ValueError, new_func, |
|
|
digest_bits=self.max_bits + 1) |
|
|
self.assertRaises(TypeError, new_func, |
|
|
digest_bytes=self.max_bytes, |
|
|
key=u"string") |
|
|
self.assertRaises(TypeError, new_func, |
|
|
digest_bytes=self.max_bytes, |
|
|
data=u"string") |
|
|
|
|
|
def test_default_digest_size(self): |
|
|
digest = self.BLAKE2.new(data=b'abc').digest() |
|
|
self.assertEqual(len(digest), self.max_bytes) |
|
|
|
|
|
def test_update(self): |
|
|
pieces = [b"\x0A" * 200, b"\x14" * 300] |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes) |
|
|
h.update(pieces[0]).update(pieces[1]) |
|
|
digest = h.digest() |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes) |
|
|
h.update(pieces[0] + pieces[1]) |
|
|
self.assertEqual(h.digest(), digest) |
|
|
|
|
|
def test_update_negative(self): |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes) |
|
|
self.assertRaises(TypeError, h.update, u"string") |
|
|
|
|
|
def test_digest(self): |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes) |
|
|
digest = h.digest() |
|
|
|
|
|
|
|
|
self.assertEqual(h.digest(), digest) |
|
|
|
|
|
self.assertTrue(isinstance(digest, type(b"digest"))) |
|
|
|
|
|
def test_update_after_digest(self): |
|
|
msg = b"rrrrttt" |
|
|
|
|
|
|
|
|
h = self.BLAKE2.new(digest_bits=256, data=msg[:4]) |
|
|
dig1 = h.digest() |
|
|
self.assertRaises(TypeError, h.update, msg[4:]) |
|
|
dig2 = self.BLAKE2.new(digest_bits=256, data=msg).digest() |
|
|
|
|
|
|
|
|
h = self.BLAKE2.new(digest_bits=256, data=msg[:4], update_after_digest=True) |
|
|
self.assertEqual(h.digest(), dig1) |
|
|
|
|
|
|
|
|
h.update(msg[4:]) |
|
|
self.assertEqual(h.digest(), dig2) |
|
|
|
|
|
def test_hex_digest(self): |
|
|
mac = self.BLAKE2.new(digest_bits=self.max_bits) |
|
|
digest = mac.digest() |
|
|
hexdigest = mac.hexdigest() |
|
|
|
|
|
|
|
|
self.assertEqual(hexlify(digest), tobytes(hexdigest)) |
|
|
|
|
|
self.assertEqual(mac.hexdigest(), hexdigest) |
|
|
|
|
|
self.assertTrue(isinstance(hexdigest, type("digest"))) |
|
|
|
|
|
def test_verify(self): |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes, key=b"4") |
|
|
mac = h.digest() |
|
|
h.verify(mac) |
|
|
wrong_mac = strxor_c(mac, 255) |
|
|
self.assertRaises(ValueError, h.verify, wrong_mac) |
|
|
|
|
|
def test_hexverify(self): |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes, key=b"4") |
|
|
mac = h.hexdigest() |
|
|
h.hexverify(mac) |
|
|
self.assertRaises(ValueError, h.hexverify, "4556") |
|
|
|
|
|
def test_oid(self): |
|
|
|
|
|
prefix = "1.3.6.1.4.1.1722.12.2." + self.oid_variant + "." |
|
|
|
|
|
for digest_bits in self.digest_bits_oid: |
|
|
h = self.BLAKE2.new(digest_bits=digest_bits) |
|
|
self.assertEqual(h.oid, prefix + str(digest_bits // 8)) |
|
|
|
|
|
h = self.BLAKE2.new(digest_bits=digest_bits, key=b"secret") |
|
|
self.assertRaises(AttributeError, lambda: h.oid) |
|
|
|
|
|
for digest_bits in (8, self.max_bits): |
|
|
if digest_bits in self.digest_bits_oid: |
|
|
continue |
|
|
self.assertRaises(AttributeError, lambda: h.oid) |
|
|
|
|
|
def test_bytearray(self): |
|
|
|
|
|
key = b'0' * 16 |
|
|
data = b"\x00\x01\x02" |
|
|
|
|
|
|
|
|
key_ba = bytearray(key) |
|
|
data_ba = bytearray(data) |
|
|
|
|
|
h1 = self.BLAKE2.new(data=data, key=key) |
|
|
h2 = self.BLAKE2.new(data=data_ba, key=key_ba) |
|
|
key_ba[:1] = b'\xFF' |
|
|
data_ba[:1] = b'\xFF' |
|
|
|
|
|
self.assertEqual(h1.digest(), h2.digest()) |
|
|
|
|
|
|
|
|
data_ba = bytearray(data) |
|
|
|
|
|
h1 = self.BLAKE2.new() |
|
|
h2 = self.BLAKE2.new() |
|
|
h1.update(data) |
|
|
h2.update(data_ba) |
|
|
data_ba[:1] = b'\xFF' |
|
|
|
|
|
self.assertEqual(h1.digest(), h2.digest()) |
|
|
|
|
|
def test_memoryview(self): |
|
|
|
|
|
key = b'0' * 16 |
|
|
data = b"\x00\x01\x02" |
|
|
|
|
|
def get_mv_ro(data): |
|
|
return memoryview(data) |
|
|
|
|
|
def get_mv_rw(data): |
|
|
return memoryview(bytearray(data)) |
|
|
|
|
|
for get_mv in (get_mv_ro, get_mv_rw): |
|
|
|
|
|
|
|
|
key_mv = get_mv(key) |
|
|
data_mv = get_mv(data) |
|
|
|
|
|
h1 = self.BLAKE2.new(data=data, key=key) |
|
|
h2 = self.BLAKE2.new(data=data_mv, key=key_mv) |
|
|
if not data_mv.readonly: |
|
|
data_mv[:1] = b'\xFF' |
|
|
key_mv[:1] = b'\xFF' |
|
|
|
|
|
self.assertEqual(h1.digest(), h2.digest()) |
|
|
|
|
|
|
|
|
data_mv = get_mv(data) |
|
|
|
|
|
h1 = self.BLAKE2.new() |
|
|
h2 = self.BLAKE2.new() |
|
|
h1.update(data) |
|
|
h2.update(data_mv) |
|
|
if not data_mv.readonly: |
|
|
data_mv[:1] = b'\xFF' |
|
|
|
|
|
self.assertEqual(h1.digest(), h2.digest()) |
|
|
|
|
|
|
|
|
class Blake2bTest(Blake2Test): |
|
|
|
|
|
BLAKE2 = BLAKE2b |
|
|
|
|
|
max_bits = 512 |
|
|
|
|
|
max_bytes = 64 |
|
|
|
|
|
digest_bits_oid = (160, 256, 384, 512) |
|
|
|
|
|
oid_variant = "1" |
|
|
|
|
|
|
|
|
class Blake2sTest(Blake2Test): |
|
|
|
|
|
BLAKE2 = BLAKE2s |
|
|
|
|
|
max_bits = 256 |
|
|
|
|
|
max_bytes = 32 |
|
|
|
|
|
digest_bits_oid = (128, 160, 224, 256) |
|
|
|
|
|
oid_variant = "2" |
|
|
|
|
|
|
|
|
class Blake2OfficialTestVector(unittest.TestCase): |
|
|
|
|
|
def _load_tests(self, test_vector_file): |
|
|
expected = "in" |
|
|
test_vectors = [] |
|
|
with open(test_vector_file, "rt") as test_vector_fd: |
|
|
for line_number, line in enumerate(test_vector_fd): |
|
|
|
|
|
if line.strip() == "" or line.startswith("#"): |
|
|
continue |
|
|
|
|
|
res = re.match("%s:\t([0-9A-Fa-f]*)" % expected, line) |
|
|
if not res: |
|
|
raise ValueError("Incorrect test vector format (line %d)" |
|
|
% line_number) |
|
|
|
|
|
if res.group(1): |
|
|
bin_value = unhexlify(tobytes(res.group(1))) |
|
|
else: |
|
|
bin_value = b"" |
|
|
if expected == "in": |
|
|
input_data = bin_value |
|
|
expected = "key" |
|
|
elif expected == "key": |
|
|
key = bin_value |
|
|
expected = "hash" |
|
|
else: |
|
|
result = bin_value |
|
|
expected = "in" |
|
|
test_vectors.append((input_data, key, result)) |
|
|
return test_vectors |
|
|
|
|
|
def setUp(self): |
|
|
|
|
|
dir_comps = ("Hash", self.name) |
|
|
file_name = self.name.lower() + "-test.txt" |
|
|
self.description = "%s tests" % self.name |
|
|
|
|
|
try: |
|
|
import pycryptodome_test_vectors |
|
|
except ImportError: |
|
|
warnings.warn("Warning: skipping extended tests for %s" % self.name, |
|
|
UserWarning) |
|
|
self.test_vectors = [] |
|
|
return |
|
|
|
|
|
init_dir = os.path.dirname(pycryptodome_test_vectors.__file__) |
|
|
full_file_name = os.path.join(os.path.join(init_dir, *dir_comps), file_name) |
|
|
self.test_vectors = self._load_tests(full_file_name) |
|
|
|
|
|
def runTest(self): |
|
|
for (input_data, key, result) in self.test_vectors: |
|
|
mac = self.BLAKE2.new(key=key, digest_bytes=self.max_bytes) |
|
|
mac.update(input_data) |
|
|
self.assertEqual(mac.digest(), result) |
|
|
|
|
|
|
|
|
class Blake2bOfficialTestVector(Blake2OfficialTestVector): |
|
|
|
|
|
BLAKE2 = BLAKE2b |
|
|
|
|
|
name = "BLAKE2b" |
|
|
|
|
|
max_bytes = 64 |
|
|
|
|
|
|
|
|
class Blake2sOfficialTestVector(Blake2OfficialTestVector): |
|
|
|
|
|
BLAKE2 = BLAKE2s |
|
|
|
|
|
name = "BLAKE2s" |
|
|
|
|
|
max_bytes = 32 |
|
|
|
|
|
|
|
|
class Blake2TestVector1(unittest.TestCase): |
|
|
|
|
|
def _load_tests(self, test_vector_file): |
|
|
test_vectors = [] |
|
|
with open(test_vector_file, "rt") as test_vector_fd: |
|
|
for line_number, line in enumerate(test_vector_fd): |
|
|
if line.strip() == "" or line.startswith("#"): |
|
|
continue |
|
|
res = re.match("digest: ([0-9A-Fa-f]*)", line) |
|
|
if not res: |
|
|
raise ValueError("Incorrect test vector format (line %d)" |
|
|
% line_number) |
|
|
|
|
|
test_vectors.append(unhexlify(tobytes(res.group(1)))) |
|
|
return test_vectors |
|
|
|
|
|
def setUp(self): |
|
|
dir_comps = ("Hash", self.name) |
|
|
file_name = "tv1.txt" |
|
|
self.description = "%s tests" % self.name |
|
|
|
|
|
try: |
|
|
import pycryptodome_test_vectors |
|
|
except ImportError: |
|
|
warnings.warn("Warning: skipping extended tests for %s" % self.name, |
|
|
UserWarning) |
|
|
self.test_vectors = [] |
|
|
return |
|
|
|
|
|
init_dir = os.path.dirname(pycryptodome_test_vectors.__file__) |
|
|
full_file_name = os.path.join(os.path.join(init_dir, *dir_comps), file_name) |
|
|
self.test_vectors = self._load_tests(full_file_name) |
|
|
|
|
|
def runTest(self): |
|
|
|
|
|
for tv in self.test_vectors: |
|
|
digest_bytes = len(tv) |
|
|
next_data = b"" |
|
|
for _ in range(100): |
|
|
h = self.BLAKE2.new(digest_bytes=digest_bytes) |
|
|
h.update(next_data) |
|
|
next_data = h.digest() + next_data |
|
|
self.assertEqual(h.digest(), tv) |
|
|
|
|
|
|
|
|
class Blake2bTestVector1(Blake2TestVector1): |
|
|
|
|
|
BLAKE2 = BLAKE2b |
|
|
|
|
|
name = "BLAKE2b" |
|
|
|
|
|
|
|
|
class Blake2sTestVector1(Blake2TestVector1): |
|
|
|
|
|
BLAKE2 = BLAKE2s |
|
|
|
|
|
name = "BLAKE2s" |
|
|
|
|
|
|
|
|
class Blake2TestVector2(unittest.TestCase): |
|
|
|
|
|
def _load_tests(self, test_vector_file): |
|
|
test_vectors = [] |
|
|
with open(test_vector_file, "rt") as test_vector_fd: |
|
|
for line_number, line in enumerate(test_vector_fd): |
|
|
if line.strip() == "" or line.startswith("#"): |
|
|
continue |
|
|
res = re.match(r"digest\(([0-9]+)\): ([0-9A-Fa-f]*)", line) |
|
|
if not res: |
|
|
raise ValueError("Incorrect test vector format (line %d)" |
|
|
% line_number) |
|
|
key_size = int(res.group(1)) |
|
|
result = unhexlify(tobytes(res.group(2))) |
|
|
test_vectors.append((key_size, result)) |
|
|
return test_vectors |
|
|
|
|
|
def setUp(self): |
|
|
dir_comps = ("Hash", self.name) |
|
|
file_name = "tv2.txt" |
|
|
self.description = "%s tests" % self.name |
|
|
|
|
|
try: |
|
|
import pycryptodome_test_vectors |
|
|
except ImportError: |
|
|
warnings.warn("Warning: skipping extended tests for %s" % self.name, |
|
|
UserWarning) |
|
|
self.test_vectors = [] |
|
|
return |
|
|
|
|
|
init_dir = os.path.dirname(pycryptodome_test_vectors.__file__) |
|
|
full_file_name = os.path.join(os.path.join(init_dir, *dir_comps), file_name) |
|
|
self.test_vectors = self._load_tests(full_file_name) |
|
|
|
|
|
def runTest(self): |
|
|
|
|
|
for key_size, result in self.test_vectors: |
|
|
next_data = b"" |
|
|
for _ in range(100): |
|
|
h = self.BLAKE2.new(digest_bytes=self.max_bytes, |
|
|
key=b"A" * key_size) |
|
|
h.update(next_data) |
|
|
next_data = h.digest() + next_data |
|
|
self.assertEqual(h.digest(), result) |
|
|
|
|
|
|
|
|
class Blake2bTestVector2(Blake2TestVector1): |
|
|
|
|
|
BLAKE2 = BLAKE2b |
|
|
|
|
|
name = "BLAKE2b" |
|
|
|
|
|
max_bytes = 64 |
|
|
|
|
|
|
|
|
class Blake2sTestVector2(Blake2TestVector1): |
|
|
|
|
|
BLAKE2 = BLAKE2s |
|
|
|
|
|
name = "BLAKE2s" |
|
|
|
|
|
max_bytes = 32 |
|
|
|
|
|
|
|
|
def get_tests(config={}): |
|
|
tests = [] |
|
|
|
|
|
tests += list_test_cases(Blake2bTest) |
|
|
tests.append(Blake2bOfficialTestVector()) |
|
|
tests.append(Blake2bTestVector1()) |
|
|
tests.append(Blake2bTestVector2()) |
|
|
|
|
|
tests += list_test_cases(Blake2sTest) |
|
|
tests.append(Blake2sOfficialTestVector()) |
|
|
tests.append(Blake2sTestVector1()) |
|
|
tests.append(Blake2sTestVector2()) |
|
|
|
|
|
return tests |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
import unittest |
|
|
def suite(): |
|
|
return unittest.TestSuite(get_tests()) |
|
|
unittest.main(defaultTest='suite') |
|
|
|