Spaces:
Sleeping
Sleeping
File size: 1,775 Bytes
"""
Spark Session Manager for OpenTriage AI Engine.
Provides a singleton Spark session for distributed processing.
"""
import os
import logging
from pyspark.sql import SparkSession
from config.settings import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Cached Spark session shared across calls; None until
# get_or_create_spark_session() stores one, reset by stop_spark_session().
_spark_session = None
def get_or_create_spark_session() -> SparkSession:
    """Return the shared Spark session, building it on first use.

    The first call builds a session from the ``SPARK_*`` values on
    ``settings`` and stores it in the module-level ``_spark_session``;
    later calls return the stored instance without touching the builder.

    Returns:
        SparkSession: The cached or newly created Spark session.

    Raises:
        Exception: Any error raised while building or configuring the
            session is logged (with traceback) and re-raised unchanged.
    """
    global _spark_session

    if _spark_session is not None:
        return _spark_session

    try:
        builder = (
            SparkSession.builder
            .appName(settings.SPARK_APP_NAME)
            .master(settings.SPARK_MASTER)
        )

        # Memory settings are optional; falsy values are simply not applied.
        if settings.SPARK_DRIVER_MEMORY:
            builder = builder.config("spark.driver.memory", settings.SPARK_DRIVER_MEMORY)
        if settings.SPARK_EXECUTOR_MEMORY:
            builder = builder.config("spark.executor.memory", settings.SPARK_EXECUTOR_MEMORY)

        # Reduce verbosity by disabling the console progress output.
        builder = builder.config("spark.ui.showConsoleProgress", "false")

        _spark_session = builder.getOrCreate()

        # Apply the configured log level to the session's Spark context.
        _spark_session.sparkContext.setLogLevel(settings.SPARK_LOG_LEVEL)
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback,
        # which logger.error with a formatted message would drop.
        logger.exception("Failed to create Spark session: %s", e)
        raise
    else:
        logger.info("Spark session created: %s", settings.SPARK_APP_NAME)
        return _spark_session
def stop_spark_session():
    """Shut down the cached Spark session, if there is one.

    Does nothing when no session is cached. Otherwise the session is
    stopped, the module-level reference is reset to ``None`` and an info
    message is logged.
    """
    global _spark_session

    # Nothing cached: nothing to stop.
    if _spark_session is None:
        return

    _spark_session.stop()
    _spark_session = None
    logger.info("Spark session stopped")
|