From c43f5782aa78570215134d2172f57794af98a8e1 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Thu, 15 Dec 2022 15:35:54 -0800 Subject: [PATCH 01/23] fix(daemon): Log to state, not cache --- packages/daemon/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 279537ee2f..803ef6b0c6 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -56,7 +56,7 @@ export const start = async (locator = defaultLocator) => { }); await cachePathCreated; - const logPath = path.join(locator.cachePath, 'endo.log'); + const logPath = path.join(locator.statePath, 'endo.log'); await statePathCreated; const output = fs.openSync(logPath, 'a'); From 4171cb92d56e213afc5ae2bf79de3d66197725d0 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:49:36 -0800 Subject: [PATCH 02/23] refactor(daemon): Debugging comments --- packages/daemon/src/connection.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/daemon/src/connection.js b/packages/daemon/src/connection.js index 880af8efe8..1e2f4ac66c 100644 --- a/packages/daemon/src/connection.js +++ b/packages/daemon/src/connection.js @@ -44,6 +44,7 @@ const makeCapTPWithStreams = (name, writer, reader, cancelled, bootstrap) => { /** @param {any} message */ const messageToBytes = message => { const text = JSON.stringify(message); + // console.log('->', text); const bytes = textEncoder.encode(text); return bytes; }; @@ -51,6 +52,7 @@ const messageToBytes = message => { /** @param {Uint8Array} bytes */ const bytesToMessage = bytes => { const text = textDecoder.decode(bytes); + // console.log('<-', text); const message = JSON.parse(text); return message; }; From 0eca5b15d879b5db61048b6a75a31d640c77c2a6 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Thu, 8 Dec 2022 16:38:54 -0800 Subject: [PATCH 03/23] refactor(daemon): Endo daemon bootstrap is the private facet --- packages/cli/src/endo.js | 3 +-- packages/daemon/README.md | 6 ++---- packages/daemon/index.js | 2 +- packages/daemon/src/daemon.js | 19 ++++--------------- packages/daemon/test/test-endo.js | 2 +- 5 files changed, 9 insertions(+), 23 deletions(-) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index 09ffbf316f..dd5347527e 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -130,8 +130,7 @@ export const main = async rawArgs => { cancelled, ); const bootstrap = getBootstrap(); - const publicFacet = E.get(bootstrap).publicFacet; - await E(publicFacet).ping(); + await E(bootstrap).ping(); process.stderr.write('ok\n'); }); diff --git a/packages/daemon/README.md b/packages/daemon/README.md index 14086a50c4..c382312183 100644 --- a/packages/daemon/README.md +++ b/packages/daemon/README.md @@ -10,7 +10,5 @@ with the user, and manages per-user storage and compute access. Over that channel, the daemon communicates in CapTP over netstring message envelopes. -The bootstrap object has public and private facets. -All access over the public facet are mediate on the private facet. -So, for example, a request for a handle would be posed on the public facet, and -the user would be obliged to answer on the private facet. +The bootstrap provides the user agent API from which one can derive facets for +other agents. diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 803ef6b0c6..5714e7a7e1 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -41,7 +41,7 @@ export const terminate = async (locator = defaultLocator) => { cancelled, ); const bootstrap = getBootstrap(); - await E(E.get(bootstrap).privateFacet).terminate(); + await E(bootstrap).terminate(); // @ts-expect-error zero-argument promise resolve cancel(); await closed; diff --git a/packages/daemon/src/daemon.js b/packages/daemon/src/daemon.js index 3e1f2fae76..3492364208 100644 --- a/packages/daemon/src/daemon.js +++ b/packages/daemon/src/daemon.js @@ -97,14 +97,11 @@ const makeWorker = async locator => { /** * @param {import('../types.js').Locator} locator */ -const makeEndoFacets = locator => { - const publicFacet = Far('EndoPublicFacet', { +const makeEndoBootstrap = locator => + Far('EndoPublicFacet', { async ping() { return 'pong'; }, - }); - - const privateFacet = Far('EndoPrivateFacet', { async terminate() { console.error('Endo daemon received terminate request'); cancel(Error('Terminate')); @@ -114,14 +111,6 @@ const makeEndoFacets = locator => { }, }); - const endoFacet = harden({ - publicFacet, - privateFacet, - }); - - return endoFacet; -}; - export const main = async () => { console.error(`Endo daemon starting on PID ${process.pid}`); process.once('exit', () => { @@ -142,7 +131,7 @@ export const main = async () => { const locator = { sockPath, statePath, cachePath }; - const endoFacets = makeEndoFacets(locator); + const endoBootstrap = makeEndoBootstrap(locator); const statePathP = fs.promises.mkdir(statePath, { recursive: true }); const cachePathP = fs.promises.mkdir(cachePath, { recursive: true }); @@ -181,7 +170,7 @@ export const main = async () => { conn, conn, cancelled, - endoFacets, + endoBootstrap, ); closed.catch(sinkError); }); diff --git a/packages/daemon/test/test-endo.js b/packages/daemon/test/test-endo.js index 4fc5da9c7c..e0abfd1475 100644 --- a/packages/daemon/test/test-endo.js +++ b/packages/daemon/test/test-endo.js @@ -48,7 +48,7 @@ test.serial('lifecycle', async t => { cancelled, ); const bootstrap = getBootstrap(); - const worker = await E(E.get(bootstrap).privateFacet).makeWorker(); + const worker = await E(bootstrap).makeWorker(); await E(E.get(worker).actions).terminate(); cancel(Error('Cancelled')); await closed; From 3c5d25b39d4b6f4804657ffc211b78d1641048b7 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:42:07 -0800 Subject: [PATCH 04/23] refactor(daemon): Tautological restart condition --- packages/daemon/index.js | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 5714e7a7e1..56db6ca93e 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -108,12 +108,8 @@ export const clean = async (locator = defaultLocator) => { }; export const restart = async (locator = defaultLocator) => { - // TODO: Refactor this guaranteed-true condition - // @ts-expect-error - if (restart) { - await terminate(locator).catch(() => {}); - await clean(locator); - } + await terminate(locator).catch(() => {}); + await clean(locator); return start(locator); }; From 3947ec90d8d766384e56aeed3415ce897c212283 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 2 Jan 2023 17:24:49 -0800 Subject: [PATCH 05/23] feat(daemon): Thread ephemeral state path --- packages/daemon/index.js | 33 ++++++++++++++++++++++++++----- packages/daemon/src/daemon.js | 9 +++++---- packages/daemon/src/worker.js | 6 +++--- packages/daemon/test/test-endo.js | 1 + packages/daemon/types.d.ts | 1 + 5 files changed, 38 insertions(+), 12 deletions(-) diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 56db6ca93e..64e06d6a1a 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -9,7 +9,12 @@ import os from 'os'; import { E } from '@endo/eventual-send'; import { makePromiseKit } from '@endo/promise-kit'; -import { whereEndoState, whereEndoSock, whereEndoCache } from '@endo/where'; +import { + whereEndoState, + whereEndoEphemeralState, + whereEndoSock, + whereEndoCache, +} from '@endo/where'; import { makeEndoClient } from './src/client.js'; // Reexports: @@ -25,6 +30,11 @@ const info = { const defaultLocator = { statePath: whereEndoState(process.platform, process.env, info), + ephemeralStatePath: whereEndoEphemeralState( + process.platform, + process.env, + info, + ), sockPath: whereEndoSock(process.platform, process.env, info), cachePath: whereEndoCache(process.platform, process.env, info), }; @@ -63,7 +73,12 @@ export const start = async (locator = defaultLocator) => { const child = popen.fork( endoDaemonPath, - [locator.sockPath, locator.statePath, locator.cachePath], + [ + locator.sockPath, + locator.statePath, + locator.ephemeralStatePath, + locator.cachePath, + ], { detached: true, stdio: ['ignore', output, output, 'ipc'], @@ -119,11 +134,19 @@ export const stop = async (locator = defaultLocator) => { export const reset = async (locator = defaultLocator) => { const cleanedUp = clean(locator); - const restated = fs.promises + const removedState = fs.promises .rm(locator.statePath, { recursive: true }) .catch(enoentOk); - const cachedOut = fs.promises + const removedEphemeralState = fs.promises + .rm(locator.ephemeralStatePath, { recursive: true }) + .catch(enoentOk); + const removedCache = fs.promises .rm(locator.cachePath, { recursive: true }) .catch(enoentOk); - await Promise.all([cleanedUp, restated, cachedOut]); + await Promise.all([ + cleanedUp, + removedState, + removedEphemeralState, + removedCache, + ]); }; diff --git a/packages/daemon/src/daemon.js b/packages/daemon/src/daemon.js index 3492364208..f4dc4f8433 100644 --- a/packages/daemon/src/daemon.js +++ b/packages/daemon/src/daemon.js @@ -118,8 +118,8 @@ export const main = async () => { }); if (process.argv.length < 5) { - throw Error( - `daemon.js requires arguments [sockPath] [statePath] [cachePath], got ${process.argv.join( + throw new Error( + `daemon.js requires arguments [sockPath] [statePath] [ephemeralStatePath] [cachePath], got ${process.argv.join( ', ', )}`, ); @@ -127,9 +127,10 @@ export const main = async () => { const sockPath = process.argv[2]; const statePath = process.argv[3]; - const cachePath = process.argv[4]; + const ephemeralStatePath = process.argv[4]; + const cachePath = process.argv[5]; - const locator = { sockPath, statePath, cachePath }; + const locator = { sockPath, statePath, ephemeralStatePath, cachePath }; const endoBootstrap = makeEndoBootstrap(locator); diff --git a/packages/daemon/src/worker.js b/packages/daemon/src/worker.js index c1c5c70764..46a44a04fd 100644 --- a/packages/daemon/src/worker.js +++ b/packages/daemon/src/worker.js @@ -35,9 +35,9 @@ export const main = async () => { console.error('Endo worker exiting'); }); - if (process.argv.length < 2) { - throw Error( - `worker.js requires arguments [uuid] [workerCachePath], got ${process.argv.join( + if (process.argv.length < 4) { + throw new Error( + `worker.js requires arguments uuid, workerStatePath, workerEphemeralStatePath, workerCachePath, got ${process.argv.join( ', ', )}`, ); diff --git a/packages/daemon/test/test-endo.js b/packages/daemon/test/test-endo.js index e0abfd1475..9dfaebf381 100644 --- a/packages/daemon/test/test-endo.js +++ b/packages/daemon/test/test-endo.js @@ -26,6 +26,7 @@ const dirname = url.fileURLToPath(new URL('..', import.meta.url)).toString(); const locator = { statePath: path.join(dirname, 'tmp/state'), + ephemeralStatePath: path.join(dirname, 'tmp/run'), cachePath: path.join(dirname, 'tmp/cache'), sockPath: process.platform === 'win32' diff --git a/packages/daemon/types.d.ts b/packages/daemon/types.d.ts index 83dedd592a..fc672f9250 100644 --- a/packages/daemon/types.d.ts +++ b/packages/daemon/types.d.ts @@ -1,5 +1,6 @@ type Locator = { statePath: string; + ephemeralStatePath: string; cachePath: string; sockPath: string; }; From 74da700877fbfca5a2396a26596ec04ca069123c Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:42:47 -0800 Subject: [PATCH 06/23] feat(daemon): Clean up socket on halt --- packages/daemon/index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 64e06d6a1a..86158126bd 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -129,7 +129,8 @@ export const restart = async (locator = defaultLocator) => { }; export const stop = async (locator = defaultLocator) => { - return terminate(locator).catch(() => {}); + await terminate(locator).catch(() => {}); + await clean(locator); }; export const reset = async (locator = defaultLocator) => { From d0d01c7f627f9d74b6b2243f839d7089202d07c6 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:44:32 -0800 Subject: [PATCH 07/23] feat(daemon): Reset restarts if currently running --- packages/daemon/index.js | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 86158126bd..439ef93084 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -51,10 +51,12 @@ export const terminate = async (locator = defaultLocator) => { cancelled, ); const bootstrap = getBootstrap(); - await E(bootstrap).terminate(); + await E(bootstrap) + .terminate() + .catch(() => {}); // @ts-expect-error zero-argument promise resolve cancel(); - await closed; + await closed.catch(() => {}); }; export const start = async (locator = defaultLocator) => { @@ -134,6 +136,13 @@ export const stop = async (locator = defaultLocator) => { }; export const reset = async (locator = defaultLocator) => { + // Attempt to restore to a running state if currently running, based on + // whether we manage to terminate it. + const needsRestart = await terminate(locator).then( + () => true, + () => false, + ); + const cleanedUp = clean(locator); const removedState = fs.promises .rm(locator.statePath, { recursive: true }) @@ -150,4 +159,8 @@ export const reset = async (locator = defaultLocator) => { removedEphemeralState, removedCache, ]); + + if (needsRestart) { + await start(locator); + } }; From 158e4b669d3b5825ee33ef6b30a6b97f8f56add2 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:50:40 -0800 Subject: [PATCH 08/23] chore(daemon): Ignore test output artifacts --- packages/daemon/.gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 packages/daemon/.gitignore diff --git a/packages/daemon/.gitignore b/packages/daemon/.gitignore new file mode 100644 index 0000000000..ceeb05b410 --- /dev/null +++ b/packages/daemon/.gitignore @@ -0,0 +1 @@ +/tmp From d9db807f6303dba2f9052dd830ee24693df87cdf Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 2 Jan 2023 17:30:59 -0800 Subject: [PATCH 09/23] feat(cli): Thread ephemeral state path --- packages/cli/src/endo.js | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index dd5347527e..ea130c38be 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -22,7 +22,12 @@ import { reset, makeEndoClient, } from '@endo/daemon'; -import { whereEndoState, whereEndoSock, whereEndoCache } from '@endo/where'; +import { + whereEndoState, + whereEndoEphemeralState, + whereEndoSock, + whereEndoCache, +} from '@endo/where'; import { mapLocation, hashLocation, @@ -52,6 +57,11 @@ const info = { }; const statePath = whereEndoState(process.platform, process.env, info); +const ephemeralStatePath = whereEndoEphemeralState( + process.platform, + process.env, + info, +); const sockPath = whereEndoSock(process.platform, process.env, info); const cachePath = whereEndoCache(process.platform, process.env, info); const logPath = path.join(statePath, 'endo.log'); @@ -77,6 +87,10 @@ export const main = async rawArgs => { process.stdout.write(`${statePath}\n`); }); + where.command('run').action(async _cmd => { + process.stdout.write(`${ephemeralStatePath}\n`); + }); + where.command('sock').action(async _cmd => { process.stdout.write(`${sockPath}\n`); }); From 2a9445a2c87639ba62e63c785194eb136a0f3171 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:53:08 -0800 Subject: [PATCH 10/23] feat(cli): Log follow option --- packages/cli/src/endo.js | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index ea130c38be..8b0258c26d 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -123,19 +123,21 @@ export const main = async rawArgs => { await reset(); }); - const log = program.command('log').action(async cmd => { - await new Promise((resolve, reject) => { - const args = cmd.opts().follow ? ['-f'] : []; - const child = spawn('tail', [...args, logPath], { - stdio: ['inherit', 'inherit', 'inherit'], + program + .command('log') + .option('-f, --follow', 'follow the tail of the log') + .action(async cmd => { + // TODO rerun follower command after reset + await new Promise((resolve, reject) => { + const args = cmd.opts().follow ? ['-f'] : []; + const child = spawn('tail', [...args, logPath], { + stdio: ['inherit', 'inherit', 'inherit'], + }); + child.on('error', reject); + child.on('exit', resolve); + cancelled.catch(() => child.kill()); }); - child.on('error', reject); - child.on('exit', resolve); - cancelled.catch(() => child.kill()); }); - }); - - log.option('-f, --follow', 'follow the tail of the log'); program.command('ping').action(async _cmd => { const { getBootstrap } = await makeEndoClient( From 3c7e7024114ed45fb716c4ad9b5fd9005775c8ff Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:41:13 -0800 Subject: [PATCH 11/23] feat(daemon): RefReader and ReaderRef --- packages/daemon/index.js | 2 ++ packages/daemon/package.json | 1 + packages/daemon/src/reader-ref.js | 49 +++++++++++++++++++++++++++++++ packages/daemon/src/ref-reader.js | 31 +++++++++++++++++++ 4 files changed, 83 insertions(+) create mode 100644 packages/daemon/src/reader-ref.js create mode 100644 packages/daemon/src/ref-reader.js diff --git a/packages/daemon/index.js b/packages/daemon/index.js index 439ef93084..b1108fe0e9 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -19,6 +19,8 @@ import { makeEndoClient } from './src/client.js'; // Reexports: export { makeEndoClient } from './src/client.js'; +export { makeRefReader, makeRefIterator } from './src/ref-reader.js'; +export { makeReaderRef, makeIteratorRef } from './src/reader-ref.js'; const { username, homedir } = os.userInfo(); const temp = os.tmpdir(); diff --git a/packages/daemon/package.json b/packages/daemon/package.json index 858732dfa6..354be40bcb 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -36,6 +36,7 @@ "test": "ava" }, "dependencies": { + "@endo/base64": "^0.2.31", "@endo/captp": "^3.1.1", "@endo/eventual-send": "^0.17.2", "@endo/far": "^0.2.18", diff --git a/packages/daemon/src/reader-ref.js b/packages/daemon/src/reader-ref.js new file mode 100644 index 0000000000..1661e26c94 --- /dev/null +++ b/packages/daemon/src/reader-ref.js @@ -0,0 +1,49 @@ +// @ts-check + +import { encodeBase64 } from '@endo/base64'; +import { mapReader } from '@endo/stream'; +import { Far } from '@endo/far'; + +export const asyncIterate = iterable => { + let iterator; + if (iterable[Symbol.asyncIterator]) { + iterator = iterable[Symbol.asyncIterator](); + } else if (iterable[Symbol.iterator]) { + iterator = iterable[Symbol.iterator](); + } else if ('next' in iterable) { + iterator = iterable; + } + return iterator; +}; + +export const makeIteratorRef = iterator => { + return Far('AsyncIterator', { + async next() { + return iterator.next(); + }, + /** + * @param {any} value + */ + async return(value) { + if (iterator.return !== undefined) { + return iterator.return(value); + } + return harden({ done: true, value: undefined }); + }, + /** + * @param {any} error + */ + async throw(error) { + if (iterator.throw !== undefined) { + return iterator.throw(error); + } + return harden({ done: true, value: undefined }); + }, + [Symbol.asyncIterator]() { + return this; + }, + }); +}; + +export const makeReaderRef = readable => + makeIteratorRef(mapReader(asyncIterate(readable), encodeBase64)); diff --git a/packages/daemon/src/ref-reader.js b/packages/daemon/src/ref-reader.js new file mode 100644 index 0000000000..d55b255f46 --- /dev/null +++ b/packages/daemon/src/ref-reader.js @@ -0,0 +1,31 @@ +// @ts-check + +import { decodeBase64 } from '@endo/base64'; +import { mapReader } from '@endo/stream'; +import { E } from '@endo/far'; + +/** + * @template TValue + * @template TReturn + * @template TNext + * @param {import('@endo/far').ERef>} iteratorRef + */ +export const makeRefIterator = iteratorRef => { + const iterator = { + /** @param {[] | [TNext]} args */ + next: async (...args) => E(iteratorRef).next(...args), + /** @param {[] | [TReturn]} args */ + return: async (...args) => E(iteratorRef).return(...args), + /** @param {any} error */ + throw: async error => E(iteratorRef).throw(error), + [Symbol.asyncIterator]: () => iterator, + }; + return iterator; +}; + +/** + * @param {import('@endo/far').ERef>} readerRef + * @returns {AsyncIterableIterator} + */ +export const makeRefReader = readerRef => + mapReader(makeRefIterator(readerRef), decodeBase64); From f7b5995d96c853f5d0fcdc057e843f203bc7d13c Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Mon, 19 Dec 2022 16:40:28 -0800 Subject: [PATCH 12/23] feat(daemon): Pet names, spawn, eval, store, durable ephemera, worker termination refactor --- packages/daemon/index.js | 41 +- packages/daemon/src/client.js | 24 +- packages/daemon/src/connection.js | 29 +- packages/daemon/src/daemon-node-powers.js | 221 +++++++ packages/daemon/src/daemon-node.js | 55 ++ packages/daemon/src/daemon.js | 686 +++++++++++++++++----- packages/daemon/src/types.d.ts | 90 +++ packages/daemon/src/worker-node-powers.js | 32 + packages/daemon/src/worker-node.js | 45 ++ packages/daemon/src/worker.js | 96 +-- packages/daemon/test/test-endo.js | 107 +++- packages/daemon/types.d.ts | 21 +- 12 files changed, 1208 insertions(+), 239 deletions(-) create mode 100644 packages/daemon/src/daemon-node-powers.js create mode 100644 packages/daemon/src/daemon-node.js create mode 100644 packages/daemon/src/types.d.ts create mode 100644 packages/daemon/src/worker-node-powers.js create mode 100644 packages/daemon/src/worker-node.js diff --git a/packages/daemon/index.js b/packages/daemon/index.js index b1108fe0e9..459f01a0c8 100644 --- a/packages/daemon/index.js +++ b/packages/daemon/index.js @@ -22,6 +22,17 @@ export { makeEndoClient } from './src/client.js'; export { makeRefReader, makeRefIterator } from './src/ref-reader.js'; export { makeReaderRef, makeIteratorRef } from './src/reader-ref.js'; +const removePath = async removalPath => { + return fs.promises + .rm(removalPath, { recursive: true, force: true }) + .catch(cause => { + /** @type {object} */ + const error = new Error(cause.message, { cause }); + error.code = cause.code; + throw error; + }); +}; + const { username, homedir } = os.userInfo(); const temp = os.tmpdir(); const info = { @@ -42,7 +53,7 @@ const defaultLocator = { }; const endoDaemonPath = url.fileURLToPath( - new URL('src/daemon.js', import.meta.url), + new URL('src/daemon-node.js', import.meta.url), ); export const terminate = async (locator = defaultLocator) => { @@ -62,17 +73,10 @@ export const terminate = async (locator = defaultLocator) => { }; export const start = async (locator = defaultLocator) => { - const cachePathCreated = fs.promises.mkdir(locator.cachePath, { - recursive: true, - }); - const statePathCreated = fs.promises.mkdir(locator.statePath, { + await fs.promises.mkdir(locator.statePath, { recursive: true, }); - - await cachePathCreated; const logPath = path.join(locator.statePath, 'endo.log'); - - await statePathCreated; const output = fs.openSync(logPath, 'a'); const child = popen.fork( @@ -107,8 +111,7 @@ export const start = async (locator = defaultLocator) => { child.on('message', _message => { child.disconnect(); child.unref(); - // @ts-expect-error zero-argument promise resolve - resolve(); + resolve(undefined); }); }); }; @@ -122,7 +125,7 @@ const enoentOk = error => { export const clean = async (locator = defaultLocator) => { if (process.platform !== 'win32') { - await fs.promises.rm(locator.sockPath).catch(enoentOk); + await removePath(locator.sockPath).catch(enoentOk); } }; @@ -146,15 +149,11 @@ export const reset = async (locator = defaultLocator) => { ); const cleanedUp = clean(locator); - const removedState = fs.promises - .rm(locator.statePath, { recursive: true }) - .catch(enoentOk); - const removedEphemeralState = fs.promises - .rm(locator.ephemeralStatePath, { recursive: true }) - .catch(enoentOk); - const removedCache = fs.promises - .rm(locator.cachePath, { recursive: true }) - .catch(enoentOk); + const removedState = removePath(locator.statePath).catch(enoentOk); + const removedEphemeralState = removePath(locator.ephemeralStatePath).catch( + enoentOk, + ); + const removedCache = removePath(locator.cachePath).catch(enoentOk); await Promise.all([ cleanedUp, removedState, diff --git a/packages/daemon/src/client.js b/packages/daemon/src/client.js index f02c2dc4e4..5245598ca8 100644 --- a/packages/daemon/src/client.js +++ b/packages/daemon/src/client.js @@ -1,7 +1,8 @@ // @ts-check import net from 'net'; -import { makeNodeNetstringCapTP } from './connection.js'; +import { makeNodeReader, makeNodeWriter } from '@endo/stream-node'; +import { makeNetstringCapTP } from './connection.js'; /** * @template TBootstrap @@ -14,7 +15,24 @@ export const makeEndoClient = async (name, sockPath, cancelled, bootstrap) => { const conn = net.connect(sockPath); await new Promise((resolve, reject) => { conn.on('connect', resolve); - conn.on('error', reject); + conn.on('error', (/** @type {any} */ error) => { + if (error.code === 'ENOENT') { + reject( + new Error( + `Cannot connect to Endo. Is Endo running? ${error.message}`, + ), + ); + } else { + reject(error); + } + }); }); - return makeNodeNetstringCapTP(name, conn, conn, cancelled, bootstrap); + + return makeNetstringCapTP( + name, + makeNodeWriter(conn), + makeNodeReader(conn), + cancelled, + bootstrap, + ); }; diff --git a/packages/daemon/src/connection.js b/packages/daemon/src/connection.js index 1e2f4ac66c..6bb43113da 100644 --- a/packages/daemon/src/connection.js +++ b/packages/daemon/src/connection.js @@ -3,7 +3,6 @@ import { makeCapTP } from '@endo/captp'; import { mapWriter, mapReader } from '@endo/stream'; import { makeNetstringReader, makeNetstringWriter } from '@endo/netstring'; -import { makeNodeReader, makeNodeWriter } from '@endo/stream-node'; const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder(); @@ -16,7 +15,7 @@ const textDecoder = new TextDecoder(); * @param {Promise} cancelled * @param {TBootstrap} bootstrap */ -const makeCapTPWithStreams = (name, writer, reader, cancelled, bootstrap) => { +const makeMessageCapTP = (name, writer, reader, cancelled, bootstrap) => { /** @param {any} message */ const send = message => { return writer.next(message); @@ -60,25 +59,31 @@ const bytesToMessage = bytes => { /** * @template TBootstrap * @param {string} name - * @param {import('stream').Writable} nodeWriter - * @param {import('stream').Readable} nodeReader + * @param {import('@endo/stream').Writer} bytesWriter + * @param {import('@endo/stream').Reader} bytesReader * @param {Promise} cancelled * @param {TBootstrap} bootstrap */ -export const makeNodeNetstringCapTP = ( +export const makeNetstringCapTP = ( name, - nodeWriter, - nodeReader, + bytesWriter, + bytesReader, cancelled, bootstrap, ) => { - const writer = mapWriter( - makeNetstringWriter(makeNodeWriter(nodeWriter), { chunked: true }), + const messageWriter = mapWriter( + makeNetstringWriter(bytesWriter, { chunked: true }), messageToBytes, ); - const reader = mapReader( - makeNetstringReader(makeNodeReader(nodeReader)), + const messageReader = mapReader( + makeNetstringReader(bytesReader), bytesToMessage, ); - return makeCapTPWithStreams(name, writer, reader, cancelled, bootstrap); + return makeMessageCapTP( + name, + messageWriter, + messageReader, + cancelled, + bootstrap, + ); }; diff --git a/packages/daemon/src/daemon-node-powers.js b/packages/daemon/src/daemon-node-powers.js new file mode 100644 index 0000000000..030ea6ba77 --- /dev/null +++ b/packages/daemon/src/daemon-node-powers.js @@ -0,0 +1,221 @@ +// @ts-check +/* global process, setTimeout, clearTimeout */ +/* eslint-disable no-void */ + +import { makePromiseKit } from '@endo/promise-kit'; +import { makePipe } from '@endo/stream'; +import { makeNodeReader, makeNodeWriter } from '@endo/stream-node'; + +/** + * @param {object} modules + * @param {typeof import('crypto')} modules.crypto + * @param {typeof import('net')} modules.net + * @param {typeof import('fs')} modules.fs + * @param {typeof import('path')} modules.path + * @param {typeof import('child_process')} modules.popen + * @param {typeof import('url')} modules.url + * @returns {import('./types.js').DaemonicPowers} + */ +export const makePowers = ({ crypto, net, fs, path: fspath, popen, url }) => { + /** @param {Error} error */ + const sinkError = error => { + console.error(error); + }; + + /** @param {Error} error */ + const exitOnError = error => { + console.error(error); + process.exit(-1); + }; + + const makeSha512 = () => { + const digester = crypto.createHash('sha512'); + return harden({ + update: chunk => digester.update(chunk), + digestHex: () => digester.digest('hex'), + }); + }; + + const randomUuid = () => crypto.randomUUID(); + + const listenOnPath = async (sockPath, cancelled) => { + const [ + /** @type {Reader} */ readFrom, + /** @type {Writer { + err(error); + void writeTo.throw(error); + }); + server.on('close', () => { + void writeTo.return(undefined); + }); + + cancelled.catch(error => { + server.close(); + void writeTo.throw(error); + }); + + const listening = new Promise(resolve => + server.listen({ path: sockPath }, () => resolve(undefined)), + ); + + await Promise.race([erred, cancelled, listening]); + + server.on('connection', conn => { + const reader = makeNodeReader(conn); + const writer = makeNodeWriter(conn); + const closed = new Promise(resolve => conn.on('close', resolve)); + // TODO Respect back-pressure signal and avoid accepting new connections. + void writeTo.next({ reader, writer, closed }); + }); + + return readFrom; + }; + + /** + * @param {string} path + */ + const informParentWhenListeningOnPath = path => { + if (process.send) { + process.send({ type: 'listening', path }); + } + }; + + /** + * @param {string} path + */ + const makeFileReader = path => { + const nodeReadStream = fs.createReadStream(path); + return makeNodeReader(nodeReadStream); + }; + + /** + * @param {string} path + */ + const makeFileWriter = path => { + const nodeWriteStream = fs.createWriteStream(path); + return makeNodeWriter(nodeWriteStream); + }; + + /** + * @param {string} path + * @param {string} text + */ + const writeFileText = async (path, text) => { + await fs.promises.writeFile(path, text); + }; + + /** + * @param {string} path + */ + const readFileText = async path => { + return fs.promises.readFile(path, 'utf-8'); + }; + + /** + * @param {string} path + */ + const makePath = async path => { + await fs.promises.mkdir(path, { recursive: true }); + }; + + const renamePath = async (source, target) => + fs.promises.rename(source, target); + + const joinPath = (...components) => fspath.join(...components); + + const delay = async (ms, cancelled) => { + // Do not attempt to set up a timer if already cancelled. + await Promise.race([cancelled, undefined]); + return new Promise((resolve, reject) => { + const handle = setTimeout(resolve, ms); + cancelled.catch(error => { + reject(error); + clearTimeout(handle); + }); + }); + }; + + /** + * @param {string} uuid + * @param {string} path + * @param {string} logPath + * @param {string} pidPath + * @param {string} sockPath + * @param {string} statePath + * @param {string} ephemeralStatePath + * @param {string} cachePath + * @param {Promise} cancelled + */ + const makeWorker = async ( + uuid, + path, + logPath, + pidPath, + sockPath, + statePath, + ephemeralStatePath, + cachePath, + cancelled, + ) => { + const log = fs.openSync(logPath, 'a'); + const child = popen.fork( + path, + [uuid, sockPath, statePath, ephemeralStatePath, cachePath], + { + stdio: ['ignore', log, log, 'pipe', 'pipe', 'ipc'], + // @ts-ignore Stale Node.js type definition. + windowsHide: true, + }, + ); + const nodeWriter = /** @type {import('stream').Writable} */ ( + child.stdio[3] + ); + const nodeReader = /** @type {import('stream').Readable} */ ( + child.stdio[4] + ); + assert(nodeWriter); + assert(nodeReader); + const reader = makeNodeReader(nodeReader); + const writer = makeNodeWriter(nodeWriter); + + const closed = new Promise(resolve => { + child.on('exit', () => resolve(undefined)); + }); + + await writeFileText(pidPath, `${child.pid}\n`); + + cancelled.catch(async () => { + child.kill(); + }); + + return { reader, writer, closed, pid: child.pid }; + }; + + const endoWorkerPath = url.fileURLToPath( + new URL('worker-node.js', import.meta.url), + ); + + return harden({ + sinkError, + exitOnError, + makeSha512, + randomUuid, + listenOnPath, + informParentWhenListeningOnPath, + makeFileReader, + makeFileWriter, + writeFileText, + readFileText, + makePath, + joinPath, + renamePath, + delay, + makeWorker, + endoWorkerPath, + }); +}; diff --git a/packages/daemon/src/daemon-node.js b/packages/daemon/src/daemon-node.js new file mode 100644 index 0000000000..3dc79079a4 --- /dev/null +++ b/packages/daemon/src/daemon-node.js @@ -0,0 +1,55 @@ +/* global process */ + +// Establish a perimeter: +import 'ses'; +import '@endo/eventual-send/shim.js'; +import '@endo/promise-kit/shim.js'; +import '@endo/lockdown/commit.js'; + +import crypto from 'crypto'; +import net from 'net'; +import fs from 'fs'; +import path from 'path'; +import popen from 'child_process'; +import url from 'url'; + +import { makePromiseKit } from '@endo/promise-kit'; +import { main } from './daemon.js'; +import { makePowers } from './daemon-node-powers.js'; + +if (process.argv.length < 5) { + throw new Error( + `daemon.js requires arguments [sockPath] [statePath] [ephemeralStatePath] [cachePath], got ${process.argv.join( + ', ', + )}`, + ); +} + +const [sockPath, statePath, ephemeralStatePath, cachePath] = + process.argv.slice(2); + +/** @type {import('../index.js').Locator} */ +const locator = { + sockPath, + statePath, + ephemeralStatePath, + cachePath, +}; + +const powers = makePowers({ + crypto, + net, + fs, + path, + popen, + url, +}); + +const { promise: cancelled, reject: cancel } = + /** @type {import('@endo/promise-kit').PromiseKit} */ ( + makePromiseKit() + ); + +process.once('SIGINT', () => cancel(new Error('SIGINT'))); + +main(powers, locator, process.pid, cancel, cancelled).catch(powers.exitOnError); diff --git a/packages/daemon/src/daemon.js b/packages/daemon/src/daemon.js index f4dc4f8433..9beab8abc3 100644 --- a/packages/daemon/src/daemon.js +++ b/packages/daemon/src/daemon.js @@ -1,6 +1,5 @@ // @ts-check /// -/* global process, setTimeout */ // Establish a perimeter: import 'ses'; @@ -8,177 +7,592 @@ import '@endo/eventual-send/shim.js'; import '@endo/promise-kit/shim.js'; import '@endo/lockdown/commit.js'; -import crypto from 'crypto'; -import net from 'net'; -import fs from 'fs'; -import path from 'path'; -import popen from 'child_process'; -import url from 'url'; - import { E, Far } from '@endo/far'; import { makePromiseKit } from '@endo/promise-kit'; -import { makeNodeNetstringCapTP } from './connection.js'; +import { makeNetstringCapTP } from './connection.js'; +import { makeRefReader } from './ref-reader.js'; +import { makeReaderRef } from './reader-ref.js'; const { quote: q } = assert; -const { promise: cancelled, reject: cancel } = makePromiseKit(); - -// TODO thread through command arguments. -const gracePeriodMs = 100; - -const grace = cancelled.catch(async () => { - await new Promise(resolve => setTimeout(resolve, gracePeriodMs)); -}); - -const endoWorkerPath = url.fileURLToPath(new URL('worker.js', import.meta.url)); - -/** @param {Error} error */ -const sinkError = error => { - console.error(error); -}; +const validNamePattern = /^[a-zA-Z][a-zA-Z0-9]{0,127}$/; /** - * @param {import('../types.js').Locator} locator + * @param {import('./types.js').DaemonicPowers} powers + * @param {import('./types.js').Locator} locator + * @param {object} args + * @param {Promise} args.cancelled + * @param {(error: Error) => void} args.cancel + * @param {number} args.gracePeriodMs + * @param {Promise} args.gracePeriodElapsed */ -const makeWorker = async locator => { - // @ts-ignore Node.js crypto does in fact have randomUUID. - const uuid = await crypto.randomUUID(); - const workerCachePath = path.join(locator.cachePath, uuid); - await fs.promises.mkdir(workerCachePath, { recursive: true }); - const logPath = path.join(workerCachePath, 'worker.log'); - const output = fs.openSync(logPath, 'w'); - const child = popen.fork(endoWorkerPath, [uuid, workerCachePath], { - stdio: ['ignore', output, output, 'pipe', 'ipc'], - }); - console.error(`Endo worker started PID ${child.pid} UUID ${uuid}`); - const stream = /** @type {import('stream').Duplex} */ (child.stdio[3]); - assert(stream); - const { getBootstrap, closed } = makeNodeNetstringCapTP( - `Worker ${uuid}`, - stream, - stream, - cancelled, - undefined, - ); +const makeEndoBootstrap = ( + powers, + locator, + { cancelled, cancel, gracePeriodMs, gracePeriodElapsed }, +) => { + /** @type {Map} */ + const pets = new Map(); + /** @type {Map} */ + const values = new Map(); + /** @type {WeakMap>} */ + const workerBootstraps = new WeakMap(); - const bootstrap = getBootstrap(); + const petNameDirectoryPath = powers.joinPath(locator.statePath, 'pet-name'); - const exited = new Promise(resolve => { - child.on('exit', () => { - console.error(`Endo worker stopped PID ${child.pid} UUID ${uuid}`); - resolve(undefined); + /** + * @param {string} sha512 + */ + const makeReadableSha512 = sha512 => { + const storageDirectoryPath = powers.joinPath( + locator.statePath, + 'store-sha512', + ); + const storagePath = powers.joinPath(storageDirectoryPath, sha512); + const stream = async () => { + const reader = powers.makeFileReader(storagePath); + return makeReaderRef(reader); + }; + const text = async () => { + return powers.readFileText(storagePath); + }; + return Far(`Readable file with SHA-512 ${sha512.slice(0, 8)}...`, { + sha512: () => sha512, + stream, + text, + [Symbol.asyncIterator]: stream, }); - }); + }; + + /** + * @param {string} sha512 + */ + const provideReadableSha512 = sha512 => { + // TODO Contemplate using a different map for storage. + // For the moment, there's no risk of a UUID colliding with a SHA512. + let readable = values.get(sha512); + if (readable === undefined) { + readable = makeReadableSha512(sha512); + values.set(sha512, readable); + } + return readable; + }; - const terminated = Promise.all([exited, closed]); + /** + * @param {import('@endo/eventual-send').ERef>} readerRef + * @param {string} [name] + */ + const store = async (readerRef, name) => { + if (name !== undefined) { + if (!validNamePattern.test(name)) { + throw new Error(`Invalid pet name ${q(name)}`); + } + } - const { reject: cancelWorker, promise: workerCancelled } = makePromiseKit(); + const storageDirectoryPath = powers.joinPath( + locator.statePath, + 'store-sha512', + ); + await powers.makePath(storageDirectoryPath); - cancelled.catch(async error => cancelWorker(error)); + // Pump the reader into a temporary file and hash. + // We use a temporary file to avoid leaving a partially writen object, + // but also because we won't know the name we will use until we've + // completed the hash. + const digester = powers.makeSha512(); + const storageUuid = powers.randomUuid(); + const temporaryStoragePath = powers.joinPath( + storageDirectoryPath, + storageUuid, + ); + const writer = powers.makeFileWriter(temporaryStoragePath); + for await (const chunk of makeRefReader(readerRef)) { + await writer.next(chunk); + digester.update(chunk); + } + await writer.return(undefined); + const sha512 = digester.digestHex(); - workerCancelled.then(async () => { - const responded = E(bootstrap).terminate(); - await Promise.race([grace, terminated, responded]); - child.kill(); - }); + // Retain the pet name first (to win a garbage collection race) + if (name !== undefined) { + await powers.makePath(petNameDirectoryPath); + const petNamePath = powers.joinPath(petNameDirectoryPath, `${name}.json`); + await powers.writeFileText( + petNamePath, + `${JSON.stringify({ + type: 'readableSha512', + readableSha512: sha512, + })}\n`, + ); + } - const terminate = () => { - cancelWorker(Error('Terminated')); + // Finish with an atomic rename. + const storagePath = powers.joinPath(storageDirectoryPath, sha512); + await powers.renamePath(temporaryStoragePath, storagePath); + return makeReadableSha512(sha512); }; - return harden({ - actions: Far('EndoWorkerActions', { - terminate, - }), - terminated, - }); -}; + /** + * @param {string} workerUuid + */ + const makeWorkerBootstrap = async workerUuid => { + return Far(`Endo for worker ${workerUuid}`, {}); + }; -/** - * @param {import('../types.js').Locator} locator - */ -const makeEndoBootstrap = locator => - Far('EndoPublicFacet', { - async ping() { - return 'pong'; - }, - async terminate() { - console.error('Endo daemon received terminate request'); - cancel(Error('Terminate')); - }, - async makeWorker() { - return makeWorker(locator); - }, - }); + /** + * @param {string} workerUuid + * @param {string} [workerName] + */ + const makeWorkerUuid = async (workerUuid, workerName) => { + const workerCachePath = powers.joinPath( + locator.cachePath, + 'worker-uuid', + workerUuid, + ); + const workerStatePath = powers.joinPath( + locator.statePath, + 'worker-uuid', + workerUuid, + ); + const workerEphemeralStatePath = powers.joinPath( + locator.ephemeralStatePath, + 'worker-uuid', + workerUuid, + ); -export const main = async () => { - console.error(`Endo daemon starting on PID ${process.pid}`); - process.once('exit', () => { - console.error(`Endo daemon stopping on PID ${process.pid}`); - }); + await Promise.all([ + powers.makePath(workerStatePath), + powers.makePath(workerEphemeralStatePath), + ]); + + if (workerName !== undefined) { + await powers.makePath(petNameDirectoryPath); + const petNamePath = powers.joinPath( + petNameDirectoryPath, + `${workerName}.json`, + ); + await powers.writeFileText( + petNamePath, + `${JSON.stringify({ + type: 'workerUuid', + workerUuid, + })}\n`, + ); + } + + const { reject: cancelWorker, promise: workerCancelled } = + /** @type {import('@endo/promise-kit').PromiseKit} */ ( + makePromiseKit() + ); + cancelled.catch(async error => cancelWorker(error)); - if (process.argv.length < 5) { - throw new Error( - `daemon.js requires arguments [sockPath] [statePath] [ephemeralStatePath] [cachePath], got ${process.argv.join( - ', ', - )}`, + const logPath = powers.joinPath(workerStatePath, 'worker.log'); + const workerPidPath = powers.joinPath( + workerEphemeralStatePath, + 'worker.pid', + ); + const { + reader, + writer, + closed: workerClosed, + pid: workerPid, + } = await powers.makeWorker( + workerUuid, + powers.endoWorkerPath, + logPath, + workerPidPath, + locator.sockPath, + workerStatePath, + workerEphemeralStatePath, + workerCachePath, + workerCancelled, + ); + + console.log(`Endo worker started PID ${workerPid} UUID ${workerUuid}`); + + const { getBootstrap, closed: capTpClosed } = makeNetstringCapTP( + `Worker ${workerUuid}`, + writer, + reader, + gracePeriodElapsed, + makeWorkerBootstrap(workerUuid), ); - } - const sockPath = process.argv[2]; - const statePath = process.argv[3]; - const ephemeralStatePath = process.argv[4]; - const cachePath = process.argv[5]; + const closed = Promise.race([workerClosed, capTpClosed]).finally(() => { + console.log(`Endo worker stopped PID ${workerPid} UUID ${workerUuid}`); + }); - const locator = { sockPath, statePath, ephemeralStatePath, cachePath }; + /** @type {import('@endo/eventual-send').ERef} */ + const workerBootstrap = getBootstrap(); - const endoBootstrap = makeEndoBootstrap(locator); + const terminate = async () => { + E.sendOnly(workerBootstrap).terminate(); + const cancelWorkerGracePeriod = () => { + throw new Error('Exited gracefully before grace period elapsed'); + }; + const workerGracePeriodCancelled = Promise.race([ + gracePeriodElapsed, + closed, + ]).then(cancelWorkerGracePeriod, cancelWorkerGracePeriod); + await powers + .delay(gracePeriodMs, workerGracePeriodCancelled) + .then(() => { + throw new Error( + `Worker termination grace period ${gracePeriodMs}ms elapsed`, + ); + }) + .catch(cancelWorker); + }; - const statePathP = fs.promises.mkdir(statePath, { recursive: true }); - const cachePathP = fs.promises.mkdir(cachePath, { recursive: true }); - await Promise.all([statePathP, cachePathP]); + const worker = Far('EndoWorker', { + terminate, - const pidPath = path.join(cachePath, 'endo.pid'); - await fs.promises.writeFile(pidPath, `${process.pid}\n`); + whenTerminated: () => closed, - const server = net.createServer(); + /** + * @param {string} source + * @param {Array} codeNames + * @param {Array} petNames + * @param {string} resultName + */ + evaluate: async (source, codeNames, petNames, resultName) => { + if (!validNamePattern.test(resultName)) { + throw new Error(`Invalid pet name ${q(resultName)}`); + } + if (petNames.length !== codeNames.length) { + throw new Error('Evaluator requires one pet name for each code name'); + // TODO and they must all be strings. Use pattern language. + } - server.listen( - { - path: sockPath, - }, - () => { - console.log( - `Endo daemon listening on ${q(sockPath)} ${new Date().toISOString()}`, + await powers.makePath(petNameDirectoryPath); + const refs = Object.fromEntries( + await Promise.all( + petNames.map(async (endowmentPetName, index) => { + const endowmentCodeName = codeNames[index]; + const petNamePath = powers.joinPath( + petNameDirectoryPath, + `${endowmentPetName}.json`, + ); + const petNameText = await powers.readFileText(petNamePath); + try { + // TODO validate + /** @type {[string, import('./types.js').Ref]} */ + return [endowmentCodeName, JSON.parse(petNameText)]; + } catch (error) { + throw new TypeError( + `Corrupt pet name description for ${endowmentPetName}: ${error.message}`, + ); + } + }), + ), + ); + + const ref = { + /** @type {'eval'} */ + type: 'eval', + workerUuid, + source, + refs, + }; + // Behold, recursion: + // eslint-disable-next-line no-use-before-define + return makeRef(ref, resultName); + }, + }); + + workerBootstraps.set(worker, workerBootstrap); + + return worker; + }; + + /** + * @param {string} workerUuid + * @param {string} [name] + */ + const provideWorkerUuid = async (workerUuid, name) => { + let worker = + /** @type {import('@endo/eventual-send').ERef>} */ ( + values.get(workerUuid) ); - // Inform parent that we have an open unix domain socket, if we were - // spawned with IPC. - if (process.send) { - process.send({ type: 'listening', path: sockPath }); + if (worker === undefined) { + worker = makeWorkerUuid(workerUuid, name); + values.set(workerUuid, worker); + } + return worker; + }; + + /** + * @param {string} workerUuid + * @param {string} source + * @param {Record} refs + */ + const provideEval = async (workerUuid, source, refs) => { + const workerFacet = await provideWorkerUuid(workerUuid); + const workerBootstrap = workerBootstraps.get(workerFacet); + assert(workerBootstrap); + const codeNames = Object.keys(refs); + const endowmentValues = await Promise.all( + // Behold, recursion: + // eslint-disable-next-line no-use-before-define + Object.values(refs).map(ref => provideRef(ref)), + ); + return E(workerBootstrap).evaluate(source, codeNames, endowmentValues); + }; + + /** + * @param {string} valueUuid + */ + const makeValueUuid = async valueUuid => { + const valuesDirectoryPath = powers.joinPath( + locator.statePath, + 'value-uuid', + ); + const valuePath = powers.joinPath(valuesDirectoryPath, `${valueUuid}.json`); + const refText = await powers.readFileText(valuePath); + const ref = (() => { + try { + return JSON.parse(refText); + } catch (error) { + throw new TypeError( + `Corrupt description for value to be derived according to file ${valuePath}: ${error.message}`, + ); + } + })(); + // Behold, recursion: + // eslint-disable-next-line no-use-before-define + return provideRef(ref); + }; + + /** + * @param {string} valueUuid + */ + const provideValueUuid = async valueUuid => { + let value = values.get(valueUuid); + if (value === undefined) { + value = makeValueUuid(valueUuid); + values.set(valueUuid, value); + } + return value; + }; + + /** + * @param {import('./types.js').Ref} ref + */ + const provideRef = async ref => { + if (ref.type === 'readableSha512') { + return provideReadableSha512(ref.readableSha512); + } else if (ref.type === 'workerUuid') { + return provideWorkerUuid(ref.workerUuid); + } else if (ref.type === 'valueUuid') { + return provideValueUuid(ref.valueUuid); + } else if (ref.type === 'eval') { + return provideEval(ref.workerUuid, ref.source, ref.refs); + } else { + throw new TypeError(`Invalid reference: ${JSON.stringify(ref)}`); + } + }; + + /** + * @param {import('./types.js').Ref} ref + * @param {string} [petName] + */ + const makeRef = async (ref, petName) => { + const value = await provideRef(ref); + if (petName !== undefined) { + const valueUuid = powers.randomUuid(); + + // Persist instructions for revival (this can be collected) + const valuesDirectoryPath = powers.joinPath( + locator.statePath, + 'value-uuid', + ); + await powers.makePath(valuesDirectoryPath); + const valuePath = powers.joinPath( + valuesDirectoryPath, + `${valueUuid}.json`, + ); + await powers.writeFileText(valuePath, `${JSON.stringify(ref)}\n`); + + // Make a reference by pet name (this can be overwritten) + await powers.makePath(petNameDirectoryPath); + const petNamePath = powers.joinPath( + petNameDirectoryPath, + `${petName}.json`, + ); + await powers.writeFileText( + petNamePath, + `${JSON.stringify({ + type: 'valueUuid', + valueUuid, + })}\n`, + ); + + values.set(valueUuid, value); + } + return value; + }; + + /** + * @param {string} refPath + */ + const provideRefPath = async refPath => { + const refText = await powers.readFileText(refPath).catch(() => { + // TODO handle EMFILE gracefully + throw new ReferenceError(`No reference exists at path ${refPath}`); + }); + const ref = (() => { + try { + return JSON.parse(refText); + } catch (error) { + throw new TypeError( + `Corrupt description for reference in file ${refPath}: ${error.message}`, + ); } + })(); + // TODO validate + return provideRef(ref); + }; + + /** + * @param {string} name + */ + const revive = async name => { + const petNamePath = powers.joinPath(petNameDirectoryPath, `${name}.json`); + return provideRefPath(petNamePath).catch(error => { + throw new Error(`Corrupt pet name ${name}: ${error.message}`); + }); + }; + + /** + * @param {string} name + */ + const provide = async name => { + if (!validNamePattern.test(name)) { + throw new Error(`Invalid pet name ${q(name)}`); + } + + let pet = pets.get(name); + if (pet === undefined) { + pet = revive(name); + pets.set(name, pet); + } + return pet; + }; + + return Far('Endo private facet', { + // TODO for user named + + ping: async () => 'pong', + + terminate: async () => { + cancel(new Error('Termination requested')); }, - ); - server.on('error', error => { - sinkError(error); - process.exit(-1); + + /** + * @param {string} [name] + */ + makeWorker: async name => { + // @ts-ignore Node.js crypto does in fact have randomUUID. + const workerUuid = powers.randomUuid(); + return provideWorkerUuid(workerUuid, name); + }, + + store, + provide, + }); +}; + +/* + * @param {import('./types.js').DaemonicPowers} powers + * @param {import('./types.js').Locator} locator + * @param {number | undefined} pid + * @param {(error: Error) => void} cancel + * @param {Promise} cancelled + */ +export const main = async (powers, locator, pid, cancel, cancelled) => { + console.log(`Endo daemon starting on PID ${pid}`); + cancelled.catch(() => { + console.log(`Endo daemon stopping on PID ${pid}`); }); - server.on('connection', conn => { - console.error( - `Endo daemon received connection ${new Date().toISOString()}`, + + const { promise: gracePeriodCancelled, reject: cancelGracePeriod } = + /** @type {import('@endo/promise-kit').PromiseKit} */ ( + makePromiseKit() ); - const { closed } = makeNodeNetstringCapTP( - 'Endo', - conn, - conn, - cancelled, - endoBootstrap, + + // TODO thread through command arguments. + const gracePeriodMs = 100; + + /** @type {Promise} */ + const gracePeriodElapsed = cancelled.catch(async error => { + await powers.delay(gracePeriodMs, gracePeriodCancelled); + console.log( + `Endo daemon grace period ${gracePeriodMs}ms elapsed on PID ${pid}`, ); - closed.catch(sinkError); + throw error; }); - cancelled.catch(() => { - server.close(); + const endoBootstrap = makeEndoBootstrap(powers, locator, { + cancelled, + cancel, + gracePeriodMs, + gracePeriodElapsed, }); -}; -main().catch(sinkError); + const statePathP = powers.makePath(locator.statePath); + const ephemeralStatePathP = powers.makePath(locator.ephemeralStatePath); + const cachePathP = powers.makePath(locator.cachePath); + await Promise.all([statePathP, cachePathP, ephemeralStatePathP]); + + const pidPath = powers.joinPath(locator.ephemeralStatePath, 'endo.pid'); + await powers.writeFileText(pidPath, `${pid}\n`); + + const connections = await powers.listenOnPath(locator.sockPath, cancelled); + // Resolve a promise in the Endo CLI through the IPC channel: + powers.informParentWhenListeningOnPath(locator.sockPath); + console.log( + `Endo daemon listening on ${q( + locator.sockPath, + )} ${new Date().toISOString()}`, + ); + let nextConnectionNumber = 0; + /** @type {Set>} */ + const connectionClosedPromises = new Set(); + try { + for await (const { + reader, + writer, + closed: connectionClosed, + } of connections) { + const connectionNumber = nextConnectionNumber; + nextConnectionNumber += 1; + console.log( + `Endo daemon received connection ${connectionNumber} at ${new Date().toISOString()}`, + ); + + const { closed: capTpClosed } = makeNetstringCapTP( + 'Endo', + writer, + reader, + cancelled, + endoBootstrap, + ); + + const closed = Promise.race([connectionClosed, capTpClosed]); + connectionClosedPromises.add(closed); + closed.finally(() => { + connectionClosedPromises.delete(closed); + console.log( + `Endo daemon closed connection ${connectionNumber} at ${new Date().toISOString()}`, + ); + }); + } + } catch (error) { + cancel(error); + cancelGracePeriod(error); + } finally { + await Promise.all(Array.from(connectionClosedPromises)); + cancel(new Error('Terminated normally')); + cancelGracePeriod(new Error('Terminated normally')); + } +}; diff --git a/packages/daemon/src/types.d.ts b/packages/daemon/src/types.d.ts new file mode 100644 index 0000000000..c907e28907 --- /dev/null +++ b/packages/daemon/src/types.d.ts @@ -0,0 +1,90 @@ +import type { Reader, Writer } from '@endo/stream'; + +export type Locator = { + statePath: string; + ephemeralStatePath: string; + cachePath: string; + sockPath: string; +}; + +export type Sha512 = { + update: (chunk: Uint8Array) => void; + digestHex: () => string; +}; + +export type Connection = { + reader: Reader; + writer: Writer; + closed: Promise; +}; + +export type Worker = Connection & { + pid: number | undefined; +}; + +export type DaemonicPowers = { + sinkError: (error) => void; + exitOnError: (error) => void; + makeSha512: () => Sha512; + randomUuid: () => string; + listenOnPath: ( + path: string, + cancelled: Promise, + ) => Promise>; + informParentWhenListeningOnPath: (path: string) => void; + makeFileReader: (path: string) => Reader; + makeFileWriter: (path: string) => Writer; + readFileText: (path: string) => Promise; + writeFileText: (path: string, text: string) => Promise; + makePath: (path: string) => Promise; + renamePath: (source: string, target: string) => Promise; + joinPath: (...components: Array) => string; + delay: (ms: number, cancelled: Promise) => Promise; + makeWorker: ( + uuid: string, + path: string, + logPath: string, + pidPath: string, + sockPath: string, + statePath: string, + ephemeralStatePath: string, + cachePath: string, + cancelled: Promise, + ) => Promise; + endoWorkerPath: string; +}; + +export type MignonicPowers = { + exitOnError: (error) => void; + connection: { + reader: Reader; + writer: Writer; + }; +}; + +type ReadableSha512Ref = { + type: 'readableSha512'; + readableSha512: string; +}; + +type WorkerUuidRef = { + type: 'workerUuid'; + workerUuid: string; +}; + +// Reference to a reference. +type ValueUuid = { + type: 'valueUuid'; + valueUuid: string; +}; + +type EvalRef = { + type: 'eval'; + workerUuid: string; + source: string; + // Behold: recursion + // eslint-disable-next-line no-use-before-define + refs: Record; +}; + +export type Ref = ReadableSha512Ref | WorkerUuidRef | ValueUuid | EvalRef; diff --git a/packages/daemon/src/worker-node-powers.js b/packages/daemon/src/worker-node-powers.js new file mode 100644 index 0000000000..0635a22a34 --- /dev/null +++ b/packages/daemon/src/worker-node-powers.js @@ -0,0 +1,32 @@ +// @ts-check +/* global process */ + +import { makeNodeReader, makeNodeWriter } from '@endo/stream-node'; + +/** + * @param {object} modules + * @param {typeof import('fs')} modules.fs + * @returns {import('./types.js').MignonicPowers} + */ +export const makePowers = ({ fs }) => { + /** @param {Error} error */ + const exitOnError = error => { + console.error(error); + process.exit(-1); + }; + + // @ts-ignore This is in fact how you open a file descriptor. + const reader = makeNodeReader(fs.createReadStream(null, { fd: 3 })); + // @ts-ignore This is in fact how you open a file descriptor. + const writer = makeNodeWriter(fs.createWriteStream(null, { fd: 4 })); + + const connection = { + reader, + writer, + }; + + return harden({ + exitOnError, + connection, + }); +}; diff --git a/packages/daemon/src/worker-node.js b/packages/daemon/src/worker-node.js new file mode 100644 index 0000000000..81f5de9757 --- /dev/null +++ b/packages/daemon/src/worker-node.js @@ -0,0 +1,45 @@ +/* global process */ + +// Establish a perimeter: +import 'ses'; +import '@endo/eventual-send/shim.js'; +import '@endo/promise-kit/shim.js'; +import '@endo/lockdown/commit.js'; + +import fs from 'fs'; + +import { makePromiseKit } from '@endo/promise-kit'; +import { main } from './worker.js'; +import { makePowers } from './worker-node-powers.js'; + +if (process.argv.length < 7) { + throw new Error( + `worker.js requires arguments workerUuid, daemonSockPath, workerStatePath, workerEphemeralStatePath, workerCachePath, got ${process.argv.join( + ', ', + )}`, + ); +} + +const [workerUuid, sockPath, statePath, ephemeralStatePath, cachePath] = + process.argv.slice(2); + +/** @type {import('../index.js').Locator} */ +const locator = { + sockPath, + statePath, + ephemeralStatePath, + cachePath, +}; + +const powers = makePowers({ fs }); + +const { promise: cancelled, reject: cancel } = + /** @type {import('@endo/promise-kit').PromiseKit} */ ( + makePromiseKit() + ); + +process.once('SIGINT', () => cancel(new Error('SIGINT'))); + +main(powers, locator, workerUuid, process.pid, cancel, cancelled).catch( + powers.exitOnError, +); diff --git a/packages/daemon/src/worker.js b/packages/daemon/src/worker.js index 46a44a04fd..79c87e15c9 100644 --- a/packages/daemon/src/worker.js +++ b/packages/daemon/src/worker.js @@ -1,59 +1,73 @@ // @ts-check /// -/* global process */ -// Establish a perimeter: -import 'ses'; -import '@endo/eventual-send/shim.js'; -import '@endo/lockdown/commit.js'; +import { E, Far } from '@endo/far'; +import { makeNetstringCapTP } from './connection.js'; -import fs from 'fs'; +const endowments = harden({ + assert, + E, + Far, + TextEncoder, + TextDecoder, + URL, +}); -import { Far } from '@endo/far'; -import { makePromiseKit } from '@endo/promise-kit'; -import { makeNodeNetstringCapTP } from './connection.js'; +/** + * @typedef {ReturnType} WorkerBootstrap + */ -/** @param {Error} error */ -const sinkError = error => { - console.error(error); -}; - -const { promise: cancelled, reject: cancel } = makePromiseKit(); - -const makeWorkerFacet = () => { +/** + * @param {() => any} _getDaemonBootstrap + * @param {(error: Error) => void} cancel + */ +const makeWorkerFacet = (_getDaemonBootstrap, cancel) => { return Far('EndoWorkerFacet', { - async terminate() { + terminate: async () => { console.error('Endo worker received terminate request'); cancel(Error('terminate')); }, + + /** + * @param {string} source + * @param {Array} names + * @param {Array} values + */ + evaluate: async (source, names, values) => { + const compartment = new Compartment( + harden({ + ...endowments, + ...Object.fromEntries( + names.map((name, index) => [name, values[index]]), + ), + }), + ); + return compartment.evaluate(source); + }, }); }; -export const main = async () => { - console.error('Endo worker started'); - process.once('exit', () => { - console.error('Endo worker exiting'); +/** + * @param {import('./types.js').MignonicPowers} powers + * @param {import('./types.js').Locator} locator + * @param {string} uuid + * @param {number | undefined} pid + * @param {(error: Error) => void} cancel + * @param {Promise} cancelled + */ +export const main = async (powers, locator, uuid, pid, cancel, cancelled) => { + console.error(`Endo worker started on pid ${pid}`); + cancelled.catch(() => { + console.error(`Endo worker exiting on pid ${pid}`); }); - if (process.argv.length < 4) { - throw new Error( - `worker.js requires arguments uuid, workerStatePath, workerEphemeralStatePath, workerCachePath, got ${process.argv.join( - ', ', - )}`, - ); - } - - // const uuid = process.argv[2]; - // const workerCachePath = process.argv[3]; + const { reader, writer } = powers.connection; - // @ts-ignore This is in fact how you open a file descriptor. - const reader = fs.createReadStream(null, { fd: 3 }); - // @ts-ignore This is in fact how you open a file descriptor. - const writer = fs.createWriteStream(null, { fd: 3 }); + // Behold: reference cycle + // eslint-disable-next-line no-use-before-define + const workerFacet = makeWorkerFacet(() => getBootstrap(), cancel); - const workerFacet = makeWorkerFacet(); - - const { closed } = makeNodeNetstringCapTP( + const { closed, getBootstrap } = makeNetstringCapTP( 'Endo', writer, reader, @@ -61,7 +75,5 @@ export const main = async () => { workerFacet, ); - closed.catch(sinkError); + return Promise.race([cancelled, closed]); }; - -main().catch(sinkError); diff --git a/packages/daemon/test/test-endo.js b/packages/daemon/test/test-endo.js index 9dfaebf381..6c7c3db7a0 100644 --- a/packages/daemon/test/test-endo.js +++ b/packages/daemon/test/test-endo.js @@ -24,19 +24,26 @@ const { raw } = String; const dirname = url.fileURLToPath(new URL('..', import.meta.url)).toString(); -const locator = { - statePath: path.join(dirname, 'tmp/state'), - ephemeralStatePath: path.join(dirname, 'tmp/run'), - cachePath: path.join(dirname, 'tmp/cache'), - sockPath: - process.platform === 'win32' - ? raw`\\?\pipe\endo-test.sock` - : path.join(dirname, 'tmp/endo.sock'), +/** @param {Array} root */ +const makeLocator = (...root) => { + return { + statePath: path.join(dirname, ...root, 'state'), + ephemeralStatePath: path.join(dirname, ...root, 'run'), + cachePath: path.join(dirname, ...root, 'cache'), + sockPath: + process.platform === 'win32' + ? raw`\\?\pipe\endo-${root.join('-')}-test.sock` + : path.join(dirname, ...root, 'endo.sock'), + pets: new Map(), + values: new Map(), + }; }; -test.serial('lifecycle', async t => { +test('lifecycle', async t => { const { reject: cancel, promise: cancelled } = makePromiseKit(); + const locator = makeLocator('tmp', 'lifecycle'); + await stop(locator).catch(() => {}); await reset(locator); await clean(locator); await start(locator); @@ -50,11 +57,85 @@ test.serial('lifecycle', async t => { ); const bootstrap = getBootstrap(); const worker = await E(bootstrap).makeWorker(); - await E(E.get(worker).actions).terminate(); - cancel(Error('Cancelled')); - await closed; + await E(worker) + .terminate() + .catch(() => {}); + cancel(new Error('Cancelled')); + await closed.catch(() => {}); + + t.pass(); +}); + +test('spawn and evaluate', async t => { + const { promise: cancelled } = makePromiseKit(); + const locator = makeLocator('tmp', 'spawn-eval'); + + await stop(locator).catch(() => {}); + await reset(locator); + await start(locator); + + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + const bootstrap = getBootstrap(); + + const worker = E(bootstrap).makeWorker(); + const ten = await E(worker).evaluate('10', [], []); + t.is(10, ten); await stop(locator); +}); - t.pass(); +test('persist spawn and evaluation', async t => { + const { promise: cancelled } = makePromiseKit(); + const locator = makeLocator('tmp', 'persist-spawn-eval'); + + await stop(locator).catch(() => {}); + await reset(locator); + await start(locator); + + { + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + const bootstrap = getBootstrap(); + + const worker = E(bootstrap).makeWorker(); + const ten = await E(worker).evaluate('10', [], [], 'ten'); + t.is(10, ten); + const twenty = await E(worker).evaluate( + 'number * 2', + ['number'], + ['ten'], + 'twenty', + ); + + // TODO + // Erase the pet name for 'ten', demonstrating that the evaluation record + // does not retain its dependencies by their name. + // await E(worker).forget('ten'); + + t.is(20, twenty); + } + + await restart(locator); + + { + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + + const bootstrap = getBootstrap(); + + const retwenty = await E(bootstrap).provide('twenty'); + t.is(20, retwenty); + } + + await stop(locator); }); diff --git a/packages/daemon/types.d.ts b/packages/daemon/types.d.ts index fc672f9250..a2bb010d7e 100644 --- a/packages/daemon/types.d.ts +++ b/packages/daemon/types.d.ts @@ -1,18 +1,15 @@ -type Locator = { - statePath: string; - ephemeralStatePath: string; - cachePath: string; - sockPath: string; -}; +import type { FarRef } from '@endo/far'; +import type { Locator } from './src/types.js'; + +export type { Locator }; + +export { makeEndoClient } from './src/client.js'; +export { makeRefReader, makeRefIterator } from './src/ref-reader.js'; +export { makeReaderRef, makeIteratorRef } from './src/reader-ref.js'; + export async function start(locator?: Locator); export async function stop(locator?: Locator); export async function restart(locator?: Locator); export async function terminate(locator?: Locator); export async function clean(locator?: Locator); export async function reset(locator?: Locator); -export async function makeEndoClient( - name: string, - sockPath: string, - cancelled: Promise, - bootstrap?: TBootstrap, -); From 2ecf21a062cd48619890914905da3b98af970ab4 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Sat, 7 Jan 2023 16:34:16 -0800 Subject: [PATCH 13/23] test(daemon): Test store --- packages/daemon/test/test-endo.js | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/packages/daemon/test/test-endo.js b/packages/daemon/test/test-endo.js index 6c7c3db7a0..3bb6b5d818 100644 --- a/packages/daemon/test/test-endo.js +++ b/packages/daemon/test/test-endo.js @@ -18,6 +18,7 @@ import { clean, reset, makeEndoClient, + makeReaderRef, } from '../index.js'; const { raw } = String; @@ -139,3 +140,35 @@ test('persist spawn and evaluation', async t => { await stop(locator); }); + +test('store', async t => { + const { promise: cancelled } = makePromiseKit(); + const locator = makeLocator('tmp', 'store'); + + await stop(locator).catch(() => {}); + await reset(locator); + await start(locator); + + { + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + const bootstrap = getBootstrap(); + const readerRef = makeReaderRef([new TextEncoder().encode('hello\n')]); + await E(bootstrap).store(readerRef, 'helloText'); + } + + { + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + const bootstrap = getBootstrap(); + const readable = await E(bootstrap).provide('helloText'); + const actualText = await E(readable).text(); + t.is(actualText, 'hello\n'); + } +}); From da57e6ea9edfe223c3fb716eb344c6a4ef4d47bc Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Sat, 7 Jan 2023 16:34:48 -0800 Subject: [PATCH 14/23] test(daemon): Closure state lost on restart using eval --- packages/daemon/test/test-endo.js | 86 +++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/packages/daemon/test/test-endo.js b/packages/daemon/test/test-endo.js index 3bb6b5d818..5ea124dbad 100644 --- a/packages/daemon/test/test-endo.js +++ b/packages/daemon/test/test-endo.js @@ -172,3 +172,89 @@ test('store', async t => { t.is(actualText, 'hello\n'); } }); + +test('closure state lost by restart', async t => { + const { promise: cancelled } = makePromiseKit(); + const locator = makeLocator('tmp', 'restart-closures'); + + await stop(locator).catch(() => {}); + await reset(locator); + await start(locator); + + { + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + const bootstrap = getBootstrap(); + const worker = await E(bootstrap).makeWorker('w1'); + await E(worker).evaluate( + ` + Far('Counter Maker', { + makeCounter: (value = 0) => Far('Counter', { + incr: () => value += 1, + decr: () => value -= 1, + }), + }) + `, + [], + [], + 'counterMaker', + ); + await E(worker).evaluate( + `E(cm).makeCounter() `, + ['cm'], + ['counterMaker'], + 'counter', + ); + const one = await E(worker).evaluate( + `E(counter).incr()`, + ['counter'], + ['counter'], + ); + const two = await E(worker).evaluate( + `E(counter).incr()`, + ['counter'], + ['counter'], + ); + const three = await E(worker).evaluate( + `E(counter).incr()`, + ['counter'], + ['counter'], + ); + t.is(one, 1); + t.is(two, 2); + t.is(three, 3); + } + + await restart(locator); + + { + const { getBootstrap } = await makeEndoClient( + 'client', + locator.sockPath, + cancelled, + ); + const bootstrap = getBootstrap(); + const worker = await E(bootstrap).provide('w1'); + const one = await E(worker).evaluate( + `E(counter).incr()`, + ['counter'], + ['counter'], + ); + const two = await E(worker).evaluate( + `E(counter).incr()`, + ['counter'], + ['counter'], + ); + const three = await E(worker).evaluate( + `E(counter).incr()`, + ['counter'], + ['counter'], + ); + t.is(one, 1); + t.is(two, 2); + t.is(three, 3); + } +}); From 35d956b2c4cfc72b86e7fddbfb084ea8c2f7e41c Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 20 Dec 2022 14:14:42 -0800 Subject: [PATCH 15/23] feat(cli): Store archive as pet name --- packages/cli/src/endo.js | 42 +++++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index 8b0258c26d..d98ea7660c 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -21,6 +21,7 @@ import { clean, reset, makeEndoClient, + makeReaderRef, } from '@endo/daemon'; import { whereEndoState, @@ -172,16 +173,39 @@ export const main = async rawArgs => { }); program - .command('archive ') - .action(async (_cmd, [archivePath, applicationPath]) => { - const archiveLocation = url.pathToFileURL(archivePath); + .command('archive ') + .option('-n,--name ', 'Store the archive into Endo') + .option('-f,--file ', 'Store the archive into a file') + .action(async (applicationPath, cmd) => { + const archiveName = cmd.opts().name; + const archivePath = cmd.opts().file; const applicationLocation = url.pathToFileURL(applicationPath); - await writeArchive( - write, - readPowers, - archiveLocation, - applicationLocation, - ); + if (archiveName !== undefined) { + const archiveBytes = await makeArchive(readPowers, applicationLocation); + const readerRef = makeReaderRef([archiveBytes]); + const { getBootstrap } = await makeEndoClient( + 'cli', + sockPath, + cancelled, + ); + try { + const bootstrap = getBootstrap(); + await E(bootstrap).store(readerRef, archiveName); + } catch (error) { + console.error(error); + cancel(error); + } + } else if (archivePath !== undefined) { + const archiveLocation = url.pathToFileURL(archivePath); + await writeArchive( + write, + readPowers, + archiveLocation, + applicationLocation, + ); + } else { + throw new TypeError('Archive command requires either a name or a path'); + } }); // Throw an error instead of exiting directly. From 1506968950a823dc7a415174a7641b6991c8750a Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 20 Dec 2022 14:15:40 -0800 Subject: [PATCH 16/23] feat(cli): Store readable blob --- packages/cli/package.json | 1 + packages/cli/src/endo.js | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/packages/cli/package.json b/packages/cli/package.json index b0cbf0f9b9..5dc35fe600 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -32,6 +32,7 @@ "@endo/far": "^0.2.18", "@endo/lockdown": "^0.1.28", "@endo/promise-kit": "^0.2.56", + "@endo/stream-node": "^0.2.26", "@endo/where": "^0.3.1", "commander": "^5.0.0", "ses": "^0.18.4" diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index d98ea7660c..5db0c98ed1 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -33,12 +33,14 @@ import { mapLocation, hashLocation, loadArchive, + makeArchive, writeArchive, } from '@endo/compartment-mapper'; import { makeReadPowers, makeWritePowers, } from '@endo/compartment-mapper/node-powers.js'; +import { makeNodeReader } from '@endo/stream-node'; import { E } from '@endo/far'; const readPowers = makeReadPowers({ fs, url, crypto }); @@ -208,6 +210,28 @@ export const main = async rawArgs => { } }); + program + .command('store ') + .option( + '-n,--name ', + 'Assigns a pet name to the result for future reference', + ) + .action(async (storablePath, cmd) => { + const { name } = cmd.opts(); + const nodeReadStream = fs.createReadStream(storablePath); + const reader = makeNodeReader(nodeReadStream); + const readerRef = makeReaderRef(reader); + + const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + try { + const bootstrap = getBootstrap(); + await E(bootstrap).store(readerRef, name); + } catch (error) { + console.error(error); + cancel(error); + } + }); + // Throw an error instead of exiting directly. program.exitOverride(); From 74045b6cb2844fe6c92b759107b6593fdd214a7d Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 20 Dec 2022 14:16:34 -0800 Subject: [PATCH 17/23] feat(cli): Spawn worker --- packages/cli/src/endo.js | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index 5db0c98ed1..8da01cc0b0 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -232,6 +232,17 @@ export const main = async rawArgs => { } }); + program.command('spawn ').action(async name => { + const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + try { + const bootstrap = getBootstrap(); + await E(bootstrap).makeWorker(name); + } catch (error) { + console.error(error); + cancel(error); + } + }); + // Throw an error instead of exiting directly. program.exitOverride(); From 5b4b1ccea828d299426992dc4b68b02d2f38419f Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 20 Dec 2022 14:17:09 -0800 Subject: [PATCH 18/23] feat(cli): Show pet name --- packages/cli/src/endo.js | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index 8da01cc0b0..b68f4df17e 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -243,6 +243,18 @@ export const main = async rawArgs => { } }); + program.command('show ').action(async name => { + const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + try { + const bootstrap = getBootstrap(); + const pet = await E(bootstrap).provide(name); + console.log(pet); + } catch (error) { + console.error(error); + cancel(error); + } + }); + // Throw an error instead of exiting directly. program.exitOverride(); From 8af70826a66487341b8ae92a4bf5469da484f775 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 20 Dec 2022 14:17:48 -0800 Subject: [PATCH 19/23] feat(cli): Cat command --- packages/cli/src/endo.js | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index b68f4df17e..ef96ac3fb7 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -22,6 +22,7 @@ import { reset, makeEndoClient, makeReaderRef, + makeRefReader, } from '@endo/daemon'; import { whereEndoState, @@ -255,6 +256,22 @@ export const main = async rawArgs => { } }); + program.command('cat ').action(async name => { + const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + try { + const bootstrap = getBootstrap(); + const readable = await E(bootstrap).provide(name); + const readerRef = E(readable).stream(); + const reader = makeRefReader(readerRef); + for await (const chunk of reader) { + process.stdout.write(chunk); + } + } catch (error) { + console.error(error); + cancel(error); + } + }); + // Throw an error instead of exiting directly. program.exitOverride(); From 4c55ff5a88bc1dfa81eea6d27610890554bd7524 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 20 Dec 2022 14:18:16 -0800 Subject: [PATCH 20/23] feat(cli): Eval in worker --- packages/cli/src/endo.js | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index ef96ac3fb7..bbecd33770 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -272,6 +272,42 @@ export const main = async rawArgs => { } }); + program + .command('eval [names...]') + .option( + '-n,--name ', + 'Assigns a name to the result for future reference, persisted between restarts', + ) + .action(async (worker, source, names, cmd) => { + const { name: resultPetName } = cmd.opts(); + const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + try { + const bootstrap = getBootstrap(); + const workerRef = E(bootstrap).provide(worker); + + const pairs = names.map(name => { + const pair = name.split(':'); + if (pair.length === 1) { + return [name, name]; + } + return pair; + }); + const codeNames = pairs.map(pair => pair[0]); + const petNames = pairs.map(pair => pair[1]); + + const result = await E(workerRef).evaluate( + source, + codeNames, + petNames, + resultPetName, + ); + console.log(result); + } catch (error) { + console.error(error); + cancel(error); + } + }); + // Throw an error instead of exiting directly. program.exitOverride(); From 017cee3679343563980e9e34e8c7daf44aecc454 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 27 Dec 2022 23:04:51 -0800 Subject: [PATCH 21/23] feat(cli): Start daemon on demand --- packages/cli/src/endo.js | 48 +++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index bbecd33770..7d53842ef2 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -70,6 +70,22 @@ const sockPath = whereEndoSock(process.platform, process.env, info); const cachePath = whereEndoCache(process.platform, process.env, info); const logPath = path.join(statePath, 'endo.log'); +const provideEndoClient = async (...args) => { + try { + // It is okay to fail to connect because the daemon is not running. + return await makeEndoClient(...args); + } catch { + console.error('Starting Endo daemon...'); + // It is also okay to fail the race to start. + await start().catch(() => {}); + // But not okay to fail to connect after starting. + // We are not going to contemplate reliably in the face of a worker getting + // stopped the moment after it was started. + // That is a bridge too far. + return makeEndoClient(...args); + } +}; + export const main = async rawArgs => { const { promise: cancelled, reject: cancel } = makePromiseKit(); cancelled.catch(() => {}); @@ -186,7 +202,7 @@ export const main = async rawArgs => { if (archiveName !== undefined) { const archiveBytes = await makeArchive(readPowers, applicationLocation); const readerRef = makeReaderRef([archiveBytes]); - const { getBootstrap } = await makeEndoClient( + const { getBootstrap } = await provideEndoClient( 'cli', sockPath, cancelled, @@ -223,7 +239,11 @@ export const main = async rawArgs => { const reader = makeNodeReader(nodeReadStream); const readerRef = makeReaderRef(reader); - const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + const { getBootstrap } = await provideEndoClient( + 'cli', + sockPath, + cancelled, + ); try { const bootstrap = getBootstrap(); await E(bootstrap).store(readerRef, name); @@ -234,7 +254,11 @@ export const main = async rawArgs => { }); program.command('spawn ').action(async name => { - const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + const { getBootstrap } = await provideEndoClient( + 'cli', + sockPath, + cancelled, + ); try { const bootstrap = getBootstrap(); await E(bootstrap).makeWorker(name); @@ -245,7 +269,11 @@ export const main = async rawArgs => { }); program.command('show ').action(async name => { - const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + const { getBootstrap } = await provideEndoClient( + 'cli', + sockPath, + cancelled, + ); try { const bootstrap = getBootstrap(); const pet = await E(bootstrap).provide(name); @@ -257,7 +285,11 @@ export const main = async rawArgs => { }); program.command('cat ').action(async name => { - const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + const { getBootstrap } = await provideEndoClient( + 'cli', + sockPath, + cancelled, + ); try { const bootstrap = getBootstrap(); const readable = await E(bootstrap).provide(name); @@ -280,7 +312,11 @@ export const main = async rawArgs => { ) .action(async (worker, source, names, cmd) => { const { name: resultPetName } = cmd.opts(); - const { getBootstrap } = await makeEndoClient('cli', sockPath, cancelled); + const { getBootstrap } = await provideEndoClient( + 'cli', + sockPath, + cancelled, + ); try { const bootstrap = getBootstrap(); const workerRef = E(bootstrap).provide(worker); From e9ecc9b381d86d08ffa42e34540727c078818426 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Fri, 23 Dec 2022 14:48:15 -0800 Subject: [PATCH 22/23] feat(cli): Log follow watches for reset --- packages/cli/src/endo.js | 51 ++++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index 7d53842ef2..9d48b89155 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -1,4 +1,5 @@ -/* global process */ +/* global process, setTimeout, clearTimeout */ +/* eslint-disable no-await-in-loop */ // Establish a perimeter: import 'ses'; @@ -146,17 +147,47 @@ export const main = async rawArgs => { program .command('log') .option('-f, --follow', 'follow the tail of the log') + .option('-p,--ping ', 'milliseconds between daemon reset checks') .action(async cmd => { - // TODO rerun follower command after reset - await new Promise((resolve, reject) => { - const args = cmd.opts().follow ? ['-f'] : []; - const child = spawn('tail', [...args, logPath], { - stdio: ['inherit', 'inherit', 'inherit'], + const follow = cmd.opts().follow; + const ping = cmd.opts().ping; + const logCheckIntervalMs = ping !== undefined ? Number(ping) : 5_000; + + do { + // Scope cancellation and propagate. + const { promise: followCancelled, reject: cancelFollower } = + makePromiseKit(); + cancelled.catch(cancelFollower); + + (async () => { + const { getBootstrap } = await makeEndoClient( + 'log-follower-probe', + sockPath, + followCancelled, + ); + const bootstrap = await getBootstrap(); + for (;;) { + await delay(logCheckIntervalMs, followCancelled); + await E(bootstrap).ping(); + } + })().catch(cancelFollower); + + await new Promise((resolve, reject) => { + const args = follow ? ['-f'] : []; + const child = spawn('tail', [...args, logPath], { + stdio: ['inherit', 'inherit', 'inherit'], + }); + child.on('error', reject); + child.on('exit', resolve); + followCancelled.catch(() => { + child.kill(); + }); }); - child.on('error', reject); - child.on('exit', resolve); - cancelled.catch(() => child.kill()); - }); + + if (follow) { + await delay(logCheckIntervalMs, cancelled); + } + } while (follow); }); program.command('ping').action(async _cmd => { From ed3c23ebe405fbe27451bbb426a82a1ffbfb148b Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 27 Dec 2022 17:16:48 -0800 Subject: [PATCH 23/23] feat(cli): Follow command --- packages/cli/src/endo.js | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/packages/cli/src/endo.js b/packages/cli/src/endo.js index 9d48b89155..e0da156dcc 100644 --- a/packages/cli/src/endo.js +++ b/packages/cli/src/endo.js @@ -23,6 +23,7 @@ import { reset, makeEndoClient, makeReaderRef, + makeRefIterator, makeRefReader, } from '@endo/daemon'; import { @@ -71,6 +72,18 @@ const sockPath = whereEndoSock(process.platform, process.env, info); const cachePath = whereEndoCache(process.platform, process.env, info); const logPath = path.join(statePath, 'endo.log'); +const delay = async (ms, cancelled) => { + // Do not attempt to set up a timer if already cancelled. + await Promise.race([cancelled, undefined]); + return new Promise((resolve, reject) => { + const handle = setTimeout(resolve, ms); + cancelled.catch(error => { + reject(error); + clearTimeout(handle); + }); + }); +}; + const provideEndoClient = async (...args) => { try { // It is okay to fail to connect because the daemon is not running. @@ -315,6 +328,25 @@ export const main = async rawArgs => { } }); + program.command('follow ').action(async name => { + const { getBootstrap } = await provideEndoClient( + 'cli', + sockPath, + cancelled, + ); + try { + const bootstrap = getBootstrap(); + const iterable = await E(bootstrap).provide(name); + const iterator = await E(iterable)[Symbol.asyncIterator](); + for await (const iterand of makeRefIterator(iterator)) { + console.log(iterand); + } + } catch (error) { + console.error(error); + cancel(error); + } + }); + program.command('cat ').action(async name => { const { getBootstrap } = await provideEndoClient( 'cli',