from functools import wraps
# Type hint for python <= 3.8
from typing import Any, Dict, List, Tuple
-from typing import Literal, Optional, Union
+from typing import Optional, Union
HELLO_PREFIX = "OK MPD "
ERROR_PREFIX = "ACK "
#: Current connection timeout value, defaults to
#: :py:obj:`CONNECTION_TIMEOUT` or env. var. ``MPD_TIMEOUT`` if provided
self.mpd_timeout: Union[None,int] = None
+ #: Protocol version as exposed by the server
+ self.mpd_version: str = ''
self._reset()
self._commands = {
# Status Commands
amount -= len(result)
return bytes(chunk)
- def _read_line(self, binary: Literal[True,False] = False):
+ def _read_line(self, binary: bool = False):
if binary:
line = self._rbfile.readline().decode('utf-8')
else:
return None
return line
- def _read_pair(self, separator: str, binary: Literal[True,False] = False):
+ def _read_pair(self, separator: str, binary: bool = False):
line = self._read_line(binary=binary)
if line is None:
return None
raise ProtocolError(f"Could not parse pair: '{line}'")
return pair
- def _read_pairs(self, separator=": ", binary: Literal[True,False] =False):
+ def _read_pairs(self, separator=": ", binary: bool =False):
pair = self._read_pair(separator, binary=binary)
while pair:
yield pair
self.mpd_version = line[len(HELLO_PREFIX):].strip()
def _reset(self):
- self.mpd_version = None
+ self.mpd_version = ''
self._iterating = False
self._pending = []
self._command_list = None