Concurrency Patterns

This article explains some patterns for concurrency problems. These patterns can be reused across multiple problems. Some of them have been borrowed from the book Concurrency in Go and are kept here for quick reference.

So in this post I will add templates for concurrency problems and the patterns related to them!

So, let’s start with: what is concurrency?

Concurrency is a way to execute sections of code in an interleaved manner. Concurrency is different from parallelism. One way to think about concurrency is to imagine there is only one processor, which is made available to more than one process in turn, so that their code interleaves. With parallelism, sections of code actually run at the same time on two different CPUs. In that sense, multi-core processors can combine parallelism with concurrency.

With concurrency, we have a piece of code A, for example, which runs on the CPU and then waits for some condition, so it gets swapped out for another piece of code B to run on the CPU. So in a sense code A and code B run concurrently, interleaving with each other on the CPU. This is a common way to get more performance out of the CPU, since we don’t want the CPU sitting idle.
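The swapping described above can be sketched on a single thread with generators. This is only an illustration, not how an OS scheduler works: the names task and run_round_robin are hypothetical, and each yield stands in for a point where the code waits and gives up the CPU.

```python
from collections import deque

def task(name, steps, log):
    """A piece of code that gives up the CPU after every step."""
    for i in range(1, steps + 1):
        log.append(f"{name}{i}")  # do one unit of work
        yield                     # wait point: let another task run

def run_round_robin(tasks):
    """Run tasks on a single thread, switching at every yield."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run until the next wait point
            ready.append(current)  # not finished, schedule it again
        except StopIteration:
            pass                   # task finished, drop it

log = []
run_round_robin([task("A", 3, log), task("B", 2, log)])
print(log)  # ['A1', 'B1', 'A2', 'B2', 'A3']
```

Code A and code B both make progress, yet only one of them runs at any instant, which is concurrency without parallelism.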

Now, even though concurrency is seen throughout OS processes, developers need to pay special attention when the interleaving code accesses the same memory and that memory is mutable, i.e. can be changed by the interleaving code. The reason is that the order in which the two sections read or change the common memory is undefined. If the sections of code only read and never write, or if they work on completely different memory, then the interleaving should not be much of an issue.

Protecting this common access to memory is where all the extra effort goes, so that the data stays consistent, as if the code were running sequentially. The common ways to do so are:

  1. Use immutable state between the pieces of code, so that no one can change it.
  2. If a change is needed, guard the state so that only one thread can change it at a time.
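Both options can be sketched in a few lines of python. The names SETTINGS and Counter, and the values used, are hypothetical and only serve the illustration.

```python
import threading

# 1. Immutable shared state: a tuple cannot be modified, so any number
#    of threads can read it without a lock.
SETTINGS = ("eu-west", 8080)  # hypothetical values

# 2. Mutable shared state guarded by a lock.
class Counter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:     # only one thread at a time runs this block
            self.value += 1  # read-modify-write on shared memory

counter = Counter()

def worker(iterations):
    for _ in range(iterations):
        counter.increment()

threads = [threading.Thread(target=worker, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 40000
```

Without the lock, two threads could read the same old value and one of the increments could be lost.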

To guard memory this way, there are some primitives available to programmers.

Mutex or Locks#

A mutex (mutual exclusion), also commonly known as a lock, protects a section of code which needs to be accessed sequentially. That means only one thread at a time can enter the code written between the lock and the unlock, i.e. two threads will never interleave inside that code.

There are different types of locks: a simple lock which is the same for reading and writing threads, or a read-write (RW) lock which lets multiple threads read from the data structure at the same time as long as no thread is writing to it. Threads which need access have to wait for the lock to be unlocked before they can get it. Threads waiting for the lock go into a blocked state, and one of them gets rescheduled as soon as the lock is unlocked.
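Python's threading module does not ship a read-write lock, so the following is a minimal sketch of the idea rather than a library API. The class ReadWriteLock and its method names are hypothetical, and this simple version can starve writers if readers keep arriving.

```python
import threading

class ReadWriteLock:
    """Many concurrent readers, or exactly one writer."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._readers = 0     # number of threads currently reading
        self._writer = False  # True while a thread is writing

    def acquire_read(self):
        with self._cond:
            while self._writer:          # readers only wait for a writer
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:       # last reader out wakes writers
                self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            while self._writer or self._readers > 0:
                self._cond.wait()        # wait for exclusive access
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()      # wake readers and writers

rw = ReadWriteLock()
rw.acquire_read()
rw.acquire_read()   # a second reader gets in while the first still reads
rw.release_read()
rw.release_read()
rw.acquire_write()  # succeeds only because no reader or writer is left
rw.release_write()
```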

In some language APIs these locks are also referred to as Mutex(), but it is the same concept. You lock (hold) the mutex before accessing the critical section, and once the access is done you unlock (release) the mutex.

The basic construct of such a lock is:

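from threading import Lock

mp = {"key": "value"}  # shared data structure protected by the lock
m = Lock()
# Entering the with block calls m.acquire() to take the lock.
with m:
  # Critical section: access the protected data structure.
  val = mp["key"]
# Leaving the with block calls m.release() to release the lock.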
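What happens when the lock is already taken can be seen without a second thread, since threading.Lock is not reentrant. The sketch below uses the non-blocking and timeout forms of acquire so that it returns instead of blocking forever.

```python
from threading import Lock

m = Lock()

first = m.acquire(blocking=False)   # lock is free, so this succeeds
second = m.acquire(blocking=False)  # already held, so this fails at once
third = m.acquire(timeout=0.1)      # waits up to 0.1 s, then gives up
print(first, second, third)         # True False False

m.release()                         # release the one successful acquire
print(m.locked())                   # False
```

A plain m.acquire() in place of the second call would put the caller in the blocked state until some other thread released the lock.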

In-process mutex:

An in-process mutex lives within the lifecycle of a single process. Such a mutex is faster when the lock is uncontended, i.e. free for the thread to take. That is because there is no kernel transition in that case: the language runtime can use its own reference count to know whether the lock is taken, and if it is not taken, the thread does not need to be rescheduled, so the OS kernel does not have to be notified. This is a performance optimization, as most of the time the lock will be uncontended in a well-designed concurrent system.

Cross-process mutex:

Some languages also provide a cross-process mutex, which gives two processes a way to share a lock across their separate memory spaces. This is more of a language feature; other ways to get the same effect are to use a file, such as a lock file, or a network API for the communication. For example, golang doesn’t provide such a feature as part of its core SDK. Languages which do provide it usually implement it with a kernel transition.
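The lock-file approach can be sketched with an exclusive file create. The helper names try_lock and unlock and the file name demo.lock are hypothetical. Note the assumption that every process cleans up after itself: a process that crashes while holding the lock leaves a stale file behind.

```python
import os
import tempfile

def try_lock(path):
    """Return a file descriptor if we created the lock file, else None."""
    try:
        # O_CREAT | O_EXCL fails if the file already exists, so only one
        # process can create it.
        return os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return None

def unlock(path, fd):
    """Release the lock by closing and deleting the lock file."""
    os.close(fd)
    os.remove(path)

lock_path = os.path.join(tempfile.mkdtemp(), "demo.lock")  # hypothetical path

fd = try_lock(lock_path)
print(fd is not None)       # True: we hold the lock
print(try_lock(lock_path))  # None: a second attempt finds the lock taken
unlock(lock_path, fd)
```

Both attempts here come from the same process only to keep the sketch short; the check works the same way when the second attempt comes from another process using the same path.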

Condition variables#

Another common construct used in conjunction with the mutex is the condition variable. A condition variable is initialized with a mutex and is used to communicate an event between two threads: one waiting for a condition to occur and the other notifying that the condition has occurred. It is important to remember that while a thread is waiting on the condition, the mutex is released, so that another thread can grab the mutex, change the data structure and notify. The waiting thread then grabs the mutex back again before it continues.

A common example is a thread waiting for a queue/stack to hold at least one element, and another thread notifying that the queue/stack has been filled after placing an element in it. The two common methods on condition variables are wait, which waits for a notification, and notify, which wakes one of the waiting threads. There is also notify_all, which wakes all the waiting threads, kind of like a broadcast.

A common way to write such conditions in python is a with block on the condition: when the code enters the with block the mutex is taken, and it gets released while waiting for the condition to be notified.
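As a small sketch of the broadcast case, several threads can wait on one condition and be released together with notify_all. The names state, woken and waiter are hypothetical.

```python
import threading

cond = threading.Condition()  # creates its own lock when none is passed
state = {"ready": False}      # the condition being waited on
woken = []

def waiter(name):
    with cond:                     # takes the mutex
        while not state["ready"]:  # re-check after every wake-up
            cond.wait()            # releases the mutex while blocked
        woken.append(name)         # the mutex is held again here

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

with cond:
    state["ready"] = True
    cond.notify_all()              # wake every waiting thread

for t in threads:
    t.join()

print(sorted(woken))  # [0, 1, 2]
```

Because each waiter checks the flag before waiting, a thread that starts after the notification simply skips the wait instead of missing the event.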

An example of such usage can be seen in the multi-producer, multi-consumer Queue implementation in python. The code below is a concise bounded queue, where a consumer calls get to fetch an elem from the queue and a producer calls put to add an elem to the queue.

from collections import deque
from threading import Condition, Lock

MAX_SIZE = 10
# Using deque as the data structure for FIFO order.
q = deque()
# Mutex used to access the data structure of the queue.
mutex = Lock()
# Not-full condition: producers wait on it before appending an element to the queue.
not_full = Condition(mutex)
# Not-empty condition: consumers wait on it before popping an element from the queue.
not_empty = Condition(mutex)

# Note: It is important to use while rather than if in the condition check, because of spurious wake-ups, where the cv can come out of wait even without a notify. In such cases it is better to go back and check the condition again, so that the thread doesn't continue spuriously.

# Consumer fetching from the queue using get

def get():
  with not_empty: # Take the shared mutex through the not_empty condition.
    while len(q) == 0: # Use a while loop to handle spurious wake-ups.
      not_empty.wait() # The queue is actually empty, so wait for not_empty to be notified.
    elem = q.popleft() # Remove the oldest element (FIFO).
    not_full.notify() # An element has been removed, so the queue is not full anymore; notify a producer waiting to add an element.
    return elem

# Producer adding to the queue using put

def put(elem):
  with not_full: # Take the shared mutex through the not_full condition.
    while len(q) == MAX_SIZE: # Use a while loop to handle spurious wake-ups.
      not_full.wait() # The queue is actually full, so wait before a new element can be added.
    q.append(elem) # Add the new element to the queue.
    not_empty.notify() # An element has been placed in the queue, so it is not empty anymore; notify one of the waiting consumer threads.
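For comparison, python's own queue.Queue offers the same blocking put and get. The sketch below runs two producers and two consumers against it; the sentinel STOP and the names work, results, producer and consumer are hypothetical choices for this example.

```python
import queue
import threading

work = queue.Queue(maxsize=10)  # put() blocks while 10 items are queued
results = []
results_lock = threading.Lock()
STOP = object()                 # sentinel telling a consumer to exit

def producer(start, count):
    for i in range(start, start + count):
        work.put(i)             # blocks if the queue is full

def consumer():
    while True:
        item = work.get()       # blocks if the queue is empty
        if item is STOP:
            break
        with results_lock:
            results.append(item)

consumers = [threading.Thread(target=consumer) for _ in range(2)]
producers = [threading.Thread(target=producer, args=(n * 50, 50)) for n in range(2)]
for t in consumers + producers:
    t.start()
for t in producers:
    t.join()
for _ in consumers:
    work.put(STOP)              # one sentinel per consumer
for t in consumers:
    t.join()

print(sorted(results) == list(range(100)))  # True
```

The sentinels are queued only after both producers have finished, so FIFO order guarantees that every item is consumed before any consumer exits.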

Semaphores#

The semaphore is one of the oldest ways to synchronize concurrently running threads. Semaphores are atomic counters, which can be used to limit the number of concurrent accesses to a limited quantity of items.

For example, let’s say an online library has 10 copies of a book by a famous author which can be borrowed. Suppose one day someone famous tweets about the book and now everyone wants to borrow it. A semaphore can be used to limit the number of users who can borrow copies of the book. Every time a user borrows a copy, i.e. acquires the associated semaphore, the counter decreases by 1, and once the counter reaches 0, all the other users wait for the book to become available again. A copy becomes available again when some user returns the book, i.e. releases the associated semaphore, and that unblocks one of the blocked users, who can then acquire the semaphore.

So basically a semaphore is there to guard limited usage of a resource. A typical example is taking a DB connection from a limited set of connections. In that regard a mutex can be thought of as a semaphore with a count of 1, as it protects only a single use of the resource at a time. So a semaphore is used when we need multiple concurrent accesses to a resource that is still limited in quantity.
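The library example can be sketched with threading.BoundedSemaphore. To keep the run short this assumes 3 copies and 10 users instead of the numbers above; the names COPIES, stats and borrow are hypothetical.

```python
import threading
import time

COPIES = 3  # hypothetical number of copies
copies = threading.BoundedSemaphore(COPIES)
stats = {"reading": 0, "peak": 0, "served": 0}
stats_lock = threading.Lock()

def borrow(user):
    with copies:              # acquire: the counter goes down by 1
        with stats_lock:
            stats["reading"] += 1
            stats["peak"] = max(stats["peak"], stats["reading"])
        time.sleep(0.01)      # the user reads the book
        with stats_lock:
            stats["reading"] -= 1
            stats["served"] += 1
    # Leaving the with block releases: the counter goes up by 1.

users = [threading.Thread(target=borrow, args=(u,)) for u in range(10)]
for t in users:
    t.start()
for t in users:
    t.join()

print(stats["served"], stats["peak"] <= COPIES)  # 10 True
```

Every user eventually gets the book, but never more than COPIES users hold a copy at the same time.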

Atomic operations#

Some Patterns using the constructs above:

Bounded FIFO Queue#

Multi Producer & Multi Consumer#

Thread Pool#

Object Pool#

Concurrent Bounded HashSet#
