refactor: promisification
This commit is contained in:
parent
a2a485fbee
commit
3a86b1dd0e
|
@ -1,37 +1,27 @@
|
||||||
|
/* eslint-disable import/no-extraneous-dependencies */
|
||||||
|
import {promisify} from 'util';
|
||||||
|
|
||||||
import {joinChannel} from '~/common/channel';
|
import {joinChannel} from '~/common/channel';
|
||||||
|
|
||||||
import createRedisClient, {keys} from './redis';
|
import createRedisClient, {keys} from './redis';
|
||||||
|
|
||||||
const redisClient = createRedisClient();
|
const redisClient = createRedisClient();
|
||||||
|
const mget = promisify(redisClient.mget.bind(redisClient));
|
||||||
const channelState = async (req, channel) => {
|
const channelState = async (req, channel) => {
|
||||||
const messageKeys = await keys(redisClient, `${channel}:messages:*`);
|
const messageKeys = await keys(redisClient, `${channel}:messages:*`);
|
||||||
const messages = 0 === messageKeys.length
|
const messages = 0 === messageKeys.length
|
||||||
? []
|
? []
|
||||||
: await new Promise((resolve, reject) => {
|
: (await mget(messageKeys))
|
||||||
redisClient.mget(messageKeys, (error, replies) => (
|
.map((reply, i) => ({
|
||||||
error
|
...JSON.parse(reply),
|
||||||
? reject(error)
|
uuid: messageKeys[i].split(':')[2],
|
||||||
: resolve(replies
|
}))
|
||||||
.map((reply, i) => ({
|
.sort((l, r) => l.timestamp - r.timestamp);
|
||||||
...JSON.parse(reply),
|
|
||||||
uuid: messageKeys[i].split(':')[2],
|
|
||||||
}))
|
|
||||||
.sort((l, r) => l.timestamp - r.timestamp))
|
|
||||||
));
|
|
||||||
});
|
|
||||||
const socketKeys = await keys(redisClient, `${channel}:users:*`);
|
const socketKeys = await keys(redisClient, `${channel}:users:*`);
|
||||||
const users = 0 === socketKeys.length
|
const users = 0 === socketKeys.length
|
||||||
? []
|
? []
|
||||||
: await new Promise((resolve, reject) => {
|
: Object.keys((await mget(socketKeys)).reduce((r, k) => ({[k]: true, ...r}), {}))
|
||||||
redisClient.mget(socketKeys, (error, replies) => (
|
.map((idStrings) => parseInt(idStrings, 10));
|
||||||
error ? reject(error) : resolve(
|
|
||||||
Object
|
|
||||||
.keys(replies.reduce((r, k) => ({[k]: true, ...r}), {}))
|
|
||||||
.map((idStrings) => parseInt(idStrings, 10)),
|
|
||||||
)
|
|
||||||
));
|
|
||||||
});
|
|
||||||
return {
|
return {
|
||||||
messages,
|
messages,
|
||||||
users,
|
users,
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
/* eslint-disable import/no-extraneous-dependencies */
|
/* eslint-disable import/no-extraneous-dependencies */
|
||||||
|
import {promisify} from 'util';
|
||||||
|
|
||||||
import redisAdapter from 'socket.io-redis';
|
import redisAdapter from 'socket.io-redis';
|
||||||
import {v4 as uuidv4} from 'uuid';
|
import {v4 as uuidv4} from 'uuid';
|
||||||
|
|
||||||
|
@ -17,6 +19,7 @@ import session from './session';
|
||||||
const pubClient = createRedisClient();
|
const pubClient = createRedisClient();
|
||||||
const subClient = createRedisClient();
|
const subClient = createRedisClient();
|
||||||
const adapter = redisAdapter({pubClient, subClient});
|
const adapter = redisAdapter({pubClient, subClient});
|
||||||
|
const set = promisify(pubClient.set.bind(pubClient));
|
||||||
|
|
||||||
export function createSocketServer(httpServer) {
|
export function createSocketServer(httpServer) {
|
||||||
const socketServer = new SocketServer(httpServer, {
|
const socketServer = new SocketServer(httpServer, {
|
||||||
|
@ -35,25 +38,15 @@ export function createSocketServer(httpServer) {
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
socketServer.io.use(async (socket, next) => {
|
socketServer.io.use(async (socket, next) => {
|
||||||
|
const join = promisify(socket.join.bind(socket));
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
channelsToHydrate(socket.handshake)
|
channelsToHydrate(socket.handshake)
|
||||||
.map((channel) => new Promise((resolve, reject) => {
|
.map(async (channel) => {
|
||||||
socket.join(
|
const channelString = joinChannel(channel);
|
||||||
joinChannel(channel),
|
await join(channelString);
|
||||||
(error) => (error ? reject(error) : resolve()),
|
|
||||||
);
|
|
||||||
})),
|
|
||||||
);
|
|
||||||
next();
|
|
||||||
});
|
|
||||||
socketServer.io.use(async (socket, next) => {
|
|
||||||
await Promise.all(
|
|
||||||
channelsToHydrate(socket.handshake)
|
|
||||||
.map((channel) => new Promise((resolve, reject) => {
|
|
||||||
const key = `${joinChannel(channel)}:users:${socket.id}`;
|
|
||||||
const {user} = socket.handshake;
|
const {user} = socket.handshake;
|
||||||
pubClient.set(key, user ? user.id : 0, (error) => (error ? reject(error) : resolve()));
|
await set(`${channelString}:users:${socket.id}`, user ? user.id : 0);
|
||||||
})),
|
}),
|
||||||
);
|
);
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue
Block a user