处理中国地区BUFR探空数据

萌新 · 更新于
Published on

Authors
  • Name
    萌新

# 前情提要

由于种种不明因素???,官方渠道的探空数据进一步收紧。Wyoming极为便利的查询网站对于中国境内数据也多数缺少高空部分的数据,少部分渠道数据且用且珍惜。 CMA每天需要向各国共享基本站点的00/12Z探空数据,其中共享的数据格式为BUFR(数据质量很不错)。先前在Github搜寻一圈使用过ecmwf/eccodes(ECMWF's GRIB and BUFR encoding/decoding library)进行处理,但对于进行探空可视化所需的读取过于繁杂。

前几天Pyy老师开源了CyanideCN/TaxolLib包含许多的Libs,其中的 upperair_bufr.py 使用pybufrkit进行读取,已经预先处理好了各类要素的编号,大大减轻了处理负担。

稍做处理返回要素数据,配套metpy库可以计算各类指数。

python
	def get_data(self, station_id):
			df = self.query(station_id)
			pres = df['pressure']
			temp = df['temperature']
			dwpt = df['dewpoint']
			wdir = df['wind_direction']
			wspd = df['wind_speed']
			alt = df['geopotential']
			return pres, temp, dwpt, wdir, wspd, alt
	def get_data(self, station_id):
			df = self.query(station_id)
			pres = df['pressure']
			temp = df['temperature']
			dwpt = df['dewpoint']
			wdir = df['wind_direction']
			wspd = df['wind_speed']
			alt = df['geopotential']
			return pres, temp, dwpt, wdir, wspd, alt
python
Sounding = SoundingDecoder('C:/Users/Administrator/Desktop/Sat/SatData/Upper_air/A_IUKN09BABJ300000_C_RJTD_20220130013517_60.bufr')
data = Sounding.query("52836")
pres, temp, dwpt, wdir, wspd, alt = Sounding.get_data("52836")
print(alt)
Sounding = SoundingDecoder('C:/Users/Administrator/Desktop/Sat/SatData/Upper_air/A_IUKN09BABJ300000_C_RJTD_20220130013517_60.bufr')
data = Sounding.query("52836")
pres, temp, dwpt, wdir, wspd, alt = Sounding.get_data("52836")
print(alt)

同时也可以使用to_sharppy直接生成SHARPPy格式的探空数据,丢入SHARPPy绘制分析。

python
import datetime
date = "2022013001351"
dtime = datetime.datetime.strptime(date, '%Y%m%d%H%M%S')
outdir = 'C:/Users/Administrator/Desktop/Sat/SatData/Upper_air'
Sounding = SoundingDecoder('C:/Users/Administrator/Desktop/Sat/SatData/Upper_air/A_IUKN09BABJ300000_C_RJTD_20220130013517_60.bufr')
out = Sounding.to_sharppy("52836", dtime, outdir)
import datetime
date = "2022013001351"
dtime = datetime.datetime.strptime(date, '%Y%m%d%H%M%S')
outdir = 'C:/Users/Administrator/Desktop/Sat/SatData/Upper_air'
Sounding = SoundingDecoder('C:/Users/Administrator/Desktop/Sat/SatData/Upper_air/A_IUKN09BABJ300000_C_RJTD_20220130013517_60.bufr')
out = Sounding.to_sharppy("52836", dtime, outdir)

这里贴一份upperair_bufr.py,还请大家给Pyy老师一个Star!CyanideCN/TaxolLib

python
# https://github.com/CyanideCN/TaxolLib/blob/main/upperair_bufr.py

import os
from pybufrkit.dataquery import NodePathParser, DataQuerent
from pybufrkit.decoder import Decoder
import numpy as np
from pandas import DataFrame

def dump_msg(msg):
    return np.array(list(msg.values())[0][0], float).ravel()

def safe_number(arr):
    nanpos = np.isnan(arr)
    arr_new = arr.copy()
    if nanpos.any():
        arr_new[nanpos] = -9999
    return arr_new

def decimal_fmt(num):
    return '{:.2f}'.format(num)

def format_str(tup):
    tup = map(decimal_fmt, tup)
    fmt = '{:>7},{:>10},{:>10},{:>10},{:>10},{:>10}\n'.format(*tup)
    return fmt

class SoundingDecoder(object):

    CODE_PRES = '007004'
    CODE_TMP = '012101'
    CODE_TD = '012103'
    CODE_HGT = '010009'
    CODE_WDIR = '011001'
    CODE_WSPD = '011002'
    CODE_BLOCK = '001001'
    CODE_ID = '001002'

    def __init__(self, file):
        decoder = Decoder()
        f = open(file, 'rb')
        self.msg = decoder.process(f.read())
        f.close()
        self.id_list = self._station_list()

    def _station_list(self):
        q = DataQuerent(NodePathParser())
        wmo_block = q.query(self.msg, self.CODE_BLOCK).results
        wmo_id = q.query(self.msg, self.CODE_ID).results
        def ravel(l):
            return [i for j in l for i in j]
        id_list = [str(i) + str(j).zfill(3) for i, j in zip(ravel(wmo_block.values()), ravel(wmo_id.values()))]
        return np.array(id_list)

    def _query(self, subset_num):
        TEMPLATE = '@[{}] > {}'
        q = DataQuerent(NodePathParser())
        df = DataFrame()
        pres_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_PRES)).results
        df['pressure'] = dump_msg(pres_msg) / 100
        tmp_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_TMP)).results
        df['temperature'] = dump_msg(tmp_msg) - 273.15
        td_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_TD)).results
        df['dewpoint'] = dump_msg(td_msg) - 273.15
        hgt_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_HGT)).results
        df['geopotential'] = dump_msg(hgt_msg)
        wdir_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_WDIR)).results
        df['wind_direction'] = dump_msg(wdir_msg)
        wspd_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_WSPD)).results
        df['wind_speed'] = dump_msg(wspd_msg)
        # Basic QC
        df = df.fillna(value={'dewpoint':-9999}) # Fill missing dewpoint at high levels
        df = df.interpolate(method='linear', limit_direction='forward', axis=0).drop_duplicates('geopotential').sort_values(by='geopotential')
        return df.fillna(-9999) # Fill Nans in starting level

    def query(self, station_id):
        if station_id not in self.id_list:
            raise ValueError('Station not found')
        id_idx = (station_id == self.id_list).nonzero()
        subset_num = id_idx[0][0]
        return self._query(subset_num)

    def to_sharppy(self, station_id, dtime, outdir):
        df = self.query(station_id)
        fname = 'SKEWT_' + dtime.strftime('%Y%m%d%H0000') + '_' + station_id +'.txt'
        outpath = os.path.join(outdir, fname)
        f = open(outpath, 'w')
        f.write('%TITLE%\n')
        f.write(' {}   {}\n\n'.format(station_id, dtime.strftime('%y%m%d/%H%M')))
        f.write('   LEVEL       HGHT       TEMP       DWPT       WDIR       WSPD\n')
        f.write('-------------------------------------------------------------------\n')
        f.write('%RAW%\n')
        for data in zip(safe_number(df['pressure']), safe_number(df['geopotential']),
                        safe_number(df['temperature']), safe_number(df['dewpoint']),
                        safe_number(df['wind_direction']), safe_number(df['wind_speed'] * 1.9438)):
            formatted = format_str(data)
            f.write(formatted)
        f.write('%END%')
        f.close()
        return outpath
# https://github.com/CyanideCN/TaxolLib/blob/main/upperair_bufr.py

import os
from pybufrkit.dataquery import NodePathParser, DataQuerent
from pybufrkit.decoder import Decoder
import numpy as np
from pandas import DataFrame

def dump_msg(msg):
    return np.array(list(msg.values())[0][0], float).ravel()

def safe_number(arr):
    nanpos = np.isnan(arr)
    arr_new = arr.copy()
    if nanpos.any():
        arr_new[nanpos] = -9999
    return arr_new

def decimal_fmt(num):
    return '{:.2f}'.format(num)

def format_str(tup):
    tup = map(decimal_fmt, tup)
    fmt = '{:>7},{:>10},{:>10},{:>10},{:>10},{:>10}\n'.format(*tup)
    return fmt

class SoundingDecoder(object):

    CODE_PRES = '007004'
    CODE_TMP = '012101'
    CODE_TD = '012103'
    CODE_HGT = '010009'
    CODE_WDIR = '011001'
    CODE_WSPD = '011002'
    CODE_BLOCK = '001001'
    CODE_ID = '001002'

    def __init__(self, file):
        decoder = Decoder()
        f = open(file, 'rb')
        self.msg = decoder.process(f.read())
        f.close()
        self.id_list = self._station_list()

    def _station_list(self):
        q = DataQuerent(NodePathParser())
        wmo_block = q.query(self.msg, self.CODE_BLOCK).results
        wmo_id = q.query(self.msg, self.CODE_ID).results
        def ravel(l):
            return [i for j in l for i in j]
        id_list = [str(i) + str(j).zfill(3) for i, j in zip(ravel(wmo_block.values()), ravel(wmo_id.values()))]
        return np.array(id_list)

    def _query(self, subset_num):
        TEMPLATE = '@[{}] > {}'
        q = DataQuerent(NodePathParser())
        df = DataFrame()
        pres_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_PRES)).results
        df['pressure'] = dump_msg(pres_msg) / 100
        tmp_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_TMP)).results
        df['temperature'] = dump_msg(tmp_msg) - 273.15
        td_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_TD)).results
        df['dewpoint'] = dump_msg(td_msg) - 273.15
        hgt_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_HGT)).results
        df['geopotential'] = dump_msg(hgt_msg)
        wdir_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_WDIR)).results
        df['wind_direction'] = dump_msg(wdir_msg)
        wspd_msg = q.query(self.msg, TEMPLATE.format(subset_num, self.CODE_WSPD)).results
        df['wind_speed'] = dump_msg(wspd_msg)
        # Basic QC
        df = df.fillna(value={'dewpoint':-9999}) # Fill missing dewpoint at high levels
        df = df.interpolate(method='linear', limit_direction='forward', axis=0).drop_duplicates('geopotential').sort_values(by='geopotential')
        return df.fillna(-9999) # Fill Nans in starting level

    def query(self, station_id):
        if station_id not in self.id_list:
            raise ValueError('Station not found')
        id_idx = (station_id == self.id_list).nonzero()
        subset_num = id_idx[0][0]
        return self._query(subset_num)

    def to_sharppy(self, station_id, dtime, outdir):
        df = self.query(station_id)
        fname = 'SKEWT_' + dtime.strftime('%Y%m%d%H0000') + '_' + station_id +'.txt'
        outpath = os.path.join(outdir, fname)
        f = open(outpath, 'w')
        f.write('%TITLE%\n')
        f.write(' {}   {}\n\n'.format(station_id, dtime.strftime('%y%m%d/%H%M')))
        f.write('   LEVEL       HGHT       TEMP       DWPT       WDIR       WSPD\n')
        f.write('-------------------------------------------------------------------\n')
        f.write('%RAW%\n')
        for data in zip(safe_number(df['pressure']), safe_number(df['geopotential']),
                        safe_number(df['temperature']), safe_number(df['dewpoint']),
                        safe_number(df['wind_direction']), safe_number(df['wind_speed'] * 1.9438)):
            formatted = format_str(data)
            f.write(formatted)
        f.write('%END%')
        f.close()
        return outpath
Valaxy v0.17.2 驱动 | 主题 - Sakura v0.0.7