# dataset_utils.py
#
# Manages all read/write operations to the persistent HF Dataset.
# Both counselor.py and the training scripts use this module.
import glob
import json
import logging
import os
import shutil
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from huggingface_hub import HfApi, hf_hub_download

logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
# !! REPLACE "Sachin21112004" with your username !!
DATASET_REPO_ID = os.getenv("HF_DATASET_REPO_ID", "Sachin21112004/DreamFlow-AI-Data")
EXAMPLES_FILENAME = "fine_tune_examples.jsonl"
LOGS_FILENAME = "fine_tune_logs.jsonl"

# Local temp paths
LOCAL_EXAMPLES_PATH = Path(f"./{EXAMPLES_FILENAME}")
LOCAL_LOGS_PATH = Path(f"./{LOGS_FILENAME}")
# ---------------------


def _get_hf_token() -> Optional[str]:
    """Reads the HF write token from environment secrets."""
    return os.environ.get("HF_WRITE_TOKEN")


def _download_from_hub(filename: str, local_path: Path) -> bool:
    """Downloads *filename* from the dataset repo into *local_path*.

    Returns True on success, False otherwise (e.g. the file does not exist
    in the repo yet — a normal condition on first run).
    """
    token = _get_hf_token()
    try:
        # hf_hub_download returns the path of the cached download; copy it to
        # the requested local path.  (The legacy ``force_filename`` kwarg has
        # been removed from recent huggingface_hub releases, and the previous
        # code never honored ``local_path`` at all.)
        cached = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=filename,
            repo_type="dataset",
            token=token,
        )
        if Path(cached).resolve() != local_path.resolve():
            shutil.copyfile(cached, local_path)
        return True
    except Exception as e:
        # This is common if the file doesn't exist yet.
        # (Bug fix: the old message logged the literal "(unknown)" instead of
        # the filename.)
        logger.info(f"Could not download {filename} from Hub (may not exist yet): {e}")
        return False


def _upload_to_hub(local_path: Path, path_in_repo: str) -> None:
    """Uploads a local file to the dataset repo (best-effort; logs on failure)."""
    token = _get_hf_token()
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=token,
        )
    except Exception as e:
        logger.error(f"Failed to upload {local_path} to Hub: {e}")


# --- API for Training Examples (fine_tune_examples.jsonl) ---

def persist_fine_tune_example(text: str, label: str) -> None:
    """
    Appends a single training example and uploads to the HF Dataset.
    """
    try:
        # 1. Sync with the Hub first so appending to a stale/empty local file
        #    does not clobber examples written by a previous process.  (This
        #    mirrors append_fine_tune_log, which already did this.)
        _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH)
        # 2. Append the new line to the *local* file
        line = json.dumps({"text": text, "label": label}, ensure_ascii=False)
        with open(LOCAL_EXAMPLES_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")
        # 3. Upload the *entire* file back to the dataset repo
        _upload_to_hub(LOCAL_EXAMPLES_PATH, EXAMPLES_FILENAME)
    except Exception as e:
        # Warn (not debug) so silent training-data loss is visible in logs.
        logger.warning(f"Failed to persist fine-tune example: {e}")


def load_fine_tune_examples() -> List[Dict[str, str]]:
    """
    Downloads the latest examples file from the HF Dataset and loads it.
    """
    # 1. Download the latest file
    if not _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH):
        return []  # Download failed, return empty list
    # 2. Load from the file you just downloaded
    try:
        if not LOCAL_EXAMPLES_PATH.exists():
            return []
        with open(LOCAL_EXAMPLES_PATH, "r", encoding="utf-8") as f:
            return [json.loads(l) for l in f if l.strip()]
    except Exception as e:
        logger.error(f"Failed to read local examples file {LOCAL_EXAMPLES_PATH}: {e}")
        return []


def clear_fine_tune_examples(archive: bool = True) -> None:
    """
    Archives (or deletes) the examples file in the dataset repo after training.

    NOTE(review): the previous implementation called ``api.rename_file``,
    which HfApi does not expose; the resulting AttributeError was swallowed
    by the broad except, so archiving silently never happened.  Archiving is
    now done as download -> upload-to-archive-path -> delete.
    """
    api = HfApi()
    token = _get_hf_token()
    try:
        if archive:
            # Copy the current Hub file to a timestamped archive path before
            # deleting the original.
            if _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH):
                ts = int(time.time())
                archive_path = f"archive/examples/fine_tune_examples.{ts}.jsonl"
                _upload_to_hub(LOCAL_EXAMPLES_PATH, archive_path)
        api.delete_file(
            path_in_repo=EXAMPLES_FILENAME,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=token,
        )
        # Delete all local copies
        for f in glob.glob(f"./{EXAMPLES_FILENAME}*"):
            try:
                os.remove(f)
            except Exception:
                pass
        logger.info("Archived examples file in dataset repo.")
    except Exception as e:
        logger.debug(f"Failed to clear/archive examples in Hub (non-fatal): {e}")


# --- API for Run Logs (fine_tune_logs.jsonl) ---

def append_fine_tune_log(entry: Dict[str, Any]) -> None:
    """
    Appends a single log entry and uploads to the HF Dataset.
    """
    try:
        # 1. Download the *current* log file first
        _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH)
        # 2. Append the new line to the *local* file
        line = json.dumps(entry, ensure_ascii=False)
        with open(LOCAL_LOGS_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")
        # 3. Upload the *entire* file back to the dataset repo
        _upload_to_hub(LOCAL_LOGS_PATH, LOGS_FILENAME)
    except Exception as e:
        # Warn (not debug) so silent log loss is visible in logs.
        logger.warning(f"Failed to persist fine-tune log: {e}")


def load_fine_tune_logs(limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Downloads the latest log file from the HF Dataset and loads it.
    Returns list, most-recent-first if limit is set.
    """
    # 1. Download the latest file
    if not _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH):
        return []  # Download failed, return empty list
    # 2. Load from the file
    out: List[Dict[str, Any]] = []
    try:
        if not LOCAL_LOGS_PATH.exists():
            return []
        with open(LOCAL_LOGS_PATH, "r", encoding="utf-8") as f:
            for raw in f:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    out.append(json.loads(raw))
                except Exception:
                    continue  # skip corrupt lines rather than failing the load
    except Exception as e:
        logger.error(f"Failed to read local logs file {LOCAL_LOGS_PATH}: {e}")
    # Bug fix: the file is append-only (oldest entry first), but the old code
    # broke out after the first `limit` lines, returning the *oldest* entries
    # in oldest-first order — contradicting the documented contract.  Return
    # the last `limit` entries, most-recent-first.
    if limit:
        return out[-limit:][::-1]
    return out