--- /dev/null
+# Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
+#
+# This file is part of sima
+#
+# sima is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# sima is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sima. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+"""SQlite database library
+
+https://stackoverflow.com/questions/62818662/sqlite-foreign-key-reverse-cascade-delete
+"""
+
+__DB_VERSION__ = 4
+__HIST_DURATION__ = int(30 * 24) # in hours
+
+import sqlite3
+
+from datetime import (datetime, timedelta)
+
+from sima.lib.meta import Artist, Album
+from sima.lib.track import Track
+
+
+class SimaDB:
+ "SQLite management"
+
+ def __init__(self, db_path=None):
+ self._db_path = db_path
+
+ def get_database_connection(self):
+ """get database reference"""
+ connection = sqlite3.connect(
+ self._db_path, isolation_level=None)
+ return connection
+
+ def close_database_connection(self, connection):
+ """Close the database connection."""
+ connection.close()
+
+ def create_db(self):
+ """ Set up a database
+ """
+ connection = self.get_database_connection()
+ connection.execute(
+ 'CREATE TABLE IF NOT EXISTS db_info'
+ ' (name CHAR(50), value CHAR(50))')
+ connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ?
+ WHERE NOT EXISTS
+ ( SELECT 1 FROM db_info WHERE name = ? )''',
+ ('DB Version', __DB_VERSION__, 'DB Version'))
+ connection.execute( # ARTISTS
+ 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # ALBUMS
+ 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # ALBUMARTISTS
+ 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # TRACKS
+ 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, '
+ 'title VARCHAR(100), artist INTEGER, '
+ 'album INTEGER, albumartist INTEGER, '
+ 'file VARCHAR(500), mbid CHAR(36), '
+ 'FOREIGN KEY(artist) REFERENCES artists(id), '
+ 'FOREIGN KEY(album) REFERENCES albums(id), '
+ 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
+ connection.execute( # HISTORY
+ 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
+ 'last_play TIMESTAMP, track integer, '
+ 'FOREIGN KEY(track) REFERENCES tracks(id))')
+ # Create cleanup triggers:
+ # Tracks table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS cleanup_tracks
+ AFTER DELETE ON history
+ WHEN ((SELECT count(*) FROM history WHERE track=old.id) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.id;
+ END;
+ ''')
+ # Artists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS cleanup_artists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # Albums table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS cleanup_albums
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ # AlbumArtists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS cleanup_albumartists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
+ BEGIN
+ DELETE FROM albumartists WHERE id = old.albumartist;
+ END;
+ ''')
+ self.close_database_connection(connection)
+
+ def _get_album(self, album, connection):
+ if album.mbid:
+ return connection.execute(
+ "SELECT id FROM albums WHERE mbid = ?",
+ (album.mbid,))
+ else:
+ return connection.execute(
+ "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
+ (album.name,))
+
+ def get_album(self, album, with_connection=None, add=True):
+ """get album information from the database.
+ if not in database insert new entry.
+
+ :param sima.lib.meta.Album album: album objet
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = self._get_album(album, connection)
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not add:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return None
+ connection.execute(
+ "INSERT INTO albums (name, mbid) VALUES (?, ?)",
+ (album.name, album.mbid))
+ connection.commit()
+ rows = self._get_album(album, connection)
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ print('damned: %s' % album.mbid)
+ if not with_connection:
+ self.close_database_connection(connection)
+ return None
+
+ def _get_albumartist(self, artist, connection):
+ if artist.mbid:
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE mbid = ?",
+ (artist.mbid,))
+ else:
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
+ (artist.name,))
+
+ def get_albumartist(self, artist, with_connection=None, add=True):
+ """get albumartist information from the database.
+ if not in database insert new entry.
+
+ :param sima.lib.meta.Artist artist: artist
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = self._get_albumartist(artist, connection)
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not add:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return None
+ connection.execute(
+ "INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
+ (artist.name, artist.mbid))
+ connection.commit()
+ rows = self._get_albumartist(artist, connection)
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not with_connection:
+ self.close_database_connection(connection)
+
+ def _get_artist(self, artist, connection):
+ if artist.mbid:
+ return connection.execute(
+ "SELECT id FROM artists WHERE mbid = ?",
+ (artist.mbid,))
+ else:
+ return connection.execute(
+ "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
+
+ def get_artist(self, artist, with_connection=None, add=True):
+ """get artist information from the database.
+ if not in database insert new entry.
+
+ :param sima.lib.meta.Artist artist: artist
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = self._get_artist(artist, connection)
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not add:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return None
+ connection.execute(
+ "INSERT INTO artists (name, mbid) VALUES (?, ?)",
+ (artist.name, artist.mbid))
+ connection.commit()
+ rows = self._get_artist(artist, connection)
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not with_connection:
+ self.close_database_connection(connection)
+
+ def get_track(self, track, with_connection=None, add=True):
+ """Get a track from Tracks table, add if not existing,
+ Attention: use Track() object!!
+ if not in database insert new entry."""
+ if with_connection:
+ connection = with_connection
+ else:
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT * FROM tracks WHERE file = ?", (track.file,))
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not add: # Not adding non existing track
+ return None
+ # Get an artist record or None
+ if track.artist:
+ art = Artist(name=track.artist, mbid=track.musicbrainz_artistid)
+ art_id = self.get_artist(art, with_connection=connection)
+ else:
+ art_id = None
+ # Get an albumartist record or None
+ if track.albumartist:
+ albart = Artist(name=track.albumartist,
+ mbid=track.musicbrainz_albumartistid)
+ albart_id = self.get_albumartist(albart, with_connection=connection)
+ else:
+ albart_id = None
+ # Get an album record or None
+ if track.album:
+ alb = Album(name=track.album, mbid=track.musicbrainz_albumid)
+ alb_id = self.get_album(alb, with_connection=connection)
+ else:
+ alb_id = None
+ connection.execute(
+ """INSERT INTO tracks (artist, albumartist, album, title, mbid, file)
+ VALUES (?, ?, ?, ?, ?, ?)""",
+ (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
+ track.file))
+ connection.commit()
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
+ for row in rows:
+ if not with_connection:
+ self.close_database_connection(connection)
+ return row[0]
+ if not with_connection:
+ connection.commit()
+ self.close_database_connection(connection)
+ return None
+
+ def add_history(self, track, date=None):
+ """Record last play date of track (ie. not a real exhautive play history).
+ :param track sima.lib.track.Track: track to add to history"""
+ if not date:
+ date = datetime.now()
+ connection = self.get_database_connection()
+ track_id = self.get_track(track, with_connection=connection)
+ rows = connection.execute("SELECT * FROM history WHERE track = ? ",
+ (track_id,))
+ if not rows.fetchone():
+ connection.execute("INSERT INTO history (track) VALUES (?)",
+ (track_id,))
+ connection.execute("UPDATE history SET last_play = ? "
+ " WHERE track = ?", (date, track_id,))
+ connection.commit()
+ self.close_database_connection(connection)
+
+ def purge_history(self, duration=__HIST_DURATION__):
+ """Remove old entries in history
+ :param duration int: Purge history record older than duration in hours
+ (defaults to __HIST_DURATION__)"""
+ connection = self.get_database_connection()
+ connection.execute("DELETE FROM history WHERE last_play"
+ " < datetime('now', '-%i hours')" % duration)
+ connection.commit()
+ self.close_database_connection(connection)
+
+ def get_history(self, duration=__HIST_DURATION__):
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""
+ SELECT tracks.title, tracks.file, artists.name AS artist,
+ albumartists.name AS albumartist,
+ artists.mbid as musicbrainz_artistid,
+ albums.name AS album,
+ albums.mbid AS musicbrainz_albumid,
+ tracks.mbid as musicbrainz_trackid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ WHERE history.last_play > ?
+ ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ hist = list()
+ for row in rows:
+ hist.append(Track(**row))
+ connection.close()
+ return hist
+
+
+def main():
+ db = SimaDB('/dev/shm/test.sqlite')
+ db.create_db()
+
+# VIM MODLINE
+# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8
--- /dev/null
+# coding: utf-8
+
+import unittest
+import os
+import datetime
+
+from sima.lib.db import SimaDB
+from sima.lib.track import Track
+
+
+DEVOLT = {
+ 'album': 'Grey',
+ 'albumartist': 'Devolt',
+ 'albumartistsort': 'Devolt',
+ 'artist': 'Devolt',
+ 'date': '2011-12-01',
+ 'disc': '1/1',
+ 'file': 'music/Devolt/2011-Grey/03-Devolt - Crazy.mp3',
+ 'last-modified': '2012-04-02T20:48:59Z',
+ 'musicbrainz_albumartistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99',
+ 'musicbrainz_albumid': 'ea2ef2cf-59e1-443a-817e-9066e3e0be4b',
+ 'musicbrainz_artistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99',
+ 'musicbrainz_trackid': 'fabf8fc9-2ae5-49c9-8214-a839c958d872',
+ 'time': '220',
+ 'duration': '220.000',
+ 'title': 'Crazy',
+ 'track': '3/6'}
+
+
+class Main_TestDB(unittest.TestCase):
+ db_file = 'file::memory:?cache=shared'
+ #db_file = '/dev/shm/unittest.sqlite'
+
+ @classmethod
+ def setUpClass(self):
+ self.db = SimaDB(db_path=self.db_file)
+ # Maintain a connection to keep the database between test cases
+ self.conn = self.db.get_database_connection()
+
+ @classmethod
+ def tearDownClass(self):
+ self.conn.close()
+
+
+class TestDB(Main_TestDB):
+
+ def test_00_recreation(self):
+ self.db.create_db()
+
+ def test_01_add_track(self):
+ trk = Track(**DEVOLT)
+ trk_id = self.db.get_track(trk)
+ self.assertEqual(trk_id, self.db.get_track(trk),
+ 'Same track, same record')
+
+ def test_02_history(self):
+ curr = datetime.datetime.utcnow()
+ # set records in the past to ease purging then
+ last = curr - datetime.timedelta(hours=1)
+ trk = Track(**DEVOLT)
+ self.db.add_history(trk, date=last)
+ self.db.add_history(trk, date=last)
+ hist = self.db.get_history()
+ self.assertEqual(len(hist), 1, 'same track results in a single record')
+
+ trk_foo = Track(file="/foo/bar/baz.flac")
+ self.db.add_history(trk_foo, date=last)
+ hist = self.db.get_history()
+ self.assertEqual(len(hist), 2)
+
+ self.db.add_history(trk, date=last)
+ hist = self.db.get_history()
+ self.assertEqual(len(hist), 2)
+ self.db.purge_history(duration=0)
+ hist = self.db.get_history()
+ self.assertEqual(len(hist), 0)
+
+ # Controls we got history in the right order
+ # recent first, oldest last
+ hist = list()
+ for i in range(1, 5): # starts at 1 to ensure records are in the past
+ trk = Track(file=f'/foo/bar.{i}', name='{i}-baz', album='foolbum')
+ hist.append(trk)
+ last = curr - datetime.timedelta(minutes=i)
+ self.db.add_history(trk, date=last)
+ hist_records = self.db.get_history()
+ self.assertEqual(hist, hist_records)
+ self.db.purge_history(duration=0)
+
+ def test_04_triggers(self):
+ self.db.purge_history(duration=0)
+ curr = datetime.datetime.utcnow()
+ tracks_ids = list()
+ # Set 4 records, same album
+ for i in range(1, 6): # starts at 1 to ensure records are in the past
+ trk = Track(file=f'/foo/{i}', name=f'{i}', artist='fooart',
+ albumartist='fooalbart', album='foolbum',)
+ tracks_ids.append(self.db.get_track(trk)) # Add track, save its DB id
+ # set records in the past to ease purging then
+ last = curr - datetime.timedelta(minutes=i)
+ self.db.add_history(trk, date=last) # Add to history
+ conn = self.db.get_database_connection()
+ # Add another track not related (not same album)
+ track = Track(file='/baz/bar.baz', name='baz', artist='fooart',
+ albumartist='not-same', album='not-same',)
+ self.db.get_track(track)
+ # for tid in tracks_ids:
+ for tid in tracks_ids[:-1]:
+ # Delete lastest record
+ conn.execute('DELETE FROM history WHERE history.track = ?', (tid,))
+ c = conn.execute('SELECT albums.name FROM albums;')
+ # There are still albums records (still a history using it)
+ self.assertIn((trk.album,), c.fetchall())
+ # purging last entry in history or album == trk.album
+ c.execute('DELETE FROM history WHERE history.track = ?',
+ (tracks_ids[-1],))
+ # triggers purge other tables if possible
+ c.execute('SELECT albums.name FROM albums;')
+ albums = c.fetchall()
+ self.assertNotIn(('foolbum',), albums)
+ conn.close()
+
+
+# VIM MODLINE
+# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8