X-Git-Url: http://git.kaliko.me/?a=blobdiff_plain;f=musicpd.py;h=e779355ae24bf9fa00a49729e055e2ba0dee2f3f;hb=c353490d8fac7cb3f17193b1b2e3b9bb191d47ce;hp=61ca97b645fb4e4a3341a333f41174b3015c7c9f;hpb=0c16ca07e3ac85ab212c3444f9e5a71c4a96a406;p=python-musicpd.git diff --git a/musicpd.py b/musicpd.py index 61ca97b..e779355 100644 --- a/musicpd.py +++ b/musicpd.py @@ -1,39 +1,30 @@ -# python-musicpd: Python MPD client library -# Copyright (C) 2012-2021 kaliko -# Copyright (C) 2019 Naglis Jonaitis -# Copyright (C) 2019 Bart Van Loon -# Copyright (C) 2008-2010 J. Alexander Treuman -# -# python-musicpd is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# python-musicpd is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with python-musicpd. If not, see . - -# pylint: disable=missing-docstring +# SPDX-FileCopyrightText: 2012-2023 kaliko +# SPDX-FileCopyrightText: 2021 Wonko der Verständige +# SPDX-FileCopyrightText: 2019 Naglis Jonaitis +# SPDX-FileCopyrightText: 2019 Bart Van Loon +# SPDX-FileCopyrightText: 2008-2010 J. Alexander Treuman +# SPDX-License-Identifier: LGPL-3.0-or-later +"""python-musicpd: Python Music Player Daemon client library""" + import socket import os from functools import wraps +# Type hint for python <= 3.8 +from typing import Any, Dict, List, Tuple +from typing import Optional, Union HELLO_PREFIX = "OK MPD " ERROR_PREFIX = "ACK " SUCCESS = "OK" NEXT = "list_OK" -VERSION = '0.7.0' +VERSION = '0.9.0b0' #: Seconds before a connection attempt times out #: (overriden by MPD_TIMEOUT env. var.) -CONNECTION_TIMEOUT = 30 +CONNECTION_TIMEOUT: int = 30 #: Socket timeout in second (Default is None for no timeout) -SOCKET_TIMEOUT = None +SOCKET_TIMEOUT: Union[int, None] = None def iterator_wrapper(func): @@ -85,7 +76,7 @@ class IteratingError(MPDError): class Range: - def __init__(self, tpl): + def __init__(self, tpl: Tuple[int]): self.tpl = tpl self._check() @@ -107,8 +98,8 @@ class Range: for index in self.tpl: try: index = int(index) - except (TypeError, ValueError): - raise CommandError('Not a tuple of int') + except (TypeError, ValueError) as err: + raise CommandError('Not a tuple of int') from err class _NotConnected: @@ -116,7 +107,7 @@ class _NotConnected: def __getattr__(self, attr): return self._dummy - def _dummy(*args): + def _dummy(self, *args): raise ConnectionError("Not connected") @@ -157,13 +148,15 @@ class MPDClient: will not be used as default value for the :py:meth:`password` method """ - def __init__(self): - self.iterate = False + def __init__(self) -> None: + self.iterate: bool = False #: Socket timeout value in seconds self._socket_timeout = SOCKET_TIMEOUT #: Current connection timeout value, defaults to - #: :py:attr:`musicpd.MPD_TIMEOUT` or env. var. ``MPD_TIMEOUT`` if provided - self.mpd_timeout = None + #: :py:obj:`CONNECTION_TIMEOUT` or env. var. ``MPD_TIMEOUT`` if provided + self.mpd_timeout: Union[None,int] = None + #: Protocol version as exposed by the server + self.mpd_version: str = '' self._reset() self._commands = { # Status Commands @@ -298,19 +291,20 @@ class MPDClient: } self._get_envvars() - def _get_envvars(self): + def _get_envvars(self) -> None: """ Retrieve MPD env. var. to overrides "localhost:6600" Use MPD_HOST/MPD_PORT if set else use MPD_HOST=${XDG_RUNTIME_DIR:-/run/}/mpd/socket if file exists """ - self.host = 'localhost' - self.pwd = None - self.port = os.getenv('MPD_PORT', '6600') - if os.getenv('MPD_HOST'): + self.host: str = 'localhost' + self.pwd: Union[None, str] = None + self.port: Union[int,str] = os.getenv('MPD_PORT', '6600') + _host: str = os.getenv('MPD_HOST', '') + if _host: # If password is set: MPD_HOST=pass@host - if '@' in os.getenv('MPD_HOST'): - mpd_host_env = os.getenv('MPD_HOST').split('@', 1) + if '@' in _host: + mpd_host_env = _host.split('@', 1) if mpd_host_env[0]: # A password is actually set self.pwd = mpd_host_env[0] @@ -321,16 +315,16 @@ class MPDClient: self.host = '@'+mpd_host_env[1] else: # MPD_HOST is a plain host - self.host = os.getenv('MPD_HOST') + self.host = _host else: # Is socket there xdg_runtime_dir = os.getenv('XDG_RUNTIME_DIR', '/run') rundir = os.path.join(xdg_runtime_dir, 'mpd/socket') if os.path.exists(rundir): self.host = rundir - self.mpd_timeout = os.getenv('MPD_TIMEOUT') - if self.mpd_timeout and self.mpd_timeout.isdigit(): - self.mpd_timeout = int(self.mpd_timeout) + _mpd_timeout = os.getenv('MPD_TIMEOUT', 'X') + if _mpd_timeout.isdigit(): + self.mpd_timeout = int(_mpd_timeout) else: # Use CONNECTION_TIMEOUT as default even if MPD_TIMEOUT carries gargage self.mpd_timeout = CONNECTION_TIMEOUT @@ -349,8 +343,8 @@ class MPDClient: if command not in self._commands: command = command.replace("_", " ") if command not in self._commands: - raise AttributeError("'%s' object has no attribute '%s'" % - (self.__class__.__name__, attr)) + cls = self.__class__.__name__ + raise AttributeError(f"'{cls}' object has no attribute '{attr}'") return lambda *args: wrapper(command, args) def _send(self, command, args): @@ -362,36 +356,31 @@ class MPDClient: if retval is not None: self._pending.append(command) - def _fetch(self, command, args=None): + def _fetch(self, command, args=None): # pylint: disable=unused-argument + cmd_fmt = command.replace(" ", "_") if self._command_list is not None: - raise CommandListError("Cannot use fetch_%s in a command list" % - command.replace(" ", "_")) + raise CommandListError(f"Cannot use fetch_{cmd_fmt} in a command list") if self._iterating: - raise IteratingError("Cannot use fetch_%s while iterating" % - command.replace(" ", "_")) + raise IteratingError(f"Cannot use fetch_{cmd_fmt} while iterating") if not self._pending: raise PendingCommandError("No pending commands to fetch") if self._pending[0] != command: - raise PendingCommandError("'%s' is not the currently " - "pending command" % command) + raise PendingCommandError(f"'{command}' is not the currently pending command") del self._pending[0] retval = self._commands[command] if callable(retval): return retval() return retval - def _execute(self, command, args): + def _execute(self, command, args): # pylint: disable=unused-argument if self._iterating: - raise IteratingError("Cannot execute '%s' while iterating" % - command) + raise IteratingError(f"Cannot execute '{command}' while iterating") if self._pending: - raise PendingCommandError( - "Cannot execute '%s' with pending commands" % command) + raise PendingCommandError(f"Cannot execute '{command}' with pending commands") retval = self._commands[command] if self._command_list is not None: if not callable(retval): - raise CommandListError( - "'%s' not allowed in command list" % command) + raise CommandListError(f"'{command}' not allowed in command list") self._write_command(command, args) self._command_list.append(retval) else: @@ -399,9 +388,10 @@ class MPDClient: if callable(retval): return retval() return retval + return None def _write_line(self, line): - self._wfile.write("%s\n" % line) + self._wfile.write(f"{line!s}\n") self._wfile.flush() def _write_command(self, command, args=None): @@ -428,7 +418,7 @@ class MPDClient: amount -= len(result) return bytes(chunk) - def _read_line(self, binary=False): + def _read_line(self, binary: bool = False): if binary: line = self._rbfile.readline().decode('utf-8') else: @@ -442,23 +432,23 @@ class MPDClient: raise CommandError(error) if self._command_list is not None: if line == NEXT: - return + return None if line == SUCCESS: - raise ProtocolError("Got unexpected '%s'" % SUCCESS) + raise ProtocolError(f"Got unexpected '{SUCCESS}'") elif line == SUCCESS: - return + return None return line - def _read_pair(self, separator, binary=False): + def _read_pair(self, separator: str, binary: bool = False): line = self._read_line(binary=binary) if line is None: - return + return None pair = line.split(separator, 1) if len(pair) < 2: - raise ProtocolError("Could not parse pair: '%s'" % line) + raise ProtocolError(f"Could not parse pair: '{line}'") return pair - def _read_pairs(self, separator=": ", binary=False): + def _read_pairs(self, separator=": ", binary: bool =False): pair = self._read_pair(separator, binary=binary) while pair: yield pair @@ -469,8 +459,7 @@ class MPDClient: for key, value in self._read_pairs(): if key != seen: if seen is not None: - raise ProtocolError("Expected key '%s', got '%s'" % - (seen, key)) + raise ProtocolError(f"Expected key '{seen}', got '{key}'") seen = key yield value @@ -478,8 +467,8 @@ class MPDClient: for _, value in self._read_pairs(":"): yield value - def _read_objects(self, delimiters=None): - obj = {} + def _read_objects(self, delimiters: Optional[List[str]] = None): + obj: Dict[str,Any] = {} if delimiters is None: delimiters = [] for key, value in self._read_pairs(): @@ -509,12 +498,12 @@ class MPDClient: def _fetch_nothing(self): line = self._read_line() if line is not None: - raise ProtocolError("Got unexpected return value: '%s'" % line) + raise ProtocolError(f"Got unexpected return value: '{line}'") def _fetch_item(self): pairs = list(self._read_pairs()) if len(pairs) != 1: - return + return None return pairs[0][1] @iterator_wrapper @@ -577,10 +566,11 @@ class MPDClient: try: obj['data'] = self._read_binary(amount) except IOError as err: - raise ConnectionError('Error reading binary content: %s' % err) - if len(obj['data']) != amount: # can we ever get there? + raise ConnectionError(f'Error reading binary content: {err}') from err + data_bytes = len(obj['data']) + if data_bytes != amount: # can we ever get there? raise ConnectionError('Error reading binary content: ' - 'Expects %sB, got %s' % (amount, len(obj['data']))) + f'Expects {amount}B, got {data_bytes}') # Fetches trailing new line self._read_line(binary=True) # Fetches SUCCESS code @@ -597,11 +587,11 @@ class MPDClient: raise ConnectionError("Connection lost while reading MPD hello") line = line.rstrip("\n") if not line.startswith(HELLO_PREFIX): - raise ProtocolError("Got invalid MPD hello: '%s'" % line) + raise ProtocolError(f"Got invalid MPD hello: '{line}'") self.mpd_version = line[len(HELLO_PREFIX):].strip() def _reset(self): - self.mpd_version = None + self.mpd_version = '' self._iterating = False self._pending = [] self._command_list = None @@ -612,8 +602,7 @@ class MPDClient: def _connect_unix(self, path): if not hasattr(socket, "AF_UNIX"): - raise ConnectionError( - "Unix domain sockets not supported on this platform") + raise ConnectionError("Unix domain sockets not supported on this platform") # abstract socket if path.startswith('@'): path = '\0'+path[1:] @@ -646,19 +635,17 @@ class MPDClient: sock.close() if err is not None: raise ConnectionError(str(err)) - else: - raise ConnectionError("getaddrinfo returns an empty list") + raise ConnectionError("getaddrinfo returns an empty list") def noidle(self): # noidle's special case if not self._pending or self._pending[0] != 'idle': - raise CommandError( - 'cannot send noidle if send_idle was not called') + raise CommandError('cannot send noidle if send_idle was not called') del self._pending[0] self._write_command("noidle") return self._fetch_list() - def connect(self, host=None, port=None): + def connect(self, host: Optional[str] = None, port: Optional[Union[int, str]] = None): """Connects the MPD server :param str host: hostname, IP or FQDN (defaults to `localhost` or socket, see below for details) @@ -735,6 +722,13 @@ class MPDClient: self._sock.close() self._reset() + def __enter__(self): + self.connect() + return self + + def __exit__(self, exception_type, exception_value, exception_traceback): + self.disconnect() + def fileno(self): """Return the socket’s file descriptor (a small integer). This is useful with :py:obj:`select.select`. @@ -749,8 +743,7 @@ class MPDClient: if self._iterating: raise IteratingError("Cannot begin command list while iterating") if self._pending: - raise PendingCommandError("Cannot begin command list " - "with pending commands") + raise PendingCommandError("Cannot begin command list with pending commands") self._write_command("command_list_ok_begin") self._command_list = [] @@ -763,7 +756,7 @@ class MPDClient: return self._fetch_command_list() -def escape(text): +def escape(text: str) -> str: return text.replace("\\", "\\\\").replace('"', '\\"') # vim: set expandtab shiftwidth=4 softtabstop=4 textwidth=79: