Lessons Learned

RabbitMQ und Drupal im Einsatz
Kommentare

Im Unternehmen sind Content-Management-Systeme wie Drupal selten allein – CRM, Shopsysteme oder andere Anwendungen gehören zum Alltag einfach dazu. Doch wie bringt man die Anwendungen dazu, reibungslos miteinander zu agieren?

In heutigen Unternehmen sind die Arbeitsprozesse und daran beteiligte Daten fast immer auf unterschiedliche Anwendungen verteilt. In der täglichen Arbeit hat praktisch jeder mit verschiedenen Systemen zu tun: Es werden CMS, CRM, Warenwirtschaftssysteme, Dokumenten-Management-Systeme und viele weitere eingesetzt. Jedes hat seine Daseinsberechtigung, aber dennoch wäre es meist schöner, wenn die einzelnen Anwendungen voneinander Bescheid wüssten und damit besser zusammenarbeiten könnten. Dann könnte man sich eine Menge Arbeit sparen – manuelles Synchronisieren von Daten oder das händische Verarbeiten von Bestellungsprozessen in mehreren involvierten Systemen würden dann der Vergangenheit angehören. Ein Message Broker wie RabbitMQ kann für genau solche Anwendungsszenarien sehr gut eingesetzt werden. Er ermöglicht die Interaktion zwischen verschiedenen IT-Systemen, ganz egal, ob es sich dabei um eine Java-EE-Lösung, eine Ruby-on-Rails-Website oder das CMS Drupal handelt. Zur Kommunikation untereinander werden Nachrichten versandt, vom Message Broker an beliebige Nachbarsysteme weitergeleitet und dort empfangen und verarbeitet. Die Besonderheit dabei ist die Art und Weise, wie der Versand und die Verteilung der Nachrichten beim so genannten Routing geschieht: Nachrichten werden in Warteschlangen (Queues) gehalten und ggf. später zugestellt, falls eine Anwendung offline ist. Außerdem bestimmt nicht der Sender den Empfänger – sondern der Broker anhand von vorher definierten Regeln. Damit entsteht ein verlässlicher und flexibler Kommunikationskanal für die Anwendungen. Darüber hinaus können Message Broker auch gezielt eingesetzt werden, um hoch skalierbare Dienste zu realisieren oder Lastspitzen abzufedern: Nachrichten werden dann gleichmäßig auf mehrere konsumierende Anwendungen verteilt, sodass ein Load Balancing stattfindet. Gleichzeitig erlauben es die Queues den Anwendungen, die Verarbeitungsgeschwindigkeit selbst zu definieren. So kann eine Anwendung stabil weiterlaufen, während der Message Broker den Sturm einkommender Anfragen puffert.

Nachrichten im Fluss

Nachrichten sind das zentrale Element des Message Brokers – sie bestehen aus einem Kopf (Header) mit Schlüssel-Wert-Paaren und einem beliebigem Inhalt in einem zumeist technologieneutralen Format wie XML oder JSON. Der Header enthält Metadaten wie den Content-Type application/xml und weitere Felder, die das Routing beeinflussen. Das ist bei RabbitMQ bzw. dem zugrunde liegenden Industriestandard AMQP einmal der Exchange sowie der Routing-Key. Ersteren kann man sich als eine Vermittlungsstelle vorstellen, der Zweiten ist eine speziell formatierte Zeichenkette, die zum Filtern von Nachrichten verwendet werden kann. Ein Beispiel: Ein Mitarbeiter ändert die Verfügbarkeit eines Produkts in einem zentralen Warenwirtschaftssystem – davon sollen alle angebundenen Shopsysteme in Kenntnis gesetzt werden. Also wird automatisiert eine Nachricht mit einem Routing Key wie product.update.lagerX.kategorieY an den Exchange Warenwirtschaft versandt. Dieser Exchange leitet die Nachricht nun an die entsprechenden Queues weiter (Abb. 1) – Details dazu finden Sie im Kasten „Routing im Detail“.

Abb. 1: Nachrichten der Warenwirtschaft werden vom Message Broker weitergeleitet

Abb. 1: Nachrichten der Warenwirtschaft werden vom Message Broker weitergeleitet

Damit wird jedem Shopsystem eine eigene Kopie der Nachricht zugestellt. Dort wird die Nachricht empfangen und erst aus der Queue des Brokers entfernt, wenn sie erfolgreich verarbeitet wurde. Diese Architektur beschert uns einige positive Eigenschaften: Kommt ein weiteres Shopsystem in Zukunft hinzu, so ist es ausreichend, das Routing anzupassen – die Warenwirtschaft an sich bleibt unangetastet. Außerdem kann jederzeit ein Shopsystem zur Wartung abgeschaltet werden. Die Nachrichten verbleiben dann einfach im Broker und werden später zugestellt. Die Message-Broker-Infrastruktur garantiert uns also, dass die Nachrichten zugestellt werden – wenn auch nicht unbedingt sofort.

Routing im Detail Nachrichten werden an einen Exchange versendet, aber von Queues empfangen. Die Verbindung zwischen beiden kommt durch Binding-Regeln zustande, die eine Queue an eine bestimmte Exchange binden. Dies geschieht abhängig vom Typ der Exchange: • Fanout: Jede Nachricht wird an jede angebundene Queue weitergeleitet (Routing Key wird ignoriert) • Direct: Nachrichten werden anhand des Routing Keys gefiltert, es erfolgt ein einfacher String-Vergleich (pro Routing Key sind mehrere Queues möglich) • Topic: Nachrichten werden wie bei Direct gefiltert, jedoch ist ein komplexeres Pattern Matching möglich: Wörter werden durch einen Punkt getrennt, das Raute-Zeichen steht für ein beliebiges Wort, der Asterisk-Stern für Null oder mehr Wörter, Beispiel: produkt.*.lagerX.# • Headers: Filtern der Nachrichten nach beliebigen Schlüssel-Wert-Paaren aus dem Nachrichtenkopf Ausführlichere Beschreibungen der AMQP-Konzepte sind auf der Seite von RabbitMQ zu finden.

Entwurf asynchroner Anwendungen

Es ist hilfreich, sich den Nachrichtenaustausch als eine Art der eventbasierte Programmierung vorzustellen – nur zusätzlich über Rechnergrenzen hinweg. Dieses Architekturparadigma wird tatsächlich immer beliebter, da es automatisch zur Entkopplung einzelner Komponenten führt. Eines der besten Beispiele dafür ist das Symfony2 Framework, das aus vielen kleinen Teilen besteht, die untereinander z. B. mit Events kommunizieren. Die konkrete Programmierung mit einem Message Broker unterscheidet sich stark von der Arbeit mit klassischen Web Services. Anfragen werden nicht synchron sondern immer asynchron ausgeführt. Das kann Konsequenzen bis hinauf zur Weboberfläche haben, da man nicht auf die Antwort eines Ereignisses warten kann. Vielleicht ist das zu kontaktierende System ja gerade überlastet oder wird gewartet? In der synchronen Welt heißt das dann meist, dass der Nutzer eine Fehlermeldung zu Gesicht bekommt. Bei zunehmender Anzahl an kommunizierenden Systemen wird die Wahrscheinlichkeit eines Fehlers außerdem immer größer, denn das Blackout eines Systems kann durch die direkte Kopplung der synchronen Aufrufe alle anderen beeinflussen. In einer asynchronen Nachrichtenwelt wird man gezwungen, diese Fälle von Anfang an zu bedenken: Die Anwendung wird auch auf die unschönen Situationen vorbereitet und kann dann immer noch problemlos weiter arbeiten – und insbesondere beeinflusst die Störung einer einzigen Anwendung nicht auch die anderen. Oftmals ist es dann sinnvoll, die Oberfläche für die Nutzer so anzupassen, dass diese nicht immer mit einer sofortigen Antwort rechnen. Im Zweifelsfall ist dann die Benachrichtigung des Nutzers per E-Mail ein zuverlässigerer Weg als die Anzeige im Webbrowser.

Robuste Nachrichtenempfänger

Das Routing des Brokers und die Nutzung der Queues hat Auswirkungen auf die Verarbeitung der Nachrichten in der empfangenden Anwendung: Nachrichten können sich überholen und sogar mehrfach eintreffen. Außerdem muss man immer damit rechnen, dass andere Anwendungen ungültige Nachrichten produzieren, die das eigene System nicht verarbeiten kann. Das gilt insbesondere dann, wenn der Broker nicht nur intern, sondern auch übergreifend zur Enterprise-Integration eingesetzt wird. Abhängig von der Art der Anwendung können, aber müssen nicht, alle hier beschriebenen Probleme auftreten. Die betroffenen Systeme können diese Schwierigkeiten aber leicht wegstecken, wenn sie ein paar einfache Regeln beachten:

  1. Nachrichten idempotent verarbeiten, das heißt: Wurde ein Ereignis schon mal verarbeitet, keinen Fehler auslösen, sondern die Nachricht stillschweigend akzeptieren – analog zu einem HTTP PUT- oder DELETE-Aufruf.
  2. Nachrichten ignorieren, wenn sie veraltet sind.
  3. Nachrichten ignorieren, wenn sie ungültig sind.
  4. „Let it crash“: Verarbeitung bei schweren Fehlern abbrechen und Prozess eher neustarten als versuchen fortzufahren.

Regel Nummer eins löst das Problem der doppelten Nachrichten bzw. sorgt dafür, dass die doppelte Behandlung einer Nachricht keine weiteren Effekte hat. Wurde beispielsweise im Rahmen einer Synchronisierung ein Datensatz bereits gelöscht, so dürfte die zweite Aufforderung zum Löschen nicht zum Fehler führen. Nachrichtenduplikate lassen sich teilweise auch durch die Verwendung von Transaktionen verhindern: RabbitMQ sichert dann zu, dass eine neue Nachricht nur dann produziert wird, wenn die aktuell empfangende vollständig verarbeitet wurde. Leider bleiben damit aber alle anderen Systeme – wie eine MySQL-Datenbank – außer Acht, denn sie haben eigene Transaktionen, die nicht mit denen des Brokers koordiniert werden. Gesamtheitliche Lösungen existieren zwar (außerhalb der PHP-Welt), doch sind sie wesentlich unperformanter, da sie auf einem aufwendigen Transaktionsprotokoll aufsetzen und mit Timeouts arbeiten. Die idempotente Implementierung ist da eine einfachere und elegante Lösung. Dass veraltete Nachrichten keine neuen Daten überschreiben, ist Aufgabe der zweiten Regel. Dazu ist es notwendig, auf Anwendungsebene ein eigenes Kriterium zu finden, mit dem man die Reihenfolge der Nachrichten bestimmen kann. Im einfachsten Fall ist das ein Timestamp der möglichst gut synchronisierten Serverzeit. Die konsumierende Anwendung speichert irgendwo persistent dieses Kriterium und vergleicht es vor dem Verarbeiten einer Nachricht: Ist bereits eine neuere Version vorhanden, wird die Nachricht ignoriert. Ein typischer Anwendungsfall ist das Abgleichen von Nutzerdaten über mehrere Systeme hinweg – hier könnte man pro Nutzer das Datum der letzten Nachricht bzw. Änderung speichern. Regel Nummer drei stellt sicher, dass die Queues nicht mit ungültigen Nachrichten volllaufen, die immer und immer wieder zugestellt werden, weil die Verarbeitung fehlschlägt. Hier ist der Entwickler gefragt – er muss den Nachrichteninhalt noch vor der Verarbeitung genau überprüfen. Denn ist bereits ein Fehler aufgetreten, ist meist nur noch schwer erkennbar, ob es an der Nachricht oder an einem temporären Fehler in der Anwendung lag. Bei Letzterem sollte die Nachrichtenverarbeitung nicht bestätigt werden, damit die Nachricht später automatisch ein weiteres Mal zugestellt wird. Die Anwendung erhält dann eine weitere Chance, die Nachricht zu verarbeiten. Die Quelle – Nachricht oder Anwendung – eines Fehlers zu finden, ist also sehr wichtig, da sich der Umgang mit der entsprechenden Nachricht je nach Fehlerquelle grundlegend voneinander unterscheidet. Die letzte Regel – „Let it crash“ – ist wichtig, da der Empfang der Nachrichten nicht als Teil des normalen HTTP-Requests geschehen kann. Denn für die zeitnahe Verarbeitung von Nachrichten ist ein langlaufender Prozess notwendig, der ständig mit RabbitMQ in Kontakt steht. Das ist in PHP inzwischen kein Problem mehr, da es seit PHP 5.3 auch eine funktionierende Garbage Collection gibt. So wächst der Speicherverbrauch im Laufe der Zeit nicht ins Unermessliche bzw. an die Grenzen des memory_limit. Dennoch sollte man immer mit fatalen Fehlern rechnen, die nicht aufzulösen sind und den Prozess aus dem Takt bringen. Daher ist es oft die beste und vor allem einfachere Strategie, den Prozess zu überwachen und ggf. automatisch neuzustarten, als zu viel Aufwand in eine komplexe Ausnahmebehandlung zu stecken. Eine einfache, aber sonst eher ungewöhnliche Fehlersituation für PHP-Webframeworks ist z. B. der Timeout der MySQL-Verbindung aufgrund von längerer Inaktivität. Was bei Sekunden oder besser Millisekunden andauernden Requests im Apache unmöglich erscheint, kann jetzt nachts schon mal auftreten und dann den Prozess in die Knie zwingen.

RabbitMQ mit Drupal verwenden

Damit die Konfiguration und Entwicklung mit RabbitMQ einfacher und angenehmer wird, hat die sirup° Agentur für Neue Medien das Drupal-Modul „Message Broker“ entwickelt: Dem Programmierer wird eine einfache Schnittstelle zur Verfügung gestellt, mit der Nachrichten gesendet und empfangen werden können. Derzeit existieren eine AMQP- und eine Simulations-Implementierung; im Prinzip wäre es aber auch möglich, andere Protokolle wie STOMP anzubinden, da das API generisch aufgebaut ist. Das Versenden einer Nachricht wird exemplarisch in Listing 1 dargestellt.

// Implementierung des MessageBrokerInterface holen
$broker = message_broker_get();
$message_body = json_encode(array('foo' => 'bar'));

// Nachricht an Exchange Warenwirtschaft senden ...
$broker->sendMessage($message_body, 'Warenwirtschaft', array(
  'routing_key' => 'product.update.lagerX.kategorieY', 
  'content_type' => 'application/json'));

Eine Kernidee des Moduls ist die deklarative Beschreibung des Routings in einer zentralen JSON-Datei (Listing 2), die für alle beteiligten Systeme erreichbar ist. Damit ist sichergestellt, dass alle Queues, Exchanges und Bindings rechtzeitig im Broker angelegt werden und zusätzlich auch noch konsistent bleiben. Letzteres ist sonst praktisch unmöglich, da Systeme im Betrieb nicht gleichzeitig aktualisiert werden können. Das führt dazu, dass verschiedene Konsumenten unterschiedliche Routing-Topologien erwarten und auch versuchen zu definieren, was dann unweigerlich zu einer Exception führt.

{
  "exchanges": [{
    "name": "Warenwirtschaft",
    "type": "topic",
    "durable": true,
    "auto_delete": false,
  }],
  "queues": [{
    "name": "Shopsystem1",
    "durable": true,
    "exclusive": true,
    "auto_delete": false,
    "bindings": [{
      "exchange": "Warenwirtschaft",
      "routing_key": "produkt.*.lagerX.#"
    }]
  }]
}

Der Nachrichtenempfang wird Drupal-typisch per Hook vorbereitet und stützt sich dabei auf vorher beschriebene Queues aus der JSON-Datei. Wie in Listing 3 ersichtlich, definiert man für jede abzurufende Queue einen Callback, der dann pro Nachricht einmal aufgerufen wird. Zu beachten ist, dass der übergebene Nachrichteninhalt erst noch dekodiert werden muss, z. B. über json_decode. Der zweite Parameter ist eine Funktion, die aufgerufen wird, um die vollständige Verarbeitung der Nachricht zu signalisieren.

// Implementierung des hook_message_broker_consumers
function mymodule_message_broker_consumers($self_name) {
  $consumers = array();

  // Consumer für Queue Shopsystem1 registrieren
  $consumers['shop-product-updates'] = array(
    'queue' => 'Shopsystem1', 
    'callback' => 'mymodule_handle_product_updates');

  return $consumers;
}

// Hier findet die Nachrichtenverarbeitung statt
function mymodule_handle_product_updates($message_body, $ack) {
  $messageData = json_decode($message_body);
  // Nachricht validieren, ggf. InvalidMessageException werfen
  // Nachricht dann verarbeiten und am Ende bestätigen:
  $ack();
}

Weiterhin stellt das Modul so viel Unterstützung wie möglich zur Verfügung, um die Realisierung der obigen Regeln zu vereinfachen. Für die zweite Regel existiert beispielsweise ein eigener Exception-Typ namens InvalidMessageException. Eine solche Ausnahme loggt die ungültigen Werte, bestätigt aber dennoch die Verarbeitung der Nachricht gegenüber RabbitMQ und macht dann mit der nächsten weiter. Außerdem kann ein fataler Fehler über die spezielle CriticalErrorException angezeigt werden. Im Gegensatz zu allen anderen Ausnahmen wird damit nicht nur die Behandlung der aktuellen Nachricht abgebrochen, sondern der komplette Prozess heruntergefahren. Das ermöglicht zusammen mit einem externen Überwachungsprozess das gezielte Neustarten der kompletten Verarbeitung – sehr nützlich bei oben angesprochenem MySQL Timeout.

Lokales Testen und Entwickeln

Das Entwickeln von verteilten Systemen gestaltet sich immer etwas schwieriger als die Umsetzung von einfachen lokalen Anwendungen. Beim Einsatz eines Message Brokers ist das nicht viel anders: RabbitMQ sollte lokal installiert sein, damit man die eigene Software überhaupt ausführen und testen kann. Die sirup° Agentur für Neue Medien hat darum eine Dummy-Implementierung des API entwickelt, mit der der Nachrichtenaustausch nach AMQP-Standard auch ohne einen eigenen Message Broker simuliert werden kann. Dabei wird der vollständige Nachrichtenfluss synchron im aktuellen Drupal Request ausgeführt, sodass man mit dem gewohnten Komfort entwickeln und testen kann: Das Zusammenspiel der einzelnen Anwendungsteile kann so viel einfacher getestet und debuggt werden. Dieser Ansatz hat natürlich seine Grenzen: Es werden nur die Nachrichten verarbeitet, die an dasselbe Drupal-System gesendet werden. Zudem ist die Ausführung synchron, was die Reihenfolge der Nachrichtenverarbeitung beeinflussen kann – aber das macht robusten Anwendungen sowieso nichts aus (siehe oben). Die erste Einschränkung umgeht man in der Praxis mit einfachen Mocks (Atrappen) der externen Systeme – für Integrationstests sowieso unentbehrlich. Die Integration dieser gemockten Systeme gestaltet sich wie folgt: Für jedes externe System wird ein eigenes Drupal-Modul entwickelt, was sich in die Simulationsumgebung über die Hooks (Listing 3) einklinkt. So kann dann doch das komplette System lokal entwickelt und getestet werden.  Das vermeidet das aufwendige Aufsetzen von komplexen Entwicklungs- und Testumgebungen und hilft nebenbei auch noch, die Performance des Entwicklungsrechners zu verbessern.

Lang lebe Drupal

Um die Nachrichtenverarbeitung zeitnah zu ermöglichen, ist ein langlaufender PHP-Prozess notwendig. Im Kontext von Drupal ist das Kommandozeilentool Drush die perfekte Lösung, da es sich um das Bootstrapping des CMS und die Integration in die Konsolenwelt kümmert. Das Message-Broker-Modul klinkt sich dort ein und kann über den Befehl drush consume-amqp-messages gestartet werden. Es verbindet sich dann mit RabbitMQ und verarbeitet Nachrichten bis in alle Ewigkeit. Es ist auch möglich, (vorher per Hook definierte) Consumer einzeln oder mehrfach zu starten und dabei weitere Parameter anzugeben, die das Zusammenspiel mit RabbitMQ optimieren. Beim Ausführen des Drupal-Codes wird man allerdings ab und zu bemerken, dass das CMS nicht unbedingt für eine solche Situation konzipiert wurde. Beispielsweise nutzt die watchdog-Funktion beim Logging eine einmal initialisierte Konstante für die aktuelle Uhrzeit. Alle Log-Einträge des Drush-Prozesses erscheinen daher nachher so, als wären sie zur Startzeit des Prozesses erfolgt. Das ist ärgerlich, aber es lohnt sich auch aus einem anderem Grund, das Logging von Drupal etwas anzupassen: Denn mit den Standardeinstellungen wird in die Datenbank geschrieben. Wenn dann die MySQL-Verbindung mal unerwartet geschlossen wird, erfährt man von nichts … Das Loggen in eine Datei ist daher um einiges zuverlässiger – z. B. über das syslog-Modul von Drupal. Eine weitere Eigenart einiger Methoden wie node_load ist die Verwendung eines internen Caches in Form von statischen Variablen. Das soll die erneute Ausführung von MySQL-Querys verhindern – ist hier aber gefährlich, wenn mehrere konkurrierende Drush-Prozesse eingesetzt werden. Es kann dann leicht passieren, dass ihr Code mit einem inzwischen veralteten Node-Objekt weiterarbeitet und diesen dann in die Datenbank (über)schreibt: Besser ist es, node_load und ähnliche Funktionen mit dem Parameter reset=TRUE aufzurufen, damit der Cache deaktiviert wird.

Legacy-Anwendungen

Nicht jede Software kann direkt mit einer Schnittstelle zum Message Broker ausgerüstet werden. Fremde Software oder Legacy-Anwendungen müssen über einen Messaging-Gateway angebunden werden. Dieser übersetzt die Welt der Nachrichten in die jeweilige Sprache der Anwendung. Beim Verarbeiten der Nachrichten wird also nur weiter delegiert, am besten an eine stabile Schnittstelle, wie sie in vielen Produkten zu finden ist.

Abb. 2: Vorgeschalteter Gateway setzt Nachrichten in API-Aufrufe um

Abb. 2: Vorgeschalteter Gateway setzt Nachrichten in API-Aufrufe um

Je nach Umfang und (Un)vollständigkeit der Schnittstelle kann dann auch der Gateway komplexer werden. Oftmals ist es notwendig, die Produkt-APIs regelmäßig abzufragen, um auf neue Ereignisse zu reagieren. Ein entsprechender Cronjob muss dann dafür sorgen, dass in einem Intervall von z. B. 10 Minuten neue Bestellungen abgefragt und die passenden Nachrichten abgeschickt werden. Zum Realisieren der zweiten Regel muss außerdem das Datum der letzten Änderung für alle Datensätze gesichert werden. Wenn die Schnittstelle das nicht hergibt, benötigt der Gateway selbst eine eigene Datenbank.

Fazit

Der Umstieg auf den asynchronen Nachrichtenversand bringt Veränderungen mit sich, die neue Herangehensweisen und Lösungen erfordern: Nicht nur beim Systementwurf, sondern auch bei der Implementierung ist ein Umdenken notwendig, damit sich am Ende die Vorteile des Message Brokers in der fertigen Software wiederfinden. Qualitätsanforderungen wie Ausfallsicherheit oder Skalierbarkeit werden dank des Message Brokers nun leicht umsetzbar. Die zugrunde liegende Denkweise der ereignisorientierten Programmierung hilft außerdem bei der Konstruktion einer geeigneten Architektur. Denn die Auftrennung in Nachrichtenproduzenten und -konsumenten zwingt zur Entkopplung und Aufsplittung in einzelne eigenständige Programmteile: die Basis für eine langfristig wartbare Software. Im Unternehmenskontext gewinnt man durch RabbitMQ eine flexible Vermittlungsstelle, die alle IT-Systeme im Unternehmen verbindet und damit die Wertschöpfungskette zusammenhält. Das zentral definierte Routing ermöglicht dabei das Verändern und Hinzufügen von Systemen. Die existierende Software muss dafür nicht angepasst werden, da allein der Message Broker den Fluss der Nachrichten steuert. Viele weitere Details und Patterns zum Thema findet der interessierte Leser im Buch „Enterprise Integration Patterns“.

Aufmacherbild: von istock / Uhrheberrech: latsun

Unsere Redaktion empfiehlt:

Relevante Beiträge

Meinungen zu diesem Beitrag

X
- Gib Deinen Standort ein -
- or -