# -*- coding: utf-8 -*-
-
-# Copyright (C) 2007-2012 Thomas Perl <thp.io/about>
-# Copyright (C) 2010, 2011 Anaël Verrier <elghinn@free.fr>
-# Copyright (C) 2014, 2015, 2020 kaliko <kaliko@azylum.org>
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, version 3 only.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
+# SPDX-FileCopyrightText: 2010, 2011 Anaël Verrier <elghinn@free.fr>
+# SPDX-FileCopyrightText: 2014, 2015, 2020, 2023 kaliko <kaliko@azylum.org>
+# SPDX-License-Identifier: GPL-3.0-or-later
import inspect
import slixmpp
-from sid import __url__
+from sid import __url__, __doc__
+
+log = logging.getLogger(__package__)
def botcmd(*args, **kwargs):
#: Defaults to "!"
prefix = '!'
- def __init__(self, jid, password, room, nick, log_file=None,
+ def __init__(self, jid, password, room, nick,
log_level=logging.INFO):
super(MUCBot, self).__init__(jid, password)
# Clean sphinx autodoc for self documentation
# (cf. MUCBot.help)
self.__doc__ = None
- self.log = logging.getLogger(__package__)
+ self.log = log
self.plugins = list()
self.commands = dict()
self.room = room
self.nick = nick
- self.__set_logger(log_file, log_level)
+ #: Keep track of MUC presences: {'nick': presence}
+ self.muc_presences = {}
+ self.__set_logger(log_level)
self.__seen = dict()
self.register_plugin('xep_0030') # Service Discovery
self.register_plugin('xep_0045') # Multi-User Chat
self.add_event_handler('message', self.message)
self.add_event_handler('got_online', self._view)
+ # keep track of join/parts
+ self.add_event_handler(f'muc::{self.room}::got_offline', self._muc_got_offline)
+ self.add_event_handler(f'muc::{self.room}::got_online', self._muc_got_online)
+ self.add_event_handler(f'muc::{self.room}::presence', self._muc_got_presence)
+
+ # Handles disconnection
+ self.add_event_handler('disconnected', self.disconn)
+
# Discover bot internal command (ie. help)
for name, value in inspect.getmembers(self):
- if inspect.ismethod(value) and getattr(value, '_bot_command', False):
+ if inspect.ismethod(value) and \
+ getattr(value, '_bot_command', False):
name = getattr(value, '_bot_command_name')
- self.log.debug('Registered command: %s', name)
+ log.debug('Registered command: %s', name)
self.commands[name] = value
- def __set_logger(self, log_file=None, log_level=logging.INFO):
- """Create console/file handler"""
- log_fd = open(log_file, 'w') if log_file else None
- chandler = logging.StreamHandler(log_fd)
- formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- chandler.setFormatter(formatter)
- self.log.addHandler(chandler)
- self.log.setLevel(log_level)
- self.log.debug('set logger, log level : %s', log_level)
+ def __set_logger(self, log_level):
+ """Set logging level"""
+ log.setLevel(log_level)
+ log.debug('set logger, log level : %s', log_level)
+
+ def _muc_got_online(self, pres):
+ """Keep track of MUC participants"""
+ fjid = pres['muc']['jid']
+ nick = pres['muc']['nick']
+ role = pres['muc']['role']
+ affi = pres['muc']['affiliation']
+ user = fjid if fjid.full else nick
+ self.muc_presences.update({nick: pres})
+ log.debug('Participants: + %s:%s/%s (len:%s)',
+ user, role, affi,len(self.muc_presences))
+
+ def _muc_got_offline(self, pres):
+ """Keep track of MUC participants"""
+ fjid = pres['muc']['jid']
+ user = fjid if fjid.full else pres['muc']['nick']
+ try:
+ self.muc_presences.pop(pres['muc']['nick'])
+ except KeyError:
+ log.error('KeyError removing participant: "%s"', user)
+ log.debug('Participants: - %s got offline (len:%s)',
+ user, len(self.muc_presences))
+
+ def _muc_got_presence(self, pres):
+ """Keep track of MUC participants"""
+ nick = pres['muc']['nick']
+ fjid = pres['muc']['jid']
+ role = pres['muc']['role']
+ affi = pres['muc']['affiliation']
+ user = fjid if fjid.full else nick
+ log.debug('Participants: u %s:%s/%s (len:%s)',
+ user, role, affi,len(self.muc_presences))
+ self.muc_presences.update({nick: pres})
+
+ def disconn(self, event):
+ """disconnected handler"""
+ msg = ": %s" % event if event else "‽"
+ log.info('Disconnected from server%s', msg)
+ self.connect()
def message(self, msg):
"""Messages handler
Parses message received to detect :py:obj:`prefix`
"""
if msg['type'] not in ('groupchat', 'chat'):
- self.log.warning('Unhandled message')
+ log.warning('Unhandled message')
return
if msg['mucnick'] == self.nick:
return
body = msg['body'].strip()
if not body.startswith(MUCBot.prefix):
return
- if msg['from'] not in self.__seen:
- self.log.warning('Will not handle message from unseen jid: %s', msg['from'])
- #return
+ log.debug(msg['from'])
+ if msg['from'] not in self.__seen and msg['type'] == 'chat':
+ log.warning('Will not handle direct message'
+ 'from unseen jid: %s', msg['from'])
+ return
args = body[1:].split()
cmd = args.pop(0)
if cmd not in self.commands:
return
- self.log.debug('cmd: %s', cmd)
+ log.debug('cmd: %s', cmd)
if args:
- self.log.debug('arg: %s', args)
+ log.debug('arg: %s', args)
try:
self.commands[cmd](msg, args)
except Exception as err:
reply = ''.join(traceback.format_exc())
- self.log.exception('An error occurred processing: %s: %s', body, reply)
- if self.log.level < 10 and reply:
- self.send_message(mto=msg['from'].bare, mbody=reply, mtype='groupchat')
+ log.exception('An error occurred processing: %s: %s', body, reply)
+ if log.level < 10 and reply:
+ self.send_message(mto=msg['from'].bare, mbody=reply,
+ mtype='groupchat')
def _view(self, pres):
"""Track known nick"""
status = (pres['type'], pres['status'])
self.__seen.update({nick: status})
- def start(self, event):
+ async def start(self, event):
"""
Process the session_start event.
:param dict event: An empty dictionary. The session_start
event does not provide any additional data.
"""
- self.get_roster()
+ await self.get_roster()
self.send_presence()
- self.plugin['xep_0045'].join_muc(self.room,
- self.nick,
- # If a room password is needed, use:
- # password=the_room_password,
- wait=True)
+ await self.plugin['xep_0045'].join_muc_wait(self.room,
+ self.nick,
+ # Do not fetch history
+ maxstanzas=0,
+ # If a room password is needed, use:
+ # password=the_room_password,
+ )
+
+ def register_bot_plugin(self, plugin_cls):
+ """Registers plugin, takes a class, the method instanciates the plugin
:param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
"""
self.plugins.append(plugin_cls(self))
for name, value in inspect.getmembers(self.plugins[-1]):
- if inspect.ismethod(value) and getattr(value, '_bot_command',
- False):
+ if inspect.ismethod(value) and \
+ getattr(value, '_bot_command', False):
+ name = getattr(value, '_bot_command_name')
+ log.debug('Registered command: %s', name)
+ self.commands[name] = value
+
+ def unregister_bot_plugin(self, plugin):
+ for name, value in inspect.getmembers(plugin):
+ if inspect.ismethod(value) and \
+ getattr(value, '_bot_command', False):
name = getattr(value, '_bot_command_name')
- self.log.debug('Registered command: %s', name)
+ log.debug('Unegistered command: %s', name)
self.commands[name] = value
+ self.plugins.remove(plugin)
def foreach_plugin(self, method, *args, **kwds):
for plugin in self.plugins:
- self.log.debug('shuting down %s', plugin.__str__)
+ log.debug('calling %s for %s', method, plugin)
getattr(plugin, method)(*args, **kwds)
def shutdown_plugins(self):
- # TODO: why can't use event session_end|disconnected?
- self.log.info('shuting down')
+ # TODO: also use event session_end|disconnected?
+ log.info('shuting down')
for plugin in self.plugins:
- self.log.debug('shuting down %s', plugin)
+ log.debug('shuting down %s', plugin)
getattr(plugin, 'shutdown')()
@botcmd
Automatically assigned to the "help" command."""
help_cmd = ('Type {}help <command name>'.format(self.prefix) +
' to get more info about that specific command.\n\n' +
+ f'DOC: {__doc__}\n' +
f'SRC: {__url__}')
if not args:
if self.__doc__: