From 28fa1ec084cfbba56471e4108af99bd70b26b3ba Mon Sep 17 00:00:00 2001 From: cha0s Date: Fri, 18 Dec 2020 13:20:10 -0600 Subject: [PATCH] refactor: lt msgs --- app/src/react/components/about/index.jsx | 2 +- packages/chat/src/message-channel.js | 14 +++--- packages/chat/src/packets/message.server.js | 50 ++++++++++++++------- packages/chat/src/replace-message.js | 35 +++++++++++---- packages/chat/src/state.js | 42 ++++++++++------- 5 files changed, 96 insertions(+), 47 deletions(-) diff --git a/app/src/react/components/about/index.jsx b/app/src/react/components/about/index.jsx index c01b3f2..09424f8 100644 --- a/app/src/react/components/about/index.jsx +++ b/app/src/react/components/about/index.jsx @@ -52,7 +52,7 @@ const About = () => ( {' '} 1 {' '} - minute. + minute. In addition, non-anonymous chats will preserve their last 50 messages.

Be excellent to each other

diff --git a/packages/chat/src/message-channel.js b/packages/chat/src/message-channel.js index 8a72064..0b2f764 100644 --- a/packages/chat/src/message-channel.js +++ b/packages/chat/src/message-channel.js @@ -1,13 +1,13 @@ -import {createClient, keys} from '@latus/redis'; +import {promisify} from 'util'; + +import {createClient} from '@latus/redis'; import {parseChannel} from '@reddichat/core'; const messageChannel = async (latus, uuid) => { - const results = (await keys(createClient(latus), `*:messages:${uuid}`)); - if (0 === results.length) { - return undefined; - } - const key = results.pop(); - return parseChannel(key.split(':')[0]); + const client = createClient(latus); + const get = promisify(client.get.bind(client)); + const channel = get(`uuid-channel:${uuid}`); + return channel ? parseChannel(channel) : undefined; }; export default messageChannel; diff --git a/packages/chat/src/packets/message.server.js b/packages/chat/src/packets/message.server.js index 780944e..30a65a4 100644 --- a/packages/chat/src/packets/message.server.js +++ b/packages/chat/src/packets/message.server.js @@ -1,3 +1,5 @@ +import {promisify} from 'util'; + import {v4 as uuidv4} from 'uuid'; import {ModelMap, Op} from '@latus/db'; @@ -9,6 +11,8 @@ import {ADMIN, MOD} from '../distinction'; import Message from './message'; +const LONG_TERM_COUNT = 50; + export default (latus) => class MessageServer extends Message(latus) { static characterLimiter = createLimiter(latus, { @@ -64,23 +68,37 @@ export default (latus) => class MessageServer extends Message(latus) { uuid, }]) )); - return new Promise((r, e) => { - const rendered = renderChannel(channel); - const serverChannel = 'r' === type - ? rendered - : `/u/${[name, req.user.redditUsername].sort().join('$')}`; - const key = `${serverChannel}:messages:${uuid}`; - const ttl = channelIsAnonymous(channel) ? 60 : 600; - pubClient - .setex(key, ttl, JSON.stringify({ - distinction, - ip: req.ip, - message, - owner, - socket: socket.id, - timestamp, - }), (error) => (error ? e(error) : r([timestamp, uuid]))); + const del = promisify(pubClient.del.bind(pubClient)); + const set = promisify(pubClient.set.bind(pubClient)); + const setex = promisify(pubClient.setex.bind(pubClient)); + const rendered = renderChannel(channel); + const serverChannel = 'r' === type + ? rendered + : `/u/${[name, req.user.redditUsername].sort().join('$')}`; + const key = `${serverChannel}:messages:${uuid}`; + const ttl = channelIsAnonymous(channel) ? 60 : 600; + const stringified = JSON.stringify({ + distinction, + ip: req.ip, + message, + owner, + socket: socket.id, + timestamp, + uuid, }); + if ('r' === type && !isAnonymous) { + await set(`uuid-channel:${uuid}`, rendered); + const lpush = promisify(pubClient.lpush.bind(pubClient)); + const lrange = promisify(pubClient.lrange.bind(pubClient)); + const ltrim = promisify(pubClient.ltrim.bind(pubClient)); + const key = `${serverChannel}:messages-lt`; + await lpush(key, stringified); + const fallingOff = await lrange(key, LONG_TERM_COUNT, -1); + const messages = fallingOff.map((reply) => JSON.parse(reply)); + await Promise.all(messages.map(({uuid}) => del(`uuid-channel:${uuid}`))); + await ltrim(key, 0, LONG_TERM_COUNT - 1); + } + return setex(key, ttl, stringified).then(() => [timestamp, uuid]); } static async validate(packet, socket) { diff --git a/packages/chat/src/replace-message.js b/packages/chat/src/replace-message.js index af4ae95..b1cd334 100644 --- a/packages/chat/src/replace-message.js +++ b/packages/chat/src/replace-message.js @@ -1,17 +1,34 @@ import {promisify} from 'util'; import {keys} from '@latus/redis'; +import {renderChannel} from '@reddichat/core'; + +import messageChannel from './message-channel'; const replaceMessage = async (req, uuid, fn) => { const {pubClient} = req.adapter; + const channel = messageChannel(uuid); + if (channel) { + const lrange = promisify(pubClient.lrange.bind(pubClient)); + const lset = promisify(pubClient.lset.bind(pubClient)); + const key = `${renderChannel(channel)}:messages-lt`; + const replies = await lrange(key, 0, -1); + if (replies) { + const messages = replies + .map((reply) => JSON.parse(reply)); + const index = messages.findIndex(({uuid: muuid}) => uuid === muuid); + if (-1 !== index) { + messages[index] = fn(messages[index]); + await lset(key, index, JSON.stringify(messages[index])); + } + } + } const get = promisify(pubClient.get.bind(pubClient)); const key = (await keys(pubClient, `*:messages:${uuid}`)).pop(); - if (!key) { - return Promise.resolve(); - } - const message = fn(JSON.parse(await get(key))); - return new Promise((r, e) => pubClient - .eval( + if (key) { + const message = fn(JSON.parse(await get(key))); + const peval = promisify(pubClient.eval.bind(pubClient)); + return peval( [ "local ttl = redis.call('ttl', ARGV[1])", 'if ttl > 0 then', @@ -21,8 +38,10 @@ const replaceMessage = async (req, uuid, fn) => { 0, key, JSON.stringify(message), - (error) => (error ? e(error) : r(message)), - )); + ) + .then(() => message); + } + return Promise.resolve(); }; export default replaceMessage; diff --git a/packages/chat/src/state.js b/packages/chat/src/state.js index 8a8b57e..c54aca6 100644 --- a/packages/chat/src/state.js +++ b/packages/chat/src/state.js @@ -30,28 +30,40 @@ export const channelUsers = async (req, channel) => { export const channelState = async (req, latus, channel) => { const {name, type} = channel; const redisClient = createClient(latus); + const lrange = promisify(redisClient.lrange.bind(redisClient)); const mget = promisify(redisClient.mget.bind(redisClient)); const realName = 'r' === type ? name : `${[name, req.user.redditUsername].sort().join('$')}`; - const pattern = `${renderChannel({type, name: realName})}:messages:*`; + const pattern = `${renderChannel({type, name: realName})}:messages*:*`; const messageKeys = await keys(redisClient, pattern); - const messages = 0 === messageKeys.length + const tmessages = 0 === messageKeys.length ? [] : (await mget(messageKeys)) - .map((reply, i) => ({ - ...JSON.parse(reply), - uuid: messageKeys[i].split(':')[2], - })) - .map((message) => { - const {socket, ip, ...rest} = message; - return { - ...rest, - owner: channelIsAnonymous(channel) ? 0 : rest.owner, - }; - }) - .filter(({message}) => !!message) - .sort((l, r) => l.timestamp - r.timestamp); + .map((reply) => JSON.parse(reply)); + const lmessages = (await lrange(`${renderChannel(channel)}:messages-lt`, 0, -1)) + .map((reply) => JSON.parse(reply)); + const messages = tmessages + .concat(lmessages) + .filter(({message}) => !!message) + .filter((() => { + const seen = new Set(); + return ({uuid}) => { + if (seen.has(uuid)) { + return false; + } + seen.add(uuid); + return true; + }; + })()) + .map((message) => { + const {socket, ip, ...rest} = message; + return { + ...rest, + owner: channelIsAnonymous(channel) ? 0 : rest.owner, + }; + }) + .sort((l, r) => l.timestamp - r.timestamp); const users = channelIsAnonymous(channel) ? [0] : Array.from((new Set(await channelUsers(req, channel))).values());