From d10c1d977205dd5a2f1b227cb943b393a3a4184f Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Mon, 23 Dec 2019 07:52:24 +0300 Subject: test: pylint/pep8 fixes --- test/cavp.py | 35 ++++++++++++++++++++----------- test/file.py | 64 ++++++++++++++++++++++++++++++++++++++------------------- test/nist.py | 22 ++++++++++++++++---- test/toolkit.py | 9 +++++--- 4 files changed, 90 insertions(+), 40 deletions(-) diff --git a/test/cavp.py b/test/cavp.py index b5473eb..1e6ebb8 100644 --- a/test/cavp.py +++ b/test/cavp.py @@ -18,6 +18,7 @@ import zipfile from toolkit import Algorithm, BlockInput, Mode, Tools + class _MultiOrderedDict(OrderedDict): def __setitem__(self, key, value): if isinstance(value, MutableSequence) and key in self: @@ -25,6 +26,7 @@ class _MultiOrderedDict(OrderedDict): else: super(OrderedDict, self).__setitem__(key, value) + def verify_test_output(actual, expected): if len(actual) != len(expected): logging.error('Unexpected output length!') @@ -32,13 +34,15 @@ def verify_test_output(actual, expected): logging.error('\tActual: %d', len(actual)) return False if actual != expected: - logging.error('Expected output:\n' + '\n'.join(expected)) + logging.error('Expected output:\n%s', '\n'.join(expected)) return False return True + class TestExitCode(Enum): SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5) + class TestFile: def __init__(self, path): self._path = path @@ -149,12 +153,11 @@ class TestFile: key_size = stub[-3:] maybe_algorithm = 'aes{}'.format(key_size) self._algorithm = Algorithm.try_parse(maybe_algorithm) - if self._algorithm is not None: - logging.info('\tAlgorithm: %s', self._algorithm) - return stub[0:-3] - else: - logging.warning('Unknown or unsupported algorithm: ' + self._path) + if self._algorithm is None: + logging.warning('Unknown or unsupported algorithm: %s', self._path) return None + logging.info('\tAlgorithm: %s', self._algorithm) + return stub[0:-3] _RECOGNIZED_METHODS = ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt') @@ -163,16 +166,17 @@ class TestFile: if stub.endswith(method): logging.info('\tMethod: %s', method) return stub[0:len(stub) - len(method)] - logging.warning('Unknown or unsupported method: ' + self._path) + logging.warning('Unknown or unsupported method: %s', self._path) + return None def _strip_mode(self, stub): self._mode = Mode.try_parse(stub) - if self._mode is not None: - logging.info('\tMode: %s', self._mode) - return self._mode - else: - logging.warning('Unknown or unsupported mode: ' + self._path) + if self._mode is None: + logging.warning('Unknown or unsupported mode: %s', self._path) return None + logging.info('\tMode: %s', self._mode) + return self._mode + class TestArchive(zipfile.ZipFile): def __init__(self, path): @@ -183,14 +187,17 @@ class TestArchive(zipfile.ZipFile): for path in self.namelist(): yield TestFile(self.extract(path, tmp_dir)) + _script_dir = os.path.dirname(__file__) _script_name = os.path.splitext(os.path.basename(__file__))[0] + def _build_default_log_path(): timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') fn = '{}_{}.log'.format(_script_name, timestamp) return os.path.join(_script_dir, fn) + def _setup_logging(log_path=None): if log_path is None: log_path = _build_default_log_path() @@ -200,6 +207,7 @@ def _setup_logging(log_path=None): format='%(asctime)s | %(module)s | %(levelname)s | %(message)s', level=logging.DEBUG) + def run_tests(archive_path, tools_path=(), use_sde=False, use_boxes=False, log_path=None): _setup_logging(log_path) tools = Tools(tools_path, use_sde=use_sde) @@ -222,6 +230,7 @@ def run_tests(archive_path, tools_path=(), use_sde=False, use_boxes=False, log_p else: return 1 + def _parse_args(args=None): if args is None: args = sys.argv[1:] @@ -240,8 +249,10 @@ def _parse_args(args=None): help='set log file path') return parser.parse_args(args) + def main(args=None): return run_tests(**vars(_parse_args(args))) + if __name__ == '__main__': sys.exit(main()) diff --git a/test/file.py b/test/file.py index 7a02a15..b6d685d 100644 --- a/test/file.py +++ b/test/file.py @@ -18,53 +18,67 @@ from tempfile import NamedTemporaryFile from toolkit import Algorithm, Mode, Tools + class TestExitCode(Enum): SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5) + _KEY_EXT = 'key' _IV_EXT = 'iv' _PLAIN_EXT = 'plain' _CIPHER_EXT = 'cipher' + def _list_dirs(root_path): for path in os.listdir(root_path): path = os.path.join(root_path, path) if os.path.isdir(path): yield path + def _list_files(root_path, ext): for path in glob(os.path.join(root_path, '*.{}'.format(ext))): if os.path.isfile(path): yield path + def _list_keys(root_path): return _list_files(root_path, _KEY_EXT) + def _read_first_line(path): with open(path) as fd: return fd.readline() + def _read_key(key_path): return _read_first_line(key_path) + def _read_iv(iv_path): return _read_first_line(iv_path) + def _extract_test_name(key_path): return os.path.splitext(os.path.basename(key_path))[0] + def _replace_ext(path, new_ext): return '{}.{}'.format(os.path.splitext(path)[0], new_ext) + def _extract_iv_path(key_path): return _replace_ext(key_path, _IV_EXT) + def _extract_plaintext_path(key_path): return _replace_ext(key_path, _PLAIN_EXT) + def _extract_ciphertext_path(key_path): return _replace_ext(key_path, _CIPHER_EXT) + @contextmanager def _make_output_file(): with NamedTemporaryFile(delete=False) as tmp_file: @@ -72,16 +86,17 @@ def _make_output_file(): yield tmp_path os.remove(tmp_path) + def run_encryption_test(tools, algorithm, mode, key, plaintext_path, ciphertext_path, iv=None, force=False): logging.info('Running encryption test...') - logging.info('\tPlaintext file path: ' + plaintext_path) - logging.info('\tExpected ciphertext file path: ' + ciphertext_path) - logging.info('\tAlgorithm: ' + str(algorithm)) - logging.info('\tMode: ' + str(mode)) + logging.info('\tPlaintext file path: %s', plaintext_path) + logging.info('\tExpected ciphertext file path: %s', ciphertext_path) + logging.info('\tAlgorithm: %s', algorithm) + logging.info('\tMode: %s', mode) with _make_output_file() as tmp_path: - logging.info('\tEncrypted file path: ' + tmp_path) + logging.info('\tEncrypted file path: %s', tmp_path) try: tools.run_encrypt_file(algorithm, mode, key, plaintext_path, @@ -92,60 +107,60 @@ def run_encryption_test(tools, algorithm, mode, key, plaintext_path, return TestExitCode.SKIPPED if filecmp.cmp(ciphertext_path, tmp_path): return TestExitCode.SUCCESS - else: - logging.error('The encrypted file doesn\'t match the ciphertext file') - return TestExitCode.FAILURE + logging.error('The encrypted file doesn\'t match the ciphertext file') + return TestExitCode.FAILURE except CalledProcessError as e: logging.error('Encountered an exception!') logging.exception(e) return TestExitCode.ERROR + def run_decryption_test(tools, algorithm, mode, key, plaintext_path, ciphertext_path, iv=None): logging.info('Running decryption test...') - logging.info('\tCiphertext file path: ' + ciphertext_path) - logging.info('\tExpected plaintext file path: ' + plaintext_path) - logging.info('\tAlgorithm: ' + str(algorithm)) - logging.info('\tMode: ' + str(mode)) + logging.info('\tCiphertext file path: %s', ciphertext_path) + logging.info('\tExpected plaintext file path: %s', plaintext_path) + logging.info('\tAlgorithm: %s', algorithm) + logging.info('\tMode: %s', mode) with _make_output_file() as tmp_path: - logging.info('\tDecrypted file path: ' + tmp_path) + logging.info('\tDecrypted file path: %s', tmp_path) try: tools.run_decrypt_file(algorithm, mode, key, ciphertext_path, tmp_path, iv) if filecmp.cmp(tmp_path, plaintext_path): return TestExitCode.SUCCESS - else: - logging.error('The decrypted file doesn\'t match the plaintext file') - return TestExitCode.FAILURE + logging.error('The decrypted file doesn\'t match the plaintext file') + return TestExitCode.FAILURE except CalledProcessError as e: logging.error('Encountered an exception!') logging.exception(e) return TestExitCode.ERROR + def enum_tests(suite_dir): suite_dir = os.path.abspath(suite_dir) - logging.info('Suite directory path: ' + suite_dir) + logging.info('Suite directory path: %s', suite_dir) for algorithm_dir in _list_dirs(suite_dir): algorithm = os.path.basename(algorithm_dir) maybe_algorithm = Algorithm.try_parse(algorithm) if maybe_algorithm is None: - logging.warning('Unknown or unsupported algorithm: ' + algorithm) + logging.warning('Unknown or unsupported algorithm: %s', algorithm) continue algorithm = maybe_algorithm for mode_dir in _list_dirs(algorithm_dir): mode = os.path.basename(mode_dir) maybe_mode = Mode.try_parse(mode) if maybe_mode is None: - logging.warning('Unknown or unsupported mode: ' + mode) + logging.warning('Unknown or unsupported mode: %s', mode) continue mode = maybe_mode for key_path in _list_keys(mode_dir): key = _read_key(key_path) - logging.info('Key: ' + key) + logging.info('Key: %s', key) test_name = _extract_test_name(key_path) - logging.info('Test name: ' + test_name) + logging.info('Test name: %s', test_name) iv = None if mode.requires_init_vector(): iv_path = _extract_iv_path(key_path) @@ -154,14 +169,17 @@ def enum_tests(suite_dir): ciphertext_path = _extract_ciphertext_path(key_path) yield algorithm, mode, key, plaintext_path, ciphertext_path, iv + _script_dir = os.path.dirname(__file__) _script_name = os.path.splitext(os.path.basename(__file__))[0] + def _build_default_log_path(): timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') fn = '{}_{}.log'.format(_script_name, timestamp) return os.path.join(_script_dir, fn) + def _setup_logging(log_path=None): if log_path is None: log_path = _build_default_log_path() @@ -171,6 +189,7 @@ def _setup_logging(log_path=None): format='%(asctime)s | %(module)s | %(levelname)s | %(message)s', level=logging.DEBUG) + def run_tests(suite_path, tools_path=(), log_path=None, use_sde=False, force=False): _setup_logging(log_path) tools = Tools(tools_path, use_sde=use_sde) @@ -192,6 +211,7 @@ def run_tests(suite_path, tools_path=(), log_path=None, use_sde=False, force=Fal else: return 1 + def _parse_args(args=None): if args is None: args = sys.argv[1:] @@ -210,8 +230,10 @@ def _parse_args(args=None): help='set test suite directory path') return parser.parse_args(args) + def main(args=None): return run_tests(**vars(_parse_args(args))) + if __name__ == '__main__': sys.exit(main()) diff --git a/test/nist.py b/test/nist.py index f7a119f..9448a6d 100644 --- a/test/nist.py +++ b/test/nist.py @@ -140,25 +140,31 @@ _TEST_CIPHERTEXTS = { } } + def get_test_plaintexts(*_): return _TEST_PLAINTEXTS + def get_test_key(algorithm, *_): return _TEST_KEYS[algorithm] + def get_test_iv(algorithm, mode): if not mode.requires_init_vector(): return None return _TEST_INIT_VECTORS[algorithm][mode] + def get_test_ciphertexts(algorithm, mode): return _TEST_CIPHERTEXTS[algorithm][mode] + def get_tested_algorithms_and_modes(): for algorithm in _TEST_CIPHERTEXTS: for mode in _TEST_CIPHERTEXTS[algorithm]: yield algorithm, mode + def verify_test_output(actual, expected): if len(actual) != len(expected): logging.error('Unexpected output length!') @@ -170,9 +176,11 @@ def verify_test_output(actual, expected): return False return True + class TestExitCode(Enum): SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5) + def run_encryption_test(tools, algorithm, mode, use_boxes=False): logging.info('Running encryption test...') logging.info('Algorithm: %s', algorithm) @@ -188,13 +196,13 @@ def run_encryption_test(tools, algorithm, mode, use_boxes=False): algorithm, mode, input_, use_boxes) if verify_test_output(actual_ciphertexts, expected_ciphertexts): return TestExitCode.SUCCESS - else: - return TestExitCode.FAILURE + return TestExitCode.FAILURE except CalledProcessError as e: logging.error('Encountered an exception!') logging.exception(e) return TestExitCode.ERROR + def run_decryption_test(tools, algorithm, mode, use_boxes=False): logging.info('Running decryption test...') logging.info('Algorithm: %s', algorithm) @@ -210,21 +218,23 @@ def run_decryption_test(tools, algorithm, mode, use_boxes=False): algorithm, mode, input_, use_boxes) if verify_test_output(actual_plaintexts, expected_plaintexts): return TestExitCode.SUCCESS - else: - return TestExitCode.FAILURE + return TestExitCode.FAILURE except CalledProcessError as e: logging.error('Encountered an exception!') logging.exception(e) return TestExitCode.ERROR + _script_dir = os.path.dirname(__file__) _script_name = os.path.splitext(os.path.basename(__file__))[0] + def _build_default_log_path(): timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') fn = '{}_{}.log'.format(_script_name, timestamp) return os.path.join(_script_dir, fn) + def _setup_logging(log_path=None): if log_path is None: log_path = _build_default_log_path() @@ -234,6 +244,7 @@ def _setup_logging(log_path=None): format='%(asctime)s | %(module)s | %(levelname)s | %(message)s', level=logging.DEBUG) + def run_tests(tools_path=(), use_sde=False, use_boxes=False, log_path=None): _setup_logging(log_path) tools = Tools(tools_path, use_sde=use_sde) @@ -257,6 +268,7 @@ def run_tests(tools_path=(), use_sde=False, use_boxes=False, log_path=None): else: return 1 + def _parse_args(args=None): if args is None: args = sys.argv[1:] @@ -272,8 +284,10 @@ def _parse_args(args=None): help='set log file path') return parser.parse_args(args) + def main(args=None): return run_tests(**vars(_parse_args(args))) + if __name__ == '__main__': sys.exit(main()) diff --git a/test/toolkit.py b/test/toolkit.py index f8a74ba..4abdbf4 100644 --- a/test/toolkit.py +++ b/test/toolkit.py @@ -9,6 +9,7 @@ import logging import os.path import subprocess + class Algorithm(Enum): @staticmethod def parse(s): @@ -50,6 +51,7 @@ class Mode(Enum): def __str__(self): return self.value + class BlockInput: def __init__(self, key, plaintexts, iv=None): self.key = key @@ -63,6 +65,7 @@ class BlockInput: args.extend(self.plaintexts) return args + class Tools: def __init__(self, search_dirs, use_sde=False): if search_dirs: @@ -82,14 +85,14 @@ class Tools: def run(self, tool_path, args): cmd_list = ['sde', '--', tool_path] if self._use_sde else [tool_path] cmd_list.extend(args) - logging.info('Trying to execute: ' + subprocess.list2cmdline(cmd_list)) + logging.info('Trying to execute: %s', subprocess.list2cmdline(cmd_list)) try: output = subprocess.check_output( cmd_list, universal_newlines=True, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: - logging.error('Output:\n' + e.output) + logging.error('Output:\n%s', e.output) raise - logging.info('Output:\n' + output) + logging.info('Output:\n%s', output) return output.split() @staticmethod -- cgit v1.2.3