From 104e49ea4563e7d7ae9a1ffdba808e98bfec3615 Mon Sep 17 00:00:00 2001
From: Egor Tensin <Egor.Tensin@gmail.com>
Date: Fri, 17 Mar 2023 23:43:03 +0100
Subject: add extensions to script files

---
 src/bad-attrs        | 135 -------------------------
 src/bad-attrs.py     | 135 +++++++++++++++++++++++++
 src/writable-dirs    | 274 ---------------------------------------------------
 src/writable-dirs.py | 274 +++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 409 insertions(+), 409 deletions(-)
 delete mode 100755 src/bad-attrs
 create mode 100755 src/bad-attrs.py
 delete mode 100755 src/writable-dirs
 create mode 100755 src/writable-dirs.py

(limited to 'src')

diff --git a/src/bad-attrs b/src/bad-attrs
deleted file mode 100755
index 4683540..0000000
--- a/src/bad-attrs
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
-# This file is part of the "audit-scripts" project.
-# For details, see https://github.com/egor-tensin/audit-scripts.
-# Distributed under the MIT License.
-
-import argparse
-import array
-from contextlib import contextmanager
-import errno
-import fcntl
-import logging
-import os
-import stat
-import sys
-
-
-@contextmanager
-def setup_logging():
-    logging.basicConfig(
-        format='%(asctime)s | %(levelname)s | %(message)s',
-        datefmt='%Y-%m-%d %H:%M:%S',
-        level=logging.DEBUG)
-    try:
-        yield
-    except Exception as e:
-        logging.exception(e)
-        raise
-
-
-def scandir(dir_path):
-    try:
-        entry_it = os.scandir(dir_path)
-    except (PermissionError, FileNotFoundError) as e:
-        logging.warning('%s', e)
-        return
-    with entry_it:
-        yield from entry_it
-
-
-def traverse_tree(root):
-    queue = [root]
-    while queue:
-        for entry in scandir(queue.pop(0)):
-            if entry.is_dir(follow_symlinks=False):
-                if os.path.ismount(entry.path):
-                    continue
-                queue.append(entry.path)
-            yield entry
-
-
-def skip_leaf(entry):
-    if entry.is_dir(follow_symlinks=False):
-        return False
-    if entry.is_file(follow_symlinks=False):
-        return False
-    return True
-
-
-@contextmanager
-def low_level_open(path, flags):
-    fd = os.open(path, flags)
-    try:
-        yield fd
-    finally:
-        os.close(fd)
-
-
-FS_IOC_GETFLAGS = 0x80086601
-
-FS_IMMUTABLE_FL = 0x00000010
-FS_APPEND_FL    = 0x00000020
-
-BAD_FLAGS = [FS_IMMUTABLE_FL, FS_APPEND_FL]
-
-
-def flags_contain_bad_flags(flags):
-    return any([flags & bad_flag for bad_flag in BAD_FLAGS])
-
-
-def fd_get_flags(fd):
-    a = array.array('L', [0])
-    fcntl.ioctl(fd, FS_IOC_GETFLAGS, a, True)
-    return a[0]
-
-
-def path_get_flags(path):
-    with low_level_open(path, os.O_RDONLY) as fd:
-        return fd_get_flags(fd)
-
-
-def path_has_bad_flags(path):
-    try:
-        flags = path_get_flags(path)
-    except OSError as e:
-        if e.errno == errno.ENOTTY or e.errno == errno.EPERM:
-            # Either one of:
-            #     Inappropriate ioctl for device
-            #     Permission denied
-            # It's relied upon that fcntl throws OSError instead of
-            # PermissionError.
-            logging.warning('%s: %s', path, e)
-            return False
-        raise
-    return flags_contain_bad_flags(flags)
-
-
-def do_dir(root):
-    logging.info('Directory: %s', root)
-    for entry in traverse_tree(root):
-        if skip_leaf(entry):
-            continue
-        #logging.debug('Path: %s', entry.path)
-        if path_has_bad_flags(entry.path):
-            logging.warning('Bad flags: %s', entry.path)
-
-
-def parse_args(argv=None):
-    if argv is None:
-        argv = sys.argv[1:]
-    parser = argparse.ArgumentParser()
-    parser.add_argument('dir', metavar='DIR',
-                        help='set root directory')
-    return parser.parse_args()
-
-
-def main(argv=None):
-    with setup_logging():
-        args = parse_args()
-        do_dir(args.dir)
-
-
-if __name__ == '__main__':
-    main()
diff --git a/src/bad-attrs.py b/src/bad-attrs.py
new file mode 100755
index 0000000..4683540
--- /dev/null
+++ b/src/bad-attrs.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "audit-scripts" project.
+# For details, see https://github.com/egor-tensin/audit-scripts.
+# Distributed under the MIT License.
+
+import argparse
+import array
+from contextlib import contextmanager
+import errno
+import fcntl
+import logging
+import os
+import stat
+import sys
+
+
+@contextmanager
+def setup_logging():
+    logging.basicConfig(
+        format='%(asctime)s | %(levelname)s | %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S',
+        level=logging.DEBUG)
+    try:
+        yield
+    except Exception as e:
+        logging.exception(e)
+        raise
+
+
+def scandir(dir_path):
+    try:
+        entry_it = os.scandir(dir_path)
+    except (PermissionError, FileNotFoundError) as e:
+        logging.warning('%s', e)
+        return
+    with entry_it:
+        yield from entry_it
+
+
+def traverse_tree(root):
+    queue = [root]
+    while queue:
+        for entry in scandir(queue.pop(0)):
+            if entry.is_dir(follow_symlinks=False):
+                if os.path.ismount(entry.path):
+                    continue
+                queue.append(entry.path)
+            yield entry
+
+
+def skip_leaf(entry):
+    if entry.is_dir(follow_symlinks=False):
+        return False
+    if entry.is_file(follow_symlinks=False):
+        return False
+    return True
+
+
+@contextmanager
+def low_level_open(path, flags):
+    fd = os.open(path, flags)
+    try:
+        yield fd
+    finally:
+        os.close(fd)
+
+
+FS_IOC_GETFLAGS = 0x80086601
+
+FS_IMMUTABLE_FL = 0x00000010
+FS_APPEND_FL    = 0x00000020
+
+BAD_FLAGS = [FS_IMMUTABLE_FL, FS_APPEND_FL]
+
+
+def flags_contain_bad_flags(flags):
+    return any([flags & bad_flag for bad_flag in BAD_FLAGS])
+
+
+def fd_get_flags(fd):
+    a = array.array('L', [0])
+    fcntl.ioctl(fd, FS_IOC_GETFLAGS, a, True)
+    return a[0]
+
+
+def path_get_flags(path):
+    with low_level_open(path, os.O_RDONLY) as fd:
+        return fd_get_flags(fd)
+
+
+def path_has_bad_flags(path):
+    try:
+        flags = path_get_flags(path)
+    except OSError as e:
+        if e.errno == errno.ENOTTY or e.errno == errno.EPERM:
+            # Either one of:
+            #     Inappropriate ioctl for device
+            #     Permission denied
+            # It's relied upon that fcntl throws OSError instead of
+            # PermissionError.
+            logging.warning('%s: %s', path, e)
+            return False
+        raise
+    return flags_contain_bad_flags(flags)
+
+
+def do_dir(root):
+    logging.info('Directory: %s', root)
+    for entry in traverse_tree(root):
+        if skip_leaf(entry):
+            continue
+        #logging.debug('Path: %s', entry.path)
+        if path_has_bad_flags(entry.path):
+            logging.warning('Bad flags: %s', entry.path)
+
+
+def parse_args(argv=None):
+    if argv is None:
+        argv = sys.argv[1:]
+    parser = argparse.ArgumentParser()
+    parser.add_argument('dir', metavar='DIR',
+                        help='set root directory')
+    return parser.parse_args()
+
+
+def main(argv=None):
+    with setup_logging():
+        args = parse_args()
+        do_dir(args.dir)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/writable-dirs b/src/writable-dirs
deleted file mode 100755
index 19f6094..0000000
--- a/src/writable-dirs
+++ /dev/null
@@ -1,274 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
-# This file is part of the "audit-scripts" project.
-# For details, see https://github.com/egor-tensin/audit-scripts.
-# Distributed under the MIT License.
-
-import argparse
-import contextlib
-import grp
-import logging
-import logging.config
-import logging.handlers
-import multiprocessing as mp
-import os
-import pwd
-import sys
-
-
-def console_log_formatter():
-    fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s'
-    datefmt = '%Y-%m-%d %H:%M:%S'
-    return logging.Formatter(fmt=fmt, datefmt=datefmt)
-
-
-def console_log_level():
-    return logging.DEBUG
-
-
-def console_log_handler():
-    handler = logging.StreamHandler()
-    handler.setFormatter(console_log_formatter())
-    handler.setLevel(console_log_level())
-    return handler
-
-
-@contextlib.contextmanager
-def launch_logging_thread(queue):
-    process = mp.current_process()
-    process.name = 'scandir'
-    listener = logging.handlers.QueueListener(queue, console_log_handler())
-    listener.start()
-    try:
-        yield
-    finally:
-        listener.stop()
-
-
-@contextlib.contextmanager
-def setup_logging(queue):
-    config = {
-        'version': 1,
-        'handlers': {
-            'sink': {
-                'class': 'logging.handlers.QueueHandler',
-                'queue': queue,
-            },
-        },
-        'root': {
-            'handlers': ['sink'],
-            'level': 'DEBUG',
-        },
-    }
-    logging.config.dictConfig(config)
-    try:
-        yield
-    except Exception as e:
-        logging.exception(e)
-
-
-def validate_uid(uid):
-    try:
-        return pwd.getpwuid(uid).pw_uid
-    except KeyError:
-        return None
-
-
-def validate_gid(gid):
-    try:
-        return grp.getgrgid(gid).gr_gid
-    except KeyError:
-        return None
-
-
-def map_user_name(user_name):
-    try:
-        return pwd.getpwnam(user_name).pw_uid
-    except KeyError:
-        return None
-
-
-def map_group_name(group_name):
-    try:
-        return grp.getgrnam(group_name).gr_gid
-    except KeyError:
-        return None
-
-
-def parse_user_name(src):
-    uid = map_user_name(src)
-    if uid is None:
-        raise argparse.ArgumentTypeError('unknown user name: {}'.format(src))
-    return uid
-
-
-def parse_group_name(src):
-    gid = map_group_name(src)
-    if gid is None:
-        raise argparse.ArgumentTypeError('unknown group name: {}'.format(src))
-    return gid
-
-
-def parse_uid(src):
-    try:
-        uid = int(src)
-    except ValueError:
-        return parse_user_name(src)
-    uid = validate_uid(uid)
-    if uid is None:
-        raise argparse.ArgumentTypeError('unknown UID: {}'.format(src))
-    return uid
-
-
-def parse_gid(src):
-    try:
-        gid = int(src)
-    except ValueError:
-        return parse_group_name(src)
-    gid = validate_gid(gid)
-    if gid is None:
-        raise argparse.ArgumentTypeError('unknown GID: {}'.format(src))
-    return gid
-
-
-def get_primary_gid(uid):
-    return pwd.getpwuid(uid).pw_gid
-
-
-def parse_args(argv=None):
-    if argv is None:
-        argv = sys.argv[1:]
-    parser = argparse.ArgumentParser()
-    parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR',
-                        help='set root directory')
-    parser.add_argument('--user', '-u', dest='uid', required=True,
-                        metavar='USER', type=parse_uid,
-                        help='set new process\' UID')
-    parser.add_argument('--group', '-g', dest='gid',
-                        metavar='GROUP', type=parse_gid,
-                        help='set new process\' GID')
-    args = parser.parse_args(argv)
-    if args.gid is None:
-        args.gid = get_primary_gid(args.uid)
-    return args
-
-
-def dump_process_info():
-    ruid, euid, suid = os.getresuid()
-    logging.info('User IDs:')
-    logging.info('\tReal: %d', ruid)
-    logging.info('\tEffective: %d', euid)
-    logging.info('\tSaved: %d', suid)
-    rgid, egid, sgid = os.getresgid()
-    logging.info('Group IDs:')
-    logging.info('\tReal: %d', rgid)
-    logging.info('\tEffective: %d', egid)
-    logging.info('\tSaved: %d', sgid)
-
-
-def check_root():
-    if os.getuid() == 0:
-        return True
-    logging.error('Must be run as root')
-    return False
-
-
-def scandir(dir_path):
-    try:
-        entry_it = os.scandir(dir_path)
-    except (PermissionError, FileNotFoundError) as e:
-        logging.warning('%s', e)
-        return
-    with entry_it:
-        yield from entry_it
-
-
-def enum_dirs(dir_path):
-    for entry in scandir(dir_path):
-        if entry.is_dir(follow_symlinks=False):
-            yield entry.path
-
-
-def is_writable_via_access(dir_path):
-    return os.access(dir_path, os.W_OK | os.X_OK)
-
-
-def is_writable(dir_path):
-    return is_writable_via_access(dir_path)
-
-
-def drop_privileges(uid, gid):
-    if not check_root():
-        return
-    os.setgroups([])
-    os.setgid(gid)
-    os.setuid(uid)
-    os.umask(0o77)
-
-
-@contextlib.contextmanager
-def close_queue(queue):
-    try:
-        yield
-    finally:
-        queue.put(None)
-
-
-def access_loop(access_queue, scandir_queue):
-    for dir_list in iter(access_queue.get, None):
-        denied_dir_list = []
-        for dir_path in dir_list:
-            if is_writable(dir_path):
-                logging.info('Writable: %s', dir_path)
-            else:
-                denied_dir_list.append(dir_path)
-        if not denied_dir_list:
-            break
-        scandir_queue.put(denied_dir_list)
-
-
-def access_main(args, log_queue, access_queue, scandir_queue):
-    with close_queue(scandir_queue), setup_logging(log_queue):
-        drop_privileges(args.uid, args.gid)
-        dump_process_info()
-        access_loop(access_queue, scandir_queue)
-
-
-def scandir_loop(access_queue, scandir_queue):
-    for parent_dir_list in iter(scandir_queue.get, None):
-        child_dir_list = [child_dir
-                          for parent_dir in parent_dir_list
-                          for child_dir in enum_dirs(parent_dir)]
-        if not child_dir_list:
-            break
-        access_queue.put(child_dir_list)
-
-
-def scandir_main(access_queue, scandir_queue):
-    with close_queue(access_queue):
-        if not check_root():
-            return
-        dump_process_info()
-        scandir_loop(access_queue, scandir_queue)
-
-
-def main(argv=None):
-    log_queue = mp.Queue()
-    with launch_logging_thread(log_queue), setup_logging(log_queue):
-        prog_args = parse_args(argv)
-        access_queue = mp.SimpleQueue()
-        scandir_queue = mp.SimpleQueue()
-        access_process_args = prog_args, log_queue, access_queue, scandir_queue
-        access_process = mp.Process(target=access_main,
-                                    args=access_process_args,
-                                    name='access')
-        access_queue.put([prog_args.root_dir])
-        access_process.start()
-        scandir_main(access_queue, scandir_queue)
-        access_process.join()
-
-
-if __name__ == '__main__':
-    mp.set_start_method('spawn')
-    main()
diff --git a/src/writable-dirs.py b/src/writable-dirs.py
new file mode 100755
index 0000000..19f6094
--- /dev/null
+++ b/src/writable-dirs.py
@@ -0,0 +1,274 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "audit-scripts" project.
+# For details, see https://github.com/egor-tensin/audit-scripts.
+# Distributed under the MIT License.
+
+import argparse
+import contextlib
+import grp
+import logging
+import logging.config
+import logging.handlers
+import multiprocessing as mp
+import os
+import pwd
+import sys
+
+
+def console_log_formatter():
+    fmt = '%(asctime)s | %(processName)s | %(levelname)s | %(message)s'
+    datefmt = '%Y-%m-%d %H:%M:%S'
+    return logging.Formatter(fmt=fmt, datefmt=datefmt)
+
+
+def console_log_level():
+    return logging.DEBUG
+
+
+def console_log_handler():
+    handler = logging.StreamHandler()
+    handler.setFormatter(console_log_formatter())
+    handler.setLevel(console_log_level())
+    return handler
+
+
+@contextlib.contextmanager
+def launch_logging_thread(queue):
+    process = mp.current_process()
+    process.name = 'scandir'
+    listener = logging.handlers.QueueListener(queue, console_log_handler())
+    listener.start()
+    try:
+        yield
+    finally:
+        listener.stop()
+
+
+@contextlib.contextmanager
+def setup_logging(queue):
+    config = {
+        'version': 1,
+        'handlers': {
+            'sink': {
+                'class': 'logging.handlers.QueueHandler',
+                'queue': queue,
+            },
+        },
+        'root': {
+            'handlers': ['sink'],
+            'level': 'DEBUG',
+        },
+    }
+    logging.config.dictConfig(config)
+    try:
+        yield
+    except Exception as e:
+        logging.exception(e)
+
+
+def validate_uid(uid):
+    try:
+        return pwd.getpwuid(uid).pw_uid
+    except KeyError:
+        return None
+
+
+def validate_gid(gid):
+    try:
+        return grp.getgrgid(gid).gr_gid
+    except KeyError:
+        return None
+
+
+def map_user_name(user_name):
+    try:
+        return pwd.getpwnam(user_name).pw_uid
+    except KeyError:
+        return None
+
+
+def map_group_name(group_name):
+    try:
+        return grp.getgrnam(group_name).gr_gid
+    except KeyError:
+        return None
+
+
+def parse_user_name(src):
+    uid = map_user_name(src)
+    if uid is None:
+        raise argparse.ArgumentTypeError('unknown user name: {}'.format(src))
+    return uid
+
+
+def parse_group_name(src):
+    gid = map_group_name(src)
+    if gid is None:
+        raise argparse.ArgumentTypeError('unknown group name: {}'.format(src))
+    return gid
+
+
+def parse_uid(src):
+    try:
+        uid = int(src)
+    except ValueError:
+        return parse_user_name(src)
+    uid = validate_uid(uid)
+    if uid is None:
+        raise argparse.ArgumentTypeError('unknown UID: {}'.format(src))
+    return uid
+
+
+def parse_gid(src):
+    try:
+        gid = int(src)
+    except ValueError:
+        return parse_group_name(src)
+    gid = validate_gid(gid)
+    if gid is None:
+        raise argparse.ArgumentTypeError('unknown GID: {}'.format(src))
+    return gid
+
+
+def get_primary_gid(uid):
+    return pwd.getpwuid(uid).pw_gid
+
+
+def parse_args(argv=None):
+    if argv is None:
+        argv = sys.argv[1:]
+    parser = argparse.ArgumentParser()
+    parser.add_argument('root_dir', default='/', nargs='?', metavar='DIR',
+                        help='set root directory')
+    parser.add_argument('--user', '-u', dest='uid', required=True,
+                        metavar='USER', type=parse_uid,
+                        help='set new process\' UID')
+    parser.add_argument('--group', '-g', dest='gid',
+                        metavar='GROUP', type=parse_gid,
+                        help='set new process\' GID')
+    args = parser.parse_args(argv)
+    if args.gid is None:
+        args.gid = get_primary_gid(args.uid)
+    return args
+
+
+def dump_process_info():
+    ruid, euid, suid = os.getresuid()
+    logging.info('User IDs:')
+    logging.info('\tReal: %d', ruid)
+    logging.info('\tEffective: %d', euid)
+    logging.info('\tSaved: %d', suid)
+    rgid, egid, sgid = os.getresgid()
+    logging.info('Group IDs:')
+    logging.info('\tReal: %d', rgid)
+    logging.info('\tEffective: %d', egid)
+    logging.info('\tSaved: %d', sgid)
+
+
+def check_root():
+    if os.getuid() == 0:
+        return True
+    logging.error('Must be run as root')
+    return False
+
+
+def scandir(dir_path):
+    try:
+        entry_it = os.scandir(dir_path)
+    except (PermissionError, FileNotFoundError) as e:
+        logging.warning('%s', e)
+        return
+    with entry_it:
+        yield from entry_it
+
+
+def enum_dirs(dir_path):
+    for entry in scandir(dir_path):
+        if entry.is_dir(follow_symlinks=False):
+            yield entry.path
+
+
+def is_writable_via_access(dir_path):
+    return os.access(dir_path, os.W_OK | os.X_OK)
+
+
+def is_writable(dir_path):
+    return is_writable_via_access(dir_path)
+
+
+def drop_privileges(uid, gid):
+    if not check_root():
+        return
+    os.setgroups([])
+    os.setgid(gid)
+    os.setuid(uid)
+    os.umask(0o77)
+
+
+@contextlib.contextmanager
+def close_queue(queue):
+    try:
+        yield
+    finally:
+        queue.put(None)
+
+
+def access_loop(access_queue, scandir_queue):
+    for dir_list in iter(access_queue.get, None):
+        denied_dir_list = []
+        for dir_path in dir_list:
+            if is_writable(dir_path):
+                logging.info('Writable: %s', dir_path)
+            else:
+                denied_dir_list.append(dir_path)
+        if not denied_dir_list:
+            break
+        scandir_queue.put(denied_dir_list)
+
+
+def access_main(args, log_queue, access_queue, scandir_queue):
+    with close_queue(scandir_queue), setup_logging(log_queue):
+        drop_privileges(args.uid, args.gid)
+        dump_process_info()
+        access_loop(access_queue, scandir_queue)
+
+
+def scandir_loop(access_queue, scandir_queue):
+    for parent_dir_list in iter(scandir_queue.get, None):
+        child_dir_list = [child_dir
+                          for parent_dir in parent_dir_list
+                          for child_dir in enum_dirs(parent_dir)]
+        if not child_dir_list:
+            break
+        access_queue.put(child_dir_list)
+
+
+def scandir_main(access_queue, scandir_queue):
+    with close_queue(access_queue):
+        if not check_root():
+            return
+        dump_process_info()
+        scandir_loop(access_queue, scandir_queue)
+
+
+def main(argv=None):
+    log_queue = mp.Queue()
+    with launch_logging_thread(log_queue), setup_logging(log_queue):
+        prog_args = parse_args(argv)
+        access_queue = mp.SimpleQueue()
+        scandir_queue = mp.SimpleQueue()
+        access_process_args = prog_args, log_queue, access_queue, scandir_queue
+        access_process = mp.Process(target=access_main,
+                                    args=access_process_args,
+                                    name='access')
+        access_queue.put([prog_args.root_dir])
+        access_process.start()
+        scandir_main(access_queue, scandir_queue)
+        access_process.join()
+
+
+if __name__ == '__main__':
+    mp.set_start_method('spawn')
+    main()
-- 
cgit v1.2.3