"""Async helpers for storing/retrieving Task artifacts in Cloudflare R2 (S3 API).

All blocking boto3 calls are pushed onto a worker thread via
``asyncio.to_thread`` so these coroutines never block the event loop.
"""

import asyncio
import io
import json

import boto3
import pandas as pd
from docx import Document
from loguru import logger

from entities.task import Task, task_factory

BUCKET_NAME = "ai-scientist"

# SECURITY NOTE(review): live credentials are hard-coded in source. These keys
# should be rotated and loaded from environment variables or a secrets manager.
r2_endpoint = "https://468d92a3c903c841bc2de3b413e45072.r2.cloudflarestorage.com/"
TOKEN = "KhGGD1ZJI_YTlLaZ0nSMfBJSLnOhgYN6cwq1De7G"
R2_ACCESS_KEY_ID = "b9bc4becece838742ae1dc161be92de3"
R2_SECRET_ACCESS_KEY = "f68eb82bd1c00528f26c6ac9b57d737fe0e4729ac7c429030fbc22a17dc8f105"


def get_client():
    """Build a boto3 S3 client configured for the Cloudflare R2 endpoint."""
    return boto3.client(
        "s3",
        endpoint_url=r2_endpoint,
        aws_access_key_id=R2_ACCESS_KEY_ID,
        aws_secret_access_key=R2_SECRET_ACCESS_KEY,
        region_name="auto",  # R2 requires the region to be set to "auto"
    )


async def get_task_from_minio(
    uuid: str,
    customer_name: str,
    client=None,
) -> Task:
    """Load the task stored at ``<customer_name>/<uuid>/task.json``.

    Args:
        uuid: Task identifier (second path segment of the object keys).
        customer_name: Top-level prefix the task lives under.
        client: Optional pre-built S3 client; a fresh one is created if omitted.

    Returns:
        The Task reconstructed via ``task_factory[task_type].load_from_json``.

    Raises:
        FileNotFoundError: If the customer has no objects at all, or no object
            matches the given uuid.
    """
    if client is None:
        client = get_client()
    response = await asyncio.to_thread(
        lambda: client.list_objects_v2(
            Bucket=BUCKET_NAME,
            Prefix=f"{customer_name}/",
        )
    )
    objects = response.get("Contents", [])
    if not objects:
        raise FileNotFoundError(f"No task found for customer {customer_name}")
    # Keys look like "<customer>/<uuid>/...": the second segment is the task id.
    object_names = [obj["Key"].split("/")[1] for obj in objects]
    if uuid not in object_names:
        raise FileNotFoundError(f"No task found for customer {customer_name} with uuid {uuid}")
    json_file = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=f"{customer_name}/{uuid}/task.json",
        client=client,
    )
    json_data = json.loads(json_file.decode("utf-8"))
    # Dispatch on the serialized task_type to the matching Task subclass.
    return task_factory[json_data["task_type"]].load_from_json(json_data)


async def get_all_tasks_from_minio(
    customer_name: str,
    client=None,
) -> list[Task]:
    """Load every task stored under the ``<customer_name>/`` prefix.

    Returns an empty list when the customer has no objects.

    NOTE(review): ``list_objects_v2`` returns at most 1000 keys per call;
    customers with more objects would need pagination — TODO confirm volumes.
    """
    if client is None:
        client = get_client()
    response = await asyncio.to_thread(
        lambda: client.list_objects_v2(
            Bucket=BUCKET_NAME,
            Prefix=f"{customer_name}/",
        )
    )
    objects = response.get("Contents", [])
    if not objects:
        return []
    # De-duplicate: several objects can live under the same task uuid.
    task_ids = list({obj["Key"].split("/")[1] for obj in objects})
    # Fetch all tasks concurrently; each coroutine re-uses the same client.
    tasks = await asyncio.gather(
        *(
            get_task_from_minio(uuid=task_id, customer_name=customer_name, client=client)
            for task_id in task_ids
        )
    )
    return tasks


async def upload_task_json_to_minio(task: Task, client=None) -> Task:
    """Serialize *task* and upload it as ``<customer_name>/<uuid>/task.json``.

    Returns the task unchanged so the call can be chained.
    """
    if client is None:
        client = get_client()
    byte_data = io.BytesIO(task.save_to_json().encode("utf-8"))
    await asyncio.to_thread(
        lambda: client.put_object(
            Bucket=BUCKET_NAME,
            Key=f"{task.customer_name}/{task.uuid}/task.json",
            Body=byte_data,
            ContentType="application/json",
        )
    )
    return task


async def upload_text_to_minio(
    bucket_name: str,
    object_name: str,
    file_content: str,
    client=None,
):
    """Upload a UTF-8 encoded text payload to ``bucket_name/object_name``."""
    if client is None:
        client = get_client()
    file_data = io.BytesIO(file_content.encode("utf-8"))
    await asyncio.to_thread(
        lambda: client.put_object(
            Bucket=bucket_name,
            Key=object_name,
            Body=file_data,
        )
    )


async def upload_dataframe_to_minio(
    bucket_name: str,
    object_name: str,
    df: pd.DataFrame,
    client=None,
):
    """Serialize *df* as CSV (no index column) and upload it as text."""
    buffer = io.BytesIO()
    df.to_csv(buffer, index=False)
    await upload_text_to_minio(
        bucket_name=bucket_name,
        object_name=object_name,
        file_content=buffer.getvalue().decode("utf-8"),
        client=client,
    )


async def upload_document_to_minio(
    bucket_name: str,
    object_name: str,
    document: Document,
    client=None,
):
    """Upload a python-docx document as a .docx object."""
    if client is None:
        client = get_client()
    buffer = io.BytesIO()
    document.save(buffer)
    buffer.seek(0)  # rewind so put_object reads from the start
    await asyncio.to_thread(
        lambda: client.put_object(
            Bucket=bucket_name,
            Key=object_name,
            Body=buffer,
            ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )
    )


async def get_file_from_minio(
    bucket_name: str,
    object_name: str,
    client=None,
):
    """Download ``bucket_name/object_name`` and return its raw bytes.

    Raises:
        Exception: Wraps any storage error; the original error is chained
            as ``__cause__``.
    """
    if client is None:
        client = get_client()
    try:
        response = await asyncio.to_thread(
            lambda: client.get_object(Bucket=bucket_name, Key=object_name)
        )
        return response["Body"].read()
    except Exception as e:
        # FIX: chain the original exception so the real cause is not lost.
        raise Exception(f"Error getting file from minio: {e}") from e


async def get_dataframe_from_minio(
    bucket_name: str,
    object_name: str,
    client=None,
):
    """Download an object and parse it into a DataFrame.

    Supports ``.csv`` (pd.read_csv) and ``.xlsx``/``.xls`` (pd.read_excel).

    Raises:
        ValueError: If the object name has an unsupported extension.
            (BUG FIX: the original fell through with ``df`` unbound and
            crashed with ``UnboundLocalError``.)
    """
    file_data = await get_file_from_minio(
        bucket_name=bucket_name,
        object_name=object_name,
        client=client,
    )
    if object_name.endswith(".csv"):
        return pd.read_csv(io.BytesIO(file_data))
    if object_name.endswith((".xlsx", ".xls")):
        return pd.read_excel(io.BytesIO(file_data))
    raise ValueError(f"Unsupported file extension for {object_name!r}; expected .csv, .xlsx or .xls")