]> git.0d.be Git - botaradio.git/blob - util.py
update submodule
[botaradio.git] / util.py
1 #!/usr/bin/python3
2 # coding=utf-8
3
4 import hashlib
5 import magic
6 import os
7 import sys
8 import variables as var
9 import constants
10 import zipfile
11 import requests
12 import mutagen
13 import re
14 import subprocess as sp
15 import logging
16 import youtube_dl
17 from importlib import reload
18 from PIL import Image
19 from io import BytesIO
20 from sys import platform
21 import traceback
22 import urllib.parse, urllib.request, urllib.error
23 import base64
24 import media
25 import media.radio
26 from packaging import version
27
28 log = logging.getLogger("bot")
29
30
31 def solve_filepath(path):
32     if not path:
33         return ''
34
35     if path[0] == '/':
36         return path
37     else:
38         mydir = os.path.dirname(os.path.realpath(__file__))
39         return mydir + '/' + path
40
41
42 def get_recursive_file_list_sorted(path):
43     filelist = []
44     for root, dirs, files in os.walk(path):
45         relroot = root.replace(path, '', 1)
46         if relroot != '' and relroot in var.config.get('bot', 'ignored_folders'):
47             continue
48         if len(relroot):
49             relroot += '/'
50         for file in files:
51             if file in var.config.get('bot', 'ignored_files'):
52                 continue
53
54             fullpath = os.path.join(path, relroot, file)
55             if not os.access(fullpath, os.R_OK):
56                 continue
57
58             mime = magic.from_file(fullpath, mime=True)
59             if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
60                 filelist.append(relroot + file)
61
62     filelist.sort()
63     return filelist
64
65
66 def get_music_path(music):
67     uri = ''
68     if music["type"] == "url":
69         uri = music['path']
70     elif music["type"] == "file":
71         uri = var.music_folder + music["path"]
72     elif music["type"] == "radio":
73         uri = music['url']
74
75     return uri
76
77
78 def get_music_tag_info(music):
79     if "path" in music:
80         uri = get_music_path(music)
81
82         if os.path.isfile(uri):
83             match = re.search("(.+)\.(.+)", uri)
84             if match is None:
85                 return music
86
87             file_no_ext = match[1]
88             ext = match[2]
89
90             try:
91                 im = None
92                 path_thumbnail = file_no_ext + ".jpg"
93                 if os.path.isfile(path_thumbnail):
94                     im = Image.open(path_thumbnail)
95
96                 if ext == "mp3":
97                     # title: TIT2
98                     # artist: TPE1, TPE2
99                     # album: TALB
100                     # cover artwork: APIC:
101                     tags = mutagen.File(uri)
102                     if 'TIT2' in tags:
103                         music['title'] = tags['TIT2'].text[0]
104                     if 'TPE1' in tags:  # artist
105                         music['artist'] = tags['TPE1'].text[0]
106
107                     if im is None:
108                         if "APIC:" in tags:
109                             im = Image.open(BytesIO(tags["APIC:"].data))
110
111                 elif ext == "m4a" or ext == "m4b" or ext == "mp4" or ext == "m4p":
112                     # title: ©nam (\xa9nam)
113                     # artist: ©ART
114                     # album: ©alb
115                     # cover artwork: covr
116                     tags = mutagen.File(uri)
117                     if '©nam' in tags:
118                         music['title'] = tags['©nam'][0]
119                     if '©ART' in tags:  # artist
120                         music['artist'] = tags['©ART'][0]
121
122                         if im is None:
123                             if "covr" in tags:
124                                 im = Image.open(BytesIO(tags["covr"][0]))
125
126                 if im:
127                     im.thumbnail((100, 100), Image.ANTIALIAS)
128                     buffer = BytesIO()
129                     im = im.convert('RGB')
130                     im.save(buffer, format="JPEG")
131                     music['thumbnail'] = base64.b64encode(buffer.getvalue()).decode('utf-8')
132             except:
133                 pass
134     else:
135         uri = music['url']
136
137     # if nothing found
138     if 'title' not in music:
139         match = re.search("([^\.]+)\.?.*", os.path.basename(uri))
140         music['title'] = match[1]
141
142     return music
143
144
145 def format_song_string(music):
146     display = ''
147     source = music["type"]
148     title = music["title"] if "title" in music else "Unknown title"
149     artist = music["artist"] if "artist" in music else "Unknown artist"
150
151     if source == "radio":
152         display = constants.strings("now_playing_radio",
153             url=music["url"],
154             title=media.radio.get_radio_title(music["url"]),
155             name=music["name"],
156             user=music["user"]
157         )
158     elif source == "url" and 'from_playlist' in music:
159         display = constants.strings("now_playing_from_playlist",
160                                     title=title,
161                                     url=music["playlist_url"],
162                                     playlist=music["playlist_title"],
163                                     user=music["user"]
164         )
165     elif source == "url":
166         display = constants.strings("now_playing_url",
167                                     title=title,
168                                     url=music["url"],
169                                     user=music["user"]
170         )
171     elif source == "file":
172         display = constants.strings("now_playing_file",
173                                     title=title,
174                                     artist=artist,
175                                     user=music["user"]
176         )
177
178     return display
179
180
181 def format_debug_song_string(music):
182     display = ''
183     source = music["type"]
184     title = music["title"] if "title" in music else "??"
185     artist = music["artist"] if "artist" in music else "??"
186
187     if source == "radio":
188         display = "[radio] {name} ({url}) by {user}".format(
189             name=music["name"],
190             url=music["url"],
191             user=music["user"]
192         )
193     elif source == "url" and 'from_playlist' in music:
194         display = "[url] {title} ({url}) from playlist {playlist} by {user}".format(
195             title=title,
196             url=music["url"],
197             playlist=music["playlist_title"],
198             user=music["user"]
199         )
200     elif source == "url":
201         display = "[url] {title} ({url}) by {user}".format(
202             title=title,
203             url=music["url"],
204             user=music["user"]
205         )
206     elif source == "file":
207         display = "[file] {artist} - {title} ({path}) by {user}".format(
208             title=title,
209             artist=artist,
210             path=music["path"],
211             user=music["user"]
212         )
213
214     return display
215
216
217 def format_current_playing():
218     music = var.playlist.current_item()
219     display = format_song_string(music)
220
221     if 'thumbnail' in music:
222         thumbnail_html = '<img width="80" src="data:image/jpge;base64,' + \
223                          music['thumbnail'] + '"/>'
224         return display + "<br />" + thumbnail_html
225
226     return display
227
228
229 # - zips all files of the given zippath (must be a directory)
230 # - returns the absolute path of the created zip file
231 # - zip file will be in the applications tmp folder (according to configuration)
232 # - format of the filename itself = prefix_hash.zip
233 #       - prefix can be controlled by the caller
234 #       - hash is a sha1 of the string representation of the directories' contents (which are
235 #           zipped)
236 def zipdir(zippath, zipname_prefix=None):
237     zipname = var.tmp_folder
238     if zipname_prefix and '../' not in zipname_prefix:
239         zipname += zipname_prefix.strip().replace('/', '_') + '_'
240
241     files = get_recursive_file_list_sorted(zippath)
242     hash = hashlib.sha1((str(files).encode())).hexdigest()
243     zipname += hash + '.zip'
244
245     if os.path.exists(zipname):
246         return zipname
247
248     zipf = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)
249
250     for file in files:
251         file_to_add = os.path.join(zippath, file)
252         if not os.access(file_to_add, os.R_OK):
253             continue
254         if file in var.config.get('bot', 'ignored_files'):
255             continue
256
257         add_file_as = os.path.relpath(os.path.join(zippath, file), os.path.join(zippath, '..'))
258         zipf.write(file_to_add, add_file_as)
259
260     zipf.close()
261     return zipname
262
263
264 def get_user_ban():
265     res = "List of ban hash"
266     for i in var.db.items("user_ban"):
267         res += "<br/>" + i[0]
268     return res
269
270
271 def new_release_version():
272     v = urllib.request.urlopen(urllib.request.Request("https://packages.azlux.fr/botamusique/version")).read()
273     return v.rstrip().decode()
274
275
276 def update(current_version):
277     global log
278
279     new_version = new_release_version()
280     target = var.config.get('bot', 'target_version')
281     if version.parse(new_version) > version.parse(current_version) or target == "testing":
282         log.info('update: new version, start updating...')
283         tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
284         log.debug(tp)
285         log.info('update: update pip libraries dependencies')
286         sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
287         msg = "New version installed, please restart the bot."
288         if target == "testing":
289             msg += tp.replace('\n', '<br/>')
290
291     else:
292         log.info('update: starting update youtube-dl via pip3')
293         tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
294         msg = ""
295         if "Requirement already up-to-date" in tp:
296             msg += "Youtube-dl is up-to-date"
297         else:
298             msg += "Update done: " + tp.split('Successfully installed')[1]
299     reload(youtube_dl)
300     msg += "<br/> Youtube-dl reloaded"
301     return msg
302
303
304 def user_ban(user):
305     var.db.set("user_ban", user, None)
306     res = "User " + user + " banned"
307     return res
308
309
310 def user_unban(user):
311     var.db.remove_option("user_ban", user)
312     res = "Done"
313     return res
314
315
316 def get_url_ban():
317     res = "List of ban hash"
318     for i in var.db.items("url_ban"):
319         res += "<br/>" + i[0]
320     return res
321
322
323 def url_ban(url):
324     var.db.set("url_ban", url, None)
325     res = "url " + url + " banned"
326     return res
327
328
329 def url_unban(url):
330     var.db.remove_option("url_ban", url)
331     res = "Done"
332     return res
333
334
335 def pipe_no_wait(pipefd):
336     ''' Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()'''
337     if platform == "linux" or platform == "linux2" or platform == "darwin":
338         import fcntl
339         import os
340         try:
341             fl = fcntl.fcntl(pipefd, fcntl.F_GETFL)
342             fcntl.fcntl(pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
343         except:
344             print(sys.exc_info()[1])
345             return False
346         else:
347             return True
348
349     elif platform == "win32":
350         # https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
351         import msvcrt
352         import os
353
354         from ctypes import windll, byref, wintypes, GetLastError, WinError
355         from ctypes.wintypes import HANDLE, DWORD, POINTER, BOOL
356
357         LPDWORD = POINTER(DWORD)
358         PIPE_NOWAIT = wintypes.DWORD(0x00000001)
359         ERROR_NO_DATA = 232
360
361         SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
362         SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
363         SetNamedPipeHandleState.restype = BOOL
364
365         h = msvcrt.get_osfhandle(pipefd)
366
367         res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
368         if res == 0:
369             print(WinError())
370             return False
371         return True
372
373
374 class Dir(object):
375     def __init__(self, path):
376         self.name = os.path.basename(path.strip('/'))
377         self.fullpath = path
378         self.subdirs = {}
379         self.files = []
380
381     def add_file(self, file):
382         if file.startswith(self.name + '/'):
383             file = file.replace(self.name + '/', '', 1)
384
385         if '/' in file:
386             # This file is in a subdir
387             subdir = file.split('/')[0]
388             if subdir in self.subdirs:
389                 self.subdirs[subdir].add_file(file)
390             else:
391                 self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
392                 self.subdirs[subdir].add_file(file)
393         else:
394             self.files.append(file)
395         return True
396
397     def get_subdirs(self, path=None):
398         subdirs = []
399         if path and path != '' and path != './':
400             subdir = path.split('/')[0]
401             if subdir in self.subdirs:
402                 searchpath = '/'.join(path.split('/')[1::])
403                 subdirs = self.subdirs[subdir].get_subdirs(searchpath)
404                 subdirs = list(map(lambda subsubdir: os.path.join(subdir, subsubdir), subdirs))
405         else:
406             subdirs = self.subdirs
407
408         return subdirs
409
410     def get_subdirs_recursively(self, path=None):
411         subdirs = []
412         if path and path != '' and path != './':
413             subdir = path.split('/')[0]
414             if subdir in self.subdirs:
415                 searchpath = '/'.join(path.split('/')[1::])
416                 subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
417         else:
418             subdirs = list(self.subdirs.keys())
419
420             for key, val in self.subdirs.items():
421                 subdirs.extend(map(lambda subdir: key + '/' + subdir, val.get_subdirs_recursively()))
422
423         subdirs.sort()
424         return subdirs
425
426     def get_files(self, path=None):
427         files = []
428         if path and path != '' and path != './':
429             subdir = path.split('/')[0]
430             if subdir in self.subdirs:
431                 searchpath = '/'.join(path.split('/')[1::])
432                 files = self.subdirs[subdir].get_files(searchpath)
433         else:
434             files = self.files
435
436         return files
437
438     def get_files_recursively(self, path=None):
439         files = []
440         if path and path != '' and path != './':
441             subdir = path.split('/')[0]
442             if subdir in self.subdirs:
443                 searchpath = '/'.join(path.split('/')[1::])
444                 files = self.subdirs[subdir].get_files_recursively(searchpath)
445         else:
446             files = self.files
447
448             for key, val in self.subdirs.items():
449                 files.extend(map(lambda file: key + '/' + file, val.get_files_recursively()))
450
451         return files
452
453     def render_text(self, ident=0):
454         print('{}{}/'.format(' ' * (ident * 4), self.name))
455         for key, val in self.subdirs.items():
456             val.render_text(ident + 1)
457         for file in self.files:
458             print('{}{}'.format(' ' * (ident + 1) * 4, file))
459
460
461 # Parse the html from the message to get the URL
462
463 def get_url_from_input(string):
464     if string.startswith('http'):
465         return string
466     p = re.compile('href="(.+?)"', re.IGNORECASE)
467     res = re.search(p, string)
468     if res:
469         return res.group(1)
470     else:
471         return False
472
473 def youtube_search(query):
474     global log
475
476     try:
477         r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
478         results = re.findall("watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
479                              "(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
480
481         if len(results) > 0:
482             return results
483
484     except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
485         error_traceback = traceback.format_exc().split("During")[0]
486         log.error("util: youtube query failed with error:\n %s" % error_traceback)
487         return False