diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2016-06-16 23:32:24 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2016-06-16 23:32:24 +0300 |
commit | eb930123454771b80465505579d723c92b3dd84c (patch) | |
tree | 9aeded9c85edc2ae9c00ae904d252558731d93f7 /vk/user.py | |
parent | make "last seen" timestamps timezone-aware (diff) | |
download | vk-scripts-eb930123454771b80465505579d723c92b3dd84c.tar.gz vk-scripts-eb930123454771b80465505579d723c92b3dd84c.zip |
refactoring & support more user fields
And a bunch of other minor improvements.
Diffstat (limited to '')
-rw-r--r-- | vk/user.py | 208 |
1 files changed, 141 insertions, 67 deletions
@@ -2,11 +2,13 @@ # This file is licensed under the terms of the MIT License. # See LICENSE.txt for details. +from collections import OrderedDict +from collections.abc import Hashable, Mapping, MutableMapping from datetime import datetime, timezone from enum import Enum from numbers import Real, Integral -class Field(Enum): +class UserField(Enum): UID = 'uid' FIRST_NAME = 'first_name' LAST_NAME = 'last_name' @@ -17,115 +19,187 @@ class Field(Enum): def __str__(self): return self.value -class User: - def __init__(self, impl): - self._impl = impl +class LastSeenField(Enum): + TIME = 'time' def __str__(self): - return str(self._impl) + return self.value + +class LastSeen(MutableMapping): + @staticmethod + def from_api_response(source): + instance = LastSeen() + for field in LastSeenField: + if str(field) in source: + instance[field] = source[str(field)] + return instance + + def __init__(self, fields=None): + if fields is None: + fields = OrderedDict() + self._fields = fields + + def __getitem__(self, field): + return self._fields[field] + + def __setitem__(self, field, value): + self._fields[field] = self.parse(field, value) + + def __delitem__(self, field): + del self._fields[field] + + def __iter__(self, field): + return iter(self._fields) + + def __len__(self, field): + return len(self._fields) + + @staticmethod + def parse(field, value): + if field in LastSeen._FIELD_PARSERS: + return LastSeen._FIELD_PARSERS[field](value) + else: + return LastSeen._DEFAULT_FIELD_PARSER(value) + + def _parse_time(x): + if isinstance(x, datetime): + if x.tzinfo is None or x.tzinfo.utcoffset(x) is None: + x = x.replace(tzinfo=timezone.utc) + return x + elif isinstance(x, Real) or isinstance(x, Integral): + return datetime.fromtimestamp(x, tz=timezone.utc) + else: + raise TypeError() + + _FIELD_PARSERS = { + LastSeenField.TIME: _parse_time, + } + + _DEFAULT_FIELD_PARSER = str + + def has_time(self): + return LastSeenField.TIME in self + + def get_time(self): + return self[LastSeenField.TIME] + + def set_time(self, t): + self[LastSeenField.TIME] = t + +class User(Hashable, MutableMapping): + @staticmethod + def from_api_response(source): + instance = User() + for field in UserField: + if str(field) in source: + instance[field] = source[str(field)] + return instance + + def __init__(self, fields=None): + if fields is None: + fields = OrderedDict() + self._fields = fields def __eq__(self, other): - return self.get_uid() == other.get_uid() + return self._fields == other._fields - def __hash__(self): + def __hash__(self, fields=None): return hash(self.get_uid()) + def __getitem__(self, field): + return self._fields[field] + + def __setitem__(self, field, value): + self._fields[field] = self.parse(field, value) + + def __delitem__(self, field): + del self._fields[field] + def __iter__(self): - return iter(self._impl) + return iter(self._fields) - def __contains__(self, field): - if field is Field.LAST_SEEN: - return self._has_last_seen() - return self._normalize_field(field) in self._impl + def __len__(self): + return len(self._fields) - def __getitem__(self, field): - if field is Field.LAST_SEEN: - return self._get_last_seen() - return self._impl[self._normalize_field(field)] + @staticmethod + def parse(field, value): + if field in User._FIELD_PARSERS: + return User._FIELD_PARSERS[field](value) + else: + return User._DEFAULT_FIELD_PARSER(value) - def __setitem__(self, field, value): - if field is Field.LAST_SEEN: - self._set_last_seen(value) + def _parse_last_seen(x): + if isinstance(x, LastSeen): + return x + elif isinstance(x, Mapping): + return LastSeen.from_api_response(x) else: - self._impl[self._normalize_field(field)] = value + raise TypeError() - @staticmethod - def _normalize_field(field): - if isinstance(field, Field): - return field.value - return field + _FIELD_PARSERS = { + UserField.UID: int, + UserField.ONLINE: bool, + UserField.LAST_SEEN: _parse_last_seen, + } + + _DEFAULT_FIELD_PARSER = str def get_uid(self): - return self[Field.UID] + return self[UserField.UID] def get_first_name(self): - return self[Field.FIRST_NAME] + return self[UserField.FIRST_NAME] def set_first_name(self, name): - self[Field.FIRST_NAME] = name + self[UserField.FIRST_NAME] = name def has_last_name(self): - return Field.LAST_NAME in self and self.get_last_name() + return UserField.LAST_NAME in self and self.get_last_name() def get_last_name(self): - return self[Field.LAST_NAME] + return self[UserField.LAST_NAME] def set_last_name(self, name): - self[Field.LAST_NAME] = name + self[UserField.LAST_NAME] = name def has_screen_name(self): - return Field.SCREEN_NAME in self + return UserField.SCREEN_NAME in self def get_screen_name(self): if self.has_screen_name(): - return self[Field.SCREEN_NAME] + return self[UserField.SCREEN_NAME] else: return 'id' + str(self.get_uid()) def set_screen_name(self, name): - self[Field.SCREEN_NAME] = name + self[UserField.SCREEN_NAME] = name - def has_online(self): - return Field.ONLINE in self + def has_online_flag(self): + return UserField.ONLINE in self def is_online(self): - return bool(self[Field.ONLINE]) - - def set_online(self, value=True): - self[Field.ONLINE] = value + return self[UserField.ONLINE] - @staticmethod - def _last_seen_from_timestamp(t): - return datetime.fromtimestamp(t, timezone.utc) - - @staticmethod - def _last_seen_to_timestamp(t): - if isinstance(t, datetime): - return t.timestamp() - elif isinstance(t, Real) or isinstance(t, Integral): - return t - else: - raise TypeError('"last seen" time must be either a `datetime` or a POSIX timestamp') + def is_offline(self): + return not self.is_online() - def _has_last_seen(self): - return Field.LAST_SEEN.value in self._impl and 'time' in self._impl[Field.LAST_SEEN.value] + def set_online_flag(self, value=True): + self[UserField.ONLINE] = value def has_last_seen(self): - return self._has_last_seen() + return UserField.LAST_SEEN in self - def _get_last_seen(self): - return self._last_seen_from_timestamp(self._impl[Field.LAST_SEEN.value]['time']) + def get_last_seen(self): + return self[UserField.LAST_SEEN] - def get_last_seen_utc(self): - return self._get_last_seen() + def set_last_seen(self, last_seen): + self[UserField.LAST_SEEN] = last_seen - def get_last_seen_local(self): - return self._get_last_seen().astimezone() + def get_last_seen_time(self): + return self.has_last_seen() and self.get_last_seen().has_time() - def _set_last_seen(self, t): - if Field.LAST_SEEN.value not in self._impl: - self._impl[Field.LAST_SEEN.value] = {} - self._impl[Field.LAST_SEEN.value]['time'] = self._last_seen_to_timestamp(t) + def get_last_seen_time(self): + return self[UserField.LAST_SEEN].get_time() - def set_last_seen(self, t): - self._set_last_seen(t) + def get_last_seen_time_local(self): + return self[UserField.LAST_SEEN].get_time().astimezone() |