]> kaliko git repositories - sid.git/blob - sid/rtbl.py
99095eacf679244a2b221afe8f2905e3fe916206
[sid.git] / sid / rtbl.py
1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2023 kaliko <kaliko@azylum.org>
3 # SPDX-License-Identifier: GPL-3.0-or-later
4 """A Real Time Block List plugin"""
5
6 from hashlib import sha256
7 from typing import Dict, Optional
8
9 from slixmpp import JID
10 from slixmpp.exceptions import XMPPError
11 from slixmpp.xmlstream import tostring
12
13 from .plugin import Plugin, botcmd
14
15
16 def jid_to_sha256(jid: JID) -> str:
17     """Convert Bare JID to sha256 hexdigest"""
18     return sha256(jid.bare.encode('utf-8')).hexdigest()
19
20
21 class BL:
22     """Plain object to keep track of block list items.
23     Only used in RTBL plugin."""
24     #: Initial seed to ease testing
25     init = {}
26
27     def __init__(self, initial_bl):
28         self.sha256_jids: Dict[str, Optional[str]] = dict(BL.init)
29         for item in initial_bl:
30             self.insert_item(item)
31
32     def check(self, jid: JID) -> bool:
33         """Check the presence of the JID in the blocklist"""
34         jidhash = jid_to_sha256(jid)
35         return jidhash in self.sha256_jids
36
37     def retract_item(self, item):
38         """remove bl item"""
39         self.sha256_jids.pop(item[id], None)
40
41     def insert_item(self, item):
42         """insert bl item"""
43         text = None
44         for i in item['payload']:
45             try:
46                 text = i.text
47                 break
48             except AttributeError:
49                 continue
50         self.sha256_jids[item['id']] = text
51
52     def get_reason(self, jid: JID) -> Optional[str]:
53         """Check the presence of the JID in the blocklist"""
54         jidhash = jid_to_sha256(jid)
55         # Raises if item does not exist
56         return self.sha256_jids[jidhash]
57
58     def __len__(self):
59         """Implement the built-in function len(), use for boolean evaluation"""
60         return len(self.sha256_jids)
61
62
63 class RTBL(Plugin):
64     """Spam guard plugin for MUC.
65     """
66     #: Pubsub server
67     pubsub_server = 'example.org'
68     #: Pubsub server node to subscribe to
69     node = 'muc_bans_sha256'
70
71     def __init__(self, bot):
72         Plugin.__init__(self, bot)
73         bot.register_plugin('xep_0059')  # Result Set Management
74         bot.register_plugin('xep_0060')  # Publish-Subscribe
75         self.handlers = [
76                 ('session_start', self._subscribe),
77                 ('pubsub_retract', self._retract),
78                 ('pubsub_publish', self._publish),
79                 (f'muc::{self.bot.room}::presence', self.got_presence),
80                 (f'muc::{self.bot.room}::got_offline', self.got_offline),
81                 (f'muc::{self.bot.room}::got_online', self.got_online)
82         ]
83         self.add_handlers()
84         self.bot = bot
85         self.participants = set()
86         self.moderator = False
87         self.blocklist: BL = None
88         self.hits = 0
89         self.presences = {}
90
91     def _exit(self):
92         self.rm_handlers()
93         self.bot.unregister_bot_plugin(self)
94
95     async def _subscribe(self, *args):
96         try:
97             nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server)
98             nodes_av = [_.get('node') for _ in nodes['disco_items']]
99             self.log.debug(f'nodes available: {nodes_av}')
100             if self.node not in nodes_av:
101                 self.log.error(f'{self.node} node not available on {self.pubsub_server}')
102                 await self._create()
103             iq = await self.bot['xep_0060'].subscribe(self.pubsub_server, self.node)
104             subscription = iq['pubsub']['subscription']
105             self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node'])
106         except XMPPError as error:
107             self.log.error('Could not subscribe %s to node %s: %s',
108                            self.bot.boundjid.full, self.node, error.format())
109             self._exit()
110             return
111         node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
112         self.blocklist = BL(node_blocklist['pubsub']['items'])
113         mess = f'Got {len(self.blocklist)} items in block list'
114         self.log.info(mess)
115         # Are current participants in the block list
116         for jid in list(self.participants):
117             await self.rtbl_ban(jid)
118
119     async def _create(self):
120         """Try to create node"""
121         try:
122             await self.bot['xep_0060'].create_node(self.pubsub_server, self.node)
123             self.log.info('Created node %s', self.node)
124         except XMPPError as err:
125             self.log.error('Could not create node %s: %s', self.node, err.format())
126             raise XMPPError(f'Could not create node {self.node}') from err
127
128     def _retract(self, msg):
129         """Handler receiving a retract item event."""
130         self.log.debug('Retracted item %s from %s' % (
131             msg['pubsub_event']['items']['retract']['id'],
132             msg['pubsub_event']['items']['node']))
133         self.blocklist.retract_item(msg['pubsub_event']['items']['retract'])
134
135     async def _publish(self, msg):
136         """Handler receiving a publish item event."""
137         self.log.debug('Published item %s to %s:' % (
138               msg['pubsub_event']['items']['item']['id'],
139               msg['pubsub_event']['items']['node']))
140         data = msg['pubsub_event']['items']['item']['payload']
141         if data is not None:
142             self.log.debug(tostring(data))
143         else:
144             self.log.debug('No item content')
145             return
146         self.blocklist.insert_item(msg['pubsub_event']['items']['item'])
147         # Are current participants in the block list
148         for jid in list(self.participants):
149             await self.rtbl_ban(jid)
150
151     async def rtbl_ban(self, jid):
152         """Ban jid in RTBL"""
153         if not self.moderator:
154             return
155         if self.blocklist is None:
156             self.log.info('Not checking %s, block list not populated yet', jid)
157             return
158         if self.blocklist.check(jid):
159             self.log.debug(f'About to ban {jid}')
160             reason = self.blocklist.get_reason(jid)
161             if reason is not None:
162                 reason = f'rtbl {reason}'
163             await self.ban(jid.bare, reason=reason)
164             self.hits += 1
165             self.log.info(f'{jid} banned!')
166
167     def got_offline(self, pres):
168         """Handler method for leaving MUC participants"""
169         fjid = pres['muc']['jid']
170         user = fjid if fjid.full else pres['muc']['nick']
171         try:
172             self.participants.remove(user)
173             self.presences.pop(pres['muc']['nick'])
174         except KeyError:
175             self.log.error('KeyError removing participant: "%s"', user)
176         self.log.debug(f'participants: - {user} got offline (len:{len(self.participants)})')
177
178     def got_presence(self, pres):
179         """Does bot have required permissions"""
180         if 110 in pres['muc']['status_codes']:
181             if pres['muc']['role'] != 'moderator':
182                 self.log.error(
183                     'Please give the bot moderator permissions. Will only log actions.'
184                 )
185                 self.moderator = False
186                 return
187             else:
188                 self.log.info('Got moderator permissions.')
189                 self.moderator = True
190         nick = pres['muc']['nick']
191         fjid = pres['muc']['jid']
192         role = pres['muc']['role']
193         affi = pres['muc']['affiliation']
194         user = fjid if fjid.full else nick
195         self.log.debug(f'participants: u {user}:{role}/{affi} (len:{len(self.participants)})')
196         self.presences.update({nick: pres})
197
198     async def got_online(self, pres):
199         """Handler method for new MUC participants"""
200         fjid = pres['muc']['jid']
201         nick = pres['muc']['nick']
202         role = pres['muc']['role']
203         affi = pres['muc']['affiliation']
204         user = fjid if fjid.full else nick
205         self.participants.add(user)
206         self.presences.update({nick: pres})
207         self.log.debug(f'participants: + {user}:{role}/{affi} (len:{len(self.participants)})')
208         await self.rtbl_ban(user)
209
210     @botcmd(name="rtbl-info")
211     def rtbl_info(self, rcv, _):
212         """Show RTBL info"""
213         if self.blocklist is None:
214             msg = 'Block list not populated yet'
215             self.log.warning(msg)
216             self.reply(rcv, msg)
217             return
218         msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}'
219         if self.hits > 0:
220             msg+=f' (hits {self.hits})'
221         if not self.moderator:
222             msg+='\nBot has no moderator permissions!'
223         self.reply(rcv, msg)
224
225
226 if __name__ == '__main__':
227     from .cli.rtbl import main
228     main()
229
230 # VIM MODLINE
231 # vim: ai ts=4 sw=4 sts=4 expandtab