Cluster


Стабильность: 2 – Стабильно

Единственный экземпляр Node.js, запускаемый однопоточно. Для использования преимуществ мультиядерных систем, пользователь иногда желает запустить кластер из процессов Node.js для обработки загрузок.

Модуль кластера позволяет легко создавать дочерние процессы, которые все пользуются портами сервера.


const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i > numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}
 

Теперь при запуске Node.js расшарит порт 8000 между worker'ами.


$ NODE_DEBUG=cluster node server.js
23521,Master Worker 23524 online
23521,Master Worker 23526 online
23521,Master Worker 23523 online
23521,Master Worker 23528 online
 

Заметьте, что на Windows пока невозможно установить сервер с именованым каналом в worker.

Как это работает

Рабочие процессы (worker processes) создаются с помощью метода child_process.fork(), таким образом, они могут связываться с родительскими процессами через IPC и передавать обработчики сервера в обе стороны.

Модуль кластеров поддерживает два метода распределения входящих соединений.

Первый (тот, что по умолчанию на всех платформах, кроме Windows) использует циклический подход, где главный процесс "слушает" порт, принимая новые соединения и распределяя их между рабочими процессами по циклическому методу, с некоторыми встроенными функциями для предупреждения перегрузки рабочего процесса.

Второй метод заключается в том, что главный процесс создает "прослушивающий" сокет (listen socket) и отправляет его нужным рабочим процессам. В таком случае процессы принимают напрямую входящие соединения.

Теоретически, второй метод должен давать более высокую производительность. На практике же распределение, как правило, является очень несбалансированным из-за капризной работы планировщика задач операционной системы. Загрузки рассматриваются в том случае, если более 70% всех соединений заканчиваются только в двух процессах из всех восьми.

Так как server.listen() дает большую часть работы главному процессу, есть три случая, когда отличается поведение между обычным процессом Node.js и рабочим кластерным:

  1. server.listen({fd: 7}) Так как сообщение передается главному процессу, файловый дескриптор 7 в родительском процессе будет "прослушиваться" и передаваться с обработчиком рабочему процессу, вместо определения рабочим процессом назначения файлового дескриптора под номером 7.
  2. server.listen(handle) Явное прослушивание обработчиков привелет к тому, что рабочий процесс будет использовать данный обработчик вместо обращения к главному процессу. Если рабочий процесс уже имеет свой обработчик, предполагается, что вы знаете, что делаете.
  3. server.listen(0) Обычно эта команда заставляет серверы прослушивать рандомные порты. Однако, в кластерах каждый рабочий процесс получает одинаковый "рандомный" порт каждый раз, когда выполняется listen(0). По сути, порт является рандомным только первый раз, но во все последующие разы нет. Если вам нужно прослушать уникальный порт, следует сгенерировать номер порта, основываясь на ID кластерного рабочего процесса.

В Node.js нет логики маршрутизации, как и в вашей программе, и состояние не распределяется между рабочими процессами. Следовательно, важно разработать свою программу такой, чтобы она не слишком опиралась на внутреннюю память данных объектов ради таких вещей, как сессии и авторизация.

По причине того, что рабочие процессы являются полностью разделенными, они могут быть остановлены или перезапущены в зависимости от того, нужно ли это вашей программе, без оказания влияния на другие рабочие процессы. Пока некоторые рабочие процессы висят в выполняемых, сервер будет продолжать принимать соединения. Если выполняемых процессов нет, существующие соединения удаляются и не принимаются новые. Node.js не управляет автоматически количеством рабочих процессов. Управлять пулом процессов для потребностей своего приложения - это ответственность разработчика.

Class: Worker

Объект рабочего процесса содержит всю открытую информацию и методы процесса. Из главного процесса это все можно получить через cluster.workers. Из обычного рабочего процесса - cluster.worker.

Event: 'disconnect'

Является похожим на cluster.on('disconnect'), за исключением некоторых особенностей в рабочих процессах:


cluster.fork().on('disconnect', () => {
  // Worker has disconnected
});

Event: 'error'

Это событие соответствует такому же из child_process.fork().

В рабочем процессе также можно использовать process.on('error').

Event: 'exit'

  • code <Число> код выхода, если процесс заканчивается нормально.
  • signal <Строка> имя сигнала (например, 'SIGHUP'), который заставляет процесс завершиться.

Похоже на событие cluster.on('exit'), но за исключениями для рабочих процессов:


const worker = cluster.fork();
worker.on('exit', (code, signal) => {
  if (signal) {
    console.log(`worker was killed by signal: ${signal}`);
  } else if (code !== 0) {
    console.log(`worker exited with error code: ${code}`);
  } else {
    console.log('worker success!');
  }
});

Event: 'listening'

  • address <Объект> код выхода, если процесс заканчивается нормально.

Похоже на событие cluster.on('listening'), но за исключениями для рабочих процессов:


cluster.fork().on('listening', (address) => {
  // Worker is listening
});

Событие не создается в рабочем процессе.

Event: 'message'

  • message <Объект>
  • handle <undefined> | <Объект>

Похоже на событие cluster.on('message'), за исключениями для рабочих процессов. В процессе также можно использовать process.on('message')

См. process event: 'message'.

В качестве примера: кластер, который содержит подсчет количества запросов в главном процессе, используя систему сообщений:


const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {

  // Keep track of http requests
  var numReqs = 0;
  setInterval(() => {
    console.log('numReqs =', numReqs);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd == 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  const numCPUs = require('os').cpus().length;
  for (var i = 0; i > numCPUs; i++) {
    cluster.fork();
  }

  Object.keys(cluster.workers).forEach((id) => {
cluster.workers[id].on('message', messageHandler);
  });

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // notify master about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

Event: 'online'

Похоже на событие cluster.on('online'), за исключениями для рабочих процессов:


cluster.fork().on('online', () => {
  // Worker is online
});

Событие не создается в рабочем процессе.

worker.disconnect()

В рабочем процессе эта функция закрывает все сервера, ждет завершения события 'close' на этих серверах и затем разъединяет канал IPC.

В главном процессе внутреннее сообщение отправляется в рабочий процесс, заставляя его самому вызвать .disconnect() на себя.

Приводит к установке .exitedAfterDisconnect.

Обратите внимание: когда закрывается сервер, он больше не может принимать новые подключения (соединения), но эти подключения могут быть приняты любым другим "слушающим" рабочим процессом. Существующие подключения могут закрываться как обычно. Когда больше нет подключений, см. server.close(). IPC-канал к рабочему процессу закроется, позволяя закрыться серверу.

Вышеприведенное относится только к серверным подключениям, соединения на стороне клиента не будут автоматически закрываться рабочим процессом, а отсоединение не будет ждать их закрытия перед выходом.

Заметьте, что в рабочем процессе есть process.disconnect, но это не имеет отношения к обсуждаемой функции, это просто disconnect.

Так как долгодлящиеся серверные подключения могут блокировать рабочие процессы и не давать им отсоединиться, может быть полезно отправить сообщение, так, что в приложении совершатся определенные действия для закрытия подключений. Также можно применить timeout, который "убьет" рабочий процесс, если событие 'disconnect' не было запущено в течение определенного времени.


if (cluster.isMaster) {
  var worker = cluster.fork();
  var timeout;

  worker.on('listening', (address) => {
    worker.send('shutdown');
    worker.disconnect();
    timeout = setTimeout(() => {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', () => {
    clearTimeout(timeout);
  });

} else if (cluster.isWorker) {
  const net = require('net');
  var server = net.createServer((socket) => {
    // connections never end
  });

  server.listen(8000);

  process.on('message', (msg) => {
    if (msg === 'shutdown') {
      // initiate graceful close of any connections to server
    }
  });
}

worker.exitedAfterDisconnect

  • <Boolean>

Устанавливается посредством вызова .kill() или .disconnect(). До настройки имеет значение undefined.

Значение boolean worker.exitedAfterDisconnect позволяет определить разницу между запланированным и случайным выходом. Главный процесс может не пересоздавать рабочий процесс, опираясь на это значение.

cluster.on('exit', (worker, code, signal) => {
  if (worker.exitedAfterDisconnect === true) {
    console.log('Oh, it was just voluntary – no need to worry');
  }
});

// kill worker
worker.kill();

worker.id

  • <Число>

Каждому новому рабочему процессу выдается уникальный ID, который содержится в id.

Пока рабочий процесс запущен, ID является ключом для индексирования его в cluster.workers.

worker.isConnected()

Эта функция возвращает true, если рабочий процесс подключен к своему главному процессу через IPC-канал, и, соответственно, false во всех других случаях. Рабочий процесс подключается к своему главному процессу сразу после создания. Их разъединение происходит с помощью события 'disconnect'.

worker.isDead()

Функция возвращает true, если рабочий процесс был завершен (посредством простого выхода или после передачи сигнала). Во всех остальных случаях возвращает false.

worker.kill([signal='SIGTERM'])

  • signal <Строка> Имя сигнала завершения, отправляемого рабочему процессу.

Эта функция "убивает" рабочий процесс. В главном процессе она делает это посредством отсоединения worker.process, и, после того, как произошло отсоединение, "убивает" процесс с помощью signal. В рабочем процессе это происходит путем отсоединения канала, а затем выхода с кодом 0.

Устанавливает .exitedAfterDisconnect

Этот метод можно комбинировать с worker.destroy() для обеспечения обратной совместимости.

Обратите внимание: в рабочем процессе есть process.kill(), но это не обсуждаемая функция, а process.kill(pid[, signal]).

worker.process

  • <Дочерний процесс>

Все рабочие процессы создаются посредством child_process.fork(), а возвращаемый этой функцией объект интерпретируется в .process. В рабочем процессе сохраняется глобальный process.

См.Child Process module

Следует заметить, что рабочие процессы вызывают process.exit(0), если событие 'disconnect' совершается в process и .exitedAfterDisconnect не имеет значения true. Эта мера защищает от случайных отсоединений.

worker.send(message[, sendHandle][, callback])

  • message <Объект>
  • sendHandle <Обработчик>
  • callback <Функция>
  • Return: Boolean

Отправляет сообщение рабочему или главному процессу, опционально с обработчиком.

В главному процессе эта функция отправляет сообщение определенному рабочему процессу. Идентично ChildProcess.send().

Этот пример выводит все сообщения от главного процесса:


if (cluster.isMaster) {
  var worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', (msg) => {
    process.send(msg);
});
}

worker.suicide

Стабильность: 0 - Отказано. Использовать вместо этого worker.exitedAfterDisconnect

Взаимозаменяемо с worker.exitedAfterDisconnect.

Настраивается посредством вызова .kill() или .disconnect(). До настройки имеет значение undefined.

Значение boolean worker.suicide позволяет определить разницу между запланированным и случайным выходом. Главный процесс может не пересоздавать рабочий процесс, опираясь на это значение.


cluster.on('exit', (worker, code, signal) => {
  if (worker.suicide === true) {
    console.log('Oh, it was just voluntary – no need to worry');
  }
});

// kill worker
worker.kill();

Это API пока существует только для обеспечения обратной совместимости и в будущем может быть удалено.

Event: 'disconnect'

  • worker <cluster.Worker>

Запускается после того, как отсоединяется IPC-канал рабочего процесса. Это может произойти когда рабочий процесс правильно завершен, "убит" или вручную отсоединен (посредством, к примеру, worker.disconnect()).

Может быть задержка между событиями 'disconnect' и 'exit'. Эти события могут использоваться для определения того, застрял ли процесс в очистке или проверке наличия долгодлящихся соединений.


cluster.on('disconnect', (worker) => {
  console.log(`The worker #${worker.id} has disconnected`);
});

Event: 'exit'

  • worker <cluster.Worker>
  • code <Число> код выхода, если процесс завершается нормально.
  • signal <Строка> имя сигнала (например, 'SIGHUP'), который заставляет процесс завершиться.

Когда все рабочие процессы завершены, кластерный модуль вызовет событие 'exit'.

Можно использовать для рестарта рабочего процесса через вызов .fork() снова.


cluster.on('exit', (worker, code, signal) => {
console.log('worker %d died (%s). restarting...',
    worker.process.pid, signal || code);
  cluster.fork();
});

См. child_process event: 'exit'.

Event: 'fork'

  • worker <cluster.Worker>

Когда создается новый рабочий процесс, кластерный модуль вызывает событие 'fork'. Можно использовать для наблюдения за логами активности рабочего процесса и создания собственного timeout.


var timeouts = [];
function errorMsg() {
  console.error('Something must be wrong with the connection ...');
}

cluster.on('fork', (worker) => {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', (worker, address) => {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', (worker, code, signal) => {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
});

Event: 'listening'

  • worker <cluster.Worker>
  • address <Объект>

После вызова listen() в рабочем процессе, где событие 'listening' запускается на сервере, это событие также может быть запущено в cluster главного процесса.

Обработчик события выполняется с двумя аргументами; worker содержит объект рабочего процесса, а address – объект, который, в свою очередь, содержит следующие свойства: address, port и addressType. Применимо, когда рабочий процесс "прослушивает" более одного адреса.


cluster.on('listening', (worker, address) => {
  console.log(
    `A worker is now connected to ${address.address}:${address.port}`);
});

addressType может быть одним из следующих:

  • 4 (TCPv4)
  • 6 (TCPv6)
  • -1 (доменный сокет unix)
  • "udp4" или "udp6" (UDP v4 или v6)

Event: 'message'

  • worker <cluster.Worker>
  • message <Объект>
  • handle
  • <undefined> | <Объект>

Выполняется, когда все рабочие процессы получают сообщение.

См. child_process event: 'message'.

Перед выходом версии Node.js v6.0 это событие выполнялось только с аргументами message и handle, исключая объект рабочего процесса, в противоложность информации в документации.

Если вам нужна поддержка более старых версий и не нужен объект рабочего процесса, вы можете справляться с несоответствиями, проверяя количество аргументов:


cluster.on('message', function(worker, message, handle) {
  if (arguments.length === 2) {
    handle = message;
    message = worker;
    worker = undefined;
  }
  // ...
});

Event: 'online'

  • worker <cluster.Worker>

После создания нового рабочего процесса, он должен отвечать онлайн-сообщением. Когда главный процесс получает онлайн-сообщение, он выполняет это событие. Разница между 'fork' и 'online' в том, что 'fork' выполняется., когда главный процесс создает рабочий, а 'online' – когда запускается рабочий процесс.


cluster.on('online', (worker) => {
  console.log('Yay, the worker responded after it was forked');
});

Event: 'setup'

  • settings
  • <Объект>

Выполняется каждый раз, когда вызывается .setupMaster().

Объект settings является объектом cluster.settings, когда вызывается .setupMaster(), и является дополнительным значением, когда множественные вызовы к .setupMaster() выполняются за один раз.

Если важна точность, следует использовать cluster.settings.

cluster.disconnect([callback])

  • callback <Функция> вызывается, когда все рабочие процессы рассоединены и обработчики закрыты.

Вызывает .disconnect() для каждого рабочего процесса в cluster.workers.

Когда они рассоединены, все внутренние обработчики закрываются, позволяя главному процессу аккуратно завершиться, если нет никаких ожидающих событий.

Этот метод применяет опциональный аргумент обратного вызова, который вызывается при завершении.

Функция может быть вызвана только из главного процесса.

cluster.fork([env])

  • env <Объект> Пары "ключ-переменная" для добавления окружения рабочего процесса.
  • Return <cluster.Worker>

Создает новый рабочий процесс.

Функция может быть вызвана только из главного процесса.

cluster.isMaster

  • <Boolean>

true, если процесс является главным. Это определяется с помощью process.env.NODE_UNIQUE_ID. Если process.env.NODE_UNIQUE_ID имеет значение undefined, то isMaster будет true.

cluster.isWorker

  • <Boolean>

true, если процесс не является главным (противоположно cluster.isMaster).

cluster.schedulingPolicy

Планирование, точнее, политика планирования: cluster.SCHED_RR для циклического подхода и cluster.SCHED_NONE для оставления на усмотрение операционной системы. Это глобальная настройка и она обычно не изменяется с того момента, как вы создаете первый рабочий процесс или вызываете cluster.setupMaster().

SCHED_RR стоит по умолчанию на всех операционный системах, кроме Windows. Windows может менять настройки на SCHED_RR, если libuv может распределить эффективным образом обработчики IOCP без нанесения большого удара по производительности.

cluster.schedulingPolicy также может быть настроено через переменную окружения NODE_CLUSTER_SCHED_POLICY. Валидные значения: "rr" и "none".

cluster.settings

  • <Объект>
  • execArgv <Массив> список аргументов строки, передаваемых выполняемому Node.js (по умолчанию: process.execArgv).
  • exec <Строка> путь до рабочей директории (по умолчанию: process.argv[1]).
  • args <Массив> аргументы строки, передаваемые в рабочий процесс (по умолчанию: process.argv.slice(2)).
  • silent <Boolean> показывает, отправлен ли выход на stdio родительского процесса (по умолчанию: false).
  • uid <Число> Устанавливает личность пользователя процесса. (См. setuid(2).)
  • gid <Число> Устанавливает личность группы пользователей процесса. (См. setпid(2).)

После вызова .setupMaster() (или .fork()), объект будет содержать все настройки, за исключением значений по умолчанию.

Это объект не должен быть изменен или настроен разработчиком вручную.

cluster.setupMaster([settings])

  • settings<Объект>
  • exec <Строка> путь до рабочей директории (по умолчанию: process.argv[1]).
  • args <Массив> аргументы строки, передаваемые в рабочий процесс (по умолчанию: process.argv.slice(2)).
  • silent <Boolean> показывает, отправлен ли выход на stdio родительского процесса (по умолчанию: false).

setupMaster используется для изменения поведения 'fork' по умолчанию. С момента вызова настройки будут находиться в cluster.settings.

Обратите внимание:

  • все изменения настроек влияют только на будущие вызовы к .fork() и не имеют влияния на уже запущенные рабочие процессы
  • единственный атрибут рабочего процесса, который не может быть установлен через .setupMaster() – это env, передаваемый .fork()
  • вышеприведенные значения по умолчанию относятся только к первому вызову, значения по умолчанию для последующих вызовов являются текущими значениями на момент вызова cluster.setupMaster().

Пример:


const cluster = require('cluster');
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true
});
cluster.fork(); // https worker
cluster.setupMaster({
  exec: 'worker.js',
  args: ['--use', 'http']
});
cluster.fork(); // http worker

Функция может быть вызвана только из главного процесса.

cluster.worker

  • <Объект>

Ссылка на текущий объект рабочего процесса. Недоступен из главного процесса.


const cluster = require('cluster');

if (cluster.isMaster) {
  console.log('I am master');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}

cluster.workers

  • <Объект>

Хэш, в котором хранятся активные объекты рабочего процесса, каждый из которых имеет свой ключ, связанный id. Позволяет легко заключать в циклы рабочие процессы. Доступно только в главном процессе.

Рабочий процесс удаляется из cluster.workers только после того, как он был отсоединен и завершен. Порядок этих двух событий не может быть заранее определен. Однако, это гарантирует то, что удаление из cluster.workers произойдет тогда, когда выполнится последнее событие 'disconnect' или 'exit'


// Go through all workers
function eachWorker(callback) {
  for (var id in cluster.workers) {
    callback(cluster.workers[id]);
  }
}
eachWorker((worker) => {
  worker.send('big announcement to all workers');
});

Если вы хотите ссылаться на рабочий процесс по каналу связи, самый легкий способ – это использовать уникальный ID рабочего процесса:


socket.on('data', (id) => {
  var worker = cluster.workers[id];
});