]> kaliko git repositories - sid.git/blobdiff - sid/rtbl.py
doc: Improve and update
[sid.git] / sid / rtbl.py
index bb4427e049989fad5af2f9f3be3d67b946a8d8b1..1539b28a1273be86420c38b9df6d99c4dfc0f8c9 100644 (file)
@@ -1,7 +1,17 @@
 # -*- coding: utf-8 -*-
 # SPDX-FileCopyrightText: 2023 kaliko <kaliko@azylum.org>
 # SPDX-License-Identifier: GPL-3.0-or-later
-"""A Real Time Block List plugin"""
+"""A Real Time Block List plugin, cf https://xmppbl.org.
+
+>>> from sid.rtbl import RTBL, BL
+>>> RTBL.pubsub_server = 'xmppbl.org'
+>>> # Optional: Node to subcribe, defaults to 'muc_bans_sha256'
+>>> RTBL.node = 'muc_bans_sha256'
+>>> # Optional: Add this JID hash to the list, default is empty, usefull for test
+>>> BL.init = {'1312b8ca593cd074f39ef15cc8442cdf426b21480958836d9ab678ca45ed1312': 'Test!'}
+>>> # Optional: Set plugin log level, default inherit from the bot
+>>> RTBL.log_level = logging.DEBUG
+"""
 
 from hashlib import sha256
 from typing import Dict, Optional
@@ -10,7 +20,7 @@ from slixmpp import JID
 from slixmpp.exceptions import XMPPError
 from slixmpp.xmlstream import tostring
 
-from .plugin import Plugin
+from .plugin import Plugin, botcmd
 
 
 def jid_to_sha256(jid: JID) -> str:
@@ -52,7 +62,6 @@ class BL:
     def get_reason(self, jid: JID) -> Optional[str]:
         """Check the presence of the JID in the blocklist"""
         jidhash = jid_to_sha256(jid)
-        # Raises if item does not exist
         return self.sha256_jids[jidhash]
 
     def __len__(self):
@@ -77,14 +86,14 @@ class RTBL(Plugin):
                 ('pubsub_retract', self._retract),
                 ('pubsub_publish', self._publish),
                 (f'muc::{self.bot.room}::presence', self.got_presence),
-                (f'muc::{self.bot.room}::got_offline', self.got_offline),
                 (f'muc::{self.bot.room}::got_online', self.got_online)
         ]
         self.add_handlers()
         self.bot = bot
-        self.participants = set()
         self.moderator = False
         self.blocklist: BL = None
+        self.hits = 0
+        self.presences = bot.muc_presences
 
     def _exit(self):
         self.rm_handlers()
@@ -111,7 +120,7 @@ class RTBL(Plugin):
         mess = f'Got {len(self.blocklist)} items in block list'
         self.log.info(mess)
         # Are current participants in the block list
-        for jid in list(self.participants):
+        for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
             await self.rtbl_ban(jid)
 
     async def _create(self):
@@ -143,15 +152,15 @@ class RTBL(Plugin):
             return
         self.blocklist.insert_item(msg['pubsub_event']['items']['item'])
         # Are current participants in the block list
-        for jid in list(self.participants):
+        for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
             await self.rtbl_ban(jid)
 
-    async def rtbl_ban(self, jid):
+    async def rtbl_ban(self, jid: JID):
         """Ban jid in RTBL"""
-        if not self.moderator:
+        if not self.moderator or not jid.bare:
             return
-        if not self.blocklist:
-            self.log.debug("block list not populated yet")
+        if self.blocklist is None:
+            self.log.info('Not checking %s, block list not populated yet', jid)
             return
         if self.blocklist.check(jid):
             self.log.debug(f'About to ban {jid}')
@@ -159,16 +168,8 @@ class RTBL(Plugin):
             if reason is not None:
                 reason = f'rtbl {reason}'
             await self.ban(jid.bare, reason=reason)
-
-    def got_offline(self, pres):
-        """Handler method for leaving MUC participants"""
-        fjid = pres['muc']['jid']
-        user = fjid if fjid.full else pres['muc']['nick']
-        try:
-            self.participants.remove(user)
-        except KeyError:
-            self.log.error('KeyError removing participant: "%s"', user)
-        self.log.debug(f'participants: -{user} (len:{len(self.participants)})')
+            self.hits += 1
+            self.log.info(f'{jid} banned!')
 
     def got_presence(self, pres):
         """Does bot have required permissions"""
@@ -182,14 +183,27 @@ class RTBL(Plugin):
             else:
                 self.log.info('Got moderator permissions.')
                 self.moderator = True
+                #TODO: purge presences cache sid.MUCBot.muc_presences?
 
     async def got_online(self, pres):
         """Handler method for new MUC participants"""
         fjid = pres['muc']['jid']
-        user = fjid if fjid.full else pres['muc']['nick']
-        self.participants.add(user)
-        self.log.debug(f'participants: +{user} (len:{len(self.participants)})')
-        await self.rtbl_ban(user)
+        await self.rtbl_ban(fjid)
+
+    @botcmd(name="rtbl-info")
+    def rtbl_info(self, rcv, _):
+        """Show RTBL info"""
+        if self.blocklist is None:
+            msg = 'Block list not populated yet'
+            self.log.warning(msg)
+            self.reply(rcv, msg)
+            return
+        msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}'
+        if self.hits > 0:
+            msg+=f' (hits {self.hits})'
+        if not self.moderator:
+            msg+='\nBot has no moderator permissions!'
+        self.reply(rcv, msg)
 
 
 if __name__ == '__main__':