1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
3 # SPDX-FileCopyrightText: 2010, 2011 Anaël Verrier <elghinn@free.fr>
4 # SPDX-FileCopyrightText: 2014, 2015, 2020, 2023 kaliko <kaliko@azylum.org>
5 # SPDX-License-Identifier: GPL-3.0-or-later
14 from sid import __url__, __doc__
16 log = logging.getLogger(__package__)
19 def botcmd(*args, **kwargs):
20 """Decorator for bot command functions
22 :param bool hidden: is the command hidden in global help
23 :param str name: command name, default to decorated function name
26 def decorate(func, hidden=False, name=None):
27 setattr(func, '_bot_command', True)
28 setattr(func, '_bot_command_hidden', hidden)
29 setattr(func, '_bot_command_name', name or func.__name__)
30 if func.__doc__ is None:
35 return decorate(args[0], **kwargs)
37 return lambda func: decorate(func, **kwargs)
40 class MUCBot(slixmpp.ClientXMPP):
42 :param str jid: jid to log with
43 :param str password: jid password
44 :param str room: conference room to join
45 :param str nick: Nickname to use in the room
48 #: Class attribute to define bot's command prefix
53 def __init__(self, jid, password, room, nick,
54 log_level=logging.INFO):
55 super(MUCBot, self).__init__(jid, password)
57 # Clean sphinx autodoc for self documentation
62 self.commands = dict()
65 #: Keep track of MUC presences: {'nick': presence}
66 self.muc_presences = {}
67 self.__set_logger(log_level)
69 self.register_plugin('xep_0030') # Service Discovery
70 self.register_plugin('xep_0045') # Multi-User Chat
71 self.register_plugin('xep_0071') # xhtml-im
72 self.register_plugin('xep_0199') # self Ping
74 # The session_start event will be triggered when
75 # the bot establishes its connection with the server
76 # and the XML streams are ready for use. We want to
77 # listen for this event so that we we can initialize
79 self.add_event_handler('session_start', self.start)
81 # Handles MUC message and dispatch
82 self.add_event_handler('message', self.message)
83 self.add_event_handler('got_online', self._view)
85 # keep track of join/parts
86 self.add_event_handler(f'muc::{self.room}::got_offline', self._muc_got_offline)
87 self.add_event_handler(f'muc::{self.room}::got_online', self._muc_got_online)
88 self.add_event_handler(f'muc::{self.room}::presence', self._muc_got_presence)
90 # Handles disconnection
91 self.add_event_handler('disconnected', self.disconn)
93 # Discover bot internal command (ie. help)
94 for name, value in inspect.getmembers(self):
95 if inspect.ismethod(value) and \
96 getattr(value, '_bot_command', False):
97 name = getattr(value, '_bot_command_name')
98 log.debug('Registered command: %s', name)
99 self.commands[name] = value
101 def __set_logger(self, log_level):
102 """Set logging level"""
103 log.setLevel(log_level)
104 log.debug('set logger, log level : %s', log_level)
106 def _muc_got_online(self, pres):
107 """Keep track of MUC participants"""
108 fjid = pres['muc']['jid']
109 nick = pres['muc']['nick']
110 role = pres['muc']['role']
111 affi = pres['muc']['affiliation']
112 user = fjid if fjid.full else nick
113 self.muc_presences.update({nick: pres})
114 log.debug('Participants: + %s:%s/%s (len:%s)',
115 user, role, affi,len(self.muc_presences))
117 def _muc_got_offline(self, pres):
118 """Keep track of MUC participants"""
119 fjid = pres['muc']['jid']
120 user = fjid if fjid.full else pres['muc']['nick']
122 self.muc_presences.pop(pres['muc']['nick'])
124 log.error('KeyError removing participant: "%s"', user)
125 log.debug('Participants: - %s got offline (len:%s)',
126 user, len(self.muc_presences))
128 def _muc_got_presence(self, pres):
129 """Keep track of MUC participants"""
130 nick = pres['muc']['nick']
131 fjid = pres['muc']['jid']
132 role = pres['muc']['role']
133 affi = pres['muc']['affiliation']
134 user = fjid if fjid.full else nick
135 log.debug('Participants: u %s:%s/%s (len:%s)',
136 user, role, affi,len(self.muc_presences))
137 self.muc_presences.update({nick: pres})
139 def disconn(self, event):
140 """disconnected handler"""
141 msg = ": %s" % event if event else "‽"
142 log.info('Disconnected from server%s', msg)
145 def message(self, msg):
148 Parses message received to detect :py:obj:`prefix`
150 if msg['type'] not in ('groupchat', 'chat'):
151 log.warning('Unhandled message')
153 if msg['mucnick'] == self.nick:
155 body = msg['body'].strip()
156 if not body.startswith(MUCBot.prefix):
158 log.debug(msg['from'])
159 if msg['from'] not in self.__seen and msg['type'] == 'chat':
160 log.warning('Will not handle direct message'
161 'from unseen jid: %s', msg['from'])
163 args = body[1:].split()
165 if cmd not in self.commands:
167 log.debug('cmd: %s', cmd)
169 log.debug('arg: %s', args)
171 self.commands[cmd](msg, args)
172 except Exception as err:
173 reply = ''.join(traceback.format_exc())
174 log.exception('An error occurred processing: %s: %s', body, reply)
175 if log.level < 10 and reply:
176 self.send_message(mto=msg['from'].bare, mbody=reply,
179 def _view(self, pres):
180 """Track known nick"""
182 status = (pres['type'], pres['status'])
183 self.__seen.update({nick: status})
185 async def start(self, event):
187 Process the session_start event.
189 Typical actions for the session_start event are
190 requesting the roster and broadcasting an initial
193 :param dict event: An empty dictionary. The session_start
194 event does not provide any additional data.
196 await self.get_roster()
198 await self.plugin['xep_0045'].join_muc_wait(self.room,
200 # Do not fetch history
202 # If a room password is needed, use:
203 # password=the_room_password,
206 def register_bot_plugin(self, plugin_cls):
207 """Registers plugin, takes a class, the method instanciates the plugin
209 :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
211 self.plugins.append(plugin_cls(self))
212 for name, value in inspect.getmembers(self.plugins[-1]):
213 if inspect.ismethod(value) and \
214 getattr(value, '_bot_command', False):
215 name = getattr(value, '_bot_command_name')
216 log.debug('Registered command: %s', name)
217 self.commands[name] = value
219 def unregister_bot_plugin(self, plugin):
220 for name, value in inspect.getmembers(plugin):
221 if inspect.ismethod(value) and \
222 getattr(value, '_bot_command', False):
223 name = getattr(value, '_bot_command_name')
224 log.debug('Unegistered command: %s', name)
225 self.commands[name] = value
226 self.plugins.remove(plugin)
228 def foreach_plugin(self, method, *args, **kwds):
229 for plugin in self.plugins:
230 log.debug('calling %s for %s', method, plugin)
231 getattr(plugin, method)(*args, **kwds)
233 def shutdown_plugins(self):
234 # TODO: also use event session_end|disconnected?
235 log.info('shuting down')
236 for plugin in self.plugins:
237 log.debug('shuting down %s', plugin)
238 getattr(plugin, 'shutdown')()
241 def help(self, message, args):
242 """Returns a help string listing available options.
244 Automatically assigned to the "help" command."""
245 help_cmd = ('Type {}help <command name>'.format(self.prefix) +
246 ' to get more info about that specific command.\n\n' +
247 f'DOC: {__doc__}\n' +
251 description = self.__doc__.strip()
253 description = 'Available commands:'
256 for name, cmd in self.commands.items():
257 if name == 'help' or cmd._bot_command_hidden:
259 doc = (cmd.__doc__.strip() or 'undocumented').split('\n', 1)[0]
260 cmd_list.append('{0}: {1}'.format(name, doc))
262 usage = '\n'.join(cmd_list)
263 usage = usage + '\n\n' + help_cmd
264 text = '{}\n\n{}'.format(description, usage)
266 if args[0] in self.commands.keys():
267 text = self.commands[args[0]].__doc__ or 'undocumented'
268 text = inspect.cleandoc(text)
270 text = 'That command is not defined.'
271 if message['type'] == 'groupchat':
272 to = message['from'].bare
275 self.send_message(mto=to, mbody=text, mtype=message['type'])