Spaces:
Running
Running
| import logging | |
| from sqlalchemy import create_engine, text | |
| from sqlalchemy.orm import sessionmaker, DeclarativeBase | |
| from sqlalchemy.exc import OperationalError | |
| from config import ( | |
| DATABASE_URL, | |
| DEFAULT_LEADER_NAME, | |
| DEFAULT_LEADER_EMAIL, | |
| DEFAULT_LEADER_PASSWORD, | |
| DEFAULT_AUTHORITY_NAME, | |
| DEFAULT_AUTHORITY_EMAIL, | |
| DEFAULT_AUTHORITY_PASSWORD, | |
| DEFAULT_ADMIN_NAME, | |
| DEFAULT_ADMIN_EMAIL, | |
| DEFAULT_ADMIN_PASSWORD, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| FALLBACK_SQLITE_URL = "sqlite:///./janshakti.db" | |
| def _build_engine(url: str): | |
| """Create a SQLAlchemy engine for the given URL.""" | |
| is_sqlite = url.startswith("sqlite") | |
| kwargs = {"pool_pre_ping": True} | |
| if is_sqlite: | |
| kwargs["connect_args"] = {"check_same_thread": False} | |
| else: | |
| kwargs["connect_args"] = {"connect_timeout": 10} | |
| kwargs["pool_recycle"] = 300 | |
| kwargs["pool_size"] = 5 | |
| kwargs["max_overflow"] = 10 | |
| return create_engine(url, **kwargs), is_sqlite | |
| # --- Build initial engine --------------------------------------------------- | |
| _active_url = DATABASE_URL | |
| engine, IS_SQLITE = _build_engine(_active_url) | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| class Base(DeclarativeBase): | |
| pass | |
| def _switch_to_sqlite(): | |
| """Fall back to a local SQLite database when PostgreSQL is unreachable.""" | |
| global engine, SessionLocal, IS_SQLITE, _active_url # noqa: PLW0603 | |
| print("[DB] ⚠️ PostgreSQL unreachable — falling back to local SQLite database") | |
| _active_url = FALLBACK_SQLITE_URL | |
| engine, IS_SQLITE = _build_engine(FALLBACK_SQLITE_URL) | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def _ensure_user_columns_sqlite(): | |
| if not IS_SQLITE: | |
| return | |
| with engine.connect() as conn: | |
| result = conn.execute(text("PRAGMA table_info(users)")) | |
| columns = [row[1] for row in result.fetchall()] | |
| if "state" not in columns: | |
| conn.execute(text("ALTER TABLE users ADD COLUMN state VARCHAR")) | |
| if "district" not in columns: | |
| conn.execute(text("ALTER TABLE users ADD COLUMN district VARCHAR")) | |
| if "role" not in columns: | |
| conn.execute(text("ALTER TABLE users ADD COLUMN role VARCHAR NOT NULL DEFAULT 'citizen'")) | |
| if "is_active" not in columns: | |
| conn.execute(text("ALTER TABLE users ADD COLUMN is_active BOOLEAN NOT NULL DEFAULT 1")) | |
| if "last_login_at" not in columns: | |
| conn.execute(text("ALTER TABLE users ADD COLUMN last_login_at DATETIME")) | |
| conn.commit() | |
| def _ensure_user_columns_postgres(): | |
| if IS_SQLITE: | |
| return | |
| required_columns = { | |
| "state": "VARCHAR", | |
| "district": "VARCHAR", | |
| "role": "VARCHAR NOT NULL DEFAULT 'citizen'", | |
| "is_active": "BOOLEAN NOT NULL DEFAULT TRUE", | |
| "last_login_at": "TIMESTAMP WITH TIME ZONE", | |
| } | |
| with engine.connect() as conn: | |
| result = conn.execute(text( | |
| "SELECT column_name FROM information_schema.columns " | |
| "WHERE table_name = 'users'" | |
| )) | |
| existing = {row[0] for row in result.fetchall()} | |
| for name, col_type in required_columns.items(): | |
| if name not in existing: | |
| conn.execute(text(f"ALTER TABLE users ADD COLUMN {name} {col_type}")) | |
| print(f"[DB] Added missing column: users.{name}") | |
| conn.commit() | |
| def _ensure_complaint_workflow_columns(): | |
| if not IS_SQLITE: | |
| return | |
| required_columns = { | |
| "ai_risk_score": "FLOAT", | |
| "ai_risk_level": "VARCHAR", | |
| "ai_risk_factors": "TEXT", | |
| "ai_risk_reasoning": "TEXT", | |
| "ai_leader_brief": "TEXT", | |
| "citizen_user_id": "INTEGER", | |
| "citizen_language": "VARCHAR", | |
| "image_path": "VARCHAR", | |
| "audio_path": "VARCHAR", | |
| "assigned_authority": "VARCHAR", | |
| "authority_email": "VARCHAR", | |
| "leader_note": "TEXT", | |
| "authority_response": "TEXT", | |
| "citizen_update": "TEXT", | |
| "ai_breakdown": "TEXT", | |
| "ai_explanation": "TEXT", | |
| "ai_model_version": "VARCHAR", | |
| "before_meta": "TEXT", | |
| "after_meta": "TEXT", | |
| "verification_score": "FLOAT", | |
| "verification_confidence": "FLOAT", | |
| } | |
| with engine.connect() as conn: | |
| result = conn.execute(text("PRAGMA table_info(complaints)")) | |
| columns = [row[1] for row in result.fetchall()] | |
| for name, col_type in required_columns.items(): | |
| if name not in columns: | |
| conn.execute(text(f"ALTER TABLE complaints ADD COLUMN {name} {col_type}")) | |
| conn.commit() | |
| def _seed_default_leader(): | |
| from models.user import User | |
| from routers.auth import hash_password | |
| db = SessionLocal() | |
| try: | |
| existing = db.query(User).filter(User.email == DEFAULT_LEADER_EMAIL).first() | |
| if existing: | |
| if existing.role != "leader": | |
| existing.role = "leader" | |
| db.commit() | |
| return | |
| leader = User( | |
| name=DEFAULT_LEADER_NAME, | |
| email=DEFAULT_LEADER_EMAIL, | |
| phone=None, | |
| role="leader", | |
| hashed_password=hash_password(DEFAULT_LEADER_PASSWORD), | |
| ) | |
| db.add(leader) | |
| db.commit() | |
| print(f"[Auth] Default leader ready: {DEFAULT_LEADER_EMAIL}") | |
| finally: | |
| db.close() | |
| def _seed_default_authority(): | |
| from models.user import User | |
| from routers.auth import hash_password | |
| db = SessionLocal() | |
| try: | |
| existing = db.query(User).filter(User.email == DEFAULT_AUTHORITY_EMAIL).first() | |
| if existing: | |
| if existing.role != "authority": | |
| existing.role = "authority" | |
| db.commit() | |
| return | |
| authority = User( | |
| name=DEFAULT_AUTHORITY_NAME, | |
| email=DEFAULT_AUTHORITY_EMAIL, | |
| phone=None, | |
| role="authority", | |
| hashed_password=hash_password(DEFAULT_AUTHORITY_PASSWORD), | |
| ) | |
| db.add(authority) | |
| db.commit() | |
| print(f"[Auth] Default authority ready: {DEFAULT_AUTHORITY_EMAIL}") | |
| finally: | |
| db.close() | |
| def _seed_default_admin(): | |
| from models.user import User | |
| from routers.auth import hash_password | |
| db = SessionLocal() | |
| try: | |
| existing = db.query(User).filter(User.email == DEFAULT_ADMIN_EMAIL).first() | |
| if existing: | |
| changed = False | |
| if existing.role != "admin": | |
| existing.role = "admin" | |
| changed = True | |
| if existing.is_active is False: | |
| existing.is_active = True | |
| changed = True | |
| if changed: | |
| db.commit() | |
| return | |
| admin = User( | |
| name=DEFAULT_ADMIN_NAME, | |
| email=DEFAULT_ADMIN_EMAIL, | |
| phone=None, | |
| role="admin", | |
| hashed_password=hash_password(DEFAULT_ADMIN_PASSWORD), | |
| is_active=True, | |
| ) | |
| db.add(admin) | |
| db.commit() | |
| print(f"[Auth] Default admin ready: {DEFAULT_ADMIN_EMAIL}") | |
| finally: | |
| db.close() | |
| def init_db(): | |
| """Initialize database tables. If PostgreSQL is unreachable, | |
| automatically fall back to a local SQLite database so the app | |
| can still start and serve requests.""" | |
| try: | |
| Base.metadata.create_all(bind=engine) | |
| except (OperationalError, Exception) as exc: | |
| if not IS_SQLITE: | |
| print(f"[DB] PostgreSQL connection failed: {exc}") | |
| _switch_to_sqlite() | |
| # Retry with SQLite | |
| Base.metadata.create_all(bind=engine) | |
| else: | |
| raise | |
| try: | |
| if IS_SQLITE: | |
| _ensure_user_columns_sqlite() | |
| _ensure_complaint_workflow_columns() | |
| else: | |
| _ensure_user_columns_postgres() | |
| _ensure_pg_complaint_columns() | |
| _seed_default_admin() | |
| _seed_default_leader() | |
| _seed_default_authority() | |
| print(f"[DB] ✅ Database initialized successfully (using {'SQLite' if IS_SQLITE else 'PostgreSQL'})") | |
| except Exception as exc: | |
| print(f"[DB] Warning — seeding/migration issue (non-fatal): {exc}") | |
| def _ensure_pg_complaint_columns(): | |
| """Add any missing columns to the complaints table in PostgreSQL.""" | |
| required_columns = { | |
| "ai_risk_score": "FLOAT", | |
| "ai_risk_level": "VARCHAR", | |
| "ai_risk_factors": "TEXT", | |
| "ai_risk_reasoning": "TEXT", | |
| "ai_leader_brief": "TEXT", | |
| "citizen_language": "VARCHAR", | |
| "image_path": "VARCHAR", | |
| "audio_path": "VARCHAR", | |
| "assigned_authority": "VARCHAR", | |
| "authority_email": "VARCHAR", | |
| "leader_note": "TEXT", | |
| "authority_response": "TEXT", | |
| "citizen_update": "TEXT", | |
| "ai_breakdown": "TEXT", | |
| "ai_explanation": "TEXT", | |
| "ai_model_version": "VARCHAR", | |
| "before_meta": "TEXT", | |
| "after_meta": "TEXT", | |
| "verification_score": "FLOAT", | |
| "verification_confidence": "FLOAT", | |
| } | |
| with engine.connect() as conn: | |
| result = conn.execute(text( | |
| "SELECT column_name FROM information_schema.columns " | |
| "WHERE table_name = 'complaints'" | |
| )) | |
| existing = {row[0] for row in result.fetchall()} | |
| for name, col_type in required_columns.items(): | |
| if name not in existing: | |
| conn.execute(text(f"ALTER TABLE complaints ADD COLUMN {name} {col_type}")) | |
| print(f"[DB] Added missing column: complaints.{name}") | |
| conn.commit() | |