Перейти к содержанию

Потоки

Вся концепция работы потоков заключается в передаче данных между программами в операциях ввода/вывода. Использование потоков позволяет программе быть независимой и объединяться с другими программами для создания более сложной системы.

Обмен данными осуществляется частями, не дожидаясь полной загрузки, что позволяет эффективно использовать оперативную память. Например, вам нужно найти определенную строку в файле большого объема. Использование потока позволяет получать данные по частям и искать необходимую строку в этих частях, при совпадении дальнейшее чтение файла можно прекратить, сэкономив при этом время и производительные ресурсы.

За работу с потоками в Node.js отвечает встроенный модуль stream, причем все экземпляры объектов потоков являются одновременно и экземплярами класса EventEmitter, что позволяет инициировать и обрабатывать события.

В Node.js имеются следующие потоки:

  • Readable - поток чтения, извлекающий данные из переданного источника (например, чтение из файла);
  • Writable - поток записи данных (например, сохранение данных в файл);
  • Duplex - поток, одновременно реализующий и чтение, и запись;
  • Transform - разновидность потока Duplex, только извлеченные данные перед записью могут быть изменены.

Рассмотрим каждый из потоков Node.js более подробно.

Поток Readable

Поток Readable отвечает за чтение данных из определенного источника. Примеры потоков чтения:

  • получение сервером HTTP запроса;
  • получение клиентом ответа на отправленный HTTP запрос
  • чтение из файла;
  • получение данных по сокетам и т. д.

Поток Readable работает в одном из двух режимов: flowing (процесс извлечения данных) и paused (режим ожидания).

По умолчанию все потоки чтения начинают свою работу с режим ожидания. Чтобы начать чтение из источника, необходимо либо добавить обработчик события data, либо вызвать метод [ReadableStreamInstance].resume(), либо отправить поток на запись вызовом метода pipe().

readable.js

const fs = require('fs')

let stream = fs.createReadStream('./files/data.txt')

setTimeout(() => stream.on('data', data => console.log(data.toString())), 3000) //выведет содержимое файла

В примере поток Readable создается с использованием метода createReadStream() модуля fs.

Перевод в режим ожидания, если для считываемых данных нет назначения, осуществляется вызовом метода [ReadableStreamInstance].pause().

В случае явного вызова метода resume() без наличия обработчика события data, все данные потока будут утеряны.

const fs = require('fs')

let stream = fs.createReadStream('./files/data.txt')

stream.resume()

setTimeout(() => stream.on('data', data => console.log(data.toString())), 3000) //событие не будет вызвано

События Node.js потока Readable:

  • data - получение потоком данных;
  • resume - инициируется при вызове метода resume();
  • pause - инициируется при вызове метода pause();
  • close - возникает при закрытии источника данных или самого потока;
  • end - генерируется, когда из источника считаны все данные;
  • error - возникновение в потоке ошибки, обработчику аргументом передается объект ошибки.

Поток Writable

Поток Writable отвечает за запись данных в указанное место. Примеры потоков записи:

  • отправка HTTP запроса с клиента на сервер;
  • отправка ответа на HTTP запрос с сервера на клиент;
  • запись данных в файл;
  • отправка данных по сокету.

Рассмотрим пример работы с потоком Node.js Writable.

writable.js

const fs = require('fs')

let writableStream = fs.createWriteStream('./files/data.txt')

writableStream.write('Something important data')
writableStream.end()

writableStream.on('finish', () => console.log('Data was written.'))
writableStream.on('error', err => console.log(err))

Здесь в примере создание потока Writable создается с помощью метода fs.createWriteStream(), в котором указывается, что все данные должны быть записаны в файл data.txt.

Запись в поток данных осуществляется с помощью метода [WritableStreamInstance].write(), который принимает следующие параметры:

  • данные;
  • кодировку (если данные строкового типа);
  • callback-функцию, которая будет вызвана после записи переданных данных.

Метод [WritableStreamInstance].write() возвращает булевое значение, если true - данные успешно записались, если false - значит запись данных необходимо продолжить после возникновения события drain.

Метод [WritableStreamInstance].write() может вернуть false в том случае, если данные на запись будут поступать в поток слишком быстро, тогда данные не будут успевать записываться и начнут накапливаться во внутреннем буфере. По достижению лимита буфера [WritableStreamInstance].write() вернет false, тем самым говоря. что необходимо подождать записи поступивших данных. Когда буфер разгрузится - будет сгенерировано событие drain и тогда можно будет продолжить запись.

По окончанию работы с потоком Writable вызовите метод [WritableStreamInstance].end(), чтобы указать, что все необходимые данные были записаны. Вызов метода инициирует событие finish.

Если неправильно указать путь к файлу для записи данных, будет сгенерировано событие error, обработчику которого аргументом будет передан объект ошибки.

writableStream.on('error', err => console.log(err))

События Node.js потока Writable:

  • drain - сигнализирует в возможности продолжить запись после переполнения внутреннего буфера;
  • finish - инициируется после вызова метода [WritableStreamInstance].end() и записи в указанное место всех данных;
  • close - возникает при закрытии указанного ресурса назначения данных или самого потока;
  • error - возникновение в потоке ошибки, обработчику аргументом передается объект ошибки.

Поток Duplex

В Node.js Duplex потоками называются те потоки, которые являются одновременно и Readable и Writable. Примером Duplex потока является передача данных по сокетам: отправка данных одной стороной (запись) и принятие их другой (чтение).

Поток Transform

Потоки Transform являются разновидностью потоков Duplex, их отличительная особенность заключается в том, что перед отдачей входящие данные видоизменяются (отсюда и название). Примером потока Transform является процесс шифрования или расшифровки данных.

Метод pipe()

Для передачи данных из одного потока в другой, применительно к объекту потока используется метод pipe(). Он также выполняет связующую роль, позволяя объединять потоки между собой.

const fs = require('fs')

let readStream = fs.createReadStream('./files/from.txt')
let writeStream = fs.createReadStream('./files/to.txt')

readStream.pipe(writeStream)