"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Exposures class.
"""
__all__ = ['Exposures', 'add_sea', 'INDICATOR_IF', 'INDICATOR_CENTR']
import logging
import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from geopandas import GeoDataFrame
import rasterio
from rasterio.warp import Resampling
import contextily as ctx
import cartopy.crs as ccrs
from climada.entity.tag import Tag
import climada.util.hdf5_handler as u_hdf5
from climada.util.constants import ONE_LAT_KM, DEF_CRS
import climada.util.coordinates as u_coord
from climada.util.interpolation import interpol_index
import climada.util.plot as u_plot
LOGGER = logging.getLogger(__name__)
INDICATOR_IF = 'if_'
"""Name of the column containing the impact functions id of specified hazard"""
INDICATOR_CENTR = 'centr_'
"""Name of the column containing the centroids id of specified hazard"""
DEF_REF_YEAR = 2018
"""Default reference year"""
DEF_VALUE_UNIT = 'USD'
"""Default value unit"""
DEF_VAR_MAT = {'sup_field_name': 'entity',
'field_name': 'assets',
'var_name': {'lat': 'lat',
'lon': 'lon',
'val': 'Value',
'ded': 'Deductible',
'cov': 'Cover',
'imp': 'DamageFunID',
'cat': 'Category_ID',
'reg': 'Region_ID',
'uni': 'Value_unit',
'ass': 'centroid_index',
'ref': 'reference_year'
}
}
"""MATLAB variable names"""
[docs]class Exposures():
"""geopandas GeoDataFrame with metada and columns (pd.Series) defined in
Attributes.
Attributes:
tag (Tag): metada - information about the source data
ref_year (int): metada - reference year
value_unit (str): metada - unit of the exposures values
latitude (pd.Series): latitude
longitude (pd.Series): longitude
crs (dict or crs): CRS information inherent to GeoDataFrame.
value (pd.Series): a value for each exposure
if_ (pd.Series, optional): e.g. if_TC. impact functions id for hazard TC.
There might be different hazards defined: if_TC, if_FL, ...
If not provided, set to default 'if_' with ids 1 in check().
geometry (pd.Series, optional): geometry of type Point of each instance.
Computed in method set_geometry_points().
meta (dict): dictionary containing corresponding raster properties (if any):
width, height, crs and transform must be present at least (transform needs
to contain upper left corner!). Exposures might not contain all the points
of the corresponding raster. Not used in internal computations.
deductible (pd.Series, optional): deductible value for each exposure
cover (pd.Series, optional): cover value for each exposure
category_id (pd.Series, optional): category id for each exposure
region_id (pd.Series, optional): region id for each exposure
centr_ (pd.Series, optional): e.g. centr_TC. centroids index for hazard
TC. There might be different hazards defined: centr_TC, centr_FL, ...
Computed in method assign_centroids().
"""
_metadata = ['tag', 'ref_year', 'value_unit', 'meta']
vars_oblig = ['value', 'latitude', 'longitude']
"""Name of the variables needed to compute the impact."""
vars_def = [INDICATOR_IF]
"""Name of variables that can be computed."""
vars_opt = [INDICATOR_CENTR, 'deductible', 'cover', 'category_id',
'region_id', 'geometry']
"""Name of the variables that aren't need to compute the impact."""
@property
def crs(self):
"""Coordinate Reference System, refers to the crs attribute of the inherent GeoDataFrame"""
try:
return self.gdf.crs
except AttributeError:
return self.meta.get('crs')
[docs] def __init__(self, *args, **kwargs):
"""Creates an Exposures object from a GeoDataFrame
Parameters
----------
*args :
Arguments of the GeoDataFrame constructor
**kwargs :
Named arguments of the GeoDataFrame constructor, additionally
tag : climada.entity.exposures.tag.Tag
Exopusres tag
ref_year : int
Reference Year
value_unit : str
Unit of the exposed value
meta : dict
Metadata dictionary
"""
# meta data
try:
self.meta = kwargs.pop('meta')
if self.meta is None:
self.meta = {}
if not isinstance(self.meta, dict):
raise ValueError("meta must be a dictionary")
except KeyError:
self.meta = {}
LOGGER.info('meta set to default value %s', self.meta)
# tag
try:
self.tag = kwargs.pop('tag')
except KeyError:
self.tag = self.meta.get('tag', Tag())
if 'tag' not in self.meta:
LOGGER.info('tag set to default value %s', self.tag)
# reference year
try:
self.ref_year = kwargs.pop('ref_year')
except KeyError:
self.ref_year = self.meta.get('ref_year', DEF_REF_YEAR)
if 'ref_year' not in self.meta:
LOGGER.info('ref_year set to default value %s', self.ref_year)
# value unit
try:
self.value_unit = kwargs.pop('value_unit')
except KeyError:
self.value_unit = self.meta.get('ref_year', DEF_VALUE_UNIT)
if 'value_unit' not in self.meta:
LOGGER.info('value_unit set to default value %s', self.value_unit)
# remaining generic attributes
for mda in type(self)._metadata:
if mda not in Exposures._metadata:
if mda in kwargs:
setattr(self, mda, kwargs.pop(mda))
elif mda in self.meta:
setattr(self, mda, self.meta[mda])
else:
setattr(self, mda, None)
# make the data frame
self.gdf = GeoDataFrame(*args, **kwargs)
# align crs from gdf and meta data
if self.gdf.crs:
crs = self.gdf.crs
# With geopandas 3.1, the crs attribute is not conserved by the constructor
# without a geometry column. Therefore the conservation is done 'manually':
elif len(args) > 0:
try:
crs = args[0].crs
except AttributeError:
crs = None
elif 'data' in kwargs:
try:
crs = kwargs['data'].crs
except AttributeError:
crs = None
else:
crs = None
# store the crs in the meta dictionary
if crs:
if self.meta.get('crs') and not u_coord.equal_crs(self.meta.get('crs'), crs):
LOGGER.info('crs from `meta` argument ignored and overwritten by GeoDataFrame'
' crs: %s', self.gdf.crs)
self.meta['crs'] = crs
if not self.gdf.crs:
self.gdf.crs = crs
else:
if 'crs' not in self.meta:
LOGGER.info('crs set to default value: %s', DEF_CRS)
self.meta['crs'] = DEF_CRS
self.gdf.crs = self.meta['crs']
def __str__(self):
return '\n'.join(
[f"{md}: {self.__dict__[md]}" for md in type(self)._metadata] +
[f"crs: {self.crs}", "data:", str(self.gdf)]
)
def _access_item(self, *args):
raise TypeError("Since CLIMADA 2.0, Exposures objects are not subscriptable. Data "
"fields of Exposures objects are accessed using the `gdf` attribute. "
"For example, `expo['value']` is replaced by `expo.gdf['value']`.")
__getitem__ = _access_item
__setitem__ = _access_item
__delitem__ = _access_item
[docs] def check(self):
"""Check Exposures consistency.
Reports missing columns in log messages.
If no if_* column is present in the dataframe, a default column 'if_' is added with
default impact function id 1.
"""
# mandatory columns
for var in self.vars_oblig:
if var not in self.gdf.columns:
LOGGER.error("%s missing.", var)
raise ValueError(f"{var} missing in gdf")
# computable columns except if_*
for var in sorted(set(self.vars_def).difference([INDICATOR_IF])):
if not var in self.gdf.columns:
LOGGER.info("%s not set.", var)
# special treatment for if_*
if INDICATOR_IF in self.gdf.columns:
LOGGER.info("Hazard type not set in %s", INDICATOR_IF)
elif not any([col.startswith(INDICATOR_IF) for col in self.gdf.columns]):
LOGGER.info("Setting %s to default impact functions ids 1.", INDICATOR_IF)
self.gdf[INDICATOR_IF] = 1
# optional columns except centr_*
for var in sorted(set(self.vars_opt).difference([INDICATOR_CENTR])):
if not var in self.gdf.columns:
LOGGER.info("%s not set.", var)
# special treatment for centr_*
if INDICATOR_CENTR in self.gdf.columns:
LOGGER.info("Hazard type not set in %s", INDICATOR_CENTR)
elif not any([col.startswith(INDICATOR_CENTR) for col in self.gdf.columns]):
LOGGER.info("%s not set.", INDICATOR_CENTR)
# check whether geometry corresponds to lat/lon
try:
if (self.gdf.geometry.values[0].x != self.gdf.longitude.values[0] or
self.gdf.geometry.values[0].y != self.gdf.latitude.values[0]):
raise ValueError("Geometry values do not correspond to latitude and" +
" longitude. Use set_geometry_points() or set_lat_lon().")
except AttributeError: # no geometry column
pass
[docs] def assign_centroids(self, hazard, method='NN', distance='haversine',
threshold=100):
"""Assign for each exposure coordinate closest hazard coordinate.
-1 used for disatances > threshold in point distances. If raster hazard,
-1 used for centroids outside raster.
Parameters:
hazard (Hazard): hazard to match (with raster or vector centroids)
method (str, optional): interpolation method to use in vector hazard.
Nearest neighbor (NN) default
distance (str, optional): distance to use in vector hazard. Haversine
default
threshold (float): distance threshold in km over which no neighbor
will be found in vector hazard. Those are assigned with a -1.
Default 100 km.
"""
LOGGER.info('Matching %s exposures with %s centroids.',
str(self.gdf.shape[0]), str(hazard.centroids.size))
if not u_coord.equal_crs(self.crs, hazard.centroids.crs):
LOGGER.error('Set hazard and exposure to same CRS first!')
raise ValueError
if hazard.centroids.meta:
xres, _, xmin, _, yres, ymin = hazard.centroids.meta['transform'][:6]
xmin, ymin = xmin + 0.5 * xres, ymin + 0.5 * yres
x_i = np.round((self.gdf.longitude.values - xmin) / xres).astype(int)
y_i = np.round((self.gdf.latitude.values - ymin) / yres).astype(int)
assigned = y_i * hazard.centroids.meta['width'] + x_i
assigned[(x_i < 0) | (x_i >= hazard.centroids.meta['width'])] = -1
assigned[(y_i < 0) | (y_i >= hazard.centroids.meta['height'])] = -1
else:
coord = np.stack([self.gdf.latitude.values, self.gdf.longitude.values], axis=1).astype('float64')
haz_coord = hazard.centroids.coord.astype('float64')
if np.array_equal(coord, haz_coord):
assigned = np.arange(self.gdf.shape[0])
else:
# pairs of floats can be sorted (lexicographically) in NumPy
coord_view = coord.view(dtype='float64,float64').reshape(-1)
haz_coord_view = haz_coord.view(dtype='float64,float64').reshape(-1)
# assign each hazard coordinate to an element in coord using searchsorted
coord_sorter = np.argsort(coord_view)
haz_assign_idx = np.fmin(coord_sorter.size - 1, np.searchsorted(
coord_view, haz_coord_view, side="left", sorter=coord_sorter))
haz_assign_idx = coord_sorter[haz_assign_idx]
# determine which of the assignements match exactly
haz_match_idx = (coord_view[haz_assign_idx] == haz_coord_view).nonzero()[0]
assigned = np.full_like(coord_sorter, -1)
assigned[haz_assign_idx[haz_match_idx]] = haz_match_idx
# assign remaining coordinates to their geographically nearest neighbor
if haz_match_idx.size != coord_view.size:
not_assigned_mask = (assigned == -1)
assigned[not_assigned_mask] = interpol_index(
haz_coord, coord[not_assigned_mask],
method=method, distance=distance, threshold=threshold)
self.gdf[INDICATOR_CENTR + hazard.tag.haz_type] = assigned
[docs] def set_geometry_points(self, scheduler=None):
"""Set geometry attribute of GeoDataFrame with Points from latitude and
longitude attributes.
Parameters:
scheduler (str): used for dask map_partitions. “threads”,
“synchronous” or “processes”
"""
u_coord.set_df_geometry_points(self.gdf, scheduler)
[docs] def set_lat_lon(self):
"""Set latitude and longitude attributes from geometry attribute."""
LOGGER.info('Setting latitude and longitude attributes.')
self.gdf['latitude'] = self.gdf.geometry[:].y
self.gdf['longitude'] = self.gdf.geometry[:].x
[docs] def set_from_raster(self, file_name, band=1, src_crs=None, window=False,
geometry=False, dst_crs=False, transform=None,
width=None, height=None, resampling=Resampling.nearest):
"""Read raster data and set latitude, longitude, value and meta
Parameters:
file_name (str): file name containing values
band (int, optional): bands to read (starting at 1)
src_crs (crs, optional): source CRS. Provide it if error without it.
window (rasterio.windows.Windows, optional): window where data is
extracted
geometry (shapely.geometry, optional): consider pixels only in shape
dst_crs (crs, optional): reproject to given crs
transform (rasterio.Affine): affine transformation to apply
wdith (float): number of lons for transform
height (float): number of lats for transform
resampling (rasterio.warp,.Resampling optional): resampling
function used for reprojection to dst_crs
"""
self.tag = Tag()
self.tag.file_name = str(file_name)
meta, value = u_coord.read_raster(file_name, [band], src_crs, window,
geometry, dst_crs, transform, width,
height, resampling)
ulx, xres, _, uly, _, yres = meta['transform'].to_gdal()
lrx = ulx + meta['width'] * xres
lry = uly + meta['height'] * yres
x_grid, y_grid = np.meshgrid(np.arange(ulx + xres / 2, lrx, xres),
np.arange(uly + yres / 2, lry, yres))
try:
self.gdf.crs = meta['crs'].to_dict()
except AttributeError:
self.gdf.crs = meta['crs']
self.gdf['longitude'] = x_grid.flatten()
self.gdf['latitude'] = y_grid.flatten()
self.gdf['value'] = value.reshape(-1)
self.meta = meta
[docs] def plot_scatter(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', axis=None, figsize=(9, 13),
**kwargs):
"""Plot exposures geometry's value sum scattered over Earth's map.
The plot will we projected according to the current crs.
Parameters:
mask (np.array, optional): mask to apply to eai_exp plotted.
ignore_zero (bool, optional): flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name (bool, optional): add names of the populated places
buffer (float, optional): border to add to coordinates. Default: 0.0.
extend (str, optional): extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
figsize (tuple, optional): figure size for plt.subplots
kwargs (optional): arguments for scatter matplotlib function, e.g.
cmap='Greys'. Default: 'Wistia'
Returns:
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
crs_epsg, _ = u_plot.get_transformation(self.crs)
title = self.tag.description
cbar_label = 'Value (%s)' % self.value_unit
if mask is None:
mask = np.ones((self.gdf.shape[0],), dtype=bool)
if ignore_zero:
pos_vals = self.gdf.value[mask].values > 0
else:
pos_vals = np.ones((self.gdf.value[mask].values.size,), dtype=bool)
value = self.gdf.value[mask][pos_vals].values
coord = np.stack([self.gdf.latitude[mask][pos_vals].values,
self.gdf.longitude[mask][pos_vals].values], axis=1)
return u_plot.geo_scatter_from_array(value, coord, cbar_label, title,
pop_name, buffer, extend,
proj=crs_epsg, axes=axis,
figsize=figsize, **kwargs)
[docs] def plot_hexbin(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', axis=None, figsize=(9, 13),
**kwargs):
"""Plot exposures geometry's value sum binned over Earth's map.
An other function for the bins can be set through the key reduce_C_function.
The plot will we projected according to the current crs.
Parameters:
mask (np.array, optional): mask to apply to eai_exp plotted.
ignore_zero (bool, optional): flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name (bool, optional): add names of the populated places
buffer (float, optional): border to add to coordinates. Default: 0.0.
extend (str, optional): extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
figsize (tuple): figure size for plt.subplots
kwargs (optional): arguments for hexbin matplotlib function, e.g.
reduce_C_function=np.average. Default: reduce_C_function=np.sum
Returns:
cartopy.mpl.geoaxes.GeoAxesSubplot
"""
crs_epsg, _ = u_plot.get_transformation(self.crs)
title = self.tag.description
cbar_label = 'Value (%s)' % self.value_unit
if 'reduce_C_function' not in kwargs:
kwargs['reduce_C_function'] = np.sum
if mask is None:
mask = np.ones((self.gdf.shape[0],), dtype=bool)
if ignore_zero:
pos_vals = self.gdf.value[mask].values > 0
else:
pos_vals = np.ones((self.gdf.value[mask].values.size,), dtype=bool)
value = self.gdf.value[mask][pos_vals].values
coord = np.stack([self.gdf.latitude[mask][pos_vals].values,
self.gdf.longitude[mask][pos_vals].values], axis=1)
return u_plot.geo_bin_from_array(value, coord, cbar_label, title,
pop_name, buffer, extend, proj=crs_epsg,
axes=axis, figsize=figsize, **kwargs)
[docs] def plot_raster(self, res=None, raster_res=None, save_tiff=None,
raster_f=lambda x: np.log10((np.fmax(x + 1, 1))),
label='value (log10)', scheduler=None, axis=None,
figsize=(9, 13), **kwargs):
"""Generate raster from points geometry and plot it using log10 scale:
np.log10((np.fmax(raster+1, 1))).
Parameters:
res (float, optional): resolution of current data in units of latitude
and longitude, approximated if not provided.
raster_res (float, optional): desired resolution of the raster
save_tiff (str, optional): file name to save the raster in tiff
format, if provided
raster_f (lambda function): transformation to use to data. Default:
log10 adding 1.
label (str): colorbar label
scheduler (str): used for dask map_partitions. “threads”,
“synchronous” or “processes”
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
figsize (tuple, optional): figure size for plt.subplots
kwargs (optional): arguments for imshow matplotlib function
Returns:
matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if self.meta and self.meta.get('height', 0) * self.meta.get('height', 0) == len(self.gdf):
raster = self.gdf.value.values.reshape((self.meta['height'],
self.meta['width']))
# check raster starts by upper left corner
if self.gdf.latitude.values[0] < self.gdf.latitude.values[-1]:
raster = np.flip(raster, axis=0)
if self.gdf.longitude.values[0] > self.gdf.longitude.values[-1]:
LOGGER.error('Points are not ordered according to meta raster.')
raise ValueError
else:
raster, meta = u_coord.points_to_raster(self.gdf, ['value'], res, raster_res, scheduler)
raster = raster.reshape((meta['height'], meta['width']))
# save tiff
if save_tiff is not None:
with rasterio.open(save_tiff, 'w', driver='GTiff',
height=meta['height'], width=meta['width'], count=1,
dtype=np.float32, crs=self.crs, transform=meta['transform']
) as ras_tiff:
ras_tiff.write(raster.astype(np.float32), 1)
# make plot
proj_data, _ = u_plot.get_transformation(self.crs)
proj_plot = proj_data
if isinstance(proj_data, ccrs.PlateCarree):
# use different projections for plot and data to shift the central lon in the plot
xmin, ymin, xmax, ymax = u_coord.latlon_bounds(
self.gdf.latitude.values, self.gdf.longitude.values)
proj_plot = ccrs.PlateCarree(central_longitude=0.5 * (xmin + xmax))
else:
xmin, ymin, xmax, ymax = (self.gdf.longitude.min(), self.gdf.latitude.min(),
self.gdf.longitude.max(), self.gdf.latitude.max())
if not axis:
_, axis = u_plot.make_map(proj=proj_plot, figsize=figsize)
cbar_ax = make_axes_locatable(axis).append_axes('right', size="6.5%",
pad=0.1, axes_class=plt.Axes)
axis.set_extent((xmin, xmax, ymin, ymax), crs=proj_data)
u_plot.add_shapes(axis)
imag = axis.imshow(raster_f(raster), **kwargs, origin='upper',
extent=(xmin, xmax, ymin, ymax), transform=proj_data)
plt.colorbar(imag, cax=cbar_ax, label=label)
plt.draw()
return axis
[docs] def plot_basemap(self, mask=None, ignore_zero=False, pop_name=True,
buffer=0.0, extend='neither', zoom=10,
url='http://tile.stamen.com/terrain/tileZ/tileX/tileY.png',
axis=None, **kwargs):
"""Scatter points over satellite image using contextily
Parameters:
mask (np.array, optional): mask to apply to eai_exp plotted. Same
size of the exposures, only the selected indexes will be plot.
ignore_zero (bool, optional): flag to indicate if zero and negative
values are ignored in plot. Default: False
pop_name (bool, optional): add names of the populated places
buffer (float, optional): border to add to coordinates. Default: 0.0.
extend (str, optional): extend border colorbar with arrows.
[ 'neither' | 'both' | 'min' | 'max' ]
zoom (int, optional): zoom coefficient used in the satellite image
url (str, optional): image source, e.g. ctx.sources.OSM_C
axis (matplotlib.axes._subplots.AxesSubplot, optional): axis to use
kwargs (optional): arguments for scatter matplotlib function, e.g.
cmap='Greys'. Default: 'Wistia'
Returns:
matplotlib.figure.Figure, cartopy.mpl.geoaxes.GeoAxesSubplot
"""
if 'geometry' not in self.gdf.columns:
self.set_geometry_points()
crs_ori = self.crs
self.to_crs(epsg=3857, inplace=True)
axis = self.plot_scatter(mask, ignore_zero, pop_name, buffer,
extend, shapes=False, axis=axis, **kwargs)
ctx.add_basemap(axis, zoom, url, origin='upper')
axis.set_axis_off()
self.to_crs(crs_ori, inplace=True)
return axis
[docs] def write_hdf5(self, file_name):
"""Write data frame and metadata in hdf5 format
Parameters:
file_name (str): (path and) file name to write to.
"""
LOGGER.info('Writting %s', file_name)
store = pd.HDFStore(file_name)
pandas_df = pd.DataFrame(self.gdf)
for col in pandas_df.columns:
if str(pandas_df[col].dtype) == "geometry":
pandas_df[col] = np.asarray(self.gdf[col])
store.put('exposures', pandas_df)
var_meta = {}
for var in type(self)._metadata:
var_meta[var] = getattr(self, var)
store.get_storer('exposures').attrs.metadata = var_meta
store.close()
[docs] def read_hdf5(self, file_name):
"""Read data frame and metadata in hdf5 format
Parameters:
file_name (str): (path and) file name to read from.
Optional Parameters:
additional_vars (list): list of additional variable names to read that
are not in exposures.base._metadata
"""
LOGGER.info('Reading %s', file_name)
with pd.HDFStore(file_name) as store:
self.__init__(store['exposures'])
metadata = store.get_storer('exposures').attrs.metadata
for key, val in metadata.items():
if key in type(self)._metadata:
setattr(self, key, val)
if key == 'crs':
self.gdf.crs = val
[docs] def read_mat(self, file_name, var_names=None):
"""Read MATLAB file and store variables in exposures.
Parameters:
file_name (str): absolute path file
var_names (dict, optional): dictionary containing the name of the
MATLAB variables. Default: DEF_VAR_MAT.
"""
LOGGER.info('Reading %s', file_name)
if not var_names:
var_names = DEF_VAR_MAT
data = u_hdf5.read(file_name)
try:
data = data[var_names['sup_field_name']]
except KeyError:
pass
try:
data = data[var_names['field_name']]
exposures = dict()
_read_mat_obligatory(exposures, data, var_names)
_read_mat_optional(exposures, data, var_names)
except KeyError as var_err:
LOGGER.error("Not existing variable: %s", str(var_err))
raise var_err
self.gdf = GeoDataFrame(data=exposures, crs=self.crs)
_read_mat_metadata(self, data, file_name, var_names)
#
# Extends the according geopandas method
#
[docs] def to_crs(self, crs=None, epsg=None, inplace=False):
"""Wrapper of the GeoDataFrame.to_crs method.
Transform geometries to a new coordinate reference system.
Transform all geometries in a GeoSeries to a different coordinate reference system.
The crs attribute on the current GeoSeries must be set. Either crs in string or dictionary
form or an EPSG code may be specified for output.
This method will transform all points in all objects. It has no notion or projecting entire
geometries. All segments joining points are assumed to be lines in the current projection,
not geodesics. Objects crossing the dateline (or other projection boundary) will have
undesirable behavior.
Parameters:
crs : dict or str
Output projection parameters as string or in dictionary form.
epsg : int
EPSG code specifying output projection.
inplace : bool, optional, default: False
Whether to return a new GeoDataFrame or do the transformation in
place.
Returns:
None if inplace is True
else a transformed copy of the exposures object
"""
if inplace:
self.gdf.to_crs(crs, epsg, True)
self.meta['crs'] = crs
self.set_lat_lon()
return None
exp = self.copy()
exp.to_crs(crs, epsg, True)
return exp
[docs] def plot(self, *args, **kwargs):
"""Wrapper of the GeoDataFram.plot method"""
self.gdf.plot(*args, **kwargs)
plot.__doc__ = GeoDataFrame.plot.__doc__
[docs] def copy(self, deep=True):
"""Make a copy of this Exposures object.
Parameters
----------
deep (bool): Make a deep copy, i.e. also copy data. Default True.
Returns
-------
Exposures
"""
gdf = self.gdf.copy(deep=deep)
metadata = dict([
(md, copy.deepcopy(self.__dict__[md])) for md in type(self)._metadata
])
metadata['crs'] = self.crs
return type(self)(
gdf,
**metadata
)
[docs] def write_raster(self, file_name, value_name='value', scheduler=None):
"""Write value data into raster file with GeoTiff format
Parameters:
file_name (str): name output file in tif format
"""
if self.meta and self.meta['height'] * self.meta['width'] == len(self.gdf):
raster = self.gdf[value_name].values.reshape((self.meta['height'],
self.meta['width']))
# check raster starts by upper left corner
if self.gdf.latitude.values[0] < self.gdf.latitude.values[-1]:
raster = np.flip(raster, axis=0)
if self.gdf.longitude.values[0] > self.gdf.longitude.values[-1]:
LOGGER.error('Points are not ordered according to meta raster.')
raise ValueError
u_coord.write_raster(file_name, raster, self.meta)
else:
raster, meta = u_coord.points_to_raster(self, [value_name], scheduler=scheduler)
u_coord.write_raster(file_name, raster, meta)
[docs] @staticmethod
def concat(exposures_list):
"""Concatenates Exposures or DataFrame objectss to one Exposures object.
Parameters
----------
exposures_list : list of Exposures or DataFrames
The list must not be empty with the first item supposed to be an Exposures object.
Returns
-------
Exposures
with the metadata of the first item in the list and the dataframes concatenated.
"""
exp = exposures_list[0].copy(deep=False)
df_list = [
ex.gdf if isinstance(ex, Exposures) else ex
for ex in exposures_list
]
exp.gdf = GeoDataFrame(
pd.concat(df_list, ignore_index=True, sort=False),
crs=exp.crs
)
return exp
[docs]def add_sea(exposures, sea_res):
"""Add sea to geometry's surroundings with given resolution. region_id
set to -1 and other variables to 0.
Parameters:
sea_res (tuple): (sea_coast_km, sea_res_km), where first parameter
is distance from coast to fill with water and second parameter
is resolution between sea points
Returns:
Exposures
"""
LOGGER.info("Adding sea at %s km resolution and %s km distance from coast.",
str(sea_res[1]), str(sea_res[0]))
sea_res = (sea_res[0] / ONE_LAT_KM, sea_res[1] / ONE_LAT_KM)
min_lat = max(-90, float(exposures.gdf.latitude.min()) - sea_res[0])
max_lat = min(90, float(exposures.gdf.latitude.max()) + sea_res[0])
min_lon = max(-180, float(exposures.gdf.longitude.min()) - sea_res[0])
max_lon = min(180, float(exposures.gdf.longitude.max()) + sea_res[0])
lat_arr = np.arange(min_lat, max_lat + sea_res[1], sea_res[1])
lon_arr = np.arange(min_lon, max_lon + sea_res[1], sea_res[1])
lon_mgrid, lat_mgrid = np.meshgrid(lon_arr, lat_arr)
lon_mgrid, lat_mgrid = lon_mgrid.ravel(), lat_mgrid.ravel()
on_land = ~u_coord.coord_on_land(lat_mgrid, lon_mgrid)
sea_exp_gdf = GeoDataFrame()
sea_exp_gdf['latitude'] = lat_mgrid[on_land]
sea_exp_gdf['longitude'] = lon_mgrid[on_land]
sea_exp_gdf['region_id'] = np.zeros(sea_exp_gdf.latitude.size, int) - 1
if 'geometry' in exposures.gdf.columns:
u_coord.set_df_geometry_points(sea_exp_gdf)
for var_name in exposures.gdf.columns:
if var_name not in ('latitude', 'longitude', 'region_id', 'geometry'):
sea_exp_gdf[var_name] = np.zeros(sea_exp_gdf.latitude.size,
exposures.gdf[var_name].dtype)
return Exposures(
pd.concat([exposures.gdf, sea_exp_gdf], ignore_index=True, sort=False),
crs=exposures.crs,
ref_year=exposures.ref_year,
value_unit=exposures.value_unit,
meta=exposures.meta,
tag=exposures.tag
)
def _read_mat_obligatory(exposures, data, var_names):
"""Fill obligatory variables."""
exposures['value'] = np.squeeze(data[var_names['var_name']['val']])
exposures['latitude'] = data[var_names['var_name']['lat']].reshape(-1)
exposures['longitude'] = data[var_names['var_name']['lon']].reshape(-1)
exposures[INDICATOR_IF] = np.squeeze(
data[var_names['var_name']['imp']]).astype(int, copy=False)
def _read_mat_optional(exposures, data, var_names):
"""Fill optional parameters."""
try:
exposures['deductible'] = np.squeeze(data[var_names['var_name']['ded']])
except KeyError:
pass
try:
exposures['cover'] = np.squeeze(data[var_names['var_name']['cov']])
except KeyError:
pass
try:
exposures['category_id'] = \
np.squeeze(data[var_names['var_name']['cat']]).astype(int, copy=False)
except KeyError:
pass
try:
exposures['region_id'] = \
np.squeeze(data[var_names['var_name']['reg']]).astype(int, copy=False)
except KeyError:
pass
try:
assigned = np.squeeze(data[var_names['var_name']['ass']]).astype(int, copy=False)
if assigned.size > 0:
exposures[INDICATOR_CENTR] = assigned
except KeyError:
pass
def _read_mat_metadata(exposures, data, file_name, var_names):
"""Fille metadata in DataFrame object"""
try:
exposures.ref_year = int(np.squeeze(data[var_names['var_name']['ref']]))
except KeyError:
exposures.ref_year = DEF_REF_YEAR
try:
exposures.value_unit = u_hdf5.get_str_from_ref(
file_name, data[var_names['var_name']['uni']][0][0])
except KeyError:
exposures.value_unit = DEF_VALUE_UNIT
exposures.tag = Tag(file_name)