1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009-2020 kaliko <kaliko@azylum.org>
3 # Copyright (c) 2019 sacha <sachahony@gmail.com>
5 # This file is part of sima
7 # sima is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # sima is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with sima. If not, see <http://www.gnu.org/licenses/>.
22 Fetching similar artists from last.fm web services
25 # standard library import
28 from collections import deque
29 from hashlib import md5
31 # third parties components
34 from .plugin import AdvancedPlugin
35 from .meta import Artist, MetaContainer
36 from ..utils.utils import WSError, WSNotFound, WSTimeout
40 """Caching decorator"""
41 def wrapper(*args, **kwargs):
42 #pylint: disable=W0212,C0111
44 similarities = [art.name for art in args[1]]
45 hashedlst = md5(''.join(similarities).encode('utf-8')).hexdigest()
46 if hashedlst in cls._cache.get('asearch'):
47 cls.log.debug('cached request')
48 results = cls._cache.get('asearch').get(hashedlst)
50 results = func(*args, **kwargs)
51 cls.log.debug('caching request')
52 cls._cache.get('asearch').update({hashedlst: list(results)})
53 random.shuffle(results)
58 class WebService(AdvancedPlugin):
59 """similar artists webservice
62 def __init__(self, daemon):
63 super().__init__(daemon)
64 self.history = daemon.short_history
68 wrapper = {'track': self._track,
71 self.queue_mode = wrapper.get(self.plugin_conf.get('queue_mode'))
75 def _flush_cache(self):
77 Both flushes and instanciates _cache
79 name = self.__class__.__name__
80 if isinstance(self._cache, dict):
81 self.log.info('%s: Flushing cache!', name)
83 self.log.info('%s: Initialising cache!', name)
84 self._cache = {'asearch': dict(),
87 def _cleanup_cache(self):
88 """Avoid bloated cache
90 for _, val in self._cache.items():
91 if isinstance(val, dict):
96 def get_artists_from_player(self, similarities):
98 Look in player library for availability of similar artists in
101 dynamic = self.plugin_conf.getint('max_art')
105 similarities.reverse()
106 while (len(results) < dynamic and similarities):
107 art_pop = similarities.pop()
108 res = self.player.search_artist(art_pop)
113 def ws_similar_artists(self, artist):
115 Retrieve similar artists from WebServive.
117 # initialize artists deque list to construct from DB
119 as_artists = self.ws.get_similar(artist=artist)
120 self.log.debug('Requesting %s for %r', self.ws.name, artist)
122 [as_art.append(art) for art in as_artists]
123 except WSNotFound as err:
124 self.log.warning('%s: %s', self.ws.name, err)
126 self.log.debug('Trying without MusicBrainzID')
128 return self.ws_similar_artists(Artist(name=artist.name))
129 except WSNotFound as err:
130 self.log.debug('%s: %s', self.ws.name, err)
131 except WSTimeout as err:
132 self.log.warning('%s: %s', self.ws.name, err)
133 if self.ws_retry < 3:
135 self.log.warning('%s: retrying', self.ws.name)
136 as_art = self.ws_similar_artists(artist)
138 self.log.warning('%s: stop retrying', self.ws.name)
140 except WSError as err:
141 self.log.warning('%s: %s', self.ws.name, err)
143 self.log.debug('Fetched %d artist(s)', len(as_art))
146 def get_recursive_similar_artist(self):
147 """Check against local player for similar artists (recursive w/ history)
149 if not self.player.playlist:
151 history = list(self.history)
152 # In random play mode use complete playlist to filter
153 if self.player.playmode.get('random'):
154 history = self.player.playlist + history
156 history = self.player.queue + history
157 history = deque(history)
158 last_trk = history.popleft() # remove
162 while depth < self.plugin_conf.getint('depth'):
165 trk = history.popleft()
166 if (trk.Artist in extra_arts
167 or trk.Artist == last_trk.Artist):
169 extra_arts.append(trk.Artist)
171 self.log.debug('EXTRA ARTS: %s', '/'.join(map(str, extra_arts)))
172 for artist in extra_arts:
173 self.log.debug('Looking for artist similar '
174 'to "%s" as well', artist)
175 similar = self.ws_similar_artists(artist=artist)
178 ret_extra.extend(self.get_artists_from_player(similar))
180 if last_trk.Artist in ret_extra:
181 ret_extra.remove(last_trk.Artist)
183 self.log.debug('similar artist(s) found: %s',
184 ' / '.join(map(str, MetaContainer(ret_extra))))
187 def get_local_similar_artists(self):
188 """Check against local player for similar artists
190 if not self.player.playlist:
192 tolookfor = self.player.playlist[-1].Artist
193 self.log.info('Looking for artist similar to "%s"', tolookfor)
194 self.log.debug('%r', tolookfor)
195 similar = self.ws_similar_artists(tolookfor)
197 self.log.info('Got nothing from %s!', self.ws.name)
199 self.log.info('First five similar artist(s): %s...',
200 ' / '.join(map(str, list(similar)[:5])))
201 self.log.info('Looking availability in music library')
202 ret = MetaContainer(self.get_artists_from_player(similar))
204 self.log.debug('regular found in library: %s',
205 ' / '.join(map(str, ret)))
207 self.log.debug('Got nothing similar from library!')
209 if len(self.history) >= 2:
210 if self.plugin_conf.getint('depth') > 1:
211 ret_extra = self.get_recursive_similar_artist()
213 # get them reorg to pick up best element
214 ret_extra = self.get_reorg_artists_list(ret_extra)
215 # tries to pickup less artist from extra art
217 ret_extra = MetaContainer(ret_extra[:max(4, len(ret))//2])
219 self.log.debug('extra found in library: %s',
220 ' / '.join(map(str, ret_extra)))
221 ret = ret | ret_extra
223 self.log.warning('Got nothing from music library.')
225 # In random play mode use complete playlist to filter
226 if self.player.playmode.get('random'):
227 queued_artists = MetaContainer([trk.Artist for trk
228 in self.player.playlist])
230 queued_artists = MetaContainer([trk.Artist for trk
231 in self.player.queue])
232 self.log.trace('Already queued: %s', queued_artists)
233 self.log.trace('Candidate: %s', ret)
234 if ret & queued_artists:
235 self.log.debug('Removing already queued artists: '
236 '%s', '/'.join(map(str, ret & queued_artists)))
237 ret = ret - queued_artists
238 current = self.player.current
239 if current and current.Artist in ret:
240 self.log.debug('Removing current artist: %s', current.Artist)
241 ret = ret - MetaContainer([current.Artist])
242 # Move around similars items to get in unplayed|not recently played
244 self.log.info('Got %d artists in library', len(ret))
245 candidates = self.get_reorg_artists_list(ret)
247 self.log.info(' / '.join(map(str, candidates)))
250 def find_album(self, artists):
251 """Find albums to queue.
255 target_album_to_add = self.plugin_conf.getint('album_to_add')
256 for artist in artists:
257 album = self.album_candidate(artist, unplayed=True)
261 candidates = self.player.find_tracks(album)
264 if self.plugin_conf.getboolean('shuffle_album'):
265 random.shuffle(candidates)
266 # this allows to select a maximum number of track from the album
267 # a value of 0 (default) means keep all
268 nbtracks = self.plugin_conf.getint('track_to_add_from_album')
270 candidates = candidates[0:nbtracks]
271 to_add.extend(candidates)
272 if nb_album_add == target_album_to_add:
275 def find_top(self, artists):
277 find top tracks for artists in artists list.
280 nbtracks_target = self.plugin_conf.getint('track_to_add')
281 for artist in artists:
282 if len(to_add) == nbtracks_target:
284 self.log.info('Looking for a top track for %s', artist)
287 titles = [t for t in self.ws.get_toptrack(artist)]
288 except WSError as err:
289 self.log.warning('%s: %s', self.ws.name, err)
292 found = self.player.search_track(artist, trk.title)
294 random.shuffle(found)
295 top_trk = self.filter_track(found, to_add)
297 to_add.append(top_trk)
301 """Get some tracks for track queue mode
303 :return: list of Tracks
306 artists = self.get_local_similar_artists()
307 nbtracks_target = self.plugin_conf.getint('track_to_add')
308 for artist in artists:
309 self.log.debug('Trying to find titles to add for "%r"', artist)
310 found = self.player.find_tracks(artist)
312 self.log.debug('Found nothing to queue for %s', artist)
314 random.shuffle(found)
315 # find tracks not in history for artist
316 track_candidate = self.filter_track(found, to_add)
318 to_add.append(track_candidate)
319 self.log.info('%s plugin chose: %s',
320 self.ws.name, track_candidate)
321 if len(to_add) == nbtracks_target:
326 """Get albums for album queue mode
328 :return: list of Tracks
330 artists = self.get_local_similar_artists()
331 return self.find_album(artists)
334 """Get some tracks for top track queue mode
336 :return: list of Tracks
338 artists = self.get_local_similar_artists()
339 chosen = self.find_top(artists)
341 self.log.info('%s candidates: %s', self.ws.name, track)
344 def callback_need_track(self):
345 self._cleanup_cache()
346 if not self.player.playlist:
347 self.log.info('No last track, cannot queue')
349 if not self.player.playlist[-1].artist:
350 self.log.warning('No artist set for the last track in queue')
351 self.log.debug(repr(self.player.current))
353 candidates = self.queue_mode()
354 msg = ' '.join(['{0}: {1:>3d}'.format(k, v) for
355 k, v in sorted(self.ws.stats.items())])
356 self.log.debug('http stats: ' + msg)
358 self.log.info('%s plugin found nothing to queue', self.ws.name)
359 if self.plugin_conf.get('queue_mode') != 'album':
360 random.shuffle(candidates)
363 def callback_player_database(self):
367 # vim: ai ts=4 sw=4 sts=4 expandtab