"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Uncertainty class for Impact differences between two climates.
"""
__all__ = ["CalcDeltaImpact"]
import logging
import time
from typing import Union
import itertools
import pandas as pd
import numpy as np
import pathos.multiprocessing as mp
# use pathos.multiprocess fork of multiprocessing for compatibility
# wiht notebooks and other environments https://stackoverflow.com/a/65001152/12454103
from climada.engine import ImpactCalc
from climada.engine.unsequa import Calc, InputVar, UncImpactOutput
from climada.engine.unsequa.calc_base import (
_sample_parallel_iterator,
_multiprocess_chunksize,
_transpose_chunked_data,
)
from climada.entity import Exposures, ImpactFuncSet
from climada.hazard import Hazard
from climada.util import log_level
from climada.util.value_representation import safe_divide
LOGGER = logging.getLogger(__name__)
[docs]
class CalcDeltaImpact(Calc):
"""
Delta Impact uncertainty caclulation class.
This is the class to perform uncertainty analysis on the outputs of a relative
change in impact from a (final impact - initial impact) / initial impact.
Impact objects are regular climada.engine.impact.Impact() objects. The resulting
Delta Impact is a relative change (fraction of the inital impact). The relative
change is intuitive to understand in contrast to absolute changes which are
hard to understand without knowledge of the absolute initial (baseline) state.
Attributes
----------
rp : list(int)
List of the chosen return periods.
calc_eai_exp : bool
Compute eai_exp or not
calc_at_event : bool
Compute eai_exp or not
value_unit : str
Unit of the exposures value
exp_input_var : InputVar or Exposures
Exposure uncertainty variable
impf_input_var : InputVar if ImpactFuncSet
Impact function set uncertainty variable
haz_input_var: InputVar or Hazard
Hazard uncertainty variable
_input_var_names : tuple(str)
Names of the required uncertainty input variables
('exp_initial_input_var', 'impf_initial_input_var', 'haz_initial_input_var',
'exp_final_input_var', 'impf_final_input_var', 'haz_final_input_var'')
_metric_names : tuple(str)
Names of the impact output metrics
('aai_agg', 'freq_curve', 'at_event', 'eai_exp')
"""
_input_var_names = (
"exp_initial_input_var",
"impf_initial_input_var",
"haz_initial_input_var",
"exp_final_input_var",
"impf_final_input_var",
"haz_final_input_var",
)
"""Names of the required uncertainty variables"""
_metric_names = ("aai_agg", "freq_curve", "at_event", "eai_exp")
"""Names of the cost benefit output metrics"""
[docs]
def __init__(
self,
exp_initial_input_var: Union[InputVar, Exposures],
impf_initial_input_var: Union[InputVar, ImpactFuncSet],
haz_initial_input_var: Union[InputVar, Hazard],
exp_final_input_var: Union[InputVar, Exposures],
impf_final_input_var: Union[InputVar, ImpactFuncSet],
haz_final_input_var: Union[InputVar, Hazard],
):
"""Initialize UncCalcImpact
Sets the uncertainty input variables, the impact metric_names, and the
units.
Parameters
----------
exp_initial_input_var : climada.engine.uncertainty.input_var.InputVar
or climada.entity.Exposure
Exposure uncertainty variable or Exposure of initial state
impf_initital_input_var : climada.engine.uncertainty.input_var.InputVar
or climada.entity.ImpactFuncSet
Impact function set uncertainty variable or Impact function set of
initial state
haz_initial_input_var : climada.engine.uncertainty.input_var.InputVar
or climada.hazard.Hazard
Hazard uncertainty variable or Hazard of initial state
exp_final_input_var : climada.engine.uncertainty.input_var.InputVar or
climada.entity.Exposure
Exposure uncertainty variable or Exposure of final state
impf_final_input_var : climada.engine.uncertainty.input_var.InputVar or
climada.entity.ImpactFuncSet
Impact function set uncertainty variable or Impact function set of
final state
haz_final_input_var : climada.engine.uncertainty.input_var.InputVar or
climada.hazard.Hazard
Hazard uncertainty variable or Hazard of final state
"""
Calc.__init__(self)
self.exp_initial_input_var = InputVar.var_to_inputvar(exp_initial_input_var)
self.impf_initial_input_var = InputVar.var_to_inputvar(impf_initial_input_var)
self.haz_initial_input_var = InputVar.var_to_inputvar(haz_initial_input_var)
self.exp_final_input_var = InputVar.var_to_inputvar(exp_final_input_var)
self.impf_final_input_var = InputVar.var_to_inputvar(impf_final_input_var)
self.haz_final_input_var = InputVar.var_to_inputvar(haz_final_input_var)
self.value_unit = self.exp_initial_input_var.evaluate().value_unit
self.check_distr()
[docs]
def uncertainty(
self,
unc_sample,
rp=None,
calc_eai_exp=False,
calc_at_event=False,
relative_delta=True,
processes=1,
chunksize=None,
):
"""
Computes the differential impact between the reference (initial) and
future (final) for each sample in unc_data.sample_df.
By default, the aggregated average impact within a period of 1/frequency_unit
(impact.aai_agg) and the excess impact at return periods rp
(imppact.calc_freq_curve(self.rp).impact) is computed.
Optionally, eai_exp and at_event is computed (this may require
a larger amount of memory if the number of samples and/or the number
of centroids and/or exposures points is large).
For all metrics, the impacts are caculated first and the the difference
thereof is computed.
For example:
(impact_final.aai_agg - impact_inital.aai_agg / impact_inital.aai_agg)
This sets the attributes self.rp, self.calc_eai_exp,
self.calc_at_event, self.metrics.
This sets the attributes:
unc_output.aai_agg_unc_df,
unc_output.freq_curve_unc_df
unc_output.eai_exp_unc_df
unc_output.at_event_unc_df
unc_output.unit
Parameters
----------
unc_sample : climada.engine.uncertainty.unc_output.UncOutput
Uncertainty data object with the input parameters samples
rp : list(int), optional
Return periods in years to be computed.
The default is [5, 10, 20, 50, 100, 250].
calc_eai_exp : boolean, optional
Toggle computation of the impact at each centroid location.
The default is False.
calc_at_event : boolean, optional
Toggle computation of the impact for each event.
The default is False.
relative_delta : bool, optional
Normalize delta impacts by past impacts or not.
The default is True.
processes : int, optional
Number of CPUs to use for parralel computations.
The default is 1 (not parallel)
chunksize: int, optional
Size of the sample chunks for parallel processing.
Default is equal to the number of samples divided by the
number of processes.
Returns
-------
unc_output : climada.engine.uncertainty.unc_output.UncImpactOutput
Uncertainty data object with the delta impact outputs for each sample
and all the sample data copied over from unc_sample.
Raises
------
ValueError:
If no sampling parameters defined, the distribution cannot
be computed.
Notes
-----
Parallelization logic is described in the base class
here :py:class:`~climada.engine.unsequa.calc_base.Calc`
See Also
--------
climada.engine.impact:
compute impact and risk.
"""
if unc_sample.samples_df.empty:
raise ValueError(
"No sample was found. Please create one first"
"using UncImpact.make_sample(N)"
)
# copy may not be needed, but is kept to prevent potential
# data corruption issues. The computational cost should be
# minimal as only a list of floats is copied.'''
samples_df = unc_sample.samples_df.copy(deep=True)
if chunksize is None:
chunksize = _multiprocess_chunksize(samples_df, processes)
unit = self.value_unit
if rp is None:
rp = [5, 10, 20, 50, 100, 250]
self.rp = rp
self.calc_eai_exp = calc_eai_exp
self.calc_at_event = calc_at_event
self.relative_delta = relative_delta
one_sample = samples_df.iloc[0:1]
start = time.time()
self._compute_imp_metrics(one_sample, chunksize=1, processes=1)
elapsed_time = time.time() - start
self.est_comp_time(unc_sample.n_samples, elapsed_time, processes)
[
aai_agg_list,
freq_curve_list,
eai_exp_list,
at_event_list,
] = self._compute_imp_metrics(
samples_df, chunksize=chunksize, processes=processes
)
# Assign computed impact distribution data to self
aai_agg_unc_df = pd.DataFrame(aai_agg_list, columns=["aai_agg"])
freq_curve_unc_df = pd.DataFrame(
freq_curve_list, columns=["rp" + str(n) for n in rp]
)
eai_exp_unc_df = pd.DataFrame(eai_exp_list)
at_event_unc_df = pd.DataFrame(at_event_list)
if calc_eai_exp:
exp = self.exp_input_var.evaluate()
coord_df = exp.gdf[["latitude", "longitude"]]
else:
coord_df = pd.DataFrame([])
return UncImpactOutput(
samples_df=samples_df,
unit=unit,
aai_agg_unc_df=aai_agg_unc_df,
freq_curve_unc_df=freq_curve_unc_df,
eai_exp_unc_df=eai_exp_unc_df,
at_event_unc_df=at_event_unc_df,
coord_df=coord_df,
)
def _compute_imp_metrics(self, samples_df, chunksize, processes):
"""Compute the uncertainty metrics
Parameters
----------
samples_df : pd.DataFrame
dataframe of input parameter samples
chunksize : int
size of the samples chunks
processes : int
number of processes to use
Returns
-------
list
values of impact metrics per sample
"""
# Compute impact distributions
with log_level(level="ERROR", name_prefix="climada"):
p_iterator = _sample_parallel_iterator(
samples=samples_df,
chunksize=chunksize,
exp_initial_input_var=self.exp_initial_input_var,
impf_initial_input_var=self.impf_initial_input_var,
haz_initial_input_var=self.haz_initial_input_var,
exp_final_input_var=self.exp_final_input_var,
impf_final_input_var=self.impf_final_input_var,
haz_final_input_var=self.haz_final_input_var,
rp=self.rp,
calc_eai_exp=self.calc_eai_exp,
calc_at_event=self.calc_at_event,
relative_delta=self.relative_delta,
)
if processes > 1:
with mp.Pool(processes=processes) as pool:
LOGGER.info("Using %s CPUs.", processes)
imp_metrics = pool.starmap(_map_impact_calc, p_iterator)
else:
imp_metrics = itertools.starmap(_map_impact_calc, p_iterator)
# Perform the actual computation
with log_level(level="ERROR", name_prefix="climada"):
return _transpose_chunked_data(imp_metrics)
def _map_impact_calc(
sample_chunks,
exp_initial_input_var,
impf_initial_input_var,
haz_initial_input_var,
exp_final_input_var,
impf_final_input_var,
haz_final_input_var,
rp,
calc_eai_exp,
calc_at_event,
relative_delta,
):
"""
Map to compute impact for all parameter samples in parallel
Parameters
----------
sample_chunks : pd.DataFrame
Dataframe of the parameter samples
exp_input_var : InputVar or Exposures
Exposure uncertainty variable
impf_input_var : InputVar if ImpactFuncSet
Impact function set uncertainty variable
haz_input_var: InputVar or Hazard
Hazard uncertainty variable
rp : list(int)
List of the chosen return periods.
calc_eai_exp : bool
Compute eai_exp or not
calc_at_event : bool
Compute at_event or not
relative_delta : bool
Normalize delta impacts by past impacts or not
Returns
-------
: list
impact metrics list for all samples containing aai_agg, rp_curve,
eai_exp (np.array([]) if self.calc_eai_exp=False) and at_event
(np.array([]) if self.calc_at_event=False).
"""
uncertainty_values = []
for _, sample in sample_chunks.iterrows():
exp_initial_samples = sample[exp_initial_input_var.labels].to_dict()
impf_initial_samples = sample[impf_initial_input_var.labels].to_dict()
haz_initial_samples = sample[haz_initial_input_var.labels].to_dict()
exp_final_samples = sample[exp_final_input_var.labels].to_dict()
impf_final_samples = sample[impf_final_input_var.labels].to_dict()
haz_final_samples = sample[haz_final_input_var.labels].to_dict()
exp_initial = exp_initial_input_var.evaluate(**exp_initial_samples)
impf_initial = impf_initial_input_var.evaluate(**impf_initial_samples)
haz_initial = haz_initial_input_var.evaluate(**haz_initial_samples)
exp_final = exp_final_input_var.evaluate(**exp_final_samples)
impf_final = impf_final_input_var.evaluate(**impf_final_samples)
haz_final = haz_final_input_var.evaluate(**haz_final_samples)
exp_initial.assign_centroids(haz_initial, overwrite=False)
exp_final.assign_centroids(haz_final, overwrite=False)
imp_initial = ImpactCalc(
exposures=exp_initial, impfset=impf_initial, hazard=haz_initial
).impact(assign_centroids=False, save_mat=False)
imp_final = ImpactCalc(
exposures=exp_final, impfset=impf_final, hazard=haz_final
).impact(assign_centroids=False, save_mat=False)
# Extract from climada.impact the chosen metrics
freq_curve_initial = imp_initial.calc_freq_curve(rp).impact
freq_curve_final = imp_final.calc_freq_curve(rp).impact
if calc_eai_exp:
eai_exp_initial = imp_initial.eai_exp
eai_exp_final = imp_final.eai_exp
else:
eai_exp_initial = np.array([])
eai_exp_final = np.array([])
if calc_at_event:
at_event_initial = imp_initial.at_event
at_event_final = imp_final.at_event
else:
at_event_initial = np.array([])
at_event_final = np.array([])
if relative_delta:
delta_func = lambda x, y: safe_divide(x - y, y)
else:
delta_func = lambda x, y: x - y
delta_aai_agg = delta_func(
imp_final.aai_agg, imp_initial.aai_agg
)
delta_freq_curve = delta_func(
freq_curve_final, freq_curve_initial
)
delta_eai_exp = (
delta_func(eai_exp_final, eai_exp_initial)
if calc_eai_exp
else np.array([])
)
delta_at_event = (
delta_func(at_event_final, at_event_initial)
if calc_at_event
else np.array([])
)
uncertainty_values.append(
[delta_aai_agg, delta_freq_curve, delta_eai_exp, delta_at_event]
)
return list(zip(*uncertainty_values))