雷达组网拼图3.0数据解析与Python处理技巧详解
废话不多说,先展示雷达图
以反射率为例:
核对数据格式
Z_RADA_C_BABJ_20240705043615_P_DOR_ACHN_CREF_20240705_043000.bin
数据分析认识
1. 组网产品分类:
组网产品包括组网混合扫描反射率(HSR),组网组合反射率(CR)、组网最大反射率高度(CRH)、组网回波顶高产品(ET)、组网垂直累积液态水含量(VIL)、组网一小时降水(OHP)、组网等高面反射率(CAP)。数据频次为10分钟
2. 文件名结构:
Z_RADA_C_BABJ_{YYYYMMDDhhmmss}_P_DOR_{AREACODE}_{DTYPE}_{YYYYMMDD_hhmmss}.bin,{}内表示为命名规则中的变量,定义依次如下:
YYYYMMDDhhmmss:产品生成时间,年月日时分秒
AREACODE:区域号,如全国区域:ACHN
DTYPE:雷达产品类型简写,如QREF
YYYYMMDD_hhmmss:资料观测时间,年月日_时分秒
3. 文件格式:
组网拼图产品文件格式定义为:文件头 + 数据块。文件头中定义了文件卷标(文件固定标识、文件格式版本代码、文件字节数)、
产品描述(拼图产品编号、坐标类型、产品代码、产品描述、产品数据起始位置、产品数据字节数)、数据时间(数据时钟、观测时间年、月、日等)、
数据区信息(数据区的南、西、北、东边界等),具体定义如下表所示。文件头共占用256字节,实际定义变量占176字节,
后面的80字节为预留空间。数据块为nX*nY(文件头中定义变量)个short类型数组,根据文件头中的Compress标志位判断是否压缩
总体数据包括文件头和数据块,数据为bin格式二进制,根据文件格式将二进制转化为实际的数据类型,其余不过多分析,到这里,我们基本了解了组网产品的基本情况,接下来我们直接读取和出雷达图。
# 雷达拼图3.0数据绘制雷达图*************
# 设置日志
logger = logging.getLogger('RADA_L3_MST_CREF_QC')
logging.basicConfig(level=logging.INFO)
# 假设雷达颜色映射表 radarColoMap 已定义,并且是Python列表格式
def group(array, sub_group_length):
for i in range(0, len(array), sub_group_length):
yield array[i:i + sub_group_length]
def read_string(buffer, length, encoding='gb2312'):
return buffer[:length].decode(encoding).replace('\x00', '').replace(' ', '')
def unbzip2_decompress(data):
try:
return bz2.decompress(data)
except Exception as e:
logger.error(f"Error decompressing bz2 data: {e}")
return None
def get_path(path, path1):
# 找到path1在path中的结束位置
end_index = path.find(path1) + len(path1)
# 截取从path1之后的部分
sub_path = path[end_index:]
return sub_path
class BinaryReader:
def __init__(self, buffer, is_little_endian=True):
self.buffer = buffer
self.index = 0
self.is_little_endian = is_little_endian
def seek(self, offset, origin=0):
if origin == 0: # Set absolute position
self.index = offset
elif origin == 1: # Set relative to current position
self.index += offset
elif origin == 2: # Set relative to end of file
self.index = len(self.buffer) + offset
def read_bytes(self, length):
data = self.buffer[self.index:self.index + length]
self.index += length
return data
def read_string(self, length):
return read_string(self.read_bytes(length), length)
def read_int(self):
fmt = '<i' if self.is_little_endian else '>i'
value, = struct.unpack_from(fmt, self.buffer, self.index)
self.index += struct.calcsize(fmt)
return value
def read_int16(self):
fmt = '<h' if self.is_little_endian else '>h'
value, = struct.unpack_from(fmt, self.buffer, self.index)
self.index += struct.calcsize(fmt)
return value
def readHeader(br):
# 读取文件头信息
label = br.read_string(4) # 文件固定标识
version = br.read_string(4) # 文件格式版本代码
file_bytes = br.read_int() # 文件字节数
mosaic_id = br.read_int16() # 拼图产品编号
coordinate = br.read_int16()
varname = br.read_string(8)
description = br.read_string(64)
BlockPos = br.read_int()
BlockLen = br.read_int()
TimeZone = br.read_int()
yr = br.read_int16()
mon = br.read_int16()
day = br.read_int16()
hr = br.read_int16()
min = br.read_int16()
sec = br.read_int16()
ObsSeconds = br.read_int()
ObsDates = br.read_int16()
GenDates = br.read_int16()
GenSeconds = br.read_int()
latMin = br.read_int() / 1000
lonMin = br.read_int() / 1000
latMax = br.read_int() / 1000
lonMax = br.read_int() / 1000
cx = br.read_int() / 1000
cy = br.read_int() / 1000
# 读取数据区前的信息
nX = br.read_int() # 列数
nY = br.read_int() # 行数
dx = br.read_int() / 10000
dy = br.read_int() / 10000
height = br.read_int16()
compress = br.read_int16()
num_of_radars = br.read_int()
UnZipBytes = br.read_int()
scale = br.read_int16()
unUsed = br.read_int16()
RgnID = br.read_string(8)
units = br.read_string(8)
reserved = br.read_string(8)
return yr, mon, day, nX, nY, latMin, latMax, lonMin, lonMax, scale
# 读取数据字节
def readData(buffer):
# 读取数据区
# br.seek(256) # 跳过未压缩的256字节头部
compressed_data = unbzip2_decompress(buffer[256:])
return compressed_data
# 解压bz2数据
def unbzip2_decompress(data):
try:
return bz2.decompress(data)
except Exception as e:
logger.error(f"Error decompressing bz2 data: {e}")
return None
class RadarWarnRequest(BaseModel):
filePath: Any
radarValue: Any
lonMin: Any
lonMax: Any
latMin: Any
latMax: Any
provinceId: Any
class RADA_L3_MST_CREF_QC:
@staticmethod
def parse(file_path, radarValue, lonMin, lonMax, latMin, latMax, provinceId):
with open(file_path, 'rb') as f:
buffer = f.read()
br = BinaryReader(buffer, is_little_endian=True)
yr, mon, day, nX, nY, lat_Min, lat_Max, lon_Min, lon_Max, scale = readHeader(br)
compressed_data = readData(buffer)
if compressed_data:
data = np.frombuffer(compressed_data, dtype=np.int16)
array_2d = data.reshape((nY, nX))
array_2d = array_2d / scale # 数据放大倍数
array_2d = np.where((array_2d >= 0) & (array_2d <= 75), array_2d, np.nan)
# array_2d = ma.masked_where((array_2d > 75) | (array_2d < -5), array_2d) # 筛除无效值
# array_2d = ma.filled(array_2d)
# array_2d = np.round(array_2d, decimals=0) # 保留两位
# 使用flipud函数进行翻转
array_2d = np.flipud(array_2d)
lon = np.linspace(lon_Min, lon_Max, nX)
lat = np.linspace(lat_Min, lat_Max, nY)
# # 裁切完的数据
cropped_array_2d, cropped_lon, cropped_lat = crop_masked_array_by_bounds(array_2d, lon, lat, lonMin, lonMax,
latMin, latMax)
# 绘制河南的
proj = ccrs.PlateCarree() # 创建投影,选择cartopy的platecarree投影
fig = plt.figure(figsize=(10, 8)) # 创建页面,可以自己选择大小
# ax = fig.subplots(subplot_kw={'projection': proj}) #子图
ax = fig.add_subplot(1, 1, 1, projection=proj)
levs = [0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 65.0, 70.0]
cols = [(0, 0, 239 / 255), (65 / 255, 157 / 255, 241 / 255), (100 / 255, 231 / 255, 235 / 255),
(109 / 255, 250 / 255, 61 / 255), (0, 216 / 255, 0),
(1 / 255, 144 / 255, 0), (255 / 255, 255 / 255, 0), (231 / 255, 192 / 255, 0),
(255 / 255, 144 / 255, 0), (255 / 255, 0, 0), (214 / 255, 0, 0), (192 / 255, 0, 0),
(255 / 255, 0, 240 / 255),
(150 / 255, 0, 180 / 255), (173 / 255, 144 / 255, 240 / 255)]
# 创建自定义颜色映射
custom_cmap = ListedColormap(cols)
norm = BoundaryNorm(levs, custom_cmap.N, clip=False)
# 绘制等高线填充图
plt.contourf(lon, lat, array_2d, cmap=custom_cmap, norm=norm, levels=levs)
# 设置透明背景并保存图片
plt.savefig('../v3/china.png', transparent=True, bbox_inches='tight',
pad_inches=0)
else:
logger.error("Failed to decompress data")
return
作者:七颗糖很甜