# standard library import
from difflib import get_close_matches
-from itertools import dropwhile
from select import select
# third parties components
sexit(1)
# local import
-from .lib.player import Player
+from .lib.player import Player, blacklist
from .lib.track import Track
-from .lib.meta import Album
-from .lib.simastr import SimaStr
+from .lib.meta import Album, Artist
from .utils.leven import levenshtein_ratio
PlayerUnHandledError = MPDError # pylint: disable=C0103
-def blacklist(artist=False, album=False, track=False):
- #pylint: disable=C0111,W0212
- field = (artist, album, track)
- def decorated(func):
- def wrapper(*args, **kwargs):
- if not args[0].database:
- return func(*args, **kwargs)
- cls = args[0]
- boolgen = (bl for bl in field)
- bl_fun = (cls.database.get_bl_artist,
- cls.database.get_bl_album,
- cls.database.get_bl_track,)
- #bl_getter = next(fn for fn, bl in zip(bl_fun, boolgen) if bl is True)
- bl_getter = next(dropwhile(lambda _: not next(boolgen), bl_fun))
- #cls.log.debug('using {0} as bl filter'.format(bl_getter.__name__))
- results = list()
- for elem in func(*args, **kwargs):
- if bl_getter(elem, add_not=True):
- cls.log.debug('Blacklisted "{0}"'.format(elem))
- continue
- if track and cls.database.get_bl_album(elem, add_not=True):
- # filter album as well in track mode
- # (artist have already been)
- cls.log.debug('Blacklisted alb. "{0.album}"'.format(elem))
- continue
- results.append(elem)
- return results
- return wrapper
- return decorated
-
class PlayerClient(Player):
"""MPD Client
From python-musicpd:
self._cache['artists'] = frozenset(self._client.list('artist'))
@blacklist(track=True)
- def find_track(self, artist, title=None):
+ def _find_track(self, artist, title):
#return getattr(self, 'find')('artist', artist, 'title', title)
if title:
return self.find('artist', artist, 'title', title)
return self.find('artist', artist)
+ def find_track(self, artist, title=None):
+ tracks = list()
+ if isinstance(artist, Artist):
+ for name in artist.names:
+ tracks.extend(self._find_track(name, title=title))
+ else:
+ tracks.extend(self._find_track(artist,title=title))
+ return tracks
+
@blacklist(track=True)
def fuzzy_find_track(self, artist, title):
# Retrieve all tracks from artist
return []
return self.find('artist', artist, 'title', title_)
- @blacklist(artist=True)
- def fuzzy_find_artist(self, art):
- """
- Controls presence of artist in music library.
- Crosschecking artist names with SimaStr objects / difflib / levenshtein
-
- TODO: proceed crosschecking even when an artist matched !!!
- Not because we found "The Doors" as "The Doors" that there is no
- remaining entries as "Doors" :/
- not straight forward, need probably heavy refactoring.
- """
- matching_artists = list()
- artist = SimaStr(art)
-
- # Check against the actual string in artist list
- if artist.orig in self.artists:
- self.log.debug('found exact match for "%s"' % artist)
- return [artist.orig]
- # Then proceed with fuzzy matching if got nothing
- match = get_close_matches(artist.orig, self.artists, 50, 0.73)
- if not match:
- return []
- self.log.debug('found close match for "%s": %s' %
- (artist, '/'.join(match)))
- # Does not perform fuzzy matching on short and single word strings
- # Only lowercased comparison
- if ' ' not in artist.orig and len(artist) < 8:
- for fuzz_art in match:
- # Regular string comparison SimaStr().lower is regular string
- if artist.lower() == fuzz_art.lower():
- matching_artists.append(fuzz_art)
- self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
- return matching_artists
- for fuzz_art in match:
- # Regular string comparison SimaStr().lower is regular string
- if artist.lower() == fuzz_art.lower():
- matching_artists.append(fuzz_art)
- self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
- return matching_artists
- # SimaStr string __eq__ (not regular string comparison here)
- if artist == fuzz_art:
- matching_artists.append(fuzz_art)
- self.log.info('"%s" quite probably matches "%s" (SimaStr)' %
- (fuzz_art, artist))
- else:
- self.log.debug('FZZZ: "%s" does not match "%s"' %
- (fuzz_art, artist))
- return matching_artists
-
def find_album(self, artist, album):
"""
Special wrapper around album search:
return self.find('artist', artist, 'album', album)
@blacklist(album=True)
- def find_albums(self, artist):
+ def search_albums(self, artist):
"""
Fetch all albums for "AlbumArtist" == artist
Filter albums returned for "artist" == artist since MPD returns any
album containing at least a single track for artist
"""
albums = []
- kwalbart = {'albumartist':artist, 'artist':artist}
- for album in self.list('album', 'albumartist', artist):
- if album not in albums:
- albums.append(Album(name=album, **kwalbart))
- for album in self.list('album', 'artist', artist):
- album_trks = [trk for trk in self.find('album', album)]
- if 'Various Artists' in [tr.albumartist for tr in album_trks]:
- self.log.debug('Discarding {0} ("Various Artists" set)'.format(album))
- continue
- arts = set([trk.artist for trk in album_trks])
- if len(set(arts)) < 2: # TODO: better heuristic, use a ratio instead
- if album not in albums:
- albums.append(Album(name=album, albumartist=artist))
- elif album and album not in albums:
- self.log.debug('"{0}" probably not an album of "{1}"'.format(
- album, artist) + '({0})'.format('/'.join(arts)))
+ for name in artist.aliases:
+ self.log.debug('Searching album for {}'.format(name))
+ kwalbart = {'albumartist':name, 'artist':name}
+ for album in self.list('album', 'albumartist', artist):
+ if album and album not in albums:
+ albums.append(Album(name=album, **kwalbart))
+ for album in self.list('album', 'artist', artist):
+ album_trks = [trk for trk in self.find('album', album)]
+ if 'Various Artists' in [tr.albumartist for tr in album_trks]:
+ self.log.debug('Discarding {0} ("Various Artists" set)'.format(album))
+ continue
+ arts = set([trk.artist for trk in album_trks])
+ if len(set(arts)) < 2: # TODO: better heuristic, use a ratio instead
+ if album not in albums:
+ albums.append(Album(name=album, albumartist=artist))
+ elif album and album not in albums:
+ self.log.debug('"{0}" probably not an album of "{1}"'.format(
+ album, artist) + '({0})'.format('/'.join(arts)))
return albums
def monitor(self):
"""
def __init__(self, **kwargs):
- self.name = None
+ self.__name = None #TODO: should be immutable
self.__mbid = None
+ self.__aliases = set()
if 'name' not in kwargs or not kwargs.get('name'):
raise MetaException('Need a "name" argument')
+ else:
+ self.__name = kwargs.pop('name')
if 'mbid' in kwargs and kwargs.get('mbid'):
is_uuid4(kwargs.get('mbid'))
# mbid immutable as hash rests on
return fmt.format(self.__class__.__name__, self)
def __str__(self):
- return self.name.__str__()
+ return self.__name.__str__()
def __eq__(self, other):
"""
if self.mbid and other.mbid:
return self.mbid == other.mbid
else:
- return other.__str__() == self.__str__()
+ return (other.__str__() == self.__str__() or
+ other.__str__() in self.__aliases)
return False
def __hash__(self):
if self.mbid:
return hash(self.mbid)
- return id(self)
+ return hash(self.__name)
+
+ def add_alias(self, other):
+ if getattr(other, '__str__', None):
+ if callable(other.__str__):
+ self.__aliases |= {other.__str__()}
+ elif isinstance(other, Meta):
+ self.__aliases |= other.__aliases
+ else:
+ raise MetaException('No __str__ method found in {!r}'.format(other))
+
+ @property
+ def name(self):
+ return self.__name
@property
def mbid(self):
return self.__mbid
+ @property
+ def aliases(self):
+ return self.__aliases
+
+ @property
+ def names(self):
+ return self.__aliases | {self.__name,}
+
class Album(Meta):
class Artist(Meta):
- def __init__(self, name=None, **kwargs):
+ def __init__(self, name=None, mbid=None, **kwargs):
"""Artist object built from a mapping dict containing at least an
"artist" entry:
>>> trk = {'artist':'Art Name',
>>> artobj0 = Artist(**trk)
>>> artobj1 = Artist(name='Tool')
"""
- self.__aliases = set()
name = kwargs.get('artist', name)
- mbid = kwargs.get('musicbrainz_artistid', None)
+ mbid = kwargs.get('musicbrainz_artistid', mbid)
if (kwargs.get('albumartist', False) and
kwargs.get('albumartist') != 'Various Artists'):
name = kwargs.get('albumartist').split(', ')[0]
mbid = kwargs.get('musicbrainz_albumartistid').split(', ')[0]
super().__init__(name=name, mbid=mbid)
- def add_alias(self, other):
- if getattr(other, '__str__', None):
- if callable(other.__str__):
- self.__aliases |= {other.__str__()}
- elif isinstance(other, Artist):
- self.__aliases |= other._Artist__aliases
- else:
- raise MetaException('No __str__ method found in {!r}'.format(other))
-
- @property
- def names(self):
- return self.__aliases | {self.name,}
-
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab
# standard library import
import logging
from difflib import get_close_matches
+from itertools import dropwhile
# local import
+from .meta import Artist
from .simastr import SimaStr
from ..utils.leven import levenshtein_ratio
+def blacklist(artist=False, album=False, track=False):
+ #pylint: disable=C0111,W0212
+ field = (album, track)
+ def decorated(func):
+ def wrapper(*args, **kwargs):
+ if not args[0].database:
+ return func(*args, **kwargs)
+ cls = args[0]
+ boolgen = (bl for bl in field)
+ bl_fun = (cls.database.get_bl_album,
+ cls.database.get_bl_track,)
+ #bl_getter = next(fn for fn, bl in zip(bl_fun, boolgen) if bl is True)
+ bl_getter = next(dropwhile(lambda _: not next(boolgen), bl_fun))
+ #cls.log.debug('using {0} as bl filter'.format(bl_getter.__name__))
+ results = list()
+ for elem in func(*args, **kwargs):
+ if bl_getter(elem, add_not=True):
+ cls.log.debug('Blacklisted "{0}"'.format(elem))
+ continue
+ if track and cls.database.get_bl_album(elem, add_not=True):
+ # filter album as well in track mode
+ # (artist have already been)
+ cls.log.debug('Blacklisted alb. "{0.album}"'.format(elem))
+ continue
+ results.append(elem)
+ return results
+ return wrapper
+ return decorated
+
+def bl_artist(func):
+ def wrapper(*args, **kwargs):
+ cls = args[0]
+ if not args[0].database:
+ return func(*args, **kwargs)
+ result = func(*args, **kwargs)
+ if not result:
+ return
+ names = list()
+ for art in result.names:
+ if cls.database.get_bl_artist(art, add_not=True):
+ cls.log.debug('Blacklisted "{0}"'.format(art))
+ continue
+ names.append(art)
+ if not names:
+ return
+ resp = Artist(name=names.pop(), mbid=result.mbid)
+ for name in names:
+ resp.add_alias(name)
+ return resp
+ return wrapper
+
+
class Player(object):
"""Player interface to inherit from.
"""
raise NotImplementedError
- def find_albums(self, artist):
+ def search_albums(self, artist):
"""
Find albums by artist's name
- >>> player.find_alums('Nirvana')
+ >>> art = Artist(name='Nirvana')
+ >>> player.search_albums(art)
Returns a list of string objects
"""
raise NotImplementedError
- def fuzzy_find_artist(self, artist):
+ @bl_artist
+ def search_artist(self, artist):
"""
- Find artists based on a fuzzy search in the media library
- >>> bea = player.fuzzy_find_artist('beatles')
- >>> print(bea)
- >>> ['The Beatles']
+ Search artists based on a fuzzy search in the media library
+ >>> bea = player.search_artist('The beatles')
+ >>> print(bea.names)
+ >>> ['The Beatles', 'Beatles', 'the beatles']
Returns a list of strings (artist names)
"""
- raise NotImplementedError
+ found = False
+ # Then proceed with fuzzy matching if got nothing
+ match = get_close_matches(artist.name, self.artists, 50, 0.73)
+ if not match:
+ return
+ if len(match) > 1:
+ self.log.debug('found close match for "%s": %s' %
+ (artist, '/'.join(match)))
+ # Does not perform fuzzy matching on short and single word strings
+ # Only lowercased comparison
+ if ' ' not in artist.name and len(artist.name) < 8:
+ for fuzz_art in match:
+ # Regular lowered string comparison
+ if artist.name.lower() == fuzz_art.lower():
+ artist.add_alias(fuzz_art)
+ return artist
+ fzartist = SimaStr(artist.name)
+ for fuzz_art in match:
+ # Regular lowered string comparison
+ if artist.name.lower() == fuzz_art.lower():
+ found = True
+ if artist.name != fuzz_art:
+ artist.add_alias(fuzz_art)
+ self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
+ continue
+ # SimaStr string __eq__ (not regular string comparison here)
+ if fzartist == fuzz_art:
+ found = True
+ artist.add_alias(fuzz_art)
+ self.log.info('"%s" quite probably matches "%s" (SimaStr)' %
+ (fuzz_art, artist))
+ #else:
+ #self.log.debug('FZZZ: "%s" does not match "%s"' %
+ #(fuzz_art, artist))
+ if found:
+ if artist.aliases:
+ self.log.debug('Found: {}'.format('/'.join(artist.names)))
+ return artist
+ return
def disconnect(self):
"""Closing client connection with the Player
"""
raise NotImplementedError
+ @property
+ def artists(self):
+ raise NotImplementedError
+
+ @property
+ def state(self):
+ raise NotImplementedError
+
+ @property
+ def current(self):
+ raise NotImplementedError
+
+ @property
+ def queue(self):
+ raise NotImplementedError
+
+ @property
+ def playlist(self):
+ raise NotImplementedError
+
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab
"""
get album information from the database.
if not in database insert new entry.
- Attention: use Track() object!!
+ Attention: use Track|Album object!!
Use AlbumArtist tag if provided, fallback to Album tag
"""
if with_connection:
self.close_database_connection(connection)
return False
+ def get_artists_history(self, artists, duration=__HIST_DURATION__):
+ """
+ """
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT arts.name, albs.name, trs.name, trs.file"
+ " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
+ " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
+ " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
+ for row in rows:
+ if artists and row[0] not in artists:
+ continue
+ for art in artists:
+ if row[0] == art:
+ yield art
+ self.close_database_connection(connection)
+
def get_history(self, artist=None, artists=None, duration=__HIST_DURATION__):
"""Retrieve complete play history, most recent tracks first
artist : filter history for specific artist
Consume EchoNest web service
"""
-__version__ = '0.0.3'
+__version__ = '0.0.4'
__author__ = 'Jack Kaliko'
for frgnid in art.get('foreign_ids'):
if frgnid.get('catalog') == 'musicbrainz':
mbid = frgnid.get('foreign_id'
- ).lstrip('musicbrainz:artist:')
+ ).split(':')[2]
yield Artist(mbid=mbid, name=art.get('name'))
def get_toptrack(self, artist=None):
self.__dict__.update({tag: ', '.join(set(value))})
def __repr__(self):
- return '%s(artist="%s", album="%s", title="%s", filename="%s")' % (
+ return '%s(artist="%s", album="%s", title="%s", file="%s")' % (
self.__class__.__name__,
self.artist,
self.album,
def wrapper(*args, **kwargs):
#pylint: disable=W0212,C0111
cls = args[0]
- similarities = [art for art in args[1]]
+ similarities = [art.name for art in args[1]]
hashedlst = md5(''.join(similarities).encode('utf-8')).hexdigest()
if hashedlst in cls._cache.get('asearch'):
cls.log.debug('cached request')
Move around items in artists_list in order to play first not recently
played artists
"""
- # TODO: move to utils as a decorator
+ hist = list()
duration = self.daemon_conf.getint('sima', 'history_duration')
- art_in_hist = list()
- for trk in self.sdb.get_history(duration=duration, artists=alist):
- if trk[0] not in art_in_hist:
- art_in_hist.append(trk[0])
- art_in_hist.reverse()
- art_not_in_hist = [ar for ar in alist if ar not in art_in_hist]
- random.shuffle(art_not_in_hist)
- art_not_in_hist.extend(art_in_hist)
- self.log.info('{}'.format(
- ' / '.join(art_not_in_hist)))
- return art_not_in_hist
+ for art in self.sdb.get_artists_history(alist, duration=duration):
+ if art not in hist:
+ hist.insert(0, art)
+ reorg = [art for art in alist if art not in hist]
+ reorg.extend(hist)
+ self.log.info('{}'.format(' / '.join([a.name for a in reorg])))
+ return reorg
@cache
def get_artists_from_player(self, similarities):
results = list()
similarities.reverse()
while (len(results) < dynamic
- and len(similarities) > 0):
+ and len(similarities) > 0):
art_pop = similarities.pop()
- results.extend(self.player.fuzzy_find_artist(art_pop))
+ res = self.player.search_artist(art_pop)
+ if res:
+ results.append(res)
return results
def ws_similar_artists(self, artist=None):
"""
# initialize artists deque list to construct from DB
as_art = deque()
- as_artists = self.ws().get_similar(artist=artist)
- self.log.debug('Requesting {} for {!r}'.format(self.ws.name,artist))
+ as_artists = self.ws.get_similar(artist=artist)
+ self.log.debug('Requesting {} for {!r}'.format(self.ws.name, artist))
try:
- # TODO: let's propagate Artist type
- [as_art.append(str(art)) for art in as_artists]
+ [as_art.append(art) for art in as_artists]
except WSError as err:
- self.log.warning('{0}: {1}'.format(self.ws.name, err))
+ self.log.warning('{}: {}'.format(self.ws.name, err))
if as_art:
- self.log.debug('Fetched {0} artist(s)'.format(len(as_art)))
+ self.log.debug('Fetched {} artist(s)'.format(len(as_art)))
return as_art
def get_recursive_similar_artist(self):
- ret_extra = list()
history = deque(self.history)
history.popleft()
depth = 0
if not self.player.playlist:
- return ret_extra
+ return
last_trk = self.player.playlist[-1]
extra_arts = list()
while depth < self.plugin_conf.getint('depth'):
'to "{}" as well'.format(artist))
similar = self.ws_similar_artists(artist=artist)
if not similar:
- return ret_extra
- ret_extra.extend(self.get_artists_from_player(similar))
- if last_trk.artist in ret_extra:
- ret_extra.remove(last_trk.artist)
+ return []
+ ret_extra = set(self.get_artists_from_player(similar))
+ if last_trk.Artist in ret_extra:
+ ret_extra.remove(last_trk.Artist)
return ret_extra
def get_local_similar_artists(self):
self.log.info('Got nothing from {0}!'.format(self.ws.name))
return []
self.log.info('First five similar artist(s): {}...'.format(
- ' / '.join([a for a in list(similar)[0:5]])))
+ ' / '.join([a.name for a in list(similar)[0:5]])))
self.log.info('Looking availability in music library')
ret = set(self.get_artists_from_player(similar))
ret_extra = None
self.log.warning('Got nothing from music library.')
self.log.warning('Try running in debug mode to guess why...')
return []
- queued_artists = { trk.artist for trk in self.player.queue }
+ queued_artists = { trk.Artist for trk in self.player.queue }
+ for art in queued_artists:
+ if art in ret:
+ self.log.debug('Removing already queued artist: {0}'.format(art))
+ ret = ret - queued_artists
if ret & queued_artists:
self.log.debug('Removing already queued artist: {0}'.format(ret & queued_artists))
ret = ret - queued_artists
- if self.player.current.artist in ret:
- self.log.debug('Removing current artist: {0}'.format(self.player.current.artist))
- ret = ret - {self.player.current.artist}
+ if self.player.current.Artist in ret:
+ self.log.debug('Removing current artist: {0}'.format(self.player.current.Artist))
+ ret = ret - {self.player.current.Artist}
# Move around similars items to get in unplayed|not recently played
# artist first.
self.log.info('Got {} artists in library'.format(len(ret)))
"""Retrieve album history"""
duration = self.daemon_conf.getint('sima', 'history_duration')
albums_list = set()
- for trk in self.sdb.get_history(artist=artist, duration=duration):
+ for trk in self.sdb.get_history(artist=artist.name, duration=duration):
albums_list.add(trk[1])
return albums_list
target_album_to_add = self.plugin_conf.getint('album_to_add')
for artist in artists:
self.log.info('Looking for an album to add for "%s"...' % artist)
- albums = self.player.find_albums(artist)
+ albums = self.player.search_albums(artist)
# str conversion while Album type is not propagated
albums = [str(album) for album in albums]
if albums:
"""
self.to_add = list()
nbtracks_target = self.plugin_conf.getint('track_to_add')
- webserv = self.ws()
for artist in artists:
artist = Artist(name=artist)
if len(self.to_add) == nbtracks_target:
self.log.info('Looking for a top track for {0}'.format(artist))
titles = deque()
try:
- titles = [t for t in webserv.get_toptrack(artist)]
+ titles = [t for t in self.ws.get_toptrack(artist)]
except WSError as err:
self.log.warning('{0}: {1}'.format(self.ws.name, err))
if self.ws.ratelimit:
if len(self.to_add) == nbtracks_target:
break
if not self.to_add:
- self.log.debug('Found no tracks to queue, is your ' +
- 'history getting too large?')
+ self.log.debug('Found no tracks to queue!')
return None
for track in self.to_add:
self.log.info('{1} candidates: {0!s}'.format(track, self.ws.name))
# Set persitent cache
vardir = daemon.config['sima']['var_dir']
SimaEch.cache = FileCache(join(vardir, 'http', 'EchoNest'))
- self.ws = SimaEch
+ self.ws = SimaEch()
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab
def __init__(self, daemon):
WebService.__init__(self, daemon)
- self.ws = SimaFM
+ self.ws = SimaFM()
# Set persitent cache
vardir = daemon.config['sima']['var_dir']
persitent_cache = daemon.config.getboolean('lastfm', 'cache')