#
#
"""SQlite database library
-
-https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete
"""
__DB_VERSION__ = 4
self._db_path, isolation_level=None)
return connection
- def close_database_connection(self, connection):
- """Close the database connection."""
- connection.close()
-
def create_db(self):
""" Set up a database
"""
'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
connection.execute( # HISTORY
'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
- 'last_play TIMESTAMP, track integer, '
+ 'last_play TIMESTAMP, track INTEGER, '
'FOREIGN KEY(track) REFERENCES tracks(id))')
connection.execute( # BLOCKLIST
'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
'FOREIGN KEY(artist) REFERENCES artists(id), '
'FOREIGN KEY(album) REFERENCES albums(id), '
'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # Genres (Many-to-many)
+ 'CREATE TABLE IF NOT EXISTS genres '
+ '(id INTEGER PRIMARY KEY, name VARCHAR(100))')
+ connection.execute( # Junction Genres Tracks
+ """CREATE TABLE IF NOT EXISTS tracks_genres
+ ( track INTEGER, genre INTEGER,
+ FOREIGN KEY(track) REFERENCES tracks(id)
+ FOREIGN KEY(genre) REFERENCES genres(id))""")
# Create cleanup triggers:
- # DELETE history → Tracks table
+ # DELETE history → Tracks / Tracks_genres tables
connection.execute('''
CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
AFTER DELETE ON history
(SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
BEGIN
DELETE FROM tracks WHERE id = old.track;
+ DELETE FROM tracks_genres WHERE track = old.track;
+ END;
+ ''')
+ # DELETE Tracks_Genres → Genres table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres
+ AFTER DELETE ON tracks_genres
+ WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0)
+ BEGIN
+ DELETE FROM genres WHERE id = old.genre;
END;
''')
# DELETE Tracks → Artists table
DELETE FROM albums WHERE id = old.album;
END;
''')
- self.close_database_connection(connection)
+ connection.close()
def drop_all(self):
connection = self.get_database_connection()
' WHERE blocklist.id = ?', (blid,))
connection.commit()
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
def _get_album(self, album, connection):
if album.mbid:
rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO albums (name, mbid) VALUES (?, ?)",
rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
- print('damned: %s' % album.mbid)
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
def _get_albumartist(self, artist, connection):
rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
def _get_artist(self, artist, connection):
if artist.mbid:
rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO artists (name, mbid) VALUES (?, ?)",
rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
+
+ def get_genre(self, genre, with_connection=None, add=True):
+ """get genre from the database.
+ if not in database insert new entry.
+
+ :param str genre: genre as a string
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
+ for row in rows:
+ if not with_connection:
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO genres (name) VALUES (?)", (genre,))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
+ for row in rows:
+ if not with_connection:
+ connection.close()
+ return row[0]
def get_track(self, track, with_connection=None, add=True):
- """Get a track from Tracks table, add if not existing,
- Attention: use Track() object!!
- if not in database insert new entry."""
+ """Get a track id from Tracks table, add if not existing,
+ :param sima.lib.track.Track track: track to use
+ :param bool add: add non existing track to database"""
if not track.file:
raise SimaDBError('Got a track with no file attribute: %r' % track)
if with_connection:
"SELECT * FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add: # Not adding non existing track
+ connection.close()
return None
# Get an artist record or None
if track.artist:
(art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
track.file))
connection.commit()
+ # Add track id to junction tables
+ self._add_tracks_genres(track, connection)
rows = connection.execute(
"SELECT id FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ connection.close()
return None
+ def _add_tracks_genres(self, track, connection):
+ if not track.genres:
+ return None
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
+ trk_id = rows.fetchone()[0]
+ for genre in track.genres:
+ # add genre
+ gen_id = self.get_genre(genre)
+ connection.execute("""INSERT INTO tracks_genres (track, genre)
+ VALUES (?, ?)""", (trk_id, gen_id))
+
def add_history(self, track, date=None):
"""Record last play date of track (ie. not a real exhautive play history).
:param track sima.lib.track.Track: track to add to history"""
connection.execute("UPDATE history SET last_play = ? "
" WHERE track = ?", (date, track_id,))
connection.commit()
- self.close_database_connection(connection)
+ connection.close()
def purge_history(self, duration=__HIST_DURATION__):
"""Remove old entries in history
" < datetime('now', '-%i hours')" % duration)
connection.execute('VACUUM')
connection.commit()
- self.close_database_connection(connection)
+ connection.close()
def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
"""
hist.append(artist) # No need to go further
continue
hist.append(artist)
+ connection.close()
return hist
+ def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20):
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ rows = connection.execute("""
+ SELECT genres.name, artists.name
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre
+ WHERE history.last_play > ?
+ ORDER BY history.last_play DESC
+ """, (date.isoformat(' '),))
+ genres = list()
+ for row in rows:
+ genres.append(row)
+ if len({g[0] for g in genres}) >= limit:
+ break
+ connection.close()
+ return genres
+
def fetch_history(self, artist=None, duration=__HIST_DURATION__):
"""Fetches tracks history, more recent first
:param sima.lib.meta.Artist artist: limit history to this artist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE track = ?", (track_id,))
- return rows.fetchone()[0]
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
def get_bl_album(self, album, with_connection=None, add=True):
"""Add an album to blocklist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE album = ?", (album_id,))
- return rows.fetchone()[0]
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
def get_bl_artist(self, artist, with_connection=None, add=True):
"""Add an artist to blocklist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
- return rows.fetchone()[0]
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
def delete_bl(self, track=None, album=None, artist=None):
if not (track or album or artist):
if not blid:
return
self._remove_blocklist_id(blid, with_connection=connection)
+ connection.close()
# VIM MODLINE