#!/usr/bin/env python3
"""Application entry point for Telegram AI SaaS.

Performs the following startup sequence:

1. Load Settings from environment variables (lazy)
2. Initialize structured logging
3. Validate required configuration
4. Create LLMManager
5. Register OpenAI provider (if API key available)
6. Start HTTP health check server (with /health and /ready)
7. Log successful start

Shutdown signals (SIGTERM, SIGINT) are handled gracefully.
Windows-compatible signal handling is supported.
"""

from __future__ import annotations

import asyncio
import logging
import signal
import sys
from contextlib import AsyncExitStack
from typing import NoReturn

from aiohttp import web

from core.config.settings import get_settings
from core.logging.logger import get_logger, setup_logging
from core.security.validators import validate_admin_id, validate_bot_token
from llm.manager import (
    AllProvidersFailedError,
    FallbackPolicy,
    LLMManager,
    LLMManagerError,
    ProviderNotFoundError,
)
from llm.providers.base_provider import LLMProviderError
from llm.providers.openai_provider import OpenAIProvider


# ---------------------------------------------------------------------------
# Application-level exception
# ---------------------------------------------------------------------------


class ConfigurationError(Exception):
    """Raised when required configuration is missing or invalid."""


# ---------------------------------------------------------------------------
# Health states
# ---------------------------------------------------------------------------


class AppHealth:
    """Tracks application readiness state for health endpoints.

    Separates infrastructure liveness (process alive) from business
    capability readiness (providers registered, config valid).

    Both flags start as ``False`` and are only ever switched on by the
    ``mark_*`` methods; nothing in this class switches them back off.
    """

    def __init__(self) -> None:
        self._config_valid: bool = False
        self._llm_managers_ready: bool = False

    def mark_config_valid(self) -> None:
        """Record that configuration validation has passed."""
        self._config_valid = True

    def mark_llm_ready(self) -> None:
        """Record that an LLM provider has been registered."""
        self._llm_managers_ready = True

    @property
    def is_healthy(self) -> bool:
        """Liveness: process is alive and basic infrastructure works."""
        return True

    @property
    def is_ready(self) -> bool:
        """Readiness: can serve LLM requests."""
        return self._config_valid and self._llm_managers_ready

    @property
    def status_detail(self) -> dict[str, str]:
        """Return a dict with readiness details.

        Contains one entry per unmet readiness condition; it is empty
        when both conditions are satisfied.
        """
        details: dict[str, str] = {}
        if not self._config_valid:
            details["config"] = "invalid"
        if not self._llm_managers_ready:
            details["llm_providers"] = "none_registered"
        return details


# Global health state — set during startup
_app_health: AppHealth = AppHealth()
_llm_manager: LLMManager | None = None


# ---------------------------------------------------------------------------
# Configuration validation
# ---------------------------------------------------------------------------


def validate_configuration() -> None:
    """Validate all required configuration fields.

    Checks that required environment variables are present and that
    their values pass format validation.

    Raises:
        ConfigurationError: If any required variable is missing or invalid.
    """
    log = get_logger("config_validator")
    settings = get_settings()

    # Presence check first: any falsy required value counts as missing.
    required = settings.required_fields
    missing = [name for name, value in required.items() if not value]
    if missing:
        log.error(
            "missing_required_environment_variables",
            missing=missing,
        )
        raise ConfigurationError(
            f"Missing required environment variables: {missing}"
        )

    # Format checks: the validators signal a bad value with ValueError,
    # which is translated into the application-level ConfigurationError.
    try:
        validate_bot_token(settings.BOT_TOKEN)
    except ValueError as exc:
        log.error(
            "configuration_validation_failed",
            field="BOT_TOKEN",
            error=str(exc),
        )
        raise ConfigurationError(str(exc)) from exc

    try:
        validate_admin_id(settings.ADMIN_ID)
    except ValueError as exc:
        log.error(
            "configuration_validation_failed",
            field="ADMIN_ID",
            error=str(exc),
        )
        raise ConfigurationError(str(exc)) from exc


# ---------------------------------------------------------------------------
# HTTP handlers
# ---------------------------------------------------------------------------


async def _health_handler(request: web.Request) -> web.Response:
    """Liveness probe — process is alive and responsive.

    Always returns 200 while the process is running.
    """
    return web.json_response({"status": "ok"})


async def _ready_handler(request: web.Request) -> web.Response:
    """Readiness probe — can serve LLM requests.

    Returns 200 if providers are registered and config is valid.
    Returns 503 with details if not ready.
    """
    if _app_health.is_ready:
        return web.json_response({"status": "ok"})

    details = _app_health.status_detail
    return web.json_response(
        {"status": "degraded", "reasons": details},
        status=503,
    )


# ---------------------------------------------------------------------------
# HTTP server
# ---------------------------------------------------------------------------


async def _start_health_server(port: int) -> web.AppRunner:
    """Start a minimal HTTP server for health checks.

    Registers /health (liveness) and /ready (readiness) endpoints.

    Args:
        port: Port number to listen on.

    Returns:
        The aiohttp AppRunner (caller is responsible for cleanup).

    Raises:
        Exception: Whatever ``site.start()`` raises (for example when the
            port cannot be bound). The runner is cleaned up before the
            error propagates, so nothing is left for the caller to release.
    """
    app = web.Application()
    app.router.add_get("/health", _health_handler)
    app.router.add_get("/ready", _ready_handler)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, "0.0.0.0", port)
        await site.start()
    except BaseException:
        # The caller never receives the runner on failure, so it must be
        # released here; the original error is re-raised untouched.
        await runner.cleanup()
        raise
    return runner


# ---------------------------------------------------------------------------
# Async main
# ---------------------------------------------------------------------------


async def _async_main() -> None:
    """Async application entry point with graceful shutdown support.

    Runs the startup sequence, then blocks until a shutdown signal
    arrives and tears the health server down again.

    ``logging.shutdown()`` is deliberately NOT called here: ``main()``
    owns it so that every log line, including error reporting done by
    ``main()`` itself, is emitted before logging is shut down.

    Raises:
        ConfigurationError: If configuration validation fails.
    """
    global _llm_manager

    log = get_logger("app")

    # 1. Setup logging (idempotent, thread-safe)
    settings = get_settings()
    setup_logging(log_level=settings.LOG_LEVEL, log_format=settings.LOG_FORMAT)
    log.info(
        "settings_loaded",
        default_provider=settings.DEFAULT_PROVIDER,
        port=settings.PORT,
        log_level=settings.LOG_LEVEL,
        log_format=settings.LOG_FORMAT,
    )
    log.info(
        "logging_initialized",
        level=settings.LOG_LEVEL,
        format=settings.LOG_FORMAT,
    )

    # 2. Validate configuration
    validate_configuration()
    log.info("configuration_validated")
    _app_health.mark_config_valid()

    # 3. Create LLMManager with fallback policy
    fallback_policy = FallbackPolicy(
        providers=["openai", "claude", "gemini", "glm", "qwen"],
        max_retries_per_provider=2,
        base_delay_seconds=1.0,
        max_delay_seconds=30.0,
    )
    _llm_manager = LLMManager(
        default_provider=settings.DEFAULT_PROVIDER,
        fallback_policy=fallback_policy,
    )
    log.info(
        "llm_manager_initialized",
        default_provider=_llm_manager.default_provider,
        registered_providers=_llm_manager.list_providers(),
    )

    # 4. Register OpenAI provider if API key is available
    if settings.OPENAI_API_KEY:
        try:
            openai_provider = OpenAIProvider(
                api_key=settings.OPENAI_API_KEY,
            )
            await _llm_manager.register_provider("openai", openai_provider)
            _app_health.mark_llm_ready()
            log.info("openai_provider_registered")
        except Exception as exc:
            # Deliberate best-effort: a failed provider must not abort
            # startup; /ready keeps reporting 503 instead.
            log.warning(
                "openai_provider_registration_failed",
                error=str(exc),
            )
    else:
        log.warning(
            "no_openai_api_key",
            message="OPENAI_API_KEY not set — no LLM provider available",
        )

    if not _llm_manager.has_providers():
        log.warning(
            "no_providers_registered",
            message="LLMManager has no providers. "
            "Register providers before making LLM requests. "
            "The /ready endpoint will return 503.",
        )

    # 5. Start health server
    runner = await _start_health_server(settings.PORT)

    # Everything after the server is up runs under try/finally so the
    # runner is released even if signal installation itself fails.
    try:
        log.info("health_server_started", port=settings.PORT)
        log.info(
            "application_started",
            port=settings.PORT,
            default_provider=settings.DEFAULT_PROVIDER,
            telegram_api_server=settings.TELEGRAM_API_SERVER,
            providers_count=len(_llm_manager.list_providers()),
        )

        # 6. Wait for shutdown signal (cross-platform)
        shutdown_event = asyncio.Event()

        def _set_shutdown() -> None:
            log.info("shutdown_signal_received")
            shutdown_event.set()

        loop = asyncio.get_running_loop()

        def _set_shutdown_from_signal(*_: object) -> None:
            # Handlers installed with signal.signal() may not touch the
            # event loop directly; hand the work over to the loop, which
            # also wakes it up if it is idle. A signal arriving after the
            # loop has been closed is ignored.
            if not loop.is_closed():
                loop.call_soon_threadsafe(_set_shutdown)

        try:
            for sig in (signal.SIGTERM, signal.SIGINT):
                loop.add_signal_handler(sig, _set_shutdown)
        except NotImplementedError:
            # Windows fallback — signal.signal works with SIGINT on Windows
            signal.signal(signal.SIGINT, _set_shutdown_from_signal)
            try:
                signal.signal(signal.SIGTERM, _set_shutdown_from_signal)
            except (OSError, ValueError):
                # SIGTERM may not be signal-able on Windows
                pass

        # 7. Graceful shutdown
        await shutdown_event.wait()
    finally:
        log.info("application_shutting_down")
        await runner.cleanup()
        log.info("application_stopped")


# ---------------------------------------------------------------------------
# Synchronous entry point
# ---------------------------------------------------------------------------


def main() -> None:
    """Application entry point — uses asyncio.run() for proper lifecycle.

    Exits with status 1 on a configuration error or any unexpected
    exception; returns normally on a clean shutdown or Ctrl+C. Logging
    is shut down exactly once, after all log output, on every path.
    """
    try:
        asyncio.run(_async_main())
    except ConfigurationError as exc:
        try:
            # Logging may not be configured yet — try structlog, fallback to print
            get_logger("app").error(
                "configuration_error",
                error=str(exc),
            )
        except Exception:
            print(f"FATAL: Configuration error: {exc}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        # asyncio.run() handles cleanup — just exit cleanly
        pass
    except Exception as exc:
        try:
            get_logger("app").error(
                "unexpected_startup_error",
                error=str(exc),
                exc_info=True,
            )
        except Exception:
            print(f"FATAL: Unexpected error: {exc}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Runs after every branch above (including sys.exit), so no log
        # call is ever made after the logging system has been shut down.
        logging.shutdown()


if __name__ == "__main__":
    main()