X-Git-Url: https://git.kaliko.me/?a=blobdiff_plain;f=sid%2Frtbl.py;h=1539b28a1273be86420c38b9df6d99c4dfc0f8c9;hb=b32d7a8cb7b1bd5cd16e461d14ed94891b93014f;hp=d701aa52be4c31bd90794972c6abf1899d7b698a;hpb=e1ff749af5e6fa7f31308d685f3a67b487f8f170;p=sid.git diff --git a/sid/rtbl.py b/sid/rtbl.py index d701aa5..1539b28 100644 --- a/sid/rtbl.py +++ b/sid/rtbl.py @@ -1,40 +1,105 @@ # -*- coding: utf-8 -*- -# SPDX-FileCopyrightText: 2014, 2020, 2023 kaliko -# SPDX-FileCopyrightText: 2007-2012 Thomas Perl +# SPDX-FileCopyrightText: 2023 kaliko # SPDX-License-Identifier: GPL-3.0-or-later +"""A Real Time Block List plugin, cf https://xmppbl.org. + +>>> from sid.rtbl import RTBL, BL +>>> RTBL.pubsub_server = 'xmppbl.org' +>>> # Optional: Node to subcribe, defaults to 'muc_bans_sha256' +>>> RTBL.node = 'muc_bans_sha256' +>>> # Optional: Add this JID hash to the list, default is empty, usefull for test +>>> BL.init = {'1312b8ca593cd074f39ef15cc8442cdf426b21480958836d9ab678ca45ed1312': 'Test!'} +>>> # Optional: Set plugin log level, default inherit from the bot +>>> RTBL.log_level = logging.DEBUG +""" from hashlib import sha256 +from typing import Dict, Optional +from slixmpp import JID from slixmpp.exceptions import XMPPError from slixmpp.xmlstream import tostring -from .plugin import Plugin +from .plugin import Plugin, botcmd + + +def jid_to_sha256(jid: JID) -> str: + """Convert Bare JID to sha256 hexdigest""" + return sha256(jid.bare.encode('utf-8')).hexdigest() + + +class BL: + """Plain object to keep track of block list items. + Only used in RTBL plugin.""" + #: Initial seed to ease testing + init = {} + + def __init__(self, initial_bl): + self.sha256_jids: Dict[str, Optional[str]] = dict(BL.init) + for item in initial_bl: + self.insert_item(item) + + def check(self, jid: JID) -> bool: + """Check the presence of the JID in the blocklist""" + jidhash = jid_to_sha256(jid) + return jidhash in self.sha256_jids + + def retract_item(self, item): + """remove bl item""" + self.sha256_jids.pop(item[id], None) + + def insert_item(self, item): + """insert bl item""" + text = None + for i in item['payload']: + try: + text = i.text + break + except AttributeError: + continue + self.sha256_jids[item['id']] = text + + def get_reason(self, jid: JID) -> Optional[str]: + """Check the presence of the JID in the blocklist""" + jidhash = jid_to_sha256(jid) + return self.sha256_jids[jidhash] + + def __len__(self): + """Implement the built-in function len(), use for boolean evaluation""" + return len(self.sha256_jids) class RTBL(Plugin): - """Spam guard for MUC + """Spam guard plugin for MUC. """ + #: Pubsub server pubsub_server = 'example.org' + #: Pubsub server node to subscribe to node = 'muc_bans_sha256' def __init__(self, bot): Plugin.__init__(self, bot) - bot.register_plugin('xep_0059') # Mediated Information eXchange (MIX) + bot.register_plugin('xep_0059') # Result Set Management bot.register_plugin('xep_0060') # Publish-Subscribe self.handlers = [ ('session_start', self._subscribe), ('pubsub_retract', self._retract), - ('pubsub_publish', self._publish) + ('pubsub_publish', self._publish), + (f'muc::{self.bot.room}::presence', self.got_presence), + (f'muc::{self.bot.room}::got_online', self.got_online) ] self.add_handlers() - self.blocklist = None self.bot = bot + self.moderator = False + self.blocklist: BL = None + self.hits = 0 + self.presences = bot.muc_presences def _exit(self): self.rm_handlers() self.bot.unregister_bot_plugin(self) - async def _subscribe(self, *args, **kwargs): + async def _subscribe(self, *args): try: nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server) nodes_av = [_.get('node') for _ in nodes['disco_items']] @@ -47,14 +112,16 @@ class RTBL(Plugin): self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node']) except XMPPError as error: self.log.error('Could not subscribe %s to node %s: %s', - self.bot.boundjid.bare, self.node, error.format()) + self.bot.boundjid.full, self.node, error.format()) self._exit() return - self.blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node) - message = f'Got {len(self.blocklist["pubsub"]["items"])} items in block list' - self.log.info(message) - # Add got_online handler once the blocklist is set - self.bot.add_event_handler(f'muc::{self.bot.room}::got_online', self.got_online) + node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node) + self.blocklist = BL(node_blocklist['pubsub']['items']) + mess = f'Got {len(self.blocklist)} items in block list' + self.log.info(mess) + # Are current participants in the block list + for jid in [pres['muc']['jid'] for pres in self.presences.values()]: + await self.rtbl_ban(jid) async def _create(self): """Try to create node""" @@ -70,6 +137,7 @@ class RTBL(Plugin): self.log.debug('Retracted item %s from %s' % ( msg['pubsub_event']['items']['retract']['id'], msg['pubsub_event']['items']['node'])) + self.blocklist.retract_item(msg['pubsub_event']['items']['retract']) async def _publish(self, msg): """Handler receiving a publish item event.""" @@ -81,15 +149,66 @@ class RTBL(Plugin): self.log.debug(tostring(data)) else: self.log.debug('No item content') - self.blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node) + return + self.blocklist.insert_item(msg['pubsub_event']['items']['item']) + # Are current participants in the block list + for jid in [pres['muc']['jid'] for pres in self.presences.values()]: + await self.rtbl_ban(jid) + + async def rtbl_ban(self, jid: JID): + """Ban jid in RTBL""" + if not self.moderator or not jid.bare: + return + if self.blocklist is None: + self.log.info('Not checking %s, block list not populated yet', jid) + return + if self.blocklist.check(jid): + self.log.debug(f'About to ban {jid}') + reason = self.blocklist.get_reason(jid) + if reason is not None: + reason = f'rtbl {reason}' + await self.ban(jid.bare, reason=reason) + self.hits += 1 + self.log.info(f'{jid} banned!') + + def got_presence(self, pres): + """Does bot have required permissions""" + if 110 in pres['muc']['status_codes']: + if pres['muc']['role'] != 'moderator': + self.log.error( + 'Please give the bot moderator permissions. Will only log actions.' + ) + self.moderator = False + return + else: + self.log.info('Got moderator permissions.') + self.moderator = True + #TODO: purge presences cache sid.MUCBot.muc_presences? async def got_online(self, pres): """Handler method for new MUC participants""" - bjid = pres['muc']['jid'].bare - bjid_hash = sha256(bjid.encode('utf-8')).hexdigest() - if bjid_hash in [_['id'] for _ in self.blocklist['pubsub']['items']]: - self.log.debug(f'About to ban {bjid}') - await self.ban(bjid, reason='rtbl ') + fjid = pres['muc']['jid'] + await self.rtbl_ban(fjid) + + @botcmd(name="rtbl-info") + def rtbl_info(self, rcv, _): + """Show RTBL info""" + if self.blocklist is None: + msg = 'Block list not populated yet' + self.log.warning(msg) + self.reply(rcv, msg) + return + msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}' + if self.hits > 0: + msg+=f' (hits {self.hits})' + if not self.moderator: + msg+='\nBot has no moderator permissions!' + self.reply(rcv, msg) + + +if __name__ == '__main__': + from .cli.rtbl import main + main() # VIM MODLINE # vim: ai ts=4 sw=4 sts=4 expandtab