MohitRajput45's picture
Upload 21 files
97ac315 verified
import os
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, Float
from sqlalchemy.orm import declarative_base, sessionmaker
from dotenv import load_dotenv
# Load environment variables (.env)
load_dotenv()
# --- NEW: Cloud Database Connection via SQLAlchemy ---
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
DB_PATH = os.path.join(BASE_DIR, "data", "fraud.db")
DATABASE_URL = f"sqlite:///{DB_PATH}"
# Initialize Engine and Session
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 🔹 Define the schema for the transactions table
class TransactionLog(Base):
__tablename__ = "transactions"
id = Column(Integer, primary_key=True, index=True)
Time = Column(Float)
V1 = Column(Float); V2 = Column(Float); V3 = Column(Float); V4 = Column(Float)
V5 = Column(Float); V6 = Column(Float); V7 = Column(Float); V8 = Column(Float)
V9 = Column(Float); V10 = Column(Float); V11 = Column(Float); V12 = Column(Float)
V13 = Column(Float); V14 = Column(Float); V15 = Column(Float); V16 = Column(Float)
V17 = Column(Float); V18 = Column(Float); V19 = Column(Float); V20 = Column(Float)
V21 = Column(Float); V22 = Column(Float); V23 = Column(Float); V24 = Column(Float)
V25 = Column(Float); V26 = Column(Float); V27 = Column(Float); V28 = Column(Float)
Amount = Column(Float)
prediction = Column(Integer)
probability = Column(Float)
# 🔥 HUMAN-IN-THE-LOOP UPGRADE: The true label provided by a human analyst later
Actual_Class = Column(Integer, nullable=True)
def init_db():
"""Create tables if they do not exist."""
Base.metadata.create_all(bind=engine)
def save_to_db(data: dict, pred: int, prob: float):
db = SessionLocal()
try:
new_tx = TransactionLog(**data, prediction=pred, probability=prob)
db.add(new_tx)
db.commit()
except Exception as e:
# 🔥 THE FIX: If anything goes wrong, roll back the transaction so the DB doesn't freeze
db.rollback()
print(f"⚠️ Database insertion failed: {e}")
finally:
db.close()
def load_data_from_db():
"""Load all database records into a Pandas DataFrame for Retraining/Drift detection."""
return pd.read_sql("SELECT * FROM transactions", engine)