1 # -*- coding: utf-8 -*-
3 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko <kaliko@azylum.org>
4 # Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 Consume last.fm web service
27 __author__ = 'Jack Kaliko'
30 import urllib.request, urllib.error, urllib.parse
32 from datetime import datetime, timedelta
33 from http.client import BadStatusLine
34 from socket import timeout as SocketTimeOut
35 from time import sleep
36 from xml.etree.cElementTree import ElementTree
39 WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
40 LFM_ERRORS = dict({'2': 'Invalid service -This service does not exist',
41 '3': 'Invalid Method - No method with that name in this package',
42 '4': 'Authentication Failed - You do not have permissions to access the service',
43 '5': "'Invalid format - This service doesn't exist in that format",
44 '6': 'Invalid parameters - Your request is missing a required parameter',
45 '7': 'Invalid resource specified',
46 '9': 'Invalid session key - Please re-authenticate',
47 '10': 'Invalid API key - You must be granted a valid key by last.fm',
48 '11': 'Service Offline - This service is temporarily offline. Try again later.',
49 '12': 'Subscription Error - The user needs to be subscribed in order to do that',
50 '13': 'Invalid method signature supplied',
51 '26': 'Suspended API key - Access for your account has been suspended, please contact Last.fm',
55 class XmlFMError(Exception): # Errors
57 Exception raised for errors in the input.
60 def __init__(self, expression):
61 self.expression = expression
64 return repr(self.expression)
67 class EncodingError(XmlFMError):
68 """Raised when string is not unicode"""
72 class XmlFMHTTPError(XmlFMError):
73 """Raised when failed to connect server"""
75 def __init__(self, expression):
76 if hasattr(expression, 'code'):
77 self.expression = 'error %d: %s' % (expression.code,
80 self.expression = 'error: %s' % expression
83 class XmlFMNotFound(XmlFMError):
84 """Raised when no artist is found"""
86 def __init__(self, message=None):
88 message = 'Artist probably not found (http error 400)'
89 self.expression = (message)
92 class XmlFMMissingArtist(XmlFMError):
93 """Raised when no artist name provided"""
95 def __init__(self, message=None):
97 message = 'Missing artist name.'
98 self.expression = (message)
101 class XmlFMTimeOut(XmlFMError):
102 """Raised when urlopen times out"""
104 def __init__(self, message=None):
106 message = 'Connection to last.fm web services times out!'
107 self.expression = (message)
111 def __init__(self, wait):
113 self.last_called = datetime.now()
115 def __call__(self, func):
116 def wrapper(*args, **kwargs):
117 while self.last_called + self.wait > datetime.now():
120 result = func(*args, **kwargs)
121 self.last_called = datetime.now()
126 class AudioScrobblerCache():
127 def __init__(self, elem, last):
129 self.requestdate = last
132 return self.requestdate
141 api_key = '4a1c9ddec29816ed803d7be9113ba4cb'
142 host = 'ws.audioscrobbler.com'
144 root_url = 'http://%s/%s/' % (host, version)
145 request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
146 'api_key=%s' % api_key,
147 'top': '?method=artist.gettoptracks&artist=%s&' +\
148 'api_key=%s' % api_key,
149 'track': '?method=track.getsimilar&artist=%s' +\
150 '&track=%s' + '&api_key=%s' % api_key,
151 'info': '?method=artist.getinfo&artist=%s' +\
152 '&api_key=%s' % api_key,
155 timestamp = datetime.utcnow()
158 def __init__(self, artist=None, cache=True):
161 self.current_element = None
165 def _is_in_cache(self):
166 """Controls presence of url in cache.
168 if self._url in SimaFM.cache:
169 #print('already fetch {0}'.format(self.artist))
174 """Use cached elements or proceed http request"""
175 if self._is_in_cache():
176 self.current_element = SimaFM.cache.get(self._url).gettree()
180 @Throttle(WAIT_BETWEEN_REQUESTS)
181 def _fetch_lfm(self):
182 """Get artists, fetch xml from last.fm"""
184 fd = urllib.request.urlopen(url=self._url,
186 except SocketTimeOut:
188 except BadStatusLine as err:
189 raise XmlFMHTTPError(err)
190 except urllib.error.URLError as err:
191 if hasattr(err, 'reason'):
192 # URLError, failed to reach server
193 raise XmlFMError(repr(err.reason))
194 if hasattr(err, 'code'):
195 # HTTPError, the server couldn't fulfill the request
197 raise XmlFMNotFound()
198 raise XmlFMHTTPError(err)
199 raise XmlFMError(err)
200 headers = dict(fd.getheaders())
201 content_type = headers.get('Content-Type').split(';')
202 if content_type[0] != "text/xml":
203 raise XmlFMError('None XML returned from the server')
204 if content_type[1].strip() != "charset=utf-8":
205 raise XmlFMError('XML not UTF-8 encoded!')
207 self.current_element = ElementTree(file=fd)
208 except SocketTimeOut:
212 self._controls_lfm_answer()
214 SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element,
217 def _controls_lfm_answer(self):
218 """Controls last.fm answer.
220 status = self.current_element.getroot().attrib.get('status')
223 if status == 'failed':
224 error = self.current_element.find('error').attrib.get('code')
225 errormsg = self.current_element.findtext('error')
226 #if error in LFM_ERRORS.keys():
227 # print LFM_ERRORS.get(error)
228 raise XmlFMNotFound(errormsg)
230 def _controls_artist(self, artist):
235 raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
236 if not isinstance(self.artist, str):
237 raise EncodingError('"%s" not unicode object' % self.artist)
238 # last.fm is UTF-8 encoded URL
239 self.artist_utf8 = self.artist.encode('UTF-8')
241 def purge_cache(self, age=4):
242 now = datetime.utcnow()
243 if now.hour == SimaFM.timestamp.hour:
245 SimaFM.timestamp = datetime.utcnow()
247 delta = timedelta(hours=age)
248 for url in list(cache.keys()):
249 timestamp = cache.get(url).created()
250 if now - timestamp > delta:
253 def get_similar(self, artist=None):
256 self._controls_artist(artist)
258 url = SimaFM.root_url + SimaFM.request.get('similar')
259 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
261 # TODO: controls name encoding
262 elem = self.current_element
263 for art in elem.getiterator(tag='artist'):
264 yield str(art.findtext('name')), 100 * float(art.findtext('match'))
266 def get_toptracks(self, artist=None):
269 self._controls_artist(artist)
271 url = SimaFM.root_url + SimaFM.request.get('top')
272 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
274 # TODO: controls name encoding
275 elem = self.current_element
276 for track in elem.getiterator(tag='track'):
277 yield str(track.findtext('name')), int(track.attrib.get('rank'))
279 def get_mbid(self, artist=None):
282 self._controls_artist(artist)
284 url = SimaFM.root_url + SimaFM.request.get('info')
285 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
287 # TODO: controls name encoding
288 elem = self.current_element
289 return str(elem.find('artist').findtext('mbid'))
294 for a, m in test.get_similar(artist='Tool'):
298 if __name__ == '__main__':
301 except XmlFMHTTPError as conn_err:
302 print("error trying to connect: %s" % conn_err)
303 except XmlFMNotFound as not_found:
304 print("looks like no artists were found: %s" % not_found)
305 except XmlFMError as err:
310 # vim: ai ts=4 sw=4 sts=4 expandtab