Spaces:
Build error
Build error
File size: 4,884 Bytes
a87329f 752c636 72de35d 752c636 a87329f 752c636 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | import os
import io
import base64
import uuid
from PIL import Image
import mysql.connector
def generate_random_id():
return str(uuid.uuid1())
def image_to_base64(img):
img=Image.fromarray(img[:,:,:3])
rawBytes=io.BytesIO()
img.save(rawBytes,"JPEG")
rawBytes.seek(0)
img_base64=base64.b64encode(rawBytes.read()).decode()
return img_base64
def base64_to_image(img_base64):
# Assuming base64_str is the string value without 'data:image/jpeg;base64,'
img = Image.open(io.BytesIO(base64.decodebytes(bytes(img_base64, "utf-8"))))
return np.array(img)
def access_database_as_admin():
return mysql.connector.connect(host="bnadwttldj2i5cq9aymp-mysql.services.clever-cloud.com",
user="uvfqvcypihhznd2u",
port=3306,
password=os.environ["mysql_password"],
database="bnadwttldj2i5cq9aymp")
def create_user_table(username):
dataBase=access_database_as_admin()
cursor=dataBase.cursor()
query=f"""create table if not exists user_{username}(
person_id varchar(30) unique not null,
face_vectors blob not null,
remarks text not null,
group_id char(30)
);"""
cursor.execute(query)
dataBase.commit()
dataBase.close()
def drop_user_table(username):
dataBase=access_database_as_admin()
cursor=dataBase.cursor()
query=f"drop table if exists user_{username};"
cursor.execute(query)
dataBase.commit()
dataBase.close()
def add_row_user_table(username,person_id,face_vectors,remarks,group_id=None):
"""
person_id:str
face_vectors:np.array shape:(n,128)
remarks:np.array shape:(n,)
group_id:str
"""
# print(face_vectors.tobytes())
# print(remarks.tobytes())
# print(np.frombuffer(remarks.tobytes(), dtype="float64"))
# pass
dataBase=access_database_as_admin()
cursor=dataBase.cursor()
cursor.execute(f"select person_id from user_{username} where person_id=%s;",[person_id])
if cursor.fetchone() is None:
if group_id is not None:
query=f"insert into user_{username}(person_id,face_vectors,remarks,group_id) values(%s,%s,%s,%s);"
cursor.execute(query,[person_id,face_vectors.tobytes(),remarks,group_id])
else:
query=f"insert into user_{username}(person_id,face_vectors,remarks) values(%s,%s,%s);"
cursor.execute(query,[person_id,face_vectors.tobytes(),remarks])
else:
if group_id is not None:
query=f"update user_{username} SET face_vectors=%s,remarks=%s,group_id=%s where person_id=%s;"
cursor.execute(query,[face_vectors.tobytes(),remarks,group_id,person_id])
else:
query=f"update user_{username} SET face_vectors=%s,remarks=%s where person_id=%s;"
cursor.execute(query,[face_vectors.tobytes(),remarks,person_id])
dataBase.commit()
dataBase.close()
def read_row_user_table(username):
dataBase=access_database_as_admin()
cursor=dataBase.cursor()
query=f"select person_id,face_vectors,remarks,group_id from user_{username};"
cursor.execute(query)
data=list(cursor.fetchone())
data[1]=np.frombuffer(data[1],dtype="float64")
data[2]=data[2].split(",")
data[1]=data[1].reshape(len(data[2]),-1)
print(data)
# dataBase.commit()
dataBase.close()
def read_user_table(username,split_remarks=True,group_id=None):
dataBase=access_database_as_admin()
cursor=dataBase.cursor()
if group_id is None:
cursor.execute(f"select person_id,face_vectors,remarks,group_id from user_{username};")
else:
cursor.execute(f"select person_id,face_vectors,remarks,group_id from user_{username} where group_id=%s;",[group_id])
data={column_name:[] for column_name in cursor.column_names}
for row in cursor.fetchall():
row=list(row)
row[1]=np.frombuffer(row[1],dtype="float64")
row[2]=row[2].split(",")
data['person_id'].append(row[0])
data['face_vectors'].append(row[1].reshape(len(row[2]),-1))
if not split_remarks:
row[2]=",".join(row[2])
data['remarks'].append(row[2])
data['group_id'].append(row[3])
dataBase.close()
# print(data['person_id'])
# print(data['face_vectors'])
return data
def remove_person_from_user_table(username,person_id):
dataBase=access_database_as_admin()
cursor=dataBase.cursor()
query=f"delete from user_{username} where person_id=%s;"
cursor.execute(query,[person_id])
dataBase.commit()
dataBase.close()
import numpy as np
# add_row_user_table("abc","1",np.array([[2,3],[1,4]],dtype="float64"),"front face,left face,right face",group_id="1")
# read_user_table("abc")
# print(np.fromstring("a,b,c",dtype='S3'))
# drop_user_table("abc")
# create_user_table("abc") |