from __future__ import annotations

import csv
import datetime as dt
import json
import sqlite3
from contextlib import closing
from dataclasses import asdict, dataclass
from pathlib import Path

UTC = getattr(dt, "UTC", dt.timezone.utc)  # noqa: UP017


@dataclass
class FieldNote:
    """One human correction record."""

    created_at: str
    model_id: str
    prompt: str
    response: str
    correction: str
    tags: str
    image_path: str = ""
    video_path: str = ""
    use_for_training: bool = True

    @classmethod
    def create(
        cls,
        model_id: str,
        prompt: str,
        response: str,
        correction: str,
        tags: str,
        image_path: str = "",
        video_path: str = "",
        use_for_training: bool = True,
    ) -> FieldNote:
        return cls(
            created_at=dt.datetime.now(UTC).isoformat(),
            model_id=model_id,
            prompt=prompt,
            response=response,
            correction=correction,
            tags=tags,
            image_path=image_path,
            video_path=video_path,
            use_for_training=use_for_training,
        )

    @classmethod
    def from_row(cls, row: dict[str, str]) -> FieldNote:
        use_for_training = str(row.get("use_for_training", "true")).lower() in {
            "1",
            "true",
            "yes",
        }
        return cls(
            created_at=row["created_at"],
            model_id=row["model_id"],
            prompt=row["prompt"],
            response=row["response"],
            correction=row["correction"],
            tags=row["tags"],
            image_path=row.get("image_path", ""),
            video_path=row.get("video_path", ""),
            use_for_training=use_for_training,
        )

    def to_dict(self) -> dict[str, object]:
        return asdict(self)


class FieldNoteStore:
    """CSV-backed field note storage."""

    def __init__(self, path: str | Path = "data/field_notes.csv") -> None:
        self.path = Path(path)

    def save(self, note: FieldNote) -> Path:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        is_new = not self.path.exists()
        with self.path.open("a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(note.to_dict()))
            if is_new:
                writer.writeheader()
            writer.writerow(note.to_dict())
        return self.path

    def list_notes(
        self,
        corrected_only: bool = False,
        tag: str = "",
        training_only: bool = False,
    ) -> list[FieldNote]:
        if not self.path.exists():
            return []
        with self.path.open(newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        notes = [FieldNote.from_row(row) for row in rows]
        if corrected_only:
            notes = [note for note in notes if note.correction.strip()]
        if tag:
            notes = [note for note in notes if tag in _split_tags(note.tags)]
        if training_only:
            notes = [note for note in notes if note.use_for_training]
        return notes

    def export_jsonl(
        self,
        output_path: str | Path = "data/field_notes.jsonl",
        corrected_only: bool = True,
        training_only: bool = True,
    ) -> Path:
        output = Path(output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        notes = self.list_notes(
            corrected_only=corrected_only,
            training_only=training_only,
        )
        with output.open("w", encoding="utf-8") as f:
            for note in notes:
                f.write(json.dumps(note.to_dict(), ensure_ascii=False) + "\n")
        return output

    def export_hf_dataset(
        self,
        output_dir: str | Path = "data/hf_field_notes",
        corrected_only: bool = True,
        training_only: bool = True,
    ) -> Path:
        target = Path(output_dir)
        target.mkdir(parents=True, exist_ok=True)
        data_file = self.export_jsonl(
            target / "data.jsonl",
            corrected_only=corrected_only,
            training_only=training_only,
        )
        (target / "README.md").write_text(
            "# Field Notes Dataset\n\n"
            "Local export generated by OpenBMB Local AI Workbench.\n\n"
            f"- Data file: `{data_file.name}`\n"
            "- Intended split: `train`\n",
            encoding="utf-8",
        )
        return target


class SQLiteFieldNoteStore:
    """SQLite-backed field note storage for larger correction loops."""

    def __init__(self, path: str | Path = "data/field_notes.sqlite") -> None:
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()

    def _connect(self) -> sqlite3.Connection:
        return sqlite3.connect(self.path)

    def _init_schema(self) -> None:
        with closing(self._connect()) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS field_notes (
                    created_at TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    prompt TEXT NOT NULL,
                    response TEXT NOT NULL,
                    correction TEXT NOT NULL,
                    tags TEXT NOT NULL,
                    image_path TEXT NOT NULL,
                    video_path TEXT NOT NULL,
                    use_for_training INTEGER NOT NULL
                )
                """
            )
            conn.commit()

    def save(self, note: FieldNote) -> Path:
        with closing(self._connect()) as conn:
            conn.execute(
                """
                INSERT INTO field_notes (
                    created_at, model_id, prompt, response, correction,
                    tags, image_path, video_path, use_for_training
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    note.created_at,
                    note.model_id,
                    note.prompt,
                    note.response,
                    note.correction,
                    note.tags,
                    note.image_path,
                    note.video_path,
                    int(note.use_for_training),
                ),
            )
            conn.commit()
        return self.path

    def list_notes(
        self,
        corrected_only: bool = False,
        tag: str = "",
        training_only: bool = False,
    ) -> list[FieldNote]:
        with closing(self._connect()) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """
                SELECT created_at, model_id, prompt, response, correction,
                       tags, image_path, video_path, use_for_training
                FROM field_notes
                ORDER BY created_at
                """
            ).fetchall()
        notes = [
            FieldNote(
                created_at=str(row["created_at"]),
                model_id=str(row["model_id"]),
                prompt=str(row["prompt"]),
                response=str(row["response"]),
                correction=str(row["correction"]),
                tags=str(row["tags"]),
                image_path=str(row["image_path"]),
                video_path=str(row["video_path"]),
                use_for_training=bool(row["use_for_training"]),
            )
            for row in rows
        ]
        if corrected_only:
            notes = [note for note in notes if note.correction.strip()]
        if tag:
            notes = [note for note in notes if tag in _split_tags(note.tags)]
        if training_only:
            notes = [note for note in notes if note.use_for_training]
        return notes


def _split_tags(tags: str) -> set[str]:
    return {tag.strip() for tag in tags.split(",") if tag.strip()}