]> kaliko git repositories - sid.git/blob - sid/rtbl.py
doc: Improve and update
[sid.git] / sid / rtbl.py
1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2023 kaliko <kaliko@azylum.org>
3 # SPDX-License-Identifier: GPL-3.0-or-later
4 """A Real Time Block List plugin, cf https://xmppbl.org.
5
6 >>> from sid.rtbl import RTBL, BL
7 >>> RTBL.pubsub_server = 'xmppbl.org'
8 >>> # Optional: Node to subcribe, defaults to 'muc_bans_sha256'
9 >>> RTBL.node = 'muc_bans_sha256'
10 >>> # Optional: Add this JID hash to the list, default is empty, usefull for test
11 >>> BL.init = {'1312b8ca593cd074f39ef15cc8442cdf426b21480958836d9ab678ca45ed1312': 'Test!'}
12 >>> # Optional: Set plugin log level, default inherit from the bot
13 >>> RTBL.log_level = logging.DEBUG
14 """
15
16 from hashlib import sha256
17 from typing import Dict, Optional
18
19 from slixmpp import JID
20 from slixmpp.exceptions import XMPPError
21 from slixmpp.xmlstream import tostring
22
23 from .plugin import Plugin, botcmd
24
25
26 def jid_to_sha256(jid: JID) -> str:
27     """Convert Bare JID to sha256 hexdigest"""
28     return sha256(jid.bare.encode('utf-8')).hexdigest()
29
30
31 class BL:
32     """Plain object to keep track of block list items.
33     Only used in RTBL plugin."""
34     #: Initial seed to ease testing
35     init = {}
36
37     def __init__(self, initial_bl):
38         self.sha256_jids: Dict[str, Optional[str]] = dict(BL.init)
39         for item in initial_bl:
40             self.insert_item(item)
41
42     def check(self, jid: JID) -> bool:
43         """Check the presence of the JID in the blocklist"""
44         jidhash = jid_to_sha256(jid)
45         return jidhash in self.sha256_jids
46
47     def retract_item(self, item):
48         """remove bl item"""
49         self.sha256_jids.pop(item[id], None)
50
51     def insert_item(self, item):
52         """insert bl item"""
53         text = None
54         for i in item['payload']:
55             try:
56                 text = i.text
57                 break
58             except AttributeError:
59                 continue
60         self.sha256_jids[item['id']] = text
61
62     def get_reason(self, jid: JID) -> Optional[str]:
63         """Check the presence of the JID in the blocklist"""
64         jidhash = jid_to_sha256(jid)
65         return self.sha256_jids[jidhash]
66
67     def __len__(self):
68         """Implement the built-in function len(), use for boolean evaluation"""
69         return len(self.sha256_jids)
70
71
72 class RTBL(Plugin):
73     """Spam guard plugin for MUC.
74     """
75     #: Pubsub server
76     pubsub_server = 'example.org'
77     #: Pubsub server node to subscribe to
78     node = 'muc_bans_sha256'
79
80     def __init__(self, bot):
81         Plugin.__init__(self, bot)
82         bot.register_plugin('xep_0059')  # Result Set Management
83         bot.register_plugin('xep_0060')  # Publish-Subscribe
84         self.handlers = [
85                 ('session_start', self._subscribe),
86                 ('pubsub_retract', self._retract),
87                 ('pubsub_publish', self._publish),
88                 (f'muc::{self.bot.room}::presence', self.got_presence),
89                 (f'muc::{self.bot.room}::got_online', self.got_online)
90         ]
91         self.add_handlers()
92         self.bot = bot
93         self.moderator = False
94         self.blocklist: BL = None
95         self.hits = 0
96         self.presences = bot.muc_presences
97
98     def _exit(self):
99         self.rm_handlers()
100         self.bot.unregister_bot_plugin(self)
101
102     async def _subscribe(self, *args):
103         try:
104             nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server)
105             nodes_av = [_.get('node') for _ in nodes['disco_items']]
106             self.log.debug(f'nodes available: {nodes_av}')
107             if self.node not in nodes_av:
108                 self.log.error(f'{self.node} node not available on {self.pubsub_server}')
109                 await self._create()
110             iq = await self.bot['xep_0060'].subscribe(self.pubsub_server, self.node)
111             subscription = iq['pubsub']['subscription']
112             self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node'])
113         except XMPPError as error:
114             self.log.error('Could not subscribe %s to node %s: %s',
115                            self.bot.boundjid.full, self.node, error.format())
116             self._exit()
117             return
118         node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
119         self.blocklist = BL(node_blocklist['pubsub']['items'])
120         mess = f'Got {len(self.blocklist)} items in block list'
121         self.log.info(mess)
122         # Are current participants in the block list
123         for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
124             await self.rtbl_ban(jid)
125
126     async def _create(self):
127         """Try to create node"""
128         try:
129             await self.bot['xep_0060'].create_node(self.pubsub_server, self.node)
130             self.log.info('Created node %s', self.node)
131         except XMPPError as err:
132             self.log.error('Could not create node %s: %s', self.node, err.format())
133             raise XMPPError(f'Could not create node {self.node}') from err
134
135     def _retract(self, msg):
136         """Handler receiving a retract item event."""
137         self.log.debug('Retracted item %s from %s' % (
138             msg['pubsub_event']['items']['retract']['id'],
139             msg['pubsub_event']['items']['node']))
140         self.blocklist.retract_item(msg['pubsub_event']['items']['retract'])
141
142     async def _publish(self, msg):
143         """Handler receiving a publish item event."""
144         self.log.debug('Published item %s to %s:' % (
145               msg['pubsub_event']['items']['item']['id'],
146               msg['pubsub_event']['items']['node']))
147         data = msg['pubsub_event']['items']['item']['payload']
148         if data is not None:
149             self.log.debug(tostring(data))
150         else:
151             self.log.debug('No item content')
152             return
153         self.blocklist.insert_item(msg['pubsub_event']['items']['item'])
154         # Are current participants in the block list
155         for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
156             await self.rtbl_ban(jid)
157
158     async def rtbl_ban(self, jid: JID):
159         """Ban jid in RTBL"""
160         if not self.moderator or not jid.bare:
161             return
162         if self.blocklist is None:
163             self.log.info('Not checking %s, block list not populated yet', jid)
164             return
165         if self.blocklist.check(jid):
166             self.log.debug(f'About to ban {jid}')
167             reason = self.blocklist.get_reason(jid)
168             if reason is not None:
169                 reason = f'rtbl {reason}'
170             await self.ban(jid.bare, reason=reason)
171             self.hits += 1
172             self.log.info(f'{jid} banned!')
173
174     def got_presence(self, pres):
175         """Does bot have required permissions"""
176         if 110 in pres['muc']['status_codes']:
177             if pres['muc']['role'] != 'moderator':
178                 self.log.error(
179                     'Please give the bot moderator permissions. Will only log actions.'
180                 )
181                 self.moderator = False
182                 return
183             else:
184                 self.log.info('Got moderator permissions.')
185                 self.moderator = True
186                 #TODO: purge presences cache sid.MUCBot.muc_presences?
187
188     async def got_online(self, pres):
189         """Handler method for new MUC participants"""
190         fjid = pres['muc']['jid']
191         await self.rtbl_ban(fjid)
192
193     @botcmd(name="rtbl-info")
194     def rtbl_info(self, rcv, _):
195         """Show RTBL info"""
196         if self.blocklist is None:
197             msg = 'Block list not populated yet'
198             self.log.warning(msg)
199             self.reply(rcv, msg)
200             return
201         msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}'
202         if self.hits > 0:
203             msg+=f' (hits {self.hits})'
204         if not self.moderator:
205             msg+='\nBot has no moderator permissions!'
206         self.reply(rcv, msg)
207
208
209 if __name__ == '__main__':
210     from .cli.rtbl import main
211     main()
212
213 # VIM MODLINE
214 # vim: ai ts=4 sw=4 sts=4 expandtab