]> kaliko git repositories - mpd-sima.git/blobdiff - sima/utils/utils.py
Cleanup PlayerError exception wrapper
[mpd-sima.git] / sima / utils / utils.py
index e3927fcd7cbff823dbda081cb2b5f561b56f25e2..cb672639e2e780d06187f641b9b0ac0c5d84ebe7 100644 (file)
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright (c) 2010, 2011, 2013 Jack Kaliko <kaliko@azylum.org>
+# Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020, 2021, 2024 kaliko <kaliko@azylum.org>
 #
 #  This file is part of sima
 #
 #  along with sima.  If not, see <http://www.gnu.org/licenses/>.
 #
 #
-"""generic tools and utilitaries for sima
+"""generic tools and utilities for sima
 """
+# pylint: disable=C0111
 
+import logging
 import traceback
 import sys
 
 from argparse import ArgumentError, Action
-from os import environ, access, getcwd, W_OK, R_OK
+from base64 import b64decode as push
+from codecs import getencoder
+from datetime import datetime
+from os import getenv, access, getcwd, W_OK, R_OK
 from os.path import dirname, isabs, join, normpath, exists, isdir, isfile
+from time import sleep
+
+from musicpd import VERSION as mversion
+from sima.info import __version__ as sversion
+
+
+def getws(dic):
+    """
+    Decode Obfuscated api key.
+    Only preventing API keys harvesting over the network
+    https://developer.echonest.com/forums/thread/105
+    """
+    aka = push(bytes(dic.get('apikey') + '=', 'utf-8'))
+    aka = getencoder('rot-13')(str((aka), 'utf-8'))[0]
+    dic.update({'apikey': aka})
+
+
+def parse_mpd_host(value):
+    passwd = host = None
+    # If password is set: MPD_HOST=pass@host
+    if '@' in value:
+        mpd_host_env = value.split('@', 1)
+        if mpd_host_env[0]:
+            # A password is actually set
+            passwd = mpd_host_env[0]
+            if mpd_host_env[1]:
+                host = mpd_host_env[1]
+        elif mpd_host_env[1]:
+            # No password set but leading @ is an abstract socket
+            host = '@'+mpd_host_env[1]
+    else:
+        # MPD_HOST is a plain host
+        host = value
+    return host, passwd
+
 
 def get_mpd_environ():
     """
     Retrieve MPD env. var.
     """
     passwd = host = None
-    mpd_host_env = environ.get('MPD_HOST')
-    if mpd_host_env:
-        # If password is set:
-        # mpd_host_env = ['pass', 'host'] because MPD_HOST=pass@host
-        mpd_host_env = mpd_host_env.split('@')
-        mpd_host_env.reverse()
-        host = mpd_host_env[0]
-        if len(mpd_host_env) > 1 and mpd_host_env[1]:
-            passwd = mpd_host_env[1]
-    return (host, environ.get('MPD_PORT', None), passwd)
+    if getenv('MPD_HOST'):
+        host, passwd = parse_mpd_host(getenv('MPD_HOST'))
+    return (host, getenv('MPD_PORT', None), passwd)
+
 
 def normalize_path(path):
     """Get absolute path
@@ -51,18 +85,21 @@ def normalize_path(path):
         return normpath(join(getcwd(), path))
     return path
 
+
 def exception_log():
     """Log unknown exceptions"""
-    import logging
-    log = logging.getLogger('sima')
+    log = logging.getLogger(__name__)
     log.error('Unhandled Exception!!!')
     log.error(''.join(traceback.format_exc()))
+    log.info('musicpd python module version: %s', mversion)
+    log.info('MPD_sima version: %s', sversion)
     log.info('Please report the previous message'
              ' along with some log entries right before the crash.')
     log.info('thanks for your help :)')
     log.info('Quiting now!')
     sys.exit(1)
 
+
 # ArgParse Callbacks
 class Obsolete(Action):
     # pylint: disable=R0903
@@ -71,10 +108,11 @@ class Obsolete(Action):
     def __call__(self, parser, namespace, values, option_string=None):
         raise ArgumentError(self, 'obsolete argument')
 
+
 class FileAction(Action):
     """Generic class to inherit from for ArgParse action on file/dir
     """
-    # pylint: disable=R0903
+    # pylint: disable=R0903,W0201
     def __call__(self, parser, namespace, values, option_string=None):
         self._file = normalize_path(values)
         self._dir = dirname(self._file)
@@ -85,23 +123,25 @@ class FileAction(Action):
     def checks(self):
         """control method
         """
-        pass
+
 
 class Wfile(FileAction):
     # pylint: disable=R0903
     """Is file writable
     """
     def checks(self):
+        if isdir(self._file):
+            self.parser.error(f'need a file not a directory: {self._file}')
         if not exists(self._dir):
-            #raise ArgumentError(self, '"{0}" does not exist'.format(self._dir))
-            self.parser.error('file does not exist: {0}'.format(self._dir))
+            self.parser.error(f'directory does not exist: {self._dir}')
         if not exists(self._file):
             # Is parent directory writable then
             if not access(self._dir, W_OK):
-                self.parser.error('no write access to "{0}"'.format(self._dir))
+                self.parser.error(f'no write access to "{self._dir}"')
         else:
             if not access(self._file, W_OK):
-                self.parser.error('no write access to "{0}"'.format(self._file))
+                self.parser.error(f'no write access to "{self._file}"')
+
 
 class Rfile(FileAction):
     # pylint: disable=R0903
@@ -109,11 +149,12 @@ class Rfile(FileAction):
     """
     def checks(self):
         if not exists(self._file):
-            self.parser.error('file does not exist: {0}'.format(self._file))
+            self.parser.error(f'file does not exist: {self._file}')
         if not isfile(self._file):
-            self.parser.error('not a file: {0}'.format(self._file))
+            self.parser.error(f'not a file: {self._file}')
         if not access(self._file, R_OK):
-            self.parser.error('no read access to "{0}"'.format(self._file))
+            self.parser.error(f'no read access to "{self._file}"')
+
 
 class Wdir(FileAction):
     # pylint: disable=R0903
@@ -121,12 +162,73 @@ class Wdir(FileAction):
     """
     def checks(self):
         if not exists(self._file):
-            self.parser.error('directory does not exist: {0}'.format(self._file))
+            self.parser.error(f'directory does not exist: {self._file}')
         if not isdir(self._file):
-            self.parser.error('not a directory: {0}'.format(self._file))
+            self.parser.error(f'not a directory: {self._file}')
         if not access(self._file, W_OK):
-            self.parser.error('no write access to "{0}"'.format(self._file))
+            self.parser.error(f'no write access to "{self._file}"')
+
+
+class Throttle:
+    """throttle decorator"""
+    # pylint: disable=R0903
+    def __init__(self, wait):
+        self.wait = wait
+        self.last_called = datetime.now()
+
+    def __call__(self, func):
+        def wrapper(*args, **kwargs):
+            while self.last_called + self.wait > datetime.now():
+                sleep(0.1)
+            result = func(*args, **kwargs)
+            self.last_called = datetime.now()
+            return result
+        return wrapper
+
+
+class MPDSimaException(Exception):
+    """Generic MPD_sima Exception"""
+
+
+class SigHup(MPDSimaException):
+    """SIGHUP raises this Exception"""
+
+
+# http client exceptions (for webservices)
+class WSError(MPDSimaException):
+    pass
+
+
+class WSNotFound(WSError):
+    pass
+
+
+class WSTimeout(WSError):
+    pass
+
+
+class WSHTTPError(WSError):
+    pass
+
+
+class PluginException(MPDSimaException):
+    pass
+
 
+# Wrap Exception decorator
+def get_decorator(errors=(OSError, TimeoutError), wrap_into=Exception):
+    def decorator(func):
+        def w_func(*args, **kwargs):
+            try:
+                return func(*args, **kwargs)
+            except errors as err:
+                strerr = str(err)
+                if hasattr(err, 'strerror'):
+                    if err.strerror:
+                        strerr = err.strerror
+                raise wrap_into(strerr) from err
+        return w_func
+    return decorator
 
 # VIM MODLINE
 # vim: ai ts=4 sw=4 sts=4 expandtab