1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009-2021 kaliko <kaliko@azylum.org>
4 # This file is part of sima
6 # sima is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # sima is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with sima. If not, see <http://www.gnu.org/licenses/>.
19 # standard library import
20 from difflib import get_close_matches
21 from functools import wraps
22 from logging import getLogger
25 from musicpd import MPDClient, MPDError as PlayerError
29 from .lib.meta import Meta, Artist, Album
30 from .lib.track import Track
31 from .lib.simastr import SimaStr
32 from .utils.leven import levenshtein_ratio
37 def wrapper(*args, **kwargs):
40 return func(*args, **kwargs)
41 result = func(*args, **kwargs)
44 for art in result.names:
45 artist = Artist(name=art, mbid=result.mbid)
46 if cls.database.get_bl_artist(artist, add=False):
47 cls.log.debug('Artist in blocklist: %s', artist)
52 def set_artist_mbid(func):
53 def wrapper(*args, **kwargs):
55 result = func(*args, **kwargs)
57 if result and not result.mbid:
58 mbid = cls._find_musicbrainz_artistid(result)
59 artist = Artist(name=result.name, mbid=mbid)
60 artist.add_alias(result)
65 def tracks_wrapper(func):
66 """Convert plain track mapping as returned by MPDClient into :py:obj:`sima.lib.track.Track`
67 objects. This decorator accepts single track or list of tracks as input.
70 def wrapper(*args, **kwargs):
71 ret = func(*args, **kwargs)
72 if isinstance(ret, dict):
74 return [Track(**t) for t in ret]
81 Player instance inheriting from MPDClient (python-musicpd).
83 Some methods are overridden to format objects as :py:obj:`sima.lib.track.Track` for
84 instance, other are calling parent class directly through super().
89 * find methods are looking for exact match of the object provided
90 attributes in MPD music library
91 * search methods are looking for exact match + fuzzy match.
93 needed_cmds = ['status', 'stats', 'add', 'find',
94 'search', 'currentsong', 'ping']
95 needed_tags = {'Artist', 'Album', 'AlbumArtist', 'Title', 'Track'}
96 needed_mbid_tags = {'MUSICBRAINZ_ARTISTID', 'MUSICBRAINZ_ALBUMID',
97 'MUSICBRAINZ_ALBUMARTISTID', 'MUSICBRAINZ_TRACKID'}
98 MPD_supported_tags = {'Artist', 'ArtistSort', 'Album', 'AlbumSort', 'AlbumArtist',
99 'AlbumArtistSort', 'Title', 'Track', 'Name', 'Genre',
100 'Date', 'OriginalDate', 'Composer', 'Performer',
101 'Conductor', 'Work', 'Grouping', 'Disc', 'Label',
102 'MUSICBRAINZ_ARTISTID', 'MUSICBRAINZ_ALBUMID',
103 'MUSICBRAINZ_ALBUMARTISTID', 'MUSICBRAINZ_TRACKID',
104 'MUSICBRAINZ_RELEASETRACKID', 'MUSICBRAINZ_WORKID'}
107 def __init__(self, config):
110 self.log = getLogger('sima')
114 # ######### Overriding MPDClient ###########
115 def __getattr__(self, cmd):
116 """Wrapper around MPDClient calls for abstract overriding"""
117 track_wrapped = {'currentsong', 'find', 'playlistinfo', }
118 if cmd in track_wrapped:
119 return tracks_wrapper(super().__getattr__(cmd))
120 return super().__getattr__(cmd)
122 def disconnect(self):
123 """Overriding explicitly MPDClient.disconnect()"""
128 """Overriding explicitly MPDClient.connect()"""
129 mpd_config = self.config['MPD']
130 # host, port, password
131 host = mpd_config.get('host')
132 port = mpd_config.get('port')
133 password = mpd_config.get('password', fallback=None)
136 super().connect(host, port)
137 # Catch socket errors
138 except OSError as err:
139 raise PlayerError('Could not connect to "%s:%s": %s' %
140 (host, port, err.strerror))
141 # Catch all other possible errors
142 # ConnectionError and ProtocolError are always fatal. Others may not
143 # be, but we don't know how to handle them here, so treat them as if
144 # they are instead of ignoring them.
145 except PlayerError as err:
146 raise PlayerError('Could not connect to "%s:%s": %s' %
150 self.password(password)
151 except (PlayerError, OSError) as err:
152 raise PlayerError("Could not connect to '%s': %s" % (host, err))
153 # Controls we have sufficient rights
154 available_cmd = self.commands()
155 for cmd in MPD.needed_cmds:
156 if cmd not in available_cmd:
158 raise PlayerError('Could connect to "%s", '
159 'but command "%s" not available' %
161 self.tagtypes('clear')
162 for tag in MPD.needed_tags:
163 self.tagtypes('enable', tag)
164 tt = set(map(str.lower, self.tagtypes()))
165 needed_tags = set(map(str.lower, MPD.needed_tags))
166 if len(needed_tags & tt) != len(MPD.needed_tags):
167 self.log.warning('MPD exposes: %s', tt)
168 self.log.warning('Tags needed: %s', needed_tags)
169 raise PlayerError('Missing mandatory metadata!')
170 for tag in MPD.needed_mbid_tags:
171 self.tagtypes('enable', tag)
172 # Controls use of MusicBrainzIdentifier
173 if self.config.getboolean('sima', 'musicbrainzid'):
174 tt = set(self.tagtypes())
175 if len(MPD.needed_mbid_tags & tt) != len(MPD.needed_mbid_tags):
176 self.log.warning('Use of MusicBrainzIdentifier is set but MPD '
177 'is not providing related metadata')
179 self.log.warning('Disabling MusicBrainzIdentifier')
180 self.use_mbid = Meta.use_mbid = False
182 self.log.debug('Available metadata: %s', tt)
183 self.use_mbid = Meta.use_mbid = True
185 self.log.warning('Use of MusicBrainzIdentifier disabled!')
186 self.log.info('Consider using MusicBrainzIdentifier for your music library')
187 self.use_mbid = Meta.use_mbid = False
189 # ######### / Overriding MPDClient #########
191 def _reset_cache(self):
193 Both flushes and instantiates _cache
195 * artists: all artists
196 * nombid_artists: artists with no mbid (set only when self.use_mbid is True)
197 * artist_tracks: caching last artist tracks, used in search_track
199 if isinstance(self._cache, dict):
200 self.log.info('Player: Flushing cache!')
202 self.log.info('Player: Initialising cache!')
203 self._cache = {'artists': frozenset(),
204 'nombid_artists': frozenset(),
206 self._cache['artists'] = frozenset(filter(None, self.list('artist')))
208 artists = self.list('artist', "(MUSICBRAINZ_ARTISTID == '')")
209 self._cache['nombid_artists'] = frozenset(filter(None, artists))
211 def _skipped_track(self, previous):
212 if (self.state == 'stop'
213 or not hasattr(previous, 'id')
214 or not hasattr(self.current, 'id')):
216 return self.current.id != previous.id # pylint: disable=no-member
219 """Monitor player for change
220 Returns a list a events among:
222 * database player media library has changed
223 * playlist playlist modified
224 * options player options changed: repeat mode, etc…
225 * player player state changed: paused, stopped, skip track…
226 * skipped current track skipped
230 ret = self.idle('database', 'playlist', 'player', 'options')
231 except (PlayerError, OSError) as err:
232 raise PlayerError("Couldn't init idle: %s" % err)
233 if self._skipped_track(curr):
234 ret.append('skipped')
235 if 'database' in ret:
240 """Clean blocking event (idle) and pending commands
242 if 'idle' in self._pending:
245 self.log.warning('pending commands: %s', self._pending)
247 def add(self, payload):
248 """Overriding MPD's add method to accept Track objects
250 :param Track,list payload: Either a single track or a list of it
252 if isinstance(payload, Track):
253 super().__getattr__('add')(payload.file)
254 elif isinstance(payload, list):
255 self.command_list_ok_begin()
256 map(self.add, payload)
257 self.command_list_end()
259 self.log.error('Cannot add %s', payload)
261 # ######### Properties #####################
264 return self.currentsong()
269 Override deprecated MPD playlist command
271 return self.playlistinfo()
275 plm = {'repeat': None, 'single': None,
276 'random': None, 'consume': None, }
277 for key, val in self.status().items():
278 if key in plm.keys():
279 plm.update({key: bool(int(val))})
285 curr_position = int(self.current.pos)
287 return [trk for trk in plst if int(trk.pos) > curr_position]
291 """Returns (play|stop|pause)"""
292 return str(self.status().get('state'))
293 # ######### / Properties ###################
295 # #### find_tracks ####
296 def find_tracks(self, what):
297 """Find tracks for a specific artist or album
298 >>> player.find_tracks(Artist('Nirvana'))
299 >>> player.find_tracks(Album('In Utero', artist=Artist('Nirvana'))
301 :param Artist,Album what: Artist or Album to fetch track from
302 :return: A list of track objects
305 if isinstance(what, Artist):
306 return self._find_art(what)
307 if isinstance(what, Album):
308 return self._find_alb(what)
309 if isinstance(what, str):
310 return self.find_tracks(Artist(name=what))
311 raise PlayerError('Bad input argument')
313 def _find_art(self, artist):
316 if self.database.get_bl_artist(artist, add=False):
317 self.log.info('Artist in blocklist: %s', artist)
320 tracks |= set(self.find('musicbrainz_artistid', artist.mbid))
321 for name in artist.names:
322 tracks |= set(self.find('artist', name))
324 albums = {Album(trk.Album.name, mbid=trk.musicbrainz_albumid)
326 bl_albums = {Album(a.get('album'), mbid=a.get('musicbrainz_album'))
327 for a in self.database.view_bl() if a.get('album')}
328 if albums & bl_albums:
329 self.log.info('Albums in blocklist for %s: %s', artist, albums & bl_albums)
330 tracks = {trk for trk in tracks if trk.Album not in bl_albums}
332 bl_tracks = {Track(title=t.get('title'), file=t.get('file'))
333 for t in self.database.view_bl() if t.get('title')}
334 if tracks & bl_tracks:
335 self.log.info('Tracks in blocklist for %s: %s',
336 artist, tracks & bl_tracks)
337 tracks = {trk for trk in tracks if trk not in bl_tracks}
340 def _find_alb(self, album):
341 if not hasattr(album, 'artist'):
342 raise PlayerError('Album object have no artist attribute')
343 if self.database.get_bl_album(album, add=False):
344 self.log.info('Album in blocklist: %s', album)
348 filt = f"(MUSICBRAINZ_ALBUMID == '{album.mbid}')"
349 albums = self.find(filt)
350 # Now look for album with no MusicBrainzIdentifier
351 if not albums and album.Artist.mbid: # Use album artist MBID if possible
352 filt = f"((MUSICBRAINZ_ALBUMARTISTID == '{album.Artist.mbid}') AND (album == '{album.name_sz}'))"
353 albums = self.find(filt)
354 if not albums: # Falls back to (album)?artist/album name
355 for artist in album.Artist.names_sz:
356 filt = f"((albumartist == '{artist}') AND (album == '{album.name_sz}'))"
357 albums.extend(self.find(filt))
359 # #### / find_tracks ##
361 # #### Search Methods #####
362 def _find_musicbrainz_artistid(self, artist):
363 """Find MusicBrainzArtistID when possible.
365 if not self.use_mbid:
368 for name in artist.names_sz:
369 filt = f'((artist == "{name}") AND (MUSICBRAINZ_ARTISTID != ""))'
370 mbids = self.list('MUSICBRAINZ_ARTISTID', filt)
376 self.log.debug("Got multiple MBID for artist: %r", artist)
379 if artist.mbid != mbids[0]:
380 self.log('MBID discrepancy, %s found with %s (instead of %s)',
381 artist.name, mbids[0], artist.mbid)
387 def search_artist(self, artist):
389 Search artists based on a fuzzy search in the media library
390 >>> art = Artist(name='the beatles', mbid=<UUID4>) # mbid optional
391 >>> bea = player.search_artist(art)
393 >>> ['The Beatles', 'Beatles', 'the beatles']
395 :param Artist artist: Artist to look for in MPD music library
396 :return: Artist object
401 # look for exact search w/ musicbrainz_artistid
402 library = self.list('artist', f"(MUSICBRAINZ_ARTISTID == '{artist.mbid}')")
405 self.log.trace('Found mbid "%r" in library', artist)
406 # library could fetch several artist name for a single MUSICBRAINZ_ARTISTID
408 self.log.debug('I got "%s" searching for %r', library, artist)
410 if SimaStr(artist.name) == name and name != artist.name:
411 self.log.debug('add alias for %s: %s', artist, name)
412 artist.add_alias(name)
413 elif len(library) == 1 and library[0] != artist.name:
414 new_alias = artist.name
415 self.log.info('Update artist name %s->%s', artist, library[0])
416 self.log.debug('Also add alias for %s: %s', artist, new_alias)
417 artist = Artist(name=library[0], mbid=artist.mbid)
418 artist.add_alias(new_alias)
419 # Fetches remaining artists for potential match
420 artists = self._cache['nombid_artists']
421 else: # not using MusicBrainzIDs
422 artists = self._cache['artists']
423 match = get_close_matches(artist.name, artists, 50, 0.73)
424 if not match and not found:
427 self.log.debug('found close match for "%s": %s',
428 artist, '/'.join(match))
429 # First lowercased comparison
430 for close_art in match:
431 # Regular lowered string comparison
432 if artist.name.lower() == close_art.lower():
433 artist.add_alias(close_art)
435 if artist.name != close_art:
436 self.log.debug('"%s" matches "%s".', close_art, artist)
437 # Does not perform fuzzy matching on short and single word strings
438 # Only lowercased comparison
439 if ' ' not in artist.name and len(artist.name) < 8:
440 self.log.trace('no fuzzy matching for %r', artist)
444 # Now perform fuzzy search
446 if fuzz in artist.names: # Already found in lower cased comparison
448 # SimaStr string __eq__ (not regular string comparison here)
449 if SimaStr(artist.name) == fuzz:
451 artist.add_alias(fuzz)
452 self.log.debug('"%s" quite probably matches "%s" (SimaStr)',
456 self.log.info('Found aliases: %s', '/'.join(artist.names))
460 def search_track(self, artist, title):
461 """Fuzzy search of title by an artist
463 cache = self._cache.get('artist_tracks').get(artist)
464 # Retrieve all tracks from artist
465 all_tracks = cache or self.find_tracks(artist)
467 self._cache['artist_tracks'] = {} # clean up
468 self._cache.get('artist_tracks')[artist] = all_tracks
469 # Get all titles (filter missing titles set to 'None')
470 all_artist_titles = frozenset([tr.title for tr in all_tracks
471 if tr.title is not None])
472 match = get_close_matches(title, all_artist_titles, 50, 0.78)
477 leven = levenshtein_ratio(title, mtitle)
479 tracks.extend([t for t in all_tracks if t.title == mtitle])
481 self.log.debug('title: "%s" should match "%s" (lr=%1.3f)',
482 mtitle, title, leven)
483 tracks.extend([t for t in all_tracks if t.title == mtitle])
485 self.log.debug('title: "%s" does not match "%s" (lr=%1.3f)',
486 mtitle, title, leven)
489 def search_albums(self, artist):
490 """Find potential albums for "artist"
492 * Fetch all albums for "AlbumArtist" == artist
493 → falls back to "Artist" == artist when no "AlbumArtist" tag is set
494 * Tries to filter some mutli-artists album
495 For instance an album by Artist_A may have a track by Artist_B. Then
496 looking for albums for Artist_B wrongly returns this album.
498 # First, look for all potential albums
499 self.log.debug('Searching album for "%r"', artist)
501 self.log.debug('Searching album for %s aliases: "%s"',
502 artist, artist.aliases)
504 if self.use_mbid and artist.mbid:
505 mpd_filter = f"((musicbrainz_albumartistid == '{artist.mbid}') AND ( album != ''))"
506 raw_album_id = self.list('musicbrainz_albumid', mpd_filter)
507 for albumid in raw_album_id:
508 mpd_filter = f"((musicbrainz_albumid == '{albumid}') AND ( album != ''))"
509 album_name = self.list('album', mpd_filter)
510 if not album_name: # something odd here
512 albums.add(Album(album_name[0], artist=artist.name,
513 Artist=artist, mbid=albumid))
514 for name_sz in artist.names_sz:
515 mpd_filter = f"((albumartist == '{name_sz}') AND ( album != ''))"
516 raw_albums = self.list('album', mpd_filter)
517 for alb in raw_albums:
518 if alb in [a.name for a in albums]:
523 mpd_filter = f"((albumartist == '{artist.name_sz}') AND ( album == '{_.name_sz}'))"
524 mbids = self.list('MUSICBRAINZ_ALBUMID', mpd_filter)
527 albums.add(Album(alb, artist=artist.name,
528 Artist=artist, mbid=mbid))
531 album_trks = self.find_tracks(album)
532 if not album_trks: # find_track result can be empty, blocklist applied
534 album_artists = {tr.albumartist for tr in album_trks if tr.albumartist}
535 if album.Artist.names & album_artists:
536 candidates.append(album)
538 if self.use_mbid and artist.mbid:
539 if artist.mbid == album_trks[0].musicbrainz_albumartistid:
540 candidates.append(album)
543 self.log.debug('Discarding "%s", "%r" not set as musicbrainz_albumartistid', album, album.Artist)
545 if 'Various Artists' in album_artists:
546 self.log.debug('Discarding %s ("Various Artists" set)', album)
548 if album_artists and album.Artist.name not in album_artists:
549 self.log.debug('Discarding "%s", "%s" not set as albumartist', album, album.Artist)
551 # Attempt to detect false positive (especially when no
552 # AlbumArtist/MBIDs tag ar set)
553 # Avoid selecting albums where artist is credited for a single
555 album_trks = self.find(f"(album == '{album.name_sz}')")
556 arts = [trk.artist for trk in album_trks] # Artists in the album
557 # count artist occurences
558 ratio = arts.count(album.Artist.name)/len(arts)
560 candidates.append(album)
562 self.log.debug('"%s" probably not an album of "%s" (ratio=%.2f)',
563 album, artist, ratio)
566 # #### / Search Methods ###
569 # vim: ai ts=4 sw=4 sts=4 expandtab