Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import csv | |
| import datetime as dt | |
| import json | |
| import sqlite3 | |
| from contextlib import closing | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
# Use the ``datetime.UTC`` alias when the running interpreter provides it;
# otherwise fall back to the equivalent ``datetime.timezone.utc`` singleton.
UTC = dt.UTC if hasattr(dt, "UTC") else dt.timezone.utc  # noqa: UP017
@dataclass
class FieldNote:
    """One human correction record.

    Instances are plain data holders: every field is stored as given and
    ``to_dict`` returns them as a dictionary keyed by field name.
    """

    created_at: str  # timestamp string; ``create`` fills it with an ISO-8601 UTC time
    model_id: str
    prompt: str
    response: str
    correction: str
    tags: str  # single string; the stores in this module split it on ","
    image_path: str = ""
    video_path: str = ""
    use_for_training: bool = True

    @classmethod
    def create(
        cls,
        model_id: str,
        prompt: str,
        response: str,
        correction: str,
        tags: str,
        image_path: str = "",
        video_path: str = "",
        use_for_training: bool = True,
    ) -> FieldNote:
        """Build a note stamped with the current time in UTC.

        All arguments are copied onto the new note unchanged; only
        ``created_at`` is generated here.
        """
        return cls(
            created_at=dt.datetime.now(UTC).isoformat(),
            model_id=model_id,
            prompt=prompt,
            response=response,
            correction=correction,
            tags=tags,
            image_path=image_path,
            video_path=video_path,
            use_for_training=use_for_training,
        )

    @classmethod
    def from_row(cls, row: dict[str, str]) -> FieldNote:
        """Rebuild a note from a mapping of column name to text value.

        The six core columns (``created_at`` through ``tags``) are required.
        ``image_path`` and ``video_path`` default to ``""`` and
        ``use_for_training`` defaults to ``True`` when the key is absent or
        its value is ``None`` (``csv.DictReader`` yields ``None`` for columns
        missing from a short row).

        Raises:
            KeyError: if one of the required columns is missing.
        """
        raw_flag = row.get("use_for_training")
        if raw_flag is None:
            # No value recorded: fall back to the field's declared default.
            use_for_training = True
        else:
            # Accept the usual textual spellings of "true", ignoring case and
            # surrounding whitespace; anything else counts as False.
            use_for_training = str(raw_flag).strip().lower() in {
                "1",
                "true",
                "yes",
            }
        return cls(
            created_at=row["created_at"],
            model_id=row["model_id"],
            prompt=row["prompt"],
            response=row["response"],
            correction=row["correction"],
            tags=row["tags"],
            image_path=row.get("image_path") or "",
            video_path=row.get("video_path") or "",
            use_for_training=use_for_training,
        )

    def to_dict(self) -> dict[str, object]:
        """Return the note's fields as a dictionary, in declaration order."""
        return asdict(self)
class FieldNoteStore:
    """CSV-backed field note storage."""

    def __init__(self, path: str | Path = "data/field_notes.csv") -> None:
        """Remember the CSV location; nothing is created until ``save``."""
        self.path = Path(path)

    def save(self, note: FieldNote) -> Path:
        """Append ``note`` as one CSV row and return the CSV path.

        Parent directories are created on demand. The header row is written
        when the file is missing *or* exists but is still empty, so the
        first data row is never mistaken for a header when read back.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        record = note.to_dict()
        needs_header = not self.path.exists() or self.path.stat().st_size == 0
        with self.path.open("a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(record))
            if needs_header:
                writer.writeheader()
            writer.writerow(record)
        return self.path

    def list_notes(
        self,
        corrected_only: bool = False,
        tag: str = "",
        training_only: bool = False,
    ) -> list[FieldNote]:
        """Load notes from the CSV file, optionally filtered.

        Args:
            corrected_only: keep only notes whose ``correction`` is not blank.
            tag: when non-empty, keep only notes whose ``tags`` contain it.
            training_only: keep only notes with ``use_for_training`` set.

        Returns:
            Notes in file order; an empty list when the file does not exist.
        """
        if not self.path.exists():
            return []
        with self.path.open(newline="", encoding="utf-8") as f:
            notes = [FieldNote.from_row(row) for row in csv.DictReader(f)]
        if corrected_only:
            notes = [note for note in notes if note.correction.strip()]
        if tag:
            notes = [note for note in notes if tag in _split_tags(note.tags)]
        if training_only:
            notes = [note for note in notes if note.use_for_training]
        return notes

    def export_jsonl(
        self,
        output_path: str | Path = "data/field_notes.jsonl",
        corrected_only: bool = True,
        training_only: bool = True,
    ) -> Path:
        """Write the selected notes as JSON Lines and return the output path.

        The file is overwritten; one JSON object is written per note, with
        non-ASCII characters kept as-is. Filters are passed to ``list_notes``.
        """
        output = Path(output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        notes = self.list_notes(
            corrected_only=corrected_only,
            training_only=training_only,
        )
        with output.open("w", encoding="utf-8") as f:
            for note in notes:
                f.write(json.dumps(note.to_dict(), ensure_ascii=False) + "\n")
        return output

    def export_hf_dataset(
        self,
        output_dir: str | Path = "data/hf_field_notes",
        corrected_only: bool = True,
        training_only: bool = True,
    ) -> Path:
        """Export ``data.jsonl`` plus a short ``README.md`` into ``output_dir``.

        Returns the directory path. The data file is produced by
        ``export_jsonl`` with the same filters.
        """
        target = Path(output_dir)
        target.mkdir(parents=True, exist_ok=True)
        data_file = self.export_jsonl(
            target / "data.jsonl",
            corrected_only=corrected_only,
            training_only=training_only,
        )
        (target / "README.md").write_text(
            "# Field Notes Dataset\n\n"
            "Local export generated by OpenBMB Local AI Workbench.\n\n"
            f"- Data file: `{data_file.name}`\n"
            "- Intended split: `train`\n",
            encoding="utf-8",
        )
        return target
class SQLiteFieldNoteStore:
    """SQLite-backed field note storage for larger correction loops."""

    def __init__(self, path: str | Path = "data/field_notes.sqlite") -> None:
        """Create the parent directory and the ``field_notes`` table if needed."""
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the database file; callers close it."""
        return sqlite3.connect(self.path)

    def _init_schema(self) -> None:
        """Create the ``field_notes`` table when it does not exist yet."""
        with closing(self._connect()) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS field_notes (
                    created_at TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    prompt TEXT NOT NULL,
                    response TEXT NOT NULL,
                    correction TEXT NOT NULL,
                    tags TEXT NOT NULL,
                    image_path TEXT NOT NULL,
                    video_path TEXT NOT NULL,
                    use_for_training INTEGER NOT NULL
                )
                """
            )
            conn.commit()

    def save(self, note: FieldNote) -> Path:
        """Insert ``note`` as one row and return the database path.

        Values are bound as parameters; ``use_for_training`` is stored as
        an integer (0 or 1).
        """
        with closing(self._connect()) as conn:
            conn.execute(
                """
                INSERT INTO field_notes (
                    created_at, model_id, prompt, response, correction, tags,
                    image_path, video_path, use_for_training
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    note.created_at,
                    note.model_id,
                    note.prompt,
                    note.response,
                    note.correction,
                    note.tags,
                    note.image_path,
                    note.video_path,
                    int(note.use_for_training),
                ),
            )
            conn.commit()
        return self.path

    def list_notes(
        self,
        corrected_only: bool = False,
        tag: str = "",
        training_only: bool = False,
    ) -> list[FieldNote]:
        """Load notes ordered by ``created_at``, optionally filtered.

        Args:
            corrected_only: keep only notes whose ``correction`` is not blank.
            tag: when non-empty, keep only notes whose ``tags`` contain it.
            training_only: keep only notes with ``use_for_training`` set.

        Returns:
            The matching notes; rows with equal ``created_at`` values come
            back in insertion order.
        """
        with closing(self._connect()) as conn:
            conn.row_factory = sqlite3.Row
            # ``rowid`` breaks ties between equal timestamps so the listing
            # (and any export built from it) is deterministic.
            rows = conn.execute(
                """
                SELECT created_at, model_id, prompt, response, correction, tags,
                       image_path, video_path, use_for_training
                FROM field_notes
                ORDER BY created_at, rowid
                """
            ).fetchall()
        notes = [
            FieldNote(
                created_at=str(row["created_at"]),
                model_id=str(row["model_id"]),
                prompt=str(row["prompt"]),
                response=str(row["response"]),
                correction=str(row["correction"]),
                tags=str(row["tags"]),
                image_path=str(row["image_path"]),
                video_path=str(row["video_path"]),
                use_for_training=bool(row["use_for_training"]),
            )
            for row in rows
        ]
        if corrected_only:
            notes = [note for note in notes if note.correction.strip()]
        if tag:
            notes = [note for note in notes if tag in _split_tags(note.tags)]
        if training_only:
            notes = [note for note in notes if note.use_for_training]
        return notes
| def _split_tags(tags: str) -> set[str]: | |
| return {tag.strip() for tag in tags.split(",") if tag.strip()} | |