self.sdb = SimaDB(db_path=conf.get('sima', 'db_file'))
PlayerClient.database = self.sdb
self.log = getLogger('sima')
- self._plugins = list()
- self._core_plugins = list()
+ self._plugins = []
+ self._core_plugins = []
self.player = PlayerClient(conf) # MPD client
self.short_history = deque(maxlen=60)
self.changed = None
return False
def queue(self):
- to_add = list()
+ to_add = []
for plugin in self.plugins:
self.log.debug('callback_need_track: %s', plugin)
pl_candidates = getattr(plugin, 'callback_need_track')()
# TODO: Sanity check for "sima.config.get('sima', source)" ?
for plugin in sima.config.get('sima', source).split(','):
plugin = plugin.strip(' \n')
- module = 'sima.plugins.{0}.{1}'.format(source, plugin.lower())
+ module = f'sima.plugins.{source}.{plugin.lower()}'
try:
mod_obj = sima_import(module, fromlist=[plugin])
except ImportError as err:
- logger.error('Failed to load "{}" plugin\'s module: '.format(plugin) +
- '{0} ({1})'.format(module, err))
+ logger.error(f'Failed to load "{plugin}" plugin\'s module: ' +
+ f'{module} ({err})')
sima.shutdown()
sys.exit(1)
try:
"""Normalize the URL to create a safe key for the cache"""
(scheme, authority, path, query, _) = parse_uri(uri)
if not scheme or not authority:
- raise Exception("Only absolute URIs are allowed. uri = %s" % uri)
+ raise Exception(f'Only absolute URIs are allowed. uri = {uri}')
authority = authority.lower()
scheme = scheme.lower()
if not path:
try:
return self.fetch_ws(req)
except Timeout as err:
- raise WSTimeout('Failed to reach server within {0}s'.format(
- SOCKET_TIMEOUT)) from err
+ raise WSTimeout(f'Failed to reach server within {SOCKET_TIMEOUT}s') from err
except HTTPConnectionError as err:
raise WSError(err) from err
self.stats.update(etag=self.stats.get('etag')+1)
resp = self.controller.update_cached_response(prepreq, resp)
elif resp.status_code != 200:
- raise WSHTTPError('{0.status_code}: {0.reason}'.format(resp))
+ raise WSHTTPError(f'{resp.status_code}: {resp.reason}')
self.controller.cache_response(resp.request, resp)
return resp
if filehdl:
logger.handlers = []
# Add timestamp for file handler
- log_format = '{0} {1}'.format('{asctime}', log_format)
+ log_format = f'{{asctime}} {log_format}'
formatter = logging.Formatter(log_format, DATE_FMT, '{')
# create file handler
fileh = logging.FileHandler(logfile)
if callable(other.__str__) and other.__str__() != self.name:
self.__aliases |= {other.__str__()}
else:
- raise MetaException('No __str__ method found in {!r}'.format(other))
+ raise MetaException(f'No __str__ method found in {other!r}')
@property
def name(self):
def info(cls):
"""self documenting class method
"""
- doc = 'Undocumented plugin! Fill "{}" docstring'.format(cls.__name__)
+ doc = f'Undocumented plugin! Fill "{cls.__name__}" docstring'
if cls.__doc__:
doc = cls.__doc__.strip(' \n').splitlines()[0]
return {'name': cls.__name__,
:param Artist artist: Artist to fetch an album for
:param bool unplayed: Fetch only unplayed album
"""
- self.log.info('Searching an album for "%s"...' % artist)
+ self.log.info('Searching an album for "%s"...', artist)
albums = self.player.search_albums(artist)
if not albums:
return None
albums_not_in_hist = [a for a in albums if a.name not in albums_hist]
# Get to next artist if there are no unplayed albums
if not albums_not_in_hist:
- self.log.info('No unplayed album found for "%s"' % artist)
+ self.log.info('No unplayed album found for "%s"', artist)
if unplayed:
return None
random.shuffle(albums_not_in_hist)
:param sima.lib.track.Track track: track to use
:param bool add: add non existing track to database"""
if not track.file:
- raise SimaDBError('Got a track with no file attribute: %r' % track)
+ raise SimaDBError(f'Got a track with no file attribute: {track}')
if with_connection:
connection = with_connection
else:
LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
ORDER BY history.last_play DESC""", (date.isoformat(' '),))
- hist = list()
+ hist = []
for row in rows:
vals = dict(row)
if needle: # Here use artist instead of albumartist
WHERE history.last_play > ? AND artists.name NOT NULL
ORDER BY history.last_play DESC""", (date.isoformat(' '),))
last = deque(maxlen=1)
- hist = list()
+ hist = []
for row in rows:
artist = Artist(**row)
if last and last[0] == artist: # remove consecutive dupes
WHERE history.last_play > ? AND genres.name NOT NULL
ORDER BY history.last_play DESC
""", (date.isoformat(' '),))
- genres = list()
+ genres = []
for row in rows:
genres.append(row)
if len({g[0] for g in genres}) >= limit:
else:
rows = connection.execute(sql+'ORDER BY history.last_play DESC',
(date.isoformat(' '),))
- hist = list()
+ hist = []
for row in rows:
hist.append(Track(**row))
connection.close()
code = ans.get('error')
mess = ans.get('message')
if code == 6:
- raise WSNotFound('{0}: "{1}"'.format(mess, self.artist))
+ raise WSNotFound(f'{mess}: "{self.artist}"')
raise WSError(mess)
return True
payload = payloads.get(method)
payload.update(api_key=LFM.get('apikey'), format='json')
if not isinstance(artist, Artist):
- raise TypeError('"{0!r}" not an Artist object'.format(artist))
+ raise TypeError(f'"{artist!r}" not an Artist object')
self.artist = artist
if artist.mbid:
- payload.update(mbid='{0}'.format(artist.mbid))
+ payload.update(mbid=f'{artist.mbid}')
else:
payload.update(artist=artist.name,
autocorrect=1)
"""
diafilter = True
leven_ratio = 0.82
- regexp_dict = dict()
+ regexp_dict = {}
# Leading patterns: The Le Les
# case-insensitive matching for this RE
'musicbrainz_artistid',
'musicbrainz_albumartistid']
# Which tags have been collapsed?
- self.collapsed_tags = list()
+ self.collapsed_tags = []
# Needed for multiple tags which returns a list instead of a string
self._collapse_tags()
self.log.info('%s: Flushing cache!', name)
else:
self.log.info('%s: Initialising cache!', name)
- self._cache = {'asearch': dict(),
- 'tsearch': dict()}
+ self._cache = {'asearch': {},
+ 'tsearch': {}}
def _cleanup_cache(self):
"""Avoid bloated cache
dynamic = self.plugin_conf.getint('max_art')
if dynamic <= 0:
dynamic = 100
- results = list()
+ results = []
similarities.reverse()
while (len(results) < dynamic and similarities):
art_pop = similarities.pop()
history = self.player.queue + history
history = deque(history)
last_trk = history.popleft() # remove
- extra_arts = list()
- ret_extra = list()
+ extra_arts = []
+ ret_extra = []
depth = 0
while depth < self.plugin_conf.getint('depth'):
if not history:
def find_album(self, artists):
"""Find albums to queue.
"""
- to_add = list()
+ to_add = []
nb_album_add = 0
target_album_to_add = self.plugin_conf.getint('album_to_add')
for artist in artists:
"""
find top tracks for artists in artists list.
"""
- to_add = list()
+ to_add = []
nbtracks_target = self.plugin_conf.getint('track_to_add')
for artist in artists:
if len(to_add) == nbtracks_target:
self.log.info('Looking for a top track for %s', artist)
titles = deque()
try:
- titles = [t for t in self.ws.get_toptrack(artist)]
+ titles = list(self.ws.get_toptrack(artist))
except WSError as err:
self.log.warning('%s: %s', self.ws.name, err)
continue
self.log.debug(repr(self.player.current))
return None
candidates = self.queue_mode()
- msg = ' '.join(['{0}: {1:>3d}'.format(k, v) for
+ msg = ' '.join([f'{k}: {v:>3d}' for
k, v in sorted(self.ws.stats.items())])
self.log.debug('http stats: ' + msg)
if not candidates:
super().connect(host, port)
# Catch socket errors
except OSError as err:
- raise PlayerError('Could not connect to "%s:%s": %s' %
- (host, port, err.strerror)) from err
+ raise PlayerError(f'Could not connect to "{host}:{port}": {err.strerror}'
+ ) from err
# Catch all other possible errors
# ConnectionError and ProtocolError are always fatal. Others may not
# be, but we don't know how to handle them here, so treat them as if
# they are instead of ignoring them.
except MPDError as err:
- raise PlayerError('Could not connect to "%s:%s": %s' %
- (host, port, err)) from err
+ raise PlayerError(f'Could not connect to "{host}:{port}": {err}') from err
if password:
try:
self.password(password)
except (MPDError, OSError) as err:
- raise PlayerError("Could not connect to '%s': %s" % (host, err)) from err
+ raise PlayerError(f"Could not connect to '{host}': {err}") from err
# Controls we have sufficient rights
available_cmd = self.commands()
for cmd in MPD.needed_cmds:
if cmd not in available_cmd:
self.disconnect()
- raise PlayerError('Could connect to "%s", '
- 'but command "%s" not available' %
- (host, cmd))
+ raise PlayerError(f'Could connect to "{host}", but command "{cmd}" not available')
self.tagtypes('clear')
for tag in MPD.needed_tags:
self.tagtypes('enable', tag)
plm = {'repeat': None, 'single': None,
'random': None, 'consume': None, }
for key, val in self.status().items():
- if key in plm.keys():
+ if key in plm:
plm.update({key: bool(int(val))})
return plm
self.log.warning(' '.join(channels))
def sub_chan(self):
- self.chan = 'mpd_sima:{0}.{1}'.format(getfqdn(), getpid())
+ self.chan = f'mpd_sima:{getfqdn()}.{getpid()}'
self.log.debug('Registering as %s', self.chan)
try:
self.player.subscribe(self.chan)
if not trk:
continue
if queue_mode == 'track':
- self.log.info('Genre plugin chose: {}'.format(trk))
+ self.log.info('Genre plugin chose: %s', trk)
candidates.append(trk)
if len(candidates) == target:
break
if not trk:
continue
if queue_mode == 'track':
- self.log.info('Tags plugin chose: {}'.format(trk))
+ self.log.info('Tags plugin chose: %s', trk)
candidates.append(trk)
if len(candidates) == target:
break
res = cli.find(filt, 'window', (0, 300))
except PlayerError as err:
cli.disconnect()
- print('filter error: %s' % err, file=sys.stderr)
+ print(f'filter error: {err}', file=sys.stderr)
sys.exit(1)
artists = list({trk.albumartist for trk in res if trk.albumartist})
if not artists:
self.filedsc = None
self.is_locked = False
dirname = os.path.dirname(file_name)
- self.lockfile = os.path.join(dirname, '{0}.lock'.format(file_name))
+ self.lockfile = os.path.join(dirname, 'f{file_name}.lock')
self.file_name = file_name
self.timeout = timeout
self.delay = delay
def __init__(self, script_info,):
self.parser = None
self.info = dict(script_info)
- self.options = dict()
+ self.options = {}
self.main()
def declare_opts(self):
opt_names = opt.pop('sw')
self.parser.add_argument(*opt_names, **opt)
# Add sub commands
- sp = self.parser.add_subparsers(
+ spa = self.parser.add_subparsers(
title=f'{self.info["prog"]} commands as positional arguments',
description=f"""Use them after optionnal arguments.\n"{self.info["prog"]} command -h" for more info.""",
metavar='', dest='command')
for cmd in CMDS:
helpmsg = cmd.pop('help')
cmd, args = cmd.popitem()
- _ = sp.add_parser(cmd, description=helpmsg, help=helpmsg)
+ _ = spa.add_parser(cmd, description=helpmsg, help=helpmsg)
for arg in args:
name = arg.pop('name', None)
if name:
"""
def checks(self):
if isdir(self._file):
- self.parser.error('need a file not a directory: {}'.format(self._file))
+ self.parser.error(f'need a file not a directory: {self._file}')
if not exists(self._dir):
- self.parser.error('directory does not exist: {0}'.format(self._dir))
+ self.parser.error(f'directory does not exist: {self._dir}')
if not exists(self._file):
# Is parent directory writable then
if not access(self._dir, W_OK):
- self.parser.error('no write access to "{0}"'.format(self._dir))
+ self.parser.error(f'no write access to "{self._dir}"')
else:
if not access(self._file, W_OK):
- self.parser.error('no write access to "{0}"'.format(self._file))
+ self.parser.error(f'no write access to "{self._file}"')
class Rfile(FileAction):
"""
def checks(self):
if not exists(self._file):
- self.parser.error('file does not exist: {0}'.format(self._file))
+ self.parser.error(f'file does not exist: {self._file}')
if not isfile(self._file):
- self.parser.error('not a file: {0}'.format(self._file))
+ self.parser.error(f'not a file: {self._file}')
if not access(self._file, R_OK):
- self.parser.error('no read access to "{0}"'.format(self._file))
+ self.parser.error(f'no read access to "{self._file}"')
class Wdir(FileAction):
"""
def checks(self):
if not exists(self._file):
- self.parser.error('directory does not exist: {0}'.format(self._file))
+ self.parser.error(f'directory does not exist: {self._file}')
if not isdir(self._file):
- self.parser.error('not a directory: {0}'.format(self._file))
+ self.parser.error(f'not a directory: {self._file}')
if not access(self._file, W_OK):
- self.parser.error('no write access to "{0}"'.format(self._file))
+ self.parser.error(f'no write access to "{self._file}"')
class Throttle: