# -*- coding: utf-8 -*-
-# Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko <kaliko@azylum.org>
-# Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
+# Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Jack Kaliko <kaliko@azylum.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
#
"""
-Consume last.fm web service
-
+Consume Last.fm web service
"""
-__version__ = '0.4.0'
+__version__ = '0.5.1'
__author__ = 'Jack Kaliko'
-import urllib.request, urllib.error, urllib.parse
-from datetime import datetime, timedelta
-from http.client import BadStatusLine
-from socket import timeout as SocketTimeOut
-from xml.etree.cElementTree import ElementTree
+from requests import Session, Request, Timeout, ConnectionError
-from request import get
+from sima import LFM, SOCKET_TIMEOUT, WAIT_BETWEEN_REQUESTS
+from sima.lib.meta import Artist
+from sima.lib.track import Track
-from sima import LFM
-from sima.utils.utils import getws, Throttle, Cache
+from sima.lib.http import CacheController
+from sima.utils.utils import WSError, WSNotFound, WSTimeout, WSHTTPError
+from sima.utils.utils import getws, Throttle
if len(LFM.get('apikey')) == 43: # simple hack allowing imp.reload
getws(LFM)
-# Some definitions
-WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
-
-class XmlFMError(Exception): # Errors
- """
- Exception raised for errors in the input.
- """
-
- def __init__(self, expression):
- self.expression = expression
-
- def __str__(self):
- return repr(self.expression)
-
-
-class EncodingError(XmlFMError):
- """Raised when string is not unicode"""
- pass
-
-
-class XmlFMHTTPError(XmlFMError):
- """Raised when failed to connect server"""
-
- def __init__(self, expression):
- if hasattr(expression, 'code'):
- self.expression = 'error %d: %s' % (expression.code,
- expression.msg)
- else:
- self.expression = 'error: %s' % expression
-
-
-class XmlFMNotFound(XmlFMError):
- """Raised when no artist is found"""
-
- def __init__(self, message=None):
- if not message:
- message = 'Artist probably not found (http error 400)'
- self.expression = (message)
-
-
-class XmlFMMissingArtist(XmlFMError):
- """Raised when no artist name provided"""
-
- def __init__(self, message=None):
- if not message:
- message = 'Missing artist name.'
- self.expression = (message)
-
-
-class XmlFMTimeOut(XmlFMError):
- """Raised when urlopen times out"""
-
- def __init__(self, message=None):
- if not message:
- message = 'Connection to last.fm web services times out!'
- self.expression = (message)
-
-
-class SimaFM():
- """
+class SimaFM:
+ """Last.fm http client
"""
root_url = 'http://{host}/{version}/'.format(**LFM)
- request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
- 'api_key={apikey}'.format(**LFM),
- 'top': '?method=artist.gettoptracks&artist=%s&' +\
- 'api_key={apikey}'.format(**LFM),
- 'track': '?method=track.getsimilar&artist=%s' +\
- '&track=%s' + 'api_key={apikey}'.format(**LFM),
- 'info': '?method=artist.getinfo&artist=%s' +\
- 'api_key={apikey}'.format(**LFM),
- })
- payloads = dict({'similar': {'method':'artist.getsimilar',
- 'artist':None, 'api_key':LFM.get('apikey'),},
- 'top': {'method':'artist.gettoptracks',
- 'artist':None, 'api_key':LFM.get('apikey'),},
- 'track': {'method':'track.getsimilar',
- 'artist':None, 'track':None,
- 'api_key':LFM.get('apikey'),},
- 'info': {'method':'artist.getinfo', 'artist':None,
- 'api_key':LFM.get('apikey'),},
- })
- cache = dict({})
- timestamp = datetime.utcnow()
- count = 0
-
- def __init__(self, artist=None, cache=True):
- self._url = None
- #SimaFM.count += 1
- self.current_element = None
- self.caching = cache
- self.purge_cache()
-
- def _is_in_cache(self):
- """Controls presence of url in cache.
+ ratelimit = None
+ name = 'Last.fm'
+ cache = False
+ stats = {'etag':0,
+ 'ccontrol':0,
+ 'total':0}
+
+ def __init__(self):
+ self.controller = CacheController(self.cache)
+ self.artist = None
+
+ def _fetch(self, payload):
"""
- if self._url in SimaFM.cache:
- #print('already fetch {0}'.format(self.artist))
- return True
- return False
-
- def _fetch(self):
- """Use cached elements or proceed http request"""
- if self._is_in_cache():
- self.current_element = SimaFM.cache.get(self._url).gettree()
- return
- self._fetch_lfm()
-
- @Throttle(WAIT_BETWEEN_REQUESTS)
- def _fetch_ws(self):
- pass
-
- @Throttle(WAIT_BETWEEN_REQUESTS)
- def _fetch_lfm(self):
- """Get artists, fetch xml from last.fm"""
- try:
- fd = urllib.request.urlopen(url=self._url,
- timeout=15)
- except SocketTimeOut:
- raise XmlFMTimeOut()
- except BadStatusLine as err:
- raise XmlFMHTTPError(err)
- except urllib.error.URLError as err:
- if hasattr(err, 'reason'):
- # URLError, failed to reach server
- raise XmlFMError(repr(err.reason))
- if hasattr(err, 'code'):
- # HTTPError, the server couldn't fulfill the request
- if err.code == 400:
- raise XmlFMNotFound()
- raise XmlFMHTTPError(err)
- raise XmlFMError(err)
- headers = dict(fd.getheaders())
- content_type = headers.get('Content-Type').split(';')
- if content_type[0] != "text/xml":
- raise XmlFMError('None XML returned from the server')
- if content_type[1].strip() != "charset=utf-8":
- raise XmlFMError('XML not UTF-8 encoded!')
- try:
- self.current_element = ElementTree(file=fd)
- except SocketTimeOut:
- raise XmlFMTimeOut()
- finally:
- fd.close()
- self._controls_lfm_answer()
- if self.caching:
- SimaFM.cache[self._url] = Cache(self.current_element)
-
- def _controls_lfm_answer(self):
- """Controls last.fm answer.
+ Prepare http request
+ Use cached elements or proceed http request
"""
- status = self.current_element.getroot().attrib.get('status')
- if status == 'ok':
- return True
- if status == 'failed':
- error = self.current_element.find('error').attrib.get('code')
- errormsg = self.current_element.findtext('error')
- raise XmlFMNotFound(errormsg)
+ req = Request('GET', SimaFM.root_url, params=payload,
+ ).prepare()
+ SimaFM.stats.update(total=SimaFM.stats.get('total')+1)
+ if self.cache:
+ cached_response = self.controller.cached_request(req.url, req.headers)
+ if cached_response:
+ SimaFM.stats.update(ccontrol=SimaFM.stats.get('ccontrol')+1)
+ return cached_response.json()
+ try:
+ return self._fetch_ws(req)
+ except Timeout:
+ raise WSTimeout('Failed to reach server within {0}s'.format(
+ SOCKET_TIMEOUT))
+ except ConnectionError as err:
+ raise WSError(err)
- def _controls_artist(self, artist):
+ @Throttle(WAIT_BETWEEN_REQUESTS)
+ def _fetch_ws(self, prepreq):
+ """fetch from web service"""
+ sess = Session()
+ resp = sess.send(prepreq, timeout=SOCKET_TIMEOUT)
+ if resp.status_code == 304:
+ SimaFM.stats.update(etag=SimaFM.stats.get('etag')+1)
+ resp = self.controller.update_cached_response(prepreq, resp)
+ elif resp.status_code != 200:
+ raise WSHTTPError('{0.status_code}: {0.reason}'.format(resp))
+ ans = resp.json()
+ self._controls_answer(ans)
+ if self.cache:
+ self.controller.cache_response(resp.request, resp)
+ return ans
+
+ def _controls_answer(self, ans):
+ """Controls answer.
"""
+ if 'error' in ans:
+ code = ans.get('error')
+ mess = ans.get('message')
+ if code == 6:
+ raise WSNotFound('{0}: "{1}"'.format(mess, self.artist))
+ raise WSError(mess)
+ return True
+
+ def _forge_payload(self, artist, method='similar', track=None):
+ """Build payload
"""
+ payloads = dict({'similar': {'method':'artist.getsimilar',},
+ 'top': {'method':'artist.gettoptracks',},
+ 'track': {'method':'track.getsimilar',},
+ 'info': {'method':'artist.getinfo',},
+ })
+ payload = payloads.get(method)
+ payload.update(api_key=LFM.get('apikey'), format='json')
+ if not isinstance(artist, Artist):
+ raise TypeError('"{0!r}" not an Artist object'.format(artist))
self.artist = artist
- if not self.artist:
- raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
- if not isinstance(self.artist, str):
- raise EncodingError('"%s" not unicode object' % self.artist)
- # last.fm is UTF-8 encoded URL
- self.artist_utf8 = self.artist.encode('UTF-8')
-
- def purge_cache(self, age=4):
- now = datetime.utcnow()
- if now.hour == SimaFM.timestamp.hour:
- return
- SimaFM.timestamp = datetime.utcnow()
- cache = SimaFM.cache
- delta = timedelta(hours=age)
- for url in list(cache.keys()):
- timestamp = cache.get(url).created()
- if now - timestamp > delta:
- cache.pop(url)
-
- def get_similar_ng(self, artist=None):
- """
- """
- self._controls_artist(artist)
- # Construct URL
- self._req = get(SimaFM.root_url, params=None, timeout=5)
- self._url = req.url
- if self._is_in_cache():
- self.current_element = SimaFM.cache.get(self._url).gettree()
+ if artist.mbid:
+ payload.update(mbid='{0}'.format(artist.mbid))
else:
- self._fetch_ws()
- elem = self.current_element
- for art in elem.getiterator(tag='artist'):
- yield str(art.findtext('name')), 100 * float(art.findtext('match'))
+ payload.update(artist=artist.name,
+ autocorrect=1)
+ payload.update(results=100)
+ if method == 'track':
+ payload.update(track=track)
+ # > hashing the URL into a cache key
+ # return a sorted list of 2-tuple to have consistent cache
+ return sorted(payload.items(), key=lambda param: param[0])
def get_similar(self, artist=None):
+ """Fetch similar artists
"""
- """
- self._controls_artist(artist)
- # Construct URL
- url = SimaFM.root_url + SimaFM.request.get('similar')
- self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
- self._fetch()
- # TODO: controls name encoding
- elem = self.current_element
- for art in elem.getiterator(tag='artist'):
- yield str(art.findtext('name')), 100 * float(art.findtext('match'))
-
- def get_toptracks(self, artist=None):
- """
- """
- self._controls_artist(artist)
- # Construct URL
- url = SimaFM.root_url + SimaFM.request.get('top')
- self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
- self._fetch()
- # TODO: controls name encoding
- elem = self.current_element
- for track in elem.getiterator(tag='track'):
- yield str(track.findtext('name')), int(track.attrib.get('rank'))
-
- def get_similartracks(self, track=None, artist=None):
- """
- """
+ payload = self._forge_payload(artist)
# Construct URL
- url = SimaFM.root_url + SimaFM.request.get('track')
- self._url = url % (urllib.parse.quote(artist.encode('UTF-8'), safe=''),
- urllib.parse.quote(track.encode('UTF-8'), safe=''))
- self._fetch()
- elem = self.current_element
- for trk in elem.getiterator(tag='track'):
- yield (str(trk.findtext('artist/name')),
- str(trk.findtext('name')),
- 100 * float(trk.findtext('match')))
-
- def get_mbid(self, artist=None):
- """
+ ans = self._fetch(payload)
+ # Artist might be found be return no 'artist' list…
+ # cf. "Mulatu Astatqe" vs. "Mulatu Astatqé" with autocorrect=0
+ # json format is broken IMHO, xml is more consistent IIRC
+ # Here what we got:
+ # >>> {"similarartists":{"#text":"\n","artist":"Mulatu Astatqe"}}
+ # autocorrect=1 should fix it, checking anyway.
+ simarts = ans.get('similarartists').get('artist')
+ if not isinstance(simarts, list):
+ raise WSError('Artist found but no similarities returned')
+ for art in ans.get('similarartists').get('artist'):
+ yield Artist(name=art.get('name'), mbid=art.get('mbid', None))
+
+ def get_toptrack(self, artist=None):
+ """Fetch artist top tracks
"""
- self._controls_artist(artist)
- # Construct URL
- url = SimaFM.root_url + SimaFM.request.get('info')
- self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
- self._fetch()
- # TODO: controls name encoding
- elem = self.current_element
- return str(elem.find('artist').findtext('mbid'))
-
-
-def run():
- test = SimaFM()
- for t, a, m in test.get_similartracks(artist='Nirvana', track='Smells Like Teen Spirit'):
- print(a, t, m)
- return
-
-if __name__ == '__main__':
- try:
- run()
- except XmlFMHTTPError as conn_err:
- print("error trying to connect: %s" % conn_err)
- except XmlFMNotFound as not_found:
- print("looks like no artists were found: %s" % not_found)
- except XmlFMError as err:
- print(err)
-
+ payload = self._forge_payload(artist, method='top')
+ ans = self._fetch(payload)
+ tops = ans.get('toptracks').get('track')
+ art = {
+ 'artist': artist.name,
+ 'musicbrainz_artistid': artist.mbid,
+ }
+ for song in tops:
+ for key in ['artist', 'streamable', 'listeners',
+ 'url', 'image', '@attr']:
+ if key in song:
+ song.pop(key)
+ song.update(art)
+ song.update(title=song.pop('name'))
+ song.update(time=song.pop('duration'))
+ yield Track(**song)
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab