Spaces:
Sleeping
Sleeping
| import * as hrana from "@libsql/hrana-client"; | |
| import { LibsqlError } from "@libsql/core/api"; | |
| import { expandConfig } from "@libsql/core/config"; | |
| import { HranaTransaction, executeHranaBatch, stmtToHrana, resultSetFromHrana, mapHranaError, } from "./hrana.js"; | |
| import { SqlCache } from "./sql_cache.js"; | |
| import { encodeBaseUrl } from "@libsql/core/uri"; | |
| import { supportedUrlLink } from "@libsql/core/util"; | |
| import promiseLimit from "promise-limit"; | |
| export * from "@libsql/core/api"; | |
| export function createClient(config) { | |
| return _createClient(expandConfig(config, true)); | |
| } | |
| /** @private */ | |
| export function _createClient(config) { | |
| if (config.scheme !== "https" && config.scheme !== "http") { | |
| throw new LibsqlError('The HTTP client supports only "libsql:", "https:" and "http:" URLs, ' + | |
| `got ${JSON.stringify(config.scheme + ":")}. For more information, please read ${supportedUrlLink}`, "URL_SCHEME_NOT_SUPPORTED"); | |
| } | |
| if (config.encryptionKey !== undefined) { | |
| throw new LibsqlError("Encryption key is not supported by the remote client.", "ENCRYPTION_KEY_NOT_SUPPORTED"); | |
| } | |
| if (config.scheme === "http" && config.tls) { | |
| throw new LibsqlError(`A "http:" URL cannot opt into TLS by using ?tls=1`, "URL_INVALID"); | |
| } | |
| else if (config.scheme === "https" && !config.tls) { | |
| throw new LibsqlError(`A "https:" URL cannot opt out of TLS by using ?tls=0`, "URL_INVALID"); | |
| } | |
| const url = encodeBaseUrl(config.scheme, config.authority, config.path); | |
| return new HttpClient(url, config.authToken, config.intMode, config.fetch, config.concurrency, config.remoteEncryptionKey); | |
| } | |
| const sqlCacheCapacity = 30; | |
| export class HttpClient { | |
| #client; | |
| protocol; | |
| #url; | |
| #intMode; | |
| #customFetch; | |
| #concurrency; | |
| #authToken; | |
| #remoteEncryptionKey; | |
| #promiseLimitFunction; | |
| /** @private */ | |
| constructor(url, authToken, intMode, customFetch, concurrency, remoteEncryptionKey) { | |
| this.#url = url; | |
| this.#authToken = authToken; | |
| this.#intMode = intMode; | |
| this.#customFetch = customFetch; | |
| this.#concurrency = concurrency; | |
| this.#remoteEncryptionKey = remoteEncryptionKey; | |
| this.#client = hrana.openHttp(this.#url, this.#authToken, this.#customFetch, remoteEncryptionKey); | |
| this.#client.intMode = this.#intMode; | |
| this.protocol = "http"; | |
| this.#promiseLimitFunction = promiseLimit(this.#concurrency); | |
| } | |
| async limit(fn) { | |
| return this.#promiseLimitFunction(fn); | |
| } | |
| async execute(stmtOrSql, args) { | |
| let stmt; | |
| if (typeof stmtOrSql === "string") { | |
| stmt = { | |
| sql: stmtOrSql, | |
| args: args || [], | |
| }; | |
| } | |
| else { | |
| stmt = stmtOrSql; | |
| } | |
| return this.limit(async () => { | |
| try { | |
| const hranaStmt = stmtToHrana(stmt); | |
| // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and | |
| // close the stream in a single HTTP request. | |
| let rowsPromise; | |
| const stream = this.#client.openStream(); | |
| try { | |
| rowsPromise = stream.query(hranaStmt); | |
| } | |
| finally { | |
| stream.closeGracefully(); | |
| } | |
| const rowsResult = await rowsPromise; | |
| return resultSetFromHrana(rowsResult); | |
| } | |
| catch (e) { | |
| throw mapHranaError(e); | |
| } | |
| }); | |
| } | |
| async batch(stmts, mode = "deferred") { | |
| return this.limit(async () => { | |
| try { | |
| const normalizedStmts = stmts.map((stmt) => { | |
| if (Array.isArray(stmt)) { | |
| return { | |
| sql: stmt[0], | |
| args: stmt[1] || [], | |
| }; | |
| } | |
| return stmt; | |
| }); | |
| const hranaStmts = normalizedStmts.map(stmtToHrana); | |
| const version = await this.#client.getVersion(); | |
| // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and | |
| // close the stream in a single HTTP request. | |
| let resultsPromise; | |
| const stream = this.#client.openStream(); | |
| try { | |
| // It makes sense to use a SQL cache even for a single batch, because it may contain the same | |
| // statement repeated multiple times. | |
| const sqlCache = new SqlCache(stream, sqlCacheCapacity); | |
| sqlCache.apply(hranaStmts); | |
| // TODO: we do not use a cursor here, because it would cause three roundtrips: | |
| // 1. pipeline request to store SQL texts | |
| // 2. cursor request | |
| // 3. pipeline request to close the stream | |
| const batch = stream.batch(false); | |
| resultsPromise = executeHranaBatch(mode, version, batch, hranaStmts); | |
| } | |
| finally { | |
| stream.closeGracefully(); | |
| } | |
| const results = await resultsPromise; | |
| return results; | |
| } | |
| catch (e) { | |
| throw mapHranaError(e); | |
| } | |
| }); | |
| } | |
| async migrate(stmts) { | |
| return this.limit(async () => { | |
| try { | |
| const hranaStmts = stmts.map(stmtToHrana); | |
| const version = await this.#client.getVersion(); | |
| // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and | |
| // close the stream in a single HTTP request. | |
| let resultsPromise; | |
| const stream = this.#client.openStream(); | |
| try { | |
| const batch = stream.batch(false); | |
| resultsPromise = executeHranaBatch("deferred", version, batch, hranaStmts, true); | |
| } | |
| finally { | |
| stream.closeGracefully(); | |
| } | |
| const results = await resultsPromise; | |
| return results; | |
| } | |
| catch (e) { | |
| throw mapHranaError(e); | |
| } | |
| }); | |
| } | |
| async transaction(mode = "write") { | |
| return this.limit(async () => { | |
| try { | |
| const version = await this.#client.getVersion(); | |
| return new HttpTransaction(this.#client.openStream(), mode, version); | |
| } | |
| catch (e) { | |
| throw mapHranaError(e); | |
| } | |
| }); | |
| } | |
| async executeMultiple(sql) { | |
| return this.limit(async () => { | |
| try { | |
| // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and | |
| // close the stream in a single HTTP request. | |
| let promise; | |
| const stream = this.#client.openStream(); | |
| try { | |
| promise = stream.sequence(sql); | |
| } | |
| finally { | |
| stream.closeGracefully(); | |
| } | |
| await promise; | |
| } | |
| catch (e) { | |
| throw mapHranaError(e); | |
| } | |
| }); | |
| } | |
| sync() { | |
| throw new LibsqlError("sync not supported in http mode", "SYNC_NOT_SUPPORTED"); | |
| } | |
| close() { | |
| this.#client.close(); | |
| } | |
| async reconnect() { | |
| try { | |
| if (!this.closed) { | |
| // Abort in-flight ops and free resources | |
| this.#client.close(); | |
| } | |
| } | |
| finally { | |
| // Recreate the underlying hrana client | |
| this.#client = hrana.openHttp(this.#url, this.#authToken, this.#customFetch, this.#remoteEncryptionKey); | |
| this.#client.intMode = this.#intMode; | |
| } | |
| } | |
| get closed() { | |
| return this.#client.closed; | |
| } | |
| } | |
| export class HttpTransaction extends HranaTransaction { | |
| #stream; | |
| #sqlCache; | |
| /** @private */ | |
| constructor(stream, mode, version) { | |
| super(mode, version); | |
| this.#stream = stream; | |
| this.#sqlCache = new SqlCache(stream, sqlCacheCapacity); | |
| } | |
| /** @private */ | |
| _getStream() { | |
| return this.#stream; | |
| } | |
| /** @private */ | |
| _getSqlCache() { | |
| return this.#sqlCache; | |
| } | |
| close() { | |
| this.#stream.close(); | |
| } | |
| get closed() { | |
| return this.#stream.closed; | |
| } | |
| } | |