# -*- coding: utf-8 -*-
#
-# Copyright (c) 2010, 2011, 2013 Jack Kaliko <kaliko@azylum.org>
+# Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
# along with sima. If not, see <http://www.gnu.org/licenses/>.
#
#
-"""generic tools and utilitaries for sima
+"""generic tools and utilities for sima
"""
+# pylint: disable=C0111
+import logging
import traceback
import sys
-from argparse import (ArgumentError, Action)
-from os import (environ, access, getcwd, W_OK, R_OK)
-from os.path import (dirname, isabs, join, normpath, exists, isdir, isfile)
+from argparse import ArgumentError, Action
+from base64 import b64decode as push
+from codecs import getencoder
+from datetime import datetime
+from os import environ, access, getcwd, W_OK, R_OK
+from os.path import dirname, isabs, join, normpath, exists, isdir, isfile
+from time import sleep
+
+
+def getws(dic):
+ """
+ Decode Obfuscated api key.
+ Only preventing API keys harvesting over the network
+ https://developer.echonest.com/forums/thread/105
+ """
+ aka = push(bytes(dic.get('apikey') + '=', 'utf-8'))
+ aka = getencoder('rot-13')(str((aka), 'utf-8'))[0]
+ dic.update({'apikey': aka})
+
def get_mpd_environ():
"""
passwd = mpd_host_env[1]
return (host, environ.get('MPD_PORT', None), passwd)
+
def normalize_path(path):
"""Get absolute path
"""
return normpath(join(getcwd(), path))
return path
+
def exception_log():
"""Log unknown exceptions"""
- import logging
- log = logging.getLogger('sima')
+ log = logging.getLogger(__name__)
log.error('Unhandled Exception!!!')
log.error(''.join(traceback.format_exc()))
log.info('Please report the previous message'
sys.exit(1)
+class SigHup(Exception):
+ """SIGHUP raises this Exception"""
+
+
# ArgParse Callbacks
class Obsolete(Action):
# pylint: disable=R0903
def __call__(self, parser, namespace, values, option_string=None):
raise ArgumentError(self, 'obsolete argument')
+
class FileAction(Action):
- """Generic class to inherit from for ARgPArse action on file/dir
+ """Generic class to inherit from for ArgParse action on file/dir
"""
# pylint: disable=R0903
def __call__(self, parser, namespace, values, option_string=None):
def checks(self):
"""control method
"""
- pass
+
class Wfile(FileAction):
# pylint: disable=R0903
"""Is file writable
"""
def checks(self):
+ if isdir(self._file):
+ self.parser.error('need a file not a directory: {}'.format(self._file))
if not exists(self._dir):
#raise ArgumentError(self, '"{0}" does not exist'.format(self._dir))
- self.parser.error('file does not exist: {0}'.format(self._dir))
+ self.parser.error('directory does not exist: {0}'.format(self._dir))
if not exists(self._file):
# Is parent directory writable then
if not access(self._dir, W_OK):
if not access(self._file, W_OK):
self.parser.error('no write access to "{0}"'.format(self._file))
+
class Rfile(FileAction):
# pylint: disable=R0903
"""Is file readable
if not access(self._file, R_OK):
self.parser.error('no read access to "{0}"'.format(self._file))
+
class Wdir(FileAction):
# pylint: disable=R0903
"""Is directory writable
self.parser.error('no write access to "{0}"'.format(self._file))
+class Throttle:
+ """throttle decorator"""
+ def __init__(self, wait):
+ self.wait = wait
+ self.last_called = datetime.now()
+
+ def __call__(self, func):
+ def wrapper(*args, **kwargs):
+ while self.last_called + self.wait > datetime.now():
+ sleep(0.1)
+ result = func(*args, **kwargs)
+ self.last_called = datetime.now()
+ return result
+ return wrapper
+
+
+class MPDSimaException(Exception):
+ """Generic MPD_sima Exception"""
+
+
+# http client exceptions (for webservices)
+class WSError(MPDSimaException):
+ pass
+
+
+class WSNotFound(WSError):
+ pass
+
+
+class WSTimeout(WSError):
+ pass
+
+
+class WSHTTPError(WSError):
+ pass
+
+
+class PluginException(MPDSimaException):
+ pass
+
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab