#
#
"""SQlite database library
-
-https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete
"""
__DB_VERSION__ = 4
import sqlite3
+from collections import deque
from datetime import (datetime, timedelta)
from sima.lib.meta import Artist, Album
self._db_path, isolation_level=None)
return connection
- def close_database_connection(self, connection):
- """Close the database connection."""
- connection.close()
-
def create_db(self):
""" Set up a database
"""
'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
connection.execute( # HISTORY
'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
- 'last_play TIMESTAMP, track integer, '
+ 'last_play TIMESTAMP, track INTEGER, '
'FOREIGN KEY(track) REFERENCES tracks(id))')
connection.execute( # BLOCKLIST
'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
'FOREIGN KEY(artist) REFERENCES artists(id), '
'FOREIGN KEY(album) REFERENCES albums(id), '
'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # Genres (Many-to-many)
+ 'CREATE TABLE IF NOT EXISTS genres '
+ '(id INTEGER PRIMARY KEY, name VARCHAR(100))')
+ connection.execute( # Junction Genres Tracks
+ """CREATE TABLE IF NOT EXISTS tracks_genres
+ ( track INTEGER, genre INTEGER,
+ FOREIGN KEY(track) REFERENCES tracks(id)
+ FOREIGN KEY(genre) REFERENCES genres(id))""")
# Create cleanup triggers:
- # DELETE history → Tracks table
+ # DELETE history → Tracks / Tracks_genres tables
connection.execute('''
CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
AFTER DELETE ON history
(SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
BEGIN
DELETE FROM tracks WHERE id = old.track;
+ DELETE FROM tracks_genres WHERE track = old.track;
+ END;
+ ''')
+ # DELETE Tracks_Genres → Genres table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres
+ AFTER DELETE ON tracks_genres
+ WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0)
+ BEGIN
+ DELETE FROM genres WHERE id = old.genre;
END;
''')
# DELETE Tracks → Artists table
DELETE FROM albums WHERE id = old.album;
END;
''')
- self.close_database_connection(connection)
+ connection.close()
def drop_all(self):
connection = self.get_database_connection()
' WHERE blocklist.id = ?', (blid,))
connection.commit()
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
def _get_album(self, album, connection):
if album.mbid:
rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO albums (name, mbid) VALUES (?, ?)",
rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
- print('damned: %s' % album.mbid)
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
def _get_albumartist(self, artist, connection):
rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
def _get_artist(self, artist, connection):
if artist.mbid:
rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return None
connection.execute(
"INSERT INTO artists (name, mbid) VALUES (?, ?)",
rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
+
+ def get_genre(self, genre, with_connection=None, add=True):
+ """get genre from the database.
+ if not in database insert new entry.
+
+ :param str genre: genre as a string
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
+ for row in rows:
+ if not with_connection:
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO genres (name) VALUES (?)", (genre,))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
+ for row in rows:
+ if not with_connection:
+ connection.close()
+ return row[0]
def get_track(self, track, with_connection=None, add=True):
- """Get a track from Tracks table, add if not existing,
- Attention: use Track() object!!
- if not in database insert new entry."""
+ """Get a track id from Tracks table, add if not existing,
+ :param sima.lib.track.Track track: track to use
+ :param bool add: add non existing track to database"""
if not track.file:
raise SimaDBError('Got a track with no file attribute: %r' % track)
if with_connection:
"SELECT * FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not add: # Not adding non existing track
+ connection.close()
return None
# Get an artist record or None
if track.artist:
(art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
track.file))
connection.commit()
+ # Add track id to junction tables
+ self._add_tracks_genres(track, connection)
rows = connection.execute(
"SELECT id FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
return row[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ connection.close()
return None
+ def _add_tracks_genres(self, track, connection):
+ if not track.genres:
+ return None
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
+ trk_id = rows.fetchone()[0]
+ for genre in track.genres:
+ # add genre
+ gen_id = self.get_genre(genre)
+ connection.execute("""INSERT INTO tracks_genres (track, genre)
+ VALUES (?, ?)""", (trk_id, gen_id))
+
def add_history(self, track, date=None):
"""Record last play date of track (ie. not a real exhautive play history).
:param track sima.lib.track.Track: track to add to history"""
connection.execute("UPDATE history SET last_play = ? "
" WHERE track = ?", (date, track_id,))
connection.commit()
- self.close_database_connection(connection)
+ connection.close()
def purge_history(self, duration=__HIST_DURATION__):
"""Remove old entries in history
connection = self.get_database_connection()
connection.execute("DELETE FROM history WHERE last_play"
" < datetime('now', '-%i hours')" % duration)
+ connection.execute('VACUUM')
connection.commit()
- self.close_database_connection(connection)
+ connection.close()
- def fetch_artists_history(self, duration=__HIST_DURATION__):
+ def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
+ """
+ :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist.
+ """
date = datetime.utcnow() - timedelta(hours=duration)
connection = self.get_database_connection()
connection.row_factory = sqlite3.Row
rows = connection.execute("""
- SELECT artists.name AS name,
- artists.mbid as mbid
+ SELECT albums.name AS name,
+ albums.mbid as mbid,
+ artists.name as artist,
+ artists.mbid as artist_mbib
FROM history
JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
LEFT OUTER JOIN artists ON tracks.artist = artists.id
- WHERE history.last_play > ?
+ WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
ORDER BY history.last_play DESC""", (date.isoformat(' '),))
hist = list()
for row in rows:
- if hist and hist[-1] == Album(**row): # remove consecutive dupes
+ vals = dict(row)
+ artist = Artist(name=vals.pop('artist'),
+ mbid=vals.pop('artist_mbib'))
+ if needle:
+ if needle != artist:
+ continue
+ album = Album(**vals, artist=artist)
+ if hist and hist[-1] == album:
+ # remove consecutive dupes
continue
- hist.append(Album(**row))
+ hist.append(album)
connection.close()
return hist
- def fetch_history(self, duration=__HIST_DURATION__):
- """Fetches tracks history, more recent first
- :param int duration: How long ago to fetch history from
+ def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__):
+ """Returns a list of Artist objects
+ :param sima.lib.meta.Artist|sima.lib.meta.MetaContainer needle: When specified, returns history for this artist, it's actually testing the artist presence in history.
+ :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only
"""
date = datetime.utcnow() - timedelta(hours=duration)
connection = self.get_database_connection()
connection.row_factory = sqlite3.Row
rows = connection.execute("""
- SELECT tracks.title, tracks.file, artists.name AS artist,
- albumartists.name AS albumartist,
- artists.mbid as musicbrainz_artistid,
- albums.name AS album,
- albums.mbid AS musicbrainz_albumid,
- tracks.mbid as musicbrainz_trackid
+ SELECT artists.name AS name,
+ artists.mbid as mbid
FROM history
JOIN tracks ON history.track = tracks.id
LEFT OUTER JOIN artists ON tracks.artist = artists.id
- LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
- LEFT OUTER JOIN albums ON tracks.album = albums.id
- WHERE history.last_play > ?
+ WHERE history.last_play > ? AND artists.name NOT NULL
ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ last = deque(maxlen=1)
+ hist = list()
+ for row in rows:
+ artist = Artist(**row)
+ if last and last[0] == artist: # remove consecutive dupes
+ continue
+ last.append(artist)
+ if needle and isinstance(needle, (Artist, str)):
+ if needle == artist:
+ hist.append(artist) # No need to go further
+ break
+ continue
+ elif needle and getattr(needle, '__contains__'):
+ if artist in needle:
+ hist.append(artist) # No need to go further
+ continue
+ hist.append(artist)
+ connection.close()
+ return hist
+
+ def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20):
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ rows = connection.execute("""
+ SELECT genres.name, artists.name
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre
+ WHERE history.last_play > ?
+ ORDER BY history.last_play DESC
+ """, (date.isoformat(' '),))
+ genres = list()
+ for row in rows:
+ genres.append(row)
+ if len({g[0] for g in genres}) >= limit:
+ break
+ connection.close()
+ return genres
+
+ def fetch_history(self, artist=None, duration=__HIST_DURATION__):
+ """Fetches tracks history, more recent first
+ :param sima.lib.meta.Artist artist: limit history to this artist
+ :param int duration: How long ago to fetch history from
+ """
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ connection.row_factory = sqlite3.Row
+ sql = """
+ SELECT tracks.title, tracks.file, artists.name AS artist,
+ albumartists.name AS albumartist,
+ artists.mbid as musicbrainz_artistid,
+ albums.name AS album,
+ albums.mbid AS musicbrainz_albumid,
+ tracks.mbid as musicbrainz_trackid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ WHERE history.last_play > ?
+ """
+ if artist:
+ if artist.mbid:
+ rows = connection.execute(sql+"""
+ AND artists.mbid = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.mbid))
+ else:
+ rows = connection.execute(sql+"""
+ AND artists.name = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.name))
+ else:
+ rows = connection.execute(sql+'ORDER BY history.last_play DESC',
+ (date.isoformat(' '),))
hist = list()
for row in rows:
hist.append(Track(**row))
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE track = ?", (track_id,))
- return rows.fetchone()[0]
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
def get_bl_album(self, album, with_connection=None, add=True):
"""Add an album to blocklist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE album = ?", (album_id,))
- return rows.fetchone()[0]
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
def get_bl_artist(self, artist, with_connection=None, add=True):
"""Add an artist to blocklist
connection.commit()
rows = connection.execute(
"SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
- return rows.fetchone()[0]
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
def delete_bl(self, track=None, album=None, artist=None):
if not (track or album or artist):
if not blid:
return
self._remove_blocklist_id(blid, with_connection=connection)
+ connection.close()
-def main():
- DEVOLT = {
- 'album': 'Grey',
- 'albumartist': 'Devolt',
- 'artist': 'Devolt',
- 'date': '2011-12-01',
- 'file': 'music/Devolt/2011-Grey/03-Devolt - Crazy.mp3',
- 'musicbrainz_albumartistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99',
- 'musicbrainz_albumid': 'ea2ef2cf-59e1-443a-817e-9066e3e0be4b',
- 'musicbrainz_artistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99',
- 'musicbrainz_trackid': 'fabf8fc9-2ae5-49c9-8214-a839c958d872',
- 'duration': '220.000',
- 'title': 'Crazy'}
- db = SimaDB('/dev/shm/test.sqlite')
- db.create_db()
- db.add_history(Track(**DEVOLT))
- DEVOLT['file'] = 'foo'
- print(db.get_bl_track(Track(**DEVOLT)))
- db.add_history(Track(**DEVOLT))
# VIM MODLINE
# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8