From 104e49ea4563e7d7ae9a1ffdba808e98bfec3615 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Fri, 17 Mar 2023 23:43:03 +0100 Subject: add extensions to script files --- src/bad-attrs | 135 ------------------------- src/bad-attrs.py | 135 +++++++++++++++++++++++++ src/writable-dirs | 274 --------------------------------------------------- src/writable-dirs.py | 274 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 409 insertions(+), 409 deletions(-) delete mode 100755 src/bad-attrs create mode 100755 src/bad-attrs.py delete mode 100755 src/writable-dirs create mode 100755 src/writable-dirs.py (limited to 'src') diff --git a/src/bad-attrs b/src/bad-attrs deleted file mode 100755 index 4683540..0000000 --- a/src/bad-attrs +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Egor Tensin -# This file is part of the "audit-scripts" project. -# For details, see https://github.com/egor-tensin/audit-scripts. -# Distributed under the MIT License. - -import argparse -import array -from contextlib import contextmanager -import errno -import fcntl -import logging -import os -import stat -import sys - - -@contextmanager -def setup_logging(): - logging.basicConfig( - format='%(asctime)s | %(levelname)s | %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - level=logging.DEBUG) - try: - yield - except Exception as e: - logging.exception(e) - raise - - -def scandir(dir_path): - try: - entry_it = os.scandir(dir_path) - except (PermissionError, FileNotFoundError) as e: - logging.warning('%s', e) - return - with entry_it: - yield from entry_it - - -def traverse_tree(root): - queue = [root] - while queue: - for entry in scandir(queue.pop(0)): - if entry.is_dir(follow_symlinks=False): - if os.path.ismount(entry.path): - continue - queue.append(entry.path) - yield entry - - -def skip_leaf(entry): - if entry.is_dir(follow_symlinks=False): - return False - if entry.is_file(follow_symlinks=False): - return False - return True - - -@contextmanager -def low_level_open(path, flags): - fd = os.open(path, flags) - try: - yield fd - finally: - os.close(fd) - - -FS_IOC_GETFLAGS = 0x80086601 - -FS_IMMUTABLE_FL = 0x00000010 -FS_APPEND_FL = 0x00000020 - -BAD_FLAGS = [FS_IMMUTABLE_FL, FS_APPEND_FL] - - -def flags_contain_bad_flags(flags): - return any([flags & bad_flag for bad_flag in BAD_FLAGS]) - - -def fd_get_flags(fd): - a = array.array('L', [0]) - fcntl.ioctl(fd, FS_IOC_GETFLAGS, a, True) - return a[0] - - -def path_get_flags(path): - with low_level_open(path, os.O_RDONLY) as fd: - return fd_get_flags(fd) - - -def path_has_bad_flags(path): - try: - flags = path_get_flags(path) - except OSError as e: - if e.errno == errno.ENOTTY or e.errno == errno.EPERM: - # Either one of: - # Inappropriate ioctl for device - # Permission denied - # It's relied upon that fcntl throws OSError instead of - # PermissionError. - logging.warning('%s: %s', path, e) - return False - raise - return flags_contain_bad_flags(flags) - - -def do_dir(root): - logging.info('Directory: %s', root) - for entry in traverse_tree(root): - if skip_leaf(entry): - continue - #logging.debug('Path: %s', entry.path) - if path_has_bad_flags(entry.path): - logging.warning('Bad flags: %s', entry.path) - - -def parse_args(argv=None): - if argv is None: - argv = sys.argv[1:] - parser = argparse.ArgumentParser() - parser.add_argument('dir', metavar='DIR', - help='set root directory') - return parser.parse_args() - - -def main(argv=None): - with setup_logging(): - args = parse_args() - do_dir(args.dir) - - -if __name__ == '__main__': - main() diff --git a/src/bad-attrs.py b/src/bad-attrs.py new file mode 100755 index 0000000..4683540 --- /dev/null +++ b/src/bad-attrs.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Egor Tensin +# This file is part of the "audit-scripts" project. +# For details, see https://github.com/egor-tensin/audit-scripts. +# Distributed under the MIT License. + +import argparse +import array +from contextlib import contextmanager +import errno +import fcntl +import logging +import os +import stat +import sys + + +@contextmanager +def setup_logging(): + logging.basicConfig( + format='%(asctime)s | %(levelname)s | %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG) + try: + yield + except Exception as e: + logging.exception(e) + raise + + +def scandir(dir_path): + try: + entry_it = os.scandir(dir_path) + except (PermissionError, FileNotFoundError) as e: + logging.warning('%s', e) + return + with entry_it: + yield from entry_it + + +def traverse_tree(root): + queue = [root] + while queue: + for entry in scandir(queue.pop(0)): + if entry.is_dir(follow_symlinks=False): + if os.path.ismount(entry.path): + continue + queue.append(entry.path) + yield entry + + +def skip_leaf(entry): + if entry.is_dir(follow_symlinks=False): + return False + if entry.is_file(follow_symlinks=False): + return False + return True + + +@contextmanager +def low_level_open(path, flags): + fd = os.open(path, flags) + try: + yield fd + finally: + os.close(fd) + + +FS_IOC_GETFLAGS = 0x80086601 + +FS_IMMUTABLE_FL = 0x00000010 +FS_APPEND_FL = 0x00000020 + +BAD_FLAGS = [FS_IMMUTABLE_FL, FS_APPEND_FL] + + +def flags_contain_bad_flags(flags): + return any([flags & bad_flag for bad_flag in BAD_FLAGS]) + + +def fd_get_flags(fd): + a = array.array('L', [0]) + fcntl.ioctl(fd, FS_IOC_GETFLAGS, a, True) + return a[0] + + +def path_get_flags(path): + with low_level_open(path, os.O_RDONLY) as fd: + return fd_get_flags(fd) + + +def path_has_bad_flags(path): + try: + flags = path_get_flags(path) + except OSError as e: + if e.errno == errno.ENOTTY or e.errno == errno.EPERM: + # Either one of: + # Inappropriate ioctl for device + # Permission denied + # It's relied upon that fcntl throws OSError instead of + # PermissionError. + logging.warning('%s: %s', path, e) + return False + raise + return flags_contain_bad_flags(flags) + + +def do_dir(root): + logging.info('Directory: %s', root) + for entry in traverse_tree(root): + if skip_leaf(entry): + continue + #logging.debug('Path: %s', entry.path) + if path_has_bad_flags(entry.path): + logging.warning('Bad flags: %s', entry.path) + + +def parse_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument('dir', metavar='DIR', + help='set root directory') + return parser.parse_args() + + +def main(argv=None): + with setup_logging(): + args = parse_args() + do_dir(args.dir) + + +if __name__ == '__main__': + main() diff --git a/src/writable-dirs b/src/writable-dirs deleted file mode 100755 index 19f6094..0000000 --- a/src/writable-dirs +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2018 Egor Tensin -# This file is part of the "audit-scripts" project. -# For details, see https://github.com/egor-tensin/audit-scripts. -# Distributed under the MIT License. - -import argparse -import contextlib -import grp -import logging -import logging.config -import logging.handlers -import multiprocessing as mp -import os -import pwd -import sys - - -def console_log_formatter(): - fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s' - datefmt = '%Y-%m-%d %H:%M:%S' - return logging.Formatter(fmt=fmt, datefmt=datefmt) - - -def console_log_level(): - return logging.DEBUG - - -def console_log_handler(): - handler = logging.StreamHandler() - handler.setFormatter(console_log_formatter()) - handler.setLevel(console_log_level()) - return handler - - -@contextlib.contextmanager -def launch_logging_thread(queue): - process = mp.current_process() - process.name = 'scandir' - listener = logging.handlers.QueueListener(queue, console_log_handler()) - listener.start() - try: - yield - finally: - listener.stop() - - -@contextlib.contextmanager -def setup_logging(queue): - config = { - 'version': 1, - 'handlers': { - 'sink': { - 'class': 'logging.handlers.QueueHandler', - 'queue': queue, - }, - }, - 'root': { - 'handlers': ['sink'], - 'level': 'DEBUG', - }, - } - logging.config.dictConfig(config) - try: - yield - except Exception as e: - logging.exception(e) - - -def validate_uid(uid): - try: - return pwd.getpwuid(uid).pw_uid - except KeyError: - return None - - -def validate_gid(gid): - try: - return grp.getgrgid(gid).gr_gid - except KeyError: - return None - - -def map_user_name(user_name): - try: - return pwd.getpwnam(user_name).pw_uid - except KeyError: - return None - - -def map_group_name(group_name): - try: - return grp.getgrnam(group_name).gr_gid - except KeyError: - return None - - -def parse_user_name(src): - uid = map_user_name(src) - if uid is None: - raise argparse.ArgumentTypeError('unknown user name: {}'.format(src)) - return uid - - -def parse_group_name(src): - gid = map_group_name(src) - if gid is None: - raise argparse.ArgumentTypeError('unknown group name: {}'.format(src)) - return gid - - -def parse_uid(src): - try: - uid = int(src) - except ValueError: - return parse_user_name(src) - uid = validate_uid(uid) - if uid is None: - raise argparse.ArgumentTypeError('unknown UID: {}'.format(src)) - return uid - - -def parse_gid(src): - try: - gid = int(src) - except ValueError: - return parse_group_name(src) - gid = validate_gid(gid) - if gid is None: - raise argparse.ArgumentTypeError('unknown GID: {}'.format(src)) - return gid - - -def get_primary_gid(uid): - return pwd.getpwuid(uid).pw_gid - - -def parse_args(argv=None): - if argv is None: - argv = sys.argv[1:] - parser = argparse.ArgumentParser() - parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR', - help='set root directory') - parser.add_argument('--user', '-u', dest='uid', required=True, - metavar='USER', type=parse_uid, - help='set new process\' UID') - parser.add_argument('--group', '-g', dest='gid', - metavar='GROUP', type=parse_gid, - help='set new process\' GID') - args = parser.parse_args(argv) - if args.gid is None: - args.gid = get_primary_gid(args.uid) - return args - - -def dump_process_info(): - ruid, euid, suid = os.getresuid() - logging.info('User IDs:') - logging.info('\tReal: %d', ruid) - logging.info('\tEffective: %d', euid) - logging.info('\tSaved: %d', suid) - rgid, egid, sgid = os.getresgid() - logging.info('Group IDs:') - logging.info('\tReal: %d', rgid) - logging.info('\tEffective: %d', egid) - logging.info('\tSaved: %d', sgid) - - -def check_root(): - if os.getuid() == 0: - return True - logging.error('Must be run as root') - return False - - -def scandir(dir_path): - try: - entry_it = os.scandir(dir_path) - except (PermissionError, FileNotFoundError) as e: - logging.warning('%s', e) - return - with entry_it: - yield from entry_it - - -def enum_dirs(dir_path): - for entry in scandir(dir_path): - if entry.is_dir(follow_symlinks=False): - yield entry.path - - -def is_writable_via_access(dir_path): - return os.access(dir_path, os.W_OK | os.X_OK) - - -def is_writable(dir_path): - return is_writable_via_access(dir_path) - - -def drop_privileges(uid, gid): - if not check_root(): - return - os.setgroups([]) - os.setgid(gid) - os.setuid(uid) - os.umask(0o77) - - -@contextlib.contextmanager -def close_queue(queue): - try: - yield - finally: - queue.put(None) - - -def access_loop(access_queue, scandir_queue): - for dir_list in iter(access_queue.get, None): - denied_dir_list = [] - for dir_path in dir_list: - if is_writable(dir_path): - logging.info('Writable: %s', dir_path) - else: - denied_dir_list.append(dir_path) - if not denied_dir_list: - break - scandir_queue.put(denied_dir_list) - - -def access_main(args, log_queue, access_queue, scandir_queue): - with close_queue(scandir_queue), setup_logging(log_queue): - drop_privileges(args.uid, args.gid) - dump_process_info() - access_loop(access_queue, scandir_queue) - - -def scandir_loop(access_queue, scandir_queue): - for parent_dir_list in iter(scandir_queue.get, None): - child_dir_list = [child_dir - for parent_dir in parent_dir_list - for child_dir in enum_dirs(parent_dir)] - if not child_dir_list: - break - access_queue.put(child_dir_list) - - -def scandir_main(access_queue, scandir_queue): - with close_queue(access_queue): - if not check_root(): - return - dump_process_info() - scandir_loop(access_queue, scandir_queue) - - -def main(argv=None): - log_queue = mp.Queue() - with launch_logging_thread(log_queue), setup_logging(log_queue): - prog_args = parse_args(argv) - access_queue = mp.SimpleQueue() - scandir_queue = mp.SimpleQueue() - access_process_args = prog_args, log_queue, access_queue, scandir_queue - access_process = mp.Process(target=access_main, - args=access_process_args, - name='access') - access_queue.put([prog_args.root_dir]) - access_process.start() - scandir_main(access_queue, scandir_queue) - access_process.join() - - -if __name__ == '__main__': - mp.set_start_method('spawn') - main() diff --git a/src/writable-dirs.py b/src/writable-dirs.py new file mode 100755 index 0000000..19f6094 --- /dev/null +++ b/src/writable-dirs.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2018 Egor Tensin +# This file is part of the "audit-scripts" project. +# For details, see https://github.com/egor-tensin/audit-scripts. +# Distributed under the MIT License. + +import argparse +import contextlib +import grp +import logging +import logging.config +import logging.handlers +import multiprocessing as mp +import os +import pwd +import sys + + +def console_log_formatter(): + fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s' + datefmt = '%Y-%m-%d %H:%M:%S' + return logging.Formatter(fmt=fmt, datefmt=datefmt) + + +def console_log_level(): + return logging.DEBUG + + +def console_log_handler(): + handler = logging.StreamHandler() + handler.setFormatter(console_log_formatter()) + handler.setLevel(console_log_level()) + return handler + + +@contextlib.contextmanager +def launch_logging_thread(queue): + process = mp.current_process() + process.name = 'scandir' + listener = logging.handlers.QueueListener(queue, console_log_handler()) + listener.start() + try: + yield + finally: + listener.stop() + + +@contextlib.contextmanager +def setup_logging(queue): + config = { + 'version': 1, + 'handlers': { + 'sink': { + 'class': 'logging.handlers.QueueHandler', + 'queue': queue, + }, + }, + 'root': { + 'handlers': ['sink'], + 'level': 'DEBUG', + }, + } + logging.config.dictConfig(config) + try: + yield + except Exception as e: + logging.exception(e) + + +def validate_uid(uid): + try: + return pwd.getpwuid(uid).pw_uid + except KeyError: + return None + + +def validate_gid(gid): + try: + return grp.getgrgid(gid).gr_gid + except KeyError: + return None + + +def map_user_name(user_name): + try: + return pwd.getpwnam(user_name).pw_uid + except KeyError: + return None + + +def map_group_name(group_name): + try: + return grp.getgrnam(group_name).gr_gid + except KeyError: + return None + + +def parse_user_name(src): + uid = map_user_name(src) + if uid is None: + raise argparse.ArgumentTypeError('unknown user name: {}'.format(src)) + return uid + + +def parse_group_name(src): + gid = map_group_name(src) + if gid is None: + raise argparse.ArgumentTypeError('unknown group name: {}'.format(src)) + return gid + + +def parse_uid(src): + try: + uid = int(src) + except ValueError: + return parse_user_name(src) + uid = validate_uid(uid) + if uid is None: + raise argparse.ArgumentTypeError('unknown UID: {}'.format(src)) + return uid + + +def parse_gid(src): + try: + gid = int(src) + except ValueError: + return parse_group_name(src) + gid = validate_gid(gid) + if gid is None: + raise argparse.ArgumentTypeError('unknown GID: {}'.format(src)) + return gid + + +def get_primary_gid(uid): + return pwd.getpwuid(uid).pw_gid + + +def parse_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR', + help='set root directory') + parser.add_argument('--user', '-u', dest='uid', required=True, + metavar='USER', type=parse_uid, + help='set new process\' UID') + parser.add_argument('--group', '-g', dest='gid', + metavar='GROUP', type=parse_gid, + help='set new process\' GID') + args = parser.parse_args(argv) + if args.gid is None: + args.gid = get_primary_gid(args.uid) + return args + + +def dump_process_info(): + ruid, euid, suid = os.getresuid() + logging.info('User IDs:') + logging.info('\tReal: %d', ruid) + logging.info('\tEffective: %d', euid) + logging.info('\tSaved: %d', suid) + rgid, egid, sgid = os.getresgid() + logging.info('Group IDs:') + logging.info('\tReal: %d', rgid) + logging.info('\tEffective: %d', egid) + logging.info('\tSaved: %d', sgid) + + +def check_root(): + if os.getuid() == 0: + return True + logging.error('Must be run as root') + return False + + +def scandir(dir_path): + try: + entry_it = os.scandir(dir_path) + except (PermissionError, FileNotFoundError) as e: + logging.warning('%s', e) + return + with entry_it: + yield from entry_it + + +def enum_dirs(dir_path): + for entry in scandir(dir_path): + if entry.is_dir(follow_symlinks=False): + yield entry.path + + +def is_writable_via_access(dir_path): + return os.access(dir_path, os.W_OK | os.X_OK) + + +def is_writable(dir_path): + return is_writable_via_access(dir_path) + + +def drop_privileges(uid, gid): + if not check_root(): + return + os.setgroups([]) + os.setgid(gid) + os.setuid(uid) + os.umask(0o77) + + +@contextlib.contextmanager +def close_queue(queue): + try: + yield + finally: + queue.put(None) + + +def access_loop(access_queue, scandir_queue): + for dir_list in iter(access_queue.get, None): + denied_dir_list = [] + for dir_path in dir_list: + if is_writable(dir_path): + logging.info('Writable: %s', dir_path) + else: + denied_dir_list.append(dir_path) + if not denied_dir_list: + break + scandir_queue.put(denied_dir_list) + + +def access_main(args, log_queue, access_queue, scandir_queue): + with close_queue(scandir_queue), setup_logging(log_queue): + drop_privileges(args.uid, args.gid) + dump_process_info() + access_loop(access_queue, scandir_queue) + + +def scandir_loop(access_queue, scandir_queue): + for parent_dir_list in iter(scandir_queue.get, None): + child_dir_list = [child_dir + for parent_dir in parent_dir_list + for child_dir in enum_dirs(parent_dir)] + if not child_dir_list: + break + access_queue.put(child_dir_list) + + +def scandir_main(access_queue, scandir_queue): + with close_queue(access_queue): + if not check_root(): + return + dump_process_info() + scandir_loop(access_queue, scandir_queue) + + +def main(argv=None): + log_queue = mp.Queue() + with launch_logging_thread(log_queue), setup_logging(log_queue): + prog_args = parse_args(argv) + access_queue = mp.SimpleQueue() + scandir_queue = mp.SimpleQueue() + access_process_args = prog_args, log_queue, access_queue, scandir_queue + access_process = mp.Process(target=access_main, + args=access_process_args, + name='access') + access_queue.put([prog_args.root_dir]) + access_process.start() + scandir_main(access_queue, scandir_queue) + access_process.join() + + +if __name__ == '__main__': + mp.set_start_method('spawn') + main() -- cgit v1.2.3