# -*- coding: utf-8 -*-
-# Copyright (c) 2009-2014 Jack Kaliko <kaliko@azylum.org>
+# Copyright (c) 2009-2015 Jack Kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
# local import
from .plugin import Plugin
from .track import Track
-from .meta import Artist
-from ..utils.utils import WSError
+from .meta import Artist, MetaContainer
+from ..utils.utils import WSError, WSNotFound
def cache(func):
"""Caching decorator"""
def wrapper(*args, **kwargs):
#pylint: disable=W0212,C0111
cls = args[0]
- similarities = [art for art in args[1]]
+ similarities = [art.name for art in args[1]]
hashedlst = md5(''.join(similarities).encode('utf-8')).hexdigest()
if hashedlst in cls._cache.get('asearch'):
cls.log.debug('cached request')
class WebService(Plugin):
"""similar artists webservice
"""
+ # pylint: disable=bad-builtin
def __init__(self, daemon):
Plugin.__init__(self, daemon)
self.to_add = list()
self._cache = None
self._flush_cache()
- wrapper = {
- 'track': self._track,
- 'top': self._top,
- 'album': self._album,
- }
+ wrapper = {'track': self._track,
+ 'top': self._top,
+ 'album': self._album,}
self.queue_mode = wrapper.get(self.plugin_conf.get('queue_mode'))
self.ws = None
self.log.info('{0}: Flushing cache!'.format(name))
else:
self.log.info('{0}: Initialising cache!'.format(name))
- self._cache = {
- 'asearch': dict(),
- 'tsearch': dict(),
- }
+ self._cache = {'asearch': dict(),
+ 'tsearch': dict(),}
def _cleanup_cache(self):
"""Avoid bloated cache
black_list = self.player.queue + self.to_add
not_in_hist = list(set(tracks) - set(self.get_history(artist=artist)))
if self.plugin_conf.get('queue_mode') != 'top' and not not_in_hist:
- self.log.debug('All tracks already played for "{}"'.format(artist))
+ self.log.debug('All tracks already played for "%s"', artist)
random.shuffle(not_in_hist)
candidate = []
for trk in [_ for _ in not_in_hist if _ not in black_list]:
# Should use albumartist heuristic as well
- if self.plugin_conf.getboolean('single_album'):
+ if self.plugin_conf.getboolean('single_album'): # pylint: disable=no-member
if (trk.album == self.player.current.album or
- trk.album in [tr.album for tr in self.to_add]):
+ trk.album in [tr.album for tr in black_list]):
self.log.debug('Found unplayed track ' +
- 'but from an album already queued: %s' % (trk))
+ 'but from an album already queued: %s', trk)
continue
candidate.append(trk)
if not candidate:
Move around items in artists_list in order to play first not recently
played artists
"""
- # TODO: move to utils as a decorator
+ hist = list()
duration = self.daemon_conf.getint('sima', 'history_duration')
- art_in_hist = list()
- for trk in self.sdb.get_history(duration=duration, artists=alist):
- if trk[0] not in art_in_hist:
- art_in_hist.append(trk[0])
- art_in_hist.reverse()
- art_not_in_hist = [ar for ar in alist if ar not in art_in_hist]
- random.shuffle(art_not_in_hist)
- art_not_in_hist.extend(art_in_hist)
- self.log.info('{}'.format(
- ' / '.join(art_not_in_hist)))
- return art_not_in_hist
+ for art in self.sdb.get_artists_history(alist, duration=duration):
+ if art not in hist:
+ hist.insert(0, art)
+ reorg = [art for art in alist if art not in hist]
+ reorg.extend(hist)
+ return reorg
@cache
def get_artists_from_player(self, similarities):
Look in player library for availability of similar artists in
similarities
"""
- dynamic = self.plugin_conf.getint('max_art')
+ dynamic = self.plugin_conf.getint('max_art') # pylint: disable=no-member
if dynamic <= 0:
dynamic = 100
results = list()
similarities.reverse()
while (len(results) < dynamic
- and len(similarities) > 0):
+ and len(similarities) > 0):
art_pop = similarities.pop()
- results.extend(self.player.fuzzy_find_artist(art_pop))
+ res = self.player.search_artist(art_pop)
+ if res:
+ results.append(res)
return results
- def ws_similar_artists(self, artist=None):
+ def ws_similar_artists(self, artist):
"""
Retrieve similar artists from WebServive.
"""
# initialize artists deque list to construct from DB
as_art = deque()
- as_artists = self.ws().get_similar(artist=artist)
- self.log.debug('Requesting {} for {!r}'.format(self.ws.name,artist))
+ as_artists = self.ws.get_similar(artist=artist)
+ self.log.debug('Requesting {} for {!r}'.format(self.ws.name, artist))
try:
- # TODO: let's propagate Artist type
- [as_art.append(str(art)) for art in as_artists]
+ [as_art.append(art) for art in as_artists]
+ except WSNotFound as err:
+ self.log.warning('{}: {}'.format(self.ws.name, err))
+ if artist.mbid:
+ self.log.debug('Trying without MusicBrainzID')
+ try:
+ return self.ws_similar_artists(Artist(name=artist.name))
+ except WSNotFound as err:
+ self.log.debug('{}: {}'.format(self.ws.name, err))
except WSError as err:
- self.log.warning('{0}: {1}'.format(self.ws.name, err))
+ self.log.warning('{}: {}'.format(self.ws.name, err))
if as_art:
- self.log.debug('Fetched {0} artist(s)'.format(len(as_art)))
+ self.log.debug('Fetched {} artist(s)'.format(len(as_art)))
return as_art
def get_recursive_similar_artist(self):
- ret_extra = list()
- history = deque(self.history)
- history.popleft()
- depth = 0
+ """Check against local player for similar artists (recursive w/ history)
+ """
if not self.player.playlist:
- return ret_extra
- last_trk = self.player.playlist[-1]
+ return
+ history = list(self.history)
+ history = self.player.queue + history
+ history = deque(history)
+ last_trk = history.popleft() # remove
extra_arts = list()
- while depth < self.plugin_conf.getint('depth'):
+ ret_extra = list()
+ depth = 0
+ while depth < self.plugin_conf.getint('depth'): # pylint: disable=no-member
if len(history) == 0:
break
trk = history.popleft()
- if (trk.get_artist() in extra_arts
- or trk.get_artist() == last_trk.get_artist()):
+ if (trk.Artist in extra_arts
+ or trk.Artist == last_trk.Artist):
continue
- extra_arts.append(trk.get_artist())
+ extra_arts.append(trk.Artist)
depth += 1
- self.log.info('EXTRA ARTS: {}'.format(
- '/'.join([art.name for art in extra_arts])))
+ self.log.debug('EXTRA ARTS: %s', '/'.join(map(str, extra_arts)))
for artist in extra_arts:
self.log.debug('Looking for artist similar '
'to "{}" as well'.format(artist))
similar = self.ws_similar_artists(artist=artist)
if not similar:
- return ret_extra
+ continue
ret_extra.extend(self.get_artists_from_player(similar))
- if last_trk.artist in ret_extra:
- ret_extra.remove(last_trk.artist)
+
+ if last_trk.Artist in ret_extra:
+ ret_extra.remove(last_trk.Artist)
+ if ret_extra:
+ self.log.debug('similar artist(s) found: %s',
+ ' / '.join(map(str, MetaContainer(ret_extra))))
return ret_extra
def get_local_similar_artists(self):
"""
if not self.player.playlist:
return []
- tolookfor = self.player.playlist[-1].get_artist()
+ tolookfor = self.player.playlist[-1].Artist
self.log.info('Looking for artist similar to "{}"'.format(tolookfor))
+ self.log.debug(repr(tolookfor))
similar = self.ws_similar_artists(tolookfor)
if not similar:
self.log.info('Got nothing from {0}!'.format(self.ws.name))
return []
- self.log.info('First five similar artist(s): {}...'.format(
- ' / '.join([a for a in list(similar)[0:5]])))
+ self.log.info('First five similar artist(s): %s...',
+ ' / '.join(map(str, list(similar)[:5])))
self.log.info('Looking availability in music library')
- ret = set(self.get_artists_from_player(similar))
+ ret = MetaContainer(self.get_artists_from_player(similar))
+ if ret:
+ self.log.debug('regular found in library: %s',
+ ' / '.join(map(str, ret)))
+ else:
+ self.log.debug('Got nothing similar from library!')
ret_extra = None
if len(self.history) >= 2:
- if self.plugin_conf.getint('depth') > 1:
+ if self.plugin_conf.getint('depth') > 1: # pylint: disable=no-member
ret_extra = self.get_recursive_similar_artist()
if ret_extra:
- ret = set(ret) | set(ret_extra)
+ # get them reorg to pick up best element
+ ret_extra = self._get_artists_list_reorg(ret_extra)
+ # tries to pickup less artist from extra art
+ if len(ret) < 4:
+ ret_extra = MetaContainer(ret_extra)
+ else:
+ ret_extra = MetaContainer(ret_extra[:max(4, len(ret))//2])
+ if ret_extra:
+ self.log.debug('extra found in library: %s',
+ ' / '.join(map(str, ret_extra)))
+ ret = ret | ret_extra
if not ret:
self.log.warning('Got nothing from music library.')
- self.log.warning('Try running in debug mode to guess why...')
return []
- queued_artists = { trk.artist for trk in self.player.queue }
+ queued_artists = MetaContainer([trk.Artist for trk in self.player.queue])
+ self.log.trace('Already queued: {}'.format(queued_artists))
+ self.log.trace('Candidate: {}'.format(ret))
if ret & queued_artists:
- self.log.debug('Removing already queued artist: {0}'.format(ret & queued_artists))
+ self.log.debug('Removing already queued artists: '
+ '{0}'.format('/'.join(map(str, ret & queued_artists))))
ret = ret - queued_artists
- if self.player.current.artist in ret:
- self.log.debug('Removing current artist: {0}'.format(self.player.current.artist))
- ret = ret - {self.player.current.artist}
+ if self.player.current and self.player.current.Artist in ret:
+ self.log.debug('Removing current artist: {0}'.format(self.player.current.Artist))
+ ret = ret - MetaContainer([self.player.current.Artist])
# Move around similars items to get in unplayed|not recently played
# artist first.
self.log.info('Got {} artists in library'.format(len(ret)))
- return self._get_artists_list_reorg(list(ret))
+ candidates = self._get_artists_list_reorg(list(ret))
+ if candidates:
+ self.log.info(' / '.join(map(str, candidates)))
+ return candidates
def _get_album_history(self, artist=None):
"""Retrieve album history"""
duration = self.daemon_conf.getint('sima', 'history_duration')
albums_list = set()
- for trk in self.sdb.get_history(artist=artist, duration=duration):
+ for trk in self.sdb.get_history(artist=artist.name, duration=duration):
albums_list.add(trk[1])
return albums_list
"""
self.to_add = list()
nb_album_add = 0
- target_album_to_add = self.plugin_conf.getint('album_to_add')
+ target_album_to_add = self.plugin_conf.getint('album_to_add') # pylint: disable=no-member
for artist in artists:
self.log.info('Looking for an album to add for "%s"...' % artist)
- albums = self.player.find_albums(artist)
+ albums = self.player.search_albums(artist)
# str conversion while Album type is not propagated
albums = [str(album) for album in albums]
if albums:
- self.log.debug('Albums candidate: {0:s}'.format(
- ' / '.join(albums)))
+ self.log.debug('Albums candidate: %s', ' / '.join(albums))
else: continue
# albums yet in history for this artist
albums = set(albums)
albums_not_in_hist = list(albums - albums_yet_in_hist)
# Get to next artist if there are no unplayed albums
if not albums_not_in_hist:
- self.log.info('No album found for "%s"' % artist)
+ self.log.info('No unplayed album found for "%s"' % artist)
continue
album_to_queue = str()
random.shuffle(albums_not_in_hist)
# Good heuristic, at least enough to guess if the whole album is
# already queued.
if tracks[0] in self.player.queue:
- self.log.debug('"%s" already queued, skipping!' %
- tracks[0].album)
+ self.log.debug('"%s" already queued, skipping!', tracks[0].album)
continue
album_to_queue = album
if not album_to_queue:
- self.log.info('No album found for "%s"' % artist)
+ self.log.info('No album found for "%s"', artist)
continue
- self.log.info('{2} album candidate: {0} - {1}'.format(
- artist, album_to_queue, self.ws.name))
+ self.log.info('%s album candidate: %s - %s', self.ws.name, artist, album_to_queue)
nb_album_add += 1
self.to_add.extend(self.player.find_album(artist, album_to_queue))
if nb_album_add == target_album_to_add:
find top tracks for artists in artists list.
"""
self.to_add = list()
- nbtracks_target = self.plugin_conf.getint('track_to_add')
- webserv = self.ws()
+ nbtracks_target = self.plugin_conf.getint('track_to_add') # pylint: disable=no-member
for artist in artists:
- artist = Artist(name=artist)
if len(self.to_add) == nbtracks_target:
return True
self.log.info('Looking for a top track for {0}'.format(artist))
titles = deque()
try:
- titles = [t for t in webserv.get_toptrack(artist)]
+ titles = [t for t in self.ws.get_toptrack(artist)]
except WSError as err:
- self.log.warning('{0}: {1}'.format(self.ws.name, err))
- if self.ws.ratelimit:
- self.log.info('{0.name} ratelimit: {0.ratelimit}'.format(self.ws))
+ self.log.warning('%s: %s', self.ws.name, err)
for trk in titles:
- found = self.player.fuzzy_find_track(artist.name, trk.title)
+ found = self.player.fuzzy_find_track(artist, trk.title)
random.shuffle(found)
if found:
- self.log.debug('{0}'.format(found[0]))
+ self.log.debug('%s', found[0])
if self.filter_track(found):
break
"""Get some tracks for track queue mode
"""
artists = self.get_local_similar_artists()
- nbtracks_target = self.plugin_conf.getint('track_to_add')
+ nbtracks_target = self.plugin_conf.getint('track_to_add') # pylint: disable=no-member
for artist in artists:
- self.log.debug('Trying to find titles to add for "{}"'.format(
- artist))
+ self.log.debug('Trying to find titles to add for "%r"', artist)
found = self.player.find_track(artist)
random.shuffle(found)
if not found:
if len(self.to_add) == nbtracks_target:
break
if not self.to_add:
- self.log.debug('Found no tracks to queue, is your ' +
- 'history getting too large?')
+ self.log.debug('Found no tracks to queue!')
return None
for track in self.to_add:
self.log.info('{1} candidates: {0!s}'.format(track, self.ws.name))
self.queue_mode()
msg = ' '.join(['{0}: {1:>3d}'.format(k, v) for
k, v in sorted(self.ws.stats.items())])
- self.log.debug(msg)
+ self.log.debug('http stats: ' + msg)
candidates = self.to_add
self.to_add = list()
if self.plugin_conf.get('queue_mode') != 'album':