diff options
Diffstat (limited to 'src/writable-dirs')
-rwxr-xr-x | src/writable-dirs | 274 |
1 files changed, 274 insertions, 0 deletions
diff --git a/src/writable-dirs b/src/writable-dirs new file mode 100755 index 0000000..19f6094 --- /dev/null +++ b/src/writable-dirs @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "audit-scripts" project. +# For details, see https://github.com/egor-tensin/audit-scripts. +# Distributed under the MIT License. + +import argparse +import contextlib +import grp +import logging +import logging.config +import logging.handlers +import multiprocessing as mp +import os +import pwd +import sys + + +def console_log_formatter(): + fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s' + datefmt = '%Y-%m-%d %H:%M:%S' + return logging.Formatter(fmt=fmt, datefmt=datefmt) + + +def console_log_level(): + return logging.DEBUG + + +def console_log_handler(): + handler = logging.StreamHandler() + handler.setFormatter(console_log_formatter()) + handler.setLevel(console_log_level()) + return handler + + +@contextlib.contextmanager +def launch_logging_thread(queue): + process = mp.current_process() + process.name = 'scandir' + listener = logging.handlers.QueueListener(queue, console_log_handler()) + listener.start() + try: + yield + finally: + listener.stop() + + +@contextlib.contextmanager +def setup_logging(queue): + config = { + 'version': 1, + 'handlers': { + 'sink': { + 'class': 'logging.handlers.QueueHandler', + 'queue': queue, + }, + }, + 'root': { + 'handlers': ['sink'], + 'level': 'DEBUG', + }, + } + logging.config.dictConfig(config) + try: + yield + except Exception as e: + logging.exception(e) + + +def validate_uid(uid): + try: + return pwd.getpwuid(uid).pw_uid + except KeyError: + return None + + +def validate_gid(gid): + try: + return grp.getgrgid(gid).gr_gid + except KeyError: + return None + + +def map_user_name(user_name): + try: + return pwd.getpwnam(user_name).pw_uid + except KeyError: + return None + + +def map_group_name(group_name): + try: + return grp.getgrnam(group_name).gr_gid + except KeyError: + return None + + +def parse_user_name(src): + uid = map_user_name(src) + if uid is None: + raise argparse.ArgumentTypeError('unknown user name: {}'.format(src)) + return uid + + +def parse_group_name(src): + gid = map_group_name(src) + if gid is None: + raise argparse.ArgumentTypeError('unknown group name: {}'.format(src)) + return gid + + +def parse_uid(src): + try: + uid = int(src) + except ValueError: + return parse_user_name(src) + uid = validate_uid(uid) + if uid is None: + raise argparse.ArgumentTypeError('unknown UID: {}'.format(src)) + return uid + + +def parse_gid(src): + try: + gid = int(src) + except ValueError: + return parse_group_name(src) + gid = validate_gid(gid) + if gid is None: + raise argparse.ArgumentTypeError('unknown GID: {}'.format(src)) + return gid + + +def get_primary_gid(uid): + return pwd.getpwuid(uid).pw_gid + + +def parse_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR', + help='set root directory') + parser.add_argument('--user', '-u', dest='uid', required=True, + metavar='USER', type=parse_uid, + help='set new process\' UID') + parser.add_argument('--group', '-g', dest='gid', + metavar='GROUP', type=parse_gid, + help='set new process\' GID') + args = parser.parse_args(argv) + if args.gid is None: + args.gid = get_primary_gid(args.uid) + return args + + +def dump_process_info(): + ruid, euid, suid = os.getresuid() + logging.info('User IDs:') + logging.info('\tReal: %d', ruid) + logging.info('\tEffective: %d', euid) + logging.info('\tSaved: %d', suid) + rgid, egid, sgid = os.getresgid() + logging.info('Group IDs:') + logging.info('\tReal: %d', rgid) + logging.info('\tEffective: %d', egid) + logging.info('\tSaved: %d', sgid) + + +def check_root(): + if os.getuid() == 0: + return True + logging.error('Must be run as root') + return False + + +def scandir(dir_path): + try: + entry_it = os.scandir(dir_path) + except (PermissionError, FileNotFoundError) as e: + logging.warning('%s', e) + return + with entry_it: + yield from entry_it + + +def enum_dirs(dir_path): + for entry in scandir(dir_path): + if entry.is_dir(follow_symlinks=False): + yield entry.path + + +def is_writable_via_access(dir_path): + return os.access(dir_path, os.W_OK | os.X_OK) + + +def is_writable(dir_path): + return is_writable_via_access(dir_path) + + +def drop_privileges(uid, gid): + if not check_root(): + return + os.setgroups([]) + os.setgid(gid) + os.setuid(uid) + os.umask(0o77) + + +@contextlib.contextmanager +def close_queue(queue): + try: + yield + finally: + queue.put(None) + + +def access_loop(access_queue, scandir_queue): + for dir_list in iter(access_queue.get, None): + denied_dir_list = [] + for dir_path in dir_list: + if is_writable(dir_path): + logging.info('Writable: %s', dir_path) + else: + denied_dir_list.append(dir_path) + if not denied_dir_list: + break + scandir_queue.put(denied_dir_list) + + +def access_main(args, log_queue, access_queue, scandir_queue): + with close_queue(scandir_queue), setup_logging(log_queue): + drop_privileges(args.uid, args.gid) + dump_process_info() + access_loop(access_queue, scandir_queue) + + +def scandir_loop(access_queue, scandir_queue): + for parent_dir_list in iter(scandir_queue.get, None): + child_dir_list = [child_dir + for parent_dir in parent_dir_list + for child_dir in enum_dirs(parent_dir)] + if not child_dir_list: + break + access_queue.put(child_dir_list) + + +def scandir_main(access_queue, scandir_queue): + with close_queue(access_queue): + if not check_root(): + return + dump_process_info() + scandir_loop(access_queue, scandir_queue) + + +def main(argv=None): + log_queue = mp.Queue() + with launch_logging_thread(log_queue), setup_logging(log_queue): + prog_args = parse_args(argv) + access_queue = mp.SimpleQueue() + scandir_queue = mp.SimpleQueue() + access_process_args = prog_args, log_queue, access_queue, scandir_queue + access_process = mp.Process(target=access_main, + args=access_process_args, + name='access') + access_queue.put([prog_args.root_dir]) + access_process.start() + scandir_main(access_queue, scandir_queue) + access_process.join() + + +if __name__ == '__main__': + mp.set_start_method('spawn') + main() |