"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define TCTracks: IBTracs reader and tracks manager.
"""
__all__ = ['SAFFIR_SIM_CAT', 'TCTracks', 'set_category']
import os
import glob
import shutil
import logging
import datetime as dt
import array
import itertools
import numpy as np
import matplotlib.cm as cm_mp
from matplotlib.lines import Line2D
from matplotlib.collections import LineCollection
from matplotlib.colors import BoundaryNorm, ListedColormap
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import pandas as pd
import xarray as xr
from sklearn.neighbors import DistanceMetric
import netCDF4 as nc
from numba import jit
from pint import UnitRegistry
import scipy.io.matlab as matlab
from climada.util.config import CONFIG
import climada.util.coordinates as coord_util
from climada.util.constants import EARTH_RADIUS_KM, SYSTEM_DIR
from climada.util.files_handler import get_file_names, download_ftp
import climada.util.plot as u_plot
LOGGER = logging.getLogger(__name__)
SAFFIR_SIM_CAT = [34, 64, 83, 96, 113, 135, 1000]
""" Saffir-Simpson Hurricane Wind Scale in kn"""
CAT_NAMES = {1: 'Tropical Depression', 2: 'Tropical Storm',
3: 'Hurrican Cat. 1', 4: 'Hurrican Cat. 2',
5: 'Hurrican Cat. 3', 6: 'Hurrican Cat. 4', 7: 'Hurrican Cat. 5'}
""" Saffir-Simpson category names. """
CAT_COLORS = cm_mp.rainbow(np.linspace(0, 1, len(SAFFIR_SIM_CAT)))
""" Color scale to plot the Saffir-Simpson scale."""
IBTRACS_URL = 'ftp://eclipse.ncdc.noaa.gov/pub/ibtracs//v04r00/provisional/netcdf/'
""" FTP of IBTrACS netcdf file containing all tracks v4.0 """
IBTRACS_FILE = 'IBTrACS.ALL.v04r00.nc'
""" IBTrACS v4.0 file all """
DEF_ENV_PRESSURE = 1010
""" Default environmental pressure """
[docs]class TCTracks():
"""Contains tropical cyclone tracks.
Attributes:
data (list(xarray.Dataset)): list of tropical cyclone tracks. Each
track contains following attributes:
- time (coords)
- lat (coords)
- lon (coords)
- time_step
- radius_max_wind
- max_sustained_wind
- central_pressure
- environmental_pressure
- max_sustained_wind_unit (attrs)
- central_pressure_unit (attrs)
- name (attrs)
- sid (attrs)
- orig_event_flag (attrs)
- data_provider (attrs)
- basin (attrs)
- id_no (attrs)
- category (attrs)
computed during processing:
- on_land
- dist_since_lf
"""
[docs] def __init__(self, pool=None):
"""Empty constructor. Read csv IBTrACS files if provided. """
self.data = list()
if pool:
self.pool = pool
LOGGER.debug('Using %s CPUs.', self.pool.ncpus)
else:
self.pool = None
[docs] def append(self, tracks):
"""Append tracks to current.
Parameters:
tracks (xarray.Dataset or list(xarray.Dataset)): tracks to append.
"""
if not isinstance(tracks, list):
tracks = [tracks]
self.data.extend(tracks)
[docs] def get_track(self, track_name=None):
"""Get track with provided name. Return all tracks if no name provided.
Parameters:
track_name (str, optional): name or sid (ibtracsID for IBTrACS)
of track
Returns:
xarray.Dataset or [xarray.Dataset]
"""
if track_name is None:
if len(self.data) == 1:
return self.data[0]
return self.data
for track in self.data:
if track.name == track_name:
return track
if hasattr(track, 'sid') and track.sid == track_name:
return track
LOGGER.info('No track with name or sid %s found.', track_name)
return []
[docs] def read_ibtracs_netcdf(self, provider='usa', storm_id=None,
year_range=(1980, 2018), basin=None,
file_name='IBTrACS.ALL.v04r00.nc', correct_pres=True):
"""Fill from raw ibtracs v04. Removes nans in coordinates, central
pressure and removes repeated times data. Fills nans of environmental_pressure
and radius_max_wind. Checks environmental_pressure > central_pressure.
Parameters:
provider (str): data provider. e.g. usa, newdelhi, bom, cma, tokyo
storm_id (str or list(str), optional): ibtracs if of the storm,
e.g. 1988234N13299, [1988234N13299, 1989260N11316]
year_range(tuple, optional): (min_year, max_year). Default: (1980, 2018)
basin (str, optional): e.g. US, SA, NI, SI, SP, WP, EP, NA. if not
provided, consider all basins.
file_name (str, optional): name of netcdf file to be dowloaded or located
at climada/data/system. Default: 'IBTrACS.ALL.v04r00.nc'.
correct_pres (bool, optional): correct central pressure if missing
values. Default: False
"""
self.data = list()
fn_nc = os.path.join(os.path.abspath(SYSTEM_DIR), file_name)
if not glob.glob(fn_nc):
try:
download_ftp(os.path.join(IBTRACS_URL, IBTRACS_FILE), IBTRACS_FILE)
shutil.move(IBTRACS_FILE, fn_nc)
except ValueError as err:
LOGGER.error('Error while downloading %s. Try to download it '+
'manually and put the file in ' +
'climada_python/data/system/', IBTRACS_URL)
raise err
sel_tracks = self._filter_ibtracs(fn_nc, storm_id, year_range, basin)
nc_data = nc.Dataset(fn_nc)
all_tracks = []
for i_track in sel_tracks:
all_tracks.append(self._read_one_raw(nc_data, i_track, provider,
correct_pres))
self.data = [track for track in all_tracks if track is not None]
[docs] def read_processed_ibtracs_csv(self, file_names):
"""Fill from processed ibtracs csv file.
Parameters:
file_names (str or list(str)): absolute file name(s) or
folder name containing the files to read.
"""
self.data = list()
all_file = get_file_names(file_names)
for file in all_file:
self._read_one_csv(file)
[docs] def read_simulations_emanuel(self, file_names, hemisphere='S'):
"""Fill from Kerry Emanuel tracks.
Parameters:
file_names (str or list(str)): absolute file name(s) or
folder name containing the files to read.
hemisphere (str, optional): 'S', 'N' or 'both'. Default: 'S'
"""
corr_files = ['temp_ccsm420thcal.mat', 'temp_ccsm4rcp85_full.mat', \
'temp_gfdl520thcal.mat', 'temp_gfdl5rcp85cal_full.mat', \
'temp_hadgem20thcal.mat', 'temp_hadgemrcp85cal_full.mat', \
'temp_miroc20thcal.mat', 'temp_mirocrcp85cal_full.mat', \
'temp_mpi20thcal.mat', 'temp_mpircp85cal_full.mat', \
'temp_mri20thcal.mat', 'temp_mrircp85cal_full.mat']
all_file = get_file_names(file_names)
if hemisphere == 'S':
hem_min, hem_max = -90, 0
elif hemisphere == 'N':
hem_min, hem_max = 0, 90
else:
hem_min, hem_max = -90, 90
self.data = list()
for file in all_file:
LOGGER.info('Reading %s.', file)
data = matlab.loadmat(file)
data_lon, data_lat, data_y, data_m, data_d, data_h, data_r, \
data_v, data_p = data['longstore'], data['latstore'], data['yearstore'], \
data['monthstore'], data['daystore'], data['hourstore'], data['rmstore'], \
data['vstore'], data['pstore']
LOGGER.info('Loading %s tracks (each %s nodes), representing %s years.', \
data_lat.shape[0], data_lat.shape[1], data_lat.shape[0]//600)
for i_track in range(data_lat.shape[0]):
pos = np.argwhere(np.logical_and(np.abs(data_lat[i_track, :]) > 0, \
np.abs(data_lon[i_track, :]) > 0)).reshape(-1)
if hem_min > data_lat[i_track, pos].min() or \
hem_max < data_lat[i_track, pos].max():
continue
datetimes = []
for month, day, hour in zip(data_m[i_track, pos], \
data_d[i_track, pos], data_h[i_track, pos]):
datetimes.append(dt.datetime(data_y[0, i_track], month, day, hour))
datetimes = np.array(datetimes)
tr_ds = xr.Dataset({ \
'time_step': ('time', np.diff(data_h[i_track, pos]).min() * \
np.ones(datetimes.size)), \
'radius_max_wind': ('time', data_r[i_track, pos]/1.852), \
'max_sustained_wind': ('time', data_v[i_track, pos]), \
'central_pressure': ('time', data_p[i_track, pos]), \
'environmental_pressure': ('time', np.ones(datetimes.size)*DEF_ENV_PRESSURE)}, \
coords={'time': datetimes, 'lat': ('time', data_lat[i_track, pos]), \
'lon': ('time', data_lon[i_track, pos])}, \
attrs={'max_sustained_wind_unit':'kn', \
'central_pressure_unit':'mb', 'name':str(i_track), 'sid':str(i_track), \
'orig_event_flag':True, 'data_provider':'Emanuel', 'basin':hemisphere, \
'id_no':i_track})
tr_ds.attrs['category'] = set_category(tr_ds.max_sustained_wind.values, \
tr_ds.max_sustained_wind_unit, SAFFIR_SIM_CAT)
if os.path.basename(file) in corr_files:
tr_ds['radius_max_wind'] *= 2
self.data.append(tr_ds)
[docs] def equal_timestep(self, time_step_h=1, land_params=False):
""" Generate interpolated track values to time steps of min_time_step.
Parameters:
time_step_h (float, optional): time step in hours to which to
interpolate. Default: 1.
land_params (bool, optional): compute on_land and dist_since_lf at
each node. Default: False.
"""
LOGGER.info('Interpolating %s tracks to %sh time steps.', self.size,
time_step_h)
if land_params:
land_geom = _calc_land_geom(self.data)
else:
land_geom = None
if self.pool:
chunksize = min(self.size//self.pool.ncpus, 1000)
self.data = self.pool.map(self._one_interp_data, self.data,
itertools.repeat(time_step_h, self.size),
itertools.repeat(land_geom, self.size),
chunksize=chunksize)
else:
new_data = list()
for track in self.data:
new_data.append(self._one_interp_data(track, time_step_h,
land_geom))
self.data = new_data
[docs] def calc_random_walk(self, ens_size=9, ens_amp0=1.5, max_angle=np.pi/10, \
ens_amp=0.1, seed=CONFIG['trop_cyclone']['random_seed'], decay=True):
""" Generate synthetic tracks. An ensamble of tracks is computed for
every track contained.
Parameters:
ens_size (int, optional): number of ensamble per original track.
Default 9.
ens_amp0 (float, optional): amplitude of max random starting point
shift degree longitude. Default: 1.5
max_angle (float, optional): maximum angle of variation, =pi is
like undirected, pi/4 means one quadrant. Default: pi/10
ens_amp (float, optional): amplitude of random walk wiggles in
degree longitude for 'directed'. Default: 0.1
seed (int, optional): random number generator seed. Put negative
value if you don't want to use it. Default: configuration file
decay (bool, optional): compute land decay in probabilistic tracks.
Default: True
"""
LOGGER.info('Computing %s synthetic tracks.', ens_size*self.size)
if seed >= 0:
np.random.seed(seed)
random_vec = list()
for track in self.data:
random_vec.append(np.random.uniform(size=ens_size*(2+track.time.size)))
num_tracks = self.size
new_ens = list()
if self.pool:
chunksize = min(num_tracks//self.pool.ncpus, 1000)
new_ens = self.pool.map(self._one_rnd_walk, self.data,
itertools.repeat(ens_size, num_tracks),
itertools.repeat(ens_amp0, num_tracks),
itertools.repeat(ens_amp, num_tracks),
itertools.repeat(max_angle, num_tracks),
random_vec, chunksize=chunksize)
else:
for i_track, track in enumerate(self.data):
new_ens.append(self._one_rnd_walk(track, ens_size, ens_amp0, \
ens_amp, max_angle, random_vec[i_track]))
self.data = list()
for ens_track in new_ens:
self.data.extend(ens_track)
if decay:
try:
land_geom = _calc_land_geom(self.data)
v_rel, p_rel = self._calc_land_decay(land_geom)
self._apply_land_decay(v_rel, p_rel, land_geom)
except ValueError as err:
LOGGER.info('No land decay coefficients could be applied. %s',
str(err))
@property
def size(self):
""" Get longitude from coord array """
return len(self.data)
[docs] def plot(self, axis=None, **kwargs):
"""Track over earth. Historical events are blue, probabilistic black.
Parameters:
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for LineCollection matplotlib, e.g. alpha=0.5
Returns:
matplotlib.axes._subplots.AxesSubplot
"""
if 'lw' not in kwargs:
kwargs['lw'] = 2
if 'transform' not in kwargs:
kwargs['transform'] = ccrs.PlateCarree()
if not self.size:
LOGGER.info('No tracks to plot')
return None
deg_border = 0.5
if not axis:
_, axis = u_plot.make_map()
min_lat, max_lat = 10000, -10000
min_lon, max_lon = 10000, -10000
for track in self.data:
min_lat, max_lat = min(min_lat, np.min(track.lat.values)), \
max(max_lat, np.max(track.lat.values))
min_lon, max_lon = min(min_lon, np.min(track.lon.values)), \
max(max_lon, np.max(track.lon.values))
min_lon, max_lon = min_lon-deg_border, max_lon+deg_border
min_lat, max_lat = min_lat-deg_border, max_lat+deg_border
if abs(min_lon - max_lon) > 360:
min_lon, max_lon = -180, 180
axis.set_extent(([min_lon, max_lon, min_lat, max_lat]), crs=kwargs['transform'])
u_plot.add_shapes(axis)
synth_flag = False
cmap = ListedColormap(colors=CAT_COLORS)
norm = BoundaryNorm([0] + SAFFIR_SIM_CAT, len(SAFFIR_SIM_CAT))
for track in self.data:
points = np.array([track.lon.values,
track.lat.values]).T.reshape(-1, 1, 2)
segments = np.concatenate([points[:-1], points[1:]], axis=1)
try:
segments = np.delete(segments, np.argwhere(segments[:, 0, 0] * \
segments[:, 1, 0] < 0).reshape(-1), 0)
except IndexError:
pass
if track.orig_event_flag:
track_lc = LineCollection(segments, cmap=cmap, norm=norm, \
linestyle='solid', **kwargs)
else:
synth_flag = True
track_lc = LineCollection(segments, cmap=cmap, norm=norm, \
linestyle=':', **kwargs)
track_lc.set_array(track.max_sustained_wind.values)
axis.add_collection(track_lc)
leg_lines = [Line2D([0], [0], color=CAT_COLORS[i_col], lw=2)
for i_col in range(len(SAFFIR_SIM_CAT))]
leg_names = [CAT_NAMES[i_col] for i_col
in range(1, len(SAFFIR_SIM_CAT)+1)]
if synth_flag:
leg_lines.append(Line2D([0], [0], color='grey', lw=2, ls='solid'))
leg_lines.append(Line2D([0], [0], color='grey', lw=2, ls=':'))
leg_names.append('Historical')
leg_names.append('Synthetic')
axis.legend(leg_lines, leg_names, loc=0)
return axis
[docs] def write_netcdf(self, folder_name):
""" Write a netcdf file per track with track.sid name in given folder.
Parameter:
folder_name (str): folder name where to write files
"""
list_path = [os.path.join(folder_name, track.sid+'.nc') for track in self.data]
LOGGER.info('Writting %s files.', self.size)
for track in self.data:
track.attrs['orig_event_flag'] = int(track.orig_event_flag)
xr.save_mfdataset(self.data, list_path)
[docs] def read_netcdf(self, folder_name):
""" Read all netcdf files contained in folder and fill a track per file.
Parameters:
folder_name (str): folder name where to write files
"""
file_tr = get_file_names(folder_name)
LOGGER.info('Reading %s files.', len(file_tr))
self.data = list()
for file in file_tr:
if not os.path.splitext(file)[1] == '.nc':
continue
track = xr.open_dataset(file)
track.attrs['orig_event_flag'] = bool(track.orig_event_flag)
self.data.append(track)
@staticmethod
@jit(parallel=True)
def _one_rnd_walk(track, ens_size, ens_amp0, ens_amp, max_angle, rnd_vec):
""" Interpolate values of one track.
Parameters:
track (xr.Dataset): track data
Returns:
list(xr.Dataset)
"""
ens_track = list()
n_dat = track.time.size
rand_unif_ini = rnd_vec[:2*ens_size].reshape((2, ens_size))
rand_unif_ang = rnd_vec[2*ens_size:]
xy_ini = ens_amp0 * (rand_unif_ini - 0.5)
tmp_ang = np.cumsum(2 * max_angle * rand_unif_ang - max_angle)
coord_xy = np.empty((2, ens_size * n_dat))
coord_xy[0] = np.cumsum(ens_amp * np.sin(tmp_ang))
coord_xy[1] = np.cumsum(ens_amp * np.cos(tmp_ang))
ens_track.append(track)
for i_ens in range(ens_size):
i_track = track.copy(True)
d_xy = coord_xy[:, i_ens * n_dat: (i_ens + 1) * n_dat] - \
np.expand_dims(coord_xy[:, i_ens * n_dat], axis=1)
d_lat_lon = d_xy + np.expand_dims(xy_ini[:, i_ens], axis=1)
i_track.lon.values = i_track.lon.values + d_lat_lon[0, :]
i_track.lat.values = i_track.lat.values + d_lat_lon[1, :]
i_track.attrs['orig_event_flag'] = False
i_track.attrs['name'] = i_track.attrs['name'] + '_gen' + str(i_ens+1)
i_track.attrs['sid'] = i_track.attrs['sid'] + '_gen' + str(i_ens+1)
i_track.attrs['id_no'] = i_track.attrs['id_no'] + (i_ens+1)/100
ens_track.append(i_track)
return ens_track
@staticmethod
@jit(parallel=True)
def _one_interp_data(track, time_step_h, land_geom=None):
""" Interpolate values of one track.
Parameters:
track (xr.Dataset): track data
Returns:
xr.Dataset
"""
if track.time.size > 3:
time_step = str(time_step_h) + 'H'
track_int = track.resample(time=time_step).interpolate('linear')
track_int['time_step'] = ('time', track_int.time.size * [time_step_h])
# handle change of sign in longitude
pos_lon = track.coords['lon'].values > 0
neg_lon = track.coords['lon'].values <= 0
if neg_lon.any() and pos_lon.any() and \
np.any(abs(track.coords['lon'].values[pos_lon]) > 170):
if neg_lon[0]:
track.coords['lon'].values[pos_lon] -= 360
track_int.coords['lon'] = track.lon.resample(time=time_step).\
interpolate('cubic')
track_int.coords['lon'][track_int.coords['lon'] < -180] += 360
else:
track.coords['lon'].values[neg_lon] += 360
track_int.coords['lon'] = track.lon.resample(time=time_step).\
interpolate('cubic')
track_int.coords['lon'][track_int.coords['lon'] > 180] -= 360
else:
track_int.coords['lon'] = track.lon.resample(time=time_step).\
interpolate('cubic')
track_int.coords['lat'] = track.lat.resample(time=time_step).\
interpolate('cubic')
track_int.attrs = track.attrs
track_int.attrs['category'] = set_category( \
track.max_sustained_wind.values, \
track.max_sustained_wind_unit)
else:
LOGGER.warning('Track interpolation not done. ' \
'Not enough elements for %s', track.name)
track_int = track
if land_geom:
_track_land_params(track_int, land_geom)
return track_int
def _calc_land_decay(self, land_geom, s_rel=True, check_plot=False):
"""Compute wind and pressure decay coefficients for every TC category
from the historical events according to the formulas:
- wind decay = exp(-x*A)
- pressure decay = S-(S-1)*exp(-x*B)
Parameters:
land_geom (shapely.geometry.multipolygon.MultiPolygon): land geometry
s_rel (bool, optional): use environmental presure to calc S value
(true) or central presure (false)
check_plot (bool, optional): visualize computed coefficients.
Default: False
Returns:
v_rel (dict(category: A)), p_rel (dict(category: (S, B)))
"""
hist_tracks = [track for track in self.data if track.orig_event_flag]
if not hist_tracks:
LOGGER.error('No historical tracks contained. Historical tracks' \
' are needed.')
raise ValueError
# Key is Saffir-Simpson scale
# values are lists of wind/wind at landfall
v_lf = dict()
# values are tuples with first value the S parameter, second value
# list of central pressure/central pressure at landfall
p_lf = dict()
# x-scale values to compute landfall decay
x_val = dict()
dec_val = list()
if self.pool:
chunksize = min(len(hist_tracks)//self.pool.ncpus, 1000)
dec_val = self.pool.map(_decay_values, hist_tracks, itertools.repeat(land_geom),
itertools.repeat(s_rel), chunksize=chunksize)
else:
for track in hist_tracks:
dec_val.append(_decay_values(track, land_geom, s_rel))
for (tv_lf, tp_lf, tx_val) in dec_val:
for key in tv_lf.keys():
v_lf.setdefault(key, []).extend(tv_lf[key])
p_lf.setdefault(key, ([], []))
p_lf[key][0].extend(tp_lf[key][0])
p_lf[key][1].extend(tp_lf[key][1])
x_val.setdefault(key, []).extend(tx_val[key])
v_rel, p_rel = _decay_calc_coeff(x_val, v_lf, p_lf)
if check_plot:
_check_decay_values_plot(x_val, v_lf, p_lf, v_rel, p_rel)
return v_rel, p_rel
def _apply_land_decay(self, v_rel, p_rel, land_geom, s_rel=True,
check_plot=False):
"""Compute wind and pressure decay due to landfall in synthetic tracks.
Parameters:
v_rel (dict): {category: A}, where wind decay = exp(-x*A)
p_rel (dict): (category: (S, B)}, where pressure decay
= S-(S-1)*exp(-x*B)
land_geom (shapely.geometry.multipolygon.MultiPolygon): land geometry
s_rel (bool, optional): use environmental presure to calc S value
(true) or central presure (false)
check_plot (bool, optional): visualize computed changes
"""
sy_tracks = [track for track in self.data if not track.orig_event_flag]
if not sy_tracks:
LOGGER.error('No synthetic tracks contained. Synthetic tracks' \
' are needed.')
raise ValueError
if not v_rel or not p_rel:
LOGGER.info('No decay coefficients.')
return
if check_plot:
orig_wind, orig_pres = [], []
for track in sy_tracks:
orig_wind.append(np.copy(track.max_sustained_wind.values))
orig_pres.append(np.copy(track.central_pressure.values))
if self.pool:
chunksize = min(self.size//self.pool.ncpus, 1000)
self.data = self.pool.map(_apply_decay_coeffs, self.data,
itertools.repeat(v_rel), itertools.repeat(p_rel),
itertools.repeat(land_geom), itertools.repeat(s_rel),
chunksize=chunksize)
else:
new_data = list()
for track in self.data:
new_data.append(_apply_decay_coeffs(track, v_rel, p_rel, \
land_geom, s_rel))
self.data = new_data
if check_plot:
_check_apply_decay_plot(self.data, orig_wind, orig_pres)
def _read_one_csv(self, file_name):
"""Read IBTrACS track file.
Parameters:
file_name (str): file name containing one IBTrACS track to read
"""
LOGGER.info('Reading %s', file_name)
dfr = pd.read_csv(file_name)
name = dfr['ibtracsID'].values[0]
datetimes = list()
for time in dfr['isotime'].values:
year = np.fix(time/1e6)
time = time - year*1e6
month = np.fix(time/1e4)
time = time - month*1e4
day = np.fix(time/1e2)
hour = time - day*1e2
datetimes.append(dt.datetime(int(year), int(month), int(day), \
int(hour)))
lat = dfr['cgps_lat'].values.astype('float')
lon = dfr['cgps_lon'].values.astype('float')
cen_pres = dfr['pcen'].values.astype('float')
max_sus_wind = dfr['vmax'].values.astype('float')
max_sus_wind_unit = 'kn'
cen_pres = _missing_pressure(cen_pres, max_sus_wind, lat, lon)
tr_ds = xr.Dataset()
tr_ds.coords['time'] = ('time', datetimes)
tr_ds.coords['lat'] = ('time', lat)
tr_ds.coords['lon'] = ('time', lon)
tr_ds['time_step'] = ('time', dfr['tint'].values)
tr_ds['radius_max_wind'] = ('time', dfr['rmax'].values.astype('float'))
tr_ds['max_sustained_wind'] = ('time', max_sus_wind)
tr_ds['central_pressure'] = ('time', cen_pres)
tr_ds['environmental_pressure'] = ('time', \
dfr['penv'].values.astype('float'))
tr_ds.attrs['max_sustained_wind_unit'] = max_sus_wind_unit
tr_ds.attrs['central_pressure_unit'] = 'mb'
tr_ds.attrs['name'] = name
tr_ds.attrs['sid'] = name
tr_ds.attrs['orig_event_flag'] = bool(dfr['original_data']. values[0])
tr_ds.attrs['data_provider'] = dfr['data_provider'].values[0]
tr_ds.attrs['basin'] = dfr['gen_basin'].values[0]
try:
tr_ds.attrs['id_no'] = float(name.replace('N', '0'). \
replace('S', '1'))
except ValueError:
tr_ds.attrs['id_no'] = float(str(datetimes[0].date()). \
replace('-', ''))
tr_ds.attrs['category'] = set_category(max_sus_wind, \
max_sus_wind_unit)
self.data.append(tr_ds)
@staticmethod
def _filter_ibtracs(fn_nc, storm_id, year_range, basin):
""" Select tracks from input conditions.
Parameters:
fn_nc (str): ibtracs netcdf data file name
storm_id (str os list): ibtrac id of the storm
year_range(tuple): (min_year, max_year)
basin (str): e.g. US, SA, NI, SI, SP, WP, EP, NA
Returns:
np.array
"""
nc_data = nc.Dataset(fn_nc)
storm_ids = [''.join(name.astype(str))
for name in nc_data.variables['sid']]
sel_tracks = []
# filter name
if storm_id:
if not isinstance(storm_id, list):
storm_id = [storm_id]
for storm in storm_id:
sel_tracks.append(storm_ids.index(storm))
sel_tracks = np.array(sel_tracks)
else:
# filter years
years = np.array([int(iso_name[:4]) for iso_name in storm_ids])
sel_tracks = np.argwhere(np.logical_and(years >= year_range[0], \
years <= year_range[1])).reshape(-1)
if not sel_tracks.size:
LOGGER.info('No tracks in time range (%s, %s).', year_range[0],
year_range[1])
return sel_tracks
# filter basin
if basin:
basin0 = np.array([''.join(bas.astype(str)) \
for bas in nc_data.variables['basin'][:, 0, :]])[sel_tracks]
sel_bas = np.argwhere(basin0 == basin).reshape(-1)
if not sel_tracks.size:
LOGGER.info('No tracks in basin %s.', basin)
return sel_tracks
sel_tracks = sel_tracks[sel_bas]
return sel_tracks
def _read_one_raw(self, nc_data, i_track, provider, correct_pres=False):
"""Fill given track.
Parameters:
nc_data (Dataset): netcdf data set
i_track (int): track position in netcdf data
provider (str): data provider. e.g. usa, newdelhi, bom, cma, tokyo
"""
name = ''.join(nc_data.variables['name'][i_track] \
[nc_data.variables['name'][i_track].mask == False].data.astype(str))
sid = ''.join(nc_data.variables['sid'][i_track].astype(str))
basin = ''.join(nc_data.variables['basin'][i_track, 0, :].astype(str))
LOGGER.info('Reading %s: %s', sid, name)
isot = nc_data.variables['iso_time'][i_track, :, :]
val_len = isot.mask[isot.mask == False].shape[0]//isot.shape[1]
datetimes = list()
for date_time in isot[:val_len]:
datetimes.append(dt.datetime.strptime(''.join(date_time.astype(str)),
'%Y-%m-%d %H:%M:%S'))
id_no = float(sid.replace('N', '0').replace('S', '1'))
lat = nc_data.variables[provider + '_lat'][i_track, :][:val_len]
lon = nc_data.variables[provider + '_lon'][i_track, :][:val_len]
max_sus_wind = nc_data.variables[provider + '_wind'][i_track, :]. \
data[:val_len].astype(float)
cen_pres = nc_data.variables[provider + '_pres'][i_track, :]. \
data[:val_len].astype(float)
if correct_pres:
cen_pres = _missing_pressure(cen_pres, max_sus_wind, lat, lon)
if np.all(lon == nc_data.variables[provider + '_lon']._FillValue) or \
(np.any(lon == nc_data.variables[provider + '_lon']._FillValue) and \
np.all(max_sus_wind == nc_data.variables[provider + '_wind']._FillValue) \
and np.all(cen_pres == nc_data.variables[provider + '_pres']._FillValue)):
LOGGER.warning('Skipping %s. It does not contain valid values. ' +\
'Try another provider.', sid)
return None
try:
rmax = nc_data.variables[provider + '_rmw'][i_track, :][:val_len]
except KeyError:
LOGGER.info('%s: No rmax for given provider %s. Set to default.',
sid, provider)
rmax = np.zeros(lat.size)
try:
penv = nc_data.variables[provider + '_poci'][i_track, :][:val_len]
except KeyError:
LOGGER.info('%s: No penv for given provider %s. Set to default.',
sid, provider)
penv = np.ones(lat.size)*self._set_penv(basin)
tr_ds = pd.DataFrame({'time': datetimes, 'lat': lat, 'lon':lon, \
'radius_max_wind': rmax.astype('float'), 'max_sustained_wind': max_sus_wind, \
'central_pressure': cen_pres, 'environmental_pressure': penv.astype('float')})
# deal with nans
tr_ds = self._deal_nans(tr_ds, nc_data, provider, datetimes, basin)
if not tr_ds.shape[0]:
LOGGER.warning('Skipping %s. No usable data.', sid)
return None
# ensure environmental pressure > central pressure
chg_pres = (tr_ds.central_pressure > tr_ds.environmental_pressure).values
tr_ds.environmental_pressure.values[chg_pres] = tr_ds.central_pressure.values[chg_pres]
# construct xarray
tr_ds = xr.Dataset.from_dataframe(tr_ds.set_index('time'))
tr_ds.coords['lat'] = ('time', tr_ds.lat)
tr_ds.coords['lon'] = ('time', tr_ds.lon)
tr_ds.attrs = {'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', \
'name': name, 'sid': sid, 'orig_event_flag': True, 'data_provider': provider, \
'basin': basin, 'id_no': id_no, 'category': set_category(max_sus_wind, 'kn')}
return tr_ds
def _deal_nans(self, tr_ds, nc_data, provider, datetimes, basin):
""" Remove or substitute fill values of netcdf variables. """
# remove nan coordinates
tr_ds.drop(tr_ds[tr_ds.lat == nc_data.variables[provider + '_lat']. \
_FillValue].index, inplace=True)
tr_ds.drop(tr_ds[np.isnan(tr_ds.lat.values)].index, inplace=True)
tr_ds.drop(tr_ds[tr_ds.lon == nc_data.variables[provider + '_lon']. \
_FillValue].index, inplace=True)
tr_ds.drop(tr_ds[np.isnan(tr_ds.lon.values)].index, inplace=True)
# remove nan central pressures
tr_ds.drop(tr_ds[tr_ds.central_pressure == nc_data.variables[provider + '_pres']. \
_FillValue].index, inplace=True)
# remove repeated dates
tr_ds.drop_duplicates('time', inplace=True)
# fill nans of environmental_pressure and radius_max_wind
try:
tr_ds.environmental_pressure.values[tr_ds.environmental_pressure == \
nc_data.variables[provider + '_poci']._FillValue] = np.nan
tr_ds.environmental_pressure = tr_ds.environmental_pressure.ffill(limit=4). \
bfill(limit=4).fillna(self._set_penv(basin))
except KeyError:
pass
try:
tr_ds.radius_max_wind.values[tr_ds.radius_max_wind == \
nc_data.variables[provider + '_rmw']._FillValue] = np.nan
tr_ds['radius_max_wind'] = tr_ds.radius_max_wind.ffill(limit=1).bfill(limit=1).fillna(0)
except KeyError:
pass
# set time steps
tr_ds['time_step'] = np.zeros(tr_ds.shape[0])
for i_time, time in enumerate(tr_ds.time[1:], 1):
tr_ds.time_step.values[i_time] = (time - datetimes[i_time-1]).total_seconds()/3600
if tr_ds.shape[0]:
tr_ds.time_step.values[0] = tr_ds.time_step.values[-1]
return tr_ds
@staticmethod
def _set_penv(basin):
""" Set environmental pressure depending on basin """
penv = 1010
if basin in ('NI', 'SI', 'WP'):
penv = 1005
elif basin == 'SP':
penv = 1004
return penv
def _calc_land_geom(ens_track):
"""Compute land geometry used for land distance computations.
Returns:
shapely.geometry.multipolygon.MultiPolygon
"""
deg_buffer = 0.1
min_lat = np.min([np.min(track.lat.values) for track in ens_track])
min_lat = max(min_lat-deg_buffer, -90)
max_lat = np.max([np.max(track.lat.values) for track in ens_track])
max_lat = min(max_lat+deg_buffer, 90)
min_lon = np.min([np.min(track.lon.values) for track in ens_track])
min_lon = max(min_lon-deg_buffer, -180)
max_lon = np.max([np.max(track.lon.values) for track in ens_track])
max_lon = min(max_lon+deg_buffer, 180)
return coord_util.get_land_geometry(extent=(min_lon, max_lon, \
min_lat, max_lat), resolution=10)
def _track_land_params(track, land_geom):
""" Compute parameters of land for one track.
Parameters:
track (xr.Dataset): track values
land_geom (shapely.geometry.multipolygon.MultiPolygon): land geometry
"""
track['on_land'] = ('time', coord_util.coord_on_land(track.lat.values, \
track.lon.values, land_geom))
track['dist_since_lf'] = ('time', _dist_since_lf(track))
def _dist_since_lf(track):
""" Compute the distance to landfall in km point for every point on land.
Points on water get nan values.
Parameters:
track (xr.Dataset): tropical cyclone track
Returns:
np.arrray
"""
dist_since_lf = np.zeros(track.time.values.shape)
# Index in sea that follows a land index
sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0]
if not sea_land_idx.size:
return (dist_since_lf+1)*np.nan
# Index in sea that comes from previous land index
land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0] + 1
if track.on_land[-1]:
land_sea_idx = np.append(land_sea_idx, track.time.size)
orig_lf = _calc_orig_lf(track, sea_land_idx)
dist = DistanceMetric.get_metric('haversine')
nodes1 = np.radians(np.array([track.lat.values[1:],
track.lon.values[1:]]).transpose())
nodes0 = np.radians(np.array([track.lat.values[:-1],
track.lon.values[:-1]]).transpose())
dist_since_lf[1:] = dist.pairwise(nodes1, nodes0).diagonal()
dist_since_lf[np.logical_not(track.on_land.values)] = 0.0
nodes1 = np.array([track.lat.values[sea_land_idx+1],
track.lon.values[sea_land_idx+1]]).transpose()/180*np.pi
dist_since_lf[sea_land_idx+1] = \
dist.pairwise(nodes1, orig_lf/180*np.pi).diagonal()
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
dist_since_lf[sea_land+1:land_sea] = \
np.cumsum(dist_since_lf[sea_land+1:land_sea])
dist_since_lf *= EARTH_RADIUS_KM
dist_since_lf[np.logical_not(track.on_land.values)] = np.nan
return dist_since_lf
def _calc_orig_lf(track, sea_land_idx):
""" Approximate coast coordinates in landfall as the middle point
before landfall and after.
Parameters:
track (xr.Dataset): TC track
sea_land_idx (np.array): array position of sea before landfall
Returns:
np.array (first column lat and second lon of each landfall coord)
"""
# TODO change to pos where landfall (v_landfall)??
orig_lf = np.empty((sea_land_idx.size, 2))
for i_lf, lf_point in enumerate(sea_land_idx):
orig_lf[i_lf][0] = track.lat[lf_point] + \
(track.lat[lf_point+1] - track.lat[lf_point])/2
orig_lf[i_lf][1] = track.lon[lf_point] + \
(track.lon[lf_point+1] - track.lon[lf_point])/2
return orig_lf
def _decay_v_function(a_coef, x_val):
"""Decay function used for wind after landfall."""
return np.exp(-a_coef * x_val)
def _solve_decay_v_function(v_y, x_val):
"""Solve decay function used for wind after landfall. Get A coefficient."""
return -np.log(v_y) / x_val
def _decay_p_function(s_coef, b_coef, x_val):
"""Decay function used for pressure after landfall."""
return s_coef - (s_coef - 1) * np.exp(-b_coef*x_val)
def _solve_decay_p_function(ps_y, p_y, x_val):
"""Solve decay function used for pressure after landfall.
Get B coefficient."""
return -np.log((ps_y - p_y)/(ps_y - 1.0)) / x_val
def _calc_decay_ps_value(track, p_landfall, pos, s_rel):
if s_rel:
p_land_s = track.environmental_pressure[pos].values
else:
p_land_s = track.central_pressure[pos].values
return float(p_land_s / p_landfall)
def _decay_values(track, land_geom, s_rel):
""" Compute wind and pressure relative to landafall values.
Parameters:
track (xr.Dataset): track
land_geom (shapely.geometry.multipolygon.MultiPolygon): land geometry
s_rel (bool): use environmental presure for S value (true) or
central presure (false)
Returns:
v_lf (dict): key is Saffir-Simpson scale, values are arrays of
wind/wind at landfall
p_lf (dict): key is Saffir-Simpson scale, values are tuples with
first value array of S parameter, second value array of central
pressure/central pressure at landfall
x_val (dict): key is Saffir-Simpson scale, values are arrays with
the values used as "x" in the coefficient fitting, the
distance since landfall
"""
v_lf = dict()
p_lf = dict()
x_val = dict()
_track_land_params(track, land_geom)
# Index in land that comes from previous sea index
sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0] + 1
# Index in sea that comes from previous land index
land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0] + 1
if track.on_land[-1]:
land_sea_idx = np.append(land_sea_idx, track.time.size)
if sea_land_idx.size and land_sea_idx.size <= sea_land_idx.size:
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
v_landfall = track.max_sustained_wind[sea_land-1].values
ss_scale_idx = np.where(v_landfall < SAFFIR_SIM_CAT)[0][0]+1
v_land = track.max_sustained_wind[sea_land-1:land_sea].values
if v_land[0] > 0:
v_land = (v_land[1:]/v_land[0]).tolist()
else:
v_land = v_land[1:].tolist()
p_landfall = float(track.central_pressure[sea_land-1].values)
p_land = track.central_pressure[sea_land-1:land_sea].values
p_land = (p_land[1:]/p_land[0]).tolist()
p_land_s = _calc_decay_ps_value(track, p_landfall, land_sea-1, s_rel)
p_land_s = len(p_land)*[p_land_s]
if ss_scale_idx not in v_lf:
v_lf[ss_scale_idx] = array.array('f', v_land)
p_lf[ss_scale_idx] = (array.array('f', p_land_s),
array.array('f', p_land))
x_val[ss_scale_idx] = array.array('f', \
track.dist_since_lf[sea_land:land_sea])
else:
v_lf[ss_scale_idx].extend(v_land)
p_lf[ss_scale_idx][0].extend(p_land_s)
p_lf[ss_scale_idx][1].extend(p_land)
x_val[ss_scale_idx].extend(track.dist_since_lf[ \
sea_land:land_sea])
return v_lf, p_lf, x_val
def _decay_calc_coeff(x_val, v_lf, p_lf):
""" From track's relative velocity and pressure, compute the decay
coefficients.
- wind decay = exp(-x*A)
- pressure decay = S-(S-1)*exp(-x*A)
Parameters:
x_val (dict): key is Saffir-Simpson scale, values are lists with
the values used as "x" in the coefficient fitting, the
distance since landfall
v_lf (dict): key is Saffir-Simpson scale, values are lists of
wind/wind at landfall
p_lf (dict): key is Saffir-Simpson scale, values are tuples with
first value the S parameter, second value list of central
pressure/central pressure at landfall
Returns:
v_rel (dict()), p_rel (dict())
"""
np.warnings.filterwarnings('ignore')
v_rel = dict()
p_rel = dict()
for ss_scale, val_lf in v_lf.items():
x_val_ss = np.array(x_val[ss_scale])
y_val = np.array(val_lf)
v_coef = _solve_decay_v_function(y_val, x_val_ss)
v_coef = v_coef[np.isfinite(v_coef)]
v_coef = np.mean(v_coef)
ps_y_val = np.array(p_lf[ss_scale][0])
y_val = np.array(p_lf[ss_scale][1])
y_val[ps_y_val <= y_val] = np.nan
y_val[ps_y_val <= 1] = np.nan
valid_p = np.isfinite(y_val)
ps_y_val = ps_y_val[valid_p]
y_val = y_val[valid_p]
p_coef = _solve_decay_p_function(ps_y_val, y_val, x_val_ss[valid_p])
ps_y_val = np.mean(ps_y_val)
p_coef = np.mean(p_coef)
if np.isfinite(v_coef) and np.isfinite(ps_y_val) and np.isfinite(ps_y_val):
v_rel[ss_scale] = v_coef
p_rel[ss_scale] = (ps_y_val, p_coef)
scale_fill = np.array(list(p_rel.keys()))
if not scale_fill.size:
LOGGER.info('No historical track with landfall.')
return v_rel, p_rel
for ss_scale in range(1, len(SAFFIR_SIM_CAT)+1):
if ss_scale not in p_rel:
close_scale = scale_fill[np.argmin(np.abs(scale_fill-ss_scale))]
LOGGER.debug('No historical track of category %s with landfall. ' \
'Decay parameters from category %s taken.',
CAT_NAMES[ss_scale], CAT_NAMES[close_scale])
v_rel[ss_scale] = v_rel[close_scale]
p_rel[ss_scale] = p_rel[close_scale]
return v_rel, p_rel
def _check_decay_values_plot(x_val, v_lf, p_lf, v_rel, p_rel):
""" Generate one graph with wind decay and an other with central pressure
decay, true and approximated."""
# One graph per TC category
for track_cat, color in zip(v_lf.keys(),
cm_mp.rainbow(np.linspace(0, 1, len(v_lf)))):
_, axes = plt.subplots(2, 1)
x_eval = np.linspace(0, np.max(x_val[track_cat]), 20)
axes[0].set_xlabel('Distance from landfall (km)')
axes[0].set_ylabel('Max sustained wind relative to landfall')
axes[0].set_title('Wind')
axes[0].plot(x_val[track_cat], v_lf[track_cat], '*', c=color,
label=CAT_NAMES[track_cat])
axes[0].plot(x_eval, _decay_v_function(v_rel[track_cat], x_eval),
'-', c=color)
axes[1].set_xlabel('Distance from landfall (km)')
axes[1].set_ylabel('Central pressure relative to landfall')
axes[1].set_title('Pressure')
axes[1].plot(x_val[track_cat], p_lf[track_cat][1], '*', c=color,
label=CAT_NAMES[track_cat])
axes[1].plot(x_eval, _decay_p_function(p_rel[track_cat][0], \
p_rel[track_cat][1], x_eval), '-', c=color)
def _apply_decay_coeffs(track, v_rel, p_rel, land_geom, s_rel):
""" Change track's max sustained wind and central pressure using the land
decay coefficients.
Parameters:
track (xr.Dataset): TC track
v_rel (dict): {category: A}, where wind decay = exp(-x*A)
p_rel (dict): (category: (S, B)},
where pressure decay = S-(S-1)*exp(-x*B)
land_geom (shapely.geometry.multipolygon.MultiPolygon): land geometry
s_rel (bool): use environmental presure for S value (true) or
central presure (false)
Returns:
xr.Dataset
"""
# return if historical track
if track.orig_event_flag:
return track
_track_land_params(track, land_geom)
# Index in land that comes from previous sea index
sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0] + 1
# Index in sea that comes from previous land index
land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0] + 1
if track.on_land[-1]:
land_sea_idx = np.append(land_sea_idx, track.time.size)
if not sea_land_idx.size or land_sea_idx.size > sea_land_idx.size:
return track
for idx, (sea_land, land_sea) \
in enumerate(zip(sea_land_idx, land_sea_idx)):
v_landfall = track.max_sustained_wind[sea_land-1].values
p_landfall = float(track.central_pressure[sea_land-1].values)
try:
ss_scale_idx = np.where(v_landfall < SAFFIR_SIM_CAT)[0][0]+1
except IndexError:
continue
if land_sea - sea_land == 1:
continue
p_decay = _calc_decay_ps_value(track, p_landfall, land_sea-1, s_rel)
p_decay = _decay_p_function(p_decay, p_rel[ss_scale_idx][1], \
track.dist_since_lf[sea_land:land_sea].values)
# dont applay decay if it would decrease central pressure
p_decay[p_decay < 1] = track.central_pressure[sea_land:land_sea][p_decay < 1]/p_landfall
track.central_pressure[sea_land:land_sea] = p_landfall * p_decay
v_decay = _decay_v_function(v_rel[ss_scale_idx], \
track.dist_since_lf[sea_land:land_sea].values)
# dont applay decay if it would increas wind speeds
v_decay[v_decay > 1] = track.max_sustained_wind[sea_land:land_sea][v_decay > 1]/v_landfall
track.max_sustained_wind[sea_land:land_sea] = v_landfall * v_decay
# correct values of sea between two landfalls
if land_sea < track.time.size and idx+1 < sea_land_idx.size:
rndn = 0.1 * float(np.abs(np.random.normal(size=1)*5)+6)
r_diff = track.central_pressure[land_sea].values - \
track.central_pressure[land_sea-1].values + rndn
track.central_pressure[land_sea:sea_land_idx[idx+1]] += - r_diff
rndn = rndn * 10 # mean value 10
r_diff = track.max_sustained_wind[land_sea].values - \
track.max_sustained_wind[land_sea-1].values - rndn
track.max_sustained_wind[land_sea:sea_land_idx[idx+1]] += - r_diff
# correct limits
np.warnings.filterwarnings('ignore')
cor_p = track.central_pressure.values > track.environmental_pressure.values
track.central_pressure[cor_p] = track.environmental_pressure[cor_p]
track.max_sustained_wind[track.max_sustained_wind < 0] = 0
track.attrs['category'] = set_category(track.max_sustained_wind.values,
track.max_sustained_wind_unit)
return track
def _check_apply_decay_plot(all_tracks, syn_orig_wind, syn_orig_pres):
""" Plot wind and presure before and after correction for synthetic tracks.
Plot wind and presure for unchanged historical tracks."""
# Plot synthetic tracks
sy_tracks = [track for track in all_tracks if not track.orig_event_flag]
graph_v_b, graph_v_a, graph_p_b, graph_p_a, graph_pd_a, graph_ped_a = \
_check_apply_decay_syn_plot(sy_tracks, syn_orig_wind,
syn_orig_pres)
# Plot historic tracks
hist_tracks = [track for track in all_tracks if track.orig_event_flag]
graph_hv, graph_hp, graph_hpd_a, graph_hped_a = \
_check_apply_decay_hist_plot(hist_tracks)
# Put legend and fix size
leg_lines = [Line2D([0], [0], color=CAT_COLORS[i_col], lw=2)
for i_col in range(len(SAFFIR_SIM_CAT))]
leg_lines.append(Line2D([0], [0], color='k', lw=2))
leg_names = [CAT_NAMES[i_col] for i_col in range(1, len(SAFFIR_SIM_CAT)+1)]
leg_names.append('Sea')
all_gr = [graph_v_a, graph_v_b, graph_p_a, graph_p_b, graph_ped_a,
graph_pd_a, graph_hv, graph_hp, graph_hpd_a, graph_hped_a]
for graph in all_gr:
graph.axs[0].legend(leg_lines, leg_names)
fig, _ = graph.get_elems()
fig.set_size_inches(18.5, 10.5)
def _check_apply_decay_syn_plot(sy_tracks, syn_orig_wind,
syn_orig_pres):
"""Plot winds and pressures of synthetic tracks before and after
correction."""
_, graph_v_b = plt.subplots()
graph_v_b.set_title('Wind before land decay correction')
graph_v_b.set_xlabel('Node number')
graph_v_b.set_ylabel('Max sustained wind (kn)')
_, graph_v_a = plt.subplots()
graph_v_a.set_title('Wind after land decay correction')
graph_v_a.set_xlabel('Node number')
graph_v_a.set_ylabel('Max sustained wind (kn)')
_, graph_p_b = plt.subplots()
graph_p_b.set_title('Pressure before land decay correctionn')
graph_p_b.set_xlabel('Node number')
graph_p_b.set_ylabel('Central pressure (mb)')
_, graph_p_a = plt.subplots()
graph_p_a.set_title('Pressure after land decay correctionn')
graph_p_a.set_xlabel('Node number')
graph_p_a.set_ylabel('Central pressure (mb)')
_, graph_pd_a = plt.subplots()
graph_pd_a.set_title('Relative pressure after land decay correction')
graph_pd_a.set_xlabel('Distance from landfall (km)')
graph_pd_a.set_ylabel('Central pressure relative to landfall')
_, graph_ped_a = plt.subplots()
graph_ped_a.set_title('Environmental - central pressure after land decay correction')
graph_ped_a.set_xlabel('Distance from landfall (km)')
graph_ped_a.set_ylabel('Environmental pressure - Central pressure (mb)')
for track, orig_wind, orig_pres in \
zip(sy_tracks, syn_orig_wind, syn_orig_pres):
# Index in land that comes from previous sea index
sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0]+1
# Index in sea that comes from previous land index
land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0]+1
if track.on_land[-1]:
land_sea_idx = np.append(land_sea_idx, track.time.size)
if sea_land_idx.size and land_sea_idx.size <= sea_land_idx.size:
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
v_lf = track.max_sustained_wind[sea_land-1].values
p_lf = track.central_pressure[sea_land-1].values
ss_scale = np.where(v_lf < SAFFIR_SIM_CAT)[0][0]
on_land = np.arange(track.time.size)[sea_land:land_sea]
graph_v_a.plot(on_land, track.max_sustained_wind[on_land],
'o', c=CAT_COLORS[ss_scale])
graph_v_b.plot(on_land, orig_wind[on_land],
'o', c=CAT_COLORS[ss_scale])
graph_p_a.plot(on_land, track.central_pressure[on_land],
'o', c=CAT_COLORS[ss_scale])
graph_p_b.plot(on_land, orig_pres[on_land],
'o', c=CAT_COLORS[ss_scale])
graph_pd_a.plot(track.dist_since_lf[on_land],
track.central_pressure[on_land]/p_lf,
'o', c=CAT_COLORS[ss_scale])
graph_ped_a.plot(track.dist_since_lf[on_land],
track.environmental_pressure[on_land]-
track.central_pressure[on_land],
'o', c=CAT_COLORS[ss_scale])
on_sea = np.arange(track.time.size)[np.logical_not(track.on_land)]
graph_v_a.plot(on_sea, track.max_sustained_wind[on_sea],
'o', c='k', markersize=5)
graph_v_b.plot(on_sea, orig_wind[on_sea],
'o', c='k', markersize=5)
graph_p_a.plot(on_sea, track.central_pressure[on_sea],
'o', c='k', markersize=5)
graph_p_b.plot(on_sea, orig_pres[on_sea],
'o', c='k', markersize=5)
return graph_v_b, graph_v_a, graph_p_b, graph_p_a, graph_pd_a, graph_ped_a
def _check_apply_decay_hist_plot(hist_tracks):
"""Plot winds and pressures of historical tracks."""
_, graph_hv = plt.subplots()
graph_hv.set_title('Historical wind')
graph_hv.set_xlabel('Node number')
graph_hv.set_ylabel('Max sustained wind (kn)')
_, graph_hp = plt.subplots()
graph_hp.set_title('Historical pressure')
graph_hp.set_xlabel('Node number')
graph_hp.set_ylabel('Central pressure (mb)')
_, graph_hpd_a = plt.subplots()
graph_hpd_a.set_title('Historical relative pressure')
graph_hpd_a.set_xlabel('Distance from landfall (km)')
graph_hpd_a.set_ylabel('Central pressure relative to landfall')
_, graph_hped_a = plt.subplots()
graph_hped_a.set_title('Historical environmental - central pressure')
graph_hped_a.set_xlabel('Distance from landfall (km)')
graph_hped_a.set_ylabel('Environmental pressure - Central pressure (mb)')
for track in hist_tracks:
# Index in land that comes from previous sea index
sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0]+1
# Index in sea that comes from previous land index
land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0]+1
if track.on_land[-1]:
land_sea_idx = np.append(land_sea_idx, track.time.size)
if sea_land_idx.size and land_sea_idx.size <= sea_land_idx.size:
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
p_lf = track.central_pressure[sea_land-1].values
scale = np.where(track.max_sustained_wind[sea_land-1].values <
SAFFIR_SIM_CAT)[0][0]
on_land = np.arange(track.time.size)[sea_land:land_sea]
graph_hv.add_curve(on_land, track.max_sustained_wind[on_land],
'o', c=CAT_COLORS[scale])
graph_hp.add_curve(on_land, track.central_pressure[on_land],
'o', c=CAT_COLORS[scale])
graph_hpd_a.plot(track.dist_since_lf[on_land],
track.central_pressure[on_land]/p_lf,
'o', c=CAT_COLORS[scale])
graph_hped_a.plot(track.dist_since_lf[on_land],
track.environmental_pressure[on_land]-
track.central_pressure[on_land],
'o', c=CAT_COLORS[scale])
on_sea = np.arange(track.time.size)[np.logical_not(track.on_land)]
graph_hp.plot(on_sea, track.central_pressure[on_sea],
'o', c='k', markersize=5)
graph_hv.plot(on_sea, track.max_sustained_wind[on_sea],
'o', c='k', markersize=5)
return graph_hv, graph_hp, graph_hpd_a, graph_hped_a
def _missing_pressure(cen_pres, v_max, lat, lon):
"""Deal with missing central pressures."""
if np.argwhere(cen_pres <= 0).size > 0:
cen_pres = 1024.388 + 0.047*lat - 0.029*lon - 0.818*v_max # ibtracs 1980 -2013 (r2=0.91)
# cen_pres = 1024.688+0.055*lat-0.028*lon-0.815*v_max # peduzzi
return cen_pres
def _change_max_wind_unit(wind, unit_orig, unit_dest):
""" Compute maximum wind speed in unit_dest
Parameters:
wind (np.array): wind
unit_orig (str): units of wind
unit_dest (str): units to change wind
Returns:
double
"""
ureg = UnitRegistry()
if unit_orig in ('kn', 'kt'):
ur_orig = ureg.knot
elif unit_orig == 'mph':
ur_orig = ureg.mile / ureg.hour
elif unit_orig == 'm/s':
ur_orig = ureg.meter / ureg.second
elif unit_orig == 'km/h':
ur_orig = ureg.kilometer / ureg.hour
else:
LOGGER.error('Unit not recognised %s.', unit_orig)
raise ValueError
if unit_dest in ('kn', 'kt'):
ur_dest = ureg.knot
elif unit_dest == 'mph':
ur_dest = ureg.mile / ureg.hour
elif unit_dest == 'm/s':
ur_dest = ureg.meter / ureg.second
elif unit_dest == 'km/h':
ur_dest = ureg.kilometer / ureg.hour
else:
LOGGER.error('Unit not recognised %s.', unit_dest)
raise ValueError
return (np.nanmax(wind) * ur_orig).to(ur_dest).magnitude
[docs]def set_category(max_sus_wind, max_sus_wind_unit, saffir_scale=None):
"""Add storm category according to saffir-simpson hurricane scale
- -1 tropical depression
- 0 tropical storm
- 1 Hurrican category 1
- 2 Hurrican category 2
- 3 Hurrican category 3
- 4 Hurrican category 4
- 5 Hurrican category 5
Parameters:
max_sus_wind (np.array): max sustained wind
max_sus_wind_unit (str): units of max sustained wind
saffir_scale (list, optional): Saffir-Simpson scale in same units as wind
Returns:
double
"""
if saffir_scale:
max_wind = np.nanmax(max_sus_wind)
elif max_sus_wind_unit != 'kn':
max_wind = _change_max_wind_unit(max_sus_wind, max_sus_wind_unit, 'kn')
saffir_scale = SAFFIR_SIM_CAT
else:
saffir_scale = SAFFIR_SIM_CAT
max_wind = np.nanmax(max_sus_wind)
try:
return (np.argwhere(max_wind < saffir_scale) - 1)[0][0]
except IndexError:
return -1