Python遥感开发之数据趋势分析Sen+mk

Python遥感开发之数据趋势分析Sen+mk

  • 1 方法介绍
  • 1.1 Theil-Sen Median方法
  • 1.2 Mann-Kendall方法
  • 2 Python实现Sen+mk
  • 2.1 代码(作者实现,推荐使用)
  • 2.2 另外版本
  • 3 最终结果
  • 3.1 对slope.tif以及z.tif进行重分类
  • 3.2 对分类结果相乘
  • 3.3 最终结果Python代码实现
  • 前言:主要使用Python完成遥感时间序列数据趋势分析Sen+mk,得到slope、trend、p、s、tau、z指标。


    1 方法介绍

    1.1 Theil-Sen Median方法

    又被称为 Sen 斜率估计,是一种稳健的非参数统计的趋势计算方法。该方法计算效率高,对于测量误差和离群数据不敏感,常被用于长时间序列数据的趋势分析中。对于后续代码计算结果中的slope.tif解读,当slope大于0表示随时间序列呈现上升趋势;slope小于0表示随时间序列呈现下降趋势。

    1.2 Mann-Kendall方法

    Mann-Kendall是一种非参数统计检验方法,最初由Mann在1945年提出,后由Kendall和Sneyers进一步完善,其优点是不需要测量值服从正态分布,也不要求趋势是线性的,并且不受缺失值和异常值的影响,在长时间序列数据的趋势显著检验中得到了十分广泛的应用。对于后续代码计算结果中的z.tif,当|Z|大于1.65、1.96和2.58时,表示趋势分别通过了置信度为90%、95%和99%的显著性检验。

    2 Python实现Sen+mk

    2.1 代码(作者实现,推荐使用)

    # coding:utf-8
    import numpy as np
    import pymannkendall as mk
    import os
    from osgeo import gdal
    
    
    def read_tif(filepath):
        dataset = gdal.Open(filepath)
        col = dataset.RasterXSize  # 图像长度
        row = dataset.RasterYSize  # 图像宽度
        geotrans = dataset.GetGeoTransform()  # 读取仿射变换
        proj = dataset.GetProjection()  # 读取投影
        data = dataset.ReadAsArray()  # 转为numpy格式
        data = data.astype(np.float32)
        no_data_value = data[0][0]  # 设定NoData值
        data[data == no_data_value] = np.nan  # 将NoData转换为NaN
        return col, row, geotrans, proj, data
    
    
    def save_tif(data, reference_file, output_file):
        ds = gdal.Open(reference_file)
        shape = data.shape
        driver = gdal.GetDriverByName("GTiff")
        dataset = driver.Create(output_file, shape[1], shape[0], 1, gdal.GDT_Float32)  # 保存的数据类型
        dataset.SetGeoTransform(ds.GetGeoTransform())
        dataset.SetProjection(ds.GetProjection())
        dataset.GetRasterBand(1).WriteArray(data)
        dataset.FlushCache()
    
    
    def get_tif_list(directory):
        """
        获取目录下所有TIF文件的完整路径
        """
        return [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.tif')]
    
    
    if __name__ == '__main__':
        # 输入文件夹路径
        input_directory = r"E:\AAWORK\学校相关\rsei\data\四季\rsei-winter"
        output_directory = r"E:\AAWORK\学校相关\rsei\data\四季\sen+mk"
    
        # 读取文件列表
        file_list = get_tif_list(input_directory)
    
        # 读取第一个文件获取基本信息(列、行、仿射变换、投影)
        col, row, geotrans, proj, _ = read_tif(file_list[0])
    
        # 预分配结果数组
        slope_data = np.zeros((row, col), dtype=np.float32)
        s_data = np.zeros((row, col), dtype=np.float32)
        z_data = np.zeros((row, col), dtype=np.float32)
        trend_data = np.zeros((row, col), dtype=np.float32)
        tau_data = np.zeros((row, col), dtype=np.float32)
        p_data = np.zeros((row, col), dtype=np.float32)
    
        # 将所有 TIFF 文件的数据堆叠为一个三维数组 (时间, 行, 列)
        tif_stack = np.array([read_tif(f)[4] for f in file_list])
    
        # 遍历每个像素
        for i in range(row):
            print(f"Processing row {i+1}/{row}...")
            for j in range(col):
                pixel_values = tif_stack[:, i, j]  # 取出所有时间点的该像素值
                if not np.any(np.isnan(pixel_values)):  # 过滤掉包含NaN的像素
                    trend, h, p, z, Tau, s, var_s, slope, intercept = mk.original_test(pixel_values)
                    trend_data[i, j] = 1 if trend == "increasing" else -1 if trend == "decreasing" else 0
                    slope_data[i, j] = slope
                    s_data[i, j] = s
                    z_data[i, j] = z
                    p_data[i, j] = p
                    tau_data[i, j] = Tau
    
        # 输出文件路径
        save_tif(slope_data, file_list[0], os.path.join(output_directory, 'winter_slope.tif'))
        save_tif(s_data, file_list[0], os.path.join(output_directory, 'winter_s.tif'))
        save_tif(z_data, file_list[0], os.path.join(output_directory, 'winter_z.tif'))
        save_tif(trend_data, file_list[0], os.path.join(output_directory, 'winter_trend.tif'))
        save_tif(p_data, file_list[0], os.path.join(output_directory, 'winter_p.tif'))
        save_tif(tau_data, file_list[0], os.path.join(output_directory, 'winter_tau.tif'))
    
        print("Processing completed!")
    
    

    2.2 另外版本

    (参考《遥感数据趋势分析Sen+mk》)

    # coding:utf-8
    import numpy as np
    import pymannkendall as mk
    import os
    import rasterio as ras
    
    # 写影像
    def writeImage(image_save_path, height1, width1, para_array, bandDes, transform1):
        with ras.open(
                image_save_path,
                'w',
                driver='GTiff',
                height=height1,
                width=width1,
                count=1,
                dtype=para_array.dtype,
                crs='+proj=latlong',
                transform=transform1,
        ) as dst:
            dst.write_band(1, para_array)
            dst.set_band_description(1, bandDes)
        del dst
    
    def sen_mk_test(image_path, outputPath):
        # image_path:影像的存储路径
        # outputPath:结果输出路径
        global path1
        filepaths = []
        for file in os.listdir(path1):
            filepath1 = os.path.join(path1, file)
            filepaths.append(filepath1)
    
        # 获取影像数量
        num_images = len(filepaths)
        # 读取影像数据
        img1 = ras.open(filepaths[0])
        # 获取影像的投影,高度和宽度
        transform1 = img1.transform
        height1 = img1.height
        width1 = img1.width
        array1 = img1.read()
        img1.close()
        # 读取所有影像
        for path1 in filepaths[1:]:
            if path1[-3:] == 'tif':
                print(path1)
                img2 = ras.open(path1)
                array2 = img2.read()
                array1 = np.vstack((array1, array2))
                img2.close()
        nums, width, height = array1.shape
        # 输出矩阵,无值区用-9999填充
        slope_array = np.full([width, height], np.nan)
        z_array = np.full([width, height], np.nan)
        Trend_array = np.full([width, height], np.nan)
        Tau_array = np.full([width, height], np.nan)
        s_array = np.full([width, height], np.nan)
        p_array = np.full([width, height], np.nan)
        # 只有有值的区域才进行mk检验
        c1 = np.isnan(array1)
        sum_array1 = np.sum(c1, axis=0)
        nan_positions = np.where(sum_array1 == num_images)
        positions = np.where(sum_array1 != num_images)
        # 输出总像元数量
        print("all the pixel counts are {0}".format(len(positions[0])))
        # mk test
        for i in range(len(positions[0])):
            print(i)
            x = positions[0][i]
            y = positions[1][i]
            mk_list1 = array1[:, x, y]
            trend, h, p, z, Tau, s, var_s, slope, intercept = mk.original_test(mk_list1)
            '''        
            trend: tells the trend (increasing, decreasing or no trend)
                    h: True (if trend is present) or False (if trend is absence)
                    p: p-value of the significance test
                    z: normalized test statistics
                    Tau: Kendall Tau
                    s: Mann-Kendal's score
                    var_s: Variance S
                    slope: Theil-Sen estimator/slope
                    intercept: intercept of Kendall-Theil Robust Line
            '''
            if trend == "decreasing":
                trend_value = -1
            elif trend == "increasing":
                trend_value = 1
            else:
                trend_value = 0
            slope_array[x, y] = slope  # senslope
            s_array[x, y] = s
            z_array[x, y] = z
            Trend_array[x, y] = trend_value
            p_array[x, y] = p
            Tau_array[x, y] = Tau
        all_array = [slope_array, Trend_array, p_array, s_array, Tau_array, z_array]
        slope_save_path = os.path.join(result_path, "slope.tif")
        Trend_save_path = os.path.join(result_path, "Trend.tif")
        p_save_path = os.path.join(result_path, "p.tif")
        s_save_path = os.path.join(result_path, "s.tif")
        tau_save_path = os.path.join(result_path, "tau.tif")
        z_save_path = os.path.join(result_path, "z.tif")
        image_save_paths = [slope_save_path, Trend_save_path, p_save_path, s_save_path, tau_save_path, z_save_path]
        band_Des = ['slope', 'trend', 'p_value', 'score', 'tau', 'z_value']
        for i in range(len(all_array)):
            writeImage(image_save_paths[i], height1, width1, all_array[i], band_Des[i], transform1)
    
    if __name__ == '__main__':
        # 调用
        path1 = r"E:\AAWORK\work\研究方向\rsei\data\年\rsei_year-25km"
        result_path = r"E:\AAWORK\work\研究方向\rsei\data\年\out"
        sen_mk_test(path1, result_path)
    
  • slope_array:存储每个像元的Sen’s Slope值,表示该像元的趋势斜率。
  • trend_array:存储每个像元的趋势方向,1表示增加,-1表示减少,0表示无趋势。
  • p_array:存储每个像元的p值,用于判断趋势的显著性。
  • s_array:存储每个像元的Mann-Kendall S统计量。
  • tau_array:存储每个像元的Kendall Tau值,表示相关性强度和方向。
  • z_array:存储每个像元的标准化检验统计量Z,用于计算p值并判断趋势显著性。
  • 3 最终结果

    3.1 对slope.tif以及z.tif进行重分类

  • slope>0赋值1表示增加
  • slope=0赋值0表示不变
  • slope<0赋值-1表示减少
  • |z|>1.96赋值2表示显著
  • |z|<=1.96赋值1表示不显著
  • 3.2 对分类结果相乘

  • -2:显著减少
  • -1:不显著减少
  • 0:稳定不变
  • 1:不显著增加
  • 2:显著增加
  • 3.3 最终结果Python代码实现

    import numpy as np
    import rasterio as ras
    
    def classify_rasters(slope_path, z_path, output_path):
        with ras.open(slope_path) as slope_src, ras.open(z_path) as z_src:
            slope = slope_src.read(1)
            z = z_src.read(1)
            profile = slope_src.profile
    
        # 重分类
        classified_slope = np.full(slope.shape, 0, dtype=np.int8)
        classified_slope[slope > 0] = 1
        classified_slope[slope < 0] = -1
    
        classified_z = np.full(z.shape, 1, dtype=np.int8)
        classified_z[np.abs(z) > 1.96] = 2
        classified_z[np.abs(z) <= 1.96] = 1
    
        # 相乘得到最终分类结果
        final_class = classified_slope * classified_z
    
        # 保存结果
        with ras.open(
            output_path, 'w',
            driver='GTiff',
            height=final_class.shape[0],
            width=final_class.shape[1],
            count=1,
            dtype=final_class.dtype,
            crs=profile['crs'],
            transform=profile['transform']
        ) as dst:
            dst.write(final_class, 1)
    
    if __name__ == '__main__':
        # 调用
        slope_path = r"E:\AAWORK\work\研究方向\rsei\data\年\out\slope.tif"
        z_path = r"E:\AAWORK\work\研究方向\rsei\data\年\out\z.tif"
        output_path = r"E:\AAWORK\work\研究方向\rsei\data\年\out\final_classification.tif"
        classify_rasters(slope_path, z_path, output_path)
    

    作者:等待着冬天的风

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python遥感开发之数据趋势分析Sen+mk

    发表回复