1 # -*- coding: utf-8 -*-
3 # Copyright (c) 2009-2013, 2019-2020 kaliko <kaliko@azylum.org>
4 # Copyright (c) 2009, Eric Casteleijn <thisfred@gmail.com>
5 # Copyright (c) 2008 Rick van Hattem
7 # This file is part of sima
9 # sima is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
14 # sima is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with sima. If not, see <http://www.gnu.org/licenses/>.
23 """SQlite database library
# MusicBrainz ID: <http://musicbrainz.org/doc/MusicBrainzIdentifier>
28 # Artists: <http://musicbrainz.org/doc/Artist_Name>
29 # <http://musicbrainz.org/doc/Same_Artist_With_Different_Names>
# Default look-back window for play-history queries and purges: 30 days,
# expressed in hours because it is fed to timedelta(hours=...) and to
# SQLite's "-N hours" date modifier.
__HIST_DURATION__ = 30 * 24
36 from datetime import (datetime, timedelta)
37 from os.path import dirname, isdir
38 from os import (access, W_OK, F_OK)
class SimaDBError(Exception):
    """Base class of every error raised by the sima database layer."""


class SimaDBAccessError(SimaDBError):
    """Error on accessing DB file"""


class SimaDBNoFile(SimaDBError):
    """No DB file present"""
def __init__(self, db_path=None):
    """Remember the database location and validate it immediately.

    :param db_path: path of the SQLite file backing this instance
    :raises: whatever ``db_path_mod_control`` raises when the path cannot
        be used; the check runs as part of construction
    """
    self._db_path = db_path
    self.db_path_mod_control()
def db_path_mod_control(self):
    """Controls DB path access & write permissions

    :raises SimaDBAccessError: no path configured, parent directory missing
        or not writable, or existing DB file not writable
    :raises SimaDBNoFile: parent directory is usable but the DB file does
        not exist
    """
    db_path = self._db_path
    if db_path is None:
        # The constructor defaults to None; without this guard dirname()
        # fails with a TypeError that escapes the SimaDBError hierarchy.
        raise SimaDBAccessError('No database path provided')
    db_dir = dirname(db_path)
    # Controls directory access
    if not isdir(db_dir):
        raise SimaDBAccessError('Not a regular directory: "%s"' % db_dir)
    if not access(db_dir, W_OK):
        raise SimaDBAccessError('No write access to "%s"' % db_dir)
    # No DB file
    if not access(db_path, F_OK):
        raise SimaDBNoFile('No DB file in "%s"' % db_path)
    # Is a file but no write access
    if not access(db_path, W_OK):
        raise SimaDBAccessError('No write access to "%s"' % db_path)
def close_database_connection(self, connection):
    """Close the database connection.

    :param connection: object exposing ``close()``, normally the value
        returned by ``get_database_connection``
    """
    connection.close()
def get_database_connection(self):
    """Open and return a new connection to the file at ``self._db_path``.

    The connection is opened with a 5 second lock timeout and
    ``isolation_level=None`` (sqlite3 autocommit mode).
    """
    return sqlite3.connect(
        self._db_path, timeout=5.0, isolation_level=None)
def get_artist(self, artist_name, mbid=None,
               with_connection=None, add_not=False):
    """Get artist information from the database, inserting it if missing.

    :param artist_name: value matched against ``artists.name``
    :param mbid: stored with the row when a new artist is inserted
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    :param bool add_not: when true, do not insert a missing artist
    :returns: the first matching ``artists`` row, or ``False`` when the
        artist is unknown and ``add_not`` is set
    """
    query = "SELECT * FROM artists WHERE name = ?"
    connection = with_connection or self.get_database_connection()
    try:
        row = connection.execute(query, (artist_name,)).fetchone()
        if row is not None:
            return row
        if add_not:
            return False
        connection.execute(
            "INSERT INTO artists (name, mbid) VALUES (?, ?)",
            (artist_name, mbid))
        connection.commit()
        return connection.execute(query, (artist_name,)).fetchone()
    finally:
        # Only a connection opened here is ours to close; doing it in
        # ``finally`` also covers queries that raise.
        if not with_connection:
            self.close_database_connection(connection)
def get_track(self, track, with_connection=None, add_not=False):
    """Get a track from the tracks table, inserting it if missing.

    Attention: use Track() object!!

    :param track: object exposing ``artist``, ``title``, ``file`` plus
        whatever ``get_album`` reads from it
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    :param bool add_not: when true, do not insert a missing track. The
        artist and album are still looked up through ``get_artist`` and
        ``get_album`` without ``add_not``.
    :returns: the first matching ``tracks`` row, or ``False`` when the
        track is unknown and ``add_not`` is set
    """
    title = track.title
    path = track.file
    connection = with_connection or self.get_database_connection()
    try:
        art_id = self.get_artist(track.artist, with_connection=connection)[0]
        alb_id = self.get_album(track, with_connection=connection)[0]
        row = connection.execute(
            "SELECT * FROM tracks WHERE name = ? AND"
            " artist = ? AND file = ?", (title, art_id, path)).fetchone()
        if row is not None:
            return row
        if add_not:
            return False
        connection.execute(
            "INSERT INTO tracks (artist, album, name, file) VALUES (?, ?, ?, ?)",
            (art_id, alb_id, title, path))
        connection.commit()
        return connection.execute(
            "SELECT * FROM tracks WHERE name = ? AND"
            " artist = ? AND album = ? AND file = ?",
            (title, art_id, alb_id, path,)).fetchone()
    finally:
        # Covers every exit, including the ``add_not`` early return.
        if not with_connection:
            self.close_database_connection(connection)
def get_album(self, track, mbid=None,
              with_connection=None, add_not=False):
    """Get album information from the database, inserting it if missing.

    Attention: use Track|Album object!!
    Use AlbumArtist tag if provided, fallback to Artist tag.

    :param track: object exposing ``album``, ``albumartist`` and ``artist``
    :param mbid: stored with the row when a new album is inserted
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    :param bool add_not: when true, do not insert a missing album
    :returns: the first matching ``albums`` row, or ``False`` when the
        album is unknown and ``add_not`` is set
    """
    query = "SELECT * FROM albums WHERE name = ? AND artist = ?"
    connection = with_connection or self.get_database_connection()
    try:
        artist = track.albumartist or track.artist
        art_id = self.get_artist(artist, with_connection=connection)[0]
        album = track.album
        row = connection.execute(query, (album, art_id)).fetchone()
        if row is not None:
            return row
        if add_not:
            return False
        connection.execute(
            "INSERT INTO albums (name, artist, mbid) VALUES (?, ?, ?)",
            (album, art_id, mbid))
        connection.commit()
        return connection.execute(query, (album, art_id)).fetchone()
    finally:
        # Covers every exit, including the ``add_not`` early return.
        if not with_connection:
            self.close_database_connection(connection)
def get_artists(self, with_connection=None):
    """Yield every artist in the DB as a ``(name,)`` row, ordered by name.

    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before the first row is yielded
    """
    connection = with_connection or self.get_database_connection()
    try:
        # Fetch everything up front so the connection can be released
        # before handing rows to the consumer.
        results = connection.execute(
            "SELECT name FROM artists ORDER BY name").fetchall()
    finally:
        if not with_connection:
            self.close_database_connection(connection)
    yield from results
def get_bl_artist(self, artist_name,
                  with_connection=None, add_not=None):
    """Get blacklisted artist information from the database.

    :param artist_name: artist name, resolved through ``get_artist``
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    :param add_not: when true, only look up: neither the artist nor the
        black list entry is created
    :returns: the ``black_list`` row for the artist, or ``False`` when
        there is none and ``add_not`` is set
    """
    query = "SELECT * FROM black_list WHERE artist = ?"
    connection = with_connection or self.get_database_connection()
    try:
        art = self.get_artist(artist_name, with_connection=connection,
                              add_not=add_not)
        if not art:
            return False
        art_id = art[0]
        row = connection.execute(query, (art_id,)).fetchone()
        if row is not None:
            return row
        if add_not:
            return False
        connection.execute("INSERT INTO black_list (artist) VALUES (?)",
                           (art_id,))
        connection.execute("UPDATE black_list SET updated = DATETIME('now')"
                           " WHERE artist = ?", (art_id,))
        connection.commit()
        row = connection.execute(query, (art_id,)).fetchone()
        return row if row is not None else False
    finally:
        # Covers every exit, including the unknown-artist early return.
        if not with_connection:
            self.close_database_connection(connection)
def get_bl_album(self, track, with_connection=None, add_not=None):
    """Get blacklisted album information from the database.

    :param track: object handed to ``get_album`` to resolve the album
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    :param add_not: when true, only look up: neither the album nor the
        black list entry is created
    :returns: the ``black_list`` row for the album, or ``False`` when
        there is none and ``add_not`` is set
    """
    query = "SELECT * FROM black_list WHERE album = ?"
    connection = with_connection or self.get_database_connection()
    try:
        album = self.get_album(track, with_connection=connection,
                               add_not=add_not)
        if not album:
            return False
        alb_id = album[0]
        row = connection.execute(query, (alb_id,)).fetchone()
        if row is not None:
            return row
        if add_not:
            return False
        connection.execute("INSERT INTO black_list (album) VALUES (?)",
                           (alb_id,))
        connection.execute("UPDATE black_list SET updated = DATETIME('now')"
                           " WHERE album = ?", (alb_id,))
        connection.commit()
        row = connection.execute(query, (alb_id,)).fetchone()
        return row if row is not None else False
    finally:
        # Covers every exit, including the unknown-album early return.
        if not with_connection:
            self.close_database_connection(connection)
def get_bl_track(self, track, with_connection=None, add_not=None):
    """Get blacklisted track information from the database.

    :param track: object handed to ``get_track`` to resolve the track
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    :param add_not: when true, only look up: neither the track nor the
        black list entry is created
    :returns: the ``black_list`` row for the track, or ``False`` when
        there is none and ``add_not`` is set
    """
    query = "SELECT * FROM black_list WHERE track = ?"
    connection = with_connection or self.get_database_connection()
    try:
        track_row = self.get_track(track, with_connection=connection,
                                   add_not=add_not)
        if not track_row:
            return False
        track_id = track_row[0]
        row = connection.execute(query, (track_id,)).fetchone()
        if row is not None:
            return row
        if add_not:
            return False
        connection.execute("INSERT INTO black_list (track) VALUES (?)",
                           (track_id,))
        connection.execute("UPDATE black_list SET updated = DATETIME('now')"
                           " WHERE track = ?", (track_id,))
        connection.commit()
        row = connection.execute(query, (track_id,)).fetchone()
        return row if row is not None else False
    finally:
        # Covers every exit, including the unknown-track early return.
        if not with_connection:
            self.close_database_connection(connection)
def get_artists_history(self, artists, duration=__HIST_DURATION__):
    """Yield the given artists in the order they were recently played.

    Walks the play history newest first and, for every played track whose
    artist name equals an entry of ``artists``, yields that entry (the
    caller's own object, not the database string).

    :param list artists: list of object that can evaluate equality with
                         artist name, iterable of str or Artist object
    :param int duration: how far back to look, in hours
    """
    # Local import: only ``datetime`` and ``timedelta`` are imported at
    # module level.
    from datetime import timezone
    # Naive UTC timestamp, same value the deprecated utcnow() produced.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    date = now - timedelta(hours=duration)
    connection = self.get_database_connection()
    try:
        rows = connection.execute(
            "SELECT arts.name, albs.name, trs.name, trs.file"
            " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
            " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
            " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
        for row in rows:
            if artists and row[0] not in artists:
                continue
            for art in artists:
                if row[0] == art:
                    yield art
    finally:
        # Also runs when the consumer abandons the generator early.
        self.close_database_connection(connection)
def get_history(self, artist=None, artists=None, duration=__HIST_DURATION__):
    """Retrieve complete play history, most recent tracks first

    NOTE: rows are ``(artist, album, title, file, last_play)`` when
    ``artist`` is given but ``(artist, album, title, file)`` otherwise;
    the two queries select different columns.

    :param artist: filter history for specific artist
    :param artists: filter history for specific artists list
    :param int duration: how far back to look, in hours
    """  # pylint: disable=C0301
    # Local import: only ``datetime`` and ``timedelta`` are imported at
    # module level.
    from datetime import timezone
    # Naive UTC timestamp, same value the deprecated utcnow() produced.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    date = now - timedelta(hours=duration)
    connection = self.get_database_connection()
    try:
        if artist:
            rows = connection.execute(
                "SELECT arts.name, albs.name, trs.name, trs.file, hist.last_play"
                " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
                " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
                " AND hist.last_play > ? AND arts.name = ?"
                " ORDER BY hist.last_play DESC", (date.isoformat(' '), artist,))
        else:
            rows = connection.execute(
                "SELECT arts.name, albs.name, trs.name, trs.file"
                " FROM artists AS arts, tracks AS trs, history AS hist, albums AS albs"
                " WHERE trs.id = hist.track AND trs.artist = arts.id AND trs.album = albs.id"
                " AND hist.last_play > ? ORDER BY hist.last_play DESC", (date.isoformat(' '),))
        for row in rows:
            if artists and row[0] not in artists:
                continue
            yield row
    finally:
        # Also runs when the consumer abandons the generator early.
        self.close_database_connection(connection)
def get_black_list(self):
    """Retrieve complete black list.

    Yields display-ready tuples: a general header, then one section each
    for artists, albums and tracks. Every section is introduced by a
    ``('',)`` separator and its own header row.
    """
    connection = self.get_database_connection()
    try:
        rows = connection.execute('SELECT black_list.rowid, artists.name'
                                  ' FROM artists INNER JOIN black_list'
                                  ' ON artists.id = black_list.artist')
        yield ('Row ID', 'Actual black listed element', 'Extra information',)
        yield ('',)
        yield ('Row ID', 'Artist',)
        yield from rows
        rows = connection.execute(
            'SELECT black_list.rowid, albums.name, artists.name'
            ' FROM artists, albums INNER JOIN black_list'
            ' ON albums.id = black_list.album'
            ' WHERE artists.id = albums.artist')
        yield ('',)
        yield ('Row ID', 'Album', 'Artist name')
        yield from rows
        rows = connection.execute(
            'SELECT black_list.rowid, tracks.name, artists.name'
            ' FROM artists, tracks INNER JOIN black_list'
            ' ON tracks.id = black_list.track'
            ' WHERE tracks.artist = artists.id')
        yield ('',)
        yield ('Row ID', 'Title', 'Artist name')
        yield from rows
    finally:
        # Also runs when the consumer abandons the generator early.
        self.close_database_connection(connection)
def _set_mbid(self, artist_id=None, mbid=None, with_connection=None):
    """Store ``mbid`` on the ``artists`` row whose id is ``artist_id``.

    :param artist_id: primary key of the artist row to update
    :param mbid: value written to the ``mbid`` column
    :param with_connection: connection to reuse; when omitted a private
        connection is opened and closed before returning
    """
    connection = with_connection or self.get_database_connection()
    try:
        connection.execute("UPDATE artists SET mbid = ? WHERE id = ?",
                           (mbid, artist_id))
        connection.commit()
    finally:
        if not with_connection:
            self.close_database_connection(connection)
def _remove_bl(self, rowid):
    """Remove the black list entry with the given SQLite ``rowid``.

    :param rowid: ``black_list.rowid`` of the entry to delete
    """
    connection = self.get_database_connection()
    try:
        connection.execute('DELETE FROM black_list'
                           ' WHERE black_list.rowid = ?', (rowid,))
        connection.commit()
    finally:
        self.close_database_connection(connection)
def add_history(self, track):
    """Record ``track`` as played just now.

    The track is resolved through ``get_track``. A ``history`` row is
    created on first play, then its ``last_play`` is set to the current
    SQLite ``DATETIME('now')``.

    :param track: object accepted by ``get_track``
    """
    connection = self.get_database_connection()
    try:
        track_id = self.get_track(track, with_connection=connection)[0]
        rows = connection.execute("SELECT * FROM history WHERE track = ? ",
                                  (track_id,))
        if not rows.fetchone():
            connection.execute("INSERT INTO history (track) VALUES (?)",
                               (track_id,))
        connection.execute("UPDATE history SET last_play = DATETIME('now') "
                           " WHERE track = ?", (track_id,))
        connection.commit()
    finally:
        self.close_database_connection(connection)
def _clean_artists_table(self, with_connection=None):
    """Clean orphan artists

    Deletes every ``artists`` row whose id is referenced by none of
    ``black_list.artist``, ``albums.artist`` and ``tracks.artist``.

    :param with_connection: connection to reuse. The delete is committed
        and the connection closed only when it was opened here.
    """
    connection = with_connection or self.get_database_connection()
    try:
        artists_ids = {row[0] for row in connection.execute(
            "SELECT id FROM artists")}
        # One UNION query gathers the references from all three tables.
        referenced_ids = {row[0] for row in connection.execute(
            "SELECT artist FROM black_list"
            " UNION SELECT artist FROM albums"
            " UNION SELECT artist FROM tracks")}
        orphans = [(orphan,) for orphan in artists_ids - referenced_ids]
        connection.executemany('DELETE FROM artists WHERE id = (?);', orphans)
        if not with_connection:
            connection.commit()
    finally:
        if not with_connection:
            self.close_database_connection(connection)
def _clean_albums_table(self, with_connection=None):
    """Clean orphan albums

    Deletes every ``albums`` row that is referenced neither by
    ``black_list.album`` nor by ``tracks.album``.

    :param with_connection: connection to reuse. The delete is committed
        and the connection closed only when it was opened here.
    """
    connection = with_connection or self.get_database_connection()
    try:
        not_black_listed = {row[0] for row in connection.execute(
            """SELECT albums.id FROM albums
               LEFT JOIN black_list ON albums.id = black_list.album
               WHERE ( black_list.album IS NULL )""")}
        without_tracks = {row[0] for row in connection.execute(
            """SELECT albums.id FROM albums
               LEFT JOIN tracks ON albums.id = tracks.album
               WHERE tracks.album IS NULL""")}
        # Orphan means unreferenced on both sides, hence the intersection.
        orphans = [(orphan,) for orphan in not_black_listed & without_tracks]
        connection.executemany('DELETE FROM albums WHERE id = (?);', orphans)
        if not with_connection:
            connection.commit()
    finally:
        if not with_connection:
            self.close_database_connection(connection)
def _clean_tracks_table(self, with_connection=None):
    """Clean orphan tracks

    Deletes every ``tracks`` row that is referenced neither by
    ``history.track`` nor by ``black_list.track``.

    :param with_connection: connection to reuse. The delete is committed
        and the connection closed only when it was opened here.
    """
    connection = with_connection or self.get_database_connection()
    try:
        not_in_history = {row[0] for row in connection.execute(
            """SELECT tracks.id FROM tracks
               LEFT JOIN history ON tracks.id = history.track
               WHERE history.track IS NULL""")}
        not_black_listed = {row[0] for row in connection.execute(
            """SELECT tracks.id FROM tracks
               LEFT JOIN black_list ON tracks.id = black_list.track
               WHERE black_list.track IS NULL""")}
        # Orphan means unreferenced on both sides, hence the intersection.
        orphans = [(orphan,) for orphan in not_in_history & not_black_listed]
        connection.executemany('DELETE FROM tracks WHERE id = (?);', orphans)
        if not with_connection:
            connection.commit()
    finally:
        if not with_connection:
            self.close_database_connection(connection)
def clean_database(self, with_connection=None):
    """Wrapper around _clean_* methods

    Runs the tracks, albums and artists clean-ups in that order on one
    shared connection, then issues ``VACUUM``.

    :param with_connection: connection to reuse. The work is committed
        and the connection closed only when it was opened here.
    """
    connection = with_connection or self.get_database_connection()
    try:
        # Tracks first, then albums, then artists: each step can leave
        # rows of the next table without references.
        self._clean_tracks_table(with_connection=connection)
        self._clean_albums_table(with_connection=connection)
        self._clean_artists_table(with_connection=connection)
        connection.execute("VACUUM")
        if not with_connection:
            connection.commit()
    finally:
        if not with_connection:
            self.close_database_connection(connection)
def purge_history(self, duration=__HIST_DURATION__):
    """Remove old entries in history

    :param int duration: age limit in hours; rows whose ``last_play`` is
        older than that are deleted
    """
    connection = self.get_database_connection()
    try:
        # The modifier is bound as a parameter and never spliced into the
        # SQL text; ``%i`` still coerces ``duration`` to an integer.
        connection.execute("DELETE FROM history WHERE last_play"
                           " < datetime('now', ?)",
                           ('-%i hours' % duration,))
        connection.commit()
    finally:
        self.close_database_connection(connection)
def _set_dbversion(self):
    """Insert the schema version row into the ``db_info`` table.

    Writes ``(__DB_VERSION__, 'Sima DB')``; every call inserts a new row.
    """
    connection = self.get_database_connection()
    try:
        connection.execute('INSERT INTO db_info (version, name) VALUES (?, ?)',
                           (__DB_VERSION__, 'Sima DB'))
        connection.commit()
    finally:
        self.close_database_connection(connection)
524 """ Set up a database
526 connection = self.get_database_connection()
528 'CREATE TABLE IF NOT EXISTS db_info'
529 ' (version INTEGER, name CHAR(36))')
531 'CREATE TABLE IF NOT EXISTS artists (id INTEGER PRIMARY KEY, name'
532 ' VARCHAR(100), mbid CHAR(36))')
534 'CREATE TABLE IF NOT EXISTS albums (id INTEGER PRIMARY KEY,'
535 ' artist INTEGER, name VARCHAR(100), mbid CHAR(36))')
537 'CREATE TABLE IF NOT EXISTS tracks (id INTEGER PRIMARY KEY,'
538 ' name VARCHAR(100), artist INTEGER, album INTEGER,'
539 ' file VARCHAR(500), mbid CHAR(36))')
541 'CREATE TABLE IF NOT EXISTS black_list (artist INTEGER,'
542 ' album INTEGER, track INTEGER, updated DATE)')
544 'CREATE TABLE IF NOT EXISTS history (last_play DATE,'
547 self.close_database_connection(connection)
548 self._set_dbversion()
552 # vim: ai ts=4 sw=4 sts=4 expandtab