+++ /dev/null
-# Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
-#
-# This file is part of sima
-#
-# sima is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# sima is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with sima. If not, see <http://www.gnu.org/licenses/>.
-#
-#
-"""SQlite database library
-"""
-
-__DB_VERSION__ = 4
-__HIST_DURATION__ = int(30 * 24) # in hours
-
-import sqlite3
-
-from collections import deque
-from datetime import (datetime, timedelta)
-
-from sima.lib.meta import Artist, Album
-from sima.lib.track import Track
-
-
-class SimaDBError(Exception):
- """
- Exceptions.
- """
-
-
-class SimaDB:
- "SQLite management"
-
- def __init__(self, db_path=None):
- self._db_path = db_path
-
- def get_database_connection(self):
- """get database reference"""
- connection = sqlite3.connect(
- self._db_path, isolation_level=None)
- return connection
-
- def create_db(self):
- """ Set up a database
- """
- connection = self.get_database_connection()
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS db_info'
- ' (name CHAR(50), value CHAR(50))')
- connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ?
- WHERE NOT EXISTS
- ( SELECT 1 FROM db_info WHERE name = ? )''',
- ('DB Version', __DB_VERSION__, 'DB Version'))
- connection.execute( # ARTISTS
- 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, '
- 'name VARCHAR(100), mbid CHAR(36))')
- connection.execute( # ALBUMS
- 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, '
- 'name VARCHAR(100), mbid CHAR(36))')
- connection.execute( # ALBUMARTISTS
- 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, '
- 'name VARCHAR(100), mbid CHAR(36))')
- connection.execute( # TRACKS
- 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, '
- 'title VARCHAR(100), artist INTEGER, '
- 'album INTEGER, albumartist INTEGER, '
- 'file VARCHAR(500), mbid CHAR(36), '
- 'FOREIGN KEY(artist) REFERENCES artists(id), '
- 'FOREIGN KEY(album) REFERENCES albums(id), '
- 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
- connection.execute( # HISTORY
- 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
- 'last_play TIMESTAMP, track INTEGER, '
- 'FOREIGN KEY(track) REFERENCES tracks(id))')
- connection.execute( # BLOCKLIST
- 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
- 'artist INTEGER, album INTEGER, track INTEGER, '
- 'FOREIGN KEY(artist) REFERENCES artists(id), '
- 'FOREIGN KEY(album) REFERENCES albums(id), '
- 'FOREIGN KEY(track) REFERENCES tracks(id))')
- connection.execute( # Genres (Many-to-many)
- 'CREATE TABLE IF NOT EXISTS genres '
- '(id INTEGER PRIMARY KEY, name VARCHAR(100))')
- connection.execute( # Junction Genres Tracks
- """CREATE TABLE IF NOT EXISTS tracks_genres
- ( track INTEGER, genre INTEGER,
- FOREIGN KEY(track) REFERENCES tracks(id)
- FOREIGN KEY(genre) REFERENCES genres(id))""")
- # Create cleanup triggers:
- # DELETE history → Tracks / Tracks_genres tables
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
- AFTER DELETE ON history
- WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
- (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
- BEGIN
- DELETE FROM tracks WHERE id = old.track;
- DELETE FROM tracks_genres WHERE track = old.track;
- END;
- ''')
- # DELETE Tracks_Genres → Genres table
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres
- AFTER DELETE ON tracks_genres
- WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0)
- BEGIN
- DELETE FROM genres WHERE id = old.genre;
- END;
- ''')
- # DELETE Tracks → Artists table
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists
- AFTER DELETE ON tracks
- WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
- (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
- BEGIN
- DELETE FROM artists WHERE id = old.artist;
- END;
- ''')
- # DELETE Tracks → Albums table
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums
- AFTER DELETE ON tracks
- WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
- (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
- BEGIN
- DELETE FROM albums WHERE id = old.album;
- END;
- ''')
- # DELETE Tracks → cleanup AlbumArtists table
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists
- AFTER DELETE ON tracks
- WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
- BEGIN
- DELETE FROM albumartists WHERE id = old.albumartist;
- END;
- ''')
- # DELETE blocklist → Tracks table
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks
- AFTER DELETE ON blocklist
- WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
- (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
- BEGIN
- DELETE FROM tracks WHERE id = old.track;
- END;
- ''')
- # DELETE blocklist → Artists table
- # The "SELECT count(*) FROM blocklist" is useless,
- # there can be only one blocklist.artist
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists
- AFTER DELETE ON blocklist
- WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
- (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
- BEGIN
- DELETE FROM artists WHERE id = old.artist;
- END;
- ''')
- # DELETE Tracks → Albums table
- # The "SELECT count(*) FROM blocklist" is useless,
- # there can be only one blocklist.album
- connection.execute('''
- CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums
- AFTER DELETE ON blocklist
- WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
- (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
- BEGIN
- DELETE FROM albums WHERE id = old.album;
- END;
- ''')
- connection.close()
-
- def drop_all(self):
- connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT name FROM sqlite_master WHERE type='table'")
- for r in rows.fetchall():
- connection.execute(f'DROP TABLE IF EXISTS {r[0]}')
- connection.close()
-
- def _remove_blocklist_id(self, blid, with_connection=None):
- """Remove id"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- connection = self.get_database_connection()
- connection.execute('DELETE FROM blocklist'
- ' WHERE blocklist.id = ?', (blid,))
- connection.commit()
- if not with_connection:
- connection.close()
-
- def _get_album(self, album, connection):
- if album.mbid:
- return connection.execute(
- "SELECT id FROM albums WHERE mbid = ?",
- (album.mbid,))
- else:
- return connection.execute(
- "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
- (album.name,))
-
- def get_album(self, album, with_connection=None, add=True):
- """get album information from the database.
- if not in database insert new entry.
-
- :param sima.lib.meta.Album album: album objet
- :param sqlite3.Connection with_connection: SQLite connection
- """
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- rows = self._get_album(album, connection)
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not add:
- if not with_connection:
- connection.close()
- return None
- connection.execute(
- "INSERT INTO albums (name, mbid) VALUES (?, ?)",
- (album.name, album.mbid))
- connection.commit()
- rows = self._get_album(album, connection)
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not with_connection:
- connection.close()
- return None
-
- def _get_albumartist(self, artist, connection):
- if artist.mbid:
- return connection.execute(
- "SELECT id FROM albumartists WHERE mbid = ?",
- (artist.mbid,))
- else:
- return connection.execute(
- "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
- (artist.name,))
-
- def get_albumartist(self, artist, with_connection=None, add=True):
- """get albumartist information from the database.
- if not in database insert new entry.
-
- :param sima.lib.meta.Artist artist: artist
- :param sqlite3.Connection with_connection: SQLite connection
- """
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- rows = self._get_albumartist(artist, connection)
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not add:
- if not with_connection:
- connection.close()
- return None
- connection.execute(
- "INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
- (artist.name, artist.mbid))
- connection.commit()
- rows = self._get_albumartist(artist, connection)
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not with_connection:
- connection.close()
-
- def _get_artist(self, artist, connection):
- if artist.mbid:
- return connection.execute(
- "SELECT id FROM artists WHERE mbid = ?",
- (artist.mbid,))
- else:
- return connection.execute(
- "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
-
- def get_artist(self, artist, with_connection=None, add=True):
- """get artist information from the database.
- if not in database insert new entry.
-
- :param sima.lib.meta.Artist artist: artist
- :param sqlite3.Connection with_connection: SQLite connection
- """
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- rows = self._get_artist(artist, connection)
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not add:
- if not with_connection:
- connection.close()
- return None
- connection.execute(
- "INSERT INTO artists (name, mbid) VALUES (?, ?)",
- (artist.name, artist.mbid))
- connection.commit()
- rows = self._get_artist(artist, connection)
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not with_connection:
- connection.close()
-
- def get_genre(self, genre, with_connection=None, add=True):
- """get genre from the database.
- if not in database insert new entry.
-
- :param str genre: genre as a string
- :param sqlite3.Connection with_connection: SQLite connection
- """
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT id FROM genres WHERE name = ?", (genre,))
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not add:
- if not with_connection:
- connection.close()
- return None
- connection.execute(
- "INSERT INTO genres (name) VALUES (?)", (genre,))
- connection.commit()
- rows = connection.execute(
- "SELECT id FROM genres WHERE name = ?", (genre,))
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
-
- def get_track(self, track, with_connection=None, add=True):
- """Get a track id from Tracks table, add if not existing,
- :param sima.lib.track.Track track: track to use
- :param bool add: add non existing track to database"""
- if not track.file:
- raise SimaDBError('Got a track with no file attribute: %r' % track)
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT * FROM tracks WHERE file = ?", (track.file,))
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not add: # Not adding non existing track
- connection.close()
- return None
- # Get an artist record or None
- if track.artist:
- art = Artist(name=track.artist, mbid=track.musicbrainz_artistid)
- art_id = self.get_artist(art, with_connection=connection)
- else:
- art_id = None
- # Get an albumartist record or None
- if track.albumartist:
- albart = Artist(name=track.albumartist,
- mbid=track.musicbrainz_albumartistid)
- albart_id = self.get_albumartist(albart, with_connection=connection)
- else:
- albart_id = None
- # Get an album record or None
- if track.album:
- alb = Album(name=track.album, mbid=track.musicbrainz_albumid)
- alb_id = self.get_album(alb, with_connection=connection)
- else:
- alb_id = None
- connection.execute(
- """INSERT INTO tracks (artist, albumartist, album, title, mbid, file)
- VALUES (?, ?, ?, ?, ?, ?)""",
- (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
- track.file))
- connection.commit()
- # Add track id to junction tables
- self._add_tracks_genres(track, connection)
- rows = connection.execute(
- "SELECT id FROM tracks WHERE file = ?", (track.file,))
- for row in rows:
- if not with_connection:
- connection.close()
- return row[0]
- if not with_connection:
- connection.close()
- return None
-
- def _add_tracks_genres(self, track, connection):
- if not track.genres:
- return None
- rows = connection.execute(
- "SELECT id FROM tracks WHERE file = ?", (track.file,))
- trk_id = rows.fetchone()[0]
- for genre in track.genres:
- # add genre
- gen_id = self.get_genre(genre)
- connection.execute("""INSERT INTO tracks_genres (track, genre)
- VALUES (?, ?)""", (trk_id, gen_id))
-
- def add_history(self, track, date=None):
- """Record last play date of track (ie. not a real exhautive play history).
- :param track sima.lib.track.Track: track to add to history"""
- if not date:
- date = datetime.now()
- connection = self.get_database_connection()
- track_id = self.get_track(track, with_connection=connection)
- rows = connection.execute("SELECT * FROM history WHERE track = ? ",
- (track_id,))
- if not rows.fetchone():
- connection.execute("INSERT INTO history (track) VALUES (?)",
- (track_id,))
- connection.execute("UPDATE history SET last_play = ? "
- " WHERE track = ?", (date, track_id,))
- connection.commit()
- connection.close()
-
- def purge_history(self, duration=__HIST_DURATION__):
- """Remove old entries in history
- :param duration int: Purge history record older than duration in hours
- (defaults to __HIST_DURATION__)"""
- connection = self.get_database_connection()
- connection.execute("DELETE FROM history WHERE last_play"
- " < datetime('now', '-%i hours')" % duration)
- connection.execute('VACUUM')
- connection.commit()
- connection.close()
-
- def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
- """
- :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist.
- """
- date = datetime.utcnow() - timedelta(hours=duration)
- connection = self.get_database_connection()
- connection.row_factory = sqlite3.Row
- rows = connection.execute("""
- SELECT albums.name AS name,
- albums.mbid as mbid,
- artists.name as artist,
- artists.mbid as artist_mbib
- FROM history
- JOIN tracks ON history.track = tracks.id
- LEFT OUTER JOIN albums ON tracks.album = albums.id
- LEFT OUTER JOIN artists ON tracks.artist = artists.id
- WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
- ORDER BY history.last_play DESC""", (date.isoformat(' '),))
- hist = list()
- for row in rows:
- vals = dict(row)
- artist = Artist(name=vals.pop('artist'),
- mbid=vals.pop('artist_mbib'))
- if needle:
- if needle != artist:
- continue
- album = Album(**vals, artist=artist)
- if hist and hist[-1] == album:
- # remove consecutive dupes
- continue
- hist.append(album)
- connection.close()
- return hist
-
- def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__):
- """Returns a list of Artist objects
- :param sima.lib.meta.Artist|sima.lib.meta.MetaContainer needle: When specified, returns history for this artist, it's actually testing the artist presence in history.
- :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only
- """
- date = datetime.utcnow() - timedelta(hours=duration)
- connection = self.get_database_connection()
- connection.row_factory = sqlite3.Row
- rows = connection.execute("""
- SELECT artists.name AS name,
- artists.mbid as mbid
- FROM history
- JOIN tracks ON history.track = tracks.id
- LEFT OUTER JOIN artists ON tracks.artist = artists.id
- WHERE history.last_play > ? AND artists.name NOT NULL
- ORDER BY history.last_play DESC""", (date.isoformat(' '),))
- last = deque(maxlen=1)
- hist = list()
- for row in rows:
- artist = Artist(**row)
- if last and last[0] == artist: # remove consecutive dupes
- continue
- last.append(artist)
- if needle and isinstance(needle, (Artist, str)):
- if needle == artist:
- hist.append(artist) # No need to go further
- break
- continue
- elif needle and getattr(needle, '__contains__'):
- if artist in needle:
- hist.append(artist) # No need to go further
- continue
- hist.append(artist)
- connection.close()
- return hist
-
- def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20):
- date = datetime.utcnow() - timedelta(hours=duration)
- connection = self.get_database_connection()
- rows = connection.execute("""
- SELECT genres.name, artists.name
- FROM history
- JOIN tracks ON history.track = tracks.id
- LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id
- LEFT OUTER JOIN artists ON tracks.artist = artists.id
- LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre
- WHERE history.last_play > ?
- ORDER BY history.last_play DESC
- """, (date.isoformat(' '),))
- genres = list()
- for row in rows:
- genres.append(row)
- if len({g[0] for g in genres}) >= limit:
- break
- connection.close()
- return genres
-
- def fetch_history(self, artist=None, duration=__HIST_DURATION__):
- """Fetches tracks history, more recent first
- :param sima.lib.meta.Artist artist: limit history to this artist
- :param int duration: How long ago to fetch history from
- """
- date = datetime.utcnow() - timedelta(hours=duration)
- connection = self.get_database_connection()
- connection.row_factory = sqlite3.Row
- sql = """
- SELECT tracks.title, tracks.file, artists.name AS artist,
- albumartists.name AS albumartist,
- artists.mbid as musicbrainz_artistid,
- albums.name AS album,
- albums.mbid AS musicbrainz_albumid,
- tracks.mbid as musicbrainz_trackid
- FROM history
- JOIN tracks ON history.track = tracks.id
- LEFT OUTER JOIN artists ON tracks.artist = artists.id
- LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
- LEFT OUTER JOIN albums ON tracks.album = albums.id
- WHERE history.last_play > ?
- """
- if artist:
- if artist.mbid:
- rows = connection.execute(sql+"""
- AND artists.mbid = ?
- ORDER BY history.last_play DESC""",
- (date.isoformat(' '), artist.mbid))
- else:
- rows = connection.execute(sql+"""
- AND artists.name = ?
- ORDER BY history.last_play DESC""",
- (date.isoformat(' '), artist.name))
- else:
- rows = connection.execute(sql+'ORDER BY history.last_play DESC',
- (date.isoformat(' '),))
- hist = list()
- for row in rows:
- hist.append(Track(**row))
- connection.close()
- return hist
-
- def get_bl_track(self, track, with_connection=None, add=True):
- """Add a track to blocklist
- :param sima.lib.track.Track track: Track object to add to blocklist
- :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
- :param bool add: Default is to add a new record, set to False to fetch associated record"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- track_id = self.get_track(track, with_connection=connection, add=True)
- rows = connection.execute(
- "SELECT id FROM blocklist WHERE track = ?", (track_id,))
- if not rows.fetchone():
- if not add:
- return None
- connection.execute('INSERT INTO blocklist (track) VALUES (?)',
- (track_id,))
- connection.commit()
- rows = connection.execute(
- "SELECT id FROM blocklist WHERE track = ?", (track_id,))
- bl = rows.fetchone()[0]
- if not with_connection:
- connection.close()
- return bl
-
- def get_bl_album(self, album, with_connection=None, add=True):
- """Add an album to blocklist
- :param sima.lib.meta.Album: Album object to add to blocklist
- :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
- :param bool add: Default is to add a new record, set to False to fetch associated record"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- album_id = self.get_album(album, with_connection=connection, add=True)
- rows = connection.execute(
- "SELECT id FROM blocklist WHERE album = ?", (album_id,))
- if not rows.fetchone():
- if not add:
- return None
- connection.execute('INSERT INTO blocklist (album) VALUES (?)',
- (album_id,))
- connection.commit()
- rows = connection.execute(
- "SELECT id FROM blocklist WHERE album = ?", (album_id,))
- bl = rows.fetchone()[0]
- if not with_connection:
- connection.close()
- return bl
-
- def get_bl_artist(self, artist, with_connection=None, add=True):
- """Add an artist to blocklist
- :param sima.lib.meta.Artist: Artist object to add to blocklist
- :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
- :param bool add: Default is to add a new record, set to False to fetch associated record"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- artist_id = self.get_artist(artist, with_connection=connection, add=True)
- rows = connection.execute(
- "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
- if not rows.fetchone():
- if not add:
- return None
- connection.execute('INSERT INTO blocklist (artist) VALUES (?)',
- (artist_id,))
- connection.commit()
- rows = connection.execute(
- "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
- bl = rows.fetchone()[0]
- if not with_connection:
- connection.close()
- return bl
-
- def delete_bl(self, track=None, album=None, artist=None):
- if not (track or album or artist):
- return
- connection = self.get_database_connection()
- blid = None
- if track:
- blid = self.get_bl_track(track, with_connection=connection)
- if album:
- blid = self.get_bl_album(album, with_connection=connection)
- if artist:
- blid = self.get_bl_artist(artist, with_connection=connection)
- if not blid:
- return
- self._remove_blocklist_id(blid, with_connection=connection)
- connection.close()
-
-
-# VIM MODLINE
-# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2009-2013, 2019-2020 kaliko <kaliko@azylum.org>
-# Copyright (c) 2009, Eric Casteleijn <thisfred@gmail.com>
-# Copyright (c) 2008 Rick van Hattem
+# Copyright (c) 2009-2013, 2019-2021 kaliko <kaliko@azylum.org>
#
# This file is part of sima
#
"""SQlite database library
"""
-# DOC:
-# MuscicBrainz ID: <http://musicbrainz.org/doc/MusicBrainzIdentifier>
-# Artists: <http://musicbrainz.org/doc/Artist_Name>
-# <http://musicbrainz.org/doc/Same_Artist_With_Different_Names>
-
-__DB_VERSION__ = 3
+__DB_VERSION__ = 4
__HIST_DURATION__ = int(30 * 24) # in hours
import sqlite3
+from collections import deque
from datetime import (datetime, timedelta)
-from os.path import dirname, isdir
-from os import (access, W_OK, F_OK)
+
+from sima.lib.meta import Artist, Album
+from sima.lib.track import Track
class SimaDBError(Exception):
"""
-class SimaDBAccessError(SimaDBError):
- """Error on accessing DB file"""
-
-
-class SimaDBNoFile(SimaDBError):
- """No DB file present"""
-
-
class SimaDB:
"SQLite management"
def __init__(self, db_path=None):
self._db_path = db_path
- self.db_path_mod_control()
-
- def db_path_mod_control(self):
- """Controls DB path access & write permissions"""
- db_path = self._db_path
- # Controls directory access
- if not isdir(dirname(db_path)):
- raise SimaDBAccessError('Not a regular directory: "%s"' %
- dirname(db_path))
- if not access(dirname(db_path), W_OK):
- raise SimaDBAccessError('No write access to "%s"' % dirname(db_path))
- # Is a file but no write access
- if access(db_path, F_OK) and not access(db_path, W_OK | F_OK):
- raise SimaDBAccessError('No write access to "%s"' % db_path)
- # No file
- if not access(db_path, F_OK):
- raise SimaDBNoFile('No DB file in "%s"' % db_path)
-
- def close_database_connection(self, connection):
- """Close the database connection."""
- connection.close()
def get_database_connection(self):
"""get database reference"""
connection = sqlite3.connect(
- self._db_path, timeout=5.0, isolation_level=None)
- #connection.text_factory = str
+ self._db_path, isolation_level=None)
return connection
- def get_artist(self, artist_name, mbid=None,
- with_connection=None, add_not=False):
- """get artist information from the database.
- if not in database insert new entry."""
+ def create_db(self):
+ """ Set up a database
+ """
+ connection = self.get_database_connection()
+ connection.execute(
+ 'CREATE TABLE IF NOT EXISTS db_info'
+ ' (name CHAR(50), value CHAR(50))')
+ connection.execute('''INSERT INTO db_info (name, value) SELECT ?, ?
+ WHERE NOT EXISTS
+ ( SELECT 1 FROM db_info WHERE name = ? )''',
+ ('DB Version', __DB_VERSION__, 'DB Version'))
+ connection.execute( # ARTISTS
+ 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # ALBUMS
+ 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # ALBUMARTISTS
+ 'CREATE TABLE IF NOT EXISTS albumartists (id INTEGER PRIMARY KEY, '
+ 'name VARCHAR(100), mbid CHAR(36))')
+ connection.execute( # TRACKS
+ 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY, '
+ 'title VARCHAR(100), artist INTEGER, '
+ 'album INTEGER, albumartist INTEGER, '
+ 'file VARCHAR(500), mbid CHAR(36), '
+ 'FOREIGN KEY(artist) REFERENCES artists(id), '
+ 'FOREIGN KEY(album) REFERENCES albums(id), '
+ 'FOREIGN KEY(albumartist) REFERENCES albumartists(id))')
+ connection.execute( # HISTORY
+ 'CREATE TABLE IF NOT EXISTS history (id INTEGER PRIMARY KEY, '
+ 'last_play TIMESTAMP, track INTEGER, '
+ 'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # BLOCKLIST
+ 'CREATE TABLE IF NOT EXISTS blocklist (id INTEGER PRIMARY KEY, '
+ 'artist INTEGER, album INTEGER, track INTEGER, '
+ 'FOREIGN KEY(artist) REFERENCES artists(id), '
+ 'FOREIGN KEY(album) REFERENCES albums(id), '
+ 'FOREIGN KEY(track) REFERENCES tracks(id))')
+ connection.execute( # Genres (Many-to-many)
+ 'CREATE TABLE IF NOT EXISTS genres '
+ '(id INTEGER PRIMARY KEY, name VARCHAR(100))')
+ connection.execute( # Junction Genres Tracks
+ """CREATE TABLE IF NOT EXISTS tracks_genres
+ ( track INTEGER, genre INTEGER,
+ FOREIGN KEY(track) REFERENCES tracks(id)
+ FOREIGN KEY(genre) REFERENCES genres(id))""")
+ # Create cleanup triggers:
+ # DELETE history → Tracks / Tracks_genres tables
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_history_cleanup_tracks
+ AFTER DELETE ON history
+ WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.track;
+ DELETE FROM tracks_genres WHERE track = old.track;
+ END;
+ ''')
+ # DELETE Tracks_Genres → Genres table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_genres_cleanup_genres
+ AFTER DELETE ON tracks_genres
+ WHEN ((SELECT count(*) FROM tracks_genres WHERE genre=old.genre) = 0)
+ BEGIN
+ DELETE FROM genres WHERE id = old.genre;
+ END;
+ ''')
+ # DELETE Tracks → Artists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_artists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # DELETE Tracks → Albums table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albums
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ # DELETE Tracks → cleanup AlbumArtists table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_tracks_cleanup_albumartists
+ AFTER DELETE ON tracks
+ WHEN ((SELECT count(*) FROM tracks WHERE albumartist=old.albumartist) = 0)
+ BEGIN
+ DELETE FROM albumartists WHERE id = old.albumartist;
+ END;
+ ''')
+ # DELETE blocklist → Tracks table
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_tracks
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM history WHERE track=old.track) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE track=old.track) = 0)
+ BEGIN
+ DELETE FROM tracks WHERE id = old.track;
+ END;
+ ''')
+ # DELETE blocklist → Artists table
+ # The "SELECT count(*) FROM blocklist" is useless,
+ # there can be only one blocklist.artist
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_artists
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM tracks WHERE artist=old.artist) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE artist=old.artist) = 0)
+ BEGIN
+ DELETE FROM artists WHERE id = old.artist;
+ END;
+ ''')
+ # DELETE Tracks → Albums table
+ # The "SELECT count(*) FROM blocklist" is useless,
+ # there can be only one blocklist.album
+ connection.execute('''
+ CREATE TRIGGER IF NOT EXISTS del_blocklist_cleanup_albums
+ AFTER DELETE ON blocklist
+ WHEN ((SELECT count(*) FROM tracks WHERE album=old.album) = 0 AND
+ (SELECT count(*) FROM blocklist WHERE album=old.album) = 0)
+ BEGIN
+ DELETE FROM albums WHERE id = old.album;
+ END;
+ ''')
+ connection.close()
+
+ def drop_all(self):
+ connection = self.get_database_connection()
+ rows = connection.execute(
+ "SELECT name FROM sqlite_master WHERE type='table'")
+ for r in rows.fetchall():
+ connection.execute(f'DROP TABLE IF EXISTS {r[0]}')
+ connection.close()
+
+ def _remove_blocklist_id(self, blid, with_connection=None):
+ """Remove id"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT * FROM artists WHERE name = ?", (artist_name,))
- for row in rows:
- if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute(
- "INSERT INTO artists (name, mbid) VALUES (?, ?)",
- (artist_name, mbid))
+ connection = self.get_database_connection()
+ connection.execute('DELETE FROM blocklist'
+ ' WHERE blocklist.id = ?', (blid,))
connection.commit()
- rows = connection.execute(
- "SELECT * FROM artists WHERE name = ?", (artist_name,))
- for row in rows:
- if not with_connection:
- self.close_database_connection(connection)
- return row
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
+
+ def _get_album(self, album, connection):
+ if album.mbid:
+ return connection.execute(
+ "SELECT id FROM albums WHERE mbid = ?",
+ (album.mbid,))
+ else:
+ return connection.execute(
+ "SELECT id FROM albums WHERE name = ? AND mbid IS NULL",
+ (album.name,))
- def get_track(self, track, with_connection=None, add_not=False):
+ def get_album(self, album, with_connection=None, add=True):
+ """get album information from the database.
+ if not in database insert new entry.
+
+ :param sima.lib.meta.Album album: album objet
+ :param sqlite3.Connection with_connection: SQLite connection
"""
- Get a track from Tracks table, add if not existing,
- Attention: use Track() object!!
- if not in database insert new entry."""
- art = track.artist
- nam = track.title
- fil = track.file
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- art_id = self.get_artist(art, with_connection=connection)[0]
- alb_id = self.get_album(track, with_connection=connection)[0]
- rows = connection.execute(
- "SELECT * FROM tracks WHERE name = ? AND"
- " artist = ? AND file = ?", (nam, art_id, fil))
+ rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- return False
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
connection.execute(
- "INSERT INTO tracks (artist, album, name, file) VALUES (?, ?, ?, ?)",
- (art_id, alb_id, nam, fil))
+ "INSERT INTO albums (name, mbid) VALUES (?, ?)",
+ (album.name, album.mbid))
connection.commit()
- rows = connection.execute(
- "SELECT * FROM tracks WHERE name = ? AND"
- " artist = ? AND album = ? AND file = ?",
- (nam, art_id, alb_id, fil,))
+ rows = self._get_album(album, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
+ connection.close()
+ return None
+
+ def _get_albumartist(self, artist, connection):
+ if artist.mbid:
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE mbid = ?",
+ (artist.mbid,))
+ else:
+ return connection.execute(
+ "SELECT id FROM albumartists WHERE name = ? AND mbid IS NULL",
+ (artist.name,))
- def get_album(self, track, mbid=None,
- with_connection=None, add_not=False):
- """
- get album information from the database.
+ def get_albumartist(self, artist, with_connection=None, add=True):
+ """get albumartist information from the database.
if not in database insert new entry.
- Attention: use Track|Album object!!
- Use AlbumArtist tag if provided, fallback to Album tag
+
+ :param sima.lib.meta.Artist artist: artist
+ :param sqlite3.Connection with_connection: SQLite connection
"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- if track.albumartist:
- artist = track.albumartist
- else:
- artist = track.artist
- art_id = self.get_artist(artist, with_connection=connection)[0]
- album = track.album
- rows = connection.execute(
- "SELECT * FROM albums WHERE name = ? AND artist = ?",
- (album, art_id))
+ rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- return False
+ connection.close()
+ return row[0]
+ if not add:
+ if not with_connection:
+ connection.close()
+ return None
connection.execute(
- "INSERT INTO albums (name, artist, mbid) VALUES (?, ?, ?)",
- (album, art_id, mbid))
+ "INSERT INTO albumartists (name, mbid) VALUES (?, ?)",
+ (artist.name, artist.mbid))
connection.commit()
- rows = connection.execute(
- "SELECT * FROM albums WHERE name = ? AND artist = ?",
- (album, art_id))
+ rows = self._get_albumartist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- self.close_database_connection(connection)
+ connection.close()
- def get_artists(self, with_connection=None):
- """Returns all artists in DB"""
- if with_connection:
- connection = with_connection
+ def _get_artist(self, artist, connection):
+ if artist.mbid:
+ return connection.execute(
+ "SELECT id FROM artists WHERE mbid = ?",
+ (artist.mbid,))
else:
- connection = self.get_database_connection()
- rows = connection.execute("SELECT name FROM artists ORDER BY name")
- results = [row for row in rows]
- if not with_connection:
- self.close_database_connection(connection)
- for artist in results:
- yield artist
+ return connection.execute(
+ "SELECT id FROM artists WHERE name = ? AND mbid IS NULL", (artist.name,))
+
+ def get_artist(self, artist, with_connection=None, add=True):
+ """get artist information from the database.
+ if not in database insert new entry.
- def get_bl_artist(self, artist_name,
- with_connection=None, add_not=None):
- """get blacklisted artist information from the database."""
+ :param sima.lib.meta.Artist artist: artist
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- art = self.get_artist(artist_name, with_connection=connection,
- add_not=add_not)
- if not art:
- return False
- art_id = art[0]
- rows = connection.execute("SELECT * FROM black_list WHERE artist = ?",
- (art_id,))
+ rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
+ connection.close()
+ return row[0]
+ if not add:
if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute("INSERT INTO black_list (artist) VALUES (?)",
- (art_id,))
- connection.execute("UPDATE black_list SET updated = DATETIME('now')"
- " WHERE artist = ?", (art_id,))
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO artists (name, mbid) VALUES (?, ?)",
+ (artist.name, artist.mbid))
connection.commit()
- rows = connection.execute("SELECT * FROM black_list WHERE artist = ?",
- (art_id,))
+ rows = self._get_artist(artist, connection)
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- self.close_database_connection(connection)
- return False
+ connection.close()
- def get_bl_album(self, track, with_connection=None, add_not=None):
- """get blacklisted album information from the database."""
+ def get_genre(self, genre, with_connection=None, add=True):
+ """get genre from the database.
+ if not in database insert new entry.
+
+ :param str genre: genre as a string
+ :param sqlite3.Connection with_connection: SQLite connection
+ """
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- album = self.get_album(track, with_connection=connection,
- add_not=add_not)
- if not album:
- return False
- alb_id = album[0]
- rows = connection.execute("SELECT * FROM black_list WHERE album = ?",
- (alb_id,))
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
+ connection.close()
+ return row[0]
+ if not add:
if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute("INSERT INTO black_list (album) VALUES (?)",
- (alb_id,))
- connection.execute("UPDATE black_list SET updated = DATETIME('now')"
- " WHERE album = ?", (alb_id,))
+ connection.close()
+ return None
+ connection.execute(
+ "INSERT INTO genres (name) VALUES (?)", (genre,))
connection.commit()
- rows = connection.execute("SELECT * FROM black_list WHERE album = ?",
- (alb_id,))
+ rows = connection.execute(
+ "SELECT id FROM genres WHERE name = ?", (genre,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if not with_connection:
- self.close_database_connection(connection)
- return False
-
- def get_bl_track(self, track, with_connection=None, add_not=None):
- """get blacklisted track information from the database."""
+ connection.close()
+ return row[0]
+
+ def get_track(self, track, with_connection=None, add=True):
+ """Get a track id from Tracks table, add if not existing,
+ :param sima.lib.track.Track track: track to use
+ :param bool add: add non existing track to database"""
+ if not track.file:
+ raise SimaDBError('Got a track with no file attribute: %r' % track)
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- track = self.get_track(track, with_connection=connection,
- add_not=add_not)
- if not track:
- return False
- track_id = track[0]
- rows = connection.execute("SELECT * FROM black_list WHERE track = ?",
- (track_id,))
+ rows = connection.execute(
+ "SELECT * FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
- if add_not:
- if not with_connection:
- self.close_database_connection(connection)
- return False
- connection.execute("INSERT INTO black_list (track) VALUES (?)",
- (track_id,))
- connection.execute("UPDATE black_list SET updated = DATETIME('now')"
- " WHERE track = ?", (track_id,))
+ connection.close()
+ return row[0]
+ if not add: # Not adding non existing track
+ connection.close()
+ return None
+ # Get an artist record or None
+ if track.artist:
+ art = Artist(name=track.artist, mbid=track.musicbrainz_artistid)
+ art_id = self.get_artist(art, with_connection=connection)
+ else:
+ art_id = None
+ # Get an albumartist record or None
+ if track.albumartist:
+ albart = Artist(name=track.albumartist,
+ mbid=track.musicbrainz_albumartistid)
+ albart_id = self.get_albumartist(albart, with_connection=connection)
+ else:
+ albart_id = None
+ # Get an album record or None
+ if track.album:
+ alb = Album(name=track.album, mbid=track.musicbrainz_albumid)
+ alb_id = self.get_album(alb, with_connection=connection)
+ else:
+ alb_id = None
+ connection.execute(
+ """INSERT INTO tracks (artist, albumartist, album, title, mbid, file)
+ VALUES (?, ?, ?, ?, ?, ?)""",
+ (art_id, albart_id, alb_id, track.title, track.musicbrainz_trackid,
+ track.file))
connection.commit()
- rows = connection.execute("SELECT * FROM black_list WHERE track = ?",
- (track_id,))
+ # Add track id to junction tables
+ self._add_tracks_genres(track, connection)
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
for row in rows:
if not with_connection:
- self.close_database_connection(connection)
- return row
+ connection.close()
+ return row[0]
if not with_connection:
- self.close_database_connection(connection)
- return False
+ connection.close()
+ return None
+
+ def _add_tracks_genres(self, track, connection):
+ if not track.genres:
+ return None
+ rows = connection.execute(
+ "SELECT id FROM tracks WHERE file = ?", (track.file,))
+ trk_id = rows.fetchone()[0]
+ for genre in track.genres:
+ # add genre
+ gen_id = self.get_genre(genre)
+ connection.execute("""INSERT INTO tracks_genres (track, genre)
+ VALUES (?, ?)""", (trk_id, gen_id))
+
+ def add_history(self, track, date=None):
+ """Record last play date of track (ie. not a real exhautive play history).
+ :param track sima.lib.track.Track: track to add to history"""
+ if not date:
+ date = datetime.now()
+ connection = self.get_database_connection()
+ track_id = self.get_track(track, with_connection=connection)
+ rows = connection.execute("SELECT * FROM history WHERE track = ? ",
+ (track_id,))
+ if not rows.fetchone():
+ connection.execute("INSERT INTO history (track) VALUES (?)",
+ (track_id,))
+ connection.execute("UPDATE history SET last_play = ? "
+ " WHERE track = ?", (date, track_id,))
+ connection.commit()
+ connection.close()
+
+ def purge_history(self, duration=__HIST_DURATION__):
+ """Remove old entries in history
+ :param duration int: Purge history record older than duration in hours
+ (defaults to __HIST_DURATION__)"""
+ connection = self.get_database_connection()
+ connection.execute("DELETE FROM history WHERE last_play"
+ " < datetime('now', '-%i hours')" % duration)
+ connection.execute('VACUUM')
+ connection.commit()
+ connection.close()
- def get_artists_history(self, artists, duration=__HIST_DURATION__):
+ def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__):
"""
- :param list artists: list of object that can evaluate equality with
- artist name, iterable of str or Artist object
+ :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist.
"""
date = datetime.utcnow() - timedelta(hours=duration)
connection = self.get_database_connection()
- rows = connection.execute(
- "SELECT arts.name, albs.name, trs.name, trs.file"
- " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
- " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
- " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""
+ SELECT albums.name AS name,
+ albums.mbid as mbid,
+ artists.name as artist,
+ artists.mbid as artist_mbib
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL
+ ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ hist = list()
for row in rows:
- if artists and row[0] not in artists:
+ vals = dict(row)
+ artist = Artist(name=vals.pop('artist'),
+ mbid=vals.pop('artist_mbib'))
+ if needle:
+ if needle != artist:
+ continue
+ album = Album(**vals, artist=artist)
+ if hist and hist[-1] == album:
+ # remove consecutive dupes
continue
- for art in artists:
- if row[0] == art:
- yield art
- self.close_database_connection(connection)
-
- def get_history(self, artist=None, artists=None, duration=__HIST_DURATION__):
- """Retrieve complete play history, most recent tracks first
- artist : filter history for specific artist
- artists : filter history for specific artists list
- """ # pylint: disable=C0301
+ hist.append(album)
+ connection.close()
+ return hist
+
+ def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__):
+ """Returns a list of Artist objects
+ :param sima.lib.meta.Artist|sima.lib.meta.MetaContainer needle: When specified, returns history for this artist, it's actually testing the artist presence in history.
+ :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only
+ """
date = datetime.utcnow() - timedelta(hours=duration)
connection = self.get_database_connection()
- if artist:
- rows = connection.execute(
- "SELECT arts.name, albs.name, trs.name, trs.file, hist.last_play"
- " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
- " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
- " AND hist.last_play > ? AND arts.name = ?"
- " ORDER BY hist.last_play DESC", (date.isoformat(' '), artist,))
- else:
- rows = connection.execute(
- "SELECT arts.name, albs.name, trs.name, trs.file"
- " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
- " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
- " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
+ connection.row_factory = sqlite3.Row
+ rows = connection.execute("""
+ SELECT artists.name AS name,
+ artists.mbid as mbid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ WHERE history.last_play > ? AND artists.name NOT NULL
+ ORDER BY history.last_play DESC""", (date.isoformat(' '),))
+ last = deque(maxlen=1)
+ hist = list()
for row in rows:
- if artists and row[0] not in artists:
+ artist = Artist(**row)
+ if last and last[0] == artist: # remove consecutive dupes
+ continue
+ last.append(artist)
+ if needle and isinstance(needle, (Artist, str)):
+ if needle == artist:
+ hist.append(artist) # No need to go further
+ break
+ continue
+ elif needle and getattr(needle, '__contains__'):
+ if artist in needle:
+ hist.append(artist) # No need to go further
continue
- yield row
- self.close_database_connection(connection)
+ hist.append(artist)
+ connection.close()
+ return hist
- def get_black_list(self):
- """Retrieve complete black list."""
+ def fetch_genres_history(self, duration=__HIST_DURATION__, limit=20):
+ date = datetime.utcnow() - timedelta(hours=duration)
connection = self.get_database_connection()
- rows = connection.execute('SELECT black_list.rowid, artists.name'
- ' FROM artists INNER JOIN black_list'
- ' ON artists.id = black_list.artist')
- yield ('Row ID', 'Actual black listed element', 'Extra information',)
- yield ('',)
- yield ('Row ID', 'Artist',)
+ rows = connection.execute("""
+ SELECT genres.name, artists.name
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN tracks_genres ON tracks_genres.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN genres ON genres.id = tracks_genres.genre
+ WHERE history.last_play > ?
+ ORDER BY history.last_play DESC
+ """, (date.isoformat(' '),))
+ genres = list()
for row in rows:
- yield row
- rows = connection.execute(
- 'SELECT black_list.rowid, albums.name, artists.name'
- ' FROM artists, albums INNER JOIN black_list'
- ' ON albums.id = black_list.album'
- ' WHERE artists.id = albums.artist')
- yield ('',)
- yield ('Row ID', 'Album', 'Artist name')
- for row in rows:
- yield row
- rows = connection.execute(
- 'SELECT black_list.rowid, tracks.name, artists.name'
- ' FROM artists, tracks INNER JOIN black_list'
- ' ON tracks.id = black_list.track'
- ' WHERE tracks.artist = artists.id')
- yield ('',)
- yield ('Row ID', 'Title', 'Artist name')
+ genres.append(row)
+ if len({g[0] for g in genres}) >= limit:
+ break
+ connection.close()
+ return genres
+
+ def fetch_history(self, artist=None, duration=__HIST_DURATION__):
+ """Fetches tracks history, more recent first
+ :param sima.lib.meta.Artist artist: limit history to this artist
+ :param int duration: How long ago to fetch history from
+ """
+ date = datetime.utcnow() - timedelta(hours=duration)
+ connection = self.get_database_connection()
+ connection.row_factory = sqlite3.Row
+ sql = """
+ SELECT tracks.title, tracks.file, artists.name AS artist,
+ albumartists.name AS albumartist,
+ artists.mbid as musicbrainz_artistid,
+ albums.name AS album,
+ albums.mbid AS musicbrainz_albumid,
+ tracks.mbid as musicbrainz_trackid
+ FROM history
+ JOIN tracks ON history.track = tracks.id
+ LEFT OUTER JOIN artists ON tracks.artist = artists.id
+ LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id
+ LEFT OUTER JOIN albums ON tracks.album = albums.id
+ WHERE history.last_play > ?
+ """
+ if artist:
+ if artist.mbid:
+ rows = connection.execute(sql+"""
+ AND artists.mbid = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.mbid))
+ else:
+ rows = connection.execute(sql+"""
+ AND artists.name = ?
+ ORDER BY history.last_play DESC""",
+ (date.isoformat(' '), artist.name))
+ else:
+ rows = connection.execute(sql+'ORDER BY history.last_play DESC',
+ (date.isoformat(' '),))
+ hist = list()
for row in rows:
- yield row
- self.close_database_connection(connection)
+ hist.append(Track(**row))
+ connection.close()
+ return hist
- def _set_mbid(self, artist_id=None, mbid=None, with_connection=None):
- """"""
+ def get_bl_track(self, track, with_connection=None, add=True):
+ """Add a track to blocklist
+ :param sima.lib.track.Track track: Track object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- connection.execute("UPDATE artists SET mbid = ? WHERE id = ?",
- (mbid, artist_id))
- connection.commit()
- if not with_connection:
- self.close_database_connection(connection)
-
- def _remove_bl(self, rowid):
- """Remove bl row id"""
- connection = self.get_database_connection()
- connection.execute('DELETE FROM black_list'
- ' WHERE black_list.rowid = ?', (rowid,))
- connection.commit()
- self.close_database_connection(connection)
-
- def add_history(self, track):
- """Add to history"""
- connection = self.get_database_connection()
- track_id = self.get_track(track, with_connection=connection)[0]
- rows = connection.execute("SELECT * FROM history WHERE track = ? ",
- (track_id,))
+ track_id = self.get_track(track, with_connection=connection, add=True)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE track = ?", (track_id,))
if not rows.fetchone():
- connection.execute("INSERT INTO history (track) VALUES (?)",
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (track) VALUES (?)',
(track_id,))
- connection.execute("UPDATE history SET last_play = DATETIME('now') "
- " WHERE track = ?", (track_id,))
- connection.commit()
- self.close_database_connection(connection)
-
- def _clean_artists_table(self, with_connection=None):
- """Clean orphan artists"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- artists_ids = {row[0] for row in connection.execute(
- "SELECT id FROM artists")}
- artist_2_artist_ids = {row[0] for row in connection.execute(
- "SELECT artist FROM black_list")} | {
- row[0] for row in connection.execute(
- "SELECT artist FROM albums")} | {
- row[0] for row in connection.execute(
- "SELECT artist FROM tracks")}
- orphans = [(orphan,) for orphan in artists_ids - artist_2_artist_ids]
- connection.executemany('DELETE FROM artists WHERE id = (?);', orphans)
- if not with_connection:
connection.commit()
- self.close_database_connection(connection)
-
- def _clean_albums_table(self, with_connection=None):
- """Clean orphan albums"""
- if with_connection:
- connection = with_connection
- else:
- connection = self.get_database_connection()
- orphan_black_ids = {row[0] for row in connection.execute(
- """SELECT albums.id FROM albums
- LEFT JOIN black_list ON albums.id = black_list.album
- WHERE ( black_list.album IS NULL )""")}
- orphan_tracks_ids = {row[0] for row in connection.execute(
- """SELECT albums.id FROM albums
- LEFT JOIN tracks ON albums.id = tracks.album
- WHERE tracks.album IS NULL""")}
- orphans = [(orphan,) for orphan in orphan_black_ids & orphan_tracks_ids]
- connection.executemany('DELETE FROM albums WHERE id = (?);', orphans)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE track = ?", (track_id,))
+ bl = rows.fetchone()[0]
if not with_connection:
- connection.commit()
- self.close_database_connection(connection)
-
- def _clean_tracks_table(self, with_connection=None):
- """Clean orphan tracks"""
+ connection.close()
+ return bl
+
+ def get_bl_album(self, album, with_connection=None, add=True):
+ """Add an album to blocklist
+ :param sima.lib.meta.Album: Album object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- hist_orphan_ids = {row[0] for row in connection.execute(
- """SELECT tracks.id FROM tracks
- LEFT JOIN history ON tracks.id = history.track
- WHERE history.track IS NULL""")}
- black_list_orphan_ids = {row[0] for row in connection.execute(
- """SELECT tracks.id FROM tracks
- LEFT JOIN black_list ON tracks.id = black_list.track
- WHERE black_list.track IS NULL""")}
- orphans = [(orphan,) for orphan in hist_orphan_ids & black_list_orphan_ids]
- connection.executemany('DELETE FROM tracks WHERE id = (?);', orphans)
- if not with_connection:
+ album_id = self.get_album(album, with_connection=connection, add=True)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE album = ?", (album_id,))
+ if not rows.fetchone():
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (album) VALUES (?)',
+ (album_id,))
connection.commit()
- self.close_database_connection(connection)
-
- def clean_database(self, with_connection=None):
- """Wrapper around _clean_* methods"""
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE album = ?", (album_id,))
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
+
+ def get_bl_artist(self, artist, with_connection=None, add=True):
+ """Add an artist to blocklist
+ :param sima.lib.meta.Artist: Artist object to add to blocklist
+ :param sqlite3.Connection with_connection: sqlite3.Connection to reuse, else create a new one
+ :param bool add: Default is to add a new record, set to False to fetch associated record"""
if with_connection:
connection = with_connection
else:
connection = self.get_database_connection()
- self._clean_tracks_table(with_connection=connection)
- self._clean_albums_table(with_connection=connection)
- self._clean_artists_table(with_connection=connection)
- connection.execute("VACUUM")
- if not with_connection:
+ artist_id = self.get_artist(artist, with_connection=connection, add=True)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
+ if not rows.fetchone():
+ if not add:
+ return None
+ connection.execute('INSERT INTO blocklist (artist) VALUES (?)',
+ (artist_id,))
connection.commit()
- self.close_database_connection(connection)
-
- def purge_history(self, duration=__HIST_DURATION__):
- """Remove old entries in history"""
- connection = self.get_database_connection()
- connection.execute("DELETE FROM history WHERE last_play"
- " < datetime('now', '-%i hours')" % duration)
- connection.commit()
- self.close_database_connection(connection)
-
- def _set_dbversion(self):
- """Add db version"""
- connection = self.get_database_connection()
- connection.execute('INSERT INTO db_info (version, name) VALUES (?, ?)',
- (__DB_VERSION__, 'Sima DB'))
- connection.commit()
- self.close_database_connection(connection)
+ rows = connection.execute(
+ "SELECT id FROM blocklist WHERE artist = ?", (artist_id,))
+ bl = rows.fetchone()[0]
+ if not with_connection:
+ connection.close()
+ return bl
- def create_db(self):
- """ Set up a database
- """
+ def delete_bl(self, track=None, album=None, artist=None):
+ if not (track or album or artist):
+ return
connection = self.get_database_connection()
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS db_info'
- ' (version INTEGER, name CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, name'
- ' VARCHAR(100), mbid CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY,'
- ' artist INTEGER, name VARCHAR(100), mbid CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY,'
- ' name VARCHAR(100), artist INTEGER, album INTEGER,'
- ' file VARCHAR(500), mbid CHAR(36))')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS black_list (artist INTEGER,'
- ' album INTEGER, track INTEGER, updated DATE)')
- connection.execute(
- 'CREATE TABLE IF NOT EXISTS history (last_play DATE,'
- ' track integer)')
- connection.commit()
- self.close_database_connection(connection)
- self._set_dbversion()
+ blid = None
+ if track:
+ blid = self.get_bl_track(track, with_connection=connection)
+ if album:
+ blid = self.get_bl_album(album, with_connection=connection)
+ if artist:
+ blid = self.get_bl_artist(artist, with_connection=connection)
+ if not blid:
+ return
+ self._remove_blocklist_id(blid, with_connection=connection)
+ connection.close()
# VIM MODLINE
-# vim: ai ts=4 sw=4 sts=4 expandtab
+# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8