1 # -*- coding: utf-8 -*-
3 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko <kaliko@azylum.org>
4 # Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 Consume last.fm web service
27 __author__ = 'Jack Kaliko'
30 import urllib.request, urllib.error, urllib.parse
32 from datetime import datetime, timedelta
33 from http.client import BadStatusLine
34 from socket import timeout as SocketTimeOut
35 from time import sleep
36 from xml.etree.cElementTree import ElementTree
39 from sima.utils.utils import getws
43 WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
44 LFM_ERRORS = dict({'2': 'Invalid service -This service does not exist',
45 '3': 'Invalid Method - No method with that name in this package',
46 '4': 'Authentication Failed - You do not have permissions to access the service',
47 '5': "'Invalid format - This service doesn't exist in that format",
48 '6': 'Invalid parameters - Your request is missing a required parameter',
49 '7': 'Invalid resource specified',
50 '9': 'Invalid session key - Please re-authenticate',
51 '10': 'Invalid API key - You must be granted a valid key by last.fm',
52 '11': 'Service Offline - This service is temporarily offline. Try again later.',
53 '12': 'Subscription Error - The user needs to be subscribed in order to do that',
54 '13': 'Invalid method signature supplied',
55 '26': 'Suspended API key - Access for your account has been suspended, please contact Last.fm',
59 class XmlFMError(Exception): # Errors
61 Exception raised for errors in the input.
64 def __init__(self, expression):
65 self.expression = expression
68 return repr(self.expression)
71 class EncodingError(XmlFMError):
72 """Raised when string is not unicode"""
76 class XmlFMHTTPError(XmlFMError):
77 """Raised when failed to connect server"""
79 def __init__(self, expression):
80 if hasattr(expression, 'code'):
81 self.expression = 'error %d: %s' % (expression.code,
84 self.expression = 'error: %s' % expression
87 class XmlFMNotFound(XmlFMError):
88 """Raised when no artist is found"""
90 def __init__(self, message=None):
92 message = 'Artist probably not found (http error 400)'
93 self.expression = (message)
96 class XmlFMMissingArtist(XmlFMError):
97 """Raised when no artist name provided"""
99 def __init__(self, message=None):
101 message = 'Missing artist name.'
102 self.expression = (message)
105 class XmlFMTimeOut(XmlFMError):
106 """Raised when urlopen times out"""
108 def __init__(self, message=None):
110 message = 'Connection to last.fm web services times out!'
111 self.expression = (message)
115 def __init__(self, wait):
117 self.last_called = datetime.now()
119 def __call__(self, func):
120 def wrapper(*args, **kwargs):
121 while self.last_called + self.wait > datetime.now():
124 result = func(*args, **kwargs)
125 self.last_called = datetime.now()
130 class AudioScrobblerCache():
131 def __init__(self, elem, last):
133 self.requestdate = last
136 return self.requestdate
145 root_url = 'http://{host}/{version}/'.format(**LFM)
146 request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
147 'api_key={apikey}'.format(**LFM),
148 'top': '?method=artist.gettoptracks&artist=%s&' +\
149 'api_key={apikey}'.format(**LFM),
150 'track': '?method=track.getsimilar&artist=%s' +\
151 '&track=%s' + 'api_key={apikey}'.format(**LFM),
152 'info': '?method=artist.getinfo&artist=%s' +\
153 'api_key={apikey}'.format(**LFM),
156 timestamp = datetime.utcnow()
159 def __init__(self, artist=None, cache=True):
162 self.current_element = None
166 def _is_in_cache(self):
167 """Controls presence of url in cache.
169 if self._url in SimaFM.cache:
170 #print('already fetch {0}'.format(self.artist))
175 """Use cached elements or proceed http request"""
176 if self._is_in_cache():
177 self.current_element = SimaFM.cache.get(self._url).gettree()
181 @Throttle(WAIT_BETWEEN_REQUESTS)
182 def _fetch_lfm(self):
183 """Get artists, fetch xml from last.fm"""
185 fd = urllib.request.urlopen(url=self._url,
187 except SocketTimeOut:
189 except BadStatusLine as err:
190 raise XmlFMHTTPError(err)
191 except urllib.error.URLError as err:
192 if hasattr(err, 'reason'):
193 # URLError, failed to reach server
194 raise XmlFMError(repr(err.reason))
195 if hasattr(err, 'code'):
196 # HTTPError, the server couldn't fulfill the request
198 raise XmlFMNotFound()
199 raise XmlFMHTTPError(err)
200 raise XmlFMError(err)
201 headers = dict(fd.getheaders())
202 content_type = headers.get('Content-Type').split(';')
203 if content_type[0] != "text/xml":
204 raise XmlFMError('None XML returned from the server')
205 if content_type[1].strip() != "charset=utf-8":
206 raise XmlFMError('XML not UTF-8 encoded!')
208 self.current_element = ElementTree(file=fd)
209 except SocketTimeOut:
213 self._controls_lfm_answer()
215 SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element,
218 def _controls_lfm_answer(self):
219 """Controls last.fm answer.
221 status = self.current_element.getroot().attrib.get('status')
224 if status == 'failed':
225 error = self.current_element.find('error').attrib.get('code')
226 errormsg = self.current_element.findtext('error')
227 #if error in LFM_ERRORS.keys():
228 # print LFM_ERRORS.get(error)
229 raise XmlFMNotFound(errormsg)
231 def _controls_artist(self, artist):
236 raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
237 if not isinstance(self.artist, str):
238 raise EncodingError('"%s" not unicode object' % self.artist)
239 # last.fm is UTF-8 encoded URL
240 self.artist_utf8 = self.artist.encode('UTF-8')
242 def purge_cache(self, age=4):
243 now = datetime.utcnow()
244 if now.hour == SimaFM.timestamp.hour:
246 SimaFM.timestamp = datetime.utcnow()
248 delta = timedelta(hours=age)
249 for url in list(cache.keys()):
250 timestamp = cache.get(url).created()
251 if now - timestamp > delta:
254 def get_similar(self, artist=None):
257 self._controls_artist(artist)
259 url = SimaFM.root_url + SimaFM.request.get('similar')
260 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
262 # TODO: controls name encoding
263 elem = self.current_element
264 for art in elem.getiterator(tag='artist'):
265 yield str(art.findtext('name')), 100 * float(art.findtext('match'))
267 def get_toptracks(self, artist=None):
270 self._controls_artist(artist)
272 url = SimaFM.root_url + SimaFM.request.get('top')
273 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
275 # TODO: controls name encoding
276 elem = self.current_element
277 for track in elem.getiterator(tag='track'):
278 yield str(track.findtext('name')), int(track.attrib.get('rank'))
280 def get_similartracks(self, track=None, artist=None):
284 url = SimaFM.root_url + SimaFM.request.get('track')
285 self._url = url % (urllib.parse.quote(artist.encode('UTF-8'), safe=''),
286 urllib.parse.quote(track.encode('UTF-8'), safe=''))
288 elem = self.current_element
289 for trk in elem.getiterator(tag='track'):
290 yield (str(trk.findtext('artist/name')),
291 str(trk.findtext('name')),
292 100 * float(trk.findtext('match')))
294 def get_mbid(self, artist=None):
297 self._controls_artist(artist)
299 url = SimaFM.root_url + SimaFM.request.get('info')
300 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
302 # TODO: controls name encoding
303 elem = self.current_element
304 return str(elem.find('artist').findtext('mbid'))
309 for t, a, m in test.get_similartracks(artist='Nirvana', track='Smells Like Teen Spirit'):
313 if __name__ == '__main__':
316 except XmlFMHTTPError as conn_err:
317 print("error trying to connect: %s" % conn_err)
318 except XmlFMNotFound as not_found:
319 print("looks like no artists were found: %s" % not_found)
320 except XmlFMError as err:
325 # vim: ai ts=4 sw=4 sts=4 expandtab