Durchhaltevermögen

Long-Running-Workflows als Serverless Functions in Azure
Keine Kommentare

Azure Functions bringen viele Eigenschaften mit, die einem die Arbeit erleichtern. Für länger laufende Prozesse sind sie allerdings weniger gut geeignet. Hier helfen Durable und Entity Functions weiter.

Serverless Functions [1] sind aus meiner Sicht eine großartige Erweiterung von Microsoft Azure, die sich nicht umsonst zunehmend größerer Beliebtheit erfreut. Die Gründe: Man muss sich weder um die Auswahl der richtigen Anzahl und Größe von Servern kümmern noch um die Konfiguration des Autoscalings. Schon gar nicht ist man damit beschäftigt, virtuelle Maschinen aktuell zu halten. APIs in der Cloud kommen durch die Serverless-Technologie wie der sprichwörtliche Strom aus der Steckdose. Auch dort steckt eine gewaltige Ingenieursleistung dahinter, immer die richtige Menge Strom zum richtigen Zeitpunkt anzubieten. So ist es auch mit Serverless Functions. Man verpackt seinen Code, übergibt ihn Microsoft und lässt es deren Problem sein, die notwendige Infrastruktur für die gerade anstehende Last bereitzustellen. Nach dem sogenannten Consumption Plan zahlt der Nutzer für das, was er tatsächlich verbraucht, und die Kosten fallen sogar auf null, wenn gerade niemand die Cloud-Software nutzt [2].

Die zweite Besonderheit von Azure Functions ist das Programmiermodell: Es ist Event-getrieben. Events können dabei einerseits die üblichen HTTP Requests sein, falls das zu entwickelnde API ein Web-API sein soll. Es gibt aber auch eine große Anzahl anderer Events, auf die man reagieren kann [3]. Hier einige Beispiele:

  • Eine Datei wird auf den Blob Storage hochgeladen.

  • Eine Datenänderung geschieht in Cosmos DB.

  • Eine Nachricht kommt von einem IoT-Gerät.

  • Über den Service-Bus kommt eine Nachricht von einem anderen Microservice.

  • Ein Timer informiert darüber, dass ein eingestellter Zeitpunkt erreicht wurde.

Das Konzept von Azure Functions passt daher perfekt, wenn man eine Software in Form lose gekoppelter Microservices aufbauen möchte.

Wozu Durable Functions?

Die klassischen Azure Functions haben zwei Eigenschaften, die man beim Design berücksichtigen muss. Erstens müssen sie ihre Aufgabe in relativ kurzer Zeit erledigen. Das Standard-Timeout sind fünf Minuten (Functions mit HTTP-Trigger müssen sogar in knapp vier Minuten antworten), bei Bedarf kann es auf bis zu zehn Minuten erhöht werden [4]. Zweitens sind Serverless Azure Functions stateless. Der Entwickler muss sich selbst um das Speichern von State kümmern, beispielsweise in anderen Azure-PaaS- oder Serverless-Diensten wie Azure SQL Database oder Cosmos DB.

Aufgrund dieser beiden Einschränkungen sind Azure Functions für lange laufende Prozesse nicht gut geeignet. Stellen Sie sich vor, Ihre Serverless Function soll während der Ausführung mit einem Benutzer über einen Slack-Bot kommunizieren. Es ist nicht vorhersagbar, wie schnell der Benutzer reagiert. Es können Minuten oder sogar Stunden vergehen. Eine Function würde mit großer Wahrscheinlichkeit in ein Timeout laufen.

In solchen Situationen helfen Durable Functions und Entity Functions. Sie sind dafür konzipiert, lange zu laufen und sich um das State-Management selbst zu kümmern. Wir konzentrieren uns nun auf diese Varianten der Azure Functions und gehen davon aus, dass Sie als Leserin oder Leser grundlegendes Wissen über die klassischen Azure Functions haben. Falls Ihnen diese Erfahrung fehlt, empfehle ich das Durcharbeiten eines entsprechenden Tutorials, wie man es zum Beispiel unter [5] findet.

Programmieren mit Durable Functions

Mit Durable Functions implementiert man lange laufende Workflows. Im Gegensatz zu anderen Workflowtools wird dafür aber keine eigene deklarative Sprache (z. B. domänenspezifische Sprache (DSL), XML, JSON) verwendet, sondern ganz normales C# (Azure Functions unterstützt auch andere Programmiersprachen, hier beschränken wir uns aber auf C#). Aus der Codestruktur lässt sich der Ablauf des Workflows gut ablesen. Die einzelnen Aktivitäten des Workflows, die eventuell länger dauern könnten, sind hinter await-Aufrufen versteckt.

Das allein ermöglicht es aber noch nicht, lange laufende Workflows in C# zu programmieren. Azure Functions sind serverless. Die Serverlandschaft, auf der Ihr C#-Code läuft, kann sich daher ständig ändern. Es kommen Server dazu oder der Server, auf dem eine Workflowinstanz gerade ausgeführt wird, fällt weg. Wie gehen Durable Functions damit um? Die Antwort auf diese Frage scheint auf den ersten Blick absurd: Durable Functions werden immer wieder von Beginn an ausgeführt.

Die einzelnen Aktivitäten innerhalb des Workflows müssen aus diesem Grund deterministisch sein. Das bedeutet, dass sie bei jedem Durchlauf einer Workflowinstanz bei gleichen Eingabeparametern das gleiche Ergebnis liefern müssen. Entsprechend dem Event-Sourcing-Pattern speichert die Durable Functions Runtime jede Aktion einer Workflowinstanz und deren Ergebnis automatisch in Azure Storage ab. Startet die Function einer Workflowinstanz später von vorn, wird vor dem Aufruf der jeweiligen Aktion geprüft, ob sie bei einem früheren Durchlauf der Instanz schon einmal ausgeführt wurde. Falls ja, wird die Aktion nicht erneut durchgeführt, sondern ihr zuvor ermitteltes Ergebnis wird gelesen und zurückgegeben. Dadurch schadet es nichts, dass immer wieder von vorn begonnen wird. Die schon durchgeführten Aktionen werden quasi übersprungen.

Viele C#-Funktionen sind nicht von Haus aus deterministisch. Man denke an die aktuelle Uhrzeit, die Erzeugung einer neuen GUID, externe Web-APIs, Zufallszahlen etc. Solche APIs dürfen in Durable Functions nicht verwendet werden. Ihr API bietet Alternativen mit ähnlicher Funktionalität an, die mit der Durable Functions Runtime kompatibel sind [6].

Entspannt abwarten

Listing 1 enthält ein Beispiel für eine Durable Function, die das Human Interaction Application Pattern [7] implementiert. Abbildung 1 stellt den Ablauf als Sequenzdiagramm dar.

Abb. 1: Sequenzdiagramm zu Listing 1

Abb. 1: Sequenzdiagramm zu Listing 1

In unserem Szenario sendet eine Verkehrsüberwachungskamera Geschwindigkeitsübertretungen über ein HTTP-Web-API an eine normale Azure Function (SpeedViolationRecognition). Diese Funktion überprüft die Genauigkeit, mit der das Fahrzeugkennzeichen erkannt wird. Ist diese nicht hoch genug, brauchen wir die Hilfe eines Menschen, der sich das aufgenommene Bild ansieht und das erkannte Kennzeichen prüft. Die Interaktion mit dieser Person könnte über ein Messaging-System wie Slack erfolgen (im Beispielcode dieses Artikels nur angedeutet). Da nicht absehbar ist, wie rasch die Person reagieren wird, steckt die Interaktionslogik in einer Durable Function (ManuallyApproveRecognition). Sie sendet die Bitte nach manueller Kennzeichenkontrolle an Slack und wartet darauf, dass Slack die Antwort über eine Web-API-Funktion (ProcessSlackApproval) zurückliefert. Über ein Event (ReceiveApprovalResponseEvent) wird die Durable Function über das Eintreffen der Antwort informiert und die Geschwindigkeitsübertretung kann fertig verarbeitet werden (StoreSpeedViolation).

Beachten Sie beim Durchsehen des Codes insbesondere die Orchestration ID, die an verschiedenen Stellen eingesetzt wird. Sie identifiziert die Workflowinstanz eindeutig. Mit ihr wird in den Event-Sourcing-Tabellen in Azure Storage gefiltert. Abbildung 2 zeigt den Zusammenhang zwischen HTTP-Web-API, der Orchestration ID und den Tabellen in Azure Storage.

Abb. 2: Event-Source-Tabelle in Azure Storage

Abb. 2: Event-Source-Tabelle in Azure Storage



using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
 
namespace DurableFunctions
{
  # region Data Transfer Objects
  /// <summary>
  /// Represents a speed violation recognized by a traffic camera
  /// </summary>
  public class SpeedViolation
  {
    /// <summary>
    /// ID of the camera that has recognized the vehicle
    /// </summary>
    public int CameraID { get; set; }
 
    /// <summary>
    /// License plate number as recognized by the camera
    /// </summary>
    public string LicensePlateNumber { get; set; }
 
    /// <summary>
    /// Accuracy of license plate recognition (value between 0 and 1)
    /// </summary>
    public double RecognitionAccuracy { get; set; }
 
    /// <summary>
    /// Measured speed of the vehicle
    /// </summary>
    public decimal SpeedKmh { get; set; }
  }
 
  /// <summary>
  /// Represents a request for manual approval of license plate read
  /// </summary>
  public class ApprovalRequest
  {
    /// <summary>
    /// ID or the long-running orchestration handling the approval process
    /// </summary>
    public string OrchestrationInstanceID { get; set; }
 
 
    /// <summary>
    /// Data about the speed violation to approve
    /// </summary>
    public SpeedViolation SpeedViolation { get; set; }
  }
 
  /// <summary>
  /// Represents a response of a user concerning a license plate read
  /// </summary>
  public class ApprovalResponse
  {
    /// <summary>
    /// ID or the long-running orchestration handling the approval process
    /// </summary>
    public string OrchestrationInstanceID { get; set; }
 
    /// <summary>
    /// True if license plate read has been confirmed, otherwise false
    /// </summary>
    public bool Approved { get; set; }
  }
  # endregion
 
  public class TrafficSpeedViolation
  {
    /// <summary>
    /// Web API handling incoming speed violations recognized by traffic cameras
    /// </summary>
    /// <returns>
    /// OK if license plate read accuracy was ok, otherwise tracking data for
    /// long-running orchestration handling manual approval of license plate read.
    /// </returns>
    [FunctionName(nameof(SpeedViolationRecognition))]
    public async Task<HttpResponseMessage> SpeedViolationRecognition(
      [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
      [DurableClient] IDurableOrchestrationClient starter,
      ILogger log)
    {
      // Get speed violation data from HTTP body
      var sv = JsonSerializer.Deserialize<SpeedViolation>(await req.Content.ReadAsStringAsync());
 
      // Check if read accuracy was not good enough
      if (sv.RecognitionAccuracy < 0.75d)
      {
        log.LogInformation($"Recognition not accurate enough, starting orchestration to ask human for help");
 
        // Start durable function for manual approval process
        string instanceId = await starter.StartNewAsync(nameof(ManuallyApproveRecognition), sv);
 
        // Return status object with instance ID and URLs for status monitoring
        return starter.CreateCheckStatusResponse(req, instanceId);
      }
 
      // Read accuracy was ok, than store it (assumption: storing speed
      // violation is pretty fast, i. e. a matter of seconds).
      await StoreSpeedViolation(sv, log);
      return new HttpResponseMessage(HttpStatusCode.OK);
    }
 
    private const string ReceiveApprovalResponseEvent = "ReceiveApprovalResponse";
 
    /// <summary>
    /// Web API receiving responses from Slack API
    /// </summary>
    /// <returns>
    /// OK if approval was ok, BadRequest if approval is unknown or no longer running
    /// </returns>
    [FunctionName(nameof(ProcessSlackApproval))]
    public async Task<HttpResponseMessage> ProcessSlackApproval(
      [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
      [DurableClient] IDurableOrchestrationClient orchestrationClient,
      ILogger log)
    {
      // Get approval response from HTTP body
      var slackResponse = JsonSerializer.Deserialize<ApprovalResponse>(await req.Content.ReadAsStringAsync());
 
      // Get status based on orchestration Id
      var status = await orchestrationClient.GetStatusAsync(slackResponse.OrchestrationInstanceID);
      if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running || status.RuntimeStatus == OrchestrationRuntimeStatus.Pending)
      {
        log.LogInformation("Received Slack response in time, raising event");
 
                // Raise an event for the given orchestration
                await orchestrationClient.RaiseEventAsync(slackResponse.OrchestrationInstanceID,
                    ReceiveApprovalResponseEvent, slackResponse.Approved);
                return new HttpResponseMessage(HttpStatusCode.OK);
            }
 
            return new HttpResponseMessage(HttpStatusCode.BadRequest);
        }
 
        /// <summary>
        /// Durable function handling long-running approval process
        /// </summary>
        [FunctionName(nameof(ManuallyApproveRecognition))]
        public async Task<bool> ManuallyApproveRecognition([OrchestrationTrigger] DurableOrchestrationContext context)
        {
          // Get speed violation data from orchestration context
          var sv = context.GetInput<SpeedViolation>();
 
          // Call activity that sends approval request to Slack. Note that this
          // activity will not await the human's response. It will only wait until
          // message will have been sent to Slack.
          await context.CallActivityAsync(nameof(SendApprovalRequestViaSlack), new ApprovalRequest
          {
            OrchestrationInstanceID = context.InstanceId,
            SpeedViolation = sv
          });
 
          // We want the human operator to respond within 60 minutes. We setup a
          // timer for that. Note that this is NOT a regular .NET timer. It is a
          // special timer from the Durable Functions runtime!
          using var timeoutCts = new CancellationTokenSource();
          var expiration = context.CurrentUtcDateTime.AddMinutes(60);
          var timeoutTask = context.CreateTimer(expiration, timeoutCts.Token);
 
          // Wait for the event that will be raised once we have received the           // response from Slack.
          var approvalResponse = context.WaitForExternalEvent<bool>(ReceiveApprovalResponseEvent);
 
          // Wait for Slack response or timer, whichever comes first
          var winner = await Task.WhenAny(approvalResponse, timeoutTask);
 
          // Was the Slack task the first task to complete?
          if (winner == approvalResponse && approvalResponse.Result)
          {
            // License plate read approved -> Store speed violation
            await context.CallActivityAsync(nameof(StoreSpeedViolation), sv);
          }
 
          if (!timeoutTask.IsCompleted)
          {
            // All pending timers must be completed or cancelled before the             // function exits.
            timeoutCts.Cancel();
          }
 
          return winner == approvalResponse && approvalResponse.Result;
    }
 
    [FunctionName(nameof(SendApprovalRequestViaSlack))]
    public Task SendApprovalRequestViaSlack([ActivityTrigger] ApprovalRequest req, ILogger log)
    {
      log.LogInformation($"Message regarding {req.SpeedViolation.LicensePlateNumber} sent to Slack " +
        $"(instance ID {req.OrchestrationInstanceID}!");
 
      // Todo: Send data about speed violation to Slack via Slack REST API.
      // Not implemented here, just a demo.
 
      return Task.CompletedTask;
    }
 
    [FunctionName(nameof(StoreSpeedViolation))]
    public Task StoreSpeedViolation([ActivityTrigger] SpeedViolation sv, ILogger log)
    {
      log.LogInformation($"Processing speed violation from camera {sv.CameraID}" +
        $"for LP {sv.LicensePlateNumber} ({sv.SpeedKmh} km/h)");
 
      // Todo: Add code for processing speed violation
      // Not implemented here, just a demo.
 
      return Task.CompletedTask;
    }
  }
}

Entity Functions

Erst vor wenigen Monaten wurden die Azure Durable Functions um eine neue Art von Funktionen erweitert: die Entity Functions. Während sich bei den bisherigen Durable Functions (aka Orchestrator Functions) der State implizit durch den Kontrollfluss und die lokalen Variablen im C#-Code ergibt, wird er bei Entity Functions explizit gespeichert.

Das Prinzip lässt sich am leichtesten an einem Beispiel erklären. Listing 2 enthält ein Durable Entity, das ein Verfahren wegen Geschwindigkeitsübertretung repräsentiert (SpeedViolationLawsuit). Es verwendet die Class-based Syntax. Alternativ würde auch eine Function-based Syntax angeboten, wobei Microsoft in C# die Variante mit Klassen empfiehlt [8]. Man muss keinen Code schreiben, um die Instanzen dieser Klasse zu persistieren. Die Durable Functions Runtime speichert den State der Instanzen als JSON Blobs in Azure Storage. Das Aussehen des JSON lässt sich mit den üblichen C#-Attributen für die JSON-Serialisierung beeinflussen.

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System;
using System.Threading.Tasks;
 
namespace DurableFunctions
{
  /// <summary>
  /// Represents a speed violation lawsuite
  /// </summary>
  public class SpeedViolationLawsuit
  {
    public SpeedViolation SpeedViolation { get; set; }
 
    public string Driver { get; set; }
 
    public decimal? Fine { get; set; }
 
    public bool Paid { get; set; }
 
    public void SetSpeedViolation(SpeedViolation sv) => SpeedViolation = sv;
 
    public void StoreDriver(string driver) => Driver = driver;
 
    public async Task SetFine(decimal fine)
    {
      if (string.IsNullOrEmpty(Driver))
      {
        throw new InvalidOperationException();
      }
 
      // Simulate an async operation (e. g. for I/O)
      await Task.Delay(1);
 
      Fine = fine;
    }
 
    public void MarkAsPaid()
    {
      if (!Fine.HasValue)
      {
        throw new InvalidOperationException();
      }
 
      Paid = true;
    }
 
    public void Delete()
    {
      // Note how we access the current entity
      Entity.Current.DeleteState();
    }
 
    [FunctionName(nameof(SpeedViolationLawsuit))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
    {
      // When creating a new entity, make sure it is marked as not paid
      if (!ctx.HasState)
      {
        ctx.SetState(new SpeedViolationLawsuit
        {
          Paid = false
        });
      }
 
      return ctx.DispatchAsync<SpeedViolationLawsuit>();
    }
  }
}

Listing 3 zeigt, wie man aus normalen und Durable Functions heraus mit den Durable Entities umgeht. In ManuallyApproveRecognition sieht man, wie eine neue Entity erzeugt wird. In unserem Szenario wäre das beispielsweise dann der Fall, wenn in der Durable Function der Benutzer die Kennzeichenerkennung bestätigt hat. GetLawsuite zeigt, wie man den State der Durable Entity mit Hilfe ihrer ID (in unserem Fall eine GUID) lesen kann. SetDriver ist schließlich ein stark vereinfachtes Beispiel für das Aufrufen einer Methode, die den State der Entity verändert. Ich empfehle jedem, der mit Durable Entities experimentiert, analog zu Abbildung 2 einen Blick in den Azure Storage zu werfen, um ein tieferes Verständnis dafür zu bekommen, wie Azure Functions den State tatsächlich speichern.

public async Task<bool> ManuallyApproveRecognition(
  [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
  // ...
 
  // Create a new instance of a speed violation lawsuite
  var entityId = Guid.NewGuid();
  var lawsuitId = new EntityId(nameof(SpeedViolationLawsuit), entityId.ToString());
 
  // Store data of speed violation in new entity
  await context.CallEntityAsync(lawsuitId, nameof(SpeedViolationLawsuit.SetSpeedViolation), sv);
 
  // ...
}
 
[FunctionName(nameof(GetLawsuite))]
public async Task<IActionResult> GetLawsuite(
  [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "lawsuite/{entityKey}")] HttpRequestMessage req,
  string entityKey,
  [DurableClient] IDurableEntityClient client,
  ILogger log)
{
  // Get state of speed violation lawsuite using its Id (GUID)
  var svl = await client.ReadEntityStateAsync<SpeedViolationLawsuit>(
    new EntityId(nameof(SpeedViolationLawsuit), entityKey));
 
  // Return current state of lawsuite
  return new OkObjectResult(svl);
}
 
[FunctionName(nameof(SetDriver))]
public async Task<IActionResult> SetDriver(
  [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "lawsuite/{entityKey}/setDriver")] HttpRequestMessage req,
  string entityKey,
  [DurableClient] IDurableEntityClient client,
  ILogger log)
{
  var driverName = JsonSerializer.Deserialize<string>(await req.Content.ReadAsStringAsync());
 
  // Send one-way message to lawsuite entity (not processing result)
  await client.SignalEntityAsync(
    new EntityId(nameof(SpeedViolationLawsuit), entityKey),nameof(SpeedViolationLawsuit.StoreDriver), 
    driverName);
 
  return new StatusCodeResult((int)HttpStatusCode.Accepted);
}

Fazit

Durch Durable Functions wurde der Anwendungsbereich von Azure Functions deutlich erweitert. Die bisherigen Einschränkungen bei den Ausführungszeiten fallen weg. Das Task-basierte Programmiermodell und async/await eignen sich gut zur Abbildung von Workflows. Der C#-Code lässt sich angenehm lesen und spiegelt den Inhalt der Workflows gut wider. Auch komplexe Anwendungsfälle wie die parallele Ausführung mehrerer Teilprozesse (aka Fan out/Fan in) oder die Einbindung von Menschen in lange laufende Workflows sind kein Problem.

Bis vor wenigen Monaten war es jedoch schwierig, mit den Durable Functions das Aggregatorpattern umzusetzen. Dabei kommen Daten, die eine logische, adressierbare Entität betreffen, über einen längeren Zeitraum von verschiedenen Quellen und müssen gesammelt werden. Genau für solche Anwendungsfälle eignen sich die neuen Durable Entities.

Neben den funktionalen Erweiterungen von Azure Functions vernachlässigt Microsoft auch die technische Weiterentwicklung nicht: Die aktuelle Azure Functions Runtime erlaubt bereits die Verwendung von .NET Core 3.1. Auch der Betrieb von Functions in Kubernetes wird unterstützt [9].

Man sieht, dass Microsoft die Entwicklung von Azure Functions flott vorantreibt. Die Plattform ist aus meiner Sicht eine interessante Option für alle, die auf lose gekoppelte Microservices und Event-getriebene Cloud-Lösungen setzen.

 

Unsere Redaktion empfiehlt:

Relevante Beiträge

Abonnieren
Benachrichtige mich bei
guest
0 Comments
Inline Feedbacks
View all comments
X
- Gib Deinen Standort ein -
- or -