Erzeuger-Verbraucher-Problem
Das Erzeuger-Verbraucher-Problem (englisch producer–consumer problem, PCP) ist eine klassische, abstrakt formulierte Problemstellung der Prozesssynchronisation, welche eine Regelung der Zugriffsreihenfolge auf eine Datenstruktur durch elementerzeugende (schreibende) und elementverbrauchende (lesende) Prozesse bzw. Threads thematisiert. Die Zugriffsregelung soll verhindern, dass ein verbrauchender Prozess auf die Datenstruktur zugreift, wenn die Datenstruktur keine Elemente enthält und eine Entnahme eines Elements aus der Datenstruktur somit nicht möglich ist. Wenn die Aufnahmekapazität der Datenstruktur beschränkt ist, soll die Zugriffsregelung ferner verhindern, dass ein erzeugender Prozess auf die Datenstruktur zugreift, wenn die Aufnahmekapazität der Datenstruktur bereits ausgeschöpft ist.
Problemformulierung
Betrachtet wird ein System mit Prozessen, die sich entweder als Erzeuger oder als Verbraucher verhalten, und einer Datenstruktur, die von den Prozessen für die Kommunikation untereinander gemeinsam genutzt wird. Es gibt mindestens einen Erzeugerprozess und mindestens einen Verbraucherprozess im System. Erzeugerprozesse erzeugen irgendwelche Elemente und legen sie in der gemeinsamen Datenstruktur ab. Verbraucherprozesse entnehmen der Datenstruktur Elemente und verarbeiten sie. Die Datenstruktur kann unbeschränkt oder beschränkt viele Elemente aufnehmen. Sie kann bei (nahezu) unbeschränkter Kapazität als Liste oder Stapelspeicher und bei beschränkter Kapazität z. B. als Ringpuffer organisiert sein.
Eine Reihenfolgeregelung (Synchronisation) ist dann erforderlich, wenn
- Zugriffe auf die Datenstruktur kritische Abschnitte sind.
Legt ein Erzeugerprozess gerade ein Element in die Datenstruktur oder entfernt ein Verbraucherprozess gerade ein Element, so muss verhindert werden, dass ein anderer Erzeuger- oder Verbraucherprozess diesen Vorgang unterbricht um auch auf die Datenstruktur verändernd zuzugreifen. Andernfalls kann es zu einem inkonsistenten Zustand der Datenstruktur kommen. - ein Verbraucherprozess der Datenstruktur ein Element entnehmen will, obwohl die Datenstruktur keine Elemente enthält.
- die Datenstruktur eine beschränkte Aufnahmekapazität besitzt und ein Erzeugerprozess bei voll belegter Datenstruktur ein Element ablegen will.
Greift ein Verbraucherprozess auf eine leere Datenstruktur zu bzw. ein Erzeugerprozess auf eine voll belegte Datenstruktur, so sollen die Prozesse blockiert und wieder aufgeweckt werden, wenn sich der Zustand der Datenstruktur verändert hat (Prozesskooperation).
Eine Lösung unter Verwendung des Verbraucher-Erzeuger Musters (engl. producer–consumer pattern) ist nur dann sinnvoll, wenn entweder
- eine solche Abstraktions-Schicht systemisch bedingt notwendig ist. Beispielsweise als Sicherheits-Abstraktionsschicht, oder weil ein System-Wechsel (Hardware zu Software) vorliegt.
- die Anzahl von Verbrauchern und Erzeugern unterschiedlich bzw. unbekannt ist.
Der zweite Punkt lässt sich leicht erklären, wenn man beachtet, dass die Zeit die ein Verbraucher für einen Vorgang braucht, sich offensichtlich aus der Zeit des Verbrauchers und des Erzeugers (auf den der Verbraucher zwingend warten muss) für diesen Vorgang, so wie der Zeit für die Kommunikation über das Muster zusammensetzt als:
Existiert nun aber für jeden Verbraucher stets immer genau ein Produzent, so könnte der Verbraucher diesen beinhalten, also die „Produktion“ selbst vornehmen ohne dabei das Muster anzuwenden, da er ansonsten sowieso nur auf den Produzent warten müsste. Die Zeit für diese triviale Lösung beträgt aber nur:
Sie wäre also um geringer, eine Anwendung des Musters in diesem Fall also eine Verlangsamung.
Problemlösung
Abstrakt
Das Erzeuger-Verbraucher-Problem wird mit Mechanismen der Prozesssynchronisation gelöst. In den meisten Lösungsbeschreibungen werden Semaphore verwendet, da diese außer dem wechselseitigen Ausschluss bei der Ausführung kritischer Abschnitte auch die zuvor verlangte Kooperation zwischen Prozessen unterstützen.
Es werden folgende, von allen Prozessen gemeinsam genutzte Semaphore benötigt (aus Gründen der Allgemeingültigkeit werden für die Semaphoroperationen die Originalbezeichner P und V verwendet):
- ein binärer Semaphor mutex zum Schutz modifizierender Zugriffe auf die Datenstruktur.
Will ein Erzeuger- oder Verbraucherprozess P2 die Datenstruktur modifizieren, so zeigt er dies durch eine P-Operation des Semaphors an. Sollte gerade ein anderer Prozess P1 die Datenstruktur modifizieren, so wird der Prozess P2 blockiert, bis der Prozess P1 die V-Operation des Semaphors aufruft. - ein zählender Semaphor sem_read, mit dem für einen Lesezugriff die Verfügbarkeit von Elementen in der Datenstruktur erfasst wird.
Sollte sich kein Element in der Datenstruktur befinden, so bewirkt ein Aufruf der P-Operation des Semaphors die Blockade des aufrufenden Verbraucherprozesses. Ein Deblockieren des Prozesses erfolgt durch einen Erzeugerprozess, der die V-Operation des Semaphors aufruft. Enthält die Datenstruktur Elemente, so darf ein die P-Operation des Semaphors aufrufender Verbraucherprozess mit seinen Aktionen (d. h. mit der Entnahme eines Elements) fortfahren. - ein zählender Semaphor sem_write, mit dem für einen Schreibzugriff die verfügbare Aufnahmekapazität erfasst wird.
Sollte sich kein Ablageplatz in der Datenstruktur finden, so bewirkt ein Aufruf der P-Operation des Semaphors die Blockade des aufrufenden Erzeugerprozesses. Ein Deblockieren des Prozesses erfolgt durch einen Verbraucherprozess, der die V-Operation des Semaphors aufruft. Enthält die Datenstruktur Ablageplätze, so darf ein die P-Operation des Semaphors aufrufender Erzeugerprozess mit seinen Aktionen (d. h. mit der Ablage eines Elements) fortfahren.
// Aufnahmekapazität der Datenstruktur const N=4; // Deklaration der Semaphore semaphor mutex; semaphor sem_read; semaphor sem_write; // Initialisierung der Semaphore init (mutex, 1); // genau einem Prozess wird die Modifikation der Datenstruktur ermöglicht init (sem_read, 0); // kein Element befindet sich in der Datenstruktur init (sem_write, N); // es sind N freie Ablageplätze für Elemente vorhanden
// Erzeugerprozess while (1) { P (sem_write); // Zeige Ablageaktion an P (mutex); // Schütze die Datenstruktur vor Zugriffen anderer Prozesse write (element); // Schreibzugriff auf die Datenstruktur V (mutex); // Hebe den Datenstrukturschutz auf V (sem_read); // Informiere einen evtl. blockierten Verbraucherprozess über die Ablage eines Elements }
// Verbraucherprozess while (1) { P (sem_read); // Zeige Entnahmeaktion an P (mutex); // Schütze die Datenstruktur vor Zugriffen anderer Prozesse read (element); // Lesezugriff auf die Datenstruktur V (mutex); // Hebe den Datenstrukturschutz auf V (sem_write); // Informiere einen evtl. blockierten Erzeugerprozess über die Entnahme eines Elements }
Wenn die Modifikation der Datenstruktur keine kritische Aktion ist, dann kann auf den Semaphor mutex verzichtet werden. Wenn die Datenstruktur von unbeschränkter Kapazität ist, so wird der Semaphor sem_write nicht benötigt.
C++
Ab C++20 sind Semaphore Teil der Sprache. Mit C++ lässt sich die Lösung von Dijkstra sehr direkt wiedergeben. Der Puffer kann N Teile oder Elemente speichern. Das Semaphor "number_of_queuing_portions" zählt die gefüllten Plätze im Puffer, das Semaphor "number_of_empty_positions" zählt die leeren Plätze im Puffer und "buffer_manipulation" dient als Mutex für die Put- und Get-Operationen des Puffers. Wenn der Puffer voll ist, d. h. number_of_empty_positions ist null, wartet der Producer-Thread in der number_of_empty_positions.acquire() Operation. Wenn der Puffer leer ist, d. h. number_of_queuing_portions ist null, wartet der Verbraucher-Thread in der number_of_queuing_portions.acquire() Operation. Die release() Operationen geben die Semaphore frei. Als Nebeneffekt kann sich ein Thread von der wait Queue in die ready Queue bewegen. Die Operation acquire() verringert den Semaphorwert bis auf Null. Die Operation release() erhöht den Semaphorwert. Die Variable buffer_manipulation ist ein Mutex. Die Semaphore Fähigkeit des acquire() in einem Thread und des release() in einem anderen Thread wird hier nicht benötigt. Die Anweisung lock_guard() anstelle eines Paares von lock() und unlock() ist C++ RAII. Der lock_guard-Destruktor sorgt für die Freigabe der Sperre bei exception. Diese Lösung kann mehrere Consumer-Threads und/oder mehrere Producer-Threads verarbeiten.[1]
#include <thread>
#include <mutex>
#include <semaphore>
std::counting_semaphore<N> number_of_queuing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;
void producer() {
for(;;) {
Portion portion = produce_next_portion();
number_of_empty_positions.acquire();
{
std::lock_guard<std::mutex> g(buffer_manipulation);
add_portion_to_buffer(portion);
}
number_of_queuing_portions.release();
}
}
void consumer() {
for(;;) {
number_of_queuing_portions.acquire();
Portion portion;
{
std::lock_guard<std::mutex> g(buffer_manipulation);
portion = take_portion_from_buffer();
}
number_of_empty_positions.release();
process_portion_taken(portion);
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
}
Java
Die Programmiersprache Java stellt bereits vorgefertigte Klassen bereit um dieses Problem Thread-sicher zu lösen. Eine simple Implementation unter Verwendung einer LinkedBlockingQueue[2][3] wäre zum Beispiel in dieser Form möglich:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(3);
Thread producerThread = new Thread(() -> {
try {
for (int value = 1; ; value++) {
Thread.sleep(400);
System.out.println("Erzeugt " + value);
blockingQueue.put(value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
for (;;) {
int value = blockingQueue.take();
System.out.println("\t\tVerbraucht " + value);
Thread.sleep(ThreadLocalRandom.current().nextInt(200, 800));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
Die verwendete Queue sorgt hierbei automatisch für die Synchronisation zwischen Verbraucher und Erzeuger.
Es gibt viele Beispiele einer eigenen Implementierung des Verbraucher-Erzeuger-Musters in Java. Dieses threadsicher zu implementieren ist allerdings nicht trivial, da folgende Probleme behandelt werden müssen:
- Eine Lösung unter Verwendung der Sprachelemente
synchronized
sowiewait()
bzw.notify()
führt genau dann zu einem möglichen Deadlock, wenn ein Produzent mittelsnotify()
einen Verbraucher aufwecken will, dieser allerdings noch nicht wartet (wait()
noch nicht aufgerufen wurde)[4].
Zur Lösung dieses Problems können weitere Klassen wie Locks[5] verwendet werden. - Die LinkedBlockingQueue skaliert als Klasse des Concurrent-Packages deutlich besser mit steigender Anzahl Threads, als dies mit einer simplen
synchronized
Lösung möglich wäre, da, beim Zugriff auf vorhandene Produkte in der Liste, eine Nicht-blockierende Synchronisation verwendet wird[6].
Siehe auch
Einzelnachweise
- Dijkstra; 1965; EWD123 Cooperating sequential processes, Kapitel 4.3. The Bounded Buffer
- http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html
- Ioan Tinca, 2018, The Evolution of the Producer-Consumer Problem in Java
- http://www.mudraservices.com/waitnotify.html Wait/Notify Pitfalls
- http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html
- http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html#package_description Java Concurrent Package