"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Impact and ImpactFreqCurve classes.
"""
__all__ = ['ImpactFreqCurve', 'Impact']
import logging
import copy
import csv
import warnings
import datetime as dt
from itertools import zip_longest
import numpy as np
from scipy import sparse
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import pandas as pd
import xlsxwriter
from tqdm import tqdm
from climada.entity import Exposures, Tag
from climada.entity.exposures import INDICATOR_CENTR
from climada.hazard import Tag as TagHaz
import climada.util.plot as u_plot
from climada import CONFIG
from climada.util.constants import DEF_CRS, CMAP_IMPACT
import climada.util.coordinates as u_coord
import climada.util.dates_times as u_dt
from climada.util.select import get_attributes_with_matching_dimension
LOGGER = logging.getLogger(__name__)
[docs]class Impact():
"""Impact definition. Compute from an entity (exposures and impact
functions) and hazard.
Attributes
----------
tag : dict
dictionary of tags of exposures, impact functions set and
hazard: {'exp': Tag(), 'impf_set': Tag(), 'haz': TagHazard()}
    event_id : np.array
        id (>0) of each hazard event
    event_name : list
        name of each hazard event
    date : np.array
        date of events as integer date corresponding to the
        proleptic Gregorian ordinal, where January 1 of year 1 has
        ordinal 1 (ordinal format of datetime library)
coord_exp : np.array
exposures coordinates [lat, lon] (in degrees)
eai_exp : np.array
expected annual impact for each exposure
at_event : np.array
impact for each hazard event
frequency : np.array
annual frequency of event
tot_value : float
total exposure value affected
aai_agg : float
average annual impact (aggregated)
unit : str
value unit used (given by exposures unit)
imp_mat : sparse.csr_matrix
matrix num_events x num_exp with impacts.
only filled if save_mat is True in calc()
"""
[docs] def __init__(self):
"""Empty initialization."""
self.tag = dict()
self.event_id = np.array([], int)
self.event_name = list()
self.date = np.array([], int)
self.coord_exp = np.ndarray([], float)
self.crs = DEF_CRS
self.eai_exp = np.array([])
self.at_event = np.array([])
self.frequency = np.array([])
self.tot_value = 0
self.aai_agg = 0
self.unit = ''
self.imp_mat = sparse.csr_matrix(np.empty((0, 0)))
[docs] def calc_freq_curve(self, return_per=None):
"""Compute impact exceedance frequency curve.
Parameters
----------
return_per : np.array, optional
return periods where to compute
the exceedance impact. Use impact's frequencies if not provided
Returns
-------
ImpactFreqCurve
"""
ifc = ImpactFreqCurve()
ifc.tag = self.tag
# Sort descendingly the impacts per events
sort_idxs = np.argsort(self.at_event)[::-1]
# Calculate exceedence frequency
exceed_freq = np.cumsum(self.frequency[sort_idxs])
# Set return period and imact exceeding frequency
ifc.return_per = 1 / exceed_freq[::-1]
ifc.impact = self.at_event[sort_idxs][::-1]
ifc.unit = self.unit
ifc.label = 'Exceedance frequency curve'
if return_per is not None:
interp_imp = np.interp(return_per, ifc.return_per, ifc.impact)
ifc.return_per = return_per
ifc.impact = interp_imp
return ifc
    def calc(self, exposures, impact_funcs, hazard, save_mat=False):
        """Compute impact of an hazard to exposures.

        Fills event metadata, ``at_event``, ``eai_exp``, ``aai_agg`` and,
        if ``save_mat`` is True, the sparse ``imp_mat`` (events x exposures).

        Parameters
        ----------
        exposures : climada.entity.Exposures
        impact_funcs : climada.entity.ImpactFuncSet
            impact functions
        hazard : climada.Hazard
        save_mat : bool
            if True, keep the full impact matrix (events x exposures)
            in attribute imp_mat

        Examples
        --------
        Use Entity class:

        >>> haz = Hazard.from_mat(HAZ_DEMO_MAT)  # Set hazard
        >>> haz.check()
        >>> ent = Entity.from_excel(ENT_TEMPLATE_XLS)  # Set exposures
        >>> ent.check()
        >>> imp = Impact()
        >>> imp.calc(ent.exposures, ent.impact_funcs, haz)
        >>> imp.calc_freq_curve().plot()

        Specify only exposures and impact functions:

        >>> haz = Hazard.from_mat(HAZ_DEMO_MAT)  # Set hazard
        >>> haz.check()
        >>> funcs = ImpactFuncSet.from_excel(ENT_TEMPLATE_XLS)  # Set impact functions
        >>> funcs.check()
        >>> exp = Exposures(pd.read_excel(ENT_TEMPLATE_XLS))  # Set exposures
        >>> exp.check()
        >>> imp = Impact()
        >>> imp.calc(exp, funcs, haz)
        >>> imp.aai_agg
        """
        # 1. Assign centroids to each exposure if not done
        assign_haz = INDICATOR_CENTR + hazard.tag.haz_type
        if assign_haz not in exposures.gdf:
            exposures.assign_centroids(hazard)
        else:
            LOGGER.info('Exposures matching centroids found in %s', assign_haz)
        # 2. Initialize values
        self.unit = exposures.value_unit
        self.event_id = hazard.event_id
        self.event_name = hazard.event_name
        self.date = hazard.date
        self.coord_exp = np.stack([exposures.gdf.latitude.values,
                                   exposures.gdf.longitude.values], axis=1)
        self.frequency = hazard.frequency
        self.at_event = np.zeros(hazard.intensity.shape[0])
        self.eai_exp = np.zeros(exposures.gdf.value.size)
        self.tag = {'exp': exposures.tag, 'impf_set': impact_funcs.tag,
                    'haz': hazard.tag}
        self.crs = exposures.crs
        # Select exposures with positive value and assigned centroid
        # (a negative centroid index marks exposures without assigned centroid)
        exp_idx = np.where((exposures.gdf.value > 0) & (exposures.gdf[assign_haz] >= 0))[0]
        if exp_idx.size == 0:
            LOGGER.warning("No affected exposures.")
        num_events = hazard.intensity.shape[0]
        LOGGER.info('Calculating damage for %s assets (>0) and %s events.',
                    exp_idx.size, num_events)
        # Get damage functions for this hazard
        impf_haz = exposures.get_impf_column(hazard.tag.haz_type)
        haz_imp = impact_funcs.get_func(hazard.tag.haz_type)
        # Check if deductible and cover should be applied
        insure_flag = False
        if ('deductible' in exposures.gdf) and ('cover' in exposures.gdf) \
                and exposures.gdf.cover.max():
            insure_flag = True
        if save_mat:
            # accumulators for a COO-style build: (data, (row_ind, col_ind));
            # _exp_impact appends to these, converted to csr at the end
            self.imp_mat = ([], ([], []))
        # 3. Loop over exposures according to their impact function
        tot_exp = 0
        for imp_fun in haz_imp:
            # get indices of all the exposures with this impact function
            exp_iimp = np.where(exposures.gdf[impf_haz].values[exp_idx] == imp_fun.id)[0]
            tot_exp += exp_iimp.size
            # chunk size bounded so that (events x chunk) stays below the
            # configured maximum matrix size
            exp_step = CONFIG.max_matrix_size.int() // num_events
            if not exp_step:
                raise ValueError('Increase max_matrix_size configuration parameter to > %s'
                                 % str(num_events))
            # separate in chunks; chk = -1 makes the trailing call below
            # process everything when there is no full chunk at all
            chk = -1
            for chk in range(int(exp_iimp.size / exp_step)):
                self._exp_impact(
                    exp_idx[exp_iimp[chk * exp_step:(chk + 1) * exp_step]],
                    exposures, hazard, imp_fun, insure_flag)
            # remainder chunk (or the whole set if no full chunk was processed)
            self._exp_impact(exp_idx[exp_iimp[(chk + 1) * exp_step:]],
                             exposures, hazard, imp_fun, insure_flag)
        if not tot_exp:
            LOGGER.warning('No impact functions match the exposures.')
        self.aai_agg = sum(self.at_event * hazard.frequency)
        if save_mat:
            shape = (self.date.size, exposures.gdf.value.size)
            self.imp_mat = sparse.csr_matrix(self.imp_mat, shape=shape)
[docs] def calc_risk_transfer(self, attachment, cover):
"""Compute traaditional risk transfer over impact. Returns new impact
with risk transfer applied and the insurance layer resulting Impact metrics.
Parameters
----------
attachment : float
(deductible)
cover : float
Returns
-------
climada.engine.Impact
"""
new_imp = copy.deepcopy(self)
if attachment or cover:
imp_layer = np.minimum(np.maximum(new_imp.at_event - attachment, 0), cover)
new_imp.at_event = np.maximum(new_imp.at_event - imp_layer, 0)
new_imp.aai_agg = np.sum(new_imp.at_event * new_imp.frequency)
# next values are no longer valid
new_imp.eai_exp = np.array([])
new_imp.coord_exp = np.array([])
new_imp.imp_mat = sparse.csr_matrix(np.empty((0, 0)))
# insurance layer metrics
risk_transfer = copy.deepcopy(new_imp)
risk_transfer.at_event = imp_layer
risk_transfer.aai_agg = np.sum(imp_layer * new_imp.frequency)
return new_imp, risk_transfer
return new_imp, Impact()
[docs] def plot_hexbin_eai_exposure(self, mask=None, ignore_zero=True,
pop_name=True, buffer=0.0, extend='neither',
axis=None, adapt_fontsize=True, **kwargs):
"""Plot hexbin expected annual impact of each exposure.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates.
Default: 1.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
kwargs : optional
arguments for hexbin matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if 'cmap' not in kwargs:
kwargs['cmap'] = CMAP_IMPACT
eai_exp = self._build_exp()
axis = eai_exp.plot_hexbin(mask, ignore_zero, pop_name, buffer,
extend, axis=axis, adapt_fontsize=adapt_fontsize, **kwargs)
axis.set_title('Expected annual impact')
return axis
[docs] def plot_scatter_eai_exposure(self, mask=None, ignore_zero=True,
pop_name=True, buffer=0.0, extend='neither',
axis=None, adapt_fontsize=True, **kwargs):
"""Plot scatter expected annual impact of each exposure.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates.
Default: 1.0.
extend : str
optional extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise
the default matplotlib font size is used. Default is True.
kwargs : optional
arguments for hexbin matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if 'cmap' not in kwargs:
kwargs['cmap'] = CMAP_IMPACT
eai_exp = self._build_exp()
axis = eai_exp.plot_scatter(mask, ignore_zero, pop_name, buffer,
extend, axis=axis, adapt_fontsize=adapt_fontsize, **kwargs)
axis.set_title('Expected annual impact')
return axis
[docs] def plot_raster_eai_exposure(self, res=None, raster_res=None, save_tiff=None,
raster_f=lambda x: np.log10((np.fmax(x + 1, 1))),
label='value (log10)', axis=None, adapt_fontsize=True,
**kwargs):
"""Plot raster expected annual impact of each exposure.
Parameters
----------
res : float, optional
resolution of current data in units of latitude
and longitude, approximated if not provided.
raster_res : float, optional
desired resolution of the raster
save_tiff : str, optional
file name to save the raster in tiff
format, if provided
raster_f : lambda function
transformation to use to data. Default: log10 adding 1.
label : str colorbar label
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise
the default matplotlib font size is used. Default is True.
kwargs : optional
arguments for imshow matplotlib function
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
eai_exp = self._build_exp()
axis = eai_exp.plot_raster(res, raster_res, save_tiff, raster_f,
label, axis=axis, adapt_fontsize=adapt_fontsize, **kwargs)
axis.set_title('Expected annual impact')
return axis
[docs] def plot_basemap_eai_exposure(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', zoom=10,
url='http://tile.stamen.com/terrain/tileZ/tileX/tileY.png',
axis=None, **kwargs):
"""Plot basemap expected annual impact of each exposure.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
zoom : int, optional
zoom coefficient used in the satellite image
url : str, optional
image source, e.g. ctx.sources.OSM_C
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
kwargs : optional
arguments for scatter matplotlib function, e.g.
cmap='Greys'. Default: 'Wistia'
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if 'cmap' not in kwargs:
kwargs['cmap'] = CMAP_IMPACT
eai_exp = self._build_exp()
axis = eai_exp.plot_basemap(mask, ignore_zero, pop_name, buffer,
extend, zoom, url, axis=axis, **kwargs)
axis.set_title('Expected annual impact')
return axis
[docs] def plot_hexbin_impact_exposure(self, event_id=1, mask=None, ignore_zero=True,
pop_name=True, buffer=0.0, extend='neither',
axis=None, adapt_fontsize=True, **kwargs):
"""Plot hexbin impact of an event at each exposure.
Requires attribute imp_mat.
Parameters
----------
event_id : int, optional
id of the event for which to plot the impact.
Default: 1.
mask : np.array, optional
mask to apply to impact plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates.
Default: 1.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
kwargs : optional
arguments for hexbin matplotlib function
axis : matplotlib.axes._subplots.AxesSubplot
optional axis to use
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise
the default matplotlib font size is used. Default is True.
Returns
--------
matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if not hasattr(self.imp_mat, "shape") or self.imp_mat.shape[1] == 0:
raise ValueError('attribute imp_mat is empty. Recalculate Impact'
'instance with parameter save_mat=True')
if 'cmap' not in kwargs:
kwargs['cmap'] = CMAP_IMPACT
impact_at_events_exp = self._build_exp_event(event_id)
axis = impact_at_events_exp.plot_hexbin(mask, ignore_zero, pop_name,
buffer, extend, axis=axis, adapt_fontsize=adapt_fontsize,
**kwargs)
return axis
[docs] def plot_basemap_impact_exposure(self, event_id=1, mask=None, ignore_zero=True,
pop_name=True, buffer=0.0, extend='neither', zoom=10,
url='http://tile.stamen.com/terrain/tileZ/tileX/tileY.png',
axis=None, **kwargs):
"""Plot basemap impact of an event at each exposure.
Requires attribute imp_mat.
Parameters
----------
event_id : int, optional
id of the event for which to plot the impact.
Default: 1.
mask : np.array, optional
mask to apply to impact plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
zoom : int, optional
zoom coefficient used in the satellite image
url : str, optional
image source, e.g. ctx.sources.OSM_C
axis : matplotlib.axes._subplots.AxesSubplot, optional axis to use
kwargs : optional arguments for scatter matplotlib function, e.g.
cmap='Greys'. Default: 'Wistia'
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if not hasattr(self.imp_mat, "shape") or self.imp_mat.shape[1] == 0:
raise ValueError('attribute imp_mat is empty. Recalculate Impact'
'instance with parameter save_mat=True')
if event_id not in self.event_id:
raise ValueError(f'Event ID {event_id} not found')
if 'cmap' not in kwargs:
kwargs['cmap'] = CMAP_IMPACT
impact_at_events_exp = self._build_exp_event(event_id)
axis = impact_at_events_exp.plot_basemap(mask, ignore_zero, pop_name,
buffer, extend, zoom, url, axis=axis, **kwargs)
return axis
[docs] def write_csv(self, file_name):
"""Write data into csv file. imp_mat is not saved.
Parameters
----------
file_name : str
absolute path of the file
"""
LOGGER.info('Writing %s', file_name)
with open(file_name, "w") as imp_file:
imp_wr = csv.writer(imp_file)
imp_wr.writerow(["tag_hazard", "tag_exposure", "tag_impact_func",
"unit", "tot_value", "aai_agg", "event_id",
"event_name", "event_date", "event_frequency",
"at_event", "eai_exp", "exp_lat", "exp_lon", "exp_crs"])
csv_data = [[[self.tag['haz'].haz_type], [self.tag['haz'].file_name],
[self.tag['haz'].description]],
[[self.tag['exp'].file_name], [self.tag['exp'].description]],
[[self.tag['impf_set'].file_name], [self.tag['impf_set'].description]],
[self.unit], [self.tot_value], [self.aai_agg],
self.event_id, self.event_name, self.date,
self.frequency, self.at_event,
self.eai_exp, self.coord_exp[:, 0], self.coord_exp[:, 1],
[str(self.crs)]]
for values in zip_longest(*csv_data):
imp_wr.writerow(values)
[docs] def write_excel(self, file_name):
"""Write data into Excel file. imp_mat is not saved.
Parameters
----------
file_name : str
absolute path of the file
"""
LOGGER.info('Writing %s', file_name)
def write_col(i_col, imp_ws, xls_data):
"""Write one measure"""
row_ini = 1
for dat_row in xls_data:
imp_ws.write(row_ini, i_col, dat_row)
row_ini += 1
imp_wb = xlsxwriter.Workbook(file_name)
imp_ws = imp_wb.add_worksheet()
header = ["tag_hazard", "tag_exposure", "tag_impact_func",
"unit", "tot_value", "aai_agg", "event_id",
"event_name", "event_date", "event_frequency",
"at_event", "eai_exp", "exp_lat", "exp_lon", "exp_crs"]
for icol, head_dat in enumerate(header):
imp_ws.write(0, icol, head_dat)
data = [self.tag['haz'].haz_type, str(self.tag['haz'].file_name),
str(self.tag['haz'].description)]
write_col(0, imp_ws, data)
data = [str(self.tag['exp'].file_name), str(self.tag['exp'].description)]
write_col(1, imp_ws, data)
data = [str(self.tag['impf_set'].file_name), str(self.tag['impf_set'].description)]
write_col(2, imp_ws, data)
write_col(3, imp_ws, [self.unit])
write_col(4, imp_ws, [self.tot_value])
write_col(5, imp_ws, [self.aai_agg])
write_col(6, imp_ws, self.event_id)
write_col(7, imp_ws, self.event_name)
write_col(8, imp_ws, self.date)
write_col(9, imp_ws, self.frequency)
write_col(10, imp_ws, self.at_event)
write_col(11, imp_ws, self.eai_exp)
write_col(12, imp_ws, self.coord_exp[:, 0])
write_col(13, imp_ws, self.coord_exp[:, 1])
write_col(14, imp_ws, [str(self.crs)])
imp_wb.close()
[docs] def write_sparse_csr(self, file_name):
"""Write imp_mat matrix in numpy's npz format."""
LOGGER.info('Writing %s', file_name)
np.savez(file_name, data=self.imp_mat.data, indices=self.imp_mat.indices,
indptr=self.imp_mat.indptr, shape=self.imp_mat.shape)
[docs] def calc_impact_year_set(self, all_years=True, year_range=None):
"""Calculate yearly impact from impact data.
Parameters
----------
all_years : boolean
return values for all years between first and
last year with event, including years without any events.
year_range : tuple or list with integers
start and end year
Returns
-------
Impact year set of type numpy.ndarray with summed impact per year.
"""
if year_range is None:
year_range = []
orig_year = np.array([dt.datetime.fromordinal(date).year
for date in self.date])
if orig_year.size == 0 and len(year_range) == 0:
return dict()
if orig_year.size == 0 or (len(year_range) > 0 and all_years):
years = np.arange(min(year_range), max(year_range) + 1)
elif all_years:
years = np.arange(min(orig_year), max(orig_year) + 1)
else:
years = np.array(sorted(np.unique(orig_year)))
if not len(year_range) == 0:
years = years[years >= min(year_range)]
years = years[years <= max(year_range)]
year_set = dict()
for year in years:
year_set[year] = sum(self.at_event[orig_year == year])
return year_set
    def local_exceedance_imp(self, return_periods=(25, 50, 100, 250)):
        """Compute exceedance impact map for given return periods.
        Requires attribute imp_mat.

        Parameters
        ----------
        return_periods : np.array
            return periods to consider

        Returns
        -------
        np.array
            exceedance impact, shape (len(return_periods), num exposure points)

        Raises
        ------
        ValueError
            if imp_mat was not stored during the impact calculation
        """
        LOGGER.info('Computing exceedance impact map for return periods: %s',
                    return_periods)
        # probe for the matrix: an empty imp_mat placeholder lacks usable shape
        # NOTE(review): the message below is missing a space between
        # 'Impact' and 'instance'
        try:
            self.imp_mat.shape[1]
        except AttributeError as err:
            raise ValueError('attribute imp_mat is empty. Recalculate Impact'
                             'instance with parameter save_mat=True') from err
        num_cen = self.imp_mat.shape[1]
        imp_stats = np.zeros((len(return_periods), num_cen))
        # chunk size bounded so that (events x chunk) stays below the
        # configured maximum matrix size
        cen_step = CONFIG.max_matrix_size.int() // self.imp_mat.shape[0]
        if not cen_step:
            raise ValueError('Increase max_matrix_size configuration parameter to > %s'
                             % str(self.imp_mat.shape[0]))
        # separate in chunks; chk = -1 makes the trailing call below process
        # everything when there is no full chunk at all
        chk = -1
        for chk in range(int(num_cen / cen_step)):
            self._loc_return_imp(np.array(return_periods),
                                 self.imp_mat[:, chk * cen_step:(chk + 1) * cen_step].toarray(),
                                 imp_stats[:, chk * cen_step:(chk + 1) * cen_step])
        # remainder chunk (or the whole matrix if no full chunk was processed)
        self._loc_return_imp(np.array(return_periods),
                             self.imp_mat[:, (chk + 1) * cen_step:].toarray(),
                             imp_stats[:, (chk + 1) * cen_step:])
        return imp_stats
[docs] def plot_rp_imp(self, return_periods=(25, 50, 100, 250),
log10_scale=True, smooth=True, axis=None, **kwargs):
"""Compute and plot exceedance impact maps for different return periods.
Calls local_exceedance_imp.
Parameters
----------
return_periods : tuple(int), optional
return periods to consider
log10_scale : boolean, optional
plot impact as log10(impact)
smooth : bool, optional
smooth plot to plot.RESOLUTIONxplot.RESOLUTION
kwargs : optional
arguments for pcolormesh matplotlib function
used in event plots
Returns
--------
matplotlib.axes._subplots.AxesSubplot,
np.ndarray (return_periods.size x num_centroids)
"""
imp_stats = self.local_exceedance_imp(np.array(return_periods))
if imp_stats == []:
raise ValueError('Error: Attribute imp_mat is empty. Recalculate Impact'
'instance with parameter save_mat=True')
if log10_scale:
if np.min(imp_stats) < 0:
imp_stats_log = np.log10(abs(imp_stats) + 1)
colbar_name = 'Log10(abs(Impact)+1) (' + self.unit + ')'
elif np.min(imp_stats) < 1:
imp_stats_log = np.log10(imp_stats + 1)
colbar_name = 'Log10(Impact+1) (' + self.unit + ')'
else:
imp_stats_log = np.log10(imp_stats)
colbar_name = 'Log10(Impact) (' + self.unit + ')'
else:
imp_stats_log = imp_stats
colbar_name = 'Impact (' + self.unit + ')'
title = list()
for ret in return_periods:
title.append('Return period: ' + str(ret) + ' years')
axis = u_plot.geo_im_from_array(imp_stats_log, self.coord_exp,
colbar_name, title, smooth=smooth, axes=axis, **kwargs)
return axis, imp_stats
[docs] @staticmethod
def read_sparse_csr(file_name):
"""Read imp_mat matrix from numpy's npz format.
Parameters
----------
file_name : str file name
Returns
-------
sparse.csr_matrix
"""
LOGGER.info('Reading %s', file_name)
loader = np.load(file_name)
return sparse.csr_matrix(
(loader['data'], loader['indices'], loader['indptr']), shape=loader['shape'])
    @classmethod
    def from_csv(cls, file_name):
        """Read csv file containing impact data generated by write_csv.

        Parameters
        ----------
        file_name : str
            absolute path of the file

        Returns
        -------
        imp : climada.engine.impact.Impact
            Impact from csv file
        """
        LOGGER.info('Reading %s', file_name)
        imp_df = pd.read_csv(file_name)
        imp = cls()
        imp.unit = imp_df.unit[0]
        imp.tot_value = imp_df.tot_value[0]
        imp.aai_agg = imp_df.aai_agg[0]
        # columns were padded to equal length by write_csv; the padding
        # reads back as NaN and is dropped here
        imp.event_id = imp_df.event_id[~np.isnan(imp_df.event_id)].values
        num_ev = imp.event_id.size
        imp.event_name = imp_df.event_name[:num_ev].values.tolist()
        imp.date = imp_df.event_date[:num_ev].values
        imp.at_event = imp_df.at_event[:num_ev].values
        imp.frequency = imp_df.event_frequency[:num_ev].values
        imp.eai_exp = imp_df.eai_exp[~np.isnan(imp_df.eai_exp)].values
        num_exp = imp.eai_exp.size
        imp.coord_exp = np.zeros((num_exp, 2))
        imp.coord_exp[:, 0] = imp_df.exp_lat[:num_exp]
        imp.coord_exp[:, 1] = imp_df.exp_lon[:num_exp]
        # fall back to the default CRS when the file has no exp_crs column
        try:
            imp.crs = u_coord.to_crs_user_input(imp_df.exp_crs.values[0])
        except AttributeError:
            imp.crs = DEF_CRS
        # tag columns hold (type/file/description) stacked in the first rows
        imp.tag['haz'] = TagHaz(str(imp_df.tag_hazard[0]),
                                str(imp_df.tag_hazard[1]),
                                str(imp_df.tag_hazard[2]))
        imp.tag['exp'] = Tag(str(imp_df.tag_exposure[0]),
                             str(imp_df.tag_exposure[1]))
        imp.tag['impf_set'] = Tag(str(imp_df.tag_impact_func[0]),
                                  str(imp_df.tag_impact_func[1]))
        return imp
[docs] def read_csv(self, *args, **kwargs):
"""This function is deprecated, use Impact.from_csv instead."""
LOGGER.warning("The use of Impact.read_csv is deprecated."
"Use Impact.from_csv instead.")
self.__dict__ = Impact.from_csv(*args, **kwargs).__dict__
[docs] @classmethod
def from_excel(cls, file_name):
"""Read excel file containing impact data generated by write_excel.
Parameters
----------
file_name : str absolute path of the file
Returns
-------
imp : climada.engine.impact.Impact
Impact from excel file
"""
LOGGER.info('Reading %s', file_name)
dfr = pd.read_excel(file_name)
imp =cls()
imp.tag['haz'] = TagHaz()
imp.tag['haz'].haz_type = dfr['tag_hazard'][0]
imp.tag['haz'].file_name = dfr['tag_hazard'][1]
imp.tag['haz'].description = dfr['tag_hazard'][2]
imp.tag['exp'] = Tag()
imp.tag['exp'].file_name = dfr['tag_exposure'][0]
imp.tag['exp'].description = dfr['tag_exposure'][1]
imp.tag['impf_set'] = Tag()
imp.tag['impf_set'].file_name = dfr['tag_impact_func'][0]
imp.tag['impf_set'].description = dfr['tag_impact_func'][1]
imp.unit = dfr.unit[0]
imp.tot_value = dfr.tot_value[0]
imp.aai_agg = dfr.aai_agg[0]
imp.event_id = dfr.event_id[~np.isnan(dfr.event_id.values)].values
imp.event_name = dfr.event_name[:imp.event_id.size].values
imp.date = dfr.event_date[:imp.event_id.size].values
imp.frequency = dfr.event_frequency[:imp.event_id.size].values
imp.at_event = dfr.at_event[:imp.event_id.size].values
imp.eai_exp = dfr.eai_exp[~np.isnan(dfr.eai_exp.values)].values
imp.coord_exp = np.zeros((imp.eai_exp.size, 2))
imp.coord_exp[:, 0] = dfr.exp_lat.values[:imp.eai_exp.size]
imp.coord_exp[:, 1] = dfr.exp_lon.values[:imp.eai_exp.size]
try:
imp.crs = u_coord.to_csr_user_input(dfr.exp_crs.values[0])
except AttributeError:
imp.crs = DEF_CRS
return imp
[docs] def read_excel(self, *args, **kwargs):
"""This function is deprecated, use Impact.from_excel instead."""
LOGGER.warning("The use of Impact.read_excel is deprecated."
"Use Impact.from_excel instead.")
self.__dict__ = Impact.from_excel(*args, **kwargs).__dict__
    @staticmethod
    def video_direct_impact(exp, impf_set, haz_list, file_name='',
                            writer=animation.PillowWriter(bitrate=500),
                            imp_thresh=0, args_exp=None, args_imp=None):
        """
        Computes and generates video of accumulated impact per input events
        over exposure.

        Parameters
        ----------
        exp : Exposures
            exposures instance, constant during all video
        impf_set : ImpactFuncSet
            impact functions
        haz_list : (list(Hazard))
            every Hazard contains an event; all hazards
            use the same centroids
        file_name : str, optional
            file name to save video, if provided
        writer : matplotlib.animation.*, optional
            video writer. Default: pillow with bitrate=500.
            NOTE(review): the default writer instance is created once at class
            definition and shared across calls
        imp_thresh : float
            represent damages greater than threshold
        args_exp : optional
            arguments for scatter (points) or hexbin (raster)
            matplotlib function used in exposures
        args_imp : optional
            arguments for scatter (points) or hexbin (raster)
            matplotlib function used in impact

        Returns
        -------
        list(Impact)
        """
        if args_exp is None:
            args_exp = dict()
        if args_imp is None:
            args_imp = dict()
        imp_list = []
        exp_list = []
        # running per-exposure maximum of eai_exp, accumulated over events
        imp_arr = np.zeros(len(exp.gdf))
        for i_time, _ in enumerate(haz_list):
            imp_tmp = Impact()
            imp_tmp.calc(exp, impf_set, haz_list[i_time])
            imp_arr = np.maximum(imp_arr, imp_tmp.eai_exp)
            # remove not impacted exposures
            save_exp = imp_arr > imp_thresh
            imp_tmp.coord_exp = imp_tmp.coord_exp[save_exp, :]
            imp_tmp.eai_exp = imp_arr[save_exp]
            imp_list.append(imp_tmp)
            exp_list.append(~save_exp)
        # common intensity color limits over all frames
        v_lim = [np.array([haz.intensity.min() for haz in haz_list]).min(),
                 np.array([haz.intensity.max() for haz in haz_list]).max()]
        # fill in default color limits/maps only where the caller gave none
        if 'vmin' not in args_exp:
            args_exp['vmin'] = exp.gdf.value.values.min()
        if 'vmin' not in args_imp:
            args_imp['vmin'] = np.array([imp.eai_exp.min() for imp in imp_list
                                         if imp.eai_exp.size]).min()
        if 'vmax' not in args_exp:
            args_exp['vmax'] = exp.gdf.value.values.max()
        if 'vmax' not in args_imp:
            args_imp['vmax'] = np.array([imp.eai_exp.max() for imp in imp_list
                                         if imp.eai_exp.size]).max()
        if 'cmap' not in args_exp:
            args_exp['cmap'] = 'winter_r'
        if 'cmap' not in args_imp:
            args_imp['cmap'] = 'autumn_r'
        plot_raster = False
        if exp.meta:
            plot_raster = True
        def run(i_time):
            # draw one animation frame; closes over fig/axis/pbar defined below
            haz_list[i_time].plot_intensity(1, axis=axis, cmap='Greys', vmin=v_lim[0],
                                            vmax=v_lim[1], alpha=0.8)
            if plot_raster:
                exp.plot_hexbin(axis=axis, mask=exp_list[i_time], ignore_zero=True,
                                pop_name=False, **args_exp)
                if imp_list[i_time].coord_exp.size:
                    imp_list[i_time].plot_hexbin_eai_exposure(axis=axis, pop_name=False,
                                                              **args_imp)
                    fig.delaxes(fig.axes[1])
            else:
                exp.plot_scatter(axis=axis, mask=exp_list[i_time], ignore_zero=True,
                                 pop_name=False, **args_exp)
                if imp_list[i_time].coord_exp.size:
                    imp_list[i_time].plot_scatter_eai_exposure(axis=axis, pop_name=False,
                                                               **args_imp)
                    fig.delaxes(fig.axes[1])
            # drop the intermediate colorbar axes added by the plot calls
            fig.delaxes(fig.axes[1])
            fig.delaxes(fig.axes[1])
            axis.set_xlim(haz_list[-1].centroids.lon.min(), haz_list[-1].centroids.lon.max())
            axis.set_ylim(haz_list[-1].centroids.lat.min(), haz_list[-1].centroids.lat.max())
            axis.set_title(haz_list[i_time].event_name[0])
            pbar.update()
        if file_name:
            LOGGER.info('Generating video %s', file_name)
            fig, axis, _fontsize = u_plot.make_map()
            ani = animation.FuncAnimation(fig, run, frames=len(haz_list),
                                          interval=500, blit=False)
            pbar = tqdm(total=len(haz_list))
            fig.tight_layout()
            ani.save(file_name, writer=writer)
            pbar.close()
        return imp_list
    def _loc_return_imp(self, return_periods, imp, exc_imp):
        """Compute local exceedance impact for the given return periods.

        Parameters
        ----------
        return_periods : np.array
            return periods to consider
        imp : np.array
            dense impact chunk, shape (num_events, num_centroids)
        exc_imp : np.array
            output, shape (len(return_periods), num_centroids); filled in place
        """
        # sorted impacts: per-column descending order via fancy indexing
        sort_pos = np.argsort(imp, axis=0)[::-1, :]
        columns = np.ones(imp.shape, int)
        # pylint: disable=unsubscriptable-object # pylint/issues/3139
        columns *= np.arange(columns.shape[1])
        imp_sort = imp[sort_pos, columns]
        # cummulative frequency at sorted intensity
        # (freq_sort is a fancy-indexed copy, so the in-place cumsum below
        # does not touch self.frequency)
        freq_sort = self.frequency[sort_pos]
        np.cumsum(freq_sort, axis=0, out=freq_sort)
        for cen_idx in range(imp.shape[1]):
            exc_imp[:, cen_idx] = self._cen_return_imp(
                imp_sort[:, cen_idx], freq_sort[:, cen_idx],
                0, return_periods)
    def _exp_impact(self, exp_iimp, exposures, hazard, imp_fun, insure_flag):
        """Compute impact for the given exposure indexes and impact function,
        accumulating into eai_exp, at_event, tot_value and (optionally) imp_mat.

        Parameters
        ----------
        exp_iimp : np.array
            exposures indexes
        exposures : climada.entity.Exposures
            exposures instance
        hazard : climada.Hazard
        imp_fun : climada.entity.ImpactFunc
            impact function instance
        insure_flag : bool
            consider deductible and cover of exposures
        """
        if not exp_iimp.size:
            return
        # get assigned centroids
        icens = exposures.gdf[INDICATOR_CENTR + hazard.tag.haz_type].values[exp_iimp]
        # get affected intensities
        inten_val = hazard.intensity[:, icens]
        # get affected fractions
        fract = hazard.fraction[:, icens]
        # impact = fraction * mdr * value
        # (calc_mdr is applied to the sparse matrix' data in place)
        inten_val.data = imp_fun.calc_mdr(inten_val.data)
        impact = fract.multiply(inten_val).multiply(exposures.gdf.value.values[exp_iimp])
        if insure_flag and impact.nonzero()[0].size:
            # dense path: subtract deductible (scaled by paa) and cap at cover
            inten_val = hazard.intensity[:, icens].toarray()
            paa = np.interp(inten_val, imp_fun.intensity, imp_fun.paa)
            impact = impact.toarray()
            impact -= exposures.gdf.deductible.values[exp_iimp] * paa
            impact = np.clip(impact, 0, exposures.gdf.cover.values[exp_iimp])
            # frequency-weighted sum over events per exposure point
            self.eai_exp[exp_iimp] += np.einsum('ji,j->i', impact, hazard.frequency)
            impact = sparse.coo_matrix(impact)
        else:
            # sparse path: frequency-weighted sum over events per exposure point
            self.eai_exp[exp_iimp] += np.squeeze(np.asarray(np.sum(
                impact.multiply(hazard.frequency.reshape(-1, 1)), axis=0)))
        self.at_event += np.squeeze(np.asarray(np.sum(impact, axis=1)))
        self.tot_value += np.sum(exposures.gdf.value.values[exp_iimp])
        # while calc() runs with save_mat=True, imp_mat is the COO-style
        # accumulator tuple (data, (row_ind, col_ind))
        if isinstance(self.imp_mat, tuple):
            row_ind, col_ind = impact.nonzero()
            self.imp_mat[0].extend(list(impact.data))
            self.imp_mat[1][0].extend(list(row_ind))
            self.imp_mat[1][1].extend(list(exp_iimp[col_ind]))
def _build_exp(self):
    """Return the expected annual impact per exposure point wrapped in an
    Exposures object (value = eai_exp at each coordinate)."""
    exp_data = {
        'value': self.eai_exp,
        'latitude': self.coord_exp[:, 0],
        'longitude': self.coord_exp[:, 1],
    }
    return Exposures(data=exp_data, crs=self.crs, value_unit=self.unit,
                     ref_year=0, tag=Tag(), meta=None)
def _build_exp_event(self, event_id):
    """Write the impact of a single event as an Exposures object.

    Parameters
    ----------
    event_id : int
        id of the event
    """
    # exactly one event must match the given id; unpacking raises otherwise
    (event_pos,) = np.flatnonzero(self.event_id == event_id)
    event_values = self.imp_mat[event_pos].toarray().ravel()
    return Exposures(
        data={
            'value': event_values,
            'latitude': self.coord_exp[:, 0],
            'longitude': self.coord_exp[:, 1],
        },
        crs=self.crs,
        value_unit=self.unit,
        ref_year=0,
        tag=Tag(),
        meta=None,
    )
@staticmethod
def _cen_return_imp(imp, freq, imp_th, return_periods):
"""From ordered impact and cummulative frequency at centroid, get
exceedance impact at input return periods.
Parameters
----------
imp : np.array
sorted impact at centroid
freq : np.array
cummulative frequency at centroid
imp_th : float
impact threshold
return_periods : np.array
return periods
Returns
-------
np.array
"""
imp_th = np.asarray(imp > imp_th).squeeze()
imp_cen = imp[imp_th]
freq_cen = freq[imp_th]
if not imp_cen.size:
return np.zeros((return_periods.size,))
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
pol_coef = np.polyfit(np.log(freq_cen), imp_cen, deg=1)
except ValueError:
pol_coef = np.polyfit(np.log(freq_cen), imp_cen, deg=0)
imp_fit = np.polyval(pol_coef, np.log(1 / return_periods))
wrong_inten = (return_periods > np.max(1 / freq_cen)) & np.isnan(imp_fit)
imp_fit[wrong_inten] = 0.
return imp_fit
def select(self,
event_ids=None, event_names=None, dates=None,
coord_exp=None):
"""
Select a subset of events and/or exposure points from the impact.
If multiple input variables are not None, it returns all the impacts
matching at least one of the conditions.

Note
----
The frequencies are NOT adjusted. Method to adjust frequencies
and obtain correct eai_exp:
1- Select subset of impact according to your choice
imp = impact.select(...)
2- Adjust manually the frequency of the subset of impact
imp.frequency = [...]
3- Use select without arguments to select all events and recompute
the eai_exp with the updated frequencies.
imp = imp.select()

Parameters
----------
event_ids : list[int], optional
Selection of events by their id. The default is None.
event_names : list[str], optional
Selection of events by their name. The default is None.
dates : tuple, optional
(start-date, end-date), events are selected if they are >=
than start-date and <= than end-date. Dates in same format
as impact.date (ordinal format of datetime library)
The default is None.
coord_exp : np.ndarray, optional
Selection of exposures coordinates [lat, lon] (in degrees)
The default is None.

Raises
------
ValueError
If the impact matrix is missing, the eai_exp and aai_agg cannot
be updated for a selection of events and/or exposures.

Returns
-------
imp : climada.engine.Impact
A new impact object with a selection of events and/or exposures
"""
nb_events = self.event_id.size
nb_exp = len(self.coord_exp)
# a complete impact matrix (impact.calc with save_mat=True) is required
# to recompute eai_exp/aai_agg for the selected subset
if self.imp_mat.shape != (nb_events, nb_exp):
raise ValueError("The impact matrix is missing or incomplete. "
"The eai_exp and aai_agg cannot be computed. "
"Please recompute impact.calc() with save_mat=True "
"before using impact.select()")
# per-event and per-exposure attributes are told apart by their length
# alone, which is ambiguous when both counts coincide
if nb_events == nb_exp:
LOGGER.warning("The number of events is equal to the number of "
"exposure points. It is not possible to "
"differentiate events and exposures attributes. "
"Please add/remove one event/exposure point. "
"This is a purely technical limitation of this "
"method.")
return None
imp = copy.deepcopy(self)
# apply event selection to impact attributes
sel_ev = self._selected_events_idx(event_ids, event_names, dates, nb_events)
if sel_ev is not None:
# set all attributes that are 'per event', i.e. have a dimension
# of length equal to the number of events (=nb_events)
for attr in get_attributes_with_matching_dimension(imp, [nb_events]):
value = imp.__getattribute__(attr)
if isinstance(value, np.ndarray):
if value.ndim == 1:
setattr(imp, attr, value[sel_ev])
else:
LOGGER.warning("Found a multidimensional numpy array "
"with one dimension matching the number of events. "
"But multidimensional numpy arrays are not handled "
"in impact.select")
elif isinstance(value, sparse.csr_matrix):
# e.g. the impact matrix: keep only the selected rows (events)
setattr(imp, attr, value[sel_ev, :])
elif isinstance(value, list) and value:
# e.g. event_name: subset by position
setattr(imp, attr, [value[idx] for idx in sel_ev])
else:
pass
LOGGER.info("The eai_exp and aai_agg are computed for the "
"selected subset of events WITHOUT modification of "
"the frequencies.")
# apply exposure selection to impact attributes
if coord_exp is not None:
sel_exp = self._selected_exposures_idx(coord_exp)
imp.coord_exp = imp.coord_exp[sel_exp]
imp.imp_mat = imp.imp_mat[:, sel_exp]
# .A1 reduce 1d matrix to 1d array
imp.at_event = imp.imp_mat.sum(axis=1).A1
# the total exposed value of an arbitrary subset is unknown here
imp.tot_value = None
LOGGER.info("The total value cannot be re-computed for a "
"subset of exposures and is set to None.")
# cast frequency vector into 2d array for sparse matrix multiplication
freq_mat = imp.frequency.reshape(len(imp.frequency), 1)
# .A1 reduce 1d matrix to 1d array
imp.eai_exp = imp.imp_mat.multiply(freq_mat).sum(axis=0).A1
imp.aai_agg = imp.eai_exp.sum()
return imp
def _selected_exposures_idx(self, coord_exp):
    """Return the positions of self.coord_exp entries that exactly match
    one of the given coordinates (threshold=0)."""
    match_idx = u_coord.assign_coordinates(self.coord_exp, coord_exp, threshold=0)
    matched = np.flatnonzero(match_idx >= 0)
    if matched.size == 0:
        LOGGER.warning("No exposure coordinates match the selection.")
    return matched
def _selected_events_idx(self, event_ids, event_names, dates, nb_events):
if all(var is None for var in [dates, event_ids, event_names]):
return None
# filter events by date
if dates is None:
mask_dt = np.zeros(nb_events, dtype=bool)
else:
mask_dt = np.ones(nb_events, dtype=bool)
date_ini, date_end = dates
if isinstance(date_ini, str):
date_ini = u_dt.str_to_date(date_ini)
date_end = u_dt.str_to_date(date_end)
mask_dt &= (date_ini <= self.date)
mask_dt &= (self.date <= date_end)
if not np.any(mask_dt):
LOGGER.info('No impact event in given date range %s.', dates)
sel_dt = mask_dt.nonzero()[0] # Convert bool to indices
# filter events by id
if event_ids is None:
sel_id = np.array([], dtype=int)
else:
sel_id = np.isin(self.event_id, event_ids).nonzero()[0]
if sel_id.size == 0:
LOGGER.info('No impact event with given ids %s found.', event_ids)
# filter events by name
if event_names is None:
sel_na = np.array([], dtype=int)
else:
sel_na = np.isin(self.event_name, event_names).nonzero()[0]
if sel_na.size == 0:
LOGGER.info('No impact event with given names %s found.', event_names)
# select events with machting id, name or date field.
sel_ev = np.unique(np.concatenate([sel_dt, sel_id, sel_na]))
# if no event found matching ids, names or dates, warn the user
if sel_ev.size == 0:
LOGGER.warning("No event matches the selection. ")
return sel_ev
class ImpactFreqCurve():
    """Impact exceedance frequency curve.

    Attributes
    ----------
    tag : dict
        dictionary of tags of exposures, impact functions set and
        hazard: {'exp': Tag(), 'impf_set': Tag(), 'haz': TagHazard()}
    return_per : np.array
        return period
    impact : np.array
        impact exceeding frequency
    unit : str
        value unit used (given by exposures unit)
    label : str
        string describing source data
    """

    def __init__(self):
        """Initialize with empty tags, empty arrays and empty strings."""
        self.tag = {}
        self.return_per = np.empty(0)
        self.impact = np.empty(0)
        self.unit = ''
        self.label = ''

    def plot(self, axis=None, log_frequency=False, **kwargs):
        """Plot impact frequency curve.

        Parameters
        ----------
        axis : matplotlib.axes._subplots.AxesSubplot, optional
            axis to use
        log_frequency : boolean, optional
            plot logarithmic exceedance frequency on x-axis
        kwargs : optional
            arguments for plot matplotlib function, e.g. color='b'

        Returns
        -------
        matplotlib.axes._subplots.AxesSubplot
        """
        if not axis:
            _, axis = plt.subplots(1, 1)
        axis.set_title(self.label)
        axis.set_ylabel(f'Impact ({self.unit})')
        if log_frequency:
            # show exceedance frequency (1/return period) on a log axis
            axis.set_xscale('log')
            axis.set_xlabel('Exceedance frequency (1/year)')
            x_values = self.return_per ** -1
        else:
            axis.set_xlabel('Return period (year)')
            x_values = self.return_per
        axis.plot(x_values, self.impact, **kwargs)
        return axis