aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/vk/utils/tracking/db/record.py
diff options
context:
space:
mode:
Diffstat (limited to 'vk/utils/tracking/db/record.py')
-rw-r--r--vk/utils/tracking/db/record.py114
1 files changed, 77 insertions, 37 deletions
diff --git a/vk/utils/tracking/db/record.py b/vk/utils/tracking/db/record.py
index 7cb054f..4748a37 100644
--- a/vk/utils/tracking/db/record.py
+++ b/vk/utils/tracking/db/record.py
@@ -3,15 +3,51 @@
# See LICENSE.txt for details.
from collections import OrderedDict
+from collections.abc import MutableMapping
from datetime import datetime, timezone
-from vk.user import Field as UserField
+from vk.user import LastSeen, User, UserField
-def _gen_timestamp():
- return datetime.now(timezone.utc).replace(microsecond=0)
+class Timestamp:
+ @staticmethod
+ def _new():
+ return datetime.utcnow()
+
+ @staticmethod
+ def _is_timezone_aware(dt):
+ return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None
-class Record:
- _USER_FIELDS = (
+ @staticmethod
+ def _lose_timezone(dt):
+ if Timestamp._is_timezone_aware(dt):
+ return dt.astimezone(timezone.utc).replace(tzinfo=None)
+ return dt
+
+ def __init__(self, dt=None):
+ if dt is None:
+ dt = self._new()
+ dt = dt.replace(microsecond=0)
+ dt = self._lose_timezone(dt)
+ self._dt = dt
+
+ @staticmethod
+ def from_string(s):
+ return Timestamp(datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ'))
+
+ def __str__(self):
+ return self._dt.isoformat() + 'Z'
+
+ @staticmethod
+ def from_last_seen(ls):
+ return Timestamp(ls.get_time())
+
+ def to_last_seen(self):
+ ls = LastSeen()
+ ls.set_time(self._dt)
+ return ls
+
+class Record(MutableMapping):
+ FIELDS = (
UserField.UID,
UserField.FIRST_NAME,
UserField.LAST_NAME,
@@ -20,48 +56,52 @@ class Record:
UserField.LAST_SEEN,
)
- def __init__(self, fields, timestamp=None):
+ def __init__(self, timestamp=None, fields=None):
+ if timestamp is None:
+ timestamp = Timestamp()
+ if fields is None:
+ fields = OrderedDict()
+ self._timestamp = timestamp
self._fields = fields
- self._timestamp = timestamp if timestamp is not None else _gen_timestamp()
-
- def __iter__(self):
- return iter(self._fields)
-
- def __contains__(self, field):
- return field in self._fields
def __getitem__(self, field):
+ if field is UserField.LAST_SEEN:
+ return Timestamp.from_last_seen(self._fields[field])
return self._fields[field]
def __setitem__(self, field, value):
- self._fields[field] = value
+ if field is UserField.LAST_SEEN:
+ if isinstance(value, str):
+ value = Timestamp.from_string(value).to_last_seen()
+ elif isinstance(value, Timestamp):
+ value = value.to_last_seen()
+ elif isinstance(value, LastSeen):
+ pass
+ else:
+ raise TypeError()
+ self._fields[field] = User.parse(field, value)
- def get_timestamp(self):
- return self._timestamp
+ def __delitem__(self, field):
+ del self._fields[field]
- @staticmethod
- def _timestamp_from_string(s):
- return datetime.fromtimestamp(s)
+ def __iter__(self):
+ return iter(self._fields)
+
+ def __len__(self):
+ return len(self._fields)
- def timestamp_to_string(self):
- return self.get_timestamp().isoformat()
+ def get_timestamp(self):
+ return self._timestamp
@staticmethod
def from_user(user):
- fields = OrderedDict()
- for field in Record._USER_FIELDS:
- fields[field] = user[field]
- if UserField.LAST_SEEN in Record._USER_FIELDS:
- fields[UserField.LAST_SEEN] = fields[UserField.LAST_SEEN].isoformat()
- return Record(fields)
+ instance = Record()
+ for field in Record.FIELDS:
+ instance[field] = user[field]
+ return instance
- @staticmethod
- def from_row(row):
- timestamp = Record._timestamp_from_string(row[0])
- fields = OrderedDict()
- for i in range(len(Record._USER_FIELDS)):
- fields[Record._USER_FIELDS[i]] = row[i + 1]
- return Record(fields, timestamp)
-
- def to_row(self):
- return [self.timestamp_to_string()] + [self[field] for field in self]
+ def to_user(self):
+ user = User()
+ for field in self:
+ user[field] = self[field]
+ return user