Delphis TThread-Klasse und die OmniThreadLibrary

Mutlithreading mit Delphi
Kommentare

Delphi bietet verschiedene Möglichkeiten, multithreaded zu programmieren. Der Artikel zeigt eine Einführung in die Delphi-eigene TThread-Klasse und die OmniThread-Bibliothek.

Seit einigen Jahren ist bei den CPUs der großen Hersteller ein Trend zu erkennen. Die Prozessoren werden derzeit nicht oder nur wenig schneller, was die Taktung der CPU betrifft. Einfacher ist es, neue Techniken wie Intels Hyperthreading oder mehrere Prozessorkerne in einer CPU einzubauen. Um die neuen Möglichkeiten bzw. die Mehrleistung der Prozessoren nutzen zu können, muss bei der Softwareentwicklung mit Threads und Prozessen gearbeitet werden. Anwendungen, die mit Delphi geschrieben sind, laufen erst einmal nur in einen Thread. Dieser Thread ist der Haupt-Thread von der VCL und kümmert sich primär um das Verarbeiten der Windows-Botschaften bzw. um die Programmteile, die über die Formulare aufgerufen werden. Das hat jedoch zur Folge, dass nur ein Prozessorkern genutzt werden kann und die Anwendungen bei Methoden mit langer Laufzeit die Formulare nicht mehr neu zeichnen können.

Einen eigenen Thread mithilfe der Delphi-Klasse TThread zu erstellen, ist einfach. Für den eigenen Thread wird eine eigene Klasse erstellt, die von TThread abgeleitet ist. Nun braucht nur noch die Klassenmethode Execute überschrieben zu werden. In dieser Methode wird der Programmteil abgelegt, der in einen eigenen Thread laufen soll. Beim Instanziieren der Klasse kann bestimmt werden, ob der Thread sofort starten soll. Wird dem Constructor nichts oder false übergeben, startet der Thread sofort. Wird stattdessen true übergeben, wird der Thread als suspendiert erzeugt. Dadurch können weitere Einstellungen vorgenommen werden. Eine wichtige Einstellung wird in der Eigenschaft FreeOnTerminate vorgenommen. Wird diese auf true gesetzt, dann wird der Speicher des Thread-Objekts freigegeben, nachdem die Ausführung der Execute-Methode abgeschlossen ist. Über die Eigenschaft Priority kann die Thread-Priorität gesetzt werden. Um einen Thread zu beenden, wird das Ereignis OnTerminate zur Verfügung gestellt. Damit der Thread gestartet wird, wenn dieser angehalten erzeugt wurde, muss nur noch die Methode Start bzw. bei älteren Delphi-Versionen die Methode Resume aufgerufen werden.

Es ist wichtig zu wissen, dass die VCL von Delphi nicht threadsafe ist. Folglich kann es zu Problemen kommen, wenn mehrere Threads gleichzeitig mit Komponenten arbeiten und deren Werte verändern. Ein kleines Beispiel an dieser Stelle, um den Begriff „threadsafe“ zu erklären: Es soll zwei Threads geben, die eine Integer-Variable lesen. Anschließend wird der Integer-Wert erhöht und dann wieder zurück in die Variable geschrieben. Nun kann es passieren, dass beide Threads fast zeitgleich die Variable gelesen haben, aber noch nicht erhöht und wieder abgelegt haben. Nun schreiben beide Threads die Variable zurück. Das Ergebnis ist, dass die Variable nur einmal hochgezählt wurde und nicht wie erwartet, dass zwei Threads natürlich die Variable zweimal hochzählen. Das konnte nur passieren, weil die Threads die Variable nicht exklusiv für sich nutzen konnten. Um solche Probleme in den Griff zu bekommen, müssen Thread-Synchronisierungstechniken eingesetzt werden. Events und Mutexes werden oft zur Synchronisierung genutzt. Delphi bietet u. a. die Klasse TMonitor dafür an.

Synchronisierungen haben allerdings auch entscheidende Nachteile. Beim Warten auf einen anderen Thread oder auf Ressourcen kann es zu so genannten Deadlocks kommen. Ein Deadlock ist eine Art Endlosschleife, bei der mehrere Prozesse gegenseitig auf sich warten. Ebenfalls kann es passieren, dass Threads nicht parallel funktionieren, da diese sich eine Ressource abwechselnd teilen müssen. Synchronisierungen verlangsamen auch den Programmablauf. So kann es passieren, dass manche Programmteile effizienter ohne Threads arbeiten.

Soll nun von dem eigenen Thread an die VCL zum Beispiel der Fortschritt gemeldet werden, muss diese Meldung synchronisiert an die VCL bzw. an deren Thread übergeben werden. Dieses ist mit Delphi selbstverständlich auch kein Problem. Die TThread-Klasse besitzt die Methode Synchronize, um diese Synchronisierung vorzunehmen. Es gibt zwei Möglichkeiten, die Methode einzusetzen. Die erste Variante ist, eine Methode in die eigene Thread-Klasse zu implementieren, die zweite ist, eine anonyme Methode zu nutzen. Beide Methoden dürfen keine Parameter haben und werden als Parameter der Synchronize-Methode übergeben. Ein Beispiel ist in dem Listing 1 zu sehen. Die TThread-Klasse besitzt eine statische Methode mit dem Namen CreateAnonymousThread. Diese Methode erwartet als Parameter eine anonyme Methode, die die Tätigkeiten von der Execute-Methode übernimmt. Als Ergebnis wird ein TThread-Objekt zurückgegeben. Die anonymen Methoden lassen sich nur in den neuen Delphi-Versionen ab Delphi 2009 nutzen.

type
  TMeinThread = class(TThread)
  private
    fStatus: integer;
    fThreadNr: integer;
    procedure StatusAktuallisieren;
  public
    procedure Execute; override;
    property Nr: integer write fThreadNr;
  end;
procedure TMeinThread.Execute;
var
  i: integer;
begin
  for i:=1 to 50 do
  begin
    fStatus:=i;
    Synchronize(StatusAktuallisieren);
  end;
end;
procedure TMeinThread.StatusAktuallisieren;
begin
  Form1.memo1.lines.Add( format('Thread %d Aufruf: %d', [fThreadNr, fStatus] ));
end;

OmniThreadLibrary

Als Alternative zu den in Delphi integrierten Multithreading-Hilfen, hat Primoz Gabrijelcic im Jahr 2008 mit der Bibliothek OmniThreadLibrary begonnen. Seit Dezember 2010 ist die Version 2.0 verfügbar. An der Bibliothek wird auch selbstverständlich weiter gearbeitet und ständig verbessert. Die OmniThreadLibrary kann mit Delphi 2009 oder höher genutzt werden. Diese Beschränkung ist erforderlich, da viel mit anonymen Methoden gearbeitet wird. Dadurch ist aber auch der Umgang erheblich einfacher. Die OmniThreadLibrary nutzt ein Fluent-Interface [4], das sogar Martin Fowler in seinen Büchern und Webseiten beschrieben hat. Ein Fluent-Interface erlaubt es, Methoden zu verketten (Method Chaining). Das ermöglicht einfachen Zugriff und Konfiguration von Klassen, Methoden und Eigenschaften ohne viel Programmierung. Für das Arbeiten mit Threads wurden in die Bibliothek einige Ideen aus den Parallel Extensions integriert. Viele Funktionen für die Synchronisierung von Daten und Threads sind ebenfalls enthalten.

Beginnen wir mit der Parallel.Async-Methode. Die Klasse Parallel ist in der Unit OtlParallel definiert. Die Methode Async ist die einfachste Art, eine Funktion in einen Thread auszulagern. Als Parameter wird eine anonyme Funktion übergeben, die die Programmteile enthält, die in einen separaten Task ausgeführt werden sollen. Die Methode hat noch weitere Implementierungen. Als zweiter Parameter kann eine anonyme Funktion angegeben werden, die ausgeführt wird, wenn der Task beendet ist. Als letzter Parameter kann eine IOmniTaskConfig-Schnittstelle übergeben werden. Über diese kann der Task noch weiter konfiguriert werden. Die zur Verfügung stehenden Implementierungen sind in Listing 2 dargestellt.

class procedure Async(task: TProc; taskConfig: IOmniTaskConfig = nil);
overload;
class procedure Async(task: TOmniTaskDelegate;
taskConfig: IOmniTaskConfig = nil);
overload;
class procedure Async(task: TProc; onTermination: TProc;
taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TOmniTaskDelegate; onTermination: TProc;
taskConfig: IOmniTaskConfig = nil); overload;

Mit der Parallel.ForEach-Methode lässt sich, wie mit der Delphi-eigenen ForEach-Funktion über Zahlen, Arrays oder Listen, die einen Enumerator haben, iterieren. Die OmniThreadLibrary-Methode hat allerdings den Vorteil, dass die Anzahl der verfügbaren CPU-Kerne ermittelt und die Datenmenge anschließend auf die Kerne gleichmäßig verteilt wird. Als Parameter wird die Art der Iteration angegeben. Diese können Bereiche oder Enumerators aus den Delphi-Objekten sein. Letztere haben allerdings kleine Einschränkungen. Es muss mit Delphi 2010 gearbeitet werden und die Enumerators müssen in einem Objekt implementiert sein. Record-basierende Enumerators werden nicht unterstützt. Das liegt daran, dass über die RTTI einige Daten abgefragt werden. Über das Fluent-Interface kann anschließend in der Execute-Methode die Programmfunktion implementiert werden. Die Methode ForEach ist eine blockierende Methode. Mit anderen Worten, die Methode unterbricht den Programmfluss, bis alle Threads und Iterationen abgeschlossen sind. Schauen wir uns das im folgenden Beispiel in Listing 3 an.

  Parallel.ForEach(1, 1000)
  .Execute(
    procedure(const i: integer)
  begin
      // implementation hier
    End
  );

Das Verhalten mit dem Blockieren kann auch beeinflusst werden. Vor der Execute-Methode muss die Methode NoWait aufgerufen werden. Die Iteration wird übrigens nicht unbedingt in der erwarteten Reihenfolge ausgeführt, da die Datenmenge gleichmäßig auf die einzelnen Threads aufgeteilt wird. Je nachdem ob ein Thread schneller oder langsamer ist, bekommt dieser erneut Daten zugewiesen. Um sicher zu stellen, dass die Daten in der richtigen Reihenfolge abgearbeitet werden, gib es die Methode PreserveOrder.

Koordination

Die Bibliothek bringt neben den Thread-Funktionen auch Funktionen mit, die es erlauben, mehrere Threads untereinander zu synchronisieren und zu koordinieren. In der Unit OtlSync ist der Record TOmniCS definiert. Das CS steht für Critical Section. Der Record wird genutzt, um Programmteile nicht durch den Task Scheduler von Windows unterbrechen zu lassen. Dazu sind im Record zwei Methoden hinterlegt. Als Erstes gibt es die Methode Acquire, die den Eintritt in die kritische Sektion bewirkt. Ist diese Methode aufgerufen, kann kein weiterer Thread diese Methode verlassen, wenn diese aufgerufen wird. Die Threads warten, bis der erste Thread die Methode Release aufgerufen hat. Es sollte an dieser Stelle unbedingt mit try und finally gearbeitet werden, damit bei einem Fehler die Sektion wieder freigegeben wird. Ist die Methode Release aufgerufen, dann bekommt der nächste wartende Thread die Sektion zugewiesen. Weitere Threads bleiben nach wie vor in einer Warteposition.

Eine weitere Möglichkeit, Daten von einem Thread in einen anderen Thread zu bekommen, besteht mit TOmniBlockingCollection. Dazu wird eine Instanz von der Klasse erzeugt, die mit der Into-Methode der ForEach-Schleife übergeben wird. Die gerade beschriebene Übergabe der Collection ist in Listing 4 zu sehen. Die Execute-Methode hat einen weiteren Parameter erhalten. Dieser Parameter ist von Typ TOmniValue. Dieses entspricht ungefähr dem Delphi-eigenen Typ Variant. Der Unterschied ist, dass TOmniValue schlanker und dadurch schneller ist. Dieser neue Parameter nimmt das Ergebnis der Thread-Methode entgegen. Anschließend wird diese dann in die Collection eingefügt. Im Programmcode wird nur das Interface der Collection genutzt. Das hat den Vorteil, dass die Collection nicht manuell wieder freigegeben werden muss. Am Ende von Listing 4 ist ein normales ForEach zu sehen. An dieser Stelle geschieht eine Besonderheit: Die TOmniBlockingCollection blockiert so lange die ForEach-Schleife, bis alle Threads ihre Arbeit fertig abgeschlossen haben. Im vorgestellten Beispiel blockiert die Schleife nur, weil in Parallel.ForEach die Methode NoWait aufgerufen wurde. Die Collections werden auch massiv bei den OmniThread-Pipelines eingesetzt. Pipelines sind verkettete Threads, die sich gegenseitig die Daten über Collections liefern. Dazu gibt es umfangreiche Demos im OmniThreadLibrary-Quelltext.

var
  collection: iOmniBlockingCollection;

  collection:=TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000)
  .NoWait
  .Into(collection)
  .Execute(
    procedure(const i: integer; var result: TOmniValue)
    begin
      result:=Berechnung(i);
    end
  );

  for value in col do
    ErgebnisVerwenden(value);

Eine weitere angenehme Funktion der OmniThreadLibrary ist es, dass es viele unterstützende Funktionen zur Thread-Kommunikation gibt. In Listing 5 ist die Nutzung von Messages zu sehen. Dazu wird einfach die Methode OnMessage aufgerufen. Als Parameter wird der Nachrichtenempfänger angegeben. In diesem Beispiel ist es das Formular der Anwendung. Im Formular wird anschließend eine Nachrichtenprozedur implementiert. Die Execute-Methode erhält nun auch wieder andere Parameter. Dieses Mal wird eine Schnittstelle zum Task mit übergeben. Über die Task-Methode task.Comm.Send können Nachrichten an das Formular geschickt werden. Als Parameter werden die Nachricht und die Daten angegeben, die über eine Queue verschickt werden. Die Queue wird per Default auf 1000 Elemente begrenzt, um nicht zu viel Speicher zu verbrauchen. Allgemein sollte sowieso darauf geachtet werden, dass nicht zu viele Threads erzeugt werden und die Threads lieber größere Datenmengen verarbeiten. Dadurch wird viel Zeit zum Instanziieren der Threads gespart und die Kommunikation reduziert. Als Summe erhält man eine schnellere Anwendung. Im Beispielprogramm auf der Heft-CD kann dies auch nachvollzogen werden. Das Programm, das auf dem Knopf „Message“ ausgeführt wird, läuft viel schneller als die anderen Implementierungen.

procedure TForm1.WMPaintMandel(var msg: TOmniMessage); message wm_paintMandel;
begin
  // msg.MsgData enthält nun die Daten
end;

  Parallel.ForEach(1, 1000)
  .OnMessage(Form1)
  .Execute(
    procedure(const Task: IOmniTask; const i: Integer)
    var
      value: TOmniValue;
    begin
      // Daten berechnen und in Value speichern
      task.Comm.Send(wm_paintMandel, value);
    end
  );

Als Letztes schauen wir uns die so genannten Futures an. Ein Future ist ein Programmteil, der asynchron ausgeführt wird. Der Rückgabewert kann später einfach ausgelesen werden. Wird auf diesen Rückgabewert zugegriffen, bevor der Thread abgeschlossen ist, wird das Auslesen des Werts blockiert, bis der Thread fertig ist. Genutzt wird diese, um z. B. Datensatzprüfungen in den Hintergrund zu schicken, wenn diese langsame Web Services abfragen. Angenommen, es soll ein Kundendatensatz mit einer Bankverbindung und Adressdaten vor dem Speichern geprüft werden, könnte über zwei Futures jeweils die Adresse und die Bankverbindung geprüft werden. Dabei werden die beiden Prüfungen parallelisiert. Das Speichern wird dann über ein If aufgerufen, das als Bedingung beide Rückgabewerte überprüft. Das bewirkt ein Warten auf beide Threads. Ein Beispiel dazu ist in Listing 6 zu sehen.

var
  fuBank, fuAdresse: IOmniFuture<Boolean>;
begin
  fuBank:=TOmniFuture<Boolean>.Create(
    function: Boolean begin
      result:=istBankVerbindungGueltig();
    end
  );
  fuAdresse:=TOmniFuture<Boolean>.Create(
    function: Boolean begin
      result:=istAdresseGueltig();
    end
  );

  // wartet bis beide fertig sind
  if fuBank.Value and fuAdresse.Value then
    MessageDlg('Speichern!', mtInformation, [mbok], 0);

Der Umgang scheint auf den ersten Blick sehr leicht. Die OmniThreadLibrary hat allerdings auch Nachteile, z. B. eine unvollständige Dokumentation. Es werden zwar über 50 Demos mitgeliefert, die die einzelnen Funktionen zeigen. Doch das Übertragen der Beispiele auf die eigenen benötigten Abläufe ist oftmals schwierig. Einige weitere Beispiele, Präsentationen und Videos zu der Bibliothek lassen sich im Blog von Primoz Gabrijelcic [4] finden.

Fazit

Multithreaded zu programmieren, ist ein schwieriges Unterfangen, da sehr viele unerwartete Dinge passieren können. Es muss ständig darauf geachtet werden, dass keine Deadlocks auftreten und dass alle Programmteile threadsafe sind. Das parallele Denken muss erst geübt werden und fällt unerfahrenen Entwicklern am Anfang schwer. Das Debugging von Multithreaded-Anwendungen ist schwierig, da diese sich im Debugger oft anders verhalten. Daher sollte immer stark abgewogen werden, ob überhaupt multithreaded programmiert werden muss. Bei den meisten Anwendungen ist diese schlichtweg nicht erforderlich. Die Werkzeuge selbst, die Delphi mitbringt, sind leicht zu nutzen. Die TThread-Klasse bringt das Wichtigste mit, um multithreaded zu programmieren. Wer erweiterte Funktionen benötigt, sollte auf jeden Fall zu der OmniThreadLibrary greifen.

Unsere Redaktion empfiehlt:

Relevante Beiträge

Meinungen zu diesem Beitrag

X
- Gib Deinen Standort ein -
- or -