# SACC-release / core/config.py
# cacode's picture
# Deploy updated SCU course catcher
# e28c9e4 verified
from __future__ import annotations
import json
import os
import secrets
import shutil
from dataclasses import dataclass
from pathlib import Path
from cryptography.fernet import Fernet
# Fixed tuning defaults. AppConfig.load() copies each of these unchanged into
# the AppConfig field with the same lower-cased name.
# NOTE(review): the per-constant notes are inferred from the names only —
# confirm against the code that consumes the corresponding AppConfig fields.
DEFAULT_PARALLEL_LIMIT = 2  # presumably the default cap on concurrent tasks
POLL_INTERVAL_SECONDS = 10  # seconds, per the name
LOGIN_RETRY_LIMIT = 6  # presumably max login attempts before giving up
TASK_BACKOFF_SECONDS = 6  # seconds, per the name
BROWSER_PAGE_TIMEOUT = 40  # unit not stated here; presumably seconds — TODO confirm
LOGS_PAGE_SIZE = 180  # presumably log entries per page — TODO confirm
SELENIUM_ERROR_LIMIT = 5
SELENIUM_RESTART_LIMIT = 5
SUBMIT_CAPTCHA_RETRY_LIMIT = 5
def _pick_binary(env_name: str, *fallbacks: str) -> str:
    """Resolve the location of an external executable.

    Resolution order:

    1. The value of the environment variable ``env_name`` when it is set and
       non-empty. It is returned as-is, without checking that it exists.
    2. The first non-empty entry of ``fallbacks`` that either exists as a
       filesystem path (returned unchanged) or can be located with
       ``shutil.which`` (the located path is returned).
    3. The first fallback unchanged, or ``""`` when no fallbacks were given.
    """
    override = os.getenv(env_name)
    if override:
        return override

    # filter(None, ...) skips empty candidates.
    for name in filter(None, fallbacks):
        if Path(name).exists():
            return name
        if located := shutil.which(name):
            return located

    # Nothing could be verified: hand back the preferred guess, if any.
    if not fallbacks:
        return ""
    return fallbacks[0]
def _read_secret_payload(secret_file: Path) -> dict[str, str]:
    """Return the string-valued entries stored in ``secret_file``.

    Returns an empty dict when the file is missing, cannot be read, is not
    valid UTF-8, is not valid JSON, or does not contain a JSON object, so the
    caller can fall back to generating fresh secrets.
    """
    try:
        raw_payload = json.loads(secret_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError *and* the UnicodeDecodeError
        # that read_text() raises for a file that is not valid UTF-8.
        return {}
    if not isinstance(raw_payload, dict):
        return {}
    return {
        str(key): value
        for key, value in raw_payload.items()
        if isinstance(value, str)
    }


def _load_or_create_internal_secrets(data_dir: Path) -> tuple[str, str]:
    """Load the persisted session secret and encryption key, creating them if needed.

    The secrets live in ``<data_dir>/.app_secrets.json``. A missing, unreadable
    or corrupt file, or a missing/blank entry, causes the affected secret(s) to
    be regenerated and the file to be rewritten with both values.

    Args:
        data_dir: Existing directory that holds the secrets file.

    Returns:
        ``(session_secret, encryption_key)`` with surrounding whitespace
        stripped from values read from disk.

    Raises:
        OSError: If the secrets file has to be written and the write fails.
    """
    secret_file = data_dir / ".app_secrets.json"
    payload = _read_secret_payload(secret_file)

    session_secret = payload.get("session_secret", "").strip()
    encryption_key = payload.get("encryption_key", "").strip()

    changed = False
    if not session_secret:
        session_secret = secrets.token_hex(32)
        changed = True
    if not encryption_key:
        encryption_key = Fernet.generate_key().decode("utf-8")
        changed = True

    if changed or not secret_file.exists():
        secret_file.write_text(
            json.dumps(
                {
                    "session_secret": session_secret,
                    "encryption_key": encryption_key,
                },
                ensure_ascii=False,
                indent=2,
            ),
            encoding="utf-8",
        )
        try:
            # Best effort: keep the key material readable by the owner only.
            secret_file.chmod(0o600)
        except OSError:
            # Some filesystems reject chmod; the secrets are still usable.
            pass

    return session_secret, encryption_key
@dataclass(slots=True)
class AppConfig:
root_dir: Path
data_dir: Path
db_path: Path
session_secret: str
encryption_key: str
super_admin_username: str
super_admin_password: str
default_parallel_limit: int
poll_interval_seconds: int
login_retry_limit: int
task_backoff_seconds: int
browser_page_timeout: int
logs_page_size: int
selenium_error_limit: int
selenium_restart_limit: int
submit_captcha_retry_limit: int
chrome_binary: str
chromedriver_path: str
@classmethod
def load(cls) -> "AppConfig":
root_dir = Path(__file__).resolve().parent.parent
data_dir = Path(os.getenv("DATA_DIR", str(root_dir / "data"))).resolve()
data_dir.mkdir(parents=True, exist_ok=True)
super_admin_username = os.getenv("ADMIN", "superadmin")
super_admin_password = os.getenv("PASSWORD", "change-me-in-hf-space")
session_secret, encryption_key = _load_or_create_internal_secrets(data_dir)
return cls(
root_dir=root_dir,
data_dir=data_dir,
db_path=data_dir / "course_catcher.db",
session_secret=session_secret,
encryption_key=encryption_key,
super_admin_username=super_admin_username,
super_admin_password=super_admin_password,
default_parallel_limit=DEFAULT_PARALLEL_LIMIT,
poll_interval_seconds=POLL_INTERVAL_SECONDS,
login_retry_limit=LOGIN_RETRY_LIMIT,
task_backoff_seconds=TASK_BACKOFF_SECONDS,
browser_page_timeout=BROWSER_PAGE_TIMEOUT,
logs_page_size=LOGS_PAGE_SIZE,
selenium_error_limit=SELENIUM_ERROR_LIMIT,
selenium_restart_limit=SELENIUM_RESTART_LIMIT,
submit_captcha_retry_limit=SUBMIT_CAPTCHA_RETRY_LIMIT,
chrome_binary=_pick_binary(
"CHROME_BIN",
"/usr/bin/chromium",
"/usr/bin/chromium-browser",
"chromium",
"chromium-browser",
),
chromedriver_path=_pick_binary(
"CHROMEDRIVER_PATH",
"/usr/bin/chromedriver",
"chromedriver",
),
)