aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/api.py
blob: 40df08a24c7e9457680d02597ca50bad4c9662ce (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Copyright 2015 Egor Tensin <Egor.Tensin@gmail.com>
# This file is licensed under the terms of the MIT License.
# See LICENSE.txt for details.

import collections
from datetime import datetime
from enum import Enum
import json
import sys
from urllib.error import URLError
import urllib.request

class Language(Enum):
    """Language selector for API requests.

    ``DEFAULT`` has the value ``None`` and stands for "no language
    requested"; every other member's value is the language code that is
    sent to the server.
    """

    DEFAULT = None
    EN = 'en'

    def __str__(self):
        # __str__ must always return a str.  DEFAULT's value is None, which
        # would make str(Language.DEFAULT) raise TypeError, so it is rendered
        # as an empty string instead.
        if self.value is None:
            return ''
        return self.value

class Method(Enum):
    """Names of the remote API methods this module knows how to call.

    Each member's value is the method name exactly as it appears in the
    request URL.
    """

    USERS_GET = 'users.get'
    FRIENDS_GET = 'friends.get'

    def __str__(self):
        # Render as the bare method name so a member can be dropped straight
        # into a format string.
        method_name = self.value
        return method_name

class User:
    """Read-only wrapper around a single user record.

    The record is a mapping keyed by the strings listed in
    :class:`User.Field`.  Two ``User`` objects are equal, and hash alike,
    when their UIDs are equal; the remaining fields are ignored.
    """

    def __init__(self, impl):
        # impl: the raw mapping describing the user.  It is stored as is
        # (not copied), so later changes to it are visible through this
        # object.
        self._impl = impl

    def get_uid(self):
        """Return the user's ID.  Raises KeyError if the record has none."""
        return self._impl[self.Field.UID.value]

    def get_first_name(self):
        """Return the first name.  Raises KeyError if it is missing."""
        return self._impl[self.Field.FIRST_NAME.value]

    def get_last_name(self):
        """Return the last name.  Raises KeyError if it is missing."""
        return self._impl[self.Field.LAST_NAME.value]

    def has_last_name(self):
        """Return True if the record contains a non-empty last name."""
        # bool() so that callers get a real True/False instead of the last
        # name itself (or an empty string).
        return bool(
            self.Field.LAST_NAME.value in self._impl
            and self.get_last_name())

    def has_screen_name(self):
        """Return True if the record contains a screen name."""
        return self.Field.SCREEN_NAME.value in self._impl

    def get_screen_name(self):
        """Return the screen name, or ``'id<uid>'`` if the record has none."""
        if self.has_screen_name():
            return self._impl[self.Field.SCREEN_NAME.value]
        return 'id' + str(self.get_uid())

    def is_online(self):
        """Return the record's ``online`` field unchanged.

        NOTE(review): presumably a 0/1 flag -- confirm against the API.
        Raises KeyError if the field was not requested.
        """
        return self._impl[self.Field.ONLINE.value]

    def get_last_seen(self):
        """Return the last-seen time as a naive local-time ``datetime``.

        Reads the ``time`` entry of the ``last_seen`` field and converts it
        with ``datetime.fromtimestamp``.  Raises KeyError if either is
        missing.
        """
        last_seen = self._impl[self.Field.LAST_SEEN.value]
        return datetime.fromtimestamp(last_seen['time'])

    def __str__(self):
        return repr(self._impl)

    def __hash__(self):
        # Must stay consistent with __eq__: identity is the UID alone.
        return hash(self.get_uid())

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing with AttributeError on other.get_uid().
        if not isinstance(other, User):
            return NotImplemented
        return self.get_uid() == other.get_uid()

    class Field(Enum):
        """Keys of the user record; also usable as requested field names."""

        UID = 'uid'
        FIRST_NAME = 'first_name'
        LAST_NAME = 'last_name'
        SCREEN_NAME = 'screen_name'
        ONLINE = 'online'
        LAST_SEEN = 'last_seen'

        def __str__(self):
            return self.value


class Error(RuntimeError):
    """Base class for the exceptions defined in this module."""

class InvalidResponseError(Error):
    """Error carrying a server reply that could not be used.

    The reply is kept in the ``response`` attribute, and its ``str()`` form
    is used as the error message.
    """

    def __init__(self, response):
        self.response = response

    def __str__(self):
        message = str(self.response)
        return message

class ConnectionError(Error):
    """Error signalling that a request could not be completed.

    Note: within this module the name shadows the built-in
    ``ConnectionError``.
    """

class API:
    """Small client for the HTTP API at ``https://api.vk.com/method/``.

    lang: a ``Language`` member.  Unless it is ``Language.DEFAULT``, a
    ``lang=<code>`` parameter is appended to every request.
    """

    def __init__(self, lang=Language.DEFAULT):
        self.lang = lang

    def _lang_is_specified(self):
        """Return True if a ``lang`` parameter has to be sent."""
        return self.lang is not Language.DEFAULT

    def _format_method_params(self, **kwargs):
        """Return the URL query string for the given method parameters.

        Parameters appear in the order they were passed, followed by
        ``lang`` when a language is specified.  The result is an empty
        string when there is nothing to send.
        """
        # Local import: keeps this block self-contained.
        from urllib.parse import urlencode

        pairs = [(name, str(value)) for name, value in kwargs.items()]
        if self._lang_is_specified():
            pairs.append(('lang', str(self.lang)))
        # Percent-encode names and values so that characters such as '&',
        # '=' or spaces cannot corrupt the query string.  ',' is kept
        # literal because it separates list items (see
        # _format_param_values), which leaves ordinary id/field lists
        # unchanged.
        return urlencode(pairs, safe=',')

    def _build_method_url(self, method, **kwargs):
        """Return the full request URL for ``method`` with its parameters."""
        return 'https://api.vk.com/method/{}?{}'.format(
            method, self._format_method_params(**kwargs))

    def _call_method(self, method, **kwargs):
        """Call ``method`` and return the ``response`` member of the reply.

        Raises:
            ConnectionError: the request failed with ``URLError``.
            InvalidResponseError: the decoded reply has no ``response``
                member; the reply is attached to the exception.
        """
        url = self._build_method_url(method, **kwargs)
        # Only the network round-trip and decoding sit inside the try block,
        # so the URLError handler cannot hide unrelated failures.
        try:
            with urllib.request.urlopen(url) as reply:
                response = json.loads(reply.read().decode())
        except URLError as e:
            # Keep the original error as the explicit cause and message.
            raise ConnectionError(str(e)) from e
        if 'response' not in response:
            raise InvalidResponseError(response)
        return response['response']

    @staticmethod
    def _format_param_values(xs):
        """Render a parameter value as the string sent to the server.

        A string is returned unchanged, any other iterable becomes its
        items joined with ``','``, and everything else is passed through
        ``str()``.
        """
        # The ABC aliases were removed from the ``collections`` namespace in
        # Python 3.10; ``collections.abc`` is their home on every Python 3.
        from collections.abc import Iterable

        if isinstance(xs, str):
            return xs
        if isinstance(xs, Iterable):
            return ','.join(map(str, xs))
        return str(xs)

    def users_get(self, user_ids, fields=()):
        """Call ``users.get`` and wrap each returned record in ``User``.

        user_ids: a single ID or an iterable of IDs.
        fields: a single field or an iterable of fields to request.

        Returns a lazy ``map`` iterator, which can be consumed only once.
        """
        return map(User, self._call_method(
            Method.USERS_GET,
            user_ids=self._format_param_values(user_ids),
            fields=self._format_param_values(fields)))

    def friends_get(self, user_id, fields=()):
        """Call ``friends.get`` for ``user_id`` and wrap records in ``User``.

        fields: a single field or an iterable of fields to request.

        Returns a lazy ``map`` iterator, which can be consumed only once.
        """
        return map(User, self._call_method(
            Method.FRIENDS_GET,
            user_id=str(user_id),
            fields=self._format_param_values(fields)))