Перейти к содержанию

Объект Observable

Объекты RxJS Observable создаются либо с использованием операторов создания (of, from, fromEvent), либо через new Observable.

Пример с оператором of().

of('Hello').subscribe(vl => console.log(vl))

Пример с new Observable.

const obs = new Observable(sub => {
  sub.next(1)

  setTimeout(() => {
    sub.next(3)
    sub.complete()
  }, 500)
})

obs.subscribe(vl => console.log(vl))

Каждый Observable может отправлять своим "потребителям" уведомления вызовом одного из трех методов:

  • next() - отправка данных, количество вызовов не ограничено;
  • error() - генерация ошибки, параметром указываются данные любого формата (строка, объект, исключение) о причине ее возникновения;
  • complete() - завершение исполнения Observable, не принимает никаких параметров и не передает никакого значения.

Но исполнение RxJS Observable начнется только после вызова у него метода subscribe(), который принимает функцию с передаваемыми данными в качестве аргумента. Вторым и третьим необязательными параметрами методу subscribe() можно передать функции, которые будут вызваны в случае ошибки или (и) завершения Observable.

const obs = new Observable(sub => {
  sub.next(1)

  setTimeout(() => {
    sub.error(3)
  }, 500)
})

obs.subscribe(vl => console.log(vl), err => console.log('Error: ', err), () => console.log('Completed'))

Вызов error() или complete() автоматически завершает исполнение Observable.

Количество вызовов исполнения такого объекта не ограничено, а сам он даже не знает, сколько "потребителей" получают от него данные.

Метод subscribe() возвращает объект типа Subscription, который хранит текущее исполнение конкретного RxJS Observable и имеет единственный метод unsubscribe() для отмены его исполнения.

const sub = obs.subscribe()

sub.unsubscribe()

Вызов unsubscribe() нужен только для бесконечно исполняемых Observable, иначе занимаемые ими ресурсы будут освобождены только с окончанием работы всего приложения. А значит в процессе работы программы может произойти утечка памяти или могут быть созданы ненужные дублирующиеся "потребители".

Например, последнее может произойти, когда пользователь зашел на страницу, инициирующую исполнение RxJS Observable, затем перешел на другой URL и вернулся обратно.

Бесконечно исполняемыми Observable считаются те из них, которые никогда не вызывают метод complete(), даже если у них предусмотрен сценарий, при котором произойдет обращение к error(), что также завершит исполнение.

В Angular приложении unsubscribe() обычно вызывается на стадии жизненного цикла OnDestroy() того компонента, в котором используется Observable.