# File size: 5,885 Bytes (4457bcb)
import firebase_admin
from firebase_admin import credentials, firestore
import bcrypt
from datetime import datetime
class firebaseDB:
    """Firestore-backed store for user accounts and the forms they submit.

    User documents live in the ``users`` collection and are keyed by e-mail
    address. Form documents live in the ``forms`` collection and record their
    owner in the ``userid`` field and their per-user id in ``formid``.

    After ``new_user`` or a successful ``get_user`` the instance caches the
    active user's profile in ``userid``, ``name``, ``email``, ``password``
    (the bcrypt hash, never the plain text), ``companyname``,
    ``latest_form_id`` and ``form_count``.
    """

    def __init__(self):
        """Connect to Firestore, initialising the Firebase app if needed."""
        # Only load the service-account key when the app still has to be
        # created; an already-initialised app does not need it again.
        if not firebase_admin._apps:
            cred = credentials.Certificate("key.json")
            firebase_admin.initialize_app(cred)
        self.db = firestore.client()
        self.userid = None  # The user's e-mail doubles as the user id.
        self.name = None
        self.email = None
        self.password = None
        self.companyname = None
        self.latest_form_id = None
        self.form_count = None

    def hash_password(self, password):
        """Return a salted bcrypt hash of ``password`` as a UTF-8 string."""
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode("utf-8"), salt)
        return hashed.decode("utf-8")

    def _form_ref(self, userid, formid):
        """Return the ``forms`` document reference for one user's form.

        Form ids (``form_1``, ``form_2``, ...) are only unique per user, so
        the document id is prefixed with the owner to keep different users'
        forms from sharing a document.
        """
        return self.db.collection("forms").document(f"{userid}_{formid}")

    def new_user(self, name, email, password, companyname):
        """Create a user document and make it the active user.

        Args:
            name: Display name of the user.
            email: E-mail address; used as the document id and user id.
            password: Plain-text password; only its bcrypt hash is stored.
            companyname: Company the user belongs to.

        Returns:
            True when the user was created, False when a user with this
            e-mail already exists (the existing document is left untouched).
        """
        user_ref = self.db.collection("users").document(email)
        # Refuse to overwrite an existing account: a blind set() would replace
        # its password hash and reset its form counters.
        if user_ref.get().exists:
            print(f"User with ID '{email}' already exists. User not created.")
            return False

        hashed_password = self.hash_password(password)
        user_ref.set({
            "name": name,
            "email": email,
            "password": hashed_password,
            "company_name": companyname,
            "latest_form_id": None,
            "form_count": 0,
        })

        # Cache the profile only after the write went through, so a failed
        # write does not leave the instance looking authenticated.
        self.userid = email
        self.name = name
        self.email = email
        self.password = hashed_password
        self.companyname = companyname
        self.latest_form_id = None
        self.form_count = 0
        return True

    def get_user(self, email, password):
        """Authenticate a user and, on success, load their profile.

        Args:
            email: E-mail address (the user document id).
            password: Plain-text password to verify against the stored hash.

        Returns:
            True if the user exists and the password matches, otherwise
            False. The cached profile is only changed on success.
        """
        doc = self.db.collection("users").document(email).get()
        if not doc.exists:
            return False

        data = doc.to_dict()
        stored_hash = data.get("password")
        # A document without a usable hash can never authenticate; treat it
        # as a failed login instead of crashing on ``None.encode``.
        if not isinstance(stored_hash, str) or not stored_hash:
            return False

        try:
            matches = bcrypt.checkpw(
                password.encode("utf-8"), stored_hash.encode("utf-8")
            )
        except ValueError:
            # bcrypt rejects a stored value that is not a valid hash.
            return False
        if not matches:
            return False

        self.userid = email
        self.name = data.get("name")
        self.email = email
        self.password = stored_hash
        self.companyname = data.get("company_name")
        self.latest_form_id = data.get("latest_form_id")
        self.form_count = data.get("form_count")
        return True

    def add_form(self, qa_dict):
        """Store a new form for the active user and bump their counters.

        Args:
            qa_dict: Question/answer data saved under ``qa_responses``.

        Returns:
            None. Outcomes are reported with ``print``; nothing is written
            when no user is loaded or the user document is missing.
        """
        if not self.userid:
            print("User not loaded. Please authenticate first.")
            return

        user_ref = self.db.collection("users").document(self.userid)
        user_doc = user_ref.get()
        if not user_doc.exists:
            print(f"User with ID '{self.userid}' does not exist. Form not added.")
            return

        # NOTE: the counter is read and written in separate calls, so two
        # concurrent add_form calls for one user can compute the same id.
        current_count = user_doc.to_dict().get("form_count") or 0
        new_count = current_count + 1
        new_form_id = f"form_{new_count}"

        self._form_ref(self.userid, new_form_id).set({
            "userid": self.userid,
            "formid": new_form_id,
            "qa_responses": qa_dict,
            # Sentinel that Firestore replaces with the server write time.
            "date_created": firestore.SERVER_TIMESTAMP,
        })

        user_ref.update({
            "latest_form_id": new_form_id,
            "form_count": new_count,
        })
        # Keep the cached profile in step with the stored one.
        self.latest_form_id = new_form_id
        self.form_count = new_count
        print(f"Form '{new_form_id}' added successfully for user '{self.userid}'")

    def update_form(self, formid, updated_qa_dict):
        """Replace the ``qa_responses`` of one of the active user's forms.

        Args:
            formid: The form's id as stored in its ``formid`` field.
            updated_qa_dict: New question/answer data.

        Returns:
            None. Outcomes are reported with ``print``; the form is left
            unchanged when no user is loaded, the form does not exist, or it
            belongs to a different user.
        """
        if not self.userid:
            print("User not loaded. Please authenticate first.")
            return

        form_ref = self._form_ref(self.userid, formid)
        form_doc = form_ref.get()
        if not form_doc.exists:
            # Fall back to documents stored directly under the bare form id.
            form_ref = self.db.collection("forms").document(formid)
            form_doc = form_ref.get()
        if not form_doc.exists:
            print(f"Form with ID '{formid}' does not exist.")
            return

        if form_doc.to_dict().get("userid") != self.userid:
            print(f"Form ID '{formid}' does not belong to user '{self.userid}'.")
            return

        form_ref.update({"qa_responses": updated_qa_dict})
        print(f"Form '{formid}' successfully updated for user '{self.userid}'")

    def get_forms_by_user(self, userid):
        """Return every form whose ``userid`` field equals ``userid``.

        Returns:
            A list of form dictionaries; empty when the user has no forms.
        """
        query = self.db.collection("forms").where("userid", "==", userid)
        return [doc.to_dict() for doc in query.stream()]

    def delete_form_by_user_and_formid(self, user_id, form_id):
        """Delete the forms matching both ``user_id`` and ``form_id``.

        Nothing is deleted when the user document does not exist. Outcomes
        are reported with ``print``.
        """
        user_ref = self.db.collection("users").document(user_id)
        if not user_ref.get().exists:
            print(f"User with ID '{user_id}' does not exist. Cannot delete form.")
            return

        query = (
            self.db.collection("forms")
            .where("userid", "==", user_id)
            .where("formid", "==", form_id)
        )
        found = False
        for doc in query.stream():
            doc.reference.delete()
            print(f"Deleted form with ID: {doc.id}")
            found = True

        if not found:
            print(f"No form found for user '{user_id}' with form_id '{form_id}'")

    def delete_all_user_data(self, user_id):
        """Delete all of a user's forms, then the user document itself.

        The forms are removed even when the user document is missing.
        Outcomes are reported with ``print``.
        """
        # 1. Delete all forms for the user.
        form_query = self.db.collection("forms").where("userid", "==", user_id)
        form_count = 0
        for doc in form_query.stream():
            doc.reference.delete()
            form_count += 1
        print(f"Deleted {form_count} form(s) for user '{user_id}'")

        # 2. Delete the user document.
        user_ref = self.db.collection("users").document(user_id)
        if user_ref.get().exists:
            user_ref.delete()
            print(f"Deleted user '{user_id}'")
        else:
            print(f"User '{user_id}' not found")