1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009-2022, 2024 kaliko <kaliko@azylum.org>
4 # This file is part of sima
6 # sima is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # sima is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with sima. If not, see <http://www.gnu.org/licenses/>.
19 # standard library import
20 from difflib import get_close_matches
21 from functools import wraps
22 from logging import getLogger
23 from select import select
26 from musicpd import MPDClient, MPDError as PlayerError
30 from .lib.meta import Meta, Artist, Album
31 from .lib.track import Track
32 from .lib.simastr import SimaStr
33 from .utils.leven import levenshtein_ratio
34 from .utils.utils import get_decorator
39 def wrapper(*args, **kwargs):
42 return func(*args, **kwargs)
43 result = func(*args, **kwargs)
46 for art in result.names:
47 artist = Artist(name=art, mbid=result.mbid)
48 if cls.database.get_bl_artist(artist, add=False):
49 cls.log.debug('Artist in blocklist: %s', artist)
55 def set_artist_mbid(func):
56 def wrapper(*args, **kwargs):
58 result = func(*args, **kwargs)
60 if result and not result.mbid:
61 mbid = cls._find_musicbrainz_artistid(result)
62 artist = Artist(name=result.name, mbid=mbid)
63 artist.add_alias(result)
69 def tracks_wrapper(func):
70 """Convert plain track mapping as returned by MPDClient into :py:obj:`sima.lib.track.Track`
71 objects. This decorator accepts single track or list of tracks as input.
74 def wrapper(*args, **kwargs):
75 ret = func(*args, **kwargs)
76 if isinstance(ret, dict):
78 return [Track(**t) for t in ret]
81 # Decorator to wrap non MPDError exceptions from musicpd in PlayerError
82 we = get_decorator(errors=(OSError, TimeoutError), wrap_into=PlayerError)
88 Player instance inheriting from MPDClient (python-musicpd).
90 Some methods are overridden to format objects as :py:obj:`sima.lib.track.Track` for
91 instance, others call the parent class directly through super().
96 * find methods are looking for exact match of the object provided
97 attributes in MPD music library
98 * search methods are looking for exact match + fuzzy match.
# MPD commands looked up in the server's ``commands()`` answer when
# connecting; a missing one raises PlayerError.
needed_cmds = [
    'status',
    'stats',
    'add',
    'find',
    'search',
    'currentsong',
    'ping',
]
# Tag types enabled on connection; PlayerError('Missing mandatory metadata!')
# is raised when MPD does not expose all of them.
needed_tags = {
    'Artist',
    'Album',
    'AlbumArtist',
    'Title',
    'Track',
}
# MusicBrainz tag types; when MPD does not expose all of them, the use of
# MusicBrainz identifiers gets disabled.
needed_mbid_tags = {
    'MUSICBRAINZ_ARTISTID',
    'MUSICBRAINZ_ALBUMID',
    'MUSICBRAINZ_ALBUMARTISTID',
    'MUSICBRAINZ_TRACKID',
}
# Full tag set (a superset of the two sets above).
MPD_supported_tags = {
    # artist / album / track naming
    'Artist', 'ArtistSort',
    'Album', 'AlbumSort',
    'AlbumArtist', 'AlbumArtistSort',
    'Title', 'Track', 'Name',
    # descriptive metadata
    'Genre', 'Date', 'OriginalDate',
    'Composer', 'Performer', 'Conductor',
    'Work', 'Grouping', 'Disc', 'Label',
    # MusicBrainz identifiers
    'MUSICBRAINZ_ARTISTID', 'MUSICBRAINZ_ALBUMID',
    'MUSICBRAINZ_ALBUMARTISTID', 'MUSICBRAINZ_TRACKID',
    'MUSICBRAINZ_RELEASETRACKID', 'MUSICBRAINZ_WORKID',
}
114 def __init__(self, config):
116 self.socket_timeout = 10
118 self.log = getLogger('sima')
122 # ######### Overriding MPDClient ###########
def __getattr__(self, cmd):
    """Resolve the MPD command *cmd* through the parent class.

    The callables obtained for ``currentsong``, ``find`` and
    ``playlistinfo`` are passed through ``tracks_wrapper`` before being
    returned; any other command is returned exactly as the parent class
    provides it.
    """
    command = super().__getattr__(cmd)
    if cmd in {'currentsong', 'find', 'playlistinfo'}:
        return tracks_wrapper(command)
    return command
131 def disconnect(self):
132 """Overriding explicitly MPDClient.disconnect()"""
137 """Overriding explicitly MPDClient.connect()"""
138 mpd_config = self.config['MPD']
139 # host, port, password
140 host = mpd_config.get('host')
141 port = mpd_config.get('port')
142 password = mpd_config.get('password', fallback=None)
144 super().connect(host, port)
146 self.password(password)
147 # Controls we have sufficient rights
148 available_cmd = self.commands()
149 for cmd in MPD.needed_cmds:
150 if cmd not in available_cmd:
152 raise PlayerError(f'Could connect to "{host}", but command "{cmd}" not available')
153 self.tagtypes_clear()
154 for tag in MPD.needed_tags:
155 self.tagtypes_enable(tag)
156 ltt = set(map(str.lower, self.tagtypes()))
157 needed_tags = set(map(str.lower, MPD.needed_tags))
158 if len(needed_tags & ltt) != len(MPD.needed_tags):
159 self.log.warning('MPD exposes: %s', ltt)
160 self.log.warning('Tags needed: %s', needed_tags)
161 raise PlayerError('Missing mandatory metadata!')
162 for tag in MPD.needed_mbid_tags:
163 self.tagtypes_enable(tag)
164 # Controls use of MusicBrainzIdentifier
165 if self.config.getboolean('sima', 'musicbrainzid'):
166 ltt = set(self.tagtypes())
167 if len(MPD.needed_mbid_tags & ltt) != len(MPD.needed_mbid_tags):
168 self.log.warning('Use of MusicBrainzIdentifier is set but MPD '
169 'is not providing related metadata')
171 self.log.warning('Disabling MusicBrainzIdentifier')
172 self.use_mbid = Meta.use_mbid = False
174 self.log.debug('Available metadata: %s', ltt)
175 self.use_mbid = Meta.use_mbid = True
177 self.log.warning('Use of MusicBrainzIdentifier disabled!')
178 self.log.info('Consider using MusicBrainzIdentifier for your music library')
179 self.use_mbid = Meta.use_mbid = False
181 # ######### / Overriding MPDClient #########
183 def _reset_cache(self):
185 Both flushes and instantiates _cache
187 * artists: all artists
188 * nombid_artists: artists with no mbid (set only when self.use_mbid is True)
189 * artist_tracks: caching last artist tracks, used in search_track
191 if isinstance(self._cache, dict):
192 self.log.info('Player: Flushing cache!')
194 self.log.info('Player: Initialising cache!')
195 self._cache = {'artists': frozenset(),
196 'nombid_artists': frozenset(),
198 self._cache['artists'] = frozenset(filter(None, self.list('artist')))
200 artists = self.list('artist', "(MUSICBRAINZ_ARTISTID == '')")
201 self._cache['nombid_artists'] = frozenset(filter(None, artists))
203 def _skipped_track(self, previous):
204 if (self.state == 'stop'
205 or not hasattr(previous, 'id')
206 or not hasattr(self.current, 'id')):
208 return self.current.id != previous.id # pylint: disable=no-member
212 """Monitor player for change
213 Returns a list of events among:
215 * database player media library has changed
216 * playlist playlist modified
217 * options player options changed: repeat mode, etc…
218 * player player state changed: paused, stopped, skip track…
219 * skipped current track skipped
224 self.send_idle('database', 'playlist', 'player', 'options')
225 _read, _, _ = select([self], [], [], select_timeout)
226 if _read: # tries to read response
227 ret = self.fetch_idle()
228 if self._skipped_track(curr):
229 ret.append('skipped')
230 if 'database' in ret:
233 # Nothing to read, canceling idle
237 """Clean blocking event (idle) and pending commands
239 if 'idle' in self._pending:
242 self.log.warning('pending commands: %s', self._pending)
def add(self, payload):
    """Send MPD's ``add`` command for a Track or a list of Tracks.

    Overrides MPD's add method so that it accepts Track objects: the
    ``file`` attribute of each track is what gets passed to the parent
    ``add`` command. A list is sent within a single command list.
    Any other payload type is logged as an error and ignored.

    :param Track,list payload: Either a single track or a list of them
    """
    if isinstance(payload, Track):
        super().__getattr__('add')(payload.file)
    elif isinstance(payload, list):
        self.command_list_ok_begin()
        # A bare ``map(self.add, payload)`` only builds a lazy iterator on
        # Python 3 and never calls ``self.add``; iterate explicitly so that
        # every item is actually added to the command list.
        for track in payload:
            self.add(track)
        self.command_list_end()
    else:
        self.log.error('Cannot add %s', payload)
259 # ######### Properties #####################
262 return self.currentsong()
267 Override deprecated MPD playlist command
269 return self.playlistinfo()
273 plm = {'repeat': None, 'single': None,
274 'random': None, 'consume': None, }
275 for key, val in self.status().items():
277 plm.update({key: bool(int(val))})
283 curr_position = int(self.current.pos)
285 return [trk for trk in plst if int(trk.pos) > curr_position]
289 """Returns (play|stop|pause)"""
290 return str(self.status().get('state'))
291 # ######### / Properties ###################
293 # #### find_tracks ####
def find_tracks(self, what):
    """Look up the tracks of a given artist or album.

    >>> player.find_tracks(Artist('Nirvana'))
    >>> player.find_tracks(Album('In Utero', artist=Artist('Nirvana')))

    A plain string is handled as an artist name.

    :param Artist,Album what: Artist or Album to fetch tracks from
    :return: A list of track objects
    :raises PlayerError: if ``what`` is not an Artist, an Album or a str
    """
    if isinstance(what, Artist):
        found = self._find_art(what)
    elif isinstance(what, Album):
        found = self._find_alb(what)
    elif isinstance(what, str):
        # Wrap the name in an Artist and go through the same entry point
        found = self.find_tracks(Artist(name=what))
    else:
        raise PlayerError('Bad input argument')
    return found
311 def _find_art(self, artist):
314 if self.database.get_bl_artist(artist, add=False):
315 self.log.info('Artist in blocklist: %s', artist)
318 tracks |= set(self.find('musicbrainz_artistid', artist.mbid))
319 for name in artist.names:
320 tracks |= set(self.find('artist', name))
322 albums = {Album(trk.Album.name, mbid=trk.musicbrainz_albumid)
324 bl_albums = {Album(a.get('album'), mbid=a.get('musicbrainz_album'))
325 for a in self.database.view_bl() if a.get('album')}
326 if albums & bl_albums:
327 self.log.info('Albums in blocklist for %s: %s', artist, albums & bl_albums)
328 tracks = {trk for trk in tracks if trk.Album not in bl_albums}
330 bl_tracks = {Track(title=t.get('title'), file=t.get('file'))
331 for t in self.database.view_bl() if t.get('title')}
332 if tracks & bl_tracks:
333 self.log.info('Tracks in blocklist for %s: %s',
334 artist, tracks & bl_tracks)
335 tracks = {trk for trk in tracks if trk not in bl_tracks}
338 def _find_alb(self, album):
339 if not hasattr(album, 'artist'):
340 raise PlayerError('Album object have no artist attribute')
341 if self.database.get_bl_album(album, add=False):
342 self.log.info('Album in blocklist: %s', album)
346 filt = f"(MUSICBRAINZ_ALBUMID == '{album.mbid}')"
347 albums = self.find(filt)
348 # Now look for album with no MusicBrainzIdentifier
349 if not albums and album.Artist.mbid: # Use album artist MBID if possible
350 filt = f"((MUSICBRAINZ_ALBUMARTISTID == '{album.Artist.mbid}') AND (album == '{album.name_sz}'))"
351 albums = self.find(filt)
352 if not albums: # Falls back to (album)?artist/album name
353 for artist in album.Artist.names_sz:
354 filt = f"((albumartist == '{artist}') AND (album == '{album.name_sz}'))"
355 albums.extend(self.find(filt))
357 # #### / find_tracks ##
359 # #### Search Methods #####
360 def _find_musicbrainz_artistid(self, artist):
361 """Find MusicBrainzArtistID when possible.
# The lookup is gated on self.use_mbid, the flag set while connecting
# according to configuration and to the tags MPD exposes.
363 if not self.use_mbid:
# For each name variant of the artist, ask MPD for the non-empty
# MUSICBRAINZ_ARTISTID values found on tracks tagged with that artist name.
366 for name in artist.names_sz:
367 filt = f'((artist == "{name}") AND (MUSICBRAINZ_ARTISTID != ""))'
368 mbids = self.list('MUSICBRAINZ_ARTISTID', filt)
374 self.log.debug("Got multiple MBID for artist: %r", artist)
# The MBID carried by the artist object is compared with the first one
# returned by MPD.
377 if artist.mbid != mbids[0]:
# NOTE(review): self.log is the Logger returned by getLogger('sima') and
# Logger instances are not callable, so this call would raise TypeError
# when a discrepancy is found; presumably self.log.warning(...) (or
# another level method) was intended. Confirm and fix.
378 self.log('MBID discrepancy, %s found with %s (instead of %s)',
379 artist.name, mbids[0], artist.mbid)
386 def search_artist(self, artist):
388 Search artists based on a fuzzy search in the media library
389 >>> art = Artist(name='the beatles', mbid=<UUID4>) # mbid optional
390 >>> bea = player.search_artist(art)
392 >>> ['The Beatles', 'Beatles', 'the beatles']
394 :param Artist artist: Artist to look for in MPD music library
395 :return: Artist object
400 # look for exact search w/ musicbrainz_artistid
401 library = self.list('artist', f"(MUSICBRAINZ_ARTISTID == '{artist.mbid}')")
404 self.log.trace('Found mbid "%r" in library', artist)
405 # library could return several artist names for a single MUSICBRAINZ_ARTISTID
407 self.log.debug('I got "%s" searching for %r', library, artist)
409 if SimaStr(artist.name) == name and name != artist.name:
410 self.log.debug('add alias for %s: %s', artist, name)
411 artist.add_alias(name)
412 # Fetches remaining artists for potential match
413 artists = self._cache['nombid_artists']
414 else: # not using MusicBrainzIDs
415 artists = self._cache['artists']
416 match = get_close_matches(artist.name, artists, 50, 0.73)
417 if not match and not found:
420 self.log.debug('found close match for "%s": %s',
421 artist, '/'.join(match))
422 # First lowercased comparison
423 for close_art in match:
424 # Regular lowered string comparison
425 if artist.name.lower() == close_art.lower():
426 artist.add_alias(close_art)
428 if artist.name != close_art:
429 self.log.debug('"%s" matches "%s".', close_art, artist)
430 # Does not perform fuzzy matching on short and single word strings
431 # Only lowercased comparison
432 if ' ' not in artist.name and len(artist.name) < 8:
433 self.log.trace('no fuzzy matching for %r', artist)
437 # Now perform fuzzy search
439 if fuzz in artist.names: # Already found in lower cased comparison
441 # SimaStr string __eq__ (not regular string comparison here)
442 if SimaStr(artist.name) == fuzz:
444 artist.add_alias(fuzz)
445 self.log.debug('"%s" quite probably matches "%s" (SimaStr)',
449 self.log.info('Found aliases: %s', '/'.join(artist.names))
453 def search_track(self, artist, title):
454 """Fuzzy search of title by an artist
456 cache = self._cache.get('artist_tracks').get(artist)
457 # Retrieve all tracks from artist
458 all_tracks = cache or self.find_tracks(artist)
460 self._cache['artist_tracks'] = {} # clean up
461 self._cache.get('artist_tracks')[artist] = all_tracks
462 # Get all titles (filter missing titles set to 'None')
463 all_artist_titles = frozenset([tr.title for tr in all_tracks
464 if tr.title is not None])
465 match = get_close_matches(title, all_artist_titles, 50, 0.78)
470 leven = levenshtein_ratio(title, mtitle)
472 tracks.extend([t for t in all_tracks if t.title == mtitle])
474 self.log.debug('title: "%s" should match "%s" (lr=%1.3f)',
475 mtitle, title, leven)
476 tracks.extend([t for t in all_tracks if t.title == mtitle])
478 self.log.debug('title: "%s" does not match "%s" (lr=%1.3f)',
479 mtitle, title, leven)
482 def search_albums(self, artist):
483 """Find potential albums for "artist"
485 * Fetch all albums for "AlbumArtist" == artist
486 → falls back to "Artist" == artist when no "AlbumArtist" tag is set
487 * Tries to filter out some multi-artist albums
488 For instance an album by Artist_A may have a track by Artist_B. Then
489 looking for albums for Artist_B wrongly returns this album.
491 # First, look for all potential albums
492 self.log.debug('Searching album for "%r"', artist)
494 self.log.debug('Searching album for %s aliases: "%s"',
495 artist, artist.aliases)
497 if self.use_mbid and artist.mbid:
498 mpd_filter = f"((musicbrainz_albumartistid == '{artist.mbid}') AND ( album != ''))"
499 raw_album_id = self.list('musicbrainz_albumid', mpd_filter)
500 for albumid in raw_album_id:
501 mpd_filter = f"((musicbrainz_albumid == '{albumid}') AND ( album != ''))"
502 album_name = self.list('album', mpd_filter)
503 if not album_name: # something odd here
505 albums.add(Album(album_name[0], artist=artist.name,
506 Artist=artist, mbid=albumid))
507 for name_sz in artist.names_sz:
508 mpd_filter = f"((albumartist == '{name_sz}') AND ( album != ''))"
509 raw_albums = self.list('album', mpd_filter)
510 for alb in raw_albums:
511 if alb in [a.name for a in albums]:
516 mpd_filter = f"((albumartist == '{artist.name_sz}') AND ( album == '{_.name_sz}'))"
517 mbids = self.list('MUSICBRAINZ_ALBUMID', mpd_filter)
520 albums.add(Album(alb, artist=artist.name,
521 Artist=artist, mbid=mbid))
524 album_trks = self.find_tracks(album)
525 if not album_trks: # find_track result can be empty, blocklist applied
527 album_artists = {tr.albumartist for tr in album_trks if tr.albumartist}
528 if album.Artist.names & album_artists:
529 candidates.append(album)
531 if self.use_mbid and artist.mbid:
532 if artist.mbid == album_trks[0].musicbrainz_albumartistid:
533 candidates.append(album)
535 self.log.debug('Discarding "%s", "%r" not set as musicbrainz_albumartistid',
538 if 'Various Artists' in album_artists:
539 self.log.debug('Discarding %s ("Various Artists" set)', album)
541 if album_artists and album.Artist.name not in album_artists:
542 self.log.debug('Discarding "%s", "%s" not set as albumartist', album, album.Artist)
544 # Attempt to detect false positive (especially when no
545 # AlbumArtist/MBIDs tags are set)
546 # Avoid selecting albums where artist is credited for a single
548 album_trks = self.find(f"(album == '{album.name_sz}')")
549 arts = [trk.artist for trk in album_trks] # Artists in the album
550 # count artist occurrences
551 ratio = arts.count(album.Artist.name)/len(arts)
553 candidates.append(album)
555 self.log.debug('"%s" probably not an album of "%s" (ratio=%.2f)',
556 album, artist, ratio)
559 # #### / Search Methods ###
562 # vim: ai ts=4 sw=4 sts=4 expandtab