import random
from collections import deque
-from difflib import get_close_matches
+from itertools import dropwhile
from hashlib import md5
# third parties componants
# local import
-from ..utils.leven import levenshtein_ratio
from ..lib.plugin import Plugin
from ..lib.simafm import SimaFM, XmlFMHTTPError, XmlFMNotFound, XmlFMError
-from ..lib.simastr import SimaStr
from ..lib.track import Track
return wrapper
+def blacklist(artist=False, album=False, track=False):
+ #pylint: disable=C0111,W0212
+ field = (artist, album, track)
+ def decorated(func):
+ def wrapper(*args, **kwargs):
+ cls = args[0]
+ boolgen = (bl for bl in field)
+ bl_fun = (cls._Plugin__daemon.sdb.get_bl_artist,
+ cls._Plugin__daemon.sdb.get_bl_album,
+ cls._Plugin__daemon.sdb.get_bl_track,)
+ #bl_getter = next(fn for fn, bl in zip(bl_fun, boolgen) if bl is True)
+ bl_getter = next(dropwhile(lambda _: not next(boolgen), bl_fun))
+ cls.log.debug('using {0} as bl filter'.format(bl_getter.__name__))
+ if artist:
+ results = func(*args, **kwargs)
+ for elem in results:
+ if bl_getter(elem, add_not=True):
+ cls.log.info('Blacklisted: {0}'.format(elem))
+ results.remove(elem)
+ return results
+ if track:
+ for elem in args[1]:
+ if bl_getter(elem, add_not=True):
+ cls.log.info('Blacklisted: {0}'.format(elem))
+ args[1].remove(elem)
+ return func(*args, **kwargs)
+ return wrapper
+ return decorated
+
+
class Lastfm(Plugin):
"""last.fm similar artists
"""
Plugin.__init__(self, daemon)
self.daemon_conf = daemon.config
self.sdb = daemon.sdb
- self.player = daemon.player
self.history = daemon.short_history
##
self.to_add = list()
"""
for _ , val in self._cache.items():
if isinstance(val, dict):
- while len(val) > 100:
+ while len(val) > 150:
val.popitem()
def get_history(self, artist):
- """Check against history for tracks already in history for a specific
- artist.
+ """Constructs list of Track for already played titles for an artist.
"""
duration = self.daemon_conf.getint('sima', 'history_duration')
tracks_from_db = self.sdb.get_history(duration=duration, artist=artist)
Extract one unplayed track from a Track object list.
* not in history
* not already in the queue
+ * not blacklisted
"""
artist = tracks[0].artist
black_list = self.player.queue + self.to_add
if not not_in_hist:
self.log.debug('All tracks already played for "{}"'.format(artist))
random.shuffle(not_in_hist)
- candidate = [ trk for trk in not_in_hist if trk not in black_list ]
+ #candidate = [ trk for trk in not_in_hist if trk not in black_list
+ #if not self.sdb.get_bl_track(trk, add_not=True)]
+ candidate = []
+ for trk in [_ for _ in not_in_hist if _ not in black_list]:
+ if self.sdb.get_bl_track(trk, add_not=True):
+ self.log.info('Blacklisted: {0}: '.format(trk))
+ continue
+ if self.sdb.get_bl_album(trk, add_not=True):
+ self.log.info('Blacklisted album: {0}: '.format(trk))
+ continue
+ candidate.append(trk)
if not candidate:
self.log.debug('Unable to find title to add' +
- ' for "%s".' % artist)
+ ' for "%s".' % artist)
return None
self.to_add.append(random.choice(candidate))
Move around items in artists_list in order to play first not recently
played artists
"""
+ # TODO: move to utils as a decorator
duration = self.daemon_conf.getint('sima', 'history_duration')
art_in_hist = list()
for trk in self.sdb.get_history(duration=duration,
' / '.join(art_not_in_hist)))
return art_not_in_hist
- def _cross_check_artist(self, art):
- """
- Controls presence of artists in liste in music library.
- Crosschecking artist names with SimaStr objects / difflib / levenshtein
-
- TODO: proceed crosschecking even when an artist matched !!!
- Not because we found "The Doors" as "The Doors" that there is no
- remaining entries as "Doors" :/
- not straight forward, need probably heavy refactoring.
- """
- matching_artists = list()
- artist = SimaStr(art)
- all_artists = self._cache.get('artists')
-
- # Check against the actual string in artist list
- if artist.orig in all_artists:
- self.log.debug('found exact match for "%s"' % artist)
- return [artist]
- # Then proceed with fuzzy matching if got nothing
- match = get_close_matches(artist.orig, all_artists, 50, 0.73)
- if not match:
- return []
- self.log.debug('found close match for "%s": %s' %
- (artist, '/'.join(match)))
- # Does not perform fuzzy matching on short and single word strings
- # Only lowercased comparison
- if ' ' not in artist.orig and len(artist) < 8:
- for fuzz_art in match:
- # Regular string comparison SimaStr().lower is regular string
- if artist.lower() == fuzz_art.lower():
- matching_artists.append(fuzz_art)
- self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
- return matching_artists
- for fuzz_art in match:
- # Regular string comparison SimaStr().lower is regular string
- if artist.lower() == fuzz_art.lower():
- matching_artists.append(fuzz_art)
- self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
- return matching_artists
- # Proceed with levenshtein and SimaStr
- leven = levenshtein_ratio(artist.stripped.lower(),
- SimaStr(fuzz_art).stripped.lower())
- # SimaStr string __eq__, not regular string comparison here
- if artist == fuzz_art:
- matching_artists.append(fuzz_art)
- self.log.info('"%s" quite probably matches "%s" (SimaStr)' %
- (fuzz_art, artist))
- elif leven >= 0.82: # PARAM
- matching_artists.append(fuzz_art)
- self.log.debug('FZZZ: "%s" should match "%s" (lr=%1.3f)' %
- (fuzz_art, artist, leven))
- else:
- self.log.debug('FZZZ: "%s" does not match "%s" (lr=%1.3f)' %
- (fuzz_art, artist, leven))
- return matching_artists
-
+ @blacklist(artist=True)
@cache
def get_artists_from_player(self, similarities):
"""
art_pop, match = similarities.pop()
if match < similarity:
break
- results.extend(self._cross_check_artist(art_pop))
- results and self.log.debug('Similarity: %d%%' % match)
+ results.extend(self.player.fuzzy_find(art_pop))
+ results and self.log.debug('Similarity: %d%%' % match) # pylint: disable=w0106
return results
def lfm_similar_artists(self, artist=None):
return as_art
def get_recursive_similar_artist(self):
+ ret_extra = list()
history = deque(self.history)
history.popleft()
- ret_extra = list()
depth = 0
current = self.player.current
extra_arts = list()
while depth < int(self.plugin_conf.get('depth')):
+ if len(history) == 0:
+ break
trk = history.popleft()
- if trk.artist in [trk.artist for trk in extra_arts]:
+ if (trk.artist in [trk.artist for trk in extra_arts]
+ or trk.artist == current.artist):
continue
extra_arts.append(trk)
depth += 1
- if len(history) == 0:
- break
self.log.info('EXTRA ARTS: {}'.format(
'/'.join([trk.artist for trk in extra_arts])))
for artist in extra_arts:
# artist first.
return self._get_artists_list_reorg(ret)
+ def _detects_var_artists_album(self, album, artist):
+ """Detects either an album is a "Various Artists" or a
+ single artist release."""
+ art_first_track = None
+ for track in self.player.find_album(artist, album):
+ if not art_first_track: # set artist for the first track
+ art_first_track = track.artist
+ alb_art = track.albumartist
+ # Special heuristic used when AlbumArtist is available
+ if (alb_art):
+ if artist == alb_art:
+ # When album artist field is similar to the artist we're
+ # looking an album for, the album is considered good to
+ # queue
+ return False
+ else:
+ self.log.debug(track)
+ self.log.debug('album art says "%s", looking for "%s",'
+ ' not queueing this album' %
+ (alb_art, artist))
+ return True
+ return False
+
+ def _get_album_history(self, artist=None):
+ """Retrieve album history"""
+ duration = self.daemon_conf.getint('sima', 'history_duration')
+ albums_list = set()
+ for trk in self.sdb.get_history(artist=artist, duration=duration):
+ albums_list.add(trk[1])
+ return albums_list
+
+ def find_album(self, artists):
+ """Find albums to queue.
+ """
+ self.to_add = list()
+ nb_album_add = 0
+ target_album_to_add = int(self.plugin_conf.get('album_to_add'))
+ for artist in artists:
+ self.log.info('Looking for an album to add for "%s"...' % artist)
+ albums = set(self.player.find_albums(artist))
+ # albums yet in history for this artist
+ albums_yet_in_hist = albums & self._get_album_history(artist=artist)
+ albums_not_in_hist = list(albums - albums_yet_in_hist)
+ # Get to next artist if there are no unplayed albums
+ if not albums_not_in_hist:
+ self.log.info('No album found for "%s"' % artist)
+ continue
+ album_to_queue = str()
+ random.shuffle(albums_not_in_hist)
+ for album in albums_not_in_hist:
+ tracks = self.player.find('album', album)
+ if self._detects_var_artists_album(album, artist):
+ continue
+ if tracks and self.sdb.get_bl_album(tracks[0], add_not=True):
+ self.log.info('Blacklisted album: "%s"' % album)
+ self.log.debug('using track: "%s"' % tracks[0])
+ continue
+ # Look if one track of the album is already queued
+ # Good heuristic, at least enough to guess if the whole album is
+ # already queued.
+ if tracks[0] in self.player.queue:
+ self.log.debug('"%s" already queued, skipping!' %
+ tracks[0].album)
+ continue
+ album_to_queue = album
+ if not album_to_queue:
+ self.log.info('No album found for "%s"' % artist)
+ continue
+ self.log.info('last.fm album candidate: {0} - {1}'.format(
+ artist, album_to_queue))
+ nb_album_add += 1
+ self.to_add.extend(self.player.find_album(artist, album_to_queue))
+ if nb_album_add == target_album_to_add:
+ return True
+
def _track(self):
"""Get some tracks for track queue mode
"""
self.log.debug('Trying to find titles to add for "{}"'.format(
artist))
found = self.player.find_track(artist)
- # find tracks not in history
+ # find tracks not in history for artist
self.filter_track(found)
if len(self.to_add) == nbtracks_target:
break
if not self.to_add:
- self.log.debug('Found no unplayed tracks, is your ' +
- 'history getting too large?')
+ self.log.debug('Found no tracks to queue, is your ' +
+ 'history getting too large?')
return None
for track in self.to_add:
self.log.info('last.fm candidate: {0!s}'.format(track))
"""Get albums for album queue mode
"""
artists = self.get_local_similar_artists()
+ self.find_album(artists)
def _top(self):
"""Get some tracks for top track queue mode
"""
- artists = self.get_local_similar_artists()
+ #artists = self.get_local_similar_artists()
+ pass
def callback_need_track(self):
self._cleanup_cache()
if not self.player.current:
- self.log.info('No currently playing track, cannot queue')
+ self.log.info('Not currently playing track, cannot queue')
return None
self.queue_mode()
candidates = self.to_add