truss/comm/index.js
2019-03-05 19:21:09 -06:00

121 lines
2.9 KiB
JavaScript

import {createDebugAction} from './debug';
import {pack, unpack} from './packer';
import {Dispatcher} from './dispatcher';
import {client as createIpcClient} from './socket.ipc';
import {client as createNetClient} from './socket.net';
import D from 'debug';
const debug = D('truss:comm');
export function createDispatcher(...args) {
if (0 === args.length) {
return new Dispatcher();
}
if ('function' !== typeof args[0]) {
const value = args[0]
args[0] = () => value
}
if (1 === args.length) {
return new Dispatcher(args[0]);
}
if ('function' !== typeof args[1]) {
const value = args[1]
args[1] = () => value
}
if (2 === args.length) {
return new Dispatcher(args[0], args[1]);
}
};
export function invokeHook(hook, ...args) {
return sendActionToService({
type: '@truss/invoke', payload: {hook, args},
}, 'gateway');
};
export function invokeHookFlat(hook, ...args) {
return sendActionToService({
type: '@truss/invoke-flat', payload: {hook, args},
}, 'gateway');
};
export function invokeHookReduce(hook, rhs, ...args) {
return servicesImplementingHook(hook).then((services) => {
return services.reduce((promise, service) => {
return promise.then((rhs) => {
return invokeServiceHook(service, hook, rhs, ...args);
});
}, Promise.resolve(rhs));
});
};
export function invokeServiceHook(service, hook, ...args) {
return sendActionToService({
type: '@truss/hook', payload: {hook, args},
}, service);
};
export function sendActionToService(action, service) {
const createClient = process.env.TRUSS_IPC ? createIpcClient : createNetClient;
const client = createClient(service);
client.setEncoding('utf8');
client.on('ready', writeClientPacket);
let response = '';
client.on('data', (chunk) => {
response += chunk;
});
return new Promise((resolve, reject) => {
client.on('error', (error) => {
// try try again...
if (-1 !== ['ECONNREFUSED', 'ENOTFOUND'].indexOf(error.code)) {
debug(`Can't connect to ${service}, retrying in 1 second...`);
return resolve(tryAgain());
}
reject(error);
});
client.on('end', () => {
try {
resolve(response ? unpack(response) : undefined);
}
catch (error) {
reject(error);
}
});
});
function tryAgain() {
return new Promise((resolve) => {
setTimeout(() => {
resolve(sendActionToService(action, service));
}, 1000);
});
}
function writeClientPacket() {
const debugAction = createDebugAction(debug, action);
if (debugAction) {
debug(`sendActionToService(${
JSON.stringify(debugAction, null, ' ')
}, '${service}')`);
}
client.write(pack(action));
}
}
export function servicesImplementingHook(hook) {
return sendActionToService({
type: '@truss/hook-services', payload: {hook},
}, 'gateway');
};
export {createDebugAction};