]> kaliko git repositories - mpd-sima.git/blob - sima/utils/utils.py
d226ef08303bf3fde53064743b73781e3e868158
[mpd-sima.git] / sima / utils / utils.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (c) 2010, 2011, 2013, 2014, 2015, 2020 kaliko <kaliko@azylum.org>
4 #
5 #  This file is part of sima
6 #
7 #  sima is free software: you can redistribute it and/or modify
8 #  it under the terms of the GNU General Public License as published by
9 #  the Free Software Foundation, either version 3 of the License, or
10 #  (at your option) any later version.
11 #
12 #  sima is distributed in the hope that it will be useful,
13 #  but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #  GNU General Public License for more details.
16 #
17 #  You should have received a copy of the GNU General Public License
18 #  along with sima.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 #
21 """generic tools and utilities for sima
22 """
23 # pylint: disable=C0111
24
25 import logging
26 import traceback
27 import sys
28
29 from argparse import ArgumentError, Action
30 from base64 import b64decode as push
31 from codecs import getencoder
32 from datetime import datetime
33 from os import getenv, access, getcwd, W_OK, R_OK
34 from os.path import dirname, isabs, join, normpath, exists, isdir, isfile
35 from time import sleep
36
37 from musicpd import VERSION as mversion
38 from sima.info import __version__ as sversion
39
40
41 def getws(dic):
42     """
43     Decode Obfuscated api key.
44     Only preventing API keys harvesting over the network
45     https://developer.echonest.com/forums/thread/105
46     """
47     aka = push(bytes(dic.get('apikey') + '=', 'utf-8'))
48     aka = getencoder('rot-13')(str((aka), 'utf-8'))[0]
49     dic.update({'apikey': aka})
50
51
52 def parse_mpd_host(value):
53     passwd = host = None
54     # If password is set: MPD_HOST=pass@host
55     if '@' in value:
56         mpd_host_env = value.split('@', 1)
57         if mpd_host_env[0]:
58             # A password is actually set
59             passwd = mpd_host_env[0]
60             if mpd_host_env[1]:
61                 host = mpd_host_env[1]
62         elif mpd_host_env[1]:
63             # No password set but leading @ is an abstract socket
64             host = '@'+mpd_host_env[1]
65     else:
66         # MPD_HOST is a plain host
67         host = value
68     return host, passwd
69
70
71 def get_mpd_environ():
72     """
73     Retrieve MPD env. var.
74     """
75     passwd = host = None
76     if getenv('MPD_HOST'):
77         host, passwd = parse_mpd_host(getenv('MPD_HOST'))
78     return (host, getenv('MPD_PORT', None), passwd)
79
80
81 def normalize_path(path):
82     """Get absolute path
83     """
84     if not isabs(path):
85         return normpath(join(getcwd(), path))
86     return path
87
88
89 def exception_log():
90     """Log unknown exceptions"""
91     log = logging.getLogger(__name__)
92     log.error('Unhandled Exception!!!')
93     log.error(''.join(traceback.format_exc()))
94     log.info('musicpd python module version: %s', mversion)
95     log.info('MPD_sima version: %s', sversion)
96     log.info('Please report the previous message'
97              ' along with some log entries right before the crash.')
98     log.info('thanks for your help :)')
99     log.info('Quiting now!')
100     sys.exit(1)
101
102
103 class SigHup(Exception):
104     """SIGHUP raises this Exception"""
105
106
107 # ArgParse Callbacks
108 class Obsolete(Action):
109     # pylint: disable=R0903
110     """Deal with obsolete arguments
111     """
112     def __call__(self, parser, namespace, values, option_string=None):
113         raise ArgumentError(self, 'obsolete argument')
114
115
116 class FileAction(Action):
117     """Generic class to inherit from for ArgParse action on file/dir
118     """
119     # pylint: disable=R0903
120     def __call__(self, parser, namespace, values, option_string=None):
121         self._file = normalize_path(values)
122         self._dir = dirname(self._file)
123         self.parser = parser
124         self.checks()
125         setattr(namespace, self.dest, self._file)
126
127     def checks(self):
128         """control method
129         """
130
131
132 class Wfile(FileAction):
133     # pylint: disable=R0903
134     """Is file writable
135     """
136     def checks(self):
137         if isdir(self._file):
138             self.parser.error('need a file not a directory: {}'.format(self._file))
139         if not exists(self._dir):
140             self.parser.error('directory does not exist: {0}'.format(self._dir))
141         if not exists(self._file):
142             # Is parent directory writable then
143             if not access(self._dir, W_OK):
144                 self.parser.error('no write access to "{0}"'.format(self._dir))
145         else:
146             if not access(self._file, W_OK):
147                 self.parser.error('no write access to "{0}"'.format(self._file))
148
149
150 class Rfile(FileAction):
151     # pylint: disable=R0903
152     """Is file readable
153     """
154     def checks(self):
155         if not exists(self._file):
156             self.parser.error('file does not exist: {0}'.format(self._file))
157         if not isfile(self._file):
158             self.parser.error('not a file: {0}'.format(self._file))
159         if not access(self._file, R_OK):
160             self.parser.error('no read access to "{0}"'.format(self._file))
161
162
163 class Wdir(FileAction):
164     # pylint: disable=R0903
165     """Is directory writable
166     """
167     def checks(self):
168         if not exists(self._file):
169             self.parser.error('directory does not exist: {0}'.format(self._file))
170         if not isdir(self._file):
171             self.parser.error('not a directory: {0}'.format(self._file))
172         if not access(self._file, W_OK):
173             self.parser.error('no write access to "{0}"'.format(self._file))
174
175
176 class Throttle:
177     """throttle decorator"""
178     def __init__(self, wait):
179         self.wait = wait
180         self.last_called = datetime.now()
181
182     def __call__(self, func):
183         def wrapper(*args, **kwargs):
184             while self.last_called + self.wait > datetime.now():
185                 sleep(0.1)
186             result = func(*args, **kwargs)
187             self.last_called = datetime.now()
188             return result
189         return wrapper
190
191
192 class MPDSimaException(Exception):
193     """Generic MPD_sima Exception"""
194
195
196 # http client exceptions (for webservices)
197 class WSError(MPDSimaException):
198     pass
199
200
201 class WSNotFound(WSError):
202     pass
203
204
205 class WSTimeout(WSError):
206     pass
207
208
209 class WSHTTPError(WSError):
210     pass
211
212
213 class PluginException(MPDSimaException):
214     pass
215
216 # VIM MODLINE
217 # vim: ai ts=4 sw=4 sts=4 expandtab