# Spaces: Runtime error
# File size: 2,948 Bytes
# 480ece8
import os
import logging
from dotenv import load_dotenv
import databases
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Populate the process environment from a .env file before any setting
# below is read.
load_dotenv()

# Configure logging once for the whole process: INFO and above, with a
# timestamp, logger name and level in front of every message.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# Resolve the database connection string from the environment.
# The .env file located one directory above this module is loaded explicitly
# (ENV_PATH ends in "../.env") before DATABASE_URI is read.
_module_dir = os.path.dirname(os.path.abspath(__file__))
ENV_PATH = os.path.join(_module_dir, "../.env")
load_dotenv(dotenv_path=ENV_PATH)

# NOTE: the environment variable is named DATABASE_URI, while the module-level
# constant that holds its value is DATABASE_URL.
DATABASE_URL = os.getenv("DATABASE_URI")
if not DATABASE_URL:
    # Fail at import time: everything below needs a connection string.
    logger.error("DATABASE_URI not found in environment variables.")
    raise ValueError(
        "DATABASE_URI not found in environment variables. "
        "Ensure it is set in the .env file."
    )
# Build the `databases` handle and the SQLAlchemy metadata object that the
# table definitions in this module attach to. The connection itself is opened
# later by connect_to_database().
try:
    database = databases.Database(DATABASE_URL)
    metadata = sqlalchemy.MetaData()
except Exception as exc:
    # Log for operators, then let the original exception propagate.
    logger.error("Failed to initialize database connection: %s", exc)
    raise
else:
    logger.info("Database connection initialized successfully.")
# rca_results table: an auto-incrementing integer primary key, a required
# filename, a required boolean anomaly flag and optional free-text details.
_rca_columns = (
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
    sqlalchemy.Column("filename", sqlalchemy.String, nullable=False),
    sqlalchemy.Column("anomaly", sqlalchemy.Boolean, nullable=False),
    sqlalchemy.Column("details", sqlalchemy.Text, nullable=True),
)
rca_results = sqlalchemy.Table("rca_results", metadata, *_rca_columns)
# Insert anomaly result into rca_results table
async def insert_rca_result(rca_result):
    """Insert one row into the ``rca_results`` table.

    Args:
        rca_result: Mapping that must provide ``"filename"`` and
            ``"anomaly"``; ``"details"`` is optional and is stored as an
            empty string when absent.

    Raises:
        KeyError: If ``rca_result`` is a dict lacking ``"filename"`` or
            ``"anomaly"``.
    """
    row = {
        "filename": rca_result["filename"],
        "anomaly": rca_result["anomaly"],
        "details": rca_result.get("details", ""),
    }
    insert_stmt = rca_results.insert().values(**row)
    await database.execute(insert_stmt)
# Async SQLAlchemy engine and session factory for PostgreSQL.
# The "postgresql://" scheme is rewritten to "postgresql+asyncpg://" so the
# engine is created against the asyncpg driver.
ASYNC_DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
engine = create_async_engine(ASYNC_DATABASE_URL, future=True, echo=False)

# Sessions produced by this factory are AsyncSession instances whose objects
# are not expired on commit.
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# Helper function to initialize database
async def connect_to_database():
    """Open the module-level ``database`` connection.

    Meant to be awaited when the application starts. The outcome is logged
    either way; on failure the original exception is re-raised unchanged.
    """
    try:
        await database.connect()
    except Exception as exc:
        logger.error("Error connecting to the database: %s", exc)
        raise
    else:
        logger.info("Successfully connected to the database.")
# Helper function to disconnect database
async def disconnect_from_database():
    """Close the module-level ``database`` connection.

    Meant to be awaited when the application shuts down. The outcome is
    logged either way; on failure the original exception is re-raised
    unchanged.
    """
    try:
        await database.disconnect()
    except Exception as exc:
        logger.error("Error disconnecting from the database: %s", exc)
        raise
    else:
        logger.info("Successfully disconnected from the database.")