diff options
Diffstat (limited to 'vk/tracking/online_streaks.py')
-rw-r--r-- | vk/tracking/online_streaks.py | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/vk/tracking/online_streaks.py b/vk/tracking/online_streaks.py new file mode 100644 index 0000000..5c9aa48 --- /dev/null +++ b/vk/tracking/online_streaks.py @@ -0,0 +1,140 @@ +# Copyright 2016 Egor Tensin <Egor.Tensin@gmail.com> +# This file is licensed under the terms of the MIT License. +# See LICENSE.txt for details. + +from collections import OrderedDict +from collections.abc import MutableMapping +from datetime import timedelta +from enum import Enum + +from vk.user import User + +class Weekday(Enum): + MONDAY = 0 + TUESDAY = 1 + WEDNESDAY = 2 + THURSDAY = 3 + FRIDAY = 4 + SATURDAY = 5 + SUNDAY = 6 + + def __str__(self): + return self.name[0] + self.name[1:].lower() + +class OnlineStreakEnumerator(MutableMapping): + def __init__(self, date_from=None, date_to=None): + self._records = {} + self._date_from = date_from + self._date_to = date_to + + def __getitem__(self, user): + return self._records[user] + + def __setitem__(self, user, record): + self._records[user] = record + + def __delitem__(self, user): + del self._records[user] + + def __iter__(self): + return iter(self._records) + + def __len__(self): + return len(self._records) + + def _cut_period(self, streak): + user, time_from, time_to = streak + #print(user.get_first_name(), time_from, self._date_from) + if self._date_from is not None: + if time_to < self._date_from: + #print(1) + return None + if time_from < self._date_from: + #print(2) + time_from = self._date_from + if self._date_to is not None: + if time_from > self._date_to: + #print(3) + return None + if time_to > self._date_to: + #print(4) + time_to = self._date_to + return user, time_from, time_to + + def enum(self, db_reader): + for record in db_reader: + streak = self._insert_record(record) + if streak is not None: + streak = self._cut_period(streak) + if streak is not None: + yield streak + + def group_by_user(self, db_reader): + by_user = {} + for user, time_from, time_to in self.enum(db_reader): + if user not in by_user: + by_user[user] = timedelta() + by_user[user] += time_to - time_from + return by_user + + def group_by_date(self, db_reader): + by_date = OrderedDict() + for _, time_from, time_to in self.enum(db_reader): + for date, duration in self._enum_dates_and_durations(time_from, time_to): + if date not in by_date: + by_date[date] = timedelta() + by_date[date] += duration + return by_date + + def group_by_weekday(self, db_reader): + by_weekday = OrderedDict() + for weekday in Weekday: + by_weekday[weekday] = timedelta() + for _, time_from, time_to in self.enum(db_reader): + for date, duration in self._enum_dates_and_durations(time_from, time_to): + by_weekday[Weekday(date.weekday())] += duration + return by_weekday + + def group_by_hour(self, db_reader): + by_hour = OrderedDict() + for i in range(24): + by_hour[i] = timedelta() + for _, time_from, time_to in self.enum(db_reader): + for hour, duration in self._enum_hours_and_durations(time_from, time_to): + by_hour[hour] += duration + return by_hour + + @staticmethod + def _enum_dates_and_durations(time_from, time_to): + while time_from.date() != time_to.date(): + next_day = time_from.date() + timedelta(days=1) + yield time_from.date(), next_day - time_from + time_from = next_day + yield time_to.date(), time_to - time_from + + @staticmethod + def _enum_hours_and_durations(time_from, time_to): + while time_from.date() != time_to.date() or time_from.hour != time_to.hour: + next_hour = time_from.replace(minute=0, second=0) + timedelta(hours=1) + yield time_from.hour, next_hour - time_from + time_from = next_hour + yield time_to.hour, time_to - time_from + + def _insert_record(self, record): + return self._insert_user(record.to_user()) + + def _known_user(self, user): + return user.get_uid() in self._records + + def _unknown_user(self, user): + return not self._known_user(user) + + def _insert_user(self, user): + if user not in self or self[user].is_offline(): + self[user] = user + return None + if user.is_online(): + return None + streak = user, self[user].get_last_seen_time(), user.get_last_seen_time() + self[user] = user + return streak |