]> kaliko git repositories - mpd-sima.git/blob - sima/utils/utils.py
Some refactoring around Exceptions
[mpd-sima.git] / sima / utils / utils.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020 kaliko <kaliko@azylum.org>
4 #
5 #  This file is part of sima
6 #
7 #  sima is free software: you can redistribute it and/or modify
8 #  it under the terms of the GNU General Public License as published by
9 #  the Free Software Foundation, either version 3 of the License, or
10 #  (at your option) any later version.
11 #
12 #  sima is distributed in the hope that it will be useful,
13 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #  GNU General Public License for more details.
16 #
17 #  You should have received a copy of the GNU General Public License
18 #  along with sima.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 #
21 """generic tools and utilities for sima
22 """
23 # pylint: disable=C0111
24
25 import logging
26 import traceback
27 import sys
28
29 from argparse import ArgumentError, Action
30 from base64 import b64decode as push
31 from codecs import getencoder
32 from datetime import datetime
33 from os import getenv, access, getcwd, W_OK, R_OK
34 from os.path import dirname, isabs, join, normpath, exists, isdir, isfile
35 from time import sleep
36
37 from musicpd import VERSION as mversion
38 from sima.info import __version__ as sversion
39
40
41 def getws(dic):
42     """
43     Decode Obfuscated api key.
44     Only preventing API keys harvesting over the network
45     https://developer.echonest.com/forums/thread/105
46     """
47     aka = push(bytes(dic.get('apikey') + '=', 'utf-8'))
48     aka = getencoder('rot-13')(str((aka), 'utf-8'))[0]
49     dic.update({'apikey': aka})
50
51
52 def parse_mpd_host(value):
53     passwd = host = None
54     # If password is set: MPD_HOST=pass@host
55     if '@' in value:
56         mpd_host_env = value.split('@', 1)
57         if mpd_host_env[0]:
58             # A password is actually set
59             passwd = mpd_host_env[0]
60             if mpd_host_env[1]:
61                 host = mpd_host_env[1]
62         elif mpd_host_env[1]:
63             # No password set but leading @ is an abstract socket
64             host = '@'+mpd_host_env[1]
65     else:
66         # MPD_HOST is a plain host
67         host = value
68     return host, passwd
69
70
71 def get_mpd_environ():
72     """
73     Retrieve MPD env. var.
74     """
75     passwd = host = None
76     if getenv('MPD_HOST'):
77         host, passwd = parse_mpd_host(getenv('MPD_HOST'))
78     return (host, getenv('MPD_PORT', None), passwd)
79
80
81 def normalize_path(path):
82     """Get absolute path
83     """
84     if not isabs(path):
85         return normpath(join(getcwd(), path))
86     return path
87
88
89 def exception_log():
90     """Log unknown exceptions"""
91     log = logging.getLogger(__name__)
92     log.error('Unhandled Exception!!!')
93     log.error(''.join(traceback.format_exc()))
94     log.info('musicpd python module version: %s', mversion)
95     log.info('MPD_sima version: %s', sversion)
96     log.info('Please report the previous message'
97              ' along with some log entries right before the crash.')
98     log.info('thanks for your help :)')
99     log.info('Quiting now!')
100     sys.exit(1)
101
102
103 # ArgParse Callbacks
104 class Obsolete(Action):
105     # pylint: disable=R0903
106     """Deal with obsolete arguments
107     """
108     def __call__(self, parser, namespace, values, option_string=None):
109         raise ArgumentError(self, 'obsolete argument')
110
111
112 class FileAction(Action):
113     """Generic class to inherit from for ArgParse action on file/dir
114     """
115     # pylint: disable=R0903
116     def __call__(self, parser, namespace, values, option_string=None):
117         self._file = normalize_path(values)
118         self._dir = dirname(self._file)
119         self.parser = parser
120         self.checks()
121         setattr(namespace, self.dest, self._file)
122
123     def checks(self):
124         """control method
125         """
126
127
128 class Wfile(FileAction):
129     # pylint: disable=R0903
130     """Is file writable
131     """
132     def checks(self):
133         if isdir(self._file):
134             self.parser.error('need a file not a directory: {}'.format(self._file))
135         if not exists(self._dir):
136             self.parser.error('directory does not exist: {0}'.format(self._dir))
137         if not exists(self._file):
138             # Is parent directory writable then
139             if not access(self._dir, W_OK):
140                 self.parser.error('no write access to "{0}"'.format(self._dir))
141         else:
142             if not access(self._file, W_OK):
143                 self.parser.error('no write access to "{0}"'.format(self._file))
144
145
146 class Rfile(FileAction):
147     # pylint: disable=R0903
148     """Is file readable
149     """
150     def checks(self):
151         if not exists(self._file):
152             self.parser.error('file does not exist: {0}'.format(self._file))
153         if not isfile(self._file):
154             self.parser.error('not a file: {0}'.format(self._file))
155         if not access(self._file, R_OK):
156             self.parser.error('no read access to "{0}"'.format(self._file))
157
158
159 class Wdir(FileAction):
160     # pylint: disable=R0903
161     """Is directory writable
162     """
163     def checks(self):
164         if not exists(self._file):
165             self.parser.error('directory does not exist: {0}'.format(self._file))
166         if not isdir(self._file):
167             self.parser.error('not a directory: {0}'.format(self._file))
168         if not access(self._file, W_OK):
169             self.parser.error('no write access to "{0}"'.format(self._file))
170
171
172 class Throttle:
173     """throttle decorator"""
174     def __init__(self, wait):
175         self.wait = wait
176         self.last_called = datetime.now()
177
178     def __call__(self, func):
179         def wrapper(*args, **kwargs):
180             while self.last_called + self.wait > datetime.now():
181                 sleep(0.1)
182             result = func(*args, **kwargs)
183             self.last_called = datetime.now()
184             return result
185         return wrapper
186
187
188 class MPDSimaException(Exception):
189     """Generic MPD_sima Exception"""
190
191
192 class SigHup(MPDSimaException):
193     """SIGHUP raises this Exception"""
194
195
196 # http client exceptions (for webservices)
197 class WSError(MPDSimaException):
198     pass
199
200
201 class WSNotFound(WSError):
202     pass
203
204
205 class WSTimeout(WSError):
206     pass
207
208
209 class WSHTTPError(WSError):
210     pass
211
212
213 class PluginException(MPDSimaException):
214     pass
215
216 # VIM MODLINE
217 # vim: ai ts=4 sw=4 sts=4 expandtab