Введение в реактивное программирование с RXJS.

Реактивность

Предположим, наше приложение должно получать данные из нескольких источников при клике на кнопку.

Для этого выдвинем несколько условий.

  1. Данные должны быть объеденены из нескольких JSON источников.

  2. Финальные данные не должны содержать дубликатов.

  3. Для предотвращения излишних запросов, пользователь не должен кликать чаще 1 раза в секунду.

Начнем с создания ссылки и потока событий клика.

<a href="#" #mylink>Click me</a>

import { Component, ViewChild, ElementRef, AfterViewInit } from '@angular/core';
import { Observable, fromEvent, Subscription } from 'rxjs';


export class AppComponent implements AfterViewInit  {
  linkStream$: Subscription;
  @ViewChild('mylink', {static: false}) link: ElementRef;

  ngAfterViewInit() {
    const linkStream$ = fromEvent(this.link.nativeElement, 'click')
        .subscribe(res => console.log(res));

  }

}

При этом мы привязываемся к хуку ngAfterViewInit для дого чтобы определить момент когда все елементы DOM созданы.

Если это сделать в ngOnInit так:

export class AppComponent implements  OnInit  {
  linkStream$: Subscription;
  @ViewChild('mylink', {static: false}) link: ElementRef;

  ngOnInit() {
    ...
  }

То получим исключение.

ERROR TypeError: Cannot read property 'nativeElement' of undefined

Создание потока

fromEvent(this.link.nativeElement, 'click')
            .subscribe(res => console.log(res));

Вывод в консоль.

MouseEvent {isTrusted: true, screenX: 37, screenY: 153, clientX: 33, clientY: 21, …}
app.component.ts:15 MouseEvent {isTrusted: true, screenX: 37, screenY: 153, clientX: 33, clientY: 21, …}

Это события мышки по каждому клику.

Поставим задержку в одну секунду, используя debounce.

import { Observable, fromEvent, Subscription, pipe, interval } from 'rxjs';
import { debounce } from 'rxjs/operators';
....

const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(debounce(() => interval(1000)))
    .subscribe(res => console.log(res));

Сигнатура: debounce(durationSelector: function): Observable

Как видно ей нужно передать функцию.

Эта функция (фильтр) debounce отменяет все события, произошедшие раньше того времени, которое возвращается той функцией, которая передается в debounce.

В нашем случае мы передаем interval.

Функция interval создает наблюдаемый объект Observable, который порождает события с указанным интервалом.

Всю конструкцию debounce(() => interval(1000)) мы заключили (пропустили через) в поток pipe.

В нашем случае, фильтр debounce не совсем то, что нужно т.к. нам необходимо сгенерировать событие сразу после клика, и задушить все последующие в промежутке 1 сек.

Для этого есть фильтр throttle c аналогичной сигнатурой, но отменяющий события после, а не до.

const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(throttle(() => interval(1000)))
    .subscribe(res => console.log(res));

Теперь определим первый Observable - источник данных.

arraySource1 = from([1, 2, 3, 4, 5]);

И переключимся на него внутри потока map при помощи switchMap.

const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(
      throttle(() => interval(1000)),
      switchMap(() => this.arraySource1)
    )
    .subscribe(res => console.log(res));

Вывод нас удовлетворяет, теперь вместо события клика мыши мы получили наши данные.

Пропробуем задействовать два источника.

  arraySource1 = from([1, 2, 3, 4, 5]);
  arraySource2 = from(['a','b','c']);

  ngAfterViewInit() {
    const linkStream$ = fromEvent(this.link.nativeElement, 'click')
        .pipe(
          throttle(() => interval(1000)),
          switchMap(() => this.arraySource1),
          switchMap(() => this.arraySource2)
        )
        .subscribe(res => console.log(res));

  }

Результат бредовый.

app.component.ts:26 b
app.component.ts:26 c
app.component.ts:26 a
app.component.ts:26 b
app.component.ts:26 c
app.component.ts:26 a
app.component.ts:26 b
app.component.ts:26 c
app.component.ts:26 a
app.component.ts:26 b
app.component.ts:26 c
app.component.ts:26 a
app.component.ts:26 b
app.component.ts:26 c

Но он говорит о том, что парень прошешел по первому источнику и столько же раз по второму и передал в res результат второго источника.

Что если мы вложим один поток в другой?

const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(
      throttle(() => interval(1000)),
      switchMap(() => this.arraySource1.pipe(
        switchMap(() => this.arraySource2)
      ))
    )
    .subscribe(res => console.log(res));

То же самое!

Мы можем проследить процесс console.log-ом так.

const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(
      throttle(() => interval(1000)),
      switchMap((a) => {
        console.log(a);
        return this.arraySource1;
      }),
      switchMap((b) => {
        console.log(b);
        return this.arraySource2;
      })
    )
    .subscribe(res => console.log(res));

Вывод.

MouseEvent {isTrusted: true, screenX: 49, screenY: 151, clientX: 44, clientY: 20, …}
app.component.ts:28 1
app.component.ts:32 a
app.component.ts:32 b
app.component.ts:32 c
app.component.ts:28 2
app.component.ts:32 a
app.component.ts:32 b
app.component.ts:32 c
app.component.ts:28 3
app.component.ts:32 a
app.component.ts:32 b
app.component.ts:32 c
app.component.ts:28 4
app.component.ts:32 a
app.component.ts:32 b
app.component.ts:32 c
app.component.ts:28 5
app.component.ts:32 a
app.component.ts:32 b
app.component.ts:32 c

Замечаем что первым пошел MoudeEvent!

Изменим концептуально источники, которые будут возвращать нам данные одним событием, по аналогии с HTTP запросом.

arraySource1 = from([{result1: [1,2,3]}]); arraySource2 = from([{result2: [‘a’,’b’,’c’]}]);

ngAfterViewInit() { const linkStream$ = fromEvent(this.link.nativeElement, ‘click’) .pipe( throttle(() => interval(1000)), switchMap((a) => this.arraySource1), switchMap((b) => this.arraySource2) ) .subscribe(res => console.log(res));

}

Однако это тоже не дает нужных результатов, мы по прежнему не можем получить данные с двух источников в одном месте.

Попробуем использовать RxJS функцию combineLatest, которая принимает произвольное кол-во Observable объектов и возвращает новый Observable в который забрасывает последние элементы, сгенерированные каждым из переданных Observable объектов.

  ngAfterViewInit() {
    const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(
      throttle(() => interval(1000)),
      combineLatest(this.arraySource1, this.arraySource2)
    )
    .subscribe(res => console.log(res));
  }

Наконец то мы получили результаты от двух источников, попробуем преобразовать их перед передачей в позписчик ф-цией map.

  const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(
      throttle(() => interval(1000)),
      combineLatest(this.arraySource1, this.arraySource2),
      map(([mouseevent,data1,data2]: any) => 
           data1['result1'].map(element => [data2.result2[element-1], element])
      )
    )
    .subscribe(res => console.log(res));


(3) [Array(2), Array(2), Array(2)]
0: (2) ["a", 1]
1: (2) ["b", 2]
2: (2) ["c", 3]

Функция map будет получать список из последних сгенерированных значений каждого из Observable которые были из них вытянуты ф-цией combineLatest.

Осталось применить реальные запросы вместо харкодных данных.

  getRequest() {
    const url = 'http://localhost:4200';
    return this.http.get(url);
  }

  ngAfterViewInit() {
    const linkStream$ = fromEvent(this.link.nativeElement, 'click')
    .pipe(
      throttle(() => interval(1000)),
      combineLatest(this.getRequest(), this.arraySource2)
    )
    .subscribe(res => console.log(res));
  }

Тут нас ожидает проблема.

Дело в том, что combineLatest автоматически подписывается на Observable, возвращаемый ф-цией getRequest. Что вызвает HTTP запрос до клика на ссылке.

После того, как запрос выполнится, произойдет отписка и далее запрос вызван не будет.

Но при клике данные (результаты http запроса) все же будут извлечены изнутри уже испеченного Observable объекта.

Поэтому произведем подписку при клике а не в ngAfterViewInit

<a href="#" (click)="doLink()">Click me</a>

export class AppComponent  {

  link$: Observable<any>;


  arraySource2 = from([{result2: ['a','b','c']}]);

  constructor(private http: HttpClient) {
    this.link$ = from([true])
    .pipe(
      throttle(() => interval(1000)),
      combineLatest(this.getWether(), this.arraySource2)
    )
  }

  getWether() {
    const url = 'http://localhost:8085/i18n/ru.json';
    return this.http.get(url);
  }


  doLink(){
    this.link$.subscribe(val => console.log(val))
  }

В результате получим массив

[true, {…}, {…}]

Первым элементом пойдет значение true, из которого мы инициируем поток в конструкторе для одного события.

Так же необходимо помнить то что при помощи combineLatest мы объеденяем независимые друг от друга потоки, где один не зависит отрезультатов другого.

Для того, чтобы объеденить зависимые потоки необходимо использовать ф-ции megrgeMap и concatMap.

ДЗ. Получить список коментариес с id пользователями одним запросом и информасию о пользователях в другом запросе и объеденить данные.