aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--api.py131
-rw-r--r--print_mutual_friends.py38
-rw-r--r--track_online.py77
3 files changed, 171 insertions, 75 deletions
diff --git a/api.py b/api.py
index e7fbb81..fb08ca4 100644
--- a/api.py
+++ b/api.py
@@ -2,19 +2,118 @@
# This file is licensed under the terms of the MIT License.
# See LICENSE.txt for details.
-import json, sys, urllib.request
-
-def call_method(method_name, **kwargs):
- get_args = '&'.join(map(lambda k: '{}={}'.format(k, kwargs[k]), kwargs))
- url = 'https://api.vk.com/method/{}?lang=en&{}'.format(method_name, get_args)
- response = json.loads(urllib.request.urlopen(url).read().decode())
- if 'response' not in response:
- print(response, file=sys.stderr)
- sys.exit(1)
- return response['response']
-
-def users_get(**kwargs):
- return call_method('users.get', **kwargs)
-
-def friends_get(**kwargs):
- return call_method('friends.get', **kwargs)
+import collections
+from datetime import datetime
+from enum import Enum
+import json
+import sys
+import urllib.request
+
+class Language(Enum):
+ DEFAULT = None
+ EN = 'en'
+
+ def __str__(self):
+ return self.value
+
+class Method(Enum):
+ USERS_GET = 'users.get'
+ FRIENDS_GET = 'friends.get'
+
+ def __str__(self):
+ return self.value
+
+class User:
+ def __init__(self, impl):
+ self._impl = impl
+
+ def get_uid(self):
+ return self._impl[self.Field.UID.value]
+
+ def get_first_name(self):
+ return self._impl[self.Field.FIRST_NAME.value]
+
+ def get_last_name(self):
+ return self._impl[self.Field.LAST_NAME.value]
+
+ def has_last_name(self):
+ return self.Field.LAST_NAME.value in self._impl and self.get_last_name()
+
+ def has_screen_name(self):
+ return self.Field.SCREEN_NAME.value in self._impl
+
+ def get_screen_name(self):
+ if self.has_screen_name():
+ return self._impl[self.Field.SCREEN_NAME.value]
+ else:
+ return 'id' + str(self.get_uid())
+
+ def is_online(self):
+ return self._impl[self.Field.ONLINE.value]
+
+ def get_last_seen(self):
+ return datetime.fromtimestamp(self._impl[self.Field.LAST_SEEN.value]['time'])
+
+ def __str__(self):
+ return repr(self._impl)
+
+ def __hash__(self):
+ return hash(self.get_uid())
+
+ def __eq__(self, other):
+ return self.get_uid() == other.get_uid()
+
+ class Field(Enum):
+ UID = 'uid'
+ FIRST_NAME = 'first_name'
+ LAST_NAME = 'last_name'
+ SCREEN_NAME = 'screen_name'
+ ONLINE = 'online'
+ LAST_SEEN = 'last_seen'
+
+ def __str__(self):
+ return self.value
+
+class API:
+ def __init__(self, lang=Language.DEFAULT):
+ self.lang = lang
+
+ def _lang_is_specified(self):
+ return self.lang != Language.DEFAULT
+
+ def _call_method(self, method, **kwargs):
+ get_args = '&'.join(map(lambda k: '{}={}'.format(k, kwargs[k]), kwargs))
+ if self._lang_is_specified():
+ get_args += '&lang={}'.format(self.lang.value)
+ url = 'https://api.vk.com/method/{}?{}'.format(method, get_args)
+ response = json.loads(urllib.request.urlopen(url).read().decode())
+ if 'response' not in response:
+ raise self.Error(response)
+ return response['response']
+
+ @staticmethod
+ def _format_param_values(xs):
+ if isinstance(xs, str):
+ return xs
+ if isinstance(xs, collections.Iterable):
+ return ','.join(map(str, xs))
+ return str(xs)
+
+ def users_get(self, user_ids, fields=()):
+ return map(User, self._call_method(
+ Method.USERS_GET,
+ user_ids=self._format_param_values(user_ids),
+ fields=self._format_param_values(fields)))
+
+ def friends_get(self, user_id, fields=()):
+ return map(User, self._call_method(
+ Method.FRIENDS_GET,
+ user_id=str(user_id),
+ fields=self._format_param_values(fields)))
+
+ class Error(RuntimeError):
+ def __init__(self, impl):
+ self._impl = impl
+
+ def __str__(self):
+ return repr(self._impl)
diff --git a/print_mutual_friends.py b/print_mutual_friends.py
index b3beb39..9de77cb 100644
--- a/print_mutual_friends.py
+++ b/print_mutual_friends.py
@@ -2,41 +2,33 @@
# This file is licensed under the terms of the MIT License.
# See LICENSE.txt for details.
-import api, sys
+import sys
-def users_get(user_ids):
- response = api.users_get(user_ids=','.join(user_ids),
- fields='screen_name')
- if len(response) < len(user_ids):
- raise RuntimeError('Couldn\'t find at least one of the users!')
- return response
+from api import *
-def friends_get(user_id):
- return api.friends_get(user_id=user_id)
+def query_friends(api, user):
+ return api.friends_get(user.get_uid(), fields=User.Field.SCREEN_NAME)
-def extract_screen_name_or_uid(user):
- if 'screen_name' in user:
- return user['screen_name']
- return 'id' + str(user['uid'])
-
-def format_friend(user):
- return '{} {} ({})'.format(
- user['last_name'], user['first_name'],
- extract_screen_name_or_uid(user))
+def format_user(user):
+ if user.has_last_name():
+ return '{} {} ({})'.format(user.get_last_name(), user.get_first_name(), user.get_screen_name())
+ else:
+ return '{} ({})'.format(user.get_first_name(), user.get_screen_name())
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
description='Learn who your ex and her new boyfriend are both friends with.')
+
parser.add_argument(metavar='UID', dest='user_ids', nargs='+',
help='user IDs or "screen names"')
args = parser.parse_args()
- users = users_get(args.user_ids)
- user_ids = map(lambda user: user['uid'], users)
- friend_lists = map(frozenset, map(friends_get, user_ids))
+ api = API(Language.EN)
+ users = api.users_get(args.user_ids, fields=User.Field.SCREEN_NAME)
+
+ friend_lists = map(lambda user: frozenset(query_friends(api, user)), users)
mutual_friends = frozenset.intersection(*friend_lists)
if mutual_friends:
- mutual_friends = users_get(list(map(str, mutual_friends)))
for friend in mutual_friends:
- print(format_friend(friend))
+ print(format_user(friend))
diff --git a/track_online.py b/track_online.py
index ed43f11..2c6776e 100644
--- a/track_online.py
+++ b/track_online.py
@@ -3,49 +3,54 @@
# See LICENSE.txt for details.
import argparse
-from datetime import datetime
import logging
import time
import sys
-import api
+from api import *
-def users_get(user_ids):
- response = api.users_get(user_ids=','.join(user_ids),
- fields='online,last_seen')
- if len(response) < len(user_ids):
- raise RuntimeError('Couldn\'t update status of at least one of the users!')
- return response
+def format_user(user):
+ if user.has_last_name():
+ return '{} {}'.format(user.get_last_name(), user.get_first_name())
+ else:
+ return '{}'.format(user.get_first_name())
+
+def format_user_is_online(user):
+ return '{} is ONLINE'.format(format_user(user))
+
+def format_user_is_offline(user):
+ return '{} is OFFLINE'.format(format_user(user))
+
+def format_user_last_seen(user):
+ return '{} was last seen at {}'.format(format_user(user), user.get_last_seen())
-def format_user_name(user):
- return '{} {}'.format(user['last_name'], user['first_name'])
+def format_user_went_online(user):
+ return '{} went ONLINE'.format(format_user(user))
-def user_is_online(user):
- logging.info('{} is ONLINE'.format(format_user_name(user)))
+def format_user_went_offline(user):
+ return '{} went OFFLINE'.format(format_user(user))
def user_is_offline(user):
- user_name = format_user_name(user)
- logging.info('{} is OFFLINE'.format(user_name))
- last_seen = datetime.fromtimestamp(user['last_seen']['time'])
- logging.info('{} was last seen at {}'.format(user_name, last_seen))
+ logging.info(format_user_is_offline(user))
+ logging.info(format_user_last_seen(user))
def user_went_online(user):
- logging.info('{} went ONLINE'.format(format_user_name(user)))
+ logging.info(format_user_went_online(user))
def user_went_offline(user):
- logging.info('{} went OFFLINE'.format(format_user_name(user)))
+ logging.info(format_usre_went_offline(user))
-def print_user(status):
- if status['online']:
- user_is_online(status)
+def print_status(user):
+ if user.is_online():
+ user_is_online(user)
else:
- user_is_offline(status)
+ user_is_offline(user)
-def print_status_update(status):
- if status['online']:
- user_went_online(status)
+def print_status_update(user):
+ if user.is_online():
+ user_went_online(user)
else:
- user_went_offline(status)
+ user_went_offline(user)
def parse_timeout(source):
timeout = int(source)
@@ -56,24 +61,23 @@ def parse_timeout(source):
DEFAULT_TIMEOUT=5
-def print_initial_status(user_ids):
- users = users_get(user_ids)
+def loop_update_status(api, user_ids, timeout=DEFAULT_TIMEOUT):
+ fields = User.Field.ONLINE, User.Field.LAST_SEEN
+ users = list(api.users_get(user_ids, fields))
for user in users:
- print_user(user)
- return users
-
-def loop_update_status(users, user_ids, timeout=DEFAULT_TIMEOUT):
+ print_status(user)
while True:
time.sleep(timeout)
- updated_users = users_get(user_ids)
+ updated_users = list(api.users_get(user_ids, fields))
for i in range(len(updated_users)):
- if users[i]['online'] != updated_users[i]['online']:
+ if users[i].is_online() != updated_users[i].is_online():
users[i] = updated_users[i]
print_status_update(updated_users[i])
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Track when people go online/offline.')
+
parser.add_argument(metavar='UID', dest='user_ids', nargs='+',
help='user IDs or "screen names"')
parser.add_argument('-t', '--timeout', default=DEFAULT_TIMEOUT,
@@ -89,9 +93,10 @@ if __name__ == '__main__':
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
+ api = API(Language.EN)
+
try:
- users = print_initial_status(args.user_ids)
- loop_update_status(users, args.user_ids, timeout=args.timeout)
+ loop_update_status(api, args.user_ids, timeout=args.timeout)
except KeyboardInterrupt:
pass
except Exception as e: