stream: use Array for Readable buffer

PR-URL: https://github.com/nodejs/node/pull/50341
Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
Robert Nagy 2023-10-25 17:35:05 +02:00 committed by GitHub
parent c60c11aae1
commit 991fd9c255
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 116 additions and 400 deletions

View File

@ -15,7 +15,7 @@ function main({ n }) {
bench.start();
for (let k = 0; k < n; ++k) {
for (let i = 0; i < 1e4; ++i)
for (let i = 0; i < 1e3; ++i)
s.push(b);
while (s.read(128));
}

View File

@ -15,7 +15,7 @@ function main({ n }) {
bench.start();
for (let k = 0; k < n; ++k) {
for (let i = 0; i < 1e4; ++i)
for (let i = 0; i < 1e3; ++i)
s.push(b);
while (s.read());
}

View File

@ -1,181 +0,0 @@
'use strict';
const {
StringPrototypeSlice,
SymbolIterator,
TypedArrayPrototypeSet,
Uint8Array,
} = primordials;
const { Buffer } = require('buffer');
const { inspect } = require('internal/util/inspect');
module.exports = class BufferList {
constructor() {
this.head = null;
this.tail = null;
this.length = 0;
}
push(v) {
const entry = { data: v, next: null };
if (this.length > 0)
this.tail.next = entry;
else
this.head = entry;
this.tail = entry;
++this.length;
}
unshift(v) {
const entry = { data: v, next: this.head };
if (this.length === 0)
this.tail = entry;
this.head = entry;
++this.length;
}
shift() {
if (this.length === 0)
return;
const ret = this.head.data;
if (this.length === 1)
this.head = this.tail = null;
else
this.head = this.head.next;
--this.length;
return ret;
}
clear() {
this.head = this.tail = null;
this.length = 0;
}
join(s) {
if (this.length === 0)
return '';
let p = this.head;
let ret = '' + p.data;
while ((p = p.next) !== null)
ret += s + p.data;
return ret;
}
concat(n) {
if (this.length === 0)
return Buffer.alloc(0);
const ret = Buffer.allocUnsafe(n >>> 0);
let p = this.head;
let i = 0;
while (p) {
TypedArrayPrototypeSet(ret, p.data, i);
i += p.data.length;
p = p.next;
}
return ret;
}
// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
const data = this.head.data;
if (n < data.length) {
// `slice` is the same for buffers and strings.
const slice = data.slice(0, n);
this.head.data = data.slice(n);
return slice;
}
if (n === data.length) {
// First chunk is a perfect match.
return this.shift();
}
// Result spans more than one buffer.
return hasStrings ? this._getString(n) : this._getBuffer(n);
}
first() {
return this.head.data;
}
*[SymbolIterator]() {
for (let p = this.head; p; p = p.next) {
yield p.data;
}
}
// Consumes a specified amount of characters from the buffered data.
_getString(n) {
let ret = '';
let p = this.head;
let c = 0;
do {
const str = p.data;
if (n > str.length) {
ret += str;
n -= str.length;
} else {
if (n === str.length) {
ret += str;
++c;
if (p.next)
this.head = p.next;
else
this.head = this.tail = null;
} else {
ret += StringPrototypeSlice(str, 0, n);
this.head = p;
p.data = StringPrototypeSlice(str, n);
}
break;
}
++c;
} while ((p = p.next) !== null);
this.length -= c;
return ret;
}
// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
const ret = Buffer.allocUnsafe(n);
const retLen = n;
let p = this.head;
let c = 0;
do {
const buf = p.data;
if (n > buf.length) {
TypedArrayPrototypeSet(ret, buf, retLen - n);
n -= buf.length;
} else {
if (n === buf.length) {
TypedArrayPrototypeSet(ret, buf, retLen - n);
++c;
if (p.next)
this.head = p.next;
else
this.head = this.tail = null;
} else {
TypedArrayPrototypeSet(ret,
new Uint8Array(buf.buffer, buf.byteOffset, n),
retLen - n);
this.head = p;
p.data = buf.slice(n);
}
break;
}
++c;
} while ((p = p.next) !== null);
this.length -= c;
return ret;
}
// Make sure the linked list only shows the minimal necessary information.
[inspect.custom](_, options) {
return inspect(this, {
...options,
// Only inspect one level.
depth: 0,
// It should not recurse.
customInspect: false,
});
}
};

View File

@ -31,9 +31,11 @@ const {
ObjectSetPrototypeOf,
Promise,
SafeSet,
Symbol,
SymbolAsyncDispose,
SymbolAsyncIterator,
Symbol,
SymbolSpecies,
TypedArrayPrototypeSet,
} = primordials;
module.exports = Readable;
@ -51,7 +53,6 @@ const eos = require('internal/streams/end-of-stream');
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
const BufferList = require('internal/streams/buffer_list');
const destroyImpl = require('internal/streams/destroy');
const {
getHighWaterMark,
@ -74,6 +75,7 @@ const { validateObject } = require('internal/validators');
const { kOnConstructed } = require('internal/streams/utils');
const kState = Symbol('kState');
const FastBuffer = Buffer[SymbolSpecies];
const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
@ -270,10 +272,8 @@ function ReadableState(options, stream, isDuplex) {
getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
getDefaultHighWaterMark(false);
// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift().
this.buffer = new BufferList();
this.buffer = [];
this.bufferIndex = 0;
this.length = 0;
this.pipes = [];
@ -542,10 +542,15 @@ function addChunk(stream, state, chunk, addToFront) {
} else {
// Update the buffer info.
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
if (addToFront) {
if (state.bufferIndex > 0) {
state.buffer[--state.bufferIndex] = chunk;
} else {
state.buffer.unshift(chunk); // Slow path
}
} else {
state.buffer.push(chunk);
}
if ((state[kState] & kNeedReadable) !== 0)
emitReadable(stream);
@ -560,21 +565,24 @@ Readable.prototype.isPaused = function() {
// Backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
const decoder = new StringDecoder(enc);
this._readableState.decoder = decoder;
// If setEncoding(null), decoder.encoding equals utf8.
this._readableState.encoding = this._readableState.decoder.encoding;
const state = this._readableState;
const decoder = new StringDecoder(enc);
state.decoder = decoder;
// If setEncoding(null), decoder.encoding equals utf8.
state.encoding = state.decoder.encoding;
const buffer = this._readableState.buffer;
// Iterate over current buffer to convert already stored Buffers:
let content = '';
for (const data of buffer) {
for (const data of state.buffer.slice(state.bufferIndex)) {
content += decoder.write(data);
}
buffer.clear();
state.buffer.length = 0;
state.bufferIndex = 0;
if (content !== '')
buffer.push(content);
this._readableState.length = content.length;
state.buffer.push(content);
state.length = content.length;
return this;
};
@ -607,7 +615,7 @@ function howMuchToRead(n, state) {
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer.first().length;
return state.buffer[state.bufferIndex].length;
return state.length;
}
if (n <= state.length)
@ -1545,21 +1553,96 @@ function fromList(n, state) {
if (state.length === 0)
return null;
let idx = state.bufferIndex;
let ret;
if (state.objectMode)
ret = state.buffer.shift();
else if (!n || n >= state.length) {
const buf = state.buffer;
const len = buf.length;
if ((state[kState] & kObjectMode) !== 0) {
ret = buf[idx];
buf[idx++] = null;
} else if (!n || n >= state.length) {
// Read it all, truncate the list.
if (state.decoder)
ret = state.buffer.join('');
else if (state.buffer.length === 1)
ret = state.buffer.first();
else
ret = state.buffer.concat(state.length);
state.buffer.clear();
if ((state[kState] & kDecoder) !== 0) {
ret = '';
while (idx < len) {
ret += buf[idx];
buf[idx++] = null;
}
} else if (len - idx === 0) {
ret = Buffer.alloc(0);
} else if (len - idx === 1) {
ret = buf[idx];
buf[idx++] = null;
} else {
ret = Buffer.allocUnsafe(state.length);
let i = 0;
while (idx < len) {
TypedArrayPrototypeSet(ret, buf[idx], i);
i += buf[idx].length;
buf[idx++] = null;
}
}
} else if (n < buf[idx].length) {
// `slice` is the same for buffers and strings.
ret = buf[idx].slice(0, n);
buf[idx] = buf[idx].slice(n);
} else if (n === buf[idx].length) {
// First chunk is a perfect match.
ret = buf[idx];
buf[idx++] = null;
} else if ((state[kState] & kDecoder) !== 0) {
ret = '';
while (idx < len) {
const str = buf[idx];
if (n > str.length) {
ret += str;
n -= str.length;
buf[idx++] = null;
} else {
if (n === buf.length) {
ret += str;
buf[idx++] = null;
} else {
ret += str.slice(0, n);
buf[idx] = str.slice(n);
}
break;
}
}
} else {
// read part of list.
ret = state.buffer.consume(n, state.decoder);
ret = Buffer.allocUnsafe(n);
const retLen = n;
while (idx < len) {
const data = buf[idx];
if (n > data.length) {
TypedArrayPrototypeSet(ret, data, retLen - n);
n -= data.length;
buf[idx++] = null;
} else {
if (n === data.length) {
TypedArrayPrototypeSet(ret, data, retLen - n);
buf[idx++] = null;
} else {
TypedArrayPrototypeSet(ret, new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n);
}
break;
}
}
}
if (idx === len) {
state.buffer.length = 0;
state.bufferIndex = 0;
} else if (idx > 1024) {
state.buffer.splice(0, idx);
state.bufferIndex = 0;
} else {
state.bufferIndex = idx;
}
return ret;

View File

@ -107,7 +107,6 @@ if (common.isMainThread) {
'NativeModule internal/perf/event_loop_utilization',
'NativeModule internal/process/worker_thread_only',
'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',

View File

@ -1,84 +0,0 @@
// Flags: --expose-internals
'use strict';
require('../common');
const assert = require('assert');
const BufferList = require('internal/streams/buffer_list');
// Test empty buffer list.
const emptyList = new BufferList();
emptyList.shift();
assert.deepStrictEqual(emptyList, new BufferList());
assert.strictEqual(emptyList.join(','), '');
assert.deepStrictEqual(emptyList.concat(0), Buffer.alloc(0));
const buf = Buffer.from('foo');
function testIterator(list, count) {
// test iterator
let len = 0;
// eslint-disable-next-line no-unused-vars
for (const x of list) {
len++;
}
assert.strictEqual(len, count);
}
// Test buffer list with one element.
const list = new BufferList();
testIterator(list, 0);
list.push(buf);
testIterator(list, 1);
for (const x of list) {
assert.strictEqual(x, buf);
}
const copy = list.concat(3);
testIterator(copy, 3);
assert.notStrictEqual(copy, buf);
assert.deepStrictEqual(copy, buf);
assert.strictEqual(list.join(','), 'foo');
const shifted = list.shift();
testIterator(list, 0);
assert.strictEqual(shifted, buf);
assert.deepStrictEqual(list, new BufferList());
{
const list = new BufferList();
list.push('foo');
list.push('bar');
list.push('foo');
list.push('bar');
assert.strictEqual(list.consume(6, true), 'foobar');
assert.strictEqual(list.consume(6, true), 'foobar');
}
{
const list = new BufferList();
list.push('foo');
list.push('bar');
assert.strictEqual(list.consume(5, true), 'fooba');
}
{
const list = new BufferList();
list.push(buf);
list.push(buf);
list.push(buf);
list.push(buf);
assert.strictEqual(list.consume(6).toString(), 'foofoo');
assert.strictEqual(list.consume(6).toString(), 'foofoo');
}
{
const list = new BufferList();
list.push(buf);
list.push(buf);
assert.strictEqual(list.consume(5).toString(), 'foofo');
}

View File

@ -1,101 +0,0 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// Flags: --expose-internals
'use strict';
require('../common');
const assert = require('assert');
const fromList = require('stream').Readable._fromList;
const BufferList = require('internal/streams/buffer_list');
const util = require('util');
function bufferListFromArray(arr) {
const bl = new BufferList();
for (let i = 0; i < arr.length; ++i)
bl.push(arr[i]);
return bl;
}
{
// Verify behavior with buffers
let list = [ Buffer.from('foog'),
Buffer.from('bark'),
Buffer.from('bazy'),
Buffer.from('kuel') ];
list = bufferListFromArray(list);
assert.strictEqual(
util.inspect([ list ], { compact: false }),
`[
BufferList {
head: [Object],
tail: [Object],
length: 4
}
]`);
// Read more than the first element.
let ret = fromList(6, { buffer: list, length: 16 });
assert.strictEqual(ret.toString(), 'foogba');
// Read exactly the first element.
ret = fromList(2, { buffer: list, length: 10 });
assert.strictEqual(ret.toString(), 'rk');
// Read less than the first element.
ret = fromList(2, { buffer: list, length: 8 });
assert.strictEqual(ret.toString(), 'ba');
// Read more than we have.
ret = fromList(100, { buffer: list, length: 6 });
assert.strictEqual(ret.toString(), 'zykuel');
// all consumed.
assert.deepStrictEqual(list, new BufferList());
}
{
// Verify behavior with strings
let list = [ 'foog',
'bark',
'bazy',
'kuel' ];
list = bufferListFromArray(list);
// Read more than the first element.
let ret = fromList(6, { buffer: list, length: 16, decoder: true });
assert.strictEqual(ret, 'foogba');
// Read exactly the first element.
ret = fromList(2, { buffer: list, length: 10, decoder: true });
assert.strictEqual(ret, 'rk');
// Read less than the first element.
ret = fromList(2, { buffer: list, length: 8, decoder: true });
assert.strictEqual(ret, 'ba');
// Read more than we have.
ret = fromList(100, { buffer: list, length: 6, decoder: true });
assert.strictEqual(ret, 'zykuel');
// all consumed.
assert.deepStrictEqual(list, new BufferList());
}