# standard library import
from difflib import get_close_matches
-from itertools import dropwhile
from select import select
# third parties components
sexit(1)
# local import
-from .lib.player import Player
-from .lib.track import Track
-from .lib.meta import Album
from .lib.simastr import SimaStr
+from .lib.player import Player, blacklist
+from .lib.track import Track
+from .lib.meta import Album, Artist
from .utils.leven import levenshtein_ratio
PlayerUnHandledError = MPDError # pylint: disable=C0103
+def bl_artist(func):
+ def wrapper(*args, **kwargs):
+ cls = args[0]
+ if not args[0].database:
+ return func(*args, **kwargs)
+ result = func(*args, **kwargs)
+ if not result:
+ return
+ names = list()
+ for art in result.names:
+ if cls.database.get_bl_artist(art, add_not=True):
+ cls.log.debug('Blacklisted "{0}"'.format(art))
+ continue
+ names.append(art)
+ if not names:
+ return
+ resp = Artist(name=names.pop(), mbid=result.mbid)
+ for name in names:
+ resp.add_alias(name)
+ return resp
+ return wrapper
-def blacklist(artist=False, album=False, track=False):
- #pylint: disable=C0111,W0212
- field = (artist, album, track)
- def decorated(func):
- def wrapper(*args, **kwargs):
- cls = args[0]
- boolgen = (bl for bl in field)
- bl_fun = (cls.database.get_bl_artist,
- cls.database.get_bl_album,
- cls.database.get_bl_track,)
- #bl_getter = next(fn for fn, bl in zip(bl_fun, boolgen) if bl is True)
- bl_getter = next(dropwhile(lambda _: not next(boolgen), bl_fun))
- #cls.log.debug('using {0} as bl filter'.format(bl_getter.__name__))
- results = list()
- for elem in func(*args, **kwargs):
- if bl_getter(elem, add_not=True):
- cls.log.debug('Blacklisted "{0}"'.format(elem))
- continue
- if track and cls.database.get_bl_album(elem, add_not=True):
- # filter album as well in track mode
- # (artist have already been)
- cls.log.debug('Blacklisted alb. "{0.album}"'.format(elem))
- continue
- results.append(elem)
- return results
- return wrapper
- return decorated
class PlayerClient(Player):
"""MPD Client
else:
self.log.info('Player: Initialising cache!')
self._cache = {
- 'artists': None,
+ 'artists': frozenset(),
+ 'nombid_artists': frozenset(),
}
- self._cache['artists'] = frozenset(self._client.list('artist'))
+ self._cache['artists'] = frozenset(self._execute('list', ['artist']))
+ if Artist.use_mbid:
+ self._cache['nombid_artists'] = frozenset(self._execute('list', ['artist', 'musicbrainz_artistid', '']))
@blacklist(track=True)
def find_track(self, artist, title=None):
- #return getattr(self, 'find')('artist', artist, 'title', title)
- if title:
- return self.find('artist', artist, 'title', title)
- return self.find('artist', artist)
+ tracks = set()
+ if artist.mbid:
+ if title:
+ tracks |= set(self.find('musicbrainz_artistid', artist.mbid,
+ 'title', title))
+ else:
+ tracks |= set(self.find('musicbrainz_artistid', artist.mbid))
+ else:
+ for name in artist.names:
+ if title:
+ tracks |= set(self.find('artist', name, 'title', title))
+ else:
+ tracks |= set(self.find('artist', name))
+ return list(tracks)
+
+ @bl_artist
+ def search_artist(self, artist):
+ """
+ Search artists based on a fuzzy search in the media library
+ >>> art = Artist(name='the beatles', mbid=<UUID4>) # mbid optional
+ >>> bea = player.search_artist(art)
+ >>> print(bea.names)
+ >>> ['The Beatles', 'Beatles', 'the beatles']
+
+ Returns an Artist object
+ """
+ found = False
+ if artist.mbid:
+ # look for exact search w/ musicbrainz_artistid
+ exact_m = self._execute('list', ['artist', 'musicbrainz_artistid', artist.mbid])
+ if exact_m:
+ [artist.add_alias(name) for name in exact_m]
+ found = True
+ else:
+ artist = Artist(name=artist.name)
+ # then complete with fuzzy search on artist with no musicbrainz_artistid
+ if artist.mbid:
+ # we already performed a lookup on artists with mbid set
+ # search through remaining artists
+ artists = self._cache.get('nombid_artists')
+ else:
+ artists = self._cache.get('artists')
+ match = get_close_matches(artist.name, artists, 50, 0.73)
+ if not match and not found:
+ return
+ if len(match) > 1:
+ self.log.debug('found close match for "%s": %s' %
+ (artist, '/'.join(match)))
+ # Does not perform fuzzy matching on short and single word strings
+ # Only lowercased comparison
+ if ' ' not in artist.name and len(artist.name) < 8:
+ for fuzz_art in match:
+ # Regular lowered string comparison
+ if artist.name.lower() == fuzz_art.lower():
+ artist.add_alias(fuzz_art)
+ return artist
+ fzartist = SimaStr(artist.name)
+ for fuzz_art in match:
+ # Regular lowered string comparison
+ if artist.name.lower() == fuzz_art.lower():
+ found = True
+ artist.add_alias(fuzz_art)
+ if artist.name != fuzz_art:
+ self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
+ continue
+ # SimaStr string __eq__ (not regular string comparison here)
+ if fzartist == fuzz_art:
+ found = True
+ artist.add_alias(fuzz_art)
+ self.log.info('"%s" quite probably matches "%s" (SimaStr)' %
+ (fuzz_art, artist))
+ if found:
+ if artist.aliases:
+ self.log.debug('Found: {}'.format('/'.join(list(artist.names)[:4])))
+ return artist
- @blacklist(track=True)
def fuzzy_find_track(self, artist, title):
# Retrieve all tracks from artist
- all_tracks = self.find('artist', artist)
+ all_tracks = self.find_track(artist, title)
# Get all titles (filter missing titles set to 'None')
all_artist_titles = frozenset([tr.title for tr in all_tracks
if tr.title is not None])
return []
return self.find('artist', artist, 'title', title_)
- @blacklist(artist=True)
- def fuzzy_find_artist(self, art):
- """
- Controls presence of artist in music library.
- Crosschecking artist names with SimaStr objects / difflib / levenshtein
-
- TODO: proceed crosschecking even when an artist matched !!!
- Not because we found "The Doors" as "The Doors" that there is no
- remaining entries as "Doors" :/
- not straight forward, need probably heavy refactoring.
- """
- matching_artists = list()
- artist = SimaStr(art)
-
- # Check against the actual string in artist list
- if artist.orig in self.artists:
- self.log.debug('found exact match for "%s"' % artist)
- return [artist]
- # Then proceed with fuzzy matching if got nothing
- match = get_close_matches(artist.orig, self.artists, 50, 0.73)
- if not match:
- return []
- self.log.debug('found close match for "%s": %s' %
- (artist, '/'.join(match)))
- # Does not perform fuzzy matching on short and single word strings
- # Only lowercased comparison
- if ' ' not in artist.orig and len(artist) < 8:
- for fuzz_art in match:
- # Regular string comparison SimaStr().lower is regular string
- if artist.lower() == fuzz_art.lower():
- matching_artists.append(fuzz_art)
- self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
- return matching_artists
- for fuzz_art in match:
- # Regular string comparison SimaStr().lower is regular string
- if artist.lower() == fuzz_art.lower():
- matching_artists.append(fuzz_art)
- self.log.debug('"%s" matches "%s".' % (fuzz_art, artist))
- return matching_artists
- # SimaStr string __eq__ (not regular string comparison here)
- if artist == fuzz_art:
- matching_artists.append(fuzz_art)
- self.log.info('"%s" quite probably matches "%s" (SimaStr)' %
- (fuzz_art, artist))
- else:
- self.log.debug('FZZZ: "%s" does not match "%s"' %
- (fuzz_art, artist))
- return matching_artists
-
def find_album(self, artist, album):
"""
Special wrapper around album search:
return self.find('artist', artist, 'album', album)
@blacklist(album=True)
- def find_albums(self, artist):
+ def search_albums(self, artist):
"""
Fetch all albums for "AlbumArtist" == artist
Filter albums returned for "artist" == artist since MPD returns any
album containing at least a single track for artist
"""
albums = []
- kwalbart = {'albumartist':artist, 'artist':artist}
- for album in self.list('album', 'albumartist', artist):
- if album not in albums:
- albums.append(Album(name=album, **kwalbart))
- for album in self.list('album', 'artist', artist):
- album_trks = [trk for trk in self.find('album', album)]
- if 'Various Artists' in [tr.albumartist for tr in album_trks]:
- self.log.debug('Discarding {0} ("Various Artists" set)'.format(album))
- continue
- arts = set([trk.artist for trk in album_trks])
- if len(set(arts)) < 2: # TODO: better heuristic, use a ratio instead
- if album not in albums:
- albums.append(Album(name=album, albumartist=artist))
- elif album and album not in albums:
- self.log.debug('"{0}" probably not an album of "{1}"'.format(
- album, artist) + '({0})'.format('/'.join(arts)))
+ for name in artist.names:
+ if len(artist.names) > 1:
+ self.log.debug('Searching album for aliase: "{}"'.format(name))
+ kwalbart = {'albumartist':name, 'artist':name}
+ for album in self.list('album', 'albumartist', artist):
+ if album and album not in albums:
+ albums.append(Album(name=album, **kwalbart))
+ for album in self.list('album', 'artist', artist):
+ album_trks = [trk for trk in self.find('album', album)]
+ if 'Various Artists' in [tr.albumartist for tr in album_trks]:
+ self.log.debug('Discarding {0} ("Various Artists" set)'.format(album))
+ continue
+ arts = set([trk.artist for trk in album_trks])
+ if len(set(arts)) < 2: # TODO: better heuristic, use a ratio instead
+ if album not in albums:
+ albums.append(Album(name=album, **kwalbart))
+ elif album and album not in albums:
+ self.log.debug('"{0}" probably not an album of "{1}"'.format(
+ album, artist) + '({0})'.format('/'.join(arts)))
return albums
def monitor(self):
def add(self, track):
"""Overriding MPD's add method to accept add signature with a Track
object"""
- self._client.add(track.file)
+ self._execute('add', [track.file])
@property
def artists(self):
raise PlayerError('Could connect to "%s", '
'but command "%s" not available' %
(host, nddcmd))
+
+ # Controls use of MusicBrainzIdentifier
+ if Artist.use_mbid:
+ if 'MUSICBRAINZ_ARTISTID' not in self._client.tagtypes():
+ self.log.warning('Use of MusicBrainzIdentifier is set but MPD is '
+ 'not providing related metadata')
+ self.log.info(self._client.tagtypes())
+ self.log.warning('Disabling MusicBrainzIdentifier')
+ Artist.use_mbid = False
+ else:
+ self.log.warning('Use of MusicBrainzIdentifier disabled!')
+ self.log.info('Consider using MusicBrainzIdentifier for your music library')
self._flush_cache()
def disconnect(self):