Реактивне програмування з RxJs.

Фронтенд розробка JavaScript. -> Реактивне програмування з RxJs.

Реактивне програмування з RxJs.

Еволюція у програмуванні асинхронних процесів.

Колбеки.

jQuery(() => {

    function B(callback) {
        callback('Done!');
    }

    function A() {
        console.log('Hello from calback');
    }

    B(A);
})

Коллбек - це функція A, що передається як параметр іншої функції B, яка здійснює асинхронну операцію. Коли B закінчить виконання, вона викличе назад A.

Колбеки використовуються для обробки таких операцій як передача по мережі, доступ до БД, обробка введення користувача.

Колбеки мають такі недоліки.

  1. Колбекове пекло. Безліч вкладених колбеків.

    firstFunction(args, function() { secondFunction(args, function() { thirdFunction(args, function() { // And so on… }); }); });

  2. Колбеки можуть бути викликані більше одного разу і немає гарантії їхнього одноразового виконання. При множинні залучення можуть призводити до труднощів виявлення помилок.

  3. Колбеки змінюють семантику роботи з помилками. При цьому відійдуть від механізму try/catch і покладають на програміста відповідальність перевірки помилок і передача їх по ланцюжку викликів.

    var num = ‘5’;

    myFunction(num, function callback(err, result) { if (err) { // handle error }

    // handle result });

  4. За необхідності забезпечення багатопотокового виконання програмування стає вкрай складним. Коли, наприклад, нам необхідно скомбінувати дані з різних незалежних асинхронних викликів. При цьому виникає необхідність відстежувати стан кожного з них у тимчасових змінних перед комбінацією, а потім передачу їх функції-комбінатора в потрібній послідовності.

Промисли.

Проміси є результатом виконання асинхронної операції. У коді, заснованому на промісах, виклик асинхронної операції поверне спеціальний об’єкт-проміс, який може перебувати в наступних станах:

  • бути виконаним (resolved)

  • відкинутим (rejected) у разі помилок

  • Виконуються (pending)

Таким чином, код стає більш схожим на синхронний і виключає вкладені блоки.

Визначення промісу.

var promise = new Promise(function(resolve, reject) {
    // do a thing, possibly async, then…



    if (true === true) {
      resolve("Stuff worked!");
    }
    else {
      reject(Error("It broke"));
    }
  })

  promise.then(function(result) {
    console.log(result); // "Stuff worked!"
  }, function(err) {
    console.log(err); // Error: "It broke"
  }).then()
    .then()
    .then();

  promise.then(function(result) {
    console.log(result); // "Stuff worked!"
  }, function(err) {
    console.log(err); // Error: "It broke"
  });

На жаль, проміси є лише особливим чином роботи з колбеками і як вони здатні повертати єдиний результат за раз. Це робить їх некорисними в процесах, що повторюються, таких як кліки мишею або потоках даних, що приходять від віддаленого джерела. У разі ми змушені створювати кожному події у потоці окремий проміс.

Генератор подій Event Emitter.

Суть – ми генеруємо подію та підписуємо на неї обробника (слухача). Це чудовий спосіб розділити функціональність та послабити зв’язки між елементами логіки.

Однак, це має свої проблеми.

  1. Слухачі породжують побічні ефекти, т.к. не враховують те, що повертають і змушені змінювати те, що знаходиться за межами в навколишньому просторі імен.

  2. Події - це прості об’єкти першого класу. Наприклад серія кліків мишею може бути передана як масиву що саме собою масив. Ми повинні обробляти кожну подію індивідуально.

  3. Дуже просто пропустити подію, якщо ми запізнилися зі слухачем. У ситуації коли подія виникає до того моменту, як ми додаємо слухача.

Що таке реактивне програмування?

Просте - це механізми створення, зміни та реагування на потоки даних.

Ці механізми описуються такими діаграмами.

start page

Що таке RxJS?

Це імплементація принципів реактивного програмування (RX) мови JS.

Засновано на застосуванні 2 патернів – ітератор та спостерігач.

Спостерігач.

function Producer() {
    this.listeners = [];
}
Producer.prototype.add = function(listener) {
    this.listeners.push(listener);
};
Producer.prototype.remove = function(listener) {
    var index = this.listeners.indexOf(listener);
    this.listeners.splice(index, 1);
};
Producer.prototype.notify = function(message) {
    this.listeners.forEach(function(listener) {
    listener.update(message);
});

Применение в клиентском коде.

var listener1 = {
    update: function(message) {
    console.log('Listener 1 received:', message);
}
};
var listener2 = {
    update: function(message) {
    console.log('Listener 2 received:', message);
}
};

var notifier = new Producer();
notifier.add(listener1);
notifier.add(listener2);
notifier.notify('Hello there!');

Ітератор.

function iterateOnMultiples(arr, divisor) {
    this.cursor = 0;
    this.array = arr;
    this.divisor = divisor || 1;

}

iterateOnMultiples.prototype.next = function() {
    while (this.cursor < this.array.length) {
        var value = this.array[this.cursor++];
            if (value % this.divisor === 0) {
            return value;
        }
    }
};

iterateOnMultiples.prototype.hasNext = function() {
    var cur = this.cursor;
    while (cur < this.array.length) {
        if (this.array[cur++] % this.divisor === 0) {
            return true;
        }
    }
    return false;
};


var consumer = new iterateOnMultiples([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3);
console.log(consumer.next(), consumer.hasNext()); // 3 true
console.log(consumer.next(), consumer.hasNext()); // 6 true
console.log(consumer.next(), consumer.hasNext()); // 9 false

Візьмемо потік кліку мишею.

start page

Програма для відстеження може мати такий вигляд.

$(document).on('click', (evt) => {
    console.log(evt);
})

Проблема тут у тому, що працювати з подіями не так просто як з масивами.

Наприклад, якщо ми хочемо відстежити перші 5 натискань.

var clicks = 0;
var registerClicks = $(document).on('click', (evt) => {
    if (clicks < 5) {
        clicks++;
        console.log(clicks);
    } else {
        $(document).off('click', registerClicks);
    }
})

Ми змушені вводити зовнішню змінну стану clicks та додаткові перевірки.

Все це називається побічними ефектами.

Як це виглядає у RxJs.

Rx.Observable.fromEvent(document, 'click')
.take(5)
.subscribe(function(c) { console.log(c.clientX, c.clientY) })

Встановлення Rx.

npm install rx --save

Увімкнення.

<script src="node_modules/rx/dist/rx.all.js"></script>

npm install rxjs --save

Встановлення RxJs.

Простого включення мало і необхідно скористатися завантажувачем.

<script src="node_modules/rxjs/Rx.js"></script>

start page

RxJs відрізняється від Rx більшою продуктивністю, підтримкою модульності та інструментами для дебага.

Якщо ми хочемо додати умову і відстежувати клік в області, то це робиться так:

Rx.Observable.fromEvent(document, 'click')
.filter(function(c) { return c.clientX > window.innerWidth / 2; })
.take(5)
.subscribe(function(c) { console.log(c.clientX, c.clientY) })

Таким чином Observable (відстеження) генерує події на кшталт ітератора, і проштовхує дані всередину передплатника (споживача), це називається механізмом push. На відміну від механізму pull у якому передплатник б запитував дані.

Ручне створення відстежуваного потоку (ВП).

var observable = Rx.Observable.create(function(observer) {
    observer.onNext('Simon');
    observer.onNext('Jen');
    observer.onNext('Sergi');
    observer.onCompleted(); // We are done
});

observable.subscribe((val) => {
    console.log(val);
})

start page

Найчастіше створювати такі отслеживаемые потоки годі й т.к. існує багато інструментів щодо їх створення із різноманітних подій.

З масиву.

Rx.Observable
.from(['1', '2', '3'])
.subscribe(
    function(x) { console.log('Next: ' + x); }
);

Із події.

var allMoves = Rx.Observable.fromEvent(document, 'mousemove');

allMoves.subscribe(function(e) {
    console.log(e.clientX, e.clientY);
});

Відстежимо переміщення у різних областях екрана.

var movesOnTheRight = allMoves.filter(function(e) {
    return e.clientX > window.innerWidth / 2;
});
var movesOnTheLeft = allMoves.filter(function(e) {
    return e.clientX < window.innerWidth / 2;
});

movesOnTheRight.subscribe(function(e) {
    console.log('Mouse is on the right:', e.clientX);
});
    movesOnTheLeft.subscribe(function(e) {
    console.log('Mouse is on the left:', e.clientX);
})

Комбінаторика потоків.

Посилання на корисну статтю

Посилання на основні оператори

Загальні позначення

start page

Merge

const a = stream('a', 200, 3, 'partial');
const b = stream('b', 200, 3, 'partial');
merge(a, b).subscribe(fullObserver('merge'));
// can also be used as an instance operator
a.pipe(merge(b)).subscribe(fullObserver('merge'));

start page

start page

Drad and drop

Создадим два блока.

<div id="out">
  <div id="in"></div>
</div>

Стили

#out {
    width: 200px;
    height: 200px;
    position: relative;
    border: 1px solid red;
}

#in {
    position: absolute;
    border-radius: 50%;
    background-color: red;
    width: 30px;
    height: 30px;
}

Определим 3 потока и привяжем их к элементам.

var box = $('#in');
var document = $('#out');
const mousedown$ = Rx.Observable.fromEvent(box, 'mousedown');
const mousemove$ = Rx.Observable.fromEvent(document, 'mousemove');
const mouseup$ = Rx.Observable.fromEvent(document, 'mouseup');

Переключимся с потока mousedown$ на mousemove$.

mousedown$.switchMap((evt) => mousemove$).subscribe((e) => {
    console.log(`${e.clientX}-${e.clientY}`);
})

Переключимся с mousemove$ на mouseup$.

mousedown$.switchMap((evtup) => 
    mousemove$.switchMap((evtdwn) => mouseup$))
.subscribe((e) => {
    console.log(`${e.clientX}-${e.clientY}`);
})

Передвинем блок.

mousedown$.switchMap((evtup) => 
    mousemove$.switchMap((evtdwn) => mouseup$))
.subscribe((e) => {
    console.log(`${e.clientX}-${e.clientY}`);
    box.css({ top: e.offsetY+'px' });
    box.css({ left: e.offsetX+'px' });

})

Передвигаем в момент движения.

mousedown$.switchMap((evtup) => mousemove$)
.subscribe((e) => {
    console.log(`${e.clientX}-${e.clientY}`);
    box.css({ top: e.offsetY+'px' });
    box.css({ left: e.offsetX+'px' });

})

Прекращаем передвигать при mouseup при помощи takeUntil.

mousedown$.switchMap((evtup) => mousemove$.takeUntil(mouseup$))
.subscribe((e) => {
    console.log(`${e.clientX}-${e.clientY}`);
    box.css({ top: e.offsetY+'px' });
    box.css({ left: e.offsetX+'px' });
})

Обращаем внимание что takeUntil применяется к отслеживаемому потоку mousemove$ т.к. если его применить к тому что возвращает switchMap, например так:

mousedown$.switchMap((evtup) => mousemove$)
.takeUntil(mouseup$)
.subscribe((e) => {
    console.log(`${e.clientX}-${e.clientY}`);
    box.css({ top: e.offsetY+'px' });
    box.css({ left: e.offsetX+'px' });

})

То объект будет передвинут но после этого произойдет отписка от потока mousedown$ и больше передвигать станет невозможно.

Игра звездные войны.

Создадим канвас.

var canvas = document.createElement('canvas');
var ctx = canvas.getContext("2d");
document.body.appendChild(canvas);
canvas.width = window.innerWidth;
canvas.height = window.innerHeight;

Создадим масив из случайных звезд

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})

StarStream.subscribe((evt) => {
    console.log(evt);
})

Преобразуем в массив .toArray();.

var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(...).toArray();

Закрасим небо в черный и включим это в подписку.

StarStream.subscribe((evt) => {
    paintStars();
})

function paintStars() {
    ctx.fillStyle = '#000000';
    ctx.fillRect(0, 0, canvas.width, canvas.height);
}

Операторы flatMap/mergeMap и switchMap

Используется когда нужно объединить данные из внутреннего отслеживаемого потока (ОП) но хотите контролировать число внутренних подписчиков. Например когда мы используем switchMap каждый внутренний подписчик завершается при генерации данных новым ОП. Таки образо в каждый момент времени активен один ОП (источник данных).

Тогда как mergeMap позволяет быть активным многим подписчикам одновременно из разных ОП.

start page

start page

Создадим новый ОП в операторе switchMap с заданным интервалом, в котором будем пересчитывать координаты каждой звезды.

var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.switchMap((starArray) => {
    return Rx.Observable.interval(SPEED).map(function() {
        starArray.forEach(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0; // Reset star to top of the screen
            }
            star.y += 3; // Move star
        });
        return starArray;
    });
});

Изменим подписку т.к. теперь в нее будет попадать масив звезд 40 раз в секунду.

StarStream.subscribe((starsArray) => {
    paintStars(starsArray);
})

Вводим массив звезд аргументом в функцию paintStars и отрисовываем в цикле.

function paintStars(stars) {
    ctx.fillStyle = '#000000';
    ctx.fillRect(0, 0, canvas.width, canvas.height);
    ctx.fillStyle = '#ffffff';
    stars.forEach(function(star) {
        ctx.fillRect(star.x, star.y, star.size, star.size);
    });
}

Добавляем ОП (отслеживаемый поток) космического корабля.

var HERO_Y = canvas.height - 30;
var mouseMove = Rx.Observable.fromEvent(canvas, 'mousemove');

var SpaceShip = mouseMove
.map(function(event) {
    return {
        x: event.clientX,
        y: HERO_Y
    };
})
.startWith({
    x: canvas.width / 2,
    y: HERO_Y
});


SpaceShip.subscribe((obj) => console.log(obj))

Добавим функции отрисовки трехугольника и включим ее в подписку.

function drawTriangle(x, y, width, color, direction) {
    ctx.fillStyle = color;
    ctx.beginPath();
    ctx.moveTo(x - width, y);
    ctx.lineTo(x, direction === 'up' ? y - width : y + width);
    ctx.lineTo(x + width, y);
    ctx.lineTo(x - width,y);
    ctx.fill();
}

function paintSpaceShip(obj) {
    drawTriangle(obj.x, obj.y, 20, '#ff0000', 'up');
}

SpaceShip.subscribe((obj) => paintSpaceShip(obj))

Проблема в том что отрисовка звезд стирает корабль. Нам необходимо обьеденить эти два потока и сначало отрисовывать звезды, а потом корабль.

Обьеденим отрисовки в отдельной функции, которая получит обьект с персонажами.

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship);
}

Создадим новый поток игры.

var Game = Rx.Observable
.combineLatest(
StarStream, SpaceShip,
function(stars, spaceship) {
    return { stars: stars, spaceship: spaceship };
});

Работа функции combineLatest

start page

Подпишем renderScene к потоку Game.

Game.subscribe(renderScene);

Уберем

// StarStream.subscribe((starsArray) => {
//     paintStars(starsArray);
// })

Генерация врагов.

Будем создавать массив раз в 1.5 сек.

var ENEMY_FREQ = 1500;
var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan((enemyArray) => {
    var enemy = {
    x: parseInt(Math.random() * canvas.width),
    y: -30,
};
enemyArray.push(enemy);
return enemyArray;
}, []);

Enemies.subscribe((val) => console.log(val))

Функция scan применяет заданную функцию к каждому элементу потока, причем второй аргумет использует в качестве начального значения.

Добавляем третий поток в combineLatest.

var Game = Rx.Observable
.combineLatest(
StarStream, SpaceShip, Enemies,
function(stars, spaceship, enemies) {
    return { stars: stars, spaceship: spaceship, enemies: enemies };
});

start page

Создадим функцию генерации случайных координат.

function getRandomInt(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

Отрисовываем со сдвигом.

function paintEnemies(enemies) {
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down');
    });
}

Включаем в прорисовку.

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship);
    paintEnemies(actors.enemies);
}

Добавим sample(SPEED) в combineLatest тем самым скажем “никогда не отдавай данные чаще 40 раз в секунду”

var Game = Rx.Observable
.combineLatest(
StarStream, SpaceShip, Enemies,
function(stars, spaceship, enemies) {
    return { stars: stars, spaceship: spaceship, enemies: enemies };
}).sample(SPEED);

Стрельба.

Смержим два события в поток не чаще 5 раз в сек.

var playerFiring = Rx.Observable.fromEvent(canvas, 'click')
.sample(200)
.timestamp();

Соединим ОП корабля и стрельбы чтобы начать стрелять из координат текущего корабля.

var HeroShots = Rx.Observable
.combineLatest(
    playerFiring,
    SpaceShip,
    function(shotEvents, spaceShip) {
        return { x: spaceShip.x };
    })
.scan(function(shotArray, shot) {
    shotArray.push({x: shot.x, y: HERO_Y});
    return shotArray;
}, []);

Создадим функцию отрисовки пулек.

var SHOOTING_SPEED = 15;
function paintHeroShots(heroShots) {
    heroShots.forEach(function(shot) {
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

Включим ее в renderScene

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship);
    paintEnemies(actors.enemies);
    paintHeroShots(actors.heroShots);
}

Добавим пульки в сцену.

var Game = Rx.Observable
.combineLatest(
StarStream, SpaceShip, Enemies, HeroShots,
function(stars, spaceship, enemies, heroShots) {
    return { 
        stars: stars, 
        spaceship: spaceship, 
        enemies: enemies,
        heroShots: heroShots
    };
}).sample(SPEED);

Изменим генерацию пуль исходя из timestamp.

var HeroShots = Rx.Observable
.combineLatest(
    playerFiring,
    SpaceShip,
    function(shotEvents, spaceShip) {
        return { x: spaceShip.x, timestamp: shotEvents.timestamp,};
    })
.distinctUntilChanged(function(shot) { return shot.timestamp; })
.scan(function(shotArray, shot) {
    shotArray.push({x: shot.x, y: HERO_Y});
    return shotArray;
}, []);

start page

Фронтенд розробка JavaScript. -> Домашнє завдання.

Домашнє завдання.

Додати картинки до персонажів.

Додати рух корабля координатою Y.

Додати звук пострілу.

Додати до цілей функцію відсрілюватися такими ж кульками червоного кольору.

Фронтенд розробка JavaScript. -> Знаходження найбільшого спільного дільника.
File /home/webmaster/it-course/frontend-js-ua/15-reactive/common-devider.md does not exist!

RxJs оператори.

Посилання на джерело

У RxJs існує два види операторів.

  1. Канальні (pipeble), які можуть бути вставлені у канал наступним чином.

    observableInstance.pipe(operator())

Наприклад filter(…) або mergeMap(…)

Вони не змінюють існуючий екземпляр потоку, а повертають новий потік, у якого логіка підписки така сама, як і у вихідного потоку.

Канальні оператори – це функції, які приймають вихідний потік як аргумент і повертають новий потік, не змінюючи вихідного.

  1. Творчі оператори.

Вони створюють нові потоки із різних джерел (масивів, подій тощо).

Наприклад of(1, 2, 3) створить відстежуваний потік, який згенерує 1, 2, і 3 один за одним.

Відмінність of і from у тому, що of([1,2,3]) згенерує потік з одного елемента масиву, а from([1,2,3]) з 3-х.

Канальність

Канальні оператори - це прості функції, які можуть бути вкладені в канал так:

obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)

Приклад творення потоку з допомогою операторів.

import { interval } from 'rxjs';

const observable = interval(1000 /* number of milliseconds */);

Потоки найвищого рівня.

Спостерігаються потоки (НП) зазвичай генерують значення простих типів як числа, масиви і т.д.