8 import variables as var
14 import subprocess as sp
17 from importlib import reload
19 from io import BytesIO
20 from sys import platform
22 import urllib.parse, urllib.request, urllib.error
26 from packaging import version
28 log = logging.getLogger("bot")
31 def solve_filepath(path):
38 mydir = os.path.dirname(os.path.realpath(__file__))
39 return mydir + '/' + path
42 def get_recursive_file_list_sorted(path):
44 for root, dirs, files in os.walk(path):
45 relroot = root.replace(path, '', 1)
46 if relroot != '' and relroot in var.config.get('bot', 'ignored_folders'):
51 if file in var.config.get('bot', 'ignored_files'):
54 fullpath = os.path.join(path, relroot, file)
55 if not os.access(fullpath, os.R_OK):
58 mime = magic.from_file(fullpath, mime=True)
59 if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
60 filelist.append(relroot + file)
66 def get_music_path(music):
68 if music["type"] == "url":
70 elif music["type"] == "file":
71 uri = var.music_folder + music["path"]
72 elif music["type"] == "radio":
78 def get_music_tag_info(music):
80 uri = get_music_path(music)
82 if os.path.isfile(uri):
83 match = re.search("(.+)\.(.+)", uri)
87 file_no_ext = match[1]
92 path_thumbnail = file_no_ext + ".jpg"
93 if os.path.isfile(path_thumbnail):
94 im = Image.open(path_thumbnail)
100 # cover artwork: APIC:
101 tags = mutagen.File(uri)
103 music['title'] = tags['TIT2'].text[0]
104 if 'TPE1' in tags: # artist
105 music['artist'] = tags['TPE1'].text[0]
109 im = Image.open(BytesIO(tags["APIC:"].data))
111 elif ext == "m4a" or ext == "m4b" or ext == "mp4" or ext == "m4p":
112 # title: ©nam (\xa9nam)
115 # cover artwork: covr
116 tags = mutagen.File(uri)
118 music['title'] = tags['©nam'][0]
119 if '©ART' in tags: # artist
120 music['artist'] = tags['©ART'][0]
124 im = Image.open(BytesIO(tags["covr"][0]))
127 im.thumbnail((100, 100), Image.ANTIALIAS)
129 im = im.convert('RGB')
130 im.save(buffer, format="JPEG")
131 music['thumbnail'] = base64.b64encode(buffer.getvalue()).decode('utf-8')
138 if 'title' not in music:
139 match = re.search("([^\.]+)\.?.*", os.path.basename(uri))
140 music['title'] = match[1]
145 def format_song_string(music):
147 source = music["type"]
148 title = music["title"] if "title" in music else "Unknown title"
149 artist = music["artist"] if "artist" in music else "Unknown artist"
151 if source == "radio":
152 display = constants.strings("now_playing_radio",
154 title=media.radio.get_radio_title(music["url"]),
158 elif source == "url" and 'from_playlist' in music:
159 display = constants.strings("now_playing_from_playlist",
161 url=music["playlist_url"],
162 playlist=music["playlist_title"],
165 elif source == "url":
166 display = constants.strings("now_playing_url",
171 elif source == "file":
172 display = constants.strings("now_playing_file",
181 def format_debug_song_string(music):
183 source = music["type"]
184 title = music["title"] if "title" in music else "??"
185 artist = music["artist"] if "artist" in music else "??"
187 if source == "radio":
188 display = "[radio] {name} ({url}) by {user}".format(
193 elif source == "url" and 'from_playlist' in music:
194 display = "[url] {title} ({url}) from playlist {playlist} by {user}".format(
197 playlist=music["playlist_title"],
200 elif source == "url":
201 display = "[url] {title} ({url}) by {user}".format(
206 elif source == "file":
207 display = "[file] {artist} - {title} ({path}) by {user}".format(
217 def format_current_playing():
218 music = var.playlist.current_item()
219 display = format_song_string(music)
221 if 'thumbnail' in music:
222 thumbnail_html = '<img width="80" src="data:image/jpge;base64,' + \
223 music['thumbnail'] + '"/>'
224 return display + "<br />" + thumbnail_html
229 # - zips all files of the given zippath (must be a directory)
230 # - returns the absolute path of the created zip file
231 # - zip file will be in the applications tmp folder (according to configuration)
232 # - format of the filename itself = prefix_hash.zip
233 # - prefix can be controlled by the caller
234 # - hash is a sha1 of the string representation of the directories' contents (which are
236 def zipdir(zippath, zipname_prefix=None):
237 zipname = var.tmp_folder
238 if zipname_prefix and '../' not in zipname_prefix:
239 zipname += zipname_prefix.strip().replace('/', '_') + '_'
241 files = get_recursive_file_list_sorted(zippath)
242 hash = hashlib.sha1((str(files).encode())).hexdigest()
243 zipname += hash + '.zip'
245 if os.path.exists(zipname):
248 zipf = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)
251 file_to_add = os.path.join(zippath, file)
252 if not os.access(file_to_add, os.R_OK):
254 if file in var.config.get('bot', 'ignored_files'):
257 add_file_as = os.path.relpath(os.path.join(zippath, file), os.path.join(zippath, '..'))
258 zipf.write(file_to_add, add_file_as)
265 res = "List of ban hash"
266 for i in var.db.items("user_ban"):
267 res += "<br/>" + i[0]
271 def new_release_version():
272 v = urllib.request.urlopen(urllib.request.Request("https://packages.azlux.fr/botamusique/version")).read()
273 return v.rstrip().decode()
276 def update(current_version):
279 new_version = new_release_version()
280 target = var.config.get('bot', 'target_version')
281 if version.parse(new_version) > version.parse(current_version) or target == "testing":
282 log.info('update: new version, start updating...')
283 tp = sp.check_output(['/usr/bin/env', 'bash', 'update.sh', target]).decode()
285 log.info('update: update pip libraries dependencies')
286 sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', '-r', 'requirements.txt']).decode()
287 msg = "New version installed, please restart the bot."
288 if target == "testing":
289 msg += tp.replace('\n', '<br/>')
292 log.info('update: starting update youtube-dl via pip3')
293 tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
295 if "Requirement already up-to-date" in tp:
296 msg += "Youtube-dl is up-to-date"
298 msg += "Update done: " + tp.split('Successfully installed')[1]
300 msg += "<br/> Youtube-dl reloaded"
305 var.db.set("user_ban", user, None)
306 res = "User " + user + " banned"
310 def user_unban(user):
311 var.db.remove_option("user_ban", user)
317 res = "List of ban hash"
318 for i in var.db.items("url_ban"):
319 res += "<br/>" + i[0]
324 var.db.set("url_ban", url, None)
325 res = "url " + url + " banned"
330 var.db.remove_option("url_ban", url)
335 def pipe_no_wait(pipefd):
336 ''' Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()'''
337 if platform == "linux" or platform == "linux2" or platform == "darwin":
341 fl = fcntl.fcntl(pipefd, fcntl.F_GETFL)
342 fcntl.fcntl(pipefd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
344 print(sys.exc_info()[1])
349 elif platform == "win32":
350 # https://stackoverflow.com/questions/34504970/non-blocking-read-on-os-pipe-on-windows
354 from ctypes import windll, byref, wintypes, GetLastError, WinError
355 from ctypes.wintypes import HANDLE, DWORD, POINTER, BOOL
357 LPDWORD = POINTER(DWORD)
358 PIPE_NOWAIT = wintypes.DWORD(0x00000001)
361 SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
362 SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
363 SetNamedPipeHandleState.restype = BOOL
365 h = msvcrt.get_osfhandle(pipefd)
367 res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
375 def __init__(self, path):
376 self.name = os.path.basename(path.strip('/'))
381 def add_file(self, file):
382 if file.startswith(self.name + '/'):
383 file = file.replace(self.name + '/', '', 1)
386 # This file is in a subdir
387 subdir = file.split('/')[0]
388 if subdir in self.subdirs:
389 self.subdirs[subdir].add_file(file)
391 self.subdirs[subdir] = Dir(os.path.join(self.fullpath, subdir))
392 self.subdirs[subdir].add_file(file)
394 self.files.append(file)
397 def get_subdirs(self, path=None):
399 if path and path != '' and path != './':
400 subdir = path.split('/')[0]
401 if subdir in self.subdirs:
402 searchpath = '/'.join(path.split('/')[1::])
403 subdirs = self.subdirs[subdir].get_subdirs(searchpath)
404 subdirs = list(map(lambda subsubdir: os.path.join(subdir, subsubdir), subdirs))
406 subdirs = self.subdirs
410 def get_subdirs_recursively(self, path=None):
412 if path and path != '' and path != './':
413 subdir = path.split('/')[0]
414 if subdir in self.subdirs:
415 searchpath = '/'.join(path.split('/')[1::])
416 subdirs = self.subdirs[subdir].get_subdirs_recursively(searchpath)
418 subdirs = list(self.subdirs.keys())
420 for key, val in self.subdirs.items():
421 subdirs.extend(map(lambda subdir: key + '/' + subdir, val.get_subdirs_recursively()))
426 def get_files(self, path=None):
428 if path and path != '' and path != './':
429 subdir = path.split('/')[0]
430 if subdir in self.subdirs:
431 searchpath = '/'.join(path.split('/')[1::])
432 files = self.subdirs[subdir].get_files(searchpath)
438 def get_files_recursively(self, path=None):
440 if path and path != '' and path != './':
441 subdir = path.split('/')[0]
442 if subdir in self.subdirs:
443 searchpath = '/'.join(path.split('/')[1::])
444 files = self.subdirs[subdir].get_files_recursively(searchpath)
448 for key, val in self.subdirs.items():
449 files.extend(map(lambda file: key + '/' + file, val.get_files_recursively()))
453 def render_text(self, ident=0):
454 print('{}{}/'.format(' ' * (ident * 4), self.name))
455 for key, val in self.subdirs.items():
456 val.render_text(ident + 1)
457 for file in self.files:
458 print('{}{}'.format(' ' * (ident + 1) * 4, file))
461 # Parse the html from the message to get the URL
463 def get_url_from_input(string):
464 if string.startswith('http'):
466 p = re.compile('href="(.+?)"', re.IGNORECASE)
467 res = re.search(p, string)
473 def youtube_search(query):
477 r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
478 results = re.findall("watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
479 "(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
484 except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
485 error_traceback = traceback.format_exc().split("During")[0]
486 log.error("util: youtube query failed with error:\n %s" % error_traceback)