File size: 4,513 Bytes
4a2ab42
 
 
2a5f5af
4a2ab42
 
 
 
 
2a5f5af
4a2ab42
2a5f5af
4a2ab42
 
 
2a5f5af
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a5f5af
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a5f5af
4a2ab42
 
 
2a5f5af
 
4a2ab42
 
2a5f5af
 
 
 
 
4a2ab42
 
 
 
2a5f5af
 
 
4a2ab42
 
 
 
 
2a5f5af
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Base Models and Enums

Contains: base model class, enums, and utility functions
used across all database models.
"""

import os
from datetime import datetime
from enum import Enum

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from core.logging import logger
from core.security.encryption import EncryptedString

# Encrypted field types: EncryptedString (imported above) is re-exported via __all__

# Create base class
# Declarative base produced by SQLAlchemy's declarative_base(); it is exported
# in __all__ and its metadata is what create_tables() uses to emit the schema.
Base = declarative_base()


# Utility functions
def utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the value is
    obtained from an aware ``datetime.now(timezone.utc)`` call and the tzinfo
    is then stripped. The result is therefore the same kind of object the
    previous implementation returned (naive, expressed in UTC).

    Returns:
        A ``datetime`` with ``tzinfo`` set to ``None`` holding the current
        UTC wall-clock time.
    """
    # Local import keeps this block self-contained; the module only imports
    # ``datetime`` itself at file level.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


# Enum classes
class CaseStatus(str, Enum):
    """String-valued enumeration of case statuses.

    Members also subclass ``str``, so each one compares equal to its plain
    string value; every value here is identical to the member name.
    """

    OPEN = "OPEN"
    INVESTIGATING = "INVESTIGATING"
    PENDING_REVIEW = "PENDING_REVIEW"
    ESCALATED = "ESCALATED"
    CLOSED = "CLOSED"
    ARCHIVED = "ARCHIVED"


class CasePriority(str, Enum):
    """String-valued enumeration of case priority levels.

    Members also subclass ``str``; every value is identical to the member
    name. Note that comparisons between members are string comparisons, not
    an ordering by severity.
    """

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"


class CaseType(str, Enum):
    """String-valued enumeration of case categories.

    Members also subclass ``str``; every value is identical to the member
    name.
    """

    MONEY_LAUNDERING = "MONEY_LAUNDERING"
    FRAUD_SUSPECTED = "FRAUD_SUSPECTED"
    IDENTITY_THEFT = "IDENTITY_THEFT"
    ACCOUNT_TAKEOVER = "ACCOUNT_TAKEOVER"
    WIRE_FRAUD = "WIRE_FRAUD"
    CHECK_FRAUD = "CHECK_FRAUD"
    CARD_FRAUD = "CARD_FRAUD"


class UserRole(str, Enum):
    """String-valued enumeration of user roles.

    Members also subclass ``str``; every value is identical to the member
    name. The permissions attached to each role are not defined in this
    module.
    """

    ADMIN = "ADMIN"
    MANAGER = "MANAGER"
    INVESTIGATOR = "INVESTIGATOR"
    ANALYST = "ANALYST"
    AUDITOR = "AUDITOR"
    VIEWER = "VIEWER"
    USER = "USER"


class ReconciliationType(str, Enum):
    """String-valued enumeration of reconciliation types.

    Members also subclass ``str``; every value is identical to the member
    name.
    """

    EXACT = "EXACT"
    FUZZY = "FUZZY"
    MANUAL = "MANUAL"
    REJECTED = "REJECTED"


# Database setup functions
def get_database_url():
    """Resolve the database URL the engine should connect to.

    ``settings.DATABASE_URL`` is returned unchanged when it is set, truthy,
    and does not contain the substring ``"sqlite"``. In every other case
    (attribute missing, empty, or a SQLite URL) the function falls back to a
    SQLite file under ``~/.zenith``, creating that directory if needed.

    NOTE(review): a SQLite URL supplied through settings is ignored by the
    check below, so a configured ``:memory:`` URL never reaches callers.
    Confirm this is intended.
    """
    # Imported at call time rather than at module import.
    from core.config import settings

    # Priority 1: configured URL (non-SQLite backends such as Postgres)
    configured_url = getattr(settings, "DATABASE_URL", None)
    if configured_url and "sqlite" not in configured_url:
        return configured_url

    # Priority 2: per-user SQLite file; make sure its directory exists first
    app_data_dir = os.path.expanduser("~/.zenith")
    os.makedirs(app_data_dir, exist_ok=True)
    return f"sqlite:///{app_data_dir}/fraud_detection.db"


def create_engine_and_session():
    """Create the SQLAlchemy engine and a session factory bound to it.

    The URL comes from ``get_database_url()``. SQLite URLs get
    ``check_same_thread=False``; in-memory SQLite URLs additionally use
    ``StaticPool`` so every checkout shares the single connection that holds
    the database. All other URLs use ``QueuePool``.

    Returns:
        A ``(engine, session_factory)`` tuple, where ``session_factory`` is a
        ``sessionmaker`` configured with ``autocommit=False`` and
        ``autoflush=False``.
    """
    from sqlalchemy.pool import QueuePool, StaticPool

    db_url = get_database_url()
    connect_args = {}
    poolclass = QueuePool

    if "sqlite" in db_url:
        connect_args = {"check_same_thread": False}
        if ":memory:" in db_url:
            # Use StaticPool for in-memory SQLite to share the same database
            # across all connections in the pool
            poolclass = StaticPool
            logger.info("Using StaticPool for in-memory SQLite database")

    # Arguments understood by every pool class.
    engine_kwargs = {
        "echo": False,
        "poolclass": poolclass,
        "pool_recycle": 1800,  # Recycle connections every 30 minutes
        "pool_pre_ping": True,  # Check connection health before use
        "connect_args": connect_args,  # Needed for SQLite with pooling
    }

    # pool_size / max_overflow / pool_timeout are QueuePool constructor
    # arguments. create_engine() rejects keyword arguments the chosen pool
    # class does not consume (even when their value is None), so they must be
    # omitted entirely for StaticPool instead of being passed as None.
    if poolclass is not StaticPool:
        engine_kwargs.update(pool_size=20, max_overflow=30, pool_timeout=60)

    engine = create_engine(db_url, **engine_kwargs)
    session_local = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    return engine, session_local


# Session management
# Module-level engine and session factory. This call runs when the module is
# imported, so importing this module resolves the database URL and builds the
# engine as a side effect; get_db() draws its sessions from SessionLocal.
engine, SessionLocal = create_engine_and_session()


def get_db():
    """Yield a database session for use as a FastAPI dependency.

    A new session is opened from ``SessionLocal`` and handed to the caller;
    it is closed once the generator is resumed or closed, whether or not the
    consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, including on error paths.
        session.close()


# Security hardening: Parameterized query enforcement
def secure_query_execution(query_template: str, params: dict) -> str:
    """Wrap ``query_template`` in a SQLAlchemy ``text()`` clause.

    NOTE: this is a placeholder. The clause is built and discarded, nothing
    is sent to a database, and ``params`` is accepted but not used.

    Args:
        query_template: SQL text, expected to use bound-parameter
            placeholders.
        params: Values intended for the placeholders (currently unused).

    Returns:
        The fixed string ``"Query executed safely"`` when the clause could be
        constructed.

    Raises:
        Exception: Any error raised while importing or calling ``text()`` is
            logged and re-raised unchanged.
    """
    try:
        from sqlalchemy import text

        # Use SQLAlchemy text() for safe parameter binding; the resulting
        # clause object is not kept.
        text(query_template)
    except Exception as e:
        from core.logging import logger

        logger.error(f"Secure query execution failed: {e!s}")
        raise
    else:
        # Implementation would use session.execute(safe_query, params)
        return "Query executed safely"


def create_tables():
    """Create all tables registered on ``Base.metadata``.

    The module-level ``engine`` is used instead of building a fresh one.
    Constructing a second engine on every call allocated another connection
    pool that was never disposed, and for an in-memory SQLite database it
    would create the tables in a different database from the one
    ``SessionLocal`` sessions are bound to.
    """
    Base.metadata.create_all(bind=engine)


# Export all base components
# Public names of this module, grouped by purpose.
__all__ = [
    # Declarative base, time helper, and the re-exported encrypted field type
    "Base",
    "utc_now",
    "EncryptedString",
    # Enums
    "CaseStatus",
    "CasePriority",
    "CaseType",
    "UserRole",
    "ReconciliationType",
    # Database setup
    "get_database_url",
    "create_engine_and_session",
    "engine",
    "SessionLocal",
    "get_db",
    "secure_query_execution",
    "create_tables",
]