janshakti-backend / database.py
varunka's picture
backend restart
0e66dad
import logging
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from sqlalchemy.exc import OperationalError
from config import (
DATABASE_URL,
DEFAULT_LEADER_NAME,
DEFAULT_LEADER_EMAIL,
DEFAULT_LEADER_PASSWORD,
DEFAULT_AUTHORITY_NAME,
DEFAULT_AUTHORITY_EMAIL,
DEFAULT_AUTHORITY_PASSWORD,
DEFAULT_ADMIN_NAME,
DEFAULT_ADMIN_EMAIL,
DEFAULT_ADMIN_PASSWORD,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# SQLite URL that _switch_to_sqlite() builds the replacement engine from.
FALLBACK_SQLITE_URL = "sqlite:///./janshakti.db"
def _build_engine(url: str):
    """Build a SQLAlchemy engine for ``url``.

    Returns:
        A ``(engine, is_sqlite)`` tuple; ``is_sqlite`` is True when ``url``
        starts with ``"sqlite"``.
    """
    uses_sqlite = url.startswith("sqlite")
    if uses_sqlite:
        # SQLite only needs the same-thread check switched off.
        options = {
            "pool_pre_ping": True,
            "connect_args": {"check_same_thread": False},
        }
    else:
        # Every other backend gets a connect timeout and explicit pool sizing.
        options = {
            "pool_pre_ping": True,
            "connect_args": {"connect_timeout": 10},
            "pool_recycle": 300,
            "pool_size": 5,
            "max_overflow": 10,
        }
    return create_engine(url, **options), uses_sqlite
# --- Build initial engine ---------------------------------------------------
# URL of the database currently in use; starts as the configured DATABASE_URL
# and is reassigned by _switch_to_sqlite().
_active_url = DATABASE_URL
# Engine for the active URL, plus a flag telling whether that URL is SQLite.
engine, IS_SQLITE = _build_engine(_active_url)
# Session factory bound to the active engine; get_db() and the seed helpers
# create their sessions from it.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
    """Declarative base class; ``init_db`` creates tables from ``Base.metadata``."""
def _switch_to_sqlite():
    """Fall back to a local SQLite database when PostgreSQL is unreachable.

    Rebinds the module-level ``engine``, ``IS_SQLITE``, ``SessionLocal`` and
    ``_active_url`` to the database at ``FALLBACK_SQLITE_URL``, then disposes
    the engine that was replaced so its connection pool is not leaked.

    NOTE(review): modules that did ``from database import engine`` (or
    ``SessionLocal``) before this runs keep their reference to the old
    objects; only lookups made through this module see the new ones.
    """
    global engine, SessionLocal, IS_SQLITE, _active_url  # noqa: PLW0603
    print("[DB] ⚠️ PostgreSQL unreachable — falling back to local SQLite database")
    previous_engine = engine
    _active_url = FALLBACK_SQLITE_URL
    engine, IS_SQLITE = _build_engine(FALLBACK_SQLITE_URL)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    # Dispose only after the new engine is in place, so a failure while
    # building the SQLite engine leaves the previous state untouched.
    previous_engine.dispose()
def get_db():
    """Yield a session created from ``SessionLocal`` and always close it afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def _ensure_user_columns_sqlite():
    """Add any missing columns to the ``users`` table when running on SQLite.

    Does nothing unless ``IS_SQLITE`` is set.
    """
    if not IS_SQLITE:
        return
    # (column name, SQLite column definition), in the order they are added.
    wanted = (
        ("state", "VARCHAR"),
        ("district", "VARCHAR"),
        ("role", "VARCHAR NOT NULL DEFAULT 'citizen'"),
        ("is_active", "BOOLEAN NOT NULL DEFAULT 1"),
        ("last_login_at", "DATETIME"),
    )
    with engine.connect() as conn:
        table_info = conn.execute(text("PRAGMA table_info(users)"))
        # The second field of each PRAGMA table_info row is the column name.
        present = {row[1] for row in table_info.fetchall()}
        for column, definition in wanted:
            if column not in present:
                conn.execute(text(f"ALTER TABLE users ADD COLUMN {column} {definition}"))
        conn.commit()
def _ensure_user_columns_postgres():
    """Add any missing columns to the ``users`` table when running on PostgreSQL.

    Does nothing when ``IS_SQLITE`` is set.  The lookup is restricted to the
    current schema so that an unrelated ``users`` table in another schema
    cannot make a missing column look present.
    """
    if IS_SQLITE:
        return
    required_columns = {
        "state": "VARCHAR",
        "district": "VARCHAR",
        "role": "VARCHAR NOT NULL DEFAULT 'citizen'",
        "is_active": "BOOLEAN NOT NULL DEFAULT TRUE",
        "last_login_at": "TIMESTAMP WITH TIME ZONE",
    }
    with engine.connect() as conn:
        result = conn.execute(text(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_schema = current_schema() AND table_name = 'users'"
        ))
        existing = {row[0] for row in result.fetchall()}
        for name, col_type in required_columns.items():
            if name not in existing:
                # IF NOT EXISTS keeps the statement harmless if another
                # process added the column after the lookup above.
                conn.execute(text(
                    f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {name} {col_type}"
                ))
                print(f"[DB] Added missing column: users.{name}")
        conn.commit()
def _ensure_complaint_workflow_columns():
    """Add any missing columns to the ``complaints`` table when running on SQLite.

    Does nothing unless ``IS_SQLITE`` is set.
    """
    if not IS_SQLITE:
        return
    # Column name -> SQLite type, added in this order when absent.
    column_specs = {
        "ai_risk_score": "FLOAT",
        "ai_risk_level": "VARCHAR",
        "ai_risk_factors": "TEXT",
        "ai_risk_reasoning": "TEXT",
        "ai_leader_brief": "TEXT",
        "citizen_user_id": "INTEGER",
        "citizen_language": "VARCHAR",
        "image_path": "VARCHAR",
        "audio_path": "VARCHAR",
        "assigned_authority": "VARCHAR",
        "authority_email": "VARCHAR",
        "leader_note": "TEXT",
        "authority_response": "TEXT",
        "citizen_update": "TEXT",
        "ai_breakdown": "TEXT",
        "ai_explanation": "TEXT",
        "ai_model_version": "VARCHAR",
        "before_meta": "TEXT",
        "after_meta": "TEXT",
        "verification_score": "FLOAT",
        "verification_confidence": "FLOAT",
    }
    with engine.connect() as conn:
        pragma_rows = conn.execute(text("PRAGMA table_info(complaints)")).fetchall()
        # The second field of each PRAGMA table_info row is the column name.
        present = {row[1] for row in pragma_rows}
        missing = [
            (column, sql_type)
            for column, sql_type in column_specs.items()
            if column not in present
        ]
        for column, sql_type in missing:
            conn.execute(text(f"ALTER TABLE complaints ADD COLUMN {column} {sql_type}"))
        conn.commit()
def _seed_default_leader():
    """Make sure a user with ``DEFAULT_LEADER_EMAIL`` exists and has role ``"leader"``.

    An existing account only has its role corrected; otherwise a new account
    is created from the ``DEFAULT_LEADER_*`` settings.
    """
    # Imported here rather than at module level, as in the original code.
    from models.user import User
    from routers.auth import hash_password

    session = SessionLocal()
    try:
        account = session.query(User).filter(User.email == DEFAULT_LEADER_EMAIL).first()
        if account:
            if account.role != "leader":
                account.role = "leader"
                session.commit()
        else:
            session.add(
                User(
                    name=DEFAULT_LEADER_NAME,
                    email=DEFAULT_LEADER_EMAIL,
                    phone=None,
                    role="leader",
                    hashed_password=hash_password(DEFAULT_LEADER_PASSWORD),
                )
            )
            session.commit()
            print(f"[Auth] Default leader ready: {DEFAULT_LEADER_EMAIL}")
    finally:
        session.close()
def _seed_default_authority():
    """Make sure a user with ``DEFAULT_AUTHORITY_EMAIL`` exists and has role ``"authority"``.

    An existing account only has its role corrected; otherwise a new account
    is created from the ``DEFAULT_AUTHORITY_*`` settings.
    """
    # Imported here rather than at module level, as in the original code.
    from models.user import User
    from routers.auth import hash_password

    session = SessionLocal()
    try:
        account = session.query(User).filter(User.email == DEFAULT_AUTHORITY_EMAIL).first()
        if account:
            if account.role != "authority":
                account.role = "authority"
                session.commit()
        else:
            session.add(
                User(
                    name=DEFAULT_AUTHORITY_NAME,
                    email=DEFAULT_AUTHORITY_EMAIL,
                    phone=None,
                    role="authority",
                    hashed_password=hash_password(DEFAULT_AUTHORITY_PASSWORD),
                )
            )
            session.commit()
            print(f"[Auth] Default authority ready: {DEFAULT_AUTHORITY_EMAIL}")
    finally:
        session.close()
def _seed_default_admin():
    """Make sure a user with ``DEFAULT_ADMIN_EMAIL`` exists, is an admin and is active.

    An existing account has its role set to ``"admin"`` and is re-activated
    when ``is_active`` is ``False``; otherwise a new active admin account is
    created from the ``DEFAULT_ADMIN_*`` settings.
    """
    # Imported here rather than at module level, as in the original code.
    from models.user import User
    from routers.auth import hash_password

    session = SessionLocal()
    try:
        account = session.query(User).filter(User.email == DEFAULT_ADMIN_EMAIL).first()
        if account:
            wrong_role = account.role != "admin"
            deactivated = account.is_active is False
            if wrong_role:
                account.role = "admin"
            if deactivated:
                account.is_active = True
            # Commit only when something was actually corrected.
            if wrong_role or deactivated:
                session.commit()
        else:
            session.add(
                User(
                    name=DEFAULT_ADMIN_NAME,
                    email=DEFAULT_ADMIN_EMAIL,
                    phone=None,
                    role="admin",
                    hashed_password=hash_password(DEFAULT_ADMIN_PASSWORD),
                    is_active=True,
                )
            )
            session.commit()
            print(f"[Auth] Default admin ready: {DEFAULT_ADMIN_EMAIL}")
    finally:
        session.close()
def init_db():
    """Initialize database tables, migrate missing columns and seed default users.

    If creating the tables fails while the active engine is not SQLite, the
    module switches to the local SQLite fallback and retries, so the app can
    still start and serve requests.  A failure while already on SQLite is
    re-raised.

    Column migrations and default-user seeding are best-effort: any exception
    raised there is reported and logged with its traceback, but not propagated.
    """
    try:
        Base.metadata.create_all(bind=engine)
    except Exception as exc:  # noqa: BLE001 - any failure triggers the fallback
        if IS_SQLITE:
            raise
        print(f"[DB] PostgreSQL connection failed: {exc}")
        # Keep the traceback; the printed message alone hides the root cause.
        logger.warning(
            "create_all failed on the configured database; falling back to SQLite",
            exc_info=True,
        )
        _switch_to_sqlite()
        # Retry with SQLite
        Base.metadata.create_all(bind=engine)
    try:
        if IS_SQLITE:
            _ensure_user_columns_sqlite()
            _ensure_complaint_workflow_columns()
        else:
            _ensure_user_columns_postgres()
            _ensure_pg_complaint_columns()
        _seed_default_admin()
        _seed_default_leader()
        _seed_default_authority()
        print(f"[DB] ✅ Database initialized successfully (using {'SQLite' if IS_SQLITE else 'PostgreSQL'})")
    except Exception as exc:  # noqa: BLE001 - deliberately non-fatal
        print(f"[DB] Warning — seeding/migration issue (non-fatal): {exc}")
        logger.exception("Database seeding/migration step failed (non-fatal)")
def _ensure_pg_complaint_columns():
    """Add any missing columns to the ``complaints`` table in PostgreSQL.

    Does nothing when ``IS_SQLITE`` is set.  The lookup is restricted to the
    current schema so that an unrelated ``complaints`` table in another
    schema cannot make a missing column look present.
    """
    if IS_SQLITE:
        return
    required_columns = {
        "ai_risk_score": "FLOAT",
        "ai_risk_level": "VARCHAR",
        "ai_risk_factors": "TEXT",
        "ai_risk_reasoning": "TEXT",
        "ai_leader_brief": "TEXT",
        # Kept in step with the SQLite migration, which adds this column too.
        "citizen_user_id": "INTEGER",
        "citizen_language": "VARCHAR",
        "image_path": "VARCHAR",
        "audio_path": "VARCHAR",
        "assigned_authority": "VARCHAR",
        "authority_email": "VARCHAR",
        "leader_note": "TEXT",
        "authority_response": "TEXT",
        "citizen_update": "TEXT",
        "ai_breakdown": "TEXT",
        "ai_explanation": "TEXT",
        "ai_model_version": "VARCHAR",
        "before_meta": "TEXT",
        "after_meta": "TEXT",
        "verification_score": "FLOAT",
        "verification_confidence": "FLOAT",
    }
    with engine.connect() as conn:
        result = conn.execute(text(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_schema = current_schema() AND table_name = 'complaints'"
        ))
        existing = {row[0] for row in result.fetchall()}
        for name, col_type in required_columns.items():
            if name not in existing:
                # IF NOT EXISTS keeps the statement harmless if another
                # process added the column after the lookup above.
                conn.execute(text(
                    f"ALTER TABLE complaints ADD COLUMN IF NOT EXISTS {name} {col_type}"
                ))
                print(f"[DB] Added missing column: complaints.{name}")
        conn.commit()