Source code for climada.hazard.tc_tracks

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define TCTracks: IBTracs reader and tracks manager.
"""

__all__ = ['CAT_NAMES', 'SAFFIR_SIM_CAT', 'TCTracks', 'set_category']

# standard libraries
import datetime as dt
import itertools
import logging
import shutil
import warnings
from pathlib import Path

# additional libraries
import cartopy.crs as ccrs
import cftime
import geopandas as gpd
import matplotlib.cm as cm_mp
from matplotlib.collections import LineCollection
from matplotlib.colors import BoundaryNorm, ListedColormap
from matplotlib.lines import Line2D
import netCDF4 as nc
from numba import jit
import numpy as np
import pandas as pd
import scipy.io.matlab as matlab
from shapely.geometry import Point, LineString, MultiLineString
import shapely.ops
from sklearn.neighbors import DistanceMetric
import statsmodels.api as sm
import xarray as xr

# climada dependencies
from climada.util import ureg
import climada.util.coordinates as u_coord
from climada.util.constants import EARTH_RADIUS_KM, SYSTEM_DIR, DEF_CRS
from climada.util.files_handler import get_file_names, download_ftp
import climada.util.plot as u_plot
import climada.hazard.tc_tracks_synth

LOGGER = logging.getLogger(__name__)

SAFFIR_SIM_CAT = [34, 64, 83, 96, 113, 137, 1000]
"""Saffir-Simpson Hurricane Wind Scale in kn based on NOAA"""

CAT_NAMES = {
    -1: 'Tropical Depression',
    0: 'Tropical Storm',
    1: 'Hurricane Cat. 1',
    2: 'Hurricane Cat. 2',
    3: 'Hurricane Cat. 3',
    4: 'Hurricane Cat. 4',
    5: 'Hurricane Cat. 5',
}
"""Saffir-Simpson category names."""

CAT_COLORS = cm_mp.rainbow(np.linspace(0, 1, len(SAFFIR_SIM_CAT)))
"""Color scale to plot the Saffir-Simpson scale."""

IBTRACS_URL = ('https://www.ncei.noaa.gov/data/'
               'international-best-track-archive-for-climate-stewardship-ibtracs/'
               'v04r00/access/netcdf')
"""Site of IBTrACS netcdf file containing all tracks v4.0,
s. https://www.ncdc.noaa.gov/ibtracs/index.php?name=ib-v4-access"""

IBTRACS_FILE = 'IBTrACS.ALL.v04r00.nc'
"""IBTrACS v4.0 file all"""

IBTRACS_AGENCIES = [
    'usa', 'tokyo', 'newdelhi', 'reunion', 'bom', 'nadi', 'wellington',
    'cma', 'hko', 'ds824', 'td9636', 'td9635', 'neumann', 'mlc',
]
"""Names/IDs of agencies in IBTrACS v4.0"""

IBTRACS_USA_AGENCIES = [
    'atcf', 'cphc', 'hurdat_atl', 'hurdat_epa', 'jtwc_cp', 'jtwc_ep', 'jtwc_io',
    'jtwc_sh', 'jtwc_wp', 'nhc_working_bt', 'tcvightals', 'tcvitals'
]
"""Names/IDs of agencies in IBTrACS that correspond to 'usa_*' variables"""


IBTRACS_AGENCY_1MIN_WIND_FACTOR = {
    "usa": [1.0, 0.0],
    "tokyo": [0.60, 23.3],
    "newdelhi": [1.0, 0.0],
    "reunion": [0.88, 0.0],
    "bom": [0.88, 0.0],
    "nadi": [0.88, 0.0],
    "wellington": [0.88, 0.0],
    'cma': [0.871, 0.0],
    'hko': [0.9, 0.0],
    'ds824': [1.0, 0.0],
    'td9636': [1.0, 0.0],
    'td9635': [1.0, 0.0],
    'neumann': [0.88, 0.0],
    'mlc': [1.0, 0.0],
}
"""Scale and shift used by agencies to convert their internal Dvorak 1-minute sustained winds to
the officially reported values that are in IBTrACS. From Table 1 in:

Knapp, K.R., Kruk, M.C. (2010): Quantifying Interagency Differences in Tropical Cyclone Best-Track
Wind Speed Estimates. Monthly Weather Review 138(4): 1459–1473.
https://library.wmo.int/index.php?lvl=notice_display&id=135"""

DEF_ENV_PRESSURE = 1010
"""Default environmental pressure"""

BASIN_ENV_PRESSURE = {
    '': DEF_ENV_PRESSURE,
    'EP': 1010, 'NA': 1010, 'SA': 1010,
    'NI': 1005, 'SI': 1005, 'WP': 1005,
    'SP': 1004,
}
"""Basin-specific default environmental pressure"""

EMANUEL_RMW_CORR_FILES = [
    'temp_ccsm420thcal.mat', 'temp_ccsm4rcp85_full.mat',
    'temp_gfdl520thcal.mat', 'temp_gfdl5rcp85cal_full.mat',
    'temp_hadgem20thcal.mat', 'temp_hadgemrcp85cal_full.mat',
    'temp_miroc20thcal.mat', 'temp_mirocrcp85cal_full.mat',
    'temp_mpi20thcal.mat', 'temp_mpircp85cal_full.mat',
    'temp_mri20thcal.mat', 'temp_mrircp85cal_full.mat',
]
EMANUEL_RMW_CORR_FACTOR = 2.0
"""Kerry Emanuel track files in this list require a correction: The radius of
    maximum wind (rmstore) needs to be multiplied by factor 2."""

[docs]class TCTracks(): """Contains tropical cyclone tracks. Attributes ---------- data : list(xarray.Dataset) List of tropical cyclone tracks. Each track contains following attributes: - time (coords) - lat (coords) - lon (coords) - time_step (in hours) - radius_max_wind (in nautical miles) - max_sustained_wind - central_pressure - environmental_pressure - max_sustained_wind_unit (attrs) - central_pressure_unit (attrs) - name (attrs) - sid (attrs) - orig_event_flag (attrs) - data_provider (attrs) - basin (attrs) - id_no (attrs) - category (attrs) Computed during processing: - on_land - dist_since_lf """
[docs] def __init__(self, pool=None): """Empty constructor. Read csv IBTrACS files if provided.""" self.data = list() if pool: self.pool = pool LOGGER.debug('Using %s CPUs.', self.pool.ncpus) else: self.pool = None
[docs] def append(self, tracks): """Append tracks to current. Parameters ---------- tracks : xarray.Dataset or list(xarray.Dataset) tracks to append. """ if not isinstance(tracks, list): tracks = [tracks] self.data.extend(tracks)
[docs] def get_track(self, track_name=None): """Get track with provided name. Returns the first matching track based on the assumption that no other track with the same name or sid exists in the set. Parameters ---------- track_name : str, optional Name or sid (ibtracsID for IBTrACS) of track. If None (default), return all tracks. Returns ------- result : xarray.Dataset or list of xarray.Dataset Usually, a single track is returned. If no track with the specified name is found, an empty list `[]` is returned. If called with `track_name=None`, the list of all tracks is returned. """ if track_name is None: if len(self.data) == 1: return self.data[0] return self.data for track in self.data: if track.name == track_name: return track if hasattr(track, 'sid') and track.sid == track_name: return track LOGGER.info('No track with name or sid %s found.', track_name) return []
[docs] def subset(self, filterdict): """Subset tracks based on track attributes. Select all tracks matching exactly the given attribute values. Parameters ---------- filterdict : dict or OrderedDict Keys are attribute names, values are the corresponding attribute values to match. In case of an ordered dict, the filters are applied in the given order. Returns ------- tc_tracks : TCTracks A new instance of TCTracks containing only the matching tracks. """ out = self.__class__(self.pool) out.data = self.data for key, pattern in filterdict.items(): out.data = [ds for ds in out.data if ds.attrs[key] == pattern] return out
[docs] def tracks_in_exp(self, exposure, buffer=1.0): """Select only the tracks that are in the vicinity (buffer) of an exposure. Each exposure point/geometry is extended to a disc of radius `buffer`. Each track is converted to a line and extended by a radius `buffer`. Parameters ---------- exposure : Exposure Exposure used to select tracks. buffer : float, optional Size of buffer around exposure geometries (in the units of `exposure.crs`), see `geopandas.distance`. Default: 1.0 Returns ------- filtered_tracks : climada.hazard.TCTracks() TCTracks object with tracks from tc_tracks intersecting the exposure whitin a buffer distance. """ if buffer <= 0.0: raise ValueError(f"buffer={buffer} is invalid, must be above zero.") try: exposure.geometry except AttributeError: exposure.set_geometry_points() exp_buffer = exposure.buffer(distance=buffer, resolution=0) exp_buffer = exp_buffer.unary_union tc_tracks_lines = self.to_geodataframe().buffer(distance=buffer) select_tracks = tc_tracks_lines.intersects(exp_buffer) tracks_in_exp = [track for j, track in enumerate(self.data) if select_tracks[j]] filtered_tracks= TCTracks() filtered_tracks.append(tracks_in_exp) return filtered_tracks
[docs] def read_ibtracs_netcdf(self, provider=None, rescale_windspeeds=True, storm_id=None, year_range=None, basin=None, interpolate_missing=True, estimate_missing=False, correct_pres=False, file_name='IBTrACS.ALL.v04r00.nc'): """Read track data from IBTrACS databse. When using data from IBTrACS, make sure to be familiar with the scope and limitations of IBTrACS, e.g. by reading the official documentation (https://www.ncdc.noaa.gov/ibtracs/pdf/IBTrACS_version4_Technical_Details.pdf). Reading the CLIMADA documentation can't replace a thorough understanding of the underlying data. This function only provides a (hopefully useful) interface for the data input, but cannot provide any guidance or make recommendations about if and how to use IBTrACS data for your particular project. Resulting tracks are required to have both pressure and wind speed information at all time steps. Therefore, all track positions where one of wind speed or pressure are missing are discarded unless one of `interpolate_missing` or `estimate_missing` are active. Some corrections are automatically applied, such as: `environmental_pressure` is enforced to be larger than `central_pressure`. Note that the tracks returned by this function might contain irregular time steps since that is often the case for the original IBTrACS records. Apply the `equal_timestep` function afterwards to enforce regular time steps. Parameters ---------- provider : str or list of str, optional Either specify an agency, such as "usa", "newdelhi", "bom", "cma", "tokyo", or the special values "official" and "official_3h": * "official" means using the (usually 6-hourly) officially reported values of the officially responsible agencies. * "official_3h" means to include (inofficial) 3-hourly data of the officially responsible agencies (whenever available). If you want to restrict to the officially reported values by the officially responsible agencies (`provider="official"`) without any modifications to the original official data, make sure to also set `estimate_missing=False` and `interpolate_missing=False`. Otherwise, gaps in the official reporting will be filled using interpolation and/or statistical estimation procedures (see below). If a list is given, the following logic is applied: For each storm, the variables that are not reported by the first agency for this storm are taken from the next agency in the list that did report this variable for this storm. For different storms, the same variable might be taken from different agencies. Default: ['official_3h', 'usa', 'tokyo', 'newdelhi', 'reunion', 'bom', 'nadi', 'wellington', 'cma', 'hko', 'ds824', 'td9636', 'td9635', 'neumann', 'mlc'] rescale_windspeeds : bool, optional If True, all wind speeds are linearly rescaled to 1-minute sustained winds. Note however that the IBTrACS documentation (Section 5.2, https://www.ncdc.noaa.gov/ibtracs/pdf/IBTrACS_version4_Technical_Details.pdf) includes a warning about this kind of conversion: "While a multiplicative factor can describe the numerical differences, there are procedural and observational differences between agencies that can change through time, which confounds the simple multiplicative factor." Default: True storm_id : str or list of str, optional IBTrACS ID of the storm, e.g. 1988234N13299, [1988234N13299, 1989260N11316]. year_range : tuple (min_year, max_year), optional Year range to filter track selection. Default: (1980, 2018) basin : str, optional E.g. US, SA, NI, SI, SP, WP, EP, NA. If not provided, consider all basins. interpolate_missing : bool, optional If True, interpolate temporal reporting gaps within a variable (such as pressure, wind speed, or radius) linearly if possible. Temporal interpolation is with respect to the time steps defined in IBTrACS for a particular storm. No new time steps are added that are not originally defined in IBTrACS. For each time step with a missing value, this procedure is only able to fill in that value if there are other time steps before and after this time step for which values have been reported. This procedure will be applied before the statistical estimations referred to by `estimate_missing`. It is applied to all variables (eye position, wind speed, environmental and central pressure, storm radius and radius of maximum winds). Default: True estimate_missing : bool, optional For each fixed time step, estimate missing pressure, wind speed and radius using other variables that are available at that time step. The relationships between the variables are purely statistical. In comparison to `interpolate_missing`, this procedure is able to estimate values for variables that haven't been reported by any agency at any time step, as long as other variables are available. A typical example are storms before 1950, for which there are often no reported values for pressure, but for wind speed. In this case, a rough statistical pressure-wind relationship is applied to estimate the missing pressure values from the available wind-speed values. Make sure to set `rescale_windspeeds=True` when using this option because the statistical relationships are calibrated using rescaled wind speeds. Default: False correct_pres : bool, optional For backwards compatibility, alias for `estimate_missing`. This is deprecated, use `estimate_missing` instead! file_name : str, optional Name of NetCDF file to be dowloaded or located at climada/data/system. Default: 'IBTrACS.ALL.v04r00.nc' """ if correct_pres: LOGGER.warning("`correct_pres` is deprecated. " "Use `estimate_missing` instead.") estimate_missing = True if estimate_missing and not rescale_windspeeds: LOGGER.warning( "Using `estimate_missing` without `rescale_windspeeds` is strongly discouraged!") self.data = list() fn_nc = SYSTEM_DIR.joinpath(file_name) if not fn_nc.is_file(): try: download_ftp(f'{IBTRACS_URL}/{IBTRACS_FILE}', IBTRACS_FILE) shutil.move(IBTRACS_FILE, fn_nc) except ValueError as err: LOGGER.error('Error while downloading %s. Try to download it ' 'manually and put the file in ' 'climada_python/data/system/', IBTRACS_URL) raise err ibtracs_ds = xr.open_dataset(fn_nc) match = np.ones(ibtracs_ds.sid.shape[0], dtype=bool) if storm_id: if not isinstance(storm_id, list): storm_id = [storm_id] match &= ibtracs_ds.sid.isin([i.encode() for i in storm_id]) if np.count_nonzero(match) == 0: LOGGER.info('No tracks with given IDs %s.', storm_id) else: year_range = year_range if year_range else (1980, 2018) if year_range: years = ibtracs_ds.sid.str.slice(0, 4).astype(int) match &= (years >= year_range[0]) & (years <= year_range[1]) if np.count_nonzero(match) == 0: LOGGER.info('No tracks in time range (%s, %s).', *year_range) if basin: match &= (ibtracs_ds.basin == basin.encode()).any(dim='date_time') if np.count_nonzero(match) == 0: LOGGER.info('No tracks in basin %s.', basin) if np.count_nonzero(match) == 0: LOGGER.info('There are no tracks matching the specified requirements.') self.data = [] return ibtracs_ds = ibtracs_ds.sel(storm=match) ibtracs_ds['valid_t'] = ibtracs_ds.time.notnull() valid_storms_mask = ibtracs_ds.valid_t.any(dim="date_time") invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] if invalid_storms_idx.size > 0: invalid_sids = ', '.join(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) LOGGER.warning('No valid timestamps found for %s.', invalid_sids) ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) if rescale_windspeeds: for agency in IBTRACS_AGENCIES: scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency] ibtracs_ds[f'{agency}_wind'] -= shift ibtracs_ds[f'{agency}_wind'] /= scale if provider is None: provider = ["official_3h"] + IBTRACS_AGENCIES elif isinstance(provider, str): provider = [provider] for tc_var in ['lat', 'lon', 'wind', 'pres', 'rmw', 'poci', 'roci']: if "official" in provider or "official_3h" in provider: ibtracs_add_official_variable( ibtracs_ds, tc_var, add_3h=("official_3h" in provider)) # set up dimension of agency-reported values in order of preference, including the # newly created `official` and `official_3h` data if specified ag_vars = [f'{ag}_{tc_var}' for ag in provider] ag_vars = [ag_var for ag_var in ag_vars if ag_var in ibtracs_ds.data_vars.keys()] all_vals = ibtracs_ds[ag_vars].to_array(dim='agency') # argmax returns the first True (i.e. valid) along the 'agency' dimension preferred_idx = all_vals.notnull().any(dim="date_time").argmax(dim='agency') ibtracs_ds[tc_var] = all_vals.isel(agency=preferred_idx) # Usually, if an agency reports about a track that crosses the antimeridian, the # longitude is always chosen positive. However, it can happen, that the TC crosses the # antimeridian according to one agency, but not according to another. When mixing # agency data, this can yield inconsistent sign changes in longitude. We remove those: if tc_var == 'lon': # By IBTrACS default, no longitude should be <= -180, but this is not true for some # agencies, so we have to manually enforce this policy: ibtracs_ds[tc_var].values[(ibtracs_ds[tc_var] <= -180).values] += 360 crossing_mask = ((ibtracs_ds[tc_var] > 170).any(dim="date_time") & (ibtracs_ds[tc_var] < -170).any(dim="date_time") & (ibtracs_ds[tc_var] < 0)).values ibtracs_ds[tc_var].values[crossing_mask] += 360 if interpolate_missing: with warnings.catch_warnings(): # Upstream issue, see https://github.com/pydata/xarray/issues/4167 warnings.simplefilter(action="ignore", category=FutureWarning) # don't interpolate if there is only a single record for this variable nonsingular_mask = ( ibtracs_ds[tc_var].notnull().sum(dim="date_time") > 1).values if nonsingular_mask.sum() > 0: ibtracs_ds[tc_var].values[nonsingular_mask] = ( ibtracs_ds[tc_var].sel(storm=nonsingular_mask).interpolate_na( dim="date_time", method="linear")) ibtracs_ds = ibtracs_ds[['sid', 'name', 'basin', 'lat', 'lon', 'time', 'valid_t', 'wind', 'pres', 'rmw', 'roci', 'poci']] if estimate_missing: ibtracs_ds['pres'][:] = _estimate_pressure( ibtracs_ds.pres, ibtracs_ds.lat, ibtracs_ds.lon, ibtracs_ds.wind) ibtracs_ds['wind'][:] = _estimate_vmax( ibtracs_ds.wind, ibtracs_ds.lat, ibtracs_ds.lon, ibtracs_ds.pres) ibtracs_ds['valid_t'] &= (ibtracs_ds.lat.notnull() & ibtracs_ds.lon.notnull() & ibtracs_ds.wind.notnull() & ibtracs_ds.pres.notnull()) valid_storms_mask = ibtracs_ds.valid_t.any(dim="date_time") invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] if invalid_storms_idx.size > 0: invalid_sids = list(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) LOGGER.warning('%d storm events are discarded because no valid wind/pressure values ' 'have been found: %s%s', len(invalid_sids), ", ".join(invalid_sids[:5]), ", ..." if len(invalid_sids) > 5 else ".") ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) max_wind = ibtracs_ds.wind.max(dim="date_time").data.ravel() category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) category = np.argmax(category_test, axis=1) - 1 basin_map = {b.encode("utf-8"): v for b, v in BASIN_ENV_PRESSURE.items()} basin_fun = lambda b: basin_map[b] ibtracs_ds['id_no'] = (ibtracs_ds.sid.str.replace(b'N', b'0') .str.replace(b'S', b'1') .astype(float)) provider_str = f"ibtracs_{provider[0]}" + ("" if len(provider) == 1 else "_mixed") last_perc = 0 all_tracks = [] for i_track, t_msk in enumerate(ibtracs_ds.valid_t.data): perc = 100 * len(all_tracks) / ibtracs_ds.sid.size if perc - last_perc >= 10: LOGGER.info("Progress: %d%%", perc) last_perc = perc track_ds = ibtracs_ds.sel(storm=i_track, date_time=t_msk) st_penv = xr.apply_ufunc(basin_fun, track_ds.basin, vectorize=True) # a track that crosses the antimeridian in IBTrACS might be truncated by `t_msk` in # such a way that the remaining part is not crossing the antimeridian: if (track_ds.lon.values > 180).all(): track_ds['lon'] -= 360 # set time_step in hours track_ds['time_step'] = xr.ones_like(track_ds.time, dtype=float) if track_ds.time.size > 1: track_ds.time_step.values[1:] = (track_ds.time.diff(dim="date_time") / np.timedelta64(1, 'h')) track_ds.time_step.values[0] = track_ds.time_step[1] with warnings.catch_warnings(): # See https://github.com/pydata/xarray/issues/4167 warnings.simplefilter(action="ignore", category=FutureWarning) track_ds['rmw'] = track_ds.rmw \ .ffill(dim='date_time', limit=1) \ .bfill(dim='date_time', limit=1) \ .fillna(0) track_ds['roci'] = track_ds.roci \ .ffill(dim='date_time', limit=1) \ .bfill(dim='date_time', limit=1) \ .fillna(0) track_ds['poci'] = track_ds.poci \ .ffill(dim='date_time', limit=4) \ .bfill(dim='date_time', limit=4) # this is the most time consuming line in the processing: track_ds['poci'] = track_ds.poci.fillna(st_penv) if estimate_missing: track_ds['rmw'][:] = estimate_rmw(track_ds.rmw.values, track_ds.pres.values) track_ds['roci'][:] = estimate_roci(track_ds.roci.values, track_ds.pres.values) track_ds['roci'][:] = np.fmax(track_ds.rmw.values, track_ds.roci.values) # ensure environmental pressure >= central pressure # this is the second most time consuming line in the processing: track_ds['poci'][:] = np.fmax(track_ds.poci, track_ds.pres) all_tracks.append(xr.Dataset({ 'time_step': ('time', track_ds.time_step.data), 'radius_max_wind': ('time', track_ds.rmw.data), 'radius_oci': ('time', track_ds.roci.data), 'max_sustained_wind': ('time', track_ds.wind.data), 'central_pressure': ('time', track_ds.pres.data), 'environmental_pressure': ('time', track_ds.poci.data), }, coords={ 'time': track_ds.time.dt.round('s').data, 'lat': ('time', track_ds.lat.data), 'lon': ('time', track_ds.lon.data), }, attrs={ 'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'name': track_ds.name.astype(str).item(), 'sid': track_ds.sid.astype(str).item(), 'orig_event_flag': True, 'data_provider': provider_str, 'basin': track_ds.basin.values[0].astype(str).item(), 'id_no': track_ds.id_no.item(), 'category': category[i_track], })) if last_perc != 100: LOGGER.info("Progress: 100%") self.data = all_tracks
[docs] def read_processed_ibtracs_csv(self, file_names): """Fill from processed ibtracs csv file(s). Parameters ---------- file_names : str or list of str Absolute file name(s) or folder name containing the files to read. """ self.data = list() all_file = get_file_names(file_names) for file in all_file: self._read_ibtracs_csv_single(file)
[docs] def read_simulations_emanuel(self, file_names, hemisphere='S'): """Fill from Kerry Emanuel tracks. Parameters ---------- file_names : str or list of str Absolute file name(s) or folder name containing the files to read. hemisphere : str, optional 'S', 'N' or 'both'. Default: 'S' """ self.data = [] for path in get_file_names(file_names): rmw_corr = Path(path).name in EMANUEL_RMW_CORR_FILES self._read_file_emanuel(path, hemisphere=hemisphere, rmw_corr=rmw_corr)
def _read_file_emanuel(self, path, hemisphere='S', rmw_corr=False): """Append tracks from file containing Kerry Emanuel simulations. Parameters ---------- path : str absolute path of file to read. hemisphere : str, optional 'S', 'N' or 'both'. Default: 'S' rmw_corr : str, optional If True, multiply the radius of maximum wind by factor 2. Default: False. """ if hemisphere == 'S': hem_min, hem_max = -90, 0 elif hemisphere == 'N': hem_min, hem_max = 0, 90 else: hem_min, hem_max = -90, 90 LOGGER.info('Reading %s.', path) data_mat = matlab.loadmat(path) lat = data_mat['latstore'] ntracks, nnodes = lat.shape years_uniq = np.unique(data_mat['yearstore']) LOGGER.info("File contains %s tracks (at most %s nodes each), " "representing %s years (%s-%s).", ntracks, nnodes, years_uniq.size, years_uniq[0], years_uniq[-1]) # filter according to chosen hemisphere hem_mask = (lat >= hem_min) & (lat <= hem_max) | (lat == 0) hem_idx = np.all(hem_mask, axis=1).nonzero()[0] data_hem = lambda keys: [data_mat[f'{k}store'][hem_idx] for k in keys] lat, lon = data_hem(['lat', 'long']) months, days, hours = data_hem(['month', 'day', 'hour']) months, days, hours = [np.int8(ar) for ar in [months, days, hours]] tc_rmw, tc_maxwind, tc_pressure = data_hem(['rm', 'v', 'p']) years = data_mat['yearstore'][0, hem_idx] ntracks, nnodes = lat.shape LOGGER.info("Loading %s tracks on %s hemisphere.", ntracks, hemisphere) # change lon format to -180 to 180 lon[lon > 180] = lon[lon > 180] - 360 # change units from kilometers to nautical miles tc_rmw = (tc_rmw * ureg.kilometer).to(ureg.nautical_mile).magnitude if rmw_corr: LOGGER.info("Applying RMW correction.") tc_rmw *= EMANUEL_RMW_CORR_FACTOR for i_track in range(lat.shape[0]): valid_idx = (lat[i_track, :] != 0).nonzero()[0] nnodes = valid_idx.size time_step = np.abs(np.diff(hours[i_track, valid_idx])).min() # deal with change of year year = np.full(valid_idx.size, years[i_track]) year_change = (np.diff(months[i_track, valid_idx]) < 0) year_change = year_change.nonzero()[0] if year_change.size > 0: year[year_change[0] + 1:] += 1 try: datetimes = map(dt.datetime, year, months[i_track, valid_idx], days[i_track, valid_idx], hours[i_track, valid_idx]) datetimes = list(datetimes) except ValueError as err: # dates are known to contain invalid February 30 date_feb = (months[i_track, valid_idx] == 2) \ & (days[i_track, valid_idx] > 28) if np.count_nonzero(date_feb) == 0: # unknown invalid date issue raise err step = time_step if not date_feb[0] else -time_step reference_idx = 0 if not date_feb[0] else -1 reference_date = dt.datetime( year[reference_idx], months[i_track, valid_idx[reference_idx]], days[i_track, valid_idx[reference_idx]], hours[i_track, valid_idx[reference_idx]],) datetimes = [reference_date + dt.timedelta(hours=int(step * i)) for i in range(nnodes)] datetimes = [cftime.DatetimeProlepticGregorian(d.year, d.month, d.day, d.hour) for d in datetimes] max_sustained_wind = tc_maxwind[i_track, valid_idx] max_sustained_wind_unit = 'kn' env_pressure = np.full(nnodes, DEF_ENV_PRESSURE) category = set_category(max_sustained_wind, max_sustained_wind_unit, SAFFIR_SIM_CAT) tr_ds = xr.Dataset({ 'time_step': ('time', np.full(nnodes, time_step)), 'radius_max_wind': ('time', tc_rmw[i_track, valid_idx]), 'max_sustained_wind': ('time', max_sustained_wind), 'central_pressure': ('time', tc_pressure[i_track, valid_idx]), 'environmental_pressure': ('time', env_pressure), }, coords={ 'time': datetimes, 'lat': ('time', lat[i_track, valid_idx]), 'lon': ('time', lon[i_track, valid_idx]), }, attrs={ 'max_sustained_wind_unit': max_sustained_wind_unit, 'central_pressure_unit': 'mb', 'name': str(hem_idx[i_track]), 'sid': str(hem_idx[i_track]), 'orig_event_flag': True, 'data_provider': 'Emanuel', 'basin': hemisphere, 'id_no': hem_idx[i_track], 'category': category, }) self.data.append(tr_ds)
[docs] def read_one_gettelman(self, nc_data, i_track): """Fill from Andrew Gettelman tracks. Parameters ---------- nc_data : str netCDF4.Dataset Objekt i_tracks : int track number """ scale_to_10m = (10. / 60.)**.11 mps2kts = 1.94384 basin_dict = {0: 'NA - North Atlantic', 1: 'SA - South Atlantic', 2: 'WP - West Pacific', 3: 'EP - East Pacific', 4: 'SP - South Pacific', 5: 'NI - North Indian', 6: 'SI - South Indian', 7: 'AS - Arabian Sea', 8: 'BB - Bay of Bengal', 9: 'EA - Eastern Australia', 10: 'WA - Western Australia', 11: 'CP - Central Pacific', 12: 'CS - Carribbean Sea', 13: 'GM - Gulf of Mexico', 14: 'MM - Missing'} val_len = nc_data.variables['numObs'][i_track] sid = str(i_track) times = nc_data.variables['source_time'][i_track, :][:val_len] datetimes = list() for time in times: try: datetimes.append( dt.datetime.strptime( str(nc.num2date(time, 'days since {}'.format('1858-11-17'), calendar='standard')), '%Y-%m-%d %H:%M:%S')) except ValueError: # If wrong t, set t to previous t plus 3 hours if datetimes: datetimes.append(datetimes[-1] + dt.timedelta(hours=3)) else: pos = list(times).index(time) time = times[pos + 1] - 1 / 24 * 3 datetimes.append( dt.datetime.strptime( str(nc.num2date(time, 'days since {}'.format('1858-11-17'), calendar='standard')), '%Y-%m-%d %H:%M:%S')) time_step = [] for i_time, time in enumerate(datetimes[1:], 1): time_step.append((time - datetimes[i_time - 1]).total_seconds() / 3600) time_step.append(time_step[-1]) basins = list() for basin in nc_data.variables['basin'][i_track, :][:val_len]: try: basins.extend([basin_dict[basin]]) except KeyError: basins.extend([np.nan]) lon = nc_data.variables['lon'][i_track, :][:val_len] lon[lon > 180] = lon[lon > 180] - 360 # change lon format to -180 to 180 lat = nc_data.variables['lat'][i_track, :][:val_len] cen_pres = nc_data.variables['pres'][i_track, :][:val_len] av_prec = nc_data.variables['precavg'][i_track, :][:val_len] max_prec = nc_data.variables['precmax'][i_track, :][:val_len] # m/s to kn wind = nc_data.variables['wind'][i_track, :][:val_len] * mps2kts * scale_to_10m if not all(wind.data): # if wind is empty wind = np.ones(wind.size) * -999.9 tr_df = pd.DataFrame({'time': datetimes, 'lat': lat, 'lon': lon, 'max_sustained_wind': wind, 'central_pressure': cen_pres, 'environmental_pressure': np.ones(lat.size) * 1015., 'radius_max_wind': np.ones(lat.size) * 65., 'maximum_precipitation': max_prec, 'average_precipitation': av_prec, 'basins': basins, 'time_step': time_step}) # construct xarray tr_ds = xr.Dataset.from_dataframe(tr_df.set_index('time')) tr_ds.coords['lat'] = ('time', tr_ds.lat) tr_ds.coords['lon'] = ('time', tr_ds.lon) tr_ds.attrs = {'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'sid': sid, 'name': sid, 'orig_event_flag': False, 'basin': basins[0], 'id_no': i_track, 'category': set_category(wind, 'kn')} self.data.append(tr_ds)
[docs] def read_simulations_chaz(self, file_names, year_range=None, ensemble_nums=None): """Read track output from CHAZ simulations Lee, C.-Y., Tippett, M.K., Sobel, A.H., Camargo, S.J. (2018): An Environmentally Forced Tropical Cyclone Hazard Model. J Adv Model Earth Sy 10(1): 223–241. Parameters ---------- file_names : str or list of str Absolute file name(s) or folder name containing the files to read. year_range : tuple (min_year, max_year), optional Filter by year, if given. ensemble_nums : list, optional Filter by ensembleNum, if given. """ self.data = [] for path in get_file_names(file_names): LOGGER.info('Reading %s.', path) chaz_ds = xr.open_dataset(path) chaz_ds.time.attrs["units"] = "days since 1950-1-1" chaz_ds.time.attrs["missing_value"] = -54786.0 chaz_ds = xr.decode_cf(chaz_ds) chaz_ds['id_no'] = chaz_ds.stormID * 1000 + chaz_ds.ensembleNum for var in ['time', 'longitude', 'latitude']: chaz_ds[var] = chaz_ds[var].expand_dims(ensembleNum=chaz_ds.ensembleNum) chaz_ds = chaz_ds.stack(id=("ensembleNum", "stormID")) years_uniq = chaz_ds.time.dt.year.data years_uniq = np.unique(years_uniq[~np.isnan(years_uniq)]) LOGGER.info("File contains %s tracks (at most %s nodes each), " "representing %s years (%d-%d).", chaz_ds.id_no.size, chaz_ds.lifelength.size, years_uniq.size, years_uniq[0], years_uniq[-1]) # filter by year range if given if year_range: match = ((chaz_ds.time.dt.year >= year_range[0]) & (chaz_ds.time.dt.year <= year_range[1])).sel(lifelength=0) if np.count_nonzero(match) == 0: LOGGER.info('No tracks in time range (%s, %s).', *year_range) self.data = [] continue chaz_ds = chaz_ds.sel(id=match) # filter by ensembleNum if given if ensemble_nums is not None: match = np.isin(chaz_ds.ensembleNum.values, ensemble_nums) if np.count_nonzero(match) == 0: LOGGER.info('No tracks with specified ensemble numbers.') self.data = [] continue chaz_ds = chaz_ds.sel(id=match) # remove invalid tracks from selection chaz_ds['valid_t'] = chaz_ds.time.notnull() & chaz_ds.Mwspd.notnull() valid_st = chaz_ds.valid_t.any(dim="lifelength") invalid_st = np.nonzero(~valid_st.data)[0] if invalid_st.size > 0: LOGGER.info('No valid Mwspd values found for %d out of %d storm tracks.', invalid_st.size, valid_st.size) chaz_ds = chaz_ds.sel(id=valid_st) # estimate central pressure from location and max wind chaz_ds['pres'] = xr.full_like(chaz_ds.Mwspd, -1, dtype=float) chaz_ds['pres'][:] = _estimate_pressure( chaz_ds.pres, chaz_ds.latitude, chaz_ds.longitude, chaz_ds.Mwspd) # compute time stepsizes chaz_ds['time_step'] = xr.zeros_like(chaz_ds.time, dtype=float) chaz_ds['time_step'][1:, :] = (chaz_ds.time.diff(dim="lifelength") / np.timedelta64(1, 'h')) chaz_ds['time_step'][0, :] = chaz_ds.time_step[1, :] # determine Saffir-Simpson category max_wind = chaz_ds.Mwspd.max(dim="lifelength").data.ravel() category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) chaz_ds['category'] = ("id", np.argmax(category_test, axis=1) - 1) fname = Path(path).name chaz_ds.time[:] = chaz_ds.time.dt.round('s').data chaz_ds['radius_max_wind'] = xr.full_like(chaz_ds.pres, np.nan) chaz_ds['environmental_pressure'] = xr.full_like(chaz_ds.pres, DEF_ENV_PRESSURE) chaz_ds["track_name"] = ("id", [f"{fname}-{track_id.item()[1]}-{track_id.item()[0]}" for track_id in chaz_ds.id]) # add tracks one by one last_perc = 0 for i_track in chaz_ds.id_no: perc = 100 * len(self.data) / chaz_ds.id_no.size if perc - last_perc >= 10: LOGGER.info("Progress: %d%%", perc) last_perc = perc track_ds = chaz_ds.sel(id=i_track.id.item()) track_ds = track_ds.sel(lifelength=track_ds.valid_t.data) self.data.append(xr.Dataset({ 'time_step': ('time', track_ds.time_step), 'max_sustained_wind': ('time', track_ds.Mwspd.data), 'central_pressure': ('time', track_ds.pres.data), 'radius_max_wind': ('time', track_ds.radius_max_wind.data), 'environmental_pressure': ('time', track_ds.environmental_pressure.data), }, coords={ 'time': track_ds.time.data, 'lat': ('time', track_ds.latitude.data), 'lon': ('time', track_ds.longitude.data), }, attrs={ 'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'name': track_ds.track_name.item(), 'sid': track_ds.track_name.item(), 'orig_event_flag': True, 'data_provider': "CHAZ", 'basin': "global", 'id_no': track_ds.id_no.item(), 'category': track_ds.category.item(), })) if last_perc != 100: LOGGER.info("Progress: 100%")
[docs] def read_simulations_storm(self, path, years=None): """Read track output from STORM simulations Bloemendaal et al. (2020): Generation of a global synthetic tropical cyclone hazard dataset using STORM. Scientific Data 7(1): 40. Track data available for download from https://doi.org/10.4121/uuid:82c1dc0d-5485-43d8-901a-ce7f26cda35d Parameters ---------- path : str Full path to a txt-file as contained in the `data.zip` archive from the official source linked above. years : list of int, optional If given, only read the specified "years" from the txt-File. Note that a "year" refers to one ensemble of tracks in the data set that represents one sample year. """ self.data = [] basins = ["EP", "NA", "NI", "SI", "SP", "WP"] tracks_df = pd.read_csv(path, names=['year', 'time_start', 'tc_num', 'time_delta', 'basin', 'lat', 'lon', 'pres', 'wind', 'rmw', 'category', 'landfall', 'dist_to_land'], converters={ "time_start": lambda d: dt.datetime(1980, int(float(d)), 1, 0), "time_delta": lambda d: dt.timedelta(hours=3 * float(d)), "basin": lambda d: basins[int(float(d))], }, dtype={ "year": int, "tc_num": int, "category": int, }) # filter specified years if years is not None: tracks_df = tracks_df[np.isin(tracks_df['year'], years)] # a bug in the data causes some storm tracks to be double-listed: tracks_df = tracks_df.drop_duplicates(subset=["year", "tc_num", "time_delta"]) # conversion of units and time tracks_df['rmw'] *= (1 * ureg.kilometer).to(ureg.nautical_mile).magnitude tracks_df['wind'] *= (1 * ureg.meter / ureg.second).to(ureg.knot).magnitude tracks_df['time'] = tracks_df['time_start'] + tracks_df['time_delta'] tracks_df = tracks_df.drop( labels=['time_start', 'time_delta', 'landfall', 'dist_to_land'], axis=1) # add tracks one by one last_perc = 0 fname = Path(path).name groups = tracks_df.groupby(by=["year", "tc_num"]) for idx, group in groups: perc = 100 * len(self.data) / len(groups) if perc - last_perc >= 10: LOGGER.info("Progress: %d%%", perc) last_perc = perc track_name = f"{fname}-{idx[0]}-{idx[1]}" basin = group['basin'].values[0] env_pressure = DEF_ENV_PRESSURE if basin in BASIN_ENV_PRESSURE: env_pressure = BASIN_ENV_PRESSURE[basin] env_pressure = np.full_like(group['pres'].values, env_pressure) self.data.append(xr.Dataset({ 'time_step': ('time', np.full(group['time'].shape, 3)), 'max_sustained_wind': ('time', group['wind'].values), 'central_pressure': ('time', group['pres'].values), 'radius_max_wind': ('time', group['rmw'].values), 'environmental_pressure': ('time', env_pressure), }, coords={ 'time': ('time', group['time'].values), 'lat': ('time', group['lat'].values), 'lon': ('time', group['lon'].values), }, attrs={ 'max_sustained_wind_unit': 'kn', 'central_pressure_unit': 'mb', 'name': track_name, 'sid': track_name, 'orig_event_flag': True, 'data_provider': "STORM", 'basin': basin, 'id_no': idx[0] * 1000 + idx[1], 'category': group['category'].max(), })) if last_perc != 100: LOGGER.info("Progress: 100%")
[docs] def equal_timestep(self, time_step_h=1, land_params=False): """Generate interpolated track values to time steps of time_step_h. Parameters ---------- time_step_h : float or int, optional Temporal resolution in hours (positive, may be non-integer-valued). Default: 1. land_params : bool, optional If True, recompute `on_land` and `dist_since_lf` at each node. Default: False. """ if time_step_h <= 0: raise ValueError(f"time_step_h is not a positive number: {time_step_h}") LOGGER.info('Interpolating %s tracks to %sh time steps.', self.size, time_step_h) if land_params: extent = self.get_extent() land_geom = u_coord.get_land_geometry(extent, resolution=10) else: land_geom = None if self.pool: chunksize = min(self.size // self.pool.ncpus, 1000) self.data = self.pool.map(self._one_interp_data, self.data, itertools.repeat(time_step_h, self.size), itertools.repeat(land_geom, self.size), chunksize=chunksize) else: last_perc = 0 new_data = [] for track in self.data: # progress indicator perc = 100 * len(new_data) / len(self.data) if perc - last_perc >= 10: LOGGER.debug("Progress: %d%%", perc) last_perc = perc new_data.append( self._one_interp_data(track, time_step_h, land_geom)) self.data = new_data
[docs] def calc_random_walk(self, **kwargs): """Deprecated. Use `TCTracks.calc_perturbed_trajectories` instead.""" LOGGER.warning("The use of TCTracks.calc_random_walk is deprecated." "Use TCTracks.calc_perturbed_trajectories instead.") if kwargs.get('ens_size'): kwargs['nb_synth_tracks'] = kwargs.pop('ens_size') return self.calc_perturbed_trajectories(**kwargs)
[docs] def calc_perturbed_trajectories(self, **kwargs): """See function in `climada.hazard.tc_tracks_synth`.""" climada.hazard.tc_tracks_synth.calc_perturbed_trajectories(self, **kwargs)
@property def size(self): """Get longitude from coord array.""" return len(self.data)
[docs] def get_bounds(self, deg_buffer=0.1): """Get bounds as (lon_min, lat_min, lon_max, lat_max) tuple. Parameters ---------- deg_buffer : float A buffer to add around the bounding box Returns ------- bounds : tuple (lon_min, lat_min, lon_max, lat_max) """ bounds = u_coord.latlon_bounds( np.concatenate([t.lat.values for t in self.data]), np.concatenate([t.lon.values for t in self.data]), buffer=deg_buffer) return bounds
@property def bounds(self): """Exact bounds of trackset as tuple, no buffer.""" return self.get_bounds(deg_buffer=0.0)
[docs] def get_extent(self, deg_buffer=0.1): """Get extent as (lon_min, lon_max, lat_min, lat_max) tuple. Parameters ---------- deg_buffer : float A buffer to add around the bounding box Returns ------- extent : tuple (lon_min, lon_max, lat_min, lat_max) """ bounds = self.get_bounds(deg_buffer=deg_buffer) return (bounds[0], bounds[2], bounds[1], bounds[3])
@property def extent(self): """Exact extent of trackset as tuple, no buffer.""" return self.get_extent(deg_buffer=0.0)
[docs] def plot(self, axis=None, figsize=(9, 13), legend=True, **kwargs): """Track over earth. Historical events are blue, probabilistic black. Parameters ---------- axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use figsize: (float, float), optional figure size for plt.subplots The default is (9, 13) legend : bool, optional whether to display a legend of Tropical Cyclone categories. Default: True. kwargs : optional arguments for LineCollection matplotlib, e.g. alpha=0.5 Returns ------- axis : matplotlib.axes._subplots.AxesSubplot """ if 'lw' not in kwargs: kwargs['lw'] = 2 if 'transform' not in kwargs: kwargs['transform'] = ccrs.PlateCarree() if not self.size: LOGGER.info('No tracks to plot') return None extent = self.get_extent(deg_buffer=1) mid_lon = 0.5 * (extent[1] + extent[0]) if not axis: proj = ccrs.PlateCarree(central_longitude=mid_lon) _, axis = u_plot.make_map(proj=proj, figsize=figsize) axis.set_extent(extent, crs=kwargs['transform']) u_plot.add_shapes(axis) synth_flag = False cmap = ListedColormap(colors=CAT_COLORS) norm = BoundaryNorm([0] + SAFFIR_SIM_CAT, len(SAFFIR_SIM_CAT)) for track in self.data: lonlat = np.stack([track.lon.values, track.lat.values], axis=-1) lonlat[:, 0] = u_coord.lon_normalize(lonlat[:, 0], center=mid_lon) segments = np.stack([lonlat[:-1], lonlat[1:]], axis=1) # remove segments which cross 180 degree longitude boundary segments = segments[segments[:, 0, 0] * segments[:, 1, 0] >= 0, :, :] if track.orig_event_flag: track_lc = LineCollection(segments, cmap=cmap, norm=norm, linestyle='solid', **kwargs) else: synth_flag = True track_lc = LineCollection(segments, cmap=cmap, norm=norm, linestyle=':', **kwargs) track_lc.set_array(track.max_sustained_wind.values) axis.add_collection(track_lc) if legend: leg_lines = [Line2D([0], [0], color=CAT_COLORS[i_col], lw=2) for i_col in range(len(SAFFIR_SIM_CAT))] leg_names = [CAT_NAMES[i_col] for i_col in sorted(CAT_NAMES.keys())] if synth_flag: leg_lines.append(Line2D([0], [0], color='grey', lw=2, ls='solid')) leg_lines.append(Line2D([0], [0], color='grey', lw=2, ls=':')) leg_names.append('Historical') leg_names.append('Synthetic') axis.legend(leg_lines, leg_names, loc=0) return axis
[docs] def write_netcdf(self, folder_name): """Write a netcdf file per track with track.sid name in given folder. Parameters ---------- folder_name : str Folder name where to write files. """ list_path = [Path(folder_name, track.sid + '.nc') for track in self.data] LOGGER.info('Writting %s files.', self.size) for track in self.data: track.attrs['orig_event_flag'] = int(track.orig_event_flag) xr.save_mfdataset(self.data, list_path)
[docs] def read_netcdf(self, folder_name): """Read all netcdf files contained in folder and fill a track per file. Parameters ---------- folder_name : str Folder name where to write files. """ file_tr = get_file_names(folder_name) LOGGER.info('Reading %s files.', len(file_tr)) self.data = list() for file in file_tr: if Path(file).suffix != '.nc': continue track = xr.open_dataset(file) track.attrs['orig_event_flag'] = bool(track.orig_event_flag) self.data.append(track)
[docs] def to_geodataframe(self, as_points=False, split_lines_antimeridian=True): """Transform this TCTracks instance into a GeoDataFrame. Parameters ---------- as_points : bool, optional If False (default), one feature (row) per track with a LineString or MultiLineString as geometry (or Point geometry for tracks of length one) and all track attributes (sid, name, orig_event_flag, etc) as dataframe columns. If True, one feature (row) per track time step, with variable values per time step (radius_max_wind, max_sustained_wind, etc) as columns in addition to attributes. split_lines_antimeridian : bool, optional If True, tracks that cross the antimeridian are split into multiple Lines as a MultiLineString, with each Line on either side of the meridian. This ensures all Lines are within (-180, +180) degrees longitude. Note that lines might be split at more locations than strictly necessary, due to the underlying splitting algorithm (https://github.com/Toblerity/Shapely/issues/572). Returns ------- gdf : GeoDataFrame """ gdf = gpd.GeoDataFrame( [dict(track.attrs) for track in self.data] ) if as_points: gdf_long = pd.concat([track.to_dataframe().assign(idx=i) for i, track in enumerate(self.data)]) gdf_long['lon'] = u_coord.lon_normalize(gdf_long['lon'].values.copy()) gdf_long['geometry'] = gdf_long.apply(lambda x: Point(x['lon'], x['lat']), axis=1) gdf_long = gdf_long.drop(columns=['lon', 'lat']) gdf_long = gpd.GeoDataFrame(gdf_long.reset_index().set_index('idx'), geometry='geometry', crs=DEF_CRS) gdf = gdf_long.join(gdf) elif split_lines_antimeridian: # enforce longitudes to be within [-180, 180] range t_lons = [u_coord.lon_normalize(t.lon.values.copy()) for t in self.data] t_lats = [t.lat.values for t in self.data] # LineString only works with more than one lat/lon pair gdf.geometry = gpd.GeoSeries([ LineString(np.c_[lons, lats]) if lons.size > 1 else Point(lons, lats) for lons, lats in zip(t_lons, t_lats) ]) gdf.crs = DEF_CRS # for splitting, restrict to tracks that come close to the antimeridian t_split_mask = np.asarray([ (lon > 170).any() and (lon < -170).any() and lon.size > 1 for lon in t_lons]) # note that tracks might be splitted at self-intersections as well: # https://github.com/Toblerity/Shapely/issues/572 antimeridian = LineString([(180, -90), (180, 90)]) gdf.loc[t_split_mask, "geometry"] = gdf.geometry[t_split_mask] \ .to_crs({"proj": "longlat", "lon_wrap": 180}) \ .apply(lambda line: MultiLineString([ LineString([(x - 360, y) for x, y in segment.coords]) if any(x > 180 for x, y in segment.coords) else segment for segment in shapely.ops.split(line, antimeridian).geoms ])) else: # LineString only works with more than one lat/lon pair gdf.geometry = gpd.GeoSeries([ LineString(np.c_[track.lon, track.lat]) if track.lon.size > 1 else Point(track.lon.data, track.lat.data) for track in self.data ]) gdf.crs = DEF_CRS return gdf
@staticmethod @jit(parallel=True, forceobj=True) def _one_interp_data(track, time_step_h, land_geom=None): """Interpolate values of one track. Parameters ---------- track : xr.Dataset Track data. time_step_h : int or float Desired temporal resolution in hours (may be non-integer-valued). land_geom : shapely.geometry.multipolygon.MultiPolygon, optional Land geometry. If given, recompute `dist_since_lf` and `on_land` property. Returns ------- track_int : xr.Dataset """ if track.time.size >= 2: method = ['linear', 'quadratic', 'cubic'][min(2, track.time.size - 2)] # handle change of sign in longitude lon = track.lon.copy() if (lon < -170).any() and (lon > 170).any(): # crosses 180 degrees east/west -> use positive degrees east lon[lon < 0] += 360 time_step = pd.tseries.frequencies.to_offset(pd.Timedelta(hours=time_step_h)).freqstr track_int = track.resample(time=time_step, keep_attrs=True, skipna=True)\ .interpolate('linear') track_int['time_step'][:] = time_step_h lon_int = lon.resample(time=time_step).interpolate(method) lon_int[lon_int > 180] -= 360 track_int.coords['lon'] = lon_int track_int.coords['lat'] = track.lat.resample(time=time_step)\ .interpolate(method) track_int.attrs['category'] = set_category( track_int.max_sustained_wind.values, track_int.max_sustained_wind_unit) # restrict to time steps within original bounds track_int = track_int.sel( time=(track.time[0] <= track_int.time) & (track_int.time <= track.time[-1])) else: LOGGER.warning('Track interpolation not done. ' 'Not enough elements for %s', track.name) track_int = track if land_geom: track_land_params(track_int, land_geom) return track_int def _read_ibtracs_csv_single(self, file_name): """Read IBTrACS track file in CSV format. Parameters ---------- file_name : str File name of CSV file. """ LOGGER.info('Reading %s', file_name) dfr = pd.read_csv(file_name) name = dfr['ibtracsID'].values[0] datetimes = list() for time in dfr['isotime'].values: year = np.fix(time / 1e6) time = time - year * 1e6 month = np.fix(time / 1e4) time = time - month * 1e4 day = np.fix(time / 1e2) hour = time - day * 1e2 datetimes.append(dt.datetime(int(year), int(month), int(day), int(hour))) lat = dfr['cgps_lat'].values.astype('float') lon = dfr['cgps_lon'].values.astype('float') cen_pres = dfr['pcen'].values.astype('float') max_sus_wind = dfr['vmax'].values.astype('float') max_sus_wind_unit = 'kn' if np.any(cen_pres <= 0): # Warning: If any pressure value is invalid, this enforces to use # estimated pressure values everywhere! cen_pres[:] = -999 cen_pres = _estimate_pressure(cen_pres, lat, lon, max_sus_wind) tr_ds = xr.Dataset() tr_ds.coords['time'] = ('time', datetimes) tr_ds.coords['lat'] = ('time', lat) tr_ds.coords['lon'] = ('time', lon) tr_ds['time_step'] = ('time', dfr['tint'].values) tr_ds['radius_max_wind'] = ('time', dfr['rmax'].values.astype('float')) tr_ds['max_sustained_wind'] = ('time', max_sus_wind) tr_ds['central_pressure'] = ('time', cen_pres) tr_ds['environmental_pressure'] = ('time', dfr['penv'].values.astype('float')) tr_ds.attrs['max_sustained_wind_unit'] = max_sus_wind_unit tr_ds.attrs['central_pressure_unit'] = 'mb' tr_ds.attrs['name'] = name tr_ds.attrs['sid'] = name tr_ds.attrs['orig_event_flag'] = bool(dfr['original_data']. values[0]) tr_ds.attrs['data_provider'] = dfr['data_provider'].values[0] tr_ds.attrs['basin'] = dfr['gen_basin'].values[0] try: tr_ds.attrs['id_no'] = float(name.replace('N', '0'). replace('S', '1')) except ValueError: tr_ds.attrs['id_no'] = float(str(datetimes[0].date()). replace('-', '')) tr_ds.attrs['category'] = set_category(max_sus_wind, max_sus_wind_unit) self.data.append(tr_ds)
def track_land_params(track, land_geom): """Compute parameters of land for one track. Parameters ---------- track : xr.Dataset tropical cyclone track land_geom : shapely.geometry.multipolygon.MultiPolygon land geometry """ track['on_land'] = ('time', u_coord.coord_on_land(track.lat.values, track.lon.values, land_geom)) track['dist_since_lf'] = ('time', _dist_since_lf(track)) def _dist_since_lf(track): """Compute the distance to landfall in km point for every point on land. Parameters ---------- track : xr.Dataset Single tropical cyclone track. Returns ------- dist : np.arrray Distances in km, points on water get nan values. """ dist_since_lf = np.zeros(track.time.values.shape) # Index in sea that follows a land index sea_land_idx = np.where(np.diff(track.on_land.astype(int)) == 1)[0] if not sea_land_idx.size: return (dist_since_lf + 1) * np.nan # Index in sea that comes from previous land index land_sea_idx = np.where(np.diff(track.on_land.astype(int)) == -1)[0] + 1 if track.on_land[-1]: land_sea_idx = np.append(land_sea_idx, track.time.size) orig_lf = np.empty((sea_land_idx.size, 2)) for i_lf, lf_point in enumerate(sea_land_idx): orig_lf[i_lf][0] = track.lat[lf_point] + \ (track.lat[lf_point + 1] - track.lat[lf_point]) / 2 orig_lf[i_lf][1] = track.lon[lf_point] + \ (track.lon[lf_point + 1] - track.lon[lf_point]) / 2 dist = DistanceMetric.get_metric('haversine') nodes1 = np.radians(np.array([track.lat.values[1:], track.lon.values[1:]]).transpose()) nodes0 = np.radians(np.array([track.lat.values[:-1], track.lon.values[:-1]]).transpose()) dist_since_lf[1:] = dist.pairwise(nodes1, nodes0).diagonal() dist_since_lf[~track.on_land.values] = 0.0 nodes1 = np.array([track.lat.values[sea_land_idx + 1], track.lon.values[sea_land_idx + 1]]).transpose() / 180 * np.pi dist_since_lf[sea_land_idx + 1] = \ dist.pairwise(nodes1, orig_lf / 180 * np.pi).diagonal() for sea_land, land_sea in zip(sea_land_idx, land_sea_idx): dist_since_lf[sea_land + 1:land_sea] = \ np.cumsum(dist_since_lf[sea_land + 1:land_sea]) dist_since_lf *= EARTH_RADIUS_KM dist_since_lf[~track.on_land.values] = np.nan return dist_since_lf def _estimate_pressure(cen_pres, lat, lon, v_max): """Replace missing pressure values with statistical estimate. In addition to NaNs, negative values and zeros in `cen_pres` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('pres', ['lat', 'lon', 'wind'], year_range=(1980, 2020)) >>> r^2: 0.8726728075520206 Parameters ---------- cen_pres : array-like Central pressure values along track in hPa (mbar). lat : array-like Latitudinal coordinates of eye location. lon : array-like Longitudinal coordinates of eye location. v_max : array-like Maximum wind speed along track in knots. Returns ------- cen_pres_estimated : np.array Estimated central pressure values in hPa (mbar). """ cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) v_max = np.where(np.isnan(v_max), -1, v_max) lat, lon = [np.where(np.isnan(ar), -999, ar) for ar in [lat, lon]] msk = (cen_pres <= 0) & (v_max > 0) & (lat > -999) & (lon > -999) c_const, c_lat, c_lon, c_vmax = 1026.3401, -0.05504, -0.03536, -0.7357 cen_pres[msk] = c_const + c_lat * lat[msk] \ + c_lon * lon[msk] \ + c_vmax * v_max[msk] return np.where(cen_pres <= 0, np.nan, cen_pres) def _estimate_vmax(v_max, lat, lon, cen_pres): """Replace missing wind speed values with a statistical estimate. In addition to NaNs, negative values and zeros in `v_max` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('wind', ['lat', 'lon', 'pres'], year_range=(1980, 2020)) >>> r^2: 0.8683725434617979 Parameters ---------- v_max : array-like Maximum wind speed along track in knots. lat : array-like Latitudinal coordinates of eye location. lon : array-like Longitudinal coordinates of eye location. cen_pres : array-like Central pressure values along track in hPa (mbar). Returns ------- v_max_estimated : np.array Estimated maximum wind speed values in knots. """ v_max = np.where(np.isnan(v_max), -1, v_max) cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) lat, lon = [np.where(np.isnan(ar), -999, ar) for ar in [lat, lon]] msk = (v_max <= 0) & (cen_pres > 0) & (lat > -999) & (lon > -999) c_const, c_lat, c_lon, c_pres = 1216.5223, -0.04086, -0.04190, -1.1797 v_max[msk] = c_const + c_lat * lat[msk] \ + c_lon * lon[msk] \ + c_pres * cen_pres[msk] return np.where(v_max <= 0, np.nan, v_max) def estimate_roci(roci, cen_pres): """Replace missing radius (ROCI) values with statistical estimate. In addition to NaNs, negative values and zeros in `roci` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('roci', ['pres'], ... order=[(872, 950, 985, 1005, 1021)], ... year_range=(1980, 2019)) >>> r^2: 0.9148320406675339 Parameters ---------- roci : array-like ROCI values along track in km. cen_pres : array-like Central pressure values along track in hPa (mbar). Returns ------- roci_estimated : np.array Estimated ROCI values in km. """ roci = np.where(np.isnan(roci), -1, roci) cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) msk = (roci <= 0) & (cen_pres > 0) pres_l = [872, 950, 985, 1005, 1021] roci_l = [210.711487, 215.897110, 198.261520, 159.589508, 90.900116] roci[msk] = 0 for i, pres_l_i in enumerate(pres_l): slope_0 = 1. / (pres_l_i - pres_l[i - 1]) if i > 0 else 0 slope_1 = 1. / (pres_l[i + 1] - pres_l_i) if i + 1 < len(pres_l) else 0 roci[msk] += roci_l[i] * np.fmax(0, (1 - slope_0 * np.fmax(0, pres_l_i - cen_pres[msk]) - slope_1 * np.fmax(0, cen_pres[msk] - pres_l_i))) return np.where(roci <= 0, np.nan, roci) def estimate_rmw(rmw, cen_pres): """Replace missing radius (RMW) values with statistical estimate. In addition to NaNs, negative values and zeros in `rmw` are interpreted as missing values. See function `ibtracs_fit_param` for more details about the statistical estimation: >>> ibtracs_fit_param('rmw', ['pres'], order=[(872, 940, 980, 1021)], year_range=(1980, 2019)) >>> r^2: 0.7905970811843872 Parameters ---------- rmw : array-like RMW values along track in km. cen_pres : array-like Central pressure values along track in hPa (mbar). Returns ------- rmw : np.array Estimated RMW values in km. """ rmw = np.where(np.isnan(rmw), -1, rmw) cen_pres = np.where(np.isnan(cen_pres), -1, cen_pres) msk = (rmw <= 0) & (cen_pres > 0) pres_l = [872, 940, 980, 1021] rmw_l = [14.907318, 15.726927, 25.742142, 56.856522] rmw[msk] = 0 for i, pres_l_i in enumerate(pres_l): slope_0 = 1. / (pres_l_i - pres_l[i - 1]) if i > 0 else 0 slope_1 = 1. / (pres_l[i + 1] - pres_l_i) if i + 1 < len(pres_l) else 0 rmw[msk] += rmw_l[i] * np.fmax(0, (1 - slope_0 * np.fmax(0, pres_l_i - cen_pres[msk]) - slope_1 * np.fmax(0, cen_pres[msk] - pres_l_i))) return np.where(rmw <= 0, np.nan, rmw) def ibtracs_fit_param(explained, explanatory, year_range=(1980, 2019), order=1): """Statistically fit an ibtracs parameter to other ibtracs variables. A linear ordinary least squares fit is done using the statsmodels package. Parameters ---------- explained : str Name of explained variable. explanatory : iterable Names of explanatory variables. year_range : tuple First and last year to include in the analysis. order : int or tuple The maximal order of the explanatory variables. Returns ------- result : OLSResults """ wmo_vars = ['wind', 'pres', 'rmw', 'roci', 'poci'] all_vars = ['lat', 'lon'] + wmo_vars explanatory = list(explanatory) variables = explanatory + [explained] for var in variables: if var not in all_vars: LOGGER.error("Unknown ibtracs variable: %s", var) raise KeyError # load ibtracs dataset fn_nc = SYSTEM_DIR.joinpath('IBTrACS.ALL.v04r00.nc') ibtracs_ds = xr.open_dataset(fn_nc) # choose specified year range years = ibtracs_ds.sid.str.slice(0, 4).astype(int) match = (years >= year_range[0]) & (years <= year_range[1]) ibtracs_ds = ibtracs_ds.sel(storm=match) if "wind" in variables: for agency in IBTRACS_AGENCIES: scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency] ibtracs_ds[f'{agency}_wind'] -= shift ibtracs_ds[f'{agency}_wind'] /= scale # fill values agency_pref, track_agency_ix = ibtracs_track_agency(ibtracs_ds) for var in wmo_vars: if var not in variables: continue # array of values in order of preference cols = [f'{a}_{var}' for a in agency_pref] cols = [col for col in cols if col in ibtracs_ds.data_vars.keys()] all_vals = ibtracs_ds[cols].to_array(dim='agency') preferred_ix = all_vals.notnull().argmax(dim='agency') if var in ['wind', 'pres']: # choice: wmo -> wmo_agency/usa_agency -> preferred ibtracs_ds[var] = ibtracs_ds['wmo_' + var] \ .fillna(all_vals.isel(agency=track_agency_ix)) \ .fillna(all_vals.isel(agency=preferred_ix)) else: ibtracs_ds[var] = all_vals.isel(agency=preferred_ix) fit_df = pd.DataFrame({var: ibtracs_ds[var].values.ravel() for var in variables}) fit_df = fit_df.dropna(axis=0, how='any').reset_index(drop=True) if 'lat' in explanatory: fit_df['lat'] = fit_df['lat'].abs() # prepare explanatory variables d_explanatory = fit_df[explanatory] if isinstance(order, int): order = (order,) * len(explanatory) add_const = False for ex, max_o in zip(explanatory, order): if isinstance(max_o, tuple): if fit_df[ex].min() > max_o[0]: print(f"Minimum data value is {fit_df[ex].min()} > {max_o[0]}.") if fit_df[ex].max() < max_o[-1]: print(f"Maximum data value is {fit_df[ex].max()} < {max_o[-1]}.") # piecewise linear with given break points d_explanatory = d_explanatory.drop(labels=[ex], axis=1) for i, max_o_i in enumerate(max_o): col = f'{ex}{max_o_i}' slope_0 = 1. / (max_o_i - max_o[i - 1]) if i > 0 else 0 slope_1 = 1. / (max_o[i + 1] - max_o_i) if i + 1 < len(max_o) else 0 d_explanatory[col] = np.fmax(0, (1 - slope_0 * np.fmax(0, max_o_i - fit_df[ex]) - slope_1 * np.fmax(0, fit_df[ex] - max_o_i))) elif max_o < 0: d_explanatory = d_explanatory.drop(labels=[ex], axis=1) for order in range(1, abs(max_o) + 1): d_explanatory[f'{ex}^{-order}'] = fit_df[ex]**(-order) add_const = True else: for order in range(2, max_o + 1): d_explanatory[f'{ex}^{order}'] = fit_df[ex]**order add_const = True d_explained = fit_df[[explained]] if add_const: d_explanatory['const'] = 1.0 # run statistical fit sm_results = sm.OLS(d_explained, d_explanatory).fit() # print results print(sm_results.params) print("r^2:", sm_results.rsquared) return sm_results def ibtracs_track_agency(ds_sel): """Get preferred IBTrACS agency for each entry in the dataset. Parameters ---------- ds_sel : xarray.Dataset Subselection of original IBTrACS NetCDF dataset. Returns ------- agency_pref : list of str Names of IBTrACS agencies in order of preference. track_agency_ix : xarray.DataArray of ints For each entry in `ds_sel`, the agency to use, given as an index into `agency_pref`. """ agency_pref = ["wmo"] + IBTRACS_AGENCIES agency_map = {a.encode('utf-8'): i for i, a in enumerate(agency_pref)} agency_map.update({ a.encode('utf-8'): agency_map[b'usa'] for a in IBTRACS_USA_AGENCIES }) agency_map[b''] = agency_map[b'wmo'] agency_fun = lambda x: agency_map[x] if "track_agency" not in ds_sel.data_vars.keys(): ds_sel['track_agency'] = ds_sel.wmo_agency.where(ds_sel.wmo_agency != b'', ds_sel.usa_agency) track_agency_ix = xr.apply_ufunc(agency_fun, ds_sel.track_agency, vectorize=True) return agency_pref, track_agency_ix def ibtracs_add_official_variable(ibtracs_ds, tc_var, add_3h=False): """Add variables for the officially responsible agencies to an IBTrACS dataset This function adds new variables to the xarray.Dataset `ibtracs_ds` that contain values of the specified TC variable `var` that have been reported by the officially responsible agencies. For example, if `tc_var` is "wind", there will be a new variable "official_wind" and, if `add_3h` is True, an additional variable "official_3h_wind". Parameters ---------- ibtracs_ds : xarray.Dataset Subselection of original IBTrACS NetCDF dataset. tc_var : str Name of variable for which to add an "official" version, e.g. "lat", "wind", "pres". add_3h : bool, optional Optionally, add an "official_3h" version where also 3-hourly data by the officially reporting agencies is included (if available). Default: False """ if "nan_var" not in ibtracs_ds.data_vars.keys(): # add an array full of NaN as a fallback value in the procedure ibtracs_ds['nan_var'] = xr.full_like(ibtracs_ds.lat, np.nan) # determine which of the official agencies report this variable at all available_agencies = [a for a in IBTRACS_AGENCIES if f'{a}_{tc_var}' in ibtracs_ds.data_vars.keys()] # map all non-reporting agency variables to the 'nan_var' (0) agency_map = { a.encode("utf-8"): available_agencies.index(a) + 1 if a in available_agencies else 0 for a in [''] + IBTRACS_AGENCIES } agency_map.update({ a.encode('utf-8'): agency_map[b'usa'] for a in IBTRACS_USA_AGENCIES }) # read from officially responsible agencies that report this variable, but only # at official reporting times (usually 6-hourly) official_agency_ix = xr.apply_ufunc( lambda x: agency_map[x], ibtracs_ds.wmo_agency, vectorize=True) available_cols = ['nan_var'] + [f'{a}_{tc_var}' for a in available_agencies] all_vals = ibtracs_ds[available_cols].to_array(dim='agency') ibtracs_ds[f'official_{tc_var}'] = all_vals.isel(agency=official_agency_ix) if add_3h: # create a copy in float for NaN interpolation official_agency_ix_interp = official_agency_ix.astype(np.float16) # extrapolate track agency for tracks with only a single record mask_singular = ((official_agency_ix_interp > 0).sum(dim="date_time") == 1).values official_agency_ix_interp.values[mask_singular,:] = \ official_agency_ix_interp.sel(storm=mask_singular).max(dim="date_time").values[:,None] with warnings.catch_warnings(): # See https://github.com/pydata/xarray/issues/4167 warnings.simplefilter(action="ignore", category=FutureWarning) # interpolate responsible agencies using nearest neighbor interpolation official_agency_ix_interp.values[official_agency_ix_interp.values == 0.0] = np.nan official_agency_ix_interp = official_agency_ix_interp.interpolate_na( dim="date_time", method="nearest", fill_value="extrapolate") # read from officially responsible agencies that report this variable, including # 3-hour time steps if available official_agency_ix_interp.values[official_agency_ix_interp.isnull().values] = 0.0 ibtracs_ds[f'official_3h_{tc_var}'] = all_vals.isel( agency=official_agency_ix_interp.astype(int)) def _change_max_wind_unit(wind, unit_orig, unit_dest): """Compute maximum wind speed in unit_dest. Parameters ---------- wind : np.array Wind speed values in original units. unit_orig : str Original units of wind speed. unit_dest : str New units of the computed maximum wind speed. Returns ------- maxwind : double Maximum wind speed in specified wind speed units. """ if unit_orig in ('kn', 'kt'): ur_orig = ureg.knot elif unit_orig == 'mph': ur_orig = ureg.mile / ureg.hour elif unit_orig == 'm/s': ur_orig = ureg.meter / ureg.second elif unit_orig == 'km/h': ur_orig = ureg.kilometer / ureg.hour else: LOGGER.error('Unit not recognised %s.', unit_orig) raise ValueError if unit_dest in ('kn', 'kt'): ur_dest = ureg.knot elif unit_dest == 'mph': ur_dest = ureg.mile / ureg.hour elif unit_dest == 'm/s': ur_dest = ureg.meter / ureg.second elif unit_dest == 'km/h': ur_dest = ureg.kilometer / ureg.hour else: LOGGER.error('Unit not recognised %s.', unit_dest) raise ValueError return (np.nanmax(wind) * ur_orig).to(ur_dest).magnitude
[docs]def set_category(max_sus_wind, wind_unit='kn', saffir_scale=None): """Add storm category according to Saffir-Simpson hurricane scale. Parameters ---------- max_sus_wind : np.array Maximum sustained wind speed records for a single track. wind_unit : str, optional Units of wind speed. Default: 'kn'. saffir_scale : list, optional Saffir-Simpson scale in same units as wind (default scale valid for knots). Returns ------- category : int Intensity of given track according to the Saffir-Simpson hurricane scale: * -1 : tropical depression * 0 : tropical storm * 1 : Hurricane category 1 * 2 : Hurricane category 2 * 3 : Hurricane category 3 * 4 : Hurricane category 4 * 5 : Hurricane category 5 """ if saffir_scale is None: saffir_scale = SAFFIR_SIM_CAT if wind_unit != 'kn': max_sus_wind = _change_max_wind_unit(max_sus_wind, wind_unit, 'kn') max_wind = np.nanmax(max_sus_wind) try: return (np.argwhere(max_wind < saffir_scale) - 1)[0][0] except IndexError: return -1