|
|
import asyncio
import io
import json
import os

import boto3
import pandas as pd
from docx import Document
from loguru import logger

from entities.task import Task, task_factory
|
|
|
|
|
|
|
|
# Object-storage configuration.
#
# Credentials are read from the environment rather than hard-coded: secrets
# committed to source control are visible to anyone with repository access and
# cannot be rotated without a code change.
# NOTE(review): the key pair and token that were previously committed here must
# be treated as leaked and rotated.

# Bucket holding all customer task data.
BUCKET_NAME = os.environ.get("R2_BUCKET_NAME", "ai-scientist")

# S3-compatible endpoint of the Cloudflare R2 account.
r2_endpoint = os.environ.get(
    "R2_ENDPOINT",
    "https://468d92a3c903c841bc2de3b413e45072.r2.cloudflarestorage.com/",
)

# Not referenced by the functions in this section; kept so existing importers
# of the name keep working. None when R2_TOKEN is unset.
TOKEN = os.environ.get("R2_TOKEN")

# When these are unset (None), boto3 resolves credentials through its default
# provider chain instead of using explicit keys.
R2_ACCESS_KEY_ID = os.environ.get("R2_ACCESS_KEY_ID")
R2_SECRET_ACCESS_KEY = os.environ.get("R2_SECRET_ACCESS_KEY")
|
|
|
|
|
def get_client():
    """Create a boto3 S3 client bound to the configured R2 endpoint.

    A fresh client is built on every call, using the module-level endpoint
    and credential settings and the ``"auto"`` region.
    """
    client_settings = {
        "endpoint_url": r2_endpoint,
        "aws_access_key_id": R2_ACCESS_KEY_ID,
        "aws_secret_access_key": R2_SECRET_ACCESS_KEY,
        "region_name": "auto",
    }
    return boto3.client("s3", **client_settings)
|
|
|
|
|
|
|
|
async def get_task_from_minio(
    uuid: str,
    customer_name: str,
    client=None,
) -> Task:
    """Load the task stored at ``{customer_name}/{uuid}/task.json``.

    Args:
        uuid: Task identifier, i.e. the second path component of its keys.
        customer_name: Top-level key prefix that groups a customer's tasks.
        client: Optional S3-compatible client; ``get_client()`` is used when
            omitted.

    Returns:
        The object produced by ``task_factory[task_type].load_from_json``,
        where ``task_type`` is read from the stored JSON.

    Raises:
        FileNotFoundError: if the customer has no objects at all, or if no
            ``task.json`` exists for ``uuid``.
        Exception: propagated from ``get_file_from_minio`` on download errors.
    """
    if client is None:
        client = get_client()

    prefix = f"{customer_name}/"

    def _list_keys() -> list[str]:
        # list_objects_v2 returns at most 1000 keys per call; follow the
        # continuation token so tasks beyond the first page are found.
        keys: list[str] = []
        request = {"Bucket": BUCKET_NAME, "Prefix": prefix}
        while True:
            page = client.list_objects_v2(**request)
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
            if not page.get("IsTruncated"):
                return keys
            request["ContinuationToken"] = page["NextContinuationToken"]

    keys = await asyncio.to_thread(_list_keys)
    if not keys:
        raise FileNotFoundError(f"No task found for customer {customer_name}")

    # Check for the task.json itself (not just the uuid folder) so a folder
    # without a task file is reported as "not found" instead of failing later
    # with an opaque download error.
    task_key = f"{prefix}{uuid}/task.json"
    if task_key not in keys:
        raise FileNotFoundError(f"No task found for customer {customer_name} with uuid {uuid}")

    json_file = await get_file_from_minio(
        bucket_name=BUCKET_NAME,
        object_name=task_key,
        client=client,
    )

    json_data = json.loads(json_file.decode("utf-8"))
    return task_factory[json_data["task_type"]].load_from_json(json_data)
|
|
|
|
|
|
|
|
async def get_all_tasks_from_minio(
    customer_name: str,
    client=None,
) -> list[Task]:
    """Load every task stored under ``{customer_name}/``.

    A task is any object whose key is exactly
    ``{customer_name}/{uuid}/task.json``. Other objects under the prefix, such
    as folder markers, stray files, or uuid folders without a ``task.json``,
    are ignored rather than aborting the whole call.

    Args:
        customer_name: Top-level key prefix that groups a customer's tasks.
        client: Optional S3-compatible client; ``get_client()`` is used when
            omitted.

    Returns:
        The loaded tasks ordered by uuid, or ``[]`` when none exist.

    Raises:
        Exception: propagated from ``get_file_from_minio`` on download errors.
    """
    if client is None:
        client = get_client()

    prefix = f"{customer_name}/"

    def _list_keys() -> list[str]:
        # list_objects_v2 returns at most 1000 keys per call; follow the
        # continuation token so every task is seen.
        keys: list[str] = []
        request = {"Bucket": BUCKET_NAME, "Prefix": prefix}
        while True:
            page = client.list_objects_v2(**request)
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
            if not page.get("IsTruncated"):
                return keys
            request["ContinuationToken"] = page["NextContinuationToken"]

    keys = await asyncio.to_thread(_list_keys)

    task_ids = set()
    for key in keys:
        task_id, _, tail = key[len(prefix):].partition("/")
        if task_id and tail == "task.json":
            task_ids.add(task_id)

    async def _load(task_id: str):
        # Fetch task.json directly: the listing above already proved it exists,
        # so re-listing the whole prefix per task (as a call to
        # get_task_from_minio would) is unnecessary.
        raw = await get_file_from_minio(
            bucket_name=BUCKET_NAME,
            object_name=f"{prefix}{task_id}/task.json",
            client=client,
        )
        json_data = json.loads(raw.decode("utf-8"))
        return task_factory[json_data["task_type"]].load_from_json(json_data)

    return await asyncio.gather(*(_load(task_id) for task_id in sorted(task_ids)))
|
|
|
|
|
|
|
|
async def upload_task_json_to_minio(task: Task, client=None) -> Task:
    """Serialize ``task`` and write it to ``{customer_name}/{uuid}/task.json``.

    The payload is ``task.save_to_json()`` encoded as UTF-8 and stored with
    content type ``application/json`` in ``BUCKET_NAME``.

    Args:
        task: Task to persist; its ``customer_name`` and ``uuid`` form the key.
        client: Optional S3-compatible client; ``get_client()`` is used when
            omitted.

    Returns:
        The same ``task`` object that was passed in.
    """
    s3 = client if client is not None else get_client()

    payload = io.BytesIO(task.save_to_json().encode("utf-8"))

    def _put():
        return s3.put_object(
            Bucket=BUCKET_NAME,
            Key=f"{task.customer_name}/{task.uuid}/task.json",
            Body=payload,
            ContentType="application/json",
        )

    await asyncio.to_thread(_put)
    return task
|
|
|
|
|
|
|
|
async def upload_text_to_minio(
    bucket_name: str,
    object_name: str,
    file_content: str,
    client=None,
):
    """Upload ``file_content`` as UTF-8 bytes to ``bucket_name``/``object_name``.

    No ``ContentType`` is set on the stored object.

    Args:
        bucket_name: Destination bucket.
        object_name: Destination key.
        file_content: Text to store.
        client: Optional S3-compatible client; ``get_client()`` is used when
            omitted.
    """
    s3 = get_client() if client is None else client
    stream = io.BytesIO(file_content.encode("utf-8"))

    def _put():
        return s3.put_object(Bucket=bucket_name, Key=object_name, Body=stream)

    await asyncio.to_thread(_put)
|
|
|
|
|
|
|
|
async def upload_dataframe_to_minio(
    bucket_name: str,
    object_name: str,
    df: pd.DataFrame,
    client=None,
):
    """Serialize ``df`` as CSV (index omitted) and upload it as UTF-8 text.

    Args:
        bucket_name: Destination bucket.
        object_name: Destination key.
        df: DataFrame to serialize.
        client: Optional S3-compatible client, forwarded to
            ``upload_text_to_minio``, which creates one when omitted.
    """
    # With no path argument, to_csv returns the CSV as a str. This avoids
    # writing bytes into a buffer only to decode them back to text; writing to
    # a binary handle also requires pandas >= 1.2.
    csv_text = df.to_csv(index=False)

    await upload_text_to_minio(
        bucket_name=bucket_name,
        object_name=object_name,
        file_content=csv_text,
        client=client,
    )
|
|
|
|
|
|
|
|
async def upload_document_to_minio(
    bucket_name: str,
    object_name: str,
    document: Document,
    client=None,
):
    """Save ``document`` into memory and upload it as a Word (.docx) object.

    Args:
        bucket_name: Destination bucket.
        object_name: Destination key.
        document: Document object exposing ``save(stream)``.
        client: Optional S3-compatible client; ``get_client()`` is used when
            omitted.
    """
    s3 = client if client is not None else get_client()

    stream = io.BytesIO()
    document.save(stream)
    # save() leaves the cursor at the end; rewind so the upload reads it all.
    stream.seek(0)

    upload_kwargs = dict(
        Bucket=bucket_name,
        Key=object_name,
        Body=stream,
        ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    )
    await asyncio.to_thread(s3.put_object, **upload_kwargs)
|
|
|
|
|
|
|
|
async def get_file_from_minio(
    bucket_name: str,
    object_name: str,
    client=None,
):
    """Download an object and return its full contents.

    Both ``get_object`` and the read of the response body run in a worker
    thread, and the body stream is closed afterwards.

    Args:
        bucket_name: Source bucket.
        object_name: Key of the object to fetch.
        client: Optional S3-compatible client; ``get_client()`` is used when
            omitted.

    Returns:
        The value of ``response["Body"].read()`` (bytes for a boto3 client).

    Raises:
        Exception: wrapping any error raised while fetching or reading the
            object; the original error is chained as ``__cause__``.
    """
    if client is None:
        client = get_client()

    def _download():
        response = client.get_object(Bucket=bucket_name, Key=object_name)
        body = response["Body"]
        try:
            # The body is a stream: read() performs network I/O, so it must run
            # here in the worker thread, not on the event loop.
            return body.read()
        finally:
            body.close()

    try:
        return await asyncio.to_thread(_download)
    except Exception as e:
        # Keep the generic Exception type existing callers handle, but chain
        # the underlying error so its type and traceback are preserved.
        raise Exception(f"Error getting file from minio: {e}") from e
|
|
|
|
|
|
|
|
async def get_dataframe_from_minio(
    bucket_name: str,
    object_name: str,
    client=None,
):
    """Download a CSV or Excel object and parse it into a DataFrame.

    The parser is chosen from the object name's extension, case-insensitively:
    ``.csv`` uses ``pd.read_csv``; ``.xlsx`` and ``.xls`` use ``pd.read_excel``.

    Args:
        bucket_name: Source bucket.
        object_name: Key of the object; its extension selects the parser.
        client: Optional S3-compatible client, forwarded to
            ``get_file_from_minio``.

    Returns:
        The parsed ``pd.DataFrame``.

    Raises:
        ValueError: if the extension is not ``.csv``, ``.xlsx`` or ``.xls``.
        Exception: propagated from ``get_file_from_minio`` on download errors.
    """
    lowered = object_name.lower()
    if lowered.endswith(".csv"):
        reader = pd.read_csv
    elif lowered.endswith((".xlsx", ".xls")):
        reader = pd.read_excel
    else:
        # Reject unsupported types up front instead of downloading the object
        # and then failing on an unbound result.
        raise ValueError(
            f"Unsupported file type for {object_name!r}: expected .csv, .xlsx or .xls"
        )

    file_data = await get_file_from_minio(
        bucket_name=bucket_name,
        object_name=object_name,
        client=client,
    )
    return reader(io.BytesIO(file_data))
|
|
|