]> kaliko git repositories - sid.git/blob - sid/sid.py
Switch to SPDX headers
[sid.git] / sid / sid.py
1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
3 # SPDX-FileCopyrightText: 2010, 2011 Anaël Verrier <elghinn@free.fr>
4 # SPDX-FileCopyrightText: 2014, 2015, 2020, 2023 kaliko <kaliko@azylum.org>
5 # SPDX-License-Identifier: GPL-3.0-or-later
6
7
8 import inspect
9 import logging
10 import traceback
11
12 import slixmpp
13
14 from sid import __url__, __doc__
15
16
17 def botcmd(*args, **kwargs):
18     """Decorator for bot command functions
19
20     :param bool hidden: is the command hidden in global help
21     :param str name: command name, default to decorated function name
22     """
23
24     def decorate(func, hidden=False, name=None):
25         setattr(func, '_bot_command', True)
26         setattr(func, '_bot_command_hidden', hidden)
27         setattr(func, '_bot_command_name', name or func.__name__)
28         if func.__doc__ is None:
29             func.__doc__ = ''
30         return func
31
32     if len(args):
33         return decorate(args[0], **kwargs)
34     else:
35         return lambda func: decorate(func, **kwargs)
36
37
38 class MUCBot(slixmpp.ClientXMPP):
39     """
40     :param str jid: jid to log with
41     :param str password: jid password
42     :param str room: conference room to join
43     :param str nick: Nickname to use in the room
44     """
45
46     #: Class attribute to define bot's command prefix
47     #:
48     #: Defaults to "!"
49     prefix = '!'
50
51     def __init__(self, jid, password, room, nick, log_file=None,
52                  log_level=logging.INFO):
53         super(MUCBot, self).__init__(jid, password)
54
55         # Clean sphinx autodoc for self documentation
56         # (cf. MUCBot.help)
57         self.__doc__ = None
58         self.log = logging.getLogger(__package__)
59         self.plugins = list()
60         self.commands = dict()
61         self.room = room
62         self.nick = nick
63         self.__set_logger(log_file, log_level)
64         self.__seen = dict()
65         self.register_plugin('xep_0030')  # Service Discovery
66         self.register_plugin('xep_0045')  # Multi-User Chat
67         self.register_plugin('xep_0071')  # xhtml-im
68         self.register_plugin('xep_0199')  # self Ping
69
70         # The session_start event will be triggered when
71         # the bot establishes its connection with the server
72         # and the XML streams are ready for use. We want to
73         # listen for this event so that we we can initialize
74         # our roster.
75         self.add_event_handler('session_start', self.start)
76
77         # Handles MUC message and dispatch
78         self.add_event_handler('message', self.message)
79         self.add_event_handler('got_online', self._view)
80
81         # Handles disconnection
82         self.add_event_handler('disconnected', self.disconn)
83
84         # Discover bot internal command (ie. help)
85         for name, value in inspect.getmembers(self):
86             if inspect.ismethod(value) and \
87                getattr(value, '_bot_command', False):
88                 name = getattr(value, '_bot_command_name')
89                 self.log.debug('Registered command: %s', name)
90                 self.commands[name] = value
91
92     def __set_logger(self, log_file=None, log_level=logging.INFO):
93         """Create console/file handler"""
94         log_fd = open(log_file, 'w') if log_file else None
95         chandler = logging.StreamHandler(log_fd)
96         formatter = logging.Formatter(
97             '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
98             )
99         chandler.setFormatter(formatter)
100         self.log.addHandler(chandler)
101         self.log.setLevel(log_level)
102         self.log.debug('set logger, log level : %s', log_level)
103
104     def disconn(self, event):
105         """disconnected handler"""
106         msg = ": %s" % event if event else "‽"
107         self.log.info('Disconnected from server%s', msg)
108         self.connect()
109
110     def message(self, msg):
111         """Messages handler
112
113         Parses message received to detect :py:obj:`prefix`
114         """
115         if msg['type'] not in ('groupchat', 'chat'):
116             self.log.warning('Unhandled message')
117             return
118         if msg['mucnick'] == self.nick:
119             return
120         body = msg['body'].strip()
121         if not body.startswith(MUCBot.prefix):
122             return
123         self.log.debug(msg['from'])
124         if msg['from'] not in self.__seen and msg['type'] == 'chat':
125             self.log.warning('Will not handle direct message'
126                              'from unseen jid: %s', msg['from'])
127             return
128         args = body[1:].split()
129         cmd = args.pop(0)
130         if cmd not in self.commands:
131             return
132         self.log.debug('cmd: %s', cmd)
133         if args:
134             self.log.debug('arg: %s', args)
135         try:
136             self.commands[cmd](msg, args)
137         except Exception as err:
138             reply = ''.join(traceback.format_exc())
139             self.log.exception('An error occurred processing: %s: %s', body, reply)
140             if self.log.level < 10 and reply:
141                 self.send_message(mto=msg['from'].bare, mbody=reply,
142                                   mtype='groupchat')
143
144     def _view(self, pres):
145         """Track known nick"""
146         nick = pres['from']
147         status = (pres['type'], pres['status'])
148         self.__seen.update({nick: status})
149
150     async def start(self, event):
151         """
152         Process the session_start event.
153
154         Typical actions for the session_start event are
155         requesting the roster and broadcasting an initial
156         presence stanza.
157
158         :param dict event: An empty dictionary. The session_start
159                      event does not provide any additional data.
160         """
161         await self.get_roster()
162         self.send_presence()
163         await self.plugin['xep_0045'].join_muc_wait(self.room,
164                                          self.nick,
165                                          # Do not fetch history
166                                          seconds=0,
167                                          # If a room password is needed, use:
168                                          # password=the_room_password,
169                                          )
170
171     def register_bot_plugin(self, plugin_cls):
172         """Registers plugin, takes a class, the method instanciates the plugin
173
174         :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
175         """
176         self.plugins.append(plugin_cls(self))
177         for name, value in inspect.getmembers(self.plugins[-1]):
178             if inspect.ismethod(value) and \
179                getattr(value, '_bot_command', False):
180                 name = getattr(value, '_bot_command_name')
181                 self.log.debug('Registered command: %s', name)
182                 self.commands[name] = value
183
184     def foreach_plugin(self, method, *args, **kwds):
185         for plugin in self.plugins:
186             self.log.debug('calling %s for %s', method, plugin)
187             getattr(plugin, method)(*args, **kwds)
188
189     def shutdown_plugins(self):
190         # TODO: also use event session_end|disconnected?
191         self.log.info('shuting down')
192         for plugin in self.plugins:
193             self.log.debug('shuting down %s', plugin)
194             getattr(plugin, 'shutdown')()
195
196     @botcmd
197     def help(self, message, args):
198         """Returns a help string listing available options.
199
200         Automatically assigned to the "help" command."""
201         help_cmd = ('Type {}help <command name>'.format(self.prefix) +
202                     ' to get more info about that specific command.\n\n' +
203                     f'DOC: {__doc__}\n' +
204                     f'SRC: {__url__}')
205         if not args:
206             if self.__doc__:
207                 description = self.__doc__.strip()
208             else:
209                 description = 'Available commands:'
210
211             cmd_list = list()
212             for name, cmd in self.commands.items():
213                 if name == 'help' or cmd._bot_command_hidden:
214                     continue
215                 doc = (cmd.__doc__.strip() or 'undocumented').split('\n', 1)[0]
216                 cmd_list.append('{0}: {1}'.format(name, doc))
217
218             usage = '\n'.join(cmd_list)
219             usage = usage + '\n\n' + help_cmd
220             text = '{}\n\n{}'.format(description, usage)
221         else:
222             if args[0] in self.commands.keys():
223                 text = self.commands[args[0]].__doc__ or 'undocumented'
224                 text = inspect.cleandoc(text)
225             else:
226                 text = 'That command is not defined.'
227         if message['type'] == 'groupchat':
228             to = message['from'].bare
229         else:
230             to = message['from']
231         self.send_message(mto=to, mbody=text, mtype=message['type'])