]> kaliko git repositories - mpd-sima.git/blob - sima/lib/simafm.py
Plain api keys obfuscation
[mpd-sima.git] / sima / lib / simafm.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko <kaliko@azylum.org>
4 # Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
5 #
6 #   This program is free software: you can redistribute it and/or modify
7 #   it under the terms of the GNU General Public License as published by
8 #   the Free Software Foundation, either version 3 of the License, or
9 #   (at your option) any later version.
10 #
11 #   This program is distributed in the hope that it will be useful,
12 #   but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #   GNU General Public License for more details.
15 #
16 #   You should have received a copy of the GNU General Public License
17 #   along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19 #
20
21 """
22 Consume last.fm web service
23
24 """
25
26 __version__ = '0.3.1'
27 __author__ = 'Jack Kaliko'
28
29
30 import urllib.request, urllib.error, urllib.parse
31
32 from datetime import datetime, timedelta
33 from http.client import BadStatusLine
34 from socket import timeout as SocketTimeOut
35 from time import sleep
36 from xml.etree.cElementTree import ElementTree
37
38 from sima import LFM
39 from sima.utils.utils import getws
40 getws(LFM)
41
42 # Some definitions
43 WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
44 LFM_ERRORS = dict({'2': 'Invalid service -This service does not exist',
45     '3': 'Invalid Method - No method with that name in this package',
46     '4': 'Authentication Failed - You do not have permissions to access the service',
47     '5': "'Invalid format - This service doesn't exist in that format",
48     '6': 'Invalid parameters - Your request is missing a required parameter',
49     '7': 'Invalid resource specified',
50     '9': 'Invalid session key - Please re-authenticate',
51     '10': 'Invalid API key - You must be granted a valid key by last.fm',
52     '11': 'Service Offline - This service is temporarily offline. Try again later.',
53     '12': 'Subscription Error - The user needs to be subscribed in order to do that',
54     '13': 'Invalid method signature supplied',
55     '26': 'Suspended API key - Access for your account has been suspended, please contact Last.fm',
56     })
57
58
59 class XmlFMError(Exception):  # Errors
60     """
61     Exception raised for errors in the input.
62     """
63
64     def __init__(self, expression):
65         self.expression = expression
66
67     def __str__(self):
68         return repr(self.expression)
69
70
71 class EncodingError(XmlFMError):
72     """Raised when string is not unicode"""
73     pass
74
75
76 class XmlFMHTTPError(XmlFMError):
77     """Raised when failed to connect server"""
78
79     def __init__(self, expression):
80         if hasattr(expression, 'code'):
81             self.expression = 'error %d: %s' % (expression.code,
82                 expression.msg)
83         else:
84             self.expression = 'error: %s' % expression
85
86
87 class XmlFMNotFound(XmlFMError):
88     """Raised when no artist is found"""
89
90     def __init__(self, message=None):
91         if not message:
92             message = 'Artist probably not found (http error 400)'
93         self.expression = (message)
94
95
96 class XmlFMMissingArtist(XmlFMError):
97     """Raised when no artist name provided"""
98
99     def __init__(self, message=None):
100         if not message:
101             message = 'Missing artist name.'
102         self.expression = (message)
103
104
105 class XmlFMTimeOut(XmlFMError):
106     """Raised when urlopen times out"""
107
108     def __init__(self, message=None):
109         if not message:
110             message = 'Connection to last.fm web services times out!'
111         self.expression = (message)
112
113
114 class Throttle():
115     def __init__(self, wait):
116         self.wait = wait
117         self.last_called = datetime.now()
118
119     def __call__(self, func):
120         def wrapper(*args, **kwargs):
121             while self.last_called + self.wait > datetime.now():
122                 #print('waiting…')
123                 sleep(0.1)
124             result = func(*args, **kwargs)
125             self.last_called = datetime.now()
126             return result
127         return wrapper
128
129
130 class AudioScrobblerCache():
131     def __init__(self, elem, last):
132         self.elemtree = elem
133         self.requestdate = last
134
135     def created(self):
136         return self.requestdate
137
138     def gettree(self):
139         return self.elemtree
140
141
142 class SimaFM():
143     """
144     """
145     root_url = 'http://{host}/{version}/'.format(**LFM)
146     request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
147                                 'api_key={apikey}'.format(**LFM),
148                     'top': '?method=artist.gettoptracks&artist=%s&' +\
149                                 'api_key={apikey}'.format(**LFM),
150                     'track': '?method=track.getsimilar&artist=%s' +\
151                             '&track=%s' + 'api_key={apikey}'.format(**LFM),
152                     'info': '?method=artist.getinfo&artist=%s' +\
153                             'api_key={apikey}'.format(**LFM),
154                     })
155     cache = dict({})
156     timestamp = datetime.utcnow()
157     count = 0
158
159     def __init__(self, artist=None, cache=True):
160         self._url = None
161         #SimaFM.count += 1
162         self.current_element = None
163         self.caching = cache
164         self.purge_cache()
165
166     def _is_in_cache(self):
167         """Controls presence of url in cache.
168         """
169         if self._url in SimaFM.cache:
170             #print('already fetch {0}'.format(self.artist))
171             return True
172         return False
173
174     def _fetch(self):
175         """Use cached elements or proceed http request"""
176         if self._is_in_cache():
177             self.current_element = SimaFM.cache.get(self._url).gettree()
178             return
179         self._fetch_lfm()
180
181     @Throttle(WAIT_BETWEEN_REQUESTS)
182     def _fetch_lfm(self):
183         """Get artists, fetch xml from last.fm"""
184         try:
185             fd = urllib.request.urlopen(url=self._url,
186                     timeout=15)
187         except SocketTimeOut:
188             raise XmlFMTimeOut()
189         except BadStatusLine as err:
190             raise XmlFMHTTPError(err)
191         except urllib.error.URLError as err:
192             if hasattr(err, 'reason'):
193                 # URLError, failed to reach server
194                 raise XmlFMError(repr(err.reason))
195             if hasattr(err, 'code'):
196                 # HTTPError, the server couldn't fulfill the request
197                 if err.code == 400:
198                     raise XmlFMNotFound()
199                 raise XmlFMHTTPError(err)
200             raise XmlFMError(err)
201         headers = dict(fd.getheaders())
202         content_type = headers.get('Content-Type').split(';')
203         if content_type[0] != "text/xml":
204             raise XmlFMError('None XML returned from the server')
205         if content_type[1].strip() != "charset=utf-8":
206             raise XmlFMError('XML not UTF-8 encoded!')
207         try:
208             self.current_element = ElementTree(file=fd)
209         except SocketTimeOut:
210             raise XmlFMTimeOut()
211         finally:
212             fd.close()
213         self._controls_lfm_answer()
214         if self.caching:
215             SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element,
216                     datetime.utcnow())
217
218     def _controls_lfm_answer(self):
219         """Controls last.fm answer.
220         """
221         status = self.current_element.getroot().attrib.get('status')
222         if status == 'ok':
223             return True
224         if status == 'failed':
225             error = self.current_element.find('error').attrib.get('code')
226             errormsg = self.current_element.findtext('error')
227             #if error in LFM_ERRORS.keys():
228             #    print LFM_ERRORS.get(error)
229             raise XmlFMNotFound(errormsg)
230
231     def _controls_artist(self, artist):
232         """
233         """
234         self.artist = artist
235         if not self.artist:
236             raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
237         if not isinstance(self.artist, str):
238             raise EncodingError('"%s" not unicode object' % self.artist)
239         # last.fm is UTF-8 encoded URL
240         self.artist_utf8 = self.artist.encode('UTF-8')
241
242     def purge_cache(self, age=4):
243         now = datetime.utcnow()
244         if now.hour == SimaFM.timestamp.hour:
245             return
246         SimaFM.timestamp = datetime.utcnow()
247         cache = SimaFM.cache
248         delta = timedelta(hours=age)
249         for url in list(cache.keys()):
250             timestamp = cache.get(url).created()
251             if now - timestamp > delta:
252                 cache.pop(url)
253
254     def get_similar(self, artist=None):
255         """
256         """
257         self._controls_artist(artist)
258         # Construct URL
259         url = SimaFM.root_url + SimaFM.request.get('similar')
260         self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
261         self._fetch()
262         # TODO: controls name encoding
263         elem = self.current_element
264         for art in elem.getiterator(tag='artist'):
265             yield str(art.findtext('name')), 100 * float(art.findtext('match'))
266
267     def get_toptracks(self, artist=None):
268         """
269         """
270         self._controls_artist(artist)
271         # Construct URL
272         url = SimaFM.root_url + SimaFM.request.get('top')
273         self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
274         self._fetch()
275         # TODO: controls name encoding
276         elem = self.current_element
277         for track in elem.getiterator(tag='track'):
278             yield str(track.findtext('name')), int(track.attrib.get('rank'))
279
280     def get_similartracks(self, track=None, artist=None):
281         """
282         """
283         # Construct URL
284         url = SimaFM.root_url + SimaFM.request.get('track')
285         self._url = url % (urllib.parse.quote(artist.encode('UTF-8'), safe=''),
286                            urllib.parse.quote(track.encode('UTF-8'), safe=''))
287         self._fetch()
288         elem = self.current_element
289         for trk in elem.getiterator(tag='track'):
290             yield (str(trk.findtext('artist/name')),
291                    str(trk.findtext('name')),
292                    100 * float(trk.findtext('match')))
293
294     def get_mbid(self, artist=None):
295         """
296         """
297         self._controls_artist(artist)
298         # Construct URL
299         url = SimaFM.root_url + SimaFM.request.get('info')
300         self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
301         self._fetch()
302         # TODO: controls name encoding
303         elem = self.current_element
304         return str(elem.find('artist').findtext('mbid'))
305
306
307 def run():
308     test = SimaFM()
309     for t, a, m in test.get_similartracks(artist='Nirvana', track='Smells Like Teen Spirit'):
310         print(a, t, m)
311     return
312
313 if __name__ == '__main__':
314     try:
315         run()
316     except XmlFMHTTPError as conn_err:
317         print("error trying to connect: %s" % conn_err)
318     except XmlFMNotFound as not_found:
319         print("looks like no artists were found: %s" % not_found)
320     except XmlFMError as err:
321         print(err)
322
323
324 # VIM MODLINE
325 # vim: ai ts=4 sw=4 sts=4 expandtab