From 21c3caaba44fc13221ba63600262e6eb15cbe824 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Fri, 12 Feb 2016 08:07:19 +0300 Subject: refactoring --- api.py | 131 ++++++++++++++++++++++++++++++++++++++++++------ print_mutual_friends.py | 38 ++++++-------- track_online.py | 77 +++++++++++++++------------- 3 files changed, 171 insertions(+), 75 deletions(-) diff --git a/api.py b/api.py index e7fbb81..fb08ca4 100644 --- a/api.py +++ b/api.py @@ -2,19 +2,118 @@ # This file is licensed under the terms of the MIT License. # See LICENSE.txt for details. -import json, sys, urllib.request - -def call_method(method_name, **kwargs): - get_args = '&'.join(map(lambda k: '{}={}'.format(k, kwargs[k]), kwargs)) - url = 'https://api.vk.com/method/{}?lang=en&{}'.format(method_name, get_args) - response = json.loads(urllib.request.urlopen(url).read().decode()) - if 'response' not in response: - print(response, file=sys.stderr) - sys.exit(1) - return response['response'] - -def users_get(**kwargs): - return call_method('users.get', **kwargs) - -def friends_get(**kwargs): - return call_method('friends.get', **kwargs) +import collections +from datetime import datetime +from enum import Enum +import json +import sys +import urllib.request + +class Language(Enum): + DEFAULT = None + EN = 'en' + + def __str__(self): + return self.value + +class Method(Enum): + USERS_GET = 'users.get' + FRIENDS_GET = 'friends.get' + + def __str__(self): + return self.value + +class User: + def __init__(self, impl): + self._impl = impl + + def get_uid(self): + return self._impl[self.Field.UID.value] + + def get_first_name(self): + return self._impl[self.Field.FIRST_NAME.value] + + def get_last_name(self): + return self._impl[self.Field.LAST_NAME.value] + + def has_last_name(self): + return self.Field.LAST_NAME.value in self._impl and self.get_last_name() + + def has_screen_name(self): + return self.Field.SCREEN_NAME.value in self._impl + + def get_screen_name(self): + if self.has_screen_name(): + return self._impl[self.Field.SCREEN_NAME.value] + else: + return 'id' + str(self.get_uid()) + + def is_online(self): + return self._impl[self.Field.ONLINE.value] + + def get_last_seen(self): + return datetime.fromtimestamp(self._impl[self.Field.LAST_SEEN.value]['time']) + + def __str__(self): + return repr(self._impl) + + def __hash__(self): + return hash(self.get_uid()) + + def __eq__(self, other): + return self.get_uid() == other.get_uid() + + class Field(Enum): + UID = 'uid' + FIRST_NAME = 'first_name' + LAST_NAME = 'last_name' + SCREEN_NAME = 'screen_name' + ONLINE = 'online' + LAST_SEEN = 'last_seen' + + def __str__(self): + return self.value + +class API: + def __init__(self, lang=Language.DEFAULT): + self.lang = lang + + def _lang_is_specified(self): + return self.lang != Language.DEFAULT + + def _call_method(self, method, **kwargs): + get_args = '&'.join(map(lambda k: '{}={}'.format(k, kwargs[k]), kwargs)) + if self._lang_is_specified(): + get_args += '&lang={}'.format(self.lang.value) + url = 'https://api.vk.com/method/{}?{}'.format(method, get_args) + response = json.loads(urllib.request.urlopen(url).read().decode()) + if 'response' not in response: + raise self.Error(response) + return response['response'] + + @staticmethod + def _format_param_values(xs): + if isinstance(xs, str): + return xs + if isinstance(xs, collections.Iterable): + return ','.join(map(str, xs)) + return str(xs) + + def users_get(self, user_ids, fields=()): + return map(User, self._call_method( + Method.USERS_GET, + user_ids=self._format_param_values(user_ids), + fields=self._format_param_values(fields))) + + def friends_get(self, user_id, fields=()): + return map(User, self._call_method( + Method.FRIENDS_GET, + user_id=str(user_id), + fields=self._format_param_values(fields))) + + class Error(RuntimeError): + def __init__(self, impl): + self._impl = impl + + def __str__(self): + return repr(self._impl) diff --git a/print_mutual_friends.py b/print_mutual_friends.py index b3beb39..9de77cb 100644 --- a/print_mutual_friends.py +++ b/print_mutual_friends.py @@ -2,41 +2,33 @@ # This file is licensed under the terms of the MIT License. # See LICENSE.txt for details. -import api, sys +import sys -def users_get(user_ids): - response = api.users_get(user_ids=','.join(user_ids), - fields='screen_name') - if len(response) < len(user_ids): - raise RuntimeError('Couldn\'t find at least one of the users!') - return response +from api import * -def friends_get(user_id): - return api.friends_get(user_id=user_id) +def query_friends(api, user): + return api.friends_get(user.get_uid(), fields=User.Field.SCREEN_NAME) -def extract_screen_name_or_uid(user): - if 'screen_name' in user: - return user['screen_name'] - return 'id' + str(user['uid']) - -def format_friend(user): - return '{} {} ({})'.format( - user['last_name'], user['first_name'], - extract_screen_name_or_uid(user)) +def format_user(user): + if user.has_last_name(): + return '{} {} ({})'.format(user.get_last_name(), user.get_first_name(), user.get_screen_name()) + else: + return '{} ({})'.format(user.get_first_name(), user.get_screen_name()) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser( description='Learn who your ex and her new boyfriend are both friends with.') + parser.add_argument(metavar='UID', dest='user_ids', nargs='+', help='user IDs or "screen names"') args = parser.parse_args() - users = users_get(args.user_ids) - user_ids = map(lambda user: user['uid'], users) - friend_lists = map(frozenset, map(friends_get, user_ids)) + api = API(Language.EN) + users = api.users_get(args.user_ids, fields=User.Field.SCREEN_NAME) + + friend_lists = map(lambda user: frozenset(query_friends(api, user)), users) mutual_friends = frozenset.intersection(*friend_lists) if mutual_friends: - mutual_friends = users_get(list(map(str, mutual_friends))) for friend in mutual_friends: - print(format_friend(friend)) + print(format_user(friend)) diff --git a/track_online.py b/track_online.py index ed43f11..2c6776e 100644 --- a/track_online.py +++ b/track_online.py @@ -3,49 +3,54 @@ # See LICENSE.txt for details. import argparse -from datetime import datetime import logging import time import sys -import api +from api import * -def users_get(user_ids): - response = api.users_get(user_ids=','.join(user_ids), - fields='online,last_seen') - if len(response) < len(user_ids): - raise RuntimeError('Couldn\'t update status of at least one of the users!') - return response +def format_user(user): + if user.has_last_name(): + return '{} {}'.format(user.get_last_name(), user.get_first_name()) + else: + return '{}'.format(user.get_first_name()) + +def format_user_is_online(user): + return '{} is ONLINE'.format(format_user(user)) + +def format_user_is_offline(user): + return '{} is OFFLINE'.format(format_user(user)) + +def format_user_last_seen(user): + return '{} was last seen at {}'.format(format_user(user), user.get_last_seen()) -def format_user_name(user): - return '{} {}'.format(user['last_name'], user['first_name']) +def format_user_went_online(user): + return '{} went ONLINE'.format(format_user(user)) -def user_is_online(user): - logging.info('{} is ONLINE'.format(format_user_name(user))) +def format_user_went_offline(user): + return '{} went OFFLINE'.format(format_user(user)) def user_is_offline(user): - user_name = format_user_name(user) - logging.info('{} is OFFLINE'.format(user_name)) - last_seen = datetime.fromtimestamp(user['last_seen']['time']) - logging.info('{} was last seen at {}'.format(user_name, last_seen)) + logging.info(format_user_is_offline(user)) + logging.info(format_user_last_seen(user)) def user_went_online(user): - logging.info('{} went ONLINE'.format(format_user_name(user))) + logging.info(format_user_went_online(user)) def user_went_offline(user): - logging.info('{} went OFFLINE'.format(format_user_name(user))) + logging.info(format_usre_went_offline(user)) -def print_user(status): - if status['online']: - user_is_online(status) +def print_status(user): + if user.is_online(): + user_is_online(user) else: - user_is_offline(status) + user_is_offline(user) -def print_status_update(status): - if status['online']: - user_went_online(status) +def print_status_update(user): + if user.is_online(): + user_went_online(user) else: - user_went_offline(status) + user_went_offline(user) def parse_timeout(source): timeout = int(source) @@ -56,24 +61,23 @@ def parse_timeout(source): DEFAULT_TIMEOUT=5 -def print_initial_status(user_ids): - users = users_get(user_ids) +def loop_update_status(api, user_ids, timeout=DEFAULT_TIMEOUT): + fields = User.Field.ONLINE, User.Field.LAST_SEEN + users = list(api.users_get(user_ids, fields)) for user in users: - print_user(user) - return users - -def loop_update_status(users, user_ids, timeout=DEFAULT_TIMEOUT): + print_status(user) while True: time.sleep(timeout) - updated_users = users_get(user_ids) + updated_users = list(api.users_get(user_ids, fields)) for i in range(len(updated_users)): - if users[i]['online'] != updated_users[i]['online']: + if users[i].is_online() != updated_users[i].is_online(): users[i] = updated_users[i] print_status_update(updated_users[i]) if __name__ == '__main__': parser = argparse.ArgumentParser( description='Track when people go online/offline.') + parser.add_argument(metavar='UID', dest='user_ids', nargs='+', help='user IDs or "screen names"') parser.add_argument('-t', '--timeout', default=DEFAULT_TIMEOUT, @@ -89,9 +93,10 @@ if __name__ == '__main__': level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') + api = API(Language.EN) + try: - users = print_initial_status(args.user_ids) - loop_update_status(users, args.user_ids, timeout=args.timeout) + loop_update_status(api, args.user_ids, timeout=args.timeout) except KeyboardInterrupt: pass except Exception as e: -- cgit v1.2.3