# Copyright 2015 Egor Tensin <Egor.Tensin@gmail.com>
# This file is licensed under the terms of the MIT License.
# See LICENSE.txt for details.
from collections import OrderedDict
import configparser
from datetime import datetime
from enum import Enum
import logging
import os.path
import sys
import zipfile
from toolkit import *
class _MultiOrderedDict(OrderedDict):
def __setitem__(self, key, value):
if isinstance(value, list) and key in self:
self[key].extend(value)
else:
super(OrderedDict, self).__setitem__(key, value)
def _gen_inputs(keys, plaintexts, init_vectors):
if init_vectors is None:
init_vectors = [None for key in keys]
for key, plaintext, iv in zip(keys, plaintexts, init_vectors):
yield BlockInput(key, [plaintext], iv)
def _split_into_chunks(expected_output, inputs, max_len=100):
for i in range(0, len(inputs), max_len):
yield expected_output[i:i+max_len], inputs[i:i+max_len]
def verify_test_output(actual, expected):
if len(actual) != len(expected):
logging.error('Unexpected output length {0} (expected {1})'.format(len(actual), len(expected)))
return False
if actual != expected:
logging.error('Expected output:\n' + '\n'.join(expected))
return False
return True
class TestExitCode(Enum):
SUCCESS, FAILURE, ERROR, SKIPPED = range(1, 5)
class _TestVectorsFile:
def __init__(self, path, archive):
self._archive = archive
self._path = path
self._fn = os.path.split(path)[1]
self._recognized = False
self._parse()
def recognized(self):
return self._recognized
def algorithm(self):
return self._algorithm
def mode(self):
return self._mode
def parse(self):
self._parser = configparser.ConfigParser(
dict_type=_MultiOrderedDict,
strict=False,
interpolation=None,
empty_lines_in_values=False)
self._parser.read_string(self._archive.read(self._path).decode('utf-8'))
def _extract_test_data(self, section):
keys = self._parser.get(section, 'key')
plaintexts = self._parser.get(section, 'plaintext')
ciphertexts = self._parser.get(section, 'ciphertext')
init_vectors = None
if self.mode().requires_init_vector():
init_vectors = self._parser.get(section, 'iv')
return keys, plaintexts, ciphertexts, init_vectors
def _run_tests(self, tool, inputs, expected_output, use_boxes=False):
for expected_output_chunk, input_chunk in _split_into_chunks(expected_output, list(inputs)):
actual_output = tool(self.algorithm(), self.mode(), input_chunk, use_boxes=use_boxes)
if not verify_test_output(actual_output, expected_output_chunk):
return TestExitCode.FAILURE
return TestExitCode.SUCCESS
def run_encryption_tests(self, tools, use_boxes=False):
logging.info('Running encryption tests...')
keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('ENCRYPT')
inputs = _gen_inputs(keys, plaintexts, init_vectors)
return self._run_tests(tools.run_encrypt_block, inputs, ciphertexts, use_boxes)
def run_decryption_tests(self, tools, use_boxes=False):
logging.info('Running decryption tests...')
keys, plaintexts, ciphertexts, init_vectors = self._extract_test_data('DECRYPT')
inputs = _gen_inputs(keys, ciphertexts, init_vectors)
return self._run_tests(tools.run_decrypt_block, inputs, plaintexts, use_boxes)
def _parse(self):
logging.info('Trying to parse test vectors file name \'{0}\'...'.format(self._fn))
stub = self._strip_extension(self._fn)
if not stub: return
stub = self._strip_algorithm(stub)
if not stub: return
stub = self._strip_method(stub)
if not stub: return
stub = self._strip_mode(stub)
if not stub: return
self._recognized = True
def _strip_extension(self, stub):
stub, ext = os.path.splitext(stub)
if ext != '.rsp':
logging.warn('Unknown test vectors file extension \'{0}\'!'.format(self._fn))
return None
return stub
def _strip_algorithm(self, stub):
key_size = stub[-3:]
maybe_algorithm = 'aes{0}'.format(key_size)
self._algorithm = Algorithm.try_parse(maybe_algorithm)
if self._algorithm:
logging.info('\tAlgorithm: {0}'.format(self._algorithm))
return stub[0:-3]
else:
logging.warn('Unknown or unsupported algorithm: ' + self._fn)
return None
def _strip_method(self, stub):
for method in ('GFSbox', 'KeySbox', 'VarKey', 'VarTxt'):
if stub.endswith(method):
logging.info('\tMethod: {0}'.format(method))
return stub[0:len(stub) - len(method)]
logging.warn('Unknown or unsupported method: ' + self._fn)
def _strip_mode(self, stub):
self._mode = Mode.try_parse(stub)
if self._mode:
logging.info('\tMode: {0}'.format(self._mode))
return self._mode
else:
logging.warn('Unknown or unsupported mode: ' + self._fn)
return None
def _parse_archive_and_run_tests(tools, archive_path, use_boxes=False):
archive = zipfile.ZipFile(archive_path)
exit_codes = []
for fn in archive.namelist():
member = _TestVectorsFile(fn, archive)
if member.recognized():
member.parse()
try:
exit_codes.append(member.run_encryption_tests(tools, use_boxes))
except Exception as e:
logging.error('Encountered an exception!')
logging.exception(e)
exit_codes.append(TestExitCode.ERROR)
try:
exit_codes.append(member.run_decryption_tests(tools, use_boxes))
except Exception as e:
logging.error('Encountered an exception!')
logging.exception(e)
exit_codes.append(TestExitCode.ERROR)
else:
exit_codes.append(TestExitCode.SKIPPED)
logging.info('Test exit codes:')
logging.info('\tSkipped: {}'.format(exit_codes.count(TestExitCode.SKIPPED)))
logging.info('\tError(s): {}'.format(exit_codes.count(TestExitCode.ERROR)))
logging.info('\tSucceeded: {}'.format(exit_codes.count(TestExitCode.SUCCESS)))
logging.info('\tFailed: {}'.format(exit_codes.count(TestExitCode.FAILURE)))
if (exit_codes.count(TestExitCode.ERROR) == 0 and
exit_codes.count(TestExitCode.FAILURE) == 0):
sys.exit()
else:
sys.exit(1)
def _build_default_log_path():
return datetime.now().strftime('{}_%Y-%m-%d_%H-%M-%S.log').format(
os.path.splitext(os.path.basename(__file__))[0])
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--path', '-p', nargs='*',
help='set path to block encryption utilities')
parser.add_argument('--sde', '-e', action='store_true',
help='use Intel SDE to run *.exe files')
parser.add_argument('--use-boxes', '-b', action='store_true',
help='use the "boxes" interface')
parser.add_argument('--archive', '-a', default='KAT_AES.zip',
help='set path of the archive with the test vectors')
parser.add_argument('--log', '-l', default=_build_default_log_path(),
help='set log file path')
args = parser.parse_args()
logging.basicConfig(filename=args.log,
format='%(asctime)s | %(module)s | %(levelname)s | %(message)s',
level=logging.DEBUG)
tools = Tools(args.path, use_sde=args.sde)
_parse_archive_and_run_tests(tools, args.archive, use_boxes=args.use_boxes)