workbench / datasets /field_notes.py
GitHub Actions
Initial ZeroGPU deployment with spaces shim
7f9dfed
Raw
History Blame Contribute Delete
7.88 kB
from __future__ import annotations
import csv
import datetime as dt
import json
import sqlite3
from contextlib import closing
from dataclasses import asdict, dataclass
from pathlib import Path
# ``datetime.UTC`` only exists on newer interpreters; fall back to the
# equivalent ``datetime.timezone.utc`` singleton everywhere else.
UTC = dt.UTC if hasattr(dt, "UTC") else dt.timezone.utc  # noqa: UP017
@dataclass
class FieldNote:
    """One human correction record.

    Every field except ``use_for_training`` is plain text, so a note maps
    directly onto a flat row (see ``from_row``) or a dictionary (see
    ``to_dict``).
    """

    # Timestamp string; ``create`` fills it with the current UTC time in
    # ISO 8601 format.
    created_at: str
    model_id: str
    prompt: str
    response: str
    correction: str
    # All tags are kept together in one string.
    tags: str
    image_path: str = ""
    video_path: str = ""
    use_for_training: bool = True

    @classmethod
    def create(
        cls,
        model_id: str,
        prompt: str,
        response: str,
        correction: str,
        tags: str,
        image_path: str = "",
        video_path: str = "",
        use_for_training: bool = True,
    ) -> FieldNote:
        """Build a note stamped with the current UTC time.

        All arguments are stored unchanged on the new instance; only
        ``created_at`` is generated here.
        """
        return cls(
            created_at=dt.datetime.now(UTC).isoformat(),
            model_id=model_id,
            prompt=prompt,
            response=response,
            correction=correction,
            tags=tags,
            image_path=image_path,
            video_path=video_path,
            use_for_training=use_for_training,
        )

    @classmethod
    def from_row(cls, row: dict[str, str]) -> FieldNote:
        """Rebuild a note from a mapping of column name to text.

        ``created_at``, ``model_id``, ``prompt``, ``response``,
        ``correction`` and ``tags`` are required. ``image_path`` and
        ``video_path`` default to ``""`` and ``use_for_training`` defaults
        to true when the column is absent *or* its value is ``None``.

        The flag is true for ``1``, ``true`` or ``yes`` (case-insensitive,
        surrounding whitespace ignored); any other text, including the
        empty string, is false.

        Raises:
            KeyError: If one of the required columns is missing.
        """

        def optional(key: str, default: str) -> str:
            # ``csv.DictReader`` pads short rows with ``None`` instead of
            # omitting the key, so ``None`` must count as "not provided".
            value = row.get(key)
            return default if value is None else value

        flag = str(optional("use_for_training", "true")).strip().lower()
        return cls(
            created_at=row["created_at"],
            model_id=row["model_id"],
            prompt=row["prompt"],
            response=row["response"],
            correction=row["correction"],
            tags=row["tags"],
            image_path=optional("image_path", ""),
            video_path=optional("video_path", ""),
            use_for_training=flag in {"1", "true", "yes"},
        )

    def to_dict(self) -> dict[str, object]:
        """Return the fields as a dictionary, in declaration order."""
        return asdict(self)
class FieldNoteStore:
    """CSV-backed field note storage."""

    def __init__(self, path: str | Path = "data/field_notes.csv") -> None:
        """Remember where the CSV file lives; nothing is written yet."""
        self.path = Path(path)

    def save(self, note: FieldNote) -> Path:
        """Append ``note`` as one CSV row and return the file path.

        Parent directories are created as needed. The header row is
        written whenever the file holds no data yet, i.e. when it does not
        exist or exists but is empty.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # A pre-created, zero-byte file still needs its header; otherwise
        # the first saved note would later be read back as the header row.
        needs_header = not self.path.exists() or self.path.stat().st_size == 0
        record = note.to_dict()
        with self.path.open("a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(record))
            if needs_header:
                writer.writeheader()
            writer.writerow(record)
        return self.path

    def list_notes(
        self,
        corrected_only: bool = False,
        tag: str = "",
        training_only: bool = False,
    ) -> list[FieldNote]:
        """Load the stored notes in file order, optionally filtered.

        Args:
            corrected_only: Keep only notes whose correction is not blank.
            tag: When non-empty, keep only notes carrying exactly this tag
                in their comma-separated ``tags`` string.
            training_only: Keep only notes flagged ``use_for_training``.

        Returns:
            The notes that pass every requested filter; an empty list when
            the CSV file does not exist.
        """
        if not self.path.exists():
            return []
        with self.path.open(newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        notes = [FieldNote.from_row(row) for row in rows]
        if corrected_only:
            notes = [note for note in notes if note.correction.strip()]
        if tag:
            notes = [note for note in notes if tag in _split_tags(note.tags)]
        if training_only:
            notes = [note for note in notes if note.use_for_training]
        return notes

    def export_jsonl(
        self,
        output_path: str | Path = "data/field_notes.jsonl",
        corrected_only: bool = True,
        training_only: bool = True,
    ) -> Path:
        """Write the selected notes as JSON Lines and return the output path.

        The output file is overwritten and its parent directories are
        created as needed. ``corrected_only`` and ``training_only`` are
        forwarded to ``list_notes``.
        """
        output = Path(output_path)
        output.parent.mkdir(parents=True, exist_ok=True)
        notes = self.list_notes(
            corrected_only=corrected_only,
            training_only=training_only,
        )
        with output.open("w", encoding="utf-8") as f:
            for note in notes:
                # ensure_ascii=False keeps non-ASCII text readable as-is.
                f.write(json.dumps(note.to_dict(), ensure_ascii=False) + "\n")
        return output

    def export_hf_dataset(
        self,
        output_dir: str | Path = "data/hf_field_notes",
        corrected_only: bool = True,
        training_only: bool = True,
    ) -> Path:
        """Export the notes into a directory holding a data file and a README.

        ``data.jsonl`` is produced by ``export_jsonl`` with the same
        filters, and ``README.md`` names that data file. Returns the
        target directory.
        """
        target = Path(output_dir)
        target.mkdir(parents=True, exist_ok=True)
        data_file = self.export_jsonl(
            target / "data.jsonl",
            corrected_only=corrected_only,
            training_only=training_only,
        )
        (target / "README.md").write_text(
            "# Field Notes Dataset\n\n"
            "Local export generated by OpenBMB Local AI Workbench.\n\n"
            f"- Data file: `{data_file.name}`\n"
            "- Intended split: `train`\n",
            encoding="utf-8",
        )
        return target
class SQLiteFieldNoteStore:
    """SQLite-backed field note storage for larger correction loops."""

    def __init__(self, path: str | Path = "data/field_notes.sqlite") -> None:
        """Prepare the database file: make its directory, then its table."""
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the database file."""
        return sqlite3.connect(self.path)

    def _init_schema(self) -> None:
        """Create the ``field_notes`` table unless it is already there."""
        with closing(self._connect()) as db:
            db.execute(
                """
                CREATE TABLE IF NOT EXISTS field_notes (
                    created_at TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    prompt TEXT NOT NULL,
                    response TEXT NOT NULL,
                    correction TEXT NOT NULL,
                    tags TEXT NOT NULL,
                    image_path TEXT NOT NULL,
                    video_path TEXT NOT NULL,
                    use_for_training INTEGER NOT NULL
                )
                """
            )
            db.commit()

    def save(self, note: FieldNote) -> Path:
        """Insert ``note`` as a new row and return the database path."""
        # The boolean flag is stored as an integer (0 or 1).
        values = (
            note.created_at,
            note.model_id,
            note.prompt,
            note.response,
            note.correction,
            note.tags,
            note.image_path,
            note.video_path,
            int(note.use_for_training),
        )
        with closing(self._connect()) as db:
            db.execute(
                """
                INSERT INTO field_notes (
                    created_at, model_id, prompt, response, correction, tags,
                    image_path, video_path, use_for_training
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                values,
            )
            db.commit()
        return self.path

    def list_notes(
        self,
        corrected_only: bool = False,
        tag: str = "",
        training_only: bool = False,
    ) -> list[FieldNote]:
        """Load the stored notes ordered by ``created_at``, optionally filtered.

        Args:
            corrected_only: Keep only notes whose correction is not blank.
            tag: When non-empty, keep only notes carrying exactly this tag
                in their comma-separated ``tags`` string.
            training_only: Keep only notes flagged ``use_for_training``.

        Returns:
            The notes that pass every requested filter.
        """
        with closing(self._connect()) as db:
            # sqlite3.Row allows the by-name column access used below.
            db.row_factory = sqlite3.Row
            cursor = db.execute(
                """
                SELECT created_at, model_id, prompt, response, correction, tags,
                    image_path, video_path, use_for_training
                FROM field_notes
                ORDER BY created_at
                """
            )
            records = cursor.fetchall()

        text_columns = (
            "created_at",
            "model_id",
            "prompt",
            "response",
            "correction",
            "tags",
            "image_path",
            "video_path",
        )

        def wanted(note: FieldNote) -> bool:
            # Each filter applies only when requested; all must pass.
            if corrected_only and not note.correction.strip():
                return False
            if tag and tag not in _split_tags(note.tags):
                return False
            if training_only and not note.use_for_training:
                return False
            return True

        loaded = (
            FieldNote(
                **{column: str(record[column]) for column in text_columns},
                use_for_training=bool(record["use_for_training"]),
            )
            for record in records
        )
        return [note for note in loaded if wanted(note)]
def _split_tags(tags: str) -> set[str]:
return {tag.strip() for tag in tags.split(",") if tag.strip()}