Spaces:
Running
Running
"""
Fine-Tuning Dataset Builder for Janus.

Combines conversation data, self-reflection insights, and HF dataset extracts
into instruction-tuning pairs ready for model fine-tuning.
"""
| import json | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime, timezone | |
| from app.config import DATA_DIR | |
# Storage layout: every fine-tuning artifact lives under <DATA_DIR>/fine_tuning.
FINE_TUNING_DIR = DATA_DIR / "fine_tuning"
TRAINING_FILE = FINE_TUNING_DIR / "training_data.jsonl"  # appended JSONL pairs
METADATA_FILE = FINE_TUNING_DIR / "metadata.json"  # running counters

# Create the directory at import time (no error if it already exists).
FINE_TUNING_DIR.mkdir(parents=True, exist_ok=True)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class FineTuningBuilder:
    """
    Builds an instruction-tuning dataset from:
    1. Conversation data (user input + system response)
    2. Self-reflection insights (corrections, opinions)
    3. HF dataset extracts (facts, patterns)

    Pairs are appended as JSON lines to the training file; running counters
    are kept in a separate metadata JSON file.

    Args:
        data_dir: Optional directory to store the files in. When omitted, the
            module-level ``TRAINING_FILE`` / ``METADATA_FILE`` /
            ``FINE_TUNING_DIR`` locations are used.
    """

    # Minimum for meaningful fine-tuning (drives "ready_for_training").
    MIN_PAIRS_FOR_TRAINING = 1000

    def __init__(self, data_dir: Optional[Path] = None):
        self._data_dir: Optional[Path] = None
        if data_dir is not None:
            self._data_dir = Path(data_dir)
            self._data_dir.mkdir(parents=True, exist_ok=True)
        self._metadata = self._load_metadata()

    # ------------------------------------------------------------------
    # File locations (resolved lazily so the module constants are only
    # consulted when no custom directory was given)
    # ------------------------------------------------------------------
    @property
    def _training_file(self) -> Path:
        """Path of the JSONL file that pairs are appended to."""
        if self._data_dir is None:
            return TRAINING_FILE
        return self._data_dir / "training_data.jsonl"

    @property
    def _metadata_file(self) -> Path:
        """Path of the JSON file holding the counters."""
        if self._data_dir is None:
            return METADATA_FILE
        return self._data_dir / "metadata.json"

    @property
    def _export_file(self) -> Path:
        """Path that export_for_training() writes to."""
        base = FINE_TUNING_DIR if self._data_dir is None else self._data_dir
        return base / "export_high_quality.jsonl"

    # ------------------------------------------------------------------
    # Metadata persistence
    # ------------------------------------------------------------------
    def _load_metadata(self) -> Dict:
        """
        Load the metadata file, falling back to defaults.

        Missing keys are filled in from the defaults, so a partial or
        hand-edited file cannot cause a KeyError later. An unreadable or
        invalid file is logged and replaced by the defaults.
        """
        metadata: Dict[str, Any] = {
            "total_pairs": 0,
            "sources": {},
            "last_updated": None,
            "quality_threshold": 0.6,
        }
        path = self._metadata_file
        if path.exists():
            try:
                with open(path, encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                logger.warning(
                    "Could not read metadata from %s, using defaults: %s", path, e
                )
            else:
                if isinstance(loaded, dict):
                    metadata.update(loaded)
                else:
                    logger.warning(
                        "Metadata in %s is not a JSON object, using defaults", path
                    )
        # The add_* methods index into "sources", so it must be a dict.
        if not isinstance(metadata.get("sources"), dict):
            metadata["sources"] = {}
        return metadata

    def _save_metadata(self):
        """Stamp ``last_updated`` and write the metadata file (best effort)."""
        try:
            self._metadata["last_updated"] = datetime.now(timezone.utc).isoformat()
            with open(self._metadata_file, "w", encoding="utf-8") as f:
                json.dump(self._metadata, f, indent=2)
        except Exception as e:
            # Deliberately best-effort: a failed save must not break the caller.
            logger.error("Failed to save metadata: %s", e)

    def _record_added(self, source_key: str, count: int) -> None:
        """Add ``count`` to the total and per-source counters, then save."""
        self._metadata["total_pairs"] = self._metadata.get("total_pairs", 0) + count
        by_source = self._metadata.setdefault("sources", {})
        by_source[source_key] = by_source.get(source_key, 0) + count
        self._save_metadata()

    # ------------------------------------------------------------------
    # Adding pairs
    # ------------------------------------------------------------------
    def add_conversation_pair(
        self,
        user_input: str,
        response: str,
        confidence: float,
        sources: Optional[List[str]] = None,
    ) -> bool:
        """
        Add a conversation as an instruction-tuning pair.

        Returns:
            True when the pair was written; False when ``confidence`` is
            below the quality threshold or the write failed.
        """
        if confidence < self._metadata.get("quality_threshold", 0.6):
            return False
        pair = {
            "instruction": user_input,
            "input": "",
            "output": response,
            "source": "conversation",
            "confidence": confidence,
            "sources": sources or [],
            "timestamp": time.time(),
            "iso_time": datetime.now(timezone.utc).isoformat(),
        }
        # Only count the pair if it actually reached the file.
        if not self._append_pair(pair):
            return False
        self._record_added("conversation", 1)
        return True

    def add_correction_pair(self, original_input: str, correction: str, topic: str):
        """
        Add a user correction as a high-quality training pair.
        These are the most valuable — user-verified improvements.

        The instruction embeds at most the first 200 characters of
        ``original_input``. Nothing is counted if the write fails.
        """
        pair = {
            "instruction": f"Correct your previous response about: {original_input[:200]}",
            "input": "",
            "output": correction,
            "source": "user_correction",
            "confidence": 1.0,  # User-verified
            "topic": topic,
            "timestamp": time.time(),
            "iso_time": datetime.now(timezone.utc).isoformat(),
        }
        if not self._append_pair(pair):
            return
        self._record_added("user_correction", 1)
        logger.info("Added correction pair for topic: %s", topic)

    def add_dataset_pairs(self, pairs: List[Dict], source_dataset: str):
        """
        Add instruction pairs extracted from a HF dataset.

        Each pair is copied before being tagged, so the caller's dicts are
        left untouched. Pairs without a "confidence" get 0.7; pairs below the
        quality threshold, or that cannot be written, are not counted.

        Returns:
            The number of pairs actually written.
        """
        dataset_key = f"dataset:{source_dataset}"
        threshold = self._metadata.get("quality_threshold", 0.6)
        added = 0
        for original in pairs:
            pair = dict(original)  # shallow copy: do not mutate caller data
            pair["source"] = dataset_key
            pair["confidence"] = pair.get("confidence", 0.7)
            pair["timestamp"] = time.time()
            pair["iso_time"] = datetime.now(timezone.utc).isoformat()
            if pair["confidence"] >= threshold and self._append_pair(pair):
                added += 1
        self._record_added(dataset_key, added)
        logger.info("Added %d/%d pairs from %s", added, len(pairs), source_dataset)
        return added

    def _append_pair(self, pair: Dict) -> bool:
        """
        Append a training pair to the JSONL file.

        Returns:
            True on success, False if the pair could not be serialized or
            written (the failure is logged).
        """
        try:
            # Serialize first so a bad pair never leaves a partial line behind.
            line = json.dumps(pair)
            with open(self._training_file, "a", encoding="utf-8") as f:
                f.write(line + "\n")
        except (OSError, TypeError, ValueError) as e:
            logger.error("Failed to append training pair: %s", e)
            return False
        return True

    # ------------------------------------------------------------------
    # Reading back
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_line(line: str) -> Optional[Dict]:
        """Decode one JSONL line; return None if it is not a JSON object."""
        try:
            entry = json.loads(line)
        except ValueError:
            return None
        return entry if isinstance(entry, dict) else None

    @staticmethod
    def _numeric_confidence(entry: Dict, default: float) -> Optional[float]:
        """Return the entry's confidence, or None when it is not a number."""
        value = entry.get("confidence", default)
        return value if isinstance(value, (int, float)) else None

    def get_stats(self) -> Dict:
        """
        Get fine-tuning dataset statistics.

        The quality distribution is recomputed from the training file:
        confidence >= 0.8 is "high", >= 0.6 is "medium", anything else "low"
        (entries without a confidence count as 0.5). Malformed lines are
        skipped rather than aborting the whole count.
        """
        total = self._metadata.get("total_pairs", 0)
        path = self._training_file

        quality_dist = {"high": 0, "medium": 0, "low": 0}
        skipped = 0
        try:
            with open(path, encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    entry = self._parse_line(line)
                    conf = (
                        None
                        if entry is None
                        else self._numeric_confidence(entry, 0.5)
                    )
                    if conf is None:
                        skipped += 1
                    elif conf >= 0.8:
                        quality_dist["high"] += 1
                    elif conf >= 0.6:
                        quality_dist["medium"] += 1
                    else:
                        quality_dist["low"] += 1
        except FileNotFoundError:
            pass  # nothing has been recorded yet
        except (OSError, ValueError) as e:
            logger.warning("Could not read training data from %s: %s", path, e)
        if skipped:
            logger.warning("Skipped %d malformed line(s) in %s", skipped, path)

        try:
            file_size_mb = round(path.stat().st_size / 1024 / 1024, 2)
        except OSError:
            file_size_mb = 0

        return {
            "total_pairs": total,
            "sources": self._metadata.get("sources", {}),
            "quality_distribution": quality_dist,
            "file_size_mb": file_size_mb,
            "last_updated": self._metadata.get("last_updated"),
            "ready_for_training": total >= self.MIN_PAIRS_FOR_TRAINING,
        }

    def export_for_training(self, min_confidence: float = 0.7) -> str:
        """
        Export high-quality pairs for model training.

        Every entry whose confidence is at least ``min_confidence`` is
        rewritten as a two-message (user / assistant) chat record. Malformed
        lines are skipped instead of aborting the export.

        Returns:
            The path to the exported file, or "" if the export failed.
        """
        export_file = self._export_file
        count = 0
        skipped = 0
        try:
            with open(self._training_file, encoding="utf-8") as fin, open(
                export_file, "w", encoding="utf-8"
            ) as fout:
                for line in fin:
                    if not line.strip():
                        continue
                    entry = self._parse_line(line)
                    conf = (
                        None if entry is None else self._numeric_confidence(entry, 0)
                    )
                    if conf is None:
                        skipped += 1
                        continue
                    if conf < min_confidence:
                        continue
                    # Convert to standard format.
                    # NOTE(review): the stored "input" field is not exported —
                    # confirm that is intended for dataset pairs.
                    training_entry = {
                        "messages": [
                            {
                                "role": "user",
                                "content": entry.get("instruction", ""),
                            },
                            {
                                "role": "assistant",
                                "content": entry.get("output", ""),
                            },
                        ]
                    }
                    fout.write(json.dumps(training_entry) + "\n")
                    count += 1
        except (OSError, TypeError, ValueError) as e:
            logger.error("Failed to export training data: %s", e)
            return ""
        if skipped:
            logger.warning("Skipped %d malformed line(s) during export", skipped)
        logger.info("Exported %d high-quality pairs to %s", count, export_file)
        return str(export_file)
# Shared module-level instance, constructed once when this module is imported.
fine_tuning_builder: FineTuningBuilder = FineTuningBuilder()