refactor: lt msgs
This commit is contained in:
parent
c99ec7eb96
commit
28fa1ec084
|
@ -52,7 +52,7 @@ const About = () => (
|
||||||
{' '}
|
{' '}
|
||||||
<strong style={{color: 'orange'}}>1</strong>
|
<strong style={{color: 'orange'}}>1</strong>
|
||||||
{' '}
|
{' '}
|
||||||
minute.
|
minute. In addition, non-anonymous chats will preserve their last 50 messages.
|
||||||
</p>
|
</p>
|
||||||
<h2>Be excellent to each other</h2>
|
<h2>Be excellent to each other</h2>
|
||||||
<p>
|
<p>
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
import {createClient, keys} from '@latus/redis';
|
import {promisify} from 'util';
|
||||||
|
|
||||||
|
import {createClient} from '@latus/redis';
|
||||||
import {parseChannel} from '@reddichat/core';
|
import {parseChannel} from '@reddichat/core';
|
||||||
|
|
||||||
const messageChannel = async (latus, uuid) => {
|
const messageChannel = async (latus, uuid) => {
|
||||||
const results = (await keys(createClient(latus), `*:messages:${uuid}`));
|
const client = createClient(latus);
|
||||||
if (0 === results.length) {
|
const get = promisify(client.get.bind(client));
|
||||||
return undefined;
|
const channel = get(`uuid-channel:${uuid}`);
|
||||||
}
|
return channel ? parseChannel(channel) : undefined;
|
||||||
const key = results.pop();
|
|
||||||
return parseChannel(key.split(':')[0]);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
export default messageChannel;
|
export default messageChannel;
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import {promisify} from 'util';
|
||||||
|
|
||||||
import {v4 as uuidv4} from 'uuid';
|
import {v4 as uuidv4} from 'uuid';
|
||||||
|
|
||||||
import {ModelMap, Op} from '@latus/db';
|
import {ModelMap, Op} from '@latus/db';
|
||||||
|
@ -9,6 +11,8 @@ import {ADMIN, MOD} from '../distinction';
|
||||||
|
|
||||||
import Message from './message';
|
import Message from './message';
|
||||||
|
|
||||||
|
const LONG_TERM_COUNT = 50;
|
||||||
|
|
||||||
export default (latus) => class MessageServer extends Message(latus) {
|
export default (latus) => class MessageServer extends Message(latus) {
|
||||||
|
|
||||||
static characterLimiter = createLimiter(latus, {
|
static characterLimiter = createLimiter(latus, {
|
||||||
|
@ -64,23 +68,37 @@ export default (latus) => class MessageServer extends Message(latus) {
|
||||||
uuid,
|
uuid,
|
||||||
}])
|
}])
|
||||||
));
|
));
|
||||||
return new Promise((r, e) => {
|
const del = promisify(pubClient.del.bind(pubClient));
|
||||||
const rendered = renderChannel(channel);
|
const set = promisify(pubClient.set.bind(pubClient));
|
||||||
const serverChannel = 'r' === type
|
const setex = promisify(pubClient.setex.bind(pubClient));
|
||||||
? rendered
|
const rendered = renderChannel(channel);
|
||||||
: `/u/${[name, req.user.redditUsername].sort().join('$')}`;
|
const serverChannel = 'r' === type
|
||||||
const key = `${serverChannel}:messages:${uuid}`;
|
? rendered
|
||||||
const ttl = channelIsAnonymous(channel) ? 60 : 600;
|
: `/u/${[name, req.user.redditUsername].sort().join('$')}`;
|
||||||
pubClient
|
const key = `${serverChannel}:messages:${uuid}`;
|
||||||
.setex(key, ttl, JSON.stringify({
|
const ttl = channelIsAnonymous(channel) ? 60 : 600;
|
||||||
distinction,
|
const stringified = JSON.stringify({
|
||||||
ip: req.ip,
|
distinction,
|
||||||
message,
|
ip: req.ip,
|
||||||
owner,
|
message,
|
||||||
socket: socket.id,
|
owner,
|
||||||
timestamp,
|
socket: socket.id,
|
||||||
}), (error) => (error ? e(error) : r([timestamp, uuid])));
|
timestamp,
|
||||||
|
uuid,
|
||||||
});
|
});
|
||||||
|
if ('r' === type && !isAnonymous) {
|
||||||
|
await set(`uuid-channel:${uuid}`, rendered);
|
||||||
|
const lpush = promisify(pubClient.lpush.bind(pubClient));
|
||||||
|
const lrange = promisify(pubClient.lrange.bind(pubClient));
|
||||||
|
const ltrim = promisify(pubClient.ltrim.bind(pubClient));
|
||||||
|
const key = `${serverChannel}:messages-lt`;
|
||||||
|
await lpush(key, stringified);
|
||||||
|
const fallingOff = await lrange(key, LONG_TERM_COUNT, -1);
|
||||||
|
const messages = fallingOff.map((reply) => JSON.parse(reply));
|
||||||
|
await Promise.all(messages.map(({uuid}) => del(`uuid-channel:${uuid}`)));
|
||||||
|
await ltrim(key, 0, LONG_TERM_COUNT - 1);
|
||||||
|
}
|
||||||
|
return setex(key, ttl, stringified).then(() => [timestamp, uuid]);
|
||||||
}
|
}
|
||||||
|
|
||||||
static async validate(packet, socket) {
|
static async validate(packet, socket) {
|
||||||
|
|
|
@ -1,17 +1,34 @@
|
||||||
import {promisify} from 'util';
|
import {promisify} from 'util';
|
||||||
|
|
||||||
import {keys} from '@latus/redis';
|
import {keys} from '@latus/redis';
|
||||||
|
import {renderChannel} from '@reddichat/core';
|
||||||
|
|
||||||
|
import messageChannel from './message-channel';
|
||||||
|
|
||||||
const replaceMessage = async (req, uuid, fn) => {
|
const replaceMessage = async (req, uuid, fn) => {
|
||||||
const {pubClient} = req.adapter;
|
const {pubClient} = req.adapter;
|
||||||
|
const channel = messageChannel(uuid);
|
||||||
|
if (channel) {
|
||||||
|
const lrange = promisify(pubClient.lrange.bind(pubClient));
|
||||||
|
const lset = promisify(pubClient.lset.bind(pubClient));
|
||||||
|
const key = `${renderChannel(channel)}:messages-lt`;
|
||||||
|
const replies = await lrange(key, 0, -1);
|
||||||
|
if (replies) {
|
||||||
|
const messages = replies
|
||||||
|
.map((reply) => JSON.parse(reply));
|
||||||
|
const index = messages.findIndex(({uuid: muuid}) => uuid === muuid);
|
||||||
|
if (-1 !== index) {
|
||||||
|
messages[index] = fn(messages[index]);
|
||||||
|
await lset(key, index, JSON.stringify(messages[index]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
const get = promisify(pubClient.get.bind(pubClient));
|
const get = promisify(pubClient.get.bind(pubClient));
|
||||||
const key = (await keys(pubClient, `*:messages:${uuid}`)).pop();
|
const key = (await keys(pubClient, `*:messages:${uuid}`)).pop();
|
||||||
if (!key) {
|
if (key) {
|
||||||
return Promise.resolve();
|
const message = fn(JSON.parse(await get(key)));
|
||||||
}
|
const peval = promisify(pubClient.eval.bind(pubClient));
|
||||||
const message = fn(JSON.parse(await get(key)));
|
return peval(
|
||||||
return new Promise((r, e) => pubClient
|
|
||||||
.eval(
|
|
||||||
[
|
[
|
||||||
"local ttl = redis.call('ttl', ARGV[1])",
|
"local ttl = redis.call('ttl', ARGV[1])",
|
||||||
'if ttl > 0 then',
|
'if ttl > 0 then',
|
||||||
|
@ -21,8 +38,10 @@ const replaceMessage = async (req, uuid, fn) => {
|
||||||
0,
|
0,
|
||||||
key,
|
key,
|
||||||
JSON.stringify(message),
|
JSON.stringify(message),
|
||||||
(error) => (error ? e(error) : r(message)),
|
)
|
||||||
));
|
.then(() => message);
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
};
|
};
|
||||||
|
|
||||||
export default replaceMessage;
|
export default replaceMessage;
|
||||||
|
|
|
@ -30,28 +30,40 @@ export const channelUsers = async (req, channel) => {
|
||||||
export const channelState = async (req, latus, channel) => {
|
export const channelState = async (req, latus, channel) => {
|
||||||
const {name, type} = channel;
|
const {name, type} = channel;
|
||||||
const redisClient = createClient(latus);
|
const redisClient = createClient(latus);
|
||||||
|
const lrange = promisify(redisClient.lrange.bind(redisClient));
|
||||||
const mget = promisify(redisClient.mget.bind(redisClient));
|
const mget = promisify(redisClient.mget.bind(redisClient));
|
||||||
const realName = 'r' === type
|
const realName = 'r' === type
|
||||||
? name
|
? name
|
||||||
: `${[name, req.user.redditUsername].sort().join('$')}`;
|
: `${[name, req.user.redditUsername].sort().join('$')}`;
|
||||||
const pattern = `${renderChannel({type, name: realName})}:messages:*`;
|
const pattern = `${renderChannel({type, name: realName})}:messages*:*`;
|
||||||
const messageKeys = await keys(redisClient, pattern);
|
const messageKeys = await keys(redisClient, pattern);
|
||||||
const messages = 0 === messageKeys.length
|
const tmessages = 0 === messageKeys.length
|
||||||
? []
|
? []
|
||||||
: (await mget(messageKeys))
|
: (await mget(messageKeys))
|
||||||
.map((reply, i) => ({
|
.map((reply) => JSON.parse(reply));
|
||||||
...JSON.parse(reply),
|
const lmessages = (await lrange(`${renderChannel(channel)}:messages-lt`, 0, -1))
|
||||||
uuid: messageKeys[i].split(':')[2],
|
.map((reply) => JSON.parse(reply));
|
||||||
}))
|
const messages = tmessages
|
||||||
.map((message) => {
|
.concat(lmessages)
|
||||||
const {socket, ip, ...rest} = message;
|
.filter(({message}) => !!message)
|
||||||
return {
|
.filter((() => {
|
||||||
...rest,
|
const seen = new Set();
|
||||||
owner: channelIsAnonymous(channel) ? 0 : rest.owner,
|
return ({uuid}) => {
|
||||||
};
|
if (seen.has(uuid)) {
|
||||||
})
|
return false;
|
||||||
.filter(({message}) => !!message)
|
}
|
||||||
.sort((l, r) => l.timestamp - r.timestamp);
|
seen.add(uuid);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
})())
|
||||||
|
.map((message) => {
|
||||||
|
const {socket, ip, ...rest} = message;
|
||||||
|
return {
|
||||||
|
...rest,
|
||||||
|
owner: channelIsAnonymous(channel) ? 0 : rest.owner,
|
||||||
|
};
|
||||||
|
})
|
||||||
|
.sort((l, r) => l.timestamp - r.timestamp);
|
||||||
const users = channelIsAnonymous(channel)
|
const users = channelIsAnonymous(channel)
|
||||||
? [0]
|
? [0]
|
||||||
: Array.from((new Set(await channelUsers(req, channel))).values());
|
: Array.from((new Set(await channelUsers(req, channel))).values());
|
||||||
|
|
Loading…
Reference in New Issue
Block a user