Hackerrank Heaps: Find the Running Median

Given in input a stream of integers, calculate their running median. As the name of this HackerRank problem suggests, we should find a solution based on heaps data structures.

However, it is difficult to avoid the temptation of trying a different approach. Since I am using Python 3 as implementation language, the insort() function from the bisect library, makes the naive solution of sorting the list of data each time a new element arrives less expensive, to the point that is accepted by HackerRank, even though is far from optimal.


Here is a possible implementation:
for i in range(n):  # 1
    item = int(input().strip())
    insort(data, item)  # 2
    pos = len(data) // 2  # 3
    result = float(data[pos])
    if len(data) % 2 == 0:
        result += data[pos - 1]
        result /= 2
1. I have previously read in n the number of elements that I should expect in input.
2. I insert in the list data each value I get using insort() so, keeping the list sorted. The cost of finding the right position in the list is logarithmic, hence the name bisect of the library, then we have to pay a linear cost to move to the right all the elements bigger than the current one. Still a Big-Oh n*log(n) algorithm, however, on average cheaper than sorting data each time, if we don't use an algorithm that knows it is dealing with a quasi-sorted collection.
3. Than we calculate the median, peaking the central element and, if the current size of the data list is even, also its left neighbor - in this case dividing by two.

It works find, but it sort of cheating.


Unfortunately, the Python standard library does not support heaps so strongly as other languages do. There is a good implementation for the min heap, but here we need to work also with a max heap. The common hack, that I am going to use here, is negating the values inserted in a heap, to let it work as a max heap. Not the cleanest thing to do, however it works fine here.

I have written a class, MedianHeap, that shield the intricacies of the min/max heaps, and offer a simple interface to my solution, that gets very easy:
def solution(values):
    heap = MedianHeap()

    result = []
    for value in values:  # 1

    return '\n'.join(result)
The MedianHeap constructor is trivial:
def __init__(self):
    self.left = []   # max heap
    self.right = []  # min heap
I am going to use two heaps. One the left, keeping the smaller values, that is going to be a max heap, and one on the right, a min heap that keeps the bigger values. I will be always only interested in the max value on the left and the min value on the right.

To push an element, I check if it is smaller that the max to the left, and in this case I am going to push it there, otherwise it will go to the right.
def push(self, value):
    if not self.left or value < -self.left[0]:  # 1
        heappush(self.left, -value)  # 2
        heappush(self.right, value)
1. Couple of nuisances. Before checking I must ensure the left heap is not empty. And, I have to remember my hack on the max heap, so I change the sign of the max element on it before comparing it to the current value.
2. Beside that, I simply use the heappush() function from the heapq python core library. Still, remember that the left heap is a max one, so change the sign of the value I pushing in.

After the push, we should ensure the size difference between the two heaps is minimal, zero or one, otherwise I have to balance them:
bigger = self.left if len(self.left) > len(self.right) else self.right
smaller = self.left if len(self.left) < len(self.right) else self.right
if len(bigger) - len(smaller) > 1:  # 1
    moved = -1 * heappop(bigger)  # 2
    heappush(smaller, moved)
1. If the unbalance is bigger than one, I move from the smaller heap to the bigger one. I care only of their size, left to right or the other way round is just the same.
2. Still, I should remember to swapping the sign of the element, since I am moving from a min to a max heap, or viceversa. Notice the use of the other heapq function at work here, heappop().

Getting the median element from my MedianHeap is easy:
def median(self):
    bigger = self.left if len(self.left) > len(self.right) else self.right
    smaller = self.left if len(self.left) < len(self.right) else self.right

    if bigger is smaller:
        return (self.right[0] - self.left[0]) / 2  # 1
        maxi = -1.0 if bigger is self.left else 1.0
        return maxi * bigger[0]
1. If the heaps have the same size, I should return the mean of their max/min elements. Instead of adding, I subtract them, for the hack on the max heap.
2. Otherwise I return the min/max element of the biggest heap. If the winner is the left heap, it is the max one, so I have to change its sign.

I have pushed both the full python script, the bisect insort and the heap based one, and the unit test I used for help me to develop the more complex second solution, to GitHub.

No comments:

Post a Comment