refactor: packet handling

This commit is contained in:
cha0s 2020-07-21 23:01:24 -05:00
parent d085339567
commit 72f3d9ec9b
12 changed files with 311 additions and 197 deletions

View File

@ -0,0 +1,14 @@
import AddFavorite from '~/common/packets/add-favorite.packet';
import {allModels} from '~/server/models/registrar';
export default {
Packet: AddFavorite,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
await req.user.createFavorite({channel: packet.data});
socket.to(`/user/${req.userId}`, packet);
fn();
},
};

View File

@ -0,0 +1,38 @@
import AddFriend from '~/common/packets/add-friend.packet';
import {allModels} from '~/server/models/registrar';
export default {
Packet: AddFriend,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const {
Friendship,
User,
} = allModels();
const adderId = req.user.id;
const user = await User.findOne({where: {redditUsername: packet.data.nameOrStatus}});
if (!user) {
return;
}
const addeeId = user.id;
let friendship = await Friendship.findOne({where: {adderId: addeeId, addeeId: adderId}});
const friendshipIsActive = !!friendship;
if (friendship) {
friendship.status = 'active';
await friendship.save();
}
else {
friendship = await Friendship.create({adderId, addeeId});
}
[addeeId, adderId].forEach((id) => {
socket.to(`/user/${id}`, new AddFriend({
addeeId: friendshipIsActive ? adderId : addeeId,
adderId: friendshipIsActive ? addeeId : adderId,
nameOrStatus: friendship.status,
}));
});
fn({id: addeeId});
},
};

View File

@ -0,0 +1,34 @@
import {Op} from 'sequelize';
import Block from '~/common/packets/block.packet';
import RemoveFriend from '~/common/packets/remove-friend.packet';
import {allModels} from '~/server/models/registrar';
import {removeFavoritedUser} from './remove-favorite';
export default {
Packet: Block,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const id = packet.data;
const {Friendship, User} = allModels();
await req.user.createBlock({blocked: id});
await Friendship.destroy({
where: {
[Op.or]: [
{[Op.and]: [{addeeId: req.userId}, {adderId: id}]},
{[Op.and]: [{addeeId: id}, {adderId: req.userId}]},
],
},
});
const user = await User.findByPk(id);
removeFavoritedUser(socket, user, req.user);
removeFavoritedUser(socket, req.user, user);
socket.to(`/user/${req.userId}`, packet);
socket.to(`/user/${id}`, new RemoveFriend(req.userId));
socket.to(`/user/${req.userId}`, new RemoveFriend(id));
fn();
},
};

View File

@ -0,0 +1,10 @@
export {default as AddFavorite} from './add-favorite';
export {default as AddFriend} from './add-friend';
export {default as Block} from './block';
export {default as Join} from './join';
export {default as Leave} from './leave';
export {default as Message} from './message';
export {default as RemoveFavorite} from './remove-favorite';
export {default as RemoveFriend} from './remove-friend';
export {default as Unblock} from './unblock';
export {default as Usernames} from './usernames';

31
src/server/packet/join.js Normal file
View File

@ -0,0 +1,31 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import {promisify} from 'util';
import {ServerSocket} from '@avocado/net/server/socket';
import Join from '~/common/packets/join.packet';
import {
channelState,
channelUsers,
} from '~/server/entry';
export const userJoin = async (channel, socket) => {
const userId = '/r/anonymous' === channel ? 0 : socket.handshake.userId;
const users = await channelUsers(socket.handshake, channel);
if (-1 === users.indexOf(userId)) {
ServerSocket.send(socket.to(channel), new Join({channel, id: userId}));
}
await promisify(socket.join.bind(socket))(channel);
};
export default {
Packet: Join,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const {channel} = packet.data;
await userJoin(channel, socket.socket);
fn(await channelState(req, channel));
},
};

View File

@ -0,0 +1,25 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import {promisify} from 'util';
import Leave from '~/common/packets/leave.packet';
import {channelUserCounts} from '~/server/entry';
export const userLeave = async (channel, socket) => {
const userId = '/r/anonymous' === channel ? 0 : socket.req.userId;
await promisify(socket.leave.bind(socket))(channel);
const userCounts = await channelUserCounts(socket.req, channel);
if (!userCounts[userId]) {
socket.to(channel, new Leave({channel, id: userId}));
}
};
export default {
Packet: Leave,
validator: () => true,
responder: async (packet, socket, fn) => {
const {channel} = packet.data;
await userLeave(channel, socket);
fn();
},
};

View File

@ -0,0 +1,48 @@
import {v4 as uuidv4} from 'uuid';
import {parseChannel} from '~/common/channel';
import Message from '~/common/packets/message.packet';
import {allModels} from '~/server/models/registrar';
export default {
Packet: Message,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const {pubClient} = req.adapter;
const {User} = allModels();
const {channel, message} = packet.data;
const {name, type} = parseChannel(`/chat${channel}`);
const other = await User.findOne({where: {redditUsername: name}});
const owner = '/r/anonymous' === channel ? 0 : req.userId;
const serverChannel = 'r' === type
? channel
: `/u/${[name, req.user.redditUsername].sort().join('$')}`;
const timestamp = Date.now();
const username = '/r/anonymous' === channel ? 'anonymous' : req.user.redditUsername;
const uuid = uuidv4();
const key = `${serverChannel}:messages:${uuid}`;
('u' === type ? [`/user/${other.id}`, `/user/${req.userId}`] : [channel]).forEach((room) => (
socket.to(room, new Message({
...packet.data,
channel: 'r' === type
? channel
: `/u/${username === room.substr(3) ? name : username}`,
owner,
timestamp,
uuid,
}))
));
pubClient
.multi()
.set(key, JSON.stringify({
message,
owner,
socket: socket.id,
timestamp,
}))
.expire(key, '/r/anonymous' === channel ? 60 : 600)
.exec(() => fn([timestamp, uuid]));
},
};

View File

@ -0,0 +1,21 @@
import RemoveFavorite from '~/common/packets/remove-favorite.packet';
import {allModels} from '~/server/models/registrar';
export default {
Packet: RemoveFavorite,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const {Favorite} = allModels();
const favorites = await req.user.getFavorites();
const toRemove = favorites.find(({channel}) => channel === packet.data);
await Favorite.destroy({
where: {
id: toRemove.id,
},
});
socket.to(`/user/${req.userId}`, packet);
fn();
},
};

View File

@ -0,0 +1,44 @@
import {Op} from 'sequelize';
import RemoveFavorite from '~/common/packets/remove-favorite.packet';
import RemoveFriend from '~/common/packets/remove-friend.packet';
import {allModels} from '~/server/models/registrar';
export const removeFavoritedUser = async (socket, user, other) => {
const {Favorite} = allModels();
const favorites = await user.getFavorites();
const toRemove = favorites.find(({channel}) => channel === `/u/${other.redditUsername}`);
if (toRemove) {
await Favorite.destroy({
where: {
id: toRemove.id,
},
});
socket.to(`/user/${user.id}`, new RemoveFavorite(`/u/${other.redditUsername}`));
}
};
export default {
Packet: RemoveFriend,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const id = packet.data;
const {Friendship, User} = allModels();
await Friendship.destroy({
where: {
[Op.or]: [
{[Op.and]: [{addeeId: req.userId}, {adderId: id}]},
{[Op.and]: [{addeeId: id}, {adderId: req.userId}]},
],
},
});
socket.to(`/user/${id}`, new RemoveFriend(req.userId));
socket.to(`/user/${req.userId}`, new RemoveFriend(id));
const user = await User.findByPk(id);
removeFavoritedUser(socket, user, req.user);
removeFavoritedUser(socket, req.user, user);
fn();
},
};

View File

@ -0,0 +1,20 @@
import Unblock from '~/common/packets/unblock.packet';
import {allModels} from '~/server/models/registrar';
export default {
Packet: Unblock,
validator: () => true,
responder: async (packet, socket, fn) => {
const {req} = socket;
const {Block: BlockModel} = allModels();
await BlockModel.destroy({
where: {
blocked: packet.data,
user_id: req.userId,
},
});
socket.to(`/user/${req.userId}`, packet);
fn();
},
};

View File

@ -0,0 +1,14 @@
import Usernames from '~/common/packets/usernames.packet';
import {allModels} from '~/server/models/registrar';
export default {
Packet: Usernames,
validator: () => true,
responder: async (packet, socket, fn) => {
const {User} = allModels();
fn(await Promise.all(packet.data.map(
async (id) => (await User.findByPk(id)).redditUsername,
)));
},
};

View File

@ -1,32 +1,14 @@
/* eslint-disable import/no-extraneous-dependencies */ import {SocketServer} from '@avocado/net/server/socket';
import {promisify} from 'util';
import {ServerSocket, SocketServer} from '@avocado/net/server/socket';
import socketSession from 'express-socket.io-session'; import socketSession from 'express-socket.io-session';
import {Op} from 'sequelize';
import redisAdapter from 'socket.io-redis'; import redisAdapter from 'socket.io-redis';
import {v4 as uuidv4} from 'uuid';
import {joinChannel, parseChannel} from '~/common/channel'; import {joinChannel, parseChannel} from '~/common/channel';
import AddFavorite from '~/common/packets/add-favorite.packet';
import AddFriend from '~/common/packets/add-friend.packet';
import Block from '~/common/packets/block.packet';
import Join from '~/common/packets/join.packet';
import Leave from '~/common/packets/leave.packet';
import Message from '~/common/packets/message.packet';
import RemoveFavorite from '~/common/packets/remove-favorite.packet';
import RemoveFriend from '~/common/packets/remove-friend.packet';
import Usernames from '~/common/packets/usernames.packet';
import Unblock from '~/common/packets/unblock.packet';
import { import {channelsToHydrate} from '~/server/entry';
channelsToHydrate,
channelState,
channelUserCounts,
channelUsers,
} from '~/server/entry';
import {allModels} from './models/registrar'; import * as PacketHandlers from './packet';
import {userJoin} from './packet/join';
import {userLeave} from './packet/leave';
import passport from './passport'; import passport from './passport';
import createRedisClient from './redis'; import createRedisClient from './redis';
import session from './session'; import session from './session';
@ -34,31 +16,14 @@ import session from './session';
const pubClient = createRedisClient(); const pubClient = createRedisClient();
const subClient = createRedisClient(); const subClient = createRedisClient();
const adapter = redisAdapter({pubClient, subClient}); const adapter = redisAdapter({pubClient, subClient});
const packetHandlers = new Map();
const removeFavoritedUser = async (socket, user, other) => {
const {Favorite} = allModels();
const favorites = await user.getFavorites();
const toRemove = favorites.find(({channel}) => channel === `/u/${other.redditUsername}`);
if (toRemove) {
await Favorite.destroy({
where: {
id: toRemove.id,
},
});
socket.to(`/user/${user.id}`, new RemoveFavorite(`/u/${other.redditUsername}`));
}
};
export function createSocketServer(httpServer) { export function createSocketServer(httpServer) {
const { Object.keys(PacketHandlers).forEach((key) => {
Block: BlockModel, const {Packet, responder, validator} = PacketHandlers[key];
Favorite, packetHandlers.set(Packet, {responder, validator});
Friendship,
User,
} = allModels();
const socketServer = new SocketServer(httpServer, {
adapter,
}); });
const socketServer = new SocketServer(httpServer, {adapter});
socketServer.io.use(socketSession(session())); socketServer.io.use(socketSession(session()));
socketServer.io.use((socket, next) => { socketServer.io.use((socket, next) => {
passport.initialize()(socket.handshake, undefined, next); passport.initialize()(socket.handshake, undefined, next);
@ -81,22 +46,6 @@ export function createSocketServer(httpServer) {
} }
next(); next();
}); });
const userJoin = async (channel, socket) => {
const userId = '/r/anonymous' === channel ? 0 : socket.handshake.userId;
const users = await channelUsers(socket.handshake, channel);
if (-1 === users.indexOf(userId)) {
ServerSocket.send(socket.to(channel), new Join({channel, id: userId}));
}
await promisify(socket.join.bind(socket))(channel);
};
const userLeave = async (channel, socket) => {
const userId = '/r/anonymous' === channel ? 0 : socket.req.userId;
await promisify(socket.leave.bind(socket))(channel);
const userCounts = await channelUserCounts(socket.req, channel);
if (!userCounts[userId]) {
socket.to(channel, new Leave({channel, id: userId}));
}
};
socketServer.io.use(async (socket, next) => { socketServer.io.use(async (socket, next) => {
await Promise.all( await Promise.all(
(await channelsToHydrate(socket.handshake)) (await channelsToHydrate(socket.handshake))
@ -106,143 +55,9 @@ export function createSocketServer(httpServer) {
next(); next();
}); });
socketServer.on('connect', (socket) => { socketServer.on('connect', (socket) => {
const {req} = socket;
socket.on('packet', async (packet, fn) => { socket.on('packet', async (packet, fn) => {
if (packet instanceof AddFavorite) { const {responder} = packetHandlers.get(packet.constructor);
await req.user.createFavorite({channel: packet.data}); responder(packet, socket, fn);
socket.to(`/user/${req.userId}`, packet);
}
if (packet instanceof AddFriend) {
const adderId = req.user.id;
const user = await User.findOne({where: {redditUsername: packet.data.nameOrStatus}});
if (!user) {
return;
}
const addeeId = user.id;
let friendship = await Friendship.findOne({where: {adderId: addeeId, addeeId: adderId}});
const friendshipIsActive = !!friendship;
if (friendship) {
friendship.status = 'active';
await friendship.save();
}
else {
friendship = await Friendship.create({adderId, addeeId});
}
[addeeId, adderId].forEach((id) => {
socket.to(`/user/${id}`, new AddFriend({
addeeId: friendshipIsActive ? adderId : addeeId,
adderId: friendshipIsActive ? addeeId : adderId,
nameOrStatus: friendship.status,
}));
});
fn({id: addeeId});
}
if (packet instanceof Block) {
const id = packet.data;
await req.user.createBlock({blocked: id});
await Friendship.destroy({
where: {
[Op.or]: [
{[Op.and]: [{addeeId: req.userId}, {adderId: id}]},
{[Op.and]: [{addeeId: id}, {adderId: req.userId}]},
],
},
});
const user = await User.findByPk(id);
removeFavoritedUser(socket, user, req.user);
removeFavoritedUser(socket, req.user, user);
socket.to(`/user/${req.userId}`, packet);
socket.to(`/user/${id}`, new RemoveFriend(req.userId));
socket.to(`/user/${req.userId}`, new RemoveFriend(id));
fn();
}
if (packet instanceof RemoveFavorite) {
const favorites = await req.user.getFavorites();
const toRemove = favorites.find(({channel}) => channel === packet.data);
await Favorite.destroy({
where: {
id: toRemove.id,
},
});
socket.to(`/user/${req.userId}`, packet);
fn();
}
if (packet instanceof RemoveFriend) {
const id = packet.data;
await Friendship.destroy({
where: {
[Op.or]: [
{[Op.and]: [{addeeId: req.userId}, {adderId: id}]},
{[Op.and]: [{addeeId: id}, {adderId: req.userId}]},
],
},
});
socket.to(`/user/${id}`, new RemoveFriend(req.userId));
socket.to(`/user/${req.userId}`, new RemoveFriend(id));
const user = await User.findByPk(id);
removeFavoritedUser(socket, user, req.user);
removeFavoritedUser(socket, req.user, user);
fn();
}
if (packet instanceof Join) {
const {channel} = packet.data;
await userJoin(channel, socket.socket);
fn(await channelState(req, channel));
}
if (packet instanceof Leave) {
const {channel} = packet.data;
await userLeave(channel, socket);
fn();
}
if (packet instanceof Message) {
const {channel, message} = packet.data;
const {name, type} = parseChannel(`/chat${channel}`);
const other = await User.findOne({where: {redditUsername: name}});
const owner = '/r/anonymous' === channel ? 0 : req.userId;
const serverChannel = 'r' === type
? channel
: `/u/${[name, req.user.redditUsername].sort().join('$')}`;
const timestamp = Date.now();
const username = '/r/anonymous' === channel ? 'anonymous' : req.user.redditUsername;
const uuid = uuidv4();
const key = `${serverChannel}:messages:${uuid}`;
('u' === type ? [`/user/${other.id}`, `/user/${req.userId}`] : [channel]).forEach((room) => (
socket.to(room, new Message({
...packet.data,
channel: 'r' === type
? channel
: `/u/${username === room.substr(3) ? name : username}`,
owner,
timestamp,
uuid,
}))
));
pubClient
.multi()
.set(key, JSON.stringify({
message,
owner,
socket: socket.id,
timestamp,
}))
.expire(key, '/r/anonymous' === channel ? 60 : 600)
.exec(() => fn([timestamp, uuid]));
}
if (packet instanceof Unblock) {
await BlockModel.destroy({
where: {
blocked: packet.data,
user_id: req.userId,
},
});
socket.to(`/user/${req.userId}`, packet);
fn();
}
if (packet instanceof Usernames) {
fn(await Promise.all(packet.data.map(
async (id) => (await User.findByPk(id)).redditUsername,
)));
}
}); });
socket.on('disconnecting', async () => { socket.on('disconnecting', async () => {
Object.keys(socket.socket.rooms).forEach((room) => { Object.keys(socket.socket.rooms).forEach((room) => {