# DreamFlow-AI / dataset_utils.py
# Author: Sachin21112004
# Last commit: "Update dataset_utils.py" (ecc185e, verified)
# dataset_utils.py
# This utility manages all read/write operations to your persistent HF Dataset.
# Both counselor.py and your training scripts will use this.
import json
import time
import os
import glob
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional
from huggingface_hub import HfApi, hf_hub_download
logger = logging.getLogger(__name__)
# --- CONFIGURATION ---
# !! REPLACE "Sachin21112004" with your username !!
# Dataset repo that acts as persistent storage; overridable via env var.
DATASET_REPO_ID = os.getenv("HF_DATASET_REPO_ID", "Sachin21112004/DreamFlow-AI-Data")
# JSONL file of {"text", "label"} training examples collected at runtime.
EXAMPLES_FILENAME = "fine_tune_examples.jsonl"
# JSONL file of training-run log entries.
LOGS_FILENAME = "fine_tune_logs.jsonl"
# Local temp paths
# (working-directory copies used as a staging area between download/upload)
LOCAL_EXAMPLES_PATH = Path(f"./{EXAMPLES_FILENAME}")
LOCAL_LOGS_PATH = Path(f"./{LOGS_FILENAME}")
# ---------------------
def _get_hf_token():
"""Reads the HF write token from environment secrets."""
return os.environ.get("HF_WRITE_TOKEN")
def _download_from_hub(filename: str, local_path: Path) -> bool:
    """Download *filename* from the dataset repo into the working directory.

    Args:
        filename: Path of the file inside the dataset repo.
        local_path: Expected local destination (./filename); kept for the
            caller's convenience and for logging.

    Returns:
        True on success, False on any failure (most commonly the file
        simply does not exist in the repo yet).
    """
    token = _get_hf_token()
    try:
        # With local_dir=".", the file lands at ./<filename>, which is exactly
        # where local_path points. The old force_filename kwarg is redundant
        # here and has been removed from recent huggingface_hub releases.
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=filename,
            repo_type="dataset",
            local_dir=".",
            token=token,
        )
        return True
    except Exception as e:
        # Common and expected when the file hasn't been created yet.
        # Bug fix: the old message logged the literal text "(unknown)"
        # instead of the filename that failed.
        logger.info(f"Could not download {filename} from Hub (may not exist yet): {e}")
        return False
def _upload_to_hub(local_path: Path, path_in_repo: str):
    """Push a local file into the persistent HF dataset repo.

    Failures are logged (not raised) so callers remain best-effort.
    """
    token = _get_hf_token()
    try:
        client = HfApi()
        client.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=token,
        )
    except Exception as e:
        logger.error(f"Failed to upload {local_path} to Hub: {e}")
# --- API for Training Examples (fine_tune_examples.jsonl) ---
def persist_fine_tune_example(text: str, label: str) -> None:
    """
    Appends a single training example and uploads to the HF Dataset.

    Best-effort: any failure is logged and swallowed so the caller's
    request path is never broken by persistence issues.
    """
    try:
        # 1. Sync down the current file first (same pattern as
        #    append_fine_tune_log). Without this, a fresh container whose
        #    local file is empty would overwrite the remote dataset with a
        #    single-line file, losing previously persisted examples.
        _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH)
        # 2. Append the new example locally as one JSONL line
        line = json.dumps({"text": text, "label": label}, ensure_ascii=False)
        with open(LOCAL_EXAMPLES_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")
        # 3. Upload the *entire* file back to the dataset repo
        _upload_to_hub(LOCAL_EXAMPLES_PATH, EXAMPLES_FILENAME)
    except Exception as e:
        logger.debug(f"Failed to persist fine-tune example: {e}")
def load_fine_tune_examples() -> List[Dict[str, str]]:
    """
    Downloads the latest examples file from the HF Dataset and parses it
    as JSONL, returning [] on any download or read failure.
    """
    # Pull the freshest copy first; without it there is nothing to load.
    if not _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH):
        return []
    try:
        if not LOCAL_EXAMPLES_PATH.exists():
            return []
        with open(LOCAL_EXAMPLES_PATH, "r", encoding="utf-8") as f:
            # Skip blank lines; every other line is one JSON object.
            return [json.loads(raw) for raw in f if raw.strip()]
    except Exception as e:
        logger.error(f"Failed to read local examples file {LOCAL_EXAMPLES_PATH}: {e}")
        return []
def clear_fine_tune_examples(archive: bool = True):
    """
    Removes the examples file from the dataset repo after training.

    Args:
        archive: When True, copy the file to archive/examples/<ts>.jsonl
            before removing the live copy; when False, delete it outright.

    Best-effort: Hub failures are logged at debug level and swallowed.
    """
    api = HfApi()
    token = _get_hf_token()
    try:
        if archive:
            # Bug fix: HfApi exposes no rename_file() method, so the old
            # rename-based archive always raised and was silently swallowed.
            # Archive instead by downloading the current file and re-uploading
            # it under a timestamped archive path, then deleting the original.
            ts = int(time.time())
            archive_path = f"archive/examples/fine_tune_examples.{ts}.jsonl"
            if _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH):
                _upload_to_hub(LOCAL_EXAMPLES_PATH, archive_path)
        api.delete_file(
            path_in_repo=EXAMPLES_FILENAME,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=token
        )
        # Delete all local copies so stale data is never re-uploaded later.
        for f in glob.glob(f"./{EXAMPLES_FILENAME}*"):
            try:
                os.remove(f)
            except Exception:
                pass
        # Bug fix: the old code logged "Archived" even when archive=False.
        if archive:
            logger.info("Archived examples file in dataset repo.")
        else:
            logger.info("Deleted examples file from dataset repo.")
    except Exception as e:
        logger.debug(f"Failed to clear/archive examples in Hub (non-fatal): {e}")
# --- API for Run Logs (fine_tune_logs.jsonl) ---
def append_fine_tune_log(entry: Dict[str, Any]) -> None:
    """
    Appends a single log entry to the JSONL run log and uploads it
    to the HF Dataset. Failures are logged at debug level and swallowed.
    """
    try:
        # Sync down the current log first so we append rather than clobber.
        _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH)
        serialized = json.dumps(entry, ensure_ascii=False)
        with open(LOCAL_LOGS_PATH, "a", encoding="utf-8") as log_file:
            log_file.write(serialized + "\n")
        # Push the whole file back so the Hub copy stays authoritative.
        _upload_to_hub(LOCAL_LOGS_PATH, LOGS_FILENAME)
    except Exception as e:
        logger.debug(f"Failed to persist fine-tune log: {e}")
def load_fine_tune_logs(limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Downloads the latest log file from the HF Dataset and loads it.

    Args:
        limit: If set, return at most this many entries, most recent first.
            Without a limit, entries come back in file (oldest-first) order.

    Returns:
        Parsed log entries; [] on download or read failure.
    """
    # 1. Download the latest file
    if not _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH):
        return []  # Download failed, return empty list
    # 2. Load every parseable line from the file
    entries: List[Dict[str, Any]] = []
    try:
        if not LOCAL_LOGS_PATH.exists():
            return []
        with open(LOCAL_LOGS_PATH, "r", encoding="utf-8") as f:
            for raw in f:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    entries.append(json.loads(raw))
                except Exception:
                    # Skip corrupt lines instead of failing the whole load.
                    continue
    except Exception as e:
        logger.error(f"Failed to read local logs file {LOCAL_LOGS_PATH}: {e}")
    # Bug fix: the old code stopped after reading the first `limit` lines,
    # returning the OLDEST entries despite the most-recent-first contract.
    # Take the last `limit` entries and reverse them instead.
    if limit:
        return entries[-limit:][::-1]
    return entries