File size: 12,231 Bytes
80f50ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
"""
Codebook storage (universal).

SQLite-backed CRUD over `codes` and `annotation_codes` in
`<task_dir>/project.sqlite` via the universal persistence layer. No
business rules live here (no cycle checks, no permissions, no cache
invalidation) — that is the service layer's job. This module only
persists rows.

A *code* is a (possibly nested) label in a project's codebook. An
*annotation_code* links a stored annotation to a code, optionally with a
time span (`started_at`/`ended_at`) for temporal / agentic-trace coding.
Universal: usable in standard annotation, solo mode, and QDA mode.

Design notes:
- `parent_id` is TEXT NOT NULL DEFAULT '' where '' means "root". Using a
  sentinel instead of NULL lets `UNIQUE(project, parent_id, name)`
  actually prevent duplicate sibling names at the root too (SQLite
  treats NULLs as distinct in UNIQUE constraints).
- No SQL foreign keys (consistent with the memos store): the service
  layer enforces parent existence, cycle-freedom, and recursive delete.
"""

from __future__ import annotations

import time
import uuid
from typing import Any, Dict, List, Optional

from potato.persistence import Migration, get_db, register_migration

# Using '' instead of NULL lets UNIQUE(project, parent_id, name) also reject
# duplicate sibling names at the root (SQLite treats NULLs as distinct).
ROOT = ""  # sentinel parent_id for top-level codes

# Base schema (migration "0001_codebook"): the `codes` table and the
# `annotation_codes` link table, whose PRIMARY KEY (annotation_id, code_id)
# allows at most one link row per (annotation, code) pair.
# NOTE(review): other columns this module reads/writes (created_revision,
# archived_at, invalidated_at, invalidated_by_change) are NOT created here;
# presumably they come from the revision/changelog migrations that
# _ensure_temporal_schema() registers — confirm against those modules.
_CODEBOOK_MIGRATION = Migration(
    name="0001_codebook",
    sql="""
    CREATE TABLE IF NOT EXISTS codes (
        id          TEXT PRIMARY KEY,
        project     TEXT NOT NULL,
        name        TEXT NOT NULL,
        color       TEXT,
        parent_id   TEXT NOT NULL DEFAULT '',
        sort_order  INTEGER NOT NULL DEFAULT 0,
        created_by  TEXT NOT NULL,
        created_at  REAL NOT NULL,
        updated_at  REAL NOT NULL,
        UNIQUE (project, parent_id, name)
    );
    CREATE INDEX IF NOT EXISTS idx_codes_project ON codes (project);
    CREATE INDEX IF NOT EXISTS idx_codes_parent
        ON codes (project, parent_id);

    CREATE TABLE IF NOT EXISTS annotation_codes (
        annotation_id TEXT NOT NULL,
        code_id       TEXT NOT NULL,
        project       TEXT NOT NULL,
        created_by    TEXT NOT NULL,
        started_at    REAL,
        ended_at      REAL,
        PRIMARY KEY (annotation_id, code_id)
    );
    CREATE INDEX IF NOT EXISTS idx_anncodes_code
        ON annotation_codes (project, code_id);
    CREATE INDEX IF NOT EXISTS idx_anncodes_ann
        ON annotation_codes (annotation_id);
    """,
)

# Registered at import time; _db() re-registers defensively before each use.
register_migration(_CODEBOOK_MIGRATION)


def _db(task_dir: str):
    """Return the persistence connection for `task_dir`.

    The 0001 codebook migration is (re)registered first. Registration is
    idempotent, so normally this changes nothing. It only matters when a
    test helper such as clear_migrations emptied the process-global
    registry before this task_dir's first get_db() call.
    """
    register_migration(_CODEBOOK_MIGRATION)
    connection = get_db(task_dir)
    return connection


def _ensure_temporal_schema() -> None:
    """Register the Phase 2 (C) append-only migrations.

    Call this before any link read/write that depends on `invalidated_at`.
    The 0003 migration is additive (nullable columns plus new tables), so
    registering it is safe even for callers that only registered 0001.
    """
    # Imported here rather than at module level: changelog imports this
    # module, so a top-level import would create a load-time cycle.
    from potato.codebook.changelog import _CHANGE_MIGRATION
    from potato.codebook.revision import (
        _CODES_REV_MIGRATION,
        _REVISION_MIGRATION,
    )

    # Same registration order as before: revision, codes-rev, changelog.
    for migration in (
        _REVISION_MIGRATION,
        _CODES_REV_MIGRATION,
        _CHANGE_MIGRATION,
    ):
        register_migration(migration)


# ---- codes ---------------------------------------------------------------

def insert_code(
    task_dir: str,
    *,
    project: str,
    name: str,
    created_by: str,
    color: Optional[str] = None,
    parent_id: str = ROOT,
    sort_order: int = 0,
    code_id: Optional[str] = None,
    created_revision: int = 0,
) -> Dict[str, Any]:
    """Insert one code row and return it as re-read from the table.

    `code_id` lets the init CLI supply a deterministic id; otherwise a
    random uuid4 hex is used. `created_revision` records the codebook
    revision the code first appeared in (for provenance / the review
    worklist). `created_at` and `updated_at` are set to the same timestamp.

    Constraint violations (a duplicate (project, parent_id, name) sibling,
    or a reused `code_id`) surface as the DB driver's error — presumably
    sqlite3.IntegrityError; confirm against potato.persistence.get_db.
    """
    # `created_revision` is not part of the 0001 schema. Presumably it is
    # added by one of the revision migrations registered here (the same
    # guard every other post-0001 reader/writer in this module uses).
    # Without it, the INSERT can fail on a missing column when only 0001
    # happens to be registered.
    _ensure_temporal_schema()
    cid = code_id or uuid.uuid4().hex
    now = time.time()
    conn = _db(task_dir)
    conn.execute(
        """INSERT INTO codes
           (id, project, name, color, parent_id, sort_order,
            created_by, created_at, updated_at, created_revision)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (cid, project, name, color, parent_id, sort_order,
         created_by, now, now, created_revision),
    )
    conn.commit()
    return get_code(task_dir, cid)


def get_code(task_dir: str, code_id: str) -> Optional[Dict[str, Any]]:
    """Return the `codes` row whose id is `code_id` as a dict, or None."""
    cursor = _db(task_dir).execute(
        "SELECT * FROM codes WHERE id = ?", (code_id,)
    )
    found = cursor.fetchone()
    if not found:
        return None
    return dict(found)


def find_code(
    task_dir: str, project: str, parent_id: str, name: str
) -> Optional[Dict[str, Any]]:
    """Look up a code by its natural key (project, parent_id, name).

    The triple is UNIQUE in the schema, so at most one row can match.
    Returns that row as a dict, or None when there is no match.
    """
    natural_key = (project, parent_id, name)
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes
           WHERE project = ? AND parent_id = ? AND name = ?""",
        natural_key,
    )
    match = cursor.fetchone()
    return dict(match) if match else None


def list_codes(task_dir: str, project: str) -> List[Dict[str, Any]]:
    """Return every code row in `project` (archived ones included) as dicts.

    Rows are ordered by parent_id (root '' first), then sort_order, then
    name.
    """
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes WHERE project = ?
           ORDER BY parent_id ASC, sort_order ASC, name ASC""",
        (project,),
    )
    return list(map(dict, cursor.fetchall()))


def children_of(
    task_dir: str, project: str, parent_id: str
) -> List[Dict[str, Any]]:
    """Return the direct children of `parent_id` in `project` as dicts.

    Pass ROOT ('') for top-level codes. Children are ordered by sort_order,
    then name.
    """
    cursor = _db(task_dir).execute(
        """SELECT * FROM codes WHERE project = ? AND parent_id = ?
           ORDER BY sort_order ASC, name ASC""",
        (project, parent_id),
    )
    return list(map(dict, cursor.fetchall()))


def update_code(
    task_dir: str,
    code_id: str,
    *,
    name: Optional[str] = None,
    color: Optional[str] = None,
    parent_id: Optional[str] = None,
    sort_order: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Patch selected columns of one code and return the re-read row.

    A keyword left as None means "leave unchanged", so a column cannot be
    set to NULL through this function. When at least one column is
    written, `updated_at` is bumped too. If nothing is requested, no UPDATE
    runs. The return value comes from get_code (None if `code_id` does not
    exist).
    """
    requested = (
        ("name", name),
        ("color", color),
        ("parent_id", parent_id),
        ("sort_order", sort_order),
    )
    changes = [(column, value) for column, value in requested
               if value is not None]
    if changes:
        changes.append(("updated_at", time.time()))
        # Column names come only from the fixed tuple above, never from
        # callers, so building the SET clause by formatting is safe.
        assignments = ", ".join(f"{column} = ?" for column, _ in changes)
        values = [value for _, value in changes]
        values.append(code_id)
        conn = _db(task_dir)
        conn.execute(f"UPDATE codes SET {assignments} WHERE id = ?", values)
        conn.commit()
    return get_code(task_dir, code_id)


def delete_codes(task_dir: str, code_ids: List[str]) -> int:
    """Hard-delete the given codes and their annotation_codes links.

    The service computes the full subtree; this only executes the delete.
    Unlike the Phase 2 primitives, rows are removed rather than
    invalidated. All batches are committed together at the end.

    Returns the number of `codes` rows deleted.
    """
    ids = list(code_ids)
    if not ids:
        return 0
    # One `?` per id in an IN (...) list: stay well under SQLite's
    # host-parameter limit (999 on builds older than 3.32), which a large
    # subtree could otherwise exceed and make the whole delete fail.
    batch_size = 500
    conn = _db(task_dir)
    deleted = 0
    for start in range(0, len(ids), batch_size):
        chunk = ids[start:start + batch_size]
        qs = ",".join("?" * len(chunk))
        conn.execute(
            f"DELETE FROM annotation_codes WHERE code_id IN ({qs})", chunk)
        cur = conn.execute(
            f"DELETE FROM codes WHERE id IN ({qs})", chunk)
        deleted += cur.rowcount
    conn.commit()
    return deleted


# ---- annotation_codes ----------------------------------------------------

def link_annotation(
    task_dir: str,
    *,
    project: str,
    annotation_id: str,
    code_id: str,
    created_by: str,
    started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Create or overwrite the (annotation_id, code_id) link.

    Uses INSERT OR REPLACE, so an existing row with the same primary key
    is deleted and re-inserted. Its project/created_by/span values are
    overwritten. Any column not listed in the INSERT reverts to its column
    default on the new row.
    """
    values = (annotation_id, code_id, project, created_by,
              started_at, ended_at)
    conn = _db(task_dir)
    conn.execute(
        """INSERT OR REPLACE INTO annotation_codes
           (annotation_id, code_id, project, created_by,
            started_at, ended_at)
           VALUES (?, ?, ?, ?, ?, ?)""",
        values,
    )
    conn.commit()


def unlink_annotation(
    task_dir: str, annotation_id: str, code_id: str
) -> bool:
    """Hard-delete the (annotation_id, code_id) link.

    Returns True if a row was removed, False if no such link existed.
    """
    conn = _db(task_dir)
    removed = conn.execute(
        """DELETE FROM annotation_codes
           WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    ).rowcount
    conn.commit()
    return removed > 0


def codes_for_annotation(
    task_dir: str, annotation_id: str
) -> List[Dict[str, Any]]:
    """Return the live codes applied to `annotation_id`, ordered by name.

    This is THE single load-bearing temporal reader. It returns only links
    whose `invalidated_at` is NULL, and it never returns an archived (e.g.
    merged-away) code. Each dict carries code_id, started_at, ended_at,
    created_by, name, color and parent_id.
    """
    _ensure_temporal_schema()
    cursor = _db(task_dir).execute(
        """SELECT ac.code_id, ac.started_at, ac.ended_at,
                  ac.created_by, c.name, c.color, c.parent_id
           FROM annotation_codes ac
           JOIN codes c ON c.id = ac.code_id
           WHERE ac.annotation_id = ?
             AND ac.invalidated_at IS NULL
             AND c.archived_at IS NULL
           ORDER BY c.name ASC""",
        (annotation_id,),
    )
    return [dict(link) for link in cursor.fetchall()]


# ---- Phase 2 (C): append-only retroactive primitives --------------------

def affected_annotation_ids(
    task_dir: str, project: str, code_id: str,
    created_by: Optional[str] = None,
) -> List[str]:
    """Return annotation_ids that have a LIVE link to `code_id`.

    With `created_by`, only links created by that annotator are considered
    (the split-by-annotator selector). Ids are de-duplicated; no ORDER BY is
    applied, so their order is unspecified.
    """
    _ensure_temporal_schema()
    filters = ["project = ?", "code_id = ?", "invalidated_at IS NULL"]
    args: List[Any] = [project, code_id]
    if created_by is not None:
        filters.append("created_by = ?")
        args.append(created_by)
    sql = ("SELECT DISTINCT annotation_id FROM annotation_codes WHERE "
           + " AND ".join(filters))
    cursor = _db(task_dir).execute(sql, args)
    return [row["annotation_id"] for row in cursor.fetchall()]


def get_link(
    task_dir: str, annotation_id: str, code_id: str
) -> Optional[Dict[str, Any]]:
    """Fetch the raw (annotation_id, code_id) link row, or None.

    No `invalidated_at` filter is applied, so invalidated links are
    returned as well as live ones.
    """
    _ensure_temporal_schema()
    link = _db(task_dir).execute(
        """SELECT * FROM annotation_codes
           WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    ).fetchone()
    if not link:
        return None
    return dict(link)


def invalidate_links(
    task_dir: str, *, project: str, code_id: str, change_id: str,
    created_by: Optional[str] = None,
) -> int:
    """Mark live links to `code_id` as superseded by `change_id`.

    The operation is append-only: rows are stamped with `invalidated_at` and
    `invalidated_by_change`, never DELETEd. An optional `created_by`
    restricts it to one annotator's links (split). Links that are already
    invalidated are left untouched.

    Returns the number of links invalidated.
    """
    _ensure_temporal_schema()
    filters = ["project = ?", "code_id = ?", "invalidated_at IS NULL"]
    args: List[Any] = [time.time(), change_id, project, code_id]
    if created_by is not None:
        filters.append("created_by = ?")
        args.append(created_by)
    sql = ("UPDATE annotation_codes SET invalidated_at = ?, "
           "invalidated_by_change = ? WHERE " + " AND ".join(filters))
    conn = _db(task_dir)
    updated = conn.execute(sql, args).rowcount
    conn.commit()
    return updated


def set_link_live(
    task_dir: str, *, project: str, annotation_id: str, code_id: str,
    created_by: str, started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
) -> None:
    """Ensure (annotation_id, code_id) is a LIVE link.

    The operation is idempotent against PK(annotation_id, code_id). If the
    row already exists (live or invalidated), only its invalidation stamp is
    cleared. Its project, created_by and span are kept, not clobbered. A
    fresh row is inserted only when none exists. This keeps a merge correct
    when the annotation is already on the target code.
    """
    _ensure_temporal_schema()
    conn = _db(task_dir)
    revived = conn.execute(
        """UPDATE annotation_codes
           SET invalidated_at = NULL, invalidated_by_change = NULL
           WHERE annotation_id = ? AND code_id = ?""",
        (annotation_id, code_id),
    ).rowcount
    if revived == 0:
        new_row = (annotation_id, code_id, project, created_by,
                   started_at, ended_at)
        conn.execute(
            """INSERT INTO annotation_codes
               (annotation_id, code_id, project, created_by,
                started_at, ended_at)
               VALUES (?, ?, ?, ?, ?, ?)""",
            new_row,
        )
    conn.commit()


def archive_code(task_dir: str, code_id: str) -> bool:
    """Soft-archive a code (e.g. one that was merged away).

    The code drops out of the live palette and ICL prompt, and
    `codes_for_annotation` skips archived codes. The row and its history
    survive, because this sets `archived_at` (and bumps `updated_at`)
    instead of deleting (append-only). Calling it again re-stamps
    `archived_at`.

    Returns True if a code with `code_id` exists, False otherwise.
    """
    _ensure_temporal_schema()
    # Take one timestamp for both columns so archived_at == updated_at
    # exactly; two separate time.time() calls could disagree slightly.
    now = time.time()
    conn = _db(task_dir)
    cur = conn.execute(
        "UPDATE codes SET archived_at = ?, updated_at = ? WHERE id = ?",
        (now, now, code_id),
    )
    conn.commit()
    return cur.rowcount > 0