diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2023-03-05 14:49:48 +0100 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2023-03-05 14:50:37 +0100 |
commit | 406f804a8042874d5760709398d95c61ec33b6db (patch) | |
tree | b7ca5f4bf21f2be6b3d0c5cb4fbd399b7f49c786 /src | |
parent | add README.md (diff) | |
download | audit-scripts-406f804a8042874d5760709398d95c61ec33b6db.tar.gz audit-scripts-406f804a8042874d5760709398d95c61ec33b6db.zip |
rename scripts
Diffstat (limited to 'src')
-rwxr-xr-x | src/bad-attrs | 135 | ||||
-rwxr-xr-x | src/writable-dirs | 274 |
2 files changed, 409 insertions, 0 deletions
diff --git a/src/bad-attrs b/src/bad-attrs new file mode 100755 index 0000000..4683540 --- /dev/null +++ b/src/bad-attrs @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "audit-scripts" project. +# For details, see https://github.com/egor-tensin/audit-scripts. +# Distributed under the MIT License. + +import argparse +import array +from contextlib import contextmanager +import errno +import fcntl +import logging +import os +import stat +import sys + + +@contextmanager +def setup_logging(): + logging.basicConfig( + format='%(asctime)s | %(levelname)s | %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG) + try: + yield + except Exception as e: + logging.exception(e) + raise + + +def scandir(dir_path): + try: + entry_it = os.scandir(dir_path) + except (PermissionError, FileNotFoundError) as e: + logging.warning('%s', e) + return + with entry_it: + yield from entry_it + + +def traverse_tree(root): + queue = [root] + while queue: + for entry in scandir(queue.pop(0)): + if entry.is_dir(follow_symlinks=False): + if os.path.ismount(entry.path): + continue + queue.append(entry.path) + yield entry + + +def skip_leaf(entry): + if entry.is_dir(follow_symlinks=False): + return False + if entry.is_file(follow_symlinks=False): + return False + return True + + +@contextmanager +def low_level_open(path, flags): + fd = os.open(path, flags) + try: + yield fd + finally: + os.close(fd) + + +FS_IOC_GETFLAGS = 0x80086601 + +FS_IMMUTABLE_FL = 0x00000010 +FS_APPEND_FL = 0x00000020 + +BAD_FLAGS = [FS_IMMUTABLE_FL, FS_APPEND_FL] + + +def flags_contain_bad_flags(flags): + return any([flags & bad_flag for bad_flag in BAD_FLAGS]) + + +def fd_get_flags(fd): + a = array.array('L', [0]) + fcntl.ioctl(fd, FS_IOC_GETFLAGS, a, True) + return a[0] + + +def path_get_flags(path): + with low_level_open(path, os.O_RDONLY) as fd: + return fd_get_flags(fd) + + +def path_has_bad_flags(path): + try: + flags = path_get_flags(path) + except OSError as e: + if e.errno == errno.ENOTTY or e.errno == errno.EPERM: + # Either one of: + # Inappropriate ioctl for device + # Permission denied + # It's relied upon that fcntl throws OSError instead of + # PermissionError. + logging.warning('%s: %s', path, e) + return False + raise + return flags_contain_bad_flags(flags) + + +def do_dir(root): + logging.info('Directory: %s', root) + for entry in traverse_tree(root): + if skip_leaf(entry): + continue + #logging.debug('Path: %s', entry.path) + if path_has_bad_flags(entry.path): + logging.warning('Bad flags: %s', entry.path) + + +def parse_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument('dir', metavar='DIR', + help='set root directory') + return parser.parse_args() + + +def main(argv=None): + with setup_logging(): + args = parse_args() + do_dir(args.dir) + + +if __name__ == '__main__': + main() diff --git a/src/writable-dirs b/src/writable-dirs new file mode 100755 index 0000000..19f6094 --- /dev/null +++ b/src/writable-dirs @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "audit-scripts" project. +# For details, see https://github.com/egor-tensin/audit-scripts. +# Distributed under the MIT License. + +import argparse +import contextlib +import grp +import logging +import logging.config +import logging.handlers +import multiprocessing as mp +import os +import pwd +import sys + + +def console_log_formatter(): + fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s' + datefmt = '%Y-%m-%d %H:%M:%S' + return logging.Formatter(fmt=fmt, datefmt=datefmt) + + +def console_log_level(): + return logging.DEBUG + + +def console_log_handler(): + handler = logging.StreamHandler() + handler.setFormatter(console_log_formatter()) + handler.setLevel(console_log_level()) + return handler + + +@contextlib.contextmanager +def launch_logging_thread(queue): + process = mp.current_process() + process.name = 'scandir' + listener = logging.handlers.QueueListener(queue, console_log_handler()) + listener.start() + try: + yield + finally: + listener.stop() + + +@contextlib.contextmanager +def setup_logging(queue): + config = { + 'version': 1, + 'handlers': { + 'sink': { + 'class': 'logging.handlers.QueueHandler', + 'queue': queue, + }, + }, + 'root': { + 'handlers': ['sink'], + 'level': 'DEBUG', + }, + } + logging.config.dictConfig(config) + try: + yield + except Exception as e: + logging.exception(e) + + +def validate_uid(uid): + try: + return pwd.getpwuid(uid).pw_uid + except KeyError: + return None + + +def validate_gid(gid): + try: + return grp.getgrgid(gid).gr_gid + except KeyError: + return None + + +def map_user_name(user_name): + try: + return pwd.getpwnam(user_name).pw_uid + except KeyError: + return None + + +def map_group_name(group_name): + try: + return grp.getgrnam(group_name).gr_gid + except KeyError: + return None + + +def parse_user_name(src): + uid = map_user_name(src) + if uid is None: + raise argparse.ArgumentTypeError('unknown user name: {}'.format(src)) + return uid + + +def parse_group_name(src): + gid = map_group_name(src) + if gid is None: + raise argparse.ArgumentTypeError('unknown group name: {}'.format(src)) + return gid + + +def parse_uid(src): + try: + uid = int(src) + except ValueError: + return parse_user_name(src) + uid = validate_uid(uid) + if uid is None: + raise argparse.ArgumentTypeError('unknown UID: {}'.format(src)) + return uid + + +def parse_gid(src): + try: + gid = int(src) + except ValueError: + return parse_group_name(src) + gid = validate_gid(gid) + if gid is None: + raise argparse.ArgumentTypeError('unknown GID: {}'.format(src)) + return gid + + +def get_primary_gid(uid): + return pwd.getpwuid(uid).pw_gid + + +def parse_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR', + help='set root directory') + parser.add_argument('--user', '-u', dest='uid', required=True, + metavar='USER', type=parse_uid, + help='set new process\' UID') + parser.add_argument('--group', '-g', dest='gid', + metavar='GROUP', type=parse_gid, + help='set new process\' GID') + args = parser.parse_args(argv) + if args.gid is None: + args.gid = get_primary_gid(args.uid) + return args + + +def dump_process_info(): + ruid, euid, suid = os.getresuid() + logging.info('User IDs:') + logging.info('\tReal: %d', ruid) + logging.info('\tEffective: %d', euid) + logging.info('\tSaved: %d', suid) + rgid, egid, sgid = os.getresgid() + logging.info('Group IDs:') + logging.info('\tReal: %d', rgid) + logging.info('\tEffective: %d', egid) + logging.info('\tSaved: %d', sgid) + + +def check_root(): + if os.getuid() == 0: + return True + logging.error('Must be run as root') + return False + + +def scandir(dir_path): + try: + entry_it = os.scandir(dir_path) + except (PermissionError, FileNotFoundError) as e: + logging.warning('%s', e) + return + with entry_it: + yield from entry_it + + +def enum_dirs(dir_path): + for entry in scandir(dir_path): + if entry.is_dir(follow_symlinks=False): + yield entry.path + + +def is_writable_via_access(dir_path): + return os.access(dir_path, os.W_OK | os.X_OK) + + +def is_writable(dir_path): + return is_writable_via_access(dir_path) + + +def drop_privileges(uid, gid): + if not check_root(): + return + os.setgroups([]) + os.setgid(gid) + os.setuid(uid) + os.umask(0o77) + + +@contextlib.contextmanager +def close_queue(queue): + try: + yield + finally: + queue.put(None) + + +def access_loop(access_queue, scandir_queue): + for dir_list in iter(access_queue.get, None): + denied_dir_list = [] + for dir_path in dir_list: + if is_writable(dir_path): + logging.info('Writable: %s', dir_path) + else: + denied_dir_list.append(dir_path) + if not denied_dir_list: + break + scandir_queue.put(denied_dir_list) + + +def access_main(args, log_queue, access_queue, scandir_queue): + with close_queue(scandir_queue), setup_logging(log_queue): + drop_privileges(args.uid, args.gid) + dump_process_info() + access_loop(access_queue, scandir_queue) + + +def scandir_loop(access_queue, scandir_queue): + for parent_dir_list in iter(scandir_queue.get, None): + child_dir_list = [child_dir + for parent_dir in parent_dir_list + for child_dir in enum_dirs(parent_dir)] + if not child_dir_list: + break + access_queue.put(child_dir_list) + + +def scandir_main(access_queue, scandir_queue): + with close_queue(access_queue): + if not check_root(): + return + dump_process_info() + scandir_loop(access_queue, scandir_queue) + + +def main(argv=None): + log_queue = mp.Queue() + with launch_logging_thread(log_queue), setup_logging(log_queue): + prog_args = parse_args(argv) + access_queue = mp.SimpleQueue() + scandir_queue = mp.SimpleQueue() + access_process_args = prog_args, log_queue, access_queue, scandir_queue + access_process = mp.Process(target=access_main, + args=access_process_args, + name='access') + access_queue.put([prog_args.root_dir]) + access_process.start() + scandir_main(access_queue, scandir_queue) + access_process.join() + + +if __name__ == '__main__': + mp.set_start_method('spawn') + main() |