# -*- coding: utf-8 -*-
-# Copyright (c) 2013-2015, 2020 kaliko <kaliko@azylum.org>
+# Copyright (c) 2013-2015, 2020-2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
import random
from .track import Track
-from .meta import Album, Artist
+from .meta import Album, Artist, MetaContainer
class Plugin:
if cls.__doc__:
doc = cls.__doc__.strip(' \n').splitlines()[0]
return {'name': cls.__name__,
- 'doc': doc,}
+ 'doc': doc}
def __init__(self, daemon):
self.log = daemon.log
- self.__daemon = daemon
self.player = daemon.player
self.plugin_conf = None
+ self.main_conf = daemon.config
+ self.sdb = daemon.sdb
self.__get_config()
def __str__(self):
def __get_config(self):
"""Get plugin's specific configuration from global applications's config
"""
- conf = self.__daemon.config
+ conf = self.main_conf
for sec in conf.sections(): # Discovering plugin conf
if sec == self.__class__.__name__.lower():
self.plugin_conf = conf[sec]
if not self.plugin_conf:
self.plugin_conf = {'priority': '80'}
#if self.plugin_conf:
- # self.log.debug('Got config for %s: %s', self, self.plugin_conf)
+ # self.log.debug('Got config for %s: %s', self, self.plugin_conf)
@property
def priority(self):
Called when the daemon().run() is called and
right after the player has connected successfully.
"""
- pass
def callback_player(self):
"""
Called on player changes, stopped, paused, skipped
"""
- pass
def callback_player_database(self):
"""
Called on player music library changes
"""
- pass
def callback_playlist(self):
"""
Called on playlist changes
Not returning data
"""
- pass
def callback_next_song(self):
"""
Could be use to scrobble, maintain an history…
Not returning data,
"""
- pass
def callback_need_track(self):
"""
Returns a list of Track objects to add
"""
- pass
def callback_need_track_fb(self):
"""
Called when callback_need_track failled to find tracks to queue
Returns a list of Track objects to add
"""
- pass
def shutdown(self):
"""Called on application shutdown"""
- pass
-class AdvancedLookUp:
+class AdvancedPlugin(Plugin):
"""Object to derive from for plugins
Exposes advanced music library look up with use of play history
"""
- def __init__(self, daemon):
- self.log = daemon.log
- self.daemon = daemon
- self.player = daemon.player
-
# Query History
- def get_history(self, artist=False):
- """Constructs list of already played artists.
- """
- duration = self.daemon.config.getint('sima', 'history_duration')
- name = None
- if artist:
- name = artist.name
- from_db = self.daemon.sdb.get_history(duration=duration, artist=name)
- hist = [Track(artist=tr[0], album=tr[1], title=tr[2],
- file=tr[3]) for tr in from_db]
- return hist
+ def get_history(self):
+ """Returns a Track list of already played artists."""
+ duration = self.main_conf.getint('sima', 'history_duration')
+ return self.sdb.fetch_history(duration=duration)
def get_album_history(self, artist):
"""Retrieve album history"""
- hist = []
- tracks_from_db = self.get_history(artist=artist)
- for trk in tracks_from_db:
- if trk.album and trk.album in hist:
- continue
- hist.append(Album(name=trk.album, artist=Artist(trk.artist)))
- return hist
+ duration = self.main_conf.getint('sima', 'history_duration')
+ return self.sdb.fetch_albums_history(needle=artist, duration=duration)
def get_reorg_artists_list(self, alist):
"""
- Move around items in artists_list in order to play first not recently
- played artists
+ Move around items in alist in order to have first not recently
+ played (or about to be played) artists.
- :param list(str) alist:
+ :param {Artist} alist: Artist objects list/container
"""
- hist = list()
- duration = self.daemon.config.getint('sima', 'history_duration')
- for art in self.daemon.sdb.get_artists_history(alist, duration=duration):
+ queued_artist = MetaContainer([Artist(_.artist) for _ in
+ self.player.queue if _.artist])
+ not_queued_artist = alist - queued_artist
+ duration = self.main_conf.getint('sima', 'history_duration')
+ hist = []
+ for art in self.sdb.fetch_artists_history(alist, duration=duration):
if art not in hist:
- hist.insert(0, art)
- reorg = [art for art in alist if art not in hist]
+ if art not in queued_artist:
+ hist.insert(0, art)
+ else:
+ hist.append(art)
+ # Find not recently played (not in history) & not in queue
+ reorg = [art for art in not_queued_artist if art not in hist]
reorg.extend(hist)
return reorg
# /Query History
# Find not recently played/unplayed
def album_candidate(self, artist, unplayed=True):
- """
+ """Search an album for artist
+
:param Artist artist: Artist to fetch an album for
:param bool unplayed: Fetch only unplayed album
"""
self.log.info('Searching an album for "%s"...' % artist)
albums = self.player.search_albums(artist)
if not albums:
- return []
- self.log.debug('Albums candidate: %s', albums)
+ return None
+ self.log.debug('Albums candidates: %s', albums)
albums_hist = self.get_album_history(artist)
- self.log.debug('Albums history: %s', albums_hist)
+ self.log.trace('Albums history: %s', [a.name for a in albums_hist])
albums_not_in_hist = [a for a in albums if a.name not in albums_hist]
# Get to next artist if there are no unplayed albums
if not albums_not_in_hist:
self.log.info('No unplayed album found for "%s"' % artist)
if unplayed:
- return []
+ return None
random.shuffle(albums_not_in_hist)
albums_not_in_hist.extend(albums_hist)
+ self.log.debug('Albums candidate: %s', albums_not_in_hist)
album_to_queue = []
for album in albums_not_in_hist:
# Controls the album found is not already queued
if album in {t.album for t in self.player.queue}:
self.log.debug('"%s" already queued, skipping!', album)
- return []
+ continue
# In random play mode use complete playlist to filter
if self.player.playmode.get('random'):
if album in {t.album for t in self.player.playlist}:
self.log.debug('"%s" already in playlist, skipping!',
album)
- return []
+ continue
album_to_queue = album
+ break
if not album_to_queue:
self.log.info('No album found for "%s"', artist)
- return []
- self.log.info('%s album candidate: %s - %s', self.__class__.__name__,
- artist, album_to_queue)
+ return None
+ self.log.info('%s plugin chose album: %s - %s',
+ self.__class__.__name__, artist, album_to_queue)
return album_to_queue
def filter_track(self, tracks, unplayed=False):
deny_list = self.player.playlist
else:
deny_list = self.player.queue
- not_in_hist = list(set(tracks) - set(self.get_history(artist=artist)))
+ not_in_hist = list(set(tracks) - set(self.sdb.fetch_history(artist=artist)))
if not not_in_hist:
self.log.debug('All tracks already played for "%s"', artist)
if unplayed:
return None
random.shuffle(not_in_hist)
- candidates = [_ for _ in not_in_hist if _ not in deny_list]
- # for trk in [_ for _ in not_in_hist if _ not in deny_list]:
- # # Should use albumartist heuristic as well
- # if self.plugin_conf.getboolean('single_album'): # pylint: disable=no-member
- # if (trk.album == self.player.current.album or
- # trk.album in [tr.album for tr in black_list]):
- # self.log.debug('Found unplayed track ' +
- # 'but from an album already queued: %s', trk)
- # continue
- # candidates.append(trk)
+ candidates = []
+ for trk in [_ for _ in not_in_hist if _ not in deny_list]:
+ # Should use albumartist heuristic as well
+ if self.plugin_conf.getboolean('single_album', False): # pylint: disable=no-member
+ if (trk.album == self.player.current.album or
+ trk.album in [tr.album for tr in deny_list]):
+ self.log.debug('Found unplayed track ' +
+ 'but from an album already queued: %s', trk)
+ continue
+ candidates.append(trk)
if not candidates:
return None
return random.choice(candidates)