queue_trigger = self.config.getint('sima', 'queue_length')
if self.player.playmode.get('random'):
queue = self.player.playlist
- self.log.debug('Currently %s track(s) in the playlist. (target %s)', len(queue), queue_trigger)
+ self.log.debug('Currently %s track(s) in the playlist. (target %s)',
+ len(queue), queue_trigger)
else:
queue = self.player.queue
self.log.debug('Currently %s track(s) ahead. (target %s)', len(queue), queue_trigger)
self.log.info('bye...')
def run(self):
- """
- """
try:
self.log.info('Connecting MPD: %(host)s:%(port)s', self.config['MPD'])
self.player.connect()
# -*- coding: utf-8 -*-
-# Copyright (c) 2013, 2014, 2015, 2020,2021 kaliko <kaliko@azylum.org>
+# Copyright (c) 2013, 2014, 2015, 2020, 2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
"""Handles internal/external plugins
sima: sima.core.Sima instance
source: ['internal', 'contrib']
- """
+ """# pylint: disable=logging-not-lazy,logging-format-interpolation
if not sima.config.get('sima', source):
return
logger = logging.getLogger('sima')
if cmd == "generate-config":
config.write(sys.stdout, space_around_delimiters=True)
sys.exit(0)
- logger.info('Running "%s" and exit' % cmd)
+ logger.info('Running "%s" and exit', cmd)
if cmd == "config-test":
logger.info('Config location: "%s"', cfg_mgmt.conf_file)
from .utils.configtest import config_test
# -*- coding: utf-8 -*-
-# Copyright (c) 2014 kaliko <kaliko@azylum.org>
+# Copyright (c) 2014, 2021 kaliko <kaliko@azylum.org>
# Copyright (c) 2012, 2013 Eric Larson <eric@ionrock.org>
#
# This program is free software: you can redistribute it and/or modify
name = self._fn(key)
if os.path.exists(name):
return load(codecs.open(name, 'rb'))
+ return None
def set(self, key, value):
name = self._fn(key)
#
# You should have received a copy of the GNU General Public License
# along with MPD_sima. If not, see <http://www.gnu.org/licenses/>.
+#
+# pylint: disable=all
import atexit
import os
return (groups[1], groups[3], groups[4], groups[6], groups[8])
-class CacheController(object):
+class CacheController:
"""An interface to see if request should cached or not.
"""
CACHE_ANYWAY = False
# Could do syntax based normalization of the URI before
# computing the digest. See Section 6.2.2 of Std 66.
- request_uri = query and "?".join([path, query]) or path
+ request_uri = "?".join([path, query]) if query else path
scheme = scheme.lower()
defrag_uri = scheme + "://" + authority + request_uri
cc = self.parse_cache_control(request.headers)
# non-caching states
- no_cache = True if 'no-cache' in cc else False
+ no_cache = bool('no-cache' in cc)
if 'max-age' in cc and cc['max-age'] == 0:
no_cache = True
# see if it is in the cache anyways
# -*- coding: utf-8 -*-
-# Copyright (c) 2009, 2010, 2013, 2014, 2015 kaliko <kaliko@azylum.org>
+# Copyright (c) 2009, 2010, 2013, 2014, 2015, 2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
logfile: file to log to
"""
name = 'sima'
- if getenv('TRACE', False):
+ if getenv('TRACE', None):
user_log_level = TRACE_LEVEL_NUM
else:
user_log_level = getattr(logging, level.upper())
raise MetaException('Need a "name" argument (str type)')
if not isinstance(kwargs.get('name'), str):
raise MetaException('"name" argument not a string')
- else:
- self.__name = kwargs.pop('name').split(SEPARATOR)[0]
+ self.__name = kwargs.pop('name').split(SEPARATOR)[0]
if 'mbid' in kwargs and kwargs.get('mbid'):
mbid = kwargs.get('mbid').lower().split(SEPARATOR)[0]
if is_uuid4(mbid):
connection = self.get_database_connection()
rows = connection.execute(
"SELECT name FROM sqlite_master WHERE type='table'")
- for r in rows.fetchall():
- connection.execute(f'DROP TABLE IF EXISTS {r[0]}')
+ for row in rows.fetchall():
+ connection.execute(f'DROP TABLE IF EXISTS {row[0]}')
connection.close()
def _remove_blocklist_id(self, blid, with_connection=None):
return connection.execute(
"SELECT id FROM albums WHERE mbid = ?",
(album.mbid,))
- else:
- return connection.execute(
- "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
- (album.name,))
+ return connection.execute(
+ "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
+ (album.name,))
def get_album(self, album, with_connection=None, add=True):
"""get album information from the database.
return connection.execute(
"SELECT id FROM albumartists WHERE mbid = ?",
(artist.mbid,))
- else:
- return connection.execute(
- "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
- (artist.name,))
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
+ (artist.name,))
def get_albumartist(self, artist, with_connection=None, add=True):
"""get albumartist information from the database.
return connection.execute(
"SELECT id FROM artists WHERE mbid = ?",
(artist.mbid,))
- else:
- return connection.execute(
- "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
+ return connection.execute(
+ "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
def get_artist(self, artist, with_connection=None, add=True):
"""get artist information from the database.
def _add_tracks_genres(self, track, connection):
if not track.genres:
- return None
+ return
rows = connection.execute(
"SELECT id FROM tracks WHERE file = ?", (track.file,))
trk_id = rows.fetchone()[0]
hist.append(artist) # No need to go further
break
continue
- elif needle and getattr(needle, '__contains__'):
+ if needle and getattr(needle, '__contains__'):
if artist in needle:
hist.append(artist) # No need to go further
continue
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE track = ?", (track_id,))
- bl = rows.fetchone()[0]
+ blt = rows.fetchone()[0]
if not with_connection:
connection.close()
- return bl
+ return blt
def get_bl_album(self, album, with_connection=None, add=True):
"""Add an album to blocklist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE album = ?", (album_id,))
- bl = rows.fetchone()[0]
+ blitem = rows.fetchone()[0]
if not with_connection:
connection.close()
- return bl
+ return blitem
def get_bl_artist(self, artist, with_connection=None, add=True):
"""Add an artist to blocklist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
- bl = rows.fetchone()[0]
+ blitem = rows.fetchone()[0]
if not with_connection:
connection.close()
- return bl
+ return blitem
def view_bl(self):
connection = self.get_database_connection()
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2009, 2010, 2013 kaliko <kaliko@azylum.org>
+# Copyright (c) 2009, 2010, 2013, 2021 kaliko <kaliko@azylum.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
def __init__(self, fuzzstr):
"""
"""
+ super().__init__()
self.orig = str(fuzzstr)
self.stripped = str(fuzzstr.strip())
# fuzzy computation
@property
def length(self):
- """Get a fancy duration as ``%H:%M:%S`` (use :attr:`duration` to get duration in second only)"""
+ """Get a fancy duration as ``%H:%M:%S`` (use :attr:`duration` to get
+ duration in second only)"""
temps = time.gmtime(self.duration) # TODO: returns a date not a duration
if temps.tm_hour:
fmt = '%H:%M:%S'
# -*- coding: utf-8 -*-
-# Copyright (c) 2009-2020 kaliko <kaliko@azylum.org>
+# Copyright (c) 2009-2021 kaliko <kaliko@azylum.org>
# Copyright (c) 2019 sacha <sachahony@gmail.com>
#
# This file is part of sima
to_add.extend(candidates)
if nb_album_add == target_album_to_add:
return to_add
+ return to_add
def find_top(self, artists):
"""
self.tagtypes('clear')
for tag in MPD.needed_tags:
self.tagtypes('enable', tag)
- tt = set(map(str.lower, self.tagtypes()))
+ ltt = set(map(str.lower, self.tagtypes()))
needed_tags = set(map(str.lower, MPD.needed_tags))
- if len(needed_tags & tt) != len(MPD.needed_tags):
- self.log.warning('MPD exposes: %s', tt)
+ if len(needed_tags & ltt) != len(MPD.needed_tags):
+ self.log.warning('MPD exposes: %s', ltt)
self.log.warning('Tags needed: %s', needed_tags)
raise PlayerError('Missing mandatory metadata!')
for tag in MPD.needed_mbid_tags:
self.tagtypes('enable', tag)
# Controls use of MusicBrainzIdentifier
if self.config.getboolean('sima', 'musicbrainzid'):
- tt = set(self.tagtypes())
- if len(MPD.needed_mbid_tags & tt) != len(MPD.needed_mbid_tags):
+ ltt = set(self.tagtypes())
+ if len(MPD.needed_mbid_tags & ltt) != len(MPD.needed_mbid_tags):
self.log.warning('Use of MusicBrainzIdentifier is set but MPD '
'is not providing related metadata')
- self.log.info(tt)
+ self.log.info(ltt)
self.log.warning('Disabling MusicBrainzIdentifier')
self.use_mbid = Meta.use_mbid = False
else:
- self.log.debug('Available metadata: %s', tt)
+ self.log.debug('Available metadata: %s', ltt)
self.use_mbid = Meta.use_mbid = True
else:
self.log.warning('Use of MusicBrainzIdentifier disabled!')
artist.name, mbids[0], artist.mbid)
else:
return mbids[0]
+ return None
@bl_artist
@set_artist_mbid
if artist.mbid == album_trks[0].musicbrainz_albumartistid:
candidates.append(album)
continue
- else:
- self.log.debug('Discarding "%s", "%r" not set as musicbrainz_albumartistid', album, album.Artist)
- continue
+ self.log.debug('Discarding "%s", "%r" not set as musicbrainz_albumartistid',
+ album, album.Artist)
+ continue
if 'Various Artists' in album_artists:
self.log.debug('Discarding %s ("Various Artists" set)', album)
continue
# -*- coding: utf-8 -*-
-# Copyright (c) 2020 kaliko <kaliko@azylum.org>
+# Copyright (c) 2020, 2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
blocklist = self.sdb.view_bl()
for entry in ['artist', 'album', 'title']:
header = False
- for bl in blocklist:
- val = bl.get(entry, '')
- mbid = bl.get(f'musicbrainz_{entry}', '')
+ for blitem in blocklist:
+ val = blitem.get(entry, '')
+ mbid = blitem.get(f'musicbrainz_{entry}', '')
if val or mbid:
if not header:
header = True
self.log.info(f'{entry.capitalize()}'
'(id name musicbranzID):')
- self.log.info(f'{bl["id"]} "{val}"\t\t{mbid}')
+ self.log.info(f'{blitem["id"]} "{val}"\t\t{mbid}')
def bl_add_artist(self):
artist = self.options.get('artist', None)
# You should have received a copy of the GNU General Public License
# along with sima. If not, see <http://www.gnu.org/licenses/>.
#
-# pylint: disable=bad-continuation
"""
Deal with configuration and data files.
self.log.debug('file permission is: %o', mode)
if mode & S_IRWXO or mode & S_IRWXG:
self.log.warning('File is readable by "other" and/or' +
- ' "group" (actual permission %o octal).' %
- mode)
- self.log.warning('Consider setting permissions' +
- ' to 600 octal.')
+ ' "group" (actual permission %o octal).', mode)
+ self.log.warning('Consider setting permissions to 600 octal.')
def supersedes_config_with_cmd_line_options(self):
"""Updates defaults settings with command line options"""
# -*- coding: utf-8 -*-
# Copyright (c) 2009 Evan Fosmark
-# Copyright (c) 2014 kaliko <kaliko@azylum.org>
+# Copyright (c) 2014, 2021 kaliko <kaliko@azylum.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
self.acquire()
return self
- def __exit__(self, type, value, traceback):
+ def __exit__(self, ex_type, ex_value, ex_traceback):
"""end of the with statement
"""
if self.is_locked:
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020 kaliko <kaliko@azylum.org>
+# Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020, 2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
class FileAction(Action):
"""Generic class to inherit from for ArgParse action on file/dir
"""
- # pylint: disable=R0903
+ # pylint: disable=R0903,W0201
def __call__(self, parser, namespace, values, option_string=None):
self._file = normalize_path(values)
self._dir = dirname(self._file)
class Throttle:
"""throttle decorator"""
+ # pylint: disable=R0903
def __init__(self, wait):
self.wait = wait
self.last_called = datetime.now()