Condition Variables, Part I
Introduction
In a previous post about threads, we introduced locks from the threading module in Python.
Locks are a specialized mechanism used to protect a critical section. More general synchronization patterns require other mechanisms, such as semaphores or the condition variables discussed here.
Condition variables are a synchronization mechanism used, as the name suggests, to suspend the operation of a thread until a certain condition (usually concerning shared data) is met. A condition variable is always used in conjunction with a lock.
The problem solved by condition variables is as follows: an operation on shared data may only be executed when a certain precondition involving the shared data is satisfied. Correct verification of the precondition may require exclusive access to the shared data, so the precondition, like the operation itself, should be evaluated inside a critical section. If the precondition is not met, the thread cannot simply wait for it inside the critical section, as this would prevent any change to the shared data, and the thread would wait in vain, probably deadlocking the system. Instead, the thread should exit the critical section and attempt to enter it again when there is a chance that the precondition is satisfied. Of course, this raises the question of how the thread should know when to re-enter the critical section, since in order to observe any change to the shared data it would have to be in the critical section already. That is where condition variables come in.
We can illustrate this abstract discussion with a concrete example:
Suppose we have a shared integer variable N with an initial value of 0, and two kinds of threads:
- Threads adding 1 to N,
- Threads subtracting 1 from N if N > 0.
Addition is unconditional. However, subtraction has a precondition which ensures that N never becomes negative. Of course, any access to N should happen only inside a critical section, hence we need a lock:
lock = threading.RLock()
So, how do we implement modifications of N in both kinds of threads? Addition is unproblematic: we simply add 1 to N in a critical section:
lock.acquire()
N = N + 1
lock.release()
It is less clear how to implement subtracting threads. For instance, the following naive attempt practically guarantees deadlock:
lock.acquire()
while N <= 0:
    pass
N = N - 1
lock.release()
The problem is that if N == 0 when the above thread enters the critical section, then the thread enters a loop which will exit only when another thread increments the value of N. However, no other thread will have a chance to increment N (or enter the critical section at all), because the subtracting thread is stuck in the infinite loop inside the critical section.
Clearly, the lock should be released on each turn of the loop, e.g.
lock.acquire()
while N <= 0:
    lock.release()
    time.sleep(0.1)
    lock.acquire()
N = N - 1
lock.release()
No deadlock this time, but the above solution is very inefficient. The thread sleeps for 0.1 s after releasing the lock, hoping that some adding thread will use this time to enter the critical section and increment N. The problem is that 0.1 s is completely arbitrary (too much? not enough? optimal? we have no way of knowing), and it may well happen that even if some other thread increments N, yet another thread decrements it again before our thread wakes up, re-acquires the lock, and re-checks the precondition, only to find it still false. This, aside from being inefficient, may easily lead to starvation.
So it would be nice if there were a mechanism which allowed a thread that has just modified shared data to notify threads waiting for some condition on that data to become true that now is a good time to try again. This role is fulfilled by condition variables. Like locks, they are a general mechanism available in most languages and environments that support multi-threaded and multi-process programming. In Python, they are provided as objects of the Condition class from the threading module. Other environments provide equivalent APIs.
As remarked above, condition variables are always associated with a lock. A lock object (either Lock or RLock) can be passed to Condition's constructor. Several condition variables may share the same lock, which is useful if various threads wait for different conditions associated with the same data.
If you do not pass any lock to Condition's constructor, a new private RLock will be created automatically and associated with the constructed condition variable.
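For example (a minimal sketch, not needed for what follows):

from threading import RLock, Condition

lock = RLock()
cond1 = Condition(lock)   # cond1 uses our lock
cond2 = Condition(lock)   # cond2 shares the same lock as cond1
cond3 = Condition()       # cond3 gets its own, automatically created RLock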
Condition variables provide the following methods:
- wait() does the following things in sequence:
  - it releases the lock associated with the condition variable (if the calling thread does not hold the lock, a RuntimeError exception is thrown),
  - it suspends the calling thread until another thread calls notify() or notify_all() on the same condition variable,
  - it wakes the calling thread and re-acquires the associated lock,
  - it returns True to the calling thread.

  To wait, using a condition variable cond, for the satisfaction of a precondition C, we can use the loop while not C: cond.wait(). Note that we still need the loop, as we have no guarantee that, after the thread re-acquires the lock, the precondition will still be satisfied (assuming it was satisfied at all when the thread was notified about the change to the shared data).
- wait_for(pred), where pred is something which behaves like an argumentless function returning a boolean value. This method saves us the effort of writing a loop when waiting for a condition. More precisely, if cond is a condition variable and pred is a function returning a boolean value, then the call cond.wait_for(pred) is more or less equivalent to while not pred(): cond.wait(), only shorter. The rub is that now we need a function, and it might seem that we just traded a simple and readable loop for a weird method call accompanied by a function definition. Fortunately, we have lambdas: we can replace the loop from the previous point with cond.wait_for(lambda: C).
- notify() wakes up one of the threads waiting on this condition variable. The calling thread must hold the associated lock, or a RuntimeError exception will be thrown.
- notify_all() wakes up all threads waiting on this condition variable. The calling thread must hold the associated lock, or a RuntimeError exception will be thrown.
- acquire() and release() call acquire() and release(), respectively, on the associated lock object.
In addition, condition variables in Python are context managers. If cond is a condition variable, then
with cond:
    #some code
calls cond.acquire() before entering the with block and cond.release() after exiting it.
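Roughly speaking, the with statement above behaves like the following explicit (and more verbose) version:

cond.acquire()
try:
    #some code
finally:
    cond.release()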
You might wonder why we need both notify_all() and notify(). Sometimes executing an action in the critical section invalidates its precondition, and sometimes it does not. In the first case it does not make sense to wake up all the waiting threads, since only one of them will be able to proceed. In the second case, waking up all the waiting threads is usually the best course of action.
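For instance, in our counter example adding 1 to N lets exactly one decrementing thread proceed, so notify() is appropriate; a (hypothetical) thread adding several units at once could instead wake all waiters and let each of them re-check the precondition:

with cond:
    N = N + 5          # hypothetical bulk increment
    cond.notify_all()  # several waiting decrementing threads may now proceed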
Both wait() and wait_for() accept an optional named argument called timeout (a float number of seconds). If the timeout expires before the wait succeeds, wait() returns False, while wait_for() returns the last value of the predicate (which is then falsy). We will not use timeouts here.
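Just to illustrate (this variant is not used below, and the 1.0 s value is arbitrary), a decrementing thread that gives up after waiting for one second could look like this:

with cond:
    if cond.wait_for(lambda: N > 0, timeout=1.0):
        N = N - 1
    else:
        print('gave up waiting for N > 0')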
Since Condition objects forward acquire() and release() to their associated locks, it usually does not make sense to store references to the lock objects associated with condition variables, or to use them directly. Using only condition variables simplifies the code and makes using the wrong lock less likely.
To illustrate the above, let us now return to our example of threads incrementing and decrementing a shared integer variable. Assume that we have created the lock and condition objects as follows:
lock = RLock()
cond = Condition(lock)
Then the critical section of threads incrementing N by 1 can be implemented as follows:
lock.acquire()
N = N + 1
cond.notify()
lock.release()
This is the same as before, except that after incrementing N we notify one of the threads waiting for N to be greater than 0.
Note that we use the notify() method and not notify_all(), because after adding just 1 it doesn't make sense to wake up more than one decrementing thread.
Decrementing threads have to wait, using the condition variable, for N to become greater than 0:
lock.acquire()
while N <= 0: # we wait for N > 0
    cond.wait()
N = N - 1
lock.release()
We can significantly simplify the above code. First, we can get rid of the explicit lock object and simply use the condition variable together with the lock automatically created by its constructor:
cond = Condition()
We can also utilize the fact that Condition objects are context managers, as well as use wait_for() instead of an explicit while loop:
Threads incrementing N by 1:
with cond:
    N = N + 1
    cond.notify()
Threads conditionally decrementing N by 1:
with cond:
    cond.wait_for(lambda: N > 0)
    N = N - 1
To actually see this code in action, try the following script:
from threading import Thread, Condition
from time import sleep
from random import random, choice

cond = Condition()
N = 0

def inc(n, i):
    global N
    with cond:
        print(f'Thread {n}, {i}\'th increment, N = {N}')
        N = N + 1
        cond.notify()

def dec(n, i):
    global N
    with cond:
        print(f'Thread {n} tries {i}\'th decrement, N = {N}. Waiting ...')
        cond.wait_for(lambda: N > 0)
        print(f'Thread {n}, {i}\'th decrement, N = {N}')
        N = N - 1

def get(n):
    f = choice([inc, dec])
    def h():
        for i in range(10):
            sleep(random())
            f(n, i)
    return h

threads = [Thread(target = get(n)) for n in range(10)]
for t in threads:
    t.start()
The function choice() from the module random picks at random one element of the list passed as its argument. Above, we use it to pick either the incrementing or the decrementing function. Note the use of a closure in the definition of get(n), which creates the main function of the n-th thread.
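In other words, each call to get() fixes its own f and n. For illustration only (the script itself builds these threads in a list comprehension):

body = get(3)             # picks inc or dec once and remembers n = 3
t = Thread(target=body)   # the thread will call body(), i.e. h()
t.start()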
The above script may occasionally hang (the remaining decrementing threads wait forever) if all incrementing threads have already exited and only decrementing threads are still active.
Producer-Consumer problem
We have a shared buffer in memory into which you can put items and from which you can retrieve them. Retrieving an item removes it from the buffer. There are two variants of the producer-consumer problem:
- one with an unbounded buffer that can hold any number of elements,
- and one with a bounded buffer that can hold only a finite number of elements; when the buffer is full, an element must be removed from it before a new one can be inserted.
We also have two types of threads (or processes):
- Producers who produce the data and put it as items in the buffer
- Consumers that take items from the buffer and consume them.
Of course, for producers and consumers to function properly, some obvious synchronization constraints must be met:
- To avoid buffer corruption, we should ensure that only one thread at a time modifies the shared buffer, and therefore buffer operations should be performed in the critical section.
- A consumer cannot fetch an item from an empty buffer. Instead, it should wait until some producer puts a new item in the buffer.
- When the buffer is bounded, a producer cannot store a new item in the full buffer. Instead, it should wait for some consumer to fetch some item from the buffer to free up the space.
Locks can be used to enforce the first constraint above. The other two synchronization constraints require the use of condition variables.
In the next subsection, we present a solution to the producer-consumer problem with an unbounded buffer (where only the first two constraints apply). Then we will solve the producer-consumer problem with a bounded buffer (where we need to enforce all three constraints).
Producer-Consumer with unbounded buffer
First, we need to decide how to implement the shared buffer. A FIFO queue is the usual choice.
Python provides efficient FIFOs in the form of objects of the Queue class from the queue module. Unfortunately, Queue is already adapted to a multi-threaded environment. In particular, Queue implements the synchronization constraints described above, so it is not suitable for teaching how to implement those constraints.
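In real code you would typically just use it directly; a minimal sketch:

from queue import Queue

q = Queue()      # an unbounded, thread-safe FIFO (Queue(maxsize=5) would bound it)
q.put((0, 1))    # never blocks for an unbounded queue
print(q.get())   # blocks until an item is available, then removes and returns it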
Instead, we will use ordinary lists encapsulated within an object of a class that provides a queue interface. In Python, if x is a list, x.append(y) adds y to the end of the list, and x.pop(i) removes the item at position i from the list (and returns the value of that item). The encapsulating class (without synchronization) looks like this:
class MyQueue:
    def __init__(self):
        self._buf = []
    def enqueue(self, x):
        self._buf.append(x)
    def dequeue(self):
        return self._buf.pop(0)
    def __str__(self):
        return f'Queue<{self._buf}>'
Thus, objects of this class store a list in the private field _buf. The list is initially empty. Calling enqueue(x) on a queue appends x at the end of _buf. Calling dequeue() removes the first element and returns it. Note that this is a very inefficient implementation: unlike in many other languages, lists in Python are not linked structures but ordinary arrays. Thus, while referring to a list element by index takes (very small) constant time, any operation which changes the list size carries a much larger time penalty, sometimes requiring a complete copy of the array. It is not that bad all the time, as, e.g., CPython uses overallocation to avoid frequent resizing, and dynamic arrays can sometimes be resized using realloc() without a full copy, but it is still worse than a linked structure.
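As an aside, if efficiency mattered here, collections.deque would be a natural replacement, since it supports fast appends and pops at both ends:

from collections import deque

buf = deque()
buf.append(1)         # O(1) append at the right end
buf.append(2)
print(buf.popleft())  # O(1) removal at the left end; prints 1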
The class above contains an additional method __str__(). This is the method which various output functions in Python (e.g., print(), interpolated strings, etc.) call automatically on objects to convert them to human-readable strings. In this case we display the contents of the underlying list _buf surrounded by Queue<...>.
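For example:

q = MyQueue()
q.enqueue('a')
q.enqueue('b')
print(q)   # prints: Queue<['a', 'b']>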
Below is a full script testing the above implementation of MyQueue:
from threading import Thread
from time import sleep
from random import random

class MyQueue:
    def __init__(self):
        self._buf = []
    def enqueue(self, x):
        self._buf.append(x)
    def dequeue(self):
        return self._buf.pop(0)
    def __str__(self):
        return f'Queue<{self._buf}>'

class Producer(Thread):
    def __init__(self, queue, n):
        Thread.__init__(self)
        self.queue = queue
        self.n = n
    def run(self):
        for i in range(10):
            sleep(random())
            print(f'Producer {self.n}, iteration {i}, {self.queue}')
            self.queue.enqueue((self.n, i))

class Consumer(Thread):
    def __init__(self, queue, n):
        Thread.__init__(self)
        self.queue = queue
        self.n = n
    def run(self):
        for i in range(10):
            sleep(random())
            msg = self.queue.dequeue()
            print(f'Consumer {self.n}, iteration {i}, msg={msg}, {self.queue}')

queue = MyQueue()
threads = [Producer(queue, n) for n in range(10)]
threads += [Consumer(queue, n) for n in range(10)]
for t in threads:
    t.start()
Later we will reuse the same script, just modifying the MyQueue class.
The script creates 10 producer and 10 consumer threads. Each producer thread adds to the queue a pair (self.n, i) consisting of a thread identifier (self.n) and an iteration index (i). In the line
self.queue.enqueue((self.n, i))
note the additional pair of parentheses: we are passing a single argument to enqueue(), which is a pair. On the other hand,
self.queue.enqueue(self.n, i)
would have called enqueue() with two arguments, which would be wrong.
Each consumer thread consumes a single element from the buffer 10 times.
Both consumer and producer threads display appropriate diagnostic messages (including the state of the underlying message list). Also, both producers and consumers sleep for short, random periods to force some interleaving of operations. Unfortunately, since consumer threads do not check whether the queue is non-empty before calling dequeue() (and neither does the method itself before executing pop()), the script above will almost always raise an IndexError. Clearly, we need some synchronization.
Obviously, we need a lock to protect operations on the buffer as a critical section. In actuality, both pop() and append() are thread-safe in CPython (or so I understand), but we have to use a lock in order to use condition variables anyway.
Secondly, we need a condition variable associated with the aforementioned lock, for consumers to wait on until the queue becomes non-empty. A producer should notify at most a single consumer about an added element (it makes no sense to notify more consumers, as the first one will consume the single added element). Assume that we have a condition variable cond.
Then a general scheme for solving the producer-consumer problem with unbounded buffer is as follows:
Producer code:
# Produce x
with cond:
    # Put x in the shared buffer
    cond.notify()
Consumer code:
with cond:
    cond.wait_for(lambda: Shared_Buffer_Is_Not_Empty)
    # Take the next element from the shared buffer
    # and assign it to x
# consume x
Thus, we can add synchronization to our queue class as follows (when reusing the test script, Condition must now also be imported from threading):
class MyQueue:
    def __init__(self):
        self._buf = []
        self.cond = Condition()
    def enqueue(self, x):
        with self.cond:
            self._buf.append(x)
            self.cond.notify()
    def dequeue(self):
        with self.cond:
            self.cond.wait_for(lambda: self._buf)
            return self._buf.pop(0)
    def __str__(self):
        return f'Queue<{self._buf}>'
Observe the line:
self.cond.wait_for(lambda: self._buf)
To understand what happens here, note that in Python, if you use a list where a boolean value is expected, then an empty list is treated as False and a non-empty list is treated as True. Thus, instead of using an explicit condition like self._buf != [], we simply let the list speak for itself.
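A quick illustration:

print(bool([]))         # False
print(bool([1, 2, 3]))  # True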
Producer-Consumer with bounded buffer
First, let us present a general solution of the producer-consumer problem with a bounded buffer, and then describe the bounded buffer implementation itself.
Because operations on the shared buffer should be performed inside a critical section, we need a lock. Also, there are two separate preconditions:
- a producer can place data in the shared buffer only if the latter is not full,
- a consumer can consume data from the shared buffer only if the latter is not empty.
This suggests using two separate condition variables (full for the first precondition, empty for the second). However, since we are working on the same shared data, both condition variables should share a lock:
lock = RLock()
full = Condition(lock)
empty = Condition(lock)
Then each producer and consumer should wait for its respective precondition to be satisfied and, upon completion, a producer should notify at most one consumer that there is one more element in the buffer, while a consumer should notify at most one producer that there is one more free place in the buffer. There is no point in informing more than one consumer waiting at an empty buffer about a single added element and, similarly, there is no point in informing more than one producer waiting at a full buffer that there is one more free place. This leads to the following solution:
Producer code:
# produce x
with full:
    full.wait_for(lambda: Buffer_Is_Not_Full)
    # add x to buffer
    empty.notify()
Consumer code:
with empty:
    empty.wait_for(lambda: Buffer_Is_Not_Empty)
    # remove next element from
    # the buffer and store it in x
    full.notify()
# consume x
Note that in both cases we are protecting the critical section with the same lock (shared by both condition variables).
Now let us implement the shared buffer itself.
In this case we will implement our buffer as a cyclic buffer: we have a fixed-size table _buf (here a Python list) and two indexes, rpos and wpos, which chase one another around the list (i.e., they are incremented modulo _buf's size).
We add a new element at wpos and remove an existing element at rpos. A removed element is not physically deleted, but it can be overwritten by new elements. Removing increments rpos by 1 modulo size, where size is the size of _buf. Adding increments wpos by 1 modulo size. Thus, wpos and rpos seem to go around the table.
We should take care not to overwrite an unread element, or to read an unwritten or already read element, so the indexes should never pass one another. Sometimes, however, they may overlap. Instead of taking into account all the possible cases, it is simpler to just keep track of the number count of elements stored in the queue, incrementing or decrementing count by 1 after each addition and removal, respectively.
This leads to the following implementation of our queueing class:
class MyQueue:
    def __init__(self, size):
        self.size = size
        self._buf = [None] * size
        self.wpos = 0
        self.rpos = 0
        self.count = 0
        lock = RLock()
        self.empty = Condition(lock)
        self.full = Condition(lock)
    def enqueue(self, x):
        with self.full:
            self.full.wait_for(lambda: self.count < self.size)
            self._buf[self.wpos] = x
            self.count += 1
            self.wpos = (self.wpos + 1) % self.size
            self.empty.notify()
    def dequeue(self):
        with self.empty:
            self.empty.wait_for(lambda: self.count > 0)
            el = self._buf[self.rpos]
            self.count -= 1
            self.rpos = (self.rpos + 1) % self.size
            self.full.notify()
            return el
    def __str__(self):
        return f'Queue<{self._buf}, count={self.count}, rpos={self.rpos}, wpos={self.wpos}>'
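To try it out, the test script from the unbounded case can be reused almost verbatim; assuming the Producer and Consumer classes shown earlier, only the imports and the queue construction change (a sketch):

from threading import Thread, RLock, Condition
from time import sleep
from random import random

# MyQueue (bounded version), Producer and Consumer defined as above ...

queue = MyQueue(5)   # a bounded buffer holding at most 5 elements
threads = [Producer(queue, n) for n in range(10)]
threads += [Consumer(queue, n) for n in range(10)]
for t in threads:
    t.start()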