]> kaliko git repositories - sid.git/blob - sid/sid.py
sphinx: Add sphinx docstring
[sid.git] / sid / sid.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2007-2012 Thomas Perl <thp.io/about>
4 # Copyright (C) 2010, 2011 AnaĆ«l Verrier <elghinn@free.fr>
5 # Copyright (C) 2014, 2015, 2020 kaliko <kaliko@azylum.org>
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, version 3 only.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
20 import inspect
21 import logging
22 import traceback
23
24 import slixmpp
25
26
27 def botcmd(*args, **kwargs):
28     """Decorator for bot command functions
29
30     :param bool hidden: is the command hidden in global help
31     :param str name: command name, default to decorated function name
32     """
33
34     def decorate(func, hidden=False, name=None):
35         setattr(func, '_bot_command', True)
36         setattr(func, '_bot_command_hidden', hidden)
37         setattr(func, '_bot_command_name', name or func.__name__)
38         if func.__doc__ is None:
39             func.__doc__ = ''
40         return func
41
42     if len(args):
43         return decorate(args[0], **kwargs)
44     else:
45         return lambda func: decorate(func, **kwargs)
46
47
48 class MUCBot(slixmpp.ClientXMPP):
49     """
50     :param str jid: jid to log with
51     :param str password: jid password
52     :param str room: conference room to join
53     :param str nick: Nickname to use in the room
54     """
55
56     #: Class attribute to define bot's command prefix
57     #:
58     #: Defaults to "!"
59     prefix = '!'
60
61     def __init__(self, jid, password, room, nick, log_file=None,
62                  log_level=logging.INFO):
63         super(MUCBot, self).__init__(jid, password)
64
65         self.log = logging.getLogger(__package__)
66         self.plugins = list()
67         self.commands = dict()
68         self.room = room
69         self.nick = nick
70         self.__set_logger(log_file, log_level)
71         self.__seen = dict()
72         self.register_plugin('xep_0030')  # Service Discovery
73         self.register_plugin('xep_0045')  # Multi-User Chat
74         self.register_plugin('xep_0071')  # xhtml-im
75         self.register_plugin('xep_0199')  # self Ping
76
77         # The session_start event will be triggered when
78         # the bot establishes its connection with the server
79         # and the XML streams are ready for use. We want to
80         # listen for this event so that we we can initialize
81         # our roster.
82         self.add_event_handler('session_start', self.start)
83
84         # Handles MUC message and dispatch
85         self.add_event_handler('message', self.message)
86         self.add_event_handler('got_online', self._view)
87
88         # Discover bot internal command (ie. help)
89         for name, value in inspect.getmembers(self):
90             if inspect.ismethod(value) and getattr(value, '_bot_command', False):
91                 name = getattr(value, '_bot_command_name')
92                 self.log.debug('Registered command: %s', name)
93                 self.commands[name] = value
94
95     def __set_logger(self, log_file=None, log_level=logging.INFO):
96         """Create console/file handler"""
97         log_fd = open(log_file, 'w') if log_file else None
98         chandler = logging.StreamHandler(log_fd)
99         formatter = logging.Formatter(
100             '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
101             )
102         chandler.setFormatter(formatter)
103         self.log.addHandler(chandler)
104         self.log.setLevel(log_level)
105         self.log.debug('set logger, log level : %s', log_level)
106
107     def message(self, msg):
108         """Messages handler
109
110         Parses message received to detect :py:obj:`prefix`
111         """
112         if msg['type'] not in ('groupchat', 'chat'):
113             self.log.warning('Unhandled message')
114             return
115         if msg['mucnick'] == self.nick:
116             return
117         body = msg['body'].strip()
118         if not body.startswith(MUCBot.prefix):
119             return
120         if msg['from'] not in self.__seen:
121             self.log.warning('Will not handle message from unseen jid: %s', msg['from'])
122             #return
123         args = body[1:].split()
124         cmd = args.pop(0)
125         if cmd not in self.commands:
126             return
127         self.log.debug('cmd: %s', cmd)
128         if args:
129             self.log.debug('arg: %s', args)
130         try:
131             self.commands[cmd](msg, args)
132         except Exception as err:
133             reply = ''.join(traceback.format_exc())
134             self.log.exception('An error occurred processing: %s: %s', body, reply)
135             if self.log.level < 10 and reply:
136                 self.send_message(mto=msg['from'].bare, mbody=reply, mtype='groupchat')
137
138     def _view(self, pres):
139         """Track known nick"""
140         nick = pres['from']
141         status = (pres['type'], pres['status'])
142         self.__seen.update({nick: status})
143
144     def start(self, event):
145         """
146         Process the session_start event.
147
148         Typical actions for the session_start event are
149         requesting the roster and broadcasting an initial
150         presence stanza.
151
152         :param dict event: An empty dictionary. The session_start
153                      event does not provide any additional data.
154         """
155         self.get_roster()
156         self.send_presence()
157         self.plugin['xep_0045'].join_muc(self.room,
158                                         self.nick,
159                                         # If a room password is needed, use:
160                                         # password=the_room_password,
161                                         wait=True)
162
163         :param `sid.plugin.Plugin` plugin_cls: A :py:obj:`sid.plugin.Plugin` class
164         """
165         self.plugins.append(plugin_cls(self))
166         for name, value in inspect.getmembers(self.plugins[-1]):
167             if inspect.ismethod(value) and getattr(value, '_bot_command',
168                                                    False):
169                 name = getattr(value, '_bot_command_name')
170                 self.log.debug('Registered command: %s', name)
171                 self.commands[name] = value
172
173     def foreach_plugin(self, method, *args, **kwds):
174         for plugin in self.plugins:
175             self.log.debug('shuting down %s', plugin.__str__)
176             getattr(plugin, method)(*args, **kwds)
177
178     def shutdown_plugins(self):
179         # TODO: why can't use event session_end|disconnected?
180         self.log.info('shuting down')
181         for plugin in self.plugins:
182             self.log.debug('shuting down %s', plugin)
183             getattr(plugin, 'shutdown')()
184
185     @botcmd
186     def help(self, message, args):
187         """Returns a help string listing available options.
188
189         Automatically assigned to the "help" command."""
190         help_cmd = ('Type {}help <command name>'.format(self.prefix) +
191                     ' to get more info about that specific command.\n\n'+
192                     'SRC: http://git.kaliko.me/sid.git')
193         if not args:
194             if self.__doc__:
195                 description = self.__doc__.strip()
196             else:
197                 description = 'Available commands:'
198
199             cmd_list = list()
200             for name, cmd in self.commands.items():
201                 if name == 'help' or cmd._bot_command_hidden:
202                     continue
203                 doc = (cmd.__doc__.strip() or 'undocumented').split('\n', 1)[0]
204                 cmd_list.append('{0}: {1}'.format(name, doc))
205
206             usage = '\n'.join(cmd_list)
207             usage = usage + '\n\n' + help_cmd
208             text = '{}\n\n{}'.format(description, usage)
209         else:
210             if args[0] in self.commands.keys():
211                 text = self.commands[args[0]].__doc__ or 'undocumented'
212                 text = inspect.cleandoc(text)
213             else:
214                 text = 'That command is not defined.'
215         if message['type'] == 'groupchat':
216             to = message['from'].bare
217         else:
218             to = message['from']
219         self.send_message(mto=to, mbody=text, mtype=message['type'])