Что это такое
Грубо говоря, пакетная обработка событий – это накопление событий в течение некоторого времени, чтобы позже обработать их все сразу.
Мы можем использовать две хорошо известные стратегии или даже объединить их, чтобы понять, когда пришло время промыть и обработать кучу буферизованных событий:
- когда количество событий достигает порогового значения;
- используя интервал – например, промывать события каждые 10 секунд.
Зачем это нужно
Пакетирование событий может быть полезно для:
- производительности, например, для уменьшения количества записей в постоянное хранилище или для оптимизации объема данных, передаваемых по сети;
- агрегирование – например, для группировки по ссылкам и подсчета посещений страниц.
Как
Реализация пакетной обработки событий в JavaScript с помощью RxJS – дело несложное.
Давайте начнем с примера на Node.
const EventEmitter = require('events');
const { fromEvent, bufferCount } = require('rxjs');
// I assume you already have an instance of EventEmitter in your app.
// In case I'm wrong, let's create the one.
const eventEmitter = new EventEmitter();
// listen to an event called `something-good-happened`
fromEvent(eventEmitter, 'something-good-happened')
// accumulate events
.pipe(
// and flush them every time it's number reaches 3
bufferCount(3),
// let's log it
tap(() => {
console.log(
`Great! The number of good things happened in a row reached ${events.length}. It's time to celebrate.`
);
console.log(events);
})
)
// process the batch
.subscribe((events) => {
const goodThingsByUser = {};
for (const event of events) {
goodThingsByUser[event.userId] = (goodThingsByUser[event.userId] ?? 0) + 1;
}
// reportGoodThingsDone(goodThingsByUser);
});
И, конечно, пример для браузера.
import { fromEvent, bufferTime, filter } from "rxjs";
// listen to clicks on the whole document
const clicks$ = fromEvent(
document.documentElement,
"click",
// selecte only properties we need
(event) => ({
type: event.type,
time: new Date(),
x: event.x,
y: event.y
})
);
clicks$
.pipe(
// flush events every 1 second
bufferTime(1000),
// move next only if there is at least one event
filter((events) => events.length > 0)
)
// process the batch
.subscribe((events) => {
fetch("/my-analytics", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(events)
});
});
Еще один живой пример здесь.
Обратите внимание, что не существует серебряной пули, и у каждого решения есть свои недостатки.
Массовая обработка большого количества событий может резко ухудшить производительность вашего приложения из-за блокировки основного потока, чего следует избегать любой ценой. Если вы планируете обрабатывать большое количество данных, подумайте об использовании очереди сообщений. Посмотрите, например, на BullMQ.
Спасибо за прочтение!
Не могли бы вы поделиться примерами применения пакетной обработки событий в ваших проектах?