SuperAI_Forecast / backend /system_integration.py
Thang6822
Stabilize workspace UX and deployment flow
2eec8c3
from __future__ import annotations
import os
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Mapping, MutableMapping, Optional
from dotenv import set_key
# Environment variable holding the "start with Windows" flag; it is parsed by
# resolve_start_with_windows_enabled() and written by update_startup_preference().
AUTOSTART_ENV_KEY = "AIFORECAST_START_WITH_WINDOWS"
# Value name looked up under the HKCU Run key. The status code treats a value
# found here as a legacy registration, and sync_startup_preference() deletes it.
AUTOSTART_REGISTRY_VALUE_NAME = "SuperAIForecastRunBat"
# File name of the launcher script placed in the user's Startup folder.
AUTOSTART_STARTUP_FILE_NAME = "SuperAIForecastRunBat.cmd"
# Environment variable that may carry an explicit browser executable path;
# when set it is the first candidate tried.
BROWSER_PATH_ENV_KEY = "AIFORECAST_BROWSER_PATH"
# Registry key path (opened under HKEY_CURRENT_USER) for per-user autostart entries.
WINDOWS_RUN_KEY_PATH = r"Software\Microsoft\Windows\CurrentVersion\Run"
# Browser name reported in the "preferred" field of BrowserLaunchStatus.
DEFAULT_BROWSER_PREFERENCE = "opera"
@dataclass(frozen=True)
class BrowserLaunchStatus:
    """Immutable description of the preferred-browser lookup result.

    Attributes:
        preferred: Name of the browser the application prefers.
        available: Whether an executable for that browser was located.
        executable: Path of the located executable, or None when not found.
    """

    preferred: str
    available: bool
    executable: Optional[str]

    def to_payload(self) -> dict[str, object]:
        """Return the fields as a plain dict, in declaration order."""
        payload: dict[str, object] = {}
        for field_name in ("preferred", "available", "executable"):
            payload[field_name] = getattr(self, field_name)
        return payload
@dataclass(frozen=True)
class StartupPreferenceStatus:
    """Immutable snapshot of the "start with Windows" configuration.

    The field meanings below reflect how get_startup_preference_status()
    populates this class.

    Attributes:
        supported: True when the OS name is "nt".
        enabled: Value of the autostart flag resolved from the environment.
        registered: True when the Startup-folder launcher matches the expected
            contents and no legacy Run-key value is present.
        command: Path of run.bat inside the resolved project root.
        env_key: Name of the environment variable holding the flag.
        registry_value_name: Name of the legacy Run-key value.
        browser: Result of the preferred-browser lookup.
        message: Human-readable status text.
    """

    supported: bool
    enabled: bool
    registered: bool
    command: Optional[str]
    env_key: str
    registry_value_name: str
    browser: BrowserLaunchStatus
    message: str

    def to_payload(self) -> dict[str, object]:
        """Return the snapshot as a plain dict with the browser status nested."""
        scalar_fields = (
            "supported",
            "enabled",
            "registered",
            "command",
            "env_key",
            "registry_value_name",
        )
        payload: dict[str, object] = {name: getattr(self, name) for name in scalar_fields}
        # The nested dataclass is flattened through its own serializer.
        payload["browser"] = self.browser.to_payload()
        payload["message"] = self.message
        return payload
def _read_bool_env_value(value: Optional[str], *, default: bool) -> bool:
    """Interpret an environment-variable string as a boolean.

    Args:
        value: Raw string, or None when the variable is absent.
        default: Result for None, blank, or unrecognized input.

    Returns:
        True for "1"/"true"/"yes"/"on", False for "0"/"false"/"no"/"off"
        (case-insensitive, surrounding whitespace ignored), otherwise `default`.
    """
    token = "" if value is None else value.strip().lower()
    if token in {"1", "true", "yes", "on"}:
        return True
    if token in {"0", "false", "no", "off"}:
        return False
    # Covers None, empty/blank strings and anything unrecognized.
    return default
def resolve_start_with_windows_enabled(
    env: Mapping[str, str] | None = None,
    *,
    default: bool = True,
) -> bool:
    """Return whether the autostart flag is switched on.

    Args:
        env: Mapping to read the flag from; the process environment when None.
        default: Result when the flag is absent, blank, or unrecognized.
    """
    source = env if env is not None else os.environ
    raw_flag = source.get(AUTOSTART_ENV_KEY)
    return _read_bool_env_value(raw_flag, default=default)
def resolve_windows_startup_folder(env: Mapping[str, str] | None = None) -> Path:
runtime_env = os.environ if env is None else env
app_data = runtime_env.get("APPDATA", "").strip()
if app_data:
return Path(app_data) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
return (
Path.home()
/ "AppData"
/ "Roaming"
/ "Microsoft"
/ "Windows"
/ "Start Menu"
/ "Programs"
/ "Startup"
)
def build_windows_startup_launcher_path(
    project_root: Path,
    env: Mapping[str, str] | None = None,
) -> Path:
    """Return where the autostart launcher script lives in the Startup folder.

    Args:
        project_root: Accepted but not used; the result does not depend on it.
        env: Mapping used to locate the Startup folder; process environment
            when None.
    """
    # project_root is intentionally ignored here.
    startup_dir = resolve_windows_startup_folder(env)
    return startup_dir / AUTOSTART_STARTUP_FILE_NAME
def build_windows_startup_launcher_contents(project_root: Path) -> str:
    """Return the text of the launcher script that starts run.bat.

    Args:
        project_root: Project directory; it is resolved to an absolute path.

    Returns:
        Two newline-terminated lines: `@echo off` and a `start` command that
        runs `<root>/run.bat` with `<root>` as the working directory.
    """
    root = Path(project_root).resolve()
    script = root / "run.bat"
    launch_line = f'start "" /d "{root}" "{script}"'
    return f"@echo off\n{launch_line}\n"
def _read_text_file(path: Path) -> Optional[str]:
    """Return the UTF-8 text of `path`, or None when the file does not exist."""
    try:
        with path.open("r", encoding="utf-8") as handle:
            return handle.read()
    except FileNotFoundError:
        # A missing file is an expected state, not an error.
        return None
def _write_text_file(path: Path, content: str) -> None:
    """Write `content` to `path` as UTF-8, creating parent directories first."""
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(content)
def _delete_file(path: Path) -> None:
    """Remove `path`; a file that is already absent is not an error."""
    path.unlink(missing_ok=True)
def _normalize_text_lines(value: Optional[str]) -> Optional[str]:
    """Convert CRLF and lone CR line endings to LF; None passes through."""
    if value is not None:
        # CRLF must be collapsed first so it does not turn into two newlines.
        without_crlf = value.replace("\r\n", "\n")
        return without_crlf.replace("\r", "\n")
    return None
def iter_preferred_browser_candidates(env: Mapping[str, str] | None = None) -> list[str]:
runtime_env = os.environ if env is None else env
candidates: list[str] = []
custom_browser_path = runtime_env.get(BROWSER_PATH_ENV_KEY, "").strip()
if custom_browser_path:
candidates.append(custom_browser_path)
local_app_data = runtime_env.get("LOCALAPPDATA", "").strip()
program_files = runtime_env.get("ProgramFiles", "").strip()
program_files_x86 = runtime_env.get("ProgramFiles(x86)", "").strip()
common_candidates = [
Path(local_app_data) / "Programs" / "Opera" / "opera.exe",
Path(local_app_data) / "Programs" / "Opera" / "launcher.exe",
Path(program_files) / "Opera" / "opera.exe",
Path(program_files) / "Opera" / "launcher.exe",
Path(program_files_x86) / "Opera" / "opera.exe",
Path(program_files_x86) / "Opera" / "launcher.exe",
Path(local_app_data) / "Programs" / "Opera GX" / "opera.exe",
Path(local_app_data) / "Programs" / "Opera GX" / "launcher.exe",
]
candidates.extend(str(path) for path in common_candidates if str(path).strip())
for binary_name in ("opera.exe", "opera", "launcher.exe"):
binary_path = shutil.which(binary_name)
if binary_path:
candidates.append(binary_path)
deduped: list[str] = []
seen: set[str] = set()
for candidate in candidates:
normalized = str(candidate).strip()
if not normalized:
continue
lowered = normalized.lower()
if lowered in seen:
continue
seen.add(lowered)
deduped.append(normalized)
return deduped
def get_browser_launch_status(
    env: Mapping[str, str] | None = None,
    *,
    path_exists: Callable[[Path], bool] | None = None,
) -> BrowserLaunchStatus:
    """Report the first preferred-browser candidate that exists.

    Args:
        env: Mapping forwarded to the candidate lookup; process environment
            when None.
        path_exists: Predicate used to test each candidate; falls back to
            `Path.exists`.

    Returns:
        A status with `available=True` and the matching executable path, or
        `available=False` and `executable=None` when no candidate exists.
    """
    probe = path_exists if path_exists else Path.exists
    candidate_paths = (Path(raw) for raw in iter_preferred_browser_candidates(env))
    # Candidates are already in priority order, so the first hit wins.
    found = next((path for path in candidate_paths if probe(path)), None)
    if found is None:
        return BrowserLaunchStatus(
            preferred=DEFAULT_BROWSER_PREFERENCE,
            available=False,
            executable=None,
        )
    return BrowserLaunchStatus(
        preferred=DEFAULT_BROWSER_PREFERENCE,
        available=True,
        executable=str(found),
    )
def open_url_with_preferred_browser(
    url: str,
    *,
    env: Mapping[str, str] | None = None,
    path_exists: Callable[[Path], bool] | None = None,
    popen: Callable[..., object] = subprocess.Popen,
) -> bool:
    """Launch `url` in the preferred browser.

    Args:
        url: Address passed to the browser as its only argument.
        env: Mapping used for the browser lookup; process environment when None.
        path_exists: Predicate used for the browser lookup.
        popen: Process launcher, called with the argument list and with stdout
            and stderr redirected to DEVNULL.

    Returns:
        True when the launcher was invoked without raising OSError; False when
        no browser is available or the launch raised OSError.
    """
    status = get_browser_launch_status(env, path_exists=path_exists)
    executable = status.executable
    if not (status.available and executable):
        return False

    try:
        popen(
            [executable, url],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # The caller receives False rather than an exception on a failed launch.
        return False
    return True
def _read_windows_run_value(
    value_name: str,
    *,
    key_path: str = WINDOWS_RUN_KEY_PATH,
) -> Optional[str]:
    """Read a value under `key_path` in HKEY_CURRENT_USER.

    Args:
        value_name: Name of the registry value to read.
        key_path: Registry key path relative to HKEY_CURRENT_USER.

    Returns:
        The value converted to str, or None when the key or the value raises
        FileNotFoundError.
    """
    # winreg only exists on Windows, so it is imported at call time.
    import winreg

    try:
        run_key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_READ)
        with run_key:
            data, _value_type = winreg.QueryValueEx(run_key, value_name)
    except FileNotFoundError:
        return None
    return str(data)
def _delete_windows_run_value(
    value_name: str,
    *,
    key_path: str = WINDOWS_RUN_KEY_PATH,
) -> None:
    """Delete a value under `key_path` in HKEY_CURRENT_USER.

    A FileNotFoundError from opening the key or deleting the value is
    ignored, so removing an entry that is not there is a no-op.

    Args:
        value_name: Name of the registry value to delete.
        key_path: Registry key path relative to HKEY_CURRENT_USER.
    """
    # winreg only exists on Windows, so it is imported at call time.
    import winreg

    try:
        run_key = winreg.OpenKey(
            winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE
        )
        with run_key:
            winreg.DeleteValue(run_key, value_name)
    except FileNotFoundError:
        # Nothing to delete.
        return None
def get_startup_preference_status(
    project_root: Path,
    *,
    env: Mapping[str, str] | None = None,
    os_name: Optional[str] = None,
    registry_reader: Callable[[str], Optional[str]] = _read_windows_run_value,
    path_exists: Callable[[Path], bool] | None = None,
    startup_file_reader: Callable[[Path], Optional[str]] = _read_text_file,
) -> StartupPreferenceStatus:
    """Inspect the current autostart state without changing anything.

    Args:
        project_root: Project directory containing run.bat.
        env: Mapping to read settings from; process environment when None.
        os_name: OS identifier to evaluate; `os.name` when None.
        registry_reader: Reads a Run-key value by name.
        path_exists: Predicate forwarded to the browser lookup.
        startup_file_reader: Reads the launcher file, returning None if absent.

    Returns:
        A StartupPreferenceStatus. `registered` is True only when the OS is
        supported, the launcher file matches the expected contents (line
        endings normalized) and no legacy Run-key value is present.
    """
    platform_name = os_name if os_name is not None else os.name
    active_env = env if env is not None else os.environ
    supported = platform_name == "nt"
    enabled = resolve_start_with_windows_enabled(active_env, default=True)

    launcher_path = build_windows_startup_launcher_path(project_root, active_env)
    expected_contents = _normalize_text_lines(
        build_windows_startup_launcher_contents(project_root)
    )
    command = str(Path(project_root).resolve() / "run.bat")

    # The launcher file and the registry are only consulted on a supported OS.
    launcher_matches = False
    legacy_present = False
    if supported:
        on_disk = _normalize_text_lines(startup_file_reader(launcher_path))
        launcher_matches = on_disk == expected_contents
        legacy_present = bool(registry_reader(AUTOSTART_REGISTRY_VALUE_NAME))
    registered = launcher_matches and not legacy_present

    browser = get_browser_launch_status(active_env, path_exists=path_exists)

    if not supported:
        message = "Tu dong khoi dong file .bat chi duoc ho tro tren Windows."
    elif not enabled:
        message = "run.bat se khong khoi dong cung Windows."
    elif launcher_matches and legacy_present:
        message = "Phat hien logic khoi dong cu trong Windows Run. Launcher se xoa no va giu Startup folder cho run.bat."
    elif registered:
        message = "run.bat dang duoc dang ky trong Startup folder cua Windows."
    else:
        message = "Da bat tu dong khoi dong. Launcher se tao file Startup mo run.bat."

    return StartupPreferenceStatus(
        supported=supported,
        enabled=enabled,
        registered=registered,
        command=command,
        env_key=AUTOSTART_ENV_KEY,
        registry_value_name=AUTOSTART_REGISTRY_VALUE_NAME,
        browser=browser,
        message=message,
    )
def sync_startup_preference(
    project_root: Path,
    *,
    env: MutableMapping[str, str] | None = None,
    os_name: Optional[str] = None,
    registry_reader: Callable[[str], Optional[str]] = _read_windows_run_value,
    registry_deleter: Callable[[str], None] = _delete_windows_run_value,
    path_exists: Callable[[Path], bool] | None = None,
    startup_file_reader: Callable[[Path], Optional[str]] = _read_text_file,
    startup_file_writer: Callable[[Path, str], None] = _write_text_file,
    startup_file_deleter: Callable[[Path], None] = _delete_file,
) -> StartupPreferenceStatus:
    """Bring the system's autostart state in line with the configured flag.

    On a supported OS the legacy Run-key value is always deleted; the launcher
    file is then written when autostart is enabled, or deleted when it is not.
    On an unsupported OS nothing is modified.

    Args:
        project_root: Project directory containing run.bat.
        env: Mapping to read settings from; process environment when None.
        os_name: OS identifier to evaluate; `os.name` when None.
        registry_reader: Reads a Run-key value by name.
        registry_deleter: Deletes a Run-key value by name.
        path_exists: Predicate forwarded to the browser lookup.
        startup_file_reader: Reads the launcher file.
        startup_file_writer: Writes the launcher file.
        startup_file_deleter: Deletes the launcher file.

    Returns:
        The status re-read after any changes were applied.
    """
    active_env = env if env is not None else os.environ
    # The same inspection runs before and after the changes.
    status_kwargs = dict(
        env=active_env,
        os_name=os_name,
        registry_reader=registry_reader,
        path_exists=path_exists,
        startup_file_reader=startup_file_reader,
    )

    before = get_startup_preference_status(project_root, **status_kwargs)
    if before.supported:
        launcher_path = build_windows_startup_launcher_path(project_root, active_env)
        launcher_contents = build_windows_startup_launcher_contents(project_root)
        # The Run-key entry is removed unconditionally.
        registry_deleter(before.registry_value_name)
        if before.enabled:
            startup_file_writer(launcher_path, launcher_contents)
        else:
            startup_file_deleter(launcher_path)

    return get_startup_preference_status(project_root, **status_kwargs)
def update_startup_preference(
    project_root: Path,
    env_file: Path,
    *,
    enabled: bool,
    env: MutableMapping[str, str] | None = None,
    os_name: Optional[str] = None,
    env_writer: Callable[..., tuple[Optional[bool], str, str]] = set_key,
    registry_reader: Callable[[str], Optional[str]] = _read_windows_run_value,
    registry_deleter: Callable[[str], None] = _delete_windows_run_value,
    path_exists: Callable[[Path], bool] | None = None,
    startup_file_reader: Callable[[Path], Optional[str]] = _read_text_file,
    startup_file_writer: Callable[[Path, str], None] = _write_text_file,
    startup_file_deleter: Callable[[Path], None] = _delete_file,
) -> StartupPreferenceStatus:
    """Persist a new autostart choice and apply it to the system.

    The flag is stored as "1" or "0" in the given environment mapping and in
    `env_file` (whose parent directory is created if needed), then
    sync_startup_preference() applies it.

    Args:
        project_root: Project directory containing run.bat.
        env_file: Dotenv file that receives the flag.
        enabled: Desired autostart state.
        env: Mapping to update; the process environment when None.
        os_name: OS identifier to evaluate; `os.name` when None.
        env_writer: Called as `env_writer(path, key, value, quote_mode="never")`.
        registry_reader: Reads a Run-key value by name.
        registry_deleter: Deletes a Run-key value by name.
        path_exists: Predicate forwarded to the browser lookup.
        startup_file_reader: Reads the launcher file.
        startup_file_writer: Writes the launcher file.
        startup_file_deleter: Deletes the launcher file.

    Returns:
        The status reported by sync_startup_preference().
    """
    active_env = env if env is not None else os.environ
    if enabled:
        active_env[AUTOSTART_ENV_KEY] = "1"
    else:
        active_env[AUTOSTART_ENV_KEY] = "0"

    env_file.parent.mkdir(parents=True, exist_ok=True)
    env_writer(
        str(env_file),
        AUTOSTART_ENV_KEY,
        active_env[AUTOSTART_ENV_KEY],
        quote_mode="never",
    )

    return sync_startup_preference(
        project_root,
        env=active_env,
        os_name=os_name,
        registry_reader=registry_reader,
        registry_deleter=registry_deleter,
        path_exists=path_exists,
        startup_file_reader=startup_file_reader,
        startup_file_writer=startup_file_writer,
        startup_file_deleter=startup_file_deleter,
    )