feat: http user join/leave

This commit is contained in:
cha0s 2020-07-16 22:46:11 -05:00
parent 7780efa239
commit a2a485fbee
2 changed files with 50 additions and 40 deletions

View File

@ -1,47 +1,40 @@
import {joinChannel} from '~/common/channel';
import createRedisClient from './redis';
import createRedisClient, {keys} from './redis';
const redisClient = createRedisClient();
const enterChannel = async (req, channel) => {
const messages = await new Promise((resolve, reject) => {
redisClient.scan(
0,
'COUNT', 50,
'MATCH', `${channel}:*`,
(error, [, keys]) => (
const channelState = async (req, channel) => {
const messageKeys = await keys(redisClient, `${channel}:messages:*`);
const messages = 0 === messageKeys.length
? []
: await new Promise((resolve, reject) => {
redisClient.mget(messageKeys, (error, replies) => (
error
? reject(error)
: resolve(
0 === keys.length
? []
: new Promise((resolve, reject) => {
redisClient.mget(keys, (error, replies) => (
error
? reject(error)
: resolve(replies
.map((reply, i) => ({
...JSON.parse(reply),
uuid: keys[i].split(':')[1],
}))
.sort((l, r) => l.timestamp - r.timestamp))
));
}),
)
),
);
});
// const users = await new Promise((resolve, reject) => {
// req.adapter.remoteJoin(req.socketId, channel, (error) => {
// if (error) {
// reject(error);
// return;
// }
// });
// });
: resolve(replies
.map((reply, i) => ({
...JSON.parse(reply),
uuid: messageKeys[i].split(':')[2],
}))
.sort((l, r) => l.timestamp - r.timestamp))
));
});
const socketKeys = await keys(redisClient, `${channel}:users:*`);
const users = 0 === socketKeys.length
? []
: await new Promise((resolve, reject) => {
redisClient.mget(socketKeys, (error, replies) => (
error ? reject(error) : resolve(
Object
.keys(replies.reduce((r, k) => ({[k]: true, ...r}), {}))
.map((idStrings) => parseInt(idStrings, 10)),
)
));
});
return {
messages,
users,
};
};
@ -72,14 +65,14 @@ export const chatState = async (req) => {
recent: [],
};
const entries = await Promise.all(
toHydrate.map((favorite) => enterChannel(req, joinChannel(favorite))),
toHydrate.map((favorite) => channelState(req, joinChannel(favorite))),
);
for (let i = 0; i < toHydrate.length; i++) {
const channel = joinChannel(toHydrate[i]);
const {messages} = entries[i];
const {messages, users} = entries[i];
chat.channels[channel] = {
messages: messages.map((message) => message.uuid),
users: [],
users,
};
messages.forEach((message) => {
chat.messages[message.uuid] = message;

View File

@ -11,7 +11,7 @@ import Message from '~/common/packets/message.packet';
import {channelsToHydrate} from '~/server/entry';
import passport from './passport';
import createRedisClient from './redis';
import createRedisClient, {keys} from './redis';
import session from './session';
const pubClient = createRedisClient();
@ -46,6 +46,17 @@ export function createSocketServer(httpServer) {
);
next();
});
socketServer.io.use(async (socket, next) => {
await Promise.all(
channelsToHydrate(socket.handshake)
.map((channel) => new Promise((resolve, reject) => {
const key = `${joinChannel(channel)}:users:${socket.id}`;
const {user} = socket.handshake;
pubClient.set(key, user ? user.id : 0, (error) => (error ? reject(error) : resolve()));
})),
);
next();
});
socketServer.on('connect', (socket) => {
const {req} = socket;
socket.on('packet', (packet, fn) => {
@ -54,7 +65,7 @@ export function createSocketServer(httpServer) {
const owner = req.user ? req.user.id : 0;
const timestamp = Date.now();
const uuid = uuidv4();
const key = `${channel}:${uuid}`;
const key = `${channel}:messages:${uuid}`;
pubClient
.multi()
.set(key, JSON.stringify({
@ -66,6 +77,12 @@ export function createSocketServer(httpServer) {
.exec(() => fn([timestamp, uuid]));
}
});
socket.on('disconnect', async () => {
const socketKeys = await keys(pubClient, `*:users:${socket.id}`);
if (socketKeys.length > 0) {
pubClient.del(socketKeys);
}
});
});
return socketServer;
}