1 # -*- coding: utf-8 -*-
3 # Copyright (c) 2014 Jack Kaliko <kaliko@azylum.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 Consume EchoNest web service
26 __author__ = 'Jack Kaliko'
29 import urllib.request, urllib.error, urllib.parse
31 from datetime import datetime, timedelta
32 from socket import timeout as SocketTimeOut
33 from time import sleep
35 from requests import get
38 from sima.lib.meta import Artist
39 from sima.utils.utils import getws
40 if len(ECH.get('apikey')) == 23:
44 WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
47 class SimaEchoError(Exception):
51 def __init__(self, wait):
53 self.last_called = datetime.now()
55 def __call__(self, func):
56 def wrapper(*args, **kwargs):
57 while self.last_called + self.wait > datetime.now():
59 result = func(*args, **kwargs)
60 self.last_called = datetime.now()
66 def __init__(self, elem, last=None):
68 self.requestdate = last
70 self.requestdate = datetime.utcnow()
73 return self.requestdate
82 root_url = 'http://{host}/api/{version}'.format(**ECH)
84 timestamp = datetime.utcnow()
86 def __init__(self, cache=True):
87 self._ressource = None
88 self._payload = {'api_key': ECH.get('apikey')}
89 self.current_element = None
94 """Use cached elements or proceed http request"""
95 self._req = get(self._ressource, params=self._payload, timeout=5)
96 if self._req.url in SimaFM.cache:
97 print('got from SimaFM cache')
98 self.current_element = SimaFM.cache.get(self._req.url).get()
102 @Throttle(WAIT_BETWEEN_REQUESTS)
103 def _fetch_lfm(self):
104 """fetch from web service"""
105 if self._req.status_code is not 200:
106 raise SimaEchoError(self._req.status_code)
107 self.current_element = self._req.json()
108 self._controls_lfm_answer()
110 SimaFM.cache.update({self._req.url:
111 Cache(self.current_element)})
113 def _controls_lfm_answer(self):
114 """Controls last.fm answer.
116 status = self.current_element.get('response').get('status')
117 if status.get('code') is 0:
119 raise SimaEchoError(status.get('message'))
121 def _controls_artist(self, artist):
124 if not isinstance(artist, Artist):
125 raise TypeError('"{0!r}" not an Artist object'.format(artist))
128 self._payload.update(
129 id='musicbrainz:artist:{0}'.format(artist.mbid))
131 self._payload.update(name=artist.name)
132 self._payload.update(bucket='id:musicbrainz')
133 self._payload.update(results=30)
135 def purge_cache(self, age=4):
136 now = datetime.utcnow()
137 if now.hour == SimaFM.timestamp.hour:
139 SimaFM.timestamp = datetime.utcnow()
141 delta = timedelta(hours=age)
142 for url in list(cache.keys()):
143 timestamp = cache.get(url).created()
144 if now - timestamp > delta:
147 def get_similar(self, artist=None):
150 self._controls_artist(artist)
152 self._ressource = '{0}/artist/similar'.format(SimaFM.root_url)
154 for art in self.current_element.get('response').get('artists'):
157 if 'foreign_ids' in art:
158 for frgnid in art.get('foreign_ids'):
159 if frgnid.get('catalog') == 'musicbrainz':
160 mbid = frgnid.get('foreign_id').lstrip('musicbrainz:artist:')
161 yield Artist(mbid=mbid, name=art.get('name'))
166 for t, a, m in test.get_similartracks(artist='Nirvana', track='Smells Like Teen Spirit'):
170 if __name__ == '__main__':
173 except XmlFMHTTPError as conn_err:
174 print("error trying to connect: %s" % conn_err)
175 except XmlFMNotFound as not_found:
176 print("looks like no artists were found: %s" % not_found)
177 except XmlFMError as err:
182 # vim: ai ts=4 sw=4 sts=4 expandtab