From 00cf0979bb075c5fb324fec04515b40c68d37534 Mon Sep 17 00:00:00 2001 From: cha0s Date: Fri, 17 Jul 2020 01:27:48 -0500 Subject: [PATCH] feat: http join/leave --- src/client/dispatcher.jsx | 10 +++++++++- src/common/packets/join.packet.js | 5 ++++- src/common/packets/leave.packet.js | 5 ++++- src/common/state/chat.js | 3 +-- src/server/entry.js | 13 +++++++++---- src/server/sockets.js | 29 +++++++++++++++++++++++------ 6 files changed, 50 insertions(+), 15 deletions(-) diff --git a/src/client/dispatcher.jsx b/src/client/dispatcher.jsx index 97c31e2..7272c03 100644 --- a/src/client/dispatcher.jsx +++ b/src/client/dispatcher.jsx @@ -2,8 +2,10 @@ import PropTypes from 'prop-types'; import React from 'react'; import {useDispatch} from 'react-redux'; +import Join from '~/common/packets/join.packet'; +import Leave from '~/common/packets/leave.packet'; import Message from '~/common/packets/message.packet'; -import {addMessage} from '~/common/state/chat'; +import {addMessage, joined, left} from '~/common/state/chat'; import useSocket from './hooks/useSocket'; @@ -14,6 +16,12 @@ export default function Dispatcher({children}) { if (packet instanceof Message) { dispatch(addMessage(packet.data)); } + if (packet instanceof Join) { + dispatch(joined(packet.data)); + } + if (packet instanceof Leave) { + dispatch(left(packet.data)); + } }; socket.on('packet', onPacket); socket.on('disconnect', () => socket.off('packet', onPacket)); diff --git a/src/common/packets/join.packet.js b/src/common/packets/join.packet.js index ba62595..6afa6ad 100644 --- a/src/common/packets/join.packet.js +++ b/src/common/packets/join.packet.js @@ -5,7 +5,10 @@ export default class Join extends Packet { static get schema() { return { ...super.schema, - data: 'string', + data: { + id: 'uint32', + channel: 'string', + }, }; } diff --git a/src/common/packets/leave.packet.js b/src/common/packets/leave.packet.js index d1370d2..b70d812 100644 --- a/src/common/packets/leave.packet.js +++ b/src/common/packets/leave.packet.js @@ -5,7 +5,10 @@ export default class Leave extends Packet { static get schema() { return { ...super.schema, - data: 'string', + data: { + id: 'uint32', + channel: 'string', + }, }; } diff --git a/src/common/state/chat.js b/src/common/state/chat.js index b0b81ca..1fb9c93 100644 --- a/src/common/state/chat.js +++ b/src/common/state/chat.js @@ -61,9 +61,8 @@ const slice = createSlice({ focus: ({unread}, {payload: {channel}}) => { unread[channel] = 0; }, - join: ({channels, unread}, {payload: {channel, messages, users}}) => { + join: ({channels}, {payload: {channel, messages, users}}) => { channels[channel] = {messages, users}; - unread[channel] = 0; }, joined: ({channels}, {payload: {channel, id}}) => { channels[channel].users.push(id); diff --git a/src/server/entry.js b/src/server/entry.js index c075079..814ffc1 100644 --- a/src/server/entry.js +++ b/src/server/entry.js @@ -8,14 +8,17 @@ import createRedisClient, {keys} from './redis'; const redisClient = createRedisClient(); const mget = promisify(redisClient.mget.bind(redisClient)); -export const channelUsers = async (channel) => { +export const channelUserCounts = async (channel) => { const socketKeys = await keys(redisClient, `${channel}:users:*`); return 0 === socketKeys.length ? [] - : Object.keys((await mget(socketKeys)).reduce((r, k) => ({[k]: true, ...r}), {})) - .map((idStrings) => parseInt(idStrings, 10)); + : (await mget(...socketKeys)).reduce((r, k) => ({...r, [k]: 1 + (r[k] || 0)}), {}); }; +export const channelUsers = async (channel) => ( + Object.keys(await channelUserCounts(channel)).map((id) => parseInt(id, 10)) +); + const channelState = async (req, channel) => { const messageKeys = await keys(redisClient, `${channel}:messages:*`); const messages = 0 === messageKeys.length @@ -26,9 +29,11 @@ const channelState = async (req, channel) => { uuid: messageKeys[i].split(':')[2], })) .sort((l, r) => l.timestamp - r.timestamp); + const userId = req.user ? req.user.id : 0; + const users = await channelUsers(channel); return { messages, - users: channelUsers(channel), + users: -1 !== users.indexOf(userId) ? users : users.concat([userId]), }; }; diff --git a/src/server/sockets.js b/src/server/sockets.js index acdcc5d..82ea20e 100644 --- a/src/server/sockets.js +++ b/src/server/sockets.js @@ -8,9 +8,11 @@ import {SocketServer} from '@avocado/net/server/socket'; import socketSession from 'express-socket.io-session'; import {joinChannel, parseChannel} from '~/common/channel'; +import Join from '~/common/packets/join.packet'; +import Leave from '~/common/packets/leave.packet'; import Message from '~/common/packets/message.packet'; -import {channelsToHydrate} from '~/server/entry'; +import {channelsToHydrate, channelUserCounts, channelUsers} from '~/server/entry'; import passport from './passport'; import createRedisClient, {keys} from './redis'; @@ -19,6 +21,7 @@ import session from './session'; const pubClient = createRedisClient(); const subClient = createRedisClient(); const adapter = redisAdapter({pubClient, subClient}); +const del = promisify(pubClient.del.bind(pubClient)); const set = promisify(pubClient.set.bind(pubClient)); export function createSocketServer(httpServer) { @@ -38,14 +41,19 @@ export function createSocketServer(httpServer) { next(); }); socketServer.io.use(async (socket, next) => { + const {user} = socket.handshake; const join = promisify(socket.join.bind(socket)); await Promise.all( channelsToHydrate(socket.handshake) + .map((channel) => joinChannel(channel)) .map(async (channel) => { - const channelString = joinChannel(channel); - await join(channelString); - const {user} = socket.handshake; - await set(`${channelString}:users:${socket.id}`, user ? user.id : 0); + await join(channel); + const users = await channelUsers(channel); + const id = user ? user.id : 0; + if (-1 === users.indexOf(id)) { + socketServer.send(new Join({channel, id}), channel); + } + await set(`${channel}:users:${socket.id}`, user ? user.id : 0); }), ); next(); @@ -78,8 +86,17 @@ export function createSocketServer(httpServer) { }); socket.on('disconnect', async () => { const socketKeys = await keys(pubClient, `*:users:${socket.id}`); + const {user} = req; if (socketKeys.length > 0) { - pubClient.del(socketKeys); + const channels = socketKeys.map((key) => key.split(':')[0]); + await Promise.all(channels.map(async (channel) => { + const userCounts = await channelUserCounts(channel); + const id = user ? user.id : 0; + if (1 === userCounts[id]) { + socketServer.send(new Leave({channel, id}), channel); + } + })); + await del(socketKeys); } }); });