-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2009-2013 Jack Kaliko <jack@azylum.org>
-# Copyright (c) 2009, Eric Casteleijn <thisfred@gmail.com>
-# Copyright (c) 2008 Rick van Hattem
+# Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
"""SQlite database library
"""
-# DOC:
-# MuscicBrainz ID: <http://musicbrainz.org/doc/MusicBrainzIdentifier>
-# Artists: <http://musicbrainz.org/doc/Artist_Name>
-# <http://musicbrainz.org/doc/Same_Artist_With_Different_Names>
-
-__DB_VERSION__ = 2
-__HIST_DURATION__ = int(30 * 24) # in hours
+#: DB Version
+__DB_VERSION__ = 4
+#: Default history duration for both request and purge in hours
+__HIST_DURATION__ = int(30 * 24)
import sqlite3
+from collections import deque
from datetime import (datetime, timedelta)
-from os.path import dirname, isdir
-from os import (access, W_OK, F_OK)
-from shutil import copyfile
+from datetime import timezone
+
+
+from sima.lib.meta import Artist, Album
+from sima.lib.track import Track
+from sima.utils.utils import MPDSimaException
-class SimaDBError(Exception):
+class SimaDBError(MPDSimaException):
"""
Exceptions.
"""
- pass
-
-
-class SimaDBAccessError(SimaDBError):
- """Error on accessing DB file"""
- pass
-
-class SimaDBNoFile(SimaDBError):
- """No DB file present"""
- pass
-
-class SimaDBUpgradeError(SimaDBError):
- """Error on upgrade"""
- pass
-
-
-class SimaDB(object):
+class SimaDB:
"SQLite management"
def __init__(self, db_path=None):
self._db_path = db_path
- self.db_path_mod_control()
-
- def db_path_mod_control(self):
- """Controls DB path access & write permissions"""
- db_path = self._db_path
- # Controls directory access
- if not isdir(dirname(db_path)):
- raise SimaDBAccessError('Not a regular directory: "%s"' %
- dirname(db_path))
- if not access(dirname(db_path), W_OK):
- raise SimaDBAccessError('No write access to "%s"' % dirname(db_path))
- # Is a file but no write access
- if access(db_path, F_OK) and not access(db_path, W_OK | F_OK):
- raise SimaDBAccessError('No write access to "%s"' % db_path)
- # No file
- if not access(db_path, F_OK):
- raise SimaDBNoFile('No DB file in "%s"' % db_path)
-
- def close_database_connection(self, connection):
- """Close the database connection."""
- connection.close()
def get_database_connection(self):
"""get database reference"""
- connection = sqlite3.connect(
- self._db_path, timeout=5.0, isolation_level="immediate")
- #connection.text_factory = str
+ connection = sqlite3.connect(self._db_path, isolation_level=None)
return connection
- def upgrade(self):
- """upgrade DB from previous versions"""
+ def get_info(self):
connection = self.get_database_connection()
- try:
- connection.execute('SELECT version FROM db_info')
- except Exception as err:
- if err.__str__() == "no such table: db_info":
- # db version < 2 (MPD_sima 0.6)
- copyfile(self._db_path, self._db_path + '.0.6')
- connection.execute('DROP TABLE tracks')
- connection.commit()
- self.create_db()
- else:
- raise SimaDBUpgradeError('Could not upgrade database: "%s"' %
- err)
- self.close_database_connection(connection)
+ info = connection.execute("""SELECT * FROM db_info
+ WHERE name = "DB Version" LIMIT 1;""").fetchone()
+ connection.close()
+ return info
- def get_artist(self, artist_name, mbid=None,
- with_connection=None, add_not=False):
- """get artist information from the database.
- if not in database insert new entry."""
+ def create_db(self):
+ """ Set up a database
+ """
+ connection = self.get_database_connection()
+ connection.execute(
+ 'CREATE TABLE IF NOT EXISTS db_info'
+ ' (name CHAR(50), value CHAR(50))')
+ connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ?
+ WHERE NOT EXISTS
+ ( SELECT 1 FROM db_info WHERE name = ? )''',
+ ('DB Version', __DB_VERSION__, 'DB Version'))
+ connection.execute( # ARTISTS
+ 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # ALBUMS
+ 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # ALBUMARTISTS
+ 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # TRACKS
+ 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, '
+ 'title VARCHAR(100), artist INTEGER, '
+ 'album INTEGER, albumartist INTEGER, '
+ 'file VARCHAR(500), mbid CHAR(36), '
+ 'FOREIGN KEY(artist) REFERENCES artists(id), '
+ 'FOREIGN KEY(album) REFERENCES albums(id), '
+ 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
+ connection.execute( # HISTORY
+ 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
+ 'last_play TIMESTAMP, track INTEGER, '
+ 'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # BLOCKLIST
+ 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
+ 'artist INTEGER, album INTEGER, track INTEGER, '
+ 'FOREIGN KEY(artist) REFERENCES artists(id), '
+ 'FOREIGN KEY(album) REFERENCES albums(id), '
+ 'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # Genres (Many-to-many)
+ 'CREATE TABLE IF NOT EXISTS genres '
+ '(id INTEGER PRIMARY KEY, name VARCHAR(100))')
+ connection.execute( # Junction Genres Tracks
+ """CREATE TABLE IF NOT EXISTS tracks_genres
+ ( track INTEGER, genre INTEGER,
+ FOREIGN KEY(track) REFERENCES tracks(id)
+ FOREIGN KEY(genre) REFERENCES genres(id))""")
+ # Create cleanup triggers:
+ # DELETE history → Tracks / Tracks_genres tables
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
+ AFTER DELETE ON history
+ WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.track;
+ DELETE FROM tracks_genres WHERE track = old.track;
+ END;
+ ''')
+ # DELETE Tracks_Genres → Genres table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres
+ AFTER DELETE ON tracks_genres
+ WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0)
+ BEGIN
+ DELETE FROM genres WHERE id = old.genre;
+ END;
+ ''')
+ # DELETE Tracks → Artists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # DELETE Tracks → Albums table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ # DELETE Tracks → cleanup AlbumArtists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
+ BEGIN
+ DELETE FROM albumartists WHERE id = old.albumartist;
+ END;
+ ''')
+ # DELETE blocklist → Tracks table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.track;
+ END;
+ ''')
+ # DELETE blocklist → Artists table
+ # The "SELECT count(*) FROM blocklist" is useless,
+ # there can be only one blocklist.artist
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # DELETE Tracks → Albums table
+ # The "SELECT count(*) FROM blocklist" is useless,
+ # there can be only one blocklist.album
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ connection.close()
+
+ def drop_all(self):
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT name FROM sqlite_master WHERE type='table'")
+ for row in rows.fetchall():
+ connection.execute(f'DROP TABLE IF EXISTS {row[0]}')
+ connection.close()
+
+ def _remove_blocklist_id(self, blid, with_connection=None):
+ """Remove a blocklist id"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT * FROM artists WHERE name = ?", (artist_name,))
- for row in rows:
- if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute(
- "INSERT INTO artists (name, mbid) VALUES (?, ?)",
- (artist_name, mbid))
+ connection = self.get_database_connection()
+ connection.execute('DELETE FROM blocklist'
+ ' WHERE blocklist.id = ?', (blid,))
connection.commit()
- rows = connection.execute(
- "SELECT * FROM artists WHERE name = ?", (artist_name,))
- for row in rows:
- if not with_connection:
- self.close_database_connection(connection)
- return row
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
+
+ def _get_album(self, album, connection):
+ if album.mbid:
+ return connection.execute(
+ "SELECT id FROM albums WHERE mbid = ?",
+ (album.mbid,))
+ return connection.execute(
+ "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
+ (album.name,))
+
+ def get_album(self, album, with_connection=None, add=True):
+ """get album information from the database.
+ if not in database insert new entry.
- def get_track(self, track, with_connection=None, add_not=False):
+ :param sima.lib.meta.Album album: album objet
+ :param sqlite3.Connection with_connection: SQLite connection
"""
- Get a track from Tracks table, add if not existing,
- Attention: use Track() object!!
- if not in database insert new entry."""
- art = track.artist
- nam = track.title
- fil = track.file
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- art_id = self.get_artist(art, with_connection=connection)[0]
- alb_id = self.get_album(track, with_connection=connection)[0]
- rows = connection.execute(
- "SELECT * FROM tracks WHERE name = ? AND"
- " artist = ? AND file = ?", (nam, art_id, fil))
+ rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- return False
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
connection.execute(
- "INSERT INTO tracks (artist, album, name, file) VALUES (?, ?, ?, ?)",
- (art_id, alb_id, nam, fil))
+ "INSERT INTO albums (name, mbid) VALUES (?, ?)",
+ (album.name, album.mbid))
connection.commit()
- rows = connection.execute(
- "SELECT * FROM tracks WHERE name = ? AND"
- " artist = ? AND album = ? AND file = ?",
- (nam, art_id, alb_id, fil,))
+ rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
-
- def get_album(self, track, mbid=None,
- with_connection=None, add_not=False):
- """
- get album information from the database.
+ connection.close()
+ return None
+
+ def _get_albumartist(self, artist, connection):
+ if artist.mbid:
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE mbid = ?",
+ (artist.mbid,))
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
+ (artist.name,))
+
+ def get_albumartist(self, artist, with_connection=None, add=True):
+ """get albumartist information from the database.
if not in database insert new entry.
- Attention: use Track|Album object!!
- Use AlbumArtist tag if provided, fallback to Album tag
+
+ :param sima.lib.meta.Artist artist: artist
+ :param sqlite3.Connection with_connection: SQLite connection
"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- if track.albumartist:
- artist = track.albumartist
- else:
- artist = track.artist
- art_id = self.get_artist(artist, with_connection=connection)[0]
- album = track.album
- rows = connection.execute(
- "SELECT * FROM albums WHERE name = ? AND artist = ?",
- (album, art_id))
+ rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- return False
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
connection.execute(
- "INSERT INTO albums (name, artist, mbid) VALUES (?, ?, ?)",
- (album, art_id, mbid))
+ "INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
+ (artist.name, artist.mbid))
connection.commit()
- rows = connection.execute(
- "SELECT * FROM albums WHERE name = ? AND artist = ?",
- (album, art_id))
+ rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
- def get_artists(self, with_connection=None):
- """Returns all artists in DB"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- rows = connection.execute("SELECT name FROM artists ORDER BY name")
- results = [row for row in rows]
- if not with_connection:
- self.close_database_connection(connection)
- for artist in results:
- yield artist
+ def _get_artist(self, artist, connection):
+ if artist.mbid:
+ return connection.execute(
+ "SELECT id FROM artists WHERE mbid = ?",
+ (artist.mbid,))
+ return connection.execute(
+ "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
- def get_bl_artist(self, artist_name,
- with_connection=None, add_not=None):
- """get blacklisted artist information from the database."""
+ def get_artist(self, artist, with_connection=None, add=True):
+ """get artist information from the database.
+ if not in database insert new entry.
+
+ :param sima.lib.meta.Artist artist: artist
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- art = self.get_artist(artist_name,
- with_connection=connection, add_not=add_not)
- if not art:
- return False
- art_id = art[0]
- rows = connection.execute("SELECT * FROM black_list WHERE artist = ?",
- (art_id,))
+ rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
+ connection.close()
+ return row[0]
+ if not add:
if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute("INSERT INTO black_list (artist) VALUES (?)",
- (art_id,))
- connection.execute("UPDATE black_list SET updated = DATETIME('now')"
- " WHERE artist = ?", (art_id,))
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO artists (name, mbid) VALUES (?, ?)",
+ (artist.name, artist.mbid))
connection.commit()
- rows = connection.execute("SELECT * FROM black_list WHERE artist = ?",
- (art_id,))
+ rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- self.close_database_connection(connection)
- return False
+ connection.close()
- def get_bl_album(self, track,
- with_connection=None, add_not=None):
- """get blacklisted album information from the database."""
+ def get_genre(self, genre, with_connection=None, add=True):
+ """get genre from the database.
+ if not in database insert new entry.
+
+ :param str genre: genre as a string
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- album = self.get_album(track,
- with_connection=connection, add_not=add_not)
- if not album:
- return False
- alb_id = album[0]
- rows = connection.execute("SELECT * FROM black_list WHERE album = ?",
- (alb_id,))
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
+ connection.close()
+ return row[0]
+ if not add:
if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute("INSERT INTO black_list (album) VALUES (?)",
- (alb_id,))
- connection.execute("UPDATE black_list SET updated = DATETIME('now')"
- " WHERE album = ?", (alb_id,))
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO genres (name) VALUES (?)", (genre,))
connection.commit()
- rows = connection.execute("SELECT * FROM black_list WHERE album = ?",
- (alb_id,))
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if not with_connection:
- self.close_database_connection(connection)
- return False
+ connection.close()
+ return row[0]
- def get_bl_track(self, track, with_connection=None, add_not=None):
- """get blacklisted track information from the database."""
+ def get_track(self, track, with_connection=None, add=True):
+ """Get a track id from Tracks table, add if not existing,
+
+ :param sima.lib.track.Track track: track to use
+ :param bool add: add non existing track to database"""
+ if not track.file:
+ raise SimaDBError(f'Got a track with no file attribute: {track}')
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- track = self.get_track(track,
- with_connection=connection, add_not=add_not)
- if not track:
- return False
- track_id = track[0]
- rows = connection.execute("SELECT * FROM black_list WHERE track = ?",
- (track_id,))
+ rows = connection.execute(
+ "SELECT * FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
+ connection.close()
+ return row[0]
+ if not add: # Not adding non existing track
if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute("INSERT INTO black_list (track) VALUES (?)",
- (track_id,))
- connection.execute("UPDATE black_list SET updated = DATETIME('now')"
- " WHERE track = ?", (track_id,))
+ connection.close()
+ return None
+ # Get an artist record or None
+ if track.artist:
+ art = Artist(name=track.artist, mbid=track.musicbrainz_artistid)
+ art_id = self.get_artist(art, with_connection=connection)
+ else:
+ art_id = None
+ # Get an albumartist record or None
+ if track.albumartist:
+ albart = Artist(name=track.albumartist,
+ mbid=track.musicbrainz_albumartistid)
+ albart_id = self.get_albumartist(albart, with_connection=connection)
+ else:
+ albart_id = None
+ # Get an album record or None
+ if track.album:
+ alb = Album(name=track.album, mbid=track.musicbrainz_albumid)
+ alb_id = self.get_album(alb, with_connection=connection)
+ else:
+ alb_id = None
+ connection.execute(
+ """INSERT INTO tracks (artist, albumartist, album, title, mbid, file)
+ VALUES (?, ?, ?, ?, ?, ?)""",
+ (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
+ track.file))
connection.commit()
- rows = connection.execute("SELECT * FROM black_list WHERE track = ?",
- (track_id,))
+ # Add track id to junction tables
+ self._add_tracks_genres(track, connection)
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- self.close_database_connection(connection)
- return False
+ connection.close()
+ return None
+
+ def _add_tracks_genres(self, track, connection):
+ if not track.genres:
+ return
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
+ trk_id = rows.fetchone()[0]
+ for genre in track.genres:
+ # add genre
+ gen_id = self.get_genre(genre)
+ connection.execute("""INSERT INTO tracks_genres (track, genre)
+ VALUES (?, ?)""", (trk_id, gen_id))
+
+ def add_history(self, track, date=None):
+ """Record last play date of track (ie. not a real play history).
+
+ :param sima.lib.track.Track track: track to add to history
+ :param datetime.datetime date: UTC datetime object (use "datetime.now(timezone.utc)" is not set)"""
+ if not date:
+ date = datetime.now(timezone.utc)
+ connection = self.get_database_connection()
+ track_id = self.get_track(track, with_connection=connection)
+ rows = connection.execute("SELECT * FROM history WHERE track = ? ",
+ (track_id,))
+ if not rows.fetchone():
+ connection.execute("INSERT INTO history (track) VALUES (?)",
+ (track_id,))
+ connection.execute("UPDATE history SET last_play = ? "
+ " WHERE track = ?", (date, track_id,))
+ connection.commit()
+ connection.close()
+
+ def purge_history(self, duration=__HIST_DURATION__):
+ """Remove old entries in history
+
+ :param int duration: Purge history record older than duration in hours"""
+ connection = self.get_database_connection()
+ connection.execute("DELETE FROM history WHERE last_play"
+ " < datetime('now', '-%i hours')" % duration)
+ connection.execute('VACUUM')
+ connection.commit()
+ connection.close()
- def get_artists_history(self, artists, duration=__HIST_DURATION__):
+ def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
"""
+ :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist.
+ :param int duration: How long ago to fetch history from (in hours)
"""
- date = datetime.utcnow() - timedelta(hours=duration)
+ date = datetime.now(timezone.utc) - timedelta(hours=duration)
connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT arts.name, albs.name, trs.name, trs.file"
- " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
- " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
- " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""
+ SELECT albums.name AS name,
+ albums.mbid as mbid,
+ artists.name as artist,
+ artists.mbid as artist_mbib,
+ albumartists.name as albumartist,
+ albumartists.mbid as albumartist_mbib
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
+ WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
+ ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ hist = []
for row in rows:
- if artists and row[0] not in artists:
+ vals = dict(row)
+ if needle: # Here use artist instead of albumartist
+ if needle != Artist(name=vals.get('artist'),
+ mbid=vals.get('artist_mbib')):
+ continue
+ # Use albumartist / MBIDs if possible to build album artist
+ if not vals.get('albumartist'):
+ vals['albumartist'] = vals.get('artist')
+ if not vals.get('albumartist_mbib'):
+ vals['albumartist_mbib'] = vals.get('artist_mbib')
+ artist = Artist(name=vals.get('albumartist'),
+ mbid=vals.pop('albumartist_mbib'))
+ album = Album(**vals, Artist=artist)
+ if hist and hist[-1] == album:
+ # remove consecutive dupes
continue
- for art in artists:
- if row[0] == art:
- yield art
- self.close_database_connection(connection)
-
- def get_history(self, artist=None, artists=None, duration=__HIST_DURATION__):
- """Retrieve complete play history, most recent tracks first
- artist : filter history for specific artist
- artists : filter history for specific artists list
- """ # pylint: disable=C0301
- date = datetime.utcnow() - timedelta(hours=duration)
+ hist.append(album)
+ connection.close()
+ return hist
+
+ def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__):
+ """Returns a list of Artist objects
+
+ :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only
+ :param int duration: How long ago to fetch history from (in hours)
+ :type needle: sima.lib.meta.Artist or sima.lib.meta.MetaContainer
+ """
+ date = datetime.now(timezone.utc) - timedelta(hours=duration)
connection = self.get_database_connection()
- if artist:
- rows = connection.execute(
- "SELECT arts.name, albs.name, trs.name, trs.file, hist.last_play"
- " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
- " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
- " AND hist.last_play > ? AND arts.name = ?"
- " ORDER BY hist.last_play DESC", (date.isoformat(' '), artist,))
- else:
- rows = connection.execute(
- "SELECT arts.name, albs.name, trs.name, trs.file"
- " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
- " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
- " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""
+ SELECT artists.name AS name,
+ artists.mbid as mbid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ WHERE history.last_play > ? AND artists.name NOT NULL
+ ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ last = deque(maxlen=1)
+ hist = []
for row in rows:
- if artists and row[0] not in artists:
+ artist = Artist(**row)
+ if last and last[0] == artist: # remove consecutive dupes
+ continue
+ last.append(artist)
+ if needle and isinstance(needle, (Artist, str)):
+ if needle == artist:
+ hist.append(artist) # No need to go further
+ break
+ continue
+ if needle and getattr(needle, '__contains__'):
+ if artist in needle:
+ hist.append(artist) # No need to go further
continue
- yield row
- self.close_database_connection(connection)
+ hist.append(artist)
+ connection.close()
+ return hist
+
+ def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20):
+ """Returns genre history
- def get_black_list(self):
- """Retrieve complete black list."""
+ :param int duration: How long ago to fetch history from (in hours)
+ :param int limit: number of genre to fetch
+ """
+ date = datetime.now(timezone.utc) - timedelta(hours=duration)
connection = self.get_database_connection()
- rows = connection.execute('SELECT black_list.rowid, artists.name'
- ' FROM artists INNER JOIN black_list'
- ' ON artists.id = black_list.artist')
- yield ('Row ID', 'Actual black listed element', 'Extra information',)
- yield ('',)
- yield ('Row ID', 'Artist',)
+ rows = connection.execute("""
+ SELECT genres.name, artists.name
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre
+ WHERE history.last_play > ? AND genres.name NOT NULL
+ ORDER BY history.last_play DESC
+ """, (date.isoformat(' '),))
+ genres = []
for row in rows:
- yield row
- rows = connection.execute(
- 'SELECT black_list.rowid, albums.name, artists.name'
- ' FROM artists, albums INNER JOIN black_list'
- ' ON albums.id = black_list.album'
- ' WHERE artists.id = albums.artist')
- yield ('',)
- yield ('Row ID', 'Album', 'Artist name')
- for row in rows:
- yield row
- rows = connection.execute(
- 'SELECT black_list.rowid, tracks.name, artists.name'
- ' FROM artists, tracks INNER JOIN black_list'
- ' ON tracks.id = black_list.track'
- ' WHERE tracks.artist = artists.id')
- yield ('',)
- yield ('Row ID', 'Title', 'Artist name')
- for row in rows:
- yield row
- self.close_database_connection(connection)
+ genres.append(row)
+ if len({g[0] for g in genres}) >= limit:
+ break
+ connection.close()
+ return genres
- def _set_mbid(self, artist_id=None, mbid=None, with_connection=None):
- """get artist information from the database.
- if not in database insert new entry."""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- connection.execute("UPDATE artists SET mbid = ? WHERE id = ?",
- (mbid, artist_id))
- connection.commit()
- if not with_connection:
- self.close_database_connection(connection)
+ def fetch_history(self, artist=None, duration=__HIST_DURATION__):
+ """Fetches tracks history, more recent first
- def _get_similar_artists_from_db(self, artist_id):
- connection = self.get_database_connection()
- results = [row for row in connection.execute(
- "SELECT match, name FROM usr_artist_2_artist INNER JOIN"
- " artists ON usr_artist_2_artist.artist2 = artists.id WHERE"
- " usr_artist_2_artist.artist1 = ? ORDER BY match DESC;",
- (artist_id,))]
- self.close_database_connection(connection)
- for score, artist in results:
- yield {'score': score, 'artist': artist}
-
- def _get_reverse_similar_artists_from_db(self, artist_id):
+ :param sima.lib.meta.Artist artist: limit history to this artist
+ :param int duration: How long ago to fetch history from (in hours)
+ """
+ date = datetime.now(timezone.utc) - timedelta(hours=duration)
connection = self.get_database_connection()
- results = [row for row in connection.execute(
- "SELECT name FROM usr_artist_2_artist INNER JOIN"
- " artists ON usr_artist_2_artist.artist1 = artists.id WHERE"
- " usr_artist_2_artist.artist2 = ?;",
- (artist_id,))]
- self.close_database_connection(connection)
- for artist in results:
- yield artist[0]
-
- def get_similar_artists(self, artist_name):
- """get similar artists from the database sorted by descending
- match score"""
- artist_id = self.get_artist(artist_name)[0]
- for result in self._get_similar_artists_from_db(artist_id):
- yield result
-
- def _get_artist_match(self, artist1, artist2, with_connection=None):
- """get artist match score from database"""
- if with_connection:
- connection = with_connection
+ connection.row_factory = sqlite3.Row
+ sql = """
+ SELECT tracks.title, tracks.file, artists.name AS artist,
+ albumartists.name AS albumartist,
+ artists.mbid as musicbrainz_artistid,
+ albums.name AS album,
+ albums.mbid AS musicbrainz_albumid,
+ tracks.mbid as musicbrainz_trackid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ WHERE history.last_play > ?
+ """
+ if artist:
+ if artist.mbid:
+ rows = connection.execute(sql+"""
+ AND artists.mbid = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.mbid))
+ else:
+ rows = connection.execute(sql+"""
+ AND artists.name = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.name))
else:
- connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT match FROM usr_artist_2_artist WHERE artist1 = ?"
- " AND artist2 = ?",
- (artist1, artist2))
- result = 0
+ rows = connection.execute(sql+'ORDER BY history.last_play DESC',
+ (date.isoformat(' '),))
+ hist = []
for row in rows:
- result = row[0]
- break
- if not with_connection:
- self.close_database_connection(connection)
- return result
-
- def _remove_relation_between_2_artist(self, artist1, artist2):
- """Remove a similarity relation"""
- connection = self.get_database_connection()
- connection.execute(
- 'DELETE FROM usr_artist_2_artist'
- ' WHERE artist1 = ? AND artist2 = ?;',
- (artist1, artist2))
- self.clean_database(with_connection=connection)
- self._update_artist(artist_id=artist1, with_connection=connection)
- connection.commit()
- self.close_database_connection(connection)
-
- def _remove_artist(self, artist_id, deep=False, with_connection=None):
- """Remove all artist1 reference"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- if deep:
- connection.execute(
- 'DELETE FROM usr_artist_2_artist'
- ' WHERE artist1 = ? OR artist2 = ?;',
- (artist_id, artist_id))
- connection.execute(
- 'DELETE FROM artists WHERE id = ?;',
- (artist_id,))
- else:
- connection.execute(
- 'DELETE FROM usr_artist_2_artist WHERE artist1 = ?;',
- (artist_id,))
- self.clean_database(with_connection=connection)
- self._update_artist(artist_id=artist_id, with_connection=connection)
- if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ hist.append(Track(**row))
+ connection.close()
+ return hist
- def _remove_bl(self, rowid):
- """Remove bl row id"""
- connection = self.get_database_connection()
- connection.execute('DELETE FROM black_list'
- ' WHERE black_list.rowid = ?', (rowid,))
- connection.commit()
- self.close_database_connection(connection)
+ def get_bl_track(self, track, with_connection=None, add=True):
+ """Add a track to blocklist
- def _insert_artist_match(
- self, artist1, artist2, match, with_connection=None):
- """write match score to the database.
- Does not update time stamp in table artist/*_updated"""
+ :param sima.lib.track.Track track: Track object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- connection.execute(
- "INSERT INTO usr_artist_2_artist (artist1, artist2, match) VALUES"
- " (?, ?, ?)",
- (artist1, artist2, match))
- if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
-
- def add_history(self, track):
- """Add to history"""
- connection = self.get_database_connection()
- track_id = self.get_track(track, with_connection=connection)[0]
- rows = connection.execute("SELECT * FROM history WHERE track = ? ",
- (track_id,))
+ track_id = self.get_track(track, with_connection=connection, add=add)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE track = ?", (track_id,))
if not rows.fetchone():
- connection.execute("INSERT INTO history (track) VALUES (?)",
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
+ connection.execute('INSERT INTO blocklist (track) VALUES (?)',
(track_id,))
- connection.execute("UPDATE history SET last_play = DATETIME('now') "
- " WHERE track = ?", (track_id,))
- connection.commit()
- self.close_database_connection(connection)
-
- def _update_artist(self, artist_id, with_connection=None):
- """write artist information to the database"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- connection.execute(
- "UPDATE artists SET usr_updated = DATETIME('now') WHERE id = ?",
- (artist_id,))
- if not with_connection:
connection.commit()
- self.close_database_connection(connection)
-
- def _update_artist_match(
- self, artist1, artist2, match, with_connection=None):
- """write match score to the database"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- connection.execute(
- "UPDATE usr_artist_2_artist SET match = ? WHERE artist1 = ? AND"
- " artist2 = ?",
- (match, artist1, artist2))
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE track = ?", (track_id,))
+ blt = rows.fetchone()[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ connection.close()
+ return blt
- def _update_similar_artists(self, artist, similar_artists):
- """write user similar artist information to the database
- """
- # DOC: similar_artists = list([{'score': match, 'artist': name}])
- #
- connection = self.get_database_connection()
- artist_id = self.get_artist(artist, with_connection=connection)[0]
- for artist in similar_artists:
- id2 = self.get_artist(
- artist['artist'], with_connection=connection)[0]
- if self._get_artist_match(
- artist_id, id2, with_connection=connection):
- self._update_artist_match(
- artist_id, id2, artist['score'],
- with_connection=connection)
- continue
- self._insert_artist_match(
- artist_id, id2, artist['score'],
- with_connection=connection)
- self._update_artist(artist_id, with_connection=connection)
- connection.commit()
- self.close_database_connection(connection)
+ def get_bl_album(self, album, with_connection=None, add=True):
+ """Add an album to blocklist
- def _clean_artists_table(self, with_connection=None):
- """Clean orphan artists"""
+ :param sima.lib.meta.Album: Album object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- artists_ids = set([row[0] for row in connection.execute(
- "SELECT id FROM artists")])
- artist_2_artist_ids = set([row[0] for row in connection.execute(
- "SELECT artist1 FROM usr_artist_2_artist")] +
- [row[0] for row in connection.execute(
- "SELECT artist2 FROM usr_artist_2_artist")] +
- [row[0] for row in connection.execute(
- "SELECT artist FROM black_list")] +
- [row[0] for row in connection.execute(
- "SELECT artist FROM albums")] +
- [row[0] for row in connection.execute(
- "SELECT artist FROM tracks")])
- orphans = [(orphan,) for orphan in artists_ids - artist_2_artist_ids]
- connection.executemany('DELETE FROM artists WHERE id = (?);', orphans)
- if not with_connection:
+ album_id = self.get_album(album, with_connection=connection, add=add)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE album = ?", (album_id,))
+ if not rows.fetchone():
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
+ connection.execute('INSERT INTO blocklist (album) VALUES (?)',
+ (album_id,))
connection.commit()
- self.close_database_connection(connection)
-
- def _clean_albums_table(self, with_connection=None):
- """Clean orphan albums"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- orphan_black_ids = set([row[0] for row in connection.execute(
- """SELECT albums.id FROM albums
- LEFT JOIN black_list ON albums.id = black_list.album
- WHERE ( black_list.album IS NULL )""")])
- orphan_tracks_ids = set([row[0] for row in connection.execute(
- """SELECT albums.id FROM albums
- LEFT JOIN tracks ON albums.id = tracks.album
- WHERE tracks.album IS NULL""")])
- orphans = [(orphan,) for orphan in orphan_black_ids & orphan_tracks_ids]
- connection.executemany('DELETE FROM albums WHERE id = (?);', orphans)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE album = ?", (album_id,))
+ blitem = rows.fetchone()[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ connection.close()
+ return blitem
- def _clean_tracks_table(self, with_connection=None):
- """Clean orphan tracks"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- hist_orphan_ids = set([row[0] for row in connection.execute(
- """SELECT tracks.id FROM tracks
- LEFT JOIN history ON tracks.id = history.track
- WHERE history.track IS NULL""")])
- black_list_orphan_ids = set([row[0] for row in connection.execute(
- """SELECT tracks.id FROM tracks
- LEFT JOIN black_list ON tracks.id = black_list.track
- WHERE black_list.track IS NULL""")])
- orphans = [(orphan,) for orphan in hist_orphan_ids & black_list_orphan_ids]
- connection.executemany('DELETE FROM tracks WHERE id = (?);', orphans)
- if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ def get_bl_artist(self, artist, with_connection=None, add=True):
+ """Add an artist to blocklist
- def clean_database(self, with_connection=None):
- """Wrapper around _clean_* methods"""
+ :param sima.lib.meta.Artist: Artist object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- self._clean_tracks_table(with_connection=connection)
- self._clean_albums_table(with_connection=connection)
- self._clean_artists_table(with_connection=connection)
- connection.execute("VACUUM")
- if not with_connection:
+ artist_id = self.get_artist(artist, with_connection=connection, add=add)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
+ if not rows.fetchone():
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (artist) VALUES (?)',
+ (artist_id,))
connection.commit()
- self.close_database_connection(connection)
-
- def purge_history(self, duration=__HIST_DURATION__):
- """Remove old entries in history"""
- connection = self.get_database_connection()
- connection.execute("DELETE FROM history WHERE last_play"
- " < datetime('now', '-%i hours')" % duration)
- connection.commit()
- self.close_database_connection(connection)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
+ blitem = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return blitem
- def _set_dbversion(self):
- """Add db version"""
+ def view_bl(self):
connection = self.get_database_connection()
- connection.execute('INSERT INTO db_info (version, name) VALUES (?, ?)',
- (__DB_VERSION__, 'Sima DB'))
- connection.commit()
- self.close_database_connection(connection)
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""SELECT artists.name AS artist,
+ artists.mbid AS musicbrainz_artist,
+ albums.name AS album,
+ albums.mbid AS musicbrainz_album,
+ tracks.title AS title,
+ tracks.mbid AS musicbrainz_title,
+ tracks.file AS file,
+ blocklist.id
+ FROM blocklist
+ LEFT OUTER JOIN artists ON blocklist.artist = artists.id
+ LEFT OUTER JOIN albums ON blocklist.album = albums.id
+ LEFT OUTER JOIN tracks ON blocklist.track = tracks.id""")
+ res = [dict(row) for row in rows.fetchall()]
+ connection.close()
+ return res
- def create_db(self):
- """ Set up a database for the artist similarity scores
- """
+ def delete_bl(self, track=None, album=None, artist=None):
+ if not (track or album or artist):
+ return
connection = self.get_database_connection()
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS db_info'
- ' (version INTEGER, name CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, name'
- ' VARCHAR(100), mbid CHAR(36), lfm_updated DATE, usr_updated DATE)')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS usr_artist_2_artist (artist1 INTEGER,'
- ' artist2 INTEGER, match INTEGER)')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS lfm_artist_2_artist (artist1 INTEGER,'
- ' artist2 INTEGER, match INTEGER)')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY,'
- ' artist INTEGER, name VARCHAR(100), mbid CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY,'
- ' name VARCHAR(100), artist INTEGER, album INTEGER,'
- ' file VARCHAR(500), mbid CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS black_list (artist INTEGER,'
- ' album INTEGER, track INTEGER, updated DATE)')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS history (last_play DATE,'
- ' track integer)')
- connection.execute(
- "CREATE INDEX IF NOT EXISTS a2aa1x ON usr_artist_2_artist (artist1)")
- connection.execute(
- "CREATE INDEX IF NOT EXISTS a2aa2x ON usr_artist_2_artist (artist2)")
- connection.execute(
- "CREATE INDEX IF NOT EXISTS lfma2aa1x ON lfm_artist_2_artist (artist1)")
- connection.execute(
- "CREATE INDEX IF NOT EXISTS lfma2aa2x ON lfm_artist_2_artist (artist2)")
- connection.commit()
- self.close_database_connection(connection)
- self._set_dbversion()
-
-
-def main():
- db = SimaDB(db_path='/tmp/sima.db')
- db.purge_history(int(4))
- db.clean_database()
-
+ blid = None
+ if track:
+ blid = self.get_bl_track(track, with_connection=connection)
+ if album:
+ blid = self.get_bl_album(album, with_connection=connection)
+ if artist:
+ blid = self.get_bl_artist(artist, with_connection=connection)
+ if not blid:
+ return
+ self._remove_blocklist_id(blid, with_connection=connection)
+ connection.close()
-# Script starts here
-if __name__ == '__main__':
- main()
# VIM MODLINE
-# vim: ai ts=4 sw=4 sts=4 expandtab
+# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8