| import os
|
| import base64
|
| import json
|
| import io
|
| import datetime
|
| from PIL import Image
|
| import logging
|
| from huggingface_hub import HfApi, CommitOperationAdd
|
| import numpy as np
|
|
|
# Module-level logger following the standard per-module convention.
logger = logging.getLogger(__name__)

# Hugging Face dataset repository that inference logs are associated with.
HF_DATASET_NAME = "aiwithoutborders-xyz/degentic_rd0"

# Local directory where per-inference JSON log files are written.
LOCAL_LOG_DIR = "./hf_inference_logs"
|
|
|
|
|
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that serializes NumPy scalar and array types.

    Model predictions and ensemble outputs routinely contain NumPy
    values, which the stdlib ``json`` module rejects. The original
    implementation only handled ``np.float32``; this covers all float
    widths (``np.floating``), all integer widths (``np.integer``) and
    arrays (``np.ndarray``) while remaining backward-compatible.
    """

    def default(self, obj):
        # np.floating covers float16/32/64 (includes the original np.float32 case).
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(obj)
|
|
|
def _pil_to_base64(image: Image.Image) -> str:
    """Encode *image* as a base64 string of its JPEG bytes (quality 85).

    Non-RGB images are converted to RGB first, since JPEG cannot store
    e.g. RGBA or palette modes. Raises TypeError for non-PIL inputs.
    """
    if not isinstance(image, Image.Image):
        raise TypeError(f"Expected a PIL Image, but received type: {type(image)}")

    rgb = image if image.mode == 'RGB' else image.convert('RGB')

    with io.BytesIO() as buffer:
        rgb.save(buffer, format="JPEG", quality=85)
        payload = buffer.getvalue()

    return base64.b64encode(payload).decode('utf-8')
|
|
|
|
|
|
|
def initialize_dataset_repo():
    """Return an ``HfApi`` client, ensuring the dataset repository exists.

    Authenticates with the ``HF_TOKEN`` environment variable. If
    ``repo_info`` fails (which this code treats as "repo missing" — note
    the broad except also swallows auth/network errors), the repository
    is created as a private dataset. ``exist_ok=True`` makes creation
    idempotent, fixing the check-then-create race where a concurrent
    process creates the repo between ``repo_info`` and ``create_repo``.

    Returns:
        HfApi: the authenticated client.
    """
    api = HfApi(token=os.getenv("HF_TOKEN"))
    try:
        api.repo_info(repo_id=HF_DATASET_NAME, repo_type="dataset")
        logger.info(f"Hugging Face dataset repository already exists: {HF_DATASET_NAME}")
    except Exception:
        logger.info(f"Creating new Hugging Face dataset repository: {HF_DATASET_NAME}")
        # exist_ok=True: no error if another process created the repo first.
        api.create_repo(repo_id=HF_DATASET_NAME, repo_type="dataset", private=True, exist_ok=True)
    return api
|
|
|
def log_inference_data(
    original_image: Image.Image,
    inference_params: dict,
    model_predictions: list[dict],
    ensemble_output: dict,
    forensic_images: list[Image.Image],
    agent_monitoring_data: dict,
    human_feedback: dict = None
):
    """Log a single inference event to a timestamped local JSON file.

    Despite the repo setup, the entry is written only to LOCAL_LOG_DIR —
    nothing is uploaded here (the original docstring claimed an upload
    that never happens). The Hugging Face dataset repository is
    initialized as a side effect so it exists for any later upload step.

    Args:
        original_image: Input image for the inference run (PIL).
        inference_params: Request parameters used for the run.
        model_predictions: Per-model prediction dicts.
        ensemble_output: Aggregated ensemble result.
        forensic_images: Forensic visualizations; PIL images or arrays
            convertible via ``Image.fromarray``. ``None`` entries and
            unconvertible items are skipped with an error log.
        agent_monitoring_data: Agent telemetry for the run.
        human_feedback: Optional annotations; logged as ``{}`` when None.

    All failures are caught and logged; this function never raises.
    """
    try:
        # Side effect only: ensure the HF dataset repo exists. The returned
        # client is not needed for the local write (the original bound it
        # to an unused variable).
        initialize_dataset_repo()

        original_image_b64 = _pil_to_base64(original_image)

        forensic_images_b64 = []
        for img_item in forensic_images:
            if img_item is None:
                continue
            if not isinstance(img_item, Image.Image):
                try:
                    img_item = Image.fromarray(img_item)
                except Exception as e:
                    logger.error(f"Error converting forensic image to PIL for base64 encoding: {e}")
                    continue
            forensic_images_b64.append(_pil_to_base64(img_item))

        # Take one timestamp so the JSON field and the filename agree
        # (the original called datetime.now() twice).
        now = datetime.datetime.now()

        new_entry = {
            "timestamp": now.isoformat(),
            "image": original_image_b64,
            "inference_request": inference_params,
            "model_predictions": model_predictions,
            "ensemble_output": ensemble_output,
            "forensic_outputs": forensic_images_b64,
            "agent_monitoring_data": agent_monitoring_data,
            "human_feedback": human_feedback if human_feedback is not None else {}
        }

        os.makedirs(LOCAL_LOG_DIR, exist_ok=True)
        timestamp_str = now.strftime("%Y%m%d%H%M%S%f")
        log_file_path = os.path.join(LOCAL_LOG_DIR, f"log_{timestamp_str}.json")

        with open(log_file_path, 'w', encoding='utf-8') as f:
            # NumpyEncoder handles NumPy scalars that json cannot serialize.
            json.dump(new_entry, f, cls=NumpyEncoder, indent=2)

        logger.info(f"Inference data logged successfully to local file: {log_file_path}")

    except Exception:
        # logger.exception records the full traceback, which the original
        # logger.error(f"...{e}") discarded.
        logger.exception("Failed to log inference data to local file")