# Hello
# Red
# Blue

import sys

from enum import IntEnum

from box.timer import Timer
# NOTE: this Timer import shadows box.timer.Timer above (kept to preserve
# existing module attributes; the box.common Timer is the one in effect).
from box.common import print_red_line, make_string_green, truncate, Timer
from box.reltools import pathjoin
from box.dbutils import get_sqlite3

# Module-level timer kept for the (commented-out) profiling code below.
timer = Timer()


def table_exists(cursor, table_name):
    """Return True if a table named table_name exists in the database.

    cursor: an open sqlite3 cursor
    """
    # Parameterized so a hostile table_name cannot inject SQL.
    statement = "SELECT * FROM sqlite_master WHERE type='table' AND name=?"
    rows = cursor.execute(statement, (table_name,)).fetchall()
    return bool(rows)


def dump_list(L):
    """sqlite3 adapter: serialize a list (or None) to a JSON string."""
    import json
    if L is None:
        return '[]'
    return json.dumps(L)


def load_json_bytes(json_bytes):
    """sqlite3 converter: deserialize JSON bytes into a Python list.

    None maps to [] (mirrors dump_list).
    """
    # The original debug print here fired once per converted row; removed.
    import json
    if json_bytes is None:
        return []
    return json.loads(json_bytes)


def db_init_old(db_path):
    """Open db_path and return (cursor, connection).

    Registers the list<->JSON adapter/converter and enables declared-type
    detection so LIST columns round-trip. Creates parent directories when
    they do not exist. db_path is resolved relative to the *caller's* file
    (via sys._getframe(1)), so this must be called directly, not wrapped.
    """
    sqlite3 = get_sqlite3()
    sqlite3.register_adapter(list, dump_list)
    sqlite3.register_converter("LIST", load_json_bytes)
    import os
    db_path = pathjoin(sys._getframe(1), db_path)
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    connection = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
    connection.row_factory = sqlite3.Row
    cursor = connection.cursor()
    return cursor, connection


def db_init(db_path, check_same_thread=False):
    """
    db_path: str
    Returns: connection

    Creates directories if they do not exist

    By default check_same_thread is False which means that the connection
    can be shared across threads

    db_path is resolved relative to the *caller's* file (sys._getframe(1)),
    so this must be called directly, not through a wrapper.
    """
    import os
    sqlite3 = get_sqlite3()
    db_path = pathjoin(sys._getframe(1), db_path)
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    connection = sqlite3.connect(db_path, check_same_thread=check_same_thread)
    connection.row_factory = sqlite3.Row
    # connection can be configured after creation, e.g.:
    #
    # [1] connection.row_factory = sqlite3.Row
    # [2] connection.check_same_thread = False
    return connection


def jsonify_lists(D):
    """Replace every list value in D with its JSON string, in place.

    Returns D for convenience.
    """
    import json
    list_keys = [ key for key, value in D.items() if type(value) == list ]
    for key in list_keys:
        D[key] = json.dumps(D[key])
    return D
# Example:
#   flatten_dict({ "vim": "god", "css": "okay" }, ["skills"])
#   -> [ ("skills.vim", "god"), ("skills.css", "okay") ]
def flatten_dict(D, parent_keys=None):
    """Flatten a nested dict into a list of (dotted_key, value) pairs.

    parent_keys: key components prefixed (dot-joined) onto every key.
    Default changed from a mutable [] to None to avoid the shared
    mutable-default pitfall; behavior is identical.
    """
    if parent_keys is None:
        parent_keys = []
    pairs = []
    for k, v in D.items():
        if type(v) == dict:
            pairs = pairs + flatten_dict(v, parent_keys + [k])
        else:
            pairs.append((".".join(parent_keys + [k]), v))
    return pairs


def prepare_dict(D):
    """Return a new flat dict with nested keys collapsed to dotted keys."""
    pairs = flatten_dict(D)            # 1.10, 60%
    D = { k: v for k, v in pairs }     # 0.26, 15%
    # D = jsonify_lists(D)             # 0.45, 25%
    return D


# 147 Words "ABORT" ... "WITHOUT" — hoisted to a module-level frozenset so
# is_reserved() no longer rebuilds the list on every call.
_SQLITE_RESERVED_WORDS = frozenset([
    "ABORT", "ACTION", "ADD", "AFTER", "ALL", "ALTER", "ALWAYS", "ANALYZE",
    "AND", "AS", "ASC", "ATTACH", "AUTOINCREMENT", "BEFORE", "BEGIN",
    "BETWEEN", "BY", "CASCADE", "CASE", "CAST", "CHECK", "COLLATE", "COLUMN",
    "COMMIT", "CONFLICT", "CONSTRAINT", "CREATE", "CROSS", "CURRENT",
    "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", "DATABASE",
    "DEFAULT", "DEFERRABLE", "DEFERRED", "DELETE", "DESC", "DETACH",
    "DISTINCT", "DO", "DROP", "EACH", "ELSE", "END", "ESCAPE", "EXCEPT",
    "EXCLUDE", "EXCLUSIVE", "EXISTS", "EXPLAIN", "FAIL", "FILTER", "FIRST",
    "FOLLOWING", "FOR", "FOREIGN", "FROM", "FULL", "GENERATED", "GLOB",
    "GROUP", "GROUPS", "HAVING", "IF", "IGNORE", "IMMEDIATE", "IN", "INDEX",
    "INDEXED", "INITIALLY", "INNER", "INSERT", "INSTEAD", "INTERSECT",
    "INTO", "IS", "ISNULL", "JOIN", "KEY", "LAST", "LEFT", "LIKE", "LIMIT",
    "MATCH", "MATERIALIZED", "NATURAL", "NO", "NOT", "NOTHING", "NOTNULL",
    "NULL", "NULLS", "OF", "OFFSET", "ON", "OR", "ORDER", "OTHERS", "OUTER",
    "OVER", "PARTITION", "PLAN", "PRAGMA", "PRECEDING", "PRIMARY", "QUERY",
    "RAISE", "RANGE", "RECURSIVE", "REFERENCES", "REGEXP", "REINDEX",
    "RELEASE", "RENAME", "REPLACE", "RESTRICT", "RETURNING", "RIGHT",
    "ROLLBACK", "ROW", "ROWS", "SAVEPOINT", "SELECT", "SET", "TABLE", "TEMP",
    "TEMPORARY", "THEN", "TIES", "TO", "TRANSACTION", "TRIGGER", "UNBOUNDED",
    "UNION", "UNIQUE", "UPDATE", "USING", "VACUUM", "VALUES", "VIEW",
    "VIRTUAL", "WHEN", "WHERE", "WINDOW", "WITH", "WITHOUT"
])


def is_reserved(key):
    """True if key (case-insensitive) is an SQLite keyword."""
    return key.upper() in _SQLITE_RESERVED_WORDS


def escape_key_if_required(key):
    """Double-quote key when it contains a dot or is an SQLite keyword."""
    if "." in key or is_reserved(key):
        return f'"{key}"'
    return key


def escaped_keys(input_keys):
    """Apply escape_key_if_required to every key; returns a list."""
    return [ escape_key_if_required(key) for key in input_keys ]


class SqliteType(IntEnum):
    """Column affinities, ordered so max() picks the 'wider' inferred type."""
    TEXT = 1
    REAL = 2
    INTEGER = 3
    BLOB = 4  # ideally 2 but confuses from_text
    LIST = 5  # ideally 2 but confuses from_text

    @staticmethod
    def from_text(arg):
        """Map a declared SQL type name ("TEXT", "INT", ...) to a member."""
        D = {
            "TEXT":    SqliteType.TEXT,
            "REAL":    SqliteType.REAL,
            "INTEGER": SqliteType.INTEGER,
            "INT":     SqliteType.INTEGER,
            "BLOB":    SqliteType.BLOB,
            "LIST":    SqliteType.LIST,
        }
        return D[arg]


def as_column_str(x):
    """Column type string for x, which may be a Python type, a SqliteType
    member, or a plain value (then its type() is used).

    NOTE(review): because SqliteType is an IntEnum, a raw int argument
    equal to a member value (e.g. 5) hits that member's entry instead of
    falling through to type(x); callers only pass types/members in practice.
    """
    D = {
        str: "TEXT", float: "REAL", int: "INTEGER", bytes: "BLOB",
        list: "LIST",
        SqliteType.TEXT: "TEXT", SqliteType.REAL: "REAL",
        SqliteType.INTEGER: "INTEGER", SqliteType.BLOB: "BLOB",
        SqliteType.LIST: "LIST",
        type(None): "TEXT", bool: "INTEGER",
    }
    if x not in D.keys():
        x = type(x)
    return D[x]


class Schema:
    """Mapping of column name -> SqliteType.

    Built either from sqlite3.Row objects produced by PRAGMA table_info
    (keys "name"/"type") or from plain row dictionaries whose value types
    determine the column type: TEXT (null), TEXT (genuine), INTEGER, REAL,
    BLOB, LIST.
    """

    def __init__(self, arg=None):
        sqlite3 = get_sqlite3()
        if arg == [] or arg is None:
            self.D = {}
        else:
            items = arg if type(arg) == list else [ arg ]
            is_pragma = type(items[0]) == sqlite3.Row
            if is_pragma:
                self.D = {
                    row_D["name"]: SqliteType.from_text(row_D["type"])
                    for row_D in items
                }
            else:
                self.D = self.dictionaries_to_schema(items)

    def dictionaries_to_schema(self, dictionaries):
        """Infer column types from row dictionaries.

        A column is "finalized" by the first value that is neither None nor
        an int; until then each new value overwrites the inferred type (so
        e.g. an int column can later widen to REAL).
        """
        items = []
        for D in dictionaries:
            items.extend(D.items())
        type2sqlite = {
            type(None): SqliteType.TEXT,
            str:        SqliteType.TEXT,
            float:      SqliteType.REAL,
            int:        SqliteType.INTEGER,
            bool:       SqliteType.INTEGER,
            bytes:      SqliteType.BLOB,
            list:       SqliteType.LIST,
        }
        finalized_column_names = set()  # set: O(1) membership vs. list scan
        schema_D = {}
        for column_name, value in items:
            if column_name in finalized_column_names:
                continue
            value_type = type(value)
            schema_D[column_name] = type2sqlite[value_type]
            if value is not None and value_type != int:
                finalized_column_names.add(column_name)
        return schema_D

    def join(self, with_types=False):
        """Comma-join escaped column names, optionally with their types."""
        if not with_types:
            return ", ".join(escaped_keys(self.keys()))
        return ", ".join(
            escape_key_if_required(key) + " " + as_column_str(value)
            for key, value in self.items()
        )

    def column_definitions(self, constraint_D={}):
        """Comma-joined column definitions for CREATE TABLE.

        constraint_D: column name -> constraint clause (e.g. "PRIMARY KEY").
        https://www.sqlite.org/syntax/column-constraint.html
        """
        column_definitions = []
        for key, value in self.items():
            column_L = [ escape_key_if_required(key), as_column_str(value) ]
            constraint = constraint_D.get(key)
            if constraint:
                column_L.append(constraint)
            column_definitions.append(" ".join(column_L))
        return ", ".join(column_definitions)

    def keys(self):
        return self.D.keys()

    def values(self):
        return self.D.values()

    def items(self):
        return self.D.items()

    def get(self, key):
        return self.D.get(key)

    def __getitem__(self, key):
        return self.D[key]

    def __setitem__(self, key, value):
        self.D[key] = value

    def __len__(self):
        return len(self.D)

    def __repr__(self):
        return repr(self.D)

    def __eq__(self, rhs_schema):
        return self.D == rhs_schema.D

    def __str__(self):
        if len(self.D) == 0:
            return "Empty Schema"
        max_length = max(len(key) for key in self.D.keys())
        column_gap = 2
        separator = " " * column_gap
        gutter_length = 2
        gutter_string = " " * gutter_length
        lines = [
            gutter_string + str(k).ljust(max_length) + separator + str(v)
            for k, v in self.D.items()
        ]
        lines = [ "" ] + lines + [ "" ]
        return "\n".join(lines)


def get_schema(cursor, table_name):
    """Read table_name's schema via PRAGMA table_info.

    Returns an empty Schema when the table does not exist.
    """
    cursor.execute(f"PRAGMA table_info('{table_name}');")
    rows = cursor.fetchall()
    return Schema(rows)


def batch_execute(cursor, statements, print_statements=True):
    """Execute each statement in order, optionally echoing it."""
    for statement in statements:
        cursor.execute(statement)
        if print_statements:
            print(statement)
            print()


def add_new_columns(cursor, table_name, dictionary_schema):
    """ALTER TABLE statements adding columns present in dictionary_schema
    but missing from the live table."""
    table_schema = get_schema(cursor, table_name)
    new_columns_schema = Schema()
    new_columns_schema.D = {
        k: v for k, v in dictionary_schema.items()
        if table_schema.get(k) is None
    }
    new_column_statements = [
        f"ALTER TABLE {table_name} ADD COLUMN {escape_key_if_required(k)} {as_column_str(v)};"
        for k, v in new_columns_schema.items()
    ]
    return new_column_statements


def modify_existing_column_types(cursor, table_name, dictionary_schema, constraint_D={}):
    """Statements that widen existing column types to match dictionary_schema.

    SQLite cannot change a column's type in place, so this rebuilds the
    table: create a temp table with the widened schema, copy rows, drop the
    original, rename. Returns [] when nothing changed.
    """
    def get_random_four_digit_hex():
        from random import randint
        min_hex = 4096    # 0x1000
        max_hex = 65535   # 0xFFFF
        return hex(randint(min_hex, max_hex))[2:]

    # We don't check if table_schema has a key in dictionary_schema as this
    # is meant to be run AFTER add_new_columns. There should be no key in
    # the dictionary that is not there in the table schema.
    table_schema = get_schema(cursor, table_name)
    changed_columns = [
        key for key in dictionary_schema.keys()
        if table_schema[key] != dictionary_schema[key]
    ]
    if len(changed_columns) == 0:
        return []
    # Widen: SqliteType ordering makes max() pick the wider type.
    for key in dictionary_schema.keys():
        table_schema[key] = max(dictionary_schema[key], table_schema[key])
    temp_table_name = f"tmp_{table_name}_{get_random_four_digit_hex()}"
    create_table_statement = f"CREATE TABLE {temp_table_name} ({table_schema.column_definitions(constraint_D=constraint_D)});"
    key_string = table_schema.join()
    insert_statement = f"INSERT INTO {temp_table_name}({key_string}) SELECT {key_string} FROM {table_name};"
    drop_statement = f"DROP TABLE {table_name};"
    rename_statement = f"ALTER TABLE {temp_table_name} RENAME TO {table_name};"
    return [ create_table_statement, insert_statement, drop_statement, rename_statement ]


def get_insert_pair_with_on_conflict_clause(D, table_name):
    """Build an upsert (statement, values) pair for row dict D.

    The statement uses ON CONFLICT DO UPDATE with excluded.* references so
    an existing row is overwritten by the incoming values.
    """
    keys = D.keys()
    values = tuple(D[key] for key in keys)
    keys = escaped_keys(keys)
    column_name_string = ", ".join(keys)
    question_mark_string = ", ".join(["?"] * len(keys))
    prefixed_keys = [ f"excluded.{key}" for key in keys ]
    prefixed_column_name_string = ", ".join(prefixed_keys)
    statement = f"""
INSERT INTO {table_name}
({column_name_string})
VALUES
({question_mark_string})
ON CONFLICT DO UPDATE SET
({column_name_string})=({prefixed_column_name_string});
""".strip()
    return (statement, values)


# TRIGGER
def get_create_trigger_statement(cursor, table_name, trigger_name, vcs_table_name, date_column_name="vcs_date_epoch_utc_seconds"):
    """CREATE TRIGGER statement that archives old values into vcs_table_name.

    The trigger fires BEFORE UPDATE when any column other than the primary
    key or the date column actually changes, and stores NULLIF(old, new)
    per column so unchanged columns are recorded as NULL. Returns "" when
    there is no non-key, non-date column to track.
    """
    def get_primary_key(cursor, table_name):
        # pragma_table_info.pk == 1 marks the primary-key column
        cursor.execute(f"SELECT name FROM pragma_table_info('{table_name}') WHERE pk=1;")
        row = cursor.fetchone()
        return row["name"]

    primary_key = get_primary_key(cursor, table_name)
    table_schema = get_schema(cursor, table_name)
    unescaped_column_names = table_schema.keys()
    clauses = []
    column_names = []
    column_operations = []
    for unescaped_column_name in unescaped_column_names:
        escaped_column_name = escape_key_if_required(unescaped_column_name)
        old_prefix = "old." + escaped_column_name
        new_prefix = "new." + escaped_column_name
        condition = f"{old_prefix} IS NOT {new_prefix}"
        column_operation = f"NULLIF({old_prefix}, {new_prefix})"
        if unescaped_column_name != primary_key and unescaped_column_name != date_column_name:
            clauses.append(condition)
            column_names.append(escaped_column_name)
            column_operations.append(column_operation)
        else:
            # key/date columns are copied through unchanged
            column_names.append(escaped_column_name)
            column_operations.append(old_prefix)
    if len(clauses) == 0:
        return ""
    condition_string = " OR ".join(clauses)
    column_name_string = ", ".join(column_names)
    column_operation_string = ", ".join(column_operations)
    create_trigger_statement = f"""
CREATE TRIGGER {trigger_name} BEFORE UPDATE ON {table_name}
WHEN {condition_string}
BEGIN
INSERT INTO {vcs_table_name}
({column_name_string})
VALUES
(
{column_operation_string}
);
END;
""".strip()
    if column_name_string == "":
        print_red_line("column_name_string is EMPTY")
        breakpoint()  # NOTE(review): debug leftover; kept deliberately — confirm before removing
    return create_trigger_statement


# TRIGGER
def check_if_trigger_exists(cursor, table_name, trigger_name):
    """True if a trigger named trigger_name exists on table_name."""
    # Parameterized to avoid SQL injection through the names.
    query = (
        "SELECT COUNT(*) trigger_count FROM sqlite_master "
        "WHERE type='trigger' AND tbl_name=? AND name=?;"
    )
    row = cursor.execute(query, (table_name, trigger_name)).fetchone()
    trigger_count = row["trigger_count"]
    return trigger_count > 0


# TRIGGER
def add_date_column(dictionaries):
    """Stamp every dict with the current UTC epoch (in place); returns them."""
    from time import time
    epoch_utc = int(time())
    date_column_name = "vcs_date_epoch_utc_seconds"
    for D in dictionaries:
        D[date_column_name] = epoch_utc
    return dictionaries


import threading

# One lock shared by every insert_dictionaries() call. The original code
# created a fresh Lock (and rebound a global) on every call, which
# serialized nothing; module-level name kept for backward compatibility.
lock = threading.Lock()


def insert_dictionaries(cursor, table_name, dictionaries, constraint_D={}, vcs=False):
    """Upsert row dictionaries into table_name, evolving the schema as needed.

    Steps: flatten dicts -> create table if missing -> add new columns ->
    widen changed column types -> (optionally) maintain a <table>_vcs history
    table + update trigger -> upsert each row.

    constraint_D: column name -> constraint clause (e.g. "PRIMARY KEY")
    vcs: when True, archive old values of updated rows into <table>_vcs
    """
    if dictionaries == []:
        return
    with lock:  # released even if an insert raises (was a bare acquire/release)
        print_statements = False
        dictionaries = [ prepare_dict(D) for D in dictionaries ]
        if vcs:
            dictionaries = add_date_column(dictionaries)
        dictionary_schema = Schema(dictionaries)
        table_schema = get_schema(cursor, table_name)
        table_does_not_exist = len(table_schema) == 0
        # 1. CREATE TABLE IF IT DOES NOT EXIST
        if table_does_not_exist:
            create_table_statement = f"CREATE TABLE {table_name} ({dictionary_schema.column_definitions(constraint_D=constraint_D)});"
            cursor.execute(create_table_statement)
            if print_statements:
                print(create_table_statement)
        # 2. ADD NEW COLUMNS
        add_column_statements = add_new_columns(cursor, table_name, dictionary_schema)
        batch_execute(cursor, add_column_statements, print_statements)
        # 3. MODIFY UPDATED COLUMN TYPES
        modify_column_statements = modify_existing_column_types(cursor, table_name, dictionary_schema, constraint_D=constraint_D)
        batch_execute(cursor, modify_column_statements, print_statements)
        if vcs:
            # 4.1. SETUP VCS TABLE {{{
            vcs_print_statements = False
            vcs_table_name = f"{table_name}_vcs"
            vcs_table_schema = get_schema(cursor, vcs_table_name)
            vcs_table_does_not_exist = len(vcs_table_schema) == 0
            if vcs_table_does_not_exist:
                create_vcs_table_statement = f"CREATE TABLE {vcs_table_name} ({dictionary_schema.column_definitions()});"
                cursor.execute(create_vcs_table_statement)
                if vcs_print_statements:
                    print(create_vcs_table_statement)
            vcs_add_column_statements = add_new_columns(cursor, vcs_table_name, dictionary_schema)
            batch_execute(cursor, vcs_add_column_statements, vcs_print_statements)
            vcs_modify_column_statements = modify_existing_column_types(cursor, vcs_table_name, dictionary_schema)
            batch_execute(cursor, vcs_modify_column_statements, vcs_print_statements)
            # }}}
            # 4.2. ADD VCS TRIGGER
            trigger_name = f"update_trigger_{table_name}_vcs"
            if len(vcs_add_column_statements) > 0 or len(vcs_modify_column_statements) > 0:
                # Schema changed: drop so the trigger is rebuilt below.
                cursor.execute(f"DROP TRIGGER IF EXISTS {trigger_name}")
                if vcs_print_statements:
                    print(f"DROP TRIGGER IF EXISTS {trigger_name}")
            trigger_exists = check_if_trigger_exists(cursor, table_name, trigger_name)
            if not trigger_exists:
                # BUGFIX: str.index() raises ValueError when the substring is
                # absent; use `in` so non-PK constraint values don't crash.
                primary_key_supplied = len([
                    value for value in constraint_D.values()
                    if "PRIMARY KEY" in value
                ]) == 1
                if primary_key_supplied:
                    create_trigger_statement = get_create_trigger_statement(cursor, table_name, trigger_name, vcs_table_name)
                    cursor.execute(create_trigger_statement)
                    if vcs_print_statements:
                        print(create_trigger_statement)
        # 5. INSERT
        for D in dictionaries:
            insert_statement, values = get_insert_pair_with_on_conflict_clause(D, table_name)
            try:
                cursor.execute(insert_statement, values)
            except Exception:
                print()
                print("[ERROR]", insert_statement)
                raise
            if print_statements:
                print(insert_statement, make_string_green(truncate(str(values), 200)))
                print()


def version():
    """Print the sqlite3 binding version and the SQLite library version."""
    sqlite3 = get_sqlite3()
    # sqlite3.version was deprecated/removed in recent Pythons; degrade gracefully.
    pysqlite_version = getattr(sqlite3, "version", "unknown")
    sqlite_version = sqlite3.sqlite_version
    print(f"PYSQLITE: {pysqlite_version}")
    print(f"SQLITE : {sqlite_version}")


def main():
    from tdd import test_ic_custom_types
    test_ic_custom_types()
    pass
    # list_type()


if __name__ == "__main__":
    # import sys; main(); sys.exit(1)
    from box.error import handler
    with handler():
        main()