1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2023 kaliko <kaliko@azylum.org>
3 # SPDX-License-Identifier: GPL-3.0-or-later
4 """A Real Time Block List plugin, cf https://xmppbl.org.
6 >>> from sid.rtbl import RTBL, BL
7 >>> RTBL.pubsub_server = 'xmppbl.org'
8 >>> # Optional: Node to subcribe, defaults to 'muc_bans_sha256'
9 >>> RTBL.node = 'muc_bans_sha256'
10 >>> # Optional: Add this JID hash to the list, default is empty, usefull for test
11 >>> BL.init = {'1312b8ca593cd074f39ef15cc8442cdf426b21480958836d9ab678ca45ed1312': 'Test!'}
12 >>> # Optional: Set plugin log level, default inherit from the bot
13 >>> RTBL.log_level = logging.DEBUG
16 from hashlib import sha256
17 from typing import Dict, Optional
19 from slixmpp import JID
20 from slixmpp.exceptions import XMPPError
21 from slixmpp.xmlstream import tostring
23 from .plugin import Plugin, botcmd
26 def jid_to_sha256(jid: JID) -> str:
27 """Convert Bare JID to sha256 hexdigest"""
28 return sha256(jid.bare.encode('utf-8')).hexdigest()
32 """Plain object to keep track of block list items.
33 Only used in RTBL plugin."""
34 #: Initial seed to ease testing
37 def __init__(self, initial_bl):
38 self.sha256_jids: Dict[str, Optional[str]] = dict(BL.init)
39 for item in initial_bl:
40 self.insert_item(item)
42 def check(self, jid: JID) -> bool:
43 """Check the presence of the JID in the blocklist"""
44 jidhash = jid_to_sha256(jid)
45 return jidhash in self.sha256_jids
47 def retract_item(self, item):
49 self.sha256_jids.pop(item[id], None)
51 def insert_item(self, item):
54 for i in item['payload']:
58 except AttributeError:
60 self.sha256_jids[item['id']] = text
62 def get_reason(self, jid: JID) -> Optional[str]:
63 """Check the presence of the JID in the blocklist"""
64 jidhash = jid_to_sha256(jid)
65 return self.sha256_jids[jidhash]
68 """Implement the built-in function len(), use for boolean evaluation"""
69 return len(self.sha256_jids)
73 """Spam guard plugin for MUC.
76 pubsub_server = 'example.org'
77 #: Pubsub server node to subscribe to
78 node = 'muc_bans_sha256'
80 def __init__(self, bot):
81 Plugin.__init__(self, bot)
82 bot.register_plugin('xep_0059') # Result Set Management
83 bot.register_plugin('xep_0060') # Publish-Subscribe
85 ('session_start', self._subscribe),
86 ('pubsub_retract', self._retract),
87 ('pubsub_publish', self._publish),
88 (f'muc::{self.bot.room}::presence', self.got_presence),
89 (f'muc::{self.bot.room}::got_online', self.got_online)
93 self.moderator = False
94 self.blocklist: BL = None
96 self.presences = bot.muc_presences
100 self.bot.unregister_bot_plugin(self)
102 async def _subscribe(self, *args):
104 nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server)
105 nodes_av = [_.get('node') for _ in nodes['disco_items']]
106 self.log.debug(f'nodes available: {nodes_av}')
107 if self.node not in nodes_av:
108 self.log.error(f'{self.node} node not available on {self.pubsub_server}')
110 iq = await self.bot['xep_0060'].subscribe(self.pubsub_server, self.node)
111 subscription = iq['pubsub']['subscription']
112 self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node'])
113 except XMPPError as error:
114 self.log.error('Could not subscribe %s to node %s: %s',
115 self.bot.boundjid.full, self.node, error.format())
118 node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
119 self.blocklist = BL(node_blocklist['pubsub']['items'])
120 mess = f'Got {len(self.blocklist)} items in block list'
122 # Are current participants in the block list
123 for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
124 await self.rtbl_ban(jid)
126 async def _create(self):
127 """Try to create node"""
129 await self.bot['xep_0060'].create_node(self.pubsub_server, self.node)
130 self.log.info('Created node %s', self.node)
131 except XMPPError as err:
132 self.log.error('Could not create node %s: %s', self.node, err.format())
133 raise XMPPError(f'Could not create node {self.node}') from err
135 def _retract(self, msg):
136 """Handler receiving a retract item event."""
137 self.log.debug('Retracted item %s from %s' % (
138 msg['pubsub_event']['items']['retract']['id'],
139 msg['pubsub_event']['items']['node']))
140 self.blocklist.retract_item(msg['pubsub_event']['items']['retract'])
142 async def _publish(self, msg):
143 """Handler receiving a publish item event."""
144 self.log.debug('Published item %s to %s:' % (
145 msg['pubsub_event']['items']['item']['id'],
146 msg['pubsub_event']['items']['node']))
147 data = msg['pubsub_event']['items']['item']['payload']
149 self.log.debug(tostring(data))
151 self.log.debug('No item content')
153 self.blocklist.insert_item(msg['pubsub_event']['items']['item'])
154 # Are current participants in the block list
155 for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
156 await self.rtbl_ban(jid)
158 async def rtbl_ban(self, jid: JID):
159 """Ban jid in RTBL"""
160 if not self.moderator or not jid.bare:
162 if self.blocklist is None:
163 self.log.info('Not checking %s, block list not populated yet', jid)
165 if self.blocklist.check(jid):
166 self.log.debug(f'About to ban {jid}')
167 reason = self.blocklist.get_reason(jid)
168 if reason is not None:
169 reason = f'rtbl {reason}'
170 await self.ban(jid.bare, reason=reason)
172 self.log.info(f'{jid} banned!')
174 def got_presence(self, pres):
175 """Does bot have required permissions"""
176 if 110 in pres['muc']['status_codes']:
177 if pres['muc']['role'] != 'moderator':
179 'Please give the bot moderator permissions. Will only log actions.'
181 self.moderator = False
184 self.log.info('Got moderator permissions.')
185 self.moderator = True
186 #TODO: purge presences cache sid.MUCBot.muc_presences?
188 async def got_online(self, pres):
189 """Handler method for new MUC participants"""
190 fjid = pres['muc']['jid']
191 await self.rtbl_ban(fjid)
193 @botcmd(name="rtbl-info")
194 def rtbl_info(self, rcv, _):
196 if self.blocklist is None:
197 msg = 'Block list not populated yet'
198 self.log.warning(msg)
201 msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}'
203 msg+=f' (hits {self.hits})'
204 if not self.moderator:
205 msg+='\nBot has no moderator permissions!'
209 if __name__ == '__main__':
210 from .cli.rtbl import main
214 # vim: ai ts=4 sw=4 sts=4 expandtab