"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Impact and ImpactFreqCurve classes.
"""
__all__ = ["ImpactFreqCurve", "Impact"]
import copy
import csv
import datetime as dt
import logging
import warnings
from collections.abc import Collection
from dataclasses import dataclass, field
from itertools import zip_longest
from pathlib import Path
from typing import Any, Iterable, Union
import contextily as ctx
import geopandas as gpd
import h5py
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xlsxwriter
from deprecation import deprecated
from matplotlib.colors import Normalize
from pandas.api.types import is_string_dtype
from pyproj import CRS as pyprojCRS
from rasterio.crs import CRS as rasterioCRS # pylint: disable=no-name-in-module
from scipy import sparse
from tqdm import tqdm
import climada.util.coordinates as u_coord
import climada.util.dates_times as u_dt
import climada.util.interpolation as u_interp
import climada.util.plot as u_plot
from climada.entity import Exposures
from climada.util.constants import CMAP_IMPACT, DEF_CRS, DEF_FREQ_UNIT
from climada.util.select import get_attributes_with_matching_dimension
from climada.util.value_representation import safe_divide
LOGGER = logging.getLogger(__name__)
[docs]
class Impact:
"""Impact definition. Compute from an entity (exposures and impact
functions) and hazard.
Attributes
----------
event_id : np.array
id (>0) of each hazard event
event_name : list
list name of each hazard event
date : np.array
date if events as integer date corresponding to the
proleptic Gregorian ordinal, where January 1 of year 1 has
ordinal 1 (ordinal format of datetime library)
coord_exp : np.array
exposures coordinates [lat, lon] (in degrees)
crs : str
WKT string of the impact's crs
eai_exp : np.array
expected impact for each exposure within a period of 1/frequency_unit
at_event : np.array
impact for each hazard event
frequency : np.array
frequency of event
frequency_unit : str
frequency unit used (given by hazard), default is '1/year'
aai_agg : float
average impact within a period of 1/frequency_unit (aggregated)
unit : str
value unit used (given by exposures unit)
imp_mat : sparse.csr_matrix
matrix num_events x num_exp with impacts.
only filled if save_mat is True in calc()
haz_type : str
the hazard type of the hazard
"""
[docs]
def __init__(
self,
event_id=None,
event_name=None,
date=None,
frequency=None,
frequency_unit=DEF_FREQ_UNIT,
coord_exp=None,
crs=DEF_CRS,
eai_exp=None,
at_event=None,
tot_value=0.0,
aai_agg=0.0,
unit="",
imp_mat=None,
haz_type="",
):
"""
Init Impact object
Parameters
----------
event_id : np.array, optional
id (>0) of each hazard event
event_name : list, optional
list name of each hazard event
date : np.array, optional
date if events as integer date corresponding to the
proleptic Gregorian ordinal, where January 1 of year 1 has
ordinal 1 (ordinal format of datetime library)
frequency : np.array, optional
frequency of event impact for each hazard event
frequency_unit : np.array, optional
frequency unit, default: '1/year'
coord_exp : np.array, optional
exposures coordinates [lat, lon] (in degrees)
crs : Any, optional
Coordinate reference system. CRS instances from ``pyproj`` and ``rasterio``
will be transformed into WKT. Other types are not handled explicitly.
eai_exp : np.array, optional
expected impact for each exposure within a period of 1/frequency_unit
at_event : np.array, optional
impact for each hazard event
tot_value : float, optional
total exposure value affected
aai_agg : float, optional
average impact within a period of 1/frequency_unit (aggregated)
unit : str, optional
value unit used (given by exposures unit)
imp_mat : sparse.csr_matrix, optional
matrix num_events x num_exp with impacts.
haz_type : str, optional
the hazard type
"""
self.haz_type = haz_type
self.event_id = np.array([], int) if event_id is None else event_id
self.event_name = [] if event_name is None else event_name
self.date = np.array([], int) if date is None else date
self.coord_exp = np.array([], float) if coord_exp is None else coord_exp
self.crs = crs.to_wkt() if isinstance(crs, (pyprojCRS, rasterioCRS)) else crs
self.eai_exp = np.array([], float) if eai_exp is None else eai_exp
self.at_event = np.array([], float) if at_event is None else at_event
self.frequency = np.array([], float) if frequency is None else frequency
self.frequency_unit = frequency_unit
self._tot_value = tot_value
self.aai_agg = aai_agg
self.unit = unit
if len(self.event_id) != len(self.event_name):
raise AttributeError(
f"Hazard event ids {len(self.event_id)} and event names"
f" {len(self.event_name)} are not of the same length"
)
if len(self.event_id) != len(self.date):
raise AttributeError(
f"Hazard event ids {len(self.event_id)} and event dates"
f" {len(self.date)} are not of the same length"
)
if len(self.event_id) != len(self.frequency):
raise AttributeError(
f"Hazard event ids {len(self.event_id)} and event frequency"
f" {len(self.frequency)} are not of the same length"
)
if len(self.event_id) != len(self.at_event):
raise AttributeError(
f"Number of hazard event ids {len(self.event_id)} is different "
f"from number of at_event values {len(self.at_event)}"
)
if len(self.coord_exp) != len(self.eai_exp):
raise AttributeError(
"Number of exposures points is different from"
"number of eai_exp values"
)
if imp_mat is not None:
self.imp_mat = imp_mat
if self.imp_mat.size > 0:
if len(self.event_id) != self.imp_mat.shape[0]:
raise AttributeError(
f"The number of rows {imp_mat.shape[0]} of the impact "
+ f"matrix is inconsistent with the number {len(event_id)} "
"of hazard events."
)
if len(self.coord_exp) != self.imp_mat.shape[1]:
raise AttributeError(
f"The number of columns {imp_mat.shape[1]} of the impact"
+ f" matrix is inconsistent with the number {len(coord_exp)}"
" exposures points."
)
else:
self.imp_mat = sparse.csr_matrix(np.empty((0, 0)))
[docs]
def calc(
self, exposures, impact_funcs, hazard, save_mat=False, assign_centroids=True
):
"""This function is deprecated, use ``ImpactCalc.impact`` instead."""
LOGGER.warning(
"The use of Impact().calc() is deprecated."
" Use ImpactCalc().impact() instead."
)
from climada.engine.impact_calc import ( # pylint: disable=import-outside-toplevel
ImpactCalc,
)
impcalc = ImpactCalc(exposures, impact_funcs, hazard)
self.__dict__ = impcalc.impact(
save_mat=save_mat, assign_centroids=assign_centroids
).__dict__
# TODO: new name
[docs]
@classmethod
def from_eih(cls, exposures, hazard, at_event, eai_exp, aai_agg, imp_mat=None):
"""
Set Impact attributes from precalculated impact metrics.
.. versionchanged:: 3.3
The ``impfset`` argument was removed.
Parameters
----------
exposures : climada.entity.Exposures
exposure used to compute imp_mat
impfset: climada.entity.ImpactFuncSet
impact functions set used to compute imp_mat
hazard : climada.Hazard
hazard used to compute imp_mat
at_event : np.array
impact for each hazard event
eai_exp : np.array
expected impact for each exposure within a period of 1/frequency_unit
aai_agg : float
average impact within a period of 1/frequency_unit (aggregated)
imp_mat : sparse.csr_matrix, optional
matrix num_events x num_exp with impacts.
Default is None (empty sparse csr matrix).
Returns
-------
climada.engine.impact.Impact
impact with all risk metrics set based on the given impact matrix
"""
return cls(
event_id=hazard.event_id,
event_name=hazard.event_name,
date=hazard.date,
frequency=hazard.frequency,
frequency_unit=hazard.frequency_unit,
coord_exp=np.stack([exposures.latitude, exposures.longitude], axis=1),
crs=exposures.crs,
unit=exposures.value_unit,
tot_value=exposures.centroids_total_value(hazard),
eai_exp=eai_exp,
at_event=at_event,
aai_agg=aai_agg,
imp_mat=imp_mat if imp_mat is not None else sparse.csr_matrix((0, 0)),
haz_type=hazard.haz_type,
)
@property
def tot_value(self):
"""Return the total exposure value close to a hazard
.. deprecated:: 3.3
Use :py:meth:`climada.entity.exposures.base.Exposures.affected_total_value`
instead.
"""
LOGGER.warning(
"The Impact.tot_value attribute is deprecated."
"Use Exposures.affected_total_value to calculate the affected "
"total exposure value based on a specific hazard intensity "
"threshold"
)
return self._tot_value
@tot_value.setter
def tot_value(self, value):
"""Set the total exposure value close to a hazard"""
LOGGER.warning(
"The Impact.tot_value attribute is deprecated."
"Use Exposures.affected_total_value to calculate the affected "
"total exposure value based on a specific hazard intensity "
"threshold"
)
self._tot_value = value
[docs]
def transfer_risk(self, attachment, cover):
"""Compute the risk transfer for the full portfolio. This is the risk
of the full portfolio summed over all events. For each
event, the transfered risk amounts to the impact minus the attachment
(but maximally equal to the cover) multiplied with the probability
of the event.
Parameters
----------
attachment : float
attachment per event for entire portfolio.
cover : float
cover per event for entire portfolio.
Returns
-------
transfer_at_event : np.array
risk transfered per event
transfer_aai_agg : float
average risk within a period of 1/frequency_unit, transfered
"""
transfer_at_event = np.minimum(np.maximum(self.at_event - attachment, 0), cover)
transfer_aai_agg = np.sum(transfer_at_event * self.frequency)
return transfer_at_event, transfer_aai_agg
[docs]
def residual_risk(self, attachment, cover):
"""Compute the residual risk after application of insurance
attachment and cover to entire portfolio. This is the residual risk
of the full portfolio summed over all events. For each
event, the residual risk is obtained by subtracting the transfered risk
from the trom the total risk per event.
of the event.
Parameters
----------
attachment : float
attachment per event for entire portfolio.
cover : float
cover per event for entire portfolio.
Returns
-------
residual_at_event : np.array
residual risk per event
residual_aai_agg : float
average residual risk within a period of 1/frequency_unit
See also
--------
transfer_risk: compute the transfer risk per portfolio.
"""
transfer_at_event, _ = self.transfer_risk(attachment, cover)
residual_at_event = np.maximum(self.at_event - transfer_at_event, 0)
residual_aai_agg = np.sum(residual_at_event * self.frequency)
return residual_at_event, residual_aai_agg
# TODO: rewrite and deprecate method
[docs]
def calc_risk_transfer(self, attachment, cover):
"""Compute traaditional risk transfer over impact. Returns new impact
with risk transfer applied and the insurance layer resulting
Impact metrics.
Parameters
----------
attachment : float
(deductible)
cover : float
Returns
-------
climada.engine.impact.Impact
"""
new_imp = copy.deepcopy(self)
if attachment or cover:
imp_layer = np.minimum(np.maximum(new_imp.at_event - attachment, 0), cover)
new_imp.at_event = np.maximum(new_imp.at_event - imp_layer, 0)
new_imp.aai_agg = np.sum(new_imp.at_event * new_imp.frequency)
# next values are no longer valid
new_imp.eai_exp = np.array([])
new_imp.coord_exp = np.array([])
new_imp.imp_mat = sparse.csr_matrix((0, 0))
# insurance layer metrics
risk_transfer = copy.deepcopy(new_imp)
risk_transfer.at_event = imp_layer
risk_transfer.aai_agg = np.sum(imp_layer * new_imp.frequency)
return new_imp, risk_transfer
return new_imp, Impact()
[docs]
def impact_per_year(self, all_years=True, year_range=None):
"""Calculate yearly impact from impact data.
Note: the impact in a given year is summed over all events.
Thus, the impact in a given year can be larger than the
total affected exposure value.
Parameters
----------
all_years : boolean, optional
return values for all years between first and
last year with event, including years without any events.
Default: True
year_range : tuple or list with integers, optional
start and end year
Returns
-------
year_set : dict
Key=year, value=Summed impact per year.
"""
if year_range is None:
year_range = []
orig_year = np.array([dt.datetime.fromordinal(date).year for date in self.date])
if orig_year.size == 0 and len(year_range) == 0:
return dict()
if orig_year.size == 0 or (len(year_range) > 0 and all_years):
years = np.arange(min(year_range), max(year_range) + 1)
elif all_years:
years = np.arange(min(orig_year), max(orig_year) + 1)
else:
years = np.array(sorted(np.unique(orig_year)))
if not len(year_range) == 0:
years = years[years >= min(year_range)]
years = years[years <= max(year_range)]
year_set = dict()
for year in years:
year_set[year] = sum(self.at_event[orig_year == year])
return year_set
[docs]
def impact_at_reg(self, agg_regions=None):
"""Aggregate impact on given aggregation regions. This method works
only if Impact.imp_mat was stored during the impact calculation.
Parameters
----------
agg_regions : np.array, list (optional)
The length of the array must equal the number of centroids in exposures.
It reports what macro-regions these centroids belong to. For example,
asuming there are three centroids and agg_regions = ['A', 'A', 'B']
then impact of the first and second centroids will be assigned to
region A, whereas impact from the third centroid will be assigned
to area B. If no aggregation regions are passed, the method aggregates
impact at the country (admin_0) level.
Default is None.
Returns
-------
pd.DataFrame
Contains the aggregated data per event.
Rows: Hazard events. Columns: Aggregation regions.
"""
if np.prod(self.imp_mat.shape) == 0:
raise ValueError(
"The aggregated impact cannot be computed as no Impact.imp_mat was "
"stored during the impact calculation"
)
if agg_regions is None:
agg_regions = u_coord.country_to_iso(
u_coord.get_country_code(self.coord_exp[:, 0], self.coord_exp[:, 1])
)
agg_regions = np.asanyarray(agg_regions)
agg_reg_unique = np.unique(agg_regions)
at_reg_event = np.hstack(
[
self.imp_mat[:, np.where(agg_regions == reg)[0]].sum(1)
for reg in agg_reg_unique
]
)
at_reg_event = pd.DataFrame(
at_reg_event, columns=agg_reg_unique, index=self.event_id
)
return at_reg_event
[docs]
def calc_impact_year_set(self, all_years=True, year_range=None):
"""This function is deprecated, use Impact.impact_per_year instead."""
LOGGER.warning(
"The use of Impact.calc_impact_year_set is deprecated."
"Use Impact.impact_per_year instead."
)
return self.impact_per_year(all_years=all_years, year_range=year_range)
[docs]
def local_exceedance_impact(
self,
return_periods=(25, 50, 100, 250),
method="interpolate",
min_impact=0,
log_frequency=True,
log_impact=True,
bin_decimals=None,
):
"""Compute local exceedance impact for given return periods. The default method
is fitting the ordered impacts per centroid to the corresponding cummulated
frequency with linear interpolation on log-log scale.
Parameters
----------
return_periods : array_like
User-specified return periods for which the exceedance intensity should be calculated
locally (at each centroid). Defaults to (25, 50, 100, 250).
method : str
Method to interpolate to new return periods. Currently available are "interpolate",
"extrapolate", "extrapolate_constant" and "stepfunction". If set to "interpolate",
return periods outside the range of the Impact object's observed local return periods
will be assigned NaN. If set to "extrapolate_constant" or "stepfunction",
return periods larger than the Impact object's observed local return periods will be
assigned the largest local impact, and return periods smaller than the Impact object's
observed local return periods will be assigned 0. If set to "extrapolate", local
exceedance impacts will be extrapolated (and interpolated). The extrapolation to
large return periods uses the two highest impacts of the centroid and their return
periods and extends the interpolation between these points to the given return period
(similar for small return periods). Defauls to "interpolate".
min_impact : float, optional
Minimum threshold to filter the impact. Defaults to 0.
log_frequency : bool, optional
If set to True, (cummulative) frequency values are converted to log scale before
inter- and extrapolation. Defaults to True.
log_impact : bool, optional
If set to True, impact values are converted to log scale before
inter- and extrapolation. Defaults to True.
bin_decimals : int, optional
Number of decimals to group and bin impact values. Binning results in smoother (and
coarser) interpolation and more stable extrapolation. For more details and sensible
values for bin_decimals, see Notes. If None, values are not binned. Defaults to None.
Returns
-------
gdf : gpd.GeoDataFrame
GeoDataFrame containing exeedance impacts for given return periods. Each column
corresponds to a return period, each row corresponds to a centroid. Values
in the gdf correspond to the exceedance impact for the given centroid and
return period
label : str
GeoDataFrame label, for reporting and plotting
column_label : function
Column-label-generating function, for reporting and plotting
See Also
--------
util.interpolation.preprocess_and_interpolate_ev :
inter- and extrapolation method
Notes
-------
If an integer bin_decimals is given, the impact values are binned according to their
bin_decimals decimals, and their corresponding frequencies are summed. This binning leads
to a smoother (and coarser) interpolation, and a more stable extrapolation. For instance,
if bin_decimals=1, the two values 12.01 and 11.97 with corresponding frequencies 0.1 and
0.2 are combined to a value 12.0 with frequency 0.3. The default bin_decimals=None results
in not binning the values.
E.g., if your impact range from 1 to 100, you could use bin_decimals=1, if your
impact range from 1e6 to 1e9, you could use bin_decimals=-5, if your impact
range from 0.0001 to .01, you could use bin_decimals=5.
"""
LOGGER.info(
"Computing exceedance impact map for return periods: %s", return_periods
)
if self.imp_mat.size == 0:
raise ValueError(
"Attribute imp_mat is empty. Recalculate Impact"
"instance with parameter save_mat=True"
)
# check frequency unit
return_period_unit = u_dt.convert_frequency_unit_to_time_unit(
self.frequency_unit
)
# check method
if method not in [
"interpolate",
"extrapolate",
"extrapolate_constant",
"stepfunction",
]:
raise ValueError(f"Unknown method: {method}")
# calculate local exceedance impact
test_frequency = 1 / np.array(return_periods)
exceedance_impact = np.full(
(self.imp_mat.shape[1], len(test_frequency)),
np.nan if method == "interpolate" else 0.0,
)
nonzero_centroids = np.where(self.imp_mat.getnnz(axis=0) > 0)[0]
if not len(nonzero_centroids) == 0:
exceedance_impact[nonzero_centroids, :] = np.array(
[
u_interp.preprocess_and_interpolate_ev(
test_frequency,
None,
self.frequency,
self.imp_mat.getcol(i_centroid).toarray().flatten(),
log_frequency=log_frequency,
log_values=log_impact,
value_threshold=min_impact,
method=method,
y_asymptotic=0.0,
bin_decimals=bin_decimals,
)
for i_centroid in nonzero_centroids
]
)
# create the output GeoDataFrame
gdf = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(self.coord_exp[:, 1], self.coord_exp[:, 0]),
crs=self.crs,
)
col_names = [f"{ret_per}" for ret_per in return_periods]
gdf[col_names] = exceedance_impact
# create label and column_label
label = f"Impact ({self.unit})"
def column_label(column_names):
return [
f"Return Period: {col} {return_period_unit}" for col in column_names
]
return gdf, label, column_label
[docs]
@deprecated(
details="The use of Impact.local_exceedance_imp is deprecated. Use "
"Impact.local_exceedance_impact instead. Some errors in the previous calculation "
"in Impact.local_exceedance_imp have been corrected. To reproduce data with the "
"previous calculation, use CLIMADA v5.0.0 or less."
)
def local_exceedance_imp(self, return_periods=(25, 50, 100, 250)):
"""This function is deprecated, use Impact.local_exceedance_impact instead."""
return (
self.local_exceedance_impact(return_periods)[0]
.values[:, 1:]
.T.astype(float)
)
[docs]
def local_return_period(
self,
threshold_impact=(1000.0, 10000.0),
method="interpolate",
min_impact=0,
log_frequency=True,
log_impact=True,
bin_decimals=None,
):
"""Compute local return periods for given threshold impacts. The default method
is fitting the ordered impacts per centroid to the corresponding cummulated
frequency with linear interpolation on log-log scale.
Parameters
----------
threshold_impact : array_like
User-specified impact values for which the return period should be calculated
locally (at each centroid). Defaults to (1000, 10000)
method : str
Method to interpolate to new threshold impacts. Currently available are
"interpolate", "extrapolate", "extrapolate_constant" and "stepfunction". If set to
"interpolate", threshold impacts outside the range of the Impact object's local
impacts will be assigned NaN. If set to "extrapolate_constant" or
"stepfunction", threshold impacts larger than the Impacts object's local
impacts will be assigned NaN, and threshold impacts smaller than the Impact
object's local impacts will be assigned the smallest observed local return period.
If set to "extrapolate", local return periods will be extrapolated (and interpolated).
The extrapolation to large threshold impacts uses the two highest impacts of
the centroid and their return periods and extends the interpolation between these
points to the given threshold imapct (similar for small large threshold impacts).
Defaults to "interpolate".
min_impacts : float, optional
Minimum threshold to filter the impact. Defaults to 0.
log_frequency : bool, optional
If set to True, (cummulative) frequency values are converted to log scale before
inter- and extrapolation. Defaults to True.
log_impact : bool, optional
If set to True, impact values are converted to log scale before
inter- and extrapolation. Defaults to True.
bin_decimals : int, optional
Number of decimals to group and bin impact values. Binning results in smoother (and
coarser) interpolation and more stable extrapolation. For more details and sensible
values for bin_decimals, see Notes. If None, values are not binned. Defaults to None.
Returns
-------
gdf : gpd.GeoDataFrame
GeoDataFrame containing return periods for given threshold impacts. Each column
corresponds to a threshold_impact value, each row corresponds to a centroid. Values
in the gdf correspond to the return period for the given centroid and
threshold_impact value
label : str
GeoDataFrame label, for reporting and plotting
column_label : function
Column-label-generating function, for reporting and plotting
See Also
--------
util.interpolation.preprocess_and_interpolate_ev :
inter- and extrapolation method
Notes
-------
If an integer bin_decimals is given, the impact values are binned according to their
bin_decimals decimals, and their corresponding frequencies are summed. This binning leads
to a smoother (and coarser) interpolation, and a more stable extrapolation. For instance,
if bin_decimals=1, the two values 12.01 and 11.97 with corresponding frequencies 0.1 and
0.2 are combined to a value 12.0 with frequency 0.3. The default bin_decimals=None results
in not binning the values.
E.g., if your impact range from 1 to 100, you could use bin_decimals=1, if your
impact range from 1e6 to 1e9, you could use bin_decimals=-5, if your impact
range from 0.0001 to .01, you could use bin_decimals=5.
"""
LOGGER.info("Computing return period map for impacts: %s", threshold_impact)
if self.imp_mat.size == 0:
raise ValueError(
"Attribute imp_mat is empty. Recalculate Impact"
"instance with parameter save_mat=True"
)
# check frequency unit
return_period_unit = u_dt.convert_frequency_unit_to_time_unit(
self.frequency_unit
)
# check method
if method not in [
"interpolate",
"extrapolate",
"extrapolate_constant",
"stepfunction",
]:
raise ValueError(f"Unknown method: {method}")
return_periods = np.full((self.imp_mat.shape[1], len(threshold_impact)), np.nan)
nonzero_centroids = np.where(self.imp_mat.getnnz(axis=0) > 0)[0]
# calculate local return periods
if not len(nonzero_centroids) == 0:
return_periods[nonzero_centroids, :] = np.array(
[
u_interp.preprocess_and_interpolate_ev(
None,
np.array(threshold_impact),
self.frequency,
self.imp_mat.getcol(i_centroid).toarray().flatten(),
log_frequency=log_frequency,
log_values=log_impact,
value_threshold=min_impact,
method=method,
y_asymptotic=np.nan,
bin_decimals=bin_decimals,
)
for i_centroid in nonzero_centroids
]
)
return_periods = safe_divide(1.0, return_periods)
# create the output GeoDataFrame
gdf = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(self.coord_exp[:, 1], self.coord_exp[:, 0]),
crs=self.crs,
)
col_names = [f"{thresh_impact}" for thresh_impact in threshold_impact]
gdf[col_names] = return_periods
# create label and column_label
label = f"Return Periods ({return_period_unit})"
def column_label(column_names):
return [f"Impact: {col} {self.unit}" for col in column_names]
return gdf, label, column_label
[docs]
def calc_freq_curve(self, return_per=None):
"""Compute impact exceedance frequency curve.
Parameters
----------
return_per : np.array, optional
return periods where to compute
the exceedance impact. Use impact's frequencies if not provided
Returns
-------
ImpactFreqCurve
"""
# Sort descendingly the impacts per events
sort_idxs = np.argsort(self.at_event)[::-1]
# Calculate exceedence frequency
exceed_freq = np.cumsum(self.frequency[sort_idxs])
# Set return period and impact exceeding frequency
ifc_return_per = 1 / exceed_freq[::-1]
ifc_impact = self.at_event[sort_idxs][::-1]
if return_per is not None:
interp_imp = np.interp(return_per, ifc_return_per, ifc_impact)
ifc_return_per = return_per
ifc_impact = interp_imp
return ImpactFreqCurve(
return_per=ifc_return_per,
impact=ifc_impact,
unit=self.unit,
frequency_unit=self.frequency_unit,
label="Exceedance frequency curve",
)
def _eai_title(self):
if self.frequency_unit in ["1/year", "annual", "1/y", "1/a"]:
return "Expected annual impact"
if self.frequency_unit in ["1/day", "daily", "1/d"]:
return "Expected daily impact"
if self.frequency_unit in ["1/month", "monthly", "1/m"]:
return "Expected monthly impact"
return f"Expected impact ({self.frequency_unit})"
[docs]
def plot_scatter_eai_exposure(
self,
mask=None,
ignore_zero=False,
pop_name=True,
buffer=0.0,
extend="neither",
axis=None,
adapt_fontsize=True,
**kwargs,
):
"""Plot scatter expected impact within a period of 1/frequency_unit of each exposure.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates.
Default: 1.0.
extend : str
optional extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis : matplotlib.axes.Axes, optional
axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used. Default is True.
kwargs : dict, optional
arguments for hexbin matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if "cmap" not in kwargs:
kwargs["cmap"] = CMAP_IMPACT
eai_exp = self._build_exp()
axis = eai_exp.plot_scatter(
mask,
ignore_zero,
pop_name,
buffer,
extend,
axis=axis,
adapt_fontsize=adapt_fontsize,
**kwargs,
)
axis.set_title(self._eai_title())
return axis
[docs]
def plot_hexbin_eai_exposure(
self,
mask=None,
ignore_zero=False,
pop_name=True,
buffer=0.0,
extend="neither",
axis=None,
adapt_fontsize=True,
**kwargs,
):
"""Plot hexbin expected impact within a period of 1/frequency_unit of each exposure.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates.
Default: 1.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis : matplotlib.axes.Axes, optional
axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used. Default: True
kwargs : dict, optional
arguments for hexbin matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if "cmap" not in kwargs:
kwargs["cmap"] = CMAP_IMPACT
eai_exp = self._build_exp()
axis = eai_exp.plot_hexbin(
mask,
ignore_zero,
pop_name,
buffer,
extend,
axis=axis,
adapt_fontsize=adapt_fontsize,
**kwargs,
)
axis.set_title(self._eai_title())
return axis
[docs]
def plot_raster_eai_exposure(
self,
res=None,
raster_res=None,
save_tiff=None,
raster_f=lambda x: np.log10((np.fmax(x + 1, 1))),
label="value (log10)",
axis=None,
adapt_fontsize=True,
**kwargs,
):
"""Plot raster expected impact within a period of 1/frequency_unit of each exposure.
Parameters
----------
res : float, optional
resolution of current data in units of latitude
and longitude, approximated if not provided.
raster_res : float, optional
desired resolution of the raster
save_tiff : str, optional
file name to save the raster in tiff
format, if provided
raster_f : lambda function
transformation to use to data. Default: log10 adding 1.
label : str
colorbar label
axis : matplotlib.axes.Axes, optional
axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used. Default is True.
kwargs : dict, optional
arguments for imshow matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
eai_exp = self._build_exp()
axis = eai_exp.plot_raster(
res,
raster_res,
save_tiff,
raster_f,
label,
axis=axis,
adapt_fontsize=adapt_fontsize,
**kwargs,
)
axis.set_title(self._eai_title())
return axis
[docs]
def plot_basemap_eai_exposure(
self,
mask=None,
ignore_zero=False,
pop_name=True,
buffer=0.0,
extend="neither",
zoom=10,
url=ctx.providers.CartoDB.Positron,
axis=None,
**kwargs,
):
"""Plot basemap expected impact of each exposure within a period of 1/frequency_unit.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
zoom : int, optional
zoom coefficient used in the satellite image
url : str, optional
image source, default: ctx.providers.CartoDB.Positron
axis : matplotlib.axes.Axes, optional
axis to use
kwargs : dict, optional
arguments for scatter matplotlib function, e.g.
cmap='Greys'. Default: 'Wistia'
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if "cmap" not in kwargs:
kwargs["cmap"] = CMAP_IMPACT
eai_exp = self._build_exp()
axis = eai_exp.plot_basemap(
mask, ignore_zero, pop_name, buffer, extend, zoom, url, axis=axis, **kwargs
)
axis.set_title(self._eai_title())
return axis
[docs]
def plot_hexbin_impact_exposure(
self,
event_id=1,
mask=None,
ignore_zero=False,
pop_name=True,
buffer=0.0,
extend="neither",
axis=None,
adapt_fontsize=True,
**kwargs,
):
"""Plot hexbin impact of an event at each exposure.
Requires attribute imp_mat.
Parameters
----------
event_id : int, optional
id of the event for which to plot the impact.
Default: 1.
mask : np.array, optional
mask to apply to impact plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates.
Default: 1.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis : matplotlib.axes.Axes
optional axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used. Default is True.
kwargs : dict, optional
arguments for hexbin matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if self.imp_mat.size == 0:
raise ValueError(
"Attribute imp_mat is empty. Recalculate Impact"
"instance with parameter save_mat=True"
)
if "cmap" not in kwargs:
kwargs["cmap"] = CMAP_IMPACT
impact_at_events_exp = self._build_exp_event(event_id)
axis = impact_at_events_exp.plot_hexbin(
mask,
ignore_zero,
pop_name,
buffer,
extend,
axis=axis,
adapt_fontsize=adapt_fontsize,
**kwargs,
)
return axis
[docs]
def plot_basemap_impact_exposure(
self,
event_id=1,
mask=None,
ignore_zero=False,
pop_name=True,
buffer=0.0,
extend="neither",
zoom=10,
url=ctx.providers.CartoDB.Positron,
axis=None,
**kwargs,
):
"""Plot basemap impact of an event at each exposure.
Requires attribute imp_mat.
Parameters
----------
event_id : int, optional
id of the event for which to plot the impact.
Default: 1.
mask : np.array, optional
mask to apply to impact plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
zoom : int, optional
zoom coefficient used in the satellite image
url : str, optional
image source, default: ctx.providers.CartoDB.Positron
axis : matplotlib.axes.Axes, optional
axis to use
kwargs : dict, optional
arguments for scatter matplotlib function, e.g. cmap='Greys'.
Default: 'Wistia'
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if self.imp_mat.size == 0:
raise ValueError(
"Attribute imp_mat is empty. Recalculate Impact"
"instance with parameter save_mat=True"
)
if event_id not in self.event_id:
raise ValueError(f"Event ID {event_id} not found")
if "cmap" not in kwargs:
kwargs["cmap"] = CMAP_IMPACT
impact_at_events_exp = self._build_exp_event(event_id)
axis = impact_at_events_exp.plot_basemap(
mask, ignore_zero, pop_name, buffer, extend, zoom, url, axis=axis, **kwargs
)
return axis
[docs]
def plot_rp_imp(
self,
return_periods=(25, 50, 100, 250),
log10_scale=True,
axis=None,
mask_distance=0.01,
kwargs_local_exceedance_impact=None,
**kwargs,
):
"""
Compute and plot exceedance impact maps for different return periods.
Calls local_exceedance_impact. For handling large data sets and for further options,
see Notes.
Parameters
----------
return_periods : tuple of int, optional
return periods to consider. Default: (25, 50, 100, 250)
log10_scale : boolean, optional
plot impact as log10(impact). Default: True
smooth : bool, optional
smooth plot to plot.RESOLUTIONxplot.RESOLUTION. Default: True
mask_distance: float, optional
Only regions are plotted that are closer to any of the data points than this distance,
relative to overall plot size. For instance, to only plot values
at the centroids, use mask_distance=0.01. If None, the plot is not masked.
Default is 0.01.
kwargs_local_exceedance_impact: dict
Dictionary of keyword arguments for the method impact.local_exceedance_impact.
kwargs : dict, optional
arguments for pcolormesh matplotlib function
used in event plots
Returns
-------
axis : matplotlib.axes.Axes
imp_stats : np.array
return_periods.size x num_centroids
See Also
--------
engine.impact.local_exceedance_impact :
inter- and extrapolation method
Notes
-----
For handling large data, and for more flexible options in the exceedance
impact computation and in the plotting, we recommend to use
gdf, title, labels = impact.local_exceedance_impact() and
util.plot.plot_from_gdf(gdf, title, labels) instead.
"""
LOGGER.info(
"Some errors in the previous calculation of local exceedance impacts have been "
"corrected, see Impact.local_exceedance_impact. To reproduce data with the "
"previous calculation, use CLIMADA v5.0.0 or less."
)
if kwargs_local_exceedance_impact is None:
kwargs_local_exceedance_impact = {}
impacts_stats, title, column_labels = self.local_exceedance_impact(
return_periods, **kwargs_local_exceedance_impact
)
impacts_stats_vals = impacts_stats.values[:, 1:].T.astype(float)
if not log10_scale:
min_impact, max_impact = np.nanmin(impacts_stats_vals), np.nanmax(
impacts_stats_vals
)
kwargs.update(
{
"norm": Normalize(vmin=min_impact, vmax=max_impact),
}
)
axis = u_plot.plot_from_gdf(
impacts_stats,
title,
column_labels,
axis=axis,
mask_distance=mask_distance,
**kwargs,
)
return axis, impacts_stats_vals
[docs]
def write_csv(self, file_name):
"""Write data into csv file. imp_mat is not saved.
Parameters
----------
file_name : str
absolute path of the file
"""
if not all((isinstance(val, str) for val in self.event_name)):
raise TypeError("'event_name' must be a list of strings")
LOGGER.info("Writing %s", file_name)
with open(file_name, "w", encoding="utf-8") as imp_file:
imp_wr = csv.writer(imp_file)
imp_wr.writerow(
[
"haz_type",
"unit",
"tot_value",
"aai_agg",
"event_id",
"event_name",
"event_date",
"event_frequency",
"frequency_unit",
"at_event",
"eai_exp",
"exp_lat",
"exp_lon",
"exp_crs",
]
)
csv_data = [
[self.haz_type],
[self.unit],
[self._tot_value],
[self.aai_agg],
self.event_id,
self.event_name,
self.date,
self.frequency,
[self.frequency_unit],
self.at_event,
self.eai_exp,
self.coord_exp[:, 0],
self.coord_exp[:, 1],
[str(self.crs)],
]
for values in zip_longest(*csv_data):
imp_wr.writerow(values)
[docs]
def write_excel(self, file_name):
"""Write data into Excel file. imp_mat is not saved.
Parameters
----------
file_name : str
absolute path of the file
"""
if not all((isinstance(val, str) for val in self.event_name)):
raise TypeError("'event_name' must be a list of strings")
LOGGER.info("Writing %s", file_name)
def write_col(i_col, imp_ws, xls_data):
"""Write one measure"""
row_ini = 1
for dat_row in xls_data:
imp_ws.write(row_ini, i_col, dat_row)
row_ini += 1
imp_wb = xlsxwriter.Workbook(file_name)
imp_ws = imp_wb.add_worksheet()
header = [
"haz_type",
"unit",
"tot_value",
"aai_agg",
"event_id",
"event_name",
"event_date",
"event_frequency",
"frequency_unit",
"at_event",
"eai_exp",
"exp_lat",
"exp_lon",
"exp_crs",
]
for icol, head_dat in enumerate(header):
imp_ws.write(0, icol, head_dat)
data = [str(self.haz_type)]
write_col(0, imp_ws, data)
write_col(1, imp_ws, [self.unit])
write_col(2, imp_ws, [self._tot_value])
write_col(3, imp_ws, [self.aai_agg])
write_col(4, imp_ws, self.event_id)
write_col(5, imp_ws, self.event_name)
write_col(6, imp_ws, self.date)
write_col(7, imp_ws, self.frequency)
write_col(8, imp_ws, [self.frequency_unit])
write_col(9, imp_ws, self.at_event)
write_col(10, imp_ws, self.eai_exp)
write_col(11, imp_ws, self.coord_exp[:, 0])
write_col(12, imp_ws, self.coord_exp[:, 1])
write_col(13, imp_ws, [str(self.crs)])
imp_wb.close()
[docs]
def write_hdf5(self, file_path: Union[str, Path], dense_imp_mat: bool = False):
"""Write the data stored in this object into an H5 file.
Try to write all attributes of this class into H5 datasets or attributes.
By default, any iterable will be stored in a dataset and any string or scalar
will be stored in an attribute. Dictionaries will be stored as groups, with
the previous rules being applied recursively to their values.
The impact matrix can be stored in a sparse or dense format.
Parameters
----------
file_path : str or Path
File path to write data into. The enclosing directory must exist.
dense_imp_mat : bool
If ``True``, write the impact matrix as dense matrix that can be more easily
interpreted by common H5 file readers but takes up (vastly) more space.
Defaults to ``False``.
Raises
------
TypeError
If :py:attr:`event_name` does not contain strings exclusively.
"""
# Define writers for all types (will be filled later)
type_writers = dict()
def write(group: h5py.Group, name: str, value: Any):
"""Write the given name-value pair with a type-specific writer.
This selects a writer by calling ``isinstance(value, key)``, where ``key``
iterates through the keys of ``type_writers``. If a type matches multiple
entries in ``type_writers``, the *first* match is chosen.
Parameters
----------
group : h5py.Group
The group in the H5 file to write into
name : str
The identifier of the value
value : scalar or array
The value/data to write
Raises
------
TypeError
If no suitable writer could be found for a given type
"""
for key, writer in type_writers.items():
if isinstance(value, key):
return writer(group, name, value)
raise TypeError(f"Could not find a writer for dataset: {name}")
def _str_type_helper(values: Collection):
"""Return string datatype if we assume 'values' contains strings"""
if all((isinstance(val, str) for val in values)):
return h5py.string_dtype()
return None
def write_attribute(group, name, value):
"""Write any attribute. This should work for almost any data"""
group.attrs[name] = value
def write_dataset(group, name, value):
"""Write a dataset"""
group.create_dataset(name, data=value, dtype=_str_type_helper(value))
def write_dict(group, name, value):
"""Write a dictionary with unknown level recursively into a group"""
group = group.create_group(name)
for key, val in value.items():
write(group, key, val)
def _write_csr_dense(group, name, value):
"""Write a CSR Matrix in dense format"""
group.create_dataset(name, data=value.toarray())
def _write_csr_sparse(group, name, value):
"""Write a CSR Matrix in sparse format"""
group = group.create_group(name)
group.create_dataset("data", data=value.data)
group.create_dataset("indices", data=value.indices)
group.create_dataset("indptr", data=value.indptr)
group.attrs["shape"] = value.shape
def write_csr(group, name, value):
"""Write a CSR matrix depending on user input"""
if dense_imp_mat:
_write_csr_dense(group, name, value)
else:
_write_csr_sparse(group, name, value)
# Set up writers based on types
# NOTE: 1) Many things are 'Collection', so make sure that precendence fits!
# 2) Anything is 'object', so this serves as fallback/default.
type_writers = {
str: write_attribute,
dict: write_dict,
sparse.csr_matrix: write_csr,
Collection: write_dataset,
object: write_attribute,
}
# Open file in write mode
with h5py.File(file_path, "w") as file:
# Now write all attributes
# NOTE: Remove leading underscore to write '_tot_value' as regular attribute
for name, value in self.__dict__.items():
if name == "event_name" and _str_type_helper(value) is None:
raise TypeError("'event_name' must be a list of strings")
write(file, name.lstrip("_"), value)
[docs]
def write_sparse_csr(self, file_name):
"""Write imp_mat matrix in numpy's npz format."""
LOGGER.info("Writing %s", file_name)
np.savez(
file_name,
data=self.imp_mat.data,
indices=self.imp_mat.indices,
indptr=self.imp_mat.indptr,
shape=self.imp_mat.shape,
)
[docs]
@staticmethod
def read_sparse_csr(file_name):
"""Read imp_mat matrix from numpy's npz format.
Parameters
----------
file_name : str
Returns
-------
sparse.csr_matrix
"""
LOGGER.info("Reading %s", file_name)
loader = np.load(file_name)
return sparse.csr_matrix(
(loader["data"], loader["indices"], loader["indptr"]), shape=loader["shape"]
)
[docs]
@classmethod
def from_csv(cls, file_name):
"""Read csv file containing impact data generated by write_csv.
Parameters
----------
file_name : str
absolute path of the file
Returns
-------
imp : climada.engine.impact.Impact
Impact from csv file
"""
# pylint: disable=no-member
LOGGER.info("Reading %s", file_name)
imp_df = pd.read_csv(file_name)
imp = cls(haz_type=imp_df["haz_type"][0])
imp.unit = imp_df["unit"][0]
imp.tot_value = imp_df["tot_value"][0]
imp.aai_agg = imp_df["aai_agg"][0]
imp.event_id = imp_df["event_id"][~np.isnan(imp_df["event_id"])].values
num_ev = imp.event_id.size
event_names = imp_df["event_name"][:num_ev]
if not is_string_dtype(event_names):
warnings.warn(
"Some event names are not str will be converted to str.", UserWarning
)
event_names = event_names.astype(str)
imp.event_name = event_names.values.tolist()
imp.date = imp_df["event_date"][:num_ev].values
imp.at_event = imp_df["at_event"][:num_ev].values
imp.frequency = imp_df["event_frequency"][:num_ev].values
imp.frequency_unit = (
imp_df["frequency_unit"][0] if "frequency_unit" in imp_df else DEF_FREQ_UNIT
)
imp.eai_exp = imp_df["eai_exp"][~np.isnan(imp_df["eai_exp"])].values
num_exp = imp.eai_exp.size
imp.coord_exp = np.zeros((num_exp, 2))
imp.coord_exp[:, 0] = imp_df["exp_lat"][:num_exp]
imp.coord_exp[:, 1] = imp_df["exp_lon"][:num_exp]
try:
imp.crs = u_coord.to_crs_user_input(imp_df["exp_crs"].values[0])
except AttributeError:
imp.crs = DEF_CRS
return imp
[docs]
def read_csv(self, *args, **kwargs):
"""This function is deprecated, use Impact.from_csv instead."""
LOGGER.warning(
"The use of Impact.read_csv is deprecated. Use Impact.from_csv instead."
)
self.__dict__ = Impact.from_csv(*args, **kwargs).__dict__
[docs]
@classmethod
def from_excel(cls, file_name):
"""Read excel file containing impact data generated by write_excel.
Parameters
----------
file_name : str
absolute path of the file
Returns
-------
imp : climada.engine.impact.Impact
Impact from excel file
"""
LOGGER.info("Reading %s", file_name)
imp_df = pd.read_excel(file_name)
imp = cls(haz_type=str(imp_df["haz_type"][0]))
imp.unit = imp_df["unit"][0]
imp.tot_value = imp_df["tot_value"][0]
imp.aai_agg = imp_df["aai_agg"][0]
imp.event_id = imp_df["event_id"][~np.isnan(imp_df["event_id"].values)].values
event_names = imp_df["event_name"][~np.isnan(imp_df["event_id"].values)]
if not is_string_dtype(event_names):
warnings.warn(
"Some event names are not str will be converted to str", UserWarning
)
event_names = event_names.astype(str)
imp.event_name = event_names.values
imp.date = imp_df["event_date"][: imp.event_id.size].values
imp.frequency = imp_df["event_frequency"][: imp.event_id.size].values
imp.frequency_unit = (
imp_df["frequency_unit"][0] if "frequency_unit" in imp_df else DEF_FREQ_UNIT
)
imp.at_event = imp_df["at_event"][: imp.event_id.size].values
imp.eai_exp = imp_df["eai_exp"][~np.isnan(imp_df["eai_exp"].values)].values
imp.coord_exp = np.zeros((imp.eai_exp.size, 2))
imp.coord_exp[:, 0] = imp_df["exp_lat"].values[: imp.eai_exp.size]
imp.coord_exp[:, 1] = imp_df["exp_lon"].values[: imp.eai_exp.size]
try:
imp.crs = u_coord.to_csr_user_input(imp_df["exp_crs"].values[0])
except AttributeError:
imp.crs = DEF_CRS
return imp
[docs]
def read_excel(self, *args, **kwargs):
"""This function is deprecated, use Impact.from_excel instead."""
LOGGER.warning(
"The use of Impact.read_excel is deprecated."
"Use Impact.from_excel instead."
)
self.__dict__ = Impact.from_excel(*args, **kwargs).__dict__
[docs]
@classmethod
def from_hdf5(cls, file_path: Union[str, Path]):
"""Create an impact object from an H5 file.
This assumes a specific layout of the file. If values are not found in the
expected places, they will be set to the default values for an ``Impact`` object.
The following H5 file structure is assumed (H5 groups are terminated with ``/``,
attributes are denoted by ``.attrs/``)::
file.h5
├─ at_event
├─ coord_exp
├─ eai_exp
├─ event_id
├─ event_name
├─ frequency
├─ imp_mat
├─ .attrs/
│ ├─ aai_agg
│ ├─ crs
│ ├─ frequency_unit
│ ├─ haz_type
│ ├─ tot_value
│ ├─ unit
As per the :py:func:`climada.engine.impact.Impact.__init__`, any of these entries
is optional. If it is not found, the default value will be used when constructing
the Impact.
The impact matrix ``imp_mat`` can either be an H5 dataset, in which case it is
interpreted as dense representation of the matrix, or an H5 group, in which case
the group is expected to contain the following data for instantiating a
`scipy.sparse.csr_matrix <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html>`_::
imp_mat/
├─ data
├─ indices
├─ indptr
├─ .attrs/
│ ├─ shape
Parameters
----------
file_path : str or Path
The file path of the file to read.
Returns
-------
imp : Impact
Impact with data from the given file
"""
kwargs = dict()
with h5py.File(file_path, "r") as file:
# Impact matrix
if "imp_mat" in file:
impact_matrix = file["imp_mat"]
if isinstance(impact_matrix, h5py.Dataset): # Dense
impact_matrix = sparse.csr_matrix(impact_matrix)
else: # Sparse
impact_matrix = sparse.csr_matrix(
(
impact_matrix["data"],
impact_matrix["indices"],
impact_matrix["indptr"],
),
shape=impact_matrix.attrs["shape"],
)
kwargs["imp_mat"] = impact_matrix
# Scalar attributes
scalar_attrs = set(
("crs", "tot_value", "unit", "aai_agg", "frequency_unit", "haz_type")
).intersection(file.attrs.keys())
kwargs.update({attr: file.attrs[attr] for attr in scalar_attrs})
# Array attributes
# NOTE: Need [:] to copy array data. Otherwise, it would be a view that is
# invalidated once we close the file.
array_attrs = set(
("event_id", "date", "coord_exp", "eai_exp", "at_event", "frequency")
).intersection(file.keys())
kwargs.update({attr: file[attr][:] for attr in array_attrs})
# Special handling for 'event_name' because it should be a list of strings
if "event_name" in file:
# pylint: disable=no-member
try:
event_name = file["event_name"].asstr()[:]
except TypeError:
LOGGER.warning(
"'event_name' is not stored as strings. Trying to decode "
"values with 'str()' instead."
)
event_name = map(str, file["event_name"][:])
kwargs["event_name"] = list(event_name)
# Create the impact object
return cls(**kwargs)
[docs]
@staticmethod
def video_direct_impact(
exp,
impf_set,
haz_list,
file_name="",
writer=animation.PillowWriter(bitrate=500),
imp_thresh=0,
args_exp=None,
args_imp=None,
ignore_zero=False,
pop_name=False,
):
"""
Computes and generates video of accumulated impact per input events
over exposure.
Parameters
----------
exp : climada.entity.Exposures
exposures instance, constant during all video
impf_set : climada.entity.ImpactFuncSet
impact functions
haz_list : (list(Hazard))
every Hazard contains an event; all hazards
use the same centroids
file_name : str, optional
file name to save video, if provided
writer : matplotlib.animation.*, optional
video writer. Default: pillow with bitrate=500
imp_thresh : float, optional
represent damages greater than threshold. Default: 0
args_exp : dict, optional
arguments for scatter (points) or hexbin (raster)
matplotlib function used in exposures
args_imp : dict, optional
arguments for scatter (points) or hexbin (raster)
matplotlib function used in impact
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
The default is False.
Returns
-------
list of Impact
"""
from climada.engine.impact_calc import ( # pylint: disable=import-outside-toplevel
ImpactCalc,
)
if args_exp is None:
args_exp = dict()
if args_imp is None:
args_imp = dict()
imp_list = []
exp_list = []
imp_arr = np.zeros(len(exp.gdf))
# assign centroids once for all
exp.assign_centroids(haz_list[0])
for i_time, _ in enumerate(haz_list):
imp_tmp = ImpactCalc(exp, impf_set, haz_list[i_time]).impact(
assign_centroids=False
)
imp_arr = np.maximum(imp_arr, imp_tmp.eai_exp)
# remove not impacted exposures
save_exp = imp_arr > imp_thresh
imp_tmp.coord_exp = imp_tmp.coord_exp[save_exp, :]
imp_tmp.eai_exp = imp_arr[save_exp]
imp_list.append(imp_tmp)
exp_list.append(~save_exp)
v_lim = [
np.array([haz.intensity.min() for haz in haz_list]).min(),
np.array([haz.intensity.max() for haz in haz_list]).max(),
]
if "vmin" not in args_exp:
args_exp["vmin"] = exp.gdf["value"].values.min()
if "vmin" not in args_imp:
args_imp["vmin"] = np.array(
[imp.eai_exp.min() for imp in imp_list if imp.eai_exp.size]
).min()
if "vmax" not in args_exp:
args_exp["vmax"] = exp.gdf["value"].values.max()
if "vmax" not in args_imp:
args_imp["vmax"] = np.array(
[imp.eai_exp.max() for imp in imp_list if imp.eai_exp.size]
).max()
if "cmap" not in args_exp:
args_exp["cmap"] = "winter_r"
if "cmap" not in args_imp:
args_imp["cmap"] = "autumn_r"
plot_raster = False
if exp.meta:
plot_raster = True
def run(i_time):
haz_list[i_time].plot_intensity(
1, axis=axis, cmap="Greys", vmin=v_lim[0], vmax=v_lim[1], alpha=0.8
)
if plot_raster:
exp.plot_hexbin(
axis=axis,
mask=exp_list[i_time],
ignore_zero=ignore_zero,
pop_name=pop_name,
**args_exp,
)
if imp_list[i_time].coord_exp.size:
imp_list[i_time].plot_hexbin_eai_exposure(
axis=axis, pop_name=pop_name, **args_imp
)
fig.delaxes(fig.axes[1])
else:
exp.plot_scatter(
axis=axis,
mask=exp_list[i_time],
ignore_zero=ignore_zero,
pop_name=pop_name,
**args_exp,
)
if imp_list[i_time].coord_exp.size:
imp_list[i_time].plot_scatter_eai_exposure(
axis=axis, pop_name=pop_name, **args_imp
)
fig.delaxes(fig.axes[1])
fig.delaxes(fig.axes[1])
fig.delaxes(fig.axes[1])
axis.set_xlim(
haz_list[-1].centroids.lon.min(), haz_list[-1].centroids.lon.max()
)
axis.set_ylim(
haz_list[-1].centroids.lat.min(), haz_list[-1].centroids.lat.max()
)
axis.set_title(haz_list[i_time].event_name[0])
pbar.update()
if file_name:
LOGGER.info("Generating video %s", file_name)
fig, axis, _fontsize = u_plot.make_map()
ani = animation.FuncAnimation(
fig, run, frames=len(haz_list), interval=500, blit=False
)
pbar = tqdm(total=len(haz_list))
fig.tight_layout()
ani.save(file_name, writer=writer)
pbar.close()
return imp_list
def _build_exp(self):
return Exposures(
data={
"value": self.eai_exp,
"latitude": self.coord_exp[:, 0],
"longitude": self.coord_exp[:, 1],
},
crs=self.crs,
value_unit=self.unit,
ref_year=0,
meta=None,
)
def _build_exp_event(self, event_id):
"""Write impact of an event as Exposures
Parameters
----------
event_id : int
id of the event
"""
[[idx]] = (self.event_id == event_id).nonzero()
return Exposures(
data={
"value": self.imp_mat[idx].toarray().ravel(),
"latitude": self.coord_exp[:, 0],
"longitude": self.coord_exp[:, 1],
},
crs=self.crs,
value_unit=self.unit,
ref_year=0,
meta=None,
)
[docs]
def select(
self,
event_ids=None,
event_names=None,
dates=None,
coord_exp=None,
reset_frequency=False,
):
"""
Select a subset of events and/or exposure points from the impact.
If multiple input variables are not None, it returns all the impacts
matching at least one of the conditions.
Notes
-----
the frequencies are NOT adjusted. Method to adjust frequencies
and obtain correct eai_exp:
1- Select subset of impact according to your choice
imp = impact.select(...)
2- Adjust manually the frequency of the subset of impact
imp.frequency = [...]
3- Use select without arguments to select all events and recompute
the eai_exp with the updated frequencies.
imp = imp.select()
Parameters
----------
event_ids : list of int, optional
Selection of events by their id. The default is None.
event_names : list of str, optional
Selection of events by their name. The default is None.
dates : tuple, optional
(start-date, end-date), events are selected if they are >=
than start-date and <= than end-date. Dates in same format
as impact.date (ordinal format of datetime library)
The default is None.
coord_exp : np.array, optional
Selection of exposures coordinates [lat, lon] (in degrees)
The default is None.
reset_frequency : bool, optional
Change frequency of events proportional to difference between first and last
year (old and new). Assumes annual frequency values. Default: False.
Raises
------
ValueError
If the impact matrix is missing, the eai_exp and aai_agg cannot
be updated for a selection of events and/or exposures.
Returns
-------
imp : climada.engine.impact.Impact
A new impact object with a selection of events and/or exposures
"""
nb_events = self.event_id.size
nb_exp = len(self.coord_exp)
if self.imp_mat.shape != (nb_events, nb_exp):
raise ValueError(
"The impact matrix is missing or incomplete. "
"The eai_exp and aai_agg cannot be computed. "
"Please recompute impact.calc() with save_mat=True "
"before using impact.select()"
)
if nb_events == nb_exp:
LOGGER.warning(
"The number of events is equal to the number of "
"exposure points. It is not possible to "
"differentiate events and exposures attributes. "
"Please add/remove one event/exposure point. "
"This is a purely technical limitation of this "
"method."
)
return None
imp = copy.deepcopy(self)
# apply event selection to impact attributes
sel_ev = self._selected_events_idx(event_ids, event_names, dates, nb_events)
if sel_ev is not None:
# set all attributes that are 'per event', i.e. have a dimension
# of length equal to the number of events (=nb_events)
for attr in get_attributes_with_matching_dimension(imp, [nb_events]):
value = imp.__getattribute__(attr)
if isinstance(value, np.ndarray):
if value.ndim == 1:
setattr(imp, attr, value[sel_ev])
else:
LOGGER.warning(
"Found a multidimensional numpy array "
"with one dimension matching the number of events. "
"But multidimensional numpy arrays are not handled "
"in impact.select"
)
elif isinstance(value, sparse.csr_matrix):
setattr(imp, attr, value[sel_ev, :])
elif isinstance(value, list) and value:
setattr(imp, attr, [value[idx] for idx in sel_ev])
else:
pass
LOGGER.info(
"The eai_exp and aai_agg are computed for the "
"selected subset of events WITHOUT modification of "
"the frequencies."
)
# apply exposure selection to impact attributes
if coord_exp is not None:
sel_exp = self._selected_exposures_idx(coord_exp)
imp.coord_exp = imp.coord_exp[sel_exp]
imp.imp_mat = imp.imp_mat[:, sel_exp]
# .A1 reduce 1d matrix to 1d array
imp.at_event = imp.imp_mat.sum(axis=1).A1
imp.tot_value = None
LOGGER.info(
"The total value cannot be re-computed for a "
"subset of exposures and is set to None."
)
# reset frequency if date span has changed (optional):
if reset_frequency:
if self.frequency_unit not in ["1/year", "annual", "1/y", "1/a"]:
LOGGER.warning(
"Resetting the frequency is based on the calendar year of given"
" dates but the frequency unit here is %s. Consider setting the frequency"
" manually for the selection or changing the frequency unit to %s.",
self.frequency_unit,
DEF_FREQ_UNIT,
)
year_span_old = (
np.abs(
dt.datetime.fromordinal(self.date.max()).year
- dt.datetime.fromordinal(self.date.min()).year
)
+ 1
)
year_span_new = (
np.abs(
dt.datetime.fromordinal(imp.date.max()).year
- dt.datetime.fromordinal(imp.date.min()).year
)
+ 1
)
imp.frequency = imp.frequency * year_span_old / year_span_new
# cast frequency vector into 2d array for sparse matrix multiplication
freq_mat = imp.frequency.reshape(len(imp.frequency), 1)
# .A1 reduce 1d matrix to 1d array
imp.eai_exp = imp.imp_mat.multiply(freq_mat).sum(axis=0).A1
imp.aai_agg = imp.eai_exp.sum()
return imp
def _selected_events_idx(self, event_ids, event_names, dates, nb_events):
if all(var is None for var in [dates, event_ids, event_names]):
return None
# filter events by date
if dates is None:
mask_dt = np.zeros(nb_events, dtype=bool)
else:
mask_dt = np.ones(nb_events, dtype=bool)
date_ini, date_end = dates
if isinstance(date_ini, str):
date_ini = u_dt.str_to_date(date_ini)
date_end = u_dt.str_to_date(date_end)
mask_dt &= date_ini <= self.date
mask_dt &= self.date <= date_end
if not np.any(mask_dt):
LOGGER.info("No impact event in given date range %s.", dates)
sel_dt = mask_dt.nonzero()[0] # Convert bool to indices
# filter events by id
if event_ids is None:
sel_id = np.array([], dtype=int)
else:
(sel_id,) = np.isin(self.event_id, event_ids).nonzero()
# pylint: disable=no-member
if sel_id.size == 0:
LOGGER.info("No impact event with given ids %s found.", event_ids)
# filter events by name
if event_names is None:
sel_na = np.array([], dtype=int)
else:
(sel_na,) = np.isin(self.event_name, event_names).nonzero()
# pylint: disable=no-member
if sel_na.size == 0:
LOGGER.info("No impact event with given names %s found.", event_names)
# select events with machting id, name or date field.
sel_ev = np.unique(np.concatenate([sel_dt, sel_id, sel_na]))
# if no event found matching ids, names or dates, warn the user
if sel_ev.size == 0:
LOGGER.warning("No event matches the selection. ")
return sel_ev
def _selected_exposures_idx(self, coord_exp):
assigned_idx = u_coord.match_coordinates(self.coord_exp, coord_exp, threshold=0)
sel_exp = (assigned_idx >= 0).nonzero()[0]
if sel_exp.size == 0:
LOGGER.warning("No exposure coordinates match the selection.")
return sel_exp
[docs]
@classmethod
def concat(cls, imp_list: Iterable, reset_event_ids: bool = False):
"""Concatenate impact objects with the same exposure
This function is useful if, e.g. different impact functions
have to be applied for different seasons (e.g. for agricultural impacts).
It checks if the exposures of the passed impact objects are identical and then
- concatenates the attributes ``event_id``, ``event_name``, ``date``,
``frequency``, ``imp_mat``, ``at_event``,
- sums up the values of attributes ``eai_exp``, ``aai_exp``
- and takes the following attributes from the first impact object in the passed
impact list: ``coord_exp``, ``crs``, ``unit``, ``tot_value``,
``frequency_unit``, ``haz_type``
If event ids are not unique among the passed impact objects an error is raised.
In this case, the user can set ``reset_event_ids=True`` to create unique event ids
for the concatenated impact.
If all impact matrices of the impacts in ``imp_list`` are empty,
the impact matrix of the concatenated impact is also empty.
Parameters
----------
imp_list : Iterable of climada.engine.impact.Impact
Iterable of Impact objects to concatenate
reset_event_ids: boolean, optional
Reset event ids of the concatenated impact object
Returns
--------
impact: climada.engine.impact.Impact
New impact object which is a concatenation of all impacts
Notes
-----
- Concatenation of impacts with different exposure (e.g. different countries)
could also be implemented here in the future.
"""
def check_unique_attr(attr_name: str):
"""Check if an attribute is unique among all impacts"""
if len({getattr(imp, attr_name) for imp in imp_list}) > 1:
raise ValueError(
f"Attribute '{attr_name}' must be unique among impacts"
)
# Check if single-value attribute are unique
for attr in ("crs", "tot_value", "unit", "frequency_unit"):
check_unique_attr(attr)
# Check exposure coordinates
imp_iter = iter(imp_list)
first_imp = next(imp_iter)
for imp in imp_iter:
if not np.array_equal(first_imp.coord_exp, imp.coord_exp):
raise ValueError("The impacts have different exposure coordinates")
# Stack attributes
def stack_attribute(attr_name: str) -> np.ndarray:
"""Stack an attribute of all impacts passed to this method"""
return np.concatenate([getattr(imp, attr_name) for imp in imp_list])
# Concatenate event IDs
event_ids = stack_attribute("event_id")
if reset_event_ids:
# NOTE: event_ids must not be zero!
event_ids = np.array(range(len(event_ids))) + 1
else:
unique_ids, count = np.unique(event_ids, return_counts=True)
if np.any(count > 1):
raise ValueError(
f"Duplicate event IDs: {unique_ids[count > 1]}\n"
"Consider setting 'reset_event_ids=True'"
)
# Concatenate impact matrices
imp_mats = [imp.imp_mat for imp in imp_list]
if len({mat.shape[1] for mat in imp_mats}) > 1:
raise ValueError(
"Impact matrices do not have the same number of exposure points"
)
imp_mat = sparse.vstack(imp_mats)
# Concatenate other attributes
kwargs = {
attr: stack_attribute(attr) for attr in ("date", "frequency", "at_event")
}
# Get remaining attributes from first impact object in list
return cls(
event_id=event_ids,
event_name=list(stack_attribute("event_name").flat),
coord_exp=first_imp.coord_exp,
crs=first_imp.crs,
unit=first_imp.unit,
tot_value=first_imp.tot_value,
eai_exp=np.nansum([imp.eai_exp for imp in imp_list], axis=0),
aai_agg=np.nansum([imp.aai_agg for imp in imp_list]),
imp_mat=imp_mat,
haz_type=first_imp.haz_type,
frequency_unit=first_imp.frequency_unit,
**kwargs,
)
[docs]
def match_centroids(
self, hazard, distance="euclidean", threshold=u_coord.NEAREST_NEIGHBOR_THRESHOLD
):
"""
Finds the closest hazard centroid for each impact coordinate.
Creates a temporary GeoDataFrame and uses ``u_coord.match_centroids()``.
See there for details and parameters
Parameters
----------
hazard : Hazard
Hazard to match (with raster or vector centroids).
distance : str, optional
Distance to use in case of vector centroids.
Possible values are "euclidean", "haversine" and "approx".
Default: "euclidean"
threshold : float
If the distance (in km) to the nearest neighbor exceeds `threshold`,
the index `-1` is assigned.
Set `threshold` to 0, to disable nearest neighbor matching.
Default: 100 (km)
Returns
-------
np.array
array of closest Hazard centroids, aligned with the Impact's `coord_exp` array
"""
return u_coord.match_centroids(
self._build_exp().gdf,
hazard.centroids,
distance=distance,
threshold=threshold,
)
[docs]
@dataclass
class ImpactFreqCurve:
"""Impact exceedence frequency curve."""
return_per: np.ndarray = field(default_factory=lambda: np.empty(0))
"""return period"""
impact: np.ndarray = field(default_factory=lambda: np.empty(0))
"""impact exceeding frequency"""
unit: str = ""
"""value unit used (given by exposures unit)"""
frequency_unit: str = DEF_FREQ_UNIT
"""value unit used (given by exposures unit)"""
label: str = ""
"""string describing source data"""
[docs]
def plot(self, axis=None, log_frequency=False, **kwargs):
"""Plot impact frequency curve.
Parameters
----------
axis : matplotlib.axes.Axes, optional
axis to use
log_frequency : boolean, optional
plot logarithmioc exceedance frequency on x-axis
kwargs : dict, optional
arguments for plot matplotlib function, e.g. color='b'
Returns
-------
matplotlib.axes.Axes
"""
if not axis:
_, axis = plt.subplots(1, 1)
axis.set_title(self.label)
axis.set_ylabel("Impact (" + self.unit + ")")
if log_frequency:
axis.set_xlabel(f"Exceedance frequency ({self.frequency_unit})")
axis.set_xscale("log")
axis.plot(self.return_per**-1, self.impact, **kwargs)
else:
axis.set_xlabel("Return period (year)")
axis.plot(self.return_per, self.impact, **kwargs)
return axis