Spaces:
Build error
Build error
File size: 5,390 Bytes
//! LMDB persistence layer using heed 0.20.
//!
//! Each store gets its own LMDB environment directory. Values are JSON-encoded;
//! keys are UTF-8 strings. Every mutating call runs in its own write transaction
//! that is committed synchronously before the call returns, so with LMDB's default
//! (synchronous) environment flags a successful write has been fsynced to disk.
//!
//! Thread safety: heed's Env and Database are Send + Sync. All LMDB write
//! transactions are serialised by LMDB itself (only one writer at a time).
//!
//! Usage:
//! ```ignore
//! let store = LmdbStore::open("data/kyc_db", "records")?;
//! store.put("user123", &my_record)?;
//! let rec: Option<MyRecord> = store.get("user123")?;
//! ```
use heed::types::Bytes;
use heed::{Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use tracing::error;
/// A named LMDB database inside a dedicated environment directory.
///
/// Keys are the UTF-8 bytes of `&str` keys and values are JSON bytes; both are
/// stored untyped (`Bytes`) and (de)serialised by the methods on this type.
pub struct LmdbStore {
    /// Handle to the LMDB environment opened on the store's directory.
    env: Env,
    /// The named database inside `env` that all reads and writes target.
    db: Database<Bytes, Bytes>,
}

// heed's `Env` and `Database<Bytes, Bytes>` are already `Send + Sync`, so
// `LmdbStore` receives both auto traits without any `unsafe`. A hand-written
// `unsafe impl` would silently vouch for any future non-thread-safe field; this
// compile-time check instead fails the build if either trait is ever lost.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<LmdbStore>();
};
impl LmdbStore {
/// Open (or create) an LMDB environment at `dir` and a named database inside it.
/// Idempotent: calling this multiple times on the same directory is safe.
pub fn open(dir: &str, db_name: &'static str) -> anyhow::Result<Self> {
std::fs::create_dir_all(dir)?;
// SAFETY: we are the sole process opening this environment directory.
// Do not open the same `dir` from multiple processes simultaneously.
let env = unsafe {
EnvOpenOptions::new()
.map_size(64 * 1024 * 1024) // 64 MiB
.max_dbs(16)
.open(dir)?
};
let mut wtxn = env.write_txn()?;
let db: Database<Bytes, Bytes> = env.create_database(&mut wtxn, Some(db_name))?;
wtxn.commit()?;
Ok(Self { env, db })
}
/// Write a JSON-serialised value under `key`. Durable after commit.
pub fn put<V: Serialize>(&self, key: &str, value: &V) -> anyhow::Result<()> {
let val_bytes = serde_json::to_vec(value)?;
let mut wtxn = self.env.write_txn()?;
self.db.put(&mut wtxn, key.as_bytes(), &val_bytes)?;
wtxn.commit()?;
Ok(())
}
/// Append `item` to a JSON array stored under `key`.
/// If the key does not exist, a new single-element array is created.
pub fn append<V: Serialize + for<'de> Deserialize<'de>>(
&self,
key: &str,
item: V,
) -> anyhow::Result<()> {
let mut wtxn = self.env.write_txn()?;
// Read existing list (to_vec eagerly so we release the borrow on wtxn)
let existing: Option<Vec<u8>> = self.db.get(&wtxn, key.as_bytes())?.map(|b| b.to_vec());
let mut list: Vec<V> = match existing {
None => vec![],
Some(bytes) => serde_json::from_slice(&bytes)?,
};
list.push(item);
let new_bytes = serde_json::to_vec(&list)?;
self.db.put(&mut wtxn, key.as_bytes(), &new_bytes)?;
wtxn.commit()?;
Ok(())
}
/// Read the value at `key`, returning `None` if absent.
pub fn get<V: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Option<V>> {
let rtxn = self.env.read_txn()?;
match self.db.get(&rtxn, key.as_bytes())? {
None => Ok(None),
Some(bytes) => Ok(Some(serde_json::from_slice(bytes)?)),
}
}
/// Read a JSON array stored under `key`, returning an empty vec if absent.
pub fn get_list<V: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Vec<V>> {
let rtxn = self.env.read_txn()?;
match self.db.get(&rtxn, key.as_bytes())? {
None => Ok(vec![]),
Some(bytes) => Ok(serde_json::from_slice(bytes)?),
}
}
/// Iterate all values in the database.
pub fn all_values<V: for<'de> Deserialize<'de>>(&self) -> anyhow::Result<Vec<V>> {
let rtxn = self.env.read_txn()?;
let mut out = Vec::new();
for result in self.db.iter(&rtxn)? {
let (_k, v) = result?;
match serde_json::from_slice::<V>(v) {
Ok(val) => out.push(val),
Err(e) => error!("persist: JSON decode error while scanning: {}", e),
}
}
Ok(out)
}
/// Read-modify-write under `key` in a single write transaction.
/// Returns `true` if the key existed and was updated, `false` if absent.
///
/// Note: reads first in a read-txn, then writes in a write-txn.
/// This is safe for the access patterns in this codebase (low concurrency).
pub fn update<V: Serialize + for<'de> Deserialize<'de>>(
&self,
key: &str,
f: impl FnOnce(&mut V),
) -> anyhow::Result<bool> {
// Phase 1: read the current value (read txn released before write txn)
let current: Option<V> = self.get(key)?;
match current {
None => Ok(false),
Some(mut val) => {
f(&mut val);
self.put(key, &val)?;
Ok(true)
}
}
}
/// Delete the value at `key`. Returns `true` if it existed.
#[allow(dead_code)]
pub fn delete(&self, key: &str) -> anyhow::Result<bool> {
let mut wtxn = self.env.write_txn()?;
let deleted = self.db.delete(&mut wtxn, key.as_bytes())?;
wtxn.commit()?;
Ok(deleted)
}
}
|