Dojos für Entwickler 2. Stefan Lieser

Чтение книги онлайн.

Читать онлайн книгу Dojos für Entwickler 2 - Stefan Lieser страница 12

Автор:
Серия:
Издательство:
Dojos für Entwickler 2 - Stefan Lieser

Скачать книгу

... und nach der Parallelisierung.

      Listing 10

      Stichwörter ermitteln, parallel.

      public class Eindeutige_Stichwörter_ermitteln_parallel { private readonly Action<Tuple<string, string>> process; public Eindeutige_Stichwörter_ ermitteln_parallel() { var dateinamen_suchen = new Dateinamen_suchen(); var alle_Stichwörter_ermitteln1 = new Alle_Stichwörter_ermitteln(); var alle_Stichwörter_ermitteln2 = new Alle_Stichwörter_ermitteln(); var eindeutige_Stichwörter_filtern = new Eindeutige_Stichwörter_filtern(); var scatter = new Scatter<string>(); var gather = new Gather<string>(); dateinamen_suchen.Result += scatter.Process; scatter.Output1 += alle_Stichwörter_ ermitteln1.Process; scatter.Output2 += alle_Stichwörter_ ermitteln2.Process; alle_Stichwörter_ermitteln1.Result += gather.Input1; alle_Stichwörter_ermitteln2.Result += gather.Input2; gather.Result += eindeutige_Stichwörter_ filtern.Process; eindeutige_Stichwörter_filtern.Result += x => Result(x); process = path_und_SearchPattern => dateinamen_suchen.Process( path_und_SearchPattern); } public void Process(Tuple<string, string> path_und_SearchPattern) { process(path_und_SearchPattern); } public event Action<IEnumerable< string>> Result; }

      Zu beachten ist, dass der Flow, der durch die Platine realisiert wird, durch die Verwendung von Scatter und Gather asynchron abläuft. Nach Aufruf der Process-Methode der Platine kehrt der Kontrollfluss zum Aufrufer zurück, während der Flow der Platine asynchron auf weiteren Threads noch läuft. Auch der Result- Event der Platine wird asynchron auf einem anderen Thread ausgelöst.

      Die Parallelisierung der Stichwortsuche zeigt deutliche Geschwindigkeitsvorteile. Das ist natürlich nicht in jedem Fall so. Ein triviales Beispiel, bei dem einfach nur Zahlen multipliziert werden, wird durch die Parallelisierung nicht beschleunigt. Im Gegenteil! Durch den Overhead des Multithreadings wird der Durchsatz sogar verringert.

      Realisierung

      Die Grundidee für den Scatter-Baustein besteht darin, ein und denselben Enumerator in den beiden Output-Events zu verwenden. Und natürlich werden die beiden Output-Events auf je einem eigenen Thread ausgelöst.

      Normalerweise wird beim Iterieren mit dem foreach-Sprachkonstrukt gearbeitet:

      var ints = new[] { 1, 2, 3, 4 }; foreach(var i in ints) {...}

      Hinter den Kulissen wird dies vom C#- Compiler in Aufrufe übersetzt, die in IEnumerable und IEnumerator definiert sind:

      var enumerator = ints.GetEnumerator(); while(enumerator.MoveNext()) { var i = enumerator.Current; // Mache etwas mit i }

      Wichtig ist hier festzuhalten, dass für die Schleife mit GetEnumerator eine neue Instanz eines Enumerators erzeugt wird. Jeder Aufruf von GetEnumerator liefert eine neue Instanz eines Enumerators! Insofern wäre es beim Scatter-Baustein schwierig, in jedem der beiden Ausgangsevents mit foreach über die Eingangsdaten zu iterieren, weil dann zwei Enumeratoren im Spiel wären. Beide würden jeweils von vorn nach hinten über die Eingangsdaten iterieren. Jedes Element der Eingangsdaten soll aber nur genau einmal an einem der beiden Ausgänge anstehen. Das wird erreicht, indem beide Ausgänge dieselbe Instanz des Enumerators verwenden, siehe Listing 11.

      Listing 11

      Der Scatter-Baustein.

      public class Scatter<T> { public void Process(IEnumerable<T> input) { var enumerator = input.GetEnumerator(); var thread1 = new Thread(() => Output1( GenerateOutput(enumerator))); var thread2 = new Thread(() => Output2( GenerateOutput(enumerator))); thread1.Start(); thread2.Start(); } private IEnumerable<T> GenerateOutput( IEnumerator<T> enumerator) { while (enumerator.MoveNext()) { yield return enumerator.Current; } } public event Action<IEnumerable<T>> Output1; public event Action<IEnumerable<T>> Output2; }

      Der Enumerator wird einmal mit Get- Enumerator instanziert und dann beide Male an die Methode GenerateOutput übergeben. Diese iteriert mithilfe des Enumerators über die Eingangsdaten und liefert sie mit yield return als neue Aufzählung zurück. Auf diese Weise werden die einzelnen Elemente der Eingangsdaten nur jeweils einmal an einen der beiden Ausgänge verteilt.

      Listing 12 zeigt die Tests für den Scatter- Baustein.

      Listing 12

      Scatter testen.

      [TestFixture] public class ScatterTests { private Scatter<int> sut; private IEnumerable<int> result1; private IEnumerable<int> result2; [SetUp] public void Setup() { sut = new Scatter<int>(); sut.Output1 += x => result1 = x; sut.Output2 += x => result2 = x; } [Test] public void Jedes_Element_steht_genau_ einmal_an_einem_der_Ausgänge() { sut.Process(new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); Assert.That(result1.Union(result2), Is.EquivalentTo(new[] {1,2,3,4,5,6,7,8,9,10})); } [Test] public void Die_beiden_Ausgänge_laufen_ nicht_auf_dem_Haupthread() { var mainThread = Thread.CurrentThread; sut.Output1 += _ => Assert.That(Thread.CurrentThread, Is.Not.SameAs(mainThread)); sut.Output2 += _ => Assert.That(Thread.CurrentThread, Is.Not.SameAs(mainThread)); sut.Process(new[]{1}); } [Test] public void Die_beiden_Ausgänge_laufen_ auf_unterschiedlichen_Threads() { Thread thread1 = null; Thread thread2 = null; sut.Output1 += _ => thread1 = Thread.CurrentThread; sut.Output2 += _ => thread2 = Thread.CurrentThread; sut.Process(new[]{1}); Assert.That(thread1, Is.Not.SameAs(thread2)); } }

      Der erste Test überprüft, ob die beiden Aufzählungen, die an den Ausgängen des Scatter-Bausteins gebildet werden, zusammengenommen der Aufzählung des Eingangs entsprechen. Da es durch die Ausführung auf mehreren Threads zu Veränderungen der Reihenfolge kommen kann, erfolgt die überprüfung mit Is.EquivalentTo. Die beiden anderen Tests prüfen, ob die Ausgänge tatsächlich auf einem anderen als dem Mainthread ablaufen und ob wirklich beide Ausgänge einen eigenen Thread erhalten.

      Gather

      Aufgabe des Gather-Bausteins ist es, die Aufzählungen, die an den beiden Eingängen anstehen, zu einer Aufzählung für den Ausgang zusammenzufassen. Dabei wird der Ausgang auf einem eigenen Thread ausgeführt, damit der Baustein nicht blockiert. Bei der Implementation habe ich zunächst auf eine ConcurrentQueue und einen AutoResetEvent gesetzt. Doch dabei kam es sporadisch dazu, dass der Baustein blockierte. Die Synchronisierung der beiden Eingänge ist nicht so trivial, wie es zunächst den Anschein haben mag. Doch das .NET Framework hält eine weitere Datenstruktur bereit, die in diesem Fall die Lösung bedeutete: eine BlockingCollection. Die bewerkstelligt genau, was ich gebraucht habe: Auf der Eingangsseite sollen Daten in die BlockingCollection ergänzt werden können. Mit der Add-Methode ist das kein Problem. Und da diese Datenstruktur threadsicher ist, können auch beide Eingänge asynchron Einträge ergänzen, ohne dass es zu Problemen kommt.

      Auf der Ausgangsseite liegt die Herausforderung. Hier können natürlich nur Elemente entnommen werden, wenn welche vorhanden sind. Wenn kein Eintrag vorhanden ist, muss der Thread, auf dem der Ausgangsevent des Gather-Bausteins läuft, angehalten werden. Erst wenn auf der Eingangsseite wieder ein Element ergänzt wird, soll der Thread weiterlaufen. Thread.Sleep kommt nicht infrage, so viel dürfte inzwischen klar sein. Ursprünglich hatte ich hier den AutoResetEvent verwendet, um auf neue Daten zu warten. Doch die BlockingCollection macht genau, was ich brauche: Sie blockiert, wenn keine Daten anliegen. Man kann sie daher gefahrlos iterieren. Im MoveNext des zugrunde liegenden Enumerators wird der Thread so lange angehalten, bis Daten anliegen. Damit sieht

Скачать книгу