# SPDX-FileCopyrightText: 2014, 2020, 2023 kaliko <kaliko@azylum.org>
# SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
# SPDX-License-Identifier: GPL-3.0-or-later
+"""A Real Time Block List plugin"""
from hashlib import sha256
+from typing import Dict, Optional
+from slixmpp import JID
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream import tostring
from .plugin import Plugin
+def jid_to_sha256(jid: JID) -> str:
+ """Convert Bare JID to sha256 hexdigest"""
+ return sha256(jid.bare.encode('utf-8')).hexdigest()
+
+
+class BL:
+ """Plain object to keep track of block list items"""
+
+ def __init__(self, initial_bl):
+ self.sha256_jids: Dict[str, Optional[str]] = {}
+ for item in initial_bl:
+ self.insert_item(item)
+
+ def check(self, jid: JID) -> bool:
+ """Check the presence of the JID in the blocklist"""
+ jidhash = jid_to_sha256(jid)
+ return jidhash in self.sha256_jids
+
+ def retract_item(self, item):
+ """remove bl item"""
+ self.sha256_jids.pop(item[id], None)
+
+ def insert_item(self, item):
+ """insert bl item"""
+ text = None
+ for i in item['payload']:
+ try:
+ text = i.text
+ break
+ except AttributeError:
+ continue
+ self.sha256_jids[item['id']] = text
+
+ def get_reason(self, jid: JID) -> Optional[str]:
+ """Check the presence of the JID in the blocklist"""
+ jidhash = jid_to_sha256(jid)
+ # Raises if item does not exist
+ return self.sha256_jids[jidhash]
+
+ def __len__(self):
+ """Implement the built-in function len(), use for boolean evaluation"""
+ return len(self.sha256_jids)
+
+
class RTBL(Plugin):
"""Spam guard for MUC
"""
def __init__(self, bot):
Plugin.__init__(self, bot)
- bot.register_plugin('xep_0059') # Mediated Information eXchange (MIX)
+ bot.register_plugin('xep_0059') # Result Set Management
bot.register_plugin('xep_0060') # Publish-Subscribe
self.handlers = [
('session_start', self._subscribe),
('pubsub_retract', self._retract),
- ('pubsub_publish', self._publish)
+ ('pubsub_publish', self._publish),
+ (f'muc::{self.bot.room}::presence', self.got_presence),
+ (f'muc::{self.bot.room}::got_offline', self.got_offline),
+ (f'muc::{self.bot.room}::got_online', self.got_online)
]
self.add_handlers()
- self.blocklist = None
self.bot = bot
+ self.participants = set()
+ self.moderator = False
+ self.blocklist: BL = None
def _exit(self):
self.rm_handlers()
self.bot.unregister_bot_plugin(self)
- async def _subscribe(self, *args, **kwargs):
+ async def _subscribe(self, *args):
try:
nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server)
nodes_av = [_.get('node') for _ in nodes['disco_items']]
self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node'])
except XMPPError as error:
self.log.error('Could not subscribe %s to node %s: %s',
- self.bot.boundjid.bare, self.node, error.format())
+ self.bot.boundjid.full, self.node, error.format())
self._exit()
return
- self.blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
- message = f'Got {len(self.blocklist["pubsub"]["items"])} items in block list'
- self.log.info(message)
- # Add got_online handler once the blocklist is set
- self.bot.add_event_handler(f'muc::{self.bot.room}::got_online', self.got_online)
+ node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
+ self.blocklist = BL(node_blocklist['pubsub']['items'])
+ mess = f'Got {len(self.blocklist)} items in block list'
+ self.log.info(mess)
async def _create(self):
"""Try to create node"""
self.log.debug('Retracted item %s from %s' % (
msg['pubsub_event']['items']['retract']['id'],
msg['pubsub_event']['items']['node']))
+ self.blocklist.retract_item(msg['pubsub_event']['items']['retract']['id'])
async def _publish(self, msg):
"""Handler receiving a publish item event."""
self.log.debug(tostring(data))
else:
self.log.debug('No item content')
- self.blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
+ return
+ self.blocklist.insert_item(msg['pubsub_event']['items']['item']['id'])
+ # Are current participants in the block list
+ for jid in self.participants:
+ self.rtbl_ban(jid)
+
+ async def rtbl_ban(self, jid):
+ """Ban jid in RTBL"""
+ if not self.moderator:
+ return
+ if not self.blocklist:
+ self.log.debug("block list not populated yet")
+ return
+ if self.blocklist.check(jid):
+ self.log.debug(f'About to ban {jid}')
+ reason = self.blocklist.get_reason(jid)
+ if reason is not None:
+ reason = f'rtbl {reason}'
+ await self.ban(jid.bare, reason=reason)
+
+ def got_offline(self, pres):
+ """Handler method for laving MUC participants"""
+ fjid = pres['muc']['jid']
+ user = fjid if fjid.full else pres['muc']['nick']
+ try:
+ self.participants.remove(user)
+ except KeyError:
+ self.log.error('KeyError removing participant: "%s"', user)
+ self.log.debug(f'participants: -{user} (len:{len(self.participants)})')
+
+ def got_presence(self, pres):
+ """Does bot have required permissions"""
+ if 110 in pres['muc']['status_codes']:
+ if pres['muc']['role'] != 'moderator':
+ self.log.error(
+ 'Please give the bot moderator permissions. Will only log actions.'
+ )
+ self.moderator = False
+ return
+ else:
+ self.log.info('Got moderator permissions.')
+ self.moderator = True
async def got_online(self, pres):
"""Handler method for new MUC participants"""
- bjid = pres['muc']['jid'].bare
- bjid_hash = sha256(bjid.encode('utf-8')).hexdigest()
- if bjid_hash in [_['id'] for _ in self.blocklist['pubsub']['items']]:
- self.log.debug(f'About to ban {bjid}')
- await self.ban(bjid, reason='rtbl <timestamp>')
+ fjid = pres['muc']['jid']
+ user = fjid if fjid.full else pres['muc']['nick']
+ self.participants.add(user)
+ self.log.debug(f'participants: +{user} (len:{len(self.participants)})')
+ await self.rtbl_ban(user)
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab