Überall ist es zu lesen und zu hören, das Thema Reactive. Meistens im Zusammenhang mit einem Framework, das dem Entwickler viele und mächtige Möglichkeiten an die Hand gibt. Es sorgt allerdings auch dafür, dass er eine weitere Abhängigkeit in seinem Projekt vorfindet. Muss das sein? Nein, hier kann man sehr oft schon mit Core Java zum Ziel kommen, ohne Unmengen an Quelltext zu erzeugen.
Vereinfacht lässt sich sagen, dass es sich hierbei um alle möglichen Ausprägungen des Observer-Patterns handelt. Genau an dieser Stelle werden viele Leser aufschreien, die sich schon damit beschäftigt haben. Jedoch sehen wir uns das erst einmal ganz einfach an. Das Grundprinzip ist, dass der Aufrufer nicht solange wartet, bis das Ergebnis berechnet worden ist. Es wird die Aufgabe formuliert und der nächsten Stufe übergeben. Wenn dies geschehen ist, wird sich diese schon zurückmelden. Im ersten Schritt bedeutet das, dass ich nicht selbst den Aufruf starte, sondern er gestartet wird. Schon sind wir beim Observer-Pattern. Die verarbeitende Einheit meldet sich beim Produzenten von Informationseinheiten an. Ist eine Informationseinheit bereit, verarbeitet zu werden, werden die Listener explizit mit diesem Wert aufgerufen. Der jeweilige Listener wiederum verarbeitet sein eigenes Ergebnis auf die gleiche Art und Weise und liefert es dann auch aus. Schon haben wir eine beliebig lange Kette an Operatoren, die aufeinander aufbauend ein Ergebnis berechnen.
Listing 1: Wrapper um eine „Map<KEY, Consumer<VALUE>>“
java
public class Observable<KEY, VALUE> {
private final Map<KEY, Consumer<VALUE>> listeners = new ConcurrentHashMap<>();
public void register(KEY key , Consumer<VALUE> listener) {
listeners.put(key , listener);
}
public void unregister(KEY key) { listeners.remove(key);}
public void sendEvent(VALUE event) {
listeners.values().forEach(c -> c.accept(event));
}
}
Implementieren wir als Erstes eine einfache Version des Observer-Patterns. Dahinter verbirgt sich nichts anderes als ein Wrapper um eine Map<KEY, Consumer<VALUE>> (Listing 1). Die Map selbst wird mittels ConcurrentHashMap realisiert, damit wir zumindest hier keine Nebenläufigkeitsprobleme bekommen. Können wir doch nicht vorhersehen, wie viele unterschiedliche Threads zu welchem Zeitpunkt Ereignisse erzeugen werden. Ob das reichen wird? Wir kommen später noch dazu. Unter einem Schlüssel kann sich ein Verbraucher registrieren und auch selbst wieder entfernen, sobald er keine weiteren Events mehr empfangen möchte. Wird ein Event erzeugt und dem Observer übergeben, wird dieses Event an alle registrierten Verbraucher übergeben. Was der Verbraucher damit anstellt, ist ihm selbst überlassen. Hier zeigt sich aber auch das erste Problem. Ein Event, also die Daten oder besser gesagt die Datenstruktur, wird jedem Verbraucher übergeben. Das bedeutet, dieselbe Instanz wird jedem Verbraucher übergeben. Es wird einfach angenommen, dass der jeweilige Verbraucher den Inhalt des Ereignisses nicht verändert. Eine sehr gewagte Anforderung. Kommen wir zu der Implementierung selbst. Diese ist im Moment noch einfach. Die Verwendung unseres Verbrauchers sieht so aus:
java
public static void main(String[] args) {
final Observable<String, String> observable = new Observable<>();
observable.register("key1", System.out::println);
observable.register("key2", System.out::println);
observable.sendEvent("Hello World");
observable.unregister("key1");
observable.sendEvent("Hello World again");
}
Die dazugehörige Ausgabe ist:
Hello World
Hello World
Hello World again
Listing 2: Observer statisch zur Verfügung stellen
java
public class Registry {
private static final Observable<String, String> observable = new Observable<>();
public static void register(String key, Consumer<String> consumer){
observable.register(key, consumer);
}
public static void unregister(String key){
observable.unregister(key);
}
public static void sendEvent(String input){
observable.sendEvent(input);
}
}
Selbst diese einfache Implementierung führt allerdings immer wieder dazu, dass Memory Leaks produziert werden. Ok, Memory Leaks und Java? Doch das geht! Kommen wir zu einem recht typischen Anwendungsfall. Oft werden solche Observer benutzt, um eine Art Eventservice aufzubauen. Dazu wird der Observer statisch zur Verfügung gestellt (Listing 2). Nun können von allen möglichen Stellen im Programm Listener registriert werden. Bei der Verwendung hat sich nicht viel geändert.
java
public static void main(String[] args) {
Registry.register("key1" , System.out::println);
Registry.register("key2" , System.out::println);
Registry.sendEvent("Hello World");
Registry.unregister("key1");
Registry.sendEvent("Hello World again");
}
Leider passiert es immer wieder, dass in einem Programm Verbraucher registriert werden, die leider nie wieder entfernt werden. Das kann einerseits dazu führen, dass der Observer selbst einfach überlaufen wird, andererseits kann es auch verhindern, dass bestimmte Teile der Anwendung von dem Garbage Collector erfasst werden können. Nehmen wir als Beispiel eine Webanwendung, die in dem Consumer eine grafische Einheit, z. B. eine Instanz von einem Label, verwendet. Wird die Session geschlossen, hängt diese Einheit leider immer noch im Consumer und damit im Observer. Solange dieser nicht aufräumt oder selbst dem Garbage Collector zum Opfer fallen kann oder darf, kann es sein, dass alle indirekt enthaltenen Daten über das Label gehalten werden. Was fehlt, ist ein komfortabler Weg für den Entwickler, dies automatisch erledigen zu lassen.
Listing 3: Interface „Registration“ definieren
java
public static interface Registration {
public void remove();
}
public class Observable<KEY, VALUE> {
private final Map<KEY, Consumer<VALUE>> listeners = new ConcurrentHashMap<>();
public Registration register(KEY key , Consumer<VALUE> listener) {
listeners.put(key , listener);
return () -> listeners.remove(key);
}
public void sendEvent(VALUE event) {
listeners.values().forEach(c -> c.accept(event));
}
}
Ein Lösungsansatz besteht darin, dass die Elemente, die mit dem Observer interagieren, über einen Lebenszyklus verfügen. Damit meine ich nicht die Methode finalize. Diese sollte so lange wie möglich nicht verwendet werden. Als Beispiel benutzen wir wieder die Webanwendung. Wenn die Komponenten beispielsweise über attach() und detach() verfügen, kann man das verwenden, um sich wieder vom Observer zu lösen. Dazu definieren wir ein Interface Registration, das als Rückgabewert der registrierenden Methode verwendet wird (Listing 3). Hier wird das Lösen der Verbindung hinterlegt. Wie auch schon beim Observer, gibt es keine Methode mehr, um einen Eintrag explizit zu entfernen.
java
public class Registry {
private static final Observable<String, String> observable = new Observable...