diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2016-02-13 03:51:02 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2016-02-13 03:51:02 +0300 |
commit | d8785fa60477b6804f6a84074472eb5a8affe910 (patch) | |
tree | fb0105c6cc487a44f85e650747f44acf4c88a8f2 /test/cavp.py | |
parent | test: code style (diff) | |
download | aes-tools-d8785fa60477b6804f6a84074472eb5a8affe910.tar.gz aes-tools-d8785fa60477b6804f6a84074472eb5a8affe910.zip |
test: refactoring
Diffstat (limited to '')
-rw-r--r-- | test/cavp.py | 180 |
1 files changed, 99 insertions, 81 deletions
diff --git a/test/cavp.py b/test/cavp.py index fcc5341..88cc0ee 100644 --- a/test/cavp.py +++ b/test/cavp.py @@ -9,6 +9,7 @@ from enum import Enum import logging import os.path import sys +from tempfile import TemporaryDirectory import zipfile from toolkit import * @@ -20,16 +21,6 @@ class _MultiOrderedDict(OrderedDict): else: super(OrderedDict, self).__setitem__(key, value) -def _gen_inputs(keys, plaintexts, init_vectors): - if init_vectors is None: - init_vectors = [None for key in keys] - for key, plaintext, iv in zip(keys, plaintexts, init_vectors): - yield BlockInput(key, [plaintext], iv) - -def _split_into_chunks(expected_output, inputs, max_len=100): - for i in range(0, len(inputs), max_len): - yield expected_output[i:i+max_len], inputs[i:i+max_len] - def verify_test_output(actual, expected): if len(actual) != len(expected): logging.error('Unexpected output length {0} (expected {1})'.format(len(actual), len(expected))) @@ -42,13 +33,14 @@ def verify_test_output(actual, expected): class TestExitCode(Enum): SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5) -class _TestVectorsFile: - def __init__(self, path, archive): - self._archive = archive +class TestFile: + def __init__(self, path): self._path = path - self._fn = os.path.split(path)[1] self._recognized = False - self._parse() + self._parse_path() + if not self.recognized(): + return + self._parse_data() def recognized(self): return self._recognized @@ -59,25 +51,40 @@ class _TestVectorsFile: def mode(self): return self._mode - def parse(self): - self._parser = configparser.ConfigParser( + def _parse_data_section(self, parser, section): + keys = parser.get(section, 'key') + plaintexts = parser.get(section, 'plaintext') + ciphertexts = parser.get(section, 'ciphertext') + init_vectors = None + if self.mode().requires_init_vector(): + init_vectors = parser.get(section, 'iv') + return keys, plaintexts, ciphertexts, init_vectors + + def _parse_data(self): + parser = configparser.ConfigParser( dict_type=_MultiOrderedDict, strict=False, interpolation=None, empty_lines_in_values=False) - self._parser.read_string(self._archive.read(self._path).decode('utf-8')) - - def _extract_test_data(self, section): - keys = self._parser.get(section, 'key') - plaintexts = self._parser.get(section, 'plaintext') - ciphertexts = self._parser.get(section, 'ciphertext') - init_vectors = None - if self.mode().requires_init_vector(): - init_vectors = self._parser.get(section, 'iv') - return keys, plaintexts, ciphertexts, init_vectors + with open(self._path) as fd: + parser.read_string(fd.read()) + self._encryption_data = self._parse_data_section(parser, 'ENCRYPT') + self._decryption_data = self._parse_data_section(parser, 'DECRYPT') + + @staticmethod + def _gen_inputs(keys, plaintexts, init_vectors): + if init_vectors is None: + init_vectors = [None for key in keys] + for key, plaintext, iv in zip(keys, plaintexts, init_vectors): + yield BlockInput(key, [plaintext], iv) + + @staticmethod + def _split_into_chunks(expected_output, inputs, max_len=100): + for i in range(0, len(inputs), max_len): + yield expected_output[i:i+max_len], inputs[i:i+max_len] def _run_tests(self, tool, inputs, expected_output, use_boxes=False): - for expected_output_chunk, input_chunk in _split_into_chunks(expected_output, list(inputs)): + for expected_output_chunk, input_chunk in self._split_into_chunks(expected_output, list(inputs)): actual_output = tool(self.algorithm(), self.mode(), input_chunk, use_boxes=use_boxes) if not verify_test_output(actual_output, expected_output_chunk): return TestExitCode.FAILURE @@ -85,19 +92,33 @@ class _TestVectorsFile: def run_encryption_tests(self, tools, use_boxes=False): logging.info('Running encryption tests...') - keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('ENCRYPT') - inputs = _gen_inputs(keys, plaintexts, init_vectors) - return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes) + if not self.recognized(): + return TestExitCode.SKIPPED + try: + keys, plaintexts, ciphertexts, init_vectors = self._encryption_data + inputs = self._gen_inputs(keys, plaintexts, init_vectors) + return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes) + except Exception as e: + logging.error('Encountered an exception!') + logging.exception(e) + return TestExitCode.ERROR def run_decryption_tests(self, tools, use_boxes=False): logging.info('Running decryption tests...') - keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('DECRYPT') - inputs = _gen_inputs(keys, ciphertexts, init_vectors) - return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes) - - def _parse(self): - logging.info('Trying to parse test vectors file name \'{0}\'...'.format(self._fn)) - stub = self._strip_extension(self._fn) + if not self.recognized(): + return TestExitCode.SKIPPED + try: + keys, plaintexts, ciphertexts, init_vectors = self._decryption_data + inputs = self._gen_inputs(keys, ciphertexts, init_vectors) + return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes) + except Exception as e: + logging.error('Encountered an exception!') + logging.exception(e) + return TestExitCode.ERROR + + def _parse_path(self): + logging.info('Trying to parse test file path \'{0}\'...'.format(self._path)) + stub = self._strip_extension(os.path.basename(self._path)) if not stub: return stub = self._strip_algorithm(stub) if not stub: return @@ -107,10 +128,12 @@ class _TestVectorsFile: if not stub: return self._recognized = True - def _strip_extension(self, stub): - stub, ext = os.path.splitext(stub) - if ext != '.rsp': - logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._fn)) + _RECOGNIZED_EXT = '.rsp' + + def _strip_extension(self, path): + stub, ext = os.path.splitext(path) + if ext != self._RECOGNIZED_EXT: + logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._path)) return None return stub @@ -118,65 +141,44 @@ class _TestVectorsFile: key_size = stub[-3:] maybe_algorithm = 'aes{0}'.format(key_size) self._algorithm = Algorithm.try_parse(maybe_algorithm) - if self._algorithm: + if self._algorithm is not None: logging.info('\tAlgorithm: {0}'.format(self._algorithm)) return stub[0:-3] else: - logging.warn('Unknown or unsupported algorithm: ' + self._fn) + logging.warn('Unknown or unsupported algorithm: ' + self._path) return None + _RECOGNIZED_METHODS = ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt') + def _strip_method(self, stub): - for method in ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt'): + for method in self._RECOGNIZED_METHODS: if stub.endswith(method): logging.info('\tMethod: {0}'.format(method)) return stub[0:len(stub) - len(method)] - logging.warn('Unknown or unsupported method: ' + self._fn) + logging.warn('Unknown or unsupported method: ' + self._path) def _strip_mode(self, stub): self._mode = Mode.try_parse(stub) - if self._mode: + if self._mode is not None: logging.info('\tMode: {0}'.format(self._mode)) return self._mode else: - logging.warn('Unknown or unsupported mode: ' + self._fn) + logging.warn('Unknown or unsupported mode: ' + self._path) return None -def _parse_archive_and_run_tests(tools, archive_path, use_boxes=False): - archive = zipfile.ZipFile(archive_path) - exit_codes = [] - for fn in archive.namelist(): - member = _TestVectorsFile(fn, archive) - if member.recognized(): - member.parse() - try: - exit_codes.append(member.run_encryption_tests(tools, use_boxes)) - except Exception as e: - logging.error('Encountered an exception!') - logging.exception(e) - exit_codes.append(TestExitCode.ERROR) - try: - exit_codes.append(member.run_decryption_tests(tools, use_boxes)) - except Exception as e: - logging.error('Encountered an exception!') - logging.exception(e) - exit_codes.append(TestExitCode.ERROR) - else: - exit_codes.append(TestExitCode.SKIPPED) - logging.info('Test exit codes:') - logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED))) - logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR))) - logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS))) - logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE))) - if (exit_codes.count(TestExitCode.ERROR) == 0 and - exit_codes.count(TestExitCode.FAILURE) == 0): - sys.exit() - else: - sys.exit(1) - def _build_default_log_path(): return datetime.now().strftime('{}_%Y-%m-%d_%H-%M-%S.log').format( os.path.splitext(os.path.basename(__file__))[0]) +class TestArchive(zipfile.ZipFile): + def __init__(self, path): + super().__init__(path) + + def list_test_files(self): + with TemporaryDirectory() as tmp_dir: + for p in self.namelist(): + yield TestFile(self.extract(p, tmp_dir)) + if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() @@ -197,4 +199,20 @@ if __name__ == '__main__': level=logging.DEBUG) tools = Tools(args.path, use_sde=args.sde) - _parse_archive_and_run_tests(tools, args.archive, use_boxes=args.use_boxes) + archive = TestArchive(args.archive) + exit_codes = [] + + for test_file in archive.list_test_files(): + exit_codes.append(test_file.run_encryption_tests(tools, args.use_boxes)) + exit_codes.append(test_file.run_decryption_tests(tools, args.use_boxes)) + + logging.info('Test exit codes:') + logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED))) + logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR))) + logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS))) + logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE))) + if (exit_codes.count(TestExitCode.ERROR) == 0 and + exit_codes.count(TestExitCode.FAILURE) == 0): + sys.exit() + else: + sys.exit(1) |