|
|
from dotenv import load_dotenv |
|
|
load_dotenv() |
|
|
import time |
|
|
import json |
|
|
from typing import Any, Callable, Union |
|
|
|
|
|
import subprocess |
|
|
import os |
|
|
import MySQLdb |
|
|
import argparse |
|
|
import base64 |
|
|
class CaesarSQL: |
|
|
def __init__(self,host:str='localhost',user:str='root',password:str="temp123") -> None: |
|
|
|
|
|
self.dbdata = {"host": base64.b64decode(os.getenv("HOST")).decode(), |
|
|
"user":base64.b64decode(os.getenv("USERNAMESQL")).decode(), |
|
|
"passwd": base64.b64decode(os.getenv("PASSWORD")).decode(), |
|
|
"db": base64.b64decode(os.getenv("DATABASE")).decode(), |
|
|
"autocommit" : True, |
|
|
"ssl_mode" : "VERIFY_IDENTITY", |
|
|
"ssl" : { |
|
|
"ca": "/etc/ssl/certs/ca-certificates.crt" |
|
|
},} |
|
|
self.connection = MySQLdb.connect( |
|
|
host= self.dbdata["host"], |
|
|
user=self.dbdata["user"], |
|
|
passwd= self.dbdata["passwd"], |
|
|
db= self.dbdata["db"], |
|
|
autocommit = self.dbdata["autocommit"], |
|
|
ssl_mode = self.dbdata["ssl_mode"], |
|
|
ssl = self.dbdata["ssl"] |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def reset_connection(self): |
|
|
self.connection = MySQLdb.connect( |
|
|
host= self.dbdata["host"], |
|
|
user=self.dbdata["user"], |
|
|
passwd= self.dbdata["passwd"], |
|
|
db= self.dbdata["db"], |
|
|
autocommit = self.dbdata["autocommit"], |
|
|
ssl_mode = self.dbdata["ssl_mode"], |
|
|
ssl = self.dbdata["ssl"] |
|
|
) |
|
|
|
|
|
def check_exists(self,result :Any): |
|
|
|
|
|
try: |
|
|
if len(result) == 0: |
|
|
return False |
|
|
else: |
|
|
return True |
|
|
except Exception as poe: |
|
|
return False |
|
|
|
|
|
def load_json_file(self,filename:str): |
|
|
|
|
|
with open(filename) as f: |
|
|
datajson = json.load(f) |
|
|
return datajson |
|
|
|
|
|
def fetch(self,result:Any): |
|
|
|
|
|
return result |
|
|
|
|
|
def json_to_sql(self,datajson :Union[dict,list]): |
|
|
|
|
|
if type(datajson) == list: |
|
|
columns = str(tuple(datajson[0].keys())).replace("'","") |
|
|
values = str(tuple(tuple(data.values()) for data in datajson))[1:-1] |
|
|
return columns,values |
|
|
elif type(datajson) == dict: |
|
|
columns = str(tuple(datajson.keys())).replace("'","") |
|
|
values = str(tuple(datajson.values())).replace("'","") |
|
|
return columns,values |
|
|
else: |
|
|
print("JSON is invalid data shape.") |
|
|
return None,None |
|
|
def executeScriptsFromFile(self,filename): |
|
|
fd = open(filename, 'r') |
|
|
sqlFile = fd.read() |
|
|
fd.close() |
|
|
sqlCommands = sqlFile.split(';') |
|
|
print(sqlCommands) |
|
|
with self.connection.cursor() as cursor: |
|
|
for command in sqlCommands: |
|
|
try: |
|
|
if command.strip() != '': |
|
|
print(command) |
|
|
cursor.execute(command.replace("\n","").replace("\n","")) |
|
|
except IOError as ex: |
|
|
print ("Command skipped: ", type(ex),ex ) |
|
|
|
|
|
def run_command(self,sqlcommand : str = None,result_function : Callable =None,datatuple : tuple =None,filename :str = None,verbose:int=1): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if sqlcommand == None and filename == None: |
|
|
print("Please input an SQL command or SQL filename.") |
|
|
else: |
|
|
if filename != None: |
|
|
with open(filename) as f: |
|
|
sqlcommand = f.read() |
|
|
|
|
|
with self.connection.cursor() as cursor: |
|
|
|
|
|
cursor.execute(sqlcommand,datatuple) |
|
|
|
|
|
result = cursor.fetchall() |
|
|
|
|
|
|
|
|
if result_function != None: |
|
|
new_result = result_function(result) |
|
|
elif result_function == None: |
|
|
new_result = None |
|
|
|
|
|
|
|
|
if verbose == 1: |
|
|
print("SQL command executed.") |
|
|
return new_result |
|
|
|
|
|
|
|
|
def run_command_generator(self,sqlcommand : str = None,arraysize:int =1000, datatuple : tuple =None,filename :str = None,verbose:int=1): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if sqlcommand == None and filename == None: |
|
|
print("Please input an SQL command or SQL filename.") |
|
|
else: |
|
|
if filename != None: |
|
|
with open(filename) as f: |
|
|
sqlcommand = f.read() |
|
|
try: |
|
|
with self.connection.cursor() as cursor: |
|
|
|
|
|
cursor.execute('set max_allowed_packet=67108864') |
|
|
cursor.execute(sqlcommand,datatuple) |
|
|
if verbose == 1: |
|
|
print("SQL command executed.") |
|
|
while True: |
|
|
results = cursor.fetchmany(arraysize) |
|
|
if not results: |
|
|
break |
|
|
for result in results: |
|
|
yield result |
|
|
except Exception as poe: |
|
|
print(f"{type(poe)} - {poe}") |
|
|
|
|
|
def sql_to_json(self,table,sqldata :tuple): |
|
|
|
|
|
columnsinfo = self.run_command(f"DESCRIBE {table}",self.fetch) |
|
|
columns = [col[0] for col in columnsinfo] |
|
|
|
|
|
final_json = [] |
|
|
for data in sqldata: |
|
|
record = {} |
|
|
for ind in range(len(data)): |
|
|
record.update({data[ind]: columns[ind]} ) |
|
|
final_json.append(record) |
|
|
|
|
|
return {table:final_json} |
|
|
@staticmethod |
|
|
def convert_to_blob(filename :str): |
|
|
|
|
|
with open(filename, 'rb') as file: |
|
|
blobData = file.read() |
|
|
return blobData |
|
|
@staticmethod |
|
|
def start_docker_db(verbose=1): |
|
|
|
|
|
|
|
|
|
|
|
dockercommand = 'docker run --name mysql -p 3306:3306 -v mysql_volume:/var/lib/mysql/ -d -e "MYSQL_ROOT_PASSWORD=temp123" mysql' |
|
|
process = subprocess.Popen(dockercommand.split(), |
|
|
stdout=subprocess.PIPE, |
|
|
stderr=subprocess.PIPE) |
|
|
stdout, stderr = process.communicate() |
|
|
if stderr != b"" and verbose == 1: |
|
|
print(stderr) |
|
|
elif stderr == b"" and verbose == 1: |
|
|
print(stdout) |
|
|
time.sleep(2) |
|
|
return stdout,stderr |
|
|
|
|
|
def test(): |
|
|
if __name__ == "__main__": |
|
|
|
|
|
caesarsql = CaesarSQL() |
|
|
|
|
|
parser = argparse.ArgumentParser( |
|
|
prog='ProgramName', |
|
|
description='What the program does', |
|
|
epilog='Text at the bottom of help') |
|
|
parser.add_argument('sqlcommand', |
|
|
help='SQL Command.') |
|
|
|
|
|
args = parser.parse_args() |
|
|
result = caesarsql.run_command(args.sqlcommand,caesarsql.fetch) |
|
|
print(result) |
|
|
if __name__ == "__main__": |
|
|
caesarsql = CaesarSQL() |
|
|
resultgen = caesarsql.run_command_generator("SELECT * FROM test WHERE firstname = 'Amhari';",arraysize=1000) |
|
|
print(resultgen) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|