"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Measure class.
"""
__all__ = ['Measure']
import copy
import logging
import numpy as np
import pandas as pd
from climada.entity.exposures.base import Exposures, INDICATOR_IF, INDICATOR_CENTR
import climada.util.checker as check
LOGGER = logging.getLogger(__name__)
IF_ID_FACT = 1000
""" Factor internally used as id for impact functions when region selected."""
NULL_STR = 'nil'
""" String considered as no path in measures exposures_set and hazard_set or
no string in imp_fun_map"""
[docs]class Measure():
"""Contains the definition of one measure.
Attributes:
name (str): name of the action
haz_type (str): related hazard type (peril), e.g. TC
color_rgb (np.array): integer array of size 3. Gives color code of
this measure in RGB
cost (float): discounted cost (in same units as assets)
hazard_set (str): file name of hazard to use (in h5 format)
hazard_freq_cutoff (float): hazard frequency cutoff
exposures_set (str): file name of exposure to use (in h5 format) or
Exposure instance
imp_fun_map (str): change of impact function id of exposures, e.g. '1to3'
hazard_inten_imp (tuple): parameter a and b of hazard intensity change
mdd_impact (tuple): parameter a and b of the impact over the mean
damage degree
paa_impact (tuple): parameter a and b of the impact over the
percentage of affected assets
exp_region_id (int): region id of the selected exposures to consider ALL
the previous parameters
risk_transf_attach (float): risk transfer attachment
risk_transf_cover (float): risk transfer cover
risk_transf_cost_factor (float): factor to multiply to resulting
insurance layer to get the total cost of risk transfer
"""
[docs] def __init__(self):
""" Empty initialization."""
self.name = ''
self.haz_type = ''
self.color_rgb = np.array([0, 0, 0])
self.cost = 0
# related to change in hazard
self.hazard_set = NULL_STR
self.hazard_freq_cutoff = 0
# related to change in exposures
self.exposures_set = NULL_STR
self.imp_fun_map = NULL_STR # ids of impact functions to change e.g. 1to10
# related to change in impact functions
self.hazard_inten_imp = (1, 0) # parameter a and b
self.mdd_impact = (1, 0) # parameter a and b
self.paa_impact = (1, 0) # parameter a and b
# related to change in region
self.exp_region_id = []
# risk transfer
self.risk_transf_attach = 0
self.risk_transf_cover = 0
self.risk_transf_cost_factor = 1
[docs] def check(self):
""" Check consistent instance data.
Raises:
ValueError
"""
try:
check.size(3, self.color_rgb, 'Measure.color_rgb')
except ValueError:
check.size(4, self.color_rgb, 'Measure.color_rgb')
check.size(2, self.hazard_inten_imp, 'Measure.hazard_inten_imp')
check.size(2, self.mdd_impact, 'Measure.mdd_impact')
check.size(2, self.paa_impact, 'Measure.paa_impact')
[docs] def calc_impact(self, exposures, imp_fun_set, hazard):
"""Apply measure and compute impact and risk transfer of measure
implemented over inputs.
Parameters:
exposures (Exposures): exposures instance
imp_fun_set (ImpactFuncSet): impact functions instance
hazard (Hazard): hazard instance
Returns:
Impact (resulting impact), Impact (insurance layer)
"""
new_exp, new_ifs, new_haz = self.apply(exposures, imp_fun_set, hazard)
return self._calc_impact(new_exp, new_ifs, new_haz)
[docs] def apply(self, exposures, imp_fun_set, hazard):
"""Implement measure with all its defined parameters.
Parameters:
exposures (Exposures): exposures instance
imp_fun_set (ImpactFuncSet): impact functions instance
hazard (Hazard): hazard instance
Returns:
Exposures, ImpactFuncSet, Hazard
"""
# change hazard
new_haz = self._change_all_hazard(hazard)
# change exposures
new_exp = self._change_all_exposures(exposures)
new_exp = self._change_exposures_if(new_exp)
# change impact functions
new_ifs = self._change_imp_func(imp_fun_set)
# cutoff events whose damage happen with high frequency (in region if specified)
new_haz = self._cutoff_hazard_damage(new_exp, new_ifs, new_haz)
# apply all previous changes only to the selected exposures
new_exp, new_ifs, new_haz = self._filter_exposures(exposures, \
imp_fun_set, hazard, new_exp, new_ifs, new_haz)
return new_exp, new_ifs, new_haz
def _calc_impact(self, new_exp, new_ifs, new_haz):
"""Compute impact and risk transfer of measure implemented over inputs.
Parameters:
new_exp (Exposures): exposures once measure applied
new_ifs (ImpactFuncSet): impact functions once measure applied
new_haz (Hazard): hazard once measure applied
Returns:
Impact, Impact
"""
from climada.engine.impact import Impact
imp = Impact()
imp.calc(new_exp, new_ifs, new_haz)
return imp.calc_risk_transfer(self.risk_transf_attach, self.risk_transf_cover)
def _change_all_hazard(self, hazard):
"""Change hazard to provided hazard_set.
Parameters:
hazard (Hazard): hazard instance
Returns:
Hazard
"""
if self.hazard_set == NULL_STR:
return hazard
LOGGER.debug('Setting new hazard %s', self.hazard_set)
from climada.hazard.base import Hazard
new_haz = Hazard(hazard.tag.haz_type)
new_haz.read_hdf5(self.hazard_set)
new_haz.check()
return new_haz
def _change_all_exposures(self, exposures):
"""Change exposures to provided exposures_set.
Parameters:
exposures (Exposures): exposures instance
Returns:
Exposures
"""
if isinstance(self.exposures_set, str) and self.exposures_set == NULL_STR:
return exposures
if isinstance(self.exposures_set, str):
LOGGER.debug('Setting new exposures %s', self.exposures_set)
new_exp = Exposures()
new_exp.read_hdf5(self.exposures_set)
new_exp.check()
elif isinstance(self.exposures_set, Exposures):
LOGGER.debug('Setting new exposures. ')
new_exp = copy.deepcopy(self.exposures_set)
new_exp.check()
else:
LOGGER.error('Wrong input exposures.')
raise ValueError
if not np.array_equal(np.unique(exposures.latitude.values),
np.unique(new_exp.latitude.values)) or \
not np.array_equal(np.unique(exposures.longitude.values),
np.unique(new_exp.longitude.values)):
LOGGER.warning('Exposures locations have changed.')
return new_exp
def _change_exposures_if(self, exposures):
""" Change exposures impact functions ids according to imp_fun_map.
Parameters:
exposures (Exposures): exposures instance
"""
if self.imp_fun_map == NULL_STR:
return exposures
LOGGER.debug('Setting new exposures impact functions%s', self.imp_fun_map)
new_exp = copy.deepcopy(exposures)
from_id = int(self.imp_fun_map[0:self.imp_fun_map.find('to')])
to_id = int(self.imp_fun_map[self.imp_fun_map.find('to')+2:])
try:
exp_change = np.argwhere(new_exp[INDICATOR_IF+self.haz_type].values == from_id).\
reshape(-1)
new_exp[INDICATOR_IF+self.haz_type].values[exp_change] = to_id
except KeyError:
exp_change = np.argwhere(new_exp[INDICATOR_IF].values == from_id).\
reshape(-1)
new_exp[INDICATOR_IF].values[exp_change] = to_id
return new_exp
def _change_imp_func(self, imp_set):
"""Apply measure to impact functions of the same hazard type.
Parameters:
imp_set (ImpactFuncSet): impact functions to be modified
Returns:
ImpactFuncSet
"""
if self.hazard_inten_imp == (1, 0) and self.mdd_impact == (1, 0)\
and self.paa_impact == (1, 0):
return imp_set
new_imp_set = copy.deepcopy(imp_set)
for imp_fun in new_imp_set.get_func(self.haz_type):
LOGGER.debug('Transforming impact functions.')
imp_fun.intensity = np.maximum(imp_fun.intensity * \
self.hazard_inten_imp[0] - self.hazard_inten_imp[1], 0.0)
imp_fun.mdd = np.maximum(imp_fun.mdd * self.mdd_impact[0] + \
self.mdd_impact[1], 0.0)
imp_fun.paa = np.maximum(imp_fun.paa * self.paa_impact[0] + \
self.paa_impact[1], 0.0)
if not new_imp_set.size():
LOGGER.info('No impact function of hazard %s found.', self.haz_type)
return new_imp_set
def _cutoff_hazard_damage(self, exposures, if_set, hazard):
"""Cutoff of hazard events which generate damage with a frequency higher
than hazard_freq_cutoff.
Parameters:
exposures (Exposures): exposures instance
imp_set (ImpactFuncSet): impact functions instance
hazard (Hazard): hazard instance
Returns:
Hazard
"""
if self.hazard_freq_cutoff == 0:
return hazard
from climada.engine.impact import Impact
imp = Impact()
exp_imp = exposures
if self.exp_region_id:
# compute impact only in selected region
in_reg = np.logical_or.reduce([exposures.region_id.values == reg for reg
in self.exp_region_id])
exp_imp = exposures[in_reg]
exp_imp = Exposures(exp_imp, crs=exposures.crs)
imp.calc(exp_imp, if_set, hazard)
LOGGER.debug('Cutting events whose damage have a frequency > %s.',
self.hazard_freq_cutoff)
new_haz = copy.deepcopy(hazard)
sort_idxs = np.argsort(imp.at_event)[::-1]
exceed_freq = np.cumsum(imp.frequency[sort_idxs])
cutoff = exceed_freq > self.hazard_freq_cutoff
sel_haz = sort_idxs[cutoff]
for row in sel_haz:
new_haz.intensity.data[new_haz.intensity.indptr[row]: \
new_haz.intensity.indptr[row+1]] = 0
new_haz.intensity.eliminate_zeros()
return new_haz
def _filter_exposures(self, exposures, imp_set, hazard, new_exp, new_ifs,
new_haz):
""" Incorporate changes of new elements to previous ones only for the
selected exp_region_id. If exp_region_id is [], all new changes
will be accepted.
Parameters:
exposures (Exposures): old exposures instance
imp_set (ImpactFuncSet): old impact functions instance
hazard (Hazard): old hazard instance
new_exp (Exposures): new exposures instance
new_ifs (ImpactFuncSet): new impact functions instance
new_haz (Hazard): new hazard instance
Returns:
Exposures, ImpactFuncSet, Hazard
"""
if not self.exp_region_id:
return new_exp, new_ifs, new_haz
if exposures is new_exp:
new_exp = copy.deepcopy(exposures)
chg_reg = np.logical_or.reduce([exposures.region_id.values == reg for reg
in self.exp_region_id])
no_chg_reg = np.argwhere(np.logical_not(chg_reg)).reshape(-1)
chg_reg = np.argwhere(chg_reg).reshape(-1)
LOGGER.debug('Number of changed exposures: %s', chg_reg.size)
if imp_set is not new_ifs:
# provide new impact functions ids to changed impact functions
fun_ids = list(new_ifs.get_func()[self.haz_type].keys())
for key in fun_ids:
new_ifs.get_func()[self.haz_type][key].id = key + IF_ID_FACT
new_ifs.get_func()[self.haz_type][key + IF_ID_FACT] = \
new_ifs.get_func()[self.haz_type][key]
try:
new_exp[INDICATOR_IF+self.haz_type] += IF_ID_FACT
except KeyError:
new_exp[INDICATOR_IF] += IF_ID_FACT
# collect old impact functions as well (used by exposures)
new_ifs.get_func()[self.haz_type].update(imp_set.get_func()[self.haz_type])
# concatenate previous and new exposures
new_exp = pd.concat([exposures.iloc[no_chg_reg], new_exp.iloc[chg_reg]])
# set missing values of centr_
if INDICATOR_CENTR+self.haz_type in new_exp.columns and \
np.isnan(new_exp[INDICATOR_CENTR+self.haz_type].values).any():
new_exp.drop(columns=INDICATOR_CENTR+self.haz_type, inplace=True)
elif INDICATOR_CENTR in new_exp.columns and \
np.isnan(new_exp[INDICATOR_CENTR].values).any():
new_exp.drop(columns=INDICATOR_CENTR, inplace=True)
# put hazard intensities outside region to previous intensities
if hazard is not new_haz:
if INDICATOR_CENTR+self.haz_type in exposures.columns:
centr = exposures[INDICATOR_CENTR+self.haz_type].values[chg_reg]
elif INDICATOR_CENTR in exposures.columns:
centr = exposures[INDICATOR_CENTR].values[chg_reg]
else:
exposures.assign_centroids(hazard)
centr = exposures[INDICATOR_CENTR+self.haz_type].values[chg_reg]
centr = np.delete(np.arange(hazard.intensity.shape[1]), np.unique(centr))
new_haz_inten = new_haz.intensity.tolil()
new_haz_inten[:, centr] = hazard.intensity[:, centr]
new_haz.intensity = new_haz_inten.tocsr()
return new_exp, new_ifs, new_haz