# Copyright (C) 2007-2012 Thomas Perl <thp.io/about>
# Copyright (C) 2010, 2011 Anaƫl Verrier <elghinn@free.fr>
-# Copyright (C) 2014 kaliko <kaliko@azylum.org>
+# Copyright (C) 2014, 2015, 2020 kaliko <kaliko@azylum.org>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import logging
import traceback
-import sleekxmpp
+import slixmpp
def botcmd(*args, **kwargs):
return lambda func: decorate(func, **kwargs)
-class MUCBot(sleekxmpp.ClientXMPP):
+class MUCBot(slixmpp.ClientXMPP):
prefix = '!'
def __init__(self, jid, password, room, nick, log_file=None,
- log_level=logging.INFO):
+ log_level=logging.INFO):
super(MUCBot, self).__init__(jid, password)
- self.log = logging.getLogger(__name__)
+ self.log = logging.getLogger(__package__)
self.plugins = list()
self.commands = dict()
self.room = room
self.nick = nick
self.__set_logger(log_file, log_level)
+ self.__seen = dict()
self.register_plugin('xep_0030') # Service Discovery
self.register_plugin('xep_0045') # Multi-User Chat
+ self.register_plugin('xep_0071') # xhtml-im
self.register_plugin('xep_0199') # self Ping
# The session_start event will be triggered when
# and the XML streams are ready for use. We want to
# listen for this event so that we we can initialize
# our roster.
- self.add_event_handler("session_start", self.start)
+ self.add_event_handler('session_start', self.start)
# Handles MUC message and dispatch
- self.add_event_handler("groupchat_message", self.muc_message)
+ self.add_event_handler('message', self.message)
+ self.add_event_handler('got_online', self._view)
# Discover bot internal command (ie. help)
for name, value in inspect.getmembers(self):
if inspect.ismethod(value) and getattr(value, '_bot_command', False):
name = getattr(value, '_bot_command_name')
- self.log.debug('Registered command: %s' % name)
+ self.log.debug('Registered command: %s', name)
self.commands[name] = value
def __set_logger(self, log_file=None, log_level=logging.INFO):
log_fd = open(log_file, 'w') if log_file else None
chandler = logging.StreamHandler(log_fd)
formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ )
chandler.setFormatter(formatter)
self.log.addHandler(chandler)
self.log.setLevel(log_level)
- self.log.debug('set logger, log level : %s' % log_level)
+ self.log.debug('set logger, log level : %s', log_level)
- def muc_message(self, msg):
- # ignore message from self
- body = msg['body']
- mucfrom = msg['mucnic']
+ def message(self, msg):
+ """Messages handler"""
+ if msg['type'] not in ('groupchat', 'chat'):
+ self.log.warning('Unhandled message')
+ return
if msg['mucnick'] == self.nick:
return
+ body = msg['body'].strip()
if not body.startswith(MUCBot.prefix):
return
+ if msg['from'] not in self.__seen:
+ self.log.warning('Will not handle message from unseen jid: %s', msg['from'])
+ #return
args = body[1:].split()
cmd = args.pop(0)
if cmd not in self.commands:
return
- self.log.debug('cmd: {0}'.format(cmd))
+ self.log.debug('cmd: %s', cmd)
if args:
- self.log.debug('arg: {0}'.format(args))
+ self.log.debug('arg: %s', args)
try:
- reply = self.commands[cmd](msg, args)
+ self.commands[cmd](msg, args)
except Exception as err:
reply = ''.join(traceback.format_exc())
- self.log.exception('An error occurred processing: {0}: {1}'.format(body, reply))
- self.send_message(mto=msg['from'].bare, mbody=reply, mtype='groupchat')
+ self.log.exception('An error occurred processing: %s: %s', body, reply)
+ if self.log.level < 10 and reply:
+ self.send_message(mto=msg['from'].bare, mbody=reply, mtype='groupchat')
+
+ def _view(self, pres):
+ """Track known nick"""
+ nick = pres['from']
+ status = (pres['type'], pres['status'])
+ self.__seen.update({nick: status})
def start(self, event):
"""
"""
self.get_roster()
self.send_presence()
- self.plugin['xep_0045'].joinMUC(self.room,
+ self.plugin['xep_0045'].join_muc(self.room,
self.nick,
# If a room password is needed, use:
# password=the_room_password,
if inspect.ismethod(value) and getattr(value, '_bot_command',
False):
name = getattr(value, '_bot_command_name')
- self.log.debug('Registered command: %s' % name)
+ self.log.debug('Registered command: %s', name)
self.commands[name] = value
def foreach_plugin(self, method, *args, **kwds):
for plugin in self.plugins:
- self.log.debug('shuting down %s' % plugin.__str__)
+ self.log.debug('shuting down %s', plugin.__str__)
getattr(plugin, method)(*args, **kwds)
def shutdown_plugins(self):
# TODO: why can't use event session_end|disconnected?
self.log.info('shuting down')
for plugin in self.plugins:
- self.log.debug('shuting down %s' % plugin)
+ self.log.debug('shuting down %s', plugin)
getattr(plugin, 'shutdown')()
@botcmd
Automatically assigned to the "help" command."""
help_cmd = ('Type {}help <command name>'.format(self.prefix) +
- ' to get more info about that specific command.')
+ ' to get more info about that specific command.\n\n'+
+ 'SRC: http://git.kaliko.me/sid.git')
if not args:
if self.__doc__:
description = self.__doc__.strip()
text = '{}\n\n{}'.format(description, usage)
else:
if args[0] in self.commands.keys():
- text = self.commands[args[0]].__doc__.strip() or 'undocumented'
+ text = self.commands[args[0]].__doc__ or 'undocumented'
+ text = inspect.cleandoc(text)
else:
text = 'That command is not defined.'
- return text
+ if message['type'] == 'groupchat':
+ to = message['from'].bare
+ else:
+ to = message['from']
+ self.send_message(mto=to, mbody=text, mtype=message['type'])