Priority Queues

Priority Queues also implemented using Heap are great tools to solve some of the problems. These are data structures which are used to maintain the order (increasing/decreasing) of the items in data structure. A property of heaps is that you can get the top most (i.e. minimum or maximum) item in all the items in O(1) operation. More theory

This data structure is very useful when there is a stream of items and sorting cannot be performed as all items are not seen, at any point when running over the stream, algorithm needs the maximum or minimum item in the items seen so far. Basically in problems, where running minimum or maximum needs to be maintained.

Priority Queues are implemented using a heap which are complete binary tree, but usually this representation of the tree is done in the form of an arrays.

Heaps#

In this article I will discuss some of the problems related to heaps and will see some of the patterns related for these problems. Also I will be using heapq library in python to solve these problems.

Some notable functions in the library.

nums = [4, 6, 5, 3, 2, 9, 1]

# Create a min heap from an array of numbers.
# Creates a min heap, with smallest number on the top i.e. on hp[0]. Note: largest number is not at the last position hp[-1]
# Note: to create maxHeap negate all the input before entering.
# Complexity: O(n)
hp =  heapq.heapify(nums)

# https://docs.python.org/3/library/heapq.html#heapq.heappush
# Insert into the heap; Pushes an item to the heap, if smallest, it would be at top of the heap.
# Complexity: O(logn)
# hp[0] => 1
heapq.heappush(hp, 0)
# hp[0] => 0

# https://docs.python.org/3/library/heapq.html#heapq.heappop
# Pop the top element from the heap. in case of min heap will be smallest, and in case of maxHeap will be negative of largest.
# Complexity O(logn)
# elem = 0
elem = heapq.heappop(hp)

# https://docs.python.org/3/library/heapq.html#heapq.heappushpop
# Push and Pop in one operation, according to the docs, this operation is more efficient than separate push and pop. Also this doesn't affect the size of the heap. The push happens first, so in a min heap the smaller element will be popped out between the top and new added element.
heapq.heappushpop(hp, val)

This problem I already discussed in Quick Select. Now will discuss this problem using heap, another way to solve this problem, without actual sorting.

Idea is to use the heapify method on the array to create the heap and then pop out k - 1 elements from the heap to get the k largest element.

def findKthLargest(self, nums: List[int], k: int) -> int:
    # If need to get largest element or 1st largest, that would by first element of list in decreasing order.
    # So change numbers to negative to get largest on the top i.e. make max heap rather.
    hp = [-x for x in nums]
    # Heapify to get the number on the top
    heapq.heapify(hp)
    # For getting the largest i.e. 1st largest, don't remove any elements.
    # For second largest, remove largest. i.e. 1 element from top of the heap.
    # For third largest, remove 2 element from top of the heap.
    # For kth largest, remove k - 1 elements from the top of the heap.
    for i in range(k - 1):
        # Pop the top k - 1 elements, as index starts from 0
        heapq.heappop(hp)
    # return the top element after changing the sign back again.
    return -1 * hp[0]

Time Complexity of the above algorithm will be O(klogn), as heappop() is done k times, which will require rebalancing of the heap to get the max element on the top of the heap. This heappop() will take O(logn) for each operation. In worst case it can be seen as O(nlogn) which is no better than sorting the array and finding the kth element.

So, we see time wise Quick Select method is much better which runs O(n). In the above problem, the array nums is given as input, but let’s say if a stream of numbers was given rather, in such case the quick select won’t work and using heap we can do the similar operation as above.

Let’s look at the problem at the same problem finding Kth largest from a data stream.

The idea here like in the previous, to get the kth largest, the heap size is k with all the k largest elements in the heap. i.e. we need to throw all the elements which are smaller than the kth largest element. This can be done, by keeping k elements in the min heap. As soon as we get an element which is greater than the top of the heap, now we can pop and push the new element to the heap. For all smaller elements than the kth largest element can be ignored.

class KthLargest:

    def __init__(self, k: int, nums: List[int]):
        self.hp = nums
        self.k = k
        heapq.heapify(self.hp)
        # Remove all smaller elements from heap than the kth largest.
        while len(self.hp) > self.k:
            heapq.heappop(self.hp)

    def add(self, val: int) -> int:
        # Once there are k elements in the heap, all the elements smaller than kth largest can be ignored.
        if len(self.hp) == self.k and val < self.hp[0]:
            return self.hp[0]

        # If there are less than k elements in heap, then add till k elements in heap
        if len(self.hp) < self.k:
            heapq.heappush(self.hp, val)
        else:
        # Once k elements are reached in heap and the value from stream is greater than kth largest element, then push and pop from the heap to get the new kth largest element.
            heapq.heappushpop(self.hp, val)
        return self.hp[0]

Similar problem can be extended to find a median in the stream of data using the heap. Quick select method can be used to find the median in a list of numbers.

Idea in this problem is to use two heaps, maxHeap on left and minHeap on right and divide the numbers equally between the two heaps. The median will be either the mean of the two heaps, or will be present in the one which is of greater size.

class MedianFinder:

   def __init__(self):
       # maxHeap contains all the numbers less than or equal to the median.
       # minHeap contains all the numbers greater than or equal to the median.
       # size of maxHeap and minHeap should be same, in which case the median will be the mean of the top of the heaps.
       # or the size will have a difference of 1 in which case, the bigger size will have median.
       self.minHp = []
       # Remember: maxHeap is a min heap with negated input.
       self.maxHp = []
       self.count = 0

   def addNum(self, num: int) -> None:
       # Operation 1: Insert the new number in the heap.
       # If the num is smaller or equal than top of maxHeap or maxHeap empty, put it in maxHeap, else put it in minHeap.
       # Remember: maxHeap, numbers should be negative, to give corret results
       # Complexity : O(logn) => Time to add a number to the heap of size (n/2) or rebalance the heap.
       self.count += 1
       if len(self.maxHp) == 0 or num <= -1 * self.maxHp[0]:
           heapq.heappush(self.maxHp, -1 * num)
       else:
           heapq.heappush(self.minHp, num)

       # Operation 2: Rebalance heaps
       # If the abs(len(maxHeap) - len(minHeap)) <= 1, no rebalance is needed.
       # else:
       # if len(maxHeap) > len(minHeap) ==> minHeap.push(maxHeap.pop())
       # else maxHeap.push(minHeap.pop())

       if abs(len(self.maxHp) - len(self.minHp)) <= 1:
           return
       if len(self.maxHp) > len(self.minHp):
           heapq.heappush(self.minHp, -1 * heapq.heappop(self.maxHp))
       else:
           heapq.heappush(self.maxHp, -1 * heapq.heappop(self.minHp))


   def findMedian(self) -> float:
       # Complexity: O(1) => Only takes constant time to access the top of the heap element.
       if self.count == 0:
           return 0

       if len(self.minHp) == len(self.maxHp):
           return (-1 * self.maxHp[0] + self.minHp[0])/2

       if len(self.minHp) > len(self.maxHp):
           return self.minHp[0]
       else:
           return -1 * self.maxHp[0]

Follow Ups:

  • If all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?

In such a case, heap is not needed to store all the numbers, rather we case use a frequency table with the count of the numbers. In FindMedian, a prefix count sum array can be created with the elements in frequency table, and do the binary search of 100 elements to find the total count half.

  • If 99% of all integer numbers from the stream are in the range [0, 100], how would you optimize your solution?

How much accuracy of the median is critical here. Let’s say if the 99% numbers are between 0-100, so out of 100 numbers if 1 number is out is greater than 100, we add those numbers to 101 th bucket to keep track of all numbers > 100. The median should not be impacted much if 1% lies in the bucket 101.

Priority Queues#

This article on priority queue, describes really well how to build priority queues using heaps, how to delete items in the heap and update the priority by deleting the item and adding new item with new priority. This can be used be used to schedule tasks in the operating system based on the priority of the task. Using a max heap the task with the highest priority can be fetched from the pool of tasks in constant time.

Next look at some of the problems related to scheduling of tasks.

This is an optimization problem, to minimize the amount of time taken, we will use a greedy approach, where we will always first run the task which needs to be run the max:

To the implement the task scheduler, we need to run the item on top of the PQ on the CPU and then wait for the cool down period in another queue before adding it back to the PQ i.e. heap.

def leastInterval(self, tasks: List[str], n: int) -> int:
     # Count the frequency of each Task.
     # Add each type of task in a max heap with (-freq, priority, type)
     # Run the CPU clock till heap is empty or cooling tasks are empty
     # For each clock time take the task with highest freq is at the top of the PQ and run them.
     # Once the task is run, add the task to the coolingTasks queue, if more of same task need to run:
     # If the -freq is greater > 1, decreament by 1 and insert it into the coolingTasks queue with priority as cpuTime + cooldownTime.
     # Check if there are any items in the coolingTasks queue which can be added if there CPU time is reached.
     # Add such tasks back to the PQ

     # Time Complexity: O(logn)(heap operation) * number times the CPU will run to complete all task(coolDown(k) * max(frequency of task => worst case=>n))
     # Complexity: O(nklogn)
    # Create PQ
    taskCount = Counter(tasks)
    hp = []
    for i, (task, freq) in enumerate(taskCount.most_common()):
        hp.append((-freq, 0, task))
    heapq.heapify(hp)
    # Create cooling down queue.
    coolingTasks = deque()
    # Record all task list
    taskList = []
    cpuTime = 1
    # Run CPU till PQ is empty
    while len(hp) > 0 or len(coolingTasks) > 0:
        # print(cpuTime, hp)
        # Task at the top of the heap, waitTime has expired.
        if len(hp) > 0:
            (freq, lastRunTime, task) = heapq.heappop(hp)
            taskList.append(task)
            if -freq > 1:
                # Once run, add the task to the cooling down queue for cpuTime + n delay
                coolingTasks.append((freq + 1, cpuTime + n, task))
        else:
            taskList.append('idle')

        # Push the cooling down tasks back to the heap, if there wait period is reached.
        while coolingTasks and coolingTasks[0][1] == cpuTime:
            heapq.heappush(hp, coolingTasks.popleft())
        cpuTime += 1

    # print(taskList)
    return cpuTime - 1
comments powered by Disqus