node/test/parallel/test-stream-readable-async-iterators.js
Node.js GitHub Bot 9c3c66b499
tools: update eslint to 8.18.0
PR-URL: https://github.com/nodejs/node/pull/43479
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Mohammed Keyvanzadeh <mohammadkeyvanzade94@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
2022-06-19 19:39:41 +01:00

882 lines
19 KiB
JavaScript

'use strict';
const common = require('../common');
const {
Stream,
Readable,
Transform,
PassThrough,
pipeline
} = require('stream');
const assert = require('assert');
const http = require('http');
const fs = require('fs');
async function tests() {
{
// v1 stream
const stream = new Stream();
stream.destroy = common.mustCall();
process.nextTick(() => {
stream.emit('data', 'hello');
stream.emit('data', 'world');
stream.emit('end');
});
let res = '';
stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
for await (const d of stream) {
res += d;
}
assert.strictEqual(res, 'helloworld');
}
{
// v1 stream error
const stream = new Stream();
stream.close = common.mustCall();
process.nextTick(() => {
stream.emit('data', 0);
stream.emit('data', 1);
stream.emit('error', new Error('asd'));
});
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
await iter.next()
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
}
{
// Non standard stream cleanup
const readable = new Readable({ autoDestroy: false, read() {} });
readable.push('asd');
readable.push('asd');
readable.destroy = null;
readable.close = common.mustCall(() => {
readable.emit('close');
});
await (async () => {
for await (const d of readable) {
return;
}
})();
}
{
const readable = new Readable({ objectMode: true, read() {} });
readable.push(0);
readable.push(1);
readable.push(null);
const iter = readable[Symbol.asyncIterator]();
assert.strictEqual((await iter.next()).value, 0);
for await (const d of iter) {
assert.strictEqual(d, 1);
}
}
{
console.log('read without for..await');
const max = 5;
const readable = new Readable({
objectMode: true,
read() {}
});
const iter = readable[Symbol.asyncIterator]();
assert.strictEqual(iter.stream, readable);
const values = [];
for (let i = 0; i < max; i++) {
values.push(iter.next());
}
Promise.all(values).then(common.mustCall((values) => {
values.forEach(common.mustCall(
(item, i) => assert.strictEqual(item.value, 'hello-' + i), 5));
}));
readable.push('hello-0');
readable.push('hello-1');
readable.push('hello-2');
readable.push('hello-3');
readable.push('hello-4');
readable.push(null);
const last = await iter.next();
assert.strictEqual(last.done, true);
}
{
console.log('read without for..await deferred');
const readable = new Readable({
objectMode: true,
read() {}
});
const iter = readable[Symbol.asyncIterator]();
assert.strictEqual(iter.stream, readable);
let values = [];
for (let i = 0; i < 3; i++) {
values.push(iter.next());
}
readable.push('hello-0');
readable.push('hello-1');
readable.push('hello-2');
let k = 0;
const results1 = await Promise.all(values);
results1.forEach(common.mustCall(
(item) => assert.strictEqual(item.value, 'hello-' + k++), 3));
values = [];
for (let i = 0; i < 2; i++) {
values.push(iter.next());
}
readable.push('hello-3');
readable.push('hello-4');
readable.push(null);
const results2 = await Promise.all(values);
results2.forEach(common.mustCall(
(item) => assert.strictEqual(item.value, 'hello-' + k++), 2));
const last = await iter.next();
assert.strictEqual(last.done, true);
}
{
console.log('read without for..await with errors');
const max = 3;
const readable = new Readable({
objectMode: true,
read() {}
});
const iter = readable[Symbol.asyncIterator]();
assert.strictEqual(iter.stream, readable);
const values = [];
const errors = [];
let i;
for (i = 0; i < max; i++) {
values.push(iter.next());
}
for (i = 0; i < 2; i++) {
errors.push(iter.next());
}
readable.push('hello-0');
readable.push('hello-1');
readable.push('hello-2');
const resolved = await Promise.all(values);
resolved.forEach(common.mustCall(
(item, i) => assert.strictEqual(item.value, 'hello-' + i), max));
errors.slice(0, 1).forEach((promise) => {
promise.catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
});
errors.slice(1).forEach((promise) => {
promise.then(common.mustCall(({ done, value }) => {
assert.strictEqual(done, true);
assert.strictEqual(value, undefined);
}));
});
readable.destroy(new Error('kaboom'));
}
{
console.log('call next() after error');
const readable = new Readable({
read() {}
});
const iterator = readable[Symbol.asyncIterator]();
const err = new Error('kaboom');
readable.destroy(err);
await assert.rejects(iterator.next.bind(iterator), err);
}
{
console.log('read object mode');
const max = 42;
let readed = 0;
let received = 0;
const readable = new Readable({
objectMode: true,
read() {
this.push('hello');
if (++readed === max) {
this.push(null);
}
}
});
for await (const k of readable) {
received++;
assert.strictEqual(k, 'hello');
}
assert.strictEqual(readed, received);
}
{
console.log('destroy sync');
const readable = new Readable({
objectMode: true,
read() {
this.destroy(new Error('kaboom from read'));
}
});
let err;
try {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const k of readable) { }
} catch (e) {
err = e;
}
assert.strictEqual(err.message, 'kaboom from read');
}
{
console.log('destroy async');
const readable = new Readable({
objectMode: true,
read() {
if (!this.pushed) {
this.push('hello');
this.pushed = true;
setImmediate(() => {
this.destroy(new Error('kaboom'));
});
}
}
});
let received = 0;
let err = null;
try {
// eslint-disable-next-line no-unused-vars
for await (const k of readable) {
received++;
}
} catch (e) {
err = e;
}
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(received, 1);
}
{
console.log('destroyed by throw');
const readable = new Readable({
objectMode: true,
read() {
this.push('hello');
}
});
let err = null;
try {
for await (const k of readable) {
assert.strictEqual(k, 'hello');
throw new Error('kaboom');
}
} catch (e) {
err = e;
}
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(readable.destroyed, true);
}
{
console.log('destroyed sync after push');
const readable = new Readable({
objectMode: true,
read() {
this.push('hello');
this.destroy(new Error('kaboom'));
}
});
let received = 0;
let err = null;
try {
for await (const k of readable) {
assert.strictEqual(k, 'hello');
received++;
}
} catch (e) {
err = e;
}
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(received, 1);
}
{
console.log('destroyed will not deadlock');
const readable = new Readable();
readable.destroy();
process.nextTick(async () => {
readable.on('close', common.mustNotCall());
let received = 0;
let err = null;
try {
for await (const k of readable) {
// Just make linting pass. This should never run.
assert.strictEqual(k, 'hello');
received++;
}
} catch (_err) {
err = _err;
}
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(received, 0);
});
}
{
console.log('push async');
const max = 42;
let readed = 0;
let received = 0;
const readable = new Readable({
objectMode: true,
read() {
setImmediate(() => {
this.push('hello');
if (++readed === max) {
this.push(null);
}
});
}
});
for await (const k of readable) {
received++;
assert.strictEqual(k, 'hello');
}
assert.strictEqual(readed, received);
}
{
console.log('push binary async');
const max = 42;
let readed = 0;
const readable = new Readable({
read() {
setImmediate(() => {
this.push('hello');
if (++readed === max) {
this.push(null);
}
});
}
});
let expected = '';
readable.setEncoding('utf8');
readable.pause();
readable.on('data', (chunk) => {
expected += chunk;
});
let data = '';
for await (const k of readable) {
data += k;
}
assert.strictEqual(data, expected);
}
{
console.log('.next() on destroyed stream');
const readable = new Readable({
read() {
// no-op
}
});
readable.destroy();
const it = await readable[Symbol.asyncIterator]();
const next = it.next();
next
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
}
{
console.log('.next() on pipelined stream');
const readable = new Readable({
read() {
// no-op
}
});
const passthrough = new PassThrough();
const err = new Error('kaboom');
pipeline(readable, passthrough, common.mustCall((e) => {
assert.strictEqual(e, err);
}));
readable.destroy(err);
await assert.rejects(
readable[Symbol.asyncIterator]().next(),
(e) => {
assert.strictEqual(e, err);
return true;
}
);
}
{
console.log('iterating on an ended stream completes');
const r = new Readable({
objectMode: true,
read() {
this.push('asdf');
this.push('hehe');
this.push(null);
}
});
// eslint-disable-next-line no-unused-vars, no-empty
for await (const a of r) { }
// eslint-disable-next-line no-unused-vars, no-empty
for await (const b of r) { }
}
{
console.log('destroy mid-stream errors');
const r = new Readable({
objectMode: true,
read() {
this.push('asdf');
this.push('hehe');
}
});
let err = null;
try {
// eslint-disable-next-line no-unused-vars
for await (const a of r) {
r.destroy(null);
}
} catch (_err) {
err = _err;
}
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}
{
console.log('readable side of a transform stream pushes null');
const transform = new Transform({
objectMode: true,
transform: (chunk, enc, cb) => { cb(null, chunk); }
});
transform.push(0);
transform.push(1);
process.nextTick(() => {
transform.push(null);
});
const mustReach = [ common.mustCall(), common.mustCall() ];
const iter = transform[Symbol.asyncIterator]();
assert.strictEqual((await iter.next()).value, 0);
for await (const d of iter) {
assert.strictEqual(d, 1);
mustReach[0]();
}
mustReach[1]();
}
{
console.log('all next promises must be resolved on end');
const r = new Readable({
objectMode: true,
read() {
}
});
const b = r[Symbol.asyncIterator]();
const c = b.next();
const d = b.next();
r.push(null);
assert.deepStrictEqual(await c, { done: true, value: undefined });
assert.deepStrictEqual(await d, { done: true, value: undefined });
}
{
console.log('all next promises must be rejected on destroy');
const r = new Readable({
objectMode: true,
read() {
}
});
const b = r[Symbol.asyncIterator]();
const c = b.next();
const d = b.next();
r.destroy();
c
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
assert.deepStrictEqual(await d, { done: true, value: undefined });
}
{
console.log('all next promises must be resolved on destroy with error');
const r = new Readable({
objectMode: true,
read() {
}
});
const b = r[Symbol.asyncIterator]();
const c = b.next();
const d = b.next();
const err = new Error('kaboom');
r.destroy(err);
await Promise.all([(async () => {
let e;
try {
await c;
} catch (_e) {
e = _e;
}
assert.strictEqual(e, err);
})(), (async () => {
let e;
let x;
try {
x = await d;
} catch (_e) {
e = _e;
}
assert.strictEqual(e, undefined);
assert.strictEqual(x.done, true);
assert.strictEqual(x.value, undefined);
})()]);
}
{
const _err = new Error('asd');
const r = new Readable({
read() {
},
destroy(err, callback) {
setTimeout(() => callback(_err), 1);
}
});
r.destroy();
const it = r[Symbol.asyncIterator]();
it.next().catch(common.mustCall((err) => {
assert.strictEqual(err, _err);
}));
}
{
// Don't destroy if no auto destroy.
// https://github.com/nodejs/node/issues/35116
const r = new Readable({
autoDestroy: false,
read() {
this.push('asd');
this.push(null);
}
});
for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty
assert.strictEqual(r.destroyed, false);
}
{
// Destroy if no auto destroy and premature break.
// https://github.com/nodejs/node/pull/35122/files#r485678318
const r = new Readable({
autoDestroy: false,
read() {
this.push('asd');
}
});
for await (const chunk of r) { // eslint-disable-line no-unused-vars
break;
}
assert.strictEqual(r.destroyed, true);
}
{
// Don't destroy before 'end'.
const r = new Readable({
read() {
this.push('asd');
this.push(null);
}
}).on('end', () => {
assert.strictEqual(r.destroyed, false);
});
for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty
assert.strictEqual(r.destroyed, true);
}
}
{
// AsyncIterator return should end even when destroy
// does not implement the callback API.
const r = new Readable({
objectMode: true,
read() {
}
});
const originalDestroy = r.destroy;
r.destroy = (err) => {
originalDestroy.call(r, err);
};
const it = r[Symbol.asyncIterator]();
const p = it.return();
r.push(null);
p.then(common.mustCall());
}
{
// AsyncIterator return should not error with
// premature close.
const r = new Readable({
objectMode: true,
read() {
}
});
const originalDestroy = r.destroy;
r.destroy = (err) => {
originalDestroy.call(r, err);
};
const it = r[Symbol.asyncIterator]();
const p = it.return();
r.emit('close');
p.then(common.mustCall()).catch(common.mustNotCall());
}
{
// AsyncIterator should not finish correctly if destroyed.
const r = new Readable({
objectMode: true,
read() {
}
});
r.destroy();
r.on('close', () => {
const it = r[Symbol.asyncIterator]();
const next = it.next();
next
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
});
}
{
// AsyncIterator should throw if prematurely closed
// before end has been emitted.
(async function() {
const readable = fs.createReadStream(__filename);
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) {
readable.close();
}
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}
assert.ok(readable.destroyed);
})().then(common.mustCall());
}
// AsyncIterator non-destroying iterator
{
function createReadable() {
return Readable.from((async function* () {
await Promise.resolve();
yield 5;
await Promise.resolve();
yield 7;
await Promise.resolve();
})());
}
// Check explicit destroying on return
(async function() {
const readable = createReadable();
for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
assert.strictEqual(chunk, 5);
break;
}
assert.ok(readable.destroyed);
})().then(common.mustCall());
// Check explicit non-destroy with return true
(async function() {
const readable = createReadable();
const opts = { destroyOnReturn: false };
for await (const chunk of readable.iterator(opts)) {
assert.strictEqual(chunk, 5);
break;
}
assert.ok(!readable.destroyed);
for await (const chunk of readable.iterator(opts)) {
assert.strictEqual(chunk, 7);
}
assert.ok(readable.destroyed);
})().then(common.mustCall());
// Check non-object options.
{
const readable = createReadable();
assert.throws(
() => readable.iterator(42),
{
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError',
message: 'The "options" argument must be of type object. Received ' +
'type number (42)',
}
);
}
// Check for dangling listeners
(async function() {
const readable = createReadable();
const opts = { destroyOnReturn: false };
while (readable.readable) {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) {
break;
}
}
assert.deepStrictEqual(readable.eventNames(), []);
})().then(common.mustCall());
}
{
let _req;
const server = http.createServer((request, response) => {
response.statusCode = 404;
response.write('never ends');
});
server.listen(() => {
_req = http.request(`http://localhost:${server.address().port}`)
.on('response', common.mustCall(async (res) => {
setTimeout(() => {
_req.destroy(new Error('something happened'));
}, 100);
res.on('error', common.mustCall());
let _err;
try {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const chunk of res) { }
} catch (err) {
_err = err;
}
assert.strictEqual(_err.code, 'ECONNRESET');
server.close();
}))
.on('error', common.mustCall())
.end();
});
}
{
async function getParsedBody(request) {
let body = '';
for await (const data of request) {
body += data;
}
try {
return JSON.parse(body);
} catch {
return {};
}
}
const str = JSON.stringify({ asd: true });
const server = http.createServer(async (request, response) => {
const body = await getParsedBody(request);
response.statusCode = 200;
assert.strictEqual(JSON.stringify(body), str);
response.end(JSON.stringify(body));
}).listen(() => {
http
.request({
method: 'POST',
hostname: 'localhost',
port: server.address().port,
})
.end(str)
.on('response', async (res) => {
let body = '';
for await (const chunk of res) {
body += chunk;
}
assert.strictEqual(body, str);
server.close();
});
});
}
// To avoid missing some tests if a promise does not resolve
tests().then(common.mustCall());