fix: various nits with process lifetime and streaming

This commit is contained in:
cha0s 2024-02-21 06:57:54 -06:00
parent 8fed3a3065
commit e0ed998601
16 changed files with 1214 additions and 832 deletions

1855
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,6 @@
"packages/*" "packages/*"
], ],
"devDependencies": { "devDependencies": {
"@npmcli/arborist": "^7.3.1", "@npmcli/arborist": "^7.3.1"
"chalk": "^4.1.2"
} }
} }

View File

@ -66,6 +66,7 @@ module.exports = async (flecks) => ({
'import/prefer-default-export': 'off', 'import/prefer-default-export': 'off',
'jsx-a11y/control-has-associated-label': ['error', {assert: 'either'}], 'jsx-a11y/control-has-associated-label': ['error', {assert: 'either'}],
'jsx-a11y/label-has-associated-control': ['error', {assert: 'either'}], 'jsx-a11y/label-has-associated-control': ['error', {assert: 'either'}],
'max-classes-per-file': ['error', {ignoreExpressions: true}],
'no-param-reassign': ['error', {props: false}], 'no-param-reassign': ['error', {props: false}],
'no-plusplus': 'off', 'no-plusplus': 'off',
'no-shadow': 'off', 'no-shadow': 'off',

View File

@ -20,6 +20,7 @@ const {
} = require('@babel/types'); } = require('@babel/types');
const addPathsToYml = require('@flecks/core/build/add-paths-to-yml'); const addPathsToYml = require('@flecks/core/build/add-paths-to-yml');
const D = require('@flecks/core/build/debug'); const D = require('@flecks/core/build/debug');
const {prefixLines} = require('@flecks/core/build/stream');
const { const {
add, add,
binaryPath, binaryPath,
@ -28,6 +29,7 @@ const {
spawnWith, spawnWith,
writeFile, writeFile,
} = require('@flecks/core/src/server'); } = require('@flecks/core/src/server');
const chalk = require('chalk');
const chokidar = require('chokidar'); const chokidar = require('chokidar');
const {glob} = require('glob'); const {glob} = require('glob');
const {paperwork} = require('precinct'); const {paperwork} = require('precinct');
@ -224,24 +226,43 @@ exports.hook = (program, flecks) => {
'--mode', (production && !hot) ? 'production' : 'development', '--mode', (production && !hot) ? 'production' : 'development',
]; ];
const options = { const options = {
// @todo This kills the pnpm. Let's use a real IPC channel. stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
useFork: true,
...rest, ...rest,
env: { env: {
DEBUG_COLORS: process.stdout.isTTY,
FLECKS_BUILD_IS_PRODUCTION: production, FLECKS_BUILD_IS_PRODUCTION: production,
FORCE_COLOR: process.stdout.isTTY,
...(target ? {FLECKS_CORE_BUILD_LIST: target} : {}), ...(target ? {FLECKS_CORE_BUILD_LIST: target} : {}),
...(hot ? {FLECKS_ENV__flecks_server__hot: 'true'} : {}), ...(hot ? {FLECKS_ENV__flecks_server__hot: 'true'} : {}),
...rest.env, ...rest.env,
}, },
}; };
const spawnWithPrefixedLines = (cmd, options) => {
const child = spawnWith(cmd, options);
if (
'pipe' === options.stdio
|| (Array.isArray(options.stdio) && options.stdio[0] === 'pipe')
) {
prefixLines(child.stdout, chalk.green('[📦] '))
.pipe(process.stdout);
}
if (
'pipe' === options.stdio
|| (Array.isArray(options.stdio) && options.stdio[1] === 'pipe')
) {
prefixLines(child.stderr, chalk.green('[📦] '))
.pipe(process.stderr);
}
return child;
};
if (!watch) { if (!watch) {
return spawnWith(cmd, options); return spawnWithPrefixedLines(cmd, options);
} }
try { try {
await access(join(FLECKS_CORE_ROOT, 'build/flecks.yml')); await access(join(FLECKS_CORE_ROOT, 'build/flecks.yml'));
} }
catch (error) { catch (error) {
return spawnWith(cmd, options); return spawnWithPrefixedLines(cmd, options);
} }
await rootsDependencies(flecks.roots, flecks.resolver); await rootsDependencies(flecks.roots, flecks.resolver);
const watched = Object.keys(dependencies); const watched = Object.keys(dependencies);
@ -259,7 +280,7 @@ exports.hook = (program, flecks) => {
}); });
let webpack; let webpack;
const spawnWebpack = () => { const spawnWebpack = () => {
webpack = spawnWith(cmd, options); webpack = spawnWithPrefixedLines(cmd, options);
webpack.on('message', (message) => { webpack.on('message', (message) => {
switch (message.type) { switch (message.type) {
case 'kill': case 'kill':

View File

@ -37,6 +37,7 @@
"babel-merge": "^3.0.0", "babel-merge": "^3.0.0",
"chai": "4.2.0", "chai": "4.2.0",
"chai-as-promised": "7.1.1", "chai-as-promised": "7.1.1",
"chalk": "^4.1.2",
"chokidar": "^3.5.3", "chokidar": "^3.5.3",
"commander": "11.1.0", "commander": "11.1.0",
"copy-webpack-plugin": "^11.0.0", "copy-webpack-plugin": "^11.0.0",

View File

@ -21,7 +21,7 @@ module.exports = (name) => {
D.formatters.o = undefined; D.formatters.o = undefined;
D.formatters.O = undefined; D.formatters.O = undefined;
} }
const type = 'web' === process.env.FLECKS_CORE_BUILD_TARGET ? 'debug' : 'error'; const type = 'undefined' !== typeof window ? 'log' : 'error';
// eslint-disable-next-line no-console // eslint-disable-next-line no-console
D.log = console[type].bind(console); D.log = console[type].bind(console);
} }

View File

@ -1,8 +1,12 @@
// eslint-disable-next-line max-classes-per-file, import/no-extraneous-dependencies // eslint-disable-next-line max-classes-per-file, import/no-extraneous-dependencies
const {Buffer} = require('buffer'); const {Buffer} = require('buffer');
const {EOL} = require('os');
const {Transform, Writable} = require('stream');
const {dump: dumpYml, load: loadYml} = require('js-yaml'); const {dump: dumpYml, load: loadYml} = require('js-yaml');
const JsonParse = require('jsonparse'); const JsonParse = require('jsonparse');
const {Transform, Writable} = require('stream');
const linebreak = /\r?\n/;
exports.JsonStream = class JsonStream extends Transform { exports.JsonStream = class JsonStream extends Transform {
@ -46,17 +50,43 @@ exports.JsonStream.PrettyPrint = class extends exports.JsonStream {
}; };
exports.pipesink = (stream) => { exports.LineStream = class LineStream extends Transform {
constructor(encoding = 'utf8') {
super();
this.encoding = encoding;
this.buffer = '';
}
// eslint-disable-next-line no-underscore-dangle
_transform(chunk, encoding, done) {
const string = chunk.toString(this.encoding);
if (!string.match(linebreak)) {
this.buffer += string;
done();
return;
}
const parts = (this.buffer + string).split(linebreak);
this.buffer = parts.pop();
for (let i = 0; i < parts.length; ++i) {
this.push(parts[i]);
}
done();
}
};
exports.pipesink = (stream, {concat = Buffer.concat} = {}) => {
class Sink extends Writable { class Sink extends Writable {
constructor() { constructor() {
super(); super();
this.buffers = []; this.chunks = [];
} }
// eslint-disable-next-line no-underscore-dangle // eslint-disable-next-line no-underscore-dangle
_write(chunk, encoding, done) { _write(chunk, encoding, done) {
this.buffers.push(chunk); this.chunks.push(chunk);
done(); done();
} }
@ -66,11 +96,27 @@ exports.pipesink = (stream) => {
stream.pipe(sink); stream.pipe(sink);
stream.on('error', reject); stream.on('error', reject);
stream.on('end', () => { stream.on('end', () => {
resolve(Buffer.concat(sink.buffers)); resolve(concat(sink.chunks));
}); });
}); });
}; };
exports.prefixLines = (stream, prefix) => (
stream
.pipe(new exports.LineStream())
.pipe(new class Stdio extends Transform {
// eslint-disable-next-line no-underscore-dangle, class-methods-use-this
_transform(chunk, encoding, done) {
this.push(prefix);
this.push(chunk);
this.push(EOL);
done();
}
}())
);
exports.YamlStream = class YamlStream extends Transform { exports.YamlStream = class YamlStream extends Transform {
constructor(decorator, options = {dump: {}, load: {}}) { constructor(decorator, options = {dump: {}, load: {}}) {

View File

@ -1,3 +1,4 @@
exports.hook = async (req, flecks) => ({ exports.hook = async (req, flecks) => ({
id: flecks.get('@flecks/core.id'), id: flecks.get('@flecks/core.id'),
hi: 'foo',
}); });

View File

@ -1,4 +1,4 @@
const {exec, fork, spawn} = require('child_process'); const {exec, spawn} = require('child_process');
const { const {
access, access,
constants: {X_OK}, constants: {X_OK},
@ -60,15 +60,14 @@ exports.run = (cmd, {suppressError = true} = {}) => (
const children = []; const children = [];
exports.spawnWith = (cmd, opts = {}) => { exports.spawnWith = (cmd, opts = {}) => {
const {useFork, ...rest} = opts; debug("spawning: '%s'", cmd.join(' '));
debug("%sing: '%s'", useFork ? 'fork' : 'spawn', cmd.join(' ')); debugSilly('with options: %O', opts);
debugSilly('with options: %O', rest); const child = spawn(cmd[0], cmd.slice(1), {
const child = (useFork ? fork : spawn)(cmd[0], cmd.slice(1), {
stdio: 'inherit', stdio: 'inherit',
...rest, ...opts,
env: { env: {
...process.env, ...process.env,
...rest.env, ...opts.env,
}, },
}); });
children.push(child); children.push(child);

View File

@ -13,7 +13,6 @@ const debug = D('@flecks/build.commands');
const { const {
FLECKS_CORE_ROOT = process.cwd(), FLECKS_CORE_ROOT = process.cwd(),
TERM,
} = process.env; } = process.env;
module.exports = (program, flecks) => { module.exports = (program, flecks) => {
@ -49,7 +48,7 @@ module.exports = (program, flecks) => {
const filename = await flecks.resolveBuildConfig('test.webpack.config.js', '@flecks/build'); const filename = await flecks.resolveBuildConfig('test.webpack.config.js', '@flecks/build');
const config = {test: await require(filename)(env, argv, flecks)}; const config = {test: await require(filename)(env, argv, flecks)};
await flecks.configureBuilds(config, env, argv); await flecks.configureBuilds(config, env, argv);
if (!config.test.entry) { if (0 === Object.entries(config.test.entry).length) {
return undefined; return undefined;
} }
// Remove the previous test(s). // Remove the previous test(s).
@ -59,8 +58,8 @@ module.exports = (program, flecks) => {
'test', 'test',
{ {
env: { env: {
DEBUG_COLORS: 'dumb' !== TERM, DEBUG_COLORS: process.stdout.isTTY,
FORCE_COLOR: 'dumb' !== TERM, FORCE_COLOR: process.stdout.isTTY,
}, },
production, production,
stdio: watch ? 'inherit' : 'pipe', stdio: watch ? 'inherit' : 'pipe',

View File

@ -1,6 +1,9 @@
const cluster = require('cluster'); const cluster = require('cluster');
const {join} = require('path'); const {join} = require('path');
const {prefixLines} = require('@flecks/core/build/stream');
const chalk = require('chalk');
class StartServerPlugin { class StartServerPlugin {
pluginName = 'StartServerPlugin'; pluginName = 'StartServerPlugin';
@ -27,14 +30,19 @@ class StartServerPlugin {
apply(compiler) { apply(compiler) {
const {options: {exec, signal}, pluginName} = this; const {options: {exec, signal}, pluginName} = this;
const logger = compiler.getInfrastructureLogger(pluginName); const logger = compiler.getInfrastructureLogger(pluginName);
let lastStartHadErrors = false;
compiler.hooks.afterEmit.tapPromise(pluginName, async (compilation) => { compiler.hooks.afterEmit.tapPromise(pluginName, async (compilation) => {
if (compilation.errors.length > 0) {
lastStartHadErrors = true;
return;
}
if (this.worker && this.worker.isConnected()) { if (this.worker && this.worker.isConnected()) {
if (signal) { if (signal && !lastStartHadErrors) {
process.kill( process.kill(
this.worker.process.pid, this.worker.process.pid,
true === signal ? 'SIGUSR2' : signal, true === signal ? 'SIGUSR2' : signal,
); );
return undefined; return;
} }
const promise = new Promise((resolve) => { const promise = new Promise((resolve) => {
this.worker.on('disconnect', resolve); this.worker.on('disconnect', resolve);
@ -42,6 +50,7 @@ class StartServerPlugin {
this.worker.disconnect(); this.worker.disconnect();
await promise; await promise;
} }
lastStartHadErrors = false;
let entryPoint; let entryPoint;
if (!exec) { if (!exec) {
entryPoint = compilation.getPath(Object.keys(compilation.assets)[0]); entryPoint = compilation.getPath(Object.keys(compilation.assets)[0]);
@ -55,7 +64,7 @@ class StartServerPlugin {
else { else {
entryPoint = exec(compilation); entryPoint = exec(compilation);
} }
return this.startServer(join(compiler.options.output.path, entryPoint)); await this.startServer(join(compiler.options.output.path, entryPoint));
}); });
compiler.hooks.shouldEmit.tap(pluginName, (compilation) => { compiler.hooks.shouldEmit.tap(pluginName, (compilation) => {
const entryPoints = Object.keys(compilation.assets); const entryPoints = Object.keys(compilation.assets);
@ -92,9 +101,16 @@ class StartServerPlugin {
exec, exec,
execArgv, execArgv,
args, args,
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
...(inspectPort && {inspectPort}), ...(inspectPort && {inspectPort}),
}); });
this.worker = cluster.fork(env); this.worker = cluster.fork({
...env,
});
['stdout', 'stderr'].forEach((stream) => {
prefixLines(this.worker.process[stream], chalk.blue('[SRV] '))
.pipe(process[stream]);
});
this.worker.on('exit', (code) => { this.worker.on('exit', (code) => {
if (killOnExit && !this.worker.exitedAfterDisconnect) { if (killOnExit && !this.worker.exitedAfterDisconnect) {
process.send({type: 'kill', payload: code}); process.send({type: 'kill', payload: code});

View File

@ -23,7 +23,8 @@
"server.js" "server.js"
], ],
"dependencies": { "dependencies": {
"@flecks/core": "^4.2.3" "@flecks/core": "^4.2.3",
"chalk": "^4.1.2"
}, },
"devDependencies": { "devDependencies": {
"@flecks/build": "^4.1.3", "@flecks/build": "^4.1.3",

View File

@ -15,7 +15,6 @@ import {createApplication} from './create-application';
const { const {
FLECKS_CORE_ROOT = process.cwd(), FLECKS_CORE_ROOT = process.cwd(),
TERM,
} = process.env; } = process.env;
class SocketWrapper { class SocketWrapper {
@ -135,12 +134,10 @@ export async function startServer({
stdio: 'pipe', stdio: 'pipe',
...opts, ...opts,
env: { env: {
DEBUG_COLORS: 'dumb' !== TERM,
FLECKS_ENV__flecks_server__stats: '{"preset": "none"}', FLECKS_ENV__flecks_server__stats: '{"preset": "none"}',
FLECKS_ENV__flecks_server__start: true, FLECKS_ENV__flecks_server__start: true,
FLECKS_CORE_ROOT: path, FLECKS_CORE_ROOT: path,
FLECKS_SERVER_TEST_SOCKET: socketPath, FLECKS_SERVER_TEST_SOCKET: socketPath,
FORCE_COLOR: 'dumb' !== TERM,
NODE_ENV: 'test', NODE_ENV: 'test',
NODE_PATH: join(FLECKS_CORE_ROOT, '..', '..', 'node_modules'), NODE_PATH: join(FLECKS_CORE_ROOT, '..', '..', 'node_modules'),
...opts.env, ...opts.env,

View File

@ -15,6 +15,10 @@ exports.hook = () => ({
* (webpack-dev-server) Port to bind. Defaults to random port. * (webpack-dev-server) Port to bind. Defaults to random port.
*/ */
devPort: 0, devPort: 0,
/**
* (webpack-dev-server) Set up a proxy to the dev server. Defaults to `false` in production.
*/
devProxyWds: undefined,
/** /**
* (webpack-dev-server) Webpack stats output. * (webpack-dev-server) Webpack stats output.
*/ */

View File

@ -38,6 +38,7 @@
"before-build-webpack": "^0.2.13", "before-build-webpack": "^0.2.13",
"browserify-zlib": "^0.2.0", "browserify-zlib": "^0.2.0",
"buffer": "^6.0.3", "buffer": "^6.0.3",
"chalk": "^4.1.2",
"clean-webpack-plugin": "4.0.0", "clean-webpack-plugin": "4.0.0",
"compression": "^1.7.4", "compression": "^1.7.4",
"css-loader": "^6.8.1", "css-loader": "^6.8.1",

View File

@ -4,7 +4,8 @@ import {join} from 'path';
import {PassThrough, Transform} from 'stream'; import {PassThrough, Transform} from 'stream';
import {D} from '@flecks/core'; import {D} from '@flecks/core';
import {binaryPath, spawnWith} from '@flecks/core/server'; import {binaryPath, prefixLines, spawnWith} from '@flecks/core/server';
import chalk from 'chalk';
import compression from 'compression'; import compression from 'compression';
import express from 'express'; import express from 'express';
import httpProxy from 'http-proxy'; import httpProxy from 'http-proxy';
@ -15,7 +16,6 @@ const {
FLECKS_CORE_ROOT = process.cwd(), FLECKS_CORE_ROOT = process.cwd(),
FLECKS_WEB_DEV_SERVER, FLECKS_WEB_DEV_SERVER,
NODE_ENV, NODE_ENV,
TERM,
} = process.env; } = process.env;
const debug = D('@flecks/web/server/http'); const debug = D('@flecks/web/server/http');
@ -47,6 +47,7 @@ const deliverHtmlStream = async (stream, req, res, flecks) => {
export const createHttpServer = async (flecks) => { export const createHttpServer = async (flecks) => {
const { const {
devProxyWds = 'production' !== NODE_ENV,
public: publicConfig, public: publicConfig,
port, port,
trust, trust,
@ -56,6 +57,22 @@ export const createHttpServer = async (flecks) => {
app.set('trust proxy', trust); app.set('trust proxy', trust);
const httpServer = createServer(app); const httpServer = createServer(app);
httpServer.app = app; httpServer.app = app;
// Hold requests until the server is up.
let markAsUp;
let waitingOnUp = new Promise((resolve) => {
markAsUp = () => {
waitingOnUp = undefined;
resolve();
};
});
app.use(async (req, res, next) => {
if (!waitingOnUp) {
next();
return;
}
await waitingOnUp;
next();
});
// Body parser. // Body parser.
app.use(express.urlencoded({extended: true})); app.use(express.urlencoded({extended: true}));
app.use(express.json()); app.use(express.json());
@ -89,18 +106,18 @@ export const createHttpServer = async (flecks) => {
const actualPort = 0 === port ? httpServer.address().port : port; const actualPort = 0 === port ? httpServer.address().port : port;
debug( debug(
'HTTP server up @ %s!', 'HTTP server up @ %s!',
new URL(`http://${[host, actualPort].filter((e) => !!e).join(':')}`), chalk.cyan(new URL(`http://${[host, actualPort].filter((e) => !!e).join(':')}`)),
); );
if ('undefined' === typeof publicConfig) { if ('undefined' === typeof publicConfig) {
flecks.web.public = [host, actualPort].join(':'); flecks.web.public = [host, actualPort].join(':');
} }
resolve(); resolve();
}); });
debug('httpServer.listen(...%O)', args.slice(0, -1)); debug('httpServer.listen(%s)', args.slice(0, -1).join(', '));
httpServer.listen(...args); httpServer.listen(...args);
}); });
// In development mode, create a proxy to the webpack-dev-server. // Create a proxy to the webpack-dev-server.
if ('production' !== NODE_ENV) { if (devProxyWds) {
const { const {
devHost, devHost,
devPort, devPort,
@ -131,9 +148,7 @@ export const createHttpServer = async (flecks) => {
cmd, cmd,
{ {
env: { env: {
DEBUG_COLORS: 'dumb' !== TERM,
FLECKS_CORE_BUILD_LIST: 'web', FLECKS_CORE_BUILD_LIST: 'web',
FORCE_COLOR: 'dumb' !== TERM,
}, },
stdio: 0 === wdsPort ? 'pipe' : 'inherit', stdio: 0 === wdsPort ? 'pipe' : 'inherit',
}, },
@ -172,13 +187,16 @@ export const createHttpServer = async (flecks) => {
} }
const parsePort = new ParsePort(); const parsePort = new ParsePort();
const stderr = new PassThrough(); const stderr = new PassThrough();
wds.stderr.pipe(parsePort).pipe(stderr); prefixLines(wds.stderr.pipe(parsePort), chalk.yellow('[WDS] '))
.pipe(stderr);
stderr.pipe(process.stderr); stderr.pipe(process.stderr);
wds.stdout.pipe(process.stdout); prefixLines(wds.stdout, chalk.yellow('[WDS] '))
.pipe(process.stdout);
wdsPort = await parsePort.port; wdsPort = await parsePort.port;
parsePort.unpipe(stderr); parsePort.unpipe(stderr);
stderr.unpipe(process.stderr); stderr.unpipe(process.stderr);
wds.stderr.pipe(process.stderr); prefixLines(wds.stderr, chalk.yellow('[WDS] '))
.pipe(process.stderr);
} }
} }
const proxy = httpProxy.createProxyServer({ const proxy = httpProxy.createProxyServer({
@ -263,6 +281,7 @@ export const createHttpServer = async (flecks) => {
}); });
} }
flecks.web.server = httpServer; flecks.web.server = httpServer;
markAsUp();
}; };
export const destroyHttpServer = (httpServer) => { export const destroyHttpServer = (httpServer) => {