RXJS – это библиотека для создания асинхронных программ на основе событий с использованием цепочки последовательно расположенных Observables.
С помощью RXJS мы можем работать с источником данных, преобразуя их через поток с Observable, и обрабатывать этот поток с помощью операторов (operators), которые могут выполнять определенную обработку в потребляемых данных, как если бы мы работали с коллекциями, что-то похожее на работу с массивами, используя фильтры, карты или то, что мы хотим обработать в потоке данных.
Наблюдаемые
Это концепция коллекции значений или событий, она может быть многократно использована, работает с потоком данных, и мы можем использовать операторы для работы с этим потоком данных.
Observable – это коллекции из нескольких значений, которые могут следовать следующим протоколам Pull и Push, определяющим, как производитель данных может взаимодействовать с потребителем данных.
Но чтобы понять, какая структура стоит за этой концепцией, давайте поговорим о двух протоколах, стоящих за концепцией Observable, Pull и Push
Потяните
В этом протоколе производитель данных не знает, когда данные будут доставлены потребителю.
function getValue() {
return 10
}
Я получаю значение 10 именно тогда, когда выполняю функцию, Функция является производителем данных, а переменная, вызывающая функцию, является переменной, потребляющей единственное возвращаемое значение.
const x = getValue();
Нажмите
В этом протоколе производитель определяет, когда отправить данные потребителю, поэтому потребитель не знает, когда он получит данные.
const myPromise = new Promise((resolve, reject) => {
setTimeout(() => {
resolve(10);
}, 300);
});
myPromise
.then(data => console.log(10))
// saída => 10
Promise – это тип push, это Producer, который доставляет значение после того, как все инструкции в нем разрешены, но чтобы получить это значение, вы должны подписаться и ждать, в отличие от функции Promise, которая определяет, когда именно это значение будет отправлено в обратный звонок “resolves”.
Помня о том, как работают “тянущие” функции и “толкающие” Promises, RXJS представляет новый способ работы с данными Push, используя Observable.
Ошибочно думать, что Observables – это асинхронные функции, на самом деле они могут быть как синхронными, так и асинхронными, предоставляя один и тот же диапазон данных.
Observable – это способ получения бесконечных значений, начиная с нуля, с момента его вызова, и это может быть сделано синхронно или асинхронно.
Создание наблюдаемого объекта
Для создания Observable нам нужен обратный вызов, который обычно называется подписчиком, он будет передавать данные в виде потока
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next(1)
subscriber.next(2)
subscriber.next(3)
}, 1000);
});
А теперь, чтобы получить этот поток данных, нам нужно подписаться на наблюдаемую.
observable.subscribe(value => console.log(value));
// teremos a seguinte saída
// 1
// 2
// 3
Важным моментом здесь является то, что каждая подписка получает определенное значение для этого вызова, если мы продублируем код выше
observable.subscribe(value => console.log(value));
У нас будут те же выходы, но поставляемые значения и Observable специфичны для каждого subscribe, где он добавляет независимую конфигурацию для каждого, в общем, в этом случае, когда мы используем New Observable, основная идея заключается в том, что он не разделяет состояние.
Обозреватель
Наблюдатель – это потребитель значений, доставляемых Observable, это объект, состоящий из 3 обратных вызовов, по одному на каждый тип:
- Далее: отправляет некое примитивное значение или объект, который доставляется подписчику, когда пользователь подписывается на Observable.
- Error: посылает ошибку или исключение, останавливающее выполнение Observable, после чего ничего не передается.
- Complete: не отправляет никакого значения, просто завершает выполнение Observable, сообщая, что больше нет значения, которое будет отправлено позже.
Для работы с наблюдателем достаточно полностью или частично следовать этой структуре:
// Basta apenas assinar um Observable passando esse objeto como configuração
// para consumi-lo
observable.subscribe({
next(dado) { // obter o dado},
error(error) { // tratar o erro },
complete() { // Notificar que a execução acabou, não retorna nenhum valor }
});
Следуя этой структуре, вы можете получить данные, обработать ошибку или сообщить о завершении выполнения, но если вы хотите получить только данные, вы можете выполнить только часть, как это уже было сделано в некоторых предыдущих примерах:
observable.subscribe(dado => console.log('Entrega o valor passado pelo next ' + dado));
Подписка
Subscription – это сигнатура Observable, уже упоминалась в статье выше, обычно это выполнение Observable для получения потока данных, но она предоставляет очень ценную функцию – возможность отменить само выполнение.
Отмена наблюдаемого объекта
Отменить Observable можно в любое время, когда subscribe выполняет свою цель или нам нужно сделать резкую отмену, мы можем сделать это, удалив subscribe.
import { from } from 'rxjs';
const observable = from(["Huguinho", "Zezinho", "Luizinho"]);
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Вы должны создать подписку, чтобы иметь возможность выполнить отмену, отмена Observable позволяет избежать траты памяти устройства.
Мы также можем отменить, запустив что-то более индивидуальным способом
Вы должны создать подписку, чтобы иметь возможность выполнить отмену, выполнение отмены Observable помогает избежать траты ресурсов памяти.
Мы также можем отменить действие, выполнив что-то более индивидуальным способом
const observable = new Observable(function subscribe(subscriber) {
const intervalId = setInterval(() => {
subscriber.next('Olá');
}, 1000);
return function unsubscribe() {
clearInterval(intervalId);
};
});
RxJS Observables гораздо мощнее в сочетании с другими возможностями RxJS, такими как композиция с помощью операторов, которые мы рассмотрим в другой статье.