- request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
- 'api_key={apikey}'.format(**LFM),
- 'top': '?method=artist.gettoptracks&artist=%s&' +\
- 'api_key={apikey}'.format(**LFM),
- 'track': '?method=track.getsimilar&artist=%s' +\
- '&track=%s' + 'api_key={apikey}'.format(**LFM),
- 'info': '?method=artist.getinfo&artist=%s' +\
- 'api_key={apikey}'.format(**LFM),
- })
- payloads = dict({'similar': {'method':'artist.getsimilar',
- 'artist':None, 'api_key':LFM.get('apikey'),},
- 'top': {'method':'artist.gettoptracks',
- 'artist':None, 'api_key':LFM.get('apikey'),},
- 'track': {'method':'track.getsimilar',
- 'artist':None, 'track':None,
- 'api_key':LFM.get('apikey'),},
- 'info': {'method':'artist.getinfo', 'artist':None,
- 'api_key':LFM.get('apikey'),},
- })
- cache = dict({})
- timestamp = datetime.utcnow()
- count = 0
-
- def __init__(self, artist=None, cache=True):
- self._url = None
- #SimaFM.count += 1
- self.current_element = None
- self.caching = cache
- self.purge_cache()
-
- def _is_in_cache(self):
- """Controls presence of url in cache.
- """
- if self._url in SimaFM.cache:
- #print('already fetch {0}'.format(self.artist))
- return True
- return False
-
- def _fetch(self):
- """Use cached elements or proceed http request"""
- if self._is_in_cache():
- self.current_element = SimaFM.cache.get(self._url).gettree()
- return
- self._fetch_lfm()
-
- @Throttle(WAIT_BETWEEN_REQUESTS)
- def _fetch_ws(self):
- pass
-
- @Throttle(WAIT_BETWEEN_REQUESTS)
- def _fetch_lfm(self):
- """Get artists, fetch xml from last.fm"""
- try:
- fd = urllib.request.urlopen(url=self._url,
- timeout=15)
- except SocketTimeOut:
- raise XmlFMTimeOut()
- except BadStatusLine as err:
- raise XmlFMHTTPError(err)
- except urllib.error.URLError as err:
- if hasattr(err, 'reason'):
- # URLError, failed to reach server
- raise XmlFMError(repr(err.reason))
- if hasattr(err, 'code'):
- # HTTPError, the server couldn't fulfill the request
- if err.code == 400:
- raise XmlFMNotFound()
- raise XmlFMHTTPError(err)
- raise XmlFMError(err)
- headers = dict(fd.getheaders())
- content_type = headers.get('Content-Type').split(';')
- if content_type[0] != "text/xml":
- raise XmlFMError('None XML returned from the server')
- if content_type[1].strip() != "charset=utf-8":
- raise XmlFMError('XML not UTF-8 encoded!')
- try:
- self.current_element = ElementTree(file=fd)
- except SocketTimeOut:
- raise XmlFMTimeOut()
- finally:
- fd.close()
- self._controls_lfm_answer()
- if self.caching:
- SimaFM.cache[self._url] = Cache(self.current_element)
-
- def _controls_lfm_answer(self):
- """Controls last.fm answer.
- """
- status = self.current_element.getroot().attrib.get('status')
- if status == 'ok':
- return True
- if status == 'failed':
- error = self.current_element.find('error').attrib.get('code')
- errormsg = self.current_element.findtext('error')
- raise XmlFMNotFound(errormsg)
-
- def _controls_artist(self, artist):