Leistungsfähiges Backend für IoT-Anwendungen

Azure Event Hubs
Kommentare

Für das „Internet of Things“ oder kurz IoT werden schwindelerregende Wachstumszahlen prognostiziert. Je nachdem welcher Prognose man Glauben schenken will, werden im Jahr 2020 bis zu 26 Milliarden „Dinge“ mit dem Internet verbunden sein und komplexe Systeme bilden. All diese Geräte oder Devices werden Daten erheben und zur weiteren Verarbeitung an ein zentrales System übertragen. Gerade bei Systemen, die in sehr kurzen Intervallen Daten übermitteln, oder bei einer sehr großen Anzahl an Devices ergeben sich Anforderungen an das Backend, die mit klassischen Lösungsansätzen bzw. Architekturen nur schwer zu erfüllen sind.

Wenn es um viele Sensordaten geht, ist ein IoT-System zur Überwachung von Umweltdaten in einer Chemie- oder Industrieanlage sicher ein gutes Beispiel. Dort finden sich sehr schnell Tausende von Sensoren, die in kurzen Zeitintervallen Telemetriedaten wie z. B. die aktuelle Temperatur, Luftfeuchtigkeit, Windstärke, Luftverschmutzung etc. senden.

Um sich die anfallenden Datenmenge zu verdeutlichen, kann die in Tabelle 1 gezeigte Rechnung aufgestellt werden. In Summe ergeben sich hier ~ 86,5 Millionen Ingests pro Tag und ~ 70 Mbyte Datenvolumen pro Stunde, die analysiert und bearbeitet werden müssen. Man kann sich vorstellen, dass klassische Ansätze, wie z. B. das Speichern der Daten in einer relationalen Datenbank mit nachgelagerter Businesslogik, welche die Daten analysiert, sehr schnell an Grenzen stoßen. Microsoft stellt mit Azure Event Hubs nun einen Cloud Service zur Verfügung, der es sehr einfach macht, die Datenmengen, die bei einer IoT-Lösung anfallen können, entgegenzunehmen und für die weitere Verarbeitung bereitzustellen.

Tabelle 1: Beispielrechnung für die Sensorendatenmenge

Telemetrie Ingest mit Azure Event Hubs

Event Hubs ist ein weiterer Service innerhalb des Azure Service Bus und stellt neben Relay, Queue, Topic/Subscription und Notifications das fünfte Familienmitglied dar und wurde für High-scale-Daten-Ingest konzipiert (Abb. 1). Ein Event Hub kann hierbei Daten von bis zu 1 MB (Megabyte!) pro Sekunde aufnehmen und für die weitere Verarbeitung speichern.

Ähnlich wie Topics kann ein so genannter „Event Publisher“, sprich der Sensor, der die Telemetriedaten erzeugt, diese an einen Event Hub übermitteln. „Event Consumer“, d. h. Komponenten, welche die Telemetriedaten verarbeiten, können diese wiederum aus dem Event Hub abrufen. Ähnlich wie bei Topics/Subscription können die Telemetriedaten von mehreren „Event Consumer“ abgerufen werden und mehrere „Event Publisher“ können die Daten an den Event Hub senden (Abb. 2).

Abb. 1: Aufbau des Azure Service Bus

Abb. 2: Der Event Hub zwischen Publisher und Comsumer

Competing Consumer vs. Partitioned Consumer

Der Hauptunterschied zwischen Event Hubs und Topics ist, dass Event Hubs Telemetriedaten hochskalierbar, mit geringer Latenz und hoher Zuverlässigkeit entgegennehmen und für weitere Verarbeitung vorhalten können. Dies wird im Unterschied zu Topics durch einen anderen konzeptionellen Aufbau erreicht. Service Bus Topics und Service Bus Queues verwenden ein so genanntes „Competing Consumer“-Modell, in dem mehrere Consumer versuchen von der gleichen Ressource (Subscription oder Queue) zu lesen. Dieser „Wettbewerb“ der Consumer wird vom Server koordiniert und z. B. durch Locks, Dead Letter Queues usw. verwaltet. Im weitesten Sinne kann dieses Konzept mit einem serverseitigen Cursor verglichen werden. Wie alle serverseitigen Konzepte münden diese in Komplexität und Skalierungslimits am Server. So auch bei Service Bus Queues und Topics.

Event Hubs verwenden im Gegensatz dazu ein „Partitioned Consumer“-Modell. Hierbei verwaltet der Client, welche Daten er vom Server, in diesem Fall dem Event Hub, bereits angefordert hat bzw. anfordern möchte. Dies ist im weitesten Sinne mit einem clientseitigem Cursor zu vergleichen. Da sich die Komplexität am Server dadurch verringert, kann dieser auch eine sehr viel größere Menge an Informationen bzw. Telemetrie Ingests entgegennehmen und verwalten.

Das Konzept des „Partitioned Consumer“ findet sich bei der Arbeit mit Event Hubs noch an weiteren Stellen. So wird bei der Anlage eines Event Hubs im Portal von Microsoft Azure zwingend die Anzahl von „Partitionen“ und eine „Retention Time“ in Tagen verlangt (Abb. 3). Bei der „Retention Time“ handelt es sich um einen Zeitraum (aktuell maximal 7 Tage), in dem der Event Hub alle eingegangenen Telemetrie Ingests speichert. Das heißt sobald von einem „Event Consumer“ Daten abgerufen und verarbeitet wurden, werden diese nicht im Event Hub gelöscht, sondern weiter vorgehalten. Somit kann ein einzelner „Event Consumer“ Telemetriedaten öfters abrufen. Nach Verstreichen der „Retention Time“ werden die Telemetriedaten automatisch vom Event Hub gelöscht. Der Event Hub ist damit nicht nur ein „Enterprise Service Bus“, der Nachrichten entgegennimmt und weiterleitet. Er kann vielmehr als „Event-Datenbank“ betrachtet werden, aus der ein Client oder „Event Consumer“ beliebig Daten innerhalb der „Retention Time“ abrufen kann. Die Angabe der Partitionsanzahl hat weniger mit der Skalierbarkeit des Event Hub, sondern mehr mit dem Auslesen der Telemetriedaten aus dem Event Hub zu tun. Die Partitionsanzahl muss aktuell zwischen 8 und 32 liegen und jeder „Event Consumer“ muss für jede Partition einen eigenen Partition Reader definieren. Details dazu im folgenden Sourcecode. Event Hubs stellen sowohl für Ingest als auch Egress der Events das AMQP-Protokoll und ein HTTP-REST-Interface zur Verfügung.

Abb. 3: Event Hub mit Retention Time im Azure Portal

Somit liefert der Event Hub bereits zwei Funktionen, die sehr häufig in IoT-Szenarien benötigt werden. Zum einem den Telemetrie Ingest at Scale durch „Event Producer“ und zum anderen die Speicherung dieser Telemetriedaten zum mehrmaligen Aufruf durch „Event Consumer“.

Erzeugung Azure Event Hub

Im genannten Beispielszenario sollen Sensoren Umweltbedingungen wie z. B. Temperatur, Luftfeuchtigkeit und Luftverschmutzung messen und die gewonnenen Daten an einen Event Hub senden. Um dies zu verwirklichen, muss zunächst ein Event Hub angelegt werden. Dies kann komfortabel über das Azure Management Portal oder über ein Powershell Script erledigt werden (Listing 1).

param
(
  [bool]$FalseValue = $false
)

#Get-AzureSubscription; Gets list of existing Subscription
$SubscriptionName = "<>"
$ServiceBusNamespaceName = "IoTMasterClass-ns"
$EventHubName = "IoTMasterClass"

Add-AzureAccount
Select-AzureSubscription $SubscriptionName
New-AzureSBNamespace -Name $ServiceBusNamespaceName -Location "West Europe" 
-CreateACSNamespace $FalseValue -NamespaceType Messaging
$SBNamespace = Get-AzureSBNamespace -Name "IoTMasterClass-ns"
Write-Output $SBNamespace.ConnectionString 

Nach Anlage des Service Bus Namespace kann ein Event Hub angelegt werden. Dies kann wiederum über das Azure Portal oder über Code erfolgen. Der C#-Code aus Listing 2 zeigt die Erzeugung eines Event Hub. Zusätzlich zum Event Hub wird eine Shared Access Signature mit den Rechten „Send“ und „Listen“ erzeugt. Das heißt mithilfe dieser Policy können Telemetriedaten an den Event Hub gesendet und auch wieder ausgelesen werden. Hierzu muss der Wert von eventHusSASKey aus Listing 1 im Code für den „Event Publisher“ als auch für den „Event Consumer“ verwendet werden, um Zugriffe digital zu signieren.

public async Task CreateEventHubAsync()
{
  string eventHubName = "IoTMC";
  string eventHubSASKeyName = "Device01";
  string serviceBusConnectionString = "<>"; 

  NamespaceManager nameSpaceManager = 
NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);
  string eventHubKey = SharedAccessAuthorizationRule.GenerateRandomKey();
  EventHubDescription eventHubDescription = new EventHubDescription(eventHubName)
  {
    PartitionCount = 8,
    MessageRetentionInDays = 1
  };
  SharedAccessAuthorizationRule eventHubSendRule = new SharedAccessAuthorizationRule(eventHubSASKeyName, 
eventHubKey, 
    new[] { AccessRights.Send, AccessRights.Listen });
    eventHubDescription.Authorization.Add(eventHubSendRule);
    string eventHubSASKey = ((SharedAccessAuthorizationRule)eventHubDescription.Authorization.First()).PrimaryKey;
  return await nameSpaceManager.CreateEventHubIfNotExistsAsync(eventHubDescription); 
}

Event Publisher; Telemetry Ingest

Um Telemetriedaten an einen Event Hub zu senden, finden sich im Azure Service Bus SDK Objekte zur objektorientierten, komfortablen Entwicklung. „Devices“ können jedoch i. d. R. nicht mit dem SDK anprogrammiert werden, weil diese nicht genügend Hauptspeicher, Rechenkapazität oder keinen Managed-Code ausführen können. Auf sehr vielen „Devices“ ist jedoch ein HTTP Stack vorhanden, der ebenfalls benutzt werden kann, um Telemetriedaten zu senden.

Der nachfolgende Node.js-Code kann beispielsweise auf einem Raspberry Pi, oder „Devices“, die Node.js-Code ausführen können, eingesetzt werden. Im Beispielcode werden mittels eines HTTP POST Telemetriedaten an den soeben erzeugten Event Hub gesendet.

Um den POST am Event Hub zu autorisieren, muss im Header ein Eintrag „Authorization“ mit einem signierten String vorhanden sein. Der signierte String besteht unter anderem aus dem Target-URL des POST und einem Gültigkeitsdatum (siehe createSASToken() in Listing 3). Signiert wird der String wiederum mit dem Schlüssel, der während der Anlage der Shared Access Policy erzeugt wurde (siehe string eventHubSASKey in Listing 2)

var https = require('https');
var crypto = require('crypto');
var moment = require('moment');
 
// Event Hubs parameters
var nameSpace = 'IoTMC-ns';
var hubName ='IoTMC';
var deviceName = 'Device01';

// Shared access key (from Event Hub configuration)
var policyName = 'Device01';
var policyKey = 't0JK19v94H3......kGcIUFi8zmGmBts4N09aNI0s=';
 
// Full Event Hub publisher URI
var eventHubUri = 'https://' + nameSpace + '.servicebus.windows.net' + '/' + hubName + '/publishers/' + deviceName + '/messages';
 
// Create a SAS token
// See http://msdn.microsoft.com/library/azure/dn170477.aspx
function createSASToken(uri, policyName, key)
{
  // Token expires in one hour
  var expiry = moment().add(1, 'hours').unix();
 
  var stringToSign = encodeURIComponent(uri) + 'n' + expiry;
  var hmac = crypto.createHmac('sha256', key);
  hmac.update(stringToSign);
  var signature = hmac.digest('base64');
  var token = 'SharedAccessSignature sr=' + encodeURIComponent(uri) + '&sig=' + 
encodeURIComponent(signature) + '&se=' + expiry + '&skn=' + policyName;
 
  return token;
}

setInterval(sendTelemetry, 1000);

function sendTelemetry(){
  var mySAS = createSASToken(eventHubUri, policyName, policyKey)

  // Payload to send
  var polution=Math.floor(Math.random()*101)
  var payload = '{"Temperature":"37.0","Humidity":"0.4","Polution":"' + polution + '"}';

  //Show payload send
  console.log(payload);

  // Send the request to the Event Hub
  var options = {
    hostname: nameSpace + '.servicebus.windows.net',
    port: 443,
    path: '/' + hubName + '/publishers/' + deviceName + '/messages',
    method: 'POST',
    headers: {
      'Authorization': mySAS,
      'Content-Length': payload.length,
      'Content-Type': 'application/atom+xml;type=entry;charset=utf-8'
    }
  };
  
  var req = https.request(options, function(res) {
    res.on('data', function(d) {
      process.stdout.write(d);
    });
  });
  
  req.on('error', function(e) {
    console.error(e);
  });
  
  req.write(payload);
  req.end();
}

Event Consumer; Telemetrie Egress

Um Telemetriedaten komfortabel aus einem Event Hub auszulesen, stellt das Azure Service Bus SDK alle nötigen Objekte zur Verfügung. Zu beachten ist, dass bei der Anlage eines Event Hub eine Anzahl von Partitionen angegeben wurde. Für jede dieser Partitionen muss nun eine eigene Instanz eines EventHubReceiver erzeugt wurden, die explizit Daten aus dieser Event Hub Partition ausliest (Listing 4).

public void ReadFromEventHubPartition()
{
  string serviceBusNamespace = "iotmc-ns";
  string eventHubName = "IoTMC";
  string eventHubSASKeyName = "Device01";
  int eventHubPartitionCount = 8;
  string eventHubSASKey = "t0JK19v94H3R8yAZ1umBts4N09aNI0s=";

  string eventHubConnectionString = ServiceBusConnectionStringBuilder.CreateUsingSharedAccessKey(
    ServiceBusEnvironment.CreateServiceUri("sb", serviceBusNamespace, string.Empty),
    eventHubSASKeyName,
    eventHubSASKey);

  EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(
    eventHubConnectionString, eventHubName);
  EventHubConsumerGroup eventHubConsumerGroup = eventHubClient.GetDefaultConsumerGroup();

  for (int i = 0; i 
    {
      //Just "new" message -> DateTime.Now
      //EventHubReceiver eventHubReceiver = eventHubConsumerGroup.CreateReceiver(partitionId, 
DateTime.Now);
      //All existing messages in partition -> -1
      EventHubReceiver eventHubReceiver = await eventHubConsumerGroup.CreateReceiverAsync(partitionId, 
"-1");
      do
      {
        EventData eventData = await eventHubReceiver.ReceiveAsync(TimeSpan.FromSeconds(2));
        if (eventData != null)
        {
          string message = Encoding.UTF8.GetString(eventData.GetBytes());
          string messageDetails = String.Format(
           "Received: Seq number={0} Offset={1} Partition={2} EnqueueTimeUtc={3} Message={4}",
           eventData.SequenceNumber,
           eventData.Offset,
           eventHubReceiver.PartitionId,
           eventData.EnqueuedTimeUtc.ToShortTimeString(),
         message);
         Console.WriteLine(messageDetails);
       }
     } while (true);
   });
  }
}

Beim Erzeugen eines EventHubReceiver mittels CreateReceiver() wird das Konzept des clientseitigen Cursor deutlich. Der Funktion muss eine PartitionID und zusätzlich ein DateTime Value oder Offset übergeben werden. Der DateTime Value legt fest, ab welchem Zeitpunkt Telemetriedaten abgerufen werden sollen. Wird DateTime.Now übergeben, werden alle Telemetrie-Ingests ab dem aktuellen Zeitpunkt ausgelesen. Bei Übergabe von -1 werden alle noch in der Partition des Event Hub vorhandenen Telemetrie Ingests abgerufen.

Bei Übergabe eines Offset kann genau festgelegt werden, ab welchem Telemetrie-Ingest beginnend neue Daten abgerufen werden sollen. Dieser Wert kann aus dem Property Offset eines empfangenen Telemetrie-Ingest ausgelesen werden.

Um das Handling mit Partitionen zu vereinfachen, kann das NuGet Package Microsoft.Azure.ServiceBus.EventProcessorHost verwendet werden, welches die Verarbeitung vereinfacht. Hierzu muss eine Klasse implementiert werden, welche das Interface IEventProcessor verwendet. Das Interface erfordert die Anlage einer Funktion ProcessEventsAsync(), welche aufgerufen wird, wann immer Daten in einer beliebigen Partition zur Verarbeitung anliegen.

Fazit

Mit Azure Event Hubs stellt Microsoft einen hoch skalierbaren Cloud Service zur Verfügung, der das „Sammeln“ und Bereitstellen von Telemetriedaten zur weiteren Bearbeitung stark vereinfacht und damit eine der grundlegenden Problematiken von IoT-Anwendungen adressiert. Im weiteren Verlauf stellen Event Hubs eine ideale Basis dar, um mittels Azure Stream Analytics eine weitere Near-Realtime-Verarbeitung des eingehenden Datenstroms zu ermöglichen.

Aufmacherbild: Data Management Technology via Shutterstock / Urheberrecht: kentoh

Unsere Redaktion empfiehlt:

Relevante Beiträge

Meinungen zu diesem Beitrag

X
- Gib Deinen Standort ein -
- or -