aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/vk/tracking/online_streaks.py
diff options
context:
space:
mode:
Diffstat (limited to 'vk/tracking/online_streaks.py')
-rw-r--r--vk/tracking/online_streaks.py140
1 files changed, 140 insertions, 0 deletions
diff --git a/vk/tracking/online_streaks.py b/vk/tracking/online_streaks.py
new file mode 100644
index 0000000..5c9aa48
--- /dev/null
+++ b/vk/tracking/online_streaks.py
@@ -0,0 +1,140 @@
+# Copyright 2016 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is licensed under the terms of the MIT License.
+# See LICENSE.txt for details.
+
+from collections import OrderedDict
+from collections.abc import MutableMapping
+from datetime import timedelta
+from enum import Enum
+
+from vk.user import User
+
+class Weekday(Enum):
+ MONDAY = 0
+ TUESDAY = 1
+ WEDNESDAY = 2
+ THURSDAY = 3
+ FRIDAY = 4
+ SATURDAY = 5
+ SUNDAY = 6
+
+ def __str__(self):
+ return self.name[0] + self.name[1:].lower()
+
+class OnlineStreakEnumerator(MutableMapping):
+ def __init__(self, date_from=None, date_to=None):
+ self._records = {}
+ self._date_from = date_from
+ self._date_to = date_to
+
+ def __getitem__(self, user):
+ return self._records[user]
+
+ def __setitem__(self, user, record):
+ self._records[user] = record
+
+ def __delitem__(self, user):
+ del self._records[user]
+
+ def __iter__(self):
+ return iter(self._records)
+
+ def __len__(self):
+ return len(self._records)
+
+ def _cut_period(self, streak):
+ user, time_from, time_to = streak
+ #print(user.get_first_name(), time_from, self._date_from)
+ if self._date_from is not None:
+ if time_to < self._date_from:
+ #print(1)
+ return None
+ if time_from < self._date_from:
+ #print(2)
+ time_from = self._date_from
+ if self._date_to is not None:
+ if time_from > self._date_to:
+ #print(3)
+ return None
+ if time_to > self._date_to:
+ #print(4)
+ time_to = self._date_to
+ return user, time_from, time_to
+
+ def enum(self, db_reader):
+ for record in db_reader:
+ streak = self._insert_record(record)
+ if streak is not None:
+ streak = self._cut_period(streak)
+ if streak is not None:
+ yield streak
+
+ def group_by_user(self, db_reader):
+ by_user = {}
+ for user, time_from, time_to in self.enum(db_reader):
+ if user not in by_user:
+ by_user[user] = timedelta()
+ by_user[user] += time_to - time_from
+ return by_user
+
+ def group_by_date(self, db_reader):
+ by_date = OrderedDict()
+ for _, time_from, time_to in self.enum(db_reader):
+ for date, duration in self._enum_dates_and_durations(time_from, time_to):
+ if date not in by_date:
+ by_date[date] = timedelta()
+ by_date[date] += duration
+ return by_date
+
+ def group_by_weekday(self, db_reader):
+ by_weekday = OrderedDict()
+ for weekday in Weekday:
+ by_weekday[weekday] = timedelta()
+ for _, time_from, time_to in self.enum(db_reader):
+ for date, duration in self._enum_dates_and_durations(time_from, time_to):
+ by_weekday[Weekday(date.weekday())] += duration
+ return by_weekday
+
+ def group_by_hour(self, db_reader):
+ by_hour = OrderedDict()
+ for i in range(24):
+ by_hour[i] = timedelta()
+ for _, time_from, time_to in self.enum(db_reader):
+ for hour, duration in self._enum_hours_and_durations(time_from, time_to):
+ by_hour[hour] += duration
+ return by_hour
+
+ @staticmethod
+ def _enum_dates_and_durations(time_from, time_to):
+ while time_from.date() != time_to.date():
+ next_day = time_from.date() + timedelta(days=1)
+ yield time_from.date(), next_day - time_from
+ time_from = next_day
+ yield time_to.date(), time_to - time_from
+
+ @staticmethod
+ def _enum_hours_and_durations(time_from, time_to):
+ while time_from.date() != time_to.date() or time_from.hour != time_to.hour:
+ next_hour = time_from.replace(minute=0, second=0) + timedelta(hours=1)
+ yield time_from.hour, next_hour - time_from
+ time_from = next_hour
+ yield time_to.hour, time_to - time_from
+
+ def _insert_record(self, record):
+ return self._insert_user(record.to_user())
+
+ def _known_user(self, user):
+ return user.get_uid() in self._records
+
+ def _unknown_user(self, user):
+ return not self._known_user(user)
+
+ def _insert_user(self, user):
+ if user not in self or self[user].is_offline():
+ self[user] = user
+ return None
+ if user.is_online():
+ return None
+ streak = user, self[user].get_last_seen_time(), user.get_last_seen_time()
+ self[user] = user
+ return streak