diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2019-12-23 07:20:36 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2019-12-23 07:20:36 +0300 |
commit | 7b8cc8a9f455eda41b9c7d70f4561a84fcda941e (patch) | |
tree | b9e262e9a1dbb663c3b9f704a9fe4daf54be0ce9 /bin/online_sessions.py | |
parent | Travis: online_sessions.sh: refactoring (diff) | |
download | vk-scripts-7b8cc8a9f455eda41b9c7d70f4561a84fcda941e.tar.gz vk-scripts-7b8cc8a9f455eda41b9c7d70f4561a84fcda941e.zip |
pylint/pep8 fixes
Diffstat (limited to 'bin/online_sessions.py')
-rw-r--r-- | bin/online_sessions.py | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/bin/online_sessions.py b/bin/online_sessions.py index 670a8c3..e0de7c9 100644 --- a/bin/online_sessions.py +++ b/bin/online_sessions.py @@ -17,6 +17,7 @@ from vk.user import UserField from .utils.bar_chart import BarChartBuilder from .utils import io + class GroupBy(Enum): USER = 'user' DATE = 'date' @@ -30,14 +31,14 @@ class GroupBy(Enum): online_streaks = OnlineSessionEnumerator(time_from, time_to) if self is GroupBy.USER: return online_streaks.group_by_user(db_reader) - elif self is GroupBy.DATE: + if self is GroupBy.DATE: return online_streaks.group_by_date(db_reader) - elif self is GroupBy.WEEKDAY: + if self is GroupBy.WEEKDAY: return online_streaks.group_by_weekday(db_reader) - elif self is GroupBy.HOUR: + if self is GroupBy.HOUR: return online_streaks.group_by_hour(db_reader) - else: - raise NotImplementedError('unsupported grouping: ' + str(self)) + raise NotImplementedError('unsupported grouping: ' + str(self)) + _OUTPUT_USER_FIELDS = ( UserField.UID, @@ -46,11 +47,13 @@ _OUTPUT_USER_FIELDS = ( UserField.DOMAIN, ) + class OutputSinkOnlineSessions(metaclass=abc.ABCMeta): @abc.abstractmethod def process_database(self, group_by, db_reader, time_from=None, time_to=None): pass + class OutputConverterCSV: @staticmethod def convert_user(user): @@ -68,6 +71,7 @@ class OutputConverterCSV: def convert_hour(hour): return [str(timedelta(hours=hour))] + class OutputSinkCSV(OutputSinkOnlineSessions): def __init__(self, fd=sys.stdout): self._writer = io.FileWriterCSV(fd) @@ -91,6 +95,7 @@ class OutputSinkCSV(OutputSinkOnlineSessions): row.append(str(duration)) self._writer.write_row(row) + class OutputConverterJSON: _DATE_FIELD = 'date' _WEEKDAY_FIELD = 'weekday' @@ -125,6 +130,7 @@ class OutputConverterJSON: obj[OutputConverterJSON._HOUR_FIELD] = str(timedelta(hours=hour)) return obj + class OutputSinkJSON(OutputSinkOnlineSessions): def __init__(self, fd=sys.stdout): self._writer = io.FileWriterJSON(fd) @@ -142,7 +148,7 @@ class OutputSinkJSON(OutputSinkOnlineSessions): @staticmethod def _key_to_object(group_by, key): - if not group_by in OutputSinkJSON._CONVERT_KEY: + if group_by not in OutputSinkJSON._CONVERT_KEY: raise NotImplementedError('unsupported grouping: ' + str(group_by)) return OutputSinkJSON._CONVERT_KEY[group_by](key) @@ -154,6 +160,7 @@ class OutputSinkJSON(OutputSinkOnlineSessions): entries.append(entry) self._writer.write(entries) + class OutputConverterPlot: @staticmethod def convert_user(user): @@ -171,6 +178,7 @@ class OutputConverterPlot: def convert_hour(hour): return '{}:00'.format(hour) + class OutputSinkPlot(OutputSinkOnlineSessions): def __init__(self, fd=sys.stdout): self._fd = fd @@ -200,11 +208,11 @@ class OutputSinkPlot(OutputSinkOnlineSessions): @staticmethod def _extract_labels(group_by, durations): - return tuple(map(lambda key: OutputSinkPlot._format_key(group_by, key), durations.keys())) + return (OutputSinkPlot._format_key(group_by, key) for key in durations.keys()) @staticmethod def _extract_values(durations): - return tuple(map(OutputSinkPlot._duration_to_seconds, durations.values())) + return (OutputSinkPlot._duration_to_seconds(duration) for duration in durations.values()) def process_database( self, group_by, db_reader, time_from=None, time_to=None): @@ -219,8 +227,8 @@ class OutputSinkPlot(OutputSinkOnlineSessions): fontsize='small', rotation=30) bar_chart.set_value_label_formatter(self._format_duration) - labels = self._extract_labels(group_by, durations) - durations = self._extract_values(durations) + labels = tuple(self._extract_labels(group_by, durations)) + durations = tuple(self._extract_values(durations)) if group_by is GroupBy.HOUR: bar_chart.labels_align_middle = False @@ -237,6 +245,7 @@ class OutputSinkPlot(OutputSinkOnlineSessions): else: bar_chart.save(self._fd) + class OutputFormat(Enum): CSV = 'csv' JSON = 'json' @@ -248,38 +257,42 @@ class OutputFormat(Enum): def create_sink(self, fd=sys.stdout): if self is OutputFormat.CSV: return OutputSinkCSV(fd) - elif self is OutputFormat.JSON: + if self is OutputFormat.JSON: return OutputSinkJSON(fd) - elif self is OutputFormat.PLOT: + if self is OutputFormat.PLOT: return OutputSinkPlot(fd) - else: - raise NotImplementedError('unsupported output format: ' + str(self)) + raise NotImplementedError('unsupported output format: ' + str(self)) def open_file(self, path=None): if self is OutputFormat.PLOT: return io.open_output_binary_file(path) return io.open_output_text_file(path) + def _parse_group_by(s): try: return GroupBy(s) except ValueError: raise argparse.ArgumentTypeError('invalid "group by" value: ' + s) + def _parse_database_format(s): try: return DatabaseFormat(s) except ValueError: raise argparse.ArgumentTypeError('invalid database format: ' + s) + def _parse_output_format(s): try: return OutputFormat(s) except ValueError: raise argparse.ArgumentTypeError('invalid output format: ' + s) + _DATE_RANGE_LIMIT_FORMAT = '%Y-%m-%dT%H:%M:%SZ' + def _parse_date_range_limit(s): try: dt = datetime.strptime(s, _DATE_RANGE_LIMIT_FORMAT) @@ -289,6 +302,7 @@ def _parse_date_range_limit(s): raise argparse.ArgumentTypeError( msg.format(_DATE_RANGE_LIMIT_FORMAT, s)) + def _parse_args(args=None): if args is None: args = sys.argv[1:] @@ -324,6 +338,7 @@ def _parse_args(args=None): return parser.parse_args(args) + def process_online_sessions( db_path=None, db_fmt=DatabaseFormat.CSV, out_path=None, out_fmt=OutputFormat.CSV, @@ -343,8 +358,10 @@ def process_online_sessions( time_from=time_from, time_to=time_to) + def main(args=None): process_online_sessions(**vars(_parse_args(args))) + if __name__ == '__main__': main() |