X-Git-Url: https://git.kaliko.me/?a=blobdiff_plain;f=sid%2Frtbl.py;h=1539b28a1273be86420c38b9df6d99c4dfc0f8c9;hb=b32d7a8cb7b1bd5cd16e461d14ed94891b93014f;hp=0ea3bbbf36a8be446305595d36eeac78501e5795;hpb=da0f73b32be59d4ee9284aebeb8e0dba7b6f6f47;p=sid.git diff --git a/sid/rtbl.py b/sid/rtbl.py index 0ea3bbb..1539b28 100644 --- a/sid/rtbl.py +++ b/sid/rtbl.py @@ -1,7 +1,17 @@ # -*- coding: utf-8 -*- # SPDX-FileCopyrightText: 2023 kaliko # SPDX-License-Identifier: GPL-3.0-or-later -"""A Real Time Block List plugin""" +"""A Real Time Block List plugin, cf https://xmppbl.org. + +>>> from sid.rtbl import RTBL, BL +>>> RTBL.pubsub_server = 'xmppbl.org' +>>> # Optional: Node to subcribe, defaults to 'muc_bans_sha256' +>>> RTBL.node = 'muc_bans_sha256' +>>> # Optional: Add this JID hash to the list, default is empty, usefull for test +>>> BL.init = {'1312b8ca593cd074f39ef15cc8442cdf426b21480958836d9ab678ca45ed1312': 'Test!'} +>>> # Optional: Set plugin log level, default inherit from the bot +>>> RTBL.log_level = logging.DEBUG +""" from hashlib import sha256 from typing import Dict, Optional @@ -10,7 +20,7 @@ from slixmpp import JID from slixmpp.exceptions import XMPPError from slixmpp.xmlstream import tostring -from .plugin import Plugin +from .plugin import Plugin, botcmd def jid_to_sha256(jid: JID) -> str: @@ -21,9 +31,11 @@ def jid_to_sha256(jid: JID) -> str: class BL: """Plain object to keep track of block list items. Only used in RTBL plugin.""" + #: Initial seed to ease testing + init = {} def __init__(self, initial_bl): - self.sha256_jids: Dict[str, Optional[str]] = {} + self.sha256_jids: Dict[str, Optional[str]] = dict(BL.init) for item in initial_bl: self.insert_item(item) @@ -50,7 +62,6 @@ class BL: def get_reason(self, jid: JID) -> Optional[str]: """Check the presence of the JID in the blocklist""" jidhash = jid_to_sha256(jid) - # Raises if item does not exist return self.sha256_jids[jidhash] def __len__(self): @@ -75,14 +86,14 @@ class RTBL(Plugin): ('pubsub_retract', self._retract), ('pubsub_publish', self._publish), (f'muc::{self.bot.room}::presence', self.got_presence), - (f'muc::{self.bot.room}::got_offline', self.got_offline), (f'muc::{self.bot.room}::got_online', self.got_online) ] self.add_handlers() self.bot = bot - self.participants = set() self.moderator = False self.blocklist: BL = None + self.hits = 0 + self.presences = bot.muc_presences def _exit(self): self.rm_handlers() @@ -109,7 +120,7 @@ class RTBL(Plugin): mess = f'Got {len(self.blocklist)} items in block list' self.log.info(mess) # Are current participants in the block list - for jid in list(self.participants): + for jid in [pres['muc']['jid'] for pres in self.presences.values()]: await self.rtbl_ban(jid) async def _create(self): @@ -141,15 +152,15 @@ class RTBL(Plugin): return self.blocklist.insert_item(msg['pubsub_event']['items']['item']) # Are current participants in the block list - for jid in list(self.participants): + for jid in [pres['muc']['jid'] for pres in self.presences.values()]: await self.rtbl_ban(jid) - async def rtbl_ban(self, jid): + async def rtbl_ban(self, jid: JID): """Ban jid in RTBL""" - if not self.moderator: + if not self.moderator or not jid.bare: return - if not self.blocklist: - self.log.debug("block list not populated yet") + if self.blocklist is None: + self.log.info('Not checking %s, block list not populated yet', jid) return if self.blocklist.check(jid): self.log.debug(f'About to ban {jid}') @@ -157,16 +168,8 @@ class RTBL(Plugin): if reason is not None: reason = f'rtbl {reason}' await self.ban(jid.bare, reason=reason) - - def got_offline(self, pres): - """Handler method for leaving MUC participants""" - fjid = pres['muc']['jid'] - user = fjid if fjid.full else pres['muc']['nick'] - try: - self.participants.remove(user) - except KeyError: - self.log.error('KeyError removing participant: "%s"', user) - self.log.debug(f'participants: -{user} (len:{len(self.participants)})') + self.hits += 1 + self.log.info(f'{jid} banned!') def got_presence(self, pres): """Does bot have required permissions""" @@ -180,14 +183,32 @@ class RTBL(Plugin): else: self.log.info('Got moderator permissions.') self.moderator = True + #TODO: purge presences cache sid.MUCBot.muc_presences? async def got_online(self, pres): """Handler method for new MUC participants""" fjid = pres['muc']['jid'] - user = fjid if fjid.full else pres['muc']['nick'] - self.participants.add(user) - self.log.debug(f'participants: +{user} (len:{len(self.participants)})') - await self.rtbl_ban(user) + await self.rtbl_ban(fjid) + + @botcmd(name="rtbl-info") + def rtbl_info(self, rcv, _): + """Show RTBL info""" + if self.blocklist is None: + msg = 'Block list not populated yet' + self.log.warning(msg) + self.reply(rcv, msg) + return + msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}' + if self.hits > 0: + msg+=f' (hits {self.hits})' + if not self.moderator: + msg+='\nBot has no moderator permissions!' + self.reply(rcv, msg) + + +if __name__ == '__main__': + from .cli.rtbl import main + main() # VIM MODLINE # vim: ai ts=4 sw=4 sts=4 expandtab