File size: 3,649 Bytes
0aee67c
 
 
 
 
 
 
e62736e
0aee67c
 
 
 
e62736e
0aee67c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e7e3d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0aee67c
 
 
 
 
 
 
 
 
 
 
eb53ba9
0aee67c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
import io
import json
import pandas as pd
from aiobotocore.session import get_session

# --- Configuration and Constants ---
# Bucket name falls back to a hard-coded default when the env var is unset.
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET", "oasis-pradelf")
# Credentials are None when unset; the S3 loaders below raise ValueError in that case.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
# Local cache directory used by async_load_geojson_from_s3 (note trailing slash:
# paths are built by plain string concatenation).
ROOT_DIR = "./data/"


# --- Data Formatting Helper ---
def _format_department_code(
    df: pd.DataFrame, column_name: str = "code_departement"
) -> pd.DataFrame:
    """Ensures department codes are 2-character strings, padding with '0' if needed."""
    if column_name in df.columns:
        df[column_name] = df[column_name].astype(str).str.zfill(2)
    return df


# --- Asynchronous Data Loading Functions ---
async def async_load_file_s3(object_key: str) -> pd.DataFrame:
    """Download a plain-text CSV object from S3 and parse it into a DataFrame.

    Args:
        object_key: Key of the CSV object inside ``AWS_S3_BUCKET``.

    Raises:
        ValueError: if AWS configuration is missing, or the GET request
            does not come back with HTTP 200.
    """
    config_present = AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    if not config_present:
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )

    session = get_session()
    async with session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        # Read the full body before leaving the client context.
        async with response["Body"] as stream:
            raw_bytes = await stream.read()
        return pd.read_csv(io.StringIO(raw_bytes.decode("utf-8")))


async def async_load_geojson_from_s3(object_key: str) -> dict:
    """Load a GeoJSON document from the local data cache.

    NOTE(review): despite the name, this function never contacts S3 — it only
    reads the file cached under ``ROOT_DIR``. Confirm whether an S3 fallback
    was intended.

    Args:
        object_key: Path of the GeoJSON file relative to ``ROOT_DIR``.

    Returns:
        The parsed GeoJSON document as a dict.

    Raises:
        FileNotFoundError: if the file is not present in the local cache.
            (The previous implementation silently returned ``None`` here,
            contradicting the declared ``-> dict`` return type and deferring
            the failure to a confusing error at the caller.)
    """
    local_file_path = f"{ROOT_DIR}{object_key}"
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(
            f"GeoJSON file not found in local cache: {local_file_path}"
        )
    with open(local_file_path, "r", encoding="utf-8") as f:
        return json.load(f)


async def async_load_file_s3_gzip(object_key: str) -> pd.DataFrame:
    """Download a gzip-compressed CSV from S3 and return it as a DataFrame.

    Args:
        object_key: Key of the ``.gz`` CSV object inside ``AWS_S3_BUCKET``.

    Raises:
        ValueError: when AWS configuration is missing or the GET request
            does not return HTTP 200.
    """
    if not (AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY):
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )

    session = get_session()
    async with session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        metadata = response.get("ResponseMetadata", {})
        status = metadata.get("HTTPStatusCode")
        if status == 200:
            async with response["Body"] as stream:
                payload = await stream.read()
            # pandas decompresses the gzip payload itself via compression=.
            return pd.read_csv(io.BytesIO(payload), compression="gzip")
        raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")


async def async_load_file_s3_zip(object_key: str) -> pd.DataFrame:
    """Download a zip-compressed CSV from S3 and return it as a DataFrame.

    Args:
        object_key: Key of the ``.zip`` CSV object inside ``AWS_S3_BUCKET``.

    Raises:
        ValueError: when AWS configuration is missing or the GET request
            does not return HTTP 200.
    """
    missing_config = not AWS_S3_BUCKET or not AWS_ACCESS_KEY_ID or not AWS_SECRET_ACCESS_KEY
    if missing_config:
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )

    session = get_session()
    client_ctx = session.create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    async with client_ctx as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status_code = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status_code != 200:
            raise ValueError(
                f"Unsuccessful S3 get_object response. Status - {status_code}"
            )
        async with response["Body"] as stream:
            archive_bytes = await stream.read()
    # Single-file zip archives are unpacked by pandas via compression="zip".
    return pd.read_csv(io.BytesIO(archive_bytes), compression="zip")