From: kaliko Date: Sat, 1 May 2021 13:28:37 +0000 (+0200) Subject: Add fetch history methods X-Git-Tag: 0.18.0~68 X-Git-Url: https://git.kaliko.me/?a=commitdiff_plain;h=798ce9e58b7725118a05245ddb723f6e909ded95;p=mpd-sima.git Add fetch history methods --- diff --git a/sima/lib/db.py b/sima/lib/db.py index a2687b8..91cf037 100644 --- a/sima/lib/db.py +++ b/sima/lib/db.py @@ -26,6 +26,7 @@ __HIST_DURATION__ = int(30 * 24) # in hours import sqlite3 +from collections import deque from datetime import (datetime, timedelta) from sima.lib.meta import Artist, Album @@ -394,50 +395,115 @@ class SimaDB: connection = self.get_database_connection() connection.execute("DELETE FROM history WHERE last_play" " < datetime('now', '-%i hours')" % duration) + connection.execute('VACUUM') connection.commit() self.close_database_connection(connection) - def fetch_artists_history(self, duration=__HIST_DURATION__): + def fetch_albums_history(self, needle=None, duration=__HIST_DURATION__): + """ + :param sima.lib.meta.Artist needle: When specified, returns albums history for this artist. + """ date = datetime.utcnow() - timedelta(hours=duration) connection = self.get_database_connection() connection.row_factory = sqlite3.Row rows = connection.execute(""" - SELECT artists.name AS name, - artists.mbid as mbid + SELECT albums.name AS name, + albums.mbid as mbid, + artists.name as artist, + artists.mbid as artist_mbib FROM history JOIN tracks ON history.track = tracks.id + LEFT OUTER JOIN albums ON tracks.album = albums.id LEFT OUTER JOIN artists ON tracks.artist = artists.id - WHERE history.last_play > ? + WHERE history.last_play > ? AND albums.name NOT NULL AND artists.name NOT NULL ORDER BY history.last_play DESC""", (date.isoformat(' '),)) hist = list() for row in rows: - if hist and hist[-1] == Album(**row): # remove consecutive dupes + vals = dict(row) + artist = Artist(name=vals.pop('artist'), + mbid=vals.pop('artist_mbib')) + if needle: + if needle != artist: + continue + album = Album(**vals, artist=artist) + if hist and hist[-1] == album: + # remove consecutive dupes continue - hist.append(Album(**row)) + hist.append(album) connection.close() return hist - def fetch_history(self, duration=__HIST_DURATION__): - """Fetches tracks history, more recent first - :param int duration: How long ago to fetch history from + def fetch_artists_history(self, needle=None, duration=__HIST_DURATION__): + """Returns a list of Artist objects + :param sima.lib.meta.Artist|sima.lib.meta.MetaContainer needle: When specified, returns history for this artist, it's actually testing the artist presence in history. + :param sima.lib.meta.MetaContainer needle: When specified, returns history for these artists only """ date = datetime.utcnow() - timedelta(hours=duration) connection = self.get_database_connection() connection.row_factory = sqlite3.Row rows = connection.execute(""" - SELECT tracks.title, tracks.file, artists.name AS artist, - albumartists.name AS albumartist, - artists.mbid as musicbrainz_artistid, - albums.name AS album, - albums.mbid AS musicbrainz_albumid, - tracks.mbid as musicbrainz_trackid + SELECT artists.name AS name, + artists.mbid as mbid FROM history JOIN tracks ON history.track = tracks.id LEFT OUTER JOIN artists ON tracks.artist = artists.id - LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id - LEFT OUTER JOIN albums ON tracks.album = albums.id - WHERE history.last_play > ? + WHERE history.last_play > ? AND artists.name NOT NULL ORDER BY history.last_play DESC""", (date.isoformat(' '),)) + last = deque(maxlen=1) + hist = list() + for row in rows: + artist = Artist(**row) + if last and last[0] == artist: # remove consecutive dupes + continue + last.append(artist) + if needle and isinstance(needle, (Artist, str)): + if needle == artist: + hist.append(artist) # No need to go further + break + continue + elif needle and getattr(needle, '__contains__'): + if artist in needle: + hist.append(artist) # No need to go further + continue + hist.append(artist) + return hist + + def fetch_history(self, artist=None, duration=__HIST_DURATION__): + """Fetches tracks history, more recent first + :param sima.lib.meta.Artist artist: limit history to this artist + :param int duration: How long ago to fetch history from + """ + date = datetime.utcnow() - timedelta(hours=duration) + connection = self.get_database_connection() + connection.row_factory = sqlite3.Row + sql = """ + SELECT tracks.title, tracks.file, artists.name AS artist, + albumartists.name AS albumartist, + artists.mbid as musicbrainz_artistid, + albums.name AS album, + albums.mbid AS musicbrainz_albumid, + tracks.mbid as musicbrainz_trackid + FROM history + JOIN tracks ON history.track = tracks.id + LEFT OUTER JOIN artists ON tracks.artist = artists.id + LEFT OUTER JOIN albumartists ON tracks.albumartist = albumartists.id + LEFT OUTER JOIN albums ON tracks.album = albums.id + WHERE history.last_play > ? + """ + if artist: + if artist.mbid: + rows = connection.execute(sql+""" + AND artists.mbid = ? + ORDER BY history.last_play DESC""", + (date.isoformat(' '), artist.mbid)) + else: + rows = connection.execute(sql+""" + AND artists.name = ? + ORDER BY history.last_play DESC""", + (date.isoformat(' '), artist.name)) + else: + rows = connection.execute(sql+'ORDER BY history.last_play DESC', + (date.isoformat(' '),)) hist = list() for row in rows: hist.append(Track(**row)) @@ -525,25 +591,6 @@ class SimaDB: return self._remove_blocklist_id(blid, with_connection=connection) -def main(): - DEVOLT = { - 'album': 'Grey', - 'albumartist': 'Devolt', - 'artist': 'Devolt', - 'date': '2011-12-01', - 'file': 'music/Devolt/2011-Grey/03-Devolt - Crazy.mp3', - 'musicbrainz_albumartistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99', - 'musicbrainz_albumid': 'ea2ef2cf-59e1-443a-817e-9066e3e0be4b', - 'musicbrainz_artistid': 'd8e7e3e2-49ab-4f7c-b148-fc946d521f99', - 'musicbrainz_trackid': 'fabf8fc9-2ae5-49c9-8214-a839c958d872', - 'duration': '220.000', - 'title': 'Crazy'} - db = SimaDB('/dev/shm/test.sqlite') - db.create_db() - db.add_history(Track(**DEVOLT)) - DEVOLT['file'] = 'foo' - print(db.get_bl_track(Track(**DEVOLT))) - db.add_history(Track(**DEVOLT)) # VIM MODLINE # vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf8 diff --git a/tests/test_simadb.py b/tests/test_simadb.py index 8e0d49f..52e3e44 100644 --- a/tests/test_simadb.py +++ b/tests/test_simadb.py @@ -6,7 +6,7 @@ import os from sima.lib.db import SimaDB from sima.lib.track import Track -from sima.lib.meta import Album +from sima.lib.meta import Album, Artist, MetaContainer DEVOLT = { @@ -145,9 +145,11 @@ class Test_00DB(Main): # trk03 = Track(file='03', **tr) self.db.add_history(trk03, CURRENT-datetime.timedelta(hours=1)) - # got multiple tracks, same artist, got artist history len == 1 + # got multiple tracks, same artist/album, got artist/album history len = 1 art_history = self.db.fetch_artists_history() + alb_history = self.db.fetch_albums_history() self.assertEqual(len(art_history), 1) + self.assertEqual(len(alb_history), 1) self.assertEqual(art_history, [trk01.Artist]) # Now add new artist to history @@ -164,7 +166,43 @@ class Test_00DB(Main): trk04.artist, trk03.artist], art_history) - def test_04_triggers(self): + def test_04_filtering_history(self): + # Controls artist history filtering + for i in range(0, 5): + trk = Track(file=f'/foo/bar.{i}', name=f'{i}-baz', + artist=f'{i}-art', album=f'{i}-lbum') + last = CURRENT - datetime.timedelta(minutes=i) + self.db.add_history(trk, date=last) + if i == 5: # bounce latest record + self.db.add_history(trk, date=last) + art_history = self.db.fetch_artists_history() + # Already checked but to be sure, we should have 5 artists in history + self.assertEqual(len(art_history), 5) + for needle in ['4-art', Artist(name='4-art')]: + art_history = self.db.fetch_artists_history(needle=needle) + self.assertEqual(art_history, [needle]) + needle = Artist(name='not-art') + art_history = self.db.fetch_artists_history(needle=needle) + self.assertEqual(art_history, []) + # Controls artists history filtering + # for a list of Artist objects + needle_list = [Artist(name='3-art'), Artist(name='4-art')] + art_history = self.db.fetch_artists_history(needle=needle_list) + self.assertEqual(art_history, needle_list) + # for a MetaContainer + needle_meta = MetaContainer(needle_list) + art_history = self.db.fetch_artists_history(needle=needle_meta) + self.assertEqual(art_history, needle_list) + # for a list of string (Meta object handles Artist/str comparison) + art_history = self.db.fetch_artists_history(['3-art', '4-art']) + self.assertEqual(art_history, needle_list) + + # Controls album history filtering + needle = Artist(name='4-art') + alb_history = self.db.fetch_albums_history(needle=needle) + self.assertEqual(alb_history, [Album(name='4-lbum')]) + + def test_05_triggers(self): self.db.purge_history(duration=0) tracks_ids = list() # Add a first track