File size: 5,390 Bytes
1295969
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
//! LMDB persistence layer using heed 0.20.
//!
//! Each store gets its own LMDB environment directory. Values are JSON-encoded,
//! keys are UTF-8 strings. All writes go through a single write transaction
//! that is committed synchronously — durability is guaranteed on fsync.
//!
//! Thread safety: heed's Env and Database are Send + Sync. All LMDB write
//! transactions are serialised by LMDB itself (only one writer at a time).
//!
//! Usage:
//!   let store = LmdbStore::open("data/kyc_db", "records")?;
//!   store.put("user123", &my_record)?;
//!   let rec: Option<MyRecord> = store.get("user123")?;

use heed::types::Bytes;
use heed::{Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use tracing::error;

/// A named LMDB database inside a dedicated environment directory.
pub struct LmdbStore {
    env: Env,
    db: Database<Bytes, Bytes>,
}

// LMDB environments are safe to share across threads.
unsafe impl Send for LmdbStore {}
unsafe impl Sync for LmdbStore {}

impl LmdbStore {
    /// Open (or create) an LMDB environment at `dir` and a named database inside it.
    /// Idempotent: calling this multiple times on the same directory is safe.
    pub fn open(dir: &str, db_name: &'static str) -> anyhow::Result<Self> {
        std::fs::create_dir_all(dir)?;
        // SAFETY: we are the sole process opening this environment directory.
        // Do not open the same `dir` from multiple processes simultaneously.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(64 * 1024 * 1024) // 64 MiB
                .max_dbs(16)
                .open(dir)?
        };
        let mut wtxn = env.write_txn()?;
        let db: Database<Bytes, Bytes> = env.create_database(&mut wtxn, Some(db_name))?;
        wtxn.commit()?;
        Ok(Self { env, db })
    }

    /// Write a JSON-serialised value under `key`. Durable after commit.
    pub fn put<V: Serialize>(&self, key: &str, value: &V) -> anyhow::Result<()> {
        let val_bytes = serde_json::to_vec(value)?;
        let mut wtxn = self.env.write_txn()?;
        self.db.put(&mut wtxn, key.as_bytes(), &val_bytes)?;
        wtxn.commit()?;
        Ok(())
    }

    /// Append `item` to a JSON array stored under `key`.
    /// If the key does not exist, a new single-element array is created.
    pub fn append<V: Serialize + for<'de> Deserialize<'de>>(
        &self,
        key: &str,
        item: V,
    ) -> anyhow::Result<()> {
        let mut wtxn = self.env.write_txn()?;
        // Read existing list (to_vec eagerly so we release the borrow on wtxn)
        let existing: Option<Vec<u8>> = self.db.get(&wtxn, key.as_bytes())?.map(|b| b.to_vec());
        let mut list: Vec<V> = match existing {
            None => vec![],
            Some(bytes) => serde_json::from_slice(&bytes)?,
        };
        list.push(item);
        let new_bytes = serde_json::to_vec(&list)?;
        self.db.put(&mut wtxn, key.as_bytes(), &new_bytes)?;
        wtxn.commit()?;
        Ok(())
    }

    /// Read the value at `key`, returning `None` if absent.
    pub fn get<V: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Option<V>> {
        let rtxn = self.env.read_txn()?;
        match self.db.get(&rtxn, key.as_bytes())? {
            None => Ok(None),
            Some(bytes) => Ok(Some(serde_json::from_slice(bytes)?)),
        }
    }

    /// Read a JSON array stored under `key`, returning an empty vec if absent.
    pub fn get_list<V: for<'de> Deserialize<'de>>(&self, key: &str) -> anyhow::Result<Vec<V>> {
        let rtxn = self.env.read_txn()?;
        match self.db.get(&rtxn, key.as_bytes())? {
            None => Ok(vec![]),
            Some(bytes) => Ok(serde_json::from_slice(bytes)?),
        }
    }

    /// Iterate all values in the database.
    pub fn all_values<V: for<'de> Deserialize<'de>>(&self) -> anyhow::Result<Vec<V>> {
        let rtxn = self.env.read_txn()?;
        let mut out = Vec::new();
        for result in self.db.iter(&rtxn)? {
            let (_k, v) = result?;
            match serde_json::from_slice::<V>(v) {
                Ok(val) => out.push(val),
                Err(e) => error!("persist: JSON decode error while scanning: {}", e),
            }
        }
        Ok(out)
    }

    /// Read-modify-write under `key` in a single write transaction.
    /// Returns `true` if the key existed and was updated, `false` if absent.
    ///
    /// Note: reads first in a read-txn, then writes in a write-txn.
    /// This is safe for the access patterns in this codebase (low concurrency).
    pub fn update<V: Serialize + for<'de> Deserialize<'de>>(
        &self,
        key: &str,
        f: impl FnOnce(&mut V),
    ) -> anyhow::Result<bool> {
        // Phase 1: read the current value (read txn released before write txn)
        let current: Option<V> = self.get(key)?;
        match current {
            None => Ok(false),
            Some(mut val) => {
                f(&mut val);
                self.put(key, &val)?;
                Ok(true)
            }
        }
    }

    /// Delete the value at `key`. Returns `true` if it existed.
    #[allow(dead_code)]
    pub fn delete(&self, key: &str) -> anyhow::Result<bool> {
        let mut wtxn = self.env.write_txn()?;
        let deleted = self.db.delete(&mut wtxn, key.as_bytes())?;
        wtxn.commit()?;
        Ok(deleted)
    }
}