Bartosz Zieliński

Condition Variables, Part I

Introduction

In a previous post about threads we introduced locks from threading module in Python. Locks are a specialized mechanism used to protect a critical section. More general synchronization patterns require the use of other mechanisms such as semaphores, or condition variables discussed here.

Condition variables are a synchronization mechanism used, as the name suggests, to suspend the operation of a thread until a certain condition (usually concerning shared data) is met. A condition variable is always used in conjunction with a lock.

The problem solved by condition variables is as follows: An operation on shared data can only be executed when certain precondition involving the shared data is satisfied. Correct verification of precondition may require exclusive access to shared data, hence precondition like operation itself should be evaluated inside critical section. If the precondition is not met, the thread cannot simply wait for its satisfaction inside the critical section as this would prevent any change to shared data, and the thread would wait in vain, probably deadlocking the system. Instead the thread should exit the critical section, and attempt to enter it again when there is a chance that the precondition is satisfied. Of course this begs the question how the thread should know when to reenter the critical section since in order to observe any change to shared data it should be in the critical section already, and that is where condition variables come in.

We can illustrate this abstract discussion with concreate example: Suppose we have a shared integer variable N with an initial value of 0, and two kinds of threads:

Addition is unconditional. However, subtraction has a precondition which ensures that N never becomes negative. Of course, any access to N should happen only inside a critical section, hence we need a lock:

lock = threading.RLock()

So, how do we implement modifications of N in both kinds of threads? Addition is unproblematic: we simply add 1 to N in a critical section:

lock.acquire()
N = N + 1
lock.release()

It is less clear how to implement subtracting threads. For instance, the following naive attempt practically guarantees deadlock:

lock.acquire()
while N <= 0:
  pass
N = N - 1
lock.release()

The problem is that if N==0 when the above thread enters the critical section then the thread enters a loop which will exit only when another thread increments the value N. However, no other thread will have a chance to increment N (or enter the critical section at all) because the subtracting thread is stuck in the infinite loop inside the critical section. Clearly, the lock should be released on each turn of the loop, e.g.

lock.acquire()
while N <= 0:
  lock.release()
  time.sleep(0.1)
  lock.acquire()
N = N - 1
lock.release()

No deadlock this time, but the above solution is very inefficient. Thread sleeps for 0.1s after releasing the lock, hoping that some adding thread will use this time to enter the critical section and increment N. The problem is that 0.1s is completely arbitrary (too much?, not enough?, optimal? - we have no way of knowing) and it may well happen that even if some other thread increments N, still another thread may decrement it again before our thread wakes again, re-acquires the lock, and re-checks the precondition to find it still false. This, aside from being inefficient may easily lead to starvation. So it would be nice if there was a mechanism which allows a thread which just modified a shared data to notify threads waiting for some condition on the shared data to become true, that it is now a good time to try again. The role of such a mechanism is fulfilled by condition variables. Like locks, this is a general mechanism available in most languages and environments that support multi-threaded and multi-process programming. In Python, they are provided as objects of class Condition by module threading. Other environments provide equivalent APIs.

As remarked above, condition variables are always associated with a lock. A lock object (either Lock or RLock) can be passed to Condition’s constructor. Several condition variables may share the same lock, which is useful if various threads wait for different conditions associated with the same data. If you pass any lock to Condition’s constructor, a new private RLock will be created automatically and associated with the constructed condition variable.

Condition variables accept the following methods:

In addition, condition variables in Python are context managers. If cond is a condition variable then

with cond:
  #some code

calls cond.acquire() before entering the with block and cond.release() after exiting it.

You might wonder why we need both notify_all() and notify(). Sometimes executing some action in critical section invalidates its precondition, and sometimes it does not. In the first case it does not make sense to wake up all the waiting threads since only one of them will get executed. In the second case waking up all the waiting threads is usually the best course of action.

Both wait() and wait_for() methods accept an optional named argument called timeout (a float value is expected). If timeout is given and wait() or wait_for() do not re-acquire locks within timeout seconds then they return False. We will not use timeouts here.

Since Condition objects can pass acquire() and release() to their associated locks, usually it does not make sense to store references to lock objects associated to condition variables, or to use them directly. Using only condition variables simplifies code and makes using the wrong lock less likely.

To illustrate the above, let us now return to our example of threads incrementing and decrementing a shared integer variable. Assume that we have created a lock and condition objects as follows:

lock = RLock()
cond = Condition(lock)

Then the critical section of threads incrementing N by 1 can be implemented as follows:

lock.acquire()
N = N + 1
cond.notify()
lock.release()

This is the same as before, except that after incrementing N we notify one of the threads waiting for N to be bigger than 0. Note that we use notify() method and not notify_all() because after adding just 1 it doesn’t make sense to wake up more than one decrementing thread.

Decrementing threads have to wait using condition variable for N to become greater than 0:

lock.acquire()
while N <= 0: # we wait for N > 0
  cond.wait()
N = N - 1
lock.release()

We can significantly simplify the above code. First, we can get rid of explicit lock object and simply use the condition variable and the lock object automatically created by the constructor:

cond = Condition()

We can also utilize the fact that Condition objects are context managers, as well as use wait_for() instead of explicit while loop:

Threads incrementing N by 1:

with cond:
  N = N + 1
  cond.notify()

Threads conditionally decrementing N by 1:

with cond:
  cond.wait_for(lambda:N>0)
  N = N - 1

To actually see this code in action try the following script:

from threading import Thread, Condition
from time import sleep
from random import random, choice

cond = Condition()
N = 0

def inc(n, i):
    global N 
    with cond:
        print(f'Thread {n}, {i}\'th increment, N = {N}')
        N = N + 1 
        cond.notify()

def dec(n, i):
    global N
    with cond:
        print(f'Thread {n} tries {i}\'th decrement, N = {N}. Waiting ...')
        cond.wait_for(lambda:N>0)
        print(f'Thread {n}, {i}\'th decrement, N = {N}')
        N = N - 1

def get(n):
    f = choice([inc, dec])
    def h():
        for i in range(10):
            sleep(random())
            f(n,i)
    return h

threads = [Thread(target = get(n)) for n in range(10)]

for t in threads:
    t.start()

Function choice() from module random picks at random one element of a list passed as its argument. Above we use it to pick either incrementing or decrementing function. Note the use of a closure in the definition of get(n) which creates the main function of n-th thread.

The above script may occasionally deadlock if all incrementing threads already exited and only decrementing threads are active.

Producer-Consumer problem

We have a shared buffer in memory where you can put items in and retrieve them. Retrieving an item removes it from the buffer. There are two variants of the producer-consumer problem:

We also have two types of threads (or processes):

Of course, for producers and consumers to function properly, some obvious synchronization constraints must be met:

  1. To avoid buffer corruption, we should ensure that only one thread at a time modifies the shared buffer, and therefore buffer operations should be performed in the critical section.
  2. A consumer cannot fetch an item from an empty buffer. Instead, it should wait until some producer puts a new item in the buffer.
  3. When the buffer is bounded, a producer cannot store a new item in the full buffer. Instead, it should wait for some consumer to fetch some item from the buffer to free up the space.

Locks can be used to enforce the first constraint above. The other two synchronization constraints require the use of condition variables.

In the next subsection, we present a solution to the producer-consumer problem with an unbounded buffer (where we only have the first two constraints). Next, we will solve producer-consumer problem with bounded buffer (where we need to force all three constraints).

Producer-Consumer with unbounded buffer

First, we need to decide how to implement the shared buffer. FIFO queue is the usual choice. Python provides efficient FIFOs in the form of objects of the Queue class from the queue module. Unfortunately, Queue is already adapted to multi-threaded environment. In particular, Queue implements synchronization constraints described above, so it is not suitable for teaching implementation of those constraints.

Instead, we will use ordinary lists encapsulated within an object of a class that provides a queue interface. In Python, if x is a list, x.append(y) adds y to the end of the list, and x.pop(i) removes the item at position i from the list (and returns the value of that item). The encapsulating class (without synchronization) looks like this:

class MyQueue:
    def __init__(self):
        self._buf = []
    def enqueue(self, x):
        self._buf.append(x)
    def dequeue(self):
        return self._buf.pop(0)
    def __str__(self):
      return f'Queue<{self._buf}>'

Thus, the objects of this class store a list in a private field _buf. The list is initially empty. Calling enqueue(x) on a buffer adds x at the end of _buf. Calling dequeue() removes the first element and returns it. Note that this is a very inefficient implementation: unlike in many other languages, lists in Python are not linked structures but ordinary arrays instead. Thus, while referring to a list element by index can be executed in a (very small) constant time, any operation which changes list size will carry a much larger time penalty, sometimes requiring a complete copy of the array. It is not that bad all the time as, e.g., CPython will use overallocation to avoid frequent resizing, and dynamic arrays can sometimes be resized using realloc() without a full copy, but it is still worse than a linked structure.

The class above contains an additional method __str__(). This is the method which various output functions in Python (e.g., print(), interpolated strings, etc.) call on objects automatically to convert them to human-readable strings. In this case we display the contents of the underlying list _buf surrounded by Queue<...>.

Below, there is a full script testing the above implementation of MyQueue:

from threading import Thread
from time import sleep
from random import random

class MyQueue:
    def __init__(self):
        self._buf = []
    def enqueue(self, x):
        self._buf.append(x)
    def dequeue(self):
        return self._buf.pop(0)
    def __str__(self):
      return f'Queue<{self._buf}>'

class Producer(Thread):
    def __init__(self, queue, n):
        Thread.__init__(self)
        self.queue = queue
        self.n = n
    def run(self):
        for i in range(10):
            sleep(random())
            print(f'Producer {self.n}, iteration {i}, {self.queue}')
            self.queue.enqueue((self.n, i))

class Consumer(Thread):
    def __init__(self, queue, n):
        Thread.__init__(self)
        self.queue = queue
        self.n = n
    def run(self):
        for i in range(10):
            sleep(random())
            msg = self.queue.dequeue()
            print(f'Consumer {self.n}, iteration {i}, msg={msg}, {self.queue}')

queue = MyQueue()
threads = [Producer(queue, n) for n in range(10)]
threads += [Consumer(queue, n) for n in range(10)]

for t in threads:
    t.start()

Later we will reuse the same script, just modifying MyQueue class. The script creates 10 consumer and producer threads. Each producer thread adds to the queue a pair (self.n, i) consisting of a thread identifier (self.n) and an iteration index (i). In the line

self.queue.enqueue((self.n, i))

note an additional pair of parentheses: We are passing a single argument to enqueue() which is a pair. On the other hand,

self.queue.enqueue(self.n, i)

would have called enqueue() with two arguments, which would be wrong.

Each consumer thread consumes a single element from the buffer 10 times. Both consumer and producer threads display appropriate diangnostic messages (including state of the underlying message list). Also, both producers and consumers introduce artificial sleeping to force some interleaving of operations. Unfortunately, since consumer threads do not check if the queue is non-empty before executing dequeue() (and neither does the method itself before executing pop), the script above will almost always raise an exception. Clearly, we need some synchronization.

Obviously, we need a lock to protect operations on a buffer as a critical section. In actuality, both pop() and append() are thread-safe (or so I understand), but we have to use locks in order to use condition variables anyway.

Secondly, we need a condition variable associated with the aforementioned lock for consumers to wait until the queue becomes non-empty. A producer should notify at most a single consumer about added element (it makes no sense to notify more consumers, as the first one will consume this single added element). Assume that we have a condition variable cond. Then a general scheme for solving the producer-consumer problem with unbounded buffer is as follows:

Producer code:

# Produce x

with cond:
  # Put x in the shared buffer 
  cond.notify()

Consumer code:

with cond:
  cond.wait_for(lambda: Shared_Buffer_Is_Not_Empty)
  # Take the next element from the shared buffer 
  # and assign it to x

# consume x

Thus, we can add synchronization to our queue class as follows:

class MyQueue:
    def __init__(self):
        self._buf = []
        self.cond = Condition()
    def enqueue(self, x):
        with self.cond:
            self._buf.append(x)
            self.cond.notify()
    def dequeue(self):
        with self.cond:
            self.cond.wait_for(lambda:self._buf)
            return self._buf.pop(0)
    def __str__(self):
      return f'Queue<{self._buf}>'

Observe the line:

self.cond.wait_for(lambda:self._buf)

To understand what happens here, note that in Python, if you use a list where boolean value is expected then an empty list is treated as False, and non-empty list is treated as True. Thus, instead of using explicit conditions, like self._buf != [], we simply make the list speak for itself.

Producer-Consumer with bounded buffer

First let us present a general solution of producer-consumer problem with bounded buffer, and then we will describe bounded buffer implementation itself.

Because operations on shared buffer should be performed inside critical section we need a lock. Also, there are two separate preconditions:

This suggests using two separate condition variables (full for the first precondition, empty for the second). However, since we are working on the same shared data, both condition variables should share a lock:

lock = RLock()
full = Condition(lock)
empty = Condition(lock)

Then each producer and consumer should wait for the respective precondition to be satisfied, and upon completion, producer should notify at most one consumer that there is one more element in the buffer, and consumer should notify at most one producer that there is one more free place in the buffer. There is no point in informing more than one consumer waiting at the empty buffer about a single element added, and, similarly, there is no point in informing more than one producer waiting at a full buffer that there is one more free place. This leads to the following solution:

Producer code:

# produce x
with full:
  full.wait_for(lambda:Buffer_Is_Not_Full)
  # add x to buffer
  empty.notify()

Consumer code:

with empty:
  empty.wait_for(lambda:Buffer_Is_Not_Empty)
  # remove next element from 
  # the buffer and store it in x
  full.notify()
# consume x

Note that in both cases we are protecting critical section with the same lock (shared by both condition variables).

Now let us implement the shared buffer itself.

In this case we will implement our buffer as a cyclic buffer: We have a fixed size table _buf (in this case python list), and two indexes rpos and wpos which chase one another around the list (i.e., they are incremented modulo _buf’s size):

0 w 1 p o s c o 2 u n t = 3 4 , s 4 i z e r = 5 p 8 o s 6 7

We add new element at wpos and remove an existing element at rpos. Removed element is not physically deleted, but it can be overwriten by a new elements. Removing increments rpos by 1 modulo size, where size is a size of _buf. Adding increments wpos by 1 modulo size. Thus, wpos and rpos seem to go around the table. We should take care not to overwrite unread element or to read unwritten or already read element, and so indexes should never pass one another. Sometimes, however, they can overlap. Instead of taking into account all the possible cases, it is simpler just to keep track of the number count of elements stored in the queue, and increment or decrement count by 1 after each addition and removal, respectively.

This leads to the following implementation of our queueing class:

class MyQueue:
    def __init__(self,size):
        self.size = size
        self._buf = [None] * size
        self.wpos = 0
        self.rpos = 0
        self.count = 0
        lock = RLock()
        self.empty = Condition(lock)
        self.full = Condition(lock)
    def enqueue(self, x):
        with self.full:
            self.full.wait_for(lambda:self.count<self.size)
            self._buf[self.wpos] = x 
            self.count += 1
            self.wpos = (self.wpos + 1) % self.size
            self.empty.notify() 
    def dequeue(self):
        with self.empty:
            self.empty.wait_for(lambda:self.count>0)
            el = self._buf[self.rpos] 
            self.count -= 1
            self.rpos = (self.rpos + 1) % self.size
            self.full.notify() 
        return el   
    def __str__(self):
      return f'Queue<{self._buf}, count={self.count}, rpos={self.rpos}, wpos={self.wpos}>'