amps / utils /r2_utils.py
jibsn's picture
Update utils/r2_utils.py
e581c57 verified
import asyncio
import io
import json
import os

import boto3
import pandas as pd
from docx import Document
from loguru import logger

from entities.task import Task, task_factory
# Cloudflare R2 connection settings.
#
# SECURITY: the literal fallbacks below are credentials committed to source
# control. Every setting can be supplied through the environment variable
# named on its line; an unset or empty variable falls back to the literal so
# existing deployments keep working.
# TODO: rotate these keys and delete the literal fallbacks.
BUCKET_NAME = os.environ.get("R2_BUCKET_NAME") or "ai-scientist"
r2_endpoint = (
    os.environ.get("R2_ENDPOINT_URL")
    or "https://468d92a3c903c841bc2de3b413e45072.r2.cloudflarestorage.com/"
)
# NOTE(review): TOKEN is not referenced by any function visible in this file.
TOKEN = os.environ.get("R2_TOKEN") or "KhGGD1ZJI_YTlLaZ0nSMfBJSLnOhgYN6cwq1De7G"
R2_ACCESS_KEY_ID = (
    os.environ.get("R2_ACCESS_KEY_ID") or "b9bc4becece838742ae1dc161be92de3"
)
R2_SECRET_ACCESS_KEY = (
    os.environ.get("R2_SECRET_ACCESS_KEY")
    or "f68eb82bd1c00528f26c6ac9b57d737fe0e4729ac7c429030fbc22a17dc8f105"
)
def get_client():
    """Create a boto3 S3 client aimed at the configured Cloudflare R2 endpoint.

    Returns:
        A new ``boto3`` S3 client built from the module-level endpoint URL
        and access-key settings.
    """
    connection_options = {
        "endpoint_url": r2_endpoint,
        "aws_access_key_id": R2_ACCESS_KEY_ID,
        "aws_secret_access_key": R2_SECRET_ACCESS_KEY,
        "region_name": "auto",  # R2 requires the region to be set to "auto"
    }
    return boto3.client("s3", **connection_options)
def _prefix_has_objects(client, prefix: str) -> bool:
    """Return True when at least one key in the bucket starts with *prefix*.

    Blocking call; run it through ``asyncio.to_thread`` from async code.
    """
    # MaxKeys=1 keeps this existence probe cheap no matter how many objects
    # live under the prefix.
    response = client.list_objects_v2(
        Bucket=BUCKET_NAME,
        Prefix=prefix,
        MaxKeys=1,
    )
    return bool(response.get("Contents"))


def _list_task_ids(client, customer_name: str) -> list[str]:
    """Return the sorted, de-duplicated task ids stored under a customer.

    A task id is the path segment that directly follows ``"<customer_name>/"``
    in an object key. Blocking call; run it through ``asyncio.to_thread``.
    """
    prefix = f"{customer_name}/"
    request = {"Bucket": BUCKET_NAME, "Prefix": prefix}
    task_ids = set()
    # A single list_objects_v2 response is capped (1000 keys by default), so
    # follow the continuation token until the listing is exhausted.
    while True:
        page = client.list_objects_v2(**request)
        for obj in page.get("Contents", []):
            # Strip the prefix instead of splitting the whole key so customer
            # names that themselves contain "/" still resolve correctly.
            task_id = obj["Key"][len(prefix):].partition("/")[0]
            # A bare "<customer_name>/" folder-marker key yields an empty id.
            if task_id:
                task_ids.add(task_id)
        if not page.get("IsTruncated"):
            break
        request["ContinuationToken"] = page["NextContinuationToken"]
    return sorted(task_ids)


async def _load_task(customer_name: str, uuid: str, client) -> Task:
    """Download ``<customer_name>/<uuid>/task.json`` and rebuild the Task."""
    raw = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/task.json",
        client=client,
    )
    payload = json.loads(raw.decode("utf-8"))
    # The JSON's "task_type" field selects the concrete Task class.
    return task_factory[payload["task_type"]].load_from_json(payload)


async def get_task_from_minio(
    uuid: str,
    customer_name: str,
    client=None
) -> Task:
    """Load a single task from the bucket.

    Args:
        uuid: Task identifier; the task lives under ``<customer_name>/<uuid>/``.
        customer_name: Customer whose key prefix holds the task.
        client: Optional S3 client; a new one is created when omitted.

    Returns:
        The Task rebuilt from the stored ``task.json``.

    Raises:
        FileNotFoundError: If the customer has no objects at all, or has no
            objects under the requested uuid.
        Exception: Propagated from ``get_file_from_minio`` when ``task.json``
            cannot be downloaded.
    """
    if client is None:
        client = get_client()

    # Probe the task's own prefix rather than listing the whole customer:
    # an un-paginated customer-wide listing only sees the first page of keys
    # and would report later tasks as missing.
    task_exists = await asyncio.to_thread(
        _prefix_has_objects, client, f"{customer_name}/{uuid}/"
    )
    if not task_exists:
        customer_exists = await asyncio.to_thread(
            _prefix_has_objects, client, f"{customer_name}/"
        )
        if not customer_exists:
            raise FileNotFoundError(f"No task found for customer {customer_name}")
        raise FileNotFoundError(f"No task found for customer {customer_name} with uuid {uuid}")

    return await _load_task(customer_name, uuid, client)


async def get_all_tasks_from_minio(
    customer_name: str,
    client=None
) -> list[Task]:
    """Load every task stored under a customer's prefix.

    Args:
        customer_name: Customer whose tasks should be loaded.
        client: Optional S3 client; a new one is created when omitted.

    Returns:
        The customer's tasks ordered by task id, or an empty list when the
        customer has no task folders.
    """
    if client is None:
        client = get_client()

    task_ids = await asyncio.to_thread(_list_task_ids, client, customer_name)
    if not task_ids:
        return []

    # The ids come straight from the listing, so load each task.json directly
    # instead of re-checking existence once per task.
    tasks = await asyncio.gather(
        *(_load_task(customer_name, task_id, client) for task_id in task_ids)
    )
    return list(tasks)
async def upload_task_json_to_minio(task: Task, client=None) -> Task:
    """Serialize *task* and store it as ``task.json`` in the bucket.

    The object key is ``<task.customer_name>/<task.uuid>/task.json`` and the
    object is uploaded with an ``application/json`` content type.

    Args:
        task: Task to persist; its ``save_to_json()`` output is uploaded.
        client: Optional S3 client; a new one is created when omitted.

    Returns:
        The same ``task`` object that was passed in.
    """
    s3 = get_client() if client is None else client
    payload = io.BytesIO(task.save_to_json().encode("utf-8"))
    object_key = f"{task.customer_name}/{task.uuid}/task.json"
    # put_object is blocking, so run it on a worker thread.
    await asyncio.to_thread(
        s3.put_object,
        Bucket=BUCKET_NAME,
        Key=object_key,
        Body=payload,
        ContentType="application/json",
    )
    return task
async def upload_text_to_minio(
    bucket_name: str,
    object_name: str,
    file_content: str,
    client=None,
):
    """Upload a string to the bucket as a UTF-8 encoded object.

    Args:
        bucket_name: Destination bucket.
        object_name: Destination object key.
        file_content: Text to upload; encoded as UTF-8 before sending.
        client: Optional S3 client; a new one is created when omitted.
    """
    s3 = get_client() if client is None else client
    encoded_stream = io.BytesIO(file_content.encode("utf-8"))
    # put_object is blocking, so run it on a worker thread.
    await asyncio.to_thread(
        s3.put_object,
        Bucket=bucket_name,
        Key=object_name,
        Body=encoded_stream,
    )
async def upload_dataframe_to_minio(
    bucket_name: str,
    object_name: str,
    df: pd.DataFrame,
    client=None,
):
    """Upload a DataFrame to the bucket as CSV text (index column omitted).

    Args:
        bucket_name: Destination bucket.
        object_name: Destination object key.
        df: DataFrame to serialize with ``to_csv(index=False)``.
        client: Optional S3 client, forwarded to ``upload_text_to_minio``.
    """
    csv_stream = io.BytesIO()
    df.to_csv(csv_stream, index=False)
    csv_text = csv_stream.getvalue().decode("utf-8")
    await upload_text_to_minio(
        bucket_name=bucket_name,
        object_name=object_name,
        file_content=csv_text,
        client=client,
    )
async def upload_document_to_minio(
    bucket_name: str,
    object_name: str,
    document: Document,
    client=None,
):
    """Save a python-docx document in memory and upload it to the bucket.

    Args:
        bucket_name: Destination bucket.
        object_name: Destination object key.
        document: Document whose ``save()`` output is uploaded.
        client: Optional S3 client; a new one is created when omitted.
    """
    s3 = get_client() if client is None else client
    docx_content_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    docx_stream = io.BytesIO()
    document.save(docx_stream)
    # Rewind so the upload reads the document from its first byte.
    docx_stream.seek(0)

    # put_object is blocking, so run it on a worker thread.
    await asyncio.to_thread(
        s3.put_object,
        Bucket=bucket_name,
        Key=object_name,
        Body=docx_stream,
        ContentType=docx_content_type,
    )
async def get_file_from_minio(
    bucket_name: str,
    object_name: str,
    client=None,
):
    """Download an object and return its full content.

    Args:
        bucket_name: Bucket to read from.
        object_name: Key of the object to download.
        client: Optional S3 client; a new one is created when omitted.

    Returns:
        Whatever ``read()`` on the response body yields (the raw object
        content).

    Raises:
        Exception: If the request or the body read fails; the original error
            is attached as ``__cause__``.
    """
    if client is None:
        client = get_client()

    def _download():
        # Reading the body pulls the payload over the network, so it must
        # happen on the worker thread together with the request itself;
        # otherwise the event loop stalls for the duration of the download.
        response = client.get_object(Bucket=bucket_name, Key=object_name)
        return response["Body"].read()

    try:
        return await asyncio.to_thread(_download)
    except Exception as e:
        raise Exception(f"Error getting file from minio: {e}") from e
async def get_dataframe_from_minio(
    bucket_name: str,
    object_name: str,
    client=None,
) -> pd.DataFrame:
    """Download a CSV or Excel object and parse it into a DataFrame.

    The parser is chosen from the object key's extension (case-insensitive):
    ``.csv`` uses ``pandas.read_csv``; ``.xlsx`` / ``.xls`` use
    ``pandas.read_excel``.

    Args:
        bucket_name: Bucket to read from.
        object_name: Key of the object to download.
        client: Optional S3 client, forwarded to ``get_file_from_minio``.

    Returns:
        The parsed DataFrame.

    Raises:
        ValueError: If the key does not end in a supported extension.
        Exception: Propagated from ``get_file_from_minio`` on download failure.
    """
    lowered_name = object_name.lower()
    # Pick the parser before downloading so an unsupported file type fails
    # fast with a clear error instead of an unbound-variable crash afterwards.
    if lowered_name.endswith(".csv"):
        reader = pd.read_csv
    elif lowered_name.endswith((".xlsx", ".xls")):
        reader = pd.read_excel
    else:
        raise ValueError(
            f"Unsupported file type for {object_name!r}; expected .csv, .xlsx or .xls"
        )

    file_data = await get_file_from_minio(
        bucket_name=bucket_name,
        object_name=object_name,
        client=client,
    )
    return reader(io.BytesIO(file_data))