feat: http join/leave

This commit is contained in:
cha0s 2020-07-17 01:27:48 -05:00
parent 605150e984
commit 00cf0979bb
6 changed files with 50 additions and 15 deletions

View File

@ -2,8 +2,10 @@ import PropTypes from 'prop-types';
import React from 'react';
import {useDispatch} from 'react-redux';
import Join from '~/common/packets/join.packet';
import Leave from '~/common/packets/leave.packet';
import Message from '~/common/packets/message.packet';
import {addMessage} from '~/common/state/chat';
import {addMessage, joined, left} from '~/common/state/chat';
import useSocket from './hooks/useSocket';
@ -14,6 +16,12 @@ export default function Dispatcher({children}) {
if (packet instanceof Message) {
dispatch(addMessage(packet.data));
}
if (packet instanceof Join) {
dispatch(joined(packet.data));
}
if (packet instanceof Leave) {
dispatch(left(packet.data));
}
};
socket.on('packet', onPacket);
socket.on('disconnect', () => socket.off('packet', onPacket));

View File

@ -5,7 +5,10 @@ export default class Join extends Packet {
static get schema() {
return {
...super.schema,
data: 'string',
data: {
id: 'uint32',
channel: 'string',
},
};
}

View File

@ -5,7 +5,10 @@ export default class Leave extends Packet {
static get schema() {
return {
...super.schema,
data: 'string',
data: {
id: 'uint32',
channel: 'string',
},
};
}

View File

@ -61,9 +61,8 @@ const slice = createSlice({
focus: ({unread}, {payload: {channel}}) => {
unread[channel] = 0;
},
join: ({channels, unread}, {payload: {channel, messages, users}}) => {
join: ({channels}, {payload: {channel, messages, users}}) => {
channels[channel] = {messages, users};
unread[channel] = 0;
},
joined: ({channels}, {payload: {channel, id}}) => {
channels[channel].users.push(id);

View File

@ -8,14 +8,17 @@ import createRedisClient, {keys} from './redis';
const redisClient = createRedisClient();
const mget = promisify(redisClient.mget.bind(redisClient));
export const channelUsers = async (channel) => {
export const channelUserCounts = async (channel) => {
const socketKeys = await keys(redisClient, `${channel}:users:*`);
return 0 === socketKeys.length
? []
: Object.keys((await mget(socketKeys)).reduce((r, k) => ({[k]: true, ...r}), {}))
.map((idStrings) => parseInt(idStrings, 10));
: (await mget(...socketKeys)).reduce((r, k) => ({...r, [k]: 1 + (r[k] || 0)}), {});
};
export const channelUsers = async (channel) => (
Object.keys(await channelUserCounts(channel)).map((id) => parseInt(id, 10))
);
const channelState = async (req, channel) => {
const messageKeys = await keys(redisClient, `${channel}:messages:*`);
const messages = 0 === messageKeys.length
@ -26,9 +29,11 @@ const channelState = async (req, channel) => {
uuid: messageKeys[i].split(':')[2],
}))
.sort((l, r) => l.timestamp - r.timestamp);
const userId = req.user ? req.user.id : 0;
const users = await channelUsers(channel);
return {
messages,
users: channelUsers(channel),
users: -1 !== users.indexOf(userId) ? users : users.concat([userId]),
};
};

View File

@ -8,9 +8,11 @@ import {SocketServer} from '@avocado/net/server/socket';
import socketSession from 'express-socket.io-session';
import {joinChannel, parseChannel} from '~/common/channel';
import Join from '~/common/packets/join.packet';
import Leave from '~/common/packets/leave.packet';
import Message from '~/common/packets/message.packet';
import {channelsToHydrate} from '~/server/entry';
import {channelsToHydrate, channelUserCounts, channelUsers} from '~/server/entry';
import passport from './passport';
import createRedisClient, {keys} from './redis';
@ -19,6 +21,7 @@ import session from './session';
const pubClient = createRedisClient();
const subClient = createRedisClient();
const adapter = redisAdapter({pubClient, subClient});
const del = promisify(pubClient.del.bind(pubClient));
const set = promisify(pubClient.set.bind(pubClient));
export function createSocketServer(httpServer) {
@ -38,14 +41,19 @@ export function createSocketServer(httpServer) {
next();
});
socketServer.io.use(async (socket, next) => {
const {user} = socket.handshake;
const join = promisify(socket.join.bind(socket));
await Promise.all(
channelsToHydrate(socket.handshake)
.map((channel) => joinChannel(channel))
.map(async (channel) => {
const channelString = joinChannel(channel);
await join(channelString);
const {user} = socket.handshake;
await set(`${channelString}:users:${socket.id}`, user ? user.id : 0);
await join(channel);
const users = await channelUsers(channel);
const id = user ? user.id : 0;
if (-1 === users.indexOf(id)) {
socketServer.send(new Join({channel, id}), channel);
}
await set(`${channel}:users:${socket.id}`, user ? user.id : 0);
}),
);
next();
@ -78,8 +86,17 @@ export function createSocketServer(httpServer) {
});
socket.on('disconnect', async () => {
const socketKeys = await keys(pubClient, `*:users:${socket.id}`);
const {user} = req;
if (socketKeys.length > 0) {
pubClient.del(socketKeys);
const channels = socketKeys.map((key) => key.split(':')[0]);
await Promise.all(channels.map(async (channel) => {
const userCounts = await channelUserCounts(channel);
const id = user ? user.id : 0;
if (1 === userCounts[id]) {
socketServer.send(new Leave({channel, id}), channel);
}
}));
await del(socketKeys);
}
});
});