Kolumne: Enterprise Angular
Mit den Streaming Resources dringen das Resource API sowie Signals in Bereiche von RxJS vor – sie erlauben es, kontinuierliche Datenströme darzustellen. Neben den Gemeinsamkeiten zu RxJS gibt es jedoch auch einige nicht offensichtliche, jedoch besonders wichtige semantische Unterschiede, die es zu berücksichtigen gilt.
Heute gehen wir anhand von zwei Beispielen auf die Details rund um Streaming Resources sowie auf die semantischen Feinheiten und das Zusammenspiel mit RxJS ein. Den Quellcode finden Sie unter [1].
Um die Nutzung von Streaming Resources zu demonstrieren, kommen hier zwei Beispiele zum Einsatz. Das erste ist quasi das „Hello World“ der reaktiven Programmierung: ein einfacher Timer, der im Sekundentakt hochzählt (Abb. 1).
Abb. 1: Ein einfacher Timer auf Basis einer Streaming Resource
Das Schöne an diesem sehr einfachen Beispiel ist, dass es bereits vieles enthält, an dem die kleinen, jedoch wichtigen Feinheiten von Streaming Resources diskutiert werden können. Eine Variante dieses Beispiels kommt auch zum Einsatz, um die Interoperabilität mit RxJS zu zeigen.
Das zweite Beispiel ist ein wenig praxisnäher. Es handelt sich dabei um einen einfachen WebSockets-basierten Chat auf der Basis einer Streaming Resource (Abb. 2).
Abb. 2: Ein einfacher Chat auf Basis einer Streaming Resource
Dieses Chatbeispiel unterscheidet sich technisch gesehen in einem wichtigen Punkt vom Timer: Hier sind zum selben Zeitpunkt alle empfangenen Nachrichten und nicht nur die letzte von Bedeutung. Das stellt eine kleine Herausforderung in der Signal-basierten Welt dar, die sich allerdings lösen lässt.
Als Chatserver kommt ein Fork eines Beispielprojektes aus dem Mozilla Developer Network zum Einsatz [2]. Dieser Node-basierte Server liegt dem Beispielprojekt [1] bei und die readme.md zeigt, wie er sich starten lässt.
Im Gegensatz zu herkömmlichen Resources liefert der Loader einer Streaming Resource immer die folgende Struktur zurück:
PromiseLike<Signal<
{ value: T; }
| { error: unknown;}>>;
Es handelt sich hierbei um ein Promise mit einem Signal, das ein Objekt vorhält. Das Objekt verweist entweder auf den zuletzt veröffentlichten Wert (value) oder auf einen aufgetretenen Fehler. Dadurch, dass das Signal im Verlauf der Zeit mehrere Werte bzw. Fehlerzustände annimmt, repräsentiert es einen Datenstrom.
Um in weiterer Folge den Programmcode zu vereinfachen, definiere ich für den Inhalt dieses Signals einen Typ namens StreamItem (Listing 1). Angular bietet derzeit dafür keinen expliziten Typ an. Das soll sich aber im Laufe der weiteren Entwicklung noch ändern.
Listing 1
export type StreamItem<T> =
| {
value: T;
}
| {
error: unknown;
};
Um eine Streaming Resource zu definieren, nutzt der Programmcode die Funktion resource, die auch bei herkömmlichen Resources zum Einsatz kommt. Anstatt eines Loaders bekommt diese Funktion jedoch einen Streaming Loader über die Eigenschaft stream übergeben (Listing 2). Da dieses Beispiel das Schlüsselwort async nutzt, ergibt sich das Promise, das das Signal bereitstellt, implizit.
Listing 2
const myResource = resource({
request: requestSignal,
stream: async (params) => {
// 1. Create Signal representing the Stream
const result = signal<StreamItem<number>>({
value: 4711
});
// 2. Set up async logic updating the Signal
[…]
// 3. Set up clean-up handler triggered by AbortSignal
params.abortSignal.addEventListener('abort', () => {
[…]
});
// 4. Return Signal
return result;
},
});
Typischerweise lässt sich ein Streaming Loader in vier Teile untergliedern, die im gezeigten Beispiel mit Kommentaren angedeutet sind:
Zunächst erzeugt der Streaming Loader ein neues Signal, das den Datenstrom repräsentiert. Dieses Signal bekommt einen Initialwert.
Danach startet der Streaming Loader eine asynchrone Operation, die im Laufe der Zeit mehrere Ergebnisse liefert. Diese Ergebnisse veröffentlicht er nach und nach über das Signal.
Der Streaming Loader ist auch für das Bereitstellen einer Aufräumlogik verantwortlich. Diese beendet die zugrundeliegende asynchrone Operation, wenn ihre Werte nicht mehr benötigt werden.
Am Ende liefert der Streaming Loader das Signal zurück.
Für das Bereitstellen der Aufräumlogik nutzt der Streaming Loader das vom Resource API bereitgestellte AbortSignal, das sich im übergebenen Parameterobjekt findet. Interessanterweise entsprechen diese Punkte mehr oder weniger auch der typischen Vorgehensweise bei der direkten Nutzung des Observable-Konstruktors in der RxJS-Welt.
Die definierte Aufräumlogik läuft immer dann, wenn die Anwendung den aktuellen Datenstrom nicht mehr benötigt. Dafür gibt es zwei Gründe: Der erste tritt dann ein, wenn Angular den Building Block zerstört, der die Resource beherbergt. Stellen wir uns dazu eine Komponente mit einer Resource vor. Beim Verlassen zerstört der Router diese Komponente und im Rahmen dessen zerstört Angular auch die Resource.
Der zweite Grund ergibt sich, wenn sich das request-Signal der Resource ändert. Jede Änderung triggert den Streaming Loader und dieser liefert daraufhin einen neuen Stream. Beim Übergang zwischen diesen Streams nutzt das Resource API somit dieselbe Semantik wie switchMap in RxJS (Abb. 3).
Abb. 3: switchMap-Semantik beim Übergang zu einem neuen Stream
Das bedeutet, dass die Resource immer nur den neuesten Stream konsumiert. Diese Vorgehensweise ist die gleiche, die in der Regel beim Laden von Daten zum Einsatz kommt. Wie generell bei Signals in Angular geht es also auch hier darum, für Standardfälle einfache Konzepte anzubieten. Für komplexere Szenarien kann die Anwendung auf Bibliotheken wie RxJS zurückgreifen.
Nachdem wir nun den prinzipiellen Aufbau einer Streaming Resource besprochen haben, möchte ich die konkrete Nutzung anhand eines ersten Beispiels demonstrieren: Ein Timer, der eine Zahl in einem bestimmten Intervall hochzählt. Listing 3 zeigt diese timerResource aus Sicht eines Konsumenten.
Listing 3
@Component([…])
export class TimerResourceComponent {
ResourceStatus = ResourceStatus;
startValue = signal(0);
timer = timerResource(1000, this.startValue);
forward(): void {
this.startValue.update((v) => nextSegment(v));
}
}
function nextSegment(currentValue: number): number {
return Math.floor(currentValue / 100) * 100 + 100;
}
Der Timer liefert einen Stream, der beim übergebenen Startwert zu zählen beginnt. Der erste Parameter von timerResource repräsentiert das gewünschte Intervall in Millisekunden. Den Startwert repräsentiert das Signal startValue. Immer wenn er sich ändert, wechselt der Timer zu einem neuen Stream. Um diesen Umstand zu demonstrieren, springt die Methode forward zum nächsten vollen Hunderter, also z. B. von 17 auf 100 oder von 123 auf 200.
Um den Einsatz der Streaming Resource für den Timer zu vereinfachen, handelt es sich bei der Funktion timerResource lediglich um eine Factory (Listing 4).
Listing 4
export function timerResource(
timeout: number,
startValue: () => number
): ResourceRef<number | undefined> {
const request = computed(() => ({
startValue: startValue(),
}));
const result = resource({
request: request,
stream: async (params) => {
const counter = params.request.startValue;
[…]
}
});
return result;
}
Diese Factory nimmt das gewünschte...