1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009-2021 kaliko <kaliko@azylum.org>
4 # This file is part of sima
6 # sima is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # sima is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with sima. If not, see <http://www.gnu.org/licenses/>.
19 # standard library import
20 from difflib import get_close_matches
21 from functools import wraps
22 from logging import getLogger
25 from musicpd import MPDClient, MPDError as PlayerError
29 from .lib.meta import Meta, Artist, Album
30 from .lib.track import Track
31 from .lib.simastr import SimaStr
32 from .utils.leven import levenshtein_ratio
37 def wrapper(*args, **kwargs):
40 return func(*args, **kwargs)
41 result = func(*args, **kwargs)
44 for art in result.names:
45 artist = Artist(name=art, mbid=result.mbid)
46 if cls.database.get_bl_artist(artist, add=False):
47 cls.log.debug('Artist in blocklist: %s', artist)
52 def set_artist_mbid(func):
53 def wrapper(*args, **kwargs):
55 result = func(*args, **kwargs)
57 if result and not result.mbid:
58 mbid = cls._find_musicbrainz_artistid(result)
59 artist = Artist(name=result.name, mbid=mbid)
60 artist.add_alias(result)
65 def tracks_wrapper(func):
66 """Convert plain track mapping as returned by MPDClient into :py:obj:`sima.lib.track.Track`
67 objects. This decorator accepts single track or list of tracks as input.
70 def wrapper(*args, **kwargs):
71 ret = func(*args, **kwargs)
72 if isinstance(ret, dict):
74 return [Track(**t) for t in ret]
81 Player instance inheriting from MPDClient (python-musicpd).
83 Some methods are overridden to format objects as :py:obj:`sima.lib.track.Track` for
84 instance, other are calling parent class directly through super().
89 * find methods are looking for exact match of the object provided
90 attributes in MPD music library
91 * search methods are looking for exact match + fuzzy match.
# Commands that have to be available on the server: the connection code
# raises PlayerError when one of them is missing from commands().
needed_cmds = [
    'status', 'stats', 'add', 'find', 'search', 'currentsong', 'ping',
]
# Tags enabled right after connecting; every one of them is mandatory.
needed_tags = {
    'Album',
    'AlbumArtist',
    'Artist',
    'Title',
    'Track',
}
# MusicBrainz identifiers, verified only when the "musicbrainzid" option is on.
needed_mbid_tags = {
    'MUSICBRAINZ_ALBUMARTISTID',
    'MUSICBRAINZ_ALBUMID',
    'MUSICBRAINZ_ARTISTID',
    'MUSICBRAINZ_TRACKID',
}
# Superset of the two sets above, completed with the remaining tag names.
MPD_supported_tags = needed_tags | needed_mbid_tags | {
    'AlbumArtistSort', 'AlbumSort', 'ArtistSort', 'Composer', 'Conductor',
    'Date', 'Disc', 'Genre', 'Grouping', 'Label', 'Name', 'OriginalDate',
    'Performer', 'Work',
    'MUSICBRAINZ_RELEASETRACKID', 'MUSICBRAINZ_WORKID',
}
107 def __init__(self, config):
110 self.log = getLogger('sima')
114 # ######### Overriding MPDClient ###########
def __getattr__(self, cmd):
    """Wrapper around MPDClient calls for abstract overriding

    Commands listed in ``track_wrapped`` are handed back decorated with
    ``tracks_wrapper``; any other command is returned exactly as the
    parent class provides it.

    :param str cmd: Name of the attribute/command being looked up
    :return: The parent class attribute, wrapped when it is track related
    :raises PlayerError: When the lookup raises :py:obj:`OSError`
    """
    track_wrapped = {'currentsong', 'find', 'playlistinfo'}
    try:
        # Resolve once through the parent, then decide whether to wrap
        command = super().__getattr__(cmd)
        return tracks_wrapper(command) if cmd in track_wrapped else command
    except OSError as err:
        # Keep the low level error attached as the explicit cause
        raise PlayerError(err) from err
125 def disconnect(self):
126 """Overriding explicitly MPDClient.disconnect()"""
131 """Overriding explicitly MPDClient.connect()"""
132 mpd_config = self.config['MPD']
133 # host, port, password
134 host = mpd_config.get('host')
135 port = mpd_config.get('port')
136 password = mpd_config.get('password', fallback=None)
139 super().connect(host, port)
140 # Catch socket errors
141 except OSError as err:
142 raise PlayerError('Could not connect to "%s:%s": %s' %
143 (host, port, err.strerror))
144 # Catch all other possible errors
145 # ConnectionError and ProtocolError are always fatal. Others may not
146 # be, but we don't know how to handle them here, so treat them as if
147 # they are instead of ignoring them.
148 except PlayerError as err:
149 raise PlayerError('Could not connect to "%s:%s": %s' %
153 self.password(password)
154 except (PlayerError, OSError) as err:
155 raise PlayerError("Could not connect to '%s': %s" % (host, err))
156 # Controls we have sufficient rights
157 available_cmd = self.commands()
158 for cmd in MPD.needed_cmds:
159 if cmd not in available_cmd:
161 raise PlayerError('Could connect to "%s", '
162 'but command "%s" not available' %
164 self.tagtypes('clear')
165 for tag in MPD.needed_tags:
166 self.tagtypes('enable', tag)
167 tt = set(map(str.lower, self.tagtypes()))
168 needed_tags = set(map(str.lower, MPD.needed_tags))
169 if len(needed_tags & tt) != len(MPD.needed_tags):
170 self.log.warning('MPD exposes: %s', tt)
171 self.log.warning('Tags needed: %s', needed_tags)
172 raise PlayerError('Missing mandatory metadata!')
173 for tag in MPD.needed_mbid_tags:
174 self.tagtypes('enable', tag)
175 # Controls use of MusicBrainzIdentifier
176 if self.config.getboolean('sima', 'musicbrainzid'):
177 tt = set(self.tagtypes())
178 if len(MPD.needed_mbid_tags & tt) != len(MPD.needed_mbid_tags):
179 self.log.warning('Use of MusicBrainzIdentifier is set but MPD '
180 'is not providing related metadata')
182 self.log.warning('Disabling MusicBrainzIdentifier')
183 self.use_mbid = Meta.use_mbid = False
185 self.log.debug('Available metadata: %s', tt)
186 self.use_mbid = Meta.use_mbid = True
188 self.log.warning('Use of MusicBrainzIdentifier disabled!')
189 self.log.info('Consider using MusicBrainzIdentifier for your music library')
190 self.use_mbid = Meta.use_mbid = False
192 # ######### / Overriding MPDClient #########
194 def _reset_cache(self):
196 Both flushes and instantiates _cache
198 * artists: all artists
199 * nombid_artists: artists with no mbid (set only when self.use_mbid is True)
200 * artist_tracks: caching last artist tracks, used in search_track
202 if isinstance(self._cache, dict):
203 self.log.info('Player: Flushing cache!')
205 self.log.info('Player: Initialising cache!')
206 self._cache = {'artists': frozenset(),
207 'nombid_artists': frozenset(),
209 self._cache['artists'] = frozenset(filter(None, self.list('artist')))
211 artists = self.list('artist', "(MUSICBRAINZ_ARTISTID == '')")
212 self._cache['nombid_artists'] = frozenset(filter(None, artists))
214 def _skipped_track(self, previous):
215 if (self.state == 'stop'
216 or not hasattr(previous, 'id')
217 or not hasattr(self.current, 'id')):
219 return self.current.id != previous.id # pylint: disable=no-member
222 """Monitor player for change
223 Returns a list of events among:
225 * database player media library has changed
226 * playlist playlist modified
227 * options player options changed: repeat mode, etc…
228 * player player state changed: paused, stopped, skip track…
229 * skipped current track skipped
232 ret = self.idle('database', 'playlist', 'player', 'options')
233 if self._skipped_track(curr):
234 ret.append('skipped')
235 if 'database' in ret:
240 """Clean blocking event (idle) and pending commands
242 if 'idle' in self._pending:
245 self.log.warning('pending commands: %s', self._pending)
def add(self, payload):
    """Overriding MPD's add method to accept Track objects

    A single track is sent to the parent ``add`` command using its
    ``file`` attribute. A list is sent between
    ``command_list_ok_begin()`` and ``command_list_end()``, one ``add``
    per item. Any other payload type is only logged as an error.

    :param Track,list payload: Either a single track or a list of it
    """
    if isinstance(payload, Track):
        super().__getattr__('add')(payload.file)
    elif isinstance(payload, list):
        self.command_list_ok_begin()
        # Explicit loop: map() is lazy in Python 3, an unconsumed
        # map(self.add, payload) would never queue anything.
        for track in payload:
            self.add(track)
        self.command_list_end()
    else:
        self.log.error('Cannot add %s', payload)
261 # ######### Properties #####################
264 return self.currentsong()
269 Override deprecated MPD playlist command
271 return self.playlistinfo()
275 plm = {'repeat': None, 'single': None,
276 'random': None, 'consume': None, }
277 for key, val in self.status().items():
278 if key in plm.keys():
279 plm.update({key: bool(int(val))})
285 curr_position = int(self.current.pos)
287 return [trk for trk in plst if int(trk.pos) > curr_position]
291 """Returns (play|stop|pause)"""
292 return str(self.status().get('state'))
293 # ######### / Properties ###################
295 # #### find_tracks ####
def find_tracks(self, what):
    """Fetch the tracks of an artist or of an album

        >>> player.find_tracks(Artist(name='Nirvana'))
        >>> player.find_tracks('Nirvana')  # same lookup, from a plain name

    Artist objects are handled by ``_find_art``, Album objects by
    ``_find_alb``; a plain string is turned into an Artist first.

    :param Artist,Album,str what: Artist or Album to fetch track from
    :return: A list of track objects
    :raises PlayerError: When ``what`` is none of the supported types
    """
    if isinstance(what, Artist):
        finder = self._find_art
    elif isinstance(what, Album):
        finder = self._find_alb
    elif isinstance(what, str):
        # Plain name: go through find_tracks again with a real Artist
        what = Artist(name=what)
        finder = self.find_tracks
    else:
        raise PlayerError('Bad input argument')
    return finder(what)
313 def _find_art(self, artist):
316 if self.database.get_bl_artist(artist, add=False):
317 self.log.info('Artist in blocklist: %s', artist)
320 tracks |= set(self.find('musicbrainz_artistid', artist.mbid))
321 for name in artist.names:
322 tracks |= set(self.find('artist', name))
324 albums = {Album(trk.Album.name, mbid=trk.musicbrainz_albumid)
326 bl_albums = {Album(a.get('album'), mbid=a.get('musicbrainz_album'))
327 for a in self.database.view_bl() if a.get('album')}
328 if albums & bl_albums:
329 self.log.info('Albums in blocklist for %s: %s', artist, albums & bl_albums)
330 tracks = {trk for trk in tracks if trk.Album not in bl_albums}
332 bl_tracks = {Track(title=t.get('title'), file=t.get('file'))
333 for t in self.database.view_bl() if t.get('title')}
334 if tracks & bl_tracks:
335 self.log.info('Tracks in blocklist for %s: %s',
336 artist, tracks & bl_tracks)
337 tracks = {trk for trk in tracks if trk not in bl_tracks}
340 def _find_alb(self, album):
341 if not hasattr(album, 'artist'):
342 raise PlayerError('Album object have no artist attribute')
343 if self.database.get_bl_album(album, add=False):
344 self.log.info('Album in blocklist: %s', album)
348 filt = f"(MUSICBRAINZ_ALBUMID == '{album.mbid}')"
349 albums = self.find(filt)
350 # Now look for album with no MusicBrainzIdentifier
351 if not albums and album.Artist.mbid: # Use album artist MBID if possible
352 filt = f"((MUSICBRAINZ_ALBUMARTISTID == '{album.Artist.mbid}') AND (album == '{album.name_sz}'))"
353 albums = self.find(filt)
354 if not albums: # Falls back to (album)?artist/album name
355 for artist in album.Artist.names_sz:
356 filt = f"((albumartist == '{artist}') AND (album == '{album.name_sz}'))"
357 albums.extend(self.find(filt))
359 # #### / find_tracks ##
361 # #### Search Methods #####
362 def _find_musicbrainz_artistid(self, artist):
363 """Find MusicBrainzArtistID when possible.
365 if not self.use_mbid:
368 for name in artist.names_sz:
369 filt = f'((artist == "{name}") AND (MUSICBRAINZ_ARTISTID != ""))'
370 mbids = self.list('MUSICBRAINZ_ARTISTID', filt)
376 self.log.debug("Got multiple MBID for artist: %r", artist)
379 if artist.mbid != mbids[0]:
380 self.log('MBID discrepancy, %s found with %s (instead of %s)',
381 artist.name, mbids[0], artist.mbid)
387 def search_artist(self, artist):
389 Search artists based on a fuzzy search in the media library
390 >>> art = Artist(name='the beatles', mbid=<UUID4>) # mbid optional
391 >>> bea = player.search_artist(art)
393 >>> ['The Beatles', 'Beatles', 'the beatles']
395 :param Artist artist: Artist to look for in MPD music library
396 :return: Artist object
401 # look for exact search w/ musicbrainz_artistid
402 library = self.list('artist', f"(MUSICBRAINZ_ARTISTID == '{artist.mbid}')")
405 self.log.trace('Found mbid "%r" in library', artist)
406 # library could fetch several artist name for a single MUSICBRAINZ_ARTISTID
408 self.log.debug('I got "%s" searching for %r', library, artist)
410 if SimaStr(artist.name) == name and name != artist.name:
411 self.log.debug('add alias for %s: %s', artist, name)
412 artist.add_alias(name)
413 elif len(library) == 1 and library[0] != artist.name:
414 new_alias = artist.name
415 self.log.info('Update artist name %s->%s', artist, library[0])
416 self.log.debug('Also add alias for %s: %s', artist, new_alias)
417 artist = Artist(name=library[0], mbid=artist.mbid)
418 artist.add_alias(new_alias)
419 # Fetches remaining artists for potential match
420 artists = self._cache['nombid_artists']
421 else: # not using MusicBrainzIDs
422 artists = self._cache['artists']
423 match = get_close_matches(artist.name, artists, 50, 0.73)
424 if not match and not found:
427 self.log.debug('found close match for "%s": %s',
428 artist, '/'.join(match))
429 # First lowercased comparison
430 for close_art in match:
431 # Regular lowered string comparison
432 if artist.name.lower() == close_art.lower():
433 artist.add_alias(close_art)
435 if artist.name != close_art:
436 self.log.debug('"%s" matches "%s".', close_art, artist)
437 # Does not perform fuzzy matching on short and single word strings
438 # Only lowercased comparison
439 if ' ' not in artist.name and len(artist.name) < 8:
440 self.log.trace('no fuzzy matching for %r', artist)
444 # Now perform fuzzy search
446 if fuzz in artist.names: # Already found in lower cased comparison
448 # SimaStr string __eq__ (not regular string comparison here)
449 if SimaStr(artist.name) == fuzz:
451 artist.add_alias(fuzz)
452 self.log.debug('"%s" quite probably matches "%s" (SimaStr)',
456 self.log.info('Found aliases: %s', '/'.join(artist.names))
460 def search_track(self, artist, title):
461 """Fuzzy search of title by an artist
463 cache = self._cache.get('artist_tracks').get(artist)
464 # Retrieve all tracks from artist
465 all_tracks = cache or self.find_tracks(artist)
467 self._cache['artist_tracks'] = {} # clean up
468 self._cache.get('artist_tracks')[artist] = all_tracks
469 # Get all titles (filter missing titles set to 'None')
470 all_artist_titles = frozenset([tr.title for tr in all_tracks
471 if tr.title is not None])
472 match = get_close_matches(title, all_artist_titles, 50, 0.78)
477 leven = levenshtein_ratio(title, mtitle)
479 tracks.extend([t for t in all_tracks if t.title == mtitle])
481 self.log.debug('title: "%s" should match "%s" (lr=%1.3f)',
482 mtitle, title, leven)
483 tracks.extend([t for t in all_tracks if t.title == mtitle])
485 self.log.debug('title: "%s" does not match "%s" (lr=%1.3f)',
486 mtitle, title, leven)
489 def search_albums(self, artist):
490 """Find potential albums for "artist"
492 * Fetch all albums for "AlbumArtist" == artist
493 → falls back to "Artist" == artist when no "AlbumArtist" tag is set
494 * Tries to filter out some multi-artist albums
495 For instance an album by Artist_A may have a track by Artist_B. Then
496 looking for albums for Artist_B wrongly returns this album.
498 # First, look for all potential albums
499 self.log.debug('Searching album for "%r"', artist)
501 self.log.debug('Searching album for %s aliases: "%s"',
502 artist, artist.aliases)
504 if self.use_mbid and artist.mbid:
505 mpd_filter = f"((musicbrainz_albumartistid == '{artist.mbid}') AND ( album != ''))"
506 raw_album_id = self.list('musicbrainz_albumid', mpd_filter)
507 for albumid in raw_album_id:
508 mpd_filter = f"((musicbrainz_albumid == '{albumid}') AND ( album != ''))"
509 album_name = self.list('album', mpd_filter)
510 if not album_name: # something odd here
512 albums.add(Album(album_name[0], artist=artist.name,
513 Artist=artist, mbid=albumid))
514 for name_sz in artist.names_sz:
515 mpd_filter = f"((albumartist == '{name_sz}') AND ( album != ''))"
516 raw_albums = self.list('album', mpd_filter)
517 for alb in raw_albums:
518 if alb in [a.name for a in albums]:
523 mpd_filter = f"((albumartist == '{artist.name_sz}') AND ( album == '{_.name_sz}'))"
524 mbids = self.list('MUSICBRAINZ_ALBUMID', mpd_filter)
527 albums.add(Album(alb, artist=artist.name,
528 Artist=artist, mbid=mbid))
531 album_trks = self.find_tracks(album)
532 if not album_trks: # find_track result can be empty, blocklist applied
534 album_artists = {tr.albumartist for tr in album_trks if tr.albumartist}
535 if album.Artist.names & album_artists:
536 candidates.append(album)
538 if self.use_mbid and artist.mbid:
539 if artist.mbid == album_trks[0].musicbrainz_albumartistid:
540 candidates.append(album)
543 self.log.debug('Discarding "%s", "%r" not set as musicbrainz_albumartistid', album, album.Artist)
545 if 'Various Artists' in album_artists:
546 self.log.debug('Discarding %s ("Various Artists" set)', album)
548 if album_artists and album.Artist.name not in album_artists:
549 self.log.debug('Discarding "%s", "%s" not set as albumartist', album, album.Artist)
551 # Attempt to detect false positive (especially when no
552 # AlbumArtist/MBIDs tags are set)
553 # Avoid selecting albums where artist is credited for a single
555 album_trks = self.find(f"(album == '{album.name_sz}')")
556 arts = [trk.artist for trk in album_trks] # Artists in the album
557 # count artist occurrences
558 ratio = arts.count(album.Artist.name)/len(arts)
560 candidates.append(album)
562 self.log.debug('"%s" probably not an album of "%s" (ratio=%.2f)',
563 album, artist, ratio)
566 # #### / Search Methods ###
569 # vim: ai ts=4 sw=4 sts=4 expandtab