# coding=utf-8
import logging
-import os.path
import pymumble.pymumble_py3 as pymumble
import re
import variables as var
from librb import radiobrowser
from database import SettingsDatabase, MusicDatabase
-from media.item import item_builders, item_loaders, item_id_generators, dict_to_item, dicts_to_items
+from media.item import item_id_generators, dict_to_item, dicts_to_items
from media.cache import get_cached_wrapper_from_scrap, get_cached_wrapper_by_id, get_cached_wrappers_by_tags
-from media.file import FileItem
-from media.url_from_playlist import PlaylistURLItem, get_playlist_info
-from media.url import URLItem
-from media.radio import RadioItem
+from media.url_from_playlist import get_playlist_info
log = logging.getLogger("bot")
def cmd_play_file_match(bot, user, text, command, parameter, do_not_refresh_cache=False):
global log
- music_folder = var.music_folder
if parameter:
files = var.cache.files
msgs = [constants.strings('multiple_file_added') + "<ul>"]
global log
# Allow to remove specific music into the queue with a number
- if parameter and parameter.isdigit() and int(parameter) > 0 \
- and int(parameter) <= len(var.playlist):
+ if parameter and parameter.isdigit() and 0 < int(parameter) <= len(var.playlist):
index = int(parameter) - 1
- removed = None
if index == var.playlist.current_index:
removed = var.playlist.remove(index)
else:
msgs = [constants.strings('queue_contents')]
for i, music in enumerate(var.playlist):
- newline = ''
tags = ''
if len(music.item().tags) > 0:
tags = "<sup>{}</sup>".format(", ".join(music.item().tags))
global log
params = parameter.split()
- index = ""
- tags = []
if len(params) == 2:
index = params[0]
tags = list(map(lambda t: t.strip(), params[1].split(",")))
params = parameter.split()
- index = ""
- tags = []
if len(params) == 2:
index = params[0]
tags = list(map(lambda t: t.strip(), params[1].split(",")))
def cmd_shortlist(bot, user, text, command, parameter):
global song_shortlist, log
- indexes = []
try:
indexes = [int(i) for i in parameter.split(" ")]
except ValueError:
def cmd_delete_from_library(bot, user, text, command, parameter):
global song_shortlist, log
- indexes = []
try:
indexes = [int(i) for i in parameter.split(" ")]
except ValueError:
def strings(option, *argv, **kwargs):
- string = ""
try:
string = var.config.get("strings", option)
- except KeyError as e:
- raise KeyError("Missed strings in configuration file: '{string}'. ".format(string=option) +
- "Please restore you configuration file back to default if necessary.")
+ except KeyError:
+ raise KeyError("Missed strings in configuration file: '{string}'. ".format(string=option)
+ + "Please restore you configuration file back to default if necessary.")
if argv or kwargs:
try:
formatted = string.format(*argv, **kwargs)
return formatted
except KeyError as e:
raise KeyError(
- "Missed/Unexpected placeholder {{{placeholder}}} in string '{string}'. ".format(placeholder=str(e).strip("'"), string=option) +
- "Please restore you configuration file back to default if necessary.")
- except TypeError as e:
+ "Missed/Unexpected placeholder {{{placeholder}}} in string '{string}'. ".format(placeholder=str(e).strip("'"), string=option)
+ + "Please restore you configuration file back to default if necessary.")
+ except TypeError:
raise KeyError(
- "Missed placeholder in string '{string}'. ".format(string=option) +
- "Please restore you configuration file back to default if necessary.")
+ "Missed placeholder in string '{string}'. ".format(string=option)
+ + "Please restore you configuration file back to default if necessary.")
else:
return string
def commands(command):
- string = ""
try:
string = var.config.get("commands", command)
return string
- except KeyError as e:
- raise KeyError("Missed command in configuration file: '{string}'. ".format(string=command) +
- "Please restore you configuration file back to default if necessary.")
+ except KeyError:
+ raise KeyError("Missed command in configuration file: '{string}'. ".format(string=command)
+ + "Please restore you configuration file back to default if necessary.")
# connect
conn = sqlite3.connect(self.db_path)
- cursor = conn.cursor()
self.db_version_check_and_create()
class ReverseProxied(object):
- '''Wrap the application in this middleware and configure the
+ """Wrap the application in this middleware and configure the
front-end server to add these headers, to let you quietly bind
this to a URL other than / and to an HTTP scheme that is
different than what is used locally.
}
:param app: the WSGI application
- '''
+ """
def __init__(self, app):
self.app = app
import requests
-from xml.etree import ElementTree
-from urllib.parse import urljoin
-
from librb.rbConstants import endpoints, BASE_URL
import logging
import os
-from database import MusicDatabase
import json
import threading
-from media.item import item_builders, item_loaders, item_id_generators, dict_to_item, dicts_to_items
+from media.item import item_builders, item_id_generators, dict_to_item
from database import MusicDatabase
import variables as var
import util
self.log = logging.getLogger("bot")
self.dir = None
self.files = []
+ self.file_id_lookup = {}
self.dir_lock = threading.Lock()
def get_item_by_id(self, bot, id): # Why all these functions need a bot? Because it need the bot to send message!
self.dir_lock.acquire()
self.log.info("library: rebuild directory cache")
self.files = []
- self.file_id_lookup = {}
files = util.get_recursive_file_list_sorted(var.music_folder)
self.dir = util.Dir(var.music_folder)
for file in files:
-import logging
import os
import re
from io import BytesIO
import hashlib
import mutagen
from PIL import Image
-import json
-import util
import variables as var
from media.item import BaseItem, item_builders, item_loaders, item_id_generators
import constants
return True
def _get_info_from_tag(self):
- match = re.search("(.+)\.(.+)", self.uri())
+ match = re.search(r"(.+)\.(.+)", self.uri())
assert match is not None
file_no_ext = match[1]
import logging
-import threading
-import os
-import re
-from io import BytesIO
-import base64
-import hashlib
-import mutagen
-from PIL import Image
-
-import util
-import variables as var
item_builders = {}
item_loaders = {}
def to_dict(self):
return {"type": self.type, "id": self.id, "ready": self.ready, "path": self.path, "tags": self.tags}
-
-
if len(self) == 0:
self.refresh()
return super().next()
-
global log
log.debug("radio: fetching radio server description")
- p = re.compile('(https?\:\/\/[^\/]*)', re.IGNORECASE)
+ p = re.compile('(https?://[^/]*)', re.IGNORECASE)
res = re.search(p, url)
base_url = res.group(1)
url_icecast = base_url + '/status-json.xsl'
url_shoutcast = base_url + '/stats?json=1'
- title_server = None
try:
r = requests.get(url_shoutcast, timeout=10)
data = r.json()
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.ReadTimeout,
- requests.exceptions.Timeout) as e:
+ requests.exceptions.Timeout):
error_traceback = traceback.format_exc()
error = error_traceback.rstrip().split("\n")[-1]
log.debug("radio: unsuccessful attempts on fetching radio description (shoutcast): " + error)
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.ReadTimeout,
- requests.exceptions.Timeout) as e:
+ requests.exceptions.Timeout):
error_traceback = traceback.format_exc()
error = error_traceback.rstrip().split("\n")[-1]
log.debug("radio: unsuccessful attempts on fetching radio description (icecast): " + error)
requests.exceptions.HTTPError,
requests.exceptions.ReadTimeout,
requests.exceptions.Timeout,
- KeyError) as e:
+ KeyError):
log.debug("radio: unsuccessful attempts on fetching radio title (icy)")
return url
def display_type(self):
return constants.strings("radio")
-
-
-
self.ready = "preparing"
self.log.info("bot: downloading url (%s) %s " % (self.title, self.url))
- ydl_opts = ""
-
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': save_path,
for i in range(attempts):
self.log.info("bot: download attempts %d / %d" % (i+1, attempts))
try:
- info = ydl.extract_info(self.url)
+ ydl.extract_info(self.url)
download_succeed = True
break
except:
import youtube_dl
import constants
-import media
import variables as var
-import hashlib
from media.item import item_builders, item_loaders, item_id_generators
from media.url import URLItem, url_item_id_generator
import os.path
import pymumble.pymumble_py3 as pymumble
import variables as var
-import hashlib
-import youtube_dl
import logging
import logging.handlers
import traceback
import command
import constants
from database import SettingsDatabase, MusicDatabase
-import media.url
-import media.file
-import media.radio
import media.system
from media.playlist import BasePlaylist
from media.cache import MusicCache
# setup logger
werkzeug_logger = logging.getLogger('werkzeug')
logfile = util.solve_filepath(var.config.get('webinterface', 'web_logfile'))
- handler = None
if logfile:
handler = logging.handlers.RotatingFileHandler(logfile, mode='a', maxBytes=10240) # Rotate after 10KB
else:
import os
import sys
import variables as var
-import constants
import zipfile
import requests
-import mutagen
import re
import subprocess as sp
import logging
import youtube_dl
from importlib import reload
-from PIL import Image
-from io import BytesIO
from sys import platform
import traceback
import urllib.request
-import base64
-import media
-import media.radio
from packaging import version
log = logging.getLogger("bot")
def pipe_no_wait(pipefd):
- ''' Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()'''
+ """ Used to fetch the STDERR of ffmpeg. pipefd is the file descriptor returned from os.pipe()"""
if platform == "linux" or platform == "linux2" or platform == "darwin":
import fcntl
import os
try:
r = requests.get("https://www.youtube.com/results", params={'search_query': query}, timeout=5)
- results = re.findall("watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
+ results = re.findall(r"watch\?v=(.*?)\".*?title=\"(.*?)\".*?"
"(?:user|channel).*?>(.*?)<", r.text) # (id, title, uploader)
if len(results) > 0:
return results
- except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
+ except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout):
error_traceback = traceback.format_exc().split("During")[0]
log.error("util: youtube query failed with error:\n %s" % error_traceback)
return False