Почему в multi-core Node.js ломаются in-process события и cron — и как это правильно решить
Как только вы распределяете Node.js-приложение по нескольким ядрам через cluster, две вещи, безупречно работавшие в одном процессе, начинают тихо ломаться: in-process события (EventEmitter) и декораторные cron'ы (@Cron). Причина одна, и решение одно. Ниже — механизм проблемы, почему она проявляется как «иногда работает, иногда нет», и как правильно решить её через Redis/BullMQ — практически.
Примеры из backend'а mnazorat (NestJS, cluster, 8 ядер; отслеживается 40+ организаций, 400+ сотрудников).
Основа: cluster делает каждый worker отдельным процессом
Node.js работает в одном потоке — один процесс занимает одно ядро CPU. Чтобы задействовать 8 ядер, стандартный путь — cluster: primary-процесс форкает worker на каждое ядро, все слушают один порт.
import * as cl from 'cluster';
import { cpus } from 'os';
const cluster = cl as unknown as cl.Cluster;
if (cluster.isPrimary) {
const numCPUs = cpus().length; // 8
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker) => {
cluster.fork(); // перезапуск умершего worker'а
});
} else {
bootstrap(); // каждый worker — полностью отдельное NestJS-приложение
}По производительности это отличное решение. Но вместе с ним приходит один факт, из которого вытекает всё остальное:
Каждый worker — отдельный процесс: отдельная память, отдельные переменные. Worker'ы не делят между собой ничего из памяти.
Проблема 1: EventEmitter работает только внутри своего процесса
@nestjs/event-emitter удобен: в одном месте emit, в другом @OnEvent — модули не связаны жёстко.
// издатель
this.eventEmitter.emit('gps.location.received', payload);
// слушатель (в том же процессе)
@OnEvent('gps.location.received')
async handleLocation(payload: GpsPayload) { /* обработка, сохранение */ }EventEmitter — обычный объект в памяти. emit() находит слушателя только внутри своего процесса. Событие, выпущенное в Worker A, до Worker B не доходит.
Как это выглядит на практике. У нас GPS-точки обрабатывались через этот in-process поток. Каждый входящий запрос ОС направляет в случайный worker: одна точка в Worker 3, следующая в Worker 7. Если какое-то звено цепочки предполагает, что «мы в одном процессе», точка, попавшая в другой worker, не подключается к этой цепочке и тихо теряется — без ошибки, без лога.
В итоге баг становится плавающим: у одного пользователя работает, у другого нет; сегодня работает, завтра нет — потому что всё зависит от того, в какой worker попал запрос. У нас на поиск ушло ~7 дней, в основном потому что сначала мы смотрели на базу. На деле данные вообще не доходили до базы — они терялись в зазоре между worker'ами.
Диагностический признак: если данные теряются «на полпути», база здорова, а ошибка воспроизводится у случайных пользователей/в случайное время — первый подозреваемый не хранилище, а межпроцессная передача.
Проблема 2: @Cron дублируется в каждом worker'е
Та же природа «отдельного процесса» размножает запланированные задачи.
@Cron(CronExpression.EVERY_10_MINUTES)
async cleanupOldData() { /* удаление старых записей */ }Каждый worker регистрирует этот декоратор независимо. 8 worker'ов = задача запускается каждые 10 минут 8 раз параллельно: 8 одновременных удалений в базе, race condition, и любой отчёт/уведомление в 8 копиях.
Анти-паттерн: gating через «leader worker»
Первое, что приходит в голову — назначить один worker «главным» и давать singleton-задачи только ему:
for (let i = 0; i < numCPUs; i++) {
if (i === 0) process.env['PROCESS_WORKER_ID'] = '0';
cluster.fork();
}
// затем везде: if (process.env.PROCESS_WORKER_ID === '0') { ... }Работает, но это не решение, а пластырь. Почему:
- Нужно оборачивать вручную — забыл одно место, дубликат вернулся.
- Нет failover — умер «leader», cron'ы полностью остановились, перевыборов нет.
- Не решает проблему событий — входящие запросы всё равно попадают в случайный worker.
Настоящий вопрос другой: как worker'ы координируются между собой?
Решение: вынести координацию за пределы процесса
И EventEmitter, и @Cron рассчитаны на один процесс. Значит, несколько процессов должны координироваться через общую внешнюю точку. Практический инструмент — BullMQ поверх Redis.
1. Перенести cron на scheduler BullMQ
Вместо @Cron — upsertJobScheduler. Его вызывают все worker'ы, но BullMQ хранит scheduler в Redis под одним id и объединяет дубликаты (dedupe):
@Processor(GPS_CLEANUP_QUEUE)
export class GpsSchedule extends WorkerHost implements OnModuleInit {
constructor(
@InjectQueue(GPS_CLEANUP_QUEUE) private cleanupQueue: Queue,
) {
super();
}
async onModuleInit() {
// вызывают все 8 worker'ов — в Redis остаётся один scheduler
await this.cleanupQueue.upsertJobScheduler(
'gps-cleanup-scheduler',
{ pattern: CronExpression.EVERY_DAY_AT_MIDNIGHT, tz: 'Asia/Tashkent' },
{ name: 'cleanup-old-data', data: { type: 'cleanup-old-data' } },
);
}
// job выполняется глобально только в ОДНОМ worker'е
async process(job: Job) {
return this.handleCleanup(job);
}
}Результат: cron запускается глобально один раз (независимо от числа worker'ов), gating через PROCESS_WORKER_ID больше не нужен, failover приходит бесплатно.
2. События, пересекающие границу worker'а, отправлять через очередь
Данные, которые выпускаются в одном процессе, но должны быть надёжно обработаны, идут не через in-process emit(), а в очередь — теперь они не в памяти, а в Redis:
// вместо emit
await this.gpsQueue.add('location.received', payload);@Processor('gps')
export class GpsProcessor extends WorkerHost {
async process(job: Job) {
await this.processLocation(job.data); // возьмёт любой worker
}
}Теперь не важно, в какой worker попала точка: она лежит в очереди Redis, и даже если принимающий worker умрёт, job останется и будет повторён (retry). Эту гарантию in-process EventEmitter дать не может.
Когда очередь использовать НЕ нужно
Переносить каждое событие в очередь — обратная ошибка. Если emit и слушатель всегда в одном worker'е, в рамках одного запроса (например, межмодульный сигнал внутри request), in-process EventEmitter уместен и быстрее. Очередь добавляй только в двух случаях:
- если данные должны пересечь границу worker'а, или
- если работа должна выполниться гарантированно один раз (с retry).
Иначе вы лишь добавили лишнюю инфраструктуру и задержку.
Практический вывод (чеклист)
Перед переводом приложения в режим cluster или PM2 cluster, или на несколько реплик, проверьте:
- `@Cron` / `setInterval` / `@Interval` — дублируются в каждом worker'е. Перенесите scheduler на Redis-backed очередь (BullMQ
upsertJobScheduler). - `EventEmitter` / `@OnEvent` — если издатель и слушатель могут оказаться в разных worker'ах, используйте очередь или Redis Pub/Sub.
- Состояние в памяти —
Map/Set/кэш в переменной, сессии, счётчики rate-limit не делятся между worker'ами. Вынесите общее состояние в Redis. - «Одноразовые» задачи — не полагайтесь на ручной gating вроде
PROCESS_WORKER_ID === '0'; отдайте dedupe инфраструктуре.
Общее правило: в multi-core задавайте вопрос «где должно жить это состояние — внутри процесса или вне его?» заранее для каждой глобальной задачи. Чаще всего ответ — «вне», и решить это заранее дешевле, чем потом.