# oasis-web / src/pages/utils/utils.py
# (repo header residue: author "pradelf", commit 3e7e3d0 — "ajout des repertoires de travail")
import os
import io
import json
import pandas as pd
from aiobotocore.session import get_session
# --- Configuration and Constants ---
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET", "oasis-pradelf")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
ROOT_DIR = "./data/"
# --- Data Formatting Helper ---
def _format_department_code(
df: pd.DataFrame, column_name: str = "code_departement"
) -> pd.DataFrame:
"""Ensures department codes are 2-character strings, padding with '0' if needed."""
if column_name in df.columns:
df[column_name] = df[column_name].astype(str).str.zfill(2)
return df
# --- Asynchronous Data Loading Functions ---
async def async_load_file_s3(object_key: str) -> pd.DataFrame:
    """Download a plain-text CSV object from S3 and parse it into a DataFrame.

    Args:
        object_key: Key of the CSV object inside ``AWS_S3_BUCKET``.

    Returns:
        The parsed CSV as a DataFrame (content decoded as UTF-8).

    Raises:
        ValueError: If the AWS configuration is incomplete, or if the S3
            GET does not return HTTP 200.
    """
    # Fail fast when any required piece of AWS configuration is absent.
    credentials_ok = bool(
        AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    )
    if not credentials_ok:
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )
    async with get_session().create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        # Drain the streaming body inside its context manager so the
        # underlying connection is released promptly.
        async with response["Body"] as body:
            raw_bytes = await body.read()
    return pd.read_csv(io.StringIO(raw_bytes.decode("utf-8")))
async def async_load_geojson_from_s3(object_key: str) -> dict:
    """Load a GeoJSON document from the local ``ROOT_DIR`` data cache.

    NOTE(review): despite the name, this function only reads from the local
    filesystem and never contacts S3 — confirm whether an S3 fallback was
    intended for cache misses.

    Args:
        object_key: Path of the GeoJSON file relative to ``ROOT_DIR``.

    Returns:
        The parsed GeoJSON document as a dict.

    Raises:
        FileNotFoundError: If the file is absent from the local cache.
            (The previous version silently returned ``None`` here, which
            contradicts the declared ``dict`` return type and pushes the
            failure onto the first caller that subscripts the result.)
    """
    local_file_path = f"{ROOT_DIR}{object_key}"
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(
            f"GeoJSON file not found in local cache: {local_file_path}"
        )
    # Explicit encoding: GeoJSON is specified as UTF-8; don't rely on the
    # platform default.
    with open(local_file_path, "r", encoding="utf-8") as f:
        return json.load(f)
async def async_load_file_s3_gzip(object_key: str) -> pd.DataFrame:
    """Download a gzip-compressed CSV object from S3 into a DataFrame.

    Args:
        object_key: Key of the gzip CSV object inside ``AWS_S3_BUCKET``.

    Returns:
        The decompressed, parsed CSV as a DataFrame.

    Raises:
        ValueError: If the AWS configuration is incomplete, or if the S3
            GET does not return HTTP 200.
    """
    # Fail fast when any required piece of AWS configuration is absent.
    credentials_ok = bool(
        AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    )
    if not credentials_ok:
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )
    async with get_session().create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        # Drain the streaming body inside its context manager so the
        # underlying connection is released promptly.
        async with response["Body"] as body:
            raw_bytes = await body.read()
    # pandas handles gunzip itself; hand it the raw bytes.
    return pd.read_csv(io.BytesIO(raw_bytes), compression="gzip")
async def async_load_file_s3_zip(object_key: str) -> pd.DataFrame:
    """Download a zip-compressed CSV object from S3 into a DataFrame.

    Args:
        object_key: Key of the zip CSV object inside ``AWS_S3_BUCKET``.

    Returns:
        The decompressed, parsed CSV as a DataFrame.

    Raises:
        ValueError: If the AWS configuration is incomplete, or if the S3
            GET does not return HTTP 200.
    """
    # Fail fast when any required piece of AWS configuration is absent.
    credentials_ok = bool(
        AWS_S3_BUCKET and AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    )
    if not credentials_ok:
        raise ValueError(
            "AWS credentials or bucket name not set in environment variables."
        )
    async with get_session().create_client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ) as s3_client:
        response = await s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=object_key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            raise ValueError(f"Unsuccessful S3 get_object response. Status - {status}")
        # Drain the streaming body inside its context manager so the
        # underlying connection is released promptly.
        async with response["Body"] as body:
            raw_bytes = await body.read()
    # pandas unzips the single-member archive itself; hand it the raw bytes.
    return pd.read_csv(io.BytesIO(raw_bytes), compression="zip")