From 98d6037125815600cf4732be859848c1bf7c1eca Mon Sep 17 00:00:00 2001 From: kaliko Date: Sun, 3 Mar 2024 11:38:50 +0100 Subject: [PATCH] Add per connection command wrapper --- mpdaio-object.py | 9 +- mpdaio-time.py | 84 +++++++++++------ mpdaio/__init__.py | 1 - mpdaio/client.py | 218 ++++++++++++++++++++----------------------- mpdaio/connection.py | 5 +- 5 files changed, 168 insertions(+), 149 deletions(-) diff --git a/mpdaio-object.py b/mpdaio-object.py index 56533fa..87eabfe 100644 --- a/mpdaio-object.py +++ b/mpdaio-object.py @@ -1,14 +1,17 @@ import asyncio +import logging import timeit from mpdaio.client import MPDClient +logging.basicConfig(level=logging.DEBUG, + format='%(levelname)-8s %(module)-10s %(message)s') + async def run_cli(): cli = MPDClient() cli.mpd_timeout = 0.1 - #await cli.connect() - #current = await cli.currentsong() - #print(current) + current = await cli.currentsong() + print(current) await cli.currentsong() await cli.playlistinfo() diff --git a/mpdaio-time.py b/mpdaio-time.py index 1e21080..af9ec3a 100644 --- a/mpdaio-time.py +++ b/mpdaio-time.py @@ -5,8 +5,6 @@ import timeit from mpdaio.client import MPDClient from musicpd import MPDClient as MPDClientNAIO -logging.basicConfig(level=logging.DEBUG, - format='%(levelname)-8s %(module)-10s %(message)s') async def run_cli(): cli = MPDClient() @@ -24,48 +22,78 @@ async def run_cli(): await cli.close() +async def gather_calls(): + await asyncio.gather( + cli.ping(), + cli.currentsong(), + cli.playlistinfo(), + cli.list('artist'), + cli.listallinfo('The Doors'), + cli.listallinfo('The Doors'), + cli.listallinfo('The Doors'), + cli.listallinfo('AFX'), + cli.listallinfo('AFX'), + cli.listallinfo('AFX'), + cli.find('(MUSICBRAINZ_ARTISTID == "9efff43b-3b29-4082-824e-bc82f646f93d")'), + ) + + +async def aio_warmup(): + # print(len(cli._pool._connections.setdefault(host, []))) + await gather_calls() + # print(len(cli._pool._connections.setdefault(host, []))) + + async def aio(): - cli = MPDClient(host='kaliko.me', port=6601) # Group tasks together try: - await asyncio.gather( - cli.currentsong(), - # cli.playlistinfo(), - # cli.list('artist'), - # cli.listallinfo('The Doors'), - # cli.listallinfo('AFX') - ) - # await asyncio.gather( - # cli.currentsong() - # ) + await gather_calls() finally: # finally close await cli.close() def noaio(): - cli = MPDClientNAIO() - cli.mpd_timeout = 0.1 - cli.connect(host='kaliko.me', port='6601') + cli.ping() cli.currentsong() cli.playlistinfo() cli.list('artist') cli.listallinfo('The Doors') + cli.listallinfo('The Doors') + cli.listallinfo('The Doors') cli.listallinfo('AFX') - # finally close - cli.disconnect() + cli.listallinfo('AFX') + cli.listallinfo('AFX') + cli.find('(MUSICBRAINZ_ARTISTID == "9efff43b-3b29-4082-824e-bc82f646f93d")') + if __name__ == '__main__': - asyncio.run(aio()) - asyncio.run(run_cli()) - import sys - sys.exit(0) + # Setup + n = 5 + host = ('kaliko.me', '6601') + logging.basicConfig(level=logging.INFO, + format='%(levelname)-8s %(module)-10s %(message)s') + print('Running aio code') - t = timeit.Timer('asyncio.run(aio())', globals=globals()) - #print(t.autorange()) - print(t.timeit(10)) - # + cli = MPDClient(*host) + ev = asyncio.get_event_loop() + ev.run_until_complete(aio_warmup()) + + # Time it + #t = timeit.Timer('ev.run_until_complete(aio_warmup())', globals=globals()) + t = timeit.Timer('ev.run_until_complete(aio_warmup())', globals=globals()) + print('Start timeit') + print(t.timeit(n)) + print(t.timeit(n)) + ev.run_until_complete(cli.close()) + + # Setup print('Running non aio code') + cli = MPDClientNAIO() + cli.mpd_timeout = 0.1 + cli.connect(*host) + # Time it t = timeit.Timer('noaio()', globals=globals()) - #print(t.autorange()) - print(t.timeit(10)) + print(t.timeit(n)) + print(t.timeit(n)) + cli.disconnect() diff --git a/mpdaio/__init__.py b/mpdaio/__init__.py index 18ff0b0..5d72b16 100644 --- a/mpdaio/__init__.py +++ b/mpdaio/__init__.py @@ -15,4 +15,3 @@ CONNECTION_TIMEOUT = 30 SOCKET_TIMEOUT = None #: Maximum concurrent connections CONNECTION_MAX = 100 - diff --git a/mpdaio/client.py b/mpdaio/client.py index c790093..65e1497 100644 --- a/mpdaio/client.py +++ b/mpdaio/client.py @@ -18,6 +18,84 @@ log = logging.getLogger(__name__) class MPDClient: def __init__(self, host: str | None = None, port: str | int | None = None, password: str | None = None): + self._pool = ConnectionPool(max_connections=CONNECTION_MAX) + self._get_envvars() + #: host used with the current connection (:py:obj:`str`) + self.host = host or self.server_discovery[0] + #: password detected in :envvar:`MPD_HOST` environment variable (:py:obj:`str`) + self.password = password or self.server_discovery[2] + #: port used with the current connection (:py:obj:`int`, :py:obj:`str`) + self.port = port or self.server_discovery[1] + log.info('logger : "%s"', __name__) + #: Protocol version + self.version: [None, str] = None + self.mpd_timeout = CONNECTION_TIMEOUT + + def _get_envvars(self): + """ + Retrieve MPD env. var. to overrides default "localhost:6600" + """ + # Set some defaults + disco_host = 'localhost' + disco_port = os.getenv('MPD_PORT', '6600') + pwd = None + _host = os.getenv('MPD_HOST', '') + if _host: + # If password is set: MPD_HOST=pass@host + if '@' in _host: + mpd_host_env = _host.split('@', 1) + if mpd_host_env[0]: + # A password is actually set + log.debug( + 'password detected in MPD_HOST, set client pwd attribute') + pwd = mpd_host_env[0] + if mpd_host_env[1]: + disco_host = mpd_host_env[1] + log.debug('host detected in MPD_HOST: %s', disco_host) + elif mpd_host_env[1]: + # No password set but leading @ is an abstract socket + disco_host = '@'+mpd_host_env[1] + log.debug( + 'host detected in MPD_HOST: %s (abstract socket)', disco_host) + else: + # MPD_HOST is a plain host + disco_host = _host + log.debug('host detected in MPD_HOST: %s', disco_host) + else: + # Is socket there + xdg_runtime_dir = os.getenv('XDG_RUNTIME_DIR', '/run') + rundir = os.path.join(xdg_runtime_dir, 'mpd/socket') + if os.path.exists(rundir): + disco_host = rundir + log.debug( + 'host detected in ${XDG_RUNTIME_DIR}/run: %s (unix socket)', disco_host) + _mpd_timeout = os.getenv('MPD_TIMEOUT', '') + if _mpd_timeout.isdigit(): + self.mpd_timeout = int(_mpd_timeout) + log.debug('timeout detected in MPD_TIMEOUT: %d', self.mpd_timeout) + else: # Use CONNECTION_TIMEOUT as default even if MPD_TIMEOUT carries gargage + self.mpd_timeout = CONNECTION_TIMEOUT + self.server_discovery = (disco_host, disco_port, pwd) + + def __getattr__(self, attr): + command = attr + wrapper = CmdHandler(self._pool, self.host, self.port, self.password, self.mpd_timeout) + if command not in wrapper._commands: + command = command.replace("_", " ") + if command not in wrapper._commands: + raise AttributeError( + f"'CmdHandler' object has no attribute '{attr}'") + return lambda *args: wrapper(command, args) + + async def close(self): + await self._pool.close() + + +class CmdHandler: + #TODO: CmdHandler to intanciate in place of MPDClient._execute + # The MPDClient.__getattr__ wrapper should instanciate an CmdHandler object + + def __init__(self, pool, server, port, password, timeout): self._commands = { # Status Commands "clearerror": self._fetch_nothing, @@ -149,111 +227,33 @@ class MPDClient: "readmessages": self._fetch_messages, "sendmessage": self._fetch_nothing, } - self._get_envvars() - #: host used with the current connection (:py:obj:`str`) - self.host = host or self.server_discovery[0] - #: password detected in :envvar:`MPD_HOST` environment variable (:py:obj:`str`) - self.password = password or self.server_discovery[2] - #: port used with the current connection (:py:obj:`int`, :py:obj:`str`) - self.port = port or self.server_discovery[1] - # self._get_envvars() - self._pool = ConnectionPool(max_connections=CONNECTION_MAX) - log.info('logger : "%s"', __name__) + self.command = None + self._command_list = None + self.args = None + self.pool = pool + self.host = (server, port) + self.password = password + self.timeout = timeout #: current connection self.connection: [None, Connection] = None - #: Protocol version - self.version: [None, str] = None - self._command_list = None - self.mpd_timeout = CONNECTION_TIMEOUT - def _get_envvars(self): - """ - Retrieve MPD env. var. to overrides default "localhost:6600" - """ - # Set some defaults - disco_host = 'localhost' - disco_port = os.getenv('MPD_PORT', '6600') - pwd = None - _host = os.getenv('MPD_HOST', '') - if _host: - # If password is set: MPD_HOST=pass@host - if '@' in _host: - mpd_host_env = _host.split('@', 1) - if mpd_host_env[0]: - # A password is actually set - log.debug( - 'password detected in MPD_HOST, set client pwd attribute') - pwd = mpd_host_env[0] - if mpd_host_env[1]: - disco_host = mpd_host_env[1] - log.debug('host detected in MPD_HOST: %s', disco_host) - elif mpd_host_env[1]: - # No password set but leading @ is an abstract socket - disco_host = '@'+mpd_host_env[1] - log.debug( - 'host detected in MPD_HOST: %s (abstract socket)', disco_host) - else: - # MPD_HOST is a plain host - disco_host = _host - log.debug('host detected in MPD_HOST: %s', disco_host) - else: - # Is socket there - xdg_runtime_dir = os.getenv('XDG_RUNTIME_DIR', '/run') - rundir = os.path.join(xdg_runtime_dir, 'mpd/socket') - if os.path.exists(rundir): - disco_host = rundir - log.debug( - 'host detected in ${XDG_RUNTIME_DIR}/run: %s (unix socket)', disco_host) - _mpd_timeout = os.getenv('MPD_TIMEOUT', '') - if _mpd_timeout.isdigit(): - self.mpd_timeout = int(_mpd_timeout) - log.debug('timeout detected in MPD_TIMEOUT: %d', self.mpd_timeout) - else: # Use CONNECTION_TIMEOUT as default even if MPD_TIMEOUT carries gargage - self.mpd_timeout = CONNECTION_TIMEOUT - self.server_discovery = (disco_host, disco_port, pwd) - - def __getattr__(self, attr): - # if attr == 'send_noidle': # have send_noidle to cancel idle as well as noidle - # return self.noidle - if attr.startswith("send_"): - command = attr.replace("send_", "", 1) - wrapper = self._send - elif attr.startswith("fetch_"): - command = attr.replace("fetch_", "", 1) - wrapper = self._fetch - else: - command = attr - wrapper = self._execute - if command not in self._commands: - command = command.replace("_", " ") - if command not in self._commands: - cls = self.__class__.__name__ - raise AttributeError( - f"'{cls}' object has no attribute '{attr}'") - return lambda *args: wrapper(command, args) - - async def _execute(self, command, args): # pylint: disable=unused-argument - log.debug(f'#{command}') - # self.connection = await self._pool.connect(self.host, self.port, timeout=self.mpd_timeout) - # await self._get_connection() - async with await self._get_connection(): - # if self._pending: - # raise MPDCommandError( - # f"Cannot execute '{command}' with pending commands") + def __repr__(self): + args = [str(_) for _ in self.args] + args = ','.join(args or []) + return f'{self.command}({args})' + + async def __call__(self, command: str, args: list | None): + server, port = self.host + self.command = command + self.args = args or '' + self.connection = await self.pool.connect(server, port, timeout=self.timeout) + async with self.connection: retval = self._commands[command] - if self._command_list is not None: - if not callable(retval): - raise MPDCommandError( - f"'{command}' not allowed in command list") - self._write_command(command, args) - self._command_list.append(retval) - else: - await self._write_command(command, args) - if callable(retval): - # log.debug('retvat: %s', retval) - return await retval() - return retval - return None + await self._write_command(command, args) + if callable(retval): + log.debug('retvat: %s', retval) + return await retval() + return retval async def _write_line(self, line): self.connection.write(f"{line!s}\n".encode()) @@ -270,6 +270,7 @@ class MPDClient: parts.append(f'"{escape(str(arg))}"') if '\n' in ' '.join(parts): raise MPDCommandError('new line found in the command!') + #log.debug(' '.join(parts)) await self._write_line(' '.join(parts)) def _read_binary(self, amount): @@ -446,18 +447,3 @@ class MPDClient: def _fetch_command_list(self): return self._read_command_list() - - async def _get_connection(self) -> Connection: - self.connection = await self._pool.connect(self.host, self.port, timeout=self.mpd_timeout) - return self.connection - - async def close(self): - await self._pool.close() - - -class CmdHandler: - #TODO: CmdHandler to intanciate in place of MPDClient._execute - # The MPDClient.__getattr__ wrapper should instanciate an CmdHandler object - - def __init__(self): - pass diff --git a/mpdaio/connection.py b/mpdaio/connection.py index f327a92..f4a97de 100644 --- a/mpdaio/connection.py +++ b/mpdaio/connection.py @@ -51,6 +51,8 @@ class ConnectionPool(base): # find an un-used connection for this host connection = next( (conn for conn in connections if not conn.in_use), None) + #if connection: + # log.debug('reusing %s', connection) if connection is None: # disconnect the least-recently-used un-used connection to make space # for a new connection. There will be at least one. @@ -65,7 +67,7 @@ class ConnectionPool(base): asyncio.open_connection(server, port), timeout ) - log.info('Connected to %s:%s', host[0], host[1]) + #log.debug('Connected to %s:%s', host[0], host[1]) connection = Connection(self, host, reader, writer) await connection._hello() connections.append(connection) @@ -81,6 +83,7 @@ class ConnectionPool(base): """Close all connections""" connections = [c for cs in self._connections.values() for c in cs] self._connections = OrderedDict() + log.info('Closing all connections') for connection in connections: await connection.close() -- 2.39.5