log = logging.getLogger("bot")
+
def register_all_commands(bot):
bot.register_command(constants.commands('joinme'), cmd_joinme, no_partial_match=False, access_outside_channel=True)
bot.register_command(constants.commands('user_ban'), cmd_user_ban, no_partial_match=True)
bot.register_command('loop', cmd_loop_state, True)
bot.register_command('item', cmd_item, True)
+
def send_multi_lines(bot, lines, text, linebreak="<br />"):
global log
for newline in lines:
msg += br
br = linebreak
- if (len(msg) + len(newline)) > (bot.mumble.get_max_message_length() - 4) != 0: # 4 == len("<br>")
+ if (len(msg) + len(newline)) > (bot.mumble.get_max_message_length() - 4) != 0: # 4 == len("<br>")
bot.send_msg(msg, text)
msg = ""
msg += newline
bot.send_msg(msg, text)
+
# ---------------- Variables -----------------
song_shortlist = []
-# ---------------- Commands ------------------
+# ---------------- Commands ------------------
def cmd_joinme(bot, user, text, command, parameter):
global log
bot.mumble.users[text.actor].send_text_message(constants.strings('not_admin'))
return
+
def cmd_url_ban_list(bot, user, text, command, parameter):
if bot.is_admin(user):
bot.mumble.users[text.actor].send_text_message(util.get_url_ban())
bot.mumble.users[text.actor].send_text_message(constants.strings('not_admin'))
return
+
def cmd_url_unban(bot, user, text, command, parameter):
global log
if len(var.playlist) > 0:
if parameter:
if parameter.isdigit() and 1 <= int(parameter) <= len(var.playlist):
- var.playlist.point_to(int(parameter) - 1 - 1) # First "-1" transfer 12345 to 01234, second "-1"
- # point to the previous item. the loop will next to
- # the one you want
+ # First "-1" transfer 12345 to 01234, second "-1"
+ # point to the previous item. the loop will next to
+ # the one you want
+ var.playlist.point_to(int(parameter) - 1 - 1)
+
bot.interrupt()
else:
bot.send_msg(constants.strings('invalid_index', index=parameter), text)
else:
# try to do a partial match
files = var.cache.files
- matches = [ file for file in files if parameter.lower() in file.lower()]
+ matches = [file for file in files if parameter.lower() in file.lower()]
if len(matches) == 1:
file = matches[0]
music_wrapper = get_cached_wrapper_by_id(bot, var.cache.file_id_lookup[file], user)
bot.send_msg(constants.strings('file_added', item=music_wrapper.format_song_string()))
return
elif len(matches) > 1:
- msgs = [ constants.strings('multiple_matches') ]
+ msgs = [constants.strings('multiple_matches')]
song_shortlist = []
for index, match in enumerate(matches):
id = var.cache.file_id_lookup[match]
music_folder = var.music_folder
if parameter:
files = var.cache.files
- msgs = [ constants.strings('multiple_file_added') + "<ul>"]
+ msgs = [constants.strings('multiple_file_added') + "<ul>"]
count = 0
try:
music_wrappers = []
music_wrappers.append(music_wrapper)
log.info("cmd: add to playlist: " + music_wrapper.format_debug_string())
msgs.append("<li><b>{}</b> ({})</li>".format(music_wrapper.item().title,
- file[:match.span()[0]]
- + "<b style='color:pink'>"
- + file[match.span()[0]: match.span()[1]]
- + "</b>"
- + file[match.span()[1]:]
- ))
+ file[:match.span()[0]]
+ + "<b style='color:pink'>"
+ + file[match.span()[0]: match.span()[1]]
+ + "</b>"
+ + file[match.span()[1]:]
+ ))
if count != 0:
msgs.append("</ul>")
bot.send_msg(constants.strings('bad_parameter', command=command))
-
def cmd_play_playlist(bot, user, text, command, parameter):
global log
msg += "No playable url found for this station, please try another station."
bot.send_msg(msg, text)
+
yt_last_result = []
-yt_last_page = 0 # TODO: if we keep adding global variables, we need to consider sealing all commands up into classes.
+yt_last_page = 0 # TODO: if we keep adding global variables, we need to consider sealing all commands up into classes.
+
def cmd_yt_search(bot, user, text, command, parameter):
global log, yt_last_result, yt_last_page, song_shortlist
else:
bot.send_msg(constants.strings('bad_parameter', command=command), text)
+
def _yt_format_result(results, start, count):
msg = '<table><tr><th width="10%">Index</th><th>Title</th><th width="20%">Uploader</th></tr>'
for index, item in enumerate(results[start:start+count]):
if parameter and parameter.isdigit() and 0 <= int(parameter) <= 100:
bot.volume_set = float(float(parameter) / 100)
bot.send_msg(constants.strings('change_volume',
- volume=int(bot.volume_set * 100), user=bot.mumble.users[text.actor]['name']))
+ volume=int(bot.volume_set * 100), user=bot.mumble.users[text.actor]['name']))
var.db.set('bot', 'volume', str(bot.volume_set))
log.info('cmd: volume set to %d' % (bot.volume_set * 100))
else:
if parameter and parameter.isdigit() and 0 <= int(parameter) <= 100:
bot.ducking_volume = float(float(parameter) / 100)
bot.send_msg(constants.strings('change_ducking_volume',
- volume=int(bot.ducking_volume * 100), user=bot.mumble.users[text.actor]['name']), text)
+ volume=int(bot.ducking_volume * 100), user=bot.mumble.users[text.actor]['name']), text)
# var.db.set('bot', 'volume', str(bot.volume_set))
var.db.set('bot', 'ducking_volume', str(bot.ducking_volume))
log.info('cmd: volume on ducking set to %d' % (bot.ducking_volume * 100))
var.playlist.current_index -= 1
# then the bot will move to next item
- else: # if item deleted is the last item of the queue
+ else: # if item deleted is the last item of the queue
var.playlist.current_index -= 1
if not bot.is_pause:
bot.interrupt()
removed = var.playlist.remove(index)
bot.send_msg(constants.strings('removing_item',
- item=removed.format_short_string()), text)
+ item=removed.format_short_string()), text)
log.info("cmd: delete from playlist: " + removed.format_debug_string())
else:
global log
files = var.cache.files
- msgs = [ constants.strings("multiple_file_found") ]
+ msgs = [constants.strings("multiple_file_found")]
try:
count = 0
for index, file in enumerate(files):
msg = constants.strings('queue_empty')
bot.send_msg(msg, text)
else:
- msgs = [ constants.strings('queue_contents')]
+ msgs = [constants.strings('queue_contents')]
for i, music in enumerate(var.playlist):
newline = ''
tags = ''
tags = "<sup>{}</sup>".format(", ".join(music.item().tags))
if i == var.playlist.current_index:
newline = "<b style='color:orange'>{} ({}) {} </b> {}".format(i + 1, music.display_type(),
- music.format_short_string(), tags)
+ music.format_short_string(), tags)
else:
newline = '<b>{}</b> ({}) {} {}'.format(i + 1, music.display_type(),
- music.format_short_string(), tags)
+ music.format_short_string(), tags)
msgs.append(newline)
send_multi_lines(bot, msgs, text)
+
def cmd_random(bot, user, text, command, parameter):
global log
bot.interrupt()
var.playlist.randomize()
+
def cmd_repeat(bot, user, text, command, parameter):
global log
bot.send_msg(constants.strings("repeat", song=music.format_song_string(), n=str(repeat)), text)
+
def cmd_mode(bot, user, text, command, parameter):
global log
if not parameter:
bot.send_msg(constants.strings("current_mode", mode=var.playlist.mode), text)
return
- if not parameter in ["one-shot", "repeat", "random", "autoplay"]:
+ if parameter not in ["one-shot", "repeat", "random", "autoplay"]:
bot.send_msg(constants.strings('unknown_mode', mode=parameter), text)
else:
var.db.set('playlist', 'playback_mode', parameter)
bot.interrupt()
bot.launch_music()
+
def cmd_play_tags(bot, user, text, command, parameter):
if not parameter:
bot.send_msg(constants.strings('bad_parameter', command=command), text)
log.info("cmd: add to playlist: " + music_wrapper.format_debug_string())
msgs.append("<li><b>{}</b> (<i>{}</i>)</li>".format(music_wrapper.item().title, ", ".join(music_wrapper.item().tags)))
-
if count != 0:
msgs.append("</ul>")
var.playlist.extend(music_wrappers)
if tags[0] != "*":
var.playlist[int(index) - 1].remove_tags(tags)
log.info("cmd: remove tags %s from song %s" % (", ".join(tags),
- var.playlist[int(index) - 1].format_debug_string()))
+ var.playlist[int(index) - 1].format_debug_string()))
bot.send_msg(constants.strings("removed_tags",
tags=", ".join(tags),
song=var.playlist[int(index) - 1].format_short_string()), text)
for item in var.playlist:
item.remove_tags(tags)
log.info("cmd: remove tags %s from song %s" % (", ".join(tags),
- item.format_debug_string()))
+ item.format_debug_string()))
bot.send_msg(constants.strings("removed_tags_from_all", tags=", ".join(tags)), text)
return
else:
bot.send_msg(constants.strings('bad_parameter', command=command), text)
+
def cmd_find_tagged(bot, user, text, command, parameter):
global song_shortlist
else:
bot.send_msg(constants.strings("no_file"), text)
+
def cmd_search_library(bot, user, text, command, parameter):
global song_shortlist
if not parameter:
global song_shortlist, log
indexes = []
try:
- indexes = [ int(i) for i in parameter.split(" ") ]
+ indexes = [int(i) for i in parameter.split(" ")]
except ValueError:
bot.send_msg(constants.strings('bad_parameter', command=command), text)
return
global song_shortlist, log
indexes = []
try:
- indexes = [ int(i) for i in parameter.split(" ") ]
+ indexes = [int(i) for i in parameter.split(" ")]
except ValueError:
bot.send_msg(constants.strings('bad_parameter', command=command), text)
return
if 'id' in music_dict:
music_wrapper = get_cached_wrapper_by_id(bot, music_dict['id'], user)
log.info("cmd: remove from library: " + music_wrapper.format_debug_string())
- msgs.append("<li>[{}] <b>{}</b></li>".format(music_wrapper.item().type ,music_wrapper.item().title))
+ msgs.append("<li>[{}] <b>{}</b></li>".format(music_wrapper.item().type, music_wrapper.item().title))
var.playlist.remove_by_id(music_dict['id'])
var.cache.free_and_delete(music_dict['id'])
count += 1
bot.send_msg(constants.strings('bad_parameter', command=command), text)
+
def cmd_drop_database(bot, user, text, command, parameter):
global log
else:
bot.mumble.users[text.actor].send_text_message(constants.strings('not_admin'))
+
def cmd_refresh_cache(bot, user, text, command, parameter):
global log
if bot.is_admin(user):
else:
bot.mumble.users[text.actor].send_text_message(constants.strings('not_admin'))
+
# Just for debug use
def cmd_real_time_rms(bot, user, text, command, parameter):
bot._display_rms = not bot._display_rms
+
def cmd_loop_state(bot, user, text, command, parameter):
print(bot._loop_status)
+
def cmd_item(bot, user, text, command, parameter):
print(bot.wait_for_downloading)
print(var.playlist.current_item().to_dict())
import variables as var
+
def strings(option, *argv, **kwargs):
string = ""
try:
else:
return string
+
def commands(command):
string = ""
try:
import json
import datetime
+
class DatabaseError(Exception):
pass
+
class SettingsDatabase:
version = 1
+
def __init__(self, db_path):
self.db_path = db_path
"value TEXT, "
"UNIQUE(section, option))")
cursor.execute("INSERT INTO botamusique (section, option, value) "
- "VALUES (?, ?, ?)" , ("bot", "db_version", "1"))
+ "VALUES (?, ?, ?)", ("bot", "db_version", "1"))
cursor.execute("INSERT INTO botamusique (section, option, value) "
- "VALUES (?, ?, ?)" , ("bot", "music_db_version", "0"))
+ "VALUES (?, ?, ?)", ("bot", "music_db_version", "0"))
conn.commit()
conn.close()
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("INSERT OR REPLACE INTO botamusique (section, option, value) "
- "VALUES (?, ?, ?)" , (section, option, value))
+ "VALUES (?, ?, ?)", (section, option, value))
conn.commit()
conn.close()
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
results = cursor.execute("SELECT id, type, title, metadata, tags FROM music "
- "WHERE %s" % condition_str, filler).fetchall()
+ "WHERE %s" % condition_str, filler).fetchall()
conn.close()
return self._result_to_dict(results)
condition.append('LOWER(title) LIKE ?')
filler.append("%{:s}%".format(keyword.lower()))
-
condition_str = " AND ".join(condition)
conn = sqlite3.connect(self.db_path)
condition.append('LOWER(tags) LIKE ?')
filler.append("%{:s},%".format(tag.lower()))
-
condition_str = " AND ".join(condition)
conn = sqlite3.connect(self.db_path)
return self._result_to_dict(results)
-
def _result_to_dict(self, results):
if len(results) > 0:
music_dicts = []
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM music "
- "WHERE %s" % condition_str, filler)
+ "WHERE %s" % condition_str, filler)
conn.commit()
conn.close()
-
def drop_table(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
log = logging.getLogger("bot")
user = 'Remote Control'
+
def init_proxy():
global web
if var.is_proxified:
# https://stackoverflow.com/questions/29725217/password-protect-one-webpage-in-flask-app
+
def check_auth(username, password):
"""This function is called to check if a username /
password combination is valid.
"""
return username == var.config.get("webinterface", "user") and password == var.config.get("webinterface", "password")
+
def authenticate():
"""Sends a 401 response that enables basic auth"""
global log
- return Response(
- 'Could not verify your access level for that URL.\n'
- 'You have to login with proper credentials', 401,
- {'WWW-Authenticate': 'Basic realm="Login Required"'})
+ return Response('Could not verify your access level for that URL.\n'
+ 'You have to login with proper credentials', 401,
+ {'WWW-Authenticate': 'Basic realm="Login Required"'})
+
def requires_auth(f):
@wraps(f)
return f(*args, **kwargs)
return decorated
+
def tag_color(tag):
num = hash(tag) % 8
if num == 0:
elif num == 7:
return "dark"
+
def build_tags_color_lookup():
color_lookup = {}
for tag in var.music_db.query_all_tags():
color_lookup[tag] = tag_color(tag)
-
return color_lookup
+
def build_path_tags_lookup():
path_tags_lookup = {}
ids = list(var.cache.file_id_lookup.values())
return path_tags_lookup
+
def recur_dir(dirobj):
for name, dir in dirobj.get_subdirs().items():
print(dirobj.fullpath + "/" + name)
recur_dir(dir)
+
@web.route("/", methods=['GET'])
@requires_auth
def index():
paused=var.bot.is_pause,
)
+
@web.route("/playlist", methods=['GET'])
@requires_auth
def playlist():
if len(var.playlist) == 0:
return jsonify({'items': [render_template('playlist.html',
- m=False,
- index=-1
- )]
+ m=False,
+ index=-1
+ )]
})
tags_color_lookup = build_tags_color_lookup()
items = []
for index, item_wrapper in enumerate(var.playlist):
- items.append(render_template('playlist.html',
+ items.append(render_template('playlist.html',
index=index,
tags_color_lookup=tags_color_lookup,
m=item_wrapper.item(),
)
)
- return jsonify({ 'items': items })
+ return jsonify({'items': items})
+
def status():
if len(var.playlist) > 0:
if request.method == 'POST':
if request.form:
- log.debug("web: Post request from %s: %s" % ( request.remote_addr, str(request.form)))
+ log.debug("web: Post request from %s: %s" % (request.remote_addr, str(request.form)))
if 'add_file_bottom' in request.form and ".." not in request.form['add_file_bottom']:
path = var.music_folder + request.form['add_file_bottom']
if os.path.isfile(path):
music_wrappers = list(map(
lambda file:
- get_cached_wrapper_by_id(var.bot, var.cache.file_id_lookup[folder + file], user),
- files))
+ get_cached_wrapper_by_id(var.bot, var.cache.file_id_lookup[folder + file], user), files))
var.playlist.extend(music_wrappers)
for music_wrapper in music_wrappers:
log.info('web: add to playlist: ' + music_wrapper.format_debug_string())
-
elif 'add_url' in request.form:
music_wrapper = get_cached_wrapper_from_scrap(var.bot, type='url', url=request.form['add_url'], user=user)
var.playlist.append(music_wrapper)
else:
var.playlist.remove(index)
-
elif 'play_music' in request.form:
music_wrapper = var.playlist[int(request.form['play_music'])]
log.info("web: jump to: " + music_wrapper.format_debug_string())
return status()
+
@web.route('/upload', methods=["POST"])
def upload():
global log
if not files:
return redirect("./", code=406)
- #filename = secure_filename(file.filename).strip()
+ # filename = secure_filename(file.filename).strip()
for file in files:
filename = file.filename
if filename == '':
if "audio" in file.mimetype:
storagepath = os.path.abspath(os.path.join(var.music_folder, targetdir))
- print('storagepath:',storagepath)
+ print('storagepath:', storagepath)
if not storagepath.startswith(os.path.abspath(var.music_folder)):
return redirect("./", code=406)
rb = RadioBrowser()
+
def getstations_byname(query):
results = rb.stations_byname(query)
stations = []
for st in results:
try:
- station = {'stationname': st['name'], 'id':st['id'], 'codec':st['codec'], 'bitrate':st['bitrate'], 'country':st['country'], 'homepage':st['homepage'], 'genre':st['tags']}
+ station = {'stationname': st['name'], 'id': st['id'], 'codec': st['codec'], 'bitrate': st['bitrate'], 'country': st['country'], 'homepage': st['homepage'], 'genre': st['tags']}
stations.append(station)
except:
pass
return stations
+
def geturl_byid(id):
url = rb.playable_station(id)['url']
- if url != None:
+ if url is not None:
return url
else:
return "-1"
+
def getstationname_byid(id):
return rb.stations_byid(id)
-
+
+
if __name__ == "__main__":
r = getstations_byname('r.sh')
stationinfo = getstationname_byid(96748)
- pass
\ No newline at end of file
+ pass
kwargs["params"] = params
endpoint = self.builder.produce_endpoint(endpoint="station_search")
return request(endpoint, **kwargs)
-
\ No newline at end of file
self.files = []
self.dir_lock = threading.Lock()
- def get_item_by_id(self, bot, id): # Why all these functions need a bot? Because it need the bot to send message!
+ def get_item_by_id(self, bot, id): # Why all these functions need a bot? Because it need the bot to send message!
if id in self:
return self[id]
return item
else:
return None
- #print(id)
- #raise KeyError("Unable to fetch item from the database! Please try to refresh the cache by !recache.")
-
+ # print(id)
+ # raise KeyError("Unable to fetch item from the database! Please try to refresh the cache by !recache.")
def get_item(self, bot, **kwargs):
# kwargs should provide type and id, and parameters to build the item if not existed in the library.
return item
# if not in the database, build one
- self[id] = item_builders[kwargs['type']](bot, **kwargs) # newly built item will not be saved immediately
+ self[id] = item_builders[kwargs['type']](bot, **kwargs) # newly built item will not be saved immediately
return self[id]
def get_items_by_tags(self, bot, tags):
ret = []
for item in items:
ret.append(CachedItemWrapper(var.cache, item.id, item.type, user))
- return ret
\ No newline at end of file
+ return ret
user
'''
+
def file_item_builder(bot, **kwargs):
return FileItem(bot, kwargs['path'])
+
def file_item_loader(bot, _dict):
return FileItem(bot, "", _dict)
+
def file_item_id_generator(**kwargs):
return hashlib.md5(kwargs['path'].encode()).hexdigest()
+
item_builders['file'] = file_item_builder
item_loaders['file'] = file_item_loader
item_id_generators['file'] = file_item_id_generator
self.send_client_message(constants.strings('file_missed', file=self.path))
return False
- #self.version += 1 # 0 -> 1, notify the wrapper to save me when validate() is visited the first time
+ # self.version += 1 # 0 -> 1, notify the wrapper to save me when validate() is visited the first time
self.ready = "yes"
return True
def format_song_string(self, user):
return constants.strings("file_item",
- title=self.title,
- artist=self.artist if self.artist else '??',
- user=user
- )
+ title=self.title,
+ artist=self.artist if self.artist else '??',
+ user=user
+ )
def format_current_playing(self, user):
display = constants.strings("now_playing", item=self.format_song_string(user))
if self.thumbnail:
thumbnail_html = '<img width="80" src="data:image/jpge;base64,' + \
self.thumbnail + '"/>'
- display += "<br />" + thumbnail_html
+ display += "<br />" + thumbnail_html
return display
item_loaders = {}
item_id_generators = {}
+
def example_builder(bot, **kwargs):
return BaseItem(bot)
+
def example_loader(bot, _dict):
return BaseItem(bot, from_dict=_dict)
+
def example_id_generator(**kwargs):
return ""
+
item_builders['base'] = example_builder
item_loaders['base'] = example_loader
item_id_generators['base'] = example_id_generator
+
def dicts_to_items(bot, music_dicts):
items = []
for music_dict in music_dicts:
items.append(item_loaders[type](bot, music_dict))
return items
+
def dict_to_item(bot, music_dict):
type = music_dict['type']
return item_loaders[type](bot, music_dict)
self.title = ""
self.path = ""
self.tags = []
- self.version = 0 # if version increase, wrapper will re-save this item
+ self.version = 0 # if version increase, wrapper will re-save this item
if from_dict is None:
self.id = ""
- self.ready = "pending" # pending - is_valid() -> validated - prepare() -> yes, failed
+ self.ready = "pending" # pending - is_valid() -> validated - prepare() -> yes, failed
else:
self.id = from_dict['id']
self.ready = from_dict['ready']
self.bot.send_msg(msg)
def to_dict(self):
- return {"type" : self.type, "id": self.id, "ready": self.ready, "path": self.path, "tags": self.tags}
+ return {"type": self.type, "id": self.id, "ready": self.ready, "path": self.path, "tags": self.tags}
def randomize(self):
# current_index will lose track after shuffling, thus we take current music out before shuffling
- #current = self.current_item()
- #del self[self.current_index]
+ # current = self.current_item()
+ # del self[self.current_index]
random.shuffle(self)
- #self.insert(0, current)
+ # self.insert(0, current)
self.current_index = -1
self.version += 1
var.db.set("playlist", "current_index", self.current_index)
for index, music in enumerate(self):
- var.db.set("playlist_item", str(index), json.dumps({'id': music.id, 'user': music.user }))
+ var.db.set("playlist_item", str(index), json.dumps({'id': music.id, 'user': music.user}))
def load(self):
current_index = var.db.getint("playlist", "current_index", fallback=-1)
def start_async_validating(self):
if not self.validating_thread_lock.locked():
- time.sleep(0.1) # Just avoid validation finishes too fast and delete songs while something is reading it.
+ time.sleep(0.1) # Just avoid validation finishes too fast and delete songs while something is reading it.
th = threading.Thread(target=self._check_valid, name="Validating")
th.daemon = True
th.start()
log = logging.getLogger("bot")
+
def get_radio_server_description(url):
global log
else:
return RadioItem(bot, kwargs['url'], '')
+
def radio_item_loader(bot, _dict):
return RadioItem(bot, "", "", _dict)
+
def radio_item_id_generator(**kwargs):
return hashlib.md5(kwargs['url'].encode()).hexdigest()
+
item_builders['radio'] = radio_item_builder
item_loaders['radio'] = radio_item_loader
item_id_generators['radio'] = radio_item_id_generator
super().__init__(bot)
self.url = url
if not name:
- self.title = get_radio_server_description(self.url) # The title of the radio station
+ self.title = get_radio_server_description(self.url) # The title of the radio station
else:
self.title = name
self.id = hashlib.md5(url.encode()).hexdigest()
self.type = "radio"
def validate(self):
- self.version += 1 # 0 -> 1, notify the wrapper to save me when validate() is visited the first time
+ self.version += 1 # 0 -> 1, notify the wrapper to save me when validate() is visited the first time
return True
def is_ready(self):
def format_song_string(self, user):
return constants.strings("radio_item",
url=self.url,
- title=get_radio_title(self.url), # the title of current song
- name=self.title, # the title of radio station
+ title=get_radio_title(self.url), # the title of current song
+ name=self.title, # the title of radio station
user=user
)
log = logging.getLogger("bot")
+
def url_item_builder(bot, **kwargs):
return URLItem(bot, kwargs['url'])
+
def url_item_loader(bot, _dict):
return URLItem(bot, "", _dict)
+
def url_item_id_generator(**kwargs):
return hashlib.md5(kwargs['url'].encode()).hexdigest()
+
item_builders['url'] = url_item_builder
item_loaders['url'] = url_item_loader
item_id_generators['url'] = url_item_id_generator
return False
else:
self.ready = "validated"
- self.version += 1 # notify wrapper to save me
+ self.version += 1 # notify wrapper to save me
return True
# Run in a other thread
"bot: finished downloading url (%s) %s, saved to %s." % (self.title, self.url, self.path))
self.downloading = False
self._read_thumbnail_from_file(base_path + ".jpg")
- self.version += 1 # notify wrapper to save me
+ self.version += 1 # notify wrapper to save me
return True
else:
for f in glob.glob(base_path + "*"):
return dict
-
def format_debug_string(self):
return "[url] {title} ({url})".format(
title=self.title,
def format_song_string(self, user):
if self.ready in ['validated', 'yes']:
return constants.strings("url_item",
- title=self.title if self.title else "??",
- url=self.url,
- user=user)
+ title=self.title if self.title else "??",
+ url=self.url,
+ user=user)
return self.url
def format_current_playing(self, user):
if self.thumbnail:
thumbnail_html = '<img width="80" src="data:image/jpge;base64,' + \
self.thumbnail + '"/>'
- display += "<br />" + thumbnail_html
+ display += "<br />" + thumbnail_html
return display
from media.item import item_builders, item_loaders, item_id_generators
from media.url import URLItem, url_item_id_generator
+
def get_playlist_info(url, start_index=0, user=""):
items = []
ydl_opts = {
def playlist_url_item_loader(bot, _dict):
return PlaylistURLItem(bot, "", "", "", "", _dict)
+
item_builders['url_from_playlist'] = playlist_url_item_builder
item_loaders['url_from_playlist'] = playlist_url_item_loader
item_id_generators['url_from_playlist'] = url_item_id_generator
def format_song_string(self, user):
return constants.strings("url_from_playlist_item",
- title=self.title,
- url=self.url,
- playlist_url=self.playlist_url,
- playlist=self.playlist_title,
- user=user)
+ title=self.title,
+ url=self.url,
+ playlist_url=self.playlist_url,
+ playlist=self.playlist_title,
+ user=user)
def format_current_playing(self, user):
display = constants.strings("now_playing", item=self.format_song_string(user))
if self.thumbnail:
thumbnail_html = '<img width="80" src="data:image/jpge;base64,' + \
self.thumbnail + '"/>'
- display += "<br />" + thumbnail_html
+ display += "<br />" + thumbnail_html
return display
self.song_start_at = -1
self.last_ffmpeg_err = ""
self.read_pcm_size = 0
- #self.download_threads = []
- self.wait_for_downloading = False # flag for the loop are waiting for download to complete in the other thread
+ # self.download_threads = []
+ self.wait_for_downloading = False # flag for the loop are waiting for download to complete in the other thread
if var.config.getboolean("webinterface", "enabled"):
wi_addr = var.config.get("webinterface", "listening_addr")
self.ducking_volume = var.db.getfloat("bot", "ducking_volume", fallback=self.ducking_volume)
self.ducking_threshold = var.config.getfloat("bot", "ducking_threshold", fallback=5000)
self.ducking_threshold = var.db.getfloat("bot", "ducking_threshold", fallback=self.ducking_threshold)
- self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_SOUNDRECEIVED, \
+ self.mumble.callbacks.set_callback(pymumble.constants.PYMUMBLE_CLBK_SOUNDRECEIVED,
self.ducking_sound_received)
self.mumble.set_receive_sound(True)
self.nb_exit += 1
if var.config.getboolean('bot', 'save_playlist', fallback=True) \
- and var.config.get("bot", "save_music_library", fallback=True):
+ and var.config.get("bot", "save_music_library", fallback=True):
self.log.info("bot: save playlist into database")
var.playlist.save()
for command in cmds:
command = command.strip()
if command:
- self.cmd_handle[command] = { 'handle': handle,
- 'partial_match': not no_partial_match,
- 'access_outside_channel': access_outside_channel}
+ self.cmd_handle[command] = {'handle': handle,
+ 'partial_match': not no_partial_match,
+ 'access_outside_channel': access_outside_channel}
self.log.debug("bot: command added: " + command)
def set_comment(self):
constants.strings('url_ban'))
return
-
command_exc = ""
try:
if command in self.cmd_handle:
and not self.is_admin(user) \
and not var.config.getboolean('bot', 'allow_other_channel_message') \
and self.mumble.users[text.actor]['channel_id'] != self.mumble.users.myself[
- 'channel_id']:
+ 'channel_id']:
self.mumble.users[text.actor].send_text_message(
constants.strings('not_in_my_channel'))
return
def launch_music(self):
if var.playlist.is_empty():
return
- assert self.wait_for_downloading == False
+ assert self.wait_for_downloading is False
music_wrapper = var.playlist.current_item()
uri = music_wrapper.uri()
# The ffmpeg process is a thread
# prepare pipe for catching stderr of ffmpeg
pipe_rd, pipe_wd = os.pipe()
- util.pipe_no_wait(pipe_rd) # Let the pipe work in non-blocking mode
+ util.pipe_no_wait(pipe_rd) # Let the pipe work in non-blocking mode
self.thread_stderr = os.fdopen(pipe_rd)
self.thread = sp.Popen(command, stdout=sp.PIPE, stderr=pipe_wd, bufsize=480)
self.is_pause = False
var.playlist.remove_by_id(next.id)
var.cache.free_and_delete(next.id)
-
# =======================
# Loop
# =======================
if rms < self.ducking_threshold:
print('%6d/%6d ' % (rms, self._max_rms) + '-'*int(rms/200), end='\r')
else:
- print('%6d/%6d ' % (rms, self._max_rms) + '-'*int(self.ducking_threshold/200) \
+ print('%6d/%6d ' % (rms, self._max_rms) + '-'*int(self.ducking_threshold/200)
+ '+'*int((rms - self.ducking_threshold)/200), end='\r')
if rms > self.ducking_threshold:
if self.on_ducking is False:
self.log.debug("bot: ducking triggered")
self.on_ducking = True
- self.ducking_release = time.time() + 1 # ducking release after 1s
+ self.ducking_release = time.time() + 1 # ducking release after 1s
# =======================
# Play Control
command = ("ffmpeg", '-v', ffmpeg_debug, '-nostdin', '-ss', "%f" % self.playhead, '-i',
uri, '-ac', '1', '-f', 's16le', '-ar', '48000', '-')
-
if var.config.getboolean('bot', 'announce_current_music'):
self.send_msg(var.playlist.current_item().format_current_playing())
# The ffmpeg process is a thread
# prepare pipe for catching stderr of ffmpeg
pipe_rd, pipe_wd = os.pipe()
- util.pipe_no_wait(pipe_rd) # Let the pipe work in non-blocking mode
+ util.pipe_no_wait(pipe_rd) # Let the pipe work in non-blocking mode
self.thread_stderr = os.fdopen(pipe_rd)
self.thread = sp.Popen(command, stdout=sp.PIPE, stderr=pipe_wd, bufsize=480)
self.last_volume_cycle_time = time.time()
self.pause_at_id = ""
-
# TODO: this is a temporary workaround for issue #44 of pymumble.
def _clear_pymumble_soundqueue(self):
for id, user in self.mumble.users.items():
self.log.debug("bot: pymumble soundqueue cleared.")
-
def start_web_interface(addr, port):
global formatter
import interface
logfile = util.solve_filepath(var.config.get('webinterface', 'web_logfile'))
handler = None
if logfile:
- handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
+ handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
else:
handler = logging.StreamHandler()
logfile = util.solve_filepath(var.config.get('bot', 'logfile'))
handler = None
if logfile:
- handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
+ handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
else:
handler = logging.StreamHandler()
from io import BytesIO
from sys import platform
import traceback
-import urllib.parse, urllib.request, urllib.error
+import urllib.request
import base64
import media
import media.radio
filelist.sort()
return filelist
+
# - zips all files of the given zippath (must be a directory)
# - returns the absolute path of the created zip file
# - zip file will be in the applications tmp folder (according to configuration)
else:
return False
+
def youtube_search(query):
global log
try:
r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
results = re.findall("watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
- "(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
+ "(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
if len(results) > 0:
return results