So bewältigt man Datenmengen mit Azure HDInsight

Big-Data-Analyse mit Apache Hadoop in der Windows Azure Cloud
Kommentare

Der gelbe Dickhäuter ist endgültig in der Windows-Welt angekommen … und das Warten hat sich gelohnt. Microsoft kündigte Ende Oktober die allgemeine Verfügbarkeit und damit offiziell eine zum produktiven Einsatz freigegebene Version von HDInsight an. Dabei handelt es sich um eine auf Apache Hadoop basierende Cloud-Lösung für Azure, die zur Analyse sehr großer und unstrukturierter Datenmengen mittels skalierbarer und verteilter Batchverarbeitung geeignet ist.

Es ist hinreichend bekannt, dass sich Apache Hadoop über die letzten Jahre zu einem De-facto-Standard im Kontext von Big Data entwickeln konnte. Wenn es darum geht, riesige Mengen an Daten verteilt zu speichern und auszuwerten, nimmt Hadoop eine zentrale Stellung in modernen Datenarchitekturen ein. Dazu bietet es neben einem verteilten Dateisystem ein skalierbares und fehlertolerantes Programmiermodell namens MapReduce an. Da Hadoop auf Java basiert und ursprünglich ausschließlich für Linux-Umgebungen konzipiert wurde, war es bislang eine nicht zu unterschätzende Herausforderung, Apache Hadoop auf der Microsoft-Plattform produktiv einzusetzen. Durch eine Entwicklungspartnerschaft mit Hortonworks, einem US-amerikanischen, auf Hadoop spezialisierten Unternehmen, ist es Microsoft jedoch gelungen, diese Einstiegshürde erfolgreich zu beseitigen. So bieten sich potenziellen Anwendern je nach Präferenz mittlerweile auch im Microsoft-Umfeld zwei effektive Möglichkeiten, Hadoop stabil und produktiv einzusetzen:

  • In der Cloud: HDInsight als Windows-Azure-Cloud-Service
  • On Premise: HDP for Windows, ein MSI-Paket zur Installation auf Windows-Server

Der vorliegenden Artikel beschränkt sich auf die Betrachtung der standardmäßig unterstützten Datenverarbeitungsszenarien von Windows Azure HDInsight 2.1, einem Cloud-Service, der sich auf die Hortonworks Data Platform (HDP) 1.3.0 stützt. Im Laufe der letzten Jahre sind rund um Hadoop viele weitere Open-Source-Projekte unter dem Dach der Apache Software Foundation entstanden. Daraus hat sich ein stattliches und dynamisches Ökosystem mit der Zielsetzung entwickelt, Frameworks und Tools zu schaffen, die Hadoop geeignet ergänzen bzw. darauf aufsetzen. Tabelle 1 zeigt, welche Version von Apache Hadoop und welche ausgewählten Komponenten aus diesem Ökosystem in HDInsight bereitstehen, um Entwickler und Datenanalysten bei der täglichen Arbeit zu unterstützen.

Tabelle 1: Komponenten und Versionen von Windows Azure HDInsight 2.1

Tabelle 1: Komponenten und Versionen von Windows Azure HDInsight 2.1

Low-Level vs. High-Level MapReduce

Das von Google 2004 veröffentlichte und an das funktionale Paradigma angelehnte Programmiermodell MapReduce (Kasten: „Hadoop MapReduce“) stellte in den Anfangszeiten Hadoops die einzige Möglichkeit dar, verteilte Datenauswertungen auf Hadoop-Clustern durchzuführen. Neben der nativen MapReduce-Entwicklung auf Basis von Java konnten Entwickler Hadoop-Streaming einsetzen, um die entsprechenden Map- und Reduce-Codefragmente auch mit anderen Sprachen zu programmieren.

Hadoop MapReduce

Ein Hadoop-MapReduce-Job verarbeitet Input-Daten in zwei Schritten:

  1. Map-Phase: Input-Daten werden in so genannte Splits geteilt. Pro Split wird ein Map-Task gestartet, wobei auf jeden Record eines Splits die benutzerdefinierte Map-Funktion angewendet wird. Daten werden dabei in Form von Key-Value-Paaren verarbeitet. Die Map-Phase ist massiv parallelisierbar und damit ausschlaggebend für die gute Skalierbarkeit.
  2. Reduce-Phase: Reduce-Tasks bekommen den Output sämtlicher Map-Tasks zur weiteren Verarbeitung. Das Framework sorgt dafür, dass alle Daten (Values) eines Keys beim selben Reduce-Task landen. Die benutzerdefinierte Reduce-Funktion erhält folglich zu einem spezifischen Key die Liste mit allen dazugehörigen Values. Typischerweise werden Daten an dieser Stelle verdichtet (Aggregationsfunktionen wie Summe, Min/Max, Durchschnitt etc.).

Die Grafik illustriert das Schema beispielhaft anhand von drei Map-Tasks und einem Reduce-Task. Im Allgemeinen kommen sowohl mehrere Map-Tasks als auch Reduce-Tasks zum Einsatz.

Abb. 1: Beispielhaftes Schema zur Verarbeitung eines MapReduce-Jobs

Abb. 1: Beispielhaftes Schema zur Verarbeitung eines MapReduce-Jobs

Früh wurde erkannt, dass es relativ aufwändig war, gängige Datenverarbeitungsszenarien aus der Praxis ausschließlich auf Basis von Low-Level-MapReduce-Programmierung zu realisieren. Zum einen kommen typische Aufgaben im Bereich der Datenanalyse nicht mit einem einzigen MapReduce-Job aus, sondern bestehen aus einer geeigneten Verkettung mehrerer solcher Jobs. Andererseits stoßen Datenanalysten oft auf wiederkehrende Muster wie das Filtern, Gruppieren bzw. Aggregieren oder das Sortieren von Daten und müssen daher häufig ähnlichen Programmcode mehrfach schreiben. Auf diese Problematik haben die „Hadoop-Poweruser“ Yahoo und Facebook bereits vor einiger Zeit reagiert und mit Pig bzw. Hive zwei Abstraktionsschichten über MapReduce ins Leben gerufen. Bei Pig (Latin) handelt es sich um eine ausdrucksstarke, skriptbasierte Datenflusssprache, während Hive deklarative Datenauswertung in SQL-Manier ermöglicht. Beide High-Level-Ansätze sind mittlerweile etabliert, erlauben eine äußerst produktive Arbeitsweise und entschärfen die Einstiegshürde im Umgang mit Big Data und HDInsight insbesondere bei Themenneulingen erheblich. In beiden Fällen wird im Hintergrund automatisch eine dem Pig-Latin- oder HiveQL-Skript entsprechende MapReduce-Job-Sequenz generiert und zur Ausführung gebracht.

HDInsight unterstützt standardmäßig sowohl die eben angesprochenen Varianten auf Low-Level-Ebene als auch die beiden High-Level-Ansätze Pig und Hive. Während der Previewphase von HDInsight konnten sowohl MapReduce-Jobs als auch Pig- und Hive-Skripte aus interaktiven Konsolen direkt aus der Weboberfläche ausgeführt werden. Diese Option wurde nun durch eine mächtigere Alternative in Form einer spezifischen PowerShell-Erweiterung abgelöst. Die Windows-Azure-HDInsight-PowerShell-Tools erlauben robustes und einfaches Arbeiten mit dem HDInsight-Cluster aus der lokalen Windows-Umgebung. Des Weiteren können sich Entwickler und Datenanalysten bei speziellen Anforderungen nach Aktivierung von Remote-Management jederzeit per RDP-Verbindung auf den Head Node begeben, um den HDInsight-Cluster direkt aus der Windows-Server-Umgebung zu steuern.

Verteilte Speicherung in HDInsight

Ein wesentlicher Unterschied zwischen Windows Azure HDInsight und der herkömmlichen Open-Source-Version von Apache Hadoop liegt in der Massendatenspeicherung. Apache Hadoop verwendet zur verteilten und fehlertoleranten Speicherung beliebiger Daten standardmäßig das Hadoop Distributed File System (kurz HDFS), ein Dateisystem, das für große verteilte Datenauswertungen konzipiert und optimiert wurde. Fundamentale Designprinzipien von HDFS sind unter anderem die Speicherung sehr großer Dateien (Gigabytes bis zu Terabytes pro Datei), die Fehlertoleranz gegenüber Hardwareausfällen und das performante Lesen von Datenströmen. Details zur genauen Funktionsweise und der Architektur von HDFS finden sich in der offiziellen Dokumentation hier, und hier oder unter hier.

Datenlokalität

Ein zentraler Aspekt hinsichtlich der skalierbaren Datenverarbeitung mit Apache Hadoop ist das Prinzip der Datenlokalität. Es geht darum, das Programm zur Berechnung möglichst nahe an die Daten zu bringen – idealerweise genau auf jene Knoten im Cluster, auf denen die Daten tatsächlich gespeichert sind. Das bedeutet im Endeffekt, dass während der Map-Phase eines MapReduce-Jobs mit sämtlichen zur Berechnung benötigten Daten lokal gearbeitet werden kann. Dasselbe gilt natürlich im Allgemeinen nicht für die Reduce-Phase, weil diese auf den Outputs der Map-Phase weiterarbeitet und die Daten dazu knotenübergreifend übers Netzwerk transferiert werden müssen.

Der HDInsight-Cloud-Service bietet direkt auf den Knoten eines Hadoop-Clusters ebenso HDFS zur Datenspeicherung an. Standardmäßig kommt bei HDInsight jedoch Windows Azure Blob Storage zum Einsatz. Blob-Storage-Container speichern Daten in Form von Key-Value-Paaren. Keys sind vergleichbar mit herkömmlichen Dateinamen und dienen als eindeutige Verweise auf Daten. Values beziehen sich auf die den Keys zugeordneten Daten und repräsentieren damit den eigentlichen Inhalt von Dateien. Folglich gibt es keine klassische Verzeichnisstruktur, wie man sie von hierarchischen Dateisystemen kennt. Dennoch kann durch die Verwendung des Trennzeichens Slash („/“) im Namen der Keys eine hierarchische Dateisystemstruktur imitiert werden.

Um Blob Storage aus HDInsight möglichst problemlos verwenden zu können, steht eine zu HDFS kompatible, nahtlose Schnittstelle sowohl für die Zusammenarbeit mit Hadoop als auch mit Komponenten aus dem Ökosystem zur Verfügung. Ebenso funktionieren gängige, nicht auf die native HDFS-Implementierung bezogene Dateisystembefehle direkt aus der Hadoop CMD-Shell am entsprechenden HDInsight Master Node. Die Verwendung des Windows Azure Blob Storage bedingt, dass das fundamentale Prinzip der Datenlokalität (Kasten: „Datenlokalität“) streng genommen nicht gegeben ist, weshalb leichte Performanceeinbußen bei Lese- und Schreibvorgängen zu erwarten sind. Diese Tatsache wird jedoch dadurch abgeschwächt, dass die vorgehaltenen Cluster-Knoten zur Berechnung örtlich nahe am entsprechenden Blob Storage im Windows-Azure-Datenzentrum platziert werden. Zusätzlich profitiert der Datenaustausch über das Netzwerk vom hoch performanten Azure Flat Network Storage (hier und hier). Davon abgesehen ergeben sich aus der Nutzung des Blob Storage in erster Linie zwei Vorteile: Technisch gesehen kann der Blob Storage unabhängig von den benötigten Berechnungsressourcen elastisch skaliert werden und die Daten können problemlos anderen Anwendungen außerhalb des HDInsight-Clusters zur Verfügung gestellt werden. Aus wirtschaftlicher Sicht ergeben sich typischerweise wesentlich geringere Kosten für die längerfristige Speicherung von Roh- und Ergebnisdaten, weil HDFS-Speicherplatz aufrechter HDInsight-Cluster-Nodes um ein Vielfaches teurer ist.

Provisionierung von HDInsight-Clustern

Microsoft bietet für HDInsight sehr flexible Möglichkeiten zur Provisionierung von Hadoop-Clustern in der Azure Cloud. Mit dem Windows-Azure-Management-Portal steht eine benutzerfreundliche HTML5-Weboberfläche bereit, die mit ein paar Klicks und binnen weniger Minuten erlaubt, einen einsatzfähigen HDInsight-Cluster zu erstellen. Voraussetzung dafür ist ein aktiver Storage-Account in den Regionen North Europe, East oder West US. Im Rahmen der 30-tägigen Testversion von Windows Azure bekommt man derzeit gratis Credits im Wert von 150 Euro (200 US-Dollar) zum Experimentieren zur Verfügung gestellt. Damit kann unter Berücksichtigung des ab Dezember 2013 gültigen Preismodells ein kleiner HDInsight-Cluster mit einem Head Node und vier Compute Nodes für ungefähr 100 Stunden betrieben werden. Die Kosten für Storage und weitere Azure-Services sind dabei nicht eingerechnet.

Abgesehen von der manuellen Einrichtung über die Weboberfläche kann die Provisionierung von HDInsight-Clustern auch durch verschiedene Mechanismen (teil-)automatisiert werden. Dazu bieten sich Administratoren im Wesentlichen zwei Möglichkeiten: Zum einen gibt es ein spezifisches Windows-Azure-PowerShell-Modul zur automatisierbaren Verwaltung von HDInsight-Clustern. Andererseits steht zusätzlich eine plattformübergreifende, auf Node.js basierende Konsolenumgebung bereit, die in erster Linie Mac-OS- und Linux-Anwender ansprechen dürfte. Letztlich können sich auch Entwickler mit dem Anlegen, Konfigurieren und Überwachen von HDInsight-Clustern auseinandersetzen. Dazu existieren eigene Clientbibliotheken, die Teil des HDInsight .NET SDK sind.

Datenanalyse mit Pig

Im folgenden Beispiel sollen die Grundlagen der Datenverarbeitung mit Pig unter HDInsight anhand eines kundenunspezifischen Cross-Selling-Features vorgestellt werden. Ausgangspunkt ist eine TAB-separierte Textdatei (Listing 1), die schematische Warenkorbinformationen darstellt. Die Datei enthält pro Zeile eine Warenkorb-ID und die jeweils bestellten Artikel-IDs.

Listing 1
// \t=TAB, \n=Zeilenumbruch
W0098407034 \t A0000032541 \t A0000012534 \t A0000089756 \t A0009948528\n
W0200125338 \t A0003747263 \t A0000089756\n
W0074958300 \t A0009948528 \t A0054834738 \t A0001243621\n
... // beliebige weitere Einträge

Auf dieser Grundlage soll eine warenkorbübergreifende Gesamtauswertung erstellt werden, die zu jedem einzelnen Artikel ausgibt, mit wie vielen anderen Artikeln dieser bereits zusammen gekauft wurde. Zusätzlich soll im Detail ermittelt werden, wie häufig ein gemeinsamer Kauf von Artikelpaarungen über alle Einkäufe hinweg stattgefunden hat. Schließlich sollen die Daten die gemeinsamen Kaufvorgänge betreffend entlang beider Dimensionen in absteigender Reihenfolge sortiert werden. Einerseits auf der Ebene der Artikel, um zu erkennen, welche Artikel am häufigsten mit mindestens einem weiteren Artikel gemeinsam gekauft wurden. Andererseits sollen pro Artikel ebenso die einzelnen Paarungen nach Häufigkeit ihres Auftretens absteigend aufgereiht werden. Eine mögliche Lösung für diese Datenauswertung mittels Pig findet sich in Listing 2.

Listing 2
-- Eine UDF wird über die benutzerdefinierte JAR-Datei registriert.
REGISTER wasb:///pfad/zu/udf/paket.jar;

-- Zur kompakteren Schreibweise im Skript wird ein UDF-Alias definiert.
DEFINE ArtikelPaare my.pig.udf.GenerateTuplePairings();

-- Rohdaten werden von der TSV-Datei aus dem Blob Storage geladen.
raw_data = load 'wasb:///pfad/zum/input.tsv' USING PigStorage('\t');

-- Pro Datensatz werden alle Zweierkombinationen der enthaltenen Artikel
-- von der UDF erstellt und mit Flatten paarweise in eigene Zeilen geschrieben.
-- Dazu werden der UDF alle Artikel-IDs des jeweiligen Datensatzes übergeben.
-- Die Warenkorb-ID ist im weiteren Verlauf nicht von Relevanz.
article_pairs = FOREACH raw_data GENERATE FLATTEN(ArtikelPaare($1 ..));

-- Alle entstandenen Artikelkombinationen werden paarweise gruppiert, um auf
-- diesem Weg die Häufigkeiten ihrer gemeinsamen Vorkommnisse zu ermitteln.
pair_frequency = FOREACH (GROUP article_pairs BY ($0,$1)) 
    GENERATE group.$0 AS articleA,group.$1 AS articleB,
              COUNT(article_pairs) AS frequency;

-- Es erfolgt eine weitere Gruppierung, hier jedoch nicht mehr paarweise,
-- sondern nur nach dem ersten Artikel jedes Paares. So kann pro Artikel 
-- berechnet werden, wie oft dieser generell mit anderen gekauft wurde.
-- Zusätzlich werden jedem Artikel alle Paarungen, in denen er vorkam,
-- entsprechend den Häufigkeiten absteigend sortiert zugeordnet.
article_grouping = FOREACH (GROUP pair_frequency BY (articleA)) {
  freq_sort = ORDER pair_frequency BY articleA ASC,frequency DESC;
  GENERATE group AS article,COUNT(pair_frequency) AS cross_sellings,freq_sort;
  }

-- Am Ende werden die gesamten Daten noch absteigend anhand der Cross-Selling-
-- Anzahl sortiert. 
total_ranking = ORDER article_grouping BY cross_sellings DESC;

-- Das sortierte Resultat wird als Datei im Blob Storage gespeichert.
STORE total_ranking INTO 'wasb:///pfad/zum/ergebnisordner' using PigStorage();

Im Pig Latin Script wird in Zeile 4 eine User Defined Function (UDF) verwendet, um pro Datensatz der Input-Datei alle möglichen Artikelpaarungen zu generieren. Diese UDF wurde mit dem Java-Code aus Listing 3 realisiert und in eine JAR-Datei gepackt. Mit dem Register-Befehl kann diese JAR-Datei ins Pig Latin Script eingebunden werden. Mit der Pig-Version 0.10 wurde überdies die Möglichkeit geschaffen, UDFs mit JavaScript oder Python zu schreiben. Allerdings handelt es sich dabei nach wie vor um ein mager dokumentiertes und experimentelles Feature, weshalb hier der klassische Weg über die Java-UDF gewählt wurde. Im Rahmen der Ausführung erstellt Pig aus diesem Skript automatisch MapReduce-Code und einen geeigneten Ausführungsplan. Dieses Beispiel wird in eine Sequenz von vier MapReduce-Jobs umgewandelt. Im Vergleich zu einer Low-Level-Implementierung mit MapReduce benötigt die vorgestellte Variante auf Basis von Pig lediglich ein paar wenige Zeilen an Quellcode. Entwickler müssten für eine vergleichbare Auswertung mindestens drei MapReduce-Jobs programmieren. Typische Umsetzungen benötigen zwischen 250 und 300 Zeilen an Java-Code und erfordern auch tiefergehendes Know-how zu MapReduce, um z. B. die absteigenden Datensortierungen anhand beider Dimensionen realisieren zu können.

Listing 3
package my.pig.udf;
// NOTE: skipped import declarations
public class GenerateTuplePairings extends EvalFunc{
  private static final TupleFactory tFact = TupleFactory.getInstance();
  private static final BagFactory bFact = BagFactory.getInstance();
  @Override
  public DataBag exec(Tuple input) throws IOException {
    if(input == null || input.size() <= 1) {
      return bFact.newDefaultBag();
    }
    List pairs = new ArrayList();
    for(int i=0;i<input.size()-1;++i) {
      for(int j=i+1;j<input.size();++j) {
        if(!input.isNull(i) && !input.isNull(j)) {
          Tuple p1 = tFact.newTuple();
          p1.append(input.get(i));
          p1.append(input.get(j));
          pairs.add(p1);
          Tuple p2 = tFact.newTuple();
          p2.append(input.get(j));
          p2.append(input.get(i));
          pairs.add(p2);
        }
      }
    }
    return bFact.newDefaultBag(pairs);
  }
}

Pig-Jobausführung mit der Azure PowerShell

Bevor die Ausführung des Pig-Jobs über die HDInsight-PowerShell-Tools erfolgen kann, müssen noch sowohl die JAR-Datei mit der UDF als auch die Rohdaten in Form der TSV-Datei in den Blob-Storage-Container des jeweiligen HDInsight-Clusters hochgeladen und unter zwei Keys gespeichert werden. Für die Dateiverwaltung von Storage-Accounts bieten sich abseits von Azure PowerShell und der Azure Storage Client .NET Library auch verschiedene grafische Desktopclients an.

Der Pig-Job zur verteilten Datenauswertung kann schließlich problemlos aus der lokalen Windows-Umgebung gestartet werden. Voraussetzung dafür ist die Installation der Azure PowerShell inklusive HDInsight-Tools. Der Quellcode für das entsprechende PowerShell-Skript findet sich in Listing 4.

Listing 4
#Subscription und Cluster Settings
$subscriptionName = "Name Ihrer Azure Subscription"
$clusterName = "Name Ihres HDInsight Clusters"
#Pig Script und Jobdefinition
$pigLatinScript =  "Pig Latin Source aus Listing 2";
$pigJobDefinition = New-AzureHDInsightPigJobDefinition -Query $pigLatinScript
$pigJob = Start-AzureHDInsightJob -Subscription $subscriptionName -Cluster \
$clusterName -JobDefinition $pigJobDefinition
#Ausführung des Pig-Jobs
Wait-AzureHDInsightJob -Subscription $subscriptionName -Job \
$pigJob -WaitTimeoutInSeconds 3600
#Ergebnis (Ausgabe bzw. Fehler) anzeigen
Get-AzureHDInsightJobOutput -Cluster $clusterName –Subscription \
$subscriptionName -JobId $pigJob.JobId –StandardOutput –StandardError

Fazit und Ausblick

Die Zeiten, in denen Hadoop lediglich unter diversen Linux-Distributionen verwendet werden konnte, gehören zum Glück der Vergangenheit an. Die Partnerschaft zwischen Microsoft und Hortonworks hat sich mehr als gelohnt und einem erfolgreichen Unternehmenseinsatz innerhalb einer Windows-Infrastruktur steht nichts mehr im Weg. Wer keine Freude daran hat, sich einen On-Premise-Cluster auf Basis der Hortonworks-HDP-Plattform unter Windows selbst einzurichten, dem sei der Einsatz von HDInsight nahegelegt. Microsoft bietet damit in einer gut durchdachten und stabilen Cloud-Umgebung die aktuell gängigsten Anwendungsszenarien im traditionellen Hadoop-Umfeld an. Man kann sich also ohne jegliche Vorlaufzeit in Bezug auf administrative Tätigkeiten unmittelbar auf die Analyse von Massendaten stürzen. Für die Zielgruppe der Entwickler stehen Low-Level-MapReduce und Hadoop-Streaming zur Verfügung, während sich Datenanalysten vermutlich eher mit den High-Level-Ansätzen Pig und Hive anfreunden dürften. Ein absolutes Differenzierungsmerkmal im Vergleich zu den Konkurrenzanbietern ist die Anbindung des Windows Azure Blob Storage zur Massendatenspeicherung, womit sich für ausgewählte Anwendungsszenarien neben technischen vor allem wirtschaftliche Vorteile ergeben.

Auch in der Open-Source-Community hat sich gerade einiges getan. Knapp vor der offiziellen Ankündigung von HDInsight wurde Mitte Oktober 2013 mit dem Release von Apache Hadoop 2.0 ein weiterer Meilenstein erreicht. Auf Basis von YARN (Yet Another Resource Negotiator) können völlig neuartige Interaktionsszenarien im Big-Data-Kontext realisiert werden. Abseits der traditionellen Batchverarbeitung von ruhenden Daten können unter anderem In-Memory-, Stream- oder Graph-Processing-Analysen umgesetzt werden. Man darf demnach schon gespannt sein, wie sich HDInsight in den kommenden Monaten weiterentwickeln wird und ab wann wir über YARN-basierte Analyseszenarien in der Azure Cloud berichten dürfen.

Aufmacherbild: Big Data Storage Online Technology Database Concept via Shutterstock.com / Urheberrecht: Rawpixel

Unsere Redaktion empfiehlt:

Relevante Beiträge

Meinungen zu diesem Beitrag

X
- Gib Deinen Standort ein -
- or -