"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Exposures class.
"""
__all__ = ['Exposures', 'add_sea', 'INDICATOR_IMPF', 'INDICATOR_CENTR']
import logging
import copy
from pathlib import Path
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from geopandas import GeoDataFrame
import rasterio
from rasterio.warp import Resampling
import contextily as ctx
import cartopy.crs as ccrs
from climada.hazard import Hazard
import climada.util.hdf5_handler as u_hdf5
from climada.util.constants import ONE_LAT_KM, DEF_CRS, CMAP_RASTER
import climada.util.coordinates as u_coord
import climada.util.plot as u_plot
from climada import CONFIG
LOGGER = logging.getLogger(__name__)
INDICATOR_IMPF_OLD = 'if_'
"""Previously used name of the column containing the impact functions id of specified hazard"""
INDICATOR_IMPF = 'impf_'
"""Name of the column containing the impact functions id of specified hazard"""
INDICATOR_CENTR = 'centr_'
"""Name of the column containing the centroids id of specified hazard"""
DEF_REF_YEAR = CONFIG.exposures.def_ref_year.int()
"""Default reference year"""
DEF_VALUE_UNIT = 'USD'
"""Default value unit"""
DEF_VAR_MAT = {'sup_field_name': 'entity',
'field_name': 'assets',
'var_name': {'lat': 'lat',
'lon': 'lon',
'val': 'Value',
'ded': 'Deductible',
'cov': 'Cover',
'impf': 'DamageFunID',
'cat': 'Category_ID',
'reg': 'Region_ID',
'uni': 'Value_unit',
'ass': 'centroid_index',
'ref': 'reference_year'
}
}
"""MATLAB variable names"""
[docs]class Exposures():
"""geopandas GeoDataFrame with metada and columns (pd.Series) defined in
Attributes.
Attributes
----------
description : str
metadata - description of content and origin of the data
ref_year : int
metada - reference year
value_unit : str
metada - unit of the exposures values
latitude : pd.Series
latitude
longitude : pd.Series
longitude
value : pd.Series
a value for each exposure
impf_SUFFIX : pd.Series, optional
e.g. impf_TC. impact functions id for hazard TC.
There might be different hazards defined: impf_TC, impf_FL, ...
If not provided, set to default ``impf_`` with ids 1 in check().
geometry : pd.Series, optional
geometry of type Point of each instance.
Computed in method set_geometry_points().
meta : dict
dictionary containing corresponding raster properties (if any):
width, height, crs and transform must be present at least (transform needs
to contain upper left corner!). Exposures might not contain all the points
of the corresponding raster. Not used in internal computations.
deductible : pd.Series, optional
deductible value for each exposure
cover : pd.Series, optional
cover value for each exposure
category_id : pd.Series, optional
category id for each exposure
region_id : pd.Series, optional
region id for each exposure
centr_SUFFIX : pd.Series, optional
e.g. centr_TC. centroids index for hazard
TC. There might be different hazards defined: centr_TC, centr_FL, ...
Computed in method assign_centroids().
"""
_metadata = ['description', 'ref_year', 'value_unit', 'meta']
vars_oblig = ['value', 'latitude', 'longitude']
"""Name of the variables needed to compute the impact."""
vars_def = [INDICATOR_IMPF, INDICATOR_IMPF_OLD]
"""Name of variables that can be computed."""
vars_opt = [INDICATOR_CENTR, 'deductible', 'cover', 'category_id',
'region_id', 'geometry']
"""Name of the variables that aren't need to compute the impact."""
@property
def crs(self):
"""Coordinate Reference System, refers to the crs attribute of the inherent GeoDataFrame"""
try:
return self.gdf.geometry.crs or self.meta.get('crs')
except AttributeError: # i.e., no geometry, crs is assumed to be a property
# In case of gdf without geometry, empty or before set_geometry_points was called
return self.meta.get('crs')
[docs] def __init__(self, *args, meta=None, description=None, ref_year=DEF_REF_YEAR,
value_unit=DEF_VALUE_UNIT, crs=None, **kwargs):
"""Creates an Exposures object from a GeoDataFrame
Parameters
----------
args :
Arguments of the GeoDataFrame constructor
kwargs :
Named arguments of the GeoDataFrame constructor, additionally
meta : dict, optional
Metadata dictionary. Default: {} (empty dictionary)
description : str, optional
Default: None
ref_year : int, optional
Reference Year. Defaults to the entry of the same name in `meta` or 2018.
value_unit : str, optional
Unit of the exposed value. Defaults to the entry of the same name in `meta` or 'USD'.
crs : object, anything accepted by pyproj.CRS.from_user_input
Coordinate reference system. Defaults to the entry of the same name in `meta`, or to
the CRS of the GeoDataFrame (if provided) or to 'epsg:4326'.
"""
# meta data
self.meta = {} if meta is None else meta
if not isinstance(self.meta, dict):
raise ValueError("meta must be a dictionary")
self.description = self.meta.get('description') if description is None else description
self.ref_year = self.meta.get('ref_year', DEF_REF_YEAR) if ref_year is None else ref_year
self.value_unit = (self.meta.get('value_unit', DEF_VALUE_UNIT)
if value_unit is None else value_unit)
# remaining generic attributes from derived classes
for mda in type(self)._metadata:
if mda not in Exposures._metadata:
if mda in kwargs:
setattr(self, mda, kwargs.pop(mda))
elif mda in self.meta:
setattr(self, mda, self.meta[mda])
else:
setattr(self, mda, None)
# crs (property) and geometry
data = args[0] if args else kwargs.get('data', {})
try:
data_crs = data.geometry.crs
except AttributeError:
data_crs = None
if data_crs and data.crs and not u_coord.equal_crs(data_crs, data.crs):
raise ValueError("Inconsistent crs definition in data and data.geometry")
crs = (crs if crs is not None
else self.meta['crs'] if 'crs' in self.meta
else data_crs if data_crs
else None)
if 'crs' in self.meta and not u_coord.equal_crs(self.meta['crs'], crs):
raise ValueError("Inconsistent CRS definition, crs and meta arguments don't match")
if data_crs and not u_coord.equal_crs(data_crs, crs):
raise ValueError("Inconsistent CRS definition, data doesn't match meta or crs argument")
if not crs:
crs = DEF_CRS
geometry = kwargs.get('geometry')
if geometry and isinstance(geometry, str):
raise ValueError("Exposures is not able to handle customized 'geometry' column names.")
# make the data frame
self.set_gdf(GeoDataFrame(*args, **kwargs), crs=crs)
def __str__(self):
return '\n'.join(
[f"{md}: {self.__dict__[md]}" for md in type(self)._metadata] +
[f"crs: {self.crs}", "data:", str(self.gdf)]
)
def _access_item(self, *args):
raise TypeError("Since CLIMADA 2.0, Exposures objects are not subscriptable. Data "
"fields of Exposures objects are accessed using the `gdf` attribute. "
"For example, `expo['value']` is replaced by `expo.gdf['value']`.")
__getitem__ = _access_item
__setitem__ = _access_item
__delitem__ = _access_item
[docs] def check(self):
"""Check Exposures consistency.
Reports missing columns in log messages.
If no ``impf_*`` column is present in the dataframe, a default column ``impf_`` is added
with default impact function id 1.
"""
# mandatory columns
for var in self.vars_oblig:
if var not in self.gdf.columns:
raise ValueError(f"{var} missing in gdf")
# computable columns except impf_*
for var in sorted(set(self.vars_def).difference([INDICATOR_IMPF, INDICATOR_IMPF_OLD])):
if not var in self.gdf.columns:
LOGGER.info("%s not set.", var)
# special treatment for impf_*
default_impf_present = False
for var in [INDICATOR_IMPF, INDICATOR_IMPF_OLD]:
if var in self.gdf.columns:
LOGGER.info("Hazard type not set in %s", var)
default_impf_present = True
if not default_impf_present and not [
col for col in self.gdf.columns
if col.startswith(INDICATOR_IMPF) or col.startswith(INDICATOR_IMPF_OLD)
]:
LOGGER.info("Setting %s to default impact functions ids 1.", INDICATOR_IMPF)
self.gdf[INDICATOR_IMPF] = 1
# optional columns except centr_*
for var in sorted(set(self.vars_opt).difference([INDICATOR_CENTR])):
if not var in self.gdf.columns:
LOGGER.info("%s not set.", var)
# special treatment for centr_*
if INDICATOR_CENTR in self.gdf.columns:
LOGGER.info("Hazard type not set in %s", INDICATOR_CENTR)
elif not any([col.startswith(INDICATOR_CENTR) for col in self.gdf.columns]):
LOGGER.info("%s not set.", INDICATOR_CENTR)
# check if CRS is consistent
if self.crs != self.meta.get('crs'):
raise ValueError(f"Inconsistent CRS definition, gdf ({self.crs}) attribute doesn't "
f"match meta ({self.meta.get('crs')}) attribute.")
# check whether geometry corresponds to lat/lon
try:
if (self.gdf.geometry.values[0].x != self.gdf.longitude.values[0] or
self.gdf.geometry.values[0].y != self.gdf.latitude.values[0]):
raise ValueError("Geometry values do not correspond to latitude and" +
" longitude. Use set_geometry_points() or set_lat_lon().")
except AttributeError: # no geometry column
pass
[docs] def set_crs(self, crs=None):
"""Set the Coordinate Reference System.
If the epxosures GeoDataFrame has a 'geometry' column it will be updated too.
Parameters
----------
crs : object, optional
anything anything accepted by pyproj.CRS.from_user_input
if the original value is None it will be set to the default CRS.
"""
# clear the meta dictionary entry
if 'crs' in self.meta:
old_crs = self.meta.pop('crs')
crs = crs if crs else self.crs if self.crs else DEF_CRS
# adjust the dataframe
if 'geometry' in self.gdf.columns:
try:
self.gdf.set_crs(crs, inplace=True)
except ValueError:
# restore popped crs and leave
self.meta['crs'] = old_crs
raise
# store the value
self.meta['crs'] = crs
[docs] def set_gdf(self, gdf:GeoDataFrame, crs=None):
"""Set the `gdf` GeoDataFrame and update the CRS
Parameters
----------
gdf : GeoDataFrame
crs : object, optional,
anything anything accepted by pyproj.CRS.from_user_input,
by default None, then `gdf.crs` applies or - if not set - the exposure's current crs
"""
# check argument type
if not isinstance(gdf, GeoDataFrame):
raise ValueError("gdf is not a GeoDataFrame")
# set the dataframe
self.gdf = gdf
# update the coordinate reference system
self.set_crs(crs)
[docs] def get_impf_column(self, haz_type=''):
"""Find the best matching column name in the exposures dataframe for a given hazard type,
Parameters
----------
haz_type : str or None
hazard type, as in the hazard's.haz_type
which is the HAZ_TYPE constant of the hazard's module
Returns
-------
str
a column name, the first of the following that is present in the exposures' dataframe:
- ``impf_[haz_type]``
- ``if_[haz_type]``
- ``impf_``
- ``if_``
Raises
------
ValueError
if none of the above is found in the dataframe.
"""
if INDICATOR_IMPF + haz_type in self.gdf.columns:
return INDICATOR_IMPF + haz_type
if INDICATOR_IMPF_OLD + haz_type in self.gdf.columns:
LOGGER.info("Impact function column name 'if_%s' is not according to current"
" naming conventions. It's suggested to use 'impf_%s' instead.",
haz_type, haz_type)
return INDICATOR_IMPF_OLD + haz_type
if INDICATOR_IMPF in self.gdf.columns:
LOGGER.info("No specific impact function column found for hazard %s."
" Using the anonymous 'impf_' column.", haz_type)
return INDICATOR_IMPF
if INDICATOR_IMPF_OLD in self.gdf.columns:
LOGGER.info("No specific impact function column found for hazard %s. Using the"
" anonymous 'if_' column, which is not according to current naming"
" conventions. It's suggested to use 'impf_' instead.", haz_type)
return INDICATOR_IMPF_OLD
raise ValueError(f"Missing exposures impact functions {INDICATOR_IMPF}.")
[docs] def assign_centroids(self, hazard, distance='euclidean',
threshold=u_coord.NEAREST_NEIGHBOR_THRESHOLD,
overwrite=True):
"""Assign for each exposure coordinate closest hazard coordinate.
The Exposures ``gdf`` will be altered by this method. It will have an additional
(or modified) column named ``centr_[hazard.HAZ_TYPE]`` after the call.
Uses the utility function ``u_coord.match_centroids``. See there for details
and parameters.
The value -1 is used for distances larger than ``threshold`` in point distances.
In case of raster hazards the value -1 is used for centroids outside of the raster.
Parameters
----------
hazard : Hazard
Hazard to match (with raster or vector centroids).
distance : str, optional
Distance to use in case of vector centroids.
Possible values are "euclidean", "haversine" and "approx".
Default: "euclidean"
threshold : float
If the distance (in km) to the nearest neighbor exceeds `threshold`,
the index `-1` is assigned.
Set `threshold` to 0, to disable nearest neighbor matching.
Default: 100 (km)
overwrite: bool
If True, overwrite centroids already present. If False, do
not assign new centroids. Default is True.
See Also
--------
climada.util.coordinates.match_grid_points: method to associate centroids to
exposure points when centroids is a raster
climada.util.coordinates.match_coordinates:
method to associate centroids to exposure points
Notes
-----
The default order of use is:
1. if centroid raster is defined, assign exposures points to
the closest raster point.
2. if no raster, assign centroids to the nearest neighbor using
euclidian metric
Both cases can introduce innacuracies for coordinates in lat/lon
coordinates as distances in degrees differ from distances in meters
on the Earth surface, in particular for higher latitude and distances
larger than 100km. If more accuracy is needed, please use 'haversine'
distance metric. This however is slower for (quasi-)gridded data,
and works only for non-gridded data.
"""
haz_type = hazard.haz_type
centr_haz = INDICATOR_CENTR + haz_type
if centr_haz in self.gdf:
LOGGER.info('Exposures matching centroids already found for %s', haz_type)
if overwrite:
LOGGER.info('Existing centroids will be overwritten for %s', haz_type)
else:
return
LOGGER.info('Matching %s exposures with %s centroids.',
str(self.gdf.shape[0]), str(hazard.centroids.size))
if not u_coord.equal_crs(self.crs, hazard.centroids.crs):
raise ValueError('Set hazard and exposure to same CRS first!')
# Note: equal_crs is tested here, rather than within match_centroids(),
# because exp.gdf.crs may not be defined, but exp.crs must be defined.
assigned_centr = u_coord.match_centroids(self.gdf, hazard.centroids,
distance=distance, threshold=threshold)
self.gdf[centr_haz] = assigned_centr
[docs] def set_geometry_points(self, scheduler=None):
"""Set geometry attribute of GeoDataFrame with Points from latitude and
longitude attributes.
Parameters
----------
scheduler : str, optional
used for dask map_partitions.
“threads”, “synchronous” or “processes”
"""
u_coord.set_df_geometry_points(self.gdf, scheduler=scheduler, crs=self.crs)
[docs] def set_lat_lon(self):
"""Set latitude and longitude attributes from geometry attribute."""
LOGGER.info('Setting latitude and longitude attributes.')
self.gdf['latitude'] = self.gdf.geometry[:].y
self.gdf['longitude'] = self.gdf.geometry[:].x
[docs] def set_from_raster(self, *args, **kwargs):
"""This function is deprecated, use Exposures.from_raster instead."""
LOGGER.warning("The use of Exposures.set_from_raster is deprecated."
"Use Exposures.from_raster instead.")
self.__dict__ = Exposures.from_raster(*args, **kwargs).__dict__
[docs] @classmethod
def from_raster(cls, file_name, band=1, src_crs=None, window=None,
geometry=None, dst_crs=None, transform=None,
width=None, height=None, resampling=Resampling.nearest):
"""Read raster data and set latitude, longitude, value and meta
Parameters
----------
file_name : str
file name containing values
band : int, optional
bands to read (starting at 1)
src_crs : crs, optional
source CRS. Provide it if error without it.
window : rasterio.windows.Windows, optional
window where data is
extracted
geometry : list of shapely.geometry, optional
consider pixels only within these shape
dst_crs : crs, optional
reproject to given crs
transform : rasterio.Affine
affine transformation to apply
wdith : float
number of lons for transform
height : float
number of lats for transform
resampling : rasterio.warp,.Resampling optional
resampling
function used for reprojection to dst_crs
returns
--------
Exposures
"""
exp = cls()
meta, value = u_coord.read_raster(file_name, [band], src_crs, window,
geometry, dst_crs, transform, width,
height, resampling)
ulx, xres, _, uly, _, yres = meta['transform'].to_gdal()
lrx = ulx + meta['width'] * xres
lry = uly + meta['height'] * yres
x_grid, y_grid = np.meshgrid(np.arange(ulx + xres / 2, lrx, xres),
np.arange(uly + yres / 2, lry, yres))
if exp.crs is None:
exp.set_crs()
exp.gdf['longitude'] = x_grid.flatten()
exp.gdf['latitude'] = y_grid.flatten()
exp.gdf['value'] = value.reshape(-1)
exp.meta = meta
return exp
[docs] def plot_scatter(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', axis=None, figsize=(9, 13),
adapt_fontsize=True, title=None, **kwargs):
"""Plot exposures geometry's value sum scattered over Earth's map.
The plot will we projected according to the current crs.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places, by default True.
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
figsize : tuple, optional
figure size for plt.subplots
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used. Default is True.
title : str, optional
a title for the plot. If not set `self.description` is used.
kwargs : optional
arguments for scatter matplotlib function, e.g.
cmap='Greys'
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
crs_epsg, _ = u_plot.get_transformation(self.crs)
if title is None:
title = self.description or ""
if mask is None:
mask = np.ones((self.gdf.shape[0],), dtype=bool)
if ignore_zero:
pos_vals = self.gdf.value[mask].values > 0
else:
pos_vals = np.ones((self.gdf.value[mask].values.size,), dtype=bool)
value = self.gdf.value[mask][pos_vals].values
coord = np.stack([self.gdf.latitude[mask][pos_vals].values,
self.gdf.longitude[mask][pos_vals].values], axis=1)
return u_plot.geo_scatter_from_array(array_sub=value,
geo_coord=coord,
var_name=f'Value ({self.value_unit})',
title=title,
pop_name=pop_name,
buffer=buffer,
extend=extend,
proj=crs_epsg,
axes=axis,
figsize=figsize,
adapt_fontsize=adapt_fontsize,
**kwargs)
[docs] def plot_hexbin(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', axis=None, figsize=(9, 13),
adapt_fontsize=True, title=None, **kwargs):
"""Plot exposures geometry's value sum binned over Earth's map.
An other function for the bins can be set through the key reduce_C_function.
The plot will we projected according to the current crs.
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places, by default True.
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
Default is 'neither'.
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
figsize : tuple
figure size for plt.subplots
Default is (9, 13).
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used.
Default is True.
title : str, optional
a title for the plot. If not set `self.description` is used.
kwargs : optional
arguments for hexbin matplotlib function, e.g.
`reduce_C_function=np.average`.
Default is `reduce_C_function=np.sum`
Returns
-------
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
crs_epsg, _ = u_plot.get_transformation(self.crs)
if title is None:
title = self.description or ""
if 'reduce_C_function' not in kwargs:
kwargs['reduce_C_function'] = np.sum
if mask is None:
mask = np.ones((self.gdf.shape[0],), dtype=bool)
if ignore_zero:
pos_vals = self.gdf.value[mask].values > 0
else:
pos_vals = np.ones((self.gdf.value[mask].values.size,), dtype=bool)
value = self.gdf.value[mask][pos_vals].values
coord = np.stack([self.gdf.latitude[mask][pos_vals].values,
self.gdf.longitude[mask][pos_vals].values], axis=1)
return u_plot.geo_bin_from_array(array_sub=value,
geo_coord=coord,
var_name=f'Value ({self.value_unit})',
title=title,
pop_name=pop_name,
buffer=buffer,
extend=extend,
proj=crs_epsg,
axes=axis,
figsize=figsize,
adapt_fontsize=adapt_fontsize,
**kwargs)
[docs] def plot_raster(self, res=None, raster_res=None, save_tiff=None,
raster_f=lambda x: np.log10((np.fmax(x + 1, 1))),
label='value (log10)', scheduler=None, axis=None,
figsize=(9, 13), fill=True, adapt_fontsize=True, **kwargs):
"""Generate raster from points geometry and plot it using log10 scale
`np.log10((np.fmax(raster+1, 1)))`.
Parameters
----------
res : float, optional
resolution of current data in units of latitude
and longitude, approximated if not provided.
raster_res : float, optional
desired resolution of the raster
save_tiff : str, optional
file name to save the raster in tiff
format, if provided
raster_f : lambda function
transformation to use to data. Default:
log10 adding 1.
label : str
colorbar label
scheduler : str
used for dask map_partitions. “threads”,
“synchronous” or “processes”
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
figsize : tuple, optional
figure size for plt.subplots
fill : bool, optional
If false, the areas with no data will be plotted
in white. If True, the areas with missing values are filled as 0s.
The default is True.
adapt_fontsize : bool, optional
If set to true, the size of the fonts will be adapted to the size of the figure.
Otherwise the default matplotlib font size is used. Default is True.
kwargs : optional
arguments for imshow matplotlib function
Returns
-------
matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if self.meta and self.meta.get('height', 0) * self.meta.get('height', 0) == len(self.gdf):
raster = self.gdf.value.values.reshape((self.meta['height'],
self.meta['width']))
# check raster starts by upper left corner
if self.gdf.latitude.values[0] < self.gdf.latitude.values[-1]:
raster = np.flip(raster, axis=0)
if self.gdf.longitude.values[0] > self.gdf.longitude.values[-1]:
raise ValueError('Points are not ordered according to meta raster.')
else:
raster, meta = u_coord.points_to_raster(self.gdf, ['value'], res, raster_res, scheduler)
raster = raster.reshape((meta['height'], meta['width']))
# save tiff
if save_tiff is not None:
with rasterio.open(save_tiff, 'w', driver='GTiff',
height=meta['height'], width=meta['width'], count=1,
dtype=np.float32, crs=self.crs, transform=meta['transform']
) as ras_tiff:
ras_tiff.write(raster.astype(np.float32), 1)
# make plot
proj_data, _ = u_plot.get_transformation(self.crs)
proj_plot = proj_data
if isinstance(proj_data, ccrs.PlateCarree):
# use different projections for plot and data to shift the central lon in the plot
xmin, ymin, xmax, ymax = u_coord.latlon_bounds(
self.gdf.latitude.values, self.gdf.longitude.values)
proj_plot = ccrs.PlateCarree(central_longitude=0.5 * (xmin + xmax))
else:
xmin, ymin, xmax, ymax = (self.gdf.longitude.min(), self.gdf.latitude.min(),
self.gdf.longitude.max(), self.gdf.latitude.max())
if not axis:
_, axis, fontsize = u_plot.make_map(proj=proj_plot, figsize=figsize,
adapt_fontsize=adapt_fontsize)
else:
fontsize = None
cbar_ax = make_axes_locatable(axis).append_axes('right', size="6.5%",
pad=0.1, axes_class=plt.Axes)
axis.set_extent((xmin, xmax, ymin, ymax), crs=proj_data)
u_plot.add_shapes(axis)
if not fill:
raster = np.where(raster == 0, np.nan, raster)
raster_f = lambda x: np.log10((np.maximum(x + 1, 1)))
if 'cmap' not in kwargs:
kwargs['cmap'] = CMAP_RASTER
imag = axis.imshow(raster_f(raster), **kwargs, origin='upper',
extent=(xmin, xmax, ymin, ymax), transform=proj_data)
cbar = plt.colorbar(imag, cax=cbar_ax, label=label)
plt.colorbar(imag, cax=cbar_ax, label=label)
plt.tight_layout()
plt.draw()
if fontsize:
cbar.ax.tick_params(labelsize=fontsize)
cbar.ax.yaxis.get_offset_text().set_fontsize(fontsize)
for item in [axis.title, cbar.ax.xaxis.label, cbar.ax.yaxis.label]:
item.set_fontsize(fontsize)
return axis
[docs] def plot_basemap(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', zoom=10,
url=None, axis=None, **kwargs):
"""Scatter points over satellite image using contextily
Parameters
----------
mask : np.array, optional
mask to apply to eai_exp plotted. Same
size of the exposures, only the selected indexes will be plot.
ignore_zero : bool, optional
flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name : bool, optional
add names of the populated places, by default True.
buffer : float, optional
border to add to coordinates. Default: 0.0.
extend : str, optional
extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
zoom : int, optional
zoom coefficient used in the satellite image
url : Any, optional
image source, e.g., ``ctx.providers.OpenStreetMap.Mapnik``.
Default: ``ctx.providers.Stamen.Terrain``
axis : matplotlib.axes._subplots.AxesSubplot, optional
axis to use
kwargs : optional
arguments for scatter matplotlib function, e.g.
cmap='Greys'. Default: 'Wistia'
Returns
-------
matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if 'geometry' not in self.gdf:
self.set_geometry_points()
crs_ori = self.crs
self.to_crs(epsg=3857, inplace=True)
axis = self.plot_scatter(mask, ignore_zero, pop_name, buffer,
extend, shapes=False, axis=axis, **kwargs)
ctx.add_basemap(axis, zoom, source=url, origin='upper')
axis.set_axis_off()
self.to_crs(crs_ori, inplace=True)
return axis
[docs] def write_hdf5(self, file_name):
"""Write data frame and metadata in hdf5 format
Parameters
----------
file_name : str
(path and) file name to write to.
"""
LOGGER.info('Writing %s', file_name)
store = pd.HDFStore(file_name, mode='w')
pandas_df = pd.DataFrame(self.gdf)
for col in pandas_df.columns:
if str(pandas_df[col].dtype) == "geometry":
pandas_df[col] = np.asarray(self.gdf[col])
# Avoid pandas PerformanceWarning when writing HDF5 data
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning)
# Write dataframe
store.put('exposures', pandas_df)
var_meta = {}
for var in type(self)._metadata:
var_meta[var] = getattr(self, var)
store.get_storer('exposures').attrs.metadata = var_meta
store.close()
[docs] def read_hdf5(self, *args, **kwargs):
"""This function is deprecated, use Exposures.from_hdf5 instead."""
LOGGER.warning("The use of Exposures.read_hdf5 is deprecated."
"Use Exposures.from_hdf5 instead.")
self.__dict__ = Exposures.from_hdf5(*args, **kwargs).__dict__
[docs] @classmethod
def from_hdf5(cls, file_name):
"""Read data frame and metadata in hdf5 format
Parameters
----------
file_name : str
(path and) file name to read from.
additional_vars : list
list of additional variable names to read that
are not in exposures.base._metadata
Returns
-------
Exposures
"""
LOGGER.info('Reading %s', file_name)
if not Path(file_name).is_file():
raise FileNotFoundError(str(file_name))
with pd.HDFStore(file_name, mode='r') as store:
metadata = store.get_storer('exposures').attrs.metadata
# in previous versions of CLIMADA and/or geopandas, the CRS was stored in '_crs'/'crs'
crs = metadata.get('crs', metadata.get('_crs'))
if crs is None and metadata.get('meta'):
crs = metadata['meta'].get('crs')
exp = cls(store['exposures'], crs=crs)
for key, val in metadata.items():
if key in type(exp)._metadata: # pylint: disable=protected-access
setattr(exp, key, val)
if key == 'tag': # for backwards compatitbility with climada <= 3.x
descriptions = [u_hdf5.to_string(x) for x in getattr(val, 'description', [])]
exp.description = "\n".join(descriptions) if descriptions else None
return exp
[docs] def read_mat(self, *args, **kwargs):
"""This function is deprecated, use Exposures.from_mat instead."""
LOGGER.warning("The use of Exposures.read_mat is deprecated."
"Use Exposures.from_mat instead.")
self.__dict__ = Exposures.from_mat(*args, **kwargs).__dict__
[docs] @classmethod
def from_mat(cls, file_name, var_names=None):
"""Read MATLAB file and store variables in exposures.
Parameters
----------
file_name : str
absolute path file
var_names : dict, optional
dictionary containing the name of the
MATLAB variables. Default: DEF_VAR_MAT.
Returns
-------
Exposures
"""
LOGGER.info('Reading %s', file_name)
if not var_names:
var_names = DEF_VAR_MAT
data = u_hdf5.read(file_name)
try:
data = data[var_names['sup_field_name']]
except KeyError:
pass
try:
data = data[var_names['field_name']]
exposures = dict()
_read_mat_obligatory(exposures, data, var_names)
_read_mat_optional(exposures, data, var_names)
except KeyError as var_err:
raise KeyError(f"Variable not in MAT file: {var_names.get('field_name')}")\
from var_err
exp = cls(data=exposures)
_read_mat_metadata(exp, data, file_name, var_names)
return exp
#
# Extends the according geopandas method
#
[docs] def to_crs(self, crs=None, epsg=None, inplace=False):
"""Wrapper of the :py:meth:`GeoDataFrame.to_crs` method.
Transform geometries to a new coordinate reference system.
Transform all geometries in a GeoSeries to a different coordinate reference system.
The crs attribute on the current GeoSeries must be set. Either crs in string or dictionary
form or an EPSG code may be specified for output.
This method will transform all points in all objects. It has no notion or projecting entire
geometries. All segments joining points are assumed to be lines in the current projection,
not geodesics. Objects crossing the dateline (or other projection boundary) will have
undesirable behavior.
Parameters
----------
crs : dict or str
Output projection parameters as string or in dictionary form.
epsg : int
EPSG code specifying output projection.
inplace : bool, optional, default: False
Whether to return a new GeoDataFrame or do the transformation in
place.
Returns
-------
None if inplace is True else a transformed copy of the exposures object
"""
if crs and epsg:
raise ValueError("one of crs or epsg must be None")
if inplace:
self.gdf.to_crs(crs, epsg, True)
self.meta['crs'] = crs or f'EPSG:{epsg}'
self.set_lat_lon()
return None
exp = self.copy()
exp.to_crs(crs, epsg, True)
return exp
[docs] def plot(self, *args, **kwargs):
"""Wrapper of the :py:meth:`GeoDataFrame.plot` method"""
self.gdf.plot(*args, **kwargs)
[docs] def copy(self, deep=True):
"""Make a copy of this Exposures object.
Parameters
----------
deep (bool): Make a deep copy, i.e. also copy data. Default True.
Returns
-------
Exposures
"""
gdf = self.gdf.copy(deep=deep)
metadata = dict([
(md, copy.deepcopy(self.__dict__[md])) for md in type(self)._metadata
])
metadata['crs'] = self.crs
return type(self)(
gdf,
**metadata
)
[docs] def write_raster(self, file_name, value_name='value', scheduler=None):
"""Write value data into raster file with GeoTiff format
Parameters
----------
file_name : str
name output file in tif format
"""
if self.meta and self.meta['height'] * self.meta['width'] == len(self.gdf):
raster = self.gdf[value_name].values.reshape((self.meta['height'],
self.meta['width']))
# check raster starts by upper left corner
if self.gdf.latitude.values[0] < self.gdf.latitude.values[-1]:
raster = np.flip(raster, axis=0)
if self.gdf.longitude.values[0] > self.gdf.longitude.values[-1]:
raise ValueError('Points are not ordered according to meta raster.')
u_coord.write_raster(file_name, raster, self.meta)
else:
raster, meta = u_coord.points_to_raster(self.gdf, [value_name], scheduler=scheduler)
u_coord.write_raster(file_name, raster, meta)
[docs] @staticmethod
def concat(exposures_list):
"""Concatenates Exposures or DataFrame objectss to one Exposures object.
Parameters
----------
exposures_list : list of Exposures or DataFrames
The list must not be empty with the first item supposed to be an Exposures object.
Returns
-------
Exposures
with the metadata of the first item in the list and the dataframes concatenated.
"""
exp = exposures_list[0].copy(deep=False)
if not isinstance(exp, Exposures):
exp = Exposures(exp)
exp.check()
df_list = [
ex.gdf if isinstance(ex, Exposures) else ex
for ex in exposures_list
]
crss = [
ex.crs for ex in exposures_list
if isinstance(ex, (Exposures, GeoDataFrame))
and hasattr(ex, "crs")
and ex.crs is not None
]
if crss:
crs = crss[0]
if any(not u_coord.equal_crs(c, crs) for c in crss[1:]):
raise ValueError("concatenation of exposures with different crs")
else:
crs = None
exp.set_gdf(GeoDataFrame(
pd.concat(df_list, ignore_index=True, sort=False)
), crs=crs)
return exp
[docs] def centroids_total_value(self, hazard):
"""Compute value of exposures close enough to be affected by hazard
.. deprecated:: 3.3
This method will be removed in a future version. Use
:py:meth:`affected_total_value` instead.
This method computes the sum of the value of all exposures points for which a
Hazard centroid is assigned.
Parameters
----------
hazard : Hazard
Hazard affecting Exposures
Returns
-------
float
Sum of value of all exposures points for which
a centroids is assigned
"""
nz_mask = (
(self.gdf.value.values > 0)
& (self.gdf[hazard.centr_exp_col].values >= 0)
)
return np.sum(self.gdf.value.values[nz_mask])
[docs] def affected_total_value(
self,
hazard: Hazard,
threshold_affected: float = 0,
overwrite_assigned_centroids: bool = True,
):
"""
Total value of the exposures that are affected by at least
one hazard event (sum of value of all exposures points for which
at least one event has intensity larger than the threshold).
Parameters
----------
hazard : Hazard
Hazard affecting Exposures
threshold_affected : int or float
Hazard intensity threshold above which an exposures is
considere affected.
The default is 0.
overwrite_assigned_centroids : boolean
Assign centroids from the hazard to the exposures and overwrite
existing ones.
The default is True.
Returns
-------
float
Sum of value of all exposures points for which
a centroids is assigned and that have at least one
event intensity above threshold.
See Also
--------
Exposures.assign_centroids : method to assign centroids.
Note
----
The fraction attribute of the hazard is ignored. Thus, for hazards
with fraction defined the affected values will be overestimated.
"""
self.assign_centroids(hazard=hazard, overwrite=overwrite_assigned_centroids)
assigned_centroids = self.gdf[hazard.centr_exp_col]
nz_mask = (self.gdf.value.values > 0) & (assigned_centroids.values >= 0)
cents = np.unique(assigned_centroids[nz_mask])
cent_with_inten_above_thres = (
hazard.intensity[:, cents].max(axis=0) > threshold_affected
).nonzero()[1]
above_thres_mask = np.isin(
self.gdf[hazard.centr_exp_col].values, cents[cent_with_inten_above_thres]
)
return np.sum(self.gdf.value.values[above_thres_mask])
[docs]def add_sea(exposures, sea_res, scheduler=None):
"""Add sea to geometry's surroundings with given resolution. region_id
set to -1 and other variables to 0.
Parameters
----------
exposures : Exposures
the Exposures object without sea surroundings.
sea_res : tuple (float,float)
(sea_coast_km, sea_res_km), where first parameter
is distance from coast to fill with water and second parameter
is resolution between sea points
scheduler : str, optional
used for dask map_partitions.
“threads”, “synchronous” or “processes”
Returns
-------
Exposures
"""
LOGGER.info("Adding sea at %s km resolution and %s km distance from coast.",
str(sea_res[1]), str(sea_res[0]))
sea_res = (sea_res[0] / ONE_LAT_KM, sea_res[1] / ONE_LAT_KM)
min_lat = max(-90, float(exposures.gdf.latitude.min()) - sea_res[0])
max_lat = min(90, float(exposures.gdf.latitude.max()) + sea_res[0])
min_lon = max(-180, float(exposures.gdf.longitude.min()) - sea_res[0])
max_lon = min(180, float(exposures.gdf.longitude.max()) + sea_res[0])
lat_arr = np.arange(min_lat, max_lat + sea_res[1], sea_res[1])
lon_arr = np.arange(min_lon, max_lon + sea_res[1], sea_res[1])
lon_mgrid, lat_mgrid = np.meshgrid(lon_arr, lat_arr)
lon_mgrid, lat_mgrid = lon_mgrid.ravel(), lat_mgrid.ravel()
on_land = ~u_coord.coord_on_land(lat_mgrid, lon_mgrid)
sea_exp_gdf = GeoDataFrame()
sea_exp_gdf['latitude'] = lat_mgrid[on_land]
sea_exp_gdf['longitude'] = lon_mgrid[on_land]
sea_exp_gdf['region_id'] = np.zeros(sea_exp_gdf.latitude.size, int) - 1
if 'geometry' in exposures.gdf.columns:
u_coord.set_df_geometry_points(sea_exp_gdf, crs=exposures.crs, scheduler=scheduler)
for var_name in exposures.gdf.columns:
if var_name not in ('latitude', 'longitude', 'region_id', 'geometry'):
sea_exp_gdf[var_name] = np.zeros(sea_exp_gdf.latitude.size,
exposures.gdf[var_name].dtype)
return Exposures(
pd.concat([exposures.gdf, sea_exp_gdf], ignore_index=True, sort=False),
crs=exposures.crs,
ref_year=exposures.ref_year,
value_unit=exposures.value_unit,
meta=exposures.meta,
description=exposures.description,
)
def _read_mat_obligatory(exposures, data, var_names):
"""Fill obligatory variables."""
exposures['value'] = np.squeeze(data[var_names['var_name']['val']])
exposures['latitude'] = data[var_names['var_name']['lat']].reshape(-1)
exposures['longitude'] = data[var_names['var_name']['lon']].reshape(-1)
exposures[INDICATOR_IMPF] = np.squeeze(
data[var_names['var_name']['impf']]).astype(int, copy=False)
def _read_mat_optional(exposures, data, var_names):
"""Fill optional parameters."""
try:
exposures['deductible'] = np.squeeze(data[var_names['var_name']['ded']])
except KeyError:
pass
try:
exposures['cover'] = np.squeeze(data[var_names['var_name']['cov']])
except KeyError:
pass
try:
exposures['category_id'] = \
np.squeeze(data[var_names['var_name']['cat']]).astype(int, copy=False)
except KeyError:
pass
try:
exposures['region_id'] = \
np.squeeze(data[var_names['var_name']['reg']]).astype(int, copy=False)
except KeyError:
pass
try:
assigned = np.squeeze(data[var_names['var_name']['ass']]).astype(int, copy=False)
if assigned.size > 0:
exposures[INDICATOR_CENTR] = assigned
except KeyError:
pass
def _read_mat_metadata(exposures, data, file_name, var_names):
"""Fill metadata in DataFrame object"""
try:
exposures.ref_year = int(np.squeeze(data[var_names['var_name']['ref']]))
except KeyError:
exposures.ref_year = DEF_REF_YEAR
try:
exposures.value_unit = u_hdf5.get_str_from_ref(
file_name, data[var_names['var_name']['uni']][0][0])
except KeyError:
exposures.value_unit = DEF_VALUE_UNIT