Bartosz Zieliński

Condition Variables, Part II

In a previous post we introduced condition variables in Python and showed how they can be used to solve the producer-consumer problem. In this post we tackle the problem of synchronizing threads so that they all reach a certain milestone in the code before any of them proceeds further, i.e., we show how to implement barriers using condition variables. Before that, we introduce gates: very useful synchronization primitives, easily implementable using condition variables; it is gates that we will then use to build barriers.

I learned about barriers from The Little Book of Semaphores by Allen B. Downey (of course the book uses semaphores and here I use condition variables).

Gates

A gate (sometimes also called a turnstile) is a synchronization object which can be either open or closed. Threads can simply pass through an open gate, but they have to sleep at a closed one while waiting for the gate to open. More specifically, gates should provide three argumentless methods:

- open(), which opens the gate and wakes up all threads sleeping at it,
- close(), which closes the gate,
- tryPass(), which lets the calling thread through if the gate is open, and otherwise puts it to sleep until the gate opens.

Gates are usually not among standard synchronization objects provided by libraries. However, they can be easily implemented using condition variables. For example, the following Python class implements gate interface:

class Gate:
    def __init__(self, is_open=False):
        self.cond = Condition()
        self.is_open = is_open
    def open(self):
        # open the gate and wake up all threads sleeping at it
        with self.cond:
            self.is_open = True
            self.cond.notify_all()
    def close(self):
        # close the gate
        with self.cond:
            self.is_open = False
    def tryPass(self):
        # sleep until the gate is open
        with self.cond:
            self.cond.wait_for(lambda: self.is_open)

Thus, objects of this class have two fields: the boolean field is_open, true if the gate is open and false otherwise, and the condition variable cond. The initial state of the gate is passed as an optional argument to the constructor (False by default, i.e., the gate is created closed by default). All three methods execute within a critical section protected by the lock associated with cond: open() sets is_open to True and notifies all waiting threads, close() resets is_open to False, and tryPass() waits until is_open becomes True.
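To see the gate in action, here is a small usage sketch (not from the original post; the worker function, the thread count and the one-second delay are illustrative assumptions). A few threads block at an initially closed gate and are all released once the main thread opens it:

from threading import Thread
from time import sleep

# assumes the Gate class defined above (with its Condition import) is in scope

def worker(k, gate):
    print('Thread', k, 'waiting at the gate')
    gate.tryPass()                  # blocks while the gate is closed
    print('Thread', k, 'passed the gate')

gate = Gate()                       # closed by default
for k in range(3):
    Thread(target=worker, args=(k, gate)).start()

sleep(1)                            # give the workers time to reach the gate
gate.open()                         # wakes up every thread sleeping at the gate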

Barriers

We will now use gates to construct various kinds of barriers.

Barriers for n threads

Suppose we have n threads, each executing a program of the form:

graph TD
    A(Part A of thread i) --> B(Part B of thread i)

As a concrete illustration, consider the following program, where parts A and B just print their respective messages on the console. As usual, we have also thrown in some random time delays for better effect:

from threading import Thread
from time import sleep
from random import random

class MyThread(Thread):
    def __init__(self, m):
        Thread.__init__(self)
        self.m = m
    def run(self):
        sleep(random())
        print('Thread', self.m, 'executed A')
        sleep(random())
        print('Thread', self.m, 'executed B')

n_threads = 10
for m in range(n_threads):
    MyThread(m).start()

Suppose that for some reason we want the A parts of all threads to finish execution before any of the threads starts its B part. To ensure that, we need some additional synchronization between the threads (the unconvinced reader may run the above script as a demonstration). The obvious idea is to use a gate object, as described in the previous section, initially closed:

graph TD
    A(Part A of thread i) --> C[Gate]
    C --> B(Part B of thread i)

A closed gate will not allow threads to go to part B. It only remains to find a way to open the gate after all n threads have finished their A-parts. The solution does not require any central authority or separate control thread. We need a shared integer variable m, initialized to the number of threads. Each thread decrements m after finishing its A part. When m==0 we know that all n threads decremented m, and, therefore, also executed A. Hence, the thread which sees m==0 can open the gate:

graph TD
    A(Part A of thread i) --> D[m -= 1, if now m == 0 open gate]
    D --> C[Gate]
    C --> B(Part B of thread i)

Counting down from n to 0 is more convenient than counting up from 0 to n, because the finishing test is a comparison with 0, so the barrier does not have to remember n.

We encapsulate the barrier code inside a class with the following definition:

class Barrier:
    def __init__(self, n):
        self.lock = RLock()   # protects the counter
        self.gate = Gate()    # closed until all n threads arrive
        self.m = n            # number of threads still expected at the barrier
    def tryPass(self):
        with self.lock:
            self.m -= 1
            if self.m == 0:   # the last thread to arrive opens the gate
                self.gate.open()
        self.gate.tryPass()   # note: called outside the critical section
Barrier’s constructor requires the number of threads as an argument. It creates a Gate object and a lock object, the latter protecting the body of the tryPass() method. The tryPass() method decrements the counter (self.m) by 1 and, if the resulting value is 0, opens the gate. Then, outside the critical section, it calls tryPass() on its gate. Calling gate.tryPass() inside the critical section would lead to a deadlock: the threads arriving first would go to sleep at the closed gate while still holding the lock, so the last thread could never enter the critical section and open the gate.

Now we can modify the MyThread class from the example script at the beginning of this section by creating a global Barrier object (barrier) and placing barrier.tryPass() between parts A and B of our threads (recall that n_threads is the number of threads):

barrier = Barrier(n_threads)

class MyThread(Thread):
    def __init__(self, m):
        Thread.__init__(self)
        self.m = m
    def run(self):
        sleep(random())
        print('Thread', self.m, 'executed A')
        barrier.tryPass()
        sleep(random())
        print('Thread', self.m, 'executed B')

If we now execute the modified script (do not forget to include the definitions of the Gate and Barrier classes and to import Condition and RLock from threading), we will see that the A parts of all the threads execute first (in some random order), and only then the B parts.
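For reference, one way to assemble the complete script might look as follows (a sketch only; the Gate, Barrier and modified MyThread definitions are as above, and the join() calls, which are not part of the original snippets, simply make the main thread wait for the workers):

from threading import Thread, Condition, RLock
from time import sleep
from random import random

# ... Gate and Barrier classes as defined above ...

n_threads = 10
barrier = Barrier(n_threads)      # shared by all threads

# ... the modified MyThread class as defined above ...

threads = [MyThread(m) for m in range(n_threads)]
for t in threads:
    t.start()
for t in threads:
    t.join()                      # wait until every thread has printed its B part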

Reusable barriers

Suppose we have n threads, each executing a program of the form:

graph TD
    A(Part A of thread i) --> B(Part B of thread i)
    B --> A

i.e., part A is performed in a loop, alternating with part B. As an example, consider the following Python program where parts A and B just print the respective messages on the console. As usual, we make the threads sleep for a random time between the messages to make interleaving more probable:

from threading import Thread
from time import sleep
from random import random

class MyThread(Thread):
    def __init__(self, m):
        Thread.__init__(self)
        self.m = m
    def run(self):
        for i in range(5):
            sleep(random())
            print(f'Thread {self.m} iteration {i} executed A')
            sleep(random())
            print(f'Thread {self.m} iteration {i} executed B')

n_threads = 10
threads = [MyThread(m) for m in range(n_threads)]
for t in threads:
    t.start()

Suppose now that for some reason we want, in every iteration, the A parts of all threads to finish before any thread starts its B part, and likewise the B parts of all threads to finish before any thread starts the A part of the next iteration. The barrier from the previous section will not do, since it can be passed only once: we need a reusable barrier.

It seems natural to use two gates: one between parts A and B, and another between part B and the next iteration's part A. A single gate would not suffice, because once it is opened, a fast thread could run through part B and loop back to the still-open gate before it is closed again:

graph TD
    A(Part A of thread i) --> b1[Gate 1]
    b1 --> B(Part B of thread i)
    B --> b2[Gate 2]
    b2 --> A

Initially gate 1 is closed and gate 2 is open. When all threads have finished their A parts in the first iteration, we first close gate 2 and then open gate 1. The threads can now execute part B, but none of them can jump ahead to part A of the second iteration. When all threads have finished part B, we first close gate 1 and then open gate 2, allowing execution of part A in the second iteration but not the start of part B. We continue in the same way for subsequent iterations. As you can see, the pair of gates 1 and 2 acts as a cyclic airlock.

It remains to figure out how to open and close the gates. As in the previous section, we count threads, and the last thread to arrive opens and closes the appropriate gates. Instead of two separate counters, one for each gate, we use a single counter m which counts down from n to 0 at the first gate and back up from 0 to n at the second gate, as in the diagram below (initially m == n):

graph TD
    A(Part A of thread i) --> c1[m -= 1, if m == 0 then close gate 2 and open gate 1]
    c1 --> b1[Gate 1]
    b1 --> B(Part B of thread i)
    B --> c2[m += 1, if m == n then close gate 1 and open gate 2]
    c2 --> b2[Gate 2]
    b2 --> A

Accordingly, we create a ReusableBarrier class encapsulating our reusable barrier:

class ReusableBarrier:
    def __init__(self, n):
        self.gate1 = Gate(False)  # between parts A and B, initially closed
        self.gate2 = Gate(True)   # between part B and the next part A, initially open
        self.n = n                # total number of threads
        self.m = n                # counts n -> 0 at gate 1 and 0 -> n at gate 2
        self.lock = RLock()       # protects the counter
    def tryPass1(self):
        # called between part A and part B
        with self.lock:
            self.m -= 1
            if self.m == 0:       # the last thread to finish part A
                self.gate2.close()
                self.gate1.open()
        self.gate1.tryPass()
    def tryPass2(self):
        # called after part B, before the next iteration
        with self.lock:
            self.m += 1
            if self.m == self.n:  # the last thread to finish part B
                self.gate1.close()
                self.gate2.open()
        self.gate2.tryPass()

Now we can modify the MyThread class from the example script at the beginning of this section:

barrier = ReusableBarrier(n_threads)

class MyThread(Thread):
    def __init__(self, m):
        Thread.__init__(self)
        self.m = m
    def run(self):
        for i in range(5):
            sleep(random())
            print(f'Thread {self.m} iteration {i} executed A')
            barrier.tryPass1()
            sleep(random())
            print(f'Thread {self.m} iteration {i} executed B')
            barrier.tryPass2()

First, we create a global, shared ReusableBarrier object (barrier); recall that n_threads is the number of threads. Then we place a call to tryPass1() between parts A and B inside the loop, and a call to tryPass2() after part B, before the next iteration starts.

When running the script, do not forget to include the definitions of the Gate and ReusableBarrier classes and to import Condition and RLock from the threading module.
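As with the single-use barrier, the full script might be assembled like this (again only a sketch; the join() calls are an addition so that the main thread waits for all workers to finish their five iterations):

from threading import Thread, Condition, RLock
from time import sleep
from random import random

# ... Gate and ReusableBarrier classes as defined above ...

n_threads = 10
barrier = ReusableBarrier(n_threads)

# ... the modified MyThread class as defined above ...

threads = [MyThread(m) for m in range(n_threads)]
for t in threads:
    t.start()
for t in threads:
    t.join()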