File size: 7,116 Bytes
bf20cb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#include "wayy_db/wal.hpp"
#include "wayy_db/database.hpp"

#include <array>
#include <cstring>
#include <filesystem>

namespace fs = std::filesystem;

namespace wayy_db {

// Byte-indexed lookup table for the reflected CRC-32 (polynomial 0xEDB88320).
// Slot b holds the result of pushing byte value b through eight
// shift-and-conditionally-xor steps; the table is built once during static
// initialization of this translation unit.
static const std::array<uint32_t, 256> crc32_table = [] {
    std::array<uint32_t, 256> lut{};
    uint32_t byte_value = 0;
    for (auto& slot : lut) {
        uint32_t rem = byte_value++;
        int bits_left = 8;
        while (bits_left-- > 0) {
            const bool low_bit_set = (rem & 1u) != 0;
            rem >>= 1;
            if (low_bit_set) {
                rem ^= 0xEDB88320u;
            }
        }
        slot = rem;
    }
    return lut;
}();

// Ensures the database directory exists, derives the WAL file location
// (<db_path>/wal.bin) and opens that file for appending.
WriteAheadLog::WriteAheadLog(const std::string& db_path) {
    fs::create_directories(db_path);

    path_ = db_path;
    path_ += "/wal.bin";

    open_for_append();
}

// Flushes any buffered log data and closes the file if it is still open.
WriteAheadLog::~WriteAheadLog() {
    if (!file_.is_open()) {
        return;
    }
    file_.flush();
    file_.close();
}

// (Re)opens the WAL file in binary append mode, closing any handle that is
// currently open first. Throws WayyException when the file cannot be opened.
void WriteAheadLog::open_for_append() {
    if (file_.is_open()) {
        file_.close();
    }

    const auto mode = std::ios::binary | std::ios::app;
    file_.open(path_, mode);

    if (file_.fail()) {
        throw WayyException("Failed to open WAL file: " + path_);
    }
}

// Computes the table-driven CRC-32 of `len` bytes starting at `data`.
// The running value starts at all-ones and is bit-inverted on return.
uint32_t WriteAheadLog::crc32(const uint8_t* data, size_t len) {
    uint32_t state = 0xFFFFFFFF;
    const uint8_t* const stop = data + len;
    for (const uint8_t* cursor = data; cursor != stop; ++cursor) {
        const uint32_t slot = (state ^ *cursor) & 0xFF;
        state = (state >> 8) ^ crc32_table[slot];
    }
    return state ^ 0xFFFFFFFF;
}

// Serializes one WAL record and appends it to the log file.
//
// Record layout (integers are copied as raw bytes in host byte order):
//   magic (4) | op (1) | table-name length (4) | table name |
//   row id (8) | payload length (4) | payload | CRC32 of all preceding bytes (4)
//
// Parameters:
//   op      - operation tag, stored as a single byte
//   table   - table name; its size must fit the 32-bit length field
//   row     - row index, stored as a 64-bit value
//   payload - opaque bytes; their count must fit the 32-bit length field
//
// Throws WayyException when a length does not fit its 32-bit field or when
// the write/flush to the log file fails, so that an entry is never dropped
// silently.
void WriteAheadLog::write_entry(WalOp op, const std::string& table, size_t row,
                                 const std::vector<uint8_t>& payload) {
    // The length fields are 32 bits wide; a silently truncated length would
    // produce a record that cannot be parsed back.
    constexpr uint64_t kMaxFieldLen = 0xFFFFFFFFull;
    if (static_cast<uint64_t>(table.size()) > kMaxFieldLen) {
        throw WayyException("WAL entry table name exceeds 32-bit length limit");
    }
    if (static_cast<uint64_t>(payload.size()) > kMaxFieldLen) {
        throw WayyException("WAL entry payload exceeds 32-bit length limit");
    }

    std::lock_guard<std::mutex> lock(mu_);

    // Build the whole record in memory so the CRC can cover it and the file
    // receives it in a single write call.
    std::vector<uint8_t> buf;
    buf.reserve(4 + 1 + 4 + table.size() + 8 + 4 + payload.size() + 4);

    const auto append_raw = [&buf](const void* src, size_t count) {
        const auto* bytes = static_cast<const uint8_t*>(src);
        buf.insert(buf.end(), bytes, bytes + count);
    };

    // Magic
    const uint32_t magic = WAL_MAGIC;
    append_raw(&magic, sizeof(magic));

    // Op type
    buf.push_back(static_cast<uint8_t>(op));

    // Table name length + name
    const uint32_t tlen = static_cast<uint32_t>(table.size());
    append_raw(&tlen, sizeof(tlen));
    buf.insert(buf.end(), table.begin(), table.end());

    // Row ID
    const uint64_t row_id = static_cast<uint64_t>(row);
    append_raw(&row_id, sizeof(row_id));

    // Payload length + payload
    const uint32_t plen = static_cast<uint32_t>(payload.size());
    append_raw(&plen, sizeof(plen));
    buf.insert(buf.end(), payload.begin(), payload.end());

    // CRC32 over everything written so far
    const uint32_t checksum = crc32(buf.data(), buf.size());
    append_raw(&checksum, sizeof(checksum));

    // Write to file and surface any I/O failure to the caller.
    file_.write(reinterpret_cast<const char*>(buf.data()),
                static_cast<std::streamsize>(buf.size()));
    file_.flush();
    if (!file_) {
        throw WayyException("Failed to write WAL entry: " + path_);
    }
}

void WriteAheadLog::log_insert(const std::string& table, size_t row,
                                const std::vector<uint8_t>& data) {
    write_entry(WalOp::Insert, table, row, data);
}

// Records an update of column `col` at row `row` of `table`.
// The entry payload is: column-name length (4 raw bytes) | column name | data.
void WriteAheadLog::log_update(const std::string& table, size_t row,
                                const std::string& col, const std::vector<uint8_t>& data) {
    uint32_t name_len = static_cast<uint32_t>(col.size());
    const uint8_t* len_bytes = reinterpret_cast<uint8_t*>(&name_len);

    // Start with the length prefix, then append the name and the value bytes.
    std::vector<uint8_t> encoded(len_bytes, len_bytes + 4);
    encoded.insert(encoded.end(), col.begin(), col.end());
    encoded.insert(encoded.end(), data.begin(), data.end());

    write_entry(WalOp::Update, table, row, encoded);
}

// Records the deletion of row `row` of `table`; delete entries carry no payload.
void WriteAheadLog::log_delete(const std::string& table, size_t row) {
    const std::vector<uint8_t> no_payload;
    write_entry(WalOp::Delete, table, row, no_payload);
}

// Persists the database via db.save() and then starts a fresh, empty WAL.
//
// The log file is closed while the save runs. Whatever happens, the log is
// left open in append mode afterwards when that is possible, so later
// write_entry calls are not silently discarded:
//   - if db.save() throws, the existing log is reopened (contents kept) and
//     the exception is rethrown unchanged;
//   - if truncation fails, the log is reopened best-effort and a
//     WayyException is thrown.
void WriteAheadLog::checkpoint(Database& db) {
    std::lock_guard<std::mutex> lock(mu_);

    // Flush and close WAL
    if (file_.is_open()) {
        file_.flush();
        file_.close();
    }

    // Save all tables to disk
    try {
        db.save();
    } catch (...) {
        // Best-effort reopen without throwing, so the original error is the
        // one the caller sees and the un-checkpointed entries stay in place.
        file_.open(path_, std::ios::binary | std::ios::app);
        throw;
    }

    // Truncate WAL (start fresh)
    {
        std::ofstream truncated(path_, std::ios::binary | std::ios::trunc);
        if (!truncated) {
            file_.open(path_, std::ios::binary | std::ios::app);
            throw WayyException("Failed to truncate WAL: " + path_);
        }
    }

    // Reopen in append mode, matching how the constructor opens the log, so
    // writes always land at the current end of file.
    open_for_append();
}

// Scans the WAL from the beginning, validating each record (magic, lengths,
// CRC32), and stops at the first record that is missing, truncated or
// corrupt. If at least one valid record was seen, the WAL file is truncated
// afterwards.
//
// Every length read from the file is checked against the number of bytes
// actually left in it before any buffer is grown, so a torn tail record or a
// corrupted length field ends the scan instead of triggering a huge
// allocation.
//
// NOTE(review): records are only validated and counted, not applied to `db`
// (see the TODO below), yet the log is still truncated afterwards. Confirm
// this is intended, since the logged operations are discarded.
void WriteAheadLog::replay(Database& db) {
    (void)db;  // Unused until row-level replay is implemented.

    if (!fs::exists(path_)) return;

    std::ifstream wal(path_, std::ios::binary);
    if (!wal) return;

    // Get file size
    wal.seekg(0, std::ios::end);
    const std::streamoff file_size = wal.tellg();
    if (file_size <= 0) return;
    wal.seekg(0, std::ios::beg);

    // Byte widths of the fixed-size fields of a record.
    constexpr size_t kMagicLen = 4;
    constexpr size_t kOpLen = 1;
    constexpr size_t kLenFieldLen = 4;
    constexpr size_t kRowIdLen = 8;
    constexpr size_t kCrcLen = 4;

    uint64_t remaining = static_cast<uint64_t>(file_size);
    std::vector<uint8_t> entry;  // Raw bytes of the record being parsed.
    size_t entries_replayed = 0;

    // Appends exactly `count` bytes from the file to `entry`. Returns false,
    // without growing the buffer, when the file does not hold that many bytes,
    // and false when the read itself fails.
    auto pull = [&](uint64_t count) -> bool {
        if (count > remaining) return false;
        if (count == 0) return true;
        const size_t old_size = entry.size();
        entry.resize(old_size + static_cast<size_t>(count));
        wal.read(reinterpret_cast<char*>(entry.data() + old_size),
                 static_cast<std::streamsize>(count));
        if (!wal) return false;
        remaining -= count;
        return true;
    };

    // Decodes a raw 32-bit field stored at byte offset `at` of `entry`.
    auto load_u32 = [&entry](size_t at) {
        uint32_t value = 0;
        std::memcpy(&value, entry.data() + at, sizeof(value));
        return value;
    };

    while (remaining > 0) {
        entry.clear();

        // Magic
        if (!pull(kMagicLen)) break;
        if (load_u32(0) != WAL_MAGIC) break;  // Corrupt or end of valid entries

        // Op + table name length
        if (!pull(kOpLen + kLenFieldLen)) break;
        const auto op = static_cast<WalOp>(entry[kMagicLen]);
        const uint32_t tlen = load_u32(kMagicLen + kOpLen);

        // Table name
        const size_t name_at = entry.size();
        if (!pull(tlen)) break;
        const std::string table_name(
            reinterpret_cast<const char*>(entry.data() + name_at), tlen);

        // Row ID + payload length
        const size_t row_at = entry.size();
        if (!pull(kRowIdLen + kLenFieldLen)) break;
        uint64_t row_id = 0;
        std::memcpy(&row_id, entry.data() + row_at, sizeof(row_id));
        const uint32_t plen = load_u32(row_at + kRowIdLen);

        // Payload
        if (!pull(plen)) break;

        // Stored CRC covers every byte of the record that precedes it.
        const size_t body_size = entry.size();
        if (!pull(kCrcLen)) break;
        const uint32_t stored_crc = load_u32(body_size);
        if (crc32(entry.data(), body_size) != stored_crc) {
            break;  // Corrupt entry, stop replay
        }

        // Apply operation (best-effort: skip if table doesn't exist)
        // The actual replay logic depends on the table having been loaded.
        // For now, we just count replayed entries. Full replay requires
        // deserializing the payload and calling table CRUD methods.
        // TODO: Implement full row-level replay when table schema is available.
        (void)op;
        (void)row_id;
        (void)table_name;

        ++entries_replayed;
    }

    // After replay, truncate WAL
    wal.close();
    if (entries_replayed > 0) {
        // Opening with trunc empties the file; the handle closes at scope exit.
        std::ofstream truncate(path_, std::ios::binary | std::ios::trunc);
    }
}

// Reports whether the WAL file exists and contains at least one byte.
bool WriteAheadLog::has_entries() const {
    return fs::exists(path_) && fs::file_size(path_) > 0;
}

}  // namespace wayy_db