#
#
"""SQlite database library
-
-https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete
"""
__DB_VERSION__ = 4
import sqlite3
+from collections import deque
from datetime import (datetime, timedelta)
from sima.lib.meta import Artist, Album
from sima.lib.track import Track
+class SimaDBError(Exception):
+ """
+ Exceptions.
+ """
+
+
class SimaDB:
"SQLite management"
self._db_path, isolation_level=None)
return connection
- def close_database_connection(self, connection):
- """Close the database connection."""
- connection.close()
-
def create_db(self):
""" Set up a database
"""
'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
connection.execute( # HISTORY
'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
- 'last_play TIMESTAMP, track integer, '
+ 'last_play TIMESTAMP, track INTEGER, '
'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # BLOCKLIST
+ 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
+ 'artist INTEGER, album INTEGER, track INTEGER, '
+ 'FOREIGN KEY(artist) REFERENCES artists(id), '
+ 'FOREIGN KEY(album) REFERENCES albums(id), '
+ 'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # Genres (Many-to-many)
+ 'CREATE TABLE IF NOT EXISTS genres '
+ '(id INTEGER PRIMARY KEY, name VARCHAR(100))')
+ connection.execute( # Junction Genres Tracks
+ """CREATE TABLE IF NOT EXISTS tracks_genres
+ ( track INTEGER, genre INTEGER,
+ FOREIGN KEY(track) REFERENCES tracks(id)
+ FOREIGN KEY(genre) REFERENCES genres(id))""")
# Create cleanup triggers:
- # Tracks table
+ # DELETE history → Tracks / Tracks_genres tables
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
+ AFTER DELETE ON history
+ WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.track;
+ DELETE FROM tracks_genres WHERE track = old.track;
+ END;
+ ''')
+ # DELETE Tracks_Genres → Genres table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres
+ AFTER DELETE ON tracks_genres
+ WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0)
+ BEGIN
+ DELETE FROM genres WHERE id = old.genre;
+ END;
+ ''')
+ # DELETE Tracks → Artists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # DELETE Tracks → Albums table
connection.execute('''
- CREATE TRIGGER IF NOT EXISTS cleanup_tracks
- AFTER DELETE ON history
- WHEN ((SELECT count(*) FROM history WHERE track=old.id) = 0)
- BEGIN
- DELETE FROM tracks WHERE id = old.id;
- END;
- ''')
- # Artists table
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ # DELETE Tracks → cleanup AlbumArtists table
connection.execute('''
- CREATE TRIGGER IF NOT EXISTS cleanup_artists
- AFTER DELETE ON tracks
- WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0)
- BEGIN
- DELETE FROM artists WHERE id = old.artist;
- END;
- ''')
- # Albums table
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
+ BEGIN
+ DELETE FROM albumartists WHERE id = old.albumartist;
+ END;
+ ''')
+ # DELETE blocklist → Tracks table
connection.execute('''
- CREATE TRIGGER IF NOT EXISTS cleanup_albums
- AFTER DELETE ON tracks
- WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0)
- BEGIN
- DELETE FROM albums WHERE id = old.album;
- END;
- ''')
- # AlbumArtists table
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.track;
+ END;
+ ''')
+ # DELETE blocklist → Artists table
+ # The "SELECT count(*) FROM blocklist" is useless,
+ # there can be only one blocklist.artist
connection.execute('''
- CREATE TRIGGER IF NOT EXISTS cleanup_albumartists
- AFTER DELETE ON tracks
- WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
- BEGIN
- DELETE FROM albumartists WHERE id = old.albumartist;
- END;
- ''')
- self.close_database_connection(connection)
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # DELETE Tracks → Albums table
+ # The "SELECT count(*) FROM blocklist" is useless,
+ # there can be only one blocklist.album
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ connection.close()
+
+ def drop_all(self):
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT name FROM sqlite_master WHERE type='table'")
+ for r in rows.fetchall():
+ connection.execute(f'DROP TABLE IF EXISTS {r[0]}')
+ connection.close()
+
+ def _remove_blocklist_id(self, blid, with_connection=None):
+ """Remove id"""
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ connection = self.get_database_connection()
+ connection.execute('DELETE FROM blocklist'
+ ' WHERE blocklist.id = ?', (blid,))
+ connection.commit()
+ if not with_connection:
+ connection.close()
def _get_album(self, album, connection):
if album.mbid:
rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO albums (name, mbid) VALUES (?, ?)",
rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
- print('damned: %s' % album.mbid)
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
def _get_albumartist(self, artist, connection):
rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
def _get_artist(self, artist, connection):
if artist.mbid:
rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO artists (name, mbid) VALUES (?, ?)",
rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
+
+ def get_genre(self, genre, with_connection=None, add=True):
+ """get genre from the database.
+ if not in database insert new entry.
+
+ :param str genre: genre as a string
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
+ for row in rows:
+ if not with_connection:
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO genres (name) VALUES (?)", (genre,))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
+ for row in rows:
+ if not with_connection:
+ connection.close()
+ return row[0]
def get_track(self, track, with_connection=None, add=True):
- """Get a track from Tracks table, add if not existing,
- Attention: use Track() object!!
- if not in database insert new entry."""
+ """Get a track id from Tracks table, add if not existing,
+ :param sima.lib.track.Track track: track to use
+ :param bool add: add non existing track to database"""
+ if not track.file:
+ raise SimaDBError('Got a track with no file attribute: %r' % track)
if with_connection:
connection = with_connection
else:
"SELECT * FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add: # Not adding non existing track
+ connection.close()
return None
# Get an artist record or None
if track.artist:
(art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
track.file))
connection.commit()
+ # Add track id to junction tables
+ self._add_tracks_genres(track, connection)
rows = connection.execute(
"SELECT id FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ connection.close()
return None
+ def _add_tracks_genres(self, track, connection):
+ if not track.genres:
+ return None
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
+ trk_id = rows.fetchone()[0]
+ for genre in track.genres:
+ # add genre
+ gen_id = self.get_genre(genre)
+ connection.execute("""INSERT INTO tracks_genres (track, genre)
+ VALUES (?, ?)""", (trk_id, gen_id))
+
def add_history(self, track, date=None):
"""Record last play date of track (ie. not a real exhautive play history).
:param track sima.lib.track.Track: track to add to history"""
connection.execute("UPDATE history SET last_play = ? "
" WHERE track = ?", (date, track_id,))
connection.commit()
- self.close_database_connection(connection)
+ connection.close()
def purge_history(self, duration=__HIST_DURATION__):
"""Remove old entries in history
connection = self.get_database_connection()
connection.execute("DELETE FROM history WHERE last_play"
" < datetime('now', '-%i hours')" % duration)
+ connection.execute('VACUUM')
connection.commit()
- self.close_database_connection(connection)
+ connection.close()
- def get_history(self, duration=__HIST_DURATION__):
+ def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
+ """
+ :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist.
+ """
date = datetime.utcnow() - timedelta(hours=duration)
connection = self.get_database_connection()
connection.row_factory = sqlite3.Row
rows = connection.execute("""
- SELECT tracks.title, tracks.file, artists.name AS artist,
- albumartists.name AS albumartist,
- artists.mbid as musicbrainz_artistid,
- albums.name AS album,
- albums.mbid AS musicbrainz_albumid,
- tracks.mbid as musicbrainz_trackid
+ SELECT albums.name AS name,
+ albums.mbid as mbid,
+ artists.name as artist,
+ artists.mbid as artist_mbib
FROM history
JOIN tracks ON history.track = tracks.id
- LEFT OUTER JOIN artists ON tracks.artist = artists.id
- LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
LEFT OUTER JOIN albums ON tracks.album = albums.id
- WHERE history.last_play > ?
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
ORDER BY history.last_play DESC""", (date.isoformat(' '),))
hist = list()
+ for row in rows:
+ vals = dict(row)
+ artist = Artist(name=vals.pop('artist'),
+ mbid=vals.pop('artist_mbib'))
+ if needle:
+ if needle != artist:
+ continue
+ album = Album(**vals, artist=artist)
+ if hist and hist[-1] == album:
+ # remove consecutive dupes
+ continue
+ hist.append(album)
+ connection.close()
+ return hist
+
+ def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__):
+ """Returns a list of Artist objects
+ :param sima.lib.meta.Artist|sima.lib.meta.MetaContainer needle: When specified, returns history for this artist, it's actually testing the artist presence in history.
+ :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only
+ """
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""
+ SELECT artists.name AS name,
+ artists.mbid as mbid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ WHERE history.last_play > ? AND artists.name NOT NULL
+ ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ last = deque(maxlen=1)
+ hist = list()
+ for row in rows:
+ artist = Artist(**row)
+ if last and last[0] == artist: # remove consecutive dupes
+ continue
+ last.append(artist)
+ if needle and isinstance(needle, (Artist, str)):
+ if needle == artist:
+ hist.append(artist) # No need to go further
+ break
+ continue
+ elif needle and getattr(needle, '__contains__'):
+ if artist in needle:
+ hist.append(artist) # No need to go further
+ continue
+ hist.append(artist)
+ connection.close()
+ return hist
+
+ def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20):
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ rows = connection.execute("""
+ SELECT genres.name, artists.name
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre
+ WHERE history.last_play > ?
+ ORDER BY history.last_play DESC
+ """, (date.isoformat(' '),))
+ genres = list()
+ for row in rows:
+ genres.append(row)
+ if len({g[0] for g in genres}) >= limit:
+ break
+ connection.close()
+ return genres
+
+ def fetch_history(self, artist=None, duration=__HIST_DURATION__):
+ """Fetches tracks history, more recent first
+ :param sima.lib.meta.Artist artist: limit history to this artist
+ :param int duration: How long ago to fetch history from
+ """
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ connection.row_factory = sqlite3.Row
+ sql = """
+ SELECT tracks.title, tracks.file, artists.name AS artist,
+ albumartists.name AS albumartist,
+ artists.mbid as musicbrainz_artistid,
+ albums.name AS album,
+ albums.mbid AS musicbrainz_albumid,
+ tracks.mbid as musicbrainz_trackid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ WHERE history.last_play > ?
+ """
+ if artist:
+ if artist.mbid:
+ rows = connection.execute(sql+"""
+ AND artists.mbid = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.mbid))
+ else:
+ rows = connection.execute(sql+"""
+ AND artists.name = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.name))
+ else:
+ rows = connection.execute(sql+'ORDER BY history.last_play DESC',
+ (date.isoformat(' '),))
+ hist = list()
for row in rows:
hist.append(Track(**row))
connection.close()
return hist
+ def get_bl_track(self, track, with_connection=None, add=True):
+ """Add a track to blocklist
+ :param sima.lib.track.Track track: Track object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ track_id = self.get_track(track, with_connection=connection, add=True)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE track = ?", (track_id,))
+ if not rows.fetchone():
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (track) VALUES (?)',
+ (track_id,))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE track = ?", (track_id,))
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
+
+ def get_bl_album(self, album, with_connection=None, add=True):
+ """Add an album to blocklist
+ :param sima.lib.meta.Album: Album object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ album_id = self.get_album(album, with_connection=connection, add=True)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE album = ?", (album_id,))
+ if not rows.fetchone():
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (album) VALUES (?)',
+ (album_id,))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE album = ?", (album_id,))
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
+
+ def get_bl_artist(self, artist, with_connection=None, add=True):
+ """Add an artist to blocklist
+ :param sima.lib.meta.Artist: Artist object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ artist_id = self.get_artist(artist, with_connection=connection, add=True)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
+ if not rows.fetchone():
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (artist) VALUES (?)',
+ (artist_id,))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
+
+ def delete_bl(self, track=None, album=None, artist=None):
+ if not (track or album or artist):
+ return
+ connection = self.get_database_connection()
+ blid = None
+ if track:
+ blid = self.get_bl_track(track, with_connection=connection)
+ if album:
+ blid = self.get_bl_album(album, with_connection=connection)
+ if artist:
+ blid = self.get_bl_artist(artist, with_connection=connection)
+ if not blid:
+ return
+ self._remove_blocklist_id(blid, with_connection=connection)
+ connection.close()
-def main():
- db = SimaDB('/dev/shm/test.sqlite')
- db.create_db()
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8