Big Data gehört zu den heißen Eisen der IT und ist längst nicht mehr ausschließlich Thema für große Unternehmen. Wir sammeln immer mehr Daten, um immer komplexere Analyseabfragen fahren zu können. Archiviert oder gar gelöscht wird nur noch selten, da Speicherplatz und Rechenleistung nicht mehr als entscheidende Kostenfaktoren wahrgenommen werden. Ein wichtiger Aspekt im Umgang mit großen Datenmengen ist die Skalierbarkeit.
Big Data gehört zu den heißen Eisen der IT, neben den schier unbegrenzten Möglichkeiten werden jedoch die Schattenseiten schnell deutlich: Das Erfassen, Verwalten und Auswerten großer Nutzerzahlen und Datenmengen stellt hohe Anforderungen an Hardware und Software, insbesondere aber an die gewählte Lösungsarchitektur. Die Frage nach der Skalierbarkeit der unterliegenden IT-Systeme stellt eine der größten Herausforderungen von Big Data dar. Das ultimative Ziel: eine lineare Skalierbarkeit, die jederzeit eine Anpassung an die aktuelle Situation und Größe eines Unternehmens ermöglicht. Eine Anforderung, die für gewachsene Systeme auf Basis klassischer Datenbanken nur mit erheblichem Aufwand und schwierig einzuschätzendem Risiko nachträglich erreicht werden kann.
Mittlerweile existiert eine Vielzahl komplexer Produkte, die als Panacea aller Skalierungsprobleme angepriesen werden. Diese finden sich sowohl im kommerziellen als auch im Open-Source-Bereich oft unter dem Namen NoSQL. Dass man aber auch wie MacGyver mit einem Schweizer Taschenmesser und einer Kleberolle bzw. in unserem Fall mit reinen Java-Bordmitteln und einer handelsüblichen SQL-Datenbank eine vorhandene Architektur zu einer Big-Data-fähigen Lösung erweitern kann, wird im Folgenden gezeigt.
Ausgangspunkt unserer Betrachtungen bildet ein gewachsenes Softwaresystem mit einer relationalen Persistenzschicht. Die konkrete Ausprägung (Technologieprojektion) der Präsentations- und Logikschicht spielt keine Rolle. Beispielhaft gehen wir von einem Mix aus Rich- und Webclients, eingesetzt wahlweise auf einem Java-Enterprise-Server oder z. B. einer Spring/Tomcat-Kombination, aus.
Als Blueprint für den Umbau der bestehenden Architektur dient ein Ansatz, der Sharding genannt wird. Dabei wird die Datenmenge über mehrere so genannte Shards (engl. (Glas-)Splitter) bzw. Partitionen aufgeteilt. Die jeweiligen Shards haben die gleiche Datenstruktur, beinhalten aber unterschiedliche Daten. Technisch gesehen ist jeder Shard unabhängig von den anderen, logisch gesehen bilden sie aber eine Einheit (Abb. 1) und der Zugriff darauf sollte sich aus Sicht der Applikation/Applikationsentwicklung möglichst transparent gestalten. Der Ansatz kommt vor allem in Szenarien zum Einsatz, bei denen mit sehr hohem Datenaufkommen bei gleichzeitig hohen Anforderungen an den erwarteten Datendurchsatz gerechnet wird.
Die Aufteilung selbst kann dabei anhand verschiedener Strategien erfolgen. Zu den wichtigsten Vertretern zählen hier Hash-basierte sowie Range- bzw. List-basierte Verfahren.
Bei Hash-basierten Verfahren wird ein Hash (Streuwert) eines Datensatzes bzw. Datensatzattributs berechnet und jedem Shard eine Menge von Hash-Werten zugeordnet, für die er zuständig ist. Verwendet man gute Hash-Funktionen, sind diese Werte schnell zu berechnen, ordnen jedem Datensatz eindeutig einen Shard zu und verteilen die Datensätze gleichmäßig auf den zur Verfügung stehenden „Shardpool“.
Ein typischer Kandidat für eine Range-basierte Aufteilung ist die zeitraumbezogene Aufteilung. In Abbildung 2 ist sie exemplarisch dargestellt. Hier werden Bestellungen nach Jahren aufgeteilt und alle Bestellungen eines Jahrgangs werden eigens gehalten.
Eine mandantenbezogene Aufteilung ist ein Beispiel für eine List-basierte Aufteilung. Abbildung 3 veranschaulicht diese Strategie anhand der Buchungen eines oder mehrerer Mandanten, die getrennt von anderen Mandanten gehalten werden.
In unserem Fall haben wir uns für den mandantenbezogenen Ansatz entschieden, da die Daten schon entsprechend geclustert waren.
Weiterhin war ein wesentliches Kriterium bei unseren Überlegungen die Skalierung der bestehenden Architektur mit möglichst minimalinvasiven Maßnahmen. Das Sharding findet daher zwischen Präsentations- und Logikschicht statt und die einzelnen Shards bestehen somit aus Kombinationen von Applikationsserver- und Datenbankinstanzen (Abb. 4).
Für diesen Ansatz haben wir uns vor allem wegen folgender Aspekte entschieden:
Der Ansatz ist flexibel genug, um dann nach und nach auf die identifizierten verbliebenen Schwachstellen reagieren zu können. Tabelle 1 zeigt die wichtigsten Pros und Kontras des Ansatzes noch einmal im Überblick.
Für den Client verlief die Umstellung auf eine geshardete Serverlandschaft transparent, da alle Aufrufe über einen neu dazwischen geschalteten Proxy abgefangen werden und ausschließlich diesen Sharding-spezifischen Code enthält. Der so genannte MultiServerProxy (kurz MSP, siehe Listing 1) erfüllt folgende Aufgaben:
Listing 1
public class MultiServerProxy implements InvocationHandler {
Class<?> interface, mapperClass, reducerClass;
public Object invoke (final Object proxy, final Method method, final Object[] args) throws Throwable {
// reflect mapper & reducer
if (reducerClass == null) {
interface = proxy.getClass().getInterfaces()[0]; // depends on architecture
mapperClass = Class.forName(interface.getName() + "Mapper");
reducerClass = Class.forName(interface.getName() + "Reducer");
}
// map
AbstractMapper mapper = (AbstractMapper) mapperClass.newInstance();
method.invoke(mapper, args);
List<String> shards = mapper.getMappedShards();
// execute
final Map<String, ?> results = new ConcurrentHashMap<String, Object>();
final CountDown countdown = new CountDown(shards.size());
for (final String shard : shards) {
new Thread() {
public void run() {
Object service = getService(interface, shard); // depends on architecture
try {
Object result = method.invoke(service, args);
results.put(shard, result);
} catch (Exception e) {
results.put(shard, e);
} finally {
countdown.release();
}
}
}.start();
}
countdown.acquire(); // wait for all
// merge & return
AbstractReducer = (AbstractReducer) reducerClass.newInstance();
reducer.setResults(results);
Object result = method.invoke(reducer, args);
if (result instanceof Throwable)
throw (Throwable) result;
else
return result;
}
}
Schauen wir uns nun die Schritte anhand des in Abbildung 5 ersichtlichen Beispiels einer Kundenverwaltung genauer an: Für beide Phasen (map und reduce) wird pro Interface der Logikschicht eine implementierende Klasse des Interface realisiert. Der MSP sucht diese Klassen anhand des Namens des Logikinterface, indem er per Konvention Klassen mit demselben Namen und dem Suffix Mapper bzw. Reducer versucht, zu instanziieren.
Er ruft vor dem eigentlichen Aufruf der Shards zunächst die Methode des Mappers auf, beispielsweise findCustomer(100). Da der Mapper zunächst keine Ahnung hat, auf welchem Shard sich der Kunde mit der ID 100 befindet (Listing 2), selektiert er alle Shards. Der Rückgabewert der Methode wird ignoriert. Stattdessen holt sich der MSP mittels getMappedShards das Ergebnis des Mapping-Schritts, befragt jeweils in einem eigenen Thread die entsprechenden Shards und wartet, bis alle ein Ergebnis geliefert haben.
Listing 2
public class CustomerControllerMapper
extends AbstractMapper implements CustomerController {
static Map<Long, String> customerShards = new ConcurrentHashMap<Long, String>();
public Customer findCustomer(long id) {
String shard = customerShards.get(id);
if (shard != null)
mappedShards.add(shard);
else
mappedShards.addAll(ALL_SHARDS);
return null; // will be ignored
}
public List<Customer> findCustomers(String lastName) {
mappedShards.addAll(ALL_SHARDS);
return null; // will be ignored
}
public void saveCustomer(Customer c) {
if (c.getId() == null) { // new customer
c.setId(UUID.generate());
int rand = Random.nextInt(ALL_SHARDS.size()); // depends on sharding rule
String shard = ALL_SHARDS.get(rand);
mappedShards.add(shard);
CustomerControllerMapper.customerShards.put(c.getId(), shard);
} else
findCustomer(c.getId());
}
public void removeCustomer(long id) {
findCustomer(id);
}
}
Die Ergebnisse stellt er als results Map dem Reducer zur Verfügung und ruft dann auch bei ihm findCustomer(100) auf (Listing 3). Dieser verwertet die Ergebnisse. Einer der Shards wird vermutlich den richtigen Kunden geliefert haben, die anderen null, weil sie den Kunden nicht kennen. Nachdem beim Mapper die ID des Kunden für immer auf die Adresse des Shards gemappt wird, wird dieser Kunde zurückgegeben und vom MSP danach dem Client als Ergebnis zurückgeliefert. Für diesen war der Aufruf also transparent.
Beim nächsten Aufruf weiß der CustomerControllerMapper schon, auf welchem Shard sich der Kunde mit der ID 100 befindet und braucht nicht mehr alle Shards zu befragen. Wie die anderen Methoden funktionieren, erfährt der geübte Java-Code-Leser ebenfalls aus Listing 2 und 3.
Listing 3
public class CustomerControllerReducer
extends AbstractReducer implements CustomerController {
public Customer findCustomer(long id) {
for (MapEntry<String, Object> result : results) {
if (result.getValue() instanceof Customer) {
Customer c = (Customer) result.getValue();
String shard = result.getKey();
CustomerControllerMapper.customerShards.put(c.getId(), shard);
return c;
}
}
return null; // customer not found
}
public List<Customer> findCustomers(String lastName) {
List<Customer> ret = new ArrayList<Customer>();
for (MapEntry<String, Object> result : results) {
List<Customer> part = (List<Customer>) result.getValue();
String shard = result.getKey();
for (Customer c : part)
CustomerControllerMapper.customerShards.put(c.getId(), shard);
ret.addAll(part);
}
ret.sort(); // sort the same way the server did for all parts
return ret;
}
}
Wir haben einen Ansatz skizziert, mit dem man eine „typisch“ gewachsene Java-EE-Architektur zu einer skalierbaren Architektur weiterentwickeln kann, ohne neue Produkte einsetzen zu müssen. Damit entgeht man so manchen Risiken bei einer Umstellung. Der Ansatz ist aufgrund der Unabhängigkeit der Shards untereinander fast linear skalierbar. Einige Herausforderungen, die hier nicht beleuchtet wurden, sind allerdings zu bewältigen:
Aufmacherbild: 3d render of servers symbol icon on the keyboard. Big data concept von Shutterstock / Urheberrecht: dencg