"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU Lesser General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Hazard.
"""
__all__ = ['Hazard']
import copy
import itertools
import logging
import datetime as dt
import warnings
import numpy as np
import pandas as pd
import geopandas as gpd
from scipy import sparse
import matplotlib.pyplot as plt
import h5py
import rasterio
from rasterio.features import rasterize
from rasterio.warp import reproject, Resampling, calculate_default_transform
from climada.hazard.tag import Tag as TagHazard
from climada.hazard.centroids.centr import Centroids
import climada.util.plot as u_plot
import climada.util.checker as check
import climada.util.dates_times as u_dt
from climada.util.config import CONFIG
import climada.util.hdf5_handler as hdf5
import climada.util.coordinates as co
LOGGER = logging.getLogger(__name__)
DEF_VAR_EXCEL = {'sheet_name': {'inten' : 'hazard_intensity',
'freq' : 'hazard_frequency'
},
'col_name': {'cen_id' : 'centroid_id/event_id',
'even_id' : 'event_id',
'even_dt' : 'event_date',
'even_name' : 'event_name',
'freq' : 'frequency',
'orig': 'orig_event_flag'
},
'col_centroids': {'sheet_name': 'centroids',
'col_name': {'cen_id' : 'centroid_id',
'lat' : 'latitude',
'lon' : 'longitude'
}
}
}
""" Excel variable names """
DEF_VAR_MAT = {'field_name': 'hazard',
'var_name': {'per_id' : 'peril_ID',
'even_id' : 'event_ID',
'ev_name' : 'name',
'freq' : 'frequency',
'inten': 'intensity',
'unit': 'units',
'frac': 'fraction',
'comment': 'comment',
'datenum': 'datenum',
'orig': 'orig_event_flag'
},
'var_cent': {'field_names': ['centroids', 'hazard'],
'var_name': {'cen_id' : 'centroid_ID',
'lat' : 'lat',
'lon' : 'lon'
}
}
}
""" MATLAB variable names """
[docs]class Hazard():
"""Contains events of some hazard type defined at centroids. Loads from
files with format defined in FILE_EXT.
Attributes:
tag (TagHazard): information about the source
units (str): units of the intensity
centroids (Centroids): centroids of the events
event_id (np.array): id (>0) of each event
event_name (list(str)): name of each event (default: event_id)
date (np.array): integer date corresponding to the proleptic
Gregorian ordinal, where January 1 of year 1 has ordinal 1
(ordinal format of datetime library)
orig (np.array): flags indicating historical events (True)
or probabilistic (False)
frequency (np.array): frequency of each event in years
intensity (sparse.csr_matrix): intensity of the events at centroids
fraction (sparse.csr_matrix): fraction of affected exposures for each
event at each centroid
"""
intensity_thres = 10
""" Intensity threshold per hazard used to filter lower intensities. To be
set for every hazard type """
vars_oblig = {'tag',
'units',
'centroids',
'event_id',
'frequency',
'intensity',
'fraction'
}
"""Name of the variables needed to compute the impact. Types: scalar, str,
list, 1dim np.array of size num_events, scipy.sparse matrix of shape
num_events x num_centroids, Centroids and Tag."""
vars_def = {'date',
'orig',
'event_name'
}
"""Name of the variables used in impact calculation whose value is
descriptive and can therefore be set with default values. Types: scalar,
string, list, 1dim np.array of size num_events.
"""
vars_opt = set()
"""Name of the variables that aren't need to compute the impact. Types:
scalar, string, list, 1dim np.array of size num_events."""
[docs] def __init__(self, haz_type, pool=None):
"""Initialize values.
Parameters:
haz_type (str, optional): acronym of the hazard type (e.g. 'TC').
Examples:
Fill hazard values by hand:
>>> haz = Hazard('TC')
>>> haz.intensity = sparse.csr_matrix(np.zeros((2, 2)))
>>> ...
Take hazard values from file:
>>> haz = Hazard('TC', HAZ_DEMO_MAT)
>>> haz.read_mat(HAZ_DEMO_MAT, 'demo')
"""
self.tag = TagHazard()
self.tag.haz_type = haz_type
self.units = ''
self.centroids = Centroids()
# following values are defined for each event
self.event_id = np.array([], int)
self.frequency = np.array([], float)
self.event_name = list()
self.date = np.array([], int)
self.orig = np.array([], bool)
# following values are defined for each event and centroid
self.intensity = sparse.csr_matrix(np.empty((0, 0))) # events x centroids
self.fraction = sparse.csr_matrix(np.empty((0, 0))) # events x centroids
if pool:
self.pool = pool
LOGGER.info('Using %s CPUs.', self.pool.ncpus)
else:
self.pool = None
[docs] def clear(self):
"""Reinitialize attributes."""
for (var_name, var_val) in self.__dict__.items():
if isinstance(var_val, np.ndarray) and var_val.ndim == 1:
setattr(self, var_name, np.array([], dtype=var_val.dtype))
elif isinstance(var_val, sparse.csr_matrix):
setattr(self, var_name, sparse.csr_matrix(np.empty((0, 0))))
else:
setattr(self, var_name, var_val.__class__())
[docs] def check(self):
"""Check dimension of attributes.
Raises:
ValueError
"""
self.centroids.check()
self._check_events()
[docs] def set_raster(self, files_intensity, files_fraction=None, attrs={},
band=[1], src_crs=None, window=False, geometry=False,
dst_crs=False, transform=None, width=None, height=None,
resampling=Resampling.nearest):
""" Append intensity and fraction from raster file. 0s put to the masked
values. File can be partially read using window OR geometry.
Alternatively, CRS and/or transformation can be set using dst_crs and/or
(transform, width and height).
Parameters:
files_intensity (list(str)): file names containing intensity
files_fraction (list(str)): file names containing fraction
attrs (dict, optional): name of Hazard attributes and their values
band (list(int), optional): bands to read (starting at 1)
src_crs (crs, optional): source CRS. Provide it if error without it.
window (rasterio.windows.Windows, optional): window where data is
extracted
geometry (shapely.geometry, optional): consider pixels only in shape
dst_crs (crs, optional): reproject to given crs
transform (rasterio.Affine): affine transformation to apply
wdith (float): number of lons for transform
height (float): number of lats for transform
resampling (rasterio.warp,.Resampling optional): resampling
function used for reprojection to dst_crs
"""
if files_fraction is not None and len(files_intensity) != len(files_fraction):
LOGGER.error('Number of intensity files differs from fraction files: %s != %s',
len(files_intensity), len(files_fraction))
raise ValueError
self.tag.file_name = str(files_intensity) + ' ; ' + str(files_fraction)
self.centroids = Centroids()
if self.pool:
chunksize = min(len(files_intensity)//self.pool.ncpus, 1000)
# set first centroids
inten_list = [sparse.csr.csr_matrix(self.centroids.set_raster_file( \
files_intensity[0], band, src_crs, window, geometry, dst_crs, \
transform, width, height, resampling))]
inten_list += self.pool.map(self.centroids.set_raster_file, \
files_intensity[1:], itertools.repeat(band), itertools.repeat(src_crs), \
itertools.repeat(window), itertools.repeat(geometry), \
itertools.repeat(dst_crs), itertools.repeat(transform), \
itertools.repeat(width), itertools.repeat(height), \
itertools.repeat(resampling), chunksize=chunksize)
self.intensity = sparse.vstack(inten_list, format='csr')
if files_fraction is not None:
fract_list = self.pool.map(self.centroids.set_raster_file, \
files_fraction, itertools.repeat(band), itertools.repeat(src_crs), \
itertools.repeat(window), itertools.repeat(geometry), \
itertools.repeat(dst_crs), itertools.repeat(transform), \
itertools.repeat(width), itertools.repeat(height), \
itertools.repeat(resampling), chunksize=chunksize)
self.fraction = sparse.vstack(fract_list, format='csr')
else:
inten_list = []
for file in files_intensity:
inten_list.append(self.centroids.set_raster_file(file, band, src_crs, window, \
geometry, dst_crs, transform, width, height, resampling))
self.intensity = sparse.vstack(inten_list, format='csr')
if files_fraction is not None:
fract_list = []
for file in files_fraction:
fract_list.append(self.centroids.set_raster_file(file, band, src_crs, \
window, geometry, dst_crs, transform, width, height, resampling))
self.fraction = sparse.vstack(fract_list, format='csr')
if files_fraction is None:
self.fraction = self.intensity.copy()
self.fraction.data.fill(1)
if 'event_id' in attrs:
self.event_id = attrs['event_id']
else:
self.event_id = np.arange(1, self.intensity.shape[0]+1)
if 'frequency' in attrs:
self.frequency = attrs['frequency']
else:
self.frequency = np.ones(self.event_id.size)
if 'event_name' in attrs:
self.event_name = attrs['event_name']
else:
self.event_name = list(map(str, self.event_id))
if 'date' in attrs:
self.date = np.array([attrs['date']])
else:
self.date = np.ones(self.event_id.size)
if 'orig' in attrs:
self.orig = np.array([attrs['orig']])
else:
self.orig = np.ones(self.event_id.size, bool)
if 'unit' in attrs:
self.unit = attrs['unit']
[docs] def set_vector(self, files_intensity, files_fraction=None, attrs={},
inten_name=['intensity'], frac_name=['fraction'], dst_crs=None):
""" Read vector files format supported by fiona. Each intensity name is
considered an event.
Parameters:
files_intensity (list(str)): file names containing intensity
files_fraction (list(str)): file names containing fraction
attrs (dict, optional): name of Hazard attributes and their values
inten_name (list(str), optional): name of variables containing
the intensities of each event
frac_name (list(str), optional): name of variables containing
the fractions of each event
dst_crs (crs, optional): reproject to given crs
"""
if files_fraction is not None and len(files_intensity) != len(files_fraction):
LOGGER.error('Number of intensity files differs from fraction files: %s != %s',
len(files_intensity), len(files_fraction))
raise ValueError
self.tag.file_name = str(files_intensity) + ' ; ' + str(files_fraction)
self.centroids = Centroids()
for file in files_intensity:
inten = self.centroids.set_vector_file(file, inten_name, dst_crs)
self.intensity = sparse.vstack([self.intensity, inten], format='csr')
if files_fraction is None:
self.fraction = self.intensity.copy()
self.fraction.data.fill(1)
else:
for file in files_fraction:
fract = self.centroids.set_vector_file(file, frac_name, dst_crs)
self.fraction = sparse.vstack([self.fraction, fract], format='csr')
if 'event_id' in attrs:
self.event_id = attrs['event_id']
else:
self.event_id = np.arange(1, self.intensity.shape[0]+1)
if 'frequency' in attrs:
self.frequency = attrs['frequency']
else:
self.frequency = np.ones(self.event_id.size)
if 'event_name' in attrs:
self.event_name = attrs['event_name']
else:
self.event_name = list(map(str, self.event_id))
if 'date' in attrs:
self.date = np.array([attrs['date']])
else:
self.date = np.ones(self.event_id.size)
if 'orig' in attrs:
self.orig = np.array([attrs['orig']])
else:
self.orig = np.ones(self.event_id.size, bool)
if 'unit' in attrs:
self.unit = attrs['unit']
[docs] def reproject_raster(self, dst_crs=False, transform=None, width=None, height=None,
resampl_inten=Resampling.nearest, resampl_fract=Resampling.nearest):
""" Change current raster data to other CRS and/or transformation
Parameters:
dst_crs (crs, optional): reproject to given crs
transform (rasterio.Affine): affine transformation to apply
wdith (float): number of lons for transform
height (float): number of lats for transform
resampl_inten (rasterio.warp,.Resampling optional): resampling
function used for reprojection to dst_crs for intensity
resampl_fract (rasterio.warp,.Resampling optional): resampling
function used for reprojection to dst_crs for fraction
"""
if not self.centroids.meta:
LOGGER.error('Raster not set')
raise ValueError
if not dst_crs:
dst_crs = self.centroids.meta['crs']
if transform and not width or transform and not height:
LOGGER.error('Provide width and height to given transformation.')
raise ValueError
if not transform:
transform, width, height = calculate_default_transform(\
self.centroids.meta['crs'], dst_crs, self.centroids.meta['width'], \
self.centroids.meta['height'], self.centroids.meta['transform'][2], \
self.centroids.meta['transform'][5] + \
self.centroids.meta['height']*self.centroids.meta['transform'][4],
self.centroids.meta['transform'][2] + \
self.centroids.meta['width']*self.centroids.meta['transform'][0], \
self.centroids.meta['transform'][5])
dst_meta = self.centroids.meta.copy()
dst_meta.update({'crs': dst_crs, 'transform': transform,
'width': width, 'height': height
})
intensity = np.zeros((self.size, dst_meta['height'], dst_meta['width']))
fraction = np.zeros((self.size, dst_meta['height'], dst_meta['width']))
kwargs = {'src_transform': self.centroids.meta['transform'],
'src_crs': self.centroids.meta['crs'],
'dst_transform': transform, 'dst_crs': dst_crs,
'resampling': resampl_inten}
for idx_ev, inten in enumerate(self.intensity.todense()):
reproject(source=np.asarray(inten.reshape((self.centroids.meta['height'], \
self.centroids.meta['width']))), destination=intensity[idx_ev, :, :], \
**kwargs)
kwargs.update(resampling=resampl_fract)
for idx_ev, fract in enumerate(self.fraction.todense()):
reproject(source=np.asarray(fract.reshape((self.centroids.meta['height'], \
self.centroids.meta['width']))), destination=fraction[idx_ev, :, :], \
**kwargs)
self.centroids.meta = dst_meta
self.intensity = sparse.csr_matrix(intensity.reshape(self.size, \
dst_meta['height'] * dst_meta['width']))
self.fraction = sparse.csr_matrix(fraction.reshape(self.size, \
dst_meta['height'] * dst_meta['width']))
self.check()
[docs] def reproject_vector(self, dst_crs, scheduler=None):
""" Change current point data to a a given projection
Parameters:
dst_crs (crs): reproject to given crs
scheduler (str, optional): used for dask map_partitions. “threads”,
“synchronous” or “processes”
"""
self.centroids.set_geometry_points(scheduler)
self.centroids.geometry = self.centroids.geometry.to_crs(dst_crs)
self.centroids.lat = self.centroids.geometry[:].y
self.centroids.lon = self.centroids.geometry[:].x
self.check()
[docs] def raster_to_vector(self):
""" Change current raster to points (center of the pixels) """
self.centroids.set_meta_to_lat_lon()
self.centroids.meta = dict()
self.check()
[docs] def vector_to_raster(self, scheduler=None):
""" Change current point data to a raster with same resolution
Parameters:
scheduler (str, optional): used for dask map_partitions. “threads”,
“synchronous” or “processes”
"""
points_df = gpd.GeoDataFrame(crs=self.centroids.geometry.crs)
points_df['latitude'] = self.centroids.lat
points_df['longitude'] = self.centroids.lon
val_names = ['val'+str(i_ev) for i_ev in range(2*self.size)]
for i_ev, inten_name in enumerate(val_names):
if i_ev < self.size:
points_df[inten_name] = np.asarray(self.intensity[i_ev, :].todense()).reshape(-1)
else:
points_df[inten_name] = np.asarray(self.fraction[i_ev-self.size, :].todense()).\
reshape(-1)
raster, meta = co.points_to_raster(points_df, val_names, scheduler=scheduler)
self.intensity = sparse.csr_matrix(raster[:self.size, :, :].reshape(self.size, -1))
self.fraction = sparse.csr_matrix(raster[self.size:, :, :].reshape(self.size, -1))
self.centroids = Centroids()
self.centroids.meta = meta
self.check()
[docs] def read_mat(self, file_name, description='', var_names=DEF_VAR_MAT):
"""Read climada hazard generate with the MATLAB code.
Parameters:
file_name (str): absolute file name
description (str, optional): description of the data
var_names (dict, default): name of the variables in the file,
default: DEF_VAR_MAT constant
Raises:
KeyError
"""
LOGGER.info('Reading %s', file_name)
self.clear()
self.tag.file_name = file_name
self.tag.description = description
try:
data = hdf5.read(file_name)
try:
data = data[var_names['field_name']]
except KeyError:
pass
haz_type = hdf5.get_string(data[var_names['var_name']['per_id']])
self.tag.haz_type = haz_type
self.centroids.read_mat(file_name, var_names=var_names['var_cent'])
self._read_att_mat(data, file_name, var_names)
except KeyError as var_err:
LOGGER.error("Not existing variable: %s", str(var_err))
raise var_err
[docs] def read_excel(self, file_name, description='', var_names=DEF_VAR_EXCEL):
"""Read climada hazard generate with the MATLAB code.
Parameters:
file_name (str): absolute file name
description (str, optional): description of the data
centroids (Centroids, optional): provide centroids if not contained
in the file
var_names (dict, default): name of the variables in the file,
default: DEF_VAR_EXCEL constant
Raises:
KeyError
"""
LOGGER.info('Reading %s', file_name)
haz_type = self.tag.haz_type
self.clear()
self.tag.file_name = file_name
self.tag.haz_type = haz_type
self.tag.description = description
try:
self.centroids.read_excel(file_name, var_names=var_names['col_centroids'])
self._read_att_excel(file_name, var_names)
except KeyError as var_err:
LOGGER.error("Not existing variable: %s", str(var_err))
raise var_err
[docs] def select(self, date=None, orig=None, reg_id=None, reset_frequency=False):
"""Select events within provided date and/or (historical or synthetical)
and/or region. Frequency of the events may need to be recomputed!
Parameters:
date (tuple(str or int), optional): (initial date, final date) in
string ISO format ('2011-01-02') or datetime ordinal integer
orig (bool, optional): select only historical (True) or only
synthetic (False)
reg_id (int, optional): region identifier of the centroids's
region_id attibute
reset_frequency (boolean): change frequency of events proportional to
difference between first and last year (old and new)
default = False
Returns:
Hazard or children
"""
try:
haz = self.__class__()
except TypeError:
haz = Hazard(self.tag.haz_type)
sel_ev = np.ones(self.event_id.size, bool)
sel_cen = np.ones(self.centroids.size, bool)
# filter events with date
if isinstance(date, tuple):
date_ini, date_end = date[0], date[1]
if isinstance(date_ini, str):
date_ini = u_dt.str_to_date(date[0])
date_end = u_dt.str_to_date(date[1])
sel_ev = np.logical_and(date_ini <= self.date,
self.date <= date_end)
if not np.any(sel_ev):
LOGGER.info('No hazard in date range %s.', date)
return None
# filter events hist/synthetic
if isinstance(orig, bool):
sel_ev = np.logical_and(sel_ev, self.orig.astype(bool) == orig)
if not np.any(sel_ev):
LOGGER.info('No hazard with %s tracks.', str(orig))
return None
# filter centroids
if reg_id is not None:
sel_cen = np.argwhere(self.centroids.region_id == reg_id).reshape(-1)
if not sel_cen.size:
LOGGER.info('No hazard centroids with region %s.', str(reg_id))
return None
sel_ev = np.argwhere(sel_ev).reshape(-1)
for (var_name, var_val) in self.__dict__.items():
if isinstance(var_val, np.ndarray) and var_val.ndim == 1 and \
var_val.size:
setattr(haz, var_name, var_val[sel_ev])
elif isinstance(var_val, sparse.csr_matrix):
setattr(haz, var_name, var_val[sel_ev, :][:, sel_cen])
elif isinstance(var_val, list) and var_val:
setattr(haz, var_name, [var_val[idx] for idx in sel_ev])
elif var_name == 'centroids':
if reg_id is not None:
setattr(haz, var_name, var_val.select(reg_id))
else:
setattr(haz, var_name, var_val)
else:
setattr(haz, var_name, var_val)
# reset frequency if date span has changed (optional):
if reset_frequency:
year_span_old = np.abs(dt.datetime.fromordinal(self.date.max()).year - \
dt.datetime.fromordinal(self.date.min()).year)+1
year_span_new = np.abs(dt.datetime.fromordinal(haz.date.max()).year - \
dt.datetime.fromordinal(haz.date.min()).year)+1
haz.frequency = haz.frequency*year_span_old/year_span_new
return haz
[docs] def local_exceedance_inten(self, return_periods=(25, 50, 100, 250)):
""" Compute exceedance intensity map for given return periods.
Parameters:
return_periods (np.array): return periods to consider
Returns:
np.array
"""
# warn if return period is above return period of rarest event:
for rp in return_periods:
if rp > 1/self.frequency.min():
LOGGER.warning('Return period %1.1f exceeds max. event return period.' %(rp))
LOGGER.info('Computing exceedance intenstiy map for return periods: %s',
return_periods)
num_cen = self.intensity.shape[1]
inten_stats = np.zeros((len(return_periods), num_cen))
cen_step = int(CONFIG['global']['max_matrix_size']/self.intensity.shape[0])
if not cen_step:
LOGGER.error('Increase max_matrix_size configuration parameter to'\
' > %s', str(self.intensity.shape[0]))
raise ValueError
# separte in chunks
chk = -1
for chk in range(int(num_cen/cen_step)):
self._loc_return_inten(np.array(return_periods), \
self.intensity[:, chk*cen_step:(chk+1)*cen_step].todense(), \
inten_stats[:, chk*cen_step:(chk+1)*cen_step])
self._loc_return_inten(np.array(return_periods), \
self.intensity[:, (chk+1)*cen_step:].todense(), \
inten_stats[:, (chk+1)*cen_step:])
# set values below 0 to zero if minimum of hazard.intensity >= 0:
if self.intensity.min()>=0 and np.min(inten_stats)<0:
LOGGER.warning('Exceedance intenstiy values below 0 are set to 0. \
Reason: no negative intensity values were found in hazard.')
inten_stats[inten_stats<0] = 0
return inten_stats
[docs] def plot_rp_intensity(self, return_periods=(25, 50, 100, 250),
smooth=True, axis=None, **kwargs):
"""Compute and plot hazard exceedance intensity maps for different
return periods. Calls local_exceedance_inten.
Parameters:
return_periods (tuple(int), optional): return periods to consider
smooth (bool, optional): smooth plot to plot.RESOLUTIONxplot.RESOLUTION
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for pcolormesh matplotlib function
used in event plots
Returns:
matplotlib.axes._subplots.AxesSubplot,
np.ndarray (return_periods.size x num_centroids)
"""
self._set_coords_centroids()
inten_stats = self.local_exceedance_inten(np.array(return_periods))
colbar_name = 'Intensity (' + self.units + ')'
title = list()
for ret in return_periods:
title.append('Return period: ' + str(ret) + ' years')
_, axis = u_plot.geo_im_from_array(inten_stats, self.centroids.coord,\
colbar_name, title, smooth=smooth, axes=axis, **kwargs)
return axis, inten_stats
[docs] def plot_intensity(self, event=None, centr=None, smooth=True, axis=None,
**kwargs):
"""Plot intensity values for a selected event or centroid.
Parameters:
event (int or str, optional): If event > 0, plot intensities of
event with id = event. If event = 0, plot maximum intensity in
each centroid. If event < 0, plot abs(event)-largest event. If
event is string, plot events with that name.
centr (int or tuple, optional): If centr > 0, plot intensity
of all events at centroid with id = centr. If centr = 0,
plot maximum intensity of each event. If centr < 0,
plot abs(centr)-largest centroid where higher intensities
are reached. If tuple with (lat, lon) plot intensity of nearest
centroid.
smooth (bool, optional): smooth plot to plot.RESOLUTIONxplot.RESOLUTION
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for pcolormesh matplotlib function
used in event plots or for plot function used in centroids plots
Returns:
matplotlib.axes._subplots.AxesSubplot
Raises:
ValueError
"""
self._set_coords_centroids()
col_label = 'Intensity (%s)' % self.units
if event is not None:
if isinstance(event, str):
event = self.get_event_id(event)
return self._event_plot(event, self.intensity, col_label,
smooth, axis, **kwargs)
if centr is not None:
if isinstance(centr, tuple):
_, _, centr = self.centroids.get_closest_point(centr[0], centr[1])
return self._centr_plot(centr, self.intensity, col_label, axis, **kwargs)
LOGGER.error("Provide one event id or one centroid id.")
raise ValueError
[docs] def plot_fraction(self, event=None, centr=None, smooth=True, axis=None,
**kwargs):
"""Plot fraction values for a selected event or centroid.
Parameters:
event (int or str, optional): If event > 0, plot fraction of event
with id = event. If event = 0, plot maximum fraction in each
centroid. If event < 0, plot abs(event)-largest event. If event
is string, plot events with that name.
centr (int or tuple, optional): If centr > 0, plot fraction
of all events at centroid with id = centr. If centr = 0,
plot maximum fraction of each event. If centr < 0,
plot abs(centr)-largest centroid where highest fractions
are reached. If tuple with (lat, lon) plot fraction of nearest
centroid.
smooth (bool, optional): smooth plot to plot.RESOLUTIONxplot.RESOLUTION
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for pcolormesh matplotlib function
used in event plots or for plot function used in centroids plots
Returns:
matplotlib.axes._subplots.AxesSubplot
Raises:
ValueError
"""
self._set_coords_centroids()
col_label = 'Fraction'
if event is not None:
if isinstance(event, str):
event = self.get_event_id(event)
return self._event_plot(event, self.fraction, col_label, smooth, axis,
**kwargs)
if centr is not None:
if isinstance(centr, tuple):
_, _, centr = self.centroids.get_closest_point(centr[0], centr[1])
return self._centr_plot(centr, self.fraction, col_label, axis, **kwargs)
LOGGER.error("Provide one event id or one centroid id.")
raise ValueError
[docs] def get_event_id(self, event_name):
""""Get an event id from its name. Several events might have the same
name.
Parameters:
event_name (str): Event name
Returns:
np.array(int)
"""
list_id = self.event_id[[i_name for i_name, val_name \
in enumerate(self.event_name) if val_name == event_name]]
if list_id.size == 0:
LOGGER.error("No event with name: %s", event_name)
raise ValueError
return list_id
[docs] def get_event_name(self, event_id):
""""Get the name of an event id.
Parameters:
event_id (int): id of the event
Returns:
str
Raises:
ValueError
"""
try:
return self.event_name[np.argwhere(
self.event_id == event_id)[0][0]]
except IndexError:
LOGGER.error("No event with id: %s", event_id)
raise ValueError
[docs] def get_event_date(self, event=None):
""" Return list of date strings for given event or for all events,
if no event provided.
Parameters:
event (str or int, optional): event name or id.
Returns:
list(str)
"""
if event is None:
l_dates = [u_dt.date_to_str(date) for date in self.date]
elif isinstance(event, str):
ev_ids = self.get_event_id(event)
l_dates = [u_dt.date_to_str(self.date[ \
np.argwhere(self.event_id == ev_id)[0][0]]) \
for ev_id in ev_ids]
else:
ev_idx = np.argwhere(self.event_id == event)[0][0]
l_dates = [u_dt.date_to_str(self.date[ev_idx])]
return l_dates
[docs] def calc_year_set(self):
""" From the dates of the original events, get number yearly events.
Returns:
dict: key are years, values array with event_ids of that year
"""
orig_year = np.array([dt.datetime.fromordinal(date).year
for date in self.date[self.orig]])
orig_yearset = {}
for year in np.unique(orig_year):
orig_yearset[year] = self.event_id[self.orig][orig_year == year]
return orig_yearset
[docs] def append(self, hazard):
"""Append events and centroids in hazard.
Parameters:
hazard (Hazard): Hazard instance to append to current
Raises:
ValueError
"""
hazard._check_events()
if self.event_id.size == 0:
for key in hazard.__dict__:
try:
self.__dict__[key] = copy.deepcopy(hazard.__dict__[key])
except TypeError:
self.__dict__[key] = copy.copy(hazard.__dict__[key])
return
if (self.units == '') and (hazard.units != ''):
LOGGER.info("Initial hazard does not have units.")
self.units = hazard.units
elif hazard.units == '':
LOGGER.info("Appended hazard does not have units.")
elif self.units != hazard.units:
LOGGER.error("Hazards with different units can't be appended: "
"%s != %s.", self.units, hazard.units)
raise ValueError
self.tag.append(hazard.tag)
n_ini_ev = self.event_id.size
# append all 1-dim variables
for (var_name, var_val), haz_val in zip(self.__dict__.items(),
hazard.__dict__.values()):
if isinstance(var_val, np.ndarray) and var_val.ndim == 1 and \
var_val.size:
setattr(self, var_name, np.append(var_val, haz_val). \
astype(var_val.dtype, copy=False))
elif isinstance(var_val, list) and var_val:
setattr(self, var_name, var_val + haz_val)
# append intensity and fraction:
# if same centroids, just append events
if self.centroids.equal(hazard.centroids):
self.intensity = sparse.vstack([self.intensity, hazard.intensity],
format='csr')
self.fraction = sparse.vstack([self.fraction, hazard.fraction],
format='csr')
elif hazard.intensity.size:
n_ini_cen = self.centroids.size
self.centroids.append(hazard.centroids)
self.intensity = sparse.hstack([self.intensity, \
sparse.lil_matrix((self.intensity.shape[0], \
self.centroids.size - n_ini_cen))], format='lil')
self.fraction = sparse.hstack([self.fraction, \
sparse.lil_matrix((self.fraction.shape[0], \
self.centroids.size - n_ini_cen))], format='lil')
self.intensity = sparse.vstack([self.intensity, \
sparse.lil_matrix((hazard.intensity.shape[0],
self.intensity.shape[1]))], format='lil')
self.fraction = sparse.vstack([self.fraction, \
sparse.lil_matrix((hazard.intensity.shape[0],
self.intensity.shape[1]))], format='lil')
self.intensity[n_ini_ev:, -hazard.intensity.shape[1]:] = hazard.intensity
self.fraction[n_ini_ev:, -hazard.intensity.shape[1]:] = hazard.fraction
self.intensity = self.intensity.tocsr()
self.fraction = self.fraction.tocsr()
# Make event id unique
if np.unique(self.event_id).size != self.event_id.size:
LOGGER.debug('Resetting event_id.')
self.event_id = np.arange(self.event_id.size) + 1
[docs] def remove_duplicates(self):
"""Remove duplicate events (events with same name and date)."""
dup_pos = list()
set_ev = set()
for ev_pos, (ev_name, ev_date) in enumerate(zip(self.event_name,
self.date)):
if (ev_name, ev_date) in set_ev:
dup_pos.append(ev_pos)
set_ev.add((ev_name, ev_date))
if len(set_ev) == self.event_id.size:
return
for var_name, var_val in self.__dict__.items():
if isinstance(var_val, np.ndarray) and var_val.ndim == 1:
setattr(self, var_name, np.delete(var_val, dup_pos))
elif isinstance(var_val, list):
setattr(self, var_name, np.delete(var_val, dup_pos).tolist())
mask = np.ones(self.intensity.shape, dtype=bool)
mask[dup_pos, :] = False
self.intensity = sparse.csr_matrix(self.intensity[mask].\
reshape(self.event_id.size, self.intensity.shape[1]))
self.fraction = sparse.csr_matrix(self.fraction[mask].\
reshape(self.event_id.size, self.intensity.shape[1]))
@property
def size(self):
""" Returns number of events """
return self.event_id.size
[docs] def write_raster(self, file_name, intensity=True):
""" Write intensity or fraction as GeoTIFF file. Each band is an event
Parameters:
file_name (str): file name to write in tif format
intensity (bool): if True, write intensity, otherwise write fraction
"""
variable = self.intensity
if not intensity:
variable = self.fraction
if self.centroids.meta:
co.write_raster(file_name, variable.todense(), self.centroids.meta)
else:
pixel_geom = self.centroids.calc_pixels_polygons()
profile = self.centroids.meta
profile.update(driver='GTiff', dtype=rasterio.float32, count=self.size)
with rasterio.open(file_name, 'w', **profile) as dst:
LOGGER.info('Writting %s', file_name)
for i_ev in range(variable.shape[0]):
raster = rasterize([(x, val) for (x, val) in \
zip(pixel_geom, np.array(variable[i_ev, :].todense()).reshape(-1))], \
out_shape=(profile['height'], profile['width']),\
transform=profile['transform'], fill=0, \
all_touched=True, dtype=profile['dtype'],)
dst.write(raster.astype(profile['dtype']), i_ev+1)
[docs] def write_hdf5(self, file_name,todense=False):
""" Write hazard in hdf5 format.
Parameters:
file_name (str): file name to write, with h5 format
"""
LOGGER.info('Writting %s', file_name)
hf_data = h5py.File(file_name, 'w')
str_dt = h5py.special_dtype(vlen=str)
for (var_name, var_val) in self.__dict__.items():
if var_name == 'centroids':
self.centroids.write_hdf5(hf_data.create_group(var_name))
elif var_name == 'tag':
hf_str = hf_data.create_dataset('haz_type', (1,), dtype=str_dt)
hf_str[0] = var_val.haz_type
hf_str = hf_data.create_dataset('file_name', (1,), dtype=str_dt)
hf_str[0] = str(var_val.file_name)
hf_str = hf_data.create_dataset('description', (1,), dtype=str_dt)
hf_str[0] = str(var_val.description)
elif isinstance(var_val, sparse.csr_matrix):
if todense:
hf_data.create_dataset(var_name, data=var_val.todense())
else:
hf_csr = hf_data.create_group(var_name)
hf_csr.create_dataset('data', data=var_val.data)
hf_csr.create_dataset('indices', data=var_val.indices)
hf_csr.create_dataset('indptr', data=var_val.indptr)
hf_csr.attrs['shape'] = var_val.shape
elif isinstance(var_val, str):
hf_str = hf_data.create_dataset(var_name, (1,), dtype=str_dt)
hf_str[0] = var_val
elif isinstance(var_val, list) and isinstance(var_val[0], str):
hf_str = hf_data.create_dataset(var_name, (len(var_val),), dtype=str_dt)
for i_ev, var_ev in enumerate(var_val):
hf_str[i_ev] = var_ev
elif var_val is not None and var_name != 'pool':
hf_data.create_dataset(var_name, data=var_val)
hf_data.close()
[docs] def read_hdf5(self, file_name):
""" Read hazard in hdf5 format.
Parameters:
file_name (str): file name to read, with h5 format
"""
LOGGER.info('Reading %s', file_name)
self.clear()
hf_data = h5py.File(file_name, 'r')
for (var_name, var_val) in self.__dict__.items():
if var_name == 'centroids':
self.centroids.read_hdf5(hf_data.get(var_name))
elif var_name == 'tag':
self.tag.haz_type = hf_data.get('haz_type')[0]
self.tag.file_name = hf_data.get('file_name')[0]
self.tag.description = hf_data.get('description')[0]
elif isinstance(var_val, np.ndarray) and var_val.ndim == 1:
setattr(self, var_name, np.array(hf_data.get(var_name)))
elif isinstance(var_val, sparse.csr_matrix):
hf_csr = hf_data.get(var_name)
if isinstance(hf_csr,h5py.Dataset):
setattr(self, var_name, sparse.csr_matrix(hf_csr))
else:
setattr(self, var_name, sparse.csr_matrix((hf_csr['data'][:], \
hf_csr['indices'][:], hf_csr['indptr'][:]), hf_csr.attrs['shape']))
elif isinstance(var_val, str):
setattr(self, var_name, hf_data.get(var_name)[0])
elif isinstance(var_val, list):
setattr(self, var_name, np.array(hf_data.get(var_name)).tolist())
else:
setattr(self, var_name, hf_data.get(var_name))
hf_data.close()
def _append_all(self, list_haz_ev):
"""Append event by event with same centroids. Takes centroids and units
of first event.
Parameters:
list_haz_ev (list): Hazard instances with one event and same
centroids
"""
self.clear()
num_ev = len(list_haz_ev)
num_cen = list_haz_ev[0].centroids.size
# check for new variables
for key_new in list_haz_ev[0].__dict__.keys():
if key_new not in self.__dict__:
self.__dict__[key_new] = list_haz_ev[0].__dict__[key_new]
for var_name, var_val in self.__dict__.items():
if isinstance(var_val, np.ndarray) and var_val.ndim == 1:
setattr(self, var_name, np.zeros((num_ev,), dtype=var_val.dtype))
elif isinstance(var_val, sparse.csr.csr_matrix):
setattr(self, var_name, sparse.lil_matrix((num_ev, num_cen)))
for i_ev, haz_ev in enumerate(list_haz_ev):
for (var_name, var_val), ev_val in zip(self.__dict__.items(),
haz_ev.__dict__.values()):
if isinstance(var_val, np.ndarray) and var_val.ndim == 1:
var_val[i_ev] = ev_val[0]
elif isinstance(var_val, list):
var_val.extend(ev_val)
elif isinstance(var_val, sparse.lil_matrix):
var_val[i_ev, :] = ev_val[0, :]
elif isinstance(var_val, TagHazard):
var_val.append(ev_val)
self.centroids = copy.deepcopy(list_haz_ev[0].centroids)
self.units = list_haz_ev[0].units
self.intensity = self.intensity.tocsr()
self.fraction = self.fraction.tocsr()
self.event_id = np.arange(1, num_ev+1)
def _set_coords_centroids(self):
""" If centroids are raster, set lat and lon coordinates """
if self.centroids.meta and not self.centroids.coord.size:
self.centroids.set_meta_to_lat_lon()
def _events_set(self):
"""Generate set of tuples with (event_name, event_date) """
ev_set = set()
for ev_name, ev_date in zip(self.event_name, self.date):
ev_set.add((ev_name, ev_date))
return ev_set
def _event_plot(self, event_id, mat_var, col_name, smooth, axis=None, **kwargs):
""""Plot an event of the input matrix.
Parameters:
event_id (int or np.array(int)): If event_id > 0, plot mat_var of
event with id = event_id. If event_id = 0, plot maximum
mat_var in each centroid. If event_id < 0, plot
abs(event_id)-largest event.
mat_var (sparse matrix): Sparse matrix where each row is an event
col_name (sparse matrix): Colorbar label
smooth (bool, optional): smooth plot to plot.RESOLUTIONxplot.RESOLUTION
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for pcolormesh matplotlib function
Returns:
matplotlib.figure.Figure, matplotlib.axes._subplots.AxesSubplot
"""
if not isinstance(event_id, np.ndarray):
event_id = np.array([event_id])
array_val = list()
l_title = list()
for ev_id in event_id:
if ev_id > 0:
try:
event_pos = np.where(self.event_id == ev_id)[0][0]
except IndexError:
LOGGER.error('Wrong event id: %s.', ev_id)
raise ValueError from IndexError
im_val = mat_var[event_pos, :].todense().transpose()
title = 'Event ID %s: %s' % (str(self.event_id[event_pos]), \
self.event_name[event_pos])
elif ev_id < 0:
max_inten = np.asarray(np.sum(mat_var, axis=1)).reshape(-1)
event_pos = np.argpartition(max_inten, ev_id)[ev_id:]
event_pos = event_pos[np.argsort(max_inten[event_pos])][0]
im_val = mat_var[event_pos, :].todense().transpose()
title = '%s-largest Event. ID %s: %s' % (np.abs(ev_id), \
str(self.event_id[event_pos]), self.event_name[event_pos])
else:
im_val = np.max(mat_var, axis=0).todense().transpose()
title = '%s max intensity at each point' % self.tag.haz_type
array_val.append(im_val)
l_title.append(title)
return u_plot.geo_im_from_array(array_val, self.centroids.coord, col_name,
l_title, smooth=smooth, axes=axis, **kwargs)
def _centr_plot(self, centr_idx, mat_var, col_name, axis=None, **kwargs):
""""Plot a centroid of the input matrix.
Parameters:
centr_id (int): If centr_id > 0, plot mat_var
of all events at centroid with id = centr_id. If centr_id = 0,
plot maximum mat_var of each event. If centr_id < 0,
plot abs(centr_id)-largest centroid where highest mat_var
are reached.
mat_var (sparse matrix): Sparse matrix where each column represents
a centroid
col_name (sparse matrix): Colorbar label
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for plot matplotlib function
Returns:
matplotlib.figure.Figure, matplotlib.axes._subplots.AxesSubplot
"""
coord = self.centroids.coord
if centr_idx > 0:
try:
centr_pos = centr_idx
except IndexError:
LOGGER.error('Wrong centroid id: %s.', centr_idx)
raise ValueError from IndexError
array_val = mat_var[:, centr_pos].todense()
title = 'Centroid %s: (%s, %s)' % (str(centr_idx), \
coord[centr_pos, 0], coord[centr_pos, 1])
elif centr_idx < 0:
max_inten = np.asarray(np.sum(mat_var, axis=0)).reshape(-1)
centr_pos = np.argpartition(max_inten, centr_idx)[centr_idx:]
centr_pos = centr_pos[np.argsort(max_inten[centr_pos])][0]
array_val = mat_var[:, centr_pos].todense()
title = '%s-largest Centroid. %s: (%s, %s)' % \
(np.abs(centr_idx), str(centr_pos), coord[centr_pos, 0], \
coord[centr_pos, 1])
else:
array_val = np.max(mat_var, axis=1).todense()
title = '%s max intensity at each event' % self.tag.haz_type
if not axis:
_, axis = plt.subplots(1)
if 'color' not in kwargs:
kwargs['color'] = 'b'
axis.set_title(title)
axis.set_xlabel('Event number')
axis.set_ylabel(str(col_name))
axis.plot(range(len(array_val)), array_val, **kwargs)
axis.set_xlim([0, len(array_val)])
return axis
def _loc_return_inten(self, return_periods, inten, exc_inten):
""" Compute local exceedence intensity for given return period.
Parameters:
return_periods (np.array): return periods to consider
cen_pos (int): centroid position
Returns:
np.array
"""
# sorted intensity
sort_pos = np.argsort(inten, axis=0)[::-1, :]
columns = np.ones(inten.shape, int)
columns *= np.arange(columns.shape[1])
inten_sort = inten[sort_pos, columns]
# cummulative frequency at sorted intensity
freq_sort = self.frequency[sort_pos]
np.cumsum(freq_sort, axis=0, out=freq_sort)
for cen_idx in range(inten.shape[1]):
exc_inten[:, cen_idx] = self._cen_return_inten(
inten_sort[:, cen_idx], freq_sort[:, cen_idx],
self.intensity_thres, return_periods)
def _check_events(self):
""" Check that all attributes but centroids contain consistent data.
Put default date, event_name and orig if not provided. Check not
repeated events (i.e. with same date and name)
Raises:
ValueError
"""
num_ev = len(self.event_id)
num_cen = self.centroids.size
if np.unique(self.event_id).size != num_ev:
LOGGER.error("There are events with the same identifier.")
raise ValueError
check.check_oligatories(self.__dict__, self.vars_oblig, 'Hazard.',
num_ev, num_ev, num_cen)
check.check_optionals(self.__dict__, self.vars_opt, 'Hazard.', num_ev)
self.event_name = check.array_default(num_ev, self.event_name, \
'Hazard.event_name', list(self.event_id))
self.date = check.array_default(num_ev, self.date, 'Hazard.date', \
np.ones(self.event_id.shape, dtype=int))
self.orig = check.array_default(num_ev, self.orig, 'Hazard.orig', \
np.zeros(self.event_id.shape, dtype=bool))
if len(self._events_set()) != num_ev:
LOGGER.error("There are events with same date and name.")
raise ValueError
@staticmethod
def _cen_return_inten(inten, freq, inten_th, return_periods):
"""From ordered intensity and cummulative frequency at centroid, get
exceedance intensity at input return periods.
Parameters:
inten (np.array): sorted intensity at centroid
freq (np.array): cummulative frequency at centroid
inten_th (float): intensity threshold
return_periods (np.array): return periods
Returns:
np.array
"""
inten_th = np.asarray(inten > inten_th).squeeze()
inten_cen = inten[inten_th]
freq_cen = freq[inten_th]
if not inten_cen.size:
return np.zeros((return_periods.size,))
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
pol_coef = np.polyfit(np.log(freq_cen), inten_cen, deg=1)
except ValueError:
pol_coef = np.polyfit(np.log(freq_cen), inten_cen, deg=0)
inten_fit = np.polyval(pol_coef, np.log(1/return_periods))
wrong_inten = np.logical_and(return_periods > np.max(1/freq_cen), \
np.isnan(inten_fit))
inten_fit[wrong_inten] = 0.
return inten_fit
def _read_att_mat(self, data, file_name, var_names):
""" Read MATLAB hazard's attributes. """
self.frequency = np.squeeze(data[var_names['var_name']['freq']])
self.orig = np.squeeze(data[var_names['var_name']['orig']]).astype(bool)
self.event_id = np.squeeze(data[var_names['var_name']['even_id']]. \
astype(np.int, copy=False))
try:
self.units = hdf5.get_string(data[var_names['var_name']['unit']])
except KeyError:
pass
n_cen = self.centroids.size
n_event = len(self.event_id)
try:
self.intensity = hdf5.get_sparse_csr_mat( \
data[var_names['var_name']['inten']], (n_event, n_cen))
except ValueError as err:
LOGGER.error('Size missmatch in intensity matrix.')
raise err
try:
self.fraction = hdf5.get_sparse_csr_mat( \
data[var_names['var_name']['frac']], (n_event, n_cen))
except ValueError as err:
LOGGER.error('Size missmatch in fraction matrix.')
raise err
except KeyError:
self.fraction = sparse.csr_matrix(np.ones(self.intensity.shape,
dtype=np.float))
# Event names: set as event_id if no provided
try:
self.event_name = hdf5.get_list_str_from_ref(
file_name, data[var_names['var_name']['ev_name']])
except KeyError:
self.event_name = list(self.event_id)
try:
comment = hdf5.get_string(data[var_names['var_name']['comment']])
self.tag.description += ' ' + comment
except KeyError:
pass
try:
datenum = data[var_names['var_name']['datenum']].squeeze()
self.date = np.array([(dt.datetime.fromordinal(int(date)) + \
dt.timedelta(days=date%1)- \
dt.timedelta(days=366)).toordinal() for date in datenum])
except KeyError:
pass
def _read_att_excel(self, file_name, var_names):
""" Read Excel hazard's attributes. """
dfr = pd.read_excel(file_name, var_names['sheet_name']['freq'])
num_events = dfr.shape[0]
self.frequency = dfr[var_names['col_name']['freq']].values
self.orig = dfr[var_names['col_name']['orig']].values.astype(bool)
self.event_id = dfr[var_names['col_name']['even_id']].values. \
astype(int, copy=False)
self.date = dfr[var_names['col_name']['even_dt']].values. \
astype(int, copy=False)
self.event_name = dfr[var_names['col_name']['even_name']].values.tolist()
dfr = pd.read_excel(file_name, var_names['sheet_name']['inten'])
# number of events (ignore centroid_ID column)
# check the number of events is the same as the one in the frequency
if dfr.shape[1] - 1 is not num_events:
LOGGER.error('Hazard intensity is given for a number of events ' \
'different from the number of defined in its frequency: ' \
'%s != %s', dfr.shape[1] - 1, num_events)
raise ValueError
# check number of centroids is the same as retrieved before
if dfr.shape[0] is not self.centroids.size:
LOGGER.error('Hazard intensity is given for a number of centroids ' \
'different from the number of centroids defined: %s != %s', \
dfr.shape[0], self.centroids.size)
raise ValueError
self.intensity = sparse.csr_matrix(dfr.values[:, 1:num_events+1].transpose())
self.fraction = sparse.csr_matrix(np.ones(self.intensity.shape,
dtype=np.float))