flow: lots, join/leave
This commit is contained in:
parent
d62f5fbfa3
commit
3ce283ff4c
|
@ -18,7 +18,7 @@ const App = () => (
|
|||
/>
|
||||
<Route
|
||||
component={Chat}
|
||||
path="/chat/:type/:name"
|
||||
path="/chat"
|
||||
/>
|
||||
</Switch>
|
||||
</div>
|
||||
|
|
|
@ -2,7 +2,8 @@ import './channel.scss';
|
|||
|
||||
import classnames from 'classnames';
|
||||
import PropTypes from 'prop-types';
|
||||
import React, {useState} from 'react';
|
||||
import React from 'react';
|
||||
import {Link} from 'react-router-dom';
|
||||
|
||||
export default function Channel(props) {
|
||||
const {
|
||||
|
@ -11,22 +12,17 @@ export default function Channel(props) {
|
|||
name,
|
||||
prefix,
|
||||
} = props;
|
||||
const [isActioning, setIsActioning] = useState(false);
|
||||
return (
|
||||
<div
|
||||
className={classnames('channel', {actioning: isActioning})}
|
||||
className={classnames('channel')}
|
||||
>
|
||||
<a
|
||||
<Link
|
||||
className="channel__link"
|
||||
href={href}
|
||||
onContextMenu={(event) => {
|
||||
setIsActioning(!isActioning);
|
||||
event.preventDefault();
|
||||
}}
|
||||
to={href}
|
||||
>
|
||||
<span className="muted">{prefix}</span>
|
||||
{name}
|
||||
</a>
|
||||
</Link>
|
||||
<div
|
||||
className="channel__actions"
|
||||
>
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
@import '~/client/scss/colors.scss';
|
||||
|
||||
.channel {
|
||||
position: relative;
|
||||
}
|
||||
|
|
|
@ -31,6 +31,9 @@ export default function ChatMessages() {
|
|||
current?.scrollTo(0, !current ? 0 : current.scrollHeight);
|
||||
}
|
||||
}, [current, heightWatch, messageCount, isAtTheBottom]);
|
||||
if (!channel) {
|
||||
return null;
|
||||
}
|
||||
let messageOwner = false;
|
||||
return (
|
||||
<div className="chat--messages">
|
||||
|
|
|
@ -1,27 +1,36 @@
|
|||
import './chat.scss';
|
||||
|
||||
import React, {useEffect, useRef} from 'react';
|
||||
import {useSelector} from 'react-redux';
|
||||
import {useHistory, useParams} from 'react-router-dom';
|
||||
import {useSelector, useDispatch} from 'react-redux';
|
||||
import {useHistory} from 'react-router-dom';
|
||||
|
||||
import {channelSelector, submitJoin} from '~/common/state/chat';
|
||||
import {userSelector} from '~/common/state/user';
|
||||
import {validateChannel} from '~/common/channel';
|
||||
|
||||
import useBreakpoints from '~/client/hooks/useBreakpoints';
|
||||
import useChannel from '~/client/hooks/useChannel';
|
||||
|
||||
import useBreakpoints from './hooks/useBreakpoints';
|
||||
import ChatCenter from './chat--center';
|
||||
import ChatLeft from './chat--left';
|
||||
import ChatRight from './chat--right';
|
||||
|
||||
export default function Chat() {
|
||||
const dispatch = useDispatch();
|
||||
const history = useHistory();
|
||||
const {type, name} = useParams();
|
||||
const ref = useRef(null);
|
||||
const {tablet} = useBreakpoints();
|
||||
const user = useSelector(userSelector);
|
||||
const allowedUser = !user.isAnonymous || ('r' === type && 'anonymous' === name);
|
||||
const channel = useChannel();
|
||||
const hasChannel = !!useSelector((state) => channelSelector(state, channel));
|
||||
const allowedUser = !user.isAnonymous || (channel === '/r/anonymous');
|
||||
useEffect(() => {
|
||||
if (!allowedUser) {
|
||||
history.goBack();
|
||||
}
|
||||
if (!hasChannel && validateChannel(channel)) {
|
||||
dispatch(submitJoin({channel, id: user.id}));
|
||||
}
|
||||
});
|
||||
if (!allowedUser) {
|
||||
return null;
|
||||
|
|
|
@ -1,8 +1,19 @@
|
|||
import './home.scss';
|
||||
|
||||
import React from 'react';
|
||||
import React, {useEffect} from 'react';
|
||||
import {useSelector} from 'react-redux';
|
||||
import {useHistory} from 'react-router-dom';
|
||||
|
||||
import {userSelector} from '~/common/state/user';
|
||||
|
||||
export default function Home() {
|
||||
const history = useHistory();
|
||||
const user = useSelector(userSelector);
|
||||
useEffect(() => {
|
||||
if (user.id) {
|
||||
history.replace('/chat');
|
||||
}
|
||||
});
|
||||
return (
|
||||
<div className="home">
|
||||
<div className="home__inner">
|
||||
|
|
24
src/client/store/effects.js
vendored
24
src/client/store/effects.js
vendored
|
@ -1,9 +1,31 @@
|
|||
import Join from '~/common/packets/join.packet';
|
||||
import Leave from '~/common/packets/leave.packet';
|
||||
import Message from '~/common/packets/message.packet';
|
||||
import {addMessage, confirmMessage, submitMessage} from '~/common/state/chat';
|
||||
import {
|
||||
addMessage,
|
||||
confirmMessage,
|
||||
join,
|
||||
leave,
|
||||
submitJoin,
|
||||
submitLeave,
|
||||
submitMessage,
|
||||
} from '~/common/state/chat';
|
||||
|
||||
import {socket} from '~/client/hooks/useSocket';
|
||||
|
||||
const effects = {
|
||||
[submitJoin]: ({dispatch}, {payload}) => {
|
||||
const {channel} = payload;
|
||||
socket.send(new Join(payload), ({messages, users}) => {
|
||||
dispatch(join({channel, messages, users}));
|
||||
});
|
||||
},
|
||||
[submitLeave]: ({dispatch}, {payload}) => {
|
||||
const {channel} = payload;
|
||||
socket.send(new Leave(payload), () => {
|
||||
dispatch(leave({channel}));
|
||||
});
|
||||
},
|
||||
[submitMessage]: ({dispatch}, {payload}) => {
|
||||
dispatch(addMessage(payload));
|
||||
socket.send(new Message(payload), ([timestamp, current]) => {
|
||||
|
|
|
@ -8,3 +8,28 @@ export const parseChannel = (url) => {
|
|||
};
|
||||
|
||||
export const joinChannel = ({name, type}) => `/${type}/${name}`;
|
||||
|
||||
const countryExceptions = ['de', 'es', 'it'];
|
||||
export const validateSubreddit = (name) => {
|
||||
if (-1 !== countryExceptions.indexOf(name)) {
|
||||
return true;
|
||||
}
|
||||
return !!name.match(/^[A-Za-z0-9][A-Za-z0-9_]{2,20}$/i);
|
||||
};
|
||||
|
||||
export const validateUsername = (name) => name.match(/^[\w-]{3,20}/);
|
||||
|
||||
export const validateChannel = (channel) => {
|
||||
if (!channel) {
|
||||
return false;
|
||||
}
|
||||
const parts = parseChannel(`/chat${channel}`);
|
||||
if (!parts) {
|
||||
return false;
|
||||
}
|
||||
const {name, type} = parts;
|
||||
if (-1 === ['r', 'u'].indexOf(type)) {
|
||||
return false;
|
||||
}
|
||||
return ('r' === type ? validateSubreddit : validateUsername)(name);
|
||||
};
|
||||
|
|
|
@ -17,7 +17,7 @@ export const messagesSelector = (state) => state.chat.messages;
|
|||
|
||||
export const channelMessagesSelector = createSelector(
|
||||
[channelSelector, messagesSelector],
|
||||
(channel, messages) => channel.messages.map((uuid) => messages[uuid]),
|
||||
(channel, messages) => (!channel ? [] : channel.messages.map((uuid) => messages[uuid])),
|
||||
);
|
||||
|
||||
const slice = createSlice({
|
||||
|
@ -61,8 +61,14 @@ const slice = createSlice({
|
|||
focus: ({unread}, {payload: {channel}}) => {
|
||||
unread[channel] = 0;
|
||||
},
|
||||
join: ({channels}, {payload: {channel, messages, users}}) => {
|
||||
channels[channel] = {messages, users};
|
||||
join: ({channels, messages}, {payload: {channel, messages: channelMessages, users}}) => {
|
||||
channelMessages.forEach((message) => {
|
||||
messages[message.uuid] = message;
|
||||
});
|
||||
channels[channel] = {
|
||||
messages: channelMessages.map((message) => message.uuid),
|
||||
users,
|
||||
};
|
||||
},
|
||||
joined: ({channels}, {payload: {channel, id}}) => {
|
||||
channels[channel].users.push(id);
|
||||
|
@ -83,6 +89,8 @@ const slice = createSlice({
|
|||
removeRecent: ({recent}, {payload: {channel}}) => {
|
||||
recent.splice(recent.indexOf(channel), 1);
|
||||
},
|
||||
submitJoin: () => {},
|
||||
submitLeave: () => {},
|
||||
submitMessage: () => {},
|
||||
},
|
||||
});
|
||||
|
@ -99,6 +107,8 @@ export const {
|
|||
left,
|
||||
removeMessage,
|
||||
removeRecent,
|
||||
submitJoin,
|
||||
submitLeave,
|
||||
submitMessage,
|
||||
} = slice.actions;
|
||||
|
||||
|
|
|
@ -8,18 +8,22 @@ import createRedisClient, {keys} from './redis';
|
|||
const redisClient = createRedisClient();
|
||||
const mget = promisify(redisClient.mget.bind(redisClient));
|
||||
|
||||
export const channelUserCounts = async (channel) => {
|
||||
const socketKeys = await keys(redisClient, `${channel}:users:*`);
|
||||
export const channelUserCounts = async (req, channel) => {
|
||||
const clients = promisify(req.adapter.clients.bind(req.adapter));
|
||||
const socketKeys = await clients([channel]);
|
||||
const customRequest = promisify(req.adapter.customRequest.bind(req.adapter));
|
||||
const replies = await customRequest({type: 'socketUsers', payload: socketKeys});
|
||||
const socketUsers = replies.reduce((r, m) => ({...r, ...m}), {});
|
||||
return 0 === socketKeys.length
|
||||
? []
|
||||
: (await mget(...socketKeys)).reduce((r, k) => ({...r, [k]: 1 + (r[k] || 0)}), {});
|
||||
: Object.values(socketUsers).reduce((r, uid) => ({...r, [uid]: 1 + (r[uid] || 0)}), {});
|
||||
};
|
||||
|
||||
export const channelUsers = async (channel) => (
|
||||
Object.keys(await channelUserCounts(channel)).map((id) => parseInt(id, 10))
|
||||
export const channelUsers = async (req, channel) => (
|
||||
Object.keys(await channelUserCounts(req, channel)).map((id) => parseInt(id, 10))
|
||||
);
|
||||
|
||||
const channelState = async (req, channel) => {
|
||||
export const channelState = async (req, channel) => {
|
||||
const messageKeys = await keys(redisClient, `${channel}:messages:*`);
|
||||
const messages = 0 === messageKeys.length
|
||||
? []
|
||||
|
@ -30,7 +34,7 @@ const channelState = async (req, channel) => {
|
|||
}))
|
||||
.sort((l, r) => l.timestamp - r.timestamp);
|
||||
const userId = req.user ? req.user.id : 0;
|
||||
const users = await channelUsers(channel);
|
||||
const users = await channelUsers(req, channel);
|
||||
return {
|
||||
messages,
|
||||
users: -1 !== users.indexOf(userId) ? users : users.concat([userId]),
|
||||
|
@ -43,17 +47,18 @@ export const userState = async (req) => {
|
|||
? {
|
||||
favorites: [],
|
||||
friends: await user.friends(),
|
||||
id: user.id,
|
||||
redditUsername: user.redditUsername,
|
||||
}
|
||||
: null;
|
||||
};
|
||||
|
||||
export const channelsToHydrate = (req) => (
|
||||
(req.channel ? [req.channel] : []).concat(req.user ? req.user.favorites : [])
|
||||
export const channelsToHydrate = async (req) => (
|
||||
(req.channel ? [req.channel] : []).concat(req.user ? await req.user.favorites() : [])
|
||||
);
|
||||
|
||||
export const chatState = async (req) => {
|
||||
const toHydrate = channelsToHydrate(req);
|
||||
const toHydrate = await channelsToHydrate(req);
|
||||
if (0 === toHydrate.length) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -8,9 +8,6 @@ import express from 'express';
|
|||
import httpProxy from 'http-proxy';
|
||||
import {invokeHookFlat} from 'scwp';
|
||||
|
||||
import {parseChannel} from '~/common/channel';
|
||||
|
||||
import userRoutes from './routes/user';
|
||||
import passport from './passport';
|
||||
import session from './session';
|
||||
|
||||
|
@ -33,13 +30,9 @@ export async function createHttpServer() {
|
|||
app.use(session());
|
||||
app.use(passport.initialize());
|
||||
app.use(passport.session());
|
||||
app.use((req, res, next) => {
|
||||
req.channel = parseChannel(req.url);
|
||||
next();
|
||||
});
|
||||
userRoutes(app);
|
||||
const httpServer = http.createServer(app);
|
||||
httpServer.listen(31344, '0.0.0.0');
|
||||
httpServer.app = app;
|
||||
httpServer.createFallthrough = async () => {
|
||||
if ('production' !== process.env.NODE_ENV) {
|
||||
const proxy = httpProxy.createProxyServer({
|
||||
secure: false,
|
||||
|
@ -68,6 +61,8 @@ export async function createHttpServer() {
|
|||
const buffer = await stream.pipe(concat());
|
||||
app.get('*', async (req, res) => res.end(await hydration(req, buffer)));
|
||||
}
|
||||
};
|
||||
httpServer.listen(31344, '0.0.0.0');
|
||||
return httpServer;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
import {registerHooks} from 'scwp';
|
||||
|
||||
import {parseChannel} from '~/common/channel';
|
||||
|
||||
import userRoutes from './routes/user';
|
||||
import {createDatabaseConnection, destroyDatabaseConnection} from './db';
|
||||
import {createHttpServer, destroyHttpServer} from './http';
|
||||
import {createReplServer, destroyReplServer} from './repl';
|
||||
|
@ -41,6 +44,15 @@ async function restartListening() {
|
|||
httpServer = await createHttpServer();
|
||||
replServer = await createReplServer();
|
||||
socketServer = await createSocketServer(httpServer);
|
||||
const {app} = httpServer;
|
||||
app.use((req, res, next) => {
|
||||
req.adapter = socketServer.io.of('/').adapter;
|
||||
req.channel = parseChannel(req.url);
|
||||
req.userId = req.user ? req.user.id : 0;
|
||||
next();
|
||||
});
|
||||
userRoutes(app);
|
||||
await httpServer.createFallthrough();
|
||||
// Accounting bullshit
|
||||
trackConnections();
|
||||
}
|
||||
|
|
|
@ -34,6 +34,10 @@ class User extends BaseModel {
|
|||
});
|
||||
}
|
||||
|
||||
async favorites() {
|
||||
return [];
|
||||
}
|
||||
|
||||
async friends() {
|
||||
const {Friendship} = allModels();
|
||||
const friendships = await Friendship.findAll({
|
||||
|
|
|
@ -6,10 +6,7 @@ import passport from 'passport';
|
|||
export default function userRoutes(app) {
|
||||
app.get('/auth/reddit', (req, res, next) => {
|
||||
req.session.state = randomBytes(32).toString('hex');
|
||||
passport.authenticate('reddit', {
|
||||
state: req.session.state,
|
||||
duration: 'permanent',
|
||||
})(req, res, next);
|
||||
passport.authenticate('reddit', {state: req.session.state})(req, res, next);
|
||||
});
|
||||
app.get('/auth/reddit/callback', (req, res, next) => {
|
||||
if (req.query.state === req.session.state) {
|
||||
|
|
|
@ -4,7 +4,7 @@ import {promisify} from 'util';
|
|||
import redisAdapter from 'socket.io-redis';
|
||||
import {v4 as uuidv4} from 'uuid';
|
||||
|
||||
import {SocketServer} from '@avocado/net/server/socket';
|
||||
import {ServerSocket, SocketServer} from '@avocado/net/server/socket';
|
||||
import socketSession from 'express-socket.io-session';
|
||||
|
||||
import {joinChannel, parseChannel} from '~/common/channel';
|
||||
|
@ -12,7 +12,12 @@ import Join from '~/common/packets/join.packet';
|
|||
import Leave from '~/common/packets/leave.packet';
|
||||
import Message from '~/common/packets/message.packet';
|
||||
|
||||
import {channelsToHydrate, channelUserCounts, channelUsers} from '~/server/entry';
|
||||
import {
|
||||
channelsToHydrate,
|
||||
channelState,
|
||||
channelUserCounts,
|
||||
channelUsers,
|
||||
} from '~/server/entry';
|
||||
|
||||
import passport from './passport';
|
||||
import createRedisClient, {keys} from './redis';
|
||||
|
@ -21,8 +26,6 @@ import session from './session';
|
|||
const pubClient = createRedisClient();
|
||||
const subClient = createRedisClient();
|
||||
const adapter = redisAdapter({pubClient, subClient});
|
||||
const del = promisify(pubClient.del.bind(pubClient));
|
||||
const set = promisify(pubClient.set.bind(pubClient));
|
||||
|
||||
export function createSocketServer(httpServer) {
|
||||
const socketServer = new SocketServer(httpServer, {
|
||||
|
@ -37,31 +40,49 @@ export function createSocketServer(httpServer) {
|
|||
});
|
||||
socketServer.io.use((socket, next) => {
|
||||
/* eslint-disable no-param-reassign */
|
||||
socket.handshake.adapter = socketServer.io.of('/').adapter;
|
||||
socket.handshake.channel = parseChannel(socket.handshake.query.referrer);
|
||||
socket.handshake.userId = socket.handshake.user ? socket.handshake.user.id : 0;
|
||||
/* eslint-enable no-param-reassign */
|
||||
next();
|
||||
});
|
||||
socketServer.io.use(async (socket, next) => {
|
||||
const userJoin = async (channel, socket) => {
|
||||
const {userId} = socket.handshake;
|
||||
const join = promisify(socket.join.bind(socket));
|
||||
await Promise.all(
|
||||
channelsToHydrate(socket.handshake)
|
||||
.map((channel) => joinChannel(channel))
|
||||
.map(async (channel) => {
|
||||
await join(channel);
|
||||
const users = await channelUsers(channel);
|
||||
const users = await channelUsers(socket.handshake, channel);
|
||||
if (-1 === users.indexOf(userId)) {
|
||||
socketServer.send(new Join({channel, id: userId}), channel);
|
||||
ServerSocket.send(socket.to(channel), new Join({channel, id: userId}));
|
||||
}
|
||||
await set(`${channel}:users:${socket.id}`, userId);
|
||||
}),
|
||||
await promisify(socket.join.bind(socket))(channel);
|
||||
};
|
||||
const userLeave = async (channel, socket) => {
|
||||
const {userId} = socket.req;
|
||||
await promisify(socket.leave.bind(socket))(channel);
|
||||
const userCounts = await channelUserCounts(socket.req, channel);
|
||||
if (!userCounts[userId]) {
|
||||
socket.to(channel, new Leave({channel, id: userId}));
|
||||
}
|
||||
};
|
||||
socketServer.io.use(async (socket, next) => {
|
||||
await Promise.all(
|
||||
(await channelsToHydrate(socket.handshake))
|
||||
.map((channel) => joinChannel(channel))
|
||||
.map((channel) => userJoin(channel, socket)),
|
||||
);
|
||||
next();
|
||||
});
|
||||
socketServer.on('connect', (socket) => {
|
||||
const {req} = socket;
|
||||
socket.on('packet', (packet, fn) => {
|
||||
socket.on('packet', async (packet, fn) => {
|
||||
if (packet instanceof Join) {
|
||||
const {channel} = packet.data;
|
||||
await userJoin(channel, socket.socket);
|
||||
fn(await channelState(req, channel));
|
||||
}
|
||||
if (packet instanceof Leave) {
|
||||
const {channel} = packet.data;
|
||||
await userLeave(channel, socket);
|
||||
fn();
|
||||
}
|
||||
if (packet instanceof Message) {
|
||||
const {channel, message} = packet.data;
|
||||
const owner = req.user ? req.user.id : 0;
|
||||
|
@ -85,21 +106,22 @@ export function createSocketServer(httpServer) {
|
|||
.exec(() => fn([timestamp, uuid]));
|
||||
}
|
||||
});
|
||||
socket.on('disconnect', async () => {
|
||||
const socketKeys = await keys(pubClient, `*:users:${socket.id}`);
|
||||
const {userId} = req;
|
||||
if (socketKeys.length > 0) {
|
||||
const channels = socketKeys.map((key) => key.split(':')[0]);
|
||||
await Promise.all(channels.map(async (channel) => {
|
||||
const userCounts = await channelUserCounts(channel);
|
||||
if (1 === userCounts[userId]) {
|
||||
socketServer.send(new Leave({channel, id: userId}), channel);
|
||||
}
|
||||
}));
|
||||
await del(socketKeys);
|
||||
socket.on('disconnecting', async () => {
|
||||
Object.keys(socket.socket.rooms).forEach((room) => {
|
||||
if (parseChannel(`/chat${room}`)) {
|
||||
userLeave(room, socket);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
socketServer.io.of('/').adapter.customHook = (req, fn) => {
|
||||
if ('socketUsers' === req.type) {
|
||||
const sids = req.payload;
|
||||
const {connected} = socketServer.io.of('/');
|
||||
const here = sids.filter((sid) => !!connected[sid]);
|
||||
fn(here.reduce((r, sid) => ({...r, [sid]: connected[sid].handshake.userId}), {}));
|
||||
}
|
||||
};
|
||||
return socketServer;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user