]> git.0d.be Git - botaradio.git/commitdiff
refactor: replace urllib into requests in radio.py
authorTerry Geng <gengyanda@gmail.com>
Mon, 2 Mar 2020 11:02:56 +0000 (19:02 +0800)
committerTerry Geng <gengyanda@gmail.com>
Mon, 2 Mar 2020 11:02:56 +0000 (19:02 +0800)
media/radio.py

index ae749aa29ba8311b0bd9e36d4dff7c54e914f614..c4fc6487186b3de65e84ca11203ec0fd785952cc 100644 (file)
@@ -1,13 +1,16 @@
 import re
-import urllib.request
-import urllib.error
 import logging
 import json
 import http.client
 import struct
+import requests
+import traceback
 
+log = logging.getLogger("bot")
 
 def get_radio_server_description(url):
+    global log
+
     p = re.compile('(https?\:\/\/[^\/]*)', re.IGNORECASE)
     res = re.search(p, url)
     base_url = res.group(1)
@@ -15,59 +18,52 @@ def get_radio_server_description(url):
     url_shoutcast = base_url + '/stats?json=1'
     title_server = None
     try:
-        request = urllib.request.Request(url_shoutcast)
-        response = urllib.request.urlopen(request)
-        data = json.loads(response.read().decode("utf-8"))
+        r = requests.get(url_shoutcast, timeout=5)
+        data = r.json()
         title_server = data['servertitle']
+        return title_server
         # logging.info("TITLE FOUND SHOUTCAST: " + title_server)
-    except urllib.error.HTTPError:
-        pass
-    except http.client.BadStatusLine:
-        pass
+    except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
+        error_traceback = traceback.format_exc()
+        error = error_traceback.rstrip().split("\n")[-1]
+        log.debug("radio: unsuccessful attempts on fetching radio description (shoutcast): " + error)
     except ValueError:
-        return False
+        return False # ?
 
-    if not title_server:
-        try:
-            request = urllib.request.Request(url_icecast)
-            response = urllib.request.urlopen(request)
-            response_data = response.read().decode('utf-8', errors='ignore')
-            if response_data:
-                data = json.loads(response_data, strict=False)
-                source = data['icestats']['source']
-                if type(source) is list:
-                    source = source[0]
-                title_server = source['server_name']
-                if 'server_description' in source:
-                    title_server += ' - ' + source['server_description']
-                # logging.info("TITLE FOUND ICECAST: " + title_server)
-                if not title_server:
-                    title_server = url
-        except urllib.error.URLError:
-            title_server = url
-        except http.client.BadStatusLine:
-            pass
-    return title_server
+    try:
+        r = requests.get(url_icecast, timeout=5)
+        data = r.json()
+        source = data['icestats']['source']
+        if type(source) is list:
+            source = source[0]
+        title_server = source['server_name']
+        if 'server_description' in source:
+            title_server += ' - ' + source['server_description']
+        # logging.info("TITLE FOUND ICECAST: " + title_server)
+        return title_server
+    except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.Timeout) as e:
+        error_traceback = traceback.format_exc()
+        error = error_traceback.rstrip().split("\n")[-1]
+        log.debug("radio: unsuccessful attempts on fetching radio description (icecast): " + error)
+
+    return url
 
 
 def get_radio_title(url):
-    request = urllib.request.Request(url, headers={'Icy-MetaData': 1})
     try:
+        r = requests.get(url, headers={'Icy-MetaData': '1'}, stream=True, timeout=5)
+        icy_metaint_header = int(r.headers['icy-metaint'])
+        r.raw.read(icy_metaint_header)
 
-        response = urllib.request.urlopen(request)
-        icy_metaint_header = int(response.headers['icy-metaint'])
-        if icy_metaint_header is not None:
-            response.read(icy_metaint_header)
-
-            metadata_length = struct.unpack('B', response.read(1))[0] * 16  # length byte
-            metadata = response.read(metadata_length).rstrip(b'\0')
-            logging.info(metadata)
-            # extract title from the metadata
-            m = re.search(br"StreamTitle='([^']*)';", metadata)
-            if m:
-                title = m.group(1)
-                if title:
-                    return title.decode()
-    except (urllib.error.URLError, urllib.error.HTTPError, http.client.BadStatusLine):
+        metadata_length = struct.unpack('B', r.raw.read(1))[0] * 16  # length byte
+        metadata = r.raw.read(metadata_length).rstrip(b'\0')
+        logging.info(metadata)
+        # extract title from the metadata
+        m = re.search(br"StreamTitle='([^']*)';", metadata)
+        if m:
+            title = m.group(1)
+            if title:
+                return title.decode()
+    except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as e:
         pass
-    return 'Unknown title'
+    return url