Sachin21112004 committed on
Commit
b3303ff
·
verified ·
1 Parent(s): 4404e67

Upload 2 files

Browse files
Files changed (2) hide show
  1. dataset_utils.py +186 -0
  2. fine_tune_logger.py +119 -0
dataset_utils.py ADDED
@@ -0,0 +1,186 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # dataset_utils.py
2
+ # NEW FILE
3
+ # This utility manages all read/write operations to your persistent HF Dataset.
4
+ # Both counselor.py and your training scripts will use this.
5
+
6
+ import json
7
+ import time
8
+ import os
9
+ import glob
10
+ import logging
11
+ from pathlib import Path
12
+ from typing import Dict, Any, List, Optional
13
+ from huggingface_hub import HfApi, hf_hub_download
14
+
15
logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
# Dataset repo on the Hugging Face Hub that stores examples and run logs.
# Set the HF_DATASET_REPO_ID environment variable to override it; otherwise
# replace "Sachin21112004" in the default below with your own username.
DATASET_REPO_ID = os.environ.get(
    "HF_DATASET_REPO_ID", "Sachin21112004/DreamFlow-AI-Data"
)

# File names as they appear inside the dataset repo.
EXAMPLES_FILENAME = "fine_tune_examples.jsonl"
LOGS_FILENAME = "fine_tune_logs.jsonl"

# Local working copies, kept in the current working directory.
LOCAL_EXAMPLES_PATH = Path(".") / EXAMPLES_FILENAME
LOCAL_LOGS_PATH = Path(".") / LOGS_FILENAME
# ---------------------
27
+
28
+ def _get_hf_token():
29
+ """Reads the HF write token from environment secrets."""
30
+ return os.environ.get("HF_WRITE_TOKEN")
31
+
32
def _download_from_hub(filename: str, local_path: Path) -> bool:
    """Download ``filename`` from the dataset repo into ``local_path``'s directory.

    The file is written to ``<local_path.parent>/<filename>``, so callers are
    expected to pass a ``local_path`` whose final component equals
    ``filename`` (true for the module-level LOCAL_*_PATH constants).

    Args:
        filename: Path of the file inside the dataset repo.
        local_path: Where the caller expects the local copy to live.

    Returns:
        True if the download call succeeded, False on any failure. A missing
        remote file is an expected failure and is logged at info level only.
    """
    token = _get_hf_token()
    try:
        # `force_filename` is intentionally not passed: huggingface_hub
        # deprecated it, and it either routes the download through the legacy
        # cache layout (so the file never lands in `local_dir`) or is rejected
        # outright, which the broad except below would silently turn into a
        # permanent "download failed".
        # NOTE(review): confirm against the pinned huggingface_hub version.
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=filename,
            repo_type="dataset",
            local_dir=str(local_path.parent),
            token=token,
        )
        return True
    except Exception as e:
        # This is common if the file doesn't exist yet
        logger.info(f"Could not download {filename} from Hub (may not exist yet): {e}")
        return False
49
+
50
def _upload_to_hub(local_path: Path, path_in_repo: str):
    """Push ``local_path`` to the dataset repo as ``path_in_repo``.

    Best-effort: any exception raised while uploading is logged at error
    level and swallowed, so the function always returns None.
    """
    upload_kwargs = dict(
        path_or_fileobj=local_path,
        path_in_repo=path_in_repo,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        token=_get_hf_token(),
    )
    try:
        HfApi().upload_file(**upload_kwargs)
    except Exception as err:
        logger.error(f"Failed to upload {local_path} to Hub: {err}")
64
+
65
+ # --- API for Training Examples (fine_tune_examples.jsonl) ---
66
+
67
def persist_fine_tune_example(text: str, label: str) -> None:
    """Append one training example and upload the examples file to the Hub.

    The current remote file is downloaded first so that the upload (which
    replaces the whole remote file) cannot overwrite examples written by
    another process or by a previous container whose local copy is gone.
    This mirrors what ``append_fine_tune_log`` does for the log file.

    Args:
        text: The example text.
        label: The label associated with ``text``.

    The operation is best-effort: failures are logged and never raised.
    """
    try:
        # 1. Refresh the local copy. A failed download (e.g. the file does not
        #    exist yet) is fine; we then append to whatever is on disk.
        _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH)

        # 2. Append the new line to the *local* file
        line = json.dumps({"text": text, "label": label}, ensure_ascii=False)
        with open(LOCAL_EXAMPLES_PATH, "a", encoding="utf-8") as f:
            f.write(line + "\n")

        # 3. Upload the *entire* file back to the dataset repo
        _upload_to_hub(LOCAL_EXAMPLES_PATH, EXAMPLES_FILENAME)
    except Exception as e:
        # Losing a training example should be visible in normal logs.
        logger.warning(f"Failed to persist fine-tune example: {e}")
81
+
82
def load_fine_tune_examples() -> List[Dict[str, str]]:
    """Download the latest examples file from the HF Dataset and parse it.

    Returns:
        The decoded JSON value of every non-blank line, in file order.
        An empty list is returned when the download fails, the local file is
        missing, or the file cannot be read.

    Malformed lines are skipped (with a warning) instead of discarding the
    whole dataset, matching how the log loader treats bad lines.
    """
    # 1. Download the latest file
    if not _download_from_hub(EXAMPLES_FILENAME, LOCAL_EXAMPLES_PATH):
        return []  # Download failed, return empty list

    # 2. Load from the file that was just downloaded
    examples = []
    try:
        with open(LOCAL_EXAMPLES_PATH, "r", encoding="utf-8") as f:
            for lineno, raw in enumerate(f, start=1):
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    examples.append(json.loads(raw))
                except ValueError as e:
                    # json.JSONDecodeError is a ValueError subclass.
                    logger.warning(
                        f"Skipping malformed line {lineno} in {LOCAL_EXAMPLES_PATH}: {e}"
                    )
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as e:
        # ValueError also covers UnicodeDecodeError raised while reading.
        logger.error(f"Failed to read local examples file {LOCAL_EXAMPLES_PATH}: {e}")
        return []
    return examples
101
+
102
def clear_fine_tune_examples(archive: bool = True):
    """Remove the examples file from the dataset repo after training.

    Args:
        archive: When True (default) the current file is first copied to
            ``archive/examples/fine_tune_examples.<unix-ts>.jsonl`` and the
            original is removed in the same commit. When False the file is
            simply deleted.

    Local copies matching ``./fine_tune_examples.jsonl*`` are removed only
    after the Hub operation succeeded. The whole operation is non-fatal:
    failures are logged and never raised.
    """
    api = HfApi()
    token = _get_hf_token()
    try:
        if archive:
            # HfApi has no rename operation for files, so "move" is expressed
            # as add-copy + delete inside one commit.
            from huggingface_hub import CommitOperationAdd, CommitOperationDelete

            ts = int(time.time())
            archive_path = f"archive/examples/fine_tune_examples.{ts}.jsonl"
            # Fetch the authoritative remote content (into the HF cache) so the
            # archive reflects the repo, not a possibly stale local copy.
            remote_copy = hf_hub_download(
                repo_id=DATASET_REPO_ID,
                filename=EXAMPLES_FILENAME,
                repo_type="dataset",
                token=token,
            )
            api.create_commit(
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                operations=[
                    CommitOperationAdd(
                        path_in_repo=archive_path,
                        path_or_fileobj=remote_copy,
                    ),
                    CommitOperationDelete(path_in_repo=EXAMPLES_FILENAME),
                ],
                commit_message=f"Archive {EXAMPLES_FILENAME} to {archive_path}",
                token=token,
            )
            done_message = "Archived examples file in dataset repo."
        else:
            api.delete_file(
                path_in_repo=EXAMPLES_FILENAME,
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                token=token,
            )
            done_message = "Deleted examples file from dataset repo."

        # Delete all local copies
        for stale in glob.glob(f"./{EXAMPLES_FILENAME}*"):
            try:
                os.remove(stale)
            except OSError as e:
                logger.debug(f"Could not remove local copy {stale}: {e}")
        logger.info(done_message)
    except Exception as e:
        # Still non-fatal, but surfaced at warning level: if this fails the
        # same examples will be picked up again by the next training run.
        logger.warning(f"Failed to clear/archive examples in Hub (non-fatal): {e}")
136
+
137
+ # --- API for Run Logs (fine_tune_logs.jsonl) ---
138
+
139
def append_fine_tune_log(entry: Dict[str, Any]) -> None:
    """Add ``entry`` as one JSON line to the run log and re-upload the log.

    Steps: refresh the local log from the Hub, append the serialized entry,
    then upload the complete file. Any exception along the way is logged at
    debug level and swallowed.
    """
    try:
        # Pull the current remote log; the result is deliberately ignored so
        # a missing remote file simply means we start (or continue) locally.
        _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH)

        serialized = json.dumps(entry, ensure_ascii=False)
        with LOCAL_LOGS_PATH.open("a", encoding="utf-8") as log_file:
            log_file.write(f"{serialized}\n")

        # The Hub stores whole files, so the full log is pushed back.
        _upload_to_hub(LOCAL_LOGS_PATH, LOGS_FILENAME)
    except Exception as err:
        logger.debug(f"Failed to persist fine-tune log: {err}")
156
+
157
def load_fine_tune_logs(limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Download the latest log file from the HF Dataset and parse it.

    Args:
        limit: When a positive integer, only the last ``limit`` entries of
            the file (the most recently appended runs) are returned. ``None``,
            zero or a negative value returns every entry.

    Returns:
        Parsed entries in file order (oldest first, since entries are
        appended). Blank and malformed lines are skipped. An empty list is
        returned when the download fails or the local file is missing.
    """
    # 1. Download the latest file
    if not _download_from_hub(LOGS_FILENAME, LOCAL_LOGS_PATH):
        return []  # Download failed, return empty list

    # 2. Load from the file
    out = []
    try:
        if not LOCAL_LOGS_PATH.exists():
            return []

        with open(LOCAL_LOGS_PATH, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    out.append(json.loads(line))
                except ValueError:
                    # json.JSONDecodeError is a ValueError subclass.
                    continue
    except Exception as e:
        logger.error(f"Failed to read local logs file {LOCAL_LOGS_PATH}: {e}")

    # Apply the limit to the tail of the file: stopping after the first
    # `limit` lines would return the oldest runs instead of the newest.
    if limit is not None and limit > 0:
        out = out[-limit:]
    return out
fine_tune_logger.py ADDED
@@ -0,0 +1,119 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # fine_tune_logger.py
2
+ # MODIFIED
3
+ # This file now imports from dataset_utils to provide a consistent logging API
4
+ # while ensuring logs are persisted to your HF Dataset.
5
+
6
+ import json
7
+ import time
8
+ import uuid
9
+ from typing import Dict, Any, Iterable, List, Optional
10
+ import hashlib
11
+
12
+ # Import the new persistent logging functions
13
+ import dataset_utils
14
+
15
+ # --- Helpers (unchanged from your original) ---
16
+ def _short_hash(s: str, length: int = 10) -> str:
17
+ try:
18
+ return hashlib.sha256(s.encode("utf-8")).hexdigest()[:length]
19
+ except Exception:
20
+ return ""
21
+
22
+ def _truncate_text(s: Optional[str], max_len: int = 200) -> Optional[str]:
23
+ if s is None:
24
+ return None
25
+ s = s.strip()
26
+ if len(s) <= max_len:
27
+ return s
28
+ return s[:max_len-3] + "..."
29
+
30
def _sanitize_examples(examples: Iterable[str], sample_count: int = 5, max_snippet_len: int = 160) -> Dict[str, Any]:
    """Summarize ``examples`` without storing them in full.

    Returns a dict with:
      * "total": number of items in ``examples``;
      * "sample_snippets": the first ``sample_count`` items, each truncated
        to ``max_snippet_len`` characters;
      * "sample_hashes": the short hash of each of those same items.

    ``None`` items are treated as empty strings.
    """
    materialized = list(examples)
    head = ["" if item is None else item for item in materialized[:sample_count]]
    return {
        "total": len(materialized),
        "sample_snippets": [_truncate_text(text, max_snippet_len) for text in head],
        "sample_hashes": [_short_hash(text) for text in head],
    }
42
+
43
+ # --- Public API (MODIFIED) ---
44
+
45
def append_fine_tune_log(
    model_dir: str,
    label_map: Dict[str, int],
    examples: Iterable[str],
    label_counts: Dict[str, int],
    train_args: Dict[str, Any],
    metrics: Dict[str, Any],
    pushed_to_hub: bool = False,
    hub_repo: Optional[str] = None,
    commit_sha: Optional[str] = None,
    created_by: Optional[str] = None,
    extra: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Build a structured record for one fine-tuning run and hand it to
    dataset_utils for persistence in the HF Dataset.

    The record gets a fresh UUID4 ``run_id`` and an integer ``timestamp_utc``
    taken from ``time.time()``. Only a sanitized sample of ``examples``
    (first 5 items, snippets up to 200 characters, plus short hashes) is
    stored, together with the total example count.

    Returns the record dict, whether or not persisting it succeeded.
    """
    new_run_id = str(uuid.uuid4())
    now = int(time.time())
    sampled = _sanitize_examples(examples, sample_count=5, max_snippet_len=200)

    record: Dict[str, Any] = dict(
        run_id=new_run_id,
        timestamp_utc=now,
        model_dir=str(model_dir),
        label_map=label_map,
        label_counts=label_counts,
        total_examples=sampled["total"],
        examples_sample_snippets=sampled["sample_snippets"],
        examples_sample_hashes=sampled["sample_hashes"],
        train_args=train_args,
        metrics=metrics,
        pushed_to_hub=bool(pushed_to_hub),
        hub_repo=hub_repo,
        commit_sha=commit_sha,
        created_by=created_by,
        extra=extra if extra else {},
    )

    # Persisting is best-effort; there is no secondary sink, so a failure is
    # only reported on the console.
    try:
        dataset_utils.append_fine_tune_log(record)
    except Exception as err:
        print(f"CRITICAL: Failed to write log to HF Dataset: {err}")

    return record
93
+
94
def load_fine_tune_logs(limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Load run logs from the persistent HF Dataset.

    Thin wrapper: ``limit`` is forwarded unchanged to
    ``dataset_utils.load_fine_tune_logs``, and that function's result
    (including its ordering) is returned as-is.
    """
    entries = dataset_utils.load_fine_tune_logs(limit=limit)
    return entries
101
+
102
def summarize_logs(max_runs: int = 10) -> Dict[str, Any]:
    """Return a compact summary of the most recent runs, newest first.

    Args:
        max_runs: Maximum number of runs to include. Zero or a negative
            value includes every logged run.

    Returns:
        ``{"runs": [...], "total_runs": n}`` where ``runs`` holds a reduced
        view of each included entry and ``total_runs`` is the number of
        entries included in the summary.
    """
    # Load everything and take the tail here: log entries are appended, so
    # the newest runs are at the end of the file. Passing `limit=max_runs`
    # to the loader could instead select the oldest entries.
    all_logs = load_fine_tune_logs()
    if max_runs and max_runs > 0:
        recent = all_logs[-max_runs:]
    else:
        recent = all_logs

    # Reverse so the newest run comes first.
    newest_first = recent[::-1]

    summary_fields = (
        "run_id",
        "timestamp_utc",
        "model_dir",
        "total_examples",
        "label_counts",
        "metrics",
        "pushed_to_hub",
        "hub_repo",
    )
    return {
        "runs": [
            {field: run.get(field) for field in summary_fields}
            for run in newest_first
        ],
        "total_runs": len(newest_first),
    }