angular observable reactive programming

in Programmazione

Angular, RxJs e Observable

Reading Time: 6 minutes

Usare Reactive Programming significa programmare utilizzando flussi di dati asincroni, utilizzanndo una serie di funzioni a cascata (le pipe) che gestiscono i dati in arrivo.

Già in un precedente post avevo introdotto RxJs. In questo post entriamo all’interno della libreria un pò più nello specifico.

Possiamo pensare agli eventi nel DOM e che possono essere gestiti tramite jQuery: nel mondo Reactive il click può essere paragonato ad un flusso asincrono e dovrà essere impostato un oggetto che osserverà il flusso stesso.

Tutto può essere considerato come un flusso: variabili di input da form utente, proprietà pubbliche di classi, strutture dati, cache, ecc…

Per poter “intercettare” le modifiche ad un flusso è necessario creare un oggetto che “osservi” il flusso stesso e gestisca i suoi cambiamenti.

Una delle librerie piu’ utilizzate per la gestione dei flussi è RxJs che consente oltre che la creazione del flusso, anche la combinazione con altri flussi ed operazioni di filtro. L’utilizzo di filtri sul flusso viene utilizzato ad esempio per escludere particolari valori in arrivo oppure per recuperare solo alcuni eventi.

A partire dalla versione 2.0 di Angular è stata introdotta RxJs che fornisce l’implementazione di un particolare tipo di dato, chiamato Observable. Questo particolare tipo di dato viene spesso utilizzato all’interno di operazioni asincrone. Ma non solo!

Basti pensare che in angular esistono delle particolari form, chiamate Reactive, che hanno proprietà che sono proprio di tipo Observable per monitorare l’input dell’utente. Un’ altro esempio, molto utilizzato, è quello relativo alle chiamate HTTP: i moduli di gestione delle chiamate verso API ritornano dati di tipo Observable.

Differenza tra Promise e Observable

Una variabile di tipo Promise potrebbe non essere disponibile al momento della sua definizione, ma potrebbe esserlo nel futuro. Inoltre, una Promise una volta valorizzata, esegue subito la funzione ad essa associata. Per questo tipo di comportamento le Promise vengono dette Eager. Le Promise possono avere tre stati che sono FullFilled, Pending o Reject, lavorando di fatto su un singolo valore. Inoltre non è possibile non esistono funzioni che consentono di riportare una Promise allo stato iniziale.

Un Observable, è una tipo di dato che rimanda la sua esecuzione fino a quando non arriva un elemento del flusso. Per questo tipo di comportamento Observable viene detto Lazy. La sua esecuzione non si ferma ad una sola esecuzione, ma può durare nel tempo: in base al numero di elementi presenti nello stream. Inoltre, presentano una serie di operatori che possono essere posizionati durante l’arrivo dello stream e che permettono, tra l’altro, di riportarle a stati precedenti.

Observable

Gli Observable di Angular sono uno stream di dati. Lo stream ha come è logico una sorgente: il risultato di una chiamata Api Remota, un evento generato nel DOM, i dati provenienti da una query su un database, ecc.

Trattandosi di un flusso, è necessario effettuare la sottoscrizione (subscribe): ogni volta che si presenta un nuovo dato sullo stream, viene eseguita una funzione. Tutto questo ovviamente in modalità asicrona.

Le promise ritornano un singolo valore, mentre gli observable possono restituirne piu’ di uno, ad esempio quando lo stream non si esaurisce con uno specifico evento, ma che prosegue nel tempo.

Rispetto alle callback e alle promise, presenti in javascript, gli observable consentono di intercettare il valore dello stream, prima che venga eseguita la funzione indicata in subscribe. Questa funzionalità è resa possibile concatenando funzioni .pipe, prima della subscribe.

observable.subscribe(function(){...})

Nell’esempio precedente, viene effettuata la sottoscrizioene ad una generica variabile observable (chiamata observable). La funzioen function(){…} viene eseguita ogni volta che all’interno della variabile observable arriva un nuovo valore.

Semplificando di molto possiamo considerare il codice seguente, utilizzato per la gestione di un evento click() nel DOM:

document.addEventListener('click', () => console.log('Clicked!'));

il codice è piuttosto semplice: aggiunge il listener all’evento click eseguendo il bind su una funzione senza parametri che esegue console.log(…).

Utilizzando RxJs la stessa riga di codice verrebbe scritta nel seguente modo:

import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));

dove aver aggiunto il riferimento a rxjs,

this.route.paramMap
 .pipe(map(params => params.get('id')))
 .pipe(switchMap(id => this.api.getProject(id)));
 .subscribe(project => {...});

L’esempio precedente (anche senza conoscere il significato delle funzioni map e switchmap) mostrano l’utilizzo delle funzioni pipe concatentate prima dell’effettiva sottoscrizione con il metodo subscribe. In pratica, come già descritto in precedenza, un nuovo valore nello stream verrà prima processato all’interno delle pipe e successivamente elaborato all’interno della funzione project, definita all’interno della subscribe.

Observer

Utilizzando dati di tipo Observable, abbiamo bisogno di un oggetto che osservi un dato e che sia in grado di riceverne le notifiche. Senza entrare nel dettaglio tecnico, la definizione di un dato di tipo Observer:

interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

dove:

  • next: è il metodo che deve essere sempre presente perchè viene richiamato quando viene inviato un nuovo elemento allo stream. In pratica, la funzione che deve essere eseguita.
  • error: è il metodo che viene utilizzato dall’Observer quando riceve un errore
  • complete: questa funzione viene utilizzata per ricevere un notifica senza valore
  • closed: è un boolean che indica se l’Observer ha già richiesto di non ricevere piu’ notifiche

Nota Importante! Un dato di tipo Observer smette di ricevere dati dallo stream quando riceve una notifica error, oppure una notifica complete.

A questo punto siamo pronti per descrivere gli operatori comuni che possono essere applicati ad un Observable.

Operatori Comuni

Utilizzando il principio della pipe applicabili sullo stream, sono disponibili una serie di funzioni che effettuano modifiche ed integrazioni ai singoli dati. Vediamo quindi le più utilizzate.

Operatori per la creazione di Observable

of(): operatore che crea un Observable che emette i valori uno di seguito all’altro. Al termine dell’esecuzione viene notificato il completamento tramite la chiamata completed.

from(): operatore simile ad of(), con la differenza che la source dei dati arriva da oggetti come Array e Promise.

fromEvent(source, ‘event’): crea un Observable a partire da un evento. Ad esempio un evento che si è verificato sul DOM. Alla funzione devono essere passati due argomenti: il primo rappresenta l’oggetto che emette l’evento, mentre il secondo il tipo di evento.

interval(period:number): è un operatore che consente di generare una sequenza infinita di numeri uno ogni period in millisecondi.

Pipe

Questi operatori sono stati introdotti a partire da RxJs versione 5.5.

Sono un particolare tipo di operatori che consentono di intercettare il normale flusso dello stream e possono modificare i valori dei dati Observable. Le pipe sono molto utilizzate per combinare più operatori che verranno processati nell’ordine in cui sono stati inseriti.

filter: consente di filtrare in base ad una condizione e restituire solo i dati che soddisfano il filtro passato come parametro

map: è un operatore che consente di modificare il dato passato in ingresso, trasformandolo in qualcosa d’altro

const source = interval(1000);

const newObservable = source.pipe(
  filter(value => value % 2 === 0),
  map(value => value * 2)
)

const observer: PartialObserver<number> = {
  next(value: number) {console.log(value)},
  complete() {console.log('complete')}
};

newObservable.subscribe(observer);

nell’esempio precedente viene generato uno stream che emette un numero ogni secondo. Viene definito un oggetto (newObservable) che è il risultato della combinazione della funzione filter e map. La filter consente di estrarre i valori che sono multipli di due, mentre la map di ottenere i valori filtrati e moltiplicati per 2.

tap(): consente di eseguire delle operazioni ad ogni emissione del Observable, senza però di fatto alterarne il valore originario. Non modificando il valore originale, spesso viene utilizzato per operazioni di debug. Prima della versione 5.5 di Rx questa funzione si chiamava do().

const newObservable = source.pipe(
  tap(value => console.log('valore prima di filter: ', value)),
  filter(value => value % 2 === 0),
  tap(value => console.log('valore dopo filter: ', value)),
  map(value => value * 2),
  tap(value => console.log('valore dopo map: ', value)),
)

take(n:number): consente di estrarre un numero n di elementi all’interno di un Observable.

first(): la funzione first consente di estrarre il primo elemento di uno stream

skip(n: number): consente di skippare un numero n di elementi all’interno dello stream

takeLast(n:number): consente di prendere gli ultimi n elementi dello stream

last(): consente di estrarre l’ultimo elemento dello stream

takeUntil(notifier: Observable): restituisce i dati passati dall’Observable sorgente fino a quando un secondo Observable (notifier) non emette un valore

startWith(): consente di generare un Observable che emette gli elementi specificati come argomenti prima di inviare i dati provenienti dall’Observable sorgente

const source = range(3, 3);

const newObservable = source.pipe(
  startWith(0, 1, 2)
)
//  (012345|)

concat():restituisce un nuovo Observable che concatena più sorgenti emettendo un unico Observable con i valori dei due Observable in sequenza

merge(): restituise un nuovo Observable che combina e mescola i dati provenienti da due Observable

switchMap: consente di effettuare il subscribe sia fatto solo sull’Observable piu’ interno. In pratica viene effettuata la cancellazione dalla subscribe precedente e viene creata una nuova subscribe verso l’observable piu’ interno. Supponiamo di voler effettuare una chiamata API ogni volta che sul flusso esterno si presenta un elemento:

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

utilizzando soltanto l’operatore map:

from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

introducendo la funzione switchAll il codice precedente diventerebbe:

from([1,2,3,4]).pipe(
  map(param => getData(param)),
  switchAll()
).subscribe(val => console.log(val));

che può essere compattato usando swithMap:

from([1,2,3,4]).pipe(
  switchMap(param => getData(param))
).subscribe(val => console.log(val));

retry(n:number): restituisce un Observable identico a quello sorgente, con la sola differenza che in caso di errore verranno fatti n tentativi di iscrizione all’Observable sorgente

Conclusioni

In questo post abbiamo introdotto la libreria RxJs, analizzando il concetto di Reactive Programming. Inoltre sono stati introdotti gli Observable e le modalità di utilizzo tramite l’utilizzo di operatori piu’ comuni.