X-Git-Url: http://git.kaliko.me/?a=blobdiff_plain;f=sima%2Flib%2Fsimafm.py;h=18625a6761bbc80c4aee7e060e90918193d0696a;hb=24993cd99b847733894f57fd004ed52b5390244a;hp=79184037daa66d71447effd86a48cc7a40d6016f;hpb=c1bda032095902bdcd183c530a9c4de28f3c828a;p=mpd-sima.git diff --git a/sima/lib/simafm.py b/sima/lib/simafm.py index 7918403..18625a6 100644 --- a/sima/lib/simafm.py +++ b/sima/lib/simafm.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko -# Copyright (c) 2010 Eric Casteleijn (Throttle decorator) +# Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Jack Kaliko # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,291 +18,114 @@ # """ -Consume last.fm web service - +Consume EchoNest web service """ -__version__ = '0.3.0' +__version__ = '0.5.0' __author__ = 'Jack Kaliko' -import urllib.request, urllib.error, urllib.parse - from datetime import datetime, timedelta -from http.client import BadStatusLine -from socket import timeout as SocketTimeOut -from time import sleep -from xml.etree.cElementTree import ElementTree - -# Some definitions -WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4) -LFM_ERRORS = dict({'2': 'Invalid service -This service does not exist', - '3': 'Invalid Method - No method with that name in this package', - '4': 'Authentication Failed - You do not have permissions to access the service', - '5': "'Invalid format - This service doesn't exist in that format", - '6': 'Invalid parameters - Your request is missing a required parameter', - '7': 'Invalid resource specified', - '9': 'Invalid session key - Please re-authenticate', - '10': 'Invalid API key - You must be granted a valid key by last.fm', - '11': 'Service Offline - This service is temporarily offline. Try again later.', - '12': 'Subscription Error - The user needs to be subscribed in order to do that', - '13': 'Invalid method signature supplied', - '26': 'Suspended API key - Access for your account has been suspended, please contact Last.fm', - }) - - -class XmlFMError(Exception): # Errors - """ - Exception raised for errors in the input. - """ - - def __init__(self, expression): - self.expression = expression - - def __str__(self): - return repr(self.expression) - - -class EncodingError(XmlFMError): - """Raised when string is not unicode""" - pass - - -class XmlFMHTTPError(XmlFMError): - """Raised when failed to connect server""" - - def __init__(self, expression): - if hasattr(expression, 'code'): - self.expression = 'error %d: %s' % (expression.code, - expression.msg) - else: - self.expression = 'error: %s' % expression - - -class XmlFMNotFound(XmlFMError): - """Raised when no artist is found""" - - def __init__(self, message=None): - if not message: - message = 'Artist probably not found (http error 400)' - self.expression = (message) - - -class XmlFMMissingArtist(XmlFMError): - """Raised when no artist name provided""" - - def __init__(self, message=None): - if not message: - message = 'Missing artist name.' - self.expression = (message) - - -class XmlFMTimeOut(XmlFMError): - """Raised when urlopen times out""" - - def __init__(self, message=None): - if not message: - message = 'Connection to last.fm web services times out!' - self.expression = (message) +from requests import get, Request, Timeout, ConnectionError -class Throttle(): - def __init__(self, wait): - self.wait = wait - self.last_called = datetime.now() +from sima import LFM +from sima.lib.meta import Artist +from sima.utils.utils import WSError, WSNotFound, WSTimeout, WSHTTPError +from sima.utils.utils import getws, Throttle, Cache, purge_cache +if len(LFM.get('apikey')) == 43: # simple hack allowing imp.reload + getws(LFM) - def __call__(self, func): - def wrapper(*args, **kwargs): - while self.last_called + self.wait > datetime.now(): - #print('waiting…') - sleep(0.1) - result = func(*args, **kwargs) - self.last_called = datetime.now() - return result - return wrapper - - -class AudioScrobblerCache(): - def __init__(self, elem, last): - self.elemtree = elem - self.requestdate = last - - def created(self): - return self.requestdate - - def gettree(self): - return self.elemtree +# Some definitions +WAIT_BETWEEN_REQUESTS = timedelta(0, 1) +SOCKET_TIMEOUT = 6 class SimaFM(): """ """ - api_key = '4a1c9ddec29816ed803d7be9113ba4cb' - host = 'ws.audioscrobbler.com' - version = '2.0' - root_url = 'http://%s/%s/' % (host, version) - request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\ - 'api_key=%s' % api_key, - 'top': '?method=artist.gettoptracks&artist=%s&' +\ - 'api_key=%s' % api_key, - 'track': '?method=track.getsimilar&artist=%s' +\ - '&track=%s' + '&api_key=%s' % api_key, - 'info': '?method=artist.getinfo&artist=%s' +\ - '&api_key=%s' % api_key, - }) - cache = dict({}) + root_url = 'http://{host}/{version}/'.format(**LFM) + cache = {} timestamp = datetime.utcnow() - count = 0 + name = 'Last.fm' + ratelimit = None - def __init__(self, artist=None, cache=True): - self._url = None - #SimaFM.count += 1 + def __init__(self, cache=True): + self.artist = None + self._url = self.__class__.root_url self.current_element = None self.caching = cache - self.purge_cache() - - def _is_in_cache(self): - """Controls presence of url in cache. - """ - if self._url in SimaFM.cache: - #print('already fetch {0}'.format(self.artist)) - return True - return False + purge_cache(self.__class__) - def _fetch(self): + def _fetch(self, payload): """Use cached elements or proceed http request""" - if self._is_in_cache(): - self.current_element = SimaFM.cache.get(self._url).gettree() + url = Request('GET', self._url, params=payload,).prepare().url + if url in SimaFM.cache: + self.current_element = SimaFM.cache.get(url).elem return - self._fetch_lfm() + try: + self._fetch_ech(payload) + except Timeout: + raise WSTimeout('Failed to reach server within {0}s'.format( + SOCKET_TIMEOUT)) + except ConnectionError as err: + raise WSError(err) @Throttle(WAIT_BETWEEN_REQUESTS) - def _fetch_lfm(self): - """Get artists, fetch xml from last.fm""" - try: - fd = urllib.request.urlopen(url=self._url, - timeout=15) - except SocketTimeOut: - raise XmlFMTimeOut() - except BadStatusLine as err: - raise XmlFMHTTPError(err) - except urllib.error.URLError as err: - if hasattr(err, 'reason'): - # URLError, failed to reach server - raise XmlFMError(repr(err.reason)) - if hasattr(err, 'code'): - # HTTPError, the server couldn't fulfill the request - if err.code == 400: - raise XmlFMNotFound() - raise XmlFMHTTPError(err) - raise XmlFMError(err) - headers = dict(fd.getheaders()) - content_type = headers.get('Content-Type').split(';') - if content_type[0] != "text/xml": - raise XmlFMError('None XML returned from the server') - if content_type[1].strip() != "charset=utf-8": - raise XmlFMError('XML not UTF-8 encoded!') - try: - self.current_element = ElementTree(file=fd) - except SocketTimeOut: - raise XmlFMTimeOut() - finally: - fd.close() - self._controls_lfm_answer() + def _fetch_ech(self, payload): + """fetch from web service""" + req = get(self._url, params=payload, + timeout=SOCKET_TIMEOUT) + #self.__class__.ratelimit = req.headers.get('x-ratelimit-remaining', None) + if req.status_code is not 200: + raise WSHTTPError(req.status_code) + self.current_element = req.json() + self._controls_answer() if self.caching: - SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element, - datetime.utcnow()) + SimaFM.cache.update({req.url: + Cache(self.current_element)}) - def _controls_lfm_answer(self): - """Controls last.fm answer. + def _controls_answer(self): + """Controls answer. """ - status = self.current_element.getroot().attrib.get('status') - if status == 'ok': - return True - if status == 'failed': - error = self.current_element.find('error').attrib.get('code') - errormsg = self.current_element.findtext('error') - #if error in LFM_ERRORS.keys(): - # print LFM_ERRORS.get(error) - raise XmlFMNotFound(errormsg) - - def _controls_artist(self, artist): + if 'error' in self.current_element: + code = self.current_element.get('error') + mess = self.current_element.get('message') + if code == 6: + raise WSNotFound('{0}: "{1}"'.format(mess, self.artist)) + raise WSError(mess) + return True + + def _forge_payload(self, artist, method='similar', track=None): """ """ + payloads = dict({'similar': {'method':'artist.getsimilar',}, + 'top': {'method':'artist.gettoptracks',}, + 'track': {'method':'track.getsimilar',}, + 'info': {'method':'artist.getinfo',}, + }) + payload = payloads.get(method) + payload.update(api_key=LFM.get('apikey'), format='json') + if not isinstance(artist, Artist): + raise TypeError('"{0!r}" not an Artist object'.format(artist)) self.artist = artist - if not self.artist: - raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_()') - if not isinstance(self.artist, str): - raise EncodingError('"%s" not unicode object' % self.artist) - # last.fm is UTF-8 encoded URL - self.artist_utf8 = self.artist.encode('UTF-8') - - def purge_cache(self, age=4): - now = datetime.utcnow() - if now.hour == SimaFM.timestamp.hour: - return - SimaFM.timestamp = datetime.utcnow() - cache = SimaFM.cache - delta = timedelta(hours=age) - for url in list(cache.keys()): - timestamp = cache.get(url).created() - if now - timestamp > delta: - cache.pop(url) + if artist.mbid: + payload.update(mbid='{0}'.format(artist.mbid)) + else: + payload.update(artist=artist.name) + payload.update(results=100) + if method == 'track': + payload.update(track=track) + return payload def get_similar(self, artist=None): """ """ - self._controls_artist(artist) - # Construct URL - url = SimaFM.root_url + SimaFM.request.get('similar') - self._url = url % (urllib.parse.quote(self.artist_utf8, safe='')) - self._fetch() - # TODO: controls name encoding - elem = self.current_element - for art in elem.getiterator(tag='artist'): - yield str(art.findtext('name')), 100 * float(art.findtext('match')) - - def get_toptracks(self, artist=None): - """ - """ - self._controls_artist(artist) - # Construct URL - url = SimaFM.root_url + SimaFM.request.get('top') - self._url = url % (urllib.parse.quote(self.artist_utf8, safe='')) - self._fetch() - # TODO: controls name encoding - elem = self.current_element - for track in elem.getiterator(tag='track'): - yield str(track.findtext('name')), int(track.attrib.get('rank')) - - def get_mbid(self, artist=None): - """ - """ - self._controls_artist(artist) + payload = self._forge_payload(artist) # Construct URL - url = SimaFM.root_url + SimaFM.request.get('info') - self._url = url % (urllib.parse.quote(self.artist_utf8, safe='')) - self._fetch() - # TODO: controls name encoding - elem = self.current_element - return str(elem.find('artist').findtext('mbid')) - - -def run(): - test = SimaFM() - for a, m in test.get_similar(artist='Tool'): - pass - return - -if __name__ == '__main__': - try: - run() - except XmlFMHTTPError as conn_err: - print("error trying to connect: %s" % conn_err) - except XmlFMNotFound as not_found: - print("looks like no artists were found: %s" % not_found) - except XmlFMError as err: - print(err) + self._fetch(payload) + for art in self.current_element.get('similarartists').get('artist'): + yield Artist(name=art.get('name'), mbid=art.get('mbid', None)) # VIM MODLINE