Skalierung einer gewachsenen Drei-Schichten-Architektur

MacGyver Scaling

MacGyver Scaling

Skalierung einer gewachsenen Drei-Schichten-Architektur

MacGyver Scaling


Big Data gehört zu den heißen Eisen der IT und ist längst nicht mehr ausschließlich Thema für große Unternehmen. Wir sammeln immer mehr Daten, um immer komplexere Analyseabfragen fahren zu können. Archiviert oder gar gelöscht wird nur noch selten, da Speicherplatz und Rechenleistung nicht mehr als entscheidende Kostenfaktoren wahrgenommen werden. Ein wichtiger Aspekt im Umgang mit großen Datenmengen ist die Skalierbarkeit.

Neben den schier unbegrenzten Möglichkeiten werden jedoch die Schattenseiten schnell deutlich: Das Erfassen, Verwalten und Auswerten großer Nutzerzahlen und Datenmengen stellt hohe Anforderungen an Hardware und Software, insbesondere aber an die gewählte Lösungsarchitektur. Die Frage nach der Skalierbarkeit der unterliegenden IT-Systeme stellt eine der größten Herausforderungen von Big Data dar. Das ultimative Ziel: eine lineare Skalierbarkeit, die jederzeit eine Anpassung an die aktuelle Situation und Größe eines Unternehmens ermöglicht. Eine Anforderung, die für gewachsene Systeme auf Basis klassischer Datenbanken nur mit erheblichem Aufwand und schwierig einzuschätzendem Risiko nachträglich erreicht werden kann.

Mittlerweile existiert eine Vielzahl komplexer Produkte, die als Panacea aller Skalierungsprobleme angepriesen werden. Diese finden sich sowohl im kommerziellen als auch im Open-Source-Bereich oft unter dem Namen NoSQL. Dass man aber auch wie MacGuyver mit einem Schweizer Taschenmesser und einer Kleberolle bzw. in unserem Fall mit reinen Java-Bordmitteln und einer handelsüblichen SQL-Datenbank eine vorhandene Architektur zu einer Big-Data-fähigen Lösung erweitern kann, wird im Folgenden gezeigt.

Ausgangsarchitektur und gewählter Ansatz

Ausgangspunkt unserer Betrachtungen bildet ein gewachsenes Softwaresystem mit einer relationalen Persistenzschicht. Die konkrete Ausprägung (Technologieprojektion) der Präsentations- und Logikschicht spielt keine Rolle. Beispielhaft gehen wir von einem Mix aus Rich- und Webclients, eingesetzt wahlweise auf einem Java-Enterprise-Server oder z. B. einer Spring/Tomcat-Kombination, aus.

Als Blueprint für den Umbau der bestehenden Architektur dient ein Ansatz, der Sharding genannt wird. Dabei wird die Datenmenge über mehrere so genannte Shards (engl. (Glas-)Splitter) bzw. Partitionen aufgeteilt. Die jeweiligen Shards haben die gleiche Datenstruktur, beinhalten aber unterschiedliche Daten. Technisch gesehen ist jeder Shard unabhängig von den anderen, logisch gesehen bilden sie aber eine Einheit (Abb. 1) und der Zugriff darauf sollte sich aus Sicht der Applikation / Applikationsentwicklung möglichst transparent gestalten. Der Ansatz kommt vor allem in Szenarien zum Einsatz, bei denen mit sehr hohem Datenaufkommen bei gleichzeitig hohen Anforderungen an den erwarteten Datendurchsatz gerechnet wird.

louis_abb1.tif_fmt1.jpgAbb. 1: Aufteilung einer monolithischen Datenstruktur in Shards

Die Aufteilung selbst kann dabei anhand verschiedener Strategien erfolgen. Zu den wichtigsten Vertretern zählen hier Hash-basierte sowie Range- bzw. List-­basierte Verfahren.

Hash-basierte Aufteilung

Bei Hash-basierten Verfahren wird ein Hash (Streuwert) eines Datensatzes bzw. Datensatzattributs berechnet und jedem Shard eine Menge von Hash-Werten zugeordnet, für die er zuständig ist. Verwendet man gute Hash-Funktionen, sind diese Werte schnell zu berechnen, ordnen jedem Datensatz eindeutig einen Shard zu und verteilen die Datensätze gleichmäßig auf den zur Verfügung stehenden „Shardpool“.

Range- und List-basierte Aufteilung

Ein typischer Kandidat für eine Range-basierte Aufteilung ist die zeitraumbezogene Aufteilung. In Abbildung 2 ist sie exemplarisch dargestellt. Hier werden Bestellungen nach Jahren aufgeteilt und alle Bestellungen eines Jahrgangs werden eigens gehalten.

louis_abb2.tif_fmt1.jpgAbb. 2: Zeitraumbezogene Aufteilung der Datensätze

Eine mandantenbezogene Aufteilung ist ein Beispiel für eine List-basierte Aufteilung. Abbildung 3 veranschaulicht diese Strategie anhand der Buchungen eines oder mehrerer Mandanten, die getrennt von anderen Mandanten gehalten werden.

louis_abb3.tif_fmt1.jpgAbb. 3: Mandantenbezogene Aufteilung der Datensätze

In unserem Fall haben wir uns für den mandantenbezogenen Ansatz entschieden, da die Daten schon entsprechend geclustert waren.

Weiterhin war ein wesentliches Kriterium bei unseren Überlegungen die Skalierung der bestehenden Architektur mit möglichst minimalinvasiven Maßnahmen. Das Sharding findet daher zwischen Präsentations- und Logikschicht statt und die einzelnen Shards bestehen somit aus Kombinationen von Applikationsserver- und Datenbankinstanzen (Abb. 4).

louis_abb4.tif_fmt1.jpgAbb. 4: Sharding-Logik zwischen Präsentations- und Logikschicht

Für diesen Ansatz haben wir uns vor allem wegen folgender Aspekte entschieden:

  • Sowohl die zum Einsatz kommende Datenbank als auch der Applikationsserver benötigen keine Shard­ing-Fähigkeit. Somit lässt sich auch eine bestehende Architektur weiter verwenden bzw. zu der hier vorgestellten Architektur weiterentwickeln.

  • Damit kann sich nicht nur die Persistenzschicht sondern auch die Logikschicht auf ihre eigenen Daten konzentrieren und diese wesentlich effizienter/platzsparender cachen, weil der Cache nur einen Teil der Daten enthält und nicht synchronisiert werden muss.

  • Die Skalierbarkeit des Systems lässt sich auf einfache Weise durch das Erstellen weiterer Kopien des Datenbank-/Applikationsserver-Set-ups erreichen.

  • Da man sich zwischen der Präsentations- und Logikschicht im gewohnten Programmierkontext befindet, fällt es leicht, dort die Sharding-Logik anwendungsbezogen mit der gesamten Mächtigkeit von Java zu implementieren.

Der Ansatz ist flexibel genug, um dann nach und nach auf die identifizierten verbliebenen Schwachstellen reagieren zu können. Tabelle 1 zeigt die wichtigsten Pros und Kontras des Ansatzes noch einmal im Überblick.

PRO

KONTRA

Bestehende Architektur kann (inkl. Infrastruktur) erhalten bleiben

Annähernd lineare Skalierung

Massendaten besser handhabbar

Kein Totalausfall bei Verlust eines Knotens

Geografische Verteilung von Shards möglich

Compiletime-Sicherheit der Mapper und Reducer

Höhere Programmierkomplexität durch totale Unabhängigkeit der Shards

Keine Unterstützung von Replikation und Failover out of the box

(Aktuell noch) händische Rebalancierung notwendig

(Aktuell noch) Verhältnis von Logikzu Persistenzschicht 1:1

Tabelle 1: Ausgewählte Tradeoffs des gewählten Architekturansatzes

Umsetzung mittels „MultiServerProxy“

Für den Client verlief die Umstellung auf eine geshardete Serverlandschaft transparent, da alle Aufrufe über einen neu dazwischen geschalteten Proxy abgefangen werden und ausschließlich diesen Sharding-spezifischen Code enthält. Der so genannte MultiServerProxy (kurz MSP, siehe Listing 1) erfüllt folgende Aufgaben:

  • Bestimmen der oder des richtigen Shard-Nodes für aufgerufene Methoden und ihre Parameter (map)

  • Paralleles Ausführen der Remote-Aufrufe und Sammeln der Rückgabewerte

Zusammenführen der Rückgabewerte zu einem einzigen Rückgabewert, falls mehrere Nodes angesprochen wurden (reduce)

Listing 1

public class MultiServerProxy implements InvocationHandler {
  Class<?> interface, mapperClass, reducerClass;
 
  public Object invoke (final Object proxy, final Method method, final Object[] args) throws Throwable {
    // reflect mapper & reducer
    if (reducerClass == null) {
      interface = proxy.getClass().getInterfaces()[0]; // depends on architecture
      mapperClass = Class.forName(interface.getName() + "Mapper");
      reducerClass = Class.forName(interface.getName() + "Reducer");
    }
    
    // map
    AbstractMapper mapper = (AbstractMapper) mapperClass.newInstance();
    method.invoke(mapper, args); 
    List<String> shards = mapper.getMappedShards();
    
    // execute
    final Map<String, ?> results = new ConcurrentHashMap<String, Object>();
    final CountDown countdown = new CountDown(shards.size());
    for (final String shard : shards) {
      new Thread() {
        public void run() {
          Object service = getService(interface, shard); // depends on architecture
          try {
            Object result = method.invoke(service, args); 
            results.put(shard, result); 
          } catch (Exception e) {
            results.put(shard, e); 
          } finally {
            countdown.release();
          }
        }
      }.start();
    }
    countdown.acquire(); // wait for all
 
    // merge & return
    AbstractReducer = (AbstractReducer) reducerClass.newInstance();
    reducer.setResults(results); 
    Object result = method.invoke(reducer, args); 
    if (result instanceof Throwable) 
      throw (Throwable) result; 
    else
     return result; 
  }
}

Verwendung des Proxies am Beispiel „Kundenverwaltung“

Schauen wir uns nun die Schritte anhand des in Abbildung 5 ersichtlichen Beispiels einer Kundenverwaltung genauer an: Für beide Phasen (map und reduce) wird pro Interface der Logikschicht eine implementierende Klasse des Interface realisiert. Der MSP sucht diese Klassen anhand des Namens des Logikinterface, indem er per Konvention Klassen mit demselben Namen und dem Suffix Mapper bzw. Reducer versucht, zu instanziieren.

louis_abb5.tif_fmt1.jpgAbb. 5: Klassendiagramm des MSP inkl. eines Beispielinterface

Er ruft vor dem eigentlichen Aufruf der Shards zunächst die Methode des Mappers auf, beispielsweise findCustomer(100). Da der Mapper zunächst keine Ahnung hat, auf welchem Shard sich der Kunde mit der ID 100 befindet (Listing 2), selektiert er alle Shards. Der Rückgabewert der Methode wird ignoriert. Stattdessen holt sich der MSP mittels getMappedShards das Ergebnis des Mapping-Schritts, befragt jeweils in einem eigenen Thread die entsprechenden Shards und wartet, bis alle ein Ergebnis geliefert haben.

Listing 2

public class CustomerControllerMapper 
extends AbstractMapper implements CustomerController {
 
  static Map<Long, String> customerShards = new ConcurrentHashMap<Long, String>();
   
   public Customer findCustomer(long id) {
    String shard = customerShards.get(id);
    if (shard != null)
      mappedShards.add(shard);
    else
      mappedShards.addAll(ALL_SHARDS);
    return null; // will be ignored 
  }
 
  public List<Customer> findCustomers(String lastName) {
    mappedShards.addAll(ALL_SHARDS); 
    return null; // will be ignored 
  }
 
  public void saveCustomer(Customer c) {
    if (c.getId() == null) { // new customer
      c.setId(UUID.generate());
      int rand = Random.nextInt(ALL_SHARDS.size()); // depends on // sharding rule
      String shard = ALL_SHARDS.get(rand); 
      mappedShards.add(shard); 
      CustomerControllerMapper.customerShards.put(c.getId(), shard); 
    } else
      findCustomer(c.getId());
  }
  public void removeCustomer(long id) {
    findCustomer(id); 
  }
}

Die Ergebnisse stellt er als results Map dem Reducer zur Verfügung und ruft dann auch bei ihm findCustomer(100) auf (Listing 3). Dieser verwertet die Ergebnisse. Einer der Shards wird vermutlich den richtigen Kunden geliefert haben, die anderen null, weil sie den Kunden nicht kennen. Nachdem beim Mapper die ID des Kunden für immer auf die Adresse des Shards gemappt wird, wird dieser Kunde zurückgegeben und vom MSP danach dem Client als Ergebnis zurückgeliefert. Für diesen war der Aufruf also transparent.

Beim nächsten Aufruf weiß der CustomerControllerMapper schon, auf welchem Shard sich der Kunde mit der ID 100 befindet und braucht nicht mehr alle Shards zu befragen. Wie die anderen Methoden funktionieren, erfährt der geübte Java-Code-Leser ebenfalls aus Listing 2 und 3.

Listing 3

public class CustomerControllerReducer 
extends AbstractReducer implements CustomerController {
  
  public Customer findCustomer(long id) {
    for (MapEntry<String, Object> result : results) {
      if (result.getValue() instanceof Customer) {
        Customer c = (Customer) result.getValue();
        String shard = result.getKey();
        CustomerControllerMapper.customerShards.put(c.getId(), shard);
        return c;
      }
    }
    return null; // customer not found
  }
 
  public List<Customer> findCustomers(String lastName) {
    List<Customer> ret = new ArrayList<Customer>();
    for (MapEntry<String, Object> result : results) {
      List<Customer> part = (List<Customer>) result.getValue();
      String shard = result.getKey();
      for (Customer c : part) 
        CustomerControllerMapper.customerShards.put(c.getId(), shard); 
      ret.addAll(part); 
    }
    ret.sort(); // sort the same way the server did for all parts
    return ret; 
  }
}

Fazit

Wir haben einen Ansatz skizziert, mit dem man eine „typisch“ gewachsene Java-EE-Architektur zu einer skalierbaren Architektur weiterentwickeln kann, ohne neue Produkte einsetzen zu müssen. Damit entgeht man so manchen Risiken bei einer Umstellung. Der Ansatz ist aufgrund der Unabhängigkeit der Shards untereinander fast linear skalierbar. Einige Herausforderungen, die hier nicht beleuchtet wurden, sind allerdings zu bewältigen:

  • Hochverfügbarkeit, d. h. Failover und Recovery leistet der Ansatz nicht, hier setzen wir eine asynchrone Replikation der Datenbank ein.

  • Skripte zur Verwaltung vieler Shards sind zu erstellen, damit sie nicht einzeln administriert werden müssen.

Unter Umständen müssen foreign key constraints im RDBMS zugunsten des Shardings aufgegeben werden. Dies hätte allerdings auch jeder andere Ansatz zur Folge.

louis_thomas_sw.tif_fmt1.pngThomas Louis ist Gründer und seit elf Jahren Geschäftsführer der agido GmbH in Dortmund, spezialisiert auf mobile und webbasierte Applikationen sowie den Cloud-basierten Java-Enterprise-Bereich. Er beschäftigt sich dabei insbesondere mit Performance, Skalierbarkeit und Hochverfügbarkeit.

Thomas Louis

Thomas Louis ist Gründer und seit elf Jahren Geschäftsführer der agido GmbH in Dortmund, spezialisiert auf mobile und webbasierte Applikationen sowie den Cloud-basierten Java-Enterprise-Bereich. Er beschäftigt sich dabei insbesondere mit Performance, Skalierbarkeit und Hochverfügbarkeit.


Weitere Artikel zu diesem Thema