X-Git-Url: https://git.kaliko.me/?a=blobdiff_plain;f=sima%2Flib%2Fsimafm.py;h=5103aceac9455a9ac5d8a39e8fc10f5812151b3f;hb=a260ebea93f23d72aa6e0178744b0f64c469b7ba;hp=12ca0d5c4a1547bd50f8a7d3c345f02dd08da4fe;hpb=9a37ebf816f88bbf94f45afa4dcb8a40b8b5c369;p=mpd-sima.git diff --git a/sima/lib/simafm.py b/sima/lib/simafm.py index 12ca0d5..5103ace 100644 --- a/sima/lib/simafm.py +++ b/sima/lib/simafm.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko -# Copyright (c) 2010 Eric Casteleijn (Throttle decorator) +# Copyright (c) 2009-2014, 2021 kaliko # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,306 +18,126 @@ # """ -Consume last.fm web service - +Consume Last.fm web service """ -__version__ = '0.3.0' +__version__ = '0.5.1' __author__ = 'Jack Kaliko' -import urllib.request, urllib.error, urllib.parse - -from datetime import datetime, timedelta -from http.client import BadStatusLine -from socket import timeout as SocketTimeOut -from time import sleep -from xml.etree.cElementTree import ElementTree +from sima import LFM +from sima.lib.meta import Artist +from sima.lib.track import Track -# Some definitions -WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4) -LFM_ERRORS = dict({'2': 'Invalid service -This service does not exist', - '3': 'Invalid Method - No method with that name in this package', - '4': 'Authentication Failed - You do not have permissions to access the service', - '5': "'Invalid format - This service doesn't exist in that format", - '6': 'Invalid parameters - Your request is missing a required parameter', - '7': 'Invalid resource specified', - '9': 'Invalid session key - Please re-authenticate', - '10': 'Invalid API key - You must be granted a valid key by last.fm', - '11': 'Service Offline - This service is temporarily offline. Try again later.', - '12': 'Subscription Error - The user needs to be subscribed in order to do that', - '13': 'Invalid method signature supplied', - '26': 'Suspended API key - Access for your account has been suspended, please contact Last.fm', - }) +from sima.lib.http import HttpClient +from sima.utils.utils import WSError, WSNotFound +from sima.utils.utils import getws +if len(LFM.get('apikey')) == 43: # simple hack allowing imp.reload + getws(LFM) -class XmlFMError(Exception): # Errors +class SimaFM: + """Last.fm http client """ - Exception raised for errors in the input. - """ - - def __init__(self, expression): - self.expression = expression - - def __str__(self): - return repr(self.expression) - - -class EncodingError(XmlFMError): - """Raised when string is not unicode""" - pass - - -class XmlFMHTTPError(XmlFMError): - """Raised when failed to connect server""" - - def __init__(self, expression): - if hasattr(expression, 'code'): - self.expression = 'error %d: %s' % (expression.code, - expression.msg) - else: - self.expression = 'error: %s' % expression - - -class XmlFMNotFound(XmlFMError): - """Raised when no artist is found""" - - def __init__(self, message=None): - if not message: - message = 'Artist probably not found (http error 400)' - self.expression = (message) - + root_url = 'http://{host}/{version}/'.format(**LFM) + name = 'Last.fm' + cache = False + """HTTP cache to use, in memory or persitent. -class XmlFMMissingArtist(XmlFMError): - """Raised when no artist name provided""" - - def __init__(self, message=None): - if not message: - message = 'Missing artist name.' - self.expression = (message) - - -class XmlFMTimeOut(XmlFMError): - """Raised when urlopen times out""" - - def __init__(self, message=None): - if not message: - message = 'Connection to last.fm web services times out!' - self.expression = (message) - - -class Throttle(): - def __init__(self, wait): - self.wait = wait - self.last_called = datetime.now() - - def __call__(self, func): - def wrapper(*args, **kwargs): - while self.last_called + self.wait > datetime.now(): - #print('waiting…') - sleep(0.1) - result = func(*args, **kwargs) - self.last_called = datetime.now() - return result - return wrapper - - -class AudioScrobblerCache(): - def __init__(self, elem, last): - self.elemtree = elem - self.requestdate = last - - def created(self): - return self.requestdate - - def gettree(self): - return self.elemtree - - -class SimaFM(): - """ + :param BaseCache cache: Set a cache, defaults to `False`. """ - api_key = '4a1c9ddec29816ed803d7be9113ba4cb' - host = 'ws.audioscrobbler.com' - version = '2.0' - root_url = 'http://%s/%s/' % (host, version) - request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\ - 'api_key=%s' % api_key, - 'top': '?method=artist.gettoptracks&artist=%s&' +\ - 'api_key=%s' % api_key, - 'track': '?method=track.getsimilar&artist=%s' +\ - '&track=%s' + '&api_key=%s' % api_key, - 'info': '?method=artist.getinfo&artist=%s' +\ - '&api_key=%s' % api_key, - }) - cache = dict({}) - timestamp = datetime.utcnow() - count = 0 + stats = {'etag': 0, + 'ccontrol': 0, + 'total': 0} - def __init__(self, artist=None, cache=True): - self._url = None - #SimaFM.count += 1 - self.current_element = None - self.caching = cache - self.purge_cache() + def __init__(self): + self.http = HttpClient(cache=self.cache, stats=self.stats) + self.artist = None - def _is_in_cache(self): - """Controls presence of url in cache. - """ - if self._url in SimaFM.cache: - #print('already fetch {0}'.format(self.artist)) - return True - return False - - def _fetch(self): - """Use cached elements or proceed http request""" - if self._is_in_cache(): - self.current_element = SimaFM.cache.get(self._url).gettree() - return - self._fetch_lfm() - - @Throttle(WAIT_BETWEEN_REQUESTS) - def _fetch_lfm(self): - """Get artists, fetch xml from last.fm""" - try: - fd = urllib.request.urlopen(url=self._url, - timeout=15) - except SocketTimeOut: - raise XmlFMTimeOut() - except BadStatusLine as err: - raise XmlFMHTTPError(err) - except urllib.error.URLError as err: - if hasattr(err, 'reason'): - # URLError, failed to reach server - raise XmlFMError(repr(err.reason)) - if hasattr(err, 'code'): - # HTTPError, the server couldn't fulfill the request - if err.code == 400: - raise XmlFMNotFound() - raise XmlFMHTTPError(err) - raise XmlFMError(err) - headers = dict(fd.getheaders()) - content_type = headers.get('Content-Type').split(';') - if content_type[0] != "text/xml": - raise XmlFMError('None XML returned from the server') - if content_type[1].strip() != "charset=utf-8": - raise XmlFMError('XML not UTF-8 encoded!') - try: - self.current_element = ElementTree(file=fd) - except SocketTimeOut: - raise XmlFMTimeOut() - finally: - fd.close() - self._controls_lfm_answer() - if self.caching: - SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element, - datetime.utcnow()) - - def _controls_lfm_answer(self): - """Controls last.fm answer. - """ - status = self.current_element.getroot().attrib.get('status') - if status == 'ok': - return True - if status == 'failed': - error = self.current_element.find('error').attrib.get('code') - errormsg = self.current_element.findtext('error') - #if error in LFM_ERRORS.keys(): - # print LFM_ERRORS.get(error) - raise XmlFMNotFound(errormsg) - - def _controls_artist(self, artist): + def _controls_answer(self, ans): + """Controls answer. """ + if 'error' in ans: + code = ans.get('error') + mess = ans.get('message') + if code == 6: + raise WSNotFound(f'{mess}: "{self.artist}"') + raise WSError(mess) + return True + + def _forge_payload(self, artist, method='similar', track=None): + """Build payload """ + payloads = dict({'similar': {'method': 'artist.getsimilar',}, + 'top': {'method': 'artist.gettoptracks',}, + 'track': {'method': 'track.getsimilar',}, + 'info': {'method': 'artist.getinfo',}, + }) + payload = payloads.get(method) + payload.update(api_key=LFM.get('apikey'), format='json') + if not isinstance(artist, Artist): + raise TypeError(f'"{artist!r}" not an Artist object') self.artist = artist - if not self.artist: - raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_()') - if not isinstance(self.artist, str): - raise EncodingError('"%s" not unicode object' % self.artist) - # last.fm is UTF-8 encoded URL - self.artist_utf8 = self.artist.encode('UTF-8') - - def purge_cache(self, age=4): - now = datetime.utcnow() - if now.hour == SimaFM.timestamp.hour: - return - SimaFM.timestamp = datetime.utcnow() - cache = SimaFM.cache - delta = timedelta(hours=age) - for url in list(cache.keys()): - timestamp = cache.get(url).created() - if now - timestamp > delta: - cache.pop(url) - - def get_similar(self, artist=None): - """ - """ - self._controls_artist(artist) - # Construct URL - url = SimaFM.root_url + SimaFM.request.get('similar') - self._url = url % (urllib.parse.quote(self.artist_utf8, safe='')) - self._fetch() - # TODO: controls name encoding - elem = self.current_element - for art in elem.getiterator(tag='artist'): - yield str(art.findtext('name')), 100 * float(art.findtext('match')) - - def get_toptracks(self, artist=None): - """ - """ - self._controls_artist(artist) - # Construct URL - url = SimaFM.root_url + SimaFM.request.get('top') - self._url = url % (urllib.parse.quote(self.artist_utf8, safe='')) - self._fetch() - # TODO: controls name encoding - elem = self.current_element - for track in elem.getiterator(tag='track'): - yield str(track.findtext('name')), int(track.attrib.get('rank')) - - def get_similartracks(self, track=None, artist=None): - """ + if artist.mbid: + payload.update(mbid=f'{artist.mbid}') + else: + payload.update(artist=artist.name, + autocorrect=1) + payload.update(results=100) + if method == 'track': + payload.update(track=track) + # > hashing the URL into a cache key + # return a sorted list of 2-tuple to have consistent cache + return sorted(payload.items(), key=lambda param: param[0]) + + def get_similar(self, artist): + """Fetch similar artists + + :param sima.lib.meta.Artist artist: `Artist` to fetch similar artists from + :returns: generator of :class:`sima.lib.meta.Artist` """ + payload = self._forge_payload(artist) # Construct URL - url = SimaFM.root_url + SimaFM.request.get('track') - self._url = url % (urllib.parse.quote(artist.encode('UTF-8'), safe=''), - urllib.parse.quote(track.encode('UTF-8'), safe='')) - self._fetch() - elem = self.current_element - for trk in elem.getiterator(tag='track'): - yield (str(trk.findtext('artist/name')), - str(trk.findtext('name')), - 100 * float(trk.findtext('match'))) - - def get_mbid(self, artist=None): - """ + ans = self.http(self.root_url, payload) + try: + ans.json() + except ValueError as err: + # Corrupted/malformed cache? cf. gitlab issue #35 + raise WSError('Malformed json, try purging the cache: %s') from err + self._controls_answer(ans.json()) # pylint: disable=no-member + # Artist might be found but return no 'artist' list… + # cf. "Mulatu Astatqe" vs. "Mulatu Astatqé" with autocorrect=0 + # json format is broken IMHO, xml is more consistent IIRC + # Here what we got: + # >>> {"similarartists":{"#text":"\n","artist":"Mulatu Astatqe"}} + # autocorrect=1 should fix it, checking anyway. + simarts = ans.json().get('similarartists').get('artist') # pylint: disable=no-member + if not isinstance(simarts, list): + raise WSError('Artist found but no similarities returned') + for art in ans.json().get('similarartists').get('artist'): # pylint: disable=no-member + yield Artist(name=art.get('name'), mbid=art.get('mbid', None)) + + def get_toptrack(self, artist): + """Fetch artist top tracks + + :param sima.lib.meta.Artist artist: `Artist` to fetch top tracks from + :returns: generator of :class:`sima.lib.track.Track` """ - self._controls_artist(artist) - # Construct URL - url = SimaFM.root_url + SimaFM.request.get('info') - self._url = url % (urllib.parse.quote(self.artist_utf8, safe='')) - self._fetch() - # TODO: controls name encoding - elem = self.current_element - return str(elem.find('artist').findtext('mbid')) - - -def run(): - test = SimaFM() - for t, a, m in test.get_similartracks(artist='Nirvana', track='Smells Like Teen Spirit'): - print(a, t, m) - return - -if __name__ == '__main__': - try: - run() - except XmlFMHTTPError as conn_err: - print("error trying to connect: %s" % conn_err) - except XmlFMNotFound as not_found: - print("looks like no artists were found: %s" % not_found) - except XmlFMError as err: - print(err) - + payload = self._forge_payload(artist, method='top') + ans = self.http(self.root_url, payload) + self._controls_answer(ans.json()) # pylint: disable=no-member + tops = ans.json().get('toptracks').get('track') # pylint: disable=no-member + art = {'artist': artist.name, + 'musicbrainz_artistid': artist.mbid,} + for song in tops: + for key in ['artist', 'streamable', 'listeners', + 'url', 'image', '@attr']: + if key in song: + song.pop(key) + song.update(art) + song.update(title=song.pop('name')) + song.update(time=song.pop('duration', 0)) + yield Track(**song) # VIM MODLINE # vim: ai ts=4 sw=4 sts=4 expandtab