8 import variables as var
14 import subprocess as sp
17 from importlib import reload
19 from io import BytesIO
20 from sys import platform
22 import urllib.parse, urllib.request, urllib.error
26 from packaging import version
28 log = logging.getLogger("bot")
31 def solve_filepath(path):
38 mydir = os.path.dirname(os.path.realpath(__file__))
39 return mydir + '/' + path
42 def get_recursive_file_list_sorted(path):
44 for root, dirs, files in os.walk(path, topdown=True, onerror=None, followlinks=True):
45 relroot = root.replace(path, '', 1)
46 if relroot != '' and relroot in var.config.get('bot', 'ignored_folders'):
51 if file in var.config.get('bot', 'ignored_files'):
54 fullpath = os.path.join(path, relroot, file)
55 if not os.access(fullpath, os.R_OK):
59 mime = magic.from_file(fullpath, mime=True)
60 if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
61 filelist.append(relroot + file)
68 # - zips all files of the given zippath (must be a directory)
69 # - returns the absolute path of the created zip file
70 # - zip file will be in the applications tmp folder (according to configuration)
71 # - format of the filename itself = prefix_hash.zip
72 # - prefix can be controlled by the caller
73 # - hash is a sha1 of the string representation of the directories' contents (which are
75 def zipdir(zippath, zipname_prefix=None):
76 zipname = var.tmp_folder
77 if zipname_prefix and '../' not in zipname_prefix:
78 zipname += zipname_prefix.strip().replace('/', '_') + '_'
80 files = get_recursive_file_list_sorted(zippath)
81 hash = hashlib.sha1((str(files).encode())).hexdigest()
82 zipname += hash + '.zip'
84 if os.path.exists(zipname):
87 zipf = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)
90 file_to_add = os.path.join(zippath, file)
91 if not os.access(file_to_add, os.R_OK):
93 if file in var.config.get('bot', 'ignored_files'):
96 add_file_as = os.path.relpath(os.path.join(zippath, file), os.path.join(zippath, '..'))
97 zipf.write(file_to_add, add_file_as)
104 res = "List of ban hash"
105 for i in var.db.items("user_ban"):
106 res += "<br/>" + i[0]
110 def new_release_version():
111 v = urllib.request.urlopen(urllib.request.Request("https://packages.azlux.fr/botamusique/version")).read()
112 return v.rstrip().decode()
115 def update(current_version):
118 new_version = new_release_version()
119 target = var.config.get('bot', 'target_version')
120 if version.parse(new_version) > version.parse(current_version) or target == "testing":
121 log.info('update: new version, start updating...')
122 tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
124 log.info('update: update pip libraries dependencies')
125 sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
126 msg = "New version installed, please restart the bot."
127 if target == "testing":
128 msg += tp.replace('\n', '<br/>')
131 log.info('update: starting update youtube-dl via pip3')
132 tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
134 if "Requirement already up-to-date" in tp:
135 msg += "Youtube-dl is up-to-date"
137 msg += "Update done: " + tp.split('Successfully installed')[1]
139 msg += "<br/> Youtube-dl reloaded"
144 var.db.set("user_ban", user, None)
145 res = "User " + user + " banned"
149 def user_unban(user):
150 var.db.remove_option("user_ban", user)
157 for i in var.db.items("url_ban"):
158 res += "<br/>" + i[0]
163 var.db.set("url_ban", url, None)
164 res = "url " + url + " banned"
169 var.db.remove_option("url_ban", url)
174 def pipe_no_wait(pipefd):
175 ''' Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()'''
176 if platform == "linux" or platform == "linux2" or platform == "darwin":
180 fl = fcntl.fcntl(pipefd, fcntl.F_GETFL)
181 fcntl.fcntl(pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
183 print(sys.exc_info()[1])
188 elif platform == "win32":
189 # https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
193 from ctypes import windll, byref, wintypes, WinError, POINTER
194 from ctypes.wintypes import HANDLE, DWORD, BOOL
196 LPDWORD = POINTER(DWORD)
197 PIPE_NOWAIT = wintypes.DWORD(0x00000001)
200 SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
201 SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
202 SetNamedPipeHandleState.restype = BOOL
204 h = msvcrt.get_osfhandle(pipefd)
206 res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
214 def __init__(self, path):
215 self.name = os.path.basename(path.strip('/'))
220 def add_file(self, file):
221 if file.startswith(self.name + '/'):
222 file = file.replace(self.name + '/', '', 1)
225 # This file is in a subdir
226 subdir = file.split('/')[0]
227 if subdir in self.subdirs:
228 self.subdirs[subdir].add_file(file)
230 self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
231 self.subdirs[subdir].add_file(file)
233 self.files.append(file)
236 def get_subdirs(self, path=None):
238 if path and path != '' and path != './':
239 subdir = path.split('/')[0]
240 if subdir in self.subdirs:
241 searchpath = '/'.join(path.split('/')[1::])
242 subdirs = self.subdirs[subdir].get_subdirs(searchpath)
243 subdirs = list(map(lambda subsubdir: os.path.join(subdir, subsubdir), subdirs))
245 subdirs = self.subdirs
249 def get_subdirs_recursively(self, path=None):
251 if path and path != '' and path != './':
252 subdir = path.split('/')[0]
253 if subdir in self.subdirs:
254 searchpath = '/'.join(path.split('/')[1::])
255 subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
257 subdirs = list(self.subdirs.keys())
259 for key, val in self.subdirs.items():
260 subdirs.extend(map(lambda subdir: key + '/' + subdir, val.get_subdirs_recursively()))
265 def get_files(self, path=None):
267 if path and path != '' and path != './':
268 subdir = path.split('/')[0]
269 if subdir in self.subdirs:
270 searchpath = '/'.join(path.split('/')[1::])
271 files = self.subdirs[subdir].get_files(searchpath)
277 def get_files_recursively(self, path=None):
279 if path and path != '' and path != './':
280 subdir = path.split('/')[0]
281 if subdir in self.subdirs:
282 searchpath = '/'.join(path.split('/')[1::])
283 files = self.subdirs[subdir].get_files_recursively(searchpath)
287 for key, val in self.subdirs.items():
288 files.extend(map(lambda file: key + '/' + file, val.get_files_recursively()))
292 def render_text(self, ident=0):
293 print('{}{}/'.format(' ' * (ident * 4), self.name))
294 for key, val in self.subdirs.items():
295 val.render_text(ident + 1)
296 for file in self.files:
297 print('{}{}'.format(' ' * (ident + 1) * 4, file))
300 # Parse the html from the message to get the URL
302 def get_url_from_input(string):
303 if string.startswith('http'):
305 p = re.compile('href="(.+?)"', re.IGNORECASE)
306 res = re.search(p, string)
312 def youtube_search(query):
316 r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
317 results = re.findall("watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
318 "(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
323 except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
324 error_traceback = traceback.format_exc().split("During")[0]
325 log.error("util: youtube query failed with error:\n %s" % error_traceback)