1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2007-2012 Thomas Perl <thp.io/about>
# Copyright (C) 2010, 2011 Anaël Verrier <elghinn@free.fr>
5 # Copyright (C) 2014, 2015, 2020 kaliko <kaliko@azylum.org>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, version 3 only.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from sid import __url__
def botcmd(*args, **kwargs):
    """Flag a function as a bot command.

    Works both bare (``@botcmd``) and with keyword arguments
    (``@botcmd(hidden=True, name='foo')``).

    :param bool hidden: is the command hidden in global help
    :param str name: command name, default to decorated function name
    """

    def _tag(target, hidden=False, name=None):
        # Marker attributes read back when commands are collected.
        target._bot_command = True
        target._bot_command_hidden = hidden
        target._bot_command_name = name or target.__name__
        # NOTE(review): the body of this test was elided from the reviewed
        # excerpt; an empty-string default is assumed because the help
        # command calls str methods on command docstrings -- confirm.
        if target.__doc__ is None:
            target.__doc__ = ''
        return target

    if not args:
        # Called with keyword arguments only: hand back the real decorator.
        return lambda func: _tag(func, **kwargs)
    # Used bare: the decorated function is the first positional argument.
    return _tag(args[0], **kwargs)
50 class MUCBot(slixmpp.ClientXMPP):
52 :param str jid: jid to log with
53 :param str password: jid password
54 :param str room: conference room to join
55 :param str nick: Nickname to use in the room
58 #: Class attribute to define bot's command prefix
def __init__(self, jid, password, room, nick, log_file=None,
             log_level=logging.INFO):
    """Prepare the bot: logger, XMPP plugins, event handlers and commands.

    :param str jid: jid to log with
    :param str password: jid password
    :param str room: conference room to join
    :param str nick: Nickname to use in the room
    :param str log_file: forwarded to the logger setup, optional
    :param int log_level: forwarded to the logger setup
    """
    super(MUCBot, self).__init__(jid, password)

    # NOTE(review): several attribute initialisations were elided from the
    # reviewed excerpt; they are rebuilt here from how the other methods
    # use them (plugins, room, nick, __seen, __doc__) -- confirm against
    # the original file.

    # Clean sphinx autodoc for self documentation
    self.__doc__ = None
    self.log = logging.getLogger(__package__)
    self.plugins = []
    self.commands = {}
    self.room = room
    self.nick = nick
    self.__set_logger(log_file, log_level)
    self.__seen = {}

    for xep in (
            'xep_0030',  # Service Discovery
            'xep_0045',  # Multi-User Chat
            'xep_0071',  # xhtml-im
            'xep_0199',  # self Ping
    ):
        self.register_plugin(xep)

    # session_start is triggered once the bot has established its
    # connection with the server and the XML streams are ready for use;
    # listening for it lets the bot initialize itself at that point.
    self.add_event_handler('session_start', self.start)

    # Handles MUC message and dispatch
    self.add_event_handler('message', self.message)
    self.add_event_handler('got_online', self._view)

    # Discover bot internal command (ie. help)
    for _, member in inspect.getmembers(self):
        if not inspect.ismethod(member):
            continue
        if not getattr(member, '_bot_command', False):
            continue
        cmd_name = getattr(member, '_bot_command_name')
        self.log.debug('Registered command: %s', cmd_name)
        self.commands[cmd_name] = member
def __set_logger(self, log_file=None, log_level=logging.INFO):
    """Create console/file handler

    Attaches one formatted handler to ``self.log`` and sets its level.

    :param str log_file: path of the file to log to (truncated when
        opened); when empty or ``None`` the handler writes to the default
        stream of :py:class:`logging.StreamHandler`
    :param int log_level: level set on ``self.log``
    """
    if log_file:
        # FileHandler owns the file and closes it together with the
        # handler, whereas a file object handed to StreamHandler is never
        # closed. UTF-8 keeps non-ASCII content loggable whatever the
        # locale is.
        handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    self.log.addHandler(handler)
    self.log.setLevel(log_level)
    self.log.debug('set logger, log level : %s', log_level)
def message(self, msg):
    """Messages handler

    Parses message received to detect :py:obj:`prefix` and, when the first
    word following the prefix is a registered command, calls that command
    with ``(msg, args)`` where ``args`` holds the remaining words.

    A message is ignored when its type is neither ``groupchat`` nor
    ``chat``, when ``msg['mucnick']`` is the bot's own nick, when its body
    does not start with the prefix or holds nothing but the prefix, or
    when it is a ``chat`` message whose sender has not been recorded by
    the presence tracker.

    Exceptions raised by a command are logged and never propagated.

    :param msg: received message, read through item access
        (``type``, ``mucnick``, ``body``, ``from``)
    """
    if msg['type'] not in ('groupchat', 'chat'):
        self.log.warning('Unhandled message')
        return
    if msg['mucnick'] == self.nick:
        return
    body = msg['body'].strip()
    # Read the prefix from the instance so dispatching agrees with the
    # prefix shown by the help command, even when it is overridden.
    prefix = self.prefix
    if not body.startswith(prefix):
        return
    self.log.debug(msg['from'])
    if msg['from'] not in self.__seen and msg['type'] == 'chat':
        self.log.warning('Will not handle direct message '
                         'from unseen jid: %s', msg['from'])
        return
    # Strip the whole prefix, whatever its length.
    args = body[len(prefix):].split()
    if not args:
        # Nothing follows the prefix: there is no command to dispatch.
        return
    cmd = args.pop(0)
    if cmd not in self.commands:
        return
    self.log.debug('cmd: %s', cmd)
    if args:
        self.log.debug('arg: %s', args)
    try:
        self.commands[cmd](msg, args)
    except Exception:
        # Boundary handler: a failing command must not take the bot down.
        reply = traceback.format_exc()
        self.log.exception('An error occurred processing: %s: %s', body, reply)
        # NOTE(review): this only fires for levels strictly below DEBUG;
        # confirm whether "<=" was intended.
        if self.log.level < logging.DEBUG and reply:
            # NOTE(review): the continuation of this call was elided from
            # the reviewed excerpt; mtype='groupchat' is assumed since the
            # reply goes to the bare jid -- confirm.
            self.send_message(mto=msg['from'].bare, mbody=reply,
                              mtype='groupchat')
def _view(self, pres):
    """Remember the sender of a presence with its ``(type, status)`` pair.

    :param pres: received presence, read through item access
        (``from``, ``type``, ``status``)
    """
    # NOTE(review): the line defining the key was elided from the reviewed
    # excerpt; pres['from'] is assumed because the message handler tests
    # msg['from'] for membership in this mapping -- confirm.
    sender = pres['from']
    self.__seen[sender] = (pres['type'], pres['status'])
153 async def start(self, event):
155 Process the session_start event.
157 Typical actions for the session_start event are
158 requesting the roster and broadcasting an initial
161 :param dict event: An empty dictionary. The session_start
162 event does not provide any additional data.
164 await self.get_roster()
166 self.plugin['xep_0045'].join_muc(self.room,
168 # If a room password is needed, use:
169 # password=the_room_password,
def register_bot_plugin(self, plugin_cls):
    """Instantiate a plugin class with the bot and register its commands.

    The new instance is appended to ``self.plugins``; each of its methods
    flagged as a bot command is stored in ``self.commands`` under its
    command name, replacing any entry already using that name.

    :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
    """
    instance = plugin_cls(self)
    self.plugins.append(instance)
    for _, member in inspect.getmembers(instance):
        if not inspect.ismethod(member):
            continue
        if not getattr(member, '_bot_command', False):
            continue
        cmd_name = getattr(member, '_bot_command_name')
        self.log.debug('Registered command: %s', cmd_name)
        self.commands[cmd_name] = member
def foreach_plugin(self, method, *args, **kwds):
    """Call the method named ``method`` on every registered plugin.

    Plugins are visited in ``self.plugins`` order; return values are
    discarded.

    :param str method: name of the plugin method to call
    :param args: positional arguments passed to each call
    :param kwds: keyword arguments passed to each call
    """
    for target in self.plugins:
        self.log.debug('calling %s for %s', method, target)
        handler = getattr(target, method)
        handler(*args, **kwds)
def shutdown_plugins(self):
    """Call ``shutdown()`` on every registered plugin, in order."""
    # TODO: also use event session_end|disconnected?
    self.log.info('shuting down')
    for target in self.plugins:
        self.log.debug('shuting down %s', target)
        target.shutdown()
198 def help(self, message, args):
199 """Returns a help string listing available options.
201 Automatically assigned to the "help" command."""
202 help_cmd = ('Type {}help <command name>'.format(self.prefix) +
203 ' to get more info about that specific command.\n\n' +
207 description = self.__doc__.strip()
209 description = 'Available commands:'
212 for name, cmd in self.commands.items():
213 if name == 'help' or cmd._bot_command_hidden:
215 doc = (cmd.__doc__.strip() or 'undocumented').split('\n', 1)[0]
216 cmd_list.append('{0}: {1}'.format(name, doc))
218 usage = '\n'.join(cmd_list)
219 usage = usage + '\n\n' + help_cmd
220 text = '{}\n\n{}'.format(description, usage)
222 if args[0] in self.commands.keys():
223 text = self.commands[args[0]].__doc__ or 'undocumented'
224 text = inspect.cleandoc(text)
226 text = 'That command is not defined.'
227 if message['type'] == 'groupchat':
228 to = message['from'].bare
231 self.send_message(mto=to, mbody=text, mtype=message['type'])