]> git.0d.be Git - botaradio.git/blob - util.py
feat: advanced url processing, whitespaces issue
[botaradio.git] / util.py
1 #!/usr/bin/python3
2 # coding=utf-8
3
4 import hashlib
5 import magic
6 import os
7 import sys
8 import variables as var
9 import zipfile
10 import requests
11 import re
12 import subprocess as sp
13 import logging
14 import youtube_dl
15 from importlib import reload
16 from sys import platform
17 import traceback
18 import urllib.request
19 from packaging import version
20
21 log = logging.getLogger("bot")
22
23
24 def solve_filepath(path):
25     if not path:
26         return ''
27
28     if path[0] == '/':
29         return path
30     else:
31         mydir = os.path.dirname(os.path.realpath(__file__))
32         return mydir + '/' + path
33
34
35 def get_recursive_file_list_sorted(path):
36     filelist = []
37     for root, dirs, files in os.walk(path, topdown=True, onerror=None, followlinks=True):
38         relroot = root.replace(path, '', 1)
39         if relroot != '' and relroot in var.config.get('bot', 'ignored_folders'):
40             continue
41         if len(relroot):
42             relroot += '/'
43         for file in files:
44             if file in var.config.get('bot', 'ignored_files'):
45                 continue
46
47             fullpath = os.path.join(path, relroot, file)
48             if not os.access(fullpath, os.R_OK):
49                 continue
50
51             try:
52                 mime = magic.from_file(fullpath, mime=True)
53                 if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
54                     filelist.append(relroot + file)
55             except:
56                 pass
57
58     filelist.sort()
59     return filelist
60
61
62 # - zips all files of the given zippath (must be a directory)
63 # - returns the absolute path of the created zip file
64 # - zip file will be in the applications tmp folder (according to configuration)
65 # - format of the filename itself = prefix_hash.zip
66 #       - prefix can be controlled by the caller
67 #       - hash is a sha1 of the string representation of the directories' contents (which are
68 #           zipped)
69 def zipdir(zippath, zipname_prefix=None):
70     zipname = var.tmp_folder
71     if zipname_prefix and '../' not in zipname_prefix:
72         zipname += zipname_prefix.strip().replace('/', '_') + '_'
73
74     files = get_recursive_file_list_sorted(zippath)
75     hash = hashlib.sha1((str(files).encode())).hexdigest()
76     zipname += hash + '.zip'
77
78     if os.path.exists(zipname):
79         return zipname
80
81     zipf = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)
82
83     for file in files:
84         file_to_add = os.path.join(zippath, file)
85         if not os.access(file_to_add, os.R_OK):
86             continue
87         if file in var.config.get('bot', 'ignored_files'):
88             continue
89
90         add_file_as = os.path.relpath(os.path.join(zippath, file), os.path.join(zippath, '..'))
91         zipf.write(file_to_add, add_file_as)
92
93     zipf.close()
94     return zipname
95
96
97 def get_user_ban():
98     res = "List of ban hash"
99     for i in var.db.items("user_ban"):
100         res += "<br/>" + i[0]
101     return res
102
103
104 def new_release_version():
105     v = urllib.request.urlopen(urllib.request.Request("https://packages.azlux.fr/botamusique/version")).read()
106     return v.rstrip().decode()
107
108
109 def update(current_version):
110     global log
111
112     new_version = new_release_version()
113     target = var.config.get('bot', 'target_version')
114     if version.parse(new_version) > version.parse(current_version) or target == "testing":
115         log.info('update: new version, start updating...')
116         tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
117         log.debug(tp)
118         log.info('update: update pip libraries dependencies')
119         sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
120         msg = "New version installed, please restart the bot."
121         if target == "testing":
122             msg += tp.replace('\n', '<br/>')
123
124     else:
125         log.info('update: starting update youtube-dl via pip3')
126         tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
127         msg = ""
128         if "Requirement already up-to-date" in tp:
129             msg += "Youtube-dl is up-to-date"
130         else:
131             msg += "Update done: " + tp.split('Successfully installed')[1]
132     reload(youtube_dl)
133     msg += "<br/> Youtube-dl reloaded"
134     return msg
135
136
137 def user_ban(user):
138     var.db.set("user_ban", user, None)
139     res = "User " + user + " banned"
140     return res
141
142
143 def user_unban(user):
144     var.db.remove_option("user_ban", user)
145     res = "Done"
146     return res
147
148
149 def get_url_ban():
150     res = "List of ban:"
151     for i in var.db.items("url_ban"):
152         res += "<br/>" + i[0]
153     return res
154
155
156 def url_ban(url):
157     var.db.set("url_ban", url, None)
158     res = "url " + url + " banned"
159     return res
160
161
162 def url_unban(url):
163     var.db.remove_option("url_ban", url)
164     res = "Done"
165     return res
166
167
168 def pipe_no_wait(pipefd):
169     """ Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()"""
170     if platform == "linux" or platform == "linux2" or platform == "darwin":
171         import fcntl
172         import os
173         try:
174             fl = fcntl.fcntl(pipefd, fcntl.F_GETFL)
175             fcntl.fcntl(pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
176         except:
177             print(sys.exc_info()[1])
178             return False
179         else:
180             return True
181
182     elif platform == "win32":
183         # https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
184         import msvcrt
185         import os
186
187         from ctypes import windll, byref, wintypes, WinError, POINTER
188         from ctypes.wintypes import HANDLE, DWORD, BOOL
189
190         LPDWORD = POINTER(DWORD)
191         PIPE_NOWAIT = wintypes.DWORD(0x00000001)
192         ERROR_NO_DATA = 232
193
194         SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
195         SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
196         SetNamedPipeHandleState.restype = BOOL
197
198         h = msvcrt.get_osfhandle(pipefd)
199
200         res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
201         if res == 0:
202             print(WinError())
203             return False
204         return True
205
206
207 class Dir(object):
208     def __init__(self, path):
209         self.name = os.path.basename(path.strip('/'))
210         self.fullpath = path
211         self.subdirs = {}
212         self.files = []
213
214     def add_file(self, file):
215         if file.startswith(self.name + '/'):
216             file = file.replace(self.name + '/', '', 1)
217
218         if '/' in file:
219             # This file is in a subdir
220             subdir = file.split('/')[0]
221             if subdir in self.subdirs:
222                 self.subdirs[subdir].add_file(file)
223             else:
224                 self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
225                 self.subdirs[subdir].add_file(file)
226         else:
227             self.files.append(file)
228         return True
229
230     def get_subdirs(self, path=None):
231         subdirs = []
232         if path and path != '' and path != './':
233             subdir = path.split('/')[0]
234             if subdir in self.subdirs:
235                 searchpath = '/'.join(path.split('/')[1::])
236                 subdirs = self.subdirs[subdir].get_subdirs(searchpath)
237                 subdirs = list(map(lambda subsubdir: os.path.join(subdir, subsubdir), subdirs))
238         else:
239             subdirs = self.subdirs
240
241         return subdirs
242
243     def get_subdirs_recursively(self, path=None):
244         subdirs = []
245         if path and path != '' and path != './':
246             subdir = path.split('/')[0]
247             if subdir in self.subdirs:
248                 searchpath = '/'.join(path.split('/')[1::])
249                 subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
250         else:
251             subdirs = list(self.subdirs.keys())
252
253             for key, val in self.subdirs.items():
254                 subdirs.extend(map(lambda subdir: key + '/' + subdir, val.get_subdirs_recursively()))
255
256         subdirs.sort()
257         return subdirs
258
259     def get_files(self, path=None):
260         files = []
261         if path and path != '' and path != './':
262             subdir = path.split('/')[0]
263             if subdir in self.subdirs:
264                 searchpath = '/'.join(path.split('/')[1::])
265                 files = self.subdirs[subdir].get_files(searchpath)
266         else:
267             files = self.files
268
269         return files
270
271     def get_files_recursively(self, path=None):
272         files = []
273         if path and path != '' and path != './':
274             subdir = path.split('/')[0]
275             if subdir in self.subdirs:
276                 searchpath = '/'.join(path.split('/')[1::])
277                 files = self.subdirs[subdir].get_files_recursively(searchpath)
278         else:
279             files = self.files
280
281             for key, val in self.subdirs.items():
282                 files.extend(map(lambda file: key + '/' + file, val.get_files_recursively()))
283
284         return files
285
286     def render_text(self, ident=0):
287         print('{}{}/'.format(' ' * (ident * 4), self.name))
288         for key, val in self.subdirs.items():
289             val.render_text(ident + 1)
290         for file in self.files:
291             print('{}{}'.format(' ' * (ident + 1) * 4, file))
292
293
294 # Parse the html from the message to get the URL
295
296 def get_url_from_input(string):
297     string = string.strip()
298     if not (string.startswith("http") or string.startswith("HTTP")):
299         res = re.search('href="(.+?)"', string, flags=re.IGNORECASE)
300         if res:
301             string = res.group(1)
302         else:
303             return False
304
305     match = re.search("(http|https)://(\S*)?/(\S*)", string, flags=re.IGNORECASE)
306     if match:
307         url = match[1].lower() + "://" + match[2].lower() + "/" + match[3]
308         return url
309     else:
310         return False
311
312
313 def youtube_search(query):
314     global log
315
316     try:
317         r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
318         results = re.findall(r"watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
319                              "(?:user|channel).*?>(.*?)<", r.text)  # (id, title, uploader)
320
321         if len(results) > 0:
322             return results
323
324     except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout):
325         error_traceback = traceback.format_exc().split("During")[0]
326         log.error("util: youtube query failed with error:\n %s" % error_traceback)
327         return False