1 # -*- coding: utf-8 -*-
3 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko <kaliko@azylum.org>
4 # Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 Consume last.fm web service
27 __author__ = 'Jack Kaliko'
30 import urllib.request, urllib.error, urllib.parse
32 from datetime import datetime, timedelta
33 from http.client import BadStatusLine
34 from socket import timeout as SocketTimeOut
35 from xml.etree.cElementTree import ElementTree
37 from request import get
40 from sima.utils.utils import getws, Throttle, Cache
41 if len(LFM.get('apikey')) == 43: # simple hack allowing imp.reload
# Delay handed to the Throttle decorator applied to the fetch methods.
WAIT_BETWEEN_REQUESTS = timedelta(seconds=0.4)
class XmlFMError(Exception):  # Errors
    """Exception raised for errors in the input.

    Base class of the other XmlFM* exceptions in this module. The value
    given at construction is kept on ``expression``; ``str()`` of the
    exception returns the ``repr()`` of that value.
    """

    def __init__(self, expression):
        """Keep *expression*, the object or message describing the error."""
        self.expression = expression

    # NOTE(review): the ``def`` line of this method was missing from the
    # extracted source; ``__str__`` is assumed -- confirm against upstream.
    def __str__(self):
        return repr(self.expression)
class EncodingError(XmlFMError):
    """Signals that a value expected to be a ``str`` object was not one."""
65 class XmlFMHTTPError(XmlFMError):
66 """Raised when failed to connect server"""
68 def __init__(self, expression):
69 if hasattr(expression, 'code'):
70 self.expression = 'error %d: %s' % (expression.code,
73 self.expression = 'error: %s' % expression
class XmlFMNotFound(XmlFMError):
    """Raised when no artist is found"""

    def __init__(self, message=None):
        """Keep *message*, or a default text when none is supplied."""
        # NOTE(review): the guard line was missing from the extracted
        # source; a falsy check is assumed -- confirm against upstream.
        if not message:
            message = 'Artist probably not found (http error 400)'
        self.expression = message
class XmlFMMissingArtist(XmlFMError):
    """Raised when no artist name provided"""

    def __init__(self, message=None):
        """Keep *message*, or a default text when none is supplied."""
        # NOTE(review): the guard line was missing from the extracted
        # source; a falsy check is assumed -- confirm against upstream.
        if not message:
            message = 'Missing artist name.'
        self.expression = message
class XmlFMTimeOut(XmlFMError):
    """Raised when urlopen times out"""

    def __init__(self, message=None):
        """Keep *message*, or a default text when none is supplied."""
        # NOTE(review): the guard line was missing from the extracted
        # source; a falsy check is assumed -- confirm against upstream.
        if not message:
            message = 'Connection to last.fm web services times out!'
        self.expression = message
# Base URL of the web service, built from the ``host`` and ``version``
# entries of LFM.
root_url = 'http://{host}/{version}/'.format(**LFM)
# Query-string templates, one per request kind. Each ``%s`` is filled in
# later with a URL-quoted value; the api key is substituted right here.
# Every parameter is separated by ``&``: the 'track' and 'info' templates
# used to glue ``api_key`` straight onto the preceding value.
request = {
    'similar': ('?method=artist.getsimilar&artist=%s'
                '&api_key={apikey}').format(**LFM),
    'top': ('?method=artist.gettoptracks&artist=%s'
            '&api_key={apikey}').format(**LFM),
    'track': ('?method=track.getsimilar&artist=%s&track=%s'
              '&api_key={apikey}').format(**LFM),
    'info': ('?method=artist.getinfo&artist=%s'
             '&api_key={apikey}').format(**LFM),
}
# Parameter sets, one per request kind. ``artist`` / ``track`` start out as
# None -- presumably filled in before a request is sent; verify with callers.
payloads = {
    'similar': {
        'method': 'artist.getsimilar',
        'artist': None,
        'api_key': LFM.get('apikey'),
    },
    'top': {
        'method': 'artist.gettoptracks',
        'artist': None,
        'api_key': LFM.get('apikey'),
    },
    'track': {
        'method': 'track.getsimilar',
        'artist': None,
        'track': None,
        'api_key': LFM.get('apikey'),
    },
    'info': {
        'method': 'artist.getinfo',
        'artist': None,
        'api_key': LFM.get('apikey'),
    },
}
# Naive UTC time captured when this statement runs.
timestamp = datetime.utcnow()
130 def __init__(self, artist=None, cache=True):
133 self.current_element = None
def _is_in_cache(self):
    """Controls presence of url in cache.

    Returns:
        True when ``self._url`` is a key of ``SimaFM.cache``, else False.
    """
    # NOTE(review): the return statements were missing from the extracted
    # source; a boolean result is assumed from the method name.
    return self._url in SimaFM.cache
146 """Use cached elements or proceed http request"""
147 if self._is_in_cache():
148 self.current_element = SimaFM.cache.get(self._url).gettree()
152 @Throttle(WAIT_BETWEEN_REQUESTS)
156 @Throttle(WAIT_BETWEEN_REQUESTS)
157 def _fetch_lfm(self):
158 """Get artists, fetch xml from last.fm"""
160 fd = urllib.request.urlopen(url=self._url,
162 except SocketTimeOut:
164 except BadStatusLine as err:
165 raise XmlFMHTTPError(err)
166 except urllib.error.URLError as err:
167 if hasattr(err, 'reason'):
168 # URLError, failed to reach server
169 raise XmlFMError(repr(err.reason))
170 if hasattr(err, 'code'):
171 # HTTPError, the server couldn't fulfill the request
173 raise XmlFMNotFound()
174 raise XmlFMHTTPError(err)
175 raise XmlFMError(err)
176 headers = dict(fd.getheaders())
177 content_type = headers.get('Content-Type').split(';')
178 if content_type[0] != "text/xml":
179 raise XmlFMError('None XML returned from the server')
180 if content_type[1].strip() != "charset=utf-8":
181 raise XmlFMError('XML not UTF-8 encoded!')
183 self.current_element = ElementTree(file=fd)
184 except SocketTimeOut:
188 self._controls_lfm_answer()
190 SimaFM.cache[self._url] = Cache(self.current_element)
def _controls_lfm_answer(self):
    """Controls last.fm answer.

    Reads the ``status`` attribute on the root element of
    ``self.current_element``.

    Raises:
        XmlFMNotFound: when the status is ``'failed'``; the text of the
            ``error`` element is used as the message.
    """
    status = self.current_element.getroot().attrib.get('status')
    # NOTE(review): two lines following the status lookup were missing from
    # the extracted source and are not reproduced here -- confirm against
    # upstream whether other statuses need handling.
    if status == 'failed':
        # Only the error text is reported; the ``code`` attribute used to
        # be looked up into a local that was never read.
        raise XmlFMNotFound(self.current_element.findtext('error'))
203 def _controls_artist(self, artist):
208 raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
209 if not isinstance(self.artist, str):
210 raise EncodingError('"%s" not unicode object' % self.artist)
211 # last.fm is UTF-8 encoded URL
212 self.artist_utf8 = self.artist.encode('UTF-8')
214 def purge_cache(self, age=4):
215 now = datetime.utcnow()
216 if now.hour == SimaFM.timestamp.hour:
218 SimaFM.timestamp = datetime.utcnow()
220 delta = timedelta(hours=age)
221 for url in list(cache.keys()):
222 timestamp = cache.get(url).created()
223 if now - timestamp > delta:
226 def get_similar_ng(self, artist=None):
229 self._controls_artist(artist)
231 self._req = get(SimaFM.root_url, params=None, timeout=5)
233 if self._is_in_cache():
234 self.current_element = SimaFM.cache.get(self._url).gettree()
237 elem = self.current_element
238 for art in elem.getiterator(tag='artist'):
239 yield str(art.findtext('name')), 100 * float(art.findtext('match'))
241 def get_similar(self, artist=None):
244 self._controls_artist(artist)
246 url = SimaFM.root_url + SimaFM.request.get('similar')
247 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
249 # TODO: controls name encoding
250 elem = self.current_element
251 for art in elem.getiterator(tag='artist'):
252 yield str(art.findtext('name')), 100 * float(art.findtext('match'))
254 def get_toptracks(self, artist=None):
257 self._controls_artist(artist)
259 url = SimaFM.root_url + SimaFM.request.get('top')
260 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
262 # TODO: controls name encoding
263 elem = self.current_element
264 for track in elem.getiterator(tag='track'):
265 yield str(track.findtext('name')), int(track.attrib.get('rank'))
267 def get_similartracks(self, track=None, artist=None):
271 url = SimaFM.root_url + SimaFM.request.get('track')
272 self._url = url % (urllib.parse.quote(artist.encode('UTF-8'), safe=''),
273 urllib.parse.quote(track.encode('UTF-8'), safe=''))
275 elem = self.current_element
276 for trk in elem.getiterator(tag='track'):
277 yield (str(trk.findtext('artist/name')),
278 str(trk.findtext('name')),
279 100 * float(trk.findtext('match')))
281 def get_mbid(self, artist=None):
284 self._controls_artist(artist)
286 url = SimaFM.root_url + SimaFM.request.get('info')
287 self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
289 # TODO: controls name encoding
290 elem = self.current_element
291 return str(elem.find('artist').findtext('mbid'))
296 for t, a, m in test.get_similartracks(artist='Nirvana', track='Smells Like Teen Spirit'):
300 if __name__ == '__main__':
303 except XmlFMHTTPError as conn_err:
304 print("error trying to connect: %s" % conn_err)
305 except XmlFMNotFound as not_found:
306 print("looks like no artists were found: %s" % not_found)
307 except XmlFMError as err:
312 # vim: ai ts=4 sw=4 sts=4 expandtab