Spaces:
Runtime error
Runtime error
| # Hello | |
| # Red | |
| # Blue | |
| from box.timer import Timer | |
| timer = Timer() | |
| from box.common import print_red_line, make_string_green, truncate, Timer | |
| from box.reltools import pathjoin | |
| from box.dbutils import get_sqlite3 | |
| from enum import IntEnum | |
| timer = Timer() | |
| import sys | |
| def table_exists(cursor, table_name): | |
| statement = f"SELECT * FROM sqlite_master WHERE type='table' AND name='{table_name}'" | |
| D = cursor.execute(statement).fetchall() | |
| return True if D else False | |
| def dump_list(L): | |
| # print("dump_list", L) | |
| import json | |
| if L is None: | |
| return '[]' | |
| else: | |
| return json.dumps(L) | |
| def load_json_bytes(json_bytes): | |
| print("load_json_bytes", json_bytes, type(json_bytes)) | |
| import json | |
| if json_bytes is None: | |
| return [] | |
| else: | |
| return json.loads(json_bytes) | |
| def db_init_old(db_path): | |
| sqlite3 = get_sqlite3() | |
| sqlite3.register_adapter(list, dump_list) | |
| sqlite3.register_converter("LIST", load_json_bytes) | |
| import os | |
| db_path = pathjoin(sys._getframe(1), db_path) | |
| os.makedirs(os.path.dirname(db_path), exist_ok=True) | |
| connection = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES) | |
| connection.row_factory = sqlite3.Row | |
| cursor = connection.cursor() | |
| return cursor, connection | |
| def db_init(db_path, check_same_thread=False): | |
| """ | |
| db_path: str | |
| Returns: cursor, connection | |
| Creates directories if they do not exist | |
| By default check_same_thread is False which means that the connection can be shared across threads | |
| """ | |
| import os | |
| sqlite3 = get_sqlite3() | |
| db_path = pathjoin(sys._getframe(1), db_path) | |
| os.makedirs(os.path.dirname(db_path), exist_ok=True) | |
| connection = sqlite3.connect(db_path, check_same_thread=check_same_thread) | |
| connection.row_factory = sqlite3.Row | |
| # connection can be configured after creation, e.g.: | |
| # | |
| # [1] connection.row_factory = sqlite3.Row | |
| # [2] connection.check_same_thread = False | |
| return connection | |
| def jsonify_lists(D): | |
| import json | |
| list_keys = [ key for key, value in D.items() if type(value) == list ] | |
| for key in list_keys: | |
| L = D[key] | |
| D[key] = json.dumps(L) | |
| return D | |
| # flatten_dict({ "vim": "god", "css": "okay" }, "skills") | |
| # -> | |
| # [ ("skills.vim", "god"), ("skills.css", "okay") ] | |
| def flatten_dict(D, parent_keys=[]): | |
| pairs = [] | |
| for k, v in D.items(): | |
| if type(v) == dict: | |
| pairs = pairs + flatten_dict(v, parent_keys+[k]) | |
| else: | |
| new_k = ".".join(parent_keys+[k]) | |
| pairs.append((new_k, v)) | |
| return pairs | |
| def prepare_dict(D): | |
| pairs = flatten_dict(D) # 1.10, 60% | |
| D = { k: v for k, v in pairs } # 0.26, 15% | |
| # D = jsonify_lists(D) # 0.45, 25% | |
| return D | |
| def is_reserved(key): | |
| # 147 Words "ABORT" ... "WITHOUT" | |
| sqlite_reserved_words = [ "ABORT", "ACTION", "ADD", "AFTER", "ALL", "ALTER", "ALWAYS", "ANALYZE", "AND", "AS", "ASC", "ATTACH", "AUTOINCREMENT", "BEFORE", "BEGIN", "BETWEEN", "BY", "CASCADE", "CASE", "CAST", "CHECK", "COLLATE", "COLUMN", "COMMIT", "CONFLICT", "CONSTRAINT", "CREATE", "CROSS", "CURRENT", "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", "DATABASE", "DEFAULT", "DEFERRABLE", "DEFERRED", "DELETE", "DESC", "DETACH", "DISTINCT", "DO", "DROP", "EACH", "ELSE", "END", "ESCAPE", "EXCEPT", "EXCLUDE", "EXCLUSIVE", "EXISTS", "EXPLAIN", "FAIL", "FILTER", "FIRST", "FOLLOWING", "FOR", "FOREIGN", "FROM", "FULL", "GENERATED", "GLOB", "GROUP", "GROUPS", "HAVING", "IF", "IGNORE", "IMMEDIATE", "IN", "INDEX", "INDEXED", "INITIALLY", "INNER", "INSERT", "INSTEAD", "INTERSECT", "INTO", "IS", "ISNULL", "JOIN", "KEY", "LAST", "LEFT", "LIKE", "LIMIT", "MATCH", "MATERIALIZED", "NATURAL", "NO", "NOT", "NOTHING", "NOTNULL", "NULL", "NULLS", "OF", "OFFSET", "ON", "OR", "ORDER", "OTHERS", "OUTER", "OVER", "PARTITION", "PLAN", "PRAGMA", "PRECEDING", "PRIMARY", "QUERY", "RAISE", "RANGE", "RECURSIVE", "REFERENCES", "REGEXP", "REINDEX", "RELEASE", "RENAME", "REPLACE", "RESTRICT", "RETURNING", "RIGHT", "ROLLBACK", "ROW", "ROWS", "SAVEPOINT", "SELECT", "SET", "TABLE", "TEMP", "TEMPORARY", "THEN", "TIES", "TO", "TRANSACTION", "TRIGGER", "UNBOUNDED", "UNION", "UNIQUE", "UPDATE", "USING", "VACUUM", "VALUES", "VIEW", "VIRTUAL", "WHEN", "WHERE", "WINDOW", "WITH", "WITHOUT" ] | |
| key_upper = key.upper() | |
| return key_upper in sqlite_reserved_words | |
| def escape_key_if_required(key): | |
| if "." in key or is_reserved(key): | |
| key = f'"{key}"' | |
| return key | |
| def escaped_keys(input_keys): | |
| keys = [ escape_key_if_required(key) for key in input_keys ] | |
| return keys | |
| class SqliteType(IntEnum): | |
| TEXT = 1 | |
| REAL = 2 | |
| INTEGER = 3 | |
| BLOB = 4 # ideally 2 but confuses from_text | |
| LIST = 5 # ideally 2 but confuses from_text | |
| def from_text(arg): | |
| D = { "TEXT" : SqliteType.TEXT, "REAL" : SqliteType.REAL, | |
| "INTEGER" : SqliteType.INTEGER, "INT" : SqliteType.INTEGER, | |
| "BLOB" : SqliteType.BLOB, "LIST" : SqliteType.LIST } | |
| return D[arg] | |
| def as_column_str(x): | |
| D = { str : "TEXT", float : "REAL", int : "INTEGER", bytes : "BLOB", list : "LIST", | |
| SqliteType.TEXT : "TEXT", SqliteType.REAL : "REAL", SqliteType.INTEGER : "INTEGER", SqliteType.BLOB: "BLOB", SqliteType.LIST : "LIST", | |
| type(None) : "TEXT", bool : "INTEGER" } | |
| if x not in D.keys(): | |
| x = type(x) | |
| return D[x] | |
| class Schema: | |
| "TEXT (null)" | |
| "TEXT (genuine)" | |
| "INTEGER" | |
| "REAL" | |
| "BLOB" | |
| "LIST" | |
| def __init__(self, arg=None): | |
| sqlite3 = get_sqlite3() | |
| if arg == [] or arg == None: | |
| self.D = {} | |
| else: | |
| items = arg if type(arg) == list else [ arg ] | |
| is_pragma = type(items[0]) == sqlite3.Row | |
| if is_pragma: | |
| self.D = { row_D["name"] : SqliteType.from_text(row_D["type"]) for row_D in items } | |
| else: | |
| self.D = self.dictionaries_to_schema(items) | |
| def dictionaries_to_schema(self, dictionaries): | |
| items = [] | |
| for D in dictionaries: | |
| for T in D.items(): | |
| items.append(T) | |
| # timer.start("Implementation 1") | |
| # D1 = {} | |
| # content_offset_count = 0 | |
| # for k, v in items: | |
| # v1 = D1.get(k, SqliteType.TEXT) # 0.8, v1 = SqliteType.TEXT, INTEGER, REAL | |
| # column_str = as_column_str(v) # 2.3, as_column_str = "TEXT, "INTEGER", "REAL" | |
| # v2 = SqliteType.from_text(column_str) # 3.7, v2 = SqliteType.TEXT, INTEGER, REAL | |
| # D1[k] = max(v1, v2) # 0.8, D1[k] = SqliteType.TEXT, INTEGER, REAL | |
| # if k == "content_offset_seconds": | |
| # content_offset_count = content_offset_count + 1 | |
| # # if content_offset_count < 20: | |
| # if v2 == 3: | |
| # print(f"[{content_offset_count}] content_offset_seconds: {v}, v1: {v1}, v2: {v2}") | |
| # timer.print("Implementation 1") | |
| finalized_column_names = [] | |
| type2sqlite = { type(None): SqliteType.TEXT, str: SqliteType.TEXT, float: SqliteType.REAL, int: SqliteType.INTEGER, bool: SqliteType.INTEGER, bytes: SqliteType.BLOB, list: SqliteType.LIST } | |
| schema_D = {} | |
| for column_name, value in items: | |
| if column_name in finalized_column_names: | |
| continue | |
| value_type = type(value) | |
| schema_D[column_name] = type2sqlite[value_type] | |
| if value != None and value_type != int: | |
| finalized_column_names.append(column_name) | |
| return schema_D | |
| def join(self, with_types=False): | |
| if not with_types: | |
| return ", ".join(escaped_keys(self.keys())) | |
| return ", ".join(escape_key_if_required(key)+" "+as_column_str(value) for key, value in self.items()) | |
| def column_definitions(self, constraint_D={}): | |
| # https://www.sqlite.org/syntax/column-constraint.html | |
| column_definitions = [] | |
| for key, value in self.items(): | |
| column_L = [ escape_key_if_required(key), as_column_str(value) ] | |
| if constraint_D.get(key): | |
| column_L = [ escape_key_if_required(key), as_column_str(value), constraint_D.get(key)] | |
| else: | |
| column_L = [ escape_key_if_required(key), as_column_str(value) ] | |
| column_definition = " ".join(column_L) | |
| column_definitions.append(column_definition) | |
| return ", ".join(column_definitions) | |
| def keys(self): | |
| return self.D.keys() | |
| def values(self): | |
| return self.D.values() | |
| def items(self): | |
| return self.D.items() | |
| def get(self, key): | |
| return self.D.get(key) | |
| def __getitem__(self, key): | |
| return self.D[key] | |
| def __setitem__(self, key, value): | |
| self.D[key] = value | |
| def __len__(self): | |
| return len(self.D) | |
| def __repr__(self): | |
| return repr(self.D) | |
| def __eq__(self, rhs_schema): | |
| return self.D == rhs_schema.D | |
| def __str__(self): | |
| if len(self.D) == 0: | |
| return "Empty Schema" | |
| max_length = max(len(key) for key in self.D.keys()) | |
| column_gap = 2 | |
| separator = " " * column_gap | |
| gutter_length = 2 | |
| gutter_string = " " * gutter_length | |
| lines = [] | |
| for k, v in self.D.items(): | |
| key_string = str(k).ljust(max_length) | |
| value_string = str(v) | |
| lines.append(gutter_string + key_string + separator + value_string) | |
| lines = [ "" ] + lines + [ "" ] | |
| return "\n".join(lines) | |
| def get_schema(cursor, table_name): | |
| schema_query = cursor.execute(f"PRAGMA table_info('{table_name}');") | |
| rows = cursor.fetchall() | |
| schema = Schema(rows) | |
| return schema | |
| def batch_execute(cursor, statements, print_statements=True): | |
| for statement in statements: | |
| cursor.execute(statement) | |
| if print_statements: | |
| print(statement) | |
| print() | |
| def add_new_columns(cursor, table_name, dictionary_schema): | |
| table_schema = get_schema(cursor, table_name) | |
| new_columns_schema = Schema() | |
| new_columns_schema.D = { k: v for k, v in dictionary_schema.items() if table_schema.get(k) == None } | |
| new_column_statements = [ f"ALTER TABLE {table_name} ADD COLUMN {escape_key_if_required(k)} {as_column_str(v)};" for k, v in new_columns_schema.items() ] | |
| return new_column_statements | |
| def modify_existing_column_types(cursor, table_name, dictionary_schema, constraint_D={}): | |
| def get_random_four_digit_hex(): | |
| from random import randint | |
| min_hex = 4096 | |
| max_hex = 65535 | |
| return hex(randint(min_hex, max_hex))[2:] | |
| # We don’t check if table_schema has a key in dictionary_schema as this is meant to be run AFTER add_new_columns | |
| # There should be no key in dictionary that is not there in the table schema | |
| table_schema = get_schema(cursor, table_name) | |
| changed_columns = [ key for key in dictionary_schema.keys() if table_schema[key] != dictionary_schema[key] ] | |
| if len(changed_columns) == 0: | |
| return [] | |
| for key in dictionary_schema.keys(): | |
| table_schema[key] = max(dictionary_schema[key], table_schema[key]) | |
| temp_table_name = f"tmp_{table_name}_{get_random_four_digit_hex()}" | |
| create_table_statement = f"CREATE TABLE {temp_table_name} ({table_schema.column_definitions(constraint_D=constraint_D)});" | |
| key_string = table_schema.join() | |
| insert_statement = f"INSERT INTO {temp_table_name}({key_string}) SELECT {key_string} FROM {table_name};" | |
| drop_statement = f"DROP TABLE {table_name};" | |
| rename_statement = f"ALTER TABLE {temp_table_name} RENAME TO {table_name};" | |
| return [ create_table_statement, insert_statement, drop_statement, rename_statement ] | |
| def get_insert_pair_with_on_conflict_clause(D, table_name): | |
| keys = D.keys() | |
| values = tuple(D[key] for key in keys) | |
| keys = escaped_keys(keys) | |
| column_name_string = ", ".join(keys) | |
| question_mark_string = ", ".join(["?"] * len(keys)) | |
| prefixed_keys = [ f"excluded.{key}" for key in keys ] | |
| prefixed_column_name_string = ", ".join(prefixed_keys) | |
| statement = f""" | |
| INSERT INTO {table_name} ({column_name_string}) VALUES ({question_mark_string}) | |
| ON CONFLICT DO UPDATE SET ({column_name_string})=({prefixed_column_name_string}); | |
| """.strip() | |
| return (statement, values) | |
| # TRIGGER | |
| def get_create_trigger_statement(cursor, table_name, trigger_name, vcs_table_name, date_column_name="vcs_date_epoch_utc_seconds"): | |
| def get_primary_key(cursor, table_name): | |
| schema_query = cursor.execute(f"SELECT name FROM pragma_table_info('{table_name}') WHERE pk=1;") | |
| row = cursor.fetchone() | |
| primary_key = row["name"] | |
| return primary_key | |
| primary_key = get_primary_key(cursor, table_name) | |
| table_schema = get_schema(cursor, table_name) | |
| unescaped_column_names = table_schema.keys() | |
| clauses = [] | |
| column_names = [] | |
| column_operations = [] | |
| for unescaped_column_name in unescaped_column_names: | |
| escaped_column_name = escape_key_if_required(unescaped_column_name) | |
| old_prefix = "old."+escaped_column_name | |
| new_prefix = "new."+escaped_column_name | |
| condition = f"{old_prefix} IS NOT {new_prefix}" | |
| column_operation = f"NULLIF({old_prefix}, {new_prefix})" | |
| if unescaped_column_name != primary_key and unescaped_column_name != date_column_name: | |
| clauses.append(condition) | |
| column_names.append(escaped_column_name) | |
| column_operations.append(column_operation) | |
| else: | |
| column_names.append(escaped_column_name) | |
| column_operations.append(old_prefix) | |
| if len(clauses) == 0: | |
| return "" | |
| condition_string = " OR ".join(clauses) | |
| column_name_string = ", ".join(column_names) | |
| column_operation_string = ", ".join(column_operations) | |
| create_trigger_statement = f""" | |
| CREATE TRIGGER {trigger_name} | |
| BEFORE UPDATE | |
| ON {table_name} | |
| WHEN {condition_string} | |
| BEGIN | |
| INSERT INTO {vcs_table_name} ({column_name_string}) VALUES ( | |
| {column_operation_string} | |
| ); | |
| END; | |
| """.strip() | |
| if column_name_string == "": | |
| print_red_line("column_name_string is EMPTY") | |
| breakpoint() | |
| return create_trigger_statement | |
| # TRIGGER | |
| def check_if_trigger_exists(cursor, table_name, trigger_name): | |
| query = f"SELECT COUNT(*) trigger_count FROM sqlite_master WHERE type='trigger' AND tbl_name='{table_name}' AND name='{trigger_name}';" | |
| row = cursor.execute(query).fetchone() | |
| trigger_count = row["trigger_count"] | |
| return trigger_count > 0 | |
| # TRIGGER | |
| def add_date_column(dictionaries): | |
| from time import time | |
| epoch_utc = int(time()) | |
| date_column_name = "vcs_date_epoch_utc_seconds" | |
| for D in dictionaries: | |
| D[date_column_name] = epoch_utc | |
| return dictionaries | |
| def insert_dictionaries(cursor, table_name, dictionaries, constraint_D={}, vcs=False): | |
| if dictionaries == []: | |
| return | |
| import threading | |
| global lock | |
| lock = threading.Lock() | |
| lock.acquire() | |
| print_statements = False | |
| dictionaries = [ prepare_dict(D) for D in dictionaries ] | |
| if vcs: | |
| dictionaries = add_date_column(dictionaries) | |
| dictionary_schema = Schema(dictionaries) | |
| table_schema = get_schema(cursor, table_name) | |
| table_does_not_exist = len(table_schema) == 0 | |
| # 1. CREATE TABLE IF IT DOES NOT EXIST | |
| if table_does_not_exist: | |
| create_table_statement = f"CREATE TABLE {table_name} ({dictionary_schema.column_definitions(constraint_D=constraint_D)});" | |
| cursor.execute(create_table_statement) | |
| if print_statements: | |
| print(create_table_statement) | |
| # 2. ADD NEW COLUMNS | |
| add_column_statements = add_new_columns(cursor, table_name, dictionary_schema) | |
| batch_execute(cursor, add_column_statements, print_statements) | |
| # 3. MODIFY UPDATED COLUMN TYPES | |
| modify_column_statements = modify_existing_column_types(cursor, table_name, dictionary_schema, constraint_D=constraint_D) | |
| batch_execute(cursor, modify_column_statements, print_statements) | |
| if vcs: | |
| # 4.1. SETUP VCS TABLE {{{ | |
| vcs_print_statements = False | |
| vcs_table_name = f"{table_name}_vcs" | |
| vcs_table_schema = get_schema(cursor, vcs_table_name) | |
| vcs_table_does_not_exist = len(vcs_table_schema) == 0 | |
| if vcs_table_does_not_exist: | |
| create_vcs_table_statement = f"CREATE TABLE {vcs_table_name} ({dictionary_schema.column_definitions()});" | |
| cursor.execute(create_vcs_table_statement) | |
| if vcs_print_statements: | |
| print(create_vcs_table_statement) | |
| vcs_add_column_statements = add_new_columns(cursor, vcs_table_name, dictionary_schema) | |
| batch_execute(cursor, vcs_add_column_statements, vcs_print_statements) | |
| vcs_modify_column_statements = modify_existing_column_types(cursor, vcs_table_name, dictionary_schema) | |
| batch_execute(cursor, vcs_modify_column_statements, vcs_print_statements) | |
| # }}} | |
| # 4.2. ADD VCS TRIGGER | |
| trigger_name = f"update_trigger_{table_name}_vcs" | |
| if len(vcs_add_column_statements) > 0 or len(vcs_modify_column_statements) > 0: # SNIPPET 1 for debug | |
| cursor.execute(f"DROP TRIGGER IF EXISTS {trigger_name}") | |
| if vcs_print_statements: | |
| print(f"DROP TRIGGER IF EXISTS {trigger_name}") | |
| trigger_exists = check_if_trigger_exists(cursor, table_name, trigger_name) | |
| if not trigger_exists: | |
| primary_key_supplied = len([ value for value in constraint_D.values() if value.index("PRIMARY KEY") > -1 ]) == 1 | |
| if primary_key_supplied: | |
| create_trigger_statement = get_create_trigger_statement(cursor, table_name, trigger_name, vcs_table_name) | |
| cursor.execute(create_trigger_statement) | |
| if vcs_print_statements: | |
| print(create_trigger_statement) | |
| # 5. INSERT | |
| for D in dictionaries: | |
| insert_statement, values = get_insert_pair_with_on_conflict_clause(D, table_name) | |
| try: | |
| cursor.execute(insert_statement, values ) | |
| except Exception as e: | |
| print() | |
| print("[ERROR]", insert_statement) | |
| raise | |
| if print_statements: | |
| print(insert_statement, make_string_green(truncate(str(values), 200))) | |
| print() | |
| lock.release() | |
| def version(): | |
| sqlite3 = get_sqlite3() | |
| pysqlite_version = sqlite3.version | |
| sqlite_version = sqlite3.sqlite_version | |
| print(f"PYSQLITE: {pysqlite_version}") | |
| print(f"SQLITE : {sqlite_version}") | |
| def main(): | |
| from tdd import test_ic_custom_types | |
| test_ic_custom_types() | |
| pass | |
| # list_type() | |
| if __name__ == "__main__": | |
| # import sys; main(); sys.exit(1) | |
| from box.error import handler | |
| with handler(): | |
| main() | |