【Python】Python中的heapq模块详解,令人费解的_siftup与_siftdown函数

【Python】Python中的heapq模块详解,令人费解的_siftup与_siftdown函数

  • heapq模块基本介绍
  • 小顶堆的定义和特性
  • heapq模块的作用
  • heappush函数
  • heappop函数
  • heappush函数与_siftdown函数
  • 改良后的heap_push函数
  • heappop函数与_siftup函数
  • 改良后的heap_pop函数
  • heapq模块与改进后的HeapLittle的验证
  • heapq模块基本介绍

    heapq模块是python库中实现的小顶堆相关的函数的集合。对外暴露的接口函数有’heappush’, ‘heappop’, ‘heapify’, ‘heapreplace’, ‘merge’, ‘nlargest’, ‘nsmallest’, ‘heappushpop’。我挑选前两个进行详细的讲解。

    小顶堆的定义和特性

    在Python中,heapq模块提供了对小顶堆(Min Heap)的支持。小顶堆是一种数据结构,其中每个节点的值都小于或等于其子节点的值。这意味着堆的根节点始终是最小的元素。

    堆的定义如下,n个关键字序列L[0…n-1】称为堆,当且仅当该序列满足∶
    ①L(i)>=L(2i+1)且L(i)>=L(2i+2)或
    ②L(i)<=L(2i+1)且L(i)<=L(2i+2)(0≤i≤n//2)
    可以将该一维数组视为一棵完全二叉树,满足条件①的堆称为大根堆(大顶堆)。大根堆的最大元素存放在根结点。满足条件②的堆称为小根堆(小顶堆), 小根堆的定义刚好相反, 根结点是最小元素。

    让我们通过一个简单的例子来理解小顶堆的特性:

    import heapq
    
    # 创建一个空的小顶堆
    heap = []
    
    # 往堆中添加元素
    heapq.heappush(heap, 4)
    heapq.heappush(heap, 2)
    heapq.heappush(heap, 8)
    heapq.heappush(heap, 1)
    
    # 弹出堆中最小的元素
    min_element = heapq.heappop(heap)
    
    print("堆中最小的元素是:", min_element)
    print("剩余堆的元素:", heap)
    
    堆中最小的元素是: 1
    剩余堆的元素: [2, 4, 8]
    

    上述例子中,我们使用heappush向堆中添加元素,然后使用heappop弹出最小的元素。小顶堆的特性确保每次弹出的元素都是堆中最小的。

    heapq模块的作用

    heapq模块主要用于处理堆数据结构,提供了一些函数接口来有效地进行堆操作。其中,heappush和heappop是两个常用的函数,让我们深入了解它们的作用。

    heappush函数

    heapq.heappush(heap, item)函数用于将元素item添加到堆heap中,并保持堆的特性。即在添加元素后,堆仍然是一个有效的小顶堆。

    import heapq
    
    heap = [1, 3, 5, 7, 9]
    print("原始堆:", heap)
    
    # 使用heappush添加元素到堆中
    heapq.heappush(heap, 4)
    print("添加元素后的堆:", heap)
    
    原始堆: [1, 3, 5, 7, 9]
    添加元素后的堆: [1, 3, 4, 7, 9, 5]
    

    heappop函数

    heapq.heappop(heap)函数用于弹出并返回堆heap中的最小元素。在弹出元素后,堆的特性仍然被维护。

    import heapq
    
    heap = [1, 3, 5, 7, 9]
    print("原始堆:", heap)
    
    # 使用heappop弹出最小元素
    min_element = heapq.heappop(heap)
    print("弹出的最小元素是:", min_element)
    print("剩余堆的元素:", heap)
    
    原始堆: [1, 3, 5, 7, 9]
    弹出的最小元素是: 1
    剩余堆的元素: [3, 7, 5, 9]
    

    heappush函数与_siftdown函数

    def heappush(heap, item):
        """Push item onto heap, maintaining the heap invariant."""
        heap.append(item)
        _siftdown(heap, 0, len(heap)-1)
    
    # 'heap' is a heap at all indices >= startpos, except possibly for pos.  pos
    # is the index of a leaf with a possibly out-of-order value.  Restore the
    # heap invariant.
    def _siftdown(heap, startpos, pos):
        newitem = heap[pos]
        # Follow the path to the root, moving parents down until finding a place
        # newitem fits.
        while pos > startpos:
            parentpos = (pos - 1) >> 1 # 计算父节点的位置
            parent = heap[parentpos]
            if newitem < parent:
                heap[pos] = parent # 如果新元素小于父节点,交换它们的位置
                pos = parentpos
                continue
            break
        heap[pos] = newitem # 将新元素放置在正确的位置
    

    当调用heapq.heappush(heap, item)时,heappush函数会将元素item添加到堆heap中,并确保堆的特性得以维持。在heappush函数内部,使用_siftdown函数来执行以下功能:将新添加的元素item放置在正确的位置,以保持小顶堆的性质。具体而言,它通过比较元素的值和其父节点的值,并在需要时进行交换,直到满足小顶堆的条件为止。因为heappush函数将新元素是放在堆底,然后让堆底元素上浮到合适的位置,所有称为新加元素的上浮操作,_siftdown的down有点让人混淆视听。用面向对象的编程方法改写heapq.heappush函数如下节。

    改良后的heap_push函数

        class HeapLittle:
    	    def __init__(self):
    	        self.size = 0
    	        self.array = []
    	        
    	    @staticmethod
    	    def get_parent_index(child_index):
    	        return (child_index - 1) // 2
    
    	    def heap_push(self, item):
    	        self.array.append(item)
    	        self.size += 1
    	        self.heap_item_up(0, self.size - 1)
    	       
    	    def heap_item_up(self, start_position, end_position):
    	            new_item = self.array[end_position]
    	            iteration_index = end_position
    	            iteration_parent_index = self.get_parent_index(iteration_index)
    	            while iteration_parent_index >= start_position and new_item < self.array[iteration_parent_index]:
    	                    self.array[iteration_index] = self.array[iteration_parent_index]
    	                    iteration_index = iteration_parent_index
    	                    iteration_parent_index = self.get_parent_index(iteration_index)
    	            self.array[iteration_index] = new_item
    

    上面的heap_item_up函数改进了原版_siftdown函数的while循环的continue,break的逻辑,并增加函数封装更加表意。

    heappop函数与_siftup函数

    def heappop(heap):
        """Pop the smallest item off the heap, maintaining the heap invariant."""
        lastelt = heap.pop()    # raises appropriate IndexError if heap is empty
        if heap:
            returnitem = heap[0]
            heap[0] = lastelt
            _siftup(heap, 0)
            return returnitem
        return lastelt
    
    def _siftup(heap, pos):
        endpos = len(heap)
        startpos = pos
        newitem = heap[pos]
        # Bubble up the smaller child until hitting a leaf.
        childpos = 2*pos + 1    # leftmost child position
        while childpos < endpos:
            # Set childpos to index of smaller child.
            rightpos = childpos + 1
            if rightpos < endpos and not heap[childpos] < heap[rightpos]:
                childpos = rightpos # 如果右子节点存在且比左子节点小,则选择右子节点
            # Move the smaller child up.
            heap[pos] = heap[childpos] # 如果pos位置的节点有子节点,选最小子节点的位置继续往下沉,直到沉到叶子节点。
            pos = childpos
            childpos = 2*pos + 1
        # The leaf at pos is empty now.  Put newitem there, and bubble it up
        # to its final resting place (by sifting its parents down).
        heap[pos] = newitem
        _siftdown(heap, startpos, pos)  #将沉到叶子节点(pos位置)往上浮到合适位置。
    
    

    当调用heapq.heappop(heap)时,heappop函数会弹出并返回堆heap中的最小元素也就是堆顶元素,将堆的最后一个元素移动到堆顶,堆的长度减1。此时是新加入到堆顶的元素破坏了堆的性质,就需要把这个元素安置到合理位置。使用_siftup函数来执行以下功能:如果新加入到堆顶的元素有子节点,选它的最小子节点的位置往下沉,直到沉到叶子节点,此时叶子节点的位置假设为pos。那么就要执行_siftdown(heap, startpos, pos)函数将沉到的叶子节点(pos位置)往上浮到合适位置。以确保堆的特性得以维持。
    分析上述_siftup函数,总是感觉怪怪的,是否有点脱裤子放屁–多次一举的操作。先将堆顶元素下沉到叶子节点,然后将叶子节点上浮到合适位置。这是否将操作量变大了?或许有我无法理解的深刻含义,读者如果有知道的可以告知一二。

    改良后的heap_pop函数

        def heap_pop(self):
            """Pop the smallest item off the heap, maintaining the heap invariant."""
            last_item = self.array.pop()    # raises appropriate IndexError if heap is empty
            self.size -= 1
            if self.size != 0:
                return_item = self.array[0]
                self.array[0] = last_item
                # print(self)
                # print(f"first not adjusted, item is {last_item}")
                self.heap_item_down(0)
                return return_item
            return last_item
            
        def heap_item_down(self, start_position):
            new_item = self.array[start_position]
            iteration_index = start_position
            while self.has_left_child(iteration_index):
                smaller_child_index = self.get_left_child_index(iteration_index)
                right_child_index = self.get_right_child_index(iteration_index)
                if self.has_right_child(iteration_index) and self.array[right_child_index] < self.array[smaller_child_index]:
                    smaller_child_index = right_child_index
                if new_item > self.array[smaller_child_index]:
                    self.array[iteration_index] = self.array[smaller_child_index]
                    iteration_index = smaller_child_index
                else:
                    break
                # print(self)
                # print(f"iteration_index is {iteration_index}, item value is {new_item}")
            self.array[iteration_index] = new_item
            # print(self)
            # print("heap_item_down")
    
        def has_left_child(self, index):
            return self.get_left_child_index(index) < self.size
            
        def get_left_child(self, index):
            return self.array[self.get_left_child_index(index)]
    
        def get_right_child(self, index):
            return self.array[self.get_right_child_index(index)]
    
    

    新加入到堆顶的元素破坏了堆的性质,就需要把这个元素安置到合理位置。使用heap_item_down函数来执行以下功能:如果新加入到堆顶的元素有子节点,并且它的子节点有比它值小的,那就选它的最小子节点的位置往下沉。直到已经沉到叶子节点了,或它的节点值都比它子节点值都小。

    heapq模块与改进后的HeapLittle的验证

    import numpy as np
    import heapq
    
    
    class HeapLittle:
        def __init__(self):
            self.size = 0
            self.array = []
    
        def print_heap(self):
            for i in self.array:
                print(i)
            print("heap is above")
    
        def __str__(self):
            if self.size == 0:
                return "Heap is empty"
            else:
                return str(self.array)
    
        @staticmethod
        def get_left_child_index(parent_index):
            return 2 * parent_index + 1
    
        @staticmethod
        def get_right_child_index(parent_index):
            return 2 * parent_index + 2
    
        @staticmethod
        def get_parent_index(child_index):
            return (child_index - 1) // 2
    
        def has_left_child(self, index):
            return self.get_left_child_index(index) < self.size
    
        def has_right_child(self, index):
            return self.get_right_child_index(index) < self.size
    
        def has_parent(self, index):
            return self.get_parent_index(index) >= 0
    
        def get_left_child(self, index):
            return self.array[self.get_left_child_index(index)]
    
        def get_right_child(self, index):
            return self.array[self.get_right_child_index(index)]
    
        def get_parent(self, index):
            return self.array[self.get_parent_index(index)]
    
        def heap_item_up(self, start_position, end_position):
                new_item = self.array[end_position]
                iteration_index = end_position
                iteration_parent_index = self.get_parent_index(iteration_index)
                while iteration_parent_index >= start_position and new_item < self.array[iteration_parent_index]:
                        self.array[iteration_index] = self.array[iteration_parent_index]
                        iteration_index = iteration_parent_index
                        iteration_parent_index = self.get_parent_index(iteration_index)
                self.array[iteration_index] = new_item
    
        def heap_item_down(self, start_position):
            new_item = self.array[start_position]
            iteration_index = start_position
            while self.has_left_child(iteration_index):
                smaller_child_index = self.get_left_child_index(iteration_index)
                right_child_index = self.get_right_child_index(iteration_index)
                if self.has_right_child(iteration_index) and self.array[right_child_index] < self.array[smaller_child_index]:
                    smaller_child_index = right_child_index
                if new_item > self.array[smaller_child_index]:
                    self.array[iteration_index] = self.array[smaller_child_index]
                    iteration_index = smaller_child_index
                else:
                    break
                # print(self)
                # print(f"iteration_index is {iteration_index}, item value is {new_item}")
            self.array[iteration_index] = new_item
            # print(self)
            # print("heap_item_down")
    
        def heap_push(self, item):
            self.array.append(item)
            self.size += 1
            self.heap_item_up(0, self.size - 1)
    
        def heap_pop(self):
            """Pop the smallest item off the heap, maintaining the heap invariant."""
            last_item = self.array.pop()    # raises appropriate IndexError if heap is empty
            self.size -= 1
            if self.size != 0:
                return_item = self.array[0]
                self.array[0] = last_item
                # print(self)
                # print(f"first not adjusted, item is {last_item}")
                self.heap_item_down(0)
                return return_item
            return last_item
    
    
    heap_little_object = HeapLittle()
    
    array_normal = [np.random.randint(1, 50) for i in range(0, 10)]
    print(array_normal)
    for i in array_normal:
        heap_little_object.heap_push(i)
        print(heap_little_object)
    for i in range(0, 10):
        pop_item = heap_little_object.heap_pop()
        print(heap_little_object)
    
    print('python heapq')
    array_heapq = []
    for i in array_normal:
        heapq.heappush(array_heapq, i)
        print(array_heapq)
    for i in range(0, 10):
        pop_item = heapq.heappop(array_heapq)
        print(array_heapq)
    
    

    验证效果如下

    [9, 13, 45, 3, 23, 2, 4, 49, 19, 36]
    [9]
    [9, 13]
    [9, 13, 45]
    [3, 9, 45, 13]
    [3, 9, 45, 13, 23]
    [2, 9, 3, 13, 23, 45]
    [2, 9, 3, 13, 23, 45, 4]
    [2, 9, 3, 13, 23, 45, 4, 49]
    [2, 9, 3, 13, 23, 45, 4, 49, 19]
    [2, 9, 3, 13, 23, 45, 4, 49, 19, 36]
    [3, 9, 4, 13, 23, 45, 36, 49, 19]
    [4, 9, 19, 13, 23, 45, 36, 49]
    [9, 13, 19, 49, 23, 45, 36]
    [13, 23, 19, 49, 36, 45]
    [19, 23, 45, 49, 36]
    [23, 36, 45, 49]
    [36, 49, 45]
    [45, 49]
    [49]
    Heap is empty
    python heapq
    [9]
    [9, 13]
    [9, 13, 45]
    [3, 9, 45, 13]
    [3, 9, 45, 13, 23]
    [2, 9, 3, 13, 23, 45]
    [2, 9, 3, 13, 23, 45, 4]
    [2, 9, 3, 13, 23, 45, 4, 49]
    [2, 9, 3, 13, 23, 45, 4, 49, 19]
    [2, 9, 3, 13, 23, 45, 4, 49, 19, 36]
    [3, 9, 4, 13, 23, 45, 36, 49, 19]
    [4, 9, 19, 13, 23, 45, 36, 49]
    [9, 13, 19, 49, 23, 45, 36]
    [13, 23, 19, 49, 36, 45]
    [19, 23, 45, 49, 36]
    [23, 36, 45, 49]
    [36, 49, 45]
    [45, 49]
    [49]
    []
    

    作者:水击三千里,扶摇而上

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】Python中的heapq模块详解,令人费解的_siftup与_siftdown函数

    发表回复