# -*- coding: utf-8 -*-
+"""WebServices API credentials and ressources
+"""
LFM = {'apikey': 'NG4xcDlxcXJwMjk4MTZycTgwM3E3b3I5MTEzb240cG8',
'host':'ws.audioscrobbler.com',
from .lib.track import Track
from .lib.meta import Album
from .lib.simastr import SimaStr
+from .utils.leven import levenshtein_ratio
class PlayerError(Exception):
if bl_getter(elem, add_not=True):
cls.log.info('Blacklisted: {0}'.format(elem))
results.remove(elem)
+ if track and cls.database.get_bl_album(elem, add_not=True):
+ # filter album as well in track mode
+ # (artist have already been)
+ cls.log.info('Blacklisted: {0}'.format(elem))
+ results.remove(elem)
return results
return wrapper
return decorated
or not hasattr(old_curr, 'id')
or not hasattr(self.current, 'id')):
return False
- return (self.current.id != old_curr.id) # pylint: disable=no-member
+ return self.current.id != old_curr.id # pylint: disable=no-member
def _flush_cache(self):
"""
return self.find('artist', artist, 'title', title)
return self.find('artist', artist)
+ @blacklist(track=True)
+ def fuzzy_find_track(self, artist, title):
+ # Retrieve all tracks from artist
+ all_tracks = self.find('artist', artist)
+ # Get all titles (filter missing titles set to 'None')
+ all_artist_titles = frozenset([tr.title for tr in all_tracks
+ if tr.title is not None])
+ match = get_close_matches(title, all_artist_titles, 50, 0.78)
+ if not match:
+ return []
+ for title_ in match:
+ leven = levenshtein_ratio(title.lower(), title_.lower())
+ if leven == 1:
+ pass
+ elif leven >= 0.79: # PARAM
+ self.log.debug('title: "%s" should match "%s" (lr=%1.3f)' %
+ (title_, title, leven))
+ else:
+ self.log.debug('title: "%s" does not match "%s" (lr=%1.3f)' %
+ (title_, title, leven))
+ return []
+ return self.find('artist', artist, 'title', title_)
+
@blacklist(artist=True)
def fuzzy_find_artist(self, art):
"""
albums.append(Album(name=album, **kwalbart))
for album in self.list('album', 'artist', artist):
album_trks = [trk for trk in self.find('album', album)]
- # TODO: add a VA filter option
if 'Various Artists' in [tr.albumartist for tr in album_trks]:
self.log.debug('Discarding {0} ("Various Artists" set)'.format(album))
continue
if len(set(arts)) < 2: # TODO: better heuristic, use a ratio instead
if album not in albums:
albums.append(Album(name=album, albumartist=artist))
- elif (album and album not in albums):
+ elif album and album not in albums:
self.log.debug('"{0}" probably not an album of "{1}"'.format(
album, artist) + '({0})'.format('/'.join(arts)))
return albums
def queue(self):
plst = self.playlist
plst.reverse()
- return [ trk for trk in plst if int(trk.pos) > int(self.current.pos)]
+ return [trk for trk in plst if int(trk.pos) > int(self.current.pos)]
@property
def playlist(self):
from sima import ECH
from sima.lib.meta import Artist
+from sima.lib.track import Track
from sima.utils.utils import WSError, WSNotFound, WSTimeout, WSHTTPError
from sima.utils.utils import getws, Throttle, Cache, purge_cache
if len(ECH.get('apikey')) == 23: # simple hack allowing imp.reload
timeout=SOCKET_TIMEOUT)
self.__class__.ratelimit = req.headers.get('x-ratelimit-remaining', None)
if req.status_code is not 200:
- raise WSHTTPError(req.status_code)
+ raise WSHTTPError('{0.status_code}: {0.reason}'.format(req))
self.current_element = req.json()
self._controls_answer()
if self.caching:
raise WSNotFound('Artist not found: "{0}"'.format(self.artist))
raise WSError(status.get('message'))
- def _forge_payload(self, artist):
+ def _forge_payload(self, artist, top=False):
"""Build payload
"""
payload = {'api_key': ECH.get('apikey')}
payload.update(name=artist.name)
payload.update(bucket='id:musicbrainz')
payload.update(results=100)
+ if top:
+ if artist.mbid:
+ aid = payload.pop('id')
+ payload.update(artist_id=aid)
+ else:
+ name = payload.pop('name')
+ payload.update(artist=name)
+ payload.update(results=100)
+ payload.update(sort='song_hotttnesss-desc')
return payload
def get_similar(self, artist=None):
).lstrip('musicbrainz:artist:')
yield Artist(mbid=mbid, name=art.get('name'))
+ def get_toptrack(self, artist=None):
+ """Fetch artist top tracks
+ """
+ payload = self._forge_payload(artist, top=True)
+ # Construct URL
+ self._ressource = '{0}/song/search'.format(SimaEch.root_url)
+ self._fetch(payload)
+ titles = list()
+ artist = {
+ 'artist': artist.name,
+ 'musicbrainz_artistid': artist.mbid,
+ }
+ for song in self.current_element.get('response').get('songs'):
+ title = song.get('title')
+ if title not in titles:
+ titles.append(title)
+ yield Track(title=title, **artist )
+
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab
timeout=SOCKET_TIMEOUT)
#self.__class__.ratelimit = req.headers.get('x-ratelimit-remaining', None)
if req.status_code is not 200:
- raise WSHTTPError(req.status_code)
+ raise WSHTTPError('{0.status_code}: {0.reason}'.format(req))
self.current_element = req.json()
self._controls_answer()
if self.caching:
artist = tracks[0].artist
black_list = self.player.queue + self.to_add
not_in_hist = list(set(tracks) - set(self.get_history(artist=artist)))
- if not not_in_hist:
+ if self.plugin_conf.get('queue_mode') != 'top' and not not_in_hist:
self.log.debug('All tracks already played for "{}"'.format(artist))
random.shuffle(not_in_hist)
- #candidate = [ trk for trk in not_in_hist if trk not in black_list
- #if not self.sdb.get_bl_track(trk, add_not=True)]
candidate = []
for trk in [_ for _ in not_in_hist if _ not in black_list]:
- if self.sdb.get_bl_track(trk, add_not=True):
- self.log.info('Blacklisted: {0}: '.format(trk))
- continue
- if self.sdb.get_bl_album(trk, add_not=True):
- self.log.info('Blacklisted album: {0}: '.format(trk))
- continue
# Should use albumartist heuristic as well
if self.plugin_conf.getboolean('single_album'):
if (trk.album == self.player.current.album or
continue
candidate.append(trk)
if not candidate:
- self.log.debug('Unable to find title to add' +
- ' for "%s".' % artist)
- return None
+ return False
self.to_add.append(random.choice(candidate))
+ return True
def _get_artists_list_reorg(self, alist):
"""
art_not_in_hist = [ ar for ar in alist if ar not in art_in_hist ]
random.shuffle(art_not_in_hist)
art_not_in_hist.extend(art_in_hist)
- self.log.debug('history ordered: {}'.format(
+ self.log.info('{}'.format(
' / '.join(art_not_in_hist)))
return art_not_in_hist
results.extend(self.player.fuzzy_find_artist(art_pop))
return results
- def lfm_similar_artists(self, artist=None):
+ def ws_similar_artists(self, artist=None):
"""
Retrieve similar artists from WebServive.
"""
self.log.debug(
'Looking for artist similar to "{0.artist}" as well'.format(
artist))
- similar = self.lfm_similar_artists(artist=artist)
+ similar = self.ws_similar_artists(artist=artist)
if not similar:
return ret_extra
ret_extra.extend(self.get_artists_from_player(similar))
"""
current = self.player.current
self.log.info('Looking for artist similar to "{0.artist}"'.format(current))
- similar = self.lfm_similar_artists()
+ similar = self.ws_similar_artists()
if not similar:
self.log.info('Got nothing from {0}!'.format(self.ws.name))
return []
self.log.warning('Try running in debug mode to guess why...')
return []
self.log.info('Got {} artists in library'.format(len(ret)))
- self.log.info(' / '.join(ret))
# Move around similars items to get in unplayed|not recently played
# artist first.
return self._get_artists_list_reorg(ret)
if nb_album_add == target_album_to_add:
return True
+ def find_top(self, artists):
+ """
+ find top tracks for artists in artists list.
+ """
+ self.to_add = list()
+ nbtracks_target = self.plugin_conf.getint('track_to_add')
+ webserv = self.ws()
+ for artist in artists:
+ artist = Artist(name=artist)
+ if len(self.to_add) == nbtracks_target:
+ return True
+ self.log.info('Looking for a top track for {0}'.format(artist))
+ titles = deque()
+ try:
+ titles = [t for t in webserv.get_toptrack(artist)]
+ except WSError as err:
+ self.log.warning('{0}: {1}'.format(self.ws.name, err))
+ if self.ws.ratelimit:
+ self.log.info('{0.name} ratelimit: {0.ratelimit}'.format(self.ws))
+ for trk in titles:
+ found = self.player.fuzzy_find_track(artist.name, trk.title)
+ if found:
+ self.log.debug('{0}'.format(found[0]))
+ if self.filter_track(found):
+ break
+
def _track(self):
"""Get some tracks for track queue mode
"""
def _top(self):
"""Get some tracks for top track queue mode
"""
- #artists = self.get_local_similar_artists()
- pass
+ artists = self.get_local_similar_artists()
+ nbtracks_target = self.plugin_conf.getint('track_to_add')
+ self.find_top(artists)
+ for track in self.to_add:
+ self.log.info('{1} candidates: {0!s}'.format(track, self.ws.name))
def callback_need_track(self):
self._cleanup_cache()