Fetching similar artists from last.fm web services
"""
-# standart library import
+# standard library import
import random
from collections import deque
-from itertools import dropwhile
from hashlib import md5
-# third parties componants
+# third parties components
# local import
from ...lib.plugin import Plugin
return wrapper
-def blacklist(artist=False, album=False, track=False):
- #pylint: disable=C0111,W0212
- field = (artist, album, track)
- def decorated(func):
- def wrapper(*args, **kwargs):
- cls = args[0]
- boolgen = (bl for bl in field)
- bl_fun = (cls._Plugin__daemon.sdb.get_bl_artist,
- cls._Plugin__daemon.sdb.get_bl_album,
- cls._Plugin__daemon.sdb.get_bl_track,)
- #bl_getter = next(fn for fn, bl in zip(bl_fun, boolgen) if bl is True)
- bl_getter = next(dropwhile(lambda _: not next(boolgen), bl_fun))
- cls.log.debug('using {0} as bl filter'.format(bl_getter.__name__))
- if artist:
- results = func(*args, **kwargs)
- for elem in results:
- if bl_getter(elem, add_not=True):
- cls.log.info('Blacklisted: {0}'.format(elem))
- results.remove(elem)
- return results
- if track:
- for elem in args[1]:
- if bl_getter(elem, add_not=True):
- cls.log.info('Blacklisted: {0}'.format(elem))
- args[1].remove(elem)
- return func(*args, **kwargs)
- return wrapper
- return decorated
-
-
class Lastfm(Plugin):
"""last.fm similar artists
"""
else:
self.log.info('Lastfm: Initialising cache!')
self._cache = {
- 'artists': None,
'asearch': dict(),
'tsearch': dict(),
}
- self._cache['artists'] = frozenset(self.player.list('artist'))
def _cleanup_cache(self):
"""Avoid bloated cache
# Should use albumartist heuristic as well
if self.plugin_conf.getboolean('single_album'):
if (trk.album == self.player.current.album or
- trk.album in [trk.alb for trk in self.to_add]):
+ trk.album in [tr.album for tr in self.to_add]):
self.log.debug('Found unplayed track ' +
'but from an album already queued: %s' % (trk))
continue
' / '.join(art_not_in_hist)))
return art_not_in_hist
- @blacklist(artist=True)
@cache
def get_artists_from_player(self, similarities):
"""
Look in player library for availability of similar artists in
similarities
"""
- dynamic = int(self.plugin_conf.get('dynamic'))
+ dynamic = self.plugin_conf.getint('dynamic')
if dynamic <= 0:
dynamic = 100
- similarity = int(self.plugin_conf.get('similarity'))
+ similarity = self.plugin_conf.getint('similarity')
results = list()
similarities.reverse()
while (len(results) < dynamic
art_pop, match = similarities.pop()
if match < similarity:
break
- results.extend(self.player.fuzzy_find(art_pop))
+ results.extend(self.player.fuzzy_find_artist(art_pop))
results and self.log.debug('Similarity: %d%%' % match) # pylint: disable=w0106
return results
depth = 0
current = self.player.current
extra_arts = list()
- while depth < int(self.plugin_conf.get('depth')):
+ while depth < self.plugin_conf.getint('depth'):
if len(history) == 0:
break
trk = history.popleft()
ret_extra = None
if len(self.history) >= 2:
ret_extra = self.get_recursive_similar_artist()
+ if ret_extra:
+ ret = list(set(ret) | set(ret_extra))
if not ret:
self.log.warning('Got nothing from music library.')
self.log.warning('Try running in debug mode to guess why...')
return []
- if ret_extra:
- ret = list(set(ret) | set(ret_extra))
self.log.info('Got {} artists in library'.format(len(ret)))
self.log.info(' / '.join(ret))
# Move around similars items to get in unplayed|not recently played
# artist first.
return self._get_artists_list_reorg(ret)
- def _detects_var_artists_album(self, album, artist):
- """Detects either an album is a "Various Artists" or a
- single artist release."""
- art_first_track = None
- for track in self.player.find_album(artist, album):
- if not art_first_track: # set artist for the first track
- art_first_track = track.artist
- alb_art = track.albumartist
- # Special heuristic used when AlbumArtist is available
- if (alb_art):
- if artist == alb_art:
- # When album artist field is similar to the artist we're
- # looking an album for, the album is considered good to
- # queue
- return False
- else:
- self.log.debug(track)
- self.log.debug('album art says "%s", looking for "%s",'
- ' not queueing this album' %
- (alb_art, artist))
- return True
- return False
-
def _get_album_history(self, artist=None):
"""Retrieve album history"""
duration = self.daemon_conf.getint('sima', 'history_duration')
"""
self.to_add = list()
nb_album_add = 0
- target_album_to_add = int(self.plugin_conf.get('album_to_add'))
+ target_album_to_add = self.plugin_conf.getint('album_to_add')
for artist in artists:
self.log.info('Looking for an album to add for "%s"...' % artist)
- albums = set(self.player.find_albums(artist))
+ albums = self.player.find_albums(artist)
+ # str conversion while Album type is not propagated
+ albums = [ str(album) for album in albums]
+ if albums:
+ self.log.debug('Albums candidate: {0:s}'.format(' / '.join(albums)))
+ else: continue
# albums yet in history for this artist
+ albums = set(albums)
albums_yet_in_hist = albums & self._get_album_history(artist=artist)
albums_not_in_hist = list(albums - albums_yet_in_hist)
# Get to next artist if there are no unplayed albums
album_to_queue = str()
random.shuffle(albums_not_in_hist)
for album in albums_not_in_hist:
- tracks = self.player.find('album', album)
- if self._detects_var_artists_album(album, artist):
- continue
- if tracks and self.sdb.get_bl_album(tracks[0], add_not=True):
- self.log.info('Blacklisted album: "%s"' % album)
- self.log.debug('using track: "%s"' % tracks[0])
- continue
+ tracks = self.player.find_album(artist, album)
# Look if one track of the album is already queued
# Good heuristic, at least enough to guess if the whole album is
# already queued.
"""Get some tracks for track queue mode
"""
artists = self.get_local_similar_artists()
- nbtracks_target = int(self.plugin_conf.get('track_to_add'))
+ nbtracks_target = self.plugin_conf.getint('track_to_add')
for artist in artists:
self.log.debug('Trying to find titles to add for "{}"'.format(
artist))