Upgrade Ava to v4

This commit is contained in:
Henry Mercer 2022-02-01 18:01:11 +00:00
parent 9a40cc5274
commit ce89f1b611
1153 changed files with 27264 additions and 95308 deletions

View file

@ -1,20 +1,30 @@
const {EventEmitter, on} = require('events');
const v8 = require('v8');
const {workerData, parentPort} = require('worker_threads');
const pkg = require('../../package.json');
import {EventEmitter, on} from 'node:events';
import process from 'node:process';
import {workerData, parentPort, threadId} from 'node:worker_threads';
// Used to forward messages received over the `parentPort`. Every subscription
// adds a listener, so do not enforce any maximums.
import pkg from '../pkg.cjs';
// Used to forward messages received over the `parentPort` and any direct ports
// to test workers. Every subscription adds a listener, so do not enforce any
// maximums.
const events = new EventEmitter().setMaxListeners(0);
const emitMessage = message => {
// Wait for a turn of the event loop, to allow new subscriptions to be
// set up in response to the previous message.
setImmediate(() => events.emit('message', message));
};
// Map of active test workers, used in receiveMessages() to get a reference to
// the TestWorker instance, and relevant release functions.
const activeTestWorkers = new Map();
const internalMessagePort = Symbol('Internal MessagePort');
class TestWorker {
constructor(id, file) {
constructor(id, file, port) {
this.id = id;
this.file = file;
this[internalMessagePort] = port;
}
teardown(fn) {
@ -47,10 +57,10 @@ class TestWorker {
}
class ReceivedMessage {
constructor(testWorker, id, serializedData) {
constructor(testWorker, id, data) {
this.testWorker = testWorker;
this.id = id;
this.data = v8.deserialize(new Uint8Array(serializedData));
this.data = data;
}
reply(data) {
@ -98,7 +108,7 @@ async function * receiveMessages(fromTestWorker, replyTo) {
let received = messageCache.get(message);
if (received === undefined) {
received = new ReceivedMessage(active.instance, message.messageId, message.serializedData);
received = new ReceivedMessage(active.instance, message.messageId, message.data);
messageCache.set(message, received);
}
@ -107,59 +117,47 @@ async function * receiveMessages(fromTestWorker, replyTo) {
}
let messageCounter = 0;
const messageIdPrefix = `${workerData.id}/message`;
const messageIdPrefix = `${threadId}/message`;
const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`;
function publishMessage(testWorker, data, replyTo) {
const id = nextMessageId();
parentPort.postMessage({
testWorker[internalMessagePort].postMessage({
type: 'message',
messageId: id,
testWorkerId: testWorker.id,
serializedData: [...v8.serialize(data)],
replyTo
data,
replyTo,
});
return {
id,
async * replies() {
yield * receiveMessages(testWorker, id);
}
},
};
}
function broadcastMessage(data) {
const id = nextMessageId();
parentPort.postMessage({
type: 'broadcast',
messageId: id,
serializedData: [...v8.serialize(data)]
});
for (const trackedWorker of activeTestWorkers.values()) {
trackedWorker.instance[internalMessagePort].postMessage({
type: 'message',
messageId: id,
data,
});
}
return {
id,
async * replies() {
yield * receiveMessages(undefined, id);
}
},
};
}
async function loadFactory() {
try {
const mod = require(workerData.filename);
if (typeof mod === 'function') {
return mod;
}
return mod.default;
} catch (error) {
if (error && (error.code === 'ERR_REQUIRE_ESM' || (error.code === 'MODULE_NOT_FOUND' && workerData.filename.startsWith('file://')))) {
const {default: factory} = await import(workerData.filename); // eslint-disable-line node/no-unsupported-features/es-syntax
return factory;
}
throw error;
}
const {default: factory} = await import(workerData.filename); // eslint-disable-line node/no-unsupported-features/es-syntax
return factory;
}
let signalAvailable = () => {
@ -175,7 +173,7 @@ loadFactory(workerData.filename).then(factory => {
factory({
negotiateProtocol(supported) {
if (!supported.includes('experimental')) {
if (!supported.includes('ava-4')) {
fatal = new Error(`This version of AVA (${pkg.version}) is not compatible with shared worker plugin at ${workerData.filename}`);
throw fatal;
}
@ -184,12 +182,13 @@ loadFactory(workerData.filename).then(factory => {
parentPort.on('message', async message => {
if (message.type === 'register-test-worker') {
const {id, file} = message;
const instance = new TestWorker(id, file);
const {id, file, port} = message;
const instance = new TestWorker(id, file, port);
activeTestWorkers.set(id, {instance, teardownFns: new Set()});
produceTestWorker(instance);
port.on('message', message => emitMessage({testWorkerId: id, ...message}));
}
if (message.type === 'deregister-test-worker') {
@ -205,18 +204,16 @@ loadFactory(workerData.filename).then(factory => {
parentPort.postMessage({
type: 'deregistered-test-worker',
id
id,
});
}
// Wait for a turn of the event loop, to allow new subscriptions to be
// set up in response to the previous message.
setImmediate(() => events.emit('message', message));
emitMessage(message);
}
});
return {
initialData: workerData.initialData,
protocol: 'experimental',
protocol: 'ava-4',
ready() {
signalAvailable();
@ -235,9 +232,9 @@ loadFactory(workerData.filename).then(factory => {
for await (const [worker] of on(events, 'testWorker')) {
yield worker;
}
}
},
};
}
},
});
}).catch(error => {
if (fatal === undefined) {

View file

@ -1,14 +1,11 @@
const events = require('events');
const serializeError = require('../serialize-error');
import events from 'node:events';
import {pathToFileURL} from 'node:url';
import {Worker} from 'node:worker_threads';
let Worker;
try {
({Worker} = require('worker_threads'));
} catch {}
import serializeError from '../serialize-error.js';
const LOADER = require.resolve('./shared-worker-loader');
const LOADER = new URL('shared-worker-loader.js', import.meta.url);
let sharedWorkerCounter = 0;
const launchedWorkers = new Map();
const waitForAvailable = async worker => {
@ -19,30 +16,28 @@ const waitForAvailable = async worker => {
}
};
function launchWorker({filename, initialData}) {
function launchWorker(filename, initialData) {
if (launchedWorkers.has(filename)) {
return launchedWorkers.get(filename);
}
const id = `shared-worker/${++sharedWorkerCounter}`;
const worker = new Worker(LOADER, {
// Ensure the worker crashes for unhandled rejections, rather than allowing undefined behavior.
execArgv: ['--unhandled-rejections=strict'],
workerData: {
filename,
id,
initialData
}
initialData,
},
});
worker.setMaxListeners(0);
const launched = {
statePromises: {
available: waitForAvailable(worker),
error: events.once(worker, 'error').then(([error]) => error) // eslint-disable-line promise/prefer-await-to-then
error: events.once(worker, 'error').then(([error]) => error),
},
exited: false,
worker
worker,
};
launchedWorkers.set(filename, launched);
@ -53,7 +48,7 @@ function launchWorker({filename, initialData}) {
return launched;
}
async function observeWorkerProcess(fork, runStatus) {
export async function observeWorkerProcess(fork, runStatus) {
let registrationCount = 0;
let signalDeregistered;
const deregistered = new Promise(resolve => {
@ -66,26 +61,11 @@ async function observeWorkerProcess(fork, runStatus) {
}
});
fork.onConnectSharedWorker(async channel => {
const launched = launchWorker(channel);
const handleChannelMessage = ({messageId, replyTo, serializedData}) => {
launched.worker.postMessage({
type: 'message',
testWorkerId: fork.forkId,
messageId,
replyTo,
serializedData
});
};
fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => {
const launched = launchWorker(filename, initialData);
const handleWorkerMessage = async message => {
if (message.type === 'broadcast' || (message.type === 'message' && message.testWorkerId === fork.forkId)) {
const {messageId, replyTo, serializedData} = message;
channel.forwardMessageToFork({messageId, replyTo, serializedData});
}
if (message.type === 'deregistered-test-worker' && message.id === fork.forkId) {
if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) {
launched.worker.off('message', handleWorkerMessage);
registrationCount--;
@ -95,35 +75,35 @@ async function observeWorkerProcess(fork, runStatus) {
}
};
launched.statePromises.error.then(error => { // eslint-disable-line promise/prefer-await-to-then
launched.statePromises.error.then(error => {
signalDeregistered();
launched.worker.off('message', handleWorkerMessage);
runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)});
channel.signalError();
signalError();
});
try {
await launched.statePromises.available;
registrationCount++;
port.postMessage({type: 'ready'});
launched.worker.postMessage({
type: 'register-test-worker',
id: fork.forkId,
file: fork.file
});
id: fork.threadId,
file: pathToFileURL(fork.file).toString(),
port,
}, [port]);
fork.promise.finally(() => {
launched.worker.postMessage({
type: 'deregister-test-worker',
id: fork.forkId
id: fork.threadId,
});
channel.off('message', handleChannelMessage);
});
launched.worker.on('message', handleWorkerMessage);
channel.on('message', handleChannelMessage);
channel.signalReady();
} catch {
return;
} finally {
@ -136,5 +116,3 @@ async function observeWorkerProcess(fork, runStatus) {
return deregistered;
}
exports.observeWorkerProcess = observeWorkerProcess;