diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2016-06-16 23:32:24 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2016-06-16 23:32:24 +0300 |
commit | eb930123454771b80465505579d723c92b3dd84c (patch) | |
tree | 9aeded9c85edc2ae9c00ae904d252558731d93f7 /vk/utils/tracking/db/record.py | |
parent | make "last seen" timestamps timezone-aware (diff) | |
download | vk-scripts-eb930123454771b80465505579d723c92b3dd84c.tar.gz vk-scripts-eb930123454771b80465505579d723c92b3dd84c.zip |
refactoring & support more user fields
And a bunch of other minor improvements.
Diffstat (limited to 'vk/utils/tracking/db/record.py')
-rw-r--r-- | vk/utils/tracking/db/record.py | 114 |
1 files changed, 77 insertions, 37 deletions
diff --git a/vk/utils/tracking/db/record.py b/vk/utils/tracking/db/record.py index 7cb054f..4748a37 100644 --- a/vk/utils/tracking/db/record.py +++ b/vk/utils/tracking/db/record.py @@ -3,15 +3,51 @@ # See LICENSE.txt for details. from collections import OrderedDict +from collections.abc import MutableMapping from datetime import datetime, timezone -from vk.user import Field as UserField +from vk.user import LastSeen, User, UserField -def _gen_timestamp(): - return datetime.now(timezone.utc).replace(microsecond=0) +class Timestamp: + @staticmethod + def _new(): + return datetime.utcnow() + + @staticmethod + def _is_timezone_aware(dt): + return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None -class Record: - _USER_FIELDS = ( + @staticmethod + def _lose_timezone(dt): + if Timestamp._is_timezone_aware(dt): + return dt.astimezone(timezone.utc).replace(tzinfo=None) + return dt + + def __init__(self, dt=None): + if dt is None: + dt = self._new() + dt = dt.replace(microsecond=0) + dt = self._lose_timezone(dt) + self._dt = dt + + @staticmethod + def from_string(s): + return Timestamp(datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ')) + + def __str__(self): + return self._dt.isoformat() + 'Z' + + @staticmethod + def from_last_seen(ls): + return Timestamp(ls.get_time()) + + def to_last_seen(self): + ls = LastSeen() + ls.set_time(self._dt) + return ls + +class Record(MutableMapping): + FIELDS = ( UserField.UID, UserField.FIRST_NAME, UserField.LAST_NAME, @@ -20,48 +56,52 @@ class Record: UserField.LAST_SEEN, ) - def __init__(self, fields, timestamp=None): + def __init__(self, timestamp=None, fields=None): + if timestamp is None: + timestamp = Timestamp() + if fields is None: + fields = OrderedDict() + self._timestamp = timestamp self._fields = fields - self._timestamp = timestamp if timestamp is not None else _gen_timestamp() - - def __iter__(self): - return iter(self._fields) - - def __contains__(self, field): - return field in self._fields def __getitem__(self, field): + if field is UserField.LAST_SEEN: + return Timestamp.from_last_seen(self._fields[field]) return self._fields[field] def __setitem__(self, field, value): - self._fields[field] = value + if field is UserField.LAST_SEEN: + if isinstance(value, str): + value = Timestamp.from_string(value).to_last_seen() + elif isinstance(value, Timestamp): + value = value.to_last_seen() + elif isinstance(value, LastSeen): + pass + else: + raise TypeError() + self._fields[field] = User.parse(field, value) - def get_timestamp(self): - return self._timestamp + def __delitem__(self, field): + del self._fields[field] - @staticmethod - def _timestamp_from_string(s): - return datetime.fromtimestamp(s) + def __iter__(self): + return iter(self._fields) + + def __len__(self): + return len(self._fields) - def timestamp_to_string(self): - return self.get_timestamp().isoformat() + def get_timestamp(self): + return self._timestamp @staticmethod def from_user(user): - fields = OrderedDict() - for field in Record._USER_FIELDS: - fields[field] = user[field] - if UserField.LAST_SEEN in Record._USER_FIELDS: - fields[UserField.LAST_SEEN] = fields[UserField.LAST_SEEN].isoformat() - return Record(fields) + instance = Record() + for field in Record.FIELDS: + instance[field] = user[field] + return instance - @staticmethod - def from_row(row): - timestamp = Record._timestamp_from_string(row[0]) - fields = OrderedDict() - for i in range(len(Record._USER_FIELDS)): - fields[Record._USER_FIELDS[i]] = row[i + 1] - return Record(fields, timestamp) - - def to_row(self): - return [self.timestamp_to_string()] + [self[field] for field in self] + def to_user(self): + user = User() + for field in self: + user[field] = self[field] + return user |