1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2023 kaliko <kaliko@azylum.org>
3 # SPDX-License-Identifier: GPL-3.0-or-later
4 """A Real Time Block List plugin"""
6 from hashlib import sha256
7 from typing import Dict, Optional
9 from slixmpp import JID
10 from slixmpp.exceptions import XMPPError
11 from slixmpp.xmlstream import tostring
13 from .plugin import Plugin, botcmd
16 def jid_to_sha256(jid: JID) -> str:
17 """Convert Bare JID to sha256 hexdigest"""
18 return sha256(jid.bare.encode('utf-8')).hexdigest()
22 """Plain object to keep track of block list items.
23 Only used in RTBL plugin."""
24 #: Initial seed to ease testing
27 def __init__(self, initial_bl):
28 self.sha256_jids: Dict[str, Optional[str]] = dict(BL.init)
29 for item in initial_bl:
30 self.insert_item(item)
32 def check(self, jid: JID) -> bool:
33 """Check the presence of the JID in the blocklist"""
34 jidhash = jid_to_sha256(jid)
35 return jidhash in self.sha256_jids
37 def retract_item(self, item):
39 self.sha256_jids.pop(item[id], None)
41 def insert_item(self, item):
44 for i in item['payload']:
48 except AttributeError:
50 self.sha256_jids[item['id']] = text
52 def get_reason(self, jid: JID) -> Optional[str]:
53 """Check the presence of the JID in the blocklist"""
54 jidhash = jid_to_sha256(jid)
55 # Raises if item does not exist
56 return self.sha256_jids[jidhash]
59 """Implement the built-in function len(), use for boolean evaluation"""
60 return len(self.sha256_jids)
64 """Spam guard plugin for MUC.
67 pubsub_server = 'example.org'
68 #: Pubsub server node to subscribe to
69 node = 'muc_bans_sha256'
71 def __init__(self, bot):
72 Plugin.__init__(self, bot)
73 bot.register_plugin('xep_0059') # Result Set Management
74 bot.register_plugin('xep_0060') # Publish-Subscribe
76 ('session_start', self._subscribe),
77 ('pubsub_retract', self._retract),
78 ('pubsub_publish', self._publish),
79 (f'muc::{self.bot.room}::presence', self.got_presence),
80 (f'muc::{self.bot.room}::got_offline', self.got_offline),
81 (f'muc::{self.bot.room}::got_online', self.got_online)
85 self.participants = set()
86 self.moderator = False
87 self.blocklist: BL = None
93 self.bot.unregister_bot_plugin(self)
95 async def _subscribe(self, *args):
97 nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server)
98 nodes_av = [_.get('node') for _ in nodes['disco_items']]
99 self.log.debug(f'nodes available: {nodes_av}')
100 if self.node not in nodes_av:
101 self.log.error(f'{self.node} node not available on {self.pubsub_server}')
103 iq = await self.bot['xep_0060'].subscribe(self.pubsub_server, self.node)
104 subscription = iq['pubsub']['subscription']
105 self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node'])
106 except XMPPError as error:
107 self.log.error('Could not subscribe %s to node %s: %s',
108 self.bot.boundjid.full, self.node, error.format())
111 node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
112 self.blocklist = BL(node_blocklist['pubsub']['items'])
113 mess = f'Got {len(self.blocklist)} items in block list'
115 # Are current participants in the block list
116 for jid in list(self.participants):
117 await self.rtbl_ban(jid)
119 async def _create(self):
120 """Try to create node"""
122 await self.bot['xep_0060'].create_node(self.pubsub_server, self.node)
123 self.log.info('Created node %s', self.node)
124 except XMPPError as err:
125 self.log.error('Could not create node %s: %s', self.node, err.format())
126 raise XMPPError(f'Could not create node {self.node}') from err
128 def _retract(self, msg):
129 """Handler receiving a retract item event."""
130 self.log.debug('Retracted item %s from %s' % (
131 msg['pubsub_event']['items']['retract']['id'],
132 msg['pubsub_event']['items']['node']))
133 self.blocklist.retract_item(msg['pubsub_event']['items']['retract'])
135 async def _publish(self, msg):
136 """Handler receiving a publish item event."""
137 self.log.debug('Published item %s to %s:' % (
138 msg['pubsub_event']['items']['item']['id'],
139 msg['pubsub_event']['items']['node']))
140 data = msg['pubsub_event']['items']['item']['payload']
142 self.log.debug(tostring(data))
144 self.log.debug('No item content')
146 self.blocklist.insert_item(msg['pubsub_event']['items']['item'])
147 # Are current participants in the block list
148 for jid in list(self.participants):
149 await self.rtbl_ban(jid)
151 async def rtbl_ban(self, jid):
152 """Ban jid in RTBL"""
153 if not self.moderator:
155 if self.blocklist is None:
156 self.log.info('Not checking %s, block list not populated yet', jid)
158 if self.blocklist.check(jid):
159 self.log.debug(f'About to ban {jid}')
160 reason = self.blocklist.get_reason(jid)
161 if reason is not None:
162 reason = f'rtbl {reason}'
163 await self.ban(jid.bare, reason=reason)
165 self.log.info(f'{jid} banned!')
167 def got_offline(self, pres):
168 """Handler method for leaving MUC participants"""
169 fjid = pres['muc']['jid']
170 user = fjid if fjid.full else pres['muc']['nick']
172 self.participants.remove(user)
173 self.presences.pop(pres['muc']['nick'])
175 self.log.error('KeyError removing participant: "%s"', user)
176 self.log.debug(f'participants: - {user} got offline (len:{len(self.participants)})')
178 def got_presence(self, pres):
179 """Does bot have required permissions"""
180 if 110 in pres['muc']['status_codes']:
181 if pres['muc']['role'] != 'moderator':
183 'Please give the bot moderator permissions. Will only log actions.'
185 self.moderator = False
188 self.log.info('Got moderator permissions.')
189 self.moderator = True
190 nick = pres['muc']['nick']
191 fjid = pres['muc']['jid']
192 role = pres['muc']['role']
193 affi = pres['muc']['affiliation']
194 user = fjid if fjid.full else nick
195 self.log.debug(f'participants: u {user}:{role}/{affi} (len:{len(self.participants)})')
196 self.presences.update({nick: pres})
198 async def got_online(self, pres):
199 """Handler method for new MUC participants"""
200 fjid = pres['muc']['jid']
201 nick = pres['muc']['nick']
202 role = pres['muc']['role']
203 affi = pres['muc']['affiliation']
204 user = fjid if fjid.full else nick
205 self.participants.add(user)
206 self.presences.update({nick: pres})
207 self.log.debug(f'participants: + {user}:{role}/{affi} (len:{len(self.participants)})')
208 await self.rtbl_ban(user)
210 @botcmd(name="rtbl-info")
211 def rtbl_info(self, rcv, _):
213 if self.blocklist is None:
214 msg = 'Block list not populated yet'
215 self.log.warning(msg)
218 msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}'
220 msg+=f' (hits {self.hits})'
221 if not self.moderator:
222 msg+='\nBot has no moderator permissions!'
226 if __name__ == '__main__':
227 from .cli.rtbl import main
231 # vim: ai ts=4 sw=4 sts=4 expandtab