1 # Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
3 # This file is part of sima
5 # sima is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # sima is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with sima. If not, see <http://www.gnu.org/licenses/>.
"""SQLite database library
21 https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete
# Default history window, in hours (30 days of 24 hours).
__HIST_DURATION__ = 30 * 24
29 from datetime import (datetime, timedelta)
31 from sima.lib.meta import Artist, Album
32 from sima.lib.track import Track
class SimaDBError(Exception):
    """Error raised by the database layer (e.g. a track without a file)."""
44 def __init__(self, db_path=None):
45 self._db_path = db_path
def get_database_connection(self):
    """Open and return a new connection to the configured database.

    The connection is created with ``isolation_level=None``, so the
    :mod:`sqlite3` module does not open transactions implicitly.

    :return: a fresh connection; closing it is up to the caller
    :rtype: sqlite3.Connection
    """
    return sqlite3.connect(self._db_path, isolation_level=None)
def close_database_connection(self, connection):
    """Close the database connection.

    :param sqlite3.Connection connection: connection to release
    """
    connection.close()
60 connection = self.get_database_connection()
62 'CREATE TABLE IF NOT EXISTS db_info'
63 ' (name CHAR(50), value CHAR(50))')
64 connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ?
66 ( SELECT 1 FROM db_info WHERE name = ? )''',
67 ('DB Version', __DB_VERSION__, 'DB Version'))
68 connection.execute( # ARTISTS
69 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, '
70 'name VARCHAR(100), mbid CHAR(36))')
71 connection.execute( # ALBUMS
72 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, '
73 'name VARCHAR(100), mbid CHAR(36))')
74 connection.execute( # ALBUMARTISTS
75 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, '
76 'name VARCHAR(100), mbid CHAR(36))')
77 connection.execute( # TRACKS
78 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, '
79 'title VARCHAR(100), artist INTEGER, '
80 'album INTEGER, albumartist INTEGER, '
81 'file VARCHAR(500), mbid CHAR(36), '
82 'FOREIGN KEY(artist) REFERENCES artists(id), '
83 'FOREIGN KEY(album) REFERENCES albums(id), '
84 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
85 connection.execute( # HISTORY
86 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
87 'last_play TIMESTAMP, track integer, '
88 'FOREIGN KEY(track) REFERENCES tracks(id))')
89 connection.execute( # BLOCKLIST
90 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
91 'artist INTEGER, album INTEGER, track INTEGER, '
92 'FOREIGN KEY(artist) REFERENCES artists(id), '
93 'FOREIGN KEY(album) REFERENCES albums(id), '
94 'FOREIGN KEY(track) REFERENCES tracks(id))')
95 # Create cleanup triggers:
96 # DELETE history → Tracks table
97 connection.execute('''
98 CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
99 AFTER DELETE ON history
100 WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
101 (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
103 DELETE FROM tracks WHERE id = old.track;
106 # DELETE Tracks → Artists table
107 connection.execute('''
108 CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists
109 AFTER DELETE ON tracks
110 WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
111 (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
113 DELETE FROM artists WHERE id = old.artist;
116 # DELETE Tracks → Albums table
117 connection.execute('''
118 CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums
119 AFTER DELETE ON tracks
120 WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
121 (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
123 DELETE FROM albums WHERE id = old.album;
126 # DELETE Tracks → cleanup AlbumArtists table
127 connection.execute('''
128 CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists
129 AFTER DELETE ON tracks
130 WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
132 DELETE FROM albumartists WHERE id = old.albumartist;
135 # DELETE blocklist → Tracks table
136 connection.execute('''
137 CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks
138 AFTER DELETE ON blocklist
139 WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
140 (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
142 DELETE FROM tracks WHERE id = old.track;
145 # DELETE blocklist → Artists table
146 # The "SELECT count(*) FROM blocklist" is useless,
147 # there can be only one blocklist.artist
148 connection.execute('''
149 CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists
150 AFTER DELETE ON blocklist
151 WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
152 (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
154 DELETE FROM artists WHERE id = old.artist;
# DELETE blocklist → Albums table
158 # The "SELECT count(*) FROM blocklist" is useless,
159 # there can be only one blocklist.album
160 connection.execute('''
161 CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums
162 AFTER DELETE ON blocklist
163 WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
164 (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
166 DELETE FROM albums WHERE id = old.album;
169 self.close_database_connection(connection)
172 connection = self.get_database_connection()
173 rows = connection.execute(
174 "SELECT name FROM sqlite_master WHERE type='table'")
175 for r in rows.fetchall():
176 connection.execute(f'DROP TABLE IF EXISTS {r[0]}')
179 def _remove_blocklist_id(self, blid, with_connection=None):
182 connection = with_connection
184 connection = self.get_database_connection()
185 connection = self.get_database_connection()
186 connection.execute('DELETE FROM blocklist'
187 ' WHERE blocklist.id = ?', (blid,))
189 if not with_connection:
190 self.close_database_connection(connection)
192 def _get_album(self, album, connection):
194 return connection.execute(
195 "SELECT id FROM albums WHERE mbid = ?",
198 return connection.execute(
199 "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
def get_album(self, album, with_connection=None, add=True):
    """Get the id of an album from the database.

    If the album is not in the database a new entry is inserted, unless
    *add* is false.

    :param sima.lib.meta.Album album: album object
    :param sqlite3.Connection with_connection: SQLite connection to reuse;
        when omitted a dedicated connection is opened and closed here
    :param bool add: insert the album when it is missing
    :return: ``albums.id`` of the album, or None when it is not found
    """
    connection = with_connection or self.get_database_connection()
    try:
        row = self._get_album(album, connection).fetchone()
        if row is not None:
            return row[0]
        if not add:
            return None
        connection.execute(
            "INSERT INTO albums (name, mbid) VALUES (?, ?)",
            (album.name, album.mbid))
        connection.commit()
        row = self._get_album(album, connection).fetchone()
        if row is not None:
            return row[0]
        print('damned: %s' % album.mbid)
        return None
    finally:
        # Single exit point: a connection opened here is released even
        # when one of the queries raises.
        if not with_connection:
            self.close_database_connection(connection)
236 def _get_albumartist(self, artist, connection):
238 return connection.execute(
239 "SELECT id FROM albumartists WHERE mbid = ?",
242 return connection.execute(
243 "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
def get_albumartist(self, artist, with_connection=None, add=True):
    """Get the id of an albumartist from the database.

    If the albumartist is not in the database a new entry is inserted,
    unless *add* is false.

    :param sima.lib.meta.Artist artist: artist
    :param sqlite3.Connection with_connection: SQLite connection to reuse;
        when omitted a dedicated connection is opened and closed here
    :param bool add: insert the albumartist when it is missing
    :return: ``albumartists.id`` of the artist, or None when not found
    """
    connection = with_connection or self.get_database_connection()
    try:
        row = self._get_albumartist(artist, connection).fetchone()
        if row is not None:
            return row[0]
        if not add:
            return None
        connection.execute(
            "INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
            (artist.name, artist.mbid))
        connection.commit()
        row = self._get_albumartist(artist, connection).fetchone()
        return None if row is None else row[0]
    finally:
        # Single exit point: a connection opened here is released even
        # when one of the queries raises.
        if not with_connection:
            self.close_database_connection(connection)
278 def _get_artist(self, artist, connection):
280 return connection.execute(
281 "SELECT id FROM artists WHERE mbid = ?",
284 return connection.execute(
285 "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
def get_artist(self, artist, with_connection=None, add=True):
    """Get the id of an artist from the database.

    If the artist is not in the database a new entry is inserted, unless
    *add* is false.

    :param sima.lib.meta.Artist artist: artist
    :param sqlite3.Connection with_connection: SQLite connection to reuse;
        when omitted a dedicated connection is opened and closed here
    :param bool add: insert the artist when it is missing
    :return: ``artists.id`` of the artist, or None when not found
    """
    connection = with_connection or self.get_database_connection()
    try:
        row = self._get_artist(artist, connection).fetchone()
        if row is not None:
            return row[0]
        if not add:
            return None
        connection.execute(
            "INSERT INTO artists (name, mbid) VALUES (?, ?)",
            (artist.name, artist.mbid))
        connection.commit()
        row = self._get_artist(artist, connection).fetchone()
        return None if row is None else row[0]
    finally:
        # Single exit point: a connection opened here is released even
        # when one of the queries raises.
        if not with_connection:
            self.close_database_connection(connection)
def get_track(self, track, with_connection=None, add=True):
    """Get the id of a track from the tracks table.

    If the track is not in the database a new entry is inserted (together
    with its artist, albumartist and album when set), unless *add* is false.
    Attention: use a Track() object!

    :param sima.lib.track.Track track: track to look up, matched on ``file``
    :param sqlite3.Connection with_connection: SQLite connection to reuse;
        when omitted a dedicated connection is opened and closed here
    :param bool add: insert the track when it is missing
    :return: ``tracks.id`` of the track, or None when it is not found
    :raises SimaDBError: when the track has no ``file`` attribute
    """
    if not track.file:
        raise SimaDBError('Got a track with no file attribute: %r' % track)
    connection = with_connection or self.get_database_connection()
    try:
        row = connection.execute(
            "SELECT * FROM tracks WHERE file = ?", (track.file,)).fetchone()
        if row is not None:
            return row[0]  # id is the first column of the tracks table
        if not add:  # Not adding non existing track
            return None
        # Get an artist record or None
        art_id = None
        if track.artist:
            art = Artist(name=track.artist, mbid=track.musicbrainz_artistid)
            art_id = self.get_artist(art, with_connection=connection)
        # Get an albumartist record or None
        albart_id = None
        if track.albumartist:
            albart = Artist(name=track.albumartist,
                            mbid=track.musicbrainz_albumartistid)
            albart_id = self.get_albumartist(albart, with_connection=connection)
        # Get an album record or None
        alb_id = None
        if track.album:
            alb = Album(name=track.album, mbid=track.musicbrainz_albumid)
            alb_id = self.get_album(alb, with_connection=connection)
        connection.execute(
            """INSERT INTO tracks (artist, albumartist, album, title, mbid, file)
            VALUES (?, ?, ?, ?, ?, ?)""",
            (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
             track.file))
        connection.commit()
        row = connection.execute(
            "SELECT id FROM tracks WHERE file = ?", (track.file,)).fetchone()
        return None if row is None else row[0]
    finally:
        # Every exit path, including the ``add=False`` early return,
        # releases a connection that was opened here.
        if not with_connection:
            self.close_database_connection(connection)
def add_history(self, track, date=None):
    """Record last play date of track (ie. not a real exhaustive play history).

    Only one history row is kept per track; its ``last_play`` value is
    overwritten on each call.

    :param sima.lib.track.Track track: track to add to history
    :param datetime.datetime date: play date, expected in UTC; defaults to
        the current UTC time
    """
    if not date:
        # History readers build their cut-off from datetime.utcnow() and
        # the purge query from SQLite's UTC 'now', so store UTC as well.
        date = datetime.utcnow()
    connection = self.get_database_connection()
    try:
        track_id = self.get_track(track, with_connection=connection)
        known = connection.execute("SELECT * FROM history WHERE track = ? ",
                                   (track_id,)).fetchone()
        if not known:
            connection.execute("INSERT INTO history (track) VALUES (?)",
                               (track_id,))
        connection.execute("UPDATE history SET last_play = ? "
                           " WHERE track = ?", (date, track_id,))
        connection.commit()
    finally:
        self.close_database_connection(connection)
def purge_history(self, duration=__HIST_DURATION__):
    """Remove old entries in history.

    :param int duration: Purge history records older than duration in hours
        (defaults to __HIST_DURATION__)
    """
    connection = self.get_database_connection()
    try:
        # The time modifier is passed as a bound parameter rather than
        # being interpolated into the SQL text.
        connection.execute(
            "DELETE FROM history WHERE last_play < datetime('now', ?)",
            ('-%i hours' % duration,))
        connection.commit()
    finally:
        self.close_database_connection(connection)
def fetch_artists_history(self, duration=__HIST_DURATION__):
    """Fetch the artists of recently played tracks, more recent first.

    Consecutive identical entries are collapsed into a single one.

    :param int duration: How long ago to fetch history from, in hours
    :return: list of objects built from each row's artist name and mbid
    """
    date = datetime.utcnow() - timedelta(hours=duration)
    connection = self.get_database_connection()
    try:
        connection.row_factory = sqlite3.Row
        rows = connection.execute("""
                SELECT artists.name AS name,
                       artists.mbid as mbid
                FROM history
                JOIN tracks ON history.track = tracks.id
                LEFT OUTER JOIN artists ON tracks.artist = artists.id
                WHERE history.last_play > ?
                ORDER BY history.last_play DESC""", (date.isoformat(' '),))
        hist = []
        for row in rows:
            # NOTE(review): artist rows are wrapped in Album objects here;
            # Artist looks like the intended type - confirm with callers.
            entry = Album(**row)
            if hist and hist[-1] == entry:  # remove consecutive dupes
                continue
            hist.append(entry)
        return hist
    finally:
        self.close_database_connection(connection)
def fetch_history(self, duration=__HIST_DURATION__):
    """Fetches tracks history, more recent first.

    :param int duration: How long ago to fetch history from, in hours
    :return: list of Track objects, one per history row in the window
    """
    date = datetime.utcnow() - timedelta(hours=duration)
    connection = self.get_database_connection()
    try:
        connection.row_factory = sqlite3.Row
        rows = connection.execute("""
                SELECT tracks.title, tracks.file, artists.name AS artist,
                       albumartists.name AS albumartist,
                       artists.mbid as musicbrainz_artistid,
                       albums.name AS album,
                       albums.mbid AS musicbrainz_albumid,
                       tracks.mbid as musicbrainz_trackid
                FROM history
                JOIN tracks ON history.track = tracks.id
                LEFT OUTER JOIN artists ON tracks.artist = artists.id
                LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
                LEFT OUTER JOIN albums ON tracks.album = albums.id
                WHERE history.last_play > ?
                ORDER BY history.last_play DESC""", (date.isoformat(' '),))
        # Column aliases double as the Track() keyword arguments.
        return [Track(**row) for row in rows]
    finally:
        self.close_database_connection(connection)
def get_bl_track(self, track, with_connection=None, add=True):
    """Add a track to blocklist and return its blocklist id.

    The track itself is always registered in the tracks table first.

    :param sima.lib.track.Track track: Track object to add to blocklist
    :param sqlite3.Connection with_connection: sqlite3.Connection to reuse,
        else a new one is created and closed here
    :param bool add: Default is to add a new record, set to False to fetch
        the associated record
    :return: ``blocklist.id`` for the track, or None when it is not
        blocklisted and *add* is false
    """
    connection = with_connection or self.get_database_connection()
    try:
        track_id = self.get_track(track, with_connection=connection, add=True)
        lookup = "SELECT id FROM blocklist WHERE track = ?"
        row = connection.execute(lookup, (track_id,)).fetchone()
        if row is None:
            if not add:
                return None
            connection.execute('INSERT INTO blocklist (track) VALUES (?)',
                               (track_id,))
            connection.commit()
            row = connection.execute(lookup, (track_id,)).fetchone()
        return row[0]
    finally:
        # A connection opened here must not outlive the call.
        if not with_connection:
            self.close_database_connection(connection)
def get_bl_album(self, album, with_connection=None, add=True):
    """Add an album to blocklist and return its blocklist id.

    The album itself is always registered in the albums table first.

    :param sima.lib.meta.Album album: Album object to add to blocklist
    :param sqlite3.Connection with_connection: sqlite3.Connection to reuse,
        else a new one is created and closed here
    :param bool add: Default is to add a new record, set to False to fetch
        the associated record
    :return: ``blocklist.id`` for the album, or None when it is not
        blocklisted and *add* is false
    """
    connection = with_connection or self.get_database_connection()
    try:
        album_id = self.get_album(album, with_connection=connection, add=True)
        lookup = "SELECT id FROM blocklist WHERE album = ?"
        row = connection.execute(lookup, (album_id,)).fetchone()
        if row is None:
            if not add:
                return None
            connection.execute('INSERT INTO blocklist (album) VALUES (?)',
                               (album_id,))
            connection.commit()
            row = connection.execute(lookup, (album_id,)).fetchone()
        return row[0]
    finally:
        # A connection opened here must not outlive the call.
        if not with_connection:
            self.close_database_connection(connection)
def get_bl_artist(self, artist, with_connection=None, add=True):
    """Add an artist to blocklist and return its blocklist id.

    The artist itself is always registered in the artists table first.

    :param sima.lib.meta.Artist artist: Artist object to add to blocklist
    :param sqlite3.Connection with_connection: sqlite3.Connection to reuse,
        else a new one is created and closed here
    :param bool add: Default is to add a new record, set to False to fetch
        the associated record
    :return: ``blocklist.id`` for the artist, or None when it is not
        blocklisted and *add* is false
    """
    connection = with_connection or self.get_database_connection()
    try:
        artist_id = self.get_artist(artist, with_connection=connection, add=True)
        lookup = "SELECT id FROM blocklist WHERE artist = ?"
        row = connection.execute(lookup, (artist_id,)).fetchone()
        if row is None:
            if not add:
                return None
            connection.execute('INSERT INTO blocklist (artist) VALUES (?)',
                               (artist_id,))
            connection.commit()
            row = connection.execute(lookup, (artist_id,)).fetchone()
        return row[0]
    finally:
        # A connection opened here must not outlive the call.
        if not with_connection:
            self.close_database_connection(connection)
def delete_bl(self, track=None, album=None, artist=None):
    """Remove the given track, album and/or artist from the blocklist.

    Every argument that is set is processed, so several entries can be
    removed in one call. Nothing happens when no argument is given.

    :param sima.lib.track.Track track: track to unblock
    :param sima.lib.meta.Album album: album to unblock
    :param sima.lib.meta.Artist artist: artist to unblock
    """
    if not (track or album or artist):
        return
    connection = self.get_database_connection()
    try:
        targets = (
            (track, self.get_bl_track),
            (album, self.get_bl_album),
            (artist, self.get_bl_artist),
        )
        for item, get_blid in targets:
            if not item:
                continue
            blid = get_blid(item, with_connection=connection)
            # Remove each entry as soon as its id is known, so an id is
            # never overwritten by the next lookup.
            if blid:
                self._remove_blocklist_id(blid, with_connection=connection)
    finally:
        self.close_database_connection(connection)
531 'albumartist': 'Devolt',
533 'date': '2011-12-01',
534 'file': 'music/Devolt/2011-Grey/03-Devolt - Crazy.mp3',
535 'musicbrainz_albumartistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99',
536 'musicbrainz_albumid': 'ea2ef2cf-59e1-443a-817e-9066e3e0be4b',
537 'musicbrainz_artistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99',
538 'musicbrainz_trackid': 'fabf8fc9-2ae5-49c9-8214-a839c958d872',
539 'duration': '220.000',
541 db = SimaDB('/dev/shm/test.sqlite')
543 db.add_history(Track(**DEVOLT))
544 DEVOLT['file'] = 'foo'
545 print(db.get_bl_track(Track(**DEVOLT)))
546 db.add_history(Track(**DEVOLT))
549 # vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8