| import os |
| import io |
| import json |
| import pandas as pd |
| from aiobotocore.session import get_session |
|
|
| |
# Target S3 bucket; defaults to the project bucket, so the credential guards
# in the loaders below can only trip on the bucket if AWS_S3_BUCKET is
# explicitly set to an empty string.
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET", "oasis-pradelf")
# AWS credentials read from the environment; None when unset.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
# Local directory holding cached data files (read by async_load_geojson_from_s3).
ROOT_DIR = "./data/"
|
|
|
|
| |
| def _format_department_code( |
| df: pd.DataFrame, column_name: str = "code_departement" |
| ) -> pd.DataFrame: |
| """Ensures department codes are 2-character strings, padding with '0' if needed.""" |
| if column_name in df.columns: |
| df[column_name] = df[column_name].astype(str).str.zfill(2) |
| return df |
|
|
|
|
| |
async def async_load_file_s3(object_key: str) -> pd.DataFrame:
    """Download a CSV object from S3 and parse it into a DataFrame.

    Raises:
        ValueError: when the AWS configuration is missing from the
            environment, or when the S3 GET does not return HTTP 200.
    """
    if not (AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY):
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )

    session = get_session()
    async with session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        # Fail fast on any non-200 response before touching the body.
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        async with response["Body"] as stream:
            payload = await stream.read()
        return pd.read_csv(io.StringIO(payload.decode("utf-8")))
|
|
|
|
async def async_load_geojson_from_s3(object_key: str) -> dict:
    """Load a GeoJSON document from the local data cache.

    NOTE(review): despite the name, this only reads the cached copy under
    ``ROOT_DIR`` and never falls back to S3 — confirm whether an S3
    fallback was intended. Previously a missing file made the function
    fall off the end and return ``None`` despite the ``-> dict``
    annotation; it now fails loudly instead.

    Raises:
        FileNotFoundError: if the cached file does not exist.
    """
    local_file_path = f"{ROOT_DIR}{object_key}"
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(
            f"GeoJSON file not found in local cache: {local_file_path}"
        )
    with open(local_file_path, "r", encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
async def async_load_file_s3_gzip(object_key: str) -> pd.DataFrame:
    """Download a gzip-compressed CSV from S3 and return it as a DataFrame.

    Raises:
        ValueError: when the AWS configuration is missing from the
            environment, or when the S3 GET does not return HTTP 200.
    """
    if not (AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY):
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )

    session = get_session()
    async with session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        # Fail fast on any non-200 response before touching the body.
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        async with response["Body"] as stream:
            raw_bytes = await stream.read()
        # pandas decompresses the gzip payload itself.
        return pd.read_csv(io.BytesIO(raw_bytes), compression="gzip")
|
|
|
|
async def async_load_file_s3_zip(object_key: str) -> pd.DataFrame:
    """Download a zip-compressed CSV from S3 and return it as a DataFrame.

    Raises:
        ValueError: when the AWS configuration is missing from the
            environment, or when the S3 GET does not return HTTP 200.
    """
    if not (AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY):
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )

    session = get_session()
    async with session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        # Fail fast on any non-200 response before touching the body.
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        async with response["Body"] as stream:
            raw_bytes = await stream.read()
        # pandas extracts the single CSV member from the zip archive itself.
        return pd.read_csv(io.BytesIO(raw_bytes), compression="zip")
|
|