-
- def __init__(self, expression):
- self.expression = expression
-
- def __str__(self):
- return repr(self.expression)
-
-
-class EncodingError(XmlFMError):
- """Raised when string is not unicode"""
- pass
-
-
-class XmlFMHTTPError(XmlFMError):
- """Raised when failed to connect server"""
-
- def __init__(self, expression):
- if hasattr(expression, 'code'):
- self.expression = 'error %d: %s' % (expression.code,
- expression.msg)
- else:
- self.expression = 'error: %s' % expression
-
-
-class XmlFMNotFound(XmlFMError):
- """Raised when no artist is found"""
-
- def __init__(self, message=None):
- if not message:
- message = 'Artist probably not found (http error 400)'
- self.expression = (message)
-
-
-class XmlFMMissingArtist(XmlFMError):
- """Raised when no artist name provided"""
-
- def __init__(self, message=None):
- if not message:
- message = 'Missing artist name.'
- self.expression = (message)
-
-
-class XmlFMTimeOut(XmlFMError):
- """Raised when urlopen times out"""
-
- def __init__(self, message=None):
- if not message:
- message = 'Connection to last.fm web services times out!'
- self.expression = (message)
-
-
-class Throttle():
- def __init__(self, wait):
- self.wait = wait
- self.last_called = datetime.now()
-
- def __call__(self, func):
- def wrapper(*args, **kwargs):
- while self.last_called + self.wait > datetime.now():
- #print('waiting…')
- sleep(0.1)
- result = func(*args, **kwargs)
- self.last_called = datetime.now()
- return result
- return wrapper
-
-
-class AudioScrobblerCache():
- def __init__(self, elem, last):
- self.elemtree = elem
- self.requestdate = last
-
- def created(self):
- return self.requestdate
-
- def gettree(self):
- return self.elemtree
-
-
-class SimaFM():
- """
- """
- api_key = '4a1c9ddec29816ed803d7be9113ba4cb'
- host = 'ws.audioscrobbler.com'
- version = '2.0'
- root_url = 'http://%s/%s/' % (host, version)
- request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
- 'api_key=%s' % api_key,
- 'top': '?method=artist.gettoptracks&artist=%s&' +\
- 'api_key=%s' % api_key,
- 'track': '?method=track.getsimilar&artist=%s' +\
- '&track=%s' + '&api_key=%s' % api_key,
- 'info': '?method=artist.getinfo&artist=%s' +\
- '&api_key=%s' % api_key,
- })
- cache = dict({})
- timestamp = datetime.utcnow()
- count = 0
-
- def __init__(self, artist=None, cache=True):
- self._url = None
- #SimaFM.count += 1
- self.current_element = None
- self.caching = cache
- self.purge_cache()
-
- def _is_in_cache(self):
- """Controls presence of url in cache.
- """
- if self._url in SimaFM.cache:
- #print('already fetch {0}'.format(self.artist))
- return True
- return False
-
- def _fetch(self):
- """Use cached elements or proceed http request"""
- if self._is_in_cache():
- self.current_element = SimaFM.cache.get(self._url).gettree()
- return
- self._fetch_lfm()
-
- @Throttle(WAIT_BETWEEN_REQUESTS)
- def _fetch_lfm(self):
- """Get artists, fetch xml from last.fm"""
- try:
- fd = urllib.request.urlopen(url=self._url,
- timeout=15)
- except SocketTimeOut:
- raise XmlFMTimeOut()
- except BadStatusLine as err:
- raise XmlFMHTTPError(err)
- except urllib.error.URLError as err:
- if hasattr(err, 'reason'):
- # URLError, failed to reach server
- raise XmlFMError(repr(err.reason))
- if hasattr(err, 'code'):
- # HTTPError, the server couldn't fulfill the request
- if err.code == 400:
- raise XmlFMNotFound()
- raise XmlFMHTTPError(err)
- raise XmlFMError(err)
- headers = dict(fd.getheaders())
- content_type = headers.get('Content-Type').split(';')
- if content_type[0] != "text/xml":
- raise XmlFMError('None XML returned from the server')
- if content_type[1].strip() != "charset=utf-8":
- raise XmlFMError('XML not UTF-8 encoded!')
- try:
- self.current_element = ElementTree(file=fd)
- except SocketTimeOut:
- raise XmlFMTimeOut()
- finally:
- fd.close()
- self._controls_lfm_answer()
- if self.caching:
- SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element,
- datetime.utcnow())
-
- def _controls_lfm_answer(self):
- """Controls last.fm answer.
- """
- status = self.current_element.getroot().attrib.get('status')
- if status == 'ok':
- return True
- if status == 'failed':
- error = self.current_element.find('error').attrib.get('code')
- errormsg = self.current_element.findtext('error')
- #if error in LFM_ERRORS.keys():
- # print LFM_ERRORS.get(error)
- raise XmlFMNotFound(errormsg)
-
- def _controls_artist(self, artist):