fix(async/pooledMap): return ordered result (#2239)

closes #2236
This commit is contained in:
Thomas Cruveilher 2022-05-19 07:05:49 +02:00 committed by GitHub
parent 0a542ef2c0
commit 215139c170
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 5 deletions

View File

@ -1,5 +1,7 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
export const ERROR_WHILE_MAPPING_MESSAGE = "Threw while mapping.";
/** /**
* pooledMap transforms values from an (async) iterable into another async * pooledMap transforms values from an (async) iterable into another async
* iterable. The transforms are done concurrently, with a max concurrency * iterable. The transforms are done concurrently, with a max concurrency
@ -25,7 +27,17 @@ export function pooledMap<T, R>(
p: Promise<R>, p: Promise<R>,
controller: TransformStreamDefaultController<R>, controller: TransformStreamDefaultController<R>,
) { ) {
controller.enqueue(await p); try {
const s = await p;
controller.enqueue(s);
} catch (e) {
if (
e instanceof AggregateError &&
e.message == ERROR_WHILE_MAPPING_MESSAGE
) {
controller.error(e as unknown);
}
}
}, },
}); });
// Start processing items from the iterator // Start processing items from the iterator
@ -40,7 +52,7 @@ export function pooledMap<T, R>(
// fail the race, taking us to the catch block where all currently // fail the race, taking us to the catch block where all currently
// executing jobs are allowed to finish and all rejections among them // executing jobs are allowed to finish and all rejections among them
// can be reported together. // can be reported together.
p.then((v) => writer.write(Promise.resolve(v))).catch(() => {}); writer.write(p);
const e: Promise<unknown> = p.then(() => const e: Promise<unknown> = p.then(() =>
executing.splice(executing.indexOf(e), 1) executing.splice(executing.indexOf(e), 1)
); );
@ -60,7 +72,7 @@ export function pooledMap<T, R>(
} }
} }
writer.write(Promise.reject( writer.write(Promise.reject(
new AggregateError(errors, "Threw while mapping."), new AggregateError(errors, ERROR_WHILE_MAPPING_MESSAGE),
)).catch(() => {}); )).catch(() => {});
} }
})(); })();

View File

@ -1,6 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
import { delay } from "./delay.ts"; import { delay } from "./delay.ts";
import { pooledMap } from "./pool.ts"; import { ERROR_WHILE_MAPPING_MESSAGE, pooledMap } from "./pool.ts";
import { import {
assert, assert,
assertEquals, assertEquals,
@ -23,7 +23,7 @@ Deno.test("[async] pooledMap", async function () {
assert(diff < 3000); assert(diff < 3000);
}); });
Deno.test("[async] pooledMap errors", async function () { Deno.test("[async] pooledMap errors", async () => {
async function mapNumber(n: number): Promise<number> { async function mapNumber(n: number): Promise<number> {
if (n <= 2) { if (n <= 2) {
throw new Error(`Bad number: ${n}`); throw new Error(`Bad number: ${n}`);
@ -38,9 +38,31 @@ Deno.test("[async] pooledMap errors", async function () {
} }
}, (error: Error) => { }, (error: Error) => {
assert(error instanceof AggregateError); assert(error instanceof AggregateError);
assert(error.message === ERROR_WHILE_MAPPING_MESSAGE);
assertEquals(error.errors.length, 2); assertEquals(error.errors.length, 2);
assertStringIncludes(error.errors[0].stack, "Error: Bad number: 1"); assertStringIncludes(error.errors[0].stack, "Error: Bad number: 1");
assertStringIncludes(error.errors[1].stack, "Error: Bad number: 2"); assertStringIncludes(error.errors[1].stack, "Error: Bad number: 2");
}); });
assertEquals(mappedNumbers, [3]); assertEquals(mappedNumbers, [3]);
}); });
Deno.test("pooledMap returns ordered items", async () => {
function getRandomInt(min: number, max: number): number {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min) + min); //The maximum is exclusive and the minimum is inclusive
}
const results = pooledMap(
2,
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
(i) =>
new Promise((r) => setTimeout(() => r(i), getRandomInt(5, 20) * 100)),
);
const returned = [];
for await (const value of results) {
returned.push(value);
}
assertEquals(returned, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});