From d8785fa60477b6804f6a84074472eb5a8affe910 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Sat, 13 Feb 2016 03:51:02 +0300 Subject: test: refactoring --- test/cavp.py | 180 +++++++++++++++++++++++++-------------------- test/file.py | 192 +++++++++++++++++++++++++----------------------- test/nist-sp-800-38a.py | 2 + 3 files changed, 200 insertions(+), 174 deletions(-) (limited to 'test') diff --git a/test/cavp.py b/test/cavp.py index fcc5341..88cc0ee 100644 --- a/test/cavp.py +++ b/test/cavp.py @@ -9,6 +9,7 @@ from enum import Enum import logging import os.path import sys +from tempfile import TemporaryDirectory import zipfile from toolkit import * @@ -20,16 +21,6 @@ class _MultiOrderedDict(OrderedDict): else: super(OrderedDict, self).__setitem__(key, value) -def _gen_inputs(keys, plaintexts, init_vectors): - if init_vectors is None: - init_vectors = [None for key in keys] - for key, plaintext, iv in zip(keys, plaintexts, init_vectors): - yield BlockInput(key, [plaintext], iv) - -def _split_into_chunks(expected_output, inputs, max_len=100): - for i in range(0, len(inputs), max_len): - yield expected_output[i:i+max_len], inputs[i:i+max_len] - def verify_test_output(actual, expected): if len(actual) != len(expected): logging.error('Unexpected output length {0} (expected {1})'.format(len(actual), len(expected))) @@ -42,13 +33,14 @@ def verify_test_output(actual, expected): class TestExitCode(Enum): SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5) -class _TestVectorsFile: - def __init__(self, path, archive): - self._archive = archive +class TestFile: + def __init__(self, path): self._path = path - self._fn = os.path.split(path)[1] self._recognized = False - self._parse() + self._parse_path() + if not self.recognized(): + return + self._parse_data() def recognized(self): return self._recognized @@ -59,25 +51,40 @@ class _TestVectorsFile: def mode(self): return self._mode - def parse(self): - self._parser = configparser.ConfigParser( + def _parse_data_section(self, parser, section): + keys = parser.get(section, 'key') + plaintexts = parser.get(section, 'plaintext') + ciphertexts = parser.get(section, 'ciphertext') + init_vectors = None + if self.mode().requires_init_vector(): + init_vectors = parser.get(section, 'iv') + return keys, plaintexts, ciphertexts, init_vectors + + def _parse_data(self): + parser = configparser.ConfigParser( dict_type=_MultiOrderedDict, strict=False, interpolation=None, empty_lines_in_values=False) - self._parser.read_string(self._archive.read(self._path).decode('utf-8')) - - def _extract_test_data(self, section): - keys = self._parser.get(section, 'key') - plaintexts = self._parser.get(section, 'plaintext') - ciphertexts = self._parser.get(section, 'ciphertext') - init_vectors = None - if self.mode().requires_init_vector(): - init_vectors = self._parser.get(section, 'iv') - return keys, plaintexts, ciphertexts, init_vectors + with open(self._path) as fd: + parser.read_string(fd.read()) + self._encryption_data = self._parse_data_section(parser, 'ENCRYPT') + self._decryption_data = self._parse_data_section(parser, 'DECRYPT') + + @staticmethod + def _gen_inputs(keys, plaintexts, init_vectors): + if init_vectors is None: + init_vectors = [None for key in keys] + for key, plaintext, iv in zip(keys, plaintexts, init_vectors): + yield BlockInput(key, [plaintext], iv) + + @staticmethod + def _split_into_chunks(expected_output, inputs, max_len=100): + for i in range(0, len(inputs), max_len): + yield expected_output[i:i+max_len], inputs[i:i+max_len] def _run_tests(self, tool, inputs, expected_output, use_boxes=False): - for expected_output_chunk, input_chunk in _split_into_chunks(expected_output, list(inputs)): + for expected_output_chunk, input_chunk in self._split_into_chunks(expected_output, list(inputs)): actual_output = tool(self.algorithm(), self.mode(), input_chunk, use_boxes=use_boxes) if not verify_test_output(actual_output, expected_output_chunk): return TestExitCode.FAILURE @@ -85,19 +92,33 @@ class _TestVectorsFile: def run_encryption_tests(self, tools, use_boxes=False): logging.info('Running encryption tests...') - keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('ENCRYPT') - inputs = _gen_inputs(keys, plaintexts, init_vectors) - return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes) + if not self.recognized(): + return TestExitCode.SKIPPED + try: + keys, plaintexts, ciphertexts, init_vectors = self._encryption_data + inputs = self._gen_inputs(keys, plaintexts, init_vectors) + return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes) + except Exception as e: + logging.error('Encountered an exception!') + logging.exception(e) + return TestExitCode.ERROR def run_decryption_tests(self, tools, use_boxes=False): logging.info('Running decryption tests...') - keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('DECRYPT') - inputs = _gen_inputs(keys, ciphertexts, init_vectors) - return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes) - - def _parse(self): - logging.info('Trying to parse test vectors file name \'{0}\'...'.format(self._fn)) - stub = self._strip_extension(self._fn) + if not self.recognized(): + return TestExitCode.SKIPPED + try: + keys, plaintexts, ciphertexts, init_vectors = self._decryption_data + inputs = self._gen_inputs(keys, ciphertexts, init_vectors) + return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes) + except Exception as e: + logging.error('Encountered an exception!') + logging.exception(e) + return TestExitCode.ERROR + + def _parse_path(self): + logging.info('Trying to parse test file path \'{0}\'...'.format(self._path)) + stub = self._strip_extension(os.path.basename(self._path)) if not stub: return stub = self._strip_algorithm(stub) if not stub: return @@ -107,10 +128,12 @@ class _TestVectorsFile: if not stub: return self._recognized = True - def _strip_extension(self, stub): - stub, ext = os.path.splitext(stub) - if ext != '.rsp': - logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._fn)) + _RECOGNIZED_EXT = '.rsp' + + def _strip_extension(self, path): + stub, ext = os.path.splitext(path) + if ext != self._RECOGNIZED_EXT: + logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._path)) return None return stub @@ -118,65 +141,44 @@ class _TestVectorsFile: key_size = stub[-3:] maybe_algorithm = 'aes{0}'.format(key_size) self._algorithm = Algorithm.try_parse(maybe_algorithm) - if self._algorithm: + if self._algorithm is not None: logging.info('\tAlgorithm: {0}'.format(self._algorithm)) return stub[0:-3] else: - logging.warn('Unknown or unsupported algorithm: ' + self._fn) + logging.warn('Unknown or unsupported algorithm: ' + self._path) return None + _RECOGNIZED_METHODS = ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt') + def _strip_method(self, stub): - for method in ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt'): + for method in self._RECOGNIZED_METHODS: if stub.endswith(method): logging.info('\tMethod: {0}'.format(method)) return stub[0:len(stub) - len(method)] - logging.warn('Unknown or unsupported method: ' + self._fn) + logging.warn('Unknown or unsupported method: ' + self._path) def _strip_mode(self, stub): self._mode = Mode.try_parse(stub) - if self._mode: + if self._mode is not None: logging.info('\tMode: {0}'.format(self._mode)) return self._mode else: - logging.warn('Unknown or unsupported mode: ' + self._fn) + logging.warn('Unknown or unsupported mode: ' + self._path) return None -def _parse_archive_and_run_tests(tools, archive_path, use_boxes=False): - archive = zipfile.ZipFile(archive_path) - exit_codes = [] - for fn in archive.namelist(): - member = _TestVectorsFile(fn, archive) - if member.recognized(): - member.parse() - try: - exit_codes.append(member.run_encryption_tests(tools, use_boxes)) - except Exception as e: - logging.error('Encountered an exception!') - logging.exception(e) - exit_codes.append(TestExitCode.ERROR) - try: - exit_codes.append(member.run_decryption_tests(tools, use_boxes)) - except Exception as e: - logging.error('Encountered an exception!') - logging.exception(e) - exit_codes.append(TestExitCode.ERROR) - else: - exit_codes.append(TestExitCode.SKIPPED) - logging.info('Test exit codes:') - logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED))) - logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR))) - logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS))) - logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE))) - if (exit_codes.count(TestExitCode.ERROR) == 0 and - exit_codes.count(TestExitCode.FAILURE) == 0): - sys.exit() - else: - sys.exit(1) - def _build_default_log_path(): return datetime.now().strftime('{}_%Y-%m-%d_%H-%M-%S.log').format( os.path.splitext(os.path.basename(__file__))[0]) +class TestArchive(zipfile.ZipFile): + def __init__(self, path): + super().__init__(path) + + def list_test_files(self): + with TemporaryDirectory() as tmp_dir: + for p in self.namelist(): + yield TestFile(self.extract(p, tmp_dir)) + if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() @@ -197,4 +199,20 @@ if __name__ == '__main__': level=logging.DEBUG) tools = Tools(args.path, use_sde=args.sde) - _parse_archive_and_run_tests(tools, args.archive, use_boxes=args.use_boxes) + archive = TestArchive(args.archive) + exit_codes = [] + + for test_file in archive.list_test_files(): + exit_codes.append(test_file.run_encryption_tests(tools, args.use_boxes)) + exit_codes.append(test_file.run_decryption_tests(tools, args.use_boxes)) + + logging.info('Test exit codes:') + logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED))) + logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR))) + logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS))) + logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE))) + if (exit_codes.count(TestExitCode.ERROR) == 0 and + exit_codes.count(TestExitCode.FAILURE) == 0): + sys.exit() + else: + sys.exit(1) diff --git a/test/file.py b/test/file.py index a918984..b64b828 100644 --- a/test/file.py +++ b/test/file.py @@ -22,36 +22,6 @@ _IV_EXT = 'iv' _PLAIN_EXT = 'plain' _CIPHER_EXT = 'cipher' -def _run_encryption_test(tools, tmp_dir, algorithm, mode, key, plain_path, cipher_path, iv=None, force=False): - logging.info('Running encryption test...') - logging.info('\tPlaintext file path: ' + plain_path) - logging.info('\tExpected ciphertext file path: ' + cipher_path) - tmp_path = os.path.join(tmp_dir, os.path.basename(cipher_path)) - logging.info('\tEncrypted file path: ' + tmp_path) - tools.run_encrypt_file(algorithm, mode, key, plain_path, tmp_path, iv) - if force: - logging.warn('Overwriting expected ciphertext file') - shutil.copy(tmp_path, cipher_path) - return TestExitCode.SKIPPED - if filecmp.cmp(cipher_path, tmp_path): - return TestExitCode.SUCCESS - else: - logging.error('The encrypted file doesn\'t match the ciphertext file') - return TestExitCode.FAILURE - -def _run_decryption_test(tools, tmp_dir, algorithm, mode, key, cipher_path, plain_path, iv=None): - logging.info('Running decryption test...') - logging.info('\tCiphertext file path: ' + cipher_path) - logging.info('\tExpected plaintext file path: ' + plain_path) - tmp_path = os.path.join(tmp_dir, os.path.basename(plain_path)) - logging.info('\tDecrypted file path: ' + tmp_path) - tools.run_decrypt_file(algorithm, mode, key, cipher_path, tmp_path, iv) - if filecmp.cmp(tmp_path, plain_path): - return TestExitCode.SUCCESS - else: - logging.error('The decrypted file doesn\'t match the plaintext file') - return TestExitCode.FAILURE - def _list_dirs(root_path): xs = map(lambda x: os.path.join(root_path, x), os.listdir(root_path)) return filter(os.path.isdir, xs) @@ -79,77 +49,97 @@ def _extract_test_name(key_path): def _replace_ext(path, new_ext): return '{}.{}'.format(os.path.splitext(path)[0], new_ext) -def _build_iv_path(key_path): +def _extract_iv_path(key_path): return _replace_ext(key_path, _IV_EXT) -def _build_plain_path(key_path): +def _extract_plain_path(key_path): return _replace_ext(key_path, _PLAIN_EXT) -def _build_cipher_path(key_path): +def _extract_cipher_path(key_path): return _replace_ext(key_path, _CIPHER_EXT) -def _run_tests(tools, suite_dir, force=False): - exit_codes = [] +class TestCase: + def __init__(self, algorithm, mode, key, plain_path, cipher_path, iv=None): + self.algorithm = algorithm + self.mode = mode + self.key = key + self.plain_path = plain_path + self.cipher_path = cipher_path + self.iv = iv + + def run_encryption_test(self, tools, tmp_dir, force=False): + tmp_dir = os.path.join(tmp_dir, str(self.algorithm), str(self.mode)) + os.makedirs(tmp_dir, 0o777, True) + + logging.info('Running encryption test...') + logging.info('\tPlaintext file path: ' + self.plain_path) + logging.info('\tExpected ciphertext file path: ' + self.cipher_path) + tmp_path = os.path.join(tmp_dir, os.path.basename(self.cipher_path)) + logging.info('\tEncrypted file path: ' + tmp_path) + logging.info('\tAlgorithm: {}'.format(self.algorithm)) + logging.info('\tMode: {}'.format(self.mode)) + + tools.run_encrypt_file(self.algorithm, self.mode, self.key, + self.plain_path, tmp_path, self.iv) + if force: + logging.warn('Overwriting expected ciphertext file') + shutil.copy(tmp_path, self.cipher_path) + return TestExitCode.SKIPPED + if filecmp.cmp(self.cipher_path, tmp_path): + return TestExitCode.SUCCESS + else: + logging.error('The encrypted file doesn\'t match the ciphertext file') + return TestExitCode.FAILURE + + def run_decryption_test(self, tools, tmp_dir): + tmp_dir = os.path.join(tmp_dir, str(self.algorithm), str(self.mode)) + os.makedirs(tmp_dir, 0o777, True) + + logging.info('Running decryption test...') + logging.info('\tCiphertext file path: ' + self.cipher_path) + logging.info('\tExpected plaintext file path: ' + self.plain_path) + tmp_path = os.path.join(tmp_dir, os.path.basename(self.plain_path)) + logging.info('\tDecrypted file path: ' + tmp_path) + logging.info('\tAlgorithm: {}'.format(self.algorithm)) + logging.info('\tMode: {}'.format(self.mode)) + + tools.run_decrypt_file(self.algorithm, self.mode, self.key, + self.cipher_path, tmp_path, self.iv) + if filecmp.cmp(tmp_path, self.plain_path): + return TestExitCode.SUCCESS + else: + logging.error('The decrypted file doesn\'t match the plaintext file') + return TestExitCode.FAILURE + +def list_test_cases(suite_dir): suite_dir = os.path.abspath(suite_dir) logging.info('Suite directory path: ' + suite_dir) - with TemporaryDirectory() as tmp_dir: - for algorithm_dir in _list_dirs(suite_dir): - algorithm = os.path.basename(algorithm_dir) - maybe_algorithm = Algorithm.try_parse(algorithm) - if maybe_algorithm is None: - logging.warn('Unknown or unsupported algorithm: ' + algorithm) - exit_codes.append(TestExitCode.SKIPPED) + for algorithm_dir in _list_dirs(suite_dir): + algorithm = os.path.basename(algorithm_dir) + maybe_algorithm = Algorithm.try_parse(algorithm) + if maybe_algorithm is None: + logging.warn('Unknown or unsupported algorithm: ' + algorithm) + continue + algorithm = maybe_algorithm + for mode_dir in _list_dirs(algorithm_dir): + mode = os.path.basename(mode_dir) + maybe_mode = Mode.try_parse(mode) + if maybe_mode is None: + logging.warn('Unknown or unsupported mode: ' + mode) continue - algorithm = maybe_algorithm - logging.info('Algorithm: {}'.format(algorithm)) - for mode_dir in _list_dirs(algorithm_dir): - mode = os.path.basename(mode_dir) - maybe_mode = Mode.try_parse(mode) - if maybe_mode is None: - logging.warn('Unknown or unsupported mode: ' + mode) - exit_codes.append(TestExitCode.SKIPPED) - continue - mode = maybe_mode - logging.info('Mode: {}'.format(mode)) - for key_path in _list_keys(mode_dir): - key = _read_key(key_path) - logging.info('Key: ' + key) - test_name = _extract_test_name(key_path) - logging.info('Test name: ' + test_name) - iv = None - if mode.requires_init_vector(): - iv_path = _build_iv_path(key_path) - iv = _read_iv(iv_path) - plain_path = _build_plain_path(key_path) - cipher_path = _build_cipher_path(key_path) - os.makedirs(os.path.join(tmp_dir, str(algorithm), str(mode)), 0o777, True) - try: - exit_codes.append(_run_encryption_test( - tools, os.path.join(tmp_dir, str(algorithm), str(mode)), - algorithm, mode, key, plain_path, cipher_path, iv, force)) - except Exception as e: - logging.error('Encountered an exception!') - logging.exception(e) - exit_codes.append(TestExitCode.ERROR) - if not force: - try: - exit_codes.append(_run_decryption_test( - tools, os.path.join(tmp_dir, str(algorithm), str(mode)), - algorithm, mode, key, cipher_path, plain_path, iv)) - except Exception as e: - logging.error('Encountered an exception!') - logging.exception(e) - exit_codes.append(TestExitCode.ERROR) - logging.info('Test exit codes:') - logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED))) - logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR))) - logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS))) - logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE))) - if (exit_codes.count(TestExitCode.ERROR) == 0 and - exit_codes.count(TestExitCode.FAILURE) == 0): - sys.exit() - else: - sys.exit(1) + mode = maybe_mode + for key_path in _list_keys(mode_dir): + key = _read_key(key_path) + logging.info('Key: ' + key) + test_name = _extract_test_name(key_path) + logging.info('Test name: ' + test_name) + iv = None + if mode.requires_init_vector(): + iv_path = _extract_iv_path(key_path) + iv = _read_iv(iv_path) + plain_path = _extract_plain_path(key_path) + cipher_path = _extract_cipher_path(key_path) + yield TestCase(algorithm, mode, key, plain_path, cipher_path, iv) def _build_default_log_path(): return datetime.now().strftime('{}_%Y-%m-%d_%H-%M-%S.log').format( @@ -175,4 +165,20 @@ if __name__ == '__main__': level=logging.DEBUG) tools = Tools(args.path, use_sde=args.sde) - _run_tests(tools, args.suite, args.force) + + exit_codes = [] + with TemporaryDirectory() as tmp_dir: + for test_case in list_test_cases(args.suite): + exit_codes.append(test_case.run_encryption_test(tools, tmp_dir, args.force)) + exit_codes.append(test_case.run_decryption_test(tools, tmp_dir)) + + logging.info('Test exit codes:') + logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED))) + logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR))) + logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS))) + logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE))) + if (exit_codes.count(TestExitCode.ERROR) == 0 and + exit_codes.count(TestExitCode.FAILURE) == 0): + sys.exit() + else: + sys.exit(1) diff --git a/test/nist-sp-800-38a.py b/test/nist-sp-800-38a.py index 1bbd3bc..119ddb9 100644 --- a/test/nist-sp-800-38a.py +++ b/test/nist-sp-800-38a.py @@ -172,6 +172,7 @@ def run_encryption_test(tools, algorithm, mode, use_boxes=False): logging.info('Running encryption test...') logging.info('Algorithm: {}'.format(algorithm)) logging.info('Mode: {}'.format(mode)) + plaintexts = get_test_plaintexts(algorithm, mode) key = get_test_key(algorithm, mode) iv = get_test_iv(algorithm, mode) @@ -187,6 +188,7 @@ def run_decryption_test(tools, algorithm, mode, use_boxes=False): logging.info('Running decryption test...') logging.info('Algorithm: {}'.format(algorithm)) logging.info('Mode: {}'.format(mode)) + ciphertexts = get_test_ciphertexts(algorithm, mode) key = get_test_key(algorithm, mode) iv = get_test_iv(algorithm, mode) -- cgit v1.2.3