Entwurf und Implementierung reaktiver Systeme mit Quarkus – Teil 4
Entwurf und Implementierung reaktiver Systeme mit Quarkus – Teil 4
Im vierten und letzten Teil unserer Reihe wollen wir mehrere reaktive Applikationen nachrichtenbasiert über Apache Kafka miteinander kommunizieren lassen. Spannend ist dabei, welche Werkzeuge uns Quarkus an die Hand gibt, um eine möglichst einfache und nahtlose Integration zu ermöglichen.
Im vorigen Artikel haben wir einen Mitarbeiterregistrierungsdienst implementiert, über den Abteilungsverantwortliche neu eingestellte Mitarbeiter:innen registrieren oder deren Austritt aus dem Unternehmen signalisieren können. Diese Ereignisse publizieren wir nun in Form von Events, sodass auch andere Systeme mit diesen Daten arbeiten können. In unserer Beispielarchitektur ist das ein Indexierungsdienst für eine Solr-gestützte Mitarbeitersuchmaschine. Dieser Dienst abonniert die Events, die der Registrierungsdienst emittiert, und aktualisiert den Index der Suchmaschine (Abb. 1). Beide Dienste müssen sich auf ein gemeinsames Datenformat verständigen. Um das Beispiel einfach zu halten, nutzen wir JSON als Serialisierungsformat. Den vollständigen Code zu unserer Beispielanwendung finden Sie auf GitHub [1].
Die Events bilden wir über Java POJOs ab, die mit Jackson-Annotationen behaftet sind. Die Basisklasse für unsere Events, EmployeeEvent, zeigt Listing 1. Ein EmployeeEvent kapselt allgemeingültige Attribute, wie etwa einen eindeutigen Event Identifier (vgl. eventId), und die Event-Zeit. Letztere markiert den Verarbeitungszeitpunkt der Anfrage an den Registrierungsdienst.
Listing 1: Basistyp EmployeeEvent
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = EmployeeCreatedEvent.class, name = "employee-created-event"),
@JsonSubTypes.Type(value = EmployeeDeletedEvent.class, name = "employee-deleted-event")
})
abstract public class EmployeeEvent {
private final String eventId;
private final long eventTime;
public EmployeeEvent() {
this(generateEventId(), now());
}
@JsonCreator
public EmployeeEvent(@JsonProperty("eventId") final String eventId,
@JsonProperty("eventTime") final long eventTime) {
this.eventId = eventId;
this.eventTime = eventTime;
}
public String getEventId() {
return eventId;
}
public long getEventTime() {
return eventTime;
}
private static String generateEventId() {
return UUID.randomUUID().toString();
}
private static long now() {
return Instant.now(Clock.systemUTC()).toEpochMilli();
}
}
Der Registrierungsservice emittiert ein Event, wenn eine neue Mitarbeiterin oder ein neuer Mitarbeiter registriert wurde oder das Unternehmen verlassen hat. Die Abbildungen dieser Events zeigen Listing 2 und 3. Auf die Darstellung der Getter verzichten wir aus Platzgründen.
Listing 2: Event zur Neuregistratur einer Mitarbeiterin/eines Mitarbeiters
public class EmployeeCreatedEvent extends EmployeeEvent {
private final String employeeId;
private final String givenName;
private final String lastName;
private final String email;
private final String departmentName;
private final String departmentDescription;
private final String company;
@JsonCreator
public EmployeeCreatedEvent(@JsonProperty("employeeId") String employeeId,
@JsonProperty("givenName") String givenName,
@JsonProperty("lastName") String lastName,
@JsonProperty("email") String email,
@JsonProperty("departmentName") String departmentName,
@JsonProperty("departmentDescription") String departmentDescription,
@JsonProperty("company") String company) {
this.employeeId = employeeId;
this.givenName = givenName;
this.lastName = lastName;
this.email = email;
this.departmentName = departmentName;
this.departmentDescription = departmentDescription;
this.company = company;
}
}
Listing 3: Event zum Austritt einer Mitarbeiterin/eines Mitarbeiters
public class EmployeeDeletedEvent extends EmployeeEvent {
private final String employeeId;
@JsonCreator
public EmployeeDeletedEvent(@JsonProperty("employeeId") String employeeId) {
this.employeeId = employeeId;
}
}
Quarkus arbeitet mit diversen Standards zusammen, wie beispielsweise JMS. Nun ist das API allerdings schon etwas in die Jahre gekommen; die Welt der Messaging-Systeme hat sich weiterentwickelt und neue Konzepte hervorgebracht, die kaum oder gar nicht mit JMS abzubilden sind. Aus diesem Grund lösen wir uns von den klassischen Integrationslösungen und wagen uns zu modernen Ansätzen vor, die die Stärken aktueller Messaging-Systeme ausspielen.
Einen solchen Ansatz findet man im MicroProfile Reactive Messaging der Eclipse Foundation [2]. Quarkus implementiert Version 2.x dieser Spezifikation über das Modul quarkus-smallrye-reactive-messaging. Durch Reactive Messaging kann unsere Anwendung Nachrichten empfangen, verarbeiten und versenden – und zwar protokollagnostisch. Die Umsetzung der Spezifikation in Quarkus integriert sich dabei nahtlos in das bekannte Programmiermodell: Man arbeitet mit einem annotationsgestützten Mechanismus, um Konzepte des Reactive Messaging abzubilden.
Mit Reactive Messaging tauschen sich unsere Anwendungen über nachrichtenorientierte Kanäle (Channels) aus (Abb. 2). Eine Nachricht – oder Message – bildet Reactive Messaging über den statischen Typ org.eclipse.microprofile.reactive.messaging.Message<T> ab. Eine Message ist ein Wrapper um Nutzdaten des parametrierten Typs T. Zusätzlich zu diesen Nutzdaten kann eine Message Metadaten beinhalten und bietet Methoden an, um den Empfang einer Nachricht zu bestätigen oder auf Fehler zu reagieren.
Reactive Messaging kann Channels entweder in-Memory realisieren oder über externe Ressourcen anbinden. In letzterem Fall sorgt ein Konnektor für die technische Abbildung auf das unterliegende Messaging-System. Quarkus kümmert sich um die Integration und konstruiert die zugehörigen Datenströme, entlang derer Messages fließen können.
Reactive Messaging bietet zwei Vorgehensweisen an, um Messages zu erzeugen. Zunächst betrachten wir Emitter. Ein Emitter ist ein Objekt, das an einen Channel gekoppelt ist und Nachrichten in diesen Channel publizieren kann. Innerhalb von Quarkus kann man entweder mit dem statischen Typ Emitter<T> arbeiten, wobei T den Typ der Nutzdaten darstellt, oder mit MutinyEmitter<T>. Beide Varianten bieten ein imperatives API an, allerdings wartet MutinyEmitter mit zusätzlichen Methoden auf. Zugriff auf einen Emitter erhält man über die @Channel-Annotation. Quarkus betrachtet ein mit dieser Annotation beschriebenes Attribut der Bean oder Argument des Konstruktors als Injektionspunkt, sodass eine zusätzliche Auszeichnung über @Inject nicht erforderlich ist (Listing 4).
Listing 4: Publizieren von Nutzdaten über MutinyEmitter
@ApplicationScoped
public ...