Stock/db/kysely-sqlite-driver.ts

113 lines
3.0 KiB
TypeScript
Raw Permalink Normal View History

2023-10-26 23:58:33 +09:00
/// The MIT License (MIT)
/// Copyright (c) 2023 Alex Gleason
/// Copyright (c) 2022 Sami Koskimäki
/// https://gitlab.com/soapbox-pub/kysely-deno-sqlite
import { CompiledQuery, type DatabaseConnection, type Driver, type QueryResult } from 'kysely';
import type { DenoSqlite, DenoSqliteDialectConfig } from './deno-sqlite-dialect-config.ts';
class DenoSqliteDriver implements Driver {
readonly #config: DenoSqliteDialectConfig;
readonly #connectionMutex = new ConnectionMutex();
#db?: DenoSqlite;
#connection?: DatabaseConnection;
constructor(config: DenoSqliteDialectConfig) {
this.#config = Object.freeze({ ...config });
}
async init(): Promise<void> {
this.#db = typeof this.#config.database === 'function' ? await this.#config.database() : this.#config.database;
this.#connection = new DenoSqliteConnection(this.#db);
if (this.#config.onCreateConnection) {
await this.#config.onCreateConnection(this.#connection);
}
}
async acquireConnection(): Promise<DatabaseConnection> {
// SQLite only has one single connection. We use a mutex here to wait
// until the single connection has been released.
await this.#connectionMutex.lock();
return this.#connection!;
}
async beginTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw('begin'));
}
async commitTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw('commit'));
}
async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw('rollback'));
}
// deno-lint-ignore require-await
async releaseConnection(): Promise<void> {
this.#connectionMutex.unlock();
}
// deno-lint-ignore require-await
async destroy(): Promise<void> {
this.#db?.close();
}
}
class DenoSqliteConnection implements DatabaseConnection {
readonly #db: DenoSqlite;
constructor(db: DenoSqlite) {
this.#db = db;
}
executeQuery<O>({ sql, parameters }: CompiledQuery): Promise<QueryResult<O>> {
const rows = 'queryEntries' in this.#db
? this.#db.queryEntries(sql, parameters)
: this.#db.prepare(sql).all(...parameters);
const { changes, lastInsertRowId } = this.#db;
return Promise.resolve({
rows: rows as O[],
numAffectedRows: BigInt(changes),
insertId: BigInt(lastInsertRowId),
});
}
// deno-lint-ignore require-yield
async *streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
throw new Error('Sqlite driver doesn\'t support streaming');
}
}
class ConnectionMutex {
#promise?: Promise<void>;
#resolve?: () => void;
async lock(): Promise<void> {
while (this.#promise) {
await this.#promise;
}
this.#promise = new Promise((resolve) => {
this.#resolve = resolve;
});
}
unlock(): void {
const resolve = this.#resolve;
this.#promise = undefined;
this.#resolve = undefined;
resolve?.();
}
}
export { DenoSqliteDriver };