]> kaliko git repositories - sid.git/blob - sid/sid.py
Cleanup logging
[sid.git] / sid / sid.py
1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
3 # SPDX-FileCopyrightText: 2010, 2011 Anaël Verrier <elghinn@free.fr>
4 # SPDX-FileCopyrightText: 2014, 2015, 2020, 2023 kaliko <kaliko@azylum.org>
5 # SPDX-License-Identifier: GPL-3.0-or-later
6
7
8 import inspect
9 import logging
10 import traceback
11
12 import slixmpp
13
14 from sid import __url__, __doc__
15
16 log = logging.getLogger(__package__)
17
18
19 def botcmd(*args, **kwargs):
20     """Decorator for bot command functions
21
22     :param bool hidden: is the command hidden in global help
23     :param str name: command name, default to decorated function name
24     """
25
26     def decorate(func, hidden=False, name=None):
27         setattr(func, '_bot_command', True)
28         setattr(func, '_bot_command_hidden', hidden)
29         setattr(func, '_bot_command_name', name or func.__name__)
30         if func.__doc__ is None:
31             func.__doc__ = ''
32         return func
33
34     if len(args):
35         return decorate(args[0], **kwargs)
36     else:
37         return lambda func: decorate(func, **kwargs)
38
39
40 class MUCBot(slixmpp.ClientXMPP):
41     """
42     :param str jid: jid to log with
43     :param str password: jid password
44     :param str room: conference room to join
45     :param str nick: Nickname to use in the room
46     """
47
48     #: Class attribute to define bot's command prefix
49     #:
50     #: Defaults to "!"
51     prefix = '!'
52
53     def __init__(self, jid, password, room, nick,
54                  log_level=logging.INFO):
55         super(MUCBot, self).__init__(jid, password)
56
57         # Clean sphinx autodoc for self documentation
58         # (cf. MUCBot.help)
59         self.__doc__ = None
60         self.log = log
61         self.plugins = list()
62         self.commands = dict()
63         self.room = room
64         self.nick = nick
65         self.__set_logger(log_level)
66         self.__seen = dict()
67         self.register_plugin('xep_0030')  # Service Discovery
68         self.register_plugin('xep_0045')  # Multi-User Chat
69         self.register_plugin('xep_0071')  # xhtml-im
70         self.register_plugin('xep_0199')  # self Ping
71
72         # The session_start event will be triggered when
73         # the bot establishes its connection with the server
74         # and the XML streams are ready for use. We want to
75         # listen for this event so that we we can initialize
76         # our roster.
77         self.add_event_handler('session_start', self.start)
78
79         # Handles MUC message and dispatch
80         self.add_event_handler('message', self.message)
81         self.add_event_handler('got_online', self._view)
82
83         # Handles disconnection
84         self.add_event_handler('disconnected', self.disconn)
85
86         # Discover bot internal command (ie. help)
87         for name, value in inspect.getmembers(self):
88             if inspect.ismethod(value) and \
89                getattr(value, '_bot_command', False):
90                 name = getattr(value, '_bot_command_name')
91                 log.debug('Registered command: %s', name)
92                 self.commands[name] = value
93
94     def __set_logger(self, log_level):
95         """Set logging level"""
96         log.setLevel(log_level)
97         log.debug('set logger, log level : %s', log_level)
98
99     def disconn(self, event):
100         """disconnected handler"""
101         msg = ": %s" % event if event else "‽"
102         log.info('Disconnected from server%s', msg)
103         self.connect()
104
105     def message(self, msg):
106         """Messages handler
107
108         Parses message received to detect :py:obj:`prefix`
109         """
110         if msg['type'] not in ('groupchat', 'chat'):
111             log.warning('Unhandled message')
112             return
113         if msg['mucnick'] == self.nick:
114             return
115         body = msg['body'].strip()
116         if not body.startswith(MUCBot.prefix):
117             return
118         log.debug(msg['from'])
119         if msg['from'] not in self.__seen and msg['type'] == 'chat':
120             log.warning('Will not handle direct message'
121                              'from unseen jid: %s', msg['from'])
122             return
123         args = body[1:].split()
124         cmd = args.pop(0)
125         if cmd not in self.commands:
126             return
127         log.debug('cmd: %s', cmd)
128         if args:
129             log.debug('arg: %s', args)
130         try:
131             self.commands[cmd](msg, args)
132         except Exception as err:
133             reply = ''.join(traceback.format_exc())
134             log.exception('An error occurred processing: %s: %s', body, reply)
135             if log.level < 10 and reply:
136                 self.send_message(mto=msg['from'].bare, mbody=reply,
137                                   mtype='groupchat')
138
139     def _view(self, pres):
140         """Track known nick"""
141         nick = pres['from']
142         status = (pres['type'], pres['status'])
143         self.__seen.update({nick: status})
144
145     async def start(self, event):
146         """
147         Process the session_start event.
148
149         Typical actions for the session_start event are
150         requesting the roster and broadcasting an initial
151         presence stanza.
152
153         :param dict event: An empty dictionary. The session_start
154                      event does not provide any additional data.
155         """
156         await self.get_roster()
157         self.send_presence()
158         await self.plugin['xep_0045'].join_muc_wait(self.room,
159                                          self.nick,
160                                          # Do not fetch history
161                                          maxstanzas=0,
162                                          # If a room password is needed, use:
163                                          # password=the_room_password,
164                                          )
165
166     def register_bot_plugin(self, plugin_cls):
167         """Registers plugin, takes a class, the method instanciates the plugin
168
169         :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
170         """
171         self.plugins.append(plugin_cls(self))
172         for name, value in inspect.getmembers(self.plugins[-1]):
173             if inspect.ismethod(value) and \
174                getattr(value, '_bot_command', False):
175                 name = getattr(value, '_bot_command_name')
176                 log.debug('Registered command: %s', name)
177                 self.commands[name] = value
178
179     def foreach_plugin(self, method, *args, **kwds):
180         for plugin in self.plugins:
181             log.debug('calling %s for %s', method, plugin)
182             getattr(plugin, method)(*args, **kwds)
183
184     def shutdown_plugins(self):
185         # TODO: also use event session_end|disconnected?
186         log.info('shuting down')
187         for plugin in self.plugins:
188             log.debug('shuting down %s', plugin)
189             getattr(plugin, 'shutdown')()
190
191     @botcmd
192     def help(self, message, args):
193         """Returns a help string listing available options.
194
195         Automatically assigned to the "help" command."""
196         help_cmd = ('Type {}help <command name>'.format(self.prefix) +
197                     ' to get more info about that specific command.\n\n' +
198                     f'DOC: {__doc__}\n' +
199                     f'SRC: {__url__}')
200         if not args:
201             if self.__doc__:
202                 description = self.__doc__.strip()
203             else:
204                 description = 'Available commands:'
205
206             cmd_list = list()
207             for name, cmd in self.commands.items():
208                 if name == 'help' or cmd._bot_command_hidden:
209                     continue
210                 doc = (cmd.__doc__.strip() or 'undocumented').split('\n', 1)[0]
211                 cmd_list.append('{0}: {1}'.format(name, doc))
212
213             usage = '\n'.join(cmd_list)
214             usage = usage + '\n\n' + help_cmd
215             text = '{}\n\n{}'.format(description, usage)
216         else:
217             if args[0] in self.commands.keys():
218                 text = self.commands[args[0]].__doc__ or 'undocumented'
219                 text = inspect.cleandoc(text)
220             else:
221                 text = 'That command is not defined.'
222         if message['type'] == 'groupchat':
223             to = message['from'].bare
224         else:
225             to = message['from']
226         self.send_message(mto=to, mbody=text, mtype=message['type'])