aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2016-02-13 03:51:02 +0300
committerEgor Tensin <Egor.Tensin@gmail.com>2016-02-13 03:51:02 +0300
commitd8785fa60477b6804f6a84074472eb5a8affe910 (patch)
treefb0105c6cc487a44f85e650747f44acf4c88a8f2
parenttest: code style (diff)
downloadaes-tools-d8785fa60477b6804f6a84074472eb5a8affe910.tar.gz
aes-tools-d8785fa60477b6804f6a84074472eb5a8affe910.zip
test: refactoring
-rw-r--r--test/cavp.py180
-rw-r--r--test/file.py192
-rw-r--r--test/nist-sp-800-38a.py2
3 files changed, 200 insertions, 174 deletions
diff --git a/test/cavp.py b/test/cavp.py
index fcc5341..88cc0ee 100644
--- a/test/cavp.py
+++ b/test/cavp.py
@@ -9,6 +9,7 @@ from enum import Enum
import logging
import os.path
import sys
+from tempfile import TemporaryDirectory
import zipfile
from toolkit import *
@@ -20,16 +21,6 @@ class _MultiOrderedDict(OrderedDict):
else:
super(OrderedDict, self).__setitem__(key, value)
-def _gen_inputs(keys, plaintexts, init_vectors):
- if init_vectors is None:
- init_vectors = [None for key in keys]
- for key, plaintext, iv in zip(keys, plaintexts, init_vectors):
- yield BlockInput(key, [plaintext], iv)
-
-def _split_into_chunks(expected_output, inputs, max_len=100):
- for i in range(0, len(inputs), max_len):
- yield expected_output[i:i+max_len], inputs[i:i+max_len]
-
def verify_test_output(actual, expected):
if len(actual) != len(expected):
logging.error('Unexpected output length {0} (expected {1})'.format(len(actual), len(expected)))
@@ -42,13 +33,14 @@ def verify_test_output(actual, expected):
class TestExitCode(Enum):
SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5)
-class _TestVectorsFile:
- def __init__(self, path, archive):
- self._archive = archive
+class TestFile:
+ def __init__(self, path):
self._path = path
- self._fn = os.path.split(path)[1]
self._recognized = False
- self._parse()
+ self._parse_path()
+ if not self.recognized():
+ return
+ self._parse_data()
def recognized(self):
return self._recognized
@@ -59,25 +51,40 @@ class _TestVectorsFile:
def mode(self):
return self._mode
- def parse(self):
- self._parser = configparser.ConfigParser(
+ def _parse_data_section(self, parser, section):
+ keys = parser.get(section, 'key')
+ plaintexts = parser.get(section, 'plaintext')
+ ciphertexts = parser.get(section, 'ciphertext')
+ init_vectors = None
+ if self.mode().requires_init_vector():
+ init_vectors = parser.get(section, 'iv')
+ return keys, plaintexts, ciphertexts, init_vectors
+
+ def _parse_data(self):
+ parser = configparser.ConfigParser(
dict_type=_MultiOrderedDict,
strict=False,
interpolation=None,
empty_lines_in_values=False)
- self._parser.read_string(self._archive.read(self._path).decode('utf-8'))
-
- def _extract_test_data(self, section):
- keys = self._parser.get(section, 'key')
- plaintexts = self._parser.get(section, 'plaintext')
- ciphertexts = self._parser.get(section, 'ciphertext')
- init_vectors = None
- if self.mode().requires_init_vector():
- init_vectors = self._parser.get(section, 'iv')
- return keys, plaintexts, ciphertexts, init_vectors
+ with open(self._path) as fd:
+ parser.read_string(fd.read())
+ self._encryption_data = self._parse_data_section(parser, 'ENCRYPT')
+ self._decryption_data = self._parse_data_section(parser, 'DECRYPT')
+
+ @staticmethod
+ def _gen_inputs(keys, plaintexts, init_vectors):
+ if init_vectors is None:
+ init_vectors = [None for key in keys]
+ for key, plaintext, iv in zip(keys, plaintexts, init_vectors):
+ yield BlockInput(key, [plaintext], iv)
+
+ @staticmethod
+ def _split_into_chunks(expected_output, inputs, max_len=100):
+ for i in range(0, len(inputs), max_len):
+ yield expected_output[i:i+max_len], inputs[i:i+max_len]
def _run_tests(self, tool, inputs, expected_output, use_boxes=False):
- for expected_output_chunk, input_chunk in _split_into_chunks(expected_output, list(inputs)):
+ for expected_output_chunk, input_chunk in self._split_into_chunks(expected_output, list(inputs)):
actual_output = tool(self.algorithm(), self.mode(), input_chunk, use_boxes=use_boxes)
if not verify_test_output(actual_output, expected_output_chunk):
return TestExitCode.FAILURE
@@ -85,19 +92,33 @@ class _TestVectorsFile:
def run_encryption_tests(self, tools, use_boxes=False):
logging.info('Running encryption tests...')
- keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('ENCRYPT')
- inputs = _gen_inputs(keys, plaintexts, init_vectors)
- return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes)
+ if not self.recognized():
+ return TestExitCode.SKIPPED
+ try:
+ keys, plaintexts, ciphertexts, init_vectors = self._encryption_data
+ inputs = self._gen_inputs(keys, plaintexts, init_vectors)
+ return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes)
+ except Exception as e:
+ logging.error('Encountered an exception!')
+ logging.exception(e)
+ return TestExitCode.ERROR
def run_decryption_tests(self, tools, use_boxes=False):
logging.info('Running decryption tests...')
- keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('DECRYPT')
- inputs = _gen_inputs(keys, ciphertexts, init_vectors)
- return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes)
-
- def _parse(self):
- logging.info('Trying to parse test vectors file name \'{0}\'...'.format(self._fn))
- stub = self._strip_extension(self._fn)
+ if not self.recognized():
+ return TestExitCode.SKIPPED
+ try:
+ keys, plaintexts, ciphertexts, init_vectors = self._decryption_data
+ inputs = self._gen_inputs(keys, ciphertexts, init_vectors)
+ return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes)
+ except Exception as e:
+ logging.error('Encountered an exception!')
+ logging.exception(e)
+ return TestExitCode.ERROR
+
+ def _parse_path(self):
+ logging.info('Trying to parse test file path \'{0}\'...'.format(self._path))
+ stub = self._strip_extension(os.path.basename(self._path))
if not stub: return
stub = self._strip_algorithm(stub)
if not stub: return
@@ -107,10 +128,12 @@ class _TestVectorsFile:
if not stub: return
self._recognized = True
- def _strip_extension(self, stub):
- stub, ext = os.path.splitext(stub)
- if ext != '.rsp':
- logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._fn))
+ _RECOGNIZED_EXT = '.rsp'
+
+ def _strip_extension(self, path):
+ stub, ext = os.path.splitext(path)
+ if ext != self._RECOGNIZED_EXT:
+ logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._path))
return None
return stub
@@ -118,65 +141,44 @@ class _TestVectorsFile:
key_size = stub[-3:]
maybe_algorithm = 'aes{0}'.format(key_size)
self._algorithm = Algorithm.try_parse(maybe_algorithm)
- if self._algorithm:
+ if self._algorithm is not None:
logging.info('\tAlgorithm: {0}'.format(self._algorithm))
return stub[0:-3]
else:
- logging.warn('Unknown or unsupported algorithm: ' + self._fn)
+ logging.warn('Unknown or unsupported algorithm: ' + self._path)
return None
+ _RECOGNIZED_METHODS = ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt')
+
def _strip_method(self, stub):
- for method in ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt'):
+ for method in self._RECOGNIZED_METHODS:
if stub.endswith(method):
logging.info('\tMethod: {0}'.format(method))
return stub[0:len(stub) - len(method)]
- logging.warn('Unknown or unsupported method: ' + self._fn)
+ logging.warn('Unknown or unsupported method: ' + self._path)
def _strip_mode(self, stub):
self._mode = Mode.try_parse(stub)
- if self._mode:
+ if self._mode is not None:
logging.info('\tMode: {0}'.format(self._mode))
return self._mode
else:
- logging.warn('Unknown or unsupported mode: ' + self._fn)
+ logging.warn('Unknown or unsupported mode: ' + self._path)
return None
-def _parse_archive_and_run_tests(tools, archive_path, use_boxes=False):
- archive = zipfile.ZipFile(archive_path)
- exit_codes = []
- for fn in archive.namelist():
- member = _TestVectorsFile(fn, archive)
- if member.recognized():
- member.parse()
- try:
- exit_codes.append(member.run_encryption_tests(tools, use_boxes))
- except Exception as e:
- logging.error('Encountered an exception!')
- logging.exception(e)
- exit_codes.append(TestExitCode.ERROR)
- try:
- exit_codes.append(member.run_decryption_tests(tools, use_boxes))
- except Exception as e:
- logging.error('Encountered an exception!')
- logging.exception(e)
- exit_codes.append(TestExitCode.ERROR)
- else:
- exit_codes.append(TestExitCode.SKIPPED)
- logging.info('Test exit codes:')
- logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED)))
- logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR)))
- logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS)))
- logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE)))
- if (exit_codes.count(TestExitCode.ERROR) == 0 and
- exit_codes.count(TestExitCode.FAILURE) == 0):
- sys.exit()
- else:
- sys.exit(1)
-
def _build_default_log_path():
return datetime.now().strftime('{}_%Y-%m-%d_%H-%M-%S.log').format(
os.path.splitext(os.path.basename(__file__))[0])
+class TestArchive(zipfile.ZipFile):
+ def __init__(self, path):
+ super().__init__(path)
+
+ def list_test_files(self):
+ with TemporaryDirectory() as tmp_dir:
+ for p in self.namelist():
+ yield TestFile(self.extract(p, tmp_dir))
+
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
@@ -197,4 +199,20 @@ if __name__ == '__main__':
level=logging.DEBUG)
tools = Tools(args.path, use_sde=args.sde)
- _parse_archive_and_run_tests(tools, args.archive, use_boxes=args.use_boxes)
+ archive = TestArchive(args.archive)
+ exit_codes = []
+
+ for test_file in archive.list_test_files():
+ exit_codes.append(test_file.run_encryption_tests(tools, args.use_boxes))
+ exit_codes.append(test_file.run_decryption_tests(tools, args.use_boxes))
+
+ logging.info('Test exit codes:')
+ logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED)))
+ logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR)))
+ logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS)))
+ logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE)))
+ if (exit_codes.count(TestExitCode.ERROR) == 0 and
+ exit_codes.count(TestExitCode.FAILURE) == 0):
+ sys.exit()
+ else:
+ sys.exit(1)
diff --git a/test/file.py b/test/file.py
index a918984..b64b828 100644
--- a/test/file.py
+++ b/test/file.py
@@ -22,36 +22,6 @@ _IV_EXT = 'iv'
_PLAIN_EXT = 'plain'
_CIPHER_EXT = 'cipher'
-def _run_encryption_test(tools, tmp_dir, algorithm, mode, key, plain_path, cipher_path, iv=None, force=False):
- logging.info('Running encryption test...')
- logging.info('\tPlaintext file path: ' + plain_path)
- logging.info('\tExpected ciphertext file path: ' + cipher_path)
- tmp_path = os.path.join(tmp_dir, os.path.basename(cipher_path))
- logging.info('\tEncrypted file path: ' + tmp_path)
- tools.run_encrypt_file(algorithm, mode, key, plain_path, tmp_path, iv)
- if force:
- logging.warn('Overwriting expected ciphertext file')
- shutil.copy(tmp_path, cipher_path)
- return TestExitCode.SKIPPED
- if filecmp.cmp(cipher_path, tmp_path):
- return TestExitCode.SUCCESS
- else:
- logging.error('The encrypted file doesn\'t match the ciphertext file')
- return TestExitCode.FAILURE
-
-def _run_decryption_test(tools, tmp_dir, algorithm, mode, key, cipher_path, plain_path, iv=None):
- logging.info('Running decryption test...')
- logging.info('\tCiphertext file path: ' + cipher_path)
- logging.info('\tExpected plaintext file path: ' + plain_path)
- tmp_path = os.path.join(tmp_dir, os.path.basename(plain_path))
- logging.info('\tDecrypted file path: ' + tmp_path)
- tools.run_decrypt_file(algorithm, mode, key, cipher_path, tmp_path, iv)
- if filecmp.cmp(tmp_path, plain_path):
- return TestExitCode.SUCCESS
- else:
- logging.error('The decrypted file doesn\'t match the plaintext file')
- return TestExitCode.FAILURE
-
def _list_dirs(root_path):
xs = map(lambda x: os.path.join(root_path, x), os.listdir(root_path))
return filter(os.path.isdir, xs)
@@ -79,77 +49,97 @@ def _extract_test_name(key_path):
def _replace_ext(path, new_ext):
return '{}.{}'.format(os.path.splitext(path)[0], new_ext)
-def _build_iv_path(key_path):
+def _extract_iv_path(key_path):
return _replace_ext(key_path, _IV_EXT)
-def _build_plain_path(key_path):
+def _extract_plain_path(key_path):
return _replace_ext(key_path, _PLAIN_EXT)
-def _build_cipher_path(key_path):
+def _extract_cipher_path(key_path):
return _replace_ext(key_path, _CIPHER_EXT)
-def _run_tests(tools, suite_dir, force=False):
- exit_codes = []
+class TestCase:
+ def __init__(self, algorithm, mode, key, plain_path, cipher_path, iv=None):
+ self.algorithm = algorithm
+ self.mode = mode
+ self.key = key
+ self.plain_path = plain_path
+ self.cipher_path = cipher_path
+ self.iv = iv
+
+ def run_encryption_test(self, tools, tmp_dir, force=False):
+ tmp_dir = os.path.join(tmp_dir, str(self.algorithm), str(self.mode))
+ os.makedirs(tmp_dir, 0o777, True)
+
+ logging.info('Running encryption test...')
+ logging.info('\tPlaintext file path: ' + self.plain_path)
+ logging.info('\tExpected ciphertext file path: ' + self.cipher_path)
+ tmp_path = os.path.join(tmp_dir, os.path.basename(self.cipher_path))
+ logging.info('\tEncrypted file path: ' + tmp_path)
+ logging.info('\tAlgorithm: {}'.format(self.algorithm))
+ logging.info('\tMode: {}'.format(self.mode))
+
+ tools.run_encrypt_file(self.algorithm, self.mode, self.key,
+ self.plain_path, tmp_path, self.iv)
+ if force:
+ logging.warn('Overwriting expected ciphertext file')
+ shutil.copy(tmp_path, self.cipher_path)
+ return TestExitCode.SKIPPED
+ if filecmp.cmp(self.cipher_path, tmp_path):
+ return TestExitCode.SUCCESS
+ else:
+ logging.error('The encrypted file doesn\'t match the ciphertext file')
+ return TestExitCode.FAILURE
+
+ def run_decryption_test(self, tools, tmp_dir):
+ tmp_dir = os.path.join(tmp_dir, str(self.algorithm), str(self.mode))
+ os.makedirs(tmp_dir, 0o777, True)
+
+ logging.info('Running decryption test...')
+ logging.info('\tCiphertext file path: ' + self.cipher_path)
+ logging.info('\tExpected plaintext file path: ' + self.plain_path)
+ tmp_path = os.path.join(tmp_dir, os.path.basename(self.plain_path))
+ logging.info('\tDecrypted file path: ' + tmp_path)
+ logging.info('\tAlgorithm: {}'.format(self.algorithm))
+ logging.info('\tMode: {}'.format(self.mode))
+
+ tools.run_decrypt_file(self.algorithm, self.mode, self.key,
+ self.cipher_path, tmp_path, self.iv)
+ if filecmp.cmp(tmp_path, self.plain_path):
+ return TestExitCode.SUCCESS
+ else:
+ logging.error('The decrypted file doesn\'t match the plaintext file')
+ return TestExitCode.FAILURE
+
+def list_test_cases(suite_dir):
suite_dir = os.path.abspath(suite_dir)
logging.info('Suite directory path: ' + suite_dir)
- with TemporaryDirectory() as tmp_dir:
- for algorithm_dir in _list_dirs(suite_dir):
- algorithm = os.path.basename(algorithm_dir)
- maybe_algorithm = Algorithm.try_parse(algorithm)
- if maybe_algorithm is None:
- logging.warn('Unknown or unsupported algorithm: ' + algorithm)
- exit_codes.append(TestExitCode.SKIPPED)
+ for algorithm_dir in _list_dirs(suite_dir):
+ algorithm = os.path.basename(algorithm_dir)
+ maybe_algorithm = Algorithm.try_parse(algorithm)
+ if maybe_algorithm is None:
+ logging.warn('Unknown or unsupported algorithm: ' + algorithm)
+ continue
+ algorithm = maybe_algorithm
+ for mode_dir in _list_dirs(algorithm_dir):
+ mode = os.path.basename(mode_dir)
+ maybe_mode = Mode.try_parse(mode)
+ if maybe_mode is None:
+ logging.warn('Unknown or unsupported mode: ' + mode)
continue
- algorithm = maybe_algorithm
- logging.info('Algorithm: {}'.format(algorithm))
- for mode_dir in _list_dirs(algorithm_dir):
- mode = os.path.basename(mode_dir)
- maybe_mode = Mode.try_parse(mode)
- if maybe_mode is None:
- logging.warn('Unknown or unsupported mode: ' + mode)
- exit_codes.append(TestExitCode.SKIPPED)
- continue
- mode = maybe_mode
- logging.info('Mode: {}'.format(mode))
- for key_path in _list_keys(mode_dir):
- key = _read_key(key_path)
- logging.info('Key: ' + key)
- test_name = _extract_test_name(key_path)
- logging.info('Test name: ' + test_name)
- iv = None
- if mode.requires_init_vector():
- iv_path = _build_iv_path(key_path)
- iv = _read_iv(iv_path)
- plain_path = _build_plain_path(key_path)
- cipher_path = _build_cipher_path(key_path)
- os.makedirs(os.path.join(tmp_dir, str(algorithm), str(mode)), 0o777, True)
- try:
- exit_codes.append(_run_encryption_test(
- tools, os.path.join(tmp_dir, str(algorithm), str(mode)),
- algorithm, mode, key, plain_path, cipher_path, iv, force))
- except Exception as e:
- logging.error('Encountered an exception!')
- logging.exception(e)
- exit_codes.append(TestExitCode.ERROR)
- if not force:
- try:
- exit_codes.append(_run_decryption_test(
- tools, os.path.join(tmp_dir, str(algorithm), str(mode)),
- algorithm, mode, key, cipher_path, plain_path, iv))
- except Exception as e:
- logging.error('Encountered an exception!')
- logging.exception(e)
- exit_codes.append(TestExitCode.ERROR)
- logging.info('Test exit codes:')
- logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED)))
- logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR)))
- logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS)))
- logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE)))
- if (exit_codes.count(TestExitCode.ERROR) == 0 and
- exit_codes.count(TestExitCode.FAILURE) == 0):
- sys.exit()
- else:
- sys.exit(1)
+ mode = maybe_mode
+ for key_path in _list_keys(mode_dir):
+ key = _read_key(key_path)
+ logging.info('Key: ' + key)
+ test_name = _extract_test_name(key_path)
+ logging.info('Test name: ' + test_name)
+ iv = None
+ if mode.requires_init_vector():
+ iv_path = _extract_iv_path(key_path)
+ iv = _read_iv(iv_path)
+ plain_path = _extract_plain_path(key_path)
+ cipher_path = _extract_cipher_path(key_path)
+ yield TestCase(algorithm, mode, key, plain_path, cipher_path, iv)
def _build_default_log_path():
return datetime.now().strftime('{}_%Y-%m-%d_%H-%M-%S.log').format(
@@ -175,4 +165,20 @@ if __name__ == '__main__':
level=logging.DEBUG)
tools = Tools(args.path, use_sde=args.sde)
- _run_tests(tools, args.suite, args.force)
+
+ exit_codes = []
+ with TemporaryDirectory() as tmp_dir:
+ for test_case in list_test_cases(args.suite):
+ exit_codes.append(test_case.run_encryption_test(tools, tmp_dir, args.force))
+ exit_codes.append(test_case.run_decryption_test(tools, tmp_dir))
+
+ logging.info('Test exit codes:')
+ logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED)))
+ logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR)))
+ logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS)))
+ logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE)))
+ if (exit_codes.count(TestExitCode.ERROR) == 0 and
+ exit_codes.count(TestExitCode.FAILURE) == 0):
+ sys.exit()
+ else:
+ sys.exit(1)
diff --git a/test/nist-sp-800-38a.py b/test/nist-sp-800-38a.py
index 1bbd3bc..119ddb9 100644
--- a/test/nist-sp-800-38a.py
+++ b/test/nist-sp-800-38a.py
@@ -172,6 +172,7 @@ def run_encryption_test(tools, algorithm, mode, use_boxes=False):
logging.info('Running encryption test...')
logging.info('Algorithm: {}'.format(algorithm))
logging.info('Mode: {}'.format(mode))
+
plaintexts = get_test_plaintexts(algorithm, mode)
key = get_test_key(algorithm, mode)
iv = get_test_iv(algorithm, mode)
@@ -187,6 +188,7 @@ def run_decryption_test(tools, algorithm, mode, use_boxes=False):
logging.info('Running decryption test...')
logging.info('Algorithm: {}'.format(algorithm))
logging.info('Mode: {}'.format(mode))
+
ciphertexts = get_test_ciphertexts(algorithm, mode)
key = get_test_key(algorithm, mode)
iv = get_test_iv(algorithm, mode)