From 104e49ea4563e7d7ae9a1ffdba808e98bfec3615 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Fri, 17 Mar 2023 23:43:03 +0100 Subject: add extensions to script files --- src/writable-dirs | 274 ------------------------------------------------------ 1 file changed, 274 deletions(-) delete mode 100755 src/writable-dirs (limited to 'src/writable-dirs') diff --git a/src/writable-dirs b/src/writable-dirs deleted file mode 100755 index 19f6094..0000000 --- a/src/writable-dirs +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2018 Egor Tensin -# This file is part of the "audit-scripts" project. -# For details, see https://github.com/egor-tensin/audit-scripts. -# Distributed under the MIT License. - -import argparse -import contextlib -import grp -import logging -import logging.config -import logging.handlers -import multiprocessing as mp -import os -import pwd -import sys - - -def console_log_formatter(): - fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s' - datefmt = '%Y-%m-%d %H:%M:%S' - return logging.Formatter(fmt=fmt, datefmt=datefmt) - - -def console_log_level(): - return logging.DEBUG - - -def console_log_handler(): - handler = logging.StreamHandler() - handler.setFormatter(console_log_formatter()) - handler.setLevel(console_log_level()) - return handler - - -@contextlib.contextmanager -def launch_logging_thread(queue): - process = mp.current_process() - process.name = 'scandir' - listener = logging.handlers.QueueListener(queue, console_log_handler()) - listener.start() - try: - yield - finally: - listener.stop() - - -@contextlib.contextmanager -def setup_logging(queue): - config = { - 'version': 1, - 'handlers': { - 'sink': { - 'class': 'logging.handlers.QueueHandler', - 'queue': queue, - }, - }, - 'root': { - 'handlers': ['sink'], - 'level': 'DEBUG', - }, - } - logging.config.dictConfig(config) - try: - yield - except Exception as e: - logging.exception(e) - - -def validate_uid(uid): - try: - return pwd.getpwuid(uid).pw_uid - except KeyError: - return None - - -def validate_gid(gid): - try: - return grp.getgrgid(gid).gr_gid - except KeyError: - return None - - -def map_user_name(user_name): - try: - return pwd.getpwnam(user_name).pw_uid - except KeyError: - return None - - -def map_group_name(group_name): - try: - return grp.getgrnam(group_name).gr_gid - except KeyError: - return None - - -def parse_user_name(src): - uid = map_user_name(src) - if uid is None: - raise argparse.ArgumentTypeError('unknown user name: {}'.format(src)) - return uid - - -def parse_group_name(src): - gid = map_group_name(src) - if gid is None: - raise argparse.ArgumentTypeError('unknown group name: {}'.format(src)) - return gid - - -def parse_uid(src): - try: - uid = int(src) - except ValueError: - return parse_user_name(src) - uid = validate_uid(uid) - if uid is None: - raise argparse.ArgumentTypeError('unknown UID: {}'.format(src)) - return uid - - -def parse_gid(src): - try: - gid = int(src) - except ValueError: - return parse_group_name(src) - gid = validate_gid(gid) - if gid is None: - raise argparse.ArgumentTypeError('unknown GID: {}'.format(src)) - return gid - - -def get_primary_gid(uid): - return pwd.getpwuid(uid).pw_gid - - -def parse_args(argv=None): - if argv is None: - argv = sys.argv[1:] - parser = argparse.ArgumentParser() - parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR', - help='set root directory') - parser.add_argument('--user', '-u', dest='uid', required=True, - metavar='USER', type=parse_uid, - help='set new process\' UID') - parser.add_argument('--group', '-g', dest='gid', - metavar='GROUP', type=parse_gid, - help='set new process\' GID') - args = parser.parse_args(argv) - if args.gid is None: - args.gid = get_primary_gid(args.uid) - return args - - -def dump_process_info(): - ruid, euid, suid = os.getresuid() - logging.info('User IDs:') - logging.info('\tReal: %d', ruid) - logging.info('\tEffective: %d', euid) - logging.info('\tSaved: %d', suid) - rgid, egid, sgid = os.getresgid() - logging.info('Group IDs:') - logging.info('\tReal: %d', rgid) - logging.info('\tEffective: %d', egid) - logging.info('\tSaved: %d', sgid) - - -def check_root(): - if os.getuid() == 0: - return True - logging.error('Must be run as root') - return False - - -def scandir(dir_path): - try: - entry_it = os.scandir(dir_path) - except (PermissionError, FileNotFoundError) as e: - logging.warning('%s', e) - return - with entry_it: - yield from entry_it - - -def enum_dirs(dir_path): - for entry in scandir(dir_path): - if entry.is_dir(follow_symlinks=False): - yield entry.path - - -def is_writable_via_access(dir_path): - return os.access(dir_path, os.W_OK | os.X_OK) - - -def is_writable(dir_path): - return is_writable_via_access(dir_path) - - -def drop_privileges(uid, gid): - if not check_root(): - return - os.setgroups([]) - os.setgid(gid) - os.setuid(uid) - os.umask(0o77) - - -@contextlib.contextmanager -def close_queue(queue): - try: - yield - finally: - queue.put(None) - - -def access_loop(access_queue, scandir_queue): - for dir_list in iter(access_queue.get, None): - denied_dir_list = [] - for dir_path in dir_list: - if is_writable(dir_path): - logging.info('Writable: %s', dir_path) - else: - denied_dir_list.append(dir_path) - if not denied_dir_list: - break - scandir_queue.put(denied_dir_list) - - -def access_main(args, log_queue, access_queue, scandir_queue): - with close_queue(scandir_queue), setup_logging(log_queue): - drop_privileges(args.uid, args.gid) - dump_process_info() - access_loop(access_queue, scandir_queue) - - -def scandir_loop(access_queue, scandir_queue): - for parent_dir_list in iter(scandir_queue.get, None): - child_dir_list = [child_dir - for parent_dir in parent_dir_list - for child_dir in enum_dirs(parent_dir)] - if not child_dir_list: - break - access_queue.put(child_dir_list) - - -def scandir_main(access_queue, scandir_queue): - with close_queue(access_queue): - if not check_root(): - return - dump_process_info() - scandir_loop(access_queue, scandir_queue) - - -def main(argv=None): - log_queue = mp.Queue() - with launch_logging_thread(log_queue), setup_logging(log_queue): - prog_args = parse_args(argv) - access_queue = mp.SimpleQueue() - scandir_queue = mp.SimpleQueue() - access_process_args = prog_args, log_queue, access_queue, scandir_queue - access_process = mp.Process(target=access_main, - args=access_process_args, - name='access') - access_queue.put([prog_args.root_dir]) - access_process.start() - scandir_main(access_queue, scandir_queue) - access_process.join() - - -if __name__ == '__main__': - mp.set_start_method('spawn') - main() -- cgit v1.2.3