from types import TracebackType
from typing import Any, List, Optional, Tuple, Type
+from . import HELLO_PREFIX
+from .exceptions import MPDProtocolError
try: # Python 3.7
base = contextlib.AbstractAsyncContextManager
List["Connection"]] = OrderedDict()
self._semaphore = asyncio.Semaphore(max_connections)
- async def connect(self, server: Server, port: Port) -> "Connection":
+ async def connect(self, server: Server, port: Port, timeout: int) -> "Connection":
host = (server, port)
# enforce the connection limit, releasing connections notifies
await self._semaphore.acquire()
connections = self._connections.setdefault(host, [])
- log.info('got %s in pool', len(connections))
+ log.debug('got %s in pool', len(connections))
# find an un-used connection for this host
connection = next(
(conn for conn in connections if not conn.in_use), None)
+ #if connection:
+ # log.debug('reusing %s', connection)
if connection is None:
# disconnect the least-recently-used un-used connection to make space
# for a new connection. There will be at least one.
await conn.close()
break
- reader, writer = await asyncio.open_connection(server, port)
+ log.debug('about to connect %s', host)
+ reader, writer = await asyncio.wait_for(
+ asyncio.open_connection(server, port),
+ timeout
+ )
+ #log.debug('Connected to %s:%s', host[0], host[1])
connection = Connection(self, host, reader, writer)
+ await connection._hello()
connections.append(connection)
connection.in_use = True
# move current host to the front as most-recently used
self._connections.move_to_end(host, False)
+ log.debug('connection %s in use', connection)
return connection
"""Close all connections"""
connections = [c for cs in self._connections.values() for c in cs]
self._connections = OrderedDict()
+ log.info('Closing all connections')
for connection in connections:
await connection.close()
def __repr__(self):
host = f"{self._host[0]}:{self._host[1]}"
- return f"Connection<{host}>"
+ return f"Connection<{host}>#{id(self)}"
@property
def closed(self):
return self._closed
def release(self) -> None:
+ logging.debug('releasing %r', self)
self.in_use = False
self._pool._notify_release()
async def close(self) -> None:
if self._closed:
return
+ logging.debug('closing %r', self)
self._closed = True
self._writer.close()
self._pool._remove(self)
except AttributeError: # wait_closed is new in 3.7
pass
+ async def _hello(self) -> None:
+ """Consume HELLO_PREFIX"""
+ self.in_use = True
+ data = await self.readuntil(b'\n')
+ rcv = data.decode('utf-8')
+ if not rcv.startswith(HELLO_PREFIX):
+ raise MPDProtocolError(f'Got invalid MPD hello: "{rcv}"')
+ log.debug('consumed hello prefix: %r', rcv)
+ self.version = rcv.split('\n')[0][len(HELLO_PREFIX):]
+ log.info('protocol version: %s', self.version)
+
+
def __getattr__(self, name: str) -> Any:
"""All unknown attributes are delegated to the reader and writer"""
if self._closed or not self.in_use: