sqlite: add support for SQLite Session Extension
Some checks failed
Coverage Linux (without intl) / coverage-linux-without-intl (push) Waiting to run
Coverage Linux / coverage-linux (push) Waiting to run
Coverage Windows / coverage-windows (push) Waiting to run
Test and upload documentation to artifacts / build-docs (push) Waiting to run
Linters / lint-addon-docs (push) Waiting to run
Linters / lint-cpp (push) Waiting to run
Linters / format-cpp (push) Waiting to run
Linters / lint-js-and-md (push) Waiting to run
Linters / lint-py (push) Waiting to run
Linters / lint-yaml (push) Waiting to run
Linters / lint-sh (push) Waiting to run
Linters / lint-codeowners (push) Waiting to run
Linters / lint-pr-url (push) Waiting to run
Linters / lint-readme (push) Waiting to run
Notify on Push / Notify on Force Push on `main` (push) Waiting to run
Notify on Push / Notify on Push on `main` that lacks metadata (push) Waiting to run
Scorecard supply-chain security / Scorecard analysis (push) Has been cancelled
Find inactive TSC voting members / find (push) Has been cancelled

PR-URL: https://github.com/nodejs/node/pull/54181
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
Bart Louwers 2024-11-18 01:57:04 +01:00 committed by GitHub
parent d159c97320
commit 746b17e1a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 842 additions and 46 deletions

View File

@ -12,6 +12,10 @@
'xcode_settings': {
'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', # -fvisibility=hidden
},
'defines': [
'SQLITE_ENABLE_SESSION',
'SQLITE_ENABLE_PREUPDATE_HOOK'
],
'include_dirs': ['.'],
'sources': [
'<@(sqlite_sources)',

View File

@ -155,6 +155,70 @@ added: v22.5.0
Compiles a SQL statement into a [prepared statement][]. This method is a wrapper
around [`sqlite3_prepare_v2()`][].
### `database.createSession([options])`
* `options` {Object} The configuration options for the session.
* `table` {string} A specific table to track changes for. By default, changes to all tables are tracked.
* `db` {string} Name of the database to track. This is useful when multiple databases have been added using [`ATTACH DATABASE`][]. **Default**: `'main'`.
* Returns: {Session} A session handle.
Creates and attaches a session to the database. This method is a wrapper around [`sqlite3session_create()`][] and [`sqlite3session_attach()`][].
### `database.applyChangeset(changeset[, options])`
* `changeset` {Uint8Array} A binary changeset or patchset.
* `options` {Object} The configuration options for how the changes will be applied.
* `filter` {Function} Skip changes that, when targeted table name is supplied to this function, return a truthy value.
By default, all changes are attempted.
* `onConflict` {number} Determines how conflicts are handled. **Default**: `SQLITE_CHANGESET_ABORT`.
* `SQLITE_CHANGESET_OMIT`: conflicting changes are omitted.
* `SQLITE_CHANGESET_REPLACE`: conflicting changes replace existing values.
* `SQLITE_CHANGESET_ABORT`: abort on conflict and roll back databsase.
* Returns: {boolean} Whether the changeset was applied succesfully without being aborted.
An exception is thrown if the database is not
open. This method is a wrapper around [`sqlite3changeset_apply()`][].
```js
const sourceDb = new DatabaseSync(':memory:');
const targetDb = new DatabaseSync(':memory:');
sourceDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');
targetDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');
const session = sourceDb.createSession();
const insert = sourceDb.prepare('INSERT INTO data (key, value) VALUES (?, ?)');
insert.run(1, 'hello');
insert.run(2, 'world');
const changeset = session.changeset();
targetDb.applyChangeset(changeset);
// Now that the changeset has been applied, targetDb contains the same data as sourceDb.
```
## Class: `Session`
### `session.changeset()`
* Returns: {Uint8Array} Binary changeset that can be applied to other databases.
Retrieves a changeset containing all changes since the changeset was created. Can be called multiple times.
An exception is thrown if the database or the session is not open. This method is a wrapper around [`sqlite3session_changeset()`][].
### `session.patchset()`
* Returns: {Uint8Array} Binary patchset that can be applied to other databases.
Similar to the method above, but generates a more compact patchset. See [Changesets and Patchsets][]
in the documentation of SQLite. An exception is thrown if the database or the session is not open. This method is a
wrapper around [`sqlite3session_patchset()`][].
### `session.close()`.
Closes the session. An exception is thrown if the database or the session is not open. This method is a
wrapper around [`sqlite3session_delete()`][].
## Class: `StatementSync`
<!-- YAML
@ -317,8 +381,39 @@ exception.
| `TEXT` | {string} |
| `BLOB` | {Uint8Array} |
## SQLite constants
The following constants are exported by the `node:sqlite` module.
### SQLite Session constants
#### Conflict-resolution constants
The following constants are meant for use with [`database.applyChangeset()`](#databaseapplychangesetchangeset-options).
<table>
<tr>
<th>Constant</th>
<th>Description</th>
</tr>
<tr>
<td><code>SQLITE_CHANGESET_OMIT</code></td>
<td>Conflicting changes are omitted.</td>
</tr>
<tr>
<td><code>SQLITE_CHANGESET_REPLACE</code></td>
<td>Conflicting changes replace existing values.</td>
</tr>
<tr>
<td><code>SQLITE_CHANGESET_ABORT</code></td>
<td>Abort when a change encounters a conflict and roll back databsase.</td>
</tr>
</table>
[Changesets and Patchsets]: https://www.sqlite.org/sessionintro.html#changesets_and_patchsets
[SQL injection]: https://en.wikipedia.org/wiki/SQL_injection
[`--experimental-sqlite`]: cli.md#--experimental-sqlite
[`ATTACH DATABASE`]: https://www.sqlite.org/lang_attach.html
[`PRAGMA foreign_keys`]: https://www.sqlite.org/pragma.html#pragma_foreign_keys
[`sqlite3_changes64()`]: https://www.sqlite.org/c3ref/changes.html
[`sqlite3_close_v2()`]: https://www.sqlite.org/c3ref/close.html
@ -327,6 +422,12 @@ exception.
[`sqlite3_last_insert_rowid()`]: https://www.sqlite.org/c3ref/last_insert_rowid.html
[`sqlite3_prepare_v2()`]: https://www.sqlite.org/c3ref/prepare.html
[`sqlite3_sql()`]: https://www.sqlite.org/c3ref/expanded_sql.html
[`sqlite3changeset_apply()`]: https://www.sqlite.org/session/sqlite3changeset_apply.html
[`sqlite3session_attach()`]: https://www.sqlite.org/session/sqlite3session_attach.html
[`sqlite3session_changeset()`]: https://www.sqlite.org/session/sqlite3session_changeset.html
[`sqlite3session_create()`]: https://www.sqlite.org/session/sqlite3session_create.html
[`sqlite3session_delete()`]: https://www.sqlite.org/session/sqlite3session_delete.html
[`sqlite3session_patchset()`]: https://www.sqlite.org/session/sqlite3session_patchset.html
[connection]: https://www.sqlite.org/c3ref/sqlite3.html
[data types]: https://www.sqlite.org/datatype3.html
[double-quoted string literals]: https://www.sqlite.org/quirks.html#dblquote

View File

@ -882,6 +882,7 @@
# Warn when using deprecated V8 APIs.
'V8_DEPRECATION_WARNINGS=1',
'NODE_OPENSSL_SYSTEM_CERT_PATH="<(openssl_system_ca_path)"',
"SQLITE_ENABLE_SESSION"
],
# - "C4244: conversion from 'type1' to 'type2', possible loss of data"

View File

@ -160,6 +160,7 @@
V(fields_string, "fields") \
V(file_string, "file") \
V(filename_string, "filename") \
V(filter_string, "filter") \
V(fingerprint256_string, "fingerprint256") \
V(fingerprint512_string, "fingerprint512") \
V(fingerprint_string, "fingerprint") \
@ -246,6 +247,7 @@
V(onchange_string, "onchange") \
V(onclienthello_string, "onclienthello") \
V(oncomplete_string, "oncomplete") \
V(onconflict_string, "onConflict") \
V(onconnection_string, "onconnection") \
V(ondone_string, "ondone") \
V(onerror_string, "onerror") \
@ -413,6 +415,7 @@
V(shutdown_wrap_template, v8::ObjectTemplate) \
V(socketaddress_constructor_template, v8::FunctionTemplate) \
V(sqlite_statement_sync_constructor_template, v8::FunctionTemplate) \
V(sqlite_session_constructor_template, v8::FunctionTemplate) \
V(streambaseentry_ctor_template, v8::FunctionTemplate) \
V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \
V(streamentry_ctor_template, v8::FunctionTemplate) \

View File

@ -22,6 +22,7 @@ using v8::ConstructorBehavior;
using v8::Context;
using v8::DontDelete;
using v8::Exception;
using v8::Function;
using v8::FunctionCallback;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
@ -31,6 +32,7 @@ using v8::Local;
using v8::LocalVector;
using v8::MaybeLocal;
using v8::Name;
using v8::NewStringType;
using v8::Null;
using v8::Number;
using v8::Object;
@ -51,7 +53,7 @@ using v8::Value;
#define THROW_AND_RETURN_ON_BAD_STATE(env, condition, msg) \
do { \
if ((condition)) { \
node::THROW_ERR_INVALID_STATE((env), (msg)); \
THROW_ERR_INVALID_STATE((env), (msg)); \
return; \
} \
} while (0)
@ -121,9 +123,19 @@ DatabaseSync::DatabaseSync(Environment* env,
}
}
void DatabaseSync::DeleteSessions() {
// all attached sessions need to be deleted before the database is closed
// https://www.sqlite.org/session/sqlite3session_create.html
for (auto* session : sessions_) {
sqlite3session_delete(session);
}
sessions_.clear();
}
DatabaseSync::~DatabaseSync() {
if (IsOpen()) {
FinalizeStatements();
DeleteSessions();
sqlite3_close_v2(connection_);
connection_ = nullptr;
}
@ -137,7 +149,7 @@ void DatabaseSync::MemoryInfo(MemoryTracker* tracker) const {
bool DatabaseSync::Open() {
if (IsOpen()) {
node::THROW_ERR_INVALID_STATE(env(), "database is already open");
THROW_ERR_INVALID_STATE(env(), "database is already open");
return false;
}
@ -204,21 +216,21 @@ void DatabaseSync::New(const FunctionCallbackInfo<Value>& args) {
}
if (!args[0]->IsString()) {
node::THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"path\" argument must be a string.");
THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"path\" argument must be a string.");
return;
}
std::string location =
node::Utf8Value(env->isolate(), args[0].As<String>()).ToString();
Utf8Value(env->isolate(), args[0].As<String>()).ToString();
DatabaseOpenConfiguration open_config(std::move(location));
bool open = true;
if (args.Length() > 1) {
if (!args[1]->IsObject()) {
node::THROW_ERR_INVALID_ARG_TYPE(
env->isolate(), "The \"options\" argument must be an object.");
THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"options\" argument must be an object.");
return;
}
@ -230,7 +242,7 @@ void DatabaseSync::New(const FunctionCallbackInfo<Value>& args) {
}
if (!open_v->IsUndefined()) {
if (!open_v->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(), "The \"options.open\" argument must be a boolean.");
return;
}
@ -245,7 +257,7 @@ void DatabaseSync::New(const FunctionCallbackInfo<Value>& args) {
}
if (!read_only_v->IsUndefined()) {
if (!read_only_v->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(),
"The \"options.readOnly\" argument must be a boolean.");
return;
@ -262,7 +274,7 @@ void DatabaseSync::New(const FunctionCallbackInfo<Value>& args) {
}
if (!enable_foreign_keys_v->IsUndefined()) {
if (!enable_foreign_keys_v->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(),
"The \"options.enableForeignKeyConstraints\" argument must be a "
"boolean.");
@ -281,7 +293,7 @@ void DatabaseSync::New(const FunctionCallbackInfo<Value>& args) {
}
if (!enable_dqs_v->IsUndefined()) {
if (!enable_dqs_v->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(),
"The \"options.enableDoubleQuotedStringLiterals\" argument must be "
"a boolean.");
@ -306,6 +318,7 @@ void DatabaseSync::Close(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");
db->FinalizeStatements();
db->DeleteSessions();
int r = sqlite3_close_v2(db->connection_);
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
db->connection_ = nullptr;
@ -318,12 +331,12 @@ void DatabaseSync::Prepare(const FunctionCallbackInfo<Value>& args) {
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");
if (!args[0]->IsString()) {
node::THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"sql\" argument must be a string.");
THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"sql\" argument must be a string.");
return;
}
node::Utf8Value sql(env->isolate(), args[0].As<String>());
Utf8Value sql(env->isolate(), args[0].As<String>());
sqlite3_stmt* s = nullptr;
int r = sqlite3_prepare_v2(db->connection_, *sql, -1, &s, 0);
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
@ -339,16 +352,179 @@ void DatabaseSync::Exec(const FunctionCallbackInfo<Value>& args) {
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");
if (!args[0]->IsString()) {
node::THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"sql\" argument must be a string.");
THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"sql\" argument must be a string.");
return;
}
node::Utf8Value sql(env->isolate(), args[0].As<String>());
Utf8Value sql(env->isolate(), args[0].As<String>());
int r = sqlite3_exec(db->connection_, *sql, nullptr, nullptr, nullptr);
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
}
void DatabaseSync::CreateSession(const FunctionCallbackInfo<Value>& args) {
std::string table;
std::string db_name = "main";
Environment* env = Environment::GetCurrent(args);
if (args.Length() > 0) {
if (!args[0]->IsObject()) {
THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"options\" argument must be an object.");
return;
}
Local<Object> options = args[0].As<Object>();
Local<String> table_key = FIXED_ONE_BYTE_STRING(env->isolate(), "table");
if (options->HasOwnProperty(env->context(), table_key).FromJust()) {
Local<Value> table_value;
if (!options->Get(env->context(), table_key).ToLocal(&table_value)) {
return;
}
if (table_value->IsString()) {
String::Utf8Value str(env->isolate(), table_value);
table = *str;
} else {
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(), "The \"options.table\" argument must be a string.");
return;
}
}
Local<String> db_key =
String::NewFromUtf8(env->isolate(), "db", NewStringType::kNormal)
.ToLocalChecked();
if (options->HasOwnProperty(env->context(), db_key).FromJust()) {
Local<Value> db_value =
options->Get(env->context(), db_key).ToLocalChecked();
if (db_value->IsString()) {
String::Utf8Value str(env->isolate(), db_value);
db_name = std::string(*str);
} else {
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(), "The \"options.db\" argument must be a string.");
return;
}
}
}
DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");
sqlite3_session* pSession;
int r = sqlite3session_create(db->connection_, db_name.c_str(), &pSession);
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
db->sessions_.insert(pSession);
r = sqlite3session_attach(pSession, table == "" ? nullptr : table.c_str());
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
BaseObjectPtr<Session> session =
Session::Create(env, BaseObjectWeakPtr<DatabaseSync>(db), pSession);
args.GetReturnValue().Set(session->object());
}
// the reason for using static functions here is that SQLite needs a
// function pointer
static std::function<int()> conflictCallback;
static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) {
if (!conflictCallback) return SQLITE_CHANGESET_ABORT;
return conflictCallback();
}
static std::function<bool(std::string)> filterCallback;
static int xFilter(void* pCtx, const char* zTab) {
if (!filterCallback) return 1;
return filterCallback(zTab) ? 1 : 0;
}
void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
conflictCallback = nullptr;
filterCallback = nullptr;
DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(env, !db->IsOpen(), "database is not open");
if (!args[0]->IsUint8Array()) {
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(), "The \"changeset\" argument must be a Uint8Array.");
return;
}
if (args.Length() > 1 && !args[1]->IsUndefined()) {
if (!args[1]->IsObject()) {
THROW_ERR_INVALID_ARG_TYPE(env->isolate(),
"The \"options\" argument must be an object.");
return;
}
Local<Object> options = args[1].As<Object>();
Local<Value> conflictValue =
options->Get(env->context(), env->onconflict_string()).ToLocalChecked();
if (!conflictValue->IsUndefined()) {
if (!conflictValue->IsNumber()) {
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(),
"The \"options.onConflict\" argument must be a number.");
return;
}
int conflictInt = conflictValue->Int32Value(env->context()).FromJust();
conflictCallback = [conflictInt]() -> int { return conflictInt; };
}
if (options->HasOwnProperty(env->context(), env->filter_string())
.FromJust()) {
Local<Value> filterValue =
options->Get(env->context(), env->filter_string()).ToLocalChecked();
if (!filterValue->IsFunction()) {
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(),
"The \"options.filter\" argument must be a function.");
return;
}
Local<Function> filterFunc = filterValue.As<Function>();
filterCallback = [env, filterFunc](std::string item) -> bool {
Local<Value> argv[] = {String::NewFromUtf8(env->isolate(),
item.c_str(),
NewStringType::kNormal)
.ToLocalChecked()};
Local<Value> result =
filterFunc->Call(env->context(), Null(env->isolate()), 1, argv)
.ToLocalChecked();
return result->BooleanValue(env->isolate());
};
}
}
ArrayBufferViewContents<uint8_t> buf(args[0]);
int r = sqlite3changeset_apply(
db->connection_,
buf.length(),
const_cast<void*>(static_cast<const void*>(buf.data())),
xFilter,
xConflict,
nullptr);
if (r == SQLITE_ABORT) {
args.GetReturnValue().Set(false);
return;
}
CHECK_ERROR_OR_THROW(env->isolate(), db->connection_, r, SQLITE_OK, void());
args.GetReturnValue().Set(true);
}
StatementSync::StatementSync(Environment* env,
Local<Object> object,
DatabaseSync* db,
@ -412,7 +588,7 @@ bool StatementSync::BindParams(const FunctionCallbackInfo<Value>& args) {
if (insertion.second == false) {
auto existing_full_name = (*insertion.first).second;
if (full_name != existing_full_name) {
node::THROW_ERR_INVALID_STATE(
THROW_ERR_INVALID_STATE(
env(),
"Cannot create bare named parameter '%s' because of "
"conflicting names '%s' and '%s'.",
@ -432,7 +608,7 @@ bool StatementSync::BindParams(const FunctionCallbackInfo<Value>& args) {
return false;
}
node::Utf8Value utf8_key(env()->isolate(), key);
Utf8Value utf8_key(env()->isolate(), key);
int r = sqlite3_bind_parameter_index(statement_, *utf8_key);
if (r == 0) {
if (allow_bare_named_params_) {
@ -444,7 +620,7 @@ bool StatementSync::BindParams(const FunctionCallbackInfo<Value>& args) {
}
if (r == 0) {
node::THROW_ERR_INVALID_STATE(
THROW_ERR_INVALID_STATE(
env(), "Unknown named parameter '%s'", *utf8_key);
return false;
}
@ -488,7 +664,7 @@ bool StatementSync::BindValue(const Local<Value>& value, const int index) {
double val = value.As<Number>()->Value();
r = sqlite3_bind_double(statement_, index, val);
} else if (value->IsString()) {
node::Utf8Value val(env()->isolate(), value.As<String>());
Utf8Value val(env()->isolate(), value.As<String>());
r = sqlite3_bind_text(
statement_, index, *val, val.length(), SQLITE_TRANSIENT);
} else if (value->IsNull()) {
@ -501,13 +677,12 @@ bool StatementSync::BindValue(const Local<Value>& value, const int index) {
bool lossless;
int64_t as_int = value.As<BigInt>()->Int64Value(&lossless);
if (!lossless) {
node::THROW_ERR_INVALID_ARG_VALUE(env(),
"BigInt value is too large to bind.");
THROW_ERR_INVALID_ARG_VALUE(env(), "BigInt value is too large to bind.");
return false;
}
r = sqlite3_bind_int64(statement_, index, as_int);
} else {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env()->isolate(),
"Provided value cannot be bound to SQLite parameter %d.",
index);
@ -564,8 +739,7 @@ MaybeLocal<Value> StatementSync::ColumnToValue(const int column) {
MaybeLocal<Name> StatementSync::ColumnNameToName(const int column) {
const char* col_name = sqlite3_column_name(statement_, column);
if (col_name == nullptr) {
node::THROW_ERR_INVALID_STATE(
env(), "Cannot get name of column %d", column);
THROW_ERR_INVALID_STATE(env(), "Cannot get name of column %d", column);
return MaybeLocal<Name>();
}
@ -591,32 +765,23 @@ void StatementSync::All(const FunctionCallbackInfo<Value>& args) {
auto reset = OnScopeLeave([&]() { sqlite3_reset(stmt->statement_); });
int num_cols = sqlite3_column_count(stmt->statement_);
LocalVector<Value> rows(isolate);
LocalVector<Name> row_keys(isolate);
while ((r = sqlite3_step(stmt->statement_)) == SQLITE_ROW) {
if (row_keys.size() == 0) {
row_keys.reserve(num_cols);
for (int i = 0; i < num_cols; ++i) {
Local<Name> key;
if (!stmt->ColumnNameToName(i).ToLocal(&key)) return;
row_keys.emplace_back(key);
}
}
LocalVector<Name> row_keys(isolate);
row_keys.reserve(num_cols);
LocalVector<Value> row_values(isolate);
row_values.reserve(num_cols);
for (size_t i = 0; i < row_keys.size(); ++i) {
for (int i = 0; i < num_cols; ++i) {
Local<Name> key;
if (!stmt->ColumnNameToName(i).ToLocal(&key)) return;
Local<Value> val;
if (!stmt->ColumnToValue(i).ToLocal(&val)) return;
row_keys.emplace_back(key);
row_values.emplace_back(val);
}
Local<Object> row = Object::New(isolate,
Null(isolate),
row_keys.data(),
row_values.data(),
row_keys.size());
Local<Object> row = Object::New(
isolate, Null(isolate), row_keys.data(), row_values.data(), num_cols);
rows.emplace_back(row);
}
@ -766,7 +931,7 @@ void StatementSync::SetAllowBareNamedParameters(
env, stmt->IsFinalized(), "statement has been finalized");
if (!args[0]->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(),
"The \"allowBareNamedParameters\" argument must be a boolean.");
return;
@ -783,7 +948,7 @@ void StatementSync::SetReadBigInts(const FunctionCallbackInfo<Value>& args) {
env, stmt->IsFinalized(), "statement has been finalized");
if (!args[0]->IsBoolean()) {
node::THROW_ERR_INVALID_ARG_TYPE(
THROW_ERR_INVALID_ARG_TYPE(
env->isolate(), "The \"readBigInts\" argument must be a boolean.");
return;
}
@ -792,7 +957,7 @@ void StatementSync::SetReadBigInts(const FunctionCallbackInfo<Value>& args) {
}
void IllegalConstructor(const FunctionCallbackInfo<Value>& args) {
node::THROW_ERR_ILLEGAL_CONSTRUCTOR(Environment::GetCurrent(args));
THROW_ERR_ILLEGAL_CONSTRUCTOR(Environment::GetCurrent(args));
}
static inline void SetSideEffectFreeGetter(
@ -858,6 +1023,100 @@ BaseObjectPtr<StatementSync> StatementSync::Create(Environment* env,
return MakeBaseObject<StatementSync>(env, obj, db, stmt);
}
Session::Session(Environment* env,
Local<Object> object,
BaseObjectWeakPtr<DatabaseSync> database,
sqlite3_session* session)
: BaseObject(env, object),
session_(session),
database_(std::move(database)) {
MakeWeak();
}
Session::~Session() {
Delete();
}
BaseObjectPtr<Session> Session::Create(Environment* env,
BaseObjectWeakPtr<DatabaseSync> database,
sqlite3_session* session) {
Local<Object> obj;
if (!GetConstructorTemplate(env)
->InstanceTemplate()
->NewInstance(env->context())
.ToLocal(&obj)) {
return BaseObjectPtr<Session>();
}
return MakeBaseObject<Session>(env, obj, std::move(database), session);
}
Local<FunctionTemplate> Session::GetConstructorTemplate(Environment* env) {
Local<FunctionTemplate> tmpl = env->sqlite_session_constructor_template();
if (tmpl.IsEmpty()) {
Isolate* isolate = env->isolate();
tmpl = NewFunctionTemplate(isolate, IllegalConstructor);
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "Session"));
tmpl->InstanceTemplate()->SetInternalFieldCount(
Session::kInternalFieldCount);
SetProtoMethod(isolate,
tmpl,
"changeset",
Session::Changeset<sqlite3session_changeset>);
SetProtoMethod(
isolate, tmpl, "patchset", Session::Changeset<sqlite3session_patchset>);
SetProtoMethod(isolate, tmpl, "close", Session::Close);
env->set_sqlite_session_constructor_template(tmpl);
}
return tmpl;
}
void Session::MemoryInfo(MemoryTracker* tracker) const {}
template <Sqlite3ChangesetGenFunc sqliteChangesetFunc>
void Session::Changeset(const FunctionCallbackInfo<Value>& args) {
Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.This());
Environment* env = Environment::GetCurrent(args);
sqlite3* db = session->database_ ? session->database_->connection_ : nullptr;
THROW_AND_RETURN_ON_BAD_STATE(
env, !session->database_->IsOpen(), "database is not open");
THROW_AND_RETURN_ON_BAD_STATE(
env, session->session_ == nullptr, "session is not open");
int nChangeset;
void* pChangeset;
int r = sqliteChangesetFunc(session->session_, &nChangeset, &pChangeset);
CHECK_ERROR_OR_THROW(env->isolate(), db, r, SQLITE_OK, void());
auto freeChangeset = OnScopeLeave([&] { sqlite3_free(pChangeset); });
Local<ArrayBuffer> buffer = ArrayBuffer::New(env->isolate(), nChangeset);
std::memcpy(buffer->GetBackingStore()->Data(), pChangeset, nChangeset);
Local<Uint8Array> uint8Array = Uint8Array::New(buffer, 0, nChangeset);
args.GetReturnValue().Set(uint8Array);
}
void Session::Close(const FunctionCallbackInfo<Value>& args) {
Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.This());
Environment* env = Environment::GetCurrent(args);
THROW_AND_RETURN_ON_BAD_STATE(
env, !session->database_->IsOpen(), "database is not open");
THROW_AND_RETURN_ON_BAD_STATE(
env, session->session_ == nullptr, "session is not open");
session->Delete();
}
void Session::Delete() {
if (!database_ || !database_->connection_ || session_ == nullptr) return;
sqlite3session_delete(session_);
database_->sessions_.erase(session_);
session_ = nullptr;
}
static void Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
@ -873,11 +1132,19 @@ static void Initialize(Local<Object> target,
SetProtoMethod(isolate, db_tmpl, "close", DatabaseSync::Close);
SetProtoMethod(isolate, db_tmpl, "prepare", DatabaseSync::Prepare);
SetProtoMethod(isolate, db_tmpl, "exec", DatabaseSync::Exec);
SetProtoMethod(
isolate, db_tmpl, "createSession", DatabaseSync::CreateSession);
SetProtoMethod(
isolate, db_tmpl, "applyChangeset", DatabaseSync::ApplyChangeset);
SetConstructorFunction(context, target, "DatabaseSync", db_tmpl);
SetConstructorFunction(context,
target,
"StatementSync",
StatementSync::GetConstructorTemplate(env));
NODE_DEFINE_CONSTANT(target, SQLITE_CHANGESET_OMIT);
NODE_DEFINE_CONSTANT(target, SQLITE_CHANGESET_REPLACE);
NODE_DEFINE_CONSTANT(target, SQLITE_CHANGESET_ABORT);
}
} // namespace sqlite

View File

@ -56,6 +56,8 @@ class DatabaseSync : public BaseObject {
static void Close(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Prepare(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Exec(const v8::FunctionCallbackInfo<v8::Value>& args);
static void CreateSession(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ApplyChangeset(const v8::FunctionCallbackInfo<v8::Value>& args);
void FinalizeStatements();
void UntrackStatement(StatementSync* statement);
bool IsOpen();
@ -66,11 +68,16 @@ class DatabaseSync : public BaseObject {
private:
bool Open();
void DeleteSessions();
~DatabaseSync() override;
DatabaseOpenConfiguration open_config_;
sqlite3* connection_;
std::set<sqlite3_session*> sessions_;
std::unordered_set<StatementSync*> statements_;
friend class Session;
};
class StatementSync : public BaseObject {
@ -113,6 +120,34 @@ class StatementSync : public BaseObject {
v8::MaybeLocal<v8::Name> ColumnNameToName(const int column);
};
using Sqlite3ChangesetGenFunc = int (*)(sqlite3_session*, int*, void**);
class Session : public BaseObject {
public:
Session(Environment* env,
v8::Local<v8::Object> object,
BaseObjectWeakPtr<DatabaseSync> database,
sqlite3_session* session);
~Session() override;
template <Sqlite3ChangesetGenFunc sqliteChangesetFunc>
static void Changeset(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Close(const v8::FunctionCallbackInfo<v8::Value>& args);
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);
static BaseObjectPtr<Session> Create(Environment* env,
BaseObjectWeakPtr<DatabaseSync> database,
sqlite3_session* session);
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(Session)
SET_SELF_SIZE(Session)
private:
void Delete();
sqlite3_session* session_;
BaseObjectWeakPtr<DatabaseSync> database_; // The Parent Database
};
} // namespace sqlite
} // namespace node

View File

@ -0,0 +1,384 @@
// Flags: --experimental-sqlite
'use strict';
require('../common');
const {
DatabaseSync,
SQLITE_CHANGESET_OMIT,
SQLITE_CHANGESET_REPLACE,
SQLITE_CHANGESET_ABORT
} = require('node:sqlite');
const { test, suite } = require('node:test');
/**
* Convenience wrapper around assert.deepStrictEqual that sets a null
* prototype to the expected object.
*/
function deepStrictEqual(t) {
return (actual, expected, message) => {
if (Array.isArray(expected)) {
expected = expected.map((obj) => ({ ...obj, __proto__: null }));
} else if (typeof expected === 'object') {
expected = { ...expected, __proto__: null };
}
t.assert.deepStrictEqual(actual, expected, message);
};
}
test('creating and applying a changeset', (t) => {
const createDataTableSql = `
CREATE TABLE data(
key INTEGER PRIMARY KEY,
value TEXT
) STRICT`;
const createDatabase = () => {
const database = new DatabaseSync(':memory:');
database.exec(createDataTableSql);
return database;
};
const databaseFrom = createDatabase();
const session = databaseFrom.createSession();
const select = 'SELECT * FROM data ORDER BY key';
const insert = databaseFrom.prepare('INSERT INTO data (key, value) VALUES (?, ?)');
insert.run(1, 'hello');
insert.run(2, 'world');
const databaseTo = createDatabase();
t.assert.strictEqual(databaseTo.applyChangeset(session.changeset()), true);
deepStrictEqual(t)(
databaseFrom.prepare(select).all(),
databaseTo.prepare(select).all()
);
});
test('database.createSession() - closed database results in exception', (t) => {
const database = new DatabaseSync(':memory:');
database.close();
t.assert.throws(() => {
database.createSession();
}, {
name: 'Error',
message: 'database is not open',
});
});
test('session.changeset() - closed database results in exception', (t) => {
const database = new DatabaseSync(':memory:');
const session = database.createSession();
database.close();
t.assert.throws(() => {
session.changeset();
}, {
name: 'Error',
message: 'database is not open',
});
});
test('database.applyChangeset() - closed database results in exception', (t) => {
const database = new DatabaseSync(':memory:');
const session = database.createSession();
const changeset = session.changeset();
database.close();
t.assert.throws(() => {
database.applyChangeset(changeset);
}, {
name: 'Error',
message: 'database is not open',
});
});
test('database.createSession() - use table option to track specific table', (t) => {
const database1 = new DatabaseSync(':memory:');
const database2 = new DatabaseSync(':memory:');
const createData1TableSql = `CREATE TABLE data1 (
key INTEGER PRIMARY KEY,
value TEXT
) STRICT
`;
const createData2TableSql = `CREATE TABLE data2 (
key INTEGER PRIMARY KEY,
value TEXT
) STRICT
`;
database1.exec(createData1TableSql);
database1.exec(createData2TableSql);
database2.exec(createData1TableSql);
database2.exec(createData2TableSql);
const session = database1.createSession({
table: 'data1'
});
const insert1 = database1.prepare('INSERT INTO data1 (key, value) VALUES (?, ?)');
insert1.run(1, 'hello');
insert1.run(2, 'world');
const insert2 = database1.prepare('INSERT INTO data2 (key, value) VALUES (?, ?)');
insert2.run(1, 'hello');
insert2.run(2, 'world');
const select1 = 'SELECT * FROM data1 ORDER BY key';
const select2 = 'SELECT * FROM data2 ORDER BY key';
t.assert.strictEqual(database2.applyChangeset(session.changeset()), true);
deepStrictEqual(t)(
database1.prepare(select1).all(),
database2.prepare(select1).all()); // data1 table should be equal
deepStrictEqual(t)(database2.prepare(select2).all(), []); // data2 should be empty in database2
t.assert.strictEqual(database1.prepare(select2).all().length, 2); // data1 should have values in database1
});
suite('conflict resolution', () => {
const prepareConflict = () => {
const database1 = new DatabaseSync(':memory:');
const database2 = new DatabaseSync(':memory:');
const createDataTableSql = `CREATE TABLE data (
key INTEGER PRIMARY KEY,
value TEXT
) STRICT
`;
database1.exec(createDataTableSql);
database2.exec(createDataTableSql);
const insertSql = 'INSERT INTO data (key, value) VALUES (?, ?)';
const session = database1.createSession();
database1.prepare(insertSql).run(1, 'hello');
database1.prepare(insertSql).run(2, 'foo');
database2.prepare(insertSql).run(1, 'world');
return {
database2,
changeset: session.changeset()
};
};
test('database.applyChangeset() - conflict with default behavior (abort)', (t) => {
const { database2, changeset } = prepareConflict();
// When changeset is aborted due to a conflict, applyChangeset should return false
t.assert.strictEqual(database2.applyChangeset(changeset), false);
deepStrictEqual(t)(
database2.prepare('SELECT value from data').all(),
[{ value: 'world' }]); // unchanged
});
test('database.applyChangeset() - conflict with SQLITE_CHANGESET_ABORT', (t) => {
const { database2, changeset } = prepareConflict();
const result = database2.applyChangeset(changeset, {
onConflict: SQLITE_CHANGESET_ABORT
});
// When changeset is aborted due to a conflict, applyChangeset should return false
t.assert.strictEqual(result, false);
deepStrictEqual(t)(
database2.prepare('SELECT value from data').all(),
[{ value: 'world' }]); // unchanged
});
test('database.applyChangeset() - conflict with SQLITE_CHANGESET_REPLACE', (t) => {
const { database2, changeset } = prepareConflict();
const result = database2.applyChangeset(changeset, {
onConflict: SQLITE_CHANGESET_REPLACE
});
// Not aborted due to conflict, so should return true
t.assert.strictEqual(result, true);
deepStrictEqual(t)(
database2.prepare('SELECT value from data ORDER BY key').all(),
[{ value: 'hello' }, { value: 'foo' }]); // replaced
});
test('database.applyChangeset() - conflict with SQLITE_CHANGESET_OMIT', (t) => {
const { database2, changeset } = prepareConflict();
const result = database2.applyChangeset(changeset, {
onConflict: SQLITE_CHANGESET_OMIT
});
// Not aborted due to conflict, so should return true
t.assert.strictEqual(result, true);
deepStrictEqual(t)(
database2.prepare('SELECT value from data ORDER BY key ASC').all(),
[{ value: 'world' }, { value: 'foo' }]); // Conflicting change omitted
});
});
test('session related constants are defined', (t) => {
t.assert.strictEqual(SQLITE_CHANGESET_OMIT, 0);
t.assert.strictEqual(SQLITE_CHANGESET_REPLACE, 1);
t.assert.strictEqual(SQLITE_CHANGESET_ABORT, 2);
});
test('database.createSession() - filter changes', (t) => {
const database1 = new DatabaseSync(':memory:');
const database2 = new DatabaseSync(':memory:');
const createTableSql = 'CREATE TABLE data1(key INTEGER PRIMARY KEY); CREATE TABLE data2(key INTEGER PRIMARY KEY);';
database1.exec(createTableSql);
database2.exec(createTableSql);
const session = database1.createSession();
database1.exec('INSERT INTO data1 (key) VALUES (1), (2), (3)');
database1.exec('INSERT INTO data2 (key) VALUES (1), (2), (3), (4), (5)');
database2.applyChangeset(session.changeset(), {
filter: (tableName) => tableName === 'data2'
});
const data1Rows = database2.prepare('SELECT * FROM data1').all();
const data2Rows = database2.prepare('SELECT * FROM data2').all();
// Expect no rows since all changes were filtered out
t.assert.strictEqual(data1Rows.length, 0);
// Expect 5 rows since these changes were not filtered out
t.assert.strictEqual(data2Rows.length, 5);
});
test('database.createSession() - specify other database', (t) => {
const database = new DatabaseSync(':memory:');
const session = database.createSession();
const sessionMain = database.createSession({
db: 'main'
});
const sessionTest = database.createSession({
db: 'test'
});
database.exec('CREATE TABLE data (key INTEGER PRIMARY KEY)');
database.exec('INSERT INTO data (key) VALUES (1)');
t.assert.notStrictEqual(session.changeset().length, 0);
t.assert.notStrictEqual(sessionMain.changeset().length, 0);
// Since this session is attached to a different database, its changeset should be empty
t.assert.strictEqual(sessionTest.changeset().length, 0);
});
test('database.createSession() - wrong arguments', (t) => {
const database = new DatabaseSync(':memory:');
t.assert.throws(() => {
database.createSession(null);
}, {
name: 'TypeError',
message: 'The "options" argument must be an object.'
});
t.assert.throws(() => {
database.createSession({
table: 123
});
}, {
name: 'TypeError',
message: 'The "options.table" argument must be a string.'
});
t.assert.throws(() => {
database.createSession({
db: 123
});
}, {
name: 'TypeError',
message: 'The "options.db" argument must be a string.'
});
});
test('database.applyChangeset() - wrong arguments', (t) => {
const database = new DatabaseSync(':memory:');
const session = database.createSession();
t.assert.throws(() => {
database.applyChangeset(null);
}, {
name: 'TypeError',
message: 'The "changeset" argument must be a Uint8Array.'
});
t.assert.throws(() => {
database.applyChangeset(session.changeset(), null);
}, {
name: 'TypeError',
message: 'The "options" argument must be an object.'
});
t.assert.throws(() => {
database.applyChangeset(session.changeset(), {
filter: null
}, null);
}, {
name: 'TypeError',
message: 'The "options.filter" argument must be a function.'
});
t.assert.throws(() => {
database.applyChangeset(session.changeset(), {
onConflict: null
}, null);
}, {
name: 'TypeError',
message: 'The "options.onConflict" argument must be a number.'
});
});
test('session.patchset()', (t) => {
const database = new DatabaseSync(':memory:');
database.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');
database.exec("INSERT INTO data VALUES ('1', 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.')");
const session = database.createSession();
database.exec("UPDATE data SET value = 'hi' WHERE key = 1");
const patchset = session.patchset();
const changeset = session.changeset();
t.assert.ok(patchset instanceof Uint8Array);
t.assert.ok(changeset instanceof Uint8Array);
t.assert.deepStrictEqual(patchset, session.patchset());
t.assert.deepStrictEqual(changeset, session.changeset());
t.assert.ok(
patchset.length < changeset.length,
'expected patchset to be smaller than changeset');
});
test('session.close() - using session after close throws exception', (t) => {
const database = new DatabaseSync(':memory:');
database.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');
database.exec("INSERT INTO data VALUES ('1', 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.')");
const session = database.createSession();
database.exec("UPDATE data SET value = 'hi' WHERE key = 1");
session.close();
database.exec("UPDATE data SET value = 'world' WHERE key = 1");
t.assert.throws(() => {
session.changeset();
}, {
name: 'Error',
message: 'session is not open'
});
});
test('session.close() - after closing database throws exception', (t) => {
const database = new DatabaseSync(':memory:');
database.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');
database.exec("INSERT INTO data VALUES ('1', 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.')");
const session = database.createSession();
database.close();
t.assert.throws(() => {
session.close();
}, {
name: 'Error',
message: 'database is not open'
});
});
test('session.close() - closing twice', (t) => {
const database = new DatabaseSync(':memory:');
const session = database.createSession();
session.close();
t.assert.throws(() => {
session.close();
}, {
name: 'Error',
message: 'session is not open'
});
});

View File

@ -199,6 +199,7 @@ const customTypesMap = {
'repl.REPLServer': 'repl.html#class-replserver',
'Session': 'sqlite.html#class-session',
'StatementSync': 'sqlite.html#class-statementsync',
'Stream': 'stream.html#stream',