Source code for climada.hazard.base

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define Hazard.
"""

__all__ = ["Hazard"]

import copy
import datetime as dt
import logging
import warnings
from typing import List, Optional

import geopandas as gpd
import numpy as np
from deprecation import deprecated
from pathos.pools import ProcessPool as Pool
from scipy import sparse

import climada.util.checker as u_check
import climada.util.constants as u_const
import climada.util.coordinates as u_coord
import climada.util.dates_times as u_dt
import climada.util.interpolation as u_interp
from climada import CONFIG
from climada.hazard.centroids.centr import Centroids
from climada.hazard.io import HazardIO
from climada.hazard.plot import HazardPlot
from climada.util.value_representation import safe_divide

LOGGER = logging.getLogger(__name__)


class Hazard(HazardIO, HazardPlot):
    """
    Contains events of some hazard type defined at centroids. Loads from
    files with format defined in FILE_EXT.

    Attention
    ---------
    This class uses instances of
    `scipy.sparse.csr_matrix <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html>`_
    to store :py:attr:`intensity` and :py:attr:`fraction`. This data type
    comes with its own pitfalls. Depending on how the objects are instantiated
    and modified, a matrix might end up in a "non-canonical" state. In this
    state, its ``.data`` attribute does not necessarily represent the values
    apparent in the final matrix. In particular, a "non-canonical" matrix may
    store "duplicates", i.e. multiple values that map to the same matrix
    position. This is supported, and the default behavior is to sum up these
    values. To avoid any inconsistencies, call :py:meth:`check_matrices`
    before accessing the ``data`` attribute of either matrix. This will
    explicitly sum all values at the same matrix position and eliminate
    explicit zeros.

    Attributes
    ----------
    haz_type : str
        two-letter hazard-type string, e.g., "TC" (tropical cyclone),
        "RF" (river flood) or "WF" (wildfire). Note: The acronym is used as
        reference to the hazard when centroids of multiple hazards are
        assigned to an ``Exposures`` object.
    units : str
        units of the intensity
    centroids : Centroids
        centroids of the events
    event_id : np.array
        id (>0) of each event
    event_name : list(str)
        name of each event (default: event_id)
    date : np.array
        integer date corresponding to the proleptic Gregorian ordinal, where
        January 1 of year 1 has ordinal 1 (ordinal format of datetime library)
    orig : np.array
        flags indicating historical events (True) or probabilistic (False)
    frequency : np.array
        frequency of each event
    frequency_unit : str
        unit of the frequency (default: "1/year")
    intensity : sparse.csr_matrix
        intensity of the events at centroids
    fraction : sparse.csr_matrix
        fraction of affected exposures for each event at each centroid.
        If empty (all 0), it is ignored in the impact computations
        (i.e., it is equivalent to the fraction being 1 everywhere).
    """

    intensity_thres = 10
    """Intensity threshold per hazard used to filter lower intensities. To be
    set for every hazard type"""

    vars_oblig = {
        "units",
        "centroids",
        "event_id",
        "frequency",
        "intensity",
        "fraction",
    }
    """Name of the variables needed to compute the impact. Types: scalar, str,
    list, 1dim np.array of size num_events, scipy.sparse matrix of shape
    num_events x num_centroids, Centroids."""

    vars_def = {"date", "orig", "event_name", "frequency_unit"}
    """Name of the variables used in impact calculation whose value is
    descriptive and can therefore be set with default values. Types: scalar,
    string, list, 1dim np.array of size num_events."""

    vars_opt = set()
    """Name of the variables that aren't needed to compute the impact.
    Types: scalar, string, list, 1dim np.array of size num_events."""

    def __init__(
        self,
        haz_type: str = "",
        pool: Optional[Pool] = None,
        units: str = "",
        centroids: Optional[Centroids] = None,
        event_id: Optional[np.ndarray] = None,
        frequency: Optional[np.ndarray] = None,
        frequency_unit: str = u_const.DEF_FREQ_UNIT,
        event_name: Optional[List[str]] = None,
        date: Optional[np.ndarray] = None,
        orig: Optional[np.ndarray] = None,
        intensity: Optional[sparse.csr_matrix] = None,
        fraction: Optional[sparse.csr_matrix] = None,
    ):
        """
        Initialize values.

        Parameters
        ----------
        haz_type : str, optional
            acronym of the hazard type (e.g. 'TC').
        pool : pathos.pool, optional
            Pool that will be used for parallel computation when applicable.
            Default: None
        units : str, optional
            units of the intensity. Defaults to empty string.
        centroids : Centroids, optional
            centroids of the events. Defaults to empty Centroids object.
        event_id : np.array, optional
            id (>0) of each event. Defaults to empty array.
        event_name : list(str), optional
            name of each event (default: event_id). Defaults to empty list.
        date : np.array, optional
            integer date corresponding to the proleptic Gregorian ordinal,
            where January 1 of year 1 has ordinal 1 (ordinal format of
            datetime library). Defaults to empty array.
        orig : np.array, optional
            flags indicating historical events (True) or probabilistic
            (False). Defaults to empty array.
        frequency : np.array, optional
            frequency of each event. Defaults to empty array.
        frequency_unit : str, optional
            unit of the frequency (default: "1/year").
        intensity : sparse.csr_matrix, optional
            intensity of the events at centroids. Defaults to empty matrix.
        fraction : sparse.csr_matrix, optional
            fraction of affected exposures for each event at each centroid.
            Defaults to empty matrix.

        Examples
        --------
        Initialize using keyword arguments:

        >>> haz = Hazard('TC', intensity=sparse.csr_matrix(np.zeros((2, 2))))

        Take hazard values from file:

        >>> haz = Hazard.from_hdf5(HAZ_DEMO_H5)
        """
        self.haz_type = haz_type
        self.units = units
        self.centroids = (
            centroids
            if centroids is not None
            else Centroids(lat=np.empty(0), lon=np.empty(0))
        )
        # following values are defined for each event
        self.event_id = event_id if event_id is not None else np.array([], int)
        self.frequency = frequency if frequency is not None else np.array([], float)
        self.frequency_unit = frequency_unit
        self.event_name = event_name if event_name is not None else list()
        self.date = date if date is not None else np.array([], int)
        self.orig = orig if orig is not None else np.array([], bool)
        # following values are defined for each event and centroid
        self.intensity = (
            intensity if intensity is not None else sparse.csr_matrix(np.empty((0, 0)))
        )  # events x centroids
        self.fraction = (
            fraction
            if fraction is not None
            else sparse.csr_matrix(self.intensity.shape)
        )  # events x centroids

        self.pool = pool
        if self.pool:
            LOGGER.info("Using %s CPUs.", self.pool.ncpus)

    def check_matrices(self):
        """Ensure that matrices are consistently shaped and stored

        It is good practice to call this method before accessing the ``data``
        attribute of either :py:attr:`intensity` or :py:attr:`fraction`.

        See Also
        --------
        :py:func:`climada.util.checker.prune_csr_matrix`

        Raises
        ------
        ValueError
            If matrices are ill-formed or ill-shaped in relation to each other
        """
        # TODO: Check consistency with centroids
        u_check.prune_csr_matrix(self.intensity)
        u_check.prune_csr_matrix(self.fraction)
        if self.fraction.nnz > 0:
            if self.intensity.shape != self.fraction.shape:
                raise ValueError(
                    "Intensity and fraction matrices must have the same shape"
                )

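    # A minimal sketch of the "non-canonical matrix" pitfall described in the
    # class docstring: a CSR matrix built directly from (data, indices, indptr)
    # can store duplicate entries for one position, so ``.data`` does not match
    # the apparent matrix values until the duplicates are summed.
    #
    # >>> mat = sparse.csr_matrix(([1.0, 2.0], [1, 1], [0, 2]), shape=(1, 2))
    # >>> mat.data                  # two stored entries for position (0, 1)
    # array([1., 2.])
    # >>> mat.toarray()             # the apparent value is their sum
    # array([[0., 3.]])
    # >>> mat.sum_duplicates()      # roughly what check_matrices triggers
    # >>> mat.data
    # array([3.])
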
    @classmethod
    def get_default(cls, attribute):
        """Get the Hazard type default for a given attribute.

        Parameters
        ----------
        attribute : str
            attribute name

        Returns
        -------
        Any
        """
        return {
            "frequency_unit": u_const.DEF_FREQ_UNIT,
        }.get(attribute)

    def check(self):
        """Check dimension of attributes.

        Raises
        ------
        ValueError
        """
        self._check_events()

    def reproject_vector(self, dst_crs):
        """Change current point data to a given projection

        Parameters
        ----------
        dst_crs : crs
            reproject to given crs
        """
        self.centroids.gdf.to_crs(dst_crs, inplace=True)
        self.check()

    def select(
        self,
        event_names=None,
        event_id=None,
        date=None,
        orig=None,
        reg_id=None,
        extent=None,
        reset_frequency=False,
    ):
        """Select events matching provided criteria

        The frequency of events may need to be recomputed (see
        `reset_frequency`)!

        Parameters
        ----------
        event_names : list of str, optional
            Names of events.
        event_id : list of int, optional
            Id of events. Default is None.
        date : array-like of length 2 containing str or int, optional
            (initial date, final date) in string ISO format ('2011-01-02')
            or datetime ordinal integer.
        orig : bool, optional
            Select only historical (True) or only synthetic (False) events.
        reg_id : int, optional
            Region identifier of the centroids' region_id attribute.
        extent : tuple(float, float, float, float), optional
            Extent of centroids as (min_lon, max_lon, min_lat, max_lat).
            The default is None.
        reset_frequency : bool, optional
            Change frequency of events proportional to difference between
            first and last year (old and new). Default: False.

        Returns
        -------
        haz : Hazard or None
            If no event matching the specified criteria is found, None is
            returned.
        """
        # pylint: disable=unidiomatic-typecheck
        if type(self) is Hazard:
            haz = Hazard(self.haz_type)
        else:
            haz = self.__class__()

        # filter events
        sel_ev = np.ones(self.event_id.size, dtype=bool)

        # filter events by date
        if date is not None:
            date_ini, date_end = date
            if isinstance(date_ini, str):
                date_ini = u_dt.str_to_date(date[0])
                date_end = u_dt.str_to_date(date[1])
            sel_ev &= (date_ini <= self.date) & (self.date <= date_end)
            if not np.any(sel_ev):
                LOGGER.info("No hazard in date range %s.", date)
                return None

        # filter events hist/synthetic
        if orig is not None:
            sel_ev &= self.orig.astype(bool) == orig
            if not np.any(sel_ev):
                LOGGER.info("No hazard with %s original events.", str(orig))
                return None

        # filter events based on name
        sel_ev = np.argwhere(sel_ev).reshape(-1)
        if event_names is not None:
            filtered_events = [self.event_name[i] for i in sel_ev]
            try:
                new_sel = [filtered_events.index(n) for n in event_names]
            except ValueError as err:
                name = str(err).replace(" is not in list", "")
                LOGGER.info("No hazard with name %s", name)
                return None
            sel_ev = sel_ev[new_sel]

        # filter events based on id
        if event_id is not None:
            # preserves order of event_id
            sel_ev = np.array(
                [
                    np.argwhere(self.event_id == n)[0, 0]
                    for n in event_id
                    if n in self.event_id[sel_ev]
                ]
            )

        # filter centroids
        sel_cen = self.centroids.select_mask(reg_id=reg_id, extent=extent)
        if not np.any(sel_cen):
            LOGGER.info("No hazard centroids within extent and region")
            return None

        # Sanitize fraction, because we check non-zero entries later
        self.fraction.eliminate_zeros()

        # Perform attribute selection
        for var_name, var_val in self.__dict__.items():
            if (
                isinstance(var_val, np.ndarray)
                and var_val.ndim == 1
                and var_val.size > 0
            ):
                setattr(haz, var_name, var_val[sel_ev])
            elif isinstance(var_val, sparse.csr_matrix):
                setattr(haz, var_name, var_val[sel_ev, :][:, sel_cen])
            elif isinstance(var_val, list) and var_val:
                setattr(haz, var_name, [var_val[idx] for idx in sel_ev])
            elif var_name == "centroids":
                if reg_id is None and extent is None:
                    new_cent = var_val
                else:
                    new_cent = var_val.select(sel_cen=sel_cen)
                setattr(haz, var_name, new_cent)
            else:
                setattr(haz, var_name, var_val)

        # reset frequency if date span has changed (optional):
        if reset_frequency:
            if self.frequency_unit not in ["1/year", "annual", "1/y", "1/a"]:
                LOGGER.warning(
                    "Resetting the frequency is based on the calendar year of given"
                    " dates but the frequency unit here is %s. Consider setting the"
                    " frequency manually for the selection or changing the frequency"
                    " unit to %s.",
                    self.frequency_unit,
                    u_const.DEF_FREQ_UNIT,
                )
            year_span_old = (
                np.abs(
                    dt.datetime.fromordinal(self.date.max()).year
                    - dt.datetime.fromordinal(self.date.min()).year
                )
                + 1
            )
            year_span_new = (
                np.abs(
                    dt.datetime.fromordinal(haz.date.max()).year
                    - dt.datetime.fromordinal(haz.date.min()).year
                )
                + 1
            )
            haz.frequency = haz.frequency * year_span_old / year_span_new

        # Check if new fraction is zero everywhere
        if self._get_fraction() is not None and haz._get_fraction() is None:
            raise RuntimeError(
                "Your selection created a Hazard object where the fraction matrix is "
                "zero everywhere. This hazard will have zero impact everywhere. "
                "We are catching this condition because of an implementation detail: "
                "A fraction matrix without nonzero-valued entries will be completely "
                "ignored. This is surely not what you intended. If you really want to, "
                "you can circumvent this error by setting your original fraction "
                "matrix to zero everywhere, but there probably is no point in doing so."
            )

        haz.sanitize_event_ids()
        return haz

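    # A hypothetical usage sketch of ``select`` (``haz`` stands for any loaded
    # Hazard object): restrict to historical events within a date range and a
    # lon/lat box, rescaling frequencies to the new year span.
    #
    # >>> sub = haz.select(
    # ...     date=("2000-01-01", "2010-12-31"),
    # ...     orig=True,
    # ...     extent=(-80.0, -60.0, 10.0, 30.0),  # (min_lon, max_lon, min_lat, max_lat)
    # ...     reset_frequency=True,
    # ... )
    # >>> sub is None or sub.size <= haz.size
    # True
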
    def select_tight(
        self,
        buffer=u_coord.NEAREST_NEIGHBOR_THRESHOLD / u_const.ONE_LAT_KM,
        val="intensity",
    ):
        """
        Reduce hazard to those centroids spanning a minimal box which
        contains all non-zero intensity or fraction points.

        Parameters
        ----------
        buffer : float, optional
            Buffer of box in the units of the centroids.
            The default is approximately equal to the default threshold
            from the assign_centroids method (works if centroids in lat/lon)
        val : string, optional
            Select tight by non-zero 'intensity' or 'fraction'. The default
            is 'intensity'.

        Returns
        -------
        Hazard
            Copy of the Hazard with centroids reduced to minimal box. All
            other hazard properties are carried over without changes.

        See Also
        --------
        self.select : Method to select centroids by lat/lon extent
        util.coordinates.match_coordinates : algorithm to match centroids.
        """
        if val == "intensity":
            cent_nz = (self.intensity != 0).sum(axis=0).nonzero()[1]
        if val == "fraction":
            cent_nz = (self.fraction != 0).sum(axis=0).nonzero()[1]
        lon_nz = self.centroids.lon[cent_nz]
        lat_nz = self.centroids.lat[cent_nz]
        return self.select(
            extent=u_coord.toggle_extent_bounds(
                u_coord.latlon_bounds(lat=lat_nz, lon=lon_nz, buffer=buffer)
            )
        )

    def local_exceedance_intensity(
        self,
        return_periods=(25, 50, 100, 250),
        method="interpolate",
        min_intensity=None,
        log_frequency=True,
        log_intensity=True,
        bin_decimals=None,
    ):
        """Compute local exceedance intensity for given return periods.

        The default method fits the ordered intensities per centroid to the
        corresponding cumulated frequency with linear interpolation on
        log-log scale.

        Parameters
        ----------
        return_periods : array_like
            User-specified return periods for which the exceedance intensity
            should be calculated locally (at each centroid).
            Defaults to (25, 50, 100, 250).
        method : str
            Method to interpolate to new return periods. Currently available
            are "interpolate", "extrapolate", "extrapolate_constant" and
            "stepfunction". If set to "interpolate", return periods outside
            the range of the Hazard object's observed local return periods
            will be assigned NaN. If set to "extrapolate_constant" or
            "stepfunction", return periods larger than the Hazard object's
            observed local return periods will be assigned the largest local
            intensity, and return periods smaller than the Hazard object's
            observed local return periods will be assigned 0. If set to
            "extrapolate", local exceedance intensities will be extrapolated
            (and interpolated). The extrapolation to large return periods
            uses the two highest intensities of the centroid and their
            return periods and extends the interpolation between these
            points to the given return period (similarly for small return
            periods). Defaults to "interpolate".
        min_intensity : float, optional
            Minimum threshold to filter the hazard intensity. If set to None,
            self.intensity_thres will be used. Defaults to None.
        log_frequency : bool, optional
            If set to True, (cumulated) frequency values are converted to log
            scale before inter- and extrapolation. Defaults to True.
        log_intensity : bool, optional
            If set to True, intensity values are converted to log scale
            before inter- and extrapolation. Defaults to True.
        bin_decimals : int, optional
            Number of decimals to group and bin intensity values. Binning
            results in smoother (and coarser) interpolation and more stable
            extrapolation. For more details and sensible values for
            bin_decimals, see Notes. If None, values are not binned.
            Defaults to None.

        Returns
        -------
        gdf : gpd.GeoDataFrame
            GeoDataFrame containing exceedance intensities for given return
            periods. Each column corresponds to a return period, each row
            corresponds to a centroid. Values in the gdf correspond to the
            exceedance intensity for the given centroid and return period
        label : str
            GeoDataFrame label, for reporting and plotting
        column_label : function
            Column-label-generating function, for reporting and plotting

        See Also
        --------
        util.interpolation.preprocess_and_interpolate_ev : inter- and
            extrapolation method

        Notes
        -----
        If an integer bin_decimals is given, the intensity values are binned
        according to their bin_decimals decimals, and their corresponding
        frequencies are summed. This binning leads to a smoother (and
        coarser) interpolation, and a more stable extrapolation. For
        instance, if bin_decimals=1, the two values 12.01 and 11.97 with
        corresponding frequencies 0.1 and 0.2 are combined to a value 12.0
        with frequency 0.3. The default bin_decimals=None results in not
        binning the values. E.g., if your intensities range from 1 to 100,
        you could use bin_decimals=1; if your intensities range from 1e6 to
        1e9, you could use bin_decimals=-5; if your intensities range from
        0.0001 to 0.01, you could use bin_decimals=5.
        """
        if not min_intensity and min_intensity != 0:
            min_intensity = self.intensity_thres
        # check frequency unit
        return_period_unit = u_dt.convert_frequency_unit_to_time_unit(
            self.frequency_unit
        )
        # check method
        if method not in [
            "interpolate",
            "extrapolate",
            "extrapolate_constant",
            "stepfunction",
        ]:
            raise ValueError(f"Unknown method: {method}")

        # calculate local exceedance intensity
        test_frequency = 1 / np.array(return_periods)
        exceedance_intensity = np.full(
            (self.intensity.shape[1], len(test_frequency)),
            np.nan if method == "interpolate" else 0.0,
        )
        nonzero_centroids = np.where(self.intensity.getnnz(axis=0) > 0)[0]
        if not len(nonzero_centroids) == 0:
            exceedance_intensity[nonzero_centroids, :] = np.array(
                [
                    u_interp.preprocess_and_interpolate_ev(
                        test_frequency,
                        None,
                        self.frequency,
                        self.intensity.getcol(i_centroid).toarray().flatten(),
                        log_frequency=log_frequency,
                        log_values=log_intensity,
                        value_threshold=min_intensity,
                        method=method,
                        y_asymptotic=0.0,
                        bin_decimals=bin_decimals,
                    )
                    for i_centroid in nonzero_centroids
                ]
            )

        # create the output GeoDataFrame
        gdf = gpd.GeoDataFrame(
            geometry=self.centroids.gdf["geometry"], crs=self.centroids.gdf.crs
        )
        column_names = [f"{rp}" for rp in return_periods]
        gdf[column_names] = exceedance_intensity

        # create label and column_label
        label = f"Intensity ({self.units})"

        def column_label(column_names):
            return [
                f"Return Period: {col} {return_period_unit}" for col in column_names
            ]

        return gdf, label, column_label

    @deprecated(
        details="The use of Hazard.local_exceedance_inten is deprecated. Use "
        "Hazard.local_exceedance_intensity instead. Some errors in the previous "
        "calculation in Hazard.local_exceedance_inten have been corrected. To "
        "reproduce data with the previous calculation, use CLIMADA v5.0.0 or less."
    )
    def local_exceedance_inten(self, return_period=(25, 50, 100, 250)):
        """This function is deprecated, use Hazard.local_exceedance_intensity
        instead."""
        return (
            self.local_exceedance_intensity(return_period)[0]
            .values[:, 1:]
            .T.astype(float)
        )

    def sanitize_event_ids(self):
        """Make sure that event ids are unique"""
        if np.unique(self.event_id).size != self.event_id.size:
            LOGGER.debug("Resetting event_id.")
            self.event_id = np.arange(1, self.event_id.size + 1)

    def local_return_period(
        self,
        threshold_intensities=(10.0, 20.0),
        method="interpolate",
        min_intensity=None,
        log_frequency=True,
        log_intensity=True,
        bin_decimals=None,
    ):
        """Compute local return periods for given hazard intensities.

        The default method fits the ordered intensities per centroid to the
        corresponding cumulated frequency with linear interpolation on
        log-log scale.

        Parameters
        ----------
        threshold_intensities : array_like
            User-specified hazard intensities for which the return period
            should be calculated locally (at each centroid).
            Defaults to (10, 20)
        method : str
            Method to interpolate to new threshold intensities. Currently
            available are "interpolate", "extrapolate",
            "extrapolate_constant" and "stepfunction". If set to
            "interpolate", threshold intensities outside the range of the
            Hazard object's local intensities will be assigned NaN. If set
            to "extrapolate_constant" or "stepfunction", threshold
            intensities larger than the Hazard object's local intensities
            will be assigned NaN, and threshold intensities smaller than the
            Hazard object's local intensities will be assigned the smallest
            observed local return period. If set to "extrapolate", local
            return periods will be extrapolated (and interpolated). The
            extrapolation to large threshold intensities uses the two
            highest intensities of the centroid and their return periods and
            extends the interpolation between these points to the given
            threshold intensity (similarly for small threshold intensities).
            Defaults to "interpolate".
        min_intensity : float, optional
            Minimum threshold to filter the hazard intensity. If set to None,
            self.intensity_thres will be used. Defaults to None.
        log_frequency : bool, optional
            If set to True, (cumulated) frequency values are converted to log
            scale before inter- and extrapolation. Defaults to True.
        log_intensity : bool, optional
            If set to True, intensity values are converted to log scale
            before inter- and extrapolation. Defaults to True.
        bin_decimals : int, optional
            Number of decimals to group and bin intensity values. Binning
            results in smoother (and coarser) interpolation and more stable
            extrapolation. For more details and sensible values for
            bin_decimals, see Notes. If None, values are not binned.
            Defaults to None.

        Returns
        -------
        gdf : gpd.GeoDataFrame
            GeoDataFrame containing return periods for given threshold
            intensities. Each column corresponds to a threshold_intensity
            value, each row corresponds to a centroid. Values in the gdf
            correspond to the return period for the given centroid and
            threshold_intensity value
        label : str
            GeoDataFrame label, for reporting and plotting
        column_label : function
            Column-label-generating function, for reporting and plotting

        See Also
        --------
        util.interpolation.preprocess_and_interpolate_ev : inter- and
            extrapolation method

        Notes
        -----
        If an integer bin_decimals is given, the intensity values are binned
        according to their bin_decimals decimals, and their corresponding
        frequencies are summed. This binning leads to a smoother (and
        coarser) interpolation, and a more stable extrapolation. For
        instance, if bin_decimals=1, the two values 12.01 and 11.97 with
        corresponding frequencies 0.1 and 0.2 are combined to a value 12.0
        with frequency 0.3. The default bin_decimals=None results in not
        binning the values. E.g., if your intensities range from 1 to 100,
        you could use bin_decimals=1; if your intensities range from 1e6 to
        1e9, you could use bin_decimals=-5; if your intensities range from
        0.0001 to 0.01, you could use bin_decimals=5.
        """
        if not min_intensity and min_intensity != 0:
            min_intensity = self.intensity_thres
        # check frequency unit
        return_period_unit = u_dt.convert_frequency_unit_to_time_unit(
            self.frequency_unit
        )
        # check method
        if method not in [
            "interpolate",
            "extrapolate",
            "extrapolate_constant",
            "stepfunction",
        ]:
            raise ValueError(f"Unknown method: {method}")

        return_periods = np.full(
            (self.intensity.shape[1], len(threshold_intensities)), np.nan
        )
        nonzero_centroids = np.where(self.intensity.getnnz(axis=0) > 0)[0]
        if not len(nonzero_centroids) == 0:
            return_periods[nonzero_centroids, :] = np.array(
                [
                    u_interp.preprocess_and_interpolate_ev(
                        None,
                        np.array(threshold_intensities),
                        self.frequency,
                        self.intensity.getcol(i_centroid).toarray().flatten(),
                        log_frequency=log_frequency,
                        log_values=log_intensity,
                        value_threshold=min_intensity,
                        method=method,
                        y_asymptotic=np.nan,
                        bin_decimals=bin_decimals,
                    )
                    for i_centroid in nonzero_centroids
                ]
            )
        return_periods = safe_divide(1.0, return_periods)

        # create the output GeoDataFrame
        gdf = gpd.GeoDataFrame(
            geometry=self.centroids.gdf["geometry"], crs=self.centroids.gdf.crs
        )
        col_names = [f"{thresh_inten}" for thresh_inten in threshold_intensities]
        gdf[col_names] = return_periods

        # create label and column_label
        label = f"Return Periods ({return_period_unit})"

        def column_label(column_names):
            return [f"Threshold Intensity: {col} {self.units}" for col in column_names]

        return gdf, label, column_label

    def get_event_id(self, event_name):
        """Get an event id from its name. Several events might have the
        same name.

        Parameters
        ----------
        event_name : str
            Event name

        Returns
        -------
        list_id : np.array(int)
        """
        list_id = self.event_id[
            [
                i_name
                for i_name, val_name in enumerate(self.event_name)
                if val_name == event_name
            ]
        ]
        if list_id.size == 0:
            raise ValueError(f"No event with name: {event_name}")
        return list_id

    def get_event_name(self, event_id):
        """Get the name of an event id.

        Parameters
        ----------
        event_id : int
            id of the event

        Returns
        -------
        str

        Raises
        ------
        ValueError
        """
        try:
            return self.event_name[np.argwhere(self.event_id == event_id)[0][0]]
        except IndexError as err:
            raise ValueError(f"No event with id: {event_id}") from err

    def get_event_date(self, event=None):
        """Return list of date strings for given event or for all events,
        if no event provided.

        Parameters
        ----------
        event : str or int, optional
            event name or id.

        Returns
        -------
        l_dates : list(str)
        """
        if event is None:
            l_dates = [u_dt.date_to_str(date) for date in self.date]
        elif isinstance(event, str):
            ev_ids = self.get_event_id(event)
            l_dates = [
                u_dt.date_to_str(self.date[np.argwhere(self.event_id == ev_id)[0][0]])
                for ev_id in ev_ids
            ]
        else:
            ev_idx = np.argwhere(self.event_id == event)[0][0]
            l_dates = [u_dt.date_to_str(self.date[ev_idx])]
        return l_dates

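    # A toy round trip through the event getters above, with a hypothetical
    # two-event hazard (dates are proleptic Gregorian ordinals, reported back
    # as ISO strings):
    #
    # >>> haz = Hazard(
    # ...     "TC",
    # ...     event_id=np.array([1, 2]),
    # ...     event_name=["Alpha", "Beta"],
    # ...     date=np.array([dt.date(2011, 1, 2).toordinal()] * 2),
    # ...     intensity=sparse.csr_matrix(np.zeros((2, 2))),
    # ... )
    # >>> haz.get_event_id("Beta")
    # array([2])
    # >>> haz.get_event_name(2)
    # 'Beta'
    # >>> haz.get_event_date("Beta")
    # ['2011-01-02']
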
    def calc_year_set(self):
        """From the dates of the original events, get the events of each
        year.

        Returns
        -------
        orig_yearset : dict
            keys are years, values are arrays with the event_ids of that year
        """
        orig_year = np.array(
            [dt.datetime.fromordinal(date).year for date in self.date[self.orig]]
        )
        orig_yearset = {}
        for year in np.unique(orig_year):
            orig_yearset[year] = self.event_id[self.orig][orig_year == year]
        return orig_yearset

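    # A toy sketch of ``calc_year_set``: only events flagged as original
    # (``orig``) are grouped, keyed by the calendar year of their ordinal date.
    #
    # >>> haz = Hazard(
    # ...     "TC",
    # ...     event_id=np.array([1, 2, 3]),
    # ...     orig=np.array([True, True, False]),
    # ...     date=np.array([dt.date(2001, 6, 1).toordinal(),
    # ...                    dt.date(2002, 6, 1).toordinal(),
    # ...                    dt.date(2002, 6, 2).toordinal()]),
    # ... )
    # >>> ys = haz.calc_year_set()
    # >>> [int(y) for y in sorted(ys)]   # the probabilistic event is excluded
    # [2001, 2002]
    # >>> ys[2002]
    # array([2])
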
    def remove_duplicates(self):
        """Remove duplicate events (events with same name and date)."""
        events = list(zip(self.event_name, self.date))
        set_ev = set(events)
        if len(set_ev) == self.event_id.size:
            return
        unique_pos = sorted([events.index(event) for event in set_ev])
        for var_name, var_val in vars(self).items():
            if isinstance(var_val, sparse.csr_matrix):
                setattr(self, var_name, var_val[unique_pos, :])
            elif isinstance(var_val, np.ndarray) and var_val.ndim == 1:
                setattr(self, var_name, var_val[unique_pos])
            elif isinstance(var_val, list) and len(var_val) > 0:
                setattr(self, var_name, [var_val[p] for p in unique_pos])

    def set_frequency(self, yearrange=None):
        """Set hazard frequency from yearrange or intensity matrix.

        Parameters
        ----------
        yearrange : tuple or list, optional
            year range to be used to compute frequency per event.
            If yearrange is not given (None), the year range is derived
            from self.date
        """
        if self.frequency_unit not in ["1/year", "annual", "1/y", "1/a"]:
            LOGGER.warning(
                "Setting the frequency on a hazard object whose frequency unit"
                " is %s and not %s will most likely lead to unexpected results.",
                self.frequency_unit,
                u_const.DEF_FREQ_UNIT,
            )
        if not yearrange:
            delta_time = (
                dt.datetime.fromordinal(int(np.max(self.date))).year
                - dt.datetime.fromordinal(int(np.min(self.date))).year
                + 1
            )
        else:
            delta_time = max(yearrange) - min(yearrange) + 1
        num_orig = self.orig.nonzero()[0].size
        if num_orig > 0:
            ens_size = self.event_id.size / num_orig
        else:
            ens_size = 1
        self.frequency = np.ones(self.event_id.size) / delta_time / ens_size

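    # A toy numeric check of the formula above (hypothetical values): 10
    # historical plus 40 probabilistic events over the 25-year range 2000-2024
    # give an ensemble size of 50 / 10 = 5 and hence a per-event frequency of
    # 1 / (25 * 5) = 0.008 per year.
    #
    # >>> haz = Hazard(
    # ...     "TC",
    # ...     event_id=np.arange(1, 51),
    # ...     orig=np.array([True] * 10 + [False] * 40),
    # ...     date=np.array([dt.date(2000, 1, 1).toordinal(),
    # ...                    dt.date(2024, 12, 31).toordinal()] * 25),
    # ... )
    # >>> haz.set_frequency()
    # >>> float(haz.frequency[0])    # 1 / (25 years * ensemble size 5)
    # 0.008
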
    @property
    def size(self):
        """Return number of events."""
        return self.event_id.size

    def _events_set(self):
        """Generate set of tuples with (event_name, event_date)"""
        ev_set = set()
        for ev_name, ev_date in zip(self.event_name, self.date):
            ev_set.add((ev_name, ev_date))
        return ev_set

    def _check_events(self):
        """Check that all attributes but centroids contain consistent data.
        Put default date, event_name and orig if not provided. Check that
        events are not repeated (i.e. with same date and name).

        Raises
        ------
        ValueError
        """
        num_ev = len(self.event_id)
        num_cen = self.centroids.size
        if np.unique(self.event_id).size != num_ev:
            raise ValueError("There are events with the same identifier.")

        u_check.check_obligatories(
            self.__dict__, self.vars_oblig, "Hazard.", num_ev, num_ev, num_cen
        )
        u_check.check_optionals(self.__dict__, self.vars_opt, "Hazard.", num_ev)
        self.event_name = u_check.array_default(
            num_ev, self.event_name, "Hazard.event_name", list(self.event_id)
        )
        self.date = u_check.array_default(
            num_ev, self.date, "Hazard.date", np.ones(self.event_id.shape, dtype=int)
        )
        self.orig = u_check.array_default(
            num_ev, self.orig, "Hazard.orig", np.zeros(self.event_id.shape, dtype=bool)
        )
        if len(self._events_set()) != num_ev:
            raise ValueError("There are events with same date and name.")

    def append(self, *others):
        """Append the events and centroids to this hazard object.

        All of the given hazards must be of the same type and use the same
        units as self. The centroids of all hazards must have the same CRS.

        The following kinds of object attributes are processed:

        - All centroids are combined together using `Centroids.union`.
        - Lists, 1-dimensional arrays (NumPy) and sparse CSR matrices (SciPy)
          are concatenated. Sparse matrices are concatenated along the first
          (vertical) axis.

        For any other type of attribute: A ValueError is raised if an
        attribute of that name is not defined at least in all of the
        non-empty hazards. However, there is no check that the attribute
        value is identical among the given hazard objects. The initial
        attribute value of `self` will not be modified.

        Note: Each of the hazard's `centroids` attributes might be modified
        in place in the sense that missing properties are added, but
        existing ones are not overwritten. In case of raster centroids,
        conversion to point centroids is applied so that raster information
        (meta) is lost. For more information, see `Centroids.union`.

        Parameters
        ----------
        others : one or more climada.hazard.Hazard objects
            Hazard instances to append to self

        Raises
        ------
        TypeError, ValueError

        See Also
        --------
        Hazard.concat : concatenate 2 or more hazards
        Centroids.union : combine centroids
        """
        # pylint: disable=no-member, protected-access
        if len(others) == 0:
            return
        haz_list = [self] + list(others)
        haz_list_nonempty = [haz for haz in haz_list if haz.size > 0]
        for haz in haz_list:
            haz._check_events()

        # check type, unit, and attribute consistency among hazards
        haz_types = {haz.haz_type for haz in haz_list if haz.haz_type != ""}
        if len(haz_types) > 1:
            raise ValueError(
                f"The given hazards are of different types: {haz_types}. "
                "The hazards are incompatible and cannot be concatenated."
            )
        self.haz_type = haz_types.pop()

        haz_classes = {haz.__class__.__name__ for haz in haz_list}
        if len(haz_classes) > 1:
            raise TypeError(
                f"The given hazards are of different classes: {haz_classes}. "
                "The hazards are incompatible and cannot be concatenated."
            )

        freq_units = {haz.frequency_unit for haz in haz_list}
        if len(freq_units) > 1:
            raise ValueError(
                f"The given hazards have different frequency units: {freq_units}. "
                "The hazards are incompatible and cannot be concatenated."
            )
        self.frequency_unit = freq_units.pop()

        units = {haz.units for haz in haz_list if haz.units != ""}
        if len(units) > 1:
            raise ValueError(
                f"The given hazards use different units: {units}. "
                "The hazards are incompatible and cannot be concatenated."
            )
        if len(units) == 0:
            units = {""}
        self.units = units.pop()

        attributes = sorted(set.union(*[set(vars(haz).keys()) for haz in haz_list]))
        for attr_name in attributes:
            if not all(hasattr(haz, attr_name) for haz in haz_list_nonempty):
                raise ValueError(
                    f"Attribute {attr_name} is not shared by all hazards. "
                    "The hazards are incompatible and cannot be concatenated."
                )

        # map individual centroids objects to union
        centroids = Centroids.union(*[haz.centroids for haz in haz_list])
        hazcent_in_cent_idx_list = [
            u_coord.match_coordinates(haz.centroids.coord, centroids.coord, threshold=0)
            for haz in haz_list_nonempty
        ]

        # concatenate array and list attributes of non-empty hazards
        for attr_name in attributes:
            attr_val_list = [getattr(haz, attr_name) for haz in haz_list_nonempty]
            if isinstance(attr_val_list[0], sparse.csr_matrix):
                # map sparse matrix onto centroids
                setattr(
                    self,
                    attr_name,
                    sparse.vstack(
                        [
                            sparse.csr_matrix(
                                (matrix.data, cent_idx[matrix.indices], matrix.indptr),
                                shape=(matrix.shape[0], centroids.size),
                            )
                            for matrix, cent_idx in zip(
                                attr_val_list, hazcent_in_cent_idx_list
                            )
                        ],
                        format="csr",
                    ),
                )
            elif (
                isinstance(attr_val_list[0], np.ndarray) and attr_val_list[0].ndim == 1
            ):
                setattr(self, attr_name, np.hstack(attr_val_list))
            elif isinstance(attr_val_list[0], list):
                setattr(self, attr_name, sum(attr_val_list, []))

        self.centroids = centroids
        self.sanitize_event_ids()

    @classmethod
    def concat(cls, haz_list):
        """
        Concatenate events of several hazards of same type.

        This function creates a new hazard of the same class as the first
        hazard in the given list and then applies the `append` method. Please
        refer to the docs of `Hazard.append` for caveats and limitations of
        the concatenation procedure.

        For centroids, lists, arrays and sparse matrices, the remarks in
        `Hazard.append` apply. All other attributes are copied from the first
        object in `haz_list`.

        Note that `Hazard.concat` can be used to concatenate hazards of a
        subclass. The result's type will be the subclass. However, calling
        `concat([])` (with an empty list) is equivalent to instantiation
        without init parameters. So, `Hazard.concat([])` is equivalent to
        `Hazard()`. If `HazardB` is a subclass of `Hazard`, then
        `HazardB.concat([])` is equivalent to `HazardB()` (unless `HazardB`
        overrides the `concat` method).

        Parameters
        ----------
        haz_list : list of climada.hazard.Hazard objects
            Hazard instances of the same hazard type (subclass).

        Returns
        -------
        haz_concat : instance of climada.hazard.Hazard
            This will be of the same type (subclass) as all the hazards in
            `haz_list`.

        See Also
        --------
        Hazard.append : append hazards to a hazard in place
        Centroids.union : combine centroids
        """
        if len(haz_list) == 0:
            return cls()
        haz_concat = haz_list[0].__class__(
            centroids=Centroids(lat=[], lon=[], crs=haz_list[0].centroids.crs)
        )
        for attr_name, attr_val in vars(haz_list[0]).items():
            # to save memory, only copy simple attributes like
            # "units" that are not explicitly handled by Hazard.append
            if not (
                isinstance(attr_val, (list, np.ndarray, sparse.csr_matrix))
                or attr_name in ["centroids"]
            ):
                setattr(haz_concat, attr_name, copy.deepcopy(attr_val))
        haz_concat.append(*haz_list)
        return haz_concat

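    # A hypothetical sketch of concatenating two single-event hazards that
    # share one centroid set (``cents`` stands for any Centroids instance with
    # two points): events are stacked vertically and ids are re-assigned by
    # sanitize_event_ids, since both inputs use id 1.
    #
    # >>> haz1 = Hazard("TC", centroids=cents, event_id=np.array([1]),
    # ...               frequency=np.array([1.0]),
    # ...               intensity=sparse.csr_matrix([[0.0, 1.0]]))
    # >>> haz2 = Hazard("TC", centroids=cents, event_id=np.array([1]),
    # ...               frequency=np.array([1.0]),
    # ...               intensity=sparse.csr_matrix([[2.0, 3.0]]))
    # >>> haz = Hazard.concat([haz1, haz2])
    # >>> haz.size
    # 2
    # >>> haz.event_id
    # array([1, 2])
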
    def change_centroids(self, centroids, threshold=u_coord.NEAREST_NEIGHBOR_THRESHOLD):
        """
        Assign (new) centroids to hazard.

        Centroids of the hazard not in centroids are mapped onto the nearest
        point. Fails if a point is further than threshold from the closest
        centroid.

        The centroids must have the same CRS as self.centroids.

        Parameters
        ----------
        haz : Hazard
            Hazard instance
        centroids : Centroids
            Centroids instance on which to map the hazard.
        threshold : int or float
            Threshold (in km) for mapping haz.centroids not in centroids.
            Argument is passed to climada.util.coordinates.match_coordinates.
            Default: 100 (km)

        Returns
        -------
        haz_new_cent : Hazard
            Hazard projected onto centroids

        Raises
        ------
        ValueError

        See Also
        --------
        util.coordinates.match_coordinates : algorithm to match centroids.
        """
        # define empty hazard
        haz_new_cent = copy.deepcopy(self)
        haz_new_cent.centroids = centroids

        new_cent_idx = u_coord.match_coordinates(
            self.centroids.coord, centroids.coord, threshold=threshold
        )
        if -1 in new_cent_idx:
            raise ValueError(
                "At least one hazard centroid is at a larger distance than the given"
                f" threshold {threshold} from the given centroids. Please choose a"
                " larger threshold or enlarge the centroids"
            )
        if np.unique(new_cent_idx).size < new_cent_idx.size:
            raise ValueError(
                "At least two hazard centroids are mapped to the same centroids."
                " Please make sure that the given centroids cover the same area as"
                " the original centroids and are not of lower resolution."
            )

        # re-assign attributes intensity and fraction
        for attr_name in ["intensity", "fraction"]:
            matrix = getattr(self, attr_name)
            setattr(
                haz_new_cent,
                attr_name,
                sparse.csr_matrix(
                    (matrix.data, new_cent_idx[matrix.indices], matrix.indptr),
                    shape=(matrix.shape[0], centroids.size),
                ),
            )

        return haz_new_cent

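    # A hypothetical usage sketch: remap a hazard onto a finer Centroids
    # instance ``new_cents`` covering the original points (within the default
    # 100 km threshold). Intensity and fraction columns are re-indexed onto
    # the new grid; all other attributes are deep-copied unchanged.
    #
    # >>> haz_remapped = haz.change_centroids(new_cents)
    # >>> haz_remapped.intensity.shape == (haz.size, new_cents.size)
    # True
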
    @property
    def centr_exp_col(self):
        """
        Name of the centroids columns for this hazard in an exposures

        Returns
        -------
        String
            centroids string indicator with hazard type defining column
            in an exposures gdf. E.g. "centr_TC"
        """
        # import outside toplevel is necessary for it not being circular
        from climada.entity.exposures import (  # pylint: disable=import-outside-toplevel
            INDICATOR_CENTR,
        )

        return INDICATOR_CENTR + self.haz_type

    def get_mdr(self, cent_idx, impf):
        """
        Return Mean Damage Ratio (mdr) for chosen centroids (cent_idx)
        for given impact function.

        Parameters
        ----------
        cent_idx : array-like
            array of indices of chosen centroids from hazard
        impf : ImpactFunc
            impact function to compute mdr

        Returns
        -------
        sparse.csr_matrix
            sparse matrix (n_events x len(cent_idx)) with mdr values

        See Also
        --------
        get_paa : get the paa for the given centroids
        """
        uniq_cent_idx, indices = np.unique(cent_idx, return_inverse=True)
        mdr = self.intensity[:, uniq_cent_idx]
        if impf.calc_mdr(0) == 0:
            mdr.data = impf.calc_mdr(mdr.data)
        else:
            LOGGER.warning(
                "Impact function id=%d has mdr(0) != 0. "
                "The mean damage ratio must thus be computed for all values of "
                "hazard intensity including 0, which can be very time consuming.",
                impf.id,
            )
            mdr_array = impf.calc_mdr(mdr.toarray().ravel()).reshape(mdr.shape)
            mdr = sparse.csr_matrix(mdr_array)
        mdr_out = mdr[:, indices]
        mdr_out.eliminate_zeros()
        return mdr_out

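    # Why the mdr(0) == 0 check above matters: when the impact function maps
    # zero intensity to zero damage, applying it to ``mdr.data`` alone touches
    # only the stored nonzeros and preserves sparsity. Otherwise, every
    # implicit zero would acquire a nonzero damage ratio, so the matrix must
    # be densified first. A toy illustration with a hypothetical linear
    # impact function f(x) = 0.1 * x:
    #
    # >>> inten = sparse.csr_matrix([[0.0, 10.0], [5.0, 0.0]])
    # >>> mdr = inten.copy()
    # >>> mdr.data = 0.1 * mdr.data    # safe shortcut, since f(0) == 0
    # >>> mdr.toarray()
    # array([[0. , 1. ],
    #        [0.5, 0. ]])
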
    def get_paa(self, cent_idx, impf):
        """
        Return Percentage of Affected Assets (paa) for chosen centroids
        (cent_idx) for given impact function.

        Note that values at intensity = 0 are ignored. This is different
        from get_mdr.

        Parameters
        ----------
        cent_idx : array-like
            array of indices of chosen centroids from hazard
        impf : ImpactFunc
            impact function to compute paa

        Returns
        -------
        sparse.csr_matrix
            sparse matrix (n_events x len(cent_idx)) with paa values

        See Also
        --------
        get_mdr : get the mean damage ratio for the given centroids
        """
        uniq_cent_idx, indices = np.unique(cent_idx, return_inverse=True)
        paa = self.intensity[:, uniq_cent_idx]
        paa.data = np.interp(paa.data, impf.intensity, impf.paa)
        return paa[:, indices]

    def _get_fraction(self, cent_idx=None):
        """
        Return fraction for chosen centroids (cent_idx).

        Parameters
        ----------
        cent_idx : array-like
            array of indices of chosen centroids from hazard.
            Default is None (full fraction is returned)

        Returns
        -------
        sparse.csr_matrix or None
            sparse matrix (n_events x len(cent_idx)) with fraction values;
            None if fraction is empty. (When calculating the impact, an empty
            fraction is equivalent to the identity under multiplication,
            i.e. a uniform matrix with value 1 everywhere.)
        """
        if self.fraction.nnz == 0:
            return None
        if cent_idx is None:
            return self.fraction
        return self.fraction[:, cent_idx]