mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 10:59:27 +00:00
49413ad8ae
Refs: https://github.com/nodejs/node/pull/39519 PR-URL: https://github.com/nodejs/node/pull/46190 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
404 lines
9.2 KiB
JavaScript
404 lines
9.2 KiB
JavaScript
'use strict';
|
|
|
|
const common = require('../common');
|
|
const assert = require('assert');
|
|
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
|
|
const { ReadableStream, WritableStream } = require('stream/web');
|
|
const { Blob } = require('buffer');
|
|
|
|
{
|
|
const d = Duplex.from({
|
|
readable: new Readable({
|
|
read() {
|
|
this.push('asd');
|
|
this.push(null);
|
|
}
|
|
})
|
|
});
|
|
assert.strictEqual(d.readable, true);
|
|
assert.strictEqual(d.writable, false);
|
|
d.once('readable', common.mustCall(function() {
|
|
assert.strictEqual(d.read().toString(), 'asd');
|
|
}));
|
|
d.once('end', common.mustCall(function() {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
const d = Duplex.from(new Readable({
|
|
read() {
|
|
this.push('asd');
|
|
this.push(null);
|
|
}
|
|
}));
|
|
assert.strictEqual(d.readable, true);
|
|
assert.strictEqual(d.writable, false);
|
|
d.once('readable', common.mustCall(function() {
|
|
assert.strictEqual(d.read().toString(), 'asd');
|
|
}));
|
|
d.once('end', common.mustCall(function() {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
let ret = '';
|
|
const d = Duplex.from(new Writable({
|
|
write(chunk, encoding, callback) {
|
|
ret += chunk;
|
|
callback();
|
|
}
|
|
}));
|
|
assert.strictEqual(d.readable, false);
|
|
assert.strictEqual(d.writable, true);
|
|
d.end('asd');
|
|
d.on('finish', common.mustCall(function() {
|
|
assert.strictEqual(d.writable, false);
|
|
assert.strictEqual(ret, 'asd');
|
|
}));
|
|
}
|
|
|
|
{
|
|
let ret = '';
|
|
const d = Duplex.from({
|
|
writable: new Writable({
|
|
write(chunk, encoding, callback) {
|
|
ret += chunk;
|
|
callback();
|
|
}
|
|
})
|
|
});
|
|
assert.strictEqual(d.readable, false);
|
|
assert.strictEqual(d.writable, true);
|
|
d.end('asd');
|
|
d.on('finish', common.mustCall(function() {
|
|
assert.strictEqual(d.writable, false);
|
|
assert.strictEqual(ret, 'asd');
|
|
}));
|
|
}
|
|
|
|
{
|
|
let ret = '';
|
|
const d = Duplex.from({
|
|
readable: new Readable({
|
|
read() {
|
|
this.push('asd');
|
|
this.push(null);
|
|
}
|
|
}),
|
|
writable: new Writable({
|
|
write(chunk, encoding, callback) {
|
|
ret += chunk;
|
|
callback();
|
|
}
|
|
})
|
|
});
|
|
assert.strictEqual(d.readable, true);
|
|
assert.strictEqual(d.writable, true);
|
|
d.once('readable', common.mustCall(function() {
|
|
assert.strictEqual(d.read().toString(), 'asd');
|
|
}));
|
|
d.once('end', common.mustCall(function() {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
d.end('asd');
|
|
d.once('finish', common.mustCall(function() {
|
|
assert.strictEqual(d.writable, false);
|
|
assert.strictEqual(ret, 'asd');
|
|
}));
|
|
}
|
|
|
|
{
|
|
const d = Duplex.from(Promise.resolve('asd'));
|
|
assert.strictEqual(d.readable, true);
|
|
assert.strictEqual(d.writable, false);
|
|
d.once('readable', common.mustCall(function() {
|
|
assert.strictEqual(d.read().toString(), 'asd');
|
|
}));
|
|
d.once('end', common.mustCall(function() {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
// https://github.com/nodejs/node/issues/40497
|
|
pipeline(
|
|
['abc\ndef\nghi'],
|
|
Duplex.from(async function * (source) {
|
|
let rest = '';
|
|
for await (const chunk of source) {
|
|
const lines = (rest + chunk.toString()).split('\n');
|
|
rest = lines.pop();
|
|
for (const line of lines) {
|
|
yield line;
|
|
}
|
|
}
|
|
yield rest;
|
|
}),
|
|
async function * (source) { // eslint-disable-line require-yield
|
|
let ret = '';
|
|
for await (const x of source) {
|
|
ret += x;
|
|
}
|
|
assert.strictEqual(ret, 'abcdefghi');
|
|
},
|
|
common.mustSucceed(),
|
|
);
|
|
}
|
|
|
|
// Ensure that isDuplexNodeStream was called
|
|
{
|
|
const duplex = new Duplex();
|
|
assert.strictEqual(Duplex.from(duplex), duplex);
|
|
}
|
|
|
|
// Ensure that Duplex.from works for blobs
|
|
{
|
|
const blob = new Blob(['blob']);
|
|
const expectedByteLength = blob.size;
|
|
const duplex = Duplex.from(blob);
|
|
duplex.on('data', common.mustCall((arrayBuffer) => {
|
|
assert.strictEqual(arrayBuffer.byteLength, expectedByteLength);
|
|
}));
|
|
}
|
|
|
|
// Ensure that given a promise rejection it emits an error
|
|
{
|
|
const myErrorMessage = 'myCustomError';
|
|
Duplex.from(Promise.reject(myErrorMessage))
|
|
.on('error', common.mustCall((error) => {
|
|
assert.strictEqual(error, myErrorMessage);
|
|
}));
|
|
}
|
|
|
|
// Ensure that given a promise rejection on an async function it emits an error
|
|
{
|
|
const myErrorMessage = 'myCustomError';
|
|
async function asyncFn() {
|
|
return Promise.reject(myErrorMessage);
|
|
}
|
|
|
|
Duplex.from(asyncFn)
|
|
.on('error', common.mustCall((error) => {
|
|
assert.strictEqual(error, myErrorMessage);
|
|
}));
|
|
}
|
|
|
|
// Ensure that Duplex.from throws an Invalid return value when function is void
|
|
{
|
|
assert.throws(() => Duplex.from(() => {}), {
|
|
code: 'ERR_INVALID_RETURN_VALUE',
|
|
});
|
|
}
|
|
|
|
// Ensure data if a sub object has a readable stream it's duplexified
|
|
{
|
|
const msg = Buffer.from('hello');
|
|
const duplex = Duplex.from({
|
|
readable: Readable({
|
|
read() {
|
|
this.push(msg);
|
|
this.push(null);
|
|
}
|
|
})
|
|
}).on('data', common.mustCall((data) => {
|
|
assert.strictEqual(data, msg);
|
|
}));
|
|
|
|
assert.strictEqual(duplex.writable, false);
|
|
}
|
|
|
|
// Ensure data if a sub object has a writable stream it's duplexified
|
|
{
|
|
const msg = Buffer.from('hello');
|
|
const duplex = Duplex.from({
|
|
writable: Writable({
|
|
write: common.mustCall((data) => {
|
|
assert.strictEqual(data, msg);
|
|
})
|
|
})
|
|
});
|
|
|
|
duplex.write(msg);
|
|
assert.strictEqual(duplex.readable, false);
|
|
}
|
|
|
|
// Ensure data if a sub object has a writable and readable stream it's duplexified
|
|
{
|
|
const msg = Buffer.from('hello');
|
|
|
|
const duplex = Duplex.from({
|
|
readable: Readable({
|
|
read() {
|
|
this.push(msg);
|
|
this.push(null);
|
|
}
|
|
}),
|
|
writable: Writable({
|
|
write: common.mustCall((data) => {
|
|
assert.strictEqual(data, msg);
|
|
})
|
|
})
|
|
});
|
|
|
|
duplex.pipe(duplex)
|
|
.on('data', common.mustCall((data) => {
|
|
assert.strictEqual(data, msg);
|
|
assert.strictEqual(duplex.readable, true);
|
|
assert.strictEqual(duplex.writable, true);
|
|
}))
|
|
.on('end', common.mustCall());
|
|
}
|
|
|
|
// Ensure that given readable stream that throws an error it calls destroy
|
|
{
|
|
const myErrorMessage = 'error!';
|
|
const duplex = Duplex.from(Readable({
|
|
read() {
|
|
throw new Error(myErrorMessage);
|
|
}
|
|
}));
|
|
duplex.on('error', common.mustCall((msg) => {
|
|
assert.strictEqual(msg.message, myErrorMessage);
|
|
}));
|
|
}
|
|
|
|
// Ensure that given writable stream that throws an error it calls destroy
|
|
{
|
|
const myErrorMessage = 'error!';
|
|
const duplex = Duplex.from(Writable({
|
|
write(chunk, enc, cb) {
|
|
cb(myErrorMessage);
|
|
}
|
|
}));
|
|
|
|
duplex.on('error', common.mustCall((msg) => {
|
|
assert.strictEqual(msg, myErrorMessage);
|
|
}));
|
|
|
|
duplex.write('test');
|
|
}
|
|
|
|
{
|
|
const through = new PassThrough({ objectMode: true });
|
|
|
|
let res = '';
|
|
const d = Readable.from(['foo', 'bar'], { objectMode: true })
|
|
.pipe(Duplex.from({
|
|
writable: through,
|
|
readable: through
|
|
}));
|
|
|
|
d.on('data', (data) => {
|
|
d.pause();
|
|
setImmediate(() => {
|
|
d.resume();
|
|
});
|
|
res += data;
|
|
}).on('end', common.mustCall(() => {
|
|
assert.strictEqual(res, 'foobar');
|
|
})).on('close', common.mustCall());
|
|
}
|
|
|
|
function makeATestReadableStream(value) {
|
|
return new ReadableStream({
|
|
start(controller) {
|
|
controller.enqueue(value);
|
|
controller.close();
|
|
}
|
|
});
|
|
}
|
|
|
|
function makeATestWritableStream(writeFunc) {
|
|
return new WritableStream({
|
|
write(chunk) {
|
|
writeFunc(chunk);
|
|
}
|
|
});
|
|
}
|
|
|
|
{
|
|
const d = Duplex.from({
|
|
readable: makeATestReadableStream('foo'),
|
|
});
|
|
assert.strictEqual(d.readable, true);
|
|
assert.strictEqual(d.writable, false);
|
|
|
|
d.on('data', common.mustCall((data) => {
|
|
assert.strictEqual(data.toString(), 'foo');
|
|
}));
|
|
|
|
d.on('end', common.mustCall(() => {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
const d = Duplex.from(makeATestReadableStream('foo'));
|
|
|
|
assert.strictEqual(d.readable, true);
|
|
assert.strictEqual(d.writable, false);
|
|
|
|
d.on('data', common.mustCall((data) => {
|
|
assert.strictEqual(data.toString(), 'foo');
|
|
}));
|
|
|
|
d.on('end', common.mustCall(() => {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
let ret = '';
|
|
const d = Duplex.from({
|
|
writable: makeATestWritableStream((chunk) => ret += chunk),
|
|
});
|
|
|
|
assert.strictEqual(d.readable, false);
|
|
assert.strictEqual(d.writable, true);
|
|
|
|
d.end('foo');
|
|
d.on('finish', common.mustCall(() => {
|
|
assert.strictEqual(ret, 'foo');
|
|
assert.strictEqual(d.writable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
let ret = '';
|
|
const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk));
|
|
|
|
assert.strictEqual(d.readable, false);
|
|
assert.strictEqual(d.writable, true);
|
|
|
|
d.end('foo');
|
|
d.on('finish', common.mustCall(() => {
|
|
assert.strictEqual(ret, 'foo');
|
|
assert.strictEqual(d.writable, false);
|
|
}));
|
|
}
|
|
|
|
{
|
|
let ret = '';
|
|
const d = Duplex.from({
|
|
readable: makeATestReadableStream('foo'),
|
|
writable: makeATestWritableStream((chunk) => ret += chunk),
|
|
});
|
|
|
|
d.end('bar');
|
|
|
|
d.on('data', common.mustCall((data) => {
|
|
assert.strictEqual(data.toString(), 'foo');
|
|
}));
|
|
|
|
d.on('end', common.mustCall(() => {
|
|
assert.strictEqual(d.readable, false);
|
|
}));
|
|
|
|
d.on('finish', common.mustCall(() => {
|
|
assert.strictEqual(ret, 'bar');
|
|
assert.strictEqual(d.writable, false);
|
|
}));
|
|
}
|