From: kaliko Date: Wed, 28 Apr 2021 11:06:36 +0000 (+0200) Subject: Rewrote simadb X-Git-Tag: 0.18.0~72 X-Git-Url: https://git.kaliko.me/?a=commitdiff_plain;h=9be53ff0c7f6dfeaa62bf0964f7d3a477c10499e;p=mpd-sima.git Rewrote simadb --- diff --git a/sima/lib/db.py b/sima/lib/db.py new file mode 100644 index 0000000..6d7836a --- /dev/null +++ b/sima/lib/db.py @@ -0,0 +1,358 @@ +# Copyright (c) 2009-2013, 2019-2021 kaliko +# +# This file is part of sima +# +# sima is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# sima is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sima. If not, see . +# +# +"""SQlite database library + +https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete +""" + +__DB_VERSION__ = 4 +__HIST_DURATION__ = int(30 * 24) # in hours + +import sqlite3 + +from datetime import (datetime, timedelta) + +from sima.lib.meta import Artist, Album +from sima.lib.track import Track + + +class SimaDB: + "SQLite management" + + def __init__(self, db_path=None): + self._db_path = db_path + + def get_database_connection(self): + """get database reference""" + connection = sqlite3.connect( + self._db_path, isolation_level=None) + return connection + + def close_database_connection(self, connection): + """Close the database connection.""" + connection.close() + + def create_db(self): + """ Set up a database + """ + connection = self.get_database_connection() + connection.execute( + 'CREATE TABLE IF NOT EXISTS db_info' + ' (name CHAR(50), value CHAR(50))') + connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ? + WHERE NOT EXISTS + ( SELECT 1 FROM db_info WHERE name = ? )''', + ('DB Version', __DB_VERSION__, 'DB Version')) + connection.execute( # ARTISTS + 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, ' + 'name VARCHAR(100), mbid CHAR(36))') + connection.execute( # ALBUMS + 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, ' + 'name VARCHAR(100), mbid CHAR(36))') + connection.execute( # ALBUMARTISTS + 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, ' + 'name VARCHAR(100), mbid CHAR(36))') + connection.execute( # TRACKS + 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, ' + 'title VARCHAR(100), artist INTEGER, ' + 'album INTEGER, albumartist INTEGER, ' + 'file VARCHAR(500), mbid CHAR(36), ' + 'FOREIGN KEY(artist) REFERENCES artists(id), ' + 'FOREIGN KEY(album) REFERENCES albums(id), ' + 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))') + connection.execute( # HISTORY + 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, ' + 'last_play TIMESTAMP, track integer, ' + 'FOREIGN KEY(track) REFERENCES tracks(id))') + # Create cleanup triggers: + # Tracks table + connection.execute(''' + CREATE TRIGGER IF NOT EXISTS cleanup_tracks + AFTER DELETE ON history + WHEN ((SELECT count(*) FROM history WHERE track=old.id) = 0) + BEGIN + DELETE FROM tracks WHERE id = old.id; + END; + ''') + # Artists table + connection.execute(''' + CREATE TRIGGER IF NOT EXISTS cleanup_artists + AFTER DELETE ON tracks + WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0) + BEGIN + DELETE FROM artists WHERE id = old.artist; + END; + ''') + # Albums table + connection.execute(''' + CREATE TRIGGER IF NOT EXISTS cleanup_albums + AFTER DELETE ON tracks + WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0) + BEGIN + DELETE FROM albums WHERE id = old.album; + END; + ''') + # AlbumArtists table + connection.execute(''' + CREATE TRIGGER IF NOT EXISTS cleanup_albumartists + AFTER DELETE ON tracks + WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0) + BEGIN + DELETE FROM albumartists WHERE id = old.albumartist; + END; + ''') + self.close_database_connection(connection) + + def _get_album(self, album, connection): + if album.mbid: + return connection.execute( + "SELECT id FROM albums WHERE mbid = ?", + (album.mbid,)) + else: + return connection.execute( + "SELECT id FROM albums WHERE name = ? AND mbid IS NULL", + (album.name,)) + + def get_album(self, album, with_connection=None, add=True): + """get album information from the database. + if not in database insert new entry. + + :param sima.lib.meta.Album album: album objet + :param sqlite3.Connection with_connection: SQLite connection + """ + if with_connection: + connection = with_connection + else: + connection = self.get_database_connection() + rows = self._get_album(album, connection) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not add: + if not with_connection: + self.close_database_connection(connection) + return None + connection.execute( + "INSERT INTO albums (name, mbid) VALUES (?, ?)", + (album.name, album.mbid)) + connection.commit() + rows = self._get_album(album, connection) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + print('damned: %s' % album.mbid) + if not with_connection: + self.close_database_connection(connection) + return None + + def _get_albumartist(self, artist, connection): + if artist.mbid: + return connection.execute( + "SELECT id FROM albumartists WHERE mbid = ?", + (artist.mbid,)) + else: + return connection.execute( + "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL", + (artist.name,)) + + def get_albumartist(self, artist, with_connection=None, add=True): + """get albumartist information from the database. + if not in database insert new entry. + + :param sima.lib.meta.Artist artist: artist + :param sqlite3.Connection with_connection: SQLite connection + """ + if with_connection: + connection = with_connection + else: + connection = self.get_database_connection() + rows = self._get_albumartist(artist, connection) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not add: + if not with_connection: + self.close_database_connection(connection) + return None + connection.execute( + "INSERT INTO albumartists (name, mbid) VALUES (?, ?)", + (artist.name, artist.mbid)) + connection.commit() + rows = self._get_albumartist(artist, connection) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not with_connection: + self.close_database_connection(connection) + + def _get_artist(self, artist, connection): + if artist.mbid: + return connection.execute( + "SELECT id FROM artists WHERE mbid = ?", + (artist.mbid,)) + else: + return connection.execute( + "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,)) + + def get_artist(self, artist, with_connection=None, add=True): + """get artist information from the database. + if not in database insert new entry. + + :param sima.lib.meta.Artist artist: artist + :param sqlite3.Connection with_connection: SQLite connection + """ + if with_connection: + connection = with_connection + else: + connection = self.get_database_connection() + rows = self._get_artist(artist, connection) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not add: + if not with_connection: + self.close_database_connection(connection) + return None + connection.execute( + "INSERT INTO artists (name, mbid) VALUES (?, ?)", + (artist.name, artist.mbid)) + connection.commit() + rows = self._get_artist(artist, connection) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not with_connection: + self.close_database_connection(connection) + + def get_track(self, track, with_connection=None, add=True): + """Get a track from Tracks table, add if not existing, + Attention: use Track() object!! + if not in database insert new entry.""" + if with_connection: + connection = with_connection + else: + connection = self.get_database_connection() + rows = connection.execute( + "SELECT * FROM tracks WHERE file = ?", (track.file,)) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not add: # Not adding non existing track + return None + # Get an artist record or None + if track.artist: + art = Artist(name=track.artist, mbid=track.musicbrainz_artistid) + art_id = self.get_artist(art, with_connection=connection) + else: + art_id = None + # Get an albumartist record or None + if track.albumartist: + albart = Artist(name=track.albumartist, + mbid=track.musicbrainz_albumartistid) + albart_id = self.get_albumartist(albart, with_connection=connection) + else: + albart_id = None + # Get an album record or None + if track.album: + alb = Album(name=track.album, mbid=track.musicbrainz_albumid) + alb_id = self.get_album(alb, with_connection=connection) + else: + alb_id = None + connection.execute( + """INSERT INTO tracks (artist, albumartist, album, title, mbid, file) + VALUES (?, ?, ?, ?, ?, ?)""", + (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid, + track.file)) + connection.commit() + rows = connection.execute( + "SELECT id FROM tracks WHERE file = ?", (track.file,)) + for row in rows: + if not with_connection: + self.close_database_connection(connection) + return row[0] + if not with_connection: + connection.commit() + self.close_database_connection(connection) + return None + + def add_history(self, track, date=None): + """Record last play date of track (ie. not a real exhautive play history). + :param track sima.lib.track.Track: track to add to history""" + if not date: + date = datetime.now() + connection = self.get_database_connection() + track_id = self.get_track(track, with_connection=connection) + rows = connection.execute("SELECT * FROM history WHERE track = ? ", + (track_id,)) + if not rows.fetchone(): + connection.execute("INSERT INTO history (track) VALUES (?)", + (track_id,)) + connection.execute("UPDATE history SET last_play = ? " + " WHERE track = ?", (date, track_id,)) + connection.commit() + self.close_database_connection(connection) + + def purge_history(self, duration=__HIST_DURATION__): + """Remove old entries in history + :param duration int: Purge history record older than duration in hours + (defaults to __HIST_DURATION__)""" + connection = self.get_database_connection() + connection.execute("DELETE FROM history WHERE last_play" + " < datetime('now', '-%i hours')" % duration) + connection.commit() + self.close_database_connection(connection) + + def get_history(self, duration=__HIST_DURATION__): + date = datetime.utcnow() - timedelta(hours=duration) + connection = self.get_database_connection() + connection.row_factory = sqlite3.Row + rows = connection.execute(""" + SELECT tracks.title, tracks.file, artists.name AS artist, + albumartists.name AS albumartist, + artists.mbid as musicbrainz_artistid, + albums.name AS album, + albums.mbid AS musicbrainz_albumid, + tracks.mbid as musicbrainz_trackid + FROM history + JOIN tracks ON history.track = tracks.id + LEFT OUTER JOIN artists ON tracks.artist = artists.id + LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id + LEFT OUTER JOIN albums ON tracks.album = albums.id + WHERE history.last_play > ? + ORDER BY history.last_play DESC""", (date.isoformat(' '),)) + hist = list() + for row in rows: + hist.append(Track(**row)) + connection.close() + return hist + + +def main(): + db = SimaDB('/dev/shm/test.sqlite') + db.create_db() + +# VIM MODLINE +# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8 diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..2419d44 --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,125 @@ +# coding: utf-8 + +import unittest +import os +import datetime + +from sima.lib.db import SimaDB +from sima.lib.track import Track + + +DEVOLT = { + 'album': 'Grey', + 'albumartist': 'Devolt', + 'albumartistsort': 'Devolt', + 'artist': 'Devolt', + 'date': '2011-12-01', + 'disc': '1/1', + 'file': 'music/Devolt/2011-Grey/03-Devolt - Crazy.mp3', + 'last-modified': '2012-04-02T20:48:59Z', + 'musicbrainz_albumartistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99', + 'musicbrainz_albumid': 'ea2ef2cf-59e1-443a-817e-9066e3e0be4b', + 'musicbrainz_artistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99', + 'musicbrainz_trackid': 'fabf8fc9-2ae5-49c9-8214-a839c958d872', + 'time': '220', + 'duration': '220.000', + 'title': 'Crazy', + 'track': '3/6'} + + +class Main_TestDB(unittest.TestCase): + db_file = 'file::memory:?cache=shared' + #db_file = '/dev/shm/unittest.sqlite' + + @classmethod + def setUpClass(self): + self.db = SimaDB(db_path=self.db_file) + # Maintain a connection to keep the database between test cases + self.conn = self.db.get_database_connection() + + @classmethod + def tearDownClass(self): + self.conn.close() + + +class TestDB(Main_TestDB): + + def test_00_recreation(self): + self.db.create_db() + + def test_01_add_track(self): + trk = Track(**DEVOLT) + trk_id = self.db.get_track(trk) + self.assertEqual(trk_id, self.db.get_track(trk), + 'Same track, same record') + + def test_02_history(self): + curr = datetime.datetime.utcnow() + # set records in the past to ease purging then + last = curr - datetime.timedelta(hours=1) + trk = Track(**DEVOLT) + self.db.add_history(trk, date=last) + self.db.add_history(trk, date=last) + hist = self.db.get_history() + self.assertEqual(len(hist), 1, 'same track results in a single record') + + trk_foo = Track(file="/foo/bar/baz.flac") + self.db.add_history(trk_foo, date=last) + hist = self.db.get_history() + self.assertEqual(len(hist), 2) + + self.db.add_history(trk, date=last) + hist = self.db.get_history() + self.assertEqual(len(hist), 2) + self.db.purge_history(duration=0) + hist = self.db.get_history() + self.assertEqual(len(hist), 0) + + # Controls we got history in the right order + # recent first, oldest last + hist = list() + for i in range(1, 5): # starts at 1 to ensure records are in the past + trk = Track(file=f'/foo/bar.{i}', name='{i}-baz', album='foolbum') + hist.append(trk) + last = curr - datetime.timedelta(minutes=i) + self.db.add_history(trk, date=last) + hist_records = self.db.get_history() + self.assertEqual(hist, hist_records) + self.db.purge_history(duration=0) + + def test_04_triggers(self): + self.db.purge_history(duration=0) + curr = datetime.datetime.utcnow() + tracks_ids = list() + # Set 4 records, same album + for i in range(1, 6): # starts at 1 to ensure records are in the past + trk = Track(file=f'/foo/{i}', name=f'{i}', artist='fooart', + albumartist='fooalbart', album='foolbum',) + tracks_ids.append(self.db.get_track(trk)) # Add track, save its DB id + # set records in the past to ease purging then + last = curr - datetime.timedelta(minutes=i) + self.db.add_history(trk, date=last) # Add to history + conn = self.db.get_database_connection() + # Add another track not related (not same album) + track = Track(file='/baz/bar.baz', name='baz', artist='fooart', + albumartist='not-same', album='not-same',) + self.db.get_track(track) + # for tid in tracks_ids: + for tid in tracks_ids[:-1]: + # Delete lastest record + conn.execute('DELETE FROM history WHERE history.track = ?', (tid,)) + c = conn.execute('SELECT albums.name FROM albums;') + # There are still albums records (still a history using it) + self.assertIn((trk.album,), c.fetchall()) + # purging last entry in history or album == trk.album + c.execute('DELETE FROM history WHERE history.track = ?', + (tracks_ids[-1],)) + # triggers purge other tables if possible + c.execute('SELECT albums.name FROM albums;') + albums = c.fetchall() + self.assertNotIn(('foolbum',), albums) + conn.close() + + +# VIM MODLINE +# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8