palondomus's picture
testing huggingface setup because too big for google cloud at 9GB
0b4e916
from dotenv import load_dotenv
load_dotenv()
import time
import json
from typing import Any, Callable, Union
#import mysql.connector
import subprocess
import os
import MySQLdb
import argparse
import base64
class CaesarSQL:
def __init__(self,host:str='localhost',user:str='root',password:str="temp123") -> None:
# Makes SQL connection to remote server.
self.dbdata = {"host": base64.b64decode(os.getenv("HOST")).decode(),
"user":base64.b64decode(os.getenv("USERNAMESQL")).decode(),
"passwd": base64.b64decode(os.getenv("PASSWORD")).decode(),
"db": base64.b64decode(os.getenv("DATABASE")).decode(),
"autocommit" : True,
"ssl_mode" : "VERIFY_IDENTITY",
"ssl" : {
"ca": "/etc/ssl/certs/ca-certificates.crt"
},}
self.connection = MySQLdb.connect(
host= self.dbdata["host"],
user=self.dbdata["user"],
passwd= self.dbdata["passwd"],
db= self.dbdata["db"],
autocommit = self.dbdata["autocommit"],
ssl_mode = self.dbdata["ssl_mode"],
ssl = self.dbdata["ssl"]
)# "/etc/ssl/cert.pem"
#self.connection = mysql.connector.connect(
#host=host,
#user=user,
#password = password,
#)
def reset_connection(self):
self.connection = MySQLdb.connect(
host= self.dbdata["host"],
user=self.dbdata["user"],
passwd= self.dbdata["passwd"],
db= self.dbdata["db"],
autocommit = self.dbdata["autocommit"],
ssl_mode = self.dbdata["ssl_mode"],
ssl = self.dbdata["ssl"]
)
def check_exists(self,result :Any):
# Checks if an entity exists from an SQL Command.
try:
if len(result) == 0:
return False
else:
return True
except Exception as poe:
return False
def load_json_file(self,filename:str):
# Loads json file
with open(filename) as f:
datajson = json.load(f)
return datajson
def fetch(self,result:Any):
# Callback function that fetches data after an SQL command is run self.runcommand("SELECT * FROM names WHERE name LIKE 'bill'",self.fetch)
return result
def json_to_sql(self,datajson :Union[dict,list]):
# Converts JSON to SQL.
if type(datajson) == list:
columns = str(tuple(datajson[0].keys())).replace("'","")
values = str(tuple(tuple(data.values()) for data in datajson))[1:-1]
return columns,values
elif type(datajson) == dict:
columns = str(tuple(datajson.keys())).replace("'","")
values = str(tuple(datajson.values())).replace("'","")
return columns,values
else:
print("JSON is invalid data shape.")
return None,None
def executeScriptsFromFile(self,filename):
fd = open(filename, 'r')
sqlFile = fd.read()
fd.close()
sqlCommands = sqlFile.split(';')
print(sqlCommands)
with self.connection.cursor() as cursor:
for command in sqlCommands:
try:
if command.strip() != '':
print(command)
cursor.execute(command.replace("\n","").replace("\n",""))
except IOError as ex:
print ("Command skipped: ", type(ex),ex )
def run_command(self,sqlcommand : str = None,result_function : Callable =None,datatuple : tuple =None,filename :str = None,verbose:int=1):
# Executes SQL Command or takes SQL file as input.
#if verbose == 1:
#if self.connection.is_connected():
# db_Info = self.connection.get_server_info()
# print("Connected to MySQL Server version ", db_Info)
if sqlcommand == None and filename == None:
print("Please input an SQL command or SQL filename.")
else:
if filename != None:
with open(filename) as f:
sqlcommand = f.read()
with self.connection.cursor() as cursor:
#print(datatuple)
cursor.execute(sqlcommand,datatuple)
result = cursor.fetchall()
if result_function != None:
new_result = result_function(result)
elif result_function == None:
new_result = None
#self.connection.commit()
if verbose == 1:
print("SQL command executed.")
return new_result
def run_command_generator(self,sqlcommand : str = None,arraysize:int =1000, datatuple : tuple =None,filename :str = None,verbose:int=1):
# Executes SQL Command or takes SQL file as input.
#if verbose == 1:
#if self.connection.is_connected():
# db_Info = self.connection.get_server_info()
# print("Connected to MySQL Server version ", db_Info)
if sqlcommand == None and filename == None:
print("Please input an SQL command or SQL filename.")
else:
if filename != None:
with open(filename) as f:
sqlcommand = f.read()
try:
with self.connection.cursor() as cursor:
#print(datatuple)
cursor.execute('set max_allowed_packet=67108864')
cursor.execute(sqlcommand,datatuple)
if verbose == 1:
print("SQL command executed.")
while True:
results = cursor.fetchmany(arraysize)
if not results:
break
for result in results:
yield result
except Exception as poe:
print(f"{type(poe)} - {poe}")
def sql_to_json(self,table,sqldata :tuple):
# Convert SQL tuple to json
columnsinfo = self.run_command(f"DESCRIBE {table}",self.fetch)
columns = [col[0] for col in columnsinfo]
#print(sqldata)
final_json = []
for data in sqldata:
record = {}
for ind in range(len(data)):
record.update({data[ind]: columns[ind]} )
final_json.append(record)
return {table:final_json}
@staticmethod
def convert_to_blob(filename :str):
# Convert digital data to binary format
with open(filename, 'rb') as file:
blobData = file.read()
return blobData
@staticmethod
def start_docker_db(verbose=1):
# Start the docker database
# Run the database commands - "docker exec -it mysql mysql -p"
# https://mothishdeenadayalan.medium.com/containerizing-a-python-app-mysql-python-docker-1ce64e444ed9
dockercommand = 'docker run --name mysql -p 3306:3306 -v mysql_volume:/var/lib/mysql/ -d -e "MYSQL_ROOT_PASSWORD=temp123" mysql'
process = subprocess.Popen(dockercommand.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if stderr != b"" and verbose == 1:
print(stderr)
elif stderr == b"" and verbose == 1:
print(stdout)
time.sleep(2)
return stdout,stderr
def test():
if __name__ == "__main__":
#CaesarSQL.start_docker_db()
caesarsql = CaesarSQL()
#val = caesarsql.run_command("DROP TABLE customers",result_function=caesarsql.fetch)
parser = argparse.ArgumentParser(
prog='ProgramName',
description='What the program does',
epilog='Text at the bottom of help')
parser.add_argument('sqlcommand',
help='SQL Command.')
args = parser.parse_args()
result = caesarsql.run_command(args.sqlcommand,caesarsql.fetch)
print(result)
if __name__ == "__main__":
caesarsql = CaesarSQL()
resultgen = caesarsql.run_command_generator("SELECT * FROM test WHERE firstname = 'Amhari';",arraysize=1000)
print(resultgen)
#for result in resultgen:
# print(result)