X-Git-Url: https://git.kaliko.me/?p=sid.git;a=blobdiff_plain;f=sid%2Fsid.py;h=d412ac4ad65d362b0427cdafb35a2f6485e8e7bd;hp=53fd5696976f4a60d2d01ec7ec75dfaeb4066aaf;hb=16a1747dce0c01b1659cb5e287254bcd434000ed;hpb=40a4cb2e7caa70e009736ead303ce016ddac3a71 diff --git a/sid/sid.py b/sid/sid.py index 53fd569..d412ac4 100644 --- a/sid/sid.py +++ b/sid/sid.py @@ -2,7 +2,7 @@ # Copyright (C) 2007-2012 Thomas Perl # Copyright (C) 2010, 2011 Anaël Verrier -# Copyright (C) 2014 kaliko +# Copyright (C) 2014, 2015, 2020 kaliko # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,11 +21,17 @@ import inspect import logging import traceback -import sleekxmpp +import slixmpp + +from sid import __url__ def botcmd(*args, **kwargs): - """Decorator for bot command functions""" + """Decorator for bot command functions + + :param bool hidden: is the command hidden in global help + :param str name: command name, default to decorated function name + """ def decorate(func, hidden=False, name=None): setattr(func, '_bot_command', True) @@ -41,39 +47,55 @@ def botcmd(*args, **kwargs): return lambda func: decorate(func, **kwargs) -class MUCBot(sleekxmpp.ClientXMPP): +class MUCBot(slixmpp.ClientXMPP): + """ + :param str jid: jid to log with + :param str password: jid password + :param str room: conference room to join + :param str nick: Nickname to use in the room + """ + #: Class attribute to define bot's command prefix + #: + #: Defaults to "!" prefix = '!' def __init__(self, jid, password, room, nick, log_file=None, - log_level=logging.INFO): + log_level=logging.INFO): super(MUCBot, self).__init__(jid, password) - self.log = logging.getLogger(__name__) + # Clean sphinx autodoc for self documentation + # (cf. MUCBot.help) + self.__doc__ = None + self.log = logging.getLogger(__package__) self.plugins = list() self.commands = dict() self.room = room self.nick = nick self.__set_logger(log_file, log_level) - self.register_plugin('xep_0030') # Service Discovery - self.register_plugin('xep_0045') # Multi-User Chat - self.register_plugin('xep_0199') # self Ping + self.__seen = dict() + self.register_plugin('xep_0030') # Service Discovery + self.register_plugin('xep_0045') # Multi-User Chat + self.register_plugin('xep_0071') # xhtml-im + self.register_plugin('xep_0199') # self Ping # The session_start event will be triggered when # the bot establishes its connection with the server # and the XML streams are ready for use. We want to # listen for this event so that we we can initialize # our roster. - self.add_event_handler("session_start", self.start) + self.add_event_handler('session_start', self.start) # Handles MUC message and dispatch - self.add_event_handler("groupchat_message", self.muc_message) + self.add_event_handler('message', self.message) + self.add_event_handler('got_online', self._view) # Discover bot internal command (ie. help) for name, value in inspect.getmembers(self): - if inspect.ismethod(value) and getattr(value, '_bot_command', False): + if inspect.ismethod(value) and \ + getattr(value, '_bot_command', False): name = getattr(value, '_bot_command_name') - self.log.debug('Registered command: %s' % name) + self.log.debug('Registered command: %s', name) self.commands[name] = value def __set_logger(self, log_file=None, log_level=logging.INFO): @@ -81,36 +103,52 @@ class MUCBot(sleekxmpp.ClientXMPP): log_fd = open(log_file, 'w') if log_file else None chandler = logging.StreamHandler(log_fd) formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) chandler.setFormatter(formatter) self.log.addHandler(chandler) self.log.setLevel(log_level) - self.log.debug('set logger, log level : %s' % log_level) + self.log.debug('set logger, log level : %s', log_level) - def muc_message(self, msg): - # ignore message from self - body = msg['body'].strip() - mucfrom = msg['mucnic'] + def message(self, msg): + """Messages handler + + Parses message received to detect :py:obj:`prefix` + """ + if msg['type'] not in ('groupchat', 'chat'): + self.log.warning('Unhandled message') + return if msg['mucnick'] == self.nick: return + body = msg['body'].strip() if not body.startswith(MUCBot.prefix): return + if msg['from'] not in self.__seen: + self.log.warning('Will not handle message from unseen jid: %s', msg['from']) + #return args = body[1:].split() cmd = args.pop(0) if cmd not in self.commands: return - self.log.debug('cmd: {0}'.format(cmd)) + self.log.debug('cmd: %s', cmd) if args: - self.log.debug('arg: {0}'.format(args)) + self.log.debug('arg: %s', args) try: self.commands[cmd](msg, args) except Exception as err: reply = ''.join(traceback.format_exc()) - self.log.exception('An error occurred processing: {0}: {1}'.format(body, reply)) - if self.log.level < 20 and reply: - self.send_message(mto=msg['from'].bare, mbody=reply, mtype='groupchat') - - def start(self, event): + self.log.exception('An error occurred processing: %s: %s', body, reply) + if self.log.level < 10 and reply: + self.send_message(mto=msg['from'].bare, mbody=reply, + mtype='groupchat') + + def _view(self, pres): + """Track known nick""" + nick = pres['from'] + status = (pres['type'], pres['status']) + self.__seen.update({nick: status}) + + async def start(self, event): """ Process the session_start event. @@ -118,38 +156,40 @@ class MUCBot(sleekxmpp.ClientXMPP): requesting the roster and broadcasting an initial presence stanza. - Arguments: - event -- An empty dictionary. The session_start - event does not provide any additional - data. + :param dict event: An empty dictionary. The session_start + event does not provide any additional data. """ - self.get_roster() + await self.get_roster() self.send_presence() - self.plugin['xep_0045'].joinMUC(self.room, - self.nick, - # If a room password is needed, use: - # password=the_room_password, - wait=True) - - def register_bot_plugin(self, plugin_class): - self.plugins.append(plugin_class(self)) + self.plugin['xep_0045'].join_muc(self.room, + self.nick, + # If a room password is needed, use: + # password=the_room_password, + wait=True) + + def register_bot_plugin(self, plugin_cls): + """Registers plugin, takes a class, the method instanciates the plugin + + :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class + """ + self.plugins.append(plugin_cls(self)) for name, value in inspect.getmembers(self.plugins[-1]): - if inspect.ismethod(value) and getattr(value, '_bot_command', - False): + if inspect.ismethod(value) and \ + getattr(value, '_bot_command', False): name = getattr(value, '_bot_command_name') - self.log.debug('Registered command: %s' % name) + self.log.debug('Registered command: %s', name) self.commands[name] = value def foreach_plugin(self, method, *args, **kwds): for plugin in self.plugins: - self.log.debug('shuting down %s' % plugin.__str__) + self.log.debug('calling %s for %s', method, plugin) getattr(plugin, method)(*args, **kwds) def shutdown_plugins(self): - # TODO: why can't use event session_end|disconnected? + # TODO: also use event session_end|disconnected? self.log.info('shuting down') for plugin in self.plugins: - self.log.debug('shuting down %s' % plugin) + self.log.debug('shuting down %s', plugin) getattr(plugin, 'shutdown')() @botcmd @@ -158,7 +198,8 @@ class MUCBot(sleekxmpp.ClientXMPP): Automatically assigned to the "help" command.""" help_cmd = ('Type {}help '.format(self.prefix) + - ' to get more info about that specific command.') + ' to get more info about that specific command.\n\n' + + f'SRC: {__url__}') if not args: if self.__doc__: description = self.__doc__.strip() @@ -177,7 +218,12 @@ class MUCBot(sleekxmpp.ClientXMPP): text = '{}\n\n{}'.format(description, usage) else: if args[0] in self.commands.keys(): - text = self.commands[args[0]].__doc__.strip() or 'undocumented' + text = self.commands[args[0]].__doc__ or 'undocumented' + text = inspect.cleandoc(text) else: text = 'That command is not defined.' - self.send_message(mto=message['from'].bare, mbody=text, mtype='groupchat') + if message['type'] == 'groupchat': + to = message['from'].bare + else: + to = message['from'] + self.send_message(mto=to, mbody=text, mtype=message['type'])