]> kaliko git repositories - sid.git/blob - sid/sid.py
Bump version
[sid.git] / sid / sid.py
1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
3 # SPDX-FileCopyrightText: 2010, 2011 Anaël Verrier <elghinn@free.fr>
4 # SPDX-FileCopyrightText: 2014, 2015, 2020, 2023 kaliko <kaliko@azylum.org>
5 # SPDX-License-Identifier: GPL-3.0-or-later
6
7
8 import inspect
9 import logging
10 import traceback
11
12 import slixmpp
13
14 from sid import __url__, __doc__
15
16 log = logging.getLogger(__package__)
17
18
19 def botcmd(*args, **kwargs):
20     """Decorator for bot command functions
21
22     :param bool hidden: is the command hidden in global help
23     :param str name: command name, default to decorated function name
24     """
25
26     def decorate(func, hidden=False, name=None):
27         setattr(func, '_bot_command', True)
28         setattr(func, '_bot_command_hidden', hidden)
29         setattr(func, '_bot_command_name', name or func.__name__)
30         if func.__doc__ is None:
31             func.__doc__ = ''
32         return func
33
34     if len(args):
35         return decorate(args[0], **kwargs)
36     else:
37         return lambda func: decorate(func, **kwargs)
38
39
40 class MUCBot(slixmpp.ClientXMPP):
41     """
42     :param str jid: jid to log with
43     :param str password: jid password
44     :param str room: conference room to join
45     :param str nick: Nickname to use in the room
46     """
47
48     #: Class attribute to define bot's command prefix
49     #:
50     #: Defaults to "!"
51     prefix = '!'
52
53     def __init__(self, jid, password, room, nick,
54                  log_level=logging.INFO):
55         super(MUCBot, self).__init__(jid, password)
56
57         # Clean sphinx autodoc for self documentation
58         # (cf. MUCBot.help)
59         self.__doc__ = None
60         self.log = log
61         self.plugins = list()
62         self.commands = dict()
63         self.room = room
64         self.nick = nick
65         #: Keep track of MUC presences: {'nick': presence}
66         self.muc_presences = {}
67         self.__set_logger(log_level)
68         self.__seen = dict()
69         self.register_plugin('xep_0030')  # Service Discovery
70         self.register_plugin('xep_0045')  # Multi-User Chat
71         self.register_plugin('xep_0071')  # xhtml-im
72         self.register_plugin('xep_0199')  # self Ping
73
74         # The session_start event will be triggered when
75         # the bot establishes its connection with the server
76         # and the XML streams are ready for use. We want to
77         # listen for this event so that we we can initialize
78         # our roster.
79         self.add_event_handler('session_start', self.start)
80
81         # Handles MUC message and dispatch
82         self.add_event_handler('message', self.message)
83         self.add_event_handler('got_online', self._view)
84
85         # keep track of join/parts
86         self.add_event_handler(f'muc::{self.room}::got_offline', self._muc_got_offline)
87         self.add_event_handler(f'muc::{self.room}::got_online', self._muc_got_online)
88         self.add_event_handler(f'muc::{self.room}::presence', self._muc_got_presence)
89
90         # Handles disconnection
91         self.add_event_handler('disconnected', self.disconn)
92
93         # Discover bot internal command (ie. help)
94         for name, value in inspect.getmembers(self):
95             if inspect.ismethod(value) and \
96                getattr(value, '_bot_command', False):
97                 name = getattr(value, '_bot_command_name')
98                 log.debug('Registered command: %s', name)
99                 self.commands[name] = value
100
101     def __set_logger(self, log_level):
102         """Set logging level"""
103         log.setLevel(log_level)
104         log.debug('set logger, log level : %s', log_level)
105
106     def _muc_got_online(self, pres):
107         """Keep track of MUC participants"""
108         fjid = pres['muc']['jid']
109         nick = pres['muc']['nick']
110         role = pres['muc']['role']
111         affi = pres['muc']['affiliation']
112         user = fjid if fjid.full else nick
113         self.muc_presences.update({nick: pres})
114         log.debug('Participants: + %s:%s/%s (len:%s)',
115                    user, role, affi,len(self.muc_presences))
116
117     def _muc_got_offline(self, pres):
118         """Keep track of MUC participants"""
119         fjid = pres['muc']['jid']
120         user = fjid if fjid.full else pres['muc']['nick']
121         try:
122             self.muc_presences.pop(pres['muc']['nick'])
123         except KeyError:
124             log.error('KeyError removing participant: "%s"', user)
125         log.debug('Participants: - %s got offline (len:%s)',
126                   user, len(self.muc_presences))
127
128     def _muc_got_presence(self, pres):
129         """Keep track of MUC participants"""
130         nick = pres['muc']['nick']
131         fjid = pres['muc']['jid']
132         role = pres['muc']['role']
133         affi = pres['muc']['affiliation']
134         user = fjid if fjid.full else nick
135         log.debug('Participants: u %s:%s/%s (len:%s)',
136                    user, role, affi,len(self.muc_presences))
137         self.muc_presences.update({nick: pres})
138
139     def disconn(self, event):
140         """disconnected handler"""
141         msg = ": %s" % event if event else "‽"
142         log.info('Disconnected from server%s', msg)
143         self.connect()
144
145     def message(self, msg):
146         """Messages handler
147
148         Parses message received to detect :py:obj:`prefix`
149         """
150         if msg['type'] not in ('groupchat', 'chat'):
151             log.warning('Unhandled message')
152             return
153         if msg['mucnick'] == self.nick:
154             return
155         body = msg['body'].strip()
156         if not body.startswith(MUCBot.prefix):
157             return
158         log.debug(msg['from'])
159         if msg['from'] not in self.__seen and msg['type'] == 'chat':
160             log.warning('Will not handle direct message'
161                              'from unseen jid: %s', msg['from'])
162             return
163         args = body[1:].split()
164         cmd = args.pop(0)
165         if cmd not in self.commands:
166             return
167         log.debug('cmd: %s', cmd)
168         if args:
169             log.debug('arg: %s', args)
170         try:
171             self.commands[cmd](msg, args)
172         except Exception as err:
173             reply = ''.join(traceback.format_exc())
174             log.exception('An error occurred processing: %s: %s', body, reply)
175             if log.level < 10 and reply:
176                 self.send_message(mto=msg['from'].bare, mbody=reply,
177                                   mtype='groupchat')
178
179     def _view(self, pres):
180         """Track known nick"""
181         nick = pres['from']
182         status = (pres['type'], pres['status'])
183         self.__seen.update({nick: status})
184
185     async def start(self, event):
186         """
187         Process the session_start event.
188
189         Typical actions for the session_start event are
190         requesting the roster and broadcasting an initial
191         presence stanza.
192
193         :param dict event: An empty dictionary. The session_start
194                      event does not provide any additional data.
195         """
196         await self.get_roster()
197         self.send_presence()
198         await self.plugin['xep_0045'].join_muc_wait(self.room,
199                                          self.nick,
200                                          # Do not fetch history
201                                          maxstanzas=0,
202                                          # If a room password is needed, use:
203                                          # password=the_room_password,
204                                          )
205
206     def register_bot_plugin(self, plugin_cls):
207         """Registers plugin, takes a class, the method instanciates the plugin
208
209         :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
210         """
211         self.plugins.append(plugin_cls(self))
212         for name, value in inspect.getmembers(self.plugins[-1]):
213             if inspect.ismethod(value) and \
214                getattr(value, '_bot_command', False):
215                 name = getattr(value, '_bot_command_name')
216                 log.debug('Registered command: %s', name)
217                 self.commands[name] = value
218
219     def unregister_bot_plugin(self, plugin):
220         for name, value in inspect.getmembers(plugin):
221             if inspect.ismethod(value) and \
222                getattr(value, '_bot_command', False):
223                 name = getattr(value, '_bot_command_name')
224                 log.debug('Unegistered command: %s', name)
225                 self.commands[name] = value
226         self.plugins.remove(plugin)
227
228     def foreach_plugin(self, method, *args, **kwds):
229         for plugin in self.plugins:
230             log.debug('calling %s for %s', method, plugin)
231             getattr(plugin, method)(*args, **kwds)
232
233     def shutdown_plugins(self):
234         # TODO: also use event session_end|disconnected?
235         log.info('shuting down')
236         for plugin in self.plugins:
237             log.debug('shuting down %s', plugin)
238             getattr(plugin, 'shutdown')()
239
240     @botcmd
241     def help(self, message, args):
242         """Returns a help string listing available options.
243
244         Automatically assigned to the "help" command."""
245         help_cmd = ('Type {}help <command name>'.format(self.prefix) +
246                     ' to get more info about that specific command.\n\n' +
247                     f'DOC: {__doc__}\n' +
248                     f'SRC: {__url__}')
249         if not args:
250             if self.__doc__:
251                 description = self.__doc__.strip()
252             else:
253                 description = 'Available commands:'
254
255             cmd_list = list()
256             for name, cmd in self.commands.items():
257                 if name == 'help' or cmd._bot_command_hidden:
258                     continue
259                 doc = (cmd.__doc__.strip() or 'undocumented').split('\n', 1)[0]
260                 cmd_list.append('{0}: {1}'.format(name, doc))
261
262             usage = '\n'.join(cmd_list)
263             usage = usage + '\n\n' + help_cmd
264             text = '{}\n\n{}'.format(description, usage)
265         else:
266             if args[0] in self.commands.keys():
267                 text = self.commands[args[0]].__doc__ or 'undocumented'
268                 text = inspect.cleandoc(text)
269             else:
270                 text = 'That command is not defined.'
271         if message['type'] == 'groupchat':
272             to = message['from'].bare
273         else:
274             to = message['from']
275         self.send_message(mto=to, mbody=text, mtype=message['type'])