Source code for climada.hazard.trop_cyclone

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define TropCyclone class.
"""

__all__ = ['TropCyclone']

import itertools
import logging
import copy
import time
import datetime as dt
import numpy as np
from numpy import linalg as LA
from scipy import sparse
import matplotlib.animation as animation
from pint import UnitRegistry
from numba import jit
from tqdm import tqdm

from climada.hazard.base import Hazard
from climada.hazard.tag import Tag as TagHazard
from climada.hazard.tc_tracks import TCTracks
from climada.hazard.tc_clim_change import get_knutson_criterion, calc_scale_knutson
from climada.hazard.centroids.centr import Centroids
from climada.util.constants import GLB_CENTROIDS_MAT
from climada.util.interpolation import dist_approx
import climada.util.plot as u_plot

LOGGER = logging.getLogger(__name__)

HAZ_TYPE = 'TC'
""" Hazard type acronym for Tropical Cyclone """

INLAND_MAX_DIST_KM = 1000
""" Maximum inland distance of the centroids in km """

CENTR_NODE_MAX_DIST_KM = 300
""" Maximum distance between centroid and TC track node in km """

MODEL_VANG = {'H08': 0
             }
""" Enumerate different symmetric wind field calculation."""

[docs]class TropCyclone(Hazard): """Contains tropical cyclone events. Attributes: category (np.array(int)): for every event, the TC category using the Saffir-Simpson scale: -1 tropical depression 0 tropical storm 1 Hurrican category 1 2 Hurrican category 2 3 Hurrican category 3 4 Hurrican category 4 5 Hurrican category 5 basin (list(str)): basin where every event starts 'NA' North Atlantic 'EP' Eastern North Pacific 'WP' Western North Pacific 'NI' North Indian 'SI' South Indian 'SP' Southern Pacific 'SA' South Atlantic """ intensity_thres = 17.5 """ intensity threshold for storage in m/s """ vars_opt = Hazard.vars_opt.union({'category'}) """Name of the variables that aren't need to compute the impact."""
[docs] def __init__(self, pool=None): """Empty constructor. """ Hazard.__init__(self, HAZ_TYPE) self.category = np.array([], int) self.basin = list() if pool: self.pool = pool LOGGER.info('Using %s CPUs.', self.pool.ncpus) else: self.pool = None
[docs] def set_from_tracks(self, tracks, centroids=None, description='', model='H08', ignore_distance_to_coast=False): """Clear and model tropical cyclone from input IBTrACS tracks. Parallel process. Parameters: tracks (TCTracks): tracks of events centroids (Centroids, optional): Centroids where to model TC. Default: global centroids. description (str, optional): description of the events model (str, optional): model to compute gust. Default Holland2008. ignore_distance_to_coast (boolean, optional): if True, centroids far from coast are not ignored. Default False Raises: ValueError """ num_tracks = tracks.size if centroids is None: centroids = Centroids() centroids.read_mat(GLB_CENTROIDS_MAT) if ignore_distance_to_coast: # Select centroids with lat < 61 coastal_idx = np.logical_and(centroids.lat < 61, True).nonzero()[0] else: # Select centroids which are inside INLAND_MAX_DIST_KM and lat < 61 coastal_idx = coastal_centr_idx(centroids) if not centroids.coord.size: centroids.set_meta_to_lat_lon() LOGGER.info('Mapping %s tracks to %s centroids.', str(tracks.size), str(centroids.size)) if self.pool: chunksize = min(num_tracks//self.pool.ncpus, 1000) tc_haz = self.pool.map(self._tc_from_track, tracks.data, itertools.repeat(centroids, num_tracks), itertools.repeat(coastal_idx, num_tracks), itertools.repeat(model, num_tracks), chunksize=chunksize) else: tc_haz = list() for track in tracks.data: tc_haz.append(self._tc_from_track(track, centroids, coastal_idx, model)) LOGGER.debug('Append events.') self._append_all(tc_haz) LOGGER.debug('Compute frequency.') self._set_frequency(tracks.data) self.tag.description = description
[docs] def set_climate_scenario_knu(self, ref_year=2050, rcp_scenario=45): """ Compute future events for given RCP scenario and year. RCP 4.5 from Knutson et al 2015. Parameters: ref_year (int): year between 2000 ad 2100. Default: 2050 rcp_scenario (int): 26 for RCP 2.6, 45 for RCP 4.5 (default), 60 for RCP 6.0 and 85 for RCP 8.5. Returns: TropCyclone """ criterion = get_knutson_criterion() scale = calc_scale_knutson(ref_year, rcp_scenario) haz_cc = self._apply_criterion(criterion, scale) haz_cc.tag.description = 'climate change scenario for year %s and RCP %s '\ 'from Knutson et al 2015.' % (str(ref_year), str(rcp_scenario)) return haz_cc
[docs] @staticmethod def video_intensity(track_name, tracks, centroids, file_name=None, writer=animation.PillowWriter(bitrate=500), **kwargs): """ Generate video of TC wind fields node by node and returns its corresponding TropCyclone instances and track pieces. Parameters: track_name (str): name of the track contained in tracks to record tracks (TCTracks): tracks centroids (Centroids): centroids where wind fields are mapped file_name (str, optional): file name to save video, if provided writer = (matplotlib.animation.*, optional): video writer. Default: pillow with bitrate=500 kwargs (optional): arguments for pcolormesh matplotlib function used in event plots Returns: list(TropCyclone), list(np.array) Raises: ValueError """ # initialization track = tracks.get_track(track_name) if not track: LOGGER.error('%s not found in track data.', track_name) raise ValueError idx_plt = np.argwhere(np.logical_and(np.logical_and(np.logical_and( \ track.lon.values < centroids.total_bounds[2] + 1, \ centroids.total_bounds[0] - 1 < track.lon.values), \ track.lat.values < centroids.total_bounds[3] + 1), \ centroids.total_bounds[1] - 1 < track.lat.values)).reshape(-1) tc_list = [] tr_coord = {'lat':[], 'lon':[]} for node in range(idx_plt.size-2): tr_piece = track.sel(time=slice(track.time.values[idx_plt[node]], \ track.time.values[idx_plt[node+2]])) tr_piece.attrs['n_nodes'] = 2 # plot only one node tr_sel = TCTracks() tr_sel.append(tr_piece) tr_coord['lat'].append(tr_sel.data[0].lat.values[:-1]) tr_coord['lon'].append(tr_sel.data[0].lon.values[:-1]) tc_tmp = TropCyclone() tc_tmp.set_from_tracks(tr_sel, centroids) tc_tmp.event_name = [track.name + ' ' + time.strftime("%d %h %Y %H:%M", \ time.gmtime(tr_sel.data[0].time[1].values.astype(int)/1000000000))] tc_list.append(tc_tmp) if 'cmap' not in kwargs: kwargs['cmap'] = 'Greys' if 'vmin' not in kwargs: kwargs['vmin'] = np.array([tc_.intensity.min() for tc_ in tc_list]).min() if 'vmax' not in kwargs: kwargs['vmax'] = np.array([tc_.intensity.max() for tc_ in tc_list]).max() def run(node): tc_list[node].plot_intensity(1, axis=axis, **kwargs) axis.plot(tr_coord['lon'][node], tr_coord['lat'][node], 'k') axis.set_title(tc_list[node].event_name[0]) pbar.update() if file_name: LOGGER.info('Generating video %s', file_name) fig, axis = u_plot.make_map() pbar = tqdm(total=idx_plt.size-2) ani = animation.FuncAnimation(fig, run, frames=idx_plt.size-2, interval=500, blit=False) ani.save(file_name, writer=writer) pbar.close() return tc_list, tr_coord
def _set_frequency(self, tracks): """Set hazard frequency from tracks data. Parameters: tracks (list(xr.Dataset)) """ if not tracks: return delta_time = np.max([np.max(track.time.dt.year.values) \ for track in tracks]) - np.min([np.min(track.time.dt.year.values) \ for track in tracks]) + 1 num_orig = self.orig.nonzero()[0].size if num_orig > 0: ens_size = self.event_id.size / num_orig else: ens_size = 1 self.frequency = np.ones(self.event_id.size) / delta_time / ens_size @staticmethod @jit def _tc_from_track(track, centroids, coastal_centr, model='H08'): """ Set hazard from input file. If centroids are not provided, they are read from the same file. Parameters: track (xr.Dataset): tropical cyclone track. centroids (Centroids): Centroids instance. Use global centroids if not provided. coastal_centr (np.array): indeces of centroids close to coast. model (str, optional): model to compute gust. Default Holland2008. Raises: ValueError, KeyError Returns: TropCyclone """ new_haz = TropCyclone() new_haz.tag = TagHazard(HAZ_TYPE, 'IBTrACS: ' + track.name) new_haz.intensity = gust_from_track(track, centroids, coastal_centr, model) new_haz.units = 'm/s' new_haz.centroids = centroids new_haz.event_id = np.array([1]) # frequency set when all tracks available new_haz.frequency = np.array([1]) new_haz.event_name = [track.sid] new_haz.fraction = new_haz.intensity.copy() new_haz.fraction.data.fill(1) # store date of start new_haz.date = np.array([dt.datetime( track.time.dt.year[0], track.time.dt.month[0], track.time.dt.day[0]).toordinal()]) new_haz.orig = np.array([track.orig_event_flag]) new_haz.category = np.array([track.category]) new_haz.basin = [track.basin] return new_haz def _apply_criterion(self, criterion, scale): """ Apply changes defined in criterion with a given scale Parameters: criterion (list(dict)): list of criteria scale (float): scale parameter because of chosen year and RCP Returns: TropCyclone """ haz_cc = copy.deepcopy(self) for chg in criterion: # filter criteria select = np.ones(haz_cc.size, bool) for var_name, cri_val in chg['criteria'].items(): var_val = getattr(haz_cc, var_name) if isinstance(var_val, list): var_val = np.array(var_val) tmp_select = np.logical_or.reduce([var_val == val for val in cri_val]) select = np.logical_and(select, tmp_select) if chg['function'] == np.multiply: change = 1 + (chg['change'] - 1) * scale elif chg['function'] == np.add: change = chg['change'] * scale if select.any(): new_val = getattr(haz_cc, chg['variable']) new_val[select] *= change setattr(haz_cc, chg['variable'], new_val) return haz_cc
def coastal_centr_idx(centroids, lat_max=61): """ Compute centroids indices which are inside INLAND_MAX_DIST_KM and with lat < lat_max. Parameters: lat_max (float, optional): Maximum latitude to consider. Default: 61. Returns: np.array """ if not centroids.dist_coast.size: centroids.set_dist_coast() return np.logical_and(centroids.dist_coast < INLAND_MAX_DIST_KM*1000, centroids.lat < lat_max).nonzero()[0] def gust_from_track(track, centroids, coastal_idx=None, model='H08'): """ Compute wind gusts at centroids from track. Track is interpolated to configured time step. Parameters: track (xr.Dataset): track infomation centroids (Centroids): centroids where gusts are computed coastal_idx (np.array): indices of centroids which are close to coast model (str, optional): model to compute gust. Default Holland2008 Returns: sparse.csr_matrix """ if coastal_idx is None: coastal_idx = coastal_centr_idx(centroids) try: mod_id = MODEL_VANG[model] except KeyError: LOGGER.error('Not implemented model %s.', model) raise ValueError # Compute wind gusts intensity = _windfield(track, centroids.coord, coastal_idx, mod_id) return sparse.csr_matrix(intensity) @jit def _windfield(track, centroids, coastal_idx, model): """ Compute windfields (in m/s) in centroids using Holland model 08. Parameters: track (xr.Dataset): track infomation centroids (2d np.array): each row is a centroid [lat, lon] coastal_idx (1d np.array): centroids indices that are close to coast model (int): Holland model selection according to MODEL_VANG Returns: np.array """ np.warnings.filterwarnings('ignore') # Make sure that CentralPressure never exceeds EnvironmentalPressure up_pr = np.argwhere(track.central_pressure.values > track.environmental_pressure.values) track.central_pressure.values[up_pr] = \ track.environmental_pressure.values[up_pr] # Extrapolate RadiusMaxWind from pressure if not given ureg = UnitRegistry() track['radius_max_wind'] = ('time', _extra_rad_max_wind( \ track.central_pressure.values, track.radius_max_wind.values, ureg)) # Track translational speed at every node v_trans = _vtrans(track.lat.values, track.lon.values, track.time_step.values, ureg) # Compute windfield intensity = np.zeros((centroids.shape[0], )) intensity[coastal_idx] = _wind_per_node(centroids[coastal_idx, :], track, v_trans, model) return intensity @jit def _vtrans(t_lat, t_lon, t_tstep, ureg): """ Translational spped at every track node. Parameters: t_lat (np.array): track latitudes t_lon (np.array): track longitudes t_tstep (np.array): track time steps ureg (UnitRegistry): units handler Returns: np.array """ v_trans = dist_approx(t_lat[:-1], t_lon[:-1], np.cos(np.radians(t_lat[:-1])), t_lat[1:], t_lon[1:]) / t_tstep[1:] v_trans = (v_trans * ureg.km/ureg.hour).to(ureg.meter/ureg.second).magnitude # nautical miles/hour, limit to 30 nmph v_max = (30*ureg.knot).to(ureg.meter/ureg.second).magnitude v_trans[v_trans > v_max] = v_max return v_trans @jit def _extra_rad_max_wind(t_cen, t_rad, ureg): """ Extrapolate RadiusMaxWind from pressure and change to km. Parameters: t_cen (np.array): track central pressures t_rad (np.array): track radius of maximum wind ureg (UnitRegistry): units handler Returns: np.array """ # TODO: always extrapolate???!!! # rmax thresholds in nm rmax_1, rmax_2, rmax_3 = 15, 25, 50 # pressure in mb pres_1, pres_2, pres_3 = 950, 980, 1020 t_rad[t_cen <= pres_1] = rmax_1 to_change = np.logical_and(t_cen > pres_1, t_cen <= pres_2).nonzero()[0] t_rad[to_change] = (t_cen[to_change] - pres_1) * \ (rmax_2 - rmax_1)/(pres_2 - pres_1) + rmax_1 to_change = np.argwhere(t_cen > pres_2).squeeze() t_rad[to_change] = (t_cen[to_change] - pres_2) * \ (rmax_3 - rmax_2)/(pres_3 - pres_2) + rmax_2 return (t_rad * ureg.nautical_mile).to(ureg.kilometer).magnitude @jit(parallel=True) def _wind_per_node(coastal_centr, track, v_trans, model): """ Compute sustained winds at each centroid. Parameters: coastal_centr (2d np.array): centroids track (xr.Dataset): track latitudes v_trans (np.array): track translational velocity model (int): Holland model selection according to MODEL_VANG Returns: 2d np.array """ t_lat, t_lon = track.lat.values, track.lon.values t_rad, t_env = track.radius_max_wind.values, track.environmental_pressure.values t_cen, t_tstep = track.central_pressure.values, track.time_step.values centr_cos_lat = np.cos(np.radians(coastal_centr[:, 0])) intensity = np.zeros((coastal_centr.shape[0],)) n_nodes = t_lat.size if 'n_nodes' in track.attrs: n_nodes = track.attrs['n_nodes'] for i_node in range(1, n_nodes): # compute distance to all centroids r_arr = dist_approx(coastal_centr[:, 0], coastal_centr[:, 1], \ centr_cos_lat, t_lat[i_node], t_lon[i_node]) # Choose centroids that are close enough close_centr = np.argwhere(r_arr < CENTR_NODE_MAX_DIST_KM).reshape(-1,) r_arr = r_arr[close_centr] # translational component if i_node < t_lat.size-1: v_trans_corr = _vtrans_correct(t_lat[i_node:i_node+2], \ t_lon[i_node:i_node+2], t_rad[i_node], \ coastal_centr[close_centr, :], r_arr) else: v_trans_corr = np.zeros((r_arr.size,)) # angular component v_ang = _vang_sym(t_env[i_node], t_cen[i_node-1:i_node+1], t_lat[i_node], t_tstep[i_node], t_rad[i_node], r_arr, v_trans[i_node-1], model) v_full = v_trans[i_node-1] * v_trans_corr + v_ang v_full[np.isnan(v_full)] = 0 v_full[v_full < TropCyclone.intensity_thres] = 0 # keep maximum instantaneous wind intensity[close_centr] = np.maximum(intensity[close_centr], v_full) return intensity @jit def _vtrans_correct(t_lats, t_lons, t_rad, close_centr, r_arr): """ Compute Hollands translational wind corrections. Returns factor. Parameters: t_lats (tuple): current and next latitude t_lats (tuple): current and next longitude t_rad (float): current radius of maximum wind close_centr (np.array): centroids r_arr (np.array): distance from current node to all centroids Returns: np.array """ # we use the scalar product of the track forward vector and the vector # towards each centroid to figure the angle between and hence whether # the translational wind needs to be added (on the right side of the # track for Northern hemisphere) and to which extent (100% exactly 90 # to the right of the track, zero in front of the track) lon, nex_lon = t_lons lat, nex_lat = t_lats # hence, rotate track forward vector 90 degrees clockwise, i.e. node_dy = -nex_lon + lon node_dx = nex_lat - lat # the vector towards each centroid centroids_dlon = close_centr[:, 1] - lon centroids_dlat = close_centr[:, 0] - lat # scalar product, a*b=|a|*|b|*cos(phi), phi angle between vectors cos_phi = (centroids_dlon * node_dx + centroids_dlat * node_dy) / \ LA.norm([centroids_dlon, centroids_dlat], axis=0) / LA.norm([node_dx, node_dy]) # southern hemisphere if lat < 0: cos_phi = -cos_phi # calculate v_trans wind field array assuming that # - effect of v_trans decreases with distance from eye (r_arr_normed) # - v_trans is added 100% to the right of the track, 0% in front (cos_phi) r_arr_normed = t_rad / r_arr r_arr_normed[r_arr_normed > 1] = 1 return np.multiply(r_arr_normed, cos_phi) @jit(['f8(f8, f8, f8, f8, f8, f8, f8)'], nopython=True) def _bs_hol08(v_trans, penv, pcen, prepcen, lat, hol_xx, tint): """ Halland's 2008 b value computation. Parameters: v_trans (float): translational wind (m/s) penv (float): environmental pressure (hPa) pcen (float): central pressure (hPa) prepcen (float): previous central pressure (hPa) lat (float): latitude (degrees) hol_xx (float): Holland's xx value tint (float): time step (h) Returns: float """ return -4.4e-5 * (penv - pcen)**2 + 0.01 * (penv-pcen) + \ 0.03 * (pcen - prepcen) / tint - 0.014 * abs(lat) + \ 0.15 * v_trans**hol_xx + 1.0 @jit(nopython=True) def _stat_holland(r_arr, r_max, hol_b, penv, pcen, ycoord): """ Holland symmetric and static wind field (in m/s) according to Holland1980 or Holland2008m depending on hol_b parameter. Parameters: r_arr (np.array): distance between coastal centroids and track node r_max (float): radius_max_wind hol_b (float): Holland's b parameter penv (float): environmental pressure pcen (float): central pressure ycoord (float): latitude Returns: np.array """ rho = 1.15 f_val = 2 * 0.0000729 * np.sin(np.radians(np.abs(ycoord))) r_arr_mult = 0.5 * 1000 * r_arr * f_val # units are m/s r_max_norm = (r_max/r_arr)**hol_b return np.sqrt(100 * hol_b / rho * r_max_norm * (penv - pcen) * np.exp(-r_max_norm) + r_arr_mult**2) - r_arr_mult @jit(nopython=True) def _vang_sym(t_env, t_cens, t_lat, t_step, t_rad, r_arr, v_trans, model): """ Compute symmetric and static wind field (in m/s) filed (angular wind component. Parameters: t_env (float): environmental pressures t_cens (tuple): previous and current central pressures t_lat (float): latitude t_tstep (float): time steps t_rad (float): radius of maximum wind r_arr (np.array): distance from current node to all centroids v_trans (float): translational wind field model (int): Holland model to use, default 2008. Returns: np.array """ # data for windfield calculation prev_pres, pres = t_cens hol_xx = 0.6 * (1. - (t_env - pres) / 215) if model == 0: # adjust pressure at previous track point if prev_pres < 850: prev_pres = pres hol_b = _bs_hol08(v_trans, t_env, pres, prev_pres, t_lat, hol_xx, t_step) else: # TODO H80: b=b_value(v_trans,vmax,penv,pcen,rho); raise NotImplementedError return _stat_holland(r_arr, t_rad, hol_b, t_env, pres, t_lat)