Stream


Стабильность: 2 – Стабильная версия

Стрим – абстрактный интерфейс для работы с поступающими данными в Node.js. Модуль stream предоставляет базовый API, который облегчает создание объектов, что реализуют интерфейс стрима.

Node.js предоставляет много объектов стрима. Например, запрос на HTTP сервер и process.stdout – оба являются экземплярами стрима.

Стримы могут быть открытыми для чтения, открытыми для записи или открытыми для того и другого одновременно. Все стримы являются экземплярами EventEmitter.

Модуль stream доступен отсюда:


const stream = require('stream');

Всем пользователям Node.js важно понимать, как работают стримы, и модуль stream сам по себе является самым удобным для разработчиков, которые создают новые типы экземпляров стримов. Разработчики, которые изначально изучили объекты стримов, редко будут нуждаться (если вообще будут) в прямом использовании модуля stream.

Организация этого документа

Этот документ разделен на две основных части, третья часть – для дополнительных примечаний. Первая часть рассказывает об элементах API стрима, которые требуется использовать для стримов в приложении. Вторая – об элементах API, которые нужны для реализации новых типов стримов.

Типы стримов

В Node.js есть четыре основных типа стримов:

  • Открытый для чтения (Readable) – стримы, данные в которых могут быть прочитаны (например, fs.createReadStream())
  • Открытый для записи (Writable) – стримы, в которые можно записывать данные (например, fs.createWriteStream())
  • Спаренные (Duplex) – открытые и для чтения, и для записи одновременно (например, net.Socket)
  • Трансформеры (Transform) – спаренные стримы, которые могут изменять и преобразовывать данные при чтении и записи (например, zlib.createDeflate())

Объектный режим (Object Mode)

Все стримы, созданные Node.js API оперируют исключительно строками и объектами буфера Buffer. Однако, по возможности для реализации работы стримов с другими типами значений JavaScript (с иключением null, который имеет специальное назначение в стримах). Стримы, которые могут это делать, работают в “object mode” – объектном режиме.

Экземпляры стримов переключаются в объектный режим с помощью опции objectMode при создании стрима. Попытки переключить существующий стрим в объектный режим могут быть небезопасными.

Буферизация (Buffering)

Как открытые для записи, так и открытые для чтения стримы сохраняют данные во внутреннем буфере, извлечь информацию из которого можно посредством writable._writableState.getBuffer() или readable._readableState.buffer соответственно.

Объем потенциально буферизируемых данных зависит от передачи опции highWaterMark в конструктор стримов. Для нормальных стримов опция highWaterMark задает общее количество байт. Для стримов, работающих в объектном режиме, highWaterMark задает общее количество объектов.

Данные буферизируются в открытых для чтения стримах, когда при реализации вызывается stream.push(chunk). Если получатель стрима не вызывает stream.read(), то данные остаются во внутренней очереди, пока не будут получены.

Когда общий размер внутреннего читаемого буфера достигает значения, заданного highWaterMark, стрим временно останавливает чтение данных из предустановленного ресурса, пока данные, которые на данный момент буферизируются, не будут получены (это значит, что стрим прекратит вызывать внутренний метод readable._read(), который используется для заполнения читаемого буфера).

Данные буферизируются в открытые для записи стримы, когда постоянно вызывается метод writable.write(chunk). Если общий размер внутреннего записывающего буфера меньше значения, установленного highWaterMark, вызов writable.write() вернет true. Если размер внутренного буфера достигает или превосходит highWaterMark, возвращается false.

Ключевая цель API stream, а в частности метода stream.pipe() – ограничить буферизацию данных до приемлемых уровней, таких, чтобы источники и направления различных скоростей не подавляли доступную память.

Так как спаренные стримы и трансформеры являются открытыми и для чтения, и для записи, каждый поддерживает два раздельных внутренних буфера для чтения и записи, позволяя каждой части работать независимо от другой при сохранении надлежащего и эффективного потока данных. Например, экземпляры net.Socket являются спаренными стримами, чья открытая для чтения часть позволяет потребление данных, полученных из сокета, и чья открытая для записи часть позволяет записи данных в сокет. Так как данные могут записываться в сокет быстрее или медленнее, чем получаются из сокета, важно, чтобы каждая часть работала независимо от другой.

API для потребителей стримов

Почти все приложения Node.js, вне зависимости от их простоты, так или иначе используют стримы. Следующий пример показывает использование стримов в приложении Node.js, которое реализует HTTP сервер:


const http = require('http');

const server = http.createServer( (req, res) => {
  // req -  http.IncomingMessage, открытый для чтения стрим
  // res - http.ServerResponse, открытый для записи стрим

  let body = '';
  // получение данных в виде строк utf8 
  // если не задана кодировка, будут получаться объекты буфера
  req.setEncoding('utf8');

  // Открытый для чтения стрим генерирует событие 'data', если добавлен слушатель
  req.on('data', (chunk) => {
    body += chunk;
  });

  // событие завершения показывает, что все было получено
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // пишет что-то интересное пользователю
      res.write(typeof data);
      res.end();
    } catch (er) {
      // Ой, ошибка json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

Открытые для записи стримы (как res в примере) работают с методами по типу write() и end(), которые используются для записи данных в стрим.

Открытые для чтения стримы используют EventEmitter API для уведомления кода приложения, когда данные доступны для чтения из стрима. Эти доступные данные можно прочесть из стрима несколькими способами.

Открытые для чтения и для записи стримы используют EventEmitter API различными способами для обсуждения текущего состояния стрима.

Спаренные стримы и трансформеры являются открытыми для чтения и для записи одновременно.

Приложения, которые либо записывают данные, либо получают данные из стрима, не требуют прямой реализации интерфейсов стрима и обычно не нуждаются в вызове require(‘stream’).

Разработчики, которые хотят реализовать новые типы стримов, должны ссылаться на раздел API для реализации стримов.

Открытые для записи стримы

Открытые для записи стримы являются абстракцией для конечной точки, куда записываются данные.

Примеры открытых для записи стримов включают в себя:

  • HTTP запросы на клиенте
  • HTTP ответы на сервере
  • fs write стримы
  • zlib стримы
  • crypto стримы
  • TCP сокеты
  • stdin дочернего процесса
  • process.stdout, process.stderr.

Примечание: некоторые из этих примеров являются настоящими спаренными стримами, которые реализуют открытый для записи интерфейс.

Все открытые для записи стримы реализуют интерфейс, определенный классом stream.Writable.

Так как заданные экземпляры открытых для записи стримов могут отличаться, все открытые для записи стримы следуют одному и тому же основному шаблону использования, как показано в этом примере:


const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

Class: stream.Writable

Добавлено в v0.9.4

Event: ‘close’

Добавлено в v0.9.4

Событие ‘close’ генерируется, когда стрим и любой из его предустановленных ресурсов (к примеру, файловый дескриптор) закрываются. Событие показывает, что никакие события больше не генерируются, и не осуществляются расчеты.

Не все открытые для записи стримы генерируют событие ‘close’.

Event: ‘drain’

Добавлено в v0.9.4

Если при вызове stream.write(chunk) возращается false, событие ‘drain’ генерируется, когда нужно восстановить запись данных в стрим.


// Запись данных в предоставленный открытый для записи стрим миллион раз
// Будьте внимательны к противодействию
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i--;
      if (i === 0) {
        // последний раз!
        writer.write(data, encoding, callback);
      } else {
        // смотрим, будем ли мы продолжать или ждать
        // не передавайте функцию обратного вызова, так как это еще не все
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // нужно остановиться раньше!
      // запишите еще парочку для события ‘drain’
      writer.once('drain', write);
    }
  }
}

Event: ‘error’

Добавлено в v0.9.4
  • <Ошибка>

Событие ‘error’ генерируется при ошибке записи данных из пайпа. Слушатель функции обратного вызова передает единственный аргумент Error при вызове.

Примечание: стрим не закрывается, когда генерируется событие ‘error’.

Event: ‘finish’

Добавлено в v0.9.4

Событие ‘finish’ генерируется после вызова метода stream.end() и после того, как все данные были сброшены в предустановленную систему.


const writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write('hello, #${i}!\n');
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});

Event: ‘pipe’

Добавлено в v0.9.4
  • src <stream.Readable> источниковый стрим, который передается в открытый для записи

Событие ‘pipe’ генерируется, когда вызывается метод stream.pipe() на открытом для чтения стриме, добавляя открытый для записи стрим в настройки направления.


const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);

Event: ‘unpipe’

Добавлено в v0.9.4
  • src <Открытый для чтения стрим> Источниковый стрим, который не передает открытый для записи

Событие ‘unpipe’ генерируется, когда метод stream.unpipe() вызывается на открытом для чтения стриме, удаляя открытый для записи стрим из настроек направления.


const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

writable.cork()

Добавлено в v0.11.2

Метод writable.cork() заставляет все записанные данные буферизироваться в памяти. Буферизированные данные будут сброшено либо с помощью stream.uncork(), либо stream.end().

Изначально writable.cork() предназначался для избегания ситуаций, когда запись очень маленьких порций данных в стрим не вызывает бэк-ап во внутреннем буфере, что может иметь неблагоприятное воздействие на производительность. В таких ситуациях реализации, использующие метод writable._writev() могут выполнять буферизированные записи более оптимальным способом.

writable.end([chunk][, encoding][, callback])

Добавлено в v0.9.4
  • chunk <Строка> | <Буфер> | <любое другое значение> Опциональные данные для записи. Для стримов, не работающих в объектном режиме, chunk может быть строкой или буфером. Для стримов в объектном режиме chunk может быть любым значением JavaScript, отличным от null.
  • encoding <Строка> Кодировка, если chunk – строка.
  • callback <Функция> Опциональная функция обратного вызова, когда стрим завершается.

Вызов метода writable.end() сигнализирует о том, что в открытый для записи стрим больше не поступает данных на запись. Опциональные аргументы chunk и encoding позволяют последней дополнительной порции данных быть записанной непосредственно перед закрытием стрима. При предоставлении опциональной функции callback, она добавляется как слушатель события ‘finish’.

Вызов метода stream.write() после вызова stream.end() приведет к ошибке.


// запись 'hello, ' и последующий end 'world!'
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// последующая запись не разрешена!

writable.setDafaultEncoding(encoding)

Добавлено в v0.11.15
  • encoding <Строка> Новая кодировка по умолчанию
  • Возвращает this

Метод writable.setDafaultEncoding() настраивает кодировку по умолчанию для открытого для записи стрима.

writable.uncork()

Добавлено в v0.11.2

Метод writable.uncork() сбрасывает все данные, буферизированные после вызова stream.cork().

При использовании stream.cork() и stream.uncork() для управления буферизацией записей в стрим, рекомендуется отложить вызов writable.uncork() с помощью process.nextTick(). Это позволяет сгруппировать все вызовы writable.write(), которые происходят в фазе цикла событий Node.js.


stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

Если метод writable.cork() вызывается много раз в стриме, требуется такое же количество вызовов writable.uncork() для сброса буферизированных данных.


stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // Данные не будут сброшены, пока uncork не будет вызван второй раз
  stream.uncork();
});

writable.write(chunk[, encoding][, callback])

Добавлено в v0.9.4
  • chunk <Строка> | <Буфер> Данные для записи
  • encoding <Строка> Кодировка, если chunk – строка.
  • callback <Функция> Функция обратного вызова, когда сбрасываются данные
  • Возвращает <Boolean>. False, если стрим требует код вызова подождать генерацию события ‘drain’ перед продолжением записи дополнительных данных, во всех других случаях – true.

Метод writable.write() записывает некоторые данные в стрим и вызывает callback, если данные были полностью обработаны. В случае ошибки, callback может или не может быть вызвана с ошибкой в качестве первого аргумента. Для надежного нахождения ошибок записи, добавьте слушатель события ‘error’.

Возвращаемое значение true, если внутренний буфер меньше, чем highWaterMark, сконфигурированный при создании стрима после принятия chunk. Если возвращается false, следующие попытки записи данных в стрим должны быть остановлены до генерации события ‘drain’.

Пока стрим не очищается, вызов write() буферизирует chunk и возвращает false. После очищения от всех буферизированных chunk (принимается для доставки операционной системой), генерируется событие ‘drain’. Рекомендуется, если write() хотя бы раз возвращает false, прекратить запись данных chunk пока не будет сгенерировано событие ‘drain’. Так как вызов write() на неочищенном стриме не разрешен, Node.js буферизирует все записанные chunk, пока память не будет максимально заполнена, и с этой точки буферизация прерывается. Даже перед прерыванием использование памяти в таком режиме может привести к плохой производительности корзины и высокому RSS (что нетипично возвращается в операционную систему, даже после того, как память больше не нужна). Так как TCP-сокеты могут вообще не очищаться, если удаленный пир не читает данные, запись неочищенного сокета может привести к уязвимости.

Запись данных в неочищенный стрим является частично проблематичной для трансформеров, так как стримы Transform приостанавливаются по умолчанию, пока им передаются пайпы или если добавляется обработчик событий ‘data’ или ‘readable’.

Если данные для записи могут быть сгенерированы или получены по запросу, рекомендуется встроить логику в открытый для чтения стрим и использовать stream.pipe(). Однако, если предпочтителен вызов write(), возможно принять противодействие и избежать проблем с использованием памяти с помощью события ‘drain’.


function write (data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Ожидается вызов cb перед последующей записью.
write('hello', () => {
  console.log('write completed, do more writes now')
})

Открытый для записи стрим в объектном режиме всегда игнорирует аргумент encoding.

Открытые для чтения стримы

Открытые для чтения стримы являются абстракцией для источника, из которого получаются данные.

Примеры открытых для чтения стримов включают:

  • HTTP ответы на клиенте
  • HTTP запросы на сервере
  • fs read стримы
  • zlib стримы
  • crypto стримы
  • TCP сокеты
  • stdout и stderr дочернего процесса
  • process.stdin.

Все открытые для чтения стримы реализуют интерфейс, определенный классом stream.Readable.

Два режима

Открытые для чтения стримы эффективно работают в одном из двух режимов: поток и пауза.

В режиме потока данные считываются из предустановленной системы автоматически и предоставляются в приложении настолько быстро, насколько это возможно, используя события из интерфейса EventEmitter.

В режиме паузы нужно явно вызвать метод stream.read() для чтения данных chunk из стрима.

Все открытые для чтения стримы начинаются в режиме паузы, но могут быть переключены в режим потока одним из следующих способов:

  • путем добавления обработчика событий ‘data’
  • вызовом метода stream.resume()
  • вызовом метода stream.pipe() для отправки данных в открытый для чтения стрим.

Открытый для чтения срим можно переключить в режим паузы таким образом:

  • если нет назначений pipe, путем вызова метода syream.pause()
  • если есть назначения pipe – удалением любых обработчиков события ‘data’ и удалением всех назначений pipe вызовом метода stream.unpipe().

Важно запомнить, что открытый для чтения стрим не будет генерировать данные, пока предоставляется получающий или игнорирующий механизм для этих данных. Если потребляющий механизм отключен, открытый для чтения стрим будет предпринимать попытки остановить генерацию данных.

Примечание: в связи с обратной совместимостью удаление обработчика событий 'data' не будет автоматически приостанавливать стрим. Также, если есть назначения pipe, вызов stream.pause() не будет гарантировать, что стрим останется приостановленным после очищения от этих назначений и запроса новых данный.

Три состояния

«Два режима» работы для открытого для чтения стрима являются упрощенной абстракцией более сложного внутреннего управления состояниями, которое появляется в реализации открытого для чтения стрима.

В частности, в любой заданной точке каждый открытый для чтения стрим пребывает в одном из трех возможных состояний:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

Когда readable._readableState.flowing имеет значение null, не предоставляется никакого механизма для получения данных из стрима, так что стрим не генерирует данных.

Добавление слушателя к событию ‘data’, вызов метода readable.pipe() или вызов readable.resume() переключает readable._readableState.flowing на true, заставляя открытый для чтения стрим активно генерировать события в связи с генерацией данных.

Вызов readable.pause(), readable.unpipe() или получение противодействия приводит к установке readable._readableState.flowing значения false, временно останавливая поток событий, но не останавливая генерацию данных.

Пока readable._readableState.flowing имеет значение false, данные могут быть собраны во внутреннем буфере стрима.

Выбрать один

API открытого для чтения стрима развивалось с выпуском версий Node.js и предоставляет множество методов получения данных из стримов. В общем, разработчики должны выбрать один единственный метод получения данных и не должны ни при каких обстоятельствах использовать разные методы для получения данных из одного стрима.

Использование метода readable.pipe() рекомендуется большинству пользователей, так как он был реализован для предоставления самого простого способа получения данных из стрима. Разработчики, которым нужен более продвинутый контроль над передачей и генерацией данных, могут использовать EventEmitter и readable.pause()/ readable.resume() API.

Class: stream.Readable

Добавлено в v0.9.4

Event: ‘close’

Добавлено в v0.9.4

Событие ‘close’ генерируется, когда стрим и любой из его предустановленных ресурсов (к примеру, файловый дескриптор) закрываются. Событие показывает, что никакие события больше не генерируются, и не осуществляются расчеты.

Не все открытые для чтения стримы генерируют событие ‘close’.

Event: ‘data’

Добавлено в v0.9.4
  • chunk <Буфер> | <Строка> |<любое другое значение> Порция данных. Для стримов, которые не работают в объектном режиме, chunk – строка или буфер. Для работающих в объектном режиме – любое значение JavaScript, отличное от 0.

Событие ‘data’ генерируется всякий раз, когда стрим уступает владение порцией данных получателю (потребителю). Такое случается, когда стрим переключается в режим потока путем вызова readable.pipe(), readable.resume() или добавления слушателя функции обратного вызова в событие ‘data’. Событие ‘data’ также генерируется, когда вызывается метод readable.read() и порция данных доступна для возвращения.

Добавление слушателя события ‘data’ в стрим, который не был явно приостановлен, переключает стрим в режим потока. Данные будут передаваться тогда, когда это снова станет возможным.

Слушатель функции обратного вызова передает порцию данных как строку, если задается кодировка по умолчанию для стрима с помощью метода readable.setEncoding(), в ином случае данные передаются как буфер.


const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

Event: ‘end’

Добавлено в v0.9.4

Событие ‘end’ генерируется, когда получателю не поступают данные из стрима.

Примечание: событие ‘end’ не будет генерироваться пока данные не полностью получены. Это можно обойти, если переключить стрим в режим потока или вызвать с повторением stream.read() пока все данные не будут получены.


const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});

Event: ‘error’

Добавлено в v0.9.4
  • <Ошибка>

Событие ‘error’ может генерироваться реализацией открытого для чтения стрима в любое время. Обычно это происходит, если предустановленный стрим не может сгенерировать данные по причине внутренней ошибки или если реализация стрима пытается передать недопустимую порцию данных.

Слушатель функции обратного вызова будет передавать единственный объект Error.

Event: ‘readable’

Добавлено в v0.9.4

Событие ‘readable’ генерируется, когда есть доступные данные для чтения из стрима. В некоторых случаях добавление слушателя событию ‘readable’ может привести к тому, что некоторый объем считываемых данных попадет во внутренний буфер.


const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // есть данные для чтения
});

Событие ‘readable’ также генерируется после получения данных из стрима, но перед генерацией события ‘end’.

Событие ‘readable’ эффективно отображает, что у стрима есть новая информация: либо доступны новые данные, либо достигнут конец стрима. В первом случае stream.read() возвращает доступные данные. В последнем – stream.read() возвращает null. Например, в этом примере foo.txt – пустой файл:


const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});

Вывод после запуска этого скрипта:


$ node test.js
readable: null
end

Примечание: Обычно механизмы событий readable.pipe() и ‘data’ более предпочтительны для использования, чем событие ‘readable’.

readable.isPaused()

  • Возвращает <Boolean>

Метод readable.isPaused() возвращает текущее рабочее состояние открытого для чтения стрима. Это используется в первую очередь механизмом, который предустанавливает метод readable.pipe(). В большинстве типичных случаев нет причин для прямого использования этого метода.


const readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

readable.pause()

Добавлено в v0.9.4
  • Возвращает this

Метод readable.pause() заставляет стрим в режиме потока остановить генерацию событий ‘data’, выключая режим потока. Любые данные, которые становятся доступными, будут оставаться во внутреннем буфере.


const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

readable.pipe(destination[, options])

Добавлено в v0.9.4
  • destination <stream.Writable> Назначение для записи данных
  • options <Объект> Опции пайпа
    • end <Boolean> Заканчивает запись, когда завершается чтение. По умолчанию: true.

Метод readable.pipe() добавляет открытый для записи стрим в readable, заставляя его автоматически переключиться в режим потока и добавить все данные в добавленный открытый для записи стрим. Поток данный будет автоматически управляться, так что назначение открытого для записи стрима не будет перегружено более быстрым открытым для чтения стримом.

Следующий пример показывает пайпы всех данных из readable в файл под названием file.txt:


const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);

Возможно добавить множественные открытые для записи стримы в один открытый для чтения.

Метод readable.pipe() возвращает ссылку на стрим назначения, делая возможной настройку цепи стримов с пайпами:


const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

По умолчанию stream.end() вызывается в открытом для записи стриме назначения, когда источник – открытый для чтения стрим генерирует ‘end’, так что стрим назначения не может больше оставаться открытым для записи. Для отключения этого дефолтного поведения, нужно передать опцию end как false, так, чтобы стрим назначения остался открытым. Пример:


reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

Важная загвоздка: если открытый для чтения стрим генерирует ошибку во время обработки, стрим назначения не закрывается автоматически. Если происходит ошибка, нужно вручную закрыть каждый стрим, чтобы предотвратить утечки памяти.

Примечание: открытые для записи стримы process.stderr и process.stdout никогда не закрываются, пока существует процесс Node.js, вне зависимости от заданных опций.

readable.read([size])

Добавлено в v0.9.4
  • size <Число> Опциональный аргумент, который задает количество данных для чтения
  • Возвращает строку, буфер или null.

Метод readable.read() берет извлекает некоторые данные из внутреннего буфера и затем возвращает обратно. Если нет доступных для чтения данных, возвращается null. По умолчанию данные возвращаются как объект буфера, если не задана кодировка через readable.setEncoding() или если стрим не работает в объектном режиме.

Опциональный аргумент size задает определенное количество читаемых байт. Если байты size не доступны для чтения, будет возвращаться null, если стрим не был завершен, в случае чего все данные, оставшиеся во внутреннем буфере, будут возвращены (даже если это превосходит количество байтов size).

Если аргумент size не задан, все данные, содержащиеся во внутреннем буфере, будут возвращены.

Метод readable.read() должен быть вызван в открытых для чтения стримах, работающих в режиме паузы. В режиме потока readable.read() вызывается автоматически пока внутренний буфер не будет полностью очищен.


const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

В общем, разработчикам рекомендуется избегать использования события ‘readable’ и метода readable.read() в пользу readable.pipe() или события ‘data’.

Открытый для чтения стрим в объектном режиме всегда возвращает единственный элемент от вызова readable.read(size), вне зависимости от значения аргумента size.

Примечание: если метод readable.read() возвращает порцию данных, генерируется событие ‘data’.

Еще примечание: вызов stream.read([size]) после генерации события ‘end’ возвращает null. Ошибка выполнения не выпадает.

readable.resume()

Добавлено в v0.9.4
  • Возвращает this

Метод readable.resume() вызывает явную приостановку восстановления открытым для чтения стримом генерации событий ‘data’, переключая стрим в режим потока.

Метод readable.resume() может использоваться для полного поглощения данных из стрима без их обработки, как показано в примере ниже:


getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });

readable.setEncoding(encoding)

Добавлено в v0.9.4
  • encoding <Строка> Используемая кодировка
  • Возвращает this

Метод readable.setEncoding() устанавливает кодировку символов по умолчанию для считываемых из открытого для чтения стрима данных.

Установка кодировки приводит к тому, что данные из стрима возвращаются как строка с заданной кодировкой вместо объекта буфера. Например, вызов readable.setEncoding('utf8') делает так, что данные на выводе интерпретируются как данные UTF-8 и передаются как строки. Вызов readable.setEncoding('hex') кодирует данные в шестнадцатеричном строковом формате.

Отрытый для чтения стрим правильно обрабатывает многобайтовые символы, передающиеся через стрим, которые, тем не менее, могут быть неверно раскодированы, если их просто извлекать из стрима как объекты буфера.

Кодировка может быть отключена с помощью вызова readable.setEncoding(null). Это может оказаться полезным при работе с двоичными данными и с большими многобайтовыми строками, распространяющимися на множественные порции данных.


const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

readable.unpipe ([destination])

Добавлено в v0.9.4
  • destination <stream.Writable> Опционально заданный стрим для анпайпа.

Метод readable.unpipe() отключает открытый для записи стрим, который до этого был подключен посредством stream.pipe().

Если destination не задано, то отключаются все пайпы.

Если destination задано, но не настроен соответствующий пайп, то метод в таком случае ничего не делает.


const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// Все данные из открытого для чтения стрима направляются в'file.txt',
// но только первую секунду
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);

readable.unshift (chunk)

Добавлено в v0.9.11
  • chunk <Буфер> | <Строка> Порция данных для обратного смещения в очередь на чтение.

Метод readable.unshift() вставляет порцию данных обратно во внутренний буфер. Это может пригодиться в определенных ситуациях, когда стрим поглощается кодом, которому нужно «вернуть» некоторое количество данных, которые были получены из источника, так что эти данные могут быть переданы в любую другую область.

Примечание: метод stream.unshift(chunk) не может быть вызван после генерации события ‘end’ или после появления ошибки выполнения.

Разработчикам, использующим stream.unshift() часто приходится переключаться в режим трансформера. См. API для реализаций стримов.


// Получение заголовка, ограниченного \n\n
// исп. unshift() если получено слишком много
// функция обратного вызова с (error, header, stream)
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // нахождение границ заголовка
        var split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // установка слушателя
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // теперь сообщение из стрима может быть прочитано
        callback(null, header, stream);
      } else {
        // до сих пор читаем заголовок
        header += str;
      }
    }
  }
}

Примечание: в отличие от stream.push(chunk) метод stream.unshift(chink) не заканчивает процесс чтения, сбрасывая внутреннее состояние чтения из стрима. Это может повлечь за собой непредсказуемые результаты, если readable.unshift() вызывается в процессе чтения (например, из реализации пользовательского стрима stream._read()). Последующий вызов readable.unshift() с непосредственным вызовом stream.push(‘’) может сбросить состояние чтения, однако, лучше просто избегать вызова readable.unshift() в процессе чтения.

readable.wrap(stream)

Добавлено в v0.9.4
  • stream <Стрим> Открытый для чтения стрим в «старом стиле»

Версии Node.js до v0.10 содержат стримы, которые не реализуют полный API модуля stream как требуется сейчас. См. Совместимость.

При использовании старой библиотеки Node.js, которая генерирует событие ‘data’ и имеет метод stream.pause(), являющийся факультативным, нужно использовать метод stream.wrap() для создания открытого для чтения стрима, что использует старый стрим в качестве источника данных.

Редко бывает так, что приходится использовать readable.wrap(), но метод предоставлен как удобная альтернатива для работы со старыми приложениями и библиотеками Node.js.

Пример:


const OldReader = require('./old-api-module.js').OldReader;
const Readable = require('stream').Readable;
const oreader = new OldReader;
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});

Спаренные стримы и трансформеры

Class: stream.Duplex

Добавлено в v0.9.4

Спаренные стримы являются стримамы, реализующими одновременно открытые для чтения и для записи интерфесы.

Примеры спаренных стримов:

  • TCP сокеты
  • zlib стримы
  • crypto стримы

Class: stream.Transform

Добавлено в v0.9.4

Трансформеры являются спаренными стримами, вывод которых в какой-то степени связан с вводом. Как и все спаренные стримы, трансформеры реализуют интерфейсы открытых для чтения и открытых для записи стримов.

Примеры трансформеров:

  • zlib стримы
  • crypto стримы

API для реализаций стримов

API модуля stream было разработано для того, чтобы сделать возможной легкую реализацию стримов используя модель прототипного наследования JavaScript.

Во-первых, разработчик стримов должен декларировать новый класс JavaScript, который расширяет один из четырех базовых классов для стримов (stream.Writable, stream.Readable,stream.Duplex, или stream.Transform), убедившись, что вызван соответствующий конструктор родительского класса:


const Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }
}

Новый класс стрима должен реализовывать один или больше заданных методов, в зависимости от типа созданного стрима. Детально в таблице:

Пользовательский случайКлассМетоды
Только чтениеReadable_read
Только записьWritable_write, _writev
Чтение и записьDuplex_read, _write, _writev
Работа с записью, затем чтение результатаTransform_transform, _flush

Примечание: код реализации для стрима никогда не должен вызывать public методы стрима, которые предназначаются для использования получателями (как описано в разделе API для получателей стримов). Это может привести к нежелательным побочным эффектам на стороне получателя стрима.

Упрощенное конструирование

Для множества простых случаев можно сконструировать стрим без учитывания наследования. Можно это применять путем прямого создания экземпляров объектов stream.Writable, stream.Readable,stream.Duplex, или stream.Transform и передачей методов в качестве опций конструктора.

Пример:


const Writable = require('stream').Writable;

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

Реализация открытого для записи стрима

Класс stream.Writable распространяется на реализацию открытого для записи стрима.

Пользовательские открытые для записи стримы должны вызывать конструктор new stream.Writable([options]) и реализовывать метод writable._write(). Также может быть реализован метод writable._writev().

Constructor: new stream.Writable([options])

  • options <Объект>
    • highWaterMark <Число> Уровень буфера, когда stream.write() начинает возвращать false. По умолчанию: 16384 (16кБ) или 16 для стримов в объектном режиме.
    • decodeStrings <Boolean> Декодирует либо не декодирует строки в буфер перед передачей их в stream._write(). По умолчанию: true.
    • objectMode <Boolean> Показывает, является stream.write(anyObj) валидной операцией или нет. Если эта опция задана, становится возможным записывать отличные от строк и буферов значения JavaScript, если это поддерживается реализацией стрима. По умолчанию: false.
    • write <Функция> Реализация метода stream._write()
    • writev <Функция> Реализация метода stream._writev()

Пример:


const Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    // Вызывает конструктор stream.Writable()
    super(options);
  }
}

Или, при использовании конструкторов пре-ES6:


const Writable = require('stream').Writable;
const util = require('util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

Или же используя упрощенный конструктор:


const Writable = require('stream').Writable;

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)

  • chunk <Буфер> | <Строка> Порция данных для записи. Всегда будет буфером, если опции decodeStrings не задано значение false.
  • encoding <Строка> Если порция данных является строкой, то encoding представляет собой кодировку символов этой строки. Если порция данных – буфер, или если стрим работает в объектном режиме, encoding может игнорироваться.
  • callback <Функция> Вызов этой функции (опционально с аргументом ошибки) при обработке завершает предоставленную порцию данных.

Все реализации открытых для записи стримов должны предоставлять метод writable_write() для отправки данных в предустановленный ресурс.

Примечание: трансформеры предоставляют свои собственные реализации writable._write().

Еще примечание: эта функция не должна вызываться прямо из кода приложения. Нужна реализация через дочерние классы и вызывать эту функцию нужно только с помощью внутренних методов классов открытых для записи стримов.

Метод callback должен вызываться для отображения, была ли запись успешно завершена или завершилась с ошибкой. Первый аргумент, передаваемый в callback должен быть объектом Error, если вызов не удался или null, если запись прошла успешно.

Важно помнить, что все вызовы writable.write(), которые происходят между вызовами writable._write() и callback могут привести к тому, что записываемые данные буферизируются. После вызова callback стрим будет генерировать событие ‘drain’. Если реализация стрима способна обрабатывать множественные порции данных за раз, нужно реализовать метод writable._writev().

Если свойство decodeStrings настроено в опциях конструктора, chunk может быть скорее строкой, чем буфером, и encoding будет показывать кодировку символов строки. Это нужно для поддержки реализаций, которые имеют оптимизированную обработку для определенных кодировок строчных данных. Если свойство decodeStrings явно установлено на false, то аргумент encoding может игнорироваться без опаски и chunk будет оставаться тем же объектом, которы передается во .write().

Метод writable._write() предваряется нижним подчеркиванием, так как это внутренний метод, который никогда не должен вызываться напрямую пользовательскими программами.

writable._writev(chunks, callback)

  • chunks <Массив> Порции данных для записи. Каждая порция имеет следующий формат: { chunk: ..., encoding: ... }
  • callback <Функция> Функция обратного вызова (опционально с аргументом ошибки), которая вызывается, когда обработка порций данных завершается.

Примечание: эта функция не должна вызываться напрямую из кода приложения. Ее нужно реализовывать через дочерние классы и вызывать только с помощью внутренних методов открытого для записи стрима.

Метод writable._writev() должен реализовываться как дополнение к writable._write() в стриме, который может обрабатывать множественные порции данных за раз. Если это реализовано, метод вызывается со всеми текущими данными, которые на данный момент буферизированы в очереди на запись.

Метод writable._writev() предваряется нижним подчеркиванием, так как его определяет внутренний класс, и не должен никогда вызываться напрямую из пользовательских программ.

Ошибки при записи

Для ошибок, которые случаются во время обработки методов writable._write() и writable._writev() рекомендуется составлять отчет посредством функции обратного вызова с передачей ошибки в качестве первого аргумента. Это приведет к тому, что открытый для записи стрим сгенерирует событие ‘error’. Выпадение ошибки в writable._write() может привести к неожиданному и непостоянному поведению, зависящему от того, как используется стрим. Использование функции обратного вызова гарантирует постоянную и предсказуемую обработку ошибок.


const Writable = require('stream').Writable;

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

Пример открытого для записи стрима

Следующий пример иллюстрирует довольно упрощенную (и, в каком-то роде бессмысленную) пользовательскую реализацию открытого для записи стрима. Так как этот экземпляр открытого для записи стрима не имеет реальной практической пользы, пример показывает каждый требуемый элемент экземпляра пользовательского открытого для записи стрима.


const Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }

  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

Реализация открытого для чтения стрима

Класс stream.Readable распространяется на реализацию открытого для чтения стрима.

Пользовательские открытые для чтения стримы должны вызывать конструктор new stream.Readable([options]) и реализовывать метод readable._read().

new stream.Readable([options])

  • options <Объект>
    • highWaterMark <Число> Максимальное количество байт для хранения во внутреннем буфере перед чтением из предустановленного ресурса. По умолчанию: 16384 (16кБ) или 16 для стримов в объектном режиме.
    • encoding <Строка> Если задано, то буферы будут декодированы в строки с использованием этой кодировки. По умолчанию: null.
    • objectMode <Boolean> Показывает, должен ли стрим иметь поведение объектного стрима. Имеется в виду, что stream.read(n) возвращает единственное значение буфера размера n. По умолчанию: false.
    • read <Функция> Реализация метода stream._read()

Пример:


const Readable = require('stream').Readable;

class MyReadable extends Readable {
  constructor(options) {
    // Вызывает конструктор stream.Readable(options) 
    super(options);
  }
}

Или, с использованием конструкторов пре-ES6:


const Readable = require('stream').Readable;
const util = require('util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);

Или, используя упрощенный конструктор:


const Readable = require('stream').Readable;

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)

  • size <Число> Количество байт для асинхронного чтения

Примечание: эта функция не должна вызываться напрямую из кода приложения. Ее нужно реализовывать через дочерние классы и вызывать только с помощью внутренних методов открытого для чтения стрима.

Все реализации открытого для чтения стрима должны предоставлять реализацию метода readable._read() для получения данных из предустановленного ресурса.

Когда вызывается readable._read(), и если данные из ресурса доступны, реализация должна начать вставлять данные в очередь на чтение, используя this.push(dataChunk). _read() должен продолжать чтение из ресурса и вставлять данные в очередь, пока readable.push() не возвратит false. Только когда _read() вызывается снова после того, как был остановлен, восстанавливается добавление дополнительных данных в очередь.

Примечание: после того, как вызван метод readable._read(), он не будет вызван снова, пока вызван метод readable.push().

Аргумент size является дополнительным. Для реализаций, где “read” – единственная операция, которая возвращает данные, можно использовать аргумент size для определения того, какое количество данных должно быть получено. Другие реализации могут игнорировать этот аргумент и просто предоставлять данные тогда, когда они становятся доступными. Не нужно ждать, пока байты size станут доступными перед вызовом stream.push(chunk).

Метод readable._read() предваряется нижним подчеркиванием, так как это метод, определяемый внутренним классом и не должен никогда вызываться напрямую из пользовательских программ.

readable.push(chunk[, encoding])

  • chunk <Буфер> | <Null> | <Строка> Порция данных, которая вставляется в очередь на чтение
  • encoding <Строка> Кодировка строковых данных. Должна быть валидной буферной кодировкой, такой, как ‘utf8’ или ‘ascii’.
  • Возвращает boolean. True, если добавление дополнительных порций данных может быть продолжено, в противном случае false.

Когда chunk является буфером или строкой, порции данных могут добавляться во внутреннюю очередь на получение пользователями стримов. Передача chunk как null сигнализирует об окончании стрима (EOF) после которого данные больше не могут быть записаны.

Когда открытый для чтения стрим работает в режиме паузы, данные, добавленные с помощью readable.push() могут быть прочитаны посредством метода readable.read() когда генерируется событие ‘readable’.

Если открытый для чтения стрим работает в режиме потока, данные, добавленные с помощью readable.push() передаются посредством генерации события ‘data’.

Метод readable.push() разработан настолько гибким, насколько это было возможно. Например, при оборачивании низкоуровнего источника, который предоставляет некоторые формы механизма паузы/восстановления и функцию обратного вызова для данных, низкоуровневый источник может оборачиваться пользовательским экземпляром открытого для чтения стрима, как показано в следующем примере:


// источник является объектом с методами readStop()и readStart(),
// и элемент`ondata`, который вызывается, когда есть данные
 // и элемент `onend`, который вызывается, когда данные заканчиваются
class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

    // каждый раз при наличии данных вставляет их во внутренний буфер
    this._source.ondata = (chunk) => {
      // если push() возвращает false, чтение из источника прекращается
      if (!this.push(chunk))
        this._source.readStop();
    };

    // Когда источник оканчивается, вставляется порция данных с EOF-сигналом `null`
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read вызывается, когда стриму нужно получить больше данных
  // дополнительный аргумент size в этом случае игнорируется
  _read(size) {
    this._source.readStart();
  }
}

Примечание: метод readable.push() предназначен для вызова только в реализациях открытых для чтения стримов и только в рамках метода readable._read().

Ошибки во время чтения

Рекомендуется, чтобы ошибки, встречающиеся во время обработки метода readable._read() генерировались с помощью события ‘error’ вместо обычного механизма выдачи ошибок. Выпадение ошибки в readable._read() может привести к неожиданному и непостоянному поведению, зависящему от того, работает стрим в режиме паузы или в режиме потока. Использование события ‘error’ гарантирует постоянную и предсказуемую обработку ошибок.


const Readable = require('stream').Readable;

const myReadable = new Readable({
  read(size) {
    if (checkSomeErrorCondition()) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
    // какая-то работа 
  }
});

Пример считающих стримов

Следующий пример является простым примером открытого для чтения стрима. который генерирует числа от 1 до 1000000 в порядке возрастания и затем завершается.


const Readable = require('stream').Readable;

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    var i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      var str = '' + i;
      var buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

Реализация спаренного стрима

Спаренный стрим реализует и открытый для чтения и открытый для записи интерфейсы. Примером спаренного стрима можно назвать соединение TCP сокета.

Так как JavaScript не поддерживает множественное наследование, класс stream.Duplex распространяется на реализацию спаренного стрима (противоположно по отношению к классам stream.Readable и stream.Writable).

Примечание: класс stream.Duplex прототипично наследуется из stream.Readable и принужденно из stream.Writable, но instanceof будет правильно работать для обоих базовых классов благодаря переопределению Symbol.hasInstance на stream.Writable.

Пользовательские спаренные стримы должны вызывать конструктор new stream.Duplex([options]) и реализовывать оба метода readable._read() и writable._write().

new stream.Duplex(options)

  • options <Объект> Передается в оба конструктора – writable и readable. Также имеет следующие области:
    • allowHalfOpen <boolean> По умолчанию: true. Если false, то стрим будет автоматически завершать открытую для чтения часть, когда завершается открытая для записи и наоборот.
    • readableObjectMode <boolean> По умолчанию: false. Устанавливает objectMode для открытой для чтения части стрима. Ничего не делает, если objectMode имеет значение true.
    • writableObjectMode <boolean> По умолчанию: false. Устанавливает objectMode для открытой для записи стороны стрима. Ничего не делает, если objectMode имеет значение true.

Пример:


const Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
  }
}

Или, при использовании конструкторов пре-ES6:


const Duplex = require('stream').Duplex;
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

Или, используя утпрощенный конструктор:


const Duplex = require('stream').Duplex;

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

Пример спаренного стрима

Нижеследующее иллюстрирует простой пример спаренного стрима, который оборачивает гипотетический объект источника уровнем ниже, в который могут быть записаны данные, и из которого данные могут быть считаны, несмотря на использование несовместимого со стримами Node.js API. Также показывается простой пример спаренного стрима, который буферизирует входящие записанные данные через Writable интерфейс, который читается через Readable интерфейс.


const Duplex = require('stream').Duplex;
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // предустановленный источник работает только со строками
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

Самый главный аспект спаренных стримов состоит в том, что открытые для чтения и для записи стороны работают независимо друг от друга, несмотря на одновременное существование в одном экземпляре объекта.

Спаренные стримы в объектном режиме

Для спаренных стримов objectMode может быть установлен исключительно либо для открытой для чтения, либо для открытой для записи стороны через readableObjectMode и writableObjectMode соответственно.

В следующем примере новый стрим-трансформер (который является одним из видов спаренных стримов) создается с открытой для чтения стороной в объектном режиме, которая принимает числа JavaScript , конвертируемые в шестнадцатеричные строки на открытой для чтения стороне.


const Transform = require('stream').Transform;

// все стримы-трансформеры являются спаренными стримами
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // преобразование данных в число, если необходимо
    chunk |= 0;

    // преобразование данных во что-нибудь еще
    const data = chunk.toString(16);

    // добавление данных в очередь на чтение
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Выводит на экран: 01
myTransform.write(10);
// Выводит на экран: 0a
myTransform.write(100);
// Выводит на экран: 64

Реализация стрима-трансформера

Стрим-трансформер является спаренным стримом, в котором вывод в своем роде вычислен из ввода. Примеры – стримы zlib и crypto которые сжимают, шифруют и дешифруют данные.

Примечание: от вывода не требуется, чтобы он был такого же размера, как ввод, имел такое же количество порций данных или включался в то же время. Например, стрим Hash будет всегда иметь единственный вывод, который предоставляется тогда, когда завершается ввод. Стрим zlib будет давать вывод, который будет либо намного меньше, либо намного больше ввода.

Класс stream.Transform распространяется на реализацию стрима-трансформера.

Класс stream.Transform прототипически наследуется из stream.Duplex и реализует свои собственные версии методов writable._write() и readable._read(). Пользовательские реализации трансформеров должны реализовывать метод transform._transform() и могут также transform._flush().

Примечание: нужно быть аккуратным при использовании стримов-трансформеров,так как запись данных в стрим может привести к тому, что открытая для записи сторона стрима будет приостановлена, если вывод открытой для чтения стороны не был получен.

new stream.Transform([options])

  • options <Объект> передается в Writable и Readable конструкторы. Имеет следующие подразделы:
    • transform <Функция> Реализация метода stream._transform()
    • flush <Функция> Реализация метода stream._flush().

Пример:


const Transform = require('stream').Transform;

class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }
}

Или, при использовании конструкторов пре-ES6:


const Transform = require('stream').Transform;
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

Или, используя упрощенный конструктор:


const Transform = require('stream').Transform;

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

События: ‘finish’ и ‘end’

События ‘finish’ и ‘end’ исходят из stream.Writable и stream.Readable соотвественно. Событие ‘finish’ генерируется после вызова stream.end() и после того, как все порции данных были обработаны stream._transform(). Событие ‘end’ генерируется после того, как все данные были направлены на вывод, что происходит после вызова функции callback в transform._flush().

transform._flush(callback)

  • callback <Функция> Функция обратного вызова (опционально с аргументом ошибки и данными), которая вызывается, когда сбрасываются оставшиеся данные.

Примечание: эта функция не должна вызываться напрямую из кода приложения. Ее нужно реализовывать через дочерние классы и вызывать только с помощью внутренних методов открытого для чтения стрима.

В некоторых случаях операции преобразования (transform) могут нуждаться в генерации дополнительных данных в конце стрима. Например, стрим сжатия zlib будет хранить объем внутреннего состояния, используемый для оптимального сжатия вывода. Когда стрим завершается, однако, эти дополнительные данные должны быть сброшены, так, чтобы сжатые данные были закончены.

Пользовательские реализации трансформеров могут реализовывать метод transform._flush(). Он может быть вызван когда больше нет записанных данных для получения, но перед генерацией события ‘end’, сигнализирующего об окончании открытого для чтения стрима.

В реализации transform._flush() метод readable.push() может быть не вызван вообще либо вызван большее количество раз, если нужно. Функция callback должна быть вызвана после завершения операции сброса.

Метод transform._flush() предваряется нижним подчеркиванием, так как его задает внутренний класс, и никогда не должен вызываться напрямую из пользовательских программ.

transform._transform(chunk, encoding, callback)

  • chunk <Буфер> | <Строка> Порция данных, которая преобразуется. Всегда будет буфером, если только опция decodeStrings не имеет значение false.
  • encoding <Строка> Если порция данных является строкой, то это кодировка. Если порция данных – буфер, то тогда есть специальное значение ‘buffer’, а это значение можно игнорировать.
  • callback <Функция> Функция обратного вызова (опционально с аргументом ошибки и данными), которая вызывается после обработки chunk.

Примечание: эта функция не должна вызываться напрямую из кода приложения. Ее нужно реализовывать через дочерние классы и вызывать только с помощью внутренних методов открытого для чтения стрима.

Все стримы-трансформеры и их реализации должны предоставляь метод _transform() на принятый ввод и создавать вывод. Реализация transform._transform() обрабатывает записываемые байты, вычисляет вывод, затем передает вывод на открытую для чтения часть, используя метод readable.push().

Метод transform.push() может не вызываться вообще или вызываться большее количество раз для генерации вывода из единственного ввода, в зависимости от того, сколько должно быть выведено в результате chunk.

Может быть и такое, что не генерируется никакого вывода из данного chunk.

Функция callback должна быть вызвана только тогда, когда текущая порция данных полностью получена. Первый аргумент, передаваемый в callback, должен быть объектом Error, если при обработке ввода случается ошибка, или null в другом случае. Если в callback передается второй аргумент, он направляется дальше в метод readable.push(). Другими словами, следующее эквивалентно:


transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
};

Метод transform._transform() предваряется нижним подчеркиванием, так как его задает внутренний класс, и никогда не должен вызываться напрямую из пользовательских программ.

Class: stream.PassThrough

Класс stream.PassThrough является банальной реализацией стрима-трансформера, которая просто передает входящие байты на вывод. Эта реализация изначально предназначена для примеров и тестирования, но есть некоторые пользовательские случаи, где stream.PassThrough является довольно полезным в качестве строительного блока для новых видов стримов.

Примечания

Совместимость с более старыми версиями Node.js

В версиях Node.js до v0.10 интерфейс открытого для чтения стрима был проще, но вместе с тем менее мощным и менее полезным.

  • Вместо ожидания вызова метода stream.read(), будут немедленно генерироваться события ‘data’. От приложений, которые нуждаются в выполнении некоторой работы для определения того, как обрабатывать данные, требовалось хранить данные для чтения в буферах, так, чтобы они не потерялись.
  • Метод stream.pause() был дополнительным и ничего не обеспечивал. Это означало, что в любом случае было необходимо быть готовым к получению событий ‘data’ даже когда стрим находился в режиме паузы.

В Node.js v0.10 был добавлен класс Readable. Для обеспечения обратной совместимости со старыми программами Node,js, открытые для чтения стримы переключаются в режим потока, когда добавляется обработчик события ‘data’ или когда вызывается метод stream.resume(). Суть этого в том, что даже когда не используется метод stream.read() и событие ‘readable’, больше не нужно беспокоиться о потере данных.

В то время, как большинство приложений продолжают функционировать в нормальном режиме, это представляет крайний случай при следующих условиях:

  • Нет слушателя события ‘data’
  • Никогда не вызывается метод stream.resume()
  • Стрим не передается по пайпу в какой-либо открытый для записи пункт.

Например:


// Предупреждение!  Неверный код!
net.createServer((socket) => {

  // добавляем метод 'end', но не получаем данные
  socket.on('end', () => {
    // сюда не попадаем
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337);

В версиях Node,js до v0.10 входящие данные могут просто отклоняться. Однако, в Node.js v0.10 и выше сокет остается приостановленным.

Такую ситуацию можно обойти с помощью вызова метода stream.resume() для старта потока данных:


// обходной путь
net.createServer((socket) => {

  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // запускает поток данных, отклоняя их
  socket.resume();

}).listen(1337);

В дополнение к новым открытым для чтения стримам, переключающимся в режим потока, стримы до v0.10 могут оборачиваться в класс Readable, используя метод readable.wrap().

readable.read(0)

Существуют такие случаи, когда необходимо обновлять предустановленные механизмы открытых для чтения стримов без непосредственного получения данных. В этих случаях возможно вызвать readable.read(0), который всегда возвращает null.

Если внутренний буфер находится под highWaterMark, а стрим еще не читает данные, то вызов stream.read(0) повлечет вызов низкоуровневого stream._read().

Большинство приложений не требует таких действий, но в Node.js существуют такие ситуации, в которых реализуется данный метод, в частности во внутренних классах открытых для чтения стримов.

readable.push(‘’)

Использовать readable.push(‘’) не рекомендуется.

Вставка строки, содержащей 0 байт или буфера в стрим, который не работает в объектном режиме, имеет интересный побочный эффект. Так как это все-таки вызов readable.push(‘’), сам вызов завершит процесс чтения. Однако, так как аргумент является пустой строкой, в буфер не будут добавляться данные, и, вследствие этого, пользователю нечего получать.