feat: socket-worker

This commit is contained in:
cha0s 2019-04-25 00:09:03 -05:00
parent 655cfce7f9
commit 080fbb0f34
6 changed files with 275 additions and 53 deletions

View File

@ -0,0 +1,96 @@
import D from 'debug';
import io from 'socket.io-client';
import {compose} from '@avocado/core';
import {EventEmitter} from '@avocado/mixins';
import {allPackets, idFromPacket} from '../packet';
const debug = D('@avocado:client:socket');
const decorate = compose(
EventEmitter,
);
import Worker from 'worker-loader!./worker.js';
export class SocketClient extends decorate(class {}) {
constructor(address, options = {}) {
super();
this.address = address;
this.isConnected = false;
this.isReconnecting = false;
this.options = {
path: '/avocado',
perMessageDeflate: false,
// reconnection: false,
...options,
};
this.worker = new Worker();
this.worker.onmessage = (message) => this.onWorkerMessage(message);
if (false !== options.autoConnect) {
this.connect(this.address, this.options);
}
}
close() {
this.worker.postMessage({
type: 'close',
});
}
connect(address, options) {
this.worker.postMessage({
type: 'connect',
payload: {
address,
options,
},
});
for (const Packet of allPackets()) {
const id = idFromPacket(Packet);
this.on(`${id}`, (data) => {
this.emit('packet', new Packet(data));
});
}
}
disconnect() {
this.close();
}
on(...args) {
super.on(...args);
this.worker.postMessage({
type: 'on',
payload: args[0],
});
}
onWorkerMessage({data: action}) {
switch (action.type) {
case 'emit':
return this.onWorkerMessageEmit(action.payload);
case 'error':
return this.onWorkerMessageError(action.payload);
}
}
onWorkerMessageEmit(args) {
this.emit(...args);
}
onWorkerMessageError({message}) {
throw new Error(message);
}
send(packet) {
const id = idFromPacket(packet.constructor);
this.worker.postMessage({
type: 'emit',
payload: [id, packet.data],
});
}
}

View File

@ -58,8 +58,8 @@ export class SocketClient extends decorate(class {}) {
});
for (const Packet of allPackets()) {
const id = idFromPacket(Packet);
this.socket.on(id, (packet) => {
this.emit('packet', packet);
this.socket.on(id, (data) => {
this.emit('packet', new Packet(data));
});
}
}

View File

@ -0,0 +1,140 @@
import io from 'socket.io-client';
import 'register-packets';
import {SocketIoParser} from '../packet';
let socket = null;
function onMessageConnect({address, options}) {
socket = io(address, {
parser: SocketIoParser,
...options,
});
const onSocketConnect = () => {
postMessage({
type: 'emit',
payload: ['connect'],
});
}
const onSocketConnectError = (error) => {
postMessage({
type: 'emit',
payload: ['connect_error', error.toString()],
});
}
const onSocketConnectTimeout = (timeout) => {
postMessage({
type: 'emit',
payload: ['connect_timeout', timeout],
});
}
const onSocketDisconnect = (reason) => {
postMessage({
type: 'emit',
payload: ['disconnect', reason],
});
}
const onSocketError = (error) => {
postMessage({
type: 'emit',
payload: ['error', error.toString()],
});
}
const onSocketReconnect = (attempt) => {
postMessage({
type: 'emit',
payload: ['reconnect', attempt],
});
}
const onSocketReconnectAttempt = (attempt) => {
postMessage({
type: 'emit',
payload: ['reconnect_attempt', attempt],
});
}
const onSocketReconnecting = (attempt) => {
postMessage({
type: 'emit',
payload: ['reconnecting', attempt],
});
}
const onSocketReconnectError = (error) => {
postMessage({
type: 'emit',
payload: ['reconnect_error', error.toString()],
});
}
const onSocketReconnectFailed = () => {
postMessage({
type: 'emit',
payload: ['reconnect_failed'],
});
}
const onSocketPing = () => {
postMessage({
type: 'emit',
payload: ['ping'],
});
}
const onSocketPong = (latency) => {
postMessage({
type: 'emit',
payload: ['pong', latency],
});
}
socket.on('connect', onSocketConnect);
socket.on('connect_error', onSocketConnectError);
socket.on('connect_timeout', onSocketConnectTimeout);
socket.on('disconnect', onSocketDisconnect);
socket.on('error', onSocketError);
socket.on('reconnect', onSocketReconnect);
socket.on('reconnect_attempt', onSocketReconnectAttempt);
socket.on('reconnecting', onSocketReconnecting);
socket.on('reconnect_error', onSocketReconnectError);
socket.on('reconnect_failed', onSocketReconnectFailed);
socket.on('ping', onSocketPing);
socket.on('pong', onSocketPong);
}
function onMessageClose() {
if (!socket) {
return;
}
socket.close();
socket = undefined;
}
function onMessageEmit(args) {
if (!socket) {
return;
}
socket.emit(...args);
}
function onMessageOn(type) {
if (!socket) {
return;
}
if (socket.listeners(type).length !== 0) {
return;
}
socket.on(type, function(...args) {
postMessage({
type: 'emit',
payload: [type].concat(args),
});
});
}
self.onmessage = function({data: action}) {
switch(action.type) {
case 'connect':
return onMessageConnect(action.payload);
case 'emit':
return onMessageEmit(action.payload);
case 'on':
return onMessageOn(action.payload);
case 'close':
return onMessageClose();
}
}

View File

@ -1,6 +1,6 @@
{
"name": "@avocado/net",
"version": "1.0.0",
"version": "1.0.1",
"main": "index.js",
"author": "cha0s",
"license": "MIT",
@ -10,6 +10,7 @@
"debug": "3.1.0",
"schemapack": "1.4.2",
"socket.io": "2.2.0",
"socket.io-client": "2.2.0"
"socket.io-client": "2.2.0",
"worker-loader": "2.0.0"
}
}

View File

@ -1,15 +1,36 @@
import schemapack from 'schemapack';
export class Packet {
constructor(data) {
this.data = data;
}
static get builder() {
if (!this._builder) {
this._builder = schemapack.build(this.schema);
}
return this._builder;
}
static pack(packet) {
return this.builder.encode({
_id: packet.data[0],
data: packet.data[1],
})
}
static get schema() {
return {
_id: 'uint8',
};
}
static unpack(packet) {
const {data} = this.builder.decode(packet);
return data;
}
}
export {

View File

@ -1,9 +1,7 @@
import schemapack from 'schemapack';
import {compose} from '@avocado/core';
import {EventEmitter} from '@avocado/mixins';
import {allPackets, idFromPacket, packetFromId} from './registry';
import {packetFromId} from './registry';
/**
* Packet types (see https://github.com/socketio/socket.io-protocol)
@ -14,23 +12,6 @@ const TYPES = {
BINARY_EVENT: 5,
};
const errorPacket = {
type: TYPES.ERROR,
data: 'parser error',
};
let schemas = undefined;
function schemaFromId(id) {
if (!schemas) {
schemas = {};
for (const Packet of allPackets()) {
const id_ = idFromPacket(Packet);
schemas[id_] = schemapack.build(Packet.schema);
}
}
return schemas[id];
}
class Encoder {
encode(packet, callback) {
@ -44,15 +25,9 @@ class Encoder {
}
pack(packet) {
const eventId = packet.data[0];
const schema = schemaFromId(eventId);
if (!schema) {
throw new Error('unknown schema with id: ' + eventId);
}
return schema.encode({
_id: eventId,
data: packet.data[1],
});
const packetId = packet.data[0];
const Packet = packetFromId(packetId);
return Packet.pack(packet);
}
}
@ -74,27 +49,16 @@ class Decoder extends decorateDecoder(class {}) {
destroy() {}
parseBinary(obj) {
const view = new Uint8Array(obj);
parseBinary(packet) {
const view = new Uint8Array(packet);
const packetId = view[0];
let packet;
try {
const schema = schemaFromId(packetId);
if (!schema) {
throw new Error(`unknown schema with id: ${packetId}`);
}
const {data} = schema.decode(obj);
const Packet = packetFromId(packetId);
packet = {
type: TYPES.EVENT,
data: [packetId, new Packet(data)],
nsp: '/',
};
}
catch (e) {
packet = errorPacket;
}
this.emit('decoded', packet);
const Packet = packetFromId(packetId);
const data = Packet.unpack(packet);
this.emit('decoded', {
type: TYPES.EVENT,
data: [packetId, data],
nsp: '/',
});
}
parseJSON(obj) {