AI-Sprint-bot / app.py
FreshPixels's picture
Update app.py
03a535a verified
Raw
History Blame Contribute Delete
10.9 kB
#!/usr/bin/env python3
"""Application entry point for Telegram AI SaaS.
Performs the following startup sequence:
1. Load Settings from environment variables (lazy)
2. Initialize structured logging
3. Validate required configuration
4. Create LLMManager
5. Register OpenAI provider (if API key available)
6. Start HTTP health check server (with /health and /ready)
7. Log successful start
Shutdown signals (SIGTERM, SIGINT) are handled gracefully.
Windows-compatible signal handling is supported.
"""
from __future__ import annotations
import asyncio
import logging
import signal
import sys
from contextlib import AsyncExitStack
from typing import NoReturn
from aiohttp import web
from core.config.settings import get_settings
from core.logging.logger import get_logger, setup_logging
from core.security.validators import validate_admin_id, validate_bot_token
from llm.manager import (
AllProvidersFailedError,
FallbackPolicy,
LLMManager,
LLMManagerError,
ProviderNotFoundError,
)
from llm.providers.base_provider import LLMProviderError
from llm.providers.openai_provider import OpenAIProvider
# ---------------------------------------------------------------------------
# Application-level exception
# ---------------------------------------------------------------------------
class ConfigurationError(Exception):
    """Signal that startup configuration is absent or malformed.

    Carries a human-readable message describing which setting is
    missing or why its value was rejected.
    """
# ---------------------------------------------------------------------------
# Health states
# ---------------------------------------------------------------------------
class AppHealth:
    """Hold the readiness flags reported by the health endpoints.

    Liveness (the process is running) is kept separate from readiness
    (configuration validated and an LLM provider registered). Both
    readiness flags start out unset and are only ever switched on.
    """

    def __init__(self) -> None:
        self._config_valid: bool = False
        self._llm_managers_ready: bool = False

    def mark_config_valid(self) -> None:
        """Record that configuration validation has passed."""
        self._config_valid = True

    def mark_llm_ready(self) -> None:
        """Record that an LLM provider has been registered."""
        self._llm_managers_ready = True

    @property
    def is_healthy(self) -> bool:
        """Liveness flag; unconditionally ``True``."""
        return True

    @property
    def is_ready(self) -> bool:
        """Readiness flag; ``True`` only once both marks have been set."""
        if not self._config_valid:
            return False
        return self._llm_managers_ready

    @property
    def status_detail(self) -> dict[str, str]:
        """Map each unmet readiness condition to a short reason string.

        Returns an empty dict when every condition is satisfied.
        """
        # (flag, key, reason) — listed in the order keys must appear.
        conditions = (
            (self._config_valid, "config", "invalid"),
            (self._llm_managers_ready, "llm_providers", "none_registered"),
        )
        return {key: reason for passed, key, reason in conditions if not passed}
# Module-level state shared between the startup code and the HTTP handlers.
# _app_health begins with both readiness flags unset; _async_main() switches
# them on as startup progresses, and _ready_handler() reads them.
_app_health: AppHealth = AppHealth()
# Stays None until _async_main() constructs the manager.
_llm_manager: LLMManager | None = None
# ---------------------------------------------------------------------------
# Configuration validation
# ---------------------------------------------------------------------------
def validate_configuration() -> None:
    """Check that required settings exist and are well-formed.

    First verifies that every entry of ``settings.required_fields`` has a
    truthy value, then runs the format validators for ``BOT_TOKEN`` and
    ``ADMIN_ID``, in that order. Each failure is logged before raising.

    Raises:
        ConfigurationError: If a required variable is missing, or if a
            validator rejects a value with ``ValueError``.
    """
    log = get_logger("config_validator")
    settings = get_settings()

    absent = [key for key, val in settings.required_fields.items() if not val]
    if absent:
        log.error("missing_required_environment_variables", missing=absent)
        raise ConfigurationError(
            f"Missing required environment variables: {absent}"
        )

    format_checks = (
        ("BOT_TOKEN", validate_bot_token),
        ("ADMIN_ID", validate_admin_id),
    )
    for field_name, check in format_checks:
        try:
            # The attribute is read here, inside the loop, so ADMIN_ID is
            # not touched unless BOT_TOKEN has already passed.
            check(getattr(settings, field_name))
        except ValueError as exc:
            log.error(
                "configuration_validation_failed",
                field=field_name,
                error=str(exc),
            )
            raise ConfigurationError(str(exc)) from exc
# ---------------------------------------------------------------------------
# HTTP handlers
# ---------------------------------------------------------------------------
async def _health_handler(request: web.Request) -> web.Response:
    """Answer the liveness probe.

    The request is not inspected; the reply is always a JSON body of
    ``{"status": "ok"}`` with the default (200) status.
    """
    payload = {"status": "ok"}
    return web.json_response(payload)
async def _ready_handler(request: web.Request) -> web.Response:
    """Answer the readiness probe from the module-level health state.

    Returns:
        200 with ``{"status": "ok"}`` when ``_app_health.is_ready`` is true;
        otherwise 503 with ``{"status": "degraded", "reasons": {...}}``
        where ``reasons`` is ``_app_health.status_detail``.
    """
    if not _app_health.is_ready:
        body = {"status": "degraded", "reasons": _app_health.status_detail}
        return web.json_response(body, status=503)
    return web.json_response({"status": "ok"})
# ---------------------------------------------------------------------------
# HTTP server
# ---------------------------------------------------------------------------
async def _start_health_server(port: int) -> web.AppRunner:
    """Start a minimal HTTP server for health checks.

    Registers ``/health`` (liveness) and ``/ready`` (readiness) and listens
    on all interfaces (``0.0.0.0``) at the given port.

    Args:
        port: Port number to listen on.

    Returns:
        The aiohttp ``AppRunner``. The caller is responsible for calling
        ``cleanup()`` on it once the server has started successfully.

    Raises:
        Any exception raised while setting up the runner or binding the
        socket is propagated unchanged. If binding fails, the runner is
        cleaned up before re-raising.
    """
    app = web.Application()
    app.router.add_get("/health", _health_handler)
    app.router.add_get("/ready", _ready_handler)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, "0.0.0.0", port)
        await site.start()
    except BaseException:
        # The caller never receives the runner on this path, so nobody else
        # can release it. BaseException also covers task cancellation.
        await runner.cleanup()
        raise
    return runner
# ---------------------------------------------------------------------------
# Async main
# ---------------------------------------------------------------------------
async def _async_main() -> None:
    """Async application entry point with graceful shutdown support.

    Startup sequence:
        1. Load settings and initialize logging.
        2. Validate configuration and mark it valid in ``_app_health``.
        3. Create the module-level ``LLMManager`` with a fallback policy.
        4. Register the OpenAI provider when ``OPENAI_API_KEY`` is set;
           a registration failure is logged and startup continues.
        5. Start the health-check HTTP server.
        6. Install SIGTERM/SIGINT handlers and wait for a shutdown signal.
        7. Clean up the HTTP server and shut logging down.

    Raises:
        ConfigurationError: Propagated from ``validate_configuration``.
    """
    global _llm_manager

    log = get_logger("app")

    # 1. Load settings and set up logging
    settings = get_settings()
    setup_logging(log_level=settings.LOG_LEVEL, log_format=settings.LOG_FORMAT)
    log.info(
        "settings_loaded",
        default_provider=settings.DEFAULT_PROVIDER,
        port=settings.PORT,
        log_level=settings.LOG_LEVEL,
        log_format=settings.LOG_FORMAT,
    )
    log.info(
        "logging_initialized",
        level=settings.LOG_LEVEL,
        format=settings.LOG_FORMAT,
    )

    # 2. Validate configuration
    validate_configuration()
    log.info("configuration_validated")
    _app_health.mark_config_valid()

    # 3. Create LLMManager with fallback policy
    fallback_policy = FallbackPolicy(
        providers=["openai", "claude", "gemini", "glm", "qwen"],
        max_retries_per_provider=2,
        base_delay_seconds=1.0,
        max_delay_seconds=30.0,
    )
    _llm_manager = LLMManager(
        default_provider=settings.DEFAULT_PROVIDER,
        fallback_policy=fallback_policy,
    )
    log.info(
        "llm_manager_initialized",
        default_provider=_llm_manager.default_provider,
        registered_providers=_llm_manager.list_providers(),
    )

    # 4. Register OpenAI provider if API key is available
    if settings.OPENAI_API_KEY:
        try:
            openai_provider = OpenAIProvider(
                api_key=settings.OPENAI_API_KEY,
            )
            await _llm_manager.register_provider("openai", openai_provider)
            _app_health.mark_llm_ready()
            log.info("openai_provider_registered")
        except Exception as exc:
            # Deliberately best-effort: the process still starts so that
            # /health works, while /ready keeps reporting 503.
            log.warning(
                "openai_provider_registration_failed",
                error=str(exc),
            )
    else:
        log.warning(
            "no_openai_api_key",
            message="OPENAI_API_KEY not set — no LLM provider available",
        )

    if not _llm_manager.has_providers():
        log.warning(
            "no_providers_registered",
            message="LLMManager has no providers. "
            "Register providers before making LLM requests. "
            "The /ready endpoint will return 503.",
        )

    # 5. Start health server
    runner = await _start_health_server(settings.PORT)

    # From here on the server is live, so everything runs under the
    # try/finally: a failure while installing signal handlers must still
    # release the runner.
    try:
        log.info("health_server_started", port=settings.PORT)
        log.info(
            "application_started",
            port=settings.PORT,
            default_provider=settings.DEFAULT_PROVIDER,
            telegram_api_server=settings.TELEGRAM_API_SERVER,
            providers_count=len(_llm_manager.list_providers()),
        )

        # 6. Wait for shutdown signal (cross-platform)
        shutdown_event = asyncio.Event()

        def _set_shutdown() -> None:
            log.info("shutdown_signal_received")
            shutdown_event.set()

        loop = asyncio.get_running_loop()

        def _on_raw_signal(*_: object) -> None:
            # Handlers installed with signal.signal() must not touch loop
            # objects directly; hand the work to the loop instead.
            loop.call_soon_threadsafe(_set_shutdown)

        try:
            for sig in (signal.SIGTERM, signal.SIGINT):
                loop.add_signal_handler(sig, _set_shutdown)
        except NotImplementedError:
            # Windows fallback — signal.signal works with SIGINT on Windows
            signal.signal(signal.SIGINT, _on_raw_signal)
            try:
                signal.signal(signal.SIGTERM, _on_raw_signal)
            except (OSError, ValueError):
                # SIGTERM may not be signal-able on Windows
                pass

        await shutdown_event.wait()
    finally:
        # 7. Graceful shutdown
        log.info("application_shutting_down")
        await runner.cleanup()
        # Emit the final message BEFORE logging.shutdown(); the logging
        # system must not be used once it has been shut down.
        log.info("application_stopped")
        logging.shutdown()
# ---------------------------------------------------------------------------
# Synchronous entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the async application and map failures to a process exit code.

    A ``KeyboardInterrupt`` ends the function quietly. A
    ``ConfigurationError`` or any other ``Exception`` is reported through
    the structured logger (falling back to stderr if that itself fails),
    logging is shut down, and the process exits with status 1.
    """
    try:
        asyncio.run(_async_main())
    except KeyboardInterrupt:
        # asyncio.run() handles cleanup — nothing more to do here.
        return
    except ConfigurationError as config_err:
        # Logging may not be configured yet, so keep a plain-print fallback.
        try:
            get_logger("app").error("configuration_error", error=str(config_err))
        except Exception:
            print(f"FATAL: Configuration error: {config_err}", file=sys.stderr)
        logging.shutdown()
        sys.exit(1)
    except Exception as startup_err:
        # Logged from inside the handler so exc_info=True can pick up the
        # exception currently being handled.
        try:
            get_logger("app").error(
                "unexpected_startup_error",
                error=str(startup_err),
                exc_info=True,
            )
        except Exception:
            print(f"FATAL: Unexpected error: {startup_err}", file=sys.stderr)
        logging.shutdown()
        sys.exit(1)
# Run the application only when executed as a script, not on import.
if __name__ == "__main__":
    main()