# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2023 kaliko <kaliko@azylum.org>
# SPDX-License-Identifier: GPL-3.0-or-later
-"""A Real Time Block List plugin"""
+"""A Real Time Block List plugin, cf https://xmppbl.org.
+
+>>> from sid.rtbl import RTBL, BL
+>>> RTBL.pubsub_server = 'xmppbl.org'
+>>> # Optional: Node to subcribe, defaults to 'muc_bans_sha256'
+>>> RTBL.node = 'muc_bans_sha256'
+>>> # Optional: Add this JID hash to the list, default is empty, usefull for test
+>>> BL.init = {'1312b8ca593cd074f39ef15cc8442cdf426b21480958836d9ab678ca45ed1312': 'Test!'}
+>>> # Optional: Set plugin log level, default inherit from the bot
+>>> RTBL.log_level = logging.DEBUG
+"""
from hashlib import sha256
from typing import Dict, Optional
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream import tostring
-from .plugin import Plugin
+from .plugin import Plugin, botcmd
def jid_to_sha256(jid: JID) -> str:
def get_reason(self, jid: JID) -> Optional[str]:
"""Check the presence of the JID in the blocklist"""
jidhash = jid_to_sha256(jid)
- # Raises if item does not exist
return self.sha256_jids[jidhash]
def __len__(self):
('pubsub_retract', self._retract),
('pubsub_publish', self._publish),
(f'muc::{self.bot.room}::presence', self.got_presence),
- (f'muc::{self.bot.room}::got_offline', self.got_offline),
(f'muc::{self.bot.room}::got_online', self.got_online)
]
self.add_handlers()
self.bot = bot
- self.participants = set()
self.moderator = False
self.blocklist: BL = None
+ self.hits = 0
+ self.presences = bot.muc_presences
def _exit(self):
self.rm_handlers()
mess = f'Got {len(self.blocklist)} items in block list'
self.log.info(mess)
# Are current participants in the block list
- for jid in list(self.participants):
+ for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
await self.rtbl_ban(jid)
async def _create(self):
return
self.blocklist.insert_item(msg['pubsub_event']['items']['item'])
# Are current participants in the block list
- for jid in list(self.participants):
+ for jid in [pres['muc']['jid'] for pres in self.presences.values()]:
await self.rtbl_ban(jid)
- async def rtbl_ban(self, jid):
+ async def rtbl_ban(self, jid: JID):
"""Ban jid in RTBL"""
- if not self.moderator:
+ if not self.moderator or not jid.bare:
return
- if not self.blocklist:
- self.log.debug("block list not populated yet")
+ if self.blocklist is None:
+ self.log.info('Not checking %s, block list not populated yet', jid)
return
if self.blocklist.check(jid):
self.log.debug(f'About to ban {jid}')
if reason is not None:
reason = f'rtbl {reason}'
await self.ban(jid.bare, reason=reason)
-
- def got_offline(self, pres):
- """Handler method for leaving MUC participants"""
- fjid = pres['muc']['jid']
- user = fjid if fjid.full else pres['muc']['nick']
- try:
- self.participants.remove(user)
- except KeyError:
- self.log.error('KeyError removing participant: "%s"', user)
- self.log.debug(f'participants: -{user} (len:{len(self.participants)})')
+ self.hits += 1
+ self.log.info(f'{jid} banned!')
def got_presence(self, pres):
"""Does bot have required permissions"""
else:
self.log.info('Got moderator permissions.')
self.moderator = True
+ #TODO: purge presences cache sid.MUCBot.muc_presences?
async def got_online(self, pres):
"""Handler method for new MUC participants"""
fjid = pres['muc']['jid']
- user = fjid if fjid.full else pres['muc']['nick']
- self.participants.add(user)
- self.log.debug(f'participants: +{user} (len:{len(self.participants)})')
- await self.rtbl_ban(user)
+ await self.rtbl_ban(fjid)
+
+ @botcmd(name="rtbl-info")
+ def rtbl_info(self, rcv, _):
+ """Show RTBL info"""
+ if self.blocklist is None:
+ msg = 'Block list not populated yet'
+ self.log.warning(msg)
+ self.reply(rcv, msg)
+ return
+ msg = f'Got {len(self.blocklist)} items in {RTBL.pubsub_server}/{RTBL.node}'
+ if self.hits > 0:
+ msg+=f' (hits {self.hits})'
+ if not self.moderator:
+ msg+='\nBot has no moderator permissions!'
+ self.reply(rcv, msg)
+
+
+if __name__ == '__main__':
+ from .cli.rtbl import main
+ main()
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab