]> kaliko git repositories - mpd-sima.git/blob - sima/utils/utils.py
Cleanup PlayerError exception wrapper
[mpd-sima.git] / sima / utils / utils.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020, 2021, 2024 kaliko <kaliko@azylum.org>
4 #
5 #  This file is part of sima
6 #
7 #  sima is free software: you can redistribute it and/or modify
8 #  it under the terms of the GNU General Public License as published by
9 #  the Free Software Foundation, either version 3 of the License, or
10 #  (at your option) any later version.
11 #
12 #  sima is distributed in the hope that it will be useful,
13 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #  GNU General Public License for more details.
16 #
17 #  You should have received a copy of the GNU General Public License
18 #  along with sima.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 #
21 """generic tools and utilities for sima
22 """
23 # pylint: disable=C0111
24
25 import logging
26 import traceback
27 import sys
28
29 from argparse import ArgumentError, Action
30 from base64 import b64decode as push
31 from codecs import getencoder
32 from datetime import datetime
33 from os import getenv, access, getcwd, W_OK, R_OK
34 from os.path import dirname, isabs, join, normpath, exists, isdir, isfile
35 from time import sleep
36
37 from musicpd import VERSION as mversion
38 from sima.info import __version__ as sversion
39
40
41 def getws(dic):
42     """
43     Decode Obfuscated api key.
44     Only preventing API keys harvesting over the network
45     https://developer.echonest.com/forums/thread/105
46     """
47     aka = push(bytes(dic.get('apikey') + '=', 'utf-8'))
48     aka = getencoder('rot-13')(str((aka), 'utf-8'))[0]
49     dic.update({'apikey': aka})
50
51
52 def parse_mpd_host(value):
53     passwd = host = None
54     # If password is set: MPD_HOST=pass@host
55     if '@' in value:
56         mpd_host_env = value.split('@', 1)
57         if mpd_host_env[0]:
58             # A password is actually set
59             passwd = mpd_host_env[0]
60             if mpd_host_env[1]:
61                 host = mpd_host_env[1]
62         elif mpd_host_env[1]:
63             # No password set but leading @ is an abstract socket
64             host = '@'+mpd_host_env[1]
65     else:
66         # MPD_HOST is a plain host
67         host = value
68     return host, passwd
69
70
71 def get_mpd_environ():
72     """
73     Retrieve MPD env. var.
74     """
75     passwd = host = None
76     if getenv('MPD_HOST'):
77         host, passwd = parse_mpd_host(getenv('MPD_HOST'))
78     return (host, getenv('MPD_PORT', None), passwd)
79
80
81 def normalize_path(path):
82     """Get absolute path
83     """
84     if not isabs(path):
85         return normpath(join(getcwd(), path))
86     return path
87
88
89 def exception_log():
90     """Log unknown exceptions"""
91     log = logging.getLogger(__name__)
92     log.error('Unhandled Exception!!!')
93     log.error(''.join(traceback.format_exc()))
94     log.info('musicpd python module version: %s', mversion)
95     log.info('MPD_sima version: %s', sversion)
96     log.info('Please report the previous message'
97              ' along with some log entries right before the crash.')
98     log.info('thanks for your help :)')
99     log.info('Quiting now!')
100     sys.exit(1)
101
102
103 # ArgParse Callbacks
104 class Obsolete(Action):
105     # pylint: disable=R0903
106     """Deal with obsolete arguments
107     """
108     def __call__(self, parser, namespace, values, option_string=None):
109         raise ArgumentError(self, 'obsolete argument')
110
111
112 class FileAction(Action):
113     """Generic class to inherit from for ArgParse action on file/dir
114     """
115     # pylint: disable=R0903,W0201
116     def __call__(self, parser, namespace, values, option_string=None):
117         self._file = normalize_path(values)
118         self._dir = dirname(self._file)
119         self.parser = parser
120         self.checks()
121         setattr(namespace, self.dest, self._file)
122
123     def checks(self):
124         """control method
125         """
126
127
128 class Wfile(FileAction):
129     # pylint: disable=R0903
130     """Is file writable
131     """
132     def checks(self):
133         if isdir(self._file):
134             self.parser.error(f'need a file not a directory: {self._file}')
135         if not exists(self._dir):
136             self.parser.error(f'directory does not exist: {self._dir}')
137         if not exists(self._file):
138             # Is parent directory writable then
139             if not access(self._dir, W_OK):
140                 self.parser.error(f'no write access to "{self._dir}"')
141         else:
142             if not access(self._file, W_OK):
143                 self.parser.error(f'no write access to "{self._file}"')
144
145
146 class Rfile(FileAction):
147     # pylint: disable=R0903
148     """Is file readable
149     """
150     def checks(self):
151         if not exists(self._file):
152             self.parser.error(f'file does not exist: {self._file}')
153         if not isfile(self._file):
154             self.parser.error(f'not a file: {self._file}')
155         if not access(self._file, R_OK):
156             self.parser.error(f'no read access to "{self._file}"')
157
158
159 class Wdir(FileAction):
160     # pylint: disable=R0903
161     """Is directory writable
162     """
163     def checks(self):
164         if not exists(self._file):
165             self.parser.error(f'directory does not exist: {self._file}')
166         if not isdir(self._file):
167             self.parser.error(f'not a directory: {self._file}')
168         if not access(self._file, W_OK):
169             self.parser.error(f'no write access to "{self._file}"')
170
171
172 class Throttle:
173     """throttle decorator"""
174     # pylint: disable=R0903
175     def __init__(self, wait):
176         self.wait = wait
177         self.last_called = datetime.now()
178
179     def __call__(self, func):
180         def wrapper(*args, **kwargs):
181             while self.last_called + self.wait > datetime.now():
182                 sleep(0.1)
183             result = func(*args, **kwargs)
184             self.last_called = datetime.now()
185             return result
186         return wrapper
187
188
189 class MPDSimaException(Exception):
190     """Generic MPD_sima Exception"""
191
192
193 class SigHup(MPDSimaException):
194     """SIGHUP raises this Exception"""
195
196
197 # http client exceptions (for webservices)
198 class WSError(MPDSimaException):
199     pass
200
201
202 class WSNotFound(WSError):
203     pass
204
205
206 class WSTimeout(WSError):
207     pass
208
209
210 class WSHTTPError(WSError):
211     pass
212
213
214 class PluginException(MPDSimaException):
215     pass
216
217
218 # Wrap Exception decorator
219 def get_decorator(errors=(OSError, TimeoutError), wrap_into=Exception):
220     def decorator(func):
221         def w_func(*args, **kwargs):
222             try:
223                 return func(*args, **kwargs)
224             except errors as err:
225                 strerr = str(err)
226                 if hasattr(err, 'strerror'):
227                     if err.strerror:
228                         strerr = err.strerror
229                 raise wrap_into(strerr) from err
230         return w_func
231     return decorator
232
233 # VIM MODLINE
234 # vim: ai ts=4 sw=4 sts=4 expandtab