//! LMDB persistence layer using heed 0.20. //! //! Each store gets its own LMDB environment directory. Values are JSON-encoded, //! keys are UTF-8 strings. All writes go through a single write transaction //! that is committed synchronously — durability is guaranteed on fsync. //! //! Thread safety: heed's Env and Database are Send + Sync. All LMDB write //! transactions are serialised by LMDB itself (only one writer at a time). //! //! Usage: //! let store = LmdbStore::open("data/kyc_db", "records")?; //! store.put("user123", &my_record)?; //! let rec: Option = store.get("user123")?; use heed::types::Bytes; use heed::{Database, Env, EnvOpenOptions}; use serde::{Deserialize, Serialize}; use tracing::error; /// A named LMDB database inside a dedicated environment directory. pub struct LmdbStore { env: Env, db: Database, } // LMDB environments are safe to share across threads. unsafe impl Send for LmdbStore {} unsafe impl Sync for LmdbStore {} impl LmdbStore { /// Open (or create) an LMDB environment at `dir` and a named database inside it. /// Idempotent: calling this multiple times on the same directory is safe. pub fn open(dir: &str, db_name: &'static str) -> anyhow::Result { std::fs::create_dir_all(dir)?; // SAFETY: we are the sole process opening this environment directory. // Do not open the same `dir` from multiple processes simultaneously. let env = unsafe { EnvOpenOptions::new() .map_size(64 * 1024 * 1024) // 64 MiB .max_dbs(16) .open(dir)? }; let mut wtxn = env.write_txn()?; let db: Database = env.create_database(&mut wtxn, Some(db_name))?; wtxn.commit()?; Ok(Self { env, db }) } /// Write a JSON-serialised value under `key`. Durable after commit. pub fn put(&self, key: &str, value: &V) -> anyhow::Result<()> { let val_bytes = serde_json::to_vec(value)?; let mut wtxn = self.env.write_txn()?; self.db.put(&mut wtxn, key.as_bytes(), &val_bytes)?; wtxn.commit()?; Ok(()) } /// Append `item` to a JSON array stored under `key`. /// If the key does not exist, a new single-element array is created. pub fn append Deserialize<'de>>( &self, key: &str, item: V, ) -> anyhow::Result<()> { let mut wtxn = self.env.write_txn()?; // Read existing list (to_vec eagerly so we release the borrow on wtxn) let existing: Option> = self.db.get(&wtxn, key.as_bytes())?.map(|b| b.to_vec()); let mut list: Vec = match existing { None => vec![], Some(bytes) => serde_json::from_slice(&bytes)?, }; list.push(item); let new_bytes = serde_json::to_vec(&list)?; self.db.put(&mut wtxn, key.as_bytes(), &new_bytes)?; wtxn.commit()?; Ok(()) } /// Read the value at `key`, returning `None` if absent. pub fn get Deserialize<'de>>(&self, key: &str) -> anyhow::Result> { let rtxn = self.env.read_txn()?; match self.db.get(&rtxn, key.as_bytes())? { None => Ok(None), Some(bytes) => Ok(Some(serde_json::from_slice(bytes)?)), } } /// Read a JSON array stored under `key`, returning an empty vec if absent. pub fn get_list Deserialize<'de>>(&self, key: &str) -> anyhow::Result> { let rtxn = self.env.read_txn()?; match self.db.get(&rtxn, key.as_bytes())? { None => Ok(vec![]), Some(bytes) => Ok(serde_json::from_slice(bytes)?), } } /// Iterate all values in the database. pub fn all_values Deserialize<'de>>(&self) -> anyhow::Result> { let rtxn = self.env.read_txn()?; let mut out = Vec::new(); for result in self.db.iter(&rtxn)? { let (_k, v) = result?; match serde_json::from_slice::(v) { Ok(val) => out.push(val), Err(e) => error!("persist: JSON decode error while scanning: {}", e), } } Ok(out) } /// Read-modify-write under `key` in a single write transaction. /// Returns `true` if the key existed and was updated, `false` if absent. /// /// Note: reads first in a read-txn, then writes in a write-txn. /// This is safe for the access patterns in this codebase (low concurrency). pub fn update Deserialize<'de>>( &self, key: &str, f: impl FnOnce(&mut V), ) -> anyhow::Result { // Phase 1: read the current value (read txn released before write txn) let current: Option = self.get(key)?; match current { None => Ok(false), Some(mut val) => { f(&mut val); self.put(key, &val)?; Ok(true) } } } /// Delete the value at `key`. Returns `true` if it existed. #[allow(dead_code)] pub fn delete(&self, key: &str) -> anyhow::Result { let mut wtxn = self.env.write_txn()?; let deleted = self.db.delete(&mut wtxn, key.as_bytes())?; wtxn.commit()?; Ok(deleted) } }