Source code for climada.hazard.tc_tracks

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define TCTracks: IBTracs reader and tracks manager.
"""

__all__ = ['CAT_NAMES', 'SAFFIR_SIM_CAT', 'TCTracks', 'set_category']

# standard libraries
import datetime as dt
import itertools
import logging
from typing import Optional, List
import re
import shutil
import warnings
from pathlib import Path

# additional libraries
import cartopy.crs as ccrs
import cftime
import geopandas as gpd
import pathos
import matplotlib.cm as cm_mp
from matplotlib.collections import LineCollection
from matplotlib.colors import BoundaryNorm, ListedColormap
from matplotlib.lines import Line2D
import matplotlib.pyplot as plt
import netCDF4 as nc
import numba
import numpy as np
import pandas as pd
import scipy.io.matlab as matlab
from shapely.geometry import Point, LineString, MultiLineString
import shapely.ops
from sklearn.metrics import DistanceMetric
import statsmodels.api as sm
import xarray as xr

# climada dependencies
from climada.util import ureg
import climada.util.coordinates as u_coord
from climada.util.constants import EARTH_RADIUS_KM, SYSTEM_DIR, DEF_CRS
from climada.util.files_handler import get_file_names, download_ftp
import climada.util.plot as u_plot
from climada.hazard import Centroids
import climada.hazard.tc_tracks_synth

LOGGER = logging.getLogger(__name__)

SAFFIR_SIM_CAT = [34, 64, 83, 96, 113, 137, 1000]
"""Saffir-Simpson Hurricane Wind Scale in kn based on NOAA"""

CAT_NAMES = {
    -1: 'Tropical Depression',
    0: 'Tropical Storm',
    1: 'Hurricane Cat. 1',
    2: 'Hurricane Cat. 2',
    3: 'Hurricane Cat. 3',
    4: 'Hurricane Cat. 4',
    5: 'Hurricane Cat. 5',
}
"""Saffir-Simpson category names."""

CAT_COLORS = cm_mp.rainbow(np.linspace(0, 1, len(SAFFIR_SIM_CAT)))
"""Color scale to plot the Saffir-Simpson scale."""

IBTRACS_URL = ('https://www.ncei.noaa.gov/data/'
               'international-best-track-archive-for-climate-stewardship-ibtracs/'
               'v04r00/access/netcdf')
"""Site of IBTrACS netcdf file containing all tracks v4.0,
s. https://www.ncdc.noaa.gov/ibtracs/index.php?name=ib-v4-access"""

IBTRACS_FILE = 'IBTrACS.ALL.v04r00.nc'
"""IBTrACS v4.0 file all"""

IBTRACS_AGENCIES = [
    'usa', 'tokyo', 'newdelhi', 'reunion', 'bom', 'nadi', 'wellington',
    'cma', 'hko', 'ds824', 'td9636', 'td9635', 'neumann', 'mlc',
]
"""Names/IDs of agencies in IBTrACS v4.0"""

IBTRACS_USA_AGENCIES = [
    'atcf', 'cphc', 'hurdat_atl', 'hurdat_epa', 'jtwc_cp', 'jtwc_ep', 'jtwc_io',
    'jtwc_sh', 'jtwc_wp', 'nhc_working_bt', 'tcvightals', 'tcvitals'
]
"""Names/IDs of agencies in IBTrACS that correspond to 'usa_*' variables"""


IBTRACS_AGENCY_1MIN_WIND_FACTOR = {
    "usa": [1.0, 0.0],
    "tokyo": [0.60, 23.3],
    "newdelhi": [1.0, 0.0],
    "reunion": [0.88, 0.0],
    "bom": [0.88, 0.0],
    "nadi": [0.88, 0.0],
    "wellington": [0.88, 0.0],
    'cma': [0.871, 0.0],
    'hko': [0.9, 0.0],
    'ds824': [1.0, 0.0],
    'td9636': [1.0, 0.0],
    'td9635': [1.0, 0.0],
    'neumann': [0.88, 0.0],
    'mlc': [1.0, 0.0],
}
"""Scale and shift used by agencies to convert their internal Dvorak 1-minute sustained winds to
the officially reported values that are in IBTrACS. From Table 1 in:

Knapp, K.R. & Kruk, M.C. (2010): Quantifying Interagency Differences in Tropical Cyclone Best-Track
Wind Speed Estimates. Monthly Weather Review 138(4): 1459–1473.
https://journals.ametsoc.org/view/journals/mwre/138/4/2009mwr3123.1.xml"""

DEF_ENV_PRESSURE = 1010
"""Default environmental pressure"""

BASIN_ENV_PRESSURE = {
    '': DEF_ENV_PRESSURE,
    'EP': 1010, 'NA': 1010, 'SA': 1010,
    'NI': 1005, 'SI': 1005, 'WP': 1005,
    'SP': 1004,
}
"""Basin-specific default environmental pressure"""

EMANUEL_RMW_CORR_FILES = [
    'temp_ccsm420thcal.mat', 'temp_ccsm4rcp85_full.mat',
    'temp_gfdl520thcal.mat', 'temp_gfdl5rcp85cal_full.mat',
    'temp_hadgem20thcal.mat', 'temp_hadgemrcp85cal_full.mat',
    'temp_miroc20thcal.mat', 'temp_mirocrcp85cal_full.mat',
    'temp_mpi20thcal.mat', 'temp_mpircp85cal_full.mat',
    'temp_mri20thcal.mat', 'temp_mrircp85cal_full.mat',
]
EMANUEL_RMW_CORR_FACTOR = 2.0
"""Kerry Emanuel track files in this list require a correction: The radius of
    maximum wind (rmstore) needs to be multiplied by factor 2."""

STORM_1MIN_WIND_FACTOR = 0.88
"""Scaling factor used in Bloemendaal et al. (2020) to convert 1-minute sustained wind speeds to
10-minute sustained wind speeds.

Bloemendaal et al. (2020): Generation of a global synthetic tropical cyclone hazard
dataset using STORM. Scientific Data 7(1): 40."""

[docs] class TCTracks(): """Contains tropical cyclone tracks. Attributes ---------- data : list(xarray.Dataset) List of tropical cyclone tracks. Each track contains following attributes: - time (coords) - lat (coords) - lon (coords) - time_step (in hours) - radius_max_wind (in nautical miles) - radius_oci (in nautical miles) - max_sustained_wind (in knots) - central_pressure (in hPa/mbar) - environmental_pressure (in hPa/mbar) - basin (for each track position) - max_sustained_wind_unit (attrs) - central_pressure_unit (attrs) - name (attrs) - sid (attrs) - orig_event_flag (attrs) - data_provider (attrs) - id_no (attrs) - category (attrs) Computed during processing: - on_land (bool for each track position) - dist_since_lf (in km) Additional data variables such as "nature" (specifiying, for each track position, whether a system is a disturbance, tropical storm, post-transition extratropical storm etc.) might be included, depending on the data source and on use cases. """
[docs] def __init__(self, data: Optional[List[xr.Dataset]] = None, pool: Optional[pathos.multiprocessing.ProcessPool] = None): """Create new (empty) TCTracks instance. Parameters ---------- data : list of xarray.Dataset, optional List of tropical cyclone tracks, each stored as single xarray Dataset. See the Attributes for a full description of the required Dataset variables and attributes. Defaults to an empty list. pool : pathos.pools, optional Pool that will be used for parallel computation when applicable. Default: None """ self.data = data if data is not None else list() self.pool = pool if pool: LOGGER.debug('Using %s CPUs.', self.pool.ncpus)
[docs] def append(self, tracks): """Append tracks to current. Parameters ---------- tracks : xarray.Dataset or list(xarray.Dataset) tracks to append. """ if not isinstance(tracks, list): tracks = [tracks] self.data.extend(tracks)
[docs] def get_track(self, track_name=None): """Get track with provided name. Returns the first matching track based on the assumption that no other track with the same name or sid exists in the set. Parameters ---------- track_name : str, optional Name or sid (ibtracsID for IBTrACS) of track. If None (default), return all tracks. Returns ------- result : xarray.Dataset or list of xarray.Dataset Usually, a single track is returned. If no track with the specified name is found, an empty list `[]` is returned. If called with `track_name=None`, the list of all tracks is returned. """ if track_name is None: if len(self.data) == 1: return self.data[0] return self.data for track in self.data: if track.name == track_name: return track if hasattr(track, 'sid') and track.sid == track_name: return track LOGGER.info('No track with name or sid %s found.', track_name) return []
[docs] def subset(self, filterdict): """Subset tracks based on track attributes. Select all tracks matching exactly the given attribute values. Parameters ---------- filterdict : dict or OrderedDict Keys are attribute names, values are the corresponding attribute values to match. In case of an ordered dict, the filters are applied in the given order. Returns ------- tc_tracks : TCTracks A new instance of TCTracks containing only the matching tracks. """ out = self.__class__(pool=self.pool) out.data = self.data for key, pattern in filterdict.items(): if key == "basin": out.data = [ds for ds in out.data if pattern in ds.basin] else: out.data = [ds for ds in out.data if ds.attrs[key] == pattern] return out
[docs] def tracks_in_exp(self, exposure, buffer=1.0): """Select only the tracks that are in the vicinity (buffer) of an exposure. Each exposure point/geometry is extended to a disc of radius `buffer`. Each track is converted to a line and extended by a radius `buffer`. Parameters ---------- exposure : Exposure Exposure used to select tracks. buffer : float, optional Size of buffer around exposure geometries (in the units of `exposure.crs`), see `geopandas.distance`. Default: 1.0 Returns ------- filtered_tracks : TCTracks TCTracks object with tracks from tc_tracks intersecting the exposure whitin a buffer distance. """ if buffer <= 0.0: raise ValueError(f"buffer={buffer} is invalid, must be above zero.") try: exposure.gdf.geometry except AttributeError: exposure.set_geometry_points() exp_buffer = exposure.gdf.buffer(distance=buffer, resolution=0) exp_buffer = exp_buffer.unary_union tc_tracks_lines = self.to_geodataframe().buffer(distance=buffer) select_tracks = tc_tracks_lines.intersects(exp_buffer) tracks_in_exp = [track for j, track in enumerate(self.data) if select_tracks[j]] filtered_tracks = TCTracks(tracks_in_exp) return filtered_tracks
[docs] def read_ibtracs_netcdf(self, *args, **kwargs): """This function is deprecated, use TCTracks.from_ibtracs_netcdf instead.""" LOGGER.warning("The use of TCTracks.read_ibtracs_netcdf is deprecated. " "Use TCTracks.from_ibtracs_netcdf instead.") self.__dict__ = TCTracks.from_ibtracs_netcdf(*args, **kwargs).__dict__
[docs] @classmethod def from_ibtracs_netcdf(cls, provider=None, rescale_windspeeds=True, storm_id=None, year_range=None, basin=None, genesis_basin=None, interpolate_missing=True, estimate_missing=False, correct_pres=False, discard_single_points=True, additional_variables=None, file_name='IBTrACS.ALL.v04r00.nc'): """Create new TCTracks object from IBTrACS databse. When using data from IBTrACS, make sure to be familiar with the scope and limitations of IBTrACS, e.g. by reading the official documentation (https://www.ncdc.noaa.gov/ibtracs/pdf/IBTrACS_version4_Technical_Details.pdf). Reading the CLIMADA documentation can't replace a thorough understanding of the underlying data. This function only provides a (hopefully useful) interface for the data input, but cannot provide any guidance or make recommendations about if and how to use IBTrACS data for your particular project. Resulting tracks are required to have both pressure and wind speed information at all time steps. Therefore, all track positions where one of wind speed or pressure are missing are discarded unless one of `interpolate_missing` or `estimate_missing` are active. Some corrections are automatically applied, such as: `environmental_pressure` is enforced to be larger than `central_pressure`. Note that the tracks returned by this function might contain irregular time steps since that is often the case for the original IBTrACS records: many agencies add an additional time step at landfall. Apply the `equal_timestep` function afterwards to enforce regular time steps. Parameters ---------- provider : str or list of str, optional Either specify an agency, such as "usa", "newdelhi", "bom", "cma", "tokyo", or the special values "official" and "official_3h": * "official" means using the (usually 6-hourly) officially reported values of the officially responsible agencies. * "official_3h" means to include (inofficial) 3-hourly data of the officially responsible agencies (whenever available). If you want to restrict to the officially reported values by the officially responsible agencies (`provider="official"`) without any modifications to the original official data, make sure to also set `estimate_missing=False` and `interpolate_missing=False`. Otherwise, gaps in the official reporting will be filled using interpolation and/or statistical estimation procedures (see below). If a list is given, the following logic is applied: For each storm, the variables that are not reported by the first agency for this storm are taken from the next agency in the list that did report this variable for this storm. For different storms, the same variable might be taken from different agencies. Default: ``['official_3h', 'usa', 'tokyo', 'newdelhi', 'reunion', 'bom', 'nadi', 'wellington', 'cma', 'hko', 'ds824', 'td9636', 'td9635', 'neumann', 'mlc']`` rescale_windspeeds : bool, optional If True, all wind speeds are linearly rescaled to 1-minute sustained winds. Note however that the IBTrACS documentation (Section 5.2, https://www.ncdc.noaa.gov/ibtracs/pdf/IBTrACS_version4_Technical_Details.pdf) includes a warning about this kind of conversion: "While a multiplicative factor can describe the numerical differences, there are procedural and observational differences between agencies that can change through time, which confounds the simple multiplicative factor." Default: True storm_id : str or list of str, optional IBTrACS ID of the storm, e.g. 1988234N13299, [1988234N13299, 1989260N11316]. year_range : tuple (min_year, max_year), optional Year range to filter track selection. Default: None. basin : str, optional If given, select storms that have at least one position in the specified basin. This allows analysis of a given basin, but also means that basin-specific track sets should not be combined across basins since some storms will be in more than one set. If you would like to select storms by their (unique) genesis basin instead, use the parameter `genesis_basin`. For possible values (basin abbreviations), see the parameter `genesis_basin`. If None, this filter is not applied. Default: None. genesis_basin : str, optional The basin where a TC is formed is not defined in IBTrACS. However, this filter option allows to restrict to storms whose first valid eye position is in the specified basin, which simulates the genesis location. Note that the resulting genesis basin of a particular track may depend on the selected `provider` and on `estimate_missing` because only the first *valid* eye position is considered. Possible values are 'NA' (North Atlantic), 'SA' (South Atlantic), 'EP' (Eastern North Pacific, which includes the Central Pacific region), 'WP' (Western North Pacific), 'SP' (South Pacific), 'SI' (South Indian), 'NI' (North Indian). If None, this filter is not applied. Default: None. interpolate_missing : bool, optional If True, interpolate temporal reporting gaps within a variable (such as pressure, wind speed, or radius) linearly if possible. Temporal interpolation is with respect to the time steps defined in IBTrACS for a particular storm. No new time steps are added that are not originally defined in IBTrACS. For each time step with a missing value, this procedure is only able to fill in that value if there are other time steps before and after this time step for which values have been reported. This procedure will be applied before the statistical estimations referred to by `estimate_missing`. It is applied to all variables (eye position, wind speed, environmental and central pressure, storm radius and radius of maximum winds). Default: True estimate_missing : bool, optional For each fixed time step, estimate missing pressure, wind speed and radius using other variables that are available at that time step. The relationships between the variables are purely statistical. In comparison to `interpolate_missing`, this procedure is able to estimate values for variables that haven't been reported by any agency at any time step, as long as other variables are available. A typical example are storms before 1950, for which there are often no reported values for pressure, but for wind speed. In this case, a rough statistical pressure-wind relationship is applied to estimate the missing pressure values from the available wind-speed values. Make sure to set `rescale_windspeeds=True` when using this option because the statistical relationships are calibrated using rescaled wind speeds. Default: False correct_pres : bool, optional For backwards compatibility, alias for `estimate_missing`. This is deprecated, use `estimate_missing` instead! discard_single_points : bool, optional Whether to discard tracks that consists of a single point. Recommended for full compatiblity with other functions such as `equal_timesteps`. Default: True. file_name : str, optional Name of NetCDF file to be dowloaded or located at climada/data/system. Default: 'IBTrACS.ALL.v04r00.nc' additional_variables : list of str, optional If specified, additional IBTrACS data variables are extracted, such as "nature" or "storm_speed". Only variables that are not agency-specific are supported. Default: None. Returns ------- tracks : TCTracks TCTracks with data from IBTrACS """ if correct_pres: LOGGER.warning("`correct_pres` is deprecated. " "Use `estimate_missing` instead.") estimate_missing = True if estimate_missing and not rescale_windspeeds: LOGGER.warning( "Using `estimate_missing` without `rescale_windspeeds` is strongly discouraged!") ibtracs_path = SYSTEM_DIR.joinpath(file_name) if not ibtracs_path.is_file(): try: download_ftp(f'{IBTRACS_URL}/{IBTRACS_FILE}', IBTRACS_FILE) shutil.move(IBTRACS_FILE, ibtracs_path) except ValueError as err: raise ValueError( f'Error while downloading {IBTRACS_URL}. Try to download it manually and ' f'put the file in {ibtracs_path}') from err if additional_variables is None: additional_variables = [] ibtracs_ds = xr.open_dataset(ibtracs_path) ibtracs_date = ibtracs_ds.attrs["date_created"] if (np.datetime64('today') - np.datetime64(ibtracs_date)).item().days > 180: LOGGER.warning("The cached IBTrACS data set dates from %s (older " "than 180 days). Very likely, a more recent version is available. " "Consider manually removing the file %s and re-running " "this function, which will download the most recent version of the " "IBTrACS data set from the official URL.", ibtracs_date, ibtracs_path) match = np.ones(ibtracs_ds.sid.shape[0], dtype=bool) if storm_id is not None: if not isinstance(storm_id, list): storm_id = [storm_id] invalid_mask = np.array( [re.match(r"[12][0-9]{6}[NS][0-9]{5}", s) is None for s in storm_id]) if invalid_mask.any(): invalid_sids = list(np.array(storm_id)[invalid_mask]) raise ValueError("The following given IDs are invalid: %s%s" % ( ", ".join(invalid_sids[:5]), ", ..." if len(invalid_sids) > 5 else ".")) storm_id = list(np.array(storm_id)[~invalid_mask]) storm_id_encoded = [i.encode() for i in storm_id] non_existing_mask = ~np.isin(storm_id_encoded, ibtracs_ds.sid.values) if np.count_nonzero(non_existing_mask) > 0: non_existing_sids = list(np.array(storm_id)[non_existing_mask]) raise ValueError("The following given IDs are not in IBTrACS: %s%s" % ( ", ".join(non_existing_sids[:5]), ", ..." if len(non_existing_sids) > 5 else ".")) storm_id_encoded = list(np.array(storm_id_encoded)[~non_existing_mask]) match &= ibtracs_ds.sid.isin(storm_id_encoded) if year_range is not None: years = ibtracs_ds.sid.str.slice(0, 4).astype(int) match &= (years >= year_range[0]) & (years <= year_range[1]) if np.count_nonzero(match) == 0: LOGGER.info('No tracks in time range (%s, %s).', *year_range) if basin is not None: match &= (ibtracs_ds.basin == basin.encode()).any(dim='date_time') if np.count_nonzero(match) == 0: LOGGER.info('No tracks in basin %s.', basin) if genesis_basin is not None: # Here, we only filter for the basin at *any* eye position. We will filter again later # for the basin of the *first* eye position, but only after restricting to the valid # time steps in the data. match &= (ibtracs_ds.basin == genesis_basin.encode()).any(dim='date_time') if np.count_nonzero(match) == 0: LOGGER.info('No tracks in genesis basin %s.', genesis_basin) if np.count_nonzero(match) == 0: LOGGER.info("IBTrACS doesn't contain any tracks matching the specified requirements.") return cls() ibtracs_ds = ibtracs_ds.sel(storm=match) ibtracs_ds['valid_t'] = ibtracs_ds.time.notnull() if rescale_windspeeds: for agency in IBTRACS_AGENCIES: scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency] ibtracs_ds[f'{agency}_wind'] -= shift ibtracs_ds[f'{agency}_wind'] /= scale if provider is None: provider = ["official_3h"] + IBTRACS_AGENCIES elif isinstance(provider, str): provider = [provider] phys_vars = ['lat', 'lon', 'wind', 'pres', 'rmw', 'poci', 'roci'] for tc_var in phys_vars: if "official" in provider or "official_3h" in provider: ibtracs_add_official_variable( ibtracs_ds, tc_var, add_3h=("official_3h" in provider)) # set up dimension of agency-reported values in order of preference, including the # newly created `official` and `official_3h` data if specified ag_vars = [f'{ag}_{tc_var}' for ag in provider] ag_vars = [ag_var for ag_var in ag_vars if ag_var in ibtracs_ds.data_vars.keys()] if len(ag_vars) == 0: ag_vars = [f'{provider[0]}_{tc_var}'] ibtracs_ds[ag_vars[0]] = xr.full_like(ibtracs_ds[f'usa_{tc_var}'], np.nan) all_vals = ibtracs_ds[ag_vars].to_array(dim='agency') # argmax returns the first True (i.e. valid) along the 'agency' dimension preferred_idx = all_vals.notnull().any(dim="date_time").argmax(dim='agency') ibtracs_ds[tc_var] = all_vals.isel(agency=preferred_idx) selected_ags = np.array([v[:-len(f'_{tc_var}')].encode() for v in ag_vars]) ibtracs_ds[f'{tc_var}_agency'] = ('storm', selected_ags[preferred_idx.values]) if tc_var == 'lon': # Most IBTrACS longitudes are either normalized to [-180, 180] or to [0, 360], but # some aren't normalized at all, so we have to make sure that the values are okay: lons = ibtracs_ds[tc_var].values.copy() lon_valid_mask = np.isfinite(lons) lons[lon_valid_mask] = u_coord.lon_normalize(lons[lon_valid_mask], center=0.0) ibtracs_ds[tc_var].values[:] = lons # Make sure that the longitude is always chosen positive if a track crosses the # antimeridian: crossing_mask = ((ibtracs_ds[tc_var] > 170).any(dim="date_time") & (ibtracs_ds[tc_var] < -170).any(dim="date_time") & (ibtracs_ds[tc_var] < 0)).values ibtracs_ds[tc_var].values[crossing_mask] += 360 if interpolate_missing: with warnings.catch_warnings(): # Upstream issue, see https://github.com/pydata/xarray/issues/4167 warnings.simplefilter(action="ignore", category=FutureWarning) # don't interpolate if there is only a single record for this variable nonsingular_mask = ( ibtracs_ds[tc_var].notnull().sum(dim="date_time") > 1).values if nonsingular_mask.sum() > 0: ibtracs_ds[tc_var].values[nonsingular_mask] = ( ibtracs_ds[tc_var].sel(storm=nonsingular_mask).interpolate_na( dim="date_time", method="linear")) ibtracs_ds = ibtracs_ds[['sid', 'name', 'basin', 'time', 'valid_t'] + additional_variables + phys_vars + [f'{v}_agency' for v in phys_vars]] if estimate_missing: ibtracs_ds['pres'][:] = _estimate_pressure( ibtracs_ds.pres, ibtracs_ds.lat, ibtracs_ds.lon, ibtracs_ds.wind) ibtracs_ds['wind'][:] = _estimate_vmax( ibtracs_ds.wind, ibtracs_ds.lat, ibtracs_ds.lon, ibtracs_ds.pres) ibtracs_ds['valid_t'] &= (ibtracs_ds.lat.notnull() & ibtracs_ds.lon.notnull() & ibtracs_ds.wind.notnull() & ibtracs_ds.pres.notnull()) valid_storms_mask = ibtracs_ds.valid_t.any(dim="date_time") invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] if invalid_storms_idx.size > 0: invalid_sids = list(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) LOGGER.warning('%d storm events are discarded because no valid wind/pressure values ' 'have been found: %s%s', len(invalid_sids), ", ".join(invalid_sids[:5]), ", ..." if len(invalid_sids) > 5 else ".") ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) if discard_single_points: valid_storms_mask = ibtracs_ds.valid_t.sum(dim="date_time") > 1 invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] if invalid_storms_idx.size > 0: invalid_sids = list(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) LOGGER.warning('%d storm events are discarded because only one valid timestep ' 'has been found: %s%s', len(invalid_sids), ", ".join(invalid_sids[:5]), ", ..." if len(invalid_sids) > 5 else ".") ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) if ibtracs_ds.dims['storm'] == 0: LOGGER.info('After discarding IBTrACS events without valid values by the selected ' 'reporting agencies, there are no tracks left that match the specified ' 'requirements.') return cls() max_wind = ibtracs_ds.wind.max(dim="date_time").data.ravel() category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) category = np.argmax(category_test, axis=1) - 1 basin_map = {b.encode("utf-8"): v for b, v in BASIN_ENV_PRESSURE.items()} basin_fun = lambda b: basin_map[b] ibtracs_ds['id_no'] = (ibtracs_ds.sid.str.replace(b'N', b'0') .str.replace(b'S', b'1') .astype(float)) last_perc = 0 all_tracks = [] for i_track, t_msk in enumerate(ibtracs_ds.valid_t.data): perc = 100 * len(all_tracks) / ibtracs_ds.sid.size if perc - last_perc >= 10: LOGGER.info("Progress: %d%%", perc) last_perc = perc track_ds = ibtracs_ds.sel(storm=i_track, date_time=t_msk) tr_basin_penv = xr.apply_ufunc(basin_fun, track_ds.basin, vectorize=True) tr_genesis_basin = track_ds.basin.values[0].astype(str).item() # Now that the valid time steps have been selected, we discard this track if it # doesn't fit the specified basin definitions: if genesis_basin is not None and tr_genesis_basin != genesis_basin: continue if basin is not None and basin.encode() not in track_ds.basin.values: continue # A track that crosses the antimeridian in IBTrACS might be truncated by `t_msk` in # such a way that the remaining part is not crossing the antimeridian: if (track_ds.lon.values > 180).all(): track_ds['lon'] -= 360 # set time_step in hours track_ds['time_step'] = xr.ones_like(track_ds.time, dtype=float) if track_ds.time.size > 1: track_ds.time_step.values[1:] = (track_ds.time.diff(dim="date_time") / np.timedelta64(1, 'h')) track_ds.time_step.values[0] = track_ds.time_step[1] with warnings.catch_warnings(): # See https://github.com/pydata/xarray/issues/4167 warnings.simplefilter(action="ignore", category=FutureWarning) track_ds['rmw'] = track_ds.rmw \ .ffill(dim='date_time', limit=1) \ .bfill(dim='date_time', limit=1) \ .fillna(0) track_ds['roci'] = track_ds.roci \ .ffill(dim='date_time', limit=1) \ .bfill(dim='date_time', limit=1) \ .fillna(0) track_ds['poci'] = track_ds.poci \ .ffill(dim='date_time', limit=4) \ .bfill(dim='date_time', limit=4) # this is the most time consuming line in the processing: track_ds['poci'] = track_ds.poci.fillna(tr_basin_penv) if estimate_missing: track_ds['rmw'][:] = estimate_rmw(track_ds.rmw.values, track_ds.pres.values) track_ds['roci'][:] = estimate_roci(track_ds.roci.values, track_ds.pres.values) track_ds['roci'][:] = np.fmax(track_ds.rmw.values, track_ds.roci.values) # ensure environmental pressure >= central pressure # this is the second most time consuming line in the processing: track_ds['poci'][:] = np.fmax(track_ds.poci, track_ds.pres) provider_str = f"ibtracs_{provider[0]}" if len(provider) > 1: provider_str = "ibtracs_mixed:" + ",".join( "{}({})".format(v, track_ds[f'{v}_agency'].astype(str).item()) for v in phys_vars) data_vars = { 'radius_max_wind': ('time', track_ds.rmw.data), 'radius_oci': ('time', track_ds.roci.data), 'max_sustained_wind': ('time', track_ds.wind.data), 'central_pressure': ('time', track_ds.pres.data), 'environmental_pressure': ('time', track_ds.poci.data), } coords = { 'time': ('time', track_ds.time.dt.round('s').data), 'lat': ('time', track_ds.lat.data), 'lon': ('time', track_ds.lon.data), } attrs = { 'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'orig_event_flag': True, 'data_provider': provider_str, 'category': category[i_track], } # automatically assign the remaining variables as attributes or data variables for varname in ["time_step", "basin", "name", "sid", "id_no"] + additional_variables: values = track_ds[varname].data if track_ds[varname].dtype.kind == "S": # This converts the `bytes` (dtype "|S*") in IBTrACS to the more common `str` # objects (dtype "<U*") that we use in CLIMADA. values = values.astype(str) if values.ndim == 0: attrs[varname] = values.item() else: data_vars[varname] = ('time', values) all_tracks.append(xr.Dataset(data_vars, coords=coords, attrs=attrs)) if last_perc != 100: LOGGER.info("Progress: 100%") if len(all_tracks) == 0: # If all tracks have been discarded in the loop due to the basin filters: LOGGER.info('There were no tracks left in the specified basin ' 'after discarding invalid track positions.') return cls(all_tracks)
[docs] def read_processed_ibtracs_csv(self, *args, **kwargs): """This function is deprecated, use TCTracks.from_processed_ibtracs_csv instead.""" LOGGER.warning("The use of TCTracks.read_processed_ibtracs_csv is deprecated. " "Use TCTracks.from_processed_ibtracs_csv instead.") self.__dict__ = TCTracks.from_processed_ibtracs_csv(*args, **kwargs).__dict__
[docs] @classmethod def from_processed_ibtracs_csv(cls, file_names): """Create TCTracks object from processed ibtracs CSV file(s). Parameters ---------- file_names : str or list of str Absolute file name(s) or folder name containing the files to read. Returns ------- tracks : TCTracks TCTracks with data from the processed ibtracs CSV file. """ return cls([_read_ibtracs_csv_single(f) for f in get_file_names(file_names)])
[docs] def read_simulations_emanuel(self, *args, **kwargs): """This function is deprecated, use TCTracks.from_simulations_emanuel instead.""" LOGGER.warning("The use of TCTracks.read_simulations_emanuel is deprecated. " "Use TCTracks.from_simulations_emanuel instead.") self.__dict__ = TCTracks.from_simulations_emanuel(*args, **kwargs).__dict__
[docs] @classmethod def from_simulations_emanuel(cls, file_names, hemisphere=None, subset=None): """Create new TCTracks object from Kerry Emanuel's tracks. Parameters ---------- file_names : str or list of str Absolute file name(s) or folder name containing the files to read. hemisphere : str or None, optional For global data sets, restrict to northern ('N') or southern ('S') hemisphere. Default: None (no restriction) subset : list of int, optional If given, only include the tracks with the given indices. Since the simulation files can be huge, this feature is useful for running tests on smaller subsets or on random subsamples. Default: None Returns ------- tracks : TCTracks TCTracks with data from Kerry Emanuel's simulations. """ data = [] for path in get_file_names(file_names): data.extend(_read_file_emanuel( path, hemisphere=hemisphere, subset=subset, rmw_corr=Path(path).name in EMANUEL_RMW_CORR_FILES)) return cls(data)
[docs] def read_one_gettelman(self, nc_data, i_track): """This function is deprecated, use TCTracks.from_gettelman instead.""" LOGGER.warning("The use of TCTracks.read_one_gettelman is deprecated. " "Use TCTracks.from_gettelman instead.") self.data.append(_read_one_gettelman(nc_data, i_track))
[docs] @classmethod def from_gettelman(cls, path): """Create new TCTracks object from Andrew Gettelman's tracks. Parameters ---------- path : str or Path Path to one of Andrew Gettelman's NetCDF files. Returns ------- tracks : TCTracks TCTracks with data from Andrew Gettelman's simulations. """ nc_data = nc.Dataset(path) nstorms = nc_data.dimensions['storm'].size return cls([_read_one_gettelman(nc_data, i) for i in range(nstorms)])
[docs] def read_simulations_chaz(self, *args, **kwargs): """This function is deprecated, use TCTracks.from_simulations_chaz instead.""" LOGGER.warning("The use of TCTracks.read_simulations_chaz is deprecated. " "Use TCTracks.from_simulations_chaz instead.") self.__dict__ = TCTracks.from_simulations_chaz(*args, **kwargs).__dict__
[docs] @classmethod def from_simulations_chaz(cls, file_names, year_range=None, ensemble_nums=None): """Create new TCTracks object from CHAZ simulations Lee, C.-Y., Tippett, M.K., Sobel, A.H., Camargo, S.J. (2018): An Environmentally Forced Tropical Cyclone Hazard Model. J Adv Model Earth Sy 10(1): 223–241. Parameters ---------- file_names : str or list of str Absolute file name(s) or folder name containing the files to read. year_range : tuple (min_year, max_year), optional Filter by year, if given. ensemble_nums : list, optional Filter by ensembleNum, if given. Returns ------- tracks : TCTracks TCTracks with data from the CHAZ simulations. """ data = [] for path in get_file_names(file_names): LOGGER.info('Reading %s.', path) chaz_ds = xr.open_dataset(path) chaz_ds.time.attrs["units"] = "days since 1950-1-1" chaz_ds.time.attrs["missing_value"] = -54786.0 chaz_ds = xr.decode_cf(chaz_ds) chaz_ds['id_no'] = chaz_ds.stormID * 1000 + chaz_ds.ensembleNum for var in ['time', 'longitude', 'latitude']: chaz_ds[var] = chaz_ds[var].expand_dims(ensembleNum=chaz_ds.ensembleNum) chaz_ds = chaz_ds.stack(id=("ensembleNum", "stormID")) years_uniq = chaz_ds.time.dt.year.data years_uniq = np.unique(years_uniq[~np.isnan(years_uniq)]) LOGGER.info("File contains %s tracks (at most %s nodes each), " "representing %s years (%d-%d).", chaz_ds.id_no.size, chaz_ds.lifelength.size, years_uniq.size, years_uniq[0], years_uniq[-1]) # filter by year range if given if year_range: match = ((chaz_ds.time.dt.year >= year_range[0]) & (chaz_ds.time.dt.year <= year_range[1])).sel(lifelength=0) if np.count_nonzero(match) == 0: LOGGER.info('No tracks in time range (%s, %s).', *year_range) continue chaz_ds = chaz_ds.sel(id=match) # filter by ensembleNum if given if ensemble_nums is not None: match = np.isin(chaz_ds.ensembleNum.values, ensemble_nums) if np.count_nonzero(match) == 0: LOGGER.info('No tracks with specified ensemble numbers.') continue chaz_ds = chaz_ds.sel(id=match) # remove invalid tracks from selection chaz_ds['valid_t'] = chaz_ds.time.notnull() & chaz_ds.Mwspd.notnull() valid_st = chaz_ds.valid_t.any(dim="lifelength") invalid_st = np.nonzero(~valid_st.data)[0] if invalid_st.size > 0: LOGGER.info('No valid Mwspd values found for %d out of %d storm tracks.', invalid_st.size, valid_st.size) chaz_ds = chaz_ds.sel(id=valid_st) # estimate central pressure from location and max wind chaz_ds['pres'] = xr.full_like(chaz_ds.Mwspd, -1, dtype=float) chaz_ds['pres'][:] = _estimate_pressure( chaz_ds.pres, chaz_ds.latitude, chaz_ds.longitude, chaz_ds.Mwspd) # compute time stepsizes chaz_ds['time_step'] = xr.zeros_like(chaz_ds.time, dtype=float) chaz_ds['time_step'][1:, :] = (chaz_ds.time.diff(dim="lifelength") / np.timedelta64(1, 'h')) chaz_ds['time_step'][0, :] = chaz_ds.time_step[1, :] # determine Saffir-Simpson category max_wind = chaz_ds.Mwspd.max(dim="lifelength").data.ravel() category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) chaz_ds['category'] = ("id", np.argmax(category_test, axis=1) - 1) fname = Path(path).name chaz_ds.time[:] = chaz_ds.time.dt.round('s').data chaz_ds['radius_max_wind'] = xr.full_like(chaz_ds.pres, np.nan) chaz_ds['environmental_pressure'] = xr.full_like(chaz_ds.pres, DEF_ENV_PRESSURE) chaz_ds["track_name"] = ("id", [f"{fname}-{track_id.item()[1]}-{track_id.item()[0]}" for track_id in chaz_ds.id]) # add tracks one by one last_perc = 0 for cnt, i_track in enumerate(chaz_ds.id_no): perc = 100 * cnt / chaz_ds.id_no.size if perc - last_perc >= 10: LOGGER.info("Progress: %d%%", perc) last_perc = perc track_ds = chaz_ds.sel(id=i_track.id.item()) track_ds = track_ds.sel(lifelength=track_ds.valid_t.data) data.append(xr.Dataset({ 'time_step': ('time', track_ds.time_step.values), 'max_sustained_wind': ('time', track_ds.Mwspd.values), 'central_pressure': ('time', track_ds.pres.values), 'radius_max_wind': ('time', track_ds.radius_max_wind.values), 'environmental_pressure': ('time', track_ds.environmental_pressure.values), 'basin': ('time', np.full(track_ds.time.size, "GB", dtype="<U2")), }, coords={ 'time': track_ds.time.values, 'lat': ('time', track_ds.latitude.values), 'lon': ('time', track_ds.longitude.values), }, attrs={ 'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'name': track_ds.track_name.item(), 'sid': track_ds.track_name.item(), 'orig_event_flag': True, 'data_provider': "CHAZ", 'id_no': track_ds.id_no.item(), 'category': track_ds.category.item(), })) if last_perc != 100: LOGGER.info("Progress: 100%") return cls(data)
[docs] def read_simulations_storm(self, *args, **kwargs): """This function is deprecated, use TCTracks.from_simulations_storm instead.""" LOGGER.warning("The use of TCTracks.read_simulations_storm is deprecated. " "Use TCTracks.from_simulations_storm instead.") self.__dict__ = TCTracks.from_simulations_storm(*args, **kwargs).__dict__
[docs] @classmethod def from_simulations_storm(cls, path, years=None): """Create new TCTracks object from STORM simulations Bloemendaal et al. (2020): Generation of a global synthetic tropical cyclone hazard dataset using STORM. Scientific Data 7(1): 40. Track data available for download from https://doi.org/10.4121/uuid:82c1dc0d-5485-43d8-901a-ce7f26cda35d Wind speeds are converted to 1-minute sustained winds through division by 0.88 (this value is taken from Bloemendaal et al. (2020), cited above). Parameters ---------- path : str Full path to a txt-file as contained in the `data.zip` archive from the official source linked above. years : list of int, optional If given, only read the specified "years" from the txt-File. Note that a "year" refers to one ensemble of tracks in the data set that represents one sample year. Returns ------- tracks : TCTracks TCTracks with data from the STORM simulations. Notes ----- All tracks are set in the year 1980. The id of the year (starting from 0) is saved in the attribute 'id_no'. To obtain the year of each track use >>> years = [int(tr.attrs['id_no'] / 1000) for tr in tc_tracks.data] >>> # or, alternatively, >>> years = [int(tr.attrs['sid'].split("-")[-2]) for tr in tc_tracks.data] If a windfield is generated from these tracks using the method ``TropCylcone.from_tracks()``, the following should be considered: 1. The frequencies will be set to ``1`` for each storm. Thus, in order to compute annual values, the frequencies of the TropCylone should be changed to ``1/number of years``. 2. The storm year and the storm id are stored in the ``TropCyclone.event_name`` attribute. """ basins = ["EP", "NA", "NI", "SI", "SP", "WP"] tracks_df = pd.read_csv(path, names=['year', 'time_start', 'tc_num', 'time_delta', 'basin', 'lat', 'lon', 'pres', 'wind', 'rmw', 'category', 'landfall', 'dist_to_land'], converters={ "time_start": lambda d: dt.datetime(1980, int(float(d)), 1, 0), "time_delta": lambda d: dt.timedelta(hours=3 * float(d)), "basin": lambda d: basins[int(float(d))], }, dtype={ "year": int, "tc_num": int, "category": int, }) # filter specified years if years is not None: tracks_df = tracks_df[np.isin(tracks_df['year'], years)] # a bug in the data causes some storm tracks to be double-listed: tracks_df = tracks_df.drop_duplicates(subset=["year", "tc_num", "time_delta"]) # conversion of units tracks_df['rmw'] *= (1 * ureg.kilometer).to(ureg.nautical_mile).magnitude tracks_df['wind'] *= (1 * ureg.meter / ureg.second).to(ureg.knot).magnitude # convert from 10-minute to 1-minute sustained winds, see Bloemendaal et al. (2020) tracks_df['wind'] /= STORM_1MIN_WIND_FACTOR # conversion to absolute times tracks_df['time'] = tracks_df['time_start'] + tracks_df['time_delta'] tracks_df = tracks_df.drop( labels=['time_start', 'time_delta', 'landfall', 'dist_to_land'], axis=1) # add tracks one by one last_perc = 0 fname = Path(path).name groups = tracks_df.groupby(by=["year", "tc_num"]) data = [] for idx, group in groups: perc = 100 * len(data) / len(groups) if perc - last_perc >= 10: LOGGER.info("Progress: %d%%", perc) last_perc = perc track_name = f"{fname}-{idx[0]}-{idx[1]}" env_pressure = np.array([ BASIN_ENV_PRESSURE[basin] if basin in BASIN_ENV_PRESSURE else DEF_ENV_PRESSURE for basin in group['basin'].values]) data.append(xr.Dataset({ 'time_step': ('time', np.full(group['time'].shape, 3)), 'max_sustained_wind': ('time', group['wind'].values), 'central_pressure': ('time', group['pres'].values), 'radius_max_wind': ('time', group['rmw'].values), 'environmental_pressure': ('time', env_pressure), 'basin': ("time", group['basin'].values.astype("<U2")), }, coords={ 'time': ('time', group['time'].values), 'lat': ('time', group['lat'].values), 'lon': ('time', group['lon'].values), }, attrs={ 'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'name': track_name, 'sid': track_name, 'orig_event_flag': True, 'data_provider': "STORM", 'id_no': idx[0] * 1000 + idx[1], 'category': group['category'].max(), })) if last_perc != 100: LOGGER.info("Progress: 100%") return cls(data)
[docs] def equal_timestep(self, time_step_h=1, land_params=False, pool=None): """Resample all tracks at the specified temporal resolution The resulting track data will be given at evenly distributed time steps, relative to midnight (00:00). For example, if `time_step_h` is 1 and the original track data starts at 06:30, the interpolated track will not have a time step at 06:30 because only multiples of 01:00 (relative to midnight) are included. In this case, the interpolated track will start at 07:00. Depending on the original resolution of the track data, this method may up- or downsample track time steps. Note that tracks that already have the specified resolution remain unchanged. Parameters ---------- time_step_h : float or int, optional Temporal resolution in hours (positive, may be non-integer-valued). Default: 1. land_params : bool, optional If True, recompute `on_land` and `dist_since_lf` at each node. Default: False. pool : pathos.pool, optional Pool that will be used for parallel computation when applicable. If not given, the pool attribute of `self` will be used. Default: None """ pool = self.pool if pool is None else pool if time_step_h <= 0: raise ValueError(f"time_step_h is not a positive number: {time_step_h}") # set step size to None for tracks that already have the specified resolution l_time_step_h = [ None if np.allclose(np.unique(tr['time_step'].values), time_step_h) else time_step_h for tr in self.data ] n_skip = np.sum([ts is None for ts in l_time_step_h]) if n_skip == self.size: LOGGER.info('All tracks are already at the requested temporal resolution.') return if n_skip > 0: LOGGER.info('%d track%s already at the requested temporal resolution.', n_skip, "s are" if n_skip > 1 else " is") LOGGER.info('Interpolating %d tracks to %sh time steps.', self.size - n_skip, time_step_h) if land_params: extent = self.get_extent() land_geom = u_coord.get_land_geometry(extent=extent, resolution=10) else: land_geom = None if pool: chunksize = max(min(self.size // pool.ncpus, 1000), 1) self.data = pool.map( self._one_interp_data, self.data, l_time_step_h, itertools.repeat(land_geom, self.size), chunksize=chunksize ) else: last_perc = 0 new_data = [] for track, ts_h in zip(self.data, l_time_step_h): # progress indicator perc = 100 * len(new_data) / len(self.data) if perc - last_perc >= 10: LOGGER.debug("Progress: %d%%", perc) last_perc = perc track_int = self._one_interp_data(track, ts_h, land_geom) new_data.append(track_int) self.data = new_data
[docs] def calc_random_walk(self, **kwargs): """Deprecated. Use `TCTracks.calc_perturbed_trajectories` instead.""" LOGGER.warning("The use of TCTracks.calc_random_walk is deprecated." "Use TCTracks.calc_perturbed_trajectories instead.") if kwargs.get('ens_size'): kwargs['nb_synth_tracks'] = kwargs.pop('ens_size') return self.calc_perturbed_trajectories(**kwargs)
[docs] def calc_perturbed_trajectories(self, **kwargs): """See function in `climada.hazard.tc_tracks_synth`.""" climada.hazard.tc_tracks_synth.calc_perturbed_trajectories(self, **kwargs)
@property def size(self): """Get longitude from coord array.""" return len(self.data)
[docs] def get_bounds(self, deg_buffer=0.1): """Get bounds as (lon_min, lat_min, lon_max, lat_max) tuple. Parameters ---------- deg_buffer : float A buffer to add around the bounding box Returns ------- bounds : tuple (lon_min, lat_min, lon_max, lat_max) """ bounds = u_coord.latlon_bounds( np.concatenate([t.lat.values for t in self.data]), np.concatenate([t.lon.values for t in self.data]), buffer=deg_buffer) return bounds
@property def bounds(self): """Exact bounds of trackset as tuple, no buffer.""" return self.get_bounds(deg_buffer=0.0)
[docs] def get_extent(self, deg_buffer=0.1): """Get extent as (lon_min, lon_max, lat_min, lat_max) tuple. Parameters ---------- deg_buffer : float A buffer to add around the bounding box Returns ------- extent : tuple (lon_min, lon_max, lat_min, lat_max) """ return u_coord.toggle_extent_bounds(self.get_bounds(deg_buffer=deg_buffer))
@property def extent(self): """Exact extent of trackset as tuple, no buffer.""" return self.get_extent(deg_buffer=0.0)
[docs] def generate_centroids(self, res_deg, buffer_deg): """Generate gridded centroids within padded bounds of tracks Parameters ---------- res_deg : float Resolution in degrees. buffer_deg : float Buffer around tracks in degrees. Returns ------- centroids : Centroids Centroids instance. """ bounds = self.get_bounds(deg_buffer=buffer_deg) lat = np.arange(bounds[1] + 0.5 * res_deg, bounds[3], res_deg) lon = np.arange(bounds[0] + 0.5 * res_deg, bounds[2], res_deg) lon, lat = [ar.ravel() for ar in np.meshgrid(lon, lat)] return Centroids(lat=lat, lon=lon)
[docs] def plot(self, axis=None, figsize=(9, 13), legend=True, adapt_fontsize=True, **kwargs): """Track over earth. Historical events are blue, probabilistic black. Parameters ---------- axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize: (float, float), optional figure size for plt.subplots The default is (9, 13) legend : bool, optional whether to display a legend of Tropical Cyclone categories. Default: True. kwargs : optional arguments for LineCollection matplotlib, e.g. alpha=0.5 adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise the default matplotlib font size is used. Default is True. Returns ------- axis : matplotlib.axes._subplots.AxesSubplot """ if 'lw' not in kwargs: kwargs['lw'] = 2 if 'transform' not in kwargs: kwargs['transform'] = ccrs.PlateCarree() if not self.size: LOGGER.info('No tracks to plot') return None extent = self.get_extent(deg_buffer=1) mid_lon = 0.5 * (extent[1] + extent[0]) if not axis: proj = ccrs.PlateCarree(central_longitude=mid_lon) _, axis, _ = u_plot.make_map(proj=proj, figsize=figsize, adapt_fontsize=adapt_fontsize) else: proj = axis.projection axis.set_extent(extent, crs=kwargs['transform']) u_plot.add_shapes(axis) cmap = ListedColormap(colors=CAT_COLORS) norm = BoundaryNorm([0] + SAFFIR_SIM_CAT, len(SAFFIR_SIM_CAT)) for track in self.data: lonlat = np.stack([track.lon.values, track.lat.values], axis=-1) lonlat[:, 0] = u_coord.lon_normalize(lonlat[:, 0], center=mid_lon) segments = np.stack([lonlat[:-1], lonlat[1:]], axis=1) # Truncate segments which cross the antimeridian. # Note: Since we apply `lon_normalize` above and shift the central longitude of the # plot to `mid_lon`, this is not necessary (and will do nothing) in cases where all # tracks are located in a region around the antimeridian, like the Pacific ocean. # The only case where this is relevant: Crowded global data sets where `mid_lon` # falls back to 0, i.e. using the [-180, 180] range. mask = (segments[:, 0, 0] > 100) & (segments[:, 1, 0] < -100) segments[mask, 1, 0] = 180 mask = (segments[:, 0, 0] < -100) & (segments[:, 1, 0] > 100) segments[mask, 1, 0] = -180 track_lc = LineCollection( segments, linestyle='solid' if track.orig_event_flag else ':', cmap=cmap, norm=norm, **kwargs) track_lc.set_array(track.max_sustained_wind.values) axis.add_collection(track_lc) if legend: leg_lines = [Line2D([0], [0], color=CAT_COLORS[i_col], lw=2) for i_col in range(len(SAFFIR_SIM_CAT))] leg_names = [CAT_NAMES[i_col] for i_col in sorted(CAT_NAMES.keys())] if any(not tr.orig_event_flag for tr in self.data): leg_lines.append(Line2D([0], [0], color='grey', lw=2, ls='solid')) leg_lines.append(Line2D([0], [0], color='grey', lw=2, ls=':')) leg_names.append('Historical') leg_names.append('Synthetic') axis.legend(leg_lines, leg_names, loc=0) plt.tight_layout() return axis
[docs] def write_netcdf(self, folder_name): """Write a netcdf file per track with track.sid name in given folder. Parameters ---------- folder_name : str Folder name where to write files. """ list_path = [Path(folder_name, track.sid + '.nc') for track in self.data] LOGGER.info('Writting %s files.', self.size) for track in self.data: track.attrs['orig_event_flag'] = int(track.orig_event_flag) xr.save_mfdataset(self.data, list_path)
[docs] def read_netcdf(self, *args, **kwargs): """This function is deprecated, use TCTracks.from_netcdf instead.""" LOGGER.warning("The use of TCTracks.read_netcdf is deprecated. " "Use TCTracks.from_netcdf instead.") self.__dict__ = TCTracks.from_netcdf(*args, **kwargs).__dict__
[docs] @classmethod def from_netcdf(cls, folder_name): """Create new TCTracks object from NetCDF files contained in a given folder Warning ------- Do not use this classmethod for reading IBTrACS NetCDF files! If you need to manually download IBTrACS NetCDF files, place them in the ``~/climada/data/system`` folder and use the ``TCTracks.from_ibtracks_netcdf`` classmethod. Parameters ---------- folder_name : str Folder name from where to read files. Returns ------- tracks : TCTracks TCTracks with data from the given directory of NetCDF files. """ file_tr = get_file_names(folder_name) LOGGER.info('Reading %s files.', len(file_tr)) data = [] for file in file_tr: if Path(file).suffix != '.nc': continue track = xr.open_dataset(file) track.attrs['orig_event_flag'] = bool(track.orig_event_flag) if "basin" in track.attrs: LOGGER.warning("Track data comes with legacy basin attribute. " "We assume that the track remains in that basin during its " "whole life time.") basin = track.basin del track.attrs['basin'] track['basin'] = ("time", np.full(track.time.size, basin, dtype="<U2")) data.append(track) return cls(data)
[docs] def write_hdf5(self, file_name, complevel=5): """Write TC tracks in NetCDF4-compliant HDF5 format. Parameters ---------- file_name: str or Path Path to a new HDF5 file. If it exists already, the file is overwritten. complevel : int Specifies a compression level (0-9) for the zlib compression of the data. A value of 0 or None disables compression. Default: 5 """ data = [] for track in self.data: # convert "time" into a data variable and use a neutral name for the steps track = track.rename(time="step") track["time"] = ("step", track["step"].values) track["step"] = np.arange(track.sizes["step"]) # change dtype from bool to int to be NetCDF4-compliant track.attrs['orig_event_flag'] = int(track.attrs['orig_event_flag']) data.append(track) # concatenate all data sets along new dimension "storm" ds_combined = xr.combine_nested(data, concat_dim=["storm"]) ds_combined["storm"] = np.arange(ds_combined.sizes["storm"]) # convert attributes to data variables of combined dataset df_attrs = pd.DataFrame([t.attrs for t in data], index=ds_combined["storm"].to_series()) ds_combined = xr.merge([ds_combined, df_attrs.to_xarray()]) encoding = {v: dict(zlib=True, complevel=complevel) for v in ds_combined.data_vars} LOGGER.info('Writing %d tracks to %s', self.size, file_name) ds_combined.to_netcdf(file_name, encoding=encoding)
[docs] @classmethod def from_hdf5(cls, file_name): """Create new TCTracks object from a NetCDF4-compliant HDF5 file Parameters ---------- file_name : str or Path Path to a file that has been generated with `TCTracks.write_hdf`. Returns ------- tracks : TCTracks TCTracks with data from the given HDF5 file. """ _raise_if_legacy_or_unknown_hdf5_format(file_name) ds_combined = xr.open_dataset(file_name) # when writing '<U*' and reading in again, xarray reads as dtype 'object'. undo this: for varname in ds_combined.data_vars: if ds_combined[varname].dtype == "object": ds_combined[varname] = ds_combined[varname].astype(str) data = [] for i in range(ds_combined.sizes["storm"]): # extract a single storm and restrict to valid time steps track = ( ds_combined .isel(storm=i) .dropna(dim="step", how="any", subset=["time", "lat", "lon"]) ) # convert the "time" variable to a coordinate track = track.drop_vars(["storm", "step"]).rename(step="time") track = track.assign_coords(time=track["time"]).compute() # convert 0-dimensional variables to attributes: attr_vars = [v for v in track.data_vars if track[v].ndim == 0] track = ( track .assign_attrs({v: track[v].item() for v in attr_vars}) .drop_vars(attr_vars) ) track.attrs['orig_event_flag'] = bool(track.attrs['orig_event_flag']) data.append(track) return cls(data)
[docs] def to_geodataframe(self, as_points=False, split_lines_antimeridian=True): """Transform this TCTracks instance into a GeoDataFrame. Parameters ---------- as_points : bool, optional If False (default), one feature (row) per track with a LineString or MultiLineString as geometry (or Point geometry for tracks of length one) and all track attributes (sid, name, orig_event_flag, etc) as dataframe columns. If True, one feature (row) per track time step, with variable values per time step (radius_max_wind, max_sustained_wind, etc) as columns in addition to attributes. split_lines_antimeridian : bool, optional If True, tracks that cross the antimeridian are split into multiple Lines as a MultiLineString, with each Line on either side of the meridian. This ensures all Lines are within (-180, +180) degrees longitude. Note that lines might be split at more locations than strictly necessary, due to the underlying splitting algorithm (https://github.com/Toblerity/Shapely/issues/572). Returns ------- gdf : GeoDataFrame """ gdf = gpd.GeoDataFrame( [dict(track.attrs) for track in self.data] ) if as_points: gdf_long = pd.concat([track.to_dataframe().assign(idx=i) for i, track in enumerate(self.data)]) gdf_long['lon'] = u_coord.lon_normalize(gdf_long['lon'].values.copy()) gdf_long['geometry'] = gdf_long.apply(lambda x: Point(x['lon'], x['lat']), axis=1) gdf_long = gdf_long.drop(columns=['lon', 'lat']) gdf_long = gpd.GeoDataFrame(gdf_long.reset_index().set_index('idx'), geometry='geometry', crs=DEF_CRS) gdf = gdf_long.join(gdf) elif split_lines_antimeridian: # enforce longitudes to be within [-180, 180] range t_lons = [u_coord.lon_normalize(t.lon.values.copy()) for t in self.data] t_lats = [t.lat.values for t in self.data] # LineString only works with more than one lat/lon pair gdf.geometry = gpd.GeoSeries([ LineString(np.c_[lons, lats]) if lons.size > 1 else Point(lons, lats) for lons, lats in zip(t_lons, t_lats) ]) gdf.crs = DEF_CRS # for splitting, restrict to tracks that come close to the antimeridian t_split_mask = np.asarray([ (lon > 170).any() and (lon < -170).any() and lon.size > 1 for lon in t_lons]) # note that tracks might be splitted at self-intersections as well: # https://github.com/Toblerity/Shapely/issues/572 antimeridian = LineString([(180, -90), (180, 90)]) gdf.loc[t_split_mask, "geometry"] = gdf.geometry[t_split_mask] \ .to_crs({"proj": "longlat", "lon_wrap": 180}) \ .apply(lambda line: MultiLineString([ LineString([(x - 360, y) for x, y in segment.coords]) if any(x > 180 for x, y in segment.coords) else segment for segment in shapely.ops.split(line, antimeridian).geoms ])) else: # LineString only works with more than one lat/lon pair gdf.geometry = gpd.GeoSeries([ LineString(np.c_[track.lon, track.lat]) if track.lon.size > 1 else Point(track.lon.data, track.lat.data) for track in self.data ]) gdf.crs = DEF_CRS return gdf
@staticmethod @numba.jit(forceobj=True) def _one_interp_data(track, time_step_h, land_geom=None): """Interpolate values of one track. Parameters ---------- track : xr.Dataset Track data. time_step_h : int, float or None Desired temporal resolution in hours (may be non-integer-valued). If None, no interpolation is done and the input track dataset is returned unchanged. land_geom : shapely.geometry.multipolygon.MultiPolygon, optional Land geometry. If given, recompute `dist_since_lf` and `on_land` property. Returns ------- track_int : xr.Dataset """ if time_step_h is None: return track if track.time.size < 2: LOGGER.warning('Track interpolation not done. ' 'Not enough elements for %s', track.name) track_int = track else: method = ['linear', 'quadratic', 'cubic'][min(2, track.time.size - 2)] # handle change of sign in longitude lon = u_coord.lon_normalize(track.lon.copy(), center=0) if (lon < -170).any() and (lon > 170).any(): # crosses 180 degrees east/west -> use positive degrees east lon[lon < 0] += 360 time_step = pd.tseries.frequencies.to_offset(pd.Timedelta(hours=time_step_h)).freqstr track_int = track.resample(time=time_step, skipna=True)\ .interpolate('linear') for var in track.data_vars: if "time" in track[var].dims and track[var].dtype.kind != "f": track_int[var] = track[var].resample(time=time_step).nearest() track_int['time_step'][:] = time_step_h lon_int = lon.resample(time=time_step).interpolate(method) lon_int[lon_int > 180] -= 360 track_int.coords['lon'] = lon_int track_int.coords['lat'] = track.lat.resample(time=time_step)\ .interpolate(method) track_int.attrs['category'] = set_category( track_int.max_sustained_wind.values, track_int.max_sustained_wind_unit) # restrict to time steps within original bounds track_int = track_int.sel( time=(track.time[0] <= track_int.time) & (track_int.time <= track.time[-1])) if land_geom: track_land_params(track_int, land_geom) return track_int
def _raise_if_legacy_or_unknown_hdf5_format(file_name): """Raise an exception if the HDF5 format of the file is not supported Since the HDF5 format changed without preserving backwards compatibility, a verbose error message is produced if users attempt to open a file in the legacy format. This function also raises an error if the file is not supported for other (unknown) reasons. Parameters ---------- file_name : str or Path Path to a NetCDF-compliant HDF5 file. Raises ------ ValueError """ test_ds = xr.open_dataset(file_name) if len(test_ds.dims) > 0: # The legacy format only has data in the subgroups, not in the root. # => This cannot be the legacy file format! return # After this line, it is sure that the format is not supported (since there is no data in the # root group). Before raising an exception, we double-check if it is the legacy format. try: # Check if the file has groups 'track{i}' by trying to access the first group. with xr.open_dataset(file_name, group="track0") as ds_track: # Check if the group actually contains track data: is_legacy = ( "time" in ds_track.dims and "max_sustained_wind" in ds_track.variables ) except OSError as err: if "group not found" in str(err): # No 'track0' group. => This cannot be the legacy file format! is_legacy = False else: # A different (unexpected) error occurred. => Re-raise the exception. raise raise ValueError( ( f"The file you try to read ({file_name}) is in a format that is no longer" " supported by CLIMADA. Please store the data again using" " TCTracks.write_hdf5. If you struggle to convert the data, please open an" " issue on GitHub." ) if is_legacy else ( f"Unknown HDF5/NetCDF file format: {file_name}" ) ) def _read_one_gettelman(nc_data, i_track): """Read a single track from Andrew Gettelman's NetCDF dataset Parameters ---------- nc_data : nc.Dataset Opened NetCDF dataset. i_track : int Track number within the dataset. Returns ------- xr.Dataset """ scale_to_10m = (10. / 60.)**.11 mps2kts = 1.94384 basin_dict = {0: 'NA - North Atlantic', 1: 'SA - South Atlantic', 2: 'WP - West Pacific', 3: 'EP - East Pacific', 4: 'SP - South Pacific', 5: 'NI - North Indian', 6: 'SI - South Indian', 7: 'AS - Arabian Sea', 8: 'BB - Bay of Bengal', 9: 'EA - Eastern Australia', 10: 'WA - Western Australia', 11: 'CP - Central Pacific', 12: 'CS - Carribbean Sea', 13: 'GM - Gulf of Mexico', 14: 'MM - Missing'} val_len = nc_data.variables['numObs'][i_track] sid = str(i_track) times = nc_data.variables['source_time'][i_track, :][:val_len] datetimes = list() for time in times: try: datetimes.append( dt.datetime.strptime( str(nc.num2date(time, 'days since {}'.format('1858-11-17'), calendar='standard')), '%Y-%m-%d %H:%M:%S')) except ValueError: # If wrong t, set t to previous t plus 3 hours if datetimes: datetimes.append(datetimes[-1] + dt.timedelta(hours=3)) else: pos = list(times).index(time) time = times[pos + 1] - 1 / 24 * 3 datetimes.append( dt.datetime.strptime( str(nc.num2date(time, 'days since {}'.format('1858-11-17'), calendar='standard')), '%Y-%m-%d %H:%M:%S')) time_step = [] for i_time, time in enumerate(datetimes[1:], 1): time_step.append((time - datetimes[i_time - 1]).total_seconds() / 3600) time_step.append(time_step[-1]) basins_numeric = nc_data.variables['basin'][i_track, :val_len] basins = [basin_dict[b] if b in basin_dict else basin_dict[14] for b in basins_numeric] lon = nc_data.variables['lon'][i_track, :][:val_len] lon[lon > 180] = lon[lon > 180] - 360 # change lon format to -180 to 180 lat = nc_data.variables['lat'][i_track, :][:val_len] cen_pres = nc_data.variables['pres'][i_track, :][:val_len] av_prec = nc_data.variables['precavg'][i_track, :][:val_len] max_prec = nc_data.variables['precmax'][i_track, :][:val_len] # m/s to kn wind = nc_data.variables['wind'][i_track, :][:val_len] * mps2kts * scale_to_10m if not all(wind.data): # if wind is empty wind = np.ones(wind.size) * -999.9 tr_df = pd.DataFrame({'time': datetimes, 'lat': lat, 'lon': lon, 'max_sustained_wind': wind, 'central_pressure': cen_pres, 'environmental_pressure': np.ones(lat.size) * 1015., 'radius_max_wind': np.ones(lat.size) * 65., 'maximum_precipitation': max_prec, 'average_precipitation': av_prec, 'basin': [b[:2] for b in basins], 'time_step': time_step}) # construct xarray tr_ds = xr.Dataset.from_dataframe(tr_df.set_index('time')) tr_ds.coords['lat'] = ('time', tr_ds.lat.values) tr_ds.coords['lon'] = ('time', tr_ds.lon.values) tr_ds['basin'] = tr_ds['basin'].astype('<U2') tr_ds.attrs = {'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'sid': sid, 'name': sid, 'orig_event_flag': False, 'id_no': i_track, 'category': set_category(wind, 'kn')} return tr_ds def _read_file_emanuel(path, hemisphere=None, rmw_corr=False, subset=None): """Read track data from file containing Kerry Emanuel simulations. Parameters ---------- path : str absolute path of file to read. hemisphere : str or None, optional For global data sets, restrict to northern ('N') or southern ('S') hemisphere. Default: None (no restriction) rmw_corr : str, optional If True, multiply the radius of maximum wind by factor 2. Default: False. subset : list of int, optional If given, only include the tracks with the given indices. Since the simulation files can be huge, this feature is useful for running tests on smaller subsets or on random subsamples. Default: None Returns ------- list(xr.Dataset) """ LOGGER.info('Reading %s.', path) data_mat = matlab.loadmat(path) basin = str(data_mat['bas'][0]) hem_min, hem_max = -90, 90 # for backwards compatibility, also check for value 'both' if basin == "GB" and hemisphere != 'both' and hemisphere is not None: if hemisphere == 'S': hem_min, hem_max = -90, 0 basin = "S" elif hemisphere == 'N': hem_min, hem_max = 0, 90 basin = "N" else: raise ValueError(f"Unknown hemisphere: '{hemisphere}'. Use 'N' or 'S' or None.") lat = data_mat['latstore'] ntracks, nnodes = lat.shape years_uniq = np.unique(data_mat['yearstore']) LOGGER.info("File contains %s tracks (at most %s nodes each), " "representing %s years (%s-%s).", ntracks, nnodes, years_uniq.size, years_uniq[0], years_uniq[-1]) # filter according to chosen hemisphere hem_mask = (lat >= hem_min) & (lat <= hem_max) | (lat == 0) hem_idx = np.all(hem_mask, axis=1).nonzero()[0] data_hem = lambda keys: [data_mat[f'{k}store'][hem_idx] for k in keys] lat, lon = data_hem(['lat', 'long']) months, days, hours = data_hem(['month', 'day', 'hour']) months, days, hours = [np.int8(ar) for ar in [months, days, hours]] tc_rmw, tc_maxwind, tc_pressure = data_hem(['rm', 'v', 'p']) years = data_mat['yearstore'][0, hem_idx] ntracks, nnodes = lat.shape LOGGER.info("Loading %s tracks%s.", ntracks, f" on {hemisphere} hemisphere" if hemisphere in ['N', 'S'] else "") # change lon format to -180 to 180 lon[lon > 180] = lon[lon > 180] - 360 # change units from kilometers to nautical miles tc_rmw = (tc_rmw * ureg.kilometer).to(ureg.nautical_mile).magnitude if rmw_corr: LOGGER.info("Applying RMW correction.") tc_rmw *= EMANUEL_RMW_CORR_FACTOR data = [] for i_track in range(lat.shape[0]): if subset is not None and i_track not in subset: continue valid_idx = (lat[i_track, :] != 0).nonzero()[0] nnodes = valid_idx.size time_step = np.float64(np.abs(np.diff(hours[i_track, valid_idx])).min()) # deal with change of year year = np.full(valid_idx.size, years[i_track]) year_change = (np.diff(months[i_track, valid_idx]) < 0) year_change = year_change.nonzero()[0] if year_change.size > 0: year[year_change[0] + 1:] += 1 try: datetimes = map(dt.datetime, year, months[i_track, valid_idx], days[i_track, valid_idx], hours[i_track, valid_idx]) datetimes = list(datetimes) except ValueError as err: # dates are known to contain invalid February 30 date_feb = (months[i_track, valid_idx] == 2) \ & (days[i_track, valid_idx] > 28) if np.count_nonzero(date_feb) == 0: # unknown invalid date issue raise err step = time_step if not date_feb[0] else -time_step reference_idx = 0 if not date_feb[0] else -1 reference_date = dt.datetime( year[reference_idx], months[i_track, valid_idx[reference_idx]], days[i_track, valid_idx[reference_idx]], hours[i_track, valid_idx[reference_idx]],) datetimes = [reference_date + dt.timedelta(hours=int(step * i)) for i in range(nnodes)] datetimes = [cftime.DatetimeProlepticGregorian(d.year, d.month, d.day, d.hour) for d in datetimes] max_sustained_wind = tc_maxwind[i_track, valid_idx] max_sustained_wind_unit = 'kn' env_pressure = np.full(nnodes, DEF_ENV_PRESSURE) category = set_category(max_sustained_wind, max_sustained_wind_unit, SAFFIR_SIM_CAT) tr_ds = xr.Dataset({ 'time_step': ('time', np.full(nnodes, time_step)), 'radius_max_wind': ('time', tc_rmw[i_track, valid_idx]), 'max_sustained_wind': ('time', max_sustained_wind), 'central_pressure': ('time', tc_pressure[i_track, valid_idx]), 'environmental_pressure': ('time', env_pressure), 'basin': ('time', np.full(nnodes, basin, dtype="<U2")), }, coords={ 'time': datetimes, 'lat': ('time', lat[i_track, valid_idx]), 'lon': ('time', lon[i_track, valid_idx]), }, attrs={ 'max_sustained_wind_unit': max_sustained_wind_unit, 'central_pressure_unit': 'mb', 'name': str(hem_idx[i_track]), 'sid': str(hem_idx[i_track]), 'orig_event_flag': True, 'data_provider': 'Emanuel', 'id_no': hem_idx[i_track], 'category': category, }) data.append(tr_ds) return data def _read_ibtracs_csv_single(file_name): """Read single track from IBTrACS file in (legacy) CSV format. Parameters ---------- file_name : str File name of CSV file. Returns ------- xr.Dataset """ LOGGER.info('Reading %s', file_name) # keep_default_na=False avoids interpreting the North Atlantic ('NA') basin as a NaN-value dfr = pd.read_csv(file_name, keep_default_na=False) name = dfr['ibtracsID'].values[0] datetimes = list() for time in dfr['isotime'].values: year = np.fix(time / 1e6) time = time - year * 1e6 month = np.fix(time / 1e4) time = time - month * 1e4 day = np.fix(time / 1e2) hour = time - day * 1e2 datetimes.append(dt.datetime(int(year), int(month), int(day), int(hour))) lat = dfr['cgps_lat'].values.astype('float') lon = dfr['cgps_lon'].values.astype('float') cen_pres = dfr['pcen'].values.astype('float') max_sus_wind = dfr['vmax'].values.astype('float') max_sus_wind_unit = 'kn' if np.any(cen_pres <= 0): # Warning: If any pressure value is invalid, this enforces to use # estimated pressure values everywhere! cen_pres[:] = -999 cen_pres = _estimate_pressure(cen_pres, lat, lon, max_sus_wind) tr_ds = xr.Dataset() tr_ds.coords['time'] = ('time', datetimes) tr_ds.coords['lat'] = ('time', lat) tr_ds.coords['lon'] = ('time', lon) tr_ds['time_step'] = ('time', dfr['tint'].values) tr_ds['radius_max_wind'] = ('time', dfr['rmax'].values.astype('float')) tr_ds['max_sustained_wind'] = ('time', max_sus_wind) tr_ds['central_pressure'] = ('time', cen_pres) tr_ds['environmental_pressure'] = ('time', dfr['penv'].values.astype('float')) tr_ds['basin'] = ('time', dfr['gen_basin'].values.astype('<U2')) tr_ds.attrs['max_sustained_wind_unit'] = max_sus_wind_unit tr_ds.attrs['central_pressure_unit'] = 'mb' tr_ds.attrs['name'] = name tr_ds.attrs['sid'] = name tr_ds.attrs['orig_event_flag'] = bool(dfr['original_data']. values[0]) tr_ds.attrs['data_provider'] = dfr['data_provider'].values[0] try: tr_ds.attrs['id_no'] = float(name.replace('N', '0').replace('S', '1')) except ValueError: tr_ds.attrs['id_no'] = float(str(datetimes[0].date()).replace('-', '')) tr_ds.attrs['category'] = set_category(max_sus_wind, max_sus_wind_unit) return tr_ds def track_land_params(track, land_geom): """Compute parameters of land for one track. Parameters ---------- track : xr.Dataset tropical cyclone track land_geom : shapely.geometry.multipolygon.MultiPolygon land geometry """ track['on_land'] = ('time', u_coord.coord_on_land(track.lat.values, track.lon.values, land_geom)) track['dist_since_lf'] = ('time', _dist_since_lf(track)) def _dist_since_lf(track): """Compute the distance to landfall in km point for every point on land. Parameters ---------- track : xr.Dataset Single tropical cyclone track. Returns ------- dist : np.arrray Distances in km, points on water get nan values. """ dist_since_lf = np.zeros(track.time.values.shape) # Index in sea that follows a land index sea_land_idx, land_sea_idx = _get_landfall_idx(track, True) if not sea_land_idx.size: return (dist_since_lf + 1) * np.nan orig_lf = np.empty((sea_land_idx.size, 2)) for i_lf, lf_point in enumerate(sea_land_idx): if lf_point > 0: # Assume the landfall started between this and the previous point orig_lf[i_lf][0] = track.lat[lf_point - 1] + \ (track.lat[lf_point] - track.lat[lf_point - 1]) / 2 orig_lf[i_lf][1] = track.lon[lf_point - 1] + \ (track.lon[lf_point] - track.lon[lf_point - 1]) / 2 else: # track starts over land, assume first 'landfall' starts here orig_lf[i_lf][0] = track.lat[lf_point] orig_lf[i_lf][1] = track.lon[lf_point] dist = DistanceMetric.get_metric('haversine') nodes1 = np.radians(np.array([track.lat.values[1:], track.lon.values[1:]]).transpose()) nodes0 = np.radians(np.array([track.lat.values[:-1], track.lon.values[:-1]]).transpose()) dist_since_lf[1:] = dist.pairwise(nodes1, nodes0).diagonal() dist_since_lf[~track.on_land.values] = 0.0 nodes1 = np.array([track.lat.values[sea_land_idx], track.lon.values[sea_land_idx]]).transpose() / 180 * np.pi dist_since_lf[sea_land_idx] = \ dist.pairwise(nodes1, orig_lf / 180 * np.pi).diagonal() for sea_land, land_sea in zip(sea_land_idx, land_sea_idx): dist_since_lf[sea_land:land_sea] = \ np.cumsum(dist_since_lf[sea_land:land_sea]) dist_since_lf *= EARTH_RADIUS_KM dist_since_lf[~track.on_land.values] = np.nan return dist_since_lf def _get_landfall_idx(track, include_starting_landfall=False): """Get the position of the start and end of landfalls for a TC track. Parameters ---------- track : xr.Dataset track (variable 'on_land' must exist, if not present they can be added with climada.hazard.tc_tracks.track_land_params(track, land_geom)) include_starting_landfall : bool If the track starts over land, whether to include the track segment before reaching the ocean as a landfall. Default: False. Returns ------- sea_land_idx : numpy.ndarray of dtype int Indexes of the first point over land for each landfall land_sea_idx : numpy.ndarrat of dtype int Indexes of first point over the ocean after each landfall. If the track ends over land, the last value is set to track.time.size. """ # Index in land that comes from previous sea index sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0] + 1 # Index in sea that comes from previous land index land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0] + 1 if track.on_land[-1]: # track ends over land: add last track point as the end of that landfall land_sea_idx = np.append(land_sea_idx, track.time.size) if track.on_land[0]: # track starts over land: remove first land-to-sea transition (not a landfall)? if include_starting_landfall: sea_land_idx = np.append(0, sea_land_idx) else: land_sea_idx = land_sea_idx[1:] if land_sea_idx.size != sea_land_idx.size: raise ValueError('Mismatch') return sea_land_idx,land_sea_idx def _estimate_pressure(cen_pres, lat, lon, v_max): """Replace missing pressure values with statistical estimate. In addition to NaNs, negative values and zeros in `cen_pres` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('pres', ['lat', 'lon', 'wind'], year_range=(1980, 2020)) >>> r^2: 0.8726728075520206 Parameters ---------- cen_pres : array-like Central pressure values along track in hPa (mbar). lat : array-like Latitudinal coordinates of eye location. lon : array-like Longitudinal coordinates of eye location. v_max : array-like Maximum wind speed along track in knots. Returns ------- cen_pres_estimated : np.array Estimated central pressure values in hPa (mbar). """ cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) v_max = np.where(np.isnan(v_max), -1, v_max) lat, lon = [np.where(np.isnan(ar), -999, ar) for ar in [lat, lon]] msk = (cen_pres <= 0) & (v_max > 0) & (lat > -999) & (lon > -999) c_const, c_lat, c_lon, c_vmax = 1026.3401, -0.05504, -0.03536, -0.7357 cen_pres[msk] = c_const + c_lat * lat[msk] \ + c_lon * lon[msk] \ + c_vmax * v_max[msk] return np.where(cen_pres <= 0, np.nan, cen_pres) def _estimate_vmax(v_max, lat, lon, cen_pres): """Replace missing wind speed values with a statistical estimate. In addition to NaNs, negative values and zeros in `v_max` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('wind', ['lat', 'lon', 'pres'], year_range=(1980, 2020)) >>> r^2: 0.8683725434617979 Parameters ---------- v_max : array-like Maximum wind speed along track in knots. lat : array-like Latitudinal coordinates of eye location. lon : array-like Longitudinal coordinates of eye location. cen_pres : array-like Central pressure values along track in hPa (mbar). Returns ------- v_max_estimated : np.array Estimated maximum wind speed values in knots. """ v_max = np.where(np.isnan(v_max), -1, v_max) cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) lat, lon = [np.where(np.isnan(ar), -999, ar) for ar in [lat, lon]] msk = (v_max <= 0) & (cen_pres > 0) & (lat > -999) & (lon > -999) c_const, c_lat, c_lon, c_pres = 1216.5223, -0.04086, -0.04190, -1.1797 v_max[msk] = c_const + c_lat * lat[msk] \ + c_lon * lon[msk] \ + c_pres * cen_pres[msk] return np.where(v_max <= 0, np.nan, v_max) def estimate_roci(roci, cen_pres): """Replace missing radius (ROCI) values with statistical estimate. In addition to NaNs, negative values and zeros in `roci` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('roci', ['pres'], ... order=[(872, 950, 985, 1005, 1021)], ... year_range=(1980, 2019)) >>> r^2: 0.9148320406675339 Parameters ---------- roci : array-like ROCI values along track in nautical miles (nm). cen_pres : array-like Central pressure values along track in hPa (mbar). Returns ------- roci_estimated : np.array Estimated ROCI values in nautical miles (nm). """ roci = np.where(np.isnan(roci), -1, roci) cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) msk = (roci <= 0) & (cen_pres > 0) pres_l = [872, 950, 985, 1005, 1021] roci_l = [210.711487, 215.897110, 198.261520, 159.589508, 90.900116] roci[msk] = 0 for i, pres_l_i in enumerate(pres_l): slope_0 = 1. / (pres_l_i - pres_l[i - 1]) if i > 0 else 0 slope_1 = 1. / (pres_l[i + 1] - pres_l_i) if i + 1 < len(pres_l) else 0 roci[msk] += roci_l[i] * np.fmax(0, (1 - slope_0 * np.fmax(0, pres_l_i - cen_pres[msk]) - slope_1 * np.fmax(0, cen_pres[msk] - pres_l_i))) return np.where(roci <= 0, np.nan, roci) def estimate_rmw(rmw, cen_pres): """Replace missing radius (RMW) values with statistical estimate. In addition to NaNs, negative values and zeros in `rmw` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('rmw', ['pres'], order=[(872, 940, 980, 1021)], year_range=(1980, 2019)) >>> r^2: 0.7905970811843872 Parameters ---------- rmw : array-like RMW values along track in nautical miles (nm). cen_pres : array-like Central pressure values along track in hPa (mbar). Returns ------- rmw : np.array Estimated RMW values in nautical miles (nm). """ rmw = np.where(np.isnan(rmw), -1, rmw) cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) msk = (rmw <= 0) & (cen_pres > 0) pres_l = [872, 940, 980, 1021] rmw_l = [14.907318, 15.726927, 25.742142, 56.856522] rmw[msk] = 0 for i, pres_l_i in enumerate(pres_l): slope_0 = 1. / (pres_l_i - pres_l[i - 1]) if i > 0 else 0 slope_1 = 1. / (pres_l[i + 1] - pres_l_i) if i + 1 < len(pres_l) else 0 rmw[msk] += rmw_l[i] * np.fmax(0, (1 - slope_0 * np.fmax(0, pres_l_i - cen_pres[msk]) - slope_1 * np.fmax(0, cen_pres[msk] - pres_l_i))) return np.where(rmw <= 0, np.nan, rmw) def ibtracs_fit_param(explained, explanatory, year_range=(1980, 2019), order=1): """Statistically fit an ibtracs parameter to other ibtracs variables. A linear ordinary least squares fit is done using the statsmodels package. Parameters ---------- explained : str Name of explained variable. explanatory : iterable Names of explanatory variables. year_range : tuple First and last year to include in the analysis. order : int or tuple The maximal order of the explanatory variables. Returns ------- result : OLSResults """ wmo_vars = ['wind', 'pres', 'rmw', 'roci', 'poci'] all_vars = ['lat', 'lon'] + wmo_vars explanatory = list(explanatory) variables = explanatory + [explained] for var in variables: if var not in all_vars: raise KeyError("Unknown ibtracs variable: %s" % var) # load ibtracs dataset fn_nc = SYSTEM_DIR.joinpath('IBTrACS.ALL.v04r00.nc') ibtracs_ds = xr.open_dataset(fn_nc) # choose specified year range years = ibtracs_ds.sid.str.slice(0, 4).astype(int) match = (years >= year_range[0]) & (years <= year_range[1]) ibtracs_ds = ibtracs_ds.sel(storm=match) if "wind" in variables: for agency in IBTRACS_AGENCIES: scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency] ibtracs_ds[f'{agency}_wind'] -= shift ibtracs_ds[f'{agency}_wind'] /= scale # fill values agency_pref, track_agency_ix = ibtracs_track_agency(ibtracs_ds) for var in wmo_vars: if var not in variables: continue # array of values in order of preference cols = [f'{a}_{var}' for a in agency_pref] cols = [col for col in cols if col in ibtracs_ds.data_vars.keys()] all_vals = ibtracs_ds[cols].to_array(dim='agency') preferred_ix = all_vals.notnull().argmax(dim='agency') if var in ['wind', 'pres']: # choice: wmo -> wmo_agency/usa_agency -> preferred ibtracs_ds[var] = ibtracs_ds['wmo_' + var] \ .fillna(all_vals.isel(agency=track_agency_ix)) \ .fillna(all_vals.isel(agency=preferred_ix)) else: ibtracs_ds[var] = all_vals.isel(agency=preferred_ix) fit_df = pd.DataFrame({var: ibtracs_ds[var].values.ravel() for var in variables}) fit_df = fit_df.dropna(axis=0, how='any').reset_index(drop=True) if 'lat' in explanatory: fit_df['lat'] = fit_df['lat'].abs() # prepare explanatory variables d_explanatory = fit_df[explanatory] if isinstance(order, int): order = (order,) * len(explanatory) add_const = False for ex, max_o in zip(explanatory, order): if isinstance(max_o, tuple): if fit_df[ex].min() > max_o[0]: print(f"Minimum data value is {fit_df[ex].min()} > {max_o[0]}.") if fit_df[ex].max() < max_o[-1]: print(f"Maximum data value is {fit_df[ex].max()} < {max_o[-1]}.") # piecewise linear with given break points d_explanatory = d_explanatory.drop(labels=[ex], axis=1) for i, max_o_i in enumerate(max_o): col = f'{ex}{max_o_i}' slope_0 = 1. / (max_o_i - max_o[i - 1]) if i > 0 else 0 slope_1 = 1. / (max_o[i + 1] - max_o_i) if i + 1 < len(max_o) else 0 d_explanatory[col] = np.fmax(0, (1 - slope_0 * np.fmax(0, max_o_i - fit_df[ex]) - slope_1 * np.fmax(0, fit_df[ex] - max_o_i))) elif max_o < 0: d_explanatory = d_explanatory.drop(labels=[ex], axis=1) for order in range(1, abs(max_o) + 1): d_explanatory[f'{ex}^{-order}'] = fit_df[ex]**(-order) add_const = True else: for order in range(2, max_o + 1): d_explanatory[f'{ex}^{order}'] = fit_df[ex]**order add_const = True d_explained = fit_df[[explained]] if add_const: d_explanatory['const'] = 1.0 # run statistical fit sm_results = sm.OLS(d_explained, d_explanatory).fit() # print results print(sm_results.params) print("r^2:", sm_results.rsquared) return sm_results def ibtracs_track_agency(ds_sel): """Get preferred IBTrACS agency for each entry in the dataset. Parameters ---------- ds_sel : xarray.Dataset Subselection of original IBTrACS NetCDF dataset. Returns ------- agency_pref : list of str Names of IBTrACS agencies in order of preference. track_agency_ix : xarray.DataArray of ints For each entry in `ds_sel`, the agency to use, given as an index into `agency_pref`. """ agency_pref = ["wmo"] + IBTRACS_AGENCIES agency_map = {a.encode('utf-8'): i for i, a in enumerate(agency_pref)} agency_map.update({ a.encode('utf-8'): agency_map[b'usa'] for a in IBTRACS_USA_AGENCIES }) agency_map[b''] = agency_map[b'wmo'] agency_fun = lambda x: agency_map[x] if "track_agency" not in ds_sel.data_vars.keys(): ds_sel['track_agency'] = ds_sel.wmo_agency.where(ds_sel.wmo_agency != b'', ds_sel.usa_agency) track_agency_ix = xr.apply_ufunc(agency_fun, ds_sel.track_agency, vectorize=True) return agency_pref, track_agency_ix def ibtracs_add_official_variable(ibtracs_ds, tc_var, add_3h=False): """Add variables for the officially responsible agencies to an IBTrACS dataset This function adds new variables to the xarray.Dataset `ibtracs_ds` that contain values of the specified TC variable `var` that have been reported by the officially responsible agencies. For example, if `tc_var` is "wind", there will be a new variable "official_wind" and, if `add_3h` is True, an additional variable "official_3h_wind". Parameters ---------- ibtracs_ds : xarray.Dataset Subselection of original IBTrACS NetCDF dataset. tc_var : str Name of variable for which to add an "official" version, e.g. "lat", "wind", "pres". add_3h : bool, optional Optionally, add an "official_3h" version where also 3-hourly data by the officially reporting agencies is included (if available). Default: False """ if "nan_var" not in ibtracs_ds.data_vars.keys(): # add an array full of NaN as a fallback value in the procedure ibtracs_ds['nan_var'] = xr.full_like(ibtracs_ds.lat, np.nan) # determine which of the official agencies report this variable at all available_agencies = [a for a in IBTRACS_AGENCIES if f'{a}_{tc_var}' in ibtracs_ds.data_vars.keys()] # map all non-reporting agency variables to the 'nan_var' (0) agency_map = { a.encode("utf-8"): available_agencies.index(a) + 1 if a in available_agencies else 0 for a in [''] + IBTRACS_AGENCIES } agency_map.update({ a.encode('utf-8'): agency_map[b'usa'] for a in IBTRACS_USA_AGENCIES }) # read from officially responsible agencies that report this variable, but only # at official reporting times (usually 6-hourly) official_agency_ix = xr.apply_ufunc( lambda x: agency_map[x], ibtracs_ds.wmo_agency, vectorize=True) available_cols = ['nan_var'] + [f'{a}_{tc_var}' for a in available_agencies] all_vals = ibtracs_ds[available_cols].to_array(dim='agency') ibtracs_ds[f'official_{tc_var}'] = all_vals.isel(agency=official_agency_ix) if add_3h: # create a copy in float for NaN interpolation official_agency_ix_interp = official_agency_ix.astype(np.float16) # extrapolate track agency for tracks with only a single record mask_singular = ((official_agency_ix_interp > 0).sum(dim="date_time") == 1).values official_agency_ix_interp.values[mask_singular,:] = \ official_agency_ix_interp.sel(storm=mask_singular).max(dim="date_time").values[:,None] with warnings.catch_warnings(): # See https://github.com/pydata/xarray/issues/4167 warnings.simplefilter(action="ignore", category=FutureWarning) # interpolate responsible agencies using nearest neighbor interpolation official_agency_ix_interp.values[official_agency_ix_interp.values == 0.0] = np.nan official_agency_ix_interp = official_agency_ix_interp.interpolate_na( dim="date_time", method="nearest", fill_value="extrapolate") # read from officially responsible agencies that report this variable, including # 3-hour time steps if available official_agency_ix_interp.values[official_agency_ix_interp.isnull().values] = 0.0 ibtracs_ds[f'official_3h_{tc_var}'] = all_vals.isel( agency=official_agency_ix_interp.astype(int)) def _change_max_wind_unit(wind, unit_orig, unit_dest): """Compute maximum wind speed in unit_dest. Parameters ---------- wind : np.array Wind speed values in original units. unit_orig : str Original units of wind speed. unit_dest : str New units of the computed maximum wind speed. Returns ------- maxwind : double Maximum wind speed in specified wind speed units. """ if unit_orig in ('kn', 'kt'): ur_orig = ureg.knot elif unit_orig == 'mph': ur_orig = ureg.mile / ureg.hour elif unit_orig == 'm/s': ur_orig = ureg.meter / ureg.second elif unit_orig == 'km/h': ur_orig = ureg.kilometer / ureg.hour else: raise ValueError('Unit not recognised %s.' % unit_orig) if unit_dest in ('kn', 'kt'): ur_dest = ureg.knot elif unit_dest == 'mph': ur_dest = ureg.mile / ureg.hour elif unit_dest == 'm/s': ur_dest = ureg.meter / ureg.second elif unit_dest == 'km/h': ur_dest = ureg.kilometer / ureg.hour else: raise ValueError('Unit not recognised %s.' % unit_dest) return (np.nanmax(wind) * ur_orig).to(ur_dest).magnitude
[docs] def set_category(max_sus_wind, wind_unit='kn', saffir_scale=None): """Add storm category according to Saffir-Simpson hurricane scale. Parameters ---------- max_sus_wind : np.array Maximum sustained wind speed records for a single track. wind_unit : str, optional Units of wind speed. Default: 'kn'. saffir_scale : list, optional Saffir-Simpson scale in same units as wind (default scale valid for knots). Returns ------- category : int Intensity of given track according to the Saffir-Simpson hurricane scale: * -1 : tropical depression * 0 : tropical storm * 1 : Hurricane category 1 * 2 : Hurricane category 2 * 3 : Hurricane category 3 * 4 : Hurricane category 4 * 5 : Hurricane category 5 """ if saffir_scale is None: saffir_scale = SAFFIR_SIM_CAT if wind_unit != 'kn': max_sus_wind = _change_max_wind_unit(max_sus_wind, wind_unit, 'kn') max_wind = np.nanmax(max_sus_wind) try: return (np.argwhere(max_wind < saffir_scale) - 1)[0][0] except IndexError: return -1