Spaces:
Sleeping
Sleeping
| # dataset_utils.py | |
| # This utility manages all read/write operations to your persistent HF Dataset. | |
| # Both counselor.py and your training scripts will use this. | |
| import json | |
| import time | |
| import os | |
| import glob | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Optional | |
| from huggingface_hub import HfApi, hf_hub_download | |
logger = logging.getLogger(__name__)

# --- CONFIGURATION --------------------------------------------------------
# !! REPLACE "Sachin21112004" with your username !!
# Repo id can be overridden via the HF_DATASET_REPO_ID environment variable.
DATASET_REPO_ID = os.getenv("HF_DATASET_REPO_ID", "Sachin21112004/DreamFlow-AI-Data")

# Filenames as they appear inside the dataset repo.
EXAMPLES_FILENAME = "fine_tune_examples.jsonl"
LOGS_FILENAME = "fine_tune_logs.jsonl"

# Local scratch copies (in the CWD) that mirror the repo files.
LOCAL_EXAMPLES_PATH = Path(f"./{EXAMPLES_FILENAME}")
LOCAL_LOGS_PATH = Path(f"./{LOGS_FILENAME}")
# --------------------------------------------------------------------------
| def _get_hf_token(): | |
| """Reads the HF write token from environment secrets.""" | |
| return os.environ.get("HF_WRITE_TOKEN") | |
def _download_from_hub(filename: str, local_path: Path) -> bool:
    """Download *filename* from the dataset repo into the current directory.

    Args:
        filename: Path of the file inside the dataset repo.
        local_path: Expected local destination. Currently unused — kept for
            interface compatibility; ``local_dir="."`` already materializes
            the file at ``./<filename>``.

    Returns:
        True on success, False on any failure (most commonly because the
        file does not exist in the repo yet).
    """
    token = _get_hf_token()
    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=filename,
            repo_type="dataset",
            local_dir=".",
            token=token,
            # NOTE(review): force_filename is deprecated/removed in recent
            # huggingface_hub releases — confirm against the pinned version.
            force_filename=filename
        )
        return True
    except Exception as e:
        # Common on first run: the file simply doesn't exist in the repo yet.
        # BUG FIX: the message previously printed the literal "(unknown)"
        # instead of the filename, making the log useless for debugging.
        logger.info(f"Could not download {filename} from Hub (may not exist yet): {e}")
        return False
def _upload_to_hub(local_path: Path, path_in_repo: str):
    """Push a local file into the dataset repo.

    Errors are logged rather than raised, so callers can treat uploads as
    best-effort.

    Args:
        local_path: Path of the file on local disk.
        path_in_repo: Destination path inside the dataset repo.
    """
    token = _get_hf_token()
    try:
        HfApi().upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=token,
        )
    except Exception as e:
        logger.error(f"Failed to upload {local_path} to Hub: {e}")
| # --- API for Training Examples (fine_tune_examples.jsonl) --- | |
def persist_fine_tune_example(text: str, label: str) -> None:
    """
    Append a single training example and upload it to the HF Dataset.

    Best-effort: failures are logged and never raised to the caller.

    Args:
        text: Raw example text.
        label: Classification label for the example.
    """
    try:
        # 1. CONSISTENCY FIX: sync down the current file first so a fresh
        #    container doesn't start from an empty local file and clobber
        #    examples already stored in the repo. This matches the pattern
        #    append_fine_tune_log already uses.
        if not LOCAL_EXAMPLES_PATH.exists():
            _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH)
        # 2. Append the new line to the *local* file.
        line = json.dumps({"text": text, "label": label}, ensure_ascii=False)
        with open(LOCAL_EXAMPLES_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")
        # 3. Upload the *entire* file back to the dataset repo.
        _upload_to_hub(LOCAL_EXAMPLES_PATH, EXAMPLES_FILENAME)
    except Exception as e:
        # warning (not debug): silently dropping training data should be visible.
        logger.warning(f"Failed to persist fine-tune example: {e}")
def load_fine_tune_examples() -> List[Dict[str, str]]:
    """
    Fetch the latest examples file from the HF Dataset and parse it.

    Returns:
        A list of {"text": ..., "label": ...} dicts; empty when the file
        cannot be downloaded or read.
    """
    # Pull the freshest copy from the Hub first.
    if not _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH):
        return []
    try:
        if not LOCAL_EXAMPLES_PATH.exists():
            return []
        with open(LOCAL_EXAMPLES_PATH, "r", encoding="utf-8") as f:
            return [json.loads(raw) for raw in f if raw.strip()]
    except Exception as e:
        logger.error(f"Failed to read local examples file {LOCAL_EXAMPLES_PATH}: {e}")
        return []
def clear_fine_tune_examples(archive: bool = True):
    """
    Remove the examples file from the dataset repo after training.

    Args:
        archive: If True, rename the file into archive/examples/ inside the
            repo (keeps a timestamped history of every run's data). If
            False, delete it outright.

    Hub errors are swallowed (non-fatal) so a failed cleanup never breaks
    the training pipeline.
    """
    api = HfApi()
    token = _get_hf_token()
    try:
        if archive:
            # Timestamped path keeps each training run's data retrievable.
            ts = int(time.time())
            archive_path = f"archive/examples/fine_tune_examples.{ts}.jsonl"
            api.rename_file(
                from_path=EXAMPLES_FILENAME,
                to_path=archive_path,
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                token=token
            )
        else:
            api.delete_file(
                path_in_repo=EXAMPLES_FILENAME,
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                token=token
            )
        # Delete all local copies (glob also catches any *.lock/*.metadata
        # side files hf_hub_download may have left next to the data).
        for f in glob.glob(f"./{EXAMPLES_FILENAME}*"):
            try:
                os.remove(f)
            except OSError:
                pass
        # BUG FIX: previously logged "Archived" even when archive=False
        # deleted the file instead of archiving it.
        action = "Archived" if archive else "Deleted"
        logger.info(f"{action} examples file in dataset repo.")
    except Exception as e:
        logger.debug(f"Failed to clear/archive examples in Hub (non-fatal): {e}")
| # --- API for Run Logs (fine_tune_logs.jsonl) --- | |
def append_fine_tune_log(entry: Dict[str, Any]) -> None:
    """
    Append one run-log entry to the JSONL log and sync it to the HF Dataset.

    Best-effort: any failure is logged at debug level and swallowed.

    Args:
        entry: JSON-serializable dict describing a fine-tune run.
    """
    try:
        # Pull the current log first so we append to the latest repo copy.
        _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH)
        # Append the serialized entry locally.
        serialized = json.dumps(entry, ensure_ascii=False)
        with open(LOCAL_LOGS_PATH, "a", encoding="utf-8") as log_file:
            log_file.write(serialized + "\n")
        # Push the whole file back so the repo copy stays authoritative.
        _upload_to_hub(LOCAL_LOGS_PATH, LOGS_FILENAME)
    except Exception as e:
        logger.debug(f"Failed to persist fine-tune log: {e}")
def load_fine_tune_logs(limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Download the latest log file from the HF Dataset and load it.

    Args:
        limit: If set, return at most `limit` entries, most recent first.
            If None, return every entry in chronological (file) order.

    Returns:
        Parsed log entries; empty list when the file is missing or unreadable.
    """
    # 1. Download the latest file
    if not _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH):
        return []  # Download failed, return empty list
    # 2. Load every valid line from the file
    out: List[Dict[str, Any]] = []
    try:
        if not LOCAL_LOGS_PATH.exists():
            return []
        with open(LOCAL_LOGS_PATH, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    out.append(json.loads(line))
                except Exception:
                    # Skip corrupt lines rather than failing the whole read.
                    continue
    except Exception as e:
        logger.error(f"Failed to read local logs file {LOCAL_LOGS_PATH}: {e}")
        return out
    # BUG FIX: the old code broke out of the read loop after `limit` lines,
    # returning the *oldest* entries even though the docstring promised
    # most-recent-first. Entries are appended chronologically, so the newest
    # live at the end of the file.
    if limit:
        return out[-limit:][::-1]
    return out