]> git.0d.be Git - botaradio.git/blob - database.py
4ba493fbe39e681b334e0d9c9ad7352ce158ae19
[botaradio.git] / database.py
1 import sqlite3
2 import json
3 import datetime
4
5
class DatabaseError(Exception):
    """Error raised by the database helpers in this module.

    ``SettingsDatabase.get`` raises it when the requested setting is missing
    and the caller supplied no fallback.
    """
8
9
class SettingsDatabase:
    """Key/value settings store backed by one SQLite table.

    Settings are rows of the ``botamusique`` table, ``(section, option,
    value)``, unique per ``(section, option)`` pair. Every operation opens
    its own short-lived connection to ``db_path`` and always closes it, even
    when the statement fails.
    """

    # Schema version this code expects; stored as the bot/db_version setting.
    version = 1

    def __init__(self, db_path):
        """Remember ``db_path`` and make sure an up-to-date table exists.

        :param db_path: path of the SQLite database file.
        """
        self.db_path = db_path
        self.db_version_check_and_create()

    def _execute(self, sql, params=(), commit=False):
        """Run one statement on a fresh connection and return all its rows.

        The connection is closed in a ``finally`` block so a failing
        statement cannot leak it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(sql, params).fetchall()
            if commit:
                conn.commit()
            return rows
        finally:
            conn.close()

    def _table_exists(self, name):
        """Return True if a table called ``name`` exists in the database."""
        rows = self._execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?",
                             (name, ))
        return bool(rows)

    def has_table(self):
        """Return True if the ``botamusique`` settings table exists."""
        return self._table_exists("botamusique")

    def db_version_check_and_create(self):
        """Create the settings table, or replace it if its version is stale.

        When the table exists but its ``bot/db_version`` is missing, is not
        an integer, or differs from ``self.version``, the table is renamed to
        ``botamusique_old_<YYYYMMDD>`` and a fresh table is created. The
        name of the renamed table is stored as ``bot/old_db_name`` in the new
        table.
        """
        if not self.has_table():
            self.create_table()
            return

        rows = self._execute("SELECT value FROM botamusique WHERE section=? AND option=?",
                             ("bot", "db_version"))
        try:
            stored_version = int(rows[0][0]) if rows else None
        except (TypeError, ValueError):
            # An unreadable version is handled like a mismatching one.
            stored_version = None

        if stored_version == self.version:
            return

        base_name = "botamusique_old_%s" % datetime.datetime.now().strftime("%Y%m%d")
        old_name = base_name
        attempt = 1
        # A second migration on the same day would collide with the table
        # renamed earlier, so add a numeric suffix until the name is free.
        while self._table_exists(old_name):
            old_name = "%s_%d" % (base_name, attempt)
            attempt += 1

        self._execute("ALTER TABLE botamusique RENAME TO %s" % old_name, commit=True)
        self.create_table()
        self.set("bot", "old_db_name", old_name)

    def create_table(self):
        """Create the settings table and insert the two version rows.

        :raises sqlite3.IntegrityError: if the table already holds the
            version rows (they are inserted with a plain ``INSERT``).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("CREATE TABLE IF NOT EXISTS botamusique ("
                           "section TEXT, "
                           "option TEXT, "
                           "value TEXT, "
                           "UNIQUE(section, option))")
            # Write the class's own version so the stored value and the
            # check in db_version_check_and_create() cannot drift apart.
            cursor.executemany("INSERT INTO botamusique (section, option, value) "
                               "VALUES (?, ?, ?)",
                               [("bot", "db_version", str(self.version)),
                                ("bot", "music_db_version", "0")])
            conn.commit()
        finally:
            conn.close()

    def get(self, section, option, **kwargs):
        """Return the stored value of ``section``/``option``.

        :param fallback: optional keyword; returned when the setting does
            not exist.
        :raises DatabaseError: if the setting does not exist and no
            ``fallback`` keyword was given.
        """
        rows = self._execute("SELECT value FROM botamusique WHERE section=? AND option=?",
                             (section, option))
        if rows:
            return rows[0][0]
        if 'fallback' in kwargs:
            return kwargs['fallback']
        raise DatabaseError("Item not found")

    def _get_as(self, convert, section, option, kwargs):
        """Return ``get()`` passed through ``convert``; ``None`` stays ``None``."""
        value = self.get(section, option, **kwargs)
        # Lets callers use fallback=None to mean "not set" without a crash.
        if value is None:
            return None
        return convert(value)

    def getboolean(self, section, option, **kwargs):
        """Return the setting as ``bool(int(value))``; see :meth:`get`."""
        return self._get_as(lambda raw: bool(int(raw)), section, option, kwargs)

    def getfloat(self, section, option, **kwargs):
        """Return the setting converted with ``float``; see :meth:`get`."""
        return self._get_as(float, section, option, kwargs)

    def getint(self, section, option, **kwargs):
        """Return the setting converted with ``int``; see :meth:`get`."""
        return self._get_as(int, section, option, kwargs)

    def set(self, section, option, value):
        """Insert the setting, replacing any existing value for the pair."""
        self._execute("INSERT OR REPLACE INTO botamusique (section, option, value) "
                      "VALUES (?, ?, ?)", (section, option, value), commit=True)

    def has_option(self, section, option):
        """Return True if a row exists for ``section``/``option``."""
        rows = self._execute("SELECT value FROM botamusique WHERE section=? AND option=?",
                             (section, option))
        return bool(rows)

    def remove_option(self, section, option):
        """Delete the ``section``/``option`` row, if it exists."""
        self._execute("DELETE FROM botamusique WHERE section=? AND option=?",
                      (section, option), commit=True)

    def remove_section(self, section):
        """Delete every row that belongs to ``section``."""
        self._execute("DELETE FROM botamusique WHERE section=?", (section, ), commit=True)

    def items(self, section):
        """Return the ``(option, value)`` tuples of ``section`` as a list."""
        rows = self._execute("SELECT option, value FROM botamusique WHERE section=?",
                             (section, ))
        return [(option, value) for option, value in rows]

    def drop_table(self):
        """Drop the ``botamusique`` table together with all settings."""
        self._execute("DROP TABLE botamusique", commit=True)
140
141
class MusicDatabase:
    """SQLite-backed catalogue of music items.

    Each item is one row of the ``music`` table: ``id`` (primary key),
    ``type``, ``title``, ``metadata`` and ``tags``. ``metadata`` holds the
    JSON-encoded remainder of the item dict and ``tags`` holds the tag list
    as comma-terminated text (``"rock,live,"``). Every operation opens its
    own short-lived connection to ``db_path`` and always closes it, even when
    the statement fails.
    """

    # Upper bound on bound parameters per statement in query_tags_by_ids();
    # SQLite limits both parameter count and expression depth.
    _BATCH_SIZE = 500

    def __init__(self, db_path):
        """Remember ``db_path`` and create the ``music`` table if missing.

        :param db_path: path of the SQLite database file.
        """
        self.db_path = db_path
        self._execute("CREATE TABLE IF NOT EXISTS music ("
                      "id TEXT PRIMARY KEY, "
                      "type TEXT, "
                      "title TEXT, "
                      "metadata TEXT, "
                      "tags TEXT)", commit=True)

    def _execute(self, sql, params=(), commit=False):
        """Run one statement on a fresh connection and return all its rows.

        The connection is closed in a ``finally`` block so a failing
        statement cannot leak it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(sql, params).fetchall()
            if commit:
                conn.commit()
            return rows
        finally:
            conn.close()

    @staticmethod
    def _split_tags(raw_tags):
        """Turn the stored ``"a,b,"`` text into ``["a", "b"]``.

        Empty or NULL text yields an empty list.
        """
        tags = (raw_tags or "").strip(",").split(",")
        return tags if tags[0] else []

    @staticmethod
    def _condition(column, value):
        """Return ``(sql_fragment, parameter)`` for one column filter.

        A string means equality; anything else is read as an
        ``(operator, operand)`` pair through ``value[0]`` and ``value[1]``.
        """
        if isinstance(value, str):
            return column + "=?", value
        return column + " " + value[0] + " ?", value[1]

    def _select_music(self, conditions, params):
        """Return the items matching all ``conditions`` (all items if none)."""
        sql = "SELECT id, type, title, metadata, tags FROM music"
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        return self._result_to_dict(self._execute(sql, params))

    def insert_music(self, music_dict):
        """Insert an item, replacing any existing row with the same id.

        ``music_dict`` must contain ``id``, ``title``, ``type`` and ``tags``
        (an iterable of strings); all remaining keys are stored as JSON in
        the ``metadata`` column. The caller's dict is left unmodified.

        :raises KeyError: if one of the four required keys is missing.
        """
        # Work on a shallow copy so the caller keeps its id/title/type/tags.
        metadata = dict(music_dict)
        music_id = metadata.pop('id')
        title = metadata.pop('title')
        music_type = metadata.pop('type')
        # Trailing comma lets query_music_by_tags() match on "<tag>,".
        tags = ",".join(metadata.pop('tags')) + ","

        self._execute("INSERT OR REPLACE INTO music (id, type, title, metadata, tags) "
                      "VALUES (?, ?, ?, ?, ?)",
                      (music_id, music_type, title, json.dumps(metadata), tags),
                      commit=True)

    def query_all_ids(self):
        """Return the id of every stored item."""
        return [row[0] for row in self._execute("SELECT id FROM music")]

    def query_all_tags(self):
        """Return every distinct tag, in order of first appearance."""
        seen = set()
        tags = []
        for row in self._execute("SELECT tags FROM music"):
            for tag in self._split_tags(row[0]):
                # Set membership keeps de-duplication linear in tag count.
                if tag and tag not in seen:
                    seen.add(tag)
                    tags.append(tag)
        return tags

    def query_music(self, **kwargs):
        """Return the items matching every ``column=value`` filter.

        A string value is an equality test. A tuple, list or dict value is
        read as ``(operator, operand)`` through ``value[0]`` / ``value[1]``,
        e.g. ``title=("LIKE", "%live%")``. Values of any other type are
        ignored. With no usable filter, all items are returned.

        Column names and operators are inserted into the SQL text as given,
        so they must come from trusted code; only operands are bound as
        parameters.
        """
        conditions = []
        params = []
        for column, value in kwargs.items():
            if isinstance(value, (str, tuple, list, dict)):
                fragment, param = self._condition(column, value)
                conditions.append(fragment)
                params.append(param)
        return self._select_music(conditions, params)

    def query_music_by_keywords(self, keywords):
        """Return the items whose title contains every keyword.

        The keyword is lower-cased and compared against ``LOWER(title)``.
        ``%`` and ``_`` inside a keyword act as LIKE wildcards. An empty
        keyword list returns all items.
        """
        conditions = ['LOWER(title) LIKE ?' for _ in keywords]
        params = ["%{:s}%".format(keyword.lower()) for keyword in keywords]
        return self._select_music(conditions, params)

    def query_music_by_tags(self, tags):
        """Return the items carrying every tag in ``tags``.

        The tag is lower-cased and compared against ``LOWER(tags)``. An
        empty tag list returns all items.

        NOTE: the pattern is ``%<tag>,%``, so a tag also matches any stored
        tag that merely ends with it (``rock`` matches ``hardrock``).
        """
        conditions = ['LOWER(tags) LIKE ?' for _ in tags]
        params = ["%{:s},%".format(tag.lower()) for tag in tags]
        return self._select_music(conditions, params)

    def query_tags_by_ids(self, ids):
        """Return ``{id: [tag, ...]}`` for those of ``ids`` that exist.

        Unknown ids are left out of the result; an empty ``ids`` gives ``{}``.
        """
        ids = list(ids)
        lookup = {}
        # Query in batches: one statement per id list would exceed SQLite's
        # limits once the list gets long.
        for start in range(0, len(ids), self._BATCH_SIZE):
            batch = ids[start:start + self._BATCH_SIZE]
            placeholders = ",".join("?" * len(batch))
            rows = self._execute("SELECT id, tags FROM music "
                                 "WHERE id IN (%s)" % placeholders, batch)
            for music_id, raw_tags in rows:
                lookup[music_id] = self._split_tags(raw_tags)
        return lookup

    def query_random_music(self, count):
        """Return up to ``count`` items picked at random."""
        rows = self._execute("SELECT id, type, title, metadata, tags FROM music "
                             "WHERE id IN (SELECT id FROM music ORDER BY RANDOM() LIMIT ?)",
                             (count, ))
        return self._result_to_dict(rows)

    def _result_to_dict(self, results):
        """Convert ``(id, type, title, metadata, tags)`` rows to item dicts.

        The JSON metadata is decoded first, then ``type``, ``title``, ``id``
        and the parsed ``tags`` list are added on top of it.
        """
        music_dicts = []
        for music_id, music_type, title, metadata, raw_tags in results:
            music_dict = json.loads(metadata)
            music_dict['type'] = music_type
            music_dict['title'] = title
            music_dict['id'] = music_id
            music_dict['tags'] = self._split_tags(raw_tags)
            music_dicts.append(music_dict)
        return music_dicts

    def delete_music(self, **kwargs):
        """Delete the items matching every ``column=value`` filter.

        Filters follow the same convention as :meth:`query_music`: a string
        is an equality test, anything else an ``(operator, operand)`` pair.

        :raises sqlite3.OperationalError: when called without filters. The
            empty ``WHERE`` clause is rejected by SQLite, which keeps a bare
            call from wiping the whole table.
        """
        conditions = []
        params = []
        for column, value in kwargs.items():
            fragment, param = self._condition(column, value)
            conditions.append(fragment)
            params.append(param)

        self._execute("DELETE FROM music "
                      "WHERE %s" % " AND ".join(conditions), params, commit=True)

    def drop_table(self):
        """Drop the ``music`` table together with all items."""
        self._execute("DROP TABLE music", commit=True)