]> git.0d.be Git - botaradio.git/blob - util.py
Don't lookup tags unnecessarily #93
[botaradio.git] / util.py
1 #!/usr/bin/python3
2 # coding=utf-8
3
4 import hashlib
5 import magic
6 import os
7 import sys
8 import variables as var
9 import constants
10 import zipfile
11 import requests
12 import mutagen
13 import re
14 import subprocess as sp
15 import logging
16 import youtube_dl
17 from importlib import reload
18 from PIL import Image
19 from io import BytesIO
20 from sys import platform
21 import traceback
22 import urllib.parse, urllib.request, urllib.error
23 import base64
24 import media
25 import media.radio
26 from packaging import version
27
28 log = logging.getLogger("bot")
29
30
31 def solve_filepath(path):
32     if not path:
33         return ''
34
35     if path[0] == '/':
36         return path
37     else:
38         mydir = os.path.dirname(os.path.realpath(__file__))
39         return mydir + '/' + path
40
41
42 def get_recursive_file_list_sorted(path):
43     filelist = []
44     for root, dirs, files in os.walk(path, topdown=True, onerror=None, followlinks=True):
45         relroot = root.replace(path, '', 1)
46         if relroot != '' and relroot in var.config.get('bot', 'ignored_folders'):
47             continue
48         if len(relroot):
49             relroot += '/'
50         for file in files:
51             if file in var.config.get('bot', 'ignored_files'):
52                 continue
53
54             fullpath = os.path.join(path, relroot, file)
55             if not os.access(fullpath, os.R_OK):
56                 continue
57
58             try:
59                 mime = magic.from_file(fullpath, mime=True)
60                 if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
61                     filelist.append(relroot + file)
62             except:
63                 pass
64
65     filelist.sort()
66     return filelist
67
68 # - zips all files of the given zippath (must be a directory)
69 # - returns the absolute path of the created zip file
70 # - zip file will be in the applications tmp folder (according to configuration)
71 # - format of the filename itself = prefix_hash.zip
72 #       - prefix can be controlled by the caller
73 #       - hash is a sha1 of the string representation of the directories' contents (which are
74 #           zipped)
75 def zipdir(zippath, zipname_prefix=None):
76     zipname = var.tmp_folder
77     if zipname_prefix and '../' not in zipname_prefix:
78         zipname += zipname_prefix.strip().replace('/', '_') + '_'
79
80     files = get_recursive_file_list_sorted(zippath)
81     hash = hashlib.sha1((str(files).encode())).hexdigest()
82     zipname += hash + '.zip'
83
84     if os.path.exists(zipname):
85         return zipname
86
87     zipf = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)
88
89     for file in files:
90         file_to_add = os.path.join(zippath, file)
91         if not os.access(file_to_add, os.R_OK):
92             continue
93         if file in var.config.get('bot', 'ignored_files'):
94             continue
95
96         add_file_as = os.path.relpath(os.path.join(zippath, file), os.path.join(zippath, '..'))
97         zipf.write(file_to_add, add_file_as)
98
99     zipf.close()
100     return zipname
101
102
103 def get_user_ban():
104     res = "List of ban hash"
105     for i in var.db.items("user_ban"):
106         res += "<br/>" + i[0]
107     return res
108
109
110 def new_release_version():
111     v = urllib.request.urlopen(urllib.request.Request("https://packages.azlux.fr/botamusique/version")).read()
112     return v.rstrip().decode()
113
114
115 def update(current_version):
116     global log
117
118     new_version = new_release_version()
119     target = var.config.get('bot', 'target_version')
120     if version.parse(new_version) > version.parse(current_version) or target == "testing":
121         log.info('update: new version, start updating...')
122         tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
123         log.debug(tp)
124         log.info('update: update pip libraries dependencies')
125         sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
126         msg = "New version installed, please restart the bot."
127         if target == "testing":
128             msg += tp.replace('\n', '<br/>')
129
130     else:
131         log.info('update: starting update youtube-dl via pip3')
132         tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
133         msg = ""
134         if "Requirement already up-to-date" in tp:
135             msg += "Youtube-dl is up-to-date"
136         else:
137             msg += "Update done: " + tp.split('Successfully installed')[1]
138     reload(youtube_dl)
139     msg += "<br/> Youtube-dl reloaded"
140     return msg
141
142
143 def user_ban(user):
144     var.db.set("user_ban", user, None)
145     res = "User " + user + " banned"
146     return res
147
148
149 def user_unban(user):
150     var.db.remove_option("user_ban", user)
151     res = "Done"
152     return res
153
154
155 def get_url_ban():
156     res = "List of ban:"
157     for i in var.db.items("url_ban"):
158         res += "<br/>" + i[0]
159     return res
160
161
162 def url_ban(url):
163     var.db.set("url_ban", url, None)
164     res = "url " + url + " banned"
165     return res
166
167
168 def url_unban(url):
169     var.db.remove_option("url_ban", url)
170     res = "Done"
171     return res
172
173
174 def pipe_no_wait(pipefd):
175     ''' Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()'''
176     if platform == "linux" or platform == "linux2" or platform == "darwin":
177         import fcntl
178         import os
179         try:
180             fl = fcntl.fcntl(pipefd, fcntl.F_GETFL)
181             fcntl.fcntl(pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
182         except:
183             print(sys.exc_info()[1])
184             return False
185         else:
186             return True
187
188     elif platform == "win32":
189         # https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
190         import msvcrt
191         import os
192
193         from ctypes import windll, byref, wintypes, WinError, POINTER
194         from ctypes.wintypes import HANDLE, DWORD, BOOL
195
196         LPDWORD = POINTER(DWORD)
197         PIPE_NOWAIT = wintypes.DWORD(0x00000001)
198         ERROR_NO_DATA = 232
199
200         SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
201         SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
202         SetNamedPipeHandleState.restype = BOOL
203
204         h = msvcrt.get_osfhandle(pipefd)
205
206         res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
207         if res == 0:
208             print(WinError())
209             return False
210         return True
211
212
213 class Dir(object):
214     def __init__(self, path):
215         self.name = os.path.basename(path.strip('/'))
216         self.fullpath = path
217         self.subdirs = {}
218         self.files = []
219
220     def add_file(self, file):
221         if file.startswith(self.name + '/'):
222             file = file.replace(self.name + '/', '', 1)
223
224         if '/' in file:
225             # This file is in a subdir
226             subdir = file.split('/')[0]
227             if subdir in self.subdirs:
228                 self.subdirs[subdir].add_file(file)
229             else:
230                 self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
231                 self.subdirs[subdir].add_file(file)
232         else:
233             self.files.append(file)
234         return True
235
236     def get_subdirs(self, path=None):
237         subdirs = []
238         if path and path != '' and path != './':
239             subdir = path.split('/')[0]
240             if subdir in self.subdirs:
241                 searchpath = '/'.join(path.split('/')[1::])
242                 subdirs = self.subdirs[subdir].get_subdirs(searchpath)
243                 subdirs = list(map(lambda subsubdir: os.path.join(subdir, subsubdir), subdirs))
244         else:
245             subdirs = self.subdirs
246
247         return subdirs
248
249     def get_subdirs_recursively(self, path=None):
250         subdirs = []
251         if path and path != '' and path != './':
252             subdir = path.split('/')[0]
253             if subdir in self.subdirs:
254                 searchpath = '/'.join(path.split('/')[1::])
255                 subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
256         else:
257             subdirs = list(self.subdirs.keys())
258
259             for key, val in self.subdirs.items():
260                 subdirs.extend(map(lambda subdir: key + '/' + subdir, val.get_subdirs_recursively()))
261
262         subdirs.sort()
263         return subdirs
264
265     def get_files(self, path=None):
266         files = []
267         if path and path != '' and path != './':
268             subdir = path.split('/')[0]
269             if subdir in self.subdirs:
270                 searchpath = '/'.join(path.split('/')[1::])
271                 files = self.subdirs[subdir].get_files(searchpath)
272         else:
273             files = self.files
274
275         return files
276
277     def get_files_recursively(self, path=None):
278         files = []
279         if path and path != '' and path != './':
280             subdir = path.split('/')[0]
281             if subdir in self.subdirs:
282                 searchpath = '/'.join(path.split('/')[1::])
283                 files = self.subdirs[subdir].get_files_recursively(searchpath)
284         else:
285             files = self.files
286
287             for key, val in self.subdirs.items():
288                 files.extend(map(lambda file: key + '/' + file, val.get_files_recursively()))
289
290         return files
291
292     def render_text(self, ident=0):
293         print('{}{}/'.format(' ' * (ident * 4), self.name))
294         for key, val in self.subdirs.items():
295             val.render_text(ident + 1)
296         for file in self.files:
297             print('{}{}'.format(' ' * (ident + 1) * 4, file))
298
299
300 # Parse the html from the message to get the URL
301
302 def get_url_from_input(string):
303     if string.startswith('http'):
304         return string
305     p = re.compile('href="(.+?)"', re.IGNORECASE)
306     res = re.search(p, string)
307     if res:
308         return res.group(1)
309     else:
310         return False
311
312 def youtube_search(query):
313     global log
314
315     try:
316         r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
317         results = re.findall("watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
318                              "(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
319
320         if len(results) > 0:
321             return results
322
323     except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
324         error_traceback = traceback.format_exc().split("During")[0]
325         log.error("util: youtube query failed with error:\n %s" % error_traceback)
326         return False