From d62549f87b4ebad16bbbae20058fecc1a147c112 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Sat, 24 Oct 2015 03:37:35 +0300 Subject: add test/file.py --- test/file.py | 158 ++++++++++++++++++++++++++++++++++++++++++ test/file/aes128/ecb/0.cipher | 1 + test/file/aes128/ecb/0.key | 1 + test/file/aes128/ecb/0.plain | 0 test/file/aes192/ecb/0.cipher | 2 + test/file/aes192/ecb/0.key | 1 + test/file/aes192/ecb/0.plain | 0 test/file/aes256/ecb/0.cipher | 1 + test/file/aes256/ecb/0.key | 1 + test/file/aes256/ecb/0.plain | 0 test/toolkit.py | 54 +++++++++++---- 11 files changed, 204 insertions(+), 15 deletions(-) create mode 100644 test/file.py create mode 100644 test/file/aes128/ecb/0.cipher create mode 100644 test/file/aes128/ecb/0.key create mode 100644 test/file/aes128/ecb/0.plain create mode 100644 test/file/aes192/ecb/0.cipher create mode 100644 test/file/aes192/ecb/0.key create mode 100644 test/file/aes192/ecb/0.plain create mode 100644 test/file/aes256/ecb/0.cipher create mode 100644 test/file/aes256/ecb/0.key create mode 100644 test/file/aes256/ecb/0.plain diff --git a/test/file.py b/test/file.py new file mode 100644 index 0000000..7952529 --- /dev/null +++ b/test/file.py @@ -0,0 +1,158 @@ +# Copyright 2015 Egor Tensin +# This file is licensed under the terms of the MIT License. +# See LICENSE.txt for details. + +import toolkit + +from datetime import datetime +from glob import iglob as glob +import filecmp +import logging +import os +import shutil +import sys +from tempfile import TemporaryDirectory + +class _TestExitCode: + SUCCESS, FAILURE, ERROR, SKIPPED = range(4) + +_KEY_EXT = 'key' +_IV_EXT = 'iv' +_PLAIN_EXT = 'plain' +_CIPHER_EXT = 'cipher' + +def _run_encryption_test(tools, tmp_dir, algo, mode, key, plain_path, cipher_path, iv=None, force=False): + logging.info('Running encryption test...') + logging.info('\tPlaintext file path: ' + plain_path) + logging.info('\tExpected ciphertext file path: ' + cipher_path) + tmp_path = os.path.join(tmp_dir, os.path.basename(cipher_path)) + logging.info('\tEncrypted file path: ' + tmp_path) + tools.run_encrypt_file(algo, mode, key, plain_path, tmp_path, iv) + if force: + logging.info('Overwriting expected ciphertext file') + shutil.copy(tmp_path, cipher_path) + return _TestExitCode.SUCCESS + if filecmp.cmp(cipher_path, tmp_path): + logging.info('The encrypted file matches the ciphertext file') + return _TestExitCode.SUCCESS + else: + logging.info('The encrypted file doesn\'t match the ciphertext file') + return _TestExitCode.FAILURE + +def _run_decryption_test(tools, tmp_dir, algo, mode, key, cipher_path, plain_path, iv=None): + logging.info('Running decryption test...') + logging.info('\tCiphertext file path: ' + cipher_path) + logging.info('\tExpected plaintext file path: ' + plain_path) + tmp_path = os.path.join(tmp_dir, os.path.basename(cipher_path)) + logging.info('\tDecrypted file path: ' + tmp_path) + tools.run_decrypt_file(algo, mode, key, cipher_path, tmp_path, iv) + if filecmp.cmp(tmp_path, plain_path): + logging.info('The decrypted file matches the plaintext file') + return _TestExitCode.SUCCESS + else: + logging.info('The decrypted file doesn\'t match the plaintext file') + return _TestExitCode.FAILURE + +def _list_dirs(root_path): + xs = map(lambda x: os.path.join(root_path, x), os.listdir(root_path)) + return filter(os.path.isdir, xs) + +def _list_files(root_path, ext): + xs = glob(os.path.join(root_path, '*.{}'.format(ext))) + return filter(os.path.isfile, xs) + +def _list_keys(root_path): + return _list_files(root_path, _KEY_EXT) + +def _read_line(path): + with open(path) as f: + return f.readline() + +def _read_key(key_path): + return _read_line(key_path) + +def _read_iv(iv_path): + return _read_line(key_path) + +def _extract_test_name(key_path): + return os.path.splitext(os.path.basename(key_path))[0] + +def _replace_ext(path, new_ext): + return '{}.{}'.format(os.path.splitext(path)[0], new_ext) + +def _build_iv_path(key_path): + return _replace_ext(key_path, _IV_EXT) + +def _build_plain_path(key_path): + return _replace_ext(key_path, _PLAIN_EXT) + +def _build_cipher_path(key_path): + return _replace_ext(key_path, _CIPHER_EXT) + +def _run_tests(tools, suite_dir, force=False): + exit_codes = [] + suite_dir = os.path.abspath(suite_dir) + logging.info('Suite directory path: ' + suite_dir) + with TemporaryDirectory() as tmp_dir: + for algo_dir in _list_dirs(suite_dir): + algo = os.path.basename(algo_dir) + algo = toolkit.to_supported_algorithm(algo) + logging.info('Algorithm: ' + algo) + for mode_dir in _list_dirs(algo_dir): + mode = os.path.basename(mode_dir) + mode = toolkit.to_supported_mode(mode) + logging.info('Mode: ' + mode) + for key_path in _list_keys(mode_dir): + key = _read_key(key_path) + logging.info('Key: ' + key) + test_name = _extract_test_name(key_path) + logging.info('Test name: ' + test_name) + iv = None + if toolkit.mode_requires_init_vector(mode): + iv_path = _build_iv_path(key_path) + iv = _read_iv(iv_path) + plain_path = _build_plain_path(key_path) + cipher_path = _build_cipher_path(key_path) + os.makedirs(os.path.join(tmp_dir, algo, mode)) + exit_codes.append(_run_encryption_test( + tools, os.path.join(tmp_dir, algo, mode), + algo, mode, key, plain_path, cipher_path, iv, force)) + if not force: + exit_codes.append(_run_decryption_test( + tools, os.path.join(tmp_dir, algo, mode), + algo, mode, key, cipher_path, plain_path, iv)) + logging.info('Test exit codes:') + logging.info('\tSkipped: {0}'.format(exit_codes.count(_TestExitCode.SKIPPED))) + logging.info('\tError(s): {0}'.format(exit_codes.count(_TestExitCode.ERROR))) + logging.info('\tSucceeded: {0}'.format(exit_codes.count(_TestExitCode.SUCCESS))) + logging.info('\tFailed: {0}'.format(exit_codes.count(_TestExitCode.FAILURE))) + if (exit_codes.count(_TestExitCode.ERROR) == 0 and + exit_codes.count(_TestExitCode.FAILURE) == 0): + sys.exit() + else: + sys.exit(1) + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--path', '-p', nargs='*', + help='set path to file encryption utilities') + parser.add_argument('--sde', '-e', action='store_true', + help='use Intel SDE to run *.exe files') + parser.add_argument('--log', '-l', help='set log file path') + parser.add_argument('--force', '-f', action='store_true', + help='overwrite ciphertext files') + parser.add_argument('--suite', '-s', default='file', + help='set test suite directory path') + args = parser.parse_args() + + logging_options = {'format': '%(asctime)s | %(module)s | %(levelname)s | %(message)s', + 'level': logging.DEBUG} + if args.log is None: + logging_options['filename'] = datetime.now().strftime('file_%Y-%m-%d_%H-%M-%S.log') + else: + logging_options['filename'] = args.log + logging.basicConfig(**logging_options) + + tools = toolkit.Tools(args.path, use_sde=args.sde, use_boxes=False) + _run_tests(tools, args.suite, args.force) diff --git a/test/file/aes128/ecb/0.cipher b/test/file/aes128/ecb/0.cipher new file mode 100644 index 0000000..b2e558d --- /dev/null +++ b/test/file/aes128/ecb/0.cipher @@ -0,0 +1 @@ +Ccfiv \ No newline at end of file diff --git a/test/file/aes128/ecb/0.key b/test/file/aes128/ecb/0.key new file mode 100644 index 0000000..445c724 --- /dev/null +++ b/test/file/aes128/ecb/0.key @@ -0,0 +1 @@ +00000000000000000000000000000000 \ No newline at end of file diff --git a/test/file/aes128/ecb/0.plain b/test/file/aes128/ecb/0.plain new file mode 100644 index 0000000..e69de29 diff --git a/test/file/aes192/ecb/0.cipher b/test/file/aes192/ecb/0.cipher new file mode 100644 index 0000000..bb790e8 --- /dev/null +++ b/test/file/aes192/ecb/0.cipher @@ -0,0 +1,2 @@ +)%'&Q)Mo + \ No newline at end of file diff --git a/test/file/aes192/ecb/0.key b/test/file/aes192/ecb/0.key new file mode 100644 index 0000000..b644ad8 --- /dev/null +++ b/test/file/aes192/ecb/0.key @@ -0,0 +1 @@ +000000000000000000000000000000000000000000000000 \ No newline at end of file diff --git a/test/file/aes192/ecb/0.plain b/test/file/aes192/ecb/0.plain new file mode 100644 index 0000000..e69de29 diff --git a/test/file/aes256/ecb/0.cipher b/test/file/aes256/ecb/0.cipher new file mode 100644 index 0000000..fd6ec3b --- /dev/null +++ b/test/file/aes256/ecb/0.cipher @@ -0,0 +1 @@ +xl1uIi C \ No newline at end of file diff --git a/test/file/aes256/ecb/0.key b/test/file/aes256/ecb/0.key new file mode 100644 index 0000000..9c7bc66 --- /dev/null +++ b/test/file/aes256/ecb/0.key @@ -0,0 +1 @@ +0000000000000000000000000000000000000000000000000000000000000000 \ No newline at end of file diff --git a/test/file/aes256/ecb/0.plain b/test/file/aes256/ecb/0.plain new file mode 100644 index 0000000..e69de29 diff --git a/test/toolkit.py b/test/toolkit.py index 1cf26b6..8890269 100644 --- a/test/toolkit.py +++ b/test/toolkit.py @@ -6,36 +6,37 @@ import collections import logging import os.path import subprocess -import sys AES128, AES192, AES256 = 'aes128', 'aes192', 'aes256' ECB, CBC, CFB, OFB, CTR = 'ecb', 'cbc', 'cfb', 'ofb', 'ctr' -_supported_algorithms = AES128, AES192, AES256 -_supported_modes = ECB, CBC, CFB, OFB, CTR +_SUPPORTED_ALGORITHMS = AES128, AES192, AES256 +_SUPPORTED_MODES = ECB, CBC, CFB, OFB, CTR def get_supported_algorithms(): - return _supported_algorithms + return _SUPPORTED_ALGORITHMS def get_supported_modes(): - return _supported_modes + return _SUPPORTED_MODES def mode_requires_init_vector(mode): + if mode not in _SUPPORTED_MODES: + raise NotImplementedError('unsupported mode of operation ' + s) return mode != ECB def to_supported_algorithm(s): s = s.lower() - if s in _supported_algorithms: + if s in _SUPPORTED_ALGORITHMS: return s - return None + raise NotImplementedError('unsupported algorithm ' + s) def to_supported_mode(s): s = s.lower() - if s in _supported_modes: + if s in _SUPPORTED_MODES: return s if s == CFB + '128': return CFB - return None + raise NotImplementedError('unsupported algorithm ' + s) class EncryptionInput: def __init__(self, key, plaintexts, iv=None): @@ -81,12 +82,14 @@ class Tools: _ENCRYPT_BLOCK = 'encrypt_block.exe' _DECRYPT_BLOCK = 'decrypt_block.exe' + _ENCRYPT_FILE = 'encrypt_file.exe' + _DECRYPT_FILE = 'decrypt_file.exe' def run(self, tool_path, algo, mode, args): cmd_list = ['sde', '--', tool_path] if self._use_sde else [tool_path] if self._use_boxes: cmd_list.append('-b') - cmd_list.extend(('-a', algo, '-m', mode, '--')) + cmd_list.extend(('-a', algo, '-m', mode)) cmd_list.extend(args) logging.info('Trying to execute: {0}'.format(subprocess.list2cmdline(cmd_list))) try: @@ -100,11 +103,12 @@ class Tools: return output.split() @staticmethod - def _inputs_to_args(inputs): + def _block_inputs_to_args(inputs): head = next(inputs, None) if head is None: - return [] - args = head.to_args() + return ['--'] + args = ['--'] + args.extend(head.to_args()) while True: tail = next(inputs, None) if tail is None: @@ -115,14 +119,34 @@ class Tools: def run_encrypt_block(self, algo, mode, inputs): if isinstance(inputs, collections.Iterable): - args = self._inputs_to_args(iter(inputs)) + args = self._block_inputs_to_args(iter(inputs)) else: args = inputs.to_args() return self.run(self._ENCRYPT_BLOCK, algo, mode, args) def run_decrypt_block(self, algo, mode, inputs): if isinstance(inputs, collections.Iterable): - args = self._inputs_to_args(iter(inputs)) + args = self._block_inputs_to_args(iter(inputs)) else: args = inputs.to_args() return self.run(self._DECRYPT_BLOCK, algo, mode, args) + + def run_encrypt_file(self, algo, mode, key, input_path, output_path, iv=None): + if mode_requires_init_vector(mode): + if not iv: + raise ToolkitError('mode \'{}\' requires init vector'.format(mode)) + return self.run(self._ENCRYPT_FILE, algo, mode, + (key, iv, input_path, output_path)) + else: + return self.run(self._ENCRYPT_FILE, algo, mode, + (key, input_path, output_path)) + + def run_decrypt_file(self, algo, mode, key, input_path, output_path, iv=None): + if mode_requires_init_vector(mode): + if not iv: + raise ToolkitError('mode \'{}\' requires init vector'.format(mode)) + return self.run(self._DECRYPT_FILE, algo, mode, + (key, iv, input_path, output_path)) + else: + return self.run(self._DECRYPT_FILE, algo, mode, + (key, input_path, output_path)) -- cgit v1.2.3