truss/comm/index.js
2018-12-24 13:21:31 -06:00

117 lines
2.7 KiB
JavaScript

const {createDebugAction} = require('./debug');
const {pack, unpack} = require('./packer');
const {Dispatcher} = require('./dispatcher');
const debug = require('debug')('truss:comm');
/**
 * Create a Dispatcher from the given arguments.
 *
 * Every argument that is not already a function is replaced by a function
 * returning that value, so the Dispatcher constructor only ever receives
 * functions. All arguments are forwarded, so a call with more than two
 * arguments also yields a Dispatcher instead of falling through and
 * returning undefined.
 *
 * @param {...*} args - Functions, or plain values to be wrapped in functions.
 * @returns {Dispatcher} The newly constructed dispatcher.
 */
export function createDispatcher(...args) {
  const normalized = args.map((arg) => (
    'function' === typeof arg ? arg : () => arg
  ));
  return new Dispatcher(...normalized);
}
/**
 * Send a `truss/invoke` action for the given hook to the 'gateway' service.
 *
 * @param {*} hook - Hook identifier placed in the action payload.
 * @param {...*} args - Arguments placed in the action payload as an array.
 * @returns {Promise<*>} Whatever sendActionToService() yields for the action.
 */
export function invokeHook(hook, ...args) {
  const action = {
    type: 'truss/invoke',
    payload: {hook, args},
  };
  return sendActionToService(action, 'gateway');
}
/**
 * Send a `truss/invoke-flat` action for the given hook to the 'gateway'
 * service.
 *
 * @param {*} hook - Hook identifier placed in the action payload.
 * @param {...*} args - Arguments placed in the action payload as an array.
 * @returns {Promise<*>} Whatever sendActionToService() yields for the action.
 */
export function invokeHookFlat(hook, ...args) {
  const action = {
    type: 'truss/invoke-flat',
    payload: {hook, args},
  };
  return sendActionToService(action, 'gateway');
}
/**
 * Invoke a hook on each implementing service one after another, threading a
 * value through the calls.
 *
 * The list of services comes from servicesImplementingHook(). The first
 * service receives `rhs`; each later service receives the result of the
 * previous service's invocation. The final result resolves the returned
 * promise. With no implementing services the promise resolves to `rhs`.
 *
 * @param {*} hook - Hook identifier.
 * @param {*} rhs - Initial value handed to the first service.
 * @param {...*} args - Extra arguments passed unchanged to every service.
 * @returns {Promise<*>} Resolves to the value produced by the last service.
 */
export function invokeHookSerial(hook, rhs, ...args) {
  return servicesImplementingHook(hook).then((services) => {
    let chain = Promise.resolve(rhs);
    services.forEach((service) => {
      // Each link waits for the previous one and feeds its result forward.
      chain = chain.then((carried) => (
        invokeServiceHook(service, hook, carried, ...args)
      ));
    });
    return chain;
  });
}
/**
 * Send a `truss/hook` action for the given hook to one specific service.
 *
 * @param {*} service - Service identifier passed to sendActionToService().
 * @param {*} hook - Hook identifier placed in the action payload.
 * @param {...*} args - Arguments placed in the action payload as an array.
 * @returns {Promise<*>} Whatever sendActionToService() yields for the action.
 */
export function invokeServiceHook(service, hook, ...args) {
  const action = {
    type: 'truss/hook',
    payload: {hook, args},
  };
  return sendActionToService(action, service);
}
export function sendActionToService(action, service) {
const client = require('./socket.ipc').client(service);
client.setEncoding('utf8');
client.on('ready', writeClientPacket);
let response = '';
client.on('data', (chunk) => {
response += chunk;
});
return new Promise((resolve, reject) => {
client.on('error', (error) => {
// try try again...
if (-1 !== ['ECONNREFUSED', 'ENOTFOUND'].indexOf(error.code)) {
debug(`Can't connect to ${service}, retrying in 1 second...`);
return resolve(tryAgain());
}
reject(error);
});
client.on('end', () => {
try {
resolve(response ? unpack(response) : undefined);
}
catch (error) {
reject(error);
}
});
});
function tryAgain() {
return new Promise((resolve) => {
setTimeout(() => {
resolve(sendActionToService(action, service));
}, 1000);
});
}
function writeClientPacket() {
const debugAction = createDebugAction(debug, action);
if (debugAction) {
debug(`sendActionToService(${
JSON.stringify(debugAction, null, ' ')
}, '${service}')`);
}
client.write(pack(action));
}
}
/**
 * Send a `truss/hook-services` action for the given hook to the 'gateway'
 * service.
 *
 * @param {*} hook - Hook identifier placed in the action payload.
 * @returns {Promise<*>} Whatever sendActionToService() yields for the action.
 */
export function servicesImplementingHook(hook) {
  const action = {
    type: 'truss/hook-services',
    payload: {hook},
  };
  return sendActionToService(action, 'gateway');
}
// Re-export createDebugAction (required from './debug' above) from this module.
export {createDebugAction};