431 lines
9.4 KiB
JavaScript
431 lines
9.4 KiB
JavaScript
|
'use strict';
|
||
|
|
||
|
Object.defineProperty(exports, '__esModule', { value: true });
|
||
|
|
||
|
/**
 * Unwrap a module's `default` export when there is one.
 *
 * @param {*} ex - The value returned by `require`.
 * @returns {*} `ex.default` when `ex` is a non-null object that has a
 *   `default` property (own or inherited); otherwise `ex` itself.
 */
function _interopDefault(ex) {
  const isObject = Boolean(ex) && typeof ex === 'object';
  if (isObject && 'default' in ex) {
    return ex.default;
  }
  return ex;
}
|
||
|
|
||
|
var events = require('events');
|
||
|
var net = require('net');
|
||
|
var D = _interopDefault(require('debug'));
|
||
|
|
||
|
// Action types that must never be logged, read once from the comma-separated
// DEBUG_ACTION_IGNORE environment variable.
const debugActionIgnore = (process.env.DEBUG_ACTION_IGNORE || '').split(',');

/**
 * Build a loggable copy of an action.
 *
 * @param {{enabled: boolean}} debug - Logger whose `enabled` flag decides
 *   whether anything is produced at all.
 * @param {{type: string, payload: Object}} action - The action about to be
 *   logged. It is never modified; the payload is shallow-copied.
 * @returns {{type: string, payload: Object}|undefined} `undefined` when the
 *   logger is disabled or the action type is listed in DEBUG_ACTION_IGNORE.
 *   Otherwise a copy whose payload values are replaced by `'[...]'`, except
 *   for the `hook` entry. When the SILLY_DEBUG environment variable is set
 *   the payload is left untouched.
 */
function createDebugAction(debug, action) {
  if (!debug.enabled || debugActionIgnore.includes(action.type)) {
    return undefined;
  }
  const debugAction = {type: action.type, payload: {...action.payload}};
  if (process.env.SILLY_DEBUG) {
    return debugAction;
  }
  for (const key of Object.keys(debugAction.payload)) {
    // Only the `hook` name is left readable. NOTE(review): the previous guard
    // also compared -1 against an array literal of invoke action types, which
    // is always true; that no-op is dropped and the effective behavior kept.
    // Confirm whether the exemption was meant for invoke actions only.
    if (key !== 'hook') {
      debugAction.payload[key] = '[...]';
    }
  }
  return debugAction;
}
|
||
|
|
||
|
/**
 * Serialize an action for the wire.
 *
 * @param {*} action - Value to serialize.
 * @returns {string|undefined} The JSON text produced by `JSON.stringify`.
 */
function pack(action) {
  const serial = JSON.stringify(action);
  return serial;
}
|
||
|
/**
 * Parse serialized JSON text back into a value.
 *
 * @param {string} serial - JSON text.
 * @returns {*} The parsed value.
 * @throws {SyntaxError} When `serial` is not valid JSON.
 */
function unpack(serial) {
  const value = JSON.parse(serial);
  return value;
}
|
||
|
|
||
|
// Counter handing every IpcClient created in this process its own id.
let clientId = 0;

/**
 * Client side of the process-channel transport.
 *
 * Construction sends a `connect` request through `process.send` and listens
 * for `process` 'message' events addressed to this client's id:
 *
 * - `connection_to`    stores `connection` and `to`, then emits 'ready'.
 * - `connection_error` stops listening, then emits 'error' with an Error
 *                      whose `code` is 'ENOTFOUND'.
 * - `client_recv`      stops listening, then emits 'data' followed by 'end'.
 *
 * All events are emitted from a zero-delay timer, i.e. never synchronously
 * from inside the message listener.
 */
class IpcClient extends events.EventEmitter {

  /**
   * @param {*} service - Forwarded as `payload.service` of the `connect`
   *   request.
   */
  constructor(service) {
    super();

    this.id = clientId;

    process.send({
      type: 'connect',
      payload: {
        clientId: this.id,
        service,
      },
    });

    const messageListener = (message) => {
      // Skip anything that is not an envelope addressed to this client,
      // including null or payload-less messages.
      if (!message || !message.payload || message.payload.clientId !== this.id) {
        return;
      }
      const {type, payload} = message;

      switch (type) {

        case 'connection_to':
          this.connection = payload.connection;
          this.to = payload.to;
          setTimeout(() => {
            this.emit('ready');
          }, 0);
          break;

        case 'connection_error':
          process.off('message', messageListener);
          setTimeout(() => {
            // A real Error (with the same `code`) rather than a bare object.
            this.emit('error', Object.assign(
              new Error(`IPC connection to service '${service}' failed`),
              {code: 'ENOTFOUND'},
            ));
          }, 0);
          break;

        case 'client_recv':
          // A single reply completes the exchange.
          process.off('message', messageListener);
          setTimeout(() => {
            this.emit('data', payload.data);
            this.emit('end');
          }, 0);
          break;

      }
    };

    process.on('message', messageListener);

    clientId++;
  }

  /** No-op, present so the client can stand in for a socket. */
  end() {}

  /** No-op, present so the client can stand in for a socket. */
  setEncoding() {}

  /**
   * Send `data` to the peer recorded by `connection_to`.
   *
   * @param {*} data - Forwarded as `payload.data` of a `client_send` message.
   */
  write(data) {
    process.send({
      type: 'client_send',
      payload: {
        clientId: this.id,
        data,
        to: this.to,
      },
    });
  }
}
|
||
|
|
||
|
/**
 * Open a process-channel client.
 *
 * @param {*} address - Passed to the IpcClient constructor as the service.
 * @returns {IpcClient} The newly created client.
 */
function client(address) {
  const ipcClient = new IpcClient(address);
  return ipcClient;
}
|
||
|
|
||
|
/**
 * Server-side handle for one process-channel client.
 *
 * It is a genuine EventEmitter instance, so the whole emitter API (`once`,
 * `off`, `removeListener`, ...) is available and `instanceof` checks hold,
 * rather than a bare object exposing only `on` and `emit`.
 */
class IpcSocket extends events.EventEmitter {

  /**
   * @param {*} clientId - Id of the client this socket answers to.
   * @param {*} to - Routing value sent back as `payload.to` on every write.
   */
  constructor(clientId, to) {
    super();

    this.clientId = clientId;
    this.to = to;
  }

  /** Emit 'end' on this socket; nothing is sent over the channel. */
  end() {
    this.emit('end');
  }

  /** No-op, present so the handle can stand in for a socket. */
  setEncoding() {}

  /**
   * Send `data` back towards the client as a `server_send` message.
   *
   * @param {*} data - Forwarded as `payload.data`.
   */
  write(data) {
    process.send({
      type: 'server_send',
      payload: {
        clientId: this.clientId,
        data,
        to: this.to,
      },
    });
  }
}
|
||
|
|
||
|
/**
 * Server side of the process-channel transport.
 *
 * It is a genuine EventEmitter instance (full emitter API, working
 * `instanceof`) rather than a bare object exposing only `on` and `emit`.
 */
class IpcServer extends events.EventEmitter {

  constructor() {
    super();

    // Live sockets keyed by the stringified client id, which matches how
    // plain-object property keys would compare ids.
    this.connections = new Map();
  }

  /**
   * Announce readiness through `process.send`, emit 'listening' from a
   * zero-delay timer and start handling `process` 'message' events:
   *
   * - `connection_from` creates an IpcSocket, emits 'connection' with it and
   *                     forgets it again once the socket emits 'end'.
   * - `server_recv`     emits 'data' on the matching socket, if any.
   *
   * @returns {IpcServer} This server.
   */
  listen() {
    process.send({
      type: 'listening',
    });

    setTimeout(() => {
      this.emit('listening');
    }, 0);

    process.on('message', (message) => {
      // Skip null or otherwise empty messages instead of throwing.
      if (!message) {
        return;
      }
      const {type, payload} = message;

      switch (type) {

        case 'connection_from': {
          const key = String(payload.clientId);
          const socket = new IpcSocket(payload.clientId, payload.from);
          this.connections.set(key, socket);
          this.emit('connection', socket);

          socket.on('end', () => {
            this.connections.delete(key);
          });
          break;
        }

        case 'server_recv': {
          const socket = this.connections.get(String(payload.clientId));
          if (socket) {
            socket.emit('data', payload.data);
          }
          break;
        }

      }
    });

    return this;
  }

  /** No-op, present for parity with the socket-like objects. */
  setEncoding() {}
}
|
||
|
|
||
|
/**
 * Create a process-channel server.
 *
 * @returns {IpcServer} The newly created server.
 */
function server() {
  const ipcServer = new IpcServer();
  return ipcServer;
}
|
||
|
|
||
|
// Port used by the TCP transport: DISPATCHER_PORT from the environment when
// it is set to a non-empty value, 43170 otherwise.
const port = process.env.DISPATCHER_PORT
  ? process.env.DISPATCHER_PORT
  : 43170;
|
||
|
|
||
|
/**
 * Open a TCP connection to a service on the shared dispatcher port.
 *
 * @param {string} address - Host passed to `net.createConnection`.
 * @returns {net.Socket} The connecting socket.
 */
function client$1(address) {
  const socket = net.createConnection(port, address);
  return socket;
}
|
||
|
|
||
|
/**
 * Create a TCP server whose `listen()` always binds the shared dispatcher
 * port, whatever arguments it is called with.
 *
 * @returns {net.Server} The server, not yet listening.
 */
function server$1() {
  const tcpServer = net.createServer();
  // Capture the stock listen, pre-bound to this server and the fixed port.
  const listenOnPort = tcpServer.listen.bind(tcpServer, port);
  tcpServer.listen = () => listenOnPort();
  return tcpServer;
}
|
||
|
|
||
|
// Logger for the dispatcher (server) side, created by the `debug` package
// under the 'truss:comm:dispatcher' namespace.
const debug = D('truss:comm:dispatcher');
|
||
|
|
||
|
/**
 * Receives serialized actions over a server, runs the matching action
 * handler and writes the handler's result back to the sender.
 */
class Dispatcher {

  /**
   * @param {Function} [actions] - Lookup returning a table of action
   *   handlers keyed by action type. Called with the values from `setArgs`.
   * @param {Function} [hooks] - Lookup returning a table of hook
   *   implementations keyed by hook name. Called with the same values.
   */
  constructor(actions = (() => ({})), hooks = (() => ({}))) {
    this.args = [];
    this.lookup = {actions, hooks};
  }

  /**
   * Create the server (process channel when TRUSS_IPC is set, TCP otherwise),
   * wire up action dispatching and start listening.
   *
   * @returns {Object} The listening server.
   */
  connect() {

    const server$$1 = process.env.TRUSS_IPC ? server() : server$1();

    server$$1.on('connection', (socket) => {
      socket.on('error', (error) => { this.handleError(error); });

      socket.setEncoding('utf8');
      socket.on('data', (chunk) => {

        // NOTE(review): every chunk is parsed on its own, so this assumes a
        // serialized action always arrives in a single 'data' event.
        let action;
        try { action = unpack(chunk); }
        catch (error) { return this.handleError(error); }

        const actions = this.createActions();
        // Own, callable entries only: a type such as '__proto__' or
        // 'toString' must not resolve to something inherited from
        // Object.prototype, and a null action must not throw.
        const isKnown = action
          && Object.prototype.hasOwnProperty.call(actions, action.type)
          && 'function' === typeof actions[action.type];
        if (!isKnown) {
          debug(`discarding ${JSON.stringify(action, null, ' ')}!`);
          return;
        }

        const debugAction = createDebugAction(debug, action);
        if (debugAction) {
          debug(`dispatching ${JSON.stringify(debugAction, null, ' ')}`);
        }

        // The handler still runs synchronously, but a synchronous throw now
        // becomes a rejection handled below instead of escaping the listener.
        new Promise((resolve) => {
          resolve(actions[action.type](action));
        }).then((result) => {
          if ('undefined' === typeof result) {
            debug(`undefined result from ${action.type} not forwarded!`);
          }
          else {
            socket.write(pack(result));
          }
          socket.end();
        }).catch((error) => { this.handleError(error); });
      });
    });

    server$$1.on('listening', () => {
      debug(`dispatcher listening...`);
    });

    server$$1.listen();

    return server$$1;
  }

  /**
   * Build the action table used for one dispatch: the looked-up handlers
   * plus the builtin '@truss/hook' and '@truss/schema' actions.
   *
   * The looked-up table is copied first. Writing the builtins onto a table
   * the lookup hands out repeatedly would wrap '@truss/schema' one level
   * deeper on every call.
   *
   * @returns {Object} Handlers keyed by action type.
   */
  createActions() {

    const actions = {...this.lookup.actions(...this.args)};
    const hooks = this.lookup.hooks(...this.args);

    // builtin hook plumbing
    actions['@truss/hook'] = (action) => {
      const {hook, args = []} = action.payload;
      // Own entries only, so names like 'toString' are not treated as hooks.
      if (!Object.prototype.hasOwnProperty.call(hooks, hook)) {
        return undefined;
      }
      return hooks[hook](...args);
    };

    // builtin schema: the caller's schema (if any) plus the hook names
    const originalSchema = actions['@truss/schema'] || (() => ({}));
    actions['@truss/schema'] = (action) => {
      return {...originalSchema(action), hooks: Object.keys(hooks)};
    };

    return actions;
  }

  /**
   * Report an error; logs it with `console.error`.
   *
   * @param {*} error - The error to report.
   */
  handleError(error) {
    console.error(error);
  }

  /** Replace the action lookup. @param {Function} lookup */
  lookupActions(lookup) { this.lookup.actions = lookup; }

  /** Replace the hook lookup. @param {Function} lookup */
  lookupHooks(lookup) { this.lookup.hooks = lookup; }

  /** Set the values passed to both lookups. @param {...*} args */
  setArgs(...args) { this.args = args; }
}
|
||
|
|
||
|
// Logger for the client-side helpers, created by the `debug` package under
// the 'truss:comm' namespace.
const debug$1 = D('truss:comm');
|
||
|
|
||
|
/**
 * Create a Dispatcher from up to two lookups.
 *
 * @param {...*} args - Optional action lookup, then optional hook lookup.
 *   An argument that is not a function is wrapped in a function returning
 *   it. Omitted arguments fall back to the Dispatcher defaults. Arguments
 *   past the second are ignored, so a Dispatcher is always returned.
 * @returns {Dispatcher} The new dispatcher.
 */
function createDispatcher(...args) {
  const lookups = args
    .slice(0, 2)
    .map((value) => ('function' === typeof value ? value : () => value));

  return new Dispatcher(...lookups);
}
|
||
|
/**
 * Send an '@truss/invoke' action for `hook` to the 'gateway' service.
 *
 * @param {string} hook - Hook name.
 * @param {...*} args - Arguments carried in the action payload.
 * @returns {Promise<*>} Whatever `sendActionToService` resolves with.
 */
function invokeHook(hook, ...args) {
  const action = {type: '@truss/invoke', payload: {hook, args}};
  return sendActionToService(action, 'gateway');
}
|
||
|
/**
 * Send an '@truss/invoke-flat' action for `hook` to the 'gateway' service.
 *
 * @param {string} hook - Hook name.
 * @param {...*} args - Arguments carried in the action payload.
 * @returns {Promise<*>} Whatever `sendActionToService` resolves with.
 */
function invokeHookFlat(hook, ...args) {
  const action = {type: '@truss/invoke-flat', payload: {hook, args}};
  return sendActionToService(action, 'gateway');
}
|
||
|
/**
 * Thread a value through every service implementing `hook`, one service
 * after the other.
 *
 * @param {string} hook - Hook name.
 * @param {*} rhs - Initial value; a promise is awaited first.
 * @param {...*} args - Extra arguments passed to every invocation after the
 *   carried value.
 * @returns {Promise<*>} The value returned by the last service, or `rhs`
 *   when no service implements the hook. An empty reply to the service
 *   query is treated as "no services" rather than rejecting.
 */
async function invokeHookReduce(hook, rhs, ...args) {
  const services = (await servicesImplementingHook(hook)) || [];

  let carry = await rhs;
  for (const service of services) {
    // Strictly sequential: each service receives the previous one's result.
    carry = await invokeServiceHook(service, hook, carry, ...args);
  }
  return carry;
}
|
||
|
/**
 * Send an '@truss/hook' action for `hook` to one specific service.
 *
 * @param {string} service - Target service.
 * @param {string} hook - Hook name.
 * @param {...*} args - Arguments carried in the action payload.
 * @returns {Promise<*>} Whatever `sendActionToService` resolves with.
 */
function invokeServiceHook(service, hook, ...args) {
  const action = {type: '@truss/hook', payload: {hook, args}};
  return sendActionToService(action, service);
}
|
||
|
/**
 * Send one action to a service and resolve with its reply.
 *
 * A client is opened (process channel when TRUSS_IPC is set, TCP otherwise),
 * the serialized action is written once the client reports 'ready', and all
 * 'data' received is concatenated until 'end'.
 *
 * @param {Object} action - The action to send.
 * @param {string} service - Target service.
 * @returns {Promise<*>} The parsed reply, or `undefined` when the reply was
 *   empty. Rejects when the reply cannot be parsed or the client reports an
 *   error other than ECONNREFUSED / ENOTFOUND; those two trigger a fresh
 *   attempt after one second.
 */
function sendActionToService(action, service) {

  // Wait a second, then start a brand new attempt whose outcome is adopted.
  const retryLater = () => new Promise((resolve) => {
    setTimeout(() => {
      resolve(sendActionToService(action, service));
    }, 1000);
  });

  const openClient = process.env.TRUSS_IPC ? client : client$1;
  const client$$1 = openClient(service);
  client$$1.setEncoding('utf8');

  // Once connected: log (when enabled) and send the serialized action.
  client$$1.on('ready', () => {
    const debugAction = createDebugAction(debug$1, action);
    if (debugAction) {
      const printable = JSON.stringify(debugAction, null, ' ');
      debug$1(`sendActionToService(${printable}, '${service}')`);
    }
    client$$1.write(pack(action));
  });

  let response = '';
  client$$1.on('data', (chunk) => {
    response += chunk;
  });

  return new Promise((resolve, reject) => {
    client$$1.on('error', (error) => {
      const isRetryable = ['ECONNREFUSED', 'ENOTFOUND'].includes(error.code);
      if (!isRetryable) {
        reject(error);
        return;
      }
      // try try again...
      debug$1(`Can't connect to ${service}, retrying in 1 second...`);
      resolve(retryLater());
    });

    client$$1.on('end', () => {
      let result;
      try {
        result = response ? unpack(response) : undefined;
      }
      catch (error) {
        reject(error);
        return;
      }
      resolve(result);
    });
  });
}
|
||
|
|
||
|
/**
 * Send an '@truss/hook-services' action for `hook` to the 'gateway' service.
 *
 * @param {string} hook - Hook name.
 * @returns {Promise<*>} Whatever `sendActionToService` resolves with.
 */
function servicesImplementingHook(hook) {
  const action = {type: '@truss/hook-services', payload: {hook}};
  return sendActionToService(action, 'gateway');
}
|
||
|
|
||
|
exports.createDispatcher = createDispatcher;
|
||
|
exports.invokeHook = invokeHook;
|
||
|
exports.invokeHookFlat = invokeHookFlat;
|
||
|
exports.invokeHookReduce = invokeHookReduce;
|
||
|
exports.invokeServiceHook = invokeServiceHook;
|
||
|
exports.sendActionToService = sendActionToService;
|
||
|
exports.servicesImplementingHook = servicesImplementingHook;
|
||
|
exports.createDebugAction = createDebugAction;
|