|
|
import json
import subprocess
import time
from typing import Any, Callable, Optional, Union

import mysql.connector
|
|
|
|
|
|
|
|
|
|
|
class CaesarSQL:
    """Small convenience wrapper around a ``mysql.connector`` connection.

    Bundles helpers for running SQL (inline or read from a file), turning
    JSON-like Python data into SQL text fragments and query rows back into
    JSON-like records, reading files as bytes, and starting a local MySQL
    Docker container.
    """

    def __init__(self, host: str = 'localhost', user: str = 'root', password: str = "temp123") -> None:
        """Open a MySQL connection with autocommit enabled.

        Args:
            host: Server host name.
            user: Account used to log in.
            password: Password for ``user``. The default equals the root
                password that ``start_docker_db`` passes to its container.
        """
        self.connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            autocommit=True,
        )

    def check_exists(self, result: Any) -> bool:
        """Return True when ``result`` contains at least one element.

        Values whose length cannot be taken (``None``, numbers, ...) are
        reported as not existing instead of raising.
        """
        try:
            return len(result) > 0
        except Exception:
            # Deliberately best effort: anything without a usable length
            # counts as "nothing found".
            return False

    def load_json_file(self, filename: str) -> Any:
        """Parse the JSON document stored in ``filename`` and return it."""
        # JSON interchange text is UTF-8; do not depend on the platform's
        # default encoding.
        with open(filename, encoding="utf-8") as f:
            return json.load(f)

    def fetch(self, result: Any) -> Any:
        """Return ``result`` unchanged.

        Intended as a ``result_function`` for ``run_command`` when the raw
        fetched rows are wanted.
        """
        return result

    @staticmethod
    def _sql_tuple(items) -> str:
        """Render ``items`` as ``"(a, b, c)"`` using ``repr`` per element.

        Unlike ``str(tuple(items))`` this never emits the trailing comma
        Python adds to one-element tuples, which is not valid in SQL.
        """
        return "(" + ", ".join(repr(item) for item in items) + ")"

    def json_to_sql(self, datajson: Union[dict, list]) -> tuple:
        """Build column and value text fragments from JSON-like data.

        Args:
            datajson: Either one record (``dict``) or a non-empty list of
                records. For a list, the column names are taken from the
                first record only.

        Returns:
            ``(columns, values)`` as strings, e.g. ``("(id, name)",
            "(1, 'amy'), (2, 'bob')")`` for a list of two records, or
            ``(None, None)`` (after printing a message) when the input is
            neither a dict nor a non-empty list.

        NOTE(review): values are rendered with Python ``repr``, not
        SQL-escaped, so the fragments are only safe for trusted data. The
        dict branch additionally strips single quotes from the values,
        which is kept as-is for compatibility -- presumably it is meant for
        placeholder values such as ``%s``; confirm against callers.
        """
        if isinstance(datajson, list) and datajson:
            columns = self._sql_tuple(datajson[0].keys()).replace("'", "")
            values = ", ".join(self._sql_tuple(record.values()) for record in datajson)
            return columns, values

        if isinstance(datajson, dict):
            columns = self._sql_tuple(datajson.keys()).replace("'", "")
            values = self._sql_tuple(datajson.values()).replace("'", "")
            return columns, values

        # Wrong type, or an empty list that has no first record to take the
        # column names from.
        print("JSON is invalid data shape.")
        return None, None

    def run_command(
        self,
        sqlcommand: Optional[str] = None,
        result_function: Optional[Callable] = None,
        datatuple: Optional[tuple] = None,
        filename: Optional[str] = None,
        verbose: int = 1,
    ):
        """Execute SQL given inline or read from a file.

        Args:
            sqlcommand: SQL text to run. Ignored when ``filename`` is given.
            result_function: Optional callable applied to the fetched rows;
                its return value becomes this method's return value.
            datatuple: Parameters passed to ``cursor.execute`` alongside the
                SQL text.
            filename: Path of a file whose content is used as the SQL text.
            verbose: ``1`` prints connection and progress messages.

        Returns:
            ``result_function(rows)`` when ``result_function`` is given,
            where ``rows`` are the rows of the first statement that produced
            a result set (``[]`` if none did). Otherwise ``None``. ``None``
            is also returned when no SQL was supplied or when execution
            failed; failures are printed, not raised.

        Raises:
            OSError: If ``filename`` cannot be opened (this happens before
                the guarded execution section).

        NOTE(review): this relies on the ``multi`` keyword of
        ``cursor.execute``; confirm the installed Connector/Python version
        still supports it.
        """
        if verbose == 1 and self.connection.is_connected():
            db_info = self.connection.get_server_info()
            print("Connected to MySQL Server version ", db_info)

        if sqlcommand is None and filename is None:
            print("Please input an SQL command or SQL filename.")
            return None

        if filename is not None:
            # A file, when given, takes precedence over ``sqlcommand``.
            with open(filename) as f:
                sqlcommand = f.read()

        try:
            with self.connection.cursor() as cursor:
                result = None
                # With multi=True, execute() returns an iterator yielding
                # one result per statement. It has to be consumed so every
                # statement is processed and its errors surface here.
                for statement in cursor.execute(sqlcommand, datatuple, multi=True):
                    if statement.with_rows:
                        # Always drain the rows so no unread result is left
                        # behind; only the first result set is handed on.
                        rows = statement.fetchall()
                        if result is None:
                            result = rows
                if result is None:
                    result = []

                if result_function is not None:
                    new_result = result_function(result)
                else:
                    new_result = None

                if verbose == 1:
                    print("SQL command executed.")
                return new_result
        except Exception as poe:
            # Best effort by design: report the failure and fall through to
            # an implicit ``None`` instead of propagating.
            print(f"{type(poe)} - {poe}")

    def sql_to_json(self, table, sqldata: tuple) -> dict:
        """Convert fetched rows of ``table`` into JSON-like records.

        The column names are read with ``DESCRIBE <table>`` and paired
        positionally with each row's values.

        Args:
            table: Table name. It is interpolated directly into the
                ``DESCRIBE`` statement, so it must come from a trusted
                source.
            sqldata: Iterable of rows, each a sequence of column values.

        Returns:
            ``{table: [{column: value, ...}, ...]}``. Values beyond the
            number of described columns are dropped.

        Raises:
            TypeError: If the ``DESCRIBE`` call failed, because
                ``run_command`` then returns ``None``.
        """
        columnsinfo = self.run_command(f"DESCRIBE {table}", self.fetch)
        # The first field of every DESCRIBE row is used as the column name.
        columns = [col[0] for col in columnsinfo]

        final_json = [dict(zip(columns, row)) for row in sqldata]
        return {table: final_json}

    @staticmethod
    def convert_to_blob(filename: str) -> bytes:
        """Return the raw bytes of ``filename``."""
        with open(filename, 'rb') as file:
            return file.read()

    @staticmethod
    def start_docker_db(verbose=1):
        """Start a detached MySQL container through the ``docker`` CLI.

        Runs ``docker run`` for the ``mysql`` image with container name
        ``mysql``, port mapping ``3306:3306``, volume ``mysql_volume``
        mounted at ``/var/lib/mysql/`` and ``MYSQL_ROOT_PASSWORD=temp123``.

        Args:
            verbose: ``1`` prints docker's stderr if it is non-empty,
                otherwise its stdout.

        Returns:
            ``(stdout, stderr)`` of the docker command as bytes.
        """
        # Explicit argument list: no shell is involved, so quoting the
        # environment assignment would hand the literal quote characters to
        # docker as part of the variable name and value.
        dockercommand = [
            "docker", "run",
            "--name", "mysql",
            "-p", "3306:3306",
            "-v", "mysql_volume:/var/lib/mysql/",
            "-d",
            "-e", "MYSQL_ROOT_PASSWORD=temp123",
            "mysql",
        ]
        completed = subprocess.run(
            dockercommand,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = completed.stdout, completed.stderr

        if verbose == 1:
            print(stderr if stderr != b"" else stdout)

        # Presumably gives the container a moment to come up before the
        # caller tries to connect.
        time.sleep(2)
        return stdout, stderr
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: connect with the default credentials and run the
    # statements stored in customer.sql.
    sql_client = CaesarSQL()
    sql_client.run_command(filename="customer.sql")
    print("SQL file has been executed.")
|
|
|
|
|
|
|
|
|