]> kaliko git repositories - mpd-sima.git/blob - sima/lib/simafm.py
Huge commit… Running last.fm track mode
[mpd-sima.git] / sima / lib / simafm.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Jack Kaliko <kaliko@azylum.org>
4 # Copyright (c) 2010 Eric Casteleijn <thisfred@gmail.com> (Throttle decorator)
5 #
6 #   This program is free software: you can redistribute it and/or modify
7 #   it under the terms of the GNU General Public License as published by
8 #   the Free Software Foundation, either version 3 of the License, or
9 #   (at your option) any later version.
10 #
11 #   This program is distributed in the hope that it will be useful,
12 #   but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #   GNU General Public License for more details.
15 #
16 #   You should have received a copy of the GNU General Public License
17 #   along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19 #
20
21 """
22 Consume last.fm web service
23
24 """
25
26 __version__ = '0.3.0'
27 __author__ = 'Jack Kaliko'
28
29
30 import urllib.request, urllib.error, urllib.parse
31
32 from datetime import datetime, timedelta
33 from http.client import BadStatusLine
34 from socket import timeout as SocketTimeOut
35 from time import sleep
36 from xml.etree.cElementTree import ElementTree
37
38 # Some definitions
39 WAIT_BETWEEN_REQUESTS = timedelta(0, 0.4)
40 LFM_ERRORS = dict({'2': 'Invalid service -This service does not exist',
41     '3': 'Invalid Method - No method with that name in this package',
42     '4': 'Authentication Failed - You do not have permissions to access the service',
43     '5': "'Invalid format - This service doesn't exist in that format",
44     '6': 'Invalid parameters - Your request is missing a required parameter',
45     '7': 'Invalid resource specified',
46     '9': 'Invalid session key - Please re-authenticate',
47     '10': 'Invalid API key - You must be granted a valid key by last.fm',
48     '11': 'Service Offline - This service is temporarily offline. Try again later.',
49     '12': 'Subscription Error - The user needs to be subscribed in order to do that',
50     '13': 'Invalid method signature supplied',
51     '26': 'Suspended API key - Access for your account has been suspended, please contact Last.fm',
52     })
53
54
55 class XmlFMError(Exception):  # Errors
56     """
57     Exception raised for errors in the input.
58     """
59
60     def __init__(self, expression):
61         self.expression = expression
62
63     def __str__(self):
64         return repr(self.expression)
65
66
67 class EncodingError(XmlFMError):
68     """Raised when string is not unicode"""
69     pass
70
71
72 class XmlFMHTTPError(XmlFMError):
73     """Raised when failed to connect server"""
74
75     def __init__(self, expression):
76         if hasattr(expression, 'code'):
77             self.expression = 'error %d: %s' % (expression.code,
78                 expression.msg)
79         else:
80             self.expression = 'error: %s' % expression
81
82
83 class XmlFMNotFound(XmlFMError):
84     """Raised when no artist is found"""
85
86     def __init__(self, message=None):
87         if not message:
88             message = 'Artist probably not found (http error 400)'
89         self.expression = (message)
90
91
92 class XmlFMMissingArtist(XmlFMError):
93     """Raised when no artist name provided"""
94
95     def __init__(self, message=None):
96         if not message:
97             message = 'Missing artist name.'
98         self.expression = (message)
99
100
101 class XmlFMTimeOut(XmlFMError):
102     """Raised when urlopen times out"""
103
104     def __init__(self, message=None):
105         if not message:
106             message = 'Connection to last.fm web services times out!'
107         self.expression = (message)
108
109
110 class Throttle():
111     def __init__(self, wait):
112         self.wait = wait
113         self.last_called = datetime.now()
114
115     def __call__(self, func):
116         def wrapper(*args, **kwargs):
117             while self.last_called + self.wait > datetime.now():
118                 #print('waiting…')
119                 sleep(0.1)
120             result = func(*args, **kwargs)
121             self.last_called = datetime.now()
122             return result
123         return wrapper
124
125
126 class AudioScrobblerCache():
127     def __init__(self, elem, last):
128         self.elemtree = elem
129         self.requestdate = last
130
131     def created(self):
132         return self.requestdate
133
134     def gettree(self):
135         return self.elemtree
136
137
138 class SimaFM():
139     """
140     """
141     api_key = '4a1c9ddec29816ed803d7be9113ba4cb'
142     host = 'ws.audioscrobbler.com'
143     version = '2.0'
144     root_url = 'http://%s/%s/' % (host, version)
145     request = dict({'similar': '?method=artist.getsimilar&artist=%s&' +\
146                                 'api_key=%s' % api_key,
147                     'top': '?method=artist.gettoptracks&artist=%s&' +\
148                             'api_key=%s' % api_key,
149                     'track': '?method=track.getsimilar&artist=%s' +\
150                             '&track=%s' + '&api_key=%s' % api_key,
151                     'info': '?method=artist.getinfo&artist=%s' +\
152                             '&api_key=%s' % api_key,
153                     })
154     cache = dict({})
155     timestamp = datetime.utcnow()
156     count = 0
157
158     def __init__(self, artist=None, cache=True):
159         self._url = None
160         #SimaFM.count += 1
161         self.current_element = None
162         self.caching = cache
163         self.purge_cache()
164
165     def _is_in_cache(self):
166         """Controls presence of url in cache.
167         """
168         if self._url in SimaFM.cache:
169             #print('already fetch {0}'.format(self.artist))
170             return True
171         return False
172
173     def _fetch(self):
174         """Use cached elements or proceed http request"""
175         if self._is_in_cache():
176             self.current_element = SimaFM.cache.get(self._url).gettree()
177             return
178         self._fetch_lfm()
179
180     @Throttle(WAIT_BETWEEN_REQUESTS)
181     def _fetch_lfm(self):
182         """Get artists, fetch xml from last.fm"""
183         try:
184             fd = urllib.request.urlopen(url=self._url,
185                     timeout=15)
186         except SocketTimeOut:
187             raise XmlFMTimeOut()
188         except BadStatusLine as err:
189             raise XmlFMHTTPError(err)
190         except urllib.error.URLError as err:
191             if hasattr(err, 'reason'):
192                 # URLError, failed to reach server
193                 raise XmlFMError(repr(err.reason))
194             if hasattr(err, 'code'):
195                 # HTTPError, the server couldn't fulfill the request
196                 if err.code == 400:
197                     raise XmlFMNotFound()
198                 raise XmlFMHTTPError(err)
199             raise XmlFMError(err)
200         headers = dict(fd.getheaders())
201         content_type = headers.get('Content-Type').split(';')
202         if content_type[0] != "text/xml":
203             raise XmlFMError('None XML returned from the server')
204         if content_type[1].strip() != "charset=utf-8":
205             raise XmlFMError('XML not UTF-8 encoded!')
206         try:
207             self.current_element = ElementTree(file=fd)
208         except SocketTimeOut:
209             raise XmlFMTimeOut()
210         finally:
211             fd.close()
212         self._controls_lfm_answer()
213         if self.caching:
214             SimaFM.cache[self._url] = AudioScrobblerCache(self.current_element,
215                     datetime.utcnow())
216
217     def _controls_lfm_answer(self):
218         """Controls last.fm answer.
219         """
220         status = self.current_element.getroot().attrib.get('status')
221         if status == 'ok':
222             return True
223         if status == 'failed':
224             error = self.current_element.find('error').attrib.get('code')
225             errormsg = self.current_element.findtext('error')
226             #if error in LFM_ERRORS.keys():
227             #    print LFM_ERRORS.get(error)
228             raise XmlFMNotFound(errormsg)
229
230     def _controls_artist(self, artist):
231         """
232         """
233         self.artist = artist
234         if not self.artist:
235             raise XmlFMMissingArtist('Missing artist name calling SimaFM.get_<method>()')
236         if not isinstance(self.artist, str):
237             raise EncodingError('"%s" not unicode object' % self.artist)
238         # last.fm is UTF-8 encoded URL
239         self.artist_utf8 = self.artist.encode('UTF-8')
240
241     def purge_cache(self, age=4):
242         now = datetime.utcnow()
243         if now.hour == SimaFM.timestamp.hour:
244             return
245         SimaFM.timestamp = datetime.utcnow()
246         cache = SimaFM.cache
247         delta = timedelta(hours=age)
248         for url in list(cache.keys()):
249             timestamp = cache.get(url).created()
250             if now - timestamp > delta:
251                 cache.pop(url)
252
253     def get_similar(self, artist=None):
254         """
255         """
256         self._controls_artist(artist)
257         # Construct URL
258         url = SimaFM.root_url + SimaFM.request.get('similar')
259         self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
260         self._fetch()
261         # TODO: controls name encoding
262         elem = self.current_element
263         for art in elem.getiterator(tag='artist'):
264             yield str(art.findtext('name')), 100 * float(art.findtext('match'))
265
266     def get_toptracks(self, artist=None):
267         """
268         """
269         self._controls_artist(artist)
270         # Construct URL
271         url = SimaFM.root_url + SimaFM.request.get('top')
272         self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
273         self._fetch()
274         # TODO: controls name encoding
275         elem = self.current_element
276         for track in elem.getiterator(tag='track'):
277             yield str(track.findtext('name')), int(track.attrib.get('rank'))
278
279     def get_mbid(self, artist=None):
280         """
281         """
282         self._controls_artist(artist)
283         # Construct URL
284         url = SimaFM.root_url + SimaFM.request.get('info')
285         self._url = url % (urllib.parse.quote(self.artist_utf8, safe=''))
286         self._fetch()
287         # TODO: controls name encoding
288         elem = self.current_element
289         return str(elem.find('artist').findtext('mbid'))
290
291
292 def run():
293     test = SimaFM()
294     for a, m in test.get_similar(artist='Tool'):
295         pass
296     return
297
298 if __name__ == '__main__':
299     try:
300         run()
301     except XmlFMHTTPError as conn_err:
302         print("error trying to connect: %s" % conn_err)
303     except XmlFMNotFound as not_found:
304         print("looks like no artists were found: %s" % not_found)
305     except XmlFMError as err:
306         print(err)
307
308
309 # VIM MODLINE
310 # vim: ai ts=4 sw=4 sts=4 expandtab