File size: 2,301 Bytes
28af5bc e8a6b78 bf41ce7 49be943 28af5bc e8a6b78 28af5bc 74f7ac4 38d2586 e8a6b78 bf41ce7 e8a6b78 38d2586 1015a8a 38d2586 e8a6b78 74f7ac4 38d2586 bf41ce7 e8a6b78 4e54695 28af5bc e8a6b78 28af5bc 49be943 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | import json
import os
from typing import Any
import firebase_admin
from firebase_admin import db
from firebase_admin import credentials
from Brain.src.common.assembler import Assembler
from Brain.src.common.brain_exception import BrainException
from Brain.src.common.http_response_codes import responses
from Brain.src.common.utils import FIREBASE_STORAGE_BUCKET, FIREBASE_REALTIME_DATABASE
from Brain.src.logs import logger
from Brain.src.model.req_model import ReqModel
from Brain.src.model.requests.request_model import BasicReq
def initialize_app(setting: ReqModel) -> firebase_admin.App:
app_name = get_firebase_admin_name(setting.uuid)
# Check if the app is already initialized
try:
app = firebase_admin.get_app(app_name)
return app
# if app is not None:
# # Delete the existing app
# firebase_admin.delete_app(app)
except Exception as ex:
logger.warn(
title="firebase init",
message=f"this app name: {app_name} does not exist",
)
return firebase_admin.initialize_app(
get_firebase_cred(setting),
{
"storageBucket": FIREBASE_STORAGE_BUCKET,
"databaseURL": FIREBASE_REALTIME_DATABASE,
},
name=app_name,
)
def get_firebase_admin_name(uuid: str = ""):
return f"firebase_admin_{uuid}"
def firebase_admin_with_setting(data: BasicReq):
# firebase admin init
assembler = Assembler()
setting = assembler.to_req_model(data.confs)
try:
firebase_app = initialize_app(setting)
except Exception as ex:
raise BrainException(code=507, message=responses[507])
return setting, firebase_app
def get_firebase_cred(setting: ReqModel):
if os.path.exists("Brain/firebase_cred.json"):
file = open("Brain/firebase_cred.json")
cred = json.load(file)
file.close()
return credentials.Certificate(cred)
else:
cred = json.loads(setting.firebase_key)
return credentials.Certificate(cred)
"""
delete data from real time database of firebase using reference link
"""
def delete_data_from_realtime(
reference_link: str, firebase_app: firebase_admin.App
) -> None:
ref = db.reference(reference_link, app=firebase_app)
ref.delete()
|