]> kaliko git repositories - sid.git/blob - sid/rtbl.py
rtbl
[sid.git] / sid / rtbl.py
1 # -*- coding: utf-8 -*-
2 # SPDX-FileCopyrightText: 2014, 2020, 2023 kaliko <kaliko@azylum.org>
3 # SPDX-FileCopyrightText: 2007-2012 Thomas Perl <thp.io/about>
4 # SPDX-License-Identifier: GPL-3.0-or-later
5 """A Real Time Block List plugin"""
6
7 from hashlib import sha256
8 from typing import Dict, Optional
9
10 from slixmpp import JID
11 from slixmpp.exceptions import XMPPError
12 from slixmpp.xmlstream import tostring
13
14 from .plugin import Plugin
15
16
17 def jid_to_sha256(jid: JID) -> str:
18     """Convert Bare JID to sha256 hexdigest"""
19     return sha256(jid.bare.encode('utf-8')).hexdigest()
20
21
22 class BL:
23     """Plain object to keep track of block list items"""
24
25     def __init__(self, initial_bl):
26         self.sha256_jids: Dict[str, Optional[str]] = {}
27         for item in initial_bl:
28             self.insert_item(item)
29
30     def check(self, jid: JID) -> bool:
31         """Check the presence of the JID in the blocklist"""
32         jidhash = jid_to_sha256(jid)
33         return jidhash in self.sha256_jids
34
35     def retract_item(self, item):
36         """remove bl item"""
37         self.sha256_jids.pop(item[id], None)
38
39     def insert_item(self, item):
40         """insert bl item"""
41         text = None
42         for i in item['payload']:
43             try:
44                 text = i.text
45                 break
46             except AttributeError:
47                 continue
48         self.sha256_jids[item['id']] = text
49
50     def get_reason(self, jid: JID) -> Optional[str]:
51         """Check the presence of the JID in the blocklist"""
52         jidhash = jid_to_sha256(jid)
53         # Raises if item does not exist
54         return self.sha256_jids[jidhash]
55
56     def __len__(self):
57         """Implement the built-in function len(), use for boolean evaluation"""
58         return len(self.sha256_jids)
59
60
61 class RTBL(Plugin):
62     """Spam guard for MUC
63     """
64     pubsub_server = 'example.org'
65     node = 'muc_bans_sha256'
66
67     def __init__(self, bot):
68         Plugin.__init__(self, bot)
69         bot.register_plugin('xep_0059')  # Result Set Management
70         bot.register_plugin('xep_0060')  # Publish-Subscribe
71         self.handlers = [
72                 ('session_start', self._subscribe),
73                 ('pubsub_retract', self._retract),
74                 ('pubsub_publish', self._publish),
75                 (f'muc::{self.bot.room}::presence', self.got_presence),
76                 (f'muc::{self.bot.room}::got_offline', self.got_offline),
77                 (f'muc::{self.bot.room}::got_online', self.got_online)
78         ]
79         self.add_handlers()
80         self.bot = bot
81         self.participants = set()
82         self.moderator = False
83         self.blocklist: BL = None
84
85     def _exit(self):
86         self.rm_handlers()
87         self.bot.unregister_bot_plugin(self)
88
89     async def _subscribe(self, *args):
90         try:
91             nodes = await self.bot['xep_0060'].get_nodes(self.pubsub_server)
92             nodes_av = [_.get('node') for _ in nodes['disco_items']]
93             self.log.debug(f'nodes available: {nodes_av}')
94             if self.node not in nodes_av:
95                 self.log.error(f'{self.node} node not available on {self.pubsub_server}')
96                 await self._create()
97             iq = await self.bot['xep_0060'].subscribe(self.pubsub_server, self.node)
98             subscription = iq['pubsub']['subscription']
99             self.log.info('Subscribed %s to node %s', subscription['jid'], subscription['node'])
100         except XMPPError as error:
101             self.log.error('Could not subscribe %s to node %s: %s',
102                            self.bot.boundjid.full, self.node, error.format())
103             self._exit()
104             return
105         node_blocklist = await self.bot['xep_0060'].get_items(self.pubsub_server, self.node)
106         self.blocklist = BL(node_blocklist['pubsub']['items'])
107         mess = f'Got {len(self.blocklist)} items in block list'
108         self.log.info(mess)
109
110     async def _create(self):
111         """Try to create node"""
112         try:
113             await self.bot['xep_0060'].create_node(self.pubsub_server, self.node)
114             self.log.info('Created node %s', self.node)
115         except XMPPError as err:
116             self.log.error('Could not create node %s: %s', self.node, err.format())
117             raise XMPPError(f'Could not create node {self.node}') from err
118
119     def _retract(self, msg):
120         """Handler receiving a retract item event."""
121         self.log.debug('Retracted item %s from %s' % (
122             msg['pubsub_event']['items']['retract']['id'],
123             msg['pubsub_event']['items']['node']))
124         self.blocklist.retract_item(msg['pubsub_event']['items']['retract']['id'])
125
126     async def _publish(self, msg):
127         """Handler receiving a publish item event."""
128         self.log.debug('Published item %s to %s:' % (
129               msg['pubsub_event']['items']['item']['id'],
130               msg['pubsub_event']['items']['node']))
131         data = msg['pubsub_event']['items']['item']['payload']
132         if data is not None:
133             self.log.debug(tostring(data))
134         else:
135             self.log.debug('No item content')
136             return
137         self.blocklist.insert_item(msg['pubsub_event']['items']['item']['id'])
138         # Are current participants in the block list
139         for jid in self.participants:
140             self.rtbl_ban(jid)
141
142     async def rtbl_ban(self, jid):
143         """Ban jid in RTBL"""
144         if not self.moderator:
145             return
146         if not self.blocklist:
147             self.log.debug("block list not populated yet")
148             return
149         if self.blocklist.check(jid):
150             self.log.debug(f'About to ban {jid}')
151             reason = self.blocklist.get_reason(jid)
152             if reason is not None:
153                 reason = f'rtbl {reason}'
154             await self.ban(jid.bare, reason=reason)
155
156     def got_offline(self, pres):
157         """Handler method for laving MUC participants"""
158         fjid = pres['muc']['jid']
159         user = fjid if fjid.full else pres['muc']['nick']
160         try:
161             self.participants.remove(user)
162         except KeyError:
163             self.log.error('KeyError removing participant: "%s"', user)
164         self.log.debug(f'participants: -{user} (len:{len(self.participants)})')
165
166     def got_presence(self, pres):
167         """Does bot have required permissions"""
168         if 110 in pres['muc']['status_codes']:
169             if pres['muc']['role'] != 'moderator':
170                 self.log.error(
171                     'Please give the bot moderator permissions. Will only log actions.'
172                 )
173                 self.moderator = False
174                 return
175             else:
176                 self.log.info('Got moderator permissions.')
177                 self.moderator = True
178
179     async def got_online(self, pres):
180         """Handler method for new MUC participants"""
181         fjid = pres['muc']['jid']
182         user = fjid if fjid.full else pres['muc']['nick']
183         self.participants.add(user)
184         self.log.debug(f'participants: +{user} (len:{len(self.participants)})')
185         await self.rtbl_ban(user)
186
187 # VIM MODLINE
188 # vim: ai ts=4 sw=4 sts=4 expandtab