"""Async helpers for loading tabular (CSV) and GeoJSON data for the app.

CSV data is fetched from an S3 bucket via aiobotocore; GeoJSON is read from
a local on-disk cache under ROOT_DIR.
"""

import io
import json
import os
from typing import Optional

import pandas as pd
from aiobotocore.session import get_session

# --- Configuration and Constants ---
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET", "oasis-pradelf")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
ROOT_DIR = "./data/"


# --- Data Formatting Helper ---
def _format_department_code(
    df: pd.DataFrame, column_name: str = "code_departement"
) -> pd.DataFrame:
    """Ensure department codes are 2-character strings, padding with '0' if needed.

    NOTE: mutates *df* in place AND returns it; existing callers may rely on
    either behavior, so both are preserved. Missing columns are a no-op.
    """
    if column_name in df.columns:
        df[column_name] = df[column_name].astype(str).str.zfill(2)
    return df


# --- Shared S3 plumbing (deduplicated from the three loaders below) ---
def _require_s3_config() -> None:
    """Raise ValueError unless bucket name and both credentials are set."""
    if not AWS_S3_BUCKET or not AWS_ACCESS_KEY_ID or not AWS_SECRET_ACCESS_KEY:
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )


async def _fetch_s3_object_bytes(object_key: str) -> bytes:
    """Download *object_key* from the configured bucket and return its raw bytes.

    Raises:
        ValueError: if credentials/bucket are unset, or if the GET response
            status is anything other than 200.
    """
    _require_s3_config()
    session = get_session()
    async with session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status == 200:
            async with response["Body"] as stream:
                return await stream.read()
        raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")


# --- Asynchronous Data Loading Functions ---
async def async_load_file_s3(object_key: str) -> pd.DataFrame:
    """Load a plain UTF-8 CSV object from S3 into a DataFrame."""
    content = await _fetch_s3_object_bytes(object_key)
    return pd.read_csv(io.StringIO(content.decode("utf-8")))


async def async_load_geojson_from_s3(object_key: str) -> Optional[dict]:
    """Load a GeoJSON document from the local data cache.

    NOTE(review): despite its name, this function only reads
    ``ROOT_DIR/<object_key>`` from disk and never contacts S3. It returns
    None when the file is absent — the original implicit-None behavior is
    preserved here, made explicit. TODO: confirm whether callers expect an
    S3 fallback or a FileNotFoundError instead.
    """
    local_file_path = f"{ROOT_DIR}{object_key}"
    if os.path.exists(local_file_path):
        with open(local_file_path, "r") as f:
            return json.load(f)
    return None


async def async_load_file_s3_gzip(object_key: str) -> pd.DataFrame:
    """Load a gzip-compressed CSV object from S3 into a DataFrame."""
    content = await _fetch_s3_object_bytes(object_key)
    return pd.read_csv(io.BytesIO(content), compression="gzip")


async def async_load_file_s3_zip(object_key: str) -> pd.DataFrame:
    """Load a zip-compressed CSV object from S3 into a DataFrame."""
    content = await _fetch_s3_object_bytes(object_key)
    return pd.read_csv(io.BytesIO(content), compression="zip")