Перейти к содержанию

Multicasted Observable

Ранее говорилось, что объекты RxJS Observable выполняются для каждого вызова метода subscribe() уникально, в отличие от RxJS Subject. Но это не совсем так. При необходимости можно создать Multicasted Observable, который позволяет в рамках одного и того выполнения регистрировать сразу несколько обработчиков.

multicast()

Такие объекты создаются с помощью метода RxJS multicast(), а в их основе находятся объекты Subject.

const subject = new Subject()
const multicasted = from([2, 4, 6]).pipe(multicast(subject))

multicasted.subscribe(vl => console.log(`1st: ${vl}`))
multicasted.subscribe(vl => console.log(`2nd: ${vl}`))

multicasted.connect()

RxJS multicast() принимает Subject, который регистрирует на себя всех "потребителей" и который сам регистрируется в качестве обработчика для исходного объекта Observable.

В результате получается объект типа ConnectableObservable - стандартный Observable с методом connect(). Именно вызов connect() инициирует выполнение исходного объекта и возвращает его контекст (объект с unsubscribe()).

Объект Subject можно передать двумя способами: напрямую и с использованием фабричной функции. В первом случае после завершения выполнения объекта новые "потребители" получат только уведомление о завершении (обработка complete) и все, тогда как фабричная функция запустит новое исполнение объекта.

refCount()

Метод RxJS refCount() облегчает работу с ConnectableObservable.

При регистрации первого обработчика он автоматически начинает выполнение исходного объекта (вызывается connect()), а когда не остается ни одного "потребителя" автоматически завершает его выполнение (вызывается unsubscribe()).

Так отпадает необходимость в ручном контроле таких объектов.

const subject = new Subject()
const refCounted = interval(3).pipe(
  multicast(subject),
  refCount()
)

let sub1, sub2

//выполнение Observable начинается
sub1 = refCounted.subscribe(vl => console.log(`1st: ${vl}`))

setTimeout(() => (sub2 = refCounted.subscribe(vl => console.log(`2nd: ${vl}`))), 500)

setTimeout(() => sub1.unsubscribe(), 1500)

//выполнение Observable завершается
setTimeout(() => sub2.unsubscribe(), 2000)

Использовать refCount() можно только с объектами ConnectableObservable.