"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Generate synthetic tropical cyclone tracks from real ones
"""
import array
import itertools
import logging
import warnings
import matplotlib.cm as cm_mp
from matplotlib.lines import Line2D
import matplotlib.pyplot as plt
import numba
import numpy as np
from climada import CONFIG
import climada.util.coordinates
import climada.hazard.tc_tracks
LOGGER = logging.getLogger(__name__)
LANDFALL_DECAY_V = {
-1: 0.00012859077693295416,
0: 0.0017226346292718126,
1: 0.002309772914350468,
2: 0.0025968221565522698,
3: 0.002626252944053856,
4: 0.002550639312763181,
5: 0.003788695795963695
}
"""Global landfall decay parameters for wind speed by TC category.
Keys are TC categories with -1='TD', 0='TS', 1='Cat 1', ..., 5='Cat 5'.
It is v_rel as derived from:
>>> tracks = TCTracks.from_ibtracs_netcdf(year_range=(1980,2019), estimate_missing=True)
>>> extent = tracks.get_extent()
>>> land_geom = climada.util.coordinates.get_land_geometry(
... extent=extent, resolution=10
... )
>>> v_rel, p_rel = _calc_land_decay(tracks.data, land_geom, pool=tracks.pool)
"""
LANDFALL_DECAY_P = {
-1: (1.0088807492745373, 0.002117478217863062),
0: (1.0192813768091684, 0.003068578025845065),
1: (1.0362982218631644, 0.003620816186262243),
2: (1.0468630800617038, 0.004067381088015585),
3: (1.0639055205005432, 0.003708174876364079),
4: (1.0828373148889825, 0.003997492773076179),
5: (1.1088615145002092, 0.005224331234796362)}
"""Global landfall decay parameters for pressure by TC category.
Keys are TC categories with -1='TD', 0='TS', 1='Cat 1', ..., 5='Cat 5'.
It is p_rel as derived from:
>>> tracks = TCTracks.from_ibtracs_netcdf(year_range=(1980,2019), estimate_missing=True)
>>> extent = tracks.get_extent()
>>> land_geom = climada.util.coordinates.get_land_geometry(
... extent=extent, resolution=10
... )
>>> v_rel, p_rel = _calc_land_decay(tracks.data, land_geom, pool=tracks.pool)
"""
[docs]
def calc_perturbed_trajectories(tracks,
nb_synth_tracks=9,
max_shift_ini=0.75,
max_dspeed_rel=0.3,
max_ddirection=np.pi / 360,
autocorr_dspeed=0.85,
autocorr_ddirection=0.5,
seed=CONFIG.hazard.trop_cyclone.random_seed.int(),
decay=True,
use_global_decay_params=True,
pool=None):
"""
Generate synthetic tracks based on directed random walk. An ensemble of nb_synth_tracks
synthetic tracks is computed for every track contained in self.
The methodology perturbs the tracks locations, and if decay is True it additionally
includes decay of wind speed and central pressure drop after landfall. No other track
parameter is perturbed.
The track starting point location is perturbed by random uniform values of
magnitude up to max_shift_ini in both longitude and latitude. Then, each segment
between two consecutive points is perturbed in direction and distance (i.e.,
translational speed). These perturbations can be correlated in time, i.e.,
the perturbation in direction applied to segment i is correlated with the perturbation
in direction applied to segment i-1 (and similarly for the perturbation in translational
speed).
Perturbations in track direction and temporal auto-correlations in perturbations are
on an hourly basis, and the perturbations in translational speed is relative.
Hence, the parameter values are relatively insensitive to the temporal
resolution of the tracks. Note however that all tracks should be at the same
temporal resolution, which can be achieved using equal_timestep().
max_dspeed_rel and autocorr_dspeed control the spread along the track ('what distance
does the track run for'), while max_ddirection and autocorr_ddirection control the spread
perpendicular to the track movement ('how does the track diverge in direction').
max_dspeed_rel and max_ddirection control the amplitude of perturbations at each track
timestep but perturbations may tend to compensate each other over time, leading to
a similar location at the end of the track, while autocorr_dspeed and autocorr_ddirection
control how these perturbations persist in time and hence the amplitude of the
perturbations towards the end of the track.
Note that the default parameter values have been only roughly calibrated so that
the frequency of tracks in each 5x5degree box remains approximately constant.
This is not an in-depth calibration and should be treated as such.
The object is mutated in-place.
Parameters
----------
tracks : climada.hazard.TCTracks
Tracks data.
nb_synth_tracks : int, optional
Number of ensemble members per track. Default: 9.
max_shift_ini : float, optional
Amplitude of max random starting point shift in decimal degree
(up to +/-max_shift_ini for longitude and latitude). Default: 0.75.
max_dspeed_rel : float, optional
Amplitude of translation speed perturbation in relative terms
(e.g., 0.2 for +/-20%). Default: 0.3.
max_ddirection : float, optional
Amplitude of track direction (bearing angle) perturbation
per hour, in radians. Default: pi/360.
autocorr_dspeed : float, optional
Temporal autocorrelation in translation speed perturbation
at a lag of 1 hour. Default: 0.85.
autocorr_ddirection : float, optional
Temporal autocorrelation of translational direction perturbation
at a lag of 1 hour. Default: 0.5.
seed : int, optional
Random number generator seed for replicability of random walk.
Put negative value if you don't want to use it. Default: configuration file.
decay : bool, optional
Whether to apply landfall decay in probabilistic tracks. Default: True.
use_global_decay_params : bool, optional
Whether to use precomputed global parameter values for landfall decay
obtained from IBTrACS (1980-2019). If False, parameters are fitted
using historical tracks in input parameter 'tracks', in which case the
landfall decay applied depends on the tracks passed as an input and may
not be robust if few historical tracks make landfall in this object.
Default: True.
pool : pathos.pool, optional
Pool that will be used for parallel computation when applicable. If not given, the
pool attribute of `tracks` will be used. Default: None
"""
LOGGER.info('Computing %s synthetic tracks.', nb_synth_tracks * tracks.size)
pool = tracks.pool if pool is None else pool
if seed >= 0:
np.random.seed(seed)
# ensure tracks have constant time steps
time_step_h = np.unique(np.concatenate([np.unique(x['time_step']) for x in tracks.data]))
if not np.allclose(time_step_h, time_step_h[0]):
raise ValueError('Tracks have different temporal resolution. '
'Please ensure constant time steps by applying equal_timestep beforehand')
time_step_h = time_step_h[0]
# number of random value per synthetic track:
# 2*nb_synth_tracks for starting points (lon, lat)
# nb_synth_tracks*(track.time.size-1) for angle and same for translation perturbation
# hence sum is nb_synth_tracks * (2 + 2*(size-1)) = nb_synth_tracks * 2 * size
# https://stats.stackexchange.com/questions/48086/algorithm-to-produce-autocorrelated-uniformly-distributed-number
if autocorr_ddirection == 0 and autocorr_dspeed == 0:
random_vec = [np.random.uniform(size=nb_synth_tracks * (2 * track.time.size))
for track in tracks.data]
else:
random_vec = [np.concatenate((np.random.uniform(size=nb_synth_tracks * 2),
_random_uniform_ac(nb_synth_tracks * (track.time.size - 1),
autocorr_ddirection, time_step_h),
_random_uniform_ac(nb_synth_tracks * (track.time.size - 1),
autocorr_dspeed, time_step_h)))
if track.time.size > 1 else np.random.uniform(size=nb_synth_tracks * 2)
for track in tracks.data]
if pool:
chunksize = max(min(tracks.size // pool.ncpus, 1000), 1)
new_ens = pool.map(_one_rnd_walk, tracks.data,
itertools.repeat(nb_synth_tracks, tracks.size),
itertools.repeat(max_shift_ini, tracks.size),
itertools.repeat(max_dspeed_rel, tracks.size),
itertools.repeat(max_ddirection, tracks.size),
random_vec, chunksize=chunksize)
else:
new_ens = [_one_rnd_walk(track, nb_synth_tracks, max_shift_ini,
max_dspeed_rel, max_ddirection, rand)
for track, rand in zip(tracks.data, random_vec)]
cutoff_track_ids_tc = [x[1] for x in new_ens]
cutoff_track_ids_tc = sum(cutoff_track_ids_tc, [])
cutoff_track_ids_ts = [x[2] for x in new_ens]
cutoff_track_ids_ts = sum(cutoff_track_ids_ts, [])
if len(cutoff_track_ids_tc) > 0:
LOGGER.info('The following generated synthetic tracks moved beyond '
'the range of [-70, 70] degrees latitude. Cut out '
'at TC category >1: %s.',
', '.join(cutoff_track_ids_tc))
if len(cutoff_track_ids_ts) > 0:
LOGGER.debug('The following generated synthetic tracks moved beyond '
'the range of [-70, 70] degrees latitude. Cut out '
'at TC category <= 1: %s.',
', '.join(cutoff_track_ids_ts))
new_ens = [x[0] for x in new_ens]
tracks.data = sum(new_ens, [])
if decay:
extent = tracks.get_extent()
land_geom = climada.util.coordinates.get_land_geometry(
extent=extent, resolution=10
)
if use_global_decay_params:
tracks.data = _apply_land_decay(tracks.data, LANDFALL_DECAY_V,
LANDFALL_DECAY_P, land_geom, pool=pool)
else:
# fit land decay coefficients based on historical tracks
hist_tracks = [track for track in tracks.data if track.orig_event_flag]
if hist_tracks:
try:
v_rel, p_rel = _calc_land_decay(hist_tracks, land_geom, pool=pool)
tracks.data = _apply_land_decay(
tracks.data, v_rel, p_rel, land_geom, pool=pool)
except ValueError as verr:
raise ValueError('Landfall decay could not be applied.') from verr
else:
raise ValueError('No historical tracks found. Historical'
' tracks are needed for land decay calibration'
' if use_global_decay_params=False.')
def _one_rnd_walk(track, nb_synth_tracks, max_shift_ini, max_dspeed_rel, max_ddirection, rnd_vec):
"""
Apply random walk to one track.
Parameters
----------
track : xr.Dataset
Track data.
nb_synth_tracks : int, optional
Number of ensemble members per track. Default: 9.
max_shift_ini : float, optional
Amplitude of max random starting point shift in decimal degree
(up to +/-max_shift_ini for longitude and latitude). Default: 0.75.
max_dspeed_rel : float, optional
Amplitude of translation speed perturbation in relative terms
(e.g., 0.2 for +/-20%). Default: 0.3.
max_ddirection : float, optional
Amplitude of track direction (bearing angle) perturbation
per hour, in radians. Default: pi/180.
rnd_vec : np.ndarray of shape (2 * nb_synth_tracks * track.time.size),)
Vector of random perturbations.
Returns
-------
ens_track : list(xr.Dataset)
List of the track and the generated synthetic tracks.
cutoff_track_ids_tc : List of str
List containing information about the tracks that were cut off at high
latitudes with wind speed of TC category 2-5.
curoff_track_ids_ts : List of str
List containing information about the tracks that were cut off at high
latitudes with a wind speed up to TC category 1.
"""
ens_track = list()
n_dat = track.time.size
n_seg = n_dat - 1
xy_ini = max_shift_ini * (2 * rnd_vec[:2 * nb_synth_tracks].reshape((2, nb_synth_tracks)) - 1)
[dt] = np.unique(track['time_step'])
ens_track.append(track)
cutoff_track_ids_ts = []
cutoff_track_ids_tc = []
for i_ens in range(nb_synth_tracks):
i_track = track.copy(True)
# select angular perturbation for that synthetic track
i_start_ang = 2 * nb_synth_tracks + i_ens * n_seg
i_end_ang = i_start_ang + track.time.size - 1
# scale by maximum perturbation and time step in hour (temporal-resolution independent)
ang_pert = dt * np.degrees(max_ddirection * (2 * rnd_vec[i_start_ang:i_end_ang] - 1))
ang_pert_cum = np.cumsum(ang_pert)
# select translational speed perturbation for that synthetic track
i_start_trans = 2 * nb_synth_tracks + nb_synth_tracks * n_seg + i_ens * n_seg
i_end_trans = i_start_trans + track.time.size - 1
# scale by maximum perturbation and time step in hour (temporal-resolution independent)
trans_pert = 1 + max_dspeed_rel * (2 * rnd_vec[i_start_trans:i_end_trans] - 1)
# get bearings and angular distance for the original track
bearings = _get_bearing_angle(i_track.lon.values, i_track.lat.values)
angular_dist = climada.util.coordinates.dist_approx(i_track.lat.values[:-1, None],
i_track.lon.values[:-1, None],
i_track.lat.values[1:, None],
i_track.lon.values[1:, None],
method="geosphere",
units="degree")[:, 0, 0]
# apply perturbation to lon / lat
new_lon = np.zeros_like(i_track.lon.values)
new_lat = np.zeros_like(i_track.lat.values)
new_lon[0] = i_track.lon.values[0] + xy_ini[0, i_ens]
new_lat[0] = i_track.lat.values[0] + xy_ini[1, i_ens]
last_idx = i_track.time.size
for i in range(0, len(new_lon) - 1):
new_lon[i + 1], new_lat[i + 1] = \
_get_destination_points(new_lon[i], new_lat[i],
bearings[i] + ang_pert_cum[i],
trans_pert[i] * angular_dist[i])
# if track crosses latitudinal thresholds (+-70°),
# keep up to this segment (i+1), set i+2 as last point,
# and discard all further points > i+2.
if i+2 < last_idx and (new_lat[i + 1] > 70 or new_lat[i + 1] < -70):
last_idx = i + 2
# end the track here
max_wind_end = i_track.max_sustained_wind.values[last_idx]
ss_scale_end = climada.hazard.tc_tracks.set_category(max_wind_end,
i_track.max_sustained_wind_unit)
# TC category at ending point should not be higher than 1
cutoff_txt = (f"{i_track.attrs['name']}_gen{i_ens + 1}"
f" ({climada.hazard.tc_tracks.CAT_NAMES[ss_scale_end]})")
if ss_scale_end > 1:
cutoff_track_ids_tc = cutoff_track_ids_tc + [cutoff_txt]
else:
cutoff_track_ids_ts = cutoff_track_ids_ts + [cutoff_txt]
break
# make sure longitude values are within (-180, 180)
climada.util.coordinates.lon_normalize(new_lon, center=0.0)
i_track.lon.values = new_lon
i_track.lat.values = new_lat
i_track.attrs['orig_event_flag'] = False
i_track.attrs['name'] = f"{i_track.attrs['name']}_gen{i_ens + 1}"
i_track.attrs['sid'] = f"{i_track.attrs['sid']}_gen{i_ens + 1}"
i_track.attrs['id_no'] = i_track.attrs['id_no'] + (i_ens + 1) / 100
i_track = i_track.isel(time=slice(None, last_idx))
ens_track.append(i_track)
return ens_track, cutoff_track_ids_tc, cutoff_track_ids_ts
def _random_uniform_ac(n_ts, autocorr, time_step_h):
"""
Generate a series of autocorrelated uniformly distributed random numbers.
This implements the algorithm described here to derive a uniformly distributed
series with specified autocorrelation (here at a lag of 1 hour):
https://stats.stackexchange.com/questions/48086/
algorithm-to-produce-autocorrelated-uniformly-distributed-number
Autocorrelation is specified at a lag of 1 hour. To get a time series at a
different temporal resolution (time_step_h), an hourly time series is generated
and resampled (using linear interpolation) to the target resolution.
Parameters
----------
n_ts : int
Length of the series.
autocorr : float
Autocorrelation (between -1 and 1) at hourly time scale.
time_step_h : float
Temporal resolution of the time series, in hour.
Returns
-------
x_ts : numpy.ndarray of shape (n_ts,)
n values at time_step_h intervals that are uniformly distributed and with
the requested temporal autocorrelation at a scale of 1 hour.
"""
# generate autocorrelated 1-hourly perturbations, so first create hourly
# time series of perturbations
n_ts_hourly_exact = n_ts * time_step_h
n_ts_hourly = int(np.ceil(n_ts_hourly_exact))
x = np.random.normal(size=n_ts_hourly)
theta = np.arccos(autocorr)
for i in range(1, len(x)):
x[i] = _h_ac(x[i - 1], x[i], theta)
# scale x to have magnitude [0,1]
x = (x + np.sqrt(3)) / (2 * np.sqrt(3))
# resample at target time step
x_ts = np.interp(np.arange(start=0, stop=n_ts_hourly_exact, step=time_step_h),
np.arange(n_ts_hourly), x)
return x_ts
@numba.njit
def _h_ac(x, y, theta):
"""
Generate next random number from current number for autocorrelated uniform series
Implements function h defined in:
https://stats.stackexchange.com/questions/48086/
algorithm-to-produce-autocorrelated-uniformly-distributed-number
Parameters
----------
x : float
Previous random number.
y : float
Random Standard Normal.
theta : float
arccos of autocorrelation.
Returns
-------
x_next : float
Next value in the series.
"""
gamma = np.abs(np.mod(theta, np.pi) - \
np.floor((np.mod(theta, np.pi) / (np.pi / 2)) + 0.5) * np.pi / 2)
x_next = 2 * np.sqrt(3) * (_f_ac(np.cos(theta) * x + np.sin(theta) * y, gamma) - 1 / 2)
return x_next
@numba.njit
def _f_ac(z, theta):
"""
F transform for autocorrelated random uniform series generation
Implements function F defined in:
https://stats.stackexchange.com/questions/48086/
algorithm-to-produce-autocorrelated-uniformly-distributed-number
i.e., the CDF of Y.
Parameters
----------
z : float
Value.
theta : float
arccos of autocorrelation.
Returns
-------
res : float
CDF at value z
"""
c = np.cos(theta)
s = np.sin(theta)
if z >= np.sqrt(3) * (c + s):
res = 1
elif z > np.sqrt(3) * (c - s):
res = 1 / 12 / np.sin(2 * theta) * \
(-3 - z ** 2 + 2 * np.sqrt(3) * z * (c + s) + 9 * np.sin(2 * theta))
elif z > np.sqrt(3) * (-c + s):
res = 1 / 6 * (3 + np.sqrt(3) * z / c)
elif z > -np.sqrt(3) * (c + s):
res = 1 / 12 / np.sin(2 * theta) * \
(z ** 2 + 2 * np.sqrt(3) * z * (c + s) + 3 * (1 + np.sin(2 * theta)))
else:
res = 0
return res
@numba.njit
def _get_bearing_angle(lon, lat):
"""
Compute bearing angle of great circle paths defined by consecutive points
Returns initial bearing (also called forward azimuth) of the n-1 great circle
paths define by n consecutive longitude/latitude points. The bearing is the angle
(clockwise from North) which if followed in a straight line along a great-circle
arc will take you from the start point to the end point. See also:
http://www.movable-type.co.uk/scripts/latlong.html
Here, the bearing of each pair of consecutive points is computed.
Parameters
----------
lon : numpy.ndarray of shape (n,)
Longitude coordinates of consecutive point, in decimal degrees.
lat : numpy.ndarray of shape (n,)
Latitude coordinates of consecutive point, in decimal degrees.
Returns
-------
earth_ang_fix : numpy.ndarray of shape (n-1,)
Bearing angle for each segment, in decimal degrees
"""
lon, lat = map(np.radians, [lon, lat])
# Segments between all point (0 -> 1, 1 -> 2, ..., n-1 -> n)
# starting points
lat_1 = lat[:-1]
lon_1 = lon[:-1]
# ending points
lat_2 = lat[1:]
lon_2 = lon[1:]
delta_lon = lon_2 - lon_1
# what to do with the points that don't move?
# i.e. where lat_2=lat_1 and lon_2=lon_1? The angle does not matter in
# that case because angular distance will be 0.
earth_ang_fix = np.arctan2(np.sin(delta_lon) * np.cos(lat_2),
np.cos(lat_1) * np.sin(lat_2) - \
np.sin(lat_1) * np.cos(lat_2) * np.cos(delta_lon))
return np.degrees(earth_ang_fix)
@numba.njit
def _get_destination_points(lon, lat, bearing, angular_distance):
"""
Get coordinates of endpoints from a given locations with the provided bearing and distance
Parameters
----------
lon : numpy.ndarray of shape (n,)
Longitude coordinates of each starting point, in decimal degrees.
lat : numpy.ndarray of shape (n,)
Latitude coordinates of each starting point, in decimal degrees.
bearing : numpy.ndarray of shape (n,)
Bearing to follow for each starting point (direction Northward, clockwise).
angular_distance : numpy.ndarray of shape (n,)
Angular distance to travel for each starting point, in decimal degrees.
Returns
-------
lon_2 : numpy.ndarray of shape (n,)
Longitude coordinates of each ending point, in decimal degrees.
lat_2 : numpy.ndarray of shape (n,)
Latitude coordinates of each ending point, in decimal degrees.
"""
lon, lat = map(np.radians, [lon, lat])
bearing = np.radians(bearing)
angular_distance = np.radians(angular_distance)
lat_2 = np.arcsin(np.sin(lat) * np.cos(angular_distance) + np.cos(lat) * \
np.sin(angular_distance) * np.cos(bearing))
lon_2 = lon + np.arctan2(np.sin(bearing) * np.sin(angular_distance) * np.cos(lat),
np.cos(angular_distance) - np.sin(lat) * np.sin(lat_2))
return np.degrees(lon_2), np.degrees(lat_2)
def _calc_land_decay(hist_tracks, land_geom, s_rel=True, check_plot=False,
pool=None):
"""Compute wind and pressure decay coefficients from historical events
Decay is calculated for every TC category according to the formulas:
- wind decay = exp(-x*A)
- pressure decay = S-(S-1)*exp(-x*B)
Parameters
----------
hist_tracks : list
List of xarray Datasets describing TC tracks.
land_geom : shapely.geometry.multipolygon.MultiPolygon
land geometry
s_rel : bool, optional
use environmental presure to calc S value
(true) or central presure (false)
check_plot : bool, optional
visualize computed coefficients.
Default: False
Returns
-------
v_rel : dict(category: A)
p_rel : dict(category: (S, B))
"""
if len(hist_tracks) < 100:
LOGGER.warning('For the calibration of the landfall decay '
'it is recommended to provide as many historical '
'tracks as possible, but only %s historical tracks '
'were provided. '
'For a more robust calculation consider using '
'a larger number of tracks or set '
'`use_global_decay_params` to True', len(hist_tracks))
# Key is Saffir-Simpson scale
# values are lists of wind/wind at landfall
v_lf = dict()
# values are tuples with first value the S parameter, second value
# list of central pressure/central pressure at landfall
p_lf = dict()
# x-scale values to compute landfall decay
x_val = dict()
if pool:
dec_val = pool.map(_decay_values, hist_tracks, itertools.repeat(land_geom),
itertools.repeat(s_rel),
chunksize=max(min(len(hist_tracks) // pool.ncpus, 1000), 1))
else:
dec_val = [_decay_values(track, land_geom, s_rel) for track in hist_tracks]
for (tv_lf, tp_lf, tx_val) in dec_val:
for key in tv_lf.keys():
v_lf.setdefault(key, []).extend(tv_lf[key])
p_lf.setdefault(key, ([], []))
p_lf[key][0].extend(tp_lf[key][0])
p_lf[key][1].extend(tp_lf[key][1])
x_val.setdefault(key, []).extend(tx_val[key])
v_rel, p_rel = _decay_calc_coeff(x_val, v_lf, p_lf)
if check_plot:
_check_decay_values_plot(x_val, v_lf, p_lf, v_rel, p_rel)
return v_rel, p_rel
def _apply_land_decay(tracks, v_rel, p_rel, land_geom, s_rel=True,
check_plot=False, pool=None):
"""Compute wind and pressure decay due to landfall in synthetic tracks.
Parameters
----------
v_rel : dict
{category: A}, where wind decay = exp(-x*A)
p_rel : dict
(category: (S, B)}, where pressure decay
= S-(S-1)*exp(-x*B)
land_geom : shapely.geometry.multipolygon.MultiPolygon
land geometry
s_rel : bool, optional
use environmental presure to calc S value
(true) or central presure (false)
check_plot : bool, optional
visualize computed changes
"""
sy_tracks = [track for track in tracks if not track.orig_event_flag]
if not sy_tracks:
raise ValueError('No synthetic tracks contained. Synthetic tracks'
' are needed.')
if not v_rel or not p_rel:
LOGGER.info('No decay coefficients.')
return
if check_plot:
orig_wind, orig_pres = [], []
for track in sy_tracks:
orig_wind.append(np.copy(track.max_sustained_wind.values))
orig_pres.append(np.copy(track.central_pressure.values))
if pool:
chunksize = max(min(len(tracks) // pool.ncpus, 1000), 1)
tracks = pool.map(_apply_decay_coeffs, tracks,
itertools.repeat(v_rel), itertools.repeat(p_rel),
itertools.repeat(land_geom), itertools.repeat(s_rel),
chunksize=chunksize)
else:
tracks = [_apply_decay_coeffs(track, v_rel, p_rel, land_geom, s_rel)
for track in tracks]
for track in tracks:
if track.orig_event_flag:
climada.hazard.tc_tracks.track_land_params(track, land_geom)
if check_plot:
_check_apply_decay_plot(tracks, orig_wind, orig_pres)
return tracks
def _decay_values(track, land_geom, s_rel):
"""Compute wind and pressure relative to landafall values.
Parameters
----------
track : xr.Dataset
track
land_geom : shapely.geometry.multipolygon.MultiPolygon
land geometry
s_rel : bool
use environmental presure for S value (true) or central presure (false)
Returns
-------
v_lf : dict
key is Saffir-Simpson scale, values are arrays of wind/wind at landfall
p_lf : dict
key is Saffir-Simpson scale, values are tuples with first value array
of S parameter, second value array of central pressure/central pressure
at landfall
x_val : dict
key is Saffir-Simpson scale, values are arrays with the values used as
"x" in the coefficient fitting, the distance since landfall
"""
# pylint: disable=protected-access
v_lf = dict()
p_lf = dict()
x_val = dict()
climada.hazard.tc_tracks.track_land_params(track, land_geom)
sea_land_idx, land_sea_idx = climada.hazard.tc_tracks._get_landfall_idx(track)
if sea_land_idx.size:
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
v_landfall = track.max_sustained_wind[sea_land - 1].values
ss_scale = climada.hazard.tc_tracks.set_category(v_landfall,
track.max_sustained_wind_unit)
v_land = track.max_sustained_wind[sea_land - 1:land_sea].values
if v_land[0] > 0:
v_land = (v_land[1:] / v_land[0]).tolist()
else:
v_land = v_land[1:].tolist()
p_landfall = float(track.central_pressure[sea_land - 1].values)
p_land = track.central_pressure[sea_land - 1:land_sea].values
p_land = (p_land[1:] / p_land[0]).tolist()
p_land_s = _calc_decay_ps_value(
track, p_landfall, land_sea - 1, s_rel)
p_land_s = len(p_land) * [p_land_s]
if ss_scale not in v_lf:
v_lf[ss_scale] = array.array('f', v_land)
p_lf[ss_scale] = (array.array('f', p_land_s),
array.array('f', p_land))
x_val[ss_scale] = array.array('f',
track.dist_since_lf[sea_land:land_sea])
else:
v_lf[ss_scale].extend(v_land)
p_lf[ss_scale][0].extend(p_land_s)
p_lf[ss_scale][1].extend(p_land)
x_val[ss_scale].extend(track.dist_since_lf[sea_land:land_sea])
return v_lf, p_lf, x_val
def _decay_calc_coeff(x_val, v_lf, p_lf):
"""From track's relative velocity and pressure, compute the decay
coefficients.
- wind decay = exp(-x*A)
- pressure decay = S-(S-1)*exp(-x*A)
Parameters
----------
x_val : dict
key is Saffir-Simpson scale, values are lists with
the values used as "x" in the coefficient fitting, the
distance since landfall
v_lf : dict
key is Saffir-Simpson scale, values are lists of
wind/wind at landfall
p_lf : dict
key is Saffir-Simpson scale, values are tuples with
first value the S parameter, second value list of central
pressure/central pressure at landfall
Returns
-------
v_rel : dict
p_rel : dict
"""
warnings.filterwarnings('ignore')
v_rel = dict()
p_rel = dict()
for ss_scale, val_lf in v_lf.items():
x_val_ss = np.array(x_val[ss_scale])
y_val = np.array(val_lf)
v_coef = _solve_decay_v_function(y_val, x_val_ss)
v_coef = v_coef[np.isfinite(v_coef)]
v_coef = np.mean(v_coef)
ps_y_val = np.array(p_lf[ss_scale][0])
y_val = np.array(p_lf[ss_scale][1])
y_val[ps_y_val <= y_val] = np.nan
y_val[ps_y_val <= 1] = np.nan
valid_p = np.isfinite(y_val)
ps_y_val = ps_y_val[valid_p]
y_val = y_val[valid_p]
p_coef = _solve_decay_p_function(ps_y_val, y_val, x_val_ss[valid_p])
ps_y_val = np.mean(ps_y_val)
p_coef = np.mean(p_coef)
if np.isfinite(v_coef) and np.isfinite(ps_y_val) and np.isfinite(ps_y_val):
v_rel[ss_scale] = v_coef
p_rel[ss_scale] = (ps_y_val, p_coef)
scale_fill = np.array(list(p_rel.keys()))
if not scale_fill.size:
LOGGER.info('No historical track with landfall.')
return v_rel, p_rel
for ss_scale, ss_name in climada.hazard.tc_tracks.CAT_NAMES.items():
if ss_scale not in p_rel:
close_scale = scale_fill[np.argmin(np.abs(scale_fill - ss_scale))]
close_name = climada.hazard.tc_tracks.CAT_NAMES[close_scale]
LOGGER.debug('No historical track of category %s with landfall. '
'Decay parameters from category %s taken.',
ss_name, close_name)
v_rel[ss_scale] = v_rel[close_scale]
p_rel[ss_scale] = p_rel[close_scale]
elif v_rel[ss_scale] < 0:
raise ValueError('The calibration of landfall decay for wind speed resulted in'
f' a wind speed increase for TC category {ss_name}.'
' This behaviour is unphysical. Please use a larger number of tracks'
' or use global paramaters by setting `use_global_decay_params` to'
' `True`')
elif p_rel[ss_scale][0] < 0 or p_rel[ss_scale][1] < 0:
raise ValueError('The calibration of landfall decay for central pressure resulted in'
f' a pressure decrease for TC category {ss_name}.'
' This behaviour is unphysical. Please use a larger number of tracks'
' or use global paramaters by setting `use_global_decay_params` to'
' `True`')
return v_rel, p_rel
def _check_decay_values_plot(x_val, v_lf, p_lf, v_rel, p_rel):
"""Generate one graph with wind decay and an other with central pressure
decay, true and approximated."""
# One graph per TC category
for track_cat, color in zip(v_lf.keys(),
cm_mp.rainbow(np.linspace(0, 1, len(v_lf)))):
_, axes = plt.subplots(2, 1)
x_eval = np.linspace(0, np.max(x_val[track_cat]), 20)
axes[0].set_xlabel('Distance from landfall (km)')
axes[0].set_ylabel('Max sustained wind\nrelative to landfall')
axes[0].set_title(f'Wind, TC cat {climada.hazard.tc_tracks.CAT_NAMES[track_cat]}')
axes[0].plot(x_val[track_cat], v_lf[track_cat], '*', c=color,
label=climada.hazard.tc_tracks.CAT_NAMES[track_cat])
axes[0].plot(x_eval, _decay_v_function(v_rel[track_cat], x_eval),
'-', c=color)
axes[1].set_xlabel('Distance from landfall (km)')
axes[1].set_ylabel('Central pressure\nrelative to landfall')
axes[1].set_title(f'Pressure, TC cat {climada.hazard.tc_tracks.CAT_NAMES[track_cat]}')
axes[1].plot(x_val[track_cat], p_lf[track_cat][1], '*', c=color,
label=climada.hazard.tc_tracks.CAT_NAMES[track_cat])
axes[1].plot(
x_eval,
_decay_p_function(p_rel[track_cat][0], p_rel[track_cat][1], x_eval),
'-', c=color)
def _apply_decay_coeffs(track, v_rel, p_rel, land_geom, s_rel):
"""Change track's max sustained wind and central pressure using the land
decay coefficients.
Parameters
----------
track : xr.Dataset
TC track
v_rel : dict
{category: A}, where wind decay = exp(-x*A)
p_rel : dict
(category: (S, B)},
where pressure decay = S-(S-1)*exp(-x*B)
land_geom : shapely.geometry.multipolygon.MultiPolygon
land geometry
s_rel : bool
use environmental presure for S value (true) or
central presure (false)
Returns
-------
xr.Dataset
"""
# pylint: disable=protected-access
# return if historical track
if track.orig_event_flag:
return track
climada.hazard.tc_tracks.track_land_params(track, land_geom)
sea_land_idx, land_sea_idx = climada.hazard.tc_tracks._get_landfall_idx(track)
if not sea_land_idx.size:
return track
for idx, (sea_land, land_sea) \
in enumerate(zip(sea_land_idx, land_sea_idx)):
v_landfall = track.max_sustained_wind[sea_land - 1].values
p_landfall = float(track.central_pressure[sea_land - 1].values)
ss_scale = climada.hazard.tc_tracks.set_category(v_landfall,
track.max_sustained_wind_unit)
if land_sea - sea_land == 1:
continue
S = _calc_decay_ps_value(track, p_landfall, land_sea - 1, s_rel)
if S <= 1:
# central_pressure at start of landfall > env_pres after landfall:
# set central_pressure to environmental pressure during whole lf
track.central_pressure[sea_land:land_sea] = track.environmental_pressure[sea_land:land_sea]
else:
p_decay = _decay_p_function(S, p_rel[ss_scale][1],
track.dist_since_lf[sea_land:land_sea].values)
# dont apply decay if it would decrease central pressure
if np.any(p_decay < 1):
LOGGER.info('Landfall decay would decrease pressure for '
'track id %s, leading to an intensification '
'of the Tropical Cyclone. This behaviour is '
'unphysical and therefore landfall decay is not '
'applied in this case.',
track.sid)
p_decay[p_decay < 1] = (track.central_pressure[sea_land:land_sea][p_decay < 1]
/ p_landfall)
track.central_pressure[sea_land:land_sea] = p_landfall * p_decay
v_decay = _decay_v_function(v_rel[ss_scale],
track.dist_since_lf[sea_land:land_sea].values)
# dont apply decay if it would increase wind speeds
if np.any(v_decay > 1):
# should not happen unless v_rel is negative
LOGGER.info('Landfall decay would increase wind speed for '
'track id %s. This behavious in unphysical and '
'therefore landfall decay is not applied in this '
'case.',
track.sid)
v_decay[v_decay > 1] = (track.max_sustained_wind[sea_land:land_sea][v_decay > 1]
/ v_landfall)
track.max_sustained_wind[sea_land:land_sea] = v_landfall * v_decay
# correct values of sea after a landfall (until next landfall, if any)
if land_sea < track.time.size:
if idx + 1 < sea_land_idx.size:
# if there is a next landfall, correct until last point before
# reaching land again
end_cor = sea_land_idx[idx + 1]
else:
# if there is no further landfall, correct until the end of
# the track
end_cor = track.time.size
rndn = 0.1 * float(np.abs(np.random.normal(size=1) * 5) + 6)
r_diff = track.central_pressure[land_sea].values - \
track.central_pressure[land_sea - 1].values + rndn
track.central_pressure[land_sea:end_cor] += - r_diff
rndn = rndn * 10 # mean value 10
r_diff = track.max_sustained_wind[land_sea].values - \
track.max_sustained_wind[land_sea - 1].values - rndn
track.max_sustained_wind[land_sea:end_cor] += - r_diff
# correct limits
warnings.filterwarnings('ignore')
cor_p = track.central_pressure.values > track.environmental_pressure.values
track.central_pressure[cor_p] = track.environmental_pressure[cor_p]
track.max_sustained_wind[track.max_sustained_wind < 0] = 0
track.attrs['category'] = climada.hazard.tc_tracks.set_category(
track.max_sustained_wind.values, track.max_sustained_wind_unit)
return track
def _check_apply_decay_plot(all_tracks, syn_orig_wind, syn_orig_pres):
"""Plot wind and presure before and after correction for synthetic tracks.
Plot wind and presure for unchanged historical tracks."""
# Plot synthetic tracks
sy_tracks = [track for track in all_tracks if not track.orig_event_flag]
graph_v_b, graph_v_a, graph_p_b, graph_p_a, graph_pd_a, graph_ped_a = \
_check_apply_decay_syn_plot(sy_tracks, syn_orig_wind,
syn_orig_pres)
# Plot historic tracks
hist_tracks = [track for track in all_tracks if track.orig_event_flag]
graph_hv, graph_hp, graph_hpd_a, graph_hped_a = \
_check_apply_decay_hist_plot(hist_tracks)
# Put legend and fix size
scale_thresholds = climada.hazard.tc_tracks.SAFFIR_SIM_CAT
leg_lines = [Line2D([0], [0], color=climada.hazard.tc_tracks.CAT_COLORS[i_col], lw=2)
for i_col in range(len(scale_thresholds))]
leg_lines.append(Line2D([0], [0], color='k', lw=2))
leg_names = [climada.hazard.tc_tracks.CAT_NAMES[i_col]
for i_col in sorted(climada.hazard.tc_tracks.CAT_NAMES.keys())]
leg_names.append('Sea')
all_gr = [graph_v_a, graph_v_b, graph_p_a, graph_p_b, graph_ped_a,
graph_pd_a, graph_hv, graph_hp, graph_hpd_a, graph_hped_a]
for graph in all_gr:
graph.axs[0].legend(leg_lines, leg_names)
fig, _ = graph.get_elems()
fig.set_size_inches(18.5, 10.5)
def _calc_decay_ps_value(track, p_landfall, pos, s_rel):
if s_rel:
p_land_s = track.environmental_pressure[pos].values
else:
p_land_s = track.central_pressure[pos].values
return float(p_land_s / p_landfall)
def _decay_v_function(a_coef, x_val):
"""Decay function used for wind after landfall."""
return np.exp(-a_coef * x_val)
def _solve_decay_v_function(v_y, x_val):
"""Solve decay function used for wind after landfall. Get A coefficient."""
return -np.log(v_y) / x_val
def _decay_p_function(s_coef, b_coef, x_val):
"""Decay function used for pressure after landfall."""
return s_coef - (s_coef - 1) * np.exp(-b_coef * x_val)
def _solve_decay_p_function(ps_y, p_y, x_val):
"""Solve decay function used for pressure after landfall.
Get B coefficient."""
return -np.log((ps_y - p_y) / (ps_y - 1.0)) / x_val
def _check_apply_decay_syn_plot(sy_tracks, syn_orig_wind,
syn_orig_pres):
"""Plot winds and pressures of synthetic tracks before and after
correction."""
# pylint: disable=protected-access
_, graph_v_b = plt.subplots()
graph_v_b.set_title('Wind before land decay correction')
graph_v_b.set_xlabel('Node number')
graph_v_b.set_ylabel('Max sustained wind (kn)')
_, graph_v_a = plt.subplots()
graph_v_a.set_title('Wind after land decay correction')
graph_v_a.set_xlabel('Node number')
graph_v_a.set_ylabel('Max sustained wind (kn)')
_, graph_p_b = plt.subplots()
graph_p_b.set_title('Pressure before land decay correctionn')
graph_p_b.set_xlabel('Node number')
graph_p_b.set_ylabel('Central pressure (mb)')
_, graph_p_a = plt.subplots()
graph_p_a.set_title('Pressure after land decay correctionn')
graph_p_a.set_xlabel('Node number')
graph_p_a.set_ylabel('Central pressure (mb)')
_, graph_pd_a = plt.subplots()
graph_pd_a.set_title('Relative pressure after land decay correction')
graph_pd_a.set_xlabel('Distance from landfall (km)')
graph_pd_a.set_ylabel('Central pressure relative to landfall')
_, graph_ped_a = plt.subplots()
graph_ped_a.set_title(
'Environmental - central pressure after land decay correction')
graph_ped_a.set_xlabel('Distance from landfall (km)')
graph_ped_a.set_ylabel('Environmental pressure - Central pressure (mb)')
for track, orig_wind, orig_pres in \
zip(sy_tracks, syn_orig_wind, syn_orig_pres):
sea_land_idx, land_sea_idx = climada.hazard.tc_tracks._get_landfall_idx(track)
if sea_land_idx.size:
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
v_lf = track.max_sustained_wind[sea_land - 1].values
p_lf = track.central_pressure[sea_land - 1].values
scale_thresholds = climada.hazard.tc_tracks.SAFFIR_SIM_CAT
ss_scale_idx = np.where(v_lf < scale_thresholds)[0][0]
on_land = np.arange(track.time.size)[sea_land:land_sea]
graph_v_a.plot(on_land, track.max_sustained_wind[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_v_b.plot(on_land, orig_wind[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_p_a.plot(on_land, track.central_pressure[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_p_b.plot(on_land, orig_pres[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_pd_a.plot(track.dist_since_lf[on_land],
track.central_pressure[on_land] / p_lf,
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_ped_a.plot(track.dist_since_lf[on_land],
track.environmental_pressure[on_land] -
track.central_pressure[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
on_sea = np.arange(track.time.size)[~track.on_land]
graph_v_a.plot(on_sea, track.max_sustained_wind[on_sea],
'o', c='k', markersize=5)
graph_v_b.plot(on_sea, orig_wind[on_sea],
'o', c='k', markersize=5)
graph_p_a.plot(on_sea, track.central_pressure[on_sea],
'o', c='k', markersize=5)
graph_p_b.plot(on_sea, orig_pres[on_sea],
'o', c='k', markersize=5)
return graph_v_b, graph_v_a, graph_p_b, graph_p_a, graph_pd_a, graph_ped_a
def _check_apply_decay_hist_plot(hist_tracks):
"""Plot winds and pressures of historical tracks."""
# pylint: disable=protected-access
_, graph_hv = plt.subplots()
graph_hv.set_title('Historical wind')
graph_hv.set_xlabel('Node number')
graph_hv.set_ylabel('Max sustained wind (kn)')
_, graph_hp = plt.subplots()
graph_hp.set_title('Historical pressure')
graph_hp.set_xlabel('Node number')
graph_hp.set_ylabel('Central pressure (mb)')
_, graph_hpd_a = plt.subplots()
graph_hpd_a.set_title('Historical relative pressure')
graph_hpd_a.set_xlabel('Distance from landfall (km)')
graph_hpd_a.set_ylabel('Central pressure relative to landfall')
_, graph_hped_a = plt.subplots()
graph_hped_a.set_title('Historical environmental - central pressure')
graph_hped_a.set_xlabel('Distance from landfall (km)')
graph_hped_a.set_ylabel('Environmental pressure - Central pressure (mb)')
for track in hist_tracks:
sea_land_idx, land_sea_idx = climada.hazard.tc_tracks._get_landfall_idx(track)
if sea_land_idx.size:
for sea_land, land_sea in zip(sea_land_idx, land_sea_idx):
scale_thresholds = climada.hazard.tc_tracks.SAFFIR_SIM_CAT
ss_scale_idx = np.where(track.max_sustained_wind[sea_land - 1].values
< scale_thresholds)[0][0]
on_land = np.arange(track.time.size)[sea_land:land_sea]
graph_hv.add_curve(on_land, track.max_sustained_wind[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_hp.add_curve(on_land, track.central_pressure[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_hpd_a.plot(track.dist_since_lf[on_land],
track.central_pressure[on_land]
/ track.central_pressure[sea_land - 1].values,
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
graph_hped_a.plot(track.dist_since_lf[on_land],
track.environmental_pressure[on_land] -
track.central_pressure[on_land],
'o', c=climada.hazard.tc_tracks.CAT_COLORS[ss_scale_idx])
on_sea = np.arange(track.time.size)[~track.on_land]
graph_hp.plot(on_sea, track.central_pressure[on_sea],
'o', c='k', markersize=5)
graph_hv.plot(on_sea, track.max_sustained_wind[on_sea],
'o', c='k', markersize=5)
return graph_hv, graph_hp, graph_hpd_a, graph_hped_a