File size: 7,830 Bytes
0b4e916 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
from CaesarSQLDB.caesarsql import CaesarSQL
class CaesarCRUD:
def __init__(self) -> None:
self.caesarsql = CaesarSQL()
def create_table(self,primary_key:str,fields:tuple,types :tuple,table: str):
if type(fields) == tuple:
fieldlist = [f"{field} {typestr}"for field,typestr in zip(fields,types)]
fieldstr = ', '.join(fieldlist)
result = self.caesarsql.run_command(f"CREATE TABLE IF NOT EXISTS {table} ({primary_key} int NOT NULL AUTO_INCREMENT,{fieldstr}, PRIMARY KEY ({primary_key}) );",self.caesarsql.fetch)
if result == ():
return {"message":f"{table} table was created."}
else:
return {"error":f"error table was not created.","error":result}
else:
fieldstr = f"{fields} {types}"
result = self.caesarsql.run_command(f"CREATE TABLE IF NOT EXISTS {table} ({primary_key} int NOT NULL AUTO_INCREMENT,{fieldstr}, PRIMARY KEY ({primary_key}) );",self.caesarsql.fetch)
if result == ():
return {"message":f"{table} table was created."}
def post_data(self,fields:tuple,values:tuple,table:str):
valuestr= str(tuple("%s" for i in values)).replace("'","",100)
fieldstr = str(tuple(i for i in fields)).replace("'","",100)
if len(fields) == 1:
fieldstr = fieldstr.replace(",","",100)
valuestr = valuestr.replace(",","",100)
#print(f"INSERT INTO {table} {fieldstr} VALUES {valuestr};")
result = self.caesarsql.run_command(f"INSERT INTO {table} {fieldstr} VALUES {valuestr};",self.caesarsql.fetch,datatuple=values)
if result == ():
return True
else:
return False
def tuple_to_json(self,fields:tuple,result:tuple):
if type(result[0]) == tuple:
final_result = []
for entry in result:
entrydict = dict(zip(fields,entry))
final_result.append(entrydict)
return final_result
elif type(result[0]) == str:
final_result = dict(zip(fields,result))
return final_result
def json_to_tuple(self,json:dict):
keys = tuple(json.keys())
values = tuple(json.values())
return keys,values
def get_data(self,fields:tuple,table:str,condition=None,getamount:int=1000):
if len(fields) != 1:
fieldlist = [f"{field}" for field in fields]
fieldstr = ', '.join(fieldlist)
else:
fieldstr = fields[0]
#fieldstr = fieldstr.replace(", ","",100)
if condition:
#print(f"""SELECT {fieldstr} FROM {table} WHERE {condition};""")
result = self.caesarsql.run_command(f"""SELECT {fieldstr} FROM {table} WHERE {condition} LIMIT {str(getamount)};""",self.caesarsql.fetch)
if result == ():
return False
elif result != () and type(result) == tuple:
result = self.tuple_to_json(fields,result)
return result
else:
return {"error":"error syntax error.","error":result}
else:
result = self.caesarsql.run_command(f"""SELECT {fieldstr} FROM {table} LIMIT {str(getamount)};""",self.caesarsql.fetch)
if result == ():
return False
elif result != () and type(result) == tuple:
result = self.tuple_to_json(fields,result)
return result
else:
return {"error":"error syntax error.","error":result}
def get_large_data(self,fields:tuple,table:str,condition=None):
if len(fields) != 1:
fieldlist = [f"{field}" for field in fields]
fieldstr = ', '.join(fieldlist)
else:
fieldstr = fields[0]
#fieldstr = fieldstr.replace(", ","",100)
if condition:
#print(f"""SELECT {fieldstr} FROM {table} WHERE {condition};""")
result = self.caesarsql.run_command_generator(f"""SELECT {fieldstr} FROM {table} WHERE {condition};""")
return result
else:
result = self.caesarsql.run_command_generator(f"""SELECT {fieldstr} FROM {table};""")
return result
def update_data(self,fieldstoupdate:tuple,values:tuple,table=str,condition=str):
if len(fieldstoupdate) > 1:
updatelist = []
for field,value in zip(fieldstoupdate,values):
if type(value) != str:
fieldstr = f"{field} = {value}"
updatelist.append(fieldstr)
else:
fieldstr = f"{field} = '{value}'"
updatelist.append(fieldstr)
updatestr = ', '.join(updatelist)
result = self.caesarsql.run_command(f"UPDATE {table} SET {updatestr} WHERE {condition};",self.caesarsql.fetch)
if result == ():
return True
else:
return False
else:
if type(values[0]) != str:
updatestr = f"{fieldstoupdate[0]} = {values[0]}"
else:
updatestr = f"{fieldstoupdate[0]} = '{values[0]}'"
result = self.caesarsql.run_command(f"UPDATE {table} SET {updatestr} WHERE {condition};",self.caesarsql.fetch)
if result == ():
return True
else:
return False
def update_blob(self,fieldtoupdate:str,value:str,table=str,condition=str):
updatestr = "UPDATE %s SET %s = x'%s' WHERE %s;" % (table,fieldtoupdate,value.hex(),condition)
result = self.caesarsql.run_command(updatestr,self.caesarsql.fetch)
if result == ():
return True
else:
return False
def delete_data(self,table:str,condition:str):
result = self.caesarsql.run_command(f"DELETE FROM {table} WHERE {condition};",self.caesarsql.fetch)
if result == ():
return True
else:
return False
def check_exists(self,fields:tuple,table:str,condition=None):
if len(fields) != 1:
fieldlist = [f"{field}" for field in fields]
fieldstr = ', '.join(fieldlist)
else:
fieldstr = fields[0]
#fieldstr = fieldstr.replace(", ","",100)
if condition:
#print(f"""SELECT {fieldstr} FROM {table} WHERE {condition};""")
result = self.caesarsql.run_command(f"""SELECT {fieldstr} FROM {table} WHERE {condition};""",self.caesarsql.check_exists)
if result == True or result == False:
return result
else:
return {"message":"syntax error or table doesn't exist.","error":result}
else:
result = self.caesarsql.run_command(f"""SELECT {fieldstr} FROM {table};""",self.caesarsql.check_exists)
if result == True or result == False:
return result
else:
return {"message":"syntax error or table doesn't exist.","error":result}
if __name__ == "__main__":
caesarcrud = CaesarCRUD()
#datas = [("Amari","Lawal"),("Billy","Bob"),("Bobby","Jim"),("Loggy","Lag")]
fields = ("Amari","Lawal")
#fieldsupdate = tuple(fields.keys())
#valuesupdate = tuple(fields.values())
fields = ("Bioll")
print(fields)
#caesarcrud.create_table("testid",fields,("varchar(255)","varchar(255)"),"test")
#for data in datas:
# caesarcrud.post_data(fields,data,"test")
#resultgen = caesarcrud.get_large_data(("*"),"test","firstname = 'Billy'")
#for result in resultgen:
# print(result)
#val = tuple(["hi"])
#print(val)
#caesarcrud.post_data()
|