# Copyright (c) 2016 Egor Tensin # This file is part of the "VK scripts" project. # For details, see https://github.com/egor-tensin/vk-scripts. # Distributed under the MIT License. from collections import OrderedDict from collections.abc import MutableMapping from datetime import timedelta from enum import Enum class Weekday(Enum): MONDAY = 0 TUESDAY = 1 WEDNESDAY = 2 THURSDAY = 3 FRIDAY = 4 SATURDAY = 5 SUNDAY = 6 def __str__(self): return self.name[0] + self.name[1:].lower() class OnlineSessionEnumerator(MutableMapping): def __init__(self, time_from=None, time_to=None): self._records = {} self._time_from = time_from self._time_to = time_to def __getitem__(self, user): return self._records[user] def __setitem__(self, user, record): self._records[user] = record def __delitem__(self, user): del self._records[user] def __iter__(self): return iter(self._records) def __len__(self): return len(self._records) def _trim_or_drop_session(self, session): user, started_at, ended_at = session if self._time_from is not None: if ended_at < self._time_from: return None if started_at < self._time_from: started_at = self._time_from if self._time_to is not None: if started_at > self._time_to: return None if ended_at > self._time_to: ended_at = self._time_to return user, started_at, ended_at def read_database(self, db_reader): for record in db_reader: session = self._process_database_record(record) if session is not None: session = self._trim_or_drop_session(session) if session is not None: yield session def group_by_user(self, db_reader): by_user = {} for user, started_at, ended_at in self.read_database(db_reader): if user not in by_user: by_user[user] = timedelta() by_user[user] += ended_at - started_at return by_user def group_by_date(self, db_reader): by_date = {} for _, started_at, ended_at in self.read_database(db_reader): for date, duration in self._split_into_days(started_at, ended_at): if date not in by_date: by_date[date] = timedelta() by_date[date] += duration return by_date def group_by_weekday(self, db_reader): by_weekday = OrderedDict() for weekday in Weekday: by_weekday[weekday] = timedelta() for _, started_at, ended_at in self.read_database(db_reader): for date, duration in self._split_into_days(started_at, ended_at): by_weekday[Weekday(date.weekday())] += duration return by_weekday def group_by_hour(self, db_reader): by_hour = OrderedDict() for i in range(24): by_hour[i] = timedelta() for _, started_at, ended_at in self.read_database(db_reader): for hour, duration in self._split_into_hours(started_at, ended_at): by_hour[hour] += duration return by_hour @staticmethod def _split_into_days(a, b): while a.date() != b.date(): next_day = a.date() + timedelta(days=1) yield a.date(), next_day - a a = next_day yield b.date(), b - a @staticmethod def _split_into_hours(a, b): while a.date() != b.date() or a.hour != b.hour: next_hour = a.replace(minute=0, second=0) + timedelta(hours=1) yield a.hour, next_hour - a a = next_hour yield b.hour, b - a def _process_database_record(self, record): return self._close_user_session(record.to_user()) def _known_user(self, user): return user.get_uid() in self._records def _unknown_user(self, user): return not self._known_user(user) def _close_user_session(self, user): if user not in self or self[user].is_offline(): self[user] = user return None if user.is_online(): return None session = user, self[user].get_last_seen_time(), user.get_last_seen_time() self[user] = user return session