1 # Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
3 # This file is part of sima
5 # sima is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # sima is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with sima. If not, see <http://www.gnu.org/licenses/>.
19 """SQlite database library
21 https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete
25 __HIST_DURATION__ = int(30 * 24) # in hours
29 from collections import deque
30 from datetime import (datetime, timedelta)
32 from sima.lib.meta import Artist, Album
33 from sima.lib.track import Track
36 class SimaDBError(Exception):
45 def __init__(self, db_path=None):
46 self._db_path = db_path
48 def get_database_connection(self):
49 """get database reference"""
50 connection = sqlite3.connect(
51 self._db_path, isolation_level=None)
54 def close_database_connection(self, connection):
55 """Close the database connection."""
61 connection = self.get_database_connection()
63 'CREATE TABLE IF NOT EXISTS db_info'
64 ' (name CHAR(50), value CHAR(50))')
65 connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ?
67 ( SELECT 1 FROM db_info WHERE name = ? )''',
68 ('DB Version', __DB_VERSION__, 'DB Version'))
69 connection.execute( # ARTISTS
70 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, '
71 'name VARCHAR(100), mbid CHAR(36))')
72 connection.execute( # ALBUMS
73 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, '
74 'name VARCHAR(100), mbid CHAR(36))')
75 connection.execute( # ALBUMARTISTS
76 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, '
77 'name VARCHAR(100), mbid CHAR(36))')
78 connection.execute( # TRACKS
79 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, '
80 'title VARCHAR(100), artist INTEGER, '
81 'album INTEGER, albumartist INTEGER, '
82 'file VARCHAR(500), mbid CHAR(36), '
83 'FOREIGN KEY(artist) REFERENCES artists(id), '
84 'FOREIGN KEY(album) REFERENCES albums(id), '
85 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
86 connection.execute( # HISTORY
87 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
88 'last_play TIMESTAMP, track integer, '
89 'FOREIGN KEY(track) REFERENCES tracks(id))')
90 connection.execute( # BLOCKLIST
91 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
92 'artist INTEGER, album INTEGER, track INTEGER, '
93 'FOREIGN KEY(artist) REFERENCES artists(id), '
94 'FOREIGN KEY(album) REFERENCES albums(id), '
95 'FOREIGN KEY(track) REFERENCES tracks(id))')
96 # Create cleanup triggers:
97 # DELETE history → Tracks table
98 connection.execute('''
99 CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
100 AFTER DELETE ON history
101 WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
102 (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
104 DELETE FROM tracks WHERE id = old.track;
107 # DELETE Tracks → Artists table
108 connection.execute('''
109 CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists
110 AFTER DELETE ON tracks
111 WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
112 (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
114 DELETE FROM artists WHERE id = old.artist;
117 # DELETE Tracks → Albums table
118 connection.execute('''
119 CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums
120 AFTER DELETE ON tracks
121 WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
122 (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
124 DELETE FROM albums WHERE id = old.album;
127 # DELETE Tracks → cleanup AlbumArtists table
128 connection.execute('''
129 CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists
130 AFTER DELETE ON tracks
131 WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
133 DELETE FROM albumartists WHERE id = old.albumartist;
136 # DELETE blocklist → Tracks table
137 connection.execute('''
138 CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks
139 AFTER DELETE ON blocklist
140 WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
141 (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
143 DELETE FROM tracks WHERE id = old.track;
146 # DELETE blocklist → Artists table
147 # The "SELECT count(*) FROM blocklist" is useless,
148 # there can be only one blocklist.artist
149 connection.execute('''
150 CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists
151 AFTER DELETE ON blocklist
152 WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
153 (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
155 DELETE FROM artists WHERE id = old.artist;
158 # DELETE Tracks → Albums table
159 # The "SELECT count(*) FROM blocklist" is useless,
160 # there can be only one blocklist.album
161 connection.execute('''
162 CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums
163 AFTER DELETE ON blocklist
164 WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
165 (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
167 DELETE FROM albums WHERE id = old.album;
170 self.close_database_connection(connection)
173 connection = self.get_database_connection()
174 rows = connection.execute(
175 "SELECT name FROM sqlite_master WHERE type='table'")
176 for r in rows.fetchall():
177 connection.execute(f'DROP TABLE IF EXISTS {r[0]}')
180 def _remove_blocklist_id(self, blid, with_connection=None):
183 connection = with_connection
185 connection = self.get_database_connection()
186 connection = self.get_database_connection()
187 connection.execute('DELETE FROM blocklist'
188 ' WHERE blocklist.id = ?', (blid,))
190 if not with_connection:
191 self.close_database_connection(connection)
193 def _get_album(self, album, connection):
195 return connection.execute(
196 "SELECT id FROM albums WHERE mbid = ?",
199 return connection.execute(
200 "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
203 def get_album(self, album, with_connection=None, add=True):
204 """get album information from the database.
205 if not in database insert new entry.
207 :param sima.lib.meta.Album album: album objet
208 :param sqlite3.Connection with_connection: SQLite connection
211 connection = with_connection
213 connection = self.get_database_connection()
214 rows = self._get_album(album, connection)
216 if not with_connection:
217 self.close_database_connection(connection)
220 if not with_connection:
221 self.close_database_connection(connection)
224 "INSERT INTO albums (name, mbid) VALUES (?, ?)",
225 (album.name, album.mbid))
227 rows = self._get_album(album, connection)
229 if not with_connection:
230 self.close_database_connection(connection)
232 print('damned: %s' % album.mbid)
233 if not with_connection:
234 self.close_database_connection(connection)
237 def _get_albumartist(self, artist, connection):
239 return connection.execute(
240 "SELECT id FROM albumartists WHERE mbid = ?",
243 return connection.execute(
244 "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
247 def get_albumartist(self, artist, with_connection=None, add=True):
248 """get albumartist information from the database.
249 if not in database insert new entry.
251 :param sima.lib.meta.Artist artist: artist
252 :param sqlite3.Connection with_connection: SQLite connection
255 connection = with_connection
257 connection = self.get_database_connection()
258 rows = self._get_albumartist(artist, connection)
260 if not with_connection:
261 self.close_database_connection(connection)
264 if not with_connection:
265 self.close_database_connection(connection)
268 "INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
269 (artist.name, artist.mbid))
271 rows = self._get_albumartist(artist, connection)
273 if not with_connection:
274 self.close_database_connection(connection)
276 if not with_connection:
277 self.close_database_connection(connection)
279 def _get_artist(self, artist, connection):
281 return connection.execute(
282 "SELECT id FROM artists WHERE mbid = ?",
285 return connection.execute(
286 "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
288 def get_artist(self, artist, with_connection=None, add=True):
289 """get artist information from the database.
290 if not in database insert new entry.
292 :param sima.lib.meta.Artist artist: artist
293 :param sqlite3.Connection with_connection: SQLite connection
296 connection = with_connection
298 connection = self.get_database_connection()
299 rows = self._get_artist(artist, connection)
301 if not with_connection:
302 self.close_database_connection(connection)
305 if not with_connection:
306 self.close_database_connection(connection)
309 "INSERT INTO artists (name, mbid) VALUES (?, ?)",
310 (artist.name, artist.mbid))
312 rows = self._get_artist(artist, connection)
314 if not with_connection:
315 self.close_database_connection(connection)
317 if not with_connection:
318 self.close_database_connection(connection)
320 def get_track(self, track, with_connection=None, add=True):
321 """Get a track from Tracks table, add if not existing,
322 Attention: use Track() object!!
323 if not in database insert new entry."""
325 raise SimaDBError('Got a track with no file attribute: %r' % track)
327 connection = with_connection
329 connection = self.get_database_connection()
330 rows = connection.execute(
331 "SELECT * FROM tracks WHERE file = ?", (track.file,))
333 if not with_connection:
334 self.close_database_connection(connection)
336 if not add: # Not adding non existing track
338 # Get an artist record or None
340 art = Artist(name=track.artist, mbid=track.musicbrainz_artistid)
341 art_id = self.get_artist(art, with_connection=connection)
344 # Get an albumartist record or None
345 if track.albumartist:
346 albart = Artist(name=track.albumartist,
347 mbid=track.musicbrainz_albumartistid)
348 albart_id = self.get_albumartist(albart, with_connection=connection)
351 # Get an album record or None
353 alb = Album(name=track.album, mbid=track.musicbrainz_albumid)
354 alb_id = self.get_album(alb, with_connection=connection)
358 """INSERT INTO tracks (artist, albumartist, album, title, mbid, file)
359 VALUES (?, ?, ?, ?, ?, ?)""",
360 (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
363 rows = connection.execute(
364 "SELECT id FROM tracks WHERE file = ?", (track.file,))
366 if not with_connection:
367 self.close_database_connection(connection)
369 if not with_connection:
371 self.close_database_connection(connection)
374 def add_history(self, track, date=None):
375 """Record last play date of track (ie. not a real exhautive play history).
376 :param track sima.lib.track.Track: track to add to history"""
378 date = datetime.now()
379 connection = self.get_database_connection()
380 track_id = self.get_track(track, with_connection=connection)
381 rows = connection.execute("SELECT * FROM history WHERE track = ? ",
383 if not rows.fetchone():
384 connection.execute("INSERT INTO history (track) VALUES (?)",
386 connection.execute("UPDATE history SET last_play = ? "
387 " WHERE track = ?", (date, track_id,))
389 self.close_database_connection(connection)
391 def purge_history(self, duration=__HIST_DURATION__):
392 """Remove old entries in history
393 :param duration int: Purge history record older than duration in hours
394 (defaults to __HIST_DURATION__)"""
395 connection = self.get_database_connection()
396 connection.execute("DELETE FROM history WHERE last_play"
397 " < datetime('now', '-%i hours')" % duration)
398 connection.execute('VACUUM')
400 self.close_database_connection(connection)
402 def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
404 :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist.
406 date = datetime.utcnow() - timedelta(hours=duration)
407 connection = self.get_database_connection()
408 connection.row_factory = sqlite3.Row
409 rows = connection.execute("""
410 SELECT albums.name AS name,
412 artists.name as artist,
413 artists.mbid as artist_mbib
415 JOIN tracks ON history.track = tracks.id
416 LEFT OUTER JOIN albums ON tracks.album = albums.id
417 LEFT OUTER JOIN artists ON tracks.artist = artists.id
418 WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
419 ORDER BY history.last_play DESC""", (date.isoformat(' '),))
423 artist = Artist(name=vals.pop('artist'),
424 mbid=vals.pop('artist_mbib'))
428 album = Album(**vals, artist=artist)
429 if hist and hist[-1] == album:
430 # remove consecutive dupes
436 def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__):
437 """Returns a list of Artist objects
438 :param sima.lib.meta.Artist|sima.lib.meta.MetaContainer needle: When specified, returns history for this artist, it's actually testing the artist presence in history.
439 :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only
441 date = datetime.utcnow() - timedelta(hours=duration)
442 connection = self.get_database_connection()
443 connection.row_factory = sqlite3.Row
444 rows = connection.execute("""
445 SELECT artists.name AS name,
448 JOIN tracks ON history.track = tracks.id
449 LEFT OUTER JOIN artists ON tracks.artist = artists.id
450 WHERE history.last_play > ? AND artists.name NOT NULL
451 ORDER BY history.last_play DESC""", (date.isoformat(' '),))
452 last = deque(maxlen=1)
455 artist = Artist(**row)
456 if last and last[0] == artist: # remove consecutive dupes
459 if needle and isinstance(needle, (Artist, str)):
461 hist.append(artist) # No need to go further
464 elif needle and getattr(needle, '__contains__'):
466 hist.append(artist) # No need to go further
471 def fetch_history(self, artist=None, duration=__HIST_DURATION__):
472 """Fetches tracks history, more recent first
473 :param sima.lib.meta.Artist artist: limit history to this artist
474 :param int duration: How long ago to fetch history from
476 date = datetime.utcnow() - timedelta(hours=duration)
477 connection = self.get_database_connection()
478 connection.row_factory = sqlite3.Row
480 SELECT tracks.title, tracks.file, artists.name AS artist,
481 albumartists.name AS albumartist,
482 artists.mbid as musicbrainz_artistid,
483 albums.name AS album,
484 albums.mbid AS musicbrainz_albumid,
485 tracks.mbid as musicbrainz_trackid
487 JOIN tracks ON history.track = tracks.id
488 LEFT OUTER JOIN artists ON tracks.artist = artists.id
489 LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
490 LEFT OUTER JOIN albums ON tracks.album = albums.id
491 WHERE history.last_play > ?
495 rows = connection.execute(sql+"""
497 ORDER BY history.last_play DESC""",
498 (date.isoformat(' '), artist.mbid))
500 rows = connection.execute(sql+"""
502 ORDER BY history.last_play DESC""",
503 (date.isoformat(' '), artist.name))
505 rows = connection.execute(sql+'ORDER BY history.last_play DESC',
506 (date.isoformat(' '),))
509 hist.append(Track(**row))
513 def get_bl_track(self, track, with_connection=None, add=True):
514 """Add a track to blocklist
515 :param sima.lib.track.Track track: Track object to add to blocklist
516 :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
517 :param bool add: Default is to add a new record, set to False to fetch associated record"""
519 connection = with_connection
521 connection = self.get_database_connection()
522 track_id = self.get_track(track, with_connection=connection, add=True)
523 rows = connection.execute(
524 "SELECT id FROM blocklist WHERE track = ?", (track_id,))
525 if not rows.fetchone():
528 connection.execute('INSERT INTO blocklist (track) VALUES (?)',
531 rows = connection.execute(
532 "SELECT id FROM blocklist WHERE track = ?", (track_id,))
533 return rows.fetchone()[0]
535 def get_bl_album(self, album, with_connection=None, add=True):
536 """Add an album to blocklist
537 :param sima.lib.meta.Album: Album object to add to blocklist
538 :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
539 :param bool add: Default is to add a new record, set to False to fetch associated record"""
541 connection = with_connection
543 connection = self.get_database_connection()
544 album_id = self.get_album(album, with_connection=connection, add=True)
545 rows = connection.execute(
546 "SELECT id FROM blocklist WHERE album = ?", (album_id,))
547 if not rows.fetchone():
550 connection.execute('INSERT INTO blocklist (album) VALUES (?)',
553 rows = connection.execute(
554 "SELECT id FROM blocklist WHERE album = ?", (album_id,))
555 return rows.fetchone()[0]
557 def get_bl_artist(self, artist, with_connection=None, add=True):
558 """Add an artist to blocklist
559 :param sima.lib.meta.Artist: Artist object to add to blocklist
560 :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
561 :param bool add: Default is to add a new record, set to False to fetch associated record"""
563 connection = with_connection
565 connection = self.get_database_connection()
566 artist_id = self.get_artist(artist, with_connection=connection, add=True)
567 rows = connection.execute(
568 "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
569 if not rows.fetchone():
572 connection.execute('INSERT INTO blocklist (artist) VALUES (?)',
575 rows = connection.execute(
576 "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
577 return rows.fetchone()[0]
579 def delete_bl(self, track=None, album=None, artist=None):
580 if not (track or album or artist):
582 connection = self.get_database_connection()
585 blid = self.get_bl_track(track, with_connection=connection)
587 blid = self.get_bl_album(album, with_connection=connection)
589 blid = self.get_bl_artist(artist, with_connection=connection)
592 self._remove_blocklist_id(blid, with_connection=connection)
596 # vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8