| from __future__ import annotations |
|
|
| import json |
| from dataclasses import asdict |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
| from uuid import uuid4 |
|
|
| import aiosqlite |
|
|
| from .models import PersonaRecord, PersonaSummary, AnalysisTemplateRecord, AnalysisTemplateSummary |
|
|
|
|
| def _now_iso() -> str: |
| return datetime.now().isoformat() |
|
|
|
|
| def _clean_str_list(value: Any) -> List[str]: |
| if not isinstance(value, list): |
| return [] |
| out: List[str] = [] |
| for item in value: |
| if isinstance(item, str) and item.strip(): |
| out.append(item.strip()) |
| return out |
|
|
|
|
| def _clean_str(value: Any) -> str: |
| return value.strip() if isinstance(value, str) else "" |
|
|
|
|
| def _clean_category_list(value: Any) -> List[Dict[str, str]]: |
| if not isinstance(value, list): |
| return [] |
| out: List[Dict[str, str]] = [] |
| for item in value: |
| if not isinstance(item, dict): |
| continue |
| cid = item.get("category_id") |
| label = item.get("label") |
| desc = item.get("description") |
| if not (isinstance(cid, str) and cid.strip()): |
| continue |
| if not (isinstance(label, str) and label.strip()): |
| continue |
| out.append( |
| { |
| "category_id": cid.strip(), |
| "label": label.strip(), |
| "description": desc.strip() if isinstance(desc, str) else "", |
| } |
| ) |
| return out |
|
|
|
|
| class SQLitePersonaStore: |
| def __init__(self, db_path: str): |
| self.db_path = str(db_path) |
|
|
| async def init(self) -> None: |
| Path(self.db_path).expanduser().resolve().parent.mkdir(parents=True, exist_ok=True) |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
|
|
| await db.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS personas ( |
| persona_id TEXT PRIMARY KEY, |
| kind TEXT NOT NULL, |
| name TEXT NOT NULL, |
| is_default INTEGER NOT NULL DEFAULT 0, |
| is_deleted INTEGER NOT NULL DEFAULT 0, |
| current_version_id TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ); |
| """ |
| ) |
| await db.execute( |
| "CREATE INDEX IF NOT EXISTS personas_kind ON personas(kind);" |
| ) |
| await db.execute( |
| "CREATE INDEX IF NOT EXISTS personas_is_default ON personas(is_default);" |
| ) |
| await db.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS persona_versions ( |
| persona_id TEXT NOT NULL, |
| version_id TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| content_json TEXT NOT NULL, |
| PRIMARY KEY (persona_id, version_id), |
| FOREIGN KEY (persona_id) REFERENCES personas(persona_id) ON DELETE CASCADE |
| ); |
| """ |
| ) |
| await db.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS app_settings ( |
| key TEXT PRIMARY KEY, |
| value_json TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ); |
| """ |
| ) |
|
|
| await db.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS analysis_templates ( |
| template_id TEXT PRIMARY KEY, |
| name TEXT NOT NULL, |
| is_default INTEGER NOT NULL DEFAULT 0, |
| is_deleted INTEGER NOT NULL DEFAULT 0, |
| current_version_id TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ); |
| """ |
| ) |
| await db.execute("CREATE INDEX IF NOT EXISTS analysis_templates_is_default ON analysis_templates(is_default);") |
| await db.execute( |
| """ |
| CREATE TABLE IF NOT EXISTS analysis_template_versions ( |
| template_id TEXT NOT NULL, |
| version_id TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| content_json TEXT NOT NULL, |
| PRIMARY KEY (template_id, version_id), |
| FOREIGN KEY (template_id) REFERENCES analysis_templates(template_id) ON DELETE CASCADE |
| ); |
| """ |
| ) |
| await db.commit() |
|
|
| async def list_analysis_template_summaries( |
| self, |
| *, |
| include_deleted: bool = False, |
| ) -> List[AnalysisTemplateSummary]: |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| where: List[str] = [] |
| args: List[Any] = [] |
| if not include_deleted: |
| where.append("is_deleted = 0") |
| clause = ("WHERE " + " AND ".join(where)) if where else "" |
| cur = await db.execute( |
| f""" |
| SELECT template_id, name, is_default, is_deleted |
| FROM analysis_templates |
| {clause} |
| ORDER BY is_default DESC, name ASC; |
| """, |
| tuple(args), |
| ) |
| rows = await cur.fetchall() |
| return [ |
| AnalysisTemplateSummary( |
| template_id=row[0], |
| name=row[1], |
| is_default=bool(row[2]), |
| is_deleted=bool(row[3]), |
| ) |
| for row in rows |
| ] |
|
|
| async def list_analysis_template_records( |
| self, |
| *, |
| include_deleted: bool = False, |
| ) -> List[AnalysisTemplateRecord]: |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| where: List[str] = [] |
| args: List[Any] = [] |
| if not include_deleted: |
| where.append("t.is_deleted = 0") |
| clause = ("WHERE " + " AND ".join(where)) if where else "" |
| cur = await db.execute( |
| f""" |
| SELECT |
| t.template_id, t.name, t.is_default, t.is_deleted, |
| t.current_version_id, t.created_at, t.updated_at, |
| v.content_json |
| FROM analysis_templates t |
| JOIN analysis_template_versions v |
| ON v.template_id = t.template_id AND v.version_id = t.current_version_id |
| {clause} |
| ORDER BY t.is_default DESC, t.name ASC; |
| """, |
| tuple(args), |
| ) |
| rows = await cur.fetchall() |
|
|
| out: List[AnalysisTemplateRecord] = [] |
| for row in rows: |
| content: Dict[str, Any] = {} |
| if isinstance(row[7], str): |
| try: |
| content = json.loads(row[7]) or {} |
| except Exception: |
| content = {} |
| out.append( |
| AnalysisTemplateRecord( |
| template_id=row[0], |
| name=row[1], |
| is_default=bool(row[2]), |
| is_deleted=bool(row[3]), |
| current_version_id=row[4], |
| created_at=row[5], |
| updated_at=row[6], |
| bottom_up_instructions=_clean_str(content.get("bottom_up_instructions")), |
| bottom_up_attributes=_clean_str_list(content.get("bottom_up_attributes")), |
| rubric_instructions=_clean_str(content.get("rubric_instructions")), |
| rubric_attributes=_clean_str_list(content.get("rubric_attributes")), |
| top_down_instructions=_clean_str(content.get("top_down_instructions")), |
| top_down_attributes=_clean_str_list(content.get("top_down_attributes")), |
| categories=_clean_category_list(content.get("categories")), |
| ) |
| ) |
| return out |
|
|
| async def get_analysis_template( |
| self, |
| template_id: str, |
| *, |
| include_deleted: bool = False, |
| ) -> Optional[AnalysisTemplateRecord]: |
| if not isinstance(template_id, str) or not template_id.strip(): |
| return None |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| cur = await db.execute( |
| """ |
| SELECT template_id, name, is_default, is_deleted, current_version_id, created_at, updated_at |
| FROM analysis_templates |
| WHERE template_id = ?; |
| """, |
| (template_id,), |
| ) |
| row = await cur.fetchone() |
| if not row: |
| return None |
| if bool(row[3]) and not include_deleted: |
| return None |
|
|
| cur2 = await db.execute( |
| """ |
| SELECT content_json |
| FROM analysis_template_versions |
| WHERE template_id = ? AND version_id = ?; |
| """, |
| (row[0], row[4]), |
| ) |
| vrow = await cur2.fetchone() |
| content: Dict[str, Any] = {} |
| if vrow and isinstance(vrow[0], str): |
| try: |
| content = json.loads(vrow[0]) or {} |
| except Exception: |
| content = {} |
|
|
| return AnalysisTemplateRecord( |
| template_id=row[0], |
| name=row[1], |
| is_default=bool(row[2]), |
| is_deleted=bool(row[3]), |
| current_version_id=row[4], |
| created_at=row[5], |
| updated_at=row[6], |
| bottom_up_instructions=_clean_str(content.get("bottom_up_instructions")), |
| bottom_up_attributes=_clean_str_list(content.get("bottom_up_attributes")), |
| rubric_instructions=_clean_str(content.get("rubric_instructions")), |
| rubric_attributes=_clean_str_list(content.get("rubric_attributes")), |
| top_down_instructions=_clean_str(content.get("top_down_instructions")), |
| top_down_attributes=_clean_str_list(content.get("top_down_attributes")), |
| categories=_clean_category_list(content.get("categories")), |
| ) |
|
|
| async def upsert_default_analysis_template( |
| self, |
| *, |
| template_id: str, |
| name: str, |
| bottom_up_instructions: str, |
| bottom_up_attributes: List[str], |
| rubric_instructions: str, |
| rubric_attributes: List[str], |
| top_down_instructions: str, |
| top_down_attributes: List[str], |
| categories: List[Dict[str, str]], |
| ) -> Tuple[str, str]: |
| if not isinstance(template_id, str) or not template_id.strip(): |
| raise ValueError("template_id is required") |
| if not isinstance(name, str) or not name.strip(): |
| raise ValueError("name is required") |
| cleaned_categories = _clean_category_list(categories) |
| if not cleaned_categories: |
| raise ValueError("categories are required") |
| bui = _clean_str(bottom_up_instructions) |
| bua = _clean_str_list(bottom_up_attributes) |
| ri = _clean_str(rubric_instructions) |
| ra = _clean_str_list(rubric_attributes) |
| tdi = _clean_str(top_down_instructions) |
| tda = _clean_str_list(top_down_attributes) |
|
|
| now = _now_iso() |
| version_id = str(uuid4()) |
| content_json = json.dumps( |
| { |
| "bottom_up_instructions": bui, |
| "bottom_up_attributes": bua, |
| "rubric_instructions": ri, |
| "rubric_attributes": ra, |
| "top_down_instructions": tdi, |
| "top_down_attributes": tda, |
| "categories": cleaned_categories, |
| }, |
| ensure_ascii=False, |
| ) |
|
|
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute("BEGIN;") |
| cur = await db.execute( |
| "SELECT current_version_id FROM analysis_templates WHERE template_id = ?;", |
| (template_id,), |
| ) |
| existing = await cur.fetchone() |
|
|
| if existing: |
| |
| await db.execute("ROLLBACK;") |
| return template_id, str(existing[0]) |
|
|
| try: |
| |
| await db.execute( |
| """ |
| INSERT INTO analysis_templates (template_id, name, is_default, is_deleted, current_version_id, created_at, updated_at) |
| VALUES (?, ?, 1, 0, ?, ?, ?); |
| """, |
| (template_id, name.strip(), version_id, now, now), |
| ) |
| await db.execute( |
| """ |
| INSERT INTO analysis_template_versions (template_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (template_id, version_id, now, content_json), |
| ) |
| await db.commit() |
| except Exception: |
| await db.execute("ROLLBACK;") |
| raise |
| return template_id, version_id |
|
|
| async def create_analysis_template( |
| self, |
| *, |
| name: str, |
| bottom_up_instructions: str, |
| bottom_up_attributes: List[str], |
| rubric_instructions: str, |
| rubric_attributes: List[str], |
| top_down_instructions: str, |
| top_down_attributes: List[str], |
| categories: List[Dict[str, str]], |
| is_default: bool = False, |
| ) -> Tuple[str, str]: |
| if not isinstance(name, str) or not name.strip(): |
| raise ValueError("name is required") |
| cleaned_categories = _clean_category_list(categories) |
| if not cleaned_categories: |
| raise ValueError("categories are required") |
| bui = _clean_str(bottom_up_instructions) |
| bua = _clean_str_list(bottom_up_attributes) |
| ri = _clean_str(rubric_instructions) |
| ra = _clean_str_list(rubric_attributes) |
| tdi = _clean_str(top_down_instructions) |
| tda = _clean_str_list(top_down_attributes) |
|
|
| now = _now_iso() |
| template_id = str(uuid4()) |
| version_id = str(uuid4()) |
| content_json = json.dumps( |
| { |
| "bottom_up_instructions": bui, |
| "bottom_up_attributes": bua, |
| "rubric_instructions": ri, |
| "rubric_attributes": ra, |
| "top_down_instructions": tdi, |
| "top_down_attributes": tda, |
| "categories": cleaned_categories, |
| }, |
| ensure_ascii=False, |
| ) |
|
|
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute("BEGIN;") |
| try: |
| await db.execute( |
| """ |
| INSERT INTO analysis_templates (template_id, name, is_default, is_deleted, current_version_id, created_at, updated_at) |
| VALUES (?, ?, ?, 0, ?, ?, ?); |
| """, |
| (template_id, name.strip(), 1 if is_default else 0, version_id, now, now), |
| ) |
| await db.execute( |
| """ |
| INSERT INTO analysis_template_versions (template_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (template_id, version_id, now, content_json), |
| ) |
| await db.commit() |
| except Exception: |
| await db.execute("ROLLBACK;") |
| raise |
| return template_id, version_id |
|
|
| async def update_analysis_template( |
| self, |
| *, |
| template_id: str, |
| bottom_up_instructions: str, |
| bottom_up_attributes: List[str], |
| rubric_instructions: str, |
| rubric_attributes: List[str], |
| top_down_instructions: str, |
| top_down_attributes: List[str], |
| categories: List[Dict[str, str]], |
| allow_default: bool = False, |
| ) -> str: |
| if not isinstance(template_id, str) or not template_id.strip(): |
| raise ValueError("template_id is required") |
| cleaned_categories = _clean_category_list(categories) |
| if not cleaned_categories: |
| raise ValueError("categories are required") |
| bui = _clean_str(bottom_up_instructions) |
| bua = _clean_str_list(bottom_up_attributes) |
| ri = _clean_str(rubric_instructions) |
| ra = _clean_str_list(rubric_attributes) |
| tdi = _clean_str(top_down_instructions) |
| tda = _clean_str_list(top_down_attributes) |
|
|
| now = _now_iso() |
| version_id = str(uuid4()) |
| content_json = json.dumps( |
| { |
| "bottom_up_instructions": bui, |
| "bottom_up_attributes": bua, |
| "rubric_instructions": ri, |
| "rubric_attributes": ra, |
| "top_down_instructions": tdi, |
| "top_down_attributes": tda, |
| "categories": cleaned_categories, |
| }, |
| ensure_ascii=False, |
| ) |
|
|
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| cur = await db.execute( |
| "SELECT is_default, is_deleted FROM analysis_templates WHERE template_id = ?;", |
| (template_id,), |
| ) |
| row = await cur.fetchone() |
| if not row: |
| raise ValueError("template not found") |
| if bool(row[1]): |
| raise ValueError("template not found") |
| if bool(row[0]) and not allow_default: |
| raise PermissionError("default template is immutable") |
|
|
| await db.execute( |
| """ |
| INSERT INTO analysis_template_versions (template_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (template_id, version_id, now, content_json), |
| ) |
| await db.execute( |
| """ |
| UPDATE analysis_templates |
| SET current_version_id = ?, updated_at = ? |
| WHERE template_id = ?; |
| """, |
| (version_id, now, template_id), |
| ) |
| await db.commit() |
| return version_id |
|
|
| async def soft_delete_analysis_template(self, template_id: str) -> None: |
| if not isinstance(template_id, str) or not template_id.strip(): |
| raise ValueError("template_id is required") |
| now = _now_iso() |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| cur = await db.execute( |
| "SELECT is_default, is_deleted FROM analysis_templates WHERE template_id = ?;", |
| (template_id,), |
| ) |
| row = await cur.fetchone() |
| if not row: |
| raise ValueError("template not found") |
| if bool(row[1]): |
| raise ValueError("template not found") |
| if bool(row[0]): |
| raise PermissionError("default template is immutable") |
|
|
| await db.execute( |
| "UPDATE analysis_templates SET is_deleted = 1, updated_at = ? WHERE template_id = ?;", |
| (now, template_id), |
| ) |
| await db.commit() |
|
|
| async def retire_analysis_template(self, template_id: str) -> None: |
| if not isinstance(template_id, str) or not template_id.strip(): |
| raise ValueError("template_id is required") |
| now = _now_iso() |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| cur = await db.execute( |
| "SELECT template_id FROM analysis_templates WHERE template_id = ?;", |
| (template_id,), |
| ) |
| row = await cur.fetchone() |
| if not row: |
| return |
| await db.execute( |
| "UPDATE analysis_templates SET is_default = 0, is_deleted = 1, updated_at = ? WHERE template_id = ?;", |
| (now, template_id), |
| ) |
| await db.commit() |
|
|
| async def upsert_setting(self, key: str, value: Any) -> None: |
| if not isinstance(key, str) or not key.strip(): |
| raise ValueError("key is required") |
| updated_at = _now_iso() |
| payload = json.dumps(value, ensure_ascii=False) |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute( |
| """ |
| INSERT INTO app_settings (key, value_json, updated_at) |
| VALUES (?, ?, ?) |
| ON CONFLICT(key) DO UPDATE SET |
| value_json=excluded.value_json, |
| updated_at=excluded.updated_at; |
| """, |
| (key, payload, updated_at), |
| ) |
| await db.commit() |
|
|
| async def get_setting(self, key: str) -> Optional[Any]: |
| if not isinstance(key, str) or not key.strip(): |
| return None |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| cur = await db.execute( |
| "SELECT value_json FROM app_settings WHERE key = ?;", |
| (key,), |
| ) |
| row = await cur.fetchone() |
| if not row: |
| return None |
| try: |
| return json.loads(row[0]) |
| except Exception: |
| return None |
|
|
| async def list_personas( |
| self, |
| *, |
| kind: Optional[str] = None, |
| include_deleted: bool = False, |
| ) -> List[PersonaSummary]: |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| where: List[str] = [] |
| args: List[Any] = [] |
| if kind: |
| where.append("kind = ?") |
| args.append(kind) |
| if not include_deleted: |
| where.append("is_deleted = 0") |
| clause = ("WHERE " + " AND ".join(where)) if where else "" |
| cur = await db.execute( |
| f""" |
| SELECT persona_id, kind, name, is_default, is_deleted |
| FROM personas |
| {clause} |
| ORDER BY is_default DESC, name ASC; |
| """, |
| tuple(args), |
| ) |
| rows = await cur.fetchall() |
| return [ |
| PersonaSummary( |
| persona_id=row[0], |
| kind=row[1], |
| name=row[2], |
| is_default=bool(row[3]), |
| is_deleted=bool(row[4]), |
| ) |
| for row in rows |
| ] |
|
|
| async def list_persona_records( |
| self, |
| *, |
| kind: Optional[str] = None, |
| include_deleted: bool = False, |
| ) -> List[PersonaRecord]: |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| where: List[str] = [] |
| args: List[Any] = [] |
| if kind: |
| where.append("p.kind = ?") |
| args.append(kind) |
| if not include_deleted: |
| where.append("p.is_deleted = 0") |
| clause = ("WHERE " + " AND ".join(where)) if where else "" |
| cur = await db.execute( |
| f""" |
| SELECT |
| p.persona_id, p.kind, p.name, p.is_default, p.is_deleted, |
| p.current_version_id, p.created_at, p.updated_at, |
| v.content_json |
| FROM personas p |
| JOIN persona_versions v |
| ON v.persona_id = p.persona_id AND v.version_id = p.current_version_id |
| {clause} |
| ORDER BY p.is_default DESC, p.name ASC; |
| """, |
| tuple(args), |
| ) |
| rows = await cur.fetchall() |
|
|
| out: List[PersonaRecord] = [] |
| for row in rows: |
| content: Dict[str, Any] = {} |
| if isinstance(row[8], str): |
| try: |
| content = json.loads(row[8]) or {} |
| except Exception: |
| content = {} |
| out.append( |
| PersonaRecord( |
| persona_id=row[0], |
| kind=row[1], |
| name=row[2], |
| is_default=bool(row[3]), |
| is_deleted=bool(row[4]), |
| current_version_id=row[5], |
| created_at=row[6], |
| updated_at=row[7], |
| attributes=_clean_str_list(content.get("attributes")), |
| question_bank_items=_clean_str_list(content.get("question_bank_items")), |
| ) |
| ) |
| return out |
|
|
| async def get_persona(self, persona_id: str, *, include_deleted: bool = False) -> Optional[PersonaRecord]: |
| if not isinstance(persona_id, str) or not persona_id.strip(): |
| return None |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| cur = await db.execute( |
| """ |
| SELECT persona_id, kind, name, is_default, is_deleted, current_version_id, created_at, updated_at |
| FROM personas |
| WHERE persona_id = ?; |
| """, |
| (persona_id,), |
| ) |
| row = await cur.fetchone() |
| if not row: |
| return None |
| if (not include_deleted) and bool(row[4]): |
| return None |
| current_version_id = row[5] |
| cur = await db.execute( |
| """ |
| SELECT content_json |
| FROM persona_versions |
| WHERE persona_id = ? AND version_id = ?; |
| """, |
| (persona_id, current_version_id), |
| ) |
| vrow = await cur.fetchone() |
| content: Dict[str, Any] = {} |
| if vrow and isinstance(vrow[0], str): |
| try: |
| content = json.loads(vrow[0]) or {} |
| except Exception: |
| content = {} |
| attrs = _clean_str_list(content.get("attributes")) |
| qb = _clean_str_list(content.get("question_bank_items")) |
| return PersonaRecord( |
| persona_id=row[0], |
| kind=row[1], |
| name=row[2], |
| is_default=bool(row[3]), |
| is_deleted=bool(row[4]), |
| current_version_id=row[5], |
| created_at=row[6], |
| updated_at=row[7], |
| attributes=attrs, |
| question_bank_items=qb, |
| ) |
|
|
| async def create_persona( |
| self, |
| *, |
| persona_id: Optional[str] = None, |
| kind: str, |
| name: str, |
| attributes: Optional[List[str]] = None, |
| question_bank_items: Optional[List[str]] = None, |
| is_default: bool = False, |
| ) -> Tuple[str, str]: |
| if kind not in ("surveyor", "patient"): |
| raise ValueError("kind must be 'surveyor' or 'patient'") |
| if not isinstance(name, str) or not name.strip(): |
| raise ValueError("name is required") |
| if persona_id is not None: |
| if not isinstance(persona_id, str) or not persona_id.strip(): |
| raise ValueError("persona_id must be a non-empty string when provided") |
| persona_id = persona_id.strip() |
| else: |
| persona_id = str(uuid4()) |
| version_id = str(uuid4()) |
| created_at = _now_iso() |
| updated_at = created_at |
| content = { |
| "attributes": _clean_str_list(attributes), |
| "question_bank_items": _clean_str_list(question_bank_items) if kind == "surveyor" else [], |
| } |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute("BEGIN;") |
| try: |
| await db.execute( |
| """ |
| INSERT INTO personas ( |
| persona_id, kind, name, is_default, is_deleted, current_version_id, created_at, updated_at |
| ) VALUES (?, ?, ?, ?, 0, ?, ?, ?); |
| """, |
| (persona_id, kind, name.strip(), 1 if is_default else 0, version_id, created_at, updated_at), |
| ) |
| await db.execute( |
| """ |
| INSERT INTO persona_versions (persona_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (persona_id, version_id, created_at, json.dumps(content, ensure_ascii=False)), |
| ) |
| await db.commit() |
| except Exception: |
| await db.execute("ROLLBACK;") |
| raise |
| return persona_id, version_id |
|
|
| async def upsert_default_persona( |
| self, |
| *, |
| persona_id: str, |
| kind: str, |
| name: str, |
| attributes: List[str], |
| question_bank_items: List[str], |
| ) -> str: |
| if kind not in ("surveyor", "patient"): |
| raise ValueError("kind must be 'surveyor' or 'patient'") |
| if not isinstance(persona_id, str) or not persona_id.strip(): |
| raise ValueError("persona_id is required") |
| if not isinstance(name, str) or not name.strip(): |
| raise ValueError("name is required") |
|
|
| persona_id = persona_id.strip() |
| version_id = str(uuid4()) |
| now = _now_iso() |
| content = { |
| "attributes": _clean_str_list(attributes), |
| "question_bank_items": _clean_str_list(question_bank_items) if kind == "surveyor" else [], |
| } |
|
|
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute("BEGIN;") |
| try: |
| cur = await db.execute( |
| "SELECT persona_id FROM personas WHERE persona_id = ?;", |
| (persona_id,), |
| ) |
| row = await cur.fetchone() |
| if row: |
| await db.execute( |
| """ |
| INSERT INTO persona_versions (persona_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (persona_id, version_id, now, json.dumps(content, ensure_ascii=False)), |
| ) |
| await db.execute( |
| """ |
| UPDATE personas |
| SET kind = ?, name = ?, is_default = 1, is_deleted = 0, current_version_id = ?, updated_at = ? |
| WHERE persona_id = ?; |
| """, |
| (kind, name.strip(), version_id, now, persona_id), |
| ) |
| else: |
| await db.execute( |
| """ |
| INSERT INTO personas ( |
| persona_id, kind, name, is_default, is_deleted, current_version_id, created_at, updated_at |
| ) VALUES (?, ?, ?, 1, 0, ?, ?, ?); |
| """, |
| (persona_id, kind, name.strip(), version_id, now, now), |
| ) |
| await db.execute( |
| """ |
| INSERT INTO persona_versions (persona_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (persona_id, version_id, now, json.dumps(content, ensure_ascii=False)), |
| ) |
| await db.commit() |
| except Exception: |
| await db.execute("ROLLBACK;") |
| raise |
| return version_id |
|
|
| async def update_persona( |
| self, |
| *, |
| persona_id: str, |
| name: Optional[str] = None, |
| attributes: Optional[List[str]] = None, |
| question_bank_items: Optional[List[str]] = None, |
| overwrite_defaults: bool = False, |
| ) -> str: |
| record = await self.get_persona(persona_id, include_deleted=True) |
| if not record: |
| raise ValueError("persona not found") |
| if record.is_default and not overwrite_defaults: |
| raise PermissionError("default persona is immutable") |
| version_id = str(uuid4()) |
| created_at = _now_iso() |
| updated_at = created_at |
| next_name = (name.strip() if isinstance(name, str) and name.strip() else record.name) |
| content = { |
| "attributes": _clean_str_list(attributes) if attributes is not None else record.attributes, |
| "question_bank_items": _clean_str_list(question_bank_items) if record.kind == "surveyor" and question_bank_items is not None else record.question_bank_items, |
| } |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute("BEGIN;") |
| try: |
| await db.execute( |
| """ |
| INSERT INTO persona_versions (persona_id, version_id, created_at, content_json) |
| VALUES (?, ?, ?, ?); |
| """, |
| (persona_id, version_id, created_at, json.dumps(content, ensure_ascii=False)), |
| ) |
| await db.execute( |
| """ |
| UPDATE personas |
| SET name = ?, current_version_id = ?, updated_at = ?, is_deleted = 0 |
| WHERE persona_id = ?; |
| """, |
| (next_name, version_id, updated_at, persona_id), |
| ) |
| await db.commit() |
| except Exception: |
| await db.execute("ROLLBACK;") |
| raise |
| return version_id |
|
|
| async def soft_delete_persona(self, persona_id: str) -> None: |
| record = await self.get_persona(persona_id, include_deleted=True) |
| if not record: |
| raise ValueError("persona not found") |
| if record.is_default: |
| raise PermissionError("default persona is immutable") |
| async with aiosqlite.connect(self.db_path) as db: |
| await db.execute("PRAGMA foreign_keys=ON;") |
| await db.execute( |
| "UPDATE personas SET is_deleted = 1, updated_at = ? WHERE persona_id = ?;", |
| (_now_iso(), persona_id), |
| ) |
| await db.commit() |
|
|