feat(ext/kv): connect to remote database (#20178)

This patch adds a `remote` backend for `ext/kv`. This supports
connection to Deno Deploy and potentially other services compatible with
the KV Connect protocol.
This commit is contained in:
Heyang Zhou 2023-08-22 13:56:00 +08:00 committed by GitHub
parent 5834d282d4
commit 6d4a005e41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1492 additions and 12 deletions

View File

@ -31,6 +31,12 @@ jobs:
- uses: dsherret/rust-toolchain-file@v1
- name: Install protoc
uses: arduino/setup-protoc@v2
with:
version: "21.12"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build release
run: cargo build --release --locked --all-targets

View File

@ -168,6 +168,11 @@ const installNodeStep = {
uses: "actions/setup-node@v3",
with: { "node-version": 18 },
};
const installProtocStep = {
name: "Install protoc",
uses: "arduino/setup-protoc@v2",
with: { "version": "21.12", "repo-token": "${{ secrets.GITHUB_TOKEN }}" },
};
const installDenoStep = {
name: "Install Deno",
uses: "denoland/setup-deno@v1",
@ -434,6 +439,7 @@ const ci = {
if: "matrix.job == 'bench'",
...installNodeStep,
},
installProtocStep,
{
if: [
"matrix.profile == 'release' &&",

View File

@ -170,6 +170,12 @@ jobs:
uses: actions/setup-node@v3
with:
node-version: 18
- name: Install protoc
uses: arduino/setup-protoc@v2
with:
version: '21.12'
repo-token: '${{ secrets.GITHUB_TOKEN }}'
if: '!(github.event_name == ''pull_request'' && matrix.skip_pr)'
- if: |-
!(github.event_name == 'pull_request' && matrix.skip_pr) && (matrix.profile == 'release' &&
matrix.job == 'test' &&

78
Cargo.lock generated
View File

@ -507,6 +507,7 @@ dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"time 0.1.45",
"wasm-bindgen",
"winapi",
@ -1227,15 +1228,20 @@ dependencies = [
"anyhow",
"async-trait",
"base64 0.13.1",
"chrono",
"deno_core",
"hex",
"log",
"num-bigint",
"prost",
"prost-build",
"rand",
"reqwest",
"rusqlite",
"serde",
"serde_json",
"tokio",
"url",
"uuid",
]
@ -3113,6 +3119,12 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4519a88847ba2d5ead3dc53f1060ec6a571de93f325d9c5c4968147382b1cbc3"
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "napi-build"
version = "1.2.1"
@ -3695,6 +3707,16 @@ dependencies = [
"yansi",
]
[[package]]
name = "prettyplease"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86"
dependencies = [
"proc-macro2 1.0.66",
"syn 1.0.109",
]
[[package]]
name = "primeorder"
version = "0.13.1"
@ -3762,6 +3784,60 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prost"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn 1.0.109",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools",
"proc-macro2 1.0.66",
"quote 1.0.32",
"syn 1.0.109",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost",
]
[[package]]
name = "psm"
version = "0.1.21"
@ -5232,6 +5308,8 @@ dependencies = [
"os_pipe",
"parking_lot 0.12.1",
"pretty_assertions",
"prost",
"prost-build",
"regex",
"reqwest",
"ring",

View File

@ -82,6 +82,7 @@ brotli = "3.3.4"
bytes = "1.4.0"
cache_control = "=0.2.0"
cbc = { version = "=0.1.2", features = ["alloc"] }
chrono = { version = "=0.4.26", default-features = false, features = ["std", "serde", "clock"] }
console_static_text = "=0.8.1"
data-url = "=0.2.0"
dlopen = "0.1.8"
@ -110,10 +111,12 @@ parking_lot = "0.12.0"
percent-encoding = "=2.3.0"
pin-project = "1.0.11" # don't pin because they yank crates from cargo
pretty_assertions = "=1.3.0"
prost = "0.11"
prost-build = "0.11"
rand = "=0.8.5"
regex = "^1.7.0"
lazy-regex = "2.5.0"
reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks"] }
reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks", "json"] }
ring = "=0.16.20"
rusqlite = { version = "=0.29.0", features = ["unlock_notify", "bundled"] }
rustls = "0.21.0"

View File

@ -60,6 +60,11 @@ scoop install deno
Build and install from source using [Cargo](https://crates.io/crates/deno):
```sh
# Install the Protobuf compiler
apt install -y protobuf-compiler # Linux
brew install protobuf # macOS
# Build and install Deno
cargo install deno --locked
```

View File

@ -66,7 +66,7 @@ base32 = "=0.4.0"
base64.workspace = true
bincode = "=1.3.3"
cache_control.workspace = true
chrono = { version = "=0.4.26", default-features = false, features = ["std"] }
chrono.workspace = true
clap = { version = "=4.3.3", features = ["string"] }
clap_complete = "=4.3.1"
clap_complete_fig = "=4.3.1"

View File

@ -0,0 +1,54 @@
{
"$id": "https://deno.land/x/deno/cli/schemas/kv-metadata-exchange-response.v1.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"Uuid": {
"type": "string",
"pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
},
"DateTime": {
"type": "string",
"format": "date-time"
},
"EndpointInfo": {
"type": "object",
"properties": {
"url": {
"type": "string"
},
"consistency": {
"type": "string"
}
},
"required": ["url", "consistency"],
"additionalProperties": false
},
"DatabaseMetadata": {
"type": "object",
"properties": {
"version": {
"type": "integer",
"minimum": 0
},
"databaseId": {
"$ref": "#/definitions/Uuid"
},
"endpoints": {
"type": "array",
"items": {
"$ref": "#/definitions/EndpointInfo"
}
},
"token": {
"type": "string"
},
"expiresAt": {
"$ref": "#/definitions/DateTime"
}
},
"required": ["version", "databaseId", "endpoints", "token", "expiresAt"],
"additionalProperties": false
}
},
"$ref": "#/definitions/DatabaseMetadata"
}

View File

@ -20,6 +20,9 @@ try {
isCI = true;
}
// Defined in test_util/src/lib.rs
Deno.env.set("DENO_KV_ACCESS_TOKEN", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
Deno.test({
name: "openKv :memory: no permissions",
permissions: {},
@ -1932,3 +1935,74 @@ Deno.test({
}
},
});
Deno.test({
name: "remote backend",
async fn() {
const db = await Deno.openKv("http://localhost:4545/kv_remote_authorize");
try {
await db.set(["some-key"], 1);
const entry = await db.get(["some-key"]);
assertEquals(entry.value, null);
assertEquals(entry.versionstamp, null);
} finally {
db.close();
}
},
});
Deno.test({
name: "remote backend invalid format",
async fn() {
const db = await Deno.openKv(
"http://localhost:4545/kv_remote_authorize_invalid_format",
);
let ok = false;
try {
await db.set(["some-key"], 1);
} catch (e) {
if (
e.name === "TypeError" &&
e.message.startsWith("Metadata error: Failed to decode metadata: ")
) {
ok = true;
} else {
throw e;
}
} finally {
db.close();
}
if (!ok) {
throw new Error("did not get expected error");
}
},
});
Deno.test({
name: "remote backend invalid version",
async fn() {
const db = await Deno.openKv(
"http://localhost:4545/kv_remote_authorize_invalid_version",
);
let ok = false;
try {
await db.set(["some-key"], 1);
} catch (e) {
if (
e.name === "TypeError" &&
e.message === "Metadata error: Unsupported metadata version: 2"
) {
ok = true;
} else {
throw e;
}
} finally {
db.close();
}
if (!ok) {
throw new Error("did not get expected error");
}
},
});

View File

@ -17,13 +17,20 @@ path = "lib.rs"
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
chrono.workspace = true
deno_core.workspace = true
hex.workspace = true
log.workspace = true
num-bigint.workspace = true
prost.workspace = true
rand.workspace = true
reqwest.workspace = true
rusqlite.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
url.workspace = true
uuid.workspace = true
[build-dependencies]
prost-build.workspace = true

View File

@ -1,3 +1,81 @@
# deno_kv
This crate provides a key/value store for Deno.
This crate provides a key/value store for Deno. For an overview of Deno KV,
please read the [manual](https://deno.land/manual/runtime/kv).
## Storage Backends
Deno KV has a pluggable storage interface that supports multiple backends:
- SQLite - backed by a local SQLite database. This backend is suitable for
development and is the default when running locally.
- Remote - backed by a remote service that implements the
[KV Connect](#kv-connect) protocol, for example
[Deno Deploy](https://deno.com/deploy).
Additional backends can be added by implementing the `DatabaseHandler` trait.
## KV Connect
The KV Connect protocol has separate control and data planes to maximize
throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two
sub-protocols that are used when talking to a KV Connect-compatible service.
### Metadata Exchange
To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to
`Deno.openKv`. A background task is then spawned to periodically make HTTP POST
requests to the provided URL to refresh database metadata.
The HTTP `Authorization` header is included and have the format
`Bearer <access-token>`. The `<access-token>` is a static token issued by the
service provider. For Deno Deploy, this is the personal access token generated
from the dashboard. You can specify the access token with the environment
variable `DENO_KV_ACCESS_TOKEN`.
Request body is currently unused. The response is a JSON message that satisfies
the [JSON Schema](https://json-schema.org/) definition in
`cli/schemas/kv-metadata-exchange-response.v1.json`.
Semantics of the response fields:
- `version`: Protocol version. The only supported value is `1`.
- `databaseId`: UUID of the database.
- `endpoints`: Data plane endpoints that can serve requests to the database,
along with their consistency levels.
- `token`: An ephemeral authentication token that must be included in all
requests to the data plane. This value is an opaque string and the client
should not depend on its format.
- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601
string.
### Data Path
After the first metadata exchange has completed, the client can talk to the data
plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP
protocol called the _Data Path_. The Protobuf messages are defined in
`proto/datapath.proto`.
Two sub-endpoints are available under a data plane endpoint URL:
- `POST /snapshot_read`: Used for read operations: `kv.get()` and
`kv.getMany()`.
- **Request type**: `SnapshotRead`
- **Response type**: `SnapshotReadOutput`
- `POST /atomic_write`: Used for write operations: `kv.set()` and
`kv.atomic().commit()`.
- **Request type**: `AtomicWrite`
- **Response type**: `AtomicWriteOutput`
An HTTP `Authorization` header in the format `Bearer <ephemeral-token>` must be
included in all requests to the data plane. The value of `<ephemeral-token>` is
the `token` field from the metadata exchange response.
### Error handling
All non-client errors (i.e. network errors and HTTP 5xx status codes) are
handled by retrying the request. Randomized exponential backoff is applied to
each retry.
Client errors cannot be recovered by retrying. A JavaScript exception is
generated for each of those errors.

19
ext/kv/build.rs Normal file
View File

@ -0,0 +1,19 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::env;
use std::io;
use std::path::PathBuf;
fn main() -> io::Result<()> {
println!("cargo:rerun-if-changed=./proto");
let descriptor_path =
PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
prost_build::Config::new()
.file_descriptor_set_path(&descriptor_path)
.compile_well_known_types()
.compile_protos(&["proto/datapath.proto"], &["proto/"])?;
Ok(())
}

216
ext/kv/dynamic.rs Normal file
View File

@ -0,0 +1,216 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::rc::Rc;
use crate::remote::RemoteDbHandlerPermissions;
use crate::sqlite::SqliteDbHandler;
use crate::sqlite::SqliteDbHandlerPermissions;
use crate::AtomicWrite;
use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::QueueMessageHandle;
use crate::ReadRange;
use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::OpState;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
}
impl MultiBackendDbHandler {
pub fn new(
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
) -> Self {
Self { backends }
}
pub fn remote_or_sqlite<
P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
>(
default_storage_dir: Option<std::path::PathBuf>,
) -> Self {
Self::new(vec![
(
&["https://", "http://"],
Box::new(crate::remote::RemoteDbHandler::<P>::new()),
),
(
&[""],
Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
),
])
}
}
#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
type DB = Box<dyn DynamicDb>;
async fn open(
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Self::DB, AnyError> {
for (prefixes, handler) in &self.backends {
for &prefix in *prefixes {
if prefix.is_empty() {
return handler.dyn_open(state.clone(), path.clone()).await;
}
let Some(path) = &path else {
continue;
};
if path.starts_with(prefix) {
return handler.dyn_open(state.clone(), Some(path.clone())).await;
}
}
}
Err(type_error(format!(
"No backend supports the given path: {:?}",
path
)))
}
}
#[async_trait(?Send)]
pub trait DynamicDbHandler {
async fn dyn_open(
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Box<dyn DynamicDb>, AnyError>;
}
#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
type DB = Box<dyn DynamicDb>;
async fn open(
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Self::DB, AnyError> {
(**self).dyn_open(state, path).await
}
}
#[async_trait(?Send)]
impl<T, DB> DynamicDbHandler for T
where
T: DatabaseHandler<DB = DB>,
DB: Database + 'static,
{
async fn dyn_open(
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Box<dyn DynamicDb>, AnyError> {
Ok(Box::new(self.open(state, path).await?))
}
}
#[async_trait(?Send)]
pub trait DynamicDb {
async fn dyn_snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn dyn_atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Box<dyn QueueMessageHandle>, AnyError>;
fn dyn_close(&self);
}
#[async_trait(?Send)]
impl Database for Box<dyn DynamicDb> {
type QMH = Box<dyn QueueMessageHandle>;
async fn snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
(**self).dyn_snapshot_read(state, requests, options).await
}
async fn atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
(**self).dyn_atomic_write(state, write).await
}
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
(**self).dyn_dequeue_next_message(state).await
}
fn close(&self) {
(**self).dyn_close()
}
}
#[async_trait(?Send)]
impl<T, QMH> DynamicDb for T
where
T: Database<QMH = QMH>,
QMH: QueueMessageHandle + 'static,
{
async fn dyn_snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
Ok(self.snapshot_read(state, requests, options).await?)
}
async fn dyn_atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
Ok(self.atomic_write(state, write).await?)
}
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
Ok(Box::new(self.dequeue_next_message(state).await?))
}
fn dyn_close(&self) {
self.close()
}
}
#[async_trait(?Send)]
impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
(**self).take_payload().await
}
async fn finish(&self, success: bool) -> Result<(), AnyError> {
(**self).finish(success).await
}
}

View File

@ -29,16 +29,21 @@ pub trait Database {
async fn snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError>;
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError>;
fn close(&self);
}

View File

@ -1,7 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod codec;
pub mod dynamic;
mod interface;
mod proto;
pub mod remote;
pub mod sqlite;
use std::borrow::Cow;
@ -285,7 +288,8 @@ where
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
let output_ranges = db.snapshot_read(read_ranges, opts).await?;
let output_ranges =
db.snapshot_read(state.clone(), read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
@ -323,7 +327,7 @@ where
resource.db.clone()
};
let mut handle = db.dequeue_next_message().await?;
let mut handle = db.dequeue_next_message(state.clone()).await?;
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
@ -660,7 +664,7 @@ where
enqueues,
};
let result = db.atomic_write(atomic_write).await?;
let result = db.atomic_write(state.clone(), atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}

View File

@ -0,0 +1,96 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
syntax = "proto3";
package datapath;
message SnapshotRead {
repeated ReadRange ranges = 1;
}
message SnapshotReadOutput {
repeated ReadRangeOutput ranges = 1;
bool read_disabled = 2;
repeated string regions_if_read_disabled = 3;
bool read_is_strongly_consistent = 4;
string primary_if_not_strongly_consistent = 5;
}
message ReadRange {
bytes start = 1;
bytes end = 2;
int32 limit = 3;
bool reverse = 4;
}
message ReadRangeOutput {
repeated KvEntry values = 1;
}
message AtomicWrite {
repeated KvCheck kv_checks = 1;
repeated KvMutation kv_mutations = 2;
repeated Enqueue enqueues = 3;
}
message AtomicWriteOutput {
AtomicWriteStatus status = 1;
bytes versionstamp = 2;
string primary_if_write_disabled = 3;
}
message KvCheck {
bytes key = 1;
bytes versionstamp = 2; // 10-byte raw versionstamp
}
message KvMutation {
bytes key = 1;
KvValue value = 2;
KvMutationType mutation_type = 3;
}
message KvValue {
bytes data = 1;
KvValueEncoding encoding = 2;
}
message KvEntry {
bytes key = 1;
bytes value = 2;
KvValueEncoding encoding = 3;
bytes versionstamp = 4;
}
enum KvMutationType {
M_UNSPECIFIED = 0;
M_SET = 1;
M_CLEAR = 2;
M_SUM = 3;
M_MAX = 4;
M_MIN = 5;
}
enum KvValueEncoding {
VE_UNSPECIFIED = 0;
VE_V8 = 1;
VE_LE64 = 2;
VE_BYTES = 3;
}
enum AtomicWriteStatus {
AW_UNSPECIFIED = 0;
AW_SUCCESS = 1;
AW_CHECK_FAILURE = 2;
AW_UNSUPPORTED_WRITE = 3;
AW_USAGE_LIMIT_EXCEEDED = 4;
AW_WRITE_DISABLED = 5;
AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6;
}
message Enqueue {
bytes payload = 1;
int64 deadline_ms = 2;
repeated bytes kv_keys_if_undelivered = 3;
repeated uint32 backoff_schedule = 4;
}

7
ext/kv/proto/mod.rs Normal file
View File

@ -0,0 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Generated code, disable lints
#[allow(clippy::all, non_snake_case)]
pub mod datapath {
include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
}

558
ext/kv/remote.rs Normal file
View File

@ -0,0 +1,558 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use crate::proto::datapath as pb;
use crate::AtomicWrite;
use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::KvEntry;
use crate::MutationKind;
use crate::QueueMessageHandle;
use crate::ReadRange;
use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use anyhow::Context;
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
use deno_core::task::JoinHandle;
use deno_core::OpState;
use prost::Message;
use rand::Rng;
use serde::Deserialize;
use tokio::sync::watch;
use url::Url;
use uuid::Uuid;
pub trait RemoteDbHandlerPermissions {
fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
fn check_net_url(
&mut self,
url: &Url,
api_name: &str,
) -> Result<(), AnyError>;
}
pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
_p: std::marker::PhantomData<P>,
}
impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
pub fn new() -> Self {
Self { _p: PhantomData }
}
}
impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
fn default() -> Self {
Self::new()
}
}
#[derive(Deserialize)]
struct VersionInfo {
version: u64,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)]
struct DatabaseMetadata {
version: u64,
database_id: Uuid,
endpoints: Vec<EndpointInfo>,
token: String,
expires_at: DateTime<Utc>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EndpointInfo {
pub url: String,
// Using `String` instead of an enum, so that parsing doesn't
// break if more consistency levels are added.
pub consistency: String,
}
#[async_trait(?Send)]
impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
type DB = RemoteDb<P>;
async fn open(
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Self::DB, AnyError> {
const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN";
let Some(url) = path else {
return Err(type_error("Missing database url"));
};
let Ok(parsed_url) = Url::parse(&url) else {
return Err(type_error(format!("Invalid database url: {}", url)));
};
{
let mut state = state.borrow_mut();
let permissions = state.borrow_mut::<P>();
permissions.check_env(ENV_VAR_NAME)?;
permissions.check_net_url(&parsed_url, "Deno.openKv")?;
}
let access_token = std::env::var(ENV_VAR_NAME)
.map_err(anyhow::Error::from)
.with_context(|| {
"Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
})?;
let refresher = MetadataRefresher::new(url, access_token);
let db = RemoteDb {
client: reqwest::Client::new(),
refresher,
_p: PhantomData,
};
Ok(db)
}
}
pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
client: reqwest::Client,
refresher: MetadataRefresher,
_p: std::marker::PhantomData<P>,
}
pub struct DummyQueueMessageHandle {}
#[async_trait(?Send)]
impl QueueMessageHandle for DummyQueueMessageHandle {
async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
unimplemented!()
}
async fn finish(&self, _success: bool) -> Result<(), AnyError> {
unimplemented!()
}
}
#[async_trait(?Send)]
impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
type QMH = DummyQueueMessageHandle;
async fn snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
let req = pb::SnapshotRead {
ranges: requests
.into_iter()
.map(|r| pb::ReadRange {
start: r.start,
end: r.end,
limit: r.limit.get() as _,
reverse: r.reverse,
})
.collect(),
};
let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
&state,
&self.refresher,
&self.client,
"snapshot_read",
&req,
)
.await?;
if res.read_disabled {
return Err(type_error("Reads are disabled for this database."));
}
let out = res
.ranges
.into_iter()
.map(|r| {
Ok(ReadRangeOutput {
entries: r
.values
.into_iter()
.map(|e| {
let encoding = e.encoding();
Ok(KvEntry {
key: e.key,
value: decode_value(e.value, encoding)?,
versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
})
})
.collect::<Result<_, AnyError>>()?,
})
})
.collect::<Result<Vec<_>, AnyError>>()?;
Ok(out)
}
async fn atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
if !write.enqueues.is_empty() {
return Err(type_error("Enqueue operations are not supported yet."));
}
let req = pb::AtomicWrite {
kv_checks: write
.checks
.into_iter()
.map(|x| {
Ok(pb::KvCheck {
key: x.key,
versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
})
})
.collect::<anyhow::Result<_>>()?,
kv_mutations: write
.mutations
.into_iter()
.map(|x| encode_mutation(x.key, x.kind))
.collect(),
enqueues: vec![],
};
let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
&state,
&self.refresher,
&self.client,
"atomic_write",
&req,
)
.await?;
match res.status() {
pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
versionstamp: if res.versionstamp.is_empty() {
Default::default()
} else {
res.versionstamp[..].try_into()?
},
})),
pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
pb::AtomicWriteStatus::AwUnsupportedWrite => {
Err(type_error("Unsupported write"))
}
pb::AtomicWriteStatus::AwUsageLimitExceeded => {
Err(type_error("The database usage limit has been exceeded."))
}
pb::AtomicWriteStatus::AwWriteDisabled => {
// TODO: Auto retry
Err(type_error("Writes are disabled for this database."))
}
pb::AtomicWriteStatus::AwUnspecified => {
Err(type_error("Unspecified error"))
}
pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
Err(type_error("Queue backlog limit exceeded"))
}
}
}
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError> {
deno_core::futures::future::pending().await
}
fn close(&self) {}
}
fn decode_value(
value: Vec<u8>,
encoding: pb::KvValueEncoding,
) -> anyhow::Result<crate::Value> {
match encoding {
pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
<[u8; 8]>::try_from(&value[..])?,
))),
pb::KvValueEncoding::VeUnspecified => {
Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
}
}
}
fn encode_value(value: crate::Value) -> pb::KvValue {
match value {
crate::Value::V8(data) => pb::KvValue {
data,
encoding: pb::KvValueEncoding::VeV8 as _,
},
crate::Value::Bytes(data) => pb::KvValue {
data,
encoding: pb::KvValueEncoding::VeBytes as _,
},
crate::Value::U64(x) => pb::KvValue {
data: x.to_le_bytes().to_vec(),
encoding: pb::KvValueEncoding::VeLe64 as _,
},
}
}
fn encode_mutation(key: Vec<u8>, mutation: MutationKind) -> pb::KvMutation {
match mutation {
MutationKind::Set(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MSet as _,
},
MutationKind::Delete => pb::KvMutation {
key,
value: Some(encode_value(crate::Value::Bytes(vec![]))),
mutation_type: pb::KvMutationType::MClear as _,
},
MutationKind::Max(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MMax as _,
},
MutationKind::Min(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MMin as _,
},
MutationKind::Sum(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MSum as _,
},
}
}
#[derive(Clone)]
enum MetadataState {
Ready(Arc<DatabaseMetadata>),
Invalid(String),
Pending,
}
struct MetadataRefresher {
metadata_rx: watch::Receiver<MetadataState>,
handle: JoinHandle<()>,
}
impl MetadataRefresher {
pub fn new(url: String, access_token: String) -> Self {
let (tx, rx) = watch::channel(MetadataState::Pending);
let handle =
deno_core::task::spawn(metadata_refresh_task(url, access_token, tx));
Self {
handle,
metadata_rx: rx,
}
}
}
impl Drop for MetadataRefresher {
fn drop(&mut self) {
self.handle.abort();
}
}
async fn metadata_refresh_task(
metadata_url: String,
access_token: String,
tx: watch::Sender<MetadataState>,
) {
let client = reqwest::Client::new();
loop {
let mut attempt = 0u64;
let metadata = loop {
match fetch_metadata(&client, &metadata_url, &access_token).await {
Ok(Ok(x)) => break x,
Ok(Err(e)) => {
if tx.send(MetadataState::Invalid(e)).is_err() {
return;
}
}
Err(e) => {
log::error!("Failed to fetch database metadata: {}", e);
}
}
randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
attempt += 1;
};
let ms_until_expire = u64::try_from(
metadata
.expires_at
.timestamp_millis()
.saturating_sub(Utc::now().timestamp_millis()),
)
.unwrap_or_default();
// Refresh 10 minutes before expiry
// In case of buggy clocks, don't refresh more than once per minute
let interval = Duration::from_millis(ms_until_expire)
.saturating_sub(Duration::from_secs(600))
.max(Duration::from_secs(60));
if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
return;
}
tokio::time::sleep(interval).await;
}
}
async fn fetch_metadata(
client: &reqwest::Client,
metadata_url: &str,
access_token: &str,
) -> anyhow::Result<Result<DatabaseMetadata, String>> {
let res = client
.post(metadata_url)
.header("authorization", format!("Bearer {}", access_token))
.send()
.await?;
if !res.status().is_success() {
if res.status().is_client_error() {
return Ok(Err(format!(
"Client error while fetching metadata: {:?} {}",
res.status(),
res.text().await?
)));
} else {
anyhow::bail!(
"remote returned error: {:?} {}",
res.status(),
res.text().await?
);
}
}
let res = res.bytes().await?;
let version_info: VersionInfo = match serde_json::from_slice(&res) {
Ok(x) => x,
Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
};
if version_info.version > 1 {
return Ok(Err(format!(
"Unsupported metadata version: {}",
version_info.version
)));
}
Ok(
serde_json::from_slice(&res)
.map_err(|e| format!("Failed to decode metadata: {}", e)),
)
}
async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
let attempt = attempt.min(12);
let delay = base.as_millis() as u64 + (2 << attempt);
let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
}
async fn call_remote<
P: RemoteDbHandlerPermissions + 'static,
T: Message,
R: Message + Default,
>(
state: &RefCell<OpState>,
refresher: &MetadataRefresher,
client: &reqwest::Client,
method: &str,
req: &T,
) -> anyhow::Result<R> {
let mut attempt = 0u64;
let res = loop {
let mut metadata_rx = refresher.metadata_rx.clone();
let metadata = loop {
match &*metadata_rx.borrow() {
MetadataState::Pending => {}
MetadataState::Ready(x) => break x.clone(),
MetadataState::Invalid(e) => {
return Err(type_error(format!("Metadata error: {}", e)))
}
}
// `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
metadata_rx.changed().await.unwrap();
};
let Some(sc_endpoint) = metadata.endpoints.iter().find(|x| x.consistency == "strong") else {
return Err(type_error("No strong consistency endpoint is available for this database"));
};
let full_url = format!("{}/{}", sc_endpoint.url, method);
{
let parsed_url = Url::parse(&full_url)?;
let mut state = state.borrow_mut();
let permissions = state.borrow_mut::<P>();
permissions.check_net_url(&parsed_url, "Deno.Kv")?;
}
let res = client
.post(&full_url)
.header("x-transaction-domain-id", metadata.database_id.to_string())
.header("authorization", format!("Bearer {}", metadata.token))
.body(req.encode_to_vec())
.send()
.map_err(anyhow::Error::from)
.and_then(|x| async move {
if x.status().is_success() {
Ok(Ok(x.bytes().await?))
} else if x.status().is_client_error() {
Ok(Err((x.status(), x.text().await?)))
} else {
Err(anyhow::anyhow!(
"server error ({:?}): {}",
x.status(),
x.text().await?
))
}
})
.await;
match res {
Ok(x) => break x,
Err(e) => {
log::error!("retryable error in {}: {}", method, e);
randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
attempt += 1;
}
}
};
let res = match res {
Ok(x) => x,
Err((status, message)) => {
return Err(type_error(format!(
"client error in {} (status {:?}): {}",
method, status, message
)))
}
};
match R::decode(&*res) {
Ok(x) => Ok(x),
Err(e) => Err(type_error(format!(
"failed to decode response from {}: {}",
method, e
))),
}
}

View File

@ -724,6 +724,7 @@ impl Database for SqliteDb {
async fn snapshot_read(
&self,
_state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
@ -769,6 +770,7 @@ impl Database for SqliteDb {
async fn atomic_write(
&self,
_state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
@ -894,7 +896,10 @@ impl Database for SqliteDb {
Ok(commit_result)
}
async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError> {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })

View File

@ -1483,6 +1483,22 @@ impl deno_kv::sqlite::SqliteDbHandlerPermissions for PermissionsContainer {
}
}
impl deno_kv::remote::RemoteDbHandlerPermissions for PermissionsContainer {
#[inline(always)]
fn check_env(&mut self, var: &str) -> Result<(), AnyError> {
self.0.lock().env.check(var)
}
#[inline(always)]
fn check_net_url(
&mut self,
url: &url::Url,
api_name: &str,
) -> Result<(), AnyError> {
self.0.lock().net.check_url(url, Some(api_name))
}
}
fn unit_permission_from_flag_bools(
allow_flag: bool,
deny_flag: bool,

View File

@ -38,7 +38,7 @@ use deno_core::SourceMapGetter;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
use deno_kv::sqlite::SqliteDbHandler;
use deno_kv::dynamic::MultiBackendDbHandler;
use deno_node::SUPPORTED_BUILTIN_NODE_MODULES_WITH_PREFIX;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
@ -439,7 +439,7 @@ impl WebWorker {
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
SqliteDbHandler::<PermissionsContainer>::new(None),
MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(None),
unstable,
),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),

View File

@ -35,7 +35,7 @@ use deno_core::SourceMapGetter;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
use deno_kv::sqlite::SqliteDbHandler;
use deno_kv::dynamic::MultiBackendDbHandler;
use deno_node::SUPPORTED_BUILTIN_NODE_MODULES_WITH_PREFIX;
use deno_tls::RootCertStoreProvider;
use deno_web::BlobStore;
@ -334,7 +334,7 @@ impl MainWorker {
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
SqliteDbHandler::<PermissionsContainer>::new(
MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
options.origin_storage_dir.clone(),
),
unstable,

View File

@ -31,6 +31,7 @@ once_cell.workspace = true
os_pipe.workspace = true
parking_lot.workspace = true
pretty_assertions.workspace = true
prost.workspace = true
regex.workspace = true
reqwest.workspace = true
ring.workspace = true
@ -46,3 +47,6 @@ url.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["consoleapi", "synchapi", "handleapi", "namedpipeapi", "winbase", "winerror"] }
[build-dependencies]
prost-build.workspace = true

22
test_util/build.rs Normal file
View File

@ -0,0 +1,22 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::env;
use std::io;
use std::path::PathBuf;
fn main() -> io::Result<()> {
println!("cargo:rerun-if-changed=../ext/kv/proto");
let descriptor_path =
PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
prost_build::Config::new()
.file_descriptor_set_path(&descriptor_path)
.compile_well_known_types()
.compile_protos(
&["../ext/kv/proto/datapath.proto"],
&["../ext/kv/proto/"],
)?;
Ok(())
}

View File

@ -0,0 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Generated code, disable lints
#[allow(clippy::all, non_snake_case)]
pub mod datapath {
include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
}

View File

@ -15,9 +15,16 @@ use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use kv_remote::datapath::AtomicWrite;
use kv_remote::datapath::AtomicWriteOutput;
use kv_remote::datapath::AtomicWriteStatus;
use kv_remote::datapath::ReadRangeOutput;
use kv_remote::datapath::SnapshotRead;
use kv_remote::datapath::SnapshotReadOutput;
use npm::CUSTOM_NPM_PACKAGE_CACHE;
use once_cell::sync::Lazy;
use pretty_assertions::assert_eq;
use prost::Message;
use pty::Pty;
use regex::Regex;
use rustls::Certificate;
@ -57,6 +64,7 @@ pub mod assertions;
mod builders;
pub mod factory;
mod fs;
mod kv_remote;
pub mod lsp;
mod npm;
pub mod pty;
@ -72,6 +80,9 @@ const PORT: u16 = 4545;
const TEST_AUTH_TOKEN: &str = "abcdef123456789";
const TEST_BASIC_AUTH_USERNAME: &str = "testuser123";
const TEST_BASIC_AUTH_PASSWORD: &str = "testpassabc";
const KV_DATABASE_ID: &str = "11111111-1111-1111-1111-111111111111";
const KV_ACCESS_TOKEN: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
const KV_DATABASE_TOKEN: &str = "MOCKMOCKMOCKMOCKMOCKMOCKMOCK";
const REDIRECT_PORT: u16 = 4546;
const ANOTHER_REDIRECT_PORT: u16 = 4547;
const DOUBLE_REDIRECTS_PORT: u16 = 4548;
@ -1095,6 +1106,199 @@ async fn main_server(
let res = Response::new(Body::from(query.unwrap_or_default()));
Ok(res)
}
(&hyper::Method::POST, "/kv_remote_authorize") => {
if req
.headers()
.get("authorization")
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
!= format!("Bearer {}", KV_ACCESS_TOKEN)
{
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap(),
);
}
Ok(
Response::builder()
.header("content-type", "application/json")
.body(Body::from(
serde_json::json!({
"version": 1,
"databaseId": KV_DATABASE_ID,
"endpoints": [
{
"url": format!("http://localhost:{}/kv_blackhole", PORT),
"consistency": "strong",
}
],
"token": KV_DATABASE_TOKEN,
"expiresAt": "2099-01-01T00:00:00Z",
})
.to_string(),
))
.unwrap(),
)
}
(&hyper::Method::POST, "/kv_remote_authorize_invalid_format") => {
if req
.headers()
.get("authorization")
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
!= format!("Bearer {}", KV_ACCESS_TOKEN)
{
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap(),
);
}
Ok(
Response::builder()
.header("content-type", "application/json")
.body(Body::from(
serde_json::json!({
"version": 1,
"databaseId": KV_DATABASE_ID,
})
.to_string(),
))
.unwrap(),
)
}
(&hyper::Method::POST, "/kv_remote_authorize_invalid_version") => {
if req
.headers()
.get("authorization")
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
!= format!("Bearer {}", KV_ACCESS_TOKEN)
{
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap(),
);
}
Ok(
Response::builder()
.header("content-type", "application/json")
.body(Body::from(
serde_json::json!({
"version": 2,
"databaseId": KV_DATABASE_ID,
"endpoints": [
{
"url": format!("http://localhost:{}/kv_blackhole", PORT),
"consistency": "strong",
}
],
"token": KV_DATABASE_TOKEN,
"expiresAt": "2099-01-01T00:00:00Z",
})
.to_string(),
))
.unwrap(),
)
}
(&hyper::Method::POST, "/kv_blackhole/snapshot_read") => {
if req
.headers()
.get("authorization")
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
!= format!("Bearer {}", KV_DATABASE_TOKEN)
{
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap(),
);
}
let body = hyper::body::to_bytes(req.into_body())
.await
.unwrap_or_default();
let Ok(body): Result<SnapshotRead, _> = prost::Message::decode(&body[..]) else {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap());
};
if body.ranges.is_empty() {
return Ok(
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap(),
);
}
Ok(
Response::builder()
.body(Body::from(
SnapshotReadOutput {
ranges: body
.ranges
.iter()
.map(|_| ReadRangeOutput { values: vec![] })
.collect(),
read_disabled: false,
regions_if_read_disabled: vec![],
read_is_strongly_consistent: true,
primary_if_not_strongly_consistent: "".into(),
}
.encode_to_vec(),
))
.unwrap(),
)
}
(&hyper::Method::POST, "/kv_blackhole/atomic_write") => {
if req
.headers()
.get("authorization")
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
!= format!("Bearer {}", KV_DATABASE_TOKEN)
{
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap(),
);
}
let body = hyper::body::to_bytes(req.into_body())
.await
.unwrap_or_default();
let Ok(_body): Result<AtomicWrite, _> = prost::Message::decode(&body[..]) else {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap());
};
Ok(
Response::builder()
.body(Body::from(
AtomicWriteOutput {
status: AtomicWriteStatus::AwSuccess.into(),
versionstamp: vec![0u8; 10],
primary_if_write_disabled: "".into(),
}
.encode_to_vec(),
))
.unwrap(),
)
}
_ => {
let mut file_path = testdata_path().to_path_buf();
file_path.push(&req.uri().path()[1..]);