1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009-2021 kaliko <kaliko@azylum.org>
4 # This file is part of sima
6 # sima is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # sima is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with sima. If not, see <http://www.gnu.org/licenses/>.
19 # standard library import
20 from difflib import get_close_matches
21 from functools import wraps
22 from logging import getLogger
25 from musicpd import MPDClient, MPDError
29 from .lib.meta import Meta, Artist, Album
30 from .lib.track import Track
31 from .lib.simastr import SimaStr
32 from .utils.leven import levenshtein_ratio
35 class PlayerError(MPDError):
36 """Fatal error in the player."""
41 def wrapper(*args, **kwargs):
44 return func(*args, **kwargs)
45 result = func(*args, **kwargs)
48 for art in result.names:
49 artist = Artist(name=art, mbid=result.mbid)
50 if cls.database.get_bl_artist(artist, add=False):
51 cls.log.debug('Artist in blocklist: %s', artist)
56 def set_artist_mbid(func):
57 def wrapper(*args, **kwargs):
59 result = func(*args, **kwargs)
61 if result and not result.mbid:
62 mbid = cls._find_musicbrainz_artistid(result)
63 artist = Artist(name=result.name, mbid=mbid)
64 artist.add_alias(result)
69 def tracks_wrapper(func):
70 """Convert plain track mapping as returned by MPDClient into :py:obj:`sima.lib.track.Track`
71 objects. This decorator accepts single track or list of tracks as input.
74 def wrapper(*args, **kwargs):
75 ret = func(*args, **kwargs)
76 if isinstance(ret, dict):
78 return [Track(**t) for t in ret]
85 Player instance inheriting from MPDClient (python-musicpd).
87 Some methods are overridden to format objects as :py:obj:`sima.lib.track.Track` for
88 instance, other are calling parent class directly through super().
93 * find methods are looking for exact match of the object provided
94 attributes in MPD music library
95 * search methods are looking for exact match + fuzzy match.
97 needed_cmds = ['status', 'stats', 'add', 'find',
98 'search', 'currentsong', 'ping']
99 needed_tags = {'Artist', 'Album', 'AlbumArtist', 'Title', 'Track'}
100 needed_mbid_tags = {'MUSICBRAINZ_ARTISTID', 'MUSICBRAINZ_ALBUMID',
101 'MUSICBRAINZ_ALBUMARTISTID', 'MUSICBRAINZ_TRACKID'}
102 MPD_supported_tags = {'Artist', 'ArtistSort', 'Album', 'AlbumSort', 'AlbumArtist',
103 'AlbumArtistSort', 'Title', 'Track', 'Name', 'Genre',
104 'Date', 'OriginalDate', 'Composer', 'Performer',
105 'Conductor', 'Work', 'Grouping', 'Disc', 'Label',
106 'MUSICBRAINZ_ARTISTID', 'MUSICBRAINZ_ALBUMID',
107 'MUSICBRAINZ_ALBUMARTISTID', 'MUSICBRAINZ_TRACKID',
108 'MUSICBRAINZ_RELEASETRACKID', 'MUSICBRAINZ_WORKID'}
111 def __init__(self, config):
114 self.log = getLogger('sima')
118 # ######### Overriding MPDClient ###########
119 def __getattr__(self, cmd):
120 """Wrapper around MPDClient calls for abstract overriding"""
121 track_wrapped = {'currentsong', 'find', 'playlistinfo', }
122 if cmd in track_wrapped:
123 return tracks_wrapper(super().__getattr__(cmd))
124 return super().__getattr__(cmd)
126 def disconnect(self):
127 """Overriding explicitly MPDClient.disconnect()"""
132 """Overriding explicitly MPDClient.connect()"""
133 mpd_config = self.config['MPD']
134 # host, port, password
135 host = mpd_config.get('host')
136 port = mpd_config.get('port')
137 password = mpd_config.get('password', fallback=None)
140 super().connect(host, port)
141 # Catch socket errors
142 except IOError as err:
143 raise PlayerError('Could not connect to "%s:%s": %s' %
144 (host, port, err.strerror))
145 # Catch all other possible errors
146 # ConnectionError and ProtocolError are always fatal. Others may not
147 # be, but we don't know how to handle them here, so treat them as if
148 # they are instead of ignoring them.
149 except MPDError as err:
150 raise PlayerError('Could not connect to "%s:%s": %s' %
154 self.password(password)
155 except (MPDError, IOError) as err:
156 raise PlayerError("Could not connect to '%s': %s" % (host, err))
157 # Controls we have sufficient rights
158 available_cmd = self.commands()
159 for cmd in MPD.needed_cmds:
160 if cmd not in available_cmd:
162 raise PlayerError('Could connect to "%s", '
163 'but command "%s" not available' %
165 self.tagtypes('clear')
166 for tag in MPD.needed_tags:
167 self.tagtypes('enable', tag)
168 tt = set(map(str.lower, self.tagtypes()))
169 needed_tags = set(map(str.lower, MPD.needed_tags))
170 if len(needed_tags & tt) != len(MPD.needed_tags):
171 self.log.warning('MPD exposes: %s', tt)
172 self.log.warning('Tags needed: %s', needed_tags)
173 raise PlayerError('Missing mandatory metadata!')
174 for tag in MPD.needed_mbid_tags:
175 self.tagtypes('enable', tag)
176 # Controls use of MusicBrainzIdentifier
177 if self.config.getboolean('sima', 'musicbrainzid'):
178 tt = set(self.tagtypes())
179 if len(MPD.needed_mbid_tags & tt) != len(MPD.needed_mbid_tags):
180 self.log.warning('Use of MusicBrainzIdentifier is set but MPD '
181 'is not providing related metadata')
183 self.log.warning('Disabling MusicBrainzIdentifier')
184 self.use_mbid = Meta.use_mbid = False
186 self.log.debug('Available metadata: %s', tt)
187 self.use_mbid = Meta.use_mbid = True
189 self.log.warning('Use of MusicBrainzIdentifier disabled!')
190 self.log.info('Consider using MusicBrainzIdentifier for your music library')
191 self.use_mbid = Meta.use_mbid = False
193 # ######### / Overriding MPDClient #########
195 def _reset_cache(self):
197 Both flushes and instantiates _cache
199 * artists: all artists
200 * nombid_artists: artists with no mbid (set only when self.use_mbid is True)
201 * artist_tracks: caching last artist tracks, used in search_track
203 if isinstance(self._cache, dict):
204 self.log.info('Player: Flushing cache!')
206 self.log.info('Player: Initialising cache!')
207 self._cache = {'artists': frozenset(),
208 'nombid_artists': frozenset(),
210 self._cache['artists'] = frozenset(filter(None, self.list('artist')))
212 artists = self.list('artist', "(MUSICBRAINZ_ARTISTID == '')")
213 self._cache['nombid_artists'] = frozenset(filter(None, artists))
215 def _skipped_track(self, previous):
216 if (self.state == 'stop'
217 or not hasattr(previous, 'id')
218 or not hasattr(self.current, 'id')):
220 return self.current.id != previous.id # pylint: disable=no-member
223 """Monitor player for change
224 Returns a list a events among:
226 * database player media library has changed
227 * playlist playlist modified
228 * options player options changed: repeat mode, etc…
229 * player player state changed: paused, stopped, skip track…
230 * skipped current track skipped
234 ret = self.idle('database', 'playlist', 'player', 'options')
235 except (MPDError, IOError) as err:
236 raise PlayerError("Couldn't init idle: %s" % err)
237 if self._skipped_track(curr):
238 ret.append('skipped')
239 if 'database' in ret:
244 """Clean blocking event (idle) and pending commands
246 if 'idle' in self._pending:
249 self.log.warning('pending commands: %s', self._pending)
251 def add(self, payload):
252 """Overriding MPD's add method to accept Track objects
254 :param Track,list payload: Either a single track or a list of it
256 if isinstance(payload, Track):
257 super().__getattr__('add')(payload.file)
258 elif isinstance(payload, list):
259 self.command_list_ok_begin()
260 map(self.add, payload)
261 self.command_list_end()
263 self.log.error('Cannot add %s', payload)
265 # ######### Properties #####################
268 return self.currentsong()
273 Override deprecated MPD playlist command
275 return self.playlistinfo()
279 plm = {'repeat': None, 'single': None,
280 'random': None, 'consume': None, }
281 for key, val in self.status().items():
282 if key in plm.keys():
283 plm.update({key: bool(int(val))})
289 curr_position = int(self.current.pos)
291 return [trk for trk in plst if int(trk.pos) > curr_position]
295 """Returns (play|stop|pause)"""
296 return str(self.status().get('state'))
297 # ######### / Properties ###################
299 # #### find_tracks ####
300 def find_tracks(self, what):
301 """Find tracks for a specific artist or album
302 >>> player.find_tracks(Artist('Nirvana'))
303 >>> player.find_tracks(Album('In Utero', artist=Artist('Nirvana'))
305 :param Artist,Album what: Artist or Album to fetch track from
306 :return: A list of track objects
309 if isinstance(what, Artist):
310 return self._find_art(what)
311 if isinstance(what, Album):
312 return self._find_alb(what)
313 if isinstance(what, str):
314 return self.find_tracks(Artist(name=what))
315 raise PlayerError('Bad input argument')
317 def _find_art(self, artist):
320 if self.database.get_bl_artist(artist, add=False):
321 self.log.info('Artist in blocklist: %s', artist)
324 tracks |= set(self.find('musicbrainz_artistid', artist.mbid))
325 for name in artist.names:
326 tracks |= set(self.find('artist', name))
328 albums = {Album(trk.Album.name, mbid=trk.musicbrainz_albumid)
330 bl_albums = {Album(a.get('album'), mbid=a.get('musicbrainz_album'))
331 for a in self.database.view_bl() if a.get('album')}
332 if albums & bl_albums:
333 self.log.info('Albums in blocklist for %s: %s', artist, albums & bl_albums)
334 tracks = {trk for trk in tracks if trk.Album not in bl_albums}
336 bl_tracks = {Track(title=t.get('title'), file=t.get('file'))
337 for t in self.database.view_bl() if t.get('title')}
338 if tracks & bl_tracks:
339 self.log.info('Tracks in blocklist for %s: %s',
340 artist, tracks & bl_tracks)
341 tracks = {trk for trk in tracks if trk not in bl_tracks}
344 def _find_alb(self, album):
345 if not hasattr(album, 'artist'):
346 raise PlayerError('Album object have no artist attribute')
347 if self.database.get_bl_album(album, add=False):
348 self.log.info('Album in blocklist: %s', album)
352 filt = f"(MUSICBRAINZ_ALBUMID == '{album.mbid}')"
353 albums = self.find(filt)
354 # Now look for album with no MusicBrainzIdentifier
355 if not albums and album.Artist.mbid: # Use album artist MBID if possible
356 filt = f"((MUSICBRAINZ_ALBUMARTISTID == '{album.Artist.mbid}') AND (album == '{album.name_sz}'))"
357 albums = self.find(filt)
358 if not albums: # Falls back to (album)?artist/album name
359 for artist in album.Artist.names_sz:
360 filt = f"((albumartist == '{artist}') AND (album == '{album.name_sz}'))"
361 albums.extend(self.find(filt))
363 # #### / find_tracks ##
365 # #### Search Methods #####
366 def _find_musicbrainz_artistid(self, artist):
367 """Find MusicBrainzArtistID when possible.
369 if not self.use_mbid:
372 for name in artist.names_sz:
373 filt = f'((artist == "{name}") AND (MUSICBRAINZ_ARTISTID != ""))'
374 mbids = self.list('MUSICBRAINZ_ARTISTID', filt)
380 self.log.debug("Got multiple MBID for artist: %r", artist)
383 if artist.mbid != mbids[0]:
384 self.log('MBID discrepancy, %s found with %s (instead of %s)',
385 artist.name, mbids[0], artist.mbid)
391 def search_artist(self, artist):
393 Search artists based on a fuzzy search in the media library
394 >>> art = Artist(name='the beatles', mbid=<UUID4>) # mbid optional
395 >>> bea = player.search_artist(art)
397 >>> ['The Beatles', 'Beatles', 'the beatles']
399 :param Artist artist: Artist to look for in MPD music library
400 :return: Artist object
405 # look for exact search w/ musicbrainz_artistid
406 library = self.list('artist', f"(MUSICBRAINZ_ARTISTID == '{artist.mbid}')")
409 self.log.trace('Found mbid "%r" in library', artist)
410 # library could fetch several artist name for a single MUSICBRAINZ_ARTISTID
412 self.log.debug('I got "%s" searching for %r', library, artist)
414 if SimaStr(artist.name) == name and name != artist.name:
415 self.log.debug('add alias for %s: %s', artist, name)
416 artist.add_alias(name)
417 elif len(library) == 1 and library[0] != artist.name:
418 new_alias = artist.name
419 self.log.info('Update artist name %s->%s', artist, library[0])
420 self.log.debug('Also add alias for %s: %s', artist, new_alias)
421 artist = Artist(name=library[0], mbid=artist.mbid)
422 artist.add_alias(new_alias)
423 # Fetches remaining artists for potential match
424 artists = self._cache['nombid_artists']
425 else: # not using MusicBrainzIDs
426 artists = self._cache['artists']
427 match = get_close_matches(artist.name, artists, 50, 0.73)
428 if not match and not found:
431 self.log.debug('found close match for "%s": %s',
432 artist, '/'.join(match))
433 # First lowercased comparison
434 for close_art in match:
435 # Regular lowered string comparison
436 if artist.name.lower() == close_art.lower():
437 artist.add_alias(close_art)
439 if artist.name != close_art:
440 self.log.debug('"%s" matches "%s".', close_art, artist)
441 # Does not perform fuzzy matching on short and single word strings
442 # Only lowercased comparison
443 if ' ' not in artist.name and len(artist.name) < 8:
444 self.log.trace('no fuzzy matching for %r', artist)
448 # Now perform fuzzy search
450 if fuzz in artist.names: # Already found in lower cased comparison
452 # SimaStr string __eq__ (not regular string comparison here)
453 if SimaStr(artist.name) == fuzz:
455 artist.add_alias(fuzz)
456 self.log.debug('"%s" quite probably matches "%s" (SimaStr)',
460 self.log.info('Found aliases: %s', '/'.join(artist.names))
464 def search_track(self, artist, title):
465 """Fuzzy search of title by an artist
467 cache = self._cache.get('artist_tracks').get(artist)
468 # Retrieve all tracks from artist
469 all_tracks = cache or self.find_tracks(artist)
471 self._cache['artist_tracks'] = {} # clean up
472 self._cache.get('artist_tracks')[artist] = all_tracks
473 # Get all titles (filter missing titles set to 'None')
474 all_artist_titles = frozenset([tr.title for tr in all_tracks
475 if tr.title is not None])
476 match = get_close_matches(title, all_artist_titles, 50, 0.78)
481 leven = levenshtein_ratio(title, mtitle)
483 tracks.extend([t for t in all_tracks if t.title == mtitle])
485 self.log.debug('title: "%s" should match "%s" (lr=%1.3f)',
486 mtitle, title, leven)
487 tracks.extend([t for t in all_tracks if t.title == mtitle])
489 self.log.debug('title: "%s" does not match "%s" (lr=%1.3f)',
490 mtitle, title, leven)
493 def search_albums(self, artist):
494 """Find potential albums for "artist"
496 * Fetch all albums for "AlbumArtist" == artist
497 → falls back to "Artist" == artist when no "AlbumArtist" tag is set
498 * Tries to filter some mutli-artists album
499 For instance an album by Artist_A may have a track by Artist_B. Then
500 looking for albums for Artist_B wrongly returns this album.
502 # First, look for all potential albums
503 self.log.debug('Searching album for "%r"', artist)
505 self.log.debug('Searching album for %s aliases: "%s"',
506 artist, artist.aliases)
508 if self.use_mbid and artist.mbid:
509 mpd_filter = f"((musicbrainz_albumartistid == '{artist.mbid}') AND ( album != ''))"
510 raw_album_id = self.list('musicbrainz_albumid', mpd_filter)
511 for albumid in raw_album_id:
512 mpd_filter = f"((musicbrainz_albumid == '{albumid}') AND ( album != ''))"
513 album_name = self.list('album', mpd_filter)
514 if not album_name: # something odd here
516 albums.add(Album(album_name[0], artist=artist.name,
517 Artist=artist, mbid=albumid))
518 for name_sz in artist.names_sz:
519 mpd_filter = f"((albumartist == '{name_sz}') AND ( album != ''))"
520 raw_albums = self.list('album', mpd_filter)
521 for alb in raw_albums:
522 if alb in [a.name for a in albums]:
527 mpd_filter = f"((albumartist == '{artist.name_sz}') AND ( album == '{_.name_sz}'))"
528 mbids = self.list('MUSICBRAINZ_ALBUMID', mpd_filter)
531 albums.add(Album(alb, artist=artist.name,
532 Artist=artist, mbid=mbid))
535 album_trks = self.find_tracks(album)
536 if not album_trks: # find_track result can be empty, blocklist applied
538 album_artists = {tr.albumartist for tr in album_trks if tr.albumartist}
539 if album.Artist.names & album_artists:
540 candidates.append(album)
542 if self.use_mbid and artist.mbid:
543 if artist.mbid == album_trks[0].musicbrainz_albumartistid:
544 candidates.append(album)
547 self.log.debug('Discarding "%s", "%r" not set as musicbrainz_albumartistid', album, album.Artist)
549 if 'Various Artists' in album_artists:
550 self.log.debug('Discarding %s ("Various Artists" set)', album)
552 if album_artists and album.Artist.name not in album_artists:
553 self.log.debug('Discarding "%s", "%s" not set as albumartist', album, album.Artist)
555 # Attempt to detect false positive (especially when no
556 # AlbumArtist/MBIDs tag ar set)
557 # Avoid selecting albums where artist is credited for a single
559 album_trks = self.find(f"(album == '{album.name_sz}')")
560 arts = [trk.artist for trk in album_trks] # Artists in the album
561 # count artist occurences
562 ratio = arts.count(album.Artist.name)/len(arts)
564 candidates.append(album)
566 self.log.debug('"%s" probably not an album of "%s" (ratio=%.2f)',
567 album, artist, ratio)
570 # #### / Search Methods ###
573 # vim: ai ts=4 sw=4 sts=4 expandtab